LeetCode 2756 - Query Batching
The problem asks us to design a throttled batching system for asynchronous queries. Instead of immediately sending every incoming request individually, we want to intelligently combine multiple requests together whenever possible.
Difficulty: 🔴 Hard
Solution
Problem Understanding
We are given a class called QueryBatcher that receives two constructor parameters:
- queryMultiple, an asynchronous function that accepts an array of string keys and returns an array of corresponding string values.
- t, a throttle interval in milliseconds.
The class exposes one method:
async getValue(key)
This method accepts a single key and eventually resolves to the corresponding value.
The important requirement is that calls to queryMultiple must obey a throttling rule:
- The very first request should execute immediately.
- After a batch is executed, another batch cannot execute until at least t milliseconds later.
- Any requests that arrive during the cooldown period should be queued together into the next batch.
This creates a behavior similar to rate-limited batching. Multiple pending requests are grouped into one call after the throttle interval expires.
The returned array from queryMultiple(keys) matches the input order exactly. This means:
keys[i] -> result[i]
This property is extremely important because it allows us to associate each pending request with its correct response.
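A minimal sketch of this index alignment, assuming a hypothetical helper named pair_results that is not part of the problem's API:

```python
from typing import Dict, List


def pair_results(keys: List[str], results: List[str]) -> Dict[str, str]:
    """Associate each key with the value returned at the same index."""
    if len(keys) != len(results):
        raise ValueError("queryMultiple must return one value per key")
    return dict(zip(keys, results))


# Illustrative values only; a real queryMultiple decides the actual strings.
mapping = pair_results(["a", "b", "c"], ["a!", "b!", "c!"])
```

The batcher relies on the same pairing, except that each key is matched to a pending promise rather than stored in a dictionary.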
The constraints are small:
- t <= 1000
- calls.length <= 10
However, this is fundamentally a systems design and asynchronous coordination problem rather than a performance-heavy algorithmic problem. The real challenge is correctly coordinating timers, promises, and batching behavior.
Several edge cases are important:
- When t = 0, batches may execute back-to-back immediately.
- A request may arrive while another asynchronous query is still executing.
- Multiple requests may arrive during the throttle window and must be grouped together.
- The asynchronous function itself may take time to resolve.
- Requests arriving after the cooldown but before the previous query finishes must still execute correctly.
The problem also guarantees:
- queryMultiple never rejects.
- Every key is unique.
- Returned arrays always align with input arrays.
Those guarantees simplify error handling and mapping logic.
Approaches
Brute Force Approach
The simplest implementation is to completely ignore batching and call queryMultiple([key]) every time getValue is called.
For example:
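```python
async def getValue(self, key: str) -> str:
    # One unthrottled query per request: correct values, but no batching.
    result = await self.queryMultiple([key])
    return result[0]
```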
This is obviously correct because every request gets its proper answer. However, it completely defeats the purpose of batching and throttling.
This implementation violates the requirement that consecutive calls to queryMultiple cannot occur within t milliseconds.
A slightly less naive brute force approach could repeatedly poll queued requests and manually sleep until the throttle interval passes. However, this still creates unnecessary overhead and awkward synchronization logic.
The core issue is that requests arriving during the cooldown period should be grouped together automatically. A better solution needs shared state and coordinated scheduling.
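To make the flaw concrete, the sketch below wraps the naive method in a hypothetical NaiveBatcher class and records every key list passed to the query function. The names NaiveBatcher, recording_query, and calls are illustrative assumptions, not part of the problem statement.

```python
import asyncio
from typing import List


class NaiveBatcher:
    """Hypothetical unbatched baseline: one queryMultiple call per key."""

    def __init__(self, queryMultiple, t: int):
        self.queryMultiple = queryMultiple
        self.t = t  # stored but never consulted, which is exactly the flaw

    async def getValue(self, key: str) -> str:
        result = await self.queryMultiple([key])
        return result[0]


calls: List[List[str]] = []  # every key list passed to the query function


async def recording_query(keys: List[str]) -> List[str]:
    calls.append(list(keys))
    return [k + "!" for k in keys]


async def demo() -> List[str]:
    batcher = NaiveBatcher(recording_query, 100)
    return await asyncio.gather(*(batcher.getValue(k) for k in "abc"))


values = asyncio.run(demo())
```

Every value comes back correctly, yet calls ends up holding three single-key lists issued back to back, so nothing is batched and the throttle interval is never respected.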
Key Insight
The important observation is that the batching behavior naturally divides execution into windows.
Whenever a batch executes:
- We immediately enter a cooldown period of length t.
- Any incoming requests during this cooldown are accumulated.
- Once the cooldown ends, all accumulated requests execute together.
This means we need:
- A queue of pending requests.
- A timer controlling when the next batch may run.
- A mechanism to resolve each request's individual promise after the batched query completes.
The optimal solution stores pending requests in memory and schedules exactly one future execution for the next batch.
Each queued request stores:
- The key.
- A resolver function for its promise.
When the batch executes:
- Extract all queued keys.
- Call queryMultiple(keys).
- Match results back to each resolver.
This creates an efficient event-driven batching system.
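The queue entry and the batch step can be sketched in isolation. This is a rough illustration under stated assumptions: Pending and flush are hypothetical names, an asyncio Future stands in for the resolver function, and the throttling timer is deliberately left out.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List


@dataclass
class Pending:
    key: str
    future: asyncio.Future  # plays the role of the resolver function


async def flush(
    queue: List[Pending],
    queryMultiple: Callable[[List[str]], Awaitable[List[str]]],
) -> None:
    """Send every queued key in one call, then resolve each waiter by index."""
    batch = queue[:]  # snapshot the pending requests
    queue.clear()     # later arrivals start a fresh batch
    if not batch:
        return
    results = await queryMultiple([p.key for p in batch])
    for pending, value in zip(batch, results):
        pending.future.set_result(value)


async def demo() -> List[str]:
    async def echo(keys: List[str]) -> List[str]:  # stand-in query function
        return [k.upper() for k in keys]

    loop = asyncio.get_running_loop()
    queue = [Pending(k, loop.create_future()) for k in ("x", "y")]
    waiters = [p.future for p in queue]
    await flush(queue, echo)
    return [await f for f in waiters]


resolved = asyncio.run(demo())
```

Clearing the queue before awaiting the query matters: any request that arrives while the query is in flight lands in the next batch instead of being lost or resolved twice.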
Approach Comparison
| Approach | Time Complexity | Space Complexity | Notes |
|---|---|---|---|
| Brute Force | O(n) queries | O(1) | Executes every request individually, violates throttling |
| Optimal | O(n) total | O(n) | Queues requests and executes batched asynchronous queries |
Algorithm Walkthrough
- Store the provided queryMultiple function and throttle duration t inside the class.
- Maintain a queue of pending requests. Each queue entry contains:
  - The request key.
  - The resolver function for the promise returned by getValue.
- Maintain a boolean or timer state indicating whether a cooldown window is currently active.
- When getValue(key) is called:
  - Create a new promise.
  - Store its resolver together with the key inside the queue.
- If no cooldown is currently active:
  - Immediately execute a batch using all currently queued requests.
  - Start a cooldown timer for t milliseconds.
- While the cooldown timer is active:
  - Additional requests are only appended to the queue.
  - No new batch is executed yet.
- When the cooldown timer expires:
  - If the queue is non-empty, execute another batch immediately.
  - Restart the cooldown timer again.
- During batch execution:
  - Extract all queued requests.
  - Build the array of keys.
  - Call await queryMultiple(keys).
  - Resolve each stored promise using the corresponding returned value.
- If the queue is empty when the cooldown expires:
  - Stop scheduling additional batches.
  - The system becomes idle until another request arrives.
Why it works
The algorithm maintains the invariant that at most one batch execution may start within any interval of length t.
Every request arriving during a cooldown window is appended into the pending queue. When the cooldown expires, the entire accumulated queue executes together in one batched query.
Because queryMultiple preserves input ordering, each queued resolver can safely receive the corresponding returned value at the same index.
Thus every request is eventually resolved correctly while preserving the throttling constraint.
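The invariant can be checked with a small simulation. The sketch below is not the batcher itself: batch_schedule and min_gap are hypothetical helpers that model only batch start times (in milliseconds), ignore how long queries take, and assume that an arrival landing exactly at the end of a cooldown is handled after the pending batch is flushed.

```python
from typing import List, Optional, Tuple

Batch = Tuple[int, List[str]]  # (start time in ms, keys sent together)


def batch_schedule(arrivals: List[Tuple[int, str]], t: int) -> List[Batch]:
    """Simulate when each batch starts, ignoring how long queries take."""
    batches: List[Batch] = []
    pending: List[str] = []
    cooldown_end: Optional[int] = None  # None until the first batch is sent

    for at, key in sorted(arrivals):
        # A cooldown that expired with queued keys flushes them first.
        if pending and cooldown_end is not None and cooldown_end <= at:
            batches.append((cooldown_end, pending))
            pending = []
            cooldown_end += t
        if cooldown_end is None or at >= cooldown_end:
            batches.append((at, [key]))  # idle: send immediately
            cooldown_end = at + t
        else:
            pending.append(key)  # cooling down: wait for the next batch

    if pending and cooldown_end is not None:
        batches.append((cooldown_end, pending))
    return batches


def min_gap(batches: List[Batch]) -> Optional[int]:
    """Smallest distance between consecutive batch starts, if any."""
    starts = [start for start, _ in batches]
    gaps = [later - earlier for earlier, later in zip(starts, starts[1:])]
    return min(gaps) if gaps else None


schedule = batch_schedule([(10, "a"), (20, "b"), (30, "c"), (250, "d")], 100)
```

With these inputs, b and c are merged into one batch at the end of the first cooldown, d is sent on arrival because the batcher is idle by then, and min_gap(schedule) never drops below t.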
Python Solution
```python
import asyncio
from typing import Awaitable, Callable, List


class QueryBatcher:
    def __init__(
        self,
        queryMultiple: Callable[[List[str]], Awaitable[List[str]]],
        t: int,
    ):
        self.queryMultiple = queryMultiple
        self.t = t / 1000  # convert milliseconds to seconds
        self.queue: List[dict] = []  # pending {"key", "future"} entries
        self.cooldown_active = False
        self._tasks = set()  # strong references to background tasks

    def _spawn(self, coro) -> None:
        # The event loop keeps only weak references to tasks, so hold them here.
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _run_query(self, batch: List[dict]) -> None:
        # Runs independently of the cooldown timer.
        keys = [item["key"] for item in batch]
        results = await self.queryMultiple(keys)
        for item, value in zip(batch, results):
            if not item["future"].done():  # skip callers that were cancelled
                item["future"].set_result(value)

    async def _process_batch(self) -> None:
        while self.queue:
            current_batch, self.queue = self.queue, []
            # Start the query without awaiting it, so the cooldown is
            # measured from the moment the batch is sent.
            self._spawn(self._run_query(current_batch))
            await asyncio.sleep(self.t)
        self.cooldown_active = False

    async def getValue(self, key: str) -> str:
        future = asyncio.get_running_loop().create_future()
        self.queue.append({"key": key, "future": future})
        if not self.cooldown_active:
            self.cooldown_active = True
            self._spawn(self._process_batch())
        return await future
```
The implementation revolves around a shared request queue and a background batch-processing coroutine.
The constructor stores the asynchronous query function and converts the throttle interval from milliseconds into seconds because Python's asyncio.sleep uses seconds.
The queue stores dictionaries containing:
- The request key.
- The Future object associated with that request.
The getValue method creates a new future for every incoming request. That future represents the eventual result returned to the caller.
If the batching system is currently idle, we immediately launch _process_batch() as a background task. Otherwise, the request simply waits inside the queue.
The _process_batch coroutine repeatedly processes queued requests:
- Snapshot the current queue.
- Clear the shared queue.
- Start _run_query for the snapshot as a separate task, which calls queryMultiple and resolves every stored future.
- Sleep for the throttle interval.
The query is deliberately not awaited inside the loop. Awaiting it there would start the cooldown only after the query returned, so with t = 100 and a 100 ms query the next batch would start 200 ms after the previous one instead of 100 ms.
If additional requests arrive during the sleep interval, they accumulate in the queue and become the next batch.
Once the queue becomes empty, the processor exits and marks the system as inactive.
Go Solution
```go
package main

import (
	"time"
)

// pendingRequest pairs a key with the channel its caller is waiting on.
type pendingRequest struct {
	key     string
	resolve chan string
}

type QueryBatcher struct {
	queryMultiple  func([]string) <-chan []string
	t              time.Duration
	queue          []pendingRequest
	cooldownActive bool
	mutex          chan struct{} // one-slot channel used as a mutex
}

// Constructor returns a pointer so that every caller shares the same state.
func Constructor(
	queryMultiple func([]string) <-chan []string,
	t int,
) *QueryBatcher {
	mutex := make(chan struct{}, 1)
	mutex <- struct{}{}
	return &QueryBatcher{
		queryMultiple: queryMultiple,
		t:             time.Duration(t) * time.Millisecond,
		queue:         []pendingRequest{},
		mutex:         mutex,
	}
}

func (qb *QueryBatcher) lock() {
	<-qb.mutex
}

func (qb *QueryBatcher) unlock() {
	qb.mutex <- struct{}{}
}

// runQuery sends one batch and delivers each result to its waiting caller.
func (qb *QueryBatcher) runQuery(batch []pendingRequest) {
	keys := make([]string, len(batch))
	for i, item := range batch {
		keys[i] = item.key
	}
	results := <-qb.queryMultiple(keys)
	for i, value := range results {
		batch[i].resolve <- value // buffered channel, so this never blocks
	}
}

func (qb *QueryBatcher) processBatch() {
	for {
		qb.lock()
		if len(qb.queue) == 0 {
			qb.cooldownActive = false
			qb.unlock()
			return
		}
		currentBatch := qb.queue
		qb.queue = nil
		qb.unlock()

		// Run the query in its own goroutine so the cooldown below is
		// measured from when the batch is sent, not from when it returns.
		go qb.runQuery(currentBatch)
		time.Sleep(qb.t)
	}
}

func (qb *QueryBatcher) GetValue(key string) <-chan string {
	resultChan := make(chan string, 1)
	qb.lock()
	qb.queue = append(qb.queue, pendingRequest{
		key:     key,
		resolve: resultChan,
	})
	if !qb.cooldownActive {
		qb.cooldownActive = true
		go qb.processBatch()
	}
	qb.unlock()
	return resultChan
}
```
The Go implementation uses goroutines and channels instead of Python futures and asyncio tasks.
Because multiple goroutines may access the shared queue simultaneously, synchronization is necessary. This implementation uses a channel-based mutex to protect shared state.
Each request owns a dedicated response channel. When a batch finishes executing, the corresponding result is written into that channel.
The batching logic follows the algorithm walkthrough: processBatch snapshots the queue, hands the batch to runQuery in its own goroutine, and sleeps for the cooldown, so a slow query does not delay the next batch.
Worked Examples
Example 1
Setup: t = 100, and queryMultiple resolves instantly.
Calls: a at 10 ms, b at 20 ms, c at 30 ms.
Timeline
| Time | Event | Queue | Action |
|---|---|---|---|
| 10 | getValue(a) | [a] | Immediate batch executes |
| 10 | queryMultiple([a]) | [] | a resolves immediately |
| 20 | getValue(b) | [b] | queued |
| 30 | getValue(c) | [b, c] | queued |
| 110 | cooldown expires | [b, c] | execute batch |
| 110 | queryMultiple([b, c]) | [] | both resolve |
Final results:
| Key | Resolve Time |
|---|---|
| a | 10 |
| b | 110 |
| c | 110 |
Example 2
The calls and t = 100 are the same as in Example 1, but the asynchronous query itself now takes 100 ms.
Timeline
| Time | Event |
|---|---|
| 10 | batch [a] starts |
| 110 | a resolves |
| 110 | cooldown completes |
| 110 | batch [b, c] starts |
| 210 | b and c resolve |
The throttling logic remains identical. Only the query duration changes.
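The relationship between the two timelines above can be written as a one-line calculation: a key resolves at its batch's start time plus the query duration. The helper name resolve_times is a hypothetical choice, and the batch start times are copied from the timelines rather than computed.

```python
from typing import Dict, List, Tuple


def resolve_times(
    batches: List[Tuple[int, List[str]]], query_ms: int
) -> Dict[str, int]:
    """Each key resolves when its batch's query finishes: start + duration."""
    return {key: start + query_ms for start, keys in batches for key in keys}


# Batch start times from the timelines above: [a] at 10 ms, [b, c] at 110 ms.
batches = [(10, ["a"]), (110, ["b", "c"])]
instant = resolve_times(batches, 0)   # the query returns immediately
slow = resolve_times(batches, 100)    # the query takes 100 ms
```

The batch start times are identical in both cases, which is the point: only the resolve times shift when the query gets slower.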
Example 3
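Assume t = 100 and a 100 ms query, with calls a at 10, b at 20, c at 30, d at 40, e at 250, and f at 300.
Timeline
| Time | Event | Queue |
|---|---|---|
| 10 | batch [a] starts | [] |
| 20 | b queued | [b] |
| 30 | c queued | [b, c] |
| 40 | d queued | [b, c, d] |
| 110 | a resolves, cooldown ends, batch [b, c, d] starts | [] |
| 210 | b, c, d resolve; cooldown ends with an empty queue, batcher goes idle | [] |
| 250 | e arrives while idle, batch [e] starts immediately | [] |
| 300 | f arrives during cooldown, queued | [f] |
| 350 | e resolves, cooldown ends, batch [f] starts | [] |
| 450 | f resolves | [] |
This example shows that the cooldown timer and in-flight queries are tracked independently, and that the batcher returns to idle when a cooldown expires with an empty queue.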
Complexity Analysis
| Measure | Complexity | Explanation |
|---|---|---|
| Time | O(n) | Each request enters and leaves the queue exactly once |
| Space | O(n) | Pending requests are stored in the queue |
The batching system processes each request a constant number of times:
- Once when inserted into the queue.
- Once during batch extraction.
- Once during result resolution.
No request is revisited repeatedly, so the total work scales linearly with the number of requests.
The queue may temporarily hold all pending requests simultaneously, requiring linear auxiliary space.
Test Cases
```python
import asyncio

# Assumes QueryBatcher from the Python solution is defined in this module.


async def run_tests():
    # Example 1
    async def qm1(keys):
        return [k + "!" for k in keys]

    batcher = QueryBatcher(qm1, 100)
    a = asyncio.create_task(batcher.getValue("a"))
    b = asyncio.create_task(batcher.getValue("b"))
    c = asyncio.create_task(batcher.getValue("c"))
    # All three calls are enqueued in the same event-loop tick,
    # so they are sent together in the first batch.
    assert await a == "a!"
    assert await b == "b!"
    assert await c == "c!"

    # Zero throttle
    async def qm2(keys):
        return keys

    batcher = QueryBatcher(qm2, 0)
    x = await batcher.getValue("x")
    assert x == "x"  # no cooldown delay

    # Large async delay
    async def qm3(keys):
        await asyncio.sleep(0.2)
        return [k.upper() for k in keys]

    batcher = QueryBatcher(qm3, 100)
    tasks = [
        asyncio.create_task(batcher.getValue("a")),
        asyncio.create_task(batcher.getValue("b")),
        asyncio.create_task(batcher.getValue("c")),
    ]
    results = await asyncio.gather(*tasks)
    assert results == ["A", "B", "C"]  # ordering preserved

    # Single request
    async def qm4(keys):
        return ["only"]

    batcher = QueryBatcher(qm4, 100)
    result = await batcher.getValue("single")
    assert result == "only"  # simplest case

    # Multiple independent windows
    async def qm5(keys):
        return [k + "_done" for k in keys]

    batcher = QueryBatcher(qm5, 50)
    r1 = asyncio.create_task(batcher.getValue("a"))
    await asyncio.sleep(0.1)  # longer than the 50 ms cooldown
    r2 = asyncio.create_task(batcher.getValue("b"))
    assert await r1 == "a_done"  # first window
    assert await r2 == "b_done"  # second window


asyncio.run(run_tests())
```
Test Summary
| Test | Why |
|---|---|
| Example 1 | Validates basic throttled batching |
| Zero throttle | Ensures immediate repeated execution works |
| Large async delay | Confirms batching still works with slow queries |
| Single request | Validates simplest possible input |
| Multiple windows | Ensures cooldown resets correctly |
Edge Cases
Zero Throttle Time
When t = 0, batches may execute back-to-back immediately. A buggy implementation could accidentally enter an infinite scheduling loop or incorrectly merge requests.
The implementation handles this naturally because asyncio.sleep(0) simply yields control briefly before continuing. Requests still process correctly in sequential batches.
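A small standalone sketch of that yielding behavior, unrelated to the batcher's own classes: two hypothetical workers each record a step and then call asyncio.sleep(0).

```python
import asyncio
from typing import List

order: List[str] = []  # the sequence in which the workers ran


async def worker(name: str) -> None:
    for step in range(2):
        order.append(f"{name}{step}")
        await asyncio.sleep(0)  # zero delay, but still a scheduling point


async def main() -> None:
    await asyncio.gather(worker("a"), worker("b"))


asyncio.run(main())
```

The two workers take turns instead of one running to completion first, which is why a batch loop that sleeps for zero seconds still lets queued getValue calls run between batches.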
Requests Arriving During Long Async Queries
A particularly tricky case occurs when queryMultiple itself takes significant time to finish. New requests may arrive while a previous query is still running.
The implementation correctly separates batching windows from query execution. Incoming requests are safely queued and processed during the next available batch cycle.
Empty Queue After Cooldown
After a cooldown expires, there may be no queued requests waiting. An incorrect implementation might continue spawning unnecessary timers forever.
The solution explicitly checks whether the queue is empty. If it is, the processing coroutine exits and the system becomes idle again.
Correct Result Mapping
Because batched queries return arrays of results, incorrect indexing could associate values with the wrong requests.
The implementation guarantees correctness by preserving request order:
keys[i] <-> results[i]
Each resolver receives the value at its matching index.