Quick Start Guide
Get started with the FSM package in just a few minutes! This guide will walk you through the basics of creating and running finite state machines.
Installation
Install the base package:
pip install dataknobs-fsm
Or with optional dependencies:
# With database support
pip install dataknobs-fsm[database]
# With LLM provider support
pip install dataknobs-fsm[llm]
Your First FSM
Let's create a simple FSM that processes data through multiple stages:
from dataknobs_fsm.api.simple import SimpleFSM
from dataknobs_fsm.core.data_modes import DataHandlingMode
# FSM definition: four states connected by three arcs.
config = {
    "name": "simple_processor",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "validate"},
        {"name": "process"},
        {"name": "complete", "is_end": True},
    ],
    "arcs": [
        # start -> validate: tag the record as not validated yet.
        {
            "from": "start",
            "to": "validate",
            "transform": {
                "type": "inline",
                "code": "lambda data, ctx: {**data, 'validated': False}",
            },
        },
        # validate -> process: the pre_test checks that 'value' is > 0;
        # the transform flips 'validated' to True.
        {
            "from": "validate",
            "to": "process",
            "transform": {
                "type": "inline",
                "code": "lambda data, ctx: {**data, 'validated': True}",
            },
            "pre_test": {
                "type": "inline",
                "code": "lambda data, ctx: data.get('value', 0) > 0",
            },
        },
        # process -> complete: store double the input value as 'result'.
        {
            "from": "process",
            "to": "complete",
            "transform": {
                "type": "inline",
                "code": "lambda data, ctx: {**data, 'result': data['value'] * 2}",
            },
        },
    ],
}
# Build the state machine from the config, selecting the COPY data handling mode.
fsm = SimpleFSM(config, data_mode=DataHandlingMode.COPY)

# Send one record through the FSM and show what comes back.
payload = {"value": 10}
result = fsm.process(payload)
print(result)
# Output (printed on a single line; wrapped here for readability):
# {'final_state': 'complete',
#  'data': {'value': 10, 'validated': True, 'result': 20},
#  'path': ['start', 'validate', 'process', 'complete'],
#  'success': True,
#  'error': None,
#  'metadata': {'arc_start_validate_usage': 1,
#               'arc_validate_process_usage': 1,
#               'arc_process_complete_usage': 1}}
Using Configuration Files
For more complex FSMs, use YAML configuration:
# workflow.yaml
name: data_workflow
description: A simple data processing workflow

# States the workflow moves through, from the start state to the end state.
states:
  - name: start
    is_start: true
  - name: fetch_data
    metadata:
      timeout: 30
  - name: transform
  - name: save
  - name: complete
    is_end: true

# Arcs connect states; an arc may carry a transform.
arcs:
  - from: start
    to: fetch_data
  - from: fetch_data
    to: transform
    transform:
      type: python
      module: myapp.transformers
      name: clean_data
  - from: transform
    to: save
    transform:
      type: lambda
      # Literal block scalar: the lambda source is kept verbatim.
      code: |
        lambda data: {
            **data,
            "saved": True,
            "timestamp": __import__("datetime").datetime.now().isoformat()
        }
  - from: save
    to: complete
Load and run the configuration:
from dataknobs_fsm.api.simple import SimpleFSM
# Build the FSM straight from the YAML file path
fsm = SimpleFSM("workflow.yaml")

# Run one input record through the workflow
record = {"source": "api", "records": 100}
result = fsm.process(record)

# Report the fields of interest from the result
print(f"Final state: {result['final_state']}")
print(f"Success: {result['success']}")
print(f"Processed data: {result['data']}")
Async Execution
The FSM package provides both synchronous and asynchronous APIs. For I/O-bound operations or when working in async contexts, use the AsyncSimpleFSM:
import asyncio
from dataknobs_fsm.api.async_simple import AsyncSimpleFSM
from dataknobs_fsm.api.simple import SimpleFSM
# Define async processing functions
async def fetch_data(state):
    """Simulate an async API call.

    Args:
        state: Object whose ``data`` attribute holds the current record
            (any mapping with a ``copy()`` method).

    Returns:
        A shallow copy of ``state.data`` with ``"fetched"`` set to ``True``.
    """
    await asyncio.sleep(1)  # stand-in for the latency of a real request
    # Work on a copy so the record held by ``state`` is not modified.
    data = state.data.copy()
    data["fetched"] = True
    return data
