Data Backends Example
This example demonstrates how to use different database backends with the dataknobs-data package.
Complete Example
#!/usr/bin/env python3
"""
Example showing how to use different database backends.
"""
from dataknobs_data import Record, Query, DatabaseFactory
from dataknobs_config import Config
import os
import tempfile
def demonstrate_memory_backend():
    """Show the memory backend used as a small hit-counting cache.

    Stores one record, reads and updates it five times while bumping its
    ``hits`` field, prints the resulting counter, and returns the memory
    database object.
    """
    print("\n=== Memory Backend (Caching) ===")

    memory_db = DatabaseFactory().create(backend="memory")

    # Seed the cache with a single "hot" entry.
    seed_fields = {
        "id": "hot-001",
        "type": "cache",
        "data": "frequently accessed",
        "hits": 0,
    }
    record_key = memory_db.create(Record(seed_fields))
    print(f"Cached data with ID: {record_key}")

    # Each simulated lookup re-reads the entry and writes back the new count.
    lookups = 5
    for _ in range(lookups):
        entry = memory_db.read(record_key)
        if not entry:
            continue
        entry.fields["hits"] += 1
        memory_db.update(record_key, entry)

    latest = memory_db.read(record_key)
    print(f"Cache hits: {latest.get_value('hits')}")
    return memory_db
def demonstrate_file_backend():
    """Demonstrate the file backend with JSON and CSV storage.

    Creates two temporary files, stores a few records in each through the
    file backend, prints a filtered search count and a total record count,
    and removes the temporary files before returning.

    Returns:
        The JSON-backed database object. Its backing file has already been
        deleted by the time the caller receives it.
    """
    print("\n=== File Backend (JSON/CSV/Parquet) ===")
    factory = DatabaseFactory()

    # The temp files are created with delete=False, so they must be removed
    # explicitly. Tracking the paths lets the finally block clean up even
    # when a backend call raises part-way through the demonstration.
    temp_paths = []
    try:
        # JSON format
        with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
            json_path = f.name
            temp_paths.append(json_path)
        json_db = factory.create(
            backend="file",
            path=json_path,
            format="json"
        )

        # Store structured data
        records = [
            Record({"name": "Alice", "age": 30, "city": "New York"}),
            Record({"name": "Bob", "age": 25, "city": "San Francisco"}),
            Record({"name": "Charlie", "age": 35, "city": "New York"})
        ]
        for record in records:
            json_db.create(record)

        # Query data
        ny_residents = json_db.search(
            Query().filter("city", "=", "New York")
        )
        print(f"New York residents: {len(ny_residents)}")

        # CSV format for tabular data
        with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as f:
            csv_path = f.name
            temp_paths.append(csv_path)
        csv_db = factory.create(
            backend="file",
            path=csv_path,
            format="csv"
        )

        # Note: CSV backend works best with flat, tabular data
        csv_db.create(Record({"id": "1", "value": "100", "status": "active"}))
        csv_db.create(Record({"id": "2", "value": "200", "status": "inactive"}))
        print(f"CSV records: {csv_db.count()}")

        return json_db
    finally:
        # Clean up every temp file that was actually created.
        for path in temp_paths:
            try:
                os.unlink(path)
            except FileNotFoundError:
                # Already gone; nothing left to clean up.
                pass
