dataknobs-data Complete API Reference¶
Complete auto-generated API documentation from source code docstrings.
💡 Also see:

- Curated Guide - Hand-crafted tutorials and examples
- Package Overview - Introduction and getting started
- Source Code - View on GitHub
dataknobs_data ¶
DataKnobs Data Package - Unified data abstraction layer.
The dataknobs-data package provides a unified interface for working with various
database backends, including SQLite, PostgreSQL, Elasticsearch, and S3. It offers
structured data storage, querying, validation, migration, and vector search capabilities.
Modules:
| Name | Description |
|---|---|
| database | Core Database classes (SyncDatabase, AsyncDatabase) providing the main API |
| records | Record class for structured data with fields and metadata |
| fields | Field types and definitions for data validation |
| schema | Database schema definitions and field schemas |
| query | Query building with filters, operators, and sorting |
| query_logic | Complex boolean logic queries with AND/OR/NOT operators |
| factory | Database factory functions for creating database instances |
| streaming | Streaming operations for large-scale data processing |
| validation | Data validation with schemas and constraints |
| migration | Data migration tools for moving between backends |
| exceptions | Custom exceptions for error handling |
Quick Examples:
Create and query a database:
```python
from dataknobs_data import database_factory, Record, Query, Operator, Filter
# Create an in-memory database
db = database_factory("memory")
# Add records
db.add(Record({"name": "Alice", "age": 30}))
db.add(Record({"name": "Bob", "age": 25}))
# Query with filters
query = Query(filters=[Filter("age", Operator.GT, 25)])
results = db.search(query)
print(results) # [Record with Alice's data]
```
Use schemas for validation:
```python
from dataknobs_data import database_factory, Record, FieldType
from dataknobs_data.schema import DatabaseSchema
# Define schema
schema = DatabaseSchema.create(
    name=FieldType.STRING,
    age=FieldType.INTEGER,
    email=FieldType.STRING,
)
# Create database with schema
db = database_factory("memory", config={"schema": schema})
db.add(Record({"name": "Alice", "age": 30, "email": "alice@example.com"}))
```
Stream large datasets:
```python
from dataknobs_data import database_factory, StreamConfig
db = database_factory("sqlite", config={"path": "large_data.db"})
# Stream records in batches
config = StreamConfig(batch_size=100)
for batch in db.stream(config=config):
    process_batch(batch.records)
```
Design Philosophy:
1. **Backend Agnostic** - Write once, deploy anywhere with multiple backend support
2. **Type Safe** - Strong typing with schema validation and field type checking
3. **Async Ready** - Full async/await support for high-performance applications
4. **Composable** - Mix and match features like validation, migration, and vector search
Installation:
```bash
pip install dataknobs-data
```
For detailed documentation, see the individual module docstrings and the online documentation at https://docs.kbs-labs.com/dataknobs
Classes:
| Name | Description |
|---|---|
| AsyncDatabase | Abstract base class for async database implementations. |
| SyncDatabase | Synchronous variant of the Database abstract base class. |
| BackendNotFoundError | Raised when a requested backend is not available. |
| ConcurrencyError | Raised when a concurrency conflict occurs. |
| ConfigurationError | Raised when configuration is invalid. |
| DatabaseConnectionError | Raised when database connection fails. |
| DatabaseOperationError | Raised when a database operation fails. |
| FieldTypeError | Raised when a field type operation fails. |
| MigrationError | Raised when data migration fails. |
| QueryError | Raised when query execution fails. |
| RecordNotFoundError | Raised when a requested record is not found. |
| RecordValidationError | Raised when record validation fails. |
| SerializationError | Raised when serialization/deserialization fails. |
| TransactionError | Raised when a transaction fails. |
| AsyncDatabaseFactory | Factory for creating async database backends. |
| DatabaseFactory | Factory for creating database backends dynamically. |
| Field | Represents a single field in a record. |
| FieldType | Enumeration of supported field types. |
| VectorField | Represents a vector field with embeddings and metadata. |
| Filter | Represents a filter condition. |
| Operator | Query operators for filtering. |
| Query | Represents a database query with filters, sorting, pagination, and vector search. |
| SortOrder | Sort order for query results. |
| SortSpec | Represents a sort specification. |
| ComplexQuery | A query with complex boolean logic support. |
| Condition | Abstract base class for query conditions. |
| FilterCondition | A single filter condition. |
| LogicCondition | A logical combination of conditions. |
| LogicOperator | Logical operators for combining conditions. |
| QueryBuilder | Builder for complex queries with boolean logic. |
| Record | Represents a structured data record with fields and metadata. |
| DedupChecker | Checks content uniqueness via hash matching and optional semantic similarity. |
| DedupConfig | Configuration for deduplication checking. |
| DedupResult | Result of a deduplication check. |
| SimilarItem | A record that is semantically similar to the candidate. |
| StreamConfig | Configuration for streaming operations. |
| StreamProcessor | Base class for stream processing utilities. |
| StreamResult | Result of streaming operation. |
Classes¶
AsyncDatabase ¶
Bases: ABC
Abstract base class for async database implementations.
Provides a unified async interface for CRUD operations, querying, and streaming across different backend databases. Supports schema validation, batch operations, and complex queries with boolean logic.
Example

```python
from dataknobs_data import async_database_factory, Record, Query, Filter, Operator

# Create async database
db = async_database_factory("memory")

# Use as async context manager
async with db:
    # Create records
    id1 = await db.create(Record({"name": "Alice", "age": 30}))
    id2 = await db.create(Record({"name": "Bob", "age": 25}))

    # Query records
    query = Query(filters=[Filter("age", Operator.GT, 25)])
    results = await db.search(query)
    print(results)  # [Alice's record]

    # Update record
    await db.update(id1, Record({"name": "Alice", "age": 31}))

    # Stream large datasets
    async for record in db.stream_read():
        process_record(record)
```
Initialize the database with optional configuration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] \| None | Backend-specific configuration parameters (may include 'schema' key) | None |
| schema | DatabaseSchema \| None | Optional database schema (overrides config schema) | None |
Methods:

| Name | Description |
|---|---|
| set_schema | Set the database schema. |
| add_field_schema | Add a field to the database schema. |
| with_schema | Set schema using field definitions. |
| create | Create a new record in the database. |
| read | Read a record by ID. |
| update | Update an existing record. |
| delete | Delete a record by ID. |
| search | Search for records matching a query. |
| all | Get all records from the database. |
| exists | Check if a record exists. |
| upsert | Update or insert a record. |
| create_batch | Create multiple records in batch. |
| read_batch | Read multiple records by ID. |
| delete_batch | Delete multiple records by ID. |
| update_batch | Update multiple records. |
| count | Count records matching a query. |
| clear | Clear all records from the database. |
| connect | Connect to the database. Override in subclasses if needed. |
| close | Close the database connection. Override in subclasses if needed. |
| disconnect | Disconnect from the database (alias for close). |
| __aenter__ | Async context manager entry. |
| __aexit__ | Async context manager exit. |
| stream_read | Stream records from database. |
| stream_write | Stream records into database. |
| stream_transform | Stream records through a transformation. |
| from_backend | Factory method to create and connect a database instance. |
Source code in packages/data/src/dataknobs_data/database.py
Functions¶
set_schema ¶
Set the database schema.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| schema | DatabaseSchema | The database schema to use | required |
add_field_schema ¶
Add a field to the database schema.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| field_schema | FieldSchema | The field schema to add | required |
with_schema ¶
Set schema using field definitions.
Returns self for chaining.
Examples:

```python
db = AsyncMemoryDatabase().with_schema(
    content=FieldType.TEXT,
    embedding=(FieldType.VECTOR, {"dimensions": 384, "source_field": "content"}),
)
```
Source code in packages/data/src/dataknobs_data/database.py
create abstractmethod async ¶

Create a new record in the database.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| record | Record | The record to create | required |

Returns:

| Type | Description |
|---|---|
| str | The ID of the created record |

Source code in packages/data/src/dataknobs_data/database.py

read abstractmethod async ¶

Read a record by ID.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id | str | The record ID | required |

Returns:

| Type | Description |
|---|---|
| Record \| None | The record if found, None otherwise |

update abstractmethod async ¶

Update an existing record.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id | str | The record ID | required |
| record | Record | The updated record | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the record was updated, False if not found |

Source code in packages/data/src/dataknobs_data/database.py

delete abstractmethod async ¶

Delete a record by ID.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id | str | The record ID | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the record was deleted, False if not found |

search abstractmethod async ¶

Search for records matching a query.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | Query \| ComplexQuery | The search query (simple or complex) | required |

Returns:

| Type | Description |
|---|---|
| list[Record] | List of matching records |

Source code in packages/data/src/dataknobs_data/database.py
all async ¶

Get all records from the database.

Returns:

| Type | Description |
|---|---|
| list[Record] | List of all records |

Source code in packages/data/src/dataknobs_data/database.py

exists abstractmethod async ¶

Check if a record exists.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id | str | The record ID | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the record exists, False otherwise |

upsert async ¶

Update or insert a record.

Can be called as:

- upsert(id, record) - explicit ID and record
- upsert(record) - extract ID from record using Record's built-in logic

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id_or_record | str \| Record | Either an ID string or a Record | required |
| record | Record \| None | The record to upsert (if first arg is ID) | None |

Returns:

| Type | Description |
|---|---|
| str | The record ID |

Source code in packages/data/src/dataknobs_data/database.py
create_batch async ¶

Create multiple records in batch.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| records | list[Record] | List of records to create | required |

Returns:

| Type | Description |
|---|---|
| list[str] | List of created record IDs |

Source code in packages/data/src/dataknobs_data/database.py

read_batch async ¶

Read multiple records by ID.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ids | list[str] | List of record IDs | required |

Returns:

| Type | Description |
|---|---|
| list[Record \| None] | List of records (None for not found) |

Source code in packages/data/src/dataknobs_data/database.py

delete_batch async ¶

Delete multiple records by ID.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ids | list[str] | List of record IDs | required |

Returns:

| Type | Description |
|---|---|
| list[bool] | List of deletion results |

Source code in packages/data/src/dataknobs_data/database.py

update_batch async ¶

Update multiple records.

Default implementation calls update() for each ID/record pair. Override for better performance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| updates | list[tuple[str, Record]] | List of (id, record) tuples to update | required |

Returns:

| Type | Description |
|---|---|
| list[bool] | List of success flags for each update |

Source code in packages/data/src/dataknobs_data/database.py

count async ¶

Count records matching a query.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | Query \| None | Optional search query (counts all if None) | None |

Returns:

| Type | Description |
|---|---|
| int | Number of matching records |

Source code in packages/data/src/dataknobs_data/database.py

clear async ¶

Clear all records from the database.

Returns:

| Type | Description |
|---|---|
| int | Number of records deleted |
connect async ¶

close async ¶

disconnect async ¶

__aenter__ async ¶

__aexit__ async ¶
stream_read abstractmethod async ¶

```python
stream_read(
    query: Query | None = None, config: StreamConfig | None = None
) -> AsyncIterator[Record]
```

Stream records from database.

Yields records one at a time, fetching in batches internally.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | Query \| None | Optional query to filter records | None |
| config | StreamConfig \| None | Streaming configuration | None |

Yields:

| Type | Description |
|---|---|
| AsyncIterator[Record] | Records matching the query |

Source code in packages/data/src/dataknobs_data/database.py

stream_write abstractmethod async ¶

Stream records into database.

Accepts an iterator and writes in batches.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| records | AsyncIterator[Record] | Iterator of records to write | required |
| config | StreamConfig \| None | Streaming configuration | None |

Returns:

| Type | Description |
|---|---|
| StreamResult | Result of the streaming operation |

Source code in packages/data/src/dataknobs_data/database.py

stream_transform async ¶

```python
stream_transform(
    query: Query | None = None,
    transform: Callable[[Record], Record | None] | None = None,
    config: StreamConfig | None = None,
) -> AsyncIterator[Record]
```

Stream records through a transformation.

Default implementation; can be overridden for efficiency.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | Query \| None | Optional query to filter records | None |
| transform | Callable[[Record], Record \| None] \| None | Optional transformation function | None |
| config | StreamConfig \| None | Streaming configuration | None |

Yields:

| Type | Description |
|---|---|
| AsyncIterator[Record] | Transformed records |

Source code in packages/data/src/dataknobs_data/database.py

from_backend async classmethod ¶

Factory method to create and connect a database instance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| backend | str | The backend type ("memory", "file", "s3", "postgres", "elasticsearch") | required |
| config | dict[str, Any] \| None | Backend-specific configuration | None |

Returns:

| Type | Description |
|---|---|
| AsyncDatabase | Connected AsyncDatabase instance |

Source code in packages/data/src/dataknobs_data/database.py
SyncDatabase ¶
Bases: ABC
Synchronous variant of the Database abstract base class.
Provides a unified synchronous interface for CRUD operations, querying, and streaming across different backend databases. Supports schema validation, batch operations, and complex queries with boolean logic.
Example

```python
from dataknobs_data import database_factory, Record, Query, Filter, Operator

# Create database
db = database_factory("memory")

# Use as context manager
with db:
    # Create records
    id1 = db.create(Record({"name": "Alice", "age": 30}))
    id2 = db.create(Record({"name": "Bob", "age": 25}))

    # Query records
    query = Query(filters=[Filter("age", Operator.GT, 25)])
    results = db.search(query)
    print(results)  # [Alice's record]

    # Update record
    db.update(id1, Record({"name": "Alice", "age": 31}))

    # Stream large datasets
    for record in db.stream_read():
        process_record(record)
```
Initialize the database with optional configuration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] \| None | Backend-specific configuration parameters (may include 'schema' key) | None |
| schema | DatabaseSchema \| None | Optional database schema (overrides config schema) | None |
Methods:

| Name | Description |
|---|---|
| set_schema | Set the database schema. |
| add_field_schema | Add a field to the database schema. |
| with_schema | Set schema using field definitions. |
| create | Create a new record in the database. |
| read | Read a record by ID. |
| update | Update an existing record. |
| delete | Delete a record by ID. |
| search | Search for records matching a query (simple or complex). |
| all | Get all records from the database. |
| exists | Check if a record exists. |
| upsert | Update or insert a record. |
| create_batch | Create multiple records in batch. |
| read_batch | Read multiple records by ID. |
| delete_batch | Delete multiple records by ID. |
| update_batch | Update multiple records. |
| count | Count records matching a query. |
| clear | Clear all records from the database. |
| connect | Connect to the database. Override in subclasses if needed. |
| close | Close the database connection. Override in subclasses if needed. |
| disconnect | Disconnect from the database (alias for close). |
| __enter__ | Context manager entry. |
| __exit__ | Context manager exit. |
| stream_read | Stream records from database. |
| stream_write | Stream records into database. |
| stream_transform | Stream records through a transformation. |
| from_backend | Factory method to create and connect a synchronous database instance. |

Source code in packages/data/src/dataknobs_data/database.py
Functions¶
set_schema ¶
Set the database schema.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| schema | DatabaseSchema | The database schema to use | required |

add_field_schema ¶

Add a field to the database schema.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| field_schema | FieldSchema | The field schema to add | required |
with_schema ¶
Set schema using field definitions.
Returns self for chaining.
Examples:

```python
db = SyncMemoryDatabase().with_schema(
    content=FieldType.TEXT,
    embedding=(FieldType.VECTOR, {"dimensions": 384, "source_field": "content"}),
)
```
Source code in packages/data/src/dataknobs_data/database.py
create abstractmethod ¶

Create a new record in the database.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| record | Record | The record to create | required |

Returns:

| Type | Description |
|---|---|
| str | The ID of the created record |

Source code in packages/data/src/dataknobs_data/database.py

read abstractmethod ¶

Read a record by ID.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id | str | The record ID | required |

Returns:

| Type | Description |
|---|---|
| Record \| None | The record if found, None otherwise |

Source code in packages/data/src/dataknobs_data/database.py

update abstractmethod ¶

Update an existing record.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id | str | The record ID | required |
| record | Record | The updated record | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the record was updated, False if not found |

Source code in packages/data/src/dataknobs_data/database.py

delete abstractmethod ¶

Delete a record by ID.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id | str | The record ID | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the record was deleted, False if not found |

Source code in packages/data/src/dataknobs_data/database.py

search abstractmethod ¶

Search for records matching a query (simple or complex).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | Query \| ComplexQuery | The search query | required |

Returns:

| Type | Description |
|---|---|
| list[Record] | List of matching records |

Example

```python
# Simple query
query = Query(filters=[Filter("age", Operator.GT, 25)])
results = db.search(query)

# Complex query with boolean logic
from dataknobs_data.query_logic import QueryBuilder, LogicOperator

complex_query = (
    QueryBuilder()
    .where("age", Operator.GT, 25)
    .and_where("name", Operator.LIKE, "A%")
    .build()
)
results = db.search(complex_query)
```
Source code in packages/data/src/dataknobs_data/database.py
all ¶

Get all records from the database.

Returns:

| Type | Description |
|---|---|
| list[Record] | List of all records |

Source code in packages/data/src/dataknobs_data/database.py

exists abstractmethod ¶

upsert ¶

Update or insert a record.

Can be called as:

- upsert(id, record) - explicit ID and record
- upsert(record) - extract ID from record using Record's built-in logic

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id_or_record | str \| Record | Either an ID string or a Record | required |
| record | Record \| None | The record to upsert (if first arg is ID) | None |

Returns:

| Type | Description |
|---|---|
| str | The record ID |

Source code in packages/data/src/dataknobs_data/database.py
create_batch ¶
Create multiple records in batch.
read_batch ¶
Read multiple records by ID.
delete_batch ¶

Delete multiple records by ID.
update_batch ¶

Update multiple records.

Default implementation calls update() for each ID/record pair. Override for better performance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| updates | list[tuple[str, Record]] | List of (id, record) tuples to update | required |

Returns:

| Type | Description |
|---|---|
| list[bool] | List of success flags for each update |

Source code in packages/data/src/dataknobs_data/database.py
count ¶
clear ¶
connect ¶
close ¶
disconnect ¶
__enter__ ¶
__exit__ ¶
stream_read abstractmethod ¶

Stream records from database.

Yields records one at a time, fetching in batches internally.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | Query \| None | Optional query to filter records | None |
| config | StreamConfig \| None | Streaming configuration | None |

Yields:

| Type | Description |
|---|---|
| Record | Records matching the query |

Source code in packages/data/src/dataknobs_data/database.py
stream_write abstractmethod ¶

Stream records into database.

Accepts an iterator and writes in batches.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| records | Iterator[Record] | Iterator of records to write | required |
| config | StreamConfig \| None | Streaming configuration | None |

Returns:

| Type | Description |
|---|---|
| StreamResult | Result of the streaming operation |

Source code in packages/data/src/dataknobs_data/database.py
stream_transform ¶

```python
stream_transform(
    query: Query | None = None,
    transform: Callable[[Record], Record | None] | None = None,
    config: StreamConfig | None = None,
) -> Iterator[Record]
```

Stream records through a transformation.

Default implementation; can be overridden for efficiency.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | Query \| None | Optional query to filter records | None |
| transform | Callable[[Record], Record \| None] \| None | Optional transformation function | None |
| config | StreamConfig \| None | Streaming configuration | None |

Yields:

| Type | Description |
|---|---|
| Record | Transformed records |

Source code in packages/data/src/dataknobs_data/database.py
from_backend classmethod ¶

Factory method to create and connect a synchronous database instance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| backend | str | The backend type ("memory", "file", "s3", "postgres", "elasticsearch") | required |
| config | dict[str, Any] \| None | Backend-specific configuration | None |

Returns:

| Type | Description |
|---|---|
| SyncDatabase | Connected SyncDatabase instance |

Source code in packages/data/src/dataknobs_data/database.py
BackendNotFoundError ¶
Bases: NotFoundError
Raised when a requested backend is not available.
Source code in packages/data/src/dataknobs_data/exceptions.py
ConcurrencyError ¶
Bases: ConcurrencyError
Raised when a concurrency conflict occurs.
Source code in packages/data/src/dataknobs_data/exceptions.py
ConfigurationError ¶
Bases: ConfigurationError
Raised when configuration is invalid.
Source code in packages/data/src/dataknobs_data/exceptions.py
DatabaseConnectionError ¶
Bases: ResourceError
Raised when database connection fails.
Source code in packages/data/src/dataknobs_data/exceptions.py
DatabaseOperationError ¶
Bases: OperationError
Raised when a database operation fails.
Source code in packages/data/src/dataknobs_data/exceptions.py
FieldTypeError ¶
Bases: ValidationError
Raised when a field type operation fails.
Source code in packages/data/src/dataknobs_data/exceptions.py
MigrationError ¶
Bases: OperationError
Raised when data migration fails.
Source code in packages/data/src/dataknobs_data/exceptions.py
QueryError ¶
Bases: OperationError
Raised when query execution fails.
Source code in packages/data/src/dataknobs_data/exceptions.py
RecordNotFoundError ¶
Bases: NotFoundError
Raised when a requested record is not found.
Source code in packages/data/src/dataknobs_data/exceptions.py
RecordValidationError ¶
Bases: ValidationError
Raised when record validation fails.
Source code in packages/data/src/dataknobs_data/exceptions.py
SerializationError ¶
Bases: SerializationError
Raised when serialization/deserialization fails.
Source code in packages/data/src/dataknobs_data/exceptions.py
TransactionError ¶
Bases: OperationError
Raised when a transaction fails.
Source code in packages/data/src/dataknobs_data/exceptions.py
AsyncDatabaseFactory ¶
Bases: FactoryBase
Factory for creating async database backends.
Note: Currently only some backends support async operations.
Methods:

| Name | Description |
|---|---|
| create | Create an async database instance. |

Functions¶

create ¶

Create an async database instance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| **config | Any | Configuration including 'backend' field | {} |

Returns:

| Type | Description |
|---|---|
| Any | Instance of appropriate async database backend |

Raises:

| Type | Description |
|---|---|
| ValueError | If backend doesn't support async operations |

Source code in packages/data/src/dataknobs_data/factory.py
DatabaseFactory ¶
Bases: FactoryBase
Factory for creating database backends dynamically.
This factory allows creating different database implementations based on configuration, supporting all available backends.
Example Configuration

```yaml
databases:
  - name: main
    factory: database
    backend: postgres
    host: localhost
    database: myapp
  - name: cache
    factory: database
    backend: memory
  - name: archive
    factory: database
    backend: s3
    bucket: my-archive-bucket
    prefix: archives/
```
Methods:

| Name | Description |
|---|---|
| create | Create a database instance based on configuration. |
| get_backend_info | Get information about a specific backend. |
Functions¶
create ¶
Create a database instance based on configuration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| **config | Any | Configuration including 'backend' field and backend-specific options | {} |

Returns:

| Type | Description |
|---|---|
| SyncDatabase | Instance of appropriate database backend |

Raises:

| Type | Description |
|---|---|
| ValueError | If backend type is not recognized or not available |

Source code in packages/data/src/dataknobs_data/factory.py
get_backend_info ¶
Get information about a specific backend.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| backend_type | str | Name of the backend | required |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | Dictionary with backend information from registry metadata |

Source code in packages/data/src/dataknobs_data/factory.py
Field dataclass ¶
Represents a single field in a record.
A Field encapsulates a named value along with its type and optional metadata. Field types are automatically detected if not explicitly provided.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | The field name |
| value | Any | The field value (can be any Python type) |
| type | FieldType \| None | The field type (auto-detected if None) |
| metadata | dict[str, Any] | Optional metadata dictionary |
Example
```python
from dataknobs_data import Field, FieldType

# Auto-detected type
name = Field(name="name", value="Alice")
print(name.type)  # FieldType.STRING

# Explicit type
score = Field(name="score", value=95.5, type=FieldType.FLOAT)

# With metadata
vector = Field(
    name="embedding",
    value=[0.1, 0.2, 0.3],
    type=FieldType.VECTOR,
    metadata={"dimensions": 3, "model": "text-embedding-3-small"},
)

# Validation
is_valid = name.validate()  # True

# Type conversion
str_score = score.convert_to(FieldType.STRING)
print(str_score.value)  # "95.5"
```
Methods:

| Name | Description |
|---|---|
| __post_init__ | Auto-detect type if not provided. |
| copy | Create a deep copy of the field. |
| validate | Validate that the value matches the field type. |
| convert_to | Convert the field to a different type. |
| to_dict | Convert the field to a dictionary representation. |
| from_dict | Create a field from a dictionary representation. |
Functions¶
__post_init__ ¶
copy ¶
Create a deep copy of the field.
validate ¶

Validate that the value matches the field type.

Returns:

| Type | Description |
|---|---|
| bool | True if the value is valid for the field type, False otherwise |

Source code in packages/data/src/dataknobs_data/fields.py
convert_to ¶

Convert the field to a different type.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| target_type | FieldType | The target FieldType to convert to | required |

Returns:

| Type | Description |
|---|---|
| Field | A new Field with the converted value and type |

Raises:

| Type | Description |
|---|---|
| ValueError | If conversion is not possible or fails |

Example

```python
# Integer to string
age = Field(name="age", value=30, type=FieldType.INTEGER)
age_str = age.convert_to(FieldType.STRING)
print(age_str.value)  # "30"

# String to integer
count = Field(name="count", value="42", type=FieldType.STRING)
count_int = count.convert_to(FieldType.INTEGER)
print(count_int.value)  # 42
```

Source code in packages/data/src/dataknobs_data/fields.py
to_dict ¶
Convert the field to a dictionary representation.
Source code in packages/data/src/dataknobs_data/fields.py
from_dict classmethod ¶
Create a field from a dictionary representation.
Source code in packages/data/src/dataknobs_data/fields.py
FieldType ¶
Bases: Enum
Enumeration of supported field types.
Defines the data types that can be stored in Record fields. Field types enable type validation, schema enforcement, and backend-specific optimizations.
Attributes:
| Name | Description |
|---|---|
| STRING | Short text (< 1000 chars) |
| TEXT | Long text content |
| INTEGER | Whole numbers |
| FLOAT | Decimal numbers |
| BOOLEAN | True/False values |
| DATETIME | Date and time values |
| JSON | Structured JSON data (dicts, lists) |
| BINARY | Binary data (bytes) |
| VECTOR | Dense vector embeddings for similarity search |
| SPARSE_VECTOR | Sparse vector representations |
Example
```python
from dataknobs_data import Field, FieldType

# Create typed fields
name_field = Field(name="name", value="Alice", type=FieldType.STRING)
age_field = Field(name="age", value=30, type=FieldType.INTEGER)
tags_field = Field(name="tags", value=["python", "data"], type=FieldType.JSON)

# Auto-detection (type is inferred from value)
auto_field = Field(name="score", value=95.5)  # Auto-detected as FLOAT
```
VectorField ¶
VectorField(
value: ndarray | list[float],
name: str | None = None,
dimensions: int | None = None,
source_field: str | None = None,
model_name: str | None = None,
model_version: str | None = None,
metadata: dict[str, Any] | None = None,
)
Bases: Field
Represents a vector field with embeddings and metadata.
Examples:

```python
# Simple usage - name optional when used in Record
record = Record({"embedding": VectorField(value=[0.1, 0.2, 0.3])})

# With explicit configuration
import numpy as np
embedding_array = np.array([0.1, 0.2, 0.3])
field = VectorField(
    value=embedding_array,
    name="doc_embedding",
    model_name="all-MiniLM-L6-v2",
    source_field="content",
)

# From text using an embedding function
def my_embedding_fn(text):
    # In practice, use a real model like sentence-transformers
    return np.array([0.1, 0.2, 0.3])

field = VectorField.from_text(
    "This is the text to embed",
    embedding_fn=my_embedding_fn,
)
```
Initialize a vector field.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `ndarray \| list[float]` | Vector data as numpy array or list of floats | *required* |
| `name` | `str \| None` | Field name (optional, defaults to "embedding") | `None` |
| `dimensions` | `int \| None` | Expected dimensions (auto-detected if not provided) | `None` |
| `source_field` | `str \| None` | Name of the text field this vector was generated from | `None` |
| `model_name` | `str \| None` | Name of the embedding model used | `None` |
| `model_version` | `str \| None` | Version of the embedding model | `None` |
| `metadata` | `dict[str, Any] \| None` | Additional metadata | `None` |
Methods:

| Name | Description |
|---|---|
| `from_text` | Create a VectorField from text using an embedding function. |
| `validate` | Validate the vector field. |
| `to_list` | Convert vector to a list of floats. |
| `cosine_similarity` | Compute cosine similarity with another vector. |
| `euclidean_distance` | Compute Euclidean distance to another vector. |
| `to_dict` | Convert to dictionary representation. |
| `from_dict` | Create from dictionary representation. |
Source code in packages/data/src/dataknobs_data/fields.py
Functions¶
from_text classmethod ¶
from_text(
text: str,
embedding_fn: Callable[[str], Any],
name: str | None = None,
dimensions: int | None = None,
model_name: str | None = None,
model_version: str | None = None,
**kwargs: Any,
) -> VectorField
Create a VectorField from text using an embedding function.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `text` | `str` | Text to embed | *required* |
| `embedding_fn` | `Callable[[str], Any]` | Function that takes text and returns embedding vector | *required* |
| `name` | `str \| None` | Field name (optional, defaults to "embedding") | `None` |
| `dimensions` | `int \| None` | Expected dimensions (auto-detected if not provided) | `None` |
| `model_name` | `str \| None` | Name of the embedding model | `None` |
| `model_version` | `str \| None` | Version of the embedding model | `None` |
| `**kwargs` | `Any` | Additional arguments passed to VectorField constructor | `{}` |

Returns:

| Type | Description |
|---|---|
| `VectorField` | VectorField instance with the generated embedding |
Example

```python
field = VectorField.from_text(
    "Machine learning is fascinating",
    embedding_fn=model.encode,
    model_name="all-MiniLM-L6-v2",
)
```
Source code in packages/data/src/dataknobs_data/fields.py
validate ¶
Validate the vector field.
Source code in packages/data/src/dataknobs_data/fields.py
to_list ¶
Convert vector to a list of floats.
cosine_similarity ¶
Compute cosine similarity with another vector.
Source code in packages/data/src/dataknobs_data/fields.py
euclidean_distance ¶
Compute Euclidean distance to another vector.
Source code in packages/data/src/dataknobs_data/fields.py
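The two distance helpers follow the standard definitions. A minimal pure-Python sketch of what `cosine_similarity` and `euclidean_distance` compute (the function names mirror the methods above; the implementation here is illustrative, not the package's own):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # Dot product divided by the product of the vector norms
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def euclidean_distance(a: list[float], b: list[float]) -> float:
    # Square root of the sum of squared component differences
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))   # 1.0
print(euclidean_distance([0.0, 0.0], [3.0, 4.0]))  # 5.0
```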
to_dict ¶
Convert to dictionary representation.
Source code in packages/data/src/dataknobs_data/fields.py
from_dict classmethod ¶
Create from dictionary representation.
Source code in packages/data/src/dataknobs_data/fields.py
Filter dataclass ¶
Represents a filter condition.
A Filter combines a field name, an operator, and a value to create a query condition. Multiple filters can be combined in a Query for complex filtering.
Attributes:

| Name | Type | Description |
|---|---|---|
| `field` | `str` | The field name to filter on |
| `operator` | `Operator` | The comparison operator |
| `value` | `Any` | The value to compare against (optional for EXISTS/NOT_EXISTS operators) |
Example

```python
from dataknobs_data import Filter, Operator, Query, database_factory

# Create filters
age_filter = Filter("age", Operator.GT, 25)
name_filter = Filter("name", Operator.LIKE, "A%")
status_filter = Filter("status", Operator.IN, ["active", "pending"])

# Use in query
query = Query(filters=[age_filter, name_filter])

# Search database
db = database_factory("memory")
results = db.search(query)
```
Methods:

| Name | Description |
|---|---|
| `matches` | Check if a record value matches this filter. |
| `to_dict` | Convert filter to dictionary representation. |
| `from_dict` | Create filter from dictionary representation. |
Functions¶
matches ¶
Check if a record value matches this filter.
Supports type-aware comparisons for ranges and special handling for datetime/date objects.
Source code in packages/data/src/dataknobs_data/query.py
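To make the matching semantics concrete, here is an illustrative pure-Python sketch of how a filter's operator could be applied to a record value. This is a simplified stand-in for the real `matches`, which additionally performs type-aware range comparisons and special datetime/date handling:

```python
def matches(operator: str, record_value, filter_value) -> bool:
    # Simplified dispatch over a few common operators
    if operator == "EQ":
        return record_value == filter_value
    if operator == "GT":
        return record_value > filter_value
    if operator == "IN":
        return record_value in filter_value
    if operator == "BETWEEN":
        low, high = filter_value
        return low <= record_value <= high
    raise ValueError(f"Unsupported operator: {operator}")

print(matches("GT", 30, 25))             # True
print(matches("BETWEEN", 30, [20, 40]))  # True
```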
to_dict ¶
from_dict classmethod ¶
Create filter from dictionary representation.
Operator ¶
Bases: Enum
Query operators for filtering.
Operators used to build filter conditions in queries. Supports comparison, pattern matching, existence checks, and range queries.
Example

```python
from dataknobs_data import Filter, Operator, Query

# Equality
filter_eq = Filter("age", Operator.EQ, 30)

# Comparison
filter_gt = Filter("score", Operator.GT, 90)

# Pattern matching (SQL LIKE)
filter_like = Filter("name", Operator.LIKE, "A%")  # Names starting with 'A'

# IN operator
filter_in = Filter("status", Operator.IN, ["active", "pending"])

# Range query
filter_between = Filter("age", Operator.BETWEEN, [20, 40])

# Build query
query = Query(filters=[filter_gt, filter_like])
```
Query dataclass ¶
Query(
filters: list[Filter] = list(),
sort_specs: list[SortSpec] = list(),
limit_value: int | None = None,
offset_value: int | None = None,
fields: list[str] | None = None,
vector_query: VectorQuery | None = None,
)
Represents a database query with filters, sorting, pagination, and vector search.
A Query combines multiple filter conditions, sort specifications, and pagination options to retrieve records from a database. Supports fluent interface for building queries.
Attributes:

| Name | Type | Description |
|---|---|---|
| `filters` | `list[Filter]` | List of filter conditions |
| `sort_specs` | `list[SortSpec]` | List of sort specifications |
| `limit_value` | `int \| None` | Maximum number of results |
| `offset_value` | `int \| None` | Number of results to skip |
| `fields` | `list[str] \| None` | List of field names to include (projection) |
| `vector_query` | `VectorQuery \| None` | Optional vector similarity search parameters |
Example

```python
from dataknobs_data import Query, Filter, Operator, SortOrder, SortSpec, database_factory

# Simple query with filters
query = Query(
    filters=[
        Filter("age", Operator.GT, 25),
        Filter("status", Operator.EQ, "active"),
    ]
)

# Using fluent interface
query = (
    Query()
    .filter("age", Operator.GT, 25)
    .filter("status", Operator.EQ, "active")
    .sort_by("age", SortOrder.DESC)
    .limit(10)
    .offset(20)
)

# With field projection
query = (
    Query()
    .filter("age", Operator.GT, 25)
    .select("name", "age", "email")
)

# Execute query
db = database_factory("memory")
results = db.search(query)
```
Methods:

| Name | Description |
|---|---|
| `filter` | Add a filter to the query (fluent interface). |
| `sort_by` | Add a sort specification to the query (fluent interface). |
| `sort` | Add sorting (fluent interface). |
| `set_limit` | Set the result limit (fluent interface). |
| `limit` | Set limit (fluent interface). |
| `set_offset` | Set the result offset (fluent interface). |
| `offset` | Set offset (fluent interface). |
| `select` | Set field projection (fluent interface). |
| `clear_filters` | Clear all filters (fluent interface). |
| `clear_sort` | Clear all sort specifications (fluent interface). |
| `similar_to` | Add vector similarity search to the query. |
| `near_text` | Add text-based vector similarity search to the query. |
| `hybrid` | Create a hybrid query combining text and vector search. |
| `with_reranking` | Enable result reranking for vector queries. |
| `clear_vector` | Clear vector search from the query (fluent interface). |
| `to_dict` | Convert query to dictionary representation. |
| `from_dict` | Create query from dictionary representation. |
| `copy` | Create a copy of the query. |
| `or_` | Create a ComplexQuery with OR logic. |
| `and_` | Add more filters with AND logic (convenience method). |
| `not_` | Create a ComplexQuery with NOT logic. |
Attributes¶
sort_property property ¶
Get sort specifications (backward compatibility).
Functions¶
filter ¶
Add a filter to the query (fluent interface).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `field` | `str` | The field name to filter on | *required* |
| `operator` | `str \| Operator` | The operator (string or Operator enum) | *required* |
| `value` | `Any` | The value to compare against | `None` |

Returns:

| Type | Description |
|---|---|
| `Query` | Self for method chaining |
Source code in packages/data/src/dataknobs_data/query.py
sort_by ¶
Add a sort specification to the query (fluent interface).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `field` | `str` | The field name to sort by | *required* |
| `order` | `str \| SortOrder` | The sort order ("asc", "desc", or SortOrder enum) | `'asc'` |

Returns:

| Type | Description |
|---|---|
| `Query` | Self for method chaining |
Source code in packages/data/src/dataknobs_data/query.py
sort ¶
set_limit ¶
Set the result limit (fluent interface).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `limit` | `int` | Maximum number of results | *required* |

Returns:

| Type | Description |
|---|---|
| `Query` | Self for method chaining |
limit ¶
set_offset ¶
Set the result offset (fluent interface).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `offset` | `int` | Number of results to skip | *required* |

Returns:

| Type | Description |
|---|---|
| `Query` | Self for method chaining |
Source code in packages/data/src/dataknobs_data/query.py
offset ¶
select ¶
Set field projection (fluent interface).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `fields` | `str` | Field names to include in results | `()` |

Returns:

| Type | Description |
|---|---|
| `Query` | Self for method chaining |
Source code in packages/data/src/dataknobs_data/query.py
clear_filters ¶
clear_sort ¶
similar_to ¶
similar_to(
vector: ndarray | list[float],
field: str = "embedding",
k: int = 10,
metric: DistanceMetric | str = "cosine",
include_source: bool = True,
score_threshold: float | None = None,
) -> Query
Add vector similarity search to the query.
This method sets up a vector similarity search that will find the k most similar vectors to the provided query vector.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `vector` | `ndarray \| list[float]` | Query vector to search for similar vectors | *required* |
| `field` | `str` | Vector field name to search | `'embedding'` |
| `k` | `int` | Number of results to return | `10` |
| `metric` | `DistanceMetric \| str` | Distance metric to use | `'cosine'` |
| `include_source` | `bool` | Whether to include source text in results | `True` |
| `score_threshold` | `float \| None` | Minimum similarity score threshold (optional) | `None` |

Returns:

| Type | Description |
|---|---|
| `Query` | Self for method chaining |
Source code in packages/data/src/dataknobs_data/query.py
near_text ¶
near_text(
text: str,
embedding_fn: Callable[[str], ndarray],
field: str = "embedding",
k: int = 10,
metric: DistanceMetric | str = "cosine",
include_source: bool = True,
score_threshold: float | None = None,
) -> Query
Add text-based vector similarity search to the query.
This is a convenience method that converts text to a vector using the provided embedding function, then performs vector similarity search.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `text` | `str` | Text to convert to vector for similarity search | *required* |
| `embedding_fn` | `Callable[[str], ndarray]` | Function to convert text to vector | *required* |
| `field` | `str` | Vector field name to search | `'embedding'` |
| `k` | `int` | Number of results to return | `10` |
| `metric` | `DistanceMetric \| str` | Distance metric to use | `'cosine'` |
| `include_source` | `bool` | Whether to include source text in results | `True` |
| `score_threshold` | `float \| None` | Minimum similarity score threshold (optional) | `None` |

Returns:

| Type | Description |
|---|---|
| `Query` | Self for method chaining |
Source code in packages/data/src/dataknobs_data/query.py
hybrid ¶
hybrid(
text_query: str | None = None,
vector: ndarray | list[float] | None = None,
text_field: str = "content",
vector_field: str = "embedding",
alpha: float = 0.5,
k: int = 10,
metric: DistanceMetric | str = "cosine",
) -> Query
Create a hybrid query combining text and vector search.
This method combines traditional text search with vector similarity search, allowing for more nuanced queries that leverage both exact text matching and semantic similarity.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `text_query` | `str \| None` | Text to search for (optional) | `None` |
| `vector` | `ndarray \| list[float] \| None` | Vector for similarity search (optional) | `None` |
| `text_field` | `str` | Field for text search | `'content'` |
| `vector_field` | `str` | Field for vector search | `'embedding'` |
| `alpha` | `float` | Weight balance between text (0.0) and vector (1.0) search | `0.5` |
| `k` | `int` | Number of results to return | `10` |
| `metric` | `DistanceMetric \| str` | Distance metric for vector search | `'cosine'` |

Returns:

| Type | Description |
|---|---|
| `Query` | Self for method chaining |
Note
- alpha=0.0 gives full weight to text search
- alpha=1.0 gives full weight to vector search
- alpha=0.5 gives equal weight to both
Source code in packages/data/src/dataknobs_data/query.py
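A hybrid score is typically a convex combination of the two signals. An illustrative sketch of how the `alpha` weighting described in the note could combine normalized text and vector scores (this shows the concept only, not the package's internal scoring):

```python
def hybrid_score(text_score: float, vector_score: float, alpha: float = 0.5) -> float:
    # alpha=0.0 -> pure text search, alpha=1.0 -> pure vector search
    return (1.0 - alpha) * text_score + alpha * vector_score

print(hybrid_score(0.8, 0.4, alpha=0.0))  # 0.8 (text only)
print(hybrid_score(0.8, 0.4, alpha=1.0))  # 0.4 (vector only)
print(hybrid_score(0.8, 0.4, alpha=0.5))  # ~0.6 (equal weight)
```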
with_reranking ¶
Enable result reranking for vector queries.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `rerank_k` | `int \| None` | Number of results to rerank (default: 2*k from vector query) | `None` |

Returns:

| Type | Description |
|---|---|
| `Query` | Self for method chaining |
Source code in packages/data/src/dataknobs_data/query.py
clear_vector ¶
to_dict ¶
Convert query to dictionary representation.
Source code in packages/data/src/dataknobs_data/query.py
from_dict classmethod ¶
Create query from dictionary representation.
Source code in packages/data/src/dataknobs_data/query.py
copy ¶
Create a copy of the query.
Source code in packages/data/src/dataknobs_data/query.py
or_ ¶
Create a ComplexQuery with OR logic.
The current query's filters become an AND group, combined with the OR conditions. For example, a Query with filters [A, B] calling `or_(C, D)` creates: `(A AND B) AND (C OR D)`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `filters` | `Filter \| Query` | Filter objects or Query objects to OR together | `()` |

Returns:

| Type | Description |
|---|---|
| `ComplexQuery` | ComplexQuery with OR logic |
Source code in packages/data/src/dataknobs_data/query.py
and_ ¶
Add more filters with AND logic (convenience method).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `filters` | `Filter \| Query` | Filter objects or Query objects to AND together | `()` |

Returns:

| Type | Description |
|---|---|
| `Query` | Self for chaining |
Source code in packages/data/src/dataknobs_data/query.py
not_ ¶
Create a ComplexQuery with NOT logic.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `filter` | `Filter` | Filter to negate | *required* |

Returns:

| Type | Description |
|---|---|
| `ComplexQuery` | ComplexQuery with NOT logic |
Source code in packages/data/src/dataknobs_data/query.py
SortOrder ¶
Bases: Enum
Sort order for query results.
SortSpec dataclass ¶
ComplexQuery dataclass ¶
ComplexQuery(
condition: Condition | None = None,
sort_specs: list = list(),
limit_value: int | None = None,
offset_value: int | None = None,
fields: list[str] | None = None,
vector_query: VectorQuery | None = None,
)
A query with complex boolean logic support.
Methods:

| Name | Description |
|---|---|
| `AND` | Create a complex query with AND logic. |
| `OR` | Create a complex query with OR logic. |
| `matches` | Check if a record matches this query. |
| `to_simple_query` | Convert to simple Query if possible (AND filters only). |
| `to_dict` | Convert to dictionary representation. |
| `from_dict` | Create from dictionary representation. |
Functions¶
AND classmethod ¶
Create a complex query with AND logic.
Source code in packages/data/src/dataknobs_data/query_logic.py
OR classmethod ¶
Create a complex query with OR logic.
Source code in packages/data/src/dataknobs_data/query_logic.py
matches ¶
to_simple_query ¶
Convert to simple Query if possible (AND filters only).
Source code in packages/data/src/dataknobs_data/query_logic.py
to_dict ¶
Convert to dictionary representation.
Source code in packages/data/src/dataknobs_data/query_logic.py
from_dict classmethod ¶
Create from dictionary representation.
Source code in packages/data/src/dataknobs_data/query_logic.py
Condition ¶
Bases: ABC
Abstract base class for query conditions.
Methods:

| Name | Description |
|---|---|
| `matches` | Check if a record matches this condition. |
| `to_dict` | Convert condition to dictionary representation. |
| `from_dict` | Create condition from dictionary representation. |
FilterCondition dataclass ¶
Bases: Condition
A single filter condition.
Methods:

| Name | Description |
|---|---|
| `matches` | Check if a record matches this filter. |
| `to_dict` | Convert to dictionary representation. |
| `from_dict` | Create from dictionary representation. |
Functions¶
matches ¶
Check if a record matches this filter.
Source code in packages/data/src/dataknobs_data/query_logic.py
to_dict ¶
from_dict classmethod ¶
LogicCondition dataclass ¶
Bases: Condition
A logical combination of conditions.
Methods:

| Name | Description |
|---|---|
| `matches` | Check if a record matches this logical condition. |
| `to_dict` | Convert to dictionary representation. |
| `from_dict` | Create from dictionary representation. |
Functions¶
matches ¶
Check if a record matches this logical condition.
Source code in packages/data/src/dataknobs_data/query_logic.py
to_dict ¶
Convert to dictionary representation.
from_dict classmethod ¶
Create from dictionary representation.
Source code in packages/data/src/dataknobs_data/query_logic.py
LogicOperator ¶
Bases: Enum
Logical operators for combining conditions.
QueryBuilder ¶
Builder for complex queries with boolean logic.
Initialize empty query builder.
Methods:

| Name | Description |
|---|---|
| `where` | Add a filter condition (defaults to AND with existing conditions). |
| `and_` | Add AND conditions. |
| `or_` | Add OR conditions. |
| `not_` | Add NOT condition. |
| `sort_by` | Add sort specification. |
| `limit` | Set result limit. |
| `offset` | Set result offset. |
| `select` | Set field projection. |
| `similar_to` | Add vector similarity search. |
| `build` | Build the final query. |
Source code in packages/data/src/dataknobs_data/query_logic.py
Functions¶
where ¶
Add a filter condition (defaults to AND with existing conditions).
Source code in packages/data/src/dataknobs_data/query_logic.py
and_ ¶
Add AND conditions.
Source code in packages/data/src/dataknobs_data/query_logic.py
or_ ¶
Add OR conditions.
Source code in packages/data/src/dataknobs_data/query_logic.py
not_ ¶
Add NOT condition.
Source code in packages/data/src/dataknobs_data/query_logic.py
sort_by ¶
Add sort specification.
Source code in packages/data/src/dataknobs_data/query_logic.py
limit ¶
offset ¶
select ¶
similar_to ¶
similar_to(
vector: ndarray | list[float],
field: str = "embedding",
k: int = 10,
metric: DistanceMetric | str = "cosine",
include_source: bool = True,
score_threshold: float | None = None,
) -> QueryBuilder
Add vector similarity search.
Source code in packages/data/src/dataknobs_data/query_logic.py
build ¶
Build the final query.
Source code in packages/data/src/dataknobs_data/query_logic.py
Record dataclass ¶
Record(
data: dict[str, Any] | OrderedDict[str, Field] | None = None,
metadata: dict[str, Any] | None = None,
id: str | None = None,
storage_id: str | None = None,
)
Represents a structured data record with fields and metadata.
The record ID can be accessed via the id property, which:
- Returns the storage_id if set (database-assigned ID)
- Falls back to user-defined 'id' field if present
- Returns None if no ID is available
This separation allows records to have both:
- A user-defined 'id' field as part of their data
- A system-assigned storage_id for database operations
Example

```python
from dataknobs_data import Record, Field, FieldType

# Create record from dict
record = Record({"name": "Alice", "age": 30, "email": "alice@example.com"})

# Access field values
print(record.get_value("name"))  # "Alice"
print(record["age"])             # 30
print(record.name)               # "Alice" (attribute access)

# Set field values
record.set_value("age", 31)
record["city"] = "New York"

# Work with metadata
record.metadata["source"] = "user_input"

# Convert to dict
data = record.to_dict()  # {"name": "Alice", "age": 31, "email": "...", "city": "..."}
```
Initialize a record from various data formats.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any] \| OrderedDict[str, Field] \| None` | Can be a dict of field names to values, or an OrderedDict of Field objects | `None` |
| `metadata` | `dict[str, Any] \| None` | Optional metadata for the record | `None` |
| `id` | `str \| None` | Optional unique identifier for the record (deprecated, use storage_id) | `None` |
| `storage_id` | `str \| None` | Optional storage system identifier for the record | `None` |
Methods:

| Name | Description |
|---|---|
| `generate_id` | Generate and set a new UUID for this record. |
| `get_user_id` | Get the user-defined ID field value (not the storage ID). |
| `has_storage_id` | Check if this record has a storage system ID assigned. |
| `get_field` | Get a field by name. |
| `get_value` | Get a field's value by name, supporting dot-notation for nested paths. |
| `get_nested_value` | Get a value from a nested path using dot notation. |
| `set_field` | Set or update a field. |
| `set_value` | Set a field's value by name. |
| `remove_field` | Remove a field by name. Returns True if field was removed. |
| `has_field` | Check if a field exists. |
| `field_names` | Get list of field names. |
| `field_count` | Get the number of fields. |
| `__getitem__` | Get field value by name or field by index. |
| `__setitem__` | Set field by name. |
| `__delitem__` | Delete field by name. |
| `__contains__` | Check if field exists. |
| `__iter__` | Iterate over field names. |
| `__len__` | Get number of fields. |
| `validate` | Validate all fields in the record. |
| `get_field_object` | Get the Field object by name. |
| `__getattr__` | Get field value by attribute access. |
| `__setattr__` | Set field value by attribute access. |
| `to_dict` | Convert record to dictionary. |
| `from_dict` | Create a record from a dictionary representation. |
| `copy` | Create a copy of the record. |
| `project` | Create a new record with only specified fields. |
| `merge` | Merge another record into this one. |
Attributes:

| Name | Type | Description |
|---|---|---|
| `storage_id` | `str \| None` | Get the storage system ID (database-assigned ID). |
| `id` | `str \| None` | Get the record ID. |
| `data` | `dict[str, Any]` | Get all field values as a dictionary. |
Source code in packages/data/src/dataknobs_data/records.py
Attributes¶
storage_id property writable ¶
Get the storage system ID (database-assigned ID).
id property writable ¶
Get the record ID.
Priority order:
1. Storage ID (database-assigned) if set
2. User-defined 'id' field value
3. Metadata 'id' (for backwards compatibility)
4. record_id field (common in DataFrames)
Returns the first ID found, or None if no ID is present.
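The priority order above amounts to a first-non-None lookup. An illustrative sketch using plain dicts in place of the Record internals (the function name and signature are hypothetical, not part of the package API):

```python
def resolve_id(storage_id, fields: dict, metadata: dict):
    # 1. Database-assigned storage ID wins
    if storage_id is not None:
        return storage_id
    # 2. User-defined 'id' field
    if fields.get("id") is not None:
        return fields["id"]
    # 3. Metadata 'id' (backwards compatibility)
    if metadata.get("id") is not None:
        return metadata["id"]
    # 4. 'record_id' field (common in DataFrames), else None
    return fields.get("record_id")

print(resolve_id("db-42", {"id": "user-1"}, {}))  # db-42
print(resolve_id(None, {"id": "user-1"}, {}))     # user-1
print(resolve_id(None, {}, {}))                   # None
```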
data property ¶
Get all field values as a dictionary.
Provides a simple dict-like view of the record's data.
Functions¶
generate_id ¶
Generate and set a new UUID for this record.
Returns:

| Type | Description |
|---|---|
| `str` | The generated UUID string |
get_user_id ¶
Get the user-defined ID field value (not the storage ID).
This explicitly returns the value of the 'id' field in the record's data, ignoring any storage_id that may be set.
Returns:

| Type | Description |
|---|---|
| `str \| None` | The value of the 'id' field if present, None otherwise |
Source code in packages/data/src/dataknobs_data/records.py
has_storage_id ¶
Check if this record has a storage system ID assigned.
Returns:

| Type | Description |
|---|---|
| `bool` | True if storage_id is set, False otherwise |
get_field ¶
get_value ¶
Get a field's value by name, supporting dot-notation for nested paths.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Field name or dot-notation path (e.g., "metadata.type") | *required* |
| `default` | `Any` | Default value if field not found | `None` |

Returns:

| Type | Description |
|---|---|
| `Any` | The field value or default |
Source code in packages/data/src/dataknobs_data/records.py
get_nested_value ¶
Get a value from a nested path using dot notation.
Supports paths like:
- "metadata.type" - access metadata field (if exists) or metadata dict attribute
- "fields.temperature" - access field values
- "metadata.config.timeout" - nested dict access
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Dot-notation path to the value | *required* |
| `default` | `Any` | Default value if path not found | `None` |

Returns:

| Type | Description |
|---|---|
| `Any` | The value at the path or default |
Source code in packages/data/src/dataknobs_data/records.py
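Dot-notation traversal like the above can be sketched in a few lines. An illustrative stand-in that walks nested dicts (the real method additionally resolves the record's fields and metadata attributes):

```python
def get_nested_value(data: dict, path: str, default=None):
    # Walk the dot-separated path one key at a time
    current = data
    for key in path.split("."):
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            return default
    return current

record_data = {"metadata": {"config": {"timeout": 30}}}
print(get_nested_value(record_data, "metadata.config.timeout"))  # 30
print(get_nested_value(record_data, "metadata.missing", "n/a"))  # n/a
```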
set_field ¶
set_field(
name: str,
value: Any,
field_type: FieldType | None = None,
field_metadata: dict[str, Any] | None = None,
) -> None
Set or update a field.
Source code in packages/data/src/dataknobs_data/records.py
set_value ¶
Set a field's value by name.
Convenience method that creates the field if it doesn't exist.
Source code in packages/data/src/dataknobs_data/records.py
remove_field ¶
Remove a field by name. Returns True if field was removed.
has_field ¶
field_names ¶
field_count ¶
__getitem__ ¶
Get field value by name or field by index.
For string keys, returns the field value directly (dict-like access). For integer keys, returns the Field object at that index for backward compatibility.
Source code in packages/data/src/dataknobs_data/records.py
__setitem__ ¶
Set field by name.
Can accept either a Field object or a raw value. When given a raw value, creates a new Field automatically.
Source code in packages/data/src/dataknobs_data/records.py
__delitem__ ¶
__contains__ ¶
__iter__ ¶
__len__ ¶
validate ¶
get_field_object ¶
Get the Field object by name.
Use this method when you need access to the Field object itself, not just its value.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Field name | *required* |

Returns:

| Type | Description |
|---|---|
| `Field` | The Field object |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If field not found |
Source code in packages/data/src/dataknobs_data/records.py
__getattr__ ¶
Get field value by attribute access.
Provides convenient attribute-style access to field values. Falls back to normal attribute access for non-field attributes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Attribute/field name | *required* |

Returns:

| Type | Description |
|---|---|
| `Any` | Field value if field exists, otherwise raises AttributeError |
Source code in packages/data/src/dataknobs_data/records.py
__setattr__ ¶
Set field value by attribute access.
Allows setting field values using attribute syntax. Special attributes (fields, metadata, _id, _storage_id) are handled normally. Properties (id, storage_id) are also handled specially.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Attribute/field name | *required* |
| `value` | `Any` | Value to set | *required* |
Source code in packages/data/src/dataknobs_data/records.py
to_dict ¶
to_dict(
include_metadata: bool = False,
flatten: bool = True,
include_field_objects: bool = True,
) -> dict[str, Any]
Convert record to dictionary.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `include_metadata` | `bool` | Whether to include metadata in the output | `False` |
| `flatten` | `bool` | If True (default), return just field values; if False, return structured format | `True` |
| `include_field_objects` | `bool` | If True and not flattened, return full Field objects | `True` |

Returns:

| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary representation of the record |
Source code in packages/data/src/dataknobs_data/records.py
from_dict classmethod ¶
Create a record from a dictionary representation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict[str, Any]` | Dictionary containing record data | *required* |

Returns:

| Type | Description |
|---|---|
| `Record` | A new Record instance |
Source code in packages/data/src/dataknobs_data/records.py
copy ¶
Create a copy of the record.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `deep` | `bool` | If True, create deep copies of fields and metadata | `True` |
Source code in packages/data/src/dataknobs_data/records.py
project ¶
Create a new record with only specified fields.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `field_names` | `list[str]` | List of field names to include in the projection | *required* |

Returns:

| Type | Description |
|---|---|
| `Record` | A new Record containing only the specified fields |
Source code in packages/data/src/dataknobs_data/records.py
merge ¶
Merge another record into this one.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `other` | `Record` | The record to merge | *required* |
| `overwrite` | `bool` | If True, overwrite existing fields; if False, keep existing | `True` |

Returns:

| Type | Description |
|---|---|
| `Record` | A new merged record |
Source code in packages/data/src/dataknobs_data/records.py
DedupChecker ¶
DedupChecker(
db: AsyncDatabase,
config: DedupConfig,
vector_store: Any | None = None,
embedding_fn: Callable[[str], Awaitable[list[float]]] | None = None,
)
Checks content uniqueness via hash matching and optional semantic similarity.
Uses an AsyncDatabase for hash-based exact matching and an optional
VectorStore for semantic similarity search.
Example

```python
checker = DedupChecker(db=dedup_db, config=DedupConfig())
await checker.register({"content": "A question about math"}, "q-1")
result = await checker.check({"content": "A question about math"})
result.recommendation  # 'exact_duplicate'
```
Initialize the dedup checker.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `db` | `AsyncDatabase` | Database for storing content hashes. | *required* |
| `config` | `DedupConfig` | Deduplication configuration. | *required* |
| `vector_store` | `Any \| None` | Optional vector store for semantic similarity search. | `None` |
| `embedding_fn` | `Callable[[str], Awaitable[list[float]]] \| None` | Async function that takes text and returns an embedding vector. Required when semantic similarity checking is enabled. | `None` |
Methods:

| Name | Description |
|---|---|
| `compute_hash` | Compute a deterministic content hash from configured fields. |
| `check` | Check content for duplicates. |
| `register` | Register content for future duplicate lookups. |
Attributes:

| Name | Type | Description |
|---|---|---|
| `config` | `DedupConfig` | The dedup configuration. |
Source code in packages/data/src/dataknobs_data/dedup.py
Attributes¶
Functions¶
compute_hash ¶
Compute a deterministic content hash from configured fields.
Fields are joined with | separator to avoid collisions between
values like ("a b", "c") and ("a", "b c"). Missing fields
are treated as empty strings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `content` | `dict[str, Any]` | Content dictionary to hash. | *required* |
Returns:
| Type | Description |
|---|---|
| `str` | Hex digest of the content hash. |
Source code in packages/data/src/dataknobs_data/dedup.py
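The separator rationale can be checked with a standalone sketch (a hypothetical `compute_hash` assuming the `|`-joining scheme described above; the package's actual implementation lives in `dedup.py`):

```python
import hashlib


def compute_hash(content: dict, hash_fields=("content",), algorithm: str = "md5") -> str:
    """Join configured field values with '|' and hash the result."""
    # Missing fields are treated as empty strings, as the docstring describes.
    joined = "|".join(str(content.get(f, "")) for f in hash_fields)
    return hashlib.new(algorithm, joined.encode("utf-8")).hexdigest()


# Without a separator, ("a b", "c") and ("a", "b c") could both flatten to
# the same string; the "|" keeps them distinct: "a b|c" vs "a|b c".
h1 = compute_hash({"x": "a b", "y": "c"}, hash_fields=("x", "y"))
h2 = compute_hash({"x": "a", "y": "b c"}, hash_fields=("x", "y"))
print(h1 != h2)  # True
```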
check async ¶
Check content for duplicates.
Performs an exact hash match first, then optionally checks semantic similarity if configured.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `content` | `dict[str, Any]` | Content dictionary to check. | *required* |
Returns:
| Type | Description |
|---|---|
| `DedupResult` | DedupResult with match information and recommendation. |
Source code in packages/data/src/dataknobs_data/dedup.py
register async ¶
Register content for future duplicate lookups.
Stores the content hash in the database and optionally the embedding in the vector store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `content` | `dict[str, Any]` | Content dictionary to register. | *required* |
| `record_id` | `str` | The record ID to associate with this content. | *required* |
Source code in packages/data/src/dataknobs_data/dedup.py
DedupConfig dataclass ¶
DedupConfig(
hash_fields: list[str] = ["content"],
hash_algorithm: str = "md5",
semantic_check: bool = False,
semantic_fields: list[str] | None = None,
similarity_threshold: float = 0.92,
max_similar_results: int = 5,
)
Configuration for deduplication checking.
Attributes:
| Name | Type | Description |
|---|---|---|
| `hash_fields` | `list[str]` | Field names used for computing the content hash. |
| `hash_algorithm` | `str` | Hash algorithm to use. |
| `semantic_check` | `bool` | Whether to perform semantic similarity search. |
| `semantic_fields` | `list[str] \| None` | Fields concatenated for embedding. |
| `similarity_threshold` | `float` | Minimum similarity score to consider a match. |
| `max_similar_results` | `int` | Maximum number of similar items to return. |
DedupResult dataclass ¶
DedupResult(
is_exact_duplicate: bool,
exact_match_id: str | None = None,
similar_items: list[SimilarItem] = list(),
recommendation: str = "unique",
content_hash: str = "",
)
Result of a deduplication check.
Attributes:
| Name | Type | Description |
|---|---|---|
| `is_exact_duplicate` | `bool` | Whether an exact hash match was found. |
| `exact_match_id` | `str \| None` | The record ID of the exact match, if any. |
| `similar_items` | `list[SimilarItem]` | Semantically similar items found. |
| `recommendation` | `str` | Recommendation string; defaults to `"unique"`. |
| `content_hash` | `str` | The computed hash of the checked content. |
SimilarItem dataclass ¶
A record that is semantically similar to the candidate.
Attributes:
| Name | Type | Description |
|---|---|---|
| `record_id` | `str` | The ID of the similar record. |
| `score` | `float` | Similarity score (higher is more similar). |
| `matched_text` | `str` | The text that was matched against. |
StreamConfig dataclass ¶
StreamConfig(
batch_size: int = 1000,
prefetch: int = 2,
timeout: float | None = None,
on_error: Callable[[Exception, Record], bool] | None = None,
)
Configuration for streaming operations.
Methods:
| Name | Description |
|---|---|
| `__post_init__` | Validate configuration. |
Functions¶
__post_init__ ¶
Validate configuration.
Source code in packages/data/src/dataknobs_data/streaming.py
StreamProcessor ¶
Base class for stream processing utilities.
Methods:
| Name | Description |
|---|---|
| `batch_iterator` | Convert a record iterator into batches. |
| `list_to_iterator` | Convert a list of records to an iterator. |
| `list_to_async_iterator` | Convert a list of records to an async iterator. |
| `iterator_to_async_iterator` | Convert a synchronous iterator to an async iterator. |
| `async_batch_iterator` | Convert an async record iterator into batches. |
| `filter_stream` | Filter records in a stream. |
| `async_filter_stream` | Filter records in an async stream. |
| `transform_stream` | Transform records in a stream, filtering out None results. |
| `async_transform_stream` | Transform records in an async stream, filtering out None results. |
Functions¶
batch_iterator staticmethod ¶
Convert a record iterator into batches.
Source code in packages/data/src/dataknobs_data/streaming.py
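The batching pattern can be sketched in generic Python (a hypothetical stand-alone equivalent; the real static method operates on `Record` iterators):

```python
from itertools import islice
from typing import Iterator, TypeVar

T = TypeVar("T")


def batch_iterator(iterator: Iterator[T], batch_size: int) -> Iterator[list[T]]:
    """Yield lists of up to batch_size items until the source is exhausted."""
    it = iter(iterator)
    while True:
        # islice pulls at most batch_size items without consuming the rest.
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


print(list(batch_iterator(iter(range(7)), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

The final batch may be smaller than `batch_size`, which downstream consumers (e.g. bulk writers) should tolerate.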
list_to_iterator staticmethod ¶
Convert a list of records to an iterator.
list_to_async_iterator async staticmethod ¶
Convert a list of records to an async iterator.
This adapter allows synchronous lists to be used with async streaming APIs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `records` | `list[Record]` | List of records | *required* |
Yields:
| Type | Description |
|---|---|
| `AsyncIterator[Record]` | Individual records from the list |
Source code in packages/data/src/dataknobs_data/streaming.py
iterator_to_async_iterator async staticmethod ¶
Convert a synchronous iterator to an async iterator.
This adapter allows synchronous iterators to be used with async streaming APIs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `iterator` | `Iterator[Record]` | Synchronous iterator of records | *required* |
Yields:
| Type | Description |
|---|---|
| `AsyncIterator[Record]` | Individual records from the iterator |
Source code in packages/data/src/dataknobs_data/streaming.py
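The adapter pattern behind both sync-to-async helpers can be sketched as an async generator that re-yields items from a synchronous iterator (a hypothetical stand-alone version, not the package's implementation):

```python
import asyncio
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")


async def iterator_to_async_iterator(iterator: Iterator[T]) -> AsyncIterator[T]:
    """Wrap a synchronous iterator so async streaming code can consume it."""
    for item in iterator:
        # Each item is handed to the event loop via the async generator protocol.
        yield item


async def main() -> list[int]:
    return [x async for x in iterator_to_async_iterator(iter([1, 2, 3]))]


print(asyncio.run(main()))  # [1, 2, 3]
```

One caveat with this naive form: the underlying iteration is still blocking, so it suits in-memory sources rather than slow I/O.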
async_batch_iterator async staticmethod ¶
async_batch_iterator(
iterator: AsyncIterator[Record], batch_size: int
) -> AsyncIterator[list[Record]]
Convert an async record iterator into batches.
Source code in packages/data/src/dataknobs_data/streaming.py
filter_stream staticmethod ¶
filter_stream(
iterator: Iterator[Record], predicate: Callable[[Record], bool]
) -> Iterator[Record]
Filter records in a stream.
Source code in packages/data/src/dataknobs_data/streaming.py
async_filter_stream staticmethod ¶
async_filter_stream(
iterator: AsyncIterator[Record], predicate: Callable[[Record], bool]
) -> AsyncIterator[Record]
Filter records in an async stream.
Source code in packages/data/src/dataknobs_data/streaming.py
transform_stream staticmethod ¶
transform_stream(
iterator: Iterator[Record], transform: Callable[[Record], Record | None]
) -> Iterator[Record]
Transform records in a stream, filtering out None results.
Source code in packages/data/src/dataknobs_data/streaming.py
async_transform_stream staticmethod ¶
async_transform_stream(
iterator: AsyncIterator[Record],
transform: Callable[[Record], Record | None],
) -> AsyncIterator[Record]
Transform records in an async stream, filtering out None results.
Source code in packages/data/src/dataknobs_data/streaming.py
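The filter and transform helpers share a lazy-pipeline pattern, sketched here with plain dicts standing in for `Record` objects (hypothetical stand-alone versions):

```python
from typing import Callable, Iterator, Optional, TypeVar

T = TypeVar("T")


def filter_stream(iterator: Iterator[T], predicate: Callable[[T], bool]) -> Iterator[T]:
    """Lazily yield only the items the predicate accepts."""
    return (item for item in iterator if predicate(item))


def transform_stream(iterator: Iterator[T], transform: Callable[[T], Optional[T]]) -> Iterator[T]:
    """Apply the transform to each item, dropping items it maps to None."""
    for item in iterator:
        result = transform(item)
        if result is not None:
            yield result


records = [{"age": 30}, {"age": 25}, {"age": 40}]
adults = filter_stream(iter(records), lambda r: r["age"] > 26)
capped = transform_stream(adults, lambda r: {"age": min(r["age"], 35)})
print(list(capped))  # [{'age': 30}, {'age': 35}]
```

Because both stages are generators, nothing is materialized until the pipeline is consumed, which is what makes the pattern suitable for large streams.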
StreamResult dataclass ¶
StreamResult(
total_processed: int = 0,
successful: int = 0,
failed: int = 0,
errors: list[dict[str, Any]] = list(),
duration: float = 0.0,
total_batches: int = 0,
failed_indices: list[int] = list(),
)
Result of streaming operation.
Methods:
| Name | Description |
|---|---|
| `add_error` | Add an error to the result. |
| `merge` | Merge another result into this one. |
| `__str__` | Human-readable representation. |
Attributes:
| Name | Type | Description |
|---|---|---|
| `success_rate` | `float` | Calculate success rate as percentage. |
Attributes¶
Functions¶
add_error ¶
Add an error to the result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `record_id` | `str \| None` | ID of the record that failed | *required* |
| `error` | `Exception` | The exception that occurred | *required* |
| `index` | `int \| None` | Optional index of the failed record in the original batch | `None` |
Source code in packages/data/src/dataknobs_data/streaming.py
merge ¶
Merge another result into this one.
Source code in packages/data/src/dataknobs_data/streaming.py
__str__ ¶
Human-readable representation.
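Assuming `success_rate` is `successful / total_processed * 100` (an inference from the field names, not confirmed by the source), a minimal sketch of the result bookkeeping:

```python
from dataclasses import dataclass, field


@dataclass
class StreamResultSketch:
    """Hypothetical mirror of StreamResult's counters and success_rate."""
    total_processed: int = 0
    successful: int = 0
    failed: int = 0
    errors: list = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        # Percentage of processed records that succeeded; 0.0 when nothing ran.
        if self.total_processed == 0:
            return 0.0
        return 100.0 * self.successful / self.total_processed


result = StreamResultSketch(total_processed=200, successful=190, failed=10)
print(result.success_rate)  # 95.0
```

Guarding the zero-processed case avoids a `ZeroDivisionError` for empty streams.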