Advanced Usage¶
This guide covers advanced features and patterns for power users, including the heavier packages for AI, workflows, and data processing.
Configuration Management¶
Environment-Aware Configuration¶
from dataknobs_config import Config
from dataknobs_data import database_factory
# config.yaml with environment variables:
# databases:
#   primary:
#     backend: ${DB_BACKEND:memory}
#     host: ${DB_HOST:localhost}
#     port: ${DB_PORT:5432}
config = Config("config.yaml")
config.register_factory("database", database_factory)
# Backend chosen based on environment
db = config.get_instance("databases", "primary")
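The default after the colon applies when a variable is unset, so the same file runs in-memory locally and against Postgres in production. A quick sketch, assuming the `${VAR:default}` placeholders are resolved when the file is loaded:

import os
from dataknobs_config import Config
from dataknobs_data import database_factory

# Select the Postgres backend without editing config.yaml
os.environ["DB_BACKEND"] = "postgres"
os.environ["DB_HOST"] = "db.internal"

config = Config("config.yaml")
config.register_factory("database", database_factory)
db = config.get_instance("databases", "primary")  # now Postgres-backed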
Factory Pattern for Dynamic Objects¶
from dataknobs_config import Config

# FastProcessor and AccurateProcessor are your own classes
def custom_processor_factory(config_dict):
    processor_type = config_dict.pop("type", None)  # pop so it isn't passed as a kwarg
    if processor_type == "fast":
        return FastProcessor(**config_dict)
    elif processor_type == "accurate":
        return AccurateProcessor(**config_dict)
    raise ValueError(f"Unknown processor type: {processor_type}")

config = Config({"processors": {"main": {"type": "fast"}}})
config.register_factory("processor", custom_processor_factory)
processor = config.get_instance("processors", "main")
Data Abstraction¶
Multi-Backend Applications¶
from dataknobs_data import database_factory, Record, Query
from dataknobs_config import Config
config = Config({
    "databases": {
        "cache": {"backend": "memory"},
        "storage": {"backend": "postgres", "connection": "..."},
        "search": {"backend": "elasticsearch", "host": "..."}
    }
})
config.register_factory("database", database_factory)
# Use different backends for different purposes
cache = config.get_instance("databases", "cache")
storage = config.get_instance("databases", "storage")
search = config.get_instance("databases", "search")
# Same API across all backends
record = Record({"id": "123", "content": "data"})
cache.create(record)
storage.create(record)
search.create(record)
# Query with same interface
results = search.search(Query().filter("content", "contains", "data"))
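Because every backend shares the same API, cross-backend patterns stay short. A minimal cache-aside sketch, assuming the database exposes a read(id) method that returns None on a miss (check the dataknobs_data API reference for the exact read call):

def fetch(record_id):
    # Serve from the fast in-memory cache when possible
    record = cache.read(record_id)
    if record is None:
        # Fall back to durable storage, then warm the cache
        record = storage.read(record_id)
        if record is not None:
            cache.create(record)
    return record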
Async High-Performance Operations¶
from dataknobs_data import async_database_factory, Record, Query

async def process_large_dataset():
    # Create async database using factory
    db = async_database_factory.create({
        "backend": "postgres",
        "connection": "postgresql://...",
        "pool_size": 20
    })
    # Batch create with pooling
    records = [Record({"id": i, "data": f"item{i}"}) for i in range(10000)]
    await db.bulk_create(records, batch_size=100)
    # Async iteration over large result sets
    async for record in db.stream(Query()):
        await process_record(record)
    await db.close()
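process_record is whatever per-record work your application needs; a hypothetical stand-in, plus the entry point to run the coroutine:

import asyncio

async def process_record(record):
    # Hypothetical handler: validate, transform, or forward the record
    print(record)

asyncio.run(process_large_dataset())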
Workflow Orchestration with FSM¶
Complex Multi-Stage Pipelines¶
from dataknobs_fsm import SimpleFSM, DataHandlingMode
config = {
    "name": "etl_pipeline",
    "states": [
        {"name": "extract", "is_start": True},
        {"name": "validate"},
        {"name": "transform"},
        {"name": "enrich"},
        {"name": "load", "is_end": True},
        {"name": "error"}
    ],
    "arcs": [
        {
            "from": "extract",
            "to": "validate",
            "transform": {
                "type": "builtin",
                "name": "extract_from_api",
                "params": {"url": "https://api.example.com/data"}
            }
        },
        {
            "from": "validate",
            "to": "transform",
            "pre_test": {
                "type": "inline",
                "code": "lambda data, ctx: data.get('valid', False)"
            }
        },
        {
            "from": "validate",
            "to": "error",
            "pre_test": {
                "type": "inline",
                "code": "lambda data, ctx: not data.get('valid', False)"
            }
        },
        {"from": "transform", "to": "enrich"},
        {"from": "enrich", "to": "load"}
    ]
}
fsm = SimpleFSM(config, data_mode=DataHandlingMode.COPY)
result = fsm.process({"source": "api"})
FSM with Resource Management¶
from dataknobs_fsm import SimpleFSM
from dataknobs_data import database_factory, Query  # Query is used by the inline transform

# Create database using factory for FSM context
db = database_factory.create({
    "backend": "postgres",
    "connection": "postgresql://..."
})

config = {
    "name": "db_processor",
    "states": [
        {"name": "load", "is_start": True},
        {"name": "process", "is_end": True}
    ],
    "arcs": [
        {
            "from": "load",
            "to": "process",
            "transform": {
                "type": "inline",
                "code": "lambda data, ctx: ctx['database'].search(Query())"
            }
        }
    ]
}
fsm = SimpleFSM(config)
fsm.context["database"] = db
result = fsm.process(data)  # data: the input payload for this run
LLM Integration¶
Prompt Template Management¶
from dataknobs_llm import create_llm_provider, MessageTemplate, MessageBuilder, LLMMessage

# Create message templates for reusable prompts; keep older versions
# side by side while iterating on wording
summarize_template_v1 = MessageTemplate(
    "Summarize the following in {max_words} words:\n\n{text}"
)
summarize_template_v2 = MessageTemplate(
    "Provide a {max_words}-word summary of:\n{text}\n\nFocus on key points."
)

# Use LLM with templates
llm = create_llm_provider({"provider": "openai", "model": "gpt-4"})

# Build messages from template
builder = MessageBuilder()
builder.add_user_message(summarize_template_v2.format(
    text="Long article content...",
    max_words=50
))

# Inside an async function (llm.generate is a coroutine)
response = await llm.generate(builder.messages)
Tool Calling with LLMs¶
from dataknobs_llm import create_llm_provider, Tool, ToolRegistry, LLMMessage
from typing import Dict, Any

# Define custom tools by subclassing Tool
class DatabaseSearchTool(Tool):
    def __init__(self):
        super().__init__(
            name="search_database",
            description="Search the database for relevant information"
        )

    @property
    def schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"}
            },
            "required": ["query"]
        }

    async def execute(self, query: str) -> list:
        results = []  # Implementation: run the search and collect matches
        return results

class CalculatorTool(Tool):
    def __init__(self):
        super().__init__(
            name="calculate",
            description="Evaluate a mathematical expression"
        )

    @property
    def schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "Math expression"}
            },
            "required": ["expression"]
        }

    async def execute(self, expression: str) -> float:
        result = 0.0  # Implementation: safely evaluate the expression
        return result

# Register tools
registry = ToolRegistry()
registry.register(DatabaseSearchTool())
registry.register(CalculatorTool())

# Use LLM with tools
llm = create_llm_provider({
    "provider": "openai",
    "model": "gpt-4"
})

messages = [LLMMessage(
    role="user",
    content="What's 15% of the revenue from last quarter?"
)]

# Inside an async function
response = await llm.generate(messages, tools=registry.get_all())
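The shape of a tool-calling response depends on the provider adapter. A hedged dispatch sketch, assuming the response carries a list of tool calls with name and arguments attributes, and that the registry can look tools up by name (verify both against the dataknobs_llm API reference):

# Hypothetical attribute names; adjust to the real response object
for call in getattr(response, "tool_calls", None) or []:
    tool = registry.get(call.name)                 # assumed lookup method
    output = await tool.execute(**call.arguments)  # run with the model's args
    messages.append(LLMMessage(role="tool", content=str(output)))

# Let the model finish the answer with the tool results in context
final = await llm.generate(messages)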
AI Agents and Chatbots¶
Multi-Tenant Bot System¶
import asyncio
from dataknobs_bots import DynaBot, BotContext
from dataknobs_data import database_factory

async def main():
    # App-level persistent storage via the factory (the bots below
    # configure their own conversation storage)
    db = database_factory.create({
        "backend": "postgres",
        "connection": "postgresql://..."
    })

    # Create bots for different tenants
    support_bot = await DynaBot.from_config({
        "llm": {"provider": "openai", "model": "gpt-4"},
        "conversation_storage": {"backend": "postgres", "connection": "postgresql://..."},
        "memory": {"type": "buffer", "max_messages": 20},
        "system_prompt": "You are a helpful support agent."
    })
    sales_bot = await DynaBot.from_config({
        "llm": {"provider": "anthropic", "model": "claude-3-5-sonnet-20241022"},
        "conversation_storage": {"backend": "postgres", "connection": "postgresql://..."},
        "memory": {"type": "buffer", "max_messages": 20},
        "system_prompt": "You are a sales assistant."
    })

    # Use bots with context isolation
    support_context = BotContext(
        conversation_id="support-001",
        client_id="tenant1",
        user_id="user1"
    )
    support_response = await support_bot.chat("Help me reset password", support_context)

    sales_context = BotContext(
        conversation_id="sales-001",
        client_id="tenant2",
        user_id="user2"
    )
    sales_response = await sales_bot.chat("Tell me about pricing", sales_context)

asyncio.run(main())
RAG-Enabled Chatbot¶
import asyncio
from dataknobs_bots import DynaBot, BotContext
from dataknobs_data import database_factory

async def main():
    # Knowledge base using factory
    knowledge_base = database_factory.create({
        "backend": "elasticsearch",
        "host": "localhost:9200",
        "index": "documentation"
    })

    bot_config = {
        "llm": {"provider": "openai", "model": "gpt-4"},
        "conversation_storage": {"backend": "memory"},
        "memory": {"type": "buffer", "max_messages": 10},
        "rag": {
            "enabled": True,
            "knowledge_base": knowledge_base,
            "top_k": 5,
            "score_threshold": 0.7
        },
        "system_prompt": "Answer questions using the provided documentation."
    }
    bot = await DynaBot.from_config(bot_config)

    # Bot retrieves relevant docs before answering
    context = BotContext(
        conversation_id="docs-001",
        client_id="my-app",
        user_id="user1"
    )
    response = await bot.chat("How do I configure the database?", context)
    print(response)

asyncio.run(main())
Advanced Tree Operations¶
Tree Merging and Splitting¶
from dataknobs_structures import Tree, build_tree_from_string
# Merge multiple trees
tree1 = build_tree_from_string("root -> a, b")
tree2 = build_tree_from_string("root -> c, d")
merged = Tree.merge(tree1, tree2)
# Result: root -> a, b, c, d
# Split tree at a node
subtree = tree1.extract_subtree("a")
Tree Serialization¶
import json
from dataknobs_structures import Tree, build_tree_from_string

# Serialize tree to JSON
tree = build_tree_from_string("root -> child1, child2")
tree_json = tree.to_json()

# Deserialize from JSON
restored_tree = Tree.from_json(tree_json)

# Custom serialization format
def custom_serializer(node):
    return {
        "id": node.id,
        "value": node.value,
        "children": [custom_serializer(c) for c in node.children]
    }

serialized = custom_serializer(tree.root)
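The inverse walk is symmetric. A hypothetical deserializer for that format (node construction depends on the actual Tree API, so treat this as a shape, not a recipe):

def custom_deserializer(data, parent=None):
    # Hypothetical: create the node under its parent, then recurse
    node = Tree(data["value"]) if parent is None else parent.add_child(data["value"])
    for child in data.get("children", []):
        custom_deserializer(child, node)
    return node

root = custom_deserializer(serialized)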
Advanced Text Processing¶
Custom Tokenizers¶
from dataknobs_xization import masking_tokenizer

class CustomTokenizer(masking_tokenizer.MaskingTokenizer):
    def __init__(self):
        super().__init__()
        self.add_pattern(r'\b[A-Z]{2,}\b', 'ACRONYM')
        self.add_pattern(r'\$\d+\.\d{2}', 'CURRENCY')

    def mask_token(self, token, token_type):
        if token_type == 'ACRONYM':
            return '[ACRONYM]'
        elif token_type == 'CURRENCY':
            return '[MONEY]'
        return super().mask_token(token, token_type)

tokenizer = CustomTokenizer()
text = "IBM costs $150.99 per share"
tokens = tokenizer.tokenize(text)
# Output: ["[ACRONYM]", "costs", "[MONEY]", "per", "share"]
Text Annotation Pipeline¶
from dataknobs_xization import annotations
from dataknobs_structures import Text, TextMetaData

class AnnotationPipeline:
    def __init__(self):
        self.annotators = []

    def add_annotator(self, annotator):
        self.annotators.append(annotator)

    def process(self, text):
        metadata = TextMetaData()
        doc = Text(text, metadata)
        for annotator in self.annotators:
            doc = annotator.annotate(doc)
        return doc

# Create pipeline (NamedEntityAnnotator, SentimentAnnotator, and
# LanguageDetector are your own annotator classes; see the sketch below)
pipeline = AnnotationPipeline()
pipeline.add_annotator(NamedEntityAnnotator())
pipeline.add_annotator(SentimentAnnotator())
pipeline.add_annotator(LanguageDetector())

# Process text
result = pipeline.process("John Smith loves Python programming.")
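The pipeline assumes nothing beyond an annotate(doc) method that returns the (possibly enriched) document. A minimal hypothetical annotator; how annotations are attached depends on the Text/annotations API:

class LanguageDetector:
    def annotate(self, doc):
        # Hypothetical: detect the language and record it on the document;
        # swap in the real annotation mechanism from dataknobs_xization
        doc.language = "en"
        return doc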
Advanced Elasticsearch Integration¶
Bulk Operations¶
from dataknobs_utils import elasticsearch_utils

class BulkIndexer:
    def __init__(self, es_client, index_name):
        self.es = es_client
        self.index = index_name
        self.buffer = []
        self.buffer_size = 1000

    def add(self, doc):
        self.buffer.append(doc)
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        actions = [{"_index": self.index, "_source": doc} for doc in self.buffer]
        elasticsearch_utils.bulk_index(self.es, actions)
        self.buffer.clear()

# Usage (es_client is an existing Elasticsearch client)
indexer = BulkIndexer(es_client, "my_index")
for i in range(10000):
    indexer.add({"id": i, "data": f"Document {i}"})
indexer.flush()  # flush the remainder below the buffer threshold
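To guarantee the final flush even if the loop raises, the indexer can be wrapped as a context manager:

class ManagedBulkIndexer(BulkIndexer):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.flush()   # flush whatever is left in the buffer
        return False   # never suppress exceptions

with ManagedBulkIndexer(es_client, "my_index") as indexer:
    for i in range(10000):
        indexer.add({"id": i, "data": f"Document {i}"})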
Custom Query Builders¶
from dataknobs_utils import elasticsearch_utils

class QueryBuilder:
    def __init__(self):
        self.query = {"bool": {}}

    def must(self, clause):
        self.query["bool"].setdefault("must", []).append(clause)
        return self

    def should(self, clause):
        self.query["bool"].setdefault("should", []).append(clause)
        return self

    def filter(self, clause):
        self.query["bool"].setdefault("filter", []).append(clause)
        return self

    def build(self):
        return {"query": self.query}

# Build complex query
query = (QueryBuilder()
    .must({"match": {"title": "python"}})
    .filter({"range": {"date": {"gte": "2024-01-01"}}})
    .should({"match": {"tags": "tutorial"}})
    .build())

results = es_client.search(index="docs", body=query)  # es_client: existing Elasticsearch client
Performance Optimization¶
Caching Strategies¶
from functools import lru_cache
from dataknobs_structures import Tree

class CachedTreeProcessor:
    @lru_cache(maxsize=1000)
    def process_node(self, node_id, operation):
        # Expensive operation, cached per (node_id, operation).
        # Note: lru_cache on a method also keys on self, so share one
        # processor instance to get cache hits.
        return self._compute(node_id, operation)  # _compute: your analysis routine

    def process_tree(self, tree):
        results = []
        for node in tree.traverse():
            result = self.process_node(node.id, "analyze")
            results.append(result)
        return results
Parallel Processing¶
from concurrent.futures import ProcessPoolExecutor
from dataknobs_xization import basic_normalization_fn

def process_batch(texts):
    return [basic_normalization_fn(text) for text in texts]

def parallel_normalize(all_texts, workers=4):
    batch_size = max(1, len(all_texts) // workers)  # guard against empty batches
    batches = [all_texts[i:i + batch_size]
               for i in range(0, len(all_texts), batch_size)]
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = executor.map(process_batch, batches)
        return [item for batch in results for item in batch]

# Process large dataset (the __main__ guard is required where workers
# are spawned, e.g. on Windows and macOS)
if __name__ == "__main__":
    texts = [f"Text {i}" for i in range(10000)]
    normalized = parallel_normalize(texts)
Custom Extensions¶
Plugin System¶
from abc import ABC, abstractmethod

class DataknobsPlugin(ABC):
    @abstractmethod
    def initialize(self, config):
        pass

    @abstractmethod
    def process(self, data):
        pass

    @abstractmethod
    def cleanup(self):
        pass

class PluginManager:
    def __init__(self):
        self.plugins = {}

    def register(self, name, plugin_class):
        self.plugins[name] = plugin_class

    def load(self, name, config=None):
        if name not in self.plugins:
            raise ValueError(f"Plugin {name} not found")
        plugin = self.plugins[name]()
        plugin.initialize(config or {})
        return plugin

# Create custom plugin
class SentimentPlugin(DataknobsPlugin):
    def initialize(self, config):
        self.model = config.get("model", "default")

    def process(self, data):
        # Sentiment analysis logic
        return {"sentiment": "positive", "score": 0.8}

    def cleanup(self):
        pass

# Use plugin system
manager = PluginManager()
manager.register("sentiment", SentimentPlugin)
plugin = manager.load("sentiment", {"model": "advanced"})
result = plugin.process("I love this product!")
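cleanup() is part of the plugin contract but easy to forget; a try/finally keeps it honest:

plugin = manager.load("sentiment", {"model": "advanced"})
try:
    result = plugin.process("I love this product!")
finally:
    plugin.cleanup()  # always release the plugin's resources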
Integration Patterns¶
Service Integration¶
from dataknobs_utils import json_utils
import requests

class DataknobsService:
    def __init__(self, base_url):
        self.base_url = base_url

    def process_document(self, doc):
        # Send to external service
        response = requests.post(
            f"{self.base_url}/process",
            json={"document": doc},
            timeout=30
        )
        response.raise_for_status()

        # Parse response and extract fields using json_utils
        result = response.json()
        entities = json_utils.get_value(result, "analysis.entities", [])
        sentiment = json_utils.get_value(result, "analysis.sentiment")
        return {
            "entities": entities,
            "sentiment": sentiment
        }
Next Steps¶
- Review Best Practices for production use
- Explore the API Reference for complete details
- See Examples for real-world implementations
- Read about Performance Tuning