
dataknobs-llm Complete API Reference

Complete auto-generated API documentation from source code docstrings.

💡 Also see:

- Curated API Guide: Hand-crafted tutorials and examples
- Package Overview: Introduction and getting started
- Source Code: View on GitHub


dataknobs_llm

LLM utilities for DataKnobs.

Modules:

Name Description
conversations

Conversation management system for dataknobs-llm.

exceptions

Custom exceptions for the LLM package.

execution

Execution utilities for parallel and sequential LLM task processing.

extraction

Extraction utilities for structured data extraction from text.

fsm_integration

FSM integration for LLM workflows.

llm

LLM abstraction layer for FSM patterns.

prompts

Advanced prompt engineering library for dataknobs_llm.

sources

Intent schema composition for grounded retrieval.

template_utils

Template rendering utilities for dataknobs-llm.

testing

Testing utilities for dataknobs-llm.

tools

Tool system for LLM function calling.

Classes:

Name Description
LLMProvider

Base LLM provider interface.

LLMConfig

Configuration for LLM operations.

LLMMessage

Represents a message in an LLM conversation.

LLMResponse

Response from LLM.

LLMStreamResponse

Streaming response from LLM.

CompletionMode

LLM completion modes.

ModelCapability

Model capabilities.

OpenAIProvider

OpenAI LLM provider with full API support.

AnthropicProvider

Anthropic Claude LLM provider with full API support.

OllamaProvider

Ollama local LLM provider for privacy-first, offline LLM usage.

HuggingFaceProvider

HuggingFace Inference API provider.

EchoProvider

Echo provider for testing and debugging.

CachingEmbedProvider

Provider wrapper that caches embed() results persistently.

EmbeddingCache

Cache for embedding vectors, keyed by (model, text).

MemoryEmbeddingCache

In-memory cache backend for testing.

LLMProviderFactory

Factory for creating LLM providers from configuration.

TemplateStrategy

Template rendering strategies.

MessageTemplate

Template for generating message content with multiple rendering strategies.

MessageBuilder

Builder for constructing message sequences.

ResponseParser

Parser for LLM responses.

TokenCounter

Estimate token counts for different models.

CostCalculator

Calculate costs for LLM usage.

Tool

Abstract base class for LLM-callable tools.

ToolRegistry

Registry for managing available tools/functions.

DeterministicTask

A sync or async callable to execute alongside LLM tasks.

LLMTask

A single LLM call to execute.

ParallelLLMExecutor

Runs multiple LLM calls and deterministic functions concurrently.

TaskResult

Result of a single task execution.

ResponseQueueExhaustedError

EchoProvider response queue exhausted in strict mode.

ToolsNotSupportedError

Model does not support tool/function calling.

CallTracker

Collect new LLM calls across multiple CapturingProviders per turn.

CapturedCall

Record of a single LLM call captured by CapturingProvider.

CapturingProvider

Provider wrapper that records all LLM calls for capture-replay testing.

ErrorResponse

Marker for a queued error in EchoProvider's response sequence.

ResponseSequenceBuilder

Builder for creating sequences of LLM responses.

Functions:

Name Description
normalize_llm_config

Normalize various config formats to LLMConfig.

create_llm_provider

Create appropriate LLM provider based on configuration.

create_embedding_provider

Create and initialize an embedding provider from configuration.

create_caching_provider

Create and initialize a CachingEmbedProvider.

render_conditional_template

Render a template with variable substitution and conditional sections.

extraction_response

Create an LLMResponse for schema extraction.

llm_message_from_dict

Deserialize an LLMMessage from a dict.

llm_message_to_dict

Serialize an LLMMessage to a JSON-compatible dict.

llm_response_from_dict

Deserialize an LLMResponse from a dict.

llm_response_to_dict

Serialize an LLMResponse to a JSON-compatible dict.

multi_tool_response

Create an LLMResponse with multiple tool calls.

text_response

Create a simple text LLMResponse.

tool_call_from_dict

Deserialize a ToolCall from a dict.

tool_call_response

Create an LLMResponse with tool call(s).

tool_call_to_dict

Serialize a ToolCall to a JSON-compatible dict.

Classes

LLMProvider

LLMProvider(
    config: Union[LLMConfig, Config, Dict[str, Any]],
    prompt_builder: Union[PromptBuilder, AsyncPromptBuilder] | None = None,
)

Bases: ABC

Base LLM provider interface.

Initialize provider with configuration.

Parameters:

Name Type Description Default
config Union[LLMConfig, Config, Dict[str, Any]]

Configuration as LLMConfig, dataknobs Config object, or dict

required
prompt_builder Union[PromptBuilder, AsyncPromptBuilder] | None

Optional prompt builder for integrated prompting

None

Methods:

Name Description
initialize

Initialize the LLM client.

close

Close the LLM client.

validate_model

Validate that the model is available.

get_capabilities

Get model capabilities.

__enter__

Context manager entry.

__exit__

Context manager exit.

Attributes:

Name Type Description
is_initialized bool

Check if provider is initialized.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
def __init__(
    self,
    config: Union[LLMConfig, Config, Dict[str, Any]],
    prompt_builder: Union[PromptBuilder, AsyncPromptBuilder] | None = None
):
    """Initialize provider with configuration.

    Args:
        config: Configuration as LLMConfig, dataknobs Config object, or dict
        prompt_builder: Optional prompt builder for integrated prompting
    """
    self.config = normalize_llm_config(config)
    self.prompt_builder = prompt_builder
    self._client = None
    self._is_initialized = False
    self._is_closing = False
    self._in_flight: set[asyncio.Task[Any]] = set()
Attributes
is_initialized property
is_initialized: bool

Check if provider is initialized.

Functions
initialize abstractmethod
initialize() -> None

Initialize the LLM client.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
@abstractmethod
def initialize(self) -> None:
    """Initialize the LLM client."""
    pass
close abstractmethod
close() -> None

Close the LLM client.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
@abstractmethod
def close(self) -> None:
    """Close the LLM client."""
    pass
validate_model abstractmethod async
validate_model() -> bool

Validate that the model is available.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
@abstractmethod
async def validate_model(self) -> bool:
    """Validate that the model is available."""
    pass
get_capabilities
get_capabilities() -> List[ModelCapability]

Get model capabilities.

Template method: calls `_detect_capabilities` (subclass hook), filters out None values, then applies config overrides via `_resolve_capabilities`.

Subclasses override `_detect_capabilities` instead of this method.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
def get_capabilities(self) -> List[ModelCapability]:
    """Get model capabilities.

    Template method: calls :meth:`_detect_capabilities` (subclass hook),
    filters out ``None`` values, then applies config overrides via
    :meth:`_resolve_capabilities`.

    Subclasses override :meth:`_detect_capabilities` instead of this
    method.
    """
    detected = self._detect_capabilities()
    detected = [c for c in detected if c is not None]
    return self._resolve_capabilities(detected)
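The template-method split above can be illustrated in isolation. The sketch below is a hypothetical standalone mimic, not the real `LLMProvider`: a subclass-style hook returns raw detections (possibly containing `None`), and `get_capabilities()` filters and resolves them.

```python
from enum import Enum
from typing import List, Optional


class Capability(Enum):
    CHAT = "chat"
    STREAMING = "streaming"


class ProviderSketch:
    """Standalone mimic of the get_capabilities template method."""

    def __init__(self, overrides: Optional[List[Capability]] = None):
        # Stand-in for config-supplied capability overrides
        self._overrides = overrides

    def _detect_capabilities(self) -> List[Optional[Capability]]:
        # Subclass hook; may yield None for features it cannot determine
        return [Capability.CHAT, None, Capability.STREAMING]

    def _resolve_capabilities(self, detected: List[Capability]) -> List[Capability]:
        # Config overrides win when present
        return self._overrides if self._overrides is not None else detected

    def get_capabilities(self) -> List[Capability]:
        # Same three steps as the documented method: detect, filter, resolve
        detected = [c for c in self._detect_capabilities() if c is not None]
        return self._resolve_capabilities(detected)
```

Subclasses in the real package override only the hook, so the filtering and override logic stays in one place.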
__enter__
__enter__()

Context manager entry.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
def __enter__(self):
    """Context manager entry."""
    self.initialize()
    return self
__exit__
__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit."""
    self.close()
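`__enter__`/`__exit__` make any provider usable in a `with` block. The lifecycle reduces to the pattern below, shown with a hypothetical dummy class rather than a real provider:

```python
class DummyProvider:
    """Standalone sketch of the provider context-manager lifecycle."""

    def __init__(self):
        self.initialized = False
        self.closed = False

    def initialize(self):
        self.initialized = True

    def close(self):
        self.closed = True

    def __enter__(self):
        # Mirrors LLMProvider.__enter__: initialize, then return self
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Mirrors LLMProvider.__exit__: always close, even on error
        self.close()


with DummyProvider() as provider:
    assert provider.initialized
assert provider.closed
```

Because `close()` runs in `__exit__`, the client is released even when the body raises.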

LLMConfig dataclass

LLMConfig(
    provider: str,
    model: str,
    api_key: str | None = None,
    api_base: str | None = None,
    temperature: float | None = None,
    max_tokens: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    stop_sequences: List[str] | None = None,
    mode: CompletionMode = CompletionMode.CHAT,
    system_prompt: str | None = None,
    response_format: str | None = None,
    functions: List[Dict[str, Any]] | None = None,
    function_call: Union[str, Dict[str, str]] | None = None,
    stream: bool = False,
    stream_callback: Callable[[LLMStreamResponse], None] | None = None,
    rate_limit: int | None = None,
    retry_count: int = 3,
    retry_delay: float = 1.0,
    timeout: float = 60.0,
    seed: int | None = None,
    logit_bias: Dict[str, float] | None = None,
    user_id: str | None = None,
    options: Dict[str, Any] = dict(),
    dimensions: int | None = None,
    capabilities: List[str] | None = None,
)

Configuration for LLM operations.

Comprehensive configuration for LLM providers with 20+ parameters controlling generation, rate limiting, function calling, and more. Works seamlessly with both direct instantiation and dataknobs Config objects.

This class supports:

- All major LLM providers (OpenAI, Anthropic, Ollama, HuggingFace)
- Generation parameters (temperature, max_tokens, top_p, etc.)
- Embedding configuration (dimensions)
- Function/tool calling configuration
- Streaming with callbacks
- Rate limiting and retry logic
- Provider-specific options via the options dict

Note

Generation parameters (temperature, top_p, max_tokens, etc.) default to None, meaning "not set — let the provider API apply its own default." Only explicitly supplied values are sent to the provider (see `generation_params`).

Example
from dataknobs_llm.llm.base import LLMConfig, CompletionMode

# Basic configuration — temperature is explicitly set here;
# omitting it would let the provider use its own default.
config = LLMConfig(
    provider="openai",
    model="gpt-4",
    api_key="sk-...",
    temperature=0.7,
    max_tokens=500
)

# Creative writing config
creative_config = LLMConfig(
    provider="anthropic",
    model="claude-3-sonnet",
    temperature=1.2,
    top_p=0.95,
    max_tokens=2000
)

# Deterministic config for testing
test_config = LLMConfig(
    provider="openai",
    model="gpt-4",
    temperature=0.0,
    seed=42,  # Reproducible outputs
    max_tokens=100
)

# Function calling config
function_config = LLMConfig(
    provider="openai",
    model="gpt-4",
    functions=[{
        "name": "search_docs",
        "description": "Search documentation",
        "parameters": {"type": "object", "properties": {...}}
    }],
    function_call="auto"
)

# Streaming with callback
def on_chunk(chunk):
    print(chunk.delta, end="")

streaming_config = LLMConfig(
    provider="openai",
    model="gpt-4",
    stream=True,
    stream_callback=on_chunk
)

# From dictionary (Config compatibility)
config_dict = {
    "provider": "ollama",
    "model": "llama2",
    "type": "llm",  # Config metadata (ignored)
    "temperature": 0.8
}
config = LLMConfig.from_dict(config_dict)

# Clone with overrides
new_config = config.clone(temperature=1.0, max_tokens=1000)
See Also

normalize_llm_config: Convert various formats to LLMConfig
CompletionMode: Available completion modes

Methods:

Name Description
from_dict

Create LLMConfig from a dictionary.

to_dict

Convert LLMConfig to a dictionary.

clone

Create a copy of this config with optional overrides.

generation_params

Return only explicitly-set generation parameters.

Functions
from_dict classmethod
from_dict(config_dict: Dict[str, Any]) -> LLMConfig

Create LLMConfig from a dictionary.

This method handles dictionaries from dataknobs Config objects, which may include 'type', 'name', and 'factory' attributes. These attributes are ignored during LLMConfig construction.

Parameters:

Name Type Description Default
config_dict Dict[str, Any]

Configuration dictionary

required

Returns:

Type Description
LLMConfig

LLMConfig instance

Source code in packages/llm/src/dataknobs_llm/llm/base.py
@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> "LLMConfig":
    """Create LLMConfig from a dictionary.

    This method handles dictionaries from dataknobs Config objects,
    which may include 'type', 'name', and 'factory' attributes.
    These attributes are ignored during LLMConfig construction.

    Args:
        config_dict: Configuration dictionary

    Returns:
        LLMConfig instance
    """
    # Filter out Config-specific attributes
    config_data = {
        k: v for k, v in config_dict.items()
        if k not in ('type', 'name', 'factory')
    }

    # Handle mode conversion if it's a string
    if 'mode' in config_data and isinstance(config_data['mode'], str):
        config_data['mode'] = CompletionMode(config_data['mode'])

    # Get dataclass fields to filter unknown attributes
    valid_fields = {f.name for f in cls.__dataclass_fields__.values()}
    filtered_data = {k: v for k, v in config_data.items() if k in valid_fields}

    return cls(**filtered_data)
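The two filtering passes above can be demonstrated with plain dicts. This is a standalone sketch of the same logic (`VALID_FIELDS` is a small illustrative subset, not the real field list):

```python
# Standalone sketch of LLMConfig.from_dict's two filtering passes
CONFIG_ATTRS = ('type', 'name', 'factory')            # Config metadata to drop
VALID_FIELDS = {'provider', 'model', 'temperature'}   # illustrative subset


def filter_config(config_dict):
    # Pass 1: drop Config-specific attributes
    data = {k: v for k, v in config_dict.items() if k not in CONFIG_ATTRS}
    # Pass 2: keep only known dataclass fields, silently ignoring extras
    return {k: v for k, v in data.items() if k in VALID_FIELDS}


raw = {"provider": "ollama", "model": "llama2", "type": "llm", "unknown_key": 1}
print(filter_config(raw))  # {'provider': 'ollama', 'model': 'llama2'}
```

Both `type` (Config metadata) and `unknown_key` (not a dataclass field) are dropped, so arbitrary Config dicts can be passed without pre-cleaning.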
to_dict
to_dict(include_config_attrs: bool = False) -> Dict[str, Any]

Convert LLMConfig to a dictionary.

Parameters:

Name Type Description Default
include_config_attrs bool

If True, includes 'type' attribute for Config compatibility

False

Returns:

Type Description
Dict[str, Any]

Configuration dictionary

Source code in packages/llm/src/dataknobs_llm/llm/base.py
def to_dict(self, include_config_attrs: bool = False) -> Dict[str, Any]:
    """Convert LLMConfig to a dictionary.

    Args:
        include_config_attrs: If True, includes 'type' attribute for Config compatibility

    Returns:
        Configuration dictionary
    """
    result = {}

    for field_info in self.__dataclass_fields__.values():
        value = getattr(self, field_info.name)

        # Handle enum conversion
        if isinstance(value, Enum):
            result[field_info.name] = value.value
        # Skip None values for optional fields
        elif value is not None:
            result[field_info.name] = value
        # Include default factories even if empty for certain fields
        elif field_info.name == 'options':
            result[field_info.name] = {}

    # Optionally add Config-compatible type attribute
    if include_config_attrs:
        result['type'] = 'llm'

    return result
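The serialization rules above (enums unwrapped to their value, None fields skipped, `options` always present) can be sketched with a minimal stand-in dataclass; the field names here are an illustrative subset, not the real `LLMConfig`:

```python
from dataclasses import dataclass, field, fields
from enum import Enum
from typing import Any, Dict, Optional


class Mode(Enum):
    CHAT = "chat"


@dataclass
class CfgSketch:
    """Minimal stand-in for LLMConfig's to_dict serialization rules."""
    provider: str = "openai"
    mode: Mode = Mode.CHAT
    api_key: Optional[str] = None
    options: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        result: Dict[str, Any] = {}
        for f in fields(self):
            value = getattr(self, f.name)
            if isinstance(value, Enum):
                result[f.name] = value.value   # enums become their string value
            elif value is not None:
                result[f.name] = value         # None fields are omitted
            elif f.name == 'options':
                result[f.name] = {}            # options always serialized
        return result


print(CfgSketch().to_dict())  # {'provider': 'openai', 'mode': 'chat', 'options': {}}
```

The unset `api_key` is absent from the output, while `mode` round-trips as a plain string.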
clone
clone(**overrides: Any) -> LLMConfig

Create a copy of this config with optional overrides.

This method is useful for creating runtime configuration variations without mutating the original config. All dataclass fields can be overridden via keyword arguments.

Parameters:

Name Type Description Default
**overrides Any

Field values to override in the cloned config

{}

Returns:

Type Description
LLMConfig

New LLMConfig instance with overrides applied

Example

base_config = LLMConfig(provider="openai", model="gpt-4", temperature=0.7)
creative_config = base_config.clone(temperature=1.2, max_tokens=500)

Source code in packages/llm/src/dataknobs_llm/llm/base.py
def clone(self, **overrides: Any) -> "LLMConfig":
    """Create a copy of this config with optional overrides.

    This method is useful for creating runtime configuration variations
    without mutating the original config. All dataclass fields can be
    overridden via keyword arguments.

    Args:
        **overrides: Field values to override in the cloned config

    Returns:
        New LLMConfig instance with overrides applied

    Example:
        >>> base_config = LLMConfig(provider="openai", model="gpt-4", temperature=0.7)
        >>> creative_config = base_config.clone(temperature=1.2, max_tokens=500)
    """
    from dataclasses import replace
    return replace(self, **overrides)
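Since `clone` is a thin wrapper over `dataclasses.replace`, the same copy-with-overrides pattern works on any dataclass. A standalone sketch with a hypothetical two-field config:

```python
from dataclasses import dataclass, replace


@dataclass
class Cfg:
    """Hypothetical minimal config for illustrating clone semantics."""
    model: str
    temperature: float = 0.7


base = Cfg(model="gpt-4")
variant = replace(base, temperature=1.2)  # new instance; base is unchanged

print(base.temperature)     # 0.7
print(variant.temperature)  # 1.2
```

The original config is never mutated, which makes cloned variants safe to create per request.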
generation_params
generation_params() -> Dict[str, Any]

Return only explicitly-set generation parameters.

Providers use this to build API requests without sending unnecessary defaults. Parameters left as None are omitted, letting each provider API apply its own default.

Returns:

Type Description
Dict[str, Any]

Dictionary of generation parameter names to their values.

Dict[str, Any]

Only includes parameters that were explicitly set (non-None).

Source code in packages/llm/src/dataknobs_llm/llm/base.py
def generation_params(self) -> Dict[str, Any]:
    """Return only explicitly-set generation parameters.

    Providers use this to build API requests without sending
    unnecessary defaults. Parameters left as None are omitted,
    letting each provider API apply its own default.

    Returns:
        Dictionary of generation parameter names to their values.
        Only includes parameters that were explicitly set (non-None).
    """
    params: Dict[str, Any] = {}
    if self.temperature is not None:
        params["temperature"] = self.temperature
    if self.top_p is not None:
        params["top_p"] = self.top_p
    if self.max_tokens is not None:
        params["max_tokens"] = self.max_tokens
    if self.frequency_penalty is not None:
        params["frequency_penalty"] = self.frequency_penalty
    if self.presence_penalty is not None:
        params["presence_penalty"] = self.presence_penalty
    if self.stop_sequences is not None:
        params["stop_sequences"] = self.stop_sequences
    if self.seed is not None:
        params["seed"] = self.seed
    return params
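The None-filtering contract above can be sketched in isolation. The stand-in below uses a subset of the real parameter names and is not the actual dataclass:

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class GenParams:
    """Standalone sketch of the generation_params filtering contract."""
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    max_tokens: Optional[int] = None

    def generation_params(self) -> Dict[str, Any]:
        # Only explicitly-set (non-None) fields are sent to the provider API
        return {k: v for k, v in vars(self).items() if v is not None}


print(GenParams(temperature=0.7).generation_params())  # {'temperature': 0.7}
print(GenParams().generation_params())                 # {}
```

Leaving every field at None produces an empty dict, so the provider's own defaults apply untouched.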

LLMMessage dataclass

LLMMessage(
    role: str,
    content: str,
    name: str | None = None,
    tool_call_id: str | None = None,
    function_call: Dict[str, Any] | None = None,
    tool_calls: list[ToolCall] | None = None,
    metadata: Dict[str, Any] = dict(),
)

Represents a message in an LLM conversation.

Standard message format used across all providers. Messages are the fundamental unit of LLM interactions, containing role-based content for multi-turn conversations.

Attributes:

Name Type Description
role str

Message role - 'system', 'user', 'assistant', 'tool', or 'function'

content str

Message content text

name str | None

Optional name for function/tool messages or multi-user scenarios

tool_call_id str | None

Provider-assigned ID for pairing tool results with invocations (required by OpenAI and Anthropic APIs)

function_call Dict[str, Any] | None

Function call data for tool-using models

tool_calls list[ToolCall] | None

Tool calls made by the assistant in this message

metadata Dict[str, Any]

Additional metadata (timestamps, IDs, etc.)

Example
from dataknobs_llm.llm.base import LLMMessage

# System message
system_msg = LLMMessage(
    role="system",
    content="You are a helpful coding assistant."
)

# User message
user_msg = LLMMessage(
    role="user",
    content="How do I reverse a list in Python?"
)

# Assistant message
assistant_msg = LLMMessage(
    role="assistant",
    content="Use the reverse() method or [::-1] slicing."
)

# Function result message
function_msg = LLMMessage(
    role="function",
    name="search_docs",
    content='{"result": "Found 3 examples"}'
)

# Build conversation
messages = [system_msg, user_msg, assistant_msg]

Methods:

Name Description
to_dict

Convert to canonical dictionary format for storage/interchange.

from_dict

Create an LLMMessage from a dictionary.

Functions
to_dict
to_dict() -> Dict[str, Any]

Convert to canonical dictionary format for storage/interchange.

Only includes non-None/non-empty optional fields to keep output clean.

Returns:

Type Description
Dict[str, Any]

Dictionary with role, content, and any present optional fields.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
def to_dict(self) -> Dict[str, Any]:
    """Convert to canonical dictionary format for storage/interchange.

    Only includes non-None/non-empty optional fields to keep output clean.

    Returns:
        Dictionary with ``role``, ``content``, and any present optional fields.
    """
    d: Dict[str, Any] = {
        "role": self.role,
        "content": self.content,
    }
    if self.name is not None:
        d["name"] = self.name
    if self.tool_call_id is not None:
        d["tool_call_id"] = self.tool_call_id
    if self.tool_calls:
        d["tool_calls"] = [tc.to_dict() for tc in self.tool_calls]
    if self.function_call is not None:
        d["function_call"] = self.function_call
    if self.metadata:
        d["metadata"] = self.metadata
    return d
from_dict classmethod
from_dict(data: Dict[str, Any]) -> LLMMessage

Create an LLMMessage from a dictionary.

Handles both the new canonical format (with tool_calls as list of dicts) and the legacy format (without tool_calls/function_call).

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary with role (required), content, and optional name, tool_calls, function_call, metadata.

required

Returns:

Type Description
LLMMessage

LLMMessage instance.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "LLMMessage":
    """Create an LLMMessage from a dictionary.

    Handles both the new canonical format (with ``tool_calls`` as list of
    dicts) and the legacy format (without ``tool_calls``/``function_call``).

    Args:
        data: Dictionary with ``role`` (required), ``content``, and
            optional ``name``, ``tool_calls``, ``function_call``, ``metadata``.

    Returns:
        LLMMessage instance.
    """
    tool_calls = None
    if data.get("tool_calls"):
        tool_calls = [ToolCall.from_dict(tc) for tc in data["tool_calls"]]
    return cls(
        role=data["role"],
        content=data.get("content", ""),
        name=data.get("name"),
        tool_call_id=data.get("tool_call_id"),
        tool_calls=tool_calls,
        function_call=data.get("function_call"),
        metadata=data.get("metadata", {}),
    )
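A round trip through `to_dict`/`from_dict` preserves only the fields that were set. Sketched here with a minimal stand-in dataclass (a hypothetical mimic covering a subset of `LLMMessage`'s fields):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class Msg:
    """Minimal stand-in for LLMMessage's dict round trip."""
    role: str
    content: str
    name: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        d: Dict[str, Any] = {"role": self.role, "content": self.content}
        if self.name is not None:      # unset optionals are omitted
            d["name"] = self.name
        if self.metadata:              # empty metadata is omitted
            d["metadata"] = self.metadata
        return d

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Msg":
        return cls(
            role=data["role"],
            content=data.get("content", ""),
            name=data.get("name"),
            metadata=data.get("metadata", {}),
        )


msg = Msg(role="user", content="hi")
assert Msg.from_dict(msg.to_dict()) == msg  # lossless round trip
assert "name" not in msg.to_dict()          # unset optionals stay absent
```

Because omitted optionals deserialize back to their defaults, the round trip is lossless while the stored dicts stay compact.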

LLMResponse dataclass

LLMResponse(
    content: str,
    model: str,
    finish_reason: str | None = None,
    usage: Dict[str, int] | None = None,
    function_call: Dict[str, Any] | None = None,
    tool_calls: list[ToolCall] | None = None,
    metadata: Dict[str, Any] = dict(),
    created_at: datetime = datetime.now(),
    cost_usd: float | None = None,
    cumulative_cost_usd: float | None = None,
)

Response from LLM.

Standard response format returned by all LLM providers. Contains the generated content along with metadata about token usage, cost, and completion status.

Attributes:

Name Type Description
content str

Generated text content

model str

Model identifier that generated the response

finish_reason str | None

Why generation stopped - 'stop', 'length', 'function_call'

usage Dict[str, int] | None

Token usage stats (prompt_tokens, completion_tokens, total_tokens)

function_call Dict[str, Any] | None

Function call data if model requested tool use

metadata Dict[str, Any]

Provider-specific metadata

created_at datetime

Response timestamp

cost_usd float | None

Estimated cost in USD for this request

cumulative_cost_usd float | None

Running total cost for conversation

Example
from dataknobs_llm import create_llm_provider

llm = create_llm_provider("openai", model="gpt-4")
response = await llm.complete("What is Python?")

# Access response data
print(response.content)
# => "Python is a high-level programming language..."

# Check token usage
print(f"Tokens used: {response.usage['total_tokens']}")
# => Tokens used: 87

# Monitor costs
if response.cost_usd:
    print(f"Cost: ${response.cost_usd:.4f}")
    print(f"Total: ${response.cumulative_cost_usd:.4f}")

# Check completion status
if response.finish_reason == "length":
    print("Response truncated due to max_tokens limit")
See Also

LLMMessage: Request message format
LLMStreamResponse: Streaming response format

LLMStreamResponse dataclass

LLMStreamResponse(
    delta: str,
    is_final: bool = False,
    finish_reason: str | None = None,
    usage: Dict[str, int] | None = None,
    tool_calls: list[ToolCall] | None = None,
    model: str | None = None,
    metadata: Dict[str, Any] = dict(),
)

Streaming response from LLM.

Represents a single chunk in a streaming LLM response. Streaming allows displaying generated text incrementally as it's produced, providing a better user experience for long responses.

Attributes:

Name Type Description
delta str

Incremental content for this chunk (not cumulative)

is_final bool

True if this is the last chunk in the stream

finish_reason str | None

Why generation stopped (only set on final chunk)

usage Dict[str, int] | None

Token usage stats (only set on final chunk)

tool_calls list[ToolCall] | None

Tool calls requested by the model (only set on final chunk)

model str | None

Model identifier (only set on final chunk)

metadata Dict[str, Any]

Additional chunk metadata

Example
from dataknobs_llm import create_llm_provider

llm = create_llm_provider("openai", model="gpt-4")

# Stream and display in real-time
async for chunk in llm.stream_complete("Write a poem"):
    print(chunk.delta, end="", flush=True)

    if chunk.is_final:
        print(f"\n\nFinished: {chunk.finish_reason}")
        print(f"Tokens: {chunk.usage['total_tokens']}")

# Accumulate full response
full_text = ""
chunks_received = 0

async for chunk in llm.stream_complete("Explain Python"):
    full_text += chunk.delta
    chunks_received += 1

    # Optional: show progress
    if chunks_received % 10 == 0:
        print(f"Received {chunks_received} chunks...")

print(f"\nComplete response ({len(full_text)} chars)")
print(full_text)
See Also

LLMResponse: Non-streaming response format
AsyncLLMProvider.stream_complete: Streaming method

CompletionMode

Bases: Enum

LLM completion modes.

Defines the operation mode for LLM requests. Different modes use different APIs and formatting requirements.

Attributes:

Name Type Description
CHAT

Chat completion with conversational message history

TEXT

Raw text completion (legacy models)

INSTRUCT

Instruction-following mode

EMBEDDING

Generate vector embeddings for semantic search

FUNCTION

Function/tool calling mode

Example
from dataknobs_llm.llm.base import LLMConfig, CompletionMode

# Chat mode (default for modern models)
config = LLMConfig(
    provider="openai",
    model="gpt-4",
    mode=CompletionMode.CHAT
)

# Embedding mode for vector search
embedding_config = LLMConfig(
    provider="openai",
    model="text-embedding-ada-002",
    mode=CompletionMode.EMBEDDING
)

ModelCapability

Bases: Enum

Model capabilities.

Enumerates the capabilities that different LLM models support. Providers use this to advertise what features are available for a specific model.

Attributes:

Name Type Description
TEXT_GENERATION

Basic text generation

CHAT

Multi-turn conversational interactions

EMBEDDINGS

Vector embedding generation

FUNCTION_CALLING

Tool/function calling support

VISION

Image understanding capabilities

CODE

Code generation and analysis

JSON_MODE

Structured JSON output

STREAMING

Incremental response streaming

Example
from dataknobs_llm import create_llm_provider
from dataknobs_llm.llm.base import ModelCapability

# Check model capabilities
llm = create_llm_provider("openai", model="gpt-4")
capabilities = llm.get_capabilities()

if ModelCapability.STREAMING in capabilities:
    # Use streaming
    async for chunk in llm.stream_complete("Hello"):
        print(chunk.delta, end="")

if ModelCapability.FUNCTION_CALLING in capabilities:
    # Use function calling
    response = await llm.function_call(messages, functions)

OpenAIProvider

OpenAIProvider(
    config: Union[LLMConfig, Config, Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None,
)

Bases: AsyncLLMProvider

OpenAI LLM provider with full API support.

Provides async access to OpenAI's chat, completion, embedding, and function calling APIs. Supports all GPT models including GPT-4, GPT-3.5, and specialized models (vision, embeddings).

Features
  • Full GPT-4 and GPT-3.5-turbo support
  • Streaming responses for real-time output
  • Function calling for tool use
  • JSON mode for structured outputs
  • Embeddings for semantic search
  • Custom API endpoints (e.g., Azure OpenAI)
  • Automatic retry with rate limiting
  • Cost tracking
Example
from dataknobs_llm.llm.providers import OpenAIProvider
from dataknobs_llm.llm.base import LLMConfig, LLMMessage

# Basic usage
config = LLMConfig(
    provider="openai",
    model="gpt-4",
    api_key="sk-...",
    temperature=0.7
)

async with OpenAIProvider(config) as llm:
    # Simple question
    response = await llm.complete("Explain async/await")
    print(response.content)

    # Multi-turn conversation
    messages = [
        LLMMessage(role="system", content="You are a coding tutor"),
        LLMMessage(role="user", content="How do I use asyncio?")
    ]
    response = await llm.complete(messages)

# JSON mode for structured output
json_config = LLMConfig(
    provider="openai",
    model="gpt-4",
    response_format="json",
    system_prompt="Return JSON only"
)

llm = OpenAIProvider(json_config)
await llm.initialize()
response = await llm.complete(
    "List 3 Python libraries as JSON: {name, description}"
)
import json
data = json.loads(response.content)

# With Azure OpenAI
azure_config = LLMConfig(
    provider="openai",
    model="gpt-4",
    api_base="https://your-resource.openai.azure.com/",
    api_key="azure-key"
)

# Function calling
functions = [{
    "name": "search",
    "description": "Search for information",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string"}
        }
    }
}]

response = await llm.function_call(messages, functions)
if response.function_call:
    print(f"Call: {response.function_call['name']}")

Parameters:

Name Type Description Default
config Union[LLMConfig, Config, Dict[str, Any]]

LLMConfig, dataknobs Config, or dict with provider settings

required
prompt_builder AsyncPromptBuilder | None

Optional AsyncPromptBuilder for prompt rendering

None

Attributes:

Name Type Description
adapter OpenAIAdapter

Format adapter for OpenAI API

_client

OpenAI AsyncOpenAI client instance

See Also

LLMConfig: Configuration options
AsyncLLMProvider: Base provider interface
OpenAIAdapter: Format conversion

Methods:

Name Description
initialize

Initialize OpenAI client.

validate_model

Validate model availability.

complete

Generate completion.

stream_complete

Generate streaming completion.

embed

Generate embeddings.

function_call

Execute function calling.

Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
def __init__(
    self,
    config: Union[LLMConfig, "Config", Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None
):
    # Normalize config first
    llm_config = normalize_llm_config(config)
    super().__init__(llm_config, prompt_builder=prompt_builder)
    self.adapter = OpenAIAdapter()
Functions
initialize async
initialize() -> None

Initialize OpenAI client.

Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
async def initialize(self) -> None:
    """Initialize OpenAI client."""
    try:
        import openai

        api_key = self.config.api_key or os.environ.get('OPENAI_API_KEY')
        if not api_key:
            raise ValueError("OpenAI API key not provided")

        self._client = openai.AsyncOpenAI(
            api_key=api_key,
            base_url=self.config.api_base,
            timeout=self.config.timeout
        )
        self._is_initialized = True
    except ImportError as e:
        raise ImportError("openai package not installed. Install with: pip install openai") from e
validate_model async
validate_model() -> bool

Validate model availability.

Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
async def validate_model(self) -> bool:
    """Validate model availability."""
    try:
        # List available models
        models = await self._client.models.list()
        model_ids = [m.id for m in models.data]
        return self.config.model in model_ids
    except Exception:
        return False
complete async
complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> LLMResponse

Generate completion.

Parameters:

Name Type Description Default
messages Union[str, List[LLMMessage]]

Input messages or prompt

required
config_overrides Dict[str, Any] | None

Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed)

None
tools list[Any] | None

Optional list of Tool objects for function calling

None
**kwargs Any

Additional provider-specific parameters

{}
Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
async def complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any
) -> LLMResponse:
    """Generate completion.

    Args:
        messages: Input messages or prompt
        config_overrides: Optional dict to override config fields (model,
            temperature, max_tokens, top_p, stop_sequences, seed)
        tools: Optional list of Tool objects for function calling
        **kwargs: Additional provider-specific parameters
    """
    if not self._is_initialized:
        await self.initialize()

    # Get runtime config (with overrides applied if provided)
    runtime_config = self._get_runtime_config(config_overrides)

    # Convert string to message list
    if isinstance(messages, str):
        messages = [LLMMessage(role='user', content=messages)]

    # Add system prompt if configured
    if runtime_config.system_prompt and messages[0].role != 'system':
        messages.insert(0, LLMMessage(role='system', content=runtime_config.system_prompt))

    # Adapt messages and config
    adapted_messages = self.adapter.adapt_messages(messages)
    params = self.adapter.adapt_config(runtime_config)
    params.update(kwargs)

    # Handle tools if provided
    if tools:
        params["tools"] = self.adapter.adapt_tools(tools)

    # Make API call
    response = await self._client.chat.completions.create(
        messages=adapted_messages,
        **params
    )

    return self._analyze_response(self.adapter.adapt_response(response))
stream_complete async
stream_complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]

Generate streaming completion.

Parameters:

Name Type Description Default
messages Union[str, List[LLMMessage]]

Input messages or prompt

required
config_overrides Dict[str, Any] | None

Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed)

None
tools list[Any] | None

Optional list of Tool objects for function calling.

None
**kwargs Any

Additional provider-specific parameters

{}
Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
async def stream_complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMStreamResponse]:
    """Generate streaming completion.

    Args:
        messages: Input messages or prompt
        config_overrides: Optional dict to override config fields (model,
            temperature, max_tokens, top_p, stop_sequences, seed)
        tools: Optional list of Tool objects for function calling.
        **kwargs: Additional provider-specific parameters
    """
    if not self._is_initialized:
        await self.initialize()

    # Get runtime config (with overrides applied if provided)
    runtime_config = self._get_runtime_config(config_overrides)

    # Convert string to message list
    if isinstance(messages, str):
        messages = [LLMMessage(role='user', content=messages)]

    # Add system prompt if configured
    if runtime_config.system_prompt and messages[0].role != 'system':
        messages.insert(0, LLMMessage(role='system', content=runtime_config.system_prompt))

    # Adapt messages and config
    adapted_messages = self.adapter.adapt_messages(messages)
    params = self.adapter.adapt_config(runtime_config)
    params['stream'] = True
    params.update(kwargs)

    # Handle tools if provided
    if tools:
        params["tools"] = self.adapter.adapt_tools(tools)

    # Stream API call
    stream = await self._client.chat.completions.create(
        messages=adapted_messages,
        **params
    )

    # Accumulate tool call deltas across chunks. OpenAI sends them
    # incrementally via delta.tool_calls[i].index.
    tool_call_accumulators: dict[int, dict[str, Any]] = {}

    async for chunk in stream:
        choice = chunk.choices[0] if chunk.choices else None
        if not choice:
            continue

        delta = choice.delta
        finish_reason = choice.finish_reason

        # Accumulate tool call deltas
        if delta.tool_calls:
            for tc_delta in delta.tool_calls:
                idx = tc_delta.index
                if idx not in tool_call_accumulators:
                    tool_call_accumulators[idx] = {
                        "id": "",
                        "name": "",
                        "arguments": "",
                    }
                acc = tool_call_accumulators[idx]
                if tc_delta.id:
                    acc["id"] += tc_delta.id
                if tc_delta.function:
                    if tc_delta.function.name:
                        acc["name"] += tc_delta.function.name
                    if tc_delta.function.arguments:
                        acc["arguments"] += tc_delta.function.arguments

        # Yield content chunks
        content = delta.content or ""
        if content or finish_reason is not None:
            # Build tool_calls on final chunk
            accumulated_tool_calls = None
            if finish_reason is not None and tool_call_accumulators:
                accumulated_tool_calls = [
                    ToolCall(
                        name=acc["name"],
                        parameters=json.loads(acc["arguments"])
                        if acc["arguments"]
                        else {},
                        id=acc["id"] or None,
                    )
                    for _, acc in sorted(tool_call_accumulators.items())
                ]

            yield LLMStreamResponse(
                delta=content,
                is_final=finish_reason is not None,
                finish_reason=finish_reason,
                tool_calls=accumulated_tool_calls,
                model=runtime_config.model if finish_reason is not None else None,
            )
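The delta-accumulation loop above can be studied in isolation. A minimal stdlib-only sketch, using plain dicts as hypothetical stand-ins for the OpenAI SDK's delta objects:

```python
import json

def accumulate_tool_calls(chunks):
    """Merge incremental tool-call deltas keyed by index, mirroring the
    accumulator loop in stream_complete. Each chunk is a dict like
    {"index": 0, "id": "...", "name": "...", "arguments": "..."}."""
    acc = {}
    for d in chunks:
        slot = acc.setdefault(d["index"], {"id": "", "name": "", "arguments": ""})
        for key in ("id", "name", "arguments"):
            if d.get(key):
                slot[key] += d[key]
    # Build final calls in index order, parsing the accumulated JSON string
    return [
        {
            "name": a["name"],
            "parameters": json.loads(a["arguments"]) if a["arguments"] else {},
        }
        for _, a in sorted(acc.items())
    ]

deltas = [
    {"index": 0, "id": "call_1", "name": "get_weather", "arguments": ""},
    {"index": 0, "arguments": '{"loca'},
    {"index": 0, "arguments": 'tion": "Paris"}'},
]
calls = accumulate_tool_calls(deltas)
```

The key detail is that the `arguments` field arrives as JSON text split across arbitrary chunk boundaries, so it must be concatenated before parsing.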
embed async
embed(
    texts: Union[str, List[str]], **kwargs
) -> Union[List[float], List[List[float]]]

Generate embeddings.

Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
async def embed(
    self,
    texts: Union[str, List[str]],
    **kwargs
) -> Union[List[float], List[List[float]]]:
    """Generate embeddings."""
    if not self._is_initialized:
        await self.initialize()

    if isinstance(texts, str):
        texts = [texts]
        single = True
    else:
        single = False

    response = await self._client.embeddings.create(
        input=texts,
        model=self.config.model or 'text-embedding-ada-002'
    )

    embeddings = [e.embedding for e in response.data]
    return embeddings[0] if single else embeddings
function_call async
function_call(
    messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs
) -> LLMResponse

Execute function calling.

Source code in packages/llm/src/dataknobs_llm/llm/providers/openai.py
async def function_call(
    self,
    messages: List[LLMMessage],
    functions: List[Dict[str, Any]],
    **kwargs
) -> LLMResponse:
    """Execute function calling."""
    warnings.warn("function_call() is deprecated, use complete(tools=...) instead", DeprecationWarning, stacklevel=2)
    if not self._is_initialized:
        await self.initialize()

    # Add system prompt if configured
    if self.config.system_prompt and messages[0].role != 'system':
        messages.insert(0, LLMMessage(role='system', content=self.config.system_prompt))

    # Adapt messages and config
    adapted_messages = self.adapter.adapt_messages(messages)
    params = self.adapter.adapt_config(self.config)
    params['functions'] = functions
    params['function_call'] = kwargs.get('function_call', 'auto')
    params.update(kwargs)

    # Make API call
    response = await self._client.chat.completions.create(
        messages=adapted_messages,
        **params
    )

    return self._analyze_response(self.adapter.adapt_response(response))
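Since `function_call()` is deprecated in favor of `complete(tools=...)`, legacy function dicts may need converting to the newer OpenAI tools schema. A hedged helper (hypothetical, not part of this package) showing the wrapping:

```python
def functions_to_tools(functions):
    """Wrap legacy function-calling dicts in the newer OpenAI tools schema:
    each {"name", "description", "parameters"} entry becomes
    {"type": "function", "function": {...}}."""
    return [{"type": "function", "function": f} for f in functions]

legacy = [{
    "name": "get_time",
    "description": "Get the current time",
    "parameters": {"type": "object", "properties": {}},
}]
tools = functions_to_tools(legacy)
```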

AnthropicProvider

AnthropicProvider(
    config: Union[LLMConfig, Config, Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None,
)

Bases: AsyncLLMProvider

Anthropic Claude LLM provider with full API support.

Provides async access to Anthropic's Claude models including Claude 3 (Opus, Sonnet, Haiku) and Claude 2. Supports advanced features like native tool use, vision, and extended context windows.

Features
  • Claude 3 Opus/Sonnet/Haiku and Claude 2 models
  • Native tools API for function calling (Claude 3+)
  • Vision capabilities for image understanding (Claude 3+)
  • Streaming responses for real-time output
  • Long context windows (up to 200k tokens)
  • Advanced reasoning and coding capabilities
  • System prompts for behavior control
  • JSON output mode
Example
from dataknobs_llm.llm.providers import AnthropicProvider
from dataknobs_llm.llm.base import LLMConfig, LLMMessage

# Basic usage
config = LLMConfig(
    provider="anthropic",
    model="claude-3-sonnet-20240229",
    api_key="sk-ant-...",
    temperature=0.7,
    max_tokens=1024
)

async with AnthropicProvider(config) as llm:
    # Simple completion
    response = await llm.complete("Explain machine learning")
    print(response.content)

    # With system prompt
    messages = [
        LLMMessage(
            role="system",
            content="You are an expert Python tutor"
        ),
        LLMMessage(
            role="user",
            content="How do I use decorators?"
        )
    ]
    response = await llm.complete(messages)

# Long context processing (Claude 3+)
long_config = LLMConfig(
    provider="anthropic",
    model="claude-3-opus-20240229",
    max_tokens=4096
)

llm = AnthropicProvider(long_config)
await llm.initialize()

# Process large document
with open("large_doc.txt") as f:
    long_text = f.read()  # Up to 200k tokens!

response = await llm.complete(
    f"Summarize this document:\n\n{long_text}"
)

# Tool use / function calling (Claude 3+)
tools = [
    {
        "name": "web_search",
        "description": "Search the web for information",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query"
                },
                "num_results": {
                    "type": "integer",
                    "description": "Number of results"
                }
            },
            "required": ["query"]
        }
    }
]

messages = [
    LLMMessage(
        role="user",
        content="Search for latest AI news"
    )
]

response = await llm.function_call(messages, tools)
if response.function_call:
    import json
    tool_input = json.loads(response.function_call["arguments"])
    print(f"Tool: {response.function_call['name']}")
    print(f"Input: {tool_input}")

Parameters:

Name Type Description Default
config Union[LLMConfig, Config, Dict[str, Any]]

LLMConfig, dataknobs Config, or dict with provider settings

required
prompt_builder AsyncPromptBuilder | None

Optional AsyncPromptBuilder for prompt rendering

None

Attributes:

Name Type Description
_client

Anthropic AsyncAnthropic client instance

See Also

  • LLMConfig: Configuration options
  • AsyncLLMProvider: Base provider interface
  • Anthropic API Docs: https://docs.anthropic.com/

Methods:

Name Description
initialize

Initialize Anthropic client.

validate_model

Validate model availability.

complete

Generate completion.

stream_complete

Generate streaming completion.

embed

Anthropic doesn't provide embeddings.

function_call

Execute function calling with native Anthropic tools API (Claude 3+).

Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
def __init__(
    self,
    config: Union[LLMConfig, "Config", Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None
):
    # Normalize config first
    llm_config = normalize_llm_config(config)
    super().__init__(llm_config, prompt_builder=prompt_builder)
    self.adapter = AnthropicAdapter()
Functions
initialize async
initialize() -> None

Initialize Anthropic client.

Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
async def initialize(self) -> None:
    """Initialize Anthropic client."""
    try:
        import anthropic

        api_key = self.config.api_key or os.environ.get('ANTHROPIC_API_KEY')
        if not api_key:
            raise ValueError("Anthropic API key not provided")

        self._client = anthropic.AsyncAnthropic(
            api_key=api_key,
            base_url=self.config.api_base,
            timeout=self.config.timeout
        )
        self._is_initialized = True
    except ImportError as e:
        raise ImportError("anthropic package not installed. Install with: pip install anthropic") from e
validate_model async
validate_model() -> bool

Validate model availability.

Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
async def validate_model(self) -> bool:
    """Validate model availability."""
    valid_models = [
        'claude-3-opus', 'claude-3-sonnet', 'claude-3-haiku',
        'claude-2.1', 'claude-2.0', 'claude-instant-1.2'
    ]
    return any(m in self.config.model for m in valid_models)
complete async
complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> LLMResponse

Generate completion.

Parameters:

Name Type Description Default
messages Union[str, List[LLMMessage]]

Input messages or prompt

required
config_overrides Dict[str, Any] | None

Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed)

None
tools list[Any] | None

Optional list of Tool objects for function calling

None
**kwargs Any

Additional provider-specific parameters

{}
Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
async def complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any
) -> LLMResponse:
    """Generate completion.

    Args:
        messages: Input messages or prompt
        config_overrides: Optional dict to override config fields (model,
            temperature, max_tokens, top_p, stop_sequences, seed)
        tools: Optional list of Tool objects for function calling
        **kwargs: Additional provider-specific parameters
    """
    if not self._is_initialized:
        await self.initialize()

    # Get runtime config (with overrides applied if provided)
    runtime_config = self._get_runtime_config(config_overrides)

    # Convert to Anthropic format
    if isinstance(messages, str):
        msg_list = [LLMMessage(role="user", content=messages)]
    else:
        msg_list = messages

    system_content, anthropic_messages = self.adapter.adapt_messages(
        msg_list, system_prompt=self.config.system_prompt,
    )

    # Build API call kwargs
    api_kwargs = self.adapter.adapt_config(runtime_config)
    api_kwargs["messages"] = anthropic_messages
    if system_content:
        api_kwargs["system"] = system_content

    # Handle tools if provided
    if tools:
        api_kwargs["tools"] = self.adapter.adapt_tools(tools)

    # Make API call
    response = await self._client.messages.create(**api_kwargs)

    return self._analyze_response(self.adapter.adapt_response(response))
stream_complete async
stream_complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]

Generate streaming completion.

Parameters:

Name Type Description Default
messages Union[str, List[LLMMessage]]

Input messages or prompt

required
config_overrides Dict[str, Any] | None

Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed)

None
tools list[Any] | None

Optional list of Tool objects for function calling.

None
**kwargs Any

Additional provider-specific parameters

{}
Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
async def stream_complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMStreamResponse]:
    """Generate streaming completion.

    Args:
        messages: Input messages or prompt
        config_overrides: Optional dict to override config fields (model,
            temperature, max_tokens, top_p, stop_sequences, seed)
        tools: Optional list of Tool objects for function calling.
        **kwargs: Additional provider-specific parameters
    """
    if not self._is_initialized:
        await self.initialize()

    # Get runtime config (with overrides applied if provided)
    runtime_config = self._get_runtime_config(config_overrides)

    # Convert to Anthropic format
    if isinstance(messages, str):
        msg_list = [LLMMessage(role="user", content=messages)]
    else:
        msg_list = messages

    system_content, anthropic_messages = self.adapter.adapt_messages(
        msg_list, system_prompt=self.config.system_prompt,
    )

    # Build stream kwargs
    stream_kwargs = self.adapter.adapt_config(runtime_config)
    stream_kwargs["messages"] = anthropic_messages
    if system_content:
        stream_kwargs["system"] = system_content

    # Handle tools if provided
    if tools:
        stream_kwargs["tools"] = self.adapter.adapt_tools(tools)

    # Stream API call
    async with self._client.messages.stream(**stream_kwargs) as stream:
        async for chunk in stream:
            if chunk.type == 'content_block_delta':
                if hasattr(chunk.delta, 'text'):
                    yield LLMStreamResponse(
                        delta=chunk.delta.text,
                        is_final=False
                    )

        # Final message — use adapter to parse content blocks
        message = await stream.get_final_message()
        parsed = self.adapter.adapt_response(message)

        yield LLMStreamResponse(
            delta='',
            is_final=True,
            finish_reason=parsed.finish_reason,
            tool_calls=parsed.tool_calls,
            model=runtime_config.model,
        )
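Callers typically join the streamed deltas into the full completion text. A minimal consumption sketch, with a fake async generator yielding plain dicts as a hypothetical stand-in for `LLMStreamResponse`:

```python
import asyncio

async def fake_stream():
    # Stand-in for stream_complete(): content deltas, then a final marker
    for piece in ("Hel", "lo", ", world"):
        yield {"delta": piece, "is_final": False}
    yield {"delta": "", "is_final": True, "finish_reason": "end_turn"}

async def collect(stream):
    """Join streamed deltas into the full completion text."""
    parts = []
    async for chunk in stream:
        parts.append(chunk["delta"])
        if chunk["is_final"]:
            break
    return "".join(parts)

text = asyncio.run(collect(fake_stream()))
```

Note that the final chunk carries metadata (`finish_reason`, tool calls) rather than content, so its empty delta is harmless to append.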
embed async
embed(
    texts: Union[str, List[str]], **kwargs
) -> Union[List[float], List[List[float]]]

Anthropic doesn't provide embeddings.

Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
async def embed(
    self,
    texts: Union[str, List[str]],
    **kwargs
) -> Union[List[float], List[List[float]]]:
    """Anthropic doesn't provide embeddings."""
    raise NotImplementedError("Anthropic doesn't provide embedding models")
function_call async
function_call(
    messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs: Any
) -> LLMResponse

Execute function calling with native Anthropic tools API (Claude 3+).

Source code in packages/llm/src/dataknobs_llm/llm/providers/anthropic.py
    async def function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs: Any
    ) -> LLMResponse:
        """Execute function calling with native Anthropic tools API (Claude 3+)."""
        warnings.warn(
            "function_call() is deprecated, use complete(tools=...) instead",
            DeprecationWarning,
            stacklevel=2,
        )
        if not self._is_initialized:
            await self.initialize()

        system_content, anthropic_messages = self.adapter.adapt_messages(
            messages, system_prompt=self.config.system_prompt,
        )

        # function_call() receives raw dicts, not Tool objects — delegate
        # to the adapter's raw function converter.
        tools = self.adapter.adapt_raw_functions(functions)

        try:
            fc_kwargs = self.adapter.adapt_config(self.config)
            fc_kwargs["messages"] = anthropic_messages
            fc_kwargs["tools"] = tools
            if system_content:
                fc_kwargs["system"] = system_content
            response = await self._client.messages.create(**fc_kwargs)

            parsed = self.adapter.adapt_response(response)

            # Legacy function_call format: extract first tool call as
            # function_call dict for backward compatibility.
            tool_use = None
            if parsed.tool_calls:
                tc = parsed.tool_calls[0]
                tool_use = {"name": tc.name, "arguments": tc.parameters}

            return LLMResponse(
                content=parsed.content,
                model=parsed.model,
                finish_reason=parsed.finish_reason,
                usage=parsed.usage,
                function_call=tool_use,
            )

        except Exception as e:
            # Fallback to prompt-based approach for older models
            logger.warning(
                "Anthropic native tools failed, falling back to prompt-based: %s", e,
            )

            function_descriptions = "\n".join([
                f"- {f['name']}: {f['description']}"
                for f in functions
            ])

            system_prompt = f"""You have access to the following functions:
{function_descriptions}

When you need to call a function, respond with:
FUNCTION_CALL: {{
    "name": "function_name",
    "arguments": {{...}}
}}"""

            messages_with_system = [
                LLMMessage(role="system", content=system_prompt)
            ] + list(messages)

            response = await self.complete(messages_with_system, **kwargs)

            # Parse function call from response
            if "FUNCTION_CALL:" in response.content:
                try:
                    func_json = response.content.split("FUNCTION_CALL:")[1].strip()
                    function_call = json.loads(func_json)
                    response.function_call = function_call
                except (json.JSONDecodeError, IndexError):
                    pass

            return response
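The prompt-based fallback above recovers a function call by parsing a `FUNCTION_CALL:` marker out of raw text. That parse step in isolation:

```python
import json

def parse_function_call(content):
    """Extract the JSON object following a FUNCTION_CALL: marker, as in the
    prompt-based fallback. Returns None when absent or malformed."""
    if "FUNCTION_CALL:" not in content:
        return None
    try:
        return json.loads(content.split("FUNCTION_CALL:")[1].strip())
    except (json.JSONDecodeError, IndexError):
        return None

reply = 'Sure.\nFUNCTION_CALL: {"name": "web_search", "arguments": {"query": "AI news"}}'
call = parse_function_call(reply)
```

As in the source, malformed JSON is swallowed rather than raised, so callers should treat a missing `function_call` as "no call requested".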

OllamaProvider

OllamaProvider(
    config: Union[LLMConfig, Config, Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None,
)

Bases: AsyncLLMProvider

Ollama local LLM provider for privacy-first, offline LLM usage.

Provides async access to locally-hosted Ollama models, enabling on-premise LLM deployment without cloud APIs. Perfect for sensitive data, air-gapped environments, and cost optimization.

Features
  • All Ollama models (Llama 2/3, Mistral, Phi, CodeLlama, etc.)
  • No API key required - fully local
  • Chat with message history
  • Streaming responses for real-time output
  • Embeddings for RAG and semantic search
  • Tool/function calling (Ollama 0.1.17+)
  • Vision models (LLaVA, bakllava)
  • Docker environment auto-detection
  • Custom model parameters (temperature, top_p, seed)
  • Zero-cost inference
Example
from dataknobs_llm.llm.providers import OllamaProvider
from dataknobs_llm.llm.base import LLMConfig, LLMMessage

# Basic local usage
config = LLMConfig(
    provider="ollama",
    model="llama2",  # or llama3, mistral, phi, etc.
    temperature=0.7
)

async with OllamaProvider(config) as llm:
    # Simple completion
    response = await llm.complete("Explain decorators in Python")
    print(response.content)

    # Multi-turn conversation
    messages = [
        LLMMessage(role="system", content="You are a helpful assistant"),
        LLMMessage(role="user", content="What is recursion?"),
        LLMMessage(role="assistant", content="Recursion is..."),
        LLMMessage(role="user", content="Show me an example")
    ]
    response = await llm.complete(messages)

# Code generation with CodeLlama
code_config = LLMConfig(
    provider="ollama",
    model="codellama",
    temperature=0.2,  # Lower for more deterministic code
    max_tokens=500
)

llm = OllamaProvider(code_config)
await llm.initialize()
response = await llm.complete(
    "Write a Python function to merge two sorted lists"
)
print(response.content)

# Remote Ollama server
remote_config = LLMConfig(
    provider="ollama",
    model="llama2",
    api_base="http://192.168.1.100:11434"  # Remote server
)

# Docker usage (auto-detects)
# In Docker, automatically uses host.docker.internal
docker_config = LLMConfig(
    provider="ollama",
    model="mistral"
)

# Vision model with image input
from dataknobs_llm.llm.base import LLMMessage
import base64

with open("image.jpg", "rb") as f:
    image_data = base64.b64encode(f.read()).decode()

vision_config = LLMConfig(
    provider="ollama",
    model="llava"  # or bakllava
)

llm = OllamaProvider(vision_config)
await llm.initialize()

messages = [
    LLMMessage(
        role="user",
        content="What objects are in this image?",
        metadata={"images": [image_data]}
    )
]

response = await llm.complete(messages)
print(response.content)

# Embeddings for RAG
embed_config = LLMConfig(
    provider="ollama",
    model="nomic-embed-text"  # or mxbai-embed-large
)

llm = OllamaProvider(embed_config)
await llm.initialize()

# Single embedding
embedding = await llm.embed("Sample text")
print(f"Dimensions: {len(embedding)}")

# Batch embeddings
texts = [
    "Python programming",
    "Machine learning basics",
    "Web development with Flask"
]
embeddings = await llm.embed(texts)
print(f"Generated {len(embeddings)} embeddings")

# Tool use (Ollama 0.1.17+)
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    }
]

response = await llm.function_call(messages, tools)

Parameters:

Name Type Description Default
config Union[LLMConfig, Config, Dict[str, Any]]

LLMConfig, dataknobs Config, or dict with provider settings

required
prompt_builder AsyncPromptBuilder | None

Optional AsyncPromptBuilder for prompt rendering

None

Attributes:

Name Type Description
base_url str

Ollama API base URL (auto-detects Docker environment)

_client

HTTP client for Ollama API

See Also

  • LLMConfig: Configuration options
  • AsyncLLMProvider: Base provider interface
  • Ollama Documentation: https://ollama.ai

Methods:

Name Description
initialize

Initialize Ollama client.

validate_model

Validate model availability.

complete

Generate completion using Ollama chat endpoint.

stream_complete

Generate streaming completion using Ollama chat endpoint.

embed

Generate embeddings.

function_call

Execute function calling with native Ollama tools support.

Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
def __init__(
    self,
    config: Union[LLMConfig, "Config", Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None
):
    # Normalize config first
    llm_config = normalize_llm_config(config)
    super().__init__(llm_config, prompt_builder=prompt_builder)

    self.adapter = OllamaAdapter()

    # Check for Docker environment and adjust URL accordingly
    default_url = 'http://localhost:11434'
    if os.path.exists('/.dockerenv'):
        # Running in Docker, use host.docker.internal
        default_url = 'http://host.docker.internal:11434'

    # Allow environment variable override
    self.base_url = llm_config.api_base or os.environ.get('OLLAMA_BASE_URL', default_url)
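The base-URL resolution above has a clear precedence: explicit `api_base`, then the `OLLAMA_BASE_URL` environment variable, then a Docker-aware default. A pure-function sketch of that order:

```python
def resolve_ollama_url(api_base=None, env_url=None, in_docker=False):
    """Mirror the provider's precedence: explicit config wins, then the
    OLLAMA_BASE_URL environment variable, then a Docker-aware default
    (host.docker.internal when /.dockerenv exists)."""
    default = (
        "http://host.docker.internal:11434"
        if in_docker
        else "http://localhost:11434"
    )
    return api_base or env_url or default
```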
Functions
initialize async
initialize() -> None

Initialize Ollama client.

Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
async def initialize(self) -> None:
    """Initialize Ollama client."""
    try:
        import aiohttp
        connector = aiohttp.TCPConnector(force_close=True)
        self._session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=self.config.timeout or 30.0),
        )

        # Test connection and verify model availability
        try:
            async with self._session.get(f"{self.base_url}/api/tags") as response:
                if response.status == 200:
                    data = await response.json()
                    models = [m['name'] for m in data.get('models', [])]
                    if models:
                        # Check if configured model is available
                        matching = _find_matching_models(self.config.model, models)
                        if matching and matching[0] != self.config.model:
                            self.config.model = matching[0]
                            logger.info("Ollama: Using model %s", self.config.model)
                        elif not matching:
                            logger.warning(
                                "Ollama: Model %s not found. Available: %s",
                                self.config.model, models,
                            )
                    else:
                        logger.warning("Ollama: No models found. Please pull a model first.")
                else:
                    logger.warning("Ollama: API returned status %s", response.status)
        except Exception as e:
            logger.warning("Ollama: Could not connect to %s: %s", self.base_url, e)

        self._is_initialized = True
    except ImportError as e:
        raise ImportError("aiohttp package not installed. Install with: pip install aiohttp") from e
validate_model async
validate_model() -> bool

Validate model availability.

Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
async def validate_model(self) -> bool:
    """Validate model availability."""
    if not self._is_initialized or not hasattr(self, '_session'):
        return False

    try:
        async with self._session.get(f"{self.base_url}/api/tags") as response:
            if response.status == 200:
                data = await response.json()
                models = [m['name'] for m in data.get('models', [])]
                return bool(_find_matching_models(self.config.model, models))
    except Exception:
        return False
    return False
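`_find_matching_models` is a private helper whose implementation is not shown here. A hypothetical sketch of what tag-tolerant matching could look like, assuming Ollama's `name:tag` convention (e.g. `llama2` matching an installed `llama2:latest`):

```python
def find_matching_models(requested, available):
    """Hypothetical tag-tolerant matching: prefer an exact name, otherwise
    match any installed model whose base name (before ':') agrees with the
    requested model's base name."""
    exact = [m for m in available if m == requested]
    if exact:
        return exact
    base = requested.split(":")[0]
    return [m for m in available if m.split(":")[0] == base]

installed = ["llama2:latest", "mistral:7b"]
```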
complete async
complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> LLMResponse

Generate completion using Ollama chat endpoint.

Parameters:

Name Type Description Default
messages Union[str, List[LLMMessage]]

Input messages or prompt

required
config_overrides Dict[str, Any] | None

Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed)

None
tools list[Any] | None

Optional list of Tool objects for function calling

None
**kwargs Any

Additional provider-specific parameters

{}
Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
async def complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any
) -> LLMResponse:
    """Generate completion using Ollama chat endpoint.

    Args:
        messages: Input messages or prompt
        config_overrides: Optional dict to override config fields (model,
            temperature, max_tokens, top_p, stop_sequences, seed)
        tools: Optional list of Tool objects for function calling
        **kwargs: Additional provider-specific parameters
    """
    if not self._is_initialized:
        await self.initialize()

    # Get runtime config (with overrides applied if provided)
    runtime_config = self._get_runtime_config(config_overrides)

    # Convert to message list
    if isinstance(messages, str):
        messages = [LLMMessage(role='user', content=messages)]

    # Add system prompt if configured
    if runtime_config.system_prompt and (not messages or messages[0].role != 'system'):
        messages = [LLMMessage(role='system', content=runtime_config.system_prompt)] + list(messages)

    # Convert to Ollama format
    ollama_messages = self._messages_to_ollama(messages)

    # Build payload for chat endpoint
    payload = {
        'model': runtime_config.model,
        'messages': ollama_messages,
        'stream': False,
        'options': self._build_options(runtime_config)
    }

    # Add format if JSON mode requested
    if runtime_config.response_format == 'json':
        payload['format'] = 'json'

    # Handle tools if provided
    if tools:
        payload['tools'] = self.adapter.adapt_tools(tools)

    # Forward 'think' parameter for reasoning models (e.g. qwen3, deepseek-r1).
    # When True, the model emits <think>...</think> blocks before the answer.
    think = runtime_config.options.get('think')
    if think is not None:
        payload['think'] = bool(think)

    async with self._session.post(f"{self.base_url}/api/chat", json=payload) as response:
        if response.status != 200:
            error_text = await response.text()

            # Handle tools not supported — raise explicit error
            if response.status == 400 and "does not support tools" in error_text:
                from ...exceptions import ToolsNotSupportedError
                model_name = runtime_config.model
                raise ToolsNotSupportedError(
                    model=model_name,
                    suggestion=(
                        "For tool support, use: llama3.1:8b, qwen3:8b, "
                        "mistral:7b, or command-r:latest"
                    ),
                )
            else:
                logger.error("Ollama API error (status %s): %s", response.status, error_text)
                logger.error("Request payload: %s", json.dumps(payload, indent=2))
                response.raise_for_status()
        else:
            data = await response.json()

    parsed = self.adapter.adapt_response(data)
    # Override model with runtime config model (adapter uses response model)
    parsed.model = runtime_config.model
    return self._analyze_response(parsed)
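The request body `complete()` posts to `/api/chat` can be isolated as a pure function. This sketch mirrors the payload assembly in the listing above (field names taken from the source; `build_chat_payload` itself is illustrative, not part of the package API):

```python
def build_chat_payload(model, messages, options,
                       response_format=None, tools=None, think=None):
    """Assemble the JSON body complete() posts to /api/chat (sketch)."""
    payload = {
        "model": model,
        "messages": messages,   # [{"role": ..., "content": ...}, ...]
        "stream": False,
        "options": options,     # e.g. {"temperature": 0.2}
    }
    if response_format == "json":
        payload["format"] = "json"     # JSON mode
    if tools:
        payload["tools"] = tools       # adapted tool definitions
    if think is not None:
        payload["think"] = bool(think) # reasoning models (qwen3, deepseek-r1)
    return payload

payload = build_chat_payload(
    "qwen3:8b",
    [{"role": "user", "content": "hi"}],
    {"temperature": 0.0},
    think=True,
)
```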
stream_complete async
stream_complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]

Generate streaming completion using Ollama chat endpoint.

Uses the /api/chat endpoint with stream: true so that the model's native chat template is applied and tool calls are supported, matching the behaviour of complete().


Parameters:

Name Type Description Default
messages Union[str, List[LLMMessage]]

Input messages or prompt

required
config_overrides Dict[str, Any] | None

Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed)

None
tools list[Any] | None

Optional list of Tool objects for function calling.

None
**kwargs Any

Additional provider-specific parameters.

{}
Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
async def stream_complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMStreamResponse]:
    """Generate streaming completion using Ollama chat endpoint.

    Uses the ``/api/chat`` endpoint with ``stream: true`` so that the
    model's native chat template is applied and tool calls are supported,
    matching the behaviour of :meth:`complete`.

    Args:
        messages: Input messages or prompt
        config_overrides: Optional dict to override config fields (model,
            temperature, max_tokens, top_p, stop_sequences, seed)
        tools: Optional list of Tool objects for function calling.
        **kwargs: Additional provider-specific parameters.
    """
    if not self._is_initialized:
        await self.initialize()

    # Get runtime config (with overrides applied if provided)
    runtime_config = self._get_runtime_config(config_overrides)

    # Convert to message list
    if isinstance(messages, str):
        messages = [LLMMessage(role='user', content=messages)]

    # Add system prompt if configured
    if runtime_config.system_prompt and (not messages or messages[0].role != 'system'):
        messages = [LLMMessage(role='system', content=runtime_config.system_prompt)] + list(messages)

    # Convert to Ollama format
    ollama_messages = self._messages_to_ollama(messages)

    # Build payload for chat endpoint (mirrors complete())
    payload: Dict[str, Any] = {
        'model': runtime_config.model,
        'messages': ollama_messages,
        'stream': True,
        'options': self._build_options(runtime_config)
    }

    # Add format if JSON mode requested
    if runtime_config.response_format == 'json':
        payload['format'] = 'json'

    # Handle tools if provided
    if tools:
        payload['tools'] = self.adapter.adapt_tools(tools)

    # Forward 'think' parameter for reasoning models (mirrors complete())
    think = runtime_config.options.get('think')
    if think is not None:
        payload['think'] = bool(think)

    async with self._session.post(f"{self.base_url}/api/chat", json=payload) as response:
        response.raise_for_status()

        async for line in response.content:
            if line:
                data = json.loads(line.decode('utf-8'))
                msg = data.get('message', {})
                done = data.get('done', False)

                if done:
                    # Use adapter for final chunk parsing
                    parsed = self.adapter.adapt_response(data)
                    yield LLMStreamResponse(
                        delta=msg.get('content', ''),
                        is_final=True,
                        finish_reason=parsed.finish_reason,
                        usage=parsed.usage,
                        tool_calls=parsed.tool_calls,
                        model=runtime_config.model,
                    )
                else:
                    yield LLMStreamResponse(
                        delta=msg.get('content', ''),
                        is_final=False,
                    )
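Ollama streams newline-delimited JSON: each line carries a partial `message.content`, and the final line sets `done: true` plus usage counters. A self-contained sketch of the parsing loop in `stream_complete` (no HTTP session needed):

```python
import json

def parse_stream_lines(raw_lines):
    """Parse Ollama's NDJSON chat stream into (delta, is_final) tuples
    (sketch of the loop in stream_complete above)."""
    chunks = []
    for line in raw_lines:
        if not line:
            continue
        data = json.loads(line)
        msg = data.get("message", {})
        chunks.append((msg.get("content", ""), data.get("done", False)))
    return chunks

# Sample wire format, as /api/chat emits it line by line
sample = [
    b'{"message": {"content": "Hel"}, "done": false}',
    b'{"message": {"content": "lo"}, "done": false}',
    b'{"message": {"content": ""}, "done": true, "eval_count": 2}',
]
chunks = parse_stream_lines(sample)
```

Concatenating the deltas of the non-final chunks reconstructs the full completion.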
embed async
embed(
    texts: Union[str, List[str]], **kwargs
) -> Union[List[float], List[List[float]]]

Generate embeddings.

Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
async def embed(
    self,
    texts: Union[str, List[str]],
    **kwargs
) -> Union[List[float], List[List[float]]]:
    """Generate embeddings."""
    if not self._is_initialized:
        await self.initialize()

    if isinstance(texts, str):
        texts = [texts]
        single = True
    else:
        single = False

    embeddings = []
    for text in texts:
        payload = {
            'model': self.config.model,
            'prompt': text
        }

        async with self._session.post(f"{self.base_url}/api/embeddings", json=payload) as response:
            response.raise_for_status()
            data = await response.json()
            embeddings.append(data['embedding'])

    return embeddings[0] if single else embeddings
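`embed()` keeps a single-vs-batch contract: a `str` input yields one vector, a list yields a list of vectors. The shape handling can be sketched without the HTTP call (`embed_one` stands in for the per-text `/api/embeddings` request):

```python
def embed_shape(texts, embed_one):
    """Mirror embed()'s contract: str in -> one vector out,
    list in -> list of vectors out (sketch)."""
    single = isinstance(texts, str)
    batch = [texts] if single else texts
    vectors = [embed_one(t) for t in batch]
    return vectors[0] if single else vectors

fake = lambda t: [float(len(t))]   # stand-in for the real embedding call
v = embed_shape("abc", fake)        # single text -> single vector
vs = embed_shape(["a", "bb"], fake) # batch -> list of vectors
```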
function_call async
function_call(
    messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs
) -> LLMResponse

Execute function calling with native Ollama tools support.

For Ollama 0.1.17+, uses native tools API. Falls back to prompt-based approach for older versions.

Source code in packages/llm/src/dataknobs_llm/llm/providers/ollama.py
    async def function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs
    ) -> LLMResponse:
        """Execute function calling with native Ollama tools support.

        For Ollama 0.1.17+, uses native tools API.
        Falls back to prompt-based approach for older versions.
        """
        warnings.warn("function_call() is deprecated, use complete(tools=...) instead", DeprecationWarning, stacklevel=2)
        if not self._is_initialized:
            await self.initialize()

        # Add system prompt if configured
        if self.config.system_prompt and (not messages or messages[0].role != 'system'):
            messages = [LLMMessage(role='system', content=self.config.system_prompt)] + list(messages)

        # Convert to Ollama format
        ollama_messages = self._messages_to_ollama(messages)

        # function_call() receives raw dicts, not Tool objects — delegate
        # to the adapter's raw function converter.
        ollama_tools = self.adapter.adapt_raw_functions(functions)

        # Build payload with tools
        payload = {
            'model': self.config.model,
            'messages': ollama_messages,
            'tools': ollama_tools,
            'stream': False,
            'options': self._build_options()
        }

        try:
            async with self._session.post(f"{self.base_url}/api/chat", json=payload) as response:
                response.raise_for_status()
                data = await response.json()

            # Extract response and tool calls
            message = data.get('message', {})
            content = message.get('content', '')
            tool_calls = message.get('tool_calls', [])

            # Build response
            llm_response = LLMResponse(
                content=content,
                model=self.config.model,
                finish_reason='tool_calls' if tool_calls else 'stop',
                usage={
                    'prompt_tokens': data.get('prompt_eval_count', 0),
                    'completion_tokens': data.get('eval_count', 0),
                    'total_tokens': data.get('prompt_eval_count', 0) + data.get('eval_count', 0)
                } if 'eval_count' in data else None
            )

            # Add tool call information if present
            if tool_calls:
                # Use first tool call (Ollama can return multiple)
                tool_call = tool_calls[0]
                llm_response.function_call = {
                    'name': tool_call.get('function', {}).get('name', ''),
                    'arguments': tool_call.get('function', {}).get('arguments', {})
                }

            return llm_response

        except Exception as e:
            # Fallback to prompt-based approach if native tools not supported
            import logging
            logging.warning(f"Ollama native tools failed, falling back to prompt-based: {e}")

            function_descriptions = json.dumps(functions, indent=2)

            system_prompt = f"""You have access to these functions:
{function_descriptions}

To call a function, respond with JSON:
{{"function": "name", "arguments": {{...}}}}"""

            messages_with_system = [
                LLMMessage(role='system', content=system_prompt)
            ] + list(messages)

            llm_response = await self.complete(messages_with_system, **kwargs)

            # Try to parse function call
            try:
                func_data = json.loads(llm_response.content)
                if 'function' in func_data:
                    llm_response.function_call = {
                        'name': func_data['function'],
                        'arguments': func_data.get('arguments', {})
                    }
            except json.JSONDecodeError:
                pass

            return llm_response
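The prompt-based fallback asks the model to answer with `{"function": "name", "arguments": {...}}`, then tries to parse that shape out of the completion. The parsing step in isolation (sketch; `parse_function_call` is illustrative, not a package function):

```python
import json

def parse_function_call(content):
    """Parse the fallback format the listing above requests from the
    model; return None when the content is not a function call (sketch)."""
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        return None  # plain-text answer, no call to extract
    if not isinstance(data, dict) or "function" not in data:
        return None
    return {"name": data["function"], "arguments": data.get("arguments", {})}

call = parse_function_call('{"function": "get_weather", "arguments": {"city": "Oslo"}}')
miss = parse_function_call("plain text answer")
```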

HuggingFaceProvider

HuggingFaceProvider(
    config: Union[LLMConfig, Config, Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None,
)

Bases: AsyncLLMProvider

HuggingFace Inference API provider.

Methods:

Name Description
initialize

Initialize HuggingFace client.

validate_model

Validate model availability.

complete

Generate completion.

stream_complete

HuggingFace Inference API doesn't support streaming.

embed

Generate embeddings.

function_call

HuggingFace doesn't have native function calling.

Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
def __init__(
    self,
    config: Union[LLMConfig, "Config", Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None
):
    # Normalize config first
    llm_config = normalize_llm_config(config)
    super().__init__(llm_config, prompt_builder=prompt_builder)
    self.base_url = llm_config.api_base or 'https://api-inference.huggingface.co/models'
Functions
initialize async
initialize() -> None

Initialize HuggingFace client.

Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
async def initialize(self) -> None:
    """Initialize HuggingFace client."""
    try:
        import aiohttp

        api_key = self.config.api_key or os.environ.get('HUGGINGFACE_API_KEY')
        if not api_key:
            raise ValueError("HuggingFace API key not provided")

        self._session = aiohttp.ClientSession(
            headers={'Authorization': f'Bearer {api_key}'},
            timeout=aiohttp.ClientTimeout(total=self.config.timeout)
        )
        self._is_initialized = True
    except ImportError as e:
        raise ImportError("aiohttp package not installed. Install with: pip install aiohttp") from e
validate_model async
validate_model() -> bool

Validate model availability.

Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
async def validate_model(self) -> bool:
    """Validate model availability."""
    try:
        url = f"{self.base_url}/{self.config.model}"
        async with self._session.get(url) as response:
            return response.status == 200
    except Exception:
        return False
complete async
complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> LLMResponse

Generate completion.

Parameters:

Name Type Description Default
messages Union[str, List[LLMMessage]]

Input messages or prompt

required
config_overrides Dict[str, Any] | None

Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed)

None
tools list[Any] | None

Optional list of Tool objects (not supported — raises ToolsNotSupportedError if provided)

None
**kwargs Any

Additional provider-specific parameters

{}
Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
async def complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any
) -> LLMResponse:
    """Generate completion.

    Args:
        messages: Input messages or prompt
        config_overrides: Optional dict to override config fields (model,
            temperature, max_tokens, top_p, stop_sequences, seed)
        tools: Optional list of Tool objects (not supported — raises
            ToolsNotSupportedError if provided)
        **kwargs: Additional provider-specific parameters
    """
    if tools:
        from ...exceptions import ToolsNotSupportedError
        raise ToolsNotSupportedError(
            model=self.config.model,
            suggestion="HuggingFace Inference API does not support tool calling.",
        )

    if not self._is_initialized:
        await self.initialize()

    # Get runtime config (with overrides applied if provided)
    runtime_config = self._get_runtime_config(config_overrides)

    # Convert to prompt
    if isinstance(messages, str):
        prompt = messages
    else:
        prompt = self._build_prompt(messages)

    # Make API call
    url = f"{self.base_url}/{runtime_config.model}"
    gen = runtime_config.generation_params()
    parameters: Dict[str, Any] = {
        'max_new_tokens': gen.get('max_tokens', 100),
        'return_full_text': False,
    }
    if 'temperature' in gen:
        parameters['temperature'] = gen['temperature']
    if 'top_p' in gen:
        parameters['top_p'] = gen['top_p']
    payload = {
        'inputs': prompt,
        'parameters': parameters,
    }

    async with self._session.post(url, json=payload) as response:
        response.raise_for_status()
        data = await response.json()

    # Parse response
    if isinstance(data, list) and len(data) > 0:
        text = data[0].get('generated_text', '')
    else:
        text = str(data)

    return self._analyze_response(LLMResponse(
        content=text,
        model=runtime_config.model,
        finish_reason='stop'
    ))
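The HuggingFace path translates generic generation params into the Inference API's `parameters` dict, only forwarding `temperature` and `top_p` when set. A sketch of that mapping, mirroring the listing above:

```python
def hf_parameters(gen):
    """Map generic generation params onto the HF Inference API
    `parameters` dict (sketch of the logic in complete() above)."""
    parameters = {
        "max_new_tokens": gen.get("max_tokens", 100),
        "return_full_text": False,   # only the newly generated text
    }
    if "temperature" in gen:
        parameters["temperature"] = gen["temperature"]
    if "top_p" in gen:
        parameters["top_p"] = gen["top_p"]
    return parameters

p = hf_parameters({"max_tokens": 64, "temperature": 0.3})
```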
stream_complete async
stream_complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]

HuggingFace Inference API doesn't support streaming.

Parameters:

Name Type Description Default
messages Union[str, List[LLMMessage]]

Input messages or prompt

required
config_overrides Dict[str, Any] | None

Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed)

None
tools list[Any] | None

Optional list of Tool objects (not supported — raises ToolsNotSupportedError if provided)

None
**kwargs Any

Additional provider-specific parameters

{}
Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
async def stream_complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMStreamResponse]:
    """HuggingFace Inference API doesn't support streaming.

    Args:
        messages: Input messages or prompt
        config_overrides: Optional dict to override config fields (model,
            temperature, max_tokens, top_p, stop_sequences, seed)
        tools: Optional list of Tool objects (not supported — raises
            ToolsNotSupportedError if provided)
        **kwargs: Additional provider-specific parameters
    """
    # Simulate streaming by yielding complete response (tools forwarded to
    # complete(), which raises ToolsNotSupportedError if tools are passed)
    response = await self.complete(
        messages, config_overrides=config_overrides, tools=tools, **kwargs
    )
    yield LLMStreamResponse(
        delta=response.content,
        is_final=True,
        finish_reason=response.finish_reason,
        model=response.model,
    )
embed async
embed(
    texts: Union[str, List[str]], **kwargs
) -> Union[List[float], List[List[float]]]

Generate embeddings.

Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
async def embed(
    self,
    texts: Union[str, List[str]],
    **kwargs
) -> Union[List[float], List[List[float]]]:
    """Generate embeddings."""
    if not self._is_initialized:
        await self.initialize()

    if isinstance(texts, str):
        texts = [texts]
        single = True
    else:
        single = False

    url = f"{self.base_url}/{self.config.model}"
    payload = {'inputs': texts}

    async with self._session.post(url, json=payload) as response:
        response.raise_for_status()
        embeddings = await response.json()

    return embeddings[0] if single else embeddings
function_call async
function_call(
    messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs
) -> LLMResponse

HuggingFace doesn't have native function calling.

Source code in packages/llm/src/dataknobs_llm/llm/providers/huggingface.py
async def function_call(
    self,
    messages: List[LLMMessage],
    functions: List[Dict[str, Any]],
    **kwargs
) -> LLMResponse:
    """HuggingFace doesn't have native function calling."""
    warnings.warn("function_call() is deprecated, use complete(tools=...) instead", DeprecationWarning, stacklevel=2)
    raise NotImplementedError("Function calling not supported for HuggingFace models")

EchoProvider

EchoProvider(
    config: Union[LLMConfig, Config, Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None,
    responses: List[Union[str, LLMResponse]] | None = None,
    response_fn: ResponseFunction | None = None,
    strict_tools: bool = True,
    strict: bool = False,
)

Bases: AsyncLLMProvider

Echo provider for testing and debugging.

This provider echoes back input messages and generates deterministic mock embeddings. Perfect for testing without real LLM API calls.

Features:

- Echoes back user messages with configurable prefix
- Generates deterministic embeddings based on content hash
- Supports streaming (character-by-character echo)
- Mocks function calling with deterministic responses
- Zero external dependencies
- Instant responses
- Strict mode: raise when response queue is exhausted instead of falling back to echo behavior (catches under-scripted tests)
- Instance tracking: class-level tracking of created instances for inspecting providers created internally by from_config()

Scripted Response Features (for testing):

- Response queue: Provide ordered list of responses
- Response function: Dynamic response based on input
- Pattern matching: Map input patterns to responses
- Call tracking: Record all calls for assertions

Example
# Queue mode - responses consumed in order
provider = EchoProvider(config)
provider.set_responses(["First response", "Second response"])

# Function mode - dynamic responses
provider.set_response_function(
    lambda msgs: f"Got {len(msgs)} messages"
)

# Pattern mode - match input to responses
provider.add_pattern_response(r"hello", "Hi there!")
provider.add_pattern_response(r"bye", "Goodbye!")

# Check what was called
assert provider.call_count == 2
assert "hello" in provider.get_call(0).messages[0].content

# Strict mode - catch under-scripted tests
provider = EchoProvider(config, strict=True)
provider.set_responses(["only one"])
await provider.complete("first")   # Returns "only one"
await provider.complete("second")  # Raises ResponseQueueExhaustedError

# Instance tracking - inspect providers created by from_config()
with EchoProvider.track_instances() as instances:
    bot = await DynaBot.from_config(config)
assert instances[0].close_count == 0  # or == 1 on error path

Initialize EchoProvider.

Parameters:

Name Type Description Default
config Union[LLMConfig, Config, Dict[str, Any]]

LLM configuration. strict can also be enabled via config["options"]["strict"] = True for cases where the provider is created by a factory (e.g. LLMProviderFactory).

required
prompt_builder AsyncPromptBuilder | None

Optional prompt builder

None
responses List[Union[str, LLMResponse]] | None

Optional list of responses to return in order

None
response_fn ResponseFunction | None

Optional function to generate responses dynamically

None
strict_tools bool

If True (default), raise ValueError when a scripted response contains tool_calls but no tools were provided to complete(). Real providers never return tool_calls unless tool definitions are sent in the request, so this catches callers that forget to pass tools. Set to False only for tests that intentionally exercise edge cases (e.g. "what happens if tool_calls arrive unexpectedly").

True
strict bool

If True, raise ResponseQueueExhaustedError when complete() is called after all scripted responses have been consumed, instead of falling back to echo behavior. Use this to catch under-scripted tests and to exercise error-cleanup paths in callers.

False

Methods:

Name Description
set_responses

Set queue of responses to return in order.

add_response

Add a single response to the queue.

set_response_function

Set function to generate dynamic responses.

add_pattern_response

Add pattern-matched response.

clear_responses

Clear all scripted responses and reset to echo mode.

clear_history

Clear call history.

reset

Reset all state (responses, history, init count, and close count).

get_last_instance

Return the most recently created EchoProvider instance.

track_instances

Context manager that collects all EchoProvider instances created within.

reset_tracking

Clear all instance tracking state.

get_call

Get a specific call by index.

get_last_call

Get the most recent call, or None if no calls made.

get_last_user_message

Get the last user message from the most recent call.

initialize

Initialize echo provider. Tracks call count via init_count.

close

Close echo provider, tracking the call.

validate_model

Validate model (always true for echo).

complete

Echo back the input messages or return scripted response.

stream_complete

Stream echo response character by character.

embed

Generate deterministic mock embeddings.

function_call

Mock function calling with deterministic response.

Attributes:

Name Type Description
strict bool

Whether strict mode is enabled (raises on exhausted queue).

init_count int

Get number of initialize() calls made.

close_count int

Get number of close() calls made.

call_count int

Get number of complete() calls made.

calls List[Dict[str, Any]]

Get all recorded calls.

embed_call_count int

Get number of embed() calls made.

embed_calls List[Dict[str, Any]]

Get all recorded embed calls.

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def __init__(
    self,
    config: Union[LLMConfig, Config, Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None,
    responses: List[Union[str, LLMResponse]] | None = None,
    response_fn: ResponseFunction | None = None,
    strict_tools: bool = True,
    strict: bool = False,
):
    """Initialize EchoProvider.

    Args:
        config: LLM configuration.  ``strict`` can also be enabled via
            ``config["options"]["strict"] = True`` for cases where
            the provider is created by a factory (e.g.
            ``LLMProviderFactory``).
        prompt_builder: Optional prompt builder
        responses: Optional list of responses to return in order
        response_fn: Optional function to generate responses dynamically
        strict_tools: If True (default), raise ValueError when a scripted
            response contains tool_calls but no tools were provided to
            complete(). Real providers never return tool_calls unless tool
            definitions are sent in the request, so this catches callers
            that forget to pass tools. Set to False only for tests that
            intentionally exercise edge cases (e.g. "what happens if
            tool_calls arrive unexpectedly").
        strict: If True, raise ``ResponseQueueExhaustedError`` when
            ``complete()`` is called after all scripted responses have
            been consumed, instead of falling back to echo behavior.
            Use this to catch under-scripted tests and to exercise
            error-cleanup paths in callers.
    """
    # Normalize config first
    llm_config = normalize_llm_config(config)
    super().__init__(llm_config, prompt_builder=prompt_builder)

    # Echo-specific configuration from options
    self.echo_prefix = llm_config.options.get('echo_prefix', 'Echo: ')
    self.embedding_dim = llm_config.options.get('embedding_dim', 768)
    self.mock_tokens = llm_config.options.get('mock_tokens', True)
    self.stream_delay = llm_config.options.get('stream_delay', 0.0)  # seconds per char

    # Scripted response state
    self._response_queue: List[Union[str, LLMResponse, ErrorResponse]] = list(
        responses or []
    )
    self._response_fn: ResponseFunction | None = response_fn
    self._pattern_responses: List[
        tuple[re.Pattern[str], Union[str, LLMResponse, ErrorResponse]]
    ] = []
    self._call_history: List[Dict[str, Any]] = []
    self._embed_history: List[Dict[str, Any]] = []
    self._cycle_responses: bool = llm_config.options.get('cycle_responses', False)
    self._init_count: int = 0
    self._close_count: int = 0
    self._strict_tools: bool = strict_tools
    self._strict: bool = strict or llm_config.options.get('strict', False)

    # Instance tracking
    EchoProvider._last_instance = self
    for collector in EchoProvider._instance_collectors:
        collector.append(self)
Attributes
strict property
strict: bool

Whether strict mode is enabled (raises on exhausted queue).

init_count property
init_count: int

Get number of initialize() calls made.

close_count property
close_count: int

Get number of close() calls made.

call_count property
call_count: int

Get number of complete() calls made.

calls property
calls: List[Dict[str, Any]]

Get all recorded calls.

embed_call_count property
embed_call_count: int

Get number of embed() calls made.

embed_calls property
embed_calls: List[Dict[str, Any]]

Get all recorded embed calls.

Functions
set_responses
set_responses(
    responses: List[Union[str, LLMResponse, ErrorResponse]], cycle: bool = False
) -> EchoProvider

Set queue of responses to return in order.

Parameters:

Name Type Description Default
responses List[Union[str, LLMResponse, ErrorResponse]]

List of response strings or LLMResponse objects

required
cycle bool

If True, cycle through responses instead of exhausting

False

Returns:

Type Description
EchoProvider

Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def set_responses(
    self,
    responses: List[Union[str, LLMResponse, ErrorResponse]],
    cycle: bool = False,
) -> EchoProvider:
    """Set queue of responses to return in order.

    Args:
        responses: List of response strings or LLMResponse objects
        cycle: If True, cycle through responses instead of exhausting

    Returns:
        Self for chaining
    """
    self._response_queue = list(responses)
    self._cycle_responses = cycle
    return self
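With `cycle=False` (the default) the queue is consumed once and then exhausts; with `cycle=True` it wraps around. The consumption code itself is not shown here, so this sketch only illustrates the documented semantics (assumed mechanic: a consumed item is re-appended when cycling):

```python
def pop_response(queue, cycle):
    """Consume the next scripted response; re-append it when cycling
    (sketch of the semantics set_responses(cycle=...) documents)."""
    if not queue:
        return None  # strict mode would raise ResponseQueueExhaustedError here
    item = queue.pop(0)
    if cycle:
        queue.append(item)
    return item

q = ["a", "b"]
first = pop_response(q, cycle=True)
second = pop_response(q, cycle=True)
third = pop_response(q, cycle=True)   # wraps back to the first response
```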
add_response
add_response(response: Union[str, LLMResponse, ErrorResponse]) -> EchoProvider

Add a single response to the queue.

Parameters:

Name Type Description Default
response Union[str, LLMResponse, ErrorResponse]

Response string or LLMResponse object

required

Returns:

Type Description
EchoProvider

Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def add_response(
    self, response: Union[str, LLMResponse, ErrorResponse]
) -> EchoProvider:
    """Add a single response to the queue.

    Args:
        response: Response string or LLMResponse object

    Returns:
        Self for chaining
    """
    self._response_queue.append(response)
    return self
set_response_function
set_response_function(fn: ResponseFunction) -> EchoProvider

Set function to generate dynamic responses.

The function receives the message list and returns either a string (converted to LLMResponse) or an LLMResponse object.

Parameters:

Name Type Description Default
fn ResponseFunction

Function(messages) -> str | LLMResponse

required

Returns:

Type Description
EchoProvider

Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def set_response_function(
    self,
    fn: ResponseFunction,
) -> EchoProvider:
    """Set function to generate dynamic responses.

    The function receives the message list and returns either
    a string (converted to LLMResponse) or an LLMResponse object.

    Args:
        fn: Function(messages) -> str | LLMResponse

    Returns:
        Self for chaining
    """
    self._response_fn = fn
    return self
add_pattern_response
add_pattern_response(
    pattern: str,
    response: Union[str, LLMResponse, ErrorResponse],
    flags: int = re.IGNORECASE,
) -> EchoProvider

Add pattern-matched response.

When user message matches pattern, return the specified response.

Parameters:

- `pattern` (`str`, required): Regex pattern to match against user messages
- `response` (`Union[str, LLMResponse, ErrorResponse]`, required): Response to return when pattern matches
- `flags` (`int`, default `re.IGNORECASE`): Regex flags (default: case-insensitive)

Returns:

- `EchoProvider`: Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def add_pattern_response(
    self,
    pattern: str,
    response: Union[str, LLMResponse, ErrorResponse],
    flags: int = re.IGNORECASE,
) -> EchoProvider:
    """Add pattern-matched response.

    When user message matches pattern, return the specified response.

    Args:
        pattern: Regex pattern to match against user messages
        response: Response to return when pattern matches
        flags: Regex flags (default: case-insensitive)

    Returns:
        Self for chaining
    """
    compiled = re.compile(pattern, flags)
    self._pattern_responses.append((compiled, response))
    return self
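The first-match lookup that pattern responses perform can be sketched in plain stdlib code (an illustrative stand-in; `resolve` and the example patterns are not part of the package):

```python
import re

# Hypothetical pattern table mirroring add_pattern_response():
# patterns are tried in insertion order, first match wins.
pattern_responses = [
    (re.compile(r"\bweather\b", re.IGNORECASE), "It is sunny."),
    (re.compile(r"\bhello\b", re.IGNORECASE), "Hi there!"),
]

def resolve(user_message: str):
    """Return the response for the first pattern that matches, else None."""
    for pattern, response in pattern_responses:
        if pattern.search(user_message):
            return response
    return None  # fall through to the queue / echo behavior

print(resolve("What's the WEATHER today?"))  # It is sunny.
print(resolve("unrelated"))                  # None
```

Because the default flags include `re.IGNORECASE`, "WEATHER" matches the lowercase pattern above.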
clear_responses
clear_responses() -> EchoProvider

Clear all scripted responses and reset to echo mode.

Returns:

- `EchoProvider`: Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def clear_responses(self) -> EchoProvider:
    """Clear all scripted responses and reset to echo mode.

    Returns:
        Self for chaining
    """
    self._response_queue.clear()
    self._response_fn = None
    self._pattern_responses.clear()
    return self
clear_history
clear_history() -> EchoProvider

Clear call history.

Returns:

- `EchoProvider`: Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def clear_history(self) -> EchoProvider:
    """Clear call history.

    Returns:
        Self for chaining
    """
    self._call_history.clear()
    return self
reset
reset() -> EchoProvider

Reset all state (responses, history, init count, and close count).

Returns:

- `EchoProvider`: Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def reset(self) -> EchoProvider:
    """Reset all state (responses, history, init count, and close count).

    Returns:
        Self for chaining
    """
    self.clear_responses()
    self.clear_history()
    self._init_count = 0
    self._close_count = 0
    return self
get_last_instance classmethod
get_last_instance() -> EchoProvider | None

Return the most recently created EchoProvider instance.

Useful for inspecting providers created internally by code like DynaBot.from_config() where the test has no direct reference.

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
@classmethod
def get_last_instance(cls) -> EchoProvider | None:
    """Return the most recently created EchoProvider instance.

    Useful for inspecting providers created internally by code like
    ``DynaBot.from_config()`` where the test has no direct reference.
    """
    return cls._last_instance
track_instances classmethod
track_instances() -> Iterator[list[EchoProvider]]

Context manager that collects all EchoProvider instances created within.

Example::

with EchoProvider.track_instances() as instances:
    bot = await DynaBot.from_config(config)
assert len(instances) == 1
assert instances[0].close_count == 1  # cleaned up on error path
Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
@classmethod
@contextmanager
def track_instances(cls) -> Iterator[list[EchoProvider]]:
    """Context manager that collects all EchoProvider instances created within.

    Example::

        with EchoProvider.track_instances() as instances:
            bot = await DynaBot.from_config(config)
        assert len(instances) == 1
        assert instances[0].close_count == 1  # cleaned up on error path
    """
    collector: list[EchoProvider] = []
    cls._instance_collectors.append(collector)
    try:
        yield collector
    finally:
        cls._instance_collectors.remove(collector)
reset_tracking classmethod
reset_tracking() -> None

Clear all instance tracking state.

Resets _last_instance and removes any active collectors (e.g. from a track_instances() context that was not properly exited). Call in test teardown to prevent cross-test leakage.

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
@classmethod
def reset_tracking(cls) -> None:
    """Clear all instance tracking state.

    Resets ``_last_instance`` and removes any active collectors
    (e.g. from a ``track_instances()`` context that was not
    properly exited).  Call in test teardown to prevent cross-test
    leakage.
    """
    cls._last_instance = None
    cls._instance_collectors.clear()
get_call
get_call(index: int) -> Dict[str, Any]

Get a specific call by index.

Parameters:

- `index` (`int`, required): Call index (supports negative indexing)

Returns:

- `Dict[str, Any]`: Call record with 'messages', 'response', 'kwargs'

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def get_call(self, index: int) -> Dict[str, Any]:
    """Get a specific call by index.

    Args:
        index: Call index (supports negative indexing)

    Returns:
        Call record with 'messages', 'response', 'kwargs'
    """
    return self._call_history[index]
get_last_call
get_last_call() -> Dict[str, Any] | None

Get the most recent call, or None if no calls made.

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def get_last_call(self) -> Dict[str, Any] | None:
    """Get the most recent call, or None if no calls made."""
    return self._call_history[-1] if self._call_history else None
get_last_user_message
get_last_user_message() -> str | None

Get the last user message from the most recent call.

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def get_last_user_message(self) -> str | None:
    """Get the last user message from the most recent call."""
    if not self._call_history:
        return None
    messages = self._call_history[-1].get("messages", [])
    for msg in reversed(messages):
        if isinstance(msg, LLMMessage) and msg.role == "user":
            return msg.content
        elif isinstance(msg, dict) and msg.get("role") == "user":
            return msg.get("content", "")
    return None
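The reverse scan in `get_last_user_message()` can be illustrated with plain dicts standing in for `LLMMessage` objects (a sketch, not the package's own code):

```python
# Scan from newest to oldest and return the first user-role content found.
def last_user_message(messages):
    for msg in reversed(messages):
        if msg.get("role") == "user":
            return msg.get("content", "")
    return None

history = [
    {"role": "system", "content": "be brief"},
    {"role": "user", "content": "first"},
    {"role": "assistant", "content": "ok"},
    {"role": "user", "content": "second"},
]
print(last_user_message(history))  # second
```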
initialize async
initialize() -> None

Initialize echo provider. Tracks call count via init_count.

Note: complete(), stream_complete(), embed(), and function_call() auto-call this method if the provider is not yet initialized, which also increments init_count.

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
async def initialize(self) -> None:
    """Initialize echo provider. Tracks call count via ``init_count``.

    Note: ``complete()``, ``stream_complete()``, ``embed()``, and
    ``function_call()`` auto-call this method if the provider is not
    yet initialized, which also increments ``init_count``.
    """
    self._init_count += 1
    self._is_initialized = True
close async
close() -> None

Close echo provider, tracking the call.

Increments close_count on every call (even redundant ones) so tests can verify exactly how many times close was invoked.

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
async def close(self) -> None:
    """Close echo provider, tracking the call.

    Increments ``close_count`` on every call (even redundant ones)
    so tests can verify exactly how many times close was invoked.
    """
    self._close_count += 1
    await super().close()
validate_model async
validate_model() -> bool

Validate model (always true for echo).

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
async def validate_model(self) -> bool:
    """Validate model (always true for echo)."""
    return True
complete async
complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> LLMResponse

Echo back the input messages or return scripted response.

Response priority:

1. Response function (if set via set_response_function)
2. Pattern match (if added via add_pattern_response)
3. Response queue (if set via set_responses or add_response)
4. Default echo behavior

Parameters:

- `messages` (`Union[str, List[LLMMessage]]`, required): Input messages or prompt
- `config_overrides` (`Dict[str, Any] | None`, default `None`): Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed)
- `tools` (`list[Any] | None`, default `None`): Optional list of Tool objects (recorded in call history)
- `**kwargs` (`Any`, default `{}`): Additional parameters (ignored)

Returns:

- `LLMResponse`: LLMResponse (scripted or echo)

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
async def complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any
) -> LLMResponse:
    """Echo back the input messages or return scripted response.

    Response priority:
    1. Response function (if set via set_response_function)
    2. Pattern match (if added via add_pattern_response)
    3. Response queue (if set via set_responses or add_response)
    4. Default echo behavior

    Args:
        messages: Input messages or prompt
        config_overrides: Optional dict to override config fields (model,
            temperature, max_tokens, top_p, stop_sequences, seed)
        tools: Optional list of Tool objects (recorded in call history)
        **kwargs: Additional parameters (ignored)

    Returns:
        LLMResponse (scripted or echo)
    """
    if not self._is_initialized:
        await self.initialize()

    # Get runtime config (with overrides applied if provided)
    runtime_config = self._get_runtime_config(config_overrides)

    # Convert to message list
    if isinstance(messages, str):
        messages = [LLMMessage(role='user', content=messages)]

    # Try scripted response first
    try:
        response = self._resolve_response(messages, runtime_config)
    except Exception:
        # Record the failed call before re-raising
        self._call_history.append({
            'messages': messages,
            'response': None,
            'config_overrides': config_overrides,
            'tools': tools,
            'kwargs': kwargs,
            'error': True,
        })
        raise

    if response is None and self._strict:
        from dataknobs_llm.exceptions import ResponseQueueExhaustedError

        self._call_history.append({
            'messages': messages,
            'response': None,
            'config_overrides': config_overrides,
            'tools': tools,
            'kwargs': kwargs,
            'error': True,
        })
        raise ResponseQueueExhaustedError(
            call_count=len(self._call_history),
        )

    if response is None:
        # Fall back to default echo behavior
        user_messages = [msg for msg in messages if msg.role == 'user']
        if user_messages:
            content = self.echo_prefix + user_messages[-1].content
        else:
            content = self.echo_prefix + "(no user message)"

        # Add system prompt if configured and in echo
        if runtime_config.system_prompt and runtime_config.options.get('echo_system', False):
            content = f"[System: {runtime_config.system_prompt}]\n{content}"

        # Mock token usage
        prompt_tokens = sum(self._count_tokens(msg.content) for msg in messages)
        completion_tokens = self._count_tokens(content)

        response = LLMResponse(
            content=content,
            model=runtime_config.model or 'echo-model',
            finish_reason='stop',
            usage={
                'prompt_tokens': prompt_tokens,
                'completion_tokens': completion_tokens,
                'total_tokens': prompt_tokens + completion_tokens
            } if self.mock_tokens else None
        )

    response = self._analyze_response(response)

    # Validate tool_calls vs tools consistency
    if (
        self._strict_tools
        and getattr(response, "tool_calls", None)
        and not tools
    ):
        raise ValueError(
            "EchoProvider returned a response with tool_calls but no "
            "tools were provided to complete(). Real LLM providers only "
            "return tool_calls when tool definitions are included in the "
            "request. This usually means the caller forgot to pass "
            "tools=... to complete() or stream_complete(). Set "
            "strict_tools=False on EchoProvider to disable this check "
            "for tests that intentionally exercise unexpected tool_calls."
        )

    # Record the call
    self._call_history.append({
        'messages': messages,
        'response': response,
        'config_overrides': config_overrides,
        'tools': tools,
        'kwargs': kwargs,
    })

    return response
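The four-level priority that `complete()` applies can be sketched as a single plain function (an illustrative stand-in; `resolve_response` and the dict-based messages are not the real implementation):

```python
import re

def resolve_response(messages, response_fn=None, patterns=(), queue=None):
    """Sketch of the resolution order:
    1. response function  2. pattern match  3. queue  4. echo fallback."""
    user = next(
        (m["content"] for m in reversed(messages) if m["role"] == "user"), ""
    )
    if response_fn is not None:
        return response_fn(messages)
    for pattern, response in patterns:
        if re.search(pattern, user, re.IGNORECASE):
            return response
    if queue:
        return queue.pop(0)
    return "Echo: " + user

msgs = [{"role": "user", "content": "ping"}]
print(resolve_response(msgs))                               # Echo: ping
print(resolve_response(msgs, queue=["scripted"]))           # scripted
print(resolve_response(msgs, patterns=[("ping", "pong")]))  # pong
```

A response function, when set, always wins; pattern and queue responses are only consulted in its absence, which is why the real provider documents them as a strict priority list.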
stream_complete async
stream_complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]

Stream echo response character by character.

Parameters:

- `messages` (`Union[str, List[LLMMessage]]`, required): Input messages or prompt
- `config_overrides` (`Dict[str, Any] | None`, default `None`): Optional dict to override config fields (model, temperature, max_tokens, top_p, stop_sequences, seed)
- `tools` (`list[Any] | None`, default `None`): Optional list of Tool objects (forwarded to complete())
- `**kwargs` (`Any`, default `{}`): Additional parameters (ignored)

Yields:

- `AsyncIterator[LLMStreamResponse]`: Streaming response chunks with tool_calls on the final chunk

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
async def stream_complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any
) -> AsyncIterator[LLMStreamResponse]:
    """Stream echo response character by character.

    Args:
        messages: Input messages or prompt
        config_overrides: Optional dict to override config fields (model,
            temperature, max_tokens, top_p, stop_sequences, seed)
        tools: Optional list of Tool objects (forwarded to complete())
        **kwargs: Additional parameters (ignored)

    Yields:
        Streaming response chunks with tool_calls on final chunk
    """
    if not self._is_initialized:
        await self.initialize()

    # Get full response (forwards tools to complete())
    response = await self.complete(
        messages, config_overrides=config_overrides, tools=tools, **kwargs
    )

    # Handle empty content (e.g. tool_call_response with content="")
    if not response.content:
        yield LLMStreamResponse(
            delta="",
            is_final=True,
            finish_reason=response.finish_reason or "stop",
            usage=response.usage,
            tool_calls=response.tool_calls,
            model=response.model,
        )
        return

    # Stream character by character
    for i, char in enumerate(response.content):
        is_final = (i == len(response.content) - 1)

        yield LLMStreamResponse(
            delta=char,
            is_final=is_final,
            finish_reason=response.finish_reason if is_final else None,
            usage=response.usage if is_final else None,
            tool_calls=response.tool_calls if is_final else None,
            model=response.model if is_final else None,
        )

        # Optional delay for realistic streaming
        if self.stream_delay > 0:
            import asyncio
            await asyncio.sleep(self.stream_delay)
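The per-character streaming shape, with metadata carried only on the final chunk, can be sketched with a stdlib async generator (dicts stand in for `LLMStreamResponse`; this is not the provider's own code):

```python
import asyncio

async def stream_chars(content: str):
    """Yield one chunk per character; finish_reason only on the last one."""
    for i, char in enumerate(content):
        is_final = i == len(content) - 1
        yield {"delta": char, "finish_reason": "stop" if is_final else None}

async def main():
    chunks = [c async for c in stream_chars("hi")]
    # Reassembling the deltas recovers the full response content.
    assert "".join(c["delta"] for c in chunks) == "hi"
    return chunks

chunks = asyncio.run(main())
print(chunks)
```

Consumers can therefore accumulate `delta` values and read `finish_reason` (and, in the real provider, `usage` and `tool_calls`) off the chunk where it is non-`None`.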
embed async
embed(
    texts: Union[str, List[str]], **kwargs: Any
) -> Union[List[float], List[List[float]]]

Generate deterministic mock embeddings.

Parameters:

- `texts` (`Union[str, List[str]]`, required): Input text(s)
- `**kwargs` (`Any`, default `{}`): Additional parameters (ignored)

Returns:

- `Union[List[float], List[List[float]]]`: Embedding vector(s)

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
async def embed(
    self,
    texts: Union[str, List[str]],
    **kwargs: Any
) -> Union[List[float], List[List[float]]]:
    """Generate deterministic mock embeddings.

    Args:
        texts: Input text(s)
        **kwargs: Additional parameters (ignored)

    Returns:
        Embedding vector(s)
    """
    if not self._is_initialized:
        await self.initialize()

    self._embed_history.append({"texts": texts, "kwargs": kwargs})

    if isinstance(texts, str):
        return self._generate_embedding(texts)
    else:
        return [self._generate_embedding(text) for text in texts]
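"Deterministic mock embeddings" means the same text always maps to the same vector. One way to get that property, sketched here with a hash digest (`mock_embedding` is hypothetical; the provider's actual `_generate_embedding` scheme may differ):

```python
import hashlib

def mock_embedding(text: str, dim: int = 8):
    """Derive a fixed-length float vector from an MD5 digest of the text.

    Deterministic by construction: equal inputs yield equal vectors.
    """
    digest = hashlib.md5(text.encode()).digest()
    return [b / 255.0 for b in digest[:dim]]

assert mock_embedding("hello") == mock_embedding("hello")
assert mock_embedding("hello") != mock_embedding("world")
print(mock_embedding("hello"))
```

Determinism is what makes these mock vectors usable in tests that compare or cache embeddings across runs.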
function_call async
function_call(
    messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs: Any
) -> LLMResponse

Mock function calling with deterministic response.

Parameters:

- `messages` (`List[LLMMessage]`, required): Conversation messages
- `functions` (`List[Dict[str, Any]]`, required): Available functions
- `**kwargs` (`Any`, default `{}`): Additional parameters (ignored)

Returns:

- `LLMResponse`: Response with mock function call

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
async def function_call(
    self,
    messages: List[LLMMessage],
    functions: List[Dict[str, Any]],
    **kwargs: Any
) -> LLMResponse:
    """Mock function calling with deterministic response.

    Args:
        messages: Conversation messages
        functions: Available functions
        **kwargs: Additional parameters (ignored)

    Returns:
        Response with mock function call
    """
    warnings.warn("function_call() is deprecated, use complete(tools=...) instead", DeprecationWarning, stacklevel=2)
    if not self._is_initialized:
        await self.initialize()

    # Get last user message
    user_messages = [msg for msg in messages if msg.role == 'user']
    user_content = user_messages[-1].content if user_messages else ""

    # Mock function call: use first function with mock arguments
    if functions:
        first_func = functions[0]
        func_name = first_func.get('name', 'unknown_function')

        # Generate mock arguments based on parameters schema
        params = first_func.get('parameters', {})
        properties = params.get('properties', {})

        mock_args = {}
        for param_name, param_schema in properties.items():
            param_type = param_schema.get('type', 'string')

            # Generate mock value based on type
            if param_type == 'string':
                mock_args[param_name] = f"mock_{param_name}_from_echo"
            elif param_type == 'number' or param_type == 'integer':
                # Use hash to generate deterministic number
                hash_val = int(hashlib.md5(user_content.encode()).hexdigest()[:8], 16)
                mock_args[param_name] = hash_val % 100
            elif param_type == 'boolean':
                # Deterministic boolean based on hash
                hash_val = int(hashlib.md5(user_content.encode()).hexdigest()[:2], 16)
                mock_args[param_name] = hash_val % 2 == 0
            elif param_type == 'array':
                mock_args[param_name] = ["mock_item_1", "mock_item_2"]
            elif param_type == 'object':
                mock_args[param_name] = {"mock_key": "mock_value"}
            else:
                mock_args[param_name] = None

        # Build response with function call
        content = f"{self.echo_prefix}Calling function '{func_name}'"

        prompt_tokens = sum(self._count_tokens(msg.content) for msg in messages)
        completion_tokens = self._count_tokens(content)

        return LLMResponse(
            content=content,
            model=self.config.model or 'echo-model',
            finish_reason='function_call',
            usage={
                'prompt_tokens': prompt_tokens,
                'completion_tokens': completion_tokens,
                'total_tokens': prompt_tokens + completion_tokens
            } if self.mock_tokens else None,
            function_call={
                'name': func_name,
                'arguments': mock_args
            }
        )
    else:
        # No functions provided, just echo
        return await self.complete(messages, **kwargs)

CachingEmbedProvider

CachingEmbedProvider(inner: AsyncLLMProvider, cache: EmbeddingCache)

Bases: AsyncLLMProvider

Provider wrapper that caches embed() results persistently.

Embeddings are deterministic: same (model, text) produces the same vector. This wrapper caches them once and reuses them across scenarios and runs. complete(), stream_complete(), and function_call() pass through to the inner provider unchanged.

Lifecycle ownership: When wrapping a pre-initialized provider (e.g. one created and initialized by DynaBot.from_config()), the caching provider does NOT take ownership of the inner provider's lifecycle. initialize() skips re-initializing the inner, and close() skips closing it — the original owner is responsible for the inner's lifecycle.

Parameters:

- `inner` (`AsyncLLMProvider`, required): The real provider to delegate to.
- `cache` (`EmbeddingCache`, required): The cache backend for storing embeddings.

Examples::

# Basic usage — cache hit/miss:
inner = EchoProvider({"provider": "echo", "model": "test"})
cache = MemoryEmbeddingCache()
provider = CachingEmbedProvider(inner, cache)
await provider.initialize()
vec1 = await provider.embed("hello")   # cache miss -> inner
vec2 = await provider.embed("hello")   # cache hit -> no inner call

# Fresh provider — CachingEmbedProvider owns the inner lifecycle:
provider = CachingEmbedProvider(inner, cache)
await provider.initialize()   # initializes both inner and cache
await provider.close()        # closes both inner and cache

# Pre-initialized provider — caller owns the inner lifecycle:
inner = OllamaProvider(config)
await inner.initialize()      # caller initializes
provider = CachingEmbedProvider(inner, MemoryEmbeddingCache())
await provider.initialize()   # skips inner, initializes cache only
await provider.close()        # skips inner, closes cache only
await inner.close()           # caller closes inner

Methods:

- `validate_model`: Delegate model validation to inner provider.
- `get_capabilities`: Delegate capabilities to inner provider.
- `initialize`: Initialize the inner provider (if not already) and the cache.
- `close`: Close the cache, and the inner provider only if we own it.
- `complete`: Delegate to inner provider.
- `stream_complete`: Delegate to inner provider (async generator passthrough).
- `function_call`: Delegate to inner provider.
- `embed`: Return cached embeddings, delegating to inner on cache miss.

Attributes:

- `config` (`LLMConfig`): Forward config from the inner provider.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
def __init__(self, inner: AsyncLLMProvider, cache: EmbeddingCache) -> None:
    super().__init__(inner.config)
    self._inner = inner
    self._cache = cache
    self._owns_inner = False
Attributes
config property writable
config: LLMConfig

Forward config from the inner provider.

The getter always returns the inner provider's config. The setter is a no-op that absorbs the self.config = ... assignment from LLMProvider.__init__().

Functions
validate_model async
validate_model() -> bool

Delegate model validation to inner provider.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
async def validate_model(self) -> bool:
    """Delegate model validation to inner provider."""
    return await self._inner.validate_model()
get_capabilities
get_capabilities() -> List[ModelCapability]

Delegate capabilities to inner provider.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
def get_capabilities(self) -> List[ModelCapability]:
    """Delegate capabilities to inner provider."""
    return self._inner.get_capabilities()
initialize async
initialize() -> None

Initialize the inner provider (if not already) and the cache.

Skips inner initialization when the inner provider is already initialized — e.g. when wrapping a provider that was created and initialized by DynaBot.from_config(). Re-initializing would open duplicate connections (aiohttp sessions for Ollama, etc.).

When the inner IS initialized here, _owns_inner is set to True so that close() knows to close it. When the inner was already initialized (pre-owned), close() leaves it alone.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
async def initialize(self) -> None:
    """Initialize the inner provider (if not already) and the cache.

    Skips inner initialization when the inner provider is already
    initialized — e.g. when wrapping a provider that was created and
    initialized by ``DynaBot.from_config()``. Re-initializing would
    open duplicate connections (aiohttp sessions for Ollama, etc.).

    When the inner IS initialized here, ``_owns_inner`` is set to
    ``True`` so that ``close()`` knows to close it. When the inner
    was already initialized (pre-owned), ``close()`` leaves it alone.
    """
    if not self._inner.is_initialized:
        await self._inner.initialize()
        self._owns_inner = True
    await self._cache.initialize()
    self._is_initialized = True
close async
close() -> None

Close the cache, and the inner provider only if we own it.

When the inner provider was already initialized before wrapping (pre-owned), the original owner is responsible for closing it. Only providers that were initialized by this wrapper are closed.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
async def close(self) -> None:
    """Close the cache, and the inner provider only if we own it.

    When the inner provider was already initialized before wrapping
    (pre-owned), the original owner is responsible for closing it.
    Only providers that were initialized by this wrapper are closed.
    """
    if self._is_closing:
        return
    self._is_closing = True
    if self._owns_inner:
        try:
            await self._inner.close()
        except Exception:
            logger.exception("Error closing inner provider")
    try:
        await self._cache.close()
    except Exception:
        logger.exception("Error closing embedding cache")
    self._is_initialized = False
    self._is_closing = False
    self._owns_inner = False
complete async
complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> LLMResponse

Delegate to inner provider.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
async def complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> LLMResponse:
    """Delegate to inner provider."""
    return await self._inner.complete(
        messages, config_overrides=config_overrides, tools=tools, **kwargs
    )
stream_complete async
stream_complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> Any

Delegate to inner provider (async generator passthrough).

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
async def stream_complete(  # type: ignore[override]
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> Any:
    """Delegate to inner provider (async generator passthrough)."""
    async for chunk in self._inner.stream_complete(
        messages, config_overrides=config_overrides, tools=tools, **kwargs
    ):
        yield chunk
function_call async
function_call(
    messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs: Any
) -> LLMResponse

Delegate to inner provider.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
async def function_call(
    self,
    messages: List[LLMMessage],
    functions: List[Dict[str, Any]],
    **kwargs: Any,
) -> LLMResponse:
    """Delegate to inner provider."""
    return await self._inner.function_call(messages, functions, **kwargs)
embed async
embed(
    texts: Union[str, List[str]], **kwargs: Any
) -> Union[List[float], List[List[float]]]

Return cached embeddings, delegating to inner on cache miss.

Handles both single-text and batch forms. For batches, only the uncached texts are sent to the inner provider. The inner provider always receives a list[str] and always returns list[list[float]].

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
async def embed(
    self,
    texts: Union[str, List[str]],
    **kwargs: Any,
) -> Union[List[float], List[List[float]]]:
    """Return cached embeddings, delegating to inner on cache miss.

    Handles both single-text and batch forms. For batches, only the
    uncached texts are sent to the inner provider. The inner provider
    always receives a ``list[str]`` and always returns
    ``list[list[float]]``.
    """
    self._check_ready()
    model = self.config.model
    single = isinstance(texts, str)
    text_list = [texts] if single else list(texts)

    # Batch cache lookup
    cached = await self._cache.get_batch(model, text_list)
    results: list[list[float]] = [[] for _ in text_list]
    misses: list[tuple[int, str]] = []

    for i, (text, cached_vec) in enumerate(zip(text_list, cached)):
        if cached_vec is not None:
            results[i] = cached_vec
        else:
            misses.append((i, text))

    # Delegate misses to inner provider
    if misses:
        miss_texts = [t for _, t in misses]
        miss_result = await self._inner.embed(miss_texts, **kwargs)

        # miss_texts is always list[str], so inner returns
        # list[list[float]]. Cast for type safety.
        miss_vectors: list[list[float]] = miss_result  # type: ignore[assignment]

        for (idx, _text), vector in zip(misses, miss_vectors):
            results[idx] = vector

        # Store all misses in cache
        await self._cache.put_batch(
            model,
            miss_texts,
            miss_vectors,
        )

    logger.debug(
        "Embedding cache: %d hits, %d misses (model=%s)",
        len(text_list) - len(misses),
        len(misses),
        model,
    )

    return results[0] if single else results
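The batch merge above — look everything up, embed only the misses, stitch results back in input order — can be sketched with a plain dict as the cache (`embed_with_cache` and `fake_inner_embed` are illustrative stand-ins, not package APIs):

```python
import asyncio

async def fake_inner_embed(texts):
    """Stand-in for the inner provider: one vector per input text."""
    return [[float(len(t))] for t in texts]

async def embed_with_cache(texts, cache):
    """Sketch of the hit/miss merge in CachingEmbedProvider.embed()."""
    results = [cache.get(t) for t in texts]
    misses = [i for i, vec in enumerate(results) if vec is None]
    if misses:
        # Only the uncached texts reach the inner provider.
        vectors = await fake_inner_embed([texts[i] for i in misses])
        for i, vec in zip(misses, vectors):
            cache[texts[i]] = vec
            results[i] = vec
    return results

cache = {"hi": [99.0]}
out = asyncio.run(embed_with_cache(["hi", "hello"], cache))
print(out)  # [[99.0], [5.0]] -- "hi" from cache, "hello" from inner
```

Note the results list is parallel to the input: cached and freshly embedded vectors land back at their original indices, so callers cannot tell hits from misses.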

EmbeddingCache

Bases: ABC

Cache for embedding vectors, keyed by (model, text).

Methods:

- `get`: Retrieve a cached vector, or None on miss.
- `put`: Store a vector in the cache.
- `get_batch`: Retrieve cached vectors for a batch of texts.
- `put_batch`: Store a batch of vectors in the cache.
- `initialize`: Prepare the cache backend (create tables, open files, etc.).
- `close`: Release cache resources.
- `clear`: Remove all cached entries.
- `count`: Return the number of cached entries.

Functions
get abstractmethod async
get(model: str, text: str) -> list[float] | None

Retrieve a cached vector, or None on miss.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
@abstractmethod
async def get(self, model: str, text: str) -> list[float] | None:
    """Retrieve a cached vector, or ``None`` on miss."""
put abstractmethod async
put(model: str, text: str, vector: list[float]) -> None

Store a vector in the cache.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
@abstractmethod
async def put(self, model: str, text: str, vector: list[float]) -> None:
    """Store a vector in the cache."""
get_batch abstractmethod async
get_batch(model: str, texts: list[str]) -> list[list[float] | None]

Retrieve cached vectors for a batch of texts.

Returns a list parallel to texts: each element is either the cached vector or None on a miss.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
@abstractmethod
async def get_batch(
    self, model: str, texts: list[str]
) -> list[list[float] | None]:
    """Retrieve cached vectors for a batch of texts.

    Returns a list parallel to *texts*: each element is either the
    cached vector or ``None`` on a miss.
    """
put_batch abstractmethod async
put_batch(model: str, texts: list[str], vectors: list[list[float]]) -> None

Store a batch of vectors in the cache.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
@abstractmethod
async def put_batch(
    self, model: str, texts: list[str], vectors: list[list[float]]
) -> None:
    """Store a batch of vectors in the cache."""
initialize abstractmethod async
initialize() -> None

Prepare the cache backend (create tables, open files, etc.).

Implementations MUST be idempotent — calling initialize() on an already-initialized cache is a no-op. This is required because CachingEmbedProvider.initialize() calls cache.initialize() unconditionally.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
@abstractmethod
async def initialize(self) -> None:
    """Prepare the cache backend (create tables, open files, etc.).

    Implementations MUST be idempotent — calling ``initialize()``
    on an already-initialized cache is a no-op. This is required
    because ``CachingEmbedProvider.initialize()`` calls
    ``cache.initialize()`` unconditionally.
    """
close abstractmethod async
close() -> None

Release cache resources.

Implementations MUST be idempotent — calling close() on an already-closed or never-initialized cache is a no-op.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
@abstractmethod
async def close(self) -> None:
    """Release cache resources.

    Implementations MUST be idempotent — calling ``close()`` on an
    already-closed or never-initialized cache is a no-op.
    """
clear abstractmethod async
clear() -> None

Remove all cached entries.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
@abstractmethod
async def clear(self) -> None:
    """Remove all cached entries."""
count abstractmethod async
count() -> int

Return the number of cached entries.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
@abstractmethod
async def count(self) -> int:
    """Return the number of cached entries."""

MemoryEmbeddingCache

MemoryEmbeddingCache()

Bases: EmbeddingCache

In-memory cache backend for testing.

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
def __init__(self) -> None:
    self._store: dict[str, list[float]] = {}
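A minimal stand-in shows the intended get/put round-trip. The composite key scheme below is an assumption for illustration, not necessarily the real backend's storage layout:

```python
import asyncio

# Minimal in-memory cache keyed by (model, text); the "model\x00text"
# composite key is a hypothetical scheme chosen for this sketch.
class TinyCache:
    def __init__(self):
        self._store = {}

    async def put(self, model, text, vector):
        self._store[f"{model}\x00{text}"] = vector

    async def get(self, model, text):
        # Returns None on a miss, matching the EmbeddingCache contract.
        return self._store.get(f"{model}\x00{text}")

async def demo():
    cache = TinyCache()
    await cache.put("m1", "hello", [0.5, 0.5])
    return await cache.get("m1", "hello"), await cache.get("m1", "other")

hit, miss = asyncio.run(demo())
print(hit, miss)  # [0.5, 0.5] None
```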

LLMProviderFactory

LLMProviderFactory(is_async: bool = True)

Factory for creating LLM providers from configuration.

This factory class integrates with the dataknobs Config system, allowing providers to be instantiated via Config.get_factory().

Example
from dataknobs_config import Config
config = Config({
    "llm": [{
        "name": "gpt4",
        "provider": "openai",
        "model": "gpt-4",
        "factory": "dataknobs_llm.LLMProviderFactory"
    }]
})
factory = config.get_factory("llm", "gpt4")
provider = factory.create(config.get("llm", "gpt4"))

Initialize the factory.

Parameters:

Name Type Description Default
is_async bool

Whether to create async providers (default: True)

True

Methods:

Name Description
create

Create an LLM provider from configuration.

register_provider

Register a custom provider class.

__call__

Allow factory to be called directly.

Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
def __init__(self, is_async: bool = True):
    """Initialize the factory.

    Args:
        is_async: Whether to create async providers (default: True)
    """
    self.is_async = is_async

    # Lazily populate provider registry
    if LLMProviderFactory._providers['openai'] is None:
        LLMProviderFactory._providers.update({
            'openai': OpenAIProvider,
            'anthropic': AnthropicProvider,
            'ollama': OllamaProvider,
            'huggingface': HuggingFaceProvider,
            'echo': EchoProvider,
        })
Functions
create
create(
    config: LLMConfig | Config | dict[str, Any], **kwargs: Any
) -> AsyncLLMProvider | SyncLLMProvider

Create an LLM provider from configuration.

Parameters:

Name Type Description Default
config LLMConfig | Config | dict[str, Any]

Configuration (LLMConfig, Config object, or dict)

required
**kwargs Any

Additional arguments passed to provider constructor

{}

Returns:

Type Description
AsyncLLMProvider | SyncLLMProvider

LLM provider instance

Raises:

Type Description
ValueError

If provider type is unknown

Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
def create(
    self,
    config: LLMConfig | Config | dict[str, Any],
    **kwargs: Any
) -> AsyncLLMProvider | SyncLLMProvider:
    """Create an LLM provider from configuration.

    Args:
        config: Configuration (LLMConfig, Config object, or dict)
        **kwargs: Additional arguments passed to provider constructor

    Returns:
        LLM provider instance

    Raises:
        ValueError: If provider type is unknown
    """
    # Normalize config to LLMConfig
    llm_config = normalize_llm_config(config)

    # Get provider class
    provider_class = self._providers.get(llm_config.provider.lower())
    if not provider_class:
        raise ValueError(
            f"Unknown provider: {llm_config.provider}. "
            f"Available providers: {list(self._providers.keys())}"
        )

    # Create provider instance
    if self.is_async:
        return provider_class(llm_config)
    else:
        # Wrap in sync adapter
        async_provider = provider_class(llm_config)
        return SyncProviderAdapter(async_provider)  # type: ignore
register_provider classmethod
register_provider(name: str, provider_class: type[AsyncLLMProvider]) -> None

Register a custom provider class.

Allows extending the factory with custom provider implementations.

Parameters:

Name Type Description Default
name str

Provider name (e.g., 'custom')

required
provider_class type[AsyncLLMProvider]

Provider class (must inherit from AsyncLLMProvider)

required
Example
class CustomProvider(AsyncLLMProvider):
    pass
LLMProviderFactory.register_provider('custom', CustomProvider)
Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
@classmethod
def register_provider(
    cls,
    name: str,
    provider_class: type[AsyncLLMProvider]
) -> None:
    """Register a custom provider class.

    Allows extending the factory with custom provider implementations.

    Args:
        name: Provider name (e.g., 'custom')
        provider_class: Provider class (must inherit from AsyncLLMProvider)

    Example:
        ```python
        class CustomProvider(AsyncLLMProvider):
            pass
        LLMProviderFactory.register_provider('custom', CustomProvider)
        ```
    """
    cls._providers[name.lower()] = provider_class
__call__
__call__(
    config: LLMConfig | Config | dict[str, Any], **kwargs: Any
) -> AsyncLLMProvider | SyncLLMProvider

Allow factory to be called directly.

Makes the factory callable for convenience.

Parameters:

Name Type Description Default
config LLMConfig | Config | dict[str, Any]

Configuration

required
**kwargs Any

Additional arguments

{}

Returns:

Type Description
AsyncLLMProvider | SyncLLMProvider

LLM provider instance

Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
def __call__(
    self,
    config: LLMConfig | Config | dict[str, Any],
    **kwargs: Any
) -> AsyncLLMProvider | SyncLLMProvider:
    """Allow factory to be called directly.

    Makes the factory callable for convenience.

    Args:
        config: Configuration
        **kwargs: Additional arguments

    Returns:
        LLM provider instance
    """
    return self.create(config, **kwargs)

TemplateStrategy

Bases: Enum

Template rendering strategies.

MessageTemplate dataclass

MessageTemplate(
    template: str,
    variables: List[str] = list(),
    strategy: TemplateStrategy = TemplateStrategy.SIMPLE,
)

Template for generating message content with multiple rendering strategies.

Supports two template strategies:

1. SIMPLE (default): Uses Python str.format() with {variable} syntax.
   - All variables must be provided
   - Clean and straightforward
   - Example: "Hello {name}!"

2. CONDITIONAL: Advanced conditional rendering with {{variable}} and ((conditional)) syntax.
   - Variables can be optional
   - Conditional sections with (( ... ))
   - Whitespace-aware substitution
   - Example: "Hello {{name}}((, you have {{count}} messages))"
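The SIMPLE strategy's behavior can be sketched with only the stdlib, mirroring the variable extraction and missing-variable check described above:

```python
import re

# Extract {variable} names, then require every one to be supplied
# before calling str.format() -- the SIMPLE strategy in miniature.
template = "Hello {name}, you have {count} messages"
variables = re.findall(r"\{(\w+)\}", template)

def simple_format(**kwargs):
    missing = set(variables) - set(kwargs)
    if missing:
        raise ValueError(f"Missing variables: {missing}")
    return template.format(**kwargs)

print(simple_format(name="Alice", count=5))
# Hello Alice, you have 5 messages
```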

Methods:

Name Description
__post_init__

Extract variables from template based on strategy.

format

Format template with variables using the selected strategy.

partial

Create partial template with some variables filled.

from_conditional

Create a MessageTemplate using the CONDITIONAL strategy.

Functions
__post_init__
__post_init__()

Extract variables from template based on strategy.

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def __post_init__(self):
    """Extract variables from template based on strategy."""
    if not self.variables:
        if self.strategy == TemplateStrategy.SIMPLE:
            # Extract {variable} patterns (single braces)
            self.variables = re.findall(r'\{(\w+)\}', self.template)
        elif self.strategy == TemplateStrategy.CONDITIONAL:
            # Extract {{variable}} patterns (double braces)
            # Extract just the variable names (group 2 from the regex)
            self.variables = [match.group(2) for match in re.finditer(r'\{\{(\s*)(\w+)(\s*)\}\}', self.template)]
            # Remove duplicates while preserving order
            seen = set()
            unique_vars = []
            for var in self.variables:
                if var not in seen:
                    seen.add(var)
                    unique_vars.append(var)
            self.variables = unique_vars
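Running the double-brace extraction above on a sample template shows the order-preserving dedup behavior:

```python
import re

# Same regex as __post_init__: group 2 is the variable name,
# groups 1 and 3 absorb optional whitespace inside the braces.
template = "Hi {{ name }}((, {{count}} new)) bye {{name}}"
found = [m.group(2) for m in re.finditer(r"\{\{(\s*)(\w+)(\s*)\}\}", template)]

seen, unique = set(), []
for var in found:
    if var not in seen:
        seen.add(var)
        unique.append(var)

print(unique)  # ['name', 'count']
```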
format
format(**kwargs: Any) -> str

Format template with variables using the selected strategy.

Parameters:

Name Type Description Default
**kwargs Any

Variable values

{}

Returns:

Type Description
str

Formatted prompt

Raises:

Type Description
ValueError

If using SIMPLE strategy and required variables are missing

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def format(self, **kwargs: Any) -> str:
    """Format template with variables using the selected strategy.

    Args:
        **kwargs: Variable values

    Returns:
        Formatted prompt

    Raises:
        ValueError: If using SIMPLE strategy and required variables are missing
    """
    if self.strategy == TemplateStrategy.SIMPLE:
        # Simple strategy: all variables must be provided
        missing = set(self.variables) - set(kwargs.keys())
        if missing:
            raise ValueError(f"Missing variables: {missing}")
        return self.template.format(**kwargs)

    elif self.strategy == TemplateStrategy.CONDITIONAL:
        # Conditional strategy: use render_conditional_template
        return render_conditional_template(self.template, kwargs)

    else:
        raise ValueError(f"Unknown template strategy: {self.strategy}")
partial
partial(**kwargs: Any) -> MessageTemplate

Create partial template with some variables filled.

Parameters:

Name Type Description Default
**kwargs Any

Variable values to fill

{}

Returns:

Type Description
MessageTemplate

New template with partial values

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def partial(self, **kwargs: Any) -> 'MessageTemplate':
    """Create partial template with some variables filled.

    Args:
        **kwargs: Variable values to fill

    Returns:
        New template with partial values
    """
    if self.strategy == TemplateStrategy.SIMPLE:
        # Simple strategy: replace {variable} patterns
        new_template = self.template
        new_variables = self.variables.copy()

        for key, value in kwargs.items():
            if key in new_variables:
                new_template = new_template.replace(f'{{{key}}}', str(value))
                new_variables.remove(key)

        return MessageTemplate(new_template, new_variables, self.strategy)

    elif self.strategy == TemplateStrategy.CONDITIONAL:
        # For conditional templates, render with provided variables
        # and keep the template structure for remaining variables
        new_template = self.template
        new_variables = self.variables.copy()

        # Replace only the provided variables with single-brace format
        # so they become literals in the new template
        for key, value in kwargs.items():
            if key in new_variables:
                # Replace {{var}} with the value, but keep it as a literal
                # We do this by using a placeholder that won't match the patterns
                pattern = r'\{\{\s*' + key + r'\s*\}\}'
                new_template = re.sub(
                    pattern,
                    str(value),
                    new_template
                )
                new_variables.remove(key)

        return MessageTemplate(new_template, new_variables, self.strategy)

    else:
        raise ValueError(f"Unknown template strategy: {self.strategy}")
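The conditional-strategy substitution used by partial() can be tried in isolation: only the provided variable is replaced, while the remaining {{var}} placeholders and (( )) sections survive in the new template:

```python
import re

# Substitute only "name", leaving {{count}} for a later format() call.
template = "Hello {{name}}((, you have {{count}} messages))"
filled = re.sub(r"\{\{\s*name\s*\}\}", "Alice", template)

print(filled)  # Hello Alice((, you have {{count}} messages))
```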
from_conditional classmethod
from_conditional(
    template: str, variables: List[str] | None = None
) -> MessageTemplate

Create a MessageTemplate using the CONDITIONAL strategy.

Convenience method for creating templates with advanced conditional rendering.

Parameters:

Name Type Description Default
template str

Template string with {{variable}} and ((conditional)) syntax

required
variables List[str] | None

Optional explicit list of variables

None

Returns:

Type Description
MessageTemplate

MessageTemplate configured with CONDITIONAL strategy

Example
template = MessageTemplate.from_conditional(
    "Hello {{name}}((, you have {{count}} messages))"
)
template.format(name="Alice", count=5)
# "Hello Alice, you have 5 messages"
template.format(name="Bob")
# "Hello Bob"
Source code in packages/llm/src/dataknobs_llm/llm/utils.py
@classmethod
def from_conditional(cls, template: str, variables: List[str] | None = None) -> 'MessageTemplate':
    """Create a MessageTemplate using the CONDITIONAL strategy.

    Convenience method for creating templates with advanced conditional rendering.

    Args:
        template: Template string with {{variable}} and ((conditional)) syntax
        variables: Optional explicit list of variables

    Returns:
        MessageTemplate configured with CONDITIONAL strategy

    Example:
        ```python
        template = MessageTemplate.from_conditional(
            "Hello {{name}}((, you have {{count}} messages))"
        )
        template.format(name="Alice", count=5)
        # "Hello Alice, you have 5 messages"
        template.format(name="Bob")
        # "Hello Bob"
        ```
    """
    return cls(template=template, variables=variables or [], strategy=TemplateStrategy.CONDITIONAL)

MessageBuilder

MessageBuilder()

Builder for constructing message sequences.

Methods:

Name Description
system

Add system message.

user

Add user message.

assistant

Add assistant message.

function

Add function message.

from_template

Add message from template.

build

Build message list.

clear

Clear all messages.

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def __init__(self):
    self.messages = []
Functions
system
system(content: str) -> MessageBuilder

Add system message.

Parameters:

Name Type Description Default
content str

Message content

required

Returns:

Type Description
MessageBuilder

Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def system(self, content: str) -> 'MessageBuilder':
    """Add system message.

    Args:
        content: Message content

    Returns:
        Self for chaining
    """
    self.messages.append(LLMMessage(role='system', content=content))
    return self
user
user(content: str) -> MessageBuilder

Add user message.

Parameters:

Name Type Description Default
content str

Message content

required

Returns:

Type Description
MessageBuilder

Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def user(self, content: str) -> 'MessageBuilder':
    """Add user message.

    Args:
        content: Message content

    Returns:
        Self for chaining
    """
    self.messages.append(LLMMessage(role='user', content=content))
    return self
assistant
assistant(content: str) -> MessageBuilder

Add assistant message.

Parameters:

Name Type Description Default
content str

Message content

required

Returns:

Type Description
MessageBuilder

Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def assistant(self, content: str) -> 'MessageBuilder':
    """Add assistant message.

    Args:
        content: Message content

    Returns:
        Self for chaining
    """
    self.messages.append(LLMMessage(role='assistant', content=content))
    return self
function
function(
    name: str, content: str, function_call: Dict[str, Any] | None = None
) -> MessageBuilder

Add function message.

Parameters:

Name Type Description Default
name str

Function name

required
content str

Function result

required
function_call Dict[str, Any] | None

Function call details

None

Returns:

Type Description
MessageBuilder

Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def function(
    self,
    name: str,
    content: str,
    function_call: Dict[str, Any] | None = None
) -> 'MessageBuilder':
    """Add function message.

    Args:
        name: Function name
        content: Function result
        function_call: Function call details

    Returns:
        Self for chaining
    """
    self.messages.append(LLMMessage(
        role='function',
        name=name,
        content=content,
        function_call=function_call
    ))
    return self
from_template
from_template(
    role: str, template: MessageTemplate, **kwargs: Any
) -> MessageBuilder

Add message from template.

Parameters:

Name Type Description Default
role str

Message role

required
template MessageTemplate

Message template

required
**kwargs Any

Template variables

{}

Returns:

Type Description
MessageBuilder

Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def from_template(
    self,
    role: str,
    template: MessageTemplate,
    **kwargs: Any
) -> 'MessageBuilder':
    """Add message from template.

    Args:
        role: Message role
        template: Message template
        **kwargs: Template variables

    Returns:
        Self for chaining
    """
    content = template.format(**kwargs)
    self.messages.append(LLMMessage(role=role, content=content))
    return self
build
build() -> List[LLMMessage]

Build message list.

Returns:

Type Description
List[LLMMessage]

List of messages

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def build(self) -> List[LLMMessage]:
    """Build message list.

    Returns:
        List of messages
    """
    return self.messages.copy()
clear
clear() -> MessageBuilder

Clear all messages.

Returns:

Type Description
MessageBuilder

Self for chaining

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
def clear(self) -> 'MessageBuilder':
    """Clear all messages.

    Returns:
        Self for chaining
    """
    self.messages.clear()
    return self
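The chaining pattern can be sketched with a simplified stand-in for LLMMessage (the Message dataclass below is illustrative, not the real class):

```python
from dataclasses import dataclass

@dataclass
class Message:          # simplified stand-in for LLMMessage
    role: str
    content: str

class Builder:          # each method returns self, enabling chaining
    def __init__(self):
        self.messages = []

    def system(self, content):
        self.messages.append(Message("system", content))
        return self

    def user(self, content):
        self.messages.append(Message("user", content))
        return self

    def build(self):
        return self.messages.copy()  # copy so later edits don't leak

msgs = Builder().system("You are terse.").user("Hi!").build()
print([m.role for m in msgs])  # ['system', 'user']
```

Returning a copy from build() matches the real implementation above: callers can mutate the returned list without affecting the builder's internal state.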

ResponseParser

Parser for LLM responses.

Methods:

Name Description
extract_json

Extract JSON from response.

extract_code

Extract code blocks from response.

extract_list

Extract list items from response.

extract_sections

Extract sections from response.

Functions
extract_json staticmethod
extract_json(response: Union[str, LLMResponse]) -> Dict[str, Any] | None

Extract JSON from response.

Parameters:

Name Type Description Default
response Union[str, LLMResponse]

LLM response

required

Returns:

Type Description
Dict[str, Any] | None

Extracted JSON or None

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
@staticmethod
def extract_json(response: Union[str, LLMResponse]) -> Dict[str, Any] | None:
    """Extract JSON from response.

    Args:
        response: LLM response

    Returns:
        Extracted JSON or None
    """
    text = response.content if isinstance(response, LLMResponse) else response

    # Try to find JSON in the text
    json_patterns = [
        r'\{[^}]*\}',  # Simple object
        r'\[[^\]]*\]',  # Array
        r'```json\s*(.*?)\s*```',  # Markdown code block
        r'```\s*(.*?)\s*```',  # Generic code block
    ]

    for pattern in json_patterns:
        matches = re.findall(pattern, text, re.DOTALL)
        for match in matches:
            try:
                return json.loads(match)
            except json.JSONDecodeError:
                continue

    # Try parsing the entire text as JSON
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None
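The markdown-fence pattern is the workhorse of extract_json; here is a standalone run (the fence is built from chr(96) so the snippet nests cleanly in this document):

```python
import json
import re

tick = chr(96) * 3  # a literal ``` fence
text = f'Here you go:\n{tick}json\n{{"status": "ok", "items": 2}}\n{tick}'

match = re.search(tick + r"json\s*(.*?)\s*" + tick, text, re.DOTALL)
data = json.loads(match.group(1))

print(data)  # {'status': 'ok', 'items': 2}
```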
extract_code staticmethod
extract_code(
    response: Union[str, LLMResponse], language: str | None = None
) -> List[str]

Extract code blocks from response.

Parameters:

Name Type Description Default
response Union[str, LLMResponse]

LLM response

required
language str | None

Optional language filter

None

Returns:

Type Description
List[str]

List of code blocks

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
@staticmethod
def extract_code(
    response: Union[str, LLMResponse],
    language: str | None = None
) -> List[str]:
    """Extract code blocks from response.

    Args:
        response: LLM response
        language: Optional language filter

    Returns:
        List of code blocks
    """
    text = response.content if isinstance(response, LLMResponse) else response

    if language:
        # Language-specific code blocks
        pattern = rf'```{language}\s*(.*?)\s*```'
    else:
        # All code blocks
        pattern = r'```(?:\w+)?\s*(.*?)\s*```'

    matches = re.findall(pattern, text, re.DOTALL)
    return [m.strip() for m in matches]
extract_list staticmethod
extract_list(
    response: Union[str, LLMResponse], numbered: bool = False
) -> List[str]

Extract list items from response.

Parameters:

Name Type Description Default
response Union[str, LLMResponse]

LLM response

required
numbered bool

Whether to look for numbered lists

False

Returns:

Type Description
List[str]

List of items

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
@staticmethod
def extract_list(
    response: Union[str, LLMResponse],
    numbered: bool = False
) -> List[str]:
    """Extract list items from response.

    Args:
        response: LLM response
        numbered: Whether to look for numbered lists

    Returns:
        List of items
    """
    text = response.content if isinstance(response, LLMResponse) else response

    if numbered:
        # Numbered list (1. item, 2. item, etc.)
        pattern = r'^\d+\.\s+(.+)$'
    else:
        # Bullet points (-, *, •)
        pattern = r'^[-*•]\s+(.+)$'

    matches = re.findall(pattern, text, re.MULTILINE)
    return [m.strip() for m in matches]
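A quick standalone run of the bullet pattern, which accepts `-`, `*`, or `•` markers:

```python
import re

text = "Top picks:\n- apples\n- pears\n* plums"
items = [m.strip() for m in re.findall(r"^[-*•]\s+(.+)$", text, re.MULTILINE)]

print(items)  # ['apples', 'pears', 'plums']
```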
extract_sections staticmethod
extract_sections(response: Union[str, LLMResponse]) -> Dict[str, str]

Extract sections from response.

Parameters:

Name Type Description Default
response Union[str, LLMResponse]

LLM response

required

Returns:

Type Description
Dict[str, str]

Dictionary of section name to content

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
@staticmethod
def extract_sections(
    response: Union[str, LLMResponse]
) -> Dict[str, str]:
    """Extract sections from response.

    Args:
        response: LLM response

    Returns:
        Dictionary of section name to content
    """
    text = response.content if isinstance(response, LLMResponse) else response

    # Split by headers (# Header, ## Header, etc.)
    sections = {}
    current_section = 'main'
    current_content = []

    for line in text.split('\n'):
        header_match = re.match(r'^#+\s+(.+)$', line)
        if header_match:
            # Save previous section
            if current_content:
                sections[current_section] = '\n'.join(current_content).strip()
            # Start new section
            current_section = header_match.group(1).strip()
            current_content = []
        else:
            current_content.append(line)

    # Save last section
    if current_content:
        sections[current_section] = '\n'.join(current_content).strip()

    return sections
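The header-splitting loop, run standalone on a small sample; text before the first header lands in the 'main' section:

```python
import re

text = "intro line\n# Setup\npip install x\n# Usage\nrun it"

sections, current, buf = {}, "main", []
for line in text.split("\n"):
    m = re.match(r"^#+\s+(.+)$", line)
    if m:
        if buf:  # flush the previous section
            sections[current] = "\n".join(buf).strip()
        current, buf = m.group(1).strip(), []
    else:
        buf.append(line)
if buf:  # flush the final section
    sections[current] = "\n".join(buf).strip()

print(sections)
# {'main': 'intro line', 'Setup': 'pip install x', 'Usage': 'run it'}
```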

TokenCounter

Estimate token counts for different models.

Methods:

Name Description
estimate_tokens

Estimate token count for text.

estimate_messages_tokens

Estimate token count for messages.

fits_in_context

Check if text fits in context window.

Functions
estimate_tokens classmethod
estimate_tokens(text: str, model: str = 'default') -> int

Estimate token count for text.

Parameters:

Name Type Description Default
text str

Input text

required
model str

Model name

'default'

Returns:

Type Description
int

Estimated token count

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
@classmethod
def estimate_tokens(
    cls,
    text: str,
    model: str = 'default'
) -> int:
    """Estimate token count for text.

    Args:
        text: Input text
        model: Model name

    Returns:
        Estimated token count
    """
    # Find matching model pattern
    ratio = cls.TOKENS_PER_CHAR['default']
    for pattern, r in cls.TOKENS_PER_CHAR.items():
        if pattern in model.lower():
            ratio = r
            break

    # Estimate based on character count
    return int(len(text) * ratio)
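A sketch of the character-ratio estimate; the 0.25 tokens-per-char value is an illustrative assumption, since the real TOKENS_PER_CHAR table is model-specific and not shown here:

```python
# Hypothetical per-model ratios; the real table's values may differ.
TOKENS_PER_CHAR = {"default": 0.25, "gpt-4": 0.25}

def estimate_tokens(text, model="default"):
    # Pick the first ratio whose pattern appears in the model name.
    ratio = TOKENS_PER_CHAR["default"]
    for pattern, r in TOKENS_PER_CHAR.items():
        if pattern in model.lower():
            ratio = r
            break
    return int(len(text) * ratio)

print(estimate_tokens("Hello, world!"))  # 3  (13 chars * 0.25)
```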
estimate_messages_tokens classmethod
estimate_messages_tokens(
    messages: List[LLMMessage], model: str = "default"
) -> int

Estimate token count for messages.

Parameters:

Name Type Description Default
messages List[LLMMessage]

List of messages

required
model str

Model name

'default'

Returns:

Type Description
int

Estimated token count

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
@classmethod
def estimate_messages_tokens(
    cls,
    messages: List[LLMMessage],
    model: str = 'default'
) -> int:
    """Estimate token count for messages.

    Args:
        messages: List of messages
        model: Model name

    Returns:
        Estimated token count
    """
    total = 0
    for msg in messages:
        # Add role tokens (approximately 4 tokens)
        total += 4
        # Add content tokens
        total += cls.estimate_tokens(msg.content, model)
        # Add name tokens if present
        if msg.name:
            total += cls.estimate_tokens(msg.name, model)

    return total
fits_in_context classmethod
fits_in_context(text: str, model: str, max_tokens: int) -> bool

Check if text fits in context window.

Parameters:

Name Type Description Default
text str

Input text

required
model str

Model name

required
max_tokens int

Maximum token limit

required

Returns:

Type Description
bool

True if fits

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
@classmethod
def fits_in_context(
    cls,
    text: str,
    model: str,
    max_tokens: int
) -> bool:
    """Check if text fits in context window.

    Args:
        text: Input text
        model: Model name
        max_tokens: Maximum token limit

    Returns:
        True if fits
    """
    estimated = cls.estimate_tokens(text, model)
    return estimated <= max_tokens

CostCalculator

Calculate costs for LLM usage.

Methods:

Name Description
calculate_cost

Calculate cost for LLM response.

estimate_cost

Estimate cost for text completion.

Functions
calculate_cost classmethod
calculate_cost(response: LLMResponse, model: str | None = None) -> float | None

Calculate cost for LLM response.

Parameters:

Name Type Description Default
response LLMResponse

LLM response with usage info

required
model str | None

Model name (if not in response)

None

Returns:

Type Description
float | None

Cost in USD or None if cannot calculate

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
@classmethod
def calculate_cost(
    cls,
    response: LLMResponse,
    model: str | None = None
) -> float | None:
    """Calculate cost for LLM response.

    Args:
        response: LLM response with usage info
        model: Model name (if not in response)

    Returns:
        Cost in USD or None if cannot calculate
    """
    if not response.usage:
        return None

    model = model or response.model

    # Find matching pricing
    pricing = None
    for pattern, prices in cls.PRICING.items():
        if pattern in model.lower():
            pricing = prices
            break

    if not pricing:
        return None

    # Calculate cost
    input_cost = (response.usage.get('prompt_tokens', 0) / 1000) * pricing['input']
    output_cost = (response.usage.get('completion_tokens', 0) / 1000) * pricing['output']

    return input_cost + output_cost
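The cost arithmetic reduces to two per-1K-token products; the prices below are hypothetical, not the library's PRICING table:

```python
# Assumed USD prices per 1K tokens, for illustration only.
pricing = {"input": 0.01, "output": 0.03}
usage = {"prompt_tokens": 500, "completion_tokens": 200}

cost = (usage["prompt_tokens"] / 1000) * pricing["input"] \
     + (usage["completion_tokens"] / 1000) * pricing["output"]

print(round(cost, 4))  # 0.011
```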
estimate_cost classmethod
estimate_cost(
    text: str, model: str, expected_output_tokens: int = 100
) -> float | None

Estimate cost for text completion.

Parameters:

Name Type Description Default
text str

Input text

required
model str

Model name

required
expected_output_tokens int

Expected output length

100

Returns:

Type Description
float | None

Estimated cost in USD

Source code in packages/llm/src/dataknobs_llm/llm/utils.py
@classmethod
def estimate_cost(
    cls,
    text: str,
    model: str,
    expected_output_tokens: int = 100
) -> float | None:
    """Estimate cost for text completion.

    Args:
        text: Input text
        model: Model name
        expected_output_tokens: Expected output length

    Returns:
        Estimated cost in USD
    """
    # Find matching pricing
    pricing = None
    for pattern, prices in cls.PRICING.items():
        if pattern in model.lower():
            pricing = prices
            break

    if not pricing:
        return None

    # Estimate tokens
    input_tokens = TokenCounter.estimate_tokens(text, model)

    # Calculate cost
    input_cost = (input_tokens / 1000) * pricing['input']
    output_cost = (expected_output_tokens / 1000) * pricing['output']

    return input_cost + output_cost

Tool

Tool(name: str, description: str, metadata: Dict[str, Any] | None = None)

Bases: ABC

Abstract base class for LLM-callable tools.

A Tool represents a function that can be called by an LLM during generation. Each tool has a name, description, parameter schema, and execution logic.

Example

class CalculatorTool(Tool):
    def __init__(self):
        super().__init__(
            name="calculator",
            description="Performs basic arithmetic operations"
        )

    @property
    def schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "operation": {
                    "type": "string",
                    "enum": ["add", "subtract", "multiply", "divide"]
                },
                "a": {"type": "number"},
                "b": {"type": "number"}
            },
            "required": ["operation", "a", "b"]
        }

    async def execute(self, operation: str, a: float, b: float) -> float:
        if operation == "add":
            return a + b
        elif operation == "subtract":
            return a - b
        elif operation == "multiply":
            return a * b
        elif operation == "divide":
            return a / b
        else:
            raise ValueError(f"Unknown operation: {operation}")

Initialize a tool.

Parameters:

Name Type Description Default
name str

Unique identifier for the tool

required
description str

Human-readable description of what the tool does

required
metadata Dict[str, Any] | None

Optional metadata about the tool

None

Methods:

Name Description
execute

Execute the tool with given parameters.

to_function_definition

Convert tool to OpenAI function calling format.

to_anthropic_tool_definition

Convert tool to Anthropic tool format.

validate_parameters

Validate parameters against schema.

__repr__

String representation of tool.

__str__

Human-readable string representation.

Attributes:

Name Type Description
schema Dict[str, Any]

Get JSON schema for tool parameters.

Source code in packages/llm/src/dataknobs_llm/tools/base.py
def __init__(
    self,
    name: str,
    description: str,
    metadata: Dict[str, Any] | None = None
):
    """Initialize a tool.

    Args:
        name: Unique identifier for the tool
        description: Human-readable description of what the tool does
        metadata: Optional metadata about the tool
    """
    self.name = name
    self.description = description
    self.metadata = metadata or {}
Attributes
schema abstractmethod property
schema: Dict[str, Any]

Get JSON schema for tool parameters.

Returns a JSON Schema dictionary describing the parameters this tool accepts. The schema should follow the JSON Schema specification and is used by LLMs to understand how to call the tool.

Returns:

Type Description
Dict[str, Any]

JSON Schema dictionary for tool parameters

Example

{
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": "The search query"
        },
        "max_results": {
            "type": "integer",
            "description": "Maximum number of results",
            "default": 10
        }
    },
    "required": ["query"]
}

Functions
execute abstractmethod async
execute(**kwargs: Any) -> Any

Execute the tool with given parameters.

This method performs the actual tool logic. Parameters are passed as keyword arguments matching the schema definition.

Parameters:

Name Type Description Default
**kwargs Any

Tool parameters matching the schema

{}

Returns:

Type Description
Any

Tool execution result (can be any JSON-serializable type)

Raises:

Type Description
Exception

If tool execution fails

Source code in packages/llm/src/dataknobs_llm/tools/base.py
@abstractmethod
async def execute(self, **kwargs: Any) -> Any:
    """Execute the tool with given parameters.

    This method performs the actual tool logic. Parameters are passed
    as keyword arguments matching the schema definition.

    Args:
        **kwargs: Tool parameters matching the schema

    Returns:
        Tool execution result (can be any JSON-serializable type)

    Raises:
        Exception: If tool execution fails
    """
    pass
to_function_definition
to_function_definition() -> Dict[str, Any]

Convert tool to OpenAI function calling format.

Returns a dictionary in the format expected by OpenAI's function calling API.

Returns:

Type Description
Dict[str, Any]

Function definition dictionary

Example

{
    "name": "search_web",
    "description": "Search the web for information",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string"}
        },
        "required": ["query"]
    }
}

Source code in packages/llm/src/dataknobs_llm/tools/base.py
def to_function_definition(self) -> Dict[str, Any]:
    """Convert tool to OpenAI function calling format.

    Returns a dictionary in the format expected by OpenAI's function
    calling API.

    Returns:
        Function definition dictionary

    Example:
        {
            "name": "search_web",
            "description": "Search the web for information",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"}
                },
                "required": ["query"]
            }
        }
    """
    return {
        "name": self.name,
        "description": self.description,
        "parameters": self.schema,
    }
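The dictionary returned here matches OpenAI's original `functions` parameter. Newer OpenAI chat-completions requests take a `tools` list that wraps each definition under a `"function"` key; a small adapter (a sketch, not part of this library) might look like:

```python
# Hypothetical adapter: wrap a to_function_definition() result for the
# newer OpenAI `tools` parameter shape.
def wrap_for_tools_param(function_def: dict) -> dict:
    return {"type": "function", "function": function_def}

tool_entry = wrap_for_tools_param({
    "name": "search_web",
    "description": "Search the web for information",
    "parameters": {
        "type": "object",
        "properties": {"query": {"type": "string"}},
        "required": ["query"],
    },
})
```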
to_anthropic_tool_definition
to_anthropic_tool_definition() -> Dict[str, Any]

Convert tool to Anthropic tool format.

Returns a dictionary in the format expected by Anthropic's Claude API.

Returns:

Type Description
Dict[str, Any]

Tool definition dictionary

Source code in packages/llm/src/dataknobs_llm/tools/base.py
def to_anthropic_tool_definition(self) -> Dict[str, Any]:
    """Convert tool to Anthropic tool format.

    Returns a dictionary in the format expected by Anthropic's Claude API.

    Returns:
        Tool definition dictionary
    """
    return {
        "name": self.name,
        "description": self.description,
        "input_schema": self.schema,
    }
validate_parameters
validate_parameters(**kwargs: Any) -> bool

Validate parameters against schema.

Optional hook for parameter validation before execution. The default implementation only checks that all required fields are present.

Parameters:

Name Type Description Default
**kwargs Any

Parameters to validate

{}

Returns:

Type Description
bool

True if valid, False otherwise

Source code in packages/llm/src/dataknobs_llm/tools/base.py
def validate_parameters(self, **kwargs: Any) -> bool:
    """Validate parameters against schema.

    Optional method for parameter validation before execution.
    By default, assumes LLM provides valid parameters.

    Args:
        **kwargs: Parameters to validate

    Returns:
        True if valid, False otherwise
    """
    # Basic validation - check required fields
    required = self.schema.get("required", [])
    return all(field in kwargs for field in required)
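A stricter override could also check primitive JSON types. The sketch below is standalone (it does not subclass `Tool`) and the function name is illustrative; it reproduces the base class's required-field check and adds type checks:

```python
# Map JSON Schema type names to Python types (a sketch; note that in
# Python, bool is a subclass of int, so "integer" also accepts True).
_JSON_TYPES = {
    "string": str,
    "number": (int, float),
    "integer": int,
    "boolean": bool,
    "array": list,
    "object": dict,
}

def validate_against_schema(schema: dict, **kwargs) -> bool:
    """Check required fields and primitive JSON types."""
    # Same required-field check as Tool.validate_parameters
    required = schema.get("required", [])
    if not all(field in kwargs for field in required):
        return False
    # Additionally verify each supplied value's declared type
    properties = schema.get("properties", {})
    for key, value in kwargs.items():
        expected = properties.get(key, {}).get("type")
        if expected in _JSON_TYPES and not isinstance(value, _JSON_TYPES[expected]):
            return False
    return True

CALC_SCHEMA = {
    "type": "object",
    "properties": {
        "operation": {"type": "string"},
        "a": {"type": "number"},
        "b": {"type": "number"},
    },
    "required": ["operation", "a", "b"],
}

ok = validate_against_schema(CALC_SCHEMA, operation="add", a=5, b=3)
missing = validate_against_schema(CALC_SCHEMA, operation="add", a=5)
```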
__repr__
__repr__() -> str

String representation of tool.

Source code in packages/llm/src/dataknobs_llm/tools/base.py
def __repr__(self) -> str:
    """String representation of tool."""
    return f"Tool(name={self.name!r}, description={self.description!r})"
__str__
__str__() -> str

Human-readable string representation.

Source code in packages/llm/src/dataknobs_llm/tools/base.py
def __str__(self) -> str:
    """Human-readable string representation."""
    return f"{self.name}: {self.description}"

ToolRegistry

ToolRegistry(track_executions: bool = False, max_execution_history: int = 100)

Bases: Registry[Tool]

Registry for managing available tools/functions.

The ToolRegistry provides a central place to register and discover tools that can be called by LLMs. It supports tool registration, retrieval, listing, and conversion to function calling formats.

Built on dataknobs_common.Registry for consistency across the ecosystem.

Example
# Create registry
registry = ToolRegistry()

# Register tools
registry.register_tool(CalculatorTool())
registry.register_tool(WebSearchTool())

# Check if tool exists
if registry.has_tool("calculator"):
    tool = registry.get_tool("calculator")
    result = await tool.execute(operation="add", a=5, b=3)

# Get all tools for LLM function calling
functions = registry.to_function_definitions()

# List available tools
tools = registry.list_tools()
for tool_info in tools:
    print(f"{tool_info['name']}: {tool_info['description']}")

Initialize a tool registry.

Parameters:

Name Type Description Default
track_executions bool

If True, record execution history for observability and debugging. Default False.

False
max_execution_history int

Maximum number of execution records to retain when tracking is enabled. Default 100.

100

Methods:

Name Description
register_tool

Register a tool.

register_many

Register multiple tools at once.

get_tool

Get a tool by name.

has_tool

Check if a tool with the given name exists.

list_tools

List all registered tools with their metadata.

get_tool_names

Get list of all registered tool names.

to_function_definitions

Convert tools to OpenAI function calling format.

to_anthropic_tool_definitions

Convert tools to Anthropic tool format.

execute_tool

Execute a tool by name with given parameters.

get_execution_history

Query tool execution history.

get_execution_stats

Get aggregated execution statistics.

clear_execution_history

Clear all execution history.

execution_history_count

Get number of records in execution history.

filter_by_metadata

Filter tools by metadata attributes.

clone

Create a shallow copy of this registry.

__repr__

String representation of registry.

__str__

Human-readable string representation.

Attributes:

Name Type Description
tracking_enabled bool

Check if execution tracking is enabled.

Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def __init__(
    self,
    track_executions: bool = False,
    max_execution_history: int = 100,
):
    """Initialize a tool registry.

    Args:
        track_executions: If True, record execution history for
            observability and debugging. Default False.
        max_execution_history: Maximum number of execution records
            to retain when tracking is enabled. Default 100.
    """
    super().__init__(name="tools", enable_metrics=True)
    self._track_executions = track_executions
    self._execution_tracker: ExecutionTracker | None = (
        ExecutionTracker(max_history=max_execution_history)
        if track_executions
        else None
    )
Attributes
tracking_enabled property
tracking_enabled: bool

Check if execution tracking is enabled.

Returns:

Type Description
bool

True if tracking is enabled, False otherwise

Functions
register_tool
register_tool(tool: Tool) -> None

Register a tool.

Parameters:

Name Type Description Default
tool Tool

Tool instance to register

required

Raises:

Type Description
OperationError

If a tool with the same name already exists

Example

calculator = CalculatorTool()
registry.register_tool(calculator)

Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def register_tool(self, tool: Tool) -> None:
    """Register a tool.

    Args:
        tool: Tool instance to register

    Raises:
        OperationError: If a tool with the same name already exists

    Example:
        >>> calculator = CalculatorTool()
        >>> registry.register_tool(calculator)
    """
    try:
        self.register(
            tool.name,
            tool,
            metadata={"description": tool.description, "schema": tool.schema},
        )
    except OperationError as e:
        # Re-raise with more specific message for backward compatibility
        raise OperationError(
            f"Tool with name '{tool.name}' already registered. "
            f"Use unregister() first or choose a different name.",
            context=e.context,
        ) from e
register_many
register_many(tools: List[Tool]) -> None

Register multiple tools at once.

Parameters:

Name Type Description Default
tools List[Tool]

List of Tool instances to register

required

Raises:

Type Description
OperationError

If any tool name conflicts with existing tools

Example

tools = [CalculatorTool(), SearchTool(), WeatherTool()]
registry.register_many(tools)

Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def register_many(self, tools: List[Tool]) -> None:
    """Register multiple tools at once.

    Args:
        tools: List of Tool instances to register

    Raises:
        OperationError: If any tool name conflicts with existing tools

    Example:
        >>> tools = [CalculatorTool(), SearchTool(), WeatherTool()]
        >>> registry.register_many(tools)
    """
    # Check for conflicts first
    for tool in tools:
        if self.has(tool.name):
            raise OperationError(
                f"Tool with name '{tool.name}' already registered",
                context={"tool_name": tool.name, "registry": self.name},
            )

    # Register all tools
    for tool in tools:
        self.register(
            tool.name,
            tool,
            metadata={"description": tool.description, "schema": tool.schema},
        )
get_tool
get_tool(name: str) -> Tool

Get a tool by name.

Parameters:

Name Type Description Default
name str

Name of the tool to retrieve

required

Returns:

Type Description
Tool

Tool instance

Raises:

Type Description
NotFoundError

If no tool with the given name exists

Example

tool = registry.get_tool("calculator")
result = await tool.execute(operation="add", a=5, b=3)

Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def get_tool(self, name: str) -> Tool:
    """Get a tool by name.

    Args:
        name: Name of the tool to retrieve

    Returns:
        Tool instance

    Raises:
        NotFoundError: If no tool with the given name exists

    Example:
        >>> tool = registry.get_tool("calculator")
        >>> result = await tool.execute(operation="add", a=5, b=3)
    """
    try:
        return self.get(name)
    except NotFoundError as e:
        # Re-raise with more specific message for backward compatibility
        raise NotFoundError(
            f"Tool not found: {name}",
            context=e.context,
        ) from e
has_tool
has_tool(name: str) -> bool

Check if a tool with the given name exists.

Parameters:

Name Type Description Default
name str

Name of the tool to check

required

Returns:

Type Description
bool

True if tool exists, False otherwise

Example
if registry.has_tool("calculator"):
    print("Calculator available")
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def has_tool(self, name: str) -> bool:
    """Check if a tool with the given name exists.

    Args:
        name: Name of the tool to check

    Returns:
        True if tool exists, False otherwise

    Example:
        ```python
        if registry.has_tool("calculator"):
            print("Calculator available")
        ```
    """
    return self.has(name)
list_tools
list_tools() -> List[Dict[str, Any]]

List all registered tools with their metadata.

Returns:

Type Description
List[Dict[str, Any]]

List of dictionaries containing tool information

Example
tools = registry.list_tools()
for tool_info in tools:
    print(f"{tool_info['name']}: {tool_info['description']}")

Returns format:
[
    {
        "name": "calculator",
        "description": "Performs arithmetic operations",
        "schema": {...},
        "metadata": {...}
    },
    ...
]

Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def list_tools(self) -> List[Dict[str, Any]]:
    """List all registered tools with their metadata.

    Returns:
        List of dictionaries containing tool information

    Example:
        ```python
        tools = registry.list_tools()
        for tool_info in tools:
            print(f"{tool_info['name']}: {tool_info['description']}")
        ```

        Returns format:
        [
            {
                "name": "calculator",
                "description": "Performs arithmetic operations",
                "schema": {...},
                "metadata": {...}
            },
            ...
        ]
    """
    return [
        {
            "name": tool.name,
            "description": tool.description,
            "schema": tool.schema,
            "metadata": tool.metadata,
        }
        for tool in self.list_items()
    ]
get_tool_names
get_tool_names() -> List[str]

Get list of all registered tool names.

Returns:

Type Description
List[str]

List of tool names

Example

names = registry.get_tool_names()
print(names)
['calculator', 'search', 'weather']

Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def get_tool_names(self) -> List[str]:
    """Get list of all registered tool names.

    Returns:
        List of tool names

    Example:
        >>> names = registry.get_tool_names()
        >>> print(names)
        ['calculator', 'search', 'weather']
    """
    return self.list_keys()
to_function_definitions
to_function_definitions(
    include_only: Set[str] | None = None, exclude: Set[str] | None = None
) -> List[Dict[str, Any]]

Convert tools to OpenAI function calling format.

Parameters:

Name Type Description Default
include_only Set[str] | None

If provided, only include tools with these names

None
exclude Set[str] | None

If provided, exclude tools with these names

None

Returns:

Type Description
List[Dict[str, Any]]

List of function definition dictionaries

Example
# Get all tools
functions = registry.to_function_definitions()

# Get only specific tools
functions = registry.to_function_definitions(
    include_only={"calculator", "web_search"}
)

# Get all except specific tools
functions = registry.to_function_definitions(
    exclude={"dangerous_tool"}
)
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def to_function_definitions(
    self, include_only: Set[str] | None = None, exclude: Set[str] | None = None
) -> List[Dict[str, Any]]:
    """Convert tools to OpenAI function calling format.

    Args:
        include_only: If provided, only include tools with these names
        exclude: If provided, exclude tools with these names

    Returns:
        List of function definition dictionaries

    Example:
        ```python
        # Get all tools
        functions = registry.to_function_definitions()

        # Get only specific tools
        functions = registry.to_function_definitions(
            include_only={"calculator", "web_search"}
        )

        # Get all except specific tools
        functions = registry.to_function_definitions(
            exclude={"dangerous_tool"}
        )
        ```
    """
    tools_to_include = []

    for name, tool in self.items():
        # Apply filters
        if include_only and name not in include_only:
            continue
        if exclude and name in exclude:
            continue

        tools_to_include.append(tool)

    return [tool.to_function_definition() for tool in tools_to_include]
to_anthropic_tool_definitions
to_anthropic_tool_definitions(
    include_only: Set[str] | None = None, exclude: Set[str] | None = None
) -> List[Dict[str, Any]]

Convert tools to Anthropic tool format.

Parameters:

Name Type Description Default
include_only Set[str] | None

If provided, only include tools with these names

None
exclude Set[str] | None

If provided, exclude tools with these names

None

Returns:

Type Description
List[Dict[str, Any]]

List of tool definition dictionaries

Example
tools = registry.to_anthropic_tool_definitions()
# Use with Anthropic API
response = client.messages.create(
    model="claude-3-sonnet",
    tools=tools,
    messages=[...]
)
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def to_anthropic_tool_definitions(
    self, include_only: Set[str] | None = None, exclude: Set[str] | None = None
) -> List[Dict[str, Any]]:
    """Convert tools to Anthropic tool format.

    Args:
        include_only: If provided, only include tools with these names
        exclude: If provided, exclude tools with these names

    Returns:
        List of tool definition dictionaries

    Example:
        ```python
        tools = registry.to_anthropic_tool_definitions()
        # Use with Anthropic API
        response = client.messages.create(
            model="claude-3-sonnet",
            tools=tools,
            messages=[...]
        )
        ```
    """
    tools_to_include = []

    for name, tool in self.items():
        # Apply filters
        if include_only and name not in include_only:
            continue
        if exclude and name in exclude:
            continue

        tools_to_include.append(tool)

    return [tool.to_anthropic_tool_definition() for tool in tools_to_include]
execute_tool async
execute_tool(name: str, **kwargs: Any) -> Any

Execute a tool by name with given parameters.

This is a convenience method for getting and executing a tool in a single call. When execution tracking is enabled, records timing, parameters, and results for observability.

Parameters:

Name Type Description Default
name str

Name of the tool to execute

required
**kwargs Any

Parameters to pass to the tool. Special parameters starting with '_' (like _context) are passed to the tool but excluded from execution records.

{}

Returns:

Type Description
Any

Tool execution result

Raises:

Type Description
NotFoundError

If tool not found

Exception

If tool execution fails

Example
result = await registry.execute_tool(
    "calculator",
    operation="add",
    a=5,
    b=3
)
print(result)
# 8

# With tracking enabled
registry = ToolRegistry(track_executions=True)
await registry.execute_tool("calculator", operation="add", a=5, b=3)
history = registry.get_execution_history(tool_name="calculator")
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
async def execute_tool(self, name: str, **kwargs: Any) -> Any:
    """Execute a tool by name with given parameters.

    This is a convenience method for getting and executing a tool
    in a single call. When execution tracking is enabled, records
    timing, parameters, and results for observability.

    Args:
        name: Name of the tool to execute
        **kwargs: Parameters to pass to the tool. Special parameters
            starting with '_' (like _context) are passed to the tool
            but excluded from execution records.

    Returns:
        Tool execution result

    Raises:
        NotFoundError: If tool not found
        Exception: If tool execution fails

    Example:
        ```python
        result = await registry.execute_tool(
            "calculator",
            operation="add",
            a=5,
            b=3
        )
        print(result)
        # 8

        # With tracking enabled
        registry = ToolRegistry(track_executions=True)
        await registry.execute_tool("calculator", operation="add", a=5, b=3)
        history = registry.get_execution_history(tool_name="calculator")
        ```
    """
    tool = self.get_tool(name)

    # Separate internal params from tool params
    # Internal params (starting with _) are used by the registry but not passed to tools
    tool_params = {k: v for k, v in kwargs.items() if not k.startswith("_")}
    context = kwargs.get("_context")

    if not self._track_executions or self._execution_tracker is None:
        return await tool.execute(**tool_params)

    # Extract context ID if available
    context_id = getattr(context, "conversation_id", None) if context else None

    start_time = time.time()
    try:
        result = await tool.execute(**tool_params)
        duration_ms = (time.time() - start_time) * 1000

        self._execution_tracker.record(
            ToolExecutionRecord(
                tool_name=name,
                timestamp=start_time,
                parameters=tool_params,
                result=result,
                duration_ms=duration_ms,
                success=True,
                context_id=context_id,
            )
        )
        return result

    except Exception as e:
        duration_ms = (time.time() - start_time) * 1000
        self._execution_tracker.record(
            ToolExecutionRecord(
                tool_name=name,
                timestamp=start_time,
                parameters=tool_params,
                result=None,
                duration_ms=duration_ms,
                success=False,
                error=str(e),
                context_id=context_id,
            )
        )
        raise
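`execute_tool` is the natural dispatch point in a function-calling loop: the LLM returns a tool name plus JSON arguments, and the registry runs the matching tool. The minimal sketch below uses a plain dict as a stand-in for `ToolRegistry`, so the names and shapes here are illustrative only:

```python
# Minimal sketch of the dispatch pattern execute_tool enables.
import asyncio
import json

async def add(a: float, b: float) -> float:
    return a + b

# A plain dict standing in for ToolRegistry: name -> async callable
TOOLS = {"calculator_add": add}

async def dispatch(tool_call_json: str):
    call = json.loads(tool_call_json)
    fn = TOOLS[call["name"]]  # KeyError here plays the role of NotFoundError
    return await fn(**call["arguments"])

result = asyncio.run(
    dispatch('{"name": "calculator_add", "arguments": {"a": 5, "b": 3}}')
)
```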
get_execution_history
get_execution_history(
    tool_name: str | None = None,
    context_id: str | None = None,
    since: float | None = None,
    until: float | None = None,
    success_only: bool = False,
    failed_only: bool = False,
    limit: int | None = None,
) -> list[ToolExecutionRecord]

Query tool execution history.

Only available when tracking is enabled.

Parameters:

Name Type Description Default
tool_name str | None

Filter by tool name

None
context_id str | None

Filter by context/conversation ID

None
since float | None

Filter to records after this timestamp

None
until float | None

Filter to records before this timestamp

None
success_only bool

Only include successful executions

False
failed_only bool

Only include failed executions

False
limit int | None

Maximum number of records to return

None

Returns:

Type Description
list[ToolExecutionRecord]

List of matching execution records, or empty list if

list[ToolExecutionRecord]

tracking is not enabled

Example
# Get all executions for a specific tool
calc_history = registry.get_execution_history(tool_name="calculator")

# Get recent failed executions
failures = registry.get_execution_history(
    since=time.time() - 3600,
    failed_only=True
)

# Get executions for a conversation
conv_history = registry.get_execution_history(
    context_id="conv-123"
)
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def get_execution_history(
    self,
    tool_name: str | None = None,
    context_id: str | None = None,
    since: float | None = None,
    until: float | None = None,
    success_only: bool = False,
    failed_only: bool = False,
    limit: int | None = None,
) -> list[ToolExecutionRecord]:
    """Query tool execution history.

    Only available when tracking is enabled.

    Args:
        tool_name: Filter by tool name
        context_id: Filter by context/conversation ID
        since: Filter to records after this timestamp
        until: Filter to records before this timestamp
        success_only: Only include successful executions
        failed_only: Only include failed executions
        limit: Maximum number of records to return

    Returns:
        List of matching execution records, or empty list if
        tracking is not enabled

    Example:
        ```python
        # Get all executions for a specific tool
        calc_history = registry.get_execution_history(tool_name="calculator")

        # Get recent failed executions
        failures = registry.get_execution_history(
            since=time.time() - 3600,
            failed_only=True
        )

        # Get executions for a conversation
        conv_history = registry.get_execution_history(
            context_id="conv-123"
        )
        ```
    """
    if self._execution_tracker is None:
        return []

    query = ExecutionHistoryQuery(
        tool_name=tool_name,
        context_id=context_id,
        since=since,
        until=until,
        success_only=success_only,
        failed_only=failed_only,
        limit=limit,
    )
    return self._execution_tracker.query(query)
get_execution_stats
get_execution_stats(tool_name: str | None = None) -> ExecutionStats

Get aggregated execution statistics.

Only available when tracking is enabled.

Parameters:

Name Type Description Default
tool_name str | None

Get stats for specific tool, or None for all tools

None

Returns:

Type Description
ExecutionStats

ExecutionStats with aggregated metrics, or empty stats if

ExecutionStats

tracking is not enabled

Example
# Get stats for all tools
all_stats = registry.get_execution_stats()
print(f"Total: {all_stats.total_executions}")
print(f"Success rate: {all_stats.success_rate:.1f}%")

# Get stats for specific tool
calc_stats = registry.get_execution_stats("calculator")
print(f"Avg duration: {calc_stats.avg_duration_ms:.2f}ms")
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def get_execution_stats(self, tool_name: str | None = None) -> ExecutionStats:
    """Get aggregated execution statistics.

    Only available when tracking is enabled.

    Args:
        tool_name: Get stats for specific tool, or None for all tools

    Returns:
        ExecutionStats with aggregated metrics, or empty stats if
        tracking is not enabled

    Example:
        ```python
        # Get stats for all tools
        all_stats = registry.get_execution_stats()
        print(f"Total: {all_stats.total_executions}")
        print(f"Success rate: {all_stats.success_rate:.1f}%")

        # Get stats for specific tool
        calc_stats = registry.get_execution_stats("calculator")
        print(f"Avg duration: {calc_stats.avg_duration_ms:.2f}ms")
        ```
    """
    if self._execution_tracker is None:
        return ExecutionStats(tool_name=tool_name)

    return self._execution_tracker.get_stats(tool_name)
clear_execution_history
clear_execution_history() -> None

Clear all execution history.

Has no effect if tracking is not enabled.

Example
# Clear history before a test run
registry.clear_execution_history()
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def clear_execution_history(self) -> None:
    """Clear all execution history.

    Has no effect if tracking is not enabled.

    Example:
        ```python
        # Clear history before a test run
        registry.clear_execution_history()
        ```
    """
    if self._execution_tracker is not None:
        self._execution_tracker.clear()
execution_history_count
execution_history_count() -> int

Get number of records in execution history.

Returns:

Type Description
int

Number of records, or 0 if tracking is not enabled

Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def execution_history_count(self) -> int:
    """Get number of records in execution history.

    Returns:
        Number of records, or 0 if tracking is not enabled
    """
    if self._execution_tracker is None:
        return 0
    return len(self._execution_tracker)
filter_by_metadata
filter_by_metadata(**filters: Any) -> List[Tool]

Filter tools by metadata attributes.

Parameters:

Name Type Description Default
**filters Any

Key-value pairs to match in tool metadata

{}

Returns:

Type Description
List[Tool]

List of tools matching all filters

Example
# Get all tools with category="math"
math_tools = registry.filter_by_metadata(category="math")

# Get tools with multiple criteria
safe_tools = registry.filter_by_metadata(
    category="utility",
    safe=True
)
Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def filter_by_metadata(self, **filters: Any) -> List[Tool]:
    """Filter tools by metadata attributes.

    Args:
        **filters: Key-value pairs to match in tool metadata

    Returns:
        List of tools matching all filters

    Example:
        ```python
        # Get all tools with category="math"
        math_tools = registry.filter_by_metadata(category="math")

        # Get tools with multiple criteria
        safe_tools = registry.filter_by_metadata(
            category="utility",
            safe=True
        )
        ```
    """
    matching_tools = []

    for tool in self.list_items():
        # Check if all filters match
        matches = True
        for key, value in filters.items():
            if key not in tool.metadata or tool.metadata[key] != value:
                matches = False
                break

        if matches:
            matching_tools.append(tool)

    return matching_tools
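For filtering to be useful, the metadata has to be attached when tools are constructed. The sketch below uses a `MiniTool` stand-in for `Tool` and a free function with the same all-filters-must-match semantics as the registry method:

```python
# Sketch: tag tools with metadata at construction so they can be
# filtered later. MiniTool is a hypothetical stand-in for Tool.
class MiniTool:
    def __init__(self, name, description, metadata=None):
        self.name = name
        self.description = description
        self.metadata = metadata or {}

tools = [
    MiniTool("calculator", "Arithmetic", metadata={"category": "math", "safe": True}),
    MiniTool("shell", "Run commands", metadata={"category": "system", "safe": False}),
]

def filter_by_metadata(tools, **filters):
    # A tool matches only if every filter key is present and equal
    return [
        t for t in tools
        if all(k in t.metadata and t.metadata[k] == v for k, v in filters.items())
    ]

math_tools = filter_by_metadata(tools, category="math")
```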
clone
clone(preserve_history: bool = False) -> ToolRegistry

Create a shallow copy of this registry.

Parameters:

Name Type Description Default
preserve_history bool

If True and tracking is enabled, copy execution history to the new registry. Default False.

False

Returns:

Type Description
ToolRegistry

New ToolRegistry with same tools and tracking settings

Example

original = ToolRegistry(track_executions=True)
original.register_tool(CalculatorTool())

copy = original.clone()
copy.count()
1
copy.tracking_enabled
True

Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def clone(self, preserve_history: bool = False) -> "ToolRegistry":
    """Create a shallow copy of this registry.

    Args:
        preserve_history: If True and tracking is enabled, copy
            execution history to the new registry. Default False.

    Returns:
        New ToolRegistry with same tools and tracking settings

    Example:
        >>> original = ToolRegistry(track_executions=True)
        >>> original.register_tool(CalculatorTool())
        >>>
        >>> copy = original.clone()
        >>> copy.count()
        1
        >>> copy.tracking_enabled
        True
    """
    # Determine max history from current tracker
    max_history = (
        self._execution_tracker._max_history
        if self._execution_tracker
        else 100
    )

    new_registry = ToolRegistry(
        track_executions=self._track_executions,
        max_execution_history=max_history,
    )

    for name, tool in self.items():
        new_registry.register(name, tool, allow_overwrite=True)

    # Optionally copy history
    # Note: use 'is not None' to avoid __len__ evaluation (empty tracker is falsy)
    if (
        preserve_history
        and self._execution_tracker is not None
        and new_registry._execution_tracker is not None
    ):
        for record in self._execution_tracker.query():
            new_registry._execution_tracker.record(record)

    return new_registry
__repr__
__repr__() -> str

String representation of registry.

Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def __repr__(self) -> str:
    """String representation of registry."""
    tracking = ", tracking=True" if self._track_executions else ""
    return f"ToolRegistry(tools={self.count()}{tracking})"
__str__
__str__() -> str

Human-readable string representation.

Source code in packages/llm/src/dataknobs_llm/tools/registry.py
def __str__(self) -> str:
    """Human-readable string representation."""
    if self.count() == 0:
        tracking = ", tracking enabled" if self._track_executions else ""
        return f"ToolRegistry(empty{tracking})"

    tool_names = ", ".join(self.list_keys())
    tracking = ", tracking enabled" if self._track_executions else ""
    return f"ToolRegistry({self.count()} tools: {tool_names}{tracking})"

DeterministicTask dataclass

DeterministicTask(
    fn: Callable[..., Any],
    args: tuple[Any, ...] = (),
    kwargs: dict[str, Any] = dict(),
    tag: str = "",
)

A sync or async callable to execute alongside LLM tasks.

Attributes:

Name Type Description
fn Callable[..., Any]

The callable to execute. May be sync or async.

args tuple[Any, ...]

Positional arguments forwarded to fn.

kwargs dict[str, Any]

Keyword arguments forwarded to fn.

tag str

Identifier for result lookup.

LLMTask dataclass

LLMTask(
    messages: list[LLMMessage],
    config_overrides: dict[str, Any] | None = None,
    retry: RetryConfig | None = None,
    tag: str = "",
)

A single LLM call to execute.

Attributes:

Name Type Description
messages list[LLMMessage]

Messages to send to the LLM provider.

config_overrides dict[str, Any] | None

Per-task provider config overrides (temperature, model, etc.).

retry RetryConfig | None

Per-task retry policy. Overrides the executor's default_retry.

tag str

Identifier for result lookup. Populated automatically from the dict key when using execute() or execute_mixed().

ParallelLLMExecutor

ParallelLLMExecutor(
    provider: AsyncLLMProvider,
    max_concurrency: int = 5,
    default_retry: RetryConfig | None = None,
    default_config_overrides: dict[str, Any] | None = None,
)

Runs multiple LLM calls and deterministic functions concurrently.

Features:

- Concurrency control via asyncio.Semaphore
- Per-task error isolation (one failure does not cancel others)
- Optional per-task retry via RetryExecutor
- Mixed execution of LLM tasks and deterministic callables

Example

executor = ParallelLLMExecutor(provider, max_concurrency=3)
results = await executor.execute({
    "q1": LLMTask(messages=[LLMMessage(role="user", content="Hello")]),
    "q2": LLMTask(messages=[LLMMessage(role="user", content="World")]),
})
assert results["q1"].success

Initialize the executor.

Parameters:

Name Type Description Default
provider AsyncLLMProvider

The LLM provider to use for LLM tasks.

required
max_concurrency int

Maximum number of concurrent tasks.

5
default_retry RetryConfig | None

Default retry policy applied to tasks that do not specify their own.

None
default_config_overrides dict[str, Any] | None

Config overrides applied to every LLM task. Per-task config_overrides take precedence over these defaults when both are set.

None

Methods:

Name Description
execute

Run LLM tasks concurrently with error isolation.

execute_mixed

Run a mix of LLM and deterministic tasks concurrently.

execute_sequential

Run LLM tasks sequentially, optionally passing results forward.

Source code in packages/llm/src/dataknobs_llm/execution/parallel.py
def __init__(
    self,
    provider: AsyncLLMProvider,
    max_concurrency: int = 5,
    default_retry: RetryConfig | None = None,
    default_config_overrides: dict[str, Any] | None = None,
) -> None:
    """Initialize the executor.

    Args:
        provider: The LLM provider to use for LLM tasks.
        max_concurrency: Maximum number of concurrent tasks.
        default_retry: Default retry policy applied to tasks that do not
            specify their own.
        default_config_overrides: Config overrides applied to every LLM
            task.  Per-task ``config_overrides`` take precedence over
            these defaults when both are set.
    """
    self._provider = provider
    self._max_concurrency = max_concurrency
    self._default_retry = default_retry
    self._default_config_overrides = default_config_overrides
Functions
execute async
execute(tasks: dict[str, LLMTask]) -> dict[str, TaskResult]

Run LLM tasks concurrently with error isolation.

Parameters:

Name Type Description Default
tasks dict[str, LLMTask]

Mapping of tag to LLMTask. Tags identify results.

required

Returns:

Type Description
dict[str, TaskResult]

Mapping of tag to TaskResult.

Source code in packages/llm/src/dataknobs_llm/execution/parallel.py
async def execute(
    self,
    tasks: dict[str, LLMTask],
) -> dict[str, TaskResult]:
    """Run LLM tasks concurrently with error isolation.

    Args:
        tasks: Mapping of tag to LLMTask. Tags identify results.

    Returns:
        Mapping of tag to TaskResult.
    """
    if not tasks:
        return {}

    semaphore = asyncio.Semaphore(self._max_concurrency)
    start_all = time.monotonic()

    async def _run(tag: str, task: LLMTask) -> TaskResult:
        async with semaphore:
            return await self._execute_single_llm(tag, task)

    coros = [_run(tag, task) for tag, task in tasks.items()]
    results = await asyncio.gather(*coros)
    result_map = {r.tag: r for r in results}

    total_ms = (time.monotonic() - start_all) * 1000
    successes = sum(1 for r in results if r.success)
    failures = len(results) - successes
    logger.info(
        "Parallel execution complete: %d tasks (%d ok, %d failed) in %.1fms",
        len(results),
        successes,
        failures,
        total_ms,
    )

    return result_map
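The semaphore-plus-gather pattern in `execute` can be sketched standalone. This simplified stand-in uses plain coroutines instead of `LLMTask` and `(success, value)` tuples instead of `TaskResult`, but shows the same two properties: bounded concurrency and per-task error isolation.

```python
import asyncio

async def run_bounded(tasks, max_concurrency=2):
    """Run async callables concurrently; one failure does not cancel others."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def _run(tag, fn):
        async with semaphore:
            try:
                return tag, True, await fn()
            except Exception as exc:
                # error isolation: record the exception, keep siblings running
                return tag, False, exc

    triples = await asyncio.gather(*(_run(tag, fn) for tag, fn in tasks.items()))
    return {tag: (ok, value) for tag, ok, value in triples}

async def _demo():
    async def greet():
        return "hello"

    async def boom():
        raise RuntimeError("provider down")

    return await run_bounded({"q1": greet, "q2": boom})

results = asyncio.run(_demo())
```

Because failures are caught inside each task coroutine, `asyncio.gather` never sees an exception, so all results come back regardless of individual outcomes.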
execute_mixed async
execute_mixed(
    tasks: dict[str, LLMTask | DeterministicTask],
) -> dict[str, TaskResult]

Run a mix of LLM and deterministic tasks concurrently.

Parameters:

Name Type Description Default
tasks dict[str, LLMTask | DeterministicTask]

Mapping of tag to task (LLMTask or DeterministicTask).

required

Returns:

Type Description
dict[str, TaskResult]

Mapping of tag to TaskResult.

Source code in packages/llm/src/dataknobs_llm/execution/parallel.py
async def execute_mixed(
    self,
    tasks: dict[str, LLMTask | DeterministicTask],
) -> dict[str, TaskResult]:
    """Run a mix of LLM and deterministic tasks concurrently.

    Args:
        tasks: Mapping of tag to task (LLMTask or DeterministicTask).

    Returns:
        Mapping of tag to TaskResult.
    """
    if not tasks:
        return {}

    semaphore = asyncio.Semaphore(self._max_concurrency)
    start_all = time.monotonic()

    async def _run(tag: str, task: LLMTask | DeterministicTask) -> TaskResult:
        async with semaphore:
            if isinstance(task, LLMTask):
                return await self._execute_single_llm(tag, task)
            return await self._execute_single_deterministic(tag, task)

    coros = [_run(tag, task) for tag, task in tasks.items()]
    results = await asyncio.gather(*coros)
    result_map = {r.tag: r for r in results}

    total_ms = (time.monotonic() - start_all) * 1000
    successes = sum(1 for r in results if r.success)
    failures = len(results) - successes
    logger.info(
        "Mixed execution complete: %d tasks (%d ok, %d failed) in %.1fms",
        len(results),
        successes,
        failures,
        total_ms,
    )

    return result_map
execute_sequential async
execute_sequential(
    tasks: list[LLMTask], pass_result: bool = False
) -> list[TaskResult]

Run LLM tasks sequentially, optionally passing results forward.

When pass_result is True, each task's messages are augmented with the previous task's response as an assistant message.

Parameters:

Name Type Description Default
tasks list[LLMTask]

Ordered list of LLM tasks to run.

required
pass_result bool

If True, append previous result as assistant message.

False

Returns:

Type Description
list[TaskResult]

List of TaskResult in execution order.

Source code in packages/llm/src/dataknobs_llm/execution/parallel.py
async def execute_sequential(
    self,
    tasks: list[LLMTask],
    pass_result: bool = False,
) -> list[TaskResult]:
    """Run LLM tasks sequentially, optionally passing results forward.

    When ``pass_result`` is True, each task's messages are augmented with
    the previous task's response as an assistant message.

    Args:
        tasks: Ordered list of LLM tasks to run.
        pass_result: If True, append previous result as assistant message.

    Returns:
        List of TaskResult in execution order.
    """
    results: list[TaskResult] = []
    for i, task in enumerate(tasks):
        tag = task.tag or f"step_{i}"
        if pass_result and results and results[-1].success:
            prev_response = results[-1].value
            task = LLMTask(
                messages=list(task.messages)
                + [LLMMessage(role="assistant", content=prev_response.content)],
                config_overrides=task.config_overrides,
                retry=task.retry,
                tag=tag,
            )
        result = await self._execute_single_llm(tag, task)
        results.append(result)
    return results
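The `pass_result` chaining above — the previous response appended to the next task's messages as an assistant message — can be illustrated with a hypothetical stub provider (the `stub_llm` below is an assumption for the sketch, not the package API):

```python
import asyncio

async def stub_llm(messages):
    # hypothetical stand-in: "responds" with all message contents joined
    return " ".join(m["content"] for m in messages)

async def run_sequential(prompts, pass_result=True):
    results = []
    for prompt in prompts:
        messages = [{"role": "user", "content": prompt}]
        if pass_result and results:
            # mirror execute_sequential: previous response appended
            # after the task's own messages, as an assistant message
            messages = messages + [{"role": "assistant", "content": results[-1]}]
        results.append(await stub_llm(messages))
    return results

chained = asyncio.run(run_sequential(["one", "two"]))
```

The second call sees both its own prompt and the first step's output, which is what lets later steps build on earlier results.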

TaskResult dataclass

TaskResult(
    tag: str,
    success: bool,
    value: LLMResponse | Any,
    error: Exception | None = None,
    duration_ms: float = 0.0,
)

Result of a single task execution.

Attributes:

Name Type Description
tag str

The task identifier.

success bool

Whether the task completed without error.

value LLMResponse | Any

The return value. LLMResponse for LLM tasks, arbitrary for deterministic tasks. None on failure.

error Exception | None

The exception if the task failed, None otherwise.

duration_ms float

Wall-clock execution time in milliseconds.

ResponseQueueExhaustedError

ResponseQueueExhaustedError(call_count: int)

Bases: OperationError

EchoProvider response queue exhausted in strict mode.

Raised when strict=True and a complete() call is made after all scripted responses have been consumed. Indicates the test scripted fewer responses than the code actually needed.

Source code in packages/llm/src/dataknobs_llm/exceptions.py
def __init__(self, call_count: int) -> None:
    self.call_count = call_count
    super().__init__(
        f"EchoProvider response queue exhausted after {call_count} "
        f"call(s) (strict mode). The test scripted fewer responses "
        f"than the code requires. Either add more responses to the "
        f"queue or set strict=False to fall back to echo behavior."
    )
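The strict-queue behavior can be sketched with a hypothetical minimal provider. The names below (`ScriptedProvider`, `QueueExhausted`) are stand-ins for this sketch, not the package's classes:

```python
class QueueExhausted(Exception):
    def __init__(self, call_count):
        super().__init__(f"queue exhausted after {call_count} call(s)")
        self.call_count = call_count

class ScriptedProvider:
    """Pops scripted responses; raises in strict mode once they run out."""

    def __init__(self, responses, strict=True):
        self._responses = list(responses)
        self._strict = strict
        self._calls = 0

    def complete(self, prompt):
        self._calls += 1
        if self._responses:
            return self._responses.pop(0)
        if self._strict:
            raise QueueExhausted(self._calls)
        return prompt  # non-strict: fall back to echo behavior

strict_provider = ScriptedProvider(["ok"], strict=True)
first = strict_provider.complete("hi")      # consumes the one scripted response
try:
    strict_provider.complete("hi again")    # queue empty -> raises
    raised = False
except QueueExhausted:
    raised = True

loose_provider = ScriptedProvider([], strict=False)
echoed = loose_provider.complete("fallback")
```

Strict mode turns an under-scripted test into a loud failure at the exact call that ran out, rather than a silently echoed response.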

ToolsNotSupportedError

ToolsNotSupportedError(model: str, suggestion: str = '')

Bases: OperationError

Model does not support tool/function calling.

Source code in packages/llm/src/dataknobs_llm/exceptions.py
def __init__(self, model: str, suggestion: str = "") -> None:
    self.model = model
    self.suggestion = suggestion
    msg = f"Model '{model}' does not support tools."
    if suggestion:
        msg = f"{msg} {suggestion}"
    super().__init__(msg)

CallTracker

CallTracker(**providers: CapturingProvider)

Collect new LLM calls across multiple CapturingProviders per turn.

In multi-LLM bot scenarios (e.g. a main LLM and an extraction LLM), a single user turn may trigger calls to several providers. CallTracker collects calls from all registered providers since the last collection, assigns sequential global indices, and returns them in registration order.

Parameters:

Name Type Description Default
**providers CapturingProvider

Named CapturingProvider instances. The keyword name is used only for get_provider() lookups.

{}
Example
main = CapturingProvider(real_main, role="main")
extraction = CapturingProvider(real_extraction, role="extraction")
tracker = CallTracker(main=main, extraction=extraction)

# ... run a bot turn that triggers LLM calls ...

new_calls = tracker.collect_new_calls()
for call in new_calls:
    print(f"[{call.call_index}] {call.role}: {call.response['content'][:40]}")

Methods:

Name Description
get_provider

Get a registered provider by name.

collect_new_calls

Collect calls made since the last collection.

Attributes:

Name Type Description
provider_names list[str]

Get list of registered provider names.

total_calls int

Total number of calls collected across all providers.

Source code in packages/llm/src/dataknobs_llm/testing.py
def __init__(self, **providers: CapturingProvider) -> None:
    self._providers: dict[str, CapturingProvider] = dict(providers)
    # Track how many calls we've already seen per provider
    self._cursors: dict[str, int] = {
        name: p.call_count for name, p in self._providers.items()
    }
    self._global_index: int = 0
Attributes
provider_names property
provider_names: list[str]

Get list of registered provider names.

total_calls property
total_calls: int

Total number of calls collected across all providers.

Functions
get_provider
get_provider(name: str) -> CapturingProvider | None

Get a registered provider by name.

Parameters:

Name Type Description Default
name str

Provider registration name

required

Returns:

Type Description
CapturingProvider | None

CapturingProvider or None if not found

Source code in packages/llm/src/dataknobs_llm/testing.py
def get_provider(self, name: str) -> CapturingProvider | None:
    """Get a registered provider by name.

    Args:
        name: Provider registration name

    Returns:
        CapturingProvider or None if not found
    """
    return self._providers.get(name)
collect_new_calls
collect_new_calls() -> list[CapturedCall]

Collect calls made since the last collection.

Returns new calls from all providers, sorted by provider registration order. Each call receives a sequential call_index relative to all previously collected calls.

Returns:

Type Description
list[CapturedCall]

List of new CapturedCall objects with global call_index values

Source code in packages/llm/src/dataknobs_llm/testing.py
def collect_new_calls(self) -> list[CapturedCall]:
    """Collect calls made since the last collection.

    Returns new calls from all providers, sorted by provider
    registration order.  Each call receives a sequential
    ``call_index`` relative to all previously collected calls.

    Returns:
        List of new CapturedCall objects with global call_index values
    """
    new_calls: list[CapturedCall] = []

    for name, provider in self._providers.items():
        cursor = self._cursors[name]
        all_calls = provider.captured_calls
        provider_new = all_calls[cursor:]

        for call in provider_new:
            new_calls.append(
                CapturedCall(
                    role=call.role,
                    messages=call.messages,
                    response=call.response,
                    config_overrides=call.config_overrides,
                    tools=call.tools,
                    duration_seconds=call.duration_seconds,
                    call_index=self._global_index,
                )
            )
            self._global_index += 1

        self._cursors[name] = len(all_calls)

    return new_calls
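The cursor logic above — remember how many calls were already seen per provider, then assign a monotonically increasing global index to each newly collected call — can be sketched standalone with plain lists standing in for providers:

```python
class Tracker:
    """Sketch of per-provider cursors plus a global call index."""

    def __init__(self, **logs):
        self._logs = logs  # name -> list of calls (grows over time)
        self._cursors = {name: len(log) for name, log in logs.items()}
        self._global = 0

    def collect_new(self):
        out = []
        for name, log in self._logs.items():
            for call in log[self._cursors[name]:]:  # only unseen calls
                out.append((self._global, name, call))
                self._global += 1
            self._cursors[name] = len(log)  # advance the cursor
        return out

main, extraction = [], []
tracker = Tracker(main=main, extraction=extraction)

main.append("m1")
extraction.append("e1")
first = tracker.collect_new()   # both new calls, indices 0 and 1

main.append("m2")
second = tracker.collect_new()  # only the new call, index continues at 2
```

Each `collect_new` is non-destructive on the underlying logs; the cursors alone track what has been handed out.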

CapturedCall dataclass

CapturedCall(
    role: str,
    messages: list[dict[str, Any]],
    response: dict[str, Any],
    config_overrides: dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    duration_seconds: float = 0.0,
    call_index: int = 0,
)

Record of a single LLM call captured by CapturingProvider.

Attributes:

Name Type Description
role str

Provider role tag (e.g., "main", "extraction")

messages list[dict[str, Any]]

Serialized request messages (list of dicts)

response dict[str, Any]

Serialized LLM response (dict)

config_overrides dict[str, Any] | None

Config overrides passed to the call, if any

tools list[Any] | None

Tool definitions passed to the call, if any

duration_seconds float

Wall-clock duration of the call

call_index int

Per-instance call ordering (0-based)

CapturingProvider

CapturingProvider(delegate: AsyncLLMProvider, role: str = 'main')

Bases: AsyncLLMProvider

Provider wrapper that records all LLM calls for capture-replay testing.

Wraps a real AsyncLLMProvider delegate, forwarding all calls while recording request/response pairs as CapturedCall objects. The role tag (e.g., "main" or "extraction") enables replay to route responses to the correct EchoProvider.

Parameters:

Name Type Description Default
delegate AsyncLLMProvider

Real provider to wrap

required
role str

Tag identifying this provider's role (default: "main")

'main'
Example
from dataknobs_llm.testing import CapturingProvider

real_provider = OllamaProvider(config)
capturing = CapturingProvider(real_provider, role="main")

# Use normally — calls pass through to the real provider
response = await capturing.complete(messages)

# Inspect what was captured
assert capturing.call_count == 1
call = capturing.captured_calls[0]
print(f"Sent {len(call.messages)} messages, got: {call.response['content'][:50]}")

Methods:

Name Description
initialize

Delegate initialization to the wrapped provider.

close

Delegate close to the wrapped provider.

validate_model

Delegate model validation to the wrapped provider.

get_capabilities

Delegate capability detection to the wrapped provider.

complete

Forward completion to delegate and capture the call.

stream_complete

Forward streaming completion to delegate and capture the assembled response.

embed

Delegate embedding to the wrapped provider (not captured).

function_call

Delegate function calling to the wrapped provider and capture the call.

Attributes:

Name Type Description
role str

Provider role tag.

captured_calls list[CapturedCall]

All captured calls (read-only copy).

call_count int

Number of captured calls.

Source code in packages/llm/src/dataknobs_llm/testing.py
def __init__(self, delegate: AsyncLLMProvider, role: str = "main") -> None:
    # Initialize with the delegate's config (satisfies LLMProvider.__init__)
    super().__init__(delegate.config)
    self._delegate = delegate
    self._role = role
    self._captured_calls: list[CapturedCall] = []
Attributes
role property
role: str

Provider role tag.

captured_calls property
captured_calls: list[CapturedCall]

All captured calls (read-only copy).

call_count property
call_count: int

Number of captured calls.

Functions
initialize async
initialize() -> None

Delegate initialization to the wrapped provider.

Source code in packages/llm/src/dataknobs_llm/testing.py
async def initialize(self) -> None:
    """Delegate initialization to the wrapped provider."""
    await self._delegate.initialize()
close async
close() -> None

Delegate close to the wrapped provider.

Source code in packages/llm/src/dataknobs_llm/testing.py
async def close(self) -> None:
    """Delegate close to the wrapped provider."""
    await self._delegate.close()
validate_model async
validate_model() -> bool

Delegate model validation to the wrapped provider.

Source code in packages/llm/src/dataknobs_llm/testing.py
async def validate_model(self) -> bool:
    """Delegate model validation to the wrapped provider."""
    if hasattr(self._delegate, "validate_model"):
        return await self._delegate.validate_model()
    return True
get_capabilities
get_capabilities() -> List[ModelCapability]

Delegate capability detection to the wrapped provider.

Source code in packages/llm/src/dataknobs_llm/testing.py
def get_capabilities(self) -> List[ModelCapability]:
    """Delegate capability detection to the wrapped provider."""
    return self._delegate.get_capabilities()
complete async
complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> LLMResponse

Forward completion to delegate and capture the call.

Parameters:

Name Type Description Default
messages Union[str, List[LLMMessage]]

Input messages (string or list of LLMMessage)

required
config_overrides Dict[str, Any] | None

Optional per-request config overrides

None
tools list[Any] | None

Optional tool definitions

None
**kwargs Any

Additional provider-specific parameters

{}

Returns:

Type Description
LLMResponse

LLMResponse from the delegate (unchanged)

Source code in packages/llm/src/dataknobs_llm/testing.py
async def complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> LLMResponse:
    """Forward completion to delegate and capture the call.

    Args:
        messages: Input messages (string or list of LLMMessage)
        config_overrides: Optional per-request config overrides
        tools: Optional tool definitions
        **kwargs: Additional provider-specific parameters

    Returns:
        LLMResponse from the delegate (unchanged)
    """
    # Serialize messages for capture
    if isinstance(messages, str):
        serialized_msgs = [{"role": "user", "content": messages}]
    else:
        serialized_msgs = [llm_message_to_dict(m) for m in messages]

    start = time.monotonic()
    response = await self._delegate.complete(
        messages, config_overrides=config_overrides, tools=tools, **kwargs
    )
    duration = time.monotonic() - start

    self._captured_calls.append(
        CapturedCall(
            role=self._role,
            messages=serialized_msgs,
            response=llm_response_to_dict(response),
            config_overrides=config_overrides,
            tools=tools,
            duration_seconds=round(duration, 4),
            call_index=len(self._captured_calls),
        )
    )

    return response
stream_complete async
stream_complete(
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]

Forward streaming completion to delegate and capture the assembled response.

Yields chunks to the caller in real time. After the stream completes, assembles the full response content and records a CapturedCall.

Parameters:

Name Type Description Default
messages Union[str, List[LLMMessage]]

Input messages

required
config_overrides Dict[str, Any] | None

Optional per-request config overrides

None
tools list[Any] | None

Optional tool definitions

None
**kwargs Any

Additional provider-specific parameters

{}

Yields:

Type Description
AsyncIterator[LLMStreamResponse]

LLMStreamResponse chunks from the delegate

Source code in packages/llm/src/dataknobs_llm/testing.py
async def stream_complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    tools: list[Any] | None = None,
    **kwargs: Any,
) -> AsyncIterator[LLMStreamResponse]:
    """Forward streaming completion to delegate and capture the assembled response.

    Yields chunks to the caller in real time. After the stream completes,
    assembles the full response content and records a CapturedCall.

    Args:
        messages: Input messages
        config_overrides: Optional per-request config overrides
        tools: Optional tool definitions
        **kwargs: Additional provider-specific parameters

    Yields:
        LLMStreamResponse chunks from the delegate
    """
    if isinstance(messages, str):
        serialized_msgs = [{"role": "user", "content": messages}]
    else:
        serialized_msgs = [llm_message_to_dict(m) for m in messages]

    # Collect chunks while yielding them through
    assembled_content = ""
    final_chunk: LLMStreamResponse | None = None
    start = time.monotonic()

    async for chunk in self._delegate.stream_complete(
        messages, config_overrides=config_overrides, tools=tools, **kwargs
    ):
        assembled_content += chunk.delta
        if chunk.is_final:
            final_chunk = chunk
        yield chunk

    duration = time.monotonic() - start

    # Build assembled response for capture
    assembled_response = LLMResponse(
        content=assembled_content,
        model=(
            (final_chunk.model or self._delegate.config.model)
            if final_chunk
            else self._delegate.config.model
        ),
        finish_reason=final_chunk.finish_reason if final_chunk else None,
        usage=final_chunk.usage if final_chunk else None,
        tool_calls=final_chunk.tool_calls if final_chunk else None,
    )

    self._captured_calls.append(
        CapturedCall(
            role=self._role,
            messages=serialized_msgs,
            response=llm_response_to_dict(assembled_response),
            config_overrides=config_overrides,
            tools=tools,
            duration_seconds=round(duration, 4),
            call_index=len(self._captured_calls),
        )
    )
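The yield-through-while-assembling pattern can be sketched with a hypothetical `(delta, is_final)` stream; the delegate stream and sink below are stand-ins for the sketch:

```python
import asyncio

async def fake_stream():
    # hypothetical delegate stream yielding (delta, is_final) pairs
    for delta, is_final in [("Hel", False), ("lo", True)]:
        yield delta, is_final

async def capture_stream(stream, sink):
    """Yield each chunk through in real time while collecting deltas."""
    async for delta, is_final in stream:
        sink.append(delta)      # record for capture after the stream ends
        yield delta, is_final   # caller still sees chunks immediately

async def _demo():
    deltas = []
    chunks = [d async for d, _ in capture_stream(fake_stream(), deltas)]
    return chunks, "".join(deltas)

chunks, assembled = asyncio.run(_demo())
```

The caller's streaming experience is unchanged; only after exhaustion does the wrapper have the full assembled content to record.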
embed async
embed(
    texts: Union[str, List[str]], **kwargs: Any
) -> Union[List[float], List[List[float]]]

Delegate embedding to the wrapped provider (not captured).

Source code in packages/llm/src/dataknobs_llm/testing.py
async def embed(
    self,
    texts: Union[str, List[str]],
    **kwargs: Any,
) -> Union[List[float], List[List[float]]]:
    """Delegate embedding to the wrapped provider (not captured)."""
    return await self._delegate.embed(texts, **kwargs)
function_call async
function_call(
    messages: List[LLMMessage], functions: List[Dict[str, Any]], **kwargs: Any
) -> LLMResponse

Delegate function calling to the wrapped provider and capture the call.

Source code in packages/llm/src/dataknobs_llm/testing.py
async def function_call(
    self,
    messages: List[LLMMessage],
    functions: List[Dict[str, Any]],
    **kwargs: Any,
) -> LLMResponse:
    """Delegate function calling to the wrapped provider and capture the call."""
    serialized_msgs = [llm_message_to_dict(m) for m in messages]

    start = time.monotonic()
    response = await self._delegate.function_call(messages, functions, **kwargs)
    duration = time.monotonic() - start

    self._captured_calls.append(
        CapturedCall(
            role=self._role,
            messages=serialized_msgs,
            response=llm_response_to_dict(response),
            tools=functions,
            duration_seconds=round(duration, 4),
            call_index=len(self._captured_calls),
        )
    )

    return response

ErrorResponse

ErrorResponse(exception: Exception)

Marker for a queued error in EchoProvider's response sequence.

When EchoProvider encounters an ErrorResponse in its queue, it raises the contained exception instead of returning an LLMResponse.

Usage

provider = EchoProvider(config)
provider.set_responses([
    text_response("ok"),
    ErrorResponse(RuntimeError("provider unavailable")),
    text_response("recovered"),
])

Source code in packages/llm/src/dataknobs_llm/llm/providers/echo.py
def __init__(self, exception: Exception) -> None:
    self.exception = exception

ResponseSequenceBuilder

ResponseSequenceBuilder(model: str = 'test-model')

Builder for creating sequences of LLM responses.

Provides a fluent API for building test response sequences, useful for testing multi-turn conversations and ReAct loops.

Example
responses = (
    ResponseSequenceBuilder()
    .add_tool_call("list_templates", {})
    .add_tool_call("get_template", {"name": "quiz"})
    .add_text("I found the quiz template!")
    .build()
)

provider.set_responses(responses)

Initialize builder.

Parameters:

Name Type Description Default
model str

Model identifier for all responses

'test-model'

Methods:

Name Description
add_text

Add a text response to the sequence.

add_tool_call

Add a tool call response to the sequence.

add_multi_tool

Add a multi-tool call response to the sequence.

add_extraction

Add an extraction response to the sequence.

add

Add a custom LLMResponse to the sequence.

build

Build and return the response sequence.

configure

Configure an EchoProvider with this sequence.

Source code in packages/llm/src/dataknobs_llm/testing.py
def __init__(self, model: str = "test-model"):
    """Initialize builder.

    Args:
        model: Model identifier for all responses
    """
    self._model = model
    self._responses: list[LLMResponse] = []
Functions
add_text
add_text(content: str) -> ResponseSequenceBuilder

Add a text response to the sequence.

Parameters:

Name Type Description Default
content str

Response text

required

Returns:

Type Description
ResponseSequenceBuilder

Self for chaining

Source code in packages/llm/src/dataknobs_llm/testing.py
def add_text(self, content: str) -> ResponseSequenceBuilder:
    """Add a text response to the sequence.

    Args:
        content: Response text

    Returns:
        Self for chaining
    """
    self._responses.append(text_response(content, model=self._model))
    return self
add_tool_call
add_tool_call(
    tool_name: str, arguments: dict[str, Any] | None = None
) -> ResponseSequenceBuilder

Add a tool call response to the sequence.

Parameters:

Name Type Description Default
tool_name str

Name of tool to call

required
arguments dict[str, Any] | None

Tool arguments

None

Returns:

Type Description
ResponseSequenceBuilder

Self for chaining

Source code in packages/llm/src/dataknobs_llm/testing.py
def add_tool_call(
    self,
    tool_name: str,
    arguments: dict[str, Any] | None = None,
) -> ResponseSequenceBuilder:
    """Add a tool call response to the sequence.

    Args:
        tool_name: Name of tool to call
        arguments: Tool arguments

    Returns:
        Self for chaining
    """
    self._responses.append(
        tool_call_response(tool_name, arguments, model=self._model)
    )
    return self
add_multi_tool
add_multi_tool(
    tools: list[tuple[str, dict[str, Any]]],
) -> ResponseSequenceBuilder

Add a multi-tool call response to the sequence.

Parameters:

Name Type Description Default
tools list[tuple[str, dict[str, Any]]]

List of (name, args) tuples

required

Returns:

Type Description
ResponseSequenceBuilder

Self for chaining

Source code in packages/llm/src/dataknobs_llm/testing.py
def add_multi_tool(
    self,
    tools: list[tuple[str, dict[str, Any]]],
) -> ResponseSequenceBuilder:
    """Add a multi-tool call response to the sequence.

    Args:
        tools: List of (name, args) tuples

    Returns:
        Self for chaining
    """
    self._responses.append(multi_tool_response(tools, model=self._model))
    return self
add_extraction
add_extraction(data: dict[str, Any]) -> ResponseSequenceBuilder

Add an extraction response to the sequence.

Parameters:

Name Type Description Default
data dict[str, Any]

Extracted data dict

required

Returns:

Type Description
ResponseSequenceBuilder

Self for chaining

Source code in packages/llm/src/dataknobs_llm/testing.py
def add_extraction(
    self,
    data: dict[str, Any],
) -> ResponseSequenceBuilder:
    """Add an extraction response to the sequence.

    Args:
        data: Extracted data dict

    Returns:
        Self for chaining
    """
    self._responses.append(extraction_response(data, model=self._model))
    return self
add
add(response: LLMResponse) -> ResponseSequenceBuilder

Add a custom LLMResponse to the sequence.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `response` | `LLMResponse` | Custom response object | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ResponseSequenceBuilder` | Self for chaining |

Source code in packages/llm/src/dataknobs_llm/testing.py
def add(self, response: LLMResponse) -> ResponseSequenceBuilder:
    """Add a custom LLMResponse to the sequence.

    Args:
        response: Custom response object

    Returns:
        Self for chaining
    """
    self._responses.append(response)
    return self
build
build() -> list[LLMResponse]

Build and return the response sequence.

Returns:

| Type | Description |
| --- | --- |
| `list[LLMResponse]` | List of LLMResponse objects |

Source code in packages/llm/src/dataknobs_llm/testing.py
def build(self) -> list[LLMResponse]:
    """Build and return the response sequence.

    Returns:
        List of LLMResponse objects
    """
    return list(self._responses)
configure
configure(provider: EchoProvider) -> EchoProvider

Configure an EchoProvider with this sequence.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `provider` | `EchoProvider` | EchoProvider to configure | *required* |

Returns:

| Type | Description |
| --- | --- |
| `EchoProvider` | The configured provider |

Source code in packages/llm/src/dataknobs_llm/testing.py
def configure(self, provider: EchoProvider) -> EchoProvider:
    """Configure an EchoProvider with this sequence.

    Args:
        provider: EchoProvider to configure

    Returns:
        The configured provider
    """
    provider.set_responses(self._responses)
    return provider
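
The chaining methods above all return `self`, which is what makes fluent sequences like `builder.add_tool(...).add_extraction(...).build()` possible. Below is a minimal, self-contained sketch of the same pattern; `FakeResponse` and `SequenceBuilder` are hypothetical stand-ins, not the package's real `LLMResponse`/`ResponseSequenceBuilder`:

```python
from dataclasses import dataclass

@dataclass
class FakeResponse:
    """Hypothetical stand-in for LLMResponse."""
    content: str
    model: str = "test-model"

class SequenceBuilder:
    """Hypothetical stand-in showing the fluent-builder shape."""
    def __init__(self, model: str = "test-model") -> None:
        self._model = model
        self._responses: list[FakeResponse] = []

    def add_text(self, content: str) -> "SequenceBuilder":
        self._responses.append(FakeResponse(content, self._model))
        return self  # returning self is what enables chaining

    def build(self) -> list[FakeResponse]:
        return list(self._responses)  # copy, so the builder stays reusable

seq = SequenceBuilder().add_text("hi").add_text("bye").build()
```

Returning a copy from `build()` mirrors the real implementation (`return list(self._responses)`), so the builder can keep accumulating responses after a build.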

Functions

normalize_llm_config

normalize_llm_config(
    config: Union[LLMConfig, Config, Dict[str, Any]],
) -> LLMConfig

Normalize various config formats to LLMConfig.

This helper function accepts LLMConfig instances, dataknobs Config objects, or plain dictionaries and returns a standardized LLMConfig instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `Union[LLMConfig, Config, Dict[str, Any]]` | Configuration as LLMConfig, Config object, or dictionary | *required* |

Returns:

| Type | Description |
| --- | --- |
| `LLMConfig` | LLMConfig instance |

Raises:

| Type | Description |
| --- | --- |
| `TypeError` | If config type is not supported |

Source code in packages/llm/src/dataknobs_llm/llm/base.py
def normalize_llm_config(config: Union["LLMConfig", Config, Dict[str, Any]]) -> "LLMConfig":
    """Normalize various config formats to LLMConfig.

    This helper function accepts LLMConfig instances, dataknobs Config objects,
    or plain dictionaries and returns a standardized LLMConfig instance.

    Args:
        config: Configuration as LLMConfig, Config object, or dictionary

    Returns:
        LLMConfig instance

    Raises:
        TypeError: If config type is not supported
    """
    # Already an LLMConfig instance
    if isinstance(config, LLMConfig):
        return config

    # Dictionary (possibly from Config.get())
    if isinstance(config, dict):
        return LLMConfig.from_dict(config)

    # dataknobs Config object - try to get the config dict
    # We check for the get method to identify Config objects
    if hasattr(config, 'get') and hasattr(config, 'get_types'):
        # It's a Config object, extract the llm configuration
        # Try to get first llm config, or fall back to first available type
        try:
            config_dict = config.get('llm', 0)
        except Exception as e:
            # If no 'llm' type, try to get first available config of any type
            types = config.get_types()
            if types:
                config_dict = config.get(types[0], 0)
            else:
                raise ValueError("Config object has no configurations") from e

        return LLMConfig.from_dict(config_dict)

    raise TypeError(
        f"Unsupported config type: {type(config).__name__}. "
        f"Expected LLMConfig, Config, or dict."
    )
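
The dispatch above duck-types Config objects (checking for `get` and `get_types` rather than importing the class). That shape can be sketched standalone; in the sketch below, `FakeConfig` is a hypothetical stand-in for a dataknobs `Config` object and a plain dict stands in for `LLMConfig`:

```python
from typing import Any

def normalize(config: Any) -> dict:
    """Sketch of normalize_llm_config's dispatch, with dict as the target form."""
    if isinstance(config, dict):
        return dict(config)
    # Duck-type Config-like objects by their accessor methods
    if hasattr(config, "get") and hasattr(config, "get_types"):
        types = config.get_types()
        if not types:
            raise ValueError("Config object has no configurations")
        # Prefer the 'llm' type, else fall back to the first available type
        kind = "llm" if "llm" in types else types[0]
        return dict(config.get(kind, 0))
    raise TypeError(f"Unsupported config type: {type(config).__name__}")

class FakeConfig:
    """Hypothetical stand-in exposing the accessors checked above."""
    def __init__(self, data: dict) -> None:
        self._data = data
    def get_types(self) -> list[str]:
        return list(self._data)
    def get(self, kind: str, index: int) -> dict:
        return self._data[kind][index]
```

The real function reaches the fallback via a `try`/`except` around `config.get('llm', 0)`; the membership check here is a simplification with the same observable behavior for well-formed configs.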

create_llm_provider

create_llm_provider(
    config: LLMConfig | Config | dict[str, Any], is_async: bool = True
) -> AsyncLLMProvider | SyncLLMProvider

Create appropriate LLM provider based on configuration.

Convenience function that uses LLMProviderFactory internally. Now supports LLMConfig, Config objects, and dictionaries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `LLMConfig \| Config \| dict[str, Any]` | LLM configuration (LLMConfig, Config, or dict) | *required* |
| `is_async` | `bool` | Whether to create async provider | `True` |

Returns:

| Type | Description |
| --- | --- |
| `AsyncLLMProvider \| SyncLLMProvider` | LLM provider instance |

Example
```python
# Direct usage with dict
provider = create_llm_provider({
    "provider": "openai",
    "model": "gpt-4",
    "api_key": "..."
})

# With Config object
from dataknobs_config import Config
config = Config({"llm": [{"provider": "openai", "model": "gpt-4"}]})
provider = create_llm_provider(config)
```
Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
def create_llm_provider(
    config: LLMConfig | Config | dict[str, Any],
    is_async: bool = True
) -> AsyncLLMProvider | SyncLLMProvider:
    """Create appropriate LLM provider based on configuration.

    Convenience function that uses LLMProviderFactory internally.
    Now supports LLMConfig, Config objects, and dictionaries.

    Args:
        config: LLM configuration (LLMConfig, Config, or dict)
        is_async: Whether to create async provider

    Returns:
        LLM provider instance

    Example:
        ```python
        # Direct usage with dict
        provider = create_llm_provider({
            "provider": "openai",
            "model": "gpt-4",
            "api_key": "..."
        })

        # With Config object
        from dataknobs_config import Config
        config = Config({"llm": [{"provider": "openai", "model": "gpt-4"}]})
        provider = create_llm_provider(config)
        ```
    """
    factory = LLMProviderFactory(is_async=is_async)
    return factory.create(config)

create_embedding_provider async

create_embedding_provider(
    config: dict[str, Any],
    *,
    default_provider: str = "ollama",
    default_model: str = "nomic-embed-text",
) -> AsyncLLMProvider

Create and initialize an embedding provider from configuration.

Normalizes configuration from two supported formats:

- Nested format: `{"embedding": {"provider": "ollama", "model": "..."}}` -- the `"embedding"` sub-dict is extracted and used. All extra keys in the sub-dict (`api_base`, `api_key`, `dimensions`, etc.) are forwarded to the provider.
- Legacy prefix format: `{"embedding_provider": "ollama", "embedding_model": "..."}` -- `embedding_` prefixed keys at the top level. `api_base`, `api_key`, and `dimensions` are also forwarded when present at the top level.

When neither format is present, default_provider / default_model are used (ollama / nomic-embed-text).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `dict[str, Any]` | Configuration dict. | *required* |
| `default_provider` | `str` | Default provider if not specified. | `'ollama'` |
| `default_model` | `str` | Default model if not specified. | `'nomic-embed-text'` |

Returns:

| Type | Description |
| --- | --- |
| `AsyncLLMProvider` | Initialized AsyncLLMProvider instance ready for `embed()` calls. |

Example
```python
provider = await create_embedding_provider({
    "embedding": {
        "provider": "ollama",
        "model": "nomic-embed-text",
    },
})
embedding = await provider.embed("hello world")
```
Source code in packages/llm/src/dataknobs_llm/llm/providers/__init__.py
async def create_embedding_provider(
    config: dict[str, Any],
    *,
    default_provider: str = "ollama",
    default_model: str = "nomic-embed-text",
) -> AsyncLLMProvider:
    """Create and initialize an embedding provider from configuration.

    Normalizes configuration from two supported formats:

    - **Nested format:** ``{"embedding": {"provider": "ollama", "model": "..."}}``
      -- the ``"embedding"`` sub-dict is extracted and used.  All extra keys
      in the sub-dict (``api_base``, ``api_key``, ``dimensions``, etc.) are
      forwarded to the provider.
    - **Legacy prefix format:** ``{"embedding_provider": "ollama",
      "embedding_model": "..."}`` -- ``embedding_`` prefixed keys at the
      top level.  ``api_base``, ``api_key``, and ``dimensions`` are also
      forwarded when present at the top level.

    When neither format is present, *default_provider* / *default_model*
    are used (``ollama`` / ``nomic-embed-text``).

    Args:
        config: Configuration dict.
        default_provider: Default provider if not specified.
        default_model: Default model if not specified.

    Returns:
        Initialized ``AsyncLLMProvider`` instance ready for ``embed()`` calls.

    Example:
        ```python
        provider = await create_embedding_provider({
            "embedding": {
                "provider": "ollama",
                "model": "nomic-embed-text",
            },
        })
        embedding = await provider.embed("hello world")
        ```
    """
    # 1. Nested "embedding" sub-dict (preferred)
    extra: dict[str, Any]
    embedding_config = config.get("embedding", {})
    if embedding_config and isinstance(embedding_config, dict):
        provider_name = embedding_config.get("provider", default_provider)
        model_name = embedding_config.get("model", default_model)
        # Forward all extra keys (api_base, api_key, dimensions, etc.)
        extra = {
            k: v for k, v in embedding_config.items()
            if k not in ("provider", "model")
        }
    else:
        # 2. Legacy prefix format (embedding_provider / embedding_model)
        provider_name = config.get("embedding_provider", default_provider)
        model_name = config.get("embedding_model", default_model)
        extra = {}
        for passthrough in ("api_base", "api_key", "dimensions"):
            if passthrough in config:
                extra[passthrough] = config[passthrough]

    factory = LLMProviderFactory(is_async=True)
    provider_config = {
        "provider": provider_name,
        "model": model_name,
        **extra,
        "mode": "embedding",  # Always forced — must come after **extra
    }
    try:
        provider = factory.create(provider_config)
        await provider.initialize()
    except Exception:
        _logger.exception(
            "Failed to create embedding provider: %s/%s",
            provider_name,
            model_name,
        )
        raise

    _logger.info(
        "Embedding provider initialized: %s/%s",
        provider_name,
        model_name,
    )
    return provider
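
The format detection above can be distilled into a pure function over the config dict. `normalize_embedding_config` below is a hypothetical helper mirroring the normalization logic of `create_embedding_provider` without constructing a provider:

```python
from typing import Any

def normalize_embedding_config(
    config: dict[str, Any],
    default_provider: str = "ollama",
    default_model: str = "nomic-embed-text",
) -> dict[str, Any]:
    """Sketch of the two-format normalization shown in the source above."""
    embedding = config.get("embedding")
    if isinstance(embedding, dict) and embedding:
        # 1. Nested "embedding" sub-dict (preferred)
        provider = embedding.get("provider", default_provider)
        model = embedding.get("model", default_model)
        extra = {k: v for k, v in embedding.items() if k not in ("provider", "model")}
    else:
        # 2. Legacy prefix format (embedding_provider / embedding_model)
        provider = config.get("embedding_provider", default_provider)
        model = config.get("embedding_model", default_model)
        extra = {k: config[k] for k in ("api_base", "api_key", "dimensions") if k in config}
    # "mode" is placed after **extra so callers cannot override it
    return {"provider": provider, "model": model, **extra, "mode": "embedding"}
```

Both formats converge on the same provider config, and the forced trailing `"mode": "embedding"` key is why extra keys can be forwarded safely.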

create_caching_provider async

create_caching_provider(
    inner: AsyncLLMProvider,
    cache_path: str | Path | None = None,
    *,
    cache_backend: str = "sqlite",
) -> CachingEmbedProvider

Create and initialize a CachingEmbedProvider.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `inner` | `AsyncLLMProvider` | The provider to wrap. | *required* |
| `cache_path` | `str \| Path \| None` | Path for the SQLite cache file. Required when `cache_backend` is `"sqlite"`. Ignored for `"memory"`. | `None` |
| `cache_backend` | `str` | `"sqlite"` (default) or `"memory"`. The `"sqlite"` backend requires `aiosqlite` (install via `pip install 'dataknobs-llm[sqlite-cache]'`). | `'sqlite'` |

Returns:

| Type | Description |
| --- | --- |
| `CachingEmbedProvider` | An initialized CachingEmbedProvider. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `cache_backend` is `"sqlite"` and no `cache_path` is provided, or if `cache_backend` is unknown. |

Source code in packages/llm/src/dataknobs_llm/llm/providers/caching.py
async def create_caching_provider(
    inner: AsyncLLMProvider,
    cache_path: str | Path | None = None,
    *,
    cache_backend: str = "sqlite",
) -> CachingEmbedProvider:
    """Create and initialize a ``CachingEmbedProvider``.

    Args:
        inner: The provider to wrap.
        cache_path: Path for the SQLite cache file. Required when
            *cache_backend* is ``"sqlite"``. Ignored for ``"memory"``.
        cache_backend: ``"sqlite"`` (default) or ``"memory"``.
            The ``"sqlite"`` backend requires ``aiosqlite``
            (install via ``pip install 'dataknobs-llm[sqlite-cache]'``).

    Returns:
        An initialized ``CachingEmbedProvider``.

    Raises:
        ValueError: If *cache_backend* is ``"sqlite"`` and no *cache_path*
            is provided, or if *cache_backend* is unknown.
    """
    if cache_backend == "memory":
        cache: EmbeddingCache = MemoryEmbeddingCache()
    elif cache_backend == "sqlite":
        if cache_path is None:
            raise ValueError(
                "cache_path is required for the 'sqlite' cache backend"
            )
        cache = SqliteEmbeddingCache(cache_path)
    else:
        raise ValueError(
            f"Unknown cache backend: {cache_backend!r}. "
            f"Use 'sqlite' or 'memory'."
        )

    provider = CachingEmbedProvider(inner, cache)
    await provider.initialize()
    return provider
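
The wrapper returned here answers repeated `embed()` calls from a cache keyed by (model, text) instead of hitting the inner provider again. A toy sketch of that idea, with hypothetical `CountingProvider` and `MemoryCachingProvider` classes rather than the package's actual implementations:

```python
import asyncio

class CountingProvider:
    """Hypothetical inner provider that counts real embed() calls."""
    model = "toy-embed"
    def __init__(self) -> None:
        self.calls = 0
    async def embed(self, text: str) -> list[float]:
        self.calls += 1
        return [float(len(text))]  # stand-in embedding vector

class MemoryCachingProvider:
    """Sketch of a (model, text)-keyed embed cache around a provider."""
    def __init__(self, inner: CountingProvider) -> None:
        self._inner = inner
        self._cache: dict[tuple[str, str], list[float]] = {}
    async def embed(self, text: str) -> list[float]:
        key = (self._inner.model, text)
        if key not in self._cache:
            self._cache[key] = await self._inner.embed(text)
        return self._cache[key]

async def demo() -> int:
    inner = CountingProvider()
    provider = MemoryCachingProvider(inner)
    await provider.embed("hello")
    await provider.embed("hello")  # second call is served from the cache
    return inner.calls
```

Keying on the model name as well as the text matters: the same string embedded under two different models must not collide in the cache.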

render_conditional_template

render_conditional_template(template: str, params: Dict[str, Any]) -> str

Render a template with variable substitution and conditional sections.

Variable substitution:

- `{{variable}}` syntax for placeholders
- Variables in params dict are replaced with their values
- Variables not in params are left unchanged (`{{variable}}` remains as-is)
- Whitespace handling: `{{ var }}` -> `" value "` when substituted, `" {{var}} "` when not

Conditional sections:

- `((optional content))` syntax for conditional blocks
- Section is removed if all `{{variables}}` inside are empty/None/missing
- Section is rendered (without parentheses) if any variable has a value
- Variables inside conditionals are replaced with empty strings if missing
- Nested conditionals are processed recursively

Example

```python
template = "Hello {{name}}((, you have {{count}} messages))"
params = {"name": "Alice", "count": 5}
result = "Hello Alice, you have 5 messages"

params = {"name": "Bob"}  # no count
result = "Hello Bob"  # conditional section removed
```

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `template` | `str` | The template string | *required* |
| `params` | `Dict[str, Any]` | Dictionary of parameters to substitute | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | The rendered template |

Source code in packages/llm/src/dataknobs_llm/template_utils.py
def render_conditional_template(template: str, params: Dict[str, Any]) -> str:
    """Render a template with variable substitution and conditional sections.

    Variable substitution:
    - {{variable}} syntax for placeholders
    - Variables in params dict are replaced with their values
    - Variables not in params are left unchanged ({{variable}} remains as-is)
    - Whitespace handling: {{ var }} -> " value " when substituted, " {{var}} " when not

    Conditional sections:
    - ((optional content)) syntax for conditional blocks
    - Section is removed if all {{variables}} inside are empty/None/missing
    - Section is rendered (without parentheses) if any variable has a value
    - Variables inside conditionals are replaced with empty strings if missing
    - Nested conditionals are processed recursively

    Example:
        template = "Hello {{name}}((, you have {{count}} messages))"
        params = {"name": "Alice", "count": 5}
        result = "Hello Alice, you have 5 messages"

        params = {"name": "Bob"}  # no count
        result = "Hello Bob"  # conditional section removed

    Args:
        template: The template string
        params: Dictionary of parameters to substitute

    Returns:
        The rendered template
    """
    def replace_variable(text: str, params: Dict[str, Any], in_conditional: bool = False) -> str:
        """Replace variables in text with proper whitespace handling."""
        # Pattern to match variables with optional whitespace
        var_pattern = r'\{\{(\s*)(\w+)(\s*)\}\}'

        def replace_var(match):
            """Replace a single variable with whitespace handling."""
            prefix_ws = match.group(1)
            var_name = match.group(2)
            suffix_ws = match.group(3)

            if var_name not in params:
                if in_conditional:
                    # In conditional sections, missing variables become empty
                    return ""
                else:
                    # Outside conditionals, preserve the pattern but move whitespace outside
                    if prefix_ws or suffix_ws:
                        return f"{prefix_ws}{{{{{var_name}}}}}{suffix_ws}"
                    else:
                        return match.group(0)

            value = params[var_name]
            if value is None:
                if in_conditional:
                    return ""
                else:
                    # Move whitespace outside for None values
                    if prefix_ws or suffix_ws:
                        return f"{prefix_ws}{{{{{var_name}}}}}{suffix_ws}"
                    else:
                        return ""
            else:
                # Preserve whitespace when substituting
                return f"{prefix_ws}{value!s}{suffix_ws}"

        return re.sub(var_pattern, replace_var, text)

    def find_all_variables(text: str) -> set:
        """Find all variables in text, including nested conditionals."""
        var_pattern = r'\{\{(\s*)(\w+)(\s*)\}\}'
        variables = set()
        for match in re.finditer(var_pattern, text):
            variables.add(match.group(2))
        return variables

    def process_conditionals(text: str, params: Dict[str, Any]) -> str:
        """Process conditional sections recursively."""
        result = text
        changed = True

        while changed:
            changed = False
            # Find the first (( ... )) section
            start_pos = 0
            while True:
                start = result.find('((', start_pos)
                if start == -1:
                    break

                # Find matching )) - must track ALL parens for correct nesting
                depth = 1
                paren_depth = 0  # Track single parentheses
                end = start + 2
                while end < len(result) and depth > 0:
                    if result[end:end+2] == '((':
                        depth += 1
                        end += 2
                    elif result[end:end+2] == '))':
                        # Only count as )) if we're not inside single parens
                        if paren_depth == 0:
                            depth -= 1
                            end += 2
                        else:
                            # This is ) followed by another )
                            paren_depth -= 1
                            end += 1
                    elif result[end] == '(':
                        paren_depth += 1
                        end += 1
                    elif result[end] == ')':
                        paren_depth -= 1
                        end += 1
                    else:
                        end += 1

                if depth == 0:
                    # Found a complete section
                    content = result[start+2:end-2]

                    # Find ALL variables in this section (including nested)
                    all_vars = find_all_variables(content)

                    if all_vars:
                        # Check if all variables are empty/missing
                        has_value = False
                        for var_name in all_vars:
                            if var_name in params:
                                value = params[var_name]
                                if value is not None:
                                    if isinstance(value, str):
                                        # For strings, check if non-empty after stripping
                                        if value.strip():
                                            has_value = True
                                            break
                                    else:
                                        # For non-strings, any truthy value counts
                                        if value:
                                            has_value = True
                                            break

                        if not has_value:
                            # Remove the entire section - all variables are empty/missing
                            result = result[:start] + result[end:]
                        else:
                            # At least one variable has a value, process nested conditionals
                            processed_content = process_conditionals(content, params)
                            # Then substitute variables in the processed content
                            rendered = replace_variable(processed_content, params, in_conditional=True)
                            result = result[:start] + rendered + result[end:]
                    else:
                        # No variables in this section, keep the content as-is
                        # But still process any nested conditionals
                        processed_content = process_conditionals(content, params)
                        result = result[:start] + processed_content + result[end:]

                    changed = True
                    break
                else:
                    # Unmatched parentheses, leave as-is and move on
                    start_pos = start + 1

        return result

    # First process all conditional sections
    result = process_conditionals(template, params)

    # Then handle remaining variables outside of conditional sections
    result = replace_variable(result, params, in_conditional=False)

    return result
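
For intuition, the two core rules (substitute `{{var}}`; drop a `((...))` section when every variable inside it is empty or missing) can be approximated in a few lines. `render_simple` below is a simplified, non-nesting sketch, not the full implementation above:

```python
import re

def render_simple(template: str, params: dict) -> str:
    """Simplified sketch of the {{var}} / ((...)) rules (no nested sections)."""
    def render_section(match: re.Match) -> str:
        content = match.group(1)
        names = re.findall(r"\{\{\s*(\w+)\s*\}\}", content)
        # Drop the section unless at least one variable has a non-empty value
        if not any(str(params.get(n) or "").strip() for n in names):
            return ""
        return re.sub(
            r"\{\{\s*(\w+)\s*\}\}",
            lambda m: str(params.get(m.group(1), "")),
            content,
        )

    # Process ((...)) conditional sections first, then bare variables;
    # variables with no value stay as-is outside conditionals.
    result = re.sub(r"\(\((.*?)\)\)", render_section, template, flags=re.S)
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(params[m.group(1)]) if m.group(1) in params else m.group(0),
        result,
    )
```

The real function additionally handles nested conditionals, balanced single parentheses inside sections, and whitespace preservation around placeholders, which is why it walks the string manually instead of using a single regex.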

extraction_response

extraction_response(
    data: dict[str, Any], *, model: str = "test-model"
) -> LLMResponse

Create an LLMResponse for schema extraction.

The data is JSON-encoded as the response content, mimicking how extraction LLMs return structured data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `dict[str, Any]` | Extracted data dict | *required* |
| `model` | `str` | Model identifier (default: "test-model") | `'test-model'` |

Returns:

| Type | Description |
| --- | --- |
| `LLMResponse` | LLMResponse with JSON content |

Example

```python
>>> response = extraction_response({"name": "Math Tutor", "level": 5})
>>> import json
>>> json.loads(response.content)
{'name': 'Math Tutor', 'level': 5}
```

Source code in packages/llm/src/dataknobs_llm/testing.py
def extraction_response(
    data: dict[str, Any],
    *,
    model: str = "test-model",
) -> LLMResponse:
    """Create an LLMResponse for schema extraction.

    The data is JSON-encoded as the response content, mimicking
    how extraction LLMs return structured data.

    Args:
        data: Extracted data dict
        model: Model identifier (default: "test-model")

    Returns:
        LLMResponse with JSON content

    Example:
        >>> response = extraction_response({"name": "Math Tutor", "level": 5})
        >>> import json
        >>> json.loads(response.content)
        {'name': 'Math Tutor', 'level': 5}
    """
    return LLMResponse(
        content=json.dumps(data),
        model=model,
        finish_reason="stop",
    )

llm_message_from_dict

llm_message_from_dict(d: dict[str, Any]) -> LLMMessage

Deserialize an LLMMessage from a dict.

Delegates to :meth:LLMMessage.from_dict. Kept for backward compatibility.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `d` | `dict[str, Any]` | Dictionary representation | *required* |

Returns:

| Type | Description |
| --- | --- |
| `LLMMessage` | LLMMessage instance |

Source code in packages/llm/src/dataknobs_llm/testing.py
def llm_message_from_dict(d: dict[str, Any]) -> LLMMessage:
    """Deserialize an LLMMessage from a dict.

    Delegates to :meth:`LLMMessage.from_dict`. Kept for backward compatibility.

    Args:
        d: Dictionary representation

    Returns:
        LLMMessage instance
    """
    return LLMMessage.from_dict(d)

llm_message_to_dict

llm_message_to_dict(msg: LLMMessage) -> dict[str, Any]

Serialize an LLMMessage to a JSON-compatible dict.

Delegates to :meth:LLMMessage.to_dict. Kept for backward compatibility.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `msg` | `LLMMessage` | LLMMessage to serialize | *required* |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary representation (only non-None optional fields included) |

Source code in packages/llm/src/dataknobs_llm/testing.py
def llm_message_to_dict(msg: LLMMessage) -> dict[str, Any]:
    """Serialize an LLMMessage to a JSON-compatible dict.

    Delegates to :meth:`LLMMessage.to_dict`. Kept for backward compatibility.

    Args:
        msg: LLMMessage to serialize

    Returns:
        Dictionary representation (only non-None optional fields included)
    """
    return msg.to_dict()

llm_response_from_dict

llm_response_from_dict(d: dict[str, Any]) -> LLMResponse

Deserialize an LLMResponse from a dict.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `d` | `dict[str, Any]` | Dictionary representation | *required* |

Returns:

| Type | Description |
| --- | --- |
| `LLMResponse` | LLMResponse instance |

Source code in packages/llm/src/dataknobs_llm/testing.py
def llm_response_from_dict(d: dict[str, Any]) -> LLMResponse:
    """Deserialize an LLMResponse from a dict.

    Args:
        d: Dictionary representation

    Returns:
        LLMResponse instance
    """
    tool_calls = None
    if "tool_calls" in d and d["tool_calls"] is not None:
        tool_calls = [tool_call_from_dict(tc) for tc in d["tool_calls"]]

    return LLMResponse(
        content=d["content"],
        model=d["model"],
        finish_reason=d.get("finish_reason"),
        usage=d.get("usage"),
        function_call=d.get("function_call"),
        tool_calls=tool_calls,
        metadata=d.get("metadata", {}),
        cost_usd=d.get("cost_usd"),
    )

llm_response_to_dict

llm_response_to_dict(resp: LLMResponse) -> dict[str, Any]

Serialize an LLMResponse to a JSON-compatible dict.

Omits created_at and cumulative_cost_usd (runtime artifacts that make captures non-deterministic). Only includes non-None optional fields.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `resp` | `LLMResponse` | LLMResponse to serialize | *required* |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary representation |

Source code in packages/llm/src/dataknobs_llm/testing.py
def llm_response_to_dict(resp: LLMResponse) -> dict[str, Any]:
    """Serialize an LLMResponse to a JSON-compatible dict.

    Omits ``created_at`` and ``cumulative_cost_usd`` (runtime artifacts that
    make captures non-deterministic). Only includes non-None optional fields.

    Args:
        resp: LLMResponse to serialize

    Returns:
        Dictionary representation
    """
    d: dict[str, Any] = {"content": resp.content, "model": resp.model}
    if resp.finish_reason is not None:
        d["finish_reason"] = resp.finish_reason
    if resp.usage is not None:
        d["usage"] = resp.usage
    if resp.function_call is not None:
        d["function_call"] = resp.function_call
    if resp.tool_calls is not None:
        d["tool_calls"] = [tool_call_to_dict(tc) for tc in resp.tool_calls]
    if resp.metadata:
        d["metadata"] = resp.metadata
    if resp.cost_usd is not None:
        d["cost_usd"] = resp.cost_usd
    return d

multi_tool_response

multi_tool_response(
    tools: list[tuple[str, dict[str, Any]]],
    *,
    content: str = "",
    model: str = "test-model",
) -> LLMResponse

Create an LLMResponse with multiple tool calls.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tools` | `list[tuple[str, dict[str, Any]]]` | List of (tool_name, arguments) tuples | *required* |
| `content` | `str` | Optional text content alongside tool calls | `''` |
| `model` | `str` | Model identifier (default: "test-model") | `'test-model'` |

Returns:

| Type | Description |
| --- | --- |
| `LLMResponse` | LLMResponse with multiple tool_calls |

Example

```python
>>> response = multi_tool_response([
...     ("preview_config", {}),
...     ("validate_config", {"strict": True}),
... ])
>>> len(response.tool_calls)
2
```

Source code in packages/llm/src/dataknobs_llm/testing.py
def multi_tool_response(
    tools: list[tuple[str, dict[str, Any]]],
    *,
    content: str = "",
    model: str = "test-model",
) -> LLMResponse:
    """Create an LLMResponse with multiple tool calls.

    Args:
        tools: List of (tool_name, arguments) tuples
        content: Optional text content alongside tool calls
        model: Model identifier (default: "test-model")

    Returns:
        LLMResponse with multiple tool_calls

    Example:
        >>> response = multi_tool_response([
        ...     ("preview_config", {}),
        ...     ("validate_config", {"strict": True}),
        ... ])
        >>> len(response.tool_calls)
        2
    """
    tool_calls = [
        ToolCall(
            name=name,
            parameters=args,
            id=f"tc-{uuid.uuid4().hex[:8]}",
        )
        for name, args in tools
    ]

    return LLMResponse(
        content=content,
        model=model,
        finish_reason="tool_calls",
        tool_calls=tool_calls,
    )

text_response

text_response(
    content: str,
    *,
    model: str = "test-model",
    finish_reason: str = "stop",
    usage: dict[str, int] | None = None,
    metadata: dict[str, Any] | None = None,
) -> LLMResponse

Create a simple text LLMResponse.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `content` | `str` | Response text content | *required* |
| `model` | `str` | Model identifier (default: "test-model") | `'test-model'` |
| `finish_reason` | `str` | Why generation stopped (default: "stop") | `'stop'` |
| `usage` | `dict[str, int] \| None` | Optional token usage dict | `None` |
| `metadata` | `dict[str, Any] \| None` | Optional metadata dict | `None` |

Returns:

| Type | Description |
| --- | --- |
| `LLMResponse` | LLMResponse with text content |

Example

```python
>>> response = text_response("Hello, world!")
>>> response.content
'Hello, world!'
```

Source code in packages/llm/src/dataknobs_llm/testing.py
def text_response(
    content: str,
    *,
    model: str = "test-model",
    finish_reason: str = "stop",
    usage: dict[str, int] | None = None,
    metadata: dict[str, Any] | None = None,
) -> LLMResponse:
    """Create a simple text LLMResponse.

    Args:
        content: Response text content
        model: Model identifier (default: "test-model")
        finish_reason: Why generation stopped (default: "stop")
        usage: Optional token usage dict
        metadata: Optional metadata dict

    Returns:
        LLMResponse with text content

    Example:
        >>> response = text_response("Hello, world!")
        >>> response.content
        'Hello, world!'
    """
    return LLMResponse(
        content=content,
        model=model,
        finish_reason=finish_reason,
        usage=usage,
        metadata=metadata or {},
    )

tool_call_from_dict

tool_call_from_dict(d: dict[str, Any]) -> ToolCall

Deserialize a ToolCall from a dict.

Delegates to :meth:ToolCall.from_dict. Kept for backward compatibility.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `d` | `dict[str, Any]` | Dictionary representation | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ToolCall` | ToolCall instance |

Source code in packages/llm/src/dataknobs_llm/testing.py
def tool_call_from_dict(d: dict[str, Any]) -> ToolCall:
    """Deserialize a ToolCall from a dict.

    Delegates to :meth:`ToolCall.from_dict`. Kept for backward compatibility.

    Args:
        d: Dictionary representation

    Returns:
        ToolCall instance
    """
    return ToolCall.from_dict(d)

tool_call_response

tool_call_response(
    tool_name: str,
    arguments: dict[str, Any] | None = None,
    *,
    tool_id: str | None = None,
    content: str = "",
    model: str = "test-model",
    additional_tools: list[tuple[str, dict[str, Any]]] | None = None,
) -> LLMResponse

Create an LLMResponse with tool call(s).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tool_name` | `str` | Name of the tool to call | *required* |
| `arguments` | `dict[str, Any] \| None` | Arguments to pass to the tool (default: `{}`) | `None` |
| `tool_id` | `str \| None` | Unique ID for the tool call (auto-generated if not provided) | `None` |
| `content` | `str` | Optional text content alongside tool call | `''` |
| `model` | `str` | Model identifier (default: "test-model") | `'test-model'` |
| `additional_tools` | `list[tuple[str, dict[str, Any]]] \| None` | Additional tool calls as (name, args) tuples | `None` |

Returns:

| Type | Description |
| --- | --- |
| `LLMResponse` | LLMResponse with tool_calls populated |

Example

```python
>>> response = tool_call_response("get_weather", {"city": "NYC"})
>>> response.tool_calls[0].name
'get_weather'
>>> response.tool_calls[0].parameters
{'city': 'NYC'}
```

Multiple tool calls:

```python
>>> response = tool_call_response(
...     "preview_config", {},
...     additional_tools=[("validate_config", {})]
... )
>>> len(response.tool_calls)
2
```

Source code in packages/llm/src/dataknobs_llm/testing.py
def tool_call_response(
    tool_name: str,
    arguments: dict[str, Any] | None = None,
    *,
    tool_id: str | None = None,
    content: str = "",
    model: str = "test-model",
    additional_tools: list[tuple[str, dict[str, Any]]] | None = None,
) -> LLMResponse:
    """Create an LLMResponse with tool call(s).

    Args:
        tool_name: Name of the tool to call
        arguments: Arguments to pass to the tool (default: {})
        tool_id: Unique ID for the tool call (auto-generated if not provided)
        content: Optional text content alongside tool call
        model: Model identifier (default: "test-model")
        additional_tools: Additional tool calls as (name, args) tuples

    Returns:
        LLMResponse with tool_calls populated

    Example:
        >>> response = tool_call_response("get_weather", {"city": "NYC"})
        >>> response.tool_calls[0].name
        'get_weather'
        >>> response.tool_calls[0].parameters
        {'city': 'NYC'}

        # Multiple tool calls
        >>> response = tool_call_response(
        ...     "preview_config", {},
        ...     additional_tools=[("validate_config", {})]
        ... )
        >>> len(response.tool_calls)
        2
    """
    tools = [
        ToolCall(
            name=tool_name,
            parameters=arguments or {},
            id=tool_id or f"tc-{uuid.uuid4().hex[:8]}",
        )
    ]

    if additional_tools:
        for name, args in additional_tools:
            tools.append(
                ToolCall(
                    name=name,
                    parameters=args,
                    id=f"tc-{uuid.uuid4().hex[:8]}",
                )
            )

    return LLMResponse(
        content=content,
        model=model,
        finish_reason="tool_calls",
        tool_calls=tools,
    )

tool_call_to_dict

tool_call_to_dict(tc: ToolCall) -> dict[str, Any]

Serialize a ToolCall to a JSON-compatible dict.

Delegates to `ToolCall.to_dict`. Kept for backward compatibility.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tc` | `ToolCall` | ToolCall to serialize | *required* |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary representation |

Source code in packages/llm/src/dataknobs_llm/testing.py
def tool_call_to_dict(tc: ToolCall) -> dict[str, Any]:
    """Serialize a ToolCall to a JSON-compatible dict.

    Delegates to :meth:`ToolCall.to_dict`. Kept for backward compatibility.

    Args:
        tc: ToolCall to serialize

    Returns:
        Dictionary representation
    """
    return tc.to_dict()
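Together, `tool_call_to_dict` and `tool_call_from_dict` give tool calls a JSON round-trip: serialize, persist or transmit, then reconstruct an equal object. A sketch of the round-trip invariant the pair is expected to satisfy, again using a hypothetical stand-in dataclass rather than the real `ToolCall` (field names assumed from the signatures documented above):

```python
import json
from dataclasses import asdict, dataclass
from typing import Any

@dataclass
class ToolCall:
    """Stand-in for dataknobs_llm's ToolCall (hypothetical implementation)."""
    name: str
    parameters: dict[str, Any]
    id: str

    def to_dict(self) -> dict[str, Any]:
        # JSON-compatible by construction: only str and dict fields.
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "ToolCall":
        return cls(name=d["name"], parameters=d["parameters"], id=d["id"])

tc = ToolCall("get_weather", {"city": "NYC"}, "tc-abc12345")
payload = json.dumps(tc.to_dict())            # serialize through JSON
restored = ToolCall.from_dict(json.loads(payload))
assert restored == tc                         # round-trip preserves all fields
```

Because both module-level functions simply delegate to the `ToolCall` methods, new code can call `tc.to_dict()` and `ToolCall.from_dict(d)` directly; the free functions exist only for backward compatibility.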