This guide provides comprehensive documentation for creating FSM configurations in the DataKnobs FSM framework.
Every FSM configuration must have the following top-level structure:
config = {
"name": "MyFSM", # Required: Name of the FSM
"version": "1.0.0", # Optional: Version (default: "1.0.0")
"description": "Description", # Optional: FSM description
"main_network": "main", # Required: Name of the main network to execute
"networks": [ # Required: List of network definitions
{
"name": "main", # Network name (must match main_network)
"states": [...], # List of state definitions
# Note: arcs can be defined at network OR state level (see below)
}
],
"data_mode": {...}, # Optional: Data handling configuration
"transaction": {...}, # Optional: Transaction configuration
"resources": [...], # Optional: External resource definitions
"execution_strategy": "depth_first", # Optional: Execution strategy
"max_transitions": 1000, # Optional: Maximum transitions (default: 1000)
"timeout_seconds": 60, # Optional: Execution timeout
"metadata": {...} # Optional: Additional metadata
}
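The required keys listed above lend themselves to a quick pre-flight check. The sketch below is plain Python and does not call the DataKnobs API; check_top_level is a hypothetical helper name, and it only covers the two rules stated in the comments above (the required keys, and main_network naming a defined network).

```python
def check_top_level(config):
    """Return a list of problems found in the top-level FSM structure."""
    problems = []
    # Keys marked "Required" in the structure above.
    for key in ("name", "main_network", "networks"):
        if key not in config:
            problems.append(f"missing required key: {key}")
    # main_network must name one of the defined networks.
    network_names = [n.get("name") for n in config.get("networks", [])]
    main = config.get("main_network")
    if main is not None and main not in network_names:
        problems.append(f"main_network {main!r} is not a defined network")
    return problems


example = {
    "name": "MyFSM",
    "main_network": "main",
    "networks": [{"name": "main", "states": []}],
}
print(check_top_level(example))
print(check_top_level({"name": "Broken", "main_network": "x", "networks": []}))
```

An empty list means the structure passed these checks; it says nothing about whether the framework would accept the rest of the configuration.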
States are the nodes in your FSM graph. Each state has a name and metadata that defines its behavior.
{
"name": "state_name", # Required: Unique name within the network
"is_start": True, # Optional: Marks this as an initial state (default: False)
"is_end": True, # Optional: Marks this as a final state (default: False)
# Schema validation (JSON Schema format)
"schema": { # Optional: JSON schema for data validation
"type": "object",
"properties": {...},
"required": [...]
},
# Function definitions
"pre_validators": [...], # Optional: Pre-validation functions (run before state)
"validators": [...], # Optional: Validation functions
"transforms": [...], # Optional: Transform functions (run on state entry)
# Arc definitions (state-level)
"arcs": [...], # Optional: Outgoing transitions (see Arcs section)
# Resources and configuration
"resources": ["db", "api"], # Optional: Required resource names
"data_mode": "copy", # Optional: Override data mode for this state
"metadata": {...} # Optional: Additional metadata
}
A state's role is set by its flags:
- Start state (is_start: true)
- End state (is_end: true)
- Combined start and end state (is_start: true AND is_end: true)
"states": [
{
"name": "input",
"is_start": True,
"schema": {
"type": "object",
"properties": {
"data": {"type": "string"},
"count": {"type": "integer", "minimum": 0}
},
"required": ["data"]
},
"arcs": [
{"target": "validate"} # Simple arc to next state
]
},
{
"name": "validate",
"validators": [
{
"type": "inline",
"code": "lambda state: state.data.get('count', 0) > 0"
}
],
"arcs": [
{
"target": "process",
"condition": {
"type": "inline",
"code": "lambda state: state.data.get('valid', True)"
}
},
{
"target": "error",
"condition": {
"type": "inline",
"code": "lambda state: not state.data.get('valid', True)"
}
}
]
},
{
"name": "process",
"transforms": [
{
"type": "inline",
"code": "lambda state: {'processed': state.data['data'].upper()}"
}
],
"resources": ["database"], # Requires database resource
"arcs": [
{"target": "output"}
]
},
{
"name": "output",
"is_end": True
},
{
"name": "error",
"is_end": True
}
]
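A state list like the one above can be sanity-checked before it is loaded. This is a minimal sketch, assuming that an arc without target_network always points to a state in the same network; check_states is a hypothetical helper, not a framework function.

```python
def check_states(states):
    """Report arcs pointing at unknown states and missing start/end markers."""
    names = {s["name"] for s in states}
    problems = []
    for state in states:
        for arc in state.get("arcs", []):
            # Push arcs (with target_network) point into another network; skip them.
            if "target_network" in arc:
                continue
            if arc["target"] not in names:
                problems.append(f"{state['name']} -> {arc['target']}: unknown target")
    if not any(s.get("is_start") for s in states):
        problems.append("no start state")
    if not any(s.get("is_end") for s in states):
        problems.append("no end state")
    return problems


states = [
    {"name": "input", "is_start": True, "arcs": [{"target": "validate"}]},
    {"name": "validate", "arcs": [{"target": "process"}, {"target": "error"}]},
    {"name": "process", "arcs": [{"target": "output"}]},
    {"name": "output", "is_end": True},
    {"name": "error", "is_end": True},
]
print(check_states(states))
```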
Arcs define the transitions between states and the conditions under which they occur. Arcs can be defined in two ways:
Define arcs as a property of the source state:
{
"name": "state_a",
"arcs": [
{
"target": "state_b", # Required: Target state name
"condition": {...}, # Optional: Condition function
"transform": {...}, # Optional: Transform function
"priority": 0, # Optional: Priority (higher = higher priority)
"metadata": {...} # Optional: Additional metadata
}
]
}
Define arcs at the network level with explicit from and to fields:
{
"name": "main",
"states": [...],
"arcs": [
{
"from": "state_a", # Required: Source state name
"to": "state_b", # Required: Target state name
"name": "arc_name", # Optional: Arc name
"condition": {...}, # Optional: Condition function
"transform": {...}, # Optional: Transform function
"priority": 0, # Optional: Priority
"metadata": {...} # Optional: Additional metadata
}
]
}
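The two styles carry the same information: a network-level arc's from names the state that would otherwise hold the arc, and to plays the role of target. The sketch below makes that mapping explicit. attach_network_arcs is a hypothetical helper written for illustration; how the framework itself merges the two styles is not specified here.

```python
import copy


def attach_network_arcs(network):
    """Return a copy of the network with network-level arcs moved onto their source states."""
    result = copy.deepcopy(network)
    by_name = {s["name"]: s for s in result["states"]}
    for arc in result.pop("arcs", []):
        # Copy every field except from/to, then express "to" as the state-level "target".
        state_arc = {k: v for k, v in arc.items() if k not in ("from", "to")}
        state_arc["target"] = arc["to"]
        by_name[arc["from"]].setdefault("arcs", []).append(state_arc)
    return result


network = {
    "name": "main",
    "states": [
        {"name": "state_a", "is_start": True},
        {"name": "state_b", "is_end": True},
    ],
    "arcs": [{"from": "state_a", "to": "state_b", "priority": 0}],
}
converted = attach_network_arcs(network)
print(converted["states"][0]["arcs"])
```

The input dictionary is left untouched, so the original network-level form stays available for comparison.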
Push arcs allow transitions to different networks (subnetworks):
{
"target": "initial_state", # Initial state in target network
"target_network": "validation", # Required: Target network name
"return_state": "continue", # Optional: State to return to after subnetwork
"data_isolation": "copy", # Optional: Data handling mode
"condition": {...}, # Optional: Condition function
"transform": {...} # Optional: Transform function
}
Format: "target_network[:initial_state]"
Example: "validation:start" pushes to the "validation" network's "start" state.
{
"target": "process",
"condition": {
"type": "inline",
"code": "lambda state: state.data.get('valid', False)"
}
}
{
"target": "transform",
"transform": {
"type": "inline",
"code": "lambda state: {'records': [r.upper() for r in state.data['records']]}"
}
}
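When multiple arcs from a state have conditions that evaluate to true, the priority field determines which arc is taken: arcs with a higher priority value take precedence.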
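Arc selection can be sketched as follows, assuming the arc with the highest priority value is tried first (consistent with the "higher = higher priority" note on the priority field). Breaking ties by definition order and treating an arc without a condition as always eligible are assumptions of this sketch, and conditions are passed as plain callables rather than configuration entries. select_arc is a hypothetical name.

```python
def select_arc(arcs, data, context=None):
    """Pick the arc to follow: highest priority first, first match wins."""
    # sorted() is stable, so arcs with equal priority keep their definition order.
    ordered = sorted(arcs, key=lambda arc: arc.get("priority", 0), reverse=True)
    for arc in ordered:
        condition = arc.get("condition")
        # An arc without a condition is treated as always eligible.
        if condition is None or condition(data, context):
            return arc
    return None


arcs = [
    {"target": "review", "priority": 0, "condition": lambda d, c: d["amount"] > 100},
    {"target": "reject", "priority": 10, "condition": lambda d, c: d["amount"] > 1000},
    {"target": "approve"},
]
print(select_arc(arcs, {"amount": 5000})["target"])  # reject
print(select_arc(arcs, {"amount": 500})["target"])   # review
print(select_arc(arcs, {"amount": 50})["target"])    # approve
```

With an amount of 5000 both conditional arcs are true, and the higher priority of the reject arc decides the outcome.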
Networks are collections of states and arcs that define a complete FSM or sub-FSM.
{
"name": "network_name", # Required: Unique network name
"states": [...], # Required: List of states
"arcs": [...], # Optional: Network-level arc definitions
"resources": ["db", "api"], # Optional: Resource names used by this network
"streaming": { # Optional: Streaming configuration
"enabled": True,
"chunk_size": 100,
"parallelism": 4
},
"metadata": {...} # Optional: Additional metadata
}
FSMs can have multiple networks for modular design:
{
"name": "MultiNetworkFSM",
"main_network": "main",
"networks": [
{
"name": "main",
"states": [
{
"name": "start",
"is_start": True,
"arcs": [
{
"target": "validate",
"target_network": "validation", # Push to validation network
"return_state": "process" # Return here after validation
}
]
},
{
"name": "process",
"arcs": [{"target": "end"}]
},
{
"name": "end",
"is_end": True
}
]
},
{
"name": "validation",
"states": [
{
"name": "validate",
"is_start": True,
"validators": [...],
"arcs": [{"target": "complete"}]
},
{
"name": "complete",
"is_end": True
}
]
}
]
}
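Push arcs tie networks together by name, so a typo in target_network, target, or return_state is easy to miss. The following sketch cross-checks those three fields in a multi-network configuration. check_push_arcs is a hypothetical helper, and the rule that return_state must exist in the network that issues the push is this sketch's reading of the example above.

```python
def check_push_arcs(config):
    """Check that push arcs reference existing networks, entry states, and return states."""
    networks = {n["name"]: {s["name"] for s in n["states"]} for n in config["networks"]}
    problems = []
    for net in config["networks"]:
        for state in net["states"]:
            for arc in state.get("arcs", []):
                target_net = arc.get("target_network")
                if target_net is None:
                    continue  # ordinary arc within the same network
                where = f"{net['name']}.{state['name']}"
                if target_net not in networks:
                    problems.append(f"{where}: unknown network {target_net!r}")
                elif arc["target"] not in networks[target_net]:
                    problems.append(f"{where}: {arc['target']!r} not in {target_net!r}")
                ret = arc.get("return_state")
                if ret is not None and ret not in networks[net["name"]]:
                    problems.append(f"{where}: unknown return_state {ret!r}")
    return problems


config = {
    "name": "MultiNetworkFSM",
    "main_network": "main",
    "networks": [
        {"name": "main", "states": [
            {"name": "start", "is_start": True, "arcs": [
                {"target": "validate", "target_network": "validation",
                 "return_state": "process"}]},
            {"name": "process", "arcs": [{"target": "end"}]},
            {"name": "end", "is_end": True},
        ]},
        {"name": "validation", "states": [
            {"name": "validate", "is_start": True, "arcs": [{"target": "complete"}]},
            {"name": "complete", "is_end": True},
        ]},
    ],
}
print(check_push_arcs(config))
```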
Functions define the processing logic for states and transitions. They receive specific parameters and must return appropriate values.
{
"type": "inline",
"code": "lambda state: {'result': state.data['value'] * 2}"
}
{
"type": "registered",
"name": "process_data"
}
{
"type": "builtin",
"name": "validators.validate_json",
"params": {"schema": {...}}
}
{
"type": "custom",
"module": "my_module",
"name": "my_function"
}
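One way to picture how the reference types differ is a small resolver that turns each into a callable. This is an illustrative sketch, not the framework's resolver: resolve_function and the REGISTRY dictionary are hypothetical, builtin references are left out because they depend on the framework's own function library, and the inline lambda here takes a plain data dictionary. Inline code is evaluated with eval, which should only ever be applied to trusted configuration.

```python
import importlib

REGISTRY = {}  # hypothetical name -> callable table for "registered" references


def resolve_function(ref, registry=REGISTRY):
    """Turn a function reference dictionary into a Python callable."""
    kind = ref["type"]
    if kind == "inline":
        # eval() runs arbitrary code: trusted configuration only.
        return eval(ref["code"])
    if kind == "registered":
        return registry[ref["name"]]
    if kind == "custom":
        module = importlib.import_module(ref["module"])
        return getattr(module, ref["name"])
    raise ValueError(f"unsupported function type: {kind}")


REGISTRY["process_data"] = lambda data: {"processed": True, **data}

double = resolve_function({"type": "inline", "code": "lambda data: {'result': data['value'] * 2}"})
print(double({"value": 21}))  # {'result': 42}

sqrt = resolve_function({"type": "custom", "module": "math", "name": "sqrt"})
print(sqrt(9))  # 3.0
```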
Validation functions check data validity and are called with:
Input Parameters:
- data: The current state data (dict or object)
- context: Optional execution context containing:
  - state_name: Current state name
  - metadata: State metadata
  - resources: Available resources
  - variables: Shared variables
Return Value:
- True if validation passes, False otherwise
- Alternatively, an object with a success property
# Example validation function
def validate_data(data, context=None):
    """Validate that required fields exist and are valid."""
    if not data.get('user_id'):
        return False
    if data.get('amount', 0) < 0:
        return False
    return True
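Because a validator may return either a boolean or an object with a success property, code that runs a list of validators has to normalize the two forms. A minimal sketch of that normalization follows; run_validators is a hypothetical helper, and stopping at the first failure is an assumption rather than documented framework behavior.

```python
from types import SimpleNamespace


def run_validators(validators, data, context=None):
    """Return True only if every validator passes."""
    for validator in validators:
        result = validator(data, context)
        # Accept a plain bool, or an object exposing a success attribute.
        passed = getattr(result, "success", result)
        if not passed:
            return False
    return True


def has_user(data, context=None):
    return bool(data.get("user_id"))


def non_negative_amount(data, context=None):
    # Returns a result object instead of a bool.
    return SimpleNamespace(success=data.get("amount", 0) >= 0)


checks = [has_user, non_negative_amount]
print(run_validators(checks, {"user_id": 1, "amount": 5}))   # True
print(run_validators(checks, {"user_id": 1, "amount": -5}))  # False
```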
Transform functions modify data and are called with:
Input Parameters:
- data: The current state data
- context: Optional execution context (same as validation)
Return Value:
- The transformed data, or an object with a data property
# Example transform function
import time

def transform_data(data, context=None):
    """Transform data by adding timestamp and formatting."""
    return {
        'original': data,
        'timestamp': time.time(),
        'formatted': data.get('text', '').upper()
    }
Condition functions determine arc traversal and are called with:
Input Parameters:
- data: The current state data
- context: Optional execution context
Return Value:
- True to take the arc, False to skip
# Example condition function
def check_threshold(data, context=None):
    """Check if value exceeds threshold."""
    # context is optional, so fall back to an empty dict when it is None
    threshold = (context or {}).get('variables', {}).get('threshold', 100)
    return data.get('value', 0) > threshold
{
"name": "process",
"pre_validators": [ # Run before state entry
{
"type": "inline",
"code": "lambda data: data.get('input') is not None"
}
],
"validators": [ # Validate state data
{
"type": "registered",
"name": "validate_format"
}
],
"transforms": [ # Transform data on state entry
{
"type": "inline",
"code": "lambda data: {'result': process(data['input'])}"
}
]
}
For a state, functions execute in this order:
Data modes control how data is handled during state transitions.
{
"data_mode": {
"default": "copy", # Default mode for all states
"state_overrides": { # Override for specific states
"stream_state": "reference",
"process_state": "direct"
},
"copy_config": {...}, # Configuration for COPY mode
"reference_config": {...}, # Configuration for REFERENCE mode
"direct_config": {...} # Configuration for DIRECT mode
}
}
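The default and state_overrides keys suggest a simple lookup for the mode that applies to a given state. The sketch below shows one such lookup. effective_data_mode is a hypothetical helper; letting a data_mode key on the state itself win over state_overrides, and falling back to "copy" when nothing is configured, are assumptions of this sketch.

```python
def effective_data_mode(config, state):
    """Resolve the data mode for one state definition."""
    mode_cfg = config.get("data_mode", {})
    # A data_mode key on the state itself is checked first (assumed precedence).
    if "data_mode" in state:
        return state["data_mode"]
    overrides = mode_cfg.get("state_overrides", {})
    return overrides.get(state["name"], mode_cfg.get("default", "copy"))


config = {
    "data_mode": {
        "default": "copy",
        "state_overrides": {"stream_state": "reference", "process_state": "direct"},
    }
}
print(effective_data_mode(config, {"name": "stream_state"}))  # reference
print(effective_data_mode(config, {"name": "other_state"}))   # copy
```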
Configure how the FSM handles transactions across states.
{
"transaction": {
"strategy": "batch", # Transaction strategy
"batch_size": 100, # Batch size for BATCH strategy
"commit_triggers": ["save"], # State names that trigger commits
"rollback_on_error": True, # Rollback on error (default: True)
"timeout_seconds": 30 # Transaction timeout
}
}
Resources are external dependencies that states can use.
{
"resources": [
{
"name": "database",
"type": "database",
"config": {
"connection_string": "postgresql://localhost/mydb",
"pool_size": 10
},
"connection_pool_size": 10, # Connection pool size
"timeout_seconds": 30, # Operation timeout
"retry_attempts": 3, # Retry count on failure
"retry_delay_seconds": 1.0, # Delay between retries
"health_check_interval": 60 # Health check interval (seconds)
},
{
"name": "api",
"type": "http",
"config": {
"base_url": "https://api.example.com",
"headers": {
"Authorization": "Bearer ${API_TOKEN}"
},
"timeout": 30
}
},
{
"name": "llm",
"type": "llm",
"config": {
"provider": "openai",
"model": "gpt-4",
"api_key": "${OPENAI_API_KEY}",
"temperature": 0.7
}
},
{
"name": "custom_resource",
"type": "custom",
"config": {
"class": "my_module.MyResourceClass",
"param1": "value1"
}
}
]
}
Resources are available in the function context:
def fetch_data(data, context):
    """Fetch data using configured resources."""
    # Access database resource
    db = context['resources']['database']
    result = db.query("SELECT * FROM users WHERE id = ?", [data['user_id']])
    # Access API resource
    api = context['resources']['api']
    response = api.get(f"/users/{data['user_id']}")
    return {
        'db_data': result,
        'api_data': response.json()
    }
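Since resources reach a function only through the context, a function like this can be exercised without a real database or HTTP service by passing stand-ins. The sketch below does that. FakeDatabase, FakeApi, and FakeResponse are hypothetical test doubles that mimic only the query, get, and json calls used above; they say nothing about the real resource classes.

```python
class FakeDatabase:
    """Stand-in for a database resource; filters canned rows by id."""

    def __init__(self, rows):
        self.rows = rows

    def query(self, sql, params):
        user_id = params[0]
        return [row for row in self.rows if row["id"] == user_id]


class FakeResponse:
    def __init__(self, payload):
        self._payload = payload

    def json(self):
        return self._payload


class FakeApi:
    """Stand-in for an HTTP resource; echoes the requested path."""

    def get(self, path):
        return FakeResponse({"path": path})


def fetch_data(data, context):
    """Same shape as the function above, repeated so the sketch is self-contained."""
    db = context["resources"]["database"]
    result = db.query("SELECT * FROM users WHERE id = ?", [data["user_id"]])
    api = context["resources"]["api"]
    response = api.get(f"/users/{data['user_id']}")
    return {"db_data": result, "api_data": response.json()}


context = {
    "resources": {
        "database": FakeDatabase([{"id": 7, "name": "Ada"}, {"id": 8, "name": "Bob"}]),
        "api": FakeApi(),
    }
}
result = fetch_data({"user_id": 7}, context)
print(result)
```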
Control how the FSM executes states and transitions.
{
"execution_strategy": "depth_first" # Execution strategy
}
Options:
- "depth_first" - Depth-first traversal (default)
- "breadth_first" - Breadth-first traversal
- "resource_optimized" - Optimize for resource utilization
- "stream_optimized" - Optimize for streaming data
simple_pipeline = {
"name": "SimpleDataPipeline",
"main_network": "main",
"data_mode": {
"default": "copy"
},
"networks": [{
"name": "main",
"states": [
{
"name": "input",
"is_start": True,
"schema": {
"type": "object",
"properties": {
"data": {"type": "string"}
},
"required": ["data"]
},
"arcs": [
{"target": "validate"}
]
},
{
"name": "validate",
"validators": [
{
"type": "inline",
"code": "lambda data: len(data.get('data', '')) > 0"
}
],
"arcs": [
{
"target": "transform",
"condition": {
"type": "inline",
"code": "lambda data: data.get('valid', True)"
}
},
{
"target": "error",
"condition": {
"type": "inline",
"code": "lambda data: not data.get('valid', True)"
}
}
]
},
{
"name": "transform",
"transforms": [
{
"type": "inline",
"code": "lambda data: {'result': data['data'].upper(), 'length': len(data['data'])}"
}
],
"arcs": [
{"target": "output"}
]
},
{
"name": "output",
"is_end": True
},
{
"name": "error",
"is_end": True,
"transforms": [
{
"type": "inline",
"code": "lambda data: {'error': 'Validation failed', 'input': data}"
}
]
}
]
}]
}
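To see how a configuration of this shape drives execution, it can help to walk it by hand. The following toy interpreter is a sketch for intuition only and is not the DataKnobs engine: it handles inline functions that take a data dictionary, ignores validators, schemas, priorities, and resources, takes the first eligible arc, and assumes each transform's return value replaces the current data. The network is a trimmed variant of the pipeline above.

```python
def run(network, data, max_transitions=1000):
    """Follow arcs from the start state until an end state is reached."""
    states = {s["name"]: s for s in network["states"]}
    current = next(s for s in network["states"] if s.get("is_start"))
    path = [current["name"]]
    for _ in range(max_transitions):
        # Apply the state's inline transforms; each result replaces the data.
        for fn in current.get("transforms", []):
            data = eval(fn["code"])(data)  # trusted configuration only
        if current.get("is_end"):
            return path, data
        # Take the first arc whose condition is absent or true.
        for arc in current.get("arcs", []):
            cond = arc.get("condition")
            if cond is None or eval(cond["code"])(data):
                current = states[arc["target"]]
                path.append(current["name"])
                break
        else:
            raise RuntimeError(f"no arc can be taken from {current['name']}")
    raise RuntimeError("max_transitions exceeded")


network = {
    "name": "main",
    "states": [
        {"name": "input", "is_start": True, "arcs": [
            {"target": "transform",
             "condition": {"type": "inline",
                           "code": "lambda data: len(data.get('data', '')) > 0"}},
            {"target": "error"},
        ]},
        {"name": "transform",
         "transforms": [{"type": "inline",
                         "code": "lambda data: {'result': data['data'].upper(), 'length': len(data['data'])}"}],
         "arcs": [{"target": "output"}]},
        {"name": "output", "is_end": True},
        {"name": "error", "is_end": True,
         "transforms": [{"type": "inline",
                         "code": "lambda data: {'error': 'Validation failed', 'input': data}"}]},
    ],
}

print(run(network, {"data": "hello"}))
print(run(network, {"data": ""}))
```

The first call ends in output with the upper-cased result; the second falls through to the unconditional arc and ends in error.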
etl_pipeline = {
"name": "ETLPipeline",
"main_network": "main",
"data_mode": {
"default": "copy"
},
"transaction": {
"strategy": "batch",
"batch_size": 1000,
"rollback_on_error": True
},
"resources": [
{
"name": "source_db",
"type": "database",
"config": {
"connection_string": "${SOURCE_DB_URL}",
"pool_size": 5
}
},
{
"name": "target_db",
"type": "database",
"config": {
"connection_string": "${TARGET_DB_URL}",
"pool_size": 10
}
}
],
"networks": [{
"name": "main",
"states": [
{
"name": "start",
"is_start": True,
"arcs": [{"target": "extract"}]
},
{
"name": "extract",
"transforms": [
{
"type": "registered",
"name": "extract_from_source"
}
],
"resources": ["source_db"],
"arcs": [
{
"target": "validate",
"transform": {
"type": "inline",
"code": "lambda data: {'records': data['extracted'], 'count': len(data['extracted'])}"
}
}
]
},
{
"name": "validate",
"validators": [
{
"type": "registered",
"name": "validate_records"
}
],
"arcs": [
{
"target": "transform",
"condition": {
"type": "inline",
"code": "lambda data: data.get('validation_passed', False)"
}
},
{
"target": "failure",
"condition": {
"type": "inline",
"code": "lambda data: not data.get('validation_passed', False)"
}
}
]
},
{
"name": "transform",
"transforms": [
{
"type": "registered",
"name": "transform_records"
}
],
"arcs": [{"target": "load"}]
},
{
"name": "load",
"transforms": [
{
"type": "registered",
"name": "load_to_target"
}
],
"resources": ["target_db"],
"arcs": [
{
"target": "success",
"condition": {
"type": "inline",
"code": "lambda data: data.get('load_successful', False)"
}
},
{
"target": "failure",
"condition": {
"type": "inline",
"code": "lambda data: not data.get('load_successful', False)"
}
}
]
},
{
"name": "success",
"is_end": True,
"transforms": [
{
"type": "inline",
"code": "lambda data: {'status': 'success', 'records_processed': data.get('count', 0)}"
}
]
},
{
"name": "failure",
"is_end": True,
"transforms": [
{
"type": "registered",
"name": "rollback_changes"
}
]
}
]
}]
}
multi_network_fsm = {
"name": "OrderProcessingFSM",
"main_network": "main",
"networks": [
{
"name": "main",
"states": [
{
"name": "receive_order",
"is_start": True,
"arcs": [
{
"target": "validate_order",
"target_network": "validation",
"return_state": "process_payment"
}
]
},
{
"name": "process_payment",
"arcs": [
{
"target": "check_payment",
"target_network": "payment",
"return_state": "ship_order"
}
]
},
{
"name": "ship_order",
"arcs": [{"target": "complete"}]
},
{
"name": "complete",
"is_end": True
}
]
},
{
"name": "validation",
"states": [
{
"name": "validate_order",
"is_start": True,
"validators": [
{
"type": "inline",
"code": "lambda data: all([data.get('customer_id'), data.get('items')])"
}
],
"arcs": [{"target": "validation_complete"}]
},
{
"name": "validation_complete",
"is_end": True
}
]
},
{
"name": "payment",
"states": [
{
"name": "check_payment",
"is_start": True,
"transforms": [
{
"type": "registered",
"name": "process_payment_method"
}
],
"arcs": [{"target": "payment_complete"}]
},
{
"name": "payment_complete",
"is_end": True
}
]
}
]
}
For simple FSMs, you can use a simplified format that gets transformed to the full format:
{
"name": "SimpleFSM",
"states": { # Dict format for states
"start": {
"is_start": True,
"on_complete": {"target": "process"} # Inline transition
},
"process": {
"transform": "lambda data: {'result': data['input'] * 2}",
"on_complete": {"target": "end"}
},
"end": {
"final": True # Alternative to is_end
}
},
"initial_state": "start" # Alternative way to specify start state
}
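The relationship between the simplified and full formats can be sketched as a small expansion step. The mapping below is inferred from the field names in the example (on_complete becomes an arc, final becomes is_end, initial_state marks the start state, a transform string becomes an inline transform) and may differ from what the framework's loader actually does. expand_simple_config is a hypothetical helper, and naming the generated network "main" is an assumption.

```python
def expand_simple_config(simple):
    """Expand the simplified dict-of-states format into the full structure."""
    states = []
    for name, spec in simple["states"].items():
        state = {"name": name}
        if spec.get("is_start") or simple.get("initial_state") == name:
            state["is_start"] = True
        if spec.get("is_end") or spec.get("final"):
            state["is_end"] = True
        if "transform" in spec:
            state["transforms"] = [{"type": "inline", "code": spec["transform"]}]
        if "on_complete" in spec:
            state["arcs"] = [dict(spec["on_complete"])]
        states.append(state)
    return {
        "name": simple["name"],
        "main_network": "main",
        "networks": [{"name": "main", "states": states}],
    }


simple = {
    "name": "SimpleFSM",
    "states": {
        "start": {"is_start": True, "on_complete": {"target": "process"}},
        "process": {
            "transform": "lambda data: {'result': data['input'] * 2}",
            "on_complete": {"target": "end"},
        },
        "end": {"final": True},
    },
    "initial_state": "start",
}
full = expand_simple_config(simple)
print([s["name"] for s in full["networks"][0]["states"]])  # ['start', 'process', 'end']
```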
FSM configurations can be written in YAML for better readability:
name: MyFSM
main_network: main
networks:
- name: main
states:
- name: start
is_start: true
arcs:
- target: process
- name: process
transforms:
- type: inline
code: "lambda data: {'result': data['value'] * 2}"
arcs:
- target: end
- name: end
is_end: true
Environment variables can be used in configurations:
- ${VAR_NAME} - Required variable
- ${VAR_NAME:-default} - Variable with default value
- ${VAR_NAME:?error message} - Required with custom error message
{
"resources": [
{
"name": "database",
"type": "database",
"config": {
"connection_string": "${DATABASE_URL}",
"password": "${DB_PASSWORD:-defaultpass}",
"api_key": "${API_KEY:?API key is required}"
}
}
]
}
The loader will also check for variables with the FSM_ prefix automatically.
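The three substitution forms can be sketched with a regular expression. This is an illustration of the syntax, not the loader's implementation: expand_env is a hypothetical helper, raising KeyError for a missing variable is an arbitrary choice, and reading the FSM_ rule as "try NAME, then FSM_NAME" is an assumption about what the sentence above means.

```python
import os
import re

# ${NAME}, ${NAME:-default} or ${NAME:?message}
_PATTERN = re.compile(r"\$\{(\w+)(?:(:-|:\?)([^}]*))?\}")


def expand_env(text, env=None):
    """Substitute environment variable references in a configuration string."""
    env = os.environ if env is None else env

    def replace(match):
        name, op, arg = match.group(1), match.group(2), match.group(3)
        # Try the plain name first, then the FSM_-prefixed name (assumed order).
        for key in (name, f"FSM_{name}"):
            if key in env:
                return env[key]
        if op == ":-":
            return arg
        message = arg if op == ":?" else f"environment variable {name} is not set"
        raise KeyError(message)

    return _PATTERN.sub(replace, text)


env = {"DATABASE_URL": "postgresql://localhost/mydb", "FSM_REGION": "eu"}
print(expand_env("${DATABASE_URL}", env))              # postgresql://localhost/mydb
print(expand_env("${DB_PASSWORD:-defaultpass}", env))  # defaultpass
print(expand_env("region=${REGION}", env))             # region=eu
```

Passing an explicit env mapping keeps the function easy to test; omitting it falls back to os.environ.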
- Define at least one start state (is_start: true)
- Define at least one end state (is_end: true)
"states": [
{"name": "input", "is_start": true, "arcs": [{"target": "validate"}]},
{"name": "validate", "validators": [...], "arcs": [
{"target": "process", "condition": {"type": "inline", "code": "lambda d: d.get('valid')"}},
{"target": "error", "condition": {"type": "inline", "code": "lambda d: not d.get('valid')"}}
]},
{"name": "process", "transforms": [...], "arcs": [{"target": "output"}]},
{"name": "output", "is_end": true},
{"name": "error", "is_end": true}
]
"states": [
{"name": "attempt", "transforms": [...], "arcs": [
{"target": "success", "condition": {"type": "inline", "code": "lambda d: d.get('succeeded')"}},
{"target": "wait", "condition": {"type": "inline", "code": "lambda d: d.get('retries', 0) < 3"}},
{"target": "failure", "condition": {"type": "inline", "code": "lambda d: d.get('retries', 0) >= 3"}}
]},
{"name": "wait", "transforms": [
{"type": "inline", "code": "lambda d: {**d, 'wait': 2 ** d.get('retries', 0), 'retries': d.get('retries', 0) + 1}"}
], "arcs": [{"target": "attempt"}]},
{"name": "success", "is_end": true},
{"name": "failure", "is_end": true}
]
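The wait state's transform is what produces the backoff: each pass doubles the delay and increments retries until the retries < 3 condition on the attempt state stops holding. The loop below applies the same lambda repeatedly to show the resulting values. It only computes the delays; whether and how the framework actually pauses is outside this sketch.

```python
# Same expression as the inline transform in the wait state above.
wait_transform = lambda d: {
    **d,
    "wait": 2 ** d.get("retries", 0),
    "retries": d.get("retries", 0) + 1,
}

data = {"succeeded": False}
waits = []
# Mirrors the arc condition on the attempt state: retry while retries < 3.
while data.get("retries", 0) < 3:
    data = wait_transform(data)
    waits.append(data["wait"])

print(waits)            # [1, 2, 4]
print(data["retries"])  # 3
```

After the third pass the retries >= 3 arc becomes the eligible one, which is what routes execution to failure.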
"states": [
{"name": "split", "transforms": [
{"type": "inline", "code": "lambda d: {'chunks': split_data(d['data'])}"}
], "arcs": [
{"target": "process_chunk", "target_network": "processor", "return_state": "merge"}
]},
{"name": "merge", "transforms": [
{"type": "inline", "code": "lambda d: {'result': combine_results(d['chunks'])}"}
], "arcs": [{"target": "complete"}]}
]
- Add "is_start": true to at least one state
- Check that main_network matches a network name in networks
- Declare referenced resources in the resources section
If migrating from other FSM formats, note these key differences:
- States use is_start and is_end flags instead of special state types