
dataknobs-fsm Complete API Reference

Complete auto-generated API documentation from source code docstrings.

💡 Also see:
• Curated API Guide: hand-crafted tutorials and examples
• Package Overview: introduction and getting started
• Source Code: view on GitHub


dataknobs_fsm

DataKnobs Finite State Machine framework.

A flexible FSM framework with data modes, resource management, and streaming support for building complex data processing workflows.

Modules:

• api: Public APIs for FSM operations across different use cases.
• cli: FSM CLI module.
• config: FSM configuration system for loading, validating, and building FSM instances.
• core: Core FSM components.
• execution: Execution module for FSM state machines.
• functions: FSM functions module.
• io: I/O abstraction layer for FSM patterns.
• observability: FSM execution observability for tracking state transitions.
• patterns: Pre-configured FSM patterns for common workflow use cases.
• resources: Resource management for FSM.
• storage: Storage module for FSM execution history.
• streaming: Streaming support for large data processing in FSM.

Classes:

• FSM: Finite State Machine core class.
• StateDefinition: Definition of a state in the FSM.
• StateInstance: Runtime instance of a state.
• ArcDefinition: Definition of an arc between states.
• DataHandlingMode: Data handling modes for state instances; defines how data is managed within states.
• SimpleFSM: Synchronous FSM interface for simple workflows.
• AsyncSimpleFSM: Async-first FSM interface for production workflows.
• AdvancedFSM: Advanced FSM interface with full control capabilities.
• ExecutionMode: Advanced execution modes.
• ExecutionHook: Hook for monitoring execution events.
• StepResult: Result from a single step execution.
• FSMDebugger: Interactive debugger for FSM execution (fully synchronous).
• ExecutionContext: Execution context for FSM processing with full mode support.
• ConfigLoader: Load and process FSM configurations from various sources.
• FSMBuilder: Build executable FSM instances from configuration.
• ExecutionHistoryQuery: Query parameters for filtering execution history.
• ExecutionRecord: Record of a single FSM state transition.
• ExecutionStats: Aggregated statistics for FSM executions.
• ExecutionTracker: Tracks FSM execution history with query capabilities.

Functions:

• create_advanced_fsm: Factory function to create an AdvancedFSM instance.
• create_execution_record: Factory function to create an execution record.

Classes

FSM

FSM(
    name: str,
    data_mode: ProcessingMode = ProcessingMode.SINGLE,
    transaction_mode: TransactionMode = TransactionMode.NONE,
    description: str | None = None,
    resource_manager: Any | None = None,
    transaction_manager: Any | None = None,
)

Finite State Machine core class.

This is the foundational class that defines the structure and configuration of a finite state machine. It serves as the container for state networks, functions, and execution configuration.

Architecture

The FSM class uses a builder pattern approach where:

1. FSM is constructed with configuration
2. Networks are added with states and transitions
3. Functions are registered for state operations
4. FSM is validated before execution
5. Execution engines are created on-demand

Key Components

State Networks:
• One or more state graphs (nodes=states, edges=transitions)
• Main network for primary execution path
• Sub-networks for modular workflows
• Accessed via networks dict or get_network()

Function Registry:
• Central registry of all functions used in workflows
• Validation, transform, and test functions
• Referenced by name in state/arc definitions
• Ensures all function references are valid

Processing Configuration:
• data_mode: How data flows (SINGLE/BATCH/STREAM)
• transaction_mode: Transaction guarantees (NONE/OPTIMISTIC/PESSIMISTIC)
• resource_manager: Manages external resources
• transaction_manager: Coordinates transactions

Execution Engines:
• Created lazily via get_engine() / get_async_engine()
• Sync engine for SimpleFSM
• Async engine for AsyncSimpleFSM
• Cached for reuse

Attributes:

• name (str): Unique identifier for this FSM
• data_mode (ProcessingMode): Data processing mode (SINGLE/BATCH/STREAM)
• transaction_mode (TransactionMode): Transaction handling (NONE/OPTIMISTIC/PESSIMISTIC)
• description (str | None): Optional FSM description
• networks (Dict[str, StateNetwork]): State networks by name
• main_network_name (str | None): Name of the main network
• function_registry (FunctionRegistry): Registry of all functions
• resource_requirements (Dict[str, Any]): Aggregated resource requirements
• config (Dict[str, Any]): Additional configuration
• metadata (Dict[str, Any]): FSM metadata
• version (str): FSM version for compatibility
• created_at (float | None): Creation timestamp
• updated_at (float | None): Last update timestamp
• resource_manager (Any | None): External resource manager
• transaction_manager (Any | None): Transaction coordinator

Design Patterns

Separation of Concerns:
• FSM defines structure (what to execute)
• Engines handle execution (how to execute)
• Clear boundary between definition and runtime

Multiple Networks:
• FSMs can contain multiple state graphs
• Enables modular workflow composition
• Sub-networks can be reused across FSMs

Lazy Initialization:
• Engines created on first use
• Reduces overhead for validation-only use cases
• Cached for subsequent use

Validation Before Execution:
• validate() checks structure before execution
• Catches configuration errors early
• Provides detailed error messages

Resource Management

The FSM aggregates resource requirements from all networks:
• Database connections needed
• API clients required
• File handles expected
• Memory requirements

This enables:
• Pre-execution resource checks
• Resource pooling optimization
• Clear dependency documentation

Serialization

FSM supports serialization for persistence and transmission:
• to_dict(): Convert to dictionary
• from_dict(): Reconstruct from dictionary
• Note: the function registry is not serialized (functions must be re-registered)
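Because resource requirements are stored as sets, serialization must convert them to JSON-friendly lists and restore them on load, mirroring the conversion shown in the to_dict/from_dict sources. A minimal, self-contained sketch of that round-trip using plain dicts (no dataknobs_fsm import; sorted() is used here only for deterministic output, where the library source uses list(v)):

```python
import json

# In-memory form: requirement values are sets, as in FSM.resource_requirements.
requirements = {"database": {"postgres", "redis"}, "api": {"openai"}}

# to_dict-style conversion: sets become lists so the result is JSON-serializable.
serialized = {k: sorted(v) for k, v in requirements.items()}
payload = json.dumps({"resource_requirements": serialized})

# from_dict-style restoration: lists become sets again.
restored = {
    k: set(v)
    for k, v in json.loads(payload)["resource_requirements"].items()
}

assert restored == requirements  # lossless round-trip
```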

Thread Safety

The FSM structure itself is read-only after construction and validation. However, execution engines may or may not be thread-safe depending on data_mode:
• SINGLE mode: Not thread-safe (by design)
• BATCH mode: Thread-safe if using DataHandlingMode.COPY
• STREAM mode: Thread-safe with proper streaming context

Note

This is an internal API. Users should use the higher-level APIs:
• SimpleFSM for synchronous workflows
• AsyncSimpleFSM for async/production workflows
• AdvancedFSM for debugging and profiling

Direct FSM instantiation is primarily for builders and internal use.

See Also
  • :class:~dataknobs_fsm.api.simple.SimpleFSM: Synchronous API
  • :class:~dataknobs_fsm.api.async_simple.AsyncSimpleFSM: Async API
  • :class:~dataknobs_fsm.core.network.StateNetwork: State graph
  • :class:~dataknobs_fsm.functions.base.FunctionRegistry: Function registry

Initialize FSM.

Parameters:

• name (str): Name of the FSM. [required]
• data_mode (ProcessingMode): Data processing mode. Default: SINGLE
• transaction_mode (TransactionMode): Transaction handling mode. Default: NONE
• description (str | None): Optional FSM description. Default: None
• resource_manager (Any | None): External resource manager. Default: None
• transaction_manager (Any | None): Transaction coordinator. Default: None

Methods:

• add_network: Add a network to the FSM.
• remove_network: Remove a network from the FSM.
• get_network: Get a network by name.
• validate: Validate the FSM.
• get_all_states: Get all states from all networks.
• get_all_arcs: Get all arcs from all networks.
• supports_streaming: Check if FSM supports streaming.
• get_resource_summary: Get resource requirements summary.
• clone: Create a clone of this FSM.
• to_dict: Convert FSM to dictionary representation.
• from_dict: Create FSM from dictionary representation.
• find_state_definition: Find a state definition by name.
• create_state_instance: Create a state instance from a state name.
• get_state: Get a state definition by name.
• is_start_state: Check if a state is a start state.
• is_end_state: Check if a state is an end state.
• get_start_state: Get the start state definition.
• get_all_states_dict: Get all states from all networks.
• get_outgoing_arcs: Get outgoing arcs from a state.
• get_engine: Get or create the execution engine.
• get_async_engine: Get or create the async execution engine.
• execute_async: Execute the FSM asynchronously with initial data.
• execute: Execute the FSM synchronously with initial data.
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def __init__(
    self,
    name: str,
    data_mode: ProcessingMode = ProcessingMode.SINGLE,
    transaction_mode: TransactionMode = TransactionMode.NONE,
    description: str | None = None,
    resource_manager: Any | None = None,
    transaction_manager: Any | None = None
):
    """Initialize FSM.

    Args:
        name: Name of the FSM.
        data_mode: Data processing mode.
        transaction_mode: Transaction handling mode.
        description: Optional FSM description.
    """
    self.name = name
    self.data_mode = data_mode
    self.transaction_mode = transaction_mode
    self.description = description

    # Networks
    self.networks: Dict[str, StateNetwork] = {}
    self.main_network_name: str | None = None

    # Function registry
    self.function_registry = FunctionRegistry()

    # Resource requirements
    self.resource_requirements: Dict[str, Any] = {}

    # Configuration — set to the FSMConfig (Pydantic model) by
    # FSMBuilder.build(); typed as Any because callers may also
    # assign a plain dict.
    self.config: Any = {}

    # Metadata
    self.metadata: Dict[str, Any] = {}
    self.version: str = "1.0.0"
    self.created_at: float | None = None
    self.updated_at: float | None = None

    # Execution support (from builder FSM wrapper)
    self.resource_manager = resource_manager
    self.transaction_manager = transaction_manager
    self._engine: Any | None = None  # ExecutionEngine
    self._async_engine: Any | None = None  # AsyncExecutionEngine
Attributes
main_network property
main_network: Optional[StateNetwork]

Get the main network object.

Returns:

• Optional[StateNetwork]: The main StateNetwork object or None if not set

states property
states: Dict[str, StateDefinition]

Get all states from the main network.

Returns:

• Dict[str, StateDefinition]: Dictionary of state_name -> state_definition for the main network

Functions
add_network
add_network(network: StateNetwork, is_main: bool = False) -> None

Add a network to the FSM.

Parameters:

• network (StateNetwork): Network to add. [required]
• is_main (bool): Whether this is the main network. Default: False
Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def add_network(
    self,
    network: StateNetwork,
    is_main: bool = False
) -> None:
    """Add a network to the FSM.

    Args:
        network: Network to add.
        is_main: Whether this is the main network.
    """
    self.networks[network.name] = network

    if is_main or self.main_network_name is None:
        self.main_network_name = network.name

    # Aggregate resource requirements
    for resource_type, requirements in network.resource_requirements.items():
        if resource_type not in self.resource_requirements:
            self.resource_requirements[resource_type] = set()
        self.resource_requirements[resource_type].update(requirements)
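The aggregation loop at the end of add_network unions each network's per-type requirement sets into the FSM-level dict. The merge semantics in isolation, with plain dicts standing in for networks (the network names and resource types here are hypothetical):

```python
# Requirements declared by two hypothetical networks.
network_requirements = [
    {"database": {"postgres"}},
    {"database": {"redis"}, "api_client": {"openai"}},
]

fsm_requirements: dict[str, set[str]] = {}
for reqs in network_requirements:
    for resource_type, items in reqs.items():
        # Same pattern as add_network: create the set on first sight, then union.
        fsm_requirements.setdefault(resource_type, set()).update(items)

assert fsm_requirements == {
    "database": {"postgres", "redis"},
    "api_client": {"openai"},
}
```

Note also that the first network added becomes the main network even without is_main=True, because main_network_name starts as None.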
remove_network
remove_network(network_name: str) -> bool

Remove a network from the FSM.

Parameters:

• network_name (str): Name of network to remove. [required]

Returns:

• bool: True if removed successfully.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def remove_network(self, network_name: str) -> bool:
    """Remove a network from the FSM.

    Args:
        network_name: Name of network to remove.

    Returns:
        True if removed successfully.
    """
    if network_name in self.networks:
        del self.networks[network_name]

        # Update main network if needed
        if self.main_network_name == network_name:
            if self.networks:
                self.main_network_name = next(iter(self.networks.keys()))
            else:
                self.main_network_name = None

        return True
    return False
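When the main network itself is removed, remove_network promotes whichever network next(iter(...)) yields; since Python dicts preserve insertion order, that is the earliest remaining network. A standalone illustration of the fallback (network names are hypothetical, and object() stands in for StateNetwork):

```python
networks = {"main_flow": object(), "cleanup": object(), "audit": object()}
main_network_name = "main_flow"

removed = "main_flow"
del networks[removed]
if main_network_name == removed:
    # Same fallback as remove_network: promote the insertion-order-first
    # remaining network, or None when nothing is left.
    main_network_name = next(iter(networks)) if networks else None

assert main_network_name == "cleanup"
```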
get_network
get_network(network_name: str | None = None) -> StateNetwork | None

Get a network by name.

Parameters:

• network_name (str | None): Name of network (None for main network). Default: None

Returns:

• StateNetwork | None: Network or None if not found.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def get_network(self, network_name: str | None = None) -> StateNetwork | None:
    """Get a network by name.

    Args:
        network_name: Name of network (None for main network).

    Returns:
        Network or None if not found.
    """
    if network_name is None:
        network_name = self.main_network_name

    if network_name:
        return self.networks.get(network_name)
    return None
validate
validate() -> Tuple[bool, List[str]]

Validate the FSM.

Returns:

• Tuple[bool, List[str]]: Tuple of (valid, list of errors).

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def validate(self) -> Tuple[bool, List[str]]:
    """Validate the FSM.

    Returns:
        Tuple of (valid, list of errors).
    """
    errors = []

    # Check for at least one network
    if not self.networks:
        errors.append("FSM has no networks")

    # Check main network exists
    if self.main_network_name and self.main_network_name not in self.networks:
        errors.append(f"Main network '{self.main_network_name}' not found")

    # Validate each network
    for network_name, network in self.networks.items():
        valid, network_errors = network.validate()
        if not valid:
            for error in network_errors:
                errors.append(f"Network '{network_name}': {error}")

    # Check function references
    all_functions = self._get_all_function_references()
    for func_name in all_functions:
        if not self.function_registry.get_function(func_name):
            errors.append(f"Function '{func_name}' not registered")

    return len(errors) == 0, errors
get_all_states
get_all_states() -> Dict[str, List[str]]

Get all states from all networks.

Returns:

• Dict[str, List[str]]: Dictionary of network_name -> list of state names.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def get_all_states(self) -> Dict[str, List[str]]:
    """Get all states from all networks.

    Returns:
        Dictionary of network_name -> list of state names.
    """
    all_states = {}

    for network_name, network in self.networks.items():
        all_states[network_name] = list(network.states.keys())

    return all_states
get_all_arcs
get_all_arcs() -> Dict[str, List[str]]

Get all arcs from all networks.

Returns:

• Dict[str, List[str]]: Dictionary of network_name -> list of arc IDs.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def get_all_arcs(self) -> Dict[str, List[str]]:
    """Get all arcs from all networks.

    Returns:
        Dictionary of network_name -> list of arc IDs.
    """
    all_arcs = {}

    for network_name, network in self.networks.items():
        all_arcs[network_name] = list(network.arcs.keys())

    return all_arcs
supports_streaming
supports_streaming() -> bool

Check if FSM supports streaming.

Returns:

• bool: True if any network supports streaming.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def supports_streaming(self) -> bool:
    """Check if FSM supports streaming.

    Returns:
        True if any network supports streaming.
    """
    return any(network.supports_streaming for network in self.networks.values())
get_resource_summary
get_resource_summary() -> Dict[str, Any]

Get resource requirements summary.

Returns:

• Dict[str, Any]: Resource requirements summary.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def get_resource_summary(self) -> Dict[str, Any]:
    """Get resource requirements summary.

    Returns:
        Resource requirements summary.
    """
    summary = {
        'total_networks': len(self.networks),
        'total_states': sum(len(n.states) for n in self.networks.values()),
        'total_arcs': sum(len(n.arcs) for n in self.networks.values()),
        'resource_types': list(self.resource_requirements.keys()),
        'supports_streaming': self.supports_streaming(),
        'data_mode': self.data_mode.value,
        'transaction_mode': self.transaction_mode.value
    }

    # Add resource counts
    for resource_type, requirements in self.resource_requirements.items():
        summary[f'{resource_type}_count'] = len(requirements)

    return summary
clone
clone() -> FSM

Create a clone of this FSM.

Returns:

• FSM: Cloned FSM.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def clone(self) -> 'FSM':
    """Create a clone of this FSM.

    Returns:
        Cloned FSM.
    """
    clone = FSM(
        name=f"{self.name}_clone",
        data_mode=self.data_mode,
        transaction_mode=self.transaction_mode,
        description=self.description
    )

    # Clone networks
    for network_name, network in self.networks.items():
        # Note: This is a shallow copy - for deep clone would need to implement network.clone()
        clone.networks[network_name] = network

    clone.main_network_name = self.main_network_name
    clone.function_registry = self.function_registry
    clone.resource_requirements = self.resource_requirements.copy()
    clone.config = self.config.copy()
    clone.metadata = self.metadata.copy()
    clone.version = self.version

    return clone
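As the inline comment notes, clone copies network references rather than deep-copying them, so the clone and the original share the same StateNetwork objects (and the same function registry). The practical consequence, shown with plain dicts standing in for the FSM and its networks:

```python
shared_network = {"states": ["start", "end"]}  # stand-in for a StateNetwork

original = {"networks": {"main": shared_network}}
clone = {"networks": dict(original["networks"])}  # shallow: same network objects

# Mutating the shared network is visible through both FSMs.
clone["networks"]["main"]["states"].append("error")
assert original["networks"]["main"]["states"] == ["start", "end", "error"]
```

A true deep clone would need a network.clone() implementation, as the source comment suggests.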
to_dict
to_dict() -> Dict[str, Any]

Convert FSM to dictionary representation.

Returns:

• Dict[str, Any]: Dictionary representation.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def to_dict(self) -> Dict[str, Any]:
    """Convert FSM to dictionary representation.

    Returns:
        Dictionary representation.
    """
    return {
        'name': self.name,
        'description': self.description,
        'data_mode': self.data_mode.value,
        'transaction_mode': self.transaction_mode.value,
        'main_network': self.main_network_name,
        'networks': list(self.networks.keys()),
        'resource_requirements': {
            k: list(v) if isinstance(v, set) else v
            for k, v in self.resource_requirements.items()
        },
        'config': self.config,
        'metadata': self.metadata,
        'version': self.version
    }
from_dict classmethod
from_dict(data: Dict[str, Any]) -> FSM

Create FSM from dictionary representation.

Parameters:

• data (Dict[str, Any]): Dictionary with FSM data. [required]

Returns:

• FSM: New FSM instance.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'FSM':
    """Create FSM from dictionary representation.

    Args:
        data: Dictionary with FSM data.

    Returns:
        New FSM instance.
    """
    fsm = cls(
        name=data['name'],
        data_mode=ProcessingMode(data.get('data_mode', 'single')),
        transaction_mode=TransactionMode(data.get('transaction_mode', 'none')),
        description=data.get('description')
    )

    fsm.main_network_name = data.get('main_network')
    fsm.config = data.get('config', {})
    fsm.metadata = data.get('metadata', {})
    fsm.version = data.get('version', '1.0.0')

    # Resource requirements
    for resource_type, requirements in data.get('resource_requirements', {}).items():
        fsm.resource_requirements[resource_type] = set(requirements)

    return fsm
find_state_definition
find_state_definition(
    state_name: str, network_name: str | None = None
) -> StateDefinition | None

Find a state definition by name.

Parameters:

• state_name (str): Name of the state to find. [required]
• network_name (str | None): Optional specific network to search in. Default: None

Returns:

• StateDefinition | None: StateDefinition if found, None otherwise.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def find_state_definition(self, state_name: str, network_name: str | None = None) -> StateDefinition | None:
    """Find a state definition by name.

    Args:
        state_name: Name of the state to find
        network_name: Optional specific network to search in

    Returns:
        StateDefinition if found, None otherwise
    """
    if network_name:
        # Search specific network
        network = self.networks.get(network_name)
        if network and hasattr(network, 'states'):
            return network.states.get(state_name)
    else:
        # Search all networks
        for network in self.networks.values():
            if hasattr(network, 'states') and state_name in network.states:
                return network.states[state_name]

    return None
create_state_instance
create_state_instance(
    state_name: str,
    data: Dict[str, Any] | None = None,
    network_name: str | None = None,
) -> StateInstance

Create a state instance from a state name.

Parameters:

• state_name (str): Name of the state. [required]
• data (Dict[str, Any] | None): Optional initial data for the state. Default: None
• network_name (str | None): Optional specific network to search in. Default: None

Returns:

• StateInstance: StateInstance object.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def create_state_instance(self, state_name: str, data: Dict[str, Any] | None = None, network_name: str | None = None) -> StateInstance:
    """Create a state instance from a state name.

    Args:
        state_name: Name of the state
        data: Optional initial data for the state
        network_name: Optional specific network to search in

    Returns:
        StateInstance object
    """
    # Try to find existing state definition
    state_def = self.find_state_definition(state_name, network_name)

    if not state_def:
        # Create minimal state definition
        state_def = StateDefinition(
            name=state_name,
            type=StateType.START if state_name in ['start', 'Start', 'START'] else StateType.NORMAL
        )

    # Create and return state instance
    return StateInstance(
        definition=state_def,
        data=data or {}
    )
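When no matching definition exists, create_state_instance falls back to a minimal definition whose type is inferred from the name: only the exact spellings 'start', 'Start', and 'START' become START states. The inference rule by itself (a stand-in returning strings rather than StateType members):

```python
def inferred_type(state_name: str) -> str:
    # Mirrors the membership test in create_state_instance; note that it is
    # an exact-spelling check, not a case-insensitive one.
    return "START" if state_name in ("start", "Start", "START") else "NORMAL"

assert inferred_type("start") == "START"
assert inferred_type("START") == "START"
assert inferred_type("sTart") == "NORMAL"    # mixed case is not matched
assert inferred_type("validate") == "NORMAL"
```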
get_state
get_state(
    state_name: str, network_name: str | None = None
) -> StateDefinition | None

Get a state definition by name.

This is an alias for find_state_definition for compatibility.

Parameters:

• state_name (str): Name of the state. [required]
• network_name (str | None): Optional specific network to search in. Default: None

Returns:

• StateDefinition | None: StateDefinition if found, None otherwise.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def get_state(self, state_name: str, network_name: str | None = None) -> StateDefinition | None:
    """Get a state definition by name.

    This is an alias for find_state_definition for compatibility.

    Args:
        state_name: Name of the state
        network_name: Optional specific network to search in

    Returns:
        StateDefinition if found, None otherwise
    """
    return self.find_state_definition(state_name, network_name)
is_start_state
is_start_state(state_name: str, network_name: str | None = None) -> bool

Check if a state is a start state.

Parameters:

• state_name (str): Name of the state. [required]
• network_name (str | None): Optional specific network to check in (defaults to main network). Default: None

Returns:

• bool: True if the state is a start state.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def is_start_state(self, state_name: str, network_name: str | None = None) -> bool:
    """Check if a state is a start state.

    Args:
        state_name: Name of the state
        network_name: Optional specific network to check in (defaults to main network)

    Returns:
        True if the state is a start state
    """
    network_name = network_name or self.main_network_name
    if network_name:
        network = self.networks.get(network_name)
        if network:
            return network.is_initial_state(state_name)
    return False
is_end_state
is_end_state(state_name: str, network_name: str | None = None) -> bool

Check if a state is an end state.

Parameters:

• state_name (str): Name of the state. [required]
• network_name (str | None): Optional specific network to check in (defaults to main network). Default: None

Returns:

• bool: True if the state is an end state.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def is_end_state(self, state_name: str, network_name: str | None = None) -> bool:
    """Check if a state is an end state.

    Args:
        state_name: Name of the state
        network_name: Optional specific network to check in (defaults to main network)

    Returns:
        True if the state is an end state
    """
    network_name = network_name or self.main_network_name
    if network_name:
        network = self.networks.get(network_name)
        if network:
            return network.is_final_state(state_name)
    return False
get_start_state
get_start_state(network_name: str | None = None) -> StateDefinition | None

Get the start state definition.

Parameters:

• network_name (str | None): Optional specific network to search in. Default: None

Returns:

• StateDefinition | None: Start state definition if found, None otherwise.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def get_start_state(self, network_name: str | None = None) -> StateDefinition | None:
    """Get the start state definition.

    Args:
        network_name: Optional specific network to search in

    Returns:
        Start state definition if found, None otherwise
    """
    # If network specified, search that network
    if network_name:
        network = self.networks.get(network_name)
        if network and hasattr(network, 'states'):
            for state in network.states.values():
                if (hasattr(state, 'is_start_state') and state.is_start_state()) or (hasattr(state, 'type') and state.type == StateType.START):
                    return state
    else:
        # Search main network first
        if self.main_network_name:
            start_state = self.get_start_state(self.main_network_name)
            if start_state:
                return start_state

        # Search all networks
        for network in self.networks.values():
            if hasattr(network, 'states'):
                for state in network.states.values():
                    if (hasattr(state, 'is_start_state') and state.is_start_state()) or (hasattr(state, 'type') and state.type == StateType.START):
                        return state

    # Fallback: look for state named 'start'
    return self.find_state_definition('start', network_name)
get_all_states_dict
get_all_states_dict() -> Dict[str, Dict[str, StateDefinition]]

Get all states from all networks.

Returns:

• Dict[str, Dict[str, StateDefinition]]: Dictionary of network_name -> {state_name -> state_definition}

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def get_all_states_dict(self) -> Dict[str, Dict[str, StateDefinition]]:
    """Get all states from all networks.

    Returns:
        Dictionary of network_name -> {state_name -> state_definition}
    """
    all_states = {}
    for network_name, network in self.networks.items():
        if hasattr(network, 'states'):
            all_states[network_name] = network.states
    return all_states
get_outgoing_arcs
get_outgoing_arcs(
    state_name: str, network_name: str | None = None
) -> List[Any]

Get outgoing arcs from a state.

Parameters:

• state_name (str): Name of the state. [required]
• network_name (str | None): Optional network name (uses main network if None). Default: None

Returns:

• List[Any]: List of outgoing arcs from the state.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def get_outgoing_arcs(self, state_name: str, network_name: str | None = None) -> List[Any]:
    """Get outgoing arcs from a state.

    Args:
        state_name: Name of the state
        network_name: Optional network name (uses main network if None)

    Returns:
        List of outgoing arcs from the state
    """
    network_name = network_name or self.main_network_name
    if not network_name:
        return []

    network = self.get_network(network_name)
    if network:
        return network.get_arcs_from_state(state_name)
    return []
get_engine
get_engine(strategy: str | None = None) -> ExecutionEngine

Get or create the execution engine.

Parameters:

• strategy (str | None): Optional execution strategy override. Default: None

Returns:

• ExecutionEngine: ExecutionEngine instance.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def get_engine(self, strategy: str | None = None) -> "ExecutionEngine":
    """Get or create the execution engine.

    Args:
        strategy: Optional execution strategy override

    Returns:
        ExecutionEngine instance.
    """
    if self._engine is None:
        from dataknobs_fsm.execution.engine import ExecutionEngine, TraversalStrategy

        # Map strategy strings to enum
        strategy_map = {
            "depth_first": TraversalStrategy.DEPTH_FIRST,
            "breadth_first": TraversalStrategy.BREADTH_FIRST,
            "resource_optimized": TraversalStrategy.RESOURCE_OPTIMIZED,
            "stream_optimized": TraversalStrategy.STREAM_OPTIMIZED,
        }

        strat = TraversalStrategy.DEPTH_FIRST  # Default
        if strategy and strategy in strategy_map:
            strat = strategy_map[strategy]

        self._engine = ExecutionEngine(
            fsm=self,
            strategy=strat,
        )

    return self._engine
get_async_engine
get_async_engine(strategy: str | None = None) -> AsyncExecutionEngine

Get or create the async execution engine.

Parameters:

• strategy (str | None): Optional execution strategy override. Default: None

Returns:

• AsyncExecutionEngine: AsyncExecutionEngine instance.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def get_async_engine(self, strategy: str | None = None) -> "AsyncExecutionEngine":
    """Get or create the async execution engine.

    Args:
        strategy: Optional execution strategy override

    Returns:
        AsyncExecutionEngine instance.
    """
    if self._async_engine is None:
        from dataknobs_fsm.execution.async_engine import AsyncExecutionEngine

        self._async_engine = AsyncExecutionEngine(fsm=self)

    return self._async_engine
execute_async async
execute_async(initial_data: Dict[str, Any] | None = None) -> Any

Execute the FSM asynchronously with initial data.

Parameters:

• initial_data (Dict[str, Any] | None): Initial data for execution. Default: None

Returns:

• Any: Execution result.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
async def execute_async(self, initial_data: Dict[str, Any] | None = None) -> Any:
    """Execute the FSM asynchronously with initial data.

    Args:
        initial_data: Initial data for execution.

    Returns:
        Execution result.
    """
    import time

    try:
        # Get the async execution engine
        engine = self.get_async_engine()

        # Prepare execution context
        context = self._prepare_execution_context(initial_data)

        # Track execution time
        start_time = time.time()

        # Execute the FSM
        success, result = await engine.execute(
            context, 
            initial_data if self.data_mode == ProcessingMode.SINGLE else None
        )

        # Calculate duration
        duration = time.time() - start_time

        return self._format_execution_result(success, result, context, duration)

    except Exception as e:
        # Handle any exception that occurs during execution
        return self._format_execution_result(
            False, None, None, 0.0, initial_data, str(e)
        )
execute
execute(initial_data: Dict[str, Any] | None = None) -> Any

Execute the FSM synchronously with initial data.

This is a simplified API for running the FSM.

Parameters:

Name Type Description Default
initial_data Dict[str, Any] | None

Initial data for execution.

None

Returns:

Type Description
Any

Execution result.

Source code in packages/fsm/src/dataknobs_fsm/core/fsm.py
def execute(self, initial_data: Dict[str, Any] | None = None) -> Any:
    """Execute the FSM synchronously with initial data.

    This is a simplified API for running the FSM.

    Args:
        initial_data: Initial data for execution.

    Returns:
        Execution result.
    """
    import time

    try:
        # Get the execution engine
        engine = self.get_engine()

        # Prepare execution context
        context = self._prepare_execution_context(initial_data)

        # Track execution time
        start_time = time.time()

        # Execute the FSM
        success, result = engine.execute(
            context, 
            initial_data if self.data_mode == ProcessingMode.SINGLE else None
        )

        # Calculate duration
        duration = time.time() - start_time

        return self._format_execution_result(success, result, context, duration)

    except Exception as e:
        # Handle any exception that occurs during execution
        return self._format_execution_result(
            False, None, None, 0.0, initial_data, str(e)
        )

StateDefinition dataclass

StateDefinition(
    name: str,
    type: StateType = StateType.NORMAL,
    description: str = "",
    metadata: Dict[str, Any] = dict(),
    schema: StateSchema | None = None,
    data_mode: DataHandlingMode | None = None,
    resource_requirements: List[ResourceConfig] = list(),
    pre_validation_functions: List[IValidationFunction] = list(),
    validation_functions: List[IValidationFunction] = list(),
    transform_functions: List[ITransformFunction] = list(),
    end_test_function: IEndStateTestFunction | None = None,
    outgoing_arcs: List[ArcDefinition] = list(),
    timeout: float | None = None,
    retry_count: int = 0,
    retry_delay: float = 1.0,
)

Definition of a state in the FSM.

StateDefinition is the static blueprint for a state, defining its structure, schema, functions, resources, and execution configuration. It is immutable once created and can be reused across multiple executions.

Architecture

StateDefinition follows a declarative design pattern:

- Define structure, not behavior
- Functions referenced by interface, not implementation
- Configuration-driven execution
- Immutable and reusable

Key Components

Identity:

- name: Unique identifier within network
- type: Role in workflow (START/END/NORMAL/etc)
- description: Human-readable description
- metadata: Additional custom attributes

Data Validation:

- schema: Optional StateSchema for data validation
- pre_validation_functions: Run before state entry
- validation_functions: Run after entry, before transform
- Ensures data quality throughout workflow

Data Transformation:

- transform_functions: Modify data during processing
- Multiple transforms chain in order
- Each returns modified data

Resource Requirements:

- resource_requirements: List of ResourceConfig
- Database connections, API clients, etc.
- Acquired on entry, released on exit

Execution Configuration:

- timeout: Max execution time in seconds
- retry_count: Number of retry attempts on failure
- retry_delay: Delay between retries in seconds
- data_mode: Preferred data handling mode

Network Integration:

- outgoing_arcs: List of transitions to other states
- end_test_function: Determines if this is an end state

Data Modes

States can specify a preferred data_mode:

- COPY: Deep copy for thread safety (default)
- REFERENCE: Lazy loading for memory efficiency
- DIRECT: In-place modification for performance
- None: Inherit from FSM-level configuration
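The trade-off between COPY and DIRECT can be seen with plain Python dictionaries. This is an illustration of the semantics described above, not the framework's actual data mode handlers:

```python
import copy

# Plain-Python illustration of COPY vs DIRECT semantics
# (not the framework's actual data mode handlers).
original = {"items": [1, 2, 3]}

# COPY: work on a deep copy; the caller's data is untouched.
copied = copy.deepcopy(original)
copied["items"].append(99)

# DIRECT: mutate the caller's dict in place; fastest, but shared.
direct = original
direct["items"].append(4)

assert original["items"] == [1, 2, 3, 4]   # DIRECT change is visible
assert copied["items"] == [1, 2, 3, 99]    # COPY change stayed local
```

COPY pays the deep-copy cost on every entry but keeps states isolated, which is why it is the safe default.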

Validation

Multiple levels of validation:

1. Schema validation (structural)
2. Pre-validation functions (business logic before entry)
3. Validation functions (business logic after entry)

Resource Management

States declare resource requirements:

- Specified as ResourceConfig objects
- Include resource type, name, and configuration
- Execution engine acquires before state entry
- Released after state exit or on error

Error Handling

Built-in retry support:

- retry_count: Number of attempts (0 = no retry)
- retry_delay: Delay between attempts in seconds
- Exponential backoff can be implemented in retry logic
- After all retries exhausted, state fails
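Since exponential backoff can be layered on top of retry_count and retry_delay, here is a minimal stand-alone sketch of that pattern (not the execution engine's actual retry loop; `run_with_retry` and `backoff` are hypothetical names):

```python
import time

def run_with_retry(func, retry_count=3, retry_delay=1.0, backoff=2.0):
    """Retry helper mirroring retry_count/retry_delay semantics.

    A sketch of the pattern, not the execution engine's actual loop.
    """
    attempt = 0
    delay = retry_delay
    while True:
        try:
            return func()
        except Exception:
            attempt += 1
            if attempt > retry_count:  # retries exhausted -> state fails
                raise
            time.sleep(delay)
            delay *= backoff  # exponential backoff between attempts

# Fails twice, then succeeds on the third attempt.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = run_with_retry(flaky, retry_count=3, retry_delay=0.01)
```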

Note

StateDefinition is immutable after creation. To modify a state, create a new StateDefinition. This ensures consistency when the same definition is used across multiple executions.

Functions are stored as references (interfaces), not implementations. The actual function implementations are registered in the FSM's FunctionRegistry.

See Also
  • :class:~dataknobs_fsm.core.state.StateInstance: Runtime instance
  • :class:~dataknobs_fsm.core.state.StateSchema: Data schema
  • :class:~dataknobs_fsm.core.state.StateType: State type enum
  • :class:~dataknobs_fsm.functions.base.IValidationFunction: Validation interface
  • :class:~dataknobs_fsm.functions.base.ITransformFunction: Transform interface

Methods:

Name Description
is_start_state

Check if this is a start state.

is_end_state

Check if this is an end state.

validate_data

Validate data against state schema.

add_pre_validation_function

Add a pre-validation function.

add_validation_function

Add a validation function.

add_transform_function

Add a transform function.

add_outgoing_arc

Add an outgoing arc.

Attributes:

Name Type Description
is_start bool

Property alias for is_start_state().

is_end bool

Property alias for is_end_state().

arcs List[ArcDefinition]

Get the outgoing arcs from this state.

Attributes
is_start property
is_start: bool

Property alias for is_start_state().

is_end property
is_end: bool

Property alias for is_end_state().

arcs property
arcs: List[ArcDefinition]

Get the outgoing arcs from this state.

Functions
is_start_state
is_start_state() -> bool

Check if this is a start state.

Returns:

Type Description
bool

True if this is a start state.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def is_start_state(self) -> bool:
    """Check if this is a start state.

    Returns:
        True if this is a start state.
    """
    return self.type == StateType.START
is_end_state
is_end_state() -> bool

Check if this is an end state.

Returns:

Type Description
bool

True if this is an end state.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def is_end_state(self) -> bool:
    """Check if this is an end state.

    Returns:
        True if this is an end state.
    """
    return self.type == StateType.END
validate_data
validate_data(data: Dict[str, Any]) -> Tuple[bool, List[str]]

Validate data against state schema.

Parameters:

Name Type Description Default
data Dict[str, Any]

Data to validate.

required

Returns:

Type Description
Tuple[bool, List[str]]

Tuple of (is_valid, error_messages).

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def validate_data(self, data: Dict[str, Any]) -> Tuple[bool, List[str]]:
    """Validate data against state schema.

    Args:
        data: Data to validate.

    Returns:
        Tuple of (is_valid, error_messages).
    """
    if self.schema is None:
        return True, []
    return self.schema.validate(data)
add_pre_validation_function
add_pre_validation_function(func: IValidationFunction) -> None

Add a pre-validation function.

Parameters:

Name Type Description Default
func IValidationFunction

Pre-validation function to add.

required
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def add_pre_validation_function(self, func: IValidationFunction) -> None:
    """Add a pre-validation function.

    Args:
        func: Pre-validation function to add.
    """
    self.pre_validation_functions.append(func)
add_validation_function
add_validation_function(func: IValidationFunction) -> None

Add a validation function.

Parameters:

Name Type Description Default
func IValidationFunction

Validation function to add.

required
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def add_validation_function(self, func: IValidationFunction) -> None:
    """Add a validation function.

    Args:
        func: Validation function to add.
    """
    self.validation_functions.append(func)
add_transform_function
add_transform_function(func: ITransformFunction) -> None

Add a transform function.

Parameters:

Name Type Description Default
func ITransformFunction

Transform function to add.

required
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def add_transform_function(self, func: ITransformFunction) -> None:
    """Add a transform function.

    Args:
        func: Transform function to add.
    """
    self.transform_functions.append(func)
add_outgoing_arc
add_outgoing_arc(arc: ArcDefinition) -> None

Add an outgoing arc.

Parameters:

Name Type Description Default
arc ArcDefinition

Arc definition to add.

required
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def add_outgoing_arc(self, arc: "ArcDefinition") -> None:
    """Add an outgoing arc.

    Args:
        arc: Arc definition to add.
    """
    self.outgoing_arcs.append(arc)

StateInstance dataclass

StateInstance(
    id: str = (lambda: str(uuid4()))(),
    definition: StateDefinition = None,
    status: StateStatus = StateStatus.PENDING,
    data: Dict[str, Any] = dict(),
    data_mode_manager: DataModeManager | None = None,
    data_handler: Any | None = None,
    transaction: Transaction | None = None,
    acquired_resources: Dict[str, Any] = dict(),
    entry_time: datetime | None = None,
    exit_time: datetime | None = None,
    execution_count: int = 0,
    error_count: int = 0,
    last_error: str | None = None,
    executed_arcs: List[str] = list(),
    next_state: str | None = None,
)

Runtime instance of a state.

StateInstance represents a single execution of a StateDefinition within a workflow. It holds the runtime data, tracks execution status, manages resources, and implements the state lifecycle.

Architecture

StateInstance follows the Instance pattern:

- One StateDefinition → Many StateInstances
- Each instance is independent
- Instance holds mutable execution state
- Definition holds immutable structure

Lifecycle

StateInstance progresses through a defined lifecycle:

1. Creation:

from dataknobs_fsm.core.state import StateDefinition, StateInstance, StateType

# Define state first
state_def = StateDefinition(name="process", type=StateType.NORMAL)

# Create instance
instance = StateInstance(definition=state_def)
# Status: PENDING
# No data yet

2. Entry:

instance.enter(input_data)
# Status: ACTIVE
# Data mode handler applies transformations
# Resources acquired
# Execution tracking begins

3. Processing:

# Validation, transformation happen
instance.modify_data(updates)
# Can pause/resume if needed

4. Exit:

result_data = instance.exit(commit=True)
# Status: COMPLETED (or FAILED)
# Data mode handler commits changes
# Resources released
# Duration calculated

5. Error Handling:

instance.fail(error_message)
# Status: FAILED
# Error tracking updated
# Can retry if retry_count > 0

Attributes:

Name Type Description
id str

Unique identifier for this instance (UUID)

definition StateDefinition

The state blueprint being executed

status StateStatus

Current execution status (PENDING/ACTIVE/COMPLETED/FAILED/etc)

data Dict[str, Any]

Current state data

data_mode_manager DataModeManager | None

Manages data handling modes

data_handler Any | None

Active data mode handler (COPY/REFERENCE/DIRECT)

transaction Transaction | None

Active transaction if using transactional mode

acquired_resources Dict[str, Any]

Resources acquired for this instance

entry_time datetime | None

When state was entered

exit_time datetime | None

When state was exited

execution_count int

Number of times state has been entered

error_count int

Number of errors encountered

last_error str | None

Most recent error message

executed_arcs List[str]

IDs of arcs executed from this state

next_state str | None

Name of next state to transition to

Data Handling

StateInstance uses data mode handlers to manage how data flows:

COPY Mode:

- Deep copy on entry via data_handler.on_entry()
- Modifications to local copy
- Commit on exit via data_handler.on_exit(commit=True)
- Thread-safe, memory-intensive

REFERENCE Mode:

- Lazy loading with version tracking
- Optimistic locking for conflicts
- on_modification() tracks changes
- Memory-efficient

DIRECT Mode:

- In-place modification
- No copying overhead
- Fastest performance
- Not thread-safe

Resource Management

StateInstance tracks acquired resources:

Lifecycle:

1. acquire: add_resource(name, handle)
2. use: get_resource(name) → handle
3. release: release_resources()

Automatic Cleanup:

- Resources released on exit()
- Resources released on fail()
- Ensures no resource leaks

Execution Tracking

StateInstance tracks detailed execution metrics:

Timing:

- entry_time: When processing started
- exit_time: When processing completed
- get_duration(): Calculates elapsed time

Counts:

- execution_count: Total entries (for retries)
- error_count: Total errors encountered
- executed_arcs: History of transitions

State:

- status: Current execution status
- next_state: Determined next state
- last_error: Most recent error message

Transaction Support

StateInstance participates in transactions:

Integration:

- transaction field holds active transaction
- Data changes logged to transaction
- Rollback on fail()
- Commit on exit(commit=True)

Modes:

- NONE: No transaction tracking
- OPTIMISTIC: Commit at workflow end
- PESSIMISTIC: Commit at each state exit

Methods:

Lifecycle:

- enter: Enter state with data
- exit: Exit and finalize
- fail: Mark as failed
- skip: Skip this state

Control:

- pause: Temporarily pause
- resume: Resume from pause
- modify_data: Update state data

Resources:

- add_resource: Acquire resource
- get_resource: Get acquired resource
- release_resources: Release all resources

Tracking:

- record_arc_execution: Track transition
- get_duration: Get execution time
- to_dict: Serialize to dictionary

Note

StateInstance is managed by execution engines. Users typically don't create or manipulate instances directly, but may observe them via debugging tools or execution results.

Each workflow execution creates fresh StateInstances, even when reusing StateDefinitions. This ensures execution isolation.

See Also
  • :class:~dataknobs_fsm.core.state.StateDefinition: State blueprint
  • :class:~dataknobs_fsm.core.state.StateStatus: Status enum
  • :class:~dataknobs_fsm.core.data_modes.DataModeManager: Data mode manager
  • :class:~dataknobs_fsm.core.transactions.Transaction: Transaction support
Functions
__post_init__
__post_init__()

Initialize data mode manager if not provided.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def __post_init__(self):
    """Initialize data mode manager if not provided."""
    if self.data_mode_manager is None:
        # Use definition's data_mode if available and not None, else default to COPY
        default_mode = DataHandlingMode.COPY
        if self.definition and self.definition.data_mode:
            default_mode = self.definition.data_mode
        self.data_mode_manager = DataModeManager(default_mode)
enter
enter(input_data: Dict[str, Any]) -> None

Enter the state with input data.

Parameters:

Name Type Description Default
input_data Dict[str, Any]

Input data for the state.

required
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def enter(self, input_data: Dict[str, Any]) -> None:
    """Enter the state with input data.

    Args:
        input_data: Input data for the state.
    """
    self.status = StateStatus.ACTIVE
    self.entry_time = datetime.now()
    self.execution_count += 1

    # Apply data mode
    if self.data_mode_manager:
        mode = self.definition.data_mode if self.definition and self.definition.data_mode else self.data_mode_manager.default_mode
        self.data_handler = self.data_mode_manager.get_handler(mode)
        self.data = self.data_handler.on_entry(input_data)
    else:
        self.data = input_data
exit
exit(commit: bool = True) -> Dict[str, Any]

Exit the state.

Parameters:

Name Type Description Default
commit bool

Whether to commit data changes.

True

Returns:

Type Description
Dict[str, Any]

The final state data.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def exit(self, commit: bool = True) -> Dict[str, Any]:
    """Exit the state.

    Args:
        commit: Whether to commit data changes.

    Returns:
        The final state data.
    """
    self.exit_time = datetime.now()

    # Handle data mode exit
    if self.data_handler:
        self.data = self.data_handler.on_exit(self.data, commit)

    if self.status == StateStatus.ACTIVE:
        self.status = StateStatus.COMPLETED

    return self.data
fail
fail(error: str) -> None

Mark the state as failed.

Parameters:

Name Type Description Default
error str

Error message.

required
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def fail(self, error: str) -> None:
    """Mark the state as failed.

    Args:
        error: Error message.
    """
    self.status = StateStatus.FAILED
    self.error_count += 1
    self.last_error = error
    self.exit_time = datetime.now()
pause
pause() -> None

Pause state execution.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def pause(self) -> None:
    """Pause state execution."""
    if self.status == StateStatus.ACTIVE:
        self.status = StateStatus.PAUSED
resume
resume() -> None

Resume paused state execution.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def resume(self) -> None:
    """Resume paused state execution."""
    if self.status == StateStatus.PAUSED:
        self.status = StateStatus.ACTIVE
skip
skip() -> None

Skip this state.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def skip(self) -> None:
    """Skip this state."""
    self.status = StateStatus.SKIPPED
    self.exit_time = datetime.now()
modify_data
modify_data(updates: Dict[str, Any]) -> None

Modify state data.

Parameters:

Name Type Description Default
updates Dict[str, Any]

Data updates to apply.

required
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def modify_data(self, updates: Dict[str, Any]) -> None:
    """Modify state data.

    Args:
        updates: Data updates to apply.
    """
    if self.data_handler:
        # Let the data handler manage modifications
        self.data.update(updates)
        self.data = self.data_handler.on_modification(self.data)
    else:
        self.data.update(updates)
add_resource
add_resource(name: str, resource: Any) -> None

Add an acquired resource.

Parameters:

Name Type Description Default
name str

Resource name.

required
resource Any

The resource handle/connection.

required
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def add_resource(self, name: str, resource: Any) -> None:
    """Add an acquired resource.

    Args:
        name: Resource name.
        resource: The resource handle/connection.
    """
    self.acquired_resources[name] = resource
get_resource
get_resource(name: str) -> Any | None

Get an acquired resource.

Parameters:

Name Type Description Default
name str

Resource name.

required

Returns:

Type Description
Any | None

The resource if available.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def get_resource(self, name: str) -> Any | None:
    """Get an acquired resource.

    Args:
        name: Resource name.

    Returns:
        The resource if available.
    """
    return self.acquired_resources.get(name)
release_resources
release_resources() -> None

Release all acquired resources.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def release_resources(self) -> None:
    """Release all acquired resources."""
    self.acquired_resources.clear()
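The acquire → use → release lifecycle pairs naturally with try/finally, so that cleanup runs even when processing fails. A plain-Python sketch mirroring these three methods (ResourceHolder is a stand-in for illustration, not a framework class):

```python
class ResourceHolder:
    """Minimal stand-in for StateInstance's resource tracking
    (add_resource / get_resource / release_resources)."""

    def __init__(self):
        self.acquired_resources = {}

    def add_resource(self, name, resource):
        self.acquired_resources[name] = resource

    def get_resource(self, name):
        return self.acquired_resources.get(name)

    def release_resources(self):
        self.acquired_resources.clear()

holder = ResourceHolder()
try:
    holder.add_resource("db", object())   # acquire
    conn = holder.get_resource("db")      # use
finally:
    holder.release_resources()            # always released, even on error
```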
record_arc_execution
record_arc_execution(arc_id: str) -> None

Record that an arc was executed.

Parameters:

Name Type Description Default
arc_id str

ID of the executed arc.

required
Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def record_arc_execution(self, arc_id: str) -> None:
    """Record that an arc was executed.

    Args:
        arc_id: ID of the executed arc.
    """
    self.executed_arcs.append(arc_id)
get_duration
get_duration() -> float | None

Get execution duration in seconds.

Returns:

Type Description
float | None

Duration in seconds if available.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def get_duration(self) -> float | None:
    """Get execution duration in seconds.

    Returns:
        Duration in seconds if available.
    """
    if self.entry_time and self.exit_time:
        return (self.exit_time - self.entry_time).total_seconds()
    elif self.entry_time:
        return (datetime.now() - self.entry_time).total_seconds()
    return None
to_dict
to_dict() -> Dict[str, Any]

Convert to dictionary representation.

Returns:

Type Description
Dict[str, Any]

Dictionary with state instance data.

Source code in packages/fsm/src/dataknobs_fsm/core/state.py
def to_dict(self) -> Dict[str, Any]:
    """Convert to dictionary representation.

    Returns:
        Dictionary with state instance data.
    """
    return {
        "id": self.id,
        "name": self.definition.name if self.definition else None,
        "status": self.status.value,
        "data": self.data,
        "entry_time": self.entry_time.isoformat() if self.entry_time else None,
        "exit_time": self.exit_time.isoformat() if self.exit_time else None,
        "duration": self.get_duration(),
        "execution_count": self.execution_count,
        "error_count": self.error_count,
        "last_error": self.last_error,
        "executed_arcs": self.executed_arcs,
        "next_state": self.next_state,
    }
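One reason to_dict() stores timestamps via isoformat() and the duration as a float is that the snapshot becomes JSON-serializable. A sketch with a hand-built dict of the same shape (all values invented for illustration):

```python
import json
from datetime import datetime

# Hand-built dict with the same shape as the to_dict() output above.
# Raw datetime objects would make json.dumps() raise TypeError, which
# is why entry_time/exit_time are isoformat strings.
entry = datetime(2024, 1, 1, 12, 0, 0)
exit_ = datetime(2024, 1, 1, 12, 0, 5)
snapshot = {
    "id": "instance-1",
    "name": "process",
    "status": "completed",
    "data": {"result": 42},
    "entry_time": entry.isoformat(),
    "exit_time": exit_.isoformat(),
    "duration": (exit_ - entry).total_seconds(),
    "execution_count": 1,
    "error_count": 0,
    "last_error": None,
    "executed_arcs": ["arc-1"],
    "next_state": None,
}
encoded = json.dumps(snapshot)
```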

ArcDefinition dataclass

ArcDefinition(
    target_state: str,
    pre_test: str | None = None,
    transform: str | TransformSpec | list[str | TransformSpec] | None = None,
    priority: int = 0,
    definition_order: int = 0,
    metadata: Dict[str, Any] = dict(),
    required_resources: Dict[str, str] = dict(),
)

Definition of an arc between states.

This class defines the static properties of an arc, including the transition logic and resource requirements.

Methods:

Name Description
__hash__

Make ArcDefinition hashable.

Functions
__hash__
__hash__() -> int

Make ArcDefinition hashable.

Source code in packages/fsm/src/dataknobs_fsm/core/arc.py
def __hash__(self) -> int:
    """Make ArcDefinition hashable."""
    if isinstance(self.transform, list):
        transform_key = tuple(
            t.name if isinstance(t, TransformSpec) else t
            for t in self.transform
        )
    elif isinstance(self.transform, TransformSpec):
        transform_key = self.transform.name
    else:
        transform_key = self.transform
    return hash((
        self.target_state,
        self.pre_test,
        transform_key,
        self.priority
    ))

DataHandlingMode

Bases: Enum

Data handling modes for state instances - defines how data is managed within states.

SimpleFSM

SimpleFSM(
    config: str | Path | dict[str, Any],
    data_mode: DataHandlingMode = DataHandlingMode.COPY,
    resources: dict[str, Any] | None = None,
    custom_functions: dict[str, Callable] | None = None,
)

Synchronous FSM interface for simple workflows.

This class provides a purely synchronous API for FSM operations, internally using AsyncSimpleFSM with a dedicated event loop managed automatically in a background thread.

SimpleFSM is designed for ease of use in scripts, prototypes, and simple pipelines where async/await complexity is not desired. It handles all async execution transparently, providing a familiar synchronous interface.
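The background-loop pattern described above can be sketched with the standard library alone. This illustrates the idea of driving async work from synchronous code, not SimpleFSM's actual implementation (`do_work` and `run_sync` are hypothetical names):

```python
import asyncio
import threading

# A dedicated event loop running in a background daemon thread lets
# synchronous callers block on async work without async/await syntax.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

async def do_work(x):
    await asyncio.sleep(0)  # stand-in for real async FSM execution
    return x * 2

def run_sync(coro):
    """Block the calling thread until the coroutine finishes on the loop."""
    return asyncio.run_coroutine_threadsafe(coro, loop).result()

result = run_sync(do_work(21))

# Shut the loop down, analogous to what close() must do.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=1)
```

Because every call funnels through one loop, concurrent synchronous calls serialize, which is exactly the thread-safety caveat noted below.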

Attributes:

Name Type Description
data_mode DataHandlingMode

Data processing mode (COPY/REFERENCE/DIRECT)

_async_fsm AsyncSimpleFSM

Internal async FSM implementation

_fsm FSM

Core FSM engine

_resource_manager ResourceManager

Resource lifecycle manager

_loop AbstractEventLoop

Dedicated event loop for async operations

_loop_thread Thread

Background thread running the event loop

Methods:

Name Description
process

Process a single record through the FSM

process_batch

Process multiple records in parallel batches

process_stream

Process a stream of data from file or iterator

validate

Validate data against FSM's start state schema

get_states

List all state names in the FSM

get_resources

List all registered resource names

close

Clean up resources and close connections

Use Cases

Data Transformation: Transform data through a pipeline of state functions. Each state receives the output of the previous state, enabling sequential transformations.

Data Validation: Validate data against schemas defined in state configurations. States can enforce data quality rules and reject invalid records.

File Processing: Process large files line-by-line or in chunks using process_stream(). Supports automatic format detection (JSON, JSONL, CSV, text).

Batch Processing: Process multiple records in parallel using process_batch(). Configurable batch size and worker count for optimal throughput.

ETL Workflows: Extract-Transform-Load pipelines where data flows through extraction, transformation, and loading states with error handling.

Note

Thread Safety: SimpleFSM manages its own event loop in a background thread. While the synchronous API is thread-safe, concurrent calls will serialize due to the single event loop. For true concurrent processing, use AsyncSimpleFSM with multiple event loops or process_batch() with max_workers > 1.

Resource Management: Always call close() when done to properly release resources. Use context managers (with statement) when available in client code, or ensure close() is called in a finally block.

Data Mode Selection:

- Use COPY (default) for production: safe, predictable, memory-intensive
- Use REFERENCE for large datasets: memory-efficient, moderate overhead
- Use DIRECT for performance: fastest, but not thread-safe

Error Handling: The process() method returns a dict with 'success' and 'error' keys rather than raising exceptions. This allows for graceful error handling in batch processing scenarios.

Example

Basic usage with configuration file:

from dataknobs_fsm.api.simple import SimpleFSM

# Create FSM from YAML config
fsm = SimpleFSM('pipeline.yaml')

# Process single record
result = fsm.process({
    'text': 'Input text to process',
    'metadata': {'source': 'user'}
})

if result['success']:
    print(f"Result: {result['data']}")
    print(f"Path: {' -> '.join(result['path'])}")
else:
    print(f"Error: {result['error']}")

# Clean up
fsm.close()

With custom functions and resources:

from dataknobs_fsm.api.simple import SimpleFSM
from dataknobs_fsm.core.data_modes import DataHandlingMode

# Define custom state functions
def validate(data):
    if 'required_field' not in data:
        raise ValueError("Missing required field")
    return data

def transform(data):
    from datetime import datetime
    data['processed'] = True
    data['timestamp'] = datetime.now().isoformat()
    return data

# Create FSM with config dict
config = {
    'name': 'validation_pipeline',
    'states': [
        {'name': 'validate', 'type': 'START', 'function': 'validate'},
        {'name': 'transform', 'type': 'END', 'function': 'transform'}
    ],
    'arcs': [
        {'from': 'validate', 'to': 'transform'}
    ]
}

# Initialize with custom functions and resources
fsm = SimpleFSM(
    config=config,
    data_mode=DataHandlingMode.COPY,
    resources={
        'database': {
            'type': 'DATABASE',
            'backend': 'memory'
        }
    },
    custom_functions={
        'validate': validate,
        'transform': transform
    }
)

# Process data
result = fsm.process({'required_field': 'value'})
print(f"Success: {result['success']}")

fsm.close()

Batch processing with progress callback:

# Define progress callback
def on_progress(current, total):
    pct = (current / total) * 100
    print(f"Progress: {current}/{total} ({pct:.1f}%)")

# Process batch
records = [{'id': i, 'text': f'Record {i}'} for i in range(100)]
results = fsm.process_batch(
    data=records,
    batch_size=10,
    max_workers=4,
    on_progress=on_progress
)

# Check results
successful = sum(1 for r in results if r['success'])
print(f"Processed {successful}/{len(records)} successfully")
See Also
  • :class:AsyncSimpleFSM: Async version for production applications
  • :class:AdvancedFSM: Full control with debugging capabilities
  • :class:DataHandlingMode: Data processing mode options
  • :func:process_file: Convenience function for file processing
  • :func:batch_process: Convenience function for batch processing
  • :func:validate_data: Convenience function for data validation

Initialize SimpleFSM from configuration.

Parameters:

Name Type Description Default
config str | Path | dict[str, Any]

FSM configuration. Can be:

- Path to YAML/JSON config file (str or Path)
- Dictionary containing config (inline configuration)

Must define states, arcs, and optionally resources.

required
data_mode DataHandlingMode

Data handling mode controlling how data is passed between states. Options:

- DataHandlingMode.COPY (default): Deep copy for safety
- DataHandlingMode.REFERENCE: Lazy loading with locking
- DataHandlingMode.DIRECT: In-place modification (fastest)

COPY
resources dict[str, Any] | None

Optional resource configurations. Dict mapping resource names to configuration dicts. Each config must have a 'type' key (DATABASE, FILESYSTEM, HTTP, etc.) and type-specific parameters. Example: {'db': {'type': 'DATABASE', 'backend': 'postgres', ...}}

None
custom_functions dict[str, Callable] | None

Optional custom state functions. Dict mapping function names to callables. Functions receive data dict and return data dict. Function names must match 'function' fields in state definitions. Example: {'my_func': lambda data: {'result': data['input'] * 2}}

None
Example

From configuration file:

from dataknobs_fsm.api.simple import SimpleFSM

# Load from YAML file
fsm = SimpleFSM('config.yaml')

With inline configuration:

config = {
    'name': 'simple_pipeline',
    'states': [
        {'name': 'start', 'type': 'START'},
        {'name': 'process', 'type': 'NORMAL', 'function': 'transform'},
        {'name': 'end', 'type': 'END'}
    ],
    'arcs': [
        {'from': 'start', 'to': 'process'},
        {'from': 'process', 'to': 'end'}
    ]
}

def transform(data):
    data['transformed'] = True
    return data

fsm = SimpleFSM(
    config=config,
    custom_functions={'transform': transform}
)

With data mode selection:

from dataknobs_fsm.core.data_modes import DataHandlingMode

# Use COPY for safe concurrent processing
fsm_safe = SimpleFSM('config.yaml', data_mode=DataHandlingMode.COPY)

# Use REFERENCE for memory efficiency
fsm_efficient = SimpleFSM('config.yaml', data_mode=DataHandlingMode.REFERENCE)

# Use DIRECT for maximum performance (single-threaded only)
fsm_fast = SimpleFSM('config.yaml', data_mode=DataHandlingMode.DIRECT)

With resources:

resources = {
    'database': {
        'type': 'DATABASE',
        'backend': 'postgres',
        'host': 'localhost',
        'database': 'mydb',
        'user': 'admin',
        'password': 'secret'
    },
    'http_client': {
        'type': 'HTTP',
        'base_url': 'https://api.example.com',
        'timeout': 30
    }
}

fsm = SimpleFSM('config.yaml', resources=resources)
Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
def __init__(
    self,
    config: str | Path | dict[str, Any],
    data_mode: DataHandlingMode = DataHandlingMode.COPY,
    resources: dict[str, Any] | None = None,
    custom_functions: dict[str, Callable] | None = None
):
    """Initialize SimpleFSM from configuration.

    Args:
        config: FSM configuration. Can be:
            - Path to YAML/JSON config file (str or Path)
            - Dictionary containing config (inline configuration)
            Must define states, arcs, and optionally resources.
        data_mode: Data handling mode controlling how data is passed between
            states. Options:
            - DataHandlingMode.COPY (default): Deep copy for safety
            - DataHandlingMode.REFERENCE: Lazy loading with locking
            - DataHandlingMode.DIRECT: In-place modification (fastest)
        resources: Optional resource configurations. Dict mapping resource
            names to configuration dicts. Each config must have a 'type' key
            (DATABASE, FILESYSTEM, HTTP, etc.) and type-specific parameters.
            Example: {'db': {'type': 'DATABASE', 'backend': 'postgres', ...}}
        custom_functions: Optional custom state functions. Dict mapping function
            names to callables. Functions receive data dict and return data dict.
            Function names must match 'function' fields in state definitions.
            Example: {'my_func': lambda data: {'result': data['input'] * 2}}

    Example:
        From configuration file:

        ```python
        from dataknobs_fsm.api.simple import SimpleFSM

        # Load from YAML file
        fsm = SimpleFSM('config.yaml')
        ```

        With inline configuration:

        ```python
        config = {
            'name': 'simple_pipeline',
            'states': [
                {'name': 'start', 'type': 'START'},
                {'name': 'process', 'type': 'NORMAL', 'function': 'transform'},
                {'name': 'end', 'type': 'END'}
            ],
            'arcs': [
                {'from': 'start', 'to': 'process'},
                {'from': 'process', 'to': 'end'}
            ]
        }

        def transform(data):
            data['transformed'] = True
            return data

        fsm = SimpleFSM(
            config=config,
            custom_functions={'transform': transform}
        )
        ```

        With data mode selection:

        ```python
        from dataknobs_fsm.core.data_modes import DataHandlingMode

        # Use COPY for safe concurrent processing
        fsm_safe = SimpleFSM('config.yaml', data_mode=DataHandlingMode.COPY)

        # Use REFERENCE for memory efficiency
        fsm_efficient = SimpleFSM('config.yaml', data_mode=DataHandlingMode.REFERENCE)

        # Use DIRECT for maximum performance (single-threaded only)
        fsm_fast = SimpleFSM('config.yaml', data_mode=DataHandlingMode.DIRECT)
        ```

        With resources:

        ```python
        resources = {
            'database': {
                'type': 'DATABASE',
                'backend': 'postgres',
                'host': 'localhost',
                'database': 'mydb',
                'user': 'admin',
                'password': 'secret'
            },
            'http_client': {
                'type': 'HTTP',
                'base_url': 'https://api.example.com',
                'timeout': 30
            }
        }

        fsm = SimpleFSM('config.yaml', resources=resources)
        ```
    """
    # Store data_mode for compatibility
    self.data_mode = data_mode

    # Create the async FSM
    self._async_fsm = AsyncSimpleFSM(
        config=config,
        data_mode=data_mode,
        resources=resources,
        custom_functions=custom_functions
    )

    # Expose internal attributes for compatibility
    self._fsm = self._async_fsm._fsm
    self._resource_manager = self._async_fsm._resource_manager
    self._async_engine = self._async_fsm._async_engine

    # Create synchronous engine for compatibility
    from ..execution.engine import ExecutionEngine
    self._engine = ExecutionEngine(self._fsm)

    # Create a dedicated event loop for sync operations
    self._loop: asyncio.AbstractEventLoop | None = None
    self._loop_thread: threading.Thread | None = None
    self._setup_event_loop()
Attributes
config property
config: Any

Get the FSM configuration object.

Functions
process
process(
    data: dict[str, Any] | Record,
    initial_state: str | None = None,
    timeout: float | None = None,
) -> dict[str, Any]

Process a single record through the FSM synchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `dict[str, Any] \| Record` | Input data to process | *required* |
| `initial_state` | `str \| None` | Optional starting state (defaults to FSM start state) | `None` |
| `timeout` | `float \| None` | Optional timeout in seconds | `None` |

Returns:

`dict[str, Any]`: Dict containing the processed result with fields:

- `final_state`: Name of the final state reached
- `data`: The transformed data
- `path`: List of states traversed
- `success`: Whether processing succeeded
- `error`: Any error message (`None` if successful)
Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
def process(
    self,
    data: dict[str, Any] | Record,
    initial_state: str | None = None,
    timeout: float | None = None
) -> dict[str, Any]:
    """Process a single record through the FSM synchronously.

    Args:
        data: Input data to process
        initial_state: Optional starting state (defaults to FSM start state)
        timeout: Optional timeout in seconds

    Returns:
        Dict containing the processed result with fields:
        - final_state: Name of the final state reached
        - data: The transformed data
        - path: List of states traversed
        - success: Whether processing succeeded
        - error: Any error message (None if successful)
    """
    # Create the coroutine with the async process method
    async def _process():
        # Import here to avoid circular dependency
        from ..core.context_factory import ContextFactory
        from ..core.modes import ProcessingMode
        from ..core.result_formatter import ResultFormatter

        # Convert to Record if needed
        if isinstance(data, dict):
            from dataknobs_data import Record
            record = Record(data)
        else:
            record = data

        # Create context
        context = ContextFactory.create_context(
            fsm=self._fsm,
            data=record,
            initial_state=initial_state,
            data_mode=ProcessingMode.SINGLE,
            resource_manager=self._resource_manager
        )

        try:
            # Execute FSM asynchronously
            success, result = await self._async_engine.execute(context)

            # Format result
            return ResultFormatter.format_single_result(
                context=context,
                success=success,
                result=result
            )
        except asyncio.TimeoutError:
            # Return error result instead of raising
            return ResultFormatter.format_error_result(
                context=context,
                error=TimeoutError(f"FSM execution exceeded timeout of {timeout} seconds")
            )
        except Exception as e:
            return ResultFormatter.format_error_result(
                context=context,
                error=e
            )

    if timeout:
        # Use threading for timeout support
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(self._run_async, _process())
            try:
                return future.result(timeout=timeout)
            except concurrent.futures.TimeoutError:
                future.cancel()
                # Return an error result instead of raising
                return {
                    'success': False,
                    'error': f"FSM execution exceeded timeout of {timeout} seconds",
                    'final_state': None,
                    'data': data if isinstance(data, dict) else data.data,
                    'path': []
                }
    else:
        return self._run_async(_process())
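The result dict returned by `process()` can be handled uniformly from the documented field names. The helper below is an illustrative sketch (not part of the library); it relies only on the `final_state`, `path`, `success`, and `error` keys described above:

```python
def summarize_result(result: dict) -> str:
    """One-line summary of a SimpleFSM.process() result dict."""
    if result['success']:
        route = ' -> '.join(result['path'])
        return f"ok: reached {result['final_state']} via {route}"
    return f"failed: {result['error']}"

# A result shaped like the documented return value:
sample = {
    'final_state': 'end',
    'data': {'transformed': True},
    'path': ['start', 'process', 'end'],
    'success': True,
    'error': None,
}
print(summarize_result(sample))  # ok: reached end via start -> process -> end
```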
process_batch
process_batch(
    data: list[dict[str, Any] | Record],
    batch_size: int = 10,
    max_workers: int = 4,
    on_progress: Callable | None = None,
) -> list[dict[str, Any]]

Process multiple records in parallel batches synchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `list[dict[str, Any] \| Record]` | List of input records to process | *required* |
| `batch_size` | `int` | Number of records per batch | `10` |
| `max_workers` | `int` | Maximum parallel workers | `4` |
| `on_progress` | `Callable \| None` | Optional callback for progress updates | `None` |

Returns:

`list[dict[str, Any]]`: List of results for each input record

Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
def process_batch(
    self,
    data: list[dict[str, Any] | Record],
    batch_size: int = 10,
    max_workers: int = 4,
    on_progress: Callable | None = None
) -> list[dict[str, Any]]:
    """Process multiple records in parallel batches synchronously.

    Args:
        data: List of input records to process
        batch_size: Number of records per batch
        max_workers: Maximum parallel workers
        on_progress: Optional callback for progress updates

    Returns:
        List of results for each input record
    """
    return self._run_async(
        self._async_fsm.process_batch(
            data=data,
            batch_size=batch_size,
            max_workers=max_workers,
            on_progress=on_progress
        )
    )
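A sketch of an `on_progress` callback for `process_batch()`. The `(current, total)` signature follows the batch example shown later in these docs for `AsyncSimpleFSM`; the text-bar rendering itself is illustrative:

```python
def format_progress(current: int, total: int, width: int = 10) -> str:
    """Render a text progress bar for a (current, total) update."""
    filled = int(width * current / total) if total else width
    return '[' + '#' * filled + '-' * (width - filled) + f'] {current}/{total}'

def on_progress(current: int, total: int) -> None:
    # Compatible with process_batch(on_progress=...)
    print(format_progress(current, total))

print(format_progress(5, 10))  # [#####-----] 5/10
```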
process_stream
process_stream(
    source: str | Any,
    sink: str | None = None,
    chunk_size: int = 100,
    on_progress: Callable | None = None,
    input_format: str = "auto",
    text_field_name: str = "text",
    csv_delimiter: str = ",",
    csv_has_header: bool = True,
    skip_empty_lines: bool = True,
    use_streaming: bool = False,
) -> dict[str, Any]

Process a stream of data through the FSM synchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source` | `str \| Any` | Data source file path or async iterator | *required* |
| `sink` | `str \| None` | Optional output destination | `None` |
| `chunk_size` | `int` | Size of processing chunks | `100` |
| `on_progress` | `Callable \| None` | Optional progress callback | `None` |
| `input_format` | `str` | Input file format (`'auto'`, `'jsonl'`, `'json'`, `'csv'`, `'text'`) | `'auto'` |
| `text_field_name` | `str` | Field name for text lines when converting to dict | `'text'` |
| `csv_delimiter` | `str` | CSV delimiter character | `','` |
| `csv_has_header` | `bool` | Whether CSV file has header row | `True` |
| `skip_empty_lines` | `bool` | Skip empty lines in text files | `True` |
| `use_streaming` | `bool` | Use memory-efficient streaming for large files | `False` |

Returns:

`dict[str, Any]`: Dict containing stream processing statistics

Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
def process_stream(
    self,
    source: str | Any,
    sink: str | None = None,
    chunk_size: int = 100,
    on_progress: Callable | None = None,
    input_format: str = 'auto',
    text_field_name: str = 'text',
    csv_delimiter: str = ',',
    csv_has_header: bool = True,
    skip_empty_lines: bool = True,
    use_streaming: bool = False
) -> dict[str, Any]:
    """Process a stream of data through the FSM synchronously.

    Args:
        source: Data source file path or async iterator
        sink: Optional output destination
        chunk_size: Size of processing chunks
        on_progress: Optional progress callback
        input_format: Input file format ('auto', 'jsonl', 'json', 'csv', 'text')
        text_field_name: Field name for text lines when converting to dict
        csv_delimiter: CSV delimiter character
        csv_has_header: Whether CSV file has header row
        skip_empty_lines: Skip empty lines in text files
        use_streaming: Use memory-efficient streaming for large files

    Returns:
        Dict containing stream processing statistics
    """
    # If source is a string (file path), use the async version directly
    if isinstance(source, str):
        return self._run_async(
            self._async_fsm.process_stream(
                source=source,
                sink=sink,
                chunk_size=chunk_size,
                on_progress=on_progress,
                input_format=input_format,
                text_field_name=text_field_name,
                csv_delimiter=csv_delimiter,
                csv_has_header=csv_has_header,
                skip_empty_lines=skip_empty_lines,
                use_streaming=use_streaming
            )
        )
    else:
        # Source is an async iterator, need to handle it properly
        async def _process():
            return await self._async_fsm.process_stream(
                source=source,
                sink=sink,
                chunk_size=chunk_size,
                on_progress=on_progress,
                input_format=input_format,
                text_field_name=text_field_name,
                csv_delimiter=csv_delimiter,
                csv_has_header=csv_has_header,
                skip_empty_lines=skip_empty_lines,
                use_streaming=use_streaming
            )
        return self._run_async(_process())
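If you prefer to pass an explicit `input_format` rather than rely on `'auto'`, a caller-side helper can map file suffixes to the documented format names. The suffix mapping below is an assumption for illustration, not the library's own detection logic:

```python
from pathlib import Path

# Illustrative suffix-to-format mapping; the library's 'auto' detection
# may differ from this.
_SUFFIX_TO_FORMAT = {'.jsonl': 'jsonl', '.json': 'json', '.csv': 'csv', '.txt': 'text'}

def pick_format(path: str) -> str:
    """Choose an explicit input_format value from a file suffix."""
    return _SUFFIX_TO_FORMAT.get(Path(path).suffix.lower(), 'text')

print(pick_format('events.jsonl'))  # jsonl
```

Usage would then look like `fsm.process_stream('events.jsonl', input_format=pick_format('events.jsonl'))`.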
validate
validate(data: dict[str, Any] | Record) -> dict[str, Any]

Validate data against FSM's start state schema synchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `dict[str, Any] \| Record` | Data to validate | *required* |

Returns:

`dict[str, Any]`: Dict containing validation results

Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
def validate(self, data: dict[str, Any] | Record) -> dict[str, Any]:
    """Validate data against FSM's start state schema synchronously.

    Args:
        data: Data to validate

    Returns:
        Dict containing validation results
    """
    return self._run_async(self._async_fsm.validate(data))
get_states
get_states() -> list[str]

Get list of all state names in the FSM.

Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
def get_states(self) -> list[str]:
    """Get list of all state names in the FSM."""
    return self._async_fsm.get_states()
get_resources
get_resources() -> list[str]

Get list of registered resource names.

Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
def get_resources(self) -> list[str]:
    """Get list of registered resource names."""
    return self._async_fsm.get_resources()
close
close() -> None

Clean up resources and close connections synchronously.

Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
def close(self) -> None:
    """Clean up resources and close connections synchronously."""
    self._run_async(self._async_fsm.close())

    # Shut down the event loop
    if self._loop and self._loop.is_running():
        self._loop.call_soon_threadsafe(self._loop.stop)
        if self._loop_thread and self._loop_thread.is_alive():
            self._loop_thread.join(timeout=1.0)
aclose async
aclose() -> None

Async version of close for use in async contexts.

Source code in packages/fsm/src/dataknobs_fsm/api/simple.py
async def aclose(self) -> None:
    """Async version of close for use in async contexts."""
    await self._async_fsm.close()
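Because `SimpleFSM` exposes a plain `close()` method (it is not documented as a context manager), `contextlib.closing` guarantees cleanup even when processing raises. The `FakeFSM` class below is a stand-in used only to illustrate the pattern:

```python
from contextlib import closing

class FakeFSM:
    """Stand-in with the same close() contract as SimpleFSM, for illustration only."""
    def __init__(self):
        self.closed = False

    def process(self, data):
        # Mimics the documented result dict shape.
        return {'final_state': 'end', 'data': data,
                'path': ['start', 'end'], 'success': True, 'error': None}

    def close(self):
        self.closed = True

with closing(FakeFSM()) as fsm:
    result = fsm.process({'x': 1})
# fsm.close() has run here, even if process() had raised.
```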

AsyncSimpleFSM

AsyncSimpleFSM(
    config: str | Path | dict[str, Any],
    data_mode: DataHandlingMode = DataHandlingMode.COPY,
    resources: dict[str, Any] | None = None,
    custom_functions: dict[str, Callable] | None = None,
)

Async-first FSM interface for production workflows.

This class provides a fully asynchronous API for FSM operations, designed to work natively in async contexts without blocking calls or thread overhead. This is the recommended FSM implementation for production systems.

AsyncSimpleFSM handles all the complexity of async execution, resource management, and concurrent processing while providing a simple, clean API. It's optimized for high-throughput scenarios with proper connection pooling, error handling, and memory management.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `data_mode` | `DataHandlingMode` | Data processing mode (COPY/REFERENCE/DIRECT) |
| `_config` | | Loaded FSM configuration |
| `_fsm` | `FSM` | Core FSM engine |
| `_resource_manager` | `ResourceManager` | Resource lifecycle and pooling manager |
| `_async_engine` | `AsyncExecutionEngine` | Async execution engine |

Methods:

| Name | Description |
| --- | --- |
| `process` | Process single record asynchronously |
| `process_batch` | Process multiple records with concurrency control |
| `process_stream` | Stream-process large datasets |
| `validate` | Validate data against schema |
| `get_states` | List all FSM state names |
| `get_resources` | List all registered resources |
| `close` | Release all resources and cleanup |

Production Use Cases

Web API Backend: Handle thousands of concurrent requests in FastAPI/aiohttp services. Each request processes independently with automatic resource pooling.

Data Pipeline Processing: Transform large datasets with memory-efficient streaming and parallel batch processing. Configurable chunk sizes and worker counts.

Real-time Event Processing: Process events from queues (RabbitMQ, Kafka) with async consumers. High throughput with concurrent processing of independent events.

Batch Job Processing: Schedule and run large batch jobs with progress tracking and error handling. Configurable parallelism for optimal resource utilization.

Note

Concurrency Safety: AsyncSimpleFSM is safe for concurrent use when using DataHandlingMode.COPY (default). Each process() call operates on independent data. For REFERENCE or DIRECT modes, ensure external synchronization.

Resource Pooling: Resources (databases, HTTP clients) use connection pooling automatically. Configure pool_size in resource definitions for optimal performance.

Error Handling: Process methods return success/error dicts rather than raising exceptions, allowing graceful degradation. Use try/except only for critical failures.

Memory Management: For large datasets, use process_stream() with use_streaming=True for constant memory usage regardless of file size.

Example

Basic async processing:

import asyncio
from dataknobs_fsm.api.async_simple import AsyncSimpleFSM

async def main():
    # Create FSM
    fsm = AsyncSimpleFSM('pipeline.yaml')

    try:
        # Process single record
        result = await fsm.process({
            'text': 'Input data',
            'metadata': {'source': 'api'}
        })

        if result['success']:
            print(f"Result: {result['data']}")
            print(f"States: {' -> '.join(result['path'])}")
        else:
            print(f"Error: {result['error']}")
    finally:
        await fsm.close()

asyncio.run(main())

Concurrent processing with asyncio.gather:

async def process_concurrent():
    fsm = AsyncSimpleFSM('config.yaml')

    try:
        # Create tasks for concurrent execution
        tasks = [
            fsm.process({'id': 1, 'text': 'Item 1'}),
            fsm.process({'id': 2, 'text': 'Item 2'}),
            fsm.process({'id': 3, 'text': 'Item 3'})
        ]

        # Execute concurrently
        results = await asyncio.gather(*tasks)

        # Check results
        for i, result in enumerate(results, 1):
            status = "✓" if result['success'] else "✗"
            print(f"{status} Item {i}: {result.get('data', result.get('error'))}")
    finally:
        await fsm.close()

Production web service pattern:

from fastapi import FastAPI
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: initialize FSM
    app.state.fsm = AsyncSimpleFSM(
        'production.yaml',
        resources={
            'db': {
                'type': 'DATABASE',
                'backend': 'postgres',
                'pool_size': 20,  # Connection pooling
                'host': 'db.prod.example.com'
            }
        }
    )
    yield
    # Shutdown: cleanup
    await app.state.fsm.close()

app = FastAPI(lifespan=lifespan)

@app.post("/process")
async def process(request: dict):
    result = await app.state.fsm.process(request)
    return result

Batch processing with progress tracking:

async def process_with_progress():
    fsm = AsyncSimpleFSM('pipeline.yaml')

    # Progress callback
    def on_progress(current, total):
        pct = (current / total) * 100
        print(f"Progress: {current}/{total} ({pct:.1f}%)")

    try:
        records = [{'id': i} for i in range(1000)]
        results = await fsm.process_batch(
            data=records,
            batch_size=50,
            max_workers=10,
            on_progress=on_progress
        )

        successful = sum(1 for r in results if r['success'])
        print(f"Success rate: {successful}/{len(results)}")
    finally:
        await fsm.close()

Stream processing large files:

async def process_large_file():
    fsm = AsyncSimpleFSM('transform.yaml')

    try:
        # Memory-efficient streaming
        stats = await fsm.process_stream(
            source='input_100gb.jsonl',
            sink='output.jsonl',
            chunk_size=1000,
            use_streaming=True  # Constant memory usage
        )

        print(f"Processed: {stats['total_processed']}")
        print(f"Success: {stats['successful']}")
        print(f"Failed: {stats['failed']}")
        print(f"Duration: {stats['duration']:.2f}s")
        print(f"Throughput: {stats['throughput']:.2f} records/sec")
    finally:
        await fsm.close()
See Also
- `SimpleFSM`: Synchronous wrapper for scripts
- `AdvancedFSM`: Advanced API with debugging
- `create_async_fsm`: Factory function for creating instances
- `dataknobs_fsm.execution.async_engine`: Async execution engine
- `dataknobs_fsm.resources.manager`: Resource management

Initialize AsyncSimpleFSM from configuration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `str \| Path \| dict[str, Any]` | Path to config file or config dictionary | *required* |
| `data_mode` | `DataHandlingMode` | Default data mode for processing | `COPY` |
| `resources` | `dict[str, Any] \| None` | Optional resource configurations | `None` |
| `custom_functions` | `dict[str, Callable] \| None` | Optional custom functions to register | `None` |
Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
def __init__(
    self,
    config: str | Path | dict[str, Any],
    data_mode: DataHandlingMode = DataHandlingMode.COPY,
    resources: dict[str, Any] | None = None,
    custom_functions: dict[str, Callable] | None = None
):
    """Initialize AsyncSimpleFSM from configuration.

    Args:
        config: Path to config file or config dictionary
        data_mode: Default data mode for processing
        resources: Optional resource configurations
        custom_functions: Optional custom functions to register
    """
    self.data_mode = data_mode
    self._resources = resources or {}
    self._custom_functions = custom_functions or {}

    # Build FSM using shared build logic (registers custom functions
    # with both ConfigLoader and FSMBuilder before building)
    from dataknobs_fsm.config.builder import build_fsm
    self._fsm = build_fsm(config, custom_functions)
    self._config = self._fsm.config

    # Initialize resource manager
    self._resource_manager = ResourceManager()
    self._setup_resources()

    # Create async execution engine
    self._async_engine = AsyncExecutionEngine(self._fsm)
Attributes
config property
config: Any

Get the FSM configuration object.

Functions
process async
process(data: dict[str, Any] | Record) -> dict[str, Any]

Process a single record through the FSM asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `dict[str, Any] \| Record` | Input data to process | *required* |

Returns:

`dict[str, Any]`: Dict containing the processed result

Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
async def process(self, data: dict[str, Any] | Record) -> dict[str, Any]:
    """Process a single record through the FSM asynchronously.

    Args:
        data: Input data to process

    Returns:
        Dict containing the processed result
    """
    # Convert to Record if needed
    if isinstance(data, dict):
        record = Record(data)
    else:
        record = data

    # Create context
    from ..core.modes import ProcessingMode
    context = ContextFactory.create_context(
        fsm=self._fsm,
        data=record,
        data_mode=ProcessingMode.SINGLE,
        resource_manager=self._resource_manager
    )

    try:
        # Execute FSM asynchronously
        success, result = await self._async_engine.execute(context)

        # Format result
        return ResultFormatter.format_single_result(
            context=context,
            success=success,
            result=result
        )
    except Exception as e:
        return ResultFormatter.format_error_result(
            context=context,
            error=e
        )
process_batch async
process_batch(
    data: list[dict[str, Any] | Record],
    batch_size: int = 10,
    max_workers: int = 4,
    on_progress: Callable | None = None,
) -> list[dict[str, Any]]

Process multiple records in parallel batches asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `list[dict[str, Any] \| Record]` | List of input records to process | *required* |
| `batch_size` | `int` | Number of records per batch | `10` |
| `max_workers` | `int` | Maximum parallel workers | `4` |
| `on_progress` | `Callable \| None` | Optional callback for progress updates | `None` |

Returns:

`list[dict[str, Any]]`: List of results for each input record

Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
async def process_batch(
    self,
    data: list[dict[str, Any] | Record],
    batch_size: int = 10,
    max_workers: int = 4,
    on_progress: Callable | None = None
) -> list[dict[str, Any]]:
    """Process multiple records in parallel batches asynchronously.

    Args:
        data: List of input records to process
        batch_size: Number of records per batch
        max_workers: Maximum parallel workers
        on_progress: Optional callback for progress updates

    Returns:
        List of results for each input record
    """
    batch_executor = AsyncBatchExecutor(
        fsm=self._fsm,
        parallelism=max_workers,
        batch_size=batch_size,
        progress_callback=on_progress
    )

    # Convert to Records
    records = []
    for item in data:
        if isinstance(item, dict):
            records.append(Record(item))
        else:
            records.append(item)

    # Execute batch
    results = await batch_executor.execute_batch(items=records)

    # Format results
    formatted_results = []
    for result in results:
        if result.success:
            formatted_results.append({
                'final_state': result.metadata.get('final_state', 'unknown'),
                'data': result.result,
                'path': result.metadata.get('path', []),
                'success': True,
                'error': None
            })
        else:
            formatted_results.append({
                'final_state': result.metadata.get('final_state', None),
                'data': result.result if result.result else {},
                'path': result.metadata.get('path', []),
                'success': False,
                'error': str(result.error) if result.error else str(result.result)
            })

    return formatted_results
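Each entry in the returned list has the formatted shape built above (`final_state`, `data`, `path`, `success`, `error`). A small illustrative helper (not part of the library) can split a batch into successes and failures:

```python
def partition_results(results: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split formatted batch results into (successes, failures)."""
    ok = [r for r in results if r['success']]
    bad = [r for r in results if not r['success']]
    return ok, bad

# Results shaped like the formatted output of process_batch():
sample = [
    {'final_state': 'end', 'data': {'id': 1}, 'path': ['start', 'end'],
     'success': True, 'error': None},
    {'final_state': None, 'data': {}, 'path': [],
     'success': False, 'error': 'bad input'},
]
ok, bad = partition_results(sample)
```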
process_stream async
process_stream(
    source: str | AsyncIterator[dict[str, Any]],
    sink: str | None = None,
    chunk_size: int = 100,
    on_progress: Callable | None = None,
    input_format: str = "auto",
    text_field_name: str = "text",
    csv_delimiter: str = ",",
    csv_has_header: bool = True,
    skip_empty_lines: bool = True,
    use_streaming: bool = False,
) -> dict[str, Any]

Process a stream of data through the FSM asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source` | `str \| AsyncIterator[dict[str, Any]]` | Data source (file path or async iterator) | *required* |
| `sink` | `str \| None` | Optional output destination | `None` |
| `chunk_size` | `int` | Size of processing chunks | `100` |
| `on_progress` | `Callable \| None` | Optional progress callback | `None` |
| `input_format` | `str` | Input file format (`'auto'`, `'jsonl'`, `'json'`, `'csv'`, `'text'`) | `'auto'` |
| `text_field_name` | `str` | Field name for text lines when converting to dict | `'text'` |
| `csv_delimiter` | `str` | CSV delimiter character | `','` |
| `csv_has_header` | `bool` | Whether CSV file has header row | `True` |
| `skip_empty_lines` | `bool` | Skip empty lines in text files | `True` |
| `use_streaming` | `bool` | Use memory-efficient streaming for large files | `False` |

Returns:

`dict[str, Any]`: Dict containing stream processing statistics

Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
async def process_stream(
    self,
    source: str | AsyncIterator[dict[str, Any]],
    sink: str | None = None,
    chunk_size: int = 100,
    on_progress: Callable | None = None,
    input_format: str = 'auto',
    text_field_name: str = 'text',
    csv_delimiter: str = ',',
    csv_has_header: bool = True,
    skip_empty_lines: bool = True,
    use_streaming: bool = False
) -> dict[str, Any]:
    """Process a stream of data through the FSM asynchronously.

    Args:
        source: Data source (file path or async iterator)
        sink: Optional output destination
        chunk_size: Size of processing chunks
        on_progress: Optional progress callback
        input_format: Input file format ('auto', 'jsonl', 'json', 'csv', 'text')
        text_field_name: Field name for text lines when converting to dict
        csv_delimiter: CSV delimiter character
        csv_has_header: Whether CSV file has header row
        skip_empty_lines: Skip empty lines in text files
        use_streaming: Use memory-efficient streaming for large files

    Returns:
        Dict containing stream processing statistics
    """
    # Configure streaming
    stream_config = CoreStreamConfig(
        chunk_size=chunk_size,
        parallelism=4,
        memory_limit_mb=1024
    )

    # Create async stream executor
    stream_executor = AsyncStreamExecutor(
        fsm=self._fsm,
        stream_config=stream_config,
        progress_callback=on_progress
    )

    # Choose between streaming and regular mode
    if use_streaming and isinstance(source, str):
        # Use memory-efficient streaming for large files
        from ..utils.streaming_file_utils import (
            create_streaming_file_reader,
            create_streaming_file_writer,
        )

        stream_source = create_streaming_file_reader(
            file_path=source,
            config=stream_config,
            input_format=input_format,
            text_field_name=text_field_name,
            csv_delimiter=csv_delimiter,
            csv_has_header=csv_has_header,
            skip_empty_lines=skip_empty_lines
        )

        # Handle sink for streaming mode
        sink_func = None
        cleanup_func = None
        if sink:
            sink_func, cleanup_func = await create_streaming_file_writer(
                file_path=sink,
                config=stream_config
            )
    else:
        # Use regular mode (loads full chunks into memory)
        from ..utils.file_utils import create_file_reader, create_file_writer

        # Handle file source
        if isinstance(source, str):
            stream_source = create_file_reader(
                file_path=source,
                input_format=input_format,
                text_field_name=text_field_name,
                csv_delimiter=csv_delimiter,
                csv_has_header=csv_has_header,
                skip_empty_lines=skip_empty_lines
            )
        else:
            # Already an async iterator
            stream_source = source

        # Handle sink for regular mode
        sink_func = None
        cleanup_func = None
        if sink:
            sink_func, cleanup_func = create_file_writer(sink)

    try:
        # Execute stream using async executor
        result = await stream_executor.execute_stream(
            source=stream_source,
            sink=sink_func,
            chunk_size=chunk_size
        )

        return {
            'total_processed': result.total_processed,
            'successful': result.successful,
            'failed': result.failed,
            'duration': result.duration,
            'throughput': result.throughput
        }
    finally:
        # Clean up any resources (e.g., close files)
        if cleanup_func:
            if asyncio.iscoroutinefunction(cleanup_func):
                await cleanup_func()
            else:
                cleanup_func()
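The `finally` block above dispatches on the cleanup callable's type, awaiting it only when it is a coroutine function. That pattern can be sketched in isolation (the `close_resource` helper below is illustrative, not part of the package):

```python
import asyncio

async def close_resource(cleanup_func):
    """Invoke a cleanup callable, awaiting it only if it is a coroutine function."""
    if cleanup_func is None:
        return "skipped"
    if asyncio.iscoroutinefunction(cleanup_func):
        await cleanup_func()
        return "awaited"
    cleanup_func()
    return "called"

closed = []

def sync_cleanup():
    closed.append("sync")

async def async_cleanup():
    closed.append("async")

results = [
    asyncio.run(close_resource(sync_cleanup)),
    asyncio.run(close_resource(async_cleanup)),
    asyncio.run(close_resource(None)),
]
```

This lets the same `finally` branch handle file writers that expose either a plain `close()` or an async one.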
validate async
validate(data: dict[str, Any] | Record) -> dict[str, Any]

Validate data against FSM's start state schema asynchronously.

Parameters:

Name Type Description Default
data dict[str, Any] | Record

Data to validate

required

Returns:

Type Description
dict[str, Any]

Dict containing validation results

Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
async def validate(self, data: dict[str, Any] | Record) -> dict[str, Any]:
    """Validate data against FSM's start state schema asynchronously.

    Args:
        data: Data to validate

    Returns:
        Dict containing validation results
    """
    # Convert to Record if needed
    if isinstance(data, dict):
        record = Record(data)
    else:
        record = data

    # Get start state
    start_state = self._fsm.get_start_state()

    # Validate against schema
    if start_state.schema:
        validation_result = start_state.schema.validate(record)
        return {
            'valid': validation_result.valid,
            'errors': validation_result.errors if not validation_result.valid else []
        }
    else:
        return {
            'valid': True,
            'errors': []
        }
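The result dictionary has the same `{'valid': ..., 'errors': ...}` shape whether or not the start state defines a schema. A standalone sketch of that contract (`FakeSchema` and `ValidationResult` are stubs, not the package's schema classes):

```python
from dataclasses import dataclass, field

@dataclass
class ValidationResult:
    valid: bool
    errors: list = field(default_factory=list)

class FakeSchema:
    """Stub schema: requires an 'id' key in the record."""
    def validate(self, record):
        if "id" in record:
            return ValidationResult(valid=True)
        return ValidationResult(valid=False, errors=["missing field: id"])

def validate(record, schema=None):
    """Mirror the validate() contract: always return {'valid': ..., 'errors': ...}."""
    if schema is None:
        # No schema on the start state: everything passes
        return {"valid": True, "errors": []}
    result = schema.validate(record)
    return {
        "valid": result.valid,
        "errors": result.errors if not result.valid else [],
    }

ok = validate({"id": 1}, FakeSchema())
bad = validate({}, FakeSchema())
no_schema = validate({})
```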
get_states
get_states() -> list[str]

Get list of all state names in the FSM.

Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
def get_states(self) -> list[str]:
    """Get list of all state names in the FSM."""
    states = []
    # The FSM has networks, and each network has states
    for network in self._fsm.networks.values():
        for state in network.states.values():
            states.append(state.name)
    return states
get_resources
get_resources() -> list[str]

Get list of registered resource names.

Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
def get_resources(self) -> list[str]:
    """Get list of registered resource names."""
    return list(self._resource_manager._providers.keys())
close async
close() -> None

Clean up resources and close connections asynchronously.

Source code in packages/fsm/src/dataknobs_fsm/api/async_simple.py
async def close(self) -> None:
    """Clean up resources and close connections asynchronously."""
    await self._resource_manager.cleanup()

AdvancedFSM

AdvancedFSM(
    config: FSM | str | Path | dict[str, Any],
    execution_mode: ExecutionMode = ExecutionMode.STEP_BY_STEP,
    custom_functions: dict[str, Callable] | None = None,
)

Advanced FSM interface with full control capabilities.

Initialize AdvancedFSM.

Parameters:

Name Type Description Default
config FSM | str | Path | dict[str, Any]

Either a pre-built FSM instance, a path to a YAML/JSON config file, or a configuration dictionary. When a config path or dict is provided, the FSM is built internally using dataknobs_fsm.config.builder.build_fsm, which ensures custom functions are properly registered before building.

required
execution_mode ExecutionMode

Execution mode for advanced control.

STEP_BY_STEP
custom_functions dict[str, Callable] | None

Optional custom functions to register. When config is a path or dict, these are registered before building the FSM. In all cases (including pre-built FSM instances), they are forwarded to the execution engines and merged into the function registry at execution time.

None

Methods:

Name Description
set_execution_strategy

Set custom execution strategy.

set_data_handler

Set custom data handler.

configure_transactions

Configure transaction management.

register_resource

Register a custom resource.

set_hooks

Set execution hooks for monitoring.

add_breakpoint

Add a breakpoint at a specific state.

remove_breakpoint

Remove a breakpoint.

clear_breakpoints

Clear all breakpoints.

enable_history

Enable execution history tracking.

disable_history

Disable history tracking.

create_context

Create an execution context for manual control (synchronous).

execution_context

Create an execution context for manual control.

step

Execute a single transition step.

run_until_breakpoint

Run execution until a breakpoint is hit.

trace_execution

Execute with full tracing enabled.

profile_execution

Execute with performance profiling.

get_available_transitions

Get available transitions from a state.

inspect_state

Inspect a state's configuration.

visualize_fsm

Generate a visual representation of the FSM.

validate_network

Validate the FSM network for consistency.

get_history

Get execution history if enabled.

save_history

Save execution history to storage.

load_history

Load execution history from storage.

execute_step_sync

Execute a single transition step synchronously.

execute_step_async

Execute a single transition step asynchronously.

run_until_breakpoint_sync

Run execution until a breakpoint is hit (synchronous).

trace_execution_sync

Execute with full tracing enabled (synchronous).

profile_execution_sync

Execute with performance profiling (synchronous).

Attributes:

Name Type Description
breakpoints set

Get the current breakpoints.

hooks ExecutionHook

Get the current execution hooks.

history_enabled bool

Check if history tracking is enabled.

max_history_depth int

Get the maximum history depth.

execution_history list

Get the execution history steps.

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def __init__(
    self,
    config: FSM | str | Path | dict[str, Any],
    execution_mode: ExecutionMode = ExecutionMode.STEP_BY_STEP,
    custom_functions: dict[str, Callable] | None = None
):
    """Initialize AdvancedFSM.

    Args:
        config: Either a pre-built :class:`FSM` instance, a path to a
            YAML/JSON config file, or a configuration dictionary.  When a
            config path or dict is provided the FSM is built internally
            using :func:`~dataknobs_fsm.config.builder.build_fsm`, which
            ensures custom functions are properly registered before
            building.
        execution_mode: Execution mode for advanced control.
        custom_functions: Optional custom functions to register.  When
            *config* is a path or dict, these are registered before
            building the FSM.  In all cases (including pre-built
            :class:`FSM` instances), they are forwarded to the
            execution engines and merged into the function registry
            at execution time.
    """
    if isinstance(config, FSM):
        fsm = config
    else:
        from dataknobs_fsm.config.builder import build_fsm
        fsm = build_fsm(config, custom_functions)

    self.fsm = fsm
    self.execution_mode = execution_mode
    self._engine = ExecutionEngine(fsm, custom_functions=custom_functions)
    self._async_engine = AsyncExecutionEngine(fsm, custom_functions=custom_functions)
    self._resource_manager = ResourceManager()
    self._transaction_manager = None
    self._history: ExecutionHistory | None = None
    self._storage: IHistoryStorage | None = None
    self._hooks = ExecutionHook()
    self._breakpoints = set()
    self._trace_buffer = []
    self._profile_data = {}
    self._custom_functions = custom_functions or {}
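The constructor accepts either a pre-built FSM or raw configuration and dispatches on type. A self-contained sketch of that dispatch (`FakeFSM` and `fake_build_fsm` are stand-ins for the real `FSM` class and `build_fsm` function):

```python
class FakeFSM:
    """Stand-in for the package's FSM class."""
    def __init__(self, name="prebuilt"):
        self.name = name

def fake_build_fsm(config, custom_functions=None):
    """Stand-in for build_fsm: pretend to build an FSM from a config dict."""
    return FakeFSM(name=config.get("name", "built"))

def resolve_fsm(config):
    """Use a pre-built instance as-is, otherwise build from configuration."""
    if isinstance(config, FakeFSM):
        return config
    return fake_build_fsm(config)

direct = resolve_fsm(FakeFSM("mine"))      # instance passes through untouched
built = resolve_fsm({"name": "pipeline"})  # dict config goes through the builder
```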
Attributes
breakpoints property
breakpoints: set

Get the current breakpoints.

hooks property
hooks: ExecutionHook

Get the current execution hooks.

history_enabled property
history_enabled: bool

Check if history tracking is enabled.

max_history_depth property
max_history_depth: int

Get the maximum history depth.

execution_history property
execution_history: list

Get the execution history steps.

Functions
set_execution_strategy
set_execution_strategy(strategy: TraversalStrategy) -> None

Set custom execution strategy.

Parameters:

Name Type Description Default
strategy TraversalStrategy

Execution strategy to use

required
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def set_execution_strategy(self, strategy: TraversalStrategy) -> None:
    """Set custom execution strategy.

    Args:
        strategy: Execution strategy to use
    """
    self._engine.strategy = strategy
set_data_handler
set_data_handler(handler: DataHandler) -> None

Set custom data handler.

Parameters:

Name Type Description Default
handler DataHandler

Data handler implementation

required
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def set_data_handler(self, handler: DataHandler) -> None:
    """Set custom data handler.

    Args:
        handler: Data handler implementation
    """
    self._engine.data_handler = handler
configure_transactions
configure_transactions(strategy: TransactionStrategy, **config: Any) -> None

Configure transaction management.

Parameters:

Name Type Description Default
strategy TransactionStrategy

Transaction strategy to use

required
**config Any

Strategy-specific configuration

{}
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def configure_transactions(
    self,
    strategy: TransactionStrategy,
    **config: Any
) -> None:
    """Configure transaction management.

    Args:
        strategy: Transaction strategy to use
        **config: Strategy-specific configuration
    """
    self._transaction_manager = TransactionManager.create(strategy, **config)
register_resource
register_resource(
    name: str, resource: IResourceProvider | dict[str, Any]
) -> None

Register a custom resource.

Parameters:

Name Type Description Default
name str

Resource name

required
resource IResourceProvider | dict[str, Any]

Resource instance or configuration

required
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def register_resource(
    self,
    name: str,
    resource: IResourceProvider | dict[str, Any]
) -> None:
    """Register a custom resource.

    Args:
        name: Resource name
        resource: Resource instance or configuration
    """
    if isinstance(resource, dict):
        # Use ResourceManager factory method
        self._resource_manager.register_from_dict(name, resource)
    else:
        # Assume it's already a provider
        self._resource_manager.register_provider(name, resource)
set_hooks
set_hooks(hooks: ExecutionHook) -> None

Set execution hooks for monitoring.

Parameters:

Name Type Description Default
hooks ExecutionHook

Execution hooks configuration

required
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def set_hooks(self, hooks: ExecutionHook) -> None:
    """Set execution hooks for monitoring.

    Args:
        hooks: Execution hooks configuration
    """
    self._hooks = hooks
add_breakpoint
add_breakpoint(state_name: str) -> None

Add a breakpoint at a specific state.

Parameters:

Name Type Description Default
state_name str

Name of state to break at

required
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def add_breakpoint(self, state_name: str) -> None:
    """Add a breakpoint at a specific state.

    Args:
        state_name: Name of state to break at
    """
    self._breakpoints.add(state_name)
remove_breakpoint
remove_breakpoint(state_name: str) -> None

Remove a breakpoint.

Parameters:

Name Type Description Default
state_name str

Name of state to remove breakpoint from

required
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def remove_breakpoint(self, state_name: str) -> None:
    """Remove a breakpoint.

    Args:
        state_name: Name of state to remove breakpoint from
    """
    self._breakpoints.discard(state_name)
clear_breakpoints
clear_breakpoints() -> None

Clear all breakpoints.

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def clear_breakpoints(self) -> None:
    """Clear all breakpoints."""
    self._breakpoints.clear()
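Breakpoints are kept in a plain `set`, which is why `remove_breakpoint` uses `set.discard`: removing a breakpoint that was never added is a no-op rather than a `KeyError`. A self-contained illustration of those semantics:

```python
breakpoints = set()

def add_breakpoint(name):
    breakpoints.add(name)

def remove_breakpoint(name):
    breakpoints.discard(name)  # no KeyError if the name is absent

add_breakpoint("validate")
add_breakpoint("transform")
add_breakpoint("validate")        # duplicates collapse in a set
remove_breakpoint("nonexistent")  # safe no-op
snapshot = sorted(breakpoints)
breakpoints.clear()               # clear_breakpoints() equivalent
```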
enable_history
enable_history(
    storage: IHistoryStorage | None = None, max_depth: int = 100
) -> None

Enable execution history tracking.

Parameters:

Name Type Description Default
storage IHistoryStorage | None

Optional storage backend for history

None
max_depth int

Maximum history depth to track

100
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def enable_history(
    self,
    storage: IHistoryStorage | None = None,
    max_depth: int = 100
) -> None:
    """Enable execution history tracking.

    Args:
        storage: Optional storage backend for history
        max_depth: Maximum history depth to track
    """
    import uuid

    from dataknobs_fsm.core.data_modes import DataHandlingMode

    # Get FSM name from the FSM object
    fsm_name = getattr(self.fsm, 'name', 'unnamed_fsm')

    # Generate a unique execution ID
    execution_id = str(uuid.uuid4())

    self._history = ExecutionHistory(
        fsm_name=fsm_name,
        execution_id=execution_id,
        data_mode=DataHandlingMode.COPY,  # Default data mode
        max_depth=max_depth
    )
    self._storage = storage
disable_history
disable_history() -> None

Disable history tracking.

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def disable_history(self) -> None:
    """Disable history tracking."""
    self._history = None
    self._storage = None
create_context
create_context(
    data: dict[str, Any] | Record,
    data_mode: DataHandlingMode = DataHandlingMode.COPY,
    initial_state: str | None = None,
) -> ExecutionContext

Create an execution context for manual control (synchronous).

Parameters:

Name Type Description Default
data dict[str, Any] | Record

Initial data

required
data_mode DataHandlingMode

Data handling mode

COPY
initial_state str | None

Starting state name

None

Returns:

Type Description
ExecutionContext

ExecutionContext for manual execution

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def create_context(
    self,
    data: dict[str, Any] | Record,
    data_mode: DataHandlingMode = DataHandlingMode.COPY,
    initial_state: str | None = None
) -> ExecutionContext:
    """Create an execution context for manual control (synchronous).

    Args:
        data: Initial data
        data_mode: Data handling mode
        initial_state: Starting state name

    Returns:
        ExecutionContext for manual execution
    """
    # Create context with appropriate data handling
    # Use SINGLE processing mode as default
    processing_mode = ProcessingMode.SINGLE

    context = ContextFactory.create_context(
        self.fsm,
        data,
        data_mode=processing_mode
    )

    # Set initial state if provided
    if initial_state:
        context.set_state(initial_state)
    else:
        # Find and set initial state using shared helper
        initial_state = self._find_initial_state()
        if initial_state:
            context.set_state(initial_state)

    # Update state instance using shared helper
    if context.current_state:
        self._update_state_instance(context, context.current_state)

    # Register custom functions if any
    if self._custom_functions:
        registry = getattr(self.fsm, 'function_registry', None)
        if registry is None:
            self.fsm.function_registry = {}
            registry = self.fsm.function_registry
        for name, func in self._custom_functions.items():
            if hasattr(registry, 'register'):
                registry.register(name, func)
            elif isinstance(registry, dict):
                registry[name] = func

    return context
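The custom-function registration above handles two registry shapes: an object exposing a `register()` method, or a plain dict. That duck-typed merge can be sketched standalone (`MiniRegistry` is a hypothetical stand-in, not the package's registry class):

```python
class MiniRegistry:
    """Minimal registry object exposing a register() method."""
    def __init__(self):
        self.funcs = {}

    def register(self, name, func):
        self.funcs[name] = func

def merge_custom_functions(registry, custom_functions):
    """Register functions into either a registry object or a plain dict."""
    for name, func in custom_functions.items():
        if hasattr(registry, "register"):
            registry.register(name, func)
        elif isinstance(registry, dict):
            registry[name] = func

def double(x):
    return x * 2

obj_registry = MiniRegistry()
merge_custom_functions(obj_registry, {"double": double})

dict_registry = {}
merge_custom_functions(dict_registry, {"double": double})
```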
execution_context async
execution_context(
    data: dict[str, Any] | Record,
    data_mode: DataHandlingMode = DataHandlingMode.COPY,
    initial_state: str | None = None,
) -> AsyncGenerator[ExecutionContext, None]

Create an execution context for manual control.

Parameters:

Name Type Description Default
data dict[str, Any] | Record

Initial data

required
data_mode DataHandlingMode

Data handling mode

COPY
initial_state str | None

Starting state name

None

Yields:

Type Description
AsyncGenerator[ExecutionContext, None]

ExecutionContext for manual execution

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
@asynccontextmanager
async def execution_context(
    self,
    data: dict[str, Any] | Record,
    data_mode: DataHandlingMode = DataHandlingMode.COPY,
    initial_state: str | None = None
) -> AsyncGenerator[ExecutionContext, None]:
    """Create an execution context for manual control.

    Args:
        data: Initial data
        data_mode: Data handling mode
        initial_state: Starting state name

    Yields:
        ExecutionContext for manual execution
    """
    # Create context using factory
    context = ContextFactory.create_context(
        fsm=self.fsm,
        data=data,
        initial_state=initial_state,
        data_mode=ProcessingMode.SINGLE,
        resource_manager=self._resource_manager
    )

    # Set transaction manager if configured
    if self._transaction_manager:
        context.transaction_manager = self._transaction_manager  # type: ignore[unreachable]

    # Get the state instance for the hook
    state_instance = context.current_state_instance
    if not state_instance:
        # Create state instance if not set by factory
        state_instance = self.fsm.create_state_instance(
            context.current_state,  # type: ignore
            context.data.copy() if isinstance(context.data, dict) else {}
        )
        context.current_state_instance = state_instance

    # Call hook with StateInstance
    if self._hooks.on_state_enter:
        await self._hooks.on_state_enter(state_instance)

    try:
        yield context
    finally:
        # Cleanup
        if self._hooks.on_state_exit:
            await self._hooks.on_state_exit(state_instance)
        await self._resource_manager.cleanup()
step async
step(context: ExecutionContext, arc_name: str | None = None) -> StepResult

Execute a single transition step.

Changed: the return type changed from StateInstance | None to StepResult. Use result.success and result.transition != "none" instead of checking for None.

Parameters:

Name Type Description Default
context ExecutionContext

Execution context

required
arc_name str | None

Optional specific arc to follow

None

Returns:

Type Description
StepResult

StepResult with transition details

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
async def step(
    self,
    context: ExecutionContext,
    arc_name: str | None = None
) -> StepResult:
    """Execute a single transition step.

    .. versionchanged::
        Return type changed from ``StateInstance | None`` to ``StepResult``.
        Use ``result.success`` and ``result.transition != "none"`` instead
        of checking ``None``.

    Args:
        context: Execution context
        arc_name: Optional specific arc to follow

    Returns:
        StepResult with transition details
    """
    return await self.execute_step_async(context, arc_name)
run_until_breakpoint async
run_until_breakpoint(
    context: ExecutionContext, max_steps: int = 1000
) -> StepResult | None

Run execution until a breakpoint is hit.

Parameters:

Name Type Description Default
context ExecutionContext

Execution context

required
max_steps int

Maximum steps to execute (safety limit)

1000

Returns:

Type Description
StepResult | None

StepResult where execution stopped, or None if no steps taken

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
async def run_until_breakpoint(
    self,
    context: ExecutionContext,
    max_steps: int = 1000
) -> StepResult | None:
    """Run execution until a breakpoint is hit.

    Args:
        context: Execution context
        max_steps: Maximum steps to execute (safety limit)

    Returns:
        StepResult where execution stopped, or None if no steps taken
    """
    last_result: StepResult | None = None
    data_before = context.get_data_snapshot()
    for _ in range(max_steps):
        # Check if current state is a breakpoint before stepping
        if context.current_state in self._breakpoints:
            return StepResult(
                from_state=context.current_state or "unknown",
                to_state=context.current_state or "unknown",
                transition="none",
                data_before=data_before,
                data_after=context.get_data_snapshot(),
                success=True,
                at_breakpoint=True,
                is_complete=self._is_at_end_state(context),
            )

        # Step to next state
        result = await self.step(context)
        last_result = result
        data_before = context.get_data_snapshot()

        # Check if we reached an end state or no transition occurred
        if not result.success or result.is_complete or result.transition == "none":
            return result

    # Hit max steps limit
    return last_result
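The loop above checks the breakpoint set before each step and stops either at a breakpoint or when no transition remains. The control flow can be exercised against a toy linear machine (plain lists and dicts stand in for the package's context and `StepResult`):

```python
def run_until_breakpoint(states, breakpoints, max_steps=1000):
    """Walk a linear list of states, stopping at the first breakpoint state."""
    position = 0
    for _ in range(max_steps):
        current = states[position]
        if current in breakpoints:
            # Breakpoint checked before stepping, as in the real implementation
            return {"state": current, "at_breakpoint": True}
        if position == len(states) - 1:
            # End state: no outgoing transition
            return {"state": current, "at_breakpoint": False}
        position += 1
    return {"state": states[position], "at_breakpoint": False}

path = ["start", "extract", "transform", "load", "end"]
stopped = run_until_breakpoint(path, breakpoints={"transform"})
finished = run_until_breakpoint(path, breakpoints=set())
```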
trace_execution async
trace_execution(
    data: dict[str, Any] | Record, initial_state: str | None = None
) -> list[dict[str, Any]]

Execute with full tracing enabled.

Parameters:

Name Type Description Default
data dict[str, Any] | Record

Input data

required
initial_state str | None

Optional starting state

None

Returns:

Type Description
list[dict[str, Any]]

List of trace entries

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
async def trace_execution(
    self,
    data: dict[str, Any] | Record,
    initial_state: str | None = None
) -> list[dict[str, Any]]:
    """Execute with full tracing enabled.

    Args:
        data: Input data
        initial_state: Optional starting state

    Returns:
        List of trace entries
    """
    self.execution_mode = ExecutionMode.TRACE
    self._trace_buffer.clear()

    async with self.execution_context(data, initial_state=initial_state) as context:
        # Run to completion
        while True:
            result = await self.step(context)
            if not result.success or result.is_complete or result.transition == "none":
                break

    return self._trace_buffer
profile_execution async
profile_execution(
    data: dict[str, Any] | Record, initial_state: str | None = None
) -> dict[str, Any]

Execute with performance profiling.

Parameters:

Name Type Description Default
data dict[str, Any] | Record

Input data

required
initial_state str | None

Optional starting state

None

Returns:

Type Description
dict[str, Any]

Profiling data

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
async def profile_execution(
    self,
    data: dict[str, Any] | Record,
    initial_state: str | None = None
) -> dict[str, Any]:
    """Execute with performance profiling.

    Args:
        data: Input data
        initial_state: Optional starting state

    Returns:
        Profiling data
    """
    self.execution_mode = ExecutionMode.PROFILE
    self._profile_data.clear()

    async with self.execution_context(data, initial_state=initial_state) as context:
        start_time = time.time()
        transitions = 0

        # Track per-state timing and per-transition durations
        state_times: dict[str, list[float]] = {}
        transition_times: list[float] = []
        state_start = time.time()

        while True:
            current_state_name = context.current_state or "unknown"

            # Step
            result = await self.step(context)

            # Record state timing
            state_duration = time.time() - state_start
            if current_state_name not in state_times:
                state_times[current_state_name] = []
            state_times[current_state_name].append(state_duration)
            transition_times.append(result.duration)

            if not result.success or result.is_complete or result.transition == "none":
                break

            transitions += 1
            state_start = time.time()

    total_time = time.time() - start_time

    # Compute statistics
    self._profile_data = self._compute_profile_stats(
        total_time, transitions, state_times, context,
        transition_times=transition_times,
    )

    return self._profile_data
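The per-state timings collected above are lists of durations keyed by state name, which a stats pass then reduces. Since `_compute_profile_stats` is not shown in this section, the aggregation below is an assumption about the kind of summary it produces:

```python
def summarize_state_times(state_times):
    """Reduce {state: [durations]} into per-state count/total/mean."""
    summary = {}
    for state, durations in state_times.items():
        total = sum(durations)
        summary[state] = {
            "count": len(durations),
            "total": round(total, 6),
            "mean": round(total / len(durations), 6),
        }
    return summary

times = {"extract": [0.1, 0.3], "load": [0.2]}
stats = summarize_state_times(times)
```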
get_available_transitions
get_available_transitions(state_name: str) -> list[dict[str, Any]]

Get available transitions from a state.

Parameters:

Name Type Description Default
state_name str

Name of state

required

Returns:

Type Description
list[dict[str, Any]]

List of available transition information

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def get_available_transitions(
    self,
    state_name: str
) -> list[dict[str, Any]]:
    """Get available transitions from a state.

    Args:
        state_name: Name of state

    Returns:
        List of available transition information
    """
    arcs = self.fsm.get_outgoing_arcs(state_name)
    return [
        {
            'name': arc.name,
            'target': arc.target_state,
            'has_pre_test': arc.pre_test is not None,
            'has_transform': arc.transform is not None
        }
        for arc in arcs
    ]
inspect_state
inspect_state(state_name: str) -> dict[str, Any]

Inspect a state's configuration.

Parameters:

Name Type Description Default
state_name str

Name of state to inspect

required

Returns:

Type Description
dict[str, Any]

State configuration details

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def inspect_state(self, state_name: str) -> dict[str, Any]:
    """Inspect a state's configuration.

    Args:
        state_name: Name of state to inspect

    Returns:
        State configuration details
    """
    state = self.fsm.get_state(state_name)
    if not state:
        return {'error': f'State {state_name} not found'}

    return {
        'name': state.name,
        'is_start': self.fsm.is_start_state(state_name),
        'is_end': self.fsm.is_end_state(state_name),
        'has_transform': len(state.transform_functions) > 0,
        'has_validator': len(state.validation_functions) > 0,
        'resources': [r.name for r in state.resource_requirements] if state.resource_requirements else [],
        'metadata': state.metadata,
        'arcs': state.arcs
    }
visualize_fsm
visualize_fsm() -> str

Generate a visual representation of the FSM.

Returns:

Type Description
str

GraphViz DOT format string

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def visualize_fsm(self) -> str:
    """Generate a visual representation of the FSM.

    Returns:
        GraphViz DOT format string
    """
    lines = ['digraph FSM {']
    lines.append('  rankdir=LR;')
    lines.append('  node [shape=circle];')

    # Add states
    for state in self.fsm.states.values():
        attrs = []
        if state.is_start:
            attrs.append('style=filled')
            attrs.append('fillcolor=green')
        elif state.is_end:
            attrs.append('shape=doublecircle')
            attrs.append('style=filled')
            attrs.append('fillcolor=red')

        if attrs:
            lines.append(f'  {state.name} [{",".join(attrs)}];')
        else:
            lines.append(f'  {state.name};')

    # Add arcs
    for state_name in self.fsm.states:
        for arc in self.fsm.get_outgoing_arcs(state_name):
            label = arc.name if arc.name else ""
            lines.append(f'  {state_name} -> {arc.target_state} [label="{label}"];')

    lines.append('}')
    return '\n'.join(lines)
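The DOT generation can be reproduced with a minimal standalone version, using plain dicts and tuples in place of the package's state and arc objects:

```python
def to_dot(states, arcs):
    """Render states and arcs as a GraphViz digraph.

    states: {name: {'is_start': bool, 'is_end': bool}}
    arcs:   [(source, target, label)]
    """
    lines = ["digraph FSM {", "  rankdir=LR;", "  node [shape=circle];"]
    for name, info in states.items():
        attrs = []
        if info.get("is_start"):
            attrs += ["style=filled", "fillcolor=green"]
        elif info.get("is_end"):
            attrs += ["shape=doublecircle", "style=filled", "fillcolor=red"]
        lines.append(f"  {name} [{','.join(attrs)}];" if attrs else f"  {name};")
    for source, target, label in arcs:
        lines.append(f'  {source} -> {target} [label="{label}"];')
    lines.append("}")
    return "\n".join(lines)

dot = to_dot(
    {"start": {"is_start": True}, "end": {"is_end": True}},
    [("start", "end", "done")],
)
```

The resulting string can be rendered with any GraphViz tool (e.g. `dot -Tpng`).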
validate_network async
validate_network() -> dict[str, Any]

Validate the FSM network for consistency.

Returns:

Type Description
dict[str, Any]

Validation results

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
async def validate_network(self) -> dict[str, Any]:
    """Validate the FSM network for consistency.

    Returns:
        Validation results
    """
    issues = []

    # Check for unreachable states
    reachable = set()
    to_visit = [s.name for s in self.fsm.states.values() if s.is_start]

    while to_visit:
        state = to_visit.pop(0)
        if state in reachable:
            continue
        reachable.add(state)

        arcs = self.fsm.get_outgoing_arcs(state)
        for arc in arcs:
            if arc.target_state not in reachable:
                to_visit.append(arc.target_state)

    unreachable = set(self.fsm.states.keys()) - reachable
    if unreachable:
        issues.append({
            'type': 'unreachable_states',
            'states': list(unreachable)
        })

    # Check for dead ends (non-end states with no outgoing arcs)
    for state_name, state in self.fsm.states.items():
        if not state.is_end:
            arcs = self.fsm.get_outgoing_arcs(state_name)
            if not arcs:
                issues.append({
                    'type': 'dead_end',
                    'state': state_name
                })

    return {
        'valid': len(issues) == 0,
        'issues': issues,
        'stats': {
            'total_states': len(self.fsm.states),
            'reachable_states': len(reachable),
            'unreachable_states': len(unreachable),
            'start_states': sum(1 for s in self.fsm.states.values() if s.is_start),  # type: ignore
            'end_states': sum(1 for s in self.fsm.states.values() if s.is_end)  # type: ignore
        }
    }
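The reachability check above is a plain breadth-first walk from the start states, followed by a dead-end scan. The same checks over a simple adjacency dict:

```python
from collections import deque

def validate_network(arcs, start_states, end_states):
    """Find unreachable states and dead ends in an adjacency dict {state: [targets]}."""
    reachable = set()
    queue = deque(start_states)
    while queue:
        state = queue.popleft()
        if state in reachable:
            continue
        reachable.add(state)
        queue.extend(arcs.get(state, []))

    all_states = set(arcs)
    unreachable = all_states - reachable
    # Dead end: a non-end state with no outgoing arcs
    dead_ends = {s for s in all_states if s not in end_states and not arcs[s]}
    return {
        "valid": not unreachable and not dead_ends,
        "unreachable": unreachable,
        "dead_ends": dead_ends,
    }

graph = {"start": ["work"], "work": ["end"], "end": [], "orphan": []}
report = validate_network(graph, start_states=["start"], end_states={"end"})
```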
get_history
get_history() -> ExecutionHistory | None

Get execution history if enabled.

Returns:

Type Description
ExecutionHistory | None

Execution history or None

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def get_history(self) -> ExecutionHistory | None:
    """Get execution history if enabled.

    Returns:
        Execution history or None
    """
    return self._history
save_history async
save_history() -> str | None

Save execution history to storage.

Returns:

Type Description
Type Description
str | None

Storage ID for the saved history, or None if no storage is configured or no history exists.

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
async def save_history(self) -> str | None:
    """Save execution history to storage.

    Returns:
        Storage ID for the saved history, or None if no storage
        is configured or no history exists.
    """
    if self._history and self._storage:
        return await self._storage.save_history(self._history)
    return None
load_history async
load_history(history_id: str) -> bool

Load execution history from storage.

Parameters:

Name Type Description Default
history_id str

History identifier

required

Returns:

Type Description
bool

True if loaded successfully

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
async def load_history(self, history_id: str) -> bool:
    """Load execution history from storage.

    Args:
        history_id: History identifier

    Returns:
        True if loaded successfully
    """
    if self._storage:
        history = await self._storage.load_history(history_id)
        if history:
            self._history = history
            return True
    return False
execute_step_sync
execute_step_sync(
    context: ExecutionContext, arc_name: str | None = None
) -> StepResult

Execute a single transition step synchronously.

Parameters:

Name Type Description Default
context ExecutionContext

Execution context

required
arc_name str | None

Optional specific arc to follow

None

Returns:

Type Description
StepResult

StepResult with transition details

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def execute_step_sync(
    self,
    context: ExecutionContext,
    arc_name: str | None = None
) -> StepResult:
    """Execute a single transition step synchronously.

    Args:
        context: Execution context
        arc_name: Optional specific arc to follow

    Returns:
        StepResult with transition details
    """
    start_time = time.time()
    from_state = context.current_state or "initial"
    data_before = context.get_data_snapshot()

    try:
        error_result = self._enter_initial_state_sync(context)
        if error_result is not None:
            return error_result
        from_state = context.current_state or from_state

        transitions = self._get_available_transitions(context, arc_name)
        if not transitions:
            return StepResult(
                from_state=from_state, to_state=from_state,
                transition="none", data_before=data_before,
                data_after=context.get_data_snapshot(),
                duration=time.time() - start_time, success=True,
                is_complete=self._is_at_end_state(context),
            )

        arc = transitions[0]
        success, result = self._execute_arc_transform(arc, context)
        if success:
            context.data = result
        else:
            return StepResult(
                from_state=from_state, to_state=from_state,
                transition=arc.name or "error", data_before=data_before,
                data_after=context.get_data_snapshot(),
                duration=time.time() - start_time,
                success=False, error=result,
            )

        at_breakpoint = self._apply_transition(from_state, arc, context)
        self._engine._execute_state_transforms(context, arc.target_state)
        self._record_transition(from_state, arc, context)
        self._call_hook_sync('on_state_exit', from_state)
        self._call_hook_sync('on_state_enter', arc.target_state)

        return StepResult(
            from_state=from_state, to_state=arc.target_state,
            transition=arc.name or f"{from_state}->{arc.target_state}",
            data_before=data_before,
            data_after=context.get_data_snapshot(),
            duration=time.time() - start_time, success=True,
            at_breakpoint=at_breakpoint,
            is_complete=self._is_at_end_state(context),
        )

    except Exception as e:
        self._call_hook_sync('on_error', e)
        return StepResult(
            from_state=from_state, to_state=from_state,
            transition="error", data_before=data_before,
            data_after=context.get_data_snapshot(),
            duration=time.time() - start_time,
            success=False, error=str(e),
        )
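The termination conditions above (failure, `is_complete`, or a `"none"` transition) define the standard driver loop that `trace_execution_sync` and `run_until_breakpoint_sync` also use. A self-contained sketch of that loop, driven by a scripted stand-in rather than a real `AdvancedFSM` (`ScriptedFSM` and the reduced `StepResult` here are illustrative, not library classes):

```python
from dataclasses import dataclass


@dataclass
class StepResult:
    """Reduced stand-in for the documented StepResult."""
    from_state: str
    to_state: str
    transition: str
    success: bool = True
    is_complete: bool = False


class ScriptedFSM:
    """Stand-in that replays a fixed sequence of step results."""
    def __init__(self, script):
        self._script = iter(script)

    def execute_step_sync(self, context):
        return next(self._script)


def drive(fsm, context, max_steps=1000):
    # Same termination conditions execute_step_sync callers check:
    # step failure, end state reached, or no transition available.
    path = []
    for _ in range(max_steps):
        result = fsm.execute_step_sync(context)
        path.append(result.to_state)
        if not result.success or result.is_complete:
            break
        if result.transition == "none":
            break
    return path


fsm = ScriptedFSM([
    StepResult("start", "validate", "start->validate"),
    StepResult("validate", "done", "validate->done", is_complete=True),
])
assert drive(fsm, context=None) == ["validate", "done"]
```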
execute_step_async async
execute_step_async(
    context: ExecutionContext, arc_name: str | None = None
) -> StepResult

Execute a single transition step asynchronously.

Parameters:

Name Type Description Default
context ExecutionContext

Execution context

required
arc_name str | None

Optional specific arc to follow

None

Returns:

Type Description
StepResult

StepResult with transition details

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
async def execute_step_async(
    self,
    context: ExecutionContext,
    arc_name: str | None = None
) -> StepResult:
    """Execute a single transition step asynchronously.

    Args:
        context: Execution context
        arc_name: Optional specific arc to follow

    Returns:
        StepResult with transition details
    """
    start_time = time.time()
    from_state = context.current_state or "initial"
    data_before = context.get_data_snapshot()

    try:
        error_result = await self._enter_initial_state_async(context)
        if error_result is not None:
            return error_result
        from_state = context.current_state or from_state

        transitions = await self._get_available_transitions_async(context, arc_name)
        if not transitions:
            return StepResult(
                from_state=from_state, to_state=from_state,
                transition="none", data_before=data_before,
                data_after=context.get_data_snapshot(),
                duration=time.time() - start_time, success=True,
                is_complete=self._is_at_end_state(context),
            )

        arc = transitions[0]
        success, result = await self._execute_arc_transform_async(arc, context)
        if success:
            context.data = result
        else:
            return StepResult(
                from_state=from_state, to_state=from_state,
                transition=arc.name or "error", data_before=data_before,
                data_after=context.get_data_snapshot(),
                duration=time.time() - start_time,
                success=False, error=result,
            )

        at_breakpoint = self._apply_transition(from_state, arc, context)
        await self._execute_state_transforms_async(context, arc.target_state)
        self._record_transition(from_state, arc, context)
        await self._call_hook_async('on_state_exit', from_state)
        await self._call_hook_async('on_state_enter', arc.target_state)

        return StepResult(
            from_state=from_state, to_state=arc.target_state,
            transition=arc.name or f"{from_state}->{arc.target_state}",
            data_before=data_before,
            data_after=context.get_data_snapshot(),
            duration=time.time() - start_time, success=True,
            at_breakpoint=at_breakpoint,
            is_complete=self._is_at_end_state(context),
        )

    except Exception as e:
        await self._call_hook_async('on_error', e)
        return StepResult(
            from_state=from_state, to_state=from_state,
            transition="error", data_before=data_before,
            data_after=context.get_data_snapshot(),
            duration=time.time() - start_time,
            success=False, error=str(e),
        )
run_until_breakpoint_sync
run_until_breakpoint_sync(
    context: ExecutionContext, max_steps: int = 1000
) -> StepResult | None

Run execution until a breakpoint is hit (synchronous).

Parameters:

Name Type Description Default
context ExecutionContext

Execution context

required
max_steps int

Maximum steps to execute

1000

Returns:

Type Description
StepResult | None

StepResult where execution stopped, or None if no steps taken

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def run_until_breakpoint_sync(
    self,
    context: ExecutionContext,
    max_steps: int = 1000
) -> StepResult | None:
    """Run execution until a breakpoint is hit (synchronous).

    Args:
        context: Execution context
        max_steps: Maximum steps to execute

    Returns:
        StepResult where execution stopped, or None if no steps taken
    """
    last_result: StepResult | None = None
    data_before = context.get_data_snapshot()
    for _ in range(max_steps):
        if context.current_state in self._breakpoints:
            return StepResult(
                from_state=context.current_state or "unknown",
                to_state=context.current_state or "unknown",
                transition="none",
                data_before=data_before,
                data_after=context.get_data_snapshot(),
                success=True,
                at_breakpoint=True,
                is_complete=self._is_at_end_state(context),
            )

        result = self.execute_step_sync(context)
        last_result = result
        data_before = context.get_data_snapshot()

        if not result.success or result.is_complete or result.transition == "none":
            return result

    return last_result
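Note the ordering: the breakpoint check runs before each step, so execution halts on arrival at a breakpoint state, never after leaving it. A reduced sketch of that control flow (the `MiniContext` stand-in and state names are hypothetical):

```python
class MiniContext:
    """Stand-in context that walks a scripted state sequence."""
    def __init__(self, states):
        self._states = states
        self._pos = 0
        self.current_state = states[0]

    def advance(self):
        self._pos += 1
        self.current_state = self._states[self._pos]


def run_until_breakpoint(ctx, breakpoints, max_steps=100):
    # Breakpoints are checked BEFORE stepping, matching the documented
    # method: a context already sitting on a breakpoint does not move.
    for _ in range(max_steps):
        if ctx.current_state in breakpoints:
            return ctx.current_state
        ctx.advance()
    return None


ctx = MiniContext(["start", "validate", "transform", "done"])
assert run_until_breakpoint(ctx, breakpoints={"transform"}) == "transform"
```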
trace_execution_sync
trace_execution_sync(
    data: dict[str, Any] | Record,
    initial_state: str | None = None,
    max_steps: int = 1000,
) -> list[dict[str, Any]]

Execute with full tracing enabled (synchronous).

Parameters:

Name Type Description Default
data dict[str, Any] | Record

Input data

required
initial_state str | None

Optional starting state

None
max_steps int

Maximum steps to execute

1000

Returns:

Type Description
list[dict[str, Any]]

List of trace entries

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def trace_execution_sync(
    self,
    data: dict[str, Any] | Record,
    initial_state: str | None = None,
    max_steps: int = 1000
) -> list[dict[str, Any]]:
    """Execute with full tracing enabled (synchronous).

    Args:
        data: Input data
        initial_state: Optional starting state
        max_steps: Maximum steps to execute

    Returns:
        List of trace entries
    """
    self.execution_mode = ExecutionMode.TRACE
    self._trace_buffer.clear()

    context = self.create_context(data, initial_state=initial_state)

    for _ in range(max_steps):
        # Execute step (trace recording happens inside execute_step_sync)
        result = self.execute_step_sync(context)

        # Check termination conditions
        if not result.success or result.is_complete:
            break

        if result.from_state == result.to_state and result.transition == "none":
            break

    return self._trace_buffer
profile_execution_sync
profile_execution_sync(
    data: dict[str, Any] | Record,
    initial_state: str | None = None,
    max_steps: int = 1000,
) -> dict[str, Any]

Execute with performance profiling (synchronous).

Parameters:

Name Type Description Default
data dict[str, Any] | Record

Input data

required
initial_state str | None

Optional starting state

None
max_steps int

Maximum steps to execute

1000

Returns:

Type Description
dict[str, Any]

Profiling data

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def profile_execution_sync(
    self,
    data: dict[str, Any] | Record,
    initial_state: str | None = None,
    max_steps: int = 1000
) -> dict[str, Any]:
    """Execute with performance profiling (synchronous).

    Args:
        data: Input data
        initial_state: Optional starting state
        max_steps: Maximum steps to execute

    Returns:
        Profiling data
    """
    self.execution_mode = ExecutionMode.PROFILE
    self._profile_data.clear()

    context = self.create_context(data, initial_state=initial_state)

    start_time = time.time()
    transitions = 0
    state_times = {}
    transition_times = []

    for _ in range(max_steps):
        state_start = time.time()
        current_state = context.current_state

        # Execute step
        result = self.execute_step_sync(context)

        # Record timings
        if current_state:
            if current_state not in state_times:
                state_times[current_state] = []
            state_times[current_state].append(time.time() - state_start)

        if result.success and result.from_state != result.to_state:
            transition_times.append(result.duration)
            transitions += 1

        # Check termination
        if not result.success or result.is_complete:
            break

        if result.from_state == result.to_state and result.transition == "none":
            break

    total_time = time.time() - start_time
    self._profile_data = self._compute_profile_stats(
        total_time, transitions, state_times, context,
        transition_times=transition_times,
    )

    return self._profile_data
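The per-state timing samples collected above are folded into summary statistics by the private `_compute_profile_stats` helper, whose exact output keys are not documented here. A plausible aggregation, sketched under that caveat:

```python
def summarize_state_times(state_times, total_time, transitions):
    """Aggregate per-state timing samples into summary statistics.

    The output shape below is an illustrative guess; the library's
    _compute_profile_stats is private and its exact keys may differ.
    """
    per_state = {
        state: {
            "calls": len(samples),
            "total": sum(samples),
            "avg": sum(samples) / len(samples),
        }
        for state, samples in state_times.items()
        if samples
    }
    return {
        "total_time": total_time,
        "transitions": transitions,
        "states": per_state,
    }


stats = summarize_state_times(
    {"validate": [0.01, 0.03], "transform": [0.02]},
    total_time=0.06,
    transitions=3,
)
assert stats["states"]["validate"]["calls"] == 2
assert abs(stats["states"]["validate"]["avg"] - 0.02) < 1e-9
```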

ExecutionMode

Bases: Enum

Advanced execution modes.

ExecutionHook dataclass

ExecutionHook(
    on_state_enter: Callable | None = None,
    on_state_exit: Callable | None = None,
    on_arc_execute: Callable | None = None,
    on_error: Callable | None = None,
    on_resource_acquire: Callable | None = None,
    on_resource_release: Callable | None = None,
    on_transaction_begin: Callable | None = None,
    on_transaction_commit: Callable | None = None,
    on_transaction_rollback: Callable | None = None,
)

Hook for monitoring execution events.

StepResult dataclass

StepResult(
    from_state: str,
    to_state: str,
    transition: str,
    data_before: dict[str, Any] = dict(),
    data_after: dict[str, Any] = dict(),
    duration: float = 0.0,
    success: bool = True,
    error: str | None = None,
    at_breakpoint: bool = False,
    is_complete: bool = False,
)

Result from a single step execution.

FSMDebugger

FSMDebugger(fsm: AdvancedFSM)

Interactive debugger for FSM execution (fully synchronous).

Initialize debugger.

Parameters:

Name Type Description Default
fsm AdvancedFSM

Advanced FSM instance to debug

required

Methods:

Name Description
start

Start debugging session (synchronous).

step

Execute single step and return detailed result.

continue_to_breakpoint

Continue execution until a breakpoint is hit.

inspect

Inspect data at path.

watch

Add a watch expression.

unwatch

Remove a watch expression.

print_watches

Print all watch values.

print_state

Print current state information.

inspect_current_state

Get detailed information about current state.

get_history

Get recent execution history.

reset

Reset debugger with new data.

Attributes:

Name Type Description
current_state str | None

Get the current state name.

watches dict[str, Any]

Get current watch variable values.

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def __init__(self, fsm: AdvancedFSM):
    """Initialize debugger.

    Args:
        fsm: Advanced FSM instance to debug
    """
    self.fsm = fsm
    self.context: ExecutionContext | None = None
    self.watch_vars: dict[str, Any] = {}
    self.command_history: list[str] = []
    self.step_count: int = 0
    self.execution_history: list[StepResult] = []
Attributes
current_state property
current_state: str | None

Get the current state name.

watches property
watches: dict[str, Any]

Get current watch variable values.

Functions
start
start(data: dict[str, Any] | Record, initial_state: str | None = None) -> None

Start debugging session (synchronous).

Parameters:

Name Type Description Default
data dict[str, Any] | Record

Initial data

required
initial_state str | None

Optional starting state

None
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def start(
    self,
    data: dict[str, Any] | Record,
    initial_state: str | None = None
) -> None:
    """Start debugging session (synchronous).

    Args:
        data: Initial data
        initial_state: Optional starting state
    """
    self.context = self.fsm.create_context(data, initial_state=initial_state)
    self.step_count = 0
    self.execution_history.clear()

    print(f"Debugger started at state: {self.context.current_state or 'initial'}")
    print(f"Data: {self.context.get_data_snapshot()}")
step
step() -> StepResult

Execute single step and return detailed result.

Returns:

Type Description
StepResult

StepResult with transition details

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def step(self) -> StepResult:
    """Execute single step and return detailed result.

    Returns:
        StepResult with transition details
    """
    if not self.context:
        print("No active debugging session. Call start() first.")
        return StepResult(
            from_state="none",
            to_state="none",
            transition="error",
            success=False,
            error="No active debugging session"
        )

    result = self.fsm.execute_step_sync(self.context)
    self.step_count += 1
    self.execution_history.append(result)

    # Print step information
    if result.success:
        if result.from_state == result.to_state and result.transition == "none":
            print(f"Step {self.step_count}: No transition available from '{result.from_state}'")
        else:
            print(f"Step {self.step_count}: {result.from_state} -> {result.to_state} via '{result.transition}'")

        if result.at_breakpoint:
            print("*** Hit breakpoint ***")

        if result.is_complete:
            print("*** Reached end state ***")
    else:
        print(f"Step {self.step_count}: Error - {result.error}")

    # Check watches
    self._check_watches()

    return result
continue_to_breakpoint
continue_to_breakpoint(max_steps: int = 1000) -> StepResult | None

Continue execution until a breakpoint is hit.

Steps are recorded in execution_history and counted in step_count, keeping the debugger state consistent with the actual execution path.

Parameters:

Name Type Description Default
max_steps int

Safety limit to prevent infinite loops.

1000

Returns:

Type Description
StepResult | None

StepResult where execution stopped, or None if no session

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def continue_to_breakpoint(self, max_steps: int = 1000) -> StepResult | None:
    """Continue execution until a breakpoint is hit.

    Steps are recorded in ``execution_history`` and counted in
    ``step_count``, keeping the debugger state consistent with
    the actual execution path.

    Args:
        max_steps: Safety limit to prevent infinite loops.

    Returns:
        StepResult where execution stopped, or None if no session
    """
    if not self.context:
        print("No active debugging session")
        return None

    print(f"Continuing from state: {self.context.current_state}")
    last_result: StepResult | None = None
    for _ in range(max_steps):
        if self.context.current_state in self.fsm._breakpoints:
            print(f"Stopped at: {self.context.current_state}")
            print("*** At breakpoint ***")
            if last_result is not None:
                return last_result
            # Already at breakpoint before any step — synthesize result
            return StepResult(
                from_state=self.context.current_state or "unknown",
                to_state=self.context.current_state or "unknown",
                transition="none",
                data_before=self.context.get_data_snapshot(),
                data_after=self.context.get_data_snapshot(),
                success=True,
                at_breakpoint=True,
                is_complete=self.fsm._is_at_end_state(self.context),
            )

        result = self.step()
        last_result = result

        if not result.success or result.is_complete or result.transition == "none":
            return result

    return last_result
inspect
inspect(path: str = '') -> Any

Inspect data at path.

Parameters:

Name Type Description Default
path str

Dot-separated path to data field (empty for all data)

''

Returns:

Type Description
Any

Value at path

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def inspect(self, path: str = "") -> Any:
    """Inspect data at path.

    Args:
        path: Dot-separated path to data field (empty for all data)

    Returns:
        Value at path
    """
    if not self.context:
        print("No active debugging session")
        return None

    data = self.context.data

    if not path:
        return data

    # Navigate path
    for key in path.split('.'):
        if isinstance(data, dict):
            data = data.get(key)
        elif hasattr(data, key):
            data = getattr(data, key)
        else:
            return None
    return data
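The navigation rules (dict keys first, then attributes, `None` on a missing segment) can be exercised in isolation. `inspect_path` below replicates the documented logic outside the debugger:

```python
def inspect_path(data, path=""):
    """Replicates FSMDebugger.inspect's dot-path navigation:
    dict keys first, then attribute lookup, None when a
    segment cannot be resolved."""
    if not path:
        return data
    for key in path.split("."):
        if isinstance(data, dict):
            data = data.get(key)
        elif hasattr(data, key):
            data = getattr(data, key)
        else:
            return None
    return data


record = {"user": {"name": "ada", "roles": ["admin"]}}
assert inspect_path(record, "user.name") == "ada"
assert inspect_path(record, "user.missing") is None
assert inspect_path(record) == record
```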
watch
watch(name: str, path: str) -> None

Add a watch expression.

Parameters:

Name Type Description Default
name str

Watch name

required
path str

Data path to watch

required
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def watch(self, name: str, path: str) -> None:
    """Add a watch expression.

    Args:
        name: Watch name
        path: Data path to watch
    """
    self.watch_vars[name] = path
    value = self.inspect(path)
    print(f"Watch '{name}' added: {path} = {value}")
unwatch
unwatch(name: str) -> None

Remove a watch expression.

Parameters:

Name Type Description Default
name str

Watch name to remove

required
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def unwatch(self, name: str) -> None:
    """Remove a watch expression.

    Args:
        name: Watch name to remove
    """
    if name in self.watch_vars:
        del self.watch_vars[name]
        print(f"Watch '{name}' removed")
print_watches
print_watches() -> None

Print all watch values.

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def print_watches(self) -> None:
    """Print all watch values."""
    if not self.watch_vars:
        print("No watches set")
        return

    for name, path in self.watch_vars.items():
        value = self.inspect(path)
        print(f"{name}: {path} = {value}")
print_state
print_state() -> None

Print current state information.

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def print_state(self) -> None:
    """Print current state information."""
    if not self.context:
        print("No active debugging session")
        return

    print("\n=== State Information ===")
    print(f"Current State: {self.context.current_state}")
    print(f"Previous State: {self.context.previous_state}")
    print(f"Is Complete: {self.context.is_complete()}")
    print("\nData:")
    data = self.context.get_data_snapshot()
    for key, value in data.items():
        print(f"  {key}: {value}")

    # Print available transitions
    transitions = self.fsm._get_available_transitions(self.context)
    if transitions:
        print("\nAvailable Transitions:")
        for arc in transitions:
            print(f"  - {arc.name or 'unnamed'} -> {arc.target_state}")
    else:
        if self.context.is_complete():
            print("\nNo transitions (end state)")
        else:
            print("\nNo available transitions")
inspect_current_state
inspect_current_state() -> dict[str, Any]

Get detailed information about current state.

Returns:

Type Description
dict[str, Any]

Dictionary with state details

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def inspect_current_state(self) -> dict[str, Any]:
    """Get detailed information about current state.

    Returns:
        Dictionary with state details
    """
    if not self.context:
        return {"error": "No active debugging session"}

    return {
        'state': self.context.current_state,
        'previous_state': self.context.previous_state,
        'data': self.context.get_data_snapshot(),
        'is_complete': self.context.is_complete(),
        'step_count': self.step_count,
        'at_breakpoint': self.context.current_state in self.fsm._breakpoints,
        'available_transitions': [
            {'name': arc.name, 'target': arc.target_state}
            for arc in self.fsm._get_available_transitions(self.context)
        ]
    }
get_history
get_history(limit: int = 10) -> list[StepResult]

Get recent execution history.

Parameters:

Name Type Description Default
limit int

Maximum number of steps to return

10

Returns:

Type Description
list[StepResult]

List of recent step results

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def get_history(self, limit: int = 10) -> list[StepResult]:
    """Get recent execution history.

    Args:
        limit: Maximum number of steps to return

    Returns:
        List of recent step results
    """
    return self.execution_history[-limit:]
reset
reset(data: dict[str, Any] | Record | None = None) -> None

Reset debugger with new data.

Parameters:

Name Type Description Default
data dict[str, Any] | Record | None

New data (uses current data if None)

None
Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def reset(self, data: dict[str, Any] | Record | None = None) -> None:
    """Reset debugger with new data.

    Args:
        data: New data (uses current data if None)
    """
    if data is None and self.context:
        data = self.context.data

    if data is None:
        print("No data available for reset")
        return

    self.start(data)

ExecutionContext

ExecutionContext(
    data_mode: ProcessingMode = ProcessingMode.SINGLE,
    transaction_mode: TransactionMode = TransactionMode.NONE,
    resources: Dict[str, Any] | None = None,
    database: Union[SyncDatabase, AsyncDatabase] | None = None,
    stream_context: StreamContext | None = None,
)

Execution context for FSM processing with full mode support.

This context manages:

- Data mode configuration (single, batch, stream)
- Transaction management
- Resource allocation and tracking
- Stream coordination
- State tracking and network stack
- Parallel execution paths

Initialize execution context.

Parameters:

Name Type Description Default
data_mode ProcessingMode

Data processing mode.

SINGLE
transaction_mode TransactionMode

Transaction handling mode.

NONE
resources Dict[str, Any] | None

Initial resource configurations.

None
database Union[SyncDatabase, AsyncDatabase] | None

Database connection for transactions.

None
stream_context StreamContext | None

Stream context for stream mode.

None

Methods:

Name Description
push_network

Push a network onto the execution stack.

pop_network

Pop a network from the execution stack.

set_state

Set the current state.

allocate_resource

Allocate a resource.

release_resource

Release an allocated resource.

start_transaction

Start a new transaction.

commit_transaction

Commit the current transaction.

rollback_transaction

Roll back the current transaction.

log_operation

Log an operation in the current transaction.

set_stream_chunk

Set the current stream chunk for processing.

add_batch_item

Add an item to the batch.

add_batch_result

Add a result to batch results.

add_batch_error

Add an error to batch errors.

create_child_context

Create a child context for parallel execution.

merge_child_context

Merge a child context back into parent.

get_resource_usage

Get current resource usage statistics.

get_performance_stats

Get performance statistics.

get_complete_path

Get the complete state traversal path including current state.

clone

Create a clone of this context.

is_complete

Check if the FSM execution has reached an end state.

get_current_state

Get the name of the current state.

get_data_snapshot

Get a snapshot of the current data.

get_execution_stats

Get execution statistics.

get_current_state_instance

Get the current state instance object.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def __init__(
    self,
    data_mode: ProcessingMode = ProcessingMode.SINGLE,
    transaction_mode: TransactionMode = TransactionMode.NONE,
    resources: Dict[str, Any] | None = None,
    database: Union[SyncDatabase, AsyncDatabase] | None = None,
    stream_context: StreamContext | None = None
):
    """Initialize execution context.

    Args:
        data_mode: Data processing mode.
        transaction_mode: Transaction handling mode.
        resources: Initial resource configurations.
        database: Database connection for transactions.
        stream_context: Stream context for stream mode.
    """
    # Mode configuration
    self.data_mode = data_mode
    self.transaction_mode = transaction_mode

    # State tracking
    self.current_state: str | None = None
    self.previous_state: str | None = None
    self.network_stack: List[Tuple[str, str | None]] = []
    self.state_history: List[str] = []

    # Data management
    self.data: Any = None
    self.metadata: Dict[str, Any] = {}
    self.variables: Dict[str, Any] = {}

    # Resource management
    self.resources: Dict[str, ResourceAllocation] = {}
    self.resource_limits: Dict[str, Any] = resources or {}
    self.resource_manager: Any = None  # ResourceManager instance
    self.current_state_resources: Dict[str, Any] = {}  # Resources allocated to current state
    self.parent_state_resources: Dict[str, Any] = {}  # Resources from parent state (in subnetworks)

    # Transaction management
    self.database = database
    self.current_transaction: TransactionInfo | None = None
    self.transaction_history: List[TransactionInfo] = []

    # Stream coordination
    self.stream_context = stream_context
    self.current_chunk: StreamChunk | None = None
    self.processed_chunks: int = 0

    # Batch processing
    self.batch_data: List[Any] = []
    self.batch_results: List[Any] = []
    self.batch_errors: List[Tuple[int, Exception]] = []

    # Parallel execution
    self.parallel_paths: Dict[str, ExecutionContext] = {}
    self.is_child_context: bool = False
    self.parent_context: ExecutionContext | None = None

    # Performance tracking
    self.start_time: float = time.time()
    self.state_timings: Dict[str, float] = {}
    self.function_call_count: Dict[str, int] = {}

    # State instance tracking for debugging
    self.current_state_instance: Any = None

    # Context factory — optional callable that transforms FunctionContext
    # into an application-specific context object for transform functions.
    # When set, _create_function_context() calls this with the built
    # FunctionContext and returns the factory's result instead.
    self.transform_context_factory: Callable[..., Any] | None = None

    # Idempotency flag — tracks whether initial-state transforms have
    # already been executed, preventing duplicate execution when the
    # AdvancedFSM sync/async entry points call _execute_state_transforms
    # directly (bypassing the engine's own enter_state logic).
    self._initial_transforms_executed: bool = False
Functions
push_network
push_network(network_name: str, return_state: str | None = None) -> None

Push a network onto the execution stack.

Parameters:

Name Type Description Default
network_name str

Name of network to push.

required
return_state str | None

State to return to after network completes.

None
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def push_network(self, network_name: str, return_state: str | None = None) -> None:
    """Push a network onto the execution stack.

    Args:
        network_name: Name of network to push.
        return_state: State to return to after network completes.
    """
    self.network_stack.append((network_name, return_state))
pop_network
pop_network() -> Tuple[str, str | None]

Pop a network from the execution stack.

Returns:

Type Description
Tuple[str, str | None]

Tuple of (network_name, return_state).

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def pop_network(self) -> Tuple[str, str | None]:
    """Pop a network from the execution stack.

    Returns:
        Tuple of (network_name, return_state).
    """
    if self.network_stack:
        return self.network_stack.pop()
    return ("", None)
set_state
set_state(state_name: str) -> None

Set the current state.

Parameters:

Name Type Description Default
state_name str

Name of the new state.

required
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def set_state(self, state_name: str) -> None:
    """Set the current state.

    Args:
        state_name: Name of the new state.
    """
    if self.current_state:
        self.previous_state = self.current_state
        self.state_history.append(self.current_state)
    self.current_state = state_name
    self.state_timings[state_name] = time.time()
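A subtlety worth noting: the current state is appended to `state_history` only when it is left, so the history never contains the state the context currently occupies. A stand-in replicating that bookkeeping:

```python
class StateTracker:
    """Stand-in replicating set_state's bookkeeping: a state is
    appended to history only once it has been exited."""

    def __init__(self):
        self.current_state = None
        self.previous_state = None
        self.state_history = []

    def set_state(self, state_name):
        if self.current_state:
            self.previous_state = self.current_state
            self.state_history.append(self.current_state)
        self.current_state = state_name


t = StateTracker()
for state in ["start", "validate", "done"]:
    t.set_state(state)
assert t.current_state == "done"
assert t.previous_state == "validate"
assert t.state_history == ["start", "validate"]  # "done" not yet exited
```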
allocate_resource
allocate_resource(
    resource_type: str, resource_id: str, metadata: Dict[str, Any] | None = None
) -> bool

Allocate a resource.

Parameters:

Name Type Description Default
resource_type str

Type of resource.

required
resource_id str

Unique resource identifier.

required
metadata Dict[str, Any] | None

Optional resource metadata.

None

Returns:

Type Description
bool

True if allocation successful.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def allocate_resource(
    self,
    resource_type: str,
    resource_id: str,
    metadata: Dict[str, Any] | None = None
) -> bool:
    """Allocate a resource.

    Args:
        resource_type: Type of resource.
        resource_id: Unique resource identifier.
        metadata: Optional resource metadata.

    Returns:
        True if allocation successful.
    """
    key = f"{resource_type}:{resource_id}"

    if key in self.resources:
        if self.resources[key].status != ResourceStatus.AVAILABLE:
            return False

    self.resources[key] = ResourceAllocation(
        resource_type=resource_type,
        resource_id=resource_id,
        status=ResourceStatus.ALLOCATED,
        allocated_at=time.time(),
        metadata=metadata or {}
    )
    return True
release_resource
release_resource(resource_type: str, resource_id: str) -> bool

Release an allocated resource.

Parameters:

Name Type Description Default
resource_type str

Type of resource.

required
resource_id str

Resource identifier.

required

Returns:

Type Description
bool

True if release successful.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def release_resource(self, resource_type: str, resource_id: str) -> bool:
    """Release an allocated resource.

    Args:
        resource_type: Type of resource.
        resource_id: Resource identifier.

    Returns:
        True if release successful.
    """
    key = f"{resource_type}:{resource_id}"

    if key in self.resources:
        self.resources[key].status = ResourceStatus.AVAILABLE
        return True
    return False
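Resources are keyed by a `"type:id"` string; an allocation is refused while an existing entry is not `AVAILABLE`, and releasing flips the status back so the same key can be allocated again. The sketch below mirrors that lifecycle with simplified stand-ins for `ResourceStatus` and `ResourceAllocation` (the library's real classes may carry more fields):

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional

class ResourceStatus(Enum):
    AVAILABLE = "available"
    ALLOCATED = "allocated"

@dataclass
class ResourceAllocation:
    resource_type: str
    resource_id: str
    status: ResourceStatus
    allocated_at: float
    metadata: Dict[str, Any] = field(default_factory=dict)

class ResourceBook:
    """Minimal sketch of the "type:id"-keyed allocation table."""

    def __init__(self) -> None:
        self.resources: Dict[str, ResourceAllocation] = {}

    def allocate_resource(self, resource_type: str, resource_id: str,
                          metadata: Optional[Dict[str, Any]] = None) -> bool:
        key = f"{resource_type}:{resource_id}"
        # Refuse re-allocation while the resource is not AVAILABLE.
        if key in self.resources and self.resources[key].status != ResourceStatus.AVAILABLE:
            return False
        self.resources[key] = ResourceAllocation(
            resource_type, resource_id, ResourceStatus.ALLOCATED, time.time(), metadata or {})
        return True

    def release_resource(self, resource_type: str, resource_id: str) -> bool:
        key = f"{resource_type}:{resource_id}"
        if key in self.resources:
            self.resources[key].status = ResourceStatus.AVAILABLE
            return True
        return False

book = ResourceBook()
assert book.allocate_resource("db", "primary")       # first allocation succeeds
assert not book.allocate_resource("db", "primary")   # double allocation is refused
assert book.release_resource("db", "primary")
assert book.allocate_resource("db", "primary")       # available again after release
```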
start_transaction
start_transaction(transaction_id: str | None = None) -> bool

Start a new transaction.

Parameters:

Name Type Description Default
transaction_id str | None

Optional transaction ID.

None

Returns:

Type Description
bool

True if transaction started.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def start_transaction(self, transaction_id: str | None = None) -> bool:
    """Start a new transaction.

    Args:
        transaction_id: Optional transaction ID.

    Returns:
        True if transaction started.
    """
    if self.transaction_mode == TransactionMode.NONE:
        return False

    if self.current_transaction and not self.current_transaction.is_committed:
        return False

    self.current_transaction = TransactionInfo(
        transaction_id=transaction_id or str(time.time()),
        mode=self.transaction_mode,
        started_at=time.time()
    )

    # Start database transaction if available
    if self.database and hasattr(self.database, 'begin_transaction'):
        self.database.begin_transaction()

    return True
commit_transaction
commit_transaction() -> bool

Commit the current transaction.

Returns:

Type Description
bool

True if commit successful.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def commit_transaction(self) -> bool:
    """Commit the current transaction.

    Returns:
        True if commit successful.
    """
    if not self.current_transaction:
        return False

    # Commit database transaction
    if self.database and hasattr(self.database, 'commit'):
        self.database.commit()

    self.current_transaction.is_committed = True
    self.transaction_history.append(self.current_transaction)
    self.current_transaction = None

    return True
rollback_transaction
rollback_transaction() -> bool

Roll back the current transaction.

Returns:

Type Description
bool

True if rollback successful.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def rollback_transaction(self) -> bool:
    """Rollback the current transaction.

    Returns:
        True if rollback successful.
    """
    if not self.current_transaction:
        return False

    # Rollback database transaction
    if self.database and hasattr(self.database, 'rollback'):
        self.database.rollback()

    self.current_transaction.is_rolled_back = True
    self.transaction_history.append(self.current_transaction)
    self.current_transaction = None

    return True
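`start_transaction`, `commit_transaction`, and `rollback_transaction` form a small state machine: starting fails while an uncommitted transaction is open, and both commit and rollback archive the transaction to the history and clear the current slot. A standalone sketch of that lifecycle, with the database hooks omitted:

```python
import time
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

class TransactionMode(Enum):
    NONE = "none"
    FULL = "full"

@dataclass
class TransactionInfo:
    transaction_id: str
    mode: TransactionMode
    started_at: float
    is_committed: bool = False
    is_rolled_back: bool = False

class TxnContext:
    """Minimal sketch of the start/commit/rollback lifecycle (no database hooks)."""

    def __init__(self, mode: TransactionMode = TransactionMode.FULL) -> None:
        self.transaction_mode = mode
        self.current_transaction: Optional[TransactionInfo] = None
        self.transaction_history: List[TransactionInfo] = []

    def start_transaction(self, transaction_id: Optional[str] = None) -> bool:
        if self.transaction_mode == TransactionMode.NONE:
            return False
        # An open, uncommitted transaction blocks starting a new one.
        if self.current_transaction and not self.current_transaction.is_committed:
            return False
        self.current_transaction = TransactionInfo(
            transaction_id or str(time.time()), self.transaction_mode, time.time())
        return True

    def commit_transaction(self) -> bool:
        if not self.current_transaction:
            return False
        self.current_transaction.is_committed = True
        self.transaction_history.append(self.current_transaction)
        self.current_transaction = None
        return True

    def rollback_transaction(self) -> bool:
        if not self.current_transaction:
            return False
        self.current_transaction.is_rolled_back = True
        self.transaction_history.append(self.current_transaction)
        self.current_transaction = None
        return True

ctx = TxnContext()
assert ctx.start_transaction("txn-1")
assert not ctx.start_transaction("txn-2")   # txn-1 is still open
assert ctx.commit_transaction()
assert ctx.start_transaction("txn-2")
assert ctx.rollback_transaction()
```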
log_operation
log_operation(operation: str, details: Dict[str, Any]) -> None

Log an operation in the current transaction.

Parameters:

Name Type Description Default
operation str

Operation name.

required
details Dict[str, Any]

Operation details.

required
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def log_operation(self, operation: str, details: Dict[str, Any]) -> None:
    """Log an operation in the current transaction.

    Args:
        operation: Operation name.
        details: Operation details.
    """
    if self.current_transaction:
        self.current_transaction.operations.append({
            'operation': operation,
            'details': details,
            'timestamp': time.time()
        })
set_stream_chunk
set_stream_chunk(chunk: StreamChunk) -> None

Set the current stream chunk for processing.

Parameters:

Name Type Description Default
chunk StreamChunk

Stream chunk to process.

required
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def set_stream_chunk(self, chunk: StreamChunk) -> None:
    """Set the current stream chunk for processing.

    Args:
        chunk: Stream chunk to process.
    """
    self.current_chunk = chunk
    self.processed_chunks += 1
add_batch_item
add_batch_item(item: Any) -> None

Add an item to the batch.

Parameters:

Name Type Description Default
item Any

Item to add to batch.

required
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def add_batch_item(self, item: Any) -> None:
    """Add an item to the batch.

    Args:
        item: Item to add to batch.
    """
    if self.data_mode == ProcessingMode.BATCH:
        self.batch_data.append(item)
add_batch_result
add_batch_result(result: Any) -> None

Add a result to batch results.

Parameters:

Name Type Description Default
result Any

Processing result.

required
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def add_batch_result(self, result: Any) -> None:
    """Add a result to batch results.

    Args:
        result: Processing result.
    """
    if self.data_mode == ProcessingMode.BATCH:
        self.batch_results.append(result)
add_batch_error
add_batch_error(index: int, error: Exception) -> None

Add an error to batch errors.

Parameters:

Name Type Description Default
index int

Batch item index.

required
error Exception

Error that occurred.

required
Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def add_batch_error(self, index: int, error: Exception) -> None:
    """Add an error to batch errors.

    Args:
        index: Batch item index.
        error: Error that occurred.
    """
    if self.data_mode == ProcessingMode.BATCH:
        self.batch_errors.append((index, error))
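All three batch accumulators are guarded by the data mode: outside `ProcessingMode.BATCH` they are silent no-ops, and errors are stored as `(index, exception)` pairs so failed items can be traced back to their position. A minimal standalone sketch of that pattern:

```python
from enum import Enum
from typing import Any, List, Tuple

class ProcessingMode(Enum):
    SINGLE = "single"
    BATCH = "batch"

class BatchContext:
    """Minimal sketch: batch accumulators are no-ops outside BATCH mode."""

    def __init__(self, data_mode: ProcessingMode) -> None:
        self.data_mode = data_mode
        self.batch_data: List[Any] = []
        self.batch_results: List[Any] = []
        self.batch_errors: List[Tuple[int, Exception]] = []

    def add_batch_item(self, item: Any) -> None:
        if self.data_mode == ProcessingMode.BATCH:
            self.batch_data.append(item)

    def add_batch_result(self, result: Any) -> None:
        if self.data_mode == ProcessingMode.BATCH:
            self.batch_results.append(result)

    def add_batch_error(self, index: int, error: Exception) -> None:
        if self.data_mode == ProcessingMode.BATCH:
            self.batch_errors.append((index, error))

batch = BatchContext(ProcessingMode.BATCH)
for i, item in enumerate([1, 2, "bad"]):
    batch.add_batch_item(item)
    try:
        batch.add_batch_result(item + 1)   # fails for the string item
    except TypeError as exc:
        batch.add_batch_error(i, exc)

single = BatchContext(ProcessingMode.SINGLE)
single.add_batch_item(1)   # silently ignored outside BATCH mode
```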
create_child_context
create_child_context(path_id: str) -> ExecutionContext

Create a child context for parallel execution.

Parameters:

Name Type Description Default
path_id str

Unique identifier for the execution path.

required

Returns:

Type Description
ExecutionContext

New child execution context.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def create_child_context(self, path_id: str) -> 'ExecutionContext':
    """Create a child context for parallel execution.

    Args:
        path_id: Unique identifier for the execution path.

    Returns:
        New child execution context.
    """
    child = ExecutionContext(
        data_mode=self.data_mode,
        transaction_mode=self.transaction_mode,
        resources=self.resource_limits.copy(),
        database=self.database,
        stream_context=self.stream_context
    )

    child.is_child_context = True
    child.parent_context = self
    child.variables = self.variables.copy()

    self.parallel_paths[path_id] = child
    return child
merge_child_context
merge_child_context(path_id: str) -> bool

Merge a child context back into parent.

Parameters:

Name Type Description Default
path_id str

Path identifier to merge.

required

Returns:

Type Description
bool

True if merge successful.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def merge_child_context(self, path_id: str) -> bool:
    """Merge a child context back into parent.

    Args:
        path_id: Path identifier to merge.

    Returns:
        True if merge successful.
    """
    if path_id not in self.parallel_paths:
        return False

    child = self.parallel_paths[path_id]

    # Merge results
    if self.data_mode == ProcessingMode.BATCH:
        self.batch_results.extend(child.batch_results)
        self.batch_errors.extend(child.batch_errors)

    # Merge metadata
    self.metadata.update(child.metadata)

    # Update function call counts
    for func, count in child.function_call_count.items():
        self.function_call_count[func] = self.function_call_count.get(func, 0) + count

    del self.parallel_paths[path_id]
    return True
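The fork/merge pair gives each parallel path its own context: children start with a copy of the parent's variables (so mutations stay isolated), and on merge the parent absorbs batch results, metadata, and function-call counts, then drops the path. A trimmed-down standalone sketch of that flow:

```python
from typing import Any, Dict, List

class ParallelContext:
    """Minimal sketch of create_child_context / merge_child_context."""

    def __init__(self) -> None:
        self.variables: Dict[str, Any] = {}
        self.metadata: Dict[str, Any] = {}
        self.batch_results: List[Any] = []
        self.function_call_count: Dict[str, int] = {}
        self.parallel_paths: Dict[str, "ParallelContext"] = {}

    def create_child_context(self, path_id: str) -> "ParallelContext":
        child = ParallelContext()
        child.variables = self.variables.copy()   # a copy, so children cannot mutate the parent
        self.parallel_paths[path_id] = child
        return child

    def merge_child_context(self, path_id: str) -> bool:
        if path_id not in self.parallel_paths:
            return False
        child = self.parallel_paths[path_id]
        self.batch_results.extend(child.batch_results)
        self.metadata.update(child.metadata)
        # Function-call counts are summed, not replaced.
        for func, count in child.function_call_count.items():
            self.function_call_count[func] = self.function_call_count.get(func, 0) + count
        del self.parallel_paths[path_id]
        return True

parent = ParallelContext()
parent.function_call_count["validate"] = 1
child = parent.create_child_context("path-a")
child.batch_results.append("A")
child.function_call_count["validate"] = 2
parent.merge_child_context("path-a")
# parent absorbs the results; "validate" count becomes 3; path-a is removed
```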
get_resource_usage
get_resource_usage() -> Dict[str, Any]

Get current resource usage statistics.

Returns:

Type Description
Dict[str, Any]

Resource usage information.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def get_resource_usage(self) -> Dict[str, Any]:
    """Get current resource usage statistics.

    Returns:
        Resource usage information.
    """
    allocated = sum(1 for r in self.resources.values() 
                   if r.status == ResourceStatus.ALLOCATED)
    busy = sum(1 for r in self.resources.values() 
              if r.status == ResourceStatus.BUSY)

    return {
        'total_resources': len(self.resources),
        'allocated': allocated,
        'busy': busy,
        'available': len(self.resources) - allocated - busy,
        'by_type': self._group_resources_by_type()
    }
get_performance_stats
get_performance_stats() -> Dict[str, Any]

Get performance statistics.

Returns:

Type Description
Dict[str, Any]

Performance statistics.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def get_performance_stats(self) -> Dict[str, Any]:
    """Get performance statistics.

    Returns:
        Performance statistics.
    """
    elapsed_time = time.time() - self.start_time

    return {
        'elapsed_time': elapsed_time,
        'states_visited': len(self.state_history),
        'current_state': self.current_state,
        'transactions': len(self.transaction_history),
        'chunks_processed': self.processed_chunks,
        'batch_items': len(self.batch_data),
        'batch_results': len(self.batch_results),
        'batch_errors': len(self.batch_errors),
        'function_calls': dict(self.function_call_count),
        'parallel_paths': len(self.parallel_paths)
    }
get_complete_path
get_complete_path() -> List[str]

Get the complete state traversal path including current state.

Returns:

Type Description
List[str]

List of state names in traversal order.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def get_complete_path(self) -> List[str]:
    """Get the complete state traversal path including current state.

    Returns:
        List of state names in traversal order.
    """
    path = self.state_history.copy() if self.state_history else []

    # Add current state if not already in path and if it exists
    if self.current_state and (not path or path[-1] != self.current_state):
        path.append(self.current_state)

    return path
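The path is the recorded history plus the current state, appended only when it does not repeat the final history entry (an earlier, non-final visit to the same state is appended again, which preserves loops). The logic can be sketched as a standalone function:

```python
from typing import List, Optional

def complete_path(state_history: List[str], current_state: Optional[str]) -> List[str]:
    """Sketch of get_complete_path: history plus current state,
    deduplicating only against the final history entry."""
    path = state_history.copy() if state_history else []
    if current_state and (not path or path[-1] != current_state):
        path.append(current_state)
    return path

assert complete_path(["start", "process"], "done") == ["start", "process", "done"]
assert complete_path(["start", "done"], "done") == ["start", "done"]  # no duplicate tail
assert complete_path([], None) == []
```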
clone
clone() -> ExecutionContext

Create a clone of this context.

Returns:

Type Description
ExecutionContext

Cloned execution context.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def clone(self) -> 'ExecutionContext':
    """Create a clone of this context.

    Returns:
        Cloned execution context.
    """
    clone = ExecutionContext(
        data_mode=self.data_mode,
        transaction_mode=self.transaction_mode,
        resources=self.resource_limits.copy(),
        database=self.database,
        stream_context=self.stream_context
    )

    clone.current_state = self.current_state
    clone.previous_state = self.previous_state
    clone.state_history = self.state_history.copy()
    clone.data = self.data
    clone.metadata = self.metadata.copy()
    clone.variables = self.variables.copy()
    clone._initial_transforms_executed = self._initial_transforms_executed

    return clone
is_complete
is_complete() -> bool

Check if the FSM execution has reached an end state.

Returns:

Type Description
bool

True if in an end state or no current state, False otherwise.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def is_complete(self) -> bool:
    """Check if the FSM execution has reached an end state.

    Returns:
        True if in an end state or no current state, False otherwise.
    """
    if not self.current_state:
        return True

    # Check if current state is marked as ended
    return self.metadata.get('is_end_state', False)
get_current_state
get_current_state() -> str | None

Get the name of the current state.

Returns:

Type Description
str | None

Current state name or None if not set.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def get_current_state(self) -> str | None:
    """Get the name of the current state.

    Returns:
        Current state name or None if not set.
    """
    return self.current_state
get_data_snapshot
get_data_snapshot() -> Dict[str, Any]

Get a snapshot of the current data.

Returns:

Type Description
Dict[str, Any]

Copy of the current data dictionary.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def get_data_snapshot(self) -> Dict[str, Any]:
    """Get a snapshot of the current data.

    Returns:
        Copy of the current data dictionary.
    """
    if isinstance(self.data, dict):
        return self.data.copy()
    elif hasattr(self.data, '__dict__'):
        return vars(self.data).copy()
    else:
        return {'value': self.data}
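`get_data_snapshot` normalizes three cases into a dictionary copy: dicts are copied directly, objects expose their `__dict__`, and bare scalars are wrapped under a `'value'` key. Because a copy is returned, mutating the snapshot never touches the live data. The branching can be sketched standalone:

```python
from typing import Any, Dict

def data_snapshot(data: Any) -> Dict[str, Any]:
    """Sketch of get_data_snapshot's three cases: dict, object with __dict__, scalar."""
    if isinstance(data, dict):
        return data.copy()
    elif hasattr(data, "__dict__"):
        return vars(data).copy()
    else:
        return {"value": data}

class Record:
    def __init__(self) -> None:
        self.count = 3

assert data_snapshot({"a": 1}) == {"a": 1}
assert data_snapshot(Record()) == {"count": 3}
assert data_snapshot(42) == {"value": 42}
```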
get_execution_stats
get_execution_stats() -> Dict[str, Any]

Get execution statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with execution metrics.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def get_execution_stats(self) -> Dict[str, Any]:
    """Get execution statistics.

    Returns:
        Dictionary with execution metrics.
    """
    return {
        'states_visited': len(self.state_history),
        'current_state': self.current_state,
        'previous_state': self.previous_state,
        'transition_count': self.transition_count,
        'execution_id': self.execution_id,
        'data_mode': self.data_mode.value if self.data_mode else None,
        'transaction_mode': self.transaction_mode.value if self.transaction_mode else None
    }
get_current_state_instance
get_current_state_instance() -> Any

Get the current state instance object.

Returns:

Type Description
Any

The StateInstance object for the current state, or None if not set.

Source code in packages/fsm/src/dataknobs_fsm/execution/context.py
def get_current_state_instance(self) -> Any:
    """Get the current state instance object.

    Returns:
        The StateInstance object for the current state, or None if not set.
    """
    return self.current_state_instance

ConfigLoader

ConfigLoader(use_dataknobs_config: bool = False)

Load and process FSM configurations from various sources.

Initialize the ConfigLoader.

Parameters:

Name Type Description Default
use_dataknobs_config bool

Whether to use dataknobs_config for advanced features.

False

Methods:

Name Description
add_registered_function

Add a function name to the set of registered functions.

load_from_file

Load configuration from a file.

load_from_dict

Load configuration from a dictionary.

load_from_template

Load configuration from a template.

load_template_config

Load configuration from a template configuration object.

validate_file

Validate a configuration file without fully loading it.

merge_configs

Merge multiple FSM configurations.

Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
def __init__(self, use_dataknobs_config: bool = False):
    """Initialize the ConfigLoader.

    Args:
        use_dataknobs_config: Whether to use dataknobs_config for advanced features.
    """
    self.use_dataknobs_config = use_dataknobs_config
    self._env_prefix = "FSM_"
    self._included_configs: Dict[str, Dict[str, Any]] = {}
    self._registered_functions: Set[str] = set()
Functions
add_registered_function
add_registered_function(name: str) -> None

Add a function name to the set of registered functions.

Parameters:

Name Type Description Default
name str

Function name that has been registered.

required
Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
def add_registered_function(self, name: str) -> None:
    """Add a function name to the set of registered functions.

    Args:
        name: Function name that has been registered.
    """
    self._registered_functions.add(name)
load_from_file
load_from_file(
    file_path: Union[str, Path],
    resolve_env: bool = True,
    resolve_references: bool = True,
) -> FSMConfig

Load configuration from a file.

Parameters:

Name Type Description Default
file_path Union[str, Path]

Path to configuration file (JSON or YAML).

required
resolve_env bool

Whether to resolve environment variables.

True
resolve_references bool

Whether to resolve file references.

True

Returns:

Type Description
FSMConfig

Validated FSMConfig instance.

Raises:

Type Description
FileNotFoundError

If file doesn't exist.

ValueError

If file format is not supported.

Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
def load_from_file(
    self,
    file_path: Union[str, Path],
    resolve_env: bool = True,
    resolve_references: bool = True,
) -> FSMConfig:
    """Load configuration from a file.

    Args:
        file_path: Path to configuration file (JSON or YAML).
        resolve_env: Whether to resolve environment variables.
        resolve_references: Whether to resolve file references.

    Returns:
        Validated FSMConfig instance.

    Raises:
        FileNotFoundError: If file doesn't exist.
        ValueError: If file format is not supported.
    """
    file_path = Path(file_path)

    if not file_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {file_path}")

    # Load raw configuration
    raw_config = self._load_file(file_path)

    # Process with dataknobs_config if enabled
    if self.use_dataknobs_config:
        config_obj = DataknobsConfig(raw_config)
        processed_config = config_obj.to_dict()
    else:
        processed_config = raw_config

    # Resolve environment variables
    if resolve_env:
        processed_config = self._resolve_environment_vars(processed_config)

    # Resolve file references (includes/imports)
    if resolve_references:
        processed_config = self._resolve_references(processed_config, file_path.parent)

    # Apply common transformations and validate
    return self._finalize_config(processed_config)
load_from_dict
load_from_dict(
    config_dict: Dict[str, Any], resolve_env: bool = True
) -> FSMConfig

Load configuration from a dictionary.

Parameters:

Name Type Description Default
config_dict Dict[str, Any]

Configuration dictionary.

required
resolve_env bool

Whether to resolve environment variables.

True

Returns:

Type Description
FSMConfig

Validated FSMConfig instance.

Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
def load_from_dict(
    self,
    config_dict: Dict[str, Any],
    resolve_env: bool = True,
) -> FSMConfig:
    """Load configuration from a dictionary.

    Args:
        config_dict: Configuration dictionary.
        resolve_env: Whether to resolve environment variables.

    Returns:
        Validated FSMConfig instance.
    """
    processed_config = config_dict.copy()

    # Process with dataknobs_config if enabled
    if self.use_dataknobs_config:
        config_obj = DataknobsConfig(processed_config)
        processed_config = config_obj.to_dict()

    # Resolve environment variables
    if resolve_env:
        processed_config = self._resolve_environment_vars(processed_config)

    # Apply common transformations and validate
    return self._finalize_config(processed_config)
load_from_template
load_from_template(
    template: Union[UseCaseTemplate, str],
    params: Dict[str, Any] | None = None,
    overrides: Dict[str, Any] | None = None,
) -> FSMConfig

Load configuration from a template.

Parameters:

Name Type Description Default
template Union[UseCaseTemplate, str]

Template name or enum value.

required
params Dict[str, Any] | None

Template parameters.

None
overrides Dict[str, Any] | None

Configuration overrides.

None

Returns:

Type Description
FSMConfig

Validated FSMConfig instance.

Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
def load_from_template(
    self,
    template: Union[UseCaseTemplate, str],
    params: Dict[str, Any] | None = None,
    overrides: Dict[str, Any] | None = None,
) -> FSMConfig:
    """Load configuration from a template.

    Args:
        template: Template name or enum value.
        params: Template parameters.
        overrides: Configuration overrides.

    Returns:
        Validated FSMConfig instance.
    """
    if isinstance(template, str):
        template = UseCaseTemplate(template)

    # Apply template
    config_dict = apply_template(template, params, overrides)

    # Load from dictionary
    return self.load_from_dict(config_dict)
load_template_config
load_template_config(template_config: TemplateConfig) -> FSMConfig

Load configuration from a template configuration object.

Parameters:

Name Type Description Default
template_config TemplateConfig

Template configuration.

required

Returns:

Type Description
FSMConfig

Validated FSMConfig instance.

Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
def load_template_config(self, template_config: TemplateConfig) -> FSMConfig:
    """Load configuration from a template configuration object.

    Args:
        template_config: Template configuration.

    Returns:
        Validated FSMConfig instance.
    """
    return self.load_from_template(
        template_config.template,
        template_config.params,
        template_config.overrides,
    )
validate_file
validate_file(file_path: Union[str, Path]) -> bool

Validate a configuration file by attempting to load it (any load failure yields False).

Parameters:

Name Type Description Default
file_path Union[str, Path]

Path to configuration file.

required

Returns:

Type Description
bool

True if valid, False otherwise.

Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
def validate_file(self, file_path: Union[str, Path]) -> bool:
    """Validate a configuration file without fully loading it.

    Args:
        file_path: Path to configuration file.

    Returns:
        True if valid, False otherwise.
    """
    try:
        self.load_from_file(file_path)
        return True
    except Exception:
        return False
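As the source shows, `validate_file` is a validate-by-loading pattern: it calls `load_from_file` and maps any exception (missing file, bad syntax, schema error) to `False`, so validation costs a full load. The sketch below illustrates the same pattern using plain JSON in place of the full FSMConfig pipeline:

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Union

def validate_json_file(file_path: Union[str, Path]) -> bool:
    """Sketch of validate_file's try/except pattern, with json.load
    standing in for the library's FSMConfig loading."""
    try:
        with open(file_path) as fh:
            json.load(fh)
        return True
    except Exception:
        # Any failure -- missing file, bad syntax, schema error -- maps to False.
        return False

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as fh:
    fh.write('{"name": "demo"}')
    good = fh.name
assert validate_json_file(good)
assert not validate_json_file("does_not_exist.json")
os.unlink(good)
```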
merge_configs
merge_configs(*configs: FSMConfig) -> FSMConfig

Merge multiple FSM configurations.

Later configurations override earlier ones.

Parameters:

Name Type Description Default
*configs FSMConfig

FSMConfig instances to merge.

()

Returns:

Type Description
FSMConfig

Merged FSMConfig instance.

Source code in packages/fsm/src/dataknobs_fsm/config/loader.py
def merge_configs(self, *configs: FSMConfig) -> FSMConfig:
    """Merge multiple FSM configurations.

    Later configurations override earlier ones.

    Args:
        *configs: FSMConfig instances to merge.

    Returns:
        Merged FSMConfig instance.
    """
    merged_dict = {}

    for config in configs:
        config_dict = config.model_dump()
        self._deep_merge(merged_dict, config_dict)

    return validate_config(merged_dict)
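The `_deep_merge` helper is not shown here, so the sketch below is an assumption about its behavior, chosen to be consistent with "later configurations override earlier ones": nested dicts merge recursively, and any other value from a later config replaces the earlier one.

```python
from typing import Any, Dict

def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> None:
    """Recursively merge override into base; later values win (assumed behavior)."""
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            deep_merge(base[key], value)
        else:
            base[key] = value

merged: Dict[str, Any] = {}
for cfg in (
    {"name": "base", "retry": {"attempts": 3, "delay": 1}},
    {"retry": {"attempts": 5}},   # later config overrides attempts, keeps delay
):
    deep_merge(merged, cfg)
# merged == {"name": "base", "retry": {"attempts": 5, "delay": 1}}
```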

FSMBuilder

FSMBuilder()

Build executable FSM instances from configuration.

Initialize the FSMBuilder.

Methods:

Name Description
build

Build an FSM instance from configuration.

register_function

Register a custom function.

Source code in packages/fsm/src/dataknobs_fsm/config/builder.py
def __init__(self):
    """Initialize the FSMBuilder."""
    self._resource_manager = ResourceManager()
    self._function_manager = FunctionManager()
    self._networks: Dict[str, StateNetwork] = {}
    self._data_handlers: Dict[DataHandlingMode, DataHandler] = {}
    self._transaction_manager: TransactionManager | None = None

    # Register built-in functions on initialization
    self._register_builtin_functions()
Functions
build
build(config: FSMConfig) -> CoreFSMClass

Build an FSM instance from configuration.

Parameters:

Name Type Description Default
config FSMConfig

FSM configuration.

required

Returns:

Type Description
FSM

Executable FSM instance.

Raises:

Type Description
ValueError

If configuration is invalid or incomplete.

Source code in packages/fsm/src/dataknobs_fsm/config/builder.py
def build(self, config: FSMConfig) -> CoreFSMClass:
    """Build an FSM instance from configuration.

    Args:
        config: FSM configuration.

    Returns:
        Executable FSM instance.

    Raises:
        ValueError: If configuration is invalid or incomplete.
    """
    # Clear previous build state
    self._networks.clear()

    # 1. Register resources
    self._register_resources(config.resources)

    # 2. Initialize data handlers
    self._init_data_handlers(config.data_mode)

    # 3. Initialize transaction manager
    self._init_transaction_manager(config.transaction)

    # 4. Build networks
    for network_config in config.networks:
        network = self._build_network(network_config, config)
        self._networks[network.name] = network

    # 5. Validate completeness
    self._validate_completeness(config)

    # 6. Create core FSM instance
    from dataknobs_fsm.core.modes import ProcessingMode as CoreDataMode
    from dataknobs_fsm.core.modes import TransactionMode as CoreTransactionMode

    # Map config modes to core modes
    data_mode = CoreDataMode.SINGLE  # Default to SINGLE for now
    transaction_mode = CoreTransactionMode.NONE  # Default to NONE

    fsm = CoreFSMClass(
        name=config.name,
        data_mode=data_mode,
        transaction_mode=transaction_mode,
        description=config.description,
        resource_manager=self._resource_manager,
        transaction_manager=self._transaction_manager,
    )

    # Store config in FSM for reference
    fsm.config = config

    # Register all functions from builder into core FSM's function registry
    for func_name in self._function_manager.list_functions():
        wrapper = self._function_manager.get_function(func_name)
        if wrapper:
            # The FSM's function registry expects callable functions
            # If it's a FunctionWrapper, get the actual function
            if hasattr(wrapper, 'func'):
                fsm.function_registry.register(func_name, wrapper.func)
            else:
                fsm.function_registry.register(func_name, wrapper)

    # Add networks to FSM
    for network_name, network in self._networks.items():
        fsm.add_network(network, is_main=(network_name == config.main_network))

    # Return the core FSM directly
    return fsm
register_function
register_function(name: str, func: Callable) -> None

Register a custom function.

Parameters:

Name Type Description Default
name str

Function name for reference in configuration.

required
func Callable

Function implementation.

required
Source code in packages/fsm/src/dataknobs_fsm/config/builder.py
def register_function(self, name: str, func: Callable) -> None:
    """Register a custom function.

    Args:
        name: Function name for reference in configuration.
        func: Function implementation.
    """
    self._function_manager.register_function(name, func, FunctionSource.REGISTERED)

ExecutionHistoryQuery dataclass

ExecutionHistoryQuery(
    from_state: str | None = None,
    to_state: str | None = None,
    trigger: str | None = None,
    transition_name: str | None = None,
    since: float | None = None,
    until: float | None = None,
    success_only: bool = False,
    failed_only: bool = False,
    limit: int | None = None,
)

Query parameters for filtering execution history.

Attributes:

Name Type Description
from_state str | None

Filter by source state

to_state str | None

Filter by target state

trigger str | None

Filter by trigger type

transition_name str | None

Filter by transition name

since float | None

Filter to records after this timestamp

until float | None

Filter to records before this timestamp

success_only bool

Only include successful transitions

failed_only bool

Only include failed transitions

limit int | None

Maximum number of records to return

ExecutionRecord dataclass

ExecutionRecord(
    from_state: str,
    to_state: str,
    timestamp: float,
    trigger: str = "step",
    transition_name: str | None = None,
    duration_in_state_ms: float = 0.0,
    data_before: dict[str, Any] | None = None,
    data_after: dict[str, Any] | None = None,
    condition_evaluated: str | None = None,
    condition_result: bool | None = None,
    success: bool = True,
    error: str | None = None,
)

Record of a single FSM state transition.

Captures all relevant information about a state transition including timing, states, trigger, and data snapshots.

Attributes:

Name Type Description
from_state str

State name before the transition

to_state str

State name after the transition

timestamp float

Unix timestamp when transition occurred

trigger str

What triggered the transition (e.g., "step", "auto", "external")

transition_name str | None

Name of the transition/arc that was taken

duration_in_state_ms float

Time spent in the from_state in milliseconds

data_before dict[str, Any] | None

Data state before the transition

data_after dict[str, Any] | None

Data state after the transition

condition_evaluated str | None

The condition expression that was evaluated (if any)

condition_result bool | None

Result of the condition evaluation (True/False)

success bool

Whether the transition completed successfully

error str | None

Error message if transition failed

Example
record = ExecutionRecord(
    from_state="processing",
    to_state="complete",
    timestamp=time.time(),
    trigger="step",
    duration_in_state_ms=5000.0,
    data_before={"items": []},
    data_after={"items": ["processed"]},
    success=True,
)

Methods:

Name Description
to_dict

Convert record to dictionary.

from_dict

Create record from dictionary.

from_step_result

Create record from a StepResult.

Functions
to_dict
to_dict() -> dict[str, Any]

Convert record to dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary representation of the record

Source code in packages/fsm/src/dataknobs_fsm/observability.py
def to_dict(self) -> dict[str, Any]:
    """Convert record to dictionary.

    Returns:
        Dictionary representation of the record
    """
    return asdict(self)
from_dict classmethod
from_dict(data: dict[str, Any]) -> ExecutionRecord

Create record from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary containing record fields

required

Returns:

Type Description
ExecutionRecord

ExecutionRecord instance

Source code in packages/fsm/src/dataknobs_fsm/observability.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ExecutionRecord":
    """Create record from dictionary.

    Args:
        data: Dictionary containing record fields

    Returns:
        ExecutionRecord instance
    """
    return cls(**data)
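Because `to_dict` uses `dataclasses.asdict` and `from_dict` passes the dict straight to the constructor, the pair forms a lossless round trip, which is what makes records easy to persist and reload. A simplified stand-in dataclass (not the full `ExecutionRecord` field list) demonstrating the round trip:

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional

@dataclass
class MiniRecord:
    """Simplified stand-in for ExecutionRecord, showing the to_dict/from_dict round trip."""
    from_state: str
    to_state: str
    timestamp: float
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MiniRecord":
        return cls(**data)

rec = MiniRecord("processing", "complete", 1700000000.0)
assert MiniRecord.from_dict(rec.to_dict()) == rec   # lossless round trip
```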
from_step_result classmethod
from_step_result(
    step_result: Any,
    trigger: str = "step",
    state_entry_time: float | None = None,
    condition_evaluated: str | None = None,
) -> ExecutionRecord

Create record from a StepResult.

Parameters:

Name Type Description Default
step_result Any

StepResult from AdvancedFSM execution

required
trigger str

What triggered this transition

'step'
state_entry_time float | None

When the from_state was entered (for duration calc)

None
condition_evaluated str | None

The condition expression if known

None

Returns:

Type Description
ExecutionRecord

ExecutionRecord instance

Source code in packages/fsm/src/dataknobs_fsm/observability.py
@classmethod
def from_step_result(
    cls,
    step_result: Any,
    trigger: str = "step",
    state_entry_time: float | None = None,
    condition_evaluated: str | None = None,
) -> "ExecutionRecord":
    """Create record from a StepResult.

    Args:
        step_result: StepResult from AdvancedFSM execution
        trigger: What triggered this transition
        state_entry_time: When the from_state was entered (for duration calc)
        condition_evaluated: The condition expression if known

    Returns:
        ExecutionRecord instance
    """
    now = time.time()
    duration_ms = (now - state_entry_time) * 1000 if state_entry_time else 0.0

    return cls(
        from_state=step_result.from_state,
        to_state=step_result.to_state,
        timestamp=now,
        trigger=trigger,
        transition_name=step_result.transition,
        duration_in_state_ms=duration_ms,
        data_before=step_result.data_before,
        data_after=step_result.data_after,
        condition_evaluated=condition_evaluated,
        condition_result=True if condition_evaluated else None,
        success=step_result.success,
        error=step_result.error,
    )
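The duration bookkeeping above is the only derived field; with fixed timestamps in place of `time.time()` the arithmetic looks like this:

```python
# The duration computation from from_step_result, with fixed timestamps
# so the arithmetic is visible (the real code uses time.time()).
state_entry_time = 100.0   # when the from_state was entered
now = 100.25               # when the transition fires
duration_ms = (now - state_entry_time) * 1000 if state_entry_time else 0.0
print(duration_ms)  # 250.0
```

Note that a `state_entry_time` of `None` (no entry marked) yields a duration of `0.0` rather than an error.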

ExecutionStats dataclass

ExecutionStats(
    total_transitions: int = 0,
    successful_transitions: int = 0,
    failed_transitions: int = 0,
    unique_paths: int = 0,
    avg_duration_per_state_ms: float = 0.0,
    most_visited_state: str | None = None,
    most_common_trigger: str | None = None,
    first_transition: float | None = None,
    last_transition: float | None = None,
)

Aggregated statistics for FSM executions.

Attributes:

Name Type Description
total_transitions int

Total number of transitions

successful_transitions int

Number of successful transitions

failed_transitions int

Number of failed transitions

unique_paths int

Number of unique state-to-state paths taken

avg_duration_per_state_ms float

Average time spent in each state

most_visited_state str | None

State that was visited most often

most_common_trigger str | None

Most frequent transition trigger

first_transition float | None

Timestamp of first transition

last_transition float | None

Timestamp of last transition

Attributes
success_rate property
success_rate: float

Calculate success rate as percentage.

Returns:

Type Description
float

Success rate between 0.0 and 100.0

has_failures property
has_failures: bool

Check if there were any failed transitions.

Returns:

Type Description
bool

True if failed_transitions > 0
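The property bodies are not shown on this page; a likely sketch of `success_rate`, assuming it divides successful by total transitions and returns `0.0` for an empty history:

```python
# Assumed success_rate formula -- not confirmed by this page.
successful_transitions = 9
total_transitions = 10
success_rate = (
    successful_transitions / total_transitions * 100
    if total_transitions
    else 0.0
)
print(success_rate)  # 90.0
```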

ExecutionTracker

ExecutionTracker(max_history: int = 100)

Tracks FSM execution history with query capabilities.

Manages a bounded history of state transitions and provides methods for querying and aggregating execution data.

Attributes:

Name Type Description
max_history

Maximum number of records to retain

Example
tracker = ExecutionTracker(max_history=1000)

# Record from StepResult
tracker.record_from_step_result(step_result, trigger="step")

# Or record directly
tracker.record(ExecutionRecord(
    from_state="start",
    to_state="processing",
    timestamp=time.time(),
    trigger="step",
    success=True,
))

# Query history
recent = tracker.query(ExecutionHistoryQuery(
    from_state="processing",
    since=time.time() - 3600,
))

# Get stats
stats = tracker.get_stats()

Initialize tracker.

Parameters:

Name Type Description Default
max_history int

Maximum records to retain (default 100)

100

Methods:

Name Description
record

Record an execution.

record_from_step_result

Record execution from a StepResult.

mark_state_entry

Mark the current time as state entry time.

query

Query execution history.

get_stats

Get aggregated execution statistics.

get_state_flow

Get the sequence of states visited.

clear

Clear all execution history.

__len__

Return number of records in history.

Source code in packages/fsm/src/dataknobs_fsm/observability.py
def __init__(self, max_history: int = 100):
    """Initialize tracker.

    Args:
        max_history: Maximum records to retain (default 100)
    """
    self._history: list[ExecutionRecord] = []
    self._max_history = max_history
    self._state_entry_time: float | None = None
Functions
record
record(execution: ExecutionRecord) -> None

Record an execution.

Parameters:

Name Type Description Default
execution ExecutionRecord

The execution record to store

required
Source code in packages/fsm/src/dataknobs_fsm/observability.py
def record(self, execution: ExecutionRecord) -> None:
    """Record an execution.

    Args:
        execution: The execution record to store
    """
    self._history.append(execution)
    if len(self._history) > self._max_history:
        self._history.pop(0)
    # Update state entry time for next transition
    self._state_entry_time = time.time()
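The eviction policy in `record` is a simple bounded list: append, then drop the oldest entry once `max_history` is exceeded. With integers standing in for `ExecutionRecord` instances it behaves like this:

```python
# The bounded-history policy from ExecutionTracker.record, with
# integers standing in for ExecutionRecord instances.
max_history = 3
history = []
for n in range(5):
    history.append(n)
    if len(history) > max_history:
        history.pop(0)  # drop the oldest record
print(history)  # [2, 3, 4]
```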
record_from_step_result
record_from_step_result(
    step_result: Any,
    trigger: str = "step",
    condition_evaluated: str | None = None,
) -> ExecutionRecord

Record execution from a StepResult.

Convenience method that creates an ExecutionRecord from StepResult and records it.

Parameters:

Name Type Description Default
step_result Any

StepResult from AdvancedFSM execution

required
trigger str

What triggered this transition

'step'
condition_evaluated str | None

The condition expression if known

None

Returns:

Type Description
ExecutionRecord

The created ExecutionRecord

Source code in packages/fsm/src/dataknobs_fsm/observability.py
def record_from_step_result(
    self,
    step_result: Any,
    trigger: str = "step",
    condition_evaluated: str | None = None,
) -> ExecutionRecord:
    """Record execution from a StepResult.

    Convenience method that creates an ExecutionRecord from StepResult
    and records it.

    Args:
        step_result: StepResult from AdvancedFSM execution
        trigger: What triggered this transition
        condition_evaluated: The condition expression if known

    Returns:
        The created ExecutionRecord
    """
    record = ExecutionRecord.from_step_result(
        step_result=step_result,
        trigger=trigger,
        state_entry_time=self._state_entry_time,
        condition_evaluated=condition_evaluated,
    )
    self.record(record)
    return record
mark_state_entry
mark_state_entry() -> None

Mark the current time as state entry time.

Call this when entering a new state to track duration.

Source code in packages/fsm/src/dataknobs_fsm/observability.py
def mark_state_entry(self) -> None:
    """Mark the current time as state entry time.

    Call this when entering a new state to track duration.
    """
    self._state_entry_time = time.time()
query
query(query: ExecutionHistoryQuery | None = None) -> list[ExecutionRecord]

Query execution history.

Parameters:

Name Type Description Default
query ExecutionHistoryQuery | None

Query parameters, or None for all records

None

Returns:

Type Description
list[ExecutionRecord]

List of matching execution records

Source code in packages/fsm/src/dataknobs_fsm/observability.py
def query(
    self, query: ExecutionHistoryQuery | None = None
) -> list[ExecutionRecord]:
    """Query execution history.

    Args:
        query: Query parameters, or None for all records

    Returns:
        List of matching execution records
    """
    if query is None:
        return list(self._history)

    results = self._history

    if query.from_state:
        results = [r for r in results if r.from_state == query.from_state]

    if query.to_state:
        results = [r for r in results if r.to_state == query.to_state]

    if query.trigger:
        results = [r for r in results if r.trigger == query.trigger]

    if query.transition_name:
        results = [r for r in results if r.transition_name == query.transition_name]

    if query.since:
        results = [r for r in results if r.timestamp >= query.since]

    if query.until:
        results = [r for r in results if r.timestamp <= query.until]

    if query.success_only:
        results = [r for r in results if r.success]

    if query.failed_only:
        results = [r for r in results if not r.success]

    if query.limit:
        results = results[-query.limit:]

    return results
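Each populated query field narrows the result list in sequence, and `limit` keeps the newest matches. The same pattern with plain dicts standing in for records:

```python
# The sequential-narrowing pattern from ExecutionTracker.query,
# with dicts standing in for ExecutionRecord instances.
records = [
    {"from_state": "a", "to_state": "b", "success": True},
    {"from_state": "b", "to_state": "c", "success": False},
    {"from_state": "a", "to_state": "c", "success": True},
]
results = records
results = [r for r in results if r["from_state"] == "a"]  # from_state filter
results = [r for r in results if r["success"]]            # success_only filter
results = results[-1:]                                    # limit=1 keeps the newest
print(results)  # [{'from_state': 'a', 'to_state': 'c', 'success': True}]
```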
get_stats
get_stats() -> ExecutionStats

Get aggregated execution statistics.

Returns:

Type Description
ExecutionStats

ExecutionStats with aggregated metrics

Source code in packages/fsm/src/dataknobs_fsm/observability.py
def get_stats(self) -> ExecutionStats:
    """Get aggregated execution statistics.

    Returns:
        ExecutionStats with aggregated metrics
    """
    if not self._history:
        return ExecutionStats()

    # Count triggers and states
    trigger_counts: dict[str, int] = {}
    state_counts: dict[str, int] = {}
    unique_paths: set[tuple[str, str]] = set()
    total_duration = 0.0
    successful = 0
    failed = 0

    for record in self._history:
        # Count trigger types
        trigger_counts[record.trigger] = trigger_counts.get(record.trigger, 0) + 1

        # Count state visits (to_state)
        state_counts[record.to_state] = state_counts.get(record.to_state, 0) + 1

        # Track unique paths
        unique_paths.add((record.from_state, record.to_state))

        # Sum durations
        total_duration += record.duration_in_state_ms

        # Count success/failure
        if record.success:
            successful += 1
        else:
            failed += 1

    # Find most common trigger
    most_common_trigger = (
        max(trigger_counts, key=trigger_counts.get) if trigger_counts else None
    )

    # Find most visited state
    most_visited_state = (
        max(state_counts, key=state_counts.get) if state_counts else None
    )

    return ExecutionStats(
        total_transitions=len(self._history),
        successful_transitions=successful,
        failed_transitions=failed,
        unique_paths=len(unique_paths),
        avg_duration_per_state_ms=(
            total_duration / len(self._history) if self._history else 0.0
        ),
        most_visited_state=most_visited_state,
        most_common_trigger=most_common_trigger,
        first_transition=self._history[0].timestamp,
        last_transition=self._history[-1].timestamp,
    )
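The "most common" aggregations above reduce to counting occurrences and taking the key with the maximum count:

```python
# The counting-then-max aggregation used by get_stats for
# most_common_trigger (and likewise for most_visited_state).
triggers = ["step", "step", "timeout", "step"]
trigger_counts = {}
for t in triggers:
    trigger_counts[t] = trigger_counts.get(t, 0) + 1
most_common_trigger = (
    max(trigger_counts, key=trigger_counts.get) if trigger_counts else None
)
print(most_common_trigger)  # step
```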
get_state_flow
get_state_flow() -> list[str]

Get the sequence of states visited.

Returns:

Type Description
list[str]

List of state names in order visited

Source code in packages/fsm/src/dataknobs_fsm/observability.py
def get_state_flow(self) -> list[str]:
    """Get the sequence of states visited.

    Returns:
        List of state names in order visited
    """
    if not self._history:
        return []

    states = [self._history[0].from_state]
    for record in self._history:
        states.append(record.to_state)
    return states
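The reconstruction logic, shown with `(from_state, to_state)` tuples standing in for records: seed the flow with the first record's origin, then append every destination.

```python
# get_state_flow's reconstruction, with tuples standing in
# for ExecutionRecord instances.
transitions = [("start", "processing"), ("processing", "complete")]
states = [transitions[0][0]]          # first record's from_state
for from_state, to_state in transitions:
    states.append(to_state)           # every record's to_state
print(states)  # ['start', 'processing', 'complete']
```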
clear
clear() -> None

Clear all execution history.

Source code in packages/fsm/src/dataknobs_fsm/observability.py
def clear(self) -> None:
    """Clear all execution history."""
    self._history.clear()
    self._state_entry_time = None
__len__
__len__() -> int

Return number of records in history.

Source code in packages/fsm/src/dataknobs_fsm/observability.py
def __len__(self) -> int:
    """Return number of records in history."""
    return len(self._history)

Functions

create_advanced_fsm

create_advanced_fsm(
    config: str | Path | dict[str, Any] | FSM,
    custom_functions: dict[str, Callable] | None = None,
    **kwargs: Any,
) -> AdvancedFSM

Factory function to create an AdvancedFSM instance.

Delegates to AdvancedFSM, which handles both pre-built FSM instances and raw config (a path or dict) via dataknobs_fsm.config.builder.build_fsm.

Parameters:

Name Type Description Default
config str | Path | dict[str, Any] | FSM

Configuration dict, file path, or pre-built FSM instance.

required
custom_functions dict[str, Callable] | None

Optional custom functions to register.

None
**kwargs Any

Additional arguments forwarded to AdvancedFSM.

{}

Returns:

Type Description
AdvancedFSM

Configured AdvancedFSM instance.

Source code in packages/fsm/src/dataknobs_fsm/api/advanced.py
def create_advanced_fsm(
    config: str | Path | dict[str, Any] | FSM,
    custom_functions: dict[str, Callable] | None = None,
    **kwargs: Any
) -> AdvancedFSM:
    """Factory function to create an AdvancedFSM instance.

    Delegates to :class:`AdvancedFSM` which handles both pre-built FSM
    instances and raw config (path or dict) using
    :func:`~dataknobs_fsm.config.builder.build_fsm`.

    Args:
        config: Configuration dict, file path, or pre-built FSM instance.
        custom_functions: Optional custom functions to register.
        **kwargs: Additional arguments forwarded to AdvancedFSM.

    Returns:
        Configured AdvancedFSM instance.
    """
    return AdvancedFSM(config, custom_functions=custom_functions, **kwargs)
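A minimal usage sketch. The config schema shown here (`name`/`states`/`arcs` keys) is hypothetical and not confirmed by this page — consult the configuration module docs for the real format. The `create_advanced_fsm` call is left commented so the sketch stands on its own:

```python
# Hypothetical config schema -- illustrative only; see the config
# module documentation for the actual accepted structure.
config = {
    "name": "demo",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "done", "is_end": True},
    ],
    "arcs": [{"from": "start", "to": "done"}],
}
# fsm = create_advanced_fsm(config, custom_functions={"my_fn": my_fn})
```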

create_execution_record

create_execution_record(
    from_state: str,
    to_state: str,
    trigger: str = "step",
    transition_name: str | None = None,
    duration_in_state_ms: float = 0.0,
    data_before: dict[str, Any] | None = None,
    data_after: dict[str, Any] | None = None,
    condition_evaluated: str | None = None,
    condition_result: bool | None = None,
    success: bool = True,
    error: str | None = None,
) -> ExecutionRecord

Factory function to create an execution record.

Convenience function that automatically sets the timestamp.

Parameters:

Name Type Description Default
from_state str

State name before transition

required
to_state str

State name after transition

required
trigger str

What triggered the transition

'step'
transition_name str | None

Name of the transition/arc

None
duration_in_state_ms float

Time spent in from_state

0.0
data_before dict[str, Any] | None

Data state before transition

None
data_after dict[str, Any] | None

Data state after transition

None
condition_evaluated str | None

The condition expression that was evaluated

None
condition_result bool | None

Result of the condition evaluation

None
success bool

Whether transition succeeded

True
error str | None

Error message if transition failed

None

Returns:

Type Description
ExecutionRecord

ExecutionRecord with current timestamp

Source code in packages/fsm/src/dataknobs_fsm/observability.py
def create_execution_record(
    from_state: str,
    to_state: str,
    trigger: str = "step",
    transition_name: str | None = None,
    duration_in_state_ms: float = 0.0,
    data_before: dict[str, Any] | None = None,
    data_after: dict[str, Any] | None = None,
    condition_evaluated: str | None = None,
    condition_result: bool | None = None,
    success: bool = True,
    error: str | None = None,
) -> ExecutionRecord:
    """Factory function to create an execution record.

    Convenience function that automatically sets the timestamp.

    Args:
        from_state: State name before transition
        to_state: State name after transition
        trigger: What triggered the transition
        transition_name: Name of the transition/arc
        duration_in_state_ms: Time spent in from_state
        data_before: Data state before transition
        data_after: Data state after transition
        condition_evaluated: The condition expression that was evaluated
        condition_result: Result of the condition evaluation
        success: Whether transition succeeded
        error: Error message if transition failed

    Returns:
        ExecutionRecord with current timestamp
    """
    return ExecutionRecord(
        from_state=from_state,
        to_state=to_state,
        timestamp=time.time(),
        trigger=trigger,
        transition_name=transition_name,
        duration_in_state_ms=duration_in_state_ms,
        data_before=data_before,
        data_after=data_after,
        condition_evaluated=condition_evaluated,
        condition_result=condition_result,
        success=success,
        error=error,
    )
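What the factory adds over calling `ExecutionRecord` directly is the automatic timestamp. A stand-in makes that visible (`make_record` is illustrative, not part of the API):

```python
import time

def make_record(from_state, to_state, **fields):
    # Stand-in for create_execution_record: the caller supplies the
    # transition fields, and the current timestamp is injected here.
    return {
        "from_state": from_state,
        "to_state": to_state,
        "timestamp": time.time(),
        **fields,
    }

rec = make_record("start", "done", success=True)
print(rec["timestamp"] > 0)  # True
```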