def demonstrate_duckdb_backend():
    """Demonstrate the DuckDB backend with a small sales dataset.

    Opens a ``:memory:`` DuckDB database, batch-loads six sales records,
    runs three filtered searches (by region, by sales threshold, by
    quarter) and prints totals computed from the results.

    Returns:
        The DuckDB database object. It has already been closed when it is
        returned.
    """
    # Imported locally: Operator is not among the names imported at the top
    # of the file.
    from dataknobs_data.query import Query, Operator

    print("\n=== DuckDB Backend (Analytics) ===")
    factory = DatabaseFactory()

    # Create in-memory DuckDB for fast analytics
    duck_db = factory.create(backend="duckdb", path=":memory:")
    duck_db.connect()

    # From here on the connection is open, so close it even if loading or
    # querying raises.
    try:
        # Load analytical dataset
        sales_data = [
            Record({"product": "Widget", "sales": 15000, "region": "West", "quarter": "Q1"}),
            Record({"product": "Gadget", "sales": 25000, "region": "East", "quarter": "Q1"}),
            Record({"product": "Widget", "sales": 18000, "region": "West", "quarter": "Q2"}),
            Record({"product": "Gadget", "sales": 30000, "region": "East", "quarter": "Q2"}),
            Record({"product": "Doohickey", "sales": 12000, "region": "South", "quarter": "Q1"}),
            Record({"product": "Doohickey", "sales": 16000, "region": "South", "quarter": "Q2"}),
        ]

        # Batch load in a single call
        ids = duck_db.create_batch(sales_data)
        print(f"Loaded {len(ids)} sales records")

        # Aggregate by region
        west_sales = duck_db.search(Query().filter("region", Operator.EQ, "West"))
        west_total = sum(r["sales"] for r in west_sales)
        print(f"West region total sales: ${west_total:,}")

        # High-value products
        high_value = duck_db.search(Query().filter("sales", Operator.GT, 20000))
        print(f"High-value products (>$20K): {len(high_value)}")

        # Quarter analysis; guard the average against an empty result set
        q2_analysis = duck_db.search(Query().filter("quarter", Operator.EQ, "Q2"))
        q2_total = sum(r["sales"] for r in q2_analysis)
        q2_avg = q2_total / len(q2_analysis) if q2_analysis else 0
        print(f"Q2 Analysis: Total=${q2_total:,}, Avg=${q2_avg:,.2f}")
    finally:
        duck_db.close()

    return duck_db
def demonstrate_postgres_backend():
    """Exercise the PostgreSQL backend against a running server.

    Connection settings come from the ``PG_HOST``, ``PG_PORT``,
    ``PG_DATABASE``, ``PG_USER`` and ``PG_PASSWORD`` environment variables,
    with local defaults. Returns the database object on success, or ``None``
    when the backend is unavailable or any step raises.
    """
    print("\n=== PostgreSQL Backend (Production) ===")

    db_factory = DatabaseFactory()
    postgres_ready = db_factory.is_backend_available("postgres")
    if not postgres_ready:
        # Nothing to demonstrate without the optional dependency.
        print("PostgreSQL backend not available")
        print("Install with: pip install dataknobs-data[postgres]")
        return None

    try:
        # Gather the connection settings, then open the database.
        connection_settings = {
            "host": os.environ.get("PG_HOST", "localhost"),
            "port": int(os.environ.get("PG_PORT", 5432)),
            "database": os.environ.get("PG_DATABASE", "test"),
            "user": os.environ.get("PG_USER", "postgres"),
            "password": os.environ.get("PG_PASSWORD", "postgres"),
            "table": "demo_records",
        }
        database = db_factory.create(backend="postgres", **connection_settings)

        # Insert one sample user.
        new_user = Record({
            "username": "john_doe",
            "email": "john@example.com",
            "role": "admin",
            "active": True,
        })
        created_id = database.create(new_user)
        print(f"Created user in PostgreSQL: {created_id}")

        # Two filters plus a sort, built step by step.
        admin_query = Query().filter("role", "=", "admin")
        admin_query = admin_query.filter("active", "=", True)
        admin_query = admin_query.sort("username", "ASC")
        active_admins = database.search(admin_query)
        print(f"Active admins: {len(active_admins)}")
    except Exception as e:
        print(f"Could not connect to PostgreSQL: {e}")
        return None
    else:
        return database
