# JSON Utils Documentation

The `json_utils` module provides advanced JSON processing capabilities, including streaming, path extraction, schema analysis, and data transformation.
## Overview

JSON Utils offers:

- **Path-based Value Extraction**: Extract values using dot notation with array indexing
- **JSON Streaming**: Process large JSON files without loading them into memory
- **Schema Analysis**: Analyze JSON structure and data types
- **Record Processing**: Extract records from nested JSON structures
- **Data Transformation**: Convert between JSON and other formats
## Core Functions

### get_value()

Extract values from JSON objects using path notation.
```python
from dataknobs_utils.json_utils import get_value

data = {
    "users": [
        {"name": "Alice", "age": 30},
        {"name": "Bob", "age": 25}
    ],
    "config": {"debug": True}
}

# Simple path
name = get_value(data, "config.debug")  # True

# Array indexing
first_user = get_value(data, "users[0].name")  # "Alice"

# Wildcard extraction
all_names = get_value(data, "users[*].name")  # ["Alice", "Bob"]

# First match extraction
any_name = get_value(data, "users[?].name")  # "Alice" (first found)

# Default values
missing = get_value(data, "nonexistent.path", "default")  # "default"
```
### stream_json_data()

Stream JSON data for memory-efficient processing.

```python
from dataknobs_utils.json_utils import stream_json_data

def process_item(item, path):
    """Visitor function called for each JSON value."""
    print(f"Path: {path}, Value: {item}")

# Stream from a file
stream_json_data("large_file.json", process_item)

# Stream from a URL
stream_json_data("https://api.example.com/data.json", process_item)

# Stream from a string
json_string = '{"key": "value", "numbers": [1, 2, 3]}'
stream_json_data(json_string, process_item)
```
### build_jq_path()

Convert stream path tuples to jq-style path strings.

```python
from dataknobs_utils.json_utils import build_jq_path

# Convert a path tuple to a jq path
path_tuple = ("users", 0, "profile", "email")
jq_path = build_jq_path(path_tuple)
print(jq_path)  # ".users[0].profile.email"

# Without list indices
jq_path = build_jq_path(path_tuple, keep_list_idxs=False)
print(jq_path)  # ".users[].profile.email"
```
## JSON Schema Analysis

### JsonSchemaBuilder

Analyze JSON structure and build schemas.

```python
from dataknobs_utils.json_utils import JsonSchemaBuilder

# Analyze a JSON file
builder = JsonSchemaBuilder(
    json_data="data.json",
    keep_unique_values=True,  # Track unique values
    invert_uniques=True,      # Track paths to values
    keep_list_idxs=False      # Generalize array indices
)

# Get the schema
schema = builder.schema

# Schema as a DataFrame
df = schema.df
print(df.head())
# Columns: jq_path, value_type, value_count, unique_count

# Extract unique values for a path
unique_names = schema.get_values(".users[].name")
print(unique_names)  # {"Alice", "Bob", "Charlie"}
```
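Because the schema is exposed as a plain pandas DataFrame, standard pandas I/O and sorting apply directly; a quick sketch:

```python
# Persist the schema for later review
df.to_csv("data_schema.csv", index=False)

# Surface the most frequently occurring paths first
print(df.sort_values("value_count", ascending=False).head(10))
```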
### JsonSchema Class

Work with generated schemas directly.

```python
from dataknobs_utils.json_utils import JsonSchema, ValuesIndex

# Create an empty schema
schema = JsonSchema()

# Add path information
schema.add_path(".user.name", "str", value="Alice")
schema.add_path(".user.age", "int", value=30)
schema.add_path(".user.age", "int", value=25)  # Another instance

# Get the schema DataFrame
df = schema.df
print(df)
# Shows: jq_path, value_type, value_count, unique_count

# Extract values from actual JSON
values = schema.extract_values(".user.name", "users.json")
```
## Record Processing

### stream_record_paths()

Extract records from JSON streams.

```python
from dataknobs_utils.json_utils import stream_record_paths
import io

# Output stream
output = io.StringIO()

# Extract records with custom formatting
def format_record(rec_id, line_num, jq_path, value):
    return f"{rec_id},{line_num},{jq_path},{value}"

stream_record_paths(
    json_data="records.json",
    output_stream=output,
    line_builder_fn=format_record
)

# Get the results
output.seek(0)
results = output.read()
print(results)
```
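Since `format_record` above emits comma-separated fields, the captured stream can be parsed back into a table with plain pandas. A sketch under that assumption (it also assumes each built line lands on its own line in the stream, and that the values themselves contain no commas):

```python
import pandas as pd

output.seek(0)  # Rewind before re-reading
records = pd.read_csv(
    output,
    header=None,
    names=["rec_id", "line_num", "jq_path", "value"]
)
print(records.head())
```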
### get_records_df()

Get records as a pandas DataFrame.

```python
from dataknobs_utils.json_utils import get_records_df

# Process JSON into a DataFrame
df = get_records_df("data.json")
print(df.columns)  # ["rec_id", "line_num", "jq_path", "item"]

# Analyze record structure
record_counts = df.groupby("rec_id").size()
path_frequency = df["jq_path"].value_counts()
```
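A common follow-up is reshaping the long-format records into one row per record; a sketch using only standard pandas and the columns shown above:

```python
# Pivot to one row per rec_id and one column per jq_path
# (aggfunc="first" keeps the first value if a path repeats)
wide = df.pivot_table(
    index="rec_id",
    columns="jq_path",
    values="item",
    aggfunc="first"
)
print(wide.head())
```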
## Data Transformation

### Squashing and Exploding

Transform JSON between nested and flat representations.

```python
import json
from dataknobs_utils.json_utils import collect_squashed, explode

# Nested JSON to squash into flat key-value pairs
nested_data = {
    "user": {
        "profile": {"name": "Alice", "age": 30},
        "preferences": {"theme": "dark"}
    }
}

# Convert to a flat structure keyed by jq paths
squashed = collect_squashed(json.dumps(nested_data))
print(squashed)
# {".user.profile.name": "Alice", ".user.profile.age": 30,
#  ".user.preferences.theme": "dark"}

# Convert back to nested
exploded = explode(squashed)
print(exploded)
# {"user": {"profile": {"name": "Alice", "age": 30},
#           "preferences": {"theme": "dark"}}}
```
### path_to_dict()

Convert paths and values into nested dictionaries.

```python
from dataknobs_utils.json_utils import path_to_dict

# Build a nested dict from paths
result = {}
path_to_dict(".users[0].name", "Alice", result)
path_to_dict(".users[0].age", 30, result)
path_to_dict(".users[1].name", "Bob", result)

print(result)
# {"users": [{"name": "Alice", "age": 30}, {"name": "Bob"}]}
```
## Advanced Processing

### squash_data()

Process JSON with custom builder functions.

```python
from dataknobs_utils.json_utils import squash_data

results = []

def collect_strings(jq_path, item):
    """Collect only string values."""
    if isinstance(item, str):
        results.append((jq_path, item))

# Process with filtering
squash_data(
    builder_fn=collect_strings,
    json_data="mixed_data.json",
    prune_at=["metadata"]  # Skip metadata branches
)

print(results)  # List of (path, string_value) tuples
```
### Filtering and Pruning

```python
from dataknobs_utils.json_utils import squash_data

# Prune specific paths during processing
prune_config = [
    "metadata",   # Skip any "metadata" keys
    ("logs", 2),  # Skip "logs" at depth 2
    3             # Skip anything at depth 3
]

def process_filtered(jq_path, item):
    print(f"Processing: {jq_path} = {item}")

squash_data(
    builder_fn=process_filtered,
    json_data="data.json",
    prune_at=prune_config
)
```
## Streaming Patterns

### Large File Processing

```python
from collections import defaultdict

from dataknobs_utils.json_utils import stream_json_data

# Aggregate data while streaming
stats = defaultdict(int)

def count_types(item, path):
    """Count value types while streaming."""
    item_type = type(item).__name__
    stats[item_type] += 1

# Process a large file without loading it into memory
stream_json_data("very_large_file.json", count_types)
print(f"Type distribution: {dict(stats)}")
```
### Selective Processing

```python
from dataknobs_utils.json_utils import build_jq_path, stream_json_data

# Only process specific paths
target_paths = {".users[].email", ".users[].profile.settings"}
collected = []

def selective_visitor(item, path):
    jq_path = build_jq_path(path, keep_list_idxs=False)
    if jq_path in target_paths:
        collected.append((jq_path, item))

stream_json_data("users.json", selective_visitor)
```
## Integration Examples

### With RecordStore

```python
from dataknobs_structures import RecordStore
from dataknobs_utils.json_utils import stream_json_data

# Stream JSON into a RecordStore
store = RecordStore("extracted_records.tsv")

def extract_records(item, path):
    """Extract user records from the JSON stream."""
    if isinstance(item, dict) and "user_id" in item:
        store.add_rec(item)

stream_json_data("users.json", extract_records)
store.save()
```
### With Pandas

```python
import pandas as pd

from dataknobs_utils.json_utils import JsonSchemaBuilder

# Analyze a JSON schema with pandas
builder = JsonSchemaBuilder("data.json", keep_unique_values=True)
schema_df = builder.schema.df

# Analyze value types
type_distribution = schema_df.groupby("value_type")["value_count"].sum()
print(type_distribution)

# Find paths with high cardinality
high_cardinality = schema_df[schema_df["unique_count"] > 100]
print(high_cardinality)
```
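The same DataFrame can flag paths whose values mix types, which often points to dirty data; a sketch using only standard pandas on the columns shown above:

```python
# Count distinct value types per path; more than one suggests mixed data
types_per_path = schema_df.groupby("jq_path")["value_type"].nunique()
print(types_per_path[types_per_path > 1])
```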
## Performance Tips

### Memory Management

```python
from dataknobs_utils.json_utils import stream_json_data

# For large files, use streaming instead of loading.
# DON'T do this for large files:
#     with open("huge_file.json") as f:
#         data = json.load(f)

# DO this instead:
def process_large_file():
    count = 0

    def counter(item, path):
        nonlocal count
        count += 1

    stream_json_data("huge_file.json", counter)
    return count
```
### Efficient Path Extraction

```python
from dataknobs_utils.json_utils import get_value

# "data" is any loaded JSON object (e.g. from json.load)
# Batch path extractions in one loop rather than scattering
# individual get_value calls through the code
paths_to_extract = [".user.name", ".user.email", ".user.age"]

extracted = {}
for path in paths_to_extract:
    extracted[path] = get_value(data, path)
```
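When the target fields share a parent array, the wildcard syntax shown under `get_value()` can replace per-element calls with a single extraction:

```python
from dataknobs_utils.json_utils import get_value

data = {"users": [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]}

# One wildcard extraction instead of one call per element
all_ages = get_value(data, "users[*].age")
print(all_ages)  # [30, 25]
```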
## Error Handling

```python
from dataknobs_utils.json_utils import get_value, stream_json_data

# Handle missing paths gracefully
def safe_extract(data, path):
    try:
        return get_value(data, path)
    except Exception as e:
        print(f"Failed to extract {path}: {e}")
        return None

# Handle streaming errors
def safe_stream_processor(json_file):
    def error_handler(item, path):
        try:
            process_item(item, path)  # Per-item logic (defined earlier)
        except Exception as e:
            print(f"Error processing {path}: {e}")

    try:
        stream_json_data(json_file, error_handler, timeout=30)
    except Exception as e:
        print(f"Stream processing failed: {e}")
```
## Configuration

### Timeouts and Limits

```python
from dataknobs_utils.json_utils import JsonSchemaBuilder

# Configure processing limits
builder = JsonSchemaBuilder(
    json_data="data.json",
    keep_unique_values=1000,  # Limit unique value tracking
    timeout=60,               # 60-second timeout for URLs
    values_limit=500          # Stop after 500 unique values per path
)
```
### URL Processing

```python
from dataknobs_utils.json_utils import stream_json_data

# Stream from URLs with a custom timeout
def process_api_data():
    def api_processor(item, path):
        if isinstance(item, dict) and "id" in item:
            print(f"Processing record: {item['id']}")

    stream_json_data(
        "https://api.example.com/large-dataset.json",
        api_processor,
        timeout=120  # 2-minute timeout
    )
```
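Transient network failures are common with remote sources; a simple retry wrapper in plain Python (the retry and backoff policy here is an illustrative assumption, not a library feature):

```python
import time

from dataknobs_utils.json_utils import stream_json_data

def stream_with_retries(url, visitor, attempts=3, backoff=5):
    """Retry streaming a URL, sleeping between attempts."""
    for attempt in range(1, attempts + 1):
        try:
            stream_json_data(url, visitor, timeout=120)
            return
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt < attempts:
                time.sleep(backoff * attempt)  # Linear backoff
    raise RuntimeError(f"Giving up on {url} after {attempts} attempts")
```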
## Best Practices

- **Use Streaming**: For large JSON files, always use the streaming APIs
- **Path Validation**: Validate paths before extraction in production code (see the sketch below)
- **Error Handling**: Always handle missing paths and network errors
- **Memory Monitoring**: Monitor memory usage when processing large datasets
- **Timeout Configuration**: Set appropriate timeouts for URL-based processing
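For path validation, a light-weight check against the dot-and-bracket grammar used throughout this page might look like the following; the regex is an illustrative assumption, not a library API:

```python
import re

# Matches segments like: name, name[0], name[*], name[?]
_SEGMENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\[(\d+|\*|\?)\])?$")

def is_valid_path(path):
    """Check that a path follows the dot notation used on this page."""
    segments = path.lstrip(".").split(".")
    return all(_SEGMENT.match(seg) for seg in segments)

print(is_valid_path("users[0].name"))  # True
print(is_valid_path("users[x].name"))  # False
```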
## See Also

- File Utils - File system utilities
- Utils Overview - Complete utils package documentation
- Document Processing Examples - Document handling with JSON