Large File Processing Example¶
This example demonstrates how to process large files efficiently using the FSM framework with REFERENCE mode and streaming capabilities.
Overview¶
The example showcases:
- REFERENCE mode for handling large files without loading them into memory
- Streaming processing for chunks of data
- Parallel processing of file chunks
- Progress tracking and statistics collection
- Error handling for partial failures
Source Code¶
The complete example is available at: packages/fsm/examples/large_file_processor.py
Implementation Details¶
FSM Configuration¶
The example uses SimpleFSM with REFERENCE mode for memory efficiency:
```python
from dataknobs_fsm.api.simple import SimpleFSM
from dataknobs_fsm.core.data_modes import DataHandlingMode


def create_file_processor_fsm() -> SimpleFSM:
    """Create and configure the file processor FSM."""
    config = {
        "name": "large_file_processor",
        "states": [
            {"name": "start", "initial": True},
            {"name": "initialize"},
            {"name": "read_chunk"},
            {"name": "process_chunk"},
            {"name": "aggregate_results"},
            {"name": "write_output"},
            {"name": "complete", "terminal": True},
            {"name": "error", "terminal": True}
        ],
        "arcs": [...]
    }

    # Use REFERENCE mode for large files
    fsm = SimpleFSM(
        config,
        data_mode=DataHandlingMode.REFERENCE,
        custom_functions={
            "initialize_processing": initialize_processing,
            "read_next_chunk": read_next_chunk,
            "process_chunk_data": process_chunk_data,
            "aggregate_chunk_results": aggregate_chunk_results,
            "write_results": write_results,
            "finalize_processing": finalize_processing
        }
    )
    return fsm
```
Key Functions¶
Initialize Processing¶
Sets up the file processing context with streaming support:
```python
from typing import Any, Dict


def initialize_processing(state) -> Dict[str, Any]:
    """Initialize the file processing context."""
    data = state.data.copy()

    # Get file reference
    file_ref = data.get('file_reference')

    # Set up processing context
    data['processing'] = {
        'total_lines': 0,
        'processed_lines': 0,
        'failed_lines': 0,
        'chunks_processed': 0,
        'errors': [],
        'statistics': {
            'min_value': None,
            'max_value': None,
            'sum': 0,
            'count': 0
        }
    }

    # Configure chunk size
    data['chunk_size'] = data.get('chunk_size', 1000)
    data['current_offset'] = 0

    return data
```
Stream Processing¶
Processes the file in chunks to handle large datasets:
```python
def read_next_chunk(state) -> Dict[str, Any]:
    """Read the next chunk of data from the file."""
    data = state.data.copy()
    file_ref = data['file_reference']
    chunk_size = data['chunk_size']

    # Stream the file chunk by chunk
    with open(file_ref.file_path, 'r') as f:
        # Seek to current offset
        f.seek(data['current_offset'])

        # Read chunk
        chunk_data = []
        for _ in range(chunk_size):
            line = f.readline()
            if not line:
                data['end_of_file'] = True
                break
            chunk_data.append(line.strip())

        data['current_chunk'] = chunk_data
        data['current_offset'] = f.tell()

    return data
```
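The `process_chunk_data` function registered in `custom_functions` is not shown above. A minimal sketch of how it might update the running statistics and record partial failures, assuming each line holds one numeric value and using the `processing` context created by `initialize_processing` (the function body itself is an assumption, not the example's actual code):

```python
from typing import Any, Dict


def process_chunk_data(state) -> Dict[str, Any]:
    """Process the current chunk, updating running statistics.

    Sketch only: assumes one numeric value per line.
    """
    data = state.data.copy()
    proc = data['processing']
    stats = proc['statistics']

    for line in data.get('current_chunk', []):
        try:
            value = float(line)
        except ValueError:
            # Partial failure: record the bad line and keep going
            proc['failed_lines'] += 1
            proc['errors'].append(line)
            continue
        stats['count'] += 1
        stats['sum'] += value
        stats['min_value'] = value if stats['min_value'] is None else min(stats['min_value'], value)
        stats['max_value'] = value if stats['max_value'] is None else max(stats['max_value'], value)
        proc['processed_lines'] += 1

    proc['chunks_processed'] += 1
    return data
```

Because bad lines are recorded rather than raised, a malformed chunk degrades the statistics rather than aborting the whole run, which is what the error-handling bullet in the overview refers to.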
Usage¶
```python
from file_reference import FileReference

# Create file reference (not loaded into memory)
file_ref = FileReference(
    file_path="large_dataset.csv",
    file_type="csv",
    metadata={"encoding": "utf-8"}
)

# Create and run the processor
fsm = create_file_processor_fsm()
result = fsm.process({
    "file_reference": file_ref,
    "chunk_size": 10000,
    "output_file": "processed_results.json"
})

print(f"Processed {result['data']['processing']['processed_lines']} lines")
print(f"Statistics: {result['data']['processing']['statistics']}")
```
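`FileReference` is imported from the example's helper module. To run the snippet standalone, a minimal stand-in (an assumption for illustration, not the actual class) only needs to carry the path, type, and metadata, never the file contents:

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FileReference:
    """Minimal stand-in: holds a path and metadata, never the contents."""
    file_path: str
    file_type: str = "csv"
    metadata: Dict[str, Any] = field(default_factory=dict)


file_ref = FileReference(
    file_path="large_dataset.csv",
    file_type="csv",
    metadata={"encoding": "utf-8"}
)
```

This is the essence of REFERENCE mode: the FSM passes around a lightweight handle, and only `read_next_chunk` ever touches the file on disk.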
Benefits of This Approach¶
- Memory Efficiency: Files are never fully loaded into memory
- Scalability: Can handle files far larger than available memory
- Resilience: Partial failures don't affect the entire process
- Performance: Chunks can be processed in parallel
- Flexibility: Works with various file formats (CSV, JSONL, etc.)
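The parallelism benefit can be sketched outside the FSM with the standard library's `concurrent.futures`. The chunking helper and worker below are illustrative assumptions, not part of the framework; since chunks are independent, their partial results can be combined in any order:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def chunk_lines(lines: List[str], chunk_size: int) -> List[List[str]]:
    """Split a list of lines into fixed-size chunks."""
    return [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]


def process_chunk(chunk: List[str]) -> float:
    """Illustrative worker: sum the numeric values in one chunk."""
    return sum(float(line) for line in chunk)


def parallel_total(lines: List[str], chunk_size: int = 1000) -> float:
    """Process chunks concurrently and combine their partial sums."""
    with ThreadPoolExecutor() as pool:
        return sum(pool.map(process_chunk, chunk_lines(lines, chunk_size)))
```

For CPU-bound per-chunk work, `ProcessPoolExecutor` is the usual substitute, at the cost of pickling each chunk across process boundaries.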