# Examples

This section provides comprehensive examples of using the dataknobs-data package in various scenarios.

## Basic Usage

### Simple CRUD Operations
from dataknobs_data import Record, Query, DatabaseFactory

# Spin up an in-memory database via the factory
factory = DatabaseFactory()
db = factory.create(backend="memory")

# Build a record describing one user
user = Record({
    "name": "Alice Johnson",
    "email": "alice@example.com",
    "age": 30,
    "active": True,
})

# CREATE
user_id = db.create(user)
print(f"Created user with ID: {user_id}")

# READ
retrieved_user = db.read(user_id)
print(f"Retrieved: {retrieved_user.get_value('name')}")

# UPDATE
user.fields["age"] = 31
db.update(user_id, user)

# SEARCH: active users older than 25
query = Query()
query = query.filter("active", "=", True)
query = query.filter("age", ">", 25)
results = db.search(query)
print(f"Found {len(results)} active users over 25")

# DELETE
db.delete(user_id)
## Multi-Backend Application

### E-Commerce Platform Example
from dataknobs_data import Record, Query, DatabaseFactory
from dataknobs_config import Config
from datetime import datetime, timedelta
import json
class ECommercePlatform:
    """Example e-commerce platform wiring several storage backends together.

    Backends (all resolved from configuration):
      * products - primary product store
      * orders   - primary order store
      * cache    - fast lookup cache for hot products
      * search   - search index for product queries
      * archive  - long-term cold storage
    """

    def __init__(self, config_path: str):
        self.config = Config(config_path)
        factory = DatabaseFactory()
        # Registering the factory lets config entries reference it by name
        self.config.register_factory("database", factory)
        # One backend instance per concern
        self.product_db = self.config.get_instance("databases", "products")
        self.order_db = self.config.get_instance("databases", "orders")
        self.cache_db = self.config.get_instance("databases", "cache")
        self.search_db = self.config.get_instance("databases", "search")
        self.archive_db = self.config.get_instance("databases", "archive")

    def add_product(self, product_data: dict) -> str:
        """Store a new product, mirror it into search, cache it if featured."""
        product = Record(product_data)
        product.metadata["created_at"] = datetime.utcnow().isoformat()
        product_id = self.product_db.create(product)
        # Mirror into the search index
        self.search_db.create(product)
        # Featured products are warmed into the cache up front
        if product_data.get("featured", False):
            self.cache_db.create(product)
        return product_id

    def search_products(self, query_text: str, category: str = None):
        """Search products by free text and/or category."""
        query = Query()
        if query_text:
            query = query.filter("name", "LIKE", f"%{query_text}%")
        if category:
            query = query.filter("category", "=", category)
        # The dedicated search backend services product queries
        return self.search_db.search(query)

    def get_product(self, product_id: str):
        """Read a product, consulting the cache before the primary store."""
        cached = self.cache_db.read(product_id)
        if cached:
            return cached
        product = self.product_db.read(product_id)
        if product:
            # Populate the cache for subsequent reads
            self.cache_db.create(product)
        return product

    def place_order(self, order_data: dict) -> str:
        """Record a pending order and decrement stock for each line item."""
        order = Record(order_data)
        order.metadata["created_at"] = datetime.utcnow().isoformat()
        order.metadata["status"] = "pending"
        order_id = self.order_db.create(order)
        # Adjust inventory for every purchased item
        for item in order_data.get("items", []):
            product = self.product_db.read(item["product_id"])
            if product:
                current_stock = product.get_value("stock", 0)
                product.fields["stock"] = current_stock - item["quantity"]
                self.product_db.update(item["product_id"], product)
        return order_id

    def archive_old_orders(self, days: int = 365):
        """Move orders older than ``days`` to cold storage; return the count."""
        cutoff = datetime.utcnow() - timedelta(days=days)
        old_orders = self.order_db.search(
            Query().filter("created_at", "<", cutoff.isoformat())
        )
        print(f"Archiving {len(old_orders)} old orders...")
        for order in old_orders:
            # Stamp provenance before the move
            order.metadata["archived_at"] = datetime.utcnow().isoformat()
            order.metadata["archived_from"] = "orders"
            self.archive_db.create(order)
            # Archived orders leave the primary store
            self.order_db.delete(order.metadata["id"])
        return len(old_orders)
