Middleware Guide¶
Middleware components for request/response lifecycle processing in DataKnobs Bots.
Overview¶
Middleware provides hooks into the bot request/response lifecycle, enabling:
- Logging: Track all interactions for debugging and analytics
- Cost Tracking: Monitor LLM usage and costs per turn
- Tool Observability: React to tool executions (audit, cost, analytics)
- Dependency Injection: Inject per-request resources (DB sessions, auth tokens)
- Message Transforms: Strip PII, sanitize input, detect attacks
- Metrics: Export performance data to monitoring systems
- Rate Limiting: Control request rates
- Authentication: Validate requests before processing
Lifecycle Hooks¶
The middleware pipeline uses a unified lifecycle based on `TurnState`. All turn
types (`chat()`, `stream_chat()`, `greet()`) flow through the same hooks:
```
User Message (or greet)
         │
         ▼
┌─────────────────────────┐
│ on_turn_start(turn)     │ ← Pre-processing, plugin_data, message transforms
└─────────────────────────┘
         │
         ▼
┌─────────────────────────┐
│ Bot Processing          │ ← Generation + tool execution loop
│ (tool_calls? →          │
│  _execute_tools →       │
│  re-generate)           │
└─────────────────────────┘
         │
   ▼ (success)                 ▼ (error)
┌─────────────────────────┐  ┌─────────────────────────┐
│ on_tool_executed(exec)  │  │ on_error()              │
│ (once per tool call)    │  └─────────────────────────┘
├─────────────────────────┤         │
│ after_turn(turn)        │         ▼
└─────────────────────────┘    Error re-raised
         │
         ▼
Response (or stream complete)
```
Hook failure handling (`on_hook_error`): if any middleware hook itself raises
(e.g., a logging sink is down during `after_turn`), the exception is caught,
logged, and all middleware are notified via
`on_hook_error(hook_name, error, context)`. This is separate from `on_error`,
which fires for request-level failures.
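This catch-and-broadcast behavior can be sketched in pure Python. The `Middleware` stub and `run_after_turn` loop below are illustrative stand-ins, not the library's actual pipeline code; they show how a failing hook is logged and broadcast via `on_hook_error` without failing the request:

```python
import asyncio
import logging

log = logging.getLogger(__name__)

class Middleware:
    """Stand-in for the library base class: every hook is a no-op."""
    async def after_turn(self, turn): pass
    async def on_hook_error(self, hook_name, error, context): pass

async def run_after_turn(middlewares, turn, context):
    """Run after_turn on each middleware; a raising hook is caught,
    logged, and broadcast to every middleware via on_hook_error."""
    for mw in middlewares:
        try:
            await mw.after_turn(turn)
        except Exception as exc:
            log.warning("after_turn failed in %s: %s", type(mw).__name__, exc)
            for other in middlewares:
                await other.on_hook_error("after_turn", exc, context)
```

Because the exception never propagates past the dispatch loop, the response already delivered to the caller is unaffected.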
Error Semantics¶
| Hook | Fires when | Request succeeded? |
|---|---|---|
| `on_error` | Request preparation or generation fails | No |
| `on_hook_error` | A middleware hook itself raises | Yes (response already delivered) |
Middleware Base Class¶
`Middleware` is a concrete class with all hooks as no-ops. Subclasses override
only the hooks they need — no need to implement every method:

```python
from dataknobs_bots.middleware.base import Middleware
from dataknobs_bots.bot.turn import TurnState

class MyCostTracker(Middleware):
    # Only override what you need — everything else is a no-op
    async def after_turn(self, turn: TurnState) -> None:
        if turn.usage:
            await save_usage(turn.usage, turn.context.client_id)
```
Built-in Middleware¶
DataKnobs Bots provides two built-in middleware classes. Both are built on the
unified `TurnState` hooks (`on_turn_start`, `after_turn`).
CostTrackingMiddleware¶
Tracks LLM API costs and token usage across different providers.
Features¶
- Real token usage from provider responses (via `after_turn`)
- Cost calculation with configurable rates
- Statistics by client and provider
- Export to JSON/CSV
Basic Usage¶
```python
from dataknobs_bots.middleware import CostTrackingMiddleware

# Create middleware with default rates
cost_tracker = CostTrackingMiddleware()

# Or with custom rates
cost_tracker = CostTrackingMiddleware(
    cost_rates={
        "openai": {
            "gpt-4o": {"input": 0.0025, "output": 0.01},
            "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        },
        "anthropic": {
            "claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
        },
    }
)
```
Retrieving Statistics¶
```python
# Get stats for a specific client
stats = cost_tracker.get_client_stats("my-client")
print(f"Total cost: ${stats['total_cost_usd']:.4f}")
print(f"Total requests: {stats['total_requests']}")

# Get total cost across all clients
total = cost_tracker.get_total_cost()
print(f"Total cost: ${total:.4f}")

# Get token counts
tokens = cost_tracker.get_total_tokens()
print(f"Input tokens: {tokens['input']}")
print(f"Output tokens: {tokens['output']}")

# Export as JSON
json_data = cost_tracker.export_stats_json()

# Export as CSV
csv_data = cost_tracker.export_stats_csv()
```
Default Cost Rates¶
The middleware ships with default rates (current as of December 2024) for:
| Provider | Model | Input (per 1K) | Output (per 1K) |
|---|---|---|---|
| OpenAI | gpt-4o | $0.0025 | $0.01 |
| OpenAI | gpt-4o-mini | $0.00015 | $0.0006 |
| OpenAI | gpt-4-turbo | $0.01 | $0.03 |
| Anthropic | claude-3-5-sonnet | $0.003 | $0.015 |
| Anthropic | claude-3-5-haiku | $0.0008 | $0.004 |
| Google | gemini-1.5-pro | $0.00125 | $0.005 |
| Google | gemini-2.0-flash | $0.0001 | $0.0004 |
| Ollama | * | $0.00 | $0.00 |
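Rates in the table are per 1K tokens, so a turn's cost is `input_tokens/1000 × input_rate + output_tokens/1000 × output_rate`. A minimal sketch of that calculation (the helper name is illustrative; the rates dict mirrors the `cost_rates` structure shown earlier):

```python
def turn_cost_usd(provider: str, model: str, usage: dict, rates: dict) -> float:
    """Cost of one turn given TurnState-style usage {"input": N, "output": M}."""
    r = rates[provider][model]
    return (
        usage.get("input", 0) / 1000 * r["input"]
        + usage.get("output", 0) / 1000 * r["output"]
    )

RATES = {"openai": {"gpt-4o": {"input": 0.0025, "output": 0.01}}}

# 1,000 input + 500 output tokens on gpt-4o:
cost = turn_cost_usd("openai", "gpt-4o", {"input": 1000, "output": 500}, RATES)
# 1000/1000 * 0.0025 + 500/1000 * 0.01 = 0.0075
```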
LoggingMiddleware¶
Logs all user messages and bot responses for monitoring and debugging.
Features¶
- Structured logging with timestamps
- Configurable log levels
- Optional JSON format for log aggregation
- Full metadata capture
Basic Usage¶
```python
from dataknobs_bots.middleware import LoggingMiddleware

# Basic logging
logger = LoggingMiddleware()

# With JSON format (for log aggregation systems)
logger = LoggingMiddleware(
    log_level="INFO",
    include_metadata=True,
    json_format=True,
)

# Debug logging without metadata
logger = LoggingMiddleware(
    log_level="DEBUG",
    include_metadata=False,
)
```
Log Output Examples¶
Standard Format:

```
INFO:dataknobs_bots.middleware.logging.ConversationLogger:User message: {'timestamp': '2024-12-08T10:30:00+00:00', 'event': 'user_message', 'client_id': 'my-app', 'user_id': 'user-123', 'conversation_id': 'conv-1', 'message_length': 45}
```

JSON Format:

```json
{"timestamp": "2024-12-08T10:30:00+00:00", "event": "user_message", "client_id": "my-app", "user_id": "user-123", "conversation_id": "conv-1", "message_length": 45}
```
Creating Custom Middleware¶
Basic Template — Unified Hooks (Preferred)¶
Override only the hooks you need. All others are no-ops.
```python
import logging

from dataknobs_bots.middleware.base import Middleware
from dataknobs_bots.bot.turn import TurnState, ToolExecution
from dataknobs_bots.bot.context import BotContext

log = logging.getLogger(__name__)

class MyMiddleware(Middleware):
    """Custom middleware — override only what you need."""

    async def on_turn_start(self, turn: TurnState) -> str | None:
        """Pre-processing before LLM generation."""
        turn.plugin_data["request_id"] = generate_id()  # e.g. uuid4().hex
        return None  # or return sanitized_message

    async def after_turn(self, turn: TurnState) -> None:
        """Post-processing after any turn completes."""
        if turn.usage:
            log.info(
                "Turn complete: %s tokens",
                turn.usage.get("input", 0) + turn.usage.get("output", 0),
            )

    async def on_tool_executed(
        self, execution: ToolExecution, context: BotContext
    ) -> None:
        """Called after each tool execution."""
        log.info("Tool %s: %s", execution.tool_name, execution.result)

    async def on_error(
        self, error: Exception, message: str, context: BotContext
    ) -> None:
        """Called when a request-level error occurs."""
        log.error("Request failed: %s", error)
```
Example: Per-Request Dependency Injection¶
Use `on_turn_start` to inject per-request resources and `after_turn` to clean up.
Resources are available to tools via `ToolExecutionContext.extra["turn_data"]`.
```python
class SessionMiddleware(Middleware):
    """Inject a database session for each turn."""

    def __init__(self, db_factory):
        self._db_factory = db_factory

    async def on_turn_start(self, turn: TurnState) -> str | None:
        turn.plugin_data["db.session"] = await self._db_factory()
        return None

    async def after_turn(self, turn: TurnState) -> None:
        session = turn.plugin_data.get("db.session")
        if session:
            await session.close()
```
Tools access the session via the context bridge:
```python
class MyTool(ContextAwareTool):
    async def execute(self, **kwargs):
        session = self.context.extra["turn_data"]["db.session"]
        return await session.execute(...)
```
Example: PII Stripping with Restoration¶
Note: For chat() and greet(), mutations to turn.response_content
in after_turn propagate to the caller. For stream_chat(), chunks were
already yielded before after_turn runs — mutations update
turn.response_content (available for logging/storage) but do not affect
the content the streaming consumer already received.
The transformed message also replaces the original in conversation history and memory. For PII stripping this is typically desired (the original PII is not persisted).
```python
class PIIMiddleware(Middleware):
    async def on_turn_start(self, turn: TurnState) -> str | None:
        stripped, mappings = strip_pii(turn.message)
        turn.plugin_data["pii.mappings"] = mappings
        return stripped  # Transformed message sent to LLM

    async def after_turn(self, turn: TurnState) -> None:
        # For chat/greet this updates the returned response.
        # For streaming, this updates turn.response_content for
        # logging/storage but does not affect already-yielded chunks.
        mappings = turn.plugin_data.get("pii.mappings", {})
        if mappings:
            turn.response_content = restore_pii(
                turn.response_content, mappings
            )
```
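The `strip_pii` / `restore_pii` helpers above are left undefined. A minimal, email-only sketch of what they might look like (real PII detection would cover many more categories, and restoration assumes the model echoes the placeholder tokens back verbatim):

```python
import re

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")

def strip_pii(text: str) -> tuple[str, dict]:
    """Replace each email with a placeholder token; return (stripped, mappings)."""
    mappings: dict[str, str] = {}

    def repl(match: re.Match) -> str:
        token = f"<PII_{len(mappings)}>"
        mappings[token] = match.group(0)
        return token

    return EMAIL_RE.sub(repl, text), mappings

def restore_pii(text: str, mappings: dict) -> str:
    """Swap placeholder tokens back for the original values."""
    for token, original in mappings.items():
        text = text.replace(token, original)
    return text
```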
Example: Rate Limiting Middleware¶
Use `InMemoryRateLimiter` from `dataknobs-common` for the rate limiting backend:
```python
from dataknobs_common.ratelimit import (
    InMemoryRateLimiter, RateLimit, RateLimiterConfig,
)
from dataknobs_common.exceptions import RateLimitError
from dataknobs_bots.middleware.base import Middleware
from dataknobs_bots.bot.turn import TurnState

class RateLimitMiddleware(Middleware):
    """Rate limiting middleware backed by InMemoryRateLimiter."""

    def __init__(self, max_requests: int = 10, window_seconds: int = 60):
        config = RateLimiterConfig(
            default_rates=[RateLimit(limit=max_requests, interval=window_seconds)],
        )
        self._limiter = InMemoryRateLimiter(config)

    async def on_turn_start(self, turn: TurnState) -> str | None:
        client_id = turn.context.client_id
        if not await self._limiter.try_acquire(client_id):
            status = await self._limiter.get_status(client_id)
            raise RateLimitError(
                f"Rate limit exceeded for {client_id}",
                retry_after=status.reset_after,
            )
        return None
```
See the Rate Limiting guide for the full InMemoryRateLimiter API, including per-category rates, weighted operations, and distributed backends.
TurnState Reference¶
`TurnState` (`dataknobs_bots.bot.turn`) is the per-turn state carrier. It is
created at the start of each `chat()`, `stream_chat()`, or `greet()` call and
is available to middleware via `on_turn_start` and `after_turn`.
Key Fields¶
| Field | Type | Description |
|---|---|---|
| `mode` | `TurnMode` | How the turn was initiated: `CHAT`, `STREAM`, `GREET` |
| `message` | `str` | User message (`""` for greet) |
| `context` | `BotContext` | Bot context (client_id, conversation_id, user_id, etc.) |
| `response_content` | `str` | Final response text (populated after generation) |
| `usage` | `dict[str, int] \| None` | Token usage: `{"input": N, "output": M}` |
| `model` | `str \| None` | Model that generated the response |
| `provider_name` | `str \| None` | Provider name (e.g., `"OpenAIProvider"`) |
| `tool_executions` | `list[ToolExecution]` | Tool executions recorded during the turn |
| `plugin_data` | `dict[str, Any]` | Cross-middleware communication dict |
ToolExecution¶
`ToolExecution` (`dataknobs_bots.bot.turn`) records a single tool execution:
| Field | Type | Description |
|---|---|---|
| `tool_name` | `str` | Name of the tool |
| `parameters` | `dict[str, Any]` | Parameters passed to the tool |
| `result` | `Any` | Tool return value (if successful) |
| `error` | `str \| None` | Error message (if failed) |
| `duration_ms` | `float \| None` | Execution time in milliseconds |
plugin_data¶
`plugin_data` is a per-turn writable dict that bridges across the entire pipeline:

```
on_turn_start(turn)          ← write plugin_data
  → ConversationMiddleware   ← reads/writes via state.turn_data (same dict)
  → Tool execution           ← reads via context.extra["turn_data"]
  → after_turn(turn)         ← reads final plugin_data
```

Namespace by convention with dotted keys: `"pii.mappings"`, `"session.db"`,
`"auth.claims"`.
Middleware Interface¶
All hooks are concrete no-ops on the Middleware base class. Override only
what you need.
Preferred Hooks¶
| Hook | Signature | When |
|---|---|---|
| `on_turn_start` | `(turn: TurnState) -> str \| None` | Before processing; can transform message and write plugin_data |
| `after_turn` | `(turn: TurnState) -> None` | After any turn completes (chat, stream, greet) |
| `on_tool_executed` | `(execution: ToolExecution, context: BotContext) -> None` | After each tool call (post-turn, not real-time) |
Error Hooks¶
| Hook | Signature | When |
|---|---|---|
| `on_error` | `(error: Exception, message: str, context: BotContext) -> None` | Request failed |
| `on_hook_error` | `(hook_name: str, error: Exception, context: BotContext) -> None` | A middleware hook itself failed |
on_tool_executed Timing¶
`on_tool_executed` fires post-turn during `_finalize_turn()`, not in real-time
as tools execute. This hook is for auditing and logging, not for aborting or
rate-limiting mid-turn. Tool executions are also available as
`turn.tool_executions` in the `after_turn` hook for batch processing.
Legacy Hooks¶
The following hooks are kept for backward compatibility but are deprecated. Existing middleware using these hooks will continue to work. Migrate to the unified hooks at your convenience.
| Legacy Hook | Replacement | Notes |
|---|---|---|
| `before_message(message, context)` | `on_turn_start(turn)` | `on_turn_start` provides full `TurnState` + plugin_data + message transforms |
| `after_message(response, context, **kwargs)` | `after_turn(turn)` | `after_turn` fires for all turn types with real usage data |
| `post_stream(message, response, context)` | `after_turn(turn)` | `after_turn` eliminates the chat-vs-stream split |
Both legacy and unified hooks fire on every turn — you can migrate incrementally.
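As a sketch of the mapping, here is the same audit logic in both styles. The `Middleware` stub and the `audit` sink are hypothetical stand-ins for illustration only:

```python
import asyncio
from types import SimpleNamespace

audit: list = []  # hypothetical audit sink

class Middleware:  # stand-in for dataknobs_bots.middleware.base.Middleware
    pass

class LegacyAudit(Middleware):
    """Deprecated style: pre-chat hook receiving (message, context)."""
    async def before_message(self, message, context):
        audit.append((context.client_id, message))

class UnifiedAudit(Middleware):
    """Preferred style: one hook for chat, stream, and greet, with TurnState."""
    async def on_turn_start(self, turn):
        audit.append((turn.context.client_id, turn.message))
        return None  # no message transform
```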
Configuration¶
YAML Configuration¶
```yaml
middleware:
  # Built-in cost tracking
  - class: dataknobs_bots.middleware.CostTrackingMiddleware
    params:
      track_tokens: true
      cost_rates:
        openai:
          gpt-4o:
            input: 0.0025
            output: 0.01

  # Built-in logging
  - class: dataknobs_bots.middleware.LoggingMiddleware
    params:
      log_level: INFO
      include_metadata: true
      json_format: false

  # Custom middleware
  - class: my_app.middleware.RateLimitMiddleware
    params:
      max_requests: 100
      window_seconds: 60
```
Programmatic Middleware via from_config()¶
Use the `middleware=` keyword argument to inject middleware programmatically,
bypassing config-driven middleware construction:
```python
from dataknobs_bots import DynaBot
from dataknobs_bots.middleware import CostTrackingMiddleware, LoggingMiddleware

cost_tracker = CostTrackingMiddleware()
logger_mw = LoggingMiddleware(json_format=True)

bot = await DynaBot.from_config(
    config,
    middleware=[cost_tracker, logger_mw],  # Overrides config middleware
)
```
When `middleware=` is passed, it completely replaces any middleware defined in the
config dict. Pass `middleware=[]` to explicitly disable all middleware.
Best Practices¶
- **Use Unified Hooks**: Prefer `on_turn_start` and `after_turn` over legacy hooks. They provide the full `TurnState` and work uniformly across all turn types.
- **Order Matters**: Middleware executes in list order. `on_turn_start` message transforms chain: each middleware receives the message as modified by the previous one.
- **Namespace plugin_data**: Use dotted keys (`"pii.mappings"`, `"session.db"`) to avoid collisions between middleware.
- **Error Handling**: Implement `on_error` for request failures and `on_hook_error` for middleware infrastructure failures.
- **Performance**: Keep middleware lightweight. Offload heavy processing to background tasks.
- **Testing**: Use `BotTestHarness` with `middleware=[...]` to test middleware in integration.
Related Documentation¶
- Configuration Reference - Full configuration options
- User Guide - Getting started tutorials
- Bot Manager Guide - Multi-tenant deployment