Async Connection Pooling¶
Overview¶
The Dataknobs async connection pooling system provides a robust, event loop-aware connection management solution for async database backends. It prevents common issues like "Event loop is closed" errors and ensures optimal performance through intelligent connection reuse.
Key Benefits
- 5.3x performance improvement for S3 operations with aioboto3
- Up to 1.7x faster Elasticsearch operations with the native async client
- Zero "Event loop is closed" errors with loop-aware pooling
- Automatic connection validation and recreation
- Resource cleanup on program exit
Architecture¶
Design Principles¶
The pooling system follows these core principles:
- Event Loop Isolation: Each event loop gets its own connection pool
- Lazy Initialization: Connections created only when needed
- Automatic Validation: Pools validated before use, recreated if invalid
- Generic Implementation: Works with any connection type via protocols
- Clean Shutdown: Automatic cleanup on program exit
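The first two principles can be sketched as a mapping keyed by the running event loop. This is a simplified illustration of the idea, not the actual Dataknobs implementation:

```python
import asyncio


class LoopLocalPools:
    """Sketch: one pool object per running event loop (illustration only)."""

    def __init__(self):
        self._pools = {}  # maps event loop -> pool object

    async def get_pool(self, create_pool_func):
        loop = asyncio.get_running_loop()
        pool = self._pools.get(loop)
        if pool is None:
            # Lazy initialization: create the pool only on first use in this loop
            pool = await create_pool_func()
            self._pools[loop] = pool
        return pool
```

Because the dictionary is keyed by the loop itself, a coroutine running in a different event loop can never receive a pool that was created on another loop.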
Component Overview¶
graph TB
    subgraph "Application Layer"
        A[AsyncDatabase] --> B[ConnectionPoolManager]
    end
    subgraph "Pool Management"
        B --> C[Pool per Event Loop]
        C --> D[Connection Validation]
        D --> E[Active Connections]
    end
    subgraph "Backend Specific"
        E --> F[AsyncElasticsearch]
        E --> G[aioboto3 Session]
        E --> H[asyncpg Pool]
    end
    style A fill:#e1f5fe
    style B fill:#b3e5fc
    style C fill:#81d4fa
    style E fill:#4fc3f7
Core Components¶
ConnectionPoolManager¶
The generic pool manager that handles pool lifecycle per event loop:
from dataknobs_data.pooling import ConnectionPoolManager
# Global manager instance
_pool_manager = ConnectionPoolManager()
# Get or create pool for current event loop
pool = await _pool_manager.get_pool(
    config=pool_config,
    create_pool_func=create_connection,
    validate_pool_func=validate_connection,
    close_pool_func=close_connection
)
Pool Configuration¶
Each backend has its own configuration class:
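The shape of such a configuration class can be sketched as a frozen dataclass. The fields below mirror the documented PostgresPoolConfig from the API reference, but this is an illustration, not the library class itself:

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class PostgresPoolConfig:
    """Sketch of a backend pool config (fields follow the documented class)."""
    host: str = "localhost"
    port: int = 5432
    database: str = "postgres"
    user: str = "postgres"
    password: str = ""
    min_size: int = 2
    max_size: int = 5

    def to_hash_key(self) -> tuple:
        # A hashable key lets the manager look up "the pool for this config"
        return tuple(getattr(self, f.name) for f in fields(self))
```

Two instances with identical settings produce the same hash key, so they share a pool; any differing field yields a distinct key and therefore a distinct pool.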
Implementation Examples¶
Native Async Elasticsearch¶
The Elasticsearch implementation uses the native AsyncElasticsearch client with connection pooling:
from dataknobs_data.backends.elasticsearch_async import AsyncElasticsearchDatabase
# Create database instance
db = AsyncElasticsearchDatabase({
    "host": "localhost",
    "port": 9200,
    "index": "my_index"
})
# Connect (gets or creates pool for current event loop)
await db.connect()
# Use the database
record = Record({"name": "test", "value": 42})
doc_id = await db.create(record)
# Search with native performance
results = await db.search(
    Query().filter("name", Operator.EQ, "test")
)
# Close when done (pool remains for other connections)
await db.close()
Native Async S3 with aioboto3¶
The S3 implementation uses aioboto3 with session pooling:
from dataknobs_data.backends.s3_async import AsyncS3Database
# Create database instance
db = AsyncS3Database({
    "bucket": "my-bucket",
    "prefix": "data/",
    "region": "us-east-1"
})
# Connect (gets or creates session for current event loop)
await db.connect()
# Batch operations with concurrent uploads
records = [Record({"id": i}) for i in range(100)]
ids = await db.create_batch(records) # Uses asyncio.gather internally
# Stream large datasets efficiently
async for record in db.stream_read():
    process(record)
await db.close()
PostgreSQL with asyncpg¶
The PostgreSQL implementation uses asyncpg's native connection pooling:
from dataknobs_data.backends.postgres_native import AsyncPostgresDatabase
# Create database instance
db = AsyncPostgresDatabase({
    "host": "localhost",
    "database": "mydb",
    "user": "user",
    "password": "password",
    "min_connections": 10,
    "max_connections": 20
})
# Connect (gets or creates pool for current event loop)
await db.connect()
# Use prepared statements for performance
await db.create_batch(records) # Uses COPY for bulk insert
# Transaction support
async with db.transaction():
    await db.update(id1, record1)
    await db.update(id2, record2)
await db.close()
Performance Benchmarks¶
S3 Operations¶
| Operation | Sync (boto3) | Async (aioboto3) | Improvement |
|---|---|---|---|
| Single Upload | 52ms | 48ms | 1.08x |
| Batch Upload (100) | 5,200ms | 980ms | 5.3x |
| Stream Read (1000) | 12,000ms | 2,100ms | 5.7x |
Elasticsearch Operations¶
| Operation | Old Async | Native Async | Improvement |
|---|---|---|---|
| Single Index | 15ms | 12ms | 1.25x |
| Bulk Index (100) | 450ms | 265ms | 1.7x |
| Complex Search | 85ms | 52ms | 1.63x |
Event Loop Management¶
Automatic Loop Detection¶
The pool manager automatically detects and handles the current event loop:
# Each event loop gets its own pool
async def task1():
    db = AsyncS3Database(config)
    await db.connect()  # Creates pool for loop 1
async def task2():
    db = AsyncS3Database(config)
    await db.connect()  # Reuses pool for loop 1
# Different loop gets different pool
def in_new_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    db = AsyncS3Database(config)
    loop.run_until_complete(db.connect())  # Creates new pool for loop 2
Pool Validation¶
Pools are automatically validated and recreated if needed:
# Pool validation happens automatically
pool = await manager.get_pool(
    config,
    create_func,
    validate_func  # Called before returning existing pool
)
# Custom validation function
async def validate_elasticsearch_client(client):
    if not await client.ping():
        raise ConnectionError("Failed to ping Elasticsearch")
Resource Management¶
Automatic Cleanup¶
The pooling system includes automatic cleanup mechanisms:
# Cleanup on exit
import asyncio
import atexit

class ConnectionPoolManager:
    def __init__(self):
        self._pools = {}
        atexit.register(self._cleanup_on_exit)

    def _cleanup_on_exit(self):
        """Cleanup all pools on program exit"""
        for pool in self._pools.values():
            asyncio.run(pool.close())
Manual Cleanup¶
You can also manually clean up resources:
# Clean up specific pool
await manager.remove_pool(config)
# Clean up all pools
await manager.close_all()
# Check pool status
count = manager.get_pool_count()
info = manager.get_pool_info()
Configuration¶
Environment Variables¶
Configure pooling behavior via environment variables:
# PostgreSQL
export POSTGRES_POOL_MIN_SIZE=10
export POSTGRES_POOL_MAX_SIZE=20
export POSTGRES_POOL_TIMEOUT=30
# Elasticsearch
export ES_POOL_CONNECTIONS=10
export ES_POOL_MAXSIZE=20
# S3
export S3_POOL_MAX_CONNECTIONS=50
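One way such variables might be read into pool settings is sketched below. The exact mapping and defaults here are assumptions for illustration; check the library source for the authoritative behavior:

```python
import os


def postgres_pool_sizes() -> tuple:
    """Read the PostgreSQL pool variables shown above, with fallback defaults.

    The defaults (2, 5, 30.0) are assumptions for this sketch, not
    guaranteed library defaults.
    """
    min_size = int(os.environ.get("POSTGRES_POOL_MIN_SIZE", "2"))
    max_size = int(os.environ.get("POSTGRES_POOL_MAX_SIZE", "5"))
    timeout = float(os.environ.get("POSTGRES_POOL_TIMEOUT", "30"))
    return min_size, max_size, timeout
```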
Configuration Files¶
Use configuration files for complex setups:
# config.yaml
databases:
  elasticsearch:
    hosts:
      - http://node1:9200
      - http://node2:9200
    pool:
      connections: 10
      maxsize: 20
  postgres:
    host: localhost
    database: mydb
    pool:
      min_size: 10
      max_size: 20
      timeout: 30
  s3:
    bucket: my-bucket
    pool:
      max_connections: 50
Error Handling¶
Connection Errors¶
The pooling system handles connection errors gracefully:
try:
    pool = await manager.get_pool(config, create_func, validate_func)
except ConnectionError as e:
    # Pool validation failed, new pool will be created
    logger.warning(f"Pool invalid: {e}")
except Exception as e:
    # Fatal error
    logger.error(f"Failed to get pool: {e}")
    raise
Retry Logic¶
Built-in retry logic for transient failures:
async def with_retry(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await func()
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
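The helper above waits whole seconds between attempts, which makes it slow to exercise. A variant with a configurable base delay (base_delay is an addition for this sketch, not part of the documented helper) behaves the same way but is easy to verify:

```python
import asyncio


async def with_retry(func, max_retries=3, base_delay=0.01):
    """Retry an async callable on ConnectionError with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return await func()
        except ConnectionError:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)
```

A call that fails twice and then succeeds completes on the third attempt; only when every attempt raises does the error propagate to the caller.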
Testing¶
Unit Tests¶
Test pooling behavior with mocks:
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_pool_reuse():
    manager = ConnectionPoolManager()
    mock_create = AsyncMock(return_value="pool")
    # First call creates pool
    pool1 = await manager.get_pool(config, mock_create)
    assert mock_create.call_count == 1
    # Second call reuses pool
    pool2 = await manager.get_pool(config, mock_create)
    assert mock_create.call_count == 1
    assert pool1 is pool2
Integration Tests¶
Test with real services:
@pytest.mark.integration
@pytest.mark.asyncio
async def test_elasticsearch_pooling():
    db1 = AsyncElasticsearchDatabase(config)
    db2 = AsyncElasticsearchDatabase(config)
    await db1.connect()
    await db2.connect()
    # Should share the same client pool
    assert db1._client is db2._client
Troubleshooting¶
Common Issues¶
Event Loop Closed
Problem: "Event loop is closed" errors
Solution: The pooling system prevents this by maintaining a separate pool per event loop.
Connection Leaks
Problem: Connections not being released
Solution: Always use context managers or ensure proper cleanup:
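A try/finally sketch of the cleanup pattern is shown below. The Database class here is a stand-in used only to make the example self-contained; real code would use the backend classes above:

```python
import asyncio


class Database:
    """Stand-in for an async backend; illustrates the cleanup pattern only."""
    def __init__(self):
        self.closed = False
    async def connect(self):
        pass
    async def create(self, item):
        if item is None:
            raise ValueError("bad record")
    async def close(self):
        self.closed = True


async def process(db, items):
    await db.connect()
    try:
        for item in items:
            await db.create(item)
    finally:
        # Runs even when create() raises, so the connection
        # is always released back to the pool
        await db.close()
```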
Pool Exhaustion
Problem: "Pool is exhausted" errors
Solution: Increase pool size or optimize query patterns:
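Besides raising min_size/max_size, concurrency can be capped so in-flight operations never exceed the pool. A semaphore-bounded gather is one common way to do this (a generic sketch, not a Dataknobs API):

```python
import asyncio


async def bounded_gather(coros, limit=10):
    """Run coroutines concurrently, but never more than `limit` at once,
    so a fixed-size connection pool is not exhausted."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:  # blocks when `limit` tasks are already running
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))
```

With limit set at or below the pool's max_size, a burst of requests queues on the semaphore instead of failing with a pool-exhausted error.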
Debug Logging¶
Enable debug logging for pool operations:
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("dataknobs_data.pooling")
logger.setLevel(logging.DEBUG)
Best Practices¶
Do's¶
✅ Reuse database instances when possible
# Good: Reuse instance
db = AsyncElasticsearchDatabase(config)
await db.connect()
for item in items:
    await db.create(item)
✅ Use batch operations for bulk data
✅ Handle cleanup properly
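Batching can be as simple as slicing records into fixed-size chunks before handing each chunk to create_batch. The chunked helper below is a hypothetical utility, not part of the library:

```python
def chunked(items, size=100):
    """Yield successive fixed-size chunks; the last chunk may be smaller."""
    for i in range(0, len(items), size):
        yield items[i:i + size]
```

Typical usage would then be `for batch in chunked(records, 100): await db.create_batch(batch)`, keeping each bulk request at a bounded size.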
Don'ts¶
❌ Don't create connections in loops
# Bad: Creates many connections
for item in items:
    db = AsyncElasticsearchDatabase(config)
    await db.connect()
    await db.create(item)
❌ Don't share across event loops
# Bad: Sharing across loops
db = AsyncElasticsearchDatabase(config)
async def task1():
    await db.connect()  # Loop 1
# Different event loop
loop2 = asyncio.new_event_loop()
loop2.run_until_complete(db.connect())  # Error!
Migration Guide¶
From Old Async Implementation¶
# Old implementation
from dataknobs_data.backends.elasticsearch import AsyncElasticsearchDatabase
db = AsyncElasticsearchDatabase(config)
await db.connect()
# New implementation (automatic import)
from dataknobs_data.backends.elasticsearch import AsyncElasticsearchDatabase
db = AsyncElasticsearchDatabase(config)
await db.connect() # Uses native client with pooling
Configuration Changes¶
# Old configuration
config = {
    "host": "localhost",
    "port": 9200
}
# New configuration (backwards compatible)
config = {
    "hosts": ["http://localhost:9200"],  # Or still works with host/port
    "index": "my_index"
}
API Reference¶
ConnectionPoolManager¶
dataknobs_data.pooling.base.ConnectionPoolManager ¶
Bases: Generic[PoolType]
Generic connection pool manager that handles pools per event loop.
This class ensures that each event loop gets its own connection pool, preventing cross-loop usage errors that can occur with async connections.
Initialize the connection pool manager.
Methods:
| Name | Description |
|---|---|
| get_pool | Get or create a connection pool for the current event loop. |
| remove_pool | Remove a pool for the current event loop. |
| close_all | Close all connection pools. |
| get_pool_count | Get the number of active pools. |
| get_pool_info | Get information about all active pools. |
Source code in packages/data/src/dataknobs_data/pooling/base.py
Functions¶
get_pool async ¶
get_pool(
    config: BasePoolConfig,
    create_pool_func: Callable[[BasePoolConfig], Awaitable[PoolType]],
    validate_pool_func: Callable[[PoolType], Awaitable[None]] | None = None,
    close_pool_func: Callable[[PoolType], Awaitable[None]] | None = None,
) -> PoolType
Get or create a connection pool for the current event loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | BasePoolConfig | Pool configuration | required |
| create_pool_func | Callable[[BasePoolConfig], Awaitable[PoolType]] | Async function to create a new pool | required |
| validate_pool_func | Callable[[PoolType], Awaitable[None]] \| None | Optional async function to validate an existing pool | None |
| close_pool_func | Callable[[PoolType], Awaitable[None]] \| None | Optional async function to close a pool | None |
Returns:
| Type | Description |
|---|---|
| PoolType | Pool instance for the current event loop |
remove_pool async ¶
Remove a pool for the current event loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | BasePoolConfig | Pool configuration | required |
Returns:
| Type | Description |
|---|---|
| bool | True if pool was removed, False if not found |
close_all async ¶
Close all connection pools.
get_pool_count ¶
Get the number of active pools.
get_pool_info ¶
Get information about all active pools.
Backend Pool Configurations¶
dataknobs_data.pooling.elasticsearch.ElasticsearchPoolConfig dataclass ¶
ElasticsearchPoolConfig(
    hosts: list[str] | None = None,
    index: str = "records",
    api_key: str | None = None,
    basic_auth: tuple | None = None,
    verify_certs: bool = True,
    ca_certs: str | None = None,
    client_cert: str | None = None,
    client_key: str | None = None,
    ssl_show_warn: bool = True,
)
Bases: BasePoolConfig
Configuration for Elasticsearch connection pools.
Methods:
| Name | Description |
|---|---|
| `__post_init__` | Set default hosts if not provided. |
| from_dict | Create from configuration dictionary. |
| to_connection_string | Convert to connection string (not used for ES, but required by base). |
| to_hash_key | Create a hashable key for this configuration. |
dataknobs_data.pooling.s3.S3PoolConfig dataclass ¶
S3PoolConfig(
    bucket: str,
    prefix: str = "",
    region_name: str | None = None,
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    endpoint_url: str | None = None,
)
Bases: BasePoolConfig
Configuration for S3 connection pools.
Methods:
| Name | Description |
|---|---|
| from_dict | Create from configuration dictionary. |
| to_connection_string | Convert to connection string (not used for S3, but required by base). |
| to_hash_key | Create a hashable key for this configuration. |
dataknobs_data.pooling.postgres.PostgresPoolConfig dataclass ¶
PostgresPoolConfig(
    host: str = "localhost",
    port: int = 5432,
    database: str = "postgres",
    user: str = "postgres",
    password: str = "",
    min_size: int = 2,
    max_size: int = 5,
    command_timeout: float | None = None,
    ssl: Any | None = None,
)
Bases: BasePoolConfig
Configuration for PostgreSQL connection pools.
Methods:
| Name | Description |
|---|---|
| from_dict | Create from configuration dictionary. |
| to_connection_string | Convert to PostgreSQL connection string. |
| to_hash_key | Create a hashable key for this configuration. |
Functions¶
from_dict classmethod ¶
Create from configuration dictionary.
Supports either a connection_string parameter or individual parameters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict | Configuration dict with either a connection_string (postgresql://user:pass@host:port/db) or individual parameters: host, port, database, user, password | required |
Returns:
| Type | Description |
|---|---|
| PostgresPoolConfig | PostgresPoolConfig instance |
See Also¶
- Backends Overview - General backend documentation
- Elasticsearch Backend - Elasticsearch-specific features
- S3 Backend - S3-specific features
- PostgreSQL Backend - PostgreSQL-specific features
- Performance Tuning - Optimization tips