def demonstrate_elasticsearch_backend():
    """Exercise the Elasticsearch backend against a running cluster.

    Reads ``ES_HOST``, ``ES_USER`` and ``ES_PASSWORD`` from the environment,
    indexes three sample documents, and prints the result counts of a LIKE
    filter and a numeric range filter. Returns the database object on
    success, or ``None`` when the backend is unavailable or any step raises.
    """
    print("\n=== Elasticsearch Backend (Search) ===")

    db_factory = DatabaseFactory()
    if not db_factory.is_backend_available("elasticsearch"):
        print("Elasticsearch backend not available")
        print("Install with: pip install dataknobs-data[elasticsearch]")
        return None

    try:
        # Open the index.
        host = os.environ.get("ES_HOST", "localhost:9200")
        search_db = db_factory.create(
            backend="elasticsearch",
            hosts=[host],
            index="demo_index",
            username=os.environ.get("ES_USER"),
            password=os.environ.get("ES_PASSWORD"),
        )

        # (title, content, tags, views) for each sample document.
        sample_rows = (
            (
                "Introduction to Python",
                "Python is a versatile programming language...",
                ["python", "programming", "tutorial"],
                1000,
            ),
            (
                "Advanced Python Techniques",
                "Learn advanced Python programming patterns...",
                ["python", "advanced", "patterns"],
                500,
            ),
            (
                "Web Development with Django",
                "Build web applications using Django framework...",
                ["python", "django", "web"],
                750,
            ),
        )
        # Build every record before indexing any of them.
        sample_docs = [
            Record({"title": title, "content": content, "tags": tags, "views": views})
            for title, content, tags, views in sample_rows
        ]
        for sample_doc in sample_docs:
            search_db.create(sample_doc)

        # Text match on the content field.
        text_query = Query().filter("content", "LIKE", "%Python%")
        text_hits = search_db.search(text_query)
        print(f"Documents mentioning Python: {len(text_hits)}")

        # Numeric range on the views field.
        views_query = Query().filter("views", ">", 600)
        popular_hits = search_db.search(views_query)
        print(f"Popular documents (>600 views): {len(popular_hits)}")
    except Exception as e:
        print(f"Could not connect to Elasticsearch: {e}")
        return None
    else:
        return search_db
def demonstrate_s3_backend():
    """Exercise the S3 backend against LocalStack or a real bucket.

    When the ``USE_LOCALSTACK`` environment variable is set to a non-empty
    value, the fixed LocalStack settings are used; otherwise the bucket and
    region come from ``S3_BUCKET`` and ``AWS_REGION``. Stores one archive
    record and prints the record count. Returns the database object on
    success, or ``None`` when the backend is unavailable or any step raises.
    """
    print("\n=== S3 Backend (Archive) ===")

    db_factory = DatabaseFactory()
    if not db_factory.is_backend_available("s3"):
        print("S3 backend not available")
        print("Install with: pip install dataknobs-data[s3]")
        return None

    try:
        # Pick the settings for the target environment, then create once.
        if os.environ.get("USE_LOCALSTACK"):
            # For LocalStack testing
            bucket_settings = {
                "bucket": "demo-bucket",
                "prefix": "archives/",
                "region": "us-east-1",
                "endpoint_url": "http://localhost:4566",
                "access_key_id": "test",
                "secret_access_key": "test",
            }
        else:
            # Production S3
            bucket_settings = {
                "bucket": os.environ.get("S3_BUCKET", "my-archive"),
                "prefix": "demo/",
                "region": os.environ.get("AWS_REGION", "us-east-1"),
            }
        archive_db = db_factory.create(backend="s3", **bucket_settings)

        # Store a single archive record.
        backup_record = Record({
            "type": "backup",
            "timestamp": "2024-01-01T00:00:00Z",
            "data": {"important": "data", "to": "archive"},
            "size_mb": 42,
        })
        stored_id = archive_db.create(backup_record)
        print(f"Archived to S3: {stored_id}")

        # Report how many records the backend now holds.
        archive_count = archive_db.count()
        print(f"Total archives in S3: {archive_count}")
    except Exception as e:
        print(f"Could not connect to S3: {e}")
        return None
    else:
        return archive_db
