Skip to content

Schema Extraction

This guide covers LLM-based structured data extraction with JSON Schema validation and observability tracking.

Overview

The extraction module provides:

  • SchemaExtractor: LLM-powered structured data extraction
  • ExtractionTracker: Recording and querying extraction operations
  • ExtractionStats: Aggregated metrics for monitoring

All types are in dataknobs_llm.extraction.

SchemaExtractor

SchemaExtractor uses LLM providers to extract structured data from natural language text, validating against JSON Schema.

Basic Usage

from dataknobs_llm.extraction import SchemaExtractor, ExtractionResult
from dataknobs_llm.llm.providers.ollama import OllamaProvider

# Create provider
provider = OllamaProvider({
    "provider": "ollama",
    "model": "qwen3-coder",
    "temperature": 0.0,  # Deterministic for extraction
})

# Create extractor
extractor = SchemaExtractor(provider=provider)

# Define schema
schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "integer"},
        "email": {"type": "string", "format": "email"},
    },
    "required": ["name"],
}

# Extract data
result = await extractor.extract(
    text="My name is Alice, I'm 30 years old, reach me at alice@example.com",
    schema=schema,
)

# Check result
if result.is_confident:
    print(result.data)
    # {"name": "Alice", "age": 30, "email": "alice@example.com"}
else:
    print(f"Low confidence: {result.confidence}")
    print(f"Errors: {result.errors}")

Creating from Configuration

# From config dict
extractor = SchemaExtractor.from_config({
    "provider": "ollama",
    "model": "qwen3-coder",
    "temperature": 0.0,
})

# Or using from_env_config alias
extractor = SchemaExtractor.from_env_config({
    "provider": "openai",
    "model": "gpt-3.5-turbo",
    "api_key": "${OPENAI_API_KEY}",
})

ExtractionResult

The extract() method returns an ExtractionResult:

@dataclass
class ExtractionResult:
    data: dict[str, Any]      # Extracted structured data
    confidence: float         # Score from 0.0 to 1.0
    errors: list[str]         # Validation errors
    raw_response: str         # Raw LLM response

    @property
    def is_confident(self) -> bool:
        """True if confidence >= 0.8 and no errors."""

Custom Extraction Prompts

Override the default extraction prompt:

custom_prompt = """Extract information from the user's message.

## Schema
{schema}

## Context
{context}

## Message
{text}

## Response (JSON only):"""

extractor = SchemaExtractor(
    provider=provider,
    extraction_prompt=custom_prompt,
)

Context for Extraction

Provide context to guide extraction:

result = await extractor.extract(
    text="Set it to large",
    schema=size_schema,
    context={
        "stage": "product_selection",
        "prompt": "Collecting size preference",
    },
)

Extraction Tracking

ExtractionTracker

ExtractionTracker records extraction operations for observability and debugging:

from dataknobs_llm.extraction import SchemaExtractor, ExtractionTracker

# Create tracker with default settings
tracker = ExtractionTracker(max_history=100)

# Or with custom text truncation (for memory efficiency)
tracker = ExtractionTracker(
    max_history=100,
    truncate_text_at=200,  # Truncate long text fields in records
)

# Extract with tracking
result = await extractor.extract(
    text="I'm Alice, 30 years old",
    schema=person_schema,
    tracker=tracker,  # Enable tracking
)

# Query history
all_records = tracker.query()
recent = tracker.get_recent(10)

# Get statistics
stats = tracker.get_stats()
print(f"Total: {stats.total_extractions}")
print(f"Success rate: {stats.success_rate:.1%}")
print(f"Avg confidence: {stats.avg_confidence:.2f}")
print(f"Avg duration: {stats.avg_duration_ms:.1f}ms")

# Clear history when needed
tracker.clear()

# Check history size
print(f"Records in history: {len(tracker)}")

ExtractionRecord

Each extraction creates an ExtractionRecord:

from dataknobs_llm.extraction import ExtractionRecord

record = ExtractionRecord(
    timestamp=1700000000.0,
    input_text="My name is Alice",
    extracted_data={"name": "Alice"},
    confidence=0.95,
    validation_errors=[],
    duration_ms=150.5,
    success=True,
    schema_name="Person",
    schema_hash="abc123",
    model_used="qwen3-coder",
    provider="ollama",
    context={"stage": "greeting"},
    raw_response='{"name": "Alice"}',
    input_length=17,
    truncated=False,
)

Querying History

Filter extraction history with ExtractionHistoryQuery:

from dataknobs_llm.extraction import ExtractionHistoryQuery

# Filter by schema
query = ExtractionHistoryQuery(schema_name="Person")
person_extractions = tracker.query(query)

# Filter by model
query = ExtractionHistoryQuery(model="gpt-4")
gpt4_extractions = tracker.query(query)

# Filter by success/failure
query = ExtractionHistoryQuery(success_only=True, min_confidence=0.9)
high_confidence = tracker.query(query)

# Filter by time range
query = ExtractionHistoryQuery(
    since=time.time() - 3600,  # Last hour
    until=time.time(),
    limit=50,
)
recent = tracker.query(query)

