Pandas Integration¶
The DataKnobs data package provides seamless integration with pandas DataFrames, enabling efficient data analysis, transformation, and bulk operations.
Overview¶
The pandas integration enables the following (a quick-start sketch follows the list):
- Bidirectional conversion: Convert between Records and DataFrames
- Type preservation: Maintain field types during conversion
- Metadata handling: Preserve record metadata and IDs
- Bulk operations: Efficient batch insert/update from DataFrames
- Query integration: Return query results as DataFrames
- Analysis workflows: Leverage pandas for data analysis
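As a quick start, the sketch below ties these capabilities together. It is illustrative only: it assumes an existing database instance, the Query and Filter classes from the query system, and the PandasDatabase and BatchOperations helpers documented later on this page; the column names (status, category, price, id) are placeholders.
from dataknobs_data.pandas import BatchOperations, PandasDatabase

# Wrap an existing database with pandas-aware helpers
pandas_db = PandasDatabase(database)

# Pull query results straight into a DataFrame and analyze them with pandas
df = pandas_db.search_dataframe(Query(filters=[Filter("status", "=", "active")]))
summary = df.groupby("category")["price"].mean()

# Apply a transformation and write the rows back in bulk
df["price"] = df["price"].round(2)
BatchOperations(database).bulk_upsert_dataframe(df, id_column="id")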
Core Components¶
DataFrameConverter¶
The DataFrameConverter class handles conversion between Records and DataFrames.
from dataknobs_data.pandas import DataFrameConverter
import pandas as pd
# Create converter
converter = DataFrameConverter()
# Convert records to DataFrame
records = database.search(Query())
df = converter.records_to_dataframe(records)
# Convert DataFrame back to records
new_records = converter.dataframe_to_records(df)
Conversion Options¶
Control conversion behavior with ConversionOptions:
from dataknobs_data.pandas import ConversionOptions, MetadataStrategy
options = ConversionOptions(
    preserve_types=True,              # Maintain field types
    include_metadata=True,            # Include metadata columns
    metadata_strategy=MetadataStrategy.SEPARATE,  # How to handle metadata
    flatten_nested=True,              # Flatten nested structures
    parse_json=True,                  # Parse JSON fields
    datetime_format="%Y-%m-%d",       # Date format
    null_handling="preserve"          # How to handle nulls
)
df = converter.records_to_dataframe(records, options=options)
Type Mapping¶
Field Type to Pandas dtype¶
The converter automatically maps DataKnobs field types to appropriate pandas dtypes:
from dataknobs_data.fields import FieldType
from datetime import datetime
import pandas as pd
import numpy as np

# Type mapping
TYPE_MAPPING = {
    FieldType.STRING: "string",            # pd.StringDtype()
    FieldType.INTEGER: "Int64",            # pd.Int64Dtype() - nullable
    FieldType.FLOAT: "Float64",            # pd.Float64Dtype() - nullable
    FieldType.BOOLEAN: "boolean",          # pd.BooleanDtype() - nullable
    FieldType.DATETIME: "datetime64[ns]",  # numpy-backed datetime
    FieldType.JSON: "object",              # Python objects
    FieldType.BINARY: "object",            # Bytes objects
}

# Example conversion with type preservation
records = [
    Record(fields={
        "id": Field("id", FieldType.STRING, "user_001"),
        "age": Field("age", FieldType.INTEGER, 25),
        "score": Field("score", FieldType.FLOAT, 98.5),
        "active": Field("active", FieldType.BOOLEAN, True),
        "created": Field("created", FieldType.DATETIME, datetime.now()),
    })
]
df = converter.records_to_dataframe(records)
print(df.dtypes)
# id string
# age Int64
# score Float64
# active boolean
# created datetime64[ns]
Custom Type Converters¶
Define custom type conversion logic:
import json

from dataknobs_data.pandas import TypeConverter

class CustomTypeConverter(TypeConverter):
    """Custom type conversion logic"""

    def to_pandas_value(self, field):
        """Convert field value to pandas-compatible value"""
        if field.type == FieldType.JSON:
            # Convert JSON to string for DataFrame
            return json.dumps(field.value) if field.value else None
        elif field.type == "custom_type":
            # Handle custom type
            return str(field.value)
        return super().to_pandas_value(field)

    def from_pandas_value(self, value, field_type):
        """Convert pandas value back to field value"""
        if field_type == FieldType.JSON:
            # Parse JSON string
            return json.loads(value) if value else None
        elif field_type == "custom_type":
            # Handle custom type
            return CustomType(value)
        return super().from_pandas_value(value, field_type)

# Use custom converter
converter = DataFrameConverter(type_converter=CustomTypeConverter())
Metadata Preservation¶
Metadata Strategies¶
Different strategies for handling record metadata:
from dataknobs_data.pandas import MetadataStrategy
# Strategy 1: Include metadata as columns
options = ConversionOptions(
    metadata_strategy=MetadataStrategy.COLUMNS
)
df = converter.records_to_dataframe(records, options=options)
# DataFrame includes: _id, _metadata_created, _metadata_updated, etc.
# Strategy 2: Separate metadata DataFrame
options = ConversionOptions(
    metadata_strategy=MetadataStrategy.SEPARATE
)
df, metadata_df = converter.records_to_dataframe(records, options=options)
# df: Contains only field data
# metadata_df: Contains record IDs and metadata
# Strategy 3: Ignore metadata
options = ConversionOptions(
    metadata_strategy=MetadataStrategy.IGNORE
)
df = converter.records_to_dataframe(records, options=options)
# DataFrame contains only field values
ID Preservation¶
Preserve record IDs during conversion:
# Convert with ID preservation
df = converter.records_to_dataframe(records, preserve_ids=True)
print(df.index) # Record IDs as index
# Convert back preserving IDs
records = converter.dataframe_to_records(df, use_index_as_id=True)
for record in records:
print(record.id) # Original IDs preserved
Batch Operations¶
Bulk Insert from DataFrame¶
Efficiently insert DataFrame data into the database:
from dataknobs_data.pandas import BatchOperations
# Create batch operations handler
batch_ops = BatchOperations(database)
# Bulk insert from DataFrame
df = pd.read_csv("large_dataset.csv")
result = batch_ops.bulk_insert_dataframe(
    df,
    batch_size=1000,
    parallel=True,
    validate=True  # Validate against schema
)

print(f"Inserted: {result.successful}")
print(f"Failed: {result.failed}")

if result.errors:
    print("Errors:", result.errors)
Bulk Update¶
Update existing records from a DataFrame:
# Update records matching DataFrame index
df_updates = pd.DataFrame({
    "status": ["active", "inactive", "active"],
    "last_login": [datetime.now()] * 3
}, index=["id1", "id2", "id3"])  # Record IDs as index

result = batch_ops.bulk_update_dataframe(
    df_updates,
    id_column=None,          # Use index as ID
    merge_strategy="update"  # or "replace"
)
Upsert Operations¶
Insert or update based on existence:
# Upsert: Update if exists, insert if new
result = batch_ops.bulk_upsert_dataframe(
    df,
    id_column="user_id",  # Column to use as record ID
    batch_size=500
)
print(f"Inserted: {result.inserted}")
print(f"Updated: {result.updated}")
Query Integration¶
Query Results as DataFrame¶
Get query results directly as a DataFrame:
from dataknobs_data.pandas import PandasDatabase
# Wrap database with pandas support
pandas_db = PandasDatabase(database)
# Query returning DataFrame
df = pandas_db.search_dataframe(
    Query(
        filters=[Filter("status", "=", "active")],
        sort=[Sort("created_at", "desc")],
        limit=1000
    )
)

# Use pandas for analysis
summary = df.groupby("category").agg({
    "price": ["mean", "std", "count"],
    "quantity": "sum"
})
Aggregation Queries¶
Perform aggregations with pandas:
# Get all data as DataFrame
df = pandas_db.all_as_dataframe()
# Complex aggregation
result = df.groupby(["category", "status"]).agg({
    "revenue": "sum",
    "quantity": "sum",
    "customer_id": "nunique",
    "created_at": ["min", "max"]
}).round(2)

# Time-based aggregation
df["created_at"] = pd.to_datetime(df["created_at"])
daily_stats = df.set_index("created_at").resample("D").agg({
    "revenue": "sum",
    "orders": "count"
})
Data Analysis Workflows¶
ETL Pipeline¶
Extract, Transform, Load workflow:
class DataPipeline:
    """ETL pipeline using pandas"""

    def __init__(self, source_db, target_db):
        self.source_db = PandasDatabase(source_db)
        self.target_db = PandasDatabase(target_db)
        self.converter = DataFrameConverter()

    def run(self, query=None):
        # Extract
        df = self.source_db.search_dataframe(query or Query())
        # Transform
        df = self.transform(df)
        # Load
        result = self.target_db.bulk_insert_dataframe(df)
        return result

    def transform(self, df):
        """Apply transformations"""
        # Clean data
        df = df.dropna(subset=["required_field"])
        df["email"] = df["email"].str.lower().str.strip()

        # Add computed columns
        df["full_name"] = df["first_name"] + " " + df["last_name"]
        df["age_group"] = pd.cut(
            df["age"],
            bins=[0, 18, 30, 50, 100],
            labels=["youth", "young", "middle", "senior"]
        )

        # Aggregate if needed
        if "daily_summary" in self.target_db.name:
            df = df.groupby(["date", "category"]).agg({
                "sales": "sum",
                "customers": "nunique"
            }).reset_index()

        return df

# Run pipeline
pipeline = DataPipeline(source_db, target_db)
result = pipeline.run()
Data Cleaning¶
Clean and validate data using pandas:
def clean_dataset(database):
    """Clean and validate dataset"""
    # Load data
    pandas_db = PandasDatabase(database)
    df = pandas_db.all_as_dataframe()

    # Remove duplicates
    df = df.drop_duplicates(subset=["email"], keep="first")

    # Fix data types
    df["age"] = pd.to_numeric(df["age"], errors="coerce")
    df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")

    # Handle missing values
    df["status"] = df["status"].fillna("unknown")
    df["score"] = df["score"].fillna(df["score"].median())

    # Validate ranges
    df.loc[df["age"] < 0, "age"] = None
    df.loc[df["age"] > 150, "age"] = None

    # Standardize text
    df["category"] = df["category"].str.lower().str.strip()
    df["category"] = df["category"].replace({
        "electronic": "electronics",
        "clothes": "clothing"
    })

    # Save cleaned data
    batch_ops = BatchOperations(database)
    batch_ops.bulk_upsert_dataframe(df, id_column="id")

    return df
Statistical Analysis¶
Perform statistical analysis on data:
def analyze_dataset(database):
    """Statistical analysis of dataset"""
    pandas_db = PandasDatabase(database)
    df = pandas_db.all_as_dataframe()

    # Basic statistics
    print("Dataset Overview:")
    print(df.describe())

    # Correlation analysis
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    correlations = df[numeric_cols].corr()

    # Find strong correlations
    strong_corr = correlations[abs(correlations) > 0.7]
    print("\nStrong Correlations:")
    print(strong_corr)

    # Distribution analysis
    for col in numeric_cols:
        skewness = df[col].skew()
        kurtosis = df[col].kurtosis()
        print(f"\n{col}:")
        print(f"  Skewness: {skewness:.2f}")
        print(f"  Kurtosis: {kurtosis:.2f}")

    # Outlier detection (1.5 * IQR rule)
    Q1 = df[numeric_cols].quantile(0.25)
    Q3 = df[numeric_cols].quantile(0.75)
    IQR = Q3 - Q1
    outliers = ((df[numeric_cols] < (Q1 - 1.5 * IQR)) |
                (df[numeric_cols] > (Q3 + 1.5 * IQR)))
    print("\nOutliers per column:")
    print(outliers.sum())

    return {
        "summary": df.describe(),
        "correlations": correlations,
        "outliers": outliers,
    }
Performance Optimization¶
Chunked Processing¶
Process large datasets in chunks:
import gc

def process_large_dataset(database, chunk_size=10000):
    """Process large dataset in chunks"""
    converter = DataFrameConverter()
    batch_ops = BatchOperations(database)

    total = database.count()
    processed = 0

    while processed < total:
        # Get chunk
        query = Query(offset=processed, limit=chunk_size)
        records = database.search(query)

        # Convert to DataFrame
        df = converter.records_to_dataframe(records)

        # Process chunk (process_chunk is a user-supplied transformation)
        df = process_chunk(df)

        # Save results
        batch_ops.bulk_upsert_dataframe(df)
        processed += len(records)
        print(f"Processed {processed}/{total} records")

        # Clear memory
        del df
        gc.collect()
Parallel Processing¶
Use parallel processing for better performance:
from concurrent.futures import ProcessPoolExecutor

def process_partition(database, transform_fn, offset, limit):
    """Process a single partition (defined at module level so it can be pickled)"""
    converter = DataFrameConverter()
    query = Query(offset=offset, limit=limit)
    records = database.search(query)
    df = converter.records_to_dataframe(records)
    return transform_fn(df)

def parallel_transform(database, transform_fn, num_workers=4):
    """Transform data in parallel.

    The database handle and transform_fn must be picklable so they can be
    sent to the worker processes.
    """
    # Split data into partitions
    total = database.count()
    partition_size = total // num_workers

    # Process partitions in parallel
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = []
        for i in range(num_workers):
            offset = i * partition_size
            limit = partition_size if i < num_workers - 1 else total - offset
            futures.append(
                executor.submit(process_partition, database, transform_fn, offset, limit)
            )

        # Combine results
        results = pd.concat([f.result() for f in futures])

    return results
Memory Management¶
Optimize memory usage for large DataFrames:
def optimize_dataframe_memory(df):
    """Reduce DataFrame memory usage"""
    initial_memory = df.memory_usage(deep=True).sum() / 1024**2

    for col in df.columns:
        col_type = df[col].dtype

        if col_type != 'object':
            if col_type == 'float64':
                df[col] = pd.to_numeric(df[col], downcast='float')
            elif col_type == 'int64':
                df[col] = pd.to_numeric(df[col], downcast='integer')
        else:
            # Convert string columns to category if appropriate
            num_unique = df[col].nunique()
            num_total = len(df[col])
            if num_unique / num_total < 0.5:  # Less than 50% unique
                df[col] = df[col].astype('category')

    final_memory = df.memory_usage(deep=True).sum() / 1024**2
    reduction = (initial_memory - final_memory) / initial_memory * 100

    print(f"Memory usage reduced by {reduction:.1f}%")
    print(f"From {initial_memory:.1f} MB to {final_memory:.1f} MB")

    return df
Integration Examples¶
Data Export¶
Export data to various formats:
def export_data(database, format="csv", query=None):
    """Export data in various formats"""
    # Get data as DataFrame
    pandas_db = PandasDatabase(database)
    df = pandas_db.search_dataframe(query or Query())

    if format == "csv":
        df.to_csv("export.csv", index=False)
    elif format == "excel":
        with pd.ExcelWriter("export.xlsx") as writer:
            df.to_excel(writer, sheet_name="Data", index=False)
            # Add summary sheet
            summary = df.describe()
            summary.to_excel(writer, sheet_name="Summary")
    elif format == "parquet":
        df.to_parquet("export.parquet", compression="snappy")
    elif format == "json":
        df.to_json("export.json", orient="records", indent=2)
    elif format == "html":
        df.to_html("export.html", index=False)

    print(f"Exported {len(df)} records to {format}")
Data Import¶
Import data from various sources:
def import_data(database, file_path, format="auto"):
    """Import data from file"""
    # Detect format
    if format == "auto":
        ext = file_path.split(".")[-1].lower()
        format = ext

    # Read file into DataFrame
    if format == "csv":
        df = pd.read_csv(file_path)
    elif format in ["xlsx", "xls"]:
        df = pd.read_excel(file_path)
    elif format == "parquet":
        df = pd.read_parquet(file_path)
    elif format == "json":
        df = pd.read_json(file_path)
    else:
        raise ValueError(f"Unsupported format: {format}")

    # Import into database
    batch_ops = BatchOperations(database)
    result = batch_ops.bulk_insert_dataframe(df, validate=True)

    print(f"Imported {result.successful} records")
    if result.failed:
        print(f"Failed: {result.failed}")

    return result
Real-time Analytics¶
Combine database queries with pandas analytics:
import time
from datetime import datetime

class RealTimeAnalytics:
    """Real-time analytics using pandas"""

    def __init__(self, database):
        self.db = PandasDatabase(database)
        self.cache = {}
        self.cache_ttl = 60  # seconds

    def get_metrics(self, time_range="1h"):
        """Get real-time metrics"""
        cache_key = f"metrics_{time_range}"
        if cache_key in self.cache:
            cached_time, cached_data = self.cache[cache_key]
            if time.time() - cached_time < self.cache_ttl:
                return cached_data

        # Query recent data
        cutoff = datetime.now() - pd.Timedelta(time_range)
        query = Query(filters=[
            Filter("created_at", ">=", cutoff)
        ])
        df = self.db.search_dataframe(query)

        # Calculate metrics
        metrics = {
            "total_records": len(df),
            "unique_users": df["user_id"].nunique(),
            "average_value": df["value"].mean(),
            "total_revenue": df["revenue"].sum(),
            "top_categories": df["category"].value_counts().head(5).to_dict(),
            "hourly_trend": df.set_index("created_at").resample("H")["value"].sum().to_dict()
        }

        # Cache results
        self.cache[cache_key] = (time.time(), metrics)
        return metrics
Best Practices¶
- Choose appropriate strategies: Select metadata and type preservation strategies based on use case
- Optimize memory usage: Use appropriate dtypes and consider chunking for large datasets
- Validate data: Validate DataFrames before inserting into database
- Handle nulls explicitly: Define clear null handling strategy
- Use batch operations: Leverage batch operations for better performance
- Consider parallelization: Use parallel processing for large transformations
- Cache when appropriate: Cache frequently accessed DataFrames
- Monitor performance: Track conversion and operation times
- Test round-trip conversion: Ensure data integrity in conversions (see the sketch after this list)
- Document transformations: Keep clear records of data transformations
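For the round-trip practice above, a minimal integrity check can be added to a test suite. The sketch below assumes the DataFrameConverter API shown earlier and that Record objects expose an id and a fields mapping of Field objects, as in the examples on this page:
def check_round_trip(records):
    """Sketch: verify that records survive a DataFrame round trip unchanged."""
    converter = DataFrameConverter()
    df = converter.records_to_dataframe(records, preserve_ids=True)
    restored = converter.dataframe_to_records(df, use_index_as_id=True)

    assert len(restored) == len(records), "record count changed"
    for original, roundtripped in zip(records, restored):
        assert roundtripped.id == original.id, f"ID mismatch: {original.id}"
        for name, field in original.fields.items():
            # Exact equality assumes lossless type mapping (nullable dtypes, datetimes)
            assert roundtripped.fields[name].value == field.value, f"value drift in {name}"
Run such a check against a representative sample that includes nulls, datetimes, and JSON fields before relying on bulk round trips in production.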
See Also¶
- Database Backends - Backend implementations and batch operations
- Migration Utilities - Data migration tools
- Query System - Query construction and execution