S3 Backend¶
The S3 backend provides scalable object storage using AWS S3 or S3-compatible services. It's well suited to archival, backup, and large datasets, with effectively unlimited capacity.
Features¶
- 🚀 Parallel Operations: Uses ThreadPoolExecutor for batch operations
- 🏷️ Metadata Support: Stores record metadata as S3 object tags
- 💾 Index Caching: Caches object listings for performance
- 🔧 S3-Compatible: Works with AWS S3, MinIO, LocalStack
- 📦 Multipart Upload: Automatic for large objects
- 💰 Cost Optimized: Efficient batching reduces API calls
Installation¶
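The backend talks to S3 through boto3, so boto3 must be importable alongside the package itself. The distribution name below is an assumption; use whatever name dataknobs-data is actually published under:

pip install boto3 dataknobs-data  # package name assumed, not confirmed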
Configuration¶
Basic Configuration¶
from dataknobs_data.backends.s3 import S3Database

db = S3Database.from_config({
    "bucket": "my-data-bucket",
    "prefix": "records/",
    "region": "us-east-1"
})
With Environment Variables¶
# config.yaml
databases:
  - name: archive
    class: dataknobs_data.backends.s3.S3Database
    bucket: ${S3_BUCKET}
    prefix: ${S3_PREFIX:data/}
    region: ${AWS_REGION:us-east-1}
    endpoint_url: ${S3_ENDPOINT:}  # Empty for AWS
    max_workers: ${S3_MAX_WORKERS:10}
Using LocalStack for Testing¶
# For local development with LocalStack
db = S3Database.from_config({
    "bucket": "test-bucket",
    "prefix": "records/",
    "region": "us-east-1",
    "endpoint_url": "http://localhost:4566",
    "access_key_id": "test",
    "secret_access_key": "test"
})
Start LocalStack:
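One common way to run it is the official Docker image, exposing LocalStack's default edge port 4566 (matching the endpoint_url above):

docker run --rm -p 4566:4566 localstack/localstack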
Usage Examples¶
Basic CRUD Operations¶
from dataknobs_data import Record, Query
from dataknobs_data.backends.s3 import S3Database

# Initialize
db = S3Database.from_config({
    "bucket": "my-bucket",
    "prefix": "app/records/"
})

# Create
record = Record({
    "id": "user-123",
    "name": "Alice",
    "email": "alice@example.com",
    "created_at": "2024-01-01T00:00:00Z"
})
record_id = db.create(record)

# Read
retrieved = db.read(record_id)
print(f"Name: {retrieved.get_value('name')}")

# Update
record.fields["email"] = "alice.smith@example.com"
db.update(record_id, record)

# Delete
db.delete(record_id)
Batch Operations¶
# Batch operations are much more efficient
records = [
    Record({"id": f"user-{i}", "name": f"User {i}"})
    for i in range(100)
]

# Parallel upload (10 workers by default)
record_ids = db.batch_create(records)

# Parallel read
retrieved = db.batch_read(record_ids[:50])

# Parallel delete
results = db.batch_delete(record_ids[50:])
Metadata and Tags¶
# The S3 backend stores record metadata as S3 object tags
record = Record({
    "name": "Important Document",
    "content": "..."
})

# Metadata is preserved
record.metadata["created_by"] = "admin"
record.metadata["version"] = "1.0"
record.metadata["department"] = "Engineering"

record_id = db.create(record)

# Tags are visible in the S3 console
# Tags: id=xxx, created_at=xxx, created_by=admin, version=1.0, department=Engineering
Searching and Filtering¶
# Note: S3 doesn't support native queries
# All objects are downloaded and filtered locally
query = Query()\
    .filter("department", "=", "Engineering")\
    .filter("active", "=", True)\
    .sort("created_at", "DESC")\
    .limit(10)

results = db.search(query)

# This downloads ALL objects, then filters locally
# Consider using Elasticsearch for complex queries
Performance Optimization¶
1. Batch Operations¶
# Bad: individual operations
for record in records:
    db.create(record)  # One API call each

# Good: batch operations
db.batch_create(records)  # Parallel API calls
2. Prefix Organization¶
# Organize data with prefixes for efficient listing
db = S3Database.from_config({
    "bucket": "my-bucket",
    "prefix": "data/2024/01/"  # Date-based organization
})
# Enables efficient listing of specific time periods
3. Index Caching¶
# The S3 backend caches object listings
db.count() # First call: lists all objects
db.count() # Subsequent calls: uses cache
# Cache expires after 60 seconds by default
# Configurable via cache_ttl parameter
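The snippet below assumes cache_ttl is accepted as a from_config key like the other settings on this page; treat the key placement as a sketch rather than a confirmed option:

# Hypothetical: lengthen the listing-cache lifetime to 5 minutes
db = S3Database.from_config({
    "bucket": "my-bucket",
    "prefix": "records/",
    "cache_ttl": 300  # seconds; the documented default is 60
})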
4. Multipart Upload¶
# Large objects automatically use multipart upload
large_record = Record({
    "id": "large-file",
    "data": "x" * (10 * 1024 * 1024)  # 10MB payload
})

# Automatically uses multipart upload for objects > 5MB
db.create(large_record)
Cost Optimization¶
Request Costs¶
# S3 bills per request: roughly $0.005 per 1,000 PUTs and $0.0004 per 1,000
# GETs in S3 Standard, so request volume adds up at scale.

# Slow: sequential creates, one blocking round trip per record
for record in records:
    db.create(record)

# Better: batch_create issues the same PUTs from parallel workers, cutting
# wall-clock time and connection overhead (each record is still one object,
# so the number of billed PUT requests is unchanged)
db.batch_create(records)
Storage Classes¶
# Configure storage class for cost optimization
db = S3Database.from_config({
    "bucket": "archive-bucket",
    "prefix": "archive/",
    "storage_class": "GLACIER"  # For infrequent access
})
Lifecycle Policies¶
# Set up lifecycle policies in the AWS Console or via boto3
# Example: move objects under the archive prefix to Glacier after 30 days
import boto3

lifecycle_config = {
    'Rules': [{
        'ID': 'ArchiveOldData',
        'Status': 'Enabled',
        'Filter': {'Prefix': 'archive/'},  # scope the rule to this backend's prefix
        'Transitions': [{
            'Days': 30,
            'StorageClass': 'GLACIER'
        }]
    }]
}

s3_client = boto3.client('s3')
s3_client.put_bucket_lifecycle_configuration(
    Bucket='archive-bucket',
    LifecycleConfiguration=lifecycle_config
)
Limitations and Considerations¶
Query Performance¶
- S3 doesn't support native queries
- All filtering happens client-side
- Consider using S3 Select for large objects
- Use Elasticsearch for complex queries
Consistency Model¶
- S3 provides strong read-after-write consistency
- Listings served from the backend's index cache can briefly lag behind recent writes
- Use index caching carefully in high-write scenarios
Size Limits¶
- Maximum object size: 5TB
- Multipart upload required for objects > 5GB
- Maximum 10,000 parts per multipart upload
Advanced Features¶
S3 Select Integration (Future)¶
# Future enhancement: Use S3 Select for efficient queries
query = "SELECT * FROM S3Object WHERE department = 'Engineering'"
results = db.select_query(query) # Coming soon
Cross-Region Replication¶
# Configure in AWS Console for disaster recovery
# Primary bucket: us-east-1
# Replica bucket: eu-west-1
Versioning¶
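Versioning is a property of the bucket rather than of this backend, so it is enabled with plain boto3 (or in the AWS Console). A minimal sketch, with an illustrative bucket name:

import boto3

s3 = boto3.client("s3")
s3.put_bucket_versioning(
    Bucket="my-data-bucket",  # illustrative bucket name
    VersioningConfiguration={"Status": "Enabled"},
)
# With versioning on, overwrites and deletes keep previous object versions,
# which protects critical records against accidental changes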
Testing with LocalStack¶
import os

from dataknobs_data import Record
from dataknobs_data.backends.s3 import S3Database

# Point boto3 at LocalStack's dummy credentials
os.environ["AWS_ACCESS_KEY_ID"] = "test"
os.environ["AWS_SECRET_ACCESS_KEY"] = "test"

# Create test database
test_db = S3Database.from_config({
    "bucket": "test-bucket",
    "prefix": "test/",
    "region": "us-east-1",
    "endpoint_url": "http://localhost:4566"
})

# Run tests
record = Record({"test": "data"})
record_id = test_db.create(record)
assert test_db.read(record_id) is not None
test_db.delete(record_id)
Migration from Other Backends¶
from datetime import datetime, timedelta

from dataknobs_data import DatabaseFactory, Query

factory = DatabaseFactory()

# Source database (e.g., PostgreSQL)
source = factory.create(
    backend="postgres",
    host="localhost",
    database="production"
)

# Destination S3 archive
archive = factory.create(
    backend="s3",
    bucket="archive-bucket",
    prefix="postgres-backup/"
)

# Find records older than one year
cutoff = datetime.now() - timedelta(days=365)
old_records = source.search(
    Query().filter("created_at", "<", cutoff.isoformat())
)

# Batch upload to S3
archive.batch_create(old_records)

# Delete from source
for record in old_records:
    source.delete(record.metadata["id"])

print(f"Archived {len(old_records)} records to S3")
Best Practices¶
- Use prefixes for logical organization
- Batch operations whenever possible
- Cache frequently accessed objects locally
- Monitor costs with AWS Cost Explorer
- Set up lifecycle policies for old data
- Use versioning for critical data
- Enable server-side encryption
- Configure bucket policies for security
- Use CloudFront for global distribution
- Implement retry logic for resilience (see the sketch below)
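For the retry point, boto3 already ships a configurable retry layer. A minimal sketch, assuming you can hand the resulting client (or at least its Config) to the backend, which this page does not document:

import boto3
from botocore.config import Config

# Adaptive retries back off automatically on throttling and transient errors
retry_config = Config(retries={"max_attempts": 10, "mode": "adaptive"})
s3_client = boto3.client("s3", config=retry_config)

# How S3Database would consume a pre-built client is an assumption; if it
# cannot, wrap individual backend calls in your own retry loop instead.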