def demonstrate_backend_migration():
    """Demonstrate migrating data between backends.

    Fills a memory database with ten sample records, copies every record
    into a JSON file database backed by a temporary file, prints the counts,
    and removes the temporary file. Returns nothing.
    """
    print("\n=== Backend Migration ===")
    factory = DatabaseFactory()

    # Source: Memory (simulate production data)
    source = factory.create(backend="memory")

    # Create sample data
    for i in range(10):
        source.create(Record({
            "id": f"record-{i}",
            "value": i * 100,
            "status": "active" if i % 2 == 0 else "inactive"
        }))
    print(f"Source has {source.count()} records")

    # Destination: File (for backup)
    with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
        backup_path = f.name

    # The temp file was created with delete=False, so remove it explicitly
    # even when creating the destination or copying a record raises.
    try:
        dest = factory.create(
            backend="file",
            path=backup_path,
            format="json"
        )

        # Migrate all data; an empty Query() is used to fetch every record
        all_records = source.search(Query())
        for record in all_records:
            dest.create(record)
        print(f"Migrated {dest.count()} records to {backup_path}")
    finally:
        # Clean up
        try:
            os.unlink(backup_path)
        except FileNotFoundError:
            # Already gone; nothing left to clean up.
            pass
def main():
    """Run every backend demonstration in order, then print a summary."""
    divider = "=" * 50
    print("DataKnobs Backend Examples")
    print(divider)

    # Report which backends the factory says are available.
    factory = DatabaseFactory()
    available = factory.get_available_backends()
    print(f"Available backends: {', '.join(available)}")

    # One demonstration per backend, executed in this order.
    demos = (
        demonstrate_memory_backend,
        demonstrate_file_backend,
        demonstrate_duckdb_backend,
        demonstrate_postgres_backend,
        demonstrate_elasticsearch_backend,
        demonstrate_s3_backend,
    )
    # Keep every returned handle referenced until main() returns so that no
    # backend object is finalized in the middle of the run.
    _handles = [demo() for demo in demos]

    # Finish with the cross-backend migration.
    demonstrate_backend_migration()

    print("\n" + divider)
    print("✅ Backend examples completed!")
    print("\nKey Takeaways:")
    takeaways = (
        "- Memory: Fast, temporary, good for caching",
        "- File: Simple persistence, good for small datasets",
        "- DuckDB: Fast analytics, 10-100x faster than SQLite for OLAP",
        "- PostgreSQL: ACID compliance, complex queries",
        "- Elasticsearch: Full-text search, analytics",
        "- S3: Unlimited storage, cost-effective archival",
    )
    for takeaway in takeaways:
        print(takeaway)


# Run the demonstrations only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Running the Example
Basic Setup
# Install the package
pip install dataknobs-data
# Run with memory and file backends (no external dependencies)
python data_backends_example.py
With PostgreSQL
# Install PostgreSQL support
pip install dataknobs-data[postgres]
# Set environment variables
export PG_HOST=localhost
export PG_DATABASE=test
export PG_USER=postgres
export PG_PASSWORD=postgres
# Run the example
python data_backends_example.py
With Elasticsearch
# Install Elasticsearch support
pip install dataknobs-data[elasticsearch]
# Start Elasticsearch with Docker
docker run -p 9200:9200 -e "discovery.type=single-node" elasticsearch:8.11.0
# Set environment variables
export ES_HOST=localhost:9200
# Run the example
python data_backends_example.py
With S3 (LocalStack)
# Install S3 support
pip install dataknobs-data[s3]
# Start LocalStack
docker run -p 4566:4566 localstack/localstack
# Set environment variables
export USE_LOCALSTACK=1
# Run the example
python data_backends_example.py
Key Concepts Demonstrated
- Backend Selection: Choose the right backend for your use case
- Configuration: Use environment variables and config files
- CRUD Operations: Create, Read, Update, Delete across all backends
- Querying: Consistent query API across different storage types
- Migration: Move data between different backends
- Error Handling: Gracefully handle missing dependencies
- Performance: Understand performance characteristics of each backend