Rate Limiting

The Rate Limiter provides a unified interface for controlling operation throughput in the DataKnobs ecosystem. It supports per-category rate configuration, weighted operations, and pluggable backends for scaling from development to production.

Overview

The rate limiter abstraction allows you to:

  • Limit operation throughput with configurable rates per time window
  • Define per-category rates for different operation types (e.g., reads vs writes)
  • Choose blocking or non-blocking acquire modes
  • Switch backends without changing application code
  • Scale from single-process (in-memory) to distributed (Redis/PostgreSQL via pyrate-limiter)

Installation

The in-memory rate limiter is included in dataknobs-common:

pip install dataknobs-common

For distributed backends (Redis, PostgreSQL, SQLite):

pip install pyrate-limiter
pip install "pyrate-limiter[redis]"     # Redis bucket (quotes keep shells from globbing the brackets)
pip install "pyrate-limiter[postgres]"  # PostgreSQL bucket

Quick Start

import asyncio
from dataknobs_common.ratelimit import create_rate_limiter

async def main():
    # Create a rate limiter: 10 operations per minute
    limiter = create_rate_limiter({
        "rates": [{"limit": 10, "interval": 60}],
    })

    # Non-blocking: check before proceeding
    if await limiter.try_acquire("api"):
        print("Request permitted")
    else:
        print("Rate limited — try again later")

    # Blocking: wait until capacity is available
    await limiter.acquire("api", timeout=5.0)
    print("Capacity acquired")

    # Check current status
    status = await limiter.get_status("api")
    print(f"Remaining: {status.remaining}/{status.limit}")

    # Cleanup
    await limiter.close()

if __name__ == "__main__":
    asyncio.run(main())

Core Concepts

Rate Limits

A RateLimit defines a maximum operation count within a time interval:

from dataknobs_common.ratelimit import RateLimit

# 100 requests per minute
rate = RateLimit(limit=100, interval=60)

# 10 requests per second
rate = RateLimit(limit=10, interval=1)

Multiple rates can be combined — the most restrictive rate always applies:

rates = [
    RateLimit(limit=10, interval=1),      # 10 per second
    RateLimit(limit=100, interval=60),     # 100 per minute
    RateLimit(limit=1000, interval=3600),  # 1000 per hour
]

Categories

A single rate limiter supports different rates for named categories. When acquire() or try_acquire() is called with a category name, the limiter looks up rates specific to that category, falling back to default rates for unknown categories.

limiter = create_rate_limiter({
    "default_rates": [{"limit": 100, "interval": 60}],
    "categories": {
        "api_read":  {"rates": [{"limit": 100, "interval": 6}]},
        "api_write": {"rates": [{"limit": 10, "interval": 6}]},
    },
})

await limiter.try_acquire("api_read")   # Uses api_read rates
await limiter.try_acquire("api_write")  # Uses api_write rates
await limiter.try_acquire("other")      # Uses default_rates

Blocking vs Non-Blocking

| Method | Behavior | Use When |
| --- | --- | --- |
| try_acquire() | Returns True/False immediately | You want to fail fast or handle the denial yourself |
| acquire() | Blocks (async sleep) until capacity is available | You want to wait for capacity |
| acquire(timeout=...) | Blocks up to timeout seconds, then raises TimeoutError | You want to wait, but with a deadline |

Weighted Operations

Operations can have different weights. A weight of 3 consumes three units of capacity:

# Heavy operation consumes 5 units
await limiter.acquire("batch_import", weight=5)

# Light operation consumes 1 unit (default)
await limiter.acquire("single_read")

Status

Check the current state of a rate limiter bucket:

status = await limiter.get_status("api_write")
print(f"Name: {status.name}")
print(f"Current count: {status.current_count}")
print(f"Limit: {status.limit}")
print(f"Remaining: {status.remaining}")
print(f"Reset after: {status.reset_after:.1f}s")

| Field | Type | Description |
| --- | --- | --- |
| name | str | Category or bucket name |
| current_count | int | Operations (total weight) in the current window |
| limit | int | Maximum allowed from the tightest rate |
| remaining | int | Remaining capacity before the limit |
| reset_after | float | Seconds until the oldest entry expires |

Backend Selection

Choose your backend based on deployment needs:

In-Memory (Development/Testing)

Single-process, no external dependencies. Uses a sliding window log algorithm.

limiter = create_rate_limiter({
    "rates": [{"limit": 100, "interval": 60}],
})

Use when:

  • Unit testing
  • Local development
  • Single-process applications

Pyrate-Limiter with In-Memory Bucket

Uses pyrate-limiter's in-memory backend. Functionally similar to the built-in in-memory limiter but uses pyrate-limiter's leaky bucket algorithm.

limiter = create_rate_limiter({
    "backend": "pyrate",
    "bucket": "memory",
    "rates": [{"limit": 100, "interval": 60}],
})

Pyrate-Limiter with Redis (Distributed Production)

Distributed rate limiting using Redis. Suitable for multi-process and multi-machine deployments.

limiter = create_rate_limiter({
    "backend": "pyrate",
    "bucket": "redis",
    "default_rates": [{"limit": 100, "interval": 60}],
    "redis": {
        "url": "redis://localhost:6379",
    },
})

For AWS ElastiCache (use rediss:// scheme for TLS):

limiter = create_rate_limiter({
    "backend": "pyrate",
    "bucket": "redis",
    "default_rates": [{"limit": 100, "interval": 60}],
    "redis": {
        "url": "rediss://my-cluster.cache.amazonaws.com:6379",
    },
})

Use when:

  • Multiple application instances
  • Need atomic, distributed rate limiting
  • High-throughput scenarios

Pyrate-Limiter with PostgreSQL (Persistent Production)

Persistent rate limiting using PostgreSQL.

limiter = create_rate_limiter({
    "backend": "pyrate",
    "bucket": "postgres",
    "default_rates": [{"limit": 1000, "interval": 3600}],
    "postgres": {
        "pool": existing_pool,  # psycopg pool instance
        "table": "rate_limits",
    },
})

Use when:

  • You already have PostgreSQL
  • Need persistent rate limit state across restarts

Note: Import the pyrate backend directly for type hints:

from dataknobs_common.ratelimit.pyrate import PyrateRateLimiter

Configuration-Driven Usage

Rate limiter configuration integrates with environment-specific YAML config:

# environments/development.yaml
rate_limiters:
  api:
    backend: memory
    default_rates:
      - {limit: 1000, interval: 60}

# environments/production.yaml
rate_limiters:
  api:
    backend: pyrate
    bucket: redis
    default_rates:
      - {limit: 1000, interval: 60}
    categories:
      api_read:
        rates: [{limit: 100, interval: 6}]
      api_write:
        rates: [{limit: 10, interval: 6}]
    redis:
      url: "rediss://elasticache:6379"

from dataknobs_common.ratelimit import create_rate_limiter

limiter = create_rate_limiter(config["rate_limiters"]["api"])

Configuration Reference

Common Config Keys

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| backend | str | "memory" | "memory" or "pyrate" |
| rates | list | — | Shorthand for default_rates |
| default_rates | list | — | Default rates (required if rates is not set) |
| categories | dict | {} | Per-category rate overrides |
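
The rates shorthand can be pictured as a small normalization step before the factory builds the limiter. normalize_config is hypothetical and the real factory's validation is certainly richer; this sketch only resolves the defaults listed above.

```python
def normalize_config(config: dict) -> dict:
    """Resolve the `rates` shorthand into `default_rates` (illustrative only)."""
    cfg = dict(config)
    if "default_rates" not in cfg:
        if "rates" not in cfg:
            raise ValueError("config needs 'rates' or 'default_rates'")
        cfg["default_rates"] = cfg.pop("rates")
    cfg.setdefault("backend", "memory")
    cfg.setdefault("categories", {})
    return cfg

print(normalize_config({"rates": [{"limit": 10, "interval": 60}]}))
```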

Pyrate Backend Config Keys

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| bucket | str | "memory" | Bucket backend: "memory", "sqlite", "redis", "postgres" |
| redis | dict | — | Redis connection config |
| redis.url | str | "redis://localhost:6379" | Redis connection URL (use rediss:// scheme for TLS) |
| postgres | dict | — | PostgreSQL config |
| postgres.pool | object | — | psycopg pool instance (required) |
| postgres.table | str | "rate_limits" | Table name for rate limit state |
| sqlite | dict | — | SQLite config |
| sqlite.db_path | str | "rate_limits.db" | Path to the SQLite database |
| sqlite.table | str | "rate_limits" | Table name |
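
The SQLite bucket appears in the table above but has no example elsewhere in this page. By analogy with the Redis and PostgreSQL examples, a configuration sketch might look like this (keys taken from the reference table; not verified against the factory):

limiter = create_rate_limiter({
    "backend": "pyrate",
    "bucket": "sqlite",
    "default_rates": [{"limit": 500, "interval": 60}],
    "sqlite": {
        "db_path": "rate_limits.db",
        "table": "rate_limits",
    },
})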

Testing

For testing, use the in-memory rate limiter directly; it is the intended testing implementation (analogous to InMemoryEventBus):

import pytest
from dataknobs_common.ratelimit import (
    InMemoryRateLimiter,
    RateLimit,
    RateLimiterConfig,
)

@pytest.fixture
def rate_limiter():
    config = RateLimiterConfig(
        default_rates=[RateLimit(limit=100, interval=60)],
        categories={
            "api_write": [RateLimit(limit=5, interval=60)],
        },
    )
    return InMemoryRateLimiter(config)

async def test_rate_limiting(rate_limiter):
    # Permits within limit
    for _ in range(5):
        assert await rate_limiter.try_acquire("api_write") is True

    # Denies beyond limit
    assert await rate_limiter.try_acquire("api_write") is False

    # Reset restores capacity
    await rate_limiter.reset("api_write")
    assert await rate_limiter.try_acquire("api_write") is True

API Reference

RateLimiter Protocol

@runtime_checkable
class RateLimiter(Protocol):
    async def acquire(
        self, name: str = "default", weight: int = 1,
        timeout: float | None = None,
    ) -> None:
        """Acquire capacity, blocking until available.

        Raises TimeoutError if timeout is exceeded.
        """

    async def try_acquire(
        self, name: str = "default", weight: int = 1,
    ) -> bool:
        """Attempt to acquire without blocking. Returns True/False."""

    async def get_status(self, name: str = "default") -> RateLimitStatus:
        """Get current bucket status."""

    async def reset(self, name: str | None = None) -> None:
        """Reset state for a category or all categories."""

    async def close(self) -> None:
        """Release resources."""

Factory Function

def create_rate_limiter(config: dict) -> RateLimiter:
    """Create a rate limiter from configuration.

    Args:
        config: Configuration dict with rate and backend settings.

    Returns:
        RateLimiter implementation.

    Raises:
        ValueError: If backend is unknown or config is invalid.
    """

RateLimitError

class RateLimitError(OperationError):
    """Raised when a rate limit is exceeded.

    Attributes:
        retry_after: Optional seconds to wait before retrying.
    """

Usage Across DataKnobs

The rate limiter is the shared infrastructure for rate limiting across the DataKnobs ecosystem. The following packages use InMemoryRateLimiter as their rate limiting backend:

| Package | Component | How It Uses Rate Limiting |
| --- | --- | --- |
| dataknobs-fsm | APIOrchestrator | Per-endpoint and global rate limiting for API orchestration; categories map to endpoint names. |
| dataknobs-llm | RateLimitMiddleware | Conversation middleware that limits requests per conversation or per client. |
| dataknobs-llm | AsyncLLMResource | Optional per-resource rate limiting for LLM provider calls (configured via requests_per_minute). |

This consolidation ensures consistent rate limiting behavior, a single API to learn, and easy backend swapping (e.g., switch from in-memory to Redis for production) across all components.

Module Exports

from dataknobs_common.ratelimit import (
    # Protocol
    RateLimiter,
    # Factory
    create_rate_limiter,
    # Types
    RateLimit,
    RateLimiterConfig,
    RateLimitStatus,
    # In-memory implementation
    InMemoryRateLimiter,
)

# Exception (also available from dataknobs_common.exceptions)
from dataknobs_common import RateLimitError

# Pyrate backend (import directly — requires pyrate-limiter)
from dataknobs_common.ratelimit.pyrate import PyrateRateLimiter