# FSM Integration Patterns
The FSM package includes pre-built patterns for common integration scenarios. These patterns provide tested, production-ready solutions that you can use directly or customize for your needs.
**Note:** Pattern classes must be imported directly from their respective modules, as they are not exported at the package level.
## Available Patterns
### 1. Database ETL Pattern

Database-focused Extract, Transform, and Load workflows for data processing pipelines.

**Class:** `DatabaseETL`

**Import:** `from dataknobs_fsm.patterns.etl import DatabaseETL, ETLConfig, ETLMode`

**Use Cases:**

- Database migrations
- Data warehouse loading
- Database synchronization
- Incremental data updates

**Features:**
- Multiple ETL modes: FULL_REFRESH, INCREMENTAL, UPSERT, APPEND
- Checkpoint support for resumable operations
- Configurable batch sizes and parallelism
- Built-in transformations and field mappings
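The checkpoint idea behind resumable operations can be illustrated with a plain-Python sketch. The `run_batches` helper and `checkpoint` dict below are purely illustrative, not part of the package's API:

```python
def run_batches(rows, batch_size, checkpoint, process):
    """Process `rows` in batches, skipping batches already recorded in `checkpoint`.

    After each batch, the next batch index is persisted so an interrupted run
    can resume where it left off instead of starting over.
    """
    start = checkpoint.get("next_batch", 0)
    processed = 0
    for i in range(start * batch_size, len(rows), batch_size):
        batch = rows[i:i + batch_size]
        process(batch)
        processed += len(batch)
        checkpoint["next_batch"] = i // batch_size + 1  # persist progress here
    return processed

# Resuming: pretend batch 0 was already processed in a previous run
ckpt = {"next_batch": 1}
done = []
count = run_batches(list(range(10)), batch_size=4, checkpoint=ckpt, process=done.extend)
```

In the real pattern the checkpoint would be persisted to durable storage between batches; the in-memory dict keeps the sketch self-contained.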
### 2. File Processing Pattern

Process files of various formats with streaming support.

**Class:** `FileProcessor`

**Import:** `from dataknobs_fsm.patterns.file_processing import FileProcessor, FileProcessingConfig`

**Use Cases:**

- CSV data processing
- JSON stream processing
- Log file analysis
- Batch file operations

**Features:**
- Format support: CSV, JSON, XML, Parquet, TXT/LOG
- Processing modes: STREAM, BATCH, WHOLE
- Automatic format detection
- Configurable transformations and filters
- Memory-efficient streaming for large files
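Memory-efficient streaming boils down to handling one record at a time rather than loading the whole file. A stdlib-only sketch of the idea (independent of `FileProcessor`, which wraps this in an FSM):

```python
import csv
import io

def stream_csv(fileobj, transform, predicate=lambda row: True):
    """Yield transformed rows one at a time; memory use stays flat
    no matter how large the input file is."""
    for row in csv.DictReader(fileobj):
        if predicate(row):
            yield transform(row)

# Works the same on an open file handle; StringIO keeps the demo self-contained.
data = io.StringIO("name,active\nada,true\nbob,false\n")
rows = list(stream_csv(
    data,
    transform=lambda r: {**r, "processed": True},
    predicate=lambda r: r["active"] == "true",
))
```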
### 3. API Orchestration Pattern

Coordinate multiple API calls with rate limiting, retries, and flexible execution modes.

**Class:** `APIOrchestrator`

**Import:** `from dataknobs_fsm.patterns.api_orchestration import APIOrchestrator, APIOrchestrationConfig`

**Use Cases:**

- REST API orchestration
- GraphQL query coordination
- Microservice workflows
- Multi-API data aggregation

**Features:**
- Orchestration modes: SEQUENTIAL, PARALLEL, FANOUT, PIPELINE, CONDITIONAL, HYBRID
- Built-in rate limiting and throttling
- Automatic retry with backoff
- Request/response transformation
- Authentication handling
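Rate limiting of the kind listed above is commonly implemented as a token bucket. The sketch below is illustrative only, not the package's actual implementation:

```python
import time

class TokenBucket:
    """Token-bucket rate limiter: refills `rate` tokens per second and
    allows bursts of up to `capacity` requests."""

    def __init__(self, rate: float, capacity: float, now=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self._now = now  # injectable clock, handy for testing
        self._last = self._now()

    def allow(self) -> bool:
        # Refill based on elapsed time, then spend one token if available.
        t = self._now()
        self.tokens = min(self.capacity, self.tokens + (t - self._last) * self.rate)
        self._last = t
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

limiter = TokenBucket(rate=100, capacity=100)  # roughly 100 requests/second
```

A caller would check `limiter.allow()` before each request and wait (or queue) when it returns `False`.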
### 4. LLM Workflow Pattern

**Note:** LLM functionality has moved to the dataknobs-llm package.

**Use Cases:**

- Conversation management with FSM-based flows
- Multi-turn conversations with branching
- RAG (Retrieval Augmented Generation)
- Prompt versioning and A/B testing

**Features:**

- FSM-based conversation flows
- Multi-provider LLM support (OpenAI, Anthropic)
- Prompt templating with Jinja2
- RAG integration and caching
- Conversation trees and branching

See the dataknobs-llm migration guide and package documentation for details.
### 5. Error Recovery Pattern

Implement robust error handling and recovery strategies.

**Class:** `ErrorRecoveryWorkflow`

**Import:** `from dataknobs_fsm.patterns.error_recovery import ErrorRecoveryWorkflow, ErrorRecoveryConfig`

**Use Cases:**

- Fault-tolerant systems
- Resilient API calls
- Critical workflows
- High-availability services

**Features:**
- Recovery strategies: RETRY, CIRCUIT_BREAKER, FALLBACK, COMPENSATE, DEADLINE, BULKHEAD, CACHE
- Backoff strategies: FIXED, LINEAR, EXPONENTIAL, RANDOM
- Metrics tracking and monitoring
- Configurable failure thresholds
- Compensation and rollback support
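The four backoff strategies map to simple delay formulas. This sketch shows how the wait before a given retry attempt is typically computed; it is illustrative, not the package's code:

```python
import random

def compute_delay(strategy: str, attempt: int,
                  initial: float = 1.0, max_delay: float = 60.0) -> float:
    """Delay in seconds before retry `attempt` (1-based) under each strategy."""
    if strategy == "fixed":
        delay = initial                          # same wait every time
    elif strategy == "linear":
        delay = initial * attempt                # grows by `initial` each attempt
    elif strategy == "exponential":
        delay = initial * (2 ** (attempt - 1))   # doubles each attempt
    elif strategy == "random":
        delay = random.uniform(0, initial * attempt)  # jitter to avoid thundering herds
    else:
        raise ValueError(f"unknown strategy: {strategy!r}")
    return min(delay, max_delay)  # always cap the wait
```

With `initial=1.0`, exponential backoff yields 1s, 2s, 4s, 8s, ... until it hits `max_delay`.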
## Quick Start

Each pattern can be used in three ways:

### 1. Direct Usage with Factory Functions
```python
import asyncio
from datetime import datetime

from dataknobs_fsm.patterns.etl import create_etl_pipeline, ETLMode

# Create an ETL pipeline using the factory function
etl = create_etl_pipeline(
    source={
        "type": "database",
        "provider": "postgresql",
        "connection": "postgresql://source_db"
    },
    target={
        "type": "database",
        "provider": "postgresql",
        "connection": "postgresql://target_db"
    },
    mode=ETLMode.INCREMENTAL,
    transformations=[
        lambda row: {**row, "processed_at": datetime.now()}
    ]
)

# Execute asynchronously
result = asyncio.run(etl.run())
```
### 2. Direct Class Instantiation
```python
import asyncio

from dataknobs_fsm.patterns.file_processing import (
    FileProcessor, FileProcessingConfig, ProcessingMode
)

# Create the configuration
config = FileProcessingConfig(
    input_path="data.csv",
    output_path="processed.json",
    mode=ProcessingMode.STREAM,
    transformations=[
        lambda record: {**record, "processed": True}
    ]
)

# Create and execute the processor
processor = FileProcessor(config)
result = asyncio.run(processor.process())
```
### 3. Using Multiple Factory Functions
```python
# File processing
from dataknobs_fsm.patterns.file_processing import create_csv_processor

csv_processor = create_csv_processor(
    input_file="data.csv",
    output_file="output.json",
    transformations=[lambda row: {**row, "status": "processed"}],
    filters=[lambda row: row.get("active") is True]
)

# API orchestration
from dataknobs_fsm.patterns.api_orchestration import (
    create_rest_api_orchestrator, OrchestrationMode
)

api_orchestrator = create_rest_api_orchestrator(
    base_url="https://api.example.com",
    endpoints=[
        {"name": "users", "path": "/users", "method": "GET"},
        {"name": "posts", "path": "/posts", "method": "GET"}
    ],
    auth_token="your-api-token",
    rate_limit=100,  # requests per second
    mode=OrchestrationMode.PARALLEL
)

# LLM workflow - now in the dataknobs-llm package
from dataknobs_llm import create_llm_provider, LLMConfig
from dataknobs_llm.conversations import ConversationManager

config = LLMConfig(provider="openai", model="gpt-4", temperature=0.7)
llm = create_llm_provider(config)
# See the LLM package docs for complete examples

# Error recovery
from dataknobs_fsm.patterns.error_recovery import (
    create_retry_workflow, BackoffStrategy
)

retry_workflow = create_retry_workflow(
    max_attempts=3,
    backoff_strategy=BackoffStrategy.EXPONENTIAL,
    initial_delay=1.0,
    max_delay=60.0
)
```
## Pattern Composition
Patterns can be combined using async orchestration:
```python
import asyncio

from dataknobs_fsm.patterns.file_processing import create_csv_processor
from dataknobs_fsm.patterns.etl import create_etl_pipeline, ETLMode
from dataknobs_fsm.patterns.error_recovery import create_retry_workflow

async def complex_workflow():
    """Compose multiple patterns in a workflow."""
    # Step 1: Process a CSV file with retry logic
    csv_processor = create_csv_processor(
        input_file="raw_data.csv",
        output_file="processed.json",
        transformations=[lambda row: {**row, "validated": True}]
    )
    retry_wrapper = create_retry_workflow(
        max_attempts=3,
        backoff_strategy="exponential"
    )

    # Execute with retry
    file_result = await retry_wrapper.execute(csv_processor.process)

    # Step 2: Load the result into the database
    etl = create_etl_pipeline(
        source={"type": "file", "path": "processed.json"},
        target={"type": "database", "connection": "postgresql://db"},
        mode=ETLMode.UPSERT
    )
    etl_result = await etl.run()

    return {
        "file_processing": file_result,
        "etl": etl_result
    }

# Run the composed workflow
result = asyncio.run(complex_workflow())
```
## Customizing Patterns
All patterns are designed to be extensible through configuration:
```python
import asyncio

from dataknobs_fsm.patterns.api_orchestration import (
    APIOrchestrator, APIOrchestrationConfig, OrchestrationMode, APIEndpoint
)

# Create a custom configuration
config = APIOrchestrationConfig(
    name="custom_api_workflow",
    mode=OrchestrationMode.HYBRID,
    endpoints=[
        APIEndpoint(
            name="auth",
            url="https://api.example.com/auth",
            method="POST",
            headers={"Content-Type": "application/json"},
            retry_config={"max_attempts": 5}
        ),
        APIEndpoint(
            name="data",
            url="https://api.example.com/data",
            method="GET",
            depends_on=["auth"],  # sequential dependency
            transform=lambda resp: resp.get("data", [])
        )
    ],
    rate_limit=100,
    timeout=30.0
)

# Create an orchestrator with the custom config and run it
orchestrator = APIOrchestrator(config)
result = asyncio.run(orchestrator.orchestrate({"user": "test"}))
```
## Pattern Selection Guide

Choose the right pattern based on your needs:

| Pattern | Class Name | Best For | Key Modes/Strategies |
|---|---|---|---|
| Database ETL | `DatabaseETL` | Database operations | FULL_REFRESH, INCREMENTAL, UPSERT, APPEND |
| File Processing | `FileProcessor` | File operations | STREAM, BATCH, WHOLE |
| API Orchestration | `APIOrchestrator` | API workflows | SEQUENTIAL, PARALLEL, FANOUT, PIPELINE |
| LLM Workflow | See dataknobs-llm | AI applications | Conversation flows, RAG, versioning |
| Error Recovery | `ErrorRecoveryWorkflow` | Resilience | RETRY, CIRCUIT_BREAKER, FALLBACK |
## Common Configurations
### Batch Processing
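A batch-oriented setup mainly tunes batch size and parallelism. The field names below (`batch_size`, `parallelism`) are assumptions based on the features listed above; check `ETLConfig` for the actual signature:

```python
from dataknobs_fsm.patterns.etl import ETLConfig, ETLMode

config = ETLConfig(
    name="nightly_load",
    mode=ETLMode.APPEND,
    batch_size=5000,  # larger batches raise throughput but use more memory
    parallelism=4,    # concurrent workers
)
```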
### Streaming
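For large inputs, select the streaming mode so records are processed one at a time; this mirrors the `FileProcessingConfig` usage shown in the Quick Start:

```python
from dataknobs_fsm.patterns.file_processing import FileProcessingConfig, ProcessingMode

config = FileProcessingConfig(
    input_path="events.log",
    output_path="events.json",
    mode=ProcessingMode.STREAM,  # constant memory, regardless of file size
)
```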
### Error Handling
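A typical resilient configuration pairs retries with exponential backoff; the shape below follows the monitoring example later on this page:

```python
from dataknobs_fsm.patterns.error_recovery import ErrorRecoveryConfig

config = ErrorRecoveryConfig(
    name="resilient_call",
    primary_strategy="retry",
    retry_config={
        "max_attempts": 3,
        "backoff_strategy": "exponential",
    },
)
```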
## Performance Considerations
- Batch Size: Larger batches improve throughput but use more memory
- Parallelism: More workers increase speed but may overwhelm resources
- Streaming: Use for large datasets to maintain constant memory usage
- Caching: Enable for repeated operations on same data
- Connection Pooling: Reuse connections for database/API patterns
## Monitoring and Metrics

Some patterns, such as error recovery, include built-in metrics you can query after execution:
```python
import asyncio

from dataknobs_fsm.patterns.error_recovery import ErrorRecoveryWorkflow, ErrorRecoveryConfig

# Create a workflow with monitoring
config = ErrorRecoveryConfig(
    name="monitored_workflow",
    primary_strategy="retry",
    retry_config={
        "max_attempts": 3,
        "backoff_strategy": "exponential"
    }
)
workflow = ErrorRecoveryWorkflow(config)

async def main():
    # Execute (some_function is a placeholder for your own callable)
    await workflow.execute(some_function, arg1, arg2)

    # Inspect the collected metrics
    metrics = workflow.get_metrics()
    print(f"Total attempts: {metrics['total_attempts']}")
    print(f"Success rate: {metrics['success_rate']}")
    print(f"Average retry count: {metrics['avg_retry_count']}")

asyncio.run(main())
```
## Example: Complete ETL Pipeline
```python
import asyncio
from datetime import datetime

from dataknobs_fsm.patterns.etl import create_data_migration

async def run_migration():
    """Example of a complete data migration."""
    # Create a migration with field mappings and transformations
    migration = create_data_migration(
        source={
            "provider": "postgresql",
            "connection": "postgresql://source/db",
            "table": "users"
        },
        target={
            "provider": "postgresql",
            "connection": "postgresql://target/db",
            "table": "users_v2"
        },
        field_mappings={
            "user_id": "id",
            "user_name": "name",
            "user_email": "email"
        },
        transformations=[
            lambda record: {
                **record,
                "migrated_at": datetime.now(),
                "version": "2.0"
            }
        ]
    )

    # Run with a checkpoint for resumability
    result = await migration.run(checkpoint_id="migration_2024")

    print(f"Migrated {result['records_processed']} records")
    print(f"Errors: {result['errors']}")
    return result

# Execute the migration
if __name__ == "__main__":
    asyncio.run(run_migration())
```
## Import Reference
```python
# ETL pattern
from dataknobs_fsm.patterns.etl import (
    DatabaseETL, ETLConfig, ETLMode,
    create_etl_pipeline, create_database_sync,
    create_data_migration, create_data_warehouse_load
)

# File processing pattern
from dataknobs_fsm.patterns.file_processing import (
    FileProcessor, FileProcessingConfig, ProcessingMode, FileFormat,
    create_csv_processor, create_json_processor,
    create_log_analyzer, create_batch_file_processor
)

# API orchestration pattern
from dataknobs_fsm.patterns.api_orchestration import (
    APIOrchestrator, APIOrchestrationConfig, OrchestrationMode,
    APIEndpoint, create_rest_api_orchestrator, create_graphql_orchestrator
)

# LLM workflow pattern - moved to the dataknobs-llm package
# See: https://docs.dataknobs.com/packages/llm/
from dataknobs_llm import create_llm_provider, LLMConfig
from dataknobs_llm.conversations import ConversationManager
from dataknobs_llm.conversations.flow import ConversationFlow, FlowState

# Error recovery pattern
from dataknobs_fsm.patterns.error_recovery import (
    ErrorRecoveryWorkflow, ErrorRecoveryConfig, RecoveryStrategy,
    BackoffStrategy, create_retry_workflow, create_circuit_breaker_workflow,
    create_resilient_workflow
)
```
## Next Steps

- Explore individual pattern documentation:
    - Database ETL
    - File Processing
    - API Orchestration
    - LLM Workflows
    - Error Recovery
- Check out examples using patterns
- Review guides for detailed usage