This document provides a comprehensive description of how data flows through a Finite State Machine (FSM) network, including state transitions, arc execution, function invocation, and subnetwork handling.
An FSM processes input data by transitioning through a configured network of states connected by arcs. Processing begins at a designated start state and continues until reaching an end state or exhausting valid transitions.
graph TD
Input[Input Data] --> Start[Start State]
Start --> Process[Process States]
Process --> Arc[Evaluate Arcs]
Arc --> Transition[Transition]
Transition --> Process
Process --> End[End State]
End --> Output[Output Result]
States are the fundamental building blocks of an FSM network. Each state represents a distinct processing stage with pre-validators, transforms, post-validators, and optional resources.
Arcs define transitions between states and control the flow of execution through conditions (pre-tests), transforms, and priorities.
Networks are collections of interconnected states that define complete processing workflows.
When FSM execution begins:
flowchart LR
A[Create ExecutionContext] --> B[Find Initial State]
B --> C[Set Current State]
C --> D[Execute Initial State Transforms]
- ExecutionContext Creation: an ExecutionContext is created to carry the input data and track execution state
- Initial State Selection: the state marked `is_start: true` becomes the entry point
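As a rough sketch of this startup sequence (the `State` and `ExecutionContext` shapes below are simplified stand-ins, not the engine's actual classes):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class State:
    name: str
    is_start: bool = False
    is_end: bool = False

@dataclass
class ExecutionContext:
    data: Dict[str, Any]
    current_state: Optional[str] = None
    state_history: List[str] = field(default_factory=list)

def begin_execution(states: List[State], input_data: Dict[str, Any]) -> ExecutionContext:
    context = ExecutionContext(data=input_data)    # 1. create the ExecutionContext
    start = next(s for s in states if s.is_start)  # 2. locate the is_start state
    context.current_state = start.name             # 3. set the current state
    context.state_history.append(start.name)
    return context                                 # 4. initial state transforms run next
```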
Upon entering a new state, three distinct phases occur:
flowchart TD
Enter[Enter State] --> PreVal[Execute Pre-Validators]
PreVal --> PrePass{Pre-Validation Passed?}
PrePass -->|No| Error[Handle Error]
PrePass -->|Yes| Trans[Execute State Transforms]
Trans --> PostVal[Execute Post-Validators]
PostVal --> PostPass{Post-Validation Passed?}
PostPass -->|Yes| Ready[State Ready for Arc Evaluation]
PostPass -->|No| Error
Processing Phases:
1. Pre-validators verify the incoming data
2. State transforms modify the data
3. Post-validators validate the transformed data before arc evaluation
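Conceptually, state entry looks like the following sketch, assuming states are plain dicts keyed by the config field names (`pre_validators`, `transforms`, `validators`); this is illustrative, not the engine's API:

```python
from typing import Any, Callable, Dict, List

Phase = List[Callable[[Dict[str, Any]], Any]]

def enter_state(state: Dict[str, Phase], data: Dict[str, Any]) -> Dict[str, Any]:
    for validate in state.get("pre_validators", []):   # phase 1: check incoming data
        validate(data)
    for transform in state.get("transforms", []):      # phase 2: modify the data
        data = transform(data)
    for validate in state.get("validators", []):       # phase 3: post-validate before arcs
        validate(data)
    return data
```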
The FSM supports two types of validators that execute at different phases:
- Pre-Validators: Verify incoming data before transforms
- Post-Validators: Validate transformed data before arc evaluation
Both types can be defined as:
# Class-based validator (IValidationFunction)
def validate(self, data: Any, context: Dict[str, Any] | None = None) -> ExecutionResult:
# Validate data
# Return ExecutionResult with success/failure
return ExecutionResult.success_result(validated_data)
# Function-based validator
def validate(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]:
# Validate data
# Return validation results or raise exception
return validated_data
# Inline lambda validator (receives StateDataWrapper)
lambda state: state.data.get('field') > threshold
Arguments passed:
- Class-based: `data` (Any type) and an optional `context` dict
- Function-based: `data` (dict) and `context` (FunctionContext)
- Inline lambda: `state` (StateDataWrapper with a `.data` attribute)

`context`: FunctionContext containing:
- `state_name`: Current state name
- `function_name`: Validator function name
- `metadata`: State and execution metadata
- `resources`: Available resource handles

Expected output: the validated data; on success it is stored back in `context.data`.
Transforms execute after validation to modify data. Like validators, they support multiple forms:
# Class-based transform (ITransformFunction)
def transform(self, data: Any, context: Dict[str, Any] | None = None) -> ExecutionResult:
# Transform data
return ExecutionResult.success_result(transformed_data)
# Function-based transform with state resources
def transform(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]:
# Access state-level resources
db = context.resources.get('database') # From state resources
cache = context.resources.get('cache') # From state resources
if db:
# Use database to enrich data
user_data = db.query("SELECT * FROM users WHERE id = ?", [data['user_id']])
data['user'] = user_data.first()
if cache:
# Store in cache for later
cache.set(f"user:{data['user_id']}", data['user'])
return data
# Inline lambda transform
lambda state: {**state.data, 'new_field': 'value'}
Arguments passed:
- Class-based: `data` (Any type) and an optional `context` dict
- Function-based: `data` (dict) and `context` (FunctionContext with state resources)
- Inline lambda: `state` (StateDataWrapper)

`context`: FunctionContext containing state-allocated resources.

Expected output: the transformed data, which is stored back in `context.data`.
After state functions complete, the engine evaluates available transitions:
flowchart TD
State[Current State] --> GetArcs[Get Outgoing Arcs]
GetArcs --> SortPri[Sort by Priority]
SortPri --> EvalCond[Evaluate Pre-tests]
EvalCond --> Filter[Filter Valid Arcs]
Filter --> Select[Select Arc by Strategy]
Select --> Execute[Execute Arc]
Pre-test functions determine arc eligibility. They’re typically defined as conditions in arc configuration:
# Class-based test function (IStateTestFunction)
def test(self, data: Any, context: Dict[str, Any] | None = None) -> Tuple[bool, str | None]:
# Evaluate condition
return (condition_met, "reason if false")
# Function-based pre-test
def pre_test(data: Dict[str, Any], context: FunctionContext) -> bool:
# Evaluate condition
return condition_met
# Inline lambda condition (most common)
lambda state: state.data.get('status') == 'ready'
Arguments passed:
- Class-based: `data` and an optional `context` dict
- Function-based: `data` (dict) and `context` (FunctionContext)
- Inline lambda: `state` (StateDataWrapper)

`context`: FunctionContext with current state information.

Expected output:
- `True` allows the arc for selection
- `False` excludes the arc from consideration

The engine selects from valid arcs using a deterministic priority-based approach:
Priority Rules:
- Arcs are sorted by their `priority` values in arc configuration (default: 0), higher values first

This ensures predictable, deterministic behavior where the best available path is always taken, with automatic fallback to alternative paths if the primary choice leads to failure downstream.
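A minimal sketch of this selection logic, assuming a simplified `Arc` type (the real engine's selection strategy hooks may differ):

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

@dataclass
class Arc:
    target: str
    priority: int = 0                       # default priority per the rules above
    condition: Optional[Callable[[Dict[str, Any]], bool]] = None

def select_arc(arcs: List[Arc], data: Dict[str, Any]) -> Optional[Arc]:
    # Sort by priority, higher first; stable sort keeps config order for ties
    for arc in sorted(arcs, key=lambda a: a.priority, reverse=True):
        if arc.condition is None or arc.condition(data):  # pre-test gate
            return arc
    return None                             # no valid transition available
```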
Once selected, arc execution proceeds:
flowchart TD
Arc[Selected Arc] --> Alloc[Acquire Resources from Pool]
Alloc --> Trans[Execute Transform]
Trans --> Update[Update Context]
Update --> Release[Release Resources to Pool]
Release --> Target[Transition to Target]
Note on “Allocate Resources”: this step acquires existing resources from the pool or cache rather than creating new ones; resources are reused throughout FSM execution.
Transform functions modify data during transition:
# Function signature for arc transforms
def transform(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]:
# Transform data during transition
return transformed_data
Arguments passed:
- `data`: Current data before transition
- `context`: Enhanced FunctionContext, including source/target state metadata and any arc-level resources

Expected output: the transformed data, which is carried into the target state.
After arc execution, the engine transitions to the target state:
flowchart LR
A[Update Previous State] --> B[Set Current State]
B --> C[Add to History]
C --> D[Enter New State]
Transition Process:
1. The previous state pointer is updated
2. The current state is set to the arc's target
3. The transition is appended to the execution history
4. The new state is entered (pre-validators, transforms, post-validators)
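In code, this bookkeeping amounts to something like the sketch below; the field names mirror the ExecutionContext fields documented later, but the function itself is illustrative:

```python
def transition_to(context, target_state: str) -> None:
    context.previous_state = context.current_state  # update previous state
    context.current_state = target_state            # set current state
    context.state_history.append(target_state)      # add to history
    # the engine then enters the new state (pre-validators, transforms, ...)
```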
When a push arc is encountered during execution, a special subnetwork execution process is triggered:
flowchart TD
StateA[Current State] --> EvalCond[Evaluate Push Arc Condition]
EvalCond -->|Condition Met| PrepPush[Prepare for Push]
EvalCond -->|Condition Failed| NextArc[Try Next Arc]
PrepPush --> CheckDepth[Check Max Depth]
CheckDepth -->|OK| SaveRes[Save Parent State Resources]
CheckDepth -->|Exceeded| Error[Throw DepthError]
SaveRes --> ParseTarget[Parse target_network:initial_state]
ParseTarget --> PushStack[Push to Network Stack]
PushStack --> CreateContext[Create Sub-Context]
CreateContext --> ExecSub[Execute Subnetwork]
ExecSub --> SubDone{Subnetwork Complete?}
SubDone -->|Success| UpdateData[Update Parent Data]
SubDone -->|Failure| PopStack[Pop Network Stack]
UpdateData --> ReturnState{Return State Specified?}
ReturnState -->|Yes| EnterReturn[Enter Return State]
ReturnState -->|No| Continue[Continue Normal Flow]
EnterReturn --> Continue
PopStack --> Fail[Return Failure]
1. Arc Condition Evaluation
2. Depth Limit Check: raises `StateTransitionError` if the maximum depth is exceeded
3. Resource Context Preservation: parent state resources are saved in `context.metadata['parent_state_resources']`
4. Target Network Parsing: supports the `"network_name"` format (uses the default initial state) and the `"network_name:initial_state"` format (custom entry point)
5. Network Stack Management: the stack holds `[(network_name, return_state)]` entries
6. Context Isolation Modes
7. Subnetwork Execution: honors the `network_name:initial_state` syntax
8. Result Integration
9. Return State Processing: if `return_state` is specified, execution continues at that state in the parent network
10. Resource Cleanup
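Step 4 (Target Network Parsing) is mechanical; a sketch with a hypothetical helper name:

```python
from typing import Optional, Tuple

def parse_push_target(target: str) -> Tuple[str, Optional[str]]:
    # "network_name" -> default initial state; "network:state" -> custom entry
    if ":" in target:
        network, initial_state = target.split(":", 1)
        return network, initial_state
    return target, None

assert parse_push_target("validation_subnet") == ("validation_subnet", None)
assert parse_push_target("validation:premium_validation") == ("validation", "premium_validation")
```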
flowchart TD
PushArc[Execute Push Arc] --> TryPush[Attempt Push]
TryPush --> CheckErrors{Errors?}
CheckErrors -->|Network Not Found| PopReturn[Pop Stack & Return False]
CheckErrors -->|State Not Found| PopReturn
CheckErrors -->|Depth Exceeded| ThrowError[Throw StateTransitionError]
CheckErrors -->|Context Error| PopReturn
CheckErrors -->|Subnetwork Failed| UpdateFailed[Update with Failure Result]
CheckErrors -->|Success| UpdateSuccess[Update with Success Result]
PopReturn --> Cleanup[Cleanup Resources]
UpdateFailed --> Cleanup
UpdateSuccess --> ReturnSuccess[Return True]
Error Recovery:
- Network-not-found, state-not-found, and context errors pop the network stack and return False
- Depth violations raise StateTransitionError
- Subnetwork failures update the parent with a failure result
- Resources are cleaned up on every path
The complete execution loop combines all phases:
flowchart TD
Start[Start Execution] --> Init[Initialize Context]
Init --> SetStart[Set Initial State]
SetStart --> InitPreVal[Execute Initial Pre-Validators]
InitPreVal --> InitTrans[Execute Initial Transforms]
InitTrans --> Loop{Execution Loop}
Loop --> ExecPostVal[Execute Post-Validators]
ExecPostVal --> GetArcs[Get Outgoing Arcs]
GetArcs --> EvalPre[Evaluate Arc Pre-tests]
EvalPre --> HasArcs{Valid Arcs?}
HasArcs -->|No| CheckEnd{Is End State?}
CheckEnd -->|Yes| Success[Return Success]
CheckEnd -->|No| Fail[Return Failure]
HasArcs -->|Yes| SelectArc[Select Best Arc]
SelectArc --> ExecArcTrans[Execute Arc Transform]
ExecArcTrans --> Transition[Transition to Target State]
Transition --> StatePreVal[Execute State Pre-Validators]
StatePreVal --> StateTrans[Execute State Transforms]
StateTrans --> CheckMax{Max Transitions?}
CheckMax -->|No| Loop
CheckMax -->|Yes| Timeout[Return Timeout]
Key Points:
- Initial pre-validators and transforms run once, before the loop begins
- Post-validators run on every iteration, before arc evaluation
- Each transition triggers the target state's pre-validators and transforms
- The loop ends on an end state (success), no valid arcs (failure), or the transition limit (timeout)
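Condensed into runnable pseudocode (dict-shaped states and arcs, validators omitted for brevity; none of this is the engine's actual API):

```python
from typing import Any, Dict, Tuple

Data = Dict[str, Any]

def run(states: Dict[str, dict], start: str, data: Data,
        max_transitions: int = 1000) -> Tuple[str, Data]:
    current = start
    for fn in states[current].get("transforms", []):   # initial state transforms
        data = fn(data)
    for _ in range(max_transitions):
        # pick the highest-priority arc whose condition passes
        arcs = sorted(states[current].get("arcs", []),
                      key=lambda a: a.get("priority", 0), reverse=True)
        arc = next((a for a in arcs
                    if a.get("condition", lambda d: True)(data)), None)
        if arc is None:
            return ("success" if states[current].get("is_end") else "failure", data)
        if "transform" in arc:
            data = arc["transform"](data)              # arc transform during transition
        current = arc["target"]                        # transition to target state
        for fn in states[current].get("transforms", []):
            data = fn(data)                            # target state transforms
    return ("timeout", data)                           # max transitions exceeded
```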
Processing terminates when:
flowchart TD
Check[Check Conditions] --> End{Is End State?}
End -->|Yes| Terminate[Return Success]
End -->|No| NoTrans{No Valid Transitions?}
NoTrans -->|Yes| Fail[Return Failure]
NoTrans -->|No| MaxTrans{Max Transitions?}
MaxTrans -->|Yes| Timeout[Return Timeout]
MaxTrans -->|No| Continue[Continue Processing]
End Conditions:
- Success: a state marked `is_end: true` is reached
- Failure: no valid transitions remain from a non-end state
- Timeout: the maximum transition count is exceeded
The FSM configuration supports two formats for defining states and arcs:
This is the native format matching the schema definition:
# Network-level configuration
networks:
- name: "main"
states:
- name: "process_data"
is_start: false
is_end: false
resources: ["main_db", "cache"] # Resources for state transforms
pre_validators: # Validate incoming data before transforms
- type: "builtin"
name: "validate_input_schema"
params:
schema: {...}
transforms:
- type: "custom"
module: "my_module"
name: "process_transform"
validators: # Post-validators: validate after transforms
- type: "builtin"
name: "validate_output_schema"
params:
schema: {...}
arcs: # Arcs defined under the source state
- target: "next_state"
condition:
type: "inline"
code: "lambda state: state.data.get('ready', False)"
transform:
type: "builtin"
name: "add_metadata"
priority: 10
resources: ["api_client"] # Arc-specific resources
For simpler configurations, arcs can be defined at the network/root level:
# SimpleFSM format
name: "my_fsm"
states:
- name: "start"
is_start: true
- name: "process"
- name: "end"
is_end: true
arcs: # Arcs at root level with 'from' and 'to' fields
- from: "start"
to: "process"
condition:
type: "inline"
code: "data.get('ready', False)"
transform:
code: "data['processed'] = True; data"
priority: 10
- from: "process"
to: "end"
Note: The simplified format with root-level arcs is automatically converted to the canonical format during loading.
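A sketch of what that conversion involves; the key names follow the two formats shown above, but the function itself is illustrative:

```python
from typing import Any, Dict

def to_canonical(config: Dict[str, Any]) -> Dict[str, Any]:
    states = {s["name"]: {**s, "arcs": []} for s in config["states"]}
    for arc in config.get("arcs", []):
        arc = dict(arc)
        source = arc.pop("from")
        arc["target"] = arc.pop("to")          # 'from'/'to' become source state + 'target'
        states[source]["arcs"].append(arc)     # arcs move under their source state
    return {"networks": [{"name": config["name"], "states": list(states.values())}]}
```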
Configuration Mapping to Processing:
Config Element | Processing Stage | Function Type |
---|---|---|
`pre_validators` | State entry, before transforms | IValidationFunction |
`transforms` | State entry, after pre-validation | ITransformFunction |
`validators` | After transforms, before arc evaluation | IValidationFunction |
`arcs.condition` | Arc evaluation phase | IStateTestFunction |
`arcs.transform` | Arc execution phase | ITransformFunction |
Arcs connect states and control transitions. They can be defined in two ways:
states:
- name: "source_state"
arcs:
- target: "target_state"
condition: # Pre-test function
type: "builtin"
name: "check_threshold"
params:
threshold: 100
transform: # Arc transform function
type: "inline"
code: "lambda state: {...}"
priority: 5
resources: ["main_db", "cache_service"] # Resource names
arcs:
- from: "source_state"
to: "target_state"
condition:
type: "inline"
code: "data.get('value') > 100"
transform:
code: "data['processed'] = True; data"
priority: 5
Arc Processing Correspondence:
- Arcs are sorted by `priority` (higher first)
- The `condition` function determines eligibility
- The `transform` modifies data during transition
- The `resources` list specifies resource names (not types) to acquire

The FSM is flexible about data formats but internally ensures consistency:
Accepted Input Formats:
- Plain dictionaries: `{'field1': 'value1', 'field2': 123}`
- Custom objects and wrapper types (converted internally)

Internal Processing: the FSM calls `ensure_dict()` to convert all data to dictionary format before passing it to functions:
- Objects with a `.to_dict()` method → converted via that method
- Objects with `._data` or `.data` → extracted and converted
- Other objects → `dict(obj)` if possible, else an empty dict

Data Flow:
# Input can be various types
input_data = MyCustomObject() # or dict, or FSMData
# FSM converts internally
context.data = ensure_dict(input_data) # Always a dict
# Functions receive dict (or wrapped for lambdas)
def my_function(data: Dict[str, Any], context: FunctionContext):
# data is always a dictionary here
return {'processed': True, **data}
Important: While the FSM accepts various input formats, functions always receive data as a dictionary (except inline lambdas which receive a StateDataWrapper). This ensures consistent behavior regardless of input type.
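A sketch of the conversion rules just described (the real `ensure_dict()` may differ in detail):

```python
from typing import Any, Dict

def ensure_dict(obj: Any) -> Dict[str, Any]:
    if isinstance(obj, dict):
        return obj
    if hasattr(obj, "to_dict"):
        return obj.to_dict()                  # explicit conversion method
    for attr in ("_data", "data"):
        if hasattr(obj, attr):                # wrapper types expose their payload
            return ensure_dict(getattr(obj, attr))
    try:
        return dict(obj)                      # mapping-like objects
    except (TypeError, ValueError):
        return {}                             # fall back to an empty dict
```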
The data format remains consistent across execution modes, but the processing pattern differs:
# Single mode execution
fsm.execute({'user_id': 123, 'action': 'login'})
# Functions work with this single record throughout
# In batch mode the full input list is available via context.batch_data,
# and per-record results are collected in context.batch_results
# Batch mode - input is a list
batch_data = [
{'user_id': 123, 'action': 'login'},
{'user_id': 456, 'action': 'logout'},
]
# Each record processed separately, functions see one at a time
# Stream mode - data arrives in chunks
# Functions process one chunk at a time
# Each chunk is still a dict when passed to functions
Key Point: Regardless of execution mode, individual functions always work with a single data dictionary at a time. The execution engine handles the iteration and aggregation.
Resources are external systems or services that states and arcs can access during execution.
The FSM provides built-in resource adapters for:
- Database (`database`)
- Filesystem (`filesystem`)
- HTTP (`http`)
- LLM (`llm`)
- Custom (`custom`)
Resources are defined in the FSM configuration:
resources:
- name: "main_db"
type: "database"
config:
backend: "postgres"
host: "localhost"
database: "myapp"
connection_pool_size: 10
timeout_seconds: 30
- name: "api_client"
type: "http"
config:
base_url: "https://api.example.com"
auth_token: "${API_TOKEN}"
retry_attempts: 3
Resources can be defined at two levels with different scopes:
- State-level (the `resources` field in state config): available to all functions in that state
- Arc-level (the `resources` field in arc config): available to that arc's transform
Resource Merging Strategy:
states:
- name: "data_processor"
resources: ["database", "cache"] # State resources
transforms:
- type: "custom"
name: "process_with_db" # Gets: database, cache
arcs:
- target: "next_state"
resources: ["api_client"] # Arc-specific resources
transform:
name: "enrich_via_api"
# Arc transform gets: database, cache (from state) + api_client (from arc)
Important Considerations:
- State has `"database"`, arc also has `"database"` (conflict!)
- State has `"main_db"`, arc has `"analytics_db"`
- State has `["main_db"]`, arc has `["main_db"]`
- State has `["main_db"]`, arc has `[]` (inherits state's)

Resources are acquired by the execution engine and passed to functions via the FunctionContext:
def my_transform(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]:
# Access resources from context
db = context.resources.get('main_db') # Returns database connection
api = context.resources.get('api_client') # Returns HTTP client
if db:
# Use database resource
result = db.query("SELECT * FROM users WHERE id = ?", [data['user_id']])
data['user'] = result.first()
if api:
# Use HTTP resource
response = api.get(f"/users/{data['user_id']}")
data['api_data'] = response.json()
return data
Resource Lifecycle and Efficiency:
Resources are efficiently managed through pooling and caching mechanisms:
Arc needs resource → Check if already cached for this owner
→ If cached: Return existing instance
→ If not: Acquire from pool or create new
Arc executes → Resource available in context.resources
Arc completes → Resource released back to pool (not destroyed)
Resources are cached per `owner_id` (a combination of arc identifier and execution ID).

Important: Despite the term “Resource Allocation” in arc execution, resources are NOT rebuilt on each arc execution. The system efficiently reuses pooled resources. The “allocation” refers to checking out a resource from the pool, not creating a new one.
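The checkout behavior can be sketched as follows; the class and method names are hypothetical:

```python
from typing import Any, Dict, Tuple

class PooledResources:
    def __init__(self, pool: Dict[str, Any]):
        self._pool = pool                                # pre-built resource instances
        self._cache: Dict[Tuple[str, str], Any] = {}     # keyed by (owner_id, name)

    def acquire(self, name: str, owner_id: str) -> Any:
        key = (owner_id, name)
        if key not in self._cache:                       # reuse if already checked out
            self._cache[key] = self._pool[name]          # check out, don't rebuild
        return self._cache[key]

    def release(self, name: str, owner_id: str) -> None:
        # return to the pool; the underlying resource is not destroyed
        self._cache.pop((owner_id, name), None)
```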
Functions receive context information through the `FunctionContext` dataclass:
@dataclass
class FunctionContext:
state_name: str # Name of current state
function_name: str # Name of function being executed
metadata: Dict[str, Any] # Additional metadata
resources: Dict[str, Any] # Acquired resource instances
`state_name` (str): Name of the current state, e.g. `"validate_input"`, `"process_data"`

`function_name` (str): Name of the function being executed, typically `"transform"` or `"validate"`

`metadata` (Dict[str, Any]):
- `'state'`: Current state name (duplicate of state_name)
- `'source_state'`: Source state for arc transitions
- `'target_state'`: Target state for arc transitions
- `'is_end_state'`: Boolean if current state is an end state

`resources` (Dict[str, Any]): Acquired resource instances, keyed by resource name (e.g. `"main_db"`, `"api_client"`)

# Validator with resource access
def validate_with_db(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]:
db = context.resources.get('validation_db')
if db and data.get('user_id'):
exists = db.exists('users', {'id': data['user_id']})
data['user_exists'] = exists
# Access metadata
if context.metadata.get('is_end_state'):
data['final_validation'] = True
return data
# Arc transform with state information
def arc_transform(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]:
# Use state transition information
data['from_state'] = context.metadata.get('source_state')
data['to_state'] = context.metadata.get('target_state')
data['transition_time'] = time.time()
return data
Important Notes:
- The `context` parameter for class-based functions (IValidationFunction, etc.) is a plain `Dict[str, Any]` or `None`
- The `context` parameter for regular functions is always a `FunctionContext` dataclass instance

FSMs support hierarchical composition through subnetworks and push arcs. This allows complex state machines to be decomposed into reusable, modular components.
Networks are defined as separate entities within the FSM configuration:
name: "order_processing"
networks:
- name: "main"
states:
- name: "validate_order"
transforms: ["validate_order_data"]
arcs:
- target: "process_payment" # Regular arc
- target_network: "inventory_check" # Push arc to subnetwork
return_state: "ship_order"
data_isolation: "copy"
- name: "ship_order"
# ... state config ...
- name: "inventory_check" # Subnetwork
states:
- name: "check_stock"
is_start: true
# ... state config ...
- name: "reserve_items"
is_end: true
# ... state config ...
main_network: "main"
Push arcs are special arcs that transfer execution to a subnetwork:
arcs:
- target_network: "validation_subnet" # Required: target network name
return_state: "continue_processing" # Optional: state to return to
data_isolation: "copy" # Data handling mode
pre_test: "should_validate" # Optional: condition for push
transform: "prepare_validation_data" # Optional: transform before push
priority: 10 # Arc priority
Push arcs support specifying a custom initial state in the target network using the `network_name:initial_state` syntax:
arcs:
# Standard push arc - uses target network's default initial state
- target_network: "validation_subnet"
return_state: "continue_processing"
# Push arc with custom initial state - skip to specific state
- target_network: "validation_subnet:advanced_validation"
return_state: "continue_processing"
condition:
type: "inline"
code: "lambda state: state.data.get('requires_advanced_validation')"
Syntax: "<network_name>:<initial_state>"
network_name
: Name of the target subnetworkinitial_state
: Specific state to start execution at (bypasses default initial state)Use Cases:
Important Notes:
target_network
without colon syntax continues to workflowchart TD
Main[Main State] --> Check{Pre-test?}
Check -->|Pass| Transform[Arc Transform]
Transform --> Push[Push to Subnetwork]
Push --> SubStart[Subnetwork Start]
SubStart --> SubProcess[Process in Subnetwork]
SubProcess --> SubEnd[Subnetwork End]
SubEnd --> Return[Return to Main]
Return --> RetState[Return State]
Data handling during the push is controlled by the `data_isolation` mode:
- `copy`: Subnetwork gets a copy of the data (default)
- `share`: Subnetwork operates on the same data instance
- `partial`: Selected fields are copied

When the subnetwork completes, execution returns to the `return_state` (if specified).

The current implementation passes the entire data record to subnetworks. Future enhancements could include:
arcs:
- target_network: "validation"
data_mapping: # Map parent fields to child
order_id: id
items: products
result_mapping: # Map child results back
validation_status: is_valid
error_messages: validation_errors
Important: Arc transforms on push arcs execute BEFORE pushing to the subnetwork:
def prepare_for_validation(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]:
"""Transform that prepares data before pushing to validation subnetwork."""
return {
'validation_request': {
'order_id': data['id'],
'items': data['items'],
'timestamp': time.time()
}
}
The subnetwork receives the transformed data, not the original.
Resources allocated in the parent network are available to subnetworks.
Current Behavior: Variables are part of ExecutionContext and persist through subnetwork calls.
Recommended Scope (for implementation):
Example usage:
def track_subnet_calls(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]:
# Increment subnet call counter (global variable)
context.variables['subnet_calls'] = context.variables.get('subnet_calls', 0) + 1
# Set network-specific variable
context.variables[f'{context.state_name}_processed'] = True
return data
Important: States communicate PRIMARILY through the data being processed. The only shared mutable context between states is through shared variables as described in the “Cross-State Communication via Shared Variables” section below. All other contextual data modifications will be transient.
Primary channel (`context.data`): modifications a function makes to the data dictionary persist in `context.data` and are visible to subsequent states:
def transform(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]:
# Add a field that next state can see
data['processed_by'] = context.state_name
data['processing_timestamp'] = time.time()
return data
By contrast:
- Changes to `context.metadata` or `context.resources` are NOT persisted across states
- Engine-managed fields such as `current_state`, `state_history`, and `network_stack` are not a communication channel

The following ExecutionContext fields are critical for engine operation and must NEVER be modified by user code (even if exposed):
Field | Purpose | Why Protected |
---|---|---|
`current_state` | Tracks current FSM position | Engine relies on this for transitions |
`previous_state` | Enables state history | Used for debugging and rollback |
`network_stack` | Manages subnetwork calls | Critical for push/pop operations |
`state_history` | Audit trail of execution | Used for loop detection |
`current_transaction` | Transaction management | Database consistency depends on this |
`resource_manager` | Resource allocation | Prevents resource leaks |
`data_mode` | Processing mode (SINGLE/BATCH/STREAM) | Affects entire execution strategy |
In addition to data flow, the FSM supports shared variables for cross-state communication:
def transform(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]:
# Read a variable set by previous state
cached_data = context.variables.get('expensive_lookup', {})
# Set a variable for later states
context.variables['processing_stage'] = 'enrichment_complete'
# Accumulate statistics across states
stats = context.variables.get('stats', {'count': 0})
stats['count'] += 1
context.variables['stats'] = stats
return data
The FSM supports four primary function types:
Purpose: Verify data integrity and business rules
Interface:
def validate(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]
Available Information:
- `data`: Complete current data dictionary
- `context.state_name`: Current state name
- `context.metadata`: Execution and state metadata
- `context.resources`: Allocated resource handles

Expected Behavior:
- Return the validated data, or raise an exception / return a failure result when validation fails
Purpose: Modify, enrich, or restructure data
Interface:
def transform(data: Dict[str, Any], context: FunctionContext) -> Dict[str, Any]
Available Information:
Expected Behavior:
- Return the modified data dictionary
Purpose: Determine arc eligibility (pre-test conditions)
Interface:
def test(data: Dict[str, Any], context: FunctionContext) -> bool
Available Information:
Expected Behavior:
- Return `True` if the condition is met
- Return `False` to skip the arc

Purpose: Determine if processing should terminate
Interface:
def should_end(data: Dict[str, Any], context: FunctionContext) -> Tuple[bool, str]
Available Information:
Expected Behavior:
- Return `(True, reason)` to terminate
- Return `(False, None)` to continue

All functions receive a FunctionContext with:
@dataclass
class FunctionContext:
state_name: str # Current state name
function_name: str # Function being executed
metadata: Dict[str, Any] # Execution metadata
resources: Dict[str, Any] # Resource handles
Metadata Contents:
- `state`: Current state name
- `is_end_state`: Boolean end state flag (when applicable)

Processes one data record through the network:
flowchart LR
Data[Single Record] --> FSM[FSM Network]
FSM --> Result[Single Result]
Characteristics:
Processes multiple records as a batch:
flowchart TD
Batch[Batch Data] --> Split[Split Records]
Split --> P1[Process 1]
Split --> P2[Process 2]
Split --> P3[Process N]
P1 --> Collect[Collect Results]
P2 --> Collect
P3 --> Collect
Collect --> Results[Batch Results]
Characteristics:
- Each record is processed through the network independently
- Per-record results are collected in `context.batch_results`
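Sketched, the batch pattern is plain per-record iteration (using the `fsm.execute` interface from the single-mode example above; the helper itself is illustrative):

```python
def execute_batch(fsm, records):
    results = []
    for record in records:            # functions still see one dict at a time
        results.append(fsm.execute(record))
    return results                    # aggregated into the batch results
```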
Processes continuous data stream:
flowchart TD
Stream[Data Stream] --> Chunk[Get Chunk]
Chunk --> Process[Process Chunk]
Process --> More{More Data?}
More -->|Yes| Chunk
More -->|No| Complete[Stream Complete]
Characteristics:
Push arcs enable hierarchical FSM composition:
flowchart TD
Main[Main Network] -->|Push Arc| Sub[Subnetwork]
Sub --> SubProc[Process in Subnetwork]
SubProc --> SubEnd[Subnetwork End]
SubEnd -->|Return| Return[Return State]
Return --> Main2[Continue Main Network]
The execution context maintains a network stack:
# Push operation
context.push_network(network_name="validation_subnet", return_state="continue_main")
# Stack structure
context.network_stack = [
("main_network", None),
("validation_subnet", "continue_main"),
# ... nested networks
]
# Pop operation
network, return_state = context.pop_network()
Push Arc Processing:
Data isolation is controlled by `isolation_mode`:
- `COPY`: Deep copy data for the subnetwork
- `REFERENCE`: Share the data reference
- `SERIALIZE`: Serialize/deserialize for full isolation

# Configuration
data_mapping: {'parent_field': 'child_field'}
# Execution
child_data = map_data(parent_data, data_mapping)
# Configuration
result_mapping: {'child_result': 'parent_field'}
# Execution
parent_data.update(map_data(child_result, result_mapping))
If `return_state` is specified: transition to that state.

stateDiagram-v2
[*] --> MainNetwork
MainNetwork --> PushArc: Evaluate Push Arc
PushArc --> SaveContext: Save Main Context
SaveContext --> InitSubnet: Initialize Subnetwork
InitSubnet --> SubnetExec: Execute Subnetwork
SubnetExec --> SubnetEnd: Reach End State
SubnetEnd --> RestoreContext: Restore Main Context
RestoreContext --> MapResults: Map Results
MapResults --> ReturnState: Go to Return State
ReturnState --> MainNetwork: Continue Main
MainNetwork --> [*]: End
Functions may encounter errors during execution:
flowchart TD
Exec[Execute Function] --> Try[Try Execution]
Try --> Success{Success?}
Success -->|Yes| Continue[Continue]
Success -->|No| Retry{Retry Available?}
Retry -->|Yes| Delay[Delay]
Delay --> Try
Retry -->|No| HandleError[Error Handler]
HandleError --> Rollback{Rollback?}
Rollback -->|Yes| RollbackTx[Rollback Transaction]
Rollback -->|No| LogError[Log Error]
Error Categories:
Transactions span different scopes based on mode:
Transaction Strategies:
- `SINGLE`: One transaction per execution
- `BATCH`: One transaction per batch
- `MANUAL`: Explicit transaction control
- `NONE`: No transaction management

Transaction Flow:
flowchart TD
Start[Start Execution] --> Begin[Begin Transaction]
Begin --> Process[Process States]
Process --> Commit{Success?}
Commit -->|Yes| CommitTx[Commit Transaction]
Commit -->|No| Rollback[Rollback Transaction]
CommitTx --> End[End]
Rollback --> End
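For the `SINGLE` strategy, that flow maps naturally onto a context manager; a sketch, not the engine's API (`db.begin()` is an assumed connection interface):

```python
from contextlib import contextmanager

@contextmanager
def single_transaction(db):
    tx = db.begin()          # Begin Transaction
    try:
        yield tx             # Process States
        tx.commit()          # Success -> Commit
    except Exception:
        tx.rollback()        # Failure -> Rollback
        raise
```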
The execution context tracks performance metrics:
context.state_timings = {
'validate': 0.125,
'process': 0.847,
'complete': 0.023
}
context.function_call_count = {
'validate_schema': 3,
'transform_data': 5,
'check_condition': 12
}
Consider a simple data validation and enrichment FSM:
# Using simplified format for clarity
name: "validation_enrichment_fsm"
states:
- name: "validate_input"
is_start: true
pre_validators: # Check required fields exist
- type: "inline"
code: "lambda state: 'user_id' in state.data"
transforms:
- type: "inline"
code: "lambda state: {**state.data, 'validated': True, 'timestamp': __import__('time').time()}"
validators: # Post-transform validation
- type: "inline"
code: "lambda state: state.data.get('validated') and state.data.get('timestamp')"
- name: "enrich_data"
pre_validators: # Ensure we have valid input
- type: "inline"
code: "lambda state: state.data.get('validated') == True"
transforms:
- type: "builtin"
name: "fetch_user_details"
validators: # Verify enrichment succeeded
- type: "inline"
code: "lambda state: 'user_details' in state.data"
- name: "complete"
is_end: true
arcs:
- from: "validate_input"
to: "enrich_data"
condition:
type: "inline"
code: "lambda state: state.data.get('validated') == True"
transform:
type: "inline"
code: "lambda state: {**state.data, 'timestamp': __import__('time').time()}"
priority: 10
- from: "enrich_data"
to: "complete"
condition:
type: "inline"
code: "lambda state: 'user_details' in state.data"
1. Input: {'user_id': 123, 'action': 'login'}
2. Enter "validate_input":
- Pre-validator: Check 'user_id' exists ✓
- Transform: Add 'validated': True and timestamp
- Data: {'user_id': 123, 'action': 'login', 'validated': True, 'timestamp': 1234567890}
- Post-validator: Check 'validated' and 'timestamp' exist ✓
- Evaluate arc condition: validated == True ✓
3. Arc transition to "enrich_data":
- Arc transform: (none in this example)
- Data unchanged during transition
4. Enter "enrich_data":
- Pre-validator: Check validated == True ✓
- Transform: Fetch and add user details
- Data: {'user_id': 123, 'action': 'login', 'validated': True, 'timestamp': 1234567890, 'user_details': {...}}
- Post-validator: Check 'user_details' exists ✓
- Evaluate arc condition: 'user_details' in data ✓
5. Transition to "complete":
- End state reached
- Return final data
This example demonstrates conditional execution with different entry points in a subnetwork:
name: "order_processing_fsm"
networks:
- name: "main"
states:
- name: "start"
is_start: true
arcs:
# Path 1: Standard customer orders
- target: "standard_processing"
condition:
type: "inline"
code: "lambda state: state.data.get('customer_type') == 'standard'"
# Path 2: Premium customer orders
- target: "premium_processing"
condition:
type: "inline"
code: "lambda state: state.data.get('customer_type') == 'premium'"
- name: "standard_processing"
arcs:
# Push to validation network at basic validation state
- target_network: "validation:basic_validation"
return_state: "fulfillment"
- name: "premium_processing"
arcs:
# Push to validation network at premium validation state (skips basic)
- target_network: "validation:premium_validation"
return_state: "priority_fulfillment"
- name: "fulfillment"
is_end: true
- name: "priority_fulfillment"
is_end: true
- name: "validation"
states:
- name: "basic_validation"
is_start: true # Default entry point
arcs:
- target: "premium_validation"
transforms:
- type: "inline"
code: "lambda state: {**state.data, 'basic_validated': True}"
- name: "premium_validation"
is_end: true
transforms:
- type: "inline"
code: "lambda state: {**state.data, 'premium_validated': True}"
Standard Customer Path:
1. Input: {'customer_type': 'standard', 'order_id': 123}
2. start → standard_processing (condition: customer_type == 'standard')
3. Push to validation:basic_validation (enters at basic_validation state)
4. validation: basic_validation → premium_validation → end
5. Return to main:fulfillment
6. Final result: {'customer_type': 'standard', 'order_id': 123, 'basic_validated': True, 'premium_validated': True}
Premium Customer Path:
1. Input: {'customer_type': 'premium', 'order_id': 456}
2. start → premium_processing (condition: customer_type == 'premium')
3. Push to validation:premium_validation (skips basic_validation, enters directly at premium_validation)
4. validation: premium_validation → end
5. Return to main:priority_fulfillment
6. Final result: {'customer_type': 'premium', 'order_id': 456, 'premium_validated': True}
Key Benefits:
The FSM processing flow provides a flexible, modular approach to data processing through staged validation and transformation, priority-driven arc selection, pooled resource management, and hierarchical subnetwork composition.
This architecture enables complex workflow orchestration while maintaining clarity and maintainability through well-defined interfaces and processing stages.