Conversations API¶
Tree-structured conversation management with branching and persistence.
📖 Also see: Auto-generated API Reference - Complete documentation from source code docstrings
Overview¶
The conversations API provides a powerful system for managing multi-turn conversations with support for branching, time-travel, persistence, and FSM-based flows.
ConversationManager¶
dataknobs_llm.conversations.ConversationManager ¶
ConversationManager(
llm: AsyncLLMProvider,
prompt_builder: AsyncPromptBuilder,
storage: ConversationStorage,
state: ConversationState | None = None,
metadata: Dict[str, Any] | None = None,
middleware: List[ConversationMiddleware] | None = None,
cache_rag_results: bool = False,
reuse_rag_on_branch: bool = False,
conversation_id: str | None = None,
)
Manages multi-turn conversations with persistence and branching.
This class orchestrates conversations by:

- Tracking message history with tree-based branching
- Managing conversation state
- Persisting to storage backend
- Supporting multiple conversation branches

The conversation history is stored as a tree structure where:

- The root node contains the initial system prompt (if any)
- Each message is a tree node with a dot-delimited ID (e.g., "0.1.2")
- Branches occur when multiple children are added to the same node
- The current position tracks where you are in the conversation tree
Attributes:

| Name | Type | Description |
|---|---|---|
| `llm` | | LLM provider for completions |
| `prompt_builder` | | Prompt builder with library |
| `storage` | | Storage backend for persistence |
| `state` | | Current conversation state (tree, metadata, position) |
| `middleware` | | List of middleware to execute on requests/responses |
| `cache_rag_results` | | Whether to store RAG metadata in nodes |
| `reuse_rag_on_branch` | | Whether to reuse cached RAG across branches |
| `conversation_id` | `str \| None` | Unique conversation identifier |
| `current_node_id` | `str \| None` | Current position in conversation tree |
Example
```python
# Create conversation
manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage_backend,
    system_prompt_name="helpful_assistant"
)

# Add user message
await manager.add_message(
    prompt_name="user_query",
    params={"question": "What is Python?"},
    role="user"
)

# Get LLM response
result = await manager.complete()

# Continue conversation
await manager.add_message(
    content="Tell me more about decorators",
    role="user"
)
result = await manager.complete()

# Create alternative response branch
await manager.switch_to_node("0")  # Back to first user message
result2 = await manager.complete(branch_name="alt-response")

# Resume after interruption
manager2 = await ConversationManager.resume(
    conversation_id=manager.conversation_id,
    llm=llm,
    prompt_builder=builder,
    storage=storage_backend
)
```
Note
Tree-based branching enables:
- A/B Testing: Generate multiple responses from the same context
- Retry Logic: Try again from a previous point after failures
- Alternative Explorations: Explore different conversation paths
- Debugging: Compare different middleware or RAG configurations
Node IDs use dot notation (e.g., "0.1.2" means 3rd child of 2nd child of 1st child of root). The root node has ID "".
State is automatically persisted after every operation. Use
resume() to continue conversations across sessions or servers.
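The dot-notation scheme described above is easy to reason about with a few plain-Python helpers. These functions are illustrative only and are not part of the library API:

```python
def parent_id(node_id: str) -> str:
    """Return the parent's ID; the root ("") has no parent."""
    if node_id == "":
        raise ValueError("root has no parent")
    # Drop the last dot-delimited segment: "0.1.2" -> "0.1", "0" -> ""
    return node_id.rpartition(".")[0]

def child_id(node_id: str, index: int) -> str:
    """Return the ID of the index-th child of node_id."""
    return f"{node_id}.{index}" if node_id else str(index)

def depth(node_id: str) -> int:
    """Number of steps from the root; the root "" has depth 0."""
    return 0 if node_id == "" else node_id.count(".") + 1
```

For example, `parent_id("0.1.2")` is `"0.1"`, and the root's first child is `child_id("", 0)`, i.e. `"0"`.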
See Also
- create: Create a new conversation
- resume: Resume an existing conversation
- add_message: Add user/system message
- complete: Get LLM completion (blocking)
- stream_complete: Get LLM completion (streaming)
- switch_to_node: Navigate to different branch
- get_branches: List available branches
- get_total_cost: Calculate cumulative cost
- ConversationStorage: Storage backend implementations
- ConversationMiddleware: Request/response processing
- ConversationFlow: FSM-based conversation flows
Initialize conversation manager.
Note: Use ConversationManager.create() or ConversationManager.resume() instead of calling init directly.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `llm` | `AsyncLLMProvider` | LLM provider for completions | *required* |
| `prompt_builder` | `AsyncPromptBuilder` | Prompt builder with library | *required* |
| `storage` | `ConversationStorage` | Storage backend for persistence | *required* |
| `state` | `ConversationState \| None` | Optional existing conversation state | `None` |
| `metadata` | `Dict[str, Any] \| None` | Optional metadata for new conversations | `None` |
| `middleware` | `List[ConversationMiddleware] \| None` | Optional list of middleware to execute | `None` |
| `cache_rag_results` | `bool` | If True, store RAG metadata in node metadata for debugging and transparency | `False` |
| `reuse_rag_on_branch` | `bool` | If True, reuse cached RAG results when possible (useful for testing/branching) | `False` |
| `conversation_id` | `str \| None` | Optional conversation ID to use when creating the first message. If not provided, a UUID is generated automatically. | `None` |
Methods:

| Name | Description |
|---|---|
| `create` | Create a new conversation. |
| `add_message` | Add a message to the current conversation node. |
| `complete` | Get LLM completion and add as child of current node. |
| `switch_to_node` | Switch current position to a different node in the tree. |
| `branch_from` | Position tree so the next message becomes a sibling of the given node. |
| `get_rag_metadata` | Get RAG metadata from a conversation node. |

Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
Functions¶
create (async classmethod) ¶
create(
llm: AsyncLLMProvider,
prompt_builder: AsyncPromptBuilder,
storage: ConversationStorage,
system_prompt_name: str | None = None,
system_params: Dict[str, Any] | None = None,
metadata: Dict[str, Any] | None = None,
middleware: List[ConversationMiddleware] | None = None,
cache_rag_results: bool = False,
reuse_rag_on_branch: bool = False,
conversation_id: str | None = None,
) -> ConversationManager
Create a new conversation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `llm` | `AsyncLLMProvider` | LLM provider | *required* |
| `prompt_builder` | `AsyncPromptBuilder` | Prompt builder | *required* |
| `storage` | `ConversationStorage` | Storage backend | *required* |
| `system_prompt_name` | `str \| None` | Optional system prompt to initialize with | `None` |
| `system_params` | `Dict[str, Any] \| None` | Optional params for system prompt | `None` |
| `metadata` | `Dict[str, Any] \| None` | Optional conversation metadata | `None` |
| `middleware` | `List[ConversationMiddleware] \| None` | Optional list of middleware to execute | `None` |
| `cache_rag_results` | `bool` | If True, store RAG metadata in node metadata | `False` |
| `reuse_rag_on_branch` | `bool` | If True, reuse cached RAG results when possible | `False` |
| `conversation_id` | `str \| None` | Optional conversation ID. If not provided, a UUID is generated when the first message is added. | `None` |

Returns:

| Type | Description |
|---|---|
| `ConversationManager` | Initialized ConversationManager |
Example
```python
manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage,
    system_prompt_name="helpful_assistant",
    cache_rag_results=True
)
```
Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
add_message (async) ¶
add_message(
role: str,
content: str | None = None,
prompt_name: str | None = None,
params: Dict[str, Any] | None = None,
include_rag: bool = True,
rag_configs: List[Dict[str, Any]] | None = None,
metadata: Dict[str, Any] | None = None,
name: str | None = None,
tool_call_id: str | None = None,
) -> ConversationNode
Add a message to the current conversation node.
Either content or prompt_name must be provided. If using a prompt with RAG configuration, the RAG searches will be executed and results will be automatically inserted into the prompt.
Prompt rendering (template variables, RAG) is only applied for
"system" and "user" roles. Other roles ("tool",
"assistant", "function") are stored directly with no
prompt processing.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `role` | `str` | Message role: "system", "user", "assistant", "tool", or "function" | *required* |
| `content` | `str \| None` | Direct message content (if not using prompt) | `None` |
| `prompt_name` | `str \| None` | Name of prompt template to render (system/user only) | `None` |
| `params` | `Dict[str, Any] \| None` | Parameters for prompt rendering | `None` |
| `include_rag` | `bool` | Whether to execute RAG searches for prompts | `True` |
| `rag_configs` | `List[Dict[str, Any]] \| None` | RAG configurations for inline content (only used when content is provided without prompt_name). Allows inline prompts to benefit from RAG enhancement. | `None` |
| `metadata` | `Dict[str, Any] \| None` | Optional metadata for this message node | `None` |
| `name` | `str \| None` | Optional name for the message; used for tool result messages to identify which tool produced the result. | `None` |
| `tool_call_id` | `str \| None` | Optional provider-assigned tool call ID; used for tool result messages to pair the result with the specific tool invocation (required by Anthropic and OpenAI APIs). | `None` |
Returns:

| Type | Description |
|---|---|
| `ConversationNode` | The created ConversationNode |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If neither content nor prompt_name provided |
Example
```python
# Add message from prompt
await manager.add_message(
    role="user",
    prompt_name="code_question",
    params={"code": code_snippet}
)

# Add direct message
await manager.add_message(
    role="user",
    content="What is Python?"
)

# Add inline message with RAG enhancement
await manager.add_message(
    role="system",
    content="You are a helpful assistant. Use the context below.",
    rag_configs=[{
        "adapter_name": "docs",
        "query": "assistant guidelines",
        "placeholder": "CONTEXT",
        "k": 3
    }]
)

# Add tool result message
await manager.add_message(
    role="tool",
    content='{"success": true, "data": {"name": "flour"}}',
    name="add_bank_record",
)
```
Note
RAG Caching Behavior:
If cache_rag_results=True and reuse_rag_on_branch=True were
set during ConversationManager creation, this method will:
- Check if the same prompt+role was used elsewhere in the tree
- Check if the RAG query parameters match (via query hash)
- Reuse cached RAG results if found (no re-search!)
- Store new RAG results if not found
This is particularly useful when exploring conversation branches, as you can avoid redundant searches for the same information.
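The query-hash idea behind this reuse can be sketched in plain Python. The library's actual hashing scheme and cache layout are internal details; this sketch only assumes that identical adapter+query pairs map to the same key, so a branch can detect that a search has already been executed:

```python
import hashlib

def rag_query_hash(adapter_name: str, query: str) -> str:
    # Hash the adapter+query pair so identical searches are detectable.
    # The real hashing scheme is an internal detail; sha256 is an assumption.
    return hashlib.sha256(f"{adapter_name}:{query}".encode()).hexdigest()

rag_cache: dict[str, list] = {}
search_calls = 0  # counts how many real searches ran

def search_with_cache(adapter_name: str, query: str, search_fn) -> list:
    key = rag_query_hash(adapter_name, query)
    if key in rag_cache:
        return rag_cache[key]       # a branch reuses earlier results
    results = search_fn(query)      # the first branch pays for the search
    rag_cache[key] = results
    return results

def fake_search(query: str) -> list:
    global search_calls
    search_calls += 1
    return [f"doc about {query}"]

first = search_with_cache("docs", "python decorators", fake_search)
second = search_with_cache("docs", "python decorators", fake_search)
# Only one real search ran; the second call hit the cache.
```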
See Also
- complete: Get LLM response after adding message
- get_rag_metadata: Retrieve RAG metadata from a node
Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
complete (async) ¶
complete(
branch_name: str | None = None,
metadata: Dict[str, Any] | None = None,
llm_config_overrides: Dict[str, Any] | None = None,
system_prompt_override: str | None = None,
tools: list[Any] | None = None,
**llm_kwargs: Any,
) -> LLMResponse
Get LLM completion and add as child of current node.
This method:

1. Gets conversation history from root to current node
2. Optionally overrides the system prompt for this call only
3. Executes middleware (pre-LLM)
4. Calls LLM with history (and tools if provided)
5. Executes middleware (post-LLM)
6. Adds assistant response as child of current node
7. Updates current position to new node
8. Persists to storage
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `branch_name` | `str \| None` | Optional human-readable label for this branch | `None` |
| `metadata` | `Dict[str, Any] \| None` | Optional metadata for the assistant message node | `None` |
| `llm_config_overrides` | `Dict[str, Any] \| None` | Optional dict to override LLM config fields for this request only. Supported fields: model, temperature, max_tokens, top_p, stop_sequences, seed. | `None` |
| `system_prompt_override` | `str \| None` | Optional system prompt to use for this completion only. Replaces the first system message in the message list without mutating the conversation tree. Used by reasoning strategies to inject stage-specific context. | `None` |
| `tools` | `list[Any] \| None` | Optional list of tools available for this completion. Forwarded to the LLM provider's complete() method for consistent tool handling across all providers. | `None` |
| `**llm_kwargs` | `Any` | Additional arguments for LLM.complete() | `{}` |

Returns:

| Type | Description |
|---|---|
| `LLMResponse` | LLM response with content, usage, and cost information |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If conversation has no messages yet |
Example
```python
# Get response
result = await manager.complete()
print(result.content)
print(f"Cost: ${result.cost_usd:.4f}")

# Create labeled branch
result = await manager.complete(branch_name="alternative-answer")

# With LLM parameters
result = await manager.complete(temperature=0.9, max_tokens=500)

# With config overrides (switch model per-request)
result = await manager.complete(
    llm_config_overrides={"model": "gpt-4-turbo", "temperature": 0.9}
)

# With system prompt override (used by reasoning strategies)
result = await manager.complete(
    system_prompt_override="You are a wizard assistant at step 2..."
)

# With tools
result = await manager.complete(tools=[search_tool, calc_tool])
```
Note
Middleware Execution Order (Onion Model):
- Pre-LLM: middleware[0] → middleware[1] → ... → middleware[N]
- LLM call happens
- Post-LLM: middleware[N] → ... → middleware[1] → middleware[0]
This "onion" pattern ensures that middleware wraps around the LLM
call symmetrically. For example, if middleware[0] starts a timer
in process_request(), it will be the last to run in
process_response() and can log the total elapsed time.
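The onion ordering can be demonstrated with a minimal synchronous sketch. The real ConversationMiddleware interface is async and richer than this; the point here is only the ordering guarantee:

```python
calls: list[str] = []  # records the order in which hooks fire

class Middleware:
    def __init__(self, name: str) -> None:
        self.name = name

    def process_request(self, request: str) -> str:
        calls.append(f"{self.name}:pre")
        return request

    def process_response(self, response: str) -> str:
        calls.append(f"{self.name}:post")
        return response

def run(middleware: list[Middleware], request: str) -> str:
    for m in middleware:                 # pre-LLM: first to last
        request = m.process_request(request)
    response = f"LLM({request})"         # the LLM call sits at the center
    for m in reversed(middleware):       # post-LLM: last to first
        response = m.process_response(response)
    return response

result = run([Middleware("A"), Middleware("B")], "hi")
# calls is now ["A:pre", "B:pre", "B:post", "A:post"]
```

Middleware A wraps fully around middleware B, so A can measure anything B does, including its share of the LLM round trip.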
Automatic Cost Tracking:
The response includes cost_usd (this call) and cumulative_cost_usd
(total conversation cost) if the LLM provider returns usage statistics.
See Also
- stream_complete: Streaming version for real-time output
- add_message: Add user/system message before calling complete
- switch_to_node: Navigate to different branch before completing
Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
switch_to_node (async) ¶
Switch current position to a different node in the tree.
This allows exploring different branches or backtracking in the conversation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `node_id` | `str` | Target node ID (dot-delimited, e.g., "0.1" or "") | *required* |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If node_id not found in tree |
Example
```python
# Go back to first user message
await manager.switch_to_node("0")

# Create alternative response
result = await manager.complete(branch_name="alternative")

# Go back to root
await manager.switch_to_node("")
```
Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
branch_from (async) ¶
Position tree so the next message becomes a sibling of the given node.
Navigates to the parent of the specified node. The next add_message()
or complete() call will create a new child of that parent, effectively
branching the conversation from the same point.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `sibling_node_id` | `str` | ID of the node whose parent we should navigate to. | *required* |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If no conversation state, node not found, or node is the root (root has no parent to branch from). |
Example
```python
# Branch from the same parent as node "0.0"
await manager.branch_from("0.0")

# Now at node "0"; next complete() creates "0.1"
response = await manager.complete()
```
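In terms of the dot-delimited IDs, branch_from amounts to "move to the sibling's parent, then let the next message take the next free child index". A plain-Python sketch (illustrative only, not library code):

```python
def parent_of(node_id: str) -> str:
    """Parent of a dot-delimited node ID; the root ("") has no parent."""
    if node_id == "":
        raise ValueError("cannot branch from the root")
    return node_id.rpartition(".")[0]

def next_child_id(parent: str, existing_children: int) -> str:
    """ID the next child of `parent` would receive."""
    return f"{parent}.{existing_children}" if parent else str(existing_children)

# branch_from("0.0"): current position becomes "0"
current = parent_of("0.0")
# "0" already has one child ("0.0"), so the next message becomes "0.1"
sibling = next_child_id(current, 1)
```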
Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
get_rag_metadata ¶
Get RAG metadata from a conversation node.
This method retrieves the cached RAG metadata from a specific node, which includes information about RAG searches executed during prompt rendering (queries, results, query hashes, etc.).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `node_id` | `str \| None` | Node ID to retrieve metadata from (default: current node) | `None` |

Returns:

| Type | Description |
|---|---|
| `Dict[str, Any] \| None` | RAG metadata dictionary if present, None otherwise. |

The returned dictionary has one entry per RAG placeholder:

```python
{
    "PLACEHOLDER_NAME": {
        "query": "resolved RAG query",
        "query_hash": "hash of adapter+query",
        "results": [...],  # Search results
        "adapter_name": "name of RAG adapter used"
    },
    ...  # One entry per RAG placeholder
}
```

Raises:

| Type | Description |
|---|---|
| `ValueError` | If node_id not found in conversation tree |
Example
```python
# Get RAG metadata from current node
metadata = manager.get_rag_metadata()
if metadata:
    for placeholder, rag_data in metadata.items():
        print(f"Placeholder: {placeholder}")
        print(f"  Query: {rag_data['query']}")
        print(f"  Adapter: {rag_data['adapter_name']}")
        print(f"  Results: {len(rag_data['results'])} items")
        print(f"  Hash: {rag_data['query_hash']}")

# Get RAG metadata from specific node
metadata = manager.get_rag_metadata(node_id="0.1")

# Check if RAG was used for a message
if manager.get_rag_metadata():
    print("This message used RAG-enhanced prompt")
else:
    print("This message used direct content")
```
Note
RAG metadata is only available if cache_rag_results=True was
set during ConversationManager creation. This metadata is useful
for debugging RAG behavior, understanding what information was
retrieved, and implementing RAG result caching across branches.
See Also
- add_message: Method that executes RAG and stores metadata
- reuse_rag_on_branch: Parameter enabling RAG cache reuse
Source code in packages/llm/src/dataknobs_llm/conversations/manager.py
Conversation Types¶
ConversationNode¶
dataknobs_llm.conversations.ConversationNode (dataclass) ¶
ConversationNode(
message: LLMMessage,
node_id: str,
timestamp: datetime = datetime.now(),
prompt_name: str | None = None,
branch_name: str | None = None,
metadata: Dict[str, Any] = dict(),
)
Data stored in each conversation tree node.
Each node represents a single message (system, user, or assistant) in the conversation. The tree structure allows for branching conversations where multiple alternative messages can be explored.
Attributes:

| Name | Type | Description |
|---|---|---|
| `message` | `LLMMessage` | The LLM message (role + content) |
| `node_id` | `str` | Dot-delimited child positions from root (e.g., "0.1.2") |
| `timestamp` | `datetime` | When this message was created |
| `prompt_name` | `str \| None` | Optional name of prompt template used to generate this |
| `branch_name` | `str \| None` | Optional human-readable label for this branch |
| `metadata` | `Dict[str, Any]` | Additional metadata (usage stats, model info, etc.) |
Example
```python
node = ConversationNode(
    message=LLMMessage(role="user", content="Hello"),
    node_id="0.1",
    timestamp=datetime.now(),
    prompt_name="greeting",
    branch_name="polite-variant"
)
```
Methods:

| Name | Description |
|---|---|
| `from_dict` | Create node from dictionary. |
| `to_dict` | Convert node to dictionary for storage. |
Functions¶
from_dict (classmethod) ¶
Create node from dictionary.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
to_dict ¶
Convert node to dictionary for storage.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
ConversationState¶
dataknobs_llm.conversations.ConversationState (dataclass) ¶
ConversationState(
conversation_id: str,
message_tree: Tree,
current_node_id: str = "",
metadata: Dict[str, Any] = dict(),
created_at: datetime = datetime.now(),
updated_at: datetime = datetime.now(),
schema_version: str = SCHEMA_VERSION,
)
State of a conversation with tree-based branching support.
This replaces the linear message history with a tree structure that supports multiple branches (alternative conversation paths).
Attributes:

| Name | Type | Description |
|---|---|---|
| `conversation_id` | `str` | Unique conversation identifier |
| `message_tree` | `Tree` | Root of conversation tree (Tree[ConversationNode]) |
| `current_node_id` | `str` | ID of current position in tree (dot-delimited) |
| `metadata` | `Dict[str, Any]` | Additional conversation metadata |
| `created_at` | `datetime` | Conversation creation timestamp |
| `updated_at` | `datetime` | Last update timestamp |
| `schema_version` | `str` | Version of the storage schema used |
Example
```python
# Create conversation with system message
root_node = ConversationNode(
    message=LLMMessage(role="system", content="You are helpful"),
    node_id=""
)
tree = Tree(root_node)
state = ConversationState(
    conversation_id="conv-123",
    message_tree=tree,
    current_node_id="",
    metadata={"user_id": "alice"}
)

# Add user message
user_node = ConversationNode(
    message=LLMMessage(role="user", content="Hello"),
    node_id="0"
)
tree.add_child(Tree(user_node))
state.current_node_id = "0"
```
Methods:

| Name | Description |
|---|---|
| `__post_init__` | Initialize transient runtime state. |
| `from_dict` | Create state from dictionary. |
| `get_all_nodes` | Get all ConversationNode objects across all branches. |
| `get_current_messages` | Get messages from root to current position (for LLM). |
| `get_current_node` | Get the current tree node. |
| `get_current_nodes` | Get ConversationNode objects from root to current position. |
| `to_dict` | Convert state to dictionary for storage. |
| `to_export_dict` | Export the full conversation as a flat message list. |
Functions¶
__post_init__ ¶
Initialize transient runtime state.
turn_data is per-turn cross-middleware communication state.
Populated by the bot layer before each LLM call and cleared
after the turn completes. Stored as a plain attribute (not a
dataclass field) so it is invisible to dataclasses.asdict()
and dataclasses.fields() — only the custom to_dict()
and from_dict() matter for serialization, and they
deliberately exclude it.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
from_dict (classmethod) ¶
Create state from dictionary.
Reconstructs the tree from nodes and edges. Handles schema version migration if needed.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
get_all_nodes ¶
Get all ConversationNode objects across all branches.
Unlike :meth:get_current_nodes (which returns only the active
root-to-leaf path), this returns every node in the tree sorted
by timestamp. Use this for full interaction logging, auditing, or
conversation export where abandoned branches (e.g. collection-mode
iterations) must be visible.
Returns:

| Type | Description |
|---|---|
| `List[ConversationNode]` | All nodes in the tree, sorted by timestamp |
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
get_current_messages ¶
Get messages from root to current position (for LLM).
get_current_node ¶
Get the current tree node.
get_current_nodes ¶
Get ConversationNode objects from root to current position.
Like get_current_messages(), but returns full ConversationNode objects including timestamps, node_ids, and metadata.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
to_dict ¶
Convert state to dictionary for storage.
The tree is serialized as a list of edges (parent_id, child_id, node_data). Includes schema_version for backward compatibility.
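The nodes+edges idea can be sketched with a minimal round trip. The library's actual record format is richer (it carries node data and a schema version); here the tree is reduced to a dict mapping each node ID to its child IDs:

```python
def to_edges(tree: dict[str, list[str]]) -> list[tuple[str, str]]:
    """Flatten a tree (node_id -> child ids) into (parent, child) edges."""
    return [(parent, child) for parent, kids in tree.items() for child in kids]

def from_edges(edges: list[tuple[str, str]]) -> dict[str, list[str]]:
    """Rebuild the tree from a flat edge list."""
    tree: dict[str, list[str]] = {}
    for parent, child in edges:
        tree.setdefault(parent, []).append(child)
        tree.setdefault(child, [])  # leaves get an empty child list
    return tree

# Root "" has one child "0", which has two branches "0.0" and "0.1"
t = {"": ["0"], "0": ["0.0", "0.1"], "0.0": [], "0.1": []}
roundtrip = from_edges(to_edges(t))
```

An edge list is easy to store as flat records in any backend, which is why it suits storage layers that have no native tree type.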
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
to_export_dict ¶
Export the full conversation as a flat message list.
Returns the conversation in a format suitable for logging, auditing, and API responses. All branches are included (not just the active path), so collection-mode iterations and other abandoned branches are visible.
Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Dictionary containing the full conversation as a flat message list (all branches included, in order) |
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
LLMMessage¶
dataknobs_llm.llm.LLMMessage (dataclass) ¶
LLMMessage(
role: str,
content: str,
name: str | None = None,
tool_call_id: str | None = None,
function_call: Dict[str, Any] | None = None,
tool_calls: list[ToolCall] | None = None,
metadata: Dict[str, Any] = dict(),
)
Represents a message in LLM conversation.
Standard message format used across all providers. Messages are the fundamental unit of LLM interactions, containing role-based content for multi-turn conversations.
Attributes:

| Name | Type | Description |
|---|---|---|
| `role` | `str` | Message role - 'system', 'user', 'assistant', 'tool', or 'function' |
| `content` | `str` | Message content text |
| `name` | `str \| None` | Optional name for function/tool messages or multi-user scenarios |
| `tool_call_id` | `str \| None` | Provider-assigned ID for pairing tool results with invocations (required by OpenAI and Anthropic APIs) |
| `function_call` | `Dict[str, Any] \| None` | Function call data for tool-using models |
| `tool_calls` | `list[ToolCall] \| None` | Tool calls made by the assistant in this message |
| `metadata` | `Dict[str, Any]` | Additional metadata (timestamps, IDs, etc.) |
Example
```python
from dataknobs_llm.llm.base import LLMMessage

# System message
system_msg = LLMMessage(
    role="system",
    content="You are a helpful coding assistant."
)

# User message
user_msg = LLMMessage(
    role="user",
    content="How do I reverse a list in Python?"
)

# Assistant message
assistant_msg = LLMMessage(
    role="assistant",
    content="Use the reverse() method or [::-1] slicing."
)

# Function result message
function_msg = LLMMessage(
    role="function",
    name="search_docs",
    content='{"result": "Found 3 examples"}'
)

# Build conversation
messages = [system_msg, user_msg, assistant_msg]
```
Methods:

| Name | Description |
|---|---|
| `from_dict` | Create an LLMMessage from a dictionary. |
| `to_dict` | Convert to canonical dictionary format for storage/interchange. |
Functions¶
from_dict (classmethod) ¶
Create an LLMMessage from a dictionary.
Handles both the new canonical format (with tool_calls as list of
dicts) and the legacy format (without tool_calls/function_call).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Dict[str, Any]` | Dictionary with message data (role, content, and any optional fields) | *required* |

Returns:

| Type | Description |
|---|---|
| `LLMMessage` | LLMMessage instance. |
Source code in packages/llm/src/dataknobs_llm/llm/base.py
to_dict ¶
Convert to canonical dictionary format for storage/interchange.
Only includes non-None/non-empty optional fields to keep output clean.
Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Dictionary with role, content, and any non-None/non-empty optional fields |
Source code in packages/llm/src/dataknobs_llm/llm/base.py
Storage¶
Abstract Storage¶
dataknobs_llm.conversations.ConversationStorage ¶
Bases: ABC
Abstract storage interface for conversations.
This interface defines the contract for persisting conversation state. Implementations can use any backend (SQL, NoSQL, file, etc.).
Lifecycle
Implementations must provide a create() classmethod for
configuration-driven instantiation, and may override close()
to release resources such as connections or pools.
Example:

```python
class MyStorage(ConversationStorage):
    @classmethod
    async def create(cls, config: dict[str, Any]) -> "MyStorage":
        instance = cls(...)
        return instance

    async def close(self) -> None:
        await self._pool.close()
```
Methods:

| Name | Description |
|---|---|
| `save_conversation` | Save conversation state (upsert). |
| `load_conversation` | Load conversation state. |
| `delete_conversation` | Delete conversation. |
| `list_conversations` | List conversations with optional filtering and sorting. |
| `count_conversations` | Return the number of conversations matching the filter. |
Functions¶
save_conversation (abstractmethod, async) ¶
Save conversation state (upsert).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `state` | `ConversationState` | Conversation state to save | *required* |
load_conversation (abstractmethod, async) ¶
Load conversation state.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `conversation_id` | `str` | Conversation identifier | *required* |

Returns:

| Type | Description |
|---|---|
| `ConversationState \| None` | Conversation state or None if not found |
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
delete_conversation (abstractmethod, async) ¶
Delete conversation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `conversation_id` | `str` | Conversation identifier | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if deleted, False if not found |
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
list_conversations (abstractmethod, async) ¶
list_conversations(
filter_metadata: Dict[str, Any] | None = None,
limit: int = 100,
offset: int = 0,
sort_by: str | None = None,
sort_order: str = "desc",
) -> List[ConversationState]
List conversations with optional filtering and sorting.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `filter_metadata` | `Dict[str, Any] \| None` | Optional metadata filters | `None` |
| `limit` | `int` | Maximum number of results | `100` |
| `offset` | `int` | Offset for pagination | `0` |
| `sort_by` | `str \| None` | Field name to sort by (e.g. "updated_at", "created_at") | `None` |
| `sort_order` | `str` | Sort direction, "asc" or "desc" (default: "desc") | `'desc'` |

Returns:

| Type | Description |
|---|---|
| `List[ConversationState]` | List of conversation states |
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
count_conversations (async) ¶
Return the number of conversations matching the filter.
Default implementation fetches all matching conversations and counts them. Subclasses should override with an efficient backend query.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `filter_metadata` | `Dict[str, Any] \| None` | Optional metadata filters (exact match). | `None` |

Returns:

| Type | Description |
|---|---|
| `int` | Total number of matching conversations. |
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
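To make the contract concrete, here is a minimal in-memory sketch of the interface. It is illustrative only: the real ABC persists ConversationState objects and requires a create() classmethod, whereas this toy version stores plain dicts and implements only a few methods:

```python
import asyncio
from typing import Any

class InMemoryStorage:
    """Toy stand-in for ConversationStorage, keyed by conversation_id."""

    def __init__(self) -> None:
        self._store: dict[str, dict[str, Any]] = {}

    async def save_conversation(self, state: dict[str, Any]) -> None:
        # Upsert: insert or overwrite by conversation_id
        self._store[state["conversation_id"]] = state

    async def load_conversation(self, conversation_id: str):
        return self._store.get(conversation_id)  # None if not found

    async def delete_conversation(self, conversation_id: str) -> bool:
        return self._store.pop(conversation_id, None) is not None

    async def count_conversations(self, filter_metadata=None) -> int:
        # Mirrors the default implementation: scan and count exact matches
        if not filter_metadata:
            return len(self._store)
        return sum(
            all(s.get("metadata", {}).get(k) == v
                for k, v in filter_metadata.items())
            for s in self._store.values()
        )

async def demo() -> int:
    storage = InMemoryStorage()
    await storage.save_conversation(
        {"conversation_id": "c1", "metadata": {"user_id": "alice"}}
    )
    await storage.save_conversation(
        {"conversation_id": "c2", "metadata": {"user_id": "bob"}}
    )
    return await storage.count_conversations({"user_id": "alice"})

matching = asyncio.run(demo())
```

A production implementation would instead push the filter into an efficient backend query, as the docstring above recommends.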
Implementations¶
DataknobsConversationStorage¶
dataknobs_llm.conversations.DataknobsConversationStorage ¶
Bases: ConversationStorage
Conversation storage using dataknobs_data backends.
Stores conversations as Records with the tree serialized as nodes + edges. Works with any dataknobs backend (Memory, File, S3, Postgres, etc.).
The storage layer handles:

- Automatic serialization/deserialization of conversation trees
- Schema version migration when loading old conversations
- Metadata-based filtering for listing conversations
- Upsert operations (insert or update)
Attributes:

| Name | Type | Description |
|---|---|---|
| `backend` | | Dataknobs async database backend instance |
Example
```python
from dataknobs_data import database_factory
from dataknobs_llm.conversations import DataknobsConversationStorage

# Memory backend (development/testing)
db = database_factory.create(backend="memory")
storage = DataknobsConversationStorage(db)

# File backend (local persistence)
db = database_factory.create(
    backend="file",
    file_path="./conversations.jsonl"
)
storage = DataknobsConversationStorage(db)

# S3 backend (cloud storage)
db = database_factory.create(
    backend="s3",
    bucket="my-conversations",
    region="us-west-2"
)
storage = DataknobsConversationStorage(db)

# Postgres backend (production)
db = database_factory.create(
    backend="postgres",
    host="db.example.com",
    database="conversations",
    user="app",
    password="secret"
)
storage = DataknobsConversationStorage(db)

# Save conversation
await storage.save_conversation(state)

# Load conversation
state = await storage.load_conversation("conv-123")

# List user's conversations
user_convos = await storage.list_conversations(
    filter_metadata={"user_id": "alice"},
    limit=50
)

# Delete conversation
deleted = await storage.delete_conversation("conv-123")
```
Note
Backend Selection:
- Memory: Fast, no persistence. Use for testing or ephemeral conversations.
- File: Simple local persistence. Good for single-server deployments.
- S3: Scalable cloud storage. Best for serverless or distributed systems.
- Postgres: Full ACID guarantees. Best for production multi-server setups.
All backends support the same API, so you can switch between them by changing the database_factory configuration.
See Also
ConversationStorage: Abstract interface ConversationState: State structure being stored dataknobs_data.database_factory: Backend creation utilities
Initialize storage with dataknobs backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| backend | Any | Dataknobs async database backend (AsyncMemoryDatabase, etc.) | required |
Methods:
| Name | Description |
|---|---|
| close | Close the underlying database backend. |
| count_conversations | Return the number of conversations matching the filter. |
| create | Create storage from configuration dict. |
| delete_conversation | Delete conversation from backend. |
| delete_conversations | Delete conversations matching the given filters. |
| list_conversations | List conversations from backend. |
| load_conversation | Load conversation from backend. |
| save_conversation | Save conversation to backend. |
| search_conversations | Search conversations with content, time range, and metadata filters. |
| update_metadata | Update conversation metadata. |
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
Functions¶
close
async
¶
count_conversations
async
¶
Return the number of conversations matching the filter.
Uses a backend query without deserializing full conversation states, which is more efficient than the default implementation.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
create
async
classmethod
¶
Create storage from configuration dict.
Creates the database backend via AsyncDatabaseFactory, connects
it, and returns an initialized storage instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] | Database backend configuration. Must include a | required |
Returns:
| Type | Description |
|---|---|
| DataknobsConversationStorage | Connected |
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
delete_conversation
async
¶
Delete conversation from backend.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
delete_conversations
async
¶
delete_conversations(
content_contains: str | None = None,
created_after: datetime | None = None,
created_before: datetime | None = None,
filter_metadata: Dict[str, Any] | None = None,
) -> List[str]
Delete conversations matching the given filters.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
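A common use for `delete_conversations` is retention cleanup. A minimal sketch, assuming `storage` is a connected `DataknobsConversationStorage` and relying only on the `created_before` parameter and the `List[str]` return shown in the signature above; `purge_stale` is an illustrative helper, not part of the API:

```python
import asyncio
from datetime import datetime, timedelta, timezone


async def purge_stale(storage, days: int = 90) -> int:
    """Delete conversations created more than `days` days ago.

    Returns the number of deleted conversations (delete_conversations
    returns the list of deleted IDs).
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    deleted_ids = await storage.delete_conversations(created_before=cutoff)
    return len(deleted_ids)
```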
list_conversations
async
¶
list_conversations(
filter_metadata: Dict[str, Any] | None = None,
limit: int = 100,
offset: int = 0,
sort_by: str | None = None,
sort_order: str = "desc",
) -> List[ConversationState]
List conversations from backend.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
load_conversation
async
¶
Load conversation from backend.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
save_conversation
async
¶
Save conversation to backend.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
search_conversations
async
¶
search_conversations(
content_contains: str | None = None,
created_after: datetime | None = None,
created_before: datetime | None = None,
filter_metadata: Dict[str, Any] | None = None,
limit: int = 100,
offset: int = 0,
sort_by: str = "updated_at",
sort_order: str = "desc",
) -> List[ConversationState]
Search conversations with content, time range, and metadata filters.
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
update_metadata
async
¶
Update conversation metadata.
Loads the conversation, updates its metadata, and saves it back.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| conversation_id | str | ID of conversation to update | required |
| metadata | Dict[str, Any] | New metadata dict (replaces existing metadata) | required |
Raises:
| Type | Description |
|---|---|
| StorageError | If conversation not found or update fails |
Source code in packages/llm/src/dataknobs_llm/conversations/storage.py
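Because `update_metadata` replaces the whole metadata dict, additive updates need a read-merge-write pattern. A minimal sketch; the `merge_metadata` helper is illustrative, not part of the API, and assumes only `load_conversation`, `update_metadata`, and a `ConversationState` with a `metadata` attribute:

```python
import asyncio


async def merge_metadata(storage, conversation_id: str, updates: dict) -> dict:
    """Merge new keys into existing metadata instead of replacing it."""
    state = await storage.load_conversation(conversation_id)
    merged = {**(state.metadata or {}), **updates}
    await storage.update_metadata(conversation_id, merged)
    return merged
```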
Middleware¶
Abstract Middleware¶
dataknobs_llm.conversations.ConversationMiddleware ¶
Bases: ABC
Base class for conversation middleware.
Middleware can process requests before LLM and responses after LLM. Middleware is executed in order for requests, and in reverse order for responses (onion pattern).
Execution Order
Given middleware list [MW0, MW1, MW2]:
- Request: MW0 → MW1 → MW2 → LLM
- Response: LLM → MW2 → MW1 → MW0
This allows MW0 to:
1. Start a timer in process_request()
2. See the LLM call complete
3. Stop the timer in process_response() and log total time
Use Cases
- Logging: Track request/response details
- Validation: Verify request/response content
- Transformation: Modify messages or responses
- Rate Limiting: Enforce API usage limits
- Caching: Store/retrieve responses
- Monitoring: Collect metrics and analytics
- Security: Filter sensitive information
Example
from dataknobs_llm.conversations import ConversationMiddleware
import time
class TimingMiddleware(ConversationMiddleware):
    '''Measure LLM call duration.'''

    async def process_request(self, messages, state):
        # Store start time in state metadata
        state.metadata["request_start"] = time.time()
        return messages

    async def process_response(self, response, state):
        # Calculate elapsed time
        start = state.metadata.get("request_start")
        if start:
            elapsed = time.time() - start
            if not response.metadata:
                response.metadata = {}
            response.metadata["llm_duration_seconds"] = elapsed
            print(f"LLM call took {elapsed:.2f}s")
        return response
# Use in conversation
manager = await ConversationManager.create(
llm=llm,
middleware=[TimingMiddleware()]
)
Note
Performance Tips:
- Keep `process_request()` and `process_response()` fast
- Use async I/O (`await`) for external calls (DB, network)
- Don't block the async loop with synchronous operations
- For expensive operations, consider running them in background tasks
- Store state in `state.metadata`, not instance variables (thread safety)
See Also
LoggingMiddleware: Example implementation ConversationManager.complete: Where middleware is executed
Built-in Middleware¶
LoggingMiddleware¶
dataknobs_llm.conversations.LoggingMiddleware ¶
Bases: ConversationMiddleware
Middleware that logs all requests and responses.
This middleware is useful for debugging and monitoring conversations. It logs message counts, conversation IDs, and response metadata.
Example
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
middleware = LoggingMiddleware(logger)
manager = await ConversationManager.create(
    llm=llm,
    prompt_builder=builder,
    storage=storage,
    middleware=[middleware]
)
Initialize logging middleware.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| logger | Logger \| None | Logger instance to use (defaults to module logger) | None |
Methods:
| Name | Description |
|---|---|
| process_request | Log request details before sending to LLM. |
| process_response | Log response details after receiving from LLM. |
Source code in packages/llm/src/dataknobs_llm/conversations/middleware.py
Functions¶
process_request
async
¶
Log request details before sending to LLM.
Source code in packages/llm/src/dataknobs_llm/conversations/middleware.py
process_response
async
¶
Log response details after receiving from LLM.
Source code in packages/llm/src/dataknobs_llm/conversations/middleware.py
Usage Examples¶
Basic Conversation¶
from dataknobs_llm import create_llm_provider, LLMConfig
from dataknobs_llm.conversations import (
ConversationManager,
DataknobsConversationStorage
)
from dataknobs_llm.prompts import AsyncPromptBuilder, FileSystemPromptLibrary
from dataknobs_data.backends import AsyncMemoryDatabase
from pathlib import Path
# Setup
config = LLMConfig(provider="openai", api_key="your-key")
llm = create_llm_provider(config)
library = FileSystemPromptLibrary(prompt_dir=Path("prompts/"))
builder = AsyncPromptBuilder(library=library)
# Create storage
db = AsyncMemoryDatabase()
storage = DataknobsConversationStorage(db)
# Create conversation
manager = await ConversationManager.create(
llm=llm,
prompt_builder=builder,
storage=storage
)
# Add user message
await manager.add_message(
role="user",
prompt_name="greeting",
params={"name": "Alice"}
)
# Get assistant response
response = await manager.complete()
print(response.content)
# Continue conversation
await manager.add_message(
role="user",
content="Tell me about Python decorators"
)
response = await manager.complete()
Branching Conversations¶
# Create conversation
manager = await ConversationManager.create(llm=llm, prompt_builder=builder)
# Initial exchange
await manager.add_message(role="user", content="Help me design an API")
response1 = await manager.complete()
# Save checkpoint
checkpoint_node = manager.current_node
# Try REST approach
await manager.add_message(role="user", content="Use REST principles")
rest_response = await manager.complete(branch_name="rest_approach")
# Go back and try GraphQL
await manager.switch_to_node(checkpoint_node)
await manager.add_message(role="user", content="Use GraphQL instead")
graphql_response = await manager.complete(branch_name="graphql_approach")
# View conversation tree
tree = await manager.get_tree_structure()
print(tree)
Persistence¶
from dataknobs_llm.conversations import DataknobsConversationStorage
from dataknobs_data.backends import AsyncMemoryDatabase
# Create with persistence
db = AsyncMemoryDatabase()
storage = DataknobsConversationStorage(db)
manager = await ConversationManager.create(
llm=llm,
prompt_builder=builder,
storage=storage,
conversation_id="user123-session1"
)
# Have conversation...
await manager.add_message(role="user", content="Hello")
await manager.complete()
# Automatically persisted on each operation
# Later, restore the same conversation
manager = await ConversationManager.create(
llm=llm,
prompt_builder=builder,
storage=storage,
conversation_id="user123-session1" # Loads existing conversation
)
# Continue from where you left off
await manager.add_message(role="user", content="Continue...")
RAG Caching¶
# Enable RAG caching
manager = await ConversationManager.create(
llm=llm,
prompt_builder=builder,
cache_rag_results=True, # Store RAG metadata in conversation
reuse_rag_on_branch=True # Reuse RAG when branching
)
# Add message with RAG-enabled prompt
await manager.add_message(
role="user",
prompt_name="code_question", # Has RAG configured
params={"language": "python"}
)
# RAG search executed, metadata cached
# Get LLM response
response = await manager.complete()
# Inspect RAG metadata
rag_info = await manager.get_rag_metadata()
print(f"Query: {rag_info['RAG_DOCS']['query']}")
print(f"Results: {len(rag_info['RAG_DOCS']['results'])}")
# Branch conversation - RAG is reused from cache
await manager.switch_to_node("0")
await manager.complete(branch_name="alternative")
# No RAG search needed, uses cached results
Middleware¶
from dataknobs_llm.conversations import LoggingMiddleware, ConversationMiddleware
# Custom middleware
class TokenCounterMiddleware(ConversationMiddleware):
    def __init__(self):
        self.total_tokens = 0

    async def process_request(self, messages, state):
        return messages

    async def process_response(self, response, state):
        if response.usage:
            self.total_tokens += response.usage.total_tokens
        return response
# Use middleware
token_counter = TokenCounterMiddleware()
manager = await ConversationManager.create(
llm=llm,
prompt_builder=builder,
    middleware=[
LoggingMiddleware(),
token_counter
]
)
# Have conversation...
await manager.add_message(role="user", content="Hello")
await manager.complete()
print(f"Total tokens used: {token_counter.total_tokens}")
Multi-Turn with System Prompts¶
# Create with system prompt
manager = await ConversationManager.create(
llm=llm,
prompt_builder=builder,
system_prompt_name="code_assistant",
system_prompt_params={"language": "python"}
)
# All completions will use this system prompt
await manager.add_message(role="user", content="Review this code: def foo(): pass")
response = await manager.complete()
await manager.add_message(role="user", content="How can I improve it?")
response = await manager.complete()
Navigation¶
# Get conversation history
history = manager.get_history()
for msg in history:
print(f"{msg.role}: {msg.content}")
# Get all nodes
nodes = await manager.list_nodes()
for node in nodes:
print(f"Node {node.node_id}: {node.message.role if node.message else 'root'}")
# Navigate to specific node
await manager.switch_to_node("node-123")
# Branch from a node (navigate to its parent for sibling creation)
await manager.branch_from("node-123")
# Next add_message() or complete() creates a sibling of "node-123"
# Get parent node
parent = manager.get_parent_node()
# Get children nodes
children = manager.get_children_nodes()
Advanced Features¶
Tree Structure Analysis¶
# Get tree visualization
tree = await manager.get_tree_structure()
print(tree)
# Output example:
# root (system)
# ├── node-1 (user): "Help me design..."
# │ ├── node-2 (assistant): "Here's a REST approach..."
# │ │ └── node-3 (user): "Use REST principles"
# │ │ └── node-4 (assistant): [rest_approach]
# │ └── node-5 (user): "Use GraphQL instead"
# │ └── node-6 (assistant): [graphql_approach]
Conversation Metadata¶
# Add metadata to messages
await manager.add_message(
role="user",
content="Hello",
metadata={"source": "web", "user_id": "123"}
)
# Access metadata
current_node = manager.get_current_node()
print(current_node.metadata)
Streaming in Conversations¶
# Stream assistant response
await manager.add_message(role="user", content="Tell me a story")
# Stream chunks
async for chunk in manager.stream_complete():
print(chunk.content, end="", flush=True)
See Also¶
- Conversation Management Guide - Detailed guide
- FSM-Based Flows - Workflow orchestration
- Performance Guide - RAG caching details
- Examples - Flow examples