import yaml

# Backend wiring for the platform, expressed as plain data
config = {
    "databases": [
        {
            "name": "products",
            "factory": "database",
            "backend": "postgres",
            "host": "localhost",
            "database": "ecommerce",
            "table": "products",
        },
        {
            "name": "orders",
            "factory": "database",
            "backend": "postgres",
            "host": "localhost",
            "database": "ecommerce",
            "table": "orders",
        },
        {
            "name": "cache",
            "factory": "database",
            "backend": "memory",
        },
        {
            "name": "search",
            "factory": "database",
            "backend": "elasticsearch",
            "hosts": ["localhost:9200"],
            "index": "products",
        },
        {
            "name": "archive",
            "factory": "database",
            "backend": "s3",
            "bucket": "ecommerce-archive",
            "prefix": "orders/",
        },
    ]
}

# Persist the configuration, then boot the platform from it
with open("ecommerce_config.yaml", "w") as f:
    yaml.dump(config, f)

platform = ECommercePlatform("ecommerce_config.yaml")
## Data Migration

### Migrating Between Backends
from dataknobs_data import DatabaseFactory, Query, Record
from datetime import datetime
import json
def migrate_data(source_config: dict, dest_config: dict,
                 transform_fn=None, batch_size: int = 100):
    """Copy every record from one backend to another, in batches.

    Args:
        source_config: Factory kwargs for the source database.
        dest_config: Factory kwargs for the destination database.
        transform_fn: Optional callable applied to each record before writing.
        batch_size: Number of records copied per round trip.

    Returns:
        The number of records migrated.
    """
    factory = DatabaseFactory()
    source_db = factory.create(**source_config)
    dest_db = factory.create(**dest_config)

    print(f"Starting migration from {source_config['backend']} "
          f"to {dest_config['backend']}...")

    total = source_db.count()
    print(f"Total records to migrate: {total}")

    migrated = 0
    for offset in range(0, total, batch_size):
        batch = source_db.search(Query().limit(batch_size).offset(offset))
        if not batch:
            # Source shrank underneath us; stop early
            break
        if transform_fn:
            batch = [transform_fn(record) for record in batch]
        # Stamp provenance on every copied record
        for record in batch:
            record.metadata["migrated_at"] = datetime.utcnow().isoformat()
            record.metadata["migrated_from"] = source_config["backend"]
        dest_db.batch_create(batch)
        migrated += len(batch)
        print(f"Progress: {migrated}/{total} records migrated")

    print(f"Migration complete! Migrated {migrated} records")

    # Sanity-check the copy by comparing record counts
    dest_count = dest_db.count()
    if dest_count == total:
        print("✅ Verification passed: counts match")
    else:
        print(f"⚠️ Warning: source had {total} records, "
              f"destination has {dest_count}")
    return migrated
# Example: migrate records from a JSON file into PostgreSQL
source = {
    "backend": "file",
    "path": "data.json",
    "format": "json",
}
destination = {
    "backend": "postgres",
    "host": "localhost",
    "database": "production",
    "user": "dbuser",
    "password": "dbpass",
}


def add_timestamps(record: Record) -> Record:
    """Stamp created_at (when absent) and refresh last_modified."""
    record.fields.setdefault("created_at", datetime.utcnow().isoformat())
    record.fields["last_modified"] = datetime.utcnow().isoformat()
    return record


# Run the migration with the timestamping transform
migrate_data(source, destination, transform_fn=add_timestamps)
## Testing with Different Backends

### Parameterized Testing
import pytest
from dataknobs_data import DatabaseFactory, Record, Query
# Run the same test suite against more than one backend
@pytest.fixture(params=["memory", "file"])
def database(request, tmp_path):
    """Return a database instance for the parametrized backend."""
    factory = DatabaseFactory()
    backend = request.param
    if backend == "file":
        # File backend persists to a per-test temporary path
        return factory.create(backend="file", path=str(tmp_path / "test.json"))
    if backend == "memory":
        return factory.create(backend="memory")
