dataknobs-llm Complete API Reference¶
Complete auto-generated API documentation from source code docstrings.
💡 Also see:

- Curated API Guide - Hand-crafted tutorials and examples
- Package Overview - Introduction and getting started
- Source Code - View on GitHub
dataknobs_llm ¶
LLM utilities for DataKnobs.
Modules:

| Name | Description |
|---|---|
| conversations | Conversation management system for dataknobs-llm. |
| exceptions | Custom exceptions for the LLM package. |
| execution | Execution utilities for parallel and sequential LLM task processing. |
| extraction | Extraction utilities for structured data extraction from text. |
| fsm_integration | FSM integration for LLM workflows. |
| llm | LLM abstraction layer for FSM patterns. |
| prompts | Advanced prompt engineering library for dataknobs_llm. |
| sources | Intent schema composition for grounded retrieval. |
| template_utils | Template rendering utilities for dataknobs-llm. |
| testing | Testing utilities for dataknobs-llm. |
| tools | Tool system for LLM function calling. |
Classes:

| Name | Description |
|---|---|
| LLMProvider | Base LLM provider interface. |
| LLMConfig | Configuration for LLM operations. |
| LLMMessage | Represents a message in an LLM conversation. |
| LLMResponse | Response from LLM. |
| LLMStreamResponse | Streaming response from LLM. |
| CompletionMode | LLM completion modes. |
| ModelCapability | Model capabilities. |
| OpenAIProvider | OpenAI LLM provider with full API support. |
| AnthropicProvider | Anthropic Claude LLM provider with full API support. |
| OllamaProvider | Ollama local LLM provider for privacy-first, offline LLM usage. |
| HuggingFaceProvider | HuggingFace Inference API provider. |
| EchoProvider | Echo provider for testing and debugging. |
| CachingEmbedProvider | Provider wrapper that caches embedding results. |
| EmbeddingCache | Cache for embedding vectors, keyed by (model, text). |
| MemoryEmbeddingCache | In-memory cache backend for testing. |
| LLMProviderFactory | Factory for creating LLM providers from configuration. |
| TemplateStrategy | Template rendering strategies. |
| MessageTemplate | Template for generating message content with multiple rendering strategies. |
| MessageBuilder | Builder for constructing message sequences. |
| ResponseParser | Parser for LLM responses. |
| TokenCounter | Estimate token counts for different models. |
| CostCalculator | Calculate costs for LLM usage. |
| Tool | Abstract base class for LLM-callable tools. |
| ToolRegistry | Registry for managing available tools/functions. |
| DeterministicTask | A sync or async callable to execute alongside LLM tasks. |
| LLMTask | A single LLM call to execute. |
| ParallelLLMExecutor | Runs multiple LLM calls and deterministic functions concurrently. |
| TaskResult | Result of a single task execution. |
| ResponseQueueExhaustedError | EchoProvider response queue exhausted in strict mode. |
| ToolsNotSupportedError | Model does not support tool/function calling. |
| CallTracker | Collect new LLM calls across multiple CapturingProviders per turn. |
| CapturedCall | Record of a single LLM call captured by CapturingProvider. |
| CapturingProvider | Provider wrapper that records all LLM calls for capture-replay testing. |
| ErrorResponse | Marker for a queued error in EchoProvider's response sequence. |
| ResponseSequenceBuilder | Builder for creating sequences of LLM responses. |
Functions:

| Name | Description |
|---|---|
| normalize_llm_config | Normalize various config formats to LLMConfig. |
| create_llm_provider | Create appropriate LLM provider based on configuration. |
| create_embedding_provider | Create and initialize an embedding provider from configuration. |
| create_caching_provider | Create and initialize a caching embedding provider. |
| render_conditional_template | Render a template with variable substitution and conditional sections. |
| extraction_response | Create an LLMResponse for schema extraction. |
| llm_message_from_dict | Deserialize an LLMMessage from a dict. |
| llm_message_to_dict | Serialize an LLMMessage to a JSON-compatible dict. |
| llm_response_from_dict | Deserialize an LLMResponse from a dict. |
| llm_response_to_dict | Serialize an LLMResponse to a JSON-compatible dict. |
| multi_tool_response | Create an LLMResponse with multiple tool calls. |
| text_response | Create a simple text LLMResponse. |
| tool_call_from_dict | Deserialize a ToolCall from a dict. |
| tool_call_response | Create an LLMResponse with tool call(s). |
| tool_call_to_dict | Serialize a ToolCall to a JSON-compatible dict. |
Classes¶
LLMProvider ¶
LLMProvider(
config: Union[LLMConfig, Config, Dict[str, Any]],
prompt_builder: Union[PromptBuilder, AsyncPromptBuilder] | None = None,
)
Bases: ABC
Base LLM provider interface.
Initialize provider with configuration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `Union[LLMConfig, Config, Dict[str, Any]]` | Configuration as LLMConfig, dataknobs Config object, or dict | required |
| `prompt_builder` | `Union[PromptBuilder, AsyncPromptBuilder] \| None` | Optional prompt builder for integrated prompting | `None` |
Methods:

| Name | Description |
|---|---|
| `initialize` | Initialize the LLM client. |
| `close` | Close the LLM client. |
| `validate_model` | Validate that the model is available. |
| `get_capabilities` | Get model capabilities. |
| `__enter__` | Context manager entry. |
| `__exit__` | Context manager exit. |
Attributes:

| Name | Type | Description |
|---|---|---|
| `is_initialized` | `bool` | Check if provider is initialized. |
Source code in packages/llm/src/dataknobs_llm/llm/base.py
Attributes¶
Functions¶
initialize abstractmethod ¶

close abstractmethod ¶

validate_model abstractmethod async ¶
get_capabilities ¶
Get model capabilities.
Template method: calls :meth:_detect_capabilities (subclass hook),
filters out None values, then applies config overrides via
:meth:_resolve_capabilities.
Subclasses override :meth:_detect_capabilities instead of this
method.
Source code in packages/llm/src/dataknobs_llm/llm/base.py
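The template-method flow described above (detect, filter out `None`, then apply config overrides) can be sketched in isolation. This standalone class is illustrative only: the hook names `_detect_capabilities` and `_resolve_capabilities` mirror the docstring, but the real implementation lives in `dataknobs_llm.llm.base`.

```python
from enum import Enum, auto
from typing import Dict, List, Optional, Set


class ModelCapability(Enum):
    CHAT = auto()
    STREAMING = auto()
    FUNCTION_CALLING = auto()


class SketchProvider:
    """Illustrative stand-in for LLMProvider's capability resolution."""

    def __init__(self, config_capabilities: Optional[List[ModelCapability]] = None):
        # Capabilities forced on via config (the "override" path)
        self._overrides = config_capabilities or []

    def _detect_capabilities(self) -> Dict[ModelCapability, Optional[bool]]:
        # Subclass hook: None means "unknown for this model"
        return {
            ModelCapability.CHAT: True,
            ModelCapability.STREAMING: True,
            ModelCapability.FUNCTION_CALLING: None,
        }

    def get_capabilities(self) -> Set[ModelCapability]:
        # Keep only capabilities detected as truthy (drops None/False)
        detected = {
            cap
            for cap, supported in self._detect_capabilities().items()
            if supported
        }
        # Config overrides win over detection
        detected.update(self._overrides)
        return detected


caps = SketchProvider([ModelCapability.FUNCTION_CALLING]).get_capabilities()
```

With the override supplied, `FUNCTION_CALLING` is reported even though detection returned `None` for it; without overrides, only the detected capabilities remain.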
__enter__ ¶
LLMConfig dataclass ¶
LLMConfig(
provider: str,
model: str,
api_key: str | None = None,
api_base: str | None = None,
temperature: float | None = None,
max_tokens: int | None = None,
top_p: float | None = None,
frequency_penalty: float | None = None,
presence_penalty: float | None = None,
stop_sequences: List[str] | None = None,
mode: CompletionMode = CompletionMode.CHAT,
system_prompt: str | None = None,
response_format: str | None = None,
functions: List[Dict[str, Any]] | None = None,
function_call: Union[str, Dict[str, str]] | None = None,
stream: bool = False,
stream_callback: Callable[[LLMStreamResponse], None] | None = None,
rate_limit: int | None = None,
retry_count: int = 3,
retry_delay: float = 1.0,
timeout: float = 60.0,
seed: int | None = None,
logit_bias: Dict[str, float] | None = None,
user_id: str | None = None,
options: Dict[str, Any] = dict(),
dimensions: int | None = None,
capabilities: List[str] | None = None,
)
Configuration for LLM operations.
Comprehensive configuration for LLM providers with 20+ parameters controlling generation, rate limiting, function calling, and more. Works seamlessly with both direct instantiation and dataknobs Config objects.
This class supports:

- All major LLM providers (OpenAI, Anthropic, Ollama, HuggingFace)
- Generation parameters (temperature, max_tokens, top_p, etc.)
- Embedding configuration (dimensions)
- Function/tool calling configuration
- Streaming with callbacks
- Rate limiting and retry logic
- Provider-specific options via the options dict
Note
Generation parameters (temperature, top_p, max_tokens,
etc.) default to None, meaning "not set — let the provider API
apply its own default." Only explicitly supplied values are sent
to the provider (see :meth:generation_params).
Example
from dataknobs_llm.llm.base import LLMConfig, CompletionMode
# Basic configuration — temperature is explicitly set here;
# omitting it would let the provider use its own default.
config = LLMConfig(
provider="openai",
model="gpt-4",
api_key="sk-...",
temperature=0.7,
max_tokens=500
)
# Creative writing config
creative_config = LLMConfig(
provider="anthropic",
model="claude-3-sonnet",
temperature=1.2,
top_p=0.95,
max_tokens=2000
)
# Deterministic config for testing
test_config = LLMConfig(
provider="openai",
model="gpt-4",
temperature=0.0,
seed=42, # Reproducible outputs
max_tokens=100
)
# Function calling config
function_config = LLMConfig(
provider="openai",
model="gpt-4",
functions=[{
"name": "search_docs",
"description": "Search documentation",
"parameters": {"type": "object", "properties": {...}}
}],
function_call="auto"
)
# Streaming with callback
def on_chunk(chunk):
print(chunk.delta, end="")
streaming_config = LLMConfig(
provider="openai",
model="gpt-4",
stream=True,
stream_callback=on_chunk
)
# From dictionary (Config compatibility)
config_dict = {
"provider": "ollama",
"model": "llama2",
"type": "llm", # Config metadata (ignored)
"temperature": 0.8
}
config = LLMConfig.from_dict(config_dict)
# Clone with overrides
new_config = config.clone(temperature=1.0, max_tokens=1000)
See Also

- normalize_llm_config: Convert various formats to LLMConfig
- CompletionMode: Available completion modes
Methods:

| Name | Description |
|---|---|
| `from_dict` | Create LLMConfig from a dictionary. |
| `to_dict` | Convert LLMConfig to a dictionary. |
| `clone` | Create a copy of this config with optional overrides. |
| `generation_params` | Return only explicitly-set generation parameters. |

Functions¶

from_dict classmethod ¶
Create LLMConfig from a dictionary.
This method handles dictionaries from dataknobs Config objects, which may include 'type', 'name', and 'factory' attributes. These attributes are ignored during LLMConfig construction.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config_dict` | `Dict[str, Any]` | Configuration dictionary | required |

Returns:

| Type | Description |
|---|---|
| `LLMConfig` | LLMConfig instance |
Source code in packages/llm/src/dataknobs_llm/llm/base.py
to_dict ¶
Convert LLMConfig to a dictionary.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `include_config_attrs` | `bool` | If True, includes 'type' attribute for Config compatibility | `False` |

Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Configuration dictionary |
Source code in packages/llm/src/dataknobs_llm/llm/base.py
clone ¶
Create a copy of this config with optional overrides.
This method is useful for creating runtime configuration variations without mutating the original config. All dataclass fields can be overridden via keyword arguments.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `**overrides` | `Any` | Field values to override in the cloned config | `{}` |

Returns:

| Type | Description |
|---|---|
| `LLMConfig` | New LLMConfig instance with overrides applied |
Example
base_config = LLMConfig(provider="openai", model="gpt-4", temperature=0.7)
creative_config = base_config.clone(temperature=1.2, max_tokens=500)
Source code in packages/llm/src/dataknobs_llm/llm/base.py
generation_params ¶
Return only explicitly-set generation parameters.
Providers use this to build API requests without sending unnecessary defaults. Parameters left as None are omitted, letting each provider API apply its own default.
Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Dictionary of generation parameter names to their values. Only includes parameters that were explicitly set (non-None). |
Source code in packages/llm/src/dataknobs_llm/llm/base.py
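The "None means unset" filtering that `generation_params` performs can be shown with a minimal stand-in. The field names below match LLMConfig, but `SketchConfig` is not the real class, just a sketch of the pattern:

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class SketchConfig:
    """Stand-in mimicking LLMConfig's None-defaulted generation fields."""

    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    top_p: Optional[float] = None

    def generation_params(self) -> Dict[str, Any]:
        # Omit parameters left at None so the provider API applies
        # its own defaults for them.
        return {k: v for k, v in asdict(self).items() if v is not None}


# Only the explicitly-set field survives filtering.
params = SketchConfig(temperature=0.7).generation_params()
```

This is why a request built from a config with only `temperature` set carries no `max_tokens` or `top_p` at all, rather than sending some library-chosen default.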
LLMMessage dataclass ¶
LLMMessage(
role: str,
content: str,
name: str | None = None,
tool_call_id: str | None = None,
function_call: Dict[str, Any] | None = None,
tool_calls: list[ToolCall] | None = None,
metadata: Dict[str, Any] = dict(),
)
Represents a message in an LLM conversation.
Standard message format used across all providers. Messages are the fundamental unit of LLM interactions, containing role-based content for multi-turn conversations.
Attributes:

| Name | Type | Description |
|---|---|---|
| `role` | `str` | Message role - 'system', 'user', 'assistant', 'tool', or 'function' |
| `content` | `str` | Message content text |
| `name` | `str \| None` | Optional name for function/tool messages or multi-user scenarios |
| `tool_call_id` | `str \| None` | Provider-assigned ID for pairing tool results with invocations (required by OpenAI and Anthropic APIs) |
| `function_call` | `Dict[str, Any] \| None` | Function call data for tool-using models |
| `tool_calls` | `list[ToolCall] \| None` | Tool calls made by the assistant in this message |
| `metadata` | `Dict[str, Any]` | Additional metadata (timestamps, IDs, etc.) |
Example
from dataknobs_llm.llm.base import LLMMessage
# System message
system_msg = LLMMessage(
role="system",
content="You are a helpful coding assistant."
)
# User message
user_msg = LLMMessage(
role="user",
content="How do I reverse a list in Python?"
)
# Assistant message
assistant_msg = LLMMessage(
role="assistant",
content="Use the reverse() method or [::-1] slicing."
)
# Function result message
function_msg = LLMMessage(
role="function",
name="search_docs",
content='{"result": "Found 3 examples"}'
)
# Build conversation
messages = [system_msg, user_msg, assistant_msg]
Methods:

| Name | Description |
|---|---|
| `to_dict` | Convert to canonical dictionary format for storage/interchange. |
| `from_dict` | Create an LLMMessage from a dictionary. |
Functions¶
to_dict ¶
Convert to canonical dictionary format for storage/interchange.
Only includes non-None/non-empty optional fields to keep output clean.
Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Dictionary with the message's non-empty fields |
Source code in packages/llm/src/dataknobs_llm/llm/base.py
from_dict classmethod ¶
Create an LLMMessage from a dictionary.
Handles both the new canonical format (with tool_calls as list of
dicts) and the legacy format (without tool_calls/function_call).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Dict[str, Any]` | Dictionary with message fields | required |

Returns:

| Type | Description |
|---|---|
| `LLMMessage` | LLMMessage instance. |
Source code in packages/llm/src/dataknobs_llm/llm/base.py
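The to_dict/from_dict round trip can be sketched with a minimal stand-in. The real LLMMessage also handles `tool_calls` and `function_call`; this sketch only shows the drop-empty-fields behavior the docstrings describe:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SketchMessage:
    """Minimal stand-in for LLMMessage's dict round trip."""

    role: str
    content: str
    name: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        out: Dict[str, Any] = {"role": self.role, "content": self.content}
        if self.name is not None:  # skip unset optionals
            out["name"] = self.name
        if self.metadata:  # skip empty metadata
            out["metadata"] = self.metadata
        return out

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "SketchMessage":
        return cls(
            role=data["role"],
            content=data["content"],
            name=data.get("name"),
            metadata=data.get("metadata", {}),
        )


msg = SketchMessage(role="user", content="hi")
restored = SketchMessage.from_dict(msg.to_dict())
```

Because unset optionals are omitted on serialization and defaulted on deserialization, the round trip reproduces the original message while keeping stored dicts compact.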
LLMResponse dataclass ¶
LLMResponse(
content: str,
model: str,
finish_reason: str | None = None,
usage: Dict[str, int] | None = None,
function_call: Dict[str, Any] | None = None,
tool_calls: list[ToolCall] | None = None,
metadata: Dict[str, Any] = dict(),
created_at: datetime = datetime.now(),
cost_usd: float | None = None,
cumulative_cost_usd: float | None = None,
)
Response from LLM.
Standard response format returned by all LLM providers. Contains the generated content along with metadata about token usage, cost, and completion status.
Attributes:

| Name | Type | Description |
|---|---|---|
| `content` | `str` | Generated text content |
| `model` | `str` | Model identifier that generated the response |
| `finish_reason` | `str \| None` | Why generation stopped - 'stop', 'length', 'function_call' |
| `usage` | `Dict[str, int] \| None` | Token usage stats (prompt_tokens, completion_tokens, total_tokens) |
| `function_call` | `Dict[str, Any] \| None` | Function call data if model requested tool use |
| `metadata` | `Dict[str, Any]` | Provider-specific metadata |
| `created_at` | `datetime` | Response timestamp |
| `cost_usd` | `float \| None` | Estimated cost in USD for this request |
| `cumulative_cost_usd` | `float \| None` | Running total cost for conversation |
Example
from dataknobs_llm import create_llm_provider
llm = create_llm_provider("openai", model="gpt-4")
response = await llm.complete("What is Python?")
# Access response data
print(response.content)
# => "Python is a high-level programming language..."
# Check token usage
print(f"Tokens used: {response.usage['total_tokens']}")
# => Tokens used: 87
# Monitor costs
if response.cost_usd:
print(f"Cost: ${response.cost_usd:.4f}")
print(f"Total: ${response.cumulative_cost_usd:.4f}")
# Check completion status
if response.finish_reason == "length":
print("Response truncated due to max_tokens limit")
See Also

- LLMMessage: Request message format
- LLMStreamResponse: Streaming response format
LLMStreamResponse dataclass ¶
LLMStreamResponse(
delta: str,
is_final: bool = False,
finish_reason: str | None = None,
usage: Dict[str, int] | None = None,
tool_calls: list[ToolCall] | None = None,
model: str | None = None,
metadata: Dict[str, Any] = dict(),
)
Streaming response from LLM.
Represents a single chunk in a streaming LLM response. Streaming allows displaying generated text incrementally as it's produced, providing better user experience for long responses.
Attributes:

| Name | Type | Description |
|---|---|---|
| `delta` | `str` | Incremental content for this chunk (not cumulative) |
| `is_final` | `bool` | True if this is the last chunk in the stream |
| `finish_reason` | `str \| None` | Why generation stopped (only set on final chunk) |
| `usage` | `Dict[str, int] \| None` | Token usage stats (only set on final chunk) |
| `tool_calls` | `list[ToolCall] \| None` | Tool calls requested by the model (only set on final chunk) |
| `model` | `str \| None` | Model identifier (only set on final chunk) |
| `metadata` | `Dict[str, Any]` | Additional chunk metadata |
Example
from dataknobs_llm import create_llm_provider
llm = create_llm_provider("openai", model="gpt-4")
# Stream and display in real-time
async for chunk in llm.stream_complete("Write a poem"):
print(chunk.delta, end="", flush=True)
if chunk.is_final:
print(f"\n\nFinished: {chunk.finish_reason}")
print(f"Tokens: {chunk.usage['total_tokens']}")
# Accumulate full response
full_text = ""
chunks_received = 0
async for chunk in llm.stream_complete("Explain Python"):
full_text += chunk.delta
chunks_received += 1
# Optional: show progress
if chunks_received % 10 == 0:
print(f"Received {chunks_received} chunks...")
print(f"\nComplete response ({len(full_text)} chars)")
print(full_text)
See Also

- LLMResponse: Non-streaming response format
- AsyncLLMProvider.stream_complete: Streaming method
CompletionMode ¶
Bases: Enum
LLM completion modes.
Defines the operation mode for LLM requests. Different modes use different APIs and formatting requirements.
Attributes:

| Name | Description |
|---|---|
| `CHAT` | Chat completion with conversational message history |
| `TEXT` | Raw text completion (legacy models) |
| `INSTRUCT` | Instruction-following mode |
| `EMBEDDING` | Generate vector embeddings for semantic search |
| `FUNCTION` | Function/tool calling mode |
Example
from dataknobs_llm.llm.base import LLMConfig, CompletionMode
# Chat mode (default for modern models)
config = LLMConfig(
provider="openai",
model="gpt-4",
mode=CompletionMode.CHAT
)
# Embedding mode for vector search
embedding_config = LLMConfig(
provider="openai",
model="text-embedding-ada-002",
mode=CompletionMode.EMBEDDING
)
ModelCapability ¶
Bases: Enum
Model capabilities.
Enumerates the capabilities that different LLM models support. Providers use this to advertise what features are available for a specific model.
Attributes:

| Name | Description |
|---|---|
| `TEXT_GENERATION` | Basic text generation |
| `CHAT` | Multi-turn conversational interactions |
| `EMBEDDINGS` | Vector embedding generation |
| `FUNCTION_CALLING` | Tool/function calling support |
| `VISION` | Image understanding capabilities |
| `CODE` | Code generation and analysis |
| `JSON_MODE` | Structured JSON output |
| `STREAMING` | Incremental response streaming |
Example
from dataknobs_llm import create_llm_provider
from dataknobs_llm.llm.base import ModelCapability
# Check model capabilities
llm = create_llm_provider("openai", model="gpt-4")
capabilities = llm.get_capabilities()
if ModelCapability.STREAMING in capabilities:
# Use streaming
async for chunk in llm.stream_complete("Hello"):
print(chunk.delta, end="")
if ModelCapability.FUNCTION_CALLING in capabilities:
# Use function calling
response = await llm.function_call(messages, functions)
OpenAIProvider ¶
OpenAIProvider(
config: Union[LLMConfig, Config, Dict[str, Any]],
prompt_builder: AsyncPromptBuilder | None = None,
)
Bases: AsyncLLMProvider
OpenAI LLM provider with full API support.
Provides async access to OpenAI's chat, completion, embedding, and function calling APIs. Supports all GPT models including GPT-4, GPT-3.5, and specialized models (vision, embeddings).
Features
- Full GPT-4 and GPT-3.5-turbo support
- Streaming responses for real-time output
- Function calling for tool use
- JSON mode for structured outputs
- Embeddings for semantic search
- Custom API endpoints (e.g., Azure OpenAI)
- Automatic retry with rate limiting
- Cost tracking
Example
from dataknobs_llm.llm.providers import OpenAIProvider
from dataknobs_llm.llm.base import LLMConfig, LLMMessage
# Basic usage
config = LLMConfig(
provider="openai",
model="gpt-4",
api_key="sk-...",
temperature=0.7
)
async with OpenAIProvider(config) as llm:
# Simple question
response = await llm.complete("Explain async/await")
print(response.content)
# Multi-turn conversation
messages = [
LLMMessage(role="system", content="You are a coding tutor"),
LLMMessage(role="user", content="How do I use asyncio?")
]
response = await llm.complete(messages)
# JSON mode for structured output
json_config = LLMConfig(
provider="openai",
model="gpt-4",
response_format="json",
system_prompt="Return JSON only"
)
llm = OpenAIProvider(json_config)
await llm.initialize()
response = await llm.complete(
"List 3 Python libraries as JSON: {name, description}"
)
import json
data = json.loads(response.content)
# With Azure OpenAI
azure_config = LLMConfig(
provider="openai",
model="gpt-4",
api_base="https://your-resource.openai.azure.com/",
api_key="azure-key"
)
# Function calling
functions = [{
"name": "search",
"description": "Search for information",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"}
}
}
}]
response = await llm.function_call(messages, functions)
if response.function_call:
print(f"Call: {response.function_call['name']}")
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `Union[LLMConfig, Config, Dict[str, Any]]` | LLMConfig, dataknobs Config, or dict with provider settings | required |
| `prompt_builder` | `AsyncPromptBuilder \| None` | Optional AsyncPromptBuilder for prompt rendering | `None` |

Attributes:

| Name | Type | Description |
|---|---|---|
| `adapter` | `OpenAIAdapter` | Format adapter for OpenAI API |
| `_client` | | OpenAI AsyncOpenAI client instance |
See Also

- LLMConfig: Configuration options
- AsyncLLMProvider: Base provider interface
- OpenAIAdapter: Format conversion
Methods:

| Name | Description |
|---|---|
| `initialize` | Initialize OpenAI client. |
| `validate_model` | Validate model availability. |
| `complete` | Generate completion. |
| `stream_complete` | Generate streaming completion. |
| `embed` | Generate embeddings. |
| `function_call` | Execute function calling. |
Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
Functions¶
initialize async ¶
Initialize OpenAI client.
Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
validate_model async ¶
Validate model availability.
Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
complete async ¶
complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> LLMResponse
Generate completion.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `messages` | `Union[str, List[LLMMessage]]` | Input messages or prompt | required |
| `config_overrides` | `Dict[str, Any] \| None` | Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed) | `None` |
| `tools` | `list[Any] \| None` | Optional list of Tool objects for function calling | `None` |
| `**kwargs` | `Any` | Additional provider-specific parameters | `{}` |
Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
stream_complete async ¶
stream_complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]
Generate streaming completion.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `messages` | `Union[str, List[LLMMessage]]` | Input messages or prompt | required |
| `config_overrides` | `Dict[str, Any] \| None` | Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed) | `None` |
| `tools` | `list[Any] \| None` | Optional list of Tool objects for function calling | `None` |
| `**kwargs` | `Any` | Additional provider-specific parameters | `{}` |
Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
embed async ¶
Generate embeddings.
Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
function_call async ¶
function_call(
messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs
) -> LLMResponse
Execute function calling.
Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
AnthropicProvider ¶
AnthropicProvider(
config: Union[LLMConfig, Config, Dict[str, Any]],
prompt_builder: AsyncPromptBuilder | None = None,
)
Bases: AsyncLLMProvider
Anthropic Claude LLM provider with full API support.
Provides async access to Anthropic's Claude models including Claude 3 (Opus, Sonnet, Haiku) and Claude 2. Supports advanced features like native tool use, vision, and extended context windows.
Features
- Claude 3 Opus/Sonnet/Haiku and Claude 2 models
- Native tools API for function calling (Claude 3+)
- Vision capabilities for image understanding (Claude 3+)
- Streaming responses for real-time output
- Long context windows (up to 200k tokens)
- Advanced reasoning and coding capabilities
- System prompts for behavior control
- JSON output mode
Example
from dataknobs_llm.llm.providers import AnthropicProvider
from dataknobs_llm.llm.base import LLMConfig, LLMMessage
# Basic usage
config = LLMConfig(
provider="anthropic",
model="claude-3-sonnet-20240229",
api_key="sk-ant-...",
temperature=0.7,
max_tokens=1024
)
async with AnthropicProvider(config) as llm:
# Simple completion
response = await llm.complete("Explain machine learning")
print(response.content)
# With system prompt
messages = [
LLMMessage(
role="system",
content="You are an expert Python tutor"
),
LLMMessage(
role="user",
content="How do I use decorators?"
)
]
response = await llm.complete(messages)
# Long context processing (Claude 3+)
long_config = LLMConfig(
provider="anthropic",
model="claude-3-opus-20240229",
max_tokens=4096
)
llm = AnthropicProvider(long_config)
await llm.initialize()
# Process large document
with open("large_doc.txt") as f:
long_text = f.read() # Up to 200k tokens!
response = await llm.complete(
f"Summarize this document:\n\n{long_text}"
)
# Tool use / function calling (Claude 3+)
tools = [
{
"name": "web_search",
"description": "Search the web for information",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"num_results": {
"type": "integer",
"description": "Number of results"
}
},
"required": ["query"]
}
}
]
messages = [
LLMMessage(
role="user",
content="Search for latest AI news"
)
]
response = await llm.function_call(messages, tools)
if response.function_call:
import json
tool_input = json.loads(response.function_call["arguments"])
print(f"Tool: {response.function_call['name']}")
print(f"Input: {tool_input}")
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `Union[LLMConfig, Config, Dict[str, Any]]` | LLMConfig, dataknobs Config, or dict with provider settings | required |
| `prompt_builder` | `AsyncPromptBuilder \| None` | Optional AsyncPromptBuilder for prompt rendering | `None` |

Attributes:

| Name | Type | Description |
|---|---|---|
| `_client` | | Anthropic AsyncAnthropic client instance |
See Also

- LLMConfig: Configuration options
- AsyncLLMProvider: Base provider interface
- Anthropic API Docs: https://docs.anthropic.com/
Methods:

| Name | Description |
|---|---|
| `initialize` | Initialize Anthropic client. |
| `validate_model` | Validate model availability. |
| `complete` | Generate completion. |
| `stream_complete` | Generate streaming completion. |
| `embed` | Anthropic doesn't provide embeddings. |
| `function_call` | Execute function calling with native Anthropic tools API (Claude 3+). |
Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
Functions¶
initialize async ¶
Initialize Anthropic client.
Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
validate_model async ¶
Validate model availability.
Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
complete async ¶
complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> LLMResponse
Generate completion.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `messages` | `Union[str, List[LLMMessage]]` | Input messages or prompt | required |
| `config_overrides` | `Dict[str, Any] \| None` | Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed) | `None` |
| `tools` | `list[Any] \| None` | Optional list of Tool objects for function calling | `None` |
| `**kwargs` | `Any` | Additional provider-specific parameters | `{}` |
Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
stream_complete async ¶
stream_complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]
Generate streaming completion.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `messages` | `Union[str, List[LLMMessage]]` | Input messages or prompt | required |
| `config_overrides` | `Dict[str, Any] \| None` | Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed) | `None` |
| `tools` | `list[Any] \| None` | Optional list of Tool objects for function calling | `None` |
| `**kwargs` | `Any` | Additional provider-specific parameters | `{}` |
Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
embed async ¶
Anthropic doesn't provide embeddings.
Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
function_call async ¶
function_call(
messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs: Any
) -> LLMResponse
Execute function calling with native Anthropic tools API (Claude 3+).
Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
OllamaProvider ¶
OllamaProvider(
config: Union[LLMConfig, Config, Dict[str, Any]],
prompt_builder: AsyncPromptBuilder | None = None,
)
Bases: AsyncLLMProvider
Ollama local LLM provider for privacy-first, offline LLM usage.
Provides async access to locally-hosted Ollama models, enabling on-premise LLM deployment without cloud APIs. Perfect for sensitive data, air-gapped environments, and cost optimization.
Features
- All Ollama models (Llama 2/3, Mistral, Phi, CodeLlama, etc.)
- No API key required - fully local
- Chat with message history
- Streaming responses for real-time output
- Embeddings for RAG and semantic search
- Tool/function calling (Ollama 0.1.17+)
- Vision models (LLaVA, bakllava)
- Docker environment auto-detection
- Custom model parameters (temperature, top_p, seed)
- Zero-cost inference
Example
from dataknobs_llm.llm.providers import OllamaProvider
from dataknobs_llm.llm.base import LLMConfig, LLMMessage
# Basic local usage
config = LLMConfig(
provider="ollama",
model="llama2", # or llama3, mistral, phi, etc.
temperature=0.7
)
async with OllamaProvider(config) as llm:
# Simple completion
response = await llm.complete("Explain decorators in Python")
print(response.content)
# Multi-turn conversation
messages = [
LLMMessage(role="system", content="You are a helpful assistant"),
LLMMessage(role="user", content="What is recursion?"),
LLMMessage(role="assistant", content="Recursion is..."),
LLMMessage(role="user", content="Show me an example")
]
response = await llm.complete(messages)
# Code generation with CodeLlama
code_config = LLMConfig(
provider="ollama",
model="codellama",
temperature=0.2, # Lower for more deterministic code
max_tokens=500
)
llm = OllamaProvider(code_config)
await llm.initialize()
response = await llm.complete(
"Write a Python function to merge two sorted lists"
)
print(response.content)
# Remote Ollama server
remote_config = LLMConfig(
provider="ollama",
model="llama2",
api_base="http://192.168.1.100:11434" # Remote server
)
# Docker usage (auto-detects)
# In Docker, automatically uses host.docker.internal
docker_config = LLMConfig(
provider="ollama",
model="mistral"
)
# Vision model with image input
from dataknobs_llm.llm.base import LLMMessage
import base64
with open("image.jpg", "rb") as f:
image_data = base64.b64encode(f.read()).decode()
vision_config = LLMConfig(
provider="ollama",
model="llava" # or bakllava
)
llm = OllamaProvider(vision_config)
await llm.initialize()
messages = [
LLMMessage(
role="user",
content="What objects are in this image?",
metadata={"images": [image_data]}
)
]
response = await llm.complete(messages)
print(response.content)
# Embeddings for RAG
embed_config = LLMConfig(
provider="ollama",
model="nomic-embed-text" # or mxbai-embed-large
)
llm = OllamaProvider(embed_config)
await llm.initialize()
# Single embedding
embedding = await llm.embed("Sample text")
print(f"Dimensions: {len(embedding)}")
# Batch embeddings
texts = [
"Python programming",
"Machine learning basics",
"Web development with Flask"
]
embeddings = await llm.embed(texts)
print(f"Generated {len(embeddings)} embeddings")
# Tool use (Ollama 0.1.17+)
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get current weather",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
}
]
response = await llm.function_call(messages, tools)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
Union[LLMConfig, Config, Dict[str, Any]]
|
LLMConfig, dataknobs Config, or dict with provider settings |
required |
prompt_builder
|
AsyncPromptBuilder | None
|
Optional AsyncPromptBuilder for prompt rendering |
None
|
Attributes:
| Name | Type | Description |
|---|---|---|
base_url |
str
|
Ollama API base URL (auto-detects Docker environment) |
_client |
HTTP client for Ollama API |
See Also
- LLMConfig: Configuration options
- AsyncLLMProvider: Base provider interface
- Ollama Documentation: https://ollama.ai
Methods:
| Name | Description |
|---|---|
initialize |
Initialize Ollama client. |
validate_model |
Validate model availability. |
complete |
Generate completion using Ollama chat endpoint. |
stream_complete |
Generate streaming completion using Ollama chat endpoint. |
embed |
Generate embeddings. |
function_call |
Execute function calling with native Ollama tools support. |
Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
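The Docker auto-detection described above can be sketched in a few lines. This is an illustrative guess at the mechanism, not the provider's actual code: the `/.dockerenv` check and both URLs are assumptions, though `host.docker.internal` matches the behaviour documented in the Example section.

```python
import os
from typing import Optional

DEFAULT_OLLAMA_URL = "http://localhost:11434"
DOCKER_OLLAMA_URL = "http://host.docker.internal:11434"

def resolve_ollama_base_url(api_base: Optional[str] = None) -> str:
    """Pick an Ollama base URL, preferring an explicit api_base.

    Falls back to host.docker.internal when running inside a
    container, else localhost.
    """
    if api_base:
        return api_base
    # /.dockerenv is created by the Docker runtime inside containers.
    if os.path.exists("/.dockerenv"):
        return DOCKER_OLLAMA_URL
    return DEFAULT_OLLAMA_URL

# An explicit api_base (e.g. a remote server) always wins over auto-detection.
print(resolve_ollama_base_url("http://192.168.1.100:11434"))
```

Setting `api_base` in `LLMConfig` therefore overrides any environment detection, which is why the "Remote Ollama server" example above works unchanged inside Docker.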
Functions¶
initialize
async
¶
Initialize Ollama client.
Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
validate_model
async
¶
Validate model availability.
Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
complete
async
¶
complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> LLMResponse
Generate completion using Ollama chat endpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
Union[str, List[LLMMessage]]
|
Input messages or prompt |
required |
config_overrides
|
Dict[str, Any] | None
|
Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed) |
None
|
tools
|
list[Any] | None
|
Optional list of Tool objects for function calling |
None
|
**kwargs
|
Any
|
Additional provider-specific parameters |
{}
|
Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
stream_complete
async
¶
stream_complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]
Generate streaming completion using Ollama chat endpoint.
Uses the /api/chat endpoint with stream: true so that the
model's native chat template is applied and tool calls are supported,
matching the behaviour of complete().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
Union[str, List[LLMMessage]]
|
Input messages or prompt |
required |
config_overrides
|
Dict[str, Any] | None
|
Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed) |
None
|
tools
|
list[Any] | None
|
Optional list of Tool objects for function calling. |
None
|
**kwargs
|
Any
|
Additional provider-specific parameters. |
{}
|
Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
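The usual way to consume a streaming completion is to accumulate chunks as they arrive. The sketch below uses a stand-in async generator rather than a live provider, so the chunking is simulated, but the `async for` accumulation pattern is the same one you would use with `stream_complete()`:

```python
import asyncio
from typing import AsyncIterator, List

async def fake_stream(text: str) -> AsyncIterator[str]:
    """Stand-in for stream_complete(): yields the reply in small chunks."""
    for i in range(0, len(text), 4):
        yield text[i : i + 4]

async def collect(stream: AsyncIterator[str]) -> str:
    """Accumulate streamed chunks into the full response text."""
    parts: List[str] = []
    async for chunk in stream:
        parts.append(chunk)  # e.g. render each chunk to the UI here
    return "".join(parts)

result = asyncio.run(collect(fake_stream("streaming works")))
print(result)
```

With the real provider, each chunk is an `LLMStreamResponse` rather than a plain string, but the consumption loop is identical.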
embed
async
¶
Generate embeddings.
Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
function_call
async
¶
function_call(
messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs
) -> LLMResponse
Execute function calling with native Ollama tools support.
For Ollama 0.1.17+, uses native tools API. Falls back to prompt-based approach for older versions.
Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
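The version-gated fallback described above (native tools on Ollama 0.1.17+, prompt-based otherwise) amounts to a simple version comparison. A minimal sketch of that dispatch logic, with the path names chosen here purely for illustration:

```python
def supports_native_tools(version: str) -> bool:
    """True when an Ollama version string is at least 0.1.17."""
    parts = tuple(int(p) for p in version.split("."))
    return parts >= (0, 1, 17)

def pick_function_call_path(version: str) -> str:
    # Native tools API on 0.1.17+, prompt-based fallback otherwise.
    return "native-tools" if supports_native_tools(version) else "prompt-fallback"

print(pick_function_call_path("0.1.17"))
print(pick_function_call_path("0.1.16"))
```

Tuple comparison handles multi-digit components correctly (`(0, 1, 9) < (0, 1, 17)`), which a plain string comparison would not.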
HuggingFaceProvider ¶
HuggingFaceProvider(
config: Union[LLMConfig, Config, Dict[str, Any]],
prompt_builder: AsyncPromptBuilder | None = None,
)
Bases: AsyncLLMProvider
HuggingFace Inference API provider.
Methods:
| Name | Description |
|---|---|
initialize |
Initialize HuggingFace client. |
validate_model |
Validate model availability. |
complete |
Generate completion. |
stream_complete |
HuggingFace Inference API doesn't support streaming. |
embed |
Generate embeddings. |
function_call |
HuggingFace doesn't have native function calling. |
Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
Functions¶
initialize
async
¶
Initialize HuggingFace client.
Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
validate_model
async
¶
Validate model availability.
Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
complete
async
¶
complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> LLMResponse
Generate completion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
Union[str, List[LLMMessage]]
|
Input messages or prompt |
required |
config_overrides
|
Dict[str, Any] | None
|
Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed) |
None
|
tools
|
list[Any] | None
|
Optional list of Tool objects (not supported — raises ToolsNotSupportedError if provided) |
None
|
**kwargs
|
Any
|
Additional provider-specific parameters |
{}
|
Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
stream_complete
async
¶
stream_complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]
HuggingFace Inference API doesn't support streaming.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
Union[str, List[LLMMessage]]
|
Input messages or prompt |
required |
config_overrides
|
Dict[str, Any] | None
|
Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed) |
None
|
tools
|
list[Any] | None
|
Optional list of Tool objects (not supported — raises ToolsNotSupportedError if provided) |
None
|
**kwargs
|
Any
|
Additional provider-specific parameters |
{}
|
Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
embed
async
¶
Generate embeddings.
Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
function_call
async
¶
function_call(
messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs
) -> LLMResponse
HuggingFace doesn't have native function calling.
Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
EchoProvider ¶
EchoProvider(
config: Union[LLMConfig, Config, Dict[str, Any]],
prompt_builder: AsyncPromptBuilder | None = None,
responses: List[Union[str, LLMResponse]] | None = None,
response_fn: ResponseFunction | None = None,
strict_tools: bool = True,
strict: bool = False,
)
Bases: AsyncLLMProvider
Echo provider for testing and debugging.
This provider echoes back input messages and generates deterministic mock embeddings. Perfect for testing without real LLM API calls.
Features:
- Echoes back user messages with configurable prefix
- Generates deterministic embeddings based on content hash
- Supports streaming (character-by-character echo)
- Mocks function calling with deterministic responses
- Zero external dependencies
- Instant responses
- Strict mode: raise when response queue is exhausted instead of
falling back to echo behavior (catches under-scripted tests)
- Instance tracking: class-level tracking of created instances
for inspecting providers created internally by from_config()
Scripted Response Features (for testing):
- Response queue: Provide ordered list of responses
- Response function: Dynamic response based on input
- Pattern matching: Map input patterns to responses
- Call tracking: Record all calls for assertions
Example
# Queue mode - responses consumed in order
provider = EchoProvider(config)
provider.set_responses(["First response", "Second response"])
# Function mode - dynamic responses
provider.set_response_function(
lambda msgs: f"Got {len(msgs)} messages"
)
# Pattern mode - match input to responses
provider.add_pattern_response(r"hello", "Hi there!")
provider.add_pattern_response(r"bye", "Goodbye!")
# Check what was called
assert provider.call_count == 2
assert "hello" in provider.get_call(0).messages[0].content
# Strict mode - catch under-scripted tests
provider = EchoProvider(config, strict=True)
provider.set_responses(["only one"])
await provider.complete("first") # Returns "only one"
await provider.complete("second") # Raises ResponseQueueExhaustedError
# Instance tracking - inspect providers created by from_config()
with EchoProvider.track_instances() as instances:
bot = await DynaBot.from_config(config)
assert instances[0].close_count == 0 # or == 1 on error path
Initialize EchoProvider.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
Union[LLMConfig, Config, Dict[str, Any]]
|
LLM configuration. |
required |
prompt_builder
|
AsyncPromptBuilder | None
|
Optional prompt builder |
None
|
responses
|
List[Union[str, LLMResponse]] | None
|
Optional list of responses to return in order |
None
|
response_fn
|
ResponseFunction | None
|
Optional function to generate responses dynamically |
None
|
strict_tools
|
bool
|
If True (default), raise ValueError when a scripted response contains tool_calls but no tools were provided to complete(). Real providers never return tool_calls unless tool definitions are sent in the request, so this catches callers that forget to pass tools. Set to False only for tests that intentionally exercise edge cases (e.g. "what happens if tool_calls arrive unexpectedly"). |
True
|
strict
|
bool
|
If True, raise ResponseQueueExhaustedError when the response queue is exhausted instead of falling back to echo behavior. |
False
|
Methods:
| Name | Description |
|---|---|
set_responses |
Set queue of responses to return in order. |
add_response |
Add a single response to the queue. |
set_response_function |
Set function to generate dynamic responses. |
add_pattern_response |
Add pattern-matched response. |
clear_responses |
Clear all scripted responses and reset to echo mode. |
clear_history |
Clear call history. |
reset |
Reset all state (responses, history, init count, and close count). |
get_last_instance |
Return the most recently created EchoProvider instance. |
track_instances |
Context manager that collects all EchoProvider instances created within. |
reset_tracking |
Clear all instance tracking state. |
get_call |
Get a specific call by index. |
get_last_call |
Get the most recent call, or None if no calls made. |
get_last_user_message |
Get the last user message from the most recent call. |
initialize |
Initialize echo provider. Tracks call count via init_count. |
close |
Close echo provider, tracking the call. |
validate_model |
Validate model (always true for echo). |
complete |
Echo back the input messages or return scripted response. |
stream_complete |
Stream echo response character by character. |
embed |
Generate deterministic mock embeddings. |
function_call |
Mock function calling with deterministic response. |
Attributes:
| Name | Type | Description |
|---|---|---|
strict |
bool
|
Whether strict mode is enabled (raises on exhausted queue). |
init_count |
int
|
Get number of initialize() calls made. |
close_count |
int
|
Get number of close() calls made. |
call_count |
int
|
Get number of complete() calls made. |
calls |
List[Dict[str, Any]]
|
Get all recorded calls. |
embed_call_count |
int
|
Get number of embed() calls made. |
embed_calls |
List[Dict[str, Any]]
|
Get all recorded embed calls. |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
Attributes¶
Functions¶
set_responses ¶
set_responses(
responses: List[Union[str, LLMResponse, ErrorResponse]], cycle: bool = False
) -> EchoProvider
Set queue of responses to return in order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
responses
|
List[Union[str, LLMResponse, ErrorResponse]]
|
List of response strings or LLMResponse objects |
required |
cycle
|
bool
|
If True, cycle through responses instead of exhausting |
False
|
Returns:
| Type | Description |
|---|---|
EchoProvider
|
Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
add_response ¶
Add a single response to the queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
response
|
Union[str, LLMResponse, ErrorResponse]
|
Response string or LLMResponse object |
required |
Returns:
| Type | Description |
|---|---|
EchoProvider
|
Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
set_response_function ¶
Set function to generate dynamic responses.
The function receives the message list and returns either a string (converted to LLMResponse) or an LLMResponse object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fn
|
ResponseFunction
|
Function(messages) -> str | LLMResponse |
required |
Returns:
| Type | Description |
|---|---|
EchoProvider
|
Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
add_pattern_response ¶
add_pattern_response(
pattern: str,
response: Union[str, LLMResponse, ErrorResponse],
flags: int = re.IGNORECASE,
) -> EchoProvider
Add pattern-matched response.
When user message matches pattern, return the specified response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pattern
|
str
|
Regex pattern to match against user messages |
required |
response
|
Union[str, LLMResponse, ErrorResponse]
|
Response to return when pattern matches |
required |
flags
|
int
|
Regex flags (default: case-insensitive) |
IGNORECASE
|
Returns:
| Type | Description |
|---|---|
EchoProvider
|
Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
clear_responses ¶
Clear all scripted responses and reset to echo mode.
Returns:
| Type | Description |
|---|---|
EchoProvider
|
Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
clear_history ¶
reset ¶
Reset all state (responses, history, init count, and close count).
Returns:
| Type | Description |
|---|---|
EchoProvider
|
Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
get_last_instance
classmethod
¶
Return the most recently created EchoProvider instance.
Useful for inspecting providers created internally by code like
DynaBot.from_config() where the test has no direct reference.
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
track_instances
classmethod
¶
Context manager that collects all EchoProvider instances created within.
Example:
with EchoProvider.track_instances() as instances:
bot = await DynaBot.from_config(config)
assert len(instances) == 1
assert instances[0].close_count == 1 # cleaned up on error path
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
reset_tracking
classmethod
¶
Clear all instance tracking state.
Resets _last_instance and removes any active collectors
(e.g. from a track_instances() context that was not
properly exited). Call in test teardown to prevent cross-test
leakage.
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
get_call ¶
Get a specific call by index.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
index
|
int
|
Call index (supports negative indexing) |
required |
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
Call record with 'messages', 'response', 'kwargs' |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
get_last_call ¶
Get the most recent call, or None if no calls made.
get_last_user_message ¶
Get the last user message from the most recent call.
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
initialize
async
¶
Initialize echo provider. Tracks call count via init_count.
Note: complete(), stream_complete(), embed(), and
function_call() auto-call this method if the provider is not
yet initialized, which also increments init_count.
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
close
async
¶
Close echo provider, tracking the call.
Increments close_count on every call (even redundant ones)
so tests can verify exactly how many times close was invoked.
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
validate_model
async
¶
complete
async
¶
complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> LLMResponse
Echo back the input messages or return scripted response.
Response priority:
1. Response function (if set via set_response_function)
2. Pattern match (if added via add_pattern_response)
3. Response queue (if set via set_responses or add_response)
4. Default echo behavior
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
Union[str, List[LLMMessage]]
|
Input messages or prompt |
required |
config_overrides
|
Dict[str, Any] | None
|
Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed) |
None
|
tools
|
list[Any] | None
|
Optional list of Tool objects (recorded in call history) |
None
|
**kwargs
|
Any
|
Additional parameters (ignored) |
{}
|
Returns:
| Type | Description |
|---|---|
LLMResponse
|
LLMResponse (scripted or echo) |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
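The response-priority chain above can be sketched as a small resolver. This is an illustrative re-implementation, not EchoProvider's code: the `"Echo: "` prefix is a stand-in (the docs say the prefix is configurable), and the signatures are simplified to plain strings.

```python
import re
from typing import Callable, List, Optional, Tuple

def resolve_response(
    user_message: str,
    response_fn: Optional[Callable[[str], str]] = None,
    patterns: Optional[List[Tuple[str, str]]] = None,
    queue: Optional[List[str]] = None,
) -> str:
    """Apply the documented priority: function, pattern, queue, echo."""
    if response_fn is not None:                       # 1. response function
        return response_fn(user_message)
    for pattern, reply in patterns or []:             # 2. pattern match
        if re.search(pattern, user_message, re.IGNORECASE):
            return reply
    if queue:                                         # 3. response queue (FIFO)
        return queue.pop(0)
    return f"Echo: {user_message}"                    # 4. default echo

queue = ["first", "second"]
# Pattern outranks the queue, so the queue is left untouched here:
print(resolve_response("hello", patterns=[(r"hello", "Hi there!")], queue=queue))
# No pattern matches, so the queue is consumed:
print(resolve_response("anything", queue=queue))
```

Note that a higher-priority source short-circuits the lower ones, so a matching pattern never consumes a queued response.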
stream_complete
async
¶
stream_complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]
Stream echo response character by character.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
Union[str, List[LLMMessage]]
|
Input messages or prompt |
required |
config_overrides
|
Dict[str, Any] | None
|
Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed) |
None
|
tools
|
list[Any] | None
|
Optional list of Tool objects (forwarded to complete()) |
None
|
**kwargs
|
Any
|
Additional parameters (ignored) |
{}
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[LLMStreamResponse]
|
Streaming response chunks with tool_calls on final chunk |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
embed
async
¶
Generate deterministic mock embeddings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
texts
|
Union[str, List[str]]
|
Input text(s) |
required |
**kwargs
|
Any
|
Additional parameters (ignored) |
{}
|
Returns:
| Type | Description |
|---|---|
Union[List[float], List[List[float]]]
|
Embedding vector(s) |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
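The "deterministic embeddings based on content hash" idea can be illustrated with a few lines of standard-library Python. This is a sketch of the technique, not EchoProvider's actual implementation (the dimension count and byte-to-float mapping here are arbitrary choices):

```python
import hashlib
from typing import List

def mock_embedding(text: str, dims: int = 8) -> List[float]:
    """Derive a stable pseudo-embedding from a SHA-256 hash of the text.

    The same input always yields the same vector, so tests can assert
    on embedding-dependent behaviour without calling a real model.
    """
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    # Map successive digest bytes into floats in [0, 1).
    return [digest[i % len(digest)] / 256.0 for i in range(dims)]

assert mock_embedding("Sample text") == mock_embedding("Sample text")
assert mock_embedding("a") != mock_embedding("b")
```

Determinism is the point: identical texts map to identical vectors, distinct texts almost certainly do not, and no network call is involved.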
function_call
async
¶
function_call(
messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs: Any
) -> LLMResponse
Mock function calling with deterministic response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
List[LLMMessage]
|
Conversation messages |
required |
functions
|
List[Dict[str, Any]]
|
Available functions |
required |
**kwargs
|
Any
|
Additional parameters (ignored) |
{}
|
Returns:
| Type | Description |
|---|---|
LLMResponse
|
Response with mock function call |
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
CachingEmbedProvider ¶
Bases: AsyncLLMProvider
Provider wrapper that caches embed() results persistently.
Embeddings are deterministic: same (model, text) produces the same
vector. This wrapper caches them once and reuses them across scenarios
and runs. complete(), stream_complete(), and function_call()
pass through to the inner provider unchanged.
Lifecycle ownership: When wrapping a pre-initialized provider (e.g.
one created and initialized by DynaBot.from_config()), the caching
provider does NOT take ownership of the inner provider's lifecycle.
initialize() skips re-initializing the inner, and close() skips
closing it — the original owner is responsible for the inner's lifecycle.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
inner
|
AsyncLLMProvider
|
The real provider to delegate to. |
required |
cache
|
EmbeddingCache
|
The cache backend for storing embeddings. |
required |
Examples:
# Basic usage — cache hit/miss:
inner = EchoProvider({"provider": "echo", "model": "test"})
cache = MemoryEmbeddingCache()
provider = CachingEmbedProvider(inner, cache)
await provider.initialize()
vec1 = await provider.embed("hello") # cache miss -> inner
vec2 = await provider.embed("hello") # cache hit -> no inner call
# Fresh provider — CachingEmbedProvider owns the inner lifecycle:
provider = CachingEmbedProvider(inner, cache)
await provider.initialize() # initializes both inner and cache
await provider.close() # closes both inner and cache
# Pre-initialized provider — caller owns the inner lifecycle:
inner = OllamaProvider(config)
await inner.initialize() # caller initializes
provider = CachingEmbedProvider(inner, MemoryEmbeddingCache())
await provider.initialize() # skips inner, initializes cache only
await provider.close() # skips inner, closes cache only
await inner.close() # caller closes inner
Methods:
| Name | Description |
|---|---|
validate_model |
Delegate model validation to inner provider. |
get_capabilities |
Delegate capabilities to inner provider. |
initialize |
Initialize the inner provider (if not already) and the cache. |
close |
Close the cache, and the inner provider only if we own it. |
complete |
Delegate to inner provider. |
stream_complete |
Delegate to inner provider (async generator passthrough). |
function_call |
Delegate to inner provider. |
embed |
Return cached embeddings, delegating to inner on cache miss. |
Attributes:
| Name | Type | Description |
|---|---|---|
config |
LLMConfig
|
Forward config from the inner provider. |
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
Attributes¶
config
property
writable
¶
Forward config from the inner provider.
The getter always returns the inner provider's config. The setter
is a no-op that absorbs the self.config = ... assignment from
LLMProvider.__init__().
Functions¶
validate_model
async
¶
get_capabilities ¶
initialize
async
¶
Initialize the inner provider (if not already) and the cache.
Skips inner initialization when the inner provider is already
initialized — e.g. when wrapping a provider that was created and
initialized by DynaBot.from_config(). Re-initializing would
open duplicate connections (aiohttp sessions for Ollama, etc.).
When the inner IS initialized here, _owns_inner is set to
True so that close() knows to close it. When the inner
was already initialized (pre-owned), close() leaves it alone.
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
close
async
¶
Close the cache, and the inner provider only if we own it.
When the inner provider was already initialized before wrapping (pre-owned), the original owner is responsible for closing it. Only providers that were initialized by this wrapper are closed.
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
complete
async
¶
complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> LLMResponse
Delegate to inner provider.
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
stream_complete
async
¶
stream_complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> Any
Delegate to inner provider (async generator passthrough).
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
function_call
async
¶
function_call(
messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs: Any
) -> LLMResponse
Delegate to inner provider.
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
embed
async
¶
Return cached embeddings, delegating to inner on cache miss.
Handles both single-text and batch forms. For batches, only the
uncached texts are sent to the inner provider. The inner provider
always receives a list[str] and always returns
list[list[float]].
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
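The batch behaviour described above (serve hits from cache, send only misses to the inner provider, then backfill) can be sketched with a plain dict keyed by (model, text). A simplified illustration of the technique, not the wrapper's actual code:

```python
from typing import Callable, Dict, List, Tuple

Cache = Dict[Tuple[str, str], List[float]]

def embed_with_cache(
    model: str,
    texts: List[str],
    cache: Cache,
    inner_embed: Callable[[List[str]], List[List[float]]],
) -> List[List[float]]:
    """Serve cached vectors; send only misses to inner_embed, then backfill."""
    hits = [cache.get((model, t)) for t in texts]
    misses = [t for t, h in zip(texts, hits) if h is None]
    if misses:
        # Inner provider always receives list[str], returns list[list[float]].
        for t, vec in zip(misses, inner_embed(misses)):
            cache[(model, t)] = vec
    return [cache[(model, t)] for t in texts]

calls: List[List[str]] = []
def inner(batch: List[str]) -> List[List[float]]:
    calls.append(list(batch))
    return [[float(len(t))] for t in batch]

cache: Cache = {}
embed_with_cache("m", ["a", "bb"], cache, inner)
embed_with_cache("m", ["a", "bb", "ccc"], cache, inner)
print(calls)  # second call only sends the one uncached text
```

Because embeddings are deterministic per (model, text), the backfilled vectors remain valid across scenarios and runs, which is what makes persistent caching safe.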
EmbeddingCache ¶
Bases: ABC
Cache for embedding vectors, keyed by (model, text).
Methods:
| Name | Description |
|---|---|
get |
Retrieve a cached vector, or None on a miss. |
put |
Store a vector in the cache. |
get_batch |
Retrieve cached vectors for a batch of texts. |
put_batch |
Store a batch of vectors in the cache. |
initialize |
Prepare the cache backend (create tables, open files, etc.). |
close |
Release cache resources. |
clear |
Remove all cached entries. |
count |
Return the number of cached entries. |
Functions¶
get
abstractmethod
async
¶
put
abstractmethod
async
¶
get_batch
abstractmethod
async
¶
Retrieve cached vectors for a batch of texts.
Returns a list parallel to texts: each element is either the
cached vector or None on a miss.
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
put_batch
abstractmethod
async
¶
initialize
abstractmethod
async
¶
Prepare the cache backend (create tables, open files, etc.).
Implementations MUST be idempotent — calling initialize()
on an already-initialized cache is a no-op. This is required
because CachingEmbedProvider.initialize() calls
cache.initialize() unconditionally.
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
close
abstractmethod
async
¶
Release cache resources.
Implementations MUST be idempotent — calling close() on an
already-closed or never-initialized cache is a no-op.
clear
abstractmethod
async
¶
MemoryEmbeddingCache ¶
Bases: EmbeddingCache
In-memory cache backend for testing.
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
LLMProviderFactory ¶
Factory for creating LLM providers from configuration.
This factory class integrates with the dataknobs Config system, allowing providers to be instantiated via Config.get_factory().
Example
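A toy registry illustrating the create()/register_provider() pattern this factory follows. The class and provider names here are invented for the sketch; only the shape (name-to-class registry, ValueError on unknown provider) mirrors the documented behaviour:

```python
from typing import Any, Dict, Type

class MiniProviderFactory:
    """Toy registry mirroring the factory's create()/register_provider() pattern."""

    _providers: Dict[str, Type] = {}

    @classmethod
    def register_provider(cls, name: str, provider_class: Type) -> None:
        """Extend the factory with a custom provider implementation."""
        cls._providers[name] = provider_class

    @classmethod
    def create(cls, config: Dict[str, Any]):
        """Instantiate the provider named by config['provider']."""
        name = config["provider"]
        if name not in cls._providers:
            raise ValueError(f"Unknown provider: {name}")
        return cls._providers[name](config)

class CustomProvider:
    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config

MiniProviderFactory.register_provider("custom", CustomProvider)
provider = MiniProviderFactory.create({"provider": "custom", "model": "x"})
print(type(provider).__name__)
```

In the real factory, registered classes must inherit from AsyncLLMProvider and config may also be an LLMConfig or dataknobs Config object.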
Initialize the factory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
is_async
|
bool
|
Whether to create async providers (default: True) |
True
|
Methods:
| Name | Description |
|---|---|
create |
Create an LLM provider from configuration. |
register_provider |
Register a custom provider class. |
__call__ |
Allow factory to be called directly. |
Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
Functions¶
create ¶
create(
config: LLMConfig | Config | dict[str, Any], **kwargs: Any
) -> AsyncLLMProvider | SyncLLMProvider
Create an LLM provider from configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
LLMConfig | Config | dict[str, Any]
|
Configuration (LLMConfig, Config object, or dict) |
required |
**kwargs
|
Any
|
Additional arguments passed to provider constructor |
{}
|
Returns:
| Type | Description |
|---|---|
AsyncLLMProvider | SyncLLMProvider
|
LLM provider instance |
Raises:
| Type | Description |
|---|---|
ValueError
|
If provider type is unknown |
Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
register_provider
classmethod
¶
Register a custom provider class.
Allows extending the factory with custom provider implementations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Provider name (e.g., 'custom') |
required |
provider_class
|
type[AsyncLLMProvider]
|
Provider class (must inherit from AsyncLLMProvider) |
required |
Example
Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
__call__ ¶
__call__(
config: LLMConfig | Config | dict[str, Any], **kwargs: Any
) -> AsyncLLMProvider | SyncLLMProvider
Allow factory to be called directly.
Makes the factory callable for convenience.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
LLMConfig | Config | dict[str, Any]
|
Configuration |
required |
**kwargs
|
Any
|
Additional arguments |
{}
|
Returns:
| Type | Description |
|---|---|
AsyncLLMProvider | SyncLLMProvider
|
LLM provider instance |
Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
TemplateStrategy ¶
Bases: Enum
Template rendering strategies.
MessageTemplate
dataclass
¶
MessageTemplate(
template: str,
variables: List[str] = list(),
strategy: TemplateStrategy = TemplateStrategy.SIMPLE,
)
Template for generating message content with multiple rendering strategies.
Supports two template strategies:

1. SIMPLE (default): Uses Python str.format() with {variable} syntax.
   - All variables must be provided
   - Clean and straightforward
   - Example: "Hello {name}!"
2. CONDITIONAL: Advanced conditional rendering with {{variable}} and ((conditional)) syntax.
   - Variables can be optional
   - Conditional sections with (( ... ))
   - Whitespace-aware substitution
   - Example: "Hello {{name}}((, you have {{count}} messages))"
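The CONDITIONAL strategy's behavior can be sketched with a small standalone renderer. This is an approximation of the described semantics (a `(( ... ))` section is dropped when a variable inside it is missing), not the library's actual implementation:

```python
import re

def render_conditional(template: str, **values) -> str:
    """Sketch of {{variable}} / ((conditional)) rendering semantics.

    A ((...)) section is kept only when every {{var}} inside it has a
    value; a {{var}} outside conditionals renders as '' when missing.
    """
    def render_section(text: str) -> tuple[str, bool]:
        ok = True
        def sub(m: re.Match) -> str:
            nonlocal ok
            name = m.group(1)
            if name not in values:
                ok = False
                return ""
            return str(values[name])
        return re.sub(r"\{\{(\w+)\}\}", sub, text), ok

    def cond(m: re.Match) -> str:
        rendered, ok = render_section(m.group(1))
        return rendered if ok else ""

    # Resolve conditional sections first, then remaining variables.
    text = re.sub(r"\(\((.*?)\)\)", cond, template)
    return render_section(text)[0]
```

With the template from the example above, supplying only `name` drops the message-count clause, while supplying both variables keeps it.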
Methods:

| Name | Description |
|---|---|
| `__post_init__` | Extract variables from template based on strategy. |
| format | Format template with variables using the selected strategy. |
| partial | Create partial template with some variables filled. |
| from_conditional | Create a MessageTemplate using the CONDITIONAL strategy. |
Functions¶
__post_init__ ¶
Extract variables from template based on strategy.
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
format ¶
Format template with variables using the selected strategy.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| **kwargs | Any | Variable values | {} |

Returns:

| Type | Description |
|---|---|
| str | Formatted prompt |

Raises:

| Type | Description |
|---|---|
| ValueError | If using SIMPLE strategy and required variables are missing |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
partial ¶
Create partial template with some variables filled.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| **kwargs | Any | Variable values to fill | {} |

Returns:

| Type | Description |
|---|---|
| MessageTemplate | New template with partial values |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
from_conditional
classmethod
¶
Create a MessageTemplate using the CONDITIONAL strategy.
Convenience method for creating templates with advanced conditional rendering.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| template | str | Template string with {{variable}} and ((conditional)) syntax | required |
| variables | List[str] \| None | Optional explicit list of variables | None |

Returns:

| Type | Description |
|---|---|
| MessageTemplate | MessageTemplate configured with CONDITIONAL strategy |
Example
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
MessageBuilder ¶
Builder for constructing message sequences.
Methods:

| Name | Description |
|---|---|
| system | Add system message. |
| user | Add user message. |
| assistant | Add assistant message. |
| function | Add function message. |
| from_template | Add message from template. |
| build | Build message list. |
| clear | Clear all messages. |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
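The fluent, self-returning style these methods describe can be sketched with a minimal standalone builder. This is an illustration of the chaining pattern under stated assumptions (a simple `Message` dataclass), not the library's actual `MessageBuilder`:

```python
from dataclasses import dataclass

@dataclass
class Message:
    role: str
    content: str

class MessageBuilder:
    """Fluent builder sketch: each method appends a message and returns self."""

    def __init__(self) -> None:
        self._messages: list[Message] = []

    def system(self, content: str) -> "MessageBuilder":
        self._messages.append(Message("system", content))
        return self

    def user(self, content: str) -> "MessageBuilder":
        self._messages.append(Message("user", content))
        return self

    def assistant(self, content: str) -> "MessageBuilder":
        self._messages.append(Message("assistant", content))
        return self

    def build(self) -> list[Message]:
        # Return a copy so later mutation of the builder is isolated.
        return list(self._messages)

    def clear(self) -> "MessageBuilder":
        self._messages.clear()
        return self


messages = MessageBuilder().system("You are helpful").user("Hi").build()
```

Returning `self` from every mutator is what allows the one-expression chain in the last line.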
Functions¶
system ¶
Add system message.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| content | str | Message content | required |

Returns:

| Type | Description |
|---|---|
| MessageBuilder | Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
user ¶
Add user message.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| content | str | Message content | required |

Returns:

| Type | Description |
|---|---|
| MessageBuilder | Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
assistant ¶
Add assistant message.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| content | str | Message content | required |

Returns:

| Type | Description |
|---|---|
| MessageBuilder | Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
function ¶
Add function message.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Function name | required |
| content | str | Function result | required |
| function_call | Dict[str, Any] \| None | Function call details | None |

Returns:

| Type | Description |
|---|---|
| MessageBuilder | Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
from_template ¶
Add message from template.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| role | str | Message role | required |
| template | MessageTemplate | Message template | required |
| **kwargs | Any | Template variables | {} |

Returns:

| Type | Description |
|---|---|
| MessageBuilder | Self for chaining |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
build ¶
clear ¶
ResponseParser ¶
Parser for LLM responses.
Methods:

| Name | Description |
|---|---|
| extract_json | Extract JSON from response. |
| extract_code | Extract code blocks from response. |
| extract_list | Extract list items from response. |
| extract_sections | Extract sections from response. |
Functions¶
extract_json
staticmethod
¶
Extract JSON from response.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| response | Union[str, LLMResponse] | LLM response | required |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] \| None | Extracted JSON or None |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
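A common approach to this kind of extraction is to prefer a fenced ```` ```json ```` block and fall back to the first brace-delimited span; the sketch below illustrates that strategy under those assumptions and is not the library's exact implementation:

```python
import json
import re

def extract_json(text: str):
    """Return the first JSON object found in text, or None."""
    # Prefer a fenced ```json block, then fall back to the first {...} span.
    m = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
    candidate = m.group(1) if m else None
    if candidate is None:
        m = re.search(r"\{.*\}", text, re.DOTALL)
        candidate = m.group(0) if m else None
    if candidate is None:
        return None
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None  # found braces but not valid JSON
```

Returning `None` rather than raising mirrors the documented `Dict[str, Any] | None` return type, so callers can branch on the result.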
extract_code
staticmethod
¶
Extract code blocks from response.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| response | Union[str, LLMResponse] | LLM response | required |
| language | str \| None | Optional language filter | None |

Returns:

| Type | Description |
|---|---|
| List[str] | List of code blocks |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
extract_list
staticmethod
¶
Extract list items from response.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| response | Union[str, LLMResponse] | LLM response | required |
| numbered | bool | Whether to look for numbered lists | False |

Returns:

| Type | Description |
|---|---|
| List[str] | List of items |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
extract_sections
staticmethod
¶
Extract sections from response.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| response | Union[str, LLMResponse] | LLM response | required |

Returns:

| Type | Description |
|---|---|
| Dict[str, str] | Dictionary of section name to content |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
TokenCounter ¶
Estimate token counts for different models.
Methods:

| Name | Description |
|---|---|
| estimate_tokens | Estimate token count for text. |
| estimate_messages_tokens | Estimate token count for messages. |
| fits_in_context | Check if text fits in context window. |
Functions¶
estimate_tokens
classmethod
¶
Estimate token count for text.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| text | str | Input text | required |
| model | str | Model name | 'default' |

Returns:

| Type | Description |
|---|---|
| int | Estimated token count |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
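Since the class name says "estimate", a rough heuristic is enough to illustrate the idea. The ~4 characters-per-token ratio below is a common rule of thumb for English text, not the library's actual per-model logic:

```python
def estimate_tokens(text: str, chars_per_token: float = 4.0) -> int:
    """Rough token estimate: ~4 characters per token for English text.

    Real counters vary by model/tokenizer; this is a heuristic sketch.
    """
    return max(1, round(len(text) / chars_per_token))

def fits_in_context(text: str, max_tokens: int) -> bool:
    """Check whether the estimated token count fits a context budget."""
    return estimate_tokens(text) <= max_tokens
```

A model-aware implementation would look up a per-model ratio (or a real tokenizer) instead of the fixed `chars_per_token` default.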
estimate_messages_tokens
classmethod
¶
Estimate token count for messages.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| messages | List[LLMMessage] | List of messages | required |
| model | str | Model name | 'default' |

Returns:

| Type | Description |
|---|---|
| int | Estimated token count |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
fits_in_context
classmethod
¶
Check if text fits in context window.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| text | str | Input text | required |
| model | str | Model name | required |
| max_tokens | int | Maximum token limit | required |

Returns:

| Type | Description |
|---|---|
| bool | True if fits |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
CostCalculator ¶
Calculate costs for LLM usage.
Methods:

| Name | Description |
|---|---|
| calculate_cost | Calculate cost for LLM response. |
| estimate_cost | Estimate cost for text completion. |
Functions¶
calculate_cost
classmethod
¶
Calculate cost for LLM response.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| response | LLMResponse | LLM response with usage info | required |
| model | str \| None | Model name (if not in response) | None |

Returns:

| Type | Description |
|---|---|
| float \| None | Cost in USD or None if cannot calculate |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
estimate_cost
classmethod
¶
Estimate cost for text completion.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| text | str | Input text | required |
| model | str | Model name | required |
| expected_output_tokens | int | Expected output length | 100 |

Returns:

| Type | Description |
|---|---|
| float \| None | Estimated cost in USD |
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
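The "None if cannot calculate" contract can be sketched with a small price-table lookup. The model name and per-1K-token prices below are entirely hypothetical placeholders; real pricing lives in the library (and changes over time):

```python
# Hypothetical per-1K-token prices in USD (illustrative only; real
# pricing tables live inside dataknobs_llm and change over time).
PRICES = {"example-model": {"input": 0.0005, "output": 0.0015}}

def estimate_cost(input_tokens: int, output_tokens: int, model: str):
    """Estimate USD cost, or return None for an unknown model."""
    prices = PRICES.get(model)
    if prices is None:
        return None  # cannot calculate without a price entry
    return (input_tokens / 1000) * prices["input"] + \
           (output_tokens / 1000) * prices["output"]
```

The `None` fallback mirrors the documented `float | None` returns of both methods above.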
Tool ¶
Bases: ABC
Abstract base class for LLM-callable tools.
A Tool represents a function that can be called by an LLM during generation. Each tool has a name, description, parameter schema, and execution logic.
Example
class CalculatorTool(Tool):
    def __init__(self):
        super().__init__(
            name="calculator",
            description="Performs basic arithmetic operations"
        )
@property
def schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"operation": {
"type": "string",
"enum": ["add", "subtract", "multiply", "divide"]
},
"a": {"type": "number"},
"b": {"type": "number"}
},
"required": ["operation", "a", "b"]
}
async def execute(self, operation: str, a: float, b: float) -> float:
if operation == "add":
return a + b
elif operation == "subtract":
return a - b
elif operation == "multiply":
return a * b
elif operation == "divide":
return a / b
else:
raise ValueError(f"Unknown operation: {operation}")
Initialize a tool.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Unique identifier for the tool | required |
| description | str | Human-readable description of what the tool does | required |
| metadata | Dict[str, Any] \| None | Optional metadata about the tool | None |
Methods:

| Name | Description |
|---|---|
| execute | Execute the tool with given parameters. |
| to_function_definition | Convert tool to OpenAI function calling format. |
| to_anthropic_tool_definition | Convert tool to Anthropic tool format. |
| validate_parameters | Validate parameters against schema. |
| `__repr__` | String representation of tool. |
| `__str__` | Human-readable string representation. |
Attributes:

| Name | Type | Description |
|---|---|---|
| schema | Dict[str, Any] | Get JSON schema for tool parameters. |
Source code in packages/llm/src/dataknobs_llm/tools/base.py
Attributes¶
schema
abstractmethod
property
¶
Get JSON schema for tool parameters.
Returns a JSON Schema dictionary describing the parameters this tool accepts. The schema should follow the JSON Schema specification and is used by LLMs to understand how to call the tool.
Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | JSON Schema dictionary for tool parameters |
Example
{
  "type": "object",
  "properties": {
    "query": {
      "type": "string",
      "description": "The search query"
    },
    "max_results": {
      "type": "integer",
      "description": "Maximum number of results",
      "default": 10
    }
  },
  "required": ["query"]
}
Functions¶
execute
abstractmethod
async
¶
Execute the tool with given parameters.
This method performs the actual tool logic. Parameters are passed as keyword arguments matching the schema definition.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| **kwargs | Any | Tool parameters matching the schema | {} |

Returns:

| Type | Description |
|---|---|
| Any | Tool execution result (can be any JSON-serializable type) |

Raises:

| Type | Description |
|---|---|
| Exception | If tool execution fails |
Source code in packages/llm/src/dataknobs_llm/tools/base.py
to_function_definition ¶
Convert tool to OpenAI function calling format.
Returns a dictionary in the format expected by OpenAI's function calling API.
Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Function definition dictionary |
Example
{
  "name": "search_web",
  "description": "Search the web for information",
  "parameters": {
    "type": "object",
    "properties": {
      "query": {"type": "string"}
    },
    "required": ["query"]
  }
}
Source code in packages/llm/src/dataknobs_llm/tools/base.py
to_anthropic_tool_definition ¶
Convert tool to Anthropic tool format.
Returns a dictionary in the format expected by Anthropic's Claude API.
Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Tool definition dictionary |
Source code in packages/llm/src/dataknobs_llm/tools/base.py
validate_parameters ¶
Validate parameters against schema.
Optional method for parameter validation before execution. By default, assumes LLM provides valid parameters.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| **kwargs | Any | Parameters to validate | {} |

Returns:

| Type | Description |
|---|---|
| bool | True if valid, False otherwise |
Source code in packages/llm/src/dataknobs_llm/tools/base.py
__repr__ ¶
ToolRegistry ¶
Registry for managing available tools/functions.
The ToolRegistry provides a central place to register and discover tools that can be called by LLMs. It supports tool registration, retrieval, listing, and conversion to function calling formats.
Built on dataknobs_common.Registry for consistency across the ecosystem.
Example
# Create registry
registry = ToolRegistry()
# Register tools
registry.register_tool(CalculatorTool())
registry.register_tool(WebSearchTool())
# Check if tool exists
if registry.has_tool("calculator"):
tool = registry.get_tool("calculator")
result = await tool.execute(operation="add", a=5, b=3)
# Get all tools for LLM function calling
functions = registry.to_function_definitions()
# List available tools
tools = registry.list_tools()
for tool_info in tools:
print(f"{tool_info['name']}: {tool_info['description']}")
Initialize a tool registry.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| track_executions | bool | If True, record execution history for observability and debugging. Default False. | False |
| max_execution_history | int | Maximum number of execution records to retain when tracking is enabled. Default 100. | 100 |
Methods:

| Name | Description |
|---|---|
| register_tool | Register a tool. |
| register_many | Register multiple tools at once. |
| get_tool | Get a tool by name. |
| has_tool | Check if a tool with the given name exists. |
| list_tools | List all registered tools with their metadata. |
| get_tool_names | Get list of all registered tool names. |
| to_function_definitions | Convert tools to OpenAI function calling format. |
| to_anthropic_tool_definitions | Convert tools to Anthropic tool format. |
| execute_tool | Execute a tool by name with given parameters. |
| get_execution_history | Query tool execution history. |
| get_execution_stats | Get aggregated execution statistics. |
| clear_execution_history | Clear all execution history. |
| execution_history_count | Get number of records in execution history. |
| filter_by_metadata | Filter tools by metadata attributes. |
| clone | Create a shallow copy of this registry. |
| `__repr__` | String representation of registry. |
| `__str__` | Human-readable string representation. |
Attributes:

| Name | Type | Description |
|---|---|---|
| tracking_enabled | bool | Check if execution tracking is enabled. |
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
Attributes¶
tracking_enabled
property
¶
Check if execution tracking is enabled.
Returns:

| Type | Description |
|---|---|
| bool | True if tracking is enabled, False otherwise |
Functions¶
register_tool ¶
Register a tool.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tool | Tool | Tool instance to register | required |

Raises:

| Type | Description |
|---|---|
| OperationError | If a tool with the same name already exists |
Example
calculator = CalculatorTool()
registry.register_tool(calculator)
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
register_many ¶
Register multiple tools at once.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tools | List[Tool] | List of Tool instances to register | required |

Raises:

| Type | Description |
|---|---|
| OperationError | If any tool name conflicts with existing tools |
Example
tools = [CalculatorTool(), SearchTool(), WeatherTool()]
registry.register_many(tools)
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
get_tool ¶
Get a tool by name.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the tool to retrieve | required |

Returns:

| Type | Description |
|---|---|
| Tool | Tool instance |

Raises:

| Type | Description |
|---|---|
| NotFoundError | If no tool with the given name exists |
Example
tool = registry.get_tool("calculator")
result = await tool.execute(operation="add", a=5, b=3)
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
has_tool ¶
Check if a tool with the given name exists.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the tool to check | required |

Returns:

| Type | Description |
|---|---|
| bool | True if tool exists, False otherwise |
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
list_tools ¶
List all registered tools with their metadata.
Returns:

| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of dictionaries containing tool information |
Example
tools = registry.list_tools()
for tool_info in tools:
print(f"{tool_info['name']}: {tool_info['description']}")
Returns format:
[
  {
    "name": "calculator",
    "description": "Performs arithmetic operations",
    "schema": {...},
    "metadata": {...}
  },
  ...
]
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
get_tool_names ¶
Get list of all registered tool names.
Returns:

| Type | Description |
|---|---|
| List[str] | List of tool names |
Example
names = registry.get_tool_names()
print(names)
# ['calculator', 'search', 'weather']
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
to_function_definitions ¶
to_function_definitions(
include_only: Set[str] | None = None, exclude: Set[str] | None = None
) -> List[Dict[str, Any]]
Convert tools to OpenAI function calling format.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| include_only | Set[str] \| None | If provided, only include tools with these names | None |
| exclude | Set[str] \| None | If provided, exclude tools with these names | None |

Returns:

| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of function definition dictionaries |
Example
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
to_anthropic_tool_definitions ¶
to_anthropic_tool_definitions(
include_only: Set[str] | None = None, exclude: Set[str] | None = None
) -> List[Dict[str, Any]]
Convert tools to Anthropic tool format.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| include_only | Set[str] \| None | If provided, only include tools with these names | None |
| exclude | Set[str] \| None | If provided, exclude tools with these names | None |

Returns:

| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of tool definition dictionaries |
Example
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
execute_tool
async
¶
Execute a tool by name with given parameters.
This is a convenience method for getting and executing a tool in a single call. When execution tracking is enabled, records timing, parameters, and results for observability.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the tool to execute | required |
| **kwargs | Any | Parameters to pass to the tool. Special parameters starting with '_' (like _context) are passed to the tool but excluded from execution records. | {} |

Returns:

| Type | Description |
|---|---|
| Any | Tool execution result |

Raises:

| Type | Description |
|---|---|
| NotFoundError | If tool not found |
| Exception | If tool execution fails |
Example
result = await registry.execute_tool(
"calculator",
operation="add",
a=5,
b=3
)
print(result)
# 8
# With tracking enabled
registry = ToolRegistry(track_executions=True)
await registry.execute_tool("calculator", operation="add", a=5, b=3)
history = registry.get_execution_history(tool_name="calculator")
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
get_execution_history ¶
get_execution_history(
tool_name: str | None = None,
context_id: str | None = None,
since: float | None = None,
until: float | None = None,
success_only: bool = False,
failed_only: bool = False,
limit: int | None = None,
) -> list[ToolExecutionRecord]
Query tool execution history.
Only available when tracking is enabled.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tool_name | str \| None | Filter by tool name | None |
| context_id | str \| None | Filter by context/conversation ID | None |
| since | float \| None | Filter to records after this timestamp | None |
| until | float \| None | Filter to records before this timestamp | None |
| success_only | bool | Only include successful executions | False |
| failed_only | bool | Only include failed executions | False |
| limit | int \| None | Maximum number of records to return | None |

Returns:

| Type | Description |
|---|---|
| list[ToolExecutionRecord] | List of matching execution records, or empty list if tracking is not enabled |
Example
# Get all executions for a specific tool
calc_history = registry.get_execution_history(tool_name="calculator")
# Get recent failed executions
failures = registry.get_execution_history(
since=time.time() - 3600,
failed_only=True
)
# Get executions for a conversation
conv_history = registry.get_execution_history(
context_id="conv-123"
)
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
get_execution_stats ¶
Get aggregated execution statistics.
Only available when tracking is enabled.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tool_name | str \| None | Get stats for specific tool, or None for all tools | None |

Returns:

| Type | Description |
|---|---|
| ExecutionStats | ExecutionStats with aggregated metrics, or empty stats if tracking is not enabled |
Example
# Get stats for all tools
all_stats = registry.get_execution_stats()
print(f"Total: {all_stats.total_executions}")
print(f"Success rate: {all_stats.success_rate:.1f}%")
# Get stats for specific tool
calc_stats = registry.get_execution_stats("calculator")
print(f"Avg duration: {calc_stats.avg_duration_ms:.2f}ms")
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
clear_execution_history ¶
Clear all execution history.
Has no effect if tracking is not enabled.
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
execution_history_count ¶
Get number of records in execution history.
Returns:

| Type | Description |
|---|---|
| int | Number of records, or 0 if tracking is not enabled |
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
filter_by_metadata ¶
Filter tools by metadata attributes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| **filters | Any | Key-value pairs to match in tool metadata | {} |

Returns:

| Type | Description |
|---|---|
| List[Tool] | List of tools matching all filters |
Example
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
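The example for this method was lost in extraction; its match-all-filters semantics can be sketched over plain dicts standing in for tools. This illustrates the documented behavior, not the registry's actual code:

```python
def filter_by_metadata(tools: list[dict], **filters) -> list[dict]:
    """Sketch: keep tools whose metadata matches every key/value in filters.

    Each tool is represented here as a dict with a 'metadata' mapping.
    """
    return [
        t for t in tools
        if all(t.get("metadata", {}).get(k) == v for k, v in filters.items())
    ]


tools = [
    {"name": "calc", "metadata": {"category": "math", "safe": True}},
    {"name": "search", "metadata": {"category": "web", "safe": True}},
]
math_tools = filter_by_metadata(tools, category="math")
```

A tool must satisfy *all* filters; a single mismatching (or missing) metadata key excludes it.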
clone ¶
Create a shallow copy of this registry.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| preserve_history | bool | If True and tracking is enabled, copy execution history to the new registry. Default False. | False |

Returns:

| Type | Description |
|---|---|
| ToolRegistry | New ToolRegistry with same tools and tracking settings |
Example
original = ToolRegistry(track_executions=True)
original.register_tool(CalculatorTool())
copy = original.clone()
copy.count()
# 1
copy.tracking_enabled
# True
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
__repr__ ¶
__str__ ¶
Human-readable string representation.
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
DeterministicTask
dataclass
¶
DeterministicTask(
fn: Callable[..., Any],
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] = dict(),
tag: str = "",
)
A sync or async callable to execute alongside LLM tasks.
Attributes:

| Name | Type | Description |
|---|---|---|
| fn | Callable[..., Any] | The callable to execute. May be sync or async. |
| args | tuple[Any, ...] | Positional arguments forwarded to fn. |
| kwargs | dict[str, Any] | Keyword arguments forwarded to fn. |
| tag | str | Identifier for result lookup. |
LLMTask
dataclass
¶
LLMTask(
messages: list[LLMMessage],
config_overrides: dict[str, Any] | None = None,
retry: RetryConfig | None = None,
tag: str = "",
)
A single LLM call to execute.
Attributes:

| Name | Type | Description |
|---|---|---|
| messages | list[LLMMessage] | Messages to send to the LLM provider. |
| config_overrides | dict[str, Any] \| None | Per-task provider config overrides (temperature, model, etc.). |
| retry | RetryConfig \| None | Per-task retry policy. Overrides the executor's default_retry. |
| tag | str | Identifier for result lookup. Populated automatically from the dict key when executing via a task mapping. |
ParallelLLMExecutor ¶
ParallelLLMExecutor(
provider: AsyncLLMProvider,
max_concurrency: int = 5,
default_retry: RetryConfig | None = None,
default_config_overrides: dict[str, Any] | None = None,
)
Runs multiple LLM calls and deterministic functions concurrently.
Features:
- Concurrency control via asyncio.Semaphore
- Per-task error isolation (one failure does not cancel others)
- Optional per-task retry via RetryExecutor
- Mixed execution of LLM tasks and deterministic callables
Example
executor = ParallelLLMExecutor(provider, max_concurrency=3) results = await executor.execute({ ... "q1": LLMTask(messages=[LLMMessage(role="user", content="Hello")]), ... "q2": LLMTask(messages=[LLMMessage(role="user", content="World")]), ... }) assert results["q1"].success
Initialize the executor.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| provider | AsyncLLMProvider | The LLM provider to use for LLM tasks. | required |
| max_concurrency | int | Maximum number of concurrent tasks. | 5 |
| default_retry | RetryConfig \| None | Default retry policy applied to tasks that do not specify their own. | None |
| default_config_overrides | dict[str, Any] \| None | Config overrides applied to every LLM task. Per-task overrides take precedence. | None |
Methods:

| Name | Description |
|---|---|
| execute | Run LLM tasks concurrently with error isolation. |
| execute_mixed | Run a mix of LLM and deterministic tasks concurrently. |
| execute_sequential | Run LLM tasks sequentially, optionally passing results forward. |
Source code in packages/llm/src/dataknobs_llm/execution/parallel.py
Functions¶
execute
async
¶
Run LLM tasks concurrently with error isolation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tasks | dict[str, LLMTask] | Mapping of tag to LLMTask. Tags identify results. | required |

Returns:

| Type | Description |
|---|---|
| dict[str, TaskResult] | Mapping of tag to TaskResult. |
Source code in packages/llm/src/dataknobs_llm/execution/parallel.py
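The concurrency-plus-isolation behavior described above (semaphore-bounded parallelism where one task's exception does not cancel the rest) can be sketched in plain asyncio. This is a simplified illustration of the pattern, not the executor's actual code; results here are plain dicts rather than `TaskResult` objects:

```python
import asyncio
import time

async def run_all(tasks: dict, max_concurrency: int = 3) -> dict:
    """Run a mapping of tag -> async factory concurrently, isolating errors."""
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(tag, factory):
        async with sem:  # bound the number of in-flight tasks
            start = time.perf_counter()
            try:
                value = await factory()
                result = {"success": True, "value": value}
            except Exception as exc:  # isolate: one failure doesn't cancel others
                result = {"success": False, "error": exc}
            result["duration_ms"] = (time.perf_counter() - start) * 1000
            return tag, result

    pairs = await asyncio.gather(*(run_one(t, f) for t, f in tasks.items()))
    return dict(pairs)
```

Catching the exception inside each worker (instead of letting it propagate through `gather`) is what gives per-task error isolation.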
execute_mixed
async
¶
Run a mix of LLM and deterministic tasks concurrently.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tasks | dict[str, LLMTask \| DeterministicTask] | Mapping of tag to task (LLMTask or DeterministicTask). | required |

Returns:

| Type | Description |
|---|---|
| dict[str, TaskResult] | Mapping of tag to TaskResult. |
Source code in packages/llm/src/dataknobs_llm/execution/parallel.py
execute_sequential
async
¶
Run LLM tasks sequentially, optionally passing results forward.
When pass_result is True, each task's messages are augmented with
the previous task's response as an assistant message.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tasks | list[LLMTask] | Ordered list of LLM tasks to run. | required |
| pass_result | bool | If True, append previous result as assistant message. | False |

Returns:

| Type | Description |
|---|---|
| list[TaskResult] | List of TaskResult in execution order. |
Source code in packages/llm/src/dataknobs_llm/execution/parallel.py
TaskResult
dataclass
¶
TaskResult(
tag: str,
success: bool,
value: LLMResponse | Any,
error: Exception | None = None,
duration_ms: float = 0.0,
)
Result of a single task execution.
Attributes:

| Name | Type | Description |
|---|---|---|
| tag | str | The task identifier. |
| success | bool | Whether the task completed without error. |
| value | LLMResponse \| Any | The return value. |
| error | Exception \| None | The exception if the task failed, otherwise None. |
| duration_ms | float | Wall-clock execution time in milliseconds. |
ResponseQueueExhaustedError ¶
Bases: OperationError
EchoProvider response queue exhausted in strict mode.
Raised when strict=True and a complete() call is made after
all scripted responses have been consumed. Indicates the test
scripted fewer responses than the code actually needed.
Source code in packages/llm/src/dataknobs_llm/exceptions.py
ToolsNotSupportedError ¶
Bases: OperationError
Model does not support tool/function calling.
Source code in packages/llm/src/dataknobs_llm/exceptions.py
CallTracker ¶
Collect new LLM calls across multiple CapturingProviders per turn.
In multi-LLM bot scenarios (e.g. a main LLM and an extraction LLM),
a single user turn may trigger calls to several providers.
CallTracker collects calls from all registered providers since the
last collection, assigns sequential global indices, and returns them
in registration order.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| **providers | CapturingProvider | Named CapturingProvider instances to track | {} |
Example
main = CapturingProvider(real_main, role="main")
extraction = CapturingProvider(real_extraction, role="extraction")
tracker = CallTracker(main=main, extraction=extraction)
# ... run a bot turn that triggers LLM calls ...
new_calls = tracker.collect_new_calls()
for call in new_calls:
print(f"[{call.call_index}] {call.role}: {call.response['content'][:40]}")
Methods:

| Name | Description |
|---|---|
| get_provider | Get a registered provider by name. |
| collect_new_calls | Collect calls made since the last collection. |
Attributes:

| Name | Type | Description |
|---|---|---|
| provider_names | list[str] | Get list of registered provider names. |
| total_calls | int | Total number of calls collected across all providers. |
Source code in packages/llm/src/dataknobs_llm/testing.py
Attributes¶
Functions¶
get_provider ¶
Get a registered provider by name.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Provider registration name | required |

Returns:

| Type | Description |
|---|---|
| CapturingProvider \| None | CapturingProvider or None if not found |
Source code in packages/llm/src/dataknobs_llm/testing.py
collect_new_calls ¶
Collect calls made since the last collection.
Returns new calls from all providers, sorted by provider
registration order. Each call receives a sequential
call_index relative to all previously collected calls.
Returns:

| Type | Description |
|---|---|
| list[CapturedCall] | List of new CapturedCall objects with global call_index values |
Source code in packages/llm/src/dataknobs_llm/testing.py
CapturedCall
dataclass
¶
CapturedCall(
role: str,
messages: list[dict[str, Any]],
response: dict[str, Any],
config_overrides: dict[str, Any] | None = None,
tools: list[Any] | None = None,
duration_seconds: float = 0.0,
call_index: int = 0,
)
Record of a single LLM call captured by CapturingProvider.
Attributes:

| Name | Type | Description |
|---|---|---|
| role | str | Provider role tag (e.g., "main", "extraction") |
| messages | list[dict[str, Any]] | Serialized request messages (list of dicts) |
| response | dict[str, Any] | Serialized LLM response (dict) |
| config_overrides | dict[str, Any] \| None | Config overrides passed to the call, if any |
| tools | list[Any] \| None | Tool definitions passed to the call, if any |
| duration_seconds | float | Wall-clock duration of the call |
| call_index | int | Per-instance call ordering (0-based) |
CapturingProvider ¶
Bases: AsyncLLMProvider
Provider wrapper that records all LLM calls for capture-replay testing.
Wraps a real AsyncLLMProvider delegate, forwarding all calls while
recording request/response pairs as CapturedCall objects. The role
tag (e.g., "main" or "extraction") enables replay to route responses
to the correct EchoProvider.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| delegate | AsyncLLMProvider | Real provider to wrap | required |
| role | str | Tag identifying this provider's role (default: "main") | 'main' |
Example

```python
from dataknobs_llm.testing import CapturingProvider

real_provider = OllamaProvider(config)
capturing = CapturingProvider(real_provider, role="main")

# Use normally — calls pass through to the real provider
response = await capturing.complete(messages)

# Inspect what was captured
assert capturing.call_count == 1
call = capturing.captured_calls[0]
print(f"Sent {len(call.messages)} messages, got: {call.response['content'][:50]}")
```
Methods:

| Name | Description |
|---|---|
| initialize | Delegate initialization to the wrapped provider. |
| close | Delegate close to the wrapped provider. |
| validate_model | Delegate model validation to the wrapped provider. |
| get_capabilities | Delegate capability detection to the wrapped provider. |
| complete | Forward completion to delegate and capture the call. |
| stream_complete | Forward streaming completion to delegate and capture the assembled response. |
| embed | Delegate embedding to the wrapped provider (not captured). |
| function_call | Delegate function calling to the wrapped provider and capture the call. |

Attributes:

| Name | Type | Description |
|---|---|---|
| role | str | Provider role tag. |
| captured_calls | list[CapturedCall] | All captured calls (read-only copy). |
| call_count | int | Number of captured calls. |
Source code in packages/llm/src/dataknobs_llm/testing.py
Attributes¶
Functions¶
initialize
async
¶
Delegate initialization to the wrapped provider.
close
async
¶
Delegate close to the wrapped provider.
validate_model
async
¶
Delegate model validation to the wrapped provider.
get_capabilities ¶
Delegate capability detection to the wrapped provider.
complete
async
¶
complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> LLMResponse
Forward completion to delegate and capture the call.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| messages | Union[str, List[LLMMessage]] | Input messages (string or list of LLMMessage) | required |
| config_overrides | Dict[str, Any] \| None | Optional per-request config overrides | None |
| tools | list[Any] \| None | Optional tool definitions | None |
| **kwargs | Any | Additional provider-specific parameters | {} |

Returns:

| Type | Description |
|---|---|
| LLMResponse | LLMResponse from the delegate (unchanged) |
Source code in packages/llm/src/dataknobs_llm/testing.py
stream_complete
async
¶
stream_complete(
messages: Union[str, List[LLMMessage]],
config_overrides: Dict[str, Any] | None = None,
tools: list[Any] | None = None,
**kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]
Forward streaming completion to delegate and capture the assembled response.
Yields chunks to the caller in real time. After the stream completes, assembles the full response content and records a CapturedCall.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| messages | Union[str, List[LLMMessage]] | Input messages | required |
| config_overrides | Dict[str, Any] \| None | Optional per-request config overrides | None |
| tools | list[Any] \| None | Optional tool definitions | None |
| **kwargs | Any | Additional provider-specific parameters | {} |

Yields:

| Type | Description |
|---|---|
| AsyncIterator[LLMStreamResponse] | LLMStreamResponse chunks from the delegate |
Source code in packages/llm/src/dataknobs_llm/testing.py
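The yield-through-then-record pattern described here can be sketched generically. capture_stream below is illustrative only, not the real stream_complete, and works on plain strings instead of LLMStreamResponse chunks:

```python
import asyncio
from typing import AsyncIterator

# Illustrative pattern: forward chunks in real time, record the assembled
# response only after the stream finishes.
async def capture_stream(
    chunks: AsyncIterator[str], captured: list[str]
) -> AsyncIterator[str]:
    parts: list[str] = []
    async for chunk in chunks:
        parts.append(chunk)
        yield chunk                      # caller sees each chunk immediately
    captured.append("".join(parts))      # full response recorded afterwards

async def demo() -> tuple[list[str], list[str]]:
    async def fake_stream():
        for piece in ("Hel", "lo", "!"):
            yield piece

    captured: list[str] = []
    seen = [c async for c in capture_stream(fake_stream(), captured)]
    return seen, captured

seen, captured = asyncio.run(demo())
```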
embed
async
¶
Delegate embedding to the wrapped provider (not captured).
Source code in packages/llm/src/dataknobs_llm/testing.py
function_call
async
¶
function_call(
messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs: Any
) -> LLMResponse
Delegate function calling to the wrapped provider and capture the call.
Source code in packages/llm/src/dataknobs_llm/testing.py
ErrorResponse ¶
Marker for a queued error in EchoProvider's response sequence.
When EchoProvider encounters an ErrorResponse in its queue, it raises the contained exception instead of returning an LLMResponse.
Usage

```python
provider = EchoProvider(config)
provider.set_responses([
    text_response("ok"),
    ErrorResponse(RuntimeError("provider unavailable")),
    text_response("recovered"),
])
```
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
ResponseSequenceBuilder ¶
Builder for creating sequences of LLM responses.
Provides a fluent API for building test response sequences, useful for testing multi-turn conversations and ReAct loops.
Example
Initialize builder.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| model | str | Model identifier for all responses | 'test-model' |
Methods:
| Name | Description |
|---|---|
add_text |
Add a text response to the sequence. |
add_tool_call |
Add a tool call response to the sequence. |
add_multi_tool |
Add a multi-tool call response to the sequence. |
add_extraction |
Add an extraction response to the sequence. |
add |
Add a custom LLMResponse to the sequence. |
build |
Build and return the response sequence. |
configure |
Configure an EchoProvider with this sequence. |
Source code in packages/llm/src/dataknobs_llm/testing.py
Functions¶
add_text ¶
Add a text response to the sequence.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| content | str | Response text | required |

Returns:

| Type | Description |
|---|---|
| ResponseSequenceBuilder | Self for chaining |
Source code in packages/llm/src/dataknobs_llm/testing.py
add_tool_call ¶
Add a tool call response to the sequence.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tool_name | str | Name of tool to call | required |
| arguments | dict[str, Any] \| None | Tool arguments | None |

Returns:

| Type | Description |
|---|---|
| ResponseSequenceBuilder | Self for chaining |
Source code in packages/llm/src/dataknobs_llm/testing.py
add_multi_tool ¶
Add a multi-tool call response to the sequence.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tools | list[tuple[str, dict[str, Any]]] | List of (name, args) tuples | required |

Returns:

| Type | Description |
|---|---|
| ResponseSequenceBuilder | Self for chaining |
Source code in packages/llm/src/dataknobs_llm/testing.py
add_extraction ¶
Add an extraction response to the sequence.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Extracted data dict | required |

Returns:

| Type | Description |
|---|---|
| ResponseSequenceBuilder | Self for chaining |
Source code in packages/llm/src/dataknobs_llm/testing.py
add ¶
Add a custom LLMResponse to the sequence.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| response | LLMResponse | Custom response object | required |

Returns:

| Type | Description |
|---|---|
| ResponseSequenceBuilder | Self for chaining |
Source code in packages/llm/src/dataknobs_llm/testing.py
build ¶
Build and return the response sequence.
Returns:

| Type | Description |
|---|---|
| list[LLMResponse] | List of LLMResponse objects |
configure ¶
Configure an EchoProvider with this sequence.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| provider | EchoProvider | EchoProvider to configure | required |

Returns:

| Type | Description |
|---|---|
| EchoProvider | The configured provider |
Source code in packages/llm/src/dataknobs_llm/testing.py
Functions¶
normalize_llm_config ¶
Normalize various config formats to LLMConfig.
This helper function accepts LLMConfig instances, dataknobs Config objects, or plain dictionaries and returns a standardized LLMConfig instance.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | Union[LLMConfig, Config, Dict[str, Any]] | Configuration as LLMConfig, Config object, or dictionary | required |

Returns:

| Type | Description |
|---|---|
| LLMConfig | LLMConfig instance |

Raises:

| Type | Description |
|---|---|
| TypeError | If config type is not supported |
Source code in packages/llm/src/dataknobs_llm/llm/base.py
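The dispatch described above can be sketched with a stand-in config class. FakeLLMConfig and normalize_sketch are illustrative only (the real LLMConfig has more fields, and the real helper also accepts dataknobs Config objects):

```python
from dataclasses import dataclass
from typing import Any

# Stand-in for LLMConfig, for illustration only.
@dataclass
class FakeLLMConfig:
    provider: str = "echo"
    model: str = "test-model"

# Sketch of the documented dispatch: pass an existing config through,
# build one from a dict, and reject unsupported types with TypeError.
def normalize_sketch(config: Any) -> FakeLLMConfig:
    if isinstance(config, FakeLLMConfig):
        return config
    if isinstance(config, dict):
        return FakeLLMConfig(**config)
    raise TypeError(f"Unsupported config type: {type(config).__name__}")
```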
create_llm_provider ¶
create_llm_provider(
config: LLMConfig | Config | dict[str, Any], is_async: bool = True
) -> AsyncLLMProvider | SyncLLMProvider
Create appropriate LLM provider based on configuration.
Convenience function that uses LLMProviderFactory internally. Now supports LLMConfig, Config objects, and dictionaries.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | LLMConfig \| Config \| dict[str, Any] | LLM configuration (LLMConfig, Config, or dict) | required |
| is_async | bool | Whether to create async provider | True |

Returns:

| Type | Description |
|---|---|
| AsyncLLMProvider \| SyncLLMProvider | LLM provider instance |
Example
Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
create_embedding_provider
async
¶
create_embedding_provider(
config: dict[str, Any],
*,
default_provider: str = "ollama",
default_model: str = "nomic-embed-text",
) -> AsyncLLMProvider
Create and initialize an embedding provider from configuration.
Normalizes configuration from two supported formats:

- Nested format: `{"embedding": {"provider": "ollama", "model": "..."}}`. The `"embedding"` sub-dict is extracted and used. All extra keys in the sub-dict (`api_base`, `api_key`, `dimensions`, etc.) are forwarded to the provider.
- Legacy prefix format: `{"embedding_provider": "ollama", "embedding_model": "..."}`, with `embedding_`-prefixed keys at the top level. `api_base`, `api_key`, and `dimensions` are also forwarded when present at the top level.

When neither format is present, default_provider / default_model are used (ollama / nomic-embed-text).
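The two formats and the fallback can be sketched as a pure normalization step. normalize_embedding_config is a hypothetical helper written from the rules above, not part of the package:

```python
from typing import Any

def normalize_embedding_config(
    config: dict[str, Any],
    default_provider: str = "ollama",
    default_model: str = "nomic-embed-text",
) -> dict[str, Any]:
    """Sketch of the documented normalization rules (illustration only)."""
    if isinstance(config.get("embedding"), dict):
        # Nested format: use the sub-dict, forwarding all extra keys
        out = dict(config["embedding"])
        out.setdefault("provider", default_provider)
        out.setdefault("model", default_model)
        return out
    if "embedding_provider" in config or "embedding_model" in config:
        # Legacy prefix format: embedding_-prefixed keys at the top level
        out = {
            "provider": config.get("embedding_provider", default_provider),
            "model": config.get("embedding_model", default_model),
        }
        for key in ("api_base", "api_key", "dimensions"):
            if key in config:            # forwarded when present at top level
                out[key] = config[key]
        return out
    # Neither format present: fall back to the defaults
    return {"provider": default_provider, "model": default_model}
```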
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] | Configuration dict. | required |
| default_provider | str | Default provider if not specified. | 'ollama' |
| default_model | str | Default model if not specified. | 'nomic-embed-text' |

Returns:

| Type | Description |
|---|---|
| AsyncLLMProvider | Initialized AsyncLLMProvider |
Example
Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
create_caching_provider
async
¶
create_caching_provider(
inner: AsyncLLMProvider,
cache_path: str | Path | None = None,
*,
cache_backend: str = "sqlite",
) -> CachingEmbedProvider
Create and initialize a CachingEmbedProvider.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| inner | AsyncLLMProvider | The provider to wrap. | required |
| cache_path | str \| Path \| None | Path for the SQLite cache file. Required when cache_backend is "sqlite". | None |
| cache_backend | str | Cache backend to use ("sqlite" or "memory"). | 'sqlite' |

Returns:

| Type | Description |
|---|---|
| CachingEmbedProvider | An initialized CachingEmbedProvider |

Raises:

| Type | Description |
|---|---|
| ValueError | If cache_backend is not a supported backend |
Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
render_conditional_template ¶
Render a template with variable substitution and conditional sections.
Variable substitution:

- {{variable}} syntax for placeholders
- Variables in params dict are replaced with their values
- Variables not in params are left unchanged ({{variable}} remains as-is)
- Whitespace handling: {{ var }} -> " value " when substituted, " {{var}} " when not

Conditional sections:

- ((optional content)) syntax for conditional blocks
- Section is removed if all {{variables}} inside are empty/None/missing
- Section is rendered (without parentheses) if any variable has a value
- Variables inside conditionals are replaced with empty strings if missing
- Nested conditionals are processed recursively

Example

```python
template = "Hello {{name}}((, you have {{count}} messages))"
params = {"name": "Alice", "count": 5}
result = "Hello Alice, you have 5 messages"

params = {"name": "Bob"}  # no count
result = "Hello Bob"  # conditional section removed
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| template | str | The template string | required |
| params | Dict[str, Any] | Dictionary of parameters to substitute | required |

Returns:

| Type | Description |
|---|---|
| str | The rendered template |
Source code in packages/llm/src/dataknobs_llm/template_utils.py
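The rules above can be approximated in a few lines. render_sketch is a simplified illustration of the documented behavior, handling a single level of conditionals and none of the whitespace subtleties; it is not the real implementation:

```python
import re

def render_sketch(template: str, params: dict) -> str:
    """Simplified sketch of the documented template rules (illustration only)."""
    var_pat = re.compile(r"\{\{(\w+)\}\}")

    def render_conditional(m: "re.Match") -> str:
        inner = m.group(1)
        names = var_pat.findall(inner)
        # Remove the section if every variable inside is empty/None/missing
        if not any(params.get(n) not in (None, "") for n in names):
            return ""
        # Render without parentheses; missing vars become empty strings
        return var_pat.sub(lambda v: str(params.get(v.group(1)) or ""), inner)

    # Resolve conditional sections first (one level only in this sketch)
    out = re.sub(r"\(\(([^()]*)\)\)", render_conditional, template)
    # Then substitute known variables, leaving unknown {{var}} as-is
    return var_pat.sub(
        lambda v: str(params[v.group(1)]) if v.group(1) in params else v.group(0),
        out,
    )
```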
extraction_response ¶
Create an LLMResponse for schema extraction.
The data is JSON-encoded as the response content, mimicking how extraction LLMs return structured data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Extracted data dict | required |
| model | str | Model identifier (default: "test-model") | 'test-model' |

Returns:

| Type | Description |
|---|---|
| LLMResponse | LLMResponse with JSON content |
Example

```python
import json

response = extraction_response({"name": "Math Tutor", "level": 5})
json.loads(response.content)  # {'name': 'Math Tutor', 'level': 5}
```
Source code in packages/llm/src/dataknobs_llm/testing.py
llm_message_from_dict ¶
Deserialize an LLMMessage from a dict.
Delegates to :meth:LLMMessage.from_dict. Kept for backward compatibility.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| d | dict[str, Any] | Dictionary representation | required |

Returns:

| Type | Description |
|---|---|
| LLMMessage | LLMMessage instance |
Source code in packages/llm/src/dataknobs_llm/testing.py
llm_message_to_dict ¶
Serialize an LLMMessage to a JSON-compatible dict.
Delegates to :meth:LLMMessage.to_dict. Kept for backward compatibility.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| msg | LLMMessage | LLMMessage to serialize | required |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation (only non-None optional fields included) |
Source code in packages/llm/src/dataknobs_llm/testing.py
llm_response_from_dict ¶
Deserialize an LLMResponse from a dict.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| d | dict[str, Any] | Dictionary representation | required |

Returns:

| Type | Description |
|---|---|
| LLMResponse | LLMResponse instance |
Source code in packages/llm/src/dataknobs_llm/testing.py
llm_response_to_dict ¶
Serialize an LLMResponse to a JSON-compatible dict.
Omits created_at and cumulative_cost_usd (runtime artifacts that
make captures non-deterministic). Only includes non-None optional fields.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| resp | LLMResponse | LLMResponse to serialize | required |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation |
Source code in packages/llm/src/dataknobs_llm/testing.py
multi_tool_response ¶
multi_tool_response(
tools: list[tuple[str, dict[str, Any]]],
*,
content: str = "",
model: str = "test-model",
) -> LLMResponse
Create an LLMResponse with multiple tool calls.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tools | list[tuple[str, dict[str, Any]]] | List of (tool_name, arguments) tuples | required |
| content | str | Optional text content alongside tool calls | '' |
| model | str | Model identifier (default: "test-model") | 'test-model' |

Returns:

| Type | Description |
|---|---|
| LLMResponse | LLMResponse with multiple tool_calls |
Example

```python
response = multi_tool_response([
    ("preview_config", {}),
    ("validate_config", {"strict": True}),
])
len(response.tool_calls)  # 2
```
Source code in packages/llm/src/dataknobs_llm/testing.py
text_response ¶
text_response(
content: str,
*,
model: str = "test-model",
finish_reason: str = "stop",
usage: dict[str, int] | None = None,
metadata: dict[str, Any] | None = None,
) -> LLMResponse
Create a simple text LLMResponse.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| content | str | Response text content | required |
| model | str | Model identifier (default: "test-model") | 'test-model' |
| finish_reason | str | Why generation stopped (default: "stop") | 'stop' |
| usage | dict[str, int] \| None | Optional token usage dict | None |
| metadata | dict[str, Any] \| None | Optional metadata dict | None |

Returns:

| Type | Description |
|---|---|
| LLMResponse | LLMResponse with text content |
Example

```python
response = text_response("Hello, world!")
response.content  # 'Hello, world!'
```
Source code in packages/llm/src/dataknobs_llm/testing.py
tool_call_from_dict ¶
Deserialize a ToolCall from a dict.
Delegates to :meth:ToolCall.from_dict. Kept for backward compatibility.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| d | dict[str, Any] | Dictionary representation | required |

Returns:

| Type | Description |
|---|---|
| ToolCall | ToolCall instance |
Source code in packages/llm/src/dataknobs_llm/testing.py
tool_call_response ¶
tool_call_response(
tool_name: str,
arguments: dict[str, Any] | None = None,
*,
tool_id: str | None = None,
content: str = "",
model: str = "test-model",
additional_tools: list[tuple[str, dict[str, Any]]] | None = None,
) -> LLMResponse
Create an LLMResponse with tool call(s).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tool_name | str | Name of the tool to call | required |
| arguments | dict[str, Any] \| None | Arguments to pass to the tool (default: {}) | None |
| tool_id | str \| None | Unique ID for the tool call (auto-generated if not provided) | None |
| content | str | Optional text content alongside tool call | '' |
| model | str | Model identifier (default: "test-model") | 'test-model' |
| additional_tools | list[tuple[str, dict[str, Any]]] \| None | Additional tool calls as (name, args) tuples | None |

Returns:

| Type | Description |
|---|---|
| LLMResponse | LLMResponse with tool_calls populated |
Example

```python
response = tool_call_response("get_weather", {"city": "NYC"})
response.tool_calls[0].name  # 'get_weather'
response.tool_calls[0].parameters
```

Multiple tool calls¶

```python
response = tool_call_response(
    "preview_config", {},
    additional_tools=[("validate_config", {})],
)
len(response.tool_calls)  # 2
```
Source code in packages/llm/src/dataknobs_llm/testing.py
tool_call_to_dict ¶
Serialize a ToolCall to a JSON-compatible dict.
Delegates to :meth:ToolCall.to_dict. Kept for backward compatibility.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tc | ToolCall | ToolCall to serialize | required |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation |