Data Processing Pipeline¶
This example demonstrates how to build a robust data processing pipeline using the FSM framework with real-world features including data validation, enrichment, aggregation, and resource management.
Overview¶
The data pipeline example shows how to:

- Create a simple FSM for data processing workflows
- Implement custom transform functions for validation, enrichment, and aggregation
- Use resource management for tracking processing statistics
- Handle errors gracefully during pipeline execution
- Process records through a complete transformation pipeline
Key Components¶
Custom Transform Functions¶
The example implements three transform functions that work together in a pipeline:
DataValidator¶
Validates incoming data records to ensure they have required fields and correct data types:
class DataValidator(ITransformFunction):
    def transform(self, data: Any, context: FunctionContext) -> Any:
        # Check required fields
        required_fields = ['id', 'timestamp', 'value']
        for field in required_fields:
            if field not in data:
                raise ValueError(f"Missing required field: {field}")

        # Validate data types
        if not isinstance(data['value'], (int, float)):
            raise ValueError("Value must be numeric")

        # Add validation flag
        data['validated'] = True
        return data
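Because the validator raises on bad input, it is easy to exercise on its own. The quick check below is illustrative; it passes context=None, which works for the transform shown above because that code never reads the context:

validator = DataValidator()

try:
    # 'value' is deliberately missing from this record.
    validator.transform({'id': 99, 'timestamp': '2024-01-04'}, context=None)
except ValueError as exc:
    print(exc)  # -> Missing required field: value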
DataEnricher¶
Enriches data with computed fields and metadata:
class DataEnricher(ITransformFunction):
    def transform(self, data: Any, context: FunctionContext) -> Any:
        # Add computed fields
        data['value_squared'] = data['value'] ** 2
        data['value_multiplied'] = data['value'] * self.multiplier
        data['value_category'] = self._categorize_value(data['value'])

        # Track processing stats using resources
        if 'properties' in context.resources:
            props = context.resources['properties']
            count = props.get('processed_count', 0)
            props.set('processed_count', count + 1)

        return data
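The excerpt above relies on a multiplier set in the constructor and on a private categorization helper that is not shown here. A plausible sketch of those two pieces follows; the threshold values are illustrative assumptions, not taken from the example:

class DataEnricher(ITransformFunction):
    def __init__(self, multiplier: float = 1.0):
        # Factor used for data['value_multiplied'] in transform() above.
        self.multiplier = multiplier

    def _categorize_value(self, value: float) -> str:
        # Hypothetical ranges; the real example may use different cut-offs.
        if value < 10:
            return 'low'
        if value < 100:
            return 'medium'
        return 'high'

    # transform() is shown in the excerpt above.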
DataAggregator¶
Aggregates data into summary statistics:
class DataAggregator(ITransformFunction):
    def transform(self, data: Any, context: FunctionContext) -> Any:
        # The aggregator expects a batch of records
        records = data if isinstance(data, list) else [data]
        values = [r.get('value', 0) for r in records]

        # Calculate aggregations
        total = sum(values)
        count = len(records)
        avg = total / count if count > 0 else 0

        return {
            'type': 'aggregation',
            'count': count,
            'total': total,
            'average': avg,
            'min': min(values) if values else 0,
            'max': max(values) if values else 0
        }
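As a quick sanity check on the aggregation arithmetic, here is the same summary computed in plain Python over the three sample records used later in this example:

records = [
    {'id': 1, 'value': 25.0},
    {'id': 2, 'value': 50.0},
    {'id': 3, 'value': 75.0},
]
values = [r['value'] for r in records]
summary = {
    'type': 'aggregation',
    'count': len(values),                  # 3
    'total': sum(values),                  # 150.0
    'average': sum(values) / len(values),  # 50.0
    'min': min(values),                    # 25.0
    'max': max(values),                    # 75.0
}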
FSM Configuration¶
The pipeline FSM uses a simple three-state network:
def create_simple_pipeline_fsm() -> FSM:
    # Create FSM
    fsm = FSM(name="data_pipeline")

    # Register custom functions
    func_manager = FunctionManager()
    func_manager.register_function('validate', DataValidator())
    func_manager.register_function('enrich', DataEnricher(multiplier=3))
    func_manager.register_function('aggregate', DataAggregator())

    # Create network with states
    network = StateNetwork(name="main")
    network.add_state(State(name="start", type="start"), initial=True)
    network.add_state(State(name="process", type="normal"))
    network.add_state(State(name="end", type="end"), final=True)

    # Connect states with arcs
    network.add_arc("start", "process")
    network.add_arc("process", "end")

    fsm.add_network(network)
    return fsm
Resource Management¶
The example uses the PropertiesResource to track processing statistics:
# Create resources
resource_manager = ResourceManager()

# Add properties resource for tracking
props_resource = PropertiesResource(
    name='properties',
    initial_properties={
        'source': 'example_pipeline',
        'processed_count': 0
    }
)
resource_manager.register_provider('properties', props_resource)
fsm.resource_manager = resource_manager

# Use resources during processing
props_handle = resource_manager.acquire('properties', f'record_{record["id"]}')
context.resources = {'properties': props_handle}
Processing Records¶
The pipeline processes records individually through the FSM:
# Test data
test_records = [
    {'id': 1, 'timestamp': '2024-01-01', 'value': 25.0},
    {'id': 2, 'timestamp': '2024-01-02', 'value': 50.0},
    {'id': 3, 'timestamp': '2024-01-03', 'value': 75.0}
]

# Process each record
engine = ExecutionEngine(fsm)
results = []

for record in test_records:
    context = ExecutionContext(data_mode=ProcessingMode.SINGLE)

    # Acquire resources
    props_handle = resource_manager.acquire('properties', f'record_{record["id"]}')
    context.resources = {'properties': props_handle}

    try:
        success, result = engine.execute(context, record)
        if success:
            results.append(result)
    finally:
        # Release resources
        resource_manager.release('properties', props_handle)
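After the loop, the statistics accumulated by DataEnricher can be read back through the same acquire/release pattern. This sketch assumes the acquired handle exposes the same get() method used inside the enricher:

# Read back the tracking statistics accumulated during processing.
props_handle = resource_manager.acquire('properties', 'summary_check')
try:
    print(f"Records enriched: {props_handle.get('processed_count', 0)}")
finally:
    resource_manager.release('properties', props_handle)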
Running the Example¶
To run the data pipeline example, execute the example script from your checkout of the repository.
Expected output:
=== Simple Data Pipeline FSM Example ===
1. Building FSM...
FSM 'data_pipeline' created
2. Setting up resources...
Resources configured
3. Processing sample data...
Processed record 1: value=25.0
Processed record 2: value=50.0
Processed record 3: value=75.0
Successfully processed 3 records
4. Sample Result:
Original value: 25.0
Validated: True
Category: medium
=== Example Complete ===
Key Features Demonstrated¶
1. Transform Function Interface¶
The example shows how to implement the ITransformFunction interface with proper method signatures (a minimal skeleton is sketched after this list):
- transform() method for data processing
- get_transform_description() for documentation
- Access to FunctionContext for resources and metadata
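A minimal skeleton that satisfies both methods might look like the following. It assumes get_transform_description() simply returns a human-readable string; check the interface definition for the exact contract:

class PassThroughTransform(ITransformFunction):
    """Illustrative no-op transform."""

    def transform(self, data: Any, context: FunctionContext) -> Any:
        # Real implementations inspect or modify `data` and may read
        # handles from context.resources, as the examples above do.
        return data

    def get_transform_description(self) -> str:
        # Assumed contract: a short, human-readable description.
        return "Pass-through transform used for illustration"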
2. Resource Management Pattern¶
Demonstrates the proper resource lifecycle:

- Register resource providers with ResourceManager
- Acquire resources before processing
- Access resources through the context during processing
- Release resources after processing (even on error)
3. Error Handling¶
Shows robust error handling patterns:

- Validation errors for missing fields
- Type checking and data validation
- Try/finally blocks for resource cleanup
- Graceful error reporting (see the sketch below)
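For instance, a processing loop can report failures per record instead of aborting the whole batch. This is a hedged sketch: whether a ValueError raised by DataValidator propagates out of engine.execute() or is converted into success=False depends on the engine, so both paths are handled (resource acquire/release is omitted here for brevity):

failures = []
for record in test_records:
    context = ExecutionContext(data_mode=ProcessingMode.SINGLE)
    try:
        success, result = engine.execute(context, record)
        if success:
            results.append(result)
        else:
            failures.append((record.get('id'), 'pipeline reported failure'))
    except ValueError as exc:
        failures.append((record.get('id'), str(exc)))

print(f"{len(results)} processed, {len(failures)} failed")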
4. Data Enrichment Patterns¶
Illustrates common data processing patterns:

- Adding computed fields (squared and multiplied values)
- Categorizing data based on value ranges
- Adding processing timestamps (see the snippet below)
- Tracking processing statistics
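The enricher excerpt earlier does not show the timestamp step; inside transform() it can be as small as the following, assuming ISO-8601 strings are acceptable downstream:

from datetime import datetime, timezone

# Inside DataEnricher.transform(): stamp the record with the enrichment time.
data['processed_at'] = datetime.now(timezone.utc).isoformat()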
Integration with Testing¶
The example includes comprehensive tests in test_data_pipeline_example.py:
def test_pipeline_with_valid_data():
    """Test pipeline with valid input data."""
    fsm = create_simple_pipeline_fsm()
    engine = ExecutionEngine(fsm)

    test_data = {'id': 1, 'timestamp': '2024-01-01', 'value': 42.0}
    context = ExecutionContext(data_mode=ProcessingMode.SINGLE)

    success, result = engine.execute(context, test_data)

    assert success
    assert result['validated'] is True
    assert result['value_squared'] == 1764.0
    assert result['value_category'] == 'medium'
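A companion test for the failure path might look like the sketch below. It is hypothetical: whether invalid input surfaces as success=False or as a raised ValueError depends on how the engine wraps transform errors, so adapt the assertion to your setup:

def test_pipeline_with_missing_field():
    """Records missing a required field should not produce a validated result."""
    fsm = create_simple_pipeline_fsm()
    engine = ExecutionEngine(fsm)

    bad_data = {'id': 2, 'timestamp': '2024-01-02'}  # 'value' is missing
    context = ExecutionContext(data_mode=ProcessingMode.SINGLE)

    try:
        success, _ = engine.execute(context, bad_data)
        assert not success
    except ValueError:
        pass  # also an acceptable outcome for this sketch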
Use Cases¶
This example is ideal for:

- ETL (Extract, Transform, Load) pipelines
- Data quality validation workflows
- Real-time data processing streams
- Batch data transformation jobs
- IoT sensor data processing
- Log processing and analysis
Next Steps¶
To extend this example for your use case:
- Add Custom Functions: Create additional transform functions for your specific data processing needs
- Configure State Networks: Design more complex state networks with conditional transitions
- Integrate Storage: Add database or file storage sinks for processed data
- Add Monitoring: Integrate metrics collection and alerting
- Scale Processing: Use stream processing for larger datasets
- Add Error Recovery: Implement retry logic and dead letter queues
Related Examples¶
- End-to-End Streaming - For high-volume stream processing
- Database ETL - For database-specific ETL patterns
- File Processor - For file-based data processing
API References¶
- SimpleFSM - Simple FSM API for basic workflows
- AdvancedFSM - Advanced FSM with debugging capabilities
- API Index - Complete API documentation