dataknobs-common Complete API Reference

Complete auto-generated API documentation from source code docstrings.

💡 Also see:

  • Curated Guide - Hand-crafted tutorials and examples
  • Package Overview - Introduction and getting started
  • Source Code - View on GitHub


dataknobs_common

Common utilities and base classes for dataknobs packages.

This package provides shared functionality used across all dataknobs packages:

  • Exceptions: Unified exception hierarchy with context support
  • Expressions: Safe expression evaluation engine with restricted builtins
  • Registry: Generic registry pattern for managing named items
  • Serialization: Protocols and utilities for to_dict/from_dict patterns
  • Retry: Configurable retry execution with backoff strategies
  • Transitions: Stateless transition validation for status graphs
  • Events: Event bus abstraction for pub/sub messaging
  • Testing: Test utilities, markers, and configuration factories
Example
from dataknobs_common import DataknobsError, Registry, serialize

# Use common exceptions
raise DataknobsError("Something went wrong", context={"details": "here"})

# Create a registry
registry = Registry[MyType]("my_registry")
registry.register("key", my_item)

# Serialize objects
data = serialize(my_object)

# Use event bus
from dataknobs_common.events import create_event_bus, Event, EventType
bus = create_event_bus({"backend": "memory"})

Modules:

Name Description
events

Event bus abstraction for distributed event handling.

exceptions

Common exception hierarchy for all dataknobs packages.

expressions

Safe expression evaluation engine.

ratelimit

Rate limiting abstraction supporting multiple backends.

registry

Generic registry pattern for managing named items.

retry

Retry utilities for resilient operation execution.

serialization

Serialization protocols and utilities for dataknobs packages.

testing

Test utilities for dataknobs packages.

transitions

Stateless transition validation for declarative status graphs.

Classes:

Name Description
ExpressionResult

Result of a safe expression evaluation.

Event

An event message for the event bus.

EventBus

Abstract event bus protocol supporting multiple backends.

EventType

Standard event types for registry and resource changes.

InMemoryEventBus

Simple in-memory pub/sub event bus.

Subscription

Handle for managing an event subscription.

ConcurrencyError

Raised when concurrent operation conflicts occur.

ConfigurationError

Raised when configuration is invalid or missing.

DataknobsError

Base exception for all dataknobs packages.

NotFoundError

Raised when a requested item is not found.

OperationError

Raised when an operation fails.

RateLimitError

Raised when a rate limit is exceeded.

ResourceError

Raised when resource operations fail.

SerializationError

Raised when serialization or deserialization fails.

TimeoutError

Raised when an operation times out.

ValidationError

Raised when validation fails.

InMemoryRateLimiter

Sliding-window rate limiter backed by in-memory data structures.

RateLimit

A single rate limit rule.

RateLimiter

Abstract rate limiter protocol supporting multiple backends.

RateLimiterConfig

Configuration for a rate limiter.

RateLimitStatus

Current status of a rate limiter bucket.

BackoffStrategy

Backoff strategies for retries.

RetryConfig

Configuration for retry behavior.

RetryExecutor

Executes a callable with retry logic and configurable backoff.

InvalidTransitionError

Raised when a status transition is not allowed.

TransitionValidator

Stateless validator for declarative transition graphs.

AsyncRegistry

Async-safe registry for managing named items.

CachedRegistry

Registry with time-based caching support.

PluginRegistry

Registry for plugins with factory support and defaults.

Registry

Base registry for managing named items with optional metrics.

Serializable

Protocol for objects that can be serialized to/from dict.

Functions:

Name Description
safe_eval

Evaluate a Python expression string safely.

safe_eval_value

Convenience wrapper returning just the value.

create_event_bus

Create an event bus from configuration.

create_rate_limiter

Create a rate limiter from configuration.

deserialize

Deserialize dictionary into an object.

deserialize_list

Deserialize a list of dictionaries into objects.

is_deserializable

Check if a class is deserializable.

is_serializable

Check if an object is serializable.

serialize

Serialize an object to dictionary.

serialize_list

Serialize a list of objects to list of dictionaries.

create_test_json_files

Create test JSON files.

create_test_markdown_files

Create test markdown files for ingestion.

get_test_bot_config

Get a test bot configuration.

get_test_rag_config

Get a test RAG/knowledge base configuration.

is_chromadb_available

Check if ChromaDB is available.

is_faiss_available

Check if FAISS is available.

is_ollama_available

Check if Ollama service is available.

is_ollama_model_available

Check if a specific Ollama model is available.

is_package_available

Check if a Python package is available.

is_redis_available

Check if Redis service is available.

Attributes:

Name Type Description
SAFE_BUILTINS dict[str, Any]

Builtins allowlist shared by all expression contexts.

YAML_ALIASES dict[str, Any]

Common aliases for YAML/JSON boolean and null literals.

Attributes

SAFE_BUILTINS module-attribute

SAFE_BUILTINS: dict[str, Any] = {
    "str": str,
    "int": int,
    "float": float,
    "bool": bool,
    "list": list,
    "dict": dict,
    "tuple": tuple,
    "set": set,
    "len": len,
    "min": min,
    "max": max,
    "abs": abs,
    "round": round,
    "sorted": sorted,
    "isinstance": isinstance,
    "enumerate": enumerate,
    "range": range,
    "zip": zip,
    "True": True,
    "False": False,
    "None": None,
}

Builtins allowlist shared by all expression contexts.

Explicitly excludes: exec, eval, __import__, open, getattr, setattr, delattr, globals, locals, compile, breakpoint, __builtins__ passthrough.

YAML_ALIASES module-attribute

YAML_ALIASES: dict[str, Any] = {
    "true": True,
    "false": False,
    "null": None,
    "none": None,
}

Common aliases for YAML/JSON boolean and null literals.

Included in expression scope so that config-authored expressions can use true/false/null (YAML style) alongside Python's True/False/None.

Note: scope variables with the same name override these aliases (scope is applied after YAML_ALIASES).
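The override order can be demonstrated directly. The `build_scope` helper below is hypothetical; it mirrors the documented merge order rather than the package's internal code:

```python
# YAML_ALIASES as documented above; scope entries win on name collisions
# because the scope dict is merged in last.
YAML_ALIASES = {"true": True, "false": False, "null": None, "none": None}

def build_scope(user_scope: dict) -> dict:
    # Aliases first, then user scope, so a user-defined "null" shadows the alias.
    return {**YAML_ALIASES, **user_scope}
```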

Classes

ExpressionResult dataclass

ExpressionResult(
    value: Any = None, success: bool = True, error: str | None = None
)

Result of a safe expression evaluation.

Attributes:

Name Type Description
value Any

The evaluated result (native Python type).

success bool

Whether evaluation succeeded.

error str | None

Exception message if evaluation failed.

Event dataclass

Event(
    type: EventType,
    topic: str,
    payload: dict[str, Any] = dict(),
    timestamp: datetime = (lambda: datetime.now(timezone.utc))(),
    event_id: str = (lambda: str(uuid.uuid4()))(),
    source: str | None = None,
    correlation_id: str | None = None,
    metadata: dict[str, Any] = dict(),
)

An event message for the event bus.

Events are immutable messages that represent something that happened. They contain a type, topic, payload, and metadata.

Attributes:

Name Type Description
type EventType

The type of event (created, updated, deleted, etc.)

topic str

The topic/channel this event belongs to (e.g., "registry:bots")

payload dict[str, Any]

The event data as a dictionary

timestamp datetime

When the event was created (defaults to now)

event_id str

Unique identifier for this event (auto-generated)

source str | None

Optional identifier for the event source

correlation_id str | None

Optional ID to correlate related events

metadata dict[str, Any]

Additional metadata for the event

Example
event = Event(
    type=EventType.UPDATED,
    topic="registry:bots",
    payload={"bot_id": "my-bot", "changes": ["config"]}
)

# Serialize for transport
data = event.to_dict()

# Restore from transport
restored = Event.from_dict(data)

Methods:

Name Description
to_dict

Convert event to dictionary for serialization.

from_dict

Create event from dictionary.

with_correlation

Create a new event with a correlation ID.

Functions
to_dict
to_dict() -> dict[str, Any]

Convert event to dictionary for serialization.

Returns:

Type Description
dict[str, Any]

Dictionary representation with ISO timestamp and string enum.

Source code in packages/common/src/dataknobs_common/events/types.py
def to_dict(self) -> dict[str, Any]:
    """Convert event to dictionary for serialization.

    Returns:
        Dictionary representation with ISO timestamp and string enum.
    """
    return {
        "type": self.type.value,
        "topic": self.topic,
        "payload": self.payload,
        "timestamp": self.timestamp.isoformat(),
        "event_id": self.event_id,
        "source": self.source,
        "correlation_id": self.correlation_id,
        "metadata": self.metadata,
    }
from_dict classmethod
from_dict(data: dict[str, Any]) -> Event

Create event from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary with event data

required

Returns:

Type Description
Event

Event instance

Source code in packages/common/src/dataknobs_common/events/types.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Event:
    """Create event from dictionary.

    Args:
        data: Dictionary with event data

    Returns:
        Event instance
    """
    return cls(
        type=EventType(data["type"]),
        topic=data["topic"],
        payload=data.get("payload", {}),
        timestamp=(
            datetime.fromisoformat(data["timestamp"])
            if isinstance(data.get("timestamp"), str)
            else data.get("timestamp", datetime.now(timezone.utc))
        ),
        event_id=data.get("event_id", str(uuid.uuid4())),
        source=data.get("source"),
        correlation_id=data.get("correlation_id"),
        metadata=data.get("metadata", {}),
    )
with_correlation
with_correlation(correlation_id: str) -> Event

Create a new event with a correlation ID.

Useful for tracking related events through a workflow.

Parameters:

Name Type Description Default
correlation_id str

The correlation ID to set

required

Returns:

Type Description
Event

New Event with the correlation ID set

Source code in packages/common/src/dataknobs_common/events/types.py
def with_correlation(self, correlation_id: str) -> Event:
    """Create a new event with a correlation ID.

    Useful for tracking related events through a workflow.

    Args:
        correlation_id: The correlation ID to set

    Returns:
        New Event with the correlation ID set
    """
    return Event(
        type=self.type,
        topic=self.topic,
        payload=self.payload,
        timestamp=self.timestamp,
        event_id=self.event_id,
        source=self.source,
        correlation_id=correlation_id,
        metadata=self.metadata,
    )

EventBus

Bases: Protocol

Abstract event bus protocol supporting multiple backends.

The EventBus provides a publish-subscribe interface for distributing events across components. Different implementations support various deployment scenarios:

  • InMemoryEventBus: Single process, no external dependencies
  • PostgresEventBus: Uses LISTEN/NOTIFY, works on local and RDS
  • RedisEventBus: Redis pub/sub, works with ElastiCache

All implementations follow this protocol, allowing configuration-driven backend selection without code changes.

Example
from dataknobs_common.events import create_event_bus, Event, EventType

# Create event bus from config
bus = create_event_bus({"backend": "memory"})
await bus.connect()

# Subscribe to events
async def handler(event: Event) -> None:
    print(f"Got {event.type} on {event.topic}")

subscription = await bus.subscribe("registry:bots", handler)

# Publish an event
await bus.publish("registry:bots", Event(
    type=EventType.CREATED,
    topic="registry:bots",
    payload={"bot_id": "new-bot"}
))

# Cleanup
await subscription.cancel()
await bus.close()
Pattern Matching

Some backends support wildcard patterns for subscriptions:

  • "registry:*" matches "registry:bots", "registry:users", etc.
  • "*:created" matches "bots:created", "users:created", etc.

Check backend documentation for supported patterns.
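The in-memory backend matches these patterns with the standard-library fnmatch module (see InMemoryEventBus below), so the semantics can be checked directly; other backends may use a different syntax:

```python
import fnmatch

# Shell-style wildcard matching, as used by the in-memory backend.
assert fnmatch.fnmatch("registry:bots", "registry:*")
assert fnmatch.fnmatch("users:created", "*:created")
assert not fnmatch.fnmatch("registry:bots", "*:created")
```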

Methods:

Name Description
connect

Initialize the event bus connection.

close

Close connections and cleanup resources.

publish

Publish an event to a topic.

subscribe

Subscribe to events on a topic.

Functions
connect async
connect() -> None

Initialize the event bus connection.

Called before the bus is used. Should be idempotent.

Source code in packages/common/src/dataknobs_common/events/bus.py
async def connect(self) -> None:
    """Initialize the event bus connection.

    Called before the bus is used. Should be idempotent.
    """
    ...
close async
close() -> None

Close connections and cleanup resources.

Should cancel all active subscriptions and release resources.

Source code in packages/common/src/dataknobs_common/events/bus.py
async def close(self) -> None:
    """Close connections and cleanup resources.

    Should cancel all active subscriptions and release resources.
    """
    ...
publish async
publish(topic: str, event: Event) -> None

Publish an event to a topic.

The event will be delivered to all subscribers of the topic. Delivery semantics depend on the backend:

  • Memory: Synchronous, in-process delivery
  • Postgres: Fire-and-forget via NOTIFY
  • Redis: Fire-and-forget via PUBLISH

Parameters:

Name Type Description Default
topic str

The topic/channel to publish to

required
event Event

The event to publish

required
Source code in packages/common/src/dataknobs_common/events/bus.py
async def publish(self, topic: str, event: Event) -> None:
    """Publish an event to a topic.

    The event will be delivered to all subscribers of the topic.
    Delivery semantics depend on the backend:
    - Memory: Synchronous, in-process delivery
    - Postgres: Fire-and-forget via NOTIFY
    - Redis: Fire-and-forget via PUBLISH

    Args:
        topic: The topic/channel to publish to
        event: The event to publish
    """
    ...
subscribe async
subscribe(
    topic: str, handler: Callable[[Event], Any], pattern: str | None = None
) -> Subscription

Subscribe to events on a topic.

The handler will be called for each event published to the topic. Handlers should be async functions that accept an Event.

Parameters:

Name Type Description Default
topic str

The topic to subscribe to

required
handler Callable[[Event], Any]

Async function to call with each event

required
pattern str | None

Optional pattern for wildcard matching (backend-specific)

None

Returns:

Type Description
Subscription

Subscription handle that can be used to cancel the subscription

Source code in packages/common/src/dataknobs_common/events/bus.py
async def subscribe(
    self,
    topic: str,
    handler: Callable[[Event], Any],
    pattern: str | None = None,
) -> Subscription:
    """Subscribe to events on a topic.

    The handler will be called for each event published to the topic.
    Handlers should be async functions that accept an Event.

    Args:
        topic: The topic to subscribe to
        handler: Async function to call with each event
        pattern: Optional pattern for wildcard matching (backend-specific)

    Returns:
        Subscription handle that can be used to cancel the subscription
    """
    ...

EventType

Bases: Enum

Standard event types for registry and resource changes.

These types represent common lifecycle events for managed resources.

Example
from dataknobs_common.events import Event, EventType

event = Event(
    type=EventType.CREATED,
    topic="registry:bots",
    payload={"bot_id": "my-bot", "config": {...}}
)

InMemoryEventBus

InMemoryEventBus()

Simple in-memory pub/sub event bus.

This implementation is suitable for:

  • Single-process applications
  • Development and testing
  • Scenarios where events don't need to cross process boundaries

Features:

  • Fast, synchronous delivery within the same process
  • Support for wildcard patterns using fnmatch
  • Thread-safe using asyncio.Lock

Limitations:

  • Events are not persisted
  • Does not work across multiple processes or machines
  • Events are lost if there are no subscribers

Example
from dataknobs_common.events import InMemoryEventBus, Event, EventType

bus = InMemoryEventBus()
await bus.connect()

events_received = []

async def handler(event: Event) -> None:
    events_received.append(event)

# Subscribe with pattern matching
await bus.subscribe("registry:*", handler, pattern="registry:*")

# Publish
await bus.publish("registry:bots", Event(
    type=EventType.CREATED,
    topic="registry:bots",
    payload={"bot_id": "test"}
))

assert len(events_received) == 1
await bus.close()

Initialize the in-memory event bus.

Methods:

Name Description
connect

Initialize the event bus.

close

Close the event bus and cancel all subscriptions.

publish

Publish an event to a topic.

subscribe

Subscribe to events on a topic.

Attributes:

Name Type Description
subscription_count int

Get the number of active subscriptions.

Source code in packages/common/src/dataknobs_common/events/memory.py
def __init__(self) -> None:
    """Initialize the in-memory event bus."""
    self._subscriptions: dict[str, Subscription] = {}
    self._topic_subscribers: dict[str, set[str]] = {}  # topic -> subscription_ids
    self._pattern_subscribers: list[tuple[str, str]] = []  # (pattern, sub_id)
    self._lock = asyncio.Lock()
    self._connected = False
Attributes
subscription_count property
subscription_count: int

Get the number of active subscriptions.

Functions
connect async
connect() -> None

Initialize the event bus.

For in-memory bus, this just sets the connected flag.

Source code in packages/common/src/dataknobs_common/events/memory.py
async def connect(self) -> None:
    """Initialize the event bus.

    For in-memory bus, this just sets the connected flag.
    """
    async with self._lock:
        self._connected = True
        logger.debug("InMemoryEventBus connected")
close async
close() -> None

Close the event bus and cancel all subscriptions.

Source code in packages/common/src/dataknobs_common/events/memory.py
async def close(self) -> None:
    """Close the event bus and cancel all subscriptions."""
    async with self._lock:
        self._subscriptions.clear()
        self._topic_subscribers.clear()
        self._pattern_subscribers.clear()
        self._connected = False
        logger.debug("InMemoryEventBus closed")
publish async
publish(topic: str, event: Event) -> None

Publish an event to a topic.

Delivers the event to all subscribers of the topic and any pattern subscribers that match.

Parameters:

Name Type Description Default
topic str

The topic to publish to

required
event Event

The event to publish

required
Source code in packages/common/src/dataknobs_common/events/memory.py
async def publish(self, topic: str, event: Event) -> None:
    """Publish an event to a topic.

    Delivers the event to all subscribers of the topic and any
    pattern subscribers that match.

    Args:
        topic: The topic to publish to
        event: The event to publish
    """
    if not self._connected:
        logger.warning("Publishing to disconnected event bus")

    handlers_to_call: list[tuple[str, Callable[[Event], Any]]] = []

    async with self._lock:
        # Get exact topic subscribers
        if topic in self._topic_subscribers:
            for sub_id in self._topic_subscribers[topic]:
                if sub_id in self._subscriptions:
                    sub = self._subscriptions[sub_id]
                    handlers_to_call.append((sub_id, sub.handler))

        # Get pattern subscribers
        for pattern, sub_id in self._pattern_subscribers:
            if fnmatch.fnmatch(topic, pattern):
                if sub_id in self._subscriptions:
                    sub = self._subscriptions[sub_id]
                    handlers_to_call.append((sub_id, sub.handler))

    # Call handlers outside the lock to avoid deadlocks
    for sub_id, handler in handlers_to_call:
        try:
            result = handler(event)
            if asyncio.iscoroutine(result):
                await result
        except Exception:
            logger.exception(
                "Error in event handler for subscription %s",
                sub_id,
            )

    logger.debug(
        "Published event %s to topic %s, delivered to %d handlers",
        event.event_id[:8],
        topic,
        len(handlers_to_call),
    )
subscribe async
subscribe(
    topic: str, handler: Callable[[Event], Any], pattern: str | None = None
) -> Subscription

Subscribe to events on a topic.

Parameters:

Name Type Description Default
topic str

The topic to subscribe to

required
handler Callable[[Event], Any]

Function to call with each event

required
pattern str | None

Optional wildcard pattern (uses fnmatch syntax)

None

Returns:

Type Description
Subscription

Subscription handle for managing the subscription

Source code in packages/common/src/dataknobs_common/events/memory.py
async def subscribe(
    self,
    topic: str,
    handler: Callable[[Event], Any],
    pattern: str | None = None,
) -> Subscription:
    """Subscribe to events on a topic.

    Args:
        topic: The topic to subscribe to
        handler: Function to call with each event
        pattern: Optional wildcard pattern (uses fnmatch syntax)

    Returns:
        Subscription handle for managing the subscription
    """
    subscription_id = str(uuid.uuid4())

    subscription = Subscription(
        subscription_id=subscription_id,
        topic=topic,
        handler=handler,
        pattern=pattern,
        _cancel_callback=self._unsubscribe,
    )

    async with self._lock:
        self._subscriptions[subscription_id] = subscription

        if pattern:
            # Pattern-based subscription
            self._pattern_subscribers.append((pattern, subscription_id))
            logger.debug(
                "Subscribed %s to pattern %s",
                subscription_id[:8],
                pattern,
            )
        else:
            # Exact topic subscription
            if topic not in self._topic_subscribers:
                self._topic_subscribers[topic] = set()
            self._topic_subscribers[topic].add(subscription_id)
            logger.debug(
                "Subscribed %s to topic %s",
                subscription_id[:8],
                topic,
            )

    return subscription

Subscription dataclass

Subscription(
    subscription_id: str,
    topic: str,
    handler: Any,
    pattern: str | None = None,
    created_at: datetime = (lambda: datetime.now(timezone.utc))(),
    _cancel_callback: Any = None,
)

Handle for managing an event subscription.

Subscriptions are returned when subscribing to events and can be used to cancel the subscription later.

Attributes:

Name Type Description
subscription_id str

Unique identifier for this subscription

topic str

The topic pattern this subscription is for

handler Any

Reference to the handler function

pattern str | None

Optional wildcard pattern if using pattern matching

created_at datetime

When the subscription was created

Example
async def my_handler(event: Event) -> None:
    print(f"Got event: {event.type}")

subscription = await event_bus.subscribe("registry:*", my_handler)

# Later, to unsubscribe:
await subscription.cancel()

Methods:

Name Description
cancel

Cancel this subscription.

__repr__

String representation.

Functions
cancel async
cancel() -> None

Cancel this subscription.

After canceling, the handler will no longer receive events.

Source code in packages/common/src/dataknobs_common/events/types.py
async def cancel(self) -> None:
    """Cancel this subscription.

    After canceling, the handler will no longer receive events.
    """
    if self._cancel_callback:
        await self._cancel_callback(self.subscription_id)
__repr__
__repr__() -> str

String representation.

Source code in packages/common/src/dataknobs_common/events/types.py
def __repr__(self) -> str:
    """String representation."""
    return (
        f"Subscription(id={self.subscription_id!r}, "
        f"topic={self.topic!r}, pattern={self.pattern!r})"
    )

ConcurrencyError

ConcurrencyError(
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
)

Bases: DataknobsError

Raised when concurrent operation conflicts occur.

Use this exception for concurrency-related failures including:

  • Lock acquisition failures
  • Transaction conflicts
  • Race conditions
  • Optimistic locking failures

Example
raise ConcurrencyError(
    "Record modified by another process",
    context={"record_id": "123", "expected_version": 5, "actual_version": 6}
)
Source code in packages/common/src/dataknobs_common/exceptions.py
def __init__(
    self,
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
):
    """Initialize the exception with optional context.

    Args:
        message: Error message
        context: Optional context dictionary
        details: Optional details dictionary (merged with context)
    """
    super().__init__(message)
    # Support both context and details parameters
    # Details takes precedence if both are provided
    self.context = details or context or {}
    # Alias for FSM-style compatibility
    self.details = self.context

ConfigurationError

ConfigurationError(
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
)

Bases: DataknobsError

Raised when configuration is invalid or missing.

Use this exception for configuration-related errors including:

  • Missing required configuration
  • Invalid configuration values
  • Configuration file not found
  • Circular references in configuration

Example
raise ConfigurationError(
    "Database configuration missing",
    context={"config_key": "database.primary", "available_keys": ["cache", "auth"]}
)
Source code in packages/common/src/dataknobs_common/exceptions.py
def __init__(
    self,
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
):
    """Initialize the exception with optional context.

    Args:
        message: Error message
        context: Optional context dictionary
        details: Optional details dictionary (merged with context)
    """
    super().__init__(message)
    # Support both context and details parameters
    # Details takes precedence if both are provided
    self.context = details or context or {}
    # Alias for FSM-style compatibility
    self.details = self.context

DataknobsError

DataknobsError(
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
)

Bases: Exception

Base exception for all dataknobs packages.

This is the root exception that all dataknobs packages should extend. It supports optional context data for rich error information, making debugging and error handling more effective.

Attributes:

Name Type Description
context

Dictionary containing contextual information about the error

details

Alias for context (FSM-style compatibility)

Parameters:

Name Type Description Default
message str

Human-readable error message

required
context Dict[str, Any] | None

Optional dictionary with error context (field names, IDs, etc.)

None
details Dict[str, Any] | None

Alternative to context (both are supported for compatibility)

None
Example
error = DataknobsError(
    "Operation failed",
    context={"operation": "save", "item_id": "123"}
)
str(error)
# 'Operation failed'
error.context
# {'operation': 'save', 'item_id': '123'}

Initialize the exception with optional context.

Parameters:

Name Type Description Default
message str

Error message

required
context Dict[str, Any] | None

Optional context dictionary

None
details Dict[str, Any] | None

Optional details dictionary (merged with context)

None
Source code in packages/common/src/dataknobs_common/exceptions.py
def __init__(
    self,
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
):
    """Initialize the exception with optional context.

    Args:
        message: Error message
        context: Optional context dictionary
        details: Optional details dictionary (merged with context)
    """
    super().__init__(message)
    # Support both context and details parameters
    # Details takes precedence if both are provided
    self.context = details or context or {}
    # Alias for FSM-style compatibility
    self.details = self.context
Functions

NotFoundError

NotFoundError(
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
)

Bases: DataknobsError

Raised when a requested item is not found.

Use this exception when looking up items by ID, name, or key and they don't exist. Common scenarios include:

  • Record not found in database
  • Configuration key not found
  • File not found
  • Resource not registered

Example
raise NotFoundError(
    "Record not found",
    context={"record_id": "user-123", "table": "users"}
)
Source code in packages/common/src/dataknobs_common/exceptions.py
def __init__(
    self,
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
):
    """Initialize the exception with optional context.

    Args:
        message: Error message
        context: Optional context dictionary
        details: Optional details dictionary (merged with context)
    """
    super().__init__(message)
    # Support both context and details parameters
    # Details takes precedence if both are provided
    self.context = details or context or {}
    # Alias for FSM-style compatibility
    self.details = self.context

OperationError

OperationError(
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
)

Bases: DataknobsError

Raised when an operation fails.

Use this exception for general operation failures that don't fit other categories. Common scenarios include:

  • Database operation failures
  • File I/O errors
  • Network operation failures
  • State transition errors

Example
raise OperationError(
    "Failed to save record",
    context={"operation": "update", "backend": "postgres", "error": "connection lost"}
)
Source code in packages/common/src/dataknobs_common/exceptions.py
def __init__(
    self,
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
):
    """Initialize the exception with optional context.

    Args:
        message: Error message
        context: Optional context dictionary
        details: Optional details dictionary (merged with context)
    """
    super().__init__(message)
    # Support both context and details parameters
    # Details takes precedence if both are provided
    self.context = details or context or {}
    # Alias for FSM-style compatibility
    self.details = self.context

RateLimitError

RateLimitError(
    message: str = "Rate limit exceeded",
    retry_after: float | None = None,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
)

Bases: OperationError

Raised when a rate limit is exceeded.

Use this exception when an operation cannot proceed because a rate limit has been reached. Includes an optional retry_after hint indicating how many seconds the caller should wait before retrying.

Attributes:

Name Type Description
retry_after

Optional number of seconds to wait before retrying.

Example
raise RateLimitError(
    "API rate limit exceeded",
    retry_after=2.5,
    context={"category": "api_write", "limit": 10, "interval": 60}
)

Initialize the rate limit error.

Parameters:

Name Type Description Default
message str

Error message.

'Rate limit exceeded'
retry_after float | None

Optional seconds to wait before retrying.

None
context Dict[str, Any] | None

Optional context dictionary.

None
details Dict[str, Any] | None

Optional details dictionary (takes precedence over context if both are given).

None
Source code in packages/common/src/dataknobs_common/exceptions.py
def __init__(
    self,
    message: str = "Rate limit exceeded",
    retry_after: float | None = None,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
) -> None:
    """Initialize the rate limit error.

    Args:
        message: Error message.
        retry_after: Optional seconds to wait before retrying.
        context: Optional context dictionary.
        details: Optional details dictionary (takes precedence over context).
    """
    super().__init__(message, context=context, details=details)
    self.retry_after = retry_after
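A common pattern is to honor the retry_after hint when catching this error. The sketch below is self-contained: it defines a minimal local stand-in mirroring the documented RateLimitError signature so it runs on its own (real code would import RateLimitError from dataknobs_common), and the retried callable is hypothetical:

```python
import time

# Minimal local stand-in mirroring the documented signature, so this
# sketch runs standalone; real code imports RateLimitError from
# dataknobs_common instead of redefining it.
class RateLimitError(Exception):
    def __init__(self, message="Rate limit exceeded", retry_after=None,
                 context=None, details=None):
        super().__init__(message)
        self.retry_after = retry_after
        self.context = details or context or {}

def call_honoring_retry_after(func, default_wait=1.0):
    """Call func once; on a rate-limit error, wait and retry once."""
    try:
        return func()
    except RateLimitError as exc:
        # Prefer the server-provided hint; fall back to a default.
        wait = exc.retry_after if exc.retry_after is not None else default_wait
        time.sleep(wait)
        return func()
```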

ResourceError

ResourceError(
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
)

Bases: DataknobsError

Raised when resource operations fail.

Use this exception for resource management failures, including:

- Resource acquisition failures
- Connection errors
- Resource pool exhaustion
- Timeout errors

Example
raise ResourceError(
    "Failed to acquire database connection",
    context={"pool_size": 10, "active_connections": 10, "timeout": 30}
)
Source code in packages/common/src/dataknobs_common/exceptions.py
def __init__(
    self,
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
):
    """Initialize the exception with optional context.

    Args:
        message: Error message
        context: Optional context dictionary
        details: Optional details dictionary (takes precedence over context)
    """
    super().__init__(message)
    # Support both context and details parameters
    # Details takes precedence if both are provided
    self.context = details or context or {}
    # Alias for FSM-style compatibility
    self.details = self.context

SerializationError

SerializationError(
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
)

Bases: DataknobsError

Raised when serialization or deserialization fails.

Use this exception for data format conversion errors, including:

- JSON encoding/decoding failures
- Invalid data format
- Schema mismatch
- Type conversion errors

Example
raise SerializationError(
    "Cannot deserialize data",
    context={"format": "json", "field": "created_at", "value": "invalid-date"}
)
Source code in packages/common/src/dataknobs_common/exceptions.py
def __init__(
    self,
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
):
    """Initialize the exception with optional context.

    Args:
        message: Error message
        context: Optional context dictionary
        details: Optional details dictionary (takes precedence over context)
    """
    super().__init__(message)
    # Support both context and details parameters
    # Details takes precedence if both are provided
    self.context = details or context or {}
    # Alias for FSM-style compatibility
    self.details = self.context

TimeoutError

TimeoutError(
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
)

Bases: DataknobsError

Raised when an operation times out.

Use this exception when operations exceed their time limit, including:

- Connection timeouts
- Query timeouts
- Resource acquisition timeouts
- Operation execution timeouts

Example
raise TimeoutError(
    "Database query timed out",
    context={"query": "SELECT * FROM large_table", "timeout_seconds": 30}
)
Source code in packages/common/src/dataknobs_common/exceptions.py
def __init__(
    self,
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
):
    """Initialize the exception with optional context.

    Args:
        message: Error message
        context: Optional context dictionary
        details: Optional details dictionary (takes precedence over context)
    """
    super().__init__(message)
    # Support both context and details parameters
    # Details takes precedence if both are provided
    self.context = details or context or {}
    # Alias for FSM-style compatibility
    self.details = self.context

ValidationError

ValidationError(
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
)

Bases: DataknobsError

Raised when validation fails.

Use this exception when data or configuration fails validation checks. Common scenarios include:

- Invalid input data
- Schema validation failures
- Constraint violations
- Type mismatches

Example
raise ValidationError(
    "Email format invalid",
    context={"field": "email", "value": "not-an-email"}
)
Source code in packages/common/src/dataknobs_common/exceptions.py
def __init__(
    self,
    message: str,
    context: Dict[str, Any] | None = None,
    details: Dict[str, Any] | None = None,
):
    """Initialize the exception with optional context.

    Args:
        message: Error message
        context: Optional context dictionary
        details: Optional details dictionary (takes precedence over context)
    """
    super().__init__(message)
    # Support both context and details parameters
    # Details takes precedence if both are provided
    self.context = details or context or {}
    # Alias for FSM-style compatibility
    self.details = self.context

InMemoryRateLimiter

InMemoryRateLimiter(config: RateLimiterConfig)

Sliding-window rate limiter backed by in-memory data structures.

This implementation is suitable for:

- Single-process applications
- Development and testing
- Scenarios where rate limiting does not need to cross process boundaries

Features:

- Sliding window log algorithm for accurate rate tracking
- Per-category rate configuration with fallback to default rates
- Weighted acquire support
- Non-blocking (try_acquire) and blocking (acquire) modes
- Thread-safe using asyncio.Lock

Limitations:

- State is not persisted; restarting the process resets all counters
- Does not work across multiple processes or machines
- For distributed rate limiting, use PyrateRateLimiter with a Redis or PostgreSQL bucket

Example
from dataknobs_common.ratelimit import InMemoryRateLimiter, RateLimiterConfig, RateLimit

config = RateLimiterConfig(
    default_rates=[RateLimit(limit=10, interval=60)],
    categories={
        "api_write": [RateLimit(limit=5, interval=60)],
    },
)
limiter = InMemoryRateLimiter(config)

# Non-blocking check
if await limiter.try_acquire("api_write"):
    await make_api_call()

# Blocking — waits until capacity is available
await limiter.acquire("api_read")
await make_api_call()
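The sliding window log algorithm behind this limiter can be sketched in a few lines. This is a standalone illustration of the idea, not the library's implementation; it takes the clock as a parameter instead of reading time.monotonic() so the behavior is easy to follow:

```python
class SlidingWindowSketch:
    """Sliding-window-log limiter: admit a request iff the in-window
    weight plus the new weight fits within the limit."""

    def __init__(self, limit: int, interval: float) -> None:
        self.limit = limit          # max total weight per window
        self.interval = interval    # window length in seconds
        self.entries: list[tuple[float, int]] = []  # (timestamp, weight)

    def try_acquire(self, now: float, weight: int = 1) -> bool:
        # Drop entries that have slid out of the window.
        window_start = now - self.interval
        self.entries = [(ts, w) for ts, w in self.entries if ts > window_start]
        # Admit only if the in-window weight plus this request fits the limit.
        if sum(w for _, w in self.entries) + weight <= self.limit:
            self.entries.append((now, weight))
            return True
        return False
```

Because the window slides continuously, capacity frees up exactly when the oldest in-window entry ages out, rather than all at once at a fixed boundary.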

Initialize the in-memory rate limiter.

Parameters:

Name Type Description Default
config RateLimiterConfig

Rate limiter configuration with default rates and optional per-category overrides.

required

Methods:

Name Description
try_acquire

Attempt to acquire capacity without blocking.

acquire

Acquire capacity, blocking until available.

get_status

Get the current status of a rate limiter bucket.

reset

Reset rate limiter state.

close

Release resources.

Source code in packages/common/src/dataknobs_common/ratelimit/memory.py
def __init__(self, config: RateLimiterConfig) -> None:
    """Initialize the in-memory rate limiter.

    Args:
        config: Rate limiter configuration with default rates and
            optional per-category overrides.
    """
    self._config = config
    # Per-bucket sliding window: name -> list of (monotonic_time, weight)
    self._buckets: dict[str, list[tuple[float, int]]] = {}
    self._lock = asyncio.Lock()
Functions
try_acquire async
try_acquire(name: str = 'default', weight: int = 1) -> bool

Attempt to acquire capacity without blocking.

Parameters:

Name Type Description Default
name str

Category name. Uses category-specific rates if configured, otherwise falls back to default rates.

'default'
weight int

Weight of the operation (default 1).

1

Returns:

Type Description
bool

True if the acquire succeeded, False if the rate limit would be exceeded.

Source code in packages/common/src/dataknobs_common/ratelimit/memory.py
async def try_acquire(self, name: str = "default", weight: int = 1) -> bool:
    """Attempt to acquire capacity without blocking.

    Args:
        name: Category name. Uses category-specific rates if configured,
            otherwise falls back to default rates.
        weight: Weight of the operation (default 1).

    Returns:
        True if the acquire succeeded, False if the rate limit would
        be exceeded.
    """
    rates = self._get_rates(name)
    async with self._lock:
        now = time.monotonic()
        self._prune(name, now, rates)
        if self._is_allowed(name, weight, now, rates):
            self._record(name, weight, now)
            logger.debug(
                "Rate limit acquired for %s (weight=%d)", name, weight
            )
            return True
        logger.debug(
            "Rate limit denied for %s (weight=%d)", name, weight
        )
        return False
acquire async
acquire(
    name: str = "default", weight: int = 1, timeout: float | None = None
) -> None

Acquire capacity, blocking until available.

Polls at 50 ms intervals until capacity is available or the timeout is exceeded.

Parameters:

Name Type Description Default
name str

Category name.

'default'
weight int

Weight of the operation (default 1).

1
timeout float | None

Maximum seconds to wait. None means wait indefinitely.

None

Raises:

Type Description
TimeoutError

If the timeout is exceeded before capacity becomes available.

Source code in packages/common/src/dataknobs_common/ratelimit/memory.py
async def acquire(
    self,
    name: str = "default",
    weight: int = 1,
    timeout: float | None = None,
) -> None:
    """Acquire capacity, blocking until available.

    Polls at 50 ms intervals until capacity is available or the
    timeout is exceeded.

    Args:
        name: Category name.
        weight: Weight of the operation (default 1).
        timeout: Maximum seconds to wait. ``None`` means wait
            indefinitely.

    Raises:
        TimeoutError: If the timeout is exceeded before capacity
            becomes available.
    """
    deadline = (time.monotonic() + timeout) if timeout is not None else None
    while True:
        if await self.try_acquire(name, weight):
            return
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Rate limit acquire timed out for '{name}'",
                context={"name": name, "weight": weight, "timeout": timeout},
            )
        await asyncio.sleep(_POLL_INTERVAL)
get_status async
get_status(name: str = 'default') -> RateLimitStatus

Get the current status of a rate limiter bucket.

Reports against the tightest (smallest limit) applicable rate.

Parameters:

Name Type Description Default
name str

Category name.

'default'

Returns:

Type Description
RateLimitStatus

Current status including count, limit, remaining capacity, and time until the oldest entry expires.

Source code in packages/common/src/dataknobs_common/ratelimit/memory.py
async def get_status(self, name: str = "default") -> RateLimitStatus:
    """Get the current status of a rate limiter bucket.

    Reports against the tightest (smallest limit) applicable rate.

    Args:
        name: Category name.

    Returns:
        Current status including count, limit, remaining capacity,
        and time until the oldest entry expires.
    """
    rates = self._get_rates(name)
    async with self._lock:
        now = time.monotonic()
        self._prune(name, now, rates)
        bucket = self._buckets.get(name, [])

        # Find the tightest rate (most restrictive remaining capacity)
        tightest_rate = rates[0]
        tightest_remaining = tightest_rate.limit
        for rate in rates:
            window_start = now - rate.interval
            window_weight = sum(w for ts, w in bucket if ts > window_start)
            remaining = rate.limit - window_weight
            if remaining < tightest_remaining:
                tightest_remaining = remaining
                tightest_rate = rate

        # Calculate current count and reset_after for the tightest rate
        window_start = now - tightest_rate.interval
        current_count = sum(w for ts, w in bucket if ts > window_start)
        remaining = max(0, tightest_rate.limit - current_count)

        # reset_after: time until the oldest entry in the window expires
        reset_after = 0.0
        window_entries = [(ts, w) for ts, w in bucket if ts > window_start]
        if window_entries:
            oldest_ts = window_entries[0][0]
            reset_after = max(0.0, (oldest_ts + tightest_rate.interval) - now)

        return RateLimitStatus(
            name=name,
            current_count=current_count,
            limit=tightest_rate.limit,
            remaining=remaining,
            reset_after=reset_after,
        )
reset async
reset(name: str | None = None) -> None

Reset rate limiter state.

Parameters:

Name Type Description Default
name str | None

Category to reset. If None, resets all categories.

None
Source code in packages/common/src/dataknobs_common/ratelimit/memory.py
async def reset(self, name: str | None = None) -> None:
    """Reset rate limiter state.

    Args:
        name: Category to reset. If ``None``, resets all categories.
    """
    async with self._lock:
        if name is None:
            self._buckets.clear()
            logger.debug("All rate limiter buckets reset")
        else:
            self._buckets.pop(name, None)
            logger.debug("Rate limiter bucket %s reset", name)
close async
close() -> None

Release resources.

For the in-memory implementation this is a no-op, but it satisfies the RateLimiter protocol.

Source code in packages/common/src/dataknobs_common/ratelimit/memory.py
async def close(self) -> None:
    """Release resources.

    For the in-memory implementation this is a no-op, but it
    satisfies the ``RateLimiter`` protocol.
    """
    pass

RateLimit dataclass

RateLimit(limit: int, interval: float)

A single rate limit rule.

Defines the maximum number of operations (or total weight) allowed within a time interval.

Attributes:

Name Type Description
limit int

Maximum operations (or total weight) per interval.

interval float

Window duration in seconds.

Example
# 100 requests per minute
rate = RateLimit(limit=100, interval=60)

# 10 requests per second
rate = RateLimit(limit=10, interval=1)

RateLimiter

Bases: Protocol

Abstract rate limiter protocol supporting multiple backends.

The RateLimiter provides both blocking and non-blocking rate limiting with per-category rate configuration. Different implementations support various deployment scenarios:

  • InMemoryRateLimiter: Single process, no external dependencies
  • PyrateRateLimiter: Wraps pyrate-limiter for distributed backends (Redis, PostgreSQL, SQLite)

All implementations follow this protocol, allowing configuration-driven backend selection without code changes.

Example
from dataknobs_common.ratelimit import create_rate_limiter

limiter = create_rate_limiter({
    "default_rates": [{"limit": 100, "interval": 60}],
    "categories": {
        "api_write": {"rates": [{"limit": 10, "interval": 60}]},
    },
})

# Non-blocking
if await limiter.try_acquire("api_write"):
    await make_api_call()

# Blocking with timeout
await limiter.acquire("api_read", timeout=5.0)

Methods:

Name Description
acquire

Acquire capacity, blocking until available.

try_acquire

Attempt to acquire capacity without blocking.

get_status

Get the current status of a rate limiter bucket.

reset

Reset rate limiter state.

close

Release resources held by the rate limiter.

Functions
acquire async
acquire(
    name: str = "default", weight: int = 1, timeout: float | None = None
) -> None

Acquire capacity, blocking until available.

Parameters:

Name Type Description Default
name str

Category name for rate lookup.

'default'
weight int

Weight of the operation (default 1).

1
timeout float | None

Maximum seconds to wait. None means wait indefinitely.

None

Raises:

Type Description
TimeoutError

If the timeout is exceeded.

Source code in packages/common/src/dataknobs_common/ratelimit/limiter.py
async def acquire(
    self,
    name: str = "default",
    weight: int = 1,
    timeout: float | None = None,
) -> None:
    """Acquire capacity, blocking until available.

    Args:
        name: Category name for rate lookup.
        weight: Weight of the operation (default 1).
        timeout: Maximum seconds to wait. ``None`` means wait
            indefinitely.

    Raises:
        TimeoutError: If the timeout is exceeded.
    """
    ...
try_acquire async
try_acquire(name: str = 'default', weight: int = 1) -> bool

Attempt to acquire capacity without blocking.

Parameters:

Name Type Description Default
name str

Category name for rate lookup.

'default'
weight int

Weight of the operation (default 1).

1

Returns:

Type Description
bool

True if the acquire succeeded, False otherwise.

Source code in packages/common/src/dataknobs_common/ratelimit/limiter.py
async def try_acquire(self, name: str = "default", weight: int = 1) -> bool:
    """Attempt to acquire capacity without blocking.

    Args:
        name: Category name for rate lookup.
        weight: Weight of the operation (default 1).

    Returns:
        True if the acquire succeeded, False otherwise.
    """
    ...
get_status async
get_status(name: str = 'default') -> RateLimitStatus

Get the current status of a rate limiter bucket.

Parameters:

Name Type Description Default
name str

Category name.

'default'

Returns:

Type Description
RateLimitStatus

Current status of the bucket.

Source code in packages/common/src/dataknobs_common/ratelimit/limiter.py
async def get_status(self, name: str = "default") -> RateLimitStatus:
    """Get the current status of a rate limiter bucket.

    Args:
        name: Category name.

    Returns:
        Current status of the bucket.
    """
    ...
reset async
reset(name: str | None = None) -> None

Reset rate limiter state.

Parameters:

Name Type Description Default
name str | None

Category to reset. If None, resets all categories.

None
Source code in packages/common/src/dataknobs_common/ratelimit/limiter.py
async def reset(self, name: str | None = None) -> None:
    """Reset rate limiter state.

    Args:
        name: Category to reset. If ``None``, resets all categories.
    """
    ...
close async
close() -> None

Release resources held by the rate limiter.

Source code in packages/common/src/dataknobs_common/ratelimit/limiter.py
async def close(self) -> None:
    """Release resources held by the rate limiter."""
    ...

RateLimiterConfig dataclass

RateLimiterConfig(
    default_rates: list[RateLimit],
    categories: dict[str, list[RateLimit]] = dict(),
)

Configuration for a rate limiter.

Supports per-category rate overrides. When acquire() is called with a category name, rates are looked up in categories first; if the category is not found, default_rates are used.

Attributes:

Name Type Description
default_rates list[RateLimit]

Fallback rates applied when a category has no specific configuration.

categories dict[str, list[RateLimit]]

Per-category rate overrides mapping category names to their rate lists.

Example
config = RateLimiterConfig(
    default_rates=[RateLimit(limit=100, interval=60)],
    categories={
        "api_read":  [RateLimit(limit=100, interval=6)],
        "api_write": [RateLimit(limit=10, interval=6)],
    },
)
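The category fallback described above amounts to a dictionary lookup with a default. A minimal sketch of the semantics (plain dicts and lists stand in for the dataclass fields):

```python
def rates_for(categories: dict, default_rates: list, name: str) -> list:
    """Category-specific rates win; otherwise fall back to the defaults."""
    return categories.get(name, default_rates)
```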

RateLimitStatus dataclass

RateLimitStatus(
    name: str,
    current_count: int,
    limit: int,
    remaining: int,
    reset_after: float,
)

Current status of a rate limiter bucket.

Attributes:

Name Type Description
name str

Category or bucket name.

current_count int

Number of operations (total weight) in the current window.

limit int

Maximum allowed from the tightest applicable rate.

remaining int

Operations remaining before the limit is reached.

reset_after float

Seconds until the oldest entry expires and capacity is freed.

Example
status = await limiter.get_status("api_write")
if status.remaining < 5:
    logger.warning("Approaching rate limit for %s", status.name)

BackoffStrategy

Bases: Enum

Backoff strategies for retries.

Attributes:

Name Type Description
FIXED

Fixed delay between retries.

LINEAR

Delay increases linearly with each attempt.

EXPONENTIAL

Delay doubles (or multiplies by backoff_multiplier) with each attempt.

JITTER

Exponential backoff with random jitter applied.

DECORRELATED

Decorrelated jitter: each delay is random between initial_delay and 3x previous delay.

Attributes
FIXED class-attribute instance-attribute
FIXED = 'fixed'

Fixed delay between retries.

LINEAR class-attribute instance-attribute
LINEAR = 'linear'

Delay increases linearly with each attempt.

EXPONENTIAL class-attribute instance-attribute
EXPONENTIAL = 'exponential'

Delay doubles (or multiplies by backoff_multiplier) with each attempt.

JITTER class-attribute instance-attribute
JITTER = 'jitter'

Exponential backoff with random jitter applied.

DECORRELATED class-attribute instance-attribute
DECORRELATED = 'decorrelated'

Decorrelated jitter: each delay is random between initial_delay and 3x previous delay.

RetryConfig dataclass

RetryConfig(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL,
    backoff_multiplier: float = 2.0,
    jitter_range: float = 0.1,
    retry_on_exceptions: list[type] | None = None,
    retry_on_result: Callable[[Any], bool] | None = None,
    on_retry: Callable[[int, Exception], None] | None = None,
    on_failure: Callable[[Exception], None] | None = None,
)

Configuration for retry behavior.

Attributes:

Name Type Description
max_attempts int

Maximum number of execution attempts (including the first).

initial_delay float

Base delay in seconds before the first retry.

max_delay float

Upper bound on delay in seconds.

backoff_strategy BackoffStrategy

Algorithm for computing delay between retries.

backoff_multiplier float

Multiplier for exponential and jitter strategies.

jitter_range float

Fractional jitter range for the JITTER strategy (e.g. 0.1 = ±10%).

retry_on_exceptions list[type] | None

If set, only retry when the exception is an instance of one of these types. Other exceptions propagate immediately.

retry_on_result Callable[[Any], bool] | None

If set, called with the result value. Return True to trigger a retry (e.g. to retry on empty or sentinel results).

on_retry Callable[[int, Exception], None] | None

Hook called before each retry sleep with (attempt_number, exception).

on_failure Callable[[Exception], None] | None

Hook called when all attempts are exhausted with the final exception.

RetryExecutor

RetryExecutor(config: RetryConfig)

Executes a callable with retry logic and configurable backoff.

Supports both sync and async callables. Sync callables are invoked directly; async callables are awaited.

Example
config = RetryConfig(max_attempts=3, backoff_strategy=BackoffStrategy.FIXED)
executor = RetryExecutor(config)

# Async usage
result = await executor.execute(fetch_data, url)

# Sync callable also works (called from async context)
result = await executor.execute(parse_json, raw_text)

Methods:

Name Description
execute

Execute a callable with retry logic.

Source code in packages/common/src/dataknobs_common/retry.py
def __init__(self, config: RetryConfig) -> None:
    self.config = config
Functions
execute async
execute(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any

Execute a callable with retry logic.

Parameters:

Name Type Description Default
func Callable[..., Any]

The callable to execute. May be sync or async.

required
*args Any

Positional arguments forwarded to func.

()
**kwargs Any

Keyword arguments forwarded to func.

{}

Returns:

Type Description
Any

The return value of func on a successful attempt.

Raises:

Type Description
Exception

The exception from the final failed attempt, or any non-retryable exception immediately.

Source code in packages/common/src/dataknobs_common/retry.py
async def execute(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Execute a callable with retry logic.

    Args:
        func: The callable to execute. May be sync or async.
        *args: Positional arguments forwarded to func.
        **kwargs: Keyword arguments forwarded to func.

    Returns:
        The return value of func on a successful attempt.

    Raises:
        Exception: The exception from the final failed attempt, or any
            non-retryable exception immediately.
    """
    last_exception: Exception | None = None
    previous_delay: float | None = None
    is_coro = asyncio.iscoroutinefunction(func)

    for attempt in range(1, self.config.max_attempts + 1):
        try:
            result = await func(*args, **kwargs) if is_coro else func(*args, **kwargs)

            # Check if we should retry based on the result value
            if self.config.retry_on_result and self.config.retry_on_result(result):
                if attempt < self.config.max_attempts:
                    delay = self._calculate_delay(attempt, previous_delay)
                    previous_delay = delay
                    logger.debug(
                        "Retry on result (attempt %d/%d), delay=%.2fs",
                        attempt, self.config.max_attempts, delay,
                    )
                    await asyncio.sleep(delay)
                    continue

            return result

        except Exception as e:
            last_exception = e

            # If exception filtering is configured, only retry matching types
            if self.config.retry_on_exceptions:
                if not any(
                    isinstance(e, exc_type)
                    for exc_type in self.config.retry_on_exceptions
                ):
                    raise

            if attempt < self.config.max_attempts:
                delay = self._calculate_delay(attempt, previous_delay)
                previous_delay = delay

                if self.config.on_retry:
                    self.config.on_retry(attempt, e)

                logger.debug(
                    "Retry after exception (attempt %d/%d), delay=%.2fs: %s",
                    attempt, self.config.max_attempts, delay, e,
                )
                await asyncio.sleep(delay)
            else:
                if self.config.on_failure:
                    self.config.on_failure(e)
                raise

    # Should not be reached, but satisfies type checker
    raise last_exception  # type: ignore[misc]
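The retry_on_result branch of execute() can be distilled into a stripped-down synchronous loop. This is an independent sketch of the semantics, not the library code; the backoff sleep is elided:

```python
def retry_on_result_loop(func, should_retry, max_attempts: int = 3):
    """Re-invoke func while should_retry(result) is true, up to max_attempts."""
    result = None
    for attempt in range(1, max_attempts + 1):
        result = func()
        if not should_retry(result):
            return result
        # The real executor sleeps for a computed backoff delay here.
    return result  # last result, even if it still matches the predicate
```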

InvalidTransitionError

InvalidTransitionError(
    entity: str,
    current_status: str,
    target_status: str,
    allowed: set[str] | None = None,
)

Bases: OperationError

Raised when a status transition is not allowed.

Attributes:

Name Type Description
entity

Name of the entity or transition graph (e.g. "run_status").

current_status

The current status that was being transitioned from.

target_status

The target status that was rejected.

allowed

The set of statuses that are valid targets from current_status, or None if the current status itself is unknown.

Source code in packages/common/src/dataknobs_common/transitions.py
def __init__(
    self,
    entity: str,
    current_status: str,
    target_status: str,
    allowed: set[str] | None = None,
) -> None:
    self.entity = entity
    self.current_status = current_status
    self.target_status = target_status
    self.allowed = allowed

    if allowed is not None:
        allowed_str = ", ".join(sorted(allowed)) if allowed else "(none — terminal)"
        message = (
            f"{entity}: cannot transition from '{current_status}' to '{target_status}'. "
            f"Allowed targets: {allowed_str}"
        )
    else:
        message = (
            f"{entity}: unknown current status '{current_status}'"
        )

    super().__init__(
        message,
        context={
            "entity": entity,
            "current_status": current_status,
            "target_status": target_status,
            "allowed": sorted(allowed) if allowed else [],
        },
    )

TransitionValidator

TransitionValidator(name: str, transitions: dict[str, set[str]])

Stateless validator for declarative transition graphs.

Declares which status transitions are allowed and validates proposed transitions. Does not manage or store state — the caller owns the current status.

Parameters:

Name Type Description Default
name str

Human-readable name for this transition graph, used in error messages (e.g. "run_status", "order_state").

required
transitions dict[str, set[str]]

Mapping from each status to the set of statuses it may transition to. Statuses that appear only as targets (not as keys) are implicitly terminal (no outgoing transitions).

required
Example
ORDER = TransitionValidator("order", {
    "draft":     {"submitted"},
    "submitted": {"approved", "rejected"},
    "approved":  {"shipped"},
    "shipped":   {"delivered"},
    "rejected":  set(),
    "delivered":  set(),
})

ORDER.validate("draft", "submitted")  # ok
ORDER.validate("shipped", "draft")    # raises InvalidTransitionError

Methods:

Name Description
validate

Validate a proposed transition.

is_valid

Check whether a transition is allowed without raising.

get_reachable

Compute all statuses reachable from a given status (transitive closure).

Attributes:

Name Type Description
name str

The name of this transition graph.

allowed_transitions dict[str, set[str]]

Return a copy of the full transition graph.

statuses set[str]

Return all known statuses (sources and targets).

Source code in packages/common/src/dataknobs_common/transitions.py
def __init__(self, name: str, transitions: dict[str, set[str]]) -> None:
    self._name = name
    self._transitions = {k: set(v) for k, v in transitions.items()}
Attributes
name property
name: str

The name of this transition graph.

allowed_transitions property
allowed_transitions: dict[str, set[str]]

Return a copy of the full transition graph.

statuses property
statuses: set[str]

Return all known statuses (sources and targets).

Functions
validate
validate(current_status: str | None, target_status: str) -> None

Validate a proposed transition.

Parameters:

Name Type Description Default
current_status str | None

The current status. If None, validation is skipped (useful for callers that don't yet know the current state).

required
target_status str

The desired target status.

required

Raises:

Type Description
InvalidTransitionError

If the transition is not allowed.

Source code in packages/common/src/dataknobs_common/transitions.py
def validate(self, current_status: str | None, target_status: str) -> None:
    """Validate a proposed transition.

    Args:
        current_status: The current status. If ``None``, validation is
            skipped (useful for callers that don't yet know the current
            state).
        target_status: The desired target status.

    Raises:
        InvalidTransitionError: If the transition is not allowed.
    """
    if current_status is None:
        return

    allowed = self._transitions.get(current_status)
    if allowed is None:
        raise InvalidTransitionError(
            entity=self._name,
            current_status=current_status,
            target_status=target_status,
            allowed=None,
        )

    if target_status not in allowed:
        raise InvalidTransitionError(
            entity=self._name,
            current_status=current_status,
            target_status=target_status,
            allowed=allowed,
        )
is_valid
is_valid(current_status: str | None, target_status: str) -> bool

Check whether a transition is allowed without raising.

Parameters:

Name Type Description Default
current_status str | None

The current status. If None, returns True (same skip behavior as :meth:validate).

required
target_status str

The desired target status.

required

Returns:

Type Description
bool

True if the transition is allowed (or current is None),

bool

False otherwise.

Source code in packages/common/src/dataknobs_common/transitions.py
def is_valid(self, current_status: str | None, target_status: str) -> bool:
    """Check whether a transition is allowed without raising.

    Args:
        current_status: The current status. If ``None``, returns ``True``
            (same skip behavior as :meth:`validate`).
        target_status: The desired target status.

    Returns:
        ``True`` if the transition is allowed (or current is ``None``),
        ``False`` otherwise.
    """
    if current_status is None:
        return True
    allowed = self._transitions.get(current_status)
    return allowed is not None and target_status in allowed
get_reachable
get_reachable(from_status: str) -> set[str]

Compute all statuses reachable from a given status (transitive closure).

Parameters:

Name Type Description Default
from_status str

The starting status.

required

Returns:

Type Description
set[str]

Set of all statuses reachable via one or more transitions.

set[str]

Does not include from_status itself unless there is a cycle.

Raises:

Type Description
InvalidTransitionError

If from_status is not a known status.

Source code in packages/common/src/dataknobs_common/transitions.py
def get_reachable(self, from_status: str) -> set[str]:
    """Compute all statuses reachable from a given status (transitive closure).

    Args:
        from_status: The starting status.

    Returns:
        Set of all statuses reachable via one or more transitions.
        Does not include ``from_status`` itself unless there is a cycle.

    Raises:
        InvalidTransitionError: If ``from_status`` is not a known status.
    """
    if from_status not in self._transitions:
        raise InvalidTransitionError(
            entity=self._name,
            current_status=from_status,
            target_status="",
            allowed=None,
        )

    reachable: set[str] = set()
    frontier = set(self._transitions.get(from_status, set()))

    while frontier:
        status = frontier.pop()
        if status not in reachable:
            reachable.add(status)
            next_targets = self._transitions.get(status, set())
            frontier.update(next_targets - reachable)

    return reachable
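The worklist-based transitive closure in `get_reachable` can be exercised standalone. The sketch below mirrors the loop shown in the source; the graph, statuses, and `reachable` helper are illustrative, not part of the library API:

```python
# Illustrative status graph (not from the library): note the
# review -> draft edge, which creates a cycle back to "draft".
transitions = {
    "draft": {"review"},
    "review": {"approved", "draft"},
    "approved": {"published"},
    "published": set(),
}

def reachable(graph: dict[str, set[str]], start: str) -> set[str]:
    """Same frontier/worklist loop as get_reachable above."""
    seen: set[str] = set()
    frontier = set(graph.get(start, set()))
    while frontier:
        status = frontier.pop()
        if status not in seen:
            seen.add(status)
            # Only enqueue targets we have not visited yet
            frontier.update(graph.get(status, set()) - seen)
    return seen

print(reachable(transitions, "draft"))
# "draft" appears in its own result only because of the review -> draft cycle,
# matching the "unless there is a cycle" note in the docstring.
```

This also illustrates why the docstring says the start status is excluded unless a cycle leads back to it.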

AsyncRegistry

AsyncRegistry(name: str, enable_metrics: bool = False)

Bases: Generic[T]

Async-safe registry for managing named items.

Similar to Registry but uses asyncio locks for async-safe operations. Use this when working in async contexts.

Parameters:

Name Type Description Default
name str

Registry name

required
enable_metrics bool

Enable metrics tracking

False
Example

registry = AsyncRegistry[Tool]("tools")
await registry.register("tool1", my_tool)
tool = await registry.get("tool1")

Initialize async registry.

Parameters:

Name Type Description Default
name str

Registry name

required
enable_metrics bool

Enable metrics tracking

False

Methods:

Name Description
register

Register an item by key.

unregister

Unregister and return an item.

get

Get an item by key.

get_optional

Get an item, returning None if not found.

has

Check if item exists.

list_keys

List all registered keys.

list_items

List all registered items.

items

Get all key-item pairs.

count

Get count of registered items.

clear

Clear all items.

get_metrics

Get registration metrics.

__len__

Get number of registered items using len().

__contains__

Check if item exists using 'in' operator.

__iter__

Iterate over registered items.

Attributes:

Name Type Description
name str

Get registry name.

Source code in packages/common/src/dataknobs_common/registry.py
def __init__(self, name: str, enable_metrics: bool = False):
    """Initialize async registry.

    Args:
        name: Registry name
        enable_metrics: Enable metrics tracking
    """
    self._name = name
    self._items: Dict[str, T] = {}
    self._lock = asyncio.Lock()
    self._metrics: Dict[str, Dict[str, Any]] | None = {} if enable_metrics else None
Attributes
name property
name: str

Get registry name.

Functions
register async
register(
    key: str,
    item: T,
    metadata: Dict[str, Any] | None = None,
    allow_overwrite: bool = False,
) -> None

Register an item by key.

Parameters:

Name Type Description Default
key str

Unique identifier

required
item T

Item to register

required
metadata Dict[str, Any] | None

Optional metadata

None
allow_overwrite bool

Allow overwriting existing items

False

Raises:

Type Description
OperationError

If item exists and allow_overwrite is False

Source code in packages/common/src/dataknobs_common/registry.py
async def register(
    self,
    key: str,
    item: T,
    metadata: Dict[str, Any] | None = None,
    allow_overwrite: bool = False,
) -> None:
    """Register an item by key.

    Args:
        key: Unique identifier
        item: Item to register
        metadata: Optional metadata
        allow_overwrite: Allow overwriting existing items

    Raises:
        OperationError: If item exists and allow_overwrite is False
    """
    async with self._lock:
        if not allow_overwrite and key in self._items:
            raise OperationError(
                f"Item '{key}' already registered in {self._name}",
                context={"key": key, "registry": self._name},
            )

        self._items[key] = item

        if self._metrics is not None:
            self._metrics[key] = {
                "registered_at": time.time(),
                "metadata": metadata or {},
            }
unregister async
unregister(key: str) -> T

Unregister and return an item.

Parameters:

Name Type Description Default
key str

Key to unregister

required

Returns:

Type Description
T

The unregistered item

Raises:

Type Description
NotFoundError

If item not found

Source code in packages/common/src/dataknobs_common/registry.py
async def unregister(self, key: str) -> T:
    """Unregister and return an item.

    Args:
        key: Key to unregister

    Returns:
        The unregistered item

    Raises:
        NotFoundError: If item not found
    """
    async with self._lock:
        if key not in self._items:
            raise NotFoundError(
                f"Item not found: {key}",
                context={"key": key, "registry": self._name},
            )

        item = self._items.pop(key)

        if self._metrics is not None and key in self._metrics:
            del self._metrics[key]

        return item
get async
get(key: str) -> T

Get an item by key.

Parameters:

Name Type Description Default
key str

Key to retrieve

required

Returns:

Type Description
T

The registered item

Raises:

Type Description
NotFoundError

If item not found

Source code in packages/common/src/dataknobs_common/registry.py
async def get(self, key: str) -> T:
    """Get an item by key.

    Args:
        key: Key to retrieve

    Returns:
        The registered item

    Raises:
        NotFoundError: If item not found
    """
    async with self._lock:
        if key not in self._items:
            raise NotFoundError(
                f"Item not found: {key}",
                context={"key": key, "registry": self._name, "available_keys": list(self._items.keys())},
            )
        return self._items[key]
get_optional async
get_optional(key: str) -> T | None

Get an item, returning None if not found.

Parameters:

Name Type Description Default
key str

Key to retrieve

required

Returns:

Type Description
T | None

The item or None

Source code in packages/common/src/dataknobs_common/registry.py
async def get_optional(self, key: str) -> T | None:
    """Get an item, returning None if not found.

    Args:
        key: Key to retrieve

    Returns:
        The item or None
    """
    async with self._lock:
        return self._items.get(key)
has async
has(key: str) -> bool

Check if item exists.

Parameters:

Name Type Description Default
key str

Key to check

required

Returns:

Type Description
bool

True if exists

Source code in packages/common/src/dataknobs_common/registry.py
async def has(self, key: str) -> bool:
    """Check if item exists.

    Args:
        key: Key to check

    Returns:
        True if exists
    """
    async with self._lock:
        return key in self._items
list_keys async
list_keys() -> List[str]

List all registered keys.

Returns:

Type Description
List[str]

List of keys

Source code in packages/common/src/dataknobs_common/registry.py
async def list_keys(self) -> List[str]:
    """List all registered keys.

    Returns:
        List of keys
    """
    async with self._lock:
        return list(self._items.keys())
list_items async
list_items() -> List[T]

List all registered items.

Returns:

Type Description
List[T]

List of items

Source code in packages/common/src/dataknobs_common/registry.py
async def list_items(self) -> List[T]:
    """List all registered items.

    Returns:
        List of items
    """
    async with self._lock:
        return list(self._items.values())
items async
items() -> List[tuple[str, T]]

Get all key-item pairs.

Returns:

Type Description
List[tuple[str, T]]

List of (key, item) tuples

Source code in packages/common/src/dataknobs_common/registry.py
async def items(self) -> List[tuple[str, T]]:
    """Get all key-item pairs.

    Returns:
        List of (key, item) tuples
    """
    async with self._lock:
        return list(self._items.items())
count async
count() -> int

Get count of registered items.

Returns:

Type Description
int

Number of items

Source code in packages/common/src/dataknobs_common/registry.py
async def count(self) -> int:
    """Get count of registered items.

    Returns:
        Number of items
    """
    async with self._lock:
        return len(self._items)
clear async
clear() -> None

Clear all items.

Source code in packages/common/src/dataknobs_common/registry.py
async def clear(self) -> None:
    """Clear all items."""
    async with self._lock:
        self._items.clear()
        if self._metrics is not None:
            self._metrics.clear()
get_metrics async
get_metrics(key: str | None = None) -> Dict[str, Any]

Get registration metrics.

Parameters:

Name Type Description Default
key str | None

Optional specific key

None

Returns:

Type Description
Dict[str, Any]

Metrics dictionary

Source code in packages/common/src/dataknobs_common/registry.py
async def get_metrics(self, key: str | None = None) -> Dict[str, Any]:
    """Get registration metrics.

    Args:
        key: Optional specific key

    Returns:
        Metrics dictionary
    """
    async with self._lock:
        if self._metrics is None:
            return {}

        if key:
            return self._metrics.get(key, {})

        return dict(self._metrics)
__len__
__len__() -> int

Get number of registered items using len().

Source code in packages/common/src/dataknobs_common/registry.py
def __len__(self) -> int:
    """Get number of registered items using len()."""
    # Note: This is synchronous but safe since it just reads the dict
    return len(self._items)
__contains__
__contains__(key: str) -> bool

Check if item exists using 'in' operator.

Source code in packages/common/src/dataknobs_common/registry.py
def __contains__(self, key: str) -> bool:
    """Check if item exists using 'in' operator."""
    # Note: This is synchronous but safe since it just reads the dict
    return key in self._items
__iter__
__iter__()

Iterate over registered items.

Source code in packages/common/src/dataknobs_common/registry.py
def __iter__(self):
    """Iterate over registered items."""
    # Note: Returns iterator over current snapshot
    return iter(list(self._items.values()))
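The lock-per-operation pattern AsyncRegistry uses can be sketched in a few lines. `MiniAsyncRegistry` below is illustrative, not the library class; it only shows how each mutating or reading method takes the shared `asyncio.Lock` before touching the dict:

```python
import asyncio

class MiniAsyncRegistry:
    """Minimal sketch of AsyncRegistry's locking pattern (illustrative)."""

    def __init__(self, name: str):
        self._name = name
        self._items: dict[str, object] = {}
        self._lock = asyncio.Lock()

    async def register(self, key: str, item: object) -> None:
        async with self._lock:  # serialize writers
            if key in self._items:
                raise ValueError(f"{key!r} already registered in {self._name}")
            self._items[key] = item

    async def get_optional(self, key: str):
        async with self._lock:  # readers also take the lock
            return self._items.get(key)

async def main():
    reg = MiniAsyncRegistry("tools")
    await reg.register("t1", "my-tool")
    return await reg.get_optional("t1"), await reg.get_optional("missing")

print(asyncio.run(main()))  # ('my-tool', None)
```

The real class layers metrics and the full method set on top, but every method follows this same `async with self._lock:` shape.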

CachedRegistry

CachedRegistry(name: str, cache_ttl: int = 300, max_cache_size: int = 1000)

Bases: Registry[T]

Registry with time-based caching support.

Extends the base registry with caching capabilities. Items can be retrieved from cache with automatic expiration and refresh based on TTL. Implements LRU eviction when cache size exceeds limits.

Parameters:

Name Type Description Default
name str

Registry name

required
cache_ttl int

Cache time-to-live in seconds (default: 300)

300
max_cache_size int

Maximum number of cached items (default: 1000)

1000
Example
registry = CachedRegistry[Bot]("bots", cache_ttl=300)
bot = registry.get_cached(
    "client1",
    factory=lambda: create_bot("client1")
)

Initialize cached registry.

Parameters:

Name Type Description Default
name str

Registry name

required
cache_ttl int

Time-to-live for cached items in seconds

300
max_cache_size int

Maximum cache size before eviction

1000

Methods:

Name Description
get_cached

Get item from cache with automatic refresh.

invalidate_cache

Invalidate cache for a key or all keys.

get_cache_stats

Get cache statistics.

Source code in packages/common/src/dataknobs_common/registry.py
def __init__(
    self,
    name: str,
    cache_ttl: int = 300,
    max_cache_size: int = 1000,
):
    """Initialize cached registry.

    Args:
        name: Registry name
        cache_ttl: Time-to-live for cached items in seconds
        max_cache_size: Maximum cache size before eviction
    """
    super().__init__(name, enable_metrics=True)
    self._cache: Dict[str, tuple[T, float]] = {}
    self._cache_ttl = cache_ttl
    self._max_cache_size = max_cache_size
    self._cache_hits = 0
    self._cache_misses = 0
Functions
get_cached
get_cached(
    key: str, factory: Callable[[], T], force_refresh: bool = False
) -> T

Get item from cache with automatic refresh.

If item exists in cache and is not expired, returns cached version. Otherwise, calls factory to create new item and caches it.

Parameters:

Name Type Description Default
key str

Cache key

required
factory Callable[[], T]

Callable that creates the item if not cached

required
force_refresh bool

Force refresh even if cached

False

Returns:

Type Description
T

Cached or newly created item

Example
def create_bot():
    return Bot("my-bot")
bot = registry.get_cached("bot1", create_bot)
Source code in packages/common/src/dataknobs_common/registry.py
def get_cached(
    self,
    key: str,
    factory: Callable[[], T],
    force_refresh: bool = False,
) -> T:
    """Get item from cache with automatic refresh.

    If item exists in cache and is not expired, returns cached version.
    Otherwise, calls factory to create new item and caches it.

    Args:
        key: Cache key
        factory: Callable that creates the item if not cached
        force_refresh: Force refresh even if cached

    Returns:
        Cached or newly created item

    Example:
        ```python
        def create_bot():
            return Bot("my-bot")
        bot = registry.get_cached("bot1", create_bot)
        ```
    """
    with self._lock:
        # Check cache
        if not force_refresh and key in self._cache:
            item, cached_at = self._cache[key]
            if time.time() - cached_at < self._cache_ttl:
                self._cache_hits += 1
                return item

        # Cache miss - create new item
        self._cache_misses += 1
        item = factory()
        self._cache[key] = (item, time.time())

        # Evict if cache too large
        if len(self._cache) > self._max_cache_size:
            self._evict_oldest()

        return item
invalidate_cache
invalidate_cache(key: str | None = None) -> None

Invalidate cache for a key or all keys.

Parameters:

Name Type Description Default
key str | None

Specific key to invalidate, or None to invalidate all

None
Example
registry.invalidate_cache("bot1")  # Invalidate one
registry.invalidate_cache()  # Invalidate all
Source code in packages/common/src/dataknobs_common/registry.py
def invalidate_cache(self, key: str | None = None) -> None:
    """Invalidate cache for a key or all keys.

    Args:
        key: Specific key to invalidate, or None to invalidate all

    Example:
        ```python
        registry.invalidate_cache("bot1")  # Invalidate one
        registry.invalidate_cache()  # Invalidate all
        ```
    """
    with self._lock:
        if key:
            if key in self._cache:
                del self._cache[key]
        else:
            self._cache.clear()
get_cache_stats
get_cache_stats() -> Dict[str, Any]

Get cache statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with cache statistics

Example
stats = registry.get_cache_stats()
print(f"Hit rate: {stats['hit_rate']:.2%}")
Source code in packages/common/src/dataknobs_common/registry.py
def get_cache_stats(self) -> Dict[str, Any]:
    """Get cache statistics.

    Returns:
        Dictionary with cache statistics

    Example:
        ```python
        stats = registry.get_cache_stats()
        print(f"Hit rate: {stats['hit_rate']:.2%}")
        ```
    """
    with self._lock:
        total = self._cache_hits + self._cache_misses
        hit_rate = self._cache_hits / total if total > 0 else 0.0

        return {
            "size": len(self._cache),
            "max_size": self._max_cache_size,
            "ttl_seconds": self._cache_ttl,
            "hits": self._cache_hits,
            "misses": self._cache_misses,
            "total_requests": total,
            "hit_rate": hit_rate,
        }
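The TTL check at the heart of `get_cached` can be demonstrated in isolation. `MiniTTLCache` below is an illustrative sketch, not the library class; it adds an injectable `now` clock so expiry is observable without sleeping:

```python
import time

class MiniTTLCache:
    """Sketch of the TTL hit/miss logic in CachedRegistry.get_cached."""

    def __init__(self, ttl: float):
        self._ttl = ttl
        self._cache: dict[str, tuple[object, float]] = {}
        self.hits = 0
        self.misses = 0

    def get_cached(self, key, factory, now=None):
        now = time.time() if now is None else now  # injectable clock for testing
        if key in self._cache:
            item, cached_at = self._cache[key]
            if now - cached_at < self._ttl:  # same freshness check as the source
                self.hits += 1
                return item
        # Cache miss (absent or expired): build and store with a new timestamp
        self.misses += 1
        item = factory()
        self._cache[key] = (item, now)
        return item

cache = MiniTTLCache(ttl=300)
cache.get_cached("bot1", lambda: "fresh", now=0)       # miss: factory runs
cache.get_cached("bot1", lambda: "unused", now=100)    # hit: within TTL
cache.get_cached("bot1", lambda: "rebuilt", now=1000)  # miss: entry expired
print(cache.hits, cache.misses)  # 1 2
```

The real registry additionally evicts the oldest entries once `max_cache_size` is exceeded, which this sketch omits.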

PluginRegistry

PluginRegistry(
    name: str,
    default_factory: type[T] | Callable[..., T] | None = None,
    validate_type: type | None = None,
)

Bases: Generic[T]

Registry for plugins with factory support and defaults.

A specialized registry pattern for managing plugins (adapters, handlers, providers, etc.) that supports:

- Class or factory function registration
- Lazy instantiation with configuration
- Default fallback when plugin not found
- Instance caching
- Type validation

This pattern is useful when you need to:

- Register different implementations of an interface
- Create instances on-demand with configuration
- Provide graceful fallbacks for unregistered keys

Parameters:

Name Type Description Default
name str

Registry name

required
default_factory type[T] | Callable[..., T] | None

Default factory to use when key not found

None
validate_type type | None

Optional base type to validate registrations against

None
Example
from dataknobs_common.registry import PluginRegistry

# Define base class
class Handler:
    def __init__(self, name: str, config: dict):
        self.name = name
        self.config = config

class DefaultHandler(Handler):
    pass

class CustomHandler(Handler):
    pass

# Create registry with default
registry = PluginRegistry[Handler]("handlers", default_factory=DefaultHandler)

# Register plugins
registry.register("custom", CustomHandler)

# Get instances
handler = registry.get("custom", config={"timeout": 30})
default = registry.get("unknown", config={})  # Uses default
With async factories
async def create_async_handler(name, config):
    handler = AsyncHandler(name, config)
    await handler.initialize()
    return handler

registry.register("async", create_async_handler)
handler = await registry.get_async("async", config={"url": "..."})

Initialize plugin registry.

Parameters:

Name Type Description Default
name str

Registry name for identification

required
default_factory type[T] | Callable[..., T] | None

Default class or factory to use when key not found

None
validate_type type | None

Optional base type to validate registrations against

None

Methods:

Name Description
register

Register a plugin class or factory.

unregister

Unregister a plugin.

is_registered

Check if a plugin is registered.

get

Get a plugin instance.

get_async

Get a plugin instance, supporting async factories.

list_keys

List all registered plugin keys.

clear_cache

Clear cached instances.

get_factory

Get the registered factory for a key.

set_default_factory

Set the default factory.

bulk_register

Register multiple plugins at once.

copy

Get a copy of all registered factories.

__len__

Get number of registered plugins.

__contains__

Check if plugin is registered using 'in' operator.

__repr__

Get string representation.

Attributes:

Name Type Description
name str

Get registry name.

cached_instances Dict[str, T]

Get the dictionary of cached instances.

Source code in packages/common/src/dataknobs_common/registry.py
def __init__(
    self,
    name: str,
    default_factory: type[T] | Callable[..., T] | None = None,
    validate_type: type | None = None,
):
    """Initialize plugin registry.

    Args:
        name: Registry name for identification
        default_factory: Default class or factory to use when key not found
        validate_type: Optional base type to validate registrations against
    """
    self._name = name
    self._factories: Dict[str, type[T] | Callable[..., T]] = {}
    self._instances: Dict[str, T] = {}
    self._lock = threading.RLock()
    self._default_factory = default_factory
    self._validate_type = validate_type
Attributes
name property
name: str

Get registry name.

cached_instances property
cached_instances: Dict[str, T]

Get the dictionary of cached instances.

Returns:

Type Description
Dict[str, T]

Dictionary mapping keys to cached instances

Note

This returns the internal cache dictionary. Modifications will affect the cache directly.

Functions
register
register(
    key: str, factory: type[T] | Callable[..., T], override: bool = False
) -> None

Register a plugin class or factory.

Parameters:

Name Type Description Default
key str

Unique identifier for the plugin

required
factory type[T] | Callable[..., T]

Plugin class or factory function that creates instances

required
override bool

If True, allow overriding existing registration

False

Raises:

Type Description
OperationError

If key already registered and override=False

TypeError

If factory doesn't match validate_type

Example
# Register a class
registry.register("handler1", MyHandler)

# Register a factory function
registry.register("handler2", lambda name, config: create_handler(name, config))
Source code in packages/common/src/dataknobs_common/registry.py
def register(
    self,
    key: str,
    factory: type[T] | Callable[..., T],
    override: bool = False,
) -> None:
    """Register a plugin class or factory.

    Args:
        key: Unique identifier for the plugin
        factory: Plugin class or factory function that creates instances
        override: If True, allow overriding existing registration

    Raises:
        OperationError: If key already registered and override=False
        TypeError: If factory doesn't match validate_type

    Example:
        ```python
        # Register a class
        registry.register("handler1", MyHandler)

        # Register a factory function
        registry.register("handler2", lambda name, config: create_handler(name, config))
        ```
    """
    with self._lock:
        # Check for existing registration
        if not override and key in self._factories:
            raise OperationError(
                f"Plugin '{key}' already registered in {self._name}. "
                f"Use override=True to replace.",
                context={"key": key, "registry": self._name},
            )

        # Validate type if specified
        if self._validate_type and isinstance(factory, type):
            if not issubclass(factory, self._validate_type):
                raise TypeError(
                    f"Factory class must be a subclass of {self._validate_type.__name__}, "
                    f"got {factory.__name__}"
                )
        elif not callable(factory):
            raise TypeError(
                f"Factory must be a class or callable, got {type(factory).__name__}"
            )

        # Register
        self._factories[key] = factory

        # Clear cached instance if overriding
        if key in self._instances:
            del self._instances[key]
unregister
unregister(key: str) -> None

Unregister a plugin.

Parameters:

Name Type Description Default
key str

Key to unregister

required

Raises:

Type Description
NotFoundError

If key not registered

Source code in packages/common/src/dataknobs_common/registry.py
def unregister(self, key: str) -> None:
    """Unregister a plugin.

    Args:
        key: Key to unregister

    Raises:
        NotFoundError: If key not registered
    """
    with self._lock:
        if key not in self._factories:
            raise NotFoundError(
                f"Plugin not found: {key}",
                context={"key": key, "registry": self._name},
            )

        del self._factories[key]

        # Clear cached instance
        if key in self._instances:
            del self._instances[key]
is_registered
is_registered(key: str) -> bool

Check if a plugin is registered.

Parameters:

Name Type Description Default
key str

Key to check

required

Returns:

Type Description
bool

True if registered

Source code in packages/common/src/dataknobs_common/registry.py
def is_registered(self, key: str) -> bool:
    """Check if a plugin is registered.

    Args:
        key: Key to check

    Returns:
        True if registered
    """
    with self._lock:
        return key in self._factories
get
get(
    key: str,
    config: Dict[str, Any] | None = None,
    use_cache: bool = True,
    use_default: bool = True,
) -> T

Get a plugin instance.

Creates instance if not cached, using the registered factory.

Parameters:

Name Type Description Default
key str

Plugin identifier

required
config Dict[str, Any] | None

Configuration dictionary passed to factory

None
use_cache bool

Return cached instance if available

True
use_default bool

Use default factory if key not registered

True

Returns:

Type Description
T

Plugin instance

Raises:

Type Description
NotFoundError

If key not registered and use_default=False

Example
handler = registry.get("custom", config={"timeout": 30})
Source code in packages/common/src/dataknobs_common/registry.py
def get(
    self,
    key: str,
    config: Dict[str, Any] | None = None,
    use_cache: bool = True,
    use_default: bool = True,
) -> T:
    """Get a plugin instance.

    Creates instance if not cached, using the registered factory.

    Args:
        key: Plugin identifier
        config: Configuration dictionary passed to factory
        use_cache: Return cached instance if available
        use_default: Use default factory if key not registered

    Returns:
        Plugin instance

    Raises:
        NotFoundError: If key not registered and use_default=False

    Example:
        ```python
        handler = registry.get("custom", config={"timeout": 30})
        ```
    """
    with self._lock:
        # Check cache
        if use_cache and key in self._instances:
            return self._instances[key]

        # Get factory
        if key in self._factories:
            factory = self._factories[key]
        elif use_default and self._default_factory:
            factory = self._default_factory
        else:
            raise NotFoundError(
                f"Plugin '{key}' not registered and no default available",
                context={
                    "key": key,
                    "registry": self._name,
                    "available": list(self._factories.keys()),
                },
            )

        # Create instance
        try:
            if isinstance(factory, type):
                instance = factory(key, config or {})
            else:
                instance = factory(key, config or {})

            # Validate instance type if specified
            if self._validate_type and not isinstance(instance, self._validate_type):
                raise TypeError(
                    f"Factory must return a {self._validate_type.__name__} instance, "
                    f"got {type(instance).__name__}"
                )

        except Exception as e:
            raise OperationError(
                f"Failed to create plugin '{key}': {e}",
                context={"key": key, "registry": self._name},
            ) from e

        # Cache instance
        if use_cache:
            self._instances[key] = instance

        return instance
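The lookup-then-fallback flow of `get()` can be sketched standalone. `MiniPluginRegistry` and the handler classes below are illustrative, not the library API; they show the three steps: check the instance cache, resolve a factory (registered or default), then instantiate and cache:

```python
class Handler:
    """Illustrative plugin base: factories receive (name, config)."""
    def __init__(self, name: str, config: dict):
        self.name = name
        self.config = config

class DefaultHandler(Handler): pass
class CustomHandler(Handler): pass

class MiniPluginRegistry:
    """Sketch of PluginRegistry.get's cache/factory/default resolution."""

    def __init__(self, default_factory=None):
        self._factories: dict = {}
        self._instances: dict = {}
        self._default_factory = default_factory

    def register(self, key, factory):
        self._factories[key] = factory

    def get(self, key, config=None, use_cache=True):
        if use_cache and key in self._instances:
            return self._instances[key]            # 1. cache hit
        factory = self._factories.get(key, self._default_factory)
        if factory is None:
            raise KeyError(key)                    # no factory, no default
        instance = factory(key, config or {})      # 2. instantiate
        if use_cache:
            self._instances[key] = instance        # 3. cache for next call
        return instance

reg = MiniPluginRegistry(default_factory=DefaultHandler)
reg.register("custom", CustomHandler)
assert type(reg.get("custom", {"timeout": 30})) is CustomHandler
assert type(reg.get("unknown")) is DefaultHandler  # default fallback
assert reg.get("custom") is reg.get("custom")      # cached instance reused
```

The real implementation wraps factory failures in `OperationError` and enforces `validate_type`, which this sketch leaves out for brevity.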
get_async async
get_async(
    key: str,
    config: Dict[str, Any] | None = None,
    use_cache: bool = True,
    use_default: bool = True,
) -> T

Get a plugin instance, supporting async factories.

Like get() but awaits the factory if it's a coroutine function.

Parameters:

Name Type Description Default
key str

Plugin identifier

required
config Dict[str, Any] | None

Configuration dictionary

None
use_cache bool

Return cached instance if available

True
use_default bool

Use default factory if key not registered

True

Returns:

Type Description
T

Plugin instance

Example
handler = await registry.get_async("async-handler", config={"url": "..."})
Source code in packages/common/src/dataknobs_common/registry.py
async def get_async(
    self,
    key: str,
    config: Dict[str, Any] | None = None,
    use_cache: bool = True,
    use_default: bool = True,
) -> T:
    """Get a plugin instance, supporting async factories.

    Like get() but awaits the factory if it's a coroutine function.

    Args:
        key: Plugin identifier
        config: Configuration dictionary
        use_cache: Return cached instance if available
        use_default: Use default factory if key not registered

    Returns:
        Plugin instance

    Example:
        ```python
        handler = await registry.get_async("async-handler", config={"url": "..."})
        ```
    """
    with self._lock:
        # Check cache
        if use_cache and key in self._instances:
            return self._instances[key]

        # Get factory
        if key in self._factories:
            factory = self._factories[key]
        elif use_default and self._default_factory:
            factory = self._default_factory
        else:
            raise NotFoundError(
                f"Plugin '{key}' not registered and no default available",
                context={
                    "key": key,
                    "registry": self._name,
                    "available": list(self._factories.keys()),
                },
            )

    # Create instance (outside lock for async)
    try:
        if isinstance(factory, type):
            instance = factory(key, config or {})
        else:
            result = factory(key, config or {})
            # Await if coroutine
            if asyncio.iscoroutine(result):
                instance = await result
            else:
                instance = result

        # Validate instance type
        if self._validate_type and not isinstance(instance, self._validate_type):
            raise TypeError(
                f"Factory must return a {self._validate_type.__name__} instance, "
                f"got {type(instance).__name__}"
            )

    except Exception as e:
        raise OperationError(
            f"Failed to create plugin '{key}': {e}",
            context={"key": key, "registry": self._name},
        ) from e

    # Cache instance
    with self._lock:
        if use_cache:
            self._instances[key] = instance

    return instance
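The coroutine-awaiting branch of `get_async` can be isolated as below. The factory names and `call_factory` helper are illustrative; the key line is the `asyncio.iscoroutine` check, which is the same test the source uses to decide whether the factory's result needs awaiting:

```python
import asyncio

async def create_async_handler(name: str, config: dict) -> dict:
    await asyncio.sleep(0)  # stand-in for real async setup work
    return {"name": name, "config": config}

def sync_handler(name: str, config: dict) -> dict:
    return {"name": name, "config": config}

async def call_factory(factory, name, config):
    """Sketch of get_async's factory invocation: call, then await if needed."""
    result = factory(name, config)
    if asyncio.iscoroutine(result):  # same check as the source above
        result = await result
    return result

async def main():
    a = await call_factory(create_async_handler, "a", {"url": "http://example"})
    b = await call_factory(sync_handler, "b", {})
    return a["name"], b["name"]

print(asyncio.run(main()))  # ('a', 'b')
```

This is why the same registry can hold both plain classes and async factory functions: callers that use `get_async` get either kind resolved transparently.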
list_keys
list_keys() -> List[str]

List all registered plugin keys.

Returns:

Type Description
List[str]

List of registered keys

Source code in packages/common/src/dataknobs_common/registry.py
def list_keys(self) -> List[str]:
    """List all registered plugin keys.

    Returns:
        List of registered keys
    """
    with self._lock:
        return list(self._factories.keys())
clear_cache
clear_cache(key: str | None = None) -> None

Clear cached instances.

Parameters:

Name Type Description Default
key str | None

Specific key to clear, or None for all

None
Source code in packages/common/src/dataknobs_common/registry.py
def clear_cache(self, key: str | None = None) -> None:
    """Clear cached instances.

    Args:
        key: Specific key to clear, or None for all
    """
    with self._lock:
        if key:
            if key in self._instances:
                del self._instances[key]
        else:
            self._instances.clear()
get_factory
get_factory(key: str) -> type[T] | Callable[..., T] | None

Get the registered factory for a key.

Parameters:

Name Type Description Default
key str

Plugin identifier

required

Returns:

Type Description
type[T] | Callable[..., T] | None

Factory class or function, or None if not registered

Source code in packages/common/src/dataknobs_common/registry.py
def get_factory(self, key: str) -> type[T] | Callable[..., T] | None:
    """Get the registered factory for a key.

    Args:
        key: Plugin identifier

    Returns:
        Factory class or function, or None if not registered
    """
    with self._lock:
        return self._factories.get(key)
set_default_factory
set_default_factory(factory: type[T] | Callable[..., T]) -> None

Set the default factory.

Parameters:

Name Type Description Default
factory type[T] | Callable[..., T]

New default factory

required

Raises:

Type Description
TypeError

If factory doesn't match validate_type

Source code in packages/common/src/dataknobs_common/registry.py
def set_default_factory(self, factory: type[T] | Callable[..., T]) -> None:
    """Set the default factory.

    Args:
        factory: New default factory

    Raises:
        TypeError: If factory doesn't match validate_type
    """
    if self._validate_type and isinstance(factory, type):
        if not issubclass(factory, self._validate_type):
            raise TypeError(
                f"Default factory must be a subclass of {self._validate_type.__name__}"
            )

    self._default_factory = factory
bulk_register
bulk_register(
    factories: Dict[str, type[T] | Callable[..., T]], override: bool = False
) -> None

Register multiple plugins at once.

Parameters:

Name Type Description Default
factories Dict[str, type[T] | Callable[..., T]]

Dictionary mapping keys to factories

required
override bool

Allow overriding existing registrations

False
Example
registry.bulk_register({
    "handler1": Handler1,
    "handler2": Handler2,
})
Source code in packages/common/src/dataknobs_common/registry.py
def bulk_register(
    self,
    factories: Dict[str, type[T] | Callable[..., T]],
    override: bool = False,
) -> None:
    """Register multiple plugins at once.

    Args:
        factories: Dictionary mapping keys to factories
        override: Allow overriding existing registrations

    Example:
        ```python
        registry.bulk_register({
            "handler1": Handler1,
            "handler2": Handler2,
        })
        ```
    """
    for key, factory in factories.items():
        self.register(key, factory, override=override)
copy
copy() -> Dict[str, type[T] | Callable[..., T]]

Get a copy of all registered factories.

Returns:

Type Description
Dict[str, type[T] | Callable[..., T]]

Dictionary of key to factory mappings

Source code in packages/common/src/dataknobs_common/registry.py
def copy(self) -> Dict[str, type[T] | Callable[..., T]]:
    """Get a copy of all registered factories.

    Returns:
        Dictionary of key to factory mappings
    """
    with self._lock:
        return dict(self._factories)
__len__
__len__() -> int

Get number of registered plugins.

Source code in packages/common/src/dataknobs_common/registry.py
def __len__(self) -> int:
    """Get number of registered plugins."""
    return len(self._factories)
__contains__
__contains__(key: str) -> bool

Check if plugin is registered using 'in' operator.

Source code in packages/common/src/dataknobs_common/registry.py
def __contains__(self, key: str) -> bool:
    """Check if plugin is registered using 'in' operator."""
    return self.is_registered(key)
__repr__
__repr__() -> str

Get string representation.

Source code in packages/common/src/dataknobs_common/registry.py
def __repr__(self) -> str:
    """Get string representation."""
    return (
        f"PluginRegistry("
        f"name='{self._name}', "
        f"plugins={len(self._factories)}, "
        f"cached={len(self._instances)}"
        f")"
    )

Registry

Registry(name: str, enable_metrics: bool = False)

Bases: Generic[T]

Base registry for managing named items with optional metrics.

This is a thread-safe registry that manages a collection of items by unique keys. It provides core operations for registration, lookup, and enumeration.

The registry is generic, so you can specify the type of items it manages for better type safety.

Attributes:

Name Type Description
name str

Name of the registry (for logging/debugging)

Parameters:

Name Type Description Default
name str

Name for this registry instance

required
enable_metrics bool

Whether to track registration metrics

False
Example
registry = Registry[str]("my_registry")
registry.register("key1", "value1")
registry.get("key1")
# 'value1'
registry.count()
# 1

Initialize the registry.

Parameters:

Name Type Description Default
name str

Registry name for identification

required
enable_metrics bool

Enable metrics tracking

False

Methods:

Name Description
register

Register an item by key.

unregister

Unregister and return an item by key.

get

Get an item by key.

get_optional

Get an item by key, returning None if not found.

has

Check if item exists.

list_keys

List all registered keys.

list_items

List all registered items.

items

Get all key-item pairs.

count

Get count of registered items.

clear

Clear all items from registry.

get_metrics

Get registration metrics.

__len__

Get number of registered items using len().

__contains__

Check if item exists using 'in' operator.

__iter__

Iterate over registered items.

Source code in packages/common/src/dataknobs_common/registry.py
def __init__(self, name: str, enable_metrics: bool = False):
    """Initialize the registry.

    Args:
        name: Registry name for identification
        enable_metrics: Enable metrics tracking
    """
    self._name = name
    self._items: Dict[str, T] = {}
    self._lock = threading.RLock()
    self._metrics: Dict[str, Dict[str, Any]] | None = {} if enable_metrics else None
Attributes
name property
name: str

Get registry name.

Functions
register
register(
    key: str,
    item: T,
    metadata: Dict[str, Any] | None = None,
    allow_overwrite: bool = False,
) -> None

Register an item by key.

Parameters:

Name Type Description Default
key str

Unique identifier for the item

required
item T

Item to register

required
metadata Dict[str, Any] | None

Optional metadata about the item

None
allow_overwrite bool

Whether to allow overwriting existing items

False

Raises:

Type Description
OperationError

If item already exists and allow_overwrite is False

Example
registry.register("tool1", my_tool, metadata={"version": "1.0"})
Source code in packages/common/src/dataknobs_common/registry.py
def register(
    self,
    key: str,
    item: T,
    metadata: Dict[str, Any] | None = None,
    allow_overwrite: bool = False,
) -> None:
    """Register an item by key.

    Args:
        key: Unique identifier for the item
        item: Item to register
        metadata: Optional metadata about the item
        allow_overwrite: Whether to allow overwriting existing items

    Raises:
        OperationError: If item already exists and allow_overwrite is False

    Example:
        ```python
        registry.register("tool1", my_tool, metadata={"version": "1.0"})
        ```
    """
    with self._lock:
        if not allow_overwrite and key in self._items:
            raise OperationError(
                f"Item '{key}' already registered in {self._name}",
                context={"key": key, "registry": self._name},
            )

        self._items[key] = item

        if self._metrics is not None:
            self._metrics[key] = {
                "registered_at": time.time(),
                "metadata": metadata or {},
            }
unregister
unregister(key: str) -> T

Unregister and return an item by key.

Parameters:

Name Type Description Default
key str

Key of item to unregister

required

Returns:

Type Description
T

The unregistered item

Raises:

Type Description
NotFoundError

If item not found

Example
item = registry.unregister("tool1")
Source code in packages/common/src/dataknobs_common/registry.py
def unregister(self, key: str) -> T:
    """Unregister and return an item by key.

    Args:
        key: Key of item to unregister

    Returns:
        The unregistered item

    Raises:
        NotFoundError: If item not found

    Example:
        ```python
        item = registry.unregister("tool1")
        ```
    """
    with self._lock:
        if key not in self._items:
            raise NotFoundError(
                f"Item not found: {key}",
                context={"key": key, "registry": self._name},
            )

        item = self._items.pop(key)

        if self._metrics is not None and key in self._metrics:
            del self._metrics[key]

        return item
get
get(key: str) -> T

Get an item by key.

Parameters:

Name Type Description Default
key str

Key of item to retrieve

required

Returns:

Type Description
T

The registered item

Raises:

Type Description
NotFoundError

If item not found

Example
item = registry.get("tool1")
Source code in packages/common/src/dataknobs_common/registry.py
def get(self, key: str) -> T:
    """Get an item by key.

    Args:
        key: Key of item to retrieve

    Returns:
        The registered item

    Raises:
        NotFoundError: If item not found

    Example:
        ```python
        item = registry.get("tool1")
        ```
    """
    with self._lock:
        if key not in self._items:
            raise NotFoundError(
                f"Item not found: {key}",
                context={"key": key, "registry": self._name, "available_keys": list(self._items.keys())},
            )
        return self._items[key]
get_optional
get_optional(key: str) -> T | None

Get an item by key, returning None if not found.

Parameters:

Name Type Description Default
key str

Key of item to retrieve

required

Returns:

Type Description
T | None

The registered item or None

Example
item = registry.get_optional("tool1")
if item is None:
    print("Not found")
Source code in packages/common/src/dataknobs_common/registry.py
def get_optional(self, key: str) -> T | None:
    """Get an item by key, returning None if not found.

    Args:
        key: Key of item to retrieve

    Returns:
        The registered item or None

    Example:
        ```python
        item = registry.get_optional("tool1")
        if item is None:
            print("Not found")
        ```
    """
    with self._lock:
        return self._items.get(key)
has
has(key: str) -> bool

Check if item exists.

Parameters:

Name Type Description Default
key str

Key to check

required

Returns:

Type Description
bool

True if item exists

Example
if registry.has("tool1"):
    print("Found")
Source code in packages/common/src/dataknobs_common/registry.py
def has(self, key: str) -> bool:
    """Check if item exists.

    Args:
        key: Key to check

    Returns:
        True if item exists

    Example:
        ```python
        if registry.has("tool1"):
            print("Found")
        ```
    """
    with self._lock:
        return key in self._items
list_keys
list_keys() -> List[str]

List all registered keys.

Returns:

Type Description
List[str]

List of registered keys

Example
keys = registry.list_keys()
print(keys)
# ['tool1', 'tool2']
Source code in packages/common/src/dataknobs_common/registry.py
def list_keys(self) -> List[str]:
    """List all registered keys.

    Returns:
        List of registered keys

    Example:
        ```python
        keys = registry.list_keys()
        print(keys)
        # ['tool1', 'tool2']
        ```
    """
    with self._lock:
        return list(self._items.keys())
list_items
list_items() -> List[T]

List all registered items.

Returns:

Type Description
List[T]

List of registered items

Example
items = registry.list_items()
for item in items:
    print(item)
Source code in packages/common/src/dataknobs_common/registry.py
def list_items(self) -> List[T]:
    """List all registered items.

    Returns:
        List of registered items

    Example:
        ```python
        items = registry.list_items()
        for item in items:
            print(item)
        ```
    """
    with self._lock:
        return list(self._items.values())
items
items() -> List[tuple[str, T]]

Get all key-item pairs.

Returns:

Type Description
List[tuple[str, T]]

List of (key, item) tuples

Example
for key, item in registry.items():
    print(f"{key}: {item}")
Source code in packages/common/src/dataknobs_common/registry.py
def items(self) -> List[tuple[str, T]]:
    """Get all key-item pairs.

    Returns:
        List of (key, item) tuples

    Example:
        ```python
        for key, item in registry.items():
            print(f"{key}: {item}")
        ```
    """
    with self._lock:
        return list(self._items.items())
count
count() -> int

Get count of registered items.

Returns:

Type Description
int

Number of items in registry

Example
count = registry.count()
print(f"Registry has {count} items")
Source code in packages/common/src/dataknobs_common/registry.py
def count(self) -> int:
    """Get count of registered items.

    Returns:
        Number of items in registry

    Example:
        ```python
        count = registry.count()
        print(f"Registry has {count} items")
        ```
    """
    with self._lock:
        return len(self._items)
clear
clear() -> None

Clear all items from registry.

Example
registry.clear()
registry.count()
# 0
Source code in packages/common/src/dataknobs_common/registry.py
def clear(self) -> None:
    """Clear all items from registry.

    Example:
        ```python
        registry.clear()
        registry.count()
        # 0
        ```
    """
    with self._lock:
        self._items.clear()
        if self._metrics is not None:
            self._metrics.clear()
get_metrics
get_metrics(key: str | None = None) -> Dict[str, Any]

Get registration metrics.

Parameters:

Name Type Description Default
key str | None

Optional specific key to get metrics for

None

Returns:

Type Description
Dict[str, Any]

Metrics dictionary

Example
metrics = registry.get_metrics()
print(metrics)
# {'tool1': {'registered_at': 1699456789.0, 'metadata': {}}}
Source code in packages/common/src/dataknobs_common/registry.py
def get_metrics(self, key: str | None = None) -> Dict[str, Any]:
    """Get registration metrics.

    Args:
        key: Optional specific key to get metrics for

    Returns:
        Metrics dictionary

    Example:
        ```python
        metrics = registry.get_metrics()
        print(metrics)
        # {'tool1': {'registered_at': 1699456789.0, 'metadata': {}}}
        ```
    """
    with self._lock:
        if self._metrics is None:
            return {}

        if key:
            return self._metrics.get(key, {})

        return dict(self._metrics)
__len__
__len__() -> int

Get number of registered items using len().

Source code in packages/common/src/dataknobs_common/registry.py
def __len__(self) -> int:
    """Get number of registered items using len()."""
    return self.count()
__contains__
__contains__(key: str) -> bool

Check if item exists using 'in' operator.

Source code in packages/common/src/dataknobs_common/registry.py
def __contains__(self, key: str) -> bool:
    """Check if item exists using 'in' operator."""
    return self.has(key)
__iter__
__iter__()

Iterate over registered items.

Source code in packages/common/src/dataknobs_common/registry.py
def __iter__(self):
    """Iterate over registered items."""
    return iter(self.list_items())
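The dunder methods above make a registry behave like a read-only mapping. A minimal stdlib-only stand-in (`MiniRegistry` here is illustrative, not the package class) shows the resulting ergonomics of `len()`, `in`, and iteration:

```python
import threading
from typing import Dict, Generic, Iterator, TypeVar

T = TypeVar("T")

class MiniRegistry(Generic[T]):
    """Illustrative stand-in for the documented Registry[T] ergonomics."""

    def __init__(self, name: str) -> None:
        self._name = name
        self._items: Dict[str, T] = {}
        self._lock = threading.RLock()

    def register(self, key: str, item: T) -> None:
        with self._lock:
            if key in self._items:
                raise ValueError(f"'{key}' already registered in {self._name}")
            self._items[key] = item

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)

    def __contains__(self, key: str) -> bool:
        with self._lock:
            return key in self._items

    def __iter__(self) -> Iterator[T]:
        # Snapshot under the lock so iteration is safe against mutation
        with self._lock:
            return iter(list(self._items.values()))

reg: "MiniRegistry[str]" = MiniRegistry("demo")
reg.register("k1", "v1")
reg.register("k2", "v2")
assert len(reg) == 2 and "k1" in reg
assert sorted(reg) == ["v1", "v2"]
```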

Serializable

Bases: Protocol

Protocol for objects that can be serialized to/from dict.

Implement this protocol by providing to_dict() and from_dict() methods. The @runtime_checkable decorator allows isinstance() checks at runtime.

Methods:

Name Description
to_dict

Convert object to dictionary representation

from_dict

Create object from dictionary representation

Example
class MyClass:
    def __init__(self, value: str):
        self.value = value

    def to_dict(self) -> dict:
        return {"value": self.value}

    @classmethod
    def from_dict(cls, data: dict) -> "MyClass":
        return cls(data["value"])

obj = MyClass("test")
isinstance(obj, Serializable)
# True
Functions
to_dict
to_dict() -> Dict[str, Any]

Convert object to dictionary representation.

Returns:

Type Description
Dict[str, Any]

Dictionary with serialized data

Raises:

Type Description
SerializationError

If serialization fails

Source code in packages/common/src/dataknobs_common/serialization.py
def to_dict(self) -> Dict[str, Any]:
    """Convert object to dictionary representation.

    Returns:
        Dictionary with serialized data

    Raises:
        SerializationError: If serialization fails
    """
    ...
from_dict classmethod
from_dict(data: Dict[str, Any]) -> T

Create object from dictionary representation.

Parameters:

Name Type Description Default
data Dict[str, Any]

Dictionary with serialized data

required

Returns:

Type Description
T

Deserialized object instance

Raises:

Type Description
SerializationError

If deserialization fails

Source code in packages/common/src/dataknobs_common/serialization.py
@classmethod
def from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
    """Create object from dictionary representation.

    Args:
        data: Dictionary with serialized data

    Returns:
        Deserialized object instance

    Raises:
        SerializationError: If deserialization fails
    """
    ...
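A round trip through the protocol looks like the following. `Point` is a hypothetical class, and the protocol shape is reproduced inline so the sketch stands alone:

```python
from typing import Any, Dict, Protocol, Type, TypeVar, runtime_checkable

T = TypeVar("T")

@runtime_checkable
class Serializable(Protocol):
    """Inline copy of the protocol shape, for illustration only."""
    def to_dict(self) -> Dict[str, Any]: ...
    @classmethod
    def from_dict(cls: Type[T], data: Dict[str, Any]) -> T: ...

class Point:
    def __init__(self, x: int, y: int) -> None:
        self.x, self.y = x, y

    def to_dict(self) -> Dict[str, Any]:
        return {"x": self.x, "y": self.y}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Point":
        return cls(data["x"], data["y"])

p = Point(1, 2)
assert isinstance(p, Serializable)       # structural check, no inheritance needed
restored = Point.from_dict(p.to_dict())  # round trip
assert (restored.x, restored.y) == (1, 2)
```

Note that `@runtime_checkable` makes `isinstance()` check only that the methods exist; it does not verify their signatures or return types.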

Functions

safe_eval

safe_eval(
    code: str,
    scope: dict[str, Any] | None = None,
    *,
    coerce_bool: bool = False,
    restrict_builtins: bool = True,
    default: Any = None,
) -> ExpressionResult

Evaluate a Python expression string safely.

Wraps the expression in a function body, executes with restricted globals, and returns the native result. This is the shared core used by wizard conditions and derivation expressions.

Security model (when restrict_builtins=True):

  1. __builtins__ restricted to SAFE_BUILTINS (blocks exec, eval, __import__, open, etc.)
  2. AST validation blocks dunder attribute access (prevents MRO traversal via __class__.__bases__.__subclasses__)

Parameters:

Name Type Description Default
code str

Python expression string. If it does not start with return, one is prepended automatically.

required
scope dict[str, Any] | None

Variables available in the expression. Merged on top of SAFE_BUILTINS and YAML_ALIASES. Callers provide context-specific variables here (e.g., data, value, has(), bank). Scope variables with the same name as YAML aliases override the alias.

None
coerce_bool bool

If True, coerce the result to bool (for condition evaluation). If False, return native type.

False
restrict_builtins bool

If True (default), set __builtins__ to SAFE_BUILTINS and validate AST for unsafe access, blocking exec, eval, __import__, open, and MRO traversal. If False, use Python's default builtins and skip AST validation (for trusted code only).

True
default Any

Value to return on evaluation failure. Defaults to None. For condition evaluation, callers typically pass default=False.

None

Returns:

Type Description
ExpressionResult

ExpressionResult with the evaluated value and success status.

Source code in packages/common/src/dataknobs_common/expressions.py
def safe_eval(
    code: str,
    scope: dict[str, Any] | None = None,
    *,
    coerce_bool: bool = False,
    restrict_builtins: bool = True,
    default: Any = None,
) -> ExpressionResult:
    """Evaluate a Python expression string safely.

    Wraps the expression in a function body, executes with restricted
    globals, and returns the native result.  This is the shared core
    used by wizard conditions and derivation expressions.

    Security model (when ``restrict_builtins=True``):

    1. ``__builtins__`` restricted to ``SAFE_BUILTINS`` (blocks
       ``exec``, ``eval``, ``__import__``, ``open``, etc.)
    2. AST validation blocks dunder attribute access (prevents
       MRO traversal via ``__class__.__bases__.__subclasses__``)

    Args:
        code: Python expression string.  If it does not start with
            ``return``, one is prepended automatically.
        scope: Variables available in the expression.  Merged on top
            of ``SAFE_BUILTINS`` and ``YAML_ALIASES``.  Callers
            provide context-specific variables here (e.g., ``data``,
            ``value``, ``has()``, ``bank``).  Scope variables with
            the same name as YAML aliases override the alias.
        coerce_bool: If True, coerce the result to ``bool`` (for
            condition evaluation).  If False, return native type.
        restrict_builtins: If True (default), set ``__builtins__``
            to ``SAFE_BUILTINS`` and validate AST for unsafe access,
            blocking ``exec``, ``eval``, ``__import__``, ``open``,
            and MRO traversal.  If False, use Python's default
            builtins and skip AST validation (for trusted code only).
        default: Value to return on evaluation failure.  Defaults
            to ``None``.  For condition evaluation, callers typically
            pass ``default=False``.

    Returns:
        ExpressionResult with the evaluated value and success status.
    """
    try:
        stripped = code.strip()
        if not stripped:
            return ExpressionResult(
                value=default,
                success=False,
                error="Empty expression",
            )

        # Reject multiline expressions — config-authored expressions
        # should be single-line.  Multiline strings could inject
        # module-scope code past the function wrapper.
        if "\n" in stripped:
            return ExpressionResult(
                value=default,
                success=False,
                error="Multiline expressions are not allowed",
            )

        if not stripped.startswith("return"):
            stripped = f"return {stripped}"

        if not restrict_builtins:
            logger.warning(
                "safe_eval called with restrict_builtins=False — "
                "full Python builtins available (trusted code only)"
            )

        # AST validation: block dunder access when builtins restricted
        if restrict_builtins:
            exec_code = f"def _fn():\n    {stripped}\n_result = _fn()"
            ast_error = _validate_ast(exec_code)
            if ast_error:
                return ExpressionResult(
                    value=default,
                    success=False,
                    error=ast_error,
                )

        global_vars: dict[str, Any] = {}
        if restrict_builtins:
            global_vars["__builtins__"] = SAFE_BUILTINS
        global_vars.update(YAML_ALIASES)
        if scope:
            global_vars.update(scope)

        local_vars: dict[str, Any] = {}
        exec_code = f"def _fn():\n    {stripped}\n_result = _fn()"
        exec(exec_code, global_vars, local_vars)  # nosec B102

        result = local_vars.get("_result", default)
        if coerce_bool:
            result = bool(result)

        return ExpressionResult(value=result, success=True)

    except Exception as e:
        return ExpressionResult(
            value=default,
            success=False,
            error=str(e),
        )

safe_eval_value

safe_eval_value(
    code: str, scope: dict[str, Any] | None = None, **kwargs: Any
) -> Any

Convenience wrapper returning just the value.

Same as safe_eval(...).value. Suitable for call sites that only need the result and handle errors via the default value.

Source code in packages/common/src/dataknobs_common/expressions.py
def safe_eval_value(
    code: str,
    scope: dict[str, Any] | None = None,
    **kwargs: Any,
) -> Any:
    """Convenience wrapper returning just the value.

    Same as ``safe_eval(...).value``.  Suitable for call sites that
    only need the result and handle errors via the default value.
    """
    return safe_eval(code, scope, **kwargs).value

create_event_bus

create_event_bus(config: dict[str, Any]) -> EventBus

Create an event bus from configuration.

Factory function that creates the appropriate EventBus implementation based on the 'backend' key in the config.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dict with 'backend' key and backend-specific options

required

Returns:

Type Description
EventBus

EventBus instance

Raises:

Type Description
ValueError

If backend type is not recognized

Example
# Memory backend (default)
bus = create_event_bus({"backend": "memory"})

# Postgres backend
bus = create_event_bus({
    "backend": "postgres",
    "connection_string": "postgresql://user:pass@host/db"
})

# Redis backend
bus = create_event_bus({
    "backend": "redis",
    "host": "localhost",
    "port": 6379
})
Source code in packages/common/src/dataknobs_common/events/bus.py
def create_event_bus(config: dict[str, Any]) -> EventBus:
    """Create an event bus from configuration.

    Factory function that creates the appropriate EventBus implementation
    based on the 'backend' key in the config.

    Args:
        config: Configuration dict with 'backend' key and backend-specific options

    Returns:
        EventBus instance

    Raises:
        ValueError: If backend type is not recognized

    Example:
        ```python
        # Memory backend (default)
        bus = create_event_bus({"backend": "memory"})

        # Postgres backend
        bus = create_event_bus({
            "backend": "postgres",
            "connection_string": "postgresql://user:pass@host/db"
        })

        # Redis backend
        bus = create_event_bus({
            "backend": "redis",
            "host": "localhost",
            "port": 6379
        })
        ```
    """
    # Import here to avoid circular imports
    from .memory import InMemoryEventBus

    backend = config.get("backend", "memory")

    if backend == "memory":
        return InMemoryEventBus()
    elif backend == "postgres":
        from .postgres import PostgresEventBus

        return PostgresEventBus(
            connection_string=config.get("connection_string", ""),
            channel_prefix=config.get("channel_prefix", "events"),
        )
    elif backend == "redis":
        from .redis import RedisEventBus

        return RedisEventBus(
            host=config.get("host", "localhost"),
            port=config.get("port", 6379),
            password=config.get("password"),
            ssl=config.get("ssl", False),
            channel_prefix=config.get("channel_prefix", "events"),
        )
    else:
        raise ValueError(
            f"Unknown event bus backend: {backend}. "
            f"Available backends: memory, postgres, redis"
        )
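Conceptually, the memory backend pairs this config-dispatch factory with a topic-to-handlers map. The sketch below is a stdlib-only stand-in (the real `Event`/`EventType` classes and handler signature are not reproduced here; plain dicts stand in for events):

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Handler = Callable[[Dict[str, Any]], None]

class MiniMemoryBus:
    """Illustrative in-memory pub/sub, not the package's InMemoryEventBus."""

    def __init__(self) -> None:
        self._subs: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subs[topic].append(handler)

    def publish(self, topic: str, event: Dict[str, Any]) -> None:
        for handler in self._subs[topic]:
            handler(event)

def make_bus(config: Dict[str, Any]) -> MiniMemoryBus:
    """Mirror of the backend-dispatch shape, memory backend only."""
    backend = config.get("backend", "memory")
    if backend != "memory":
        raise ValueError(f"Unknown event bus backend: {backend}")
    return MiniMemoryBus()

received: List[Dict[str, Any]] = []
bus = make_bus({"backend": "memory"})
bus.subscribe("jobs", received.append)
bus.publish("jobs", {"type": "job.created", "id": 7})
assert received == [{"type": "job.created", "id": 7}]
```

The factory pattern keeps callers backend-agnostic: swapping `"memory"` for `"postgres"` or `"redis"` in config is the only change needed at the call site.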

create_rate_limiter

create_rate_limiter(config: dict[str, Any]) -> RateLimiter

Create a rate limiter from configuration.

Factory function that creates the appropriate RateLimiter implementation based on the backend key in the config.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dict. See below for keys.

required

Returns:

Type Description
RateLimiter

RateLimiter instance.

Raises:

Type Description
ValueError

If the backend is not recognized or required config is missing.

Config keys

backend: "memory" (default) or "pyrate".
rates: Shorthand for default_rates.
default_rates: List of {"limit": int, "interval": float} dicts.
categories: Dict mapping category names to {"rates": [{"limit": ..., "interval": ...}]}.
bucket: Pyrate bucket backend ("memory", "sqlite", "redis", "postgres").
redis: Redis connection config (for pyrate redis bucket).
postgres: Postgres config (for pyrate postgres bucket).
sqlite: SQLite config (for pyrate sqlite bucket).

Example
# Simple in-memory limiter
limiter = create_rate_limiter({
    "rates": [{"limit": 10, "interval": 60}],
})

# Per-category with pyrate + Redis
limiter = create_rate_limiter({
    "backend": "pyrate",
    "bucket": "redis",
    "default_rates": [{"limit": 100, "interval": 60}],
    "categories": {
        "api_write": {"rates": [{"limit": 10, "interval": 60}]},
    },
    "redis": {"url": "redis://localhost:6379"},
})
Source code in packages/common/src/dataknobs_common/ratelimit/limiter.py
def create_rate_limiter(config: dict[str, Any]) -> RateLimiter:
    """Create a rate limiter from configuration.

    Factory function that creates the appropriate RateLimiter implementation
    based on the ``backend`` key in the config.

    Args:
        config: Configuration dict. See below for keys.

    Returns:
        RateLimiter instance.

    Raises:
        ValueError: If the backend is not recognized or required config
            is missing.

    Config keys:
        backend: ``"memory"`` (default) or ``"pyrate"``.
        rates: Shorthand for ``default_rates``.
        default_rates: List of ``{"limit": int, "interval": float}`` dicts.
        categories: Dict mapping category names to
            ``{"rates": [{"limit": ..., "interval": ...}]}``.
        bucket: Pyrate bucket backend (``"memory"``, ``"sqlite"``,
            ``"redis"``, ``"postgres"``).
        redis: Redis connection config (for pyrate redis bucket).
        postgres: Postgres config (for pyrate postgres bucket).
        sqlite: SQLite config (for pyrate sqlite bucket).

    Example:
        ```python
        # Simple in-memory limiter
        limiter = create_rate_limiter({
            "rates": [{"limit": 10, "interval": 60}],
        })

        # Per-category with pyrate + Redis
        limiter = create_rate_limiter({
            "backend": "pyrate",
            "bucket": "redis",
            "default_rates": [{"limit": 100, "interval": 60}],
            "categories": {
                "api_write": {"rates": [{"limit": 10, "interval": 60}]},
            },
            "redis": {"url": "redis://localhost:6379"},
        })
        ```
    """
    from .memory import InMemoryRateLimiter

    backend = config.get("backend", "memory")
    parsed = _parse_config(config)

    if backend == "memory":
        return InMemoryRateLimiter(parsed)
    elif backend == "pyrate":
        from .pyrate import PyrateRateLimiter

        return PyrateRateLimiter(parsed, config)
    else:
        raise ValueError(
            f"Unknown rate limiter backend: {backend}. "
            f"Available backends: memory, pyrate"
        )

deserialize

deserialize(cls: Type[T], data: Dict[str, Any]) -> T

Deserialize dictionary into an object.

Convenience function that calls from_dict() with error handling.

Parameters:

Name Type Description Default
cls Type[T]

Class to deserialize into (must have from_dict classmethod)

required
data Dict[str, Any]

Dictionary with serialized data

required

Returns:

Type Description
T

Deserialized object instance

Raises:

Type Description
SerializationError

If class doesn't support deserialization or deserialization fails

Example
class Point:
    def __init__(self, x: int, y: int):
        self.x, self.y = x, y
    @classmethod
    def from_dict(cls, data: dict):
        return cls(data["x"], data["y"])

data = {"x": 10, "y": 20}
point = deserialize(Point, data)
# point.x, point.y
# (10, 20)
Source code in packages/common/src/dataknobs_common/serialization.py
def deserialize(cls: Type[T], data: Dict[str, Any]) -> T:
    """Deserialize dictionary into an object.

    Convenience function that calls from_dict() with error handling.

    Args:
        cls: Class to deserialize into (must have from_dict classmethod)
        data: Dictionary with serialized data

    Returns:
        Deserialized object instance

    Raises:
        SerializationError: If class doesn't support deserialization or deserialization fails

    Example:
        ```python
        class Point:
            def __init__(self, x: int, y: int):
                self.x, self.y = x, y
            @classmethod
            def from_dict(cls, data: dict):
                return cls(data["x"], data["y"])

        data = {"x": 10, "y": 20}
        point = deserialize(Point, data)
        # point.x, point.y
        # (10, 20)
        ```
    """
    if not hasattr(cls, "from_dict"):
        raise SerializationError(
            f"Class {cls.__name__} is not deserializable (missing from_dict classmethod)",
            context={"class": cls.__name__},
        )

    if not isinstance(data, dict):
        raise SerializationError(
            f"Data must be a dict, got {type(data).__name__}",
            context={"class": cls.__name__, "data_type": type(data).__name__},
        )

    try:
        return cls.from_dict(data)
    except Exception as e:
        if isinstance(e, SerializationError):
            raise
        raise SerializationError(
            f"Failed to deserialize {cls.__name__}: {e}",
            context={"class": cls.__name__, "error": str(e), "data": data},
        ) from e

deserialize_list

deserialize_list(cls: Type[T], data_list: list[Dict[str, Any]]) -> list[T]

Deserialize a list of dictionaries into objects.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `cls` | `Type[T]` | Class to deserialize into | *required* |
| `data_list` | `list[Dict[str, Any]]` | List of serialized dictionaries | *required* |

Returns:

| Type | Description |
|------|-------------|
| `list[T]` | List of deserialized objects |

Raises:

| Type | Description |
|------|-------------|
| `SerializationError` | If any item cannot be deserialized |

Example
data_list = [{"x": 1, "y": 2}, {"x": 3, "y": 4}]
points = deserialize_list(Point, data_list)
len(points)
# 2
Source code in packages/common/src/dataknobs_common/serialization.py
def deserialize_list(cls: Type[T], data_list: list[Dict[str, Any]]) -> list[T]:
    """Deserialize a list of dictionaries into objects.

    Args:
        cls: Class to deserialize into
        data_list: List of serialized dictionaries

    Returns:
        List of deserialized objects

    Raises:
        SerializationError: If any item cannot be deserialized

    Example:
        ```python
        data_list = [{"x": 1, "y": 2}, {"x": 3, "y": 4}]
        points = deserialize_list(Point, data_list)
        len(points)
        # 2
        ```
    """
    return [deserialize(cls, data) for data in data_list]

is_deserializable

is_deserializable(cls: Type) -> bool

Check if a class is deserializable.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `cls` | `Type` | Class to check | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if class has `from_dict` classmethod |

Example
class Point:
    @classmethod
    def from_dict(cls, data): return cls()

is_deserializable(Point)
# True
is_deserializable(str)
# False
Source code in packages/common/src/dataknobs_common/serialization.py
def is_deserializable(cls: Type) -> bool:
    """Check if a class is deserializable.

    Args:
        cls: Class to check

    Returns:
        True if class has from_dict classmethod

    Example:
        ```python
        class Point:
            @classmethod
            def from_dict(cls, data): return cls()

        is_deserializable(Point)
        # True
        is_deserializable(str)
        # False
        ```
    """
    return hasattr(cls, "from_dict")

is_serializable

is_serializable(obj: Any) -> bool

Check if an object is serializable.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `obj` | `Any` | Object to check | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if object has `to_dict` method |

Example
class Point:
    def to_dict(self): return {}

is_serializable(Point())
# True
is_serializable("string")
# False
Source code in packages/common/src/dataknobs_common/serialization.py
def is_serializable(obj: Any) -> bool:
    """Check if an object is serializable.

    Args:
        obj: Object to check

    Returns:
        True if object has to_dict method

    Example:
        ```python
        class Point:
            def to_dict(self): return {}

        is_serializable(Point())
        # True
        is_serializable("string")
        # False
        ```
    """
    return isinstance(obj, Serializable) or hasattr(obj, "to_dict")

serialize

serialize(obj: Any) -> Dict[str, Any]

Serialize an object to dictionary.

Convenience function that calls to_dict() with error handling.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `obj` | `Any` | Object to serialize (must have `to_dict` method) | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Dict[str, Any]` | Serialized dictionary |

Raises:

| Type | Description |
|------|-------------|
| `SerializationError` | If object doesn't support serialization or serialization fails |

Example
class Point:
    def __init__(self, x: int, y: int):
        self.x, self.y = x, y
    def to_dict(self):
        return {"x": self.x, "y": self.y}

point = Point(10, 20)
data = serialize(point)
# {'x': 10, 'y': 20}
Source code in packages/common/src/dataknobs_common/serialization.py
def serialize(obj: Any) -> Dict[str, Any]:
    """Serialize an object to dictionary.

    Convenience function that calls to_dict() with error handling.

    Args:
        obj: Object to serialize (must have to_dict method)

    Returns:
        Serialized dictionary

    Raises:
        SerializationError: If object doesn't support serialization or serialization fails

    Example:
        ```python
        class Point:
            def __init__(self, x: int, y: int):
                self.x, self.y = x, y
            def to_dict(self):
                return {"x": self.x, "y": self.y}

        point = Point(10, 20)
        data = serialize(point)
        # {'x': 10, 'y': 20}
        ```
    """
    if not hasattr(obj, "to_dict"):
        raise SerializationError(
            f"Object of type {type(obj).__name__} is not serializable (missing to_dict method)",
            context={"type": type(obj).__name__, "object": str(obj)},
        )

    try:
        result = obj.to_dict()
        if not isinstance(result, dict):
            raise SerializationError(
                f"to_dict() must return a dict, got {type(result).__name__}",
                context={"type": type(obj).__name__, "result_type": type(result).__name__},
            )
        return result
    except Exception as e:
        if isinstance(e, SerializationError):
            raise
        raise SerializationError(
            f"Failed to serialize {type(obj).__name__}: {e}",
            context={"type": type(obj).__name__, "error": str(e)},
        ) from e

serialize_list

serialize_list(items: list[Any]) -> list[Dict[str, Any]]

Serialize a list of objects to list of dictionaries.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `items` | `list[Any]` | List of serializable objects | *required* |

Returns:

| Type | Description |
|------|-------------|
| `list[Dict[str, Any]]` | List of serialized dictionaries |

Raises:

| Type | Description |
|------|-------------|
| `SerializationError` | If any item cannot be serialized |

Example
items = [Point(1, 2), Point(3, 4)]
data_list = serialize_list(items)
len(data_list)
# 2
Source code in packages/common/src/dataknobs_common/serialization.py
def serialize_list(items: list[Any]) -> list[Dict[str, Any]]:
    """Serialize a list of objects to list of dictionaries.

    Args:
        items: List of serializable objects

    Returns:
        List of serialized dictionaries

    Raises:
        SerializationError: If any item cannot be serialized

    Example:
        ```python
        items = [Point(1, 2), Point(3, 4)]
        data_list = serialize_list(items)
        len(data_list)
        # 2
        ```
    """
    return [serialize(item) for item in items]
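The `to_dict`/`from_dict` protocol that all of these helpers build on can be exercised standalone; a minimal sketch, where the `Point` class is illustrative and not part of the package:

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Point:
    x: int
    y: int

    def to_dict(self) -> Dict[str, Any]:
        # What serialize(point) would return for this object
        return {"x": self.x, "y": self.y}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Point":
        # What deserialize(Point, data) calls under the hood
        return cls(data["x"], data["y"])


# Round trip: serialize_list/deserialize_list simply map these per item,
# adding the error handling shown in the source above.
originals = [Point(1, 2), Point(3, 4)]
payload = [p.to_dict() for p in originals]
restored = [Point.from_dict(d) for d in payload]
assert restored == originals
```

Because `payload` contains only plain dicts, it can be passed directly to `json.dumps` for persistence.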

create_test_json_files

create_test_json_files(tmp_path: Path) -> list[str]

Create test JSON files.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `tmp_path` | `Path` | Temporary directory path (from pytest fixture) | *required* |

Returns:

| Type | Description |
|------|-------------|
| `list[str]` | List of created file paths as strings |

Source code in packages/common/src/dataknobs_common/testing.py
def create_test_json_files(tmp_path: Path) -> list[str]:
    """Create test JSON files.

    Args:
        tmp_path: Temporary directory path (from pytest fixture)

    Returns:
        List of created file paths as strings
    """
    import json

    files = []

    # Create test JSON file 1
    json1 = tmp_path / "test_data1.json"
    json1.write_text(
        json.dumps(
            {
                "title": "Test Data 1",
                "items": [
                    {"id": 1, "name": "Item 1", "value": 100},
                    {"id": 2, "name": "Item 2", "value": 200},
                ],
                "metadata": {"version": "1.0", "created": "2024-01-01"},
            },
            indent=2,
        )
    )
    files.append(str(json1))

    # Create test JSON file 2
    json2 = tmp_path / "test_data2.json"
    json2.write_text(
        json.dumps(
            {
                "title": "Test Data 2",
                "items": [
                    {"id": 3, "name": "Item 3", "value": 300},
                    {"id": 4, "name": "Item 4", "value": 400},
                ],
                "metadata": {"version": "1.0", "created": "2024-01-02"},
            },
            indent=2,
        )
    )
    files.append(str(json2))

    return files
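Outside pytest, the same fixture shape can be produced with `tempfile`; a standalone sketch mirroring the structure the helper writes:

```python
import json
import tempfile
from pathlib import Path

# Stand-in for pytest's tmp_path fixture
tmp_path = Path(tempfile.mkdtemp())

fixture = tmp_path / "test_data1.json"
fixture.write_text(
    json.dumps(
        {
            "title": "Test Data 1",
            "items": [{"id": 1, "name": "Item 1", "value": 100}],
            "metadata": {"version": "1.0", "created": "2024-01-01"},
        },
        indent=2,
    )
)

# Read the fixture back, as an ingestion test would
loaded = json.loads(fixture.read_text())
assert loaded["items"][0]["value"] == 100
```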

create_test_markdown_files

create_test_markdown_files(tmp_path: Path) -> list[str]

Create test markdown files for ingestion.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `tmp_path` | `Path` | Temporary directory path (from pytest fixture) | *required* |

Returns:

| Type | Description |
|------|-------------|
| `list[str]` | List of created file paths as strings |

Example
def test_ingestion(tmp_path):
    files = create_test_markdown_files(tmp_path)
    # files contains paths to test markdown documents
Source code in packages/common/src/dataknobs_common/testing.py
def create_test_markdown_files(tmp_path: Path) -> list[str]:
    """Create test markdown files for ingestion.

    Args:
        tmp_path: Temporary directory path (from pytest fixture)

    Returns:
        List of created file paths as strings

    Example:
        ```python
        def test_ingestion(tmp_path):
            files = create_test_markdown_files(tmp_path)
            # files contains paths to test markdown documents
        ```
    """
    files = []

    # Create test markdown file 1
    md1 = tmp_path / "test_doc1.md"
    md1.write_text(
        """# Test Document 1

## Introduction

This is a test document for validating ingestion and retrieval.

### Key Points

1. First important point
2. Second important point
3. Third important point

## Details

More detailed information about the topic goes here.
"""
    )
    files.append(str(md1))

    # Create test markdown file 2
    md2 = tmp_path / "test_doc2.md"
    md2.write_text(
        """# Test Document 2

## Overview

Another test document with different content.

## Content

- Item A: Description of item A
- Item B: Description of item B
- Item C: Description of item C

## Summary

This concludes the second test document.
"""
    )
    files.append(str(md2))

    return files

get_test_bot_config

get_test_bot_config(
    use_echo_llm: bool = True,
    use_in_memory_storage: bool = True,
    include_memory: bool = False,
    system_prompt: str | None = None,
) -> dict[str, Any]

Get a test bot configuration.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `use_echo_llm` | `bool` | Use echo LLM instead of real LLM | `True` |
| `use_in_memory_storage` | `bool` | Use in-memory conversation storage | `True` |
| `include_memory` | `bool` | Include buffer memory configuration | `False` |
| `system_prompt` | `str \| None` | Optional system prompt content | `None` |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, Any]` | Bot configuration dictionary suitable for `DynaBot.from_config()` |

Example
config = get_test_bot_config(
    use_echo_llm=True,
    system_prompt="You are a test assistant."
)
bot = await DynaBot.from_config(config)
Source code in packages/common/src/dataknobs_common/testing.py
def get_test_bot_config(
    use_echo_llm: bool = True,
    use_in_memory_storage: bool = True,
    include_memory: bool = False,
    system_prompt: str | None = None,
) -> dict[str, Any]:
    """Get a test bot configuration.

    Args:
        use_echo_llm: Use echo LLM instead of real LLM (default: True)
        use_in_memory_storage: Use in-memory conversation storage (default: True)
        include_memory: Include buffer memory configuration (default: False)
        system_prompt: Optional system prompt content

    Returns:
        Bot configuration dictionary suitable for DynaBot.from_config()

    Example:
        ```python
        config = get_test_bot_config(
            use_echo_llm=True,
            system_prompt="You are a test assistant."
        )
        bot = await DynaBot.from_config(config)
        ```
    """
    config: dict[str, Any] = {
        "llm": {
            "provider": "echo" if use_echo_llm else "openai",
            "model": "test" if use_echo_llm else "gpt-4o-mini",
            "temperature": 0.7,
        },
        "conversation_storage": {
            "backend": "memory" if use_in_memory_storage else "file",
        },
    }

    if include_memory:
        config["memory"] = {
            "type": "buffer",
            "max_messages": 10,
        }

    if system_prompt:
        config["system_prompt"] = system_prompt

    return config
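Reading off the source above, with all defaults (`use_echo_llm=True`, `use_in_memory_storage=True`, no memory or system prompt) the returned configuration is:

```python
# Shape of get_test_bot_config() under default arguments
config = {
    "llm": {
        "provider": "echo",   # deterministic echo LLM, no API calls
        "model": "test",
        "temperature": 0.7,
    },
    "conversation_storage": {
        "backend": "memory",  # nothing persisted to disk
    },
}
```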

get_test_rag_config

get_test_rag_config(
    use_in_memory_store: bool = True,
    embedding_provider: str = "ollama",
    embedding_model: str = "nomic-embed-text",
) -> dict[str, Any]

Get a test RAG/knowledge base configuration.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `use_in_memory_store` | `bool` | Use in-memory vector store | `True` |
| `embedding_provider` | `str` | Embedding provider | `'ollama'` |
| `embedding_model` | `str` | Embedding model name | `'nomic-embed-text'` |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, Any]` | Knowledge base configuration dictionary |

Example
config = get_test_rag_config(use_in_memory_store=True)
bot_config = get_test_bot_config()
bot_config["knowledge_base"] = config
Source code in packages/common/src/dataknobs_common/testing.py
def get_test_rag_config(
    use_in_memory_store: bool = True,
    embedding_provider: str = "ollama",
    embedding_model: str = "nomic-embed-text",
) -> dict[str, Any]:
    """Get a test RAG/knowledge base configuration.

    Args:
        use_in_memory_store: Use in-memory vector store (default: True)
        embedding_provider: Embedding provider (default: "ollama")
        embedding_model: Embedding model name (default: "nomic-embed-text")

    Returns:
        Knowledge base configuration dictionary

    Example:
        ```python
        config = get_test_rag_config(use_in_memory_store=True)
        bot_config = get_test_bot_config()
        bot_config["knowledge_base"] = config
        ```
    """
    return {
        "type": "rag",
        "vector_store": {
            "backend": "memory" if use_in_memory_store else "faiss",
            "dimensions": 768,
            "metric": "cosine",
        },
        "embedding_provider": embedding_provider,
        "embedding_model": embedding_model,
        "chunking": {
            "max_chunk_size": 800,
        },
        "retrieval": {
            "top_k": 5,
            "score_threshold": 0.7,
        },
    }

is_chromadb_available

is_chromadb_available() -> bool

Check if ChromaDB is available.

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if ChromaDB can be imported, False otherwise |

Source code in packages/common/src/dataknobs_common/testing.py
def is_chromadb_available() -> bool:
    """Check if ChromaDB is available.

    Returns:
        True if ChromaDB can be imported, False otherwise
    """
    return importlib.util.find_spec("chromadb") is not None

is_faiss_available

is_faiss_available() -> bool

Check if FAISS is available.

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if FAISS can be imported, False otherwise |

Source code in packages/common/src/dataknobs_common/testing.py
def is_faiss_available() -> bool:
    """Check if FAISS is available.

    Returns:
        True if FAISS can be imported, False otherwise
    """
    return importlib.util.find_spec("faiss") is not None

is_ollama_available

is_ollama_available() -> bool

Check if Ollama service is available.

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if Ollama is running, False otherwise |

Source code in packages/common/src/dataknobs_common/testing.py
def is_ollama_available() -> bool:
    """Check if Ollama service is available.

    Returns:
        True if Ollama is running, False otherwise
    """
    try:
        result = subprocess.run(
            ["ollama", "list"],
            capture_output=True,
            text=True,
            timeout=5,
            check=False,
        )
        return result.returncode == 0
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return False
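A cheaper variant of this probe just checks that the executable exists on `PATH` via `shutil.which`; a standalone sketch (note it only confirms the binary is installed, not that the service responds, which is why the source above runs `ollama list` instead):

```python
import shutil


def cli_on_path(command: str) -> bool:
    """Return True if `command` resolves to an executable on PATH."""
    return shutil.which(command) is not None


# A nonsense command name should never resolve
assert cli_on_path("definitely-not-a-real-command-xyz") is False
```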

is_ollama_model_available

is_ollama_model_available(model_name: str = 'nomic-embed-text') -> bool

Check if a specific Ollama model is available.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `model_name` | `str` | Name of the model to check | `'nomic-embed-text'` |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if model is available, False otherwise |

Source code in packages/common/src/dataknobs_common/testing.py
def is_ollama_model_available(model_name: str = "nomic-embed-text") -> bool:
    """Check if a specific Ollama model is available.

    Args:
        model_name: Name of the model to check (default: nomic-embed-text)

    Returns:
        True if model is available, False otherwise
    """
    if not is_ollama_available():
        return False

    try:
        result = subprocess.run(
            ["ollama", "list"],
            capture_output=True,
            text=True,
            timeout=5,
            check=False,
        )
        return model_name in result.stdout
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return False

is_package_available

is_package_available(package_name: str) -> bool

Check if a Python package is available.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `package_name` | `str` | Name of the package to check | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if package can be imported, False otherwise |

Source code in packages/common/src/dataknobs_common/testing.py
def is_package_available(package_name: str) -> bool:
    """Check if a Python package is available.

    Args:
        package_name: Name of the package to check

    Returns:
        True if package can be imported, False otherwise
    """
    return importlib.util.find_spec(package_name) is not None
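The `find_spec` probe used here detects any optional dependency without actually importing it (so heavy packages incur no import cost during collection); a standalone sketch:

```python
import importlib.util


def probe(package_name: str) -> bool:
    # Mirrors is_package_available: resolve the module spec without importing
    return importlib.util.find_spec(package_name) is not None


assert probe("json") is True              # stdlib module, always present
assert probe("no_such_package_xyz") is False
```

`is_chromadb_available` and `is_faiss_available` above are this same probe specialized to `"chromadb"` and `"faiss"`.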

is_redis_available

is_redis_available(host: str | None = None, port: int | None = None) -> bool

Check if Redis service is available.

Reads REDIS_HOST and REDIS_PORT environment variables when explicit arguments are not provided, falling back to localhost:6379. This ensures the check works both on the host and inside Docker where Redis runs on a network hostname.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `host` | `str \| None` | Redis host (default: `$REDIS_HOST` or `localhost`) | `None` |
| `port` | `int \| None` | Redis port (default: `$REDIS_PORT` or `6379`) | `None` |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if Redis is available, False otherwise |

Source code in packages/common/src/dataknobs_common/testing.py
def is_redis_available(host: str | None = None, port: int | None = None) -> bool:
    """Check if Redis service is available.

    Reads ``REDIS_HOST`` and ``REDIS_PORT`` environment variables when
    explicit arguments are not provided, falling back to ``localhost:6379``.
    This ensures the check works both on the host and inside Docker
    where Redis runs on a network hostname.

    Args:
        host: Redis host (default: ``$REDIS_HOST`` or ``localhost``)
        port: Redis port (default: ``$REDIS_PORT`` or ``6379``)

    Returns:
        True if Redis is available, False otherwise
    """
    import os

    host = host if host is not None else os.environ.get("REDIS_HOST", "localhost")
    port = port if port is not None else int(os.environ.get("REDIS_PORT", "6379"))
    try:
        import socket

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except OSError:
        return False
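The `connect_ex` probe shown above generalizes to any TCP service; a standalone sketch (host and port are illustrative):

```python
import socket


def tcp_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns 0 on success, an errno otherwise
            return sock.connect_ex((host, port)) == 0
    except OSError:
        return False


reachable = tcp_port_open("localhost", 6379)
```

`is_redis_available()` is this probe with the `$REDIS_HOST`/`$REDIS_PORT` defaults applied; it confirms only that something is listening on the port, not that it speaks the Redis protocol.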