dataknobs-fsm Complete API Reference¶
Complete auto-generated API documentation from source code docstrings.
💡 Also see:
- Curated API Guide - Hand-crafted tutorials and examples
- Package Overview - Introduction and getting started
- Source Code - View on GitHub
dataknobs_fsm ¶
DataKnobs Finite State Machine framework.
A flexible FSM framework with data modes, resource management, and streaming support for building complex data processing workflows.
Modules:
| Name | Description |
|---|---|
| api | Public APIs for FSM operations across different use cases. |
| cli | FSM CLI module. |
| config | FSM configuration system for loading, validating, and building FSM instances. |
| core | Core FSM components. |
| execution | Execution module for FSM state machines. |
| functions | FSM functions module. |
| io | I/O abstraction layer for FSM patterns. |
| observability | FSM execution observability for tracking state transitions. |
| patterns | Pre-configured FSM patterns for common workflow use cases. |
| resources | Resource management for FSM. |
| storage | Storage module for FSM execution history. |
| streaming | Streaming support for large data processing in FSM. |
Classes:
| Name | Description |
|---|---|
| FSM | Finite State Machine core class. |
| StateDefinition | Definition of a state in the FSM. |
| StateInstance | Runtime instance of a state. |
| ArcDefinition | Definition of an arc between states. |
| DataHandlingMode | Data handling modes for state instances - defines how data is managed within states. |
| SimpleFSM | Synchronous FSM interface for simple workflows. |
| AsyncSimpleFSM | Async-first FSM interface for production workflows. |
| AdvancedFSM | Advanced FSM interface with full control capabilities. |
| ExecutionMode | Advanced execution modes. |
| ExecutionHook | Hook for monitoring execution events. |
| StepResult | Result from a single step execution. |
| FSMDebugger | Interactive debugger for FSM execution (fully synchronous). |
| ExecutionContext | Execution context for FSM processing with full mode support. |
| ConfigLoader | Load and process FSM configurations from various sources. |
| FSMBuilder | Build executable FSM instances from configuration. |
| ExecutionHistoryQuery | Query parameters for filtering execution history. |
| ExecutionRecord | Record of a single FSM state transition. |
| ExecutionStats | Aggregated statistics for FSM executions. |
| ExecutionTracker | Tracks FSM execution history with query capabilities. |
Functions:
| Name | Description |
|---|---|
| create_advanced_fsm | Factory function to create an AdvancedFSM instance. |
| create_execution_record | Factory function to create an execution record. |
Classes¶
FSM ¶
FSM(
name: str,
data_mode: ProcessingMode = ProcessingMode.SINGLE,
transaction_mode: TransactionMode = TransactionMode.NONE,
description: str | None = None,
resource_manager: Any | None = None,
transaction_manager: Any | None = None,
)
Finite State Machine core class.
This is the foundational class that defines the structure and configuration of a finite state machine. It serves as the container for state networks, functions, and execution configuration.
Architecture
The FSM class uses a builder pattern approach:
1. FSM is constructed with configuration
2. Networks are added with states and transitions
3. Functions are registered for state operations
4. FSM is validated before execution
5. Execution engines are created on demand
Key Components
State Networks:
- One or more state graphs (nodes=states, edges=transitions)
- Main network for primary execution path
- Sub-networks for modular workflows
- Accessed via networks dict or get_network()

Function Registry:
- Central registry of all functions used in workflows
- Validation, transform, and test functions
- Referenced by name in state/arc definitions
- Ensures all function references are valid

Processing Configuration:
- data_mode: How data flows (SINGLE/BATCH/STREAM)
- transaction_mode: Transaction guarantees (NONE/OPTIMISTIC/PESSIMISTIC)
- resource_manager: Manages external resources
- transaction_manager: Coordinates transactions

Execution Engines:
- Created lazily via get_engine() / get_async_engine()
- Sync engine for SimpleFSM
- Async engine for AsyncSimpleFSM
- Cached for reuse
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Unique identifier for this FSM |
| data_mode | ProcessingMode | Data processing mode (SINGLE/BATCH/STREAM) |
| transaction_mode | TransactionMode | Transaction handling (NONE/OPTIMISTIC/PESSIMISTIC) |
| description | str \| None | Optional FSM description |
| networks | Dict[str, StateNetwork] | State networks by name |
| main_network_name | str \| None | Name of the main network |
| function_registry | FunctionRegistry | Registry of all functions |
| resource_requirements | Dict[str, Any] | Aggregated resource requirements |
| config | Dict[str, Any] | Additional configuration |
| metadata | Dict[str, Any] | FSM metadata |
| version | str | FSM version for compatibility |
| created_at | float \| None | Creation timestamp |
| updated_at | float \| None | Last update timestamp |
| resource_manager | Any \| None | External resource manager |
| transaction_manager | Any \| None | Transaction coordinator |
Design Patterns
Separation of Concerns:
- FSM defines structure (what to execute)
- Engines handle execution (how to execute)
- Clear boundary between definition and runtime

Multiple Networks:
- FSMs can contain multiple state graphs
- Enables modular workflow composition
- Sub-networks can be reused across FSMs

Lazy Initialization:
- Engines created on first use
- Reduces overhead for validation-only use cases
- Cached for subsequent use

Validation Before Execution:
- validate() checks structure before execution
- Catches configuration errors early
- Provides detailed error messages
Resource Management
The FSM aggregates resource requirements from all networks:
- Database connections needed
- API clients required
- File handles expected
- Memory requirements

This enables:
- Pre-execution resource checks
- Resource pooling optimization
- Clear dependency documentation
Serialization
FSM supports serialization for persistence and transmission:
- to_dict(): Convert to dictionary
- from_dict(): Reconstruct from dictionary
- Note: The function registry is not serialized (functions must be re-registered)
Thread Safety
The FSM structure itself is read-only after construction and validation. However, execution engines may or may not be thread-safe depending on data_mode:
- SINGLE mode: Not thread-safe (by design)
- BATCH mode: Thread-safe if using DataHandlingMode.COPY
- STREAM mode: Thread-safe with proper streaming context
Note
This is an internal API. Users should use the higher-level APIs:
- SimpleFSM for synchronous workflows
- AsyncSimpleFSM for async/production workflows
- AdvancedFSM for debugging and profiling
Direct FSM instantiation is primarily for builders and internal use.
See Also
- :class:`~dataknobs_fsm.api.simple.SimpleFSM`: Synchronous API
- :class:`~dataknobs_fsm.api.async_simple.AsyncSimpleFSM`: Async API
- :class:`~dataknobs_fsm.core.network.StateNetwork`: State graph
- :class:`~dataknobs_fsm.functions.base.FunctionRegistry`: Function registry
Initialize FSM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the FSM. | required |
| data_mode | ProcessingMode | Data processing mode. | SINGLE |
| transaction_mode | TransactionMode | Transaction handling mode. | NONE |
| description | str \| None | Optional FSM description. | None |
Methods:
| Name | Description |
|---|---|
| add_network | Add a network to the FSM. |
| remove_network | Remove a network from the FSM. |
| get_network | Get a network by name. |
| validate | Validate the FSM. |
| get_all_states | Get all states from all networks. |
| get_all_arcs | Get all arcs from all networks. |
| supports_streaming | Check if FSM supports streaming. |
| get_resource_summary | Get resource requirements summary. |
| clone | Create a clone of this FSM. |
| to_dict | Convert FSM to dictionary representation. |
| from_dict | Create FSM from dictionary representation. |
| find_state_definition | Find a state definition by name. |
| create_state_instance | Create a state instance from a state name. |
| get_state | Get a state definition by name. |
| is_start_state | Check if a state is a start state. |
| is_end_state | Check if a state is an end state. |
| get_start_state | Get the start state definition. |
| get_all_states_dict | Get all states from all networks. |
| get_outgoing_arcs | Get outgoing arcs from a state. |
| get_engine | Get or create the execution engine. |
| get_async_engine | Get or create the async execution engine. |
| execute_async | Execute the FSM asynchronously with initial data. |
| execute | Execute the FSM synchronously with initial data. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
Attributes¶
main_network property ¶
Get the main network object.
Returns:
| Type | Description |
|---|---|
| Optional[StateNetwork] | The main StateNetwork object or None if not set |
states property ¶
Get all states from the main network.
Returns:
| Type | Description |
|---|---|
| Dict[str, StateDefinition] | Dictionary of state_name -> state_definition for the main network |
Functions¶
add_network ¶
Add a network to the FSM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| network | StateNetwork | Network to add. | required |
| is_main | bool | Whether this is the main network. | False |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
remove_network ¶
Remove a network from the FSM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| network_name | str | Name of network to remove. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if removed successfully. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
get_network ¶
Get a network by name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| network_name | str \| None | Name of network (None for main network). | None |
Returns:
| Type | Description |
|---|---|
| StateNetwork \| None | Network or None if not found. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
validate ¶
Validate the FSM.
Returns:
| Type | Description |
|---|---|
| Tuple[bool, List[str]] | Tuple of (valid, list of errors). |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
get_all_states ¶
Get all states from all networks.
Returns:
| Type | Description |
|---|---|
| Dict[str, List[str]] | Dictionary of network_name -> list of state names. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
get_all_arcs ¶
Get all arcs from all networks.
Returns:
| Type | Description |
|---|---|
| Dict[str, List[str]] | Dictionary of network_name -> list of arc IDs. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
supports_streaming ¶
Check if FSM supports streaming.
Returns:
| Type | Description |
|---|---|
| bool | True if any network supports streaming. |
get_resource_summary ¶
Get resource requirements summary.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Resource requirements summary. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
clone ¶
Create a clone of this FSM.
Returns:
| Type | Description |
|---|---|
| FSM | Cloned FSM. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
to_dict ¶
Convert FSM to dictionary representation.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary representation. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
from_dict classmethod ¶
Create FSM from dictionary representation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Dict[str, Any] | Dictionary with FSM data. | required |
Returns:
| Type | Description |
|---|---|
| FSM | New FSM instance. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
find_state_definition ¶
Find a state definition by name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_name | str | Name of the state to find | required |
| network_name | str \| None | Optional specific network to search in | None |
Returns:
| Type | Description |
|---|---|
| StateDefinition \| None | StateDefinition if found, None otherwise |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
create_state_instance ¶
create_state_instance(
state_name: str,
data: Dict[str, Any] | None = None,
network_name: str | None = None,
) -> StateInstance
Create a state instance from a state name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_name | str | Name of the state | required |
| data | Dict[str, Any] \| None | Optional initial data for the state | None |
| network_name | str \| None | Optional specific network to search in | None |
Returns:
| Type | Description |
|---|---|
| StateInstance | StateInstance object |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
get_state ¶
Get a state definition by name.
This is an alias for find_state_definition for compatibility.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_name | str | Name of the state | required |
| network_name | str \| None | Optional specific network to search in | None |
Returns:
| Type | Description |
|---|---|
| StateDefinition \| None | StateDefinition if found, None otherwise |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
is_start_state ¶
Check if a state is a start state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_name | str | Name of the state | required |
| network_name | str \| None | Optional specific network to check in (defaults to main network) | None |
Returns:
| Type | Description |
|---|---|
| bool | True if the state is a start state |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
is_end_state ¶
Check if a state is an end state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_name | str | Name of the state | required |
| network_name | str \| None | Optional specific network to check in (defaults to main network) | None |
Returns:
| Type | Description |
|---|---|
| bool | True if the state is an end state |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
get_start_state ¶
Get the start state definition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| network_name | str \| None | Optional specific network to search in | None |
Returns:
| Type | Description |
|---|---|
| StateDefinition \| None | Start state definition if found, None otherwise |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
get_all_states_dict ¶
Get all states from all networks.
Returns:
| Type | Description |
|---|---|
| Dict[str, Dict[str, StateDefinition]] | Dictionary of network_name -> {state_name -> state_definition} |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
get_outgoing_arcs ¶
Get outgoing arcs from a state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_name | str | Name of the state | required |
| network_name | str \| None | Optional network name (uses main network if None) | None |
Returns:
| Type | Description |
|---|---|
| List[Any] | List of outgoing arcs from the state |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
get_engine ¶
Get or create the execution engine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| strategy | str \| None | Optional execution strategy override | None |
Returns:
| Type | Description |
|---|---|
| ExecutionEngine | ExecutionEngine instance. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
get_async_engine ¶
Get or create the async execution engine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| strategy | str \| None | Optional execution strategy override | None |
Returns:
| Type | Description |
|---|---|
| AsyncExecutionEngine | AsyncExecutionEngine instance. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
execute_async async ¶
Execute the FSM asynchronously with initial data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| initial_data | Dict[str, Any] \| None | Initial data for execution. | None |
Returns:
| Type | Description |
|---|---|
| Any | Execution result. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
execute ¶
Execute the FSM synchronously with initial data.
This is a simplified API for running the FSM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| initial_data | Dict[str, Any] \| None | Initial data for execution. | None |
Returns:
| Type | Description |
|---|---|
| Any | Execution result. |
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
StateDefinition dataclass ¶
StateDefinition(
name: str,
type: StateType = StateType.NORMAL,
description: str = "",
metadata: Dict[str, Any] = dict(),
schema: StateSchema | None = None,
data_mode: DataHandlingMode | None = None,
resource_requirements: List[ResourceConfig] = list(),
pre_validation_functions: List[IValidationFunction] = list(),
validation_functions: List[IValidationFunction] = list(),
transform_functions: List[ITransformFunction] = list(),
end_test_function: IEndStateTestFunction | None = None,
outgoing_arcs: List[ArcDefinition] = list(),
timeout: float | None = None,
retry_count: int = 0,
retry_delay: float = 1.0,
)
Definition of a state in the FSM.
StateDefinition is the static blueprint for a state, defining its structure, schema, functions, resources, and execution configuration. It is immutable once created and can be reused across multiple executions.
Architecture
StateDefinition follows a declarative design pattern:
- Define structure, not behavior
- Functions referenced by interface, not implementation
- Configuration-driven execution
- Immutable and reusable
Key Components
Identity:
- name: Unique identifier within network
- type: Role in workflow (START/END/NORMAL/etc.)
- description: Human-readable description
- metadata: Additional custom attributes

Data Validation:
- schema: Optional StateSchema for data validation
- pre_validation_functions: Run before state entry
- validation_functions: Run after entry, before transform
- Ensures data quality throughout workflow

Data Transformation:
- transform_functions: Modify data during processing
- Multiple transforms chain in order
- Each returns modified data

Resource Requirements:
- resource_requirements: List of ResourceConfig
- Database connections, API clients, etc.
- Acquired on entry, released on exit

Execution Configuration:
- timeout: Max execution time in seconds
- retry_count: Number of retry attempts on failure
- retry_delay: Delay between retries in seconds
- data_mode: Preferred data handling mode

Network Integration:
- outgoing_arcs: List of transitions to other states
- end_test_function: Determines if this is an end state
Data Modes
States can specify a preferred data_mode:
- COPY: Deep copy for thread safety (default)
- REFERENCE: Lazy loading for memory efficiency
- DIRECT: In-place modification for performance
- None: Inherit from FSM-level configuration
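The practical difference between COPY and DIRECT can be illustrated with plain Python dictionaries. This is a generic sketch of the semantics, not the library's actual data mode handlers:

```python
import copy

record = {"user": {"name": "ada"}, "score": 1}

# COPY mode semantics: the state works on a deep copy, so the
# original data is untouched until changes are committed.
working = copy.deepcopy(record)
working["user"]["name"] = "grace"
assert record["user"]["name"] == "ada"   # original preserved

# DIRECT mode semantics: the state mutates the data in place —
# fastest, but unsafe if another thread reads `record` concurrently.
direct = record
direct["score"] = 2
assert record["score"] == 2              # original mutated
```

COPY pays a deep-copy cost on every state entry in exchange for isolation; DIRECT skips that cost but gives up thread safety, which is why it is recommended only for single-threaded performance-critical paths.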
Validation
Multiple levels of validation:
1. Schema validation (structural)
2. Pre-validation functions (business logic before entry)
3. Validation functions (business logic after entry)
Resource Management
States declare resource requirements:
- Specified as ResourceConfig objects
- Include resource type, name, and configuration
- Execution engine acquires before state entry
- Released after state exit or on error
Error Handling
Built-in retry support:
- retry_count: Number of attempts (0 = no retry)
- retry_delay: Delay between attempts in seconds
- Exponential backoff can be implemented in retry logic
- After all retries are exhausted, the state fails
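The retry semantics described above (one initial attempt plus retry_count retries, separated by retry_delay seconds) amount to a loop like this generic sketch; the engine's real implementation may differ in details:

```python
import time


def run_with_retries(func, retry_count: int = 0, retry_delay: float = 1.0):
    """Attempt `func` once, plus up to `retry_count` retries on failure."""
    attempts = retry_count + 1
    for attempt in range(attempts):
        try:
            return func()
        except Exception:
            if attempt == attempts - 1:
                raise            # all retries exhausted: the state fails
            time.sleep(retry_delay)


calls = []

def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"

# Succeeds on the third attempt (1 initial + 2 retries).
assert run_with_retries(flaky, retry_count=2, retry_delay=0.01) == "ok"
assert len(calls) == 3
```

Exponential backoff would replace the constant `time.sleep(retry_delay)` with `time.sleep(retry_delay * 2 ** attempt)`.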
Note
StateDefinition is immutable after creation. To modify a state, create a new StateDefinition. This ensures consistency when the same definition is used across multiple executions.
Functions are stored as references (interfaces), not implementations. The actual function implementations are registered in the FSM's FunctionRegistry.
See Also
- :class:`~dataknobs_fsm.core.state.StateInstance`: Runtime instance
- :class:`~dataknobs_fsm.core.state.StateSchema`: Data schema
- :class:`~dataknobs_fsm.core.state.StateType`: State type enum
- :class:`~dataknobs_fsm.functions.base.IValidationFunction`: Validation interface
- :class:`~dataknobs_fsm.functions.base.ITransformFunction`: Transform interface
Methods:
| Name | Description |
|---|---|
| is_start_state | Check if this is a start state. |
| is_end_state | Check if this is an end state. |
| validate_data | Validate data against state schema. |
| add_pre_validation_function | Add a pre-validation function. |
| add_validation_function | Add a validation function. |
| add_transform_function | Add a transform function. |
| add_outgoing_arc | Add an outgoing arc. |
Attributes:
| Name | Type | Description |
|---|---|---|
| is_start | bool | Property alias for is_start_state(). |
| is_end | bool | Property alias for is_end_state(). |
| arcs | List[ArcDefinition] | Get the outgoing arcs from this state. |
Functions¶
is_start_state ¶
Check if this is a start state.
Returns:
| Type | Description |
|---|---|
| bool | True if this is a start state. |
is_end_state ¶
Check if this is an end state.
Returns:
| Type | Description |
|---|---|
| bool | True if this is an end state. |
validate_data ¶
Validate data against state schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Dict[str, Any] | Data to validate. | required |
Returns:
| Type | Description |
|---|---|
| Tuple[bool, List[str]] | Tuple of (is_valid, error_messages). |
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
add_pre_validation_function ¶
Add a pre-validation function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | IValidationFunction | Pre-validation function to add. | required |
add_validation_function ¶
Add a validation function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | IValidationFunction | Validation function to add. | required |
add_transform_function ¶
Add a transform function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | ITransformFunction | Transform function to add. | required |
add_outgoing_arc ¶
Add an outgoing arc.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| arc | ArcDefinition | Arc definition to add. | required |
StateInstance dataclass ¶
StateInstance(
id: str = (lambda: str(uuid4()))(),
definition: StateDefinition = None,
status: StateStatus = StateStatus.PENDING,
data: Dict[str, Any] = dict(),
data_mode_manager: DataModeManager | None = None,
data_handler: Any | None = None,
transaction: Transaction | None = None,
acquired_resources: Dict[str, Any] = dict(),
entry_time: datetime | None = None,
exit_time: datetime | None = None,
execution_count: int = 0,
error_count: int = 0,
last_error: str | None = None,
executed_arcs: List[str] = list(),
next_state: str | None = None,
)
Runtime instance of a state.
StateInstance represents a single execution of a StateDefinition within a workflow. It holds the runtime data, tracks execution status, manages resources, and implements the state lifecycle.
Architecture
StateInstance follows the Instance pattern:
- One StateDefinition → many StateInstances
- Each instance is independent
- Instance holds mutable execution state
- Definition holds immutable structure
Lifecycle
StateInstance progresses through a defined lifecycle:
1. Creation:
from dataknobs_fsm.core.state import StateDefinition, StateInstance, StateType
# Define state first
state_def = StateDefinition(name="process", type=StateType.NORMAL)
# Create instance
instance = StateInstance(definition=state_def)
# Status: PENDING
# No data yet
2. Entry:
instance.enter(input_data)
# Status: ACTIVE
# Data mode handler applies transformations
# Resources acquired
# Execution tracking begins
3. Processing:
4. Exit:
result_data = instance.exit(commit=True)
# Status: COMPLETED (or FAILED)
# Data mode handler commits changes
# Resources released
# Duration calculated
5. Error Handling:
Attributes:
| Name | Type | Description |
|---|---|---|
| id | str | Unique identifier for this instance (UUID) |
| definition | StateDefinition | The state blueprint being executed |
| status | StateStatus | Current execution status (PENDING/ACTIVE/COMPLETED/FAILED/etc.) |
| data | Dict[str, Any] | Current state data |
| data_mode_manager | DataModeManager \| None | Manages data handling modes |
| data_handler | Any \| None | Active data mode handler (COPY/REFERENCE/DIRECT) |
| transaction | Transaction \| None | Active transaction if using transactional mode |
| acquired_resources | Dict[str, Any] | Resources acquired for this instance |
| entry_time | datetime \| None | When state was entered |
| exit_time | datetime \| None | When state was exited |
| execution_count | int | Number of times state has been entered |
| error_count | int | Number of errors encountered |
| last_error | str \| None | Most recent error message |
| executed_arcs | List[str] | IDs of arcs executed from this state |
| next_state | str \| None | Name of next state to transition to |
Data Handling
StateInstance uses data mode handlers to manage how data flows:
COPY Mode:
- Deep copy on entry via data_handler.on_entry()
- Modifications to local copy
- Commit on exit via data_handler.on_exit(commit=True)
- Thread-safe, memory-intensive

REFERENCE Mode:
- Lazy loading with version tracking
- Optimistic locking for conflicts
- on_modification() tracks changes
- Memory-efficient

DIRECT Mode:
- In-place modification
- No copying overhead
- Fastest performance
- Not thread-safe
Resource Management
StateInstance tracks acquired resources:
Lifecycle:
1. Acquire: add_resource(name, handle)
2. Use: get_resource(name) → handle
3. Release: release_resources()

Automatic Cleanup:
- Resources released on exit()
- Resources released on fail()
- Ensures no resource leaks
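The acquire → use → release lifecycle can be mimicked in plain Python. This is an illustrative stand-in for StateInstance's resource tracking, not its actual implementation:

```python
class ResourceHolder:
    """Sketch of the acquire → use → release resource lifecycle."""
    def __init__(self):
        self.acquired_resources: dict[str, object] = {}

    def add_resource(self, name: str, resource: object) -> None:
        # Step 1: track an acquired resource by name.
        self.acquired_resources[name] = resource

    def get_resource(self, name: str):
        # Step 2: look up a resource; None if never acquired.
        return self.acquired_resources.get(name)

    def release_resources(self) -> None:
        # Step 3: drop everything — called on both exit() and fail()
        # so that no resource leaks survive an error.
        self.acquired_resources.clear()


holder = ResourceHolder()
holder.add_resource("db", object())
assert holder.get_resource("db") is not None
holder.release_resources()
assert holder.get_resource("db") is None
```

A real handler would also close connections or file handles in `release_resources()` rather than merely clearing the dict.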
Execution Tracking
StateInstance tracks detailed execution metrics:
Timing:
- entry_time: When processing started
- exit_time: When processing completed
- get_duration(): Calculates elapsed time

Counts:
- execution_count: Total entries (for retries)
- error_count: Total errors encountered
- executed_arcs: History of transitions

State:
- status: Current execution status
- next_state: Determined next state
- last_error: Most recent error message
Transaction Support
StateInstance participates in transactions:
Integration:
- transaction field holds active transaction
- Data changes logged to transaction
- Rollback on fail()
- Commit on exit(commit=True)

Modes:
- NONE: No transaction tracking
- OPTIMISTIC: Commit at workflow end
- PESSIMISTIC: Commit at each state exit
Methods:
| Name | Description |
|---|---|
| **Lifecycle** | |
| enter | Enter state with data |
| exit | Exit and finalize |
| fail | Mark as failed |
| skip | Skip this state |
| **Control** | |
| pause | Temporarily pause |
| resume | Resume from pause |
| modify_data | Update state data |
| **Resources** | |
| add_resource | Acquire resource |
| get_resource | Get acquired resource |
| release_resources | Release all resources |
| **Tracking** | |
| record_arc_execution | Track transition |
| get_duration | Get execution time |
| to_dict | Serialize to dictionary |
Note
StateInstance is managed by execution engines. Users typically don't create or manipulate instances directly, but may observe them via debugging tools or execution results.
Each workflow execution creates fresh StateInstances, even when reusing StateDefinitions. This ensures execution isolation.
See Also
- :class:`~dataknobs_fsm.core.state.StateDefinition`: State blueprint
- :class:`~dataknobs_fsm.core.state.StateStatus`: Status enum
- :class:`~dataknobs_fsm.core.data_modes.DataModeManager`: Data mode manager
- :class:`~dataknobs_fsm.core.transactions.Transaction`: Transaction support
Functions¶
__post_init__ ¶
Initialize data mode manager if not provided.
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
enter ¶
Enter the state with input data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_data | Dict[str, Any] | Input data for the state. | required |
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
exit ¶
Exit the state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| commit | bool | Whether to commit data changes. | True |
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | The final state data. |
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
fail ¶
Mark the state as failed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error | str | Error message. | required |
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
pause ¶
resume ¶
skip ¶
modify_data ¶
Modify state data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| updates | Dict[str, Any] | Data updates to apply. | required |
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
add_resource ¶
Add an acquired resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Resource name. | required |
| resource | Any | The resource handle/connection. | required |
get_resource ¶
Get an acquired resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Resource name. | required |
Returns:
| Type | Description |
|---|---|
| Any \| None | The resource if available. |
release_resources ¶
record_arc_execution ¶
Record that an arc was executed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| arc_id | str | ID of the executed arc. | required |
get_duration ¶
Get execution duration in seconds.
Returns:
| Type | Description |
|---|---|
| float \| None | Duration in seconds if available. |
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
to_dict ¶
Convert to dictionary representation.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary with state instance data. |
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
ArcDefinition dataclass ¶
ArcDefinition(
target_state: str,
pre_test: str | None = None,
transform: str | TransformSpec | list[str | TransformSpec] | None = None,
priority: int = 0,
definition_order: int = 0,
metadata: Dict[str, Any] = dict(),
required_resources: Dict[str, str] = dict(),
)
Definition of an arc between states.
This class defines the static properties of an arc, including the transition logic and resource requirements.
Methods:
| Name | Description |
|---|---|
| __hash__ | Make ArcDefinition hashable. |
Functions¶
__hash__ ¶
Make ArcDefinition hashable.
Source code in packages/fsm/src/dataknobs_fsm/core/arc.py
DataHandlingMode ¶
Bases: Enum
Data handling modes for state instances - defines how data is managed within states.
SimpleFSM ¶
SimpleFSM(
config: str | Path | dict[str, Any],
data_mode: DataHandlingMode = DataHandlingMode.COPY,
resources: dict[str, Any] | None = None,
custom_functions: dict[str, Callable] | None = None,
)
Synchronous FSM interface for simple workflows.
This class provides a purely synchronous API for FSM operations, internally using AsyncSimpleFSM with a dedicated event loop managed automatically in a background thread.
SimpleFSM is designed for ease of use in scripts, prototypes, and simple pipelines where async/await complexity is not desired. It handles all async execution transparently, providing a familiar synchronous interface.
Attributes:
| Name | Type | Description |
|---|---|---|
| data_mode | DataHandlingMode | Data processing mode (COPY/REFERENCE/DIRECT) |
| _async_fsm | AsyncSimpleFSM | Internal async FSM implementation |
| _fsm | FSM | Core FSM engine |
| _resource_manager | ResourceManager | Resource lifecycle manager |
| _loop | AbstractEventLoop | Dedicated event loop for async operations |
| _loop_thread | Thread | Background thread running the event loop |
Methods:
| Name | Description |
|---|---|
| process | Process a single record through the FSM |
| process_batch | Process multiple records in parallel batches |
| process_stream | Process a stream of data from file or iterator |
| validate | Validate data against FSM's start state schema |
| get_states | List all state names in the FSM |
| get_resources | List all registered resource names |
| close | Clean up resources and close connections |
Use Cases
Data Transformation: Transform data through a pipeline of state functions. Each state receives the output of the previous state, enabling sequential transformations.
Data Validation: Validate data against schemas defined in state configurations. States can enforce data quality rules and reject invalid records.
File Processing: Process large files line-by-line or in chunks using process_stream(). Supports automatic format detection (JSON, JSONL, CSV, text).
Batch Processing: Process multiple records in parallel using process_batch(). Configurable batch size and worker count for optimal throughput.
ETL Workflows: Extract-Transform-Load pipelines where data flows through extraction, transformation, and loading states with error handling.
Note
Thread Safety: SimpleFSM manages its own event loop in a background thread. While the synchronous API is thread-safe, concurrent calls will serialize due to the single event loop. For true concurrent processing, use AsyncSimpleFSM with multiple event loops or process_batch() with max_workers > 1.
Resource Management: Always call close() when done to properly release resources. Use context managers (with statement) when available in client code, or ensure close() is called in a finally block.
Data Mode Selection:
- Use COPY (default) for production: safe, predictable, memory-intensive
- Use REFERENCE for large datasets: memory-efficient, moderate overhead
- Use DIRECT for performance: fastest, but not thread-safe
Error Handling: The process() method returns a dict with 'success' and 'error' keys rather than raising exceptions. This allows for graceful error handling in batch processing scenarios.
Example
Basic usage with configuration file:
from dataknobs_fsm.api.simple import SimpleFSM
# Create FSM from YAML config
fsm = SimpleFSM('pipeline.yaml')
# Process single record
result = fsm.process({
'text': 'Input text to process',
'metadata': {'source': 'user'}
})
if result['success']:
print(f"Result: {result['data']}")
print(f"Path: {' -> '.join(result['path'])}")
else:
print(f"Error: {result['error']}")
# Clean up
fsm.close()
With custom functions and resources:
from dataknobs_fsm.api.simple import SimpleFSM
from dataknobs_fsm.core.data_modes import DataHandlingMode
# Define custom state functions
def validate(data):
if 'required_field' not in data:
raise ValueError("Missing required field")
return data
def transform(data):
from datetime import datetime
data['processed'] = True
data['timestamp'] = datetime.now().isoformat()
return data
# Create FSM with config dict
config = {
'name': 'validation_pipeline',
'states': [
{'name': 'validate', 'type': 'START', 'function': 'validate'},
{'name': 'transform', 'type': 'END', 'function': 'transform'}
],
'arcs': [
{'from': 'validate', 'to': 'transform'}
]
}
# Initialize with custom functions and resources
fsm = SimpleFSM(
config=config,
data_mode=DataHandlingMode.COPY,
resources={
'database': {
'type': 'DATABASE',
'backend': 'memory'
}
},
custom_functions={
'validate': validate,
'transform': transform
}
)
# Process data
result = fsm.process({'required_field': 'value'})
print(f"Success: {result['success']}")
fsm.close()
Batch processing with progress callback:
# Define progress callback
def on_progress(current, total):
pct = (current / total) * 100
print(f"Progress: {current}/{total} ({pct:.1f}%)")
# Process batch
records = [{'id': i, 'text': f'Record {i}'} for i in range(100)]
results = fsm.process_batch(
data=records,
batch_size=10,
max_workers=4,
on_progress=on_progress
)
# Check results
successful = sum(1 for r in results if r['success'])
print(f"Processed {successful}/{len(records)} successfully")
See Also
- AsyncSimpleFSM: Async version for production applications
- AdvancedFSM: Full control with debugging capabilities
- DataHandlingMode: Data processing mode options
- process_file: Convenience function for file processing
- batch_process: Convenience function for batch processing
- validate_data: Convenience function for data validation
Initialize SimpleFSM from configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `str \| Path \| dict[str, Any]` | FSM configuration: a path to a YAML/JSON config file (str or Path), or a dictionary containing an inline configuration. Must define states, arcs, and optionally resources. | *required* |
| `data_mode` | `DataHandlingMode` | Data handling mode controlling how data is passed between states: COPY (default) deep-copies for safety; REFERENCE uses lazy loading with locking; DIRECT modifies in place (fastest). | `COPY` |
| `resources` | `dict[str, Any] \| None` | Optional resource configurations: dict mapping resource names to configuration dicts. Each config must have a 'type' key (DATABASE, FILESYSTEM, HTTP, etc.) and type-specific parameters. Example: `{'db': {'type': 'DATABASE', 'backend': 'postgres', ...}}` | `None` |
| `custom_functions` | `dict[str, Callable] \| None` | Optional custom state functions: dict mapping function names to callables. Functions receive a data dict and return a data dict. Function names must match 'function' fields in state definitions. Example: `{'my_func': lambda data: {'result': data['input'] * 2}}` | `None` |
Example
From configuration file:
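A minimal sketch of file-based initialization, assuming a hypothetical `pipeline.yaml` that defines states and arcs; the `summarize` helper is illustrative, not part of the API:

```python
def summarize(result):
    """Collapse a process() result dict into a one-line status string."""
    if result['success']:
        return "ok via " + " -> ".join(result['path'])
    return "error: " + str(result['error'])

if __name__ == '__main__':
    from dataknobs_fsm.api.simple import SimpleFSM
    # 'pipeline.yaml' is a hypothetical config file.
    fsm = SimpleFSM('pipeline.yaml')
    try:
        print(summarize(fsm.process({'text': 'hello'})))
    finally:
        fsm.close()  # always release resources
```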
With inline configuration:
config = {
'name': 'simple_pipeline',
'states': [
{'name': 'start', 'type': 'START'},
{'name': 'process', 'type': 'NORMAL', 'function': 'transform'},
{'name': 'end', 'type': 'END'}
],
'arcs': [
{'from': 'start', 'to': 'process'},
{'from': 'process', 'to': 'end'}
]
}
def transform(data):
data['transformed'] = True
return data
fsm = SimpleFSM(
config=config,
custom_functions={'transform': transform}
)
With data mode selection:
from dataknobs_fsm.core.data_modes import DataHandlingMode
# Use COPY for safe concurrent processing
fsm_safe = SimpleFSM('config.yaml', data_mode=DataHandlingMode.COPY)
# Use REFERENCE for memory efficiency
fsm_efficient = SimpleFSM('config.yaml', data_mode=DataHandlingMode.REFERENCE)
# Use DIRECT for maximum performance (single-threaded only)
fsm_fast = SimpleFSM('config.yaml', data_mode=DataHandlingMode.DIRECT)
With resources:
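A minimal sketch of passing resources, following the `resources` parameter shape documented above; the backend choice and `config.yaml` are hypothetical:

```python
def make_resources(backend='memory'):
    # Build a resource config dict in the shape the constructor expects:
    # each entry needs a 'type' key plus type-specific parameters.
    return {'db': {'type': 'DATABASE', 'backend': backend}}

if __name__ == '__main__':
    from dataknobs_fsm.api.simple import SimpleFSM
    # 'config.yaml' is hypothetical; its states may reference the 'db' resource.
    fsm = SimpleFSM('config.yaml', resources=make_resources())
    try:
        print(fsm.get_resources())
    finally:
        fsm.close()
```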
Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
Attributes¶
Functions¶
process ¶
process(
data: dict[str, Any] | Record,
initial_state: str | None = None,
timeout: float | None = None,
) -> dict[str, Any]
Process a single record through the FSM synchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any] \| Record` | Input data to process | *required* |
| `initial_state` | `str \| None` | Optional starting state (defaults to FSM start state) | `None` |
| `timeout` | `float \| None` | Optional timeout in seconds | `None` |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict containing the processed result, with fields including 'success', 'data', 'path', and 'error'. |
Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
process_batch ¶
process_batch(
data: list[dict[str, Any] | Record],
batch_size: int = 10,
max_workers: int = 4,
on_progress: Callable | None = None,
) -> list[dict[str, Any]]
Process multiple records in parallel batches synchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `list[dict[str, Any] \| Record]` | List of input records to process | *required* |
| `batch_size` | `int` | Number of records per batch | `10` |
| `max_workers` | `int` | Maximum parallel workers | `4` |
| `on_progress` | `Callable \| None` | Optional callback for progress updates | `None` |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of results for each input record |
Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
process_stream ¶
process_stream(
source: str | Any,
sink: str | None = None,
chunk_size: int = 100,
on_progress: Callable | None = None,
input_format: str = "auto",
text_field_name: str = "text",
csv_delimiter: str = ",",
csv_has_header: bool = True,
skip_empty_lines: bool = True,
use_streaming: bool = False,
) -> dict[str, Any]
Process a stream of data through the FSM synchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `source` | `str \| Any` | Data source (file path or iterator) | *required* |
| `sink` | `str \| None` | Optional output destination | `None` |
| `chunk_size` | `int` | Size of processing chunks | `100` |
| `on_progress` | `Callable \| None` | Optional progress callback | `None` |
| `input_format` | `str` | Input file format ('auto', 'jsonl', 'json', 'csv', 'text') | `'auto'` |
| `text_field_name` | `str` | Field name for text lines when converting to dict | `'text'` |
| `csv_delimiter` | `str` | CSV delimiter character | `','` |
| `csv_has_header` | `bool` | Whether CSV file has header row | `True` |
| `skip_empty_lines` | `bool` | Skip empty lines in text files | `True` |
| `use_streaming` | `bool` | Use memory-efficient streaming for large files | `False` |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict containing stream processing statistics |
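A sketch of streaming a CSV file through the FSM with a throttled progress callback; `pipeline.yaml` and the file names are hypothetical:

```python
def progress_printer(every=1000):
    """Build an on_progress callback that reports every N records."""
    def cb(current, total):
        line = f"{current}/{total}"
        if current % every == 0 or current == total:
            print(line)
        return line
    return cb

if __name__ == '__main__':
    from dataknobs_fsm.api.simple import SimpleFSM
    fsm = SimpleFSM('pipeline.yaml')  # hypothetical config file
    try:
        stats = fsm.process_stream(
            source='input.csv',
            sink='output.jsonl',
            input_format='csv',
            on_progress=progress_printer(500),
            use_streaming=True,  # constant memory for large inputs
        )
        print(stats)
    finally:
        fsm.close()
```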
Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
validate ¶
Validate data against FSM's start state schema synchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any] \| Record` | Data to validate | *required* |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict containing validation results |
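A sketch of pre-screening records with validate() before processing. The 'valid' key is an assumption; the exact shape of the validation result dict is not pinned down above:

```python
def partition_valid(records, results):
    """Split records by validation outcome.

    Assumes each validate() result dict carries a boolean 'valid' key.
    """
    valid, invalid = [], []
    for rec, res in zip(records, results):
        (valid if res.get('valid') else invalid).append(rec)
    return valid, invalid

if __name__ == '__main__':
    from dataknobs_fsm.api.simple import SimpleFSM
    fsm = SimpleFSM('pipeline.yaml')  # hypothetical config file
    try:
        records = [{'text': 'ok'}, {'unexpected': True}]
        results = [fsm.validate(r) for r in records]
        good, bad = partition_valid(records, results)
        print(f"{len(good)} valid, {len(bad)} invalid")
    finally:
        fsm.close()
```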
Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
get_states ¶
get_resources ¶
close ¶
Clean up resources and close connections synchronously.
Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
AsyncSimpleFSM ¶
AsyncSimpleFSM(
config: str | Path | dict[str, Any],
data_mode: DataHandlingMode = DataHandlingMode.COPY,
resources: dict[str, Any] | None = None,
custom_functions: dict[str, Callable] | None = None,
)
Async-first FSM interface for production workflows.
This class provides a fully asynchronous API for FSM operations, designed to work natively in async contexts without blocking calls or thread overhead. This is the recommended FSM implementation for production systems.
AsyncSimpleFSM handles all the complexity of async execution, resource management, and concurrent processing while providing a simple, clean API. It's optimized for high-throughput scenarios with proper connection pooling, error handling, and memory management.
Attributes:
| Name | Type | Description |
|---|---|---|
| `data_mode` | `DataHandlingMode` | Data processing mode (COPY/REFERENCE/DIRECT) |
| `_config` | | Loaded FSM configuration |
| `_fsm` | `FSM` | Core FSM engine |
| `_resource_manager` | `ResourceManager` | Resource lifecycle and pooling manager |
| `_async_engine` | `AsyncExecutionEngine` | Async execution engine |
Methods:
| Name | Description |
|---|---|
| `process` | Process single record asynchronously |
| `process_batch` | Process multiple records with concurrency control |
| `process_stream` | Stream-process large datasets |
| `validate` | Validate data against schema |
| `get_states` | List all FSM state names |
| `get_resources` | List all registered resources |
| `close` | Release all resources and cleanup |
Production Use Cases
Web API Backend: Handle thousands of concurrent requests in FastAPI/aiohttp services. Each request processes independently with automatic resource pooling.
Data Pipeline Processing: Transform large datasets with memory-efficient streaming and parallel batch processing. Configurable chunk sizes and worker counts.
Real-time Event Processing: Process events from queues (RabbitMQ, Kafka) with async consumers. High throughput with concurrent processing of independent events.
Batch Job Processing: Schedule and run large batch jobs with progress tracking and error handling. Configurable parallelism for optimal resource utilization.
Note
Concurrency Safety: AsyncSimpleFSM is safe for concurrent use when using DataHandlingMode.COPY (default). Each process() call operates on independent data. For REFERENCE or DIRECT modes, ensure external synchronization.
Resource Pooling: Resources (databases, HTTP clients) use connection pooling automatically. Configure pool_size in resource definitions for optimal performance.
Error Handling: Process methods return success/error dicts rather than raising exceptions, allowing graceful degradation. Use try/except only for critical failures.
Memory Management: For large datasets, use process_stream() with use_streaming=True for constant memory usage regardless of file size.
Example
Basic async processing:
import asyncio
from dataknobs_fsm.api.async_simple import AsyncSimpleFSM
async def main():
# Create FSM
fsm = AsyncSimpleFSM('pipeline.yaml')
try:
# Process single record
result = await fsm.process({
'text': 'Input data',
'metadata': {'source': 'api'}
})
if result['success']:
print(f"Result: {result['data']}")
print(f"States: {' -> '.join(result['path'])}")
else:
print(f"Error: {result['error']}")
finally:
await fsm.close()
asyncio.run(main())
Concurrent processing with asyncio.gather:
async def process_concurrent():
fsm = AsyncSimpleFSM('config.yaml')
try:
# Create tasks for concurrent execution
tasks = [
fsm.process({'id': 1, 'text': 'Item 1'}),
fsm.process({'id': 2, 'text': 'Item 2'}),
fsm.process({'id': 3, 'text': 'Item 3'})
]
# Execute concurrently
results = await asyncio.gather(*tasks)
# Check results
for i, result in enumerate(results, 1):
status = "✓" if result['success'] else "✗"
print(f"{status} Item {i}: {result.get('data', result.get('error'))}")
finally:
await fsm.close()
Production web service pattern:
from fastapi import FastAPI
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: initialize FSM
app.state.fsm = AsyncSimpleFSM(
'production.yaml',
resources={
'db': {
'type': 'DATABASE',
'backend': 'postgres',
'pool_size': 20, # Connection pooling
'host': 'db.prod.example.com'
}
}
)
yield
# Shutdown: cleanup
await app.state.fsm.close()
app = FastAPI(lifespan=lifespan)
@app.post("/process")
async def process(request: dict):
result = await app.state.fsm.process(request)
return result
Batch processing with progress tracking:
async def process_with_progress():
fsm = AsyncSimpleFSM('pipeline.yaml')
# Progress callback
def on_progress(current, total):
pct = (current / total) * 100
print(f"Progress: {current}/{total} ({pct:.1f}%)")
try:
records = [{'id': i} for i in range(1000)]
results = await fsm.process_batch(
data=records,
batch_size=50,
max_workers=10,
on_progress=on_progress
)
successful = sum(1 for r in results if r['success'])
print(f"Success rate: {successful}/{len(results)}")
finally:
await fsm.close()
Stream processing large files:
async def process_large_file():
fsm = AsyncSimpleFSM('transform.yaml')
try:
# Memory-efficient streaming
stats = await fsm.process_stream(
source='input_100gb.jsonl',
sink='output.jsonl',
chunk_size=1000,
use_streaming=True # Constant memory usage
)
print(f"Processed: {stats['total_processed']}")
print(f"Success: {stats['successful']}")
print(f"Failed: {stats['failed']}")
print(f"Duration: {stats['duration']:.2f}s")
print(f"Throughput: {stats['throughput']:.2f} records/sec")
finally:
await fsm.close()
See Also
- SimpleFSM: Synchronous wrapper for scripts
- AdvancedFSM: Advanced API with debugging
- create_async_fsm: Factory function for creating instances
- dataknobs_fsm.execution.async_engine: Async execution engine
- dataknobs_fsm.resources.manager: Resource management
Initialize AsyncSimpleFSM from configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `str \| Path \| dict[str, Any]` | Path to config file or config dictionary | *required* |
| `data_mode` | `DataHandlingMode` | Default data mode for processing | `COPY` |
| `resources` | `dict[str, Any] \| None` | Optional resource configurations | `None` |
| `custom_functions` | `dict[str, Callable] \| None` | Optional custom functions to register | `None` |
Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
Attributes¶
Functions¶
process async ¶
Process a single record through the FSM asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any] \| Record` | Input data to process | *required* |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict containing the processed result |
Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
process_batch async ¶
process_batch(
data: list[dict[str, Any] | Record],
batch_size: int = 10,
max_workers: int = 4,
on_progress: Callable | None = None,
) -> list[dict[str, Any]]
Process multiple records in parallel batches asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `list[dict[str, Any] \| Record]` | List of input records to process | *required* |
| `batch_size` | `int` | Number of records per batch | `10` |
| `max_workers` | `int` | Maximum parallel workers | `4` |
| `on_progress` | `Callable \| None` | Optional callback for progress updates | `None` |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of results for each input record |
Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
process_stream async ¶
process_stream(
source: str | AsyncIterator[dict[str, Any]],
sink: str | None = None,
chunk_size: int = 100,
on_progress: Callable | None = None,
input_format: str = "auto",
text_field_name: str = "text",
csv_delimiter: str = ",",
csv_has_header: bool = True,
skip_empty_lines: bool = True,
use_streaming: bool = False,
) -> dict[str, Any]
Process a stream of data through the FSM asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `source` | `str \| AsyncIterator[dict[str, Any]]` | Data source (file path or async iterator) | *required* |
| `sink` | `str \| None` | Optional output destination | `None` |
| `chunk_size` | `int` | Size of processing chunks | `100` |
| `on_progress` | `Callable \| None` | Optional progress callback | `None` |
| `input_format` | `str` | Input file format ('auto', 'jsonl', 'json', 'csv', 'text') | `'auto'` |
| `text_field_name` | `str` | Field name for text lines when converting to dict | `'text'` |
| `csv_delimiter` | `str` | CSV delimiter character | `','` |
| `csv_has_header` | `bool` | Whether CSV file has header row | `True` |
| `skip_empty_lines` | `bool` | Skip empty lines in text files | `True` |
| `use_streaming` | `bool` | Use memory-efficient streaming for large files | `False` |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict containing stream processing statistics |
Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
validate async ¶
Validate data against FSM's start state schema asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any] \| Record` | Data to validate | *required* |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict containing validation results |
Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
get_states ¶
Get list of all state names in the FSM.
Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
get_resources ¶
AdvancedFSM ¶
AdvancedFSM(
config: FSM | str | Path | dict[str, Any],
execution_mode: ExecutionMode = ExecutionMode.STEP_BY_STEP,
custom_functions: dict[str, Callable] | None = None,
)
Advanced FSM interface with full control capabilities.
Initialize AdvancedFSM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `FSM \| str \| Path \| dict[str, Any]` | Either a pre-built FSM instance, or a configuration (file path or dict) from which to build one. | *required* |
| `execution_mode` | `ExecutionMode` | Execution mode for advanced control. | `STEP_BY_STEP` |
| `custom_functions` | `dict[str, Callable] \| None` | Optional custom functions to register. When config is a path or dict, these are registered before building the FSM; they are registered in all cases, including when a pre-built FSM is passed. | `None` |
Methods:
| Name | Description |
|---|---|
| `set_execution_strategy` | Set custom execution strategy. |
| `set_data_handler` | Set custom data handler. |
| `configure_transactions` | Configure transaction management. |
| `register_resource` | Register a custom resource. |
| `set_hooks` | Set execution hooks for monitoring. |
| `add_breakpoint` | Add a breakpoint at a specific state. |
| `remove_breakpoint` | Remove a breakpoint. |
| `clear_breakpoints` | Clear all breakpoints. |
| `enable_history` | Enable execution history tracking. |
| `disable_history` | Disable history tracking. |
| `create_context` | Create an execution context for manual control (synchronous). |
| `execution_context` | Create an execution context for manual control. |
| `step` | Execute a single transition step. |
| `run_until_breakpoint` | Run execution until a breakpoint is hit. |
| `trace_execution` | Execute with full tracing enabled. |
| `profile_execution` | Execute with performance profiling. |
| `get_available_transitions` | Get available transitions from a state. |
| `inspect_state` | Inspect a state's configuration. |
| `visualize_fsm` | Generate a visual representation of the FSM. |
| `validate_network` | Validate the FSM network for consistency. |
| `get_history` | Get execution history if enabled. |
| `save_history` | Save execution history to storage. |
| `load_history` | Load execution history from storage. |
| `execute_step_sync` | Execute a single transition step synchronously. |
| `execute_step_async` | Execute a single transition step asynchronously. |
| `run_until_breakpoint_sync` | Run execution until a breakpoint is hit (synchronous). |
| `trace_execution_sync` | Execute with full tracing enabled (synchronous). |
| `profile_execution_sync` | Execute with performance profiling (synchronous). |
Attributes:
| Name | Type | Description |
|---|---|---|
| `breakpoints` | `set` | Get the current breakpoints. |
| `hooks` | `ExecutionHook` | Get the current execution hooks. |
| `history_enabled` | `bool` | Check if history tracking is enabled. |
| `max_history_depth` | `int` | Get the maximum history depth. |
| `execution_history` | `list` | Get the execution history steps. |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
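A sketch tying the debugging methods together: set a breakpoint, enable history, and run to the breakpoint. The config file, state name, and the `dataknobs_fsm.api.advanced` import path (taken from the source path above) are assumptions:

```python
def hit_breakpoint(result):
    """True when run_until_breakpoint_sync returned a StepResult (i.e. steps ran)."""
    return result is not None

if __name__ == '__main__':
    from dataknobs_fsm.api.advanced import AdvancedFSM, ExecutionMode
    fsm = AdvancedFSM('pipeline.yaml',  # hypothetical config file
                      execution_mode=ExecutionMode.STEP_BY_STEP)
    fsm.add_breakpoint('transform')     # hypothetical state name
    fsm.enable_history(max_depth=50)
    ctx = fsm.create_context({'text': 'hello'})
    stopped = fsm.run_until_breakpoint_sync(ctx)
    if hit_breakpoint(stopped):
        print(fsm.get_history())
```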
Attributes¶
Functions¶
set_execution_strategy ¶
Set custom execution strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `strategy` | `TraversalStrategy` | Execution strategy to use | *required* |
set_data_handler ¶
Set custom data handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `handler` | `DataHandler` | Data handler implementation | *required* |
configure_transactions ¶
Configure transaction management.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `strategy` | `TransactionStrategy` | Transaction strategy to use | *required* |
| `**config` | `Any` | Strategy-specific configuration | `{}` |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
register_resource ¶
Register a custom resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Resource name | *required* |
| `resource` | `IResourceProvider \| dict[str, Any]` | Resource instance or configuration | *required* |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
set_hooks ¶
Set execution hooks for monitoring.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `hooks` | `ExecutionHook` | Execution hooks configuration | *required* |
add_breakpoint ¶
Add a breakpoint at a specific state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state_name` | `str` | Name of state to break at | *required* |
remove_breakpoint ¶
Remove a breakpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state_name` | `str` | Name of state to remove breakpoint from | *required* |
clear_breakpoints ¶
enable_history ¶
Enable execution history tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `storage` | `IHistoryStorage \| None` | Optional storage backend for history | `None` |
| `max_depth` | `int` | Maximum history depth to track | `100` |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
disable_history ¶
create_context ¶
create_context(
data: dict[str, Any] | Record,
data_mode: DataHandlingMode = DataHandlingMode.COPY,
initial_state: str | None = None,
) -> ExecutionContext
Create an execution context for manual control (synchronous).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any] \| Record` | Initial data | *required* |
| `data_mode` | `DataHandlingMode` | Data handling mode | `COPY` |
| `initial_state` | `str \| None` | Starting state name | `None` |
Returns:
| Type | Description |
|---|---|
| `ExecutionContext` | ExecutionContext for manual execution |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
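A sketch of a manual stepping loop built on create_context and execute_step_sync, relying on StepResult exposing `success` and `transition` as documented for step(); the config file is hypothetical:

```python
def drain(fsm, ctx, max_steps=100):
    """Step a context until no transition fires, collecting each StepResult.

    Uses StepResult.success and StepResult.transition, per the step() docs.
    """
    steps = []
    for _ in range(max_steps):
        result = fsm.execute_step_sync(ctx)
        steps.append(result)
        if not result.success or result.transition == "none":
            break
    return steps

if __name__ == '__main__':
    from dataknobs_fsm.api.advanced import AdvancedFSM
    fsm = AdvancedFSM('pipeline.yaml')  # hypothetical config file
    ctx = fsm.create_context({'text': 'hello'})
    for step_result in drain(fsm, ctx):
        print(step_result.transition)
```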
execution_context async ¶
execution_context(
data: dict[str, Any] | Record,
data_mode: DataHandlingMode = DataHandlingMode.COPY,
initial_state: str | None = None,
) -> AsyncGenerator[ExecutionContext, None]
Create an execution context for manual control.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any] \| Record` | Initial data | *required* |
| `data_mode` | `DataHandlingMode` | Data handling mode | `COPY` |
| `initial_state` | `str \| None` | Starting state name | `None` |
Yields:
| Type | Description |
|---|---|
| `AsyncGenerator[ExecutionContext, None]` | ExecutionContext for manual execution |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
step async ¶
Execute a single transition step.
.. versionchanged:: Return type changed from StateInstance | None to StepResult. Use result.success and result.transition != "none" instead of checking None.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `ExecutionContext` | Execution context | *required* |
| `arc_name` | `str \| None` | Optional specific arc to follow | `None` |
Returns:
| Type | Description |
|---|---|
| `StepResult` | StepResult with transition details |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
run_until_breakpoint async ¶
Run execution until a breakpoint is hit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `ExecutionContext` | Execution context | *required* |
| `max_steps` | `int` | Maximum steps to execute (safety limit) | `1000` |
Returns:
| Type | Description |
|---|---|
| `StepResult \| None` | StepResult where execution stopped, or None if no steps taken |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
trace_execution async ¶
trace_execution(
data: dict[str, Any] | Record, initial_state: str | None = None
) -> list[dict[str, Any]]
Execute with full tracing enabled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any] \| Record` | Input data | *required* |
| `initial_state` | `str \| None` | Optional starting state | `None` |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of trace entries |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
profile_execution async ¶
profile_execution(
data: dict[str, Any] | Record, initial_state: str | None = None
) -> dict[str, Any]
Execute with performance profiling.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any] \| Record` | Input data | *required* |
| `initial_state` | `str \| None` | Optional starting state | `None` |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Profiling data |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
get_available_transitions ¶
Get available transitions from a state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state_name` | `str` | Name of state | *required* |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of available transition information |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
inspect_state ¶
Inspect a state's configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state_name` | `str` | Name of state to inspect | *required* |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | State configuration details |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
visualize_fsm ¶
Generate a visual representation of the FSM.
Returns:
| Type | Description |
|---|---|
| `str` | GraphViz DOT format string |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
validate_network async ¶
Validate the FSM network for consistency.
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Validation results |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
get_history ¶
Get execution history if enabled.
Returns:
| Type | Description |
|---|---|
| `ExecutionHistory \| None` | Execution history or None |
save_history async ¶
Save execution history to storage.
Returns:
| Type | Description |
|---|---|
| `str \| None` | Storage ID for the saved history, or None if no storage is configured or no history exists. |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
load_history async ¶
Load execution history from storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `history_id` | `str` | History identifier | *required* |
Returns:
| Type | Description |
|---|---|
| `bool` | True if loaded successfully |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
execute_step_sync ¶
Execute a single transition step synchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `ExecutionContext` | Execution context | *required* |
| `arc_name` | `str \| None` | Optional specific arc to follow | `None` |
Returns:
| Type | Description |
|---|---|
| `StepResult` | StepResult with transition details |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
execute_step_async async ¶
Execute a single transition step asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `ExecutionContext` | Execution context | *required* |
| `arc_name` | `str \| None` | Optional specific arc to follow | `None` |
Returns:
| Type | Description |
|---|---|
| `StepResult` | StepResult with transition details |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
run_until_breakpoint_sync ¶
Run execution until a breakpoint is hit (synchronous).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
ExecutionContext
|
Execution context |
required |
max_steps
|
int
|
Maximum steps to execute |
1000
|
Returns:
| Type | Description |
|---|---|
StepResult | None
|
StepResult where execution stopped, or None if no steps taken |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
trace_execution_sync ¶
trace_execution_sync(
data: dict[str, Any] | Record,
initial_state: str | None = None,
max_steps: int = 1000,
) -> list[dict[str, Any]]
Execute with full tracing enabled (synchronous).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
dict[str, Any] | Record
|
Input data |
required |
initial_state
|
str | None
|
Optional starting state |
None
|
max_steps
|
int
|
Maximum steps to execute |
1000
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of trace entries |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
profile_execution_sync ¶
profile_execution_sync(
data: dict[str, Any] | Record,
initial_state: str | None = None,
max_steps: int = 1000,
) -> dict[str, Any]
Execute with performance profiling (synchronous).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
dict[str, Any] | Record
|
Input data |
required |
initial_state
|
str | None
|
Optional starting state |
None
|
max_steps
|
int
|
Maximum steps to execute |
1000
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Profiling data |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
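The shape of the profiling data returned by `profile_execution_sync` is not specified above, so here is a hedged stdlib-only sketch of per-state timing; the state names, `sleep` placeholders, and result keys are all assumptions for illustration:

```python
import time

# Hypothetical per-state work; sleeps stand in for real processing.
STATES = ["extract", "transform", "load"]

def profile_execution(states):
    """Collect per-state timings, loosely mirroring a profiling result."""
    timings = {}
    for name in states:
        start = time.perf_counter()
        time.sleep(0.01)  # placeholder for the state's real work
        timings[name] = (time.perf_counter() - start) * 1000  # milliseconds
    return {
        "states": timings,
        "total_ms": sum(timings.values()),
        "slowest_state": max(timings, key=timings.get),
    }

profile = profile_execution(STATES)
```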
ExecutionMode ¶
Bases: Enum
Advanced execution modes.
ExecutionHook
dataclass
¶
ExecutionHook(
on_state_enter: Callable | None = None,
on_state_exit: Callable | None = None,
on_arc_execute: Callable | None = None,
on_error: Callable | None = None,
on_resource_acquire: Callable | None = None,
on_resource_release: Callable | None = None,
on_transaction_begin: Callable | None = None,
on_transaction_commit: Callable | None = None,
on_transaction_rollback: Callable | None = None,
)
Hook for monitoring execution events.
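Hooks can be sketched as callbacks fired around a transition. The field names below match the `ExecutionHook` signature above, but the callback argument shape (a single state-name argument) and the toy executor are assumptions for illustration:

```python
from dataclasses import dataclass
from typing import Callable, Optional

events = []

@dataclass
class ExecutionHook:
    on_state_enter: Optional[Callable] = None
    on_state_exit: Optional[Callable] = None
    on_error: Optional[Callable] = None

hook = ExecutionHook(
    on_state_enter=lambda state: events.append(("enter", state)),
    on_state_exit=lambda state: events.append(("exit", state)),
)

# A toy executor invoking the hooks around one hypothetical transition.
def run_transition(hook, from_state, to_state):
    if hook.on_state_exit:
        hook.on_state_exit(from_state)
    if hook.on_state_enter:
        hook.on_state_enter(to_state)

run_transition(hook, "start", "processing")
```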
StepResult
dataclass
¶
StepResult(
from_state: str,
to_state: str,
transition: str,
data_before: dict[str, Any] = dict(),
data_after: dict[str, Any] = dict(),
duration: float = 0.0,
success: bool = True,
error: str | None = None,
at_breakpoint: bool = False,
is_complete: bool = False,
)
Result from a single step execution.
FSMDebugger ¶
Interactive debugger for FSM execution (fully synchronous).
Initialize debugger.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fsm
|
AdvancedFSM
|
Advanced FSM instance to debug |
required |
Methods:
| Name | Description |
|---|---|
start |
Start debugging session (synchronous). |
step |
Execute single step and return detailed result. |
continue_to_breakpoint |
Continue execution until a breakpoint is hit. |
inspect |
Inspect data at path. |
watch |
Add a watch expression. |
unwatch |
Remove a watch expression. |
print_watches |
Print all watch values. |
print_state |
Print current state information. |
inspect_current_state |
Get detailed information about current state. |
get_history |
Get recent execution history. |
reset |
Reset debugger with new data. |
Attributes:
| Name | Type | Description |
|---|---|---|
current_state |
str | None
|
Get the current state name. |
watches |
dict[str, Any]
|
Get current watch variable values. |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
Attributes¶
Functions¶
start ¶
Start debugging session (synchronous).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
dict[str, Any] | Record
|
Initial data |
required |
initial_state
|
str | None
|
Optional starting state |
None
|
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
step ¶
Execute single step and return detailed result.
Returns:
| Type | Description |
|---|---|
StepResult
|
StepResult with transition details |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
continue_to_breakpoint ¶
Continue execution until a breakpoint is hit.
Steps are recorded in execution_history and counted in
step_count, keeping the debugger state consistent with
the actual execution path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_steps
|
int
|
Safety limit to prevent infinite loops. |
1000
|
Returns:
| Type | Description |
|---|---|
StepResult | None
|
StepResult where execution stopped, or None if no session |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
inspect ¶
Inspect data at path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
Dot-separated path to data field (empty for all data) |
''
|
Returns:
| Type | Description |
|---|---|
Any
|
Value at path |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
watch ¶
Add a watch expression.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Watch name |
required |
path
|
str
|
Data path to watch |
required |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
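The `inspect` and `watch` semantics above (dot-separated paths, empty path returning all data) can be sketched with a stdlib-only resolver; the sample data and watch names are invented for illustration:

```python
from typing import Any

def inspect(data: dict, path: str = "") -> Any:
    """Resolve a dot-separated path, mirroring the documented inspect() semantics."""
    if not path:
        return data  # empty path returns all data
    current: Any = data
    for part in path.split("."):
        current = current[part]
    return current

# Watches map a name to a data path, as in watch(name, path).
watches = {"total": "order.total", "items": "order.items"}
data = {"order": {"total": 42.5, "items": ["widget", "gadget"]}}

watch_values = {name: inspect(data, path) for name, path in watches.items()}
```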
unwatch ¶
Remove a watch expression.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Watch name to remove |
required |
print_watches ¶
Print all watch values.
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
print_state ¶
Print current state information.
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
inspect_current_state ¶
Get detailed information about current state.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with state details |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
get_history ¶
Get recent execution history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum number of steps to return |
10
|
Returns:
| Type | Description |
|---|---|
list[StepResult]
|
List of recent step results |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
ExecutionContext ¶
ExecutionContext(
data_mode: ProcessingMode = ProcessingMode.SINGLE,
transaction_mode: TransactionMode = TransactionMode.NONE,
resources: Dict[str, Any] | None = None,
database: Union[SyncDatabase, AsyncDatabase] | None = None,
stream_context: StreamContext | None = None,
)
Execution context for FSM processing with full mode support.
This context manages:
- Data mode configuration (single, batch, stream)
- Transaction management
- Resource allocation and tracking
- Stream coordination
- State tracking and network stack
- Parallel execution paths
Initialize execution context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data_mode
|
ProcessingMode
|
Data processing mode. |
SINGLE
|
transaction_mode
|
TransactionMode
|
Transaction handling mode. |
NONE
|
resources
|
Dict[str, Any] | None
|
Initial resource configurations. |
None
|
database
|
Union[SyncDatabase, AsyncDatabase] | None
|
Database connection for transactions. |
None
|
stream_context
|
StreamContext | None
|
Stream context for stream mode. |
None
|
Methods:
| Name | Description |
|---|---|
push_network |
Push a network onto the execution stack. |
pop_network |
Pop a network from the execution stack. |
set_state |
Set the current state. |
allocate_resource |
Allocate a resource. |
release_resource |
Release an allocated resource. |
start_transaction |
Start a new transaction. |
commit_transaction |
Commit the current transaction. |
rollback_transaction |
Rollback the current transaction. |
log_operation |
Log an operation in the current transaction. |
set_stream_chunk |
Set the current stream chunk for processing. |
add_batch_item |
Add an item to the batch. |
add_batch_result |
Add a result to batch results. |
add_batch_error |
Add an error to batch errors. |
create_child_context |
Create a child context for parallel execution. |
merge_child_context |
Merge a child context back into parent. |
get_resource_usage |
Get current resource usage statistics. |
get_performance_stats |
Get performance statistics. |
get_complete_path |
Get the complete state traversal path including current state. |
clone |
Create a clone of this context. |
is_complete |
Check if the FSM execution has reached an end state. |
get_current_state |
Get the name of the current state. |
get_data_snapshot |
Get a snapshot of the current data. |
get_execution_stats |
Get execution statistics. |
get_current_state_instance |
Get the current state instance object. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
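Two of the behaviors documented above, the network stack (`push_network`/`pop_network`) and resource bookkeeping (`allocate_resource`/`release_resource`), can be sketched with a stdlib-only stand-in; the duplicate-allocation rule and internal storage shown here are assumptions, not the real implementation:

```python
class ToyContext:
    """Minimal stand-in for the ExecutionContext behaviors named above."""

    def __init__(self):
        self._network_stack = []   # (network_name, return_state) tuples
        self._resources = {}       # (resource_type, resource_id) -> metadata

    def push_network(self, network_name, return_state=None):
        self._network_stack.append((network_name, return_state))

    def pop_network(self):
        return self._network_stack.pop()

    def allocate_resource(self, resource_type, resource_id, metadata=None):
        key = (resource_type, resource_id)
        if key in self._resources:
            return False  # assumed rule: refuse duplicate allocation
        self._resources[key] = metadata or {}
        return True

    def release_resource(self, resource_type, resource_id):
        return self._resources.pop((resource_type, resource_id), None) is not None

ctx = ToyContext()
ctx.push_network("validation_subnet", return_state="after_validate")
first_alloc = ctx.allocate_resource("db", "conn-1")
second_alloc = ctx.allocate_resource("db", "conn-1")  # duplicate
network, return_state = ctx.pop_network()
released = ctx.release_resource("db", "conn-1")
```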
Functions¶
push_network ¶
Push a network onto the execution stack.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
network_name
|
str
|
Name of network to push. |
required |
return_state
|
str | None
|
State to return to after network completes. |
None
|
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
pop_network ¶
Pop a network from the execution stack.
Returns:
| Type | Description |
|---|---|
Tuple[str, str | None]
|
Tuple of (network_name, return_state). |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
set_state ¶
Set the current state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state_name
|
str
|
Name of the new state. |
required |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
allocate_resource ¶
allocate_resource(
resource_type: str, resource_id: str, metadata: Dict[str, Any] | None = None
) -> bool
Allocate a resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
resource_type
|
str
|
Type of resource. |
required |
resource_id
|
str
|
Unique resource identifier. |
required |
metadata
|
Dict[str, Any] | None
|
Optional resource metadata. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if allocation successful. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
release_resource ¶
Release an allocated resource.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
resource_type
|
str
|
Type of resource. |
required |
resource_id
|
str
|
Resource identifier. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if release successful. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
start_transaction ¶
Start a new transaction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
transaction_id
|
str | None
|
Optional transaction ID. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if transaction started. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
commit_transaction ¶
Commit the current transaction.
Returns:
| Type | Description |
|---|---|
bool
|
True if commit successful. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
rollback_transaction ¶
Rollback the current transaction.
Returns:
| Type | Description |
|---|---|
bool
|
True if rollback successful. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
log_operation ¶
Log an operation in the current transaction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
operation
|
str
|
Operation name. |
required |
details
|
Dict[str, Any]
|
Operation details. |
required |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
set_stream_chunk ¶
Set the current stream chunk for processing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunk
|
StreamChunk
|
Stream chunk to process. |
required |
add_batch_item ¶
Add an item to the batch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
item
|
Any
|
Item to add to batch. |
required |
add_batch_result ¶
Add a result to batch results.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
result
|
Any
|
Processing result. |
required |
add_batch_error ¶
Add an error to batch errors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
index
|
int
|
Batch item index. |
required |
error
|
Exception
|
Error that occurred. |
required |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
create_child_context ¶
Create a child context for parallel execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path_id
|
str
|
Unique identifier for the execution path. |
required |
Returns:
| Type | Description |
|---|---|
ExecutionContext
|
New child execution context. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
merge_child_context ¶
Merge a child context back into parent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path_id
|
str
|
Path identifier to merge. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if merge successful. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
get_resource_usage ¶
Get current resource usage statistics.
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
Resource usage information. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
get_performance_stats ¶
Get performance statistics.
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
Performance statistics. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
get_complete_path ¶
Get the complete state traversal path including current state.
Returns:
| Type | Description |
|---|---|
List[str]
|
List of state names in traversal order. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
clone ¶
Create a clone of this context.
Returns:
| Type | Description |
|---|---|
ExecutionContext
|
Cloned execution context. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
is_complete ¶
Check if the FSM execution has reached an end state.
Returns:
| Type | Description |
|---|---|
bool
|
True if in an end state or no current state, False otherwise. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
get_current_state ¶
Get the name of the current state.
Returns:
| Type | Description |
|---|---|
str | None
|
Current state name or None if not set. |
get_data_snapshot ¶
Get a snapshot of the current data.
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
Copy of the current data dictionary. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
get_execution_stats ¶
Get execution statistics.
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
Dictionary with execution metrics. |
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
get_current_state_instance ¶
Get the current state instance object.
Returns:
| Type | Description |
|---|---|
Any
|
The StateInstance object for the current state, or None if not set. |
ConfigLoader ¶
Load and process FSM configurations from various sources.
Initialize the ConfigLoader.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
use_dataknobs_config
|
bool
|
Whether to use dataknobs_config for advanced features. |
False
|
Methods:
| Name | Description |
|---|---|
add_registered_function |
Add a function name to the set of registered functions. |
load_from_file |
Load configuration from a file. |
load_from_dict |
Load configuration from a dictionary. |
load_from_template |
Load configuration from a template. |
load_template_config |
Load configuration from a template configuration object. |
validate_file |
Validate a configuration file without fully loading it. |
merge_configs |
Merge multiple FSM configurations. |
Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
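The load-and-validate flow can be sketched with stdlib JSON; the config keys (`name`, `states`, `arcs`) are hypothetical placeholders, not the actual `FSMConfig` schema, and the validation here is a shallow stand-in for the real Pydantic-style checks:

```python
import json
import os
import tempfile

# Hypothetical minimal FSM config; key names are illustrative only.
config_text = json.dumps({
    "name": "order_pipeline",
    "states": [{"name": "start"}, {"name": "done", "is_end": True}],
    "arcs": [{"from": "start", "to": "done"}],
})

def load_from_file(file_path):
    """Sketch of load_from_file: read JSON and do a shallow validation pass."""
    with open(file_path) as f:
        config = json.load(f)
    for key in ("name", "states"):
        if key not in config:
            raise ValueError(f"missing required key: {key}")
    return config

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    f.write(config_text)
    path = f.name
try:
    config = load_from_file(path)
finally:
    os.unlink(path)
```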
Functions¶
add_registered_function ¶
Add a function name to the set of registered functions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Function name that has been registered. |
required |
load_from_file ¶
load_from_file(
file_path: Union[str, Path],
resolve_env: bool = True,
resolve_references: bool = True,
) -> FSMConfig
Load configuration from a file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
file_path
|
Union[str, Path]
|
Path to configuration file (JSON or YAML). |
required |
resolve_env
|
bool
|
Whether to resolve environment variables. |
True
|
resolve_references
|
bool
|
Whether to resolve file references. |
True
|
Returns:
| Type | Description |
|---|---|
FSMConfig
|
Validated FSMConfig instance. |
Raises:
| Type | Description |
|---|---|
FileNotFoundError
|
If file doesn't exist. |
ValueError
|
If file format is not supported. |
Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
load_from_dict ¶
Load configuration from a dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config_dict
|
Dict[str, Any]
|
Configuration dictionary. |
required |
resolve_env
|
bool
|
Whether to resolve environment variables. |
True
|
Returns:
| Type | Description |
|---|---|
FSMConfig
|
Validated FSMConfig instance. |
Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
load_from_template ¶
load_from_template(
template: Union[UseCaseTemplate, str],
params: Dict[str, Any] | None = None,
overrides: Dict[str, Any] | None = None,
) -> FSMConfig
Load configuration from a template.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
template
|
Union[UseCaseTemplate, str]
|
Template name or enum value. |
required |
params
|
Dict[str, Any] | None
|
Template parameters. |
None
|
overrides
|
Dict[str, Any] | None
|
Configuration overrides. |
None
|
Returns:
| Type | Description |
|---|---|
FSMConfig
|
Validated FSMConfig instance. |
Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
load_template_config ¶
Load configuration from a template configuration object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
template_config
|
TemplateConfig
|
Template configuration. |
required |
Returns:
| Type | Description |
|---|---|
FSMConfig
|
Validated FSMConfig instance. |
Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
validate_file ¶
Validate a configuration file without fully loading it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
file_path
|
Union[str, Path]
|
Path to configuration file. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if valid, False otherwise. |
Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
merge_configs ¶
Merge multiple FSM configurations.
Later configurations override earlier ones.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*configs
|
FSMConfig
|
FSMConfig instances to merge. |
()
|
Returns:
| Type | Description |
|---|---|
FSMConfig
|
Merged FSMConfig instance. |
Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
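The "later configurations override earlier ones" rule can be sketched as a dict merge; the merge depth shown (one nested level per top-level section) is an assumption, since the documentation does not specify how deeply `merge_configs` recurses:

```python
def merge_configs(*configs):
    """Later configurations override earlier ones (shallow merge per section)."""
    merged = {}
    for config in configs:
        for key, value in config.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = {**merged[key], **value}  # merge nested sections
            else:
                merged[key] = value
    return merged

base = {"name": "pipeline", "settings": {"max_steps": 100, "debug": False}}
override = {"settings": {"debug": True}}
merged = merge_configs(base, override)
```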
FSMBuilder ¶
Build executable FSM instances from configuration.
Initialize the FSMBuilder.
Methods:
| Name | Description |
|---|---|
build |
Build an FSM instance from configuration. |
register_function |
Register a custom function. |
Source code in packages/fsm/src/dataknobs_fsm/config/builder.py
Functions¶
build ¶
Build an FSM instance from configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
FSMConfig
|
FSM configuration. |
required |
Returns:
| Type | Description |
|---|---|
FSM
|
Executable FSM instance. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If configuration is invalid or incomplete. |
Source code in packages/fsm/src/dataknobs_fsm/config/builder.py
register_function ¶
Register a custom function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Function name for reference in configuration. |
required |
func
|
Callable
|
Function implementation. |
required |
Source code in packages/fsm/src/dataknobs_fsm/config/builder.py
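The register-then-build flow can be sketched as a name-to-callable registry resolved at build time; the config shape and the fail-fast rule for unregistered names are assumptions for illustration:

```python
class ToyBuilder:
    """Minimal stand-in for the FSMBuilder registration pattern."""

    def __init__(self):
        self._functions = {}

    def register_function(self, name, func):
        self._functions[name] = func

    def build(self, config):
        # Resolve each state's function reference; fail fast on unknown names.
        resolved = {}
        for state in config["states"]:
            fn_name = state.get("function")
            if fn_name is not None:
                if fn_name not in self._functions:
                    raise ValueError(f"unregistered function: {fn_name}")
                resolved[state["name"]] = self._functions[fn_name]
        return resolved

builder = ToyBuilder()
builder.register_function("double", lambda data: data * 2)
fsm_functions = builder.build({"states": [{"name": "process", "function": "double"}]})
```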
ExecutionHistoryQuery
dataclass
¶
ExecutionHistoryQuery(
from_state: str | None = None,
to_state: str | None = None,
trigger: str | None = None,
transition_name: str | None = None,
since: float | None = None,
until: float | None = None,
success_only: bool = False,
failed_only: bool = False,
limit: int | None = None,
)
Query parameters for filtering execution history.
Attributes:
| Name | Type | Description |
|---|---|---|
from_state |
str | None
|
Filter by source state |
to_state |
str | None
|
Filter by target state |
trigger |
str | None
|
Filter by trigger type |
transition_name |
str | None
|
Filter by transition name |
since |
float | None
|
Filter to records after this timestamp |
until |
float | None
|
Filter to records before this timestamp |
success_only |
bool
|
Only include successful transitions |
failed_only |
bool
|
Only include failed transitions |
limit |
int | None
|
Maximum number of records to return |
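The query attributes above act as conjunctive filters plus a truncation limit. A stdlib-only sketch of that filtering logic, using a cut-down record type and a subset of the documented parameters:

```python
from dataclasses import dataclass

@dataclass
class Record:  # minimal stand-in for ExecutionRecord
    from_state: str
    to_state: str
    timestamp: float
    success: bool = True

def query(records, from_state=None, since=None, success_only=False, limit=None):
    """Apply ExecutionHistoryQuery-style filters, then truncate to the limit."""
    out = [
        r for r in records
        if (from_state is None or r.from_state == from_state)
        and (since is None or r.timestamp >= since)
        and (not success_only or r.success)
    ]
    return out if limit is None else out[:limit]

records = [
    Record("start", "work", 1.0),
    Record("work", "done", 2.0),
    Record("work", "error", 3.0, success=False),
]
matches = query(records, from_state="work", success_only=True)
```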
ExecutionRecord
dataclass
¶
ExecutionRecord(
from_state: str,
to_state: str,
timestamp: float,
trigger: str = "step",
transition_name: str | None = None,
duration_in_state_ms: float = 0.0,
data_before: dict[str, Any] | None = None,
data_after: dict[str, Any] | None = None,
condition_evaluated: str | None = None,
condition_result: bool | None = None,
success: bool = True,
error: str | None = None,
)
Record of a single FSM state transition.
Captures all relevant information about a state transition including timing, states, trigger, and data snapshots.
Attributes:
| Name | Type | Description |
|---|---|---|
from_state |
str
|
State name before the transition |
to_state |
str
|
State name after the transition |
timestamp |
float
|
Unix timestamp when transition occurred |
trigger |
str
|
What triggered the transition (e.g., "step", "auto", "external") |
transition_name |
str | None
|
Name of the transition/arc that was taken |
duration_in_state_ms |
float
|
Time spent in the from_state in milliseconds |
data_before |
dict[str, Any] | None
|
Data state before the transition |
data_after |
dict[str, Any] | None
|
Data state after the transition |
condition_evaluated |
str | None
|
The condition expression that was evaluated (if any) |
condition_result |
bool | None
|
Result of the condition evaluation (True/False) |
success |
bool
|
Whether the transition completed successfully |
error |
str | None
|
Error message if transition failed |
Methods:
| Name | Description |
|---|---|
to_dict |
Convert record to dictionary. |
from_dict |
Create record from dictionary. |
from_step_result |
Create record from a StepResult. |
Functions¶
to_dict ¶
Convert record to dictionary.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary representation of the record |
from_dict
classmethod
¶
Create record from dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
dict[str, Any]
|
Dictionary containing record fields |
required |
Returns:
| Type | Description |
|---|---|
ExecutionRecord
|
ExecutionRecord instance |
Source code in packages/fsm/src/dataknobs_fsm/observability.py
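The `to_dict`/`from_dict` pair forms a serialization round trip. A sketch using a dataclass stand-in with a subset of `ExecutionRecord`'s fields (the real methods may handle more fields and validation):

```python
from dataclasses import dataclass, asdict

@dataclass
class Record:  # stand-in with a subset of ExecutionRecord's fields
    from_state: str
    to_state: str
    timestamp: float
    success: bool = True

    def to_dict(self):
        return asdict(self)

    @classmethod
    def from_dict(cls, data):
        return cls(**data)

original = Record("start", "done", 123.0)
restored = Record.from_dict(original.to_dict())
```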
from_step_result
classmethod
¶
from_step_result(
step_result: Any,
trigger: str = "step",
state_entry_time: float | None = None,
condition_evaluated: str | None = None,
) -> ExecutionRecord
Create record from a StepResult.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
step_result
|
Any
|
StepResult from AdvancedFSM execution |
required |
trigger
|
str
|
What triggered this transition |
'step'
|
state_entry_time
|
float | None
|
When the from_state was entered (for duration calc) |
None
|
condition_evaluated
|
str | None
|
The condition expression if known |
None
|
Returns:
| Type | Description |
|---|---|
ExecutionRecord
|
ExecutionRecord instance |
Source code in packages/fsm/src/dataknobs_fsm/observability.py
ExecutionStats
dataclass
¶
ExecutionStats(
total_transitions: int = 0,
successful_transitions: int = 0,
failed_transitions: int = 0,
unique_paths: int = 0,
avg_duration_per_state_ms: float = 0.0,
most_visited_state: str | None = None,
most_common_trigger: str | None = None,
first_transition: float | None = None,
last_transition: float | None = None,
)
Aggregated statistics for FSM executions.
Attributes:
| Name | Type | Description |
|---|---|---|
total_transitions |
int
|
Total number of transitions |
successful_transitions |
int
|
Number of successful transitions |
failed_transitions |
int
|
Number of failed transitions |
unique_paths |
int
|
Number of unique state-to-state paths taken |
avg_duration_per_state_ms |
float
|
Average time spent in each state |
most_visited_state |
str | None
|
State that was visited most often |
most_common_trigger |
str | None
|
Most frequent transition trigger |
first_transition |
float | None
|
Timestamp of first transition |
last_transition |
float | None
|
Timestamp of last transition |
ExecutionTracker ¶
Tracks FSM execution history with query capabilities.
Manages a bounded history of state transitions and provides methods for querying and aggregating execution data.
Attributes:
| Name | Type | Description |
|---|---|---|
max_history |
Maximum number of records to retain |
Example
tracker = ExecutionTracker(max_history=1000)
# Record from StepResult
tracker.record_from_step_result(step_result, trigger="step")
# Or record directly
tracker.record(ExecutionRecord(
from_state="start",
to_state="processing",
timestamp=time.time(),
trigger="step",
success=True,
))
# Query history
recent = tracker.query(ExecutionHistoryQuery(
from_state="processing",
since=time.time() - 3600,
))
# Get stats
stats = tracker.get_stats()
Initialize tracker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_history
|
int
|
Maximum records to retain (default 100) |
100
|
Methods:
| Name | Description |
|---|---|
record |
Record an execution. |
record_from_step_result |
Record execution from a StepResult. |
mark_state_entry |
Mark the current time as state entry time. |
query |
Query execution history. |
get_stats |
Get aggregated execution statistics. |
get_state_flow |
Get the sequence of states visited. |
clear |
Clear all execution history. |
__len__ |
Return number of records in history. |
Source code in packages/fsm/src/dataknobs_fsm/observability.py
Functions¶
record ¶
Record an execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
execution
|
ExecutionRecord
|
The execution record to store |
required |
Source code in packages/fsm/src/dataknobs_fsm/observability.py
record_from_step_result ¶
record_from_step_result(
step_result: Any,
trigger: str = "step",
condition_evaluated: str | None = None,
) -> ExecutionRecord
Record execution from a StepResult.
Convenience method that creates an ExecutionRecord from StepResult and records it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
step_result
|
Any
|
StepResult from AdvancedFSM execution |
required |
trigger
|
str
|
What triggered this transition |
'step'
|
condition_evaluated
|
str | None
|
The condition expression if known |
None
|
Returns:
| Type | Description |
|---|---|
ExecutionRecord
|
The created ExecutionRecord |
Source code in packages/fsm/src/dataknobs_fsm/observability.py
mark_state_entry ¶
Mark the current time as state entry time.
Call this when entering a new state to track duration.
query ¶
Query execution history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
ExecutionHistoryQuery | None
|
Query parameters, or None for all records |
None
|
Returns:
| Type | Description |
|---|---|
list[ExecutionRecord]
|
List of matching execution records |
Source code in packages/fsm/src/dataknobs_fsm/observability.py
get_stats ¶
Get aggregated execution statistics.
Returns:
| Type | Description |
|---|---|
ExecutionStats
|
ExecutionStats with aggregated metrics |
Source code in packages/fsm/src/dataknobs_fsm/observability.py
get_state_flow ¶
Get the sequence of states visited.
Returns:
| Type | Description |
|---|---|
list[str]
|
List of state names in order visited |
Source code in packages/fsm/src/dataknobs_fsm/observability.py
clear ¶
Clear all execution history.
Functions¶
create_advanced_fsm ¶
create_advanced_fsm(
config: str | Path | dict[str, Any] | FSM,
custom_functions: dict[str, Callable] | None = None,
**kwargs: Any,
) -> AdvancedFSM
Factory function to create an AdvancedFSM instance.
Delegates to `AdvancedFSM`, which handles both pre-built FSM instances and raw config (a path or dict) via `dataknobs_fsm.config.builder.build_fsm`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
str | Path | dict[str, Any] | FSM
|
Configuration dict, file path, or pre-built FSM instance. |
required |
custom_functions
|
dict[str, Callable] | None
|
Optional custom functions to register. |
None
|
**kwargs
|
Any
|
Additional arguments forwarded to AdvancedFSM. |
{}
|
Returns:
| Type | Description |
|---|---|
AdvancedFSM
|
Configured AdvancedFSM instance. |
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
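The factory's dispatch over the `config` union type can be sketched as follows; the returned wrapper dict is a placeholder, not the real `AdvancedFSM`, and the JSON file handling is an assumption for illustration:

```python
import json
from pathlib import Path

def create_fsm(config, custom_functions=None):
    """Sketch of the factory's dispatch: accept a dict, a path, or a prebuilt object."""
    if isinstance(config, (str, Path)):
        config = json.loads(Path(config).read_text())  # load config from a file
    if isinstance(config, dict):
        return {"config": config, "functions": custom_functions or {}}
    return config  # assume an already-built FSM passes through unchanged

fsm = create_fsm({"name": "demo"}, custom_functions={"noop": lambda d: d})
```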
create_execution_record ¶
create_execution_record(
from_state: str,
to_state: str,
trigger: str = "step",
transition_name: str | None = None,
duration_in_state_ms: float = 0.0,
data_before: dict[str, Any] | None = None,
data_after: dict[str, Any] | None = None,
condition_evaluated: str | None = None,
condition_result: bool | None = None,
success: bool = True,
error: str | None = None,
) -> ExecutionRecord
Factory function to create an execution record.
Convenience function that automatically sets the timestamp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
from_state
|
str
|
State name before transition |
required |
to_state
|
str
|
State name after transition |
required |
trigger
|
str
|
What triggered the transition |
'step'
|
transition_name
|
str | None
|
Name of the transition/arc |
None
|
duration_in_state_ms
|
float
|
Time spent in from_state |
0.0
|
data_before
|
dict[str, Any] | None
|
Data state before transition |
None
|
data_after
|
dict[str, Any] | None
|
Data state after transition |
None
|
condition_evaluated
|
str | None
|
The condition expression that was evaluated |
None
|
condition_result
|
bool | None
|
Result of the condition evaluation |
None
|
success
|
bool
|
Whether transition succeeded |
True
|
error
|
str | None
|
Error message if transition failed |
None
|
Returns:
| Type | Description |
|---|---|
ExecutionRecord
|
ExecutionRecord with current timestamp |
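The convenience behavior described above (timestamp set automatically at creation time) can be sketched with a cut-down record type; only a subset of the documented parameters is shown:

```python
import time
from dataclasses import dataclass

@dataclass
class Record:  # stand-in for ExecutionRecord with the timestamp field
    from_state: str
    to_state: str
    timestamp: float
    trigger: str = "step"

def create_execution_record(from_state, to_state, trigger="step"):
    """Factory that auto-fills the timestamp, as documented above."""
    return Record(from_state, to_state, timestamp=time.time(), trigger=trigger)

before = time.time()
record = create_execution_record("start", "done")
```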