Resource Management Guide¶
Overview¶
The DataKnobs FSM package provides comprehensive resource management for external services and dependencies. Resources are managed through a central ResourceManager that handles acquisition, release, pooling, and health monitoring.
Understanding Resources¶
Resources represent external dependencies that your FSM needs to interact with: - Database connections - File systems - HTTP services - LLM providers - Custom services
Key Features¶
- Automatic Lifecycle Management: Resources are acquired when needed and released when done
- Connection Pooling: Efficient reuse of connections
- Health Monitoring: Health checks and metrics tracking
- Owner Tracking: Track which states own which resources
- Cleanup Guarantees: Automatic cleanup on errors or completion
Resource Interface¶
All resources implement the IResourceProvider protocol:
from dataknobs_fsm.resources.base import IResourceProvider, ResourceHealth, ResourceMetrics
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class IResourceProvider(Protocol):
"""Interface for resource providers."""
def acquire(self, **kwargs) -> Any:
"""Acquire a resource."""
...
def release(self, resource: Any) -> None:
"""Release a resource."""
...
def validate(self, resource: Any) -> bool:
"""Validate that a resource is still valid."""
...
def health_check(self) -> ResourceHealth:
"""Check the health of the resource provider."""
...
def get_metrics(self) -> ResourceMetrics:
"""Get resource metrics."""
...
Resource Manager¶
The ResourceManager handles all resource lifecycle operations:
from dataknobs_fsm.resources.manager import ResourceManager
from dataknobs_fsm.resources.database import DatabaseResourceAdapter
# Create resource manager
manager = ResourceManager()
# Register a database resource
db_resource = DatabaseResourceAdapter(
name="main_db",
backend="postgresql",
connection_string="postgresql://user:pass@localhost/db"
)
manager.register_provider("database", db_resource)
# Acquire resource with owner tracking
resource = manager.acquire("database", owner_id="state_123")
# Use resource...
# Release when done
manager.release("database", owner_id="state_123")
# Or use context manager
with manager.resource_context("database", owner_id="state_456") as db:
# Use database
pass # Automatically released
Resource Status and Metrics¶
# Get resource status
status = manager.get_resource_status("database")
print(f"Active connections: {status['active_count']}")
print(f"Current owners: {status['owners']}")
# Get metrics
metrics = manager.get_metrics("database")
print(f"Total acquisitions: {metrics['database'].total_acquisitions}")
print(f"Average hold time: {metrics['database'].average_hold_time}s")
# Health check
health = manager.health_check("database")
print(f"Database health: {health['database']}") # ResourceHealth.HEALTHY
Built-in Resource Adapters¶
Database Resource¶
Connect to any database supported by dataknobs_data:
from dataknobs_fsm.resources.database import DatabaseResourceAdapter
# Create database resource
db_resource = DatabaseResourceAdapter(
name="postgres_db",
backend="postgresql",
host="localhost",
port=5432,
database="mydb",
user="user",
password="pass"
)
# Register with manager
manager.register_provider("database", db_resource)
# Use in FSM
from dataknobs_fsm.api.simple import SimpleFSM
fsm = SimpleFSM(config)
# Resources can be passed to SimpleFSM constructor
fsm = SimpleFSM(config, resources={"database": db_config})
Supported Backends¶
Via dataknobs_data integration: - PostgreSQL - MySQL - SQLite - Memory (for testing) - File-based databases
File System Resource¶
Manage file system operations:
from dataknobs_fsm.resources.filesystem import FileSystemResourceAdapter
# Create filesystem resource
fs_resource = FileSystemResourceAdapter(
name="data_fs",
base_path="/data/processing",
temp_dir="/tmp/fsm",
auto_cleanup=True
)
manager.register_provider("filesystem", fs_resource)
# Acquire and use
with manager.resource_context("filesystem", "state_123") as fs:
# Read file
content = fs.read("input.csv")
# Write file
fs.write("output.json", processed_data)
# Create temp file
temp_path = fs.create_temp()
HTTP Service Resource¶
Manage HTTP connections:
from dataknobs_fsm.resources.http import HTTPResourceAdapter
# Create HTTP resource
http_resource = HTTPResourceAdapter(
name="api_service",
base_url="https://api.example.com",
timeout=30,
headers={"Authorization": "Bearer token"},
retry_count=3
)
manager.register_provider("api", http_resource)
# Use in state
with manager.resource_context("api", "state_123") as api:
response = api.get("/users/123")
data = response.json()
LLM Resource¶
Note: LLM functionality has moved to the dataknobs-llm package.
For LLM integration, use the dedicated LLM package which provides: - Multi-provider LLM support (OpenAI, Anthropic) - Conversation management with FSM-based flows - Prompt templating and versioning - RAG integration and caching
Quick example:
from dataknobs_llm import create_llm_provider, LLMConfig
# Create LLM provider
config = LLMConfig(
provider="openai",
model="gpt-4",
api_key="your-key",
temperature=0.7
)
llm = create_llm_provider(config)
# Use with conversation manager
response = await llm.acomplete("Analyze this text")
See the LLM package documentation for complete details.
Resource Pooling¶
Configure connection pooling for better performance:
from dataknobs_fsm.resources.pool import ResourcePool, PoolConfig
# Configure pool
pool_config = PoolConfig(
min_size=2,
max_size=10,
max_idle_time=300, # 5 minutes
acquire_timeout=30
)
# Register pooled resource
db_resource = DatabaseResourceAdapter(name="db", backend="postgresql")
manager.register_provider("database", db_resource, pool_config=pool_config)
# Pool metrics
pool_metrics = manager.get_metrics("database_pool")
print(f"Pool size: {pool_metrics['database_pool'].size}")
print(f"Available: {pool_metrics['database_pool'].available}")
Resource Requirements in FSM¶
Define resource requirements in FSM configuration:
Configuration-Based Requirements¶
networks:
- name: main
resources:
- database
- filesystem
states:
- name: process_data
resources:
- database # This state needs database
functions:
transform:
type: custom
name: process_with_db
Code-Based Requirements¶
from dataknobs_fsm.functions.base import ResourceConfig
# Define resource requirements
requirements = [
ResourceConfig(name="database", required=True, timeout=30),
ResourceConfig(name="cache", required=False)
]
# Configure resources from requirements
resources = manager.configure_from_requirements(requirements, owner_id="state_123")
Resource Lifecycle¶
Acquisition and Release Flow¶
# Resource lifecycle in state execution
class StateExecution:
def execute(self, state, context):
owner_id = f"state_{state.name}_{context.id}"
# 1. Acquire required resources
for resource_name in state.resources:
resource = manager.acquire(resource_name, owner_id)
context.resources[resource_name] = resource
try:
# 2. Execute state with resources
result = state.transform(context.data, context.resources)
# 3. Release resources on success
for resource_name in state.resources:
manager.release(resource_name, owner_id)
return result
except Exception as e:
# 4. Cleanup on error
manager.release_all(owner_id)
raise
Owner Tracking¶
Resources are tracked by owner ID:
# Check if owner has resource
if manager.has_resource("database", "state_123"):
db = manager.get_resource("database", "state_123")
# Get all owners of a resource
owners = manager.get_resource_owners("database")
print(f"Database is used by: {owners}")
# Release all resources for an owner
manager.release_all("state_123")
Custom Resources¶
Create custom resources by implementing the IResourceProvider interface:
from dataknobs_fsm.resources.base import (
BaseResourceProvider,
ResourceHealth,
ResourceStatus,
ResourceMetrics
)
class RedisResourceProvider(BaseResourceProvider):
def __init__(self, name: str, host: str, port: int = 6379):
super().__init__(name, {"host": host, "port": port})
self.host = host
self.port = port
self.client = None
def acquire(self, **kwargs):
import redis
if not self.client:
self.client = redis.Redis(host=self.host, port=self.port)
self.status = ResourceStatus.ACTIVE
return self.client
def release(self, resource):
self.status = ResourceStatus.IDLE
# Keep connection for reuse
def validate(self, resource):
try:
return resource.ping()
except:
return False
def health_check(self):
if self.validate(self.client):
return ResourceHealth.HEALTHY
return ResourceHealth.UNHEALTHY
def close(self):
if self.client:
self.client.close()
self.client = None
# Register custom resource
redis = RedisResourceProvider("cache", "localhost")
manager.register_provider("redis", redis)
Simple Resource Provider¶
For testing or simple use cases, create resources from dictionaries:
# Create simple provider from dict
manager.register_from_dict("config", {
"data": {
"api_key": "secret",
"endpoint": "https://api.example.com"
}
})
# Or create simple provider with static data
simple_provider = manager.create_simple_provider("settings", {"timeout": 30})
manager.register_provider("settings", simple_provider)
Resource Health Monitoring¶
Health Checks¶
# Check specific resource health
health = manager.health_check("database")
if health["database"] == ResourceHealth.UNHEALTHY:
# Handle unhealthy resource
pass
# Check all resources
all_health = manager.health_check()
for name, status in all_health.items():
print(f"{name}: {status}")
Resource Metrics¶
Track resource usage:
from dataknobs_fsm.resources.base import ResourceMetrics
# Get metrics for specific resource
metrics = manager.get_metrics("database")
db_metrics = metrics["database"]
print(f"Total acquisitions: {db_metrics.total_acquisitions}")
print(f"Active connections: {db_metrics.active_connections}")
print(f"Failed acquisitions: {db_metrics.failed_acquisitions}")
print(f"Average hold time: {db_metrics.average_hold_time}s")
print(f"Average acquisition time: {db_metrics.average_acquisition_time}s")
# Get all metrics
all_metrics = manager.get_metrics()
Async Cleanup¶
The ResourceManager supports async cleanup for resources:
import asyncio
async def cleanup_resources():
# Async cleanup of all resources
await manager.cleanup()
# Run cleanup
asyncio.run(cleanup_resources())
Resource Patterns¶
Resource Manager as Context Manager¶
# Auto-cleanup with context manager
with ResourceManager() as manager:
manager.register_provider("db", db_resource)
# Use resources
with manager.resource_context("db", "owner1") as db:
result = db.query("SELECT * FROM users")
# Manager and all resources cleaned up automatically
Shared Resource Manager¶
# Singleton pattern for shared resources
class SharedResources:
_instance = None
_manager = None
@classmethod
def get_manager(cls):
if cls._instance is None:
cls._instance = cls()
cls._manager = ResourceManager()
cls._setup_resources()
return cls._manager
@classmethod
def _setup_resources(cls):
# Register shared resources
cls._manager.register_provider(
"database",
DatabaseResourceAdapter("db", backend="postgresql")
)
Resource with FSM¶
from dataknobs_fsm.api.simple import SimpleFSM
# Pass resources to FSM
fsm = SimpleFSM(
config,
resources={
"database": {"backend": "postgresql", "host": "localhost"},
"cache": {"type": "redis", "host": "localhost"}
}
)
# Resources are automatically registered and available in states
Best Practices¶
1. Always Release Resources¶
Use context managers or try/finally:
# Good - automatic cleanup
with manager.resource_context("database", owner_id) as db:
result = db.query(sql)
# Also good - explicit cleanup
resource = manager.acquire("database", owner_id)
try:
result = use_resource(resource)
finally:
manager.release("database", owner_id)
2. Use Owner IDs¶
Always provide meaningful owner IDs:
# Good - traceable owner
owner_id = f"state_{state_name}_{execution_id}"
resource = manager.acquire("database", owner_id)
# Bad - generic owner
resource = manager.acquire("database", "state")
3. Monitor Resource Health¶
Regularly check resource health:
# Periodic health check
import threading
import time
def monitor_resources():
while True:
health = manager.health_check()
for name, status in health.items():
if status != ResourceHealth.HEALTHY:
logger.warning(f"Resource {name} is {status}")
time.sleep(60)
monitor_thread = threading.Thread(target=monitor_resources, daemon=True)
monitor_thread.start()
4. Configure Appropriate Pools¶
Size pools based on workload:
# High-throughput configuration
pool_config = PoolConfig(
min_size=5,
max_size=20,
acquire_timeout=10
)
# Low-throughput configuration
pool_config = PoolConfig(
min_size=1,
max_size=5,
max_idle_time=600 # Longer idle time
)
5. Handle Resource Errors¶
from dataknobs_fsm.functions.base import ResourceError
try:
resource = manager.acquire("database", owner_id, timeout=30)
except ResourceError as e:
logger.error(f"Failed to acquire {e.resource_name}: {e}")
# Use fallback or retry
Common Pitfalls¶
1. Resource Leaks¶
# ❌ Bad - resource not released
resource = manager.acquire("database", owner_id)
result = process(resource)
return result # Resource never released!
# ✅ Good - proper cleanup
with manager.resource_context("database", owner_id) as resource:
result = process(resource)
return result
2. Not Tracking Owners¶
# ❌ Bad - no owner tracking
db = db_resource.acquire() # Direct acquisition
# ✅ Good - use manager with owner
db = manager.acquire("database", "state_123")
3. Ignoring Health Checks¶
# ❌ Bad - no health validation
resource = manager.acquire("api", owner_id)
result = resource.call() # Might be unhealthy!
# ✅ Good - validate health
if manager.validate_resource("api"):
resource = manager.acquire("api", owner_id)
result = resource.call()
else:
# Handle unhealthy resource
pass
Troubleshooting¶
Debug Resource Status¶
# Get detailed resource information
all_resources = manager.get_all_resources()
for name, status in all_resources.items():
print(f"\nResource: {name}")
print(f" Has provider: {status['provider_exists']}")
print(f" Has pool: {status['has_pool']}")
print(f" Active count: {status['active_count']}")
print(f" Owners: {status['owners']}")
Common Issues¶
- Resource Timeout: Increase acquire_timeout in pool config
- Pool Exhausted: Increase max_size or reduce hold time
- Health Check Failures: Check resource connectivity and credentials
- Memory Leaks: Ensure all resources are released properly
Conclusion¶
The DataKnobs FSM resource management system provides:
- Centralized Management: Single point for all resource operations
- Owner Tracking: Know who owns what resources
- Health Monitoring: Track resource health and metrics
- Pooling Support: Efficient resource reuse
- Cleanup Guarantees: Automatic cleanup on errors
Use the ResourceManager for all external dependencies to ensure proper lifecycle management.
Next Steps¶
- Streaming Guide - Process large datasets with resources
- Data Modes Guide - Understand data handling modes
- API Reference - Resource API documentation
- Pattern Catalog - Resource usage patterns