Common API Reference

Complete API reference for the dataknobs-common package.

📖 Also see: Auto-generated API Reference - Complete documentation from source code docstrings

This page provides curated examples and usage patterns. The auto-generated reference provides exhaustive technical documentation with all methods, parameters, and type annotations.


Module Overview

The dataknobs-common package provides these core modules:

  • dataknobs_common.exceptions - Exception hierarchy with context support
  • dataknobs_common.registry - Generic registry implementations
  • dataknobs_common.serialization - Serialization protocols and utilities
  • dataknobs_common.retry - Configurable retry execution with backoff strategies
  • dataknobs_common.transitions - Stateless transition validation for status graphs
  • dataknobs_common.events - Event bus for pub/sub messaging
  • dataknobs_common.testing - Test utilities, markers, and configuration factories

Exceptions Module

Base Exception

DataknobsError

Base exception for all dataknobs packages.

class DataknobsError(Exception):
    """Base exception for all dataknobs packages."""

Constructor:

DataknobsError(message: str, context: dict[str, Any] | None = None)

Parameters:

  • message (str): Error message
  • context (dict[str, Any] | None): Optional context dictionary with additional error details

Attributes:

  • message (str): The error message
  • context (dict[str, Any] | None): Context dictionary if provided
  • details (property): Alias for context (for backward compatibility)

Example:

from dataknobs_common import DataknobsError

# Simple error
raise DataknobsError("Something went wrong")

# Error with context
raise DataknobsError(
    "Operation failed",
    context={"operation": "save", "item_id": "123"}
)

# Access context
try:
    operation()
except DataknobsError as e:
    print(e.message)  # "Operation failed"
    print(e.context)  # {"operation": "save", "item_id": "123"}
    print(e.details)  # Same as context (alias)

Standard Exceptions

All standard exceptions extend DataknobsError and follow the same constructor pattern.

ValidationError

Raised for data validation failures.

class ValidationError(DataknobsError):
    """Data validation failed."""

Example:

from dataknobs_common import ValidationError

raise ValidationError(
    "Invalid email format",
    context={"email": "invalid-email", "field": "user.email"}
)

ConfigurationError

Raised for configuration issues.

class ConfigurationError(DataknobsError):
    """Configuration error."""

Example:

from dataknobs_common import ConfigurationError

raise ConfigurationError(
    "Missing required configuration",
    context={"missing_keys": ["api_key", "endpoint"]}
)

ResourceError

Raised for resource acquisition or management failures.

class ResourceError(DataknobsError):
    """Resource error."""

Example:

from dataknobs_common import ResourceError

raise ResourceError(
    "Database connection failed",
    context={"host": "db.example.com", "port": 5432}
)

NotFoundError

Raised when an item cannot be found.

class NotFoundError(DataknobsError):
    """Item not found."""

Example:

from dataknobs_common import NotFoundError

raise NotFoundError(
    "User not found",
    context={"user_id": "123", "searched_in": "users_table"}
)

OperationError

Raised for general operation failures.

class OperationError(DataknobsError):
    """Operation failed."""

Example:

from dataknobs_common import OperationError

raise OperationError(
    "Payment processing failed",
    context={"transaction_id": "txn_123", "error_code": "INSUFFICIENT_FUNDS"}
)

ConcurrencyError

Raised for concurrent operation conflicts.

class ConcurrencyError(DataknobsError):
    """Concurrency error."""

Example:

from dataknobs_common import ConcurrencyError

raise ConcurrencyError(
    "Resource locked by another process",
    context={"resource_id": "res_123", "locked_by": "process_456"}
)

SerializationError

Raised for serialization/deserialization failures.

class SerializationError(DataknobsError):
    """Serialization error."""

Example:

from dataknobs_common import SerializationError

raise SerializationError(
    "Failed to deserialize object",
    context={"class": "User", "error": "missing required field 'email'"}
)

TimeoutError

Raised for operation timeout errors.

class TimeoutError(DataknobsError):
    """Operation timed out."""

Example:

from dataknobs_common import TimeoutError

raise TimeoutError(
    "API request timed out",
    context={"url": "https://api.example.com", "timeout_seconds": 30}
)

Registry Module

Base Registry

Registry[T]

Generic, thread-safe registry for managing named items.

class Registry(Generic[T]):
    """Thread-safe registry for managing named items."""

Type Parameter: - T: Type of items stored in the registry

Constructor:

Registry(
    name: str,
    enable_metrics: bool = False,
    allow_override: bool = False
)

Parameters:

  • name (str): Registry name (for logging and metrics)
  • enable_metrics (bool): Enable metrics tracking (default: False)
  • allow_override (bool): Allow overriding existing keys (default: False)

Methods:

register(key: str, item: T, metadata: dict[str, Any] | None = None) -> None

Register an item with a key.

Parameters:

  • key (str): Unique identifier for the item
  • item (T): The item to register
  • metadata (dict[str, Any] | None): Optional metadata

Raises: - ValueError: If key already exists and allow_override is False

Example:

from dataknobs_common import Registry

registry = Registry[str]("messages")
registry.register("greeting", "Hello, world!")
registry.register("farewell", "Goodbye!", metadata={"lang": "en"})

get(key: str) -> T

Get an item by key.

Parameters: - key (str): Item key

Returns: - T: The registered item

Raises: - KeyError: If key not found

Example:

message = registry.get("greeting")  # "Hello, world!"

get_or_none(key: str) -> T | None

Get an item by key, returning None if not found.

Parameters: - key (str): Item key

Returns: - T | None: The registered item or None

Example:

message = registry.get_or_none("greeting")  # "Hello, world!"
missing = registry.get_or_none("unknown")   # None

has(key: str) -> bool

Check if a key exists.

Parameters: - key (str): Item key

Returns: - bool: True if key exists

Example:

if registry.has("greeting"):
    print("Greeting exists")

unregister(key: str) -> T

Remove and return an item.

Parameters: - key (str): Item key

Returns: - T: The removed item

Raises: - KeyError: If key not found

Example:

removed = registry.unregister("greeting")

list_items() -> list[T]

Get list of all items.

Returns: - list[T]: All registered items

Example:

all_messages = registry.list_items()

list_keys() -> list[str]

Get list of all keys.

Returns: - list[str]: All registered keys

Example:

keys = registry.list_keys()

items() -> list[tuple[str, T]]

Get list of (key, item) tuples.

Returns: - list[tuple[str, T]]: All (key, item) pairs

Example:

for key, item in registry.items():
    print(f"{key}: {item}")

count() -> int

Get number of registered items.

Returns: - int: Number of items

Example:

total = registry.count()

clear() -> None

Remove all items.

Example:

registry.clear()

get_metadata(key: str) -> dict[str, Any] | None

Get metadata for a key.

Parameters: - key (str): Item key

Returns: - dict[str, Any] | None: Metadata or None

Example:

meta = registry.get_metadata("farewell")  # {"lang": "en"}

Magic Methods:

len(registry)           # Same as count()
key in registry         # Same as has(key)
for key in registry     # Iterate over keys
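
To make the thread-safety claim and the magic methods concrete, here is a minimal sketch of a lock-guarded registry. SketchRegistry is a hypothetical stand-in written for illustration; the real Registry may be structured differently, and metadata and metrics are left out.

```python
import threading
from typing import Generic, Iterator, TypeVar

T = TypeVar("T")


class SketchRegistry(Generic[T]):
    """Hypothetical lock-guarded registry; not the package's implementation."""

    def __init__(self, name: str, allow_override: bool = False) -> None:
        self.name = name
        self._allow_override = allow_override
        self._items: dict[str, T] = {}
        self._lock = threading.RLock()

    def register(self, key: str, item: T) -> None:
        with self._lock:
            if key in self._items and not self._allow_override:
                raise ValueError(f"Key already registered in {self.name}: {key}")
            self._items[key] = item

    def get(self, key: str) -> T:
        with self._lock:
            return self._items[key]  # raises KeyError for unknown keys

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)

    def __contains__(self, key: object) -> bool:
        with self._lock:
            return key in self._items

    def __iter__(self) -> Iterator[str]:
        # Iterate over a snapshot so the registry can change during the loop.
        with self._lock:
            return iter(list(self._items))


messages = SketchRegistry[str]("messages")
messages.register("greeting", "Hello, world!")
messages.register("farewell", "Goodbye!")
```

Returning a snapshot from `__iter__` trades a small copy for safety: a caller that registers or removes items while looping does not hit a "dictionary changed size during iteration" error.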

Cached Registry

CachedRegistry[T]

Registry with automatic TTL-based caching.

class CachedRegistry(Registry[T]):
    """Registry with TTL-based caching."""

Constructor:

CachedRegistry(
    name: str,
    cache_ttl: int = 300,
    enable_metrics: bool = True,
    allow_override: bool = False
)

Parameters:

  • name (str): Registry name
  • cache_ttl (int): Cache TTL in seconds (default: 300)
  • enable_metrics (bool): Enable metrics tracking (default: True)
  • allow_override (bool): Allow overriding existing keys (default: False)

Additional Methods:

get_cached(key: str, factory: Callable[[], T]) -> T

Get cached item or create with factory.

Parameters:

  • key (str): Cache key
  • factory (Callable[[], T]): Factory function to create item if not cached

Returns: - T: Cached or newly created item

Example:

from dataknobs_common import CachedRegistry

cache = CachedRegistry[Bot]("bots", cache_ttl=300)

def create_bot():
    return Bot(client_id="client1")

bot = cache.get_cached("client1", factory=create_bot)
# First call: creates bot
# Second call: returns cached bot (if within TTL)
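
The get-or-create behavior with a TTL can be sketched as follows. This is an illustration under stated assumptions, not CachedRegistry's actual code: SketchTTLCache, its clock parameter, and the hits/misses counters are hypothetical names, and the clock is injectable only so that expiry can be shown without sleeping.

```python
import time
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class SketchTTLCache(Generic[T]):
    """Hypothetical get-or-create cache with a time-to-live per entry."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[str, tuple[float, T]] = {}
        self.hits = 0
        self.misses = 0

    def get_cached(self, key: str, factory: Callable[[], T]) -> T:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            self.hits += 1
            return entry[1]
        # Missing or expired: build a fresh item and record its creation time.
        self.misses += 1
        item = factory()
        self._entries[key] = (now, item)
        return item


fake_now = [0.0]  # mutable cell standing in for the current time
cache = SketchTTLCache[str](ttl_seconds=300, clock=lambda: fake_now[0])
created: list[str] = []


def create_bot() -> str:
    created.append("bot")
    return f"bot-{len(created)}"


first = cache.get_cached("client1", create_bot)   # miss: factory runs
second = cache.get_cached("client1", create_bot)  # hit: cached value returned
fake_now[0] = 301.0                               # advance past the TTL
third = cache.get_cached("client1", create_bot)   # miss: entry expired
```

A monotonic clock is used by default because wall-clock time can jump backwards, which would make entries appear younger than they are.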

invalidate_cache(key: str | None = None) -> None

Invalidate cache entry or entire cache.

Parameters: - key (str | None): Specific key to invalidate, or None to invalidate all

Example:

cache.invalidate_cache("client1")  # Invalidate specific item
cache.invalidate_cache()           # Invalidate all items

get_cache_stats() -> dict[str, Any]

Get cache statistics.

Returns: - dict[str, Any]: Statistics including hits, misses, hit_rate

Example:

stats = cache.get_cache_stats()
print(f"Hit rate: {stats['hit_rate']:.2%}")
print(f"Hits: {stats['hits']}, Misses: {stats['misses']}")

Async Registry

AsyncRegistry[T]

Async version of Registry.

class AsyncRegistry(Generic[T]):
    """Async registry for managing named items."""

Constructor:

AsyncRegistry(
    name: str,
    enable_metrics: bool = False,
    allow_override: bool = False
)

Methods:

All methods are async versions of the base Registry methods:

await registry.register(key, item, metadata=None)
item = await registry.get(key)
item = await registry.get_or_none(key)
exists = await registry.has(key)
item = await registry.unregister(key)
items = await registry.list_items()
keys = await registry.list_keys()
pairs = await registry.items()
count = await registry.count()
await registry.clear()
meta = await registry.get_metadata(key)

Example:

from dataknobs_common import AsyncRegistry

registry = AsyncRegistry[Resource]("resources")

await registry.register("db", db_resource)
resource = await registry.get("db")
count = await registry.count()
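
The awaited calls above must run inside a coroutine. The sketch below shows one way to drive an async registry from a script with asyncio.run. SketchAsyncRegistry is a hypothetical stand-in guarded by an asyncio.Lock; whether AsyncRegistry uses the same locking approach is an assumption.

```python
import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")


class SketchAsyncRegistry(Generic[T]):
    """Hypothetical async registry; not the package's implementation."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._items: dict[str, T] = {}
        self._lock = asyncio.Lock()

    async def register(self, key: str, item: T) -> None:
        async with self._lock:
            if key in self._items:
                raise ValueError(f"Key already registered in {self.name}: {key}")
            self._items[key] = item

    async def get(self, key: str) -> T:
        async with self._lock:
            return self._items[key]  # raises KeyError for unknown keys

    async def count(self) -> int:
        async with self._lock:
            return len(self._items)


async def main() -> tuple[str, int]:
    registry = SketchAsyncRegistry[str]("resources")
    await registry.register("db", "postgres://example")
    return await registry.get("db"), await registry.count()


result = asyncio.run(main())
```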

Serialization Module

Protocol

Serializable

Protocol for objects that can be serialized to/from dictionaries.

@runtime_checkable
class Serializable(Protocol):
    """Protocol for serializable objects."""

    def to_dict(self) -> dict: ...

    @classmethod
    def from_dict(cls, data: dict) -> Self: ...

Example:

from dataknobs_common import Serializable
from dataclasses import dataclass

@dataclass
class User:
    name: str
    email: str

    def to_dict(self) -> dict:
        return {"name": self.name, "email": self.email}

    @classmethod
    def from_dict(cls, data: dict) -> "User":
        return cls(name=data["name"], email=data["email"])

# Type checking works
user = User("Alice", "alice@example.com")
assert isinstance(user, Serializable)  # True
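
A runtime-checkable protocol only checks that the named members exist; it does not inspect their signatures or return types. The sketch below illustrates this with SketchSerializable, a hypothetical protocol that mirrors the two members shown above (it avoids Self so that it does not depend on a specific Python version).

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@runtime_checkable
class SketchSerializable(Protocol):
    """Hypothetical stand-in with the same two members as the protocol above."""

    def to_dict(self) -> dict: ...

    @classmethod
    def from_dict(cls, data: dict) -> "SketchSerializable": ...


@dataclass
class Point:
    x: int
    y: int

    def to_dict(self) -> dict:
        return {"x": self.x, "y": self.y}

    @classmethod
    def from_dict(cls, data: dict) -> "Point":
        return cls(x=data["x"], y=data["y"])


@dataclass
class WriteOnly:
    value: int

    def to_dict(self) -> dict:
        return {"value": self.value}


point_ok = isinstance(Point(1, 2), SketchSerializable)        # both members present
write_only_ok = isinstance(WriteOnly(3), SketchSerializable)  # from_dict is missing
round_trip = Point.from_dict(Point(1, 2).to_dict())
```

Because the check is structural, classes never need to inherit from the protocol, but a passing isinstance check is not proof that to_dict and from_dict round-trip correctly.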

Utility Functions

serialize(obj: Serializable) -> dict

Serialize an object to a dictionary.

Parameters: - obj (Serializable): Object to serialize

Returns: - dict: Serialized dictionary

Raises: - SerializationError: If serialization fails

Example:

from dataknobs_common import serialize

user = User("Alice", "alice@example.com")
data = serialize(user)
# {"name": "Alice", "email": "alice@example.com"}

deserialize(cls: type[T], data: dict) -> T

Deserialize a dictionary to an object.

Parameters:

  • cls (type[T]): Class to deserialize to
  • data (dict): Dictionary to deserialize

Returns: - T: Deserialized object

Raises: - SerializationError: If deserialization fails

Example:

from dataknobs_common import deserialize

data = {"name": "Alice", "email": "alice@example.com"}
user = deserialize(User, data)

serialize_list(objects: list[Serializable]) -> list[dict]

Serialize a list of objects.

Parameters: - objects (list[Serializable]): List of objects to serialize

Returns: - list[dict]: List of serialized dictionaries

Raises: - SerializationError: If serialization fails

Example:

from dataknobs_common import serialize_list

users = [
    User("Alice", "alice@example.com"),
    User("Bob", "bob@example.com")
]
data = serialize_list(users)
# [{"name": "Alice", ...}, {"name": "Bob", ...}]

deserialize_list(cls: type[T], data_list: list[dict]) -> list[T]

Deserialize a list of dictionaries.

Parameters:

  • cls (type[T]): Class to deserialize to
  • data_list (list[dict]): List of dictionaries to deserialize

Returns: - list[T]: List of deserialized objects

Raises: - SerializationError: If deserialization fails

Example:

from dataknobs_common import deserialize_list

data = [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"}
]
users = deserialize_list(User, data)

is_serializable(obj: Any) -> bool

Check if an object is serializable.

Parameters: - obj (Any): Object to check

Returns: - bool: True if object has to_dict method

Example:

from dataknobs_common import is_serializable

user = User("Alice", "alice@example.com")
if is_serializable(user):
    data = serialize(user)

is_deserializable(cls: type) -> bool

Check if a class is deserializable.

Parameters: - cls (type): Class to check

Returns: - bool: True if class has from_dict classmethod

Example:

from dataknobs_common import is_deserializable

if is_deserializable(User):
    user = deserialize(User, data)

JSON Safety Functions

sanitize_for_json(value: Any, on_drop: str = "silent") -> Any

Recursively traverse a value and drop anything not JSON-serializable. Handles dicts, lists, dataclasses, sets, tuples, bytes, datetime, Enum, and objects with to_dict().

Parameters:

  • value (Any): The value to sanitize
  • on_drop (str): Drop behavior. One of "silent" (DEBUG log, the default), "warn" (WARNING log with key path), or "error" (raises SerializationError listing all dropped paths)

Returns: - Any: JSON-safe copy with non-serializable values removed

Raises: - SerializationError: When on_drop="error" and non-serializable values are found

Example:

from dataknobs_common.serialization import sanitize_for_json

data = {"name": "Alice", "callback": some_function, "count": 42}

# Silent mode (default) — drops with DEBUG log
safe = sanitize_for_json(data)
# {"name": "Alice", "count": 42}

# Warn mode — WARNING log with key path
safe = sanitize_for_json(data, on_drop="warn")

# Error mode — raises SerializationError
safe = sanitize_for_json(data, on_drop="error")
# SerializationError: Non-serializable values at: callback (type=function)
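
The recursive drop behavior can be sketched in a few lines. This is a simplified illustration, not the library's implementation: sketch_sanitize is a hypothetical name, it handles only dicts, lists, tuples, and JSON primitives, and it omits the logging modes and the conversions for dataclasses, sets, bytes, datetime, Enum, and to_dict() objects listed above.

```python
import json
from typing import Any

_DROP = object()  # sentinel meaning "remove this value"


def _clean(value: Any) -> Any:
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, dict):
        cleaned = {}
        for key, item in value.items():
            safe = _clean(item)
            # Keep only string keys whose values survived cleaning.
            if isinstance(key, str) and safe is not _DROP:
                cleaned[key] = safe
        return cleaned
    if isinstance(value, (list, tuple)):
        return [safe for safe in map(_clean, value) if safe is not _DROP]
    return _DROP  # anything else is treated as non-serializable


def sketch_sanitize(value: Any) -> Any:
    """Return a JSON-safe copy; a non-serializable top-level value becomes None."""
    cleaned = _clean(value)
    return None if cleaned is _DROP else cleaned


def some_function() -> None:
    pass


data = {"name": "Alice", "callback": some_function, "count": 42, "tags": ("a", "b")}
safe = sketch_sanitize(data)
encoded = json.dumps(safe)
```

The function builds a new structure instead of mutating its argument, so the caller's original data, including the dropped callback, is left untouched.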

validate_json_safe(value: Any) -> list[str]

Read-only traversal returning paths to non-serializable values. Does not modify the input.

Parameters: - value (Any): The value to check

Returns: - list[str]: Paths to non-serializable values. Empty list means fully JSON-safe.

Example:

from dataknobs_common.serialization import validate_json_safe

problems = validate_json_safe({"name": "ok", "fn": some_function})
# ["fn (type=function)"]

if not problems:
    print("Fully JSON-safe")
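
A read-only traversal that reports paths might look like the sketch below. sketch_find_unsafe is a hypothetical name; the "(type=...)" suffix follows the output shown above, while the dotted-key and bracketed-index path format for nested values is an assumption.

```python
from typing import Any


def sketch_find_unsafe(value: Any, path: str = "") -> list[str]:
    """Collect paths to values that are not plain JSON types; input is not modified."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return []
    if isinstance(value, dict):
        problems: list[str] = []
        for key, item in value.items():
            child = f"{path}.{key}" if path else str(key)
            problems.extend(sketch_find_unsafe(item, child))
        return problems
    if isinstance(value, (list, tuple)):
        problems = []
        for index, item in enumerate(value):
            problems.extend(sketch_find_unsafe(item, f"{path}[{index}]"))
        return problems
    # Leaf that json cannot encode: report where it is and what it is.
    return [f"{path or '<root>'} (type={type(value).__name__})"]


def some_function() -> None:
    pass


payload = {"name": "ok", "fn": some_function, "items": [1, {"cb": some_function}]}
problems = sketch_find_unsafe(payload)
clean = sketch_find_unsafe({"name": "ok", "count": 3})
```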

Retry Module

BackoffStrategy

Enum defining backoff algorithms for retry delays.

class BackoffStrategy(Enum):
    FIXED = "fixed"
    LINEAR = "linear"
    EXPONENTIAL = "exponential"
    JITTER = "jitter"
    DECORRELATED = "decorrelated"

Members:

  • FIXED — Constant delay between retries
  • LINEAR — Delay increases linearly (initial_delay * attempt)
  • EXPONENTIAL — Delay multiplied by backoff_multiplier each attempt
  • JITTER — Exponential backoff with random jitter (controlled by jitter_range)
  • DECORRELATED — Random delay between initial_delay and 3x previous delay

Example:

from dataknobs_common.retry import BackoffStrategy

strategy = BackoffStrategy.EXPONENTIAL
strategy = BackoffStrategy("jitter")  # From string value

RetryConfig

Dataclass configuring retry behavior.

@dataclass
class RetryConfig:
    max_attempts: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    backoff_strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL
    backoff_multiplier: float = 2.0
    jitter_range: float = 0.1
    retry_on_exceptions: list[type] | None = None
    retry_on_result: Callable[[Any], bool] | None = None
    on_retry: Callable[[int, Exception], None] | None = None
    on_failure: Callable[[Exception], None] | None = None

Fields:

| Field | Type | Default | Description |
|---|---|---|---|
| max_attempts | int | 3 | Maximum execution attempts (including the first) |
| initial_delay | float | 1.0 | Base delay in seconds before the first retry |
| max_delay | float | 60.0 | Upper bound on delay in seconds |
| backoff_strategy | BackoffStrategy | EXPONENTIAL | Algorithm for computing delay |
| backoff_multiplier | float | 2.0 | Multiplier for exponential/jitter strategies |
| jitter_range | float | 0.1 | Fractional jitter range for JITTER strategy (0.1 = ±10%) |
| retry_on_exceptions | list[type] \| None | None | Only retry these exception types; others propagate immediately |
| retry_on_result | Callable \| None | None | Return True to trigger retry based on result value |
| on_retry | Callable \| None | None | Hook called before retry sleep: (attempt, exception) |
| on_failure | Callable \| None | None | Hook called when all attempts exhausted: (exception) |

Example:

import logging

from dataknobs_common.retry import RetryConfig, BackoffStrategy

logger = logging.getLogger(__name__)

config = RetryConfig(
    max_attempts=5,
    initial_delay=0.5,
    max_delay=30.0,
    backoff_strategy=BackoffStrategy.JITTER,
    # Built-in exception types; nothing imported here shadows them
    retry_on_exceptions=[ConnectionError, TimeoutError],
    on_retry=lambda attempt, exc: logger.warning("Retry %d: %s", attempt, exc),
)

RetryExecutor

Executes a callable with retry logic and configurable backoff.

class RetryExecutor:
    def __init__(self, config: RetryConfig) -> None: ...
    async def execute(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: ...

Constructor:

  • config (RetryConfig): The retry configuration

Methods:

async execute(func, *args, **kwargs) -> Any

Execute a callable with retry logic. Supports both sync and async callables.

Parameters:

  • func (Callable): The callable to execute (sync or async)
  • *args: Positional arguments forwarded to func
  • **kwargs: Keyword arguments forwarded to func

Returns:

  • The return value of func on a successful attempt

Raises:

  • The exception from the final failed attempt, or any non-retryable exception immediately

Example:

from dataknobs_common.retry import RetryExecutor, RetryConfig, BackoffStrategy

config = RetryConfig(
    max_attempts=3,
    backoff_strategy=BackoffStrategy.FIXED,
    initial_delay=1.0,
)
executor = RetryExecutor(config)

# Async callable
result = await executor.execute(fetch_data, url)

# Sync callable (also works from async context)
result = await executor.execute(parse_json, raw_text)
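
One way to support both sync and async callables from a single async entry point is to call the function and await the result only if it is awaitable. The sketch below shows that idea with a fixed delay. sketch_execute and its keyword-only parameters are hypothetical and are not RetryExecutor's actual signature or implementation.

```python
import asyncio
import inspect
from typing import Any, Callable


async def sketch_execute(
    func: Callable[..., Any],
    *args: Any,
    max_attempts: int = 3,
    delay: float = 0.0,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
    **kwargs: Any,
) -> Any:
    """Call func until it succeeds or max_attempts is reached (fixed delay)."""
    for attempt in range(1, max_attempts + 1):
        try:
            result = func(*args, **kwargs)
            if inspect.isawaitable(result):
                result = await result  # async callables return an awaitable
            return result
        except retry_on:
            if attempt == max_attempts:
                raise  # the final failure propagates to the caller
            await asyncio.sleep(delay)
    raise RuntimeError("max_attempts must be at least 1")


calls = {"count": 0}


async def flaky_fetch(url: str) -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return f"payload from {url}"


def parse_number(raw_text: str) -> int:
    return int(raw_text)


fetched = asyncio.run(sketch_execute(flaky_fetch, "https://api.example.com"))
parsed = asyncio.run(sketch_execute(parse_number, "42"))
```

Exceptions outside retry_on are not caught, so they reach the caller on the first attempt, which matches the documented behavior for non-retryable exceptions.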


Transitions Module

InvalidTransitionError

Exception raised when a status transition is not allowed. Extends OperationError.

class InvalidTransitionError(OperationError):
    def __init__(
        self,
        entity: str,
        current_status: str,
        target_status: str,
        allowed: set[str] | None = None,
    ) -> None: ...

Parameters:

  • entity (str): Name of the entity or transition graph (e.g. "run_status")
  • current_status (str): The current status being transitioned from
  • target_status (str): The target status that was rejected
  • allowed (set[str] | None): Valid targets from current_status, or None if current status is unknown

Attributes:

  • entity (str): The entity name
  • current_status (str): The current status
  • target_status (str): The rejected target status
  • allowed (set[str] | None): Allowed targets, or None for unknown status
  • context (dict): Structured context dict with keys entity, current_status, target_status, allowed (sorted list)

Example:

from dataknobs_common.transitions import InvalidTransitionError

# Assumes `validator` is a TransitionValidator named "order" in which
# "completed" is a terminal status (no allowed targets).
try:
    validator.validate("completed", "running")
except InvalidTransitionError as e:
    print(e.entity)          # "order"
    print(e.current_status)  # "completed"
    print(e.target_status)   # "running"
    print(e.allowed)         # set()

TransitionValidator

Stateless validator for declarative transition graphs. Does not manage or store state — the caller owns the current status.

class TransitionValidator:
    def __init__(self, name: str, transitions: dict[str, set[str]]) -> None: ...

Constructor:

  • name (str): Human-readable name for the graph, used in error messages
  • transitions (dict[str, set[str]]): Mapping from each status to its allowed target statuses. Statuses with empty sets are terminal.

Properties:

name -> str

The name of this transition graph.

allowed_transitions -> dict[str, set[str]]

Returns a copy of the full transition graph.

statuses -> set[str]

All known statuses (sources and targets).

Methods:

validate(current_status: str | None, target_status: str) -> None

Validate a proposed transition.

Parameters:

  • current_status (str | None): The current status. If None, validation is skipped.
  • target_status (str): The desired target status.

Raises:

  • InvalidTransitionError: If the transition is not allowed.

Example:

from dataknobs_common.transitions import TransitionValidator

ORDER = TransitionValidator("order", {
    "draft":     {"submitted"},
    "submitted": {"approved", "rejected"},
    "approved":  {"shipped"},
    "shipped":   {"delivered"},
    "rejected":  set(),
    "delivered": set(),
})

ORDER.validate("draft", "submitted")  # ok
ORDER.validate(None, "submitted")     # ok (skip)
ORDER.validate("shipped", "draft")    # raises InvalidTransitionError

is_valid(current_status: str | None, target_status: str) -> bool

Check whether a transition is allowed without raising.

Parameters:

  • current_status (str | None): The current status. If None, returns True.
  • target_status (str): The desired target status.

Returns:

  • True if the transition is allowed, False otherwise.

Example:

if ORDER.is_valid(current, target):
    update_status(target)
else:
    logger.warning("Invalid transition: %s -> %s", current, target)

get_reachable(from_status: str) -> set[str]

Compute all statuses reachable from a given status (transitive closure).

Parameters:

  • from_status (str): The starting status.

Returns:

  • Set of all reachable statuses via one or more transitions. Does not include from_status itself unless there is a cycle.

Raises:

  • InvalidTransitionError: If from_status is not a known status.

Example:

reachable = ORDER.get_reachable("draft")
# {"submitted", "approved", "rejected", "shipped", "delivered"}

reachable = ORDER.get_reachable("delivered")
# set() — terminal status


Package Information

Version

from dataknobs_common import __version__

The version string for the dataknobs-common package.

Type: str

Example:

from dataknobs_common import __version__

print(__version__)  # "1.3.0"

Import Patterns

# Exceptions
from dataknobs_common import (
    DataknobsError,
    ValidationError,
    ConfigurationError,
    ResourceError,
    NotFoundError,
    OperationError,
    ConcurrencyError,
    SerializationError,
    TimeoutError,
)

# Registry
from dataknobs_common import (
    Registry,
    CachedRegistry,
    AsyncRegistry,
)

# Serialization
from dataknobs_common import (
    Serializable,
    serialize,
    deserialize,
    serialize_list,
    deserialize_list,
    is_serializable,
    is_deserializable,
)

# JSON Safety
from dataknobs_common.serialization import sanitize_for_json, validate_json_safe

# Retry
from dataknobs_common import (
    BackoffStrategy,
    RetryConfig,
    RetryExecutor,
)

# Transitions
from dataknobs_common import (
    InvalidTransitionError,
    TransitionValidator,
)

Module Imports

# Import entire modules
from dataknobs_common import exceptions
from dataknobs_common import registry
from dataknobs_common import serialization
from dataknobs_common import retry
from dataknobs_common import transitions

Type Annotations

Registry Type Annotations

from dataknobs_common import Registry
from typing import Protocol

class Tool(Protocol):
    name: str
    description: str

# Typed registry
tool_registry: Registry[Tool] = Registry("tools")

# Function accepting registry
def process_registry(registry: Registry[Tool]) -> None:
    for tool in registry.list_items():
        print(tool.name)

Serializable Type Annotations

import json
from typing import TypeVar

from dataknobs_common import Serializable

T = TypeVar("T", bound=Serializable)

def save_to_file(obj: T, filepath: str) -> None:
    """Save any serializable object to file."""
    data = obj.to_dict()
    with open(filepath, "w") as f:
        json.dump(data, f)

def load_from_file(cls: type[T], filepath: str) -> T:
    """Load any serializable object from file."""
    with open(filepath) as f:
        data = json.load(f)
    return cls.from_dict(data)

Error Handling Patterns

Catching All Dataknobs Errors

from dataknobs_common import DataknobsError

try:
    # Any dataknobs operation
    result = some_dataknobs_operation()
except DataknobsError as e:
    logger.error(f"Dataknobs error: {e.message}")
    if e.context:
        logger.error(f"Context: {e.context}")

Catching Specific Errors

from dataknobs_common import ValidationError, NotFoundError, ResourceError

try:
    result = process_data(input_data)
except ValidationError as e:
    # Handle validation errors
    return {"error": "validation_failed", "details": e.context}
except NotFoundError as e:
    # Handle not found errors (context may be None, so fall back to an empty dict)
    return {"error": "not_found", "id": (e.context or {}).get("id")}
except ResourceError as e:
    # Handle resource errors
    return {"error": "resource_unavailable", "resource": (e.context or {}).get("resource_id")}

Registry Error Handling

from dataknobs_common import Registry

registry = Registry[Tool]("tools")

try:
    tool = registry.get("calculator")
except KeyError:
    # Handle missing key
    tool = default_tool

# Or use get_or_none
tool = registry.get_or_none("calculator")
if tool is None:
    tool = default_tool

Serialization Error Handling

from dataknobs_common import deserialize, SerializationError

try:
    user = deserialize(User, data)
except SerializationError as e:
    logger.error(f"Failed to deserialize: {e.message}")
    logger.error(f"Error context: {e.context}")
    # Handle error appropriately

Advanced Usage Patterns

Custom Registry with Validation

from typing import TypeVar

from dataknobs_common import Registry

T = TypeVar("T")

class ValidatedRegistry(Registry[T]):
    """Registry with validation on registration."""

    def register(self, key: str, item: T, metadata: dict | None = None) -> None:
        # Validate before registering
        if not self._validate(item):
            raise ValueError(f"Item validation failed: {key}")
        super().register(key, item, metadata)

    def _validate(self, item: T) -> bool:
        # Custom validation logic
        return True

Serializable with Validation

from dataknobs_common import Serializable, ValidationError
from dataclasses import dataclass

@dataclass
class User:
    name: str
    email: str

    def to_dict(self) -> dict:
        return {"name": self.name, "email": self.email}

    @classmethod
    def from_dict(cls, data: dict) -> "User":
        # Validate during deserialization
        if "@" not in data.get("email", ""):
            raise ValidationError(
                "Invalid email format",
                context={"email": data.get("email")}
            )
        return cls(name=data["name"], email=data["email"])

Exception with Rich Context

from dataknobs_common import OperationError

class ProcessingError(OperationError):
    """Custom processing error with rich context."""

    def __init__(
        self,
        stage: str,
        item_id: str,
        error: Exception,
        retry_count: int = 0
    ):
        super().__init__(
            f"Processing failed at stage '{stage}' for item '{item_id}'",
            context={
                "stage": stage,
                "item_id": item_id,
                "error_type": type(error).__name__,
                "error_message": str(error),
                "retry_count": retry_count,
            }
        )
        self.stage = stage
        self.item_id = item_id
        self.original_error = error

Best Practices

1. Use Type Parameters

# Good: Typed registry
tool_registry = Registry[Tool]("tools")

# Less ideal: Untyped registry
tool_registry = Registry("tools")  # Type checking not enforced

2. Provide Context in Exceptions

# Good: Rich context
raise NotFoundError(
    "User not found",
    context={"user_id": user_id, "search_criteria": criteria}
)

# Acceptable: Simple message
raise NotFoundError("User not found")

3. Use Serialization Utilities

# Good: Use utilities for consistent error handling
from dataknobs_common import serialize, deserialize

data = serialize(user)
restored = deserialize(User, data)

# Less ideal: Direct calls (no error wrapping)
data = user.to_dict()
restored = User.from_dict(data)

4. Extend, Don't Replace

# Good: Extend common base
class MyRegistry(Registry[Item]):
    def register_item(self, item: Item) -> None:
        self.register(item.id, item)

# Avoid: Reimplementing from scratch
class MyRegistry:
    def __init__(self):
        self._items = {}

Dependencies

The Common package has minimal dependencies:

  • Python: >= 3.12
  • Standard library only: No external dependencies

Changelog

Version 1.3.0

  • Added dataknobs_common.retry module (BackoffStrategy, RetryConfig, RetryExecutor)
  • Added dataknobs_common.transitions module (InvalidTransitionError, TransitionValidator)
  • Retry primitives extracted from dataknobs_fsm.patterns.error_recovery (zero FSM dependency)
  • FSM module re-exports from common for backward compatibility

Version 1.0.1

  • Added Registry, CachedRegistry, AsyncRegistry implementations
  • Added comprehensive Exception hierarchy
  • Added Serialization protocol and utilities
  • Initial production release

Version 1.0.0

  • Initial release with basic version management