dataknobs-data Complete API Reference

Complete auto-generated API documentation from source code docstrings.

💡 Also see:

- Curated Guide - Hand-crafted tutorials and examples
- Package Overview - Introduction and getting started
- Source Code - View on GitHub


dataknobs_data

DataKnobs Data Package - Unified data abstraction layer.

The dataknobs-data package provides a unified interface for working with various database backends, including SQLite, PostgreSQL, Elasticsearch, and S3. It offers structured data storage, querying, validation, migration, and vector search capabilities.

Modules:

| Name | Description |
|------|-------------|
| `database` | Core Database classes (`SyncDatabase`, `AsyncDatabase`) providing the main API |
| `records` | `Record` class for structured data with fields and metadata |
| `fields` | Field types and definitions for data validation |
| `schema` | Database schema definitions and field schemas |
| `query` | Query building with filters, operators, and sorting |
| `query_logic` | Complex boolean logic queries with AND/OR/NOT operators |
| `factory` | Database factory functions for creating database instances |
| `streaming` | Streaming operations for large-scale data processing |
| `validation` | Data validation with schemas and constraints |
| `migration` | Data migration tools for moving between backends |
| `exceptions` | Custom exceptions for error handling |

Quick Examples:

Create and query a database:

```python
from dataknobs_data import database_factory, Record, Query, Operator, Filter

# Create an in-memory database
db = database_factory("memory")

# Add records
db.add(Record({"name": "Alice", "age": 30}))
db.add(Record({"name": "Bob", "age": 25}))

# Query with filters
query = Query(filters=[Filter("age", Operator.GT, 25)])
results = db.search(query)
print(results)  # [Record with Alice's data]
```

Use schemas for validation:

```python
from dataknobs_data import database_factory, Record, FieldType
from dataknobs_data.schema import DatabaseSchema

# Define schema
schema = DatabaseSchema.create(
    name=FieldType.STRING,
    age=FieldType.INTEGER,
    email=FieldType.STRING
)

# Create database with schema
db = database_factory("memory", config={"schema": schema})
db.add(Record({"name": "Alice", "age": 30, "email": "alice@example.com"}))
```

Stream large datasets:

```python
from dataknobs_data import database_factory, StreamConfig

db = database_factory("sqlite", config={"path": "large_data.db"})

# Stream records in batches
config = StreamConfig(batch_size=100)
for batch in db.stream(config=config):
    process_batch(batch.records)
```

Design Philosophy:

1. **Backend Agnostic** - Write once, deploy anywhere with multiple backend support
2. **Type Safe** - Strong typing with schema validation and field type checking
3. **Async Ready** - Full async/await support for high-performance applications
4. **Composable** - Mix and match features like validation, migration, and vector search

Installation:

```bash
pip install dataknobs-data
```

For detailed documentation, see the individual module docstrings and the online documentation at https://docs.kbs-labs.com/dataknobs

Classes:

| Name | Description |
|------|-------------|
| `AsyncDatabase` | Abstract base class for async database implementations. |
| `SyncDatabase` | Synchronous variant of the Database abstract base class. |
| `BackendNotFoundError` | Raised when a requested backend is not available. |
| `ConcurrencyError` | Raised when a concurrency conflict occurs. |
| `ConfigurationError` | Raised when configuration is invalid. |
| `DatabaseConnectionError` | Raised when database connection fails. |
| `DatabaseOperationError` | Raised when a database operation fails. |
| `FieldTypeError` | Raised when a field type operation fails. |
| `MigrationError` | Raised when data migration fails. |
| `QueryError` | Raised when query execution fails. |
| `RecordNotFoundError` | Raised when a requested record is not found. |
| `RecordValidationError` | Raised when record validation fails. |
| `SerializationError` | Raised when serialization/deserialization fails. |
| `TransactionError` | Raised when a transaction fails. |
| `AsyncDatabaseFactory` | Factory for creating async database backends. |
| `DatabaseFactory` | Factory for creating database backends dynamically. |
| `Field` | Represents a single field in a record. |
| `FieldType` | Enumeration of supported field types. |
| `VectorField` | Represents a vector field with embeddings and metadata. |
| `Filter` | Represents a filter condition. |
| `Operator` | Query operators for filtering. |
| `Query` | Represents a database query with filters, sorting, pagination, and vector search. |
| `SortOrder` | Sort order for query results. |
| `SortSpec` | Represents a sort specification. |
| `ComplexQuery` | A query with complex boolean logic support. |
| `Condition` | Abstract base class for query conditions. |
| `FilterCondition` | A single filter condition. |
| `LogicCondition` | A logical combination of conditions. |
| `LogicOperator` | Logical operators for combining conditions. |
| `QueryBuilder` | Builder for complex queries with boolean logic. |
| `Record` | Represents a structured data record with fields and metadata. |
| `DedupChecker` | Checks content uniqueness via hash matching and optional semantic similarity. |
| `DedupConfig` | Configuration for deduplication checking. |
| `DedupResult` | Result of a deduplication check. |
| `SimilarItem` | A record that is semantically similar to the candidate. |
| `StreamConfig` | Configuration for streaming operations. |
| `StreamProcessor` | Base class for stream processing utilities. |
| `StreamResult` | Result of a streaming operation. |

Classes

AsyncDatabase

AsyncDatabase(
    config: dict[str, Any] | None = None, schema: DatabaseSchema | None = None
)

Bases: ABC

Abstract base class for async database implementations.

Provides a unified async interface for CRUD operations, querying, and streaming across different backend databases. Supports schema validation, batch operations, and complex queries with boolean logic.

Example

```python
from dataknobs_data import async_database_factory, Record, Query, Filter, Operator

# Create async database
db = async_database_factory("memory")

# Use as async context manager
async with db:
    # Create records
    id1 = await db.create(Record({"name": "Alice", "age": 30}))
    id2 = await db.create(Record({"name": "Bob", "age": 25}))

    # Query records
    query = Query(filters=[Filter("age", Operator.GT, 25)])
    results = await db.search(query)
    print(results)  # [Alice's record]

    # Update record
    await db.update(id1, Record({"name": "Alice", "age": 31}))

    # Stream large datasets
    async for record in db.stream_read():
        process_record(record)
```

Initialize the database with optional configuration.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `config` | `dict[str, Any] \| None` | Backend-specific configuration parameters (may include a `schema` key) | `None` |
| `schema` | `DatabaseSchema \| None` | Optional database schema (overrides the config schema) | `None` |

Example

```python
from dataknobs_data import AsyncDatabase
from dataknobs_data.schema import DatabaseSchema
from dataknobs_data.fields import FieldType

# With schema
schema = DatabaseSchema.create(
    name=FieldType.STRING,
    age=FieldType.INTEGER
)
db = AsyncDatabase(config={"path": "data.db"}, schema=schema)
```

Methods:

| Name | Description |
|------|-------------|
| `set_schema` | Set the database schema. |
| `add_field_schema` | Add a field to the database schema. |
| `with_schema` | Set schema using field definitions. |
| `create` | Create a new record in the database. |
| `read` | Read a record by ID. |
| `update` | Update an existing record. |
| `delete` | Delete a record by ID. |
| `search` | Search for records matching a query. |
| `all` | Get all records from the database. |
| `exists` | Check if a record exists. |
| `upsert` | Update or insert a record. |
| `create_batch` | Create multiple records in batch. |
| `read_batch` | Read multiple records by ID. |
| `delete_batch` | Delete multiple records by ID. |
| `update_batch` | Update multiple records. |
| `count` | Count records matching a query. |
| `clear` | Clear all records from the database. |
| `connect` | Connect to the database. Override in subclasses if needed. |
| `close` | Close the database connection. Override in subclasses if needed. |
| `disconnect` | Disconnect from the database (alias for `close`). |
| `__aenter__` | Async context manager entry. |
| `__aexit__` | Async context manager exit. |
| `stream_read` | Stream records from the database. |
| `stream_write` | Stream records into the database. |
| `stream_transform` | Stream records through a transformation. |
| `from_backend` | Factory method to create and connect a database instance. |

Source code in packages/data/src/dataknobs_data/database.py
def __init__(self, config: dict[str, Any] | None = None, schema: DatabaseSchema | None = None):
    """Initialize the database with optional configuration.

    Args:
        config: Backend-specific configuration parameters (may include 'schema' key)
        schema: Optional database schema (overrides config schema)

    Example:
        ```python
        from dataknobs_data import AsyncDatabase
        from dataknobs_data.schema import DatabaseSchema
        from dataknobs_data.fields import FieldType

        # With schema
        schema = DatabaseSchema.create(
            name=FieldType.STRING,
            age=FieldType.INTEGER
        )
        db = AsyncDatabase(config={"path": "data.db"}, schema=schema)
        ```
    """
    config = config or {}

    # Extract schema from config if present and no explicit schema provided
    if schema is None and "schema" in config:
        schema = self._extract_schema_from_config(config["schema"])
        # Remove schema from config so backends don't see it
        config = {k: v for k, v in config.items() if k != "schema"}

    self.config = config
    self.schema = schema or DatabaseSchema()
    self._initialize()
Functions
set_schema
set_schema(schema: DatabaseSchema) -> None

Set the database schema.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `schema` | `DatabaseSchema` | The database schema to use | required |
Source code in packages/data/src/dataknobs_data/database.py
def set_schema(self, schema: DatabaseSchema) -> None:
    """Set the database schema.

    Args:
        schema: The database schema to use
    """
    self.schema = schema
add_field_schema
add_field_schema(field_schema: FieldSchema) -> None

Add a field to the database schema.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `field_schema` | `FieldSchema` | The field schema to add | required |
Source code in packages/data/src/dataknobs_data/database.py
def add_field_schema(self, field_schema: FieldSchema) -> None:
    """Add a field to the database schema.

    Args:
        field_schema: The field schema to add
    """
    self.schema.add_field(field_schema)
with_schema
with_schema(**field_definitions) -> AsyncDatabase

Set schema using field definitions.

Returns self for chaining.

Examples:

```python
db = AsyncMemoryDatabase().with_schema(
    content=FieldType.TEXT,
    embedding=(FieldType.VECTOR, {"dimensions": 384, "source_field": "content"})
)
```

Source code in packages/data/src/dataknobs_data/database.py
def with_schema(self, **field_definitions) -> AsyncDatabase:
    """Set schema using field definitions.

    Returns self for chaining.

    Examples:
        db = AsyncMemoryDatabase().with_schema(
            content=FieldType.TEXT,
            embedding=(FieldType.VECTOR, {"dimensions": 384, "source_field": "content"})
        )
    """
    self.schema = DatabaseSchema.create(**field_definitions)
    return self
create abstractmethod async
create(record: Record) -> str

Create a new record in the database.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `record` | `Record` | The record to create | required |

Returns:

| Type | Description |
|------|-------------|
| `str` | The ID of the created record |

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
async def create(self, record: Record) -> str:
    """Create a new record in the database.

    Args:
        record: The record to create

    Returns:
        The ID of the created record
    """
    raise NotImplementedError
read abstractmethod async
read(id: str) -> Record | None

Read a record by ID.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `id` | `str` | The record ID | required |

Returns:

| Type | Description |
|------|-------------|
| `Record \| None` | The record if found, None otherwise |

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
async def read(self, id: str) -> Record | None:
    """Read a record by ID.

    Args:
        id: The record ID

    Returns:
        The record if found, None otherwise
    """
    raise NotImplementedError
update abstractmethod async
update(id: str, record: Record) -> bool

Update an existing record.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `id` | `str` | The record ID | required |
| `record` | `Record` | The updated record | required |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the record was updated, False if not found |

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
async def update(self, id: str, record: Record) -> bool:
    """Update an existing record.

    Args:
        id: The record ID
        record: The updated record

    Returns:
        True if the record was updated, False if not found
    """
    raise NotImplementedError
delete abstractmethod async
delete(id: str) -> bool

Delete a record by ID.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `id` | `str` | The record ID | required |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the record was deleted, False if not found |

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
async def delete(self, id: str) -> bool:
    """Delete a record by ID.

    Args:
        id: The record ID

    Returns:
        True if the record was deleted, False if not found
    """
    raise NotImplementedError
search abstractmethod async
search(query: Query | ComplexQuery) -> list[Record]

Search for records matching a query.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `Query \| ComplexQuery` | The search query (simple or complex) | required |

Returns:

| Type | Description |
|------|-------------|
| `list[Record]` | List of matching records |

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
async def search(self, query: Query | ComplexQuery) -> list[Record]:
    """Search for records matching a query.

    Args:
        query: The search query (simple or complex)

    Returns:
        List of matching records
    """
    raise NotImplementedError
all async
all() -> list[Record]

Get all records from the database.

Returns:

| Type | Description |
|------|-------------|
| `list[Record]` | List of all records |

Source code in packages/data/src/dataknobs_data/database.py
async def all(self) -> list[Record]:
    """Get all records from the database.

    Returns:
        List of all records
    """
    # Default implementation using search with empty query
    from .query import Query
    return await self.search(Query())
exists abstractmethod async
exists(id: str) -> bool

Check if a record exists.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `id` | `str` | The record ID | required |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the record exists, False otherwise |

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
async def exists(self, id: str) -> bool:
    """Check if a record exists.

    Args:
        id: The record ID

    Returns:
        True if the record exists, False otherwise
    """
    raise NotImplementedError
upsert async
upsert(id_or_record: str | Record, record: Record | None = None) -> str

Update or insert a record.

Can be called as:

- `upsert(id, record)` - explicit ID and record
- `upsert(record)` - extract the ID from the record using `Record`'s built-in logic

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `id_or_record` | `str \| Record` | Either an ID string or a `Record` | required |
| `record` | `Record \| None` | The record to upsert (if the first argument is an ID) | `None` |

Returns:

| Type | Description |
|------|-------------|
| `str` | The record ID |

Source code in packages/data/src/dataknobs_data/database.py
async def upsert(self, id_or_record: str | Record, record: Record | None = None) -> str:
    """Update or insert a record.

    Can be called as:
    - upsert(id, record) - explicit ID and record
    - upsert(record) - extract ID from record using Record's built-in logic

    Args:
        id_or_record: Either an ID string or a Record
        record: The record to upsert (if first arg is ID)

    Returns:
        The record ID
    """
    import uuid

    # Determine ID and record based on arguments
    if isinstance(id_or_record, str):
        # Called with explicit ID: upsert(id, record)
        id = id_or_record
        if record is None:
            raise ValueError("Record required when ID is provided")
    else:
        # Called with just record: upsert(record)
        record = id_or_record
        # Use Record's built-in ID property which handles all the logic
        id = record.id

        if id is None:
            # Generate a new ID if none found
            id = str(uuid.uuid4())  # type: ignore[unreachable]
            # Set it on the record for future reference
            record.storage_id = id

    # Now perform the upsert
    if await self.exists(id):
        await self.update(id, record)
    else:
        # Ensure the record has the storage_id set for create
        if not record.storage_id:
            record.storage_id = id
        created_id = await self.create(record)
        # Return the created ID (might be different from what we provided)
        return created_id or id
    return id
create_batch async
create_batch(records: list[Record]) -> list[str]

Create multiple records in batch.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `records` | `list[Record]` | List of records to create | required |

Returns:

| Type | Description |
|------|-------------|
| `list[str]` | List of created record IDs |

Source code in packages/data/src/dataknobs_data/database.py
async def create_batch(self, records: list[Record]) -> list[str]:
    """Create multiple records in batch.

    Args:
        records: List of records to create

    Returns:
        List of created record IDs
    """
    ids = []
    for record in records:
        id = await self.create(record)
        ids.append(id)
    return ids
read_batch async
read_batch(ids: list[str]) -> list[Record | None]

Read multiple records by ID.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `ids` | `list[str]` | List of record IDs | required |

Returns:

| Type | Description |
|------|-------------|
| `list[Record \| None]` | List of records (None for not found) |

Source code in packages/data/src/dataknobs_data/database.py
async def read_batch(self, ids: list[str]) -> list[Record | None]:
    """Read multiple records by ID.

    Args:
        ids: List of record IDs

    Returns:
        List of records (None for not found)
    """
    records = []
    for id in ids:
        record = await self.read(id)
        records.append(record)
    return records
delete_batch async
delete_batch(ids: list[str]) -> list[bool]

Delete multiple records by ID.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `ids` | `list[str]` | List of record IDs | required |

Returns:

| Type | Description |
|------|-------------|
| `list[bool]` | List of deletion results |

Source code in packages/data/src/dataknobs_data/database.py
async def delete_batch(self, ids: list[str]) -> list[bool]:
    """Delete multiple records by ID.

    Args:
        ids: List of record IDs

    Returns:
        List of deletion results
    """
    results = []
    for id in ids:
        result = await self.delete(id)
        results.append(result)
    return results
update_batch async
update_batch(updates: list[tuple[str, Record]]) -> list[bool]

Update multiple records.

Default implementation calls update() for each ID/record pair. Override for better performance.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `updates` | `list[tuple[str, Record]]` | List of (id, record) tuples to update | required |

Returns:

| Type | Description |
|------|-------------|
| `list[bool]` | List of success flags for each update |

Source code in packages/data/src/dataknobs_data/database.py
async def update_batch(self, updates: list[tuple[str, Record]]) -> list[bool]:
    """Update multiple records.

    Default implementation calls update() for each ID/record pair.
    Override for better performance.

    Args:
        updates: List of (id, record) tuples to update

    Returns:
        List of success flags for each update
    """
    results = []
    for id, record in updates:
        result = await self.update(id, record)
        results.append(result)
    return results
count async
count(query: Query | None = None) -> int

Count records matching a query.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `Query \| None` | Optional search query (counts all if None) | `None` |

Returns:

| Type | Description |
|------|-------------|
| `int` | Number of matching records |

Source code in packages/data/src/dataknobs_data/database.py
async def count(self, query: Query | None = None) -> int:
    """Count records matching a query.

    Args:
        query: Optional search query (counts all if None)

    Returns:
        Number of matching records
    """
    if query:
        results = await self.search(query)
        return len(results)
    else:
        return await self._count_all()
clear async
clear() -> int

Clear all records from the database.

Returns:

| Type | Description |
|------|-------------|
| `int` | Number of records deleted |

Source code in packages/data/src/dataknobs_data/database.py
async def clear(self) -> int:
    """Clear all records from the database.

    Returns:
        Number of records deleted
    """
    raise NotImplementedError
connect async
connect() -> None

Connect to the database. Override in subclasses if needed.

Source code in packages/data/src/dataknobs_data/database.py
async def connect(self) -> None:  # noqa: B027
    """Connect to the database. Override in subclasses if needed."""
close async
close() -> None

Close the database connection. Override in subclasses if needed.

Source code in packages/data/src/dataknobs_data/database.py
async def close(self) -> None:  # noqa: B027
    """Close the database connection. Override in subclasses if needed."""
disconnect async
disconnect() -> None

Disconnect from the database (alias for close).

Source code in packages/data/src/dataknobs_data/database.py
async def disconnect(self) -> None:
    """Disconnect from the database (alias for close)."""
    await self.close()
__aenter__ async
__aenter__()

Async context manager entry.

Source code in packages/data/src/dataknobs_data/database.py
async def __aenter__(self):
    """Async context manager entry."""
    await self.connect()
    return self
__aexit__ async
__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit.

Source code in packages/data/src/dataknobs_data/database.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit."""
    await self.close()
stream_read abstractmethod async
stream_read(
    query: Query | None = None, config: StreamConfig | None = None
) -> AsyncIterator[Record]

Stream records from database.

Yields records one at a time, fetching in batches internally.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `Query \| None` | Optional query to filter records | `None` |
| `config` | `StreamConfig \| None` | Streaming configuration | `None` |

Yields:

| Type | Description |
|------|-------------|
| `AsyncIterator[Record]` | Records matching the query |

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
async def stream_read(
    self,
    query: Query | None = None,
    config: StreamConfig | None = None
) -> AsyncIterator[Record]:
    """Stream records from database.

    Yields records one at a time, fetching in batches internally.

    Args:
        query: Optional query to filter records
        config: Streaming configuration

    Yields:
        Records matching the query
    """
    raise NotImplementedError
stream_write abstractmethod async
stream_write(
    records: AsyncIterator[Record], config: StreamConfig | None = None
) -> StreamResult

Stream records into database.

Accepts an iterator and writes in batches.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `records` | `AsyncIterator[Record]` | Iterator of records to write | required |
| `config` | `StreamConfig \| None` | Streaming configuration | `None` |

Returns:

| Type | Description |
|------|-------------|
| `StreamResult` | Result of the streaming operation |

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
async def stream_write(
    self,
    records: AsyncIterator[Record],
    config: StreamConfig | None = None
) -> StreamResult:
    """Stream records into database.

    Accepts an iterator and writes in batches.

    Args:
        records: Iterator of records to write
        config: Streaming configuration

    Returns:
        Result of the streaming operation
    """
    raise NotImplementedError
stream_transform async
stream_transform(
    query: Query | None = None,
    transform: Callable[[Record], Record | None] | None = None,
    config: StreamConfig | None = None,
) -> AsyncIterator[Record]

Stream records through a transformation.

Default implementation, can be overridden for efficiency.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `Query \| None` | Optional query to filter records | `None` |
| `transform` | `Callable[[Record], Record \| None] \| None` | Optional transformation function | `None` |
| `config` | `StreamConfig \| None` | Streaming configuration | `None` |

Yields:

| Type | Description |
|------|-------------|
| `AsyncIterator[Record]` | Transformed records |

Source code in packages/data/src/dataknobs_data/database.py
async def stream_transform(
    self,
    query: Query | None = None,
    transform: Callable[[Record], Record | None] | None = None,
    config: StreamConfig | None = None
) -> AsyncIterator[Record]:
    """Stream records through a transformation.

    Default implementation, can be overridden for efficiency.

    Args:
        query: Optional query to filter records
        transform: Optional transformation function
        config: Streaming configuration

    Yields:
        Transformed records
    """
    async for record in self.stream_read(query, config):
        if transform:
            transformed = transform(record)
            if transformed:  # None means filter out
                yield transformed
        else:
            yield record
from_backend async classmethod
from_backend(
    backend: str, config: dict[str, Any] | None = None
) -> AsyncDatabase

Factory method to create and connect a database instance.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `backend` | `str` | The backend type (`"memory"`, `"file"`, `"s3"`, `"postgres"`, `"elasticsearch"`) | required |
| `config` | `dict[str, Any] \| None` | Backend-specific configuration | `None` |

Returns:

| Type | Description |
|------|-------------|
| `AsyncDatabase` | Connected AsyncDatabase instance |

Source code in packages/data/src/dataknobs_data/database.py
@classmethod
async def from_backend(cls, backend: str, config: dict[str, Any] | None = None) -> AsyncDatabase:
    """Factory method to create and connect a database instance.

    Args:
        backend: The backend type ("memory", "file", "s3", "postgres", "elasticsearch")
        config: Backend-specific configuration

    Returns:
        Connected AsyncDatabase instance
    """
    from .backends import BACKEND_REGISTRY

    backend_class = BACKEND_REGISTRY.get(backend)
    if not backend_class:
        raise ValueError(
            f"Unknown backend: {backend}. Available: {list(BACKEND_REGISTRY.keys())}"
        )

    instance = backend_class(config)
    await instance.connect()
    return instance

SyncDatabase

SyncDatabase(
    config: dict[str, Any] | None = None, schema: DatabaseSchema | None = None
)

Bases: ABC

Synchronous variant of the Database abstract base class.

Provides a unified synchronous interface for CRUD operations, querying, and streaming across different backend databases. Supports schema validation, batch operations, and complex queries with boolean logic.

Example

```python
from dataknobs_data import database_factory, Record, Query, Filter, Operator

# Create database
db = database_factory("memory")

# Use as context manager
with db:
    # Create records
    id1 = db.create(Record({"name": "Alice", "age": 30}))
    id2 = db.create(Record({"name": "Bob", "age": 25}))

    # Query records
    query = Query(filters=[Filter("age", Operator.GT, 25)])
    results = db.search(query)
    print(results)  # [Alice's record]

    # Update record
    db.update(id1, Record({"name": "Alice", "age": 31}))

    # Stream large datasets
    for record in db.stream_read():
        process_record(record)
```

Initialize the database with optional configuration.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `config` | `dict[str, Any] \| None` | Backend-specific configuration parameters (may include a `schema` key) | `None` |
| `schema` | `DatabaseSchema \| None` | Optional database schema (overrides the config schema) | `None` |

Example

```python
from dataknobs_data import SyncDatabase
from dataknobs_data.schema import DatabaseSchema
from dataknobs_data.fields import FieldType

# With schema
schema = DatabaseSchema.create(
    name=FieldType.STRING,
    age=FieldType.INTEGER
)
db = SyncDatabase(config={"path": "data.db"}, schema=schema)
```

Methods:

| Name | Description |
|------|-------------|
| `set_schema` | Set the database schema. |
| `add_field_schema` | Add a field to the database schema. |
| `with_schema` | Set schema using field definitions. |
| `create` | Create a new record in the database. |
| `read` | Read a record by ID. |
| `update` | Update an existing record. |
| `delete` | Delete a record by ID. |
| `search` | Search for records matching a query (simple or complex). |
| `all` | Get all records from the database. |
| `exists` | Check if a record exists. |
| `upsert` | Update or insert a record. |
| `create_batch` | Create multiple records in batch. |
| `read_batch` | Read multiple records by ID. |
| `delete_batch` | Delete multiple records by ID. |
| `update_batch` | Update multiple records. |
| `count` | Count records matching a query. |
| `clear` | Clear all records from the database. |
| `connect` | Connect to the database. Override in subclasses if needed. |
| `close` | Close the database connection. Override in subclasses if needed. |
| `disconnect` | Disconnect from the database (alias for `close`). |
| `__enter__` | Context manager entry. |
| `__exit__` | Context manager exit. |
| `stream_read` | Stream records from the database. |
| `stream_write` | Stream records into the database. |
| `stream_transform` | Stream records through a transformation. |
| `from_backend` | Factory method to create and connect a synchronous database instance. |

Source code in packages/data/src/dataknobs_data/database.py
def __init__(self, config: dict[str, Any] | None = None, schema: DatabaseSchema | None = None):
    """Initialize the database with optional configuration.

    Args:
        config: Backend-specific configuration parameters (may include 'schema' key)
        schema: Optional database schema (overrides config schema)

    Example:
        ```python
        from dataknobs_data import SyncDatabase
        from dataknobs_data.schema import DatabaseSchema
        from dataknobs_data.fields import FieldType

        # With schema
        schema = DatabaseSchema.create(
            name=FieldType.STRING,
            age=FieldType.INTEGER
        )
        db = SyncDatabase(config={"path": "data.db"}, schema=schema)
        ```
    """
    config = config or {}

    # Extract schema from config if present and no explicit schema provided
    if schema is None and "schema" in config:
        schema = AsyncDatabase._extract_schema_from_config(config["schema"])
        # Remove schema from config so backends don't see it
        config = {k: v for k, v in config.items() if k != "schema"}

    self.config = config
    self.schema = schema or DatabaseSchema()
    self._initialize()
Functions
set_schema
set_schema(schema: DatabaseSchema) -> None

Set the database schema.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `schema` | `DatabaseSchema` | The database schema to use | required |
Source code in packages/data/src/dataknobs_data/database.py
def set_schema(self, schema: DatabaseSchema) -> None:
    """Set the database schema.

    Args:
        schema: The database schema to use
    """
    self.schema = schema
add_field_schema
add_field_schema(field_schema: FieldSchema) -> None

Add a field to the database schema.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `field_schema` | `FieldSchema` | The field schema to add | required |
Source code in packages/data/src/dataknobs_data/database.py
def add_field_schema(self, field_schema: FieldSchema) -> None:
    """Add a field to the database schema.

    Args:
        field_schema: The field schema to add
    """
    self.schema.add_field(field_schema)
with_schema
with_schema(**field_definitions) -> SyncDatabase

Set schema using field definitions.

Returns self for chaining.

Examples:

db = SyncMemoryDatabase().with_schema(
    content=FieldType.TEXT,
    embedding=(FieldType.VECTOR, {"dimensions": 384, "source_field": "content"})
)

Source code in packages/data/src/dataknobs_data/database.py
def with_schema(self, **field_definitions) -> SyncDatabase:
    """Set schema using field definitions.

    Returns self for chaining.

    Examples:
        db = SyncMemoryDatabase().with_schema(
            content=FieldType.TEXT,
            embedding=(FieldType.VECTOR, {"dimensions": 384, "source_field": "content"})
        )
    """
    self.schema = DatabaseSchema.create(**field_definitions)
    return self
create abstractmethod
create(record: Record) -> str

Create a new record in the database.

Parameters:

Name Type Description Default
record Record

The record to create

required

Returns:

Type Description
str

The ID of the created record

Example
record = Record({"name": "Alice", "age": 30})
record_id = db.create(record)
print(record_id)  # "550e8400-e29b-41d4-a716-446655440000"
Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
def create(self, record: Record) -> str:
    """Create a new record in the database.

    Args:
        record: The record to create

    Returns:
        The ID of the created record

    Example:
        ```python
        record = Record({"name": "Alice", "age": 30})
        record_id = db.create(record)
        print(record_id)  # "550e8400-e29b-41d4-a716-446655440000"
        ```
    """
    raise NotImplementedError
read abstractmethod
read(id: str) -> Record | None

Read a record by ID.

Parameters:

Name Type Description Default
id str

The record ID

required

Returns:

Type Description
Record | None

The record if found, None otherwise

Example
record = db.read("550e8400-e29b-41d4-a716-446655440000")
if record:
    print(record.get_value("name"))  # "Alice"
Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
def read(self, id: str) -> Record | None:
    """Read a record by ID.

    Args:
        id: The record ID

    Returns:
        The record if found, None otherwise

    Example:
        ```python
        record = db.read("550e8400-e29b-41d4-a716-446655440000")
        if record:
            print(record.get_value("name"))  # "Alice"
        ```
    """
    raise NotImplementedError
update abstractmethod
update(id: str, record: Record) -> bool

Update an existing record.

Parameters:

Name Type Description Default
id str

The record ID

required
record Record

The updated record

required

Returns:

Type Description
bool

True if the record was updated, False if not found

Example
updated_record = Record({"name": "Alice", "age": 31})
success = db.update(record_id, updated_record)
print(success)  # True
Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
def update(self, id: str, record: Record) -> bool:
    """Update an existing record.

    Args:
        id: The record ID
        record: The updated record

    Returns:
        True if the record was updated, False if not found

    Example:
        ```python
        updated_record = Record({"name": "Alice", "age": 31})
        success = db.update(record_id, updated_record)
        print(success)  # True
        ```
    """
    raise NotImplementedError
delete abstractmethod
delete(id: str) -> bool

Delete a record by ID.

Parameters:

Name Type Description Default
id str

The record ID

required

Returns:

Type Description
bool

True if the record was deleted, False if not found

Example
success = db.delete(record_id)
print(success)  # True
Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
def delete(self, id: str) -> bool:
    """Delete a record by ID.

    Args:
        id: The record ID

    Returns:
        True if the record was deleted, False if not found

    Example:
        ```python
        success = db.delete(record_id)
        print(success)  # True
        ```
    """
    raise NotImplementedError
search abstractmethod
search(query: Query | ComplexQuery) -> list[Record]

Search for records matching a query (simple or complex).

Parameters:

Name Type Description Default
query Query | ComplexQuery

The search query

required

Returns:

Type Description
list[Record]

List of matching records

Example
# Simple query
query = Query(filters=[Filter("age", Operator.GT, 25)])
results = db.search(query)

# Complex query with boolean logic
from dataknobs_data.query_logic import QueryBuilder, LogicOperator

complex_query = (
    QueryBuilder()
    .where("age", Operator.GT, 25)
    .and_where("name", Operator.LIKE, "A%")
    .build()
)
results = db.search(complex_query)
Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
def search(self, query: Query | ComplexQuery) -> list[Record]:
    """Search for records matching a query (simple or complex).

    Args:
        query: The search query

    Returns:
        List of matching records

    Example:
        ```python
        # Simple query
        query = Query(filters=[Filter("age", Operator.GT, 25)])
        results = db.search(query)

        # Complex query with boolean logic
        from dataknobs_data.query_logic import QueryBuilder, LogicOperator

        complex_query = (
            QueryBuilder()
            .where("age", Operator.GT, 25)
            .and_where("name", Operator.LIKE, "A%")
            .build()
        )
        results = db.search(complex_query)
        ```
    """
    raise NotImplementedError
all
all() -> list[Record]

Get all records from the database.

Returns:

Type Description
list[Record]

List of all records

Source code in packages/data/src/dataknobs_data/database.py
def all(self) -> list[Record]:
    """Get all records from the database.

    Returns:
        List of all records
    """
    # Default implementation using search with empty query
    from .query import Query
    return self.search(Query())
exists abstractmethod
exists(id: str) -> bool

Check if a record exists.

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
def exists(self, id: str) -> bool:
    """Check if a record exists."""
    raise NotImplementedError
upsert
upsert(id_or_record: str | Record, record: Record | None = None) -> str

Update or insert a record.

Can be called as:

- upsert(id, record) - explicit ID and record
- upsert(record) - extract ID from record using Record's built-in logic

Parameters:

Name Type Description Default
id_or_record str | Record

Either an ID string or a Record

required
record Record | None

The record to upsert (if first arg is ID)

None

Returns:

Type Description
str

The record ID

Source code in packages/data/src/dataknobs_data/database.py
def upsert(self, id_or_record: str | Record, record: Record | None = None) -> str:
    """Update or insert a record.

    Can be called as:
    - upsert(id, record) - explicit ID and record
    - upsert(record) - extract ID from record using Record's built-in logic

    Args:
        id_or_record: Either an ID string or a Record
        record: The record to upsert (if first arg is ID)

    Returns:
        The record ID
    """
    import uuid

    # Determine ID and record based on arguments
    if isinstance(id_or_record, str):
        # Called with explicit ID: upsert(id, record)
        id = id_or_record
        if record is None:
            raise ValueError("Record required when ID is provided")
    else:
        # Called with just record: upsert(record)
        record = id_or_record
        # Use Record's built-in ID property which handles all the logic
        id = record.id

        if id is None:
            # Generate a new ID if none found
            id = str(uuid.uuid4())  # type: ignore[unreachable]
            # Set it on the record for future reference
            record.storage_id = id

    # Now perform the upsert
    if self.exists(id):
        self.update(id, record)
    else:
        # Ensure the record has the storage_id set for create
        if not record.storage_id:
            record.storage_id = id
        created_id = self.create(record)
        # Return the created ID (might be different from what we provided)
        return created_id or id
    return id
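The two calling conventions above reduce to a small dispatch on the first argument's type. A minimal dict-backed sketch of that dispatch logic (the `TinyStore` class and its `records` dict are illustrative stand-ins, not part of dataknobs_data):

```python
import uuid

class TinyStore:
    """Illustrative in-memory stand-in for a database backend."""
    def __init__(self):
        self.records: dict[str, dict] = {}

    def upsert(self, id_or_record, record=None) -> str:
        # Dispatch on the first argument: a str means upsert(id, record)
        if isinstance(id_or_record, str):
            id = id_or_record
            if record is None:
                raise ValueError("Record required when ID is provided")
        else:
            # Called as upsert(record): fall back to an embedded ID,
            # generating a fresh one if the record has none
            record = id_or_record
            id = record.get("id") or str(uuid.uuid4())
            record["id"] = id
        self.records[id] = record
        return id

store = TinyStore()
explicit = store.upsert("k1", {"name": "Alice"})  # explicit ID form
generated = store.upsert({"name": "Bob"})         # ID taken from/added to record
```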
create_batch
create_batch(records: list[Record]) -> list[str]

Create multiple records in batch.

Source code in packages/data/src/dataknobs_data/database.py
def create_batch(self, records: list[Record]) -> list[str]:
    """Create multiple records in batch."""
    ids = []
    for record in records:
        id = self.create(record)
        ids.append(id)
    return ids
read_batch
read_batch(ids: list[str]) -> list[Record | None]

Read multiple records by ID.

Source code in packages/data/src/dataknobs_data/database.py
def read_batch(self, ids: list[str]) -> list[Record | None]:
    """Read multiple records by ID."""
    records = []
    for id in ids:
        record = self.read(id)
        records.append(record)
    return records
delete_batch
delete_batch(ids: list[str]) -> list[bool]

Delete multiple records by ID.

Source code in packages/data/src/dataknobs_data/database.py
def delete_batch(self, ids: list[str]) -> list[bool]:
    """Delete multiple records by ID."""
    results = []
    for id in ids:
        result = self.delete(id)
        results.append(result)
    return results
update_batch
update_batch(updates: list[tuple[str, Record]]) -> list[bool]

Update multiple records.

Default implementation calls update() for each ID/record pair. Override for better performance.

Parameters:

Name Type Description Default
updates list[tuple[str, Record]]

List of (id, record) tuples to update

required

Returns:

Type Description
list[bool]

List of success flags for each update

Source code in packages/data/src/dataknobs_data/database.py
def update_batch(self, updates: list[tuple[str, Record]]) -> list[bool]:
    """Update multiple records.

    Default implementation calls update() for each ID/record pair.
    Override for better performance.

    Args:
        updates: List of (id, record) tuples to update

    Returns:
        List of success flags for each update
    """
    results = []
    for id, record in updates:
        result = self.update(id, record)
        results.append(result)
    return results
count
count(query: Query | None = None) -> int

Count records matching a query.

Source code in packages/data/src/dataknobs_data/database.py
def count(self, query: Query | None = None) -> int:
    """Count records matching a query."""
    if query:
        results = self.search(query)
        return len(results)
    else:
        return self._count_all()
clear
clear() -> int

Clear all records from the database.

Source code in packages/data/src/dataknobs_data/database.py
def clear(self) -> int:
    """Clear all records from the database."""
    raise NotImplementedError
connect
connect() -> None

Connect to the database. Override in subclasses if needed.

Source code in packages/data/src/dataknobs_data/database.py
def connect(self) -> None:  # noqa: B027
    """Connect to the database. Override in subclasses if needed."""
close
close() -> None

Close the database connection. Override in subclasses if needed.

Source code in packages/data/src/dataknobs_data/database.py
def close(self) -> None:  # noqa: B027
    """Close the database connection. Override in subclasses if needed."""
disconnect
disconnect() -> None

Disconnect from the database (alias for close).

Source code in packages/data/src/dataknobs_data/database.py
def disconnect(self) -> None:
    """Disconnect from the database (alias for close)."""
    self.close()
__enter__
__enter__()

Context manager entry.

Source code in packages/data/src/dataknobs_data/database.py
def __enter__(self):
    """Context manager entry."""
    self.connect()
    return self
__exit__
__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in packages/data/src/dataknobs_data/database.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit."""
    self.close()
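Because `__enter__` calls `connect()` and `__exit__` calls `close()`, a database can be driven from a `with` block so the connection is released even when the block raises. A minimal sketch of the same protocol (the `Demo` class is illustrative, not a dataknobs_data type):

```python
class Demo:
    """Illustrative stand-in showing the connect/close context protocol."""
    def __init__(self):
        self.connected = False

    def connect(self):
        self.connected = True

    def close(self):
        self.connected = False

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Runs whether the block exits normally or via an exception
        self.close()

with Demo() as db:
    assert db.connected
# Once the block exits, close() has been called
```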
stream_read abstractmethod
stream_read(
    query: Query | None = None, config: StreamConfig | None = None
) -> Iterator[Record]

Stream records from database.

Yields records one at a time, fetching in batches internally.

Parameters:

Name Type Description Default
query Query | None

Optional query to filter records

None
config StreamConfig | None

Streaming configuration

None

Yields:

Type Description
Record

Records matching the query

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
def stream_read(
    self,
    query: Query | None = None,
    config: StreamConfig | None = None
) -> Iterator[Record]:
    """Stream records from database.

    Yields records one at a time, fetching in batches internally.

    Args:
        query: Optional query to filter records
        config: Streaming configuration

    Yields:
        Records matching the query
    """
    raise NotImplementedError
stream_write abstractmethod
stream_write(
    records: Iterator[Record], config: StreamConfig | None = None
) -> StreamResult

Stream records into database.

Accepts an iterator and writes in batches.

Parameters:

Name Type Description Default
records Iterator[Record]

Iterator of records to write

required
config StreamConfig | None

Streaming configuration

None

Returns:

Type Description
StreamResult

Result of the streaming operation

Source code in packages/data/src/dataknobs_data/database.py
@abstractmethod
def stream_write(
    self,
    records: Iterator[Record],
    config: StreamConfig | None = None
) -> StreamResult:
    """Stream records into database.

    Accepts an iterator and writes in batches.

    Args:
        records: Iterator of records to write
        config: Streaming configuration

    Returns:
        Result of the streaming operation
    """
    raise NotImplementedError
stream_transform
stream_transform(
    query: Query | None = None,
    transform: Callable[[Record], Record | None] | None = None,
    config: StreamConfig | None = None,
) -> Iterator[Record]

Stream records through a transformation.

Default implementation; can be overridden for efficiency.

Parameters:

Name Type Description Default
query Query | None

Optional query to filter records

None
transform Callable[[Record], Record | None] | None

Optional transformation function

None
config StreamConfig | None

Streaming configuration

None

Yields:

Type Description
Record

Transformed records

Source code in packages/data/src/dataknobs_data/database.py
def stream_transform(
    self,
    query: Query | None = None,
    transform: Callable[[Record], Record | None] | None = None,
    config: StreamConfig | None = None
) -> Iterator[Record]:
    """Stream records through a transformation.

    Default implementation; can be overridden for efficiency.

    Args:
        query: Optional query to filter records
        transform: Optional transformation function
        config: Streaming configuration

    Yields:
        Transformed records
    """
    for record in self.stream_read(query, config):
        if transform:
            transformed = transform(record)
            if transformed:  # None means filter out
                yield transformed
        else:
            yield record
from_backend classmethod
from_backend(
    backend: str, config: dict[str, Any] | None = None
) -> SyncDatabase

Factory method to create and connect a synchronous database instance.

Parameters:

Name Type Description Default
backend str

The backend type ("memory", "file", "s3", "postgres", "elasticsearch")

required
config dict[str, Any] | None

Backend-specific configuration

None

Returns:

Type Description
SyncDatabase

Connected SyncDatabase instance

Source code in packages/data/src/dataknobs_data/database.py
@classmethod
def from_backend(cls, backend: str, config: dict[str, Any] | None = None) -> SyncDatabase:
    """Factory method to create and connect a synchronous database instance.

    Args:
        backend: The backend type ("memory", "file", "s3", "postgres", "elasticsearch")
        config: Backend-specific configuration

    Returns:
        Connected SyncDatabase instance
    """
    from .backends import SYNC_BACKEND_REGISTRY

    backend_class = SYNC_BACKEND_REGISTRY.get(backend)
    if not backend_class:
        raise ValueError(
            f"Unknown backend: {backend}. Available: {list(SYNC_BACKEND_REGISTRY.keys())}"
        )

    instance = backend_class(config)
    instance.connect()
    return instance

BackendNotFoundError

BackendNotFoundError(backend: str, available: list | None = None)

Bases: NotFoundError

Raised when a requested backend is not available.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, backend: str, available: list | None = None):
    self.backend = backend
    self.available = available or []
    message = f"Backend '{backend}' not found"
    if self.available:
        message += f". Available backends: {', '.join(self.available)}"
    super().__init__(message, context={"backend": backend, "available": self.available})

ConcurrencyError

ConcurrencyError(message: str)

Bases: ConcurrencyError

Raised when a concurrency conflict occurs.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, message: str):
    super().__init__(f"Concurrency error: {message}")

ConfigurationError

ConfigurationError(parameter: str, message: str)

Bases: ConfigurationError

Raised when configuration is invalid.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, parameter: str, message: str):
    self.parameter = parameter
    super().__init__(
        f"Configuration error for '{parameter}': {message}", context={"parameter": parameter}
    )

DatabaseConnectionError

DatabaseConnectionError(backend: str, message: str)

Bases: ResourceError

Raised when database connection fails.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, backend: str, message: str):
    self.backend = backend
    super().__init__(
        f"Failed to connect to {backend} backend: {message}", context={"backend": backend}
    )

DatabaseOperationError

DatabaseOperationError(operation: str, message: str)

Bases: OperationError

Raised when a database operation fails.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, operation: str, message: str):
    self.operation = operation
    super().__init__(
        f"Database operation '{operation}' failed: {message}", context={"operation": operation}
    )

FieldTypeError

FieldTypeError(field_name: str, expected_type: str, actual_type: str)

Bases: ValidationError

Raised when a field type operation fails.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, field_name: str, expected_type: str, actual_type: str):
    self.field_name = field_name
    self.expected_type = expected_type
    self.actual_type = actual_type
    super().__init__(
        f"Field '{field_name}' type mismatch: expected {expected_type}, got {actual_type}",
        context={
            "field_name": field_name,
            "expected_type": expected_type,
            "actual_type": actual_type,
        },
    )

MigrationError

MigrationError(source: str, target: str, message: str)

Bases: OperationError

Raised when data migration fails.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, source: str, target: str, message: str):
    self.source = source
    self.target = target
    super().__init__(
        f"Migration from {source} to {target} failed: {message}",
        context={"source": source, "target": target},
    )

QueryError

QueryError(message: str, query: Query | None = None)

Bases: OperationError

Raised when query execution fails.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, message: str, query: Query | None = None):
    self.query = query
    context = {"query": str(query)} if query else None
    super().__init__(f"Query error: {message}", context=context)

RecordNotFoundError

RecordNotFoundError(id: str)

Bases: NotFoundError

Raised when a requested record is not found.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, id: str):
    self.id = id
    super().__init__(f"Record with ID '{id}' not found", context={"id": id})

RecordValidationError

RecordValidationError(message: str, field_name: str | None = None)

Bases: ValidationError

Raised when record validation fails.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, message: str, field_name: str | None = None):
    self.field_name = field_name
    if field_name:
        message = f"Field '{field_name}': {message}"
    super().__init__(message, context={"field_name": field_name} if field_name else None)

SerializationError

SerializationError(format: str, message: str)

Bases: SerializationError

Raised when serialization/deserialization fails.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, format: str, message: str):
    self.format = format
    super().__init__(f"Serialization error ({format}): {message}", context={"format": format})

TransactionError

TransactionError(message: str)

Bases: OperationError

Raised when a transaction fails.

Source code in packages/data/src/dataknobs_data/exceptions.py
def __init__(self, message: str):
    super().__init__(f"Transaction error: {message}")

AsyncDatabaseFactory

Bases: FactoryBase

Factory for creating async database backends.

Note: Currently only some backends support async operations.

Methods:

Name Description
create

Create an async database instance.

Functions
create
create(**config: Any) -> Any

Create an async database instance.

Parameters:

Name Type Description Default
**config Any

Configuration including 'backend' field

{}

Returns:

Type Description
Any

Instance of appropriate async database backend

Raises:

Type Description
ValueError

If backend doesn't support async operations

Source code in packages/data/src/dataknobs_data/factory.py
def create(self, **config: Any) -> Any:
    """Create an async database instance.

    Args:
        **config: Configuration including 'backend' field

    Returns:
        Instance of appropriate async database backend

    Raises:
        ValueError: If backend doesn't support async operations
    """
    backend_type = config.pop("backend", "memory").lower()

    # Check if vector_enabled is set
    vector_enabled = config.get("vector_enabled", False)

    if vector_enabled:
        # All backends now have vector support (some native, some via Python)
        logger.debug(f"Vector support enabled for async backend: {backend_type}")

    # Get backend class from registry
    try:
        backend_class = async_backends.get(backend_type)
    except Exception as e:
        # Backend not found - provide helpful error message
        available = async_backends.list_keys()
        raise ValueError(
            f"Backend '{backend_type}' does not support async operations yet. "
            f"Available async backends: {', '.join(sorted(set(available)))}"
        ) from e

    # Create and return backend instance
    return backend_class.from_config(config)

DatabaseFactory

Bases: FactoryBase

Factory for creating database backends dynamically.

This factory allows creating different database implementations based on configuration, supporting all available backends.

Configuration Options

backend (str): Backend type (memory, file, postgres, elasticsearch, s3)

**kwargs: Backend-specific configuration options

Example Configuration

```yaml
databases:
  - name: main
    factory: database
    backend: postgres
    host: localhost
    database: myapp
  - name: cache
    factory: database
    backend: memory
  - name: archive
    factory: database
    backend: s3
    bucket: my-archive-bucket
    prefix: archives/
```

Methods:

Name Description
create

Create a database instance based on configuration.

get_backend_info

Get information about a specific backend.

Functions
create
create(**config: Any) -> SyncDatabase

Create a database instance based on configuration.

Parameters:

Name Type Description Default
**config Any

Configuration including 'backend' field and backend-specific options

{}

Returns:

Type Description
SyncDatabase

Instance of appropriate database backend

Raises:

Type Description
ValueError

If backend type is not recognized or not available

Source code in packages/data/src/dataknobs_data/factory.py
def create(self, **config: Any) -> SyncDatabase:
    """Create a database instance based on configuration.

    Args:
        **config: Configuration including 'backend' field and backend-specific options

    Returns:
        Instance of appropriate database backend

    Raises:
        ValueError: If backend type is not recognized or not available
    """
    backend_type = config.pop("backend", "memory").lower()

    logger.info(f"Creating database with backend: {backend_type}")

    # Check if vector_enabled is set
    vector_enabled = config.get("vector_enabled", False)

    if vector_enabled:
        # All backends now have vector support (some native, some via Python)
        logger.debug(f"Vector support enabled for backend: {backend_type}")

    # Get backend class from registry
    try:
        backend_class = sync_backends.get(backend_type)
    except Exception as e:
        # Backend not found - provide helpful error message
        available = sync_backends.list_keys()
        raise ValueError(
            f"Unknown backend type: {backend_type}. "
            f"Available backends: {', '.join(sorted(set(available)))}"
        ) from e

    # Create and return backend instance
    return backend_class.from_config(config)
get_backend_info
get_backend_info(backend_type: str) -> dict[str, Any]

Get information about a specific backend.

Parameters:

Name Type Description Default
backend_type str

Name of the backend

required

Returns:

Type Description
dict[str, Any]

Dictionary with backend information from registry metadata

Source code in packages/data/src/dataknobs_data/factory.py
def get_backend_info(self, backend_type: str) -> dict[str, Any]:
    """Get information about a specific backend.

    Args:
        backend_type: Name of the backend

    Returns:
        Dictionary with backend information from registry metadata
    """
    # Normalize to lowercase for case-insensitive lookup
    backend_type = backend_type.lower()

    # Check if backend exists first
    if not sync_backends.has(backend_type):
        return {
            "description": "Unknown backend",
            "error": f"Backend '{backend_type}' not recognized",
        }

    # Get metadata from registry
    metrics = sync_backends.get_metrics(backend_type)
    return metrics.get("metadata", {})

Field dataclass

Field(
    name: str,
    value: Any,
    type: FieldType | None = None,
    metadata: dict[str, Any] = dict(),
)

Represents a single field in a record.

A Field encapsulates a named value along with its type and optional metadata. Field types are automatically detected if not explicitly provided.

Attributes:

Name Type Description
name str

The field name

value Any

The field value (can be any Python type)

type FieldType | None

The field type (auto-detected if None)

metadata dict[str, Any]

Optional metadata dictionary

Example
from dataknobs_data import Field, FieldType

# Auto-detected type
name = Field(name="name", value="Alice")
print(name.type)  # FieldType.STRING

# Explicit type
score = Field(name="score", value=95.5, type=FieldType.FLOAT)

# With metadata
vector = Field(
    name="embedding",
    value=[0.1, 0.2, 0.3],
    type=FieldType.VECTOR,
    metadata={"dimensions": 3, "model": "text-embedding-3-small"}
)

# Validation
is_valid = name.validate()  # True

# Type conversion
str_score = score.convert_to(FieldType.STRING)
print(str_score.value)  # "95.5"

Methods:

Name Description
__post_init__

Auto-detect type if not provided.

copy

Create a deep copy of the field.

validate

Validate that the value matches the field type.

convert_to

Convert the field to a different type.

to_dict

Convert the field to a dictionary representation.

from_dict

Create a field from a dictionary representation.

Functions
__post_init__
__post_init__()

Auto-detect type if not provided.

Source code in packages/data/src/dataknobs_data/fields.py
def __post_init__(self):
    """Auto-detect type if not provided."""
    if self.type is None:
        self.type = self._detect_type(self.value)
copy
copy() -> Field

Create a deep copy of the field.

Source code in packages/data/src/dataknobs_data/fields.py
def copy(self) -> Field:
    """Create a deep copy of the field."""
    return Field(
        name=self.name,
        value=copy.deepcopy(self.value),
        type=self.type,
        metadata=copy.deepcopy(self.metadata)
    )
validate
validate() -> bool

Validate that the value matches the field type.

Returns:

Type Description
bool

True if the value is valid for the field type, False otherwise

Example
# Valid field
age = Field(name="age", value=30, type=FieldType.INTEGER)
print(age.validate())  # True

# Invalid field (wrong type for value)
bad_field = Field(name="count", value="not a number", type=FieldType.INTEGER)
print(bad_field.validate())  # False
Source code in packages/data/src/dataknobs_data/fields.py
def validate(self) -> bool:
    """Validate that the value matches the field type.

    Returns:
        True if the value is valid for the field type, False otherwise

    Example:
        ```python
        # Valid field
        age = Field(name="age", value=30, type=FieldType.INTEGER)
        print(age.validate())  # True

        # Invalid field (wrong type for value)
        bad_field = Field(name="count", value="not a number", type=FieldType.INTEGER)
        print(bad_field.validate())  # False
        ```
    """
    if self.value is None:
        return True

    type_validators = {
        FieldType.STRING: lambda v: isinstance(v, str),
        FieldType.INTEGER: lambda v: isinstance(v, int) and not isinstance(v, bool),
        FieldType.FLOAT: lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
        FieldType.BOOLEAN: lambda v: isinstance(v, bool),
        FieldType.DATETIME: lambda v: isinstance(v, datetime),
        FieldType.JSON: lambda v: isinstance(v, (dict, list)),
        FieldType.BINARY: lambda v: isinstance(v, bytes),
        FieldType.TEXT: lambda v: isinstance(v, str),
    }

    if self.type is None:
        return True
    validator = type_validators.get(self.type)
    if validator:
        return validator(self.value)
    return True
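One subtlety in the validator table above: `bool` is a subclass of `int` in Python, so `isinstance(True, int)` is `True`. The INTEGER and FLOAT validators therefore exclude booleans explicitly, while BOOLEAN accepts only `bool`. The checks can be exercised directly:

```python
def is_integer(v) -> bool:
    # bool subclasses int, so True/False must be rejected explicitly
    return isinstance(v, int) and not isinstance(v, bool)

def is_float(v) -> bool:
    # Plain ints are acceptable as floats; booleans still are not
    return isinstance(v, (int, float)) and not isinstance(v, bool)

def is_boolean(v) -> bool:
    return isinstance(v, bool)
```

Without the extra `not isinstance(v, bool)` guard, a record with `value=True` would silently pass INTEGER validation.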
convert_to
convert_to(target_type: FieldType) -> Field

Convert the field to a different type.

Parameters:

Name Type Description Default
target_type FieldType

The target FieldType to convert to

required

Returns:

Type Description
Field

A new Field with the converted value and type

Raises:

Type Description
ValueError

If conversion is not possible or fails

Example
# Integer to string
age = Field(name="age", value=30, type=FieldType.INTEGER)
age_str = age.convert_to(FieldType.STRING)
print(age_str.value)  # "30"

# String to integer
count = Field(name="count", value="42", type=FieldType.STRING)
count_int = count.convert_to(FieldType.INTEGER)
print(count_int.value)  # 42
Source code in packages/data/src/dataknobs_data/fields.py
def convert_to(self, target_type: FieldType) -> Field:
    """Convert the field to a different type.

    Args:
        target_type: The target FieldType to convert to

    Returns:
        A new Field with the converted value and type

    Raises:
        ValueError: If conversion is not possible or fails

    Example:
        ```python
        # Integer to string
        age = Field(name="age", value=30, type=FieldType.INTEGER)
        age_str = age.convert_to(FieldType.STRING)
        print(age_str.value)  # "30"

        # String to integer
        count = Field(name="count", value="42", type=FieldType.STRING)
        count_int = count.convert_to(FieldType.INTEGER)
        print(count_int.value)  # 42
        ```
    """
    if self.type == target_type:
        return self

    converters: dict[tuple[FieldType, FieldType], Callable[[Any], Any]] = {
        (FieldType.INTEGER, FieldType.STRING): str,
        (FieldType.INTEGER, FieldType.FLOAT): float,
        (FieldType.FLOAT, FieldType.STRING): str,
        (FieldType.FLOAT, FieldType.INTEGER): int,
        (FieldType.BOOLEAN, FieldType.STRING): lambda v: "true" if v else "false",
        (FieldType.BOOLEAN, FieldType.INTEGER): int,
        (FieldType.STRING, FieldType.INTEGER): int,
        (FieldType.STRING, FieldType.FLOAT): float,
        (FieldType.STRING, FieldType.BOOLEAN): lambda v: v.lower() in ("true", "1", "yes"),
        (FieldType.STRING, FieldType.TEXT): lambda v: v,
        (FieldType.TEXT, FieldType.STRING): lambda v: v,
    }

    if self.type is None:
        raise ValueError(f"Cannot convert {self.name} from None to {target_type}")

    converter_key = (self.type, target_type)
    if converter_key in converters:
        try:
            converter = converters[converter_key]
            new_value = converter(self.value)
            return Field(
                name=self.name, value=new_value, type=target_type, metadata=self.metadata.copy()
            )
        except (ValueError, TypeError) as e:
            raise ValueError(
                f"Cannot convert {self.name} from {self.type} to {target_type}: {e}"
            ) from e
    else:
        raise ValueError(f"No converter available from {self.type} to {target_type}")
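The `(source, target)` converter table above is a simple dispatch pattern. A standalone sketch of the same idea, using plain strings in place of `FieldType` members:

```python
# Standalone sketch of the (source, target) converter dispatch above (not the library code)
converters = {
    ("string", "integer"): int,
    ("string", "boolean"): lambda v: v.lower() in ("true", "1", "yes"),
    ("boolean", "string"): lambda v: "true" if v else "false",
}

def convert(value, source, target):
    try:
        converter = converters[(source, target)]
    except KeyError:
        raise ValueError(f"No converter available from {source} to {target}")
    try:
        return converter(value)
    except (ValueError, TypeError) as e:
        raise ValueError(f"Cannot convert {value!r} from {source} to {target}: {e}") from e

print(convert("42", "string", "integer"))   # 42
print(convert("Yes", "string", "boolean"))  # True
```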
to_dict
to_dict() -> dict[str, Any]

Convert the field to a dictionary representation.

Source code in packages/data/src/dataknobs_data/fields.py
def to_dict(self) -> dict[str, Any]:
    """Convert the field to a dictionary representation."""
    return {
        "name": self.name,
        "value": self.value,
        "type": self.type.value if self.type else None,
        "metadata": self.metadata,
    }
from_dict classmethod
from_dict(data: dict[str, Any]) -> Field

Create a field from a dictionary representation.

Source code in packages/data/src/dataknobs_data/fields.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Field:
    """Create a field from a dictionary representation."""
    field_type = None
    if data.get("type"):
        field_type = FieldType(data["type"])

    # Handle vector fields specially
    if field_type in (FieldType.VECTOR, FieldType.SPARSE_VECTOR):
        return VectorField.from_dict(data)

    return cls(
        name=data["name"],
        value=data["value"],
        type=field_type,
        metadata=data.get("metadata", {}),
    )

FieldType

Bases: Enum

Enumeration of supported field types.

Defines the data types that can be stored in Record fields. Field types enable type validation, schema enforcement, and backend-specific optimizations.

Attributes:

| Name | Description |
|------|-------------|
| STRING | Short text (< 1000 chars) |
| TEXT | Long text content |
| INTEGER | Whole numbers |
| FLOAT | Decimal numbers |
| BOOLEAN | True/False values |
| DATETIME | Date and time values |
| JSON | Structured JSON data (dicts, lists) |
| BINARY | Binary data (bytes) |
| VECTOR | Dense vector embeddings for similarity search |
| SPARSE_VECTOR | Sparse vector representations |

Example:

```python
from dataknobs_data import Field, FieldType

# Create typed fields
name_field = Field(name="name", value="Alice", type=FieldType.STRING)
age_field = Field(name="age", value=30, type=FieldType.INTEGER)
tags_field = Field(name="tags", value=["python", "data"], type=FieldType.JSON)

# Auto-detection (type is inferred from value)
auto_field = Field(name="score", value=95.5)  # Auto-detected as FLOAT
```

VectorField

VectorField(
    value: ndarray | list[float],
    name: str | None = None,
    dimensions: int | None = None,
    source_field: str | None = None,
    model_name: str | None = None,
    model_version: str | None = None,
    metadata: dict[str, Any] | None = None,
)

Bases: Field

Represents a vector field with embeddings and metadata.

Examples:

Simple usage (name is optional when the field is used in a Record):

```python
record = Record({"embedding": VectorField(value=[0.1, 0.2, 0.3])})
```

With explicit configuration:

```python
import numpy as np

embedding_array = np.array([0.1, 0.2, 0.3])
field = VectorField(
    value=embedding_array,
    name="doc_embedding",
    model_name="all-MiniLM-L6-v2",
    source_field="content",
)
```

From text using an embedding function:

```python
def my_embedding_fn(text):
    # In practice, use a real model like sentence-transformers
    return np.array([0.1, 0.2, 0.3])

field = VectorField.from_text(
    "This is the text to embed",
    embedding_fn=my_embedding_fn,
)
```

Initialize a vector field.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| value | ndarray \| list[float] | Vector data as numpy array or list of floats | *required* |
| name | str \| None | Field name (optional, defaults to "embedding") | None |
| dimensions | int \| None | Expected dimensions (auto-detected if not provided) | None |
| source_field | str \| None | Name of the text field this vector was generated from | None |
| model_name | str \| None | Name of the embedding model used | None |
| model_version | str \| None | Version of the embedding model | None |
| metadata | dict[str, Any] \| None | Additional metadata | None |

Methods:

| Name | Description |
|------|-------------|
| from_text | Create a VectorField from text using an embedding function. |
| validate | Validate the vector field. |
| to_list | Convert vector to a list of floats. |
| cosine_similarity | Compute cosine similarity with another vector. |
| euclidean_distance | Compute Euclidean distance to another vector. |
| to_dict | Convert to dictionary representation. |
| from_dict | Create from dictionary representation. |

Source code in packages/data/src/dataknobs_data/fields.py
def __init__(
    self,
    value: np.ndarray | list[float],
    name: str | None = None,  # Made optional
    dimensions: int | None = None,  # Auto-detected from value
    source_field: str | None = None,
    model_name: str | None = None,
    model_version: str | None = None,
    metadata: dict[str, Any] | None = None,
):
    """Initialize a vector field.

    Args:
        value: Vector data as numpy array or list of floats
        name: Field name (optional, defaults to "embedding")
        dimensions: Expected dimensions (auto-detected if not provided)
        source_field: Name of the text field this vector was generated from
        model_name: Name of the embedding model used
        model_version: Version of the embedding model
        metadata: Additional metadata
    """
    # Import numpy lazily to avoid hard dependency
    try:
        import numpy as np
    except ImportError as e:
        raise ImportError(
            "numpy is required for vector fields. Install with: pip install numpy"
        ) from e

    # Set default name if not provided
    if name is None:
        name = "embedding"

    # Convert to numpy array if needed
    if isinstance(value, list):
        value = np.array(value, dtype=np.float32)
    elif isinstance(value, np.ndarray):
        # Ensure float32 dtype for consistency
        if value.dtype != np.float32:
            value = value.astype(np.float32)
    else:
        raise TypeError(
            f"Vector value must be numpy array or list, got {type(value)}"
        )

    # Auto-detect dimensions if not provided
    actual_dims = len(value) if value.ndim == 1 else value.shape[-1]
    if dimensions is None:
        dimensions = actual_dims
    elif dimensions != actual_dims:
        raise ValueError(
            f"Vector dimension mismatch for field '{name}': "
            f"expected {dimensions}, got {actual_dims}"
        )

    # Store vector metadata
    vector_metadata = metadata or {}
    vector_metadata.update({
        "dimensions": dimensions,
        "source_field": source_field,
        "model": {
            "name": model_name,
            "version": model_version,
        } if model_name else None,
    })

    super().__init__(
        name=name,
        value=value,
        type=FieldType.VECTOR,
        metadata=vector_metadata,
    )

    self.dimensions = dimensions
    self.source_field = source_field
    self.model_name = model_name
    self.model_version = model_version
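The dimension handling in the constructor above (auto-detect when `dimensions` is omitted, raise on mismatch otherwise) can be sketched standalone for the 1-D case, without numpy:

```python
# Standalone sketch of the dimension auto-detect / mismatch check above (1-D case only;
# the library also handles 2-D arrays via shape[-1])
def resolve_dimensions(vector, dimensions=None):
    actual = len(vector)
    if dimensions is None:
        return actual  # auto-detect
    if dimensions != actual:
        raise ValueError(f"Vector dimension mismatch: expected {dimensions}, got {actual}")
    return dimensions

print(resolve_dimensions([0.1, 0.2, 0.3]))     # 3 (auto-detected)
print(resolve_dimensions([0.1, 0.2, 0.3], 3))  # 3 (validated)
```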
Functions
from_text classmethod
from_text(
    text: str,
    embedding_fn: Callable[[str], Any],
    name: str | None = None,
    dimensions: int | None = None,
    model_name: str | None = None,
    model_version: str | None = None,
    **kwargs: Any,
) -> VectorField

Create a VectorField from text using an embedding function.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| text | str | Text to embed | *required* |
| embedding_fn | Callable[[str], Any] | Function that takes text and returns embedding vector | *required* |
| name | str \| None | Field name (optional, defaults to "embedding") | None |
| dimensions | int \| None | Expected dimensions (auto-detected if not provided) | None |
| model_name | str \| None | Name of the embedding model | None |
| model_version | str \| None | Version of the embedding model | None |
| **kwargs | Any | Additional arguments passed to VectorField constructor | {} |

Returns:

| Type | Description |
|------|-------------|
| VectorField | VectorField instance with the generated embedding |

Example:

```python
field = VectorField.from_text(
    "Machine learning is fascinating",
    embedding_fn=model.encode,
    model_name="all-MiniLM-L6-v2",
)
```

Source code in packages/data/src/dataknobs_data/fields.py
@classmethod
def from_text(
    cls,
    text: str,
    embedding_fn: Callable[[str], Any],
    name: str | None = None,
    dimensions: int | None = None,
    model_name: str | None = None,
    model_version: str | None = None,
    **kwargs: Any
) -> VectorField:
    """Create a VectorField from text using an embedding function.

    Args:
        text: Text to embed
        embedding_fn: Function that takes text and returns embedding vector
        name: Field name (optional, defaults to "embedding")
        dimensions: Expected dimensions (auto-detected if not provided)
        model_name: Name of the embedding model
        model_version: Version of the embedding model
        **kwargs: Additional arguments passed to VectorField constructor

    Returns:
        VectorField instance with the generated embedding

    Example:
        field = VectorField.from_text(
            "Machine learning is fascinating",
            embedding_fn=model.encode,
            model_name="all-MiniLM-L6-v2"
        )
    """
    embedding = embedding_fn(text)
    return cls(
        value=embedding,
        name=name,
        dimensions=dimensions,
        source_field="text",  # Indicate it came from text
        model_name=model_name,
        model_version=model_version,
        **kwargs
    )
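Any callable that maps a string to a vector works as `embedding_fn`. For tests, a deterministic stand-in avoids loading a real model; the following hash-based helper is a hypothetical toy, not a meaningful embedding:

```python
import hashlib

def toy_embedding(text, dims=8):
    # Deterministic toy embedding for tests only -- NOT a real model
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return [b / 255.0 for b in digest[:dims]]

vec = toy_embedding("Machine learning is fascinating")
print(len(vec))  # 8
print(toy_embedding("same text") == toy_embedding("same text"))  # True: deterministic
```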
validate
validate() -> bool

Validate the vector field.

Source code in packages/data/src/dataknobs_data/fields.py
def validate(self) -> bool:
    """Validate the vector field."""
    if self.value is None:
        return True

    try:
        import numpy as np

        if not isinstance(self.value, np.ndarray):
            return False

        if self.value.ndim not in (1, 2):
            return False

        # Check dimensions match metadata
        actual_dims = len(self.value) if self.value.ndim == 1 else self.value.shape[-1]
        expected_dims = self.metadata.get("dimensions")
        if expected_dims and actual_dims != expected_dims:
            return False

        return True
    except ImportError:
        return False
to_list
to_list() -> list[float]

Convert vector to a list of floats.

Source code in packages/data/src/dataknobs_data/fields.py
def to_list(self) -> list[float]:
    """Convert vector to a list of floats."""
    import numpy as np

    if isinstance(self.value, np.ndarray):
        return self.value.tolist()
    return list(self.value)
cosine_similarity
cosine_similarity(other: VectorField | ndarray | list[float]) -> float

Compute cosine similarity with another vector.

Source code in packages/data/src/dataknobs_data/fields.py
def cosine_similarity(self, other: VectorField | np.ndarray | list[float]) -> float:
    """Compute cosine similarity with another vector."""
    import numpy as np

    if isinstance(other, VectorField):
        other_vec = other.value
    elif isinstance(other, list):
        other_vec = np.array(other, dtype=np.float32)
    else:
        other_vec = other

    # Compute cosine similarity
    dot_product = np.dot(self.value, other_vec)
    norm_a = np.linalg.norm(self.value)
    norm_b = np.linalg.norm(other_vec)

    if norm_a == 0 or norm_b == 0:
        return 0.0

    return float(dot_product / (norm_a * norm_b))
euclidean_distance
euclidean_distance(other: VectorField | ndarray | list[float]) -> float

Compute Euclidean distance to another vector.

Source code in packages/data/src/dataknobs_data/fields.py
def euclidean_distance(self, other: VectorField | np.ndarray | list[float]) -> float:
    """Compute Euclidean distance to another vector."""
    import numpy as np

    if isinstance(other, VectorField):
        other_vec = other.value
    elif isinstance(other, list):
        other_vec = np.array(other, dtype=np.float32)
    else:
        other_vec = other

    return float(np.linalg.norm(self.value - other_vec))
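Both metrics above reduce to short formulas. A pure-Python sketch (the library uses numpy; this version keeps the same zero-vector guard as `cosine_similarity` above):

```python
import math

# Pure-Python sketch of the two distance metrics above
def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # same zero-vector guard as the library method
    return dot / (norm_a * norm_b)

def euclidean_distance(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))   # 0.0 (orthogonal)
print(euclidean_distance([0.0, 0.0], [3.0, 4.0]))  # 5.0
```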
to_dict
to_dict() -> dict[str, Any]

Convert to dictionary representation.

Source code in packages/data/src/dataknobs_data/fields.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary representation."""
    return {
        "name": self.name,
        "value": self.to_list(),
        "type": self.type.value,
        "metadata": self.metadata,
        "dimensions": self.dimensions,
    }
from_dict classmethod
from_dict(data: dict[str, Any]) -> VectorField

Create from dictionary representation.

Source code in packages/data/src/dataknobs_data/fields.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> VectorField:
    """Create from dictionary representation."""
    metadata = data.get("metadata", {})
    model_info = metadata.get("model", {})

    return cls(
        name=data["name"],
        value=data["value"],
        dimensions=data.get("dimensions") or metadata.get("dimensions"),
        source_field=metadata.get("source_field"),
        model_name=model_info.get("name") if model_info else None,
        model_version=model_info.get("version") if model_info else None,
        metadata=metadata,
    )

Filter dataclass

Filter(field: str, operator: Operator, value: Any = None)

Represents a filter condition.

A Filter combines a field name, an operator, and a value to create a query condition. Multiple filters can be combined in a Query for complex filtering.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| field | str | The field name to filter on |
| operator | Operator | The comparison operator |
| value | Any | The value to compare against (optional for EXISTS/NOT_EXISTS operators) |

Example:

```python
from dataknobs_data import Filter, Operator, Query, database_factory

# Create filters
age_filter = Filter("age", Operator.GT, 25)
name_filter = Filter("name", Operator.LIKE, "A%")
status_filter = Filter("status", Operator.IN, ["active", "pending"])

# Use in query
query = Query(filters=[age_filter, name_filter])

# Search database
db = database_factory("memory")
results = db.search(query)
```

Methods:

| Name | Description |
|------|-------------|
| matches | Check if a record value matches this filter. |
| to_dict | Convert filter to dictionary representation. |
| from_dict | Create filter from dictionary representation. |

Functions
matches
matches(record_value: Any) -> bool

Check if a record value matches this filter.

Supports type-aware comparisons for ranges and special handling for datetime/date objects.

Source code in packages/data/src/dataknobs_data/query.py
def matches(self, record_value: Any) -> bool:
    """Check if a record value matches this filter.

    Supports type-aware comparisons for ranges and special handling
    for datetime/date objects.
    """
    if self.operator == Operator.EXISTS:
        return record_value is not None
    elif self.operator == Operator.NOT_EXISTS:
        return record_value is None
    elif record_value is None:
        return False

    if self.operator == Operator.EQ:
        return record_value == self.value
    elif self.operator == Operator.NEQ:
        return record_value != self.value
    elif self.operator == Operator.GT:
        return self._compare_values(record_value, self.value, lambda a, b: a > b)
    elif self.operator == Operator.GTE:
        return self._compare_values(record_value, self.value, lambda a, b: a >= b)
    elif self.operator == Operator.LT:
        return self._compare_values(record_value, self.value, lambda a, b: a < b)
    elif self.operator == Operator.LTE:
        return self._compare_values(record_value, self.value, lambda a, b: a <= b)
    elif self.operator == Operator.IN:
        return record_value in self.value
    elif self.operator == Operator.NOT_IN:
        return record_value not in self.value
    elif self.operator == Operator.BETWEEN:
        if not isinstance(self.value, (list, tuple)) or len(self.value) != 2:
            return False
        lower, upper = self.value
        return self._compare_values(record_value, lower, lambda a, b: a >= b) and \
               self._compare_values(record_value, upper, lambda a, b: a <= b)
    elif self.operator == Operator.NOT_BETWEEN:
        if not isinstance(self.value, (list, tuple)) or len(self.value) != 2:
            return True
        lower, upper = self.value
        return not (self._compare_values(record_value, lower, lambda a, b: a >= b) and \
                   self._compare_values(record_value, upper, lambda a, b: a <= b))
    elif self.operator == Operator.LIKE:
        if not isinstance(record_value, str):
            return False
        import re

        pattern = self.value.replace("%", ".*").replace("_", ".")
        return bool(re.match(f"^{pattern}$", record_value, re.IGNORECASE))
    elif self.operator == Operator.NOT_LIKE:
        if not isinstance(record_value, str):
            return False
        import re

        pattern = self.value.replace("%", ".*").replace("_", ".")
        return not bool(re.match(f"^{pattern}$", record_value, re.IGNORECASE))
    elif self.operator == Operator.REGEX:
        if not isinstance(record_value, str):
            return False
        import re

        return bool(re.search(self.value, record_value))
    else:
        # This should never be reached as all operators are handled above
        raise ValueError(f"Unknown operator: {self.operator}")
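The LIKE handling above translates SQL wildcards into a regex (`%` becomes `.*`, `_` becomes `.`) and matches case-insensitively. A standalone sketch of that translation, with the same simplification as the source (regex metacharacters in the pattern are not escaped):

```python
import re

# Standalone sketch of the SQL LIKE -> regex translation used above
def like_match(pattern, value):
    regex = pattern.replace("%", ".*").replace("_", ".")
    return bool(re.match(f"^{regex}$", value, re.IGNORECASE))

print(like_match("A%", "Alice"))  # True
print(like_match("A%", "Bob"))    # False
print(like_match("B_b", "Bob"))   # True: _ matches exactly one character
```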
to_dict
to_dict() -> dict[str, Any]

Convert filter to dictionary representation.

Source code in packages/data/src/dataknobs_data/query.py
def to_dict(self) -> dict[str, Any]:
    """Convert filter to dictionary representation."""
    return {"field": self.field, "operator": self.operator.value, "value": self.value}
from_dict classmethod
from_dict(data: dict[str, Any]) -> Filter

Create filter from dictionary representation.

Source code in packages/data/src/dataknobs_data/query.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Filter:
    """Create filter from dictionary representation."""
    return cls(
        field=data["field"], operator=Operator(data["operator"]), value=data.get("value")
    )

Operator

Bases: Enum

Query operators for filtering.

Operators used to build filter conditions in queries. Supports comparison, pattern matching, existence checks, and range queries.

Example
from dataknobs_data import Filter, Operator, Query

# Equality
filter_eq = Filter("age", Operator.EQ, 30)

# Comparison
filter_gt = Filter("score", Operator.GT, 90)

# Pattern matching (SQL LIKE)
filter_like = Filter("name", Operator.LIKE, "A%")  # Names starting with 'A'

# IN operator
filter_in = Filter("status", Operator.IN, ["active", "pending"])

# Range query
filter_between = Filter("age", Operator.BETWEEN, [20, 40])

# Build query
query = Query(filters=[filter_gt, filter_like])

Query dataclass

Query(
    filters: list[Filter] = list(),
    sort_specs: list[SortSpec] = list(),
    limit_value: int | None = None,
    offset_value: int | None = None,
    fields: list[str] | None = None,
    vector_query: VectorQuery | None = None,
)

Represents a database query with filters, sorting, pagination, and vector search.

A Query combines multiple filter conditions, sort specifications, and pagination options to retrieve records from a database. Supports fluent interface for building queries.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| filters | list[Filter] | List of filter conditions |
| sort_specs | list[SortSpec] | List of sort specifications |
| limit_value | int \| None | Maximum number of results |
| offset_value | int \| None | Number of results to skip |
| fields | list[str] \| None | List of field names to include (projection) |
| vector_query | VectorQuery \| None | Optional vector similarity search parameters |

Example:

```python
from dataknobs_data import Query, Filter, Operator, SortOrder, SortSpec, database_factory

# Simple query with filters
query = Query(
    filters=[
        Filter("age", Operator.GT, 25),
        Filter("status", Operator.EQ, "active")
    ]
)

# Using fluent interface
query = (
    Query()
    .filter("age", Operator.GT, 25)
    .filter("status", Operator.EQ, "active")
    .sort_by("age", SortOrder.DESC)
    .limit(10)
    .offset(20)
)

# With field projection
query = (
    Query()
    .filter("age", Operator.GT, 25)
    .select("name", "age", "email")
)

# Execute query
db = database_factory("memory")
results = db.search(query)
```

Methods:

| Name | Description |
|------|-------------|
| filter | Add a filter to the query (fluent interface). |
| sort_by | Add a sort specification to the query (fluent interface). |
| sort | Add sorting (fluent interface). |
| set_limit | Set the result limit (fluent interface). |
| limit | Set limit (fluent interface). |
| set_offset | Set the result offset (fluent interface). |
| offset | Set offset (fluent interface). |
| select | Set field projection (fluent interface). |
| clear_filters | Clear all filters (fluent interface). |
| clear_sort | Clear all sort specifications (fluent interface). |
| similar_to | Add vector similarity search to the query. |
| near_text | Add text-based vector similarity search to the query. |
| hybrid | Create a hybrid query combining text and vector search. |
| with_reranking | Enable result reranking for vector queries. |
| clear_vector | Clear vector search from the query (fluent interface). |
| to_dict | Convert query to dictionary representation. |
| from_dict | Create query from dictionary representation. |
| copy | Create a copy of the query. |
| or_ | Create a ComplexQuery with OR logic. |
| and_ | Add more filters with AND logic (convenience method). |
| not_ | Create a ComplexQuery with NOT logic. |

Attributes:

| Name | Type | Description |
|------|------|-------------|
| sort_property (property) | list[SortSpec] | Get sort specifications (backward compatibility). |
| limit_property (property) | int \| None | Get limit value (backward compatibility). |
| offset_property (property) | int \| None | Get offset value (backward compatibility). |

Functions
filter
filter(field: str, operator: str | Operator, value: Any = None) -> Query

Add a filter to the query (fluent interface).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| field | str | The field name to filter on | *required* |
| operator | str \| Operator | The operator (string or Operator enum) | *required* |
| value | Any | The value to compare against | None |

Returns:

| Type | Description |
|------|-------------|
| Query | Self for method chaining |

Source code in packages/data/src/dataknobs_data/query.py
def filter(self, field: str, operator: str | Operator, value: Any = None) -> Query:
    """Add a filter to the query (fluent interface).

    Args:
        field: The field name to filter on
        operator: The operator (string or Operator enum)
        value: The value to compare against

    Returns:
        Self for method chaining
    """
    if isinstance(operator, str):
        op_map = {
            "=": Operator.EQ,
            "==": Operator.EQ,
            "!=": Operator.NEQ,
            ">": Operator.GT,
            ">=": Operator.GTE,
            "<": Operator.LT,
            "<=": Operator.LTE,
            "in": Operator.IN,
            "IN": Operator.IN,
            "not_in": Operator.NOT_IN,
            "NOT IN": Operator.NOT_IN,
            "like": Operator.LIKE,
            "LIKE": Operator.LIKE,
            "regex": Operator.REGEX,
            "exists": Operator.EXISTS,
            "not_exists": Operator.NOT_EXISTS,
            "between": Operator.BETWEEN,
            "BETWEEN": Operator.BETWEEN,
            "not_between": Operator.NOT_BETWEEN,
            "NOT BETWEEN": Operator.NOT_BETWEEN,
        }
        operator = op_map.get(operator, Operator.EQ)

    self.filters.append(Filter(field=field, operator=operator, value=value))
    return self
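The fluent interface works because every builder method ends with `return self`. A minimal sketch of the pattern with a hypothetical `MiniQuery` (not the library's class):

```python
# Minimal sketch of the fluent-builder pattern used by Query (hypothetical MiniQuery)
class MiniQuery:
    def __init__(self):
        self.filters = []
        self.limit_value = None

    def filter(self, field, operator, value=None):
        self.filters.append((field, operator, value))
        return self  # returning self is what enables chaining

    def limit(self, n):
        self.limit_value = n
        return self

q = MiniQuery().filter("age", ">", 25).filter("status", "=", "active").limit(10)
print(len(q.filters))  # 2
print(q.limit_value)   # 10
```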
sort_by
sort_by(field: str, order: str | SortOrder = 'asc') -> Query

Add a sort specification to the query (fluent interface).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| field | str | The field name to sort by | *required* |
| order | str \| SortOrder | The sort order ("asc", "desc", or SortOrder enum) | 'asc' |

Returns:

| Type | Description |
|------|-------------|
| Query | Self for method chaining |

Source code in packages/data/src/dataknobs_data/query.py
def sort_by(self, field: str, order: str | SortOrder = "asc") -> Query:
    """Add a sort specification to the query (fluent interface).

    Args:
        field: The field name to sort by
        order: The sort order ("asc", "desc", or SortOrder enum)

    Returns:
        Self for method chaining
    """
    if isinstance(order, str):
        order = SortOrder.ASC if order.lower() == "asc" else SortOrder.DESC

    self.sort_specs.append(SortSpec(field=field, order=order))
    return self
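When several sort specifications are stacked, a backend can apply them with Python's stable sort: sort by the last spec first, so earlier specs take precedence. A standalone sketch of that idiom (not the library's implementation):

```python
# Sketch: applying several sort specs with Python's stable sort
# (sort by the last spec first, so earlier specs take precedence)
records = [
    {"age": 30, "name": "Carol"},
    {"age": 25, "name": "Bob"},
    {"age": 30, "name": "Alice"},
]
specs = [("age", True), ("name", False)]  # age desc, then name asc

for field, descending in reversed(specs):
    records.sort(key=lambda r: r[field], reverse=descending)

print([r["name"] for r in records])  # ['Alice', 'Carol', 'Bob']
```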
sort
sort(field: str, order: str | SortOrder = 'asc') -> Query

Add sorting (fluent interface).

Source code in packages/data/src/dataknobs_data/query.py
def sort(self, field: str, order: str | SortOrder = "asc") -> Query:
    """Add sorting (fluent interface)."""
    return self.sort_by(field, order)
set_limit
set_limit(limit: int) -> Query

Set the result limit (fluent interface).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| limit | int | Maximum number of results | *required* |

Returns:

| Type | Description |
|------|-------------|
| Query | Self for method chaining |

Source code in packages/data/src/dataknobs_data/query.py
def set_limit(self, limit: int) -> Query:
    """Set the result limit (fluent interface).

    Args:
        limit: Maximum number of results

    Returns:
        Self for method chaining
    """
    self.limit_value = limit
    return self
limit
limit(value: int) -> Query

Set limit (fluent interface).

Source code in packages/data/src/dataknobs_data/query.py
def limit(self, value: int) -> Query:
    """Set limit (fluent interface)."""
    return self.set_limit(value)
set_offset
set_offset(offset: int) -> Query

Set the result offset (fluent interface).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| offset | int | Number of results to skip | *required* |

Returns:

| Type | Description |
|------|-------------|
| Query | Self for method chaining |

Source code in packages/data/src/dataknobs_data/query.py
def set_offset(self, offset: int) -> Query:
    """Set the result offset (fluent interface).

    Args:
        offset: Number of results to skip

    Returns:
        Self for method chaining
    """
    self.offset_value = offset
    return self
offset
offset(value: int) -> Query

Set offset (fluent interface).

Source code in packages/data/src/dataknobs_data/query.py
def offset(self, value: int) -> Query:
    """Set offset (fluent interface)."""
    return self.set_offset(value)
select
select(*fields: str) -> Query

Set field projection (fluent interface).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| fields | str | Field names to include in results | () |

Returns:

| Type | Description |
|------|-------------|
| Query | Self for method chaining |

Source code in packages/data/src/dataknobs_data/query.py
def select(self, *fields: str) -> Query:
    """Set field projection (fluent interface).

    Args:
        fields: Field names to include in results

    Returns:
        Self for method chaining
    """
    self.fields = list(fields) if fields else None
    return self
clear_filters
clear_filters() -> Query

Clear all filters (fluent interface).

Source code in packages/data/src/dataknobs_data/query.py
def clear_filters(self) -> Query:
    """Clear all filters (fluent interface)."""
    self.filters = []
    return self
clear_sort
clear_sort() -> Query

Clear all sort specifications (fluent interface).

Source code in packages/data/src/dataknobs_data/query.py
def clear_sort(self) -> Query:
    """Clear all sort specifications (fluent interface)."""
    self.sort_specs = []
    return self
similar_to
similar_to(
    vector: ndarray | list[float],
    field: str = "embedding",
    k: int = 10,
    metric: DistanceMetric | str = "cosine",
    include_source: bool = True,
    score_threshold: float | None = None,
) -> Query

Add vector similarity search to the query.

This method sets up a vector similarity search that will find the k most similar vectors to the provided query vector.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| vector | ndarray \| list[float] | Query vector to search for similar vectors | *required* |
| field | str | Vector field name to search | 'embedding' |
| k | int | Number of results to return | 10 |
| metric | DistanceMetric \| str | Distance metric to use | 'cosine' |
| include_source | bool | Whether to include source text in results | True |
| score_threshold | float \| None | Minimum similarity score threshold (optional) | None |

Returns:

| Type | Description |
|------|-------------|
| Query | Self for method chaining |

Source code in packages/data/src/dataknobs_data/query.py
def similar_to(
    self,
    vector: np.ndarray | list[float],
    field: str = "embedding",
    k: int = 10,
    metric: DistanceMetric | str = "cosine",
    include_source: bool = True,
    score_threshold: float | None = None,
) -> Query:
    """Add vector similarity search to the query.

    This method sets up a vector similarity search that will find the k most
    similar vectors to the provided query vector.

    Args:
        vector: Query vector to search for similar vectors
        field: Vector field name to search (default: "embedding")
        k: Number of results to return (default: 10)
        metric: Distance metric to use (default: "cosine")
        include_source: Whether to include source text in results (default: True)
        score_threshold: Minimum similarity score threshold (optional)

    Returns:
        Self for method chaining
    """
    self.vector_query = VectorQuery(
        vector=vector,
        field_name=field,
        k=k,
        metric=metric,
        include_source=include_source,
        score_threshold=score_threshold,
    )
    # Always update limit to match k
    self.limit_value = k
    return self
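Note that `similar_to` overwrites `limit_value` with `k`, since a k-nearest search ultimately reduces to keeping the k highest-scoring hits. That selection step can be sketched with the standard library (illustrative data, not the library's retrieval code):

```python
import heapq

# Sketch of what k-nearest retrieval reduces to: keep the k highest-scoring hits
scores = [("doc1", 0.91), ("doc2", 0.42), ("doc3", 0.77), ("doc4", 0.63)]

def top_k(scored, k):
    return heapq.nlargest(k, scored, key=lambda pair: pair[1])

print(top_k(scores, 2))  # [('doc1', 0.91), ('doc3', 0.77)]
```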
near_text
near_text(
    text: str,
    embedding_fn: Callable[[str], ndarray],
    field: str = "embedding",
    k: int = 10,
    metric: DistanceMetric | str = "cosine",
    include_source: bool = True,
    score_threshold: float | None = None,
) -> Query

Add text-based vector similarity search to the query.

This is a convenience method that converts text to a vector using the provided embedding function, then performs vector similarity search.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| text | str | Text to convert to vector for similarity search | *required* |
| embedding_fn | Callable[[str], ndarray] | Function to convert text to vector | *required* |
| field | str | Vector field name to search | 'embedding' |
| k | int | Number of results to return | 10 |
| metric | DistanceMetric \| str | Distance metric to use | 'cosine' |
| include_source | bool | Whether to include source text in results | True |
| score_threshold | float \| None | Minimum similarity score threshold (optional) | None |

Returns:

| Type | Description |
|------|-------------|
| Query | Self for method chaining |

Source code in packages/data/src/dataknobs_data/query.py
def near_text(
    self,
    text: str,
    embedding_fn: Callable[[str], np.ndarray],
    field: str = "embedding",
    k: int = 10,
    metric: DistanceMetric | str = "cosine",
    include_source: bool = True,
    score_threshold: float | None = None,
) -> Query:
    """Add text-based vector similarity search to the query.

    This is a convenience method that converts text to a vector using the
    provided embedding function, then performs vector similarity search.

    Args:
        text: Text to convert to vector for similarity search
        embedding_fn: Function to convert text to vector
        field: Vector field name to search (default: "embedding")
        k: Number of results to return (default: 10)
        metric: Distance metric to use (default: "cosine")
        include_source: Whether to include source text in results (default: True)
        score_threshold: Minimum similarity score threshold (optional)

    Returns:
        Self for method chaining
    """
    # Convert text to vector using provided embedding function
    vector = embedding_fn(text)
    return self.similar_to(
        vector=vector,
        field=field,
        k=k,
        metric=metric,
        include_source=include_source,
        score_threshold=score_threshold,
    )
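The composition pattern behind `near_text` — embed the text, then delegate to vector search — can be sketched independently of the library. The names `toy_embedding`, `near_text_sketch`, and `fake_search` are hypothetical stand-ins; a real `embedding_fn` would call an embedding model:

```python
def toy_embedding(text: str) -> list[float]:
    """Deterministic toy embedder (stand-in for a real model): length and vowel count."""
    vowels = sum(ch in "aeiou" for ch in text.lower())
    return [float(len(text)), float(vowels)]

def near_text_sketch(text, embedding_fn, search_fn, **kwargs):
    """Mirror of near_text: convert text to a vector, then delegate to vector search."""
    return search_fn(embedding_fn(text), **kwargs)

captured = {}
def fake_search(vector, k=10):
    """Record what the delegated search receives."""
    captured["vector"], captured["k"] = vector, k
    return ["hit"]

out = near_text_sketch("hello", toy_embedding, fake_search, k=3)
```

This mirrors the real method, which simply calls `embedding_fn(text)` and hands the result to `similar_to`.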
hybrid
hybrid(
    text_query: str | None = None,
    vector: ndarray | list[float] | None = None,
    text_field: str = "content",
    vector_field: str = "embedding",
    alpha: float = 0.5,
    k: int = 10,
    metric: DistanceMetric | str = "cosine",
) -> Query

Create a hybrid query combining text and vector search.

This method combines traditional text search with vector similarity search, allowing for more nuanced queries that leverage both exact text matching and semantic similarity.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `text_query` | `str \| None` | Text to search for (optional) | `None` |
| `vector` | `ndarray \| list[float] \| None` | Vector for similarity search (optional) | `None` |
| `text_field` | `str` | Field for text search | `'content'` |
| `vector_field` | `str` | Field for vector search | `'embedding'` |
| `alpha` | `float` | Weight balance between text (0.0) and vector (1.0) search | `0.5` |
| `k` | `int` | Number of results to return | `10` |
| `metric` | `DistanceMetric \| str` | Distance metric for vector search | `'cosine'` |

Returns:

| Type | Description |
| --- | --- |
| `Query` | Self for method chaining |

Note
  • alpha=0.0 gives full weight to text search
  • alpha=1.0 gives full weight to vector search
  • alpha=0.5 gives equal weight to both
Source code in packages/data/src/dataknobs_data/query.py
def hybrid(
    self,
    text_query: str | None = None,
    vector: np.ndarray | list[float] | None = None,
    text_field: str = "content",
    vector_field: str = "embedding",
    alpha: float = 0.5,
    k: int = 10,
    metric: DistanceMetric | str = "cosine",
) -> Query:
    """Create a hybrid query combining text and vector search.

    This method combines traditional text search with vector similarity search,
    allowing for more nuanced queries that leverage both exact text matching
    and semantic similarity.

    Args:
        text_query: Text to search for (optional)
        vector: Vector for similarity search (optional)
        text_field: Field for text search (default: "content")
        vector_field: Field for vector search (default: "embedding")
        alpha: Weight balance between text (0.0) and vector (1.0) search (default: 0.5)
        k: Number of results to return (default: 10)
        metric: Distance metric for vector search (default: "cosine")

    Returns:
        Self for method chaining

    Note:
        - alpha=0.0 gives full weight to text search
        - alpha=1.0 gives full weight to vector search
        - alpha=0.5 gives equal weight to both
    """
    # Add text filter if provided
    if text_query:
        self.filter(text_field, Operator.LIKE, f"%{text_query}%")

    # Add vector search if provided
    if vector is not None:
        self.vector_query = VectorQuery(
            vector=vector,
            field_name=vector_field,
            k=k,
            metric=metric,
            include_source=True,
        )
        # Store alpha in vector query metadata for backend to use
        self.vector_query.metadata = {"hybrid_alpha": alpha}

    # Set limit if not already set
    if self.limit_value is None:
        self.limit_value = k

    return self
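The `alpha` weighting described in the Note can be illustrated with a linear blend. How the backend actually combines the two scores is backend-specific (the query only passes `hybrid_alpha` along in metadata), so this `blend` function is an assumption about one common convention, not the library's implementation:

```python
def blend(text_score: float, vector_score: float, alpha: float = 0.5) -> float:
    """One common hybrid convention: alpha=0.0 is pure text, alpha=1.0 is pure vector."""
    return (1.0 - alpha) * text_score + alpha * vector_score
```

With this convention, `blend(1.0, 0.0, 0.0)` returns only the text score and `blend(1.0, 0.0, 1.0)` only the vector score, matching the bullet points above.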
with_reranking
with_reranking(rerank_k: int | None = None) -> Query

Enable result reranking for vector queries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `rerank_k` | `int \| None` | Number of results to rerank (default: 2*k from vector query) | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Query` | Self for method chaining |

Source code in packages/data/src/dataknobs_data/query.py
def with_reranking(self, rerank_k: int | None = None) -> Query:
    """Enable result reranking for vector queries.

    Args:
        rerank_k: Number of results to rerank (default: 2*k from vector query)

    Returns:
        Self for method chaining
    """
    if self.vector_query:
        self.vector_query.rerank = True
        self.vector_query.rerank_k = rerank_k or (self.vector_query.k * 2)
    return self
clear_vector
clear_vector() -> Query

Clear vector search from the query (fluent interface).

Source code in packages/data/src/dataknobs_data/query.py
def clear_vector(self) -> Query:
    """Clear vector search from the query (fluent interface)."""
    self.vector_query = None
    return self
to_dict
to_dict() -> dict[str, Any]

Convert query to dictionary representation.

Source code in packages/data/src/dataknobs_data/query.py
def to_dict(self) -> dict[str, Any]:
    """Convert query to dictionary representation."""
    result = {
        "filters": [f.to_dict() for f in self.filters],
        "sort": [s.to_dict() for s in self.sort_specs],
    }
    if self.limit_value is not None:
        result["limit"] = self.limit_value
    if self.offset_value is not None:
        result["offset"] = self.offset_value
    if self.fields is not None:
        result["fields"] = self.fields
    if self.vector_query is not None:
        result["vector_query"] = self.vector_query.to_dict()
    return result
from_dict classmethod
from_dict(data: dict[str, Any]) -> Query

Create query from dictionary representation.

Source code in packages/data/src/dataknobs_data/query.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Query:
    """Create query from dictionary representation."""
    query = cls()

    for filter_data in data.get("filters", []):
        query.filters.append(Filter.from_dict(filter_data))

    for sort_data in data.get("sort", []):
        query.sort_specs.append(SortSpec.from_dict(sort_data))

    query.limit_value = data.get("limit")
    query.offset_value = data.get("offset")
    query.fields = data.get("fields")

    if "vector_query" in data:
        query.vector_query = VectorQuery.from_dict(data["vector_query"])

    return query
copy
copy() -> Query

Create a copy of the query.

Source code in packages/data/src/dataknobs_data/query.py
def copy(self) -> Query:
    """Create a copy of the query."""
    import copy

    return Query(
        filters=copy.deepcopy(self.filters),
        sort_specs=copy.deepcopy(self.sort_specs),
        limit_value=self.limit_value,
        offset_value=self.offset_value,
        fields=self.fields.copy() if self.fields else None,
        vector_query=copy.deepcopy(self.vector_query) if self.vector_query else None,
    )
or_
or_(*filters: Filter | Query) -> ComplexQuery

Create a ComplexQuery with OR logic.

The current query's filters become an AND group, combined with OR conditions. Example: Query with filters [A, B] calling or_(C, D) creates: (A AND B) AND (C OR D)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `filters` | `Filter \| Query` | Filter objects or Query objects to OR together | `()` |

Returns:

| Type | Description |
| --- | --- |
| `ComplexQuery` | ComplexQuery with OR logic |

Source code in packages/data/src/dataknobs_data/query.py
def or_(self, *filters: Filter | Query) -> ComplexQuery:
    """Create a ComplexQuery with OR logic.

    The current query's filters become an AND group, combined with OR conditions.
    Example: Query with filters [A, B] calling or_(C, D) creates: (A AND B) AND (C OR D)

    Args:
        filters: Filter objects or Query objects to OR together

    Returns:
        ComplexQuery with OR logic
    """
    from .query_logic import (
        ComplexQuery,
        Condition,
        FilterCondition,
        LogicCondition,
        LogicOperator,
    )

    # Build OR conditions from the arguments
    or_conditions: list[Condition] = []
    for item in filters:
        if isinstance(item, Filter):
            or_conditions.append(FilterCondition(item))
        elif isinstance(item, Query):
            if len(item.filters) == 1:
                or_conditions.append(FilterCondition(item.filters[0]))
            elif item.filters:
                and_cond = LogicCondition(operator=LogicOperator.AND)
                for f in item.filters:
                    and_cond.conditions.append(FilterCondition(f))
                or_conditions.append(and_cond)

    # Create the OR condition group
    or_group = None
    if or_conditions:
        if len(or_conditions) == 1:
            or_group = or_conditions[0]
        else:
            or_group = LogicCondition(
                operator=LogicOperator.OR,
                conditions=or_conditions
            )

    # Combine with existing filters (if any) using AND
    if self.filters:
        # Create AND condition for existing filters
        if len(self.filters) == 1:
            existing = FilterCondition(self.filters[0])
        else:
            existing = LogicCondition(operator=LogicOperator.AND)
            for f in self.filters:
                existing.conditions.append(FilterCondition(f))

        # Combine existing AND new OR group with AND
        if or_group:
            root_condition = LogicCondition(
                operator=LogicOperator.AND,
                conditions=[existing, or_group]
            )
        else:
            root_condition = existing
    else:
        # No existing filters, just use OR group
        root_condition = or_group

    return ComplexQuery(
        condition=root_condition,
        sort_specs=self.sort_specs.copy(),
        limit_value=self.limit_value,
        offset_value=self.offset_value,
        fields=self.fields.copy() if self.fields else None
    )
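The `(A AND B) AND (C OR D)` shape that `or_` produces can be checked against plain predicates. These lambdas are hypothetical stand-ins for `Filter` conditions, not library objects:

```python
# Predicates standing in for the filters A, B, C, D in the docstring example
A = lambda r: r["age"] > 18
B = lambda r: r["active"]
C = lambda r: r["role"] == "admin"
D = lambda r: r["role"] == "editor"

def matches(record) -> bool:
    """(A AND B) AND (C OR D) — the shape produced by a query with [A, B] calling or_(C, D)."""
    return (A(record) and B(record)) and (C(record) or D(record))

r1 = {"age": 30, "active": True, "role": "admin"}   # passes both groups
r2 = {"age": 30, "active": True, "role": "viewer"}  # fails the OR group
```

A record must satisfy all of the original filters and at least one of the OR-ed conditions.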
and_
and_(*filters: Filter | Query) -> Query

Add more filters with AND logic (convenience method).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `filters` | `Filter \| Query` | Filter objects or Query objects to AND together | `()` |

Returns:

| Type | Description |
| --- | --- |
| `Query` | Self for chaining |

Source code in packages/data/src/dataknobs_data/query.py
def and_(self, *filters: Filter | Query) -> Query:
    """Add more filters with AND logic (convenience method).

    Args:
        filters: Filter objects or Query objects to AND together

    Returns:
        Self for chaining
    """
    for item in filters:
        if isinstance(item, Filter):
            self.filters.append(item)
        elif isinstance(item, Query):
            self.filters.extend(item.filters)
    return self
not_
not_(filter: Filter) -> ComplexQuery

Create a ComplexQuery with NOT logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `filter` | `Filter` | Filter to negate | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ComplexQuery` | ComplexQuery with NOT logic |

Source code in packages/data/src/dataknobs_data/query.py
def not_(self, filter: Filter) -> ComplexQuery:
    """Create a ComplexQuery with NOT logic.

    Args:
        filter: Filter to negate

    Returns:
        ComplexQuery with NOT logic
    """
    from .query_logic import (
        ComplexQuery,
        Condition,
        FilterCondition,
        LogicCondition,
        LogicOperator,
    )

    # Current filters as AND
    conditions: list[Condition] = []
    if self.filters:
        if len(self.filters) == 1:
            conditions.append(FilterCondition(self.filters[0]))
        else:
            and_cond = LogicCondition(operator=LogicOperator.AND)
            for f in self.filters:
                and_cond.conditions.append(FilterCondition(f))
            conditions.append(and_cond)

    # Add NOT condition
    not_cond = LogicCondition(
        operator=LogicOperator.NOT,
        conditions=[FilterCondition(filter)]
    )
    conditions.append(not_cond)

    # Create root condition
    if len(conditions) == 1:
        root_condition = conditions[0]
    else:
        root_condition = LogicCondition(
            operator=LogicOperator.AND,
            conditions=conditions
        )

    return ComplexQuery(
        condition=root_condition,
        sort_specs=self.sort_specs.copy(),
        limit_value=self.limit_value,
        offset_value=self.offset_value,
        fields=self.fields.copy() if self.fields else None
    )

SortOrder

Bases: Enum

Sort order for query results.

SortSpec dataclass

SortSpec(field: str, order: SortOrder = SortOrder.ASC)

Represents a sort specification.

Methods:

| Name | Description |
| --- | --- |
| `to_dict` | Convert sort spec to dictionary representation. |
| `from_dict` | Create sort spec from dictionary representation. |

Functions
to_dict
to_dict() -> dict[str, str]

Convert sort spec to dictionary representation.

Source code in packages/data/src/dataknobs_data/query.py
def to_dict(self) -> dict[str, str]:
    """Convert sort spec to dictionary representation."""
    return {"field": self.field, "order": self.order.value}
from_dict classmethod
from_dict(data: dict[str, str]) -> SortSpec

Create sort spec from dictionary representation.

Source code in packages/data/src/dataknobs_data/query.py
@classmethod
def from_dict(cls, data: dict[str, str]) -> SortSpec:
    """Create sort spec from dictionary representation."""
    return cls(field=data["field"], order=SortOrder(data.get("order", "asc")))

ComplexQuery dataclass

ComplexQuery(
    condition: Condition | None = None,
    sort_specs: list = list(),
    limit_value: int | None = None,
    offset_value: int | None = None,
    fields: list[str] | None = None,
    vector_query: VectorQuery | None = None,
)

A query with complex boolean logic support.

Methods:

| Name | Description |
| --- | --- |
| `AND` | Create a complex query with AND logic. |
| `OR` | Create a complex query with OR logic. |
| `matches` | Check if a record matches this query. |
| `to_simple_query` | Convert to simple Query if possible (AND filters only). |
| `to_dict` | Convert to dictionary representation. |
| `from_dict` | Create from dictionary representation. |

Functions
AND classmethod
AND(queries: list[Query]) -> ComplexQuery

Create a complex query with AND logic.

Source code in packages/data/src/dataknobs_data/query_logic.py
@classmethod
def AND(cls, queries: list[Query]) -> ComplexQuery:
    """Create a complex query with AND logic."""
    from .query import Query

    conditions: list[Condition] = []
    for q in queries:
        if isinstance(q, Query):
            # Convert Query filters to conditions
            for f in q.filters:
                conditions.append(FilterCondition(filter=f))

    return cls(
        condition=LogicCondition(operator=LogicOperator.AND, conditions=conditions)
    )
OR classmethod
OR(queries: list[Query]) -> ComplexQuery

Create a complex query with OR logic.

Source code in packages/data/src/dataknobs_data/query_logic.py
@classmethod
def OR(cls, queries: list[Query]) -> ComplexQuery:
    """Create a complex query with OR logic."""
    from .query import Query

    conditions: list[Condition] = []
    for q in queries:
        if isinstance(q, Query):
            # Convert Query filters to conditions
            for f in q.filters:
                conditions.append(FilterCondition(filter=f))

    return cls(
        condition=LogicCondition(operator=LogicOperator.OR, conditions=conditions)
    )
matches
matches(record: Any) -> bool

Check if a record matches this query.

Source code in packages/data/src/dataknobs_data/query_logic.py
def matches(self, record: Any) -> bool:
    """Check if a record matches this query."""
    if self.condition is None:
        return True
    return self.condition.matches(record)
to_simple_query
to_simple_query() -> Query

Convert to simple Query if possible (AND filters only).

Source code in packages/data/src/dataknobs_data/query_logic.py
def to_simple_query(self) -> Query:
    """Convert to simple Query if possible (AND filters only)."""
    from .query import Query

    filters = []

    # Try to extract simple filters if all are AND conditions
    if self.condition is None:
        pass
    elif isinstance(self.condition, FilterCondition):
        filters.append(self.condition.filter)
    elif isinstance(self.condition, LogicCondition) and self.condition.operator == LogicOperator.AND:
        # Check if all sub-conditions are simple filters
        all_filters = True
        for cond in self.condition.conditions:
            if isinstance(cond, FilterCondition):
                filters.append(cond.filter)
            else:
                all_filters = False
                break

        if not all_filters:
            # Can't convert complex logic to simple query
            raise ValueError("Cannot convert complex boolean logic to simple Query")
    else:
        raise ValueError("Cannot convert complex boolean logic to simple Query")

    return Query(
        filters=filters,
        sort_specs=self.sort_specs,
        limit_value=self.limit_value,
        offset_value=self.offset_value,
        fields=self.fields,
        vector_query=self.vector_query
    )
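The flattening rule in `to_simple_query` — accept a single filter or a flat AND of filters, reject everything else — can be sketched over plain dict condition trees. `flatten_and` is a hypothetical name, and the dict shapes follow the `to_dict` format (`"type": "filter"` / `"type": "logic"`) shown elsewhere in this module:

```python
def flatten_and(condition) -> list:
    """Flatten an AND-of-filters tree into a flat filter list; raise on anything else."""
    if condition is None:
        return []
    if condition["type"] == "filter":
        return [condition["filter"]]
    if condition["type"] == "logic" and condition["operator"] == "and":
        filters = []
        for sub in condition["conditions"]:
            if sub["type"] != "filter":
                # Nested logic cannot be expressed as a simple filter list
                raise ValueError("Cannot convert complex boolean logic to simple Query")
            filters.append(sub["filter"])
        return filters
    raise ValueError("Cannot convert complex boolean logic to simple Query")

f1 = {"type": "filter", "filter": {"field": "age", "operator": "gt", "value": 18}}
f2 = {"type": "filter", "filter": {"field": "active", "operator": "eq", "value": True}}
and_tree = {"type": "logic", "operator": "and", "conditions": [f1, f2]}
or_tree = {"type": "logic", "operator": "or", "conditions": [f1, f2]}
```

OR and NOT trees cannot be represented by a simple `Query`, so conversion fails for them, exactly as in the source above.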
to_dict
to_dict() -> dict[str, Any]

Convert to dictionary representation.

Source code in packages/data/src/dataknobs_data/query_logic.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary representation."""
    result = {}

    if self.condition:
        result["condition"] = self.condition.to_dict()

    if self.sort_specs:
        result["sort"] = [s.to_dict() for s in self.sort_specs]

    if self.limit_value is not None:
        result["limit"] = self.limit_value

    if self.offset_value is not None:
        result["offset"] = self.offset_value

    if self.fields is not None:
        result["fields"] = self.fields

    if self.vector_query is not None:
        result["vector_query"] = self.vector_query.to_dict()

    return result
from_dict classmethod
from_dict(data: dict[str, Any]) -> ComplexQuery

Create from dictionary representation.

Source code in packages/data/src/dataknobs_data/query_logic.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> ComplexQuery:
    """Create from dictionary representation."""
    from .query import SortSpec

    condition = None
    if "condition" in data:
        condition = condition_from_dict(data["condition"])

    sort_specs = []
    for sort_data in data.get("sort", []):
        sort_specs.append(SortSpec.from_dict(sort_data))

    vector_query = None
    if "vector_query" in data:
        vector_query = VectorQuery.from_dict(data["vector_query"])

    return cls(
        condition=condition,
        sort_specs=sort_specs,
        limit_value=data.get("limit"),
        offset_value=data.get("offset"),
        fields=data.get("fields"),
        vector_query=vector_query
    )

Condition

Bases: ABC

Abstract base class for query conditions.

Methods:

| Name | Description |
| --- | --- |
| `matches` | Check if a record matches this condition. |
| `to_dict` | Convert condition to dictionary representation. |
| `from_dict` | Create condition from dictionary representation. |

Functions
matches abstractmethod
matches(record: Any) -> bool

Check if a record matches this condition.

Source code in packages/data/src/dataknobs_data/query_logic.py
@abstractmethod
def matches(self, record: Any) -> bool:
    """Check if a record matches this condition."""
    pass
to_dict abstractmethod
to_dict() -> dict[str, Any]

Convert condition to dictionary representation.

Source code in packages/data/src/dataknobs_data/query_logic.py
@abstractmethod
def to_dict(self) -> dict[str, Any]:
    """Convert condition to dictionary representation."""
    pass
from_dict abstractmethod classmethod
from_dict(data: dict[str, Any]) -> Condition

Create condition from dictionary representation.

Source code in packages/data/src/dataknobs_data/query_logic.py
@classmethod
@abstractmethod
def from_dict(cls, data: dict[str, Any]) -> Condition:
    """Create condition from dictionary representation."""
    pass

FilterCondition dataclass

FilterCondition(filter: Filter)

Bases: Condition

A single filter condition.

Methods:

| Name | Description |
| --- | --- |
| `matches` | Check if a record matches this filter. |
| `to_dict` | Convert to dictionary representation. |
| `from_dict` | Create from dictionary representation. |

Functions
matches
matches(record: Any) -> bool

Check if a record matches this filter.

Source code in packages/data/src/dataknobs_data/query_logic.py
def matches(self, record: Any) -> bool:
    """Check if a record matches this filter."""
    from .records import Record

    if isinstance(record, Record):
        value = record.get_value(self.filter.field)
    elif isinstance(record, dict):
        # Support nested field access for dicts
        value = record
        for part in self.filter.field.split('.'):
            if isinstance(value, dict):
                value = value.get(part)
            else:
                value = None
                break
    else:
        value = getattr(record, self.filter.field, None)

    return self.filter.matches(value)
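The nested dot-path lookup used for plain dicts above can be isolated into a small standalone helper. `get_path` is a hypothetical name, but the loop mirrors the dict branch of `matches` exactly:

```python
def get_path(record: dict, path: str):
    """Resolve a dotted path like 'user.address.city' against nested dicts; None if missing."""
    value = record
    for part in path.split("."):
        if isinstance(value, dict):
            value = value.get(part)
        else:
            # Walked into a non-dict before the path was exhausted
            return None
    return value

doc = {"user": {"address": {"city": "Paris"}}}
```

Missing keys and paths that descend into non-dict values both resolve to `None`, so the filter then matches against `None` rather than raising.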
to_dict
to_dict() -> dict[str, Any]

Convert to dictionary representation.

Source code in packages/data/src/dataknobs_data/query_logic.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary representation."""
    return {
        "type": "filter",
        "filter": self.filter.to_dict()
    }
from_dict classmethod
from_dict(data: dict[str, Any]) -> FilterCondition

Create from dictionary representation.

Source code in packages/data/src/dataknobs_data/query_logic.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> FilterCondition:
    """Create from dictionary representation."""
    return cls(filter=Filter.from_dict(data["filter"]))

LogicCondition dataclass

LogicCondition(operator: LogicOperator, conditions: list[Condition] = list())

Bases: Condition

A logical combination of conditions.

Methods:

| Name | Description |
| --- | --- |
| `matches` | Check if a record matches this logical condition. |
| `to_dict` | Convert to dictionary representation. |
| `from_dict` | Create from dictionary representation. |

Functions
matches
matches(record: Any) -> bool

Check if a record matches this logical condition.

Source code in packages/data/src/dataknobs_data/query_logic.py
def matches(self, record: Any) -> bool:
    """Check if a record matches this logical condition."""
    if self.operator == LogicOperator.AND:
        # All conditions must match
        return all(cond.matches(record) for cond in self.conditions)
    elif self.operator == LogicOperator.OR:
        # At least one condition must match
        return any(cond.matches(record) for cond in self.conditions)
    elif self.operator == LogicOperator.NOT:
        # No conditions should match (or negate single condition)
        if len(self.conditions) == 1:
            return not self.conditions[0].matches(record)
        else:
            # NOT with multiple conditions = none should match
            return not any(cond.matches(record) for cond in self.conditions)
    else:
        # This should never be reached as all operators are handled above
        raise ValueError(f"Unknown logical operator: {self.operator}")
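The `all`/`any` evaluation above can be restated as a standalone function over plain predicates, which makes the NOT-over-many semantics ("none may match") easy to verify. `evaluate` is a hypothetical name; the conditions here are callables rather than `Condition` objects:

```python
from typing import Any, Callable

def evaluate(operator: str, conditions: list[Callable[[Any], bool]], record: Any) -> bool:
    """Mirror of LogicCondition.matches: AND = all, OR = any, NOT = none may match."""
    if operator == "and":
        return all(c(record) for c in conditions)
    if operator == "or":
        return any(c(record) for c in conditions)
    if operator == "not":
        # For a single condition this is plain negation; for many, none may match
        return not any(c(record) for c in conditions)
    raise ValueError(f"Unknown logical operator: {operator}")
```

Note that `not any(...)` over a single condition reduces to negating it, so the two NOT branches in the source are behaviourally equivalent.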
to_dict
to_dict() -> dict[str, Any]

Convert to dictionary representation.

Source code in packages/data/src/dataknobs_data/query_logic.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary representation."""
    return {
        "type": "logic",
        "operator": self.operator.value,
        "conditions": [cond.to_dict() for cond in self.conditions]
    }
from_dict classmethod
from_dict(data: dict[str, Any]) -> LogicCondition

Create from dictionary representation.

Source code in packages/data/src/dataknobs_data/query_logic.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> LogicCondition:
    """Create from dictionary representation."""
    conditions: list[Condition] = []
    for cond_data in data.get("conditions", []):
        if cond_data["type"] == "filter":
            conditions.append(FilterCondition.from_dict(cond_data))
        elif cond_data["type"] == "logic":
            conditions.append(LogicCondition.from_dict(cond_data))

    return cls(
        operator=LogicOperator(data["operator"]),
        conditions=conditions
    )

LogicOperator

Bases: Enum

Logical operators for combining conditions.

QueryBuilder

QueryBuilder()

Builder for complex queries with boolean logic.

Initialize empty query builder.

Methods:

| Name | Description |
| --- | --- |
| `where` | Add a filter condition (defaults to AND with existing conditions). |
| `and_` | Add AND conditions. |
| `or_` | Add OR conditions. |
| `not_` | Add NOT condition. |
| `sort_by` | Add sort specification. |
| `limit` | Set result limit. |
| `offset` | Set result offset. |
| `select` | Set field projection. |
| `similar_to` | Add vector similarity search. |
| `build` | Build the final query. |

Source code in packages/data/src/dataknobs_data/query_logic.py
def __init__(self):
    """Initialize empty query builder."""
    self.root_condition = None
    self.sort_specs = []
    self.limit_value = None
    self.offset_value = None
    self.fields = None
    self.vector_query = None
Functions
where
where(field: str, operator: str | Operator, value: Any = None) -> QueryBuilder

Add a filter condition (defaults to AND with existing conditions).

Source code in packages/data/src/dataknobs_data/query_logic.py
def where(self, field: str, operator: str | Operator, value: Any = None) -> QueryBuilder:
    """Add a filter condition (defaults to AND with existing conditions)."""
    op = Operator(operator) if isinstance(operator, str) else operator
    filter_cond = FilterCondition(Filter(field, op, value))

    if self.root_condition is None:
        self.root_condition = filter_cond
    elif isinstance(self.root_condition, LogicCondition) and self.root_condition.operator == LogicOperator.AND:
        self.root_condition.conditions.append(filter_cond)
    else:
        # Wrap existing condition in AND
        self.root_condition = LogicCondition(
            operator=LogicOperator.AND,
            conditions=[self.root_condition, filter_cond]
        )

    return self
and_
and_(*conditions: QueryBuilder | Filter | Condition) -> QueryBuilder

Add AND conditions.

Source code in packages/data/src/dataknobs_data/query_logic.py
def and_(self, *conditions: QueryBuilder | Filter | Condition) -> QueryBuilder:
    """Add AND conditions."""
    logic_cond = LogicCondition(operator=LogicOperator.AND)

    for cond in conditions:
        if isinstance(cond, QueryBuilder):
            if cond.root_condition:
                logic_cond.conditions.append(cond.root_condition)
        elif isinstance(cond, Filter):
            logic_cond.conditions.append(FilterCondition(cond))
        elif isinstance(cond, Condition):
            logic_cond.conditions.append(cond)

    if self.root_condition is None:
        self.root_condition = logic_cond
    elif isinstance(self.root_condition, LogicCondition) and self.root_condition.operator == LogicOperator.AND:
        self.root_condition.conditions.extend(logic_cond.conditions)
    else:
        self.root_condition = LogicCondition(
            operator=LogicOperator.AND,
            conditions=[self.root_condition, logic_cond]
        )

    return self
or_
or_(*conditions: QueryBuilder | Filter | Condition) -> QueryBuilder

Add OR conditions.

Source code in packages/data/src/dataknobs_data/query_logic.py
def or_(self, *conditions: QueryBuilder | Filter | Condition) -> QueryBuilder:
    """Add OR conditions."""
    logic_cond = LogicCondition(operator=LogicOperator.OR)

    for cond in conditions:
        if isinstance(cond, QueryBuilder):
            if cond.root_condition:
                logic_cond.conditions.append(cond.root_condition)
        elif isinstance(cond, Filter):
            logic_cond.conditions.append(FilterCondition(cond))
        elif isinstance(cond, Condition):
            logic_cond.conditions.append(cond)

    if self.root_condition is None:
        self.root_condition = logic_cond
    else:
        # Always wrap in OR at top level
        if isinstance(self.root_condition, LogicCondition) and self.root_condition.operator == LogicOperator.OR:
            self.root_condition.conditions.extend(logic_cond.conditions)
        else:
            self.root_condition = LogicCondition(
                operator=LogicOperator.OR,
                conditions=[self.root_condition] + logic_cond.conditions
            )

    return self
not_
not_(condition: QueryBuilder | Filter | Condition) -> QueryBuilder

Add NOT condition.

Source code in packages/data/src/dataknobs_data/query_logic.py
def not_(self, condition: QueryBuilder | Filter | Condition) -> QueryBuilder:
    """Add NOT condition."""
    if isinstance(condition, QueryBuilder):
        not_cond = LogicCondition(
            operator=LogicOperator.NOT,
            conditions=[condition.root_condition] if condition.root_condition else []
        )
    elif isinstance(condition, Filter):
        not_cond = LogicCondition(
            operator=LogicOperator.NOT,
            conditions=[FilterCondition(condition)]
        )
    else:
        not_cond = LogicCondition(
            operator=LogicOperator.NOT,
            conditions=[condition]
        )

    if self.root_condition is None:
        self.root_condition = not_cond
    elif isinstance(self.root_condition, LogicCondition) and self.root_condition.operator == LogicOperator.AND:
        self.root_condition.conditions.append(not_cond)
    else:
        self.root_condition = LogicCondition(
            operator=LogicOperator.AND,
            conditions=[self.root_condition, not_cond]
        )

    return self
sort_by
sort_by(field: str, order: str = 'asc') -> QueryBuilder

Add sort specification.

Source code in packages/data/src/dataknobs_data/query_logic.py
def sort_by(self, field: str, order: str = "asc") -> QueryBuilder:
    """Add sort specification."""
    from .query import SortOrder, SortSpec

    sort_order = SortOrder.ASC if order.lower() == "asc" else SortOrder.DESC
    self.sort_specs.append(SortSpec(field=field, order=sort_order))
    return self
limit
limit(value: int) -> QueryBuilder

Set result limit.

Source code in packages/data/src/dataknobs_data/query_logic.py
def limit(self, value: int) -> QueryBuilder:
    """Set result limit."""
    self.limit_value = value
    return self
offset
offset(value: int) -> QueryBuilder

Set result offset.

Source code in packages/data/src/dataknobs_data/query_logic.py
def offset(self, value: int) -> QueryBuilder:
    """Set result offset."""
    self.offset_value = value
    return self
select
select(*fields: str) -> QueryBuilder

Set field projection.

Source code in packages/data/src/dataknobs_data/query_logic.py
def select(self, *fields: str) -> QueryBuilder:
    """Set field projection."""
    self.fields = list(fields) if fields else None
    return self
similar_to
similar_to(
    vector: ndarray | list[float],
    field: str = "embedding",
    k: int = 10,
    metric: DistanceMetric | str = "cosine",
    include_source: bool = True,
    score_threshold: float | None = None,
) -> QueryBuilder

Add vector similarity search.

Source code in packages/data/src/dataknobs_data/query_logic.py
def similar_to(
    self,
    vector: np.ndarray | list[float],
    field: str = "embedding",
    k: int = 10,
    metric: DistanceMetric | str = "cosine",
    include_source: bool = True,
    score_threshold: float | None = None,
) -> QueryBuilder:
    """Add vector similarity search."""
    self.vector_query = VectorQuery(
        vector=vector,
        field_name=field,
        k=k,
        metric=metric,
        include_source=include_source,
        score_threshold=score_threshold,
    )
    # If limit is not set, use k as the limit
    if self.limit_value is None:
        self.limit_value = k
    return self
build
build() -> ComplexQuery

Build the final query.

Source code in packages/data/src/dataknobs_data/query_logic.py
def build(self) -> ComplexQuery:
    """Build the final query."""
    return ComplexQuery(
        condition=self.root_condition,
        sort_specs=self.sort_specs,
        limit_value=self.limit_value,
        offset_value=self.offset_value,
        fields=self.fields,
        vector_query=self.vector_query
    )
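Because every builder method returns self, queries compose as a fluent chain ending in build(). The pattern, including the detail that similar_to's k doubles as the result limit when none is set, can be illustrated with a minimal plain-Python sketch (MiniBuilder is a hypothetical stand-in, not the real QueryBuilder):

```python
class MiniBuilder:
    """Hypothetical sketch of the fluent-builder pattern used by QueryBuilder."""

    def __init__(self):
        self.sort_specs = []
        self.limit_value = None
        self.vector = None

    def sort_by(self, field, order="asc"):
        self.sort_specs.append((field, order.lower()))
        return self  # returning self enables chaining

    def limit(self, value):
        self.limit_value = value
        return self

    def similar_to(self, vector, k=10):
        self.vector = (vector, k)
        if self.limit_value is None:
            self.limit_value = k  # k doubles as the limit when none is set
        return self

    def build(self):
        return {"sort": self.sort_specs, "limit": self.limit_value}

query = MiniBuilder().sort_by("age", "desc").similar_to([0.1, 0.2], k=5).build()
```

With the real class, the same chain would end with a ComplexQuery instead of a plain dict.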

Record dataclass

Record(
    data: dict[str, Any] | OrderedDict[str, Field] | None = None,
    metadata: dict[str, Any] | None = None,
    id: str | None = None,
    storage_id: str | None = None,
)

Represents a structured data record with fields and metadata.

The record ID can be accessed via the id property, which:

- Returns the storage_id if set (database-assigned ID)
- Falls back to the user-defined 'id' field if present
- Returns None if no ID is available

This separation allows records to have both:

- A user-defined 'id' field as part of their data
- A system-assigned storage_id for database operations

Example
from dataknobs_data import Record, Field, FieldType

# Create record from dict
record = Record({"name": "Alice", "age": 30, "email": "alice@example.com"})

# Access field values
print(record.get_value("name"))  # "Alice"
print(record["age"])  # 30
print(record.name)  # "Alice" (attribute access)

# Set field values
record.set_value("age", 31)
record["city"] = "New York"

# Work with metadata
record.metadata["source"] = "user_input"

# Convert to dict
data = record.to_dict()  # {"name": "Alice", "age": 31, "email": "...", "city": "..."}

Initialize a record from various data formats.

Parameters:

Name Type Description Default
data dict[str, Any] | OrderedDict[str, Field] | None

Can be a dict of field names to values, or an OrderedDict of Field objects

None
metadata dict[str, Any] | None

Optional metadata for the record

None
id str | None

Optional unique identifier for the record (deprecated, use storage_id)

None
storage_id str | None

Optional storage system identifier for the record

None
Example
# From simple dict
record = Record({"name": "Alice", "age": 30})

# With metadata
record = Record(
    data={"name": "Bob"},
    metadata={"source": "api", "timestamp": "2024-01-01"}
)

# With storage_id
record = Record(
    data={"name": "Charlie"},
    storage_id="550e8400-e29b-41d4-a716-446655440000"
)

Methods:

Name Description
generate_id

Generate and set a new UUID for this record.

get_user_id

Get the user-defined ID field value (not the storage ID).

has_storage_id

Check if this record has a storage system ID assigned.

get_field

Get a field by name.

get_value

Get a field's value by name, supporting dot-notation for nested paths.

get_nested_value

Get a value from a nested path using dot notation.

set_field

Set or update a field.

set_value

Set a field's value by name.

remove_field

Remove a field by name. Returns True if field was removed.

has_field

Check if a field exists.

field_names

Get list of field names.

field_count

Get the number of fields.

__getitem__

Get field value by name or field by index.

__setitem__

Set field by name.

__delitem__

Delete field by name.

__contains__

Check if field exists.

__iter__

Iterate over field names.

__len__

Get number of fields.

validate

Validate all fields in the record.

get_field_object

Get the Field object by name.

__getattr__

Get field value by attribute access.

__setattr__

Set field value by attribute access.

to_dict

Convert record to dictionary.

from_dict

Create a record from a dictionary representation.

copy

Create a copy of the record.

project

Create a new record with only specified fields.

merge

Merge another record into this one.

Attributes:

Name Type Description
storage_id str | None

Get the storage system ID (database-assigned ID).

id str | None

Get the record ID.

data dict[str, Any]

Get all field values as a dictionary.

Source code in packages/data/src/dataknobs_data/records.py
def __init__(
    self,
    data: dict[str, Any] | OrderedDict[str, Field] | None = None,
    metadata: dict[str, Any] | None = None,
    id: str | None = None,
    storage_id: str | None = None,
):
    """Initialize a record from various data formats.

    Args:
        data: Can be a dict of field names to values, or an OrderedDict of Field objects
        metadata: Optional metadata for the record
        id: Optional unique identifier for the record (deprecated, use storage_id)
        storage_id: Optional storage system identifier for the record

    Example:
        ```python
        # From simple dict
        record = Record({"name": "Alice", "age": 30})

        # With metadata
        record = Record(
            data={"name": "Bob"},
            metadata={"source": "api", "timestamp": "2024-01-01"}
        )

        # With storage_id
        record = Record(
            data={"name": "Charlie"},
            storage_id="550e8400-e29b-41d4-a716-446655440000"
        )
        ```
    """
    self.metadata = metadata or {}
    self.fields = OrderedDict()
    self._id = id  # Deprecated
    self._storage_id = storage_id or id  # Use storage_id if provided, fall back to id

    # Process data first to populate fields
    if data:
        if isinstance(data, OrderedDict) and all(
            isinstance(v, Field) for v in data.values()
        ):
            self.fields = data
        else:
            for key, value in data.items():
                if isinstance(value, Field):
                    # Ensure the field has the correct name
                    if value.name is None or value.name == "embedding":
                        value.name = key
                    self.fields[key] = value
                else:
                    self.fields[key] = Field(name=key, value=value)

    # Now check for ID from various sources if not explicitly provided
    if self._id is None:
        # Check metadata
        if "id" in self.metadata:
            self._id = str(self.metadata["id"])
        # Check fields for id
        elif "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                self._id = str(value)
        # Check fields for record_id
        elif "record_id" in self.fields:
            value = self.get_value("record_id")
            if value is not None:
                self._id = str(value)
Attributes
storage_id property writable
storage_id: str | None

Get the storage system ID (database-assigned ID).

id property writable
id: str | None

Get the record ID.

Priority order:

1. Storage ID (database-assigned), if set
2. User-defined 'id' field value
3. Metadata 'id' (for backwards compatibility)
4. record_id field (common in DataFrames)

Returns the first ID found, or None if no ID is present.
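The resolution order above can be sketched as a plain-Python function (hypothetical helper for illustration only; the real logic lives inside the id property):

```python
def resolve_record_id(storage_id, fields, metadata):
    """Mirror the documented priority order for Record.id."""
    if storage_id is not None:
        return storage_id                # 1. database-assigned storage ID
    if fields.get("id") is not None:
        return str(fields["id"])         # 2. user-defined 'id' field
    if metadata.get("id") is not None:
        return str(metadata["id"])       # 3. metadata 'id' (backwards compat)
    if fields.get("record_id") is not None:
        return str(fields["record_id"])  # 4. DataFrame-style record_id field
    return None                          # no ID available
```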

data property
data: dict[str, Any]

Get all field values as a dictionary.

Provides a simple dict-like view of the record's data.

Functions
generate_id
generate_id() -> str

Generate and set a new UUID for this record.

Returns:

Type Description
str

The generated UUID string

Source code in packages/data/src/dataknobs_data/records.py
def generate_id(self) -> str:
    """Generate and set a new UUID for this record.

    Returns:
        The generated UUID string
    """
    new_id = str(uuid.uuid4())
    self.id = new_id
    return new_id
get_user_id
get_user_id() -> str | None

Get the user-defined ID field value (not the storage ID).

This explicitly returns the value of the 'id' field in the record's data, ignoring any storage_id that may be set.

Returns:

Type Description
str | None

The value of the 'id' field if present, None otherwise

Source code in packages/data/src/dataknobs_data/records.py
def get_user_id(self) -> str | None:
    """Get the user-defined ID field value (not the storage ID).

    This explicitly returns the value of the 'id' field in the record's data,
    ignoring any storage_id that may be set.

    Returns:
        The value of the 'id' field if present, None otherwise
    """
    if "id" in self.fields:
        value = self.get_value("id")
        if value is not None:
            return str(value)
    return None
has_storage_id
has_storage_id() -> bool

Check if this record has a storage system ID assigned.

Returns:

Type Description
bool

True if storage_id is set, False otherwise

Source code in packages/data/src/dataknobs_data/records.py
def has_storage_id(self) -> bool:
    """Check if this record has a storage system ID assigned.

    Returns:
        True if storage_id is set, False otherwise
    """
    return self._storage_id is not None
get_field
get_field(name: str) -> Field | None

Get a field by name.

Source code in packages/data/src/dataknobs_data/records.py
def get_field(self, name: str) -> Field | None:
    """Get a field by name."""
    return self.fields.get(name)
get_value
get_value(name: str, default: Any = None) -> Any

Get a field's value by name, supporting dot-notation for nested paths.

Parameters:

Name Type Description Default
name str

Field name or dot-notation path (e.g., "metadata.type")

required
default Any

Default value if field not found

None

Returns:

Type Description
Any

The field value or default

Example
record = Record({
    "name": "Alice",
    "config": {"timeout": 30, "retries": 3}
})

# Simple field access
name = record.get_value("name")  # "Alice"

# Nested path access
timeout = record.get_value("config.timeout")  # 30

# With default
missing = record.get_value("missing_field", "default")  # "default"
Source code in packages/data/src/dataknobs_data/records.py
def get_value(self, name: str, default: Any = None) -> Any:
    """Get a field's value by name, supporting dot-notation for nested paths.

    Args:
        name: Field name or dot-notation path (e.g., "metadata.type")
        default: Default value if field not found

    Returns:
        The field value or default

    Example:
        ```python
        record = Record({
            "name": "Alice",
            "config": {"timeout": 30, "retries": 3}
        })

        # Simple field access
        name = record.get_value("name")  # "Alice"

        # Nested path access
        timeout = record.get_value("config.timeout")  # 30

        # With default
        missing = record.get_value("missing_field", "default")  # "default"
        ```
    """
    # Check if this is a nested path
    if "." in name:
        return self.get_nested_value(name, default)

    # Simple field lookup
    field = self.get_field(name)
    return field.value if field else default
get_nested_value
get_nested_value(path: str, default: Any = None) -> Any

Get a value from a nested path using dot notation.

Supports paths like:

- "metadata.type" - access the metadata field (if it exists) or the metadata dict attribute
- "fields.temperature" - access field values
- "metadata.config.timeout" - nested dict access

Parameters:

Name Type Description Default
path str

Dot-notation path to the value

required
default Any

Default value if path not found

None

Returns:

Type Description
Any

The value at the path or default

Source code in packages/data/src/dataknobs_data/records.py
def get_nested_value(self, path: str, default: Any = None) -> Any:
    """Get a value from a nested path using dot notation.

    Supports paths like:
    - "metadata.type" - access metadata field (if exists) or metadata dict attribute
    - "fields.temperature" - access field values
    - "metadata.config.timeout" - nested dict access

    Args:
        path: Dot-notation path to the value
        default: Default value if path not found

    Returns:
        The value at the path or default
    """
    parts = path.split(".", 1)
    if len(parts) == 1:
        # No more nesting, get the value
        return self.get_value(parts[0], default)

    root, remaining = parts

    # Handle special root paths
    if root == "metadata":
        # Check if "metadata" is a field first, before falling back to attribute
        if root in self.fields:
            # It's a field, navigate through its value
            field_value = self.get_value(root, None)
            if isinstance(field_value, dict):
                return self._traverse_dict(field_value, remaining, default)
            return default
        elif self.metadata:
            # Fall back to record's metadata attribute
            return self._traverse_dict(self.metadata, remaining, default)
        else:
            return default
    elif root == "fields":
        # Get field value by name
        if "." in remaining:
            # Nested path within field value (if it's a dict)
            field_name, field_path = remaining.split(".", 1)
            field_value = self.get_value(field_name, None)
            if isinstance(field_value, dict):
                return self._traverse_dict(field_value, field_path, default)
            return default
        else:
            # Simple field access
            return self.get_value(remaining, default)
    else:
        # Check if it's a field containing a dict
        field_value = self.get_value(root, None)
        if isinstance(field_value, dict):
            return self._traverse_dict(field_value, remaining, default)
        return default
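The dict traversal that get_nested_value delegates to can be sketched in a few lines of plain Python (a hypothetical stand-in for the private _traverse_dict helper):

```python
def traverse_dict(d, path, default=None):
    """Walk a dot-notation path through nested dicts, returning default on a miss."""
    current = d
    for part in path.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        else:
            return default  # any missing segment short-circuits to the default
    return current

config = {"config": {"timeout": 30, "retries": 3}}
traverse_dict(config, "config.timeout")  # 30
```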
set_field
set_field(
    name: str,
    value: Any,
    field_type: FieldType | None = None,
    field_metadata: dict[str, Any] | None = None,
) -> None

Set or update a field.

Source code in packages/data/src/dataknobs_data/records.py
def set_field(
    self,
    name: str,
    value: Any,
    field_type: FieldType | None = None,
    field_metadata: dict[str, Any] | None = None,
) -> None:
    """Set or update a field."""
    self.fields[name] = Field(
        name=name, value=value, type=field_type, metadata=field_metadata or {}
    )
set_value
set_value(name: str, value: Any) -> None

Set a field's value by name.

Convenience method that creates the field if it doesn't exist.

Source code in packages/data/src/dataknobs_data/records.py
def set_value(self, name: str, value: Any) -> None:
    """Set a field's value by name.

    Convenience method that creates the field if it doesn't exist.
    """
    if name in self.fields:
        self.fields[name].value = value
    else:
        self.set_field(name, value)
remove_field
remove_field(name: str) -> bool

Remove a field by name. Returns True if field was removed.

Source code in packages/data/src/dataknobs_data/records.py
def remove_field(self, name: str) -> bool:
    """Remove a field by name. Returns True if field was removed."""
    if name in self.fields:
        del self.fields[name]
        return True
    return False
has_field
has_field(name: str) -> bool

Check if a field exists.

Source code in packages/data/src/dataknobs_data/records.py
def has_field(self, name: str) -> bool:
    """Check if a field exists."""
    return name in self.fields
field_names
field_names() -> list[str]

Get list of field names.

Source code in packages/data/src/dataknobs_data/records.py
def field_names(self) -> list[str]:
    """Get list of field names."""
    return list(self.fields.keys())
field_count
field_count() -> int

Get the number of fields.

Source code in packages/data/src/dataknobs_data/records.py
def field_count(self) -> int:
    """Get the number of fields."""
    return len(self.fields)
__getitem__
__getitem__(key: str | int) -> Any

Get field value by name or field by index.

For string keys, returns the field value directly (dict-like access). For integer keys, returns the Field object at that index for backward compatibility.

Source code in packages/data/src/dataknobs_data/records.py
def __getitem__(self, key: str | int) -> Any:
    """Get field value by name or field by index.

    For string keys, returns the field value directly (dict-like access).
    For integer keys, returns the Field object at that index for backward compatibility.
    """
    if isinstance(key, str):
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        return self.fields[key].value
    elif isinstance(key, int):
        field_list = list(self.fields.values())
        if key < 0 or key >= len(field_list):
            raise IndexError(f"Field index {key} out of range")
        return field_list[key]
    else:
        raise TypeError(f"Key must be str or int, got {type(key)}")
__setitem__
__setitem__(key: str, value: Field | Any) -> None

Set field by name.

Can accept either a Field object or a raw value. When given a raw value, creates a new Field automatically.

Source code in packages/data/src/dataknobs_data/records.py
def __setitem__(self, key: str, value: Field | Any) -> None:
    """Set field by name.

    Can accept either a Field object or a raw value.
    When given a raw value, creates a new Field automatically.
    """
    if isinstance(value, Field):
        self.fields[key] = value
    else:
        self.set_field(key, value)
__delitem__
__delitem__(key: str) -> None

Delete field by name.

Source code in packages/data/src/dataknobs_data/records.py
def __delitem__(self, key: str) -> None:
    """Delete field by name."""
    if key not in self.fields:
        raise KeyError(f"Field '{key}' not found")
    del self.fields[key]
__contains__
__contains__(key: str) -> bool

Check if field exists.

Source code in packages/data/src/dataknobs_data/records.py
def __contains__(self, key: str) -> bool:
    """Check if field exists."""
    return key in self.fields
__iter__
__iter__() -> Iterator[str]

Iterate over field names.

Source code in packages/data/src/dataknobs_data/records.py
def __iter__(self) -> Iterator[str]:
    """Iterate over field names."""
    return iter(self.fields)
__len__
__len__() -> int

Get number of fields.

Source code in packages/data/src/dataknobs_data/records.py
def __len__(self) -> int:
    """Get number of fields."""
    return len(self.fields)
validate
validate() -> bool

Validate all fields in the record.

Source code in packages/data/src/dataknobs_data/records.py
def validate(self) -> bool:
    """Validate all fields in the record."""
    return all(field.validate() for field in self.fields.values())
get_field_object
get_field_object(key: str) -> Field

Get the Field object by name.

Use this method when you need access to the Field object itself, not just its value.

Parameters:

Name Type Description Default
key str

Field name

required

Returns:

Type Description
Field

The Field object

Raises:

Type Description
KeyError

If field not found

Source code in packages/data/src/dataknobs_data/records.py
def get_field_object(self, key: str) -> Field:
    """Get the Field object by name.

    Use this method when you need access to the Field object itself,
    not just its value.

    Args:
        key: Field name

    Returns:
        The Field object

    Raises:
        KeyError: If field not found
    """
    if key not in self.fields:
        raise KeyError(f"Field '{key}' not found")
    return self.fields[key]
__getattr__
__getattr__(name: str) -> Any

Get field value by attribute access.

Provides convenient attribute-style access to field values. Falls back to normal attribute access for non-field attributes.

Parameters:

Name Type Description Default
name str

Attribute/field name

required

Returns:

Type Description
Any

Field value if field exists, otherwise raises AttributeError

Source code in packages/data/src/dataknobs_data/records.py
def __getattr__(self, name: str) -> Any:
    """Get field value by attribute access.

    Provides convenient attribute-style access to field values.
    Falls back to normal attribute access for non-field attributes.

    Args:
        name: Attribute/field name

    Returns:
        Field value if field exists, otherwise raises AttributeError
    """
    # Avoid infinite recursion for special attributes
    if name.startswith("_") or name in ("fields", "metadata", "id"):
        raise AttributeError(
            f"'{type(self).__name__}' object has no attribute '{name}'"
        )

    # Check if it's a field
    if hasattr(self, "fields") and name in self.fields:
        return self.fields[name].value

    raise AttributeError(f"'{type(self).__name__}' object has no field '{name}'")
__setattr__
__setattr__(name: str, value: Any) -> None

Set field value by attribute access.

Allows setting field values using attribute syntax. Special attributes (fields, metadata, _id, _storage_id) are handled normally. Properties (id, storage_id) are also handled specially.

Parameters:

Name Type Description Default
name str

Attribute/field name

required
value Any

Value to set

required
Source code in packages/data/src/dataknobs_data/records.py
def __setattr__(self, name: str, value: Any) -> None:
    """Set field value by attribute access.

    Allows setting field values using attribute syntax.
    Special attributes (fields, metadata, _id, _storage_id) are handled normally.
    Properties (id, storage_id) are also handled specially.

    Args:
        name: Attribute/field name
        value: Value to set
    """
    # Handle special attributes and private attributes normally
    if name in ("fields", "metadata", "_id", "_storage_id") or name.startswith("_"):
        super().__setattr__(name, value)
    # Handle properties that have setters
    elif name in ("id", "storage_id"):
        # Use the property setter
        object.__setattr__(self, name, value)
    elif hasattr(self, "fields") and name in self.fields:
        # Update existing field value
        self.fields[name].value = value
    else:
        # For new fields during normal operation, create them
        # But during __init__, we need to use normal attribute setting
        if hasattr(self, "fields"):
            self.set_field(name, value)
        else:
            super().__setattr__(name, value)
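The attribute-routing rules above (special names set normally, everything else routed into fields) can be demonstrated with a minimal stand-in class; AttrRecord below is hypothetical and much simpler than the real Record:

```python
class AttrRecord:
    """Hypothetical sketch of Record's attribute routing (not the real class)."""

    def __init__(self):
        # Bypass our own __setattr__ so the container itself is a normal attribute
        super().__setattr__("fields", {})

    def __setattr__(self, name, value):
        if name == "fields" or name.startswith("_"):
            super().__setattr__(name, value)  # special/private attrs set normally
        else:
            self.fields[name] = value         # everything else becomes a field

    def __getattr__(self, name):
        # Only called when normal lookup fails, so 'fields' itself never lands here
        if name.startswith("_") or name == "fields":
            raise AttributeError(name)
        fields = self.__dict__.get("fields", {})
        if name in fields:
            return fields[name]
        raise AttributeError(f"no field '{name}'")

record = AttrRecord()
record.name = "Alice"   # routed into fields, not the instance dict
```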
to_dict
to_dict(
    include_metadata: bool = False,
    flatten: bool = True,
    include_field_objects: bool = True,
) -> dict[str, Any]

Convert record to dictionary.

Parameters:

Name Type Description Default
include_metadata bool

Whether to include metadata in the output

False
flatten bool

If True (default), return just field values; if False, return structured format

True
include_field_objects bool

If True and not flattened, return full Field objects

True

Returns:

Type Description
dict[str, Any]

Dictionary representation of the record

Source code in packages/data/src/dataknobs_data/records.py
def to_dict(
    self,
    include_metadata: bool = False,
    flatten: bool = True,
    include_field_objects: bool = True,
) -> dict[str, Any]:
    """Convert record to dictionary.

    Args:
        include_metadata: Whether to include metadata in the output
        flatten: If True (default), return just field values; if False, return structured format
        include_field_objects: If True and not flattened, return full Field objects

    Returns:
        Dictionary representation of the record
    """
    if flatten:
        # Simple dict with just values (default behavior for ergonomics)
        result = {}
        for name, field in self.fields.items():
            # Handle VectorField specially to ensure JSON serialization
            if hasattr(field, 'to_list') and callable(field.to_list):
                # VectorField has a to_list() method for serialization
                result[name] = field.to_list()
            else:
                result[name] = field.value
        if self.id:
            result["_id"] = self.id
        if include_metadata and self.metadata:
            result["_metadata"] = self.metadata
    else:
        # Structured format for serialization
        if include_field_objects:
            result = {
                "fields": {
                    name: field.to_dict() for name, field in self.fields.items()
                }
            }
        else:
            result = {
                "fields": {name: field.value for name, field in self.fields.items()}
            }
        if self.id:
            result["id"] = self.id
        if include_metadata:
            result["metadata"] = self.metadata
    return result
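The two output shapes, flat values versus the structured "fields" form, can be sketched with plain dicts (to_dict_sketch is a hypothetical illustration, not the real method):

```python
def to_dict_sketch(fields, record_id=None, metadata=None,
                   flatten=True, include_metadata=False):
    """Hypothetical sketch of to_dict()'s two shapes using plain dicts."""
    if flatten:
        result = dict(fields)              # just the values (default, ergonomic)
        if record_id:
            result["_id"] = record_id      # id folded in under "_id"
        if include_metadata and metadata:
            result["_metadata"] = metadata
    else:
        result = {"fields": dict(fields)}  # structured form keeps a "fields" key
        if record_id:
            result["id"] = record_id
        if include_metadata:
            result["metadata"] = metadata
    return result
```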
from_dict classmethod
from_dict(data: dict[str, Any]) -> Record

Create a record from a dictionary representation.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary containing record data

required

Returns:

Type Description
Record

A new Record instance

Example
# From simple dict
data = {"name": "Alice", "age": 30}
record = Record.from_dict(data)

# From structured format
data = {
    "fields": {
        "name": {"value": "Alice", "type": "string"},
        "age": {"value": 30, "type": "integer"}
    },
    "metadata": {"source": "api"}
}
record = Record.from_dict(data)
Source code in packages/data/src/dataknobs_data/records.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Record:
    """Create a record from a dictionary representation.

    Args:
        data: Dictionary containing record data

    Returns:
        A new Record instance

    Example:
        ```python
        # From simple dict
        data = {"name": "Alice", "age": 30}
        record = Record.from_dict(data)

        # From structured format
        data = {
            "fields": {
                "name": {"value": "Alice", "type": "string"},
                "age": {"value": 30, "type": "integer"}
            },
            "metadata": {"source": "api"}
        }
        record = Record.from_dict(data)
        ```
    """
    if "fields" in data:
        fields = OrderedDict()
        for name, field_data in data["fields"].items():
            if isinstance(field_data, dict) and "value" in field_data:
                # Add name to field_data for Field.from_dict
                field_data_with_name = {"name": name, **field_data}
                fields[name] = Field.from_dict(field_data_with_name)
            else:
                fields[name] = Field(name=name, value=field_data)
        metadata = data.get("metadata", {})
        record_id = data.get("id") or data.get("_id")
        return cls(data=fields, metadata=metadata, id=record_id)
    else:
        # Check for _id in flattened format
        record_id = data.pop("_id", None) if "_id" in data else None
        return cls(data=data, id=record_id)
copy
copy(deep: bool = True) -> Record

Create a copy of the record.

Parameters:

Name Type Description Default
deep bool

If True, create deep copies of fields and metadata

True
Source code in packages/data/src/dataknobs_data/records.py
def copy(self, deep: bool = True) -> Record:
    """Create a copy of the record.

    Args:
        deep: If True, create deep copies of fields and metadata
    """
    if deep:
        import copy

        new_fields = OrderedDict()
        for name, field in self.fields.items():
            # Preserve the actual field type (Field or VectorField)
            if hasattr(field, '__class__'):
                # Use the actual class of the field
                field_class = field.__class__
                if field_class.__name__ == 'VectorField':
                    # Import VectorField if needed
                    from dataknobs_data.fields import VectorField
                    new_fields[name] = VectorField(
                        name=field.name,
                        value=copy.deepcopy(field.value),
                        dimensions=getattr(field, 'dimensions', None),
                        source_field=getattr(field, 'source_field', None),
                        model_name=getattr(field, 'model_name', None),
                        model_version=getattr(field, 'model_version', None),
                        metadata=copy.deepcopy(field.metadata),
                    )
                else:
                    new_fields[name] = Field(
                        name=field.name,
                        value=copy.deepcopy(field.value),
                        type=field.type,
                        metadata=copy.deepcopy(field.metadata),
                    )
            else:
                # Fallback to regular Field
                new_fields[name] = Field(
                    name=field.name,
                    value=copy.deepcopy(field.value),
                    type=field.type,
                    metadata=copy.deepcopy(field.metadata),
                )
        new_metadata = copy.deepcopy(self.metadata)
    else:
        new_fields = OrderedDict(self.fields)  # type: ignore[arg-type]
        new_metadata = self.metadata.copy()

    return Record(data=new_fields, metadata=new_metadata, id=self.id)
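Why deep=True matters for nested values can be shown with plain dicts, independent of the Record class:

```python
import copy

# Plain-dict illustration of copy(deep=True) vs copy(deep=False) semantics
original = {"config": {"timeout": 30}}

shallow = dict(original)         # deep=False: the container is copied,
                                 # but nested values are shared with the original
deep = copy.deepcopy(original)   # deep=True: nested values are duplicated too

original["config"]["timeout"] = 60  # mutating the original leaks into shallow only
```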
project
project(field_names: list[str]) -> Record

Create a new record with only specified fields.

Parameters:

Name Type Description Default
field_names list[str]

List of field names to include in the projection

required

Returns:

Type Description
Record

A new Record containing only the specified fields

Example
record = Record({"name": "Alice", "age": 30, "email": "alice@example.com"})

# Project to specific fields
subset = record.project(["name", "age"])
print(subset.field_names())  # ["name", "age"]
Source code in packages/data/src/dataknobs_data/records.py
def project(self, field_names: list[str]) -> Record:
    """Create a new record with only specified fields.

    Args:
        field_names: List of field names to include in the projection

    Returns:
        A new Record containing only the specified fields

    Example:
        ```python
        record = Record({"name": "Alice", "age": 30, "email": "alice@example.com"})

        # Project to specific fields
        subset = record.project(["name", "age"])
        print(subset.field_names())  # ["name", "age"]
        ```
    """
    projected_fields = OrderedDict()
    for name in field_names:
        if name in self.fields:
            projected_fields[name] = self.fields[name]
    return Record(data=projected_fields, metadata=self.metadata.copy(), id=self.id)
merge
merge(other: Record, overwrite: bool = True) -> Record

Merge another record into this one.

Parameters:

Name Type Description Default
other Record

The record to merge

required
overwrite bool

If True, overwrite existing fields; if False, keep existing

True

Returns:

Type Description
Record

A new merged record

Source code in packages/data/src/dataknobs_data/records.py
def merge(self, other: Record, overwrite: bool = True) -> Record:
    """Merge another record into this one.

    Args:
        other: The record to merge
        overwrite: If True, overwrite existing fields; if False, keep existing

    Returns:
        A new merged record
    """
    merged_fields = OrderedDict(self.fields)
    for name, field_obj in other.fields.items():
        if overwrite or name not in merged_fields:
            merged_fields[name] = field_obj

    merged_metadata = self.metadata.copy()
    if overwrite:
        merged_metadata.update(other.metadata)

    # Use the ID from this record, or from other if this doesn't have one
    merged_id = self.id if self.id else other.id

    return Record(data=merged_fields, metadata=merged_metadata, id=merged_id)
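The overwrite rule above, other's fields win only when overwrite=True, reduces to a small plain-dict sketch (merge_fields is a hypothetical helper for illustration):

```python
def merge_fields(base, other, overwrite=True):
    """Same rule as Record.merge: other's entries win only when overwrite=True."""
    merged = dict(base)
    for name, value in other.items():
        if overwrite or name not in merged:
            merged[name] = value
    return merged

a = {"name": "Alice", "age": 30}
b = {"age": 31, "city": "New York"}
merge_fields(a, b)                   # age is overwritten, city is added
merge_fields(a, b, overwrite=False)  # existing age kept, city still added
```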

DedupChecker

DedupChecker(
    db: AsyncDatabase,
    config: DedupConfig,
    vector_store: Any | None = None,
    embedding_fn: Callable[[str], Awaitable[list[float]]] | None = None,
)

Checks content uniqueness via hash matching and optional semantic similarity.

Uses an AsyncDatabase for hash-based exact matching and an optional VectorStore for semantic similarity search.

Example

checker = DedupChecker(db=dedup_db, config=DedupConfig())
await checker.register({"content": "A question about math"}, "q-1")
result = await checker.check({"content": "A question about math"})
result.recommendation  # 'exact_duplicate'

Initialize the dedup checker.

Parameters:

Name Type Description Default
db AsyncDatabase

Database for storing content hashes.

required
config DedupConfig

Deduplication configuration.

required
vector_store Any | None

Optional vector store for semantic similarity search. Expects a VectorStore-compatible interface.

None
embedding_fn Callable[[str], Awaitable[list[float]]] | None

Async function that takes text and returns an embedding vector. Required when config.semantic_check is True and vector_store is provided.

None

Methods:

Name Description
compute_hash

Compute a deterministic content hash from configured fields.

check

Check content for duplicates.

register

Register content for future duplicate lookups.

Attributes:

Name Type Description
config DedupConfig

The dedup configuration.

Source code in packages/data/src/dataknobs_data/dedup.py
def __init__(
    self,
    db: AsyncDatabase,
    config: DedupConfig,
    vector_store: Any | None = None,
    embedding_fn: Callable[[str], Awaitable[list[float]]] | None = None,
) -> None:
    """Initialize the dedup checker.

    Args:
        db: Database for storing content hashes.
        config: Deduplication configuration.
        vector_store: Optional vector store for semantic similarity search.
            Expects a ``VectorStore``-compatible interface.
        embedding_fn: Async function that takes text and returns an embedding
            vector. Required when ``config.semantic_check`` is True and
            ``vector_store`` is provided.
    """
    self._db = db
    self._config = config
    self._vector_store = vector_store
    self._embedding_fn = embedding_fn
Attributes
config property
config: DedupConfig

The dedup configuration.

Functions
compute_hash
compute_hash(content: dict[str, Any]) -> str

Compute a deterministic content hash from configured fields.

Fields are joined with a | separator to avoid collisions between values like ("a b", "c") and ("a", "b c"). Missing fields are treated as empty strings.

Parameters:

Name Type Description Default
content dict[str, Any]

Content dictionary to hash.

required

Returns:

Type Description
str

Hex digest of the content hash.

Source code in packages/data/src/dataknobs_data/dedup.py
def compute_hash(self, content: dict[str, Any]) -> str:
    """Compute a deterministic content hash from configured fields.

    Fields are joined with ``|`` separator to avoid collisions between
    values like ``("a b", "c")`` and ``("a", "b c")``. Missing fields
    are treated as empty strings.

    Args:
        content: Content dictionary to hash.

    Returns:
        Hex digest of the content hash.
    """
    parts: list[str] = []
    for field_name in self._config.hash_fields:
        value = content.get(field_name, "")
        parts.append(str(value))

    combined = "|".join(parts)

    if self._config.hash_algorithm == "sha256":
        return hashlib.sha256(combined.encode()).hexdigest()
    return hashlib.md5(combined.encode()).hexdigest()
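The hashing logic above runs standalone with only `hashlib`; in this sketch the `hash_fields` and `algorithm` parameters stand in for the values normally read from `DedupConfig`:

```python
import hashlib

def compute_hash(content: dict, hash_fields=("content",), algorithm="md5") -> str:
    # Join configured fields with "|" so ("a b", "c") and ("a", "b c")
    # produce different hashes; missing fields become empty strings.
    parts = [str(content.get(name, "")) for name in hash_fields]
    combined = "|".join(parts)
    if algorithm == "sha256":
        return hashlib.sha256(combined.encode()).hexdigest()
    return hashlib.md5(combined.encode()).hexdigest()

fields = ("content", "extra")
print(compute_hash({"content": "a b", "extra": "c"}, hash_fields=fields))
print(compute_hash({"content": "a", "extra": "b c"}, hash_fields=fields))
```

The two printed digests differ, which is exactly the collision the `|` separator guards against.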
check async
check(content: dict[str, Any]) -> DedupResult

Check content for duplicates.

Performs an exact hash match first, then optionally checks semantic similarity if configured.

Parameters:

Name Type Description Default
content dict[str, Any]

Content dictionary to check.

required

Returns:

Type Description
DedupResult

DedupResult with match information and recommendation.

Source code in packages/data/src/dataknobs_data/dedup.py
async def check(self, content: dict[str, Any]) -> DedupResult:
    """Check content for duplicates.

    Performs an exact hash match first, then optionally checks semantic
    similarity if configured.

    Args:
        content: Content dictionary to check.

    Returns:
        DedupResult with match information and recommendation.
    """
    content_hash = self.compute_hash(content)

    # Step 1: Exact hash match
    exact_match = await self._find_exact_match(content_hash)
    if exact_match:
        return DedupResult(
            is_exact_duplicate=True,
            exact_match_id=exact_match,
            recommendation="exact_duplicate",
            content_hash=content_hash,
        )

    # Step 2: Semantic similarity (optional)
    similar_items: list[SimilarItem] = []
    if (
        self._config.semantic_check
        and self._vector_store is not None
        and self._embedding_fn is not None
    ):
        similar_items = await self._find_similar(content)

    # Step 3: Build recommendation
    recommendation = "unique"
    if similar_items:
        recommendation = "possible_duplicate"

    return DedupResult(
        is_exact_duplicate=False,
        similar_items=similar_items,
        recommendation=recommendation,
        content_hash=content_hash,
    )
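The three-step flow above reduces to a small pure function; `recommend` is a hypothetical helper (not part of the package) showing how the recommendation string is derived:

```python
def recommend(exact_match_id, similar_items):
    # Step 1: an exact hash match short-circuits everything else.
    if exact_match_id is not None:
        return "exact_duplicate"
    # Steps 2-3: semantic neighbours downgrade "unique" to "possible_duplicate".
    if similar_items:
        return "possible_duplicate"
    return "unique"

print(recommend("q-1", []))      # exact_duplicate
print(recommend(None, ["q-2"]))  # possible_duplicate
print(recommend(None, []))       # unique
```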
register async
register(content: dict[str, Any], record_id: str) -> None

Register content for future duplicate lookups.

Stores the content hash in the database and optionally the embedding in the vector store.

Parameters:

Name Type Description Default
content dict[str, Any]

Content dictionary to register.

required
record_id str

The record ID to associate with this content.

required
Source code in packages/data/src/dataknobs_data/dedup.py
async def register(
    self,
    content: dict[str, Any],
    record_id: str,
) -> None:
    """Register content for future duplicate lookups.

    Stores the content hash in the database and optionally the embedding
    in the vector store.

    Args:
        content: Content dictionary to register.
        record_id: The record ID to associate with this content.
    """
    content_hash = self.compute_hash(content)

    # Store hash record in database
    record = Record({"content_hash": content_hash, "record_id": record_id})
    await self._db.create(record)

    # Store embedding in vector store (if semantic check enabled)
    if (
        self._config.semantic_check
        and self._vector_store is not None
        and self._embedding_fn is not None
    ):
        text = self._build_semantic_text(content)
        embedding = await self._embedding_fn(text)
        await self._vector_store.add_vectors(
            vectors=np.array([embedding], dtype=np.float32),
            ids=[record_id],
            metadata=[{"text": text, "content_hash": content_hash}],
        )

    logger.debug(
        "Registered content for dedup: record_id=%s, hash=%s",
        record_id,
        content_hash[:8],
    )
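A register/check round trip can be simulated without the package; `InMemoryHashStore` below is a hypothetical stand-in for the `AsyncDatabase`-backed exact matching (the semantic-similarity path is omitted):

```python
import asyncio
import hashlib

class InMemoryHashStore:
    """Hypothetical stand-in for DedupChecker's hash-based exact matching."""

    def __init__(self) -> None:
        self._hashes: dict[str, str] = {}

    async def register(self, content: str, record_id: str) -> None:
        # Map content hash -> record_id, like the Record written by register().
        self._hashes[hashlib.md5(content.encode()).hexdigest()] = record_id

    async def check(self, content: str) -> str:
        rid = self._hashes.get(hashlib.md5(content.encode()).hexdigest())
        return "exact_duplicate" if rid is not None else "unique"

async def main() -> None:
    store = InMemoryHashStore()
    await store.register("A question about math", "q-1")
    print(await store.check("A question about math"))  # exact_duplicate
    print(await store.check("Something else"))         # unique

asyncio.run(main())
```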

DedupConfig dataclass

DedupConfig(
    hash_fields: list[str] = (lambda: ["content"])(),
    hash_algorithm: str = "md5",
    semantic_check: bool = False,
    semantic_fields: list[str] | None = None,
    similarity_threshold: float = 0.92,
    max_similar_results: int = 5,
)

Configuration for deduplication checking.

Attributes:

Name Type Description
hash_fields list[str]

Field names used for computing the content hash.

hash_algorithm str

Hash algorithm to use ("md5" or "sha256").

semantic_check bool

Whether to perform semantic similarity search.

semantic_fields list[str] | None

Fields concatenated for embedding. Defaults to hash_fields if not set.

similarity_threshold float

Minimum similarity score to consider a match.

max_similar_results int

Maximum number of similar items to return.

DedupResult dataclass

DedupResult(
    is_exact_duplicate: bool,
    exact_match_id: str | None = None,
    similar_items: list[SimilarItem] = list(),
    recommendation: str = "unique",
    content_hash: str = "",
)

Result of a deduplication check.

Attributes:

Name Type Description
is_exact_duplicate bool

Whether an exact hash match was found.

exact_match_id str | None

The record ID of the exact match, if any.

similar_items list[SimilarItem]

Semantically similar items found.

recommendation str

One of "unique", "possible_duplicate", or "exact_duplicate".

content_hash str

The computed hash of the checked content.

SimilarItem dataclass

SimilarItem(record_id: str, score: float, matched_text: str = '')

A record that is semantically similar to the candidate.

Attributes:

Name Type Description
record_id str

The ID of the similar record.

score float

Similarity score (higher is more similar).

matched_text str

The text that was matched against.

StreamConfig dataclass

StreamConfig(
    batch_size: int = 1000,
    prefetch: int = 2,
    timeout: float | None = None,
    on_error: Callable[[Exception, Record], bool] | None = None,
)

Configuration for streaming operations.

Methods:

Name Description
__post_init__

Validate configuration.

Functions
__post_init__
__post_init__()

Validate configuration.

Source code in packages/data/src/dataknobs_data/streaming.py
def __post_init__(self):
    """Validate configuration."""
    if self.batch_size <= 0:
        raise ValueError("batch_size must be positive")
    if self.prefetch < 0:
        raise ValueError("prefetch must be non-negative")
    if self.timeout is not None and self.timeout <= 0:
        raise ValueError("timeout must be positive if specified")

StreamProcessor

Base class for stream processing utilities.

Methods:

Name Description
batch_iterator

Convert a record iterator into batches.

list_to_iterator

Convert a list of records to an iterator.

list_to_async_iterator

Convert a list of records to an async iterator.

iterator_to_async_iterator

Convert a synchronous iterator to an async iterator.

async_batch_iterator

Convert an async record iterator into batches.

filter_stream

Filter records in a stream.

async_filter_stream

Filter records in an async stream.

transform_stream

Transform records in a stream, filtering out None results.

async_transform_stream

Transform records in an async stream, filtering out None results.

Functions
batch_iterator staticmethod
batch_iterator(
    iterator: Iterator[Record], batch_size: int
) -> Iterator[list[Record]]

Convert a record iterator into batches.

Source code in packages/data/src/dataknobs_data/streaming.py
@staticmethod
def batch_iterator(
    iterator: Iterator[Record],
    batch_size: int
) -> Iterator[list[Record]]:
    """Convert a record iterator into batches."""
    batch = []
    for record in iterator:
        batch.append(record)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch
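Since `batch_iterator` is a pure generator, its behavior is easy to demonstrate; the standalone copy below uses a plain `range` as a stand-in for a record iterator:

```python
def batch_iterator(iterator, batch_size):
    # Standalone copy of StreamProcessor.batch_iterator: accumulate up to
    # batch_size items, yield the batch, then flush any remainder at the end.
    batch = []
    for record in iterator:
        batch.append(record)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

print(list(batch_iterator(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Note the final partial batch is still yielded, so no records are silently dropped.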
list_to_iterator staticmethod
list_to_iterator(records: list[Record]) -> Iterator[Record]

Convert a list of records to an iterator.

Parameters:

Name Type Description Default
records list[Record]

List of records

required

Yields:

Type Description
Record

Individual records from the list

Source code in packages/data/src/dataknobs_data/streaming.py
@staticmethod
def list_to_iterator(records: list[Record]) -> Iterator[Record]:
    """Convert a list of records to an iterator.

    Args:
        records: List of records

    Yields:
        Individual records from the list
    """
    for record in records:
        yield record
list_to_async_iterator async staticmethod
list_to_async_iterator(records: list[Record]) -> AsyncIterator[Record]

Convert a list of records to an async iterator.

This adapter allows synchronous lists to be used with async streaming APIs.

Parameters:

Name Type Description Default
records list[Record]

List of records

required

Yields:

Type Description
AsyncIterator[Record]

Individual records from the list

Source code in packages/data/src/dataknobs_data/streaming.py
@staticmethod
async def list_to_async_iterator(records: list[Record]) -> AsyncIterator[Record]:
    """Convert a list of records to an async iterator.

    This adapter allows synchronous lists to be used with async streaming APIs.

    Args:
        records: List of records

    Yields:
        Individual records from the list
    """
    for record in records:
        yield record
iterator_to_async_iterator async staticmethod
iterator_to_async_iterator(iterator: Iterator[Record]) -> AsyncIterator[Record]

Convert a synchronous iterator to an async iterator.

This adapter allows synchronous iterators to be used with async streaming APIs.

Parameters:

Name Type Description Default
iterator Iterator[Record]

Synchronous iterator of records

required

Yields:

Type Description
AsyncIterator[Record]

Individual records from the iterator

Source code in packages/data/src/dataknobs_data/streaming.py
@staticmethod
async def iterator_to_async_iterator(iterator: Iterator[Record]) -> AsyncIterator[Record]:
    """Convert a synchronous iterator to an async iterator.

    This adapter allows synchronous iterators to be used with async streaming APIs.

    Args:
        iterator: Synchronous iterator of records

    Yields:
        Individual records from the iterator
    """
    for record in iterator:
        yield record
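The sync-to-async adapter pattern above can be run standalone; `to_async` is the same re-yielding trick applied to any iterator:

```python
import asyncio

async def to_async(iterator):
    # Standalone copy of the sync-to-async adapter: each item is
    # re-yielded inside an async generator, making it usable with `async for`.
    for item in iterator:
        yield item

async def main():
    collected = [x async for x in to_async(iter([1, 2, 3]))]
    print(collected)  # [1, 2, 3]

asyncio.run(main())
```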
async_batch_iterator async staticmethod
async_batch_iterator(
    iterator: AsyncIterator[Record], batch_size: int
) -> AsyncIterator[list[Record]]

Convert an async record iterator into batches.

Source code in packages/data/src/dataknobs_data/streaming.py
@staticmethod
async def async_batch_iterator(
    iterator: AsyncIterator[Record],
    batch_size: int
) -> AsyncIterator[list[Record]]:
    """Convert an async record iterator into batches."""
    batch = []
    async for record in iterator:
        batch.append(record)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch
filter_stream staticmethod
filter_stream(
    iterator: Iterator[Record], predicate: Callable[[Record], bool]
) -> Iterator[Record]

Filter records in a stream.

Source code in packages/data/src/dataknobs_data/streaming.py
@staticmethod
def filter_stream(
    iterator: Iterator[Record],
    predicate: Callable[[Record], bool]
) -> Iterator[Record]:
    """Filter records in a stream."""
    for record in iterator:
        if predicate(record):
            yield record
async_filter_stream async staticmethod
async_filter_stream(
    iterator: AsyncIterator[Record], predicate: Callable[[Record], bool]
) -> AsyncIterator[Record]

Filter records in an async stream.

Source code in packages/data/src/dataknobs_data/streaming.py
@staticmethod
async def async_filter_stream(
    iterator: AsyncIterator[Record],
    predicate: Callable[[Record], bool]
) -> AsyncIterator[Record]:
    """Filter records in an async stream."""
    async for record in iterator:
        if predicate(record):
            yield record
transform_stream staticmethod
transform_stream(
    iterator: Iterator[Record], transform: Callable[[Record], Record | None]
) -> Iterator[Record]

Transform records in a stream, filtering out None results.

Source code in packages/data/src/dataknobs_data/streaming.py
@staticmethod
def transform_stream(
    iterator: Iterator[Record],
    transform: Callable[[Record], Record | None]
) -> Iterator[Record]:
    """Transform records in a stream, filtering out None results."""
    for record in iterator:
        result = transform(record)
        if result is not None:
            yield result
async_transform_stream async staticmethod
async_transform_stream(
    iterator: AsyncIterator[Record],
    transform: Callable[[Record], Record | None],
) -> AsyncIterator[Record]

Transform records in an async stream, filtering out None results.

Source code in packages/data/src/dataknobs_data/streaming.py
@staticmethod
async def async_transform_stream(
    iterator: AsyncIterator[Record],
    transform: Callable[[Record], Record | None]
) -> AsyncIterator[Record]:
    """Transform records in an async stream, filtering out None results."""
    async for record in iterator:
        result = transform(record)
        if result is not None:
            yield result
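The filter and transform utilities compose naturally into pipelines; the generator expressions below are stdlib-only equivalents of `filter_stream` and `transform_stream`, applied to plain dicts standing in for records:

```python
def filter_stream(iterator, predicate):
    # Equivalent of StreamProcessor.filter_stream as a generator expression.
    return (r for r in iterator if predicate(r))

def transform_stream(iterator, transform):
    # Equivalent of StreamProcessor.transform_stream: None results are dropped.
    return (out for r in iterator if (out := transform(r)) is not None)

records = [{"age": 20}, {"age": 30}, {"age": 40}]
adults = filter_stream(records, lambda r: r["age"] >= 30)
tagged = transform_stream(adults, lambda r: {**r, "group": "adult"})
print(list(tagged))  # [{'age': 30, 'group': 'adult'}, {'age': 40, 'group': 'adult'}]
```

Because every stage is lazy, records flow through one at a time and nothing is materialized until the final `list()`.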

StreamResult dataclass

StreamResult(
    total_processed: int = 0,
    successful: int = 0,
    failed: int = 0,
    errors: list[dict[str, Any]] = list(),
    duration: float = 0.0,
    total_batches: int = 0,
    failed_indices: list[int] = list(),
)

Result of a streaming operation.

Methods:

Name Description
add_error

Add an error to the result.

merge

Merge another result into this one.

__str__

Human-readable representation.

Attributes:

Name Type Description
success_rate float

Calculate success rate as percentage.

Attributes
success_rate property
success_rate: float

Calculate success rate as percentage.

Functions
add_error
add_error(
    record_id: str | None, error: Exception, index: int | None = None
) -> None

Add an error to the result.

Parameters:

Name Type Description Default
record_id str | None

ID of the record that failed

required
error Exception

The exception that occurred

required
index int | None

Optional index of the failed record in the original batch

None
Source code in packages/data/src/dataknobs_data/streaming.py
def add_error(self, record_id: str | None, error: Exception, index: int | None = None) -> None:
    """Add an error to the result.

    Args:
        record_id: ID of the record that failed
        error: The exception that occurred
        index: Optional index of the failed record in the original batch
    """
    self.errors.append({
        "record_id": record_id,
        "error": str(error),
        "type": type(error).__name__,
        "index": index
    })
    if index is not None:
        self.failed_indices.append(index)
merge
merge(other: StreamResult) -> None

Merge another result into this one.

Source code in packages/data/src/dataknobs_data/streaming.py
def merge(self, other: StreamResult) -> None:
    """Merge another result into this one."""
    self.total_processed += other.total_processed
    self.successful += other.successful
    self.failed += other.failed
    self.errors.extend(other.errors)
    self.duration += other.duration
    self.total_batches += other.total_batches
    self.failed_indices.extend(other.failed_indices)
__str__
__str__() -> str

Human-readable representation.

Source code in packages/data/src/dataknobs_data/streaming.py
def __str__(self) -> str:
    """Human-readable representation."""
    return (
        f"StreamResult(processed={self.total_processed}, "
        f"successful={self.successful}, failed={self.failed}, "
        f"success_rate={self.success_rate:.1f}%, "
        f"duration={self.duration:.2f}s)"
    )
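A trimmed sketch shows how `merge` and `success_rate` interact when aggregating results from multiple batches. The `success_rate` formula here is an assumption based on the docstring ("success rate as percentage"); the real class also tracks errors, duration, and failed indices:

```python
from dataclasses import dataclass

@dataclass
class StreamResult:
    # Trimmed sketch of the dataclass above.
    total_processed: int = 0
    successful: int = 0
    failed: int = 0

    @property
    def success_rate(self) -> float:
        # Assumed formula: successful / total_processed as a percentage.
        if self.total_processed == 0:
            return 0.0
        return 100.0 * self.successful / self.total_processed

    def merge(self, other: "StreamResult") -> None:
        self.total_processed += other.total_processed
        self.successful += other.successful
        self.failed += other.failed

result = StreamResult(total_processed=10, successful=9, failed=1)
result.merge(StreamResult(total_processed=5, successful=5, failed=0))
print(f"{result.success_rate:.1f}%")  # 93.3%
```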