class TestDatabaseOperations:
    """Exercise core database operations on every parametrized backend."""

    def test_create_and_read(self, database):
        """A stored record can be read back with its fields intact."""
        rec = Record({"name": "Test", "value": 42})
        rec_id = database.create(rec)
        fetched = database.read(rec_id)
        assert fetched is not None
        assert fetched.get_value("name") == "Test"
        assert fetched.get_value("value") == 42

    def test_update(self, database):
        """Updates are persisted and visible on re-read."""
        rec = Record({"name": "Original"})
        rec_id = database.create(rec)
        rec.fields["name"] = "Updated"
        assert database.update(rec_id, rec)
        assert database.read(rec_id).get_value("name") == "Updated"

    def test_search(self, database):
        """Filters match the expected subset of records."""
        for fields in ({"type": "A", "value": 10},
                       {"type": "B", "value": 20},
                       {"type": "A", "value": 30}):
            database.create(Record(fields))
        # Equality filter on a string field
        assert len(database.search(Query().filter("type", "=", "A"))) == 2
        # Comparison filter on a numeric field
        assert len(database.search(Query().filter("value", ">", 15))) == 2

    def test_batch_operations(self, database):
        """Batch create/read/delete behave consistently."""
        ids = database.batch_create(
            [Record({"id": f"item-{i}", "value": i}) for i in range(10)]
        )
        assert len(ids) == 10
        assert len(database.batch_read(ids[:5])) == 5
        assert sum(database.batch_delete(ids[5:])) == 5
        assert database.count() == 5
## Real-Time Data Processing

### Stream Processing Example
import asyncio
from dataknobs_data import DatabaseFactory, Record
from datetime import datetime
import random
class DataStreamProcessor:
    """Tiered stream processor: hot (memory) -> warm (search) -> cold (S3).

    Incoming events land in hot storage immediately, are processed in
    batches into warm storage, and are eventually archived to cold storage.
    """

    def __init__(self):
        factory = DatabaseFactory()
        # Hot storage for recent, unprocessed data
        self.hot_storage = factory.create(backend="memory")
        # Warm storage for processed data
        self.warm_storage = factory.create(
            backend="elasticsearch",
            hosts=["localhost:9200"],
            index="processed_data"
        )
        # Cold storage for archives
        self.cold_storage = factory.create(
            backend="s3",
            bucket="data-archive",
            prefix="streams/"
        )

    async def process_stream(self, stream_id: str):
        """Consume the simulated stream, processing events in batches.

        Args:
            stream_id: Identifier stamped onto every record from this stream.
        """
        buffer = []
        buffer_size = 100
        async for data in self.generate_stream():
            record = Record({
                "stream_id": stream_id,
                "timestamp": datetime.utcnow().isoformat(),
                "data": data,
                "processed": False
            })
            # Land the raw event in hot storage before any processing
            self.hot_storage.create(record)
            buffer.append(record)
            # Flush whenever the buffer reaches a full batch
            if len(buffer) >= buffer_size:
                await self.process_batch(buffer)
                buffer = []
        # Flush whatever remains after the stream ends
        if buffer:
            await self.process_batch(buffer)

    async def process_batch(self, records: list):
        """Process a batch and promote it from hot to warm storage."""
        processed_records = []
        for record in records:
            processed = await self.process_record(record)
            processed_records.append(processed)
        # Processed results move to warm storage as one batch write
        self.warm_storage.batch_create(processed_records)
        # Hot storage only holds in-flight data; drop the originals
        for record in records:
            self.hot_storage.delete(record.metadata["id"])
        print(f"Processed batch of {len(records)} records")

    async def process_record(self, record: Record) -> Record:
        """Process one record in place and return it."""
        await asyncio.sleep(0.01)  # simulate processing time
        record.fields["processed"] = True
        record.fields["processed_at"] = datetime.utcnow().isoformat()
        # NOTE(review): generate_stream() nests event values under the "data"
        # field, so a top-level "value" key only exists if a caller sets one
        # directly — confirm whether this branch is ever meant to fire.
        if "value" in record.fields:
            record.fields["doubled"] = record.get_value("value") * 2
        return record

    async def generate_stream(self):
        """Yield simulated sensor events, roughly ten per second."""
        for i in range(1000):
            yield {
                "value": random.randint(1, 100),
                "type": random.choice(["A", "B", "C"]),
                "sensor_id": f"sensor-{random.randint(1, 10)}"
            }
            await asyncio.sleep(0.1)  # Simulate real-time data

    def archive_old_data(self, days: int = 7):
        """Move warm-storage records older than ``days`` to cold storage.

        Args:
            days: Age threshold; records with an older timestamp are archived.
        """
        # Fix: this example's top-level imports do not bring in timedelta or
        # Query, so referencing them here raised NameError. Import locally to
        # keep the example self-contained.
        from datetime import timedelta
        from dataknobs_data import Query

        cutoff = datetime.utcnow() - timedelta(days=days)
        old_records = self.warm_storage.search(
            Query().filter("timestamp", "<", cutoff.isoformat())
        )
        if old_records:
            # Copy to cold storage first, then prune warm storage
            self.cold_storage.batch_create(old_records)
            for record in old_records:
                self.warm_storage.delete(record.metadata["id"])
            print(f"Archived {len(old_records)} old records")
