# API Orchestration Pattern

## Overview

The API Orchestration pattern provides pre-configured FSM patterns for orchestrating multiple API calls, with support for sequential, parallel, fanout, pipeline, conditional, and hybrid execution modes. It includes built-in rate limiting, circuit breakers, response caching, and comprehensive error handling.
## Core Components

### APIEndpoint

Configuration for individual API endpoints:

```python
from dataknobs_fsm.patterns.api_orchestration import APIEndpoint

endpoint = APIEndpoint(
    name="user_service",
    url="https://api.example.com/users",
    method="GET",
    headers={"Authorization": "Bearer token"},
    params={"limit": 100},
    timeout=30.0,
    retry_count=3,
    retry_delay=1.0,
    rate_limit=60,  # 60 requests per minute
    response_parser=lambda r: r.json(),
    depends_on=["auth_service"]  # Dependencies
)
```
### OrchestrationMode

Execution modes for API orchestration:

```python
from dataknobs_fsm.patterns.api_orchestration import OrchestrationMode

OrchestrationMode.SEQUENTIAL   # Execute APIs one after another
OrchestrationMode.PARALLEL     # Execute APIs concurrently
OrchestrationMode.FANOUT       # One request triggers multiple APIs
OrchestrationMode.PIPELINE     # Output of one API feeds into the next
OrchestrationMode.CONDITIONAL  # Execute based on conditions
OrchestrationMode.HYBRID       # Mix of the above patterns
```
### APIOrchestrationConfig

Complete orchestration configuration:

```python
from dataknobs_fsm.patterns.api_orchestration import APIOrchestrationConfig

config = APIOrchestrationConfig(
    endpoints=[endpoint1, endpoint2, endpoint3],
    mode=OrchestrationMode.SEQUENTIAL,
    max_concurrent=10,
    total_timeout=300.0,
    fail_fast=False,
    global_rate_limit=100,        # 100 requests/minute globally
    error_threshold=0.1,          # Max 10% errors
    circuit_breaker_threshold=5,
    circuit_breaker_timeout=60.0,
    cache_ttl=300,                # Cache for 5 minutes
    metrics_enabled=True
)
```
## Basic Usage

### APIOrchestrator

```python
import asyncio

from dataknobs_fsm.patterns.api_orchestration import (
    APIOrchestrator,
    APIOrchestrationConfig,
    APIEndpoint,
    OrchestrationMode
)

# Define endpoints
user_api = APIEndpoint(
    name="users",
    url="https://api.example.com/users",
    method="GET"
)

posts_api = APIEndpoint(
    name="posts",
    url="https://api.example.com/posts",
    method="GET",
    depends_on=["users"]
)

# Create configuration
config = APIOrchestrationConfig(
    endpoints=[user_api, posts_api],
    mode=OrchestrationMode.SEQUENTIAL
)

# Create orchestrator
orchestrator = APIOrchestrator(config)

# Execute orchestration
result = asyncio.run(orchestrator.execute({"user_id": 123}))
```
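The example uses `asyncio.run()` to drive the orchestration from synchronous code. If you are already inside a running event loop (for example, in a web handler or a notebook), `asyncio.run()` raises a `RuntimeError`; await the coroutine directly instead. A minimal sketch, assuming only the `execute` coroutine shown above:

```python
# Inside an already-running event loop, await execute() directly
# rather than wrapping it in asyncio.run():
async def handler():
    result = await orchestrator.execute({"user_id": 123})
    return result
```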
## Orchestration Modes

### Sequential Execution

APIs are called one after another:

```python
config = APIOrchestrationConfig(
    endpoints=[api1, api2, api3],
    mode=OrchestrationMode.SEQUENTIAL
)
# Execution order: api1 -> api2 -> api3
```

### Parallel Execution

APIs are called concurrently:

```python
config = APIOrchestrationConfig(
    endpoints=[api1, api2, api3],
    mode=OrchestrationMode.PARALLEL,
    max_concurrent=3
)
# All APIs called simultaneously
```

### Pipeline Execution

Output of one API feeds into the next:

```python
# Define transformation between APIs
api2 = APIEndpoint(
    name="enrichment",
    url="https://api.example.com/enrich",
    transform_input=lambda data: {
        "id": data["user"]["id"],
        "name": data["user"]["name"]
    }
)

config = APIOrchestrationConfig(
    endpoints=[api1, api2],
    mode=OrchestrationMode.PIPELINE
)
```

### Fanout Execution

One request triggers multiple APIs:

```python
config = APIOrchestrationConfig(
    endpoints=[notification_api, analytics_api, audit_api],
    mode=OrchestrationMode.FANOUT
)
# Single input sent to all APIs
```

### Conditional Execution

Execute APIs based on conditions:

```python
api2 = APIEndpoint(
    name="premium_service",
    url="https://api.example.com/premium",
    condition=lambda data: data.get("user_type") == "premium"
)

config = APIOrchestrationConfig(
    endpoints=[api1, api2],
    mode=OrchestrationMode.CONDITIONAL
)
```
## Rate Limiting

### Global Rate Limiting

Apply rate limits across all APIs:

```python
config = APIOrchestrationConfig(
    endpoints=endpoints,
    global_rate_limit=100,  # 100 requests per minute
    rate_limit_window=60    # 60 second window
)
```

### Per-Endpoint Rate Limiting

Apply specific limits to each endpoint:

```python
endpoint = APIEndpoint(
    name="limited_api",
    url="https://api.example.com/limited",
    rate_limit=10,  # 10 requests per minute
    burst_limit=5   # Allow burst of 5 requests
)
```

### Custom Rate Limiter

Rate limiting in `APIOrchestrator` is backed by `InMemoryRateLimiter` from `dataknobs-common`. You can use the same infrastructure directly:

```python
from dataknobs_common.ratelimit import (
    InMemoryRateLimiter, RateLimit, RateLimiterConfig,
)

# Create rate limiter with per-category rates
config = RateLimiterConfig(
    default_rates=[RateLimit(limit=100, interval=60)],
    categories={
        "limited_api": [RateLimit(limit=10, interval=60)],
    },
)
limiter = InMemoryRateLimiter(config)

# Use in async context
async def make_request(endpoint_name: str):
    await limiter.acquire(endpoint_name)  # Wait for capacity
    response = await http_client.get(url)
    return response
```

See the Rate Limiting guide for the full `InMemoryRateLimiter` API, including weighted operations, status checks, and distributed backends.
## Circuit Breaker

Prevent cascading failures with circuit breakers:

### Configuration

```python
config = APIOrchestrationConfig(
    endpoints=endpoints,
    circuit_breaker_threshold=5,  # Trip after 5 consecutive failures
    circuit_breaker_timeout=60.0  # Reset after 60 seconds
)
```

### Custom Circuit Breaker

```python
from dataknobs_fsm.patterns.api_orchestration import CircuitBreaker

breaker = CircuitBreaker(threshold=3, timeout=30.0)

async def protected_call():
    return await breaker.call(api_function, arg1, arg2)
```
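For a concrete protected call, wrap any coroutine behind the breaker. A minimal sketch, assuming an async HTTP client such as `httpx`; the `fetch_user` helper and its URL are illustrative, not part of the library:

```python
import httpx

async def fetch_user(user_id: int) -> dict:
    # Hypothetical helper: any awaitable that may fail will do
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.example.com/users/{user_id}")
        resp.raise_for_status()
        return resp.json()

async def protected_fetch(user_id: int) -> dict:
    # breaker.call forwards the arguments; once the failure threshold is
    # reached, further calls are short-circuited until the timeout elapses
    return await breaker.call(fetch_user, user_id)
```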
## Error Handling

### Error Threshold

Stop orchestration if the error rate exceeds a threshold:

```python
config = APIOrchestrationConfig(
    endpoints=endpoints,
    error_threshold=0.1,  # Stop if >10% errors
    fail_fast=True        # Stop on first error
)
```

### Per-Endpoint Error Handling

```python
endpoint = APIEndpoint(
    name="api",
    url="https://api.example.com",
    error_handler=lambda e: {
        "error": str(e),
        "fallback": "default_value"
    }
)
```

### Retry Configuration

```python
endpoint = APIEndpoint(
    name="api",
    url="https://api.example.com",
    retry_count=3,
    retry_delay=1.0,               # Base delay; backoff grows exponentially
    retry_on=[500, 502, 503, 504]  # Retry on these status codes
)
```
## Caching

### Response Caching

Cache API responses to reduce load:

```python
config = APIOrchestrationConfig(
    endpoints=endpoints,
    cache_ttl=300,  # Cache for 5 minutes
    cache_key_generator=lambda ep: f"{ep.name}:{ep.params}"
)
```

### Cache Invalidation

```python
orchestrator = APIOrchestrator(config)

# Clear cache for specific endpoint
orchestrator.clear_cache("user_service")

# Clear all cache
orchestrator.clear_all_cache()
```
## Response Handling

### Response Parsing

Parse and transform responses:

```python
endpoint = APIEndpoint(
    name="api",
    url="https://api.example.com",
    response_parser=lambda r: r.json()["data"],
    transform_output=lambda data: {
        "id": data["user_id"],
        "name": data["full_name"]
    }
)
```
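As an illustration of the intended data flow, assuming `response_parser` runs on the raw response before `transform_output` reshapes the parsed data:

```python
# Assumed flow: response_parser extracts the payload, transform_output reshapes it.
# Raw body:        {"data": {"user_id": 7, "full_name": "Ada Lovelace"}}
# After parser:    {"user_id": 7, "full_name": "Ada Lovelace"}
# After transform: {"id": 7, "name": "Ada Lovelace"}
```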
### Result Merging

Merge results from multiple APIs:

```python
def merge_results(results):
    """Merge results from parallel APIs."""
    merged = {}
    for result in results:
        merged.update(result)
    return merged

config = APIOrchestrationConfig(
    endpoints=endpoints,
    mode=OrchestrationMode.PARALLEL,
    result_merger=merge_results
)
```
## Monitoring and Metrics

### Enable Metrics

```python
config = APIOrchestrationConfig(
    endpoints=endpoints,
    metrics_enabled=True,
    log_requests=True,
    log_responses=True
)

orchestrator = APIOrchestrator(config)
result = await orchestrator.execute(data)

# Access metrics
metrics = orchestrator.get_metrics()
print(f"Total requests: {metrics.total_requests}")
print(f"Success rate: {metrics.success_rate}%")
print(f"Average latency: {metrics.avg_latency}ms")
```

### Custom Metrics

```python
from dataknobs_fsm.io.utils import IOMetrics

metrics = IOMetrics()

# Track custom metrics
metrics.record_request("custom_api", latency=150)
metrics.record_error("custom_api", error_type="timeout")
```
## Factory Functions

### Sequential API Chain

```python
from dataknobs_fsm.patterns.api_orchestration import create_sequential_api_chain

orchestrator = create_sequential_api_chain(
    endpoints=[
        ("auth", "https://api.example.com/auth"),
        ("user", "https://api.example.com/user"),
        ("profile", "https://api.example.com/profile")
    ],
    headers={"API-Key": "secret"}
)

result = await orchestrator.execute({"username": "user@example.com"})
```

### Parallel API Aggregator

```python
from dataknobs_fsm.patterns.api_orchestration import create_parallel_aggregator

orchestrator = create_parallel_aggregator(
    endpoints=[
        ("service1", "https://api1.example.com/data"),
        ("service2", "https://api2.example.com/data"),
        ("service3", "https://api3.example.com/data")
    ],
    max_concurrent=3,
    timeout=10.0
)

aggregated = await orchestrator.execute()
```
## Complete Examples

### Example 1: User Data Enrichment

```python
import asyncio

from dataknobs_fsm.patterns.api_orchestration import (
    APIOrchestrator,
    APIOrchestrationConfig,
    APIEndpoint,
    OrchestrationMode
)

async def enrich_user_data(user_id):
    # Define API endpoints
    user_api = APIEndpoint(
        name="user",
        url=f"https://api.example.com/users/{user_id}",
        method="GET",
        response_parser=lambda r: r.json()
    )

    posts_api = APIEndpoint(
        name="posts",
        url=f"https://api.example.com/users/{user_id}/posts",
        method="GET",
        response_parser=lambda r: r.json()["posts"]
    )

    analytics_api = APIEndpoint(
        name="analytics",
        url=f"https://analytics.example.com/users/{user_id}",
        method="GET",
        rate_limit=10  # Limited API
    )

    # Configure orchestration
    config = APIOrchestrationConfig(
        endpoints=[user_api, posts_api, analytics_api],
        mode=OrchestrationMode.PARALLEL,
        max_concurrent=3,
        fail_fast=False,  # Continue even if one fails
        cache_ttl=600,    # Cache for 10 minutes
        result_merger=lambda results: {
            "user": results[0],
            "posts": results[1],
            "analytics": results[2] if len(results) > 2 else None
        }
    )

    # Execute orchestration
    orchestrator = APIOrchestrator(config)
    enriched_data = await orchestrator.execute({"user_id": user_id})
    return enriched_data

# Run enrichment
result = asyncio.run(enrich_user_data(123))
```
### Example 2: Payment Processing Pipeline

```python
async def process_payment(payment_data):
    # Validation API
    validate_api = APIEndpoint(
        name="validate",
        url="https://payment.example.com/validate",
        method="POST",
        body=payment_data,
        timeout=5.0
    )

    # Fraud check API
    fraud_api = APIEndpoint(
        name="fraud_check",
        url="https://fraud.example.com/check",
        method="POST",
        transform_input=lambda data: {
            "amount": data["amount"],
            "card_hash": data["card_token"]
        },
        condition=lambda data: data["amount"] > 100  # Only for large amounts
    )

    # Payment processor
    payment_api = APIEndpoint(
        name="process",
        url="https://processor.example.com/charge",
        method="POST",
        retry_count=1,  # Limited retries for payments
        depends_on=["validate", "fraud_check"]
    )

    # Notification service
    notify_api = APIEndpoint(
        name="notify",
        url="https://notify.example.com/payment",
        method="POST",
        depends_on=["process"]
    )

    # Configure pipeline
    config = APIOrchestrationConfig(
        endpoints=[validate_api, fraud_api, payment_api, notify_api],
        mode=OrchestrationMode.PIPELINE,
        fail_fast=True,  # Stop on any failure
        circuit_breaker_threshold=3,
        total_timeout=30.0
    )

    orchestrator = APIOrchestrator(config)
    result = await orchestrator.execute(payment_data)
    return result
```
### Example 3: Microservices Health Check

```python
async def health_check_all_services():
    services = [
        "auth", "user", "payment", "inventory",
        "shipping", "notification", "analytics"
    ]

    # Create health check endpoints
    endpoints = [
        APIEndpoint(
            name=service,
            url=f"https://{service}.example.com/health",
            method="GET",
            timeout=2.0,
            retry_count=0,  # No retries for health checks
            error_handler=lambda e: {"status": "down", "error": str(e)}
        )
        for service in services
    ]

    # Configure parallel health checks
    config = APIOrchestrationConfig(
        endpoints=endpoints,
        mode=OrchestrationMode.PARALLEL,
        max_concurrent=10,
        total_timeout=5.0,
        fail_fast=False,  # Check all services
        result_merger=lambda results: {
            endpoints[i].name: results[i]
            for i in range(len(results))
        }
    )

    orchestrator = APIOrchestrator(config)
    health_status = await orchestrator.execute({})

    # Analyze results
    healthy = sum(1 for s in health_status.values() if s.get("status") == "ok")
    total = len(health_status)

    return {
        "healthy": healthy,
        "total": total,
        "percentage": (healthy / total) * 100,
        "services": health_status
    }
```
## Best Practices

### 1. Use the Appropriate Mode

- **Sequential**: When order matters or dependencies exist
- **Parallel**: For independent APIs, to reduce latency
- **Pipeline**: When data flows through transformations
- **Fanout**: For event broadcasting

### 2. Configure Timeouts

```python
# Set reasonable timeouts
endpoint = APIEndpoint(
    name="api",
    url="https://api.example.com",
    timeout=10.0  # Per-endpoint timeout
)

config = APIOrchestrationConfig(
    endpoints=[endpoint],
    total_timeout=30.0  # Overall timeout
)
```
### 3. Handle Failures Gracefully

```python
# Provide fallbacks
endpoint = APIEndpoint(
    name="api",
    url="https://api.example.com",
    error_handler=lambda e: {"fallback": "default_value"}
)

# Use circuit breakers
config = APIOrchestrationConfig(
    endpoints=[endpoint],
    circuit_breaker_threshold=5,
    circuit_breaker_timeout=60.0
)
```
### 4. Monitor Performance

```python
# Enable metrics
config = APIOrchestrationConfig(
    endpoints=endpoints,
    metrics_enabled=True,
    log_requests=True
)

# Review metrics regularly
metrics = orchestrator.get_metrics()
if metrics.error_rate > 0.05:  # >5% errors
    alert_ops_team()  # Your alerting hook
```
## Troubleshooting

### Common Issues

- **Rate Limit Exceeded**
    - Reduce request rate
    - Implement backoff
    - Use caching
- **Circuit Breaker Open**
    - Check endpoint health
    - Review error logs
    - Adjust thresholds
- **Timeout Errors**
    - Increase timeouts
    - Optimize API calls
    - Use parallel execution
- **High Error Rate**
    - Check API availability (see the diagnostic sketch below)
    - Review request format
    - Implement retries
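A quick diagnostic starting point for the issues above, using only the metrics accessors shown in the Monitoring section (assumes `metrics_enabled=True` on the config):

```python
# Inspect orchestrator metrics before tuning rate limits, retries, or timeouts
metrics = orchestrator.get_metrics()
print(f"Total requests:  {metrics.total_requests}")
print(f"Success rate:    {metrics.success_rate}%")
print(f"Average latency: {metrics.avg_latency}ms")

if metrics.error_rate > 0.05:  # >5% errors; threshold is illustrative
    print("High error rate - check endpoint health and review error logs")
```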
## Next Steps
- Error Recovery Pattern - Advanced error handling
- LLM Workflow Pattern - LLM API orchestration
- Streaming Guide - Stream processing
- Examples - More API examples