Data Validation Examples¶
This document provides comprehensive examples of data validation scenarios using the DataKnobs validation system, from simple field validation to complex business rules.
Example 1: User Registration System¶
Scenario¶
Complete user registration with multi-level validation, duplicate checking, and security requirements.
from dataknobs_data.validation import (
Schema, Required, Length, Pattern, Enum, Unique, Custom, Range
)
from dataknobs_data import Record, MemoryDatabase, Query
import re
import hashlib
from datetime import datetime, date
from typing import List, Dict, Any
class UserRegistrationSystem:
"""Complete user registration with comprehensive validation"""
def __init__(self):
self.user_db = MemoryDatabase()
self.setup_schemas()
self.setup_validators()
def setup_schemas(self):
"""Define validation schemas for different user types"""
# Base user schema
self.base_user_schema = (Schema("BaseUser", strict=False)
.field("username", "STRING", required=True,
constraints=[
Length(min=3, max=30),
Pattern(r"^[a-zA-Z0-9_-]+$",
"Username can only contain letters, numbers, hyphens, and underscores"),
Unique("username"),
Custom(self.check_reserved_usernames, "Username is reserved")
])
.field("email", "STRING", required=True,
constraints=[
Pattern(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
"Invalid email format"),
Unique("email"),
Custom(self.check_disposable_email, "Disposable email addresses not allowed")
])
.field("password", "STRING", required=True,
constraints=[
Length(min=12, max=128),
Custom(self.validate_password_strength, "Password does not meet security requirements")
])
.field("age", "INTEGER", required=True,
constraints=[
Range(min=13, max=150, message="Age must be between 13 and 150")
])
.field("country", "STRING", required=True,
constraints=[
Custom(self.validate_country_code, "Invalid country code")
])
.field("terms_accepted", "BOOLEAN", required=True,
constraints=[
Custom(lambda x: x is True, "Terms and conditions must be accepted")
])
.field("marketing_consent", "BOOLEAN", default=False)
)
# Premium user schema (extends base)
self.premium_user_schema = (Schema("PremiumUser", strict=False)
.copy_from(self.base_user_schema)
.field("payment_method", "DICT", required=True,
constraints=[
Custom(self.validate_payment_method, "Invalid payment method")
])
.field("billing_address", "DICT", required=True,
constraints=[
Custom(self.validate_address, "Invalid billing address")
])
.field("subscription_tier", "STRING", required=True,
constraints=[
Enum(["basic", "professional", "enterprise"])
])
)
# Business user schema
self.business_user_schema = (Schema("BusinessUser", strict=False)
.copy_from(self.base_user_schema)
.field("company_name", "STRING", required=True,
constraints=[
Length(min=2, max=100)
])
.field("vat_number", "STRING", required=False,
constraints=[
Custom(self.validate_vat_number, "Invalid VAT number")
])
.field("employees", "INTEGER", required=True,
constraints=[
Range(min=1, max=1000000)
])
)
def setup_validators(self):
"""Setup custom validators"""
# Reserved usernames
self.reserved_usernames = {
"admin", "root", "administrator", "moderator", "support",
"help", "api", "www", "mail", "email", "blog", "news",
"about", "contact", "privacy", "terms", "legal"
}
# Disposable email domains
self.disposable_domains = {
"tempmail.com", "throwaway.email", "guerrillamail.com",
"mailinator.com", "10minutemail.com", "trashmail.com"
}
# Valid country codes (ISO 3166-1 alpha-2)
self.valid_countries = {
"US", "GB", "CA", "AU", "DE", "FR", "JP", "CN", "IN", "BR",
# ... add all country codes
}
def check_reserved_usernames(self, username: str) -> bool:
"""Check if username is not reserved"""
return username.lower() not in self.reserved_usernames
def check_disposable_email(self, email: str) -> bool:
"""Check if email is not from disposable service"""
domain = email.split("@")[1] if "@" in email else ""
return domain.lower() not in self.disposable_domains
def validate_password_strength(self, password: str) -> bool:
"""Validate password meets security requirements"""
if len(password) < 12:
return False
# Check complexity requirements
has_upper = re.search(r"[A-Z]", password) is not None
has_lower = re.search(r"[a-z]", password) is not None
has_digit = re.search(r"\d", password) is not None
has_special = re.search(r"[!@#$%^&*(),.?\":{}|<>]", password) is not None
# Require at least 3 out of 4 character types
complexity_score = sum([has_upper, has_lower, has_digit, has_special])
if complexity_score < 3:
return False
# Check for common patterns
common_patterns = [
r"12345", r"qwerty", r"password", r"admin",
r"([a-z])\1{2,}", # Repeated characters
r"(012|123|234|345|456|567|678|789)", # Sequential numbers
r"(abc|bcd|cde|def|efg|fgh)", # Sequential letters
]
for pattern in common_patterns:
if re.search(pattern, password.lower()):
return False
return True
def validate_country_code(self, country: str) -> bool:
"""Validate ISO country code"""
return country.upper() in self.valid_countries
def validate_payment_method(self, payment: Dict[str, Any]) -> bool:
"""Validate payment method structure"""
if not isinstance(payment, dict):
return False
payment_type = payment.get("type")
if payment_type == "credit_card":
return all([
self.validate_credit_card(payment.get("card_number", "")),
payment.get("expiry_month") in range(1, 13),
payment.get("expiry_year") >= datetime.now().year,
len(str(payment.get("cvv", ""))) in [3, 4]
])
elif payment_type == "paypal":
return self.validate_email(payment.get("paypal_email", ""))
elif payment_type == "bank_transfer":
return all([
payment.get("account_number"),
payment.get("routing_number"),
payment.get("account_holder_name")
])
return False
def validate_credit_card(self, card_number: str) -> bool:
"""Validate credit card using Luhn algorithm"""
card_number = re.sub(r'\D', '', str(card_number))
if not card_number or len(card_number) < 13 or len(card_number) > 19:
return False
# Luhn algorithm
total = 0
reverse = card_number[::-1]
for i, digit in enumerate(reverse):
n = int(digit)
if i % 2 == 1:
n *= 2
if n > 9:
n -= 9
total += n
return total % 10 == 0
def validate_email(self, email: str) -> bool:
"""Extended email validation"""
pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
return re.match(pattern, email) is not None
def validate_address(self, address: Dict[str, Any]) -> bool:
"""Validate address structure"""
required_fields = ["street", "city", "country", "postal_code"]
if not all(field in address for field in required_fields):
return False
# Validate postal code format based on country
country = address.get("country")
postal_code = address.get("postal_code")
postal_patterns = {
"US": r"^\d{5}(-\d{4})?$", # ZIP code
"GB": r"^[A-Z]{1,2}\d{1,2}[A-Z]?\s?\d[A-Z]{2}$", # UK postcode
"CA": r"^[A-Z]\d[A-Z]\s?\d[A-Z]\d$", # Canadian postal code
# Add more patterns
}
if country in postal_patterns:
if not re.match(postal_patterns[country], postal_code):
return False
return True
def validate_vat_number(self, vat_number: str) -> bool:
"""Validate EU VAT number format"""
# Simplified VAT validation (real implementation would check with VIES)
vat_patterns = {
"GB": r"^GB\d{9}$",
"DE": r"^DE\d{9}$",
"FR": r"^FR[A-Z0-9]{11}$",
# Add more EU country patterns
}
for country, pattern in vat_patterns.items():
if re.match(pattern, vat_number):
return True
return False
def register_user(self, user_data: Dict[str, Any], user_type: str = "base") -> Dict[str, Any]:
"""Register a new user with validation"""
# Select appropriate schema
if user_type == "premium":
schema = self.premium_user_schema
elif user_type == "business":
schema = self.business_user_schema
else:
schema = self.base_user_schema
# Create record
user_record = Record(data=user_data)
# Validate
result = schema.validate(user_record, coerce=True)
if not result.valid:
return {
"success": False,
"errors": result.errors,
"warnings": result.warnings
}
# Additional security checks
security_check = self.perform_security_checks(result.value.data)
if not security_check["passed"]:
return {
"success": False,
"errors": security_check["errors"]
}
# Hash password before storage
validated_data = result.value.data.copy()
validated_data["password"] = self.hash_password(validated_data["password"])
# Add metadata
validated_data["created_at"] = datetime.now().isoformat()
validated_data["status"] = "active"
validated_data["verification_token"] = self.generate_verification_token(validated_data["email"])
# Save to database
self.user_db.insert(Record(data=validated_data))
return {
"success": True,
"user_id": validated_data.get("username"),
"verification_required": True,
"message": "Registration successful. Please check your email for verification."
}
def perform_security_checks(self, user_data: Dict[str, Any]) -> Dict[str, Any]:
"""Additional security validation"""
errors = []
# Check password not similar to username/email
password = user_data.get("password", "").lower()
username = user_data.get("username", "").lower()
email = user_data.get("email", "").split("@")[0].lower()
if username in password or email in password:
errors.append("Password too similar to username or email")
# Check for SQL injection attempts
suspicious_patterns = [
r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|ALTER)\b)",
r"(;|--|\||&&)",
r"(<script|javascript:|onerror=)",
]
for field, value in user_data.items():
if isinstance(value, str):
for pattern in suspicious_patterns:
if re.search(pattern, value, re.IGNORECASE):
errors.append(f"Suspicious content in {field}")
break
return {
"passed": len(errors) == 0,
"errors": errors
}
def hash_password(self, password: str) -> str:
"""Hash password using secure algorithm"""
# In production, use bcrypt or argon2
salt = "secure_salt_here" # Should be random per user
return hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000).hex()
def generate_verification_token(self, email: str) -> str:
"""Generate email verification token"""
timestamp = datetime.now().timestamp()
token_data = f"{email}:{timestamp}"
return hashlib.sha256(token_data.encode()).hexdigest()
# Usage example
def test_registration():
system = UserRegistrationSystem()
# Test valid registration
valid_user = {
"username": "john_doe_123",
"email": "john@example.com",
"password": "Secure#Pass123!@#",
"age": 25,
"country": "US",
"terms_accepted": True,
"marketing_consent": False
}
result = system.register_user(valid_user, "base")
print(f"Registration result: {result}")
# Test invalid registration
invalid_user = {
"username": "admin", # Reserved
"email": "test@tempmail.com", # Disposable
"password": "weak", # Too weak
"age": 10, # Too young
"country": "XX", # Invalid country
"terms_accepted": False # Not accepted
}
result = system.register_user(invalid_user, "base")
print(f"Invalid registration errors: {result.get('errors')}")
if __name__ == "__main__":
test_registration()
Example 2: Financial Transaction Validation¶
Scenario¶
Validate financial transactions with compliance checks, fraud detection, and regulatory requirements.
from decimal import Decimal
from dataknobs_data.validation import Schema, Range, Custom, Required, Pattern
from dataknobs_data import Record
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional
class FinancialTransactionValidator:
"""Comprehensive financial transaction validation"""
def __init__(self):
self.setup_schemas()
self.transaction_history = [] # For pattern detection
self.blacklisted_accounts = set()
self.suspicious_patterns = []
def setup_schemas(self):
"""Define validation schemas for different transaction types"""
# Base transaction schema
self.base_transaction_schema = (Schema("BaseTransaction", strict=True)
.field("transaction_id", "STRING", required=True,
constraints=[
Pattern(r"^TXN-[0-9]{4}-[0-9]{6}-[A-Z0-9]{6}$",
"Invalid transaction ID format")
])
.field("amount", "FLOAT", required=True,
constraints=[
Range(min=0.01, max=1000000,
message="Amount must be between $0.01 and $1,000,000"),
Custom(self.validate_decimal_places, "Amount must have at most 2 decimal places")
])
.field("currency", "STRING", required=True,
constraints=[
Pattern(r"^[A-Z]{3}$", "Currency must be 3-letter ISO code")
])
.field("timestamp", "STRING", required=True,
constraints=[
Custom(self.validate_timestamp, "Invalid or future timestamp")
])
.field("account_from", "STRING", required=True,
constraints=[
Custom(self.validate_account_number, "Invalid account number"),
Custom(self.check_blacklist, "Account is blacklisted")
])
.field("account_to", "STRING", required=True,
constraints=[
Custom(self.validate_account_number, "Invalid account number"),
Custom(self.check_blacklist, "Account is blacklisted")
])
)
# Wire transfer schema (with additional requirements)
self.wire_transfer_schema = (Schema("WireTransfer", strict=True)
.copy_from(self.base_transaction_schema)
.field("swift_code", "STRING", required=True,
constraints=[
Pattern(r"^[A-Z]{6}[A-Z0-9]{2}([A-Z0-9]{3})?$",
"Invalid SWIFT/BIC code")
])
.field("reference", "STRING", required=True,
constraints=[
Length(min=1, max=140),
Custom(self.validate_reference, "Invalid reference format")
])
.field("beneficiary_name", "STRING", required=True,
constraints=[
Length(min=2, max=70),
Pattern(r"^[a-zA-Z\s\-'\.]+$", "Invalid beneficiary name")
])
.field("beneficiary_address", "DICT", required=True,
constraints=[
Custom(self.validate_wire_address, "Invalid beneficiary address")
])
)
# ACH transfer schema
self.ach_transfer_schema = (Schema("ACHTransfer", strict=True)
.copy_from(self.base_transaction_schema)
.field("routing_number", "STRING", required=True,
constraints=[
Pattern(r"^\d{9}$", "Routing number must be 9 digits"),
Custom(self.validate_routing_number, "Invalid routing number")
])
.field("sec_code", "STRING", required=True,
constraints=[
Enum(["PPD", "CCD", "WEB", "TEL", "ARC", "BOC", "POP"])
])
.field("effective_date", "STRING", required=True,
constraints=[
Custom(self.validate_effective_date, "Invalid effective date")
])
)
def validate_decimal_places(self, amount: float) -> bool:
"""Validate amount has at most 2 decimal places"""
decimal_amount = Decimal(str(amount))
return decimal_amount.as_tuple().exponent >= -2
def validate_timestamp(self, timestamp: str) -> bool:
"""Validate timestamp is valid and not in future"""
try:
dt = datetime.fromisoformat(timestamp)
return dt <= datetime.now()
except:
return False
def validate_account_number(self, account: str) -> bool:
"""Validate account number format"""
# IBAN validation
if account.startswith(("GB", "DE", "FR", "IT", "ES")):
return self.validate_iban(account)
# US account validation
elif re.match(r"^\d{10,17}$", account):
return True
return False
def validate_iban(self, iban: str) -> bool:
"""Validate IBAN using check digits"""
iban = iban.replace(" ", "").upper()
if not re.match(r"^[A-Z]{2}\d{2}[A-Z0-9]+$", iban):
return False
# Move first 4 chars to end
rearranged = iban[4:] + iban[:4]
# Replace letters with numbers (A=10, B=11, ..., Z=35)
numeric = ""
for char in rearranged:
if char.isdigit():
numeric += char
else:
numeric += str(ord(char) - ord('A') + 10)
# Check modulo 97
return int(numeric) % 97 == 1
def check_blacklist(self, account: str) -> bool:
"""Check if account is not blacklisted"""
return account not in self.blacklisted_accounts
def validate_routing_number(self, routing: str) -> bool:
"""Validate US routing number with checksum"""
if not re.match(r"^\d{9}$", routing):
return False
# ABA routing number checksum
weights = [3, 7, 1, 3, 7, 1, 3, 7, 1]
total = sum(int(routing[i]) * weights[i] for i in range(9))
return total % 10 == 0
def validate_reference(self, reference: str) -> bool:
"""Validate payment reference"""
# Check for suspicious patterns
suspicious = [
r"(terrorist|terrorism|drugs|laundering)",
r"(^|[^a-z])(iran|north korea|syria)([^a-z]|$)",
]
for pattern in suspicious:
if re.search(pattern, reference.lower()):
return False
return True
def validate_wire_address(self, address: Dict) -> bool:
"""Validate wire transfer beneficiary address"""
required = ["street", "city", "country"]
return all(field in address and address[field] for field in required)
def validate_effective_date(self, date_str: str) -> bool:
"""Validate ACH effective date (business days only)"""
try:
date = datetime.strptime(date_str, "%Y-%m-%d").date()
# Check not weekend
if date.weekday() >= 5: # Saturday = 5, Sunday = 6
return False
# Check not too far in future (60 days max)
if date > (datetime.now().date() + timedelta(days=60)):
return False
return True
except:
return False
def perform_aml_checks(self, transaction: Dict) -> Dict[str, Any]:
"""Anti-Money Laundering checks"""
risk_score = 0
flags = []
amount = Decimal(str(transaction.get("amount", 0)))
# Check for structuring (multiple transactions just under reporting threshold)
if 9000 < amount < 10000:
risk_score += 20
flags.append("Possible structuring")
# Check for round amounts (common in money laundering)
if amount % 1000 == 0 and amount > 5000:
risk_score += 10
flags.append("Round amount transaction")
# Check transaction velocity
velocity_risk = self.check_velocity(transaction)
risk_score += velocity_risk["score"]
if velocity_risk["flag"]:
flags.append(velocity_risk["flag"])
# Check for suspicious patterns
pattern_risk = self.check_patterns(transaction)
risk_score += pattern_risk["score"]
flags.extend(pattern_risk["flags"])
# Determine action based on risk score
if risk_score >= 70:
action = "block"
elif risk_score >= 40:
action = "review"
else:
action = "approve"
return {
"risk_score": risk_score,
"flags": flags,
"action": action,
"requires_sar": risk_score >= 60 # Suspicious Activity Report
}
def check_velocity(self, transaction: Dict) -> Dict:
"""Check transaction velocity for anomalies"""
account = transaction.get("account_from")
amount = Decimal(str(transaction.get("amount", 0)))
# Get recent transactions for this account
recent = [t for t in self.transaction_history[-100:]
if t.get("account_from") == account]
if len(recent) >= 5:
# Check if unusual number of transactions
if len(recent) > 20:
return {"score": 30, "flag": "High transaction velocity"}
# Check if unusual amount
amounts = [Decimal(str(t.get("amount", 0))) for t in recent]
avg_amount = sum(amounts) / len(amounts)
if amount > avg_amount * 5:
return {"score": 25, "flag": "Amount significantly above average"}
return {"score": 0, "flag": None}
def check_patterns(self, transaction: Dict) -> Dict:
"""Check for known suspicious patterns"""
flags = []
score = 0
# Rapid movement pattern (in -> out quickly)
if self.detect_rapid_movement(transaction):
flags.append("Rapid fund movement detected")
score += 35
# Circular transaction pattern
if self.detect_circular_pattern(transaction):
flags.append("Circular transaction pattern")
score += 40
return {"score": score, "flags": flags}
def detect_rapid_movement(self, transaction: Dict) -> bool:
"""Detect rapid in-out fund movement"""
account = transaction.get("account_to")
# Check if this account received funds recently and is now sending
for hist_txn in self.transaction_history[-20:]:
if (hist_txn.get("account_to") == account and
transaction.get("account_from") == account):
# Funds received and sent within short time
return True
return False
def detect_circular_pattern(self, transaction: Dict) -> bool:
"""Detect circular transaction patterns"""
# Simplified check - in production would use graph analysis
account_from = transaction.get("account_from")
account_to = transaction.get("account_to")
# Check if accounts have transacted in reverse recently
for hist_txn in self.transaction_history[-50:]:
if (hist_txn.get("account_from") == account_to and
hist_txn.get("account_to") == account_from):
return True
return False
def validate_transaction(self, transaction_data: Dict,
transaction_type: str = "base") -> Dict[str, Any]:
"""Validate a financial transaction"""
# Select appropriate schema
if transaction_type == "wire":
schema = self.wire_transfer_schema
elif transaction_type == "ach":
schema = self.ach_transfer_schema
else:
schema = self.base_transaction_schema
# Create record
transaction_record = Record(data=transaction_data)
# Schema validation
result = schema.validate(transaction_record, coerce=True)
if not result.valid:
return {
"valid": False,
"errors": result.errors,
"transaction_id": transaction_data.get("transaction_id")
}
# AML checks
aml_result = self.perform_aml_checks(result.value.data)
# Sanctions screening (simplified)
sanctions_result = self.screen_sanctions(result.value.data)
# Add to history for pattern detection
self.transaction_history.append(result.value.data)
# Determine final status
if aml_result["action"] == "block" or sanctions_result["blocked"]:
status = "blocked"
reason = aml_result["flags"] + sanctions_result.get("matches", [])
elif aml_result["action"] == "review":
status = "pending_review"
reason = aml_result["flags"]
else:
status = "approved"
reason = []
return {
"valid": True,
"status": status,
"transaction_id": transaction_data.get("transaction_id"),
"risk_score": aml_result["risk_score"],
"flags": reason,
"requires_sar": aml_result["requires_sar"],
"sanctions_hit": sanctions_result["blocked"]
}
def screen_sanctions(self, transaction: Dict) -> Dict:
"""Screen against sanctions lists"""
# Simplified sanctions screening
sanctioned_countries = {"Iran", "North Korea", "Syria", "Cuba"}
sanctioned_entities = {"Bad Company LLC", "Evil Corp"}
beneficiary = transaction.get("beneficiary_name", "")
# Check country
if "beneficiary_address" in transaction:
country = transaction["beneficiary_address"].get("country", "")
if country in sanctioned_countries:
return {"blocked": True, "matches": [f"Sanctioned country: {country}"]}
# Check entity
if beneficiary in sanctioned_entities:
return {"blocked": True, "matches": [f"Sanctioned entity: {beneficiary}"]}
return {"blocked": False, "matches": []}
# Usage example
def test_financial_validation():
validator = FinancialTransactionValidator()
# Valid wire transfer
valid_wire = {
"transaction_id": "TXN-2024-000001-ABC123",
"amount": 50000.00,
"currency": "USD",
"timestamp": datetime.now().isoformat(),
"account_from": "GB82WEST12345698765432",
"account_to": "DE89370400440532013000",
"swift_code": "DEUTDEFF",
"reference": "Invoice payment Q1-2024",
"beneficiary_name": "Acme Corporation",
"beneficiary_address": {
"street": "123 Business St",
"city": "Frankfurt",
"country": "Germany"
}
}
result = validator.validate_transaction(valid_wire, "wire")
print(f"Wire transfer validation: {result}")
# Suspicious transaction
suspicious = {
"transaction_id": "TXN-2024-000002-XYZ789",
"amount": 9999.99, # Just under reporting threshold
"currency": "USD",
"timestamp": datetime.now().isoformat(),
"account_from": "1234567890",
"account_to": "0987654321",
}
result = validator.validate_transaction(suspicious, "base")
print(f"Suspicious transaction: {result}")
if __name__ == "__main__":
test_financial_validation()
Example 3: Healthcare Data Quality Validation¶
Scenario¶
Validate healthcare records for data quality, HIPAA compliance, and clinical accuracy.
from dataknobs_data.validation import Schema, Custom, Pattern, Range, Required
from dataknobs_data import Record
import re
from datetime import datetime, date
from typing import Dict, List, Optional
class HealthcareDataValidator:
"""Healthcare data validation with HIPAA compliance"""
def __init__(self):
self.setup_schemas()
self.setup_medical_references()
def setup_schemas(self):
"""Define healthcare record schemas"""
# Patient demographics schema
self.patient_schema = (Schema("PatientRecord", strict=False)
.field("patient_id", "STRING", required=True,
constraints=[
Pattern(r"^PAT-\d{10}$", "Invalid patient ID format"),
Custom(self.validate_not_ssn, "Patient ID cannot be SSN")
])
.field("mrn", "STRING", required=True,
constraints=[
Pattern(r"^MRN-\d{8}$", "Invalid MRN format")
])
.field("name", "DICT", required=True,
constraints=[
Custom(self.validate_name_structure, "Invalid name structure")
])
.field("dob", "STRING", required=True,
constraints=[
Custom(self.validate_dob, "Invalid date of birth")
])
.field("gender", "STRING", required=True,
constraints=[
Enum(["M", "F", "O", "U"]) # Male, Female, Other, Unknown
])
.field("contact", "DICT", required=True,
constraints=[
Custom(self.validate_contact_info, "Invalid contact information")
])
)
# Clinical data schema
self.clinical_schema = (Schema("ClinicalRecord", strict=True)
.field("patient_id", "STRING", required=True)
.field("encounter_id", "STRING", required=True,
constraints=[
Pattern(r"^ENC-\d{12}$", "Invalid encounter ID")
])
.field("encounter_date", "STRING", required=True,
constraints=[
Custom(self.validate_encounter_date, "Invalid encounter date")
])
.field("vital_signs", "DICT", required=False,
constraints=[
Custom(self.validate_vital_signs, "Invalid vital signs")
])
.field("diagnoses", "LIST", required=True,
constraints=[
Custom(self.validate_diagnoses, "Invalid diagnosis codes")
])
.field("procedures", "LIST", required=False,
constraints=[
Custom(self.validate_procedures, "Invalid procedure codes")
])
.field("medications", "LIST", required=False,
constraints=[
Custom(self.validate_medications, "Invalid medications")
])
.field("allergies", "LIST", required=False,
constraints=[
Custom(self.validate_allergies, "Invalid allergy information")
])
.field("lab_results", "LIST", required=False,
constraints=[
Custom(self.validate_lab_results, "Invalid lab results")
])
)
def setup_medical_references(self):
"""Setup medical reference data"""
# ICD-10 code patterns (simplified)
self.icd10_pattern = r"^[A-Z]\d{2}\.?\d{0,3}$"
# CPT code pattern
self.cpt_pattern = r"^\d{5}$"
# LOINC code pattern
self.loinc_pattern = r"^\d{4,5}-\d$"
# Valid medication routes
self.valid_routes = {
"PO", "IV", "IM", "SC", "SL", "PR", "TOP", "INH", "NASAL"
}
# Normal vital sign ranges
self.vital_ranges = {
"blood_pressure_systolic": (90, 180),
"blood_pressure_diastolic": (60, 120),
"heart_rate": (40, 180),
"respiratory_rate": (8, 40),
"temperature_f": (95.0, 105.0),
"oxygen_saturation": (70, 100),
"bmi": (10, 70)
}
def validate_not_ssn(self, patient_id: str) -> bool:
"""Ensure patient ID is not a SSN"""
# Check if it looks like SSN (XXX-XX-XXXX or XXXXXXXXX)
ssn_patterns = [
r"^\d{3}-\d{2}-\d{4}$",
r"^\d{9}$"
]
for pattern in ssn_patterns:
if re.match(pattern, patient_id):
return False
return True
def validate_name_structure(self, name: Dict) -> bool:
"""Validate patient name structure"""
required = ["first", "last"]
if not all(field in name for field in required):
return False
# Check for PHI leakage in name fields
for field, value in name.items():
if isinstance(value, str):
# Check name doesn't contain numbers (except suffixes)
if field != "suffix" and re.search(r"\d", value):
return False
# Check reasonable length
if len(value) < 1 or len(value) > 50:
return False
return True
def validate_dob(self, dob: str) -> bool:
"""Validate date of birth"""
try:
birth_date = datetime.strptime(dob, "%Y-%m-%d").date()
# Check not future date
if birth_date > date.today():
return False
# Check reasonable age (0-150 years)
age = (date.today() - birth_date).days / 365.25
if age < 0 or age > 150:
return False
return True
except:
return False
def validate_contact_info(self, contact: Dict) -> bool:
"""Validate contact information with HIPAA considerations"""
# Phone validation
if "phone" in contact:
phone = re.sub(r'\D', '', contact["phone"])
if len(phone) not in [10, 11]:
return False
# Email validation
if "email" in contact:
if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
contact["email"]):
return False
# Address validation
if "address" in contact:
addr = contact["address"]
required_addr = ["street", "city", "state", "zip"]
if not all(field in addr for field in required_addr):
return False
# Validate ZIP code
if not re.match(r"^\d{5}(-\d{4})?$", addr["zip"]):
return False
return True
def validate_encounter_date(self, encounter_date: str) -> bool:
"""Validate encounter date"""
try:
enc_date = datetime.strptime(encounter_date, "%Y-%m-%d %H:%M:%S")
# Check not future date
if enc_date > datetime.now():
return False
# Check not too old (e.g., 100 years)
if (datetime.now() - enc_date).days > 36500:
return False
return True
except:
return False
def validate_vital_signs(self, vitals: Dict) -> bool:
"""Validate vital signs are within reasonable ranges"""
for vital, value in vitals.items():
if vital in self.vital_ranges:
min_val, max_val = self.vital_ranges[vital]
try:
numeric_value = float(value)
if numeric_value < min_val or numeric_value > max_val:
return False
except:
return False
# Check blood pressure format if present
if "blood_pressure" in vitals:
bp = vitals["blood_pressure"]
if not re.match(r"^\d{2,3}/\d{2,3}$", bp):
return False
systolic, diastolic = map(int, bp.split("/"))
if systolic <= diastolic:
return False
return True
def validate_diagnoses(self, diagnoses: List) -> bool:
"""Validate diagnosis codes (ICD-10)"""
if not diagnoses:
return False
for diagnosis in diagnoses:
if isinstance(diagnosis, dict):
code = diagnosis.get("code", "")
# Validate ICD-10 format
if not re.match(self.icd10_pattern, code):
return False
# Check required fields
if "description" not in diagnosis:
return False
else:
return False
return True
def validate_procedures(self, procedures: List) -> bool:
"""Validate procedure codes (CPT)"""
for procedure in procedures:
if isinstance(procedure, dict):
code = procedure.get("code", "")
# Validate CPT format
if not re.match(self.cpt_pattern, code):
return False
# Check date if present
if "date" in procedure:
try:
proc_date = datetime.strptime(procedure["date"], "%Y-%m-%d")
if proc_date > datetime.now():
return False
except:
return False
else:
return False
return True
def validate_medications(self, medications: List) -> bool:
"""Validate medication information"""
for med in medications:
if not isinstance(med, dict):
return False
# Required fields
if not all(field in med for field in ["name", "dosage", "route"]):
return False
# Validate route
if med["route"] not in self.valid_routes:
return False
# Validate dosage format (simplified)
dosage = med["dosage"]
if not re.match(r"^\d+(\.\d+)?\s*(mg|mcg|g|mL|units?|IU)", dosage):
return False
return True
def validate_allergies(self, allergies: List) -> bool:
"""Validate allergy information"""
for allergy in allergies:
if not isinstance(allergy, dict):
return False
# Required fields
if not all(field in allergy for field in ["allergen", "reaction"]):
return False
# Validate severity if present
if "severity" in allergy:
if allergy["severity"] not in ["mild", "moderate", "severe", "life-threatening"]:
return False
return True
def validate_lab_results(self, lab_results: List) -> bool:
"""Validate laboratory results"""
for result in lab_results:
if not isinstance(result, dict):
return False
# Required fields
if not all(field in result for field in ["test_name", "value", "unit", "reference_range"]):
return False
# Validate LOINC code if present
if "loinc_code" in result:
if not re.match(self.loinc_pattern, result["loinc_code"]):
return False
# Check if value is numeric or a valid result
value = result["value"]
if not (isinstance(value, (int, float)) or
value in ["positive", "negative", "pending", "abnormal"]):
return False
return True
def perform_hipaa_compliance_check(self, record: Dict) -> Dict[str, Any]:
"""Check for HIPAA compliance issues"""
issues = []
compliant = True
# Check for exposed PHI
phi_fields = ["ssn", "social_security", "license_number", "full_account"]
for field in phi_fields:
if field in record:
issues.append(f"Exposed PHI field: {field}")
compliant = False
# Check data minimization
if "patient_id" in record and "mrn" in record:
# Both identifiers present - check if necessary
pass
# Check encryption indicators (in real system)
if "encryption_status" in record and not record["encryption_status"]:
issues.append("Data not encrypted")
compliant = False
# Check audit trail
if "audit_trail" not in record or not record["audit_trail"]:
issues.append("Missing audit trail")
compliant = False
return {
"compliant": compliant,
"issues": issues,
"risk_level": "low" if compliant else "high"
}
def validate_healthcare_record(self, record_data: Dict,
record_type: str = "patient") -> Dict[str, Any]:
"""Validate a complete healthcare record"""
# Select schema
if record_type == "clinical":
schema = self.clinical_schema
else:
schema = self.patient_schema
# Create record
record = Record(data=record_data)
# Schema validation
result = schema.validate(record, coerce=True)
if not result.valid:
return {
"valid": False,
"errors": result.errors,
"warnings": result.warnings
}
# HIPAA compliance check
hipaa_check = self.perform_hipaa_compliance_check(result.value.data)
# Data quality scoring
quality_score = self.calculate_data_quality_score(result.value.data)
return {
"valid": True,
"data": result.value.data,
"hipaa_compliant": hipaa_check["compliant"],
"hipaa_issues": hipaa_check["issues"],
"quality_score": quality_score,
"warnings": result.warnings
}
def calculate_data_quality_score(self, record: Dict) -> float:
"""Calculate data quality score (0-100)"""
score = 100.0
# Completeness check
optional_fields = ["middle_name", "suffix", "secondary_phone", "emergency_contact"]
for field in optional_fields:
if field not in record or not record[field]:
score -= 2 # Small penalty for missing optional fields
# Consistency check
if "dob" in record and "age" in record:
calculated_age = (date.today() - datetime.strptime(record["dob"], "%Y-%m-%d").date()).days / 365.25
if abs(calculated_age - record["age"]) > 1:
score -= 10 # Inconsistent age
# Timeliness check
if "last_updated" in record:
last_update = datetime.fromisoformat(record["last_updated"])
days_old = (datetime.now() - last_update).days
if days_old > 365:
score -= min(20, days_old / 365 * 10) # Stale data penalty
return max(0, score)
# Usage example
def test_healthcare_validation():
validator = HealthcareDataValidator()
# Valid patient record
valid_patient = {
"patient_id": "PAT-1234567890",
"mrn": "MRN-12345678",
"name": {
"first": "John",
"last": "Doe",
"middle": "A"
},
"dob": "1980-05-15",
"gender": "M",
"contact": {
"phone": "555-123-4567",
"email": "john.doe@example.com",
"address": {
"street": "123 Main St",
"city": "Boston",
"state": "MA",
"zip": "02101"
}
},
"audit_trail": True
}
result = validator.validate_healthcare_record(valid_patient, "patient")
print(f"Patient validation: {result}")
if __name__ == "__main__":
test_healthcare_validation()
Best Practices Summary¶
- Layer validation - Combine schema, business rules, and compliance checks
- Provide clear error messages - Help users understand what needs correction
- Use appropriate constraints - Match validation to actual requirements
- Consider performance - Cache schemas and validation results when possible
- Maintain audit trails - Track validation failures for analysis
- Implement graceful degradation - Handle partial validation failures appropriately
- Version your schemas - Track schema changes over time
- Test edge cases - Validate with extreme and boundary values
- Monitor validation metrics - Track success rates and common failures
- Document validation rules - Maintain clear documentation of all validation requirements