LeetCode 2756 - Query Batching

Difficulty: 🔴 Hard

Solution

Problem Understanding

The problem asks us to design a throttled batching system for asynchronous queries. Instead of immediately sending every incoming request individually, we want to intelligently combine multiple requests together whenever possible.

We are given a class called QueryBatcher that receives two constructor parameters:

  • queryMultiple, an asynchronous function that accepts an array of string keys and returns an array of corresponding string values.
  • t, a throttle interval in milliseconds.

The class exposes one method:

async getValue(key)

This method accepts a single key and eventually resolves to the corresponding value.

The important requirement is that calls to queryMultiple must obey a throttling rule:

  • The very first request should execute immediately.
  • After a batch is executed, another batch cannot execute until at least t milliseconds later.
  • Any requests that arrive during the cooldown period should be queued together into the next batch.

This creates a behavior similar to rate-limited batching. Multiple pending requests are grouped into one call after the throttle interval expires.
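
The throttling rule above can be sketched as a pure function that plans batch start times without any asynchronous code. This is only an illustration: `plan_batches` is a hypothetical helper, not part of the problem's API, and it assumes the arrivals are already sorted by time.

```python
from typing import List, Optional, Tuple


def plan_batches(arrivals: List[Tuple[str, int]], t: int) -> List[Tuple[int, List[str]]]:
    """Return (start_ms, keys) for each batch, given sorted (key, arrival_ms) pairs."""
    batches: List[Tuple[int, List[str]]] = []
    pending: List[str] = []
    window_end: Optional[int] = None  # when the current cooldown expires; None means idle

    for key, at in arrivals:
        # Handle every cooldown that expired before (or exactly at) this arrival.
        while window_end is not None and at >= window_end:
            if pending:
                batches.append((window_end, pending))  # queued keys run together
                pending = []
                window_end += t                        # that batch starts a new cooldown
            else:
                window_end = None                      # nothing queued: go idle
        if window_end is None:
            batches.append((at, [key]))                # idle: execute immediately
            window_end = at + t
        else:
            pending.append(key)                        # cooling down: wait for the next batch

    if pending:
        batches.append((window_end, pending))
    return batches


print(plan_batches([("a", 10), ("b", 20), ("c", 30)], 100))
# [(10, ['a']), (110, ['b', 'c'])]
```

With t = 100, key a runs alone at 10 ms, while b and c share the batch that starts when the cooldown expires at 110 ms.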

The returned array from queryMultiple(keys) matches the input order exactly. This means:

keys[i] -> result[i]

This property is extremely important because it allows us to associate each pending request with its correct response.

The constraints are small:

  • t <= 1000
  • calls.length <= 10

However, this is fundamentally a systems design and asynchronous coordination problem rather than a performance-heavy algorithmic problem. The real challenge is correctly coordinating timers, promises, and batching behavior.

Several edge cases are important:

  • t = 0: the cooldown expires immediately, so batches may execute back-to-back.
  • A request may arrive while another asynchronous query is still executing.
  • Multiple requests may arrive during the throttle window and must be grouped together.
  • The asynchronous function itself may take time to resolve.
  • Requests arriving after the cooldown but before the previous query finishes must still execute correctly.

The problem also guarantees:

  • queryMultiple never rejects.
  • Every key is unique.
  • Returned arrays always align with input arrays.

Those guarantees simplify error handling and mapping logic.

Approaches

Brute Force Approach

The simplest implementation is to completely ignore batching and call queryMultiple([key]) every time getValue is called.

For example:

async def getValue(self, key: str) -> str:
    # One queryMultiple call per key: no batching and no throttling.
    result = await self.queryMultiple([key])
    return result[0]

This returns the right value for every key, but it defeats the purpose of batching. Two calls to getValue less than t milliseconds apart trigger two calls to queryMultiple inside the same throttle window, which violates the requirement that consecutive calls to queryMultiple be at least t milliseconds apart.

A slightly less naive brute force approach could repeatedly poll queued requests and manually sleep until the throttle interval passes. However, this still creates unnecessary overhead and awkward synchronization logic.

The core issue is that requests arriving during the cooldown period should be grouped together automatically. A better solution needs shared state and coordinated scheduling.

Key Insight

The important observation is that the batching behavior naturally divides execution into windows.

Whenever a batch executes:

  1. We immediately enter a cooldown period of length t.
  2. Any incoming requests during this cooldown are accumulated.
  3. Once the cooldown ends, all accumulated requests execute together.

This means we need:

  • A queue of pending requests.
  • A timer controlling when the next batch may run.
  • A mechanism to resolve each request's individual promise after the batched query completes.

The optimal solution stores pending requests in memory and schedules exactly one future execution for the next batch.

Each queued request stores:

  • The key.
  • A resolver function for its promise.

When the batch executes:

  1. Extract all queued keys.
  2. Call queryMultiple(keys).
  3. Match results back to each resolver.

This creates an efficient event-driven batching system.
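
A queue entry and the resolution step might look like the following minimal sketch. The names `Pending` and `resolve_batch` are assumptions made for illustration, and an asyncio Future stands in for the promise resolver.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class Pending:
    key: str                        # the requested key
    future: "asyncio.Future[str]"   # resolved later with the value for this key


def resolve_batch(batch: List[Pending], results: List[str]) -> None:
    """Hand results[i] to the request that supplied keys[i]."""
    for item, value in zip(batch, results):
        if not item.future.done():  # skip futures that were already resolved or cancelled
            item.future.set_result(value)


async def demo() -> List[str]:
    loop = asyncio.get_running_loop()
    batch = [Pending(k, loop.create_future()) for k in ("a", "b")]
    # Pretend queryMultiple(["a", "b"]) returned ["A", "B"].
    resolve_batch(batch, ["A", "B"])
    return [await p.future for p in batch]


print(asyncio.run(demo()))  # ['A', 'B']
```

Keeping the key and its future in one record means the batch can be resolved with a single pass over the results, with no lookup table.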

Approach Comparison

| Approach | Time Complexity | Space Complexity | Notes |
| --- | --- | --- | --- |
| Brute Force | O(n) queries | O(1) | Executes every request individually, violates throttling |
| Optimal | O(n) total | O(n) | Queues requests and executes batched asynchronous queries |

Algorithm Walkthrough

  1. Store the provided queryMultiple function and throttle duration t inside the class.
  2. Maintain a queue of pending requests. Each queue entry contains:
     • The request key.
     • The resolver function for the promise returned by getValue.
  3. Maintain a boolean or timer state indicating whether a cooldown window is currently active.
  4. When getValue(key) is called:
     • Create a new promise.
     • Store its resolver together with the key inside the queue.
  5. If no cooldown is currently active:
     • Immediately execute a batch using all currently queued requests.
     • Start a cooldown timer for t milliseconds.
  6. While the cooldown timer is active:
     • Additional requests are only appended to the queue.
     • No new batch is executed yet.
  7. When the cooldown timer expires:
     • If the queue is non-empty, execute another batch immediately.
     • Restart the cooldown timer again.
  8. During batch execution:
     • Extract all queued requests.
     • Build the array of keys.
     • Call await queryMultiple(keys).
     • Resolve each stored promise using the corresponding returned value.
  9. If the queue becomes empty after cooldown expiration:
     • Stop scheduling additional batches.
     • The system becomes idle until another request arrives.

Why it works

The algorithm maintains the invariant that at most one batch execution may start within any interval of length t.

Every request arriving during a cooldown window is appended into the pending queue. When the cooldown expires, the entire accumulated queue executes together in one batched query.

Because queryMultiple preserves input ordering, each queued resolver can safely receive the corresponding returned value at the same index.

Thus every request is eventually resolved correctly while preserving the throttling constraint.
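
The invariant can be checked mechanically on a recorded list of batch start times. The helper below is a hypothetical sketch for testing purposes, not part of the solution itself.

```python
from typing import List


def respects_throttle(batch_starts: List[int], t: int) -> bool:
    """True if every pair of consecutive batch starts is at least t ms apart."""
    return all(later - earlier >= t
               for earlier, later in zip(batch_starts, batch_starts[1:]))


print(respects_throttle([10, 110], 100))  # True
print(respects_throttle([10, 60], 100))   # False
```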

Python Solution

import asyncio
from typing import Awaitable, Callable, Coroutine, List


class QueryBatcher:

    def __init__(
        self,
        queryMultiple: Callable[[List[str]], Awaitable[List[str]]],
        t: int
    ):
        self.queryMultiple = queryMultiple
        self.t = t / 1000  # convert milliseconds to seconds

        self.queue = []               # pending {"key", "future"} entries
        self.cooldown_active = False  # True while the batch loop is running
        self.tasks = set()            # strong references to background tasks

    def _spawn(self, coro: Coroutine) -> None:
        # The event loop keeps only weak references to tasks, so hold one here.
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    async def _run_query(self, batch: list) -> None:
        keys = [item["key"] for item in batch]

        results = await self.queryMultiple(keys)

        # results[i] corresponds to keys[i]
        for item, value in zip(batch, results):
            item["future"].set_result(value)

    async def _process_batch(self) -> None:
        while self.queue:
            current_batch = self.queue
            self.queue = []

            # Run the query in its own task so the cooldown is measured from
            # the start of the batch, not from the query's completion.
            self._spawn(self._run_query(current_batch))

            await asyncio.sleep(self.t)

        self.cooldown_active = False

    async def getValue(self, key: str) -> str:
        loop = asyncio.get_running_loop()
        future = loop.create_future()

        self.queue.append({
            "key": key,
            "future": future
        })

        if not self.cooldown_active:
            self.cooldown_active = True
            self._spawn(self._process_batch())

        return await future

The implementation revolves around a shared request queue and a background batch-processing coroutine.

The constructor stores the asynchronous query function and converts the throttle interval from milliseconds into seconds because Python's asyncio.sleep uses seconds. It also keeps a set of running background tasks, because the event loop holds only weak references to tasks.

The queue stores dictionaries containing:

  • The request key.
  • The Future object associated with that request.

The getValue method creates a new future for every incoming request. That future represents the eventual result returned to the caller.

If the batching system is currently idle, we immediately launch _process_batch() as a background task. Otherwise, the request simply waits inside the queue.

The _process_batch coroutine repeatedly processes queued requests:

  1. Snapshot the current queue.
  2. Clear the shared queue.
  3. Start queryMultiple for the snapshot in a separate task (_run_query), which resolves every stored future once the results arrive.
  4. Sleep for the throttle interval.

Because the loop does not await the query itself, the cooldown is measured from the moment a batch starts. A slow query therefore never delays the next batch, which matches the timelines in the worked examples.

If additional requests arrive during the sleep interval, they accumulate in the queue and become the next batch.

Once the queue becomes empty, the processor exits and marks the system as inactive.

Go Solution

package main

import (
	"time"
)

type pendingRequest struct {
	key     string
	resolve chan string
}

type QueryBatcher struct {
	queryMultiple func([]string) <-chan []string
	t             time.Duration

	queue          []pendingRequest
	cooldownActive bool
	mutex          chan struct{}
}

func Constructor(
	queryMultiple func([]string) <-chan []string,
	t int,
) QueryBatcher {
	mutex := make(chan struct{}, 1)
	mutex <- struct{}{}

	return QueryBatcher{
		queryMultiple: queryMultiple,
		t:             time.Duration(t) * time.Millisecond,
		queue:         []pendingRequest{},
		mutex:         mutex,
	}
}

func (qb *QueryBatcher) lock() {
	<-qb.mutex
}

func (qb *QueryBatcher) unlock() {
	qb.mutex <- struct{}{}
}

// runQuery executes one batch and delivers results[i] to the request
// that supplied keys[i].
func (qb *QueryBatcher) runQuery(batch []pendingRequest) {
	keys := make([]string, len(batch))

	for i, item := range batch {
		keys[i] = item.key
	}

	results := <-qb.queryMultiple(keys)

	for i, value := range results {
		batch[i].resolve <- value
	}
}

func (qb *QueryBatcher) processBatch() {
	for {
		qb.lock()

		if len(qb.queue) == 0 {
			qb.cooldownActive = false
			qb.unlock()
			return
		}

		currentBatch := qb.queue
		qb.queue = nil

		qb.unlock()

		// Run the query in its own goroutine so the cooldown is measured
		// from the start of the batch, not from the query's completion.
		go qb.runQuery(currentBatch)

		time.Sleep(qb.t)
	}
}

func (qb *QueryBatcher) GetValue(key string) <-chan string {
	// Buffered so that runQuery never blocks on a slow reader.
	resultChan := make(chan string, 1)

	qb.lock()

	qb.queue = append(qb.queue, pendingRequest{
		key:     key,
		resolve: resultChan,
	})

	if !qb.cooldownActive {
		qb.cooldownActive = true
		go qb.processBatch()
	}

	qb.unlock()

	return resultChan
}

The Go implementation uses goroutines and channels instead of Python futures and asyncio tasks.

Because multiple goroutines may access the shared queue simultaneously, synchronization is necessary. This implementation uses a channel-based mutex to protect shared state.

Each request owns a dedicated response channel. When a batch finishes executing, the corresponding result is written into that channel.

The batching logic follows the same queue-and-cooldown structure as the Python version. Each batch's query runs in its own goroutine (runQuery), so the cooldown is measured from the start of the batch and a slow query does not delay the next one.

Worked Examples

Example 1

t = 100
calls:
a at 10
b at 20
c at 30

Timeline

| Time | Event | Queue | Action |
| --- | --- | --- | --- |
| 10 | getValue(a) | [a] | Immediate batch executes |
| 10 | queryMultiple([a]) | [] | a resolves immediately |
| 20 | getValue(b) | [b] | queued |
| 30 | getValue(c) | [b, c] | queued |
| 110 | cooldown expires | [b, c] | execute batch |
| 110 | queryMultiple([b, c]) | [] | both resolve |

Final results:

| Key | Resolve Time |
| --- | --- |
| a | 10 |
| b | 110 |
| c | 110 |

Example 2

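The throttle interval and the calls are the same as in Example 1, but the asynchronous query itself now takes 100 ms.

Timeline

| Time | Event |
| --- | --- |
| 10 | batch [a] starts |
| 110 | a resolves |
| 110 | cooldown completes |
| 110 | batch [b, c] starts |
| 210 | b and c resolve |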

The throttling logic remains identical. Only the query duration changes.
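
Since the cooldown and the query run independently, each key resolves at its batch start time plus the query duration. The arithmetic can be sketched as follows; `resolve_times` is a hypothetical helper, and the batch list is copied from the Example 2 timeline.

```python
from typing import Dict, List, Tuple


def resolve_times(batches: List[Tuple[int, List[str]]], query_ms: int) -> Dict[str, int]:
    """Map each key to the time (in ms) at which its promise resolves."""
    return {key: start + query_ms for start, keys in batches for key in keys}


# (start_ms, keys) for each batch in Example 2.
example_2 = [(10, ["a"]), (110, ["b", "c"])]
print(resolve_times(example_2, 100))  # {'a': 110, 'b': 210, 'c': 210}
```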

Example 3

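Here t = 100 and queryMultiple takes 300 ms (batch [b, c, d] starts at 110 and resolves at 410).

calls:
a at 10
b at 20
c at 30
d at 40
e at 250
f at 300

Timeline

| Time | Event | Queue |
| --- | --- | --- |
| 10 | a arrives, batch [a] starts | [] |
| 20 | b queued | [b] |
| 30 | c queued | [b, c] |
| 40 | d queued | [b, c, d] |
| 110 | cooldown ends, batch [b, c, d] starts | [] |
| 210 | cooldown ends with an empty queue, system goes idle | [] |
| 250 | e arrives, batch [e] starts immediately | [] |
| 300 | f queued | [f] |
| 310 | a resolves | [f] |
| 350 | cooldown ends, batch [f] starts | [] |
| 410 | b, c, d resolve | [] |
| 550 | e resolves | [] |
| 650 | f resolves | [] |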

This example demonstrates that overlapping asynchronous execution and cooldown windows are handled independently.

Complexity Analysis

| Measure | Complexity | Explanation |
| --- | --- | --- |
| Time | O(n) | Each request enters and leaves the queue exactly once |
| Space | O(n) | Pending requests are stored in the queue |

The batching system processes each request a constant number of times:

  • Once when inserted into the queue.
  • Once during batch extraction.
  • Once during result resolution.

No request is revisited repeatedly, so the total work scales linearly with the number of requests.

The queue may temporarily hold all pending requests simultaneously, requiring linear auxiliary space.

Test Cases

import asyncio

async def run_tests():

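    # Example 1: b and c arrive during a's cooldown and share one batch
    batches = []

    async def qm1(keys):
        batches.append(list(keys))  # record every batched call
        return [k + "!" for k in keys]

    batcher = QueryBatcher(qm1, 100)

    a = asyncio.create_task(batcher.getValue("a"))
    await asyncio.sleep(0.01)  # stagger arrivals so that a runs alone
    b = asyncio.create_task(batcher.getValue("b"))
    await asyncio.sleep(0.01)
    c = asyncio.create_task(batcher.getValue("c"))

    assert await a == "a!"  # immediate batch
    assert await b == "b!"  # queued during the cooldown
    assert await c == "c!"  # queued during the cooldown
    assert batches == [["a"], ["b", "c"]]  # two calls to queryMultiple in total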

    # Zero throttle
    async def qm2(keys):
        return keys

    batcher = QueryBatcher(qm2, 0)

    x = await batcher.getValue("x")
    assert x == "x"  # no cooldown delay

    # Large async delay
    async def qm3(keys):
        await asyncio.sleep(0.2)
        return [k.upper() for k in keys]

    batcher = QueryBatcher(qm3, 100)

    tasks = [
        asyncio.create_task(batcher.getValue("a")),
        asyncio.create_task(batcher.getValue("b")),
        asyncio.create_task(batcher.getValue("c")),
    ]

    results = await asyncio.gather(*tasks)

    assert results == ["A", "B", "C"]  # ordering preserved

    # Single request
    async def qm4(keys):
        return ["only"]

    batcher = QueryBatcher(qm4, 100)

    result = await batcher.getValue("single")
    assert result == "only"  # simplest case

    # Multiple independent windows
    async def qm5(keys):
        return [k + "_done" for k in keys]

    batcher = QueryBatcher(qm5, 50)

    r1 = asyncio.create_task(batcher.getValue("a"))

    await asyncio.sleep(0.1)

    r2 = asyncio.create_task(batcher.getValue("b"))

    assert await r1 == "a_done"  # first window
    assert await r2 == "b_done"  # second window

asyncio.run(run_tests())

Test Summary

| Test | Why |
| --- | --- |
| Example 1 | Validates basic throttled batching |
| Zero throttle | Ensures immediate repeated execution works |
| Large async delay | Confirms batching still works with slow queries |
| Single request | Validates simplest possible input |
| Multiple windows | Ensures cooldown resets correctly |

Edge Cases

Zero Throttle Time

When t = 0, batches may execute back-to-back immediately. A buggy implementation could accidentally enter an infinite scheduling loop or incorrectly merge requests.

The implementation handles this naturally because asyncio.sleep(0) simply yields control briefly before continuing. Requests still process correctly in sequential batches.
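
The yielding behaviour of asyncio.sleep(0) can be observed in isolation. The snippet below is a small standalone sketch, unrelated to the QueryBatcher class, showing that a task scheduled earlier gets to run during the zero-length sleep.

```python
import asyncio
from typing import List


async def main() -> List[str]:
    order: List[str] = []

    async def worker() -> None:
        order.append("worker")

    task = asyncio.create_task(worker())  # scheduled, but not yet running
    order.append("before sleep")
    await asyncio.sleep(0)                # yield once; the worker runs now
    order.append("after sleep")
    await task
    return order


print(asyncio.run(main()))  # ['before sleep', 'worker', 'after sleep']
```

In the batcher, the same yield gives other coroutines a chance to run and enqueue requests before the next loop iteration checks the queue.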

Requests Arriving During Long Async Queries

A particularly tricky case occurs when queryMultiple itself takes significant time to finish. New requests may arrive while a previous query is still running.

The implementation correctly separates batching windows from query execution. Incoming requests are safely queued and processed during the next available batch cycle.

Empty Queue After Cooldown

After a cooldown expires, there may be no queued requests waiting. An incorrect implementation might continue spawning unnecessary timers forever.

The solution explicitly checks whether the queue is empty. If it is, the processing coroutine exits and the system becomes idle again.

Correct Result Mapping

Because batched queries return arrays of results, incorrect indexing could associate values with the wrong requests.

The implementation guarantees correctness by preserving request order:

keys[i] <-> results[i]

Each resolver receives the value at its matching index.