Skip to content

Grounded Sources

Grounded Sources

The grounded source abstraction provides a uniform interface for queryable data sources in retrieval pipelines. Sources receive structured intent and translate it deterministically to native queries — the LLM never generates query syntax.

Overview

Grounded sources solve the problem of querying heterogeneous data backends (vector stores, SQL databases, Elasticsearch, etc.) through a single retrieval pipeline. Each source:

  • Declares a schema of queryable dimensions (optional)
  • Receives structured intent (RetrievalIntent) from an intent resolution layer
  • Translates intent to native queries in deterministic code
  • Returns normalized results (SourceResult) regardless of backing store

This module lives in dataknobs-data (not dataknobs-bots) so any project using the data layer can define and query sources without depending on the LLM or bots packages.

Quick Start

from dataknobs_data.sources.base import (
    GroundedSource,
    RetrievalIntent,
    SourceResult,
)

# Create an intent (typically produced by an LLM or config)
intent = RetrievalIntent(
    text_queries=["OAuth grant types", "authorization code flow"],
    scope="focused",
)

# Query a source
results = await source.query(intent, top_k=5, score_threshold=0.3)
for result in results:
    print(f"[{result.relevance:.2f}] {result.source_name}: {result.content[:80]}")

Core Types

RetrievalIntent

Source-agnostic structured intent for retrieval.

from dataknobs_data.sources.base import RetrievalIntent

intent = RetrievalIntent(
    text_queries=["search phrase 1", "search phrase 2"],
    filters={
        "source_name": {"field": "value"},  # Keyed by source name
    },
    scope="focused",       # Retrieval breadth hint
    raw_data={},           # Full extraction dict for provenance
)

Attributes:

Attribute Type Description
text_queries list[str] Semantic search phrases. Always present.
filters dict[str, Any] Structured conditions keyed by source name.
scope str Retrieval breadth hint (e.g., "focused", "broad").
raw_data dict[str, Any] Full extraction dict, preserved for provenance.

SourceResult

Normalized result that all sources produce.

from dataknobs_data.sources.base import SourceResult

result = SourceResult(
    content="The authorization code grant type is used to...",
    source_id="chunk_42",
    source_name="knowledge_base",
    source_type="vector_kb",
    relevance=0.92,
    metadata={"heading_path": "Section 4.1 > Authorization Code"},
)

# Convert to dict for compatibility with existing formatters
result_dict = result.to_dict()

Attributes:

Attribute Type Description
content str Text content for inclusion in synthesis prompts.
source_id str Unique ID within the source (chunk ID, primary key, etc.).
source_name str Which GroundedSource produced this result.
source_type str Category string ("vector_kb", "database", etc.).
relevance float Score from 0.0 to 1.0. Default 1.0.
metadata dict[str, Any] Source-specific metadata (heading paths, field values, etc.).

SourceSchema

Schema fragment a source declares for intent extraction.

from dataknobs_data.sources.base import SourceSchema

schema = SourceSchema(
    source_name="case_db",
    fields={
        "category": {
            "type": "string",
            "enum": ["security", "compliance", "operations"],
        },
        "severity": {
            "type": "string",
            "enum": ["low", "medium", "high", "critical"],
        },
    },
    required_fields=["category"],
    description="Case study database with security incident reports",
)

GroundedSource ABC

All sources implement the GroundedSource abstract base class:

from dataknobs_data.sources.base import GroundedSource, RetrievalIntent, SourceResult

class MySource(GroundedSource):
    @property
    def name(self) -> str:
        return "my_source"

    @property
    def source_type(self) -> str:
        return "custom"

    def get_schema(self) -> SourceSchema | None:
        # Return None for text-only sources (default)
        # Return SourceSchema to declare filter dimensions
        return None

    async def query(
        self,
        intent: RetrievalIntent,
        *,
        top_k: int = 5,
        score_threshold: float = 0.0,
    ) -> list[SourceResult]:
        # Translate intent to native query — deterministic code, not LLM
        results = []
        for query_text in intent.text_queries:
            # Execute against your backing store
            hits = await self._search(query_text, limit=top_k)
            results.extend([
                SourceResult(
                    content=hit.text,
                    source_id=str(hit.id),
                    source_name=self.name,
                    source_type=self.source_type,
                    relevance=hit.score,
                )
                for hit in hits
                if hit.score >= score_threshold
            ])
        return sorted(results, key=lambda r: r.relevance, reverse=True)[:top_k]

    async def close(self) -> None:
        # Release resources (optional, default no-op)
        pass

Built-in Implementations

VectorKnowledgeSource (in dataknobs-bots)

VectorKnowledgeSource wraps an existing RAGKnowledgeBase as a GroundedSource. It lives in dataknobs-bots (not dataknobs-data) because it depends on the bot-layer KnowledgeBase class.

from dataknobs_bots.knowledge.sources.vector import VectorKnowledgeSource

source = VectorKnowledgeSource(knowledge_base)
results = await source.query(intent, top_k=5, score_threshold=0.3)

This is used automatically by the GroundedReasoning strategy when a bot has a configured knowledge base.

DatabaseSource

Wraps any AsyncDatabase backend with text search across configured fields.

from dataknobs_data.sources.database import DatabaseSource
from dataknobs_data.backends.memory import AsyncMemoryDatabase

db = AsyncMemoryDatabase()
source = DatabaseSource(
    db=db,
    name="case_studies",
    content_field="summary",
    text_search_fields=["title", "summary", "tags"],
)
results = await source.query(intent)

For each text query, DatabaseSource searches all configured text_search_fields for matching records. Structured filters from intent.filters[source_name] are also applied when present.

Relevance Scoring

DatabaseSource computes a term-coverage relevance score for each result rather than returning a flat 1.0. The score reflects what fraction of the query terms appear in the record's searchable fields:

  • The content field receives 2x weight (it's the primary field users care about)
  • Secondary text search fields receive 1x weight each
  • Score = matched_weight / total_weight, with a minimum floor of 0.05

This makes score_threshold meaningful for database sources — a record matching only 1 of 3 query terms in a secondary field scores lower than one matching all terms in the content field. Results are returned sorted by relevance descending.

Multi-Source Retrieval

When used with the GroundedReasoning strategy in dataknobs-bots, multiple sources are queried in parallel and results are merged via weighted round-robin. Each source has a configurable weight (default 1) that determines how many results it contributes per round-robin cycle. Results are deduplicated by (source_name, source_id) when enabled.

# In GroundedReasoning, sources are queried and merged:
results_by_source = await strategy._retrieve_from_sources(intent)
merged = strategy._merge_source_results(results_by_source)

Testing

Use AsyncMemoryDatabase for DatabaseSource tests and mock knowledge bases for VectorKnowledgeSource tests:

from dataknobs_data.backends.memory import AsyncMemoryDatabase
from dataknobs_data.sources.database import DatabaseSource
from dataknobs_data.sources.base import RetrievalIntent

db = AsyncMemoryDatabase()
await db.create(Record({"title": "OAuth Overview", "summary": "OAuth 2.0 is..."}))

source = DatabaseSource(db=db, name="docs", content_field="summary",
                        text_search_fields=["title", "summary"])

intent = RetrievalIntent(text_queries=["OAuth"])
results = await source.query(intent)
assert len(results) > 0