# Launch the processor against a single simulated stream
stream_processor = DataStreamProcessor()
asyncio.run(stream_processor.process_stream("stream-001"))
## Advanced Configuration Example

### Multi-Environment Setup
import os
from dataknobs_config import Config
from dataknobs_data import database_factory
class ApplicationDatabase:
    """Application database selected by the APP_ENV environment variable."""

    def __init__(self):
        # Default to development when no environment is set
        self.env = os.environ.get("APP_ENV", "development")
        self.config = self._load_config()
        self.db = self._create_database()

    def _load_config(self):
        """Build the merged base + per-environment configuration."""
        config = Config()
        base_config = {
            "app_name": "MyApp",
            "version": "1.0.0"
        }
        # Per-environment database settings; ${VAR} / ${VAR:default}
        # placeholders are resolved by the config layer
        env_configs = {
            "development": {
                "database": {
                    "backend": "memory"
                }
            },
            "testing": {
                "database": {
                    "backend": "file",
                    "path": "/tmp/test_data.json"
                }
            },
            "staging": {
                "database": {
                    "backend": "postgres",
                    "host": "${DB_HOST:staging-db.example.com}",
                    "database": "${DB_NAME:staging}",
                    "user": "${DB_USER}",
                    "password": "${DB_PASSWORD}"
                }
            },
            "production": {
                "database": {
                    "backend": "postgres",
                    "host": "${DB_HOST}",
                    "database": "${DB_NAME}",
                    "user": "${DB_USER}",
                    "password": "${DB_PASSWORD}",
                    "pool_size": 50,
                    "ssl_mode": "require"
                }
            }
        }
        # Unknown environments fall back to the base config alone
        config.load(base_config)
        config.merge(env_configs.get(self.env, {}))
        return config

    def _create_database(self):
        """Instantiate the backend described by the merged configuration."""
        return database_factory.create(**self.config.get("database"))

    def get_database(self):
        """Return the configured database instance."""
        return self.db

    def health_check(self):
        """Report database health as a dict; never raises."""
        try:
            return {
                "status": "healthy",
                "environment": self.env,
                "backend": self.config.get("database.backend"),
                "record_count": self.db.count()
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "environment": self.env,
                "error": str(e)
            }
# Bootstrap the application database and verify connectivity
app_db = ApplicationDatabase()
db = app_db.get_database()
print(app_db.health_check())
These examples demonstrate the flexibility and power of the dataknobs-data package across various use cases and scenarios.