
Conversations API

Tree-structured conversation management with branching and persistence.

📖 Also see: Auto-generated API Reference - Complete documentation from source code docstrings


Overview

The conversations API provides a powerful system for managing multi-turn conversations with support for branching, time-travel, persistence, and FSM-based flows.

ConversationManager

dataknobs_llm.conversations.ConversationManager

ConversationManager(
    llm: AsyncLLMProvider,
    prompt_builder: AsyncPromptBuilder,
    storage: ConversationStorage,
    state: ConversationState | None = None,
    metadata: Dict[str, Any] | None = None,
    middleware: List[ConversationMiddleware] | None = None,
    cache_rag_results: bool = False,
    reuse_rag_on_branch: bool = False,
    conversation_id: str | None = None,
)

Manages multi-turn conversations with persistence and branching.

This class orchestrates conversations by:

  • Tracking message history with tree-based branching
  • Managing conversation state
  • Persisting to a storage backend
  • Supporting multiple conversation branches

The conversation history is stored as a tree structure where:

  • The root node contains the initial system prompt (if any)
  • Each message is a tree node with a dot-delimited ID (e.g., "0.1.2")
  • Branches occur when multiple children are added to the same node
  • The current position tracks where you are in the conversation tree

Attributes:

  • llm: LLM provider for completions
  • prompt_builder: Prompt builder with library
  • storage: Storage backend for persistence
  • state: Current conversation state (tree, metadata, position)
  • middleware: List of middleware to execute on requests/responses
  • cache_rag_results: Whether to store RAG metadata in nodes
  • reuse_rag_on_branch: Whether to reuse cached RAG across branches
  • conversation_id (str | None): Unique conversation identifier
  • current_node_id (str | None): Current position in conversation tree

Example
# Create conversation
manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage_backend,
    system_prompt_name="helpful_assistant"
)

# Add user message
await manager.add_message(
    prompt_name="user_query",
    params={"question": "What is Python?"},
    role="user"
)

# Get LLM response
result = await manager.complete()

# Continue conversation
await manager.add_message(
    content="Tell me more about decorators",
    role="user"
)
result = await manager.complete()

# Create alternative response branch
await manager.switch_to_node("0")  # Back to first user message
result2 = await manager.complete(branch_name="alt-response")

# Resume after interruption
manager2 = await ConversationManager.resume(
    conversation_id=manager.conversation_id,
    llm=llm,
    prompt_builder=builder,
    storage=storage_backend
)
Note

Tree-based branching enables:

  • A/B Testing: Generate multiple responses from the same context
  • Retry Logic: Try again from a previous point after failures
  • Alternative Explorations: Explore different conversation paths
  • Debugging: Compare different middleware or RAG configurations

Node IDs use dot notation (e.g., "0.1.2" means 3rd child of 2nd child of 1st child of root). The root node has ID "".

State is automatically persisted after every operation. Use resume() to continue conversations across sessions or servers.
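The dot-delimited scheme makes parent/child relationships computable from an ID alone. The helpers below are an illustrative sketch of that arithmetic, not part of the dataknobs_llm API:

```python
# Hypothetical helpers illustrating the dot-delimited node ID scheme
# described above; these are NOT part of the dataknobs_llm API.

def parent_id(node_id: str) -> str:
    """Parent of a node: '0.1.2' -> '0.1'; top-level '0' -> '' (the root)."""
    return node_id.rsplit(".", 1)[0] if "." in node_id else ""

def child_id(node_id: str, index: int) -> str:
    """ID of the index-th child: child_id('0.1', 2) -> '0.1.2'."""
    return f"{node_id}.{index}" if node_id else str(index)

print(parent_id("0.1.2"))  # 0.1
print(parent_id("0"))      # '' (the root)
print(child_id("", 0))     # 0
```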

See Also

  • create: Create a new conversation
  • resume: Resume an existing conversation
  • add_message: Add user/system message
  • complete: Get LLM completion (blocking)
  • stream_complete: Get LLM completion (streaming)
  • switch_to_node: Navigate to different branch
  • get_branches: List available branches
  • get_total_cost: Calculate cumulative cost
  • ConversationStorage: Storage backend implementations
  • ConversationMiddleware: Request/response processing
  • ConversationFlow: FSM-based conversation flows

Initialize conversation manager.

Note: Use ConversationManager.create() or ConversationManager.resume() instead of calling __init__ directly.

Parameters:

  • llm (AsyncLLMProvider, required): LLM provider for completions
  • prompt_builder (AsyncPromptBuilder, required): Prompt builder with library
  • storage (ConversationStorage, required): Storage backend for persistence
  • state (ConversationState | None, default None): Optional existing conversation state
  • metadata (Dict[str, Any] | None, default None): Optional metadata for new conversations
  • middleware (List[ConversationMiddleware] | None, default None): Optional list of middleware to execute
  • cache_rag_results (bool, default False): If True, store RAG metadata in node metadata for debugging and transparency
  • reuse_rag_on_branch (bool, default False): If True, reuse cached RAG results when possible (useful for testing/branching)
  • conversation_id (str | None, default None): Optional conversation ID to use when creating the first message. If not provided, a UUID is generated automatically.

Methods:

  • create: Create a new conversation.
  • add_message: Add a message to the current conversation node.
  • complete: Get LLM completion and add as child of current node.
  • switch_to_node: Switch current position to a different node in the tree.
  • branch_from: Position tree so the next message becomes a sibling of the given node.
  • get_rag_metadata: Get RAG metadata from a conversation node.

Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
def __init__(
    self,
    llm: AsyncLLMProvider,
    prompt_builder: AsyncPromptBuilder,
    storage: ConversationStorage,
    state: ConversationState | None = None,
    metadata: Dict[str, Any] | None = None,
    middleware: List[ConversationMiddleware] | None = None,
    cache_rag_results: bool = False,
    reuse_rag_on_branch: bool = False,
    conversation_id: str | None = None,
):
    """Initialize conversation manager.

    Note: Use ConversationManager.create() or ConversationManager.resume()
    instead of calling __init__ directly.

    Args:
        llm: LLM provider for completions
        prompt_builder: Prompt builder with library
        storage: Storage backend for persistence
        state: Optional existing conversation state
        metadata: Optional metadata for new conversations
        middleware: Optional list of middleware to execute
        cache_rag_results: If True, store RAG metadata in node metadata
                         for debugging and transparency
        reuse_rag_on_branch: If True, reuse cached RAG results when
                           possible (useful for testing/branching)
        conversation_id: Optional conversation ID to use when creating
                       the first message. If not provided, a UUID is
                       generated automatically.
    """
    self.llm = llm
    self.prompt_builder = prompt_builder
    self.storage = storage
    self.state = state
    self._initial_metadata = metadata or {}
    self.middleware = middleware or []
    self.cache_rag_results = cache_rag_results
    self.reuse_rag_on_branch = reuse_rag_on_branch
    self._conversation_id = conversation_id

Functions

create async classmethod
create(
    llm: AsyncLLMProvider,
    prompt_builder: AsyncPromptBuilder,
    storage: ConversationStorage,
    system_prompt_name: str | None = None,
    system_params: Dict[str, Any] | None = None,
    metadata: Dict[str, Any] | None = None,
    middleware: List[ConversationMiddleware] | None = None,
    cache_rag_results: bool = False,
    reuse_rag_on_branch: bool = False,
    conversation_id: str | None = None,
) -> ConversationManager

Create a new conversation.

Parameters:

  • llm (AsyncLLMProvider, required): LLM provider
  • prompt_builder (AsyncPromptBuilder, required): Prompt builder
  • storage (ConversationStorage, required): Storage backend
  • system_prompt_name (str | None, default None): Optional system prompt to initialize with
  • system_params (Dict[str, Any] | None, default None): Optional params for system prompt
  • metadata (Dict[str, Any] | None, default None): Optional conversation metadata
  • middleware (List[ConversationMiddleware] | None, default None): Optional list of middleware to execute
  • cache_rag_results (bool, default False): If True, store RAG metadata in node metadata
  • reuse_rag_on_branch (bool, default False): If True, reuse cached RAG results when possible
  • conversation_id (str | None, default None): Optional conversation ID. If not provided, a UUID is generated when the first message is added.

Returns:

  • ConversationManager: Initialized ConversationManager

Example

manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage,
    system_prompt_name="helpful_assistant",
    cache_rag_results=True
)

Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
@classmethod
async def create(
    cls,
    llm: AsyncLLMProvider,
    prompt_builder: AsyncPromptBuilder,
    storage: ConversationStorage,
    system_prompt_name: str | None = None,
    system_params: Dict[str, Any] | None = None,
    metadata: Dict[str, Any] | None = None,
    middleware: List[ConversationMiddleware] | None = None,
    cache_rag_results: bool = False,
    reuse_rag_on_branch: bool = False,
    conversation_id: str | None = None,
) -> "ConversationManager":
    """Create a new conversation.

    Args:
        llm: LLM provider
        prompt_builder: Prompt builder
        storage: Storage backend
        system_prompt_name: Optional system prompt to initialize with
        system_params: Optional params for system prompt
        metadata: Optional conversation metadata
        middleware: Optional list of middleware to execute
        cache_rag_results: If True, store RAG metadata in node metadata
        reuse_rag_on_branch: If True, reuse cached RAG results when possible
        conversation_id: Optional conversation ID. If not provided, a UUID
                       is generated when the first message is added.

    Returns:
        Initialized ConversationManager

    Example:
        >>> manager = await ConversationManager.create(
        ...     llm=llm,
        ...     prompt_builder=builder,
        ...     storage=storage,
        ...     system_prompt_name="helpful_assistant",
        ...     cache_rag_results=True
        ... )
    """
    manager = cls(
        llm=llm,
        prompt_builder=prompt_builder,
        storage=storage,
        metadata=metadata,
        middleware=middleware,
        cache_rag_results=cache_rag_results,
        reuse_rag_on_branch=reuse_rag_on_branch,
        conversation_id=conversation_id,
    )

    # Initialize with system prompt if provided
    if system_prompt_name:
        await manager.add_message(
            prompt_name=system_prompt_name,
            params=system_params,
            role="system",
        )

    return manager
add_message async
add_message(
    role: str,
    content: str | None = None,
    prompt_name: str | None = None,
    params: Dict[str, Any] | None = None,
    include_rag: bool = True,
    rag_configs: List[Dict[str, Any]] | None = None,
    metadata: Dict[str, Any] | None = None,
    name: str | None = None,
    tool_call_id: str | None = None,
) -> ConversationNode

Add a message to the current conversation node.

Either content or prompt_name must be provided. If using a prompt with RAG configuration, the RAG searches will be executed and results will be automatically inserted into the prompt.

Prompt rendering (template variables, RAG) is only applied for "system" and "user" roles. Other roles ("tool", "assistant", "function") are stored directly with no prompt processing.

Parameters:

  • role (str, required): Message role — "system", "user", "assistant", "tool", or "function".
  • content (str | None, default None): Direct message content (if not using a prompt)
  • prompt_name (str | None, default None): Name of prompt template to render (system/user only)
  • params (Dict[str, Any] | None, default None): Parameters for prompt rendering
  • include_rag (bool, default True): Whether to execute RAG searches for prompts
  • rag_configs (List[Dict[str, Any]] | None, default None): RAG configurations for inline content (only used when content is provided without prompt_name). Allows inline prompts to benefit from RAG enhancement.
  • metadata (Dict[str, Any] | None, default None): Optional metadata for this message node
  • name (str | None, default None): Optional name for the message — used for tool result messages to identify which tool produced the result.
  • tool_call_id (str | None, default None): Optional provider-assigned tool call ID — used for tool result messages to pair the result with the specific tool invocation (required by Anthropic and OpenAI APIs).

Returns:

  • ConversationNode: The created ConversationNode

Raises:

  • ValueError: If neither content nor prompt_name is provided

Example
# Add message from prompt
await manager.add_message(
    role="user",
    prompt_name="code_question",
    params={"code": code_snippet}
)

# Add direct message
await manager.add_message(
    role="user",
    content="What is Python?"
)

# Add inline message with RAG enhancement
await manager.add_message(
    role="system",
    content="You are a helpful assistant. Use the context below.",
    rag_configs=[{
        "adapter_name": "docs",
        "query": "assistant guidelines",
        "placeholder": "CONTEXT",
        "k": 3
    }]
)

# Add tool result message
await manager.add_message(
    role="tool",
    content='{"success": true, "data": {"name": "flour"}}',
    name="add_bank_record",
)
Note

RAG Caching Behavior:

If cache_rag_results=True and reuse_rag_on_branch=True were set during ConversationManager creation, this method will:

  1. Check if the same prompt+role was used elsewhere in the tree
  2. Check if the RAG query parameters match (via query hash)
  3. Reuse cached RAG results if found (no re-search!)
  4. Store new RAG results if not found

This is particularly useful when exploring conversation branches, as you can avoid redundant searches for the same information.
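The reuse check in steps 1–3 can be pictured as a hash-keyed lookup: identical prompt, role, and parameters produce the same key, so a second branch finds the stored results instead of re-searching. The sketch below is self-contained and illustrative; the library's actual hashing scheme and cache layout may differ:

```python
import hashlib
import json

def rag_cache_key(prompt_name: str, role: str, params: dict) -> str:
    """Illustrative query hash: same prompt + role + params -> same key.

    Mirrors the idea in steps 1-3 above; NOT the library's internal format.
    """
    payload = json.dumps(
        {"prompt": prompt_name, "role": role, "params": params},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()

cache: dict = {}

# First branch: no cached entry, so a RAG search would run and be stored.
key = rag_cache_key("user_query", "user", {"question": "What is Python?"})
cache[key] = [{"doc": "python_intro.md", "score": 0.91}]

# Second branch with identical parameters: cache hit, no re-search.
hit = cache.get(rag_cache_key("user_query", "user", {"question": "What is Python?"}))
print(hit is not None)  # True

# Different parameters: cache miss, so the search runs again.
miss = cache.get(rag_cache_key("user_query", "user", {"question": "What is Rust?"}))
print(miss is None)  # True
```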

See Also

  • complete: Get LLM response after adding message
  • get_rag_metadata: Retrieve RAG metadata from a node

Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
async def add_message(
    self,
    role: str,
    content: str | None = None,
    prompt_name: str | None = None,
    params: Dict[str, Any] | None = None,
    include_rag: bool = True,
    rag_configs: List[Dict[str, Any]] | None = None,
    metadata: Dict[str, Any] | None = None,
    name: str | None = None,
    tool_call_id: str | None = None,
) -> ConversationNode:
    """Add a message to the current conversation node.

    Either content or prompt_name must be provided. If using a prompt
    with RAG configuration, the RAG searches will be executed and results
    will be automatically inserted into the prompt.

    Prompt rendering (template variables, RAG) is only applied for
    ``"system"`` and ``"user"`` roles.  Other roles (``"tool"``,
    ``"assistant"``, ``"function"``) are stored directly with no
    prompt processing.

    Args:
        role: Message role — ``"system"``, ``"user"``, ``"assistant"``,
            ``"tool"``, or ``"function"``.
        content: Direct message content (if not using prompt)
        prompt_name: Name of prompt template to render (system/user only)
        params: Parameters for prompt rendering
        include_rag: Whether to execute RAG searches for prompts
        rag_configs: RAG configurations for inline content (only used when
                    content is provided without prompt_name). Allows inline
                    prompts to benefit from RAG enhancement.
        metadata: Optional metadata for this message node
        name: Optional name for the message — used for tool result
            messages to identify which tool produced the result.
        tool_call_id: Optional provider-assigned tool call ID — used for
            tool result messages to pair the result with the specific
            tool invocation (required by Anthropic and OpenAI APIs).

    Returns:
        The created ConversationNode

    Raises:
        ValueError: If neither content nor prompt_name provided

    Example:
        ```python
        # Add message from prompt
        await manager.add_message(
            role="user",
            prompt_name="code_question",
            params={"code": code_snippet}
        )

        # Add direct message
        await manager.add_message(
            role="user",
            content="What is Python?"
        )

        # Add inline message with RAG enhancement
        await manager.add_message(
            role="system",
            content="You are a helpful assistant. Use the context below.",
            rag_configs=[{
                "adapter_name": "docs",
                "query": "assistant guidelines",
                "placeholder": "CONTEXT",
                "k": 3
            }]
        )

        # Add tool result message
        await manager.add_message(
            role="tool",
            content='{"success": true, "data": {"name": "flour"}}',
            name="add_bank_record",
        )
        ```

    Note:
        **RAG Caching Behavior**:

        If `cache_rag_results=True` and `reuse_rag_on_branch=True` were
        set during ConversationManager creation, this method will:

        1. Check if the same prompt+role was used elsewhere in the tree
        2. Check if the RAG query parameters match (via query hash)
        3. Reuse cached RAG results if found (no re-search!)
        4. Store new RAG results if not found

        This is particularly useful when exploring conversation branches,
        as you can avoid redundant searches for the same information.

    See Also:
        complete: Get LLM response after adding message
        get_rag_metadata: Retrieve RAG metadata from a node
    """
    if not content and not prompt_name:
        raise ValueError("Either content or prompt_name must be provided")

    # Prompt rendering only applies to system/user roles.
    # Other roles (tool, assistant, function) are stored directly.
    rag_metadata_to_store = None
    if prompt_name:
        params = params or {}

        # Check if we should try to reuse cached RAG
        cached_rag = None
        if self.reuse_rag_on_branch and include_rag:
            cached_rag = await self._find_cached_rag(prompt_name, role, params)

        if role == "system":
            result = await self.prompt_builder.render_system_prompt(
                prompt_name,
                params=params,
                include_rag=include_rag,
                return_rag_metadata=self.cache_rag_results,
                cached_rag=cached_rag,
            )
        elif role == "user":
            result = await self.prompt_builder.render_user_prompt(
                prompt_name,
                params=params,
                include_rag=include_rag,
                return_rag_metadata=self.cache_rag_results,
                cached_rag=cached_rag,
            )
        else:
            raise ValueError(f"Cannot render prompt for role '{role}'")

        content = result.content

        # Store RAG metadata if caching is enabled and metadata was captured
        if self.cache_rag_results and result.rag_metadata:
            rag_metadata_to_store = result.rag_metadata

    elif content and not prompt_name and role in ("system", "user"):
        # Render inline content through the prompt builder so that
        # template variables (e.g. {{current_date}}) are always resolved,
        # and RAG enhancement is applied when rag_configs are present.
        params = params or {}
        if role == "system":
            result = await self.prompt_builder.render_inline_system_prompt(
                content,
                params=params,
                rag_configs=rag_configs,
                include_rag=include_rag and bool(rag_configs),
                return_rag_metadata=self.cache_rag_results,
            )
        else:
            result = await self.prompt_builder.render_inline_user_prompt(
                content,
                params=params,
                rag_configs=rag_configs,
                include_rag=include_rag and bool(rag_configs),
                return_rag_metadata=self.cache_rag_results,
            )

        if result:
            content = result.content
            if self.cache_rag_results and result.rag_metadata:
                rag_metadata_to_store = result.rag_metadata

    # Create message
    message = LLMMessage(
        role=role, content=content, name=name, tool_call_id=tool_call_id,
    )

    # Prepare node metadata
    node_metadata = metadata or {}
    if rag_metadata_to_store:
        node_metadata["rag_metadata"] = rag_metadata_to_store

    # Initialize state if this is the first message
    if self.state is None:
        conversation_id = self._conversation_id or str(uuid.uuid4())
        root_node = ConversationNode(
            message=message,
            node_id="",
            prompt_name=prompt_name,
            metadata=node_metadata,
        )
        tree = Tree(root_node)
        self.state = ConversationState(
            conversation_id=conversation_id,
            message_tree=tree,
            current_node_id="",
            metadata=self._initial_metadata,
        )
    else:
        # Add as child of current node
        current_tree_node = self.state.get_current_node()
        if current_tree_node is None:
            raise ValueError(f"Current node '{self.state.current_node_id}' not found")

        # Create new tree node
        new_tree_node = Tree(
            ConversationNode(
                message=message,
                node_id="",  # Will be calculated after adding to tree
                prompt_name=prompt_name,
                metadata=node_metadata,
            )
        )

        # Add to tree
        current_tree_node.add_child(new_tree_node)

        # Calculate and set node_id
        node_id = calculate_node_id(new_tree_node)
        new_tree_node.data.node_id = node_id

        # Move current position to new node
        self.state.current_node_id = node_id

    # Update timestamp
    self.state.updated_at = datetime.now()

    # Persist
    await self._save_state()

    return self.state.get_current_node().data
complete async
complete(
    branch_name: str | None = None,
    metadata: Dict[str, Any] | None = None,
    llm_config_overrides: Dict[str, Any] | None = None,
    system_prompt_override: str | None = None,
    tools: list[Any] | None = None,
    **llm_kwargs: Any,
) -> LLMResponse

Get LLM completion and add as child of current node.

This method:

  1. Gets conversation history from root to current node
  2. Optionally overrides the system prompt for this call only
  3. Executes middleware (pre-LLM)
  4. Calls LLM with history (and tools if provided)
  5. Executes middleware (post-LLM)
  6. Adds assistant response as child of current node
  7. Updates current position to new node
  8. Persists to storage

Parameters:

  • branch_name (str | None, default None): Optional human-readable label for this branch
  • metadata (Dict[str, Any] | None, default None): Optional metadata for the assistant message node
  • llm_config_overrides (Dict[str, Any] | None, default None): Optional dict to override LLM config fields for this request only. Supported fields: model, temperature, max_tokens, top_p, stop_sequences, seed.
  • system_prompt_override (str | None, default None): Optional system prompt to use for this completion only. Replaces the first system message in the message list without mutating the conversation tree. Used by reasoning strategies to inject stage-specific context.
  • tools (list[Any] | None, default None): Optional list of tools available for this completion. Forwarded to the LLM provider's complete() method for consistent tool handling across all providers.
  • **llm_kwargs (Any): Additional arguments for LLM.complete()

Returns:

  • LLMResponse: LLM response with content, usage, and cost information

Raises:

  • ValueError: If the conversation has no messages yet

Example
# Get response
result = await manager.complete()
print(result.content)
print(f"Cost: ${result.cost_usd:.4f}")

# Create labeled branch
result = await manager.complete(branch_name="alternative-answer")

# With LLM parameters
result = await manager.complete(temperature=0.9, max_tokens=500)

# With config overrides (switch model per-request)
result = await manager.complete(
    llm_config_overrides={"model": "gpt-4-turbo", "temperature": 0.9}
)

# With system prompt override (used by reasoning strategies)
result = await manager.complete(
    system_prompt_override="You are a wizard assistant at step 2..."
)

# With tools
result = await manager.complete(tools=[search_tool, calc_tool])
Note

Middleware Execution Order (Onion Model):

  • Pre-LLM: middleware[0] → middleware[1] → ... → middleware[N]
  • LLM call happens
  • Post-LLM: middleware[N] → ... → middleware[1] → middleware[0]

This "onion" pattern ensures that middleware wraps around the LLM call symmetrically. For example, if middleware[0] starts a timer in process_request(), it will be the last to run in process_response() and can log the total elapsed time.
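The ordering above can be demonstrated with a small self-contained simulation. This is illustrative only: real middleware implements the library's async ConversationMiddleware interface rather than the plain class shown here.

```python
# Illustrative simulation of the onion ordering; real middleware
# implements the library's ConversationMiddleware interface (async).
order = []

class Recorder:
    def __init__(self, name: str):
        self.name = name

    def process_request(self, messages):
        order.append(f"{self.name}:request")
        return messages

    def process_response(self, response):
        order.append(f"{self.name}:response")
        return response

middleware = [Recorder("A"), Recorder("B"), Recorder("C")]

msgs = ["user: hi"]
for m in middleware:            # pre-LLM: list order
    msgs = m.process_request(msgs)
response = "llm output"         # LLM call happens here
for m in reversed(middleware):  # post-LLM: reverse order
    response = m.process_response(response)

print(order)
# ['A:request', 'B:request', 'C:request', 'C:response', 'B:response', 'A:response']
```

Because A is outermost, state it sets up in process_request (such as a timer) is still available when its process_response runs last.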

Automatic Cost Tracking:

The response includes cost_usd (this call) and cumulative_cost_usd (total conversation cost) if the LLM provider returns usage statistics.

See Also

  • stream_complete: Streaming version for real-time output
  • add_message: Add user/system message before calling complete
  • switch_to_node: Navigate to different branch before completing

Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
async def complete(
    self,
    branch_name: str | None = None,
    metadata: Dict[str, Any] | None = None,
    llm_config_overrides: Dict[str, Any] | None = None,
    system_prompt_override: str | None = None,
    tools: list[Any] | None = None,
    **llm_kwargs: Any,
) -> LLMResponse:
    """Get LLM completion and add as child of current node.

    This method:
    1. Gets conversation history from root to current node
    2. Optionally overrides the system prompt for this call only
    3. Executes middleware (pre-LLM)
    4. Calls LLM with history (and tools if provided)
    5. Executes middleware (post-LLM)
    6. Adds assistant response as child of current node
    7. Updates current position to new node
    8. Persists to storage

    Args:
        branch_name: Optional human-readable label for this branch
        metadata: Optional metadata for the assistant message node
        llm_config_overrides: Optional dict to override LLM config fields
            for this request only. Supported fields: model, temperature,
            max_tokens, top_p, stop_sequences, seed.
        system_prompt_override: Optional system prompt to use for this
            completion only. Replaces the first system message in the
            message list without mutating the conversation tree. Used by
            reasoning strategies to inject stage-specific context.
        tools: Optional list of tools available for this completion.
            Forwarded to the LLM provider's complete() method for
            consistent tool handling across all providers.
        **llm_kwargs: Additional arguments for LLM.complete()

    Returns:
        LLM response with content, usage, and cost information

    Raises:
        ValueError: If conversation has no messages yet

    Example:
        ```python
        # Get response
        result = await manager.complete()
        print(result.content)
        print(f"Cost: ${result.cost_usd:.4f}")

        # Create labeled branch
        result = await manager.complete(branch_name="alternative-answer")

        # With LLM parameters
        result = await manager.complete(temperature=0.9, max_tokens=500)

        # With config overrides (switch model per-request)
        result = await manager.complete(
            llm_config_overrides={"model": "gpt-4-turbo", "temperature": 0.9}
        )

        # With system prompt override (used by reasoning strategies)
        result = await manager.complete(
            system_prompt_override="You are a wizard assistant at step 2..."
        )

        # With tools
        result = await manager.complete(tools=[search_tool, calc_tool])
        ```

    Note:
        **Middleware Execution Order** (Onion Model):

        - Pre-LLM: middleware[0] → middleware[1] → ... → middleware[N]
        - LLM call happens
        - Post-LLM: middleware[N] → ... → middleware[1] → middleware[0]

        This "onion" pattern ensures that middleware wraps around the LLM
        call symmetrically. For example, if middleware[0] starts a timer
        in `process_request()`, it will be the last to run in
        `process_response()` and can log the total elapsed time.

        **Automatic Cost Tracking**:

        The response includes `cost_usd` (this call) and `cumulative_cost_usd`
        (total conversation cost) if the LLM provider returns usage statistics.

    See Also:
        stream_complete: Streaming version for real-time output
        add_message: Add user/system message before calling complete
        switch_to_node: Navigate to different branch before completing
    """
    messages = self._prepare_completion(system_prompt_override)
    messages = await self._run_pre_middleware(messages)

    response = await self.llm.complete(
        messages,
        config_overrides=llm_config_overrides,
        tools=tools,
        **llm_kwargs,
    )

    return await self._finalize_completion(
        response, branch_name, metadata, llm_config_overrides,
        system_prompt_override=system_prompt_override,
    )
switch_to_node async
switch_to_node(node_id: str) -> None

Switch current position to a different node in the tree.

This allows exploring different branches or backtracking in the conversation.

Parameters:

  • node_id (str, required): Target node ID (dot-delimited, e.g., "0.1" or "")

Raises:

  • ValueError: If node_id is not found in the tree

Example

# Go back to first user message
await manager.switch_to_node("0")

# Create alternative response
result = await manager.complete(branch_name="alternative")

# Go back to root
await manager.switch_to_node("")

Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
async def switch_to_node(self, node_id: str) -> None:
    """Switch current position to a different node in the tree.

    This allows exploring different branches or backtracking in the conversation.

    Args:
        node_id: Target node ID (dot-delimited, e.g., "0.1" or "")

    Raises:
        ValueError: If node_id not found in tree

    Example:
        >>> # Go back to first user message
        >>> await manager.switch_to_node("0")
        >>>
        >>> # Create alternative response
        >>> result = await manager.complete(branch_name="alternative")
        >>>
        >>> # Go back to root
        >>> await manager.switch_to_node("")
    """
    if not self.state:
        raise ValueError("No conversation state")

    # Verify node exists
    target_node = get_node_by_id(self.state.message_tree, node_id)
    if target_node is None:
        raise ValueError(f"Node '{node_id}' not found in conversation tree")

    # Update current position
    self.state.current_node_id = node_id
    self.state.updated_at = datetime.now()

    # Persist
    await self._save_state()
branch_from async
branch_from(sibling_node_id: str) -> None

Position tree so the next message becomes a sibling of the given node.

Navigates to the parent of the specified node. The next add_message() or complete() call will create a new child of that parent, effectively branching the conversation from the same point.

Parameters:

  • sibling_node_id (str, required): ID of the node whose parent we should navigate to.

Raises:

  • ValueError: If there is no conversation state, the node is not found, or the node is the root (the root has no parent to branch from).

Example

# Branch from the same parent as node "0.0"
await manager.branch_from("0.0")

# Now at node "0"; next complete() creates "0.1"
response = await manager.complete()

Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
async def branch_from(self, sibling_node_id: str) -> None:
    """Position tree so the next message becomes a sibling of the given node.

    Navigates to the parent of the specified node. The next ``add_message()``
    or ``complete()`` call will create a new child of that parent, effectively
    branching the conversation from the same point.

    Args:
        sibling_node_id: ID of the node whose parent we should navigate to.

    Raises:
        ValueError: If no conversation state, node not found, or node is
            the root (root has no parent to branch from).

    Example:
        >>> # Branch from the same parent as node "0.0"
        >>> await manager.branch_from("0.0")
        >>> # Now at node "0"; next complete() creates "0.1"
        >>> response = await manager.complete()
    """
    if not self.state:
        raise ValueError("No conversation state")

    sibling_node = get_node_by_id(self.state.message_tree, sibling_node_id)
    if sibling_node is None:
        raise ValueError(
            f"Node '{sibling_node_id}' not found in conversation tree"
        )

    if sibling_node.parent is None:
        raise ValueError(
            f"Node '{sibling_node_id}' is the root and has no parent to "
            "branch from"
        )

    parent_id = calculate_node_id(sibling_node.parent)
    await self.switch_to_node(parent_id)
get_rag_metadata
get_rag_metadata(node_id: str | None = None) -> Dict[str, Any] | None

Get RAG metadata from a conversation node.

This method retrieves the cached RAG metadata from a specific node, which includes information about RAG searches executed during prompt rendering (queries, results, query hashes, etc.).

Parameters:

Name Type Description Default
node_id str | None

Node ID to retrieve metadata from (default: current node)

None

Returns:

Type Description
Dict[str, Any] | None

RAG metadata dictionary if present, None otherwise. Structure:

```python
{
    "PLACEHOLDER_NAME": {
        "query": "resolved RAG query",
        "query_hash": "hash of adapter+query",
        "results": [...],  # Search results
        "adapter_name": "name of RAG adapter used"
    },
    ...  # One entry per RAG placeholder
}
```

Raises:

Type Description
ValueError

If node_id not found in conversation tree

Example
# Get RAG metadata from current node
metadata = manager.get_rag_metadata()
if metadata:
    for placeholder, rag_data in metadata.items():
        print(f"Placeholder: {placeholder}")
        print(f"  Query: {rag_data['query']}")
        print(f"  Adapter: {rag_data['adapter_name']}")
        print(f"  Results: {len(rag_data['results'])} items")
        print(f"  Hash: {rag_data['query_hash']}")

# Get RAG metadata from specific node
metadata = manager.get_rag_metadata(node_id="0.1")

# Check if RAG was used for a message
if manager.get_rag_metadata():
    print("This message used RAG-enhanced prompt")
else:
    print("This message used direct content")
Note

RAG metadata is only available if cache_rag_results=True was set during ConversationManager creation. This metadata is useful for debugging RAG behavior, understanding what information was retrieved, and implementing RAG result caching across branches.

See Also

add_message: Method that executes RAG and stores metadata
reuse_rag_on_branch: Parameter enabling RAG cache reuse

Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
def get_rag_metadata(self, node_id: str | None = None) -> Dict[str, Any] | None:
    """Get RAG metadata from a conversation node.

    This method retrieves the cached RAG metadata from a specific node,
    which includes information about RAG searches executed during prompt
    rendering (queries, results, query hashes, etc.).

    Args:
        node_id: Node ID to retrieve metadata from (default: current node)

    Returns:
        RAG metadata dictionary if present, None otherwise. Structure:

        ```python
        {
            "PLACEHOLDER_NAME": {
                "query": "resolved RAG query",
                "query_hash": "hash of adapter+query",
                "results": [...],  # Search results
                "adapter_name": "name of RAG adapter used"
            },
            ...  # One entry per RAG placeholder
        }
        ```

    Raises:
        ValueError: If node_id not found in conversation tree

    Example:
        ```python
        # Get RAG metadata from current node
        metadata = manager.get_rag_metadata()
        if metadata:
            for placeholder, rag_data in metadata.items():
                print(f"Placeholder: {placeholder}")
                print(f"  Query: {rag_data['query']}")
                print(f"  Adapter: {rag_data['adapter_name']}")
                print(f"  Results: {len(rag_data['results'])} items")
                print(f"  Hash: {rag_data['query_hash']}")

        # Get RAG metadata from specific node
        metadata = manager.get_rag_metadata(node_id="0.1")

        # Check if RAG was used for a message
        if manager.get_rag_metadata():
            print("This message used RAG-enhanced prompt")
        else:
            print("This message used direct content")
        ```

    Note:
        RAG metadata is only available if `cache_rag_results=True` was
        set during ConversationManager creation. This metadata is useful
        for debugging RAG behavior, understanding what information was
        retrieved, and implementing RAG result caching across branches.

    See Also:
        add_message: Method that executes RAG and stores metadata
        reuse_rag_on_branch: Parameter enabling RAG cache reuse
    """
    if not self.state:
        return None

    # Default to current node
    if node_id is None:
        node_id = self.state.current_node_id

    # Get node
    tree_node = get_node_by_id(self.state.message_tree, node_id)
    if tree_node is None:
        raise ValueError(f"Node '{node_id}' not found in conversation tree")

    # Return RAG metadata if present
    return tree_node.data.metadata.get("rag_metadata")

Conversation Types

ConversationNode

dataknobs_llm.conversations.ConversationNode dataclass

ConversationNode(
    message: LLMMessage,
    node_id: str,
    timestamp: datetime = datetime.now(),
    prompt_name: str | None = None,
    branch_name: str | None = None,
    metadata: Dict[str, Any] = dict(),
)

Data stored in each conversation tree node.

Each node represents a single message (system, user, or assistant) in the conversation. The tree structure allows for branching conversations where multiple alternative messages can be explored.

Attributes:

Name Type Description
message LLMMessage

The LLM message (role + content)

node_id str

Dot-delimited child positions from root (e.g., "0.1.2")

timestamp datetime

When this message was created

prompt_name str | None

Optional name of prompt template used to generate this

branch_name str | None

Optional human-readable label for this branch

metadata Dict[str, Any]

Additional metadata (usage stats, model info, etc.)

Example

node = ConversationNode(
    message=LLMMessage(role="user", content="Hello"),
    node_id="0.1",
    timestamp=datetime.now(),
    prompt_name="greeting",
    branch_name="polite-variant"
)

Methods:

Name Description
from_dict

Create node from dictionary.

to_dict

Convert node to dictionary for storage.

Functions

from_dict classmethod
from_dict(data: Dict[str, Any]) -> ConversationNode

Create node from dictionary.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ConversationNode":
    """Create node from dictionary."""
    msg_data = data.get("message", {})
    return cls(
        message=LLMMessage.from_dict(msg_data),
        node_id=data["node_id"],
        timestamp=datetime.fromisoformat(data["timestamp"]),
        prompt_name=data.get("prompt_name"),
        branch_name=data.get("branch_name"),
        metadata=data.get("metadata", {})
    )
to_dict
to_dict() -> Dict[str, Any]

Convert node to dictionary for storage.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
def to_dict(self) -> Dict[str, Any]:
    """Convert node to dictionary for storage."""
    return {
        "message": self.message.to_dict(),
        "node_id": self.node_id,
        "timestamp": self.timestamp.isoformat(),
        "prompt_name": self.prompt_name,
        "branch_name": self.branch_name,
        "metadata": self.metadata
    }
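Together, `to_dict()` and `from_dict()` form a lossless round trip, with timestamps stored as ISO-8601 strings. A minimal standalone sketch of the same pattern (`MiniNode` is an illustrative stand-in, not the real `ConversationNode`):

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict


@dataclass
class MiniNode:
    """Simplified stand-in for ConversationNode (illustrative only)."""
    node_id: str
    content: str
    timestamp: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        # Timestamps are serialized as ISO-8601 strings, as in the real to_dict()
        return {
            "node_id": self.node_id,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MiniNode":
        return cls(
            node_id=data["node_id"],
            content=data["content"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            metadata=data.get("metadata", {}),
        )


node = MiniNode("0.1", "Hello", datetime(2024, 1, 1, 12, 0), {"model": "demo"})
assert MiniNode.from_dict(node.to_dict()) == node
```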

ConversationState

dataknobs_llm.conversations.ConversationState dataclass

ConversationState(
    conversation_id: str,
    message_tree: Tree,
    current_node_id: str = "",
    metadata: Dict[str, Any] = dict(),
    created_at: datetime = datetime.now(),
    updated_at: datetime = datetime.now(),
    schema_version: str = SCHEMA_VERSION,
)

State of a conversation with tree-based branching support.

This replaces the linear message history with a tree structure that supports multiple branches (alternative conversation paths).

Attributes:

Name Type Description
conversation_id str

Unique conversation identifier

message_tree Tree

Root of conversation tree (Tree[ConversationNode])

current_node_id str

ID of current position in tree (dot-delimited)

metadata Dict[str, Any]

Additional conversation metadata

created_at datetime

Conversation creation timestamp

updated_at datetime

Last update timestamp

schema_version str

Version of the storage schema used

Example

Create conversation with system message

root_node = ConversationNode(
    message=LLMMessage(role="system", content="You are helpful"),
    node_id=""
)
tree = Tree(root_node)
state = ConversationState(
    conversation_id="conv-123",
    message_tree=tree,
    current_node_id="",
    metadata={"user_id": "alice"}
)

Add user message

user_node = ConversationNode(
    message=LLMMessage(role="user", content="Hello"),
    node_id="0"
)
tree.add_child(Tree(user_node))
state.current_node_id = "0"

Methods:

Name Description
__post_init__

Initialize transient runtime state.

from_dict

Create state from dictionary.

get_all_nodes

Get all ConversationNode objects across all branches.

get_current_messages

Get messages from root to current position (for LLM).

get_current_node

Get the current tree node.

get_current_nodes

Get ConversationNode objects from root to current position.

to_dict

Convert state to dictionary for storage.

to_export_dict

Export the full conversation as a flat message list.

Functions

__post_init__
__post_init__() -> None

Initialize transient runtime state.

turn_data is per-turn cross-middleware communication state. Populated by the bot layer before each LLM call and cleared after the turn completes. Stored as a plain attribute (not a dataclass field) so it is invisible to dataclasses.asdict() and dataclasses.fields() — only the custom to_dict() and from_dict() matter for serialization, and they deliberately exclude it.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
def __post_init__(self) -> None:
    """Initialize transient runtime state.

    ``turn_data`` is per-turn cross-middleware communication state.
    Populated by the bot layer before each LLM call and cleared
    after the turn completes.  Stored as a plain attribute (not a
    dataclass field) so it is invisible to ``dataclasses.asdict()``
    and ``dataclasses.fields()`` — only the custom ``to_dict()``
    and ``from_dict()`` matter for serialization, and they
    deliberately exclude it.
    """
    self.turn_data: Dict[str, Any] = {}
from_dict classmethod
from_dict(data: Dict[str, Any]) -> ConversationState

Create state from dictionary.

Reconstructs the tree from nodes and edges. Handles schema version migration if needed.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ConversationState":
    """Create state from dictionary.

    Reconstructs the tree from nodes and edges.
    Handles schema version migration if needed.
    """
    # Check schema version
    stored_version = data.get("schema_version", "0.0.0")  # Default to 0.0.0 if missing

    # Apply migrations if needed
    if stored_version != SCHEMA_VERSION:
        logger.info(
            f"Migrating conversation {data['conversation_id']} "
            f"from schema {stored_version} to {SCHEMA_VERSION}"
        )
        data = cls._migrate_schema(data, stored_version, SCHEMA_VERSION)

    # Create nodes indexed by ID
    nodes_by_id: Dict[str, ConversationNode] = {}
    for node_data in data["nodes"]:
        node = ConversationNode.from_dict(node_data)
        nodes_by_id[node.node_id] = node

    # Find root (node with empty ID)
    root_node = nodes_by_id.get("")
    if root_node is None:
        # Try to find node with no parent in edges
        child_ids = {edge[1] for edge in data["edges"]}
        parent_ids = {edge[0] for edge in data["edges"]}
        root_ids = parent_ids - child_ids
        if root_ids:
            root_node = nodes_by_id[root_ids.pop()]
        else:
            # Fallback: first node
            root_node = next(iter(nodes_by_id.values()))

    tree = Tree(root_node)
    tree_nodes_by_id = {"": tree}  # Map node_id -> Tree node

    # Build tree by adding edges
    for parent_id, child_id in data["edges"]:
        if parent_id in tree_nodes_by_id:
            parent_tree_node = tree_nodes_by_id[parent_id]
            child_node = nodes_by_id[child_id]
            child_tree_node = parent_tree_node.add_child(Tree(child_node))
            tree_nodes_by_id[child_id] = child_tree_node

    return cls(
        conversation_id=data["conversation_id"],
        message_tree=tree,
        current_node_id=data["current_node_id"],
        metadata=data["metadata"],
        created_at=datetime.fromisoformat(data["created_at"]),
        updated_at=datetime.fromisoformat(data["updated_at"]),
        schema_version=SCHEMA_VERSION  # Always use current version after migration
    )
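The reconstruction above works because `to_dict()` emits edges in BFS order, so each parent is always placed in the tree before its children. A standalone sketch of the same rebuild, using plain dicts as stand-ins for `Tree` nodes:

```python
# Illustrative stand-in: rebuild a parent/children structure from a flat
# nodes list and (parent_id, child_id) edges, as from_dict() does.
nodes = [
    {"node_id": "", "content": "system prompt"},
    {"node_id": "0", "content": "user turn"},
    {"node_id": "0.0", "content": "assistant turn"},
    {"node_id": "0.1", "content": "alternative assistant turn"},
]
edges = [("", "0"), ("0", "0.0"), ("0", "0.1")]

# Index nodes by ID, giving each an empty child list
nodes_by_id = {n["node_id"]: dict(n, children=[]) for n in nodes}
root = nodes_by_id[""]  # the root has the empty node ID

# Edges arrive parent-before-child, so each lookup succeeds
for parent_id, child_id in edges:
    nodes_by_id[parent_id]["children"].append(nodes_by_id[child_id])

# Node "0" has two children: a branch point in the conversation tree
assert [c["node_id"] for c in root["children"]] == ["0"]
assert [c["node_id"] for c in nodes_by_id["0"]["children"]] == ["0.0", "0.1"]
```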
get_all_nodes
get_all_nodes() -> List[ConversationNode]

Get all ConversationNode objects across all branches.

Unlike get_current_nodes() (which returns only the active root-to-leaf path), this returns every node in the tree sorted by timestamp. Use this for full interaction logging, auditing, or conversation export where abandoned branches (e.g. collection-mode iterations) must be visible.

Returns:

Type Description
List[ConversationNode]

All ConversationNode objects in chronological order.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
def get_all_nodes(self) -> List["ConversationNode"]:
    """Get all ConversationNode objects across all branches.

    Unlike :meth:`get_current_nodes` (which returns only the active
    root-to-leaf path), this returns **every** node in the tree sorted
    by timestamp.  Use this for full interaction logging, auditing, or
    conversation export where abandoned branches (e.g. collection-mode
    iterations) must be visible.

    Returns:
        All ``ConversationNode`` objects in chronological order.
    """
    all_tree_nodes = self.message_tree.find_nodes(
        lambda n: True, traversal="bfs",  # noqa: ARG005
    )
    nodes = [
        tn.data
        for tn in all_tree_nodes
        if isinstance(tn.data, ConversationNode)
    ]
    nodes.sort(key=lambda n: n.timestamp)
    return nodes
get_current_messages
get_current_messages() -> List[LLMMessage]

Get messages from root to current position (for LLM).

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
def get_current_messages(self) -> List[LLMMessage]:
    """Get messages from root to current position (for LLM)."""
    return get_messages_for_llm(self.message_tree, self.current_node_id)
get_current_node
get_current_node() -> Tree | None

Get the current tree node.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
def get_current_node(self) -> Tree | None:
    """Get the current tree node."""
    return get_node_by_id(self.message_tree, self.current_node_id)
get_current_nodes
get_current_nodes() -> List[ConversationNode]

Get ConversationNode objects from root to current position.

Like get_current_messages(), but returns full ConversationNode objects including timestamps, node_ids, and metadata.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
def get_current_nodes(self) -> List["ConversationNode"]:
    """Get ConversationNode objects from root to current position.

    Like get_current_messages(), but returns full ConversationNode objects
    including timestamps, node_ids, and metadata.
    """
    return get_nodes_for_path(self.message_tree, self.current_node_id)
to_dict
to_dict() -> Dict[str, Any]

Convert state to dictionary for storage.

The tree is serialized as a list of edges (parent_id, child_id, node_data). Includes schema_version for backward compatibility.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
def to_dict(self) -> Dict[str, Any]:
    """Convert state to dictionary for storage.

    The tree is serialized as a list of edges (parent_id, child_id, node_data).
    Includes schema_version for backward compatibility.
    """
    # Collect all nodes and their data
    nodes = []
    edges = []

    all_nodes = self.message_tree.find_nodes(lambda n: True, traversal="bfs")  # noqa: ARG005
    for tree_node in all_nodes:
        if isinstance(tree_node.data, ConversationNode):
            nodes.append(tree_node.data.to_dict())

            # Add edge to parent (if not root)
            if tree_node.parent is not None:
                parent_id = calculate_node_id(tree_node.parent)
                child_id = tree_node.data.node_id
                edges.append([parent_id, child_id])

    return {
        "schema_version": self.schema_version,
        "conversation_id": self.conversation_id,
        "nodes": nodes,
        "edges": edges,
        "current_node_id": self.current_node_id,
        "metadata": self.metadata,
        "created_at": self.created_at.isoformat(),
        "updated_at": self.updated_at.isoformat()
    }
to_export_dict
to_export_dict() -> Dict[str, Any]

Export the full conversation as a flat message list.

Returns the conversation in a format suitable for logging, auditing, and API responses. All branches are included (not just the active path), so collection-mode iterations and other abandoned branches are visible.

Returns:

Type Description
Dict[str, Any]

Dictionary with conversation_id, metadata, messages (list of serialised nodes in chronological order), and message_count.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
def to_export_dict(self) -> Dict[str, Any]:
    """Export the full conversation as a flat message list.

    Returns the conversation in a format suitable for logging, auditing,
    and API responses.  All branches are included (not just the active
    path), so collection-mode iterations and other abandoned branches
    are visible.

    Returns:
        Dictionary with ``conversation_id``, ``metadata``,
        ``messages`` (list of serialised nodes in chronological
        order), and ``message_count``.
    """
    messages = [node.to_dict() for node in self.get_all_nodes()]
    return {
        "conversation_id": self.conversation_id,
        "metadata": self.metadata,
        "messages": messages,
        "message_count": len(messages),
    }
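The export flattens every branch into one timestamp-ordered list, so siblings from abandoned branches appear interleaved with the active path. A minimal sketch of that flattening, using plain dicts in place of real nodes:

```python
from datetime import datetime

# Illustrative stand-in for to_export_dict(): flatten all branches into
# one chronologically sorted message list.
nodes = [
    {"node_id": "0.1", "timestamp": datetime(2024, 1, 1, 12, 2)},
    {"node_id": "0",   "timestamp": datetime(2024, 1, 1, 12, 0)},
    {"node_id": "0.0", "timestamp": datetime(2024, 1, 1, 12, 1)},  # abandoned branch
]
messages = sorted(nodes, key=lambda n: n["timestamp"])
export = {
    "conversation_id": "conv-123",
    "messages": messages,
    "message_count": len(messages),
}

# The abandoned branch "0.0" is still visible in the export
assert [m["node_id"] for m in export["messages"]] == ["0", "0.0", "0.1"]
```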

LLMMessage

dataknobs_llm.llm.LLMMessage dataclass

LLMMessage(
    role: str,
    content: str,
    name: str | None = None,
    tool_call_id: str | None = None,
    function_call: Dict[str, Any] | None = None,
    tool_calls: list[ToolCall] | None = None,
    metadata: Dict[str, Any] = dict(),
)

Represents a message in LLM conversation.

Standard message format used across all providers. Messages are the fundamental unit of LLM interactions, containing role-based content for multi-turn conversations.

Attributes:

Name Type Description
role str

Message role - 'system', 'user', 'assistant', 'tool', or 'function'

content str

Message content text

name str | None

Optional name for function/tool messages or multi-user scenarios

tool_call_id str | None

Provider-assigned ID for pairing tool results with invocations (required by OpenAI and Anthropic APIs)

function_call Dict[str, Any] | None

Function call data for tool-using models

tool_calls list[ToolCall] | None

Tool calls made by the assistant in this message

metadata Dict[str, Any]

Additional metadata (timestamps, IDs, etc.)

Example
from dataknobs_llm.llm.base import LLMMessage

# System message
system_msg = LLMMessage(
    role="system",
    content="You are a helpful coding assistant."
)

# User message
user_msg = LLMMessage(
    role="user",
    content="How do I reverse a list in Python?"
)

# Assistant message
assistant_msg = LLMMessage(
    role="assistant",
    content="Use the reverse() method or [::-1] slicing."
)

# Function result message
function_msg = LLMMessage(
    role="function",
    name="search_docs",
    content='{"result": "Found 3 examples"}'
)

# Build conversation
messages = [system_msg, user_msg, assistant_msg]

Methods:

Name Description
from_dict

Create an LLMMessage from a dictionary.

to_dict

Convert to canonical dictionary format for storage/interchange.

Functions

from_dict classmethod
from_dict(data: Dict[str, Any]) -> LLMMessage

Create an LLMMessage from a dictionary.

Handles both the new canonical format (with tool_calls as list of dicts) and the legacy format (without tool_calls/function_call).

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary with role (required), content, and optional name, tool_calls, function_call, metadata.

required

Returns:

Type Description
LLMMessage

LLMMessage instance.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "LLMMessage":
    """Create an LLMMessage from a dictionary.

    Handles both the new canonical format (with ``tool_calls`` as list of
    dicts) and the legacy format (without ``tool_calls``/``function_call``).

    Args:
        data: Dictionary with ``role`` (required), ``content``, and
            optional ``name``, ``tool_calls``, ``function_call``, ``metadata``.

    Returns:
        LLMMessage instance.
    """
    tool_calls = None
    if data.get("tool_calls"):
        tool_calls = [ToolCall.from_dict(tc) for tc in data["tool_calls"]]
    return cls(
        role=data["role"],
        content=data.get("content", ""),
        name=data.get("name"),
        tool_call_id=data.get("tool_call_id"),
        tool_calls=tool_calls,
        function_call=data.get("function_call"),
        metadata=data.get("metadata", {}),
    )
to_dict
to_dict() -> Dict[str, Any]

Convert to canonical dictionary format for storage/interchange.

Only includes non-None/non-empty optional fields to keep output clean.

Returns:

Type Description
Dict[str, Any]

Dictionary with role, content, and any present optional fields.

Source code in packages/llm/src/dataknobs_llm/llm/base.py
def to_dict(self) -> Dict[str, Any]:
    """Convert to canonical dictionary format for storage/interchange.

    Only includes non-None/non-empty optional fields to keep output clean.

    Returns:
        Dictionary with ``role``, ``content``, and any present optional fields.
    """
    d: Dict[str, Any] = {
        "role": self.role,
        "content": self.content,
    }
    if self.name is not None:
        d["name"] = self.name
    if self.tool_call_id is not None:
        d["tool_call_id"] = self.tool_call_id
    if self.tool_calls:
        d["tool_calls"] = [tc.to_dict() for tc in self.tool_calls]
    if self.function_call is not None:
        d["function_call"] = self.function_call
    if self.metadata:
        d["metadata"] = self.metadata
    return d
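The key design choice in `to_dict()` is omitting optional fields that are `None` or empty, which keeps stored messages compact and makes legacy dicts (without `tool_calls`/`function_call`) round-trip cleanly through `from_dict()`. A small standalone sketch of the omit-empty pattern (not the real `LLMMessage` API):

```python
from typing import Any, Dict, Optional


def message_to_dict(
    role: str,
    content: str,
    name: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Illustrative stand-in for LLMMessage.to_dict(): emit only the
    optional fields that actually carry a value."""
    d: Dict[str, Any] = {"role": role, "content": content}
    if name is not None:
        d["name"] = name
    if metadata:  # empty dicts are omitted too
        d["metadata"] = metadata
    return d


assert message_to_dict("user", "Hi") == {"role": "user", "content": "Hi"}
assert "metadata" not in message_to_dict("user", "Hi", metadata={})
```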

Storage

Abstract Storage

dataknobs_llm.conversations.ConversationStorage

Bases: ABC

Abstract storage interface for conversations.

This interface defines the contract for persisting conversation state. Implementations can use any backend (SQL, NoSQL, file, etc.).

Lifecycle

Implementations must provide a create() classmethod for configuration-driven instantiation, and may override close() to release resources such as connections or pools.

Example::

class MyStorage(ConversationStorage):
    @classmethod
    async def create(cls, config: dict[str, Any]) -> "MyStorage":
        instance = cls(...)
        return instance

    async def close(self) -> None:
        await self._pool.close()

Methods:

Name Description
save_conversation

Save conversation state (upsert).

load_conversation

Load conversation state.

delete_conversation

Delete conversation.

list_conversations

List conversations with optional filtering and sorting.

count_conversations

Return the number of conversations matching the filter.

Functions

save_conversation abstractmethod async
save_conversation(state: ConversationState) -> None

Save conversation state (upsert).

Parameters:

Name Type Description Default
state ConversationState

Conversation state to save

required
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
@abstractmethod
async def save_conversation(self, state: ConversationState) -> None:
    """Save conversation state (upsert).

    Args:
        state: Conversation state to save
    """
    pass
load_conversation abstractmethod async
load_conversation(conversation_id: str) -> ConversationState | None

Load conversation state.

Parameters:

Name Type Description Default
conversation_id str

Conversation identifier

required

Returns:

Type Description
ConversationState | None

Conversation state or None if not found

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
@abstractmethod
async def load_conversation(
    self,
    conversation_id: str
) -> ConversationState | None:
    """Load conversation state.

    Args:
        conversation_id: Conversation identifier

    Returns:
        Conversation state or None if not found
    """
    pass
delete_conversation abstractmethod async
delete_conversation(conversation_id: str) -> bool

Delete conversation.

Parameters:

Name Type Description Default
conversation_id str

Conversation identifier

required

Returns:

Type Description
bool

True if deleted, False if not found

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
@abstractmethod
async def delete_conversation(self, conversation_id: str) -> bool:
    """Delete conversation.

    Args:
        conversation_id: Conversation identifier

    Returns:
        True if deleted, False if not found
    """
    pass
list_conversations abstractmethod async
list_conversations(
    filter_metadata: Dict[str, Any] | None = None,
    limit: int = 100,
    offset: int = 0,
    sort_by: str | None = None,
    sort_order: str = "desc",
) -> List[ConversationState]

List conversations with optional filtering and sorting.

Parameters:

Name Type Description Default
filter_metadata Dict[str, Any] | None

Optional metadata filters

None
limit int

Maximum number of results

100
offset int

Offset for pagination

0
sort_by str | None

Field name to sort by (e.g. "updated_at", "created_at")

None
sort_order str

Sort direction, "asc" or "desc" (default: "desc")

'desc'

Returns:

Type Description
List[ConversationState]

List of conversation states

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
@abstractmethod
async def list_conversations(
    self,
    filter_metadata: Dict[str, Any] | None = None,
    limit: int = 100,
    offset: int = 0,
    sort_by: str | None = None,
    sort_order: str = "desc",
) -> List[ConversationState]:
    """List conversations with optional filtering and sorting.

    Args:
        filter_metadata: Optional metadata filters
        limit: Maximum number of results
        offset: Offset for pagination
        sort_by: Field name to sort by (e.g. "updated_at", "created_at")
        sort_order: Sort direction, "asc" or "desc" (default: "desc")

    Returns:
        List of conversation states
    """
    pass
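The contract implied by these parameters is sort-then-slice: implementations order the full result set by `sort_by`/`sort_order`, then apply `offset` and `limit` for pagination. A minimal in-memory sketch of those semantics (illustrative only, not a real storage backend):

```python
# Illustrative stand-in for list_conversations() pagination semantics.
convs = [{"id": f"c{i}", "updated_at": i} for i in range(5)]


def list_page(items, limit=100, offset=0, sort_by="updated_at", sort_order="desc"):
    # Sort the full set first, then slice the requested page
    ordered = sorted(items, key=lambda c: c[sort_by], reverse=(sort_order == "desc"))
    return ordered[offset:offset + limit]


# Most recently updated first; offset advances through stable pages
page1 = list_page(convs, limit=2, offset=0)
page2 = list_page(convs, limit=2, offset=2)
assert [c["id"] for c in page1] == ["c4", "c3"]
assert [c["id"] for c in page2] == ["c2", "c1"]
```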
count_conversations async
count_conversations(filter_metadata: Dict[str, Any] | None = None) -> int

Return the number of conversations matching the filter.

Default implementation fetches all matching conversations and counts them. Subclasses should override with an efficient backend query.

Parameters:

Name Type Description Default
filter_metadata Dict[str, Any] | None

Optional metadata filters (exact match).

None

Returns:

Type Description
int

Total number of matching conversations.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
async def count_conversations(
    self,
    filter_metadata: Dict[str, Any] | None = None,
) -> int:
    """Return the number of conversations matching the filter.

    Default implementation fetches all matching conversations and counts
    them.  Subclasses should override with an efficient backend query.

    Args:
        filter_metadata: Optional metadata filters (exact match).

    Returns:
        Total number of matching conversations.
    """
    results = await self.list_conversations(
        filter_metadata=filter_metadata,
        limit=100_000,
    )
    return len(results)

Implementations

DataknobsConversationStorage

dataknobs_llm.conversations.DataknobsConversationStorage

DataknobsConversationStorage(backend: Any)

Bases: ConversationStorage

Conversation storage using dataknobs_data backends.

Stores conversations as Records with the tree serialized as nodes + edges. Works with any dataknobs backend (Memory, File, S3, Postgres, etc.).

The storage layer handles:

- Automatic serialization/deserialization of conversation trees
- Schema version migration when loading old conversations
- Metadata-based filtering for listing conversations
- Upsert operations (insert or update)

Attributes:

Name Type Description
backend

Dataknobs async database backend instance

Example
from dataknobs_data import database_factory
from dataknobs_llm.conversations import DataknobsConversationStorage

# Memory backend (development/testing)
db = database_factory.create(backend="memory")
storage = DataknobsConversationStorage(db)

# File backend (local persistence)
db = database_factory.create(
    backend="file",
    file_path="./conversations.jsonl"
)
storage = DataknobsConversationStorage(db)

# S3 backend (cloud storage)
db = database_factory.create(
    backend="s3",
    bucket="my-conversations",
    region="us-west-2"
)
storage = DataknobsConversationStorage(db)

# Postgres backend (production)
db = database_factory.create(
    backend="postgres",
    host="db.example.com",
    database="conversations",
    user="app",
    password="secret"
)
storage = DataknobsConversationStorage(db)

# Save conversation
await storage.save_conversation(state)

# Load conversation
state = await storage.load_conversation("conv-123")

# List user's conversations
user_convos = await storage.list_conversations(
    filter_metadata={"user_id": "alice"},
    limit=50
)

# Delete conversation
deleted = await storage.delete_conversation("conv-123")
Note

Backend Selection:

  • Memory: Fast, no persistence. Use for testing or ephemeral conversations.
  • File: Simple local persistence. Good for single-server deployments.
  • S3: Scalable cloud storage. Best for serverless or distributed systems.
  • Postgres: Full ACID guarantees. Best for production multi-server setups.

All backends support the same API, so you can switch between them by changing the database_factory configuration.

See Also

ConversationStorage: Abstract interface
ConversationState: State structure being stored
dataknobs_data.database_factory: Backend creation utilities

Initialize storage with dataknobs backend.

Parameters:

Name Type Description Default
backend Any

Dataknobs async database backend (AsyncMemoryDatabase, etc.)

required

Methods:

Name Description
close

Close the underlying database backend.

count_conversations

Return the number of conversations matching the filter.

create

Create storage from configuration dict.

delete_conversation

Delete conversation from backend.

delete_conversations

Delete conversations matching the given filters.

list_conversations

List conversations from backend.

load_conversation

Load conversation from backend.

save_conversation

Save conversation to backend.

search_conversations

Search conversations with content, time range, and metadata filters.

update_metadata

Update conversation metadata.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
def __init__(self, backend: Any):
    """Initialize storage with dataknobs backend.

    Args:
        backend: Dataknobs async database backend (AsyncMemoryDatabase, etc.)
    """
    self.backend = backend
Functions
close async
close() -> None

Close the underlying database backend.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
async def close(self) -> None:
    """Close the underlying database backend."""
    if self.backend and hasattr(self.backend, "close"):
        await self.backend.close()
count_conversations async
count_conversations(filter_metadata: Dict[str, Any] | None = None) -> int

Return the number of conversations matching the filter.

Uses a backend query without deserializing full conversation states, which is more efficient than the default implementation.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
async def count_conversations(
    self,
    filter_metadata: Dict[str, Any] | None = None,
) -> int:
    """Return the number of conversations matching the filter.

    Uses a backend query without deserializing full conversation states,
    which is more efficient than the default implementation.
    """
    try:
        try:
            from dataknobs_data.query import Query
        except ImportError:
            raise StorageError(
                "dataknobs_data package not available. "
                "Install it to use DataknobsConversationStorage."
            ) from None

        query = Query()
        if filter_metadata:
            for key, value in filter_metadata.items():
                query.filter(f"metadata.{key}", "=", value)

        return await self.backend.count(query)

    except StorageError:
        raise
    except Exception as e:
        raise StorageError(f"Failed to count conversations: {e}") from e
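
The dotted filter keys (`metadata.user_id`) address nested fields in the stored record. A self-contained sketch of what such a filter matches; the real matching is performed by the backend's Query engine, not this helper:

```python
# Illustrative dotted-path matcher, mimicking a filter like
# query.filter("metadata.user_id", "=", "alice").
def matches(record: dict, dotted_key: str, value) -> bool:
    node = record
    for part in dotted_key.split("."):
        if not isinstance(node, dict) or part not in node:
            return False
        node = node[part]
    return node == value

records = [
    {"metadata": {"user_id": "alice", "topic": "api"}},
    {"metadata": {"user_id": "bob"}},
]
count = sum(matches(r, "metadata.user_id", "alice") for r in records)
print(count)  # → 1
```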
create async classmethod
create(config: dict[str, Any]) -> DataknobsConversationStorage

Create storage from configuration dict.

Creates the database backend via AsyncDatabaseFactory, connects it, and returns an initialized storage instance.

Parameters:

Name Type Description Default
config dict[str, Any]

Database backend configuration. Must include a backend key (e.g. "memory", "sqlite", "postgres"). Additional keys are passed through to the factory (e.g. path, host).

required

Returns:

Type Description
DataknobsConversationStorage

Connected DataknobsConversationStorage instance.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
@classmethod
async def create(cls, config: dict[str, Any]) -> "DataknobsConversationStorage":
    """Create storage from configuration dict.

    Creates the database backend via ``AsyncDatabaseFactory``, connects
    it, and returns an initialized storage instance.

    Args:
        config: Database backend configuration.  Must include a
            ``backend`` key (e.g. ``"memory"``, ``"sqlite"``,
            ``"postgres"``).  Additional keys are passed through to
            the factory (e.g. ``path``, ``host``).

    Returns:
        Connected ``DataknobsConversationStorage`` instance.
    """
    from dataknobs_data.factory import AsyncDatabaseFactory

    db_factory = AsyncDatabaseFactory()
    backend = db_factory.create(**config)
    await backend.connect()
    return cls(backend)
delete_conversation async
delete_conversation(conversation_id: str) -> bool

Delete conversation from backend.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
async def delete_conversation(self, conversation_id: str) -> bool:
    """Delete conversation from backend."""
    try:
        return await self.backend.delete(conversation_id)
    except Exception as e:
        raise StorageError(f"Failed to delete conversation: {e}") from e
delete_conversations async
delete_conversations(
    content_contains: str | None = None,
    created_after: datetime | None = None,
    created_before: datetime | None = None,
    filter_metadata: Dict[str, Any] | None = None,
) -> List[str]

Delete conversations matching the given filters.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
async def delete_conversations(
    self,
    content_contains: str | None = None,
    created_after: datetime | None = None,
    created_before: datetime | None = None,
    filter_metadata: Dict[str, Any] | None = None,
) -> List[str]:
    """Delete conversations matching the given filters."""
    if not any([content_contains, created_after, created_before, filter_metadata]):
        raise ValueError(
            "At least one filter parameter must be provided "
            "to prevent accidental deletion of all conversations."
        )

    try:
        try:
            from dataknobs_data.query import Query
        except ImportError:
            raise StorageError(
                "dataknobs_data package not available. "
                "Install it to use DataknobsConversationStorage."
            ) from None

        # Build query with time range and metadata filters (no limit).
        query = Query()

        if created_after:
            query.filter("created_at", ">=", created_after.isoformat())
        if created_before:
            query.filter("created_at", "<=", created_before.isoformat())

        if filter_metadata:
            for key, value in filter_metadata.items():
                query.filter(f"metadata.{key}", "=", value)

        results = await self.backend.search(query)
        states = [self._record_to_state(record) for record in results]

        # Post-query content filtering
        if content_contains:
            needle = content_contains.lower()
            states = [
                s for s in states
                if self._conversation_contains_text(s, needle)
            ]

        # Delete each matching conversation
        deleted_ids: List[str] = []
        for state in states:
            await self.backend.delete(state.conversation_id)
            deleted_ids.append(state.conversation_id)

        return deleted_ids

    except ValueError:
        raise
    except StorageError:
        raise
    except Exception as e:
        raise StorageError(f"Failed to delete conversations: {e}") from e
list_conversations async
list_conversations(
    filter_metadata: Dict[str, Any] | None = None,
    limit: int = 100,
    offset: int = 0,
    sort_by: str | None = None,
    sort_order: str = "desc",
) -> List[ConversationState]

List conversations from backend.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
async def list_conversations(
    self,
    filter_metadata: Dict[str, Any] | None = None,
    limit: int = 100,
    offset: int = 0,
    sort_by: str | None = None,
    sort_order: str = "desc",
) -> List[ConversationState]:
    """List conversations from backend."""
    try:
        # Import Query for filtering
        try:
            from dataknobs_data.query import Query
        except ImportError:
            raise StorageError(
                "dataknobs_data package not available. "
                "Install it to use DataknobsConversationStorage."
            ) from None

        # Build query with metadata filters using fluent interface
        query = Query()
        query.limit(limit).offset(offset)

        if filter_metadata:
            for key, value in filter_metadata.items():
                # Add filter for metadata.key = value
                query.filter(f"metadata.{key}", "=", value)

        if sort_by:
            query.sort_by(sort_by, sort_order)

        # Search with query
        results = await self.backend.search(query)

        # Convert records to conversation states
        return [self._record_to_state(record) for record in results]

    except Exception as e:
        raise StorageError(f"Failed to list conversations: {e}") from e
load_conversation async
load_conversation(conversation_id: str) -> ConversationState | None

Load conversation from backend.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
async def load_conversation(
    self,
    conversation_id: str
) -> ConversationState | None:
    """Load conversation from backend."""
    try:
        # Read record by ID
        record = await self.backend.read(conversation_id)
        if record is None:
            return None

        return self._record_to_state(record)

    except Exception as e:
        raise StorageError(f"Failed to load conversation: {e}") from e
save_conversation async
save_conversation(state: ConversationState) -> None

Save conversation to backend.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
async def save_conversation(self, state: ConversationState) -> None:
    """Save conversation to backend."""
    try:
        record = self._state_to_record(state)
        # Use upsert to insert or update
        await self.backend.upsert(state.conversation_id, record)
    except Exception as e:
        raise StorageError(f"Failed to save conversation: {e}") from e
search_conversations async
search_conversations(
    content_contains: str | None = None,
    created_after: datetime | None = None,
    created_before: datetime | None = None,
    filter_metadata: Dict[str, Any] | None = None,
    limit: int = 100,
    offset: int = 0,
    sort_by: str = "updated_at",
    sort_order: str = "desc",
) -> List[ConversationState]

Search conversations with content, time range, and metadata filters.

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
async def search_conversations(
    self,
    content_contains: str | None = None,
    created_after: datetime | None = None,
    created_before: datetime | None = None,
    filter_metadata: Dict[str, Any] | None = None,
    limit: int = 100,
    offset: int = 0,
    sort_by: str = "updated_at",
    sort_order: str = "desc",
) -> List[ConversationState]:
    """Search conversations with content, time range, and metadata filters."""
    try:
        try:
            from dataknobs_data.query import Query
        except ImportError:
            raise StorageError(
                "dataknobs_data package not available. "
                "Install it to use DataknobsConversationStorage."
            ) from None

        # Build query with time range and metadata filters
        # Fetch a larger set when content filtering will be applied
        # post-query, since content filtering reduces the result count.
        fetch_limit = limit + offset if content_contains is None else 0
        query = Query()
        if fetch_limit:
            query.limit(fetch_limit)

        if created_after:
            query.filter("created_at", ">=", created_after.isoformat())
        if created_before:
            query.filter("created_at", "<=", created_before.isoformat())

        if filter_metadata:
            for key, value in filter_metadata.items():
                query.filter(f"metadata.{key}", "=", value)

        query.sort_by(sort_by, sort_order)

        results = await self.backend.search(query)
        states = [self._record_to_state(record) for record in results]

        # Post-query content filtering: walk each conversation's message
        # tree and check if any message content matches.
        if content_contains:
            needle = content_contains.lower()
            filtered: List[ConversationState] = []
            for state in states:
                if self._conversation_contains_text(state, needle):
                    filtered.append(state)
            states = filtered

        # Apply offset/limit after content filtering
        if content_contains:
            states = states[offset:offset + limit]

        return states

    except StorageError:
        raise
    except Exception as e:
        raise StorageError(f"Failed to search conversations: {e}") from e
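
Note the ordering in the source above: the content filter is case-insensitive and runs after the backend query, and `offset`/`limit` are applied only after that filter. A simplified sketch of just that paging behavior (record shape reduced to plain strings):

```python
# Sketch of post-query content filtering followed by offset/limit paging,
# as search_conversations does when content_contains is given.
def filter_page(texts, needle, offset=0, limit=100):
    needle = needle.lower()
    hits = [t for t in texts if needle in t.lower()]
    return hits[offset:offset + limit]

texts = ["Hello World", "goodbye", "world peace"]
page = filter_page(texts, "WORLD", offset=0, limit=10)
print(page)  # → ['Hello World', 'world peace']
```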
update_metadata async
update_metadata(conversation_id: str, metadata: Dict[str, Any]) -> None

Update conversation metadata.

Loads the conversation, updates its metadata, and saves it back.

Parameters:

Name Type Description Default
conversation_id str

ID of conversation to update

required
metadata Dict[str, Any]

New metadata dict (replaces existing metadata)

required

Raises:

Type Description
StorageError

If conversation not found or update fails

Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
async def update_metadata(
    self,
    conversation_id: str,
    metadata: Dict[str, Any]
) -> None:
    """Update conversation metadata.

    Loads the conversation, updates its metadata, and saves it back.

    Args:
        conversation_id: ID of conversation to update
        metadata: New metadata dict (replaces existing metadata)

    Raises:
        StorageError: If conversation not found or update fails
    """
    try:
        # Load existing conversation
        state = await self.load_conversation(conversation_id)
        if state is None:
            raise StorageError(f"Conversation not found: {conversation_id}")

        # Update metadata
        state.metadata = metadata

        # Save back
        await self.save_conversation(state)

    except StorageError:
        raise
    except Exception as e:
        raise StorageError(f"Failed to update metadata: {e}") from e
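
Because `update_metadata` replaces the metadata dict wholesale, preserving existing keys requires merging before the call. A plain-dict sketch of the difference:

```python
# Replace vs. merge: update_metadata stores exactly the dict you pass,
# so merge yourself first if you need to keep other keys.
existing = {"user_id": "alice", "topic": "api"}
updates = {"topic": "graphql"}

replaced = updates                   # what update_metadata would store
merged = {**existing, **updates}     # read-modify-write merge

print(replaced)  # → {'topic': 'graphql'}
print(merged)    # → {'user_id': 'alice', 'topic': 'graphql'}
```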

Middleware

Abstract Middleware

dataknobs_llm.conversations.ConversationMiddleware

Bases: ABC

Base class for conversation middleware.

Middleware can process requests before they are sent to the LLM and responses after the LLM returns. Middleware is executed in order for requests, and in reverse order for responses (the onion pattern).

Execution Order

Given middleware list [MW0, MW1, MW2]:

  • Request: MW0 → MW1 → MW2 → LLM
  • Response: LLM → MW2 → MW1 → MW0

This allows MW0 to:

  1. Start a timer in process_request()
  2. See the LLM call complete
  3. Stop the timer in process_response() and log the total time
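
The onion ordering can be demonstrated without any LLM at all. A self-contained sketch (class and function names are illustrative, not part of the library):

```python
import asyncio

# Requests traverse the middleware list in order, responses in reverse.
class TagMiddleware:
    def __init__(self, name, log):
        self.name, self.log = name, log

    async def process_request(self, messages, state):
        self.log.append(f"req:{self.name}")
        return messages

    async def process_response(self, response, state):
        self.log.append(f"resp:{self.name}")
        return response

async def run_pipeline(middleware):
    messages, state, response = [], {}, {}
    for mw in middleware:              # MW0 -> MW1 -> MW2
        messages = await mw.process_request(messages, state)
    # ... the LLM call would happen here ...
    for mw in reversed(middleware):    # MW2 -> MW1 -> MW0
        response = await mw.process_response(response, state)

log = []
asyncio.run(run_pipeline([TagMiddleware(n, log) for n in "012"]))
print(log)
# → ['req:0', 'req:1', 'req:2', 'resp:2', 'resp:1', 'resp:0']
```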

Use Cases
  • Logging: Track request/response details
  • Validation: Verify request/response content
  • Transformation: Modify messages or responses
  • Rate Limiting: Enforce API usage limits
  • Caching: Store/retrieve responses
  • Monitoring: Collect metrics and analytics
  • Security: Filter sensitive information
Example
from dataknobs_llm.conversations import ConversationMiddleware
import time

class TimingMiddleware(ConversationMiddleware):
    '''Measure LLM call duration.'''

    async def process_request(self, messages, state):
        # Store start time in state metadata
        state.metadata["request_start"] = time.time()
        return messages

    async def process_response(self, response, state):
        # Calculate elapsed time
        start = state.metadata.get("request_start")
        if start:
            elapsed = time.time() - start
            if not response.metadata:
                response.metadata = {}
            response.metadata["llm_duration_seconds"] = elapsed
            print(f"LLM call took {elapsed:.2f}s")
        return response

# Use in conversation
manager = await ConversationManager.create(
    llm=llm,
    middleware=[TimingMiddleware()]
)
Note

Performance Tips:

  • Keep process_request() and process_response() fast
  • Use async I/O (await) for external calls (DB, network)
  • Don't block the async loop with synchronous operations
  • For expensive operations, consider running them in background tasks
  • Store state in state.metadata not instance variables (thread safety)
See Also

LoggingMiddleware: Example implementation
ConversationManager.complete: Where middleware is executed

Built-in Middleware

LoggingMiddleware

dataknobs_llm.conversations.LoggingMiddleware

LoggingMiddleware(logger: Logger | None = None)

Bases: ConversationMiddleware

Middleware that logs all requests and responses.

This middleware is useful for debugging and monitoring conversations. It logs message counts, conversation IDs, and response metadata.

Example

import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

middleware = LoggingMiddleware(logger)
manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage,
    middleware=[middleware]
)

Initialize logging middleware.

Parameters:

Name Type Description Default
logger Logger | None

Logger instance to use (defaults to module logger)

None

Methods:

Name Description
process_request

Log request details before sending to LLM.

process_response

Log response details after receiving from LLM.

Source code in packages/llm/src/dataknobs_llm/conversations/middleware.py
def __init__(self, logger: logging.Logger | None = None):
    """Initialize logging middleware.

    Args:
        logger: Logger instance to use (defaults to module logger)
    """
    self.logger = logger or logging.getLogger(__name__)
Functions
process_request async
process_request(
    messages: List[LLMMessage], state: ConversationState
) -> List[LLMMessage]

Log request details before sending to LLM.

Source code in packages/llm/src/dataknobs_llm/conversations/middleware.py
async def process_request(
    self,
    messages: List[LLMMessage],
    state: ConversationState
) -> List[LLMMessage]:
    """Log request details before sending to LLM."""
    self.logger.info(
        f"Conversation {state.conversation_id} - "
        f"Sending {len(messages)} messages to LLM"
    )
    self.logger.debug(
        f"Conversation {state.conversation_id} - "
        f"Message roles: {[msg.role for msg in messages]}"
    )
    return messages
process_response async
process_response(
    response: LLMResponse, state: ConversationState
) -> LLMResponse

Log response details after receiving from LLM.

Source code in packages/llm/src/dataknobs_llm/conversations/middleware.py
async def process_response(
    self,
    response: LLMResponse,
    state: ConversationState
) -> LLMResponse:
    """Log response details after receiving from LLM."""
    content_length = len(response.content) if response.content else 0
    self.logger.info(
        f"Conversation {state.conversation_id} - "
        f"Received response: {content_length} chars, "
        f"model={response.model}, finish_reason={response.finish_reason}"
    )
    if response.usage:
        self.logger.debug(
            f"Conversation {state.conversation_id} - "
            f"Token usage: {response.usage}"
        )
    return response

Usage Examples

Basic Conversation

from dataknobs_llm import create_llm_provider, LLMConfig
from dataknobs_llm.conversations import (
    ConversationManager,
    DataknobsConversationStorage
)
from dataknobs_llm.prompts import AsyncPromptBuilder, FileSystemPromptLibrary
from dataknobs_data.backends import AsyncMemoryDatabase
from pathlib import Path

# Setup
config = LLMConfig(provider="openai", api_key="your-key")
llm = create_llm_provider(config)
library = FileSystemPromptLibrary(prompt_dir=Path("prompts/"))
builder = AsyncPromptBuilder(library=library)

# Create storage
db = AsyncMemoryDatabase()
storage = DataknobsConversationStorage(db)

# Create conversation
manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage
)

# Add user message
await manager.add_message(
    role="user",
    prompt_name="greeting",
    params={"name": "Alice"}
)

# Get assistant response
response = await manager.complete()
print(response.content)

# Continue conversation
await manager.add_message(
    role="user",
    content="Tell me about Python decorators"
)
response = await manager.complete()

Branching Conversations

# Create conversation
manager = await ConversationManager.create(llm=llm, prompt_builder=builder, storage=storage)

# Initial exchange
await manager.add_message(role="user", content="Help me design an API")
response1 = await manager.complete()

# Save checkpoint
checkpoint_node = manager.current_node_id

# Try REST approach
await manager.add_message(role="user", content="Use REST principles")
rest_response = await manager.complete(branch_name="rest_approach")

# Go back and try GraphQL
await manager.switch_to_node(checkpoint_node)
await manager.add_message(role="user", content="Use GraphQL instead")
graphql_response = await manager.complete(branch_name="graphql_approach")

# View conversation tree
tree = await manager.get_tree_structure()
print(tree)

Persistence

from dataknobs_llm.conversations import DataknobsConversationStorage
from dataknobs_data.backends import AsyncMemoryDatabase

# Create with persistence
db = AsyncMemoryDatabase()
storage = DataknobsConversationStorage(db)

manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage,
    conversation_id="user123-session1"
)

# Have conversation...
await manager.add_message(role="user", content="Hello")
await manager.complete()

# Automatically persisted on each operation

# Later, restore the same conversation
manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage,
    conversation_id="user123-session1"  # Loads existing conversation
)

# Continue from where you left off
await manager.add_message(role="user", content="Continue...")

RAG Caching

# Enable RAG caching
manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage,
    cache_rag_results=True,      # Store RAG metadata in conversation
    reuse_rag_on_branch=True     # Reuse RAG when branching
)

# Add message with RAG-enabled prompt
await manager.add_message(
    role="user",
    prompt_name="code_question",  # Has RAG configured
    params={"language": "python"}
)
# RAG search executed, metadata cached

# Get LLM response
response = await manager.complete()

# Inspect RAG metadata
rag_info = await manager.get_rag_metadata()
print(f"Query: {rag_info['RAG_DOCS']['query']}")
print(f"Results: {len(rag_info['RAG_DOCS']['results'])}")

# Branch conversation - RAG is reused from cache
await manager.switch_to_node("0")
await manager.complete(branch_name="alternative")
# No RAG search needed, uses cached results

Middleware

from dataknobs_llm.conversations import LoggingMiddleware, ConversationMiddleware

# Custom middleware
class TokenCounterMiddleware(ConversationMiddleware):
    def __init__(self):
        self.total_tokens = 0

    async def process_request(self, messages, state):
        return messages

    async def process_response(self, response, state):
        if response.usage:
            self.total_tokens += response.usage.total_tokens
        return response

# Use middleware
token_counter = TokenCounterMiddleware()
manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage,
    middleware=[
        LoggingMiddleware(),
        token_counter
    ]
)

# Have conversation...
await manager.add_message(role="user", content="Hello")
await manager.complete()

print(f"Total tokens used: {token_counter.total_tokens}")

Multi-Turn with System Prompts

# Create with system prompt
manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage,
    system_prompt_name="code_assistant",
    system_prompt_params={"language": "python"}
)

# All completions will use this system prompt
await manager.add_message(role="user", content="Review this code: def foo(): pass")
response = await manager.complete()

await manager.add_message(role="user", content="How can I improve it?")
response = await manager.complete()

# Get conversation history
history = manager.get_history()
for msg in history:
    print(f"{msg.role}: {msg.content}")

# Get all nodes
nodes = await manager.list_nodes()
for node in nodes:
    print(f"Node {node.node_id}: {node.message.role if node.message else 'root'}")

# Navigate to a specific node (IDs are dot-delimited, e.g. "0.1.2")
await manager.switch_to_node("0.1.2")

# Branch from a node (navigate to its parent for sibling creation)
await manager.branch_from("0.1.2")
# Next add_message() or complete() creates a sibling of "0.1.2"

# Get parent node
parent = manager.get_parent_node()

# Get children nodes
children = manager.get_children_nodes()
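
The dot-delimited IDs encode the tree structure directly: a node's parent ID is its dotted prefix, and children append one more component. A small sketch of that scheme (helper names are illustrative, not library API):

```python
# Parent/child relationships implied by dot-delimited node IDs like "0.1.2".
def parent_id(node_id: str):
    """Return the dotted prefix, or None for the root."""
    return node_id.rsplit(".", 1)[0] if "." in node_id else None

def is_child(child: str, parent: str) -> bool:
    return parent_id(child) == parent

print(parent_id("0.1.2"))        # → 0.1
print(is_child("0.1.2", "0.1"))  # → True
print(parent_id("0"))            # → None (the root has no parent)
```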

Advanced Features

Tree Structure Analysis

# Get tree visualization
tree = await manager.get_tree_structure()
print(tree)

# Output example (dot-delimited IDs; siblings branch from the same node):
# 0 (system)
# └── 0.0 (user): "Help me design..."
#     └── 0.0.0 (assistant): "Here's an initial design..."
#         ├── 0.0.0.0 (user): "Use REST principles"
#         │   └── 0.0.0.0.0 (assistant): [rest_approach]
#         └── 0.0.0.1 (user): "Use GraphQL instead"
#             └── 0.0.0.1.0 (assistant): [graphql_approach]

Conversation Metadata

# Add metadata to messages
await manager.add_message(
    role="user",
    content="Hello",
    metadata={"source": "web", "user_id": "123"}
)

# Access metadata
current_node = manager.get_current_node()
print(current_node.metadata)

Streaming in Conversations

# Stream assistant response
await manager.add_message(role="user", content="Tell me a story")

# Stream chunks
async for chunk in manager.stream_complete():
    print(chunk.content, end="", flush=True)

See Also