Parallel LLM Execution

The ParallelLLMExecutor runs multiple LLM calls (and deterministic functions) concurrently with concurrency control, per-task error isolation, and optional per-task retry.

Overview

from dataknobs_llm import ParallelLLMExecutor, LLMTask, LLMMessage

executor = ParallelLLMExecutor(provider, max_concurrency=3)

results = await executor.execute({
    "q1": LLMTask(messages=[LLMMessage(role="user", content="Generate a math question")]),
    "q2": LLMTask(messages=[LLMMessage(role="user", content="Generate a science question")]),
    "q3": LLMTask(messages=[LLMMessage(role="user", content="Generate a history question")]),
})

for tag, result in results.items():
    if result.success:
        print(f"{tag}: {result.value.content}")
    else:
        print(f"{tag}: FAILED - {result.error}")

ParallelLLMExecutor

Creating an Executor

from dataknobs_llm import ParallelLLMExecutor
from dataknobs_common.retry import RetryConfig, BackoffStrategy

executor = ParallelLLMExecutor(
    provider=llm_provider,        # Any AsyncLLMProvider
    max_concurrency=5,            # Semaphore limit
    default_retry=RetryConfig(    # Optional default retry policy
        max_attempts=3,
        backoff_strategy=BackoffStrategy.EXPONENTIAL,
    ),
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| provider | AsyncLLMProvider | required | The LLM provider for executing tasks |
| max_concurrency | int | 5 | Maximum concurrent tasks (semaphore limit) |
| default_retry | RetryConfig \| None | None | Default retry policy for tasks without their own |
| default_config_overrides | dict[str, Any] \| None | None | Config overrides applied to all tasks (task-level overrides take precedence) |

execute()

Runs LLM tasks concurrently with error isolation:

results = await executor.execute({
    "stem": LLMTask(messages=[LLMMessage(role="user", content="Generate a question stem")]),
    "distractors": LLMTask(messages=[LLMMessage(role="user", content="Generate distractors")]),
})
  • Input: dict[str, LLMTask] — mapping of tag to task
  • Output: dict[str, TaskResult] — mapping of tag to result
  • Each task runs independently; one failure does not cancel others
  • Concurrency is controlled by max_concurrency
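
The pattern behind this behaviour can be sketched with stdlib asyncio alone. The helper below (run_all is a hypothetical stand-in for illustration, not the library's internal API) caps concurrency with a semaphore and captures each task's exception instead of propagating it:

```python
import asyncio

async def run_all(tasks, max_concurrency=5):
    # Semaphore caps how many coroutines run at once; exceptions are
    # captured per task so one failure cannot cancel the others.
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(tag, coro_fn):
        async with sem:
            try:
                return tag, {"success": True, "value": await coro_fn(), "error": None}
            except Exception as exc:
                return tag, {"success": False, "value": None, "error": exc}

    pairs = await asyncio.gather(*(run_one(tag, fn) for tag, fn in tasks.items()))
    return dict(pairs)

async def demo():
    async def good():
        return "hello"

    async def bad():
        raise RuntimeError("boom")

    return await run_all({"good": good, "bad": bad}, max_concurrency=2)

results = asyncio.run(demo())
```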

execute_mixed()

Runs a mix of LLM and deterministic tasks concurrently:

from datetime import datetime

from dataknobs_llm import DeterministicTask

results = await executor.execute_mixed({
    "question": LLMTask(
        messages=[LLMMessage(role="user", content="Generate a question")],
    ),
    "timestamp": DeterministicTask(
        fn=lambda: datetime.now().isoformat(),
    ),
    "lookup": DeterministicTask(
        fn=fetch_reference_data,
        args=("biology",),
    ),
})

Deterministic tasks:
  • Sync callables are run in a thread executor to avoid blocking the event loop
  • Async callables are awaited directly
  • Both share the same concurrency semaphore as LLM tasks
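
That dispatch rule can be sketched in plain asyncio (run_callable is an illustrative name, not the executor's internal function):

```python
import asyncio
import inspect

async def run_callable(fn, *args, **kwargs):
    # Await async callables directly; push sync callables onto a
    # worker thread so they never block the event loop.
    if inspect.iscoroutinefunction(fn):
        return await fn(*args, **kwargs)
    return await asyncio.to_thread(fn, *args, **kwargs)

async def demo():
    async def fetch_async(key):
        return f"async:{key}"

    def compute_sync(key):
        return f"sync:{key}"

    return await run_callable(fetch_async, "a"), await run_callable(compute_sync, "b")

async_result, sync_result = asyncio.run(demo())
```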

execute_sequential()

Runs LLM tasks in order, optionally passing results forward:

results = await executor.execute_sequential(
    tasks=[
        LLMTask(
            messages=[LLMMessage(role="user", content="Generate a question")],
            tag="generate",
        ),
        LLMTask(
            messages=[LLMMessage(role="user", content="Now improve this question")],
            tag="improve",
        ),
    ],
    pass_result=True,  # Appends previous response as assistant message
)

When pass_result=True, each task's messages are augmented with the previous task's response as an assistant message, creating a chain where each step builds on the last.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| tasks | list[LLMTask] | required | Ordered tasks to run |
| pass_result | bool | False | Append previous result as assistant message |

Returns list[TaskResult] in execution order.
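
The augmentation can be illustrated with plain dicts; note that the position of the injected assistant message is an assumption of this sketch, so consult the source for the exact ordering:

```python
def augment_messages(messages, previous_content):
    # Sketch of pass_result=True: the prior task's response content is
    # carried into the next task's messages as an assistant turn.
    if previous_content is None:
        return list(messages)
    return [{"role": "assistant", "content": previous_content}] + list(messages)

step2 = [{"role": "user", "content": "Now improve this question"}]
augmented = augment_messages(step2, "What is the powerhouse of the cell?")
```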

LLMTask

Represents a single LLM call:

from dataknobs_llm import LLMTask, LLMMessage
from dataknobs_common.retry import RetryConfig

task = LLMTask(
    messages=[
        LLMMessage(role="system", content="You are a quiz generator."),
        LLMMessage(role="user", content="Generate a biology question."),
    ],
    config_overrides={"temperature": 0.9},  # Per-task provider overrides
    retry=RetryConfig(max_attempts=2),       # Per-task retry policy
    tag="bio_q1",                            # Auto-set from dict key in execute()
)

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| messages | list[LLMMessage] | required | Messages to send to the provider |
| config_overrides | dict[str, Any] \| None | None | Per-task config (temperature, model, etc.) |
| retry | RetryConfig \| None | None | Per-task retry (overrides executor default) |
| tag | str | "" | Identifier; auto-populated from dict key |

DeterministicTask

Represents a sync or async callable:

from dataknobs_llm import DeterministicTask

# Sync function
task = DeterministicTask(
    fn=compute_hash,
    args=("content",),
    kwargs={"algorithm": "sha256"},
)

# Async function
task = DeterministicTask(
    fn=fetch_from_database,
    args=("record_123",),
)

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| fn | Callable[..., Any] | required | The callable (sync or async) |
| args | tuple[Any, ...] | () | Positional arguments |
| kwargs | dict[str, Any] | {} | Keyword arguments |
| tag | str | "" | Identifier; auto-populated from dict key |

TaskResult

Result of a single task execution:

result = results["q1"]

if result.success:
    response = result.value  # LLMResponse for LLM tasks, Any for deterministic
    print(f"Completed in {result.duration_ms:.1f}ms")
else:
    print(f"Failed: {result.error}")

| Field | Type | Description |
| --- | --- | --- |
| tag | str | Task identifier |
| success | bool | Whether the task completed without error |
| value | LLMResponse \| Any | Return value (None on failure) |
| error | Exception \| None | The exception if failed |
| duration_ms | float | Wall-clock execution time in milliseconds |

Error Handling

Each task is isolated — a failure in one task does not affect others:

results = await executor.execute({
    "good": LLMTask(messages=[LLMMessage(role="user", content="Hello")]),
    "bad": LLMTask(messages=[LLMMessage(role="user", content="trigger error")]),
})

assert results["good"].success is True   # Unaffected by other failures
assert results["bad"].success is False
assert results["bad"].error is not None
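
One practical consequence of isolation is selective resubmission: collect the failed tags and rebuild a second batch containing only those tasks. A minimal sketch, using a dict-shaped stand-in for TaskResult:

```python
def failed_tags(results):
    # Tags whose tasks did not complete; candidates for resubmission.
    return sorted(tag for tag, r in results.items() if not r["success"])

results = {
    "good": {"success": True, "value": "Hello!", "error": None},
    "bad": {"success": False, "value": None, "error": RuntimeError("boom")},
}
retry_these = failed_tags(results)
```

retry_these could then drive a second execute() call built from only the failed LLMTasks.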

Retry

Tasks can specify retry policies individually or inherit the executor's default:

from dataknobs_common.retry import RetryConfig, BackoffStrategy

# Executor-level default
executor = ParallelLLMExecutor(
    provider=provider,
    default_retry=RetryConfig(max_attempts=3),
)

# Per-task override
results = await executor.execute({
    "critical": LLMTask(
        messages=[LLMMessage(role="user", content="Important task")],
        retry=RetryConfig(
            max_attempts=5,
            backoff_strategy=BackoffStrategy.EXPONENTIAL,
            initial_delay=1.0,
        ),
    ),
    "best_effort": LLMTask(
        messages=[LLMMessage(role="user", content="Optional task")],
        retry=RetryConfig(max_attempts=1),  # No retry
    ),
})

Retry uses RetryExecutor from dataknobs-common, which supports fixed, linear, exponential, jitter, and decorrelated backoff strategies.
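
For intuition, exponential backoff produces a doubling sequence of waits between attempts. The sketch below is an illustration only; the multiplier and capping behaviour are assumptions, not guaranteed RetryExecutor semantics:

```python
def exponential_delays(initial_delay, max_attempts, multiplier=2.0, max_delay=30.0):
    # The delay before each retry grows geometrically, capped at
    # max_delay; max_attempts attempts have max_attempts - 1 waits.
    return [min(initial_delay * multiplier ** i, max_delay)
            for i in range(max_attempts - 1)]

delays = exponential_delays(initial_delay=1.0, max_attempts=4)
```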

Testing

Use EchoProvider for deterministic testing:

from dataknobs_llm import EchoProvider, ParallelLLMExecutor, LLMTask, LLMMessage
from dataknobs_llm.testing import text_response

provider = EchoProvider()
provider.set_responses([
    text_response("Math question"),
    text_response("Science question"),
])

executor = ParallelLLMExecutor(provider, max_concurrency=2)
results = await executor.execute({
    "math": LLMTask(messages=[LLMMessage(role="user", content="math")]),
    "science": LLMTask(messages=[LLMMessage(role="user", content="science")]),
})

assert results["math"].success
assert results["science"].success

Testing Error Handling

EchoProvider supports ErrorResponse for simulating failures:

from dataknobs_llm.testing import text_response, ErrorResponse

provider = EchoProvider()
provider.set_responses([
    text_response("OK"),
    ErrorResponse(RuntimeError("simulated failure")),
])
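
The queued-response pattern behind EchoProvider can be imitated with a small stdlib test double (FakeProvider and its complete method are hypothetical names for illustration, not the library's API):

```python
import asyncio

class FakeProvider:
    # Test double: returns queued responses in order; an Exception
    # queued in place of a response is raised instead of returned.
    def __init__(self):
        self._queue = []

    def set_responses(self, responses):
        self._queue = list(responses)

    async def complete(self, messages):
        item = self._queue.pop(0)
        if isinstance(item, Exception):
            raise item
        return item

async def demo():
    provider = FakeProvider()
    provider.set_responses(["OK", RuntimeError("simulated failure")])
    first = await provider.complete([{"role": "user", "content": "hi"}])
    try:
        await provider.complete([{"role": "user", "content": "hi"}])
        second_error = None
    except RuntimeError as exc:
        second_error = str(exc)
    return first, second_error

first, second_error = asyncio.run(demo())
```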

Imports

All main types are available from the top-level dataknobs_llm package:

from dataknobs_llm import (
    ParallelLLMExecutor,
    LLMTask,
    DeterministicTask,
    TaskResult,
    LLMMessage,
)