async def save_data(state):
    """Simulate an async database save.

    Args:
        state: Object whose ``data`` attribute holds the current record
            (any mapping with a ``copy()`` method).

    Returns:
        A shallow copy of ``state.data`` with ``"saved"`` set to ``True``.
    """
    await asyncio.sleep(0.5)  # stand-in for the latency of a real write
    # Work on a copy so the record held by ``state`` is not modified.
    data = state.data.copy()
    data["saved"] = True
    return data
# Workflow definition; transforms are looked up by their registered names.
config = {
    "name": "async_workflow",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "fetch"},
        {"name": "save"},
        {"name": "done", "is_end": True},
    ],
    "arcs": [
        # start -> fetch uses the function registered as "fetch_data"
        {
            "from": "start",
            "to": "fetch",
            "transform": {"type": "registered", "name": "fetch_data"},
        },
        # fetch -> save uses the function registered as "save_data"
        {
            "from": "fetch",
            "to": "save",
            "transform": {"type": "registered", "name": "save_data"},
        },
        # save -> done has no transform
        {"from": "save", "to": "done"},
    ],
}
# Use AsyncSimpleFSM for native async support
async def main():
    """Run one record through the async workflow and print the outcome."""
    fsm = AsyncSimpleFSM(
        config,
        custom_functions={
            "fetch_data": fetch_data,
            "save_data": save_data,
        },
    )
    try:
        result = await fsm.process({"id": 123})
        print(f"Success: {result['success']}")
        print(f"Data: {result['data']}")
    finally:
        # Close the FSM even when processing raises.
        await fsm.close()


# Run the async FSM
asyncio.run(main())
# Note: SimpleFSM is for synchronous contexts only.
# Use AsyncSimpleFSM when working with async/await.
Using Resources
Manage external resources like databases and APIs:
from dataknobs_fsm.api.simple import SimpleFSM
# Workflow definition that declares a resource and attaches it to a state.
config = {
    "name": "resource_workflow",
    "resources": [
        {
            "name": "db",
            "type": "database",
            "provider": "sqlite",
            "config": {"database": "myapp.db"},
        },
    ],
    "states": [
        {"name": "start", "is_start": True},
        {
            "name": "query",
            "resources": ["db"],  # This state requires the database resource
        },
        {"name": "done", "is_end": True},
    ],
    "arcs": [
        {"from": "start", "to": "query"},
        {
            "from": "query",
            "to": "done",
            "transform": {
                "type": "registered",
                "name": "process_query_result",
            },
        },
    ],
}
# Custom function that uses resources
def process_query_result(state):
    """Process database query results.

    Args:
        state: Object whose ``data`` attribute holds the current record
            (any mapping with a ``copy()`` method).

    Returns:
        A shallow copy of ``state.data`` with ``"processed"`` set to ``True``.
    """
    # In actual implementation, resources are accessed via context
    data = state.data.copy()
    data["processed"] = True
    return data
# Register the custom transform under the name the config refers to
custom_functions = {"process_query_result": process_query_result}
fsm = SimpleFSM(config, custom_functions=custom_functions)

# Run a single record through the workflow and print the full result
result = fsm.process({"user_id": 1})
print(f"Result: {result}")
Batch Processing
Process multiple items efficiently:
from dataknobs_fsm.api.simple import SimpleFSM
# Three-state workflow applied to every item of a batch.
config = {
    "name": "batch_processor",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "process"},
        {"name": "done", "is_end": True},
    ],
    "arcs": [
        # start -> process: add 'processed': True to the record
        {
            "from": "start",
            "to": "process",
            "transform": {
                "type": "inline",
                "code": "lambda data, ctx: {**data, 'processed': True}",
            },
        },
        {"from": "process", "to": "done"},
    ],
}
# Create FSM for batch processing
fsm = SimpleFSM(config)

