LeetCode 1882 - Process Tasks Using Servers

Difficulty: 🟡 Medium
Topics: Array, Heap (Priority Queue)

Solution

Problem Understanding

This problem asks us to simulate a task scheduling system with two priority rules. We are given a list of servers and a list of tasks. Each server has a weight, and each task has a processing duration.

At second j, task j becomes available and enters a queue. Tasks must always be assigned in arrival order. Whenever there is at least one free server and at least one waiting task, we assign the earliest waiting task to the best available server.

The definition of the "best" server follows two priorities:

  1. Smaller server weight is better.
  2. If weights are equal, smaller server index is better.

Once a server starts processing a task, it becomes unavailable until the task finishes. If no servers are free when tasks are waiting, time effectively advances until at least one server becomes free again.

The goal is to return an array where ans[j] is the index of the server that processes task j.

The input arrays represent:

  • servers[i], the weight of server i
  • tasks[j], the amount of time task j takes to complete

The constraints are large:

  • Up to 2 * 10^5 servers
  • Up to 2 * 10^5 tasks

This immediately tells us that any quadratic solution will time out. With n servers and m tasks, we need something close to O(m log n) or O((m + n) log n).

Several edge cases are important:

  • Multiple servers can become free at the same time.
  • Multiple servers can have identical weights.
  • Tasks can pile up when all servers are busy.
  • There may be far more tasks than servers.
  • Servers may stay busy for long periods while tasks continue arriving every second.

A naive simulation that checks every server repeatedly would be far too slow.

Approaches

Brute Force Approach

A straightforward simulation would process tasks one second at a time.

For each arriving task:

  1. Scan all servers to find which ones are free.
  2. Among free servers, select the one with minimum (weight, index).
  3. If no server is free, advance time until one becomes available.
  4. Repeat for every task.

This approach is correct because it directly follows the problem rules. However, it is extremely inefficient.

Each task assignment could require scanning all n servers, leading to O(m * n) complexity. With both values reaching 2 * 10^5, this could require up to 4 * 10^10 operations, which is completely impractical.
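For reference, the brute-force simulation described above might look like the following sketch. The function name assign_tasks_bruteforce and the free_at list are illustrative choices, not part of the problem statement. It is far too slow for the full constraints, but it can serve as a simple oracle on small inputs.

```python
from typing import List


def assign_tasks_bruteforce(servers: List[int], tasks: List[int]) -> List[int]:
    """O(m * n) reference simulation; all names here are illustrative."""
    # free_at[i] is the time at which server i becomes free again.
    free_at = [0] * len(servers)
    result = []
    current_time = 0

    for task_index, duration in enumerate(tasks):
        # A task cannot start before it arrives, and the clock never goes back.
        current_time = max(current_time, task_index)

        # If every server is busy, wait for the earliest completion.
        earliest = min(free_at)
        if earliest > current_time:
            current_time = earliest

        # Linear scan for the free server with the smallest (weight, index).
        best = min(
            (i for i in range(len(servers)) if free_at[i] <= current_time),
            key=lambda i: (servers[i], i),
        )
        free_at[best] = current_time + duration
        result.append(best)

    return result


print(assign_tasks_bruteforce([3, 3, 2], [1, 2, 3, 2, 1, 2]))
```

The linear scan inside the loop is exactly the O(n) cost per task that the heap-based approach removes.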

Optimal Approach

The key observation is that we repeatedly need two efficient operations:

  1. Find the best currently available server.
  2. Find which busy server becomes free next.

Both operations are classic priority queue problems.

We use two heaps:

  • An available heap, ordered by (weight, index)
  • A busy heap, ordered by (free_time, weight, index)

The available heap always gives us the best server immediately.

The busy heap lets us efficiently determine which servers finish next.

Instead of simulating every second, we jump directly to important moments:

  • Task arrival times
  • Server completion times

This avoids unnecessary work and keeps the solution efficient.
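Both heap orderings fall out of Python's lexicographic tuple comparison, so no custom comparator is needed. The small sketch below uses made-up server values purely to illustrate the ordering.

```python
import heapq

# Hypothetical free servers as (weight, index) pairs; the values are made up.
available = [(3, 0), (3, 1), (2, 2)]
heapq.heapify(available)

# Hypothetical busy servers as (free_time, weight, index) triples.
busy = [(5, 3, 0), (3, 2, 2), (5, 3, 1)]
heapq.heapify(busy)

best_server = heapq.heappop(available)   # smallest weight wins
next_best = heapq.heappop(available)     # equal weights: smaller index wins
next_to_finish = heapq.heappop(busy)     # smallest free_time wins

print(best_server, next_best, next_to_finish)
```

Because tuples compare element by element, ties on the first field are broken by the second field automatically, which is what the problem's priority rules require.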

| Approach | Time Complexity | Space Complexity | Notes |
|---|---|---|---|
| Brute Force | O(m * n) | O(n) | Scans all servers repeatedly |
| Optimal | O((m + n) log n) | O(n) | Uses two priority queues for efficient scheduling |

Algorithm Walkthrough

  1. Initialize an available server heap.

Every server starts free, so we insert all servers into a min heap ordered by:

  • server weight
  • server index

Each heap entry looks like:

(weight, index)

  2. Initialize a busy server heap.

This heap stores servers currently processing tasks. It is ordered by:

  • time when server becomes free
  • server weight
  • server index

Each heap entry looks like:

(free_time, weight, index)

  3. Process tasks in order.

Task j arrives at time j. The simulated clock for task j is the later of j and the time at which the previous task was assigned, because a task can start neither before it arrives nor before an earlier task in the queue.

  4. Before assigning the current task, free all completed servers.

While the busy heap contains servers whose free_time <= current_time, remove them from the busy heap and push them back into the available heap.

This ensures the available heap always reflects the servers currently free.

  5. Handle the case where no server is available.

If the available heap is empty, we cannot process the task immediately.

In this case:

  • Take the server with the earliest free_time from the busy heap.
  • Advance the current time to that free_time.
  • Free every server that becomes available at this same moment.

  6. Assign the task.

Pop the best server from the available heap.

Record its index in the answer array.

  7. Mark the server busy again.

The task finishes at:

current_time + task_duration

Push the server into the busy heap with this new completion time.

  8. Continue until all tasks are assigned.

Why it works

The algorithm maintains two invariants:

  • The available heap always contains exactly the currently free servers, ordered by scheduling priority.
  • The busy heap always contains exactly the busy servers, ordered by next completion time.

Whenever a task must be assigned, the algorithm either:

  • Uses the best currently free server, or
  • Advances time to the earliest possible moment a server becomes free.

Because tasks are processed strictly in arrival order and servers are always selected using the required priority rules, the algorithm exactly matches the scheduling behavior described in the problem.

Python Solution

from typing import List
import heapq

class Solution:
    def assignTasks(self, servers: List[int], tasks: List[int]) -> List[int]:
        available = []
        busy = []

        # Available servers:
        # (weight, index)
        for index, weight in enumerate(servers):
            heapq.heappush(available, (weight, index))

        result = [0] * len(tasks)

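        current_time = 0

        for task_index, task_duration in enumerate(tasks):
            # The clock never moves backwards: if an earlier task had to wait
            # until current_time, this task cannot start before then either.
            current_time = max(current_time, task_index)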

            # Move completed servers back to available heap
            while busy and busy[0][0] <= current_time:
                free_time, weight, index = heapq.heappop(busy)
                heapq.heappush(available, (weight, index))

            # If no servers are free, jump to next available time
            if not available:
                current_time = busy[0][0]

                while busy and busy[0][0] <= current_time:
                    free_time, weight, index = heapq.heappop(busy)
                    heapq.heappush(available, (weight, index))

            # Get best available server
            weight, index = heapq.heappop(available)

            result[task_index] = index

            # Mark server busy
            finish_time = current_time + task_duration
            heapq.heappush(busy, (finish_time, weight, index))

        return result

The implementation directly mirrors the algorithm.

The available heap stores free servers ordered by (weight, index). This guarantees that popping from the heap always gives the correct server according to the problem rules.

The busy heap stores servers currently processing tasks. Its ordering by finish_time allows us to efficiently determine which server becomes free next.

For every task, we first release all servers whose work has completed. Then we either immediately assign the task or advance time to the next server completion event.

The answer array records the assigned server index for each task in arrival order.

Go Solution

package main

import (
	"container/heap"
)

type AvailableServer struct {
	weight int
	index  int
}

type BusyServer struct {
	freeTime int
	weight   int
	index    int
}

type AvailableHeap []AvailableServer

func (h AvailableHeap) Len() int {
	return len(h)
}

func (h AvailableHeap) Less(i, j int) bool {
	if h[i].weight == h[j].weight {
		return h[i].index < h[j].index
	}
	return h[i].weight < h[j].weight
}

func (h AvailableHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

func (h *AvailableHeap) Push(x interface{}) {
	*h = append(*h, x.(AvailableServer))
}

func (h *AvailableHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

type BusyHeap []BusyServer

func (h BusyHeap) Len() int {
	return len(h)
}

func (h BusyHeap) Less(i, j int) bool {
	if h[i].freeTime == h[j].freeTime {
		if h[i].weight == h[j].weight {
			return h[i].index < h[j].index
		}
		return h[i].weight < h[j].weight
	}
	return h[i].freeTime < h[j].freeTime
}

func (h BusyHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

func (h *BusyHeap) Push(x interface{}) {
	*h = append(*h, x.(BusyServer))
}

func (h *BusyHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

func assignTasks(servers []int, tasks []int) []int {
	available := &AvailableHeap{}
	busy := &BusyHeap{}

	heap.Init(available)
	heap.Init(busy)

	for index, weight := range servers {
		heap.Push(available, AvailableServer{
			weight: weight,
			index:  index,
		})
	}

	result := make([]int, len(tasks))

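	currentTime := 0

	for taskIndex, duration := range tasks {
		// The clock never moves backwards: if an earlier task had to wait
		// until currentTime, this task cannot start before then either.
		if taskIndex > currentTime {
			currentTime = taskIndex
		}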

		// Free completed servers
		for busy.Len() > 0 && (*busy)[0].freeTime <= currentTime {
			server := heap.Pop(busy).(BusyServer)

			heap.Push(available, AvailableServer{
				weight: server.weight,
				index:  server.index,
			})
		}

		// No available server
		if available.Len() == 0 {
			currentTime = (*busy)[0].freeTime

			for busy.Len() > 0 && (*busy)[0].freeTime <= currentTime {
				server := heap.Pop(busy).(BusyServer)

				heap.Push(available, AvailableServer{
					weight: server.weight,
					index:  server.index,
				})
			}
		}

		server := heap.Pop(available).(AvailableServer)

		result[taskIndex] = server.index

		heap.Push(busy, BusyServer{
			freeTime: currentTime + duration,
			weight:   server.weight,
			index:    server.index,
		})
	}

	return result
}

The Go implementation follows the same logic as the Python solution but requires explicit heap types implementing the heap.Interface.

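Go's container/heap package does not provide built-in min heaps for tuples, so we define custom heap structures and comparison logic manually.

Finish times can exceed the 32-bit range (up to 2 * 10^5 tasks of length up to 2 * 10^5 queued on a single server gives roughly 4 * 10^10), so the solution relies on int being 64 bits wide, which holds on 64-bit platforms.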

Worked Examples

Example 1

Input:

servers = [3,3,2]
tasks = [1,2,3,2,1,2]

Initial available heap:

[(2,2), (3,0), (3,1)]

Initial busy heap:

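[]

| Time | Task Duration | Available Servers (after releases) | Busy Servers | Assigned Server |
|---|---|---|---|---|
| 0 | 1 | (2,2), (3,0), (3,1) | none | 2 |
| 1 | 2 | (2,2), (3,0), (3,1) | none (server 2 finished at time 1) | 2 |
| 2 | 3 | (3,0), (3,1) | server 2 until 3 | 0 |
| 3 | 2 | (2,2), (3,1) | server 0 until 5 | 2 |
| 4 | 1 | (3,1) | servers 0 and 2 until 5 | 1 |
| 5 | 2 | (2,2), (3,0), (3,1) | none | 2 |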

Final answer:

[2,2,0,2,1,2]

Example 2

Input:

servers = [5,1,4,3,2]
tasks = [2,1,2,4,5,2,1]

Initial available heap:

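[(1,1),(2,4),(4,2),(5,0),(3,3)]

| Time | Task Duration | Assigned Server | Finish Time |
|---|---|---|---|
| 0 | 2 | 1 | 2 |
| 1 | 1 | 4 | 2 |
| 2 | 2 | 1 | 4 |
| 3 | 4 | 4 | 7 |
| 4 | 5 | 1 | 9 |
| 5 | 2 | 3 | 7 |
| 6 | 1 | 2 | 7 |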

Final answer:

[1,4,1,4,1,3,2]
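The rows of these worked examples can be reproduced with a small tracing variant of the two-heap simulation. This is a sketch only: the name trace_assignments and the row layout (task_index, start_time, server_index, finish_time) are assumptions made for illustration, and the clock is carried forward with max so that a task never starts before an earlier one.

```python
import heapq
from typing import List, Tuple


def trace_assignments(
    servers: List[int], tasks: List[int]
) -> List[Tuple[int, int, int, int]]:
    """Return (task_index, start_time, server_index, finish_time) per task."""
    available = [(weight, index) for index, weight in enumerate(servers)]
    heapq.heapify(available)
    busy = []  # entries are (free_time, weight, index)
    rows = []
    current_time = 0

    for task_index, duration in enumerate(tasks):
        # Start no earlier than the arrival time and never move the clock back.
        current_time = max(current_time, task_index)
        if not available:
            # Every server is busy: jump to the earliest completion.
            current_time = max(current_time, busy[0][0])
        # Release every server that has finished by now.
        while busy and busy[0][0] <= current_time:
            _, weight, index = heapq.heappop(busy)
            heapq.heappush(available, (weight, index))

        weight, index = heapq.heappop(available)
        finish_time = current_time + duration
        heapq.heappush(busy, (finish_time, weight, index))
        rows.append((task_index, current_time, index, finish_time))

    return rows


for row in trace_assignments([5, 1, 4, 3, 2], [2, 1, 2, 4, 5, 2, 1]):
    print(row)
```

In Example 2 every task starts at its arrival time, so start_time equals task_index in each row; the two values diverge only when tasks have to queue.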

Complexity Analysis

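| Measure | Complexity | Explanation |
|---|---|---|
| Time | O((m + n) log n) | O(m + n) heap operations in total, each costing O(log n) |
| Space | O(n) | Both heaps together store at most n servers (the output array adds O(m)) |

A server may move between the heaps many times, but each task triggers exactly one assignment and at most one later release, so the total number of heap operations is O(m + n), and every heap operation costs only O(log n).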

For every task:

  • We may move servers between heaps.
  • We assign exactly one server.

Since there are m tasks and heap sizes never exceed n, the total complexity is O((m + n) log n).
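One way to sanity-check this bound is to count heap operations directly. In the sketch below, count_heap_operations is a hypothetical helper written for this illustration: it performs n initial pushes, then for each task one pop from the available heap and one push to the busy heap, plus one pop and one push whenever a busy entry is released. Since each busy entry is released at most once, the total is at most n + 4m.

```python
import heapq
from typing import List, Tuple


def count_heap_operations(
    servers: List[int], tasks: List[int]
) -> Tuple[List[int], int]:
    """Run the two-heap simulation and count heappush/heappop calls."""
    operations = 0
    available = []
    for index, weight in enumerate(servers):
        heapq.heappush(available, (weight, index))
        operations += 1                      # n initial pushes
    busy = []  # entries are (free_time, weight, index)
    result = []
    current_time = 0

    for task_index, duration in enumerate(tasks):
        current_time = max(current_time, task_index)
        if not available:
            current_time = max(current_time, busy[0][0])
        while busy and busy[0][0] <= current_time:
            _, weight, index = heapq.heappop(busy)
            heapq.heappush(available, (weight, index))
            operations += 2                  # each busy entry is released once
        weight, index = heapq.heappop(available)
        heapq.heappush(busy, (current_time + duration, weight, index))
        operations += 2                      # one pop and one push per task
        result.append(index)

    return result, operations


servers, tasks = [5, 1, 4, 3, 2], [2, 1, 2, 4, 5, 2, 1]
result, operations = count_heap_operations(servers, tasks)
# n initial pushes plus at most 4 heap operations per task.
assert operations <= len(servers) + 4 * len(tasks)
print(result, operations)
```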

Test Cases

from typing import List

class Solution:
    def assignTasks(self, servers: List[int], tasks: List[int]) -> List[int]:
        import heapq

        available = []
        busy = []

        for i, w in enumerate(servers):
            heapq.heappush(available, (w, i))

        result = [0] * len(tasks)

        current_time = 0

        for task_index, duration in enumerate(tasks):
            # The clock never moves backwards once a task has had to wait.
            current_time = max(current_time, task_index)

            while busy and busy[0][0] <= current_time:
                free_time, weight, index = heapq.heappop(busy)
                heapq.heappush(available, (weight, index))

            if not available:
                current_time = busy[0][0]

                while busy and busy[0][0] <= current_time:
                    free_time, weight, index = heapq.heappop(busy)
                    heapq.heappush(available, (weight, index))

            weight, index = heapq.heappop(available)

            result[task_index] = index

            heapq.heappush(
                busy,
                (current_time + duration, weight, index)
            )

        return result

s = Solution()

# Provided example 1
assert s.assignTasks(
    [3,3,2],
    [1,2,3,2,1,2]
) == [2,2,0,2,1,2]

# Provided example 2
assert s.assignTasks(
    [5,1,4,3,2],
    [2,1,2,4,5,2,1]
) == [1,4,1,4,1,3,2]

# Single server handles all tasks sequentially
assert s.assignTasks(
    [1],
    [5,2,3]
) == [0,0,0]

# Multiple identical servers (durations overlap, so each task needs a new server)
assert s.assignTasks(
    [1,1,1],
    [3,3,3,1]
) == [0,1,2,0]

# Tasks longer than arrival spacing
assert s.assignTasks(
    [2,1],
    [10,10,10]
) == [1,0,1]

# Servers free simultaneously
assert s.assignTasks(
    [1,2],
    [2,1,1]
) == [0,1,0]

# Large task durations
assert s.assignTasks(
    [10,20],
    [100000,100000,100000]
) == [0,1,0]

# Tie by weight resolved by index (durations overlap, so ties are broken three times)
assert s.assignTasks(
    [5,5,5],
    [3,3,3]
) == [0,1,2]

| Test | Why |
|---|---|
| Example 1 | Validates standard scheduling behavior |
| Example 2 | Validates simultaneous server releases |
| Single server | Ensures sequential processing works |
| Identical servers | Tests index tie breaking |
| Long running tasks | Ensures waiting queue behavior works |
| Simultaneous freeing | Verifies multiple servers released together |
| Large durations | Tests large time jumps |
| Equal weights | Confirms smallest index selection |

Edge Cases

One important edge case occurs when all servers are busy for a long time. In this situation, tasks continue arriving every second and build up in the queue. A naive implementation might simulate each second individually, causing severe performance issues. This implementation avoids that problem by jumping directly to the earliest server completion time using the busy heap.

Another tricky case occurs when multiple servers become free at exactly the same moment. If we only free one server before assigning tasks, the scheduling order may become incorrect. The implementation handles this correctly by releasing all servers whose free_time <= current_time before making assignments.

Tie breaking between equal server weights is another common source of bugs. The problem requires choosing the smallest index when weights are equal. Both heaps explicitly include the server index in their ordering rules, ensuring deterministic and correct behavior.

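A final subtle case occurs when the clock must jump forward because no servers are available. The answer array must stay indexed by task_index rather than by the simulated clock, so the two values are tracked separately. The clock must also never move backwards: once an earlier task has waited until time T, every later task in the queue starts at T or later, so current_time should be carried over as max(current_time, task_index) rather than reset to task_index. Resetting it would let a queued task appear to start before the server it uses was actually free.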
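The effect of letting the clock move backwards can be seen on a small input. The sketch below is illustrative only: assign and its carry_clock switch are hypothetical names introduced for this comparison, and the input was chosen by hand so that several tasks queue up behind two busy servers.

```python
import heapq
from typing import List


def assign(servers: List[int], tasks: List[int], carry_clock: bool) -> List[int]:
    """Two-heap simulation; carry_clock is a hypothetical switch for this demo."""
    available = [(weight, index) for index, weight in enumerate(servers)]
    heapq.heapify(available)
    busy = []  # entries are (free_time, weight, index)
    result = []
    current_time = 0

    for task_index, duration in enumerate(tasks):
        if carry_clock:
            current_time = max(current_time, task_index)  # clock never goes back
        else:
            current_time = task_index                     # clock may go back
        while busy and busy[0][0] <= current_time:
            _, weight, index = heapq.heappop(busy)
            heapq.heappush(available, (weight, index))
        if not available:
            current_time = busy[0][0]
            while busy and busy[0][0] <= current_time:
                _, weight, index = heapq.heappop(busy)
                heapq.heappush(available, (weight, index))
        weight, index = heapq.heappop(available)
        result.append(index)
        heapq.heappush(busy, (current_time + duration, weight, index))

    return result


servers, tasks = [1, 2], [5, 4, 2, 1, 1, 1]
print(assign(servers, tasks, carry_clock=True))
print(assign(servers, tasks, carry_clock=False))
```

Simulating by hand: both servers are busy until time 5, so tasks 2 and 3 start at time 5 on servers 0 and 1 and finish at times 7 and 6. Task 4 takes server 1 at time 6 and finishes at 7, and task 5 then finds both servers free at time 7 and takes server 0, giving [0, 1, 0, 1, 1, 0]. With the clock reset to the task index, task 3 is recorded as starting at time 3, server 1 appears free too early, and the last task lands on server 1 instead.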