# RAG Query Processing
This module provides query transformation and expansion utilities for improved RAG retrieval.
## Overview

User queries often don't match well with embedded content:

- Ambiguous follow-ups: "Show me an example"
- Literal content to analyze: "Analyze this prompt: Write a poem"
- Context-dependent questions: "What about the second option?"

Query processing transforms these into effective search queries.
## Quick Start

```python
from dataknobs_bots.knowledge.query import (
    QueryTransformer,
    TransformerConfig,
    ContextualExpander,
    is_ambiguous_query,
)

# Option 1: LLM-based transformation
transformer = QueryTransformer(TransformerConfig(
    enabled=True,
    llm_provider="ollama",
    llm_model="llama3.2",
    domain_context="prompt engineering",
))
await transformer.initialize()
queries = await transformer.transform("Analyze this: Write a poem")

# Option 2: No-LLM expansion using conversation context
# conversation_history: prior turns as Message objects or dicts
expander = ContextualExpander(max_context_turns=3)
if is_ambiguous_query("Show me an example"):
    expanded = expander.expand("Show me an example", conversation_history)
```
## Two Approaches

### QueryTransformer (LLM-based)

Uses an LLM to generate optimized search queries. Best for:

- Complex queries needing interpretation
- Domain-specific terminology
- Multi-faceted questions
- When latency is acceptable

### ContextualExpander (No LLM)

Expands queries using conversation history keywords. Best for:

- Ambiguous follow-up questions
- Context-dependent queries
- Low-latency requirements
- When LLM calls should be minimized
## QueryTransformer

### Configuration

```python
from dataknobs_bots.knowledge.query import TransformerConfig

config = TransformerConfig(
    enabled=True,                         # Enable transformation
    llm_provider="ollama",                # LLM provider
    llm_model="llama3.2",                 # Model to use
    num_queries=3,                        # Number of queries to generate
    domain_context="prompt engineering",  # Domain hint
)
```
### Basic Usage

```python
from dataknobs_bots.knowledge.query import QueryTransformer, TransformerConfig

# Create and initialize
transformer = QueryTransformer(TransformerConfig(
    enabled=True,
    llm_provider="ollama",
    llm_model="llama3.2",
    num_queries=3,
    domain_context="prompt engineering",
))
await transformer.initialize()

# Transform user input
queries = await transformer.transform(
    "Analyze this prompt: Write a Python function to sort a list"
)
# Returns: [
#     "prompt analysis techniques",
#     "evaluating prompt quality",
#     "code generation prompt patterns"
# ]

# Search with each query (kb: your knowledge base with an async search method)
all_results = []
for query in queries:
    results = await kb.search(query)
    all_results.extend(results)
```
### With Conversation Context

```python
# Transform with conversation history
queries = await transformer.transform_with_context(
    user_input="What about improving it?",
    conversation_context=(
        "User: How do I write prompts for code generation?\n"
        "Assistant: Here are the key techniques..."
    ),
    num_queries=3,
)
```
### Cleanup
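Release the LLM provider when the transformer is no longer needed, mirroring the `close()` method in the API reference below:

```python
# Close the underlying LLM provider connection
await transformer.close()
```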
## ContextualExpander

### Basic Usage

```python
from dataknobs_bots.knowledge.query import ContextualExpander, Message

expander = ContextualExpander(
    max_context_turns=3,      # How many recent messages to use
    include_assistant=False,  # Whether to include assistant messages
    keyword_weight=2,         # Keyword repetition factor
)

# Conversation history
history = [
    Message(role="user", content="Tell me about chain-of-thought prompting"),
    Message(role="assistant", content="Chain-of-thought is a technique..."),
    Message(role="user", content="How does it compare to few-shot?"),
]

# Expand ambiguous query
expanded = expander.expand(
    "Show me an example",
    history,
)
# Returns: "chain-of-thought prompting compare few-shot Show me an example"
```
### With Dict History

```python
# Also accepts dict format
history = [
    {"role": "user", "content": "Tell me about chain-of-thought"},
    {"role": "assistant", "content": "It's a technique..."},
]

expanded = expander.expand("Show me more", history)
```
### Custom Topic Extraction

```python
def extract_technical_terms(text):
    """Custom topic extractor."""
    # Your extraction logic
    return ["chain-of-thought", "prompting", "few-shot"]

expanded = expander.expand_with_topics(
    "Show me an example",
    history,
    topic_extractor=extract_technical_terms,
)
```
## Ambiguity Detection

### is_ambiguous_query

Check if a query needs expansion:

```python
from dataknobs_bots.knowledge.query import is_ambiguous_query

is_ambiguous_query("Show me an example")         # True (short + "example")
is_ambiguous_query("What about this one?")       # True (demonstrative)
is_ambiguous_query("How do I configure OAuth?")  # False (specific)
is_ambiguous_query("More")                       # True (very short)
```
## Integration Pattern

```python
from dataknobs_bots.knowledge.query import ContextualExpander, is_ambiguous_query

class SmartKnowledgeBase:
    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.expander = ContextualExpander()

    async def query(self, message, history=None, k=10):
        # Expand if ambiguous and we have history
        if history and is_ambiguous_query(message):
            search_query = self.expander.expand(message, history)
        else:
            search_query = message
        return await self.vector_store.search(search_query, k=k)
```
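A minimal usage sketch, assuming a vector store object that exposes the async `search(query, k=...)` method the class above relies on:

```python
kb = SmartKnowledgeBase(vector_store)

# Ambiguous follow-up: expanded with history before searching
results = await kb.query("Show me an example", history=history, k=5)
```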
## Complete Integration

### RAG Pipeline with Query Processing

```python
from dataknobs_bots.knowledge.query import (
    ContextualExpander,
    is_ambiguous_query,
)
from dataknobs_bots.knowledge.retrieval import ChunkMerger, ContextFormatter

class EnhancedRAGKnowledgeBase:
    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.expander = ContextualExpander(max_context_turns=3)
        self.merger = ChunkMerger()
        self.formatter = ContextFormatter()

    async def query(
        self,
        message: str,
        history: list | None = None,
        k: int = 10,
    ):
        # 1. Expand ambiguous queries
        if history and is_ambiguous_query(message):
            search_query = self.expander.expand(message, history)
        else:
            search_query = message

        # 2. Search
        results = await self.vector_store.search(search_query, k=k)

        # 3. Merge adjacent chunks
        merged = self.merger.merge(results)
        return merged

    def format_context(self, merged_chunks):
        context = self.formatter.format_merged(merged_chunks)
        return self.formatter.wrap_for_prompt(context)
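```

End-to-end usage might look like the following sketch; the final prompt assembly is illustrative, not part of the library:

```python
kb = EnhancedRAGKnowledgeBase(vector_store)

merged = await kb.query("What about the second option?", history=history)
context = kb.format_context(merged)

# Feed the wrapped context plus the user message to your LLM of choice
prompt = f"{context}\n\nUser question: What about the second option?"
```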
### With LLM Query Transformation

```python
from dataknobs_bots.knowledge.query import QueryTransformer, TransformerConfig

class TransformingRAGKnowledgeBase:
    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.transformer = QueryTransformer(TransformerConfig(
            enabled=True,
            llm_provider="ollama",
            llm_model="llama3.2",
            num_queries=3,
            domain_context="documentation",
        ))

    async def initialize(self):
        await self.transformer.initialize()

    async def query(self, message: str, k: int = 10):
        # Generate multiple search queries
        queries = await self.transformer.transform(message)

        # Search with each query
        all_results = []
        seen_ids = set()
        for query in queries:
            results = await self.vector_store.search(query, k=k)
            for result in results:
                # Deduplicate
                result_id = result.get("id") or hash(result.get("text", ""))
                if result_id not in seen_ids:
                    seen_ids.add(result_id)
                    all_results.append(result)

        # Sort by similarity and return top k
        all_results.sort(key=lambda r: r.get("similarity", 0), reverse=True)
        return all_results[:k]

    async def close(self):
        await self.transformer.close()
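```

The transformer holds an LLM provider connection, so the full lifecycle matters. A sketch, assuming the class above:

```python
kb = TransformingRAGKnowledgeBase(vector_store)
await kb.initialize()
try:
    results = await kb.query("How do I tune retrieval quality?", k=5)
finally:
    await kb.close()  # release the underlying LLM provider
```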
### In DynaBot

```python
from dataknobs_bots.knowledge.query import ContextualExpander, is_ambiguous_query

class EnhancedBot(DynaBot):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query_expander = ContextualExpander()

    async def chat(
        self,
        message: str,
        context: BotContext,
        rag_query: str | None = None,
        **kwargs,
    ):
        # Use explicit rag_query if provided
        if rag_query:
            search_query = rag_query
        # Otherwise, expand if ambiguous
        elif is_ambiguous_query(message):
            history = await self.get_conversation_history(context)
            search_query = self.query_expander.expand(message, history)
        else:
            search_query = message

        # Continue with retrieval
        return await super().chat(
            message,
            context,
            rag_query=search_query,
            **kwargs,
        )
```
## API Reference

### TransformerConfig

```python
@dataclass
class TransformerConfig:
    enabled: bool = False         # Enable transformation
    llm_provider: str = "ollama"  # LLM provider
    llm_model: str = "llama3.2"   # Model name
    num_queries: int = 3          # Queries to generate
    domain_context: str = ""      # Domain hint
```
### QueryTransformer

```python
class QueryTransformer:
    def __init__(self, config: TransformerConfig | None = None):
        """Initialize with optional configuration."""

    async def initialize(self) -> None:
        """Initialize the LLM provider."""

    async def close(self) -> None:
        """Close the LLM provider."""

    async def transform(
        self,
        user_input: str,
        num_queries: int | None = None,
    ) -> list[str]:
        """Transform user input into optimized search queries."""

    async def transform_with_context(
        self,
        user_input: str,
        conversation_context: str,
        num_queries: int | None = None,
    ) -> list[str]:
        """Transform with additional conversation context."""
```
### Message
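The exact definition lives in `dataknobs_bots.knowledge.query`; based on its usage on this page it is a simple role/content container, roughly:

```python
# Sketch inferred from usage above; see the source for the actual definition
@dataclass
class Message:
    role: str     # "user" or "assistant"
    content: str  # Message text
```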
### ContextualExpander

```python
class ContextualExpander:
    def __init__(
        self,
        max_context_turns: int = 3,
        include_assistant: bool = False,
        keyword_weight: int = 2,
    ):
        """Initialize the contextual expander."""

    def expand(
        self,
        user_input: str,
        conversation_history: list[Message] | list[dict],
    ) -> str:
        """Expand query with conversation context."""

    def expand_with_topics(
        self,
        user_input: str,
        conversation_history: list[Message] | list[dict],
        topic_extractor: callable = None,
    ) -> str:
        """Expand with custom topic extraction."""
```
### Utility Functions

```python
def is_ambiguous_query(query: str) -> bool:
    """Check if a query is likely ambiguous and needs expansion."""

async def create_transformer(config: dict) -> QueryTransformer:
    """Create and initialize a QueryTransformer from config dict."""
```
## Related
- Retrieval Utilities - Chunk merging and formatting
- User Guide - Complete bot usage guide