Skip to content

Elasticsearch Backend

Overview

The Elasticsearch Backend provides powerful full-text search, analytics, and distributed storage capabilities for large-scale applications.

Features

  • Full-text search - Advanced text analysis
  • Distributed - Horizontal scaling
  • Real-time - Near real-time indexing
  • Analytics - Aggregations and facets
  • Both sync and async - Using elasticsearch-py

Configuration

from dataknobs_data import ElasticsearchDatabase

config = {
    "hosts": ["http://localhost:9200"],
    "index": "records",
    "auth": ("elastic", "password"),
    "verify_certs": True,
    "pool_size": 10
}

db = ElasticsearchDatabase(config)

Index Mapping

{
  "mappings": {
    "properties": {
      "id": {"type": "keyword"},
      "fields": {"type": "object", "dynamic": true},
      "metadata": {"type": "object", "dynamic": true},
      "created_at": {"type": "date"},
      "updated_at": {"type": "date"}
    }
  }
}

Usage Examples

# Text search across all fields
query = Query(filters=[
    Filter("_all", Operator.CONTAINS, "sensor malfunction")
])

results = db.search(query)

Aggregations

# Get statistics
aggregation = {
    "avg_temperature": {"avg": {"field": "fields.temperature"}},
    "max_humidity": {"max": {"field": "fields.humidity"}},
    "sensor_count": {"cardinality": {"field": "fields.sensor_id"}}
}

stats = db.aggregate(aggregation)

Bulk Operations

# Efficient bulk indexing
records = [Record(data) for data in dataset]

# Bulk index with refresh
db.create_batch(records, refresh="wait_for")

Search Features

Fuzzy Matching

# Fuzzy search for typos
query = Query(filters=[
    Filter("name", Operator.FUZZY, "jhon")  # Matches "john"
])

Geo Queries

# Geo-distance queries
query = Query(filters=[
    Filter("location", Operator.GEO_DISTANCE, {
        "point": {"lat": 40.7128, "lon": -74.0060},
        "distance": "10km"
    })
])

Performance Tuning

  • Sharding - Distribute data across nodes
  • Replicas - For high availability
  • Refresh interval - Balance speed vs resources
  • Bulk size - Optimize batch operations
  • Query cache - Enable for repeated queries

Cluster Management

# Check cluster health
health = db.cluster_health()
print(f"Status: {health['status']}")
print(f"Nodes: {health['number_of_nodes']}")

# Index statistics
stats = db.index_stats()
print(f"Document count: {stats['doc_count']}")
print(f"Index size: {stats['size_in_bytes']}")

Production Considerations

  • Monitoring - Use Kibana or Grafana
  • Snapshots - Regular backup snapshots
  • Security - Enable X-Pack security
  • Scaling - Plan shard allocation
  • Version - Keep Elasticsearch updated

See Also