Async Connection Pooling

Overview

The Dataknobs async connection pooling system provides a robust, event loop-aware connection management solution for async database backends. It prevents common issues like "Event loop is closed" errors and ensures optimal performance through intelligent connection reuse.

Key Benefits

  • 5.3x performance improvement for S3 operations with aioboto3
  • 70% faster Elasticsearch operations with native async client
  • Zero "Event loop is closed" errors with loop-aware pooling
  • Automatic connection validation and recreation
  • Resource cleanup on program exit

Architecture

Design Principles

The pooling system follows these core principles:

  1. Event Loop Isolation: Each event loop gets its own connection pool
  2. Lazy Initialization: Connections created only when needed
  3. Automatic Validation: Pools validated before use, recreated if invalid
  4. Generic Implementation: Works with any connection type via protocols
  5. Clean Shutdown: Automatic cleanup on program exit

Component Overview

graph TB
    subgraph "Application Layer"
        A[AsyncDatabase] --> B[ConnectionPoolManager]
    end

    subgraph "Pool Management"
        B --> C[Pool per Event Loop]
        C --> D[Connection Validation]
        D --> E[Active Connections]
    end

    subgraph "Backend Specific"
        E --> F[AsyncElasticsearch]
        E --> G[aioboto3 Session]
        E --> H[asyncpg Pool]
    end

    style A fill:#e1f5fe
    style B fill:#b3e5fc
    style C fill:#81d4fa
    style E fill:#4fc3f7

Core Components

ConnectionPoolManager

The generic pool manager that handles pool lifecycle per event loop:

from dataknobs_data.pooling import ConnectionPoolManager

# Global manager instance
_pool_manager = ConnectionPoolManager()

# Get or create pool for current event loop
pool = await _pool_manager.get_pool(
    config=pool_config,
    create_pool_func=create_connection,
    validate_pool_func=validate_connection,
    close_pool_func=close_connection
)

Pool Configuration

Each backend has its own configuration class:

from dataknobs_data.pooling.elasticsearch import ElasticsearchPoolConfig

config = ElasticsearchPoolConfig(
    hosts=["http://localhost:9200"],
    index="my_index",
    api_key="optional_key",
    verify_certs=True
)

from dataknobs_data.pooling.s3 import S3PoolConfig

config = S3PoolConfig(
    bucket="my-bucket",
    region_name="us-east-1",
    endpoint_url="http://localhost:4566",  # For LocalStack
    aws_access_key_id="key",
    aws_secret_access_key="secret"
)

from dataknobs_data.pooling.postgres import PostgresPoolConfig

config = PostgresPoolConfig(
    host="localhost",
    port=5432,
    database="mydb",
    user="user",
    password="password",
    min_size=10,
    max_size=20
)

Implementation Examples

Native Async Elasticsearch

The Elasticsearch implementation uses the native AsyncElasticsearch client with connection pooling:

from dataknobs_data.backends.elasticsearch_async import AsyncElasticsearchDatabase

# Create database instance
db = AsyncElasticsearchDatabase({
    "host": "localhost",
    "port": 9200,
    "index": "my_index"
})

# Connect (gets or creates pool for current event loop)
await db.connect()

# Use the database
record = Record({"name": "test", "value": 42})
doc_id = await db.create(record)

# Search with native performance
results = await db.search(
    Query().filter("name", Operator.EQ, "test")
)

# Close when done (pool remains for other connections)
await db.close()

Native Async S3 with aioboto3

The S3 implementation uses aioboto3 with session pooling:

from dataknobs_data.backends.s3_async import AsyncS3Database

# Create database instance
db = AsyncS3Database({
    "bucket": "my-bucket",
    "prefix": "data/",
    "region": "us-east-1"
})

# Connect (gets or creates session for current event loop)
await db.connect()

# Batch operations with concurrent uploads
records = [Record({"id": i}) for i in range(100)]
ids = await db.create_batch(records)  # Uses asyncio.gather internally

# Stream large datasets efficiently
async for record in db.stream_read():
    process(record)

await db.close()

PostgreSQL with asyncpg

The PostgreSQL implementation uses asyncpg's native connection pooling:

from dataknobs_data.backends.postgres_native import AsyncPostgresDatabase

# Create database instance
db = AsyncPostgresDatabase({
    "host": "localhost",
    "database": "mydb",
    "user": "user",
    "password": "password",
    "min_connections": 10,
    "max_connections": 20
})

# Connect (gets or creates pool for current event loop)
await db.connect()

# Use prepared statements for performance
await db.create_batch(records)  # Uses COPY for bulk insert

# Transaction support
async with db.transaction():
    await db.update(id1, record1)
    await db.update(id2, record2)

await db.close()

Performance Benchmarks

S3 Operations

Operation            Sync (boto3)   Async (aioboto3)   Improvement
Single Upload        52ms           48ms               1.08x
Batch Upload (100)   5,200ms        980ms              5.3x
Stream Read (1000)   12,000ms       2,100ms            5.7x

Elasticsearch Operations

Operation          Old Async   Native Async   Improvement
Single Index       15ms        12ms           1.25x
Bulk Index (100)   450ms       265ms          1.7x
Complex Search     85ms        52ms           1.63x

Event Loop Management

Automatic Loop Detection

The pool manager automatically detects and handles the current event loop:

# Tasks running on the same event loop share one pool
async def task1():
    db = AsyncS3Database(config)
    await db.connect()  # Creates the pool for this loop

async def task2():
    db = AsyncS3Database(config)
    await db.connect()  # Reuses the pool created by task1

# A different loop gets a different pool; drive it from synchronous code,
# since a running coroutine cannot switch to another loop
def run_in_new_loop():
    loop = asyncio.new_event_loop()
    try:
        db = AsyncS3Database(config)
        loop.run_until_complete(db.connect())  # Creates a new pool for loop 2
    finally:
        loop.close()

Pool Validation

Pools are automatically validated and recreated if needed:

# Pool validation happens automatically
pool = await manager.get_pool(
    config,
    create_func,
    validate_func  # Called before returning existing pool
)

# Custom validation function
async def validate_elasticsearch_client(client):
    if not await client.ping():
        raise ConnectionError("Failed to ping Elasticsearch")

Resource Management

Automatic Cleanup

The pooling system includes automatic cleanup mechanisms:

# Cleanup on exit (simplified sketch of the real implementation)
import asyncio
import atexit

class ConnectionPoolManager:
    def __init__(self):
        self._pools = {}
        atexit.register(self._cleanup_on_exit)

    def _cleanup_on_exit(self):
        """Close any remaining pools on program exit"""
        for pool in self._pools.values():
            asyncio.run(pool.close())

Manual Cleanup

You can also manually clean up resources:

# Clean up specific pool
await manager.remove_pool(config)

# Clean up all pools
await manager.close_all()

# Check pool status
count = manager.get_pool_count()
info = manager.get_pool_info()

Configuration

Environment Variables

Configure pooling behavior via environment variables:

# PostgreSQL
export POSTGRES_POOL_MIN_SIZE=10
export POSTGRES_POOL_MAX_SIZE=20
export POSTGRES_POOL_TIMEOUT=30

# Elasticsearch
export ES_POOL_CONNECTIONS=10
export ES_POOL_MAXSIZE=20

# S3
export S3_POOL_MAX_CONNECTIONS=50
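
A hedged sketch of how such variables might be read at startup; the mapping from variable name to config field is an assumption here, not documented library behavior:

```python
import os

# Fall back to the PostgresPoolConfig defaults (min_size=2, max_size=5)
# when the variables are unset; the variable names follow the list above.
min_size = int(os.environ.get("POSTGRES_POOL_MIN_SIZE", "2"))
max_size = int(os.environ.get("POSTGRES_POOL_MAX_SIZE", "5"))
timeout = float(os.environ.get("POSTGRES_POOL_TIMEOUT", "30"))
```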

Configuration Files

Use configuration files for complex setups:

# config.yaml
databases:
  elasticsearch:
    hosts:
      - http://node1:9200
      - http://node2:9200
    pool:
      connections: 10
      maxsize: 20

  postgres:
    host: localhost
    database: mydb
    pool:
      min_size: 10
      max_size: 20
      timeout: 30

  s3:
    bucket: my-bucket
    pool:
      max_connections: 50

Error Handling

Connection Errors

The pooling system handles connection errors gracefully:

try:
    pool = await manager.get_pool(config, create_func, validate_func)
except ConnectionError as e:
    # Pool validation failed, new pool will be created
    logger.warning(f"Pool invalid: {e}")

except Exception as e:
    # Fatal error
    logger.error(f"Failed to get pool: {e}")
    raise

Retry Logic

Built-in retry logic for transient failures:

async def with_retry(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await func()
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
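
The helper can be exercised end to end. `flaky` below is a stand-in for a call that fails twice with a transient ConnectionError before succeeding; the helper is repeated so the sketch runs standalone:

```python
import asyncio

async def with_retry(func, max_retries=3):
    # Same helper as above: exponential backoff on ConnectionError
    for attempt in range(max_retries):
        try:
            return await func()
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)

calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = asyncio.run(with_retry(flaky))
assert result == "ok" and calls["n"] == 3  # succeeded on the third attempt
```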

Testing

Unit Tests

Test pooling behavior with mocks:

import pytest
from unittest.mock import AsyncMock

from dataknobs_data.pooling import ConnectionPoolManager
from dataknobs_data.pooling.s3 import S3PoolConfig

@pytest.mark.asyncio
async def test_pool_reuse():
    config = S3PoolConfig(bucket="test")  # any BasePoolConfig works here
    manager = ConnectionPoolManager()
    mock_create = AsyncMock(return_value="pool")

    # First call creates pool
    pool1 = await manager.get_pool(config, mock_create)
    assert mock_create.call_count == 1

    # Second call reuses pool
    pool2 = await manager.get_pool(config, mock_create)
    assert mock_create.call_count == 1
    assert pool1 is pool2

Integration Tests

Test with real services:

@pytest.mark.integration
@pytest.mark.asyncio
async def test_elasticsearch_pooling():
    db1 = AsyncElasticsearchDatabase(config)
    db2 = AsyncElasticsearchDatabase(config)

    await db1.connect()
    await db2.connect()

    # Should share the same client pool
    assert db1._client is db2._client

Troubleshooting

Common Issues

Event Loop Closed

Problem: "Event loop is closed" errors

Solution: The pooling system prevents this by maintaining separate pools per event loop

Connection Leaks

Problem: Connections not being released

Solution: Always use context managers or ensure proper cleanup:

async with AsyncDatabase.create("elasticsearch", config) as db:
    # Use database
    pass  # Automatically cleaned up

Pool Exhaustion

Problem: "Pool is exhausted" errors

Solution: Increase pool size or optimize query patterns:

config = PostgresPoolConfig(
    min_size=20,  # Increase minimum
    max_size=50   # Increase maximum
)

Debug Logging

Enable debug logging for pool operations:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("dataknobs_data.pooling")
logger.setLevel(logging.DEBUG)

Best Practices

Do's

Reuse database instances when possible

# Good: Reuse instance
db = AsyncElasticsearchDatabase(config)
await db.connect()
for item in items:
    await db.create(item)

Use batch operations for bulk data

# Good: Batch operations
await db.create_batch(records)  # Single round trip

Handle cleanup properly

# Good: Proper cleanup
try:
    await db.connect()
    # Use database
finally:
    await db.close()

Don'ts

Don't create connections in loops

# Bad: Creates many connections
for item in items:
    db = AsyncElasticsearchDatabase(config)
    await db.connect()
    await db.create(item)

Don't share across event loops

# Bad: Sharing across loops
db = AsyncElasticsearchDatabase(config)

async def task1():
    await db.connect()  # Loop 1

# Different event loop
loop2 = asyncio.new_event_loop()
loop2.run_until_complete(db.connect())  # Error!

Migration Guide

From Old Async Implementation

# Old implementation
from dataknobs_data.backends.elasticsearch import AsyncElasticsearchDatabase

db = AsyncElasticsearchDatabase(config)
await db.connect()

# New implementation (automatic import)
from dataknobs_data.backends.elasticsearch import AsyncElasticsearchDatabase

db = AsyncElasticsearchDatabase(config)
await db.connect()  # Uses native client with pooling

Configuration Changes

# Old configuration
config = {
    "host": "localhost",
    "port": 9200
}

# New configuration (backwards compatible)
config = {
    "hosts": ["http://localhost:9200"],  # Or still works with host/port
    "index": "my_index"
}

API Reference

ConnectionPoolManager

dataknobs_data.pooling.base.ConnectionPoolManager

ConnectionPoolManager()

Bases: Generic[PoolType]

Generic connection pool manager that handles pools per event loop.

This class ensures that each event loop gets its own connection pool, preventing cross-loop usage errors that can occur with async connections.

Initialize the connection pool manager.

Methods:

  • get_pool: Get or create a connection pool for the current event loop.
  • remove_pool: Remove a pool for the current event loop.
  • close_all: Close all connection pools.
  • get_pool_count: Get the number of active pools.
  • get_pool_info: Get information about all active pools.

Source code in packages/data/src/dataknobs_data/pooling/base.py
def __init__(self):
    """Initialize the connection pool manager."""
    # Map of (config_hash, loop_id) -> pool or (pool, close_func)
    self._pools: dict[tuple, PoolType | tuple[PoolType, Callable | None]] = {}
    # Weak references to event loops for cleanup
    self._loop_refs: WeakValueDictionary = WeakValueDictionary()
    # Register cleanup on exit
    atexit.register(self._cleanup_on_exit)

Functions

get_pool async

get_pool(
    config: BasePoolConfig,
    create_pool_func: Callable[[BasePoolConfig], Awaitable[PoolType]],
    validate_pool_func: Callable[[PoolType], Awaitable[None]] | None = None,
    close_pool_func: Callable[[PoolType], Awaitable[None]] | None = None,
) -> PoolType

Get or create a connection pool for the current event loop.

Parameters:

  • config (BasePoolConfig): Pool configuration. Required.
  • create_pool_func (Callable[[BasePoolConfig], Awaitable[PoolType]]): Async function to create a new pool. Required.
  • validate_pool_func (Callable[[PoolType], Awaitable[None]] | None): Optional async function to validate an existing pool. Default: None.
  • close_pool_func (Callable[[PoolType], Awaitable[None]] | None): Optional async function to close a pool. Default: None.

Returns:

  • PoolType: Pool instance for the current event loop.

Source code in packages/data/src/dataknobs_data/pooling/base.py
async def get_pool(
    self,
    config: BasePoolConfig,
    create_pool_func: Callable[[BasePoolConfig], Awaitable[PoolType]],
    validate_pool_func: Callable[[PoolType], Awaitable[None]] | None = None,
    close_pool_func: Callable[[PoolType], Awaitable[None]] | None = None
) -> PoolType:
    """Get or create a connection pool for the current event loop.

    Args:
        config: Pool configuration
        create_pool_func: Async function to create a new pool
        validate_pool_func: Optional async function to validate an existing pool
        close_pool_func: Optional async function to close a pool

    Returns:
        Pool instance for the current event loop
    """
    loop = asyncio.get_running_loop()
    loop_id = id(loop)
    config_hash = hash(config.to_hash_key())
    pool_key = (config_hash, loop_id)

    # Check if we already have a pool for this config and loop
    if pool_key in self._pools:
        pool_entry = self._pools[pool_key]
        # Handle both old and new format
        if isinstance(pool_entry, tuple):
            pool, _ = pool_entry
        else:
            # Non-tuple format (backward compatibility)
            pool = pool_entry

        # Validate the pool if validation function provided
        if validate_pool_func:
            try:
                await validate_pool_func(pool)
                return pool
            except Exception as e:
                logger.warning(f"Pool for loop {loop_id} is invalid: {e}. Creating new one.")
                await self._close_pool(pool_key, close_pool_func)
        else:
            return pool

    # Create new pool
    logger.info(f"Creating new connection pool for loop {loop_id}")
    pool = await create_pool_func(config)

    # Store pool and loop reference with close function
    self._pools[pool_key] = (pool, close_pool_func)
    self._loop_refs[loop_id] = loop

    return pool

remove_pool async

remove_pool(config: BasePoolConfig) -> bool

Remove a pool for the current event loop.

Parameters:

  • config (BasePoolConfig): Pool configuration. Required.

Returns:

  • bool: True if pool was removed, False if not found.

Source code in packages/data/src/dataknobs_data/pooling/base.py
async def remove_pool(self, config: BasePoolConfig) -> bool:
    """Remove a pool for the current event loop.

    Args:
        config: Pool configuration

    Returns:
        True if pool was removed, False if not found
    """
    loop_id = id(asyncio.get_running_loop())
    config_hash = hash(config.to_hash_key())
    pool_key = (config_hash, loop_id)

    if pool_key in self._pools:
        await self._close_pool(pool_key)
        return True
    return False

close_all async

close_all()

Close all connection pools.

Source code in packages/data/src/dataknobs_data/pooling/base.py
async def close_all(self):
    """Close all connection pools."""
    for pool_key in list(self._pools.keys()):
        await self._close_pool(pool_key)

get_pool_count

get_pool_count() -> int

Get the number of active pools.

Source code in packages/data/src/dataknobs_data/pooling/base.py
def get_pool_count(self) -> int:
    """Get the number of active pools."""
    return len(self._pools)

get_pool_info

get_pool_info() -> dict[str, Any]

Get information about all active pools.

Source code in packages/data/src/dataknobs_data/pooling/base.py
def get_pool_info(self) -> dict[str, Any]:
    """Get information about all active pools."""
    info = {}
    for (config_hash, loop_id), pool_entry in self._pools.items():
        # Handle both old and new format
        if isinstance(pool_entry, tuple):
            pool, _ = pool_entry
        else:
            # Non-tuple format (backward compatibility)
            pool = pool_entry

        key = f"config_{config_hash}_loop_{loop_id}"
        info[key] = {
            "loop_id": loop_id,
            "config_hash": config_hash,
            "pool": str(pool)
        }
    return info

Backend Pool Configurations

dataknobs_data.pooling.elasticsearch.ElasticsearchPoolConfig dataclass

ElasticsearchPoolConfig(
    hosts: list[str] | None = None,
    index: str = "records",
    api_key: str | None = None,
    basic_auth: tuple | None = None,
    verify_certs: bool = True,
    ca_certs: str | None = None,
    client_cert: str | None = None,
    client_key: str | None = None,
    ssl_show_warn: bool = True,
)

Bases: BasePoolConfig

Configuration for Elasticsearch connection pools.

Methods:

  • __post_init__: Set default hosts if not provided.
  • from_dict: Create from configuration dictionary.
  • to_connection_string: Convert to connection string (not used for ES, but required by base).
  • to_hash_key: Create a hashable key for this configuration.

Functions

__post_init__

__post_init__()

Set default hosts if not provided.

from_dict classmethod

from_dict(config: dict) -> ElasticsearchPoolConfig

Create from configuration dictionary.

to_connection_string

to_connection_string() -> str

Convert to connection string (not used for ES, but required by base).

to_hash_key

to_hash_key() -> tuple

Create a hashable key for this configuration.

dataknobs_data.pooling.s3.S3PoolConfig dataclass

S3PoolConfig(
    bucket: str,
    prefix: str = "",
    region_name: str | None = None,
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    endpoint_url: str | None = None,
)

Bases: BasePoolConfig

Configuration for S3 connection pools.

Methods:

  • from_dict: Create from configuration dictionary.
  • to_connection_string: Convert to connection string (not used for S3, but required by base).
  • to_hash_key: Create a hashable key for this configuration.

Functions

from_dict classmethod

from_dict(config: dict) -> S3PoolConfig

Create from configuration dictionary.

to_connection_string

to_connection_string() -> str

Convert to connection string (not used for S3, but required by base).

to_hash_key

to_hash_key() -> tuple

Create a hashable key for this configuration.

dataknobs_data.pooling.postgres.PostgresPoolConfig dataclass

PostgresPoolConfig(
    host: str = "localhost",
    port: int = 5432,
    database: str = "postgres",
    user: str = "postgres",
    password: str = "",
    min_size: int = 2,
    max_size: int = 5,
    command_timeout: float | None = None,
    ssl: Any | None = None,
)

Bases: BasePoolConfig

Configuration for PostgreSQL connection pools.

Methods:

  • from_dict: Create from configuration dictionary.
  • to_connection_string: Convert to PostgreSQL connection string.
  • to_hash_key: Create a hashable key for this configuration.

Functions

from_dict classmethod

from_dict(config: dict) -> PostgresPoolConfig

Create from configuration dictionary.

Supports either a connection_string parameter or individual parameters.

Parameters:

  • config (dict): Configuration dict with either:
      - connection_string: PostgreSQL connection string (postgresql://user:pass@host:port/db)
      - OR individual parameters: host, port, database, user, password
    Required.

Returns:

  • PostgresPoolConfig: PostgresPoolConfig instance.
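
For illustration, the pieces such a connection string carries can be pulled apart with the standard library; urlsplit is just a convenient sketch here, not necessarily how from_dict parses it:

```python
from urllib.parse import urlsplit

u = urlsplit("postgresql://user:pass@localhost:5432/mydb")
# username="user", password="pass", hostname="localhost", port=5432,
# and the database name is the path without its leading slash
database = u.path.lstrip("/")
```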

to_connection_string

to_connection_string() -> str

Convert to PostgreSQL connection string.

to_hash_key

to_hash_key() -> tuple

Create a hashable key for this configuration.

See Also