Performance Tuning Guide¶
Overview¶
This guide provides comprehensive performance optimization strategies for Dataknobs data backends, focusing on connection pooling, query optimization, and resource management.
Quick Wins
- Enable connection pooling for 5-10x performance improvement
- Use batch operations instead of individual operations
- Implement proper indexing strategies
- Configure appropriate pool sizes for your workload
Connection Pool Optimization¶
Pool Sizing¶
Optimal pool size depends on your workload characteristics:
PostgreSQL¶
from dataknobs_data.backends.postgres_native import AsyncPostgresDatabase
# For read-heavy workloads
read_config = {
"min_connections": 20, # Higher minimum for consistent performance
"max_connections": 50, # Allow bursts
"connection_timeout": 10,
"command_timeout": 30
}
# For write-heavy workloads
write_config = {
"min_connections": 10, # Lower minimum to reduce idle connections
"max_connections": 30, # Moderate maximum
"connection_timeout": 5,
"command_timeout": 60 # Longer timeout for complex writes
}
# For mixed workloads
mixed_config = {
"min_connections": 15,
"max_connections": 40,
"connection_timeout": 10,
"command_timeout": 45
}
Elasticsearch¶
# For search-heavy workloads
search_config = {
"hosts": ["http://localhost:9200"],
"connections": 20, # More connections for parallel searches
"maxsize": 50,
"timeout": 30
}
# For indexing-heavy workloads
index_config = {
"hosts": ["http://localhost:9200"],
"connections": 10,
"maxsize": 20,
"refresh": False, # Disable immediate refresh for bulk indexing
"timeout": 60
}
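As with the PostgreSQL profiles above, these dicts are illustrative and not applied in this guide. One hedged way to use one, assuming the factory forwards these keys to the Elasticsearch backend (not confirmed here):
# Hypothetical: hand the profile to the factory as keyword arguments.
# The index name and key forwarding are assumptions, not a documented API.
es_db = factory.create(backend="elasticsearch", index="my_index", **search_config)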
Pool Monitoring¶
Monitor pool health and utilization:
from dataknobs_data.pooling import ConnectionPoolManager
manager = ConnectionPoolManager()
# Get pool statistics
info = manager.get_pool_info()
for pool_name, stats in info.items():
print(f"Pool: {pool_name}")
print(f" Event Loop: {stats['loop_id']}")
print(f" Config Hash: {stats['config_hash']}")
# Monitor the number of active pools
count = manager.get_pool_count()
if count > 100:
logger.warning(f"High pool count: {count}")
Batch Operations¶
Bulk Insert Performance¶
Compare different insertion strategies:
| Method | Records | Time | Records/sec |
|---|---|---|---|
| Individual inserts | 1,000 | 52s | 19/s |
| Batch insert (size=100) | 1,000 | 3.2s | 312/s |
| Batch insert (size=500) | 1,000 | 2.1s | 476/s |
| Stream write | 1,000 | 1.8s | 555/s |
# Optimal batch insertion
from dataknobs_data.streaming import StreamConfig  # used for the streaming path below

async def bulk_insert(db, records):
# For small datasets (< 1000 records)
if len(records) < 1000:
return await db.create_batch(records)
# For large datasets, use streaming
async def record_generator():
for record in records:
yield record
result = await db.stream_write(
record_generator(),
config=StreamConfig(
batch_size=500, # Optimal batch size
parallel=True # Enable parallel processing
)
)
return result
Bulk Read Performance¶
Optimize reading large datasets:
# Efficient bulk reading with pagination
async def read_all_optimized(db, query=None):
records = []
# Use streaming for large result sets
stream_config = StreamConfig(
batch_size=1000, # Fetch 1000 at a time
buffer_size=5000 # Buffer up to 5000 records
)
async for record in db.stream_read(query, stream_config):
records.append(record)
# Process in chunks to avoid memory issues
if len(records) >= 10000:
await process_chunk(records)
records = []
# Process remaining records
if records:
await process_chunk(records)
Query Optimization¶
Elasticsearch Query Performance¶
Use Filters Instead of Queries¶
# Slow: Using query context (scoring enabled)
slow_query = {
"query": {
"match": {
"status": "active"
}
}
}
# Fast: Using filter context (no scoring)
fast_query = {
"query": {
"bool": {
"filter": [
{"term": {"status.keyword": "active"}}
]
}
}
}
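If you assemble these bodies by hand in several places, a small helper keeps every exact-match condition in filter context. This is plain dict construction; the build_filter_query name is illustrative, not part of the library:
def build_filter_query(terms: dict) -> dict:
    # Put every exact-match condition in filter context so scoring is skipped
    # and Elasticsearch can cache the filter.
    return {
        "query": {
            "bool": {
                "filter": [{"term": {field: value}} for field, value in terms.items()]
            }
        }
    }

# Equivalent to fast_query above:
body = build_filter_query({"status.keyword": "active"})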
Optimize Field Mappings¶
# Configure optimal mappings for your use case
mappings = {
"properties": {
"id": {"type": "keyword"}, # Exact match only
"name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, # Both text and keyword
"timestamp": {"type": "date", "format": "epoch_millis"}, # Efficient date storage
"data": {"type": "object", "enabled": False} # Disable indexing for storage-only fields
}
}
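Mappings for existing fields cannot be changed after the fact, so apply them when the index is created. A sketch using the official Elasticsearch Python client directly (the client instance and index name are assumptions; your setup may create indices through the backend instead):
from elasticsearch import Elasticsearch  # official client, assumed installed

es = Elasticsearch("http://localhost:9200")
# Create the index with the mappings defined above; index settings can be passed similarly.
es.indices.create(index="my_index", mappings=mappings)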
PostgreSQL Query Performance¶
Use Prepared Statements¶
# Prepared statements for repeated queries
async def find_by_status(db, status):
# Statement is prepared and cached
query = """
SELECT * FROM records
WHERE data->>'status' = $1
ORDER BY created_at DESC
LIMIT 100
"""
return await db.execute_query(query, status)
Optimize Indexes¶
# Create appropriate indexes
async def optimize_postgres(db):
# JSONB GIN index for data field
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_data_gin
ON records USING GIN (data)
""")
# B-tree index for timestamp queries
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_created_at
ON records (created_at DESC)
""")
# Partial index for active records
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_active
ON records (id)
WHERE data->>'status' = 'active'
""")
Memory Management¶
Streaming for Large Datasets¶
# Memory-efficient processing
async def process_large_dataset(db):
processed = 0
# Stream records instead of loading all into memory
async for record in db.stream_read():
# Process one record at a time
await process_record(record)
processed += 1
# Periodic cleanup
if processed % 10000 == 0:
import gc
gc.collect()
logger.info(f"Processed {processed} records")
Connection Pool Memory¶
# Configure pools to minimize memory usage
memory_optimized_config = {
"min_connections": 5, # Lower minimum
"max_connections": 15, # Lower maximum
"max_inactive_connection_lifetime": 300, # Close idle connections after 5 minutes
"max_queries": 50000 # Recreate connection after 50k queries
}
Concurrent Operations¶
Parallel Processing¶
import asyncio
from typing import List
async def parallel_operations(db, items: List):
# Process items in parallel batches
batch_size = 10
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
# Create tasks for parallel execution
tasks = [
process_item(db, item)
for item in batch
]
# Execute in parallel and wait for all
results = await asyncio.gather(*tasks)
# Handle results
for result in results:
if result.error:
logger.error(f"Processing failed: {result.error}")
Semaphore for Rate Limiting¶
# Limit concurrent operations
class RateLimitedDatabase:
def __init__(self, db, max_concurrent=10):
self.db = db
self.semaphore = asyncio.Semaphore(max_concurrent)
async def create(self, record):
async with self.semaphore:
return await self.db.create(record)
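Usage is transparent to callers; here records stands in for whatever iterable of records you are writing:
limited = RateLimitedDatabase(db, max_concurrent=10)
# No more than 10 create() calls hit the backend at once, even if
# thousands of tasks are scheduled together.
await asyncio.gather(*(limited.create(record) for record in records))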
Caching Strategies¶
In-Memory Caching¶
import hashlib
import json
import time
from typing import Optional

from dataknobs_data import Record
class CachedDatabase:
def __init__(self, db):
self.db = db
self._cache = {}
self._cache_ttl = 300 # 5 minutes
async def read_cached(self, id: str) -> Optional[Record]:
# Check cache first
cache_key = f"record:{id}"
if cache_key in self._cache:
cached, timestamp = self._cache[cache_key]
if time.time() - timestamp < self._cache_ttl:
return cached
# Fetch from database
record = await self.db.read(id)
if record:
self._cache[cache_key] = (record, time.time())
return record
def invalidate(self, id: str):
cache_key = f"record:{id}"
self._cache.pop(cache_key, None)
Query Result Caching¶
class QueryCache:
def __init__(self):
self._cache = {}
self._max_size = 1000
def _hash_query(self, query: Query) -> str:
# Create deterministic hash of query
query_str = json.dumps(query.to_dict(), sort_keys=True)
return hashlib.md5(query_str.encode()).hexdigest()
async def search_cached(self, db, query: Query):
cache_key = self._hash_query(query)
# Check cache
if cache_key in self._cache:
return self._cache[cache_key]
# Execute query
results = await db.search(query)
# Cache results (with size limit)
if len(self._cache) >= self._max_size:
# Remove oldest entry
oldest = next(iter(self._cache))
del self._cache[oldest]
self._cache[cache_key] = results
return results
Monitoring and Profiling¶
Performance Metrics¶
import time
from contextlib import asynccontextmanager
class PerformanceMonitor:
def __init__(self):
self.metrics = {}
@asynccontextmanager
async def measure(self, operation: str):
start = time.perf_counter()
try:
yield
finally:
duration = time.perf_counter() - start
if operation not in self.metrics:
self.metrics[operation] = []
self.metrics[operation].append(duration)
def report(self):
for operation, durations in self.metrics.items():
avg = sum(durations) / len(durations)
max_duration = max(durations)
min_duration = min(durations)
print(f"{operation}:")
print(f" Avg: {avg:.3f}s")
print(f" Min: {min_duration:.3f}s")
print(f" Max: {max_duration:.3f}s")
# Usage
monitor = PerformanceMonitor()
async with monitor.measure("create"):
await db.create(record)
async with monitor.measure("search"):
await db.search(query)
monitor.report()
Database Profiling¶
# Enable query logging for debugging
import logging
logging.basicConfig(level=logging.DEBUG)
# PostgreSQL: log slow queries (server settings, e.g. in postgresql.conf)
slow_query_config = {
"log_min_duration_statement": 100, # Log queries slower than 100ms
"log_statement": "all" # Log all statements
}
# Elasticsearch: enable the search slow log (REST API / Kibana Dev Tools syntax)
PUT /my_index/_settings
{
"index.search.slowlog.threshold.query.warn": "10s",
"index.search.slowlog.threshold.query.info": "5s",
"index.search.slowlog.threshold.query.debug": "2s",
"index.search.slowlog.threshold.query.trace": "500ms"
}
Configuration Templates¶
High-Performance Configuration¶
# high-performance.yaml
databases:
postgres:
host: localhost
database: dataknobs
pool:
min_size: 20
max_size: 50
timeout: 10
max_queries: 100000
max_inactive_connection_lifetime: 600
elasticsearch:
hosts:
- http://es1:9200
- http://es2:9200
- http://es3:9200
pool:
connections: 30
maxsize: 60
index:
number_of_shards: 3
number_of_replicas: 1
refresh_interval: "30s" # Batch refresh
s3:
bucket: dataknobs-prod
pool:
max_connections: 100
transfer:
multipart_threshold: 8388608 # 8MB
max_concurrency: 10
multipart_chunksize: 8388608
max_io_queue: 100
Memory-Optimized Configuration¶
# memory-optimized.yaml
databases:
postgres:
pool:
min_size: 5
max_size: 15
max_inactive_connection_lifetime: 300
elasticsearch:
pool:
connections: 10
maxsize: 20
index:
refresh_interval: "1s"
s3:
pool:
max_connections: 25
transfer:
max_concurrency: 5
max_io_queue: 50
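Neither template is loaded anywhere in this guide. A hedged sketch of wiring one in, assuming your application parses the YAML itself (PyYAML here) and that the factory accepts the pool keys as keyword arguments, which this guide does not confirm:
import yaml  # PyYAML, assumed to be installed

with open("high-performance.yaml") as f:
    cfg = yaml.safe_load(f)

pg = cfg["databases"]["postgres"]
# Forwarding the pool section as keyword arguments is an assumption,
# not a documented factory signature.
db = factory.create(backend="postgres", host=pg["host"], database=pg["database"], **pg["pool"])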
Backend-Specific Optimizations¶
DuckDB Backend¶
DuckDB is optimized for analytical (OLAP) workloads and can be 10-100x faster than SQLite for aggregations and large scans.
When to Use DuckDB vs SQLite¶
# Use DuckDB for analytical queries
analytics_db = factory.create(backend="duckdb", path=":memory:")
# Analytical operations (fast with DuckDB)
query = Query().filter("amount", Operator.GT, 1000)
high_value_transactions = analytics_db.search(query)
total = sum(t["amount"] for t in high_value_transactions)
# Use SQLite for transactional workloads
transactional_db = factory.create(backend="sqlite", path="app.db")
# Transactional operations (better with SQLite)
with transactional_db.transaction():
transactional_db.create(record1)
transactional_db.update(record2.id, record2)
transactional_db.delete(record3_id)
Batch Size Optimization¶
DuckDB performs best with larger batch sizes:
from dataknobs_data.streaming import StreamConfig
# Optimal for DuckDB - larger batches
duckdb_config = StreamConfig(
batch_size=10000, # Larger batches for columnar storage
parallel=True
)
# Stream write with optimized batching
result = db.stream_write(data_generator(), duckdb_config)
In-Memory Analytics¶
For temporary analytics on datasets that fit in memory:
# Load data into in-memory DuckDB for fast analysis
analytics_db = factory.create(backend="duckdb", path=":memory:")
analytics_db.connect()
# Bulk load
records = load_dataset() # Your data source
analytics_db.create_batch(records)
# Fast analytical queries
query = ComplexQuery(
condition=LogicCondition(
operator=LogicOperator.AND,
conditions=[
FilterCondition(Filter("year", Operator.EQ, 2024)),
FilterCondition(Filter("sales", Operator.GT, 50000))
]
)
)
results = analytics_db.search(query)
# Aggregate operations (very fast with DuckDB)
total_sales = sum(r["sales"] for r in results)
avg_sales = total_sales / len(results) if results else 0
Read-Only Mode for Safety¶
Use read-only mode when querying production databases:
# Open in read-only mode - cannot accidentally modify data
readonly_db = factory.create(
backend="duckdb",
path="/production/analytics.duckdb",
read_only=True
)
readonly_db.connect()
# Safe querying without risk of modifications
results = readonly_db.search(Query())
Performance Comparison¶
Typical performance characteristics (relative to SQLite):
| Operation | DuckDB Performance | Best Use Case |
|---|---|---|
| Aggregations (SUM, AVG, COUNT) | 10-100x faster | Analytics dashboards |
| Large scans (>100K rows) | 5-20x faster | Report generation |
| Complex joins | 10-50x faster | Data analysis |
| Window functions | 20-100x faster | Time-series analysis |
| Simple inserts | Similar | Data loading |
| Batch inserts | 2-5x faster | ETL pipelines |
SQLite Backend¶
SQLite is optimized for transactional (OLTP) workloads with ACID guarantees.
WAL Mode for Better Concurrency¶
# Enable WAL mode for better concurrent read performance
sqlite_db = factory.create(
backend="sqlite",
path="app.db",
journal_mode="WAL", # Write-Ahead Logging
synchronous="NORMAL" # Balance safety and speed
)
sqlite_db.connect()
Transaction Batching¶
# Batch operations in transactions for better performance
with sqlite_db.transaction():
for record in large_dataset:
sqlite_db.create(record) # All in one transaction
PostgreSQL Backend¶
Connection Pool Tuning¶
# Optimize pool size based on workload
postgres_db = factory.create(
backend="postgres",
host="localhost",
database="myapp",
pool_size=20, # Adjust based on concurrent connections
max_overflow=10
)
Index Strategy¶
# Create indexes on frequently queried JSON fields
async def optimize_postgres(conn):
# GIN index for JSON queries
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_data_gin
ON records USING GIN (data)
""")
Benchmarking Tools¶
Simple Benchmark Script¶
#!/usr/bin/env python
import asyncio
import time
from dataknobs_data import AsyncDatabase, Query, Record  # Query is used below; its export from the package root is assumed
async def benchmark_backend(backend_type: str, config: dict, num_records: int = 1000):
"""Benchmark a backend's performance."""
db = await AsyncDatabase.create(backend_type, config)
records = [
Record({"id": i, "data": f"test_{i}"})
for i in range(num_records)
]
# Benchmark writes
start = time.perf_counter()
ids = await db.create_batch(records)
write_time = time.perf_counter() - start
# Benchmark reads
start = time.perf_counter()
retrieved = await db.read_batch(ids)
read_time = time.perf_counter() - start
# Benchmark search
start = time.perf_counter()
results = await db.search(Query().limit(100))
search_time = time.perf_counter() - start
# Cleanup
await db.clear()
await db.close()
# Report results
print(f"Backend: {backend_type}")
print(f" Write: {num_records / write_time:.0f} records/sec")
print(f" Read: {num_records / read_time:.0f} records/sec")
print(f" Search: {search_time * 1000:.2f}ms for 100 records")
return {
"write_rps": num_records / write_time,
"read_rps": num_records / read_time,
"search_ms": search_time * 1000
}
# Run benchmarks
async def main():
backends = {
"memory": {},
"postgres": {"host": "localhost", "database": "test"},
"elasticsearch": {"hosts": ["http://localhost:9200"], "index": "test"},
"s3": {"bucket": "test-bucket"}
}
for backend, config in backends.items():
try:
await benchmark_backend(backend, config)
except Exception as e:
print(f"Failed to benchmark {backend}: {e}")
if __name__ == "__main__":
asyncio.run(main())
See Also¶
- Async Connection Pooling - Detailed pooling documentation
- Backends Overview - Backend comparison and selection
- Migration Guide - Data migration strategies
- Configuration - Configuration best practices