Async Connection Pooling - Quick Start¶
Installation¶
The async pooling features are included in the standard dataknobs-data package:
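Assuming the package is published under its usual name on PyPI (hedged here, since the exact distribution name may differ in your environment), a typical install looks like:

```shell
pip install dataknobs-data
```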
Quick Examples¶
Elasticsearch with Native Async¶
```python
import asyncio

from dataknobs_data import AsyncDatabase, Record, Query

# Simple usage - pooling is automatic!
async def main():
    db = await AsyncDatabase.create("elasticsearch", {
        "hosts": ["http://localhost:9200"],
        "index": "my_data"
    })

    # Create records
    record = Record({"name": "Alice", "age": 30})
    record_id = await db.create(record)

    # Search
    results = await db.search(
        Query().filter("age", ">=", 25)
    )

    await db.close()

asyncio.run(main())
```
S3 with aioboto3¶
```python
# 5.3x faster for batch operations!
async def s3_example():
    db = await AsyncDatabase.create("s3", {
        "bucket": "my-bucket",
        "region": "us-east-1"
    })

    # Batch upload - uses concurrent uploads
    records = [Record({"id": i}) for i in range(100)]
    ids = await db.create_batch(records)

    await db.close()
```
PostgreSQL with asyncpg¶
```python
# Native PostgreSQL performance
async def postgres_example():
    db = await AsyncDatabase.create("postgres", {
        "host": "localhost",
        "database": "mydb",
        "user": "user",
        "password": "pass",
        "min_connections": 10,
        "max_connections": 20
    })

    # Uses prepared statements automatically
    id1 = await db.create(Record({"data": "test"}))
    id2 = await db.create(Record({"data": "more"}))

    # Transaction support - both updates commit or roll back together
    async with db.transaction():
        await db.update(id1, Record({"data": "updated"}))
        await db.update(id2, Record({"data": "also updated"}))

    await db.close()
```
Performance Comparison¶
| Backend | Operation | Old Implementation | New Pooled Implementation | Improvement |
|---|---|---|---|---|
| Elasticsearch | Bulk Index (1000) | 4.5s | 2.65s | 70% faster |
| S3 | Batch Upload (100) | 5.2s | 0.98s | 5.3x faster |
| PostgreSQL | Bulk Insert (1000) | 3.8s | 1.2s | 3.2x faster |
Key Features¶
🚀 Automatic Connection Pooling¶
- Event loop-aware pooling prevents "Event loop is closed" errors
- Connections automatically reused within same event loop
- Separate pools for different event loops
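The loop-aware behavior above can be sketched as a registry keyed by the running event loop. This is an illustrative pattern only, not the library's actual internals; `get_pool` and the `make_pool` factory are hypothetical names:

```python
import asyncio

# Registry of pools, one per event loop (illustrative sketch only)
_pools = {}

def get_pool(make_pool):
    """Return the pool for the current event loop, creating it on first use."""
    loop = asyncio.get_running_loop()
    if loop not in _pools:
        _pools[loop] = make_pool()
    return _pools[loop]

async def demo():
    # Both calls run in the same loop, so they return the same pool object
    a = get_pool(lambda: object())
    b = get_pool(lambda: object())
    return a is b
```

Because a closed loop never reappears as a key, connections bound to a dead loop are never handed to a live one, which is what prevents the "Event loop is closed" error.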
⚡ Native Async Clients¶
- Elasticsearch: Uses official AsyncElasticsearch client
- S3: Uses aioboto3 for true async S3 operations
- PostgreSQL: Uses asyncpg for maximum performance
🔄 Automatic Resource Management¶
- Pools validated before use, recreated if invalid
- Automatic cleanup on program exit
- Configurable pool sizes and timeouts
📊 Built for Scale¶
- Concurrent batch operations
- Streaming support for large datasets
- Connection retry and failover
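A pattern that complements these features when batching at scale is bounding concurrency with a semaphore, so a burst of tasks cannot exhaust the pool. This is a generic asyncio sketch (the `work` coroutine stands in for any pooled call such as `db.create`):

```python
import asyncio

async def bounded_gather(coros, limit=10):
    """Run coroutines concurrently, but at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:
            return await coro

    # gather preserves input order in its results
    return await asyncio.gather(*(guarded(c) for c in coros))

async def demo():
    async def work(i):
        await asyncio.sleep(0)  # stand-in for a pooled db call
        return i * 2

    return await bounded_gather([work(i) for i in range(5)], limit=2)
```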
Configuration¶
Environment Variables¶
```shell
# PostgreSQL pooling
export POSTGRES_POOL_MIN_SIZE=10
export POSTGRES_POOL_MAX_SIZE=20

# Elasticsearch pooling
export ES_POOL_CONNECTIONS=10
export ES_POOL_MAXSIZE=20

# S3 connection pooling
export S3_POOL_MAX_CONNECTIONS=50
```
Configuration Dictionary¶
```python
config = {
    # Elasticsearch
    "elasticsearch": {
        "hosts": ["http://localhost:9200"],
        "index": "my_index",
        "pool": {
            "connections": 20,
            "maxsize": 50
        }
    },
    # PostgreSQL
    "postgres": {
        "host": "localhost",
        "database": "mydb",
        "pool": {
            "min_size": 10,
            "max_size": 20,
            "timeout": 30
        }
    },
    # S3
    "s3": {
        "bucket": "my-bucket",
        "pool": {
            "max_connections": 50
        }
    }
}
```
Common Patterns¶
Sharing Database Instances¶
```python
# Good: reuse one database instance so all operations share the pool
class DataService:
    def __init__(self):
        self.db = None

    async def initialize(self):
        self.db = await AsyncDatabase.create("elasticsearch", config)

    async def process_item(self, item):
        # Reuses pooled connections
        return await self.db.create(Record(item))

    async def cleanup(self):
        if self.db:
            await self.db.close()

# Usage (inside an async context)
service = DataService()
await service.initialize()

# Process many items with the same connection pool
for item in items:
    await service.process_item(item)

await service.cleanup()
```
Parallel Processing¶
```python
import asyncio

async def parallel_operations(db, items):
    # Process items in parallel using pooled connections
    tasks = [db.create(Record(item)) for item in items]
    results = await asyncio.gather(*tasks)
    return results
```
Error Handling¶
```python
import asyncio

async def resilient_operation(db, record, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await db.create(record)
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            # Pool will recreate the connection on the next attempt
            await asyncio.sleep(2 ** attempt)  # exponential backoff
```
Migration from Old Implementation¶
The new implementation is backwards compatible! Your existing code will automatically use the new pooled implementation:
```python
# Your existing code - no changes needed!
from dataknobs_data.backends.elasticsearch import AsyncElasticsearchDatabase

db = AsyncElasticsearchDatabase(config)
await db.connect()
# Automatically uses the new native client with pooling
```
Troubleshooting¶
Issue: "Event loop is closed"¶
Solution: This is automatically prevented by the pooling system. Each event loop gets its own pool.
Issue: "Pool is exhausted"¶
Solution: Increase pool size in configuration:
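For example, raising the PostgreSQL pool ceiling (key names mirror the configuration dictionary shown earlier; the new value of 50 is illustrative):

```python
# Raise max_size so bursts of concurrent work don't exhaust the pool
config = {
    "host": "localhost",
    "database": "mydb",
    "pool": {
        "min_size": 10,
        "max_size": 50  # raised from 20; size to your peak concurrency
    }
}
```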
Issue: Connection timeouts¶
Solution: Adjust timeout settings:
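For example, giving slow connections more time before the pool gives up (the 60-second value is illustrative; key names mirror the configuration dictionary shown earlier):

```python
# Allow more time to acquire a connection under load
config = {
    "host": "localhost",
    "database": "mydb",
    "pool": {
        "min_size": 10,
        "max_size": 20,
        "timeout": 60  # seconds to wait for a free connection (was 30)
    }
}
```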
Learn More¶
- Full Documentation - Comprehensive pooling documentation
- Performance Tuning - Optimization strategies
- API Reference - Complete API documentation
- Examples - More code examples