FSM (Finite State Machine) Package

The FSM package provides a powerful and flexible framework for building state machines in Python. It enables you to create complex workflows, orchestrate API calls, manage resources, and build sophisticated data processing pipelines.

Features

Core Capabilities

  • Core FSM Engine: Robust state machine implementation with support for complex transitions and data flows
  • Multiple Execution Strategies: Depth-first, breadth-first, resource-optimized, and stream-optimized traversal strategies
  • Execution Modes: Synchronous, asynchronous, batch, and streaming execution engines
  • Configuration-Driven: Define FSMs using YAML/JSON configuration or programmatically

Data Handling Modes

  • COPY Mode (default): Safe concurrent processing with data isolation
  • REFERENCE Mode: Memory-efficient processing with optimistic locking
  • DIRECT Mode: High-performance in-place modifications

Resource Management

Comprehensive lifecycle management with pooling for:

  • Database connections with transaction support
  • HTTP clients with retry logic
  • File system resources
  • Custom resource providers

For LLM integration, see the dataknobs-llm package.

Transform Functions

Interface-based transform system with:

  • Validation functions for data quality
  • Transform functions for data manipulation
  • State test functions for conditional logic
  • Extensive function library

Integration Patterns

Production-ready patterns for:

  • ETL workflows (FULL_REFRESH, INCREMENTAL, UPSERT, APPEND)
  • API orchestration with multi-service coordination
  • Error recovery with retry and circuit breaker patterns
  • File processing pipelines

For LLM/AI workflows, see the dataknobs-llm package.

Advanced Features

  • Hierarchical state machines with PushArc support
  • Transaction management with commit/rollback
  • Execution history tracking and auditing
  • Configurable data isolation for sub-networks
  • Health monitoring and metrics

API Design

  • SimpleFSM: Synchronous API for straightforward workflows (works in all contexts)
  • AsyncSimpleFSM: Native async API for async/await contexts
  • AdvancedFSM: For debugging, stepping, and hooks

Installation

pip install dataknobs-fsm

Or with optional dependencies:

# With database support
pip install dataknobs-fsm[database]

# With all extras
pip install dataknobs-fsm[all]

For LLM functionality, install the separate LLM package:

pip install dataknobs-llm

Basic Usage

Using SimpleFSM API

from dataknobs_fsm.api.simple import SimpleFSM
from datetime import datetime

# Create custom functions
def validate_data(state):
    """Custom validation function."""
    if 'required_field' not in state.data:
        raise ValueError("Missing required field")
    return state.data

def process_data(state):
    """Transform function for processing."""
    data = state.data.copy()
    data['processed'] = True
    data['timestamp'] = datetime.now().isoformat()
    return data

# Define FSM configuration
config = {
    "name": "data_processor",
    "states": [
        {"name": "start", "is_start": True},
        {"name": "validate"},
        {"name": "process"},
        {"name": "end", "is_end": True}
    ],
    "arcs": [
        {
            "from": "start",
            "to": "validate",
            "transform": {"type": "registered", "name": "validate"}
        },
        {
            "from": "validate",
            "to": "process",
            "transform": {"type": "registered", "name": "process"}
        },
        {"from": "process", "to": "end"}
    ]
}

# Initialize FSM with custom functions
fsm = SimpleFSM(
    config,
    custom_functions={
        'validate': validate_data,
        'process': process_data
    }
)

# Execute the FSM
result = fsm.process(
    {"required_field": "value", "input": "data"}
)
print(result)  # {"final_state": "end", "data": {"required_field": "value", "input": "data", "processed": True, "timestamp": "..."}, ...}

Using AdvancedFSM API

import asyncio
from dataknobs_fsm.api.advanced import (
    create_advanced_fsm, ExecutionMode, ExecutionHook
)

# Set up execution hooks for monitoring
hooks = ExecutionHook(
    on_state_enter=lambda state: print(f"Entering: {state}"),
    on_state_exit=lambda state: print(f"Exiting: {state}"),
    on_error=lambda error: print(f"Error: {error}")
)

# Create FSM via factory (accepts config dict, YAML path, or FSM instance)
fsm = create_advanced_fsm(
    "fsm_config.yaml",
    execution_mode=ExecutionMode.STEP_BY_STEP
)
fsm.set_hooks(hooks)

async def run_workflow():
    test_data = {"input": "data"}

    # Step-by-step execution (step() returns StepResult)
    async with fsm.execution_context(test_data) as context:
        while True:
            result = await fsm.step(context)
            print(f"  {result.from_state} -> {result.to_state}")

            if not result.success or result.is_complete or result.transition == "none":
                break

    # Or run with profiling
    profile = await fsm.profile_execution(test_data)
    print(f"Execution time: {profile['total_time']:.4f}s")
    print(f"Transitions: {profile['transitions']}")

asyncio.run(run_workflow())

Using Configuration

# fsm_config.yaml
name: data_processor
data_mode: COPY  # or REFERENCE or DIRECT
execution_strategy: DEPTH_FIRST  # or BREADTH_FIRST, RESOURCE_OPTIMIZED, STREAM_OPTIMIZED

states:
  - name: start
    is_start: true
  - name: validate
    functions:
      state_test:
        type: builtin
        name: has_required_fields
        params:
          fields: ["user_id", "data"]
  - name: process
    functions:
      transform:
        type: inline
        code: |
          lambda state: {
              **state.data,
              'processed': True,
              'timestamp': datetime.now().isoformat()
          }
    resources:
      - type: database
        name: main_db
  - name: end
    is_end: true

arcs:
  - from: start
    to: validate
  - from: validate
    to: process
    pre_test:
      type: builtin
      name: data_valid
  - from: process
    to: end
    transform:
      type: builtin
      name: add_metadata

resources:
  database:
    main_db:
      provider: postgresql
      connection_string: ${DATABASE_URL}
      pool_size: 10
Then load and run the configured FSM:

from dataknobs_fsm.api.simple import SimpleFSM

fsm = SimpleFSM("fsm_config.yaml")
result = fsm.process({"user_id": "123", "data": "input"})
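The `${DATABASE_URL}` placeholder above is resolved from the environment. As an illustrative sketch only (the library's actual resolution logic may differ), this kind of substitution can be done with the standard library's `string.Template`:

```python
import os
from string import Template

def resolve_env_vars(value: str) -> str:
    """Substitute ${VAR} placeholders from the environment.

    Raises KeyError if a referenced variable is unset, surfacing
    missing configuration early rather than at connection time.
    """
    return Template(value).substitute(os.environ)

os.environ["DATABASE_URL"] = "postgresql://localhost:5432/app"
print(resolve_env_vars("${DATABASE_URL}"))  # → postgresql://localhost:5432/app
```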

Architecture

The FSM package is built with a modular, layered architecture:

Core Components

State Management

  • StateDefinition: Template for states with schemas and validation
  • StateInstance: Runtime state instances with data and context
  • StateTest, ValidityTest: Condition checking functions

Arc System

  • ArcDefinition: Transition definitions with optional transforms
  • ArcExecution: Runtime arc execution with resource management
  • PushArc: Hierarchical composition for sub-networks

Execution Engines

  • ExecutionEngine: Synchronous execution with strategy support
  • AsyncExecutionEngine: Asynchronous execution with concurrency
  • BatchExecutor: Optimized batch processing
  • StreamExecutor: Stream processing with backpressure

Data Handling

  • DataModeHandler: Abstract interface for data operations
  • CopyModeHandler: Safe concurrent processing
  • ReferenceModeHandler: Memory-efficient with locking
  • DirectModeHandler: High-performance in-place operations

Resource Management

  • ResourceManager: Central resource lifecycle control
  • ResourcePool: Connection pooling with health checks
  • Specialized providers for databases, HTTP clients, and the filesystem (LLM providers live in the separate dataknobs-llm package)

Function System

  • Interface-based design with ITransformFunction, IValidationFunction
  • Extensive function library for common operations
  • Custom function registration support

Use Cases

Data Processing Pipelines

  • ETL workflows with multiple data sources and targets
  • Data validation and quality checks
  • Format transformation and normalization
  • Batch and stream processing modes

API Orchestration

  • Multi-service API coordination
  • Rate limiting and quota management
  • Retry logic with exponential backoff
  • Circuit breaker patterns for fault tolerance
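The retry-with-exponential-backoff idea can be sketched in plain Python (this is a generic pattern, not the package's built-in retry implementation):

```python
import time
import random

def retry_with_backoff(call, max_attempts=5, base_delay=0.1, max_delay=5.0):
    """Retry `call` on exception, doubling the delay each attempt.

    A small random jitter is added so that many clients retrying the
    same failed service do not all wake up at once.
    """
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # exhausted all attempts; propagate the last error
            delay = min(base_delay * 2 ** attempt, max_delay)
            time.sleep(delay + random.uniform(0, delay * 0.1))

# A flaky call that fails twice before succeeding.
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("service unavailable")
    return "ok"

print(retry_with_backoff(flaky, base_delay=0.01))  # → ok
```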

LLM/AI Workflows

Note: LLM functionality has moved to the dataknobs-llm package.

The dedicated LLM package provides:

  • Multi-provider LLM support (OpenAI, Anthropic)
  • FSM-based conversation flows
  • Prompt templating and versioning
  • RAG integration and caching
  • A/B testing and metrics

See the LLM package documentation for details.

File Processing

  • Batch file processing with multiple formats
  • Parallel processing with resource constraints
  • Progress tracking and resumption
  • Error handling and partial failure recovery

Stream Processing

  • Real-time data ingestion and transformation
  • Backpressure handling and flow control
  • Window-based aggregations
  • Event-driven architectures
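Window-based aggregation over a stream can be illustrated with the standard library alone (a concept sketch, independent of the package's StreamExecutor):

```python
from collections import deque
from typing import Iterable, Iterator, Tuple

def sliding_windows(stream: Iterable[float], size: int) -> Iterator[Tuple[float, ...]]:
    """Yield overlapping fixed-size windows over a (possibly unbounded) stream.

    Only `size` items are held in memory at a time, so this works on
    streams far larger than RAM.
    """
    window: deque = deque(maxlen=size)
    for item in stream:
        window.append(item)
        if len(window) == size:
            yield tuple(window)

# Moving average over a stream of readings.
readings = [1, 2, 3, 4, 5]
averages = [sum(w) / len(w) for w in sliding_windows(readings, 3)]
print(averages)  # → [2.0, 3.0, 4.0]
```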

Error Recovery Patterns

  • Configurable retry strategies
  • Circuit breakers with health monitoring
  • Fallback and compensation logic
  • Dead letter queue handling
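The circuit-breaker pattern named above can be sketched minimally in plain Python (illustrative only; the package's own breaker may track state differently):

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: open after N consecutive failures,
    then allow a single trial call once the cooldown has elapsed."""

    def __init__(self, failure_threshold=3, cooldown=30.0):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown:
                raise RuntimeError("circuit open; call rejected")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # trip the breaker
            raise
        self.failures = 0  # success resets the failure count
        return result
```

While the breaker is open, callers fail fast with a RuntimeError instead of hammering a service that is already down.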

Key Concepts

Understanding the Two Mode Types

The FSM package has two distinct mode concepts that are often confused:

DataHandlingMode - HOW data is managed

Controls how individual states handle data internally:

| Mode | Description | Use Case |
| --- | --- | --- |
| COPY | Creates deep copies of data for safe concurrent processing | Default mode, best for multi-threaded environments |
| REFERENCE | Works with data references using optimistic locking | Large datasets, database-backed workflows |
| DIRECT | Operates directly on source data | Single-threaded, high-performance scenarios |
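The COPY/DIRECT distinction can be illustrated in plain Python (a concept sketch using the standard library, not the package's actual mode handlers):

```python
import copy

def transform(data):
    """A transform that mutates whatever dict it is handed."""
    data["processed"] = True
    return data

source = {"user_id": "123"}

# COPY-style: the transform sees a deep copy, so the source is untouched.
copy_result = transform(copy.deepcopy(source))
assert "processed" not in source

# DIRECT-style: the transform mutates the source in place.
direct_result = transform(source)
assert source["processed"] is True
```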

ProcessingMode - HOW MANY records to process

Controls the execution strategy for record volume:

| Mode | Description | Use Case |
| --- | --- | --- |
| SINGLE | Process one record at a time | Simple record-by-record operations |
| BATCH | Process multiple records in batches | Optimizing database operations, transactions |
| STREAM | Process continuous streams of data | Large files, real-time data |
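The SINGLE/BATCH distinction boils down to how records are grouped before each FSM run. A standard-library sketch of batching (not the package's BatchExecutor):

```python
from itertools import islice

def batches(records, size):
    """Yield successive fixed-size batches from an iterable of records.

    The final batch may be smaller if the record count is not a
    multiple of `size`.
    """
    it = iter(records)
    while chunk := list(islice(it, size)):
        yield chunk

print(list(batches(range(7), 3)))  # → [[0, 1, 2], [3, 4, 5], [6]]
```

Grouping records like this lets a BATCH workflow commit one database transaction per chunk instead of one per record.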

Key Difference: DataHandlingMode is about memory safety and concurrency within states, while ProcessingMode is about throughput and how many records to handle at once. They work together but address different concerns.

Common Combinations

| Use Case | ProcessingMode | DataHandlingMode | Why |
| --- | --- | --- | --- |
| Web API requests | SINGLE | COPY | Each request isolated, concurrent safety |
| ETL pipeline | BATCH | COPY | Transaction boundaries, rollback support |
| Large file streaming | STREAM | REFERENCE | Memory efficiency for continuous data |
| High-speed validation | SINGLE | DIRECT | Maximum performance, simple operations |
| Database bulk updates | BATCH | COPY | Transaction safety for batches |

Execution Strategies

Choose from multiple traversal strategies:

| Strategy | Description | When to Use |
| --- | --- | --- |
| DEPTH_FIRST | Explores deeply before backtracking | Linear workflows, sequential processing |
| BREADTH_FIRST | Explores all branches at the same level | Parallel exploration, comparison workflows |
| RESOURCE_OPTIMIZED | Minimizes resource usage | Resource-constrained environments |
| STREAM_OPTIMIZED | Optimized for streaming data | Real-time processing, event streams |
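The depth-first vs breadth-first difference is the classic stack-vs-queue choice. A generic traversal sketch over a toy state graph (the node names are hypothetical, and this is not the package's engine):

```python
from collections import deque

graph = {
    "start": ["validate", "audit"],
    "validate": ["process"],
    "audit": [],
    "process": ["end"],
    "end": [],
}

def traverse(graph, root, depth_first=True):
    """Return visit order; popping like a stack gives depth-first,
    popping like a queue gives breadth-first."""
    frontier = deque([root])
    seen = {root}
    order = []
    while frontier:
        node = frontier.pop() if depth_first else frontier.popleft()
        order.append(node)
        for nxt in graph[node]:
            if nxt not in seen:
                seen.add(nxt)
                frontier.append(nxt)
    return order

print(traverse(graph, "start", depth_first=True))   # → ['start', 'audit', 'validate', 'process', 'end']
print(traverse(graph, "start", depth_first=False))  # → ['start', 'validate', 'audit', 'process', 'end']
```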

Function Types

Functions can be registered and used throughout the FSM:

| Type | Description | Example |
| --- | --- | --- |
| inline | Lambda expressions or code strings | `"lambda state: state.data.upper()"` |
| builtin | Pre-registered library functions | `{"type": "builtin", "name": "validate_email"}` |
| custom | Functions from Python modules | `{"type": "custom", "module": "myapp.transforms"}` |
| registered | Runtime-registered functions | `{"type": "registered", "name": "process_data"}` |
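To make the inline spec concrete, here is a hypothetical resolver showing how such a code string could be turned into a callable. This is illustrative only; `resolve_inline` and the `State` stub are assumptions, not the library's actual resolution logic:

```python
def resolve_inline(spec):
    """Materialize an inline spec into a callable (illustrative only)."""
    assert spec["type"] == "inline"
    fn = eval(spec["code"])  # the code string must evaluate to a callable
    if not callable(fn):
        raise TypeError("inline spec did not produce a callable")
    return fn

class State:
    """Minimal stand-in for a runtime state carrying data."""
    def __init__(self, data):
        self.data = data

upper = resolve_inline({"type": "inline", "code": "lambda state: state.data.upper()"})
print(upper(State("hello")))  # → HELLO
```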

Next Steps

Guides: Deep dive into specific features: