FSM Configuration Guide¶
This guide provides comprehensive documentation for creating and understanding FSM configurations in the DataKnobs FSM framework.
Complete Configuration Reference
For the full configuration guide with all options and examples, see the FSM package documentation.
Quick Reference¶
Basic Structure¶
Every FSM configuration requires:
```python
config = {
    "name": "MyFSM",            # FSM name
    "main_network": "main",     # Main network to execute
    "networks": [               # List of networks
        {
            "name": "main",
            "states": [...],    # State definitions
            "arcs": [...]       # Transition definitions
        }
    ]
}
```
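A quick structural check can catch the most common configuration mistakes before execution. The sketch below is illustrative only (the framework performs its own, more thorough validation); `check_config` is a hypothetical helper, not part of the library:

```python
def check_config(config: dict) -> list[str]:
    """Return a list of structural problems found in an FSM config dict."""
    problems = []
    for key in ("name", "main_network", "networks"):
        if key not in config:
            problems.append(f"missing required key: {key!r}")
    networks = {net.get("name"): net for net in config.get("networks", [])}
    if config.get("main_network") not in networks:
        problems.append(f"main network {config.get('main_network')!r} not found")
    for name, net in networks.items():
        state_names = {s["name"] for s in net.get("states", [])}
        # Every network needs at least one start state (see Troubleshooting).
        if not any(s.get("is_start") for s in net.get("states", [])):
            problems.append(f"network {name!r} has no start state")
        # Every arc must point at a state that actually exists.
        for arc in net.get("arcs", []):
            if arc["to"] not in state_names:
                problems.append(f"arc target {arc['to']!r} not found in network {name!r}")
    return problems
```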
State Definition¶
States are the nodes in your FSM:
```python
{
    "name": "state_name",
    "is_start": True,       # Initial state flag
    "is_end": True,         # Final state flag
    "functions": {          # State functions
        "transform": {...}, # Data transformation
        "validate": {...}   # Data validation
    },
    "schema": {...}         # JSON schema for validation
}
```
Arc (Transition) Definition¶
Arcs define transitions between states:
```python
{
    "from": "source_state",
    "to": "target_state",
    "condition": {...},     # Optional condition
    "transform": {...},     # Optional transformation
    "priority": 0           # Arc priority
}
```
Essential Concepts¶
1. States and Types¶
**Initial States** (`is_start: True`)

- Entry points for FSM execution
- At least one required per network

**Final States** (`is_end: True`)

- Terminal states where execution ends
- No outgoing arcs allowed

**Normal States**

- Intermediate processing states
- Must have incoming and outgoing arcs
2. Functions¶
Functions define processing logic:
```python
# Inline function
{
    "type": "inline",
    "code": "lambda state: {'result': state.data['value'] * 2}"
}

# Registered function
{
    "type": "registered",
    "name": "process_data"
}

# Built-in function
{
    "type": "builtin",
    "name": "validate_json",
    "params": {"schema": {...}}
}
```
3. Data Modes¶
Control how data flows through states:
- COPY (default): Safe for transactions, higher memory
- REFERENCE: Memory efficient, shared data
- DIRECT: Most efficient, no rollback
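The practical difference between COPY and REFERENCE can be seen with plain Python semantics. This is a sketch of the concept, not the framework's implementation:

```python
import copy

original = {"items": [1, 2, 3]}

# COPY mode: each state works on an independent deep copy, so a failed
# step can be discarded without affecting the original data.
copied = copy.deepcopy(original)
copied["items"].append(4)

# REFERENCE mode: states share the same object; mutations are visible
# everywhere and there is no cheap rollback.
shared = original
shared["items"].append(99)

print(original["items"])  # [1, 2, 3, 99] -- mutated through the shared reference
print(copied["items"])    # [1, 2, 3, 4]  -- isolated deep copy
```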
Common Patterns¶
Validation → Process → Output¶
```python
{
    "states": [
        {"name": "input", "is_start": True},
        {"name": "validate", "functions": {"validate": {...}}},
        {"name": "process", "functions": {"transform": {...}}},
        {"name": "output", "is_end": True},
        {"name": "error", "is_end": True}
    ],
    "arcs": [
        {"from": "input", "to": "validate"},
        {"from": "validate", "to": "process", "condition": "valid"},
        {"from": "validate", "to": "error", "condition": "not valid"},
        {"from": "process", "to": "output"}
    ]
}
```
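A tiny interpreter makes the flow concrete: start at the start state and follow the first arc whose condition holds. This is a toy walker, not the framework's engine; condition strings here are resolved by simple lookup in a `flags` dict, which is an assumption for illustration:

```python
def walk(network: dict, flags: dict) -> list[str]:
    """Follow arcs from the start state until an end state is reached."""
    states = {s["name"]: s for s in network["states"]}
    current = next(s["name"] for s in network["states"] if s.get("is_start"))
    path = [current]
    while not states[current].get("is_end"):
        for arc in network["arcs"]:
            if arc["from"] != current:
                continue
            cond = arc.get("condition")
            if cond is None or flags.get(cond, False):
                current = arc["to"]
                break
        else:
            raise RuntimeError(f"no arc matched from state {current!r}")
        path.append(current)
    return path

network = {
    "states": [
        {"name": "input", "is_start": True},
        {"name": "validate"},
        {"name": "process"},
        {"name": "output", "is_end": True},
        {"name": "error", "is_end": True},
    ],
    "arcs": [
        {"from": "input", "to": "validate"},
        {"from": "validate", "to": "process", "condition": "valid"},
        {"from": "validate", "to": "error", "condition": "not valid"},
        {"from": "process", "to": "output"},
    ],
}

print(walk(network, {"valid": True}))      # ['input', 'validate', 'process', 'output']
print(walk(network, {"not valid": True}))  # ['input', 'validate', 'error']
```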
Retry with Backoff¶
```python
{
    "states": [
        {"name": "attempt", "is_start": True},
        {"name": "retry_wait"},
        {"name": "success", "is_end": True},
        {"name": "failure", "is_end": True}
    ],
    "arcs": [
        {"from": "attempt", "to": "success", "condition": "succeeded"},
        {"from": "attempt", "to": "retry_wait", "condition": "retry_needed"},
        {"from": "attempt", "to": "failure", "condition": "max_retries"},
        {"from": "retry_wait", "to": "attempt"}
    ]
}
```
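The retry loop above can be traced with a small simulation that re-evaluates the conditions on each attempt. This is a conceptual sketch of the pattern, not framework code; the backoff formula is illustrative:

```python
def run_with_retry(operation, max_retries: int = 3, base_delay: float = 0.1):
    """Mirror the attempt / retry_wait / success / failure states above."""
    for attempt in range(max_retries + 1):
        try:
            return "success", operation()   # arc: attempt -> success
        except Exception:
            if attempt == max_retries:
                return "failure", None      # arc: attempt -> failure
            # 'retry_wait' state: exponential backoff before re-entering
            # 'attempt'; real code would time.sleep(base_delay * 2 ** attempt)

calls = {"n": 0}

def flaky():
    """Fails twice, then succeeds -- a stand-in for a transient error."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient error")
    return "ok"

print(run_with_retry(flaky))  # ('success', 'ok') on the third attempt
```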
Examples¶
Simple Data Pipeline¶
```python
simple_pipeline = {
    "name": "SimpleDataPipeline",
    "main_network": "main",
    "networks": [{
        "name": "main",
        "states": [
            {"name": "input", "is_start": True},
            {
                "name": "transform",
                "functions": {
                    "transform": {
                        "type": "inline",
                        "code": "lambda state: {'result': state.data['data'].upper()}"
                    }
                }
            },
            {"name": "output", "is_end": True}
        ],
        "arcs": [
            {"from": "input", "to": "transform"},
            {"from": "transform", "to": "output"}
        ]
    }]
}
```
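Executing a pipeline like this by hand shows how the pieces fit together. The `run_pipeline` helper below is a toy linear interpreter written for illustration (the real framework wires this up from the config and handles conditions, validation, and errors):

```python
def run_pipeline(config: dict, payload: dict) -> dict:
    """Walk a linear network, applying each state's inline transform."""
    net = next(n for n in config["networks"] if n["name"] == config["main_network"])
    states = {s["name"]: s for s in net["states"]}
    next_state = {a["from"]: a["to"] for a in net["arcs"]}  # linear: one arc per state
    current = next(s["name"] for s in net["states"] if s.get("is_start"))
    data = payload
    while True:
        spec = states[current].get("functions", {}).get("transform")
        if spec and spec["type"] == "inline":
            state = type("State", (), {"data": data})()  # stand-in state object
            data = eval(spec["code"])(state)
        if states[current].get("is_end"):
            return data
        current = next_state[current]

pipeline = {
    "name": "SimpleDataPipeline",
    "main_network": "main",
    "networks": [{
        "name": "main",
        "states": [
            {"name": "input", "is_start": True},
            {"name": "transform", "functions": {"transform": {
                "type": "inline",
                "code": "lambda state: {'result': state.data['data'].upper()}",
            }}},
            {"name": "output", "is_end": True},
        ],
        "arcs": [
            {"from": "input", "to": "transform"},
            {"from": "transform", "to": "output"},
        ],
    }],
}

print(run_pipeline(pipeline, {"data": "hello"}))  # {'result': 'HELLO'}
```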
ETL Pipeline¶
```python
etl_pipeline = {
    "name": "ETLPipeline",
    "main_network": "main",
    "data_mode": {"default": "copy"},  # Transaction safety
    "networks": [{
        "name": "main",
        "states": [
            {"name": "start", "is_start": True},
            {"name": "extract", "functions": {"transform": {...}}},
            {"name": "validate", "functions": {"validate": {...}}},
            {"name": "transform", "functions": {"transform": {...}}},
            {"name": "load", "functions": {"transform": {...}}},
            {"name": "success", "is_end": True},
            {"name": "failure", "is_end": True}
        ],
        "arcs": [
            {"from": "start", "to": "extract"},
            {"from": "extract", "to": "validate"},
            {"from": "validate", "to": "transform", "condition": "valid"},
            {"from": "validate", "to": "failure", "condition": "not valid"},
            {"from": "transform", "to": "load"},
            {"from": "load", "to": "success", "condition": "success"},
            {"from": "load", "to": "failure", "condition": "failed"}
        ]
    }]
}
```
Best Practices¶
- Always define initial and final states
- Use meaningful state and arc names
- Validate data early in the pipeline
- Choose appropriate data modes:
  - COPY for transactional workflows
  - REFERENCE for streaming/read-only
  - DIRECT for simple transformations
- Register complex functions instead of inline code
- Add error states for graceful failure handling
- Use conditions to control flow
- Document with metadata
Troubleshooting¶
Common Errors¶
| Error | Solution |
|---|---|
| "Network must have at least one start state" | Add "is_start": True to a state |
| "Arc target 'X' not found in network" | Ensure arc targets exist |
| "Main network 'X' not found" | Check main_network name matches |
| Function execution errors | Verify lambda syntax and data structure |
Debugging Tips¶
- Use FSM debugger to step through execution
- Add logging in transform functions
- Start simple and add complexity gradually
- Test functions independently first
Related Examples¶
- End-to-End Streaming - Streaming data through FSM
- Database ETL - ETL pipeline pattern
- File Processing - File transformation workflows
- LLM Conversation - Conversational AI patterns
Multi-Transform Arcs¶
The `transform` field on an arc can be a single function name or a list.
When a list is provided, the transforms run in order: each transform's
output becomes the next transform's input.
```python
from dataknobs_fsm.core.arc import ArcDefinition

# Single transform
arc = ArcDefinition(target_state="next", transform="validate")

# Multi-transform pipeline
arc = ArcDefinition(
    target_state="next",
    transform=["validate", "normalize", "enrich"],
)
```
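Sequential execution of a transform list is effectively a left fold: each function's output is threaded into the next function's input. The sketch below shows the concept only; `FunctionContext` plumbing and error handling are omitted, and the three transforms are hypothetical stand-ins:

```python
from functools import reduce

def apply_pipeline(transforms, data):
    """Run transforms in order, feeding each output into the next input."""
    return reduce(lambda acc, fn: fn(acc), transforms, data)

# Hypothetical transforms standing in for registered functions.
validate = lambda d: {**d, "valid": True}
normalize = lambda d: {**d, "name": d["name"].strip().lower()}
enrich = lambda d: {**d, "source": "fsm"}

print(apply_pipeline([validate, normalize, enrich], {"name": "  Alice "}))
# {'name': 'alice', 'valid': True, 'source': 'fsm'}
```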
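In YAML configuration, a multi-transform arc lists its function names in order. This fragment is a sketch; the exact YAML keys are assumed to mirror the dict configuration shown earlier:

```yaml
arcs:
  - from: source_state
    to: next
    transform:
      - validate
      - normalize
      - enrich
```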
All transforms in the list share a single `FunctionContext` (with the same
resources and metadata). If any transform raises an error, the entire arc
execution fails with a `FunctionError`.

Each transform function receives `(data, func_context)` and returns the
transformed data. If a transform returns an `ExecutionResult`, a successful
result is unwrapped to its `.data` field and a failed result raises
`FunctionError`.
Push Arcs¶
Push arcs enable hierarchical FSM composition by pushing execution to a sub-network:
```python
from dataknobs_fsm.core.arc import PushArc, DataIsolationMode

push = PushArc(
    target_state="sub_start",
    target_network="validation",
    return_state="review",
    isolation_mode=DataIsolationMode.COPY,
    data_mapping={"order_id": "id"},           # parent → child
    result_mapping={"is_valid": "validated"},  # child → parent
)
```
| Field | Type | Default | Description |
|---|---|---|---|
| `target_network` | `str` | `""` | Name of the sub-network to push to |
| `return_state` | `str \| None` | `None` | State to return to after sub-network completes |
| `isolation_mode` | `DataIsolationMode` | `COPY` | How data is isolated between networks |
| `pass_context` | `bool` | `True` | Whether to pass execution context to sub-network |
| `data_mapping` | `dict[str, str]` | `{}` | Map parent fields to child fields |
| `result_mapping` | `dict[str, str]` | `{}` | Map child results back to parent fields |
`DataIsolationMode` options:

- `COPY`: deep copy data (safe, default)
- `REFERENCE`: pass by reference (fast, shared mutations)
- `SERIALIZE`: serialize/deserialize (maximum isolation)
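Conceptually, `data_mapping` and `result_mapping` are plain field renames applied on the way into and back out of the sub-network. The sketch below shows only that mapping step, with a hypothetical `map_fields` helper (not a framework API):

```python
import copy

def map_fields(data: dict, mapping: dict) -> dict:
    """Project selected fields, renaming source keys to target keys."""
    return {dst: data[src] for src, dst in mapping.items()}

parent = {"order_id": 42, "customer": "acme"}

# Pushing in: the parent's 'order_id' becomes the child's 'id'
# (COPY isolation: the child works on its own deep copy).
child_input = map_fields(copy.deepcopy(parent), {"order_id": "id"})
print(child_input)  # {'id': 42}

# Returning: the child's 'is_valid' result lands in the parent's 'validated'.
child_result = {"id": 42, "is_valid": True}
parent.update(map_fields(child_result, {"is_valid": "validated"}))
print(parent)  # {'order_id': 42, 'customer': 'acme', 'validated': True}
```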
See the Subflows Guide for a complete walkthrough.
API References¶
- SimpleFSM API - Simple synchronous API
- AsyncSimpleFSM API - Async API for streaming
- AdvancedFSM API - Advanced features and debugging
Full Documentation¶
For complete details including:

- All configuration options
- Advanced patterns
- Migration guides
- Network composition
- Resource management
- Streaming configuration
See the API documentation for complete details.