FSM (Finite State Machine) Package¶
The FSM package provides a powerful and flexible framework for building state machines in Python. It enables you to create complex workflows, orchestrate API calls, manage resources, and build sophisticated data processing pipelines.
Features¶
Core Capabilities¶
- Core FSM Engine: Robust state machine implementation with support for complex transitions and data flows
- Multiple Execution Strategies: Depth-first, breadth-first, resource-optimized, and stream-optimized traversal strategies
- Execution Modes: Synchronous, asynchronous, batch, and streaming execution engines
- Configuration-Driven: Define FSMs using YAML/JSON configuration or programmatically
Data Handling Modes¶
- COPY Mode (default): Safe concurrent processing with data isolation
- REFERENCE Mode: Memory-efficient processing with optimistic locking
- DIRECT Mode: High-performance in-place modifications
Resource Management¶
Comprehensive lifecycle management with pooling for:
- Database connections with transaction support
- HTTP clients with retry logic
- File system resources
- Custom resource providers
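The general idea behind pooling can be sketched in a few lines of plain Python. This is a conceptual illustration only, not the package's `ResourcePool` implementation (which adds health checks, lazy creation, and lifecycle hooks); `MiniPool` and the dict "connections" are made up for the example:

```python
import queue

class MiniPool:
    """Toy resource pool: hands out pre-created resources and takes them back."""

    def __init__(self, factory, size):
        self._idle = queue.Queue()
        for _ in range(size):
            self._idle.put(factory())

    def acquire(self):
        # Blocks until a resource is free, bounding concurrent usage
        return self._idle.get()

    def release(self, resource):
        self._idle.put(resource)

# Usage: a pool of two fake "connections" (plain dicts stand in for real ones)
pool = MiniPool(lambda: {"open": True}, size=2)
conn = pool.acquire()
# ... use conn ...
pool.release(conn)
```

The point of the pattern is that acquisition blocks when the pool is exhausted, which is what bounds concurrent use of databases, HTTP clients, and other external resources.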
For LLM integration, see the dataknobs-llm package.
Transform Functions¶
Interface-based transform system with:
- Validation functions for data quality
- Transform functions for data manipulation
- State test functions for conditional logic
- Extensive function library
Integration Patterns¶
Production-ready patterns for:
- ETL workflows (FULL_REFRESH, INCREMENTAL, UPSERT, APPEND)
- API orchestration with multi-service coordination
- Error recovery with retry and circuit breaker patterns
- File processing pipelines
For LLM/AI workflows, see the dataknobs-llm package.
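The four ETL write modes differ only in how incoming rows are merged into the target. The sketch below illustrates their semantics on a list-of-dicts "table" in plain Python; it is not the package's pattern implementation (the real patterns operate on configured resources), and `apply_write_mode` is a name invented for this example:

```python
def apply_write_mode(target, incoming, mode, key="id"):
    """Illustrate ETL write-mode semantics on a list-of-dicts 'table'."""
    if mode == "FULL_REFRESH":      # replace everything
        return list(incoming)
    if mode == "APPEND":            # add rows unconditionally
        return target + list(incoming)
    if mode == "UPSERT":            # update matching keys, insert the rest
        merged = {row[key]: row for row in target}
        for row in incoming:
            merged[row[key]] = row
        return list(merged.values())
    if mode == "INCREMENTAL":       # insert only keys not seen before
        seen = {row[key] for row in target}
        return target + [row for row in incoming if row[key] not in seen]
    raise ValueError(f"unknown mode: {mode}")

target = [{"id": 1, "v": "old"}]
incoming = [{"id": 1, "v": "new"}, {"id": 2, "v": "x"}]
```

For example, `UPSERT` overwrites row `1` and inserts row `2`, while `INCREMENTAL` keeps the existing row `1` and inserts only row `2`.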
Advanced Features¶
- Hierarchical state machines with PushArc support
- Transaction management with commit/rollback
- Execution history tracking and auditing
- Configurable data isolation for sub-networks
- Health monitoring and metrics
API Design¶
- SimpleFSM: Synchronous API for straightforward workflows (works in all contexts)
- AsyncSimpleFSM: Native async API for async/await contexts
- AdvancedFSM: For debugging, stepping, and hooks
Quick Links¶
- Quick Start Guide - Get started with FSM in 5 minutes
- API Documentation - Complete API reference
- Patterns Guide - Pre-built integration patterns
- Examples - Real-world usage examples
Installation¶

```bash
pip install dataknobs-fsm
```

Or with optional dependencies:

```bash
# With database support
pip install dataknobs-fsm[database]

# With all extras
pip install dataknobs-fsm[all]
```

For LLM functionality, install the separate LLM package:

```bash
pip install dataknobs-llm
```
Basic Usage¶
Using SimpleFSM API¶
```python
from dataknobs_fsm.api.simple import SimpleFSM
from datetime import datetime

# Create custom functions
def validate_data(state):
    """Custom validation function."""
    if 'required_field' not in state.data:
        raise ValueError("Missing required field")
    return state.data

def process_data(state):
    """Transform function for processing."""
    data = state.data.copy()
    data['processed'] = True
    data['timestamp'] = datetime.now().isoformat()
    return data

# Define FSM configuration
config = {
    "name": "data_processor",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "validate"},
        {"name": "process"},
        {"name": "end", "is_end": True}
    ],
    "arcs": [
        {
            "from": "start",
            "to": "validate",
            "transform": {"type": "registered", "name": "validate"}
        },
        {
            "from": "validate",
            "to": "process",
            "transform": {"type": "registered", "name": "process"}
        },
        {"from": "process", "to": "end"}
    ]
}

# Initialize FSM with custom functions
fsm = SimpleFSM(
    config,
    custom_functions={
        'validate': validate_data,
        'process': process_data
    }
)

# Execute the FSM
result = fsm.process(
    {"required_field": "value", "input": "data"}
)
print(result)  # {"final_state": "end", "data": {"required_field": "value", "input": "data", "processed": True, "timestamp": "..."}, ...}
```
Using AdvancedFSM API¶
```python
import asyncio
from dataknobs_fsm.api.advanced import (
    create_advanced_fsm, ExecutionMode, ExecutionHook
)

# Set up execution hooks for monitoring
hooks = ExecutionHook(
    on_state_enter=lambda state: print(f"Entering: {state}"),
    on_state_exit=lambda state: print(f"Exiting: {state}"),
    on_error=lambda error: print(f"Error: {error}")
)

# Create FSM via factory (accepts config dict, YAML path, or FSM instance)
fsm = create_advanced_fsm(
    "fsm_config.yaml",
    execution_mode=ExecutionMode.STEP_BY_STEP
)
fsm.set_hooks(hooks)

async def run_workflow():
    test_data = {"input": "data"}

    # Step-by-step execution (step() returns StepResult)
    async with fsm.execution_context(test_data) as context:
        while True:
            result = await fsm.step(context)
            print(f"  {result.from_state} -> {result.to_state}")
            if not result.success or result.is_complete or result.transition == "none":
                break

    # Or run with profiling
    profile = await fsm.profile_execution(test_data)
    print(f"Execution time: {profile['total_time']:.4f}s")
    print(f"Transitions: {profile['transitions']}")

asyncio.run(run_workflow())
```
Using Configuration¶
```yaml
# fsm_config.yaml
name: data_processor
data_mode: COPY  # or REFERENCE or DIRECT
execution_strategy: DEPTH_FIRST  # or BREADTH_FIRST, RESOURCE_OPTIMIZED, STREAM_OPTIMIZED

states:
  - name: start
    is_start: true
  - name: validate
    functions:
      state_test:
        type: builtin
        name: has_required_fields
        params:
          fields: ["user_id", "data"]
  - name: process
    functions:
      transform:
        type: inline
        code: |
          lambda state: {
              **state.data,
              'processed': True,
              'timestamp': datetime.now().isoformat()
          }
    resources:
      - type: database
        name: main_db
  - name: end
    is_end: true

arcs:
  - from: start
    to: validate
  - from: validate
    to: process
    pre_test:
      type: builtin
      name: data_valid
  - from: process
    to: end
    transform:
      type: builtin
      name: add_metadata

resources:
  database:
    main_db:
      provider: postgresql
      connection_string: ${DATABASE_URL}
      pool_size: 10
```

```python
from dataknobs_fsm.api.simple import SimpleFSM

fsm = SimpleFSM("fsm_config.yaml")
result = fsm.process({"user_id": "123", "data": "input"})
```
Architecture¶
The FSM package is built with a modular, layered architecture:
Core Components¶
State Management¶
- `StateDefinition`: Template for states with schemas and validation
- `StateInstance`: Runtime state instances with data and context
- `StateTest`, `ValidityTest`: Condition checking functions
Arc System¶
- `ArcDefinition`: Transition definitions with optional transforms
- `ArcExecution`: Runtime arc execution with resource management
- `PushArc`: Hierarchical composition for sub-networks
Execution Engines¶
- `ExecutionEngine`: Synchronous execution with strategy support
- `AsyncExecutionEngine`: Asynchronous execution with concurrency
- `BatchExecutor`: Optimized batch processing
- `StreamExecutor`: Stream processing with backpressure
Data Handling¶
- `DataModeHandler`: Abstract interface for data operations
- `CopyModeHandler`: Safe concurrent processing
- `ReferenceModeHandler`: Memory-efficient with locking
- `DirectModeHandler`: High-performance in-place operations
Resource Management¶
- `ResourceManager`: Central resource lifecycle control
- `ResourcePool`: Connection pooling with health checks
- Specialized providers for databases, HTTP, LLM, filesystem
Function System¶
- Interface-based design with `ITransformFunction`, `IValidationFunction`
- Extensive function library for common operations
- Custom function registration support
Use Cases¶
Data Processing Pipelines¶
- ETL workflows with multiple data sources and targets
- Data validation and quality checks
- Format transformation and normalization
- Batch and stream processing modes
API Orchestration¶
- Multi-service API coordination
- Rate limiting and quota management
- Retry logic with exponential backoff
- Circuit breaker patterns for fault tolerance
LLM/AI Workflows¶
Note: LLM functionality has moved to the dataknobs-llm package.
The dedicated LLM package provides:
- Multi-provider LLM support (OpenAI, Anthropic)
- FSM-based conversation flows
- Prompt templating and versioning
- RAG integration and caching
- A/B testing and metrics
See the LLM package documentation for details.
File Processing¶
- Batch file processing with multiple formats
- Parallel processing with resource constraints
- Progress tracking and resumption
- Error handling and partial failure recovery
Stream Processing¶
- Real-time data ingestion and transformation
- Backpressure handling and flow control
- Window-based aggregations
- Event-driven architectures
Error Recovery Patterns¶
- Configurable retry strategies
- Circuit breakers with health monitoring
- Fallback and compensation logic
- Dead letter queue handling
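The first two bullets combine naturally: a retry loop backs off exponentially while a circuit breaker fails fast once consecutive failures cross a threshold. This is a generic sketch of the pattern in plain Python, not the package's built-in implementation; all names (`CircuitBreaker`, `call_with_retry`) are invented for the example:

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: opens after `threshold` consecutive failures."""

    def __init__(self, threshold=3):
        self.threshold = threshold
        self.failures = 0

    @property
    def open(self):
        return self.failures >= self.threshold

    def record(self, ok):
        self.failures = 0 if ok else self.failures + 1

def call_with_retry(fn, breaker, attempts=3, base_delay=0.01):
    """Retry with exponential backoff, guarded by a circuit breaker."""
    for attempt in range(attempts):
        if breaker.open:
            raise RuntimeError("circuit open; failing fast")
        try:
            result = fn()
            breaker.record(ok=True)
            return result
        except Exception:
            breaker.record(ok=False)
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)  # backoff doubles each attempt

# Usage: a flaky call that succeeds on the third try
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

breaker = CircuitBreaker(threshold=5)
```

A production version would also add jitter to the delays and a cool-down period after which the breaker half-opens to probe recovery.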
Key Concepts¶
Understanding the Two Mode Types¶
The FSM package has two distinct mode concepts that are often confused:
DataHandlingMode - HOW data is managed¶
Controls how individual states handle data internally:
| Mode | Description | Use Case |
|---|---|---|
| COPY | Creates deep copies of data for safe concurrent processing | Default mode, best for multi-threaded environments |
| REFERENCE | Works with data references using optimistic locking | Large datasets, database-backed workflows |
| DIRECT | Operates directly on source data | Single-threaded, high-performance scenarios |
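The practical difference between COPY and DIRECT is visible in plain Python (a conceptual sketch; the package's handlers additionally manage locking and mode negotiation):

```python
import copy

source = {"items": [1, 2, 3]}

# COPY mode: the state works on a deep copy, so the source stays untouched
working = copy.deepcopy(source)
working["items"].append(4)
assert source["items"] == [1, 2, 3]      # isolation preserved

# DIRECT mode: the state mutates the source in place -- fast, but only
# safe when a single thread owns the data
direct = source
direct["items"].append(4)
assert source["items"] == [1, 2, 3, 4]   # source was modified
```

REFERENCE mode sits between the two: states share one copy of the data but coordinate writes through optimistic locking instead of duplicating it.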
ProcessingMode - HOW MANY records to process¶
Controls the execution strategy for record volume:
| Mode | Description | Use Case |
|---|---|---|
| SINGLE | Process one record at a time | Simple record-by-record operations |
| BATCH | Process multiple records in batches | Optimizing database operations, transactions |
| STREAM | Process continuous streams of data | Large files, real-time data |
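At their core, the three processing modes differ in how records are grouped before they enter the FSM. The plain-Python sketch below shows that grouping only; it is not the package's executor code, and the three function names are invented for the example:

```python
def single(records):
    """SINGLE: one record at a time."""
    for r in records:
        yield [r]

def batch(records, size):
    """BATCH: fixed-size groups (the last group may be short)."""
    group = []
    for r in records:
        group.append(r)
        if len(group) == size:
            yield group
            group = []
    if group:
        yield group

def stream(record_source):
    """STREAM: consume a (possibly unbounded) iterator lazily."""
    for r in record_source:
        yield r

records = list(range(5))
```

BATCH amortizes per-call overhead (e.g. one database transaction per group), while STREAM never materializes the full input, which is what makes it suitable for large files and real-time feeds.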
Key Difference: DataHandlingMode is about memory safety and concurrency within states, while ProcessingMode is about throughput and how many records to handle at once. They work together but address different concerns.
Common Combinations¶
| Use Case | ProcessingMode | DataHandlingMode | Why |
|---|---|---|---|
| Web API requests | SINGLE | COPY | Each request isolated, concurrent safety |
| ETL pipeline | BATCH | COPY | Transaction boundaries, rollback support |
| Large file streaming | STREAM | REFERENCE | Memory efficiency for continuous data |
| High-speed validation | SINGLE | DIRECT | Maximum performance, simple operations |
| Database bulk updates | BATCH | COPY | Transaction safety for batches |
Execution Strategies¶
Choose from multiple traversal strategies:
| Strategy | Description | When to Use |
|---|---|---|
| DEPTH_FIRST | Explores deeply before backtracking | Linear workflows, sequential processing |
| BREADTH_FIRST | Explores all branches at same level | Parallel exploration, comparison workflows |
| RESOURCE_OPTIMIZED | Minimizes resource usage | Resource-constrained environments |
| STREAM_OPTIMIZED | Optimized for streaming data | Real-time processing, event streams |
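To make the first two strategies concrete, here is how depth-first and breadth-first orderings differ on a small branching topology. This is an illustration of the traversal orders only (stack vs. queue), not the engine's implementation, which also accounts for data flow and resources:

```python
from collections import deque

# A branching FSM topology: start fans out to two alternative paths
graph = {
    "start": ["path_a", "path_b"],
    "path_a": ["end"],
    "path_b": ["end"],
    "end": [],
}

def traverse(graph, root, depth_first):
    """Return the visit order using a stack (DFS) or a queue (BFS)."""
    frontier = deque([root])
    order, seen = [], {root}
    while frontier:
        state = frontier.pop() if depth_first else frontier.popleft()
        order.append(state)
        for nxt in graph[state]:
            if nxt not in seen:
                seen.add(nxt)
                frontier.append(nxt)
    return order
```

Depth-first follows one path to completion before backtracking; breadth-first visits both branches before reaching `end`, which is why it suits comparison workflows.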
Function Types¶
Functions can be registered and used throughout the FSM:
| Type | Description | Example |
|---|---|---|
| inline | Lambda expressions or code strings | "lambda state: state.data.upper()" |
| builtin | Pre-registered library functions | {"type": "builtin", "name": "validate_email"} |
| custom | Functions from Python modules | {"type": "custom", "module": "myapp.transforms"} |
| registered | Runtime-registered functions | {"type": "registered", "name": "process_data"} |
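Putting the four specs side by side, an arc list mixing every function type might look like the following. The config shape follows the table above; the state names and `myapp.transforms` are placeholders:

```python
arcs = [
    # inline: a code string evaluated by the FSM
    {"from": "a", "to": "b",
     "transform": {"type": "inline",
                   "code": "lambda state: state.data.upper()"}},
    # builtin: ships with the function library
    {"from": "b", "to": "c",
     "pre_test": {"type": "builtin", "name": "validate_email"}},
    # custom: imported from your own module (placeholder path)
    {"from": "c", "to": "d",
     "transform": {"type": "custom", "module": "myapp.transforms"}},
    # registered: passed in via custom_functions at FSM construction
    {"from": "d", "to": "e",
     "transform": {"type": "registered", "name": "process_data"}},
]
```

`registered` specs pair with the `custom_functions` argument shown in the SimpleFSM example earlier on this page.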
Next Steps¶
- Getting Started: Follow the Quick Start Guide for a hands-on introduction
- Examples: Explore real-world examples with complete code
- API Reference: Read the API Documentation for detailed reference
- Patterns: Learn about Integration Patterns for common scenarios
Guides: Deep dive into specific features:¶
- Data Modes Guide - Choosing the right data operation mode
- Resource Management - Managing external resources
- Streaming Guide - Building stream processing workflows
- CLI Usage - Using the FSM command-line interface