ExtractionStats

Get aggregated statistics:

stats = tracker.get_stats()

# Basic counts
print(f"Total: {stats.total_extractions}")
print(f"Successful: {stats.successful_extractions}")
print(f"Failed: {stats.failed_extractions}")
print(f"Success rate: {stats.success_rate:.1%}")

# Performance metrics
print(f"Avg confidence: {stats.avg_confidence:.2f}")
print(f"Avg duration: {stats.avg_duration_ms:.1f}ms")

# Time range
print(f"First: {stats.first_extraction}")
print(f"Last: {stats.last_extraction}")

# Breakdown by schema
for schema, count in stats.by_schema.items():
    print(f"  {schema}: {count}")

# Breakdown by model
for model, count in stats.by_model.items():
    print(f"  {model}: {count}")

# Most common errors
for error, count in stats.most_common_errors:
    print(f"  {error}: {count}")

Factory Function

Use create_extraction_record() for convenience:

from dataknobs_llm.extraction import create_extraction_record

record = create_extraction_record(
    input_text="My name is Alice and I'm 30 years old",
    extracted_data={"name": "Alice", "age": 30},
    confidence=0.95,
    validation_errors=[],
    duration_ms=150.5,
    schema=person_schema,  # Extracts title, computes hash
    model_used="qwen3-coder",
    provider="ollama",
    context={"stage": "greeting"},
    raw_response='{"name": "Alice", "age": 30}',
    truncate_at=200,  # Truncate long text fields
)

tracker.record(record)

Integration with Wizard Reasoning

Extraction tracking integrates with wizard reasoning for wizard-based data collection:

from dataknobs_bots.reasoning import WizardReasoning
from dataknobs_llm.extraction import ExtractionTracker

# Create tracker
tracker = ExtractionTracker()

# Configure wizard with extraction tracking
config = {
    "reasoning": {
        "strategy": "wizard",
        "wizard_config": "wizard.yaml",
        "extraction_config": {
            "provider": "ollama",
            "model": "qwen3-coder",
        },
    },
}

# Pass tracker to extract calls
# (WizardReasoning passes tracker to SchemaExtractor internally)

Best Practices

1. Use Low Temperature for Extraction

Set temperature to 0 for deterministic extraction:

extractor = SchemaExtractor.from_config({
    "provider": "ollama",
    "model": "qwen3-coder",
    "temperature": 0.0,  # Deterministic
})

2. Provide Clear Schema Titles

Include titles in schemas for better tracking:

schema = {
    "title": "PersonInfo",  # Used in ExtractionRecord.schema_name
    "type": "object",
    "properties": {...},
}

3. Monitor Success Rates

Track extraction success over time:

stats = tracker.get_stats()

if stats.success_rate < 0.8:
    log.warning(
        "Low extraction success rate: %.1f%% (%d/%d)",
        stats.success_rate * 100,
        stats.successful_extractions,
        stats.total_extractions,
    )

4. Analyze Common Errors

Use error statistics to improve prompts/schemas:

stats = tracker.get_stats()

for error, count in stats.most_common_errors[:5]:
    if "required field" in error.lower():
        log.info("Missing required field errors: %d", count)
    elif "json" in error.lower():
        log.info("JSON parsing errors: %d", count)

5. Set Appropriate History Limits

Configure tracker capacity based on needs:

# For debugging (short history)
debug_tracker = ExtractionTracker(max_history=20)

# For production monitoring
prod_tracker = ExtractionTracker(max_history=1000)

6. Manage Memory with Text Truncation

Long input texts and raw responses are truncated to save memory:

# Default truncation at 200 characters
tracker = ExtractionTracker(truncate_text_at=200)

# Store more context if needed
tracker = ExtractionTracker(truncate_text_at=500)

# Records indicate if truncation occurred
record = tracker.get_recent(1)[0]
if record.truncated:
    print(f"Original input was {record.input_length} chars")

API Reference

Classes

Class Description
SchemaExtractor LLM-powered structured data extraction
ExtractionResult Result from schema extraction
ExtractionTracker Manages extraction history with queries
ExtractionRecord Record of a single extraction operation
ExtractionStats Aggregated extraction statistics
ExtractionHistoryQuery Query parameters for filtering history

Factory Functions

Function Description
create_extraction_record() Create ExtractionRecord with auto-timestamp

ExtractionTracker Methods

Method Description
record(extraction) Add an extraction record to history
query(query) Filter history with ExtractionHistoryQuery
get_stats() Get aggregated ExtractionStats
get_recent(count) Get most recent N records
clear() Clear all extraction history
__len__() Return number of records in history

ExtractionTracker Constructor

Parameter Type Default Description
max_history int 100 Maximum records to retain
truncate_text_at int 200 Max length for text fields

Configuration Options

Option Type Description
provider string LLM provider name (required)
model string Model identifier (required)
temperature float Sampling temperature (default: 0.0)
api_key string API key if required
api_base string Custom API endpoint
extraction_prompt string Custom prompt template

See Also