The DataKnobs Data Package provides a unified data abstraction layer with support for multiple backends, data validation, migration, and streaming capabilities.
The Database class provides async database operations, while SyncDatabase provides synchronous operations.
from dataknobs_data import Database, SyncDatabase, Record

# Async usage
async def main():
    db = await Database.from_backend("memory")  # Auto-connects
    record = Record({"name": "Alice", "age": 30})
    record_id = await db.create(record)
    retrieved = await db.read(record_id)
    await db.close()

# Sync usage
db = SyncDatabase.from_backend("memory")  # Auto-connects
record = Record({"name": "Bob", "age": 25})
record_id = db.create(record)
retrieved = db.read(record_id)
db.close()
create(record: Record) -> str
: Create a new record and return its ID

read(id: str) -> Record | None
: Read a record by ID

update(id: str, record: Record) -> bool
: Update an existing record

delete(id: str) -> bool
: Delete a record

exists(id: str) -> bool
: Check if a record exists

upsert(id: str, record: Record) -> str
: Update or insert a record

search(query: Query) -> List[Record]
: Search for records

count(query: Query | None) -> int
: Count matching records

clear() -> int
: Delete all records

stream_read(query, config) -> Iterator[Record]
: Stream records

stream_write(records, config) -> StreamResult
: Stream write records
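A quick sketch of the remaining helpers in use, continuing the async example above before close() is called (the values are illustrative):

# Inside an async function such as main() above, before db.close()
if await db.exists(record_id):
    await db.upsert(record_id, Record({"name": "Alice", "age": 31}))  # update or insert
total = await db.count(None)  # None counts every record
removed = await db.clear()    # deletes all records and returns how many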
The Record class represents a data record with fields and metadata.
from dataknobs_data import Record
# Create from dict
record = Record({"name": "Alice", "age": 30})
# With metadata
record = Record(
data={"name": "Bob", "age": 25},
metadata={"created_by": "user1", "version": 1}
)
# Access values
name = record.get_value("name")
age = record.get_value("age", default=0)
# Set values
record.set_value("email", "alice@example.com")
# Check fields
if record.has_field("email"):
email = record.get_value("email")
# Convert to dict
data_dict = record.to_dict()
Build queries to search and filter records.
from dataknobs_data import Query, Operator, SortOrder
# Simple query
query = Query().filter("age", Operator.GT, 25)
# Complex query with multiple filters
query = (Query()
.filter("status", Operator.EQ, "active")
.filter("age", Operator.GTE, 18)
.filter("age", Operator.LTE, 65)
.sort("age", SortOrder.DESC)
.limit(10)
.offset(20))
# Available operators
# Operator.EQ - equals
# Operator.NEQ - not equals
# Operator.GT - greater than
# Operator.GTE - greater than or equal
# Operator.LT - less than
# Operator.LTE - less than or equal
# Operator.IN - in list
# Operator.NOT_IN - not in list
# Operator.LIKE - pattern match (SQL LIKE)
# Operator.EXISTS - field exists
# Operator.NOT_EXISTS - field doesn't exist
# Operator.REGEX - regular expression match
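For example, several of these operators combined in a single query and passed to search and count (the field names and values are illustrative only):

# Illustrative query using IN and LIKE; run inside an async function
query = (Query()
    .filter("role", Operator.IN, ["admin", "editor"])
    .filter("name", Operator.LIKE, "Ali%"))
matches = await db.search(query)
how_many = await db.count(query)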
Stream large datasets efficiently.
from dataknobs_data import StreamConfig, StreamResult
# Configure streaming
config = StreamConfig(
batch_size=100,
buffer_size=1000,
on_error=lambda e, r: print(f"Error: {e}")
)
# Stream read
async for record in db.stream_read(query, config):
process(record)
# Stream write
result = await db.stream_write(record_iterator, config)
print(f"Processed: {result.total_processed}")
print(f"Successful: {result.successful}")
print(f"Failed: {result.failed}")
The validation module (dataknobs_data.validation) provides schema-based validation with constraints and type coercion.
Define validation schemas for your data.
from dataknobs_data.validation import Schema, FieldType
from dataknobs_data.validation.constraints import Required, Range, Length, Pattern
# Create schema
schema = Schema("UserSchema")
schema.field("id", FieldType.INTEGER, required=True)
schema.field("name", FieldType.STRING,
constraints=[Length(min=1, max=100)])
schema.field("email", FieldType.STRING,
constraints=[Pattern(r"^.+@.+\..+$")])
schema.field("age", FieldType.INTEGER,
constraints=[Range(min=0, max=150)])
# Validate record
record = Record({"id": 1, "name": "Alice", "email": "alice@example.com", "age": 30})
result = schema.validate(record)
if result.valid:
print("Valid record!")
else:
print("Errors:", result.errors)
# With type coercion
record = Record({"id": "1", "age": "30"}) # String values
result = schema.validate(record, coerce=True) # Will convert to int
Built-in constraints for field validation:
import re

from dataknobs_data.validation.constraints import *
# Required fields
Required() # Field must be present and non-None
Required(allow_empty=True) # Allow empty strings/collections
# Numeric ranges
Range(min=0, max=100) # Inclusive range
Range(min=0) # Only minimum
Range(max=100) # Only maximum
# String/collection length
Length(min=1, max=50) # Length constraints
Length(min=5) # Minimum length only
# Pattern matching
Pattern(r"^\d{3}-\d{4}$") # Regex pattern
Pattern(r"^[A-Z]+$", re.IGNORECASE) # With flags
# Enumeration
Enum(["active", "inactive", "pending"]) # Must be one of values
# Uniqueness (with context)
Unique() # Value must be unique across all records
# Custom validation
def validate_email(value):
return "@" in value and "." in value
Custom(validate_email, "Invalid email format")
# Composite constraints
All([Required(), Range(min=0)]) # All must pass
AnyOf([Pattern(r"^\d+$"), Pattern(r"^[A-Z]+$")]) # At least one must pass
# Constraint composition with operators
constraint = Required() & Range(min=0, max=100) # AND
constraint = Length(min=10) | Pattern(r"^\d{5}$") # OR
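Composed constraints attach to schema fields just like the built-in ones; a minimal sketch reusing the Schema API shown earlier (the "score" field is illustrative):

# Hypothetical "score" field that must be present and fall within 0-100
score_rule = Required() & Range(min=0, max=100)

schema = Schema("ScoreSchema")
schema.field("score", FieldType.INTEGER, constraints=[score_rule])

result = schema.validate(Record({"score": 87}))
print(result.valid)  # True when the value satisfies both constraints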
Type coercion for automatic type conversion:
from dataknobs_data.validation import Coercer, FieldType
coercer = Coercer()
# Coerce single value
result = coercer.coerce("123", FieldType.INTEGER)
if result.valid:
value = result.value # 123 (int)
# Coerce multiple values
data = {"age": "30", "active": "true", "score": "95.5"}
results = coercer.coerce_many(data, {
"age": FieldType.INTEGER,
"active": FieldType.BOOLEAN,
"score": FieldType.FLOAT
})
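The shape of results is not spelled out above; assuming coerce_many returns one result per field with the same valid/value attributes as coerce, it can be consumed like this:

# Assumption: results maps each field name to its own coercion result
coerced = {}
for field_name, field_result in results.items():
    if field_result.valid:
        coerced[field_name] = field_result.value
    else:
        print(f"Could not coerce {field_name!r} from {data[field_name]!r}")
# coerced -> {"age": 30, "active": True, "score": 95.5}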
The migration module (dataknobs_data.migration) provides data transformation and migration capabilities.
Define operations to transform records:
from datetime import datetime

from dataknobs_data.migration.operations import *
# Add field
add_op = AddField("status", default="active")
# Remove field
remove_op = RemoveField("deprecated_field")
# Rename field
rename_op = RenameField("old_name", "new_name")
# Transform field
def uppercase(value):
return value.upper() if value else value
transform_op = TransformField("name", uppercase)
# Composite operations
composite = CompositeOperation([
AddField("created_at", default=datetime.now()),
RemoveField("temp_field"),
RenameField("user_name", "username")
])
Create migrations to evolve your data:
from dataknobs_data.migration import Migration
# Create migration
migration = Migration(
name="add_user_status",
version="1.0.0",
description="Add status field to user records"
)
# Add operations
migration.add_operation(AddField("status", default="active"))
migration.add_operation(RemoveField("legacy_field"))
# Apply to record
record = Record({"name": "Alice", "legacy_field": "old"})
result = migration.apply(record)
if result.valid:
migrated = result.value # Record with changes applied
# Reverse migration
result = migration.reverse(migrated)
Batch migration for databases:
from dataknobs_data.migration import Migrator
# Create migrator
migrator = Migrator(source_db, target_db)
# Configure migration
migration = Migration("update_schema", "2.0.0")
migration.add_operation(AddField("version", default=2))
# Run migration
async def migrate():
progress = await migrator.migrate(
migration=migration,
query=Query().filter("type", Operator.EQ, "user"),
batch_size=100,
on_progress=lambda p: print(f"Progress: {p.percentage}%")
)
print(f"Migrated: {progress.successful}")
print(f"Failed: {progress.failed}")
print(f"Duration: {progress.duration}s")
In-memory storage for testing and development:
db = await Database.create("memory")
JSON file-based storage:
db = await Database.create("file", {
"path": "/data/records.json",
"pretty": True,
"backup": True
})
PostgreSQL database storage:
db = await Database.create("postgres", {
"host": "localhost",
"port": 5432,
"database": "mydb",
"user": "user",
"password": "pass",
"table": "records",
"schema": "public"
})
AWS S3 storage:
db = await Database.create("s3", {
"bucket": "my-bucket",
"prefix": "records/",
"region": "us-west-2",
"aws_access_key_id": "key",
"aws_secret_access_key": "secret"
})
Elasticsearch storage:
db = await Database.create("elasticsearch", {
"host": "localhost",
"port": 9200,
"index": "records",
"refresh": True
})
The package defines specific exceptions for different error scenarios:
from dataknobs_data import (
DataknobsDataError, # Base exception
RecordNotFoundError, # Record doesn't exist
RecordValidationError, # Validation failed
FieldTypeError, # Invalid field type
DatabaseConnectionError, # Connection issues
DatabaseOperationError, # Operation failed
QueryError, # Invalid query
SerializationError, # Serialization failed
BackendNotFoundError, # Unknown backend
ConfigurationError, # Invalid configuration
ConcurrencyError, # Concurrent access issue
TransactionError, # Transaction failed
MigrationError # Migration failed
)
try:
record = await db.read("unknown-id")
except RecordNotFoundError as e:
print(f"Record not found: {e}")
Convenient factory functions for creating databases:
from dataknobs_data import database_factory, async_database_factory
# Synchronous factory
db = database_factory("memory")
db = database_factory("postgres", config)
# Asynchronous factory
db = await async_database_factory("memory")
db = await async_database_factory("s3", config)
Many components support configuration through dictionaries or environment variables:
# From environment variables (using python-dotenv)
db = await Database.create("postgres") # Uses .env file
# From explicit config
import os
config = {
"host": os.getenv("DB_HOST"),
"port": int(os.getenv("DB_PORT", 5432)),
"database": os.getenv("DB_NAME")
}
db = await Database.create("postgres", config)
Always close the database when you are finished with it, either by calling close() explicitly or by using the async context manager, which closes it automatically:
async with await Database.create("memory") as db:
# Use db
pass # Auto-closes
Use type hints when passing databases and records between functions:

from dataknobs_data import Database, Record
async def process_record(db: Database, id: str) -> Record | None:
return await db.read(id)
Catch the package's specific exceptions rather than broad ones:

try:
await db.create(record)
except RecordValidationError as e:
# Handle validation error
pass
except DatabaseOperationError as e:
# Handle database error
pass
Prefer streaming when working with large result sets:

# Good - streams data
async for record in db.stream_read(query):
process(record)
# Bad for large datasets - loads all into memory
records = await db.search(query)
for record in records:
process(record)
Validate records against a schema before writing them:

schema = create_user_schema()
result = schema.validate(record)
if result.valid:
await db.create(record)
else:
handle_validation_errors(result.errors)
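Here create_user_schema() is a user-defined helper rather than part of the package; one possible sketch, reusing the validation section above:

# Hypothetical helper that builds the UserSchema from the validation section
def create_user_schema() -> Schema:
    schema = Schema("UserSchema")
    schema.field("id", FieldType.INTEGER, required=True)
    schema.field("name", FieldType.STRING, constraints=[Length(min=1, max=100)])
    schema.field("email", FieldType.STRING, constraints=[Pattern(r"^.+@.+\..+$")])
    schema.field("age", FieldType.INTEGER, constraints=[Range(min=0, max=150)])
    return schema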