LeetCode 1882 - Process Tasks Using Servers
This problem asks us to simulate a task scheduling system with two priority rules. We are given a list of servers and a list of tasks. Each server has a weight, and each task has a processing duration. At second j, task j becomes available and enters a queue.
Difficulty: 🟡 Medium
Topics: Array, Heap (Priority Queue)
Solution
Problem Understanding
This problem asks us to simulate a task scheduling system with two priority rules. We are given a list of servers and a list of tasks. Each server has a weight, and each task has a processing duration.
At second j, task j becomes available and enters a queue. Tasks must always be assigned in arrival order. Whenever there is at least one free server and at least one waiting task, we assign the earliest waiting task to the best available server.
The definition of the "best" server follows two priorities:
- Smaller server weight is better.
- If weights are equal, smaller server index is better.
Once a server starts processing a task, it becomes unavailable until the task finishes. If no servers are free when tasks are waiting, time effectively advances until at least one server becomes free again.
The goal is to return an array where ans[j] is the index of the server that processes task j.
The input arrays represent:
servers[i], the weight of serveritasks[j], the amount of time taskjtakes to complete
The constraints are large:
- Up to
2 * 10^5servers - Up to
2 * 10^5tasks
This immediately tells us that any quadratic solution will time out. We need something close to O(m log n) or O((m+n) log n).
Several edge cases are important:
- Multiple servers can become free at the same time.
- Multiple servers can have identical weights.
- Tasks can pile up when all servers are busy.
- There may be far more tasks than servers.
- Servers may stay busy for long periods while tasks continue arriving every second.
A naive simulation that checks every server repeatedly would be far too slow.
Approaches
Brute Force Approach
A straightforward simulation would process tasks one second at a time.
For each arriving task:
- Scan all servers to find which ones are free.
- Among free servers, select the one with minimum
(weight, index). - If no server is free, advance time until one becomes available.
- Repeat for every task.
This approach is correct because it directly follows the problem rules. However, it is extremely inefficient.
Each task assignment could require scanning all n servers, leading to O(m * n) complexity. With both values reaching 2 * 10^5, this could require up to 4 * 10^10 operations, which is completely impractical.
Optimal Approach
The key observation is that we repeatedly need two efficient operations:
- Find the best currently available server.
- Find which busy server becomes free next.
Both operations are classic priority queue problems.
We use two heaps:
- An available heap, ordered by
(weight, index) - A busy heap, ordered by
(free_time, weight, index)
The available heap always gives us the best server immediately.
The busy heap lets us efficiently determine which servers finish next.
Instead of simulating every second, we jump directly to important moments:
- Task arrival times
- Server completion times
This avoids unnecessary work and keeps the solution efficient.
| Approach | Time Complexity | Space Complexity | Notes |
|---|---|---|---|
| Brute Force | O(m * n) | O(n) | Scans all servers repeatedly |
| Optimal | O((m + n) log n) | O(n) | Uses two priority queues for efficient scheduling |
Algorithm Walkthrough
- Initialize an available server heap.
Every server starts free, so we insert all servers into a min heap ordered by:
- server weight
- server index
Each heap entry looks like:
(weight, index)
- Initialize a busy server heap.
This heap stores servers currently processing tasks. It is ordered by:
- time when server becomes free
- server weight
- server index
Each heap entry looks like:
(free_time, weight, index)
- Process tasks in order.
Task j arrives at time j.
4. Before assigning the current task, free all completed servers.
While the busy heap contains servers whose free_time <= current_time, remove them from the busy heap and push them back into the available heap.
This ensures the available heap always reflects the servers currently free. 5. Handle the case where no server is available.
If the available heap is empty, we cannot process the task immediately.
In this case:
- Take the server with the earliest
free_timefrom the busy heap. - Advance the current time to that
free_time. - Free every server that becomes available at this same moment.
- Assign the task.
Pop the best server from the available heap.
Record its index in the answer array. 7. Mark the server busy again.
The task finishes at:
current_time + task_duration
Push the server into the busy heap with this new completion time. 8. Continue until all tasks are assigned.
Why it works
The algorithm maintains two invariants:
- The available heap always contains exactly the currently free servers, ordered by scheduling priority.
- The busy heap always contains exactly the busy servers, ordered by next completion time.
Whenever a task must be assigned, the algorithm either:
- Uses the best currently free server, or
- Advances time to the earliest possible moment a server becomes free.
Because tasks are processed strictly in insertion order and servers are always selected using the required priority rules, the algorithm exactly matches the scheduling behavior described in the problem.
Python Solution
from typing import List
import heapq
class Solution:
def assignTasks(self, servers: List[int], tasks: List[int]) -> List[int]:
available = []
busy = []
# (weight, index)
for index, weight in enumerate(servers):
heapq.heappush(available, (weight, index))
result = [0] * len(tasks)
for current_time, task_duration in enumerate(tasks):
# Free all servers that have completed by current_time
while busy and busy[0][0] <= current_time:
free_time, weight, index = heapq.heappop(busy)
heapq.heappush(available, (weight, index))
# If no server is available, jump time forward
if not available:
current_time = busy[0][0]
while busy and busy[0][0] <= current_time:
free_time, weight, index = heapq.heappop(busy)
heapq.heappush(available, (weight, index))
# Assign task to best available server
weight, index = heapq.heappop(available)
result[current_time if False else len(result) - len(tasks) + current_time] = index
The previous line is incorrect because current_time may jump forward when servers are busy. We should instead store the task index separately. Here is the corrected complete solution:
from typing import List
import heapq
class Solution:
def assignTasks(self, servers: List[int], tasks: List[int]) -> List[int]:
available = []
busy = []
# Available servers:
# (weight, index)
for index, weight in enumerate(servers):
heapq.heappush(available, (weight, index))
result = [0] * len(tasks)
for task_index, task_duration in enumerate(tasks):
current_time = task_index
# Move completed servers back to available heap
while busy and busy[0][0] <= current_time:
free_time, weight, index = heapq.heappop(busy)
heapq.heappush(available, (weight, index))
# If no servers are free, jump to next available time
if not available:
current_time = busy[0][0]
while busy and busy[0][0] <= current_time:
free_time, weight, index = heapq.heappop(busy)
heapq.heappush(available, (weight, index))
# Get best available server
weight, index = heapq.heappop(available)
result[task_index] = index
# Mark server busy
finish_time = current_time + task_duration
heapq.heappush(busy, (finish_time, weight, index))
return result
The implementation directly mirrors the algorithm.
The available heap stores free servers ordered by (weight, index). This guarantees that popping from the heap always gives the correct server according to the problem rules.
The busy heap stores servers currently processing tasks. Its ordering by finish_time allows us to efficiently determine which server becomes free next.
For every task, we first release all servers whose work has completed. Then we either immediately assign the task or advance time to the next server completion event.
The answer array records the assigned server index for each task in arrival order.
Go Solution
package main
import (
"container/heap"
)
type AvailableServer struct {
weight int
index int
}
type BusyServer struct {
freeTime int
weight int
index int
}
type AvailableHeap []AvailableServer
func (h AvailableHeap) Len() int {
return len(h)
}
func (h AvailableHeap) Less(i, j int) bool {
if h[i].weight == h[j].weight {
return h[i].index < h[j].index
}
return h[i].weight < h[j].weight
}
func (h AvailableHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h *AvailableHeap) Push(x interface{}) {
*h = append(*h, x.(AvailableServer))
}
func (h *AvailableHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[:n-1]
return item
}
type BusyHeap []BusyServer
func (h BusyHeap) Len() int {
return len(h)
}
func (h BusyHeap) Less(i, j int) bool {
if h[i].freeTime == h[j].freeTime {
if h[i].weight == h[j].weight {
return h[i].index < h[j].index
}
return h[i].weight < h[j].weight
}
return h[i].freeTime < h[j].freeTime
}
func (h BusyHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h *BusyHeap) Push(x interface{}) {
*h = append(*h, x.(BusyServer))
}
func (h *BusyHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[:n-1]
return item
}
func assignTasks(servers []int, tasks []int) []int {
available := &AvailableHeap{}
busy := &BusyHeap{}
heap.Init(available)
heap.Init(busy)
for index, weight := range servers {
heap.Push(available, AvailableServer{
weight: weight,
index: index,
})
}
result := make([]int, len(tasks))
for taskIndex, duration := range tasks {
currentTime := taskIndex
// Free completed servers
for busy.Len() > 0 && (*busy)[0].freeTime <= currentTime {
server := heap.Pop(busy).(BusyServer)
heap.Push(available, AvailableServer{
weight: server.weight,
index: server.index,
})
}
// No available server
if available.Len() == 0 {
currentTime = (*busy)[0].freeTime
for busy.Len() > 0 && (*busy)[0].freeTime <= currentTime {
server := heap.Pop(busy).(BusyServer)
heap.Push(available, AvailableServer{
weight: server.weight,
index: server.index,
})
}
}
server := heap.Pop(available).(AvailableServer)
result[taskIndex] = server.index
heap.Push(busy, BusyServer{
freeTime: currentTime + duration,
weight: server.weight,
index: server.index,
})
}
return result
}
The Go implementation follows the same logic as the Python solution but requires explicit heap types implementing the heap.Interface.
Go's container/heap package does not provide built in min heaps for tuples, so we define custom heap structures and comparison logic manually.
The solution uses int safely because the maximum possible time is within Go's integer range.
Worked Examples
Example 1
Input:
servers = [3,3,2]
tasks = [1,2,3,2,1,2]
Initial available heap:
[(2,2), (3,0), (3,1)]
Initial busy heap:
[]
| Time | Task | Available Servers | Busy Servers | Assigned Server |
|---|---|---|---|---|
| 0 | 1 | (2,2),(3,0),(3,1) | [] | 2 |
| 1 | 2 | (3,0),(3,1) | server 2 finishes | 2 |
| 2 | 3 | (3,0),(3,1) | server 2 busy until 3 | 0 |
| 3 | 2 | (3,1) plus server 2 freed | server 0 busy until 5 | 2 |
| 4 | 1 | (3,1) | servers 0 and 2 busy | 1 |
| 5 | 2 | all free again | [] | 2 |
Final answer:
[2,2,0,2,1,2]
Example 2
Input:
servers = [5,1,4,3,2]
tasks = [2,1,2,4,5,2,1]
Initial available heap:
[(1,1),(2,4),(4,2),(5,0),(3,3)]
| Time | Task | Assigned Server | Finish Time |
|---|---|---|---|
| 0 | 2 | 1 | 2 |
| 1 | 1 | 4 | 2 |
| 2 | 2 | 1 | 4 |
| 3 | 4 | 4 | 7 |
| 4 | 5 | 1 | 9 |
| 5 | 2 | 3 | 7 |
| 6 | 1 | 2 | 7 |
Final answer:
[1,4,1,4,1,3,2]
Complexity Analysis
| Measure | Complexity | Explanation |
|---|---|---|
| Time | O((m + n) log n) | Every heap operation costs O(log n) |
| Space | O(n) | Both heaps together store at most n servers |
Each server is pushed and popped from heaps multiple times, but every heap operation costs only O(log n).
For every task:
- We may move servers between heaps.
- We assign exactly one server.
Since there are m tasks and heap sizes never exceed n, the total complexity is O((m + n) log n).
Test Cases
from typing import List
class Solution:
def assignTasks(self, servers: List[int], tasks: List[int]) -> List[int]:
import heapq
available = []
busy = []
for i, w in enumerate(servers):
heapq.heappush(available, (w, i))
result = [0] * len(tasks)
for task_index, duration in enumerate(tasks):
current_time = task_index
while busy and busy[0][0] <= current_time:
free_time, weight, index = heapq.heappop(busy)
heapq.heappush(available, (weight, index))
if not available:
current_time = busy[0][0]
while busy and busy[0][0] <= current_time:
free_time, weight, index = heapq.heappop(busy)
heapq.heappush(available, (weight, index))
weight, index = heapq.heappop(available)
result[task_index] = index
heapq.heappush(
busy,
(current_time + duration, weight, index)
)
return result
s = Solution()
# Provided example 1
assert s.assignTasks(
[3,3,2],
[1,2,3,2,1,2]
) == [2,2,0,2,1,2]
# Provided example 2
assert s.assignTasks(
[5,1,4,3,2],
[2,1,2,4,5,2,1]
) == [1,4,1,4,1,3,2]
# Single server handles all tasks sequentially
assert s.assignTasks(
[1],
[5,2,3]
) == [0,0,0]
# Multiple identical servers
assert s.assignTasks(
[1,1,1],
[1,1,1,1]
) == [0,1,2,0]
# Tasks longer than arrival spacing
assert s.assignTasks(
[2,1],
[10,10,10]
) == [1,0,1]
# Servers free simultaneously
assert s.assignTasks(
[1,2],
[2,1,1]
) == [0,1,0]
# Large task durations
assert s.assignTasks(
[10,20],
[100000,100000,100000]
) == [0,1,0]
# Tie by weight resolved by index
assert s.assignTasks(
[5,5,5],
[1,1,1]
) == [0,1,2]
| Test | Why |
|---|---|
| Example 1 | Validates standard scheduling behavior |
| Example 2 | Validates simultaneous server releases |
| Single server | Ensures sequential processing works |
| Identical servers | Tests index tie breaking |
| Long running tasks | Ensures waiting queue behavior works |
| Simultaneous freeing | Verifies multiple servers released together |
| Large durations | Tests large time jumps |
| Equal weights | Confirms smallest index selection |
Edge Cases
One important edge case occurs when all servers are busy for a long time. In this situation, tasks continue arriving every second and build up in the queue. A naive implementation might simulate each second individually, causing severe performance issues. This implementation avoids that problem by jumping directly to the earliest server completion time using the busy heap.
Another tricky case occurs when multiple servers become free at exactly the same moment. If we only free one server before assigning tasks, the scheduling order may become incorrect. The implementation handles this correctly by releasing all servers whose free_time <= current_time before making assignments.
Tie breaking between equal server weights is another common source of bugs. The problem requires choosing the smallest index when weights are equal. Both heaps explicitly include the server index in their ordering rules, ensuring deterministic and correct behavior.
A final subtle case occurs when the current task time must jump forward because no servers are available. The algorithm correctly separates task_index from current_time, ensuring the answer array remains indexed by task order rather than simulated clock time.