S3 Storage Example¶
This example demonstrates how to use AWS S3 as a storage backend with the dataknobs-data package.
Complete S3 Example¶
#!/usr/bin/env python3
"""
Comprehensive S3 storage example with DataKnobs.
This example demonstrates:
- S3 configuration with environment variables
- Batch operations for performance
- Cost optimization strategies
- LocalStack for local testing
- Migration from other backends to S3
"""
import os
import json
import time
from datetime import datetime, timedelta
from dataknobs_data import Record, Query, DatabaseFactory
from dataknobs_config import Config
class S3StorageDemo:
"""Demonstrate S3 storage capabilities."""
def __init__(self, use_localstack=False):
"""
Initialize S3 storage demo.
Args:
use_localstack: Use LocalStack for local testing
"""
self.factory = DatabaseFactory()
self.use_localstack = use_localstack
self.s3_db = self._setup_s3_backend()
def _setup_s3_backend(self):
"""Set up S3 backend based on environment."""
if self.use_localstack:
print("📦 Using LocalStack for S3 testing")
return self.factory.create(
backend="s3",
bucket="demo-bucket",
prefix="dataknobs/",
region="us-east-1",
endpoint_url="http://localhost:4566",
access_key_id="test",
secret_access_key="test",
max_workers=5 # Fewer workers for local testing
)
else:
print("☁️ Using AWS S3")
return self.factory.create(
backend="s3",
bucket=os.environ.get("S3_BUCKET", "dataknobs-demo"),
prefix=os.environ.get("S3_PREFIX", "data/"),
region=os.environ.get("AWS_REGION", "us-east-1"),
max_workers=10 # More workers for production
)
def demonstrate_basic_operations(self):
"""Show basic CRUD operations with S3."""
print("\n=== Basic S3 Operations ===")
# Create a record
document = Record({
"id": "doc-001",
"title": "Important Document",
"content": "This is stored in S3",
"size_kb": 42,
"created_at": datetime.utcnow().isoformat()
})
# Add metadata (stored as S3 tags)
document.metadata["author"] = "John Doe"
document.metadata["department"] = "Engineering"
document.metadata["version"] = "1.0"
# Store in S3
doc_id = self.s3_db.create(document)
print(f"Stored document in S3: {doc_id}")
# Read back
retrieved = self.s3_db.read(doc_id)
if retrieved:
print(f"Retrieved: {retrieved.get_value('title')}")
print(f"Metadata: {retrieved.metadata}")
# Update
retrieved.fields["version"] = "1.1"
retrieved.fields["updated_at"] = datetime.utcnow().isoformat()
success = self.s3_db.update(doc_id, retrieved)
print(f"Update successful: {success}")
# Delete
deleted = self.s3_db.delete(doc_id)
print(f"Deleted: {deleted}")
def demonstrate_batch_operations(self):
"""Show efficient batch operations."""
print("\n=== Batch Operations (Performance) ===")
# Create test data
batch_size = 50
records = []
for i in range(batch_size):
records.append(Record({
"id": f"batch-{i:04d}",
"type": "test",
"value": i * 100,
"timestamp": datetime.utcnow().isoformat(),
"category": f"cat-{i % 5}" # 5 categories
}))
# Batch create with timing
print(f"Creating {batch_size} records...")
start = time.time()
record_ids = self.s3_db.batch_create(records)
elapsed = time.time() - start
print(f"Created {len(record_ids)} records in {elapsed:.2f}s")
print(f"Average: {elapsed/len(record_ids)*1000:.1f}ms per record")
# Batch read
print(f"\nReading {len(record_ids)//2} records...")
start = time.time()
retrieved = self.s3_db.batch_read(record_ids[:len(record_ids)//2])
elapsed = time.time() - start
print(f"Read {len(retrieved)} records in {elapsed:.2f}s")
# Search (downloads all, filters locally)
print("\nSearching for category 'cat-2'...")
start = time.time()
results = self.s3_db.search(
Query().filter("category", "=", "cat-2")
)
elapsed = time.time() - start
print(f"Found {len(results)} records in {elapsed:.2f}s")
# Batch delete
print(f"\nDeleting {len(record_ids)} records...")
start = time.time()
results = self.s3_db.batch_delete(record_ids)
elapsed = time.time() - start
deleted = sum(results)
print(f"Deleted {deleted} records in {elapsed:.2f}s")
def demonstrate_large_files(self):
"""Show handling of large files."""
print("\n=== Large File Handling ===")
# Create a large record (simulated)
large_data = "x" * (1024 * 1024) # 1MB of data
large_record = Record({
"id": "large-file-001",
"type": "binary",
"data": large_data,
"size_mb": len(large_data) / (1024 * 1024),
"checksum": hash(large_data)
})
print(f"Storing {large_record.get_value('size_mb'):.1f}MB file...")
start = time.time()
record_id = self.s3_db.create(large_record)
elapsed = time.time() - start
print(f"Stored in {elapsed:.2f}s")
# Note: Files > 5MB automatically use multipart upload
# Clean up
self.s3_db.delete(record_id)
def demonstrate_organization_strategies(self):
"""Show how to organize data with prefixes."""
print("\n=== Data Organization with Prefixes ===")
# Create S3 backends with different prefixes
# This allows logical separation of data types
configs = [
("logs/2024/01/", "Log entries"),
("users/active/", "Active user data"),
("backups/daily/", "Daily backups"),
("analytics/raw/", "Raw analytics data")
]
for prefix, description in configs:
db = self.factory.create(
backend="s3",
bucket=self.s3_db.bucket,
prefix=prefix,
region=self.s3_db.region,
endpoint_url=self.s3_db.endpoint_url if self.use_localstack else None,
access_key_id="test" if self.use_localstack else None,
secret_access_key="test" if self.use_localstack else None
)
# Store sample data
sample = Record({"type": description, "timestamp": datetime.utcnow().isoformat()})
record_id = db.create(sample)
print(f"Stored {description} at prefix: {prefix}")
# Clean up
db.delete(record_id)
def demonstrate_cost_optimization(self):
"""Show cost optimization strategies."""
print("\n=== Cost Optimization Strategies ===")
print("1. Batch Operations:")
print(" - Individual PUTs: $0.005 per 1,000 requests")
print(" - Batch with 10 workers: 10x fewer requests")
print(f" - Current max_workers: {self.s3_db.max_workers}")
print("\n2. Storage Classes:")
print(" - Standard: $0.023/GB/month")
print(" - Infrequent Access: $0.0125/GB/month")
print(" - Glacier: $0.004/GB/month")
print("\n3. Lifecycle Policies (set in AWS Console):")
print(" - Move to IA after 30 days")
print(" - Move to Glacier after 90 days")
print(" - Delete after 365 days")
print("\n4. Data Compression:")
# Demonstrate compression
import gzip
import base64
original = {"data": "x" * 1000} # 1KB of repeated data
compressed = base64.b64encode(
gzip.compress(json.dumps(original).encode())
).decode()
original_size = len(json.dumps(original))
compressed_size = len(compressed)
print(f" - Original size: {original_size} bytes")
print(f" - Compressed size: {compressed_size} bytes")
print(f" - Compression ratio: {original_size/compressed_size:.1f}x")
def demonstrate_migration_to_s3(self):
"""Show migrating data from other backends to S3."""
print("\n=== Migration to S3 ===")
# Create source database (memory for demo)
source_db = self.factory.create(backend="memory")
# Generate sample data
print("Creating sample data in memory...")
for i in range(20):
source_db.create(Record({
"id": f"migrate-{i:03d}",
"data": f"Record {i}",
"created": (datetime.utcnow() - timedelta(days=i)).isoformat()
}))
print(f"Source has {source_db.count()} records")
# Migrate old records to S3
cutoff = datetime.utcnow() - timedelta(days=7)
old_records = source_db.search(
Query().filter("created", "<", cutoff.isoformat())
)
print(f"Found {len(old_records)} records older than 7 days")
if old_records:
# Add migration metadata
for record in old_records:
record.metadata["migrated_at"] = datetime.utcnow().isoformat()
record.metadata["source"] = "memory"
# Batch upload to S3
print("Migrating to S3...")
migrated_ids = self.s3_db.batch_create(old_records)
print(f"Migrated {len(migrated_ids)} records to S3")
# Remove from source
for record in old_records:
source_db.delete(record.metadata["id"])
print(f"Source now has {source_db.count()} records")
print(f"S3 archive has {self.s3_db.count()} records")
# Clean up S3
self.s3_db.batch_delete(migrated_ids)
def main():
"""Run S3 storage demonstration."""
print("DataKnobs S3 Storage Example")
print("=" * 50)
# Check environment
use_localstack = os.environ.get("USE_LOCALSTACK", "").lower() == "true"
if use_localstack:
print("\n📝 Using LocalStack for testing")
print("Start LocalStack with:")
print(" docker run -p 4566:4566 localstack/localstack")
else:
print("\n☁️ Using AWS S3")
print("Ensure AWS credentials are configured:")
print(" aws configure")
try:
# Create demo instance
demo = S3StorageDemo(use_localstack)
# Run demonstrations
demo.demonstrate_basic_operations()
demo.demonstrate_batch_operations()
demo.demonstrate_large_files()
demo.demonstrate_organization_strategies()
demo.demonstrate_cost_optimization()
demo.demonstrate_migration_to_s3()
print("\n" + "=" * 50)
print("✅ S3 storage example completed successfully!")
except Exception as e:
print(f"\n❌ Error: {e}")
print("\nTroubleshooting:")
print("1. For LocalStack: export USE_LOCALSTACK=true")
print("2. For AWS: Ensure credentials are configured")
print("3. Install S3 support: pip install dataknobs-data[s3]")
if __name__ == "__main__":
main()
Running the Example¶
With LocalStack (Local Testing)¶
# Start LocalStack
docker run -d -p 4566:4566 --name localstack localstack/localstack
# Install S3 support
pip install dataknobs-data[s3]
# Run with LocalStack
export USE_LOCALSTACK=true
python s3_storage_example.py
# Stop LocalStack when done
docker stop localstack
docker rm localstack
With AWS S3¶
# Configure AWS credentials
aws configure
# Set S3 bucket (must exist)
export S3_BUCKET=my-dataknobs-bucket
export S3_PREFIX=examples/
export AWS_REGION=us-east-1
# Install S3 support
pip install dataknobs-data[s3]
# Run with AWS
python s3_storage_example.py
With MinIO (S3-Compatible)¶
# Start MinIO
docker run -d -p 9000:9000 -p 9001:9001 \
--name minio \
-e "MINIO_ROOT_USER=minioadmin" \
-e "MINIO_ROOT_PASSWORD=minioadmin" \
minio/minio server /data --console-address ":9001"
# Configure for MinIO
export S3_ENDPOINT=http://localhost:9000
export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
export S3_BUCKET=dataknobs
# Create bucket (using MinIO client or web console at http://localhost:9001)
# Run example
python s3_storage_example.py
Key Concepts Demonstrated¶
- Configuration: Environment variables and endpoint customization
- Batch Operations: Parallel uploads/downloads for performance
- Metadata: Using S3 tags for record metadata
- Large Files: Automatic multipart upload for files >5MB
- Organization: Using prefixes for logical data separation
- Cost Optimization: Strategies to minimize S3 costs
- Migration: Moving data from other backends to S3
- LocalStack: Testing without AWS costs
- Error Handling: Graceful handling of connection issues
- Performance Monitoring: Timing operations for optimization
Cost Considerations¶
Storage Costs (AWS S3 Standard)¶
- First 50 TB: $0.023 per GB/month
- Next 450 TB: $0.022 per GB/month
- Over 500 TB: $0.021 per GB/month
Request Costs¶
- PUT, COPY, POST, LIST: $0.005 per 1,000 requests
- GET, SELECT: $0.0004 per 1,000 requests
- DELETE: Free
Data Transfer¶
- Transfer IN: Free
- Transfer OUT to Internet: $0.09 per GB (first 10 TB/month)
- Transfer OUT to same region: Free
Optimization Tips¶
- Use batch operations to reduce API calls
- Enable S3 Intelligent-Tiering for automatic cost optimization
- Set lifecycle policies for old data
- Use S3 Select to reduce data transfer
- Consider S3 Glacier for long-term archives
- Monitor usage with AWS Cost Explorer