LeetCode 1834 - Single-Threaded CPU

The problem asks us to simulate the execution of tasks on a single-threaded CPU. Each task has an enqueue time and a processing time.

LeetCode Problem 1834

Difficulty: 🟡 Medium
Topics: Array, Sorting, Heap (Priority Queue)

Solution

Problem Understanding

The problem asks us to simulate the execution of tasks on a single-threaded CPU. Each task has an enqueue time and a processing time. The CPU can process one task at a time and must select the next task according to two rules: first by shortest processing time, and then by smallest index if there is a tie. If no task is available, the CPU remains idle until the next task arrives.

The input is a list of tasks represented as tasks[i] = [enqueueTimei, processingTimei]. Each task is uniquely identified by its index in the original list. The output should be the order of task indices as they are processed by the CPU.

Constraints indicate that the input size can be up to 100,000 tasks, and enqueue/processing times can be as large as 10^9, which means we cannot use naive simulation methods that repeatedly scan all tasks at each time step. We need an efficient selection method, typically a priority queue (min-heap).

Important edge cases include tasks that have the same enqueue time, tasks with identical processing times, or when the CPU has to remain idle because the next task has not yet arrived.

Approaches

A brute-force approach would simulate every time unit from 0 to the maximum enqueue/processing time. At each unit, we would scan all tasks to find which are available and select the one with the smallest processing time. This approach is correct but extremely inefficient for large input sizes because it results in a time complexity of O(n * maxTime), which is unacceptable given the constraints.

The optimal approach uses two key insights:

  1. Sorting tasks by enqueue time allows us to process them in order of availability.
  2. Using a min-heap (priority queue) keyed by (processingTime, index) allows the CPU to efficiently select the next task with the shortest processing time and resolve ties by index in O(log n) per selection.

This approach reduces unnecessary scanning and directly simulates task processing events rather than every time unit.

Approach Time Complexity Space Complexity Notes
Brute Force O(n * maxTime) O(n) Simulate every time unit, scanning all tasks at each step
Optimal O(n log n) O(n) Sort tasks and use a heap to select the next task efficiently

Algorithm Walkthrough

  1. Attach indices: Pair each task with its original index for tie-breaking.

  2. Sort tasks: Sort the tasks by their enqueue time.

  3. Initialize heap: Use a min-heap to store tasks that are available, keyed first by processing time and then by index.

  4. Simulate CPU: Track the current time. For each iteration:

  5. Add all tasks whose enqueue time is <= current time to the heap.

  6. If the heap is not empty, pop the task with the smallest processing time (tie broken by index) and execute it, updating the current time.

  7. If the heap is empty, advance the current time to the enqueue time of the next task.

  8. Repeat until all tasks are processed.

Why it works: The heap ensures the CPU always selects the shortest available task according to the rules. Sorting by enqueue time ensures tasks are considered in the correct order. By advancing time only when necessary, we avoid idle simulation.

Python Solution

from typing import List
import heapq

class Solution:
    def getOrder(self, tasks: List[List[int]]) -> List[int]:
        # Attach original indices to tasks
        indexed_tasks = [(enqueue, process, i) for i, (enqueue, process) in enumerate(tasks)]
        # Sort tasks by enqueue time
        indexed_tasks.sort()
        
        result = []
        min_heap = []
        time = 0
        i = 0
        n = len(tasks)
        
        while len(result) < n:
            # Add all tasks that are now available to the heap
            while i < n and indexed_tasks[i][0] <= time:
                enqueue, process, idx = indexed_tasks[i]
                heapq.heappush(min_heap, (process, idx))
                i += 1
            
            if min_heap:
                process, idx = heapq.heappop(min_heap)
                time += process
                result.append(idx)
            else:
                # If no task is available, jump time to next task's enqueue time
                time = indexed_tasks[i][0]
        
        return result

Explanation: The Python solution uses a heap to store available tasks. At each step, all newly available tasks are pushed onto the heap. The CPU processes the task at the top of the heap, ensuring the shortest processing time is chosen. If the heap is empty, the CPU jumps to the next task's enqueue time to avoid idle time.

Go Solution

package main

import (
    "container/heap"
    "sort"
)

type Task struct {
    enqueue int
    process int
    index   int
}

type TaskHeap []Task

func (h TaskHeap) Len() int { return len(h) }
func (h TaskHeap) Less(i, j int) bool {
    if h[i].process == h[j].process {
        return h[i].index < h[j].index
    }
    return h[i].process < h[j].process
}
func (h TaskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *TaskHeap) Push(x interface{}) {
    *h = append(*h, x.(Task))
}
func (h *TaskHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[:n-1]
    return x
}

func getOrder(tasks [][]int) []int {
    n := len(tasks)
    indexedTasks := make([]Task, n)
    for i, t := range tasks {
        indexedTasks[i] = Task{enqueue: t[0], process: t[1], index: i}
    }
    sort.Slice(indexedTasks, func(i, j int) bool {
        return indexedTasks[i].enqueue < indexedTasks[j].enqueue
    })
    
    result := make([]int, 0, n)
    h := &TaskHeap{}
    heap.Init(h)
    
    time := 0
    i := 0
    
    for len(result) < n {
        for i < n && indexedTasks[i].enqueue <= time {
            heap.Push(h, indexedTasks[i])
            i++
        }
        if h.Len() > 0 {
            t := heap.Pop(h).(Task)
            time += t.process
            result = append(result, t.index)
        } else {
            time = indexedTasks[i].enqueue
        }
    }
    
    return result
}

Go-specific notes: In Go, we define a custom TaskHeap type that implements the heap.Interface methods. The heap is ordered by processing time, with ties broken by the original index. The logic mirrors the Python version, with explicit slice handling and heap operations.

Worked Examples

Example 1: [[1,2],[2,4],[3,2],[4,1]]

Time Available Tasks Heap State (process, idx) CPU Action Result
1 0 (2,0) process 0 [0]
2 1 (4,1) idle [0]
3 2 (2,2),(4,1) process 2 [0,2]
4 3 (1,3),(4,1) idle [0,2]
5 - (1,3),(4,1) process 3 [0,2,3]
6 - (4,1) process 1 [0,2,3,1]

Example 2: [[7,10],[7,12],[7,5],[7,4],[7,2]]

All tasks become available at time 7, heap orders them by processing time:

Heap sequence: (2,4),(4,3),(5,2),(10,0),(12,1) → Result [4,3,2,0,1]

Complexity Analysis

Measure Complexity Explanation
Time O(n log n) Sorting tasks is O(n log n), and each task is pushed/popped from the heap once, each O(log n)
Space O(n) Heap may contain up to n tasks, and we store indexed tasks and the result array

The dominant term is the sorting step and heap operations. Space scales linearly with the number of tasks.

Test Cases

# Provided examples
assert Solution().getOrder([[1,2],[2,4],[3,2],[4,1]]) == [0,2,3,1]  # Example 1
assert Solution().getOrder([[7,10],[7,12],[7,5],[7,4],[7,2]]) == [4,3,2,0,1]  # Example 2