# Process batch of items
items = [
    {"id": 1, "value": 10},
    {"id": 2, "value": 20},
    {"id": 3, "value": 30},
]

# Use process_batch method
results = fsm.process_batch(
    items,
    batch_size=10,
    max_workers=3,
)

# Report the outcome for each item, identified by its 'id'
for result in results:
    print(f"Item {result['data']['id']}: Success={result['success']}")
Error Handling
Implement error handling with the AdvancedFSM:
import asyncio
import random
from dataknobs_fsm.api.advanced import create_advanced_fsm, ExecutionMode
def risky_operation(state):
    """Operation that might fail.

    Args:
        state: Object whose ``data`` attribute holds the current record
            (any mapping with a ``copy()`` method).

    Returns:
        A shallow copy of ``state.data`` with ``"processed"`` set to ``True``.

    Raises:
        ValueError: When the random draw falls below 0.5.
    """
    if random.random() < 0.5:
        raise ValueError("Random failure")
    data = state.data.copy()
    data["processed"] = True
    return data
# Workflow definition with an error handler on the risky arc.
config = {
    "name": "error_handler",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "process"},
        {"name": "error"},
        {"name": "done", "is_end": True},
    ],
    "arcs": [
        # start -> process runs the registered "risky_operation"; its
        # error_handler names the "error" state and sets max_retries to 3.
        {
            "from": "start",
            "to": "process",
            "transform": {
                "type": "registered",
                "name": "risky_operation",
            },
            "error_handler": {
                "target_state": "error",
                "max_retries": 3,
            },
        },
        # error -> process: add 'retry': True to the record
        {
            "from": "error",
            "to": "process",
            "transform": {
                "type": "inline",
                "code": "lambda data, ctx: {**data, 'retry': True}",
            },
        },
        {"from": "process", "to": "done"},
    ],
}
# Build the FSM through the factory: register the transform by name and
# select DEBUG as the execution mode.
registered = {"risky_operation": risky_operation}
fsm = create_advanced_fsm(
    config,
    custom_functions=registered,
    execution_mode=ExecutionMode.DEBUG,
)
# Execute and check result
async def run_error_handling():
    """Trace one execution and print each recorded transition."""
    trace = await fsm.trace_execution({"input": "data"})
    for entry in trace:
        # Each entry is read with .get(), so missing keys print as None.
        print(f" {entry.get('from_state')} -> {entry.get('to_state')}")


asyncio.run(run_error_handling())
Advanced Features with AdvancedFSM
For debugging and step-by-step execution:
import asyncio
from dataknobs_fsm.api.advanced import create_advanced_fsm, ExecutionMode, ExecutionHook
# Hooks that print a line whenever a state is entered or exited
hooks = ExecutionHook(
    on_state_enter=lambda state: print(f"-> Entering: {state}"),
    on_state_exit=lambda state: print(f"<- Exiting: {state}"),
)

# The factory accepts a config dict, a YAML path, or an FSM instance;
# here it is given the YAML path and the STEP_BY_STEP execution mode.
fsm = create_advanced_fsm("workflow.yaml", execution_mode=ExecutionMode.STEP_BY_STEP)
fsm.set_hooks(hooks)
async def debug_workflow():
    """Step through the workflow one transition at a time, then profile a run."""
    test_data = {"input": "data"}

    # Step-by-step execution (step() returns StepResult)
    async with fsm.execution_context(test_data) as context:
        while True:
            result = await fsm.step(context)
            print(f" {result.from_state} -> {result.to_state}")
            # Stop on failure, on completion, or when the step reports
            # its transition as "none".
            if not result.success or result.is_complete or result.transition == "none":
                break

    # Or run with profiling
    profile = await fsm.profile_execution(test_data)
    print(f"\nExecution took {profile['total_time']:.4f}s")
    print(f"Transitions: {profile['transitions']}")


asyncio.run(debug_workflow())
Next Steps
Now that you understand the basics:
- Explore Integration Patterns for pre-built solutions
- Read the API Documentation for detailed reference
- Check out Examples for real-world use cases
- Learn about Data Modes for efficient data handling
- Understand Resource Management for external integrations