dataknobs-utils API Reference¶
Complete API documentation for the dataknobs_utils package.
💡 Quick Links:

- Complete API Documentation - full auto-generated reference
- Source Code - browse on GitHub
- Package Guide - detailed documentation
Package Information¶
- Package Name: dataknobs_utils
- Version: 1.0.0
- Description: Utility functions for dataknobs packages
- Python Requirements: >=3.8
Installation¶
Import Statement¶
from dataknobs_utils import (
elasticsearch_utils,
emoji_utils,
file_utils,
json_extractor,
json_utils,
llm_utils,
pandas_utils,
requests_utils,
resource_utils,
sql_utils,
stats_utils,
subprocess_utils,
sys_utils,
xml_utils,
)
Module Documentation¶
elasticsearch_utils¶
Classes¶
TableSettings¶
dataknobs_utils.elasticsearch_utils.TableSettings ¶
Configuration container for an Elasticsearch index.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Index name. |
| `settings` | | Index settings (shards, replicas, analyzers, etc.). |
| `mapping` | | Field mappings and types. |
Initialize table settings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `table_name` | `str` | Name of the Elasticsearch index. | *required* |
| `data_settings` | `Dict[str, Any]` | Index settings dictionary. | *required* |
| `data_mapping` | `Dict[str, Any]` | Field mappings dictionary. | *required* |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
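As an illustration of the shapes involved, the sketch below mirrors the documented fields with a hypothetical stand-in dataclass; the settings and mapping contents are illustrative, not required by the library:

```python
from dataclasses import dataclass
from typing import Any, Dict

@dataclass
class TableSettingsSketch:
    """Hypothetical stand-in mirroring TableSettings' documented fields."""
    name: str
    settings: Dict[str, Any]
    mapping: Dict[str, Any]

# The kinds of dictionaries the constructor is documented to accept:
settings = {"number_of_shards": 1, "number_of_replicas": 0}
mapping = {"properties": {"title": {"type": "text"}, "id": {"type": "integer"}}}

ts = TableSettingsSketch(name="articles", settings=settings, mapping=mapping)
```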
Functions¶
ElasticsearchIndex¶
dataknobs_utils.elasticsearch_utils.ElasticsearchIndex ¶
ElasticsearchIndex(
request_helper: Any | None,
table_settings: List[TableSettings],
elasticsearch_ip: str | None = None,
elasticsearch_port: int = 9200,
mock_requests: Any | None = None,
)
Wrapper for managing Elasticsearch indices with settings and mappings.
Handles index creation, initialization, and querying across multiple indices with predefined settings and mappings.
Attributes:
| Name | Type | Description |
|---|---|---|
| `request_helper` | `Any` | RequestHelper for making HTTP requests. |
| `tables` | | List of TableSettings for managed indices. |
Initialize Elasticsearch index manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `request_helper` | `Any \| None` | Pre-configured RequestHelper. If None, creates one using `elasticsearch_ip` and `elasticsearch_port`. | *required* |
| `table_settings` | `List[TableSettings]` | List of TableSettings for indices to manage. | *required* |
| `elasticsearch_ip` | `str \| None` | Elasticsearch host IP. Used if `request_helper` is None. Defaults to None (uses "localhost"). | `None` |
| `elasticsearch_port` | `int` | Elasticsearch port. Defaults to 9200. | `9200` |
| `mock_requests` | `Any \| None` | Mock requests object for testing. Defaults to None. | `None` |
Methods:
| Name | Description |
|---|---|
| `analyze` | Analyze text using a specified analyzer. |
| `delete_table` | Delete an index. |
| `get_cluster_health` | Get Elasticsearch cluster health information. |
| `inspect_indices` | List all indices with their statistics. |
| `is_up` | Check if the Elasticsearch server is reachable. |
| `purge` | Delete and recreate all managed indices. |
| `search` | Execute an Elasticsearch search DSL query. |
| `sql` | Execute an Elasticsearch SQL query. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
Functions¶
analyze ¶
Analyze text using a specified analyzer.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `text` | `str` | Text to analyze. | *required* |
| `analyzer` | `str` | Name of the analyzer to use. | *required* |
| `verbose` | `bool` | If True, prints response. Defaults to False. | `False` |

Returns:

| Name | Type | Description |
|---|---|---|
| `ServerResponse` | `Any` | Response with analysis results (tokens, etc.). |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
delete_table ¶
Delete an index.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `table_name` | `str` | Name of the index to delete. | *required* |
| `verbose` | `bool` | If True, prints response. Defaults to False. | `False` |

Returns:

| Name | Type | Description |
|---|---|---|
| `ServerResponse` | `Any` | Delete operation response. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
get_cluster_health ¶
Get Elasticsearch cluster health information.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `verbose` | `bool` | If True, prints response. Defaults to False. | `False` |

Returns:

| Name | Type | Description |
|---|---|---|
| `ServerResponse` | `Any` | Response with cluster health data. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
inspect_indices ¶
List all indices with their statistics.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `verbose` | `bool` | If True, prints response. Defaults to False. | `False` |

Returns:

| Name | Type | Description |
|---|---|---|
| `ServerResponse` | `Any` | Response with indices information. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
is_up ¶
Check if the Elasticsearch server is reachable.
Returns:

| Name | Type | Description |
|---|---|---|
| `bool` | `bool` | True if server responds to cluster health check. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
purge ¶
Delete and recreate all managed indices.
Removes all data by deleting indices, then recreates them with original settings and mappings.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `verbose` | `bool` | If True, prints responses. Defaults to False. | `False` |

Returns:

| Name | Type | Description |
|---|---|---|
| `ServerResponse` | `Any` | Response from the last delete operation. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
search ¶
Execute an Elasticsearch search DSL query.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `Dict[str, Any]` | Elasticsearch query dictionary, e.g. `{"query": {"match": {field: {"query": text, "operator": "AND"}}}}` | *required* |
| `table` | `str \| None` | Index name to search. If None, uses first managed index. Defaults to None. | `None` |
| `verbose` | `bool` | If True, prints response. Defaults to False. | `False` |

Returns:

| Type | Description |
|---|---|
| `Any \| None` | `ServerResponse \| None`: Response with results. If successful, `resp.extra` contains 'hits_df' and/or 'aggs_df' DataFrames. Returns None if no table is available. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
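The documented query shape can be built as a plain dictionary; a minimal sketch (the field name and search text are illustrative):

```python
# Build the documented search DSL query shape by hand.
field, text = "title", "quick brown fox"
query = {"query": {"match": {field: {"query": text, "operator": "AND"}}}}

# A sketch of usage against a configured index manager (hypothetical instance):
# resp = es_index.search(query, table="articles")
```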
sql ¶
Execute an Elasticsearch SQL query.
Automatically handles pagination using cursors to retrieve all results.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | SQL query string. | *required* |
| `fetch_size` | `int` | Maximum records per fetch. Defaults to 10000. | `10000` |
| `columnar` | `bool` | If True, uses compact columnar format (better for few columns). Defaults to True. | `True` |
| `verbose` | `bool` | If True, prints responses. Defaults to False. | `False` |

Returns:

| Name | Type | Description |
|---|---|---|
| `ServerResponse` | `Any` | Response with results. If successful, `resp.extra['df']` contains a DataFrame with all query results. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
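The documented cursor-driven pagination can be sketched generically: keep requesting pages until the response no longer carries a cursor. The `fetch_page` backend below is a fake stand-in for the Elasticsearch SQL endpoint, purely for illustration:

```python
def fetch_all(fetch_page):
    """Sketch of the documented cursor loop: fetch pages until no cursor remains."""
    rows, cursor = [], None
    while True:
        page = fetch_page(cursor)
        rows.extend(page["rows"])
        cursor = page.get("cursor")
        if not cursor:  # Elasticsearch omits the cursor on the final page
            return rows

# Fake paginated backend standing in for the SQL endpoint (illustrative only).
pages = {
    None: {"rows": [1, 2], "cursor": "c1"},
    "c1": {"rows": [3]},
}
result = fetch_all(lambda cur: pages[cur])
```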
Functions¶
build_field_query_dict¶
dataknobs_utils.elasticsearch_utils.build_field_query_dict ¶
build_field_query_dict(
fields: Union[str, List[str]], text: str, operator: str | None = None
) -> Dict[str, Any]
Build an Elasticsearch field query to search for text.
Creates either a match query (single field) or multi_match query (multiple fields).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `fields` | `Union[str, List[str]]` | Field name (str) or list of field names to query. | *required* |
| `text` | `str` | Text to search for. | *required* |
| `operator` | `str \| None` | Search operator (e.g., "AND", "OR"). Uses Elasticsearch default if None. Defaults to None. | `None` |

Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Elasticsearch query dictionary. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
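A hedged re-implementation of the documented behavior (a match query for a single field, a multi_match query for several) might look like the sketch below; this is an illustration, not the library source:

```python
from typing import Any, Dict, List, Union

def build_field_query_sketch(
    fields: Union[str, List[str]], text: str, operator: Union[str, None] = None
) -> Dict[str, Any]:
    """Illustrative sketch of the documented match / multi_match behavior."""
    if isinstance(fields, str):
        # Single field -> match query
        inner: Dict[str, Any] = {"query": text}
        if operator is not None:
            inner["operator"] = operator
        return {"query": {"match": {fields: inner}}}
    # Multiple fields -> multi_match query
    inner = {"query": text, "fields": list(fields)}
    if operator is not None:
        inner["operator"] = operator
    return {"query": {"multi_match": inner}}
```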
build_phrase_query_dict¶
dataknobs_utils.elasticsearch_utils.build_phrase_query_dict ¶
Build an Elasticsearch phrase query with slop tolerance.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `field` | `str` | Field name to query. | *required* |
| `phrase` | `str` | Exact phrase to search for. | *required* |
| `slop` | `int` | Maximum number of positions between terms. Defaults to 0 (exact match). | `0` |

Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Elasticsearch match_phrase query dictionary. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
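The documented match_phrase-with-slop shape, sketched as a standalone helper (illustrative, not the library source):

```python
def build_phrase_query_sketch(field: str, phrase: str, slop: int = 0) -> dict:
    """Sketch of an Elasticsearch match_phrase query with slop tolerance."""
    return {"query": {"match_phrase": {field: {"query": phrase, "slop": slop}}}}
```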
build_hits_dataframe¶
dataknobs_utils.elasticsearch_utils.build_hits_dataframe ¶
Extract search hits from Elasticsearch query results as DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query_result` | `Dict[str, Any]` | Elasticsearch query response dictionary. | *required* |

Returns:

| Type | Description |
|---|---|
| `DataFrame \| None` | `pd.DataFrame \| None`: DataFrame with `_source` fields from hits, or None if no hits found. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
build_aggs_dataframe¶
dataknobs_utils.elasticsearch_utils.build_aggs_dataframe ¶
Extract aggregations from Elasticsearch query results as DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query_result` | `Dict[str, Any]` | Elasticsearch query response dictionary. | *required* |

Returns:

| Type | Description |
|---|---|
| `DataFrame \| None` | `pd.DataFrame \| None`: DataFrame with aggregation results (not yet implemented). |
Note
This function is a placeholder and currently returns None.
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
decode_results¶
dataknobs_utils.elasticsearch_utils.decode_results ¶
Decode Elasticsearch query results into DataFrames.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query_result` | `Dict[str, Any]` | Elasticsearch query response dictionary. | *required* |

Returns:

| Type | Description |
|---|---|
| `Dict[str, DataFrame]` | `Dict[str, pd.DataFrame]`: Dictionary with "hits_df" and/or "aggs_df" keys containing result DataFrames (only present if data exists). |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
add_batch_data¶
dataknobs_utils.elasticsearch_utils.add_batch_data ¶
add_batch_data(
batchfile: TextIO,
record_generator: Any,
idx_name: str,
source_id_fieldname: str = "id",
cur_id: int = 1,
) -> int
Write records to Elasticsearch bulk load batch file.
Generates newline-delimited JSON (NDJSON) format for Elasticsearch bulk API, with alternating action/source lines for each record.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `batchfile` | `TextIO` | File handle for writing batch data. | *required* |
| `record_generator` | `Any` | Generator yielding source record dictionaries to index. | *required* |
| `idx_name` | `str` | Elasticsearch index name for these records. | *required* |
| `source_id_fieldname` | `str` | If non-empty, adds/updates this field in source records with the document ID. Defaults to "id". | `'id'` |
| `cur_id` | `int` | Starting document ID. Defaults to 1. | `1` |

Returns:

| Name | Type | Description |
|---|---|---|
| `int` | `int` | Next available document ID after processing all records. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
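The alternating action/source NDJSON format described above can be sketched as follows. `add_batch_data_sketch` is a hypothetical stand-in for the documented function, and the action-line shape assumes the standard bulk `index` action:

```python
import io
import json

def add_batch_data_sketch(batchfile, records, idx_name,
                          source_id_fieldname="id", cur_id=1):
    """Sketch of the documented NDJSON bulk format: alternating action/source lines."""
    for record in records:
        # Action line names the index and document ID (assumed 'index' action).
        action = {"index": {"_index": idx_name, "_id": cur_id}}
        if source_id_fieldname:
            record[source_id_fieldname] = cur_id  # stamp the doc ID into the source
        batchfile.write(json.dumps(action) + "\n")
        batchfile.write(json.dumps(record) + "\n")
        cur_id += 1
    return cur_id  # next available document ID

buf = io.StringIO()
next_id = add_batch_data_sketch(buf, [{"title": "a"}, {"title": "b"}], "articles")
```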
batchfile_record_generator¶
dataknobs_utils.elasticsearch_utils.batchfile_record_generator ¶
Generate records from an Elasticsearch bulk load batch file.
Parses NDJSON batch files, yielding only source documents (skipping action lines).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `batchfile_path` | `str` | Path to the Elasticsearch batch file. | *required* |

Yields:

| Type | Description |
|---|---|
| `Any` | `Dict[str, Any]`: Each source record dictionary from the batch file. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
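A sketch of the documented parsing (yield source documents, skip action lines), assuming strict action/source alternation as produced by the bulk format; the real implementation may differ:

```python
import io
import json

def batchfile_records_sketch(stream):
    """Yield source docs from NDJSON bulk data, assuming action/source alternation."""
    for i, line in enumerate(l for l in stream if l.strip()):
        if i % 2 == 1:  # odd lines are source documents; even lines are actions
            yield json.loads(line)

data = '{"index": {"_index": "articles", "_id": 1}}\n{"title": "a", "id": 1}\n'
records = list(batchfile_records_sketch(io.StringIO(data)))
```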
collect_batchfile_values¶
dataknobs_utils.elasticsearch_utils.collect_batchfile_values ¶
collect_batchfile_values(
batchfile_path: str, fieldname: str, default_value: Any = ""
) -> List[Any]
Collect all values for a specific field from batch file records.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `batchfile_path` | `str` | Path to the Elasticsearch batch file. | *required* |
| `fieldname` | `str` | Name of the field whose values to collect. | *required* |
| `default_value` | `Any` | Value to use when field doesn't exist in a record. Defaults to "". | `''` |

Returns:

| Type | Description |
|---|---|
| `List[Any]` | `List[Any]`: List of field values from all records. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
collect_batchfile_records¶
dataknobs_utils.elasticsearch_utils.collect_batchfile_records ¶
Load all batch file records into a pandas DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `batchfile_path` | `str` | Path to the Elasticsearch batch file. | *required* |

Returns:

| Type | Description |
|---|---|
| `DataFrame` | `pd.DataFrame`: DataFrame containing all records from the batch file. |
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
file_utils¶
Functions¶
filepath_generator¶
dataknobs_utils.file_utils.filepath_generator ¶
filepath_generator(
rootpath: str,
descend: bool = True,
seen: Set[str] | None = None,
files_only: bool = True,
) -> Generator[str, None, None]
Generate all filepaths under the root path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `rootpath` | `str` | The root path under which to find files. | *required* |
| `descend` | `bool` | True to descend into subdirectories. Defaults to True. | `True` |
| `seen` | `Set[str] \| None` | Set of filepaths and/or directories to ignore. Defaults to None. | `None` |
| `files_only` | `bool` | True to generate only paths to files; False to include paths to directories. Defaults to True. | `True` |

Yields:

| Name | Type | Description |
|---|---|---|
| `str` | `str` | Each file path found under the root path. |
Source code in packages/utils/src/dataknobs_utils/file_utils.py
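One plausible sketch of the documented traversal, using `os.scandir`; the real implementation may differ in ordering and option handling:

```python
import os
import tempfile

def filepath_generator_sketch(rootpath, descend=True, seen=None, files_only=True):
    """Illustrative recursive walk mirroring the documented options."""
    seen = seen or set()
    for entry in sorted(os.scandir(rootpath), key=lambda e: e.path):
        if entry.path in seen:
            continue  # skip ignored paths
        if entry.is_dir():
            if not files_only:
                yield entry.path
            if descend:
                yield from filepath_generator_sketch(entry.path, descend, seen, files_only)
        else:
            yield entry.path

# Demo on a throwaway directory tree.
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "sub"))
open(os.path.join(root, "a.txt"), "w").close()
open(os.path.join(root, "sub", "b.txt"), "w").close()
paths = list(filepath_generator_sketch(root))
```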
fileline_generator¶
dataknobs_utils.file_utils.fileline_generator ¶
Generate lines from the file.
Automatically handles both plain text and gzip-compressed files based on the .gz extension. All lines are stripped of leading/trailing whitespace.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `filename` | `str` | The name of the file to read. | *required* |
| `rootdir` | `str \| None` | Optional directory path to prepend to filename. Defaults to None. | `None` |

Yields:

| Name | Type | Description |
|---|---|---|
| `str` | `str` | Each stripped line from the file. |
Source code in packages/utils/src/dataknobs_utils/file_utils.py
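A sketch of the documented open-by-extension behavior (gzip for `.gz`, plain text otherwise), yielding stripped lines; illustrative, not the library source:

```python
import gzip
import os
import tempfile

def fileline_generator_sketch(filename, rootdir=None):
    """Open plain or .gz files by extension and yield stripped lines."""
    path = os.path.join(rootdir, filename) if rootdir else filename
    opener = gzip.open if path.endswith(".gz") else open
    with opener(path, "rt") as f:
        for line in f:
            yield line.strip()

# Demo with a throwaway gzip file.
tmpdir = tempfile.mkdtemp()
with gzip.open(os.path.join(tmpdir, "x.txt.gz"), "wt") as f:
    f.write("  hello \nworld\n")
lines = list(fileline_generator_sketch("x.txt.gz", rootdir=tmpdir))
```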
write_lines¶
dataknobs_utils.file_utils.write_lines ¶
Write lines to a file in sorted order.
Automatically handles both plain text and gzip-compressed files based on the .gz extension. Lines are sorted before writing.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `outfile` | `str` | The name of the output file. | *required* |
| `lines` | `List[str]` | List of lines to write to the file. | *required* |
| `rootdir` | `str \| None` | Optional directory path to prepend to outfile. Defaults to None. | `None` |
Source code in packages/utils/src/dataknobs_utils/file_utils.py
is_gzip_file¶
dataknobs_utils.file_utils.is_gzip_file ¶
Determine whether a file is gzip-compressed.
Checks the file's magic number (first 3 bytes) to identify gzip format.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `filepath` | `str` | Path to the file to check. | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `bool` | `bool` | True if the file is gzip-compressed, False otherwise. |
Source code in packages/utils/src/dataknobs_utils/file_utils.py
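The magic-number check can be sketched directly: the first three bytes of a gzip member are `1f 8b 08` (the two-byte gzip magic number plus the DEFLATE compression-method byte). This is an illustrative stand-in, not the library source:

```python
import gzip
import os
import tempfile

GZIP_MAGIC = b"\x1f\x8b\x08"  # gzip magic number + DEFLATE method byte

def is_gzip_file_sketch(filepath: str) -> bool:
    """Check the first 3 bytes of a file against the gzip magic number."""
    with open(filepath, "rb") as f:
        return f.read(3) == GZIP_MAGIC

# Demo with one gzip and one plain file.
tmpdir = tempfile.mkdtemp()
gz_path = os.path.join(tmpdir, "a.gz")
txt_path = os.path.join(tmpdir, "a.txt")
with gzip.open(gz_path, "wt") as f:
    f.write("data")
with open(txt_path, "w") as f:
    f.write("data")
```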
json_utils¶
Functions¶
dataknobs_utils.json_utils ¶
Utility functions for JSON processing, streaming, and manipulation.
Provides functions for working with JSON data including nested value access, streaming from files and URLs, and tree-based JSON structure operations.
Classes:

| Name | Description |
|---|---|
| `ArrayElementAcceptStrategy` | Strategy that groups paths by array element at a specific nesting level. |
| `GroupAcceptStrategy` | Abstract strategy for determining if a Path belongs in a PathGroup. |
| `JsonSchema` | Schema representation of JSON structure with type statistics. |
| `JsonSchemaBuilder` | Build a schema view of JSON data by streaming and analyzing structure. |
| `Path` | Container for a jq path with its value and optional line number. |
| `PathGroup` | Container for a group of related paths. |
| `PathSorter` | Sort and group paths into records based on an acceptance strategy. |
| `RecordPathBuilder` | Build and output records from JSON by grouping paths. |
| `ValuePath` | Structure to hold compressed information about paths to a unique value. |
| `ValuesIndex` | Index of unique values organized by their jq paths. |

Functions:

| Name | Description |
|---|---|
| `build_jq_path` | Build a jq path string from a json_stream path tuple. |
| `build_path_tuple` | Build a json_stream tuple path from a jq path string. |
| `collect_squashed` | Collect squashed JSON data into a dictionary. |
| `explode` | Explode a squashed JSON dictionary back into nested structure. |
| `get_records_df` | Collect top-level JSON records into a pandas DataFrame. |
| `get_value` | Get a value from a JSON object using a key path in indexed dot notation. |
| `indexing_format_fn` | Format a (jq_path, item) pair for indexing purposes. |
| `indexing_format_splitter` | Parse a line formatted by indexing_format_fn. |
| `path_to_dict` | Convert a jq path and value into a nested dictionary structure. |
| `squash_data` | Squash JSON data into single-level structure with jq-style keys. |
| `stream_jq_paths` | Stream JSON data and write formatted lines for each (jq_path, item) pair. |
| `stream_json_data` | Stream JSON data and call a visitor function for each value. |
| `stream_record_paths` | Stream JSON and write formatted records grouped by top-level structure. |
| `write_squashed` | Write squashed JSON data to a file. |
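As an illustration of the jq-path convention these functions share, here is a sketch of `build_jq_path`-style conversion from a json_stream path tuple (an illustrative re-implementation, not the library source):

```python
def build_jq_path_sketch(path, keep_list_idxs=True):
    """Sketch: convert a path tuple like ('data', 0, 'name') to '.data[0].name'.

    With keep_list_idxs=False, array indices are generalized to '[]'.
    """
    out = ""
    for part in path:
        if isinstance(part, int):
            out += f"[{part}]" if keep_list_idxs else "[]"
        else:
            out += f".{part}"
    return out
```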
Classes¶
ArrayElementAcceptStrategy ¶
Bases: GroupAcceptStrategy
Strategy that groups paths by array element at a specific nesting level.
Creates record boundaries at array elements, treating each element as a distinct record with paths that share the same array indices up to the specified level.
Attributes:

| Name | Type | Description |
|---|---|---|
| `max_array_level` | | Array nesting level at which to create records. |
| `ref_path` | `Path \| None` | Reference path for matching subsequent paths. |

Initialize the array element grouping strategy.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `max_array_level` | `int` | Array nesting depth for record boundaries: -1 ignores array levels (accept all); 0 starts a new record at the first (top-level) array; 1 at the second array level; etc. | `-1` |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
Functions¶
GroupAcceptStrategy ¶
Bases: ABC
Abstract strategy for determining if a Path belongs in a PathGroup.
Defines the logic for accepting paths as either main paths (defining the record structure) or distributed paths (shared across records).
Methods:

| Name | Description |
|---|---|
| `accept_path` | Determine if and how a path should be added to the group. |

Functions¶
accept_path (abstractmethod) ¶
Determine if and how a path should be added to the group.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path` | The path to evaluate for acceptance. | *required* |
| `group` | `PathGroup` | The group that would receive the path. | *required* |
| `distribute` | `bool` | If True, path is proposed as a distributed (shared) path rather than a main (record-defining) path. | `False` |

Returns:

| Type | Description |
|---|---|
| `str \| None` | 'main' to accept as a main path (record structure); 'distributed' to accept as a distributed path (shared data); None to reject the path. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
JsonSchema ¶
JsonSchema(
schema: Dict[str, Any] | None = None,
values: ValuesIndex | None = None,
values_limit: int = 0,
)
Schema representation of JSON structure with type statistics.
Maintains a mapping of jq paths to value types and their occurrence counts, with optional tracking of unique values at each path.
Schema format:

    {jq_path: {value_type: value_count}}

Where value_type is the type of value (e.g., 'int', 'float', 'str') and value_count is the number of times that type occurs at the path.
Attributes:

| Name | Type | Description |
|---|---|---|
| `schema` | | Mapping of jq_path to type counts. |
| `values` | | Optional ValuesIndex for tracking unique values. |
Initialize a JsonSchema.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `schema` | `Dict[str, Any] \| None` | Pre-existing schema to reconstruct from. Defaults to None. | `None` |
| `values` | `ValuesIndex \| None` | Pre-existing ValuesIndex to include. Defaults to None. | `None` |
| `values_limit` | `int` | Maximum unique values to track per path. If 0, tracks all unique values. Defaults to 0. | `0` |

Methods:

| Name | Description |
|---|---|
| `add_path` | Add an occurrence of a value type at a jq path. |
| `extract_values` | Extract all values at a jq path from JSON data. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
Attributes¶
df (property) ¶
Get schema information as a DataFrame with columns: `jq_path`, `value_type`, `value_count`.
Functions¶
add_path ¶
add_path(
jq_path: str,
value_type: str,
value: Any = None,
path: Tuple[Any, ...] | None = None,
) -> None
Add an occurrence of a value type at a jq path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jq_path` | `str` | The jq-style path identifying the location. | *required* |
| `value_type` | `str` | The type of value at this path (e.g., 'int', 'str'). | *required* |
| `value` | `Any` | Optional actual value for unique value tracking. | `None` |
| `path` | `Tuple[Any, ...] \| None` | Optional full path tuple for tracking value occurrences. | `None` |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
extract_values ¶
extract_values(
jq_path: str, json_data: str, unique: bool = True, timeout: int = TIMEOUT
) -> Union[List[Any], Set[Any]]
Extract all values at a jq path from JSON data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jq_path` | `str` | The jq-style path to extract values from. | *required* |
| `json_data` | `str` | JSON data source (file path, URL, or JSON string). | *required* |
| `unique` | `bool` | If True, returns only unique values as a set. If False, returns all values as a list. Defaults to True. | `True` |
| `timeout` | `int` | Request timeout in seconds for URL sources. Defaults to 10. | `TIMEOUT` |

Returns:

| Type | Description |
|---|---|
| `Union[List[Any], Set[Any]]` | Set of unique values if unique=True, otherwise list of all values. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
JsonSchemaBuilder ¶
JsonSchemaBuilder(
json_data: str,
value_typer: Callable[[Any], str] | None = None,
keep_unique_values: bool = False,
invert_uniques: bool = False,
keep_list_idxs: bool = False,
timeout: int = TIMEOUT,
empty_dict_type: str = "_EMPTY_DICT_",
empty_list_type: str = "_EMPTY_LIST_",
unk_value_type: str = "_UNKNOWN_",
int_value_type: str = "int",
float_value_type: str = "float",
str_value_type: str = "str",
url_value_type: str = "URL",
on_add: Callable[[str], bool] | None = None,
)
Build a schema view of JSON data by streaming and analyzing structure.
Processes JSON data to extract type information, value statistics, and optionally track unique values at each path.
Initialize a JsonSchemaBuilder to analyze JSON structure.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `json_data` | `str` | JSON data source (file path, URL, or JSON string). | *required* |
| `value_typer` | `Callable[[Any], str] \| None` | Optional custom function that takes a value and returns its type string, overriding default type detection. | `None` |
| `keep_unique_values` | `bool` | If True or an integer, tracks unique values. If int, limits tracking to that many unique values per path. | `False` |
| `invert_uniques` | `bool` | If True, maintains reverse index from values to paths. | `False` |
| `keep_list_idxs` | `bool` | If True, preserves exact array indices in paths. If False, generalizes all indices to '[]'. | `False` |
| `timeout` | `int` | Request timeout in seconds for URL sources. Defaults to 10. | `TIMEOUT` |
| `empty_dict_type` | `str` | Type name for empty dictionaries. | `'_EMPTY_DICT_'` |
| `empty_list_type` | `str` | Type name for empty lists. | `'_EMPTY_LIST_'` |
| `unk_value_type` | `str` | Type name for unclassified values. | `'_UNKNOWN_'` |
| `int_value_type` | `str` | Type name for integers. | `'int'` |
| `float_value_type` | `str` | Type name for floats. | `'float'` |
| `str_value_type` | `str` | Type name for strings. | `'str'` |
| `url_value_type` | `str` | Type name for URL strings, or None to treat as regular strings. | `'URL'` |
| `on_add` | `Callable[[str], bool] \| None` | Optional filter function called before adding each path. Takes jq_path and returns True to include or False to skip. | `None` |

Attributes:

| Name | Type | Description |
|---|---|---|
| `partial_schema` | `JsonSchema` | Get the current, possibly incomplete, schema. |
| `schema` | `JsonSchema` | Get the schema for the JSON data. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
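The schema walk can be sketched in a few lines: recurse through the data, generalize list indices to `[]` (as with `keep_list_idxs=False`), and count value types per path. This is an illustrative stand-in for what the builder computes, not its actual code:

```python
from collections import defaultdict

def build_schema_sketch(obj, prefix="", schema=None):
    """Sketch of the schema walk: count value types per generalized jq path."""
    if schema is None:
        schema = defaultdict(lambda: defaultdict(int))
    if isinstance(obj, dict):
        for key, val in obj.items():
            build_schema_sketch(val, f"{prefix}.{key}", schema)
    elif isinstance(obj, list):
        for item in obj:  # generalize array indices to '[]'
            build_schema_sketch(item, f"{prefix}[]", schema)
    else:
        schema[prefix][type(obj).__name__] += 1  # leaf: count the value type
    return schema

schema = build_schema_sketch({"data": [{"n": 1}, {"n": 2.5}]})
```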
Path ¶
Container for a jq path with its value and optional line number.
Represents a single path/value pair from JSON data, with optional tracking of the original line number for streaming scenarios.
Attributes:

| Name | Type | Description |
|---|---|---|
| `jq_path` | | Fully-qualified jq-style path (e.g., '.data[0].name'). |
| `item` | | The value at this path. |
| `line_num` | | Optional line number from streaming (-1 if not tracked). |

Initialize a Path.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jq_path` | `str` | Fully-qualified jq-style path with indices. | *required* |
| `item` | `Any` | The value at this path. | *required* |
| `line_num` | `int` | Optional line number for ordering. Defaults to -1. | `-1` |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
PathGroup ¶
Container for a group of related paths.
Methods:

| Name | Description |
|---|---|
| `accept` | Add the path if it belongs in this group. |
| `as_dict` | Reconstruct the object from the paths. |
| `incorporate_paths` | Incorporate (distribute) the group's applicable paths into this group. |

Attributes:

| Name | Type | Description |
|---|---|---|
| `num_distributed_paths` | `int` | Get the number of distributed paths in this group. |
| `num_main_paths` | `int` | Get the number of main paths in this group. |
| `paths` | `List[Path]` | Get all paths (both main and distributed). |
| `size` | `int` | Get the total number of paths in this group. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
Attributes¶
num_distributed_paths (property) ¶
Get the number of distributed paths in this group.
Functions¶
accept ¶
Add the path if it belongs in this group.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path` | The path to (potentially) add. | *required* |
| `distribute` | `bool` | True to propose the path as a distributed path. | `False` |

Returns:

| Name | Type | Description |
|---|---|---|
| `bool` | `bool` | True if the path was accepted and added. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
as_dict ¶
Reconstruct the object from the paths
Source code in packages/utils/src/dataknobs_utils/json_utils.py
incorporate_paths ¶
Incorporate (distribute) the group's applicable paths into this group.
PathSorter ¶
Sort and group paths into records based on an acceptance strategy.
Manages the incremental grouping of paths as they're streamed, creating record boundaries according to the acceptance strategy and enforcing size constraints.
Attributes:

| Name | Type | Description |
|---|---|---|
| `accept_strategy` | | Strategy for determining path membership. |
| `group_size` | | Minimum group size (groups below this are dropped). |
| `max_groups` | | Maximum groups kept in memory. |
| `groups` | `List[PathGroup] \| None` | List of active PathGroup instances. |

Initialize a PathSorter.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `accept_strategy` | `GroupAcceptStrategy` | Strategy for determining how paths are grouped. | *required* |
| `group_size` | `int` | Minimum required group size. Groups smaller than this are silently dropped when closed. 0 means no minimum. Defaults to 0. | `0` |
| `max_groups` | `int` | Maximum groups kept in memory. Older groups are dropped when the limit is reached. 0 means unlimited. Defaults to 0. | `0` |

Methods:

| Name | Description |
|---|---|
| `accept_path` | Try to add a path to any existing group. |
| `add_path` | Add a path to the appropriate group, creating new groups as needed. |
| `all_groups_have_size` | Check if all groups have a specific size. |
| `close_group` | Close a group, finalizing its contents. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
Functions¶
accept_path ¶
Try to add a path to any existing group.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path` | The path to add. | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `bool` | `bool` | True if the path was accepted by a group, False otherwise. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
add_path ¶
Add a path to the appropriate group, creating new groups as needed.
Paths are added to the most recent group if accepted. When a path is rejected, the current group is closed and a new group is created.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path` | The path to add to a group. | *required* |

Returns:

| Type | Description |
|---|---|
| `PathGroup \| None` | The most recently closed group if one was closed, otherwise None. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
all_groups_have_size ¶
Check if all groups have a specific size.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `group_size` | `int` | The size to test for. | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `bool` | `bool` | True if all groups have exactly this size, False otherwise. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
close_group ¶
Close a group, finalizing its contents.
Closing a group involves:

1. Incorporating distributed paths from the previous group
2. Optionally checking size constraints and dropping undersized groups
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `idx` | `int` | Index of group to close (-1 for last). Defaults to -1. | `-1` |
| `check_size` | `bool` | If True, enforces group_size constraint. Defaults to True. | `True` |

Returns:

| Type | Description |
|---|---|
| `PathGroup \| None` | The closed group, or None if dropped for size. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
RecordPathBuilder ¶
RecordPathBuilder(
json_data: str,
output_stream: TextIO,
line_builder_fn: Callable[[int, int, str, Any], str],
timeout: int = TIMEOUT,
)
Build and output records from JSON by grouping paths.
Streams JSON data, groups paths into records using a PathSorter, and writes formatted output for each record.
Attributes:

| Name | Type | Description |
|---|---|---|
| `jdata` | | JSON data source. |
| `output_stream` | | Where to write formatted output. |
| `builder_fn` | | Function to format record lines. |
| `timeout` | | Request timeout for URLs. |
| `rec_id` | | Current record ID counter. |
| `inum` | | Current item number counter. |
| `sorter` | `PathSorter \| None` | PathSorter for grouping paths. |

Initialize a RecordPathBuilder.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `json_data` | `str` | JSON data source (file path, URL, or JSON string). | *required* |
| `output_stream` | `TextIO` | Text stream to write formatted lines to. | *required* |
| `line_builder_fn` | `Callable[[int, int, str, Any], str]` | Function with signature fn(rec_id, line_num, jq_path, item) that returns a formatted string. | *required* |
| `timeout` | `int` | Request timeout in seconds for URL sources. Defaults to 10. | `TIMEOUT` |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
Functions¶
ValuePath ¶
Structure to hold compressed information about paths to a unique value.
Stores the tree of array indices leading to each occurrence of a value in JSON data, enabling efficient tracking of where values appear.
Attributes:

| Name | Type | Description |
|---|---|---|
| `jq_path` | | The jq-style path template with generic array indices. |
| `value` | | The value found at this path. |

Initialize a ValuePath for tracking occurrences of a value.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jq_path` | `str` | The jq-style path template (key). | required |
| `value` | `Any` | The value found at this path. | required |
Methods:

| Name | Description |
|---|---|
| `add` | Add a path occurrence to the index tree. |
| `path_generator` | Generate all concrete paths to this value. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
Attributes¶
Functions¶
add ¶
Add a path occurrence to the index tree.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Tuple[Any, ...] \| None` | Path tuple with the same structure as jq_path, or None if no path information is available. | required |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
path_generator ¶
Generate all concrete paths to this value.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `result_type` | `str` | Format for generated paths: 'jq_path' for jq-style path strings (default), 'path' for full path tuples with keys and indices, 'idx' for index-only tuples. | `'jq_path'` |

Yields:

| Type | Description |
|---|---|
| `Any` | Paths in the requested format for each occurrence of the value. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
ValuesIndex ¶
Index of unique values organized by their jq paths.
Maintains a compressed index mapping each jq path to its unique values, with optional tracking of all occurrences via path trees.
Attributes:

| Name | Type | Description |
|---|---|---|
| `path_values` | `Dict[str, Dict[Any, ValuePath]]` | Nested dict mapping jq_path -> value -> ValuePath. |

Methods:

| Name | Description |
|---|---|
| `add` | Add a value occurrence to the index. |
| `get_values` | Get all unique values for a jq path. |
| `has_jqpath` | Check if any values exist for the given jq path. |
| `num_values` | Get count of unique values for a jq path. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
Functions¶
add ¶
Add a value occurrence to the index.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `Any` | The value to index. | required |
| `jq_path` | `str` | The jq-style path where the value was found. | required |
| `path` | `Tuple[Any, ...] \| None` | Optional full path tuple for tracking specific occurrences. | `None` |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
get_values ¶
Get all unique values for a jq path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jq_path` | `str` | The jq-style path to query. | required |

Returns:

| Type | Description |
|---|---|
| `Set[Any]` | Set of unique values found at this path. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
has_jqpath ¶
Check if any values exist for the given jq path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jq_path` | `str` | The jq-style path to check. | required |

Returns:

| Type | Description |
|---|---|
| `bool` | True if values exist for this path, False otherwise. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
num_values ¶
Get count of unique values for a jq path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jq_path` | `str` | The jq-style path to query. | required |

Returns:

| Type | Description |
|---|---|
| `int` | Number of unique values at this path. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
Functions¶
build_jq_path ¶
Build a jq path string from a json_stream path tuple.
Converts a path tuple like ('data', 0, 'name') to a jq-style path like '.data[0].name'.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Tuple[Any, ...]` | Tuple of json_stream path components, with integers representing array indices and strings representing object keys. | required |
| `keep_list_idxs` | `bool` | If True, keeps exact array index values (e.g., [0], [1]). If False, emits generic '[]' for all array indices. | `True` |

Returns:

| Type | Description |
|---|---|
| `str` | A jq-style path string. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
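The conversion described above can be sketched in a few lines. This is an illustrative re-implementation for clarity, not the library's actual source; the name `build_jq_path_sketch` is hypothetical.

```python
from typing import Any, Tuple

def build_jq_path_sketch(path: Tuple[Any, ...], keep_list_idxs: bool = True) -> str:
    """Illustrative sketch of converting a json_stream path tuple to a jq path."""
    parts = []
    for elt in path:
        if isinstance(elt, int):
            # Integers are array indices: exact ([0]) or generic ([])
            parts.append(f"[{elt}]" if keep_list_idxs else "[]")
        else:
            # Strings are object keys
            parts.append(f".{elt}")
    return "".join(parts)

build_jq_path_sketch(("data", 0, "name"))                        # '.data[0].name'
build_jq_path_sketch(("data", 0, "name"), keep_list_idxs=False)  # '.data[].name'
```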
build_path_tuple ¶
Build a json_stream tuple path from a jq path string.
Reverses the operation of build_jq_path, converting a jq-style path like '.data[0].name' back to a tuple like ('data', 0, 'name').
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jq_path` | `str` | The jq-style path string to convert (e.g., '.data[0].name'). | required |
| `any_list_idx` | `int` | Index value to use for generic '[]' array notation. | `-1` |

Returns:

| Type | Description |
|---|---|
| `Tuple[Any, ...]` | Path tuple in json_stream format alternating between keys (str) and array indices (int). |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
collect_squashed ¶
collect_squashed(
jdata: str,
prune_at: List[Union[str, Tuple[str, int]]] | None = None,
timeout: int = TIMEOUT,
result: Dict[Any, Any] | None = None,
) -> Dict[Any, Any]
Collect squashed JSON data into a dictionary.
Convenience function that squashes JSON into a flat dictionary where keys are jq-style paths and values are the leaf values from the JSON.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jdata` | `str` | JSON data source (file path, URL, or JSON string). | required |
| `prune_at` | `List[Union[str, Tuple[str, int]]] \| None` | List of path specifications identifying branches to skip. See squash_data() for format details. | `None` |
| `timeout` | `int` | Request timeout in seconds for URL sources. Defaults to 10. | `TIMEOUT` |
| `result` | `Dict[Any, Any] \| None` | Optional dictionary to populate. If None, creates a new dict. | `None` |

Returns:

| Type | Description |
|---|---|
| `Dict[Any, Any]` | Dictionary mapping jq-style paths to their values. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
explode ¶
Explode a squashed JSON dictionary back into nested structure.
Reverses the squashing operation by converting a flat dictionary with jq-style paths as keys back into a nested JSON-like structure.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `squashed` | `Dict[Any, Any]` | Dictionary with jq-style paths as keys and leaf values. | required |

Returns:

| Type | Description |
|---|---|
| `Dict[Any, Any]` | Nested dictionary reconstructed from the paths. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
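The reconstruction idea can be sketched as follows. This is an illustrative, simplified stand-in (not the library's implementation): for brevity it keeps array indices as integer dictionary keys rather than materializing Python lists.

```python
import re
from typing import Any, Dict

def explode_sketch(squashed: Dict[str, Any]) -> Dict[Any, Any]:
    """Rebuild nesting from jq-style keys like '.a[0].b' (indices kept as int keys)."""
    result: Dict[Any, Any] = {}
    for jq_path, value in squashed.items():
        # Split '.a[0].b' into ['a', 0, 'b']
        parts = [int(idx) if idx else key
                 for key, idx in re.findall(r"\.([^.\[]+)|\[(\d+)\]", jq_path)]
        node = result
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return result

explode_sketch({".a[0].b": 1, ".a[1].b": 2})
# {'a': {0: {'b': 1}, 1: {'b': 2}}}
```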
get_records_df ¶
Collect top-level JSON records into a pandas DataFrame.
Convenience function that streams JSON and collects all records into memory as a DataFrame with columns: rec_id, line_num, jq_path, item.
Warning
Not suitable for large JSON files as all data is loaded into memory.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `json_data` | `str` | JSON data source (file path, URL, or JSON string). | required |
| `timeout` | `int` | Request timeout in seconds for URL sources. Defaults to 10. | `TIMEOUT` |

Returns:

| Type | Description |
|---|---|
| `pd.DataFrame` | Records with columns rec_id, line_num, jq_path, item. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
get_value ¶
get_value(
json_obj: Dict[str, Any], key_path: str, default: Any | None = None
) -> Union[Any, List[Any]]
Get a value from a JSON object using a key path in indexed dot notation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `json_obj` | `Dict[str, Any]` | The JSON object to search in. | required |
| `key_path` | `str` | Dot-delimited string with optional [index] notation. | required |
| `default` | `Any \| None` | Value to return if path not found. | `None` |

Returns:

| Type | Description |
|---|---|
| `Union[Any, List[Any]]` | Single value or list of values depending on key_path. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
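A minimal sketch of the indexed dot-notation lookup, assuming concrete paths only (the real function can also return lists of values for some key paths). The helper name is hypothetical.

```python
import re
from typing import Any, Dict, Optional

def get_value_sketch(json_obj: Dict[str, Any], key_path: str,
                     default: Optional[Any] = None) -> Any:
    """Illustrative lookup for paths like 'a.b[1]': dots for keys, [n] for indices."""
    node: Any = json_obj
    for idx, key in re.findall(r"\[(\d+)\]|([^.\[\]]+)", key_path):
        try:
            node = node[int(idx)] if idx else node[key]
        except (KeyError, IndexError, TypeError):
            return default
    return node

data = {"a": {"b": [10, 20]}}
get_value_sketch(data, "a.b[1]")          # 20
get_value_sketch(data, "a.x", default=-1) # -1
```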
indexing_format_fn ¶
Format a (jq_path, item) pair for indexing purposes.
Creates a tab-separated format optimized for indexing with columns: value, field, flat_jq, idxs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jq_path` | `str` | The jq-style path to the item. | required |
| `item` | `Any` | The value at the path. | required |

Returns:

| Type | Description |
|---|---|
| `str` | Tab-separated string with columns: value (the item value), field (the last path element, i.e. the field name), flat_jq (the path with array indices flattened to '[]'), and idxs (comma-separated list of array indices from the path). |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
indexing_format_splitter ¶
Parse a line formatted by indexing_format_fn.
Reverses indexing_format_fn to extract the original components.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `fileline` | `str` | Tab-separated line from indexing_format_fn. | required |

Returns:

| Type | Description |
|---|---|
| `Tuple[str \| None, str \| None, str \| None, str \| None]` | A (value, field, flat_jq, idxs) tuple matching the columns written by indexing_format_fn. All values are None if the line is empty. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
path_to_dict ¶
path_to_dict(
path: Union[Tuple[Any, ...], str],
value: Any,
result: Dict[Any, Any] | None = None,
) -> Dict[Any, Any]
Convert a jq path and value into a nested dictionary structure.
Takes a path (jq string or tuple) and reconstructs the nested dictionary structure it represents, setting the value at the leaf.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[Tuple[Any, ...], str]` | Path to the value: either a jq-style string (e.g., '.data[0].name') or a path tuple (e.g., ('data', 0, 'name')). | required |
| `value` | `Any` | The value to set at the path. | required |
| `result` | `Dict[Any, Any] \| None` | Optional dictionary to populate. If None, creates a new dict. | `None` |

Returns:

| Type | Description |
|---|---|
| `Dict[Any, Any]` | Dictionary with nested structure representing the path. |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
squash_data ¶
squash_data(
builder_fn: Callable[[str, Any], None],
json_data: str,
prune_at: List[Union[str, Tuple[str, int]]] | None = None,
timeout: int = TIMEOUT,
) -> None
Squash JSON data into single-level structure with jq-style keys.
Compresses nested JSON paths into a flat structure where each path becomes a jq-style key. Optionally prunes specific branches from the output.
Pruning specification formats
- (path_element_name, path_index): Paths where path[path_index] == path_element_name
- path_element_name or (path_element_name, None): Any path containing this element name
- path_index or (None, path_index): Any path at this depth
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `builder_fn` | `Callable[[str, Any], None]` | Callback function with signature fn(jq_path, item) called for each path/value pair to build results. | required |
| `json_data` | `str` | JSON data source (file path, URL, or JSON string). | required |
| `prune_at` | `List[Union[str, Tuple[str, int]]] \| None` | List of path specifications identifying branches to skip. Defaults to None (no pruning). | `None` |
| `timeout` | `int` | Request timeout in seconds for URL sources. Defaults to 10. | `TIMEOUT` |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
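The callback pattern above (flatten nested JSON into jq-style keys, without pruning) can be sketched with a plain recursive walk. This is an illustrative stand-in for the streaming implementation; `squash_sketch` is a hypothetical name.

```python
from typing import Any, Callable

def squash_sketch(obj: Any, builder_fn: Callable[[str, Any], None],
                  prefix: str = "") -> None:
    """Call builder_fn(jq_path, item) for each leaf of a nested structure."""
    if isinstance(obj, dict):
        for key, val in obj.items():
            squash_sketch(val, builder_fn, f"{prefix}.{key}")
    elif isinstance(obj, list):
        for i, val in enumerate(obj):
            squash_sketch(val, builder_fn, f"{prefix}[{i}]")
    else:
        builder_fn(prefix, obj)  # leaf value: emit the accumulated jq path

out: dict = {}
squash_sketch({"a": [{"b": 1}], "c": 2}, out.__setitem__)
# out == {'.a[0].b': 1, '.c': 2}
```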
stream_jq_paths ¶
stream_jq_paths(
json_data: str,
output_stream: TextIO,
line_builder_fn: Callable[[str, Any], str] = lambda jq_path, item: (
f"{jq_path} {item}"
),
keep_list_idxs: bool = True,
timeout: int = TIMEOUT,
) -> None
Stream JSON data and write formatted lines for each (jq_path, item) pair.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `json_data` | `str` | JSON data source (file path, URL, or JSON string). | required |
| `output_stream` | `TextIO` | Text stream to write formatted lines to. | required |
| `line_builder_fn` | `Callable[[str, Any], str]` | Function that takes (jq_path, item) and returns a formatted string. Defaults to tab-separated format. | `lambda jq_path, item: f'{jq_path} {item}'` |
| `keep_list_idxs` | `bool` | If True, keeps exact array index values. If False, uses generic '[]'. | `True` |
| `timeout` | `int` | Request timeout in seconds for URL sources. Defaults to 10. | `TIMEOUT` |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
stream_json_data ¶
stream_json_data(
json_data: str,
visitor_fn: Callable[[Any, Tuple[Any, ...]], None],
timeout: int = TIMEOUT,
) -> None
Stream JSON data and call a visitor function for each value.
Supports multiple input formats: file paths (including .gz), URLs, or JSON strings. Automatically detects and handles gzip-compressed files.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `json_data` | `str` | The JSON data source: a file path, URL (starting with 'http'), or JSON string. | required |
| `visitor_fn` | `Callable[[Any, Tuple[Any, ...]], None]` | Function called for each JSON value with signature visitor_fn(item, path), where item is the value and path is a tuple of elements identifying the path to the item. | required |
| `timeout` | `int` | Request timeout in seconds for URL sources. Defaults to 10. | `TIMEOUT` |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
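The visitor contract is easy to picture with a non-streaming sketch over an already-parsed structure (the real function streams from files, URLs, or strings). `walk_sketch` is a hypothetical helper shown only to illustrate the `visitor_fn(item, path)` signature.

```python
from typing import Any, Callable, Tuple

def walk_sketch(obj: Any,
                visitor_fn: Callable[[Any, Tuple[Any, ...]], None],
                path: Tuple[Any, ...] = ()) -> None:
    """Visit each leaf with its path tuple (keys as str, indices as int)."""
    if isinstance(obj, dict):
        for key, val in obj.items():
            walk_sketch(val, visitor_fn, path + (key,))
    elif isinstance(obj, list):
        for i, val in enumerate(obj):
            walk_sketch(val, visitor_fn, path + (i,))
    else:
        visitor_fn(obj, path)

seen = []
walk_sketch({"a": [1, 2]}, lambda item, path: seen.append((path, item)))
# seen == [(('a', 0), 1), (('a', 1), 2)]
```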
stream_record_paths ¶
stream_record_paths(
json_data: str,
output_stream: TextIO,
line_builder_fn: Callable[[int, int, str, Any], str],
timeout: int = TIMEOUT,
) -> None
Stream JSON and write formatted records grouped by top-level structure.
Identifies top-level JSON records (typically array elements) and writes formatted lines for each path within each record.
Each output line represents a (rec_id, line_num, jq_path, item) tuple where: - rec_id: 0-based record identifier - line_num: 0-based original item number from stream - jq_path: Fully-qualified path within the record - item: Value at the path
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `json_data` | `str` | JSON data source (file path, URL, or JSON string). | required |
| `output_stream` | `TextIO` | Text stream to write formatted lines to. | required |
| `line_builder_fn` | `Callable[[int, int, str, Any], str]` | Function with signature fn(rec_id, line_num, jq_path, item) that returns a formatted string. | required |
| `timeout` | `int` | Request timeout in seconds for URL sources. Defaults to 10. | `TIMEOUT` |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
write_squashed ¶
write_squashed(
dest_file: Union[str, TextIO],
jdata: str,
prune_at: List[Union[str, Tuple[str, int]]] | None = None,
timeout: int = TIMEOUT,
format_fn: Callable[[str, Any], str] = lambda jq_path, item: (
f"{jq_path} {item}"
),
) -> None
Write squashed JSON data to a file.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `dest_file` | `Union[str, TextIO]` | Output file path (str) or open text stream (TextIO). | required |
| `jdata` | `str` | JSON data source (file path, URL, or JSON string). | required |
| `prune_at` | `List[Union[str, Tuple[str, int]]] \| None` | List of path specifications identifying branches to skip. See squash_data() for format details. | `None` |
| `timeout` | `int` | Request timeout in seconds for URL sources. Defaults to 10. | `TIMEOUT` |
| `format_fn` | `Callable[[str, Any], str]` | Function to format each (jq_path, item) pair as a line. Defaults to tab-separated format. | `lambda jq_path, item: f'{jq_path} {item}'` |
Source code in packages/utils/src/dataknobs_utils/json_utils.py
llm_utils¶
Classes¶
PromptMessage¶
dataknobs_utils.llm_utils.PromptMessage ¶
Structured prompt message with role, content, and optional metadata.
The role is typically "system", "user", or "assistant", and content holds the prompt or LLM-generated text.
Attributes:

| Name | Type | Description |
|---|---|---|
| `role` | | Message role (e.g., "system", "user", "assistant"). |
| `content` | | Message text content. |
| `metadata` | | Optional metadata dictionary. |

Initialize prompt message with role and content.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `role` | `str` | Message role (e.g., "system", "user", "assistant"). | required |
| `content` | `str` | Prompt or LLM-generated text. | required |
| `metadata` | `Dict[str, Any] \| None` | Optional metadata containing generation args, execution data, and user comments. | `None` |

Methods:

| Name | Description |
|---|---|
| `__repr__` | Get message as JSON string without metadata. |
| `build_instance` | Reconstruct a PromptMessage from its dictionary representation. |
| `get_message` | Get message as a dictionary. |
| `to_json` | Serialize message to JSON string. |
Source code in packages/utils/src/dataknobs_utils/llm_utils.py
Functions¶
__repr__ ¶
Get message as JSON string without metadata.
Returns:

| Type | Description |
|---|---|
| `str` | JSON string representation of the message. |
build_instance `staticmethod` ¶

Reconstruct a PromptMessage from its dictionary representation.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `message_dict` | `Dict[str, Any]` | Dictionary with "role", "content", and optionally "metadata" keys. | required |

Returns:

| Type | Description |
|---|---|
| `PromptMessage` | Reconstructed message instance. |
Source code in packages/utils/src/dataknobs_utils/llm_utils.py
get_message ¶
Get message as a dictionary.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `with_metadata` | `Union[bool, str]` | If True, includes metadata. If a string, uses that as the metadata key instead of "metadata". | `False` |

Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Dictionary with "role" and "content", optionally including metadata. |
Source code in packages/utils/src/dataknobs_utils/llm_utils.py
to_json ¶
Serialize message to JSON string.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `with_metadata` | `bool` | If True, includes metadata. | `True` |

Returns:

| Type | Description |
|---|---|
| `str` | JSON string representation of the message. |

Raises:

| Type | Description |
|---|---|
| `TypeError` | If metadata is not JSON serializable. |
Source code in packages/utils/src/dataknobs_utils/llm_utils.py
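The shape of the interface above can be mirrored with a small dataclass. This is a hedged, minimal stand-in for illustration only (the real class lives in `dataknobs_utils.llm_utils`); the sketch name and the exact serialization details are assumptions.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class PromptMessageSketch:
    """Minimal stand-in mirroring the role/content/metadata interface."""
    role: str
    content: str
    metadata: Optional[Dict[str, Any]] = None

    def get_message(self, with_metadata: bool = False) -> Dict[str, Any]:
        msg: Dict[str, Any] = {"role": self.role, "content": self.content}
        if with_metadata and self.metadata is not None:
            msg["metadata"] = self.metadata
        return msg

    def to_json(self, with_metadata: bool = True) -> str:
        return json.dumps(self.get_message(with_metadata=with_metadata))

msg = PromptMessageSketch("user", "Summarize this.", {"temperature": 0.2})
msg.get_message()  # {'role': 'user', 'content': 'Summarize this.'}
```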
Functions¶
get_value_by_key¶
dataknobs_utils.llm_utils.get_value_by_key ¶
Get a nested value from a dictionary using dot-delimited path.
Navigates through nested dictionaries using a path key like "foo.bar.baz" to retrieve deeply nested values.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `d` | `Dict[str, Any] \| None` | Possibly nested dictionary to search. | required |
| `pathkey` | `str` | Dot-delimited path to the value (e.g., "foo.bar"). | required |
| `default_value` | `Any` | Value to return if path doesn't exist. | `None` |

Returns:

| Type | Description |
|---|---|
| `Any` | Value at the path, or default_value if the path doesn't exist. |
Source code in packages/utils/src/dataknobs_utils/llm_utils.py
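The dot-delimited navigation can be sketched in a few lines; this is an illustrative re-implementation under the behavior described above, not the library's code.

```python
from typing import Any, Dict, Optional

def get_value_by_key_sketch(d: Optional[Dict[str, Any]], pathkey: str,
                            default_value: Any = None) -> Any:
    """Walk nested dicts along a dot-delimited path like 'foo.bar.baz'."""
    node: Any = d
    for key in pathkey.split("."):
        if not isinstance(node, dict) or key not in node:
            return default_value  # missing segment: fall back to the default
        node = node[key]
    return node

cfg = {"foo": {"bar": {"baz": 7}}}
get_value_by_key_sketch(cfg, "foo.bar.baz")       # 7
get_value_by_key_sketch(cfg, "foo.missing", -1)   # -1
```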
pandas_utils¶
Functions¶
dataknobs_utils.pandas_utils ¶
Pandas DataFrame utility functions and data transformations.
Provides utilities for creating, transforming, and manipulating Pandas DataFrames, including conversions between dicts, lists, and DataFrame formats.
Classes:

| Name | Description |
|---|---|
| `GroupManager` | Manage overlapping row groups in a DataFrame using JSON-encoded group lists. |

Functions:

| Name | Description |
|---|---|
| `dicts2df` | Create a DataFrame from dictionaries or lists of dictionaries. |
| `explode_json_series` | Explode a Series containing JSON-encoded lists into individual items. |
| `get_loc_range` | Find the range of True values in a boolean Series. |
| `sort_by_strlen` | Sort DataFrame by string length in a text column. |
Classes¶
GroupManager ¶
Manage overlapping row groups in a DataFrame using JSON-encoded group lists.
Handles DataFrames where rows can belong to multiple groups, with group membership stored as JSON lists of group numbers (e.g., "[1, 3]" means row belongs to groups 1 and 3). Provides utilities for marking, unmarking, querying, and analyzing group memberships.
Attributes:

| Name | Type | Description |
|---|---|---|
| `idf` | | Original input DataFrame. |
| `gcol` | | Name of the group number column. |

Initialize group manager with DataFrame and group column.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | DataFrame with rows to manage (will be re-indexed if needed). | required |
| `group_num_col` | `str` | Name of the column containing JSON-encoded group number lists. | required |
Methods:

| Name | Description |
|---|---|
| `clear_all_groups` | Remove all group assignments from the DataFrame. |
| `find_subsets` | Find groups that are subsets of other groups. |
| `get_group_locs` | Get row indices for a specific group. |
| `get_intra_ungrouped_locs` | Get row indices between group boundaries that aren't in the group. |
| `get_subgroup_manager` | Create a GroupManager for subgroups within a specific group. |
| `mark_group` | Add rows to a group, creating or updating group membership. |
| `mark_groups` | Mark multiple groups in a single operation. |
| `remove_groups` | Remove multiple groups entirely. |
| `remove_subsets` | Remove all groups that are subsets of other groups. |
| `reset_group_numbers` | Renumber all groups consecutively starting from a given number. |
| `unmark_group` | Remove a group number from specified rows or entirely. |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
Attributes¶
all_group_locs `property` ¶

Get row indices for all groups.

Returns:

| Type | Description |
|---|---|
| `Dict[int, List[int]]` | Mapping from group number to list of row indices. |

all_group_nums `property` ¶

Get all existing group numbers.

Returns:

| Type | Description |
|---|---|
| `List[int]` | List of group numbers currently in use (empty if none). |

collapsed_df `property` ¶

Get the DataFrame with JSON-encoded group lists. Alias for the df property.

Returns:

| Type | Description |
|---|---|
| `pd.DataFrame` | DataFrame with group column containing JSON lists. |

df `property` ¶

Get the DataFrame with JSON-encoded group lists.

Returns:

| Type | Description |
|---|---|
| `pd.DataFrame` | DataFrame with group column containing JSON lists. |

expanded_ser `property` ¶

Get Series with individual group numbers and repeated indices. Expands JSON group lists so each group number becomes a separate row, with indices repeated for rows belonging to multiple groups.

Returns:

| Type | Description |
|---|---|
| `pd.Series` | Series with individual group numbers, indices repeated for multi-group membership. |

grouped_locs `property` ¶

Get indices of all rows belonging to at least one group.

Returns:

| Type | Description |
|---|---|
| `List[int]` | Row indices with group membership. |

mask_df `property` ¶

Get DataFrame of boolean masks for each group.

Returns:

| Type | Description |
|---|---|
| `pd.DataFrame` | DataFrame where each column is a boolean mask for a group, with column names like "{group_col}_{group_num}". |

max_group_num `property` ¶

Get the highest group number in use.

Returns:

| Type | Description |
|---|---|
| `int` | Maximum group number, or -1 if no groups exist. |

ungrouped_locs `property` ¶

Get indices of all rows not belonging to any group.

Returns:

| Type | Description |
|---|---|
| `List[int]` | Row indices without group membership. |
Functions¶
clear_all_groups ¶
Remove all group assignments from the DataFrame.
Resets the group column to NaN for all rows.
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
find_subsets ¶
Find groups that are subsets of other groups.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `proper` | `bool` | If True, includes proper subsets (strict subsets) and identical groups. If False, only strict subsets. | `True` |

Returns:

| Type | Description |
|---|---|
| `Set[int]` | Group numbers that are subsets of other groups. |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
get_group_locs ¶
Get row indices for a specific group.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `group_num` | `int` | Group number to query. | required |

Returns:

| Type | Description |
|---|---|
| `List[int]` | List of row indices in the group (empty if the group doesn't exist). |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
get_intra_ungrouped_locs ¶
Get row indices between group boundaries that aren't in the group.
Finds rows that fall within the range from the first to last row of a group but aren't actually members of the group.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `group_num` | `int` | Group number to analyze. | required |

Returns:

| Type | Description |
|---|---|
| `List[int]` | Row indices within the group's range but not in the group. |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
get_subgroup_manager ¶
Create a GroupManager for subgroups within a specific group.
Extracts subgroup information for rows belonging to a specific group, filtering out subgroups that are shared with other groups. Returns a new GroupManager with a copy of the DataFrame showing only the relevant subgroup structure.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `group_num` | `int` | Group number whose subgroups to extract. | required |
| `subgroup_num_col` | `str` | Name of the DataFrame column containing subgroup number lists. | required |

Returns:

| Type | Description |
|---|---|
| `GroupManager` | New manager with only subgroups unique to this group. |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
mark_group ¶
Add rows to a group, creating or updating group membership.
Assigns rows to a group number, either specified or auto-generated. If the group already exists, adds new rows to it without removing existing members.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `idx_values` | `Union[Series, List[int]]` | Row indices to include in the group. | required |
| `group_num` | `int \| None` | Group number to assign. If None, uses the next available number (max + 1). | `None` |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
mark_groups ¶
mark_groups(
idx_value_lists: List[Union[Series, List[int]]],
group_nums: List[int] | None = None,
) -> None
Mark multiple groups in a single operation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `idx_value_lists` | `List[Union[Series, List[int]]]` | List where each element contains row indices for a group. | required |
| `group_nums` | `List[int] \| None` | Group numbers corresponding to each idx_values list. If None, auto-generates consecutive numbers. | `None` |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
remove_groups ¶
Remove multiple groups entirely.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `group_nums` | `Union[List[int], Set[int]]` | Collection of group numbers to remove completely. | required |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
remove_subsets ¶
Remove all groups that are subsets of other groups.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `proper` | `bool` | If True, removes proper subsets and identical groups. If False, only removes strict subsets. | `True` |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
reset_group_numbers ¶
Renumber all groups consecutively starting from a given number.
Preserves group memberships but assigns new consecutive numbers.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `start_num` | `int` | Starting group number. | `0` |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
unmark_group ¶
Remove a group number from specified rows or entirely.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `group_num` | `int` | Group number to remove. | required |
| `idx_values` | `Series \| None` | Row indices from which to remove the group. If None, removes the group from all rows. | `None` |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
Functions¶
dicts2df ¶
dicts2df(
dicts: Union[List[Dict], List[List[Dict]]],
rename: Dict[str, str] | None = None,
item_id: str | None = "item_id",
) -> pd.DataFrame
Create a DataFrame from dictionaries or lists of dictionaries.
Converts a list of dictionaries or a list of lists of dictionaries into a single concatenated DataFrame with optional column renaming and indexing.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `dicts` | `Union[List[Dict], List[List[Dict]]]` | List of dictionaries or list of lists of dictionaries. | required |
| `rename` | `Dict[str, str] \| None` | Optional mapping of column names to rename {from: to}. | `None` |
| `item_id` | `str \| None` | Name of column to add containing the list index. Set to None to skip adding this column. | `'item_id'` |

Returns:

| Type | Description |
|---|---|
| `pd.DataFrame` | Concatenated DataFrame from all dictionaries. |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
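The described behavior, one DataFrame per inner list tagged with its list index and then concatenated, can be sketched with plain pandas. This is an illustrative equivalent, not the library function itself.

```python
import pandas as pd

rows = [
    [{"name": "a", "score": 1}, {"name": "b", "score": 2}],
    [{"name": "c", "score": 3}],
]
# One DataFrame per inner list, tagged with its list index, then concatenated.
frames = [pd.DataFrame(d).assign(item_id=i) for i, d in enumerate(rows)]
df = pd.concat(frames, ignore_index=True)
# df has columns name, score, item_id and item_id values [0, 0, 1]
```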
explode_json_series ¶
Explode a Series containing JSON-encoded lists into individual items.
Parses JSON list strings and expands them so each list item becomes a separate row with a repeated index.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `json_ser` | `Series` | Series with values as JSON-encoded lists (e.g., "[1, 2, 3]"). | required |

Returns:

| Type | Description |
|---|---|
| `pd.Series` | Exploded Series with individual list items as values. |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
get_loc_range ¶
Find the range of True values in a boolean Series.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `bool_ser` | `Series` | Boolean Series to analyze. | required |

Returns:

| Type | Description |
|---|---|
| `Tuple[int, int]` | Tuple of (first_loc, last_loc) containing the indices of the first and last True values. Returns (0, 0) if no True values exist. |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
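The first/last-True behavior can be sketched directly; this is an illustrative re-implementation under the contract described above, including the (0, 0) fallback.

```python
from typing import Tuple
import pandas as pd

def get_loc_range_sketch(bool_ser: pd.Series) -> Tuple[int, int]:
    """Positions of the first and last True values, or (0, 0) if none."""
    locs = bool_ser.to_numpy().nonzero()[0]
    if len(locs) == 0:
        return (0, 0)
    return (int(locs[0]), int(locs[-1]))

get_loc_range_sketch(pd.Series([False, True, True, False]))  # (1, 2)
```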
sort_by_strlen ¶
Sort DataFrame by string length in a text column.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | DataFrame to sort. | required |
| `text_col` | `str` | Name of the text column to sort by. | required |
| `ascending` | `bool` | If True, sort shortest to longest; if False, longest to shortest. | `False` |

Returns:

| Type | Description |
|---|---|
| `pd.DataFrame` | Sorted DataFrame (the original is not modified). |
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
requests_utils¶
Classes and Functions¶
dataknobs_utils.requests_utils ¶
HTTP request utilities for making API calls and handling responses.
Provides convenience functions for making HTTP requests with error handling, timeout management, and response parsing.
Classes:
| Name | Description |
|---|---|
MockRequests |
Mock requests library for testing. |
MockResponse |
Mock HTTP response for testing. |
RequestHelper |
Helper class for making HTTP requests to a server. |
ServerResponse |
Wrapper for HTTP response data with convenience properties. |
Functions:
| Name | Description |
|---|---|
delete_request |
Execute an HTTP DELETE request and return the response. |
get_current_ip |
Get the running machine's IPv4 address. |
get_request |
Execute an HTTP GET request and return the response. |
post_files_request |
Execute an HTTP POST request with file uploads. |
post_request |
Execute an HTTP POST request and return the response. |
put_request |
Execute an HTTP PUT request and return the response. |
Classes¶
MockRequests ¶
Mock requests library for testing.
Simulates the requests library by storing expected responses keyed by request parameters, allowing deterministic testing without network calls.
Attributes:
| Name | Type | Description |
|---|---|---|
responses |
Dict[str, MockResponse]
|
Dictionary of registered mock responses. |
r404 |
Default 404 response for unregistered requests. |
Methods:
| Name | Description |
|---|---|
add |
Register a mock response for specific request parameters. |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
Functions¶
add ¶
add(
response: MockResponse,
api: str,
api_request: str,
data: Any | None = None,
files: Any | None = None,
headers: Dict[str, Any] | None = None,
params: Dict[str, Any] | None = None,
timeout: int = DEFAULT_TIMEOUT,
) -> None
Register a mock response for specific request parameters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
response
|
MockResponse
|
Mock response to return. |
required |
api
|
str
|
Request method ('get', 'post', 'put', 'delete'). |
required |
api_request
|
str
|
Request URL. |
required |
data
|
Any | None
|
Request body data. Defaults to None. |
None
|
files
|
Any | None
|
Request files. Defaults to None. |
None
|
headers
|
Dict[str, Any] | None
|
Request headers. Defaults to None. |
None
|
params
|
Dict[str, Any] | None
|
Query parameters. Defaults to None. |
None
|
timeout
|
int
|
Request timeout. Defaults to DEFAULT_TIMEOUT. |
DEFAULT_TIMEOUT
|
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
MockResponse ¶
Mock HTTP response for testing.
Simulates a requests.Response object with status code and result data.
Attributes:
| Name | Type | Description |
|---|---|---|
status_code |
HTTP status code. |
|
result |
Response data. |
|
text |
Response as text (JSON-serialized if result is not a string). |
Methods:
| Name | Description |
|---|---|
to_server_response |
Convert to a ServerResponse object. |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
Functions¶
to_server_response ¶
Convert to a ServerResponse object.
Returns:
| Name | Type | Description |
|---|---|---|
ServerResponse |
ServerResponse
|
Wrapped response object. |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
RequestHelper ¶
RequestHelper(
server_ip: str,
server_port: Union[str, int],
api_response_handler: Callable = json_api_response_handler,
headers: Dict[str, Any] | None = None,
timeout: int = DEFAULT_TIMEOUT,
mock_requests: Any | None = None,
)
Helper class for making HTTP requests to a server.
Simplifies sending API requests by managing server connection details, headers, timeouts, and response handling in a reusable instance.
Attributes:
| Name | Type | Description |
|---|---|---|
ip |
Server IP address. |
|
port |
Server port number. |
|
response_handler |
Function for processing responses. |
|
headers |
Default HTTP headers. |
|
timeout |
Default request timeout in seconds. |
|
requests |
Requests library or mock for testing. |
Initialize request helper with server details.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
server_ip
|
str
|
Server IP address or hostname. |
required |
server_port
|
Union[str, int]
|
Server port number. |
required |
api_response_handler
|
Callable
|
Default response handler function. Defaults to json_api_response_handler. |
json_api_response_handler
|
headers
|
Dict[str, Any] | None
|
Default HTTP headers. Defaults to None. |
None
|
timeout
|
int
|
Default timeout in seconds. Defaults to DEFAULT_TIMEOUT. |
DEFAULT_TIMEOUT
|
mock_requests
|
Any | None
|
Mock requests object for testing. Defaults to None. |
None
|
Methods:
| Name | Description |
|---|---|
build_url |
Construct full URL from path. |
delete |
Convenience method for DELETE requests. |
get |
Convenience method for GET requests. |
head |
Convenience method for HEAD requests. |
post |
Convenience method for POST requests. |
put |
Convenience method for PUT requests. |
request |
Execute an HTTP request of any type. |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
Functions¶
build_url ¶
Construct full URL from path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
API path (omitting the leading slash is recommended). |
required |
Returns:
| Name | Type | Description |
|---|---|---|
str |
str
|
Complete URL (http://ip:port/path). |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
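A minimal sketch of the documented URL format (http://ip:port/path); the host, port, and path values here are hypothetical:

```python
# Equivalent logic for build_url: join scheme, host, port, and path.
def build_url(ip: str, port: int, path: str) -> str:
    return f"http://{ip}:{port}/{path}"

print(build_url("10.0.0.5", 8080, "api/v1/items"))
# http://10.0.0.5:8080/api/v1/items
```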
delete ¶
delete(
path: str,
params: Dict[str, Any] | None = None,
headers: Dict[str, Any] | None = None,
timeout: int | None = None,
verbose: bool = False,
) -> Any
Convenience method for DELETE requests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
API path |
required |
params
|
Dict[str, Any] | None
|
Optional query parameters |
None
|
headers
|
Dict[str, Any] | None
|
Optional headers |
None
|
timeout
|
int | None
|
Optional timeout |
None
|
verbose
|
bool
|
Whether to print debug info |
False
|
Returns:
| Type | Description |
|---|---|
Any
|
ServerResponse object |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
get ¶
get(
path: str,
params: Dict[str, Any] | None = None,
headers: Dict[str, Any] | None = None,
timeout: int | None = None,
verbose: bool = False,
) -> Any
Convenience method for GET requests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
API path |
required |
params
|
Dict[str, Any] | None
|
Optional query parameters |
None
|
headers
|
Dict[str, Any] | None
|
Optional headers |
None
|
timeout
|
int | None
|
Optional timeout |
None
|
verbose
|
bool
|
Whether to print debug info |
False
|
Returns:
| Type | Description |
|---|---|
Any
|
ServerResponse object |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
head ¶
head(
path: str,
params: Dict[str, Any] | None = None,
headers: Dict[str, Any] | None = None,
timeout: int | None = None,
verbose: bool = False,
) -> Any
Convenience method for HEAD requests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
API path |
required |
params
|
Dict[str, Any] | None
|
Optional query parameters |
None
|
headers
|
Dict[str, Any] | None
|
Optional headers |
None
|
timeout
|
int | None
|
Optional timeout |
None
|
verbose
|
bool
|
Whether to print debug info |
False
|
Returns:
| Type | Description |
|---|---|
Any
|
ServerResponse object |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
post ¶
post(
path: str,
payload: Dict[str, Any] | None = None,
params: Dict[str, Any] | None = None,
files: Dict[str, Any] | None = None,
headers: Dict[str, Any] | None = None,
timeout: int | None = None,
verbose: bool = False,
) -> Any
Convenience method for POST requests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
API path |
required |
payload
|
Dict[str, Any] | None
|
Optional request body |
None
|
params
|
Dict[str, Any] | None
|
Optional query parameters |
None
|
files
|
Dict[str, Any] | None
|
Optional files to upload |
None
|
headers
|
Dict[str, Any] | None
|
Optional headers |
None
|
timeout
|
int | None
|
Optional timeout |
None
|
verbose
|
bool
|
Whether to print debug info |
False
|
Returns:
| Type | Description |
|---|---|
Any
|
ServerResponse object |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
put ¶
put(
path: str,
payload: Dict[str, Any] | None = None,
params: Dict[str, Any] | None = None,
headers: Dict[str, Any] | None = None,
timeout: int | None = None,
verbose: bool = False,
) -> Any
Convenience method for PUT requests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
API path |
required |
payload
|
Dict[str, Any] | None
|
Optional request body |
None
|
params
|
Dict[str, Any] | None
|
Optional query parameters |
None
|
headers
|
Dict[str, Any] | None
|
Optional headers |
None
|
timeout
|
int | None
|
Optional timeout |
None
|
verbose
|
bool
|
Whether to print debug info |
False
|
Returns:
| Type | Description |
|---|---|
Any
|
ServerResponse object |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
request ¶
request(
rtype: str,
path: str,
payload: Dict[str, Any] | None = None,
params: Dict[str, Any] | None = None,
files: Dict[str, Any] | None = None,
response_handler: Callable | None = None,
headers: Dict[str, Any] | None = None,
timeout: int = 0,
verbose: Union[bool, Any] = True,
) -> ServerResponse
Execute an HTTP request of any type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
rtype
|
str
|
Request type - one of: 'get', 'post', 'post-files', 'put', 'delete', 'head'. |
required |
path
|
str
|
API path portion (will be appended to server URL). |
required |
payload
|
Dict[str, Any] | None
|
Request body data. Defaults to None. |
None
|
params
|
Dict[str, Any] | None
|
Query parameters. Defaults to None. |
None
|
files
|
Dict[str, Any] | None
|
Files for upload (for post-files requests). Defaults to None. |
None
|
response_handler
|
Callable | None
|
Response handler to override instance default. Defaults to None. |
None
|
headers
|
Dict[str, Any] | None
|
Headers to override instance default. Defaults to None. |
None
|
timeout
|
int
|
Timeout override in seconds (0 uses instance default). Defaults to 0. |
0
|
verbose
|
Union[bool, Any]
|
If True, prints response to stderr. If a file object, prints to that stream. Defaults to True. |
True
|
Returns:
| Name | Type | Description |
|---|---|---|
ServerResponse |
ServerResponse
|
Response object with status and parsed data. |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
ServerResponse ¶
Wrapper for HTTP response data with convenience properties.
Encapsulates response data from HTTP requests, providing easy access to status codes, JSON data, and response text with consistent interface.
Attributes:
| Name | Type | Description |
|---|---|---|
resp |
The underlying requests Response object. |
|
result |
Parsed response data (typically JSON). |
Methods:
| Name | Description |
|---|---|
add_extra |
Add additional metadata to the response. |
has_extra |
Check if extra data has been added. |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
Attributes¶
extra
property
¶
Get the extra data dictionary.
Lazily initializes an empty dictionary for storing additional metadata.
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
Dict[str, Any]: Extra data dictionary. |
json
property
¶
Get the parsed JSON response data.
Alias for the result attribute.
Returns:
| Name | Type | Description |
|---|---|---|
Any |
Any
|
Parsed response data. |
status
property
¶
Get the HTTP status code.
Returns:
| Type | Description |
|---|---|
int | None
|
int | None: Status code, or None if no response. |
status_code
property
¶
Get the HTTP status code (alias for status).
Provided for consistency with requests.Response interface.
Returns:
| Type | Description |
|---|---|
int | None
|
int | None: Status code, or None if no response. |
succeeded
property
¶
Check if the request succeeded (status 200 or 201).
Returns:
| Name | Type | Description |
|---|---|---|
bool |
bool
|
True if status code is 200 or 201, False otherwise. |
text
property
¶
Get the response as text.
Returns:
| Name | Type | Description |
|---|---|---|
str |
str
|
Response text (JSON-serialized if result is not a string). |
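The success check described above is narrow; a one-line sketch of the documented rule (equivalent logic, not the package source):

```python
# Equivalent logic for the succeeded property: only HTTP 200 and 201
# count as success; any other status (or None) does not.
def succeeded(status_code):
    return status_code in (200, 201)

print(succeeded(200), succeeded(201), succeeded(404))  # True True False
```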
Functions¶
add_extra ¶
Add additional metadata to the response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Metadata key. |
required |
value
|
Any
|
Metadata value. |
required |
has_extra ¶
Check if extra data has been added.
Returns:
| Name | Type | Description |
|---|---|---|
bool |
bool
|
True if extra data exists and is non-empty. |
Functions¶
delete_request ¶
delete_request(
api_request: str,
params: Dict[str, Any] | None = None,
headers: Dict[str, Any] | None = None,
timeout: int = DEFAULT_TIMEOUT,
api_response_handler: Callable[
[Response], Tuple[Response, Any]
] = default_api_response_handler,
requests: Any = requests,
) -> Tuple[requests.models.Response, Any]
Execute an HTTP DELETE request and return the response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
api_request
|
str
|
Full URL for the API request. |
required |
params
|
Dict[str, Any] | None
|
Query parameters for the request. Defaults to None. |
None
|
headers
|
Dict[str, Any] | None
|
HTTP headers. If None, uses HEADERS constant. Defaults to None. |
None
|
timeout
|
int
|
Request timeout in seconds. Defaults to DEFAULT_TIMEOUT. |
DEFAULT_TIMEOUT
|
api_response_handler
|
Callable[[Response], Tuple[Response, Any]]
|
Function to process the response. Defaults to default_api_response_handler. |
default_api_response_handler
|
requests
|
Any
|
Requests library or mock for testing. Defaults to requests. |
requests
|
Returns:
| Type | Description |
|---|---|
Tuple[Response, Any]
|
Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result). |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
get_current_ip ¶
Get the running machine's IPv4 address.
Attempts to determine the local IP address by:
1. Connecting to an external service (Google DNS) to find the outbound IP
2. Falling back to hostname resolution
3. Returning localhost as a last resort
Returns:
| Name | Type | Description |
|---|---|---|
str |
str
|
The machine's IPv4 address (or "127.0.0.1" if detection fails). |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
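The three-step detection strategy described above can be sketched with the standard library (a sketch of the documented approach, not the package source):

```python
import socket

def get_current_ip() -> str:
    # Strategy described above: connect a UDP socket toward an external
    # address (Google DNS) to learn the outbound interface IP, then
    # fall back to hostname resolution, then to localhost.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))  # no packets are sent for UDP
            return s.getsockname()[0]
    except OSError:
        try:
            return socket.gethostbyname(socket.gethostname())
        except OSError:
            return "127.0.0.1"

print(get_current_ip())
```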
get_request ¶
get_request(
api_request: str,
params: Dict[str, Any] | None = None,
headers: Dict[str, Any] | None = None,
timeout: int = DEFAULT_TIMEOUT,
api_response_handler: Callable[
[Response], Tuple[Response, Any]
] = default_api_response_handler,
requests: Any = requests,
) -> Tuple[requests.models.Response, Any]
Execute an HTTP GET request and return the response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
api_request
|
str
|
Full URL for the API request. |
required |
params
|
Dict[str, Any] | None
|
Query parameters for the request. Defaults to None. |
None
|
headers
|
Dict[str, Any] | None
|
HTTP headers. If None, uses HEADERS constant. Defaults to None. |
None
|
timeout
|
int
|
Request timeout in seconds. Defaults to DEFAULT_TIMEOUT. |
DEFAULT_TIMEOUT
|
api_response_handler
|
Callable[[Response], Tuple[Response, Any]]
|
Function to process the response. Defaults to default_api_response_handler. |
default_api_response_handler
|
requests
|
Any
|
Requests library or mock for testing. Defaults to requests. |
requests
|
Returns:
| Type | Description |
|---|---|
Tuple[Response, Any]
|
Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result). |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
post_files_request ¶
post_files_request(
api_request: str,
files: Dict[str, Any],
headers: Dict[str, Any] | None = None,
timeout: int = DEFAULT_TIMEOUT,
api_response_handler: Callable[
[Response], Tuple[Response, Any]
] = default_api_response_handler,
requests: Any = requests,
) -> Tuple[requests.models.Response, Any]
Execute an HTTP POST request with file uploads.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
api_request
|
str
|
Full URL for the API request. |
required |
files
|
Dict[str, Any]
|
Dictionary of {file_id: file_data} entries, where file_data can be: - Simple: open('report.xls', 'rb') (must open in binary mode) - Detailed: ('filename', file_object, 'content_type', {'headers': 'values'}) |
required |
headers
|
Dict[str, Any] | None
|
HTTP headers. Defaults to None. |
None
|
timeout
|
int
|
Request timeout in seconds. Defaults to DEFAULT_TIMEOUT. |
DEFAULT_TIMEOUT
|
api_response_handler
|
Callable[[Response], Tuple[Response, Any]]
|
Function to process the response. Defaults to default_api_response_handler. |
default_api_response_handler
|
requests
|
Any
|
Requests library or mock for testing. Defaults to requests. |
requests
|
Returns:
| Type | Description |
|---|---|
Tuple[Response, Any]
|
Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result). |
Examples:
from dataknobs_utils.requests_utils import post_files_request

url = "http://example.com/upload"

# Simple file upload
with open('report.xls', 'rb') as f:
    post_files_request(url, {'myfile': f})

# With metadata
with open('report.xls', 'rb') as f:
    post_files_request(url, {
        'myfile': ('report.xls', f,
                   'application/vnd.ms-excel', {'Expires': '0'})
    })
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
post_request ¶
post_request(
api_request: str,
payload: Dict[str, Any],
params: Dict[str, Any] | None = None,
headers: Dict[str, Any] | None = None,
timeout: int = DEFAULT_TIMEOUT,
api_response_handler: Callable[
[Response], Tuple[Response, Any]
] = default_api_response_handler,
requests: Any = requests,
) -> Tuple[requests.models.Response, Any]
Execute an HTTP POST request and return the response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
api_request
|
str
|
Full URL for the API request. |
required |
payload
|
Dict[str, Any]
|
Request body data to send. |
required |
params
|
Dict[str, Any] | None
|
Query parameters for the request. Defaults to None. |
None
|
headers
|
Dict[str, Any] | None
|
HTTP headers. If None, uses HEADERS constant. Defaults to None. |
None
|
timeout
|
int
|
Request timeout in seconds. Defaults to DEFAULT_TIMEOUT. |
DEFAULT_TIMEOUT
|
api_response_handler
|
Callable[[Response], Tuple[Response, Any]]
|
Function to process the response. Defaults to default_api_response_handler. |
default_api_response_handler
|
requests
|
Any
|
Requests library or mock for testing. Defaults to requests. |
requests
|
Returns:
| Type | Description |
|---|---|
Tuple[Response, Any]
|
Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result). |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
put_request ¶
put_request(
api_request: str,
payload: Dict[str, Any],
params: Dict[str, Any] | None = None,
headers: Dict[str, Any] | None = None,
timeout: int = DEFAULT_TIMEOUT,
api_response_handler: Callable[
[Response], Tuple[Response, Any]
] = default_api_response_handler,
requests: Any = requests,
) -> Tuple[requests.models.Response, Any]
Execute an HTTP PUT request and return the response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
api_request
|
str
|
Full URL for the API request. |
required |
payload
|
Dict[str, Any]
|
Request body data to send. |
required |
params
|
Dict[str, Any] | None
|
Query parameters for the request. Defaults to None. |
None
|
headers
|
Dict[str, Any] | None
|
HTTP headers. If None, uses HEADERS constant. Defaults to None. |
None
|
timeout
|
int
|
Request timeout in seconds. Defaults to DEFAULT_TIMEOUT. |
DEFAULT_TIMEOUT
|
api_response_handler
|
Callable[[Response], Tuple[Response, Any]]
|
Function to process the response. Defaults to default_api_response_handler. |
default_api_response_handler
|
requests
|
Any
|
Requests library or mock for testing. Defaults to requests. |
requests
|
Returns:
| Type | Description |
|---|---|
Tuple[Response, Any]
|
Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result). |
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
resource_utils¶
Functions¶
dataknobs_utils.resource_utils ¶
General data utilities for working with third-party resources.
Functions:
| Name | Description |
|---|---|
active_datadir |
Get the active data directory from available locations. |
download_nltk_resources |
Download NLTK resources that don't yet exist locally. |
get_nltk_resources_dir |
Get the NLTK resources directory and optionally download resources. |
get_nltk_wordnet |
Get NLTK's WordNet corpus, ensuring resources are downloaded. |
Functions¶
active_datadir ¶
Get the active data directory from available locations.
Searches for an existing data directory in the following order:
1. The DATADIR environment variable
2. The $HOME/data directory
3. The /data directory
Note
An active data directory must exist to get a non-None result.
Returns:
| Type | Description |
|---|---|
str | None
|
str | None: Path to the active data directory, or None if not found. |
Source code in packages/utils/src/dataknobs_utils/resource_utils.py
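The search order above can be sketched with the standard library (equivalent logic under the documented precedence, not the package source):

```python
import os
from typing import Optional

def active_datadir() -> Optional[str]:
    # Search order described above: $DATADIR, then $HOME/data, then /data.
    # Only a directory that actually exists is returned.
    for path in (
        os.environ.get("DATADIR"),
        os.path.join(os.environ.get("HOME", ""), "data"),
        "/data",
    ):
        if path and os.path.isdir(path):
            return path
    return None

print(active_datadir())
```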
download_nltk_resources ¶
download_nltk_resources(
resources: Dict[str, str] | None,
resources_dir: str | None = None,
verbose: bool = False,
downloader: Callable = nltk.download,
) -> None
Download NLTK resources that don't yet exist locally.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
resources
|
Dict[str, str] | None
|
Dictionary mapping resource names to their relative paths (relative to resources_dir). |
required |
resources_dir
|
str | None
|
Root directory for resources. If None, uses the default from get_nltk_resources_dir(). Defaults to None. |
None
|
verbose
|
bool
|
If True, prints download status messages. Defaults to False. |
False
|
downloader
|
Callable
|
Callable for downloading resources. Defaults to nltk.download. |
download
|
Source code in packages/utils/src/dataknobs_utils/resource_utils.py
get_nltk_resources_dir ¶
get_nltk_resources_dir(
resources: Dict[str, str] | None = None,
verbose: bool = False,
downloader: Callable = nltk.download,
) -> str | None
Get the NLTK resources directory and optionally download resources.
Determines the NLTK resources directory from:
1. The NLTK_DATA environment variable
2. active_datadir()/NLTK_RESOURCES_PATH
If resources are specified, downloads any missing resources to the directory.
Note
An active DATADIR must exist to get a non-None result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
resources
|
Dict[str, str] | None
|
Optional dictionary mapping resource names to their relative paths (relative to NLTK resources dir) to ensure are downloaded. |
None
|
verbose
|
bool
|
If True, prints status messages. Defaults to False. |
False
|
downloader
|
Callable
|
Callable for downloading resources. Defaults to nltk.download. |
download
|
Returns:
| Type | Description |
|---|---|
str | None
|
str | None: Path to NLTK resources directory, or None if not found. |
Source code in packages/utils/src/dataknobs_utils/resource_utils.py
get_nltk_wordnet ¶
Get NLTK's WordNet corpus, ensuring resources are downloaded.
Automatically downloads required WordNet resources if not already present.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
downloader
|
Callable
|
Callable for downloading resources. Defaults to nltk.download. |
download
|
Returns:
| Type | Description |
|---|---|
Any
|
nltk.corpus.wordnet: The NLTK WordNet corpus object, or None if the resources directory cannot be determined. |
Source code in packages/utils/src/dataknobs_utils/resource_utils.py
sql_utils¶
Functions¶
dataknobs_utils.sql_utils ¶
SQL database utility functions and connection management.
Provides utilities for working with SQL databases including PostgreSQL, with support for connection management, query execution, and data loading.
Classes:
| Name | Description |
|---|---|
DataFrameRecordFetcher |
Fetch records from a pandas DataFrame by ID. |
DictionaryRecordFetcher |
Fetch records from a dictionary mapping IDs to record values. |
DotenvPostgresConnector |
PostgreSQL connection manager using environment variables and project vars. |
PostgresDB |
PostgreSQL database wrapper with utilities for querying and managing tables. |
PostgresRecordFetcher |
Fetch records from a PostgreSQL table by ID. |
RecordFetcher |
Abstract base class for fetching records from a data source. |
Classes¶
DataFrameRecordFetcher ¶
DataFrameRecordFetcher(
df: DataFrame,
id_field_name: str = "id",
fields_to_retrieve: List[str] | None = None,
one_based_ids: bool = False,
)
Bases: RecordFetcher
Fetch records from a pandas DataFrame by ID.
Attributes:
| Name | Type | Description |
|---|---|---|
df |
DataFrame containing records to fetch from. |
Initialize DataFrame record fetcher.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
df
|
DataFrame
|
DataFrame containing records. |
required |
id_field_name
|
str
|
Name of the integer ID field. Defaults to "id". |
'id'
|
fields_to_retrieve
|
List[str] | None
|
Subset of fields to retrieve. If None, retrieves all fields. Defaults to None. |
None
|
one_based_ids
|
bool
|
True if DataFrame uses 1-based IDs. Defaults to False. |
False
|
Methods:
| Name | Description |
|---|---|
get_records |
Fetch records from DataFrame by IDs. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
Functions¶
get_records ¶
get_records(
ids: List[int],
one_based: bool = False,
fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame
Fetch records from DataFrame by IDs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ids
|
List[int]
|
Collection of record IDs to retrieve. |
required |
one_based
|
bool
|
True if provided IDs are 1-based. Defaults to False. |
False
|
fields_to_retrieve
|
List[str] | None
|
Subset of fields for this call, overriding instance default. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
DataFrame
|
pd.DataFrame: DataFrame containing the retrieved records. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
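The ID-based fetch can be sketched in plain pandas; the DataFrame and requested IDs are hypothetical, and this is equivalent logic rather than the fetcher's source:

```python
import pandas as pd

df = pd.DataFrame({"id": [0, 1, 2], "name": ["a", "b", "c"]})

# Equivalent logic: select rows whose "id" is in the requested set,
# shifting by one first when the caller passes 1-based IDs.
ids, one_based = [1, 3], True
zero_based = [i - 1 for i in ids] if one_based else ids
records = df[df["id"].isin(zero_based)]
print(records["name"].tolist())  # ['a', 'c']
```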
DictionaryRecordFetcher ¶
DictionaryRecordFetcher(
the_dict: Dict[int, List[Any]],
all_field_names: List[str],
id_field_name: str = "id",
fields_to_retrieve: List[str] | None = None,
one_based_ids: bool = False,
)
Bases: RecordFetcher
Fetch records from a dictionary mapping IDs to record values.
Attributes:
| Name | Type | Description |
|---|---|---|
the_dict |
Dictionary mapping IDs to record value lists. |
|
field_names |
Field names corresponding to record value positions. |
Initialize dictionary record fetcher.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
the_dict
|
Dict[int, List[Any]]
|
Dictionary mapping IDs to lists of record values. |
required |
all_field_names
|
List[str]
|
Field names in same order as record value lists. |
required |
id_field_name
|
str
|
Name of the integer ID field. Defaults to "id". |
'id'
|
fields_to_retrieve
|
List[str] | None
|
Subset of fields to retrieve. If None, retrieves all fields. Defaults to None. |
None
|
one_based_ids
|
bool
|
True if dictionary uses 1-based IDs. Defaults to False. |
False
|
Methods:
| Name | Description |
|---|---|
get_records |
Fetch records from dictionary by IDs. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
Functions¶
get_records ¶
get_records(
ids: List[int],
one_based: bool = False,
fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame
Fetch records from dictionary by IDs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ids
|
List[int]
|
Collection of record IDs to retrieve. |
required |
one_based
|
bool
|
True if provided IDs are 1-based. Defaults to False. |
False
|
fields_to_retrieve
|
List[str] | None
|
Subset of fields for this call, overriding instance default. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
DataFrame
|
pd.DataFrame: DataFrame containing the retrieved records, with None values for missing IDs. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
DotenvPostgresConnector ¶
DotenvPostgresConnector(
host: str | None = None,
db: str | None = None,
user: str | None = None,
pwd: str | None = None,
port: int | None = None,
pvname: str = ".project_vars",
)
PostgreSQL connection manager using environment variables and project vars.
Loads database connection parameters from environment variables (.env), project variables file, or constructor arguments, with environment variables taking precedence.
Attributes:
| Name | Type | Description |
|---|---|---|
host |
Database host address. |
|
database |
Database name. |
|
user |
Database username. |
|
password |
Database password. |
|
port |
Database port number. |
Initialize PostgreSQL connector with environment-based configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
str | None
|
Database host. If None, uses POSTGRES_HOST environment variable or "localhost". Defaults to None. |
None
|
db
|
str | None
|
Database name. If None, uses POSTGRES_DB environment variable or "postgres". Defaults to None. |
None
|
user
|
str | None
|
Username. If None, uses POSTGRES_USER environment variable or "postgres". Defaults to None. |
None
|
pwd
|
str | None
|
Password. If None, uses POSTGRES_PASSWORD environment variable. Defaults to None. |
None
|
port
|
int | None
|
Port number. If None, uses POSTGRES_PORT environment variable or 5432. Defaults to None. |
None
|
pvname
|
str
|
Project variables filename to load. Defaults to ".project_vars". |
'.project_vars'
|
Methods:
| Name | Description |
|---|---|
get_conn |
Create and return a PostgreSQL database connection. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
PostgresDB ¶
PostgresDB(
host: str | DotenvPostgresConnector | None = None,
db: str | None = None,
user: str | None = None,
pwd: str | None = None,
port: int | None = None,
)
PostgreSQL database wrapper with utilities for querying and managing tables.
Provides high-level interface for executing queries, managing tables, and uploading DataFrames to PostgreSQL databases.
Attributes:
| Name | Type | Description |
|---|---|---|
_connector |
Connection manager for database operations. |
Initialize PostgreSQL database wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
str | DotenvPostgresConnector | None
|
Database host or DotenvPostgresConnector instance. If None, uses environment configuration. Defaults to None. |
None
|
db
|
str | None
|
Database name. If None, uses environment configuration. Defaults to None. |
None
|
user
|
str | None
|
Username. If None, uses environment configuration. Defaults to None. |
None
|
pwd
|
str | None
|
Password. If None, uses environment configuration. Defaults to None. |
None
|
port
|
int | None
|
Port number. If None, uses environment configuration. Defaults to None. |
None
|
Methods:
| Name | Description |
|---|---|
execute |
Execute a SQL statement and commit changes. |
get_conn |
Get a connection to the PostgreSQL database. |
query |
Execute a SQL query and return results as a DataFrame. |
table_head |
Get the first N rows from a table. |
upload |
Upload DataFrame data to a database table. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
Attributes¶
table_names
property
¶
Get list of all table names in the database.
Returns:

| Type | Description |
|---|---|
| List[str] | List of table names from the public schema. |

tables_df
property
¶
Get DataFrame of database table metadata.
Note
The exact schema is database-specific. For PostgreSQL, queries information_schema.tables.
Returns:

| Type | Description |
|---|---|
| pd.DataFrame | Table metadata from information_schema.tables. |
Functions¶
execute ¶
Execute a SQL statement and commit changes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| stmt | str | SQL statement to execute. | required |
| params | Dict[str, Any] \| None | Optional dictionary of parameters for safe injection. | None |

Returns:

| Type | Description |
|---|---|
| int | Number of rows affected by the statement. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
get_conn ¶
Get a connection to the PostgreSQL database.
Returns:

| Type | Description |
|---|---|
| Any | psycopg2.connection: Active database connection. |
query ¶
Execute a SQL query and return results as a DataFrame.
Uses parameterized queries for safe injection of values.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | str | SQL query string to execute. | required |
| params | Dict[str, Any] \| None | Dictionary of parameters to safely inject. Each parameter "param" should appear as "%(param)s" in the query string. | None |

Returns:

| Type | Description |
|---|---|
| pd.DataFrame | Query results with column names from the cursor. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
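As a usage sketch of the %(param)s placeholder style described above (the table and column names are hypothetical, and the commented call assumes a database configured via the environment):

```python
# Hypothetical query using the pyformat placeholder style.
# Table and column names are illustrative, not from the library.
query = "SELECT name, age FROM users WHERE age > %(min_age)s AND city = %(city)s"
params = {"min_age": 21, "city": "Boston"}

# With a configured database (connection details come from the environment):
# db = PostgresDB()
# df = db.query(query, params=params)  # returns a pandas DataFrame
```

Passing values through params rather than interpolating them into the query string lets the driver quote them safely, avoiding SQL injection.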
table_head ¶
Get the first N rows from a table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Name of the table to sample. | required |
| n | int | Number of rows to return. | 10 |

Returns:

| Type | Description |
|---|---|
| pd.DataFrame | First N rows from the table. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
upload ¶
Upload DataFrame data to a database table.
Creates the table if it doesn't exist, inferring schema from DataFrame types.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Name of the table to insert data into. | required |
| df | DataFrame | DataFrame with columns matching table fields and data to upload. | required |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
PostgresRecordFetcher ¶
PostgresRecordFetcher(
db: PostgresDB,
table_name: str,
id_field_name: str = "id",
fields_to_retrieve: List[str] | None = None,
one_based_ids: bool = False,
)
Bases: RecordFetcher
Fetch records from a PostgreSQL table by ID.
Attributes:

| Name | Type | Description |
|---|---|---|
| db | | PostgreSQL database connection wrapper. |
| table_name | | Name of the table to query. |
Initialize PostgreSQL record fetcher.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| db | PostgresDB | PostgresDB instance for database operations. | required |
| table_name | str | Name of the table to fetch records from. | required |
| id_field_name | str | Name of the integer ID field. | 'id' |
| fields_to_retrieve | List[str] \| None | Subset of fields to retrieve. If None, retrieves all fields. | None |
| one_based_ids | bool | True if the data source uses 1-based IDs. | False |

Methods:

| Name | Description |
|---|---|
| get_records | Fetch records from PostgreSQL table by IDs. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
Functions¶
get_records ¶
get_records(
ids: List[int],
one_based: bool = False,
fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame
Fetch records from PostgreSQL table by IDs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ids | List[int] | Collection of record IDs to retrieve. | required |
| one_based | bool | True if the provided IDs are 1-based. | False |
| fields_to_retrieve | List[str] \| None | Subset of fields for this call, overriding the instance default. | None |

Returns:

| Type | Description |
|---|---|
| pd.DataFrame | DataFrame containing the retrieved records. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
RecordFetcher ¶
RecordFetcher(
id_field_name: str = "id",
fields_to_retrieve: List[str] | None = None,
one_based_ids: bool = False,
)
Bases: ABC
Abstract base class for fetching records from a data source.
Provides a common interface for retrieving records by ID from various data sources (databases, DataFrames, dictionaries, etc.) with support for zero-based and one-based ID systems.
Attributes:

| Name | Type | Description |
|---|---|---|
| id_field_name | | Name of the ID field in the data source. |
| fields_to_retrieve | | Subset of fields to retrieve (None for all). |
| one_based | | True if the data source uses 1-based IDs. |
Initialize the record fetcher.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id_field_name | str | Name of the integer ID field. | 'id' |
| fields_to_retrieve | List[str] \| None | Subset of fields to retrieve. If None, retrieves all fields. | None |
| one_based_ids | bool | True if the data source uses 1-based IDs, False for 0-based. | False |

Methods:

| Name | Description |
|---|---|
| get_records | Fetch records by ID from the data source. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
Functions¶
get_records
abstractmethod
¶
get_records(
ids: List[int],
one_based: bool = False,
fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame
Fetch records by ID from the data source.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ids | List[int] | Collection of record IDs to retrieve. | required |
| one_based | bool | True if the provided IDs are 1-based. | False |
| fields_to_retrieve | List[str] \| None | Subset of fields for this call, overriding the instance default. | None |

Returns:

| Type | Description |
|---|---|
| pd.DataFrame | DataFrame containing the retrieved records. |

Raises:

| Type | Description |
|---|---|
| NotImplementedError | Must be implemented by subclasses. |
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
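To make the contract concrete, here is a minimal in-memory fetcher following the same shape (a hypothetical sketch, not a library class; it omits the 1-based ID handling for brevity):

```python
import pandas as pd

class InMemoryRecordFetcher:
    """Hypothetical in-memory analogue of RecordFetcher (illustrative only)."""

    def __init__(self, records, id_field_name="id"):
        self.df = pd.DataFrame(records)
        self.id_field_name = id_field_name

    def get_records(self, ids, fields_to_retrieve=None):
        # Select rows whose ID is in the requested collection.
        rows = self.df[self.df[self.id_field_name].isin(ids)]
        # Optionally narrow to a subset of fields.
        return rows[fields_to_retrieve] if fields_to_retrieve else rows

fetcher = InMemoryRecordFetcher(
    [{"id": 0, "name": "a"}, {"id": 1, "name": "b"}, {"id": 2, "name": "c"}]
)
subset = fetcher.get_records([0, 2], fields_to_retrieve=["name"])
```

A real subclass would additionally apply the one_based_ids offset before looking up rows, so callers can mix 0-based and 1-based ID conventions.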
Functions¶
stats_utils¶
Functions¶
dataknobs_utils.stats_utils ¶
Statistical utility functions and timing helpers.
Provides utilities for timing operations, random waits, rate limiting, and basic statistical calculations.
Classes:

| Name | Description |
|---|---|
| KeyManager | Turn descriptions of key types into keys of the form type-N. |
| LinearRegression | A low-memory helper class to collect (x,y) samples and compute linear regression. |
| Monitor | A monitor tracks processing and/or access times and rates for some function. |
| MonitorManager | Manage a set of monitors by label, providing rollup views across all monitors. |
| RollingStats | A collection of statistics through a rolling window of time. |
| StatsAccumulator | A low-memory helper class to collect statistical samples and provide summary statistics. |

Functions:

| Name | Description |
|---|---|
| wait_for_random_millis | Wait for a random number of milliseconds. |
Classes¶
KeyManager ¶
Turn descriptions of key types into keys of the form type-N.
Maps (keytype, description) pairs to unique keys by tracking descriptions for each keytype and assigning sequential numbers. The same description always maps to the same key for a given keytype.
Example

>>> from dataknobs_utils.stats_utils import KeyManager
>>> km = KeyManager()
>>> km.get_key("endpoint", "/api/users")
"endpoint-0"
>>> km.get_key("endpoint", "/api/posts")
"endpoint-1"
>>> km.get_key("endpoint", "/api/users")
"endpoint-0"

Initialize a KeyManager.
Methods:

| Name | Description |
|---|---|
| get_key | Get a unique key for a (keytype, description) pair. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
Functions¶
get_key ¶
Get a unique key for a (keytype, description) pair.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| keytype | str | The key type (e.g., "endpoint", "function"). | required |
| description | str | The description to convert to a key. | required |

Returns:

| Type | Description |
|---|---|
| str | A unique key in the form "keytype-N" where N is the index. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
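The mapping itself is simple bookkeeping; a minimal sketch of the behavior described above (an illustrative re-derivation, not the library implementation):

```python
class KeyManagerSketch:
    """Illustrative sketch of the (keytype, description) -> "keytype-N" mapping."""

    def __init__(self):
        self._keys = {}  # keytype -> {description: assigned index}

    def get_key(self, keytype: str, description: str) -> str:
        descriptions = self._keys.setdefault(keytype, {})
        if description not in descriptions:
            # First time seeing this description: assign the next sequential index.
            descriptions[description] = len(descriptions)
        return f"{keytype}-{descriptions[description]}"

km = KeyManagerSketch()
km.get_key("endpoint", "/api/users")  # "endpoint-0"
km.get_key("endpoint", "/api/posts")  # "endpoint-1"
km.get_key("endpoint", "/api/users")  # "endpoint-0" again (stable mapping)
```

Because indices are tracked per keytype, "function" descriptions start over at "function-0" independently of "endpoint" keys.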
LinearRegression ¶
A low-memory helper class to collect (x,y) samples and compute linear regression.
Incrementally computes the line of best fit (y = mx + b) from (x,y) samples without storing individual points. Uses the least squares method to calculate the slope (m) and intercept (b).
Attributes:

| Name | Type | Description |
|---|---|---|
| n | int | Number of (x,y) samples added. |
| x_sum | float | Sum of all x values. |
| y_sum | float | Sum of all y values. |
| xy_sum | float | Sum of all x*y products. |
| xx_sum | float | Sum of all x*x products. |
| yy_sum | float | Sum of all y*y products. |
| m | float | Slope of the regression line (computed lazily). |
| b | float | Y-intercept of the regression line (computed lazily). |

Initialize a LinearRegression accumulator.
Methods:

| Name | Description |
|---|---|
| __str__ | Get the regression equation as a string. |
| add | Add an (x,y) sample point. |
| get_y | Compute the predicted y value for a given x. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
Attributes¶
Functions¶
__str__ ¶
Get the regression equation as a string.
Returns:

| Type | Description |
|---|---|
| str | The equation in the form "y = m x + b". |
add ¶
Add an (x,y) sample point.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| x | float | The x value. | required |
| y | float | The y value. | required |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
get_y ¶
Compute the predicted y value for a given x.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| x | float | The x value. | required |

Returns:

| Type | Description |
|---|---|
| float | The predicted y value (y = mx + b). |
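The low-memory trick is that the closed-form least-squares solution needs only the running sums listed in the attributes above. A sketch of that arithmetic (illustrative, not the library source):

```python
class LinearRegressionSketch:
    """Accumulate (x, y) sums and solve y = mx + b by ordinary least squares."""

    def __init__(self):
        self.n = 0
        self.x_sum = self.y_sum = self.xy_sum = self.xx_sum = 0.0

    def add(self, x: float, y: float) -> None:
        # Only the sums are stored; individual points are discarded.
        self.n += 1
        self.x_sum += x
        self.y_sum += y
        self.xy_sum += x * y
        self.xx_sum += x * x

    def line(self):
        # Closed-form least squares from the running sums.
        denom = self.n * self.xx_sum - self.x_sum**2
        m = (self.n * self.xy_sum - self.x_sum * self.y_sum) / denom
        b = (self.y_sum - m * self.x_sum) / self.n
        return m, b

reg = LinearRegressionSketch()
for x in range(5):
    reg.add(x, 2 * x + 1)  # points on y = 2x + 1
m, b = reg.line()  # m ≈ 2.0, b ≈ 1.0
```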
Monitor ¶
Monitor(
description: str | None = None,
access_times: RollingStats | None = None,
processing_times: RollingStats | None = None,
default_window_width: int = 300000,
default_segment_width: int = 5000,
)
A monitor tracks processing and/or access times and rates for some function.
Monitors measure both access frequency (how often something is called) and processing time (how long it takes). Uses RollingStats to provide both cumulative and rolling window statistics.
Attributes:

| Name | Type | Description |
|---|---|---|
| alive_since | datetime | Time when this monitor was created. |
| description | str \| None | Optional description of what is being monitored. |
| last_access_time | datetime \| None | Time when access was last recorded. |
| access_times | RollingStats \| None | RollingStats for tracking time between accesses. |
| processing_times | RollingStats \| None | RollingStats for tracking processing duration. |
| access_cumulative_stats | StatsAccumulator \| None | Cumulative access time statistics. |
| access_window_stats | StatsAccumulator \| None | Rolling window access time statistics. |
| access_window_width | int \| None | Width of the access time rolling window. |
| processing_cumulative_stats | StatsAccumulator \| None | Cumulative processing time statistics. |
| processing_window_stats | StatsAccumulator \| None | Rolling window processing time statistics. |
| processing_window_width | int \| None | Width of the processing time rolling window. |
| default_window_width | int | Default window width for auto-created RollingStats. |
| default_segment_width | int | Default segment width for auto-created RollingStats. |
Initialize a Monitor.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| description | str \| None | Optional description of what is being monitored. | None |
| access_times | RollingStats \| None | RollingStats instance for tracking access times (created on first mark if None). | None |
| processing_times | RollingStats \| None | RollingStats instance for tracking processing times (created on first mark with endtime if None). | None |
| default_window_width | int | Default window width in milliseconds (5 minutes) for auto-created RollingStats. | 300000 |
| default_segment_width | int | Default segment width in milliseconds (5 seconds) for auto-created RollingStats. | 5000 |

Methods:

| Name | Description |
|---|---|
| __str__ | Get the monitor statistics as a JSON string. |
| as_dict | Get a dictionary containing a summary of this instance's information. |
| get_stats | Get window or cumulative access or processing stats. |
| mark | Mark another access and optionally record processing time. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
Attributes¶
access_cumulative_stats
property
¶
Get cumulative access time statistics.
access_window_stats
property
¶
Get rolling window access time statistics.
access_window_width
property
¶
Get the access time rolling window width in milliseconds.
default_segment_width
property
writable
¶
Get the default segment width for auto-created RollingStats.
default_window_width
property
writable
¶
Get the default window width for auto-created RollingStats.
description
property
writable
¶
Get the description of what is being monitored.
last_access_time
property
¶
Get the time at which access was last recorded.
processing_cumulative_stats
property
¶
Get cumulative processing time statistics.
processing_times
property
¶
Get the processing_times (RollingStats).
processing_window_stats
property
¶
Get rolling window processing time statistics.
processing_window_width
property
¶
Get the processing time rolling window width in milliseconds.
Functions¶
__str__ ¶
Get the monitor statistics as a JSON string.
Returns:

| Type | Description |
|---|---|
| str | JSON representation of the statistics dictionary. |

as_dict ¶
Get a dictionary containing a summary of this instance's information.
Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary with keys: alive_since, description (optional), last_mark (optional), access_stats (optional), processing_stats (optional). |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
get_stats ¶
Get window or cumulative access or processing stats.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| access | bool | If True, get access stats; if False, get processing stats. | False |
| window | bool | If True, get window stats; if False, get cumulative stats. | False |

Returns:

| Type | Description |
|---|---|
| StatsAccumulator \| None | The requested statistics, or None if not available. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
mark ¶
Mark another access and optionally record processing time.
Records the time between this call and the previous call as an access time. If endtime is provided, also records the time between starttime and endtime as processing time.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| starttime | datetime | The start time of this access. | required |
| endtime | datetime \| None | Optional end time for recording processing duration. | None |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
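A hypothetical usage pattern for mark(): bracket each unit of work with timestamps and hand them to the monitor. The Monitor call is shown commented out since it requires the installed package; the timing scaffold is the point.

```python
from datetime import datetime

# Hypothetical setup (requires dataknobs_utils):
# monitor = Monitor(description="batch parse")

for _ in range(3):
    start = datetime.now()
    result = sum(i * i for i in range(1000))  # stand-in for real work
    end = datetime.now()
    # monitor.mark(start, end)  # records the access interval and processing time
```

With endtime supplied, each call contributes to both the access-time and processing-time statistics; calling mark(start) alone tracks only access frequency.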
MonitorManager ¶
Manage a set of monitors by label, providing rollup views across all monitors.
Maintains a collection of Monitor instances identified by labels, allowing aggregation of statistics across multiple monitors. Uses a KeyManager to generate unique keys from type and description pairs.
Attributes:

| Name | Type | Description |
|---|---|---|
| default_window_width | int | Default window width for auto-created Monitors. |
| default_segment_width | int | Default segment width for auto-created Monitors. |
Initialize a MonitorManager.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| default_window_width | int | Default window width in milliseconds (5 minutes) for auto-created Monitors. | 300000 |
| default_segment_width | int | Default segment width in milliseconds (5 seconds) for auto-created Monitors. | 5000 |

Methods:

| Name | Description |
|---|---|
| __str__ | Get the monitor manager statistics as a JSON string. |
| as_dict | Get a dictionary containing a summary of this instance's information. |
| get_monitor | Get a monitor by label, optionally creating it if missing. |
| get_monitors | Get all monitors. |
| get_or_create_monitor_by_key_type | Get or create a monitor using a key type and description. |
| get_overall_stats | Get overall statistics aggregated across all monitors. |
| get_stats | Get window or cumulative access or processing stats for a label or all monitors. |
| set_monitor | Set or replace a monitor for a given label. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
Attributes¶
default_segment_width
property
writable
¶
Get the default segment width for auto-created Monitors.
default_window_width
property
writable
¶
Get the default window width for auto-created Monitors.
Functions¶
__str__ ¶
Get the monitor manager statistics as a JSON string.
Returns:

| Type | Description |
|---|---|
| str | JSON representation of the statistics dictionary. |

as_dict ¶
Get a dictionary containing a summary of this instance's information.
Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary with overall_stats and individual monitor statistics keyed by label. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
get_monitor ¶
get_monitor(
label: str, create_if_missing: bool = False, description: str | None = None
) -> Monitor | None
Get a monitor by label, optionally creating it if missing.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| label | str | The monitor label. | required |
| create_if_missing | bool | If True, create a new Monitor if the label doesn't exist. | False |
| description | str \| None | Optional description for a newly created Monitor. | None |

Returns:

| Type | Description |
|---|---|
| Monitor \| None | The Monitor instance, or None if not found and create_if_missing is False. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
get_monitors ¶
Get all monitors.
Returns:

| Type | Description |
|---|---|
| Dict[str, Monitor] | Dictionary mapping labels to Monitor instances. |
get_or_create_monitor_by_key_type ¶
Get or create a monitor using a key type and description.
Generates a unique key from the keytype and description using the internal KeyManager, then gets or creates a monitor with that key.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| keytype | str | The key type (e.g., "endpoint", "function"). | required |
| description | str | The description to convert to a key. | required |

Returns:

| Type | Description |
|---|---|
| Monitor | The Monitor instance (always created if missing). |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
get_overall_stats ¶
Get overall statistics aggregated across all monitors.
Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary with keys: cumulative_processing, cumulative_access, window_processing, window_access (each optional). |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
get_stats ¶
get_stats(
label: str | None = None, access: bool = False, window: bool = False
) -> StatsAccumulator | None
Get window or cumulative access or processing stats for a label or all monitors.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| label | str \| None | The monitor label, or None to aggregate across all monitors. | None |
| access | bool | If True, get access stats; if False, get processing stats. | False |
| window | bool | If True, get window stats; if False, get cumulative stats. | False |

Returns:

| Type | Description |
|---|---|
| StatsAccumulator \| None | The requested statistics, or None if not available. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
RollingStats ¶
A collection of statistics through a rolling window of time.
Maintains both cumulative statistics (since initialization) and rolling window statistics (over a recent time period). The rolling window is divided into segments that are automatically cleared as time advances, providing an efficient way to track recent activity without storing all historical data.
Attributes:

| Name | Type | Description |
|---|---|---|
| window_width | int | Width of the rolling window in milliseconds. |
| num_segments | int | Number of segments dividing the window. |
| cur_segment | int | Current segment index. |
| last_segment | int | Last segment index (same as cur_segment). |
| start_time | datetime | Time when this RollingStats was created or reset. |
| ref_time | datetime | Most recent reference time (updates on each access). |
| cumulative_stats | StatsAccumulator | Statistics accumulated since start_time. |
| window_stats | StatsAccumulator | Statistics for the current rolling window. |
| current_label | str | Label for the current window. |
Initialize a RollingStats instance.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| window_width | int | Width of the rolling window in milliseconds (5 minutes). | 300000 |
| segment_width | int | Width of each segment in milliseconds (5 seconds). | 5000 |

Methods:

| Name | Description |
|---|---|
| __str__ | Get the statistics as a JSON string. |
| add | Add one or more values to the current segment. |
| as_dict | Get a dictionary containing a summary of this instance's information. |
| get_items_per_milli | Extract the average number of items per millisecond from the stats. |
| get_millis_per_item | Extract the average number of milliseconds per item from the stats. |
| has_window_activity | Determine whether the current window has activity. |
| reset | Reset all statistics and restart from the current time. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
Attributes¶
cumulative_stats
property
¶
Get cumulative statistics since start_time.
last_segment
property
¶
Get the last segment index (same as cur_segment without update).
start_time
property
¶
Get the time when this RollingStats was created or last reset.
window_stats
property
¶
Get statistics for the current rolling window.
Functions¶
__str__ ¶
Get the statistics as a JSON string.
Returns:

| Type | Description |
|---|---|
| str | JSON representation of the statistics dictionary. |

add ¶
Add one or more values to the current segment.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| *values | float | One or more numeric values to add. | () |

Returns:

| Type | Description |
|---|---|
| int | The segment number to which the values were added (useful for testing). |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
as_dict ¶
Get a dictionary containing a summary of this instance's information.
Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary with keys: now, cumulative, window. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
get_items_per_milli
staticmethod
¶
Extract the average number of items per millisecond from the stats.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| stats | StatsAccumulator \| None | StatsAccumulator instance or None. | required |

Returns:

| Type | Description |
|---|---|
| float \| None | Average items per millisecond, or None if stats is None or the mean is zero. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
get_millis_per_item
staticmethod
¶
Extract the average number of milliseconds per item from the stats.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| stats | StatsAccumulator \| None | StatsAccumulator instance or None. | required |

Returns:

| Type | Description |
|---|---|
| float \| None | Average milliseconds per item, or None if stats is None or has no values. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
has_window_activity ¶
Determine whether the current window has activity.
Returns:

| Type | Description |
|---|---|
| Tuple[bool, StatsAccumulator] | A tuple of (has_activity, current_window_stats) where has_activity is True if the window has any values. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
reset ¶
Reset all statistics and restart from current time.
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
StatsAccumulator ¶
StatsAccumulator(
label: str = "",
other: Optional[StatsAccumulator] = None,
as_dict: Dict[str, Any] | None = None,
values: Union[float, List[float]] | None = None,
)
A low-memory helper class to collect statistical samples and provide summary statistics.
Accumulates statistical values in a thread-safe manner, computing mean, variance, standard deviation, min, max, and other summary statistics incrementally without storing individual values. Supports combining multiple accumulators and initialization from dictionaries.
Attributes:

| Name | Type | Description |
|---|---|---|
| label | str | A label or name for this instance. |
| n | int | The number of values added. |
| min | float | The minimum value added. |
| max | float | The maximum value added. |
| sum | float | The sum of all values added. |
| sum_of_squares | float | The sum of all squared values. |
| mean | float | The mean of the values. |
| std | float | The standard deviation of the values. |
| var | float | The variance of the values. |
Initialize a StatsAccumulator.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| label | str | A label or name for this instance. | '' |
| other | Optional[StatsAccumulator] | Another StatsAccumulator instance to copy data from. | None |
| as_dict | Dict[str, Any] \| None | Dictionary representation of stats to initialize with. | None |
| values | Union[float, List[float]] \| None | A list of initial values or a single value to add. | None |

Methods:

| Name | Description |
|---|---|
| __str__ | Get the statistics as a JSON string. |
| add | Add one or more values to the accumulator (thread-safe). |
| as_dict | Get a dictionary containing a summary of this instance's information. |
| clear | Clear all values and reset statistics. |
| combine | Create a new StatsAccumulator combining data from multiple accumulators. |
| incorporate | Incorporate another accumulator's data into this one. |
| initialize | Initialize with the given values. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
Attributes¶
Functions¶
__str__ ¶
Get the statistics as a JSON string.
Returns:

| Type | Description |
|---|---|
| str | JSON representation of the statistics dictionary. |

add ¶
Add one or more values to the accumulator (thread-safe).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| *values | float | One or more numeric values to add. | () |

Returns:

| Type | Description |
|---|---|
| StatsAccumulator | This instance, for method chaining. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
as_dict ¶
Get a dictionary containing a summary of this instance's information.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| with_sums | bool | If True, include sum and sos (sum of squares) in the output. | False |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary with keys: label, n, min, max, mean, std, and optionally sum and sos. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
clear ¶
Clear all values and reset statistics.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| label | str | Optional new label to assign. | '' |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
combine
staticmethod
¶
Create a new StatsAccumulator combining data from multiple accumulators.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| label | str | Label for the new combined accumulator. | required |
| *stats_accumulators | StatsAccumulator | One or more StatsAccumulator instances to combine. | () |

Returns:

| Type | Description |
|---|---|
| StatsAccumulator | A new accumulator as if it had accumulated all data from the provided accumulators. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
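Combining works because each accumulator keeps only summary sums, from which mean and standard deviation follow. A sketch of the merge arithmetic (illustrative only; it assumes population variance, which may differ from the library's convention):

```python
import math

def combine_summaries(a: dict, b: dict) -> dict:
    """Merge two (n, sum, sos, min, max) summaries; illustrative arithmetic only."""
    n = a["n"] + b["n"]
    total = a["sum"] + b["sum"]
    sos = a["sos"] + b["sos"]  # sum of squares
    mean = total / n
    var = max(sos / n - mean * mean, 0.0)  # population variance from raw sums
    return {
        "n": n,
        "min": min(a["min"], b["min"]),
        "max": max(a["max"], b["max"]),
        "mean": mean,
        "std": math.sqrt(var),
    }

left = {"n": 2, "sum": 3.0, "sos": 5.0, "min": 1.0, "max": 2.0}    # values 1, 2
right = {"n": 2, "sum": 7.0, "sos": 25.0, "min": 3.0, "max": 4.0}  # values 3, 4
combined = combine_summaries(left, right)  # same stats as accumulating 1, 2, 3, 4
```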
incorporate ¶
Incorporate another accumulator's data into this one.
Merges the statistics as if this accumulator had accumulated all values from both accumulators.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| other | Optional[StatsAccumulator] | Another StatsAccumulator to incorporate. If None, no action is taken. | required |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
initialize ¶
initialize(
label: str = "",
n: int = 0,
min: float = 0,
max: float = 0,
mean: float = 0,
std: float = 0,
as_dict: Dict[str, Any] | None = None,
) -> None
Initialize with the given values.
When as_dict is provided, its values override the individual parameters. Computes internal sum and sum_of_squares from mean and std if not provided in the dictionary.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| label | str | A label or name for this instance. | '' |
| n | int | Number of values. | 0 |
| min | float | Minimum value. | 0 |
| max | float | Maximum value. | 0 |
| mean | float | Mean value. | 0 |
| std | float | Standard deviation. | 0 |
| as_dict | Dict[str, Any] \| None | Dictionary with keys: label, n, min, max, mean, std, sum, sos. Values from this dict override the individual parameters. | None |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
Functions¶
wait_for_random_millis ¶
Wait for a random number of milliseconds.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| uptomillis | float | Maximum number of milliseconds to wait. | required |

Returns:

| Type | Description |
|---|---|
| float | The actual wait time in milliseconds. |
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
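The documented behavior amounts to a uniform random sleep; a minimal sketch (not the library source):

```python
import random
import time

def wait_for_random_millis_sketch(uptomillis: float) -> float:
    """Sleep for a uniform random duration up to uptomillis; return the wait in ms."""
    millis = random.uniform(0, uptomillis)
    time.sleep(millis / 1000.0)
    return millis

waited = wait_for_random_millis_sketch(5.0)  # somewhere between 0 and 5 milliseconds
```

Random waits like this are useful for jittering retries or rate-limited polling so that concurrent workers do not synchronize.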
subprocess_utils¶
Functions¶
dataknobs_utils.subprocess_utils ¶
Subprocess execution utilities for running system commands.
Provides functions for executing system commands and processing their output line-by-line with callback handlers.
Functions:

| Name | Description |
|---|---|
| run_command | Run a system command and process output line by line. |
Functions¶
run_command ¶
run_command(
handle_line_fn: Callable[[str], bool],
command: str,
args: List[str] | None = None,
) -> int
Run a system command and process output line by line.
Executes a command and calls a handler function for each line of output. The handler can signal early termination by returning False.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| handle_line_fn | Callable[[str], bool] | Callback that receives each output line and returns True to continue processing or False to kill the process immediately. | required |
| command | str | Command string with args (if args is None) or just the command name (if args is provided). | required |
| args | List[str] \| None | Optional list of command arguments. If provided, the command runs without shell=True. | None |

Returns:

| Type | Description |
|---|---|
| int | The command's return code, or 0 if poll returns None. |
Examples:
>>> from dataknobs_utils.subprocess_utils import run_command
>>>
>>> # Print all files in the directory:
>>> run_command(lambda x: (print(x), True)[1], 'ls -1')
>>>
>>> # Print files until "foo" is found:
>>> run_command(lambda x: (print(x), x!='foo')[1], 'ls -1')
Source code in packages/utils/src/dataknobs_utils/subprocess_utils.py
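The line-by-line callback pattern described above can be sketched with `subprocess.Popen`. This is a minimal illustration of the documented contract (early termination on a False return), assuming a straightforward implementation; the package's actual internals may differ:

```python
import subprocess
from typing import Callable, List, Optional

def run_command(
    handle_line_fn: Callable[[str], bool],
    command: str,
    args: Optional[List[str]] = None,
) -> int:
    # With args=None the command string goes through the shell;
    # with args given, command + args are passed as an argv list.
    if args is None:
        proc = subprocess.Popen(
            command, shell=True, stdout=subprocess.PIPE, text=True
        )
    else:
        proc = subprocess.Popen(
            [command] + args, stdout=subprocess.PIPE, text=True
        )
    for line in proc.stdout:
        if not handle_line_fn(line.rstrip("\n")):
            proc.kill()  # handler returned False: stop the process early
            break
    proc.wait()
    return proc.returncode or 0

# Collect the output of a simple command
collected = []
run_command(lambda x: (collected.append(x), True)[1], "printf 'a\\nb\\n'")
```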
sys_utils¶
Functions¶
dataknobs_utils.sys_utils ¶
System and environment utility functions.
Provides utilities for loading environment variables, network discovery, and system configuration management.
Classes:

| Name | Description |
|---|---|
| `MySubnet` | Collect and store information about the current process's subnet. |

Functions:

| Name | Description |
|---|---|
| `load_project_vars` | Load project variables from the closest configuration file. |
Classes¶
MySubnet ¶
Collect and store information about the current process's subnet.
Discovers and caches information about the local machine and other hosts on the same subnet using nmap network scanning.
Attributes:

| Name | Type | Description |
|---|---|---|
| `my_hostname` | `str` | The local machine's hostname. |
| `my_ip` | `str` | The local machine's IP address. |
| `all_ips` | `Dict[str, str]` | Dictionary mapping hostnames to IP addresses for all active hosts on the subnet (cached). |

Methods:

| Name | Description |
|---|---|
| `get_ip` | Get the first IP address matching a hostname pattern. |
| `get_ips` | Get IP addresses of hosts matching a name pattern. |
| `rescan` | Clear cached subnet information to force a fresh scan. |
Source code in packages/utils/src/dataknobs_utils/sys_utils.py
Attributes¶
all_ips property ¶
Get a dictionary mapping hostname to ip_address of all hosts that are "up" on the currently running process's subnet.
These are cached on first call but will be recomputed after a call to self.rescan().
Functions¶
get_ip ¶
Get the first IP address matching a hostname pattern.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name_re` | `Union[str, Pattern]` | Regular expression pattern or string to match against hostnames. | *required* |

Returns:

| Type | Description |
|---|---|
| `str \| None` | `str \| None`: The first matching IP address, or None if no matches are found. |
Source code in packages/utils/src/dataknobs_utils/sys_utils.py
get_ips ¶
Get IP addresses of hosts matching a name pattern.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name_re` | `Union[str, Pattern]` | Regular expression pattern or string to match against hostnames. | *required* |

Returns:

| Type | Description |
|---|---|
| `Dict[str, str]` | `Dict[str, str]`: Dictionary mapping matching hostnames to their IP addresses. |
Source code in packages/utils/src/dataknobs_utils/sys_utils.py
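The hostname matching behind `get_ip` and `get_ips` amounts to filtering the cached hostname-to-IP dictionary with a compiled regex. A standalone sketch (the `hosts` data is illustrative; the real class populates `all_ips` via an nmap scan, and whether it uses `search` or `match` semantics is an assumption here):

```python
import re
from typing import Dict, Pattern, Union

def get_ips(all_ips: Dict[str, str], name_re: Union[str, Pattern]) -> Dict[str, str]:
    """Return the hostname -> IP entries whose hostname matches the pattern."""
    pattern = re.compile(name_re) if isinstance(name_re, str) else name_re
    return {host: ip for host, ip in all_ips.items() if pattern.search(host)}

# Illustrative data, standing in for a real subnet scan
hosts = {"nas.local": "192.168.1.10", "printer.local": "192.168.1.20"}
```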
rescan ¶
Clear cached subnet information to force a fresh scan.
Call this method to invalidate the cached subnet hosts data. The next access to all_ips will trigger a new nmap scan.
Source code in packages/utils/src/dataknobs_utils/sys_utils.py
Functions¶
load_project_vars ¶
load_project_vars(
pvname: str = ".project_vars",
include_dot_env: bool = True,
set_environ: bool = False,
start_path: Path | None = None,
) -> Dict[str, str] | None
Load project variables from the closest configuration file.
Walks up the directory tree from the current working directory to find the closest project variables file and optionally merges with .env settings.
Notes
- Project variable files can be checked into the repo (public)
- .env files should not be checked in (contain secrets)
- .env variables will override project variables
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pvname` | `str` | Name of the project variables file. Defaults to ".project_vars". | `'.project_vars'` |
| `include_dot_env` | `bool` | If True, also loads and merges the closest .env file. Defaults to True. | `True` |
| `set_environ` | `bool` | If True, also sets loaded variables in os.environ. Only sets variables not already in the environment. Defaults to False. | `False` |
| `start_path` | `Path \| None` | Directory to start searching from. Defaults to the current working directory. | `None` |

Returns:

| Type | Description |
|---|---|
| `Dict[str, str] \| None` | `Dict[str, str] \| None`: Dictionary of configuration variables, or None if no configuration file is found. |
Source code in packages/utils/src/dataknobs_utils/sys_utils.py
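The "walk up the directory tree to the closest file" behavior can be sketched with `pathlib`. This is an illustrative re-implementation; the simple KEY=VALUE file format parsed here is an assumption, as are the helper names `find_closest` and `parse_vars`:

```python
import tempfile
from pathlib import Path
from typing import Dict, Optional

def find_closest(pvname: str, start_path: Optional[Path] = None) -> Optional[Path]:
    """Walk up from start_path (default: cwd) to the nearest file named pvname."""
    here = (start_path or Path.cwd()).resolve()
    for directory in [here, *here.parents]:
        candidate = directory / pvname
        if candidate.is_file():
            return candidate
    return None

def parse_vars(path: Path) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    result: Dict[str, str] = {}
    for line in path.read_text().splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            result[key.strip()] = value.strip()
    return result

# Demo: a .project_vars two levels above the starting directory is found
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / ".project_vars").write_text("PROJECT_NAME=demo\n# a comment\n")
    nested = root / "a" / "b"
    nested.mkdir(parents=True)
    found = find_closest(".project_vars", nested)
    loaded = parse_vars(found)
```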
xml_utils¶
Functions¶
dataknobs_utils.xml_utils ¶
XML processing utilities with streaming and memory-efficient parsing.
Provides classes and functions for parsing and streaming XML documents, including the XmlStream abstract base class for memory-efficient processing.
Classes:

| Name | Description |
|---|---|
| `XMLTagStream` | Memory-efficient XML tag chunking using memory-mapped file access. |
| `XmlElementGrabber` | Stream matching XML elements with their full path from root. |
| `XmlLeafStream` | Stream XML leaf nodes with their full path from root. |
| `XmlStream` | Abstract base class for streaming XML content with memory management. |

Functions:

| Name | Description |
|---|---|
| `html_table_scraper` | Extract HTML table data into a pandas DataFrame. |
| `soup_generator` | Generate BeautifulSoup objects for each occurrence of a tag in XML. |
Classes¶
XMLTagStream ¶
Memory-efficient XML tag chunking using memory-mapped file access.
Processes large XML files by extracting individual tag instances as BeautifulSoup objects using memory-mapped I/O for efficient processing without loading the entire file.
Attributes:

| Name | Type | Description |
|---|---|---|
| `tag_name` | | XML tag name to extract. |
| `encoding` | | Character encoding of the XML file. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
XmlElementGrabber ¶
XmlElementGrabber(
source: str | TextIO,
match: str | Callable[[Element], bool],
auto_clear_elts: bool = True,
)
Bases: XmlStream
Stream matching XML elements with their full path from root.
Finds and yields elements matching a tag name or custom condition, returning the highest matching element (not searching within matched elements).
Examples:
Get first 10 elements with tag "foo":
>>> import dataknobs_utils.xml_utils as xml_utils
>>> xml_fpath = "path/to/file.xml" # Replace with actual XML file path
>>> g = xml_utils.XmlElementGrabber(xml_fpath, "foo")
>>> first_10_foos = list(g.take(10))
Attributes:

| Name | Type | Description |
|---|---|---|
| `match` | | Tag name or callable for matching elements. |
| `count` | | Number of matching elements found. |

Initialize the XML element grabber.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `source` | `str \| TextIO` | Path to XML file or file object. | *required* |
| `match` | `str \| Callable[[Element], bool]` | Tag name to match, or a callable that returns True when an element should be matched. | *required* |
| `auto_clear_elts` | `bool` | If True, automatically clears closed elements. Defaults to True. | `True` |

Methods:

| Name | Description |
|---|---|
| `loop_through_elements` | Find the next matching element with its full path from root. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
Functions¶
loop_through_elements ¶
Find the next matching element with its full path from root.
Returns:

| Type | Description |
|---|---|
| `list[Element]` | `list[ET.Element]`: Element path from root to the next matching element. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
XmlLeafStream ¶
Bases: XmlStream
Stream XML leaf nodes with their full path from root.
Iterates through an XML document yielding each leaf (terminal) element along with its ancestor path from the root. Useful for extracting data from deeply nested XML structures.
Examples:
Show paths to the first 10 leaf nodes with text or attribute values:
import dataknobs_utils.xml_utils as xml_utils
xml_fpath = "path/to/file.xml"  # Replace with actual XML file path
s = xml_utils.XmlLeafStream(xml_fpath)
for idx, elts in enumerate(s):
    print(f'{idx} ', s.to_string(elts, ["value", "extension", "code", "ID"]))
    if idx >= 9:
        break
Attributes:

| Name | Type | Description |
|---|---|---|
| `count` | | Number of leaf nodes processed. |
| `elts` | `list[Element] \| None` | Most recently yielded element path. |

Initialize the XML leaf stream.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `source` | `str \| TextIO` | Path to XML file or file object. | *required* |
| `auto_clear_elts` | `bool` | If True, automatically clears closed elements. Defaults to True. | `True` |

Methods:

| Name | Description |
|---|---|
| `loop_through_elements` | Collect the next leaf element with its full path from root. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
Functions¶
loop_through_elements ¶
Collect the next leaf element with its full path from root.
Returns:

| Type | Description |
|---|---|
| `list[Element]` | `list[ET.Element]`: Element path from root to the next leaf element. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
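The leaf-streaming idea can be sketched with the standard library's `ET.iterparse`: track a context stack on "start" events and yield the stack whenever an element closes without having had a child. This is a simplified stand-in for `XmlLeafStream` (the `leaf_paths` name and the lack of memory cleanup are illustrative simplifications):

```python
import io
import xml.etree.ElementTree as ET
from typing import Iterator, List

def leaf_paths(source) -> Iterator[List[ET.Element]]:
    """Yield the root-to-leaf element path for each leaf, in document order."""
    context: List[ET.Element] = []
    had_child = False  # did a child element close inside the current one?
    for event, elem in ET.iterparse(source, events=("start", "end")):
        if event == "start":
            context.append(elem)
            had_child = False
        else:  # "end"
            if not had_child:      # nothing closed inside it: a leaf
                yield list(context)
            context.pop()
            had_child = True       # this element closed inside its parent

doc = io.StringIO("<root><a><b>1</b><c>2</c></a></root>")
paths = [".".join(e.tag for e in p) for p in leaf_paths(doc)]
print(paths)  # ['root.a.b', 'root.a.c']
```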
XmlStream ¶
Bases: ABC
Abstract base class for streaming XML content with memory management.
Provides a framework for iterating through XML documents using element-based streaming. Subclasses must implement loop_through_elements() to define specific element collection behavior. Automatically manages element cleanup to prevent memory buildup when processing large XML files.
Note
Extending classes must implement the loop_through_elements() abstract method.
Attributes:

| Name | Type | Description |
|---|---|---|
| `source` | | XML filename or file object. |
| `auto_clear_elts` | | If True, automatically clears closed elements to save memory. |

Initialize the XML stream.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `source` | `str \| TextIO` | Path to XML file or file object. | *required* |
| `auto_clear_elts` | `bool` | If True, automatically clears closed elements to prevent memory buildup. Defaults to True. | `True` |

Methods:

| Name | Description |
|---|---|
| `add_to_context` | Add an element to the context stack. |
| `find_context_idx` | Find the most recent index of an element in the context stack. |
| `loop_through_elements` | Process XML elements until collecting the next desired element(s). |
| `next_xml_iter` | Get the next event and element from the XML iterator. |
| `pop_closed_from_context` | Remove a closed element and its descendants from the context. |
| `take` | Generate the next N items from this iterator. |
| `to_string` | Convert element path to dot-delimited string with optional leaf value. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
Attributes¶
context property ¶
Get the current element context stack from root to current position.

Returns:

| Type | Description |
|---|---|
| `list[Element]` | `list[ET.Element]`: Copy of the element stack from root to the current element. |

context_length property ¶
Get the depth of the current element in the XML tree.

Returns:

| Name | Type | Description |
|---|---|---|
| `int` | `int` | Number of elements in the context stack. |
Functions¶
add_to_context ¶
Add an element to the context stack.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `elem` | `Element` | Element to add to the context. | *required* |
find_context_idx ¶
Find the most recent index of an element in the context stack.
Searches backwards through the context to find the element's index.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `elem` | `Element` | Element to find in the context. | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `int` | `int` | Index of the element in the context, or -1 if not found. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
loop_through_elements abstractmethod ¶
Process XML elements until collecting the next desired element(s).
Subclasses must implement this method to define specific element collection behavior, updating the context as elements are encountered.
Returns:

| Type | Description |
|---|---|
| `list[Element]` | `list[ET.Element]`: The collected element(s) in context from root. |

Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | Must be implemented by subclasses. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
next_xml_iter ¶
Get the next event and element from the XML iterator.
Returns:

| Type | Description |
|---|---|
| `tuple[str, Element]` | `tuple[str, ET.Element]`: Tuple of (event, element) where event is 'start' or 'end'. |

Raises:

| Type | Description |
|---|---|
| `StopIteration` | When the XML iterator is exhausted. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
pop_closed_from_context ¶
Remove a closed element and its descendants from the context.
Truncates the context at the element's position and optionally clears the element's memory if auto_clear_elts is enabled.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `closed_elem` | `Element` | The element that has closed. | *required* |
| `idx` | `int \| None` | Index of the element in the context (computed if None). Defaults to None. | `None` |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
take ¶
Generate the next N items from this iterator.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `n` | `int` | Number of items to take. | *required* |

Yields:

| Type | Description |
|---|---|
| `list[Element]` | `list[ET.Element]`: Each collected element list, stopping early if the iterator is exhausted. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
to_string staticmethod ¶
Convert element path to dot-delimited string with optional leaf value.
Creates an XPath-like string representation of the element hierarchy, optionally appending text or attribute values from the leaf element.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `elt_path` | `list[Element]` | List of elements from root to leaf. | *required* |
| `with_text_or_atts` | `bool \| str \| list[str]` | Controls value appending: True appends the leaf element's text if present; a str appends the value of that attribute if present; a list[str] appends the first non-empty attribute value from the list; False appends nothing. | `True` |

Returns:

| Name | Type | Description |
|---|---|---|
| `str` | `str` | Dot-delimited path (e.g., `root.child.leaf\|text="value"`). |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
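The path formatting described above can be sketched as below. The `|text="value"` format is taken from the documented example; the corresponding format for attribute values (`|name="value"`) is an assumption by analogy:

```python
import xml.etree.ElementTree as ET
from typing import List, Union

def to_string(
    elt_path: List[ET.Element],
    with_text_or_atts: Union[bool, str, List[str]] = True,
) -> str:
    """Join element tags with dots; optionally append the leaf's text or an attribute."""
    path = ".".join(e.tag for e in elt_path)
    leaf = elt_path[-1]
    if with_text_or_atts is True:
        if leaf.text and leaf.text.strip():
            path += f'|text="{leaf.text.strip()}"'
    elif isinstance(with_text_or_atts, str):
        if with_text_or_atts in leaf.attrib:
            path += f'|{with_text_or_atts}="{leaf.attrib[with_text_or_atts]}"'
    elif isinstance(with_text_or_atts, list):
        for att in with_text_or_atts:
            if leaf.attrib.get(att):  # first non-empty attribute wins
                path += f'|{att}="{leaf.attrib[att]}"'
                break
    return path

root = ET.fromstring('<root><child><leaf code="X">value</leaf></child></root>')
path = [root, root[0], root[0][0]]
print(to_string(path))  # root.child.leaf|text="value"
```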
Functions¶
html_table_scraper ¶
Extract HTML table data into a pandas DataFrame.
Scrapes table data from a BeautifulSoup table element, handling headers, nested elements, and non-breaking spaces. Concatenates text from nested elements within cells.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `soup_table` | `Tag` | BeautifulSoup table element to scrape. | *required* |
| `add_header_as_row` | `bool` | If True, includes the header row in the data rows in addition to using it as column names. Defaults to False. | `False` |

Returns:

| Type | Description |
|---|---|
| `DataFrame` | `pd.DataFrame`: DataFrame with scraped table data. Column names are taken from header elements if present; otherwise columns are unnamed. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
soup_generator ¶
soup_generator(
xmlfilepath: str,
tag_name: str,
with_attrs: bool = False,
encoding: str = "utf-8",
) -> Generator[bs4.BeautifulSoup, None, None]
Generate BeautifulSoup objects for each occurrence of a tag in XML.
Efficiently processes large XML files by yielding BeautifulSoup objects for each instance of the specified tag using memory-mapped I/O.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `xmlfilepath` | `str` | Path to the XML file. | *required* |
| `tag_name` | `str` | XML tag name to extract. | *required* |
| `with_attrs` | `bool` | If True, matches tags with attributes. Defaults to False. | `False` |
| `encoding` | `str` | File character encoding. Defaults to "utf-8". | `'utf-8'` |

Yields:

| Type | Description |
|---|---|
| `BeautifulSoup` | `bs4.BeautifulSoup`: Soup object for each tag instance in the file. |
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
emoji_utils¶
Functions¶
dataknobs_utils.emoji_utils ¶
Utilities for working with unicode emojis.
References:
- https://home.unicode.org/emoji/about-emoji/
- https://unicode.org/emoji/techindex.html
- https://www.unicode.org/Public/emoji/15.0/
- https://www.unicode.org/Public/emoji/15.0/emoji-test.txt
Emoji basics:
- Emojis are represented by one or more characters
- Compound emojis are built using a "zero width joiner", U+200D
    - This distinguishes a compound emoji (displayed as a single glyph) from adjacent independent emojis
    - Such sequences are called "ZWJ" sequences
- An optional "variation selector 16", U+FE0F, can follow an emoji's chars
    - This indicates to render the emoji with its variation(s)
Version and Updates:
- The current Unicode version is 15.0
- Data collection is from emoji-test.txt
- Watch out for format changes in that file if/when updating the version
Usage:
- Download the desired version's emoji-test.txt resource.
- Create an EmojiData instance with the path to the resource.
- Use EmojiData to:
    - Mark emoji locations (BIO) in text
    - Extract emojis from text
    - Look up, browse, and investigate emojis with their metadata (Emoji dataclass)
Classes:

| Name | Description |
|---|---|
| `Emoji` | Metadata for a Unicode emoji from the emoji-test.txt file. |
| `EmojiData` | Parser and analyzer for Unicode emoji-test.txt files. |

Functions:

| Name | Description |
|---|---|
| `build_emoji_dataclass` | Parse an emoji-test.txt file line into an Emoji dataclass. |
| `get_emoji_seq` | Convert an emoji string to a sequence of code points. |
| `load_emoji_data` | Load emoji data from an emoji-test.txt file. |
Classes¶
Emoji dataclass ¶
Emoji(
emoji: str,
status: str,
since_version: str,
short_name: str,
group: str | None = None,
subgroup: str | None = None,
)
Metadata for a Unicode emoji from the emoji-test.txt file.
Attributes:

| Name | Type | Description |
|---|---|---|
| `emoji` | `str` | The emoji character(s) as a string. |
| `status` | `str` | Qualification status (e.g., 'fully-qualified', 'minimally-qualified', 'unqualified', 'component'). |
| `since_version` | `str` | Unicode version when the emoji was introduced (e.g., 'E13.0'). |
| `short_name` | `str` | Short English name describing the emoji. |
| `group` | `str \| None` | Optional emoji group category from the test file. Defaults to None. |
| `subgroup` | `str \| None` | Optional emoji subgroup category from the test file. Defaults to None. |
EmojiData ¶
Parser and analyzer for Unicode emoji-test.txt files.
Loads emoji metadata from Unicode's emoji-test.txt file and provides utilities for identifying emojis in text, extracting emoji sequences, and querying emoji properties.
Attributes:

| Name | Type | Description |
|---|---|---|
| `emojis` | `Dict[str, Emoji]` | Dictionary mapping emoji characters to their Emoji metadata. |

Methods:

| Name | Description |
|---|---|
| `emoji_bio` | Create BIO tags identifying emoji character positions in text. |
| `emojis_with_cp` | Find all emojis containing a specific code point. |
| `get_emojis` | Extract all emojis from text with their metadata. |
Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
Attributes¶
echars property ¶
Get code points that standalone represent complete emojis.

Returns:

| Type | Description |
|---|---|
| `List[int]` | `List[int]`: List of code points that by themselves form valid emojis. |
ldepechars property ¶
Get code points that precede other code points in emoji sequences.
Maps code points that are not standalone emojis but can precede emoji code points in compound sequences.

Returns:

| Type | Description |
|---|---|
| `Dict[int, Set[int]]` | `Dict[int, Set[int]]`: Dictionary where keys are left-dependent code points and values are sets of emoji code points that can follow. |
rdepechars property ¶
Get code points that follow other code points in emoji sequences.
Maps code points that are not standalone emojis but can follow emoji code points in compound sequences.

Returns:

| Type | Description |
|---|---|
| `Dict[int, Set[int]]` | `Dict[int, Set[int]]`: Dictionary where keys are right-dependent code points and values are sets of emoji code points that can precede. |
Functions¶
emoji_bio ¶
Create BIO tags identifying emoji character positions in text.
Generates a string of the same length as the input where each character is tagged as 'B' (Begin - first char of emoji), 'I' (Internal - subsequent chars in emoji), or 'O' (Outer - not part of emoji).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `emoji_text` | `str` | Input text to analyze. | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `str` | `str` | BIO-tagged string of the same length as the input. |
Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
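The BIO tagging idea can be sketched in a simplified form. This stand-in takes an explicit set of single-code-point emojis instead of the full emoji-test.txt data, and it only models ZWJ (U+200D) and variation-selector (U+FE0F) continuations; the real EmojiData handles multi-code-point sequences more generally:

```python
def emoji_bio(emoji_text: str, emoji_set: set) -> str:
    """Tag each char: B = starts an emoji, I = continues one, O = other."""
    tags = []
    in_emoji = False   # previous char was part of an emoji
    joined = False     # previous char was a ZWJ, so the next emoji continues it
    for ch in emoji_text:
        if ch in emoji_set:
            tags.append("I" if joined else "B")
            in_emoji, joined = True, False
        elif ch == "\u200d" and in_emoji:   # zero width joiner glues emojis
            tags.append("I")
            joined = True
        elif ch == "\ufe0f" and in_emoji:   # variation selector 16
            tags.append("I")
            joined = False
        else:
            tags.append("O")
            in_emoji, joined = False, False
    return "".join(tags)

EMOJIS = {"\U0001F600", "\U0001F469", "\U0001F4BB"}  # grinning face, woman, laptop
print(emoji_bio("hi \U0001F600!", EMOJIS))  # OOOBO
```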
emojis_with_cp ¶
Find all emojis containing a specific code point.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `cp` | `int` | Unicode code point to search for. | *required* |

Returns:

| Type | Description |
|---|---|
| `List[Emoji]` | `List[Emoji]`: List of emoji metadata objects containing the code point. |
Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
get_emojis ¶
Extract all emojis from text with their metadata.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `text` | `str` | Arbitrary text to search for emojis. | *required* |

Returns:

| Type | Description |
|---|---|
| `List[Emoji]` | `List[Emoji]`: List of emoji metadata objects found in the text (empty if none are found). |
Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
Functions¶
build_emoji_dataclass ¶
Parse an emoji-test.txt file line into an Emoji dataclass.
Parses lines matching the emoji-test.txt format to extract emoji metadata. Lines not matching the expected format are ignored.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `emoji_test_line` | `str` | Single line from an emoji-test.txt file. | *required* |

Returns:

| Type | Description |
|---|---|
| `Emoji \| None` | `Emoji \| None`: Parsed emoji metadata, or None if the line doesn't match the expected format. |
Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
get_emoji_seq ¶
Convert emoji string to sequence of code points.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `emoji_str` | `str` | Emoji string to convert. | *required* |
| `as_hex` | `bool` | If True, returns hex strings; if False, returns integers. Defaults to False. | `False` |

Returns:

| Type | Description |
|---|---|
| `List[Union[int, str]]` | `List[Union[int, str]]`: List of code points as integers or hex strings. |
Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
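The conversion is essentially `ord()` over each character. A minimal sketch; the uppercase, unprefixed hex style (matching emoji-test.txt's notation) is an assumption about the real output format:

```python
from typing import List, Union

def get_emoji_seq(emoji_str: str, as_hex: bool = False) -> List[Union[int, str]]:
    """Return each code point in the string, as ints or uppercase hex strings."""
    if as_hex:
        return [f"{ord(ch):X}" for ch in emoji_str]
    return [ord(ch) for ch in emoji_str]

print(get_emoji_seq("\U0001F600"))               # [128512]
print(get_emoji_seq("\U0001F600", as_hex=True))  # ['1F600']
```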
load_emoji_data ¶
Load emoji data from emoji-test.txt file.
Attempts to load from the EMOJI_TEST_DATA environment variable if set, otherwise uses the default latest emoji data file.
Returns:

| Type | Description |
|---|---|
| `EmojiData \| None` | `EmojiData \| None`: Loaded emoji data, or None if the data file doesn't exist. |
Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
json_extractor¶
Classes and Functions¶
dataknobs_utils.json_extractor ¶
Extract and repair JSON objects from text strings.
Provides the JSONExtractor class for finding, extracting, and repairing JSON objects embedded in text, with categorization of complete vs fixed objects.
Classes:

| Name | Description |
|---|---|
| `JSONExtractor` | Extract and repair JSON objects from text strings. |
Classes¶
JSONExtractor ¶
Extract and repair JSON objects from text strings.
Provides functionality to extract well-formed JSON objects, attempt to repair malformed JSON, and separate non-JSON text. Extracted objects are categorized as complete (well-formed) or fixed (repaired from malformed state).
Attributes:

| Name | Type | Description |
|---|---|---|
| `complete_jsons` | `List[Dict[str, Any]]` | List of well-formed JSON objects extracted from text. |
| `fixed_jsons` | `List[Dict[str, Any]]` | List of JSON objects that were malformed but successfully repaired. |
| `non_json_text` | `str` | Text content that doesn't contain JSON objects. |

Methods:

| Name | Description |
|---|---|
| `extract_jsons` | Extract all JSON objects from text, repairing malformed JSON when possible. |
| `get_complete_jsons` | Get all well-formed JSON objects extracted from text. |
| `get_fixed_jsons` | Get all JSON objects that were repaired from a malformed state. |
| `get_non_json_text` | Get text content after removing all extracted JSON objects. |
| `get_value` | Get a value at a specific path from the first matching JSON object. |
| `get_values` | Get values at a specific path from all complete JSON objects. |
Source code in packages/utils/src/dataknobs_utils/json_extractor.py
Functions¶
extract_jsons ¶
Extract all JSON objects from text, repairing malformed JSON when possible.
Searches for JSON objects in the text, attempts to parse them, and repairs malformed JSON by closing unclosed brackets, quotes, and fixing trailing commas. Updates instance attributes with categorized results and remaining non-JSON text.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `text` | `str` | Text string potentially containing JSON objects. | *required* |

Returns:

| Type | Description |
|---|---|
| `List[Dict[str, Any]]` | `List[Dict[str, Any]]`: All successfully extracted and parsed JSON objects (both complete and fixed). |
Source code in packages/utils/src/dataknobs_utils/json_extractor.py
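The extract-and-repair idea can be sketched as follows: scan for brace-balanced spans, parse the complete ones, and repair an unbalanced trailing object by appending the missing closing braces. This simplified stand-in ignores braces inside string values and only repairs unclosed braces (the real class also fixes quotes and trailing commas):

```python
import json
from typing import Any, Dict, List, Tuple

def extract_jsons(text: str) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Return (complete, fixed): cleanly parsed objects and brace-repaired ones."""
    complete: List[Dict[str, Any]] = []
    fixed: List[Dict[str, Any]] = []
    i = 0
    while True:
        start = text.find("{", i)
        if start < 0:
            break
        depth, end = 0, None
        for j in range(start, len(text)):
            if text[j] == "{":
                depth += 1
            elif text[j] == "}":
                depth -= 1
                if depth == 0:
                    end = j
                    break
        if end is not None:
            try:
                complete.append(json.loads(text[start:end + 1]))
            except json.JSONDecodeError:
                pass
            i = end + 1
        else:
            # Unbalanced braces: repair by appending the missing closers
            try:
                fixed.append(json.loads(text[start:] + "}" * depth))
            except json.JSONDecodeError:
                pass
            break
    return complete, fixed

complete, fixed = extract_jsons('note {"a": 1} and {"b": {"c": 2')
```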
get_complete_jsons ¶
Get all well-formed JSON objects extracted from text.
Returns:

| Type | Description |
|---|---|
| `List[Dict[str, Any]]` | `List[Dict[str, Any]]`: List of JSON objects that were successfully parsed without requiring repairs. |
Source code in packages/utils/src/dataknobs_utils/json_extractor.py
get_fixed_jsons ¶
Get all JSON objects that were repaired from malformed state.
Returns:

| Type | Description |
|---|---|
| `List[Dict[str, Any]]` | `List[Dict[str, Any]]`: List of JSON objects that required repair (closing brackets, quotes, etc.) before successful parsing. |
Source code in packages/utils/src/dataknobs_utils/json_extractor.py
get_non_json_text ¶
Get text content after removing all extracted JSON objects.
Returns:

| Name | Type | Description |
|---|---|---|
| `str` | `str` | Remaining text with all JSON objects (both complete and fixed) removed. |
Source code in packages/utils/src/dataknobs_utils/json_extractor.py
get_value ¶
Get a value at a specific path from the first matching JSON object.
Uses dot notation to navigate nested JSON structures and retrieves the value from the first complete JSON object that contains the specified path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `key_path` | `str` | Dot-separated path to the value (e.g., 's3.bucket' for nested structures). | *required* |
| `default` | `Any \| None` | Value to return if the path doesn't exist in any object. Defaults to None. | `None` |

Returns:

| Name | Type | Description |
|---|---|---|
| `Any` | `Any` | The value found at the path, or the default value if not found. |
Source code in packages/utils/src/dataknobs_utils/json_extractor.py
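The dot-path navigation can be sketched as a plain dictionary walk. A minimal illustration of the documented semantics (a free function here, whereas the real method reads from the extractor's stored objects):

```python
from typing import Any, Dict, Optional

def get_value(obj: Dict[str, Any], key_path: str, default: Optional[Any] = None) -> Any:
    """Walk a nested dict along a dot-separated key path."""
    current: Any = obj
    for key in key_path.split("."):
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            return default  # path broken: fall back to the default
    return current

config = {"s3": {"bucket": "my-data", "region": "us-east-1"}}
print(get_value(config, "s3.bucket"))        # my-data
print(get_value(config, "s3.missing", "?"))  # ?
```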
get_values ¶
Get values at a specific path from all complete JSON objects.
Uses dot notation to navigate nested JSON structures and retrieves values from all complete JSON objects that contain the specified path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `key_path` | `str` | Dot-separated path to the value (e.g., 's3.bucket' for nested structures). | *required* |

Returns:

| Type | Description |
|---|---|
| `List[Any]` | `List[Any]`: List of values found at the path (empty if none are found). |
Source code in packages/utils/src/dataknobs_utils/json_extractor.py
Functions¶
Usage Examples¶
File Processing Example¶
from dataknobs_utils import file_utils

# Generate all Python files in a directory
for filepath in file_utils.filepath_generator("/path/to/project"):
    if filepath.endswith(".py"):
        # Process each line
        for line in file_utils.fileline_generator(filepath):
            print(f"{filepath}: {line}")

# Write processed results
processed_lines = ["result 1", "result 2", "result 3"]
file_utils.write_lines("output.txt", processed_lines)
Elasticsearch Example¶
from dataknobs_utils import elasticsearch_utils

# Build a search query
query = elasticsearch_utils.build_field_query_dict(
    ["title", "content"],
    "machine learning"
)

# Create index configuration
table_settings = elasticsearch_utils.TableSettings(
    "documents",
    {"number_of_shards": 1},
    {
        "properties": {
            "title": {"type": "text"},
            "content": {"type": "text"}
        }
    }
)

# Create and use index
index = elasticsearch_utils.ElasticsearchIndex(None, [table_settings])
results = index.search(query)
LLM Utils Example¶
from dataknobs_utils import llm_utils

# Create prompt message
message = llm_utils.PromptMessage(
    "user",
    "Analyze this data and provide insights",
    metadata={"priority": "high", "model": "gpt-4"}
)

# Access nested configuration
config = {
    "models": {
        "gpt4": {
            "temperature": 0.7,
            "max_tokens": 1000
        }
    }
}
temperature = llm_utils.get_value_by_key(
    config, "models.gpt4.temperature", 0.5
)
print(f"Temperature: {temperature}")  # 0.7
Integration Example¶
from dataknobs_utils import (
    file_utils, json_utils, elasticsearch_utils, llm_utils
)
import json

# Complete data processing pipeline
def process_documents(input_dir: str, es_index: str):
    """Process JSON documents and index in Elasticsearch."""
    # Step 1: Collect documents
    documents = []
    for filepath in file_utils.filepath_generator(input_dir):
        if filepath.endswith('.json'):
            for line in file_utils.fileline_generator(filepath):
                try:
                    doc = json.loads(line)
                    documents.append(doc)
                except json.JSONDecodeError:
                    continue

    # Step 2: Process with LLM utils for configuration
    config = json_utils.load_json_file("config.json")
    es_config = llm_utils.get_value_by_key(
        config, "elasticsearch.settings", {}
    )

    # Step 3: Index in Elasticsearch
    table_settings = elasticsearch_utils.TableSettings(
        es_index,
        es_config,
        {
            "properties": {
                "content": {"type": "text"},
                "timestamp": {"type": "date"}
            }
        }
    )
    index = elasticsearch_utils.ElasticsearchIndex(None, [table_settings])

    # Create batch file
    with open("batch_data.jsonl", "w") as f:
        elasticsearch_utils.add_batch_data(
            f, iter(documents), es_index
        )

    return len(documents)

# Usage
processed_count = process_documents("/data/input", "processed_docs")
print(f"Processed {processed_count} documents")
Error Handling¶
All functions include appropriate error handling. Here are common patterns:
from dataknobs_utils import file_utils, elasticsearch_utils

try:
    # File operations
    lines = list(file_utils.fileline_generator("data.txt"))
except FileNotFoundError:
    print("File not found")
except IOError as e:
    print(f"IO error: {e}")

try:
    # Elasticsearch operations
    index = elasticsearch_utils.ElasticsearchIndex(None, [])
    if not index.is_up():
        raise ConnectionError("Elasticsearch not available")
except ConnectionError:
    print("Cannot connect to Elasticsearch")
Testing¶
Example test patterns for dataknobs_utils:
import pytest
import tempfile
import os
from dataknobs_utils import file_utils, llm_utils

def test_file_operations():
    """Test file utility functions."""
    with tempfile.TemporaryDirectory() as temp_dir:
        # Test file writing and reading
        test_file = os.path.join(temp_dir, "test.txt")
        test_lines = ["line1", "line2", "line3"]

        file_utils.write_lines(test_file, test_lines)
        read_lines = list(file_utils.fileline_generator(test_file))
        assert read_lines == sorted(test_lines)  # write_lines sorts

def test_llm_utils():
    """Test LLM utility functions."""
    # Test nested dictionary access
    data = {"a": {"b": {"c": "value"}}}
    result = llm_utils.get_value_by_key(data, "a.b.c")
    assert result == "value"

    result = llm_utils.get_value_by_key(data, "x.y.z", "default")
    assert result == "default"

    # Test PromptMessage
    msg = llm_utils.PromptMessage("user", "test", {"key": "value"})
    assert msg.role == "user"
    assert msg.content == "test"
    assert msg.metadata["key"] == "value"
Performance Notes¶
- File Utils: Uses generators for memory-efficient processing of large files
- Elasticsearch Utils: Supports batch operations for better performance
- JSON Utils: Optimized for streaming JSON processing
- Pandas Utils: Efficient DataFrame operations with proper data types
Dependencies¶
Core dependencies for dataknobs_utils:
pandas>=1.3.0
numpy>=1.20.0
requests>=2.25.0
psycopg2-binary>=2.8.6 # for SQL utils
elasticsearch>=7.0.0 # optional, for elasticsearch_utils
Contributing¶
For contributing to dataknobs_utils:
- Fork the repository
- Create feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit pull request
See Contributing Guide for detailed information.
Changelog¶
Version 1.0.0¶
- Initial release
- Core utility modules
- Elasticsearch integration
- File processing utilities
- LLM prompt management
- JSON processing tools
License¶
See License for license information.