dataknobs-utils API Reference

Complete API documentation for the dataknobs_utils package.

💡 Quick Links:

  • Complete API Documentation - Full auto-generated reference
  • Source Code - Browse on GitHub
  • Package Guide - Detailed documentation

Package Information

  • Package Name: dataknobs_utils
  • Version: 1.0.0
  • Description: Utility functions for dataknobs packages
  • Python Requirements: >=3.8

Installation

pip install dataknobs-utils

Import Statement

from dataknobs_utils import (
    elasticsearch_utils,
    emoji_utils,
    file_utils,
    json_extractor,
    json_utils,
    llm_utils,
    pandas_utils,
    requests_utils,
    resource_utils,
    sql_utils,
    stats_utils,
    subprocess_utils,
    sys_utils,
    xml_utils,
)

Module Documentation

elasticsearch_utils

Classes

TableSettings

dataknobs_utils.elasticsearch_utils.TableSettings

TableSettings(
    table_name: str, data_settings: Dict[str, Any], data_mapping: Dict[str, Any]
)

Configuration container for an Elasticsearch index.

Attributes:

  • name: Index name.
  • settings: Index settings (shards, replicas, analyzers, etc.).
  • mapping: Field mappings and types.

Initialize table settings.

Parameters:

  • table_name (str, required): Name of the Elasticsearch index.
  • data_settings (Dict[str, Any], required): Index settings dictionary.
  • data_mapping (Dict[str, Any], required): Field mappings dictionary.
Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def __init__(
    self,
    table_name: str,
    data_settings: Dict[str, Any],
    data_mapping: Dict[str, Any],
) -> None:
    """Initialize table settings.

    Args:
        table_name: Name of the Elasticsearch index.
        data_settings: Index settings dictionary.
        data_mapping: Field mappings dictionary.
    """
    self.name = table_name
    self.settings = data_settings
    self.mapping = data_mapping
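
For illustration, a TableSettings can be built from plain dictionaries. The index name, settings, and mapping below are hypothetical; the stand-in class mirrors the constructor shown above so the sketch runs standalone (in practice, import TableSettings from dataknobs_utils.elasticsearch_utils):

```python
from typing import Any, Dict

# Stand-in mirroring the TableSettings constructor shown above, so this
# sketch runs without the package installed.
class TableSettings:
    def __init__(self, table_name: str, data_settings: Dict[str, Any],
                 data_mapping: Dict[str, Any]) -> None:
        self.name = table_name
        self.settings = data_settings
        self.mapping = data_mapping

# Hypothetical index configuration.
settings = {"number_of_shards": 1, "number_of_replicas": 0}
mapping = {"properties": {"id": {"type": "integer"}, "title": {"type": "text"}}}

ts = TableSettings("articles", settings, mapping)
print(ts.name)  # articles
```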

ElasticsearchIndex

dataknobs_utils.elasticsearch_utils.ElasticsearchIndex

ElasticsearchIndex(
    request_helper: Any | None,
    table_settings: List[TableSettings],
    elasticsearch_ip: str | None = None,
    elasticsearch_port: int = 9200,
    mock_requests: Any | None = None,
)

Wrapper for managing Elasticsearch indices with settings and mappings.

Handles index creation, initialization, and querying across multiple indices with predefined settings and mappings.

Attributes:

  • request_helper (Any): RequestHelper for making HTTP requests.
  • tables: List of TableSettings for managed indices.

Initialize Elasticsearch index manager.

Parameters:

  • request_helper (Any | None, required): Pre-configured RequestHelper. If None, one is created using elasticsearch_ip and elasticsearch_port.
  • table_settings (List[TableSettings], required): List of TableSettings for indices to manage.
  • elasticsearch_ip (str | None, default None): Elasticsearch host IP, used when request_helper is None; None falls back to "localhost".
  • elasticsearch_port (int, default 9200): Elasticsearch port.
  • mock_requests (Any | None, default None): Mock requests object for testing.

Methods:

  • analyze: Analyze text using a specified analyzer.
  • delete_table: Delete an index.
  • get_cluster_health: Get Elasticsearch cluster health information.
  • inspect_indices: List all indices with their statistics.
  • is_up: Check if the Elasticsearch server is reachable.
  • purge: Delete and recreate all managed indices.
  • search: Execute an Elasticsearch search DSL query.
  • sql: Execute an Elasticsearch SQL query.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def __init__(
    self,
    request_helper: Any | None,
    table_settings: List[TableSettings],
    elasticsearch_ip: str | None = None,
    elasticsearch_port: int = 9200,
    mock_requests: Any | None = None,
) -> None:
    """Initialize Elasticsearch index manager.

    Args:
        request_helper: Pre-configured RequestHelper. If None, creates one
            using elasticsearch_ip and elasticsearch_port. Defaults to None.
        table_settings: List of TableSettings for indices to manage.
        elasticsearch_ip: Elasticsearch host IP. Used if request_helper is None.
            Defaults to None (uses "localhost").
        elasticsearch_port: Elasticsearch port. Defaults to 9200.
        mock_requests: Mock requests object for testing. Defaults to None.
    """
    self.request_helper: Any  # Always set, never None
    if request_helper is None:
        # Use localhost as default if no IP is provided
        self.request_helper = requests_utils.RequestHelper(
            elasticsearch_ip or "localhost",
            elasticsearch_port,
            mock_requests=mock_requests,
        )
    else:
        self.request_helper = request_helper
    self.tables = table_settings or []
    self._init_tables()

Functions

analyze

analyze(text: str, analyzer: str, verbose: bool = False) -> Any

Analyze text using a specified analyzer.

Parameters:

  • text (str, required): Text to analyze.
  • analyzer (str, required): Name of the analyzer to use.
  • verbose (bool, default False): If True, prints response.

Returns:

  • ServerResponse (Any): Response with analysis results (tokens, etc.).

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def analyze(
    self,
    text: str,
    analyzer: str,
    verbose: bool = False,
) -> Any:
    """Analyze text using a specified analyzer.

    Args:
        text: Text to analyze.
        analyzer: Name of the analyzer to use.
        verbose: If True, prints response. Defaults to False.

    Returns:
        ServerResponse: Response with analysis results (tokens, etc.).
    """
    return self._request(
        "post",
        "_analyze",
        payload=json.dumps(
            {
                "analyzer": analyzer,
                "text": text,
            }
        ),
        verbose=verbose,
    )

delete_table

delete_table(table_name: str, verbose: bool = False) -> Any

Delete an index.

Parameters:

  • table_name (str, required): Name of the index to delete.
  • verbose (bool, default False): If True, prints response.

Returns:

  • ServerResponse (Any): Delete operation response.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def delete_table(self, table_name: str, verbose: bool = False) -> Any:
    """Delete an index.

    Args:
        table_name: Name of the index to delete.
        verbose: If True, prints response. Defaults to False.

    Returns:
        ServerResponse: Delete operation response.
    """
    return self._request("delete", table_name, verbose=verbose)

get_cluster_health

get_cluster_health(verbose: bool = False) -> Any

Get Elasticsearch cluster health information.

Parameters:

  • verbose (bool, default False): If True, prints response.

Returns:

  • ServerResponse (Any): Response with cluster health data.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def get_cluster_health(self, verbose: bool = False) -> Any:
    """Get Elasticsearch cluster health information.

    Args:
        verbose: If True, prints response. Defaults to False.

    Returns:
        ServerResponse: Response with cluster health data.
    """
    return self._request("get", "_cluster/health", verbose=verbose)

inspect_indices

inspect_indices(verbose: bool = False) -> Any

List all indices with their statistics.

Parameters:

  • verbose (bool, default False): If True, prints response.

Returns:

  • ServerResponse (Any): Response with indices information.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def inspect_indices(self, verbose: bool = False) -> Any:
    """List all indices with their statistics.

    Args:
        verbose: If True, prints response. Defaults to False.

    Returns:
        ServerResponse: Response with indices information.
    """
    return self._request(
        "get",
        "_cat/indices?v&pretty",
        verbose=verbose,
        response_handler=requests_utils.plain_api_response_handler,
    )

is_up

is_up() -> bool

Check if the Elasticsearch server is reachable.

Returns:

  • bool: True if the server responds to a cluster health check.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def is_up(self) -> bool:
    """Check if the Elasticsearch server is reachable.

    Returns:
        bool: True if server responds to cluster health check.
    """
    resp = self._request("get", "_cluster/health")
    return bool(resp.succeeded if resp else False)

purge

purge(verbose: bool = False) -> Any

Delete and recreate all managed indices.

Removes all data by deleting indices, then recreates them with original settings and mappings.

Parameters:

  • verbose (bool, default False): If True, prints responses.

Returns:

  • ServerResponse (Any): Response from the last delete operation.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def purge(self, verbose: bool = False) -> Any:
    """Delete and recreate all managed indices.

    Removes all data by deleting indices, then recreates them with
    original settings and mappings.

    Args:
        verbose: If True, prints responses. Defaults to False.

    Returns:
        ServerResponse: Response from the last delete operation.
    """
    resp = None
    for table in self.tables:
        resp = self.delete_table(table.name, verbose=verbose)
    self._init_tables()
    return resp

search

search(
    query: Dict[str, Any], table: str | None = None, verbose: bool = False
) -> Any | None

Execute an Elasticsearch search DSL query.

Parameters:

  • query (Dict[str, Any], required): Elasticsearch query dictionary, e.g. {"query": {"match": {field: {"query": text, "operator": "AND"}}}}.
  • table (str | None, default None): Index name to search. If None, uses the first managed index.
  • verbose (bool, default False): If True, prints response.

Returns:

  • ServerResponse | None: Response with results. If successful, resp.extra contains 'hits_df' and/or 'aggs_df' DataFrames. Returns None if no table is available.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def search(
    self,
    query: Dict[str, Any],
    table: str | None = None,
    verbose: bool = False,
) -> Any | None:
    """Execute an Elasticsearch search DSL query.

    Args:
        query: Elasticsearch query dictionary, e.g.:
            {"query": {"match": {field: {"query": text, "operator": "AND"}}}}
        table: Index name to search. If None, uses first managed index.
            Defaults to None.
        verbose: If True, prints response. Defaults to False.

    Returns:
        ServerResponse | None: Response with results. If successful,
            resp.extra contains 'hits_df' and/or 'aggs_df' DataFrames.
            Returns None if no table is available.
    """
    if table is None:
        if len(self.tables) > 0:
            table = self.tables[0].name
        else:
            return None
    resp = self._request("post", f"{table}/_search", json.dumps(query), verbose=verbose)
    if resp.succeeded:
        d = decode_results(resp.result)
        for k, df in d.items():
            resp.add_extra(k, df)
    return resp
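
The query argument is a plain search-DSL dictionary, serialized with json.dumps before being posted to <index>/_search. A minimal sketch (the field name and text are illustrative):

```python
import json

# A match query of the shape search() expects; "title" and the query
# text are illustrative.
query = {
    "query": {
        "match": {
            "title": {"query": "quick fox", "operator": "AND"}
        }
    }
}
payload = json.dumps(query)  # what search() posts to <index>/_search
print(payload)
```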

sql

sql(
    query: str,
    fetch_size: int = 10000,
    columnar: bool = True,
    verbose: bool = False,
) -> Any

Execute an Elasticsearch SQL query.

Automatically handles pagination using cursors to retrieve all results.

Parameters:

  • query (str, required): SQL query string.
  • fetch_size (int, default 10000): Maximum records per fetch.
  • columnar (bool, default True): If True, uses the compact columnar format (better for few columns).
  • verbose (bool, default False): If True, prints responses.

Returns:

  • ServerResponse (Any): Response with results. If successful, resp.extra['df'] contains a DataFrame with all query results.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def sql(
    self,
    query: str,
    fetch_size: int = 10000,
    columnar: bool = True,
    verbose: bool = False,
) -> Any:
    """Execute an Elasticsearch SQL query.

    Automatically handles pagination using cursors to retrieve all results.

    Args:
        query: SQL query string.
        fetch_size: Maximum records per fetch. Defaults to 10000.
        columnar: If True, uses compact columnar format (better for few columns).
            Defaults to True.
        verbose: If True, prints responses. Defaults to False.

    Returns:
        ServerResponse: Response with results. If successful, resp.extra['df']
            contains a DataFrame with all query results.
    """
    df = None
    payload = json.dumps(
        {
            "query": query,
            "fetch_size": fetch_size,
            "columnar": columnar,
        }
    )
    resp = self._request(
        "post",
        "_sql?format=json",
        payload=payload,
        verbose=verbose,
    )
    rcols = resp.result.get("columns", None)
    while resp.succeeded:
        cols = [x["name"] for x in rcols]
        rdf = None
        if "values" in resp.result:  # columnar==True
            rdf = pd.DataFrame(resp.result["values"]).T
            rdf.columns = cols
        elif "rows" in resp.result:  # columnar==False
            rdf = pd.DataFrame(resp.result["rows"], columns=cols)
        if rdf is not None:
            if df is not None:
                df = pd.concat([df, rdf])
            else:
                df = rdf
        rjson = resp.result
        if "cursor" in rjson:
            resp = self._request(
                "post",
                "_sql?format=json",
                json.dumps(
                    {
                        "cursor": rjson["cursor"],
                        "columnar": columnar,
                    }
                ),
                verbose=verbose,
            )
        else:
            break
    if df is not None:
        resp.add_extra("df", df)
    return resp
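
With columnar=True, the SQL API returns one list per column, which sql() transposes into DataFrame rows. The transposition can be sketched with the stdlib alone (the response contents below are illustrative):

```python
# Illustrative columnar SQL response (shape per the Elasticsearch SQL API
# with columnar=true).
response = {
    "columns": [{"name": "name", "type": "text"}, {"name": "age", "type": "long"}],
    "values": [["alice", "bob"], [30, 25]],  # one list per column
}

cols = [c["name"] for c in response["columns"]]
rows = list(zip(*response["values"]))  # transpose columns into row tuples
print(cols)  # ['name', 'age']
print(rows)  # [('alice', 30), ('bob', 25)]
```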

Functions

build_field_query_dict

dataknobs_utils.elasticsearch_utils.build_field_query_dict

build_field_query_dict(
    fields: Union[str, List[str]], text: str, operator: str | None = None
) -> Dict[str, Any]

Build an Elasticsearch field query to search for text.

Creates either a match query (single field) or multi_match query (multiple fields).

Parameters:

  • fields (Union[str, List[str]], required): Field name (str) or list of field names to query.
  • text (str, required): Text to search for.
  • operator (str | None, default None): Search operator (e.g., "AND", "OR"). Uses the Elasticsearch default if None.

Returns:

  • Dict[str, Any]: Elasticsearch query dictionary.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def build_field_query_dict(
    fields: Union[str, List[str]], text: str, operator: str | None = None
) -> Dict[str, Any]:
    """Build an Elasticsearch field query to search for text.

    Creates either a match query (single field) or multi_match query (multiple fields).

    Args:
        fields: Field name (str) or list of field names to query.
        text: Text to search for.
        operator: Search operator (e.g., "AND", "OR"). Uses Elasticsearch
            default if None. Defaults to None.

    Returns:
        Dict[str, Any]: Elasticsearch query dictionary.
    """
    rv: Dict[str, Any]
    if isinstance(fields, str) or len(fields) == 1:  # single match
        if not isinstance(fields, str):
            fields = fields[0]
        field_dict = {"query": text}
        if operator:
            field_dict["operator"] = operator
        rv = {
            "query": {
                "match": {
                    fields: field_dict,
                }
            }
        }
    else:  # multi-match
        rv = {
            "query": {
                "multi_match": {
                    "query": text,
                    "fields": fields,
                }
            }
        }
    return rv
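
Copying the function from the source above, a quick demonstration of the two query shapes it produces (field names and text are illustrative):

```python
from __future__ import annotations

from typing import Any, Dict, List, Union

# build_field_query_dict, copied from the source above so the demo
# runs standalone.
def build_field_query_dict(
    fields: Union[str, List[str]], text: str, operator: str | None = None
) -> Dict[str, Any]:
    rv: Dict[str, Any]
    if isinstance(fields, str) or len(fields) == 1:  # single match
        if not isinstance(fields, str):
            fields = fields[0]
        field_dict = {"query": text}
        if operator:
            field_dict["operator"] = operator
        rv = {"query": {"match": {fields: field_dict}}}
    else:  # multi-match
        rv = {"query": {"multi_match": {"query": text, "fields": fields}}}
    return rv

# Single field -> match query.
print(build_field_query_dict("title", "quick fox", operator="AND"))
# {'query': {'match': {'title': {'query': 'quick fox', 'operator': 'AND'}}}}

# Multiple fields -> multi_match query.
print(build_field_query_dict(["title", "body"], "quick fox"))
# {'query': {'multi_match': {'query': 'quick fox', 'fields': ['title', 'body']}}}
```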

build_phrase_query_dict

dataknobs_utils.elasticsearch_utils.build_phrase_query_dict

build_phrase_query_dict(
    field: str, phrase: str, slop: int = 0
) -> Dict[str, Any]

Build an Elasticsearch phrase query with slop tolerance.

Parameters:

  • field (str, required): Field name to query.
  • phrase (str, required): Exact phrase to search for.
  • slop (int, default 0): Maximum number of positions between terms; 0 requires an exact match.

Returns:

  • Dict[str, Any]: Elasticsearch match_phrase query dictionary.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def build_phrase_query_dict(field: str, phrase: str, slop: int = 0) -> Dict[str, Any]:
    """Build an Elasticsearch phrase query with slop tolerance.

    Args:
        field: Field name to query.
        phrase: Exact phrase to search for.
        slop: Maximum number of positions between terms. Defaults to 0 (exact match).

    Returns:
        Dict[str, Any]: Elasticsearch match_phrase query dictionary.
    """
    return {
        "query": {
            "match_phrase": {
                field: {
                    "query": phrase,
                    "slop": slop,
                }
            }
        }
    }
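
Condensed from the source above, the function produces a match_phrase body (field and phrase are illustrative):

```python
from typing import Any, Dict

# build_phrase_query_dict, condensed from the source above.
def build_phrase_query_dict(field: str, phrase: str, slop: int = 0) -> Dict[str, Any]:
    return {"query": {"match_phrase": {field: {"query": phrase, "slop": slop}}}}

q = build_phrase_query_dict("body", "quick brown fox", slop=1)
print(q)
# {'query': {'match_phrase': {'body': {'query': 'quick brown fox', 'slop': 1}}}}
```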

build_hits_dataframe

dataknobs_utils.elasticsearch_utils.build_hits_dataframe

build_hits_dataframe(query_result: Dict[str, Any]) -> pd.DataFrame | None

Extract search hits from Elasticsearch query results as DataFrame.

Parameters:

  • query_result (Dict[str, Any], required): Elasticsearch query response dictionary.

Returns:

  • pd.DataFrame | None: DataFrame with _source fields from hits, or None if no hits found.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def build_hits_dataframe(query_result: Dict[str, Any]) -> pd.DataFrame | None:
    """Extract search hits from Elasticsearch query results as DataFrame.

    Args:
        query_result: Elasticsearch query response dictionary.

    Returns:
        pd.DataFrame | None: DataFrame with _source fields from hits, or None
            if no hits found.
    """
    df = None
    if "hits" in query_result:
        qr_hits = query_result["hits"]
        if "hits" in qr_hits:
            hits = qr_hits["hits"]
            dicts = [[hit["_source"]] for hit in hits]
            df = pd_utils.dicts2df(dicts, item_id=None)
    return df
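
The hits extraction can be sketched without pandas; this shows the response shape the function expects and the _source values it pulls out as rows (contents are illustrative):

```python
# Illustrative search response of the shape build_hits_dataframe expects.
query_result = {
    "hits": {
        "hits": [
            {"_id": "1", "_source": {"title": "a"}},
            {"_id": "2", "_source": {"title": "b"}},
        ]
    }
}

# The _source dicts that end up as DataFrame rows.
sources = [hit["_source"] for hit in query_result["hits"]["hits"]]
print(sources)  # [{'title': 'a'}, {'title': 'b'}]
```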

build_aggs_dataframe

dataknobs_utils.elasticsearch_utils.build_aggs_dataframe

build_aggs_dataframe(query_result: Dict[str, Any]) -> pd.DataFrame | None

Extract aggregations from Elasticsearch query results as DataFrame.

Parameters:

  • query_result (Dict[str, Any], required): Elasticsearch query response dictionary.

Returns:

  • pd.DataFrame | None: DataFrame with aggregation results (not yet implemented).

Note: This function is a placeholder and currently returns None.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def build_aggs_dataframe(query_result: Dict[str, Any]) -> pd.DataFrame | None:
    """Extract aggregations from Elasticsearch query results as DataFrame.

    Args:
        query_result: Elasticsearch query response dictionary.

    Returns:
        pd.DataFrame | None: DataFrame with aggregation results (not yet implemented).

    Note:
        This function is a placeholder and currently returns None.
    """
    # TODO: implement this
    return None

decode_results

dataknobs_utils.elasticsearch_utils.decode_results

decode_results(query_result: Dict[str, Any]) -> Dict[str, pd.DataFrame]

Decode Elasticsearch query results into DataFrames.

Parameters:

  • query_result (Dict[str, Any], required): Elasticsearch query response dictionary.

Returns:

  • Dict[str, pd.DataFrame]: Dictionary with "hits_df" and/or "aggs_df" keys containing result DataFrames (only present if data exists).

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def decode_results(query_result: Dict[str, Any]) -> Dict[str, pd.DataFrame]:
    """Decode Elasticsearch query results into DataFrames.

    Args:
        query_result: Elasticsearch query response dictionary.

    Returns:
        Dict[str, pd.DataFrame]: Dictionary with "hits_df" and/or "aggs_df" keys
            containing result DataFrames (only present if data exists).
    """
    result = {}
    hits_df = build_hits_dataframe(query_result)
    if hits_df is not None:
        result["hits_df"] = hits_df
    aggs_df = build_aggs_dataframe(query_result)
    if aggs_df is not None:
        result["aggs_df"] = aggs_df
    return result

add_batch_data

dataknobs_utils.elasticsearch_utils.add_batch_data

add_batch_data(
    batchfile: TextIO,
    record_generator: Any,
    idx_name: str,
    source_id_fieldname: str = "id",
    cur_id: int = 1,
) -> int

Write records to Elasticsearch bulk load batch file.

Generates newline-delimited JSON (NDJSON) format for Elasticsearch bulk API, with alternating action/source lines for each record.

Parameters:

  • batchfile (TextIO, required): File handle for writing batch data.
  • record_generator (Any, required): Generator yielding source record dictionaries to index.
  • idx_name (str, required): Elasticsearch index name for these records.
  • source_id_fieldname (str, default "id"): If non-empty, adds/updates this field in source records with the document ID.
  • cur_id (int, default 1): Starting document ID.

Returns:

  • int: Next available document ID after processing all records.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def add_batch_data(
    batchfile: TextIO,
    record_generator: Any,
    idx_name: str,
    source_id_fieldname: str = "id",
    cur_id: int = 1,
) -> int:
    """Write records to Elasticsearch bulk load batch file.

    Generates newline-delimited JSON (NDJSON) format for Elasticsearch bulk API,
    with alternating action/source lines for each record.

    Args:
        batchfile: File handle for writing batch data.
        record_generator: Generator yielding source record dictionaries to index.
        idx_name: Elasticsearch index name for these records.
        source_id_fieldname: If non-empty, adds/updates this field in source
            records with the document ID. Defaults to "id".
        cur_id: Starting document ID. Defaults to 1.

    Returns:
        int: Next available document ID after processing all records.
    """
    for record in record_generator:
        if source_id_fieldname:
            record[source_id_fieldname] = cur_id
        action = {
            "index": {
                "_index": idx_name,
                "_id": cur_id,
            }
        }
        print(json.dumps(action), file=batchfile)
        print(json.dumps(record), file=batchfile, flush=True)
        cur_id += 1
    return cur_id
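
Since the function only writes to a file handle, it can be demonstrated in memory with io.StringIO. The function is copied from the source above; the records are illustrative:

```python
import io
import json

# add_batch_data, copied from the source above (type hints trimmed) so
# the demo runs standalone.
def add_batch_data(batchfile, record_generator, idx_name,
                   source_id_fieldname="id", cur_id=1):
    for record in record_generator:
        if source_id_fieldname:
            record[source_id_fieldname] = cur_id
        action = {"index": {"_index": idx_name, "_id": cur_id}}
        print(json.dumps(action), file=batchfile)
        print(json.dumps(record), file=batchfile, flush=True)
        cur_id += 1
    return cur_id

buf = io.StringIO()
next_id = add_batch_data(buf, iter([{"title": "a"}, {"title": "b"}]), "docs")
print(next_id)  # 3
print(buf.getvalue())
# {"index": {"_index": "docs", "_id": 1}}
# {"title": "a", "id": 1}
# {"index": {"_index": "docs", "_id": 2}}
# {"title": "b", "id": 2}
```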

batchfile_record_generator

dataknobs_utils.elasticsearch_utils.batchfile_record_generator

batchfile_record_generator(batchfile_path: str) -> Generator[Any, None, None]

Generate records from an Elasticsearch bulk load batch file.

Parses NDJSON batch files, yielding only source documents (skipping action lines).

Parameters:

  • batchfile_path (str, required): Path to the Elasticsearch batch file.

Yields:

  • Dict[str, Any]: Each source record dictionary from the batch file.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def batchfile_record_generator(batchfile_path: str) -> Generator[Any, None, None]:
    """Generate records from an Elasticsearch bulk load batch file.

    Parses NDJSON batch files, yielding only source documents (skipping action lines).

    Args:
        batchfile_path: Path to the Elasticsearch batch file.

    Yields:
        Dict[str, Any]: Each source record dictionary from the batch file.
    """
    with open(batchfile_path, encoding="utf-8") as f:
        for line in f:
            if not line.startswith('{"index":'):
                yield json.loads(line)

collect_batchfile_values

dataknobs_utils.elasticsearch_utils.collect_batchfile_values

collect_batchfile_values(
    batchfile_path: str, fieldname: str, default_value: Any = ""
) -> List[Any]

Collect all values for a specific field from batch file records.

Parameters:

  • batchfile_path (str, required): Path to the Elasticsearch batch file.
  • fieldname (str, required): Name of the field whose values to collect.
  • default_value (Any, default ""): Value to use when the field doesn't exist in a record.

Returns:

  • List[Any]: List of field values from all records.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def collect_batchfile_values(
    batchfile_path: str, fieldname: str, default_value: Any = ""
) -> List[Any]:
    """Collect all values for a specific field from batch file records.

    Args:
        batchfile_path: Path to the Elasticsearch batch file.
        fieldname: Name of the field whose values to collect.
        default_value: Value to use when field doesn't exist in a record.
            Defaults to "".

    Returns:
        List[Any]: List of field values from all records.
    """
    values = []
    for ldata in batchfile_record_generator(batchfile_path):
        values.append(ldata.get(fieldname, default_value))
    return values

collect_batchfile_records

dataknobs_utils.elasticsearch_utils.collect_batchfile_records

collect_batchfile_records(batchfile_path: str) -> pd.DataFrame

Load all batch file records into a pandas DataFrame.

Parameters:

  • batchfile_path (str, required): Path to the Elasticsearch batch file.

Returns:

  • pd.DataFrame: DataFrame containing all records from the batch file.

Source code in packages/utils/src/dataknobs_utils/elasticsearch_utils.py
def collect_batchfile_records(batchfile_path: str) -> pd.DataFrame:
    """Load all batch file records into a pandas DataFrame.

    Args:
        batchfile_path: Path to the Elasticsearch batch file.

    Returns:
        pd.DataFrame: DataFrame containing all records from the batch file.
    """
    records = []
    for record in batchfile_record_generator(batchfile_path):
        records.append(record)
    return pd.DataFrame(records)

file_utils

Functions

filepath_generator

dataknobs_utils.file_utils.filepath_generator

filepath_generator(
    rootpath: str,
    descend: bool = True,
    seen: Set[str] | None = None,
    files_only: bool = True,
) -> Generator[str, None, None]

Generate all filepaths under the root path.

Parameters:

  • rootpath (str, required): The root path under which to find files.
  • descend (bool, default True): True to descend into subdirectories.
  • seen (Set[str] | None, default None): Set of filepaths and/or directories to ignore.
  • files_only (bool, default True): True to generate only paths to files; False to include paths to directories.

Yields:

  • str: Each file path found under the root path.

Source code in packages/utils/src/dataknobs_utils/file_utils.py
def filepath_generator(
    rootpath: str,
    descend: bool = True,
    seen: Set[str] | None = None,
    files_only: bool = True,
) -> Generator[str, None, None]:
    """Generate all filepaths under the root path.

    Args:
        rootpath: The root path under which to find files.
        descend: True to descend into subdirectories. Defaults to True.
        seen: Set of filepaths and/or directories to ignore. Defaults to None.
        files_only: True to generate only paths to files; False to include
            paths to directories. Defaults to True.

    Yields:
        str: Each file path found under the root path.
    """
    if seen is None:
        seen = set()
    seen.add(rootpath)
    for root, dirs, files in os.walk(rootpath, topdown=True):
        if not descend and root != rootpath and root in seen:
            break
        for name in files:
            fpath = str(Path(root) / name)
            if fpath not in seen:
                seen.add(fpath)
                yield fpath
        if not descend or not files_only:
            for name in dirs:
                next_root = str(Path(root) / name)
                if next_root not in seen:
                    seen.add(next_root)
                    yield next_root
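
A self-contained demo on a throwaway directory tree; the function is copied from the source above (type hints trimmed) and the file names are illustrative:

```python
import os
import tempfile
from pathlib import Path

# filepath_generator, copied from the source above (type hints trimmed).
def filepath_generator(rootpath, descend=True, seen=None, files_only=True):
    if seen is None:
        seen = set()
    seen.add(rootpath)
    for root, dirs, files in os.walk(rootpath, topdown=True):
        if not descend and root != rootpath and root in seen:
            break
        for name in files:
            fpath = str(Path(root) / name)
            if fpath not in seen:
                seen.add(fpath)
                yield fpath
        if not descend or not files_only:
            for name in dirs:
                next_root = str(Path(root) / name)
                if next_root not in seen:
                    seen.add(next_root)
                    yield next_root

# Demo on a temporary directory tree (names are illustrative).
with tempfile.TemporaryDirectory() as d:
    (Path(d) / "sub").mkdir()
    (Path(d) / "a.txt").write_text("x")
    (Path(d) / "sub" / "b.txt").write_text("y")
    found = sorted(os.path.relpath(p, d) for p in filepath_generator(d))
    print(found)  # ['a.txt', 'sub/b.txt'] on POSIX
```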

fileline_generator

dataknobs_utils.file_utils.fileline_generator

fileline_generator(
    filename: str, rootdir: str | None = None
) -> Generator[str, None, None]

Generate lines from the file.

Automatically handles both plain text and gzip-compressed files based on the .gz extension. All lines are stripped of leading/trailing whitespace.

Parameters:

  • filename (str, required): The name of the file to read.
  • rootdir (str | None, default None): Optional directory path to prepend to filename.

Yields:

  • str: Each stripped line from the file.

Source code in packages/utils/src/dataknobs_utils/file_utils.py
def fileline_generator(filename: str, rootdir: str | None = None) -> Generator[str, None, None]:
    """Generate lines from the file.

    Automatically handles both plain text and gzip-compressed files based on
    the .gz extension. All lines are stripped of leading/trailing whitespace.

    Args:
        filename: The name of the file to read.
        rootdir: Optional directory path to prepend to filename. Defaults to None.

    Yields:
        str: Each stripped line from the file.
    """
    if rootdir is not None:
        filename = str(Path(rootdir) / filename)
    if filename.endswith(".gz"):
        with gzip.open(filename, mode="rt", encoding="utf-8") as f:
            for line in f:
                yield line.strip()
    else:
        with open(filename, encoding="utf-8") as f:
            for line in f:
                yield line.strip()

write_lines

dataknobs_utils.file_utils.write_lines

write_lines(outfile: str, lines: List[str], rootdir: str | None = None) -> None

Write lines to a file in sorted order.

Automatically handles both plain text and gzip-compressed files based on the .gz extension. Lines are sorted before writing.

Parameters:

  • outfile (str, required): The name of the output file.
  • lines (List[str], required): List of lines to write to the file.
  • rootdir (str | None, default None): Optional directory path to prepend to outfile.
Source code in packages/utils/src/dataknobs_utils/file_utils.py
def write_lines(outfile: str, lines: List[str], rootdir: str | None = None) -> None:
    """Write lines to a file in sorted order.

    Automatically handles both plain text and gzip-compressed files based on
    the .gz extension. Lines are sorted before writing.

    Args:
        outfile: The name of the output file.
        lines: List of lines to write to the file.
        rootdir: Optional directory path to prepend to outfile. Defaults to None.
    """
    if rootdir is not None:
        outfile = str(Path(rootdir) / outfile)
    if outfile.endswith(".gz"):
        with gzip.open(outfile, mode="wt", encoding="utf-8") as f:
            for line in sorted(lines):
                print(line, file=f)
    else:
        with open(outfile, mode="w", encoding="utf-8") as f:
            for line in sorted(lines):
                print(line, file=f)
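The sort-then-write behavior of `write_lines` can be sketched with the stdlib alone (the `write_sorted` name here is illustrative, not the package API):

```python
import gzip
import tempfile
from pathlib import Path

def write_sorted(outfile, lines, rootdir=None):
    # Mirrors write_lines above: sort, then dispatch on the ".gz" suffix.
    if rootdir is not None:
        outfile = str(Path(rootdir) / outfile)
    opener = gzip.open if outfile.endswith(".gz") else open
    with opener(outfile, mode="wt", encoding="utf-8") as f:
        for line in sorted(lines):
            print(line, file=f)

with tempfile.TemporaryDirectory() as tmp:
    write_sorted("out.txt", ["pear", "apple"], rootdir=tmp)
    content = (Path(tmp) / "out.txt").read_text(encoding="utf-8")
```

Note that input order is not preserved: `["pear", "apple"]` is written as `apple`, then `pear`.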
is_gzip_file

dataknobs_utils.file_utils.is_gzip_file

is_gzip_file(filepath: str) -> bool

Determine whether a file is gzip-compressed.

Checks the file's magic number (first 3 bytes) to identify gzip format.

Parameters:

Name Type Description Default
filepath str

Path to the file to check.

required

Returns:

Name Type Description
bool bool

True if the file is gzip-compressed, False otherwise.

Source code in packages/utils/src/dataknobs_utils/file_utils.py
def is_gzip_file(filepath: str) -> bool:
    """Determine whether a file is gzip-compressed.

    Checks the file's magic number (first 3 bytes) to identify gzip format.

    Args:
        filepath: Path to the file to check.

    Returns:
        bool: True if the file is gzip-compressed, False otherwise.
    """
    is_gzip = False
    if os.path.exists(filepath):
        with open(filepath, "rb") as f:
            b = f.read(3)
            is_gzip = b == b"\x1f\x8b\x08"
    return is_gzip
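The magic-number check above is easy to verify against a file written by the `gzip` module itself, since gzip output always begins with the two ID bytes followed by the deflate method byte:

```python
import gzip
import tempfile
from pathlib import Path

GZIP_MAGIC = b"\x1f\x8b\x08"  # ID1, ID2, and the deflate compression method

def looks_gzipped(filepath):
    # Same check as is_gzip_file above: compare the first three bytes.
    p = Path(filepath)
    return p.exists() and p.read_bytes()[:3] == GZIP_MAGIC

with tempfile.TemporaryDirectory() as tmp:
    gz = Path(tmp) / "a.gz"
    txt = Path(tmp) / "a.txt"
    with gzip.open(gz, mode="wt", encoding="utf-8") as f:
        f.write("hello")
    txt.write_text("hello", encoding="utf-8")
    results = (looks_gzipped(gz), looks_gzipped(txt))
```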

json_utils

Functions

dataknobs_utils.json_utils

Utility functions for JSON processing, streaming, and manipulation.

Provides functions for working with JSON data including nested value access, streaming from files and URLs, and tree-based JSON structure operations.

Classes:

Name Description
ArrayElementAcceptStrategy

Strategy that groups paths by array element at a specific nesting level.

GroupAcceptStrategy

Abstract strategy for determining if a Path belongs in a PathGroup.

JsonSchema

Schema representation of JSON structure with type statistics.

JsonSchemaBuilder

Build a schema view of JSON data by streaming and analyzing structure.

Path

Container for a jq path with its value and optional line number.

PathGroup

Container for a group of related paths.

PathSorter

Sort and group paths into records based on an acceptance strategy.

RecordPathBuilder

Build and output records from JSON by grouping paths.

ValuePath

Structure to hold compressed information about paths to a unique value.

ValuesIndex

Index of unique values organized by their jq paths.

Functions:

Name Description
build_jq_path

Build a jq path string from a json_stream path tuple.

build_path_tuple

Build a json_stream tuple path from a jq path string.

collect_squashed

Collect squashed JSON data into a dictionary.

explode

Explode a squashed JSON dictionary back into nested structure.

get_records_df

Collect top-level JSON records into a pandas DataFrame.

get_value

Get a value from a JSON object using a key path in indexed dot notation.

indexing_format_fn

Format a (jq_path, item) pair for indexing purposes.

indexing_format_splitter

Parse a line formatted by indexing_format_fn.

path_to_dict

Convert a jq path and value into a nested dictionary structure.

squash_data

Squash JSON data into single-level structure with jq-style keys.

stream_jq_paths

Stream JSON data and write formatted lines for each (jq_path, item) pair.

stream_json_data

Stream JSON data and call a visitor function for each value.

stream_record_paths

Stream JSON and write formatted records grouped by top-level structure.

write_squashed

Write squashed JSON data to a file.

Classes

ArrayElementAcceptStrategy

ArrayElementAcceptStrategy(max_array_level: int = -1)

Bases: GroupAcceptStrategy

Strategy that groups paths by array element at a specific nesting level.

Creates record boundaries at array elements, treating each element as a distinct record with paths that share the same array indices up to the specified level.

Attributes:

Name Type Description
max_array_level

Array nesting level at which to create records.

ref_path Path | None

Reference path for matching subsequent paths.

Initialize the array element grouping strategy.

Parameters:

Name Type Description Default
max_array_level int

Array nesting depth for record boundaries:

- -1: Ignore array levels (accept all)
- 0: New record at first (top-level) array
- 1: New record at second array level
- etc.

-1
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def __init__(self, max_array_level: int = -1):
    """Initialize the array element grouping strategy.

    Args:
        max_array_level: Array nesting depth for record boundaries:
            - -1: Ignore array levels (accept all)
            - 0: New record at first (top-level) array
            - 1: New record at second array level
            - etc.
    """
    self.max_array_level = max_array_level
    self.ref_path: Path | None = None
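The `max_array_level` boundaries can be pictured with a small helper that extracts the array-index prefix of an index-qualified jq path. This is illustrative only, not the library's matching logic: paths sharing the same prefix would fall into the same record.

```python
import re

def index_prefix(jq_path, max_array_level):
    # Array indices of the path, truncated to max_array_level + 1 entries.
    # -1 returns an empty key, so every path groups together.
    if max_array_level < 0:
        return ()
    idxs = tuple(int(i) for i in re.findall(r"\[(\d+)\]", jq_path))
    return idxs[: max_array_level + 1]

# With max_array_level=0, elements of the top-level array are separate records:
keys = [index_prefix(p, 0) for p in (".recs[0].a", ".recs[0].b", ".recs[1].a")]
```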
Functions

GroupAcceptStrategy

Bases: ABC

Abstract strategy for determining if a Path belongs in a PathGroup.

Defines the logic for accepting paths as either main paths (defining the record structure) or distributed paths (shared across records).

Methods:

Name Description
accept_path

Determine if and how a path should be added to the group.

Functions
accept_path abstractmethod
accept_path(
    path: Path, group: PathGroup, distribute: bool = False
) -> str | None

Determine if and how a path should be added to the group.

Parameters:

Name Type Description Default
path Path

The path to evaluate for acceptance.

required
group PathGroup

The group that would receive the path.

required
distribute bool

If True, path is proposed as a distributed (shared) path rather than a main (record-defining) path.

False

Returns:

Type Description
str | None

str | None: One of:

- 'main': Accept as main path (record structure)
- 'distributed': Accept as distributed path (shared data)
- None: Reject the path

Source code in packages/utils/src/dataknobs_utils/json_utils.py
@abstractmethod
def accept_path(self, path: Path, group: "PathGroup", distribute: bool = False) -> str | None:
    """Determine if and how a path should be added to the group.

    Args:
        path: The path to evaluate for acceptance.
        group: The group that would receive the path.
        distribute: If True, path is proposed as a distributed (shared) path
            rather than a main (record-defining) path.

    Returns:
        str | None: One of:
            - 'main': Accept as main path (record structure)
            - 'distributed': Accept as distributed path (shared data)
            - None: Reject the path
    """
    raise NotImplementedError
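A concrete strategy only has to return one of the three verdicts above. The following hypothetical subclass (with a plain list standing in for `PathGroup`) sketches the contract; it is not one of the library's strategies:

```python
from abc import ABC, abstractmethod

class AcceptStrategy(ABC):
    # Stand-in for GroupAcceptStrategy with the same accept_path contract.
    @abstractmethod
    def accept_path(self, jq_path, group, distribute=False):
        raise NotImplementedError

class TopKeyStrategy(AcceptStrategy):
    # Hypothetical strategy: a path joins a group when it shares the group's
    # first top-level key; 'distribute' downgrades the role to shared data.
    def accept_path(self, jq_path, group, distribute=False):
        if not group:
            return "main"
        if jq_path.split(".")[1] == group[0].split(".")[1]:
            return "distributed" if distribute else "main"
        return None

strategy = TopKeyStrategy()
results = (
    strategy.accept_path(".a.b", []),
    strategy.accept_path(".a.c", [".a.b"]),
    strategy.accept_path(".x.y", [".a.b"]),
)
```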

JsonSchema

JsonSchema(
    schema: Dict[str, Any] | None = None,
    values: ValuesIndex | None = None,
    values_limit: int = 0,
)

Schema representation of JSON structure with type statistics.

Maintains a mapping of jq paths to value types and their occurrence counts, with optional tracking of unique values at each path.

Schema format

{ &lt;jq_path&gt;: { &lt;value_type&gt;: &lt;value_count&gt;, ... }, ... }

Where value_type is the type of value (e.g., 'int', 'float', 'str') and value_count is the number of times that type occurs at the path.

Attributes:

Name Type Description
schema

Mapping of jq_path to type counts.

values

Optional ValuesIndex for tracking unique values.

Initialize a JsonSchema.

Parameters:

Name Type Description Default
schema Dict[str, Any] | None

Pre-existing schema to reconstruct from. Defaults to None.

None
values ValuesIndex | None

Pre-existing ValuesIndex to include. Defaults to None.

None
values_limit int

Maximum unique values to track per path. If 0, tracks all unique values. Defaults to 0.

0

Methods:

Name Description
add_path

Add an occurrence of a value type at a jq path.

extract_values

Extract all values at a jq path from JSON data.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def __init__(
    self,
    schema: Dict[str, Any] | None = None,
    values: ValuesIndex | None = None,
    values_limit: int = 0,  # max number of unique values to keep
):
    """Initialize a JsonSchema.

    Args:
        schema: Pre-existing schema to reconstruct from. Defaults to None.
        values: Pre-existing ValuesIndex to include. Defaults to None.
        values_limit: Maximum unique values to track per path. If 0, tracks
            all unique values. Defaults to 0.
    """
    self.schema = schema if schema is not None else {}
    self.values = ValuesIndex() if values is None else values
    self._values_limit = values_limit
    self._df: pd.DataFrame | None = None
Attributes
df property
df: DataFrame

Get schema information as a DataFrame with columns:

jq_path value_type value_count

Functions
add_path
add_path(
    jq_path: str,
    value_type: str,
    value: Any = None,
    path: Tuple[Any, ...] | None = None,
) -> None

Add an occurrence of a value type at a jq path.

Parameters:

Name Type Description Default
jq_path str

The jq-style path identifying the location.

required
value_type str

The type of value at this path (e.g., 'int', 'str').

required
value Any

Optional actual value for unique value tracking.

None
path Tuple[Any, ...] | None

Optional full path tuple for tracking value occurrences.

None
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def add_path(
    self,
    jq_path: str,
    value_type: str,
    value: Any = None,
    path: Tuple[Any, ...] | None = None,
) -> None:
    """Add an occurrence of a value type at a jq path.

    Args:
        jq_path: The jq-style path identifying the location.
        value_type: The type of value at this path (e.g., 'int', 'str').
        value: Optional actual value for unique value tracking.
        path: Optional full path tuple for tracking value occurrences.
    """
    if jq_path not in self.schema:
        self.schema[jq_path] = {value_type: 1}
    elif value_type not in self.schema[jq_path]:
        self.schema[jq_path][value_type] = 1
    else:
        self.schema[jq_path][value_type] += 1
    if value is not None:
        if self._values_limit == 0 or self.values.num_values(jq_path) < self._values_limit:
            self.values.add(value, jq_path, path=path)
    self._df = None
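The per-path type counting in `add_path` reduces to a two-level dictionary update; a stdlib mirror of that accumulation (value tracking omitted):

```python
from collections import defaultdict

# Same accumulation as add_path above, on a plain dict of dicts.
schema = defaultdict(dict)

def add_path(schema, jq_path, value_type):
    counts = schema[jq_path]
    counts[value_type] = counts.get(value_type, 0) + 1

for value in (1, 2, "three"):
    add_path(schema, ".data[].n", type(value).__name__)
```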
extract_values
extract_values(
    jq_path: str, json_data: str, unique: bool = True, timeout: int = TIMEOUT
) -> Union[List[Any], Set[Any]]

Extract all values at a jq path from JSON data.

Parameters:

Name Type Description Default
jq_path str

The jq-style path to extract values from.

required
json_data str

JSON data source (file path, URL, or JSON string).

required
unique bool

If True, returns only unique values as a set. If False, returns all values as a list. Defaults to True.

True
timeout int

Request timeout in seconds for URL sources. Defaults to 10.

TIMEOUT

Returns:

Type Description
Union[List[Any], Set[Any]]

Union[List[Any], Set[Any]]: Set of unique values if unique=True, otherwise list of all values.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def extract_values(
    self,
    jq_path: str,
    json_data: str,
    unique: bool = True,
    timeout: int = TIMEOUT,
) -> Union[List[Any], Set[Any]]:
    """Extract all values at a jq path from JSON data.

    Args:
        jq_path: The jq-style path to extract values from.
        json_data: JSON data source (file path, URL, or JSON string).
        unique: If True, returns only unique values as a set.
            If False, returns all values as a list. Defaults to True.
        timeout: Request timeout in seconds for URL sources. Defaults to 10.

    Returns:
        Union[List[Any], Set[Any]]: Set of unique values if unique=True,
            otherwise list of all values.
    """
    sresult = set()
    lresult = []

    def visitor(item: Any, path: Tuple[Any, ...]) -> None:
        cur_jq_path = build_jq_path(path, keep_list_idxs=False)
        if jq_path == cur_jq_path:
            if unique:
                sresult.add(item)
            else:
                lresult.append(item)

    stream_json_data(json_data, visitor, timeout=timeout)
    return sresult if unique else lresult
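`extract_values` matches each streamed value's generalized path against the requested jq path. The core of that matching, sketched over an in-memory dict instead of a stream (the helper names are illustrative):

```python
def walk(node, path=()):
    # Depth-first traversal yielding (path_tuple, leaf_value) pairs,
    # the shape a json_stream-style visitor receives.
    if isinstance(node, dict):
        for k, v in node.items():
            yield from walk(v, path + (k,))
    elif isinstance(node, list):
        for i, v in enumerate(node):
            yield from walk(v, path + (i,))
    else:
        yield path, node

def jq_path(path, keep_list_idxs=False):
    # Generalized jq path: integer steps become '[]' (or '[i]' when kept).
    return "".join(
        (f"[{p}]" if keep_list_idxs else "[]") if isinstance(p, int) else f".{p}"
        for p in path
    )

data = {"users": [{"name": "ada"}, {"name": "lin"}]}
names = [v for p, v in walk(data) if jq_path(p) == ".users[].name"]
```

Because indices are generalized to `[]`, one query path collects the value from every array element.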

JsonSchemaBuilder

JsonSchemaBuilder(
    json_data: str,
    value_typer: Callable[[Any], str] | None = None,
    keep_unique_values: bool = False,
    invert_uniques: bool = False,
    keep_list_idxs: bool = False,
    timeout: int = TIMEOUT,
    empty_dict_type: str = "_EMPTY_DICT_",
    empty_list_type: str = "_EMPTY_LIST_",
    unk_value_type: str = "_UNKNOWN_",
    int_value_type: str = "int",
    float_value_type: str = "float",
    str_value_type: str = "str",
    url_value_type: str = "URL",
    on_add: Callable[[str], bool] | None = None,
)

Build a schema view of JSON data by streaming and analyzing structure.

Processes JSON data to extract type information, value statistics, and optionally track unique values at each path.

Initialize a JsonSchemaBuilder to analyze JSON structure.

Parameters:

Name Type Description Default
json_data str

JSON data source (file path, URL, or JSON string).

required
value_typer Callable[[Any], str] | None

Optional custom function that takes a value and returns its type string, overriding default type detection.

None
keep_unique_values bool

If True or an integer, tracks unique values. If int, limits tracking to that many unique values per path.

False
invert_uniques bool

If True, maintains reverse index from values to paths.

False
keep_list_idxs bool

If True, preserves exact array indices in paths. If False, generalizes all indices to '[]'.

False
timeout int

Request timeout in seconds for URL sources. Defaults to 10.

TIMEOUT
empty_dict_type str

Type name for empty dictionaries.

'_EMPTY_DICT_'
empty_list_type str

Type name for empty lists.

'_EMPTY_LIST_'
unk_value_type str

Type name for unclassified values.

'_UNKNOWN_'
int_value_type str

Type name for integers.

'int'
float_value_type str

Type name for floats.

'float'
str_value_type str

Type name for strings.

'str'
url_value_type str

Type name for URL strings, or None to treat as regular strings.

'URL'
on_add Callable[[str], bool] | None

Optional filter function called before adding each path. Takes jq_path and returns True to include or False to skip.

None

Attributes:

Name Type Description
partial_schema JsonSchema

Get the current, possibly incomplete, schema

schema JsonSchema

Get the schema for the json data

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def __init__(
    self,
    json_data: str,
    value_typer: Callable[[Any], str] | None = None,
    keep_unique_values: bool = False,
    invert_uniques: bool = False,
    keep_list_idxs: bool = False,
    timeout: int = TIMEOUT,
    empty_dict_type: str = "_EMPTY_DICT_",
    empty_list_type: str = "_EMPTY_LIST_",
    unk_value_type: str = "_UNKNOWN_",
    int_value_type: str = "int",
    float_value_type: str = "float",
    str_value_type: str = "str",
    url_value_type: str = "URL",
    on_add: Callable[[str], bool] | None = None,
):
    """Initialize a JsonSchemaBuilder to analyze JSON structure.

    Args:
        json_data: JSON data source (file path, URL, or JSON string).
        value_typer: Optional custom function that takes a value and returns
            its type string, overriding default type detection.
        keep_unique_values: If True or an integer, tracks unique values.
            If int, limits tracking to that many unique values per path.
        invert_uniques: If True, maintains reverse index from values to paths.
        keep_list_idxs: If True, preserves exact array indices in paths.
            If False, generalizes all indices to '[]'.
        timeout: Request timeout in seconds for URL sources. Defaults to 10.
        empty_dict_type: Type name for empty dictionaries.
        empty_list_type: Type name for empty lists.
        unk_value_type: Type name for unclassified values.
        int_value_type: Type name for integers.
        float_value_type: Type name for floats.
        str_value_type: Type name for strings.
        url_value_type: Type name for URL strings, or None to treat as regular strings.
        on_add: Optional filter function called before adding each path.
            Takes jq_path and returns True to include or False to skip.
    """
    self.json_data = json_data
    self.value_typer = value_typer
    self.keep_uniques = keep_unique_values
    self.values_limit = 0 if isinstance(keep_unique_values, bool) else int(keep_unique_values)
    self.invert_uniques = invert_uniques
    self.keep_list_idxs = keep_list_idxs
    self.timeout = timeout
    self.empty_dict_type = empty_dict_type
    self.empty_list_type = empty_list_type
    self.unk_value_type = unk_value_type
    self.int_value_type = int_value_type
    self.float_value_type = float_value_type
    self.str_value_type = str_value_type
    self.url_value_type = url_value_type
    self._on_add = on_add
    self._schema = JsonSchema(values_limit=self.values_limit)
    self._built_schema = False
Attributes
partial_schema property
partial_schema: JsonSchema

Get the current, possibly incomplete, schema

schema property
schema: JsonSchema

Get the schema for the json data

Functions

Path

Path(jq_path: str, item: Any, line_num: int = -1)

Container for a jq path with its value and optional line number.

Represents a single path/value pair from JSON data, with optional tracking of the original line number for streaming scenarios.

Attributes:

Name Type Description
jq_path

Fully-qualified jq-style path (e.g., '.data[0].name').

item

The value at this path.

line_num

Optional line number from streaming (-1 if not tracked).

Initialize a Path.

Parameters:

Name Type Description Default
jq_path str

Fully-qualified jq-style path with indices.

required
item Any

The value at this path.

required
line_num int

Optional line number for ordering. Defaults to -1.

-1
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def __init__(self, jq_path: str, item: Any, line_num: int = -1):
    """Initialize a Path.

    Args:
        jq_path: Fully-qualified jq-style path with indices.
        item: The value at this path.
        line_num: Optional line number for ordering. Defaults to -1.
    """
    self.jq_path = jq_path
    self.item = item
    self.line_num = line_num
    self._path_elts: List[str] | None = None  # jq_path.split('.')
    self._len: int | None = None  # Number of path elements
Attributes
path_elts property
path_elts: List[str]

Get this path's (index-qualified) elements

size property
size: int

Get the number of path_elements in this path.

Functions

PathGroup

PathGroup(accept_strategy: GroupAcceptStrategy, first_path: Path | None = None)

Container for a group of related paths.

Methods:

Name Description
accept

Add the path if it belongs in this group.

as_dict

Reconstruct the object from the paths

incorporate_paths

Incorporate (distribute) the group's applicable paths into this group.

Attributes:

Name Type Description
num_distributed_paths int

Get the number of distributed paths in this group

num_main_paths int

Get the number of main paths in this group

paths List[Path]

Get all paths (both main and distributed)

size int

Get the total number of paths in this group.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def __init__(self, accept_strategy: GroupAcceptStrategy, first_path: Path | None = None):
    self._all_paths: List[Path] | None = None
    self.main_paths: Set[Path] | None = None
    self.distributed_paths: Set[Path] | None = None
    self.accept_strategy = accept_strategy
    if first_path is not None:
        self.accept(first_path, distribute=False)
Attributes
num_distributed_paths property
num_distributed_paths: int

Get the number of distributed paths in this group

num_main_paths property
num_main_paths: int

Get the number of main paths in this group

paths property
paths: List[Path]

Get all paths (both main and distributed)

size property
size: int

Get the total number of paths in this group.

Functions
accept
accept(path: Path, distribute: bool = False) -> bool

Add the path if it belongs in this group.

Parameters:

Name Type Description Default
path Path

The path to (potentially) add.

required
distribute bool

True to propose the path as a distributed path.

False

Returns:

Name Type Description
bool bool

True if the path was accepted and added.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def accept(self, path: Path, distribute: bool = False) -> bool:
    """Add the path if it belongs in this group.
    :param path: The path to (potentially) add.
    :param distribute: True to propose the path as a distributed path
    :return: True if the path was accepted and added.
    """
    added = False
    add_type = self.accept_strategy.accept_path(path, self, distribute=distribute)
    if add_type is not None:
        if add_type == "main":
            if self.main_paths is None:
                self.main_paths = {path}
            else:
                self.main_paths.add(path)
        elif self.distributed_paths is None:
            self.distributed_paths = {path}
        else:
            self.distributed_paths.add(path)
        added = True
        self._all_paths = None
    return added
as_dict
as_dict() -> Dict[str, Any]

Reconstruct the object from the paths

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def as_dict(self) -> Dict[str, Any]:
    """Reconstruct the object from the paths"""
    d: Dict[str, Any] = {}
    if self.paths is not None:
        for path in self.paths:
            path_to_dict(path.jq_path, path.item, result=d)
    return d
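`as_dict` relies on `path_to_dict` to merge each (jq_path, item) pair into one nested object. An illustrative stdlib version of that merge (`expand_path` is a hypothetical name, not the library function):

```python
import re

def expand_path(jq_path, value, result=None):
    # Turn '.a[0].b' plus a value into nested dicts/lists,
    # merging into an existing result container.
    parts = re.findall(r"\.([A-Za-z_]\w*)|\[(\d+)\]", jq_path)
    keys = [name if name else int(idx) for name, idx in parts]
    if result is None:
        result = [] if isinstance(keys[0], int) else {}
    node = result
    for cur, nxt in zip(keys, keys[1:] + [None]):
        # The child container's type is dictated by the next path element.
        child = value if nxt is None else ([] if isinstance(nxt, int) else {})
        if isinstance(cur, int):
            while len(node) <= cur:
                node.append(None)
            if node[cur] is None:
                node[cur] = child
            node = node[cur]
        else:
            if cur not in node:
                node[cur] = child
            node = node[cur]
    return result

d = {}
expand_path(".a[0].b", 1, d)
expand_path(".a[0].c", 2, d)
```

Merging two paths that share a prefix yields one nested object, e.g. `{"a": [{"b": 1, "c": 2}]}` here.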
incorporate_paths
incorporate_paths(group: PathGroup) -> None

Incorporate (distribute) the group's applicable paths into this group.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def incorporate_paths(self, group: "PathGroup") -> None:
    """Incorporate (distribute) the group's appliccable paths into this group."""
    for path in group.paths:
        self.accept(path, distribute=True)

PathSorter

PathSorter(
    accept_strategy: GroupAcceptStrategy,
    group_size: int = 0,
    max_groups: int = 0,
)

Sort and group paths into records based on an acceptance strategy.

Manages the incremental grouping of paths as they're streamed, creating record boundaries according to the acceptance strategy and enforcing size constraints.

Attributes:

Name Type Description
accept_strategy

Strategy for determining path membership.

group_size

Minimum group size (groups below this are dropped).

max_groups

Maximum groups kept in memory.

groups List[PathGroup] | None

List of active PathGroup instances.

Initialize a PathSorter.

Parameters:

Name Type Description Default
accept_strategy GroupAcceptStrategy

Strategy for determining how paths are grouped.

required
group_size int

Minimum required group size. Groups smaller than this are silently dropped when closed. 0 means no minimum. Defaults to 0.

0
max_groups int

Maximum groups kept in memory. Older groups are dropped when limit is reached. 0 means unlimited. Defaults to 0.

0

Methods:

Name Description
accept_path

Try to add a path to any existing group.

add_path

Add a path to the appropriate group, creating new groups as needed.

all_groups_have_size

Check if all groups have a specific size.

close_group

Close a group, finalizing its contents.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def __init__(
    self,
    accept_strategy: GroupAcceptStrategy,
    group_size: int = 0,
    max_groups: int = 0,
):
    """Initialize a PathSorter.

    Args:
        accept_strategy: Strategy for determining how paths are grouped.
        group_size: Minimum required group size. Groups smaller than this
            are silently dropped when closed. 0 means no minimum. Defaults to 0.
        max_groups: Maximum groups kept in memory. Older groups are dropped
            when limit is reached. 0 means unlimited. Defaults to 0.
    """
    self.accept_strategy = accept_strategy
    self.group_size = group_size
    # NOTE: Must keep at least 3 groups if keeping any for propagating
    #      distributed paths.
    self.max_groups = max_groups if max_groups <= 0 else max(3, max_groups)
    self.groups: List[PathGroup] | None = None
Attributes
num_groups property
num_groups: int

Get the number of groups

Functions
accept_path
accept_path(path: Path) -> bool

Try to add a path to any existing group.

Parameters:

Name Type Description Default
path Path

The path to add.

required

Returns:

Name Type Description
bool bool

True if the path was accepted by a group, False otherwise.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def accept_path(self, path: Path) -> bool:
    """Try to add a path to any existing group.

    Args:
        path: The path to add.

    Returns:
        bool: True if the path was accepted by a group, False otherwise.
    """
    if self.groups is not None:
        for group in self.groups:
            if group.accept(path):
                return True
    return False
add_path
add_path(path: Path) -> PathGroup | None

Add a path to the appropriate group, creating new groups as needed.

Paths are added to the most recent group if accepted. When a path is rejected, the current group is closed and a new group is created.

Parameters:

Name Type Description Default
path Path

The path to add to a group.

required

Returns:

Type Description
PathGroup | None

PathGroup | None: The most recently closed group if one was closed, otherwise None.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def add_path(self, path: Path) -> PathGroup | None:
    """Add a path to the appropriate group, creating new groups as needed.

    Paths are added to the most recent group if accepted. When a path is
    rejected, the current group is closed and a new group is created.

    Args:
        path: The path to add to a group.

    Returns:
        PathGroup | None: The most recently closed group if one was closed,
            otherwise None.
    """
    result = None
    if self.groups is None or len(self.groups) == 0:
        self.groups = [
            PathGroup(
                accept_strategy=self.accept_strategy,
                first_path=path,
            )
        ]
    else:
        # Assume always "incremental", such that new paths will belong in
        # the latest group
        latest_group = self.groups[-1]
        if not latest_group.accept(path):
            # Time to add a new group
            self.close_group()
            result = latest_group

            # Start a new group
            self.groups.append(
                PathGroup(
                    accept_strategy=self.accept_strategy,
                    first_path=path,
                )
            )

            # Enforce max_groups limit by removing groups from the front
            if self.max_groups > 0 and len(self.groups) >= self.max_groups:
                while len(self.groups) >= self.max_groups:
                    self.groups.pop(0)
    return result
all_groups_have_size
all_groups_have_size(group_size: int) -> bool

Check if all groups have a specific size.

Parameters:

Name Type Description Default
group_size int

The size to test for.

required

Returns:

Name Type Description
bool bool

True if all groups have exactly this size, False otherwise.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def all_groups_have_size(self, group_size: int) -> bool:
    """Check if all groups have a specific size.

    Args:
        group_size: The size to test for.

    Returns:
        bool: True if all groups have exactly this size, False otherwise.
    """
    if self.groups is not None:
        for group in self.groups:
            if group.size != group_size:
                return False
        return True
    return False
close_group
close_group(idx: int = -1, check_size: bool = True) -> PathGroup | None

Close a group, finalizing its contents.

Closing a group involves:

1. Incorporating distributed paths from the previous group
2. Optionally checking size constraints and dropping undersized groups

Parameters:

Name Type Description Default
idx int

Index of group to close (-1 for last). Defaults to -1.

-1
check_size bool

If True, enforces group_size constraint. Defaults to True.

True

Returns:

Type Description
PathGroup | None

PathGroup | None: The closed group, or None if dropped for size.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def close_group(self, idx: int = -1, check_size: bool = True) -> PathGroup | None:
    """Close a group, finalizing its contents.

    Closing a group involves:
    1. Incorporating distributed paths from the previous group
    2. Optionally checking size constraints and dropping undersized groups

    Args:
        idx: Index of group to close (-1 for last). Defaults to -1.
        check_size: If True, enforces group_size constraint. Defaults to True.

    Returns:
        PathGroup | None: The closed group, or None if dropped for size.
    """
    if self.groups is None or len(self.groups) == 0:
        return None

    if idx == -1:
        idx = len(self.groups) - 1
    latest_group = self.groups[idx]

    # Add distributable lines from the prior group
    if idx > 0:
        latest_group.incorporate_paths(self.groups[idx - 1])

    # Check size constraints
    if check_size and self.group_size > 0 and len(self.groups) > 0:
        if latest_group.size < self.group_size:
            # Last group not "filled" ... need to drop
            self.groups.pop()

    return latest_group

RecordPathBuilder

RecordPathBuilder(
    json_data: str,
    output_stream: TextIO,
    line_builder_fn: Callable[[int, int, str, Any], str],
    timeout: int = TIMEOUT,
)

Build and output records from JSON by grouping paths.

Streams JSON data, groups paths into records using a PathSorter, and writes formatted output for each record.

Attributes:

Name Type Description
jdata

JSON data source.

output_stream

Where to write formatted output.

builder_fn

Function to format record lines.

timeout

Request timeout for URLs.

rec_id

Current record ID counter.

inum

Current item number counter.

sorter PathSorter | None

PathSorter for grouping paths.

Initialize a RecordPathBuilder.

Parameters:

Name Type Description Default
json_data str

JSON data source (file path, URL, or JSON string).

required
output_stream TextIO

Text stream to write formatted lines to.

required
line_builder_fn Callable[[int, int, str, Any], str]

Function with signature fn(rec_id, line_num, jq_path, item) that returns a formatted string.

required
timeout int

Request timeout in seconds for URL sources. Defaults to 10.

TIMEOUT
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def __init__(
    self,
    json_data: str,
    output_stream: TextIO,
    line_builder_fn: Callable[[int, int, str, Any], str],
    timeout: int = TIMEOUT,
):
    """Initialize a RecordPathBuilder.

    Args:
        json_data: JSON data source (file path, URL, or JSON string).
        output_stream: Text stream to write formatted lines to.
        line_builder_fn: Function with signature fn(rec_id, line_num, jq_path, item)
            that returns a formatted string.
        timeout: Request timeout in seconds for URL sources. Defaults to 10.
    """
    self.jdata = json_data
    self.output_stream = output_stream
    self.builder_fn = line_builder_fn
    self.timeout = timeout
    self.rec_id = 0
    self.inum = 0
    self.sorter: PathSorter | None = None
Functions

ValuePath

ValuePath(jq_path: str, value: Any)

Structure to hold compressed information about paths to a unique value.

Stores the tree of array indices leading to each occurrence of a value in JSON data, enabling efficient tracking of where values appear.

Attributes:

Name Type Description
jq_path

The jq-style path template with generic array indices.

value

The value found at this path.

Initialize a ValuePath for tracking occurrences of a value.

Parameters:

Name Type Description Default
jq_path str

The jq-style path template (key).

required
value Any

The value found at this path.

required

Methods:

Name Description
add

Add a path occurrence to the index tree.

path_generator

Generate all concrete paths to this value.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def __init__(self, jq_path: str, value: Any):
    """Initialize a ValuePath for tracking occurrences of a value.

    Args:
        jq_path: The jq-style path template (key).
        value: The value found at this path.
    """
    self.jq_path = jq_path
    self.value = value
    self._indices = Tree(0).as_string()  # root data will hold total path count
Attributes
path_count property
path_count: int

Get the number of jq_paths to the value.

Functions
add
add(path: Tuple[Any, ...] | None) -> None

Add a path occurrence to the index tree.

Parameters:

Name Type Description Default
path Tuple[Any, ...] | None

Path tuple with the same structure as jq_path, or None if no path information is available.

required
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def add(self, path: Tuple[Any, ...] | None) -> None:
    """Add a path occurrence to the index tree.

    Args:
        path: Path tuple with the same structure as jq_path, or None if
            no path information is available.
    """
    root = self.indices
    node = root
    node.data = int(node.data) + 1  # keep track of total
    if path is None:
        self._indices = root.as_string()
        return
    for elt in path:
        if isinstance(elt, int):
            found = False
            if node.has_children() and node.children is not None:
                # simplifying assumption: idxs are in consecutive order fm 0
                child = node.children[-1]
                if str(elt) == child.data:
                    node = child
                    found = True
            if not found:
                node = node.add_child(elt)
    self._indices = root.as_string()
path_generator
path_generator(result_type: str = 'jq_path') -> Any

Generate all concrete paths to this value.

Parameters:

Name Type Description Default
result_type str

Format for generated paths:

- 'jq_path': Generate jq-style path strings (default)
- 'path': Generate full path tuples with keys and indices
- 'idx': Generate index-only tuples

'jq_path'

Yields:

Type Description
Any

Paths in the requested format for each occurrence of the value.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def path_generator(self, result_type: str = "jq_path") -> Any:
    """Generate all concrete paths to this value.

    Args:
        result_type: Format for generated paths:
            - 'jq_path': Generate jq-style path strings (default)
            - 'path': Generate full path tuples with keys and indices
            - 'idx': Generate index-only tuples

    Yields:
        Paths in the requested format for each occurrence of the value.
    """
    path = build_path_tuple(self.jq_path, any_list_idx=-1)
    for node in self.indices.collect_terminal_nodes():
        node_path = node.get_path()
        node_idx = 0
        gen_path = []
        for elt in path:
            if isinstance(elt, int):
                gen_path.append(int(node_path[node_idx].data))
                node_idx += 1
            elif result_type != "idx":
                gen_path.append(elt)
        if result_type == "jq_path":
            yield build_jq_path(tuple(gen_path), keep_list_idxs=True)
        else:
            yield gen_path

ValuesIndex

ValuesIndex()

Index of unique values organized by their jq paths.

Maintains a compressed index mapping each jq path to its unique values, with optional tracking of all occurrences via path trees.

Attributes:

Name Type Description
path_values Dict[str, Dict[Any, ValuePath]]

Nested dict mapping jq_path -> value -> ValuePath.

Methods:

Name Description
add

Add a value occurrence to the index.

get_values

Get all unique values for a jq path.

has_jqpath

Check if any values exist for the given jq path.

num_values

Get count of unique values for a jq path.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def __init__(self) -> None:
    self.path_values: Dict[str, Dict[Any, ValuePath]] = (
        {}
    )  # Dict[jq_path, Dict[value, ValuePath]]
Functions
add
add(value: Any, jq_path: str, path: Tuple[Any, ...] | None = None) -> None

Add a value occurrence to the index.

Parameters:

Name Type Description Default
value Any

The value to index.

required
jq_path str

The jq-style path where the value was found.

required
path Tuple[Any, ...] | None

Optional full path tuple for tracking specific occurrences.

None
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def add(self, value: Any, jq_path: str, path: Tuple[Any, ...] | None = None) -> None:
    """Add a value occurrence to the index.

    Args:
        value: The value to index.
        jq_path: The jq-style path where the value was found.
        path: Optional full path tuple for tracking specific occurrences.
    """
    if jq_path in self.path_values:
        value_paths = self.path_values[jq_path]
    else:
        value_paths = {}
        self.path_values[jq_path] = value_paths

    if value == []:
        value = "_EMPTY_LIST_"
    elif value == {}:
        value = "_EMPTY_DICT_"

    if value in value_paths:
        value_path = value_paths[value]
    else:
        value_path = ValuePath(jq_path, value)
        value_paths[value] = value_path

    value_path.add(path)
get_values
get_values(jq_path: str) -> Set[Any]

Get all unique values for a jq path.

Parameters:

Name Type Description Default
jq_path str

The jq-style path to query.

required

Returns:

Type Description
Set[Any]

Set[Any]: Set of unique values found at this path.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def get_values(self, jq_path: str) -> Set[Any]:
    """Get all unique values for a jq path.

    Args:
        jq_path: The jq-style path to query.

    Returns:
        Set[Any]: Set of unique values found at this path.
    """
    return set(self.path_values.get(jq_path, {}).keys())
has_jqpath
has_jqpath(jq_path: str) -> bool

Check if any values exist for the given jq path.

Parameters:

Name Type Description Default
jq_path str

The jq-style path to check.

required

Returns:

Name Type Description
bool bool

True if values exist for this path, False otherwise.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def has_jqpath(self, jq_path: str) -> bool:
    """Check if any values exist for the given jq path.

    Args:
        jq_path: The jq-style path to check.

    Returns:
        bool: True if values exist for this path, False otherwise.
    """
    return jq_path in self.path_values
num_values
num_values(jq_path: str) -> int

Get count of unique values for a jq path.

Parameters:

Name Type Description Default
jq_path str

The jq-style path to query.

required

Returns:

Name Type Description
int int

Number of unique values at this path.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def num_values(self, jq_path: str) -> int:
    """Get count of unique values for a jq path.

    Args:
        jq_path: The jq-style path to query.

    Returns:
        int: Number of unique values at this path.
    """
    return len(self.path_values.get(jq_path, {}))

Functions

build_jq_path

build_jq_path(path: Tuple[Any, ...], keep_list_idxs: bool = True) -> str

Build a jq path string from a json_stream path tuple.

Converts a path tuple like ('data', 0, 'name') to a jq-style path like '.data[0].name'.

Parameters:

Name Type Description Default
path Tuple[Any, ...]

Tuple of json_stream path components, with integers representing array indices and strings representing object keys.

required
keep_list_idxs bool

If True, keeps exact array index values (e.g., [0], [1]). If False, emits generic '[]' for all array indices. Defaults to True.

True

Returns:

Name Type Description
str str

A jq-style path string.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def build_jq_path(path: Tuple[Any, ...], keep_list_idxs: bool = True) -> str:
    """Build a jq path string from a json_stream path tuple.

    Converts a path tuple like ('data', 0, 'name') to a jq-style path
    like '.data[0].name'.

    Args:
        path: Tuple of json_stream path components, with integers representing
            array indices and strings representing object keys.
        keep_list_idxs: If True, keeps exact array index values (e.g., [0], [1]).
            If False, emits generic '[]' for all array indices. Defaults to True.

    Returns:
        str: A jq-style path string.
    """
    jq_path = ""
    for elt in path:
        if isinstance(elt, int):
            jq_path += f"[{elt}]" if keep_list_idxs and elt >= 0 else "[]"
        else:
            jq_path += f".{elt}"
    return jq_path
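
Because the full source is shown above, the conversion can be exercised directly; the snippet below copies the function verbatim so it is self-contained.

```python
from typing import Any, Tuple

def build_jq_path(path: Tuple[Any, ...], keep_list_idxs: bool = True) -> str:
    # Copied verbatim from the source listing above.
    jq_path = ""
    for elt in path:
        if isinstance(elt, int):
            jq_path += f"[{elt}]" if keep_list_idxs and elt >= 0 else "[]"
        else:
            jq_path += f".{elt}"
    return jq_path

print(build_jq_path(("data", 0, "name")))                        # .data[0].name
print(build_jq_path(("data", 0, "name"), keep_list_idxs=False))  # .data[].name
```

Setting keep_list_idxs=False is useful when grouping values that occur at the same structural position across different array elements.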

build_path_tuple

build_path_tuple(jq_path: str, any_list_idx: int = -1) -> Tuple[Any, ...]

Build a json_stream tuple path from a jq path string.

Reverses the operation of build_jq_path, converting a jq-style path like '.data[0].name' back to a tuple like ('data', 0, 'name').

Parameters:

Name Type Description Default
jq_path str

The jq-style path string to convert (e.g., '.data[0].name').

required
any_list_idx int

Index value to use for generic '[]' array notation. Defaults to -1.

-1

Returns:

Type Description
Tuple[Any, ...]

Tuple[Any, ...]: Path tuple in json_stream format alternating between keys (str) and array indices (int).

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def build_path_tuple(jq_path: str, any_list_idx: int = -1) -> Tuple[Any, ...]:
    """Build a json_stream tuple path from a jq path string.

    Reverses the operation of build_jq_path, converting a jq-style path like
    '.data[0].name' back to a tuple like ('data', 0, 'name').

    Args:
        jq_path: The jq-style path string to convert (e.g., '.data[0].name').
        any_list_idx: Index value to use for generic '[]' array notation.
            Defaults to -1.

    Returns:
        Tuple[Any, ...]: Path tuple in json_stream format alternating between
            keys (str) and array indices (int).
    """
    path = []
    for part in jq_path.split("."):
        if part == "":
            continue
        m = ELT_IDX_RE.match(part)
        if m:
            path.append(m.group(1))
            idx = m.group(2)
            idxval = any_list_idx if idx == "" else int(idx)
            path.append(idxval)
        else:
            path.append(part)
    return tuple(path)
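
The reverse conversion can be demonstrated in isolation. Note that ELT_IDX_RE is not shown in this reference; the pattern below is an illustrative assumption that matches 'key[idx]' segments, so this is a sketch rather than the exact library behavior.

```python
import re
from typing import Any, Tuple

# Assumed stand-in for the module-level ELT_IDX_RE, which is not shown here.
ELT_IDX_RE = re.compile(r"^(.+)\[(\d*)\]$")

def build_path_tuple(jq_path: str, any_list_idx: int = -1) -> Tuple[Any, ...]:
    # Mirrors the source listing above, using the assumed regex.
    path = []
    for part in jq_path.split("."):
        if part == "":
            continue
        m = ELT_IDX_RE.match(part)
        if m:
            path.append(m.group(1))
            idx = m.group(2)
            path.append(any_list_idx if idx == "" else int(idx))
        else:
            path.append(part)
    return tuple(path)

print(build_path_tuple(".data[0].name"))  # ('data', 0, 'name')
print(build_path_tuple(".items[].id"))    # ('items', -1, 'id')
```

Generic '[]' segments map to any_list_idx (-1 by default), which downstream helpers treat as "any index".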

collect_squashed

collect_squashed(
    jdata: str,
    prune_at: List[Union[str, Tuple[str, int]]] | None = None,
    timeout: int = TIMEOUT,
    result: Dict[Any, Any] | None = None,
) -> Dict[Any, Any]

Collect squashed JSON data into a dictionary.

Convenience function that squashes JSON into a flat dictionary where keys are jq-style paths and values are the leaf values from the JSON.

Parameters:

Name Type Description Default
jdata str

JSON data source (file path, URL, or JSON string).

required
prune_at List[Union[str, Tuple[str, int]]] | None

List of path specifications identifying branches to skip. Defaults to None. See squash_data() for format details.

None
timeout int

Request timeout in seconds for URL sources. Defaults to 10.

TIMEOUT
result Dict[Any, Any] | None

Optional dictionary to populate. If None, creates a new dict.

None

Returns:

Type Description
Dict[Any, Any]

Dict[Any, Any]: Dictionary mapping jq-style paths to their values.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def collect_squashed(
    jdata: str,
    prune_at: List[Union[str, Tuple[str, int]]] | None = None,
    timeout: int = TIMEOUT,
    result: Dict[Any, Any] | None = None,
) -> Dict[Any, Any]:
    """Collect squashed JSON data into a dictionary.

    Convenience function that squashes JSON into a flat dictionary where keys
    are jq-style paths and values are the leaf values from the JSON.

    Args:
        jdata: JSON data source (file path, URL, or JSON string).
        prune_at: List of path specifications identifying branches to skip.
            Defaults to None. See squash_data() for format details.
        timeout: Request timeout in seconds for URL sources. Defaults to 10.
        result: Optional dictionary to populate. If None, creates a new dict.

    Returns:
        Dict[Any, Any]: Dictionary mapping jq-style paths to their values.
    """
    if result is None:
        result = {}

    def collector_fn(jq_path: str, item: Any) -> None:
        result[jq_path] = item

    squash_data(
        collector_fn,
        jdata,
        prune_at=prune_at,
        timeout=timeout,
    )
    return result

explode

explode(squashed: Dict[Any, Any]) -> Dict[Any, Any]

Explode a squashed JSON dictionary back into nested structure.

Reverses the squashing operation by converting a flat dictionary with jq-style paths as keys back into a nested JSON-like structure.

Parameters:

Name Type Description Default
squashed Dict[Any, Any]

Dictionary with jq-style paths as keys and leaf values.

required

Returns:

Type Description
Dict[Any, Any]

Dict[Any, Any]: Nested dictionary reconstructed from the paths.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def explode(squashed: Dict[Any, Any]) -> Dict[Any, Any]:
    """Explode a squashed JSON dictionary back into nested structure.

    Reverses the squashing operation by converting a flat dictionary with
    jq-style paths as keys back into a nested JSON-like structure.

    Args:
        squashed: Dictionary with jq-style paths as keys and leaf values.

    Returns:
        Dict[Any, Any]: Nested dictionary reconstructed from the paths.
    """
    result: Dict[Any, Any] = {}
    for jq_path, value in squashed.items():
        path_to_dict(jq_path, value, result)
    return result

get_records_df

get_records_df(json_data: str, timeout: int = TIMEOUT) -> pd.DataFrame

Collect top-level JSON records into a pandas DataFrame.

Convenience function that streams JSON and collects all records into memory as a DataFrame with columns: rec_id, line_num, jq_path, item.

Warning

Not suitable for large JSON files as all data is loaded into memory.

Parameters:

Name Type Description Default
json_data str

JSON data source (file path, URL, or JSON string).

required
timeout int

Request timeout in seconds for URL sources. Defaults to 10.

TIMEOUT

Returns:

Type Description
DataFrame

pd.DataFrame: Records with columns rec_id, line_num, jq_path, item.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def get_records_df(json_data: str, timeout: int = TIMEOUT) -> pd.DataFrame:
    """Collect top-level JSON records into a pandas DataFrame.

    Convenience function that streams JSON and collects all records into memory
    as a DataFrame with columns: rec_id, line_num, jq_path, item.

    Warning:
        Not suitable for large JSON files as all data is loaded into memory.

    Args:
        json_data: JSON data source (file path, URL, or JSON string).
        timeout: Request timeout in seconds for URL sources. Defaults to 10.

    Returns:
        pd.DataFrame: Records with columns rec_id, line_num, jq_path, item.
    """
    s = io.StringIO()
    stream_record_paths(
        json_data,
        s,
        lambda rid, lid, jqp, val: f"{rid}\t{lid}\t{jqp}\t{val}",
        timeout=timeout,
    )
    s.seek(0)
    df = pd.read_csv(s, sep="\t", names=["rec_id", "line_num", "jq_path", "item"])
    return df

get_value

get_value(
    json_obj: Dict[str, Any], key_path: str, default: Any | None = None
) -> Union[Any, List[Any]]

Get a value from a JSON object using a key path in indexed dot notation.

Parameters:

Name Type Description Default
json_obj Dict[str, Any]

The JSON object to search in

required
key_path str

Dot-delimited string with optional [index] notation

required
default Any | None

Value to return if path not found (default: None)

None

Returns:

Type Description
Union[Any, List[Any]]

Single value or list of values depending on key_path

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def get_value(
    json_obj: Dict[str, Any], key_path: str, default: Any | None = None
) -> Union[Any, List[Any]]:
    """Get a value from a JSON object using a key path in indexed dot notation.

    Args:
        json_obj: The JSON object to search in
        key_path: Dot-delimited string with optional [index] notation
        default: Value to return if path not found (default: None)

    Returns:
        Single value or list of values depending on key_path
    """
    # Split key path into individual segments
    pattern = r"([^.\[]+)(?:\[([^\]]+)\])?"
    segments = [
        (match.group(1), match.group(2))
        for match in re.finditer(pattern, key_path)
        if match.group(1)
    ]

    def traverse(obj: Any, seg_idx: int = 0) -> Union[Any, List[Any]]:
        # Base case: reached end of segments
        if seg_idx >= len(segments):
            return obj

        key, index = segments[seg_idx]

        # Handle case where object is None or doesn't have the key
        if not isinstance(obj, (dict, list)) or (isinstance(obj, dict) and key not in obj):
            return [] if any(s[1] in ("*", "?") for s in segments[seg_idx:]) else default

        # Get the next level value
        if isinstance(obj, dict):
            next_obj = obj.get(key)
        else:  # list
            return [] if any(s[1] in ("*", "?") for s in segments[seg_idx:]) else default

        # Handle different index cases
        if index is None:
            return traverse(next_obj, seg_idx + 1)

        elif index == "*":
            if not isinstance(next_obj, list):
                return []
            results = [traverse(item, seg_idx + 1) for item in next_obj]
            # Flatten single-item lists if no more wildcards ahead
            if not any(s[1] in ("*", "?") for s in segments[seg_idx + 1 :]):
                return [r for r in results if r != default]
            return results

        elif index == "?":
            if not isinstance(next_obj, list):
                return default
            for item in next_obj:
                result = traverse(item, seg_idx + 1)
                if result != default:
                    return result
            return default

        else:  # numeric index
            try:
                idx = int(index)
                if not isinstance(next_obj, list) or idx >= len(next_obj):
                    return default
                return traverse(next_obj[idx], seg_idx + 1)
            except ValueError:
                return default

    result = traverse(json_obj)
    # Return default value if result is None and no wildcards were used
    if result is None and not any(seg[1] in ("*", "?") for seg in segments):
        return default
    return result
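
The indexed dot notation is easiest to grasp with a small example. The sketch below is a simplified re-implementation covering plain keys, numeric indices, and the '*' wildcard; the real get_value additionally supports '?' (first non-default match) and more edge-case handling, so treat this as illustrative only.

```python
import re
from typing import Any

def get_value_sketch(obj: Any, key_path: str, default: Any = None) -> Any:
    # Minimal sketch of indexed dot-notation lookup (keys, [n], and [*] only).
    pattern = r"([^.\[]+)(?:\[([^\]]+)\])?"
    segments = [
        (m.group(1), m.group(2))
        for m in re.finditer(pattern, key_path)
        if m.group(1)
    ]

    def walk(node: Any, i: int) -> Any:
        if i >= len(segments):
            return node
        key, index = segments[i]
        if not isinstance(node, dict) or key not in node:
            return default
        nxt = node[key]
        if index is None:
            return walk(nxt, i + 1)
        if index == "*":
            if not isinstance(nxt, list):
                return []
            return [walk(item, i + 1) for item in nxt]
        try:
            idx = int(index)
        except ValueError:
            return default
        if not isinstance(nxt, list) or idx >= len(nxt):
            return default
        return walk(nxt[idx], i + 1)

    return walk(obj, 0)

data = {"users": [{"name": "ada"}, {"name": "lin"}]}
print(get_value_sketch(data, "users[0].name"))  # ada
print(get_value_sketch(data, "users[*].name"))  # ['ada', 'lin']
```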

indexing_format_fn

indexing_format_fn(jq_path: str, item: Any) -> str

Format a (jq_path, item) pair for indexing purposes.

Creates a tab-separated format optimized for indexing with columns: value, field, flat_jq, idxs.

Parameters:

Name Type Description Default
jq_path str

The jq-style path to the item.

required
item Any

The value at the path.

required

Returns:

Name Type Description
str str

Tab-separated string with format: - value: The item value - field: The last path element (field name) - flat_jq: The path with array indices flattened to '[]' - idxs: Comma-separated list of array indices from the path

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def indexing_format_fn(jq_path: str, item: Any) -> str:
    """Format a (jq_path, item) pair for indexing purposes.

    Creates a tab-separated format optimized for indexing with columns:
    value, field, flat_jq, idxs.

    Args:
        jq_path: The jq-style path to the item.
        item: The value at the path.

    Returns:
        str: Tab-separated string with format:
            - value: The item value
            - field: The last path element (field name)
            - flat_jq: The path with array indices flattened to '[]'
            - idxs: Comma-separated list of array indices from the path
    """
    idxs = ", ".join(FLATTEN_IDX_RE.findall(jq_path))
    flat_jq = FLATTEN_IDX_RE.sub("[]", jq_path)
    dotpos = flat_jq.rindex(".")
    field = flat_jq[dotpos + 1 :]
    flat_jq = flat_jq[:dotpos]
    return f"{item}\t{field}\t{flat_jq}\t{idxs}"
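
A concrete line makes the four-column layout clear. FLATTEN_IDX_RE is not shown in this reference; the pattern below is an illustrative assumption that captures numeric array indices like '[0]'.

```python
import re
from typing import Any

# Assumed stand-in for the module-level FLATTEN_IDX_RE, which is not shown here.
FLATTEN_IDX_RE = re.compile(r"\[(\d+)\]")

def indexing_format_fn(jq_path: str, item: Any) -> str:
    # Mirrors the source listing above, using the assumed regex.
    idxs = ", ".join(FLATTEN_IDX_RE.findall(jq_path))
    flat_jq = FLATTEN_IDX_RE.sub("[]", jq_path)
    dotpos = flat_jq.rindex(".")
    field = flat_jq[dotpos + 1 :]
    flat_jq = flat_jq[:dotpos]
    return f"{item}\t{field}\t{flat_jq}\t{idxs}"

line = indexing_format_fn(".data[0].users[2].name", "Alice")
print(line)  # Alice	name	.data[].users[]	0, 2
```

The flattened path plus the index list lets an index store one entry per unique (value, field, shape) triple while still recording every occurrence.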

indexing_format_splitter

indexing_format_splitter(
    fileline: str,
) -> Tuple[str | None, str | None, str | None, str | None]

Parse a line formatted by indexing_format_fn.

Reverses indexing_format_fn to extract the original components.

Parameters:

Name Type Description Default
fileline str

Tab-separated line from indexing_format_fn.

required

Returns:

Type Description
Tuple[str | None, str | None, str | None, str | None]

Tuple[str | None, str | None, str | None, str | None]: A tuple containing:

- value: The item value
- field: The last path element (field name)
- flat_jq: The path with array indices flattened to '[]'
- idxs: Comma-separated list of array indices

All values are None if line is empty.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def indexing_format_splitter(
    fileline: str,
) -> Tuple[str | None, str | None, str | None, str | None]:
    """Parse a line formatted by indexing_format_fn.

    Reverses indexing_format_fn to extract the original components.

    Args:
        fileline: Tab-separated line from indexing_format_fn.

    Returns:
        Tuple[str | None, str | None, str | None, str | None]: A tuple containing:
            - value: The item value
            - field: The last path element (field name)
            - flat_jq: The path with array indices flattened to '[]'
            - idxs: Comma-separated list of array indices
            All values are None if line is empty.
    """
    line = fileline.strip()
    value = None
    field = None
    flat_jq = None
    idxs = None
    if line:
        parts = fileline.split("\t")
        value = parts[0]
        if len(parts) > 1:
            field = parts[1]
            if len(parts) > 2:
                flat_jq = parts[2]
                if len(parts) > 3:
                    idxs = parts[3]
    return (value, field, flat_jq, idxs)
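
Since the splitter's source is fully shown above, it can be copied verbatim and round-tripped against a tab-separated line of the kind indexing_format_fn emits.

```python
from typing import Tuple

def indexing_format_splitter(fileline: str) -> Tuple:
    # Copied verbatim from the source listing above.
    line = fileline.strip()
    value = None
    field = None
    flat_jq = None
    idxs = None
    if line:
        parts = fileline.split("\t")
        value = parts[0]
        if len(parts) > 1:
            field = parts[1]
            if len(parts) > 2:
                flat_jq = parts[2]
                if len(parts) > 3:
                    idxs = parts[3]
    return (value, field, flat_jq, idxs)

print(indexing_format_splitter("Alice\tname\t.data[]\t0"))
# ('Alice', 'name', '.data[]', '0')
print(indexing_format_splitter(""))
# (None, None, None, None)
```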

path_to_dict

path_to_dict(
    path: Union[Tuple[Any, ...], str],
    value: Any,
    result: Dict[Any, Any] | None = None,
) -> Dict[Any, Any]

Convert a jq path and value into a nested dictionary structure.

Takes a path (jq string or tuple) and reconstructs the nested dictionary structure it represents, setting the value at the leaf.

Parameters:

Name Type Description Default
path Union[Tuple[Any, ...], str]

Path to the value - either a jq-style string (e.g., '.data[0].name') or a path tuple (e.g., ('data', 0, 'name')).

required
value Any

The value to set at the path.

required
result Dict[Any, Any] | None

Optional dictionary to populate. If None, creates a new dict.

None

Returns:

Type Description
Dict[Any, Any]

Dict[Any, Any]: Dictionary with nested structure representing the path.

Source code in packages/utils/src/dataknobs_utils/json_utils.py
def path_to_dict(
    path: Union[Tuple[Any, ...], str], value: Any, result: Dict[Any, Any] | None = None
) -> Dict[Any, Any]:
    """Convert a jq path and value into a nested dictionary structure.

    Takes a path (jq string or tuple) and reconstructs the nested dictionary
    structure it represents, setting the value at the leaf.

    Args:
        path: Path to the value - either a jq-style string (e.g., '.data[0].name')
            or a path tuple (e.g., ('data', 0, 'name')).
        value: The value to set at the path.
        result: Optional dictionary to populate. If None, creates a new dict.

    Returns:
        Dict[Any, Any]: Dictionary with nested structure representing the path.
    """
    if result is None:
        result = {}
    if isinstance(path, str):
        path = build_path_tuple(path, any_list_idx=-1)

    def do_it(
        cur_dict: Dict[Any, Any], path: Tuple[Any, ...], path_idx: int, pathlen: int, value: Any
    ) -> None:
        if path_idx >= pathlen:
            return
        path_elt = path[path_idx]
        path_idx += 1
        list_idx = None
        if path_idx < pathlen:
            next_elt = path[path_idx]
            if isinstance(next_elt, int):
                list_idx = next_elt
                path_idx += 1

        if path_elt in cur_dict:
            if list_idx is None:
                if path_idx < pathlen:
                    cur_dict = cur_dict[path_elt]
                else:
                    cur_dict[path_elt] = value
            else:
                cur_list = cur_dict[path_elt]
                if len(cur_list) <= list_idx:
                    if path_idx < pathlen:
                        cur_dict = {}
                        # simplifying assumption: idxs are in consecutive order fm 0
                        cur_list.append(cur_dict)
                    else:
                        cur_list.append(value)
                elif path_idx < pathlen:
                    cur_dict = cur_list[-1]
                else:
                    cur_list.append(value)
        elif list_idx is None:
            if path_idx < pathlen:
                elt_dict: Dict[Any, Any] = {}
                cur_dict[path_elt] = elt_dict
                cur_dict = elt_dict
            else:
                cur_dict[path_elt] = value
        else:
            cur_list = []
            cur_dict[path_elt] = cur_list
            if path_idx < pathlen:
                cur_dict = {}
                cur_list.append(cur_dict)
            else:
                cur_list.append(value)
        # recurse to keep moving along the path
        do_it(cur_dict, path, path_idx, pathlen, value)

    do_it(result, path, 0, len(path), value)

    return result
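
The nesting behavior is easier to see with a worked example. The sketch below is a simplified re-implementation for a single path; like the original, it relies on the simplifying assumption that array indices arrive in consecutive order from 0, and it omits the original's handling of pre-existing sibling structure.

```python
from typing import Any, Dict, Optional, Tuple

def path_to_dict_sketch(
    path: Tuple[Any, ...], value: Any, result: Optional[Dict[Any, Any]] = None
) -> Dict[Any, Any]:
    # Minimal sketch: walk (key, index?) pairs, creating dicts/lists as needed.
    if result is None:
        result = {}
    cur: Any = result
    i = 0
    while i < len(path):
        key = path[i]
        is_list = i + 1 < len(path) and isinstance(path[i + 1], int)
        last = (i + (2 if is_list else 1)) >= len(path)
        if is_list:
            idx = path[i + 1]
            lst = cur.setdefault(key, [])
            if last:
                lst.append(value)
            else:
                # Consecutive-index assumption: grow by one when needed.
                if len(lst) <= idx:
                    lst.append({})
                cur = lst[-1]
            i += 2
        else:
            if last:
                cur[key] = value
            else:
                cur = cur.setdefault(key, {})
            i += 1
    return result

d: Dict[Any, Any] = {}
path_to_dict_sketch(("data", 0, "name"), "ada", d)
path_to_dict_sketch(("data", 1, "name"), "lin", d)
print(d)  # {'data': [{'name': 'ada'}, {'name': 'lin'}]}
```

Calling the function repeatedly with a shared result dict is exactly how explode() rebuilds a nested structure from a squashed one.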

squash_data

squash_data(
    builder_fn: Callable[[str, Any], None],
    json_data: str,
    prune_at: List[Union[str, Tuple[str, int]]] | None = None,
    timeout: int = TIMEOUT,
) -> None

Squash JSON data into single-level structure with jq-style keys.

Compresses nested JSON paths into a flat structure where each path becomes a jq-style key. Optionally prunes specific branches from the output.

Pruning specification formats:

- (path_element_name, path_index): Paths where path[path_index] == path_element_name
- path_element_name or (path_element_name, None): Any path containing this element name
- path_index or (None, path_index): Any path at this depth

Parameters:

Name Type Description Default
builder_fn Callable[[str, Any], None]

Callback function with signature fn(jq_path, item) called for each path/value pair to build results.

required
json_data str

JSON data source (file path, URL, or JSON string).

required
prune_at List[Union[str, Tuple[str, int]]] | None

List of path specifications identifying branches to skip. Defaults to None (no pruning).

None
timeout int

Request timeout in seconds for URL sources. Defaults to 10.

TIMEOUT
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def squash_data(
    builder_fn: Callable[[str, Any], None],
    json_data: str,
    prune_at: List[Union[str, Tuple[str, int]]] | None = None,
    timeout: int = TIMEOUT,
) -> None:
    """Squash JSON data into single-level structure with jq-style keys.

    Compresses nested JSON paths into a flat structure where each path becomes
    a jq-style key. Optionally prunes specific branches from the output.

    Pruning specification formats:
        - (path_element_name, path_index): Paths where path[path_index] == path_element_name
        - path_element_name or (path_element_name, None): Any path containing this element name
        - path_index or (None, path_index): Any path at this depth

    Args:
        builder_fn: Callback function with signature fn(jq_path, item) called for
            each path/value pair to build results.
        json_data: JSON data source (file path, URL, or JSON string).
        prune_at: List of path specifications identifying branches to skip.
            Defaults to None (no pruning).
        timeout: Request timeout in seconds for URL sources. Defaults to 10.
    """
    raw_depths = set()
    raw_elts = set()
    elt_depths = {}
    depth_elts = defaultdict(set)

    def decode_item(item: Any) -> None:
        if item is not None:
            if isinstance(item, str):
                raw_elts.add(item)
            elif isinstance(item, int):
                raw_depths.add(item)
            elif isinstance(item, tuple) and len(item) == 2:
                elt = item[0]
                depth = item[1]
                if elt is None:
                    if depth is not None:
                        raw_depths.add(depth)
                elif depth is not None:
                    depth_elts[depth].add(elt)
                    elt_depths[elt] = depth
                else:
                    raw_elts.add(elt)
            elif isinstance(item, (list, tuple)):
                for i in item:
                    decode_item(i)

    decode_item(prune_at)
    has_raw_depths = len(raw_depths) > 0
    has_raw_elts = len(raw_elts) > 0
    has_elts = len(elt_depths) > 0
    has_depth_elts = len(depth_elts) > 0
    do_prune = has_raw_depths or has_raw_elts or has_elts or has_depth_elts

    def visitor(item: Any, path: Tuple[Any, ...]) -> None:
        if do_prune:
            cur_depth = len(path)
            if has_raw_depths and cur_depth in raw_depths:
                return
            if has_raw_elts:
                if len(raw_elts.intersection(path)) > 0:
                    return
            cur_elt = path[-1]
            if has_elts and cur_elt in elt_depths:
                if cur_depth == elt_depths[cur_elt]:
                    return
            if has_depth_elts:
                for depth, elts in depth_elts.items():
                    if depth < cur_depth and path[depth] in elts:
                        return
        # Add squashed element
        jq_path = build_jq_path(path, keep_list_idxs=True)
        builder_fn(jq_path, item)

    stream_json_data(json_data, visitor, timeout=timeout)
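
The squashing idea can be illustrated without the streaming machinery. The walker below is a stand-in for json_stream.visit (it visits leaf values with their path tuples), and the prune check only covers the bare element-name form; squash_data itself accepts files, URLs, or JSON strings and the full range of pruning specifications.

```python
from typing import Any, Callable, Dict, Set, Tuple

def walk(obj: Any, visitor: Callable[[Any, Tuple[Any, ...]], None],
         path: Tuple[Any, ...] = ()) -> None:
    # Stand-in for json_stream.visit: call visitor(item, path) at each leaf.
    if isinstance(obj, dict):
        for k, v in obj.items():
            walk(v, visitor, path + (k,))
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            walk(v, visitor, path + (i,))
    else:
        visitor(obj, path)

def squash_sketch(data: Any, prune_elts: Set[str]) -> Dict[str, Any]:
    # Flatten nested data to {jq_path: leaf_value}, skipping pruned branches.
    result: Dict[str, Any] = {}

    def visitor(item: Any, path: Tuple[Any, ...]) -> None:
        if prune_elts.intersection(p for p in path if isinstance(p, str)):
            return
        jq_path = "".join(
            f"[{p}]" if isinstance(p, int) else f".{p}" for p in path
        )
        result[jq_path] = item

    walk(data, visitor)
    return result

doc = {"data": [{"name": "ada", "debug": {"trace": 1}}]}
print(squash_sketch(doc, {"debug"}))  # {'.data[0].name': 'ada'}
```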

stream_jq_paths

stream_jq_paths(
    json_data: str,
    output_stream: TextIO,
    line_builder_fn: Callable[[str, Any], str] = lambda jq_path, item: (
        f"{jq_path}\t{item}"
    ),
    keep_list_idxs: bool = True,
    timeout: int = TIMEOUT,
) -> None

Stream JSON data and write formatted lines for each (jq_path, item) pair.

Parameters:

Name Type Description Default
json_data str

JSON data source (file path, URL, or JSON string).

required
output_stream TextIO

Text stream to write formatted lines to.

required
line_builder_fn Callable[[str, Any], str]

Function that takes (jq_path, item) and returns a formatted string. Defaults to tab-separated format.

lambda jq_path, item: f'{jq_path}\t{item}'
keep_list_idxs bool

If True, keeps exact array index values. If False, uses generic '[]'. Defaults to True.

True
timeout int

Request timeout in seconds for URL sources. Defaults to 10.

TIMEOUT
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def stream_jq_paths(
    json_data: str,
    output_stream: TextIO,
    line_builder_fn: Callable[[str, Any], str] = (lambda jq_path, item: f"{jq_path}\t{item}"),
    keep_list_idxs: bool = True,
    timeout: int = TIMEOUT,
) -> None:
    """Stream JSON data and write formatted lines for each (jq_path, item) pair.

    Args:
        json_data: JSON data source (file path, URL, or JSON string).
        output_stream: Text stream to write formatted lines to.
        line_builder_fn: Function that takes (jq_path, item) and returns a
            formatted string. Defaults to tab-separated format.
        keep_list_idxs: If True, keeps exact array index values.
            If False, uses generic '[]'. Defaults to True.
        timeout: Request timeout in seconds for URL sources. Defaults to 10.
    """

    def visitor(item: Any, path: Tuple[Any, ...]) -> None:
        jq_path = build_jq_path(path, keep_list_idxs=keep_list_idxs)
        line = line_builder_fn(jq_path, item)
        print(line, file=output_stream)

    stream_json_data(json_data, visitor, timeout=timeout)

stream_json_data

stream_json_data(
    json_data: str,
    visitor_fn: Callable[[Any, Tuple[Any, ...]], None],
    timeout: int = TIMEOUT,
) -> None

Stream JSON data and call a visitor function for each value.

Supports multiple input formats: file paths (including .gz), URLs, or JSON strings. Automatically detects and handles gzip-compressed files.

Parameters:

Name Type Description Default
json_data str

The JSON data source - can be a file path, URL (starting with 'http'), or JSON string.

required
visitor_fn Callable[[Any, Tuple[Any, ...]], None]

Function called for each JSON value with signature visitor_fn(item, path) where item is the value and path is a tuple of elements identifying the path to the item.

required
timeout int

Request timeout in seconds for URL sources. Defaults to 10.

TIMEOUT
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def stream_json_data(
    json_data: str,
    visitor_fn: Callable[[Any, Tuple[Any, ...]], None],
    timeout: int = TIMEOUT,
) -> None:
    """Stream JSON data and call a visitor function for each value.

    Supports multiple input formats: file paths (including .gz), URLs, or JSON strings.
    Automatically detects and handles gzip-compressed files.

    Args:
        json_data: The JSON data source - can be a file path, URL (starting with
            'http'), or JSON string.
        visitor_fn: Function called for each JSON value with signature
            visitor_fn(item, path) where item is the value and path is a tuple
            of elements identifying the path to the item.
        timeout: Request timeout in seconds for URL sources. Defaults to 10.
    """
    if os.path.exists(json_data):
        if dk_futils.is_gzip_file(json_data):
            with gzip.open(json_data, "rt", encoding="utf-8") as f:
                json_stream.visit(f, visitor_fn)
        else:
            with open(json_data, encoding="utf-8") as f:
                json_stream.visit(f, visitor_fn)
    elif json_data.startswith("http"):
        with requests.get(json_data, stream=True, timeout=timeout) as response:
            json_stream.requests.visit(response, visitor_fn)
    elif isinstance(json_data, str):
        string_io = io.StringIO(json_data)
        json_stream.visit(string_io, visitor_fn)

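The visitor_fn contract can be illustrated with a small stdlib walker that mimics the (item, path) callback. The walker below is an illustrative stand-in, not json_stream's actual implementation:

```python
import json

def visit(obj, visitor_fn, path=()):
    # Stdlib emulation of the visitor contract: call visitor_fn(item, path)
    # for every leaf value, where path is the tuple of dict keys and list
    # indices leading to the item.
    if isinstance(obj, dict):
        for k, v in obj.items():
            visit(v, visitor_fn, path + (k,))
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            visit(v, visitor_fn, path + (i,))
    else:
        visitor_fn(obj, path)

seen = []
visit(json.loads('{"users": [{"name": "Ada"}]}'),
      lambda item, path: seen.append((path, item)))
print(seen)  # [(('users', 0, 'name'), 'Ada')]
```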
stream_record_paths

stream_record_paths(
    json_data: str,
    output_stream: TextIO,
    line_builder_fn: Callable[[int, int, str, Any], str],
    timeout: int = TIMEOUT,
) -> None

Stream JSON and write formatted records grouped by top-level structure.

Identifies top-level JSON records (typically array elements) and writes formatted lines for each path within each record.

Each output line represents a (rec_id, line_num, jq_path, item) tuple where:

- rec_id: 0-based record identifier
- line_num: 0-based original item number from stream
- jq_path: Fully-qualified path within the record
- item: Value at the path

Parameters:

Name Type Description Default
json_data str

JSON data source (file path, URL, or JSON string).

required
output_stream TextIO

Text stream to write formatted lines to.

required
line_builder_fn Callable[[int, int, str, Any], str]

Function with signature fn(rec_id, line_num, jq_path, item) that returns a formatted string.

required
timeout int

Request timeout in seconds for URL sources. Defaults to 10.

TIMEOUT
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def stream_record_paths(
    json_data: str,
    output_stream: TextIO,
    line_builder_fn: Callable[[int, int, str, Any], str],
    timeout: int = TIMEOUT,
) -> None:
    """Stream JSON and write formatted records grouped by top-level structure.

    Identifies top-level JSON records (typically array elements) and writes
    formatted lines for each path within each record.

    Each output line represents a (rec_id, line_num, jq_path, item) tuple where:
        - rec_id: 0-based record identifier
        - line_num: 0-based original item number from stream
        - jq_path: Fully-qualified path within the record
        - item: Value at the path

    Args:
        json_data: JSON data source (file path, URL, or JSON string).
        output_stream: Text stream to write formatted lines to.
        line_builder_fn: Function with signature fn(rec_id, line_num, jq_path, item)
            that returns a formatted string.
        timeout: Request timeout in seconds for URL sources. Defaults to 10.
    """
    rpb = RecordPathBuilder(json_data, output_stream, line_builder_fn, timeout=timeout)
    rpb.stream_record_paths()

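A hypothetical line_builder_fn matching the documented fn(rec_id, line_num, jq_path, item) signature, tab-separating the four fields of each tuple:

```python
def line_builder_fn(rec_id, line_num, jq_path, item):
    # One output line per (rec_id, line_num, jq_path, item) tuple.
    return f"{rec_id}\t{line_num}\t{jq_path}\t{item}"

line = line_builder_fn(0, 3, ".name", "Ada")
print(line)
```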
write_squashed

write_squashed(
    dest_file: Union[str, TextIO],
    jdata: str,
    prune_at: List[Union[str, Tuple[str, int]]] | None = None,
    timeout: int = TIMEOUT,
    format_fn: Callable[[str, Any], str] = lambda jq_path, item: (
        f"{jq_path}\t{item}"
    ),
) -> None

Write squashed JSON data to a file.

Parameters:

Name Type Description Default
dest_file Union[str, TextIO]

Output file path (str) or open text stream (TextIO).

required
jdata str

JSON data source (file path, URL, or JSON string).

required
prune_at List[Union[str, Tuple[str, int]]] | None

List of path specifications identifying branches to skip. Defaults to None. See squash_data() for format details.

None
timeout int

Request timeout in seconds for URL sources. Defaults to 10.

TIMEOUT
format_fn Callable[[str, Any], str]

Function to format each (jq_path, item) pair as a line. Defaults to tab-separated format.

lambda jq_path, item: f'{jq_path}\t{item}'
Source code in packages/utils/src/dataknobs_utils/json_utils.py
def write_squashed(
    dest_file: Union[str, TextIO],
    jdata: str,
    prune_at: List[Union[str, Tuple[str, int]]] | None = None,
    timeout: int = TIMEOUT,
    format_fn: Callable[[str, Any], str] = lambda jq_path, item: f"{jq_path}\t{item}",
) -> None:
    """Write squashed JSON data to a file.

    Args:
        dest_file: Output file path (str) or open text stream (TextIO).
        jdata: JSON data source (file path, URL, or JSON string).
        prune_at: List of path specifications identifying branches to skip.
            Defaults to None. See squash_data() for format details.
        timeout: Request timeout in seconds for URL sources. Defaults to 10.
        format_fn: Function to format each (jq_path, item) pair as a line.
            Defaults to tab-separated format.
    """
    needs_close = False
    f: TextIO
    if isinstance(dest_file, str):
        f = open(dest_file, "w", encoding="utf-8")
        needs_close = True
    else:
        f = dest_file
    squash_data(
        lambda jq_path, item: print(format_fn(jq_path, item), file=f),
        jdata,
        prune_at=prune_at,
        timeout=timeout,
    )
    if needs_close:
        f.close()

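The dest_file handling in the source above follows a common open-or-passthrough pattern: a path string is opened (and closed) internally, while a caller-supplied stream is used as-is and left open. A stdlib sketch of that pattern, with open_dest as an illustrative helper name:

```python
import io

def open_dest(dest_file):
    # Path strings are opened here and flagged for closing; already-open
    # text streams pass through untouched and stay the caller's to close.
    if isinstance(dest_file, str):
        return open(dest_file, "w", encoding="utf-8"), True
    return dest_file, False

buf = io.StringIO()
f, needs_close = open_dest(buf)
print("a.b\t1", file=f)
if needs_close:
    f.close()
print(buf.getvalue(), end="")
```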
llm_utils

Classes

PromptMessage

dataknobs_utils.llm_utils.PromptMessage

PromptMessage(role: str, content: str, metadata: Dict[str, Any] | None = None)

Structured prompt message with role, content, and optional metadata.

Represents a single message in an LLM conversation with format:

{"role": <role>, "content": <content>}

where role is typically "system", "user", or "assistant" and content contains the prompt or LLM-generated text.

Attributes:

Name Type Description
role

Message role (e.g., "system", "user", "assistant").

content

Message text content.

metadata

Optional metadata dictionary.

Initialize prompt message with role and content.

Parameters:

Name Type Description Default
role str

Message role (e.g., "system", "user", "assistant").

required
content str

Prompt or LLM-generated text.

required
metadata Dict[str, Any] | None

Optional metadata containing generation args, execution data, and user comments. Defaults to None.

None

Methods:

Name Description
__repr__

Get message as JSON string without metadata.

build_instance

Reconstruct a PromptMessage from its dictionary representation.

get_message

Get message as a dictionary.

to_json

Serialize message to JSON string.

Source code in packages/utils/src/dataknobs_utils/llm_utils.py
def __init__(self, role: str, content: str, metadata: Dict[str, Any] | None = None):
    """Initialize prompt message with role and content.

    Args:
        role: Message role (e.g., "system", "user", "assistant").
        content: Prompt or LLM-generated text.
        metadata: Optional metadata containing generation args, execution
            data, and user comments. Defaults to None.
    """
    self.role = role
    self.content = content
    self.metadata = metadata
    self._dict: Dict[str, str] | None = None  # The dict without metadata

Functions

__repr__

__repr__() -> str

Get message as JSON string without metadata.

Returns:

Name Type Description
str str

JSON string representation of message.

Source code in packages/utils/src/dataknobs_utils/llm_utils.py
def __repr__(self) -> str:
    """Get message as JSON string without metadata.

    Returns:
        str: JSON string representation of message.
    """
    return self.to_json(with_metadata=False)

build_instance staticmethod

build_instance(message_dict: Dict[str, Any]) -> PromptMessage

Reconstruct a PromptMessage from its dictionary representation.

Parameters:

Name Type Description Default
message_dict Dict[str, Any]

Dictionary with "role", "content", and optionally "metadata" keys.

required

Returns:

Name Type Description
PromptMessage PromptMessage

Reconstructed message instance.

Source code in packages/utils/src/dataknobs_utils/llm_utils.py
@staticmethod
def build_instance(message_dict: Dict[str, Any]) -> "PromptMessage":
    """Reconstruct a PromptMessage from its dictionary representation.

    Args:
        message_dict: Dictionary with "role", "content", and optionally
            "metadata" keys.

    Returns:
        PromptMessage: Reconstructed message instance.
    """
    return PromptMessage(
        message_dict.get("role", "unknown"),
        message_dict.get("content", ""),
        metadata=message_dict.get("metadata"),
    )

get_message

get_message(with_metadata: Union[bool, str] = False) -> Dict[str, Any]

Get message as a dictionary.

Parameters:

Name Type Description Default
with_metadata Union[bool, str]

If True, includes metadata. If a string, uses that as the metadata key instead of "metadata". Defaults to False.

False

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary with "role" and "content", optionally including metadata.

Source code in packages/utils/src/dataknobs_utils/llm_utils.py
def get_message(self, with_metadata: Union[bool, str] = False) -> Dict[str, Any]:
    """Get message as a dictionary.

    Args:
        with_metadata: If True, includes metadata. If a string, uses that
            as the metadata key instead of "metadata". Defaults to False.

    Returns:
        Dict[str, Any]: Dictionary with "role" and "content", optionally
            including metadata.
    """
    if self._dict is None:
        self._dict = {
            "role": self.role,
            "content": self.content,
        }
    retval: Dict[str, Any]
    if with_metadata and self.metadata is not None:
        retval = dict(self._dict)  # Convert to Dict[str, Any]
        attr = with_metadata if isinstance(with_metadata, str) else "metadata"
        retval[attr] = self.metadata
    else:
        retval = dict(self._dict)
    return retval

to_json

to_json(with_metadata: bool = True) -> str

Serialize message to JSON string.

Parameters:

Name Type Description Default
with_metadata bool

If True, includes metadata. Defaults to True.

True

Returns:

Name Type Description
str str

JSON string representation of message.

Raises:

Type Description
TypeError

If metadata is not JSON serializable.

Source code in packages/utils/src/dataknobs_utils/llm_utils.py
def to_json(self, with_metadata: bool = True) -> str:
    """Serialize message to JSON string.

    Args:
        with_metadata: If True, includes metadata. Defaults to True.

    Returns:
        str: JSON string representation of message.

    Raises:
        TypeError: If metadata is not JSON serializable.
    """
    return json.dumps(self.get_message(with_metadata=with_metadata))

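The documented shape can be exercised without the class: get_message() returns a dict with "role" and "content", optionally carrying metadata under a caller-chosen key, and to_json() is json.dumps of that dict. A stdlib sketch under those assumptions (the key name gen_args and the metadata values are hypothetical):

```python
import json

role, content = "user", "Summarize this text."
metadata = {"temperature": 0.2}  # hypothetical generation args

message = {"role": role, "content": content}
# Passing with_metadata="gen_args" would store metadata under that key,
# per get_message's documentation.
with_meta = dict(message, gen_args=metadata)

as_json = json.dumps(message)
print(as_json)
```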
Functions

get_value_by_key

dataknobs_utils.llm_utils.get_value_by_key

get_value_by_key(
    d: Dict[str, Any] | None, pathkey: str, default_value: Any = None
) -> Any

Get a nested value from a dictionary using dot-delimited path.

Navigates through nested dictionaries using a path key like "foo.bar.baz" to retrieve deeply nested values.

Parameters:

Name Type Description Default
d Dict[str, Any] | None

Possibly nested dictionary to search.

required
pathkey str

Dot-delimited path to the value (e.g., "foo.bar").

required
default_value Any

Value to return if path doesn't exist. Defaults to None.

None

Returns:

Name Type Description
Any Any

Value at the path, or default_value if path doesn't exist.

Examples:

>>> d = {"foo": {"bar": "baz"}}
>>> get_value_by_key(d, "foo.bar")
'baz'
Source code in packages/utils/src/dataknobs_utils/llm_utils.py
def get_value_by_key(
    d: Dict[str, Any] | None,
    pathkey: str,
    default_value: Any = None,
) -> Any:
    """Get a nested value from a dictionary using dot-delimited path.

    Navigates through nested dictionaries using a path key like "foo.bar.baz"
    to retrieve deeply nested values.

    Args:
        d: Possibly nested dictionary to search.
        pathkey: Dot-delimited path to the value (e.g., "foo.bar").
        default_value: Value to return if path doesn't exist. Defaults to None.

    Returns:
        Any: Value at the path, or default_value if path doesn't exist.

    Examples:
        >>> d = {"foo": {"bar": "baz"}}
        >>> get_value_by_key(d, "foo.bar")
        'baz'
    """
    path = pathkey.split(".")
    if d is None:
        return default_value

    for key in path:
        if not isinstance(d, dict) or key not in d:
            return default_value
        d = d[key]

    return d

pandas_utils

Functions

dataknobs_utils.pandas_utils

Pandas DataFrame utility functions and data transformations.

Provides utilities for creating, transforming, and manipulating Pandas DataFrames, including conversions between dicts, lists, and DataFrame formats.

Classes:

Name Description
GroupManager

Manage overlapping row groups in a DataFrame using JSON-encoded group lists.

Functions:

Name Description
dicts2df

Create a DataFrame from dictionaries or lists of dictionaries.

explode_json_series

Explode a Series containing JSON-encoded lists into individual items.

get_loc_range

Find the range of True values in a boolean Series.

sort_by_strlen

Sort DataFrame by string length in a text column.

Classes

GroupManager

GroupManager(df: DataFrame, group_num_col: str)

Manage overlapping row groups in a DataFrame using JSON-encoded group lists.

Handles DataFrames where rows can belong to multiple groups, with group membership stored as JSON lists of group numbers (e.g., "[1, 3]" means row belongs to groups 1 and 3). Provides utilities for marking, unmarking, querying, and analyzing group memberships.

Attributes:

Name Type Description
idf

Original input DataFrame.

gcol

Name of the group number column.

Initialize group manager with DataFrame and group column.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with rows to manage (will be re-indexed if needed).

required
group_num_col str

Name of the column containing JSON-encoded group number lists.

required

Methods:

Name Description
clear_all_groups

Remove all group assignments from the DataFrame.

find_subsets

Find groups that are subsets of other groups.

get_group_locs

Get row indices for a specific group.

get_intra_ungrouped_locs

Get row indices between group boundaries that aren't in the group.

get_subgroup_manager

Create a GroupManager for subgroups within a specific group.

mark_group

Add rows to a group, creating or updating group membership.

mark_groups

Mark multiple groups in a single operation.

remove_groups

Remove multiple groups entirely.

remove_subsets

Remove all groups that are subsets of other groups.

reset_group_numbers

Renumber all groups consecutively starting from a given number.

unmark_group

Remove a group number from specified rows or entirely.

Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def __init__(
    self,
    df: pd.DataFrame,
    group_num_col: str,
):
    """Initialize group manager with DataFrame and group column.

    Args:
        df: DataFrame with rows to manage (will be re-indexed if needed).
        group_num_col: Name of the column containing JSON-encoded group
            number lists.
    """
    self.idf = df  # input dataframe
    self.gcol = group_num_col
    self._cdf = self._fix_index(df)  # collapsed dataframe
    self._es: pd.Series | None = None  # expanded series
    self._glocs: Dict[int, List[int]] | None = None  # Dict[group_num, loc_series]
    self._mdf: pd.DataFrame | None = None  # mask dataframe
Attributes
all_group_locs property
all_group_locs: Dict[int, List[int]]

Get row indices for all groups.

Returns:

Type Description
Dict[int, List[int]]

Dict[int, List[int]]: Mapping from group number to list of row indices.

all_group_nums property
all_group_nums: List[int]

Get all existing group numbers.

Returns:

Type Description
List[int]

List[int]: List of group numbers currently in use (empty if none).

collapsed_df property
collapsed_df: DataFrame

Get the DataFrame with JSON-encoded group lists.

Alias for the df property.

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with group column containing JSON lists.

df property
df: DataFrame

Get the DataFrame with JSON-encoded group lists.

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with group column containing JSON lists.

expanded_ser property
expanded_ser: Series

Get Series with individual group numbers and repeated indices.

Expands JSON group lists so each group number becomes a separate row with repeated indices for rows belonging to multiple groups.

Returns:

Type Description
Series

pd.Series: Series with individual group numbers, indices repeated for multi-group membership.

grouped_locs property
grouped_locs: List[int]

Get indices of all rows belonging to at least one group.

Returns:

Type Description
List[int]

List[int]: Row indices with group membership.

mask_df property
mask_df: DataFrame

Get DataFrame of boolean masks for each group.

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame where each column is a boolean mask for a group, with column names like "{group_col}_{group_num}".

max_group_num property
max_group_num: int

Get the highest group number in use.

Returns:

Name Type Description
int int

Maximum group number, or -1 if no groups exist.

ungrouped_locs property
ungrouped_locs: List[int]

Get indices of all rows not belonging to any group.

Returns:

Type Description
List[int]

List[int]: Row indices without group membership.

Functions
clear_all_groups
clear_all_groups() -> None

Remove all group assignments from the DataFrame.

Resets the group column to NaN for all rows.

Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def clear_all_groups(self) -> None:
    """Remove all group assignments from the DataFrame.

    Resets the group column to NaN for all rows.
    """
    df = self.collapsed_df
    if self.gcol in df.columns:
        df[self.gcol] = np.nan
        self._reset_edf()
find_subsets
find_subsets(proper: bool = True) -> Set[int]

Find groups that are subsets of other groups.

Parameters:

Name Type Description Default
proper bool

If True, includes proper subsets (strict subsets) and identical groups. If False, only strict subsets. Defaults to True.

True

Returns:

Type Description
Set[int]

Set[int]: Group numbers that are subsets of other groups.

Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def find_subsets(self, proper: bool = True) -> Set[int]:
    """Find groups that are subsets of other groups.

    Args:
        proper: If True, includes proper subsets (strict subsets) and
            identical groups. If False, only strict subsets. Defaults to True.

    Returns:
        Set[int]: Group numbers that are subsets of other groups.
    """
    rv = set()
    mdf = self.mask_df
    for g1, g2 in itertools.combinations(mdf.columns, 2):
        m1 = mdf[g1]
        m2 = mdf[g2]
        s1 = m1.sum()
        s2 = m2.sum()
        if proper or s1 != s2:
            combo_sum = (m1 & m2).sum()
            if combo_sum == s2:  # "later" smaller or equal
                # g2 is a (proper or smaller) subset of g1
                gn = int(g2[g2.rindex("_") + 1 :])
                rv.add(gn)
            elif combo_sum == s1:  # "earlier" smaller
                # g1 is a (smaller) subset of g2
                gn = int(g1[g1.rindex("_") + 1 :])
                rv.add(gn)
    return rv
get_group_locs
get_group_locs(group_num: int) -> List[int]

Get row indices for a specific group.

Parameters:

Name Type Description Default
group_num int

Group number to query.

required

Returns:

Type Description
List[int]

List[int]: List of row indices in the group (empty if group doesn't exist).

Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def get_group_locs(self, group_num: int) -> List[int]:
    """Get row indices for a specific group.

    Args:
        group_num: Group number to query.

    Returns:
        List[int]: List of row indices in the group (empty if group doesn't exist).
    """
    return self.all_group_locs.get(group_num, [])
get_intra_ungrouped_locs
get_intra_ungrouped_locs(group_num: int) -> List[int]

Get row indices between group boundaries that aren't in the group.

Finds rows that fall within the range from the first to last row of a group but aren't actually members of the group.

Parameters:

Name Type Description Default
group_num int

Group number to analyze.

required

Returns:

Type Description
List[int]

List[int]: Row indices within group range but not in the group.

Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def get_intra_ungrouped_locs(self, group_num: int) -> List[int]:
    """Get row indices between group boundaries that aren't in the group.

    Finds rows that fall within the range from the first to last row of
    a group but aren't actually members of the group.

    Args:
        group_num: Group number to analyze.

    Returns:
        List[int]: Row indices within group range but not in the group.
    """
    result = None
    colname = f"{self.gcol}_{group_num}"
    if colname in self.mask_df:
        mcol = self.mask_df[colname]
        if mcol.any():
            startloc, endloc = get_loc_range(mcol)
            locs = ~mcol[startloc : endloc + 1]
            if locs.any():
                result = locs[locs].index.tolist()
    return result if result is not None else []
get_subgroup_manager
get_subgroup_manager(group_num: int, subgroup_num_col: str) -> GroupManager

Create a GroupManager for subgroups within a specific group.

Extracts subgroup information for rows belonging to a specific group, filtering out subgroups that are shared with other groups. Returns a new GroupManager with a copy of the DataFrame showing only the relevant subgroup structure.

Parameters:

Name Type Description Default
group_num int

Group number whose subgroups to extract.

required
subgroup_num_col str

Name of the DataFrame column containing subgroup number lists.

required

Returns:

Name Type Description
GroupManager GroupManager

New manager with only subgroups unique to this group.

Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def get_subgroup_manager(
    self,
    group_num: int,
    subgroup_num_col: str,
) -> "GroupManager":
    """Create a GroupManager for subgroups within a specific group.

    Extracts subgroup information for rows belonging to a specific group,
    filtering out subgroups that are shared with other groups. Returns a
    new GroupManager with a copy of the DataFrame showing only the relevant
    subgroup structure.

    Args:
        group_num: Group number whose subgroups to extract.
        subgroup_num_col: Name of the DataFrame column containing
            subgroup number lists.

    Returns:
        GroupManager: New manager with only subgroups unique to this group.
    """
    group_locs = self.get_group_locs(group_num)
    # Get the subgroup column data as a Series
    subgroup_ser = pd.Series(self.collapsed_df.loc[group_locs, subgroup_num_col])

    # only subgroup_locs that are not shared should remain
    es = explode_json_series(subgroup_ser)
    vc = es.index.value_counts()
    all_nums = set(es.unique())
    keeper_nums = set(es[vc[vc == 1].index])
    discard_nums = all_nums.difference(keeper_nums)

    # Make (sub) group manager with a *COPY* of this manager's df
    # so as not to destroy the subgroup column's information.
    gm = GroupManager(self.collapsed_df.copy(), subgroup_num_col)
    gm.remove_groups(list(discard_nums))

    return gm
mark_group
mark_group(
    idx_values: Union[Series, List[int]], group_num: int | None = None
) -> None

Add rows to a group, creating or updating group membership.

Assigns rows to a group number, either specified or auto-generated. If the group already exists, adds new rows to it without removing existing members.

Parameters:

Name Type Description Default
idx_values Union[Series, List[int]]

Row indices to include in the group.

required
group_num int | None

Group number to assign. If None, uses next available number (max + 1). Defaults to None.

None
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def mark_group(
    self, idx_values: Union[pd.Series, List[int]], group_num: int | None = None
) -> None:
    """Add rows to a group, creating or updating group membership.

    Assigns rows to a group number, either specified or auto-generated.
    If the group already exists, adds new rows to it without removing
    existing members.

    Args:
        idx_values: Row indices to include in the group.
        group_num: Group number to assign. If None, uses next available
            number (max + 1). Defaults to None.
    """
    df = self.collapsed_df
    if group_num is None:
        group_num = int(self.max_group_num) + 1
    if self.gcol not in df:
        df[self.gcol] = np.nan
    cur_values = df.loc[idx_values, self.gcol]

    def add_group(v: Any) -> str:
        if pd.notna(v) and v != "":
            groups = set(json.loads(v))
            groups.add(group_num)
            return json.dumps(list(groups))
        else:
            return f"[{group_num}]"

    df[self.gcol] = df[self.gcol].astype(object)
    df.loc[idx_values, self.gcol] = cur_values.apply(add_group)
    self._reset_edf()
mark_groups
mark_groups(
    idx_value_lists: List[Union[Series, List[int]]],
    group_nums: List[int] | None = None,
) -> None

Mark multiple groups in a single operation.

Parameters:

Name Type Description Default
idx_value_lists List[Union[Series, List[int]]]

List where each element contains row indices for a group.

required
group_nums List[int] | None

Group numbers corresponding to each idx_values list. If None, auto-generates consecutive numbers. Defaults to None.

None
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def mark_groups(
    self,
    idx_value_lists: List[Union[pd.Series, List[int]]],
    group_nums: List[int] | None = None,
) -> None:
    """Mark multiple groups in a single operation.

    Args:
        idx_value_lists: List where each element contains row indices for a group.
        group_nums: Group numbers corresponding to each idx_values list.
            If None, auto-generates consecutive numbers. Defaults to None.
    """
    for pos, idx_values in enumerate(idx_value_lists):
        self.mark_group(
            idx_values, group_num=group_nums[pos] if group_nums is not None else None
        )
remove_groups
remove_groups(group_nums: Union[List[int], Set[int]]) -> None

Remove multiple groups entirely.

Parameters:

Name Type Description Default
group_nums Union[List[int], Set[int]]

Collection of group numbers to remove completely.

required
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def remove_groups(self, group_nums: Union[List[int], Set[int]]) -> None:
    """Remove multiple groups entirely.

    Args:
        group_nums: Collection of group numbers to remove completely.
    """
    for gnum in group_nums:
        self.unmark_group(gnum)
remove_subsets
remove_subsets(proper: bool = True) -> None

Remove all groups that are subsets of other groups.

Parameters:

Name Type Description Default
proper bool

If True, removes proper subsets and identical groups. If False, only removes strict subsets. Defaults to True.

True
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def remove_subsets(self, proper: bool = True) -> None:
    """Remove all groups that are subsets of other groups.

    Args:
        proper: If True, removes proper subsets and identical groups.
            If False, only removes strict subsets. Defaults to True.
    """
    self.remove_groups(list(self.find_subsets(proper=proper)))
reset_group_numbers
reset_group_numbers(start_num: int = 0) -> None

Renumber all groups consecutively starting from a given number.

Preserves group memberships but assigns new consecutive numbers.

Parameters:

Name Type Description Default
start_num int

Starting group number. Defaults to 0.

0
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def reset_group_numbers(self, start_num: int = 0) -> None:
    """Renumber all groups consecutively starting from a given number.

    Preserves group memberships but assigns new consecutive numbers.

    Args:
        start_num: Starting group number. Defaults to 0.
    """
    glocs = self.all_group_locs
    self.clear_all_groups()
    for idx, (_gnum, locs) in enumerate(glocs.items()):
        self.mark_group(locs, group_num=start_num + idx)
unmark_group
unmark_group(group_num: int, idx_values: Series | None = None) -> None

Remove a group number from specified rows or entirely.

Parameters:

Name Type Description Default
group_num int

Group number to remove.

required
idx_values Series | None

Row indices from which to remove the group. If None, removes the group from all rows. Defaults to None.

None
Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def unmark_group(self, group_num: int, idx_values: pd.Series | None = None) -> None:
    """Remove a group number from specified rows or entirely.

    Args:
        group_num: Group number to remove.
        idx_values: Row indices from which to remove the group. If None,
            removes the group from all rows. Defaults to None.
    """
    df = self.collapsed_df
    if self.gcol in df:
        if idx_values is None:
            idx_values = pd.Series(df.index)
        gser = df[self.gcol]
        mask = gser.mask(gser.index.isin(idx_values), False).astype(bool)

        def del_group(v: Any) -> Any:
            rv = v
            if pd.notna(v) and v != "":
                groups = set(json.loads(v))
                groups.discard(group_num)
                rv = json.dumps(list(groups)) if len(groups) > 0 else np.nan
            return rv

        df[self.gcol] = gser.where(mask, gser.apply(del_group))
        self._reset_edf()

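The JSON-encoded membership cells behave like mark_group's internal add_group helper shown above: an empty cell becomes a fresh one-element list, and an existing list gains the new group number. A stdlib sketch (sorting before serializing is an assumption here, added only for deterministic output):

```python
import json

def add_group(cell, group_num):
    # Mirror mark_group's add_group: parse the JSON list, add the group
    # number, and re-serialize; empty cells start a one-element list.
    if cell:
        groups = set(json.loads(cell))
        groups.add(group_num)
        return json.dumps(sorted(groups))
    return f"[{group_num}]"

cell = add_group(None, 1)
cell = add_group(cell, 3)
print(cell)  # "[1, 3]": the row now belongs to groups 1 and 3
```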
Functions

dicts2df

dicts2df(
    dicts: Union[List[Dict], List[List[Dict]]],
    rename: Dict[str, str] | None = None,
    item_id: str | None = "item_id",
) -> pd.DataFrame

Create a DataFrame from dictionaries or lists of dictionaries.

Converts a list of dictionaries or a list of lists of dictionaries into a single concatenated DataFrame with optional column renaming and indexing.

Parameters:

Name Type Description Default
dicts Union[List[Dict], List[List[Dict]]]

List of dictionaries or list of lists of dictionaries.

required
rename Dict[str, str] | None

Optional mapping of column names to rename {from: to}. Defaults to None.

None
item_id str | None

Name of column to add containing the list index. Set to None to skip adding this column. Defaults to "item_id".

'item_id'

Returns:

Type Description
DataFrame

pd.DataFrame: Concatenated DataFrame from all dictionaries.

Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def dicts2df(
    dicts: Union[List[Dict], List[List[Dict]]],
    rename: Dict[str, str] | None = None,
    item_id: str | None = "item_id",
) -> pd.DataFrame:
    """Create a DataFrame from dictionaries or lists of dictionaries.

    Converts a list of dictionaries or a list of lists of dictionaries into
    a single concatenated DataFrame with optional column renaming and indexing.

    Args:
        dicts: List of dictionaries or list of lists of dictionaries.
        rename: Optional mapping of column names to rename {from: to}.
            Defaults to None.
        item_id: Name of column to add containing the list index. Set to None
            to skip adding this column. Defaults to "item_id".

    Returns:
        pd.DataFrame: Concatenated DataFrame from all dictionaries.
    """
    dfs = [pd.DataFrame.from_records(rec) for rec in dicts]
    for idx, df in enumerate(dfs):
        if rename:
            dfs[idx] = df.rename(columns=rename)
        if item_id:
            dfs[idx][item_id] = idx
    df = pd.concat(dfs).reset_index(drop=True) if len(dfs) > 0 else pd.DataFrame()
    return df
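
A brief usage sketch (the function body is reproduced from the source above so the snippet runs standalone; the sample records are illustrative):

```python
from __future__ import annotations

from typing import Dict, List, Union

import pandas as pd


def dicts2df(
    dicts: Union[List[Dict], List[List[Dict]]],
    rename: Dict[str, str] | None = None,
    item_id: str | None = "item_id",
) -> pd.DataFrame:
    # Body copied from the source above.
    dfs = [pd.DataFrame.from_records(rec) for rec in dicts]
    for idx, df in enumerate(dfs):
        if rename:
            dfs[idx] = df.rename(columns=rename)
        if item_id:
            dfs[idx][item_id] = idx
    return pd.concat(dfs).reset_index(drop=True) if len(dfs) > 0 else pd.DataFrame()


# Two "items", each a list of record dicts (illustrative data).
records = [
    [{"a": 1, "b": 2}, {"a": 3, "b": 4}],
    [{"a": 5, "b": 6}],
]
df = dicts2df(records, rename={"a": "alpha"})
print(df.columns.tolist())    # ['alpha', 'b', 'item_id']
print(df["item_id"].tolist())  # [0, 0, 1]
```

Note that `item_id` records which inner list each row came from, which is useful for tracing rows back to their source after concatenation.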

explode_json_series

explode_json_series(json_ser: Series) -> pd.Series

Explode a Series containing JSON-encoded lists into individual items.

Parses JSON list strings and expands them so each list item becomes a separate row with a repeated index.

Parameters:

Name Type Description Default
json_ser Series

Series with values as JSON-encoded lists (e.g., "[1, 2, 3]").

required

Returns:

Type Description
Series

pd.Series: Exploded Series with individual list items as values.

Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def explode_json_series(json_ser: pd.Series) -> pd.Series:
    """Explode a Series containing JSON-encoded lists into individual items.

    Parses JSON list strings and expands them so each list item becomes a
    separate row with a repeated index.

    Args:
        json_ser: Series with values as JSON-encoded lists (e.g., "[1, 2, 3]").

    Returns:
        pd.Series: Exploded Series with individual list items as values.
    """
    result = json_ser[np.logical_and(json_ser.notna(), json_ser != "")].apply(json.loads).explode()
    return pd.Series(result) if not isinstance(result, pd.Series) else result
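
A quick sketch of the behavior, inlining the one-line implementation from the source above (the sample Series is illustrative):

```python
import json

import numpy as np
import pandas as pd

# JSON-encoded lists; empty strings and missing values are skipped.
json_ser = pd.Series(['[1, 2]', "", None, '[3]'])
result = (
    json_ser[np.logical_and(json_ser.notna(), json_ser != "")]
    .apply(json.loads)
    .explode()
)
print(result.tolist())        # [1, 2, 3]
print(result.index.tolist())  # [0, 0, 3]
```

The repeated index (`0` appears twice) is what makes the exploded Series joinable back against the original rows.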

get_loc_range

get_loc_range(bool_ser: Series) -> Tuple[int, int]

Find the range of True values in a boolean Series.

Parameters:

Name Type Description Default
bool_ser Series

Boolean Series to analyze.

required

Returns:

Type Description
Tuple[int, int]

Tuple[int, int]: Tuple of (first_loc, last_loc) containing indices of first and last True values. Returns (0, 0) if no True values exist.

Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def get_loc_range(bool_ser: pd.Series) -> Tuple[int, int]:
    """Find the range of True values in a boolean Series.

    Args:
        bool_ser: Boolean Series to analyze.

    Returns:
        Tuple[int, int]: Tuple of (first_loc, last_loc) containing indices of
            first and last True values. Returns (0, 0) if no True values exist.
    """
    # Find all True positions
    true_positions = bool_ser[bool_ser].index

    if len(true_positions) == 0:
        # No True values, return (0, 0) or raise an error
        return (0, 0)

    # Convert to int (handling both integer indices and other types)
    # Use tolist() to convert index values to Python types
    first_result = int(true_positions.tolist()[0])
    last_result = int(true_positions.tolist()[-1])

    return (first_result, last_result)
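
For instance (reproducing the function above so the snippet is self-contained):

```python
from typing import Tuple

import pandas as pd


def get_loc_range(bool_ser: pd.Series) -> Tuple[int, int]:
    # Body condensed from the source above.
    true_positions = bool_ser[bool_ser].index
    if len(true_positions) == 0:
        return (0, 0)
    return (int(true_positions.tolist()[0]), int(true_positions.tolist()[-1]))


mask = pd.Series([False, True, False, True, False])
print(get_loc_range(mask))                        # (1, 3)
print(get_loc_range(pd.Series([False, False])))   # (0, 0)
```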

sort_by_strlen

sort_by_strlen(
    df: DataFrame, text_col: str, ascending: bool = False
) -> pd.DataFrame

Sort DataFrame by string length in a text column.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to sort.

required
text_col str

Name of the text column to sort by.

required
ascending bool

If True, sort shortest to longest; if False, longest to shortest. Defaults to False.

False

Returns:

Type Description
DataFrame

pd.DataFrame: Sorted DataFrame (original is not modified).

Source code in packages/utils/src/dataknobs_utils/pandas_utils.py
def sort_by_strlen(
    df: pd.DataFrame,
    text_col: str,
    ascending: bool = False,
) -> pd.DataFrame:
    """Sort DataFrame by string length in a text column.

    Args:
        df: DataFrame to sort.
        text_col: Name of the text column to sort by.
        ascending: If True, sort shortest to longest; if False, longest to
            shortest. Defaults to False.

    Returns:
        pd.DataFrame: Sorted DataFrame (original is not modified).
    """
    return df.loc[df[text_col].str.len().sort_values(ascending=ascending).index]
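
A minimal example of the same one-liner (sample data is illustrative):

```python
import pandas as pd

df = pd.DataFrame({"text": ["hi", "hey there", "hello"]})
# Longest-first, matching ascending=False (the default).
sorted_df = df.loc[df["text"].str.len().sort_values(ascending=False).index]
print(sorted_df["text"].tolist())  # ['hey there', 'hello', 'hi']
```

The original index values are preserved, so the result can still be aligned with the unsorted DataFrame.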

requests_utils

Classes and Functions

dataknobs_utils.requests_utils

HTTP request utilities for making API calls and handling responses.

Provides convenience functions for making HTTP requests with error handling, timeout management, and response parsing.

Classes:

Name Description
MockRequests

Mock requests library for testing.

MockResponse

Mock HTTP response for testing.

RequestHelper

Helper class for making HTTP requests to a server.

ServerResponse

Wrapper for HTTP response data with convenience properties.

Functions:

Name Description
delete_request

Execute an HTTP DELETE request and return the response.

get_current_ip

Get the running machine's IPv4 address.

get_request

Execute an HTTP GET request and return the response.

post_files_request

Execute an HTTP POST request with file uploads.

post_request

Execute an HTTP POST request and return the response.

put_request

Execute an HTTP PUT request and return the response.

Classes

MockRequests

MockRequests()

Mock requests library for testing.

Simulates the requests library by storing expected responses keyed by request parameters, allowing deterministic testing without network calls.

Attributes:

Name Type Description
responses Dict[str, MockResponse]

Dictionary of registered mock responses.

r404

Default 404 response for unregistered requests.

Methods:

Name Description
add

Register a mock response for specific request parameters.

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def __init__(self) -> None:
    self.responses: Dict[str, MockResponse] = {}
    self.r404 = MockResponse(404, '"Not found"')
Functions
add
add(
    response: MockResponse,
    api: str,
    api_request: str,
    data: Any | None = None,
    files: Any | None = None,
    headers: Dict[str, Any] | None = None,
    params: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
) -> None

Register a mock response for specific request parameters.

Parameters:

Name Type Description Default
response MockResponse

Mock response to return.

required
api str

Request method ('get', 'post', 'put', 'delete').

required
api_request str

Request URL.

required
data Any | None

Request body data. Defaults to None.

None
files Any | None

Request files. Defaults to None.

None
headers Dict[str, Any] | None

Request headers. Defaults to None.

None
params Dict[str, Any] | None

Query parameters. Defaults to None.

None
timeout int

Request timeout. Defaults to DEFAULT_TIMEOUT.

DEFAULT_TIMEOUT
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def add(
    self,
    response: MockResponse,
    api: str,
    api_request: str,
    data: Any | None = None,
    files: Any | None = None,
    headers: Dict[str, Any] | None = None,
    params: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
) -> None:
    """Register a mock response for specific request parameters.

    Args:
        response: Mock response to return.
        api: Request method ('get', 'post', 'put', 'delete').
        api_request: Request URL.
        data: Request body data. Defaults to None.
        files: Request files. Defaults to None.
        headers: Request headers. Defaults to None.
        params: Query parameters. Defaults to None.
        timeout: Request timeout. Defaults to DEFAULT_TIMEOUT.
    """
    key = self._make_key(
        api,
        api_request,
        data,
        files,
        headers,
        params,
        timeout,
    )
    self.responses[key] = response

MockResponse

MockResponse(status_code: int, result: Any)

Mock HTTP response for testing.

Simulates a requests.Response object with status code and result data.

Attributes:

Name Type Description
status_code

HTTP status code.

result

Response data.

text

Response as text (JSON-serialized if result is not a string).

Methods:

Name Description
to_server_response

Convert to a ServerResponse object.

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def __init__(self, status_code: int, result: Any) -> None:
    self.status_code = status_code
    self.result = result
    self.text = result
    if not isinstance(result, str):
        self.text = json.dumps(result)
Functions
to_server_response
to_server_response() -> ServerResponse

Convert to a ServerResponse object.

Returns:

Name Type Description
ServerResponse ServerResponse

Wrapped response object.

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def to_server_response(self) -> ServerResponse:
    """Convert to a ServerResponse object.

    Returns:
        ServerResponse: Wrapped response object.
    """
    return ServerResponse(self, self.result)  # type: ignore[arg-type]
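
A small sketch of the text-serialization behavior (the `__init__` body is copied from the source above; `to_server_response` is omitted here since it needs the `ServerResponse` class):

```python
import json
from typing import Any


class MockResponse:
    def __init__(self, status_code: int, result: Any) -> None:
        # Body copied from the source above: non-string results
        # are JSON-serialized into the text attribute.
        self.status_code = status_code
        self.result = result
        self.text = result
        if not isinstance(result, str):
            self.text = json.dumps(result)


r = MockResponse(200, {"ok": True})
print(r.text)                                  # {"ok": true}
print(MockResponse(404, '"Not found"').text)   # "Not found"
```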

RequestHelper

RequestHelper(
    server_ip: str,
    server_port: Union[str, int],
    api_response_handler: Callable = json_api_response_handler,
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    mock_requests: Any | None = None,
)

Helper class for making HTTP requests to a server.

Simplifies sending API requests by managing server connection details, headers, timeouts, and response handling in a reusable instance.

Attributes:

Name Type Description
ip

Server IP address.

port

Server port number.

response_handler

Function for processing responses.

headers

Default HTTP headers.

timeout

Default request timeout in seconds.

requests

Requests library or mock for testing.

Initialize request helper with server details.

Parameters:

Name Type Description Default
server_ip str

Server IP address or hostname.

required
server_port Union[str, int]

Server port number.

required
api_response_handler Callable

Default response handler function. Defaults to json_api_response_handler.

json_api_response_handler
headers Dict[str, Any] | None

Default HTTP headers. Defaults to None.

None
timeout int

Default timeout in seconds. Defaults to DEFAULT_TIMEOUT.

DEFAULT_TIMEOUT
mock_requests Any | None

Mock requests object for testing. Defaults to None.

None

Methods:

Name Description
build_url

Construct full URL from path.

delete

Convenience method for DELETE requests.

get

Convenience method for GET requests.

head

Convenience method for HEAD requests.

post

Convenience method for POST requests.

put

Convenience method for PUT requests.

request

Execute an HTTP request of any type.

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def __init__(
    self,
    server_ip: str,
    server_port: Union[str, int],
    api_response_handler: Callable = json_api_response_handler,
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    mock_requests: Any | None = None,
) -> None:
    """Initialize request helper with server details.

    Args:
        server_ip: Server IP address or hostname.
        server_port: Server port number.
        api_response_handler: Default response handler function.
            Defaults to json_api_response_handler.
        headers: Default HTTP headers. Defaults to None.
        timeout: Default timeout in seconds. Defaults to DEFAULT_TIMEOUT.
        mock_requests: Mock requests object for testing. Defaults to None.
    """
    self.ip = server_ip
    self.port = server_port
    self.response_handler = api_response_handler
    self.headers = headers
    self.timeout = timeout
    self.requests = mock_requests if mock_requests else requests
Functions
build_url
build_url(path: str) -> str

Construct full URL from path.

Parameters:

Name Type Description Default
path str

API path (without leading slash recommended).

required

Returns:

Name Type Description
str str

Complete URL (http://ip:port/path).

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def build_url(self, path: str) -> str:
    """Construct full URL from path.

    Args:
        path: API path (without leading slash recommended).

    Returns:
        str: Complete URL (http://ip:port/path).
    """
    return f"http://{self.ip}:{self.port}/{path}"
delete
delete(
    path: str,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> Any

Convenience method for DELETE requests.

Parameters:

Name Type Description Default
path str

API path

required
params Dict[str, Any] | None

Optional query parameters

None
headers Dict[str, Any] | None

Optional headers

None
timeout int | None

Optional timeout

None
verbose bool

Whether to print debug info

False

Returns:

Type Description
Any

ServerResponse object

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def delete(
    self,
    path: str,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> Any:
    """Convenience method for DELETE requests.

    Args:
        path: API path
        params: Optional query parameters
        headers: Optional headers
        timeout: Optional timeout
        verbose: Whether to print debug info

    Returns:
        ServerResponse object
    """
    return self.request(
        "delete",
        path,
        params=params,
        headers=headers,
        timeout=timeout,
        verbose=verbose,
    )
get
get(
    path: str,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> Any

Convenience method for GET requests.

Parameters:

Name Type Description Default
path str

API path

required
params Dict[str, Any] | None

Optional query parameters

None
headers Dict[str, Any] | None

Optional headers

None
timeout int | None

Optional timeout

None
verbose bool

Whether to print debug info

False

Returns:

Type Description
Any

ServerResponse object

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def get(
    self,
    path: str,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> Any:
    """Convenience method for GET requests.

    Args:
        path: API path
        params: Optional query parameters
        headers: Optional headers
        timeout: Optional timeout
        verbose: Whether to print debug info

    Returns:
        ServerResponse object
    """
    return self.request(
        "get",
        path,
        params=params,
        headers=headers,
        timeout=timeout,
        verbose=verbose,
    )
head
head(
    path: str,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> Any

Convenience method for HEAD requests.

Parameters:

Name Type Description Default
path str

API path

required
params Dict[str, Any] | None

Optional query parameters

None
headers Dict[str, Any] | None

Optional headers

None
timeout int | None

Optional timeout

None
verbose bool

Whether to print debug info

False

Returns:

Type Description
Any

ServerResponse object

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def head(
    self,
    path: str,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> Any:
    """Convenience method for HEAD requests.

    Args:
        path: API path
        params: Optional query parameters
        headers: Optional headers
        timeout: Optional timeout
        verbose: Whether to print debug info

    Returns:
        ServerResponse object
    """
    return self.request(
        "head",
        path,
        params=params,
        headers=headers,
        timeout=timeout,
        verbose=verbose,
    )
post
post(
    path: str,
    payload: Dict[str, Any] | None = None,
    params: Dict[str, Any] | None = None,
    files: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> Any

Convenience method for POST requests.

Parameters:

Name Type Description Default
path str

API path

required
payload Dict[str, Any] | None

Optional request body

None
params Dict[str, Any] | None

Optional query parameters

None
files Dict[str, Any] | None

Optional files to upload

None
headers Dict[str, Any] | None

Optional headers

None
timeout int | None

Optional timeout

None
verbose bool

Whether to print debug info

False

Returns:

Type Description
Any

ServerResponse object

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def post(
    self,
    path: str,
    payload: Dict[str, Any] | None = None,
    params: Dict[str, Any] | None = None,
    files: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> Any:
    """Convenience method for POST requests.

    Args:
        path: API path
        payload: Optional request body
        params: Optional query parameters
        files: Optional files to upload
        headers: Optional headers
        timeout: Optional timeout
        verbose: Whether to print debug info

    Returns:
        ServerResponse object
    """
    return self.request(
        "post",
        path,
        payload=payload,
        params=params,
        files=files,
        headers=headers,
        timeout=timeout,
        verbose=verbose,
    )
put
put(
    path: str,
    payload: Dict[str, Any] | None = None,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> Any

Convenience method for PUT requests.

Parameters:

Name Type Description Default
path str

API path

required
payload Dict[str, Any] | None

Optional request body

None
params Dict[str, Any] | None

Optional query parameters

None
headers Dict[str, Any] | None

Optional headers

None
timeout int | None

Optional timeout

None
verbose bool

Whether to print debug info

False

Returns:

Type Description
Any

ServerResponse object

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def put(
    self,
    path: str,
    payload: Dict[str, Any] | None = None,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> Any:
    """Convenience method for PUT requests.

    Args:
        path: API path
        payload: Optional request body
        params: Optional query parameters
        headers: Optional headers
        timeout: Optional timeout
        verbose: Whether to print debug info

    Returns:
        ServerResponse object
    """
    return self.request(
        "put",
        path,
        payload=payload,
        params=params,
        headers=headers,
        timeout=timeout,
        verbose=verbose,
    )
request
request(
    rtype: str,
    path: str,
    payload: Dict[str, Any] | None = None,
    params: Dict[str, Any] | None = None,
    files: Dict[str, Any] | None = None,
    response_handler: Callable | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int = 0,
    verbose: Union[bool, Any] = True,
) -> ServerResponse

Execute an HTTP request of any type.

Parameters:

Name Type Description Default
rtype str

Request type - one of: 'get', 'post', 'post-files', 'put', 'delete', 'head'.

required
path str

API path portion (will be appended to server URL).

required
payload Dict[str, Any] | None

Request body data. Defaults to None.

None
params Dict[str, Any] | None

Query parameters. Defaults to None.

None
files Dict[str, Any] | None

Files for upload (for post-files requests). Defaults to None.

None
response_handler Callable | None

Response handler to override instance default. Defaults to None.

None
headers Dict[str, Any] | None

Headers to override instance default. Defaults to None.

None
timeout int

Timeout override in seconds (0 uses instance default). Defaults to 0.

0
verbose Union[bool, Any]

If True, prints response to stderr. If a file object, prints to that stream. Defaults to True.

True

Returns:

Name Type Description
ServerResponse ServerResponse

Response object with status and parsed data.

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def request(
    self,
    rtype: str,
    path: str,
    payload: Dict[str, Any] | None = None,
    params: Dict[str, Any] | None = None,
    files: Dict[str, Any] | None = None,
    response_handler: Callable | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int = 0,
    verbose: Union[bool, Any] = True,
) -> ServerResponse:
    """Execute an HTTP request of any type.

    Args:
        rtype: Request type - one of: 'get', 'post', 'post-files', 'put',
            'delete', 'head'.
        path: API path portion (will be appended to server URL).
        payload: Request body data. Defaults to None.
        params: Query parameters. Defaults to None.
        files: Files for upload (for post-files requests). Defaults to None.
        response_handler: Response handler to override instance default.
            Defaults to None.
        headers: Headers to override instance default. Defaults to None.
        timeout: Timeout override in seconds (0 uses instance default).
            Defaults to 0.
        verbose: If True, prints response to stderr. If a file object,
            prints to that stream. Defaults to True.

    Returns:
        ServerResponse: Response object with status and parsed data.
    """
    rtype = rtype.lower()
    if timeout == 0:
        timeout = self.timeout
    if headers is None:
        headers = self.headers
    if response_handler is None:
        response_handler = self.response_handler
    url = self.build_url(path)
    resp, result = None, None
    if rtype == "get":
        resp, result = get_request(
            url,
            params=params,
            headers=headers,
            timeout=timeout,
            api_response_handler=response_handler
            if response_handler is not None
            else self.response_handler,
            requests=self.requests,
        )
    elif rtype == "post":
        resp, result = post_request(
            url,
            payload if payload is not None else {},
            params=params,
            headers=headers,
            timeout=timeout,
            api_response_handler=response_handler,
            requests=self.requests,
        )
    elif rtype == "post-files":
        resp, result = post_files_request(
            url,
            files=files if files is not None else {},
            headers=headers,
            timeout=timeout,
            api_response_handler=response_handler,
            requests=self.requests,
        )
    elif rtype == "put":
        resp, result = put_request(
            url,
            payload if payload is not None else {},
            params=params,
            headers=headers,
            timeout=timeout,
            api_response_handler=response_handler,
            requests=self.requests,
        )
    elif rtype == "delete":
        resp, result = delete_request(
            url,
            params=params,
            headers=headers,
            timeout=timeout,
            api_response_handler=response_handler,
            requests=self.requests,
        )
    elif rtype == "head":
        # HEAD requests are like GET but without body
        try:
            resp = self.requests.head(
                url,
                params=params,
                headers=headers,
                timeout=timeout,
            )
            # HEAD requests don't have a body, but we still need to process the response
            result = {}  # Empty dict for HEAD response
            if response_handler:
                resp, result = response_handler(resp)
        except Exception:
            resp = None
            result = None

    rv = ServerResponse(resp, result)

    if verbose is not None:
        if isinstance(verbose, bool) and verbose:
            verbose = sys.stderr
        else:
            verbose = None
        if verbose is not None:
            print(rv, file=verbose)

    return rv

ServerResponse

ServerResponse(resp: Response | None, result: Any)

Wrapper for HTTP response data with convenience properties.

Encapsulates response data from HTTP requests, providing easy access to status codes, JSON data, and response text with consistent interface.

Attributes:

Name Type Description
resp

The underlying requests Response object.

result

Parsed response data (typically JSON).

Methods:

Name Description
add_extra

Add additional metadata to the response.

has_extra

Check if extra data has been added.

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def __init__(self, resp: requests.models.Response | None, result: Any) -> None:
    self.resp = resp
    self.result = result
    self._extra: Dict[str, Any] | None = None
Attributes
extra property
extra: Dict[str, Any]

Get the extra data dictionary.

Lazily initializes an empty dictionary for storing additional metadata.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Extra data dictionary.

json property
json: Any

Get the parsed JSON response data.

Alias for the result attribute.

Returns:

Name Type Description
Any Any

Parsed response data.

status property
status: int | None

Get the HTTP status code.

Returns:

Type Description
int | None

int | None: Status code, or None if no response.

status_code property
status_code: int | None

Get the HTTP status code (alias for status).

Provided for consistency with requests.Response interface.

Returns:

Type Description
int | None

int | None: Status code, or None if no response.

succeeded property
succeeded: bool

Check if the request succeeded (status 200 or 201).

Returns:

Name Type Description
bool bool

True if status code is 200 or 201, False otherwise.

text property
text: str

Get the response as text.

Returns:

Name Type Description
str str

Response text (JSON-serialized if result is not a string).

Functions
add_extra
add_extra(key: str, value: Any) -> None

Add additional metadata to the response.

Parameters:

Name Type Description Default
key str

Metadata key.

required
value Any

Metadata value.

required
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def add_extra(self, key: str, value: Any) -> None:
    """Add additional metadata to the response.

    Args:
        key: Metadata key.
        value: Metadata value.
    """
    self.extra[key] = value
has_extra
has_extra() -> bool

Check if extra data has been added.

Returns:

Name Type Description
bool bool

True if extra data exists and is non-empty.

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def has_extra(self) -> bool:
    """Check if extra data has been added.

    Returns:
        bool: True if extra data exists and is non-empty.
    """
    return self._extra is not None and len(self._extra) > 0

Functions

delete_request

delete_request(
    api_request: str,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    api_response_handler: Callable[
        [Response], Tuple[Response, Any]
    ] = default_api_response_handler,
    requests: Any = requests,
) -> Tuple[requests.models.Response, Any]

Execute an HTTP DELETE request and return the response.

Parameters:

Name Type Description Default
api_request str

Full URL for the API request.

required
params Dict[str, Any] | None

Query parameters for the request. Defaults to None.

None
headers Dict[str, Any] | None

HTTP headers. If None, uses HEADERS constant. Defaults to None.

None
timeout int

Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.

DEFAULT_TIMEOUT
api_response_handler Callable[[Response], Tuple[Response, Any]]

Function to process the response. Defaults to default_api_response_handler.

default_api_response_handler
requests Any

Requests library or mock for testing. Defaults to requests.

requests

Returns:

Type Description
Tuple[Response, Any]

Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result).

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def delete_request(
    api_request: str,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    api_response_handler: Callable[
        [requests.models.Response], Tuple[requests.models.Response, Any]
    ] = default_api_response_handler,
    requests: Any = requests,  # pylint: disable-msg=W0621
) -> Tuple[requests.models.Response, Any]:
    """Execute an HTTP DELETE request and return the response.

    Args:
        api_request: Full URL for the API request.
        params: Query parameters for the request. Defaults to None.
        headers: HTTP headers. If None, uses HEADERS constant. Defaults to None.
        timeout: Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.
        api_response_handler: Function to process the response. Defaults to
            default_api_response_handler.
        requests: Requests library or mock for testing. Defaults to requests.

    Returns:
        Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result).
    """
    if headers is None:
        headers = HEADERS
    return api_response_handler(
        requests.delete(api_request, headers=headers, params=params, timeout=timeout)
    )

get_current_ip

get_current_ip() -> str

Get the running machine's IPv4 address.

Attempts to determine the local IP address by:

1. Connecting to an external service (Google DNS) to find the outbound IP
2. Falling back to hostname resolution
3. Finally returning localhost as a last resort

Returns:

Name Type Description
str str

The machine's IPv4 address (or "127.0.0.1" if detection fails).

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def get_current_ip() -> str:
    """Get the running machine's IPv4 address.

    Attempts to determine the local IP address by:
    1. Connecting to an external service (Google DNS) to find the outbound IP
    2. Falling back to hostname resolution
    3. Finally returning localhost as last resort

    Returns:
        str: The machine's IPv4 address (or "127.0.0.1" if detection fails).
    """
    try:
        # Create a socket and connect to an external service to get the local IP
        # We use Google's DNS server (8.8.8.8) as it's widely available
        # Note: This doesn't actually send any DNS queries, just establishes a connection
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        # Fallback to hostname resolution
        try:
            return socket.gethostbyname(socket.gethostname())
        except socket.gaierror:
            # If hostname doesn't resolve (common on macOS), return localhost
            return "127.0.0.1"

get_request

get_request(
    api_request: str,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    api_response_handler: Callable[
        [Response], Tuple[Response, Any]
    ] = default_api_response_handler,
    requests: Any = requests,
) -> Tuple[requests.models.Response, Any]

Execute an HTTP GET request and return the response.

Parameters:

Name Type Description Default
api_request str

Full URL for the API request.

required
params Dict[str, Any] | None

Query parameters for the request. Defaults to None.

None
headers Dict[str, Any] | None

HTTP headers. If None, uses HEADERS constant. Defaults to None.

None
timeout int

Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.

DEFAULT_TIMEOUT
api_response_handler Callable[[Response], Tuple[Response, Any]]

Function to process the response. Defaults to default_api_response_handler.

default_api_response_handler
requests Any

Requests library or mock for testing. Defaults to requests.

requests

Returns:

Type Description
Tuple[Response, Any]

Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result).

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def get_request(
    api_request: str,
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    api_response_handler: Callable[
        [requests.models.Response], Tuple[requests.models.Response, Any]
    ] = default_api_response_handler,
    requests: Any = requests,  # pylint: disable-msg=W0621
) -> Tuple[requests.models.Response, Any]:
    """Execute an HTTP GET request and return the response.

    Args:
        api_request: Full URL for the API request.
        params: Query parameters for the request. Defaults to None.
        headers: HTTP headers. If None, uses HEADERS constant. Defaults to None.
        timeout: Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.
        api_response_handler: Function to process the response. Defaults to
            default_api_response_handler.
        requests: Requests library or mock for testing. Defaults to requests.

    Returns:
        Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result).
    """
    if headers is None:
        headers = HEADERS
    return api_response_handler(
        requests.get(api_request, headers=headers, params=params, timeout=timeout)
    )

post_files_request

post_files_request(
    api_request: str,
    files: Dict[str, Any],
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    api_response_handler: Callable[
        [Response], Tuple[Response, Any]
    ] = default_api_response_handler,
    requests: Any = requests,
) -> Tuple[requests.models.Response, Any]

Execute an HTTP POST request with file uploads.

Parameters:

Name Type Description Default
api_request str

Full URL for the API request.

required
files Dict[str, Any]

Dictionary of {file_id: file_data} entries, where file_data can be:

- Simple: open('report.xls', 'rb') (must open in binary mode)
- Detailed: ('filename', file_object, 'content_type', {'headers': 'values'})

required
headers Dict[str, Any] | None

HTTP headers. Defaults to None.

None
timeout int

Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.

DEFAULT_TIMEOUT
api_response_handler Callable[[Response], Tuple[Response, Any]]

Function to process the response. Defaults to default_api_response_handler.

default_api_response_handler
requests Any

Requests library or mock for testing. Defaults to requests.

requests

Returns:

Type Description
Tuple[Response, Any]

Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result).

Examples:

from dataknobs_utils.requests_utils import post_files_request
url = "http://example.com/upload"

# Simple file upload
with open('report.xls', 'rb') as f:
    post_files_request(url, {'myfile': f})

# With metadata
with open('report.xls', 'rb') as f:
    post_files_request(url, {
        'myfile': ('report.xls', f,
                   'application/vnd.ms-excel', {'Expires': '0'})
    })
Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def post_files_request(
    api_request: str,
    files: Dict[str, Any],
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    api_response_handler: Callable[
        [requests.models.Response], Tuple[requests.models.Response, Any]
    ] = default_api_response_handler,
    requests: Any = requests,  # pylint: disable-msg=W0621
) -> Tuple[requests.models.Response, Any]:
    """Execute an HTTP POST request with file uploads.

    Args:
        api_request: Full URL for the API request.
        files: Dictionary of {file_id: file_data} entries, where file_data can be:
            - Simple: open('report.xls', 'rb') (must open in binary mode)
            - Detailed: ('filename', file_object, 'content_type', {'headers': 'values'})
        headers: HTTP headers. Defaults to None.
        timeout: Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.
        api_response_handler: Function to process the response. Defaults to
            default_api_response_handler.
        requests: Requests library or mock for testing. Defaults to requests.

    Returns:
        Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result).

    Examples:
        ```python
        from dataknobs_utils.requests_utils import post_files_request
        url = "http://example.com/upload"

        # Simple file upload
        with open('report.xls', 'rb') as f:
            post_files_request(url, {'myfile': f})

        # With metadata
        with open('report.xls', 'rb') as f:
            post_files_request(url, {
                'myfile': ('report.xls', f,
                           'application/vnd.ms-excel', {'Expires': '0'})
            })
        ```
    """
    return api_response_handler(
        requests.post(api_request, files=files, headers=headers, timeout=timeout)
    )

post_request

post_request(
    api_request: str,
    payload: Dict[str, Any],
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    api_response_handler: Callable[
        [Response], Tuple[Response, Any]
    ] = default_api_response_handler,
    requests: Any = requests,
) -> Tuple[requests.models.Response, Any]

Execute an HTTP POST request and return the response.

Parameters:

Name Type Description Default
api_request str

Full URL for the API request.

required
payload Dict[str, Any]

Request body data to send.

required
params Dict[str, Any] | None

Query parameters for the request. Defaults to None.

None
headers Dict[str, Any] | None

HTTP headers. If None, uses HEADERS constant. Defaults to None.

None
timeout int

Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.

DEFAULT_TIMEOUT
api_response_handler Callable[[Response], Tuple[Response, Any]]

Function to process the response. Defaults to default_api_response_handler.

default_api_response_handler
requests Any

Requests library or mock for testing. Defaults to requests.

requests

Returns:

Type Description
Tuple[Response, Any]

Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result).

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def post_request(
    api_request: str,
    payload: Dict[str, Any],
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    api_response_handler: Callable[
        [requests.models.Response], Tuple[requests.models.Response, Any]
    ] = default_api_response_handler,
    requests: Any = requests,  # pylint: disable-msg=W0621
) -> Tuple[requests.models.Response, Any]:
    """Execute an HTTP POST request and return the response.

    Args:
        api_request: Full URL for the API request.
        payload: Request body data to send.
        params: Query parameters for the request. Defaults to None.
        headers: HTTP headers. If None, uses HEADERS constant. Defaults to None.
        timeout: Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.
        api_response_handler: Function to process the response. Defaults to
            default_api_response_handler.
        requests: Requests library or mock for testing. Defaults to requests.

    Returns:
        Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result).
    """
    if headers is None:
        headers = HEADERS
    return api_response_handler(
        requests.post(
            api_request,
            data=payload,
            headers=headers,
            params=params,
            timeout=timeout,
        )
    )

put_request

put_request(
    api_request: str,
    payload: Dict[str, Any],
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    api_response_handler: Callable[
        [Response], Tuple[Response, Any]
    ] = default_api_response_handler,
    requests: Any = requests,
) -> Tuple[requests.models.Response, Any]

Execute an HTTP PUT request and return the response.

Parameters:

Name Type Description Default
api_request str

Full URL for the API request.

required
payload Dict[str, Any]

Request body data to send.

required
params Dict[str, Any] | None

Query parameters for the request. Defaults to None.

None
headers Dict[str, Any] | None

HTTP headers. If None, uses HEADERS constant. Defaults to None.

None
timeout int

Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.

DEFAULT_TIMEOUT
api_response_handler Callable[[Response], Tuple[Response, Any]]

Function to process the response. Defaults to default_api_response_handler.

default_api_response_handler
requests Any

Requests library or mock for testing. Defaults to requests.

requests

Returns:

Type Description
Tuple[Response, Any]

Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result).

Source code in packages/utils/src/dataknobs_utils/requests_utils.py
def put_request(
    api_request: str,
    payload: Dict[str, Any],
    params: Dict[str, Any] | None = None,
    headers: Dict[str, Any] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    api_response_handler: Callable[
        [requests.models.Response], Tuple[requests.models.Response, Any]
    ] = default_api_response_handler,
    requests: Any = requests,  # pylint: disable-msg=W0621
) -> Tuple[requests.models.Response, Any]:
    """Execute an HTTP PUT request and return the response.

    Args:
        api_request: Full URL for the API request.
        payload: Request body data to send.
        params: Query parameters for the request. Defaults to None.
        headers: HTTP headers. If None, uses HEADERS constant. Defaults to None.
        timeout: Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.
        api_response_handler: Function to process the response. Defaults to
            default_api_response_handler.
        requests: Requests library or mock for testing. Defaults to requests.

    Returns:
        Tuple[requests.models.Response, Any]: Tuple of (response object, parsed result).
    """
    if headers is None:
        headers = HEADERS
    return api_response_handler(
        requests.put(
            api_request,
            data=payload,
            headers=headers,
            params=params,
            timeout=timeout,
        )
    )

resource_utils

Functions

dataknobs_utils.resource_utils

General data utilities for working with third-party resources.

Functions:

Name Description
active_datadir

Get the active data directory from available locations.

download_nltk_resources

Download NLTK resources that don't yet exist locally.

get_nltk_resources_dir

Get the NLTK resources directory and optionally download resources.

get_nltk_wordnet

Get NLTK's WordNet corpus, ensuring resources are downloaded.

Functions

active_datadir

active_datadir() -> str | None

Get the active data directory from available locations.

Searches for an existing data directory in the following order:

1. DATADIR environment variable
2. $HOME/data directory
3. /data directory

Note

An active data directory must exist to get a non-None result.

Returns:

Type Description
str | None

str | None: Path to the active data directory, or None if not found.

Source code in packages/utils/src/dataknobs_utils/resource_utils.py
def active_datadir() -> str | None:
    """Get the active data directory from available locations.

    Searches for an existing data directory in the following order:
    1. DATADIR environment variable
    2. $HOME/data directory
    3. /data directory

    Note:
        An active data directory must exist to get a non-None result.

    Returns:
        str | None: Path to the active data directory, or None if not found.
    """
    if _CACHE["datadir"] is None:
        _CACHE["datadir"] = os.environ.get("DATADIR", os.environ.get("HOME", "") + "/data")
        if not os.path.exists(_CACHE["datadir"]) and os.path.exists("/data"):
            _CACHE["datadir"] = "/data"
    return _CACHE["datadir"]

download_nltk_resources

download_nltk_resources(
    resources: Dict[str, str] | None,
    resources_dir: str | None = None,
    verbose: bool = False,
    downloader: Callable = nltk.download,
) -> None

Download NLTK resources that don't yet exist locally.

Parameters:

Name Type Description Default
resources Dict[str, str] | None

Dictionary mapping resource names to their relative paths (relative to resources_dir).

required
resources_dir str | None

Root directory for resources. If None, uses the default from get_nltk_resources_dir(). Defaults to None.

None
verbose bool

If True, prints download status messages. Defaults to False.

False
downloader Callable

Callable for downloading resources. Defaults to nltk.download.

download
Source code in packages/utils/src/dataknobs_utils/resource_utils.py
def download_nltk_resources(
    resources: Dict[str, str] | None,
    resources_dir: str | None = None,
    verbose: bool = False,
    downloader: Callable = nltk.download,
) -> None:
    """Download NLTK resources that don't yet exist locally.

    Args:
        resources: Dictionary mapping resource names to their relative paths
            (relative to resources_dir).
        resources_dir: Root directory for resources. If None, uses the default
            from get_nltk_resources_dir(). Defaults to None.
        verbose: If True, prints download status messages. Defaults to False.
        downloader: Callable for downloading resources. Defaults to nltk.download.
    """
    if resources is not None:
        if resources_dir is None:
            resources_dir = get_nltk_resources_dir()
        if resources_dir is None:
            return  # Can't download without a resources directory
        for resource, relpath in resources.items():
            respath = str(Path(resources_dir) / relpath)
            if not os.path.exists(respath):
                if verbose:
                    print(f"NOTE: {respath} does not exist. Downloading...", file=sys.stderr)
                downloader(resource, download_dir=resources_dir)

get_nltk_resources_dir

get_nltk_resources_dir(
    resources: Dict[str, str] | None = None,
    verbose: bool = False,
    downloader: Callable = nltk.download,
) -> str | None

Get the NLTK resources directory and optionally download resources.

Determines the NLTK resources directory from:

1. NLTK_DATA environment variable
2. active_datadir()/NLTK_RESOURCES_PATH

If resources are specified, downloads any missing resources to the directory.

Note

An active DATADIR must exist to get a non-None result.

Parameters:

Name Type Description Default
resources Dict[str, str] | None

Optional dictionary mapping resource names to their relative paths (relative to NLTK resources dir) to ensure are downloaded.

None
verbose bool

If True, prints status messages. Defaults to False.

False
downloader Callable

Callable for downloading resources. Defaults to nltk.download.

download

Returns:

Type Description
str | None

str | None: Path to NLTK resources directory, or None if not found.

Source code in packages/utils/src/dataknobs_utils/resource_utils.py
def get_nltk_resources_dir(
    resources: Dict[str, str] | None = None,
    verbose: bool = False,
    downloader: Callable = nltk.download,
) -> str | None:
    """Get the NLTK resources directory and optionally download resources.

    Determines the NLTK resources directory from:
    1. NLTK_DATA environment variable
    2. active_datadir()/NLTK_RESOURCES_PATH

    If resources are specified, downloads any missing resources to the directory.

    Note:
        An active DATADIR must exist to get a non-None result.

    Args:
        resources: Optional dictionary mapping resource names to their relative
            paths (relative to NLTK resources dir) to ensure are downloaded.
        verbose: If True, prints status messages. Defaults to False.
        downloader: Callable for downloading resources. Defaults to nltk.download.

    Returns:
        str | None: Path to NLTK resources directory, or None if not found.
    """
    if _CACHE["nltk_resources_dir"] is None:
        _CACHE["nltk_resources_dir"] = os.environ.get("NLTK_DATA", None)
        if _CACHE["nltk_resources_dir"] is None:
            datadir = active_datadir()
            if datadir is not None:
                resdir = str(Path(datadir) / NLTK_RESOURCES_PATH)
                _CACHE["nltk_resources_dir"] = resdir
                os.environ["NLTK_DATA"] = resdir
                nltk.data.path.append(resdir)
                if resources is not None:
                    download_nltk_resources(
                        resources,
                        resources_dir=resdir,
                        verbose=verbose,
                        downloader=downloader,
                    )
    return _CACHE["nltk_resources_dir"]

get_nltk_wordnet

get_nltk_wordnet(downloader: Callable = nltk.download) -> Any

Get NLTK's WordNet corpus, ensuring resources are downloaded.

Automatically downloads required WordNet resources if not already present.

Parameters:

Name Type Description Default
downloader Callable

Callable for downloading resources. Defaults to nltk.download.

download

Returns:

Type Description
Any

nltk.corpus.wordnet: The NLTK WordNet corpus object, or None if resources directory cannot be determined.

Source code in packages/utils/src/dataknobs_utils/resource_utils.py
def get_nltk_wordnet(downloader: Callable = nltk.download) -> Any:
    """Get NLTK's WordNet corpus, ensuring resources are downloaded.

    Automatically downloads required WordNet resources if not already present.

    Args:
        downloader: Callable for downloading resources. Defaults to nltk.download.

    Returns:
        nltk.corpus.wordnet: The NLTK WordNet corpus object, or None if
            resources directory cannot be determined.
    """
    # Make sure resources have been downloaded
    if _CACHE["nltk_wn"] is None:
        if get_nltk_resources_dir(resources=NLTK_RESOURCES, downloader=downloader) is not None:
            from nltk.corpus import wordnet as wn

            _CACHE["nltk_wn"] = wn
    return _CACHE["nltk_wn"]

sql_utils

Functions

dataknobs_utils.sql_utils

SQL database utility functions and connection management.

Provides utilities for working with SQL databases including PostgreSQL, with support for connection management, query execution, and data loading.

Classes:

Name Description
DataFrameRecordFetcher

Fetch records from a pandas DataFrame by ID.

DictionaryRecordFetcher

Fetch records from a dictionary mapping IDs to record values.

DotenvPostgresConnector

PostgreSQL connection manager using environment variables and project vars.

PostgresDB

PostgreSQL database wrapper with utilities for querying and managing tables.

PostgresRecordFetcher

Fetch records from a PostgreSQL table by ID.

RecordFetcher

Abstract base class for fetching records from a data source.

Classes

DataFrameRecordFetcher

DataFrameRecordFetcher(
    df: DataFrame,
    id_field_name: str = "id",
    fields_to_retrieve: List[str] | None = None,
    one_based_ids: bool = False,
)

Bases: RecordFetcher

Fetch records from a pandas DataFrame by ID.

Attributes:

Name Type Description
df

DataFrame containing records to fetch from.

Initialize DataFrame record fetcher.

Parameters:

Name Type Description Default
df DataFrame

DataFrame containing records.

required
id_field_name str

Name of the integer ID field. Defaults to "id".

'id'
fields_to_retrieve List[str] | None

Subset of fields to retrieve. If None, retrieves all fields. Defaults to None.

None
one_based_ids bool

True if DataFrame uses 1-based IDs. Defaults to False.

False

Methods:

Name Description
get_records

Fetch records from DataFrame by IDs.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def __init__(
    self,
    df: pd.DataFrame,
    id_field_name: str = "id",
    fields_to_retrieve: List[str] | None = None,
    one_based_ids: bool = False,
) -> None:
    """Initialize DataFrame record fetcher.

    Args:
        df: DataFrame containing records.
        id_field_name: Name of the integer ID field. Defaults to "id".
        fields_to_retrieve: Subset of fields to retrieve. If None, retrieves
            all fields. Defaults to None.
        one_based_ids: True if DataFrame uses 1-based IDs. Defaults to False.
    """
    super().__init__(
        id_field_name=id_field_name,
        fields_to_retrieve=fields_to_retrieve,
        one_based_ids=one_based_ids,
    )
    self.df = df
Functions
get_records
get_records(
    ids: List[int],
    one_based: bool = False,
    fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame

Fetch records from DataFrame by IDs.

Parameters:

Name Type Description Default
ids List[int]

Collection of record IDs to retrieve.

required
one_based bool

True if provided IDs are 1-based. Defaults to False.

False
fields_to_retrieve List[str] | None

Subset of fields for this call, overriding instance default. Defaults to None.

None

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the retrieved records.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def get_records(
    self,
    ids: List[int],
    one_based: bool = False,
    fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame:
    """Fetch records from DataFrame by IDs.

    Args:
        ids: Collection of record IDs to retrieve.
        one_based: True if provided IDs are 1-based. Defaults to False.
        fields_to_retrieve: Subset of fields for this call, overriding
            instance default. Defaults to None.

    Returns:
        pd.DataFrame: DataFrame containing the retrieved records.
    """
    offset = 0
    if one_based != self.one_based:
        offset = 1 if self.one_based else -1
    adjusted_ids = [an_id + offset for an_id in ids]
    df = self.df[self.df[self.id_field_name].isin(adjusted_ids)]
    if fields_to_retrieve is None:
        fields_to_retrieve = self.fields_to_retrieve
    if fields_to_retrieve is not None:
        df = df[fields_to_retrieve]
    return df

DictionaryRecordFetcher

DictionaryRecordFetcher(
    the_dict: Dict[int, List[Any]],
    all_field_names: List[str],
    id_field_name: str = "id",
    fields_to_retrieve: List[str] | None = None,
    one_based_ids: bool = False,
)

Bases: RecordFetcher

Fetch records from a dictionary mapping IDs to record values.

Attributes:

Name Type Description
the_dict

Dictionary mapping IDs to record value lists.

field_names

Field names corresponding to record value positions.

Initialize dictionary record fetcher.

Parameters:

Name Type Description Default
the_dict Dict[int, List[Any]]

Dictionary mapping IDs to lists of record values.

required
all_field_names List[str]

Field names in same order as record value lists.

required
id_field_name str

Name of the integer ID field. Defaults to "id".

'id'
fields_to_retrieve List[str] | None

Subset of fields to retrieve. If None, retrieves all fields. Defaults to None.

None
one_based_ids bool

True if dictionary uses 1-based IDs. Defaults to False.

False

Methods:

Name Description
get_records

Fetch records from dictionary by IDs.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def __init__(
    self,
    the_dict: Dict[int, List[Any]],
    all_field_names: List[str],
    id_field_name: str = "id",
    fields_to_retrieve: List[str] | None = None,
    one_based_ids: bool = False,
):
    """Initialize dictionary record fetcher.

    Args:
        the_dict: Dictionary mapping IDs to lists of record values.
        all_field_names: Field names in same order as record value lists.
        id_field_name: Name of the integer ID field. Defaults to "id".
        fields_to_retrieve: Subset of fields to retrieve. If None, retrieves
            all fields. Defaults to None.
        one_based_ids: True if dictionary uses 1-based IDs. Defaults to False.
    """
    super().__init__(
        id_field_name=id_field_name,
        fields_to_retrieve=fields_to_retrieve,
        one_based_ids=one_based_ids,
    )
    self.the_dict = the_dict
    self.field_names = all_field_names
Functions
get_records
get_records(
    ids: List[int],
    one_based: bool = False,
    fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame

Fetch records from dictionary by IDs.

Parameters:

Name Type Description Default
ids List[int]

Collection of record IDs to retrieve.

required
one_based bool

True if provided IDs are 1-based. Defaults to False.

False
fields_to_retrieve List[str] | None

Subset of fields for this call, overriding instance default. Defaults to None.

None

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the retrieved records, with None values for missing IDs.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def get_records(
    self,
    ids: List[int],
    one_based: bool = False,
    fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame:
    """Fetch records from dictionary by IDs.

    Args:
        ids: Collection of record IDs to retrieve.
        one_based: True if provided IDs are 1-based. Defaults to False.
        fields_to_retrieve: Subset of fields for this call, overriding
            instance default. Defaults to None.

    Returns:
        pd.DataFrame: DataFrame containing the retrieved records, with None
            values for missing IDs.
    """
    offset = 0
    if one_based != self.one_based:
        offset = 1 if self.one_based else -1
    offset_ids = [an_id + offset for an_id in ids]
    records = [
        self.the_dict.get(an_id, [an_id] + [None] * (len(self.field_names) - 1))
        for an_id in offset_ids
    ]
    df = pd.DataFrame(records, columns=self.field_names)
    if fields_to_retrieve is None:
        fields_to_retrieve = self.fields_to_retrieve
    if fields_to_retrieve is not None:
        df = df[fields_to_retrieve]
    return df

DotenvPostgresConnector

DotenvPostgresConnector(
    host: str | None = None,
    db: str | None = None,
    user: str | None = None,
    pwd: str | None = None,
    port: int | None = None,
    pvname: str = ".project_vars",
)

PostgreSQL connection manager using environment variables and project vars.

Loads database connection parameters from constructor arguments, environment variables (.env), or a project variables file, with explicit constructor arguments taking precedence over environment variables, and environment variables over project vars.

Attributes:

Name Type Description
host

Database host address.

database

Database name.

user

Database username.

password

Database password.

port

Database port number.

Initialize PostgreSQL connector with environment-based configuration.

Parameters:

Name Type Description Default
host str | None

Database host. If None, uses POSTGRES_HOST environment variable or "localhost". Defaults to None.

None
db str | None

Database name. If None, uses POSTGRES_DB environment variable or "postgres". Defaults to None.

None
user str | None

Username. If None, uses POSTGRES_USER environment variable or "postgres". Defaults to None.

None
pwd str | None

Password. If None, uses POSTGRES_PASSWORD environment variable. Defaults to None.

None
port int | None

Port number. If None, uses POSTGRES_PORT environment variable or 5432. Defaults to None.

None
pvname str

Project variables filename to load. Defaults to ".project_vars".

'.project_vars'

Methods:

Name Description
get_conn

Create and return a PostgreSQL database connection.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def __init__(
    self,
    host: str | None = None,
    db: str | None = None,
    user: str | None = None,
    pwd: str | None = None,
    port: int | None = None,
    pvname: str = ".project_vars",
) -> None:
    """Initialize PostgreSQL connector with environment-based configuration.

    Args:
        host: Database host. If None, uses POSTGRES_HOST environment variable
            or "localhost". Defaults to None.
        db: Database name. If None, uses POSTGRES_DB environment variable
            or "postgres". Defaults to None.
        user: Username. If None, uses POSTGRES_USER environment variable
            or "postgres". Defaults to None.
        pwd: Password. If None, uses POSTGRES_PASSWORD environment variable.
            Defaults to None.
        port: Port number. If None, uses POSTGRES_PORT environment variable
            or 5432. Defaults to None.
        pvname: Project variables filename to load. Defaults to ".project_vars".
    """
    config = load_project_vars(pvname=pvname)
    if host is None or db is None or user is None or pwd is None or port is None:
        load_dotenv()

    self.host = (
        os.getenv(
            "POSTGRES_HOST", config.get("POSTGRES_HOST", "localhost") if config else "localhost"
        )
        if host is None
        else host
    )
    self.database = (
        os.getenv(
            "POSTGRES_DB", config.get("POSTGRES_DB", "postgres") if config else "postgres"
        )
        if db is None
        else db
    )
    self.user = (
        os.getenv(
            "POSTGRES_USER", config.get("POSTGRES_USER", "postgres") if config else "postgres"
        )
        if user is None
        else user
    )
    self.password = (
        os.getenv(
            "POSTGRES_PASSWORD", config.get("POSTGRES_PASSWORD", None) if config else None
        )
        if pwd is None
        else pwd
    )
    self.port = (
        int(os.getenv(
            "POSTGRES_PORT", config.get("POSTGRES_PORT", 5432) if config else 5432
        ))
        if port is None
        else port
    )
Functions
get_conn
get_conn() -> Any

Create and return a PostgreSQL database connection.

Returns:

Type Description
Any

psycopg2.connection: Active database connection using configured parameters.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def get_conn(self) -> Any:
    """Create and return a PostgreSQL database connection.

    Returns:
        psycopg2.connection: Active database connection using configured parameters.
    """
    return psycopg2.connect(
        host=self.host,
        database=self.database,
        user=self.user,
        password=self.password,
        port=self.port,
    )

PostgresDB

PostgresDB(
    host: str | DotenvPostgresConnector | None = None,
    db: str | None = None,
    user: str | None = None,
    pwd: str | None = None,
    port: int | None = None,
)

PostgreSQL database wrapper with utilities for querying and managing tables.

Provides high-level interface for executing queries, managing tables, and uploading DataFrames to PostgreSQL databases.

Attributes:

Name Type Description
_connector

Connection manager for database operations.

Initialize PostgreSQL database wrapper.

Parameters:

Name Type Description Default
host str | DotenvPostgresConnector | None

Database host or DotenvPostgresConnector instance. If None, uses environment configuration. Defaults to None.

None
db str | None

Database name. If None, uses environment configuration. Defaults to None.

None
user str | None

Username. If None, uses environment configuration. Defaults to None.

None
pwd str | None

Password. If None, uses environment configuration. Defaults to None.

None
port int | None

Port number. If None, uses environment configuration. Defaults to None.

None

Methods:

Name Description
execute

Execute a SQL statement and commit changes.

get_conn

Get a connection to the PostgreSQL database.

query

Execute a SQL query and return results as a DataFrame.

table_head

Get the first N rows from a table.

upload

Upload DataFrame data to a database table.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def __init__(
    self,
    host: str | DotenvPostgresConnector | None = None,
    db: str | None = None,
    user: str | None = None,
    pwd: str | None = None,
    port: int | None = None,
) -> None:
    """Initialize PostgreSQL database wrapper.

    Args:
        host: Database host or DotenvPostgresConnector instance. If None,
            uses environment configuration. Defaults to None.
        db: Database name. If None, uses environment configuration.
            Defaults to None.
        user: Username. If None, uses environment configuration. Defaults to None.
        pwd: Password. If None, uses environment configuration. Defaults to None.
        port: Port number. If None, uses environment configuration. Defaults to None.
    """
    # Allow passing a connector directly (for backward compatibility)
    if isinstance(host, DotenvPostgresConnector):
        self._connector = host
    else:
        self._connector = DotenvPostgresConnector(host=host, db=db, user=user, pwd=pwd, port=port)
    self._tables_df: pd.DataFrame | None = None
    self._table_names: List[str] | None = None
Attributes
table_names property
table_names: List[str]

Get list of all table names in the database.

Returns:

Type Description
List[str]

List[str]: List of table names from the public schema.

tables_df property
tables_df: DataFrame

Get DataFrame of database table metadata.

Note

The exact schema is database-specific. For PostgreSQL, queries information_schema.tables.

Returns:

Type Description
DataFrame

pd.DataFrame: Table metadata from information_schema.tables.

Functions
execute
execute(stmt: str, params: Dict[str, Any] | None = None) -> int

Execute a SQL statement and commit changes.

Parameters:

Name Type Description Default
stmt str

SQL statement to execute.

required
params Dict[str, Any] | None

Optional dictionary of parameters for safe injection. Defaults to None.

None

Returns:

Name Type Description
int int

Number of rows affected by the statement.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def execute(self, stmt: str, params: Dict[str, Any] | None = None) -> int:
    """Execute a SQL statement and commit changes.

    Args:
        stmt: SQL statement to execute.
        params: Optional dictionary of parameters for safe injection.
            Defaults to None.

    Returns:
        int: Number of rows affected by the statement.
    """
    with self.get_conn() as conn:
        with conn.cursor() as curs:
            curs.execute(stmt, params)
            rowcount = curs.rowcount
            conn.commit()
    return rowcount
get_conn
get_conn() -> Any

Get a connection to the PostgreSQL database.

Returns:

Type Description
Any

psycopg2.connection: Active database connection.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def get_conn(self) -> Any:
    """Get a connection to the PostgreSQL database.

    Returns:
        psycopg2.connection: Active database connection.
    """
    return self._connector.get_conn()
query
query(query: str, params: Dict[str, Any] | None = None) -> pd.DataFrame

Execute a SQL query and return results as a DataFrame.

Uses parameterized queries for safe injection of values.

Parameters:

Name Type Description Default
query str

SQL query string to execute.

required
params Dict[str, Any] | None

Dictionary of parameters to safely inject. Each parameter "param" should appear as "%(param)s" in the query string. Defaults to None.

None

Returns:

Type Description
DataFrame

pd.DataFrame: Query results with column names from the cursor.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def query(
    self,
    query: str,
    params: Dict[str, Any] | None = None,
) -> pd.DataFrame:
    """Execute a SQL query and return results as a DataFrame.

    Uses parameterized queries for safe injection of values.

    Args:
        query: SQL query string to execute.
        params: Dictionary of parameters to safely inject. Each parameter
            "param" should appear as "%(param)s" in the query string.
            Defaults to None.

    Returns:
        pd.DataFrame: Query results with column names from the cursor.
    """
    with self.get_conn() as conn:
        with conn.cursor() as curs:
            if params is None:
                curs.execute(query)
            else:
                curs.execute(query, params)
            df = pd.DataFrame(curs.fetchall(), columns=[desc[0] for desc in curs.description])
    return df
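The `%(param)s` placeholder is psycopg2's named "pyformat" style. The same named-parameter pattern can be illustrated with the standard library's sqlite3 (which spells the placeholder `:param` instead), so the example runs without a PostgreSQL server:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'ada'), (2, 'bob')")

# Named parameters keep user input out of the SQL text entirely;
# with psycopg2 the same query would read: WHERE id = %(uid)s
rows = conn.execute(
    "SELECT name FROM users WHERE id = :uid", {"uid": 2}
).fetchall()
print(rows)  # [('bob',)]
```

Either way, the driver performs the substitution, which is what makes the injection of values safe.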
table_head
table_head(table_name: str, n: int = 10) -> pd.DataFrame

Get the first N rows from a table.

Parameters:

Name Type Description Default
table_name str

Name of the table to sample.

required
n int

Number of rows to return. Defaults to 10.

10

Returns:

Type Description
DataFrame

pd.DataFrame: First N rows from the table.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def table_head(self, table_name: str, n: int = 10) -> pd.DataFrame:
    """Get the first N rows from a table.

    Args:
        table_name: Name of the table to sample.
        n: Number of rows to return. Defaults to 10.

    Returns:
        pd.DataFrame: First N rows from the table.
    """
    return self.query(f"""SELECT * FROM {table_name} LIMIT {n}""")
upload
upload(table_name: str, df: DataFrame) -> None

Upload DataFrame data to a database table.

Creates the table if it doesn't exist, inferring schema from DataFrame types.

Parameters:

Name Type Description Default
table_name str

Name of the table to insert data into.

required
df DataFrame

DataFrame with columns matching table fields and data to upload.

required
Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def upload(self, table_name: str, df: pd.DataFrame) -> None:
    """Upload DataFrame data to a database table.

    Creates the table if it doesn't exist, inferring schema from DataFrame types.

    Args:
        table_name: Name of the table to insert data into.
        df: DataFrame with columns matching table fields and data to upload.
    """
    fields = ", ".join(df.columns)
    template = ", ".join(["%s"] * len(df.columns))
    if table_name not in self.table_names:
        self._create_table(table_name, df)
    with self.get_conn() as conn:
        with conn.cursor() as curs:
            sql = f"INSERT INTO {table_name} ({fields}) VALUES " + ",".join(
                curs.mogrify(
                    f"({template})",
                    [str(row[col]) for col in df.columns],
                ).decode("utf-8")
                for row in df.to_records()
            )
            curs.execute(sql)
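upload batches every row into a single multi-row INSERT statement assembled with cursor.mogrify. The shape of that statement can be sketched in pure Python (`build_insert` is a hypothetical helper; `repr()` stands in for the driver's real quoting, which production code must delegate to mogrify):

```python
from typing import Any, List, Sequence

def build_insert(table: str, columns: Sequence[str], rows: List[Sequence[Any]]) -> str:
    """Sketch of the multi-row INSERT that upload() assembles.

    repr() is only an illustration; real code lets psycopg2 quote values.
    """
    fields = ", ".join(columns)
    # One "(v1, v2, ...)" tuple per row, joined into a single VALUES clause
    values = ",".join(
        "(" + ", ".join(repr(str(v)) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO {table} ({fields}) VALUES " + values

sql = build_insert("users", ["id", "name"], [[1, "ada"], [2, "bob"]])
print(sql)  # INSERT INTO users (id, name) VALUES ('1', 'ada'),('2', 'bob')
```

Sending one statement per batch avoids a network round trip per row, at the cost of building a potentially large SQL string in memory.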

PostgresRecordFetcher

PostgresRecordFetcher(
    db: PostgresDB,
    table_name: str,
    id_field_name: str = "id",
    fields_to_retrieve: List[str] | None = None,
    one_based_ids: bool = False,
)

Bases: RecordFetcher

Fetch records from a PostgreSQL table by ID.

Attributes:

Name Type Description
db

PostgreSQL database connection wrapper.

table_name

Name of the table to query.

Initialize PostgreSQL record fetcher.

Parameters:

Name Type Description Default
db PostgresDB

PostgresDB instance for database operations.

required
table_name str

Name of the table to fetch records from.

required
id_field_name str

Name of the integer ID field. Defaults to "id".

'id'
fields_to_retrieve List[str] | None

Subset of fields to retrieve. If None, retrieves all fields. Defaults to None.

None
one_based_ids bool

True if data source uses 1-based IDs. Defaults to False.

False

Methods:

Name Description
get_records

Fetch records from PostgreSQL table by IDs.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def __init__(
    self,
    db: PostgresDB,
    table_name: str,
    id_field_name: str = "id",
    fields_to_retrieve: List[str] | None = None,
    one_based_ids: bool = False,
) -> None:
    """Initialize PostgreSQL record fetcher.

    Args:
        db: PostgresDB instance for database operations.
        table_name: Name of the table to fetch records from.
        id_field_name: Name of the integer ID field. Defaults to "id".
        fields_to_retrieve: Subset of fields to retrieve. If None, retrieves
            all fields. Defaults to None.
        one_based_ids: True if data source uses 1-based IDs. Defaults to False.
    """
    super().__init__(
        id_field_name=id_field_name,
        fields_to_retrieve=fields_to_retrieve,
        one_based_ids=one_based_ids,
    )
    self.db = db
    self.table_name = table_name
Functions
get_records
get_records(
    ids: List[int],
    one_based: bool = False,
    fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame

Fetch records from PostgreSQL table by IDs.

Parameters:

Name Type Description Default
ids List[int]

Collection of record IDs to retrieve.

required
one_based bool

True if provided IDs are 1-based. Defaults to False.

False
fields_to_retrieve List[str] | None

Subset of fields for this call, overriding instance default. Defaults to None.

None

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the retrieved records.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def get_records(
    self,
    ids: List[int],
    one_based: bool = False,
    fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame:
    """Fetch records from PostgreSQL table by IDs.

    Args:
        ids: Collection of record IDs to retrieve.
        one_based: True if provided IDs are 1-based. Defaults to False.
        fields_to_retrieve: Subset of fields for this call, overriding
            instance default. Defaults to None.

    Returns:
        pd.DataFrame: DataFrame containing the retrieved records.
    """
    if fields_to_retrieve is None:
        fields_to_retrieve = self.fields_to_retrieve
    if fields_to_retrieve is not None:
        fields = ", ".join(fields_to_retrieve)
    else:
        fields = "*"
    offset = 0
    if one_based != self.one_based:
        offset = 1 if self.one_based else -1
    values = ", ".join([str(value + offset) for value in ids])
    return self.db.query(f"""
       SELECT {fields}
       FROM {self.table_name}
       WHERE {self.id_field_name} IN ({values})
    """)

RecordFetcher

RecordFetcher(
    id_field_name: str = "id",
    fields_to_retrieve: List[str] | None = None,
    one_based_ids: bool = False,
)

Bases: ABC

Abstract base class for fetching records from a data source.

Provides a common interface for retrieving records by ID from various data sources (databases, DataFrames, dictionaries, etc.) with support for zero-based and one-based ID systems.

Attributes:

Name Type Description
id_field_name

Name of the ID field in the data source.

fields_to_retrieve

Subset of fields to retrieve (None for all).

one_based

True if data source uses 1-based IDs.

Initialize the record fetcher.

Parameters:

Name Type Description Default
id_field_name str

Name of the integer ID field. Defaults to "id".

'id'
fields_to_retrieve List[str] | None

Subset of fields to retrieve. If None, retrieves all fields. Defaults to None.

None
one_based_ids bool

True if data source uses 1-based IDs, False for 0-based. Defaults to False.

False

Methods:

Name Description
get_records

Fetch records by ID from the data source.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
def __init__(
    self,
    id_field_name: str = "id",
    fields_to_retrieve: List[str] | None = None,
    one_based_ids: bool = False,
) -> None:
    """Initialize the record fetcher.

    Args:
        id_field_name: Name of the integer ID field. Defaults to "id".
        fields_to_retrieve: Subset of fields to retrieve. If None, retrieves
            all fields. Defaults to None.
        one_based_ids: True if data source uses 1-based IDs, False for 0-based.
            Defaults to False.
    """
    self.id_field_name = id_field_name
    self.fields_to_retrieve = fields_to_retrieve
    self.one_based = one_based_ids
Functions
get_records abstractmethod
get_records(
    ids: List[int],
    one_based: bool = False,
    fields_to_retrieve: List[str] | None = None,
) -> pd.DataFrame

Fetch records by ID from the data source.

Parameters:

Name Type Description Default
ids List[int]

Collection of record IDs to retrieve.

required
one_based bool

True if the provided IDs are 1-based. Defaults to False.

False
fields_to_retrieve List[str] | None

Subset of fields for this call, overriding instance default. Defaults to None.

None

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the retrieved records.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in packages/utils/src/dataknobs_utils/sql_utils.py
@abstractmethod
def get_records(
    self, ids: List[int], one_based: bool = False, fields_to_retrieve: List[str] | None = None
) -> pd.DataFrame:
    """Fetch records by ID from the data source.

    Args:
        ids: Collection of record IDs to retrieve.
        one_based: True if the provided IDs are 1-based. Defaults to False.
        fields_to_retrieve: Subset of fields for this call, overriding
            instance default. Defaults to None.

    Returns:
        pd.DataFrame: DataFrame containing the retrieved records.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError

Functions

stats_utils

Functions

dataknobs_utils.stats_utils

Statistical utility functions and timing helpers.

Provides utilities for timing operations, random waits, rate limiting, and basic statistical calculations.

Classes:

Name Description
KeyManager

Turn descriptions of key types into keys of the form type-N.

LinearRegression

A low-memory helper class to collect (x,y) samples and compute linear regression.

Monitor

A monitor tracks processing and/or access times and rates for some function.

MonitorManager

Manage a set of monitors by label, providing rollup views across all monitors.

RollingStats

A collection of statistics through a rolling window of time.

StatsAccumulator

A low-memory helper class to collect statistical samples and provide summary statistics.

Functions:

Name Description
wait_for_random_millis

Wait for a random number of milliseconds.

Classes

KeyManager

KeyManager()

Turn descriptions of key types into keys of the form type-N.

Maps (keytype, description) pairs to unique keys by tracking descriptions for each keytype and assigning sequential numbers. The same description always maps to the same key for a given keytype.

Example

>>> from dataknobs_utils.stats_utils import KeyManager
>>> km = KeyManager()
>>> km.get_key("endpoint", "/api/users")
'endpoint-0'
>>> km.get_key("endpoint", "/api/posts")
'endpoint-1'
>>> km.get_key("endpoint", "/api/users")
'endpoint-0'

Initialize a KeyManager.

Methods:

Name Description
get_key

Get a unique key for a (keytype, description) pair.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __init__(self) -> None:
    """Initialize a KeyManager."""
    self._keytype2descriptions: Dict[str, List[str]] = {}
Functions
get_key
get_key(keytype: str, description: str) -> str

Get a unique key for a (keytype, description) pair.

Parameters:

Name Type Description Default
keytype str

The key type (e.g., "endpoint", "function").

required
description str

The description to convert to a key.

required

Returns:

Name Type Description
str str

A unique key in the form "keytype-N" where N is the index.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def get_key(self, keytype: str, description: str) -> str:
    """Get a unique key for a (keytype, description) pair.

    Args:
        keytype: The key type (e.g., "endpoint", "function").
        description: The description to convert to a key.

    Returns:
        str: A unique key in the form "keytype-N" where N is the index.
    """
    if keytype in self._keytype2descriptions:
        descriptions = self._keytype2descriptions[keytype]
    else:
        descriptions = []
        self._keytype2descriptions[keytype] = descriptions

    if description in descriptions:
        index = descriptions.index(description)
    else:
        index = len(descriptions)
        descriptions.append(description)

    result = f"{keytype}-{index}"
    return result

LinearRegression

LinearRegression()

A low-memory helper class to collect (x,y) samples and compute linear regression.

Incrementally computes the line of best fit (y = mx + b) from (x,y) samples without storing individual points. Uses the least squares method to calculate the slope (m) and intercept (b).

Attributes:

Name Type Description
n int

Number of (x,y) samples added.

x_sum float

Sum of all x values.

y_sum float

Sum of all y values.

xy_sum float

Sum of all x*y products.

xx_sum float

Sum of all x*x products.

yy_sum float

Sum of all y*y products.

m float

Slope of the regression line (computed lazily).

b float

Y-intercept of the regression line (computed lazily).

Initialize a LinearRegression accumulator.

Methods:

Name Description
__str__

Get the regression equation as a string.

add

Add an (x,y) sample point.

get_y

Compute the predicted y value for a given x.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __init__(self) -> None:
    """Initialize a LinearRegression accumulator."""
    self._n = 0
    self._x_sum = 0.0
    self._y_sum = 0.0
    self._xy_sum = 0.0
    self._xx_sum = 0.0
    self._yy_sum = 0.0

    self._m: float | None = None
    self._b: float | None = None
Attributes
b property
b: float

Get the y-intercept of the regression line.

m property
m: float

Get the slope of the regression line.

n property
n: int

Get the number of (x,y) samples added.

x_sum property
x_sum: float

Get the sum of all x values.

xx_sum property
xx_sum: float

Get the sum of all x*x products.

xy_sum property
xy_sum: float

Get the sum of all x*y products.

y_sum property
y_sum: float

Get the sum of all y values.

yy_sum property
yy_sum: float

Get the sum of all y*y products.

Functions
__str__
__str__() -> str

Get the regression equation as a string.

Returns:

Name Type Description
str str

The equation in the form "y = m x + b".

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __str__(self) -> str:
    """Get the regression equation as a string.

    Returns:
        str: The equation in the form "y = m x + b".
    """
    return f"y = {self.m:.4f} x + {self.b:.4f}"
add
add(x: float, y: float) -> None

Add an (x,y) sample point.

Parameters:

Name Type Description Default
x float

The x value.

required
y float

The y value.

required
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def add(self, x: float, y: float) -> None:
    """Add an (x,y) sample point.

    Args:
        x: The x value.
        y: The y value.
    """
    self._m = None
    self._b = None
    self._n += 1
    self._x_sum += x
    self._y_sum += y
    self._xy_sum += x * y
    self._xx_sum += x * x
    self._yy_sum += y * y
get_y
get_y(x: float) -> float

Compute the predicted y value for a given x.

Parameters:

Name Type Description Default
x float

The x value.

required

Returns:

Name Type Description
float float

The predicted y value (y = mx + b).

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def get_y(self, x: float) -> float:
    """Compute the predicted y value for a given x.

    Args:
        x: The x value.

    Returns:
        float: The predicted y value (y = mx + b).
    """
    return self.m * x + self.b
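The lazily computed m and b come from the standard least-squares closed form over the running sums: m = (n·Sxy − Sx·Sy) / (n·Sxx − Sx²) and b = (Sy − m·Sx) / n. A self-contained sketch of the same accumulator pattern (a minimal stand-in, not the library class):

```python
class RunningFit:
    """Incremental least squares over running sums, mirroring the
    low-memory accumulator described above."""

    def __init__(self) -> None:
        self.n = 0
        self.x_sum = self.y_sum = self.xy_sum = self.xx_sum = 0.0

    def add(self, x: float, y: float) -> None:
        # Only the sums are stored; individual points are discarded
        self.n += 1
        self.x_sum += x
        self.y_sum += y
        self.xy_sum += x * y
        self.xx_sum += x * x

    @property
    def m(self) -> float:
        # slope = (n*Sxy - Sx*Sy) / (n*Sxx - Sx^2)
        return (self.n * self.xy_sum - self.x_sum * self.y_sum) / (
            self.n * self.xx_sum - self.x_sum**2
        )

    @property
    def b(self) -> float:
        # intercept = (Sy - m*Sx) / n
        return (self.y_sum - self.m * self.x_sum) / self.n

fit = RunningFit()
for x, y in [(0, 1), (1, 3), (2, 5)]:  # points on y = 2x + 1
    fit.add(x, y)
print(f"y = {fit.m:.1f} x + {fit.b:.1f}")  # y = 2.0 x + 1.0
```

Because only six scalars are kept, memory stays constant no matter how many samples are added.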

Monitor

Monitor(
    description: str | None = None,
    access_times: RollingStats | None = None,
    processing_times: RollingStats | None = None,
    default_window_width: int = 300000,
    default_segment_width: int = 5000,
)

A monitor tracks processing and/or access times and rates for some function.

Monitors measure both access frequency (how often something is called) and processing time (how long it takes). Uses RollingStats to provide both cumulative and rolling window statistics.

Attributes:

Name Type Description
alive_since datetime

Time when this monitor was created.

description str | None

Optional description of what is being monitored.

last_access_time datetime | None

Time when access was last recorded.

access_times RollingStats | None

RollingStats for tracking time between accesses.

processing_times RollingStats | None

RollingStats for tracking processing duration.

access_cumulative_stats StatsAccumulator | None

Cumulative access time statistics.

access_window_stats StatsAccumulator | None

Rolling window access time statistics.

access_window_width int | None

Width of the access time rolling window.

processing_cumulative_stats StatsAccumulator | None

Cumulative processing time statistics.

processing_window_stats StatsAccumulator | None

Rolling window processing time statistics.

processing_window_width int | None

Width of the processing time rolling window.

default_window_width int

Default window width for auto-created RollingStats.

default_segment_width int

Default segment width for auto-created RollingStats.

Initialize a Monitor.

Parameters:

Name Type Description Default
description str | None

Optional description of what is being monitored. Defaults to None.

None
access_times RollingStats | None

RollingStats instance for tracking access times. Defaults to None (created on first mark).

None
processing_times RollingStats | None

RollingStats instance for tracking processing times. Defaults to None (created on first mark with endtime).

None
default_window_width int

Default window width in milliseconds for auto-created RollingStats. Defaults to 300000 (5 minutes).

300000
default_segment_width int

Default segment width in milliseconds for auto-created RollingStats. Defaults to 5000 (5 seconds).

5000

Methods:

Name Description
__str__

Get the monitor statistics as a JSON string.

as_dict

Get a dictionary containing a summary of this instance's information.

get_stats

Get window or cumulative access or processing stats.

mark

Mark another access and optionally record processing time.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __init__(
    self,
    description: str | None = None,
    access_times: RollingStats | None = None,
    processing_times: RollingStats | None = None,
    default_window_width: int = 300000,
    default_segment_width: int = 5000,
) -> None:
    """Initialize a Monitor.

    Args:
        description: Optional description of what is being monitored.
            Defaults to None.
        access_times: RollingStats instance for tracking access times.
            Defaults to None (created on first mark).
        processing_times: RollingStats instance for tracking processing times.
            Defaults to None (created on first mark with endtime).
        default_window_width: Default window width in milliseconds for
            auto-created RollingStats. Defaults to 300000 (5 minutes).
        default_segment_width: Default segment width in milliseconds for
            auto-created RollingStats. Defaults to 5000 (5 seconds).
    """
    self._modlock = Lock()
    self._alive_since = datetime.now()
    self._description = description
    self._last_start_time: datetime | None = None
    self._access_times = access_times
    self._processing_times = processing_times

    # Defaults for when creating RollingStats from within this Monitor
    self._default_window_width = default_window_width
    self._default_segment_width = default_segment_width
Attributes
access_cumulative_stats property
access_cumulative_stats: StatsAccumulator | None

Get cumulative access time statistics.

access_times property
access_times: RollingStats | None

Get the access_times (RollingStats).

access_window_stats property
access_window_stats: StatsAccumulator | None

Get rolling window access time statistics.

access_window_width property
access_window_width: int | None

Get the access time rolling window width in milliseconds.

alive_since property
alive_since: datetime

Get the time when this monitor was created.

default_segment_width property writable
default_segment_width: int

Get the default segment width for auto-created RollingStats.

default_window_width property writable
default_window_width: int

Get the default window width for auto-created RollingStats.

description property writable
description: str | None

Get the description of what is being monitored.

last_access_time property
last_access_time: datetime | None

Get the time at which access was last recorded.

processing_cumulative_stats property
processing_cumulative_stats: StatsAccumulator | None

Get cumulative processing time statistics.

processing_times property
processing_times: RollingStats | None

Get the processing_times (RollingStats).

processing_window_stats property
processing_window_stats: StatsAccumulator | None

Get rolling window processing time statistics.

processing_window_width property
processing_window_width: int | None

Get the processing time rolling window width in milliseconds.

Functions
__str__
__str__() -> str

Get the monitor statistics as a JSON string.

Returns:

Name Type Description
str str

JSON representation of the statistics dictionary.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __str__(self) -> str:
    """Get the monitor statistics as a JSON string.

    Returns:
        str: JSON representation of the statistics dictionary.
    """
    return json.dumps(self.as_dict(), sort_keys=True)
as_dict
as_dict() -> Dict[str, Any]

Get a dictionary containing a summary of this instance's information.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary with keys: alive_since, description (optional), last_mark (optional), access_stats (optional), processing_stats (optional).

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def as_dict(self) -> Dict[str, Any]:
    """Get a dictionary containing a summary of this instance's information.

    Returns:
        Dict[str, Any]: Dictionary with keys: alive_since, description (optional),
            last_mark (optional), access_stats (optional), processing_stats (optional).
    """
    result: Dict[str, Any] = {}

    result["alive_since"] = str(self.alive_since)
    if self.description is not None:
        result["description"] = self.description
    if self.last_access_time is not None:
        result["last_mark"] = str(self.last_access_time)
    if self.access_times is not None:
        result["access_stats"] = self.access_times.as_dict()
    if self.processing_times is not None:
        result["processing_stats"] = self.processing_times.as_dict()

    return result
get_stats
get_stats(
    access: bool = False, window: bool = False
) -> StatsAccumulator | None

Get window or cumulative access or processing stats.

Parameters:

Name Type Description Default
access bool

If True, get access stats; if False, get processing stats. Defaults to False.

False
window bool

If True, get window stats; if False, get cumulative stats. Defaults to False.

False

Returns:

Type Description
StatsAccumulator | None

StatsAccumulator | None: The requested statistics, or None if not available.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def get_stats(self, access: bool = False, window: bool = False) -> StatsAccumulator | None:
    """Get window or cumulative access or processing stats.

    Args:
        access: If True, get access stats; if False, get processing stats.
            Defaults to False.
        window: If True, get window stats; if False, get cumulative stats.
            Defaults to False.

    Returns:
        StatsAccumulator | None: The requested statistics, or None if not available.
    """
    result = None

    rolling_stats = self._access_times if access else self._processing_times
    if rolling_stats is not None:
        result = rolling_stats.window_stats if window else rolling_stats.cumulative_stats

    return result
mark
mark(starttime: datetime, endtime: datetime | None = None) -> None

Mark another access and optionally record processing time.

Records the time between this call and the previous call as an access time. If endtime is provided, also records the time between starttime and endtime as processing time.

Parameters:

Name Type Description Default
starttime datetime

The start time of this access.

required
endtime datetime | None

Optional end time for recording processing duration. Defaults to None.

None
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def mark(self, starttime: datetime, endtime: datetime | None = None) -> None:
    """Mark another access and optionally record processing time.

    Records the time between this call and the previous call as an access time.
    If endtime is provided, also records the time between starttime and endtime
    as processing time.

    Args:
        starttime: The start time of this access.
        endtime: Optional end time for recording processing duration.
            Defaults to None.
    """
    self._modlock.acquire()
    try:
        if self._last_start_time is not None:
            if self._access_times is None:
                # initialize if needed
                self._access_times = RollingStats(
                    self.default_window_width, self.default_segment_width
                )

            self._access_times.add(self._get_millis(starttime, self._last_start_time))

        if endtime is not None:
            if self._processing_times is None:
                # initialize if needed
                self._processing_times = RollingStats(
                    self.default_window_width, self.default_segment_width
                )

            self._processing_times.add(self._get_millis(endtime, starttime))

        self._last_start_time = starttime

    finally:
        self._modlock.release()
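mark() derives two distinct series from its timestamps: the gap between successive start times (access time) and the start-to-end duration of each call (processing time). That bookkeeping can be sketched with plain millisecond floats in place of datetimes (`MarkTracker` is illustrative, not part of the package):

```python
from typing import List, Optional

class MarkTracker:
    """Record access gaps and processing durations the way mark() does,
    using plain millisecond timestamps for clarity."""

    def __init__(self) -> None:
        self.access_gaps: List[float] = []       # ms between successive calls
        self.processing_times: List[float] = []  # ms spent inside each call
        self._last_start: Optional[float] = None

    def mark(self, start_ms: float, end_ms: Optional[float] = None) -> None:
        if self._last_start is not None:
            self.access_gaps.append(start_ms - self._last_start)
        if end_ms is not None:
            self.processing_times.append(end_ms - start_ms)
        self._last_start = start_ms

t = MarkTracker()
t.mark(0.0, 40.0)     # first call: 40 ms of work, no gap yet
t.mark(100.0, 130.0)  # second call: started 100 ms later, 30 ms of work
print(t.access_gaps, t.processing_times)  # [100.0] [40.0, 30.0]
```

Note that the first call can never produce an access gap, which is why the real Monitor creates its access-time RollingStats lazily on the second mark.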

MonitorManager

MonitorManager(
    default_window_width: int = 300000, default_segment_width: int = 5000
)

Manage a set of monitors by label, providing rollup views across all monitors.

Maintains a collection of Monitor instances identified by labels, allowing aggregation of statistics across multiple monitors. Uses a KeyManager to generate unique keys from type and description pairs.

Attributes:

Name Type Description
default_window_width int

Default window width for auto-created Monitors.

default_segment_width int

Default segment width for auto-created Monitors.

Initialize a MonitorManager.

Parameters:

Name Type Description Default
default_window_width int

Default window width in milliseconds for auto-created Monitors. Defaults to 300000 (5 minutes).

300000
default_segment_width int

Default segment width in milliseconds for auto-created Monitors. Defaults to 5000 (5 seconds).

5000

Methods:

Name Description
__str__

Get the monitor manager statistics as a JSON string.

as_dict

Get a dictionary containing a summary of this instance's information.

get_monitor

Get a monitor by label, optionally creating it if missing.

get_monitors

Get all monitors.

get_or_create_monitor_by_key_type

Get or create a monitor using a key type and description.

get_overall_stats

Get overall statistics aggregated across all monitors.

get_stats

Get window or cumulative access or processing stats for a label or all monitors.

set_monitor

Set or replace a monitor for a given label.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __init__(
    self, default_window_width: int = 300000, default_segment_width: int = 5000
) -> None:
    """Initialize a MonitorManager.

    Args:
        default_window_width: Default window width in milliseconds for
            auto-created Monitors. Defaults to 300000 (5 minutes).
        default_segment_width: Default segment width in milliseconds for
            auto-created Monitors. Defaults to 5000 (5 seconds).
    """
    self._monitors: Dict[str, Monitor] = {}
    self._key_manager = KeyManager()

    # Defaults for when creating a Monitor from within this MonitorManager
    self._default_window_width = default_window_width
    self._default_segment_width = default_segment_width
Attributes
default_segment_width property writable
default_segment_width: int

Get the default segment width for auto-created Monitors.

default_window_width property writable
default_window_width: int

Get the default window width for auto-created Monitors.

Functions
__str__
__str__() -> str

Get the monitor manager statistics as a JSON string.

Returns:

Name Type Description
str str

JSON representation of the statistics dictionary.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __str__(self) -> str:
    """Get the monitor manager statistics as a JSON string.

    Returns:
        str: JSON representation of the statistics dictionary.
    """
    return json.dumps(self.as_dict(), sort_keys=True)
as_dict
as_dict() -> Dict[str, Any]

Get a dictionary containing a summary of this instance's information.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary with overall_stats and individual monitor statistics keyed by label.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def as_dict(self) -> Dict[str, Any]:
    """Get a dictionary containing a summary of this instance's information.

    Returns:
        Dict[str, Any]: Dictionary with overall_stats and individual monitor
            statistics keyed by label.
    """
    result = {}

    # add overall processing/access, cumulative/window stats
    result["overall_stats"] = self.get_overall_stats()

    # add as_dict for each individual monitor
    for key, monitor in self._monitors.items():
        result[key] = monitor.as_dict()

    return result
get_monitor
get_monitor(
    label: str, create_if_missing: bool = False, description: str | None = None
) -> Monitor | None

Get a monitor by label, optionally creating it if missing.

Parameters:

Name Type Description Default
label str

The monitor label.

required
create_if_missing bool

If True, create a new Monitor if the label doesn't exist. Defaults to False.

False
description str | None

Optional description for a newly created Monitor. Defaults to None.

None

Returns:

Type Description
Monitor | None

Monitor | None: The Monitor instance, or None if not found and create_if_missing is False.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def get_monitor(
    self, label: str, create_if_missing: bool = False, description: str | None = None
) -> Monitor | None:
    """Get a monitor by label, optionally creating it if missing.

    Args:
        label: The monitor label.
        create_if_missing: If True, create a new Monitor if the label doesn't exist.
            Defaults to False.
        description: Optional description for a newly created Monitor.
            Defaults to None.

    Returns:
        Monitor | None: The Monitor instance, or None if not found and
            create_if_missing is False.
    """
    result = None

    if label in self._monitors:
        result = self._monitors[label]

    elif create_if_missing:
        result = Monitor(
            description=description,
            default_window_width=self.default_window_width,
            default_segment_width=self.default_segment_width,
        )
        self._monitors[label] = result

    return result
get_monitors
get_monitors() -> Dict[str, Monitor]

Get all monitors.

Returns:

Type Description
Dict[str, Monitor]

Dict[str, Monitor]: Dictionary mapping labels to Monitor instances.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def get_monitors(self) -> Dict[str, Monitor]:
    """Get all monitors.

    Returns:
        Dict[str, Monitor]: Dictionary mapping labels to Monitor instances.
    """
    return self._monitors
get_or_create_monitor_by_key_type
get_or_create_monitor_by_key_type(keytype: str, description: str) -> Monitor

Get or create a monitor using a key type and description.

Generates a unique key from the keytype and description using the internal KeyManager, then gets or creates a monitor with that key.

Parameters:

Name Type Description Default
keytype str

The key type (e.g., "endpoint", "function").

required
description str

The description to convert to a key.

required

Returns:

Name Type Description
Monitor Monitor

The Monitor instance (always created if missing).

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def get_or_create_monitor_by_key_type(self, keytype: str, description: str) -> Monitor:
    """Get or create a monitor using a key type and description.

    Generates a unique key from the keytype and description using the internal
    KeyManager, then gets or creates a monitor with that key.

    Args:
        keytype: The key type (e.g., "endpoint", "function").
        description: The description to convert to a key.

    Returns:
        Monitor: The Monitor instance (always created if missing).
    """
    key = self._key_manager.get_key(keytype, description)
    result = self.get_monitor(key, create_if_missing=True, description=description)
    assert result is not None  # Always created when create_if_missing=True
    return result
get_overall_stats
get_overall_stats() -> Dict[str, Any]

Get overall statistics aggregated across all monitors.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary with keys: cumulative_processing, cumulative_access, window_processing, window_access (each optional).

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def get_overall_stats(self) -> Dict[str, Any]:
    """Get overall statistics aggregated across all monitors.

    Returns:
        Dict[str, Any]: Dictionary with keys: cumulative_processing,
            cumulative_access, window_processing, window_access (each optional).
    """
    result = {}

    cumulative_processing = self.get_stats(access=False, window=False)
    if cumulative_processing is not None:
        result["cumulative_processing"] = cumulative_processing.as_dict()

    cumulative_access = self.get_stats(access=True, window=False)
    if cumulative_access is not None:
        result["cumulative_access"] = cumulative_access.as_dict()

    window_processing = self.get_stats(access=False, window=True)
    if window_processing is not None:
        result["window_processing"] = window_processing.as_dict()

    window_access = self.get_stats(access=True, window=True)
    if window_access is not None:
        result["window_access"] = window_access.as_dict()

    return result
get_stats
get_stats(
    label: str | None = None, access: bool = False, window: bool = False
) -> StatsAccumulator | None

Get window or cumulative access or processing stats for a label or all monitors.

Parameters:

Name Type Description Default
label str | None

The monitor label, or None to aggregate across all monitors. Defaults to None.

None
access bool

If True, get access stats; if False, get processing stats. Defaults to False.

False
window bool

If True, get window stats; if False, get cumulative stats. Defaults to False.

False

Returns:

Type Description
StatsAccumulator | None

StatsAccumulator | None: The requested statistics, or None if not available.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def get_stats(
    self, label: str | None = None, access: bool = False, window: bool = False
) -> StatsAccumulator | None:
    """Get window or cumulative access or processing stats for a label or all monitors.

    Args:
        label: The monitor label, or None to aggregate across all monitors.
            Defaults to None.
        access: If True, get access stats; if False, get processing stats.
            Defaults to False.
        window: If True, get window stats; if False, get cumulative stats.
            Defaults to False.

    Returns:
        StatsAccumulator | None: The requested statistics, or None if not available.
    """
    result = None

    if label is None:
        result = StatsAccumulator("rollup")

        # Combine the requested stats across all monitors
        for monitor in self._monitors.values():
            result.incorporate(monitor.get_stats(access, window))

    elif label in self._monitors:
        result = self._monitors[label].get_stats(access, window)

    return result
set_monitor
set_monitor(label: str, monitor: Monitor) -> None

Set or replace a monitor for a given label.

Parameters:

Name Type Description Default
label str

The monitor label.

required
monitor Monitor

The Monitor instance to set.

required
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def set_monitor(self, label: str, monitor: Monitor) -> None:
    """Set or replace a monitor for a given label.

    Args:
        label: The monitor label.
        monitor: The Monitor instance to set.
    """
    self._monitors[label] = monitor

RollingStats

RollingStats(window_width: int = 300000, segment_width: int = 5000)

A collection of statistics through a rolling window of time.

Maintains both cumulative statistics (since initialization) and rolling window statistics (over a recent time period). The rolling window is divided into segments that are automatically cleared as time advances, providing an efficient way to track recent activity without storing all historical data.

Attributes:

Name Type Description
window_width int

Width of the rolling window in milliseconds.

num_segments int

Number of segments dividing the window.

cur_segment int

Current segment index.

last_segment int

Last segment index (same as cur_segment, but without advancing to the current time).

start_time datetime

Time when this RollingStats was created or reset.

ref_time datetime

Most recent reference time (updates on each access).

cumulative_stats StatsAccumulator

Statistics accumulated since start_time.

window_stats StatsAccumulator

Statistics for the current rolling window.

current_label str

Label for the current window.

Initialize a RollingStats instance.

Parameters:

Name Type Description Default
window_width int

Width of the rolling window in milliseconds. Defaults to 300000 (5 minutes).

300000
segment_width int

Width of each segment in milliseconds. Defaults to 5000 (5 seconds).

5000

Methods:

Name Description
__str__

Get the statistics as a JSON string.

add

Add one or more values to the current segment.

as_dict

Get a dictionary containing a summary of this instance's information.

get_items_per_milli

Extract the average number of items per millisecond from the stats.

get_millis_per_item

Extract the average number of milliseconds per item from the stats.

has_window_activity

Determine whether the current window has activity.

reset

Reset all statistics and restart from current time.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __init__(self, window_width: int = 300000, segment_width: int = 5000) -> None:
    """Initialize a RollingStats instance.

    Args:
        window_width: Width of the rolling window in milliseconds.
            Defaults to 300000 (5 minutes).
        segment_width: Width of each segment in milliseconds.
            Defaults to 5000 (5 seconds).
    """
    self._modlock = Lock()
    self._window_width = window_width
    self._segment_width = segment_width
    self._window_delta = timedelta(milliseconds=window_width)
    self._cumulative_stats = StatsAccumulator("cumulative")

    self._num_segments = round(window_width / segment_width)
    self._segment_stats = [
        StatsAccumulator("segment-" + str(i)) for i in range(self._num_segments)
    ]
    self._starttime = datetime.now()
    self._reftime = self._starttime
    self._cur_segment = 0
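The bucketing scheme described above can be sketched in a few lines: the window splits into round(window_width / segment_width) segments, and an elapsed time maps to a segment index modulo the window. segment_for is an illustrative helper, not part of the package; the real class also clears any segments it skips over as time advances.

```python
window_width = 300_000   # 5 minutes, in milliseconds
segment_width = 5_000    # 5 seconds

num_segments = round(window_width / segment_width)

def segment_for(elapsed_millis: float) -> int:
    """Segment index an observation at `elapsed_millis` falls into."""
    return int(elapsed_millis // segment_width) % num_segments

print(num_segments)          # 60
print(segment_for(7_000))    # 1  (second segment)
print(segment_for(301_000))  # 0  (the window has wrapped around)
```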
Attributes
cumulative_stats property
cumulative_stats: StatsAccumulator

Get cumulative statistics since start_time.

cur_segment property
cur_segment: int

Get the current segment index (updates to current time).

current_label property
current_label: str

Get the label for the current window.

last_segment property
last_segment: int

Get the last segment index (same as cur_segment without update).

num_segments property
num_segments: int

Get the number of segments in the rolling window.

ref_time property
ref_time: datetime

Get the most recent reference time.

start_time property
start_time: datetime

Get the time when this RollingStats was created or last reset.

window_stats property
window_stats: StatsAccumulator

Get statistics for the current rolling window.

window_width property
window_width: int

Get the width of the rolling window in milliseconds.

Functions
__str__
__str__() -> str

Get the statistics as a JSON string.

Returns:

Name Type Description
str str

JSON representation of the statistics dictionary.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __str__(self) -> str:
    """Get the statistics as a JSON string.

    Returns:
        str: JSON representation of the statistics dictionary.
    """
    return json.dumps(self.as_dict(), sort_keys=True)
add
add(*values: float) -> int

Add one or more values to the current segment.

Parameters:

Name Type Description Default
*values float

One or more numeric values to add.

()

Returns:

Name Type Description
int int

The segment number to which the values were added (useful for testing).

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def add(self, *values: float) -> int:
    """Add one or more values to the current segment.

    Args:
        *values: One or more numeric values to add.

    Returns:
        int: The segment number to which the values were added (useful for testing).
    """
    self._modlock.acquire()
    try:
        self._inc_to_cur_segment()
        self._segment_stats[self._cur_segment].add(*values)
        result = self._cur_segment
        self._cumulative_stats.add(*values)
    finally:
        self._modlock.release()

    return result
as_dict
as_dict() -> Dict[str, Any]

Get a dictionary containing a summary of this instance's information.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary with keys: now, cumulative, window.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def as_dict(self) -> Dict[str, Any]:
    """Get a dictionary containing a summary of this instance's information.

    Returns:
        Dict[str, Any]: Dictionary with keys: now, cumulative, window.
    """
    result: Dict[str, Any] = {"now": str(datetime.now())}

    cumulative_info: Dict[str, Any] = {"since": str(self.start_time)}
    self._add_stats_info(self.cumulative_stats, cumulative_info)
    result["cumulative"] = cumulative_info

    window_info: Dict[str, Any] = {"width_millis": self.window_width}
    current_window_stats = self.window_stats
    self._add_stats_info(current_window_stats, window_info)
    result["window"] = window_info

    return result
get_items_per_milli staticmethod
get_items_per_milli(stats: StatsAccumulator | None) -> float | None

Extract the average number of items per millisecond from the stats.

Parameters:

Name Type Description Default
stats StatsAccumulator | None

StatsAccumulator instance or None.

required

Returns:

Type Description
float | None

float | None: Average items per millisecond, or None if stats is None or mean is zero.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
@staticmethod
def get_items_per_milli(stats: StatsAccumulator | None) -> float | None:
    """Extract the average number of items per millisecond from the stats.

    Args:
        stats: StatsAccumulator instance or None.

    Returns:
        float | None: Average items per millisecond, or None if stats is None
            or mean is zero.
    """
    result = None

    if stats is not None:
        if stats.mean > 0:
            result = 1.0 / stats.mean

    return result
get_millis_per_item staticmethod
get_millis_per_item(stats: StatsAccumulator | None) -> float | None

Extract the average number of milliseconds per item from the stats.

Parameters:

Name Type Description Default
stats StatsAccumulator | None

StatsAccumulator instance or None.

required

Returns:

Type Description
float | None

float | None: Average milliseconds per item, or None if stats is None or has no values.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
@staticmethod
def get_millis_per_item(stats: StatsAccumulator | None) -> float | None:
    """Extract the average number of milliseconds per item from the stats.

    Args:
        stats: StatsAccumulator instance or None.

    Returns:
        float | None: Average milliseconds per item, or None if stats is None
            or has no values.
    """
    result = None

    if stats is not None:
        if stats.n > 0:
            result = stats.mean

    return result
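When the accumulator holds per-item latencies in milliseconds, the two static helpers above are simple reciprocals of one another; for instance (values here are illustrative):

```python
# If items average 4 ms apiece, throughput is 0.25 items per millisecond.
mean_millis_per_item = 4.0           # e.g. stats.mean of a processing monitor
items_per_milli = 1.0 / mean_millis_per_item

print(items_per_milli)  # 0.25, i.e. 250 items per second
```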
has_window_activity
has_window_activity() -> Tuple[bool, StatsAccumulator]

Determine whether the current window has activity.

Returns:

Type Description
Tuple[bool, StatsAccumulator]

Tuple[bool, StatsAccumulator]: A tuple of (has_activity, current_window_stats) where has_activity is True if the window has any values.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def has_window_activity(self) -> Tuple[bool, StatsAccumulator]:
    """Determine whether the current window has activity.

    Returns:
        Tuple[bool, StatsAccumulator]: A tuple of (has_activity, current_window_stats)
            where has_activity is True if the window has any values.
    """
    window_stats = self.window_stats
    return (window_stats.n > 0, window_stats)
reset
reset() -> None

Reset all statistics and restart from current time.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def reset(self) -> None:
    """Reset all statistics and restart from current time."""
    self._modlock.acquire()
    try:
        self._starttime = datetime.now()
        self._reftime = self._starttime
        self._cur_segment = 0
        for segment in self._segment_stats:
            segment.clear()
        self._cumulative_stats.clear()
    finally:
        self._modlock.release()

StatsAccumulator

StatsAccumulator(
    label: str = "",
    other: Optional[StatsAccumulator] = None,
    as_dict: Dict[str, Any] | None = None,
    values: Union[float, List[float]] | None = None,
)

A low-memory helper class to collect statistical samples and provide summary statistics.

Accumulates statistical values in a thread-safe manner, computing mean, variance, standard deviation, min, max, and other summary statistics incrementally without storing individual values. Supports combining multiple accumulators and initialization from dictionaries.

Attributes:

Name Type Description
label str

A label or name for this instance.

n int

The number of values added.

min float

The minimum value added.

max float

The maximum value added.

sum float

The sum of all values added.

sum_of_squares float

The sum of all squared values.

mean float

The mean of the values.

std float

The standard deviation of the values.

var float

The variance of the values.

Initialize a StatsAccumulator.

Parameters:

Name Type Description Default
label str

A label or name for this instance. Defaults to "".

''
other Optional[StatsAccumulator]

Another StatsAccumulator instance to copy data from. Defaults to None.

None
as_dict Dict[str, Any] | None

Dictionary representation of stats to initialize with. Defaults to None.

None
values Union[float, List[float]] | None

A list of initial values or a single value to add. Defaults to None.

None

Methods:

Name Description
__str__

Get the statistics as a JSON string.

add

Add one or more values to the accumulator (thread-safe).

as_dict

Get a dictionary containing a summary of this instance's information.

clear

Clear all values and reset statistics.

combine

Create a new StatsAccumulator combining data from multiple accumulators.

incorporate

Incorporate another accumulator's data into this one.

initialize

Initialize with the given values.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __init__(
    self,
    label: str = "",
    other: Optional["StatsAccumulator"] = None,
    as_dict: Dict[str, Any] | None = None,
    values: Union[float, List[float]] | None = None,
) -> None:
    """Initialize a StatsAccumulator.

    Args:
        label: A label or name for this instance. Defaults to "".
        other: Another StatsAccumulator instance to copy data from.
            Defaults to None.
        as_dict: Dictionary representation of stats to initialize with.
            Defaults to None.
        values: A list of initial values or a single value to add.
            Defaults to None.
    """
    self._label: str = label
    self._n = 0
    self._min = 0.0
    self._max = 0.0
    self._sum = 0.0
    self._sos = 0.0
    self._modlock = Lock()

    if other is not None:
        if label != "":
            self._label = other._label
        self._n = other._n
        self._min = other._min
        self._max = other._max
        self._sum = other._sum
        self._sos = other._sos
    if as_dict is not None:
        self.initialize(label=label, as_dict=as_dict)

    if values is not None:
        if isinstance(values, list):
            self.add(*values)
        else:
            self.add(values)
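The low-memory scheme described above keeps only n, min, max, sum, and sum-of-squares, yet mean and (sample) standard deviation fall out of those totals. A self-contained sketch of the accumulation, checked against the standard library:

```python
import math
import statistics

values = [3.0, 7.0, 7.0, 19.0]

n = 0
total = 0.0
sos = 0.0
lo = hi = None
for v in values:
    n += 1
    total += v
    sos += v * v                      # running sum of squares
    lo = v if lo is None else min(lo, v)
    hi = v if hi is None else max(hi, v)

mean = total / n
var = (sos - total * total / n) / (n - 1)   # sample variance (n - 1)
std = math.sqrt(var)

assert math.isclose(mean, statistics.mean(values))
assert math.isclose(std, statistics.stdev(values))
```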
Attributes
label property writable
label: str

Get the label or name.

max property
max: float

Get the maximum value added.

mean property
mean: float

Get the mean of the values.

min property
min: float

Get the minimum value added.

n property
n: int

Get the number of values added.

std property
std: float

Get the standard deviation of the values.

sum property
sum: float

Get the sum of all values added.

sum_of_squares property
sum_of_squares: float

Get the sum of all squared values.

var property
var: float

Get the variance of the values.

Functions
__str__
__str__() -> str

Get the statistics as a JSON string.

Returns:

Name Type Description
str str

JSON representation of the statistics dictionary.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def __str__(self) -> str:
    """Get the statistics as a JSON string.

    Returns:
        str: JSON representation of the statistics dictionary.
    """
    return json.dumps(self.as_dict(), sort_keys=True)
add
add(*values: float) -> StatsAccumulator

Add one or more values to the accumulator (thread-safe).

Parameters:

Name Type Description Default
*values float

One or more numeric values to add.

()

Returns:

Name Type Description
StatsAccumulator StatsAccumulator

This instance for method chaining.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def add(self, *values: float) -> "StatsAccumulator":
    """Add one or more values to the accumulator (thread-safe).

    Args:
        *values: One or more numeric values to add.

    Returns:
        StatsAccumulator: This instance for method chaining.
    """
    self._modlock.acquire()
    try:
        for value in values:
            self._do_add(value)
    finally:
        self._modlock.release()
    return self
as_dict
as_dict(with_sums: bool = False) -> Dict[str, Any]

Get a dictionary containing a summary of this instance's information.

Parameters:

Name Type Description Default
with_sums bool

If True, include sum and sos (sum of squares) in the output. Defaults to False.

False

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary with keys: label, n, min, max, mean, std, and optionally sum and sos.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def as_dict(self, with_sums: bool = False) -> Dict[str, Any]:
    """Get a dictionary containing a summary of this instance's information.

    Args:
        with_sums: If True, include sum and sos (sum of squares) in the output.
            Defaults to False.

    Returns:
        Dict[str, Any]: Dictionary with keys: label, n, min, max, mean, std,
            and optionally sum and sos.
    """
    d = {
        "label": self.label,
        "n": self.n,
        "min": self.min,
        "max": self.max,
        "mean": self.mean,
        "std": self.std,
    }
    if with_sums:
        d.update({"sum": self._sum, "sos": self._sos})
    return d
clear
clear(label: str = '') -> None

Clear all values and reset statistics.

Parameters:

Name Type Description Default
label str

Optional new label to assign. Defaults to "".

''
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def clear(self, label: str = "") -> None:
    """Clear all values and reset statistics.

    Args:
        label: Optional new label to assign. Defaults to "".
    """
    self._modlock.acquire()
    try:
        self._label = label
        self._n = 0
        self._min = 0.0
        self._max = 0.0
        self._sum = 0.0
        self._sos = 0.0
    finally:
        self._modlock.release()
combine staticmethod
combine(label: str, *stats_accumulators: StatsAccumulator) -> StatsAccumulator

Create a new StatsAccumulator combining data from multiple accumulators.

Parameters:

Name Type Description Default
label str

Label for the new combined accumulator.

required
*stats_accumulators StatsAccumulator

One or more StatsAccumulator instances to combine.

()

Returns:

Name Type Description
StatsAccumulator StatsAccumulator

New accumulator as if it had accumulated all data from the provided accumulators.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
@staticmethod
def combine(label: str, *stats_accumulators: "StatsAccumulator") -> "StatsAccumulator":
    """Create a new StatsAccumulator combining data from multiple accumulators.

    Args:
        label: Label for the new combined accumulator.
        *stats_accumulators: One or more StatsAccumulator instances to combine.

    Returns:
        StatsAccumulator: New accumulator as if it had accumulated all data
            from the provided accumulators.
    """
    result = StatsAccumulator(label)
    for stats in stats_accumulators:
        result.incorporate(stats)
    return result
incorporate
incorporate(other: Optional[StatsAccumulator]) -> None

Incorporate another accumulator's data into this one.

Merges the statistics as if this accumulator had accumulated all values from both accumulators.

Parameters:

Name Type Description Default
other Optional[StatsAccumulator]

Another StatsAccumulator to incorporate. If None, no action is taken.

required
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def incorporate(self, other: Optional["StatsAccumulator"]) -> None:
    """Incorporate another accumulator's data into this one.

    Merges the statistics as if this accumulator had accumulated all values
    from both accumulators.

    Args:
        other: Another StatsAccumulator to incorporate. If None, no action is taken.
    """
    if other is None:
        return

    self._modlock.acquire()
    try:
        if self._n == 0:
            self._min = other.min
            self._max = other.max
        else:
            self._min = min(self._min, other.min)
            self._max = max(self._max, other.max)

        self._n += other.n
        self._sos += other.sum_of_squares
        self._sum += other.sum
    finally:
        self._modlock.release()
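The merge performed by incorporate() can be sketched with plain tuples: combining two accumulators' (n, min, max, sum, sos) summaries yields stats identical to accumulating the concatenation of both value streams. (The real method additionally special-cases min/max when this accumulator is empty; summarize and merge below are illustrative helpers, not package API.)

```python
def summarize(values):
    """Summary tuple (n, min, max, sum, sum-of-squares) for a value stream."""
    return (len(values), min(values), max(values),
            sum(values), sum(v * v for v in values))

def merge(a, b):
    """Combine two summary tuples as incorporate() does."""
    return (a[0] + b[0], min(a[1], b[1]), max(a[2], b[2]),
            a[3] + b[3], a[4] + b[4])

left, right = [1.0, 2.0, 3.0], [10.0, 20.0]
merged = merge(summarize(left), summarize(right))
pooled = summarize(left + right)
assert merged == pooled  # merging summaries == pooling the raw values
```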
initialize
initialize(
    label: str = "",
    n: int = 0,
    min: float = 0,
    max: float = 0,
    mean: float = 0,
    std: float = 0,
    as_dict: Dict[str, Any] | None = None,
) -> None

Initialize with the given values.

When as_dict is provided, its values override the individual parameters. Computes internal sum and sum_of_squares from mean and std if not provided in the dictionary.

Parameters:

Name Type Description Default
label str

A label or name for this instance. Defaults to "".

''
n int

Number of values. Defaults to 0.

0
min float

Minimum value. Defaults to 0.

0
max float

Maximum value. Defaults to 0.

0
mean float

Mean value. Defaults to 0.

0
std float

Standard deviation. Defaults to 0.

0
as_dict Dict[str, Any] | None

Dictionary with keys: label, n, min, max, mean, std, sum, sos. Values from this dict override individual parameters. Defaults to None.

None
Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def initialize(
    self,
    label: str = "",
    n: int = 0,
    min: float = 0,
    max: float = 0,
    mean: float = 0,
    std: float = 0,
    as_dict: Dict[str, Any] | None = None,
) -> None:
    """Initialize with the given values.

    When as_dict is provided, its values override the individual parameters.
    Computes internal sum and sum_of_squares from mean and std if not provided
    in the dictionary.

    Args:
        label: A label or name for this instance. Defaults to "".
        n: Number of values. Defaults to 0.
        min: Minimum value. Defaults to 0.
        max: Maximum value. Defaults to 0.
        mean: Mean value. Defaults to 0.
        std: Standard deviation. Defaults to 0.
        as_dict: Dictionary with keys: label, n, min, max, mean, std, sum, sos.
            Values from this dict override individual parameters. Defaults to None.
    """
    if as_dict is not None:
        if "label" in as_dict:
            label = as_dict["label"]
        if "n" in as_dict:
            n = as_dict["n"]
        if "min" in as_dict:
            min = as_dict["min"]
        if "max" in as_dict:
            max = as_dict["max"]
        if "mean" in as_dict:
            mean = as_dict["mean"]
        if "std" in as_dict:
            std = as_dict["std"]

    self._modlock.acquire()
    try:
        self._label = label
        self._n = n
        self._min = min
        self._max = max
        if as_dict is not None and "sum" in as_dict:
            self._sum = as_dict["sum"]
        else:
            self._sum = mean * n
        if as_dict is not None and "sos" in as_dict:
            self._sos = as_dict["sos"]
        else:
            self._sos = 0 if n == 0 else std * std * (n - 1.0) + self._sum * self._sum / n
    finally:
        self._modlock.release()
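The reconstruction in initialize() is invertible: given only (n, mean, std), the internal sum and sum-of-squares are rebuilt so that the derived statistics come back out unchanged. A round-trip sketch using the same formula as the source above:

```python
import math

n, mean, std = 4, 9.0, math.sqrt(48.0)

total = mean * n
sos = std * std * (n - 1.0) + total * total / n  # formula from initialize()

recovered_mean = total / n
recovered_var = (sos - total * total / n) / (n - 1)

assert math.isclose(recovered_mean, mean)
assert math.isclose(math.sqrt(recovered_var), std)
```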

Functions

wait_for_random_millis

wait_for_random_millis(uptomillis: float) -> float

Wait for a random number of milliseconds.

Parameters:

Name Type Description Default
uptomillis float

Maximum number of milliseconds to wait.

required

Returns:

Name Type Description
float float

The actual wait time in milliseconds.

Source code in packages/utils/src/dataknobs_utils/stats_utils.py
def wait_for_random_millis(uptomillis: float) -> float:
    """Wait for a random number of milliseconds.

    Args:
        uptomillis: Maximum number of milliseconds to wait.

    Returns:
        float: The actual wait time in milliseconds.
    """
    waittime = random.uniform(1, uptomillis)
    time.sleep(waittime / 1000)
    return waittime

subprocess_utils

Functions

dataknobs_utils.subprocess_utils

Subprocess execution utilities for running system commands.

Provides functions for executing system commands and processing their output line-by-line with callback handlers.

Functions:

Name Description
run_command

Run a system command and process output line by line.

Functions

run_command

run_command(
    handle_line_fn: Callable[[str], bool],
    command: str,
    args: List[str] | None = None,
) -> int

Run a system command and process output line by line.

Executes a command and calls a handler function for each line of output. The handler can signal early termination by returning False.

Parameters:

Name Type Description Default
handle_line_fn Callable[[str], bool]

Callback function that takes each output line and returns True to continue processing or False to kill the process immediately.

required
command str

Command string with args (if args=None) or just command name (if args provided).

required
args List[str] | None

Optional list of command arguments. If provided, command runs without shell=True. Defaults to None.

None

Returns:

Name Type Description
int int

The command's return code, or 0 if poll returns None.

Examples:

>>> from dataknobs_utils.subprocess_utils import run_command
>>>
>>> # Print all files in the directory:
>>> run_command(lambda x: (print(x), True)[1], 'ls -1')
>>>
>>> # Print files until "foo" is found:
>>> run_command(lambda x: (print(x), x!='foo')[1], 'ls -1')
Source code in packages/utils/src/dataknobs_utils/subprocess_utils.py
def run_command(
    handle_line_fn: Callable[[str], bool], command: str, args: List[str] | None = None
) -> int:
    """Run a system command and process output line by line.

    Executes a command and calls a handler function for each line of output.
    The handler can signal early termination by returning False.

    Args:
        handle_line_fn: Callback function that takes each output line and returns
            True to continue processing or False to kill the process immediately.
        command: Command string with args (if args=None) or just command name
            (if args provided).
        args: Optional list of command arguments. If provided, command runs
            without shell=True. Defaults to None.

    Returns:
        int: The command's return code, or 0 if poll returns None.

    Examples:
        >>> from dataknobs_utils.subprocess_utils import run_command
        >>>
        >>> # Print all files in the directory:
        >>> run_command(lambda x: (print(x), True)[1], 'ls -1')
        >>>
        >>> # Print files until "foo" is found:
        >>> run_command(lambda x: (print(x), x!='foo')[1], 'ls -1')
    """
    the_args: Union[str, List[str]] = command
    shell = True
    if args is not None:
        the_args = [command] + args
        shell = False
    process = subprocess.Popen(the_args, stdout=subprocess.PIPE, shell=shell, encoding="utf8")
    while True:
        if process.stdout is not None:
            output = process.stdout.readline()
            if output == "" and process.poll() is not None:
                break
            if output:
                if not handle_line_fn(output.strip()):
                    process.kill()
                    break
        else:
            break
    rc = process.poll()
    return rc if rc is not None else 0
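
A handler can carry state to stop after a fixed number of lines. The sketch below reproduces the documented loop with stdlib subprocess only (so it runs without the package); it assumes `seq` is on PATH:

```python
import subprocess
from typing import Callable, List

def run_lines(handle_line_fn: Callable[[str], bool], argv: List[str]) -> int:
    # Stand-in for run_command(handle_line_fn, command, args) with args
    # provided (shell=False): stream stdout line by line, kill the process
    # when the handler returns False.
    process = subprocess.Popen(argv, stdout=subprocess.PIPE, encoding="utf8")
    while True:
        output = process.stdout.readline()
        if output == "" and process.poll() is not None:
            break
        if output and not handle_line_fn(output.strip()):
            process.kill()
            break
    rc = process.poll()
    return rc if rc is not None else 0

collected: List[str] = []

def take_two(line: str) -> bool:
    # Return False after collecting two lines to stop early
    collected.append(line)
    return len(collected) < 2

run_lines(take_two, ["seq", "1", "5"])
assert collected == ["1", "2"]
```

The stateful handler pattern (a closure or a method of a collecting object) is usually easier to read than the inline tuple-indexing lambdas shown in the doctest examples.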

sys_utils

Functions

dataknobs_utils.sys_utils

System and environment utility functions.

Provides utilities for loading environment variables, network discovery, and system configuration management.

Classes:

Name Description
MySubnet

Collect and store information about the current process's subnet.

Functions:

Name Description
load_project_vars

Load project variables from the closest configuration file.

Classes

MySubnet

MySubnet()

Collect and store information about the current process's subnet.

Discovers and caches information about the local machine and other hosts on the same subnet using nmap network scanning.

Attributes:

Name Type Description
my_hostname str

The local machine's hostname.

my_ip str

The local machine's IP address.

all_ips Dict[str, str]

Dictionary mapping hostnames to IP addresses for all active hosts on the subnet (cached).

Methods:

Name Description
get_ip

Get the first IP address matching a hostname pattern.

get_ips

Get IP addresses of hosts matching a name pattern.

rescan

Clear cached subnet information to force a fresh scan.

Source code in packages/utils/src/dataknobs_utils/sys_utils.py
def __init__(self) -> None:
    self._my_hostname: str | None = None
    self._my_ip: str | None = None
    self._subnet_ips: Dict[str, str] | None = None
Attributes
all_ips property
all_ips: Dict[str, str]

Get a dictionary mapping hostname to ip_address of all hosts that are "up" on the currently running process's subnet.

These are cached on first call but will be recomputed after a call to self.rescan().

my_hostname property
my_hostname: str

Get my hostname.

my_ip property
my_ip: str

Get my IP address.

Functions
get_ip
get_ip(name_re: Union[str, Pattern]) -> str | None

Get the first IP address matching a hostname pattern.

Parameters:

Name Type Description Default
name_re Union[str, Pattern]

Regular expression pattern or string to match against hostnames.

required

Returns:

Type Description
str | None

str | None: The first matching IP address, or None if no matches found.

Source code in packages/utils/src/dataknobs_utils/sys_utils.py
def get_ip(self, name_re: Union[str, re.Pattern]) -> str | None:
    """Get the first IP address matching a hostname pattern.

    Args:
        name_re: Regular expression pattern or string to match against
            hostnames.

    Returns:
        str | None: The first matching IP address, or None if no matches found.
    """
    ips = self.get_ips(name_re)
    if len(ips) > 0:
        return next(iter(ips.values()))
    return None
get_ips
get_ips(name_re: Union[str, Pattern]) -> Dict[str, str]

Get IP addresses of hosts matching a name pattern.

Parameters:

Name Type Description Default
name_re Union[str, Pattern]

Regular expression pattern or string to match against hostnames.

required

Returns:

Type Description
Dict[str, str]

Dict[str, str]: Dictionary mapping matching hostnames to their IP addresses.

Source code in packages/utils/src/dataknobs_utils/sys_utils.py
def get_ips(self, name_re: Union[str, re.Pattern]) -> Dict[str, str]:
    """Get IP addresses of hosts matching a name pattern.

    Args:
        name_re: Regular expression pattern or string to match against
            hostnames.

    Returns:
        Dict[str, str]: Dictionary mapping matching hostnames to their IP
            addresses.
    """
    return {name: ip for name, ip in self.all_ips.items() if re.match(name_re, name)}
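
The filtering is a plain re.match over the cached mapping. This standalone sketch uses a hypothetical all_ips dictionary in place of a live nmap scan:

```python
import re
from typing import Dict, Pattern, Union

# Hypothetical cached subnet scan results (what MySubnet.all_ips would hold):
all_ips = {
    "nas-01": "192.168.1.10",
    "nas-02": "192.168.1.11",
    "printer": "192.168.1.20",
}

def get_ips(name_re: Union[str, Pattern]) -> Dict[str, str]:
    # Same comprehension the method applies over all_ips
    return {name: ip for name, ip in all_ips.items() if re.match(name_re, name)}

assert get_ips(r"nas-\d+") == {"nas-01": "192.168.1.10", "nas-02": "192.168.1.11"}
assert get_ips("printer") == {"printer": "192.168.1.20"}
```

get_ip simply returns the first value of this dictionary, or None when it is empty.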
rescan
rescan() -> None

Clear cached subnet information to force a fresh scan.

Call this method to invalidate the cached subnet hosts data. The next access to all_ips will trigger a new nmap scan.

Source code in packages/utils/src/dataknobs_utils/sys_utils.py
def rescan(self) -> None:
    """Clear cached subnet information to force a fresh scan.

    Call this method to invalidate the cached subnet hosts data. The next
    access to all_ips will trigger a new nmap scan.
    """
    self._subnet_ips = None

Functions

load_project_vars

load_project_vars(
    pvname: str = ".project_vars",
    include_dot_env: bool = True,
    set_environ: bool = False,
    start_path: Path | None = None,
) -> Dict[str, str] | None

Load project variables from the closest configuration file.

Walks up the directory tree from the current working directory to find the closest project variables file and optionally merges with .env settings.

Notes
  • Project variable files can be checked into the repo (public)
  • .env files should not be checked in (contain secrets)
  • .env variables will override project variables

Parameters:

Name Type Description Default
pvname str

Name of the project variables file. Defaults to ".project_vars".

'.project_vars'
include_dot_env bool

If True, also loads and merges the closest .env file. Defaults to True.

True
set_environ bool

If True, also sets loaded variables in os.environ. Only sets variables not already in environment. Defaults to False.

False
start_path Path | None

Directory to start searching from. Defaults to cwd.

None

Returns:

Type Description
Dict[str, str] | None

Dict[str, str] | None: Dictionary of configuration variables, or None if no configuration file is found.

Source code in packages/utils/src/dataknobs_utils/sys_utils.py
def load_project_vars(
    pvname: str = ".project_vars",
    include_dot_env: bool = True,
    set_environ: bool = False,
    start_path: Path | None = None,
) -> Dict[str, str] | None:
    """Load project variables from the closest configuration file.

    Walks up the directory tree from the current working directory to find
    the closest project variables file and optionally merges with .env settings.

    Notes:
        - Project variable files can be checked into the repo (public)
        - .env files should not be checked in (contain secrets)
        - .env variables will override project variables

    Args:
        pvname: Name of the project variables file. Defaults to ".project_vars".
        include_dot_env: If True, also loads and merges the closest .env file.
            Defaults to True.
        set_environ: If True, also sets loaded variables in os.environ.
            Only sets variables not already in environment. Defaults to False.
        start_path: Directory to start searching from. Defaults to cwd.

    Returns:
        Dict[str, str] | None: Dictionary of configuration variables, or None
            if no configuration file is found.
    """
    config = None
    path = Path(start_path) if start_path else Path.cwd()
    while not os.path.exists(path.joinpath(pvname)) and path.parent != path:
        # Walk up the parents to find the closest project variables file
        path = path.parent
    pvpath = path.joinpath(pvname).absolute()
    if os.path.exists(pvpath):
        config = dotenv_values(pvpath)
    if include_dot_env and pvname != ".env":
        cfg = load_project_vars(
            pvname=".env",
            include_dot_env=False,
            set_environ=False,
            start_path=start_path,
        )
        if cfg is not None:
            if config is not None:
                config.update(cfg)
            else:
                config = cfg

    # Optionally set in os.environ
    if set_environ and config:
        for key, value in config.items():
            if key not in os.environ:
                os.environ[key] = value

    return config
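
The walk-up and merge behavior can be exercised end to end. The sketch below builds a throwaway directory tree and substitutes a minimal KEY=VALUE parser for dotenv_values:

```python
import tempfile
from pathlib import Path
from typing import Dict

def parse_env(path: Path) -> Dict[str, str]:
    # Minimal stand-in for dotenv_values: KEY=VALUE lines, # comments
    vals: Dict[str, str] = {}
    for line in path.read_text().splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, value = line.split("=", 1)
            vals[key.strip()] = value.strip()
    return vals

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    nested = root / "src" / "app"
    nested.mkdir(parents=True)
    (root / ".project_vars").write_text("API_URL=https://example.com\nMODE=dev\n")
    (root / ".env").write_text("MODE=prod\n")

    # Walk up from the nested directory, as load_project_vars does
    path = nested
    while not (path / ".project_vars").exists() and path.parent != path:
        path = path.parent
    config = parse_env(path / ".project_vars")
    config.update(parse_env(path / ".env"))  # .env values override project vars

assert config == {"API_URL": "https://example.com", "MODE": "prod"}
```

Note how MODE=dev from the project variables file is overridden by the .env value, matching the precedence stated in the Notes above.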

xml_utils

Functions

dataknobs_utils.xml_utils

XML processing utilities with streaming and memory-efficient parsing.

Provides classes and functions for parsing and streaming XML documents, including the XmlStream abstract base class for memory-efficient processing.

Classes:

Name Description
XMLTagStream

Memory-efficient XML tag chunking using memory-mapped file access.

XmlElementGrabber

Stream matching XML elements with their full path from root.

XmlLeafStream

Stream XML leaf nodes with their full path from root.

XmlStream

Abstract base class for streaming XML content with memory management.

Functions:

Name Description
html_table_scraper

Extract HTML table data into a pandas DataFrame.

soup_generator

Generate BeautifulSoup objects for each occurrence of a tag in XML.

Classes

XMLTagStream

XMLTagStream(
    path: str, tag_name: str, with_attrs: bool = False, encoding: str = "utf-8"
)

Memory-efficient XML tag chunking using memory-mapped file access.

Processes large XML files by extracting individual tag instances as BeautifulSoup objects using memory-mapped I/O for efficient processing without loading the entire file.

Attributes:

Name Type Description
tag_name

XML tag name to extract.

encoding

Character encoding of the XML file.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def __init__(
    self, path: str, tag_name: str, with_attrs: bool = False, encoding: str = "utf-8"
) -> None:
    self.file = open(path, encoding=encoding)
    self.stream = mmap.mmap(self.file.fileno(), 0, access=mmap.ACCESS_READ)
    self.tag_name = tag_name
    self.encoding = encoding
    tag_end = " " if with_attrs else ">"
    self.start_tag = f"<{tag_name}{tag_end}".encode(encoding)
    self.end_tag = f"</{tag_name}>".encode(encoding)
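
The byte-level chunking can be sketched with stdlib mmap alone. The tag name and file content below are hypothetical, and the sketch assumes tags without attributes:

```python
import mmap
import os
import tempfile

xml = b"<root><item>a</item><item>b</item></root>"
with tempfile.NamedTemporaryFile(suffix=".xml", delete=False) as f:
    f.write(xml)
    path = f.name

chunks = []
with open(path, "rb") as fh:
    mm = mmap.mmap(fh.fileno(), 0, access=mmap.ACCESS_READ)
    start_tag, end_tag = b"<item>", b"</item>"
    pos = 0
    # Scan for <item>...</item> spans without loading the whole file
    while (start := mm.find(start_tag, pos)) != -1:
        end = mm.find(end_tag, start) + len(end_tag)
        chunks.append(mm[start:end].decode("utf-8"))
        pos = end
    mm.close()
os.unlink(path)

assert chunks == ["<item>a</item>", "<item>b</item>"]
```

Each extracted chunk can then be handed to BeautifulSoup, which is what the stream's iterator and soup_generator below do.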

XmlElementGrabber

XmlElementGrabber(
    source: str | TextIO,
    match: str | Callable[[Element], bool],
    auto_clear_elts: bool = True,
)

Bases: XmlStream

Stream matching XML elements with their full path from root.

Finds and yields elements matching a tag name or custom condition, returning the highest matching element (not searching within matched elements).

Examples:

Get first 10 elements with tag "foo":

>>> import dataknobs_utils.xml_utils as xml_utils
>>> xml_fpath = "path/to/file.xml"  # Replace with actual XML file path
>>> g = xml_utils.XmlElementGrabber(xml_fpath, "foo")
>>> first_10_foos = list(g.take(10))

Attributes:

Name Type Description
match

Tag name or callable for matching elements.

count

Number of matching elements found.

Initialize the XML element grabber.

Parameters:

Name Type Description Default
source str | TextIO

Path to XML file or file object.

required
match str | Callable[[Element], bool]

Tag name to match or callable that returns True when an element should be matched.

required
auto_clear_elts bool

If True, automatically clears closed elements. Defaults to True.

True

Methods:

Name Description
loop_through_elements

Find the next matching element with its full path from root.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def __init__(
    self,
    source: str | TextIO,
    match: str | Callable[[ET.Element], bool],
    auto_clear_elts: bool = True,
):
    """Initialize the XML element grabber.

    Args:
        source: Path to XML file or file object.
        match: Tag name to match or callable that returns True when an
            element should be matched.
        auto_clear_elts: If True, automatically clears closed elements.
            Defaults to True.
    """
    super().__init__(source, auto_clear_elts=auto_clear_elts)
    self.match = match
    self.count = 0  # The number of match nodes seen
Functions
loop_through_elements
loop_through_elements() -> list[ET.Element]

Find the next matching element with its full path from root.

Returns:

Type Description
list[Element]

list[ET.Element]: Element path from root to the next matching element.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def loop_through_elements(self) -> list[ET.Element]:
    """Find the next matching element with its full path from root.

    Returns:
        list[ET.Element]: Element path from root to the next matching element.
    """
    gotit: list[ET.Element] | None = None
    grabbing: ET.Element | None = None
    while True:
        event, elem = self.next_xml_iter()
        if event == "start":
            self.add_to_context(elem)
            if grabbing is None and self._is_match(elem):
                grabbing = elem
        elif event == "end":
            if grabbing is not None and elem == grabbing:
                # Finished collecting match element
                grabbing = None
                self.count += 1
                gotit = self.context
            # Pop the closed element from the context
            self.pop_closed_from_context(elem)
            if gotit:
                break
    return gotit or []
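
The start/end event loop can be reproduced with stdlib iterparse. This sketch matches on a hypothetical tag name "foo" and records each match's path from root:

```python
import io
import xml.etree.ElementTree as ET

xml_doc = "<root><foo id='1'/><bar><foo id='2'/></bar></root>"

matches = []
context = []  # stack of currently open elements, root first
for event, elem in ET.iterparse(io.StringIO(xml_doc), events=("start", "end")):
    if event == "start":
        context.append(elem)
    else:  # "end": elements close in LIFO order
        if elem.tag == "foo":
            matches.append([e.tag for e in context])  # path from root
        context.pop()

assert matches == [["root", "foo"], ["root", "bar", "foo"]]
```

Unlike the grabber, this simplified sketch would also match "foo" elements nested inside an already-matched "foo"; the grabber returns only the highest matching element.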

XmlLeafStream

XmlLeafStream(source: str | TextIO, auto_clear_elts: bool = True)

Bases: XmlStream

Stream XML leaf nodes with their full path from root.

Iterates through an XML document yielding each leaf (terminal) element along with its ancestor path from the root. Useful for extracting data from deeply nested XML structures.

Examples:

Show paths to first 10 leaf nodes with text or attribute values:

>>> import dataknobs_utils.xml_utils as xml_utils
>>> xml_fpath = "path/to/file.xml"  # Replace with actual XML file path
>>> s = xml_utils.XmlLeafStream(xml_fpath)
>>> for idx, elts in enumerate(s):
...     print(f'{idx} ', s.to_string(elts, ["value", "extension", "code", "ID"]))
...     if idx >= 9:
...         break

Attributes:

Name Type Description
count

Number of leaf nodes processed.

elts list[Element] | None

Most recently yielded element path.

Initialize the XML leaf stream.

Parameters:

Name Type Description Default
source str | TextIO

Path to XML file or file object.

required
auto_clear_elts bool

If True, automatically clears closed elements. Defaults to True.

True

Methods:

Name Description
loop_through_elements

Collect the next leaf element with its full path from root.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def __init__(self, source: str | TextIO, auto_clear_elts: bool = True):
    """Initialize the XML leaf stream.

    Args:
        source: Path to XML file or file object.
        auto_clear_elts: If True, automatically clears closed elements.
            Defaults to True.
    """
    super().__init__(source, auto_clear_elts=auto_clear_elts)
    self._last_elt: ET.Element | None = None  # The last new element
    self.count = 0  # The number of terminal nodes seen
    self.elts: list[ET.Element] | None = None  # The latest yielded sequence
Functions
loop_through_elements
loop_through_elements() -> list[ET.Element]

Collect the next leaf element with its full path from root.

Returns:

Type Description
list[Element]

list[ET.Element]: Element path from root to the next leaf element.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def loop_through_elements(self) -> list[ET.Element]:
    """Collect the next leaf element with its full path from root.

    Returns:
        list[ET.Element]: Element path from root to the next leaf element.
    """
    gotit: list[ET.Element] | None = None
    while True:
        event, elem = self.next_xml_iter()
        if event == "start":
            self._last_elt = elem
            self.add_to_context(elem)
        elif event == "end":
            last_idx = self.context_length - 1
            idx = self.find_context_idx(elem)
            if idx == last_idx and elem == self._last_elt:
            # Terminal if it's the last elt that was added that ended
                self.elts = self.context
                self.count += 1
                gotit = self.elts
            # Pop off the closed element(s)
            self.pop_closed_from_context(elem, idx=idx)
            # Reset to record the next added element
            self._last_elt = None
            if gotit:
                break
    return gotit or []
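
The leaf test (an element that closes while still being the last one opened) can be sketched the same way with stdlib iterparse; the document below is hypothetical:

```python
import io
import xml.etree.ElementTree as ET

xml_doc = "<a><b><c>1</c></b><d>2</d></a>"

leaves = []
context = []
last_started = None
for event, elem in ET.iterparse(io.StringIO(xml_doc), events=("start", "end")):
    if event == "start":
        context.append(elem)
        last_started = elem
    else:
        if elem is last_started:
            # A leaf: nothing opened between this element's start and end
            leaves.append((".".join(e.tag for e in context), elem.text))
        context.pop()
        last_started = None

assert leaves == [("a.b.c", "1"), ("a.d", "2")]
```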

XmlStream

XmlStream(source: str | TextIO, auto_clear_elts: bool = True)

Bases: ABC

Abstract base class for streaming XML content with memory management.

Provides a framework for iterating through XML documents using element-based streaming. Subclasses must implement loop_through_elements() to define specific element collection behavior. Automatically manages element cleanup to prevent memory buildup when processing large XML files.

Note

Extending classes must implement the loop_through_elements() abstract method.

Attributes:

Name Type Description
source

XML filename or file object.

auto_clear_elts

If True, automatically clears closed elements to save memory.

Initialize the XML stream.

Parameters:

Name Type Description Default
source str | TextIO

Path to XML file or file object.

required
auto_clear_elts bool

If True, automatically clears closed elements to prevent memory buildup. Defaults to True.

True

Methods:

Name Description
add_to_context

Add an element to the context stack.

find_context_idx

Find the most recent index of an element in the context stack.

loop_through_elements

Process XML elements until collecting the next desired element(s).

next_xml_iter

Get the next event and element from the XML iterator.

pop_closed_from_context

Remove a closed element and its descendants from the context.

take

Generate the next N items from this iterator.

to_string

Convert element path to dot-delimited string with optional leaf value.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def __init__(self, source: str | TextIO, auto_clear_elts: bool = True):
    """Initialize the XML stream.

    Args:
        source: Path to XML file or file object.
        auto_clear_elts: If True, automatically clears closed elements to prevent
            memory buildup. Defaults to True.
    """
    self._xml_iter: Any | None = None
    self.source = source
    self.auto_clear_elts = auto_clear_elts
    self._context: list[ET.Element] = []
    self._closed_elt: ET.Element | None = None
Attributes
context property
context: list[Element]

Get the current element context stack from root to current position.

Returns:

Type Description
list[Element]

list[ET.Element]: Copy of the element stack from root to current element.

context_length property
context_length: int

Get the depth of the current element in the XML tree.

Returns:

Name Type Description
int int

Number of elements in the context stack.

Functions
add_to_context
add_to_context(elem: Element) -> None

Add an element to the context stack.

Parameters:

Name Type Description Default
elem Element

Element to add to the context.

required
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def add_to_context(self, elem: ET.Element) -> None:
    """Add an element to the context stack.

    Args:
        elem: Element to add to the context.
    """
    self._context.append(elem)
find_context_idx
find_context_idx(elem: Element) -> int

Find the most recent index of an element in the context stack.

Searches backwards through the context to find the element's index.

Parameters:

Name Type Description Default
elem Element

Element to find in the context.

required

Returns:

Name Type Description
int int

Index of the element in context, or -1 if not found.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def find_context_idx(self, elem: ET.Element) -> int:
    """Find the most recent index of an element in the context stack.

    Searches backwards through the context to find the element's index.

    Args:
        elem: Element to find in the context.

    Returns:
        int: Index of the element in context, or -1 if not found.
    """
    idx = len(self._context) - 1
    while idx >= 0:
        if self._context[idx] == elem:
            break
        idx -= 1
    return idx
loop_through_elements abstractmethod
loop_through_elements() -> list[ET.Element]

Process XML elements until collecting the next desired element(s).

Subclasses must implement this method to define specific element collection behavior, updating the context as elements are encountered.

Returns:

Type Description
list[Element]

list[ET.Element]: The collected element(s) in context from root.

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
@abstractmethod
def loop_through_elements(self) -> list[ET.Element]:
    """Process XML elements until collecting the next desired element(s).

    Subclasses must implement this method to define specific element
    collection behavior, updating the context as elements are encountered.

    Returns:
        list[ET.Element]: The collected element(s) in context from root.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    raise NotImplementedError
next_xml_iter
next_xml_iter() -> tuple[str, ET.Element]

Get the next event and element from the XML iterator.

Returns:

Type Description
tuple[str, Element]

tuple[str, ET.Element]: Tuple of (event, element) where event is 'start' or 'end'.

Raises:

Type Description
StopIteration

When the XML iterator is exhausted.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def next_xml_iter(self) -> tuple[str, ET.Element]:
    """Get the next event and element from the XML iterator.

    Returns:
        tuple[str, ET.Element]: Tuple of (event, element) where event is
            'start' or 'end'.

    Raises:
        StopIteration: When the XML iterator is exhausted.
    """
    if self._xml_iter is None:
        self.__iter__()
    if self._xml_iter is not None:
        try:
            event, elem = next(self._xml_iter)
            return event, elem
        except StopIteration as exc:
            raise StopIteration from exc
    raise StopIteration
pop_closed_from_context
pop_closed_from_context(closed_elem: Element, idx: int | None = None) -> None

Remove a closed element and its descendants from the context.

Truncates the context at the element's position and optionally clears the element's memory if auto_clear_elts is enabled.

Parameters:

Name Type Description Default
closed_elem Element

The element that has closed.

required
idx int | None

Index of the element in context (computed if None). Defaults to None.

None
Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def pop_closed_from_context(self, closed_elem: ET.Element, idx: int | None = None) -> None:
    """Remove a closed element and its descendants from the context.

    Truncates the context at the element's position and optionally clears
    the element's memory if auto_clear_elts is enabled.

    Args:
        closed_elem: The element that has closed.
        idx: Index of the element in context (computed if None). Defaults to None.
    """
    if idx is None:
        idx = self.find_context_idx(closed_elem)
    if idx >= 0:
        self._context = self._context[:idx]
    if self.auto_clear_elts:
        if self._closed_elt is not None:
            self._closed_elt.clear()  # Clear memory
        self._closed_elt = closed_elem
take
take(n: int) -> Generator[list[ET.Element], None, None]

Generate the next N items from this iterator.

Parameters:

Name Type Description Default
n int

Number of items to take.

required

Yields:

Type Description
list[Element]

list[ET.Element]: Each collected element list, stopping early if iterator is exhausted.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def take(self, n: int) -> Generator[list[ET.Element], None, None]:
    """Generate the next N items from this iterator.

    Args:
        n: Number of items to take.

    Yields:
        list[ET.Element]: Each collected element list, stopping early if
            iterator is exhausted.
    """
    idx = 0
    while idx < n:
        try:
            elts = next(self)
        except StopIteration:
            return
        yield elts
        idx += 1
to_string staticmethod
to_string(
    elt_path: list[Element], with_text_or_atts: bool | str | list[str] = True
) -> str

Convert element path to dot-delimited string with optional leaf value.

Creates an XPath-like string representation of the element hierarchy, optionally appending text or attribute values from the leaf element.

Parameters:

Name Type Description Default
elt_path list[Element]

List of elements from root to leaf.

required
with_text_or_atts bool | str | list[str]

Controls value appending behavior:
  • True: Append leaf element's text if present
  • str: Append value of specified attribute if present
  • list[str]: Append first non-empty attribute value from list
  • False: Don't append any values

True

Returns:

Name Type Description
str str

Dot-delimited path (e.g., "root.child.leaf|text=\"value\"").

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
@staticmethod
def to_string(
    elt_path: list[ET.Element], with_text_or_atts: bool | str | list[str] = True
) -> str:
    r"""Convert element path to dot-delimited string with optional leaf value.

    Creates an XPath-like string representation of the element hierarchy,
    optionally appending text or attribute values from the leaf element.

    Args:
        elt_path: List of elements from root to leaf.
        with_text_or_atts: Controls value appending behavior:
            - True: Append leaf element's text if present
            - str: Append value of specified attribute if present
            - list[str]: Append first non-empty attribute value from list
            - False: Don't append any values

    Returns:
        str: Dot-delimited path (e.g., "root.child.leaf|text=\"value\"").
    """
    rv = ".".join(e.tag for e in elt_path)
    if with_text_or_atts:
        elem = elt_path[-1]
        text = None
        if elem.text:
            text = f'|text="{elem.text}"'
        elif isinstance(with_text_or_atts, str):
            val = elem.get(with_text_or_atts)
            if val:
                text = f'|{with_text_or_atts}="{val}"'
        elif isinstance(with_text_or_atts, list):
            for att in with_text_or_atts:
                val = elem.get(att)
                if val:
                    text = f'|{att}="{val}"'
                    break
        if text:
            rv += text
    return rv
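
The joining logic can be shown standalone on a tiny hypothetical document:

```python
import xml.etree.ElementTree as ET

root = ET.fromstring('<root><child><leaf code="X7">hello</leaf></child></root>')
child = root.find("child")
leaf = child.find("leaf")
elt_path = [root, child, leaf]

# Same formatting to_string produces for with_text_or_atts=True:
s = ".".join(e.tag for e in elt_path)
if leaf.text:
    s += f'|text="{leaf.text}"'

assert s == 'root.child.leaf|text="hello"'
```

Passing with_text_or_atts="code" instead would append |code="X7" when the leaf has no text.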

Functions

html_table_scraper

html_table_scraper(
    soup_table: Tag, add_header_as_row: bool = False
) -> pd.DataFrame

Extract HTML table data into a pandas DataFrame.

Scrapes table data from a BeautifulSoup table element, handling headers, nested elements, and non-breaking spaces. Concatenates text from nested elements within cells.

Parameters:

Name Type Description Default
soup_table Tag

BeautifulSoup table element to scrape.

required
add_header_as_row bool

If True, includes header row in the data rows in addition to using it as column names. Defaults to False.

False

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with scraped table data. Column names are taken from <th> elements if present, otherwise columns are unnamed.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def html_table_scraper(
    soup_table: bs4.element.Tag,
    add_header_as_row: bool = False,
) -> pd.DataFrame:
    """Extract HTML table data into a pandas DataFrame.

    Scrapes table data from a BeautifulSoup table element, handling headers,
    nested elements, and non-breaking spaces. Concatenates text from nested
    elements within cells.

    Args:
        soup_table: BeautifulSoup table element to scrape.
        add_header_as_row: If True, includes header row in the data rows
            in addition to using it as column names. Defaults to False.

    Returns:
        pd.DataFrame: DataFrame with scraped table data. Column names are taken
            from <th> elements if present, otherwise columns are unnamed.
    """
    columns: list[str] | None = None
    rows: list[list[str]] = []

    def html_text(elt: bs4.element.Tag) -> str:
        return elt.text.replace("\xa0", " ").strip()

    def td_text(td: bs4.element.Tag) -> str:
        return "\n".join(
            [  # add a \n between text of td elements
                x
                for x in [html_text(c) for c in td.children if isinstance(c, bs4.element.Tag)]
                if x  # drop empty elements under a td
            ]
        )

    for tr in soup_table.find_all("tr"):
        if columns is None:
            if tr.find("th") is not None:
                columns = [td_text(th) for th in tr.find_all("th")]
                if add_header_as_row:
                    rows.append(columns)
                continue
            else:
                columns = []
        row = [td_text(td) for td in tr.find_all("td")]
        if len(row) > 0:
            rows.append(row)

    return pd.DataFrame(rows, columns=columns if columns and len(columns) > 0 else None)

soup_generator

soup_generator(
    xmlfilepath: str,
    tag_name: str,
    with_attrs: bool = False,
    encoding: str = "utf-8",
) -> Generator[bs4.BeautifulSoup, None, None]

Generate BeautifulSoup objects for each occurrence of a tag in XML.

Efficiently processes large XML files by yielding BeautifulSoup objects for each instance of the specified tag using memory-mapped I/O.

Parameters:

Name Type Description Default
xmlfilepath str

Path to the XML file.

required
tag_name str

XML tag name to extract.

required
with_attrs bool

If True, matches tags with attributes. Defaults to False.

False
encoding str

File character encoding. Defaults to "utf-8".

'utf-8'

Yields:

Type Description
BeautifulSoup

bs4.BeautifulSoup: Soup object for each tag instance in the file.

Source code in packages/utils/src/dataknobs_utils/xml_utils.py
def soup_generator(
    xmlfilepath: str, tag_name: str, with_attrs: bool = False, encoding: str = "utf-8"
) -> Generator[bs4.BeautifulSoup, None, None]:
    """Generate BeautifulSoup objects for each occurrence of a tag in XML.

    Efficiently processes large XML files by yielding BeautifulSoup objects
    for each instance of the specified tag using memory-mapped I/O.

    Args:
        xmlfilepath: Path to the XML file.
        tag_name: XML tag name to extract.
        with_attrs: If True, matches tags with attributes. Defaults to False.
        encoding: File character encoding. Defaults to "utf-8".

    Yields:
        bs4.BeautifulSoup: Soup object for each tag instance in the file.
    """
    with XMLTagStream(
        xmlfilepath,
        tag_name,
        with_attrs=with_attrs,
        encoding=encoding,
    ) as stream:
        for soup in stream:
            yield soup

emoji_utils

Functions

dataknobs_utils.emoji_utils

Utilities for working with unicode emojis.

References: * https://home.unicode.org/emoji/about-emoji/ * https://unicode.org/emoji/techindex.html * https://www.unicode.org/Public/emoji/15.0/ * https://www.unicode.org/Public/emoji/15.0/emoji-test.txt

Emoji basics:

  • Emojis are represented by one or more characters
  • Compound emojis are built by joining characters with a "zero width joiner", U+200D
  • The joiner distinguishes a compound emoji rendered as a single glyph from adjacent, separate emojis
  • Such sequences are called "ZWJ" sequences
  • An optional "variation selector 16", U+FE0F, can follow an emoji's characters
  • This selector indicates the emoji should be rendered with its variation(s)
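The ZWJ mechanics are visible directly in the code points. The family emoji below is a standard ZWJ sequence: three emojis joined by two zero-width joiners, rendered as one glyph:

```python
ZWJ = "\u200d"  # zero width joiner

# "Family: man, woman, girl" = MAN + ZWJ + WOMAN + ZWJ + GIRL
family = "\U0001F468" + ZWJ + "\U0001F469" + ZWJ + "\U0001F467"

print(len(family))                    # 5 code points, one rendered glyph
print([hex(ord(c)) for c in family])  # ['0x1f468', '0x200d', '0x1f469', '0x200d', '0x1f467']
```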

Version and Updates:

  • Current unicode version is 15.0
  • Data collection is from emoji-test.txt
    • Watch out for format changes in that file if/when updating the version

Usage:

  • Download the desired version's emoji-test.txt resource.
  • Create an EmojiData instance with the path to the resource.
  • Use EmojiData to:
      • Mark emoji locations (BIO) in text
      • Extract emojis from text
      • Look up, browse, and investigate emojis with their metadata (Emoji dataclass)

Classes:

Name Description
Emoji

Metadata for a Unicode emoji from the emoji-test.txt file.

EmojiData

Parser and analyzer for Unicode emoji-test.txt files.

Functions:

Name Description
build_emoji_dataclass

Parse an emoji-test.txt file line into an Emoji dataclass.

get_emoji_seq

Convert emoji string to sequence of code points.

load_emoji_data

Load emoji data from emoji-test.txt file.

Classes

Emoji dataclass

Emoji(
    emoji: str,
    status: str,
    since_version: str,
    short_name: str,
    group: str | None = None,
    subgroup: str | None = None,
)

Metadata for a Unicode emoji from the emoji-test.txt file.

Attributes:

Name Type Description
emoji str

The emoji character(s) as a string.

status str

Qualification status (e.g., 'fully-qualified', 'minimally-qualified', 'unqualified', 'component').

since_version str

Unicode version when emoji was introduced (e.g., 'E13.0').

short_name str

Short English name describing the emoji.

group str | None

Optional emoji group category from test file. Defaults to None.

subgroup str | None

Optional emoji subgroup category from test file. Defaults to None.

EmojiData

EmojiData(emoji_test_path: str)

Parser and analyzer for Unicode emoji-test.txt files.

Loads emoji metadata from Unicode's emoji-test.txt file and provides utilities for identifying emojis in text, extracting emoji sequences, and querying emoji properties.

Attributes:

Name Type Description
emojis Dict[str, Emoji]

Dictionary mapping emoji characters to their Emoji metadata.

Methods:

Name Description
emoji_bio

Create BIO tags identifying emoji character positions in text.

emojis_with_cp

Find all emojis containing a specific code point.

get_emojis

Extract all emojis from text with their metadata.

Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
def __init__(self, emoji_test_path: str):
    self.emojis: Dict[str, Emoji] = {}  # emojichars -> EmojiData
    self._echars: List[int] | None = None
    self._ldepechars: Dict[int, Set[int]] | None = None
    self._rdepechars: Dict[int, Set[int]] | None = None
    self._load_emoji_test(emoji_test_path)
Attributes
echars property
echars: List[int]

Get code points that standalone represent complete emojis.

Returns:

Type Description
List[int]

List[int]: List of code points that by themselves form valid emojis.

ldepechars property
ldepechars: Dict[int, Set[int]]

Get code points that precede other code points in emoji sequences.

Maps code points that are not standalone emojis but can precede emoji code points in compound sequences.

Returns:

Type Description
Dict[int, Set[int]]

Dict[int, Set[int]]: Dictionary where keys are left-dependent code points and values are sets of emoji code points that can follow.

rdepechars property
rdepechars: Dict[int, Set[int]]

Get code points that follow other code points in emoji sequences.

Maps code points that are not standalone emojis but can follow emoji code points in compound sequences.

Returns:

Type Description
Dict[int, Set[int]]

Dict[int, Set[int]]: Dictionary where keys are right-dependent code points and values are sets of emoji code points that can precede.

Functions
emoji_bio
emoji_bio(emoji_text: str) -> str

Create BIO tags identifying emoji character positions in text.

Generates a string of the same length as the input where each character is tagged as 'B' (Begin - first char of emoji), 'I' (Internal - subsequent chars in emoji), or 'O' (Outer - not part of emoji).

Parameters:

Name Type Description Default
emoji_text str

Input text to analyze.

required

Returns:

Name Type Description
str str

BIO-tagged string of same length as input.

Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
def emoji_bio(self, emoji_text: str) -> str:
    """Create BIO tags identifying emoji character positions in text.

    Generates a string of the same length as the input where each character
    is tagged as 'B' (Begin - first char of emoji), 'I' (Internal - subsequent
    chars in emoji), or 'O' (Outer - not part of emoji).

    Args:
        emoji_text: Input text to analyze.

    Returns:
        str: BIO-tagged string of same length as input.
    """
    result = []
    start_pos = -1
    textlen = len(emoji_text)
    prevc: str | None = None
    for idx, c in enumerate(emoji_text):
        c_ord = ord(c)
        isechar = c_ord in self.echars
        isrechar = (
            c_ord in self.rdepechars
            and prevc is not None
            and ord(prevc) in self.rdepechars[c_ord]
        )
        islechar = (
            idx + 1 < textlen
            and c_ord in self.ldepechars
            and ord(emoji_text[idx + 1]) in self.ldepechars[c_ord]
        )
        issp = c in SPECIAL_CHARS

        if start_pos < 0:
            if isechar or islechar:
                start_pos = idx
                result.append("B")
            else:
                result.append("O")
        elif not isrechar and not issp:
            if isechar or islechar:
                if prevc != ZERO_WIDTH_JOINER:
                    start_pos = idx
                    result.append("B")
                else:
                    result.append("I")
            else:
                start_pos = -1
                result.append("O")
        else:
            result.append("I")
        prevc = c
    return "".join(result)
emojis_with_cp
emojis_with_cp(cp: int) -> List[Emoji]

Find all emojis containing a specific code point.

Parameters:

Name Type Description Default
cp int

Unicode code point to search for.

required

Returns:

Type Description
List[Emoji]

List[Emoji]: List of emoji metadata objects containing the code point.

Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
def emojis_with_cp(self, cp: int) -> List[Emoji]:
    """Find all emojis containing a specific code point.

    Args:
        cp: Unicode code point to search for.

    Returns:
        List[Emoji]: List of emoji metadata objects containing the code point.
    """
    return [e for emoji, e in self.emojis.items() if cp in get_emoji_seq(emoji)]
get_emojis
get_emojis(text: str) -> List[Emoji]

Extract all emojis from text with their metadata.

Parameters:

Name Type Description Default
text str

Arbitrary text to search for emojis.

required

Returns:

Type Description
List[Emoji]

List[Emoji]: List of emoji metadata objects found in the text (empty if no emojis found).

Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
def get_emojis(self, text: str) -> List[Emoji]:
    """Extract all emojis from text with their metadata.

    Args:
        text: Arbitrary text to search for emojis.

    Returns:
        List[Emoji]: List of emoji metadata objects found in the text
            (empty if no emojis found).
    """
    result = []
    bio = self.emoji_bio(text)
    biolen = len(bio)
    start_pos = 0
    while "B" in bio[start_pos:]:
        start_pos = bio.index("B", start_pos)
        end_pos = start_pos + 1
        while end_pos < biolen and bio[end_pos] == "I":
            end_pos += 1
        result.append(self.emojis[text[start_pos:end_pos]])
        start_pos = end_pos
    return result

Functions

build_emoji_dataclass

build_emoji_dataclass(emoji_test_line: str) -> Emoji | None

Parse an emoji-test.txt file line into an Emoji dataclass.

Parses lines matching the emoji-test.txt format to extract emoji metadata. Lines not matching the expected format are ignored.

Parameters:

Name Type Description Default
emoji_test_line str

Single line from emoji-test.txt file.

required

Returns:

Type Description
Emoji | None

Emoji | None: Parsed emoji metadata, or None if line doesn't match expected format.

Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
def build_emoji_dataclass(emoji_test_line: str) -> Emoji | None:
    """Parse an emoji-test.txt file line into an Emoji dataclass.

    Parses lines matching the emoji-test.txt format to extract emoji metadata.
    Lines not matching the expected format are ignored.

    Args:
        emoji_test_line: Single line from emoji-test.txt file.

    Returns:
        Emoji | None: Parsed emoji metadata, or None if line doesn't match
            expected format.
    """
    result = None
    m = ETESTLINE_RE.match(emoji_test_line)
    if m:
        result = Emoji(
            "".join(chr(int(x, 16)) for x in m.group(1).split()),
            m.group(2),
            m.group(3),
            m.group(4).strip(),
        )
    return result
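Lines in emoji-test.txt look like `1F600 ; fully-qualified # 😀 E1.0 grinning face`. A standalone parser for that layout is sketched below; `LINE_RE` here is a hypothetical stand-in for the module's `ETESTLINE_RE`, which may differ in detail:

```python
import re

# Hypothetical regex mirroring the emoji-test.txt line layout:
#   <code points> ; <status> # <emoji> E<version> <name>
LINE_RE = re.compile(
    r"^([0-9A-F ]+?)\s*;\s*([\w-]+)\s*#\s*\S+\s+(E[\d.]+)\s+(.+)$"
)

def parse_line(line):
    """Return (emoji, status, version, name), or None for non-data lines."""
    m = LINE_RE.match(line)
    if not m:
        return None
    chars = "".join(chr(int(cp, 16)) for cp in m.group(1).split())
    return chars, m.group(2), m.group(3), m.group(4).strip()

sample = "1F600 ; fully-qualified # \U0001F600 E1.0 grinning face"
print(parse_line(sample))
```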

get_emoji_seq

get_emoji_seq(emoji_str: str, as_hex: bool = False) -> List[Union[int, str]]

Convert emoji string to sequence of code points.

Parameters:

Name Type Description Default
emoji_str str

Emoji string to convert.

required
as_hex bool

If True, returns hex strings; if False, returns integers. Defaults to False.

False

Returns:

Type Description
List[Union[int, str]]

List[Union[int, str]]: List of code points as integers or hex strings.

Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
def get_emoji_seq(emoji_str: str, as_hex: bool = False) -> List[Union[int, str]]:
    """Convert emoji string to sequence of code points.

    Args:
        emoji_str: Emoji string to convert.
        as_hex: If True, returns hex strings; if False, returns integers.
            Defaults to False.

    Returns:
        List[Union[int, str]]: List of code points as integers or hex strings.
    """
    return [hex(ord(x)) for x in emoji_str] if as_hex else [ord(x) for x in emoji_str]
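For example (the function body is repeated here so the snippet runs standalone):

```python
from typing import List, Union

def get_emoji_seq(emoji_str: str, as_hex: bool = False) -> List[Union[int, str]]:
    return [hex(ord(x)) for x in emoji_str] if as_hex else [ord(x) for x in emoji_str]

print(get_emoji_seq("\U0001F44D"))               # [128077]
print(get_emoji_seq("\U0001F44D", as_hex=True))  # ['0x1f44d']
```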

load_emoji_data

load_emoji_data() -> EmojiData | None

Load emoji data from emoji-test.txt file.

Attempts to load from the EMOJI_TEST_DATA environment variable if set, otherwise uses the default latest emoji data file.

Returns:

Type Description
EmojiData | None

EmojiData | None: Loaded emoji data, or None if the data file doesn't exist.

Source code in packages/utils/src/dataknobs_utils/emoji_utils.py
def load_emoji_data() -> EmojiData | None:
    """Load emoji data from emoji-test.txt file.

    Attempts to load from the EMOJI_TEST_DATA environment variable if set,
    otherwise uses the default latest emoji data file.

    Returns:
        EmojiData | None: Loaded emoji data, or None if the data file doesn't exist.
    """
    result = None
    datapath = os.environ.get("EMOJI_TEST_DATA", LATEST_EMOJI_DATA)
    if os.path.exists(datapath):
        result = EmojiData(datapath)
    return result

json_extractor

Classes and Functions

dataknobs_utils.json_extractor

Extract and repair JSON objects from text strings.

Provides the JSONExtractor class for finding, extracting, and repairing JSON objects embedded in text, with categorization of complete vs fixed objects.

Classes:

Name Description
JSONExtractor

Extract and repair JSON objects from text strings.

Classes

JSONExtractor

JSONExtractor()

Extract and repair JSON objects from text strings.

Provides functionality to extract well-formed JSON objects, attempt to repair malformed JSON, and separate non-JSON text. Extracted objects are categorized as complete (well-formed) or fixed (repaired from malformed state).

Attributes:

Name Type Description
complete_jsons List[Dict[str, Any]]

List of well-formed JSON objects extracted from text.

fixed_jsons List[Dict[str, Any]]

List of JSON objects that were malformed but successfully repaired.

non_json_text str

Text content that doesn't contain JSON objects.

Methods:

Name Description
extract_jsons

Extract all JSON objects from text, repairing malformed JSON when possible.

get_complete_jsons

Get all well-formed JSON objects extracted from text.

get_fixed_jsons

Get all JSON objects that were repaired from malformed state.

get_non_json_text

Get text content after removing all extracted JSON objects.

get_value

Get a value at a specific path from the first matching JSON object.

get_values

Get values at a specific path from all complete JSON objects.

Source code in packages/utils/src/dataknobs_utils/json_extractor.py
def __init__(self) -> None:
    # List of complete, well-formed JSON objects
    self.complete_jsons: List[Dict[str, Any]] = []
    # List of JSON objects that were malformed but fixed
    self.fixed_jsons: List[Dict[str, Any]] = []
    # Text that doesn't contain JSON objects
    self.non_json_text: str = ""
Functions
extract_jsons
extract_jsons(text: str) -> List[Dict[str, Any]]

Extract all JSON objects from text, repairing malformed JSON when possible.

Searches for JSON objects in the text, attempts to parse them, and repairs malformed JSON by closing unclosed brackets, quotes, and fixing trailing commas. Updates instance attributes with categorized results and remaining non-JSON text.

Parameters:

Name Type Description Default
text str

Text string potentially containing JSON objects.

required

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: All successfully extracted and parsed JSON objects (both complete and fixed).

Source code in packages/utils/src/dataknobs_utils/json_extractor.py
def extract_jsons(self, text: str) -> List[Dict[str, Any]]:
    """Extract all JSON objects from text, repairing malformed JSON when possible.

    Searches for JSON objects in the text, attempts to parse them, and repairs
    malformed JSON by closing unclosed brackets, quotes, and fixing trailing commas.
    Updates instance attributes with categorized results and remaining non-JSON text.

    Args:
        text: Text string potentially containing JSON objects.

    Returns:
        List[Dict[str, Any]]: All successfully extracted and parsed JSON objects
            (both complete and fixed).
    """
    self.complete_jsons = []
    self.fixed_jsons = []
    self.non_json_text = text

    # Find all potential JSON objects using regex
    # Look for patterns that start with { and end with }
    potential_jsons = self._find_json_objects(text)

    extracted_jsons = []

    for json_text, is_complete in potential_jsons:
        try:
            # Try to parse the JSON text
            json_obj = json.loads(json_text)
            if is_complete:
                self.complete_jsons.append(json_obj)
            else:
                self.fixed_jsons.append(json_obj)
            extracted_jsons.append(json_obj)

            # Remove the JSON text from non_json_text
            self.non_json_text = self.non_json_text.replace(json_text, "", 1)
        except json.JSONDecodeError:
            # If it's malformed, try to fix it
            fixed_json = self._fix_json(json_text)
            if fixed_json:
                try:
                    json_obj = json.loads(fixed_json)
                    self.fixed_jsons.append(json_obj)
                    extracted_jsons.append(json_obj)

                    # Remove the original JSON text from non_json_text
                    self.non_json_text = self.non_json_text.replace(json_text, "", 1)
                except json.JSONDecodeError:
                    # If we still can't parse it, leave it in non_json_text
                    pass

    # Clean up any remaining JSON brackets in non_json_text
    self.non_json_text = self.non_json_text.strip()

    return extracted_jsons
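The scanning idea can be sketched with the standard library's `json.JSONDecoder.raw_decode`. This minimal version handles only already well-formed objects, unlike `JSONExtractor`, which also attempts repair:

```python
import json

def scan_jsons(text):
    """Decode every well-formed JSON object embedded in text."""
    decoder = json.JSONDecoder()
    objs, i = [], 0
    while True:
        start = text.find("{", i)
        if start == -1:
            break
        try:
            # raw_decode returns the object and the index just past it
            obj, end = decoder.raw_decode(text, start)
            objs.append(obj)
            i = end
        except json.JSONDecodeError:
            i = start + 1  # not a JSON object; keep scanning
    return objs

print(scan_jsons('before {"a": 1} middle {"b": [2, 3]} after'))
```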
get_complete_jsons
get_complete_jsons() -> List[Dict[str, Any]]

Get all well-formed JSON objects extracted from text.

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: List of JSON objects that were successfully parsed without requiring repairs.

Source code in packages/utils/src/dataknobs_utils/json_extractor.py
def get_complete_jsons(self) -> List[Dict[str, Any]]:
    """Get all well-formed JSON objects extracted from text.

    Returns:
        List[Dict[str, Any]]: List of JSON objects that were successfully
            parsed without requiring repairs.
    """
    return self.complete_jsons
get_fixed_jsons
get_fixed_jsons() -> List[Dict[str, Any]]

Get all JSON objects that were repaired from malformed state.

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: List of JSON objects that required repair (closing brackets, quotes, etc.) before successful parsing.

Source code in packages/utils/src/dataknobs_utils/json_extractor.py
def get_fixed_jsons(self) -> List[Dict[str, Any]]:
    """Get all JSON objects that were repaired from malformed state.

    Returns:
        List[Dict[str, Any]]: List of JSON objects that required repair
            (closing brackets, quotes, etc.) before successful parsing.
    """
    return self.fixed_jsons
get_non_json_text
get_non_json_text() -> str

Get text content after removing all extracted JSON objects.

Returns:

Name Type Description
str str

Remaining text with all JSON objects (both complete and fixed) removed.

Source code in packages/utils/src/dataknobs_utils/json_extractor.py
def get_non_json_text(self) -> str:
    """Get text content after removing all extracted JSON objects.

    Returns:
        str: Remaining text with all JSON objects (both complete and fixed)
            removed.
    """
    return self.non_json_text
get_value
get_value(key_path: str, default: Any | None = None) -> Any

Get a value at a specific path from the first matching JSON object.

Uses dot notation to navigate nested JSON structures and retrieves the value from the first complete JSON object that contains the specified path.

Parameters:

Name Type Description Default
key_path str

Dot-separated path to the value (e.g., 's3.bucket' for nested structures).

required
default Any | None

Value to return if the path doesn't exist in any object. Defaults to None.

None

Returns:

Name Type Description
Any Any

The value found at the path, or the default value if not found.

Source code in packages/utils/src/dataknobs_utils/json_extractor.py
def get_value(self, key_path: str, default: Any | None = None) -> Any:
    """Get a value at a specific path from the first matching JSON object.

    Uses dot notation to navigate nested JSON structures and retrieves the value
    from the first complete JSON object that contains the specified path.

    Args:
        key_path: Dot-separated path to the value (e.g., 's3.bucket' for
            nested structures).
        default: Value to return if the path doesn't exist in any object.
            Defaults to None.

    Returns:
        Any: The value found at the path, or the default value if not found.
    """
    value = default
    for json_obj in self.complete_jsons:
        cur_value = get_value(json_obj, key_path)
        if cur_value is not None:
            value = cur_value
            break
    return value
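The dot-notation navigation used by the module-level `get_value` helper can be sketched as follows (an illustrative re-implementation; the module's helper may differ in detail):

```python
def get_by_path(obj, key_path, default=None):
    """Navigate nested dicts by a dot-separated path, e.g. 's3.bucket'."""
    cur = obj
    for key in key_path.split("."):
        if not isinstance(cur, dict) or key not in cur:
            return default
        cur = cur[key]
    return cur

print(get_by_path({"s3": {"bucket": "my-data"}}, "s3.bucket"))  # my-data
```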
get_values
get_values(key_path: str) -> List[Any]

Get values at a specific path from all complete JSON objects.

Uses dot notation to navigate nested JSON structures and retrieves values from all complete JSON objects that contain the specified path.

Parameters:

Name Type Description Default
key_path str

Dot-separated path to the value (e.g., 's3.bucket' for nested structures).

required

Returns:

Type Description
List[Any]

List[Any]: List of values found at the path (empty if none found).

Source code in packages/utils/src/dataknobs_utils/json_extractor.py
def get_values(self, key_path: str) -> List[Any]:
    """Get values at a specific path from all complete JSON objects.

    Uses dot notation to navigate nested JSON structures and retrieves values
    from all complete JSON objects that contain the specified path.

    Args:
        key_path: Dot-separated path to the value (e.g., 's3.bucket' for
            nested structures).

    Returns:
        List[Any]: List of values found at the path (empty if none found).
    """
    values = []
    for json_obj in self.complete_jsons:
        value = get_value(json_obj, key_path)
        if value is not None:
            values.append(value)
    return values

Functions

Usage Examples

File Processing Example

from dataknobs_utils import file_utils

# Generate all Python files in a directory
for filepath in file_utils.filepath_generator("/path/to/project"):
    if filepath.endswith(".py"):
        # Process each line
        for line in file_utils.fileline_generator(filepath):
            print(f"{filepath}: {line}")

# Write processed results
processed_lines = ["result 1", "result 2", "result 3"]
file_utils.write_lines("output.txt", processed_lines)

Elasticsearch Example

from dataknobs_utils import elasticsearch_utils

# Build a search query
query = elasticsearch_utils.build_field_query_dict(
    ["title", "content"], 
    "machine learning"
)

# Create index configuration
table_settings = elasticsearch_utils.TableSettings(
    "documents",
    {"number_of_shards": 1},
    {
        "properties": {
            "title": {"type": "text"},
            "content": {"type": "text"}
        }
    }
)

# Create and use index
index = elasticsearch_utils.ElasticsearchIndex(None, [table_settings])
results = index.search(query)

LLM Utils Example

from dataknobs_utils import llm_utils

# Create prompt message
message = llm_utils.PromptMessage(
    "user",
    "Analyze this data and provide insights",
    metadata={"priority": "high", "model": "gpt-4"}
)

# Access nested configuration
config = {
    "models": {
        "gpt4": {
            "temperature": 0.7,
            "max_tokens": 1000
        }
    }
}

temperature = llm_utils.get_value_by_key(
    config, "models.gpt4.temperature", 0.5
)
print(f"Temperature: {temperature}")  # 0.7

Integration Example

from dataknobs_utils import (
    file_utils, json_utils, elasticsearch_utils, llm_utils
)
import json

# Complete data processing pipeline
def process_documents(input_dir: str, es_index: str):
    """Process JSON documents and index in Elasticsearch."""

    # Step 1: Collect documents
    documents = []
    for filepath in file_utils.filepath_generator(input_dir):
        if filepath.endswith('.json'):
            for line in file_utils.fileline_generator(filepath):
                try:
                    doc = json.loads(line)
                    documents.append(doc)
                except json.JSONDecodeError:
                    continue

    # Step 2: Process with LLM utils for configuration
    config = json_utils.load_json_file("config.json")
    es_config = llm_utils.get_value_by_key(
        config, "elasticsearch.settings", {}
    )

    # Step 3: Index in Elasticsearch
    table_settings = elasticsearch_utils.TableSettings(
        es_index,
        es_config,
        {
            "properties": {
                "content": {"type": "text"},
                "timestamp": {"type": "date"}
            }
        }
    )

    index = elasticsearch_utils.ElasticsearchIndex(None, [table_settings])

    # Create batch file
    with open("batch_data.jsonl", "w") as f:
        elasticsearch_utils.add_batch_data(
            f, iter(documents), es_index
        )

    return len(documents)

# Usage
processed_count = process_documents("/data/input", "processed_docs")
print(f"Processed {processed_count} documents")

Error Handling

All functions include appropriate error handling. Here are common patterns:

from dataknobs_utils import file_utils, elasticsearch_utils

try:
    # File operations
    lines = list(file_utils.fileline_generator("data.txt"))
except FileNotFoundError:
    print("File not found")
except IOError as e:
    print(f"IO error: {e}")

try:
    # Elasticsearch operations
    index = elasticsearch_utils.ElasticsearchIndex(None, [])
    if not index.is_up():
        raise ConnectionError("Elasticsearch not available")
except ConnectionError:
    print("Cannot connect to Elasticsearch")

Testing

Example test patterns for dataknobs_utils:

import pytest
import tempfile
import os
from dataknobs_utils import file_utils, llm_utils

def test_file_operations():
    """Test file utility functions."""
    with tempfile.TemporaryDirectory() as temp_dir:
        # Test file writing and reading
        test_file = os.path.join(temp_dir, "test.txt")
        test_lines = ["line1", "line2", "line3"]

        file_utils.write_lines(test_file, test_lines)
        read_lines = list(file_utils.fileline_generator(test_file))

        assert read_lines == sorted(test_lines)  # write_lines sorts

def test_llm_utils():
    """Test LLM utility functions."""
    # Test nested dictionary access
    data = {"a": {"b": {"c": "value"}}}

    result = llm_utils.get_value_by_key(data, "a.b.c")
    assert result == "value"

    result = llm_utils.get_value_by_key(data, "x.y.z", "default")
    assert result == "default"

    # Test PromptMessage
    msg = llm_utils.PromptMessage("user", "test", {"key": "value"})
    assert msg.role == "user"
    assert msg.content == "test"
    assert msg.metadata["key"] == "value"

Performance Notes

  • File Utils: Uses generators for memory-efficient processing of large files
  • Elasticsearch Utils: Supports batch operations for better performance
  • JSON Utils: Optimized for streaming JSON processing
  • Pandas Utils: Efficient DataFrame operations with proper data types

Dependencies

Core dependencies for dataknobs_utils:

pandas>=1.3.0
numpy>=1.20.0
requests>=2.25.0
psycopg2-binary>=2.8.6  # for SQL utils
elasticsearch>=7.0.0  # optional, for elasticsearch_utils

Contributing

For contributing to dataknobs_utils:

  1. Fork the repository
  2. Create feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass
  5. Submit pull request

See Contributing Guide for detailed information.

Changelog

Version 1.0.0

  • Initial release
  • Core utility modules
  • Elasticsearch integration
  • File processing utilities
  • LLM prompt management
  • JSON processing tools

License

See License for license information.