LeetCode 3422 - Minimum Operations to Make Subarray Elements Equal

We are given an array nums and an integer k. In a single operation, we may increase or decrease any element by exactly 1. Our goal is not to make the entire array equal. Instead, we only need at least one contiguous subarray of length k to consist entirely of the same value.

LeetCode Problem 3422

Difficulty: 🟡 Medium
Topics: Array, Hash Table, Math, Sliding Window, Heap (Priority Queue)

Solution

LeetCode 3422 - Minimum Operations to Make Subarray Elements Equal

Problem Understanding

We are given an array nums and an integer k. In a single operation, we may increase or decrease any element by exactly 1.

Our goal is not to make the entire array equal. Instead, we only need at least one contiguous subarray of length k to consist entirely of the same value.

For any chosen subarray of length k, we may freely modify its elements. The cost of modifying that subarray is the total number of increment and decrement operations required to transform all of its values into a single common value.

The task is therefore:

  1. Consider every contiguous subarray of length k.
  2. Compute the minimum cost needed to make all elements in that subarray equal.
  3. Return the smallest such cost among all windows.

The constraints are large:

  • n = nums.length can be as large as 100000
  • Values range from -10^6 to 10^6

Because there are up to one hundred thousand elements, any solution that recomputes costs from scratch for every window will be far too slow.

Several edge cases are worth noticing immediately:

  • A window may already contain all equal values, producing cost 0.
  • Numbers may be negative.
  • k may equal n, meaning there is only one window.
  • Large positive and negative values require using 64-bit arithmetic for the operation count.
  • Multiple copies of the median may exist, which can complicate sliding-window median maintenance.

Approaches

Brute Force

For every subarray of length k, we could sort the window and compute the cost of converting all values to the median.

The median is optimal because minimizing

$$\sum |x_i - t|$$

is achieved by choosing a median value.

For each window:

  1. Extract the k elements.
  2. Sort them.
  3. Find the median.
  4. Sum all absolute differences from the median.

This produces the correct answer because every window is evaluated exactly and the minimum cost is returned.

Unfortunately, there are O(n) windows, and sorting each window costs O(k log k), leading to:

$$O(nk\log k)$$

which is far too slow when both n and k can reach 100000.

Key Insight

For a fixed window, the minimum cost equals the sum of distances to the window's median.

Therefore, the problem becomes:

Find the minimum median-distance cost among all sliding windows of size k.

This is a classic sliding-window median problem.

We maintain:

  • A max-heap containing the smaller half of the window.
  • A min-heap containing the larger half.
  • The sum of elements stored in each heap.

Once we know:

  • median = largest element in the left heap
  • count and sum of each side

the cost can be computed in constant time.

Suppose:

  • left heap contains L elements with sum leftSum
  • right heap contains R elements with sum rightSum
  • median = m

Then:

$$\text{cost} = m \cdot L - leftSum + rightSum - m \cdot R$$

because every left-side value contributes m - x, and every right-side value contributes x - m.

The remaining challenge is supporting insertion and deletion while the window slides. We use lazy deletion together with two heaps, giving an overall complexity of O(n log k).

Approach Comparison

Approach Time Complexity Space Complexity Notes
Brute Force O(nk log k) O(k) Sort every window independently
Optimal O(n log k) O(k) Sliding median with two heaps and lazy deletion

Algorithm Walkthrough

  1. Maintain two heaps:
  • small is a max-heap storing the smaller half of the window.
  • large is a min-heap storing the larger half.
  1. Maintain:
  • smallSum, sum of values currently belonging to small.
  • largeSum, sum of values currently belonging to large.
  1. Keep the invariant that:
  • small contains either the same number of valid elements as large, or exactly one more.
  • Therefore, the median is always the top of small.
  1. Insert elements into the appropriate heap:
  • If the value is less than or equal to the current median, place it in small.
  • Otherwise place it in large.
  1. When the window slides, remove the outgoing element.
  • Direct heap deletion is expensive.
  • Instead, mark the value in a hash map called delayed.
  • When a marked value reaches the top of its heap, physically remove it.
  1. After every insertion or deletion, rebalance the heaps.
  • Move elements between heaps until the size invariant is restored.
  1. Once a full window of size k exists:
  • Let m be the median.
  • Let L and R be the numbers of valid elements in the two heaps.
  • Compute

$$mL - leftSum + rightSum - mR$$

which equals the minimum cost for that window. 8. Track the minimum cost over all windows.

Why it works

For any set of values, the median minimizes the sum of absolute deviations. Therefore the optimal target value for each window is always a median of that window. The two-heap structure continuously maintains the median while the window slides. Since the cost formula exactly computes the sum of distances to the median, every window's optimal cost is evaluated correctly, and taking the minimum over all windows yields the answer.

Problem Understanding

The problem asks us to determine the minimum number of operations required to make all elements equal in at least one contiguous subarray of size k from the input array nums. An operation consists of either incrementing or decrementing a single element by 1. The input array nums contains integers that can be negative, zero, or positive, and k is guaranteed to be at least 2 and at most the length of the array. The expected output is a single integer representing the minimum total operations required.

Restated, we are trying to find a window of size k in which the elements can be equalized with the fewest total increments/decrements. The constraints indicate that the input array can be large (nums.length up to 10^5), so a brute-force approach that checks all possible value conversions for each window would be too slow. The elements themselves can range from -10^6 to 10^6, so we must handle negative numbers carefully. Edge cases include subarrays that are already equal, arrays with all identical elements, and windows with extreme values that require careful sum calculations.

Approaches

Brute-Force Approach

A naive solution is to examine every subarray of length k and calculate the number of operations to make all elements equal for each subarray. For a given subarray, the optimal target value to equalize the elements is the median, since the sum of absolute differences from the median is minimized. For each subarray:

  1. Extract the subarray of length k.
  2. Sort the subarray to find its median.
  3. Compute the sum of absolute differences between each element and the median.
  4. Track the minimum sum across all subarrays.

While correct, this approach is too slow because sorting each subarray individually is O(k log k), and there are O(n) subarrays, giving a total complexity of O(n k log k), which is infeasible for n up to 10^5.

Optimal Approach

The key observation is that we do not need to sort every subarray from scratch. By maintaining a sliding window of size k and using a balanced data structure (like a sorted list or two heaps) to keep track of the current window, we can efficiently find the median and the total absolute difference as the window moves.

The steps are:

  1. Maintain a multiset or a pair of heaps (max-heap for the left half, min-heap for the right half) to quickly access the median of the current window.
  2. Maintain a running sum of differences relative to the median to compute the number of operations.
  3. Slide the window by removing the leftmost element and inserting the next element, updating the heaps and median accordingly.
  4. Track the minimum number of operations across all windows.

This reduces redundant sorting and enables O(n log k) time complexity because each insertion/removal into the balanced data structure costs O(log k).

Approach Time Complexity Space Complexity Notes
Brute Force O(n k log k) O(k) Sorts every window to find median; too slow for large n
Optimal O(n log k) O(k) Sliding window with median tracking via heaps or sorted list

Algorithm Walkthrough

  1. Initialize two heaps, lo (max-heap for left half) and hi (min-heap for right half), to maintain the lower and upper halves of the current window. This ensures we can access the median in O(1) time.
  2. Initialize variables lo_sum and hi_sum to store the sum of elements in each heap. This allows efficient calculation of the sum of absolute differences from the median.
  3. Populate the first window of size k into the heaps, balancing them so that lo may have one more element than hi if k is odd. Compute the initial number of operations using the median.
  4. Slide the window through the array:
  • Remove the element that is leaving the window from the appropriate heap and update lo_sum or hi_sum.
  • Insert the new element into the appropriate heap and rebalance if necessary.
  • Update lo_sum and hi_sum.
  • Compute the total operations for the current median using median * len(lo) - lo_sum + hi_sum - median * len(hi).
  • Update the running minimum of operations.
  1. After processing all windows, return the minimum operations.

Why it works: The median minimizes the sum of absolute differences in a set. By maintaining a balanced data structure as we slide the window, we guarantee that at each step we compute the optimal number of operations for the current window efficiently. The invariant is that the heaps always represent the lower and upper halves of the window, ensuring the median is correct.

Python Solution

from typing import List
import heapq
from collections import defaultdict

class DualHeap:
    def __init__(self):
        self.small = []  # max heap via negatives
        self.large = []  # min heap

        self.delayed = defaultdict(int)

        self.small_size = 0
        self.large_size = 0

        self.small_sum = 0
        self.large_sum = 0

    def prune_small(self) -> None:
        while self.small:
            num = -self.small[0]
            if self.delayed[num]:
                heapq.heappop(self.small)
                self.delayed[num] -= 1
            else:
                break

    def prune_large(self) -> None:
        while self.large:
            num = self.large[0]
            if self.delayed[num]:
                heapq.heappop(self.large)
                self.delayed[num] -= 1
            else:
                break

    def rebalance(self) -> None:
        if self.small_size > self.large_size + 1:
            value = -heapq.heappop(self.small)

            self.small_size -= 1
            self.small_sum -= value

            heapq.heappush(self.large, value)
            self.large_size += 1
            self.large_sum += value

            self.prune_small()

        elif self.small_size < self.large_size:
            value = heapq.heappop(self.large)

            self.large_size -= 1
            self.large_sum -= value

            heapq.heappush(self.small, -value)
            self.small_size += 1
            self.small_sum += value

            self.prune_large()

    def add(self, num: int) -> None:
        if not self.small or num <= -self.small[0]:
            heapq.heappush(self.small, -num)
            self.small_size += 1
            self.small_sum += num
        else:
            heapq.heappush(self.large, num)
            self.large_size += 1
            self.large_sum += num

        self.rebalance()

    def remove(self, num: int) -> None:
        self.delayed[num] += 1

        if num <= -self.small[0]:
            self.small_size -= 1
            self.small_sum -= num

            if num == -self.small[0]:
                self.prune_small()
        else:
            self.large_size -= 1
            self.large_sum -= num

            if self.large and num == self.large[0]:
                self.prune_large()

        self.rebalance()

    def cost(self) -> int:
        median = -self.small[0]

        left_cost = median * self.small_size - self.small_sum
        right_cost = self.large_sum - median * self.large_size

        return left_cost + right_cost

class Solution:
    def minOperations(self, nums: List[int], k: int) -> int:
        dh = DualHeap()

        for i in range(k):
            dh.add(nums[i])

        answer = dh.cost()

        for i in range(k, len(nums)):
            dh.add(nums[i])
            dh.remove(nums[i - k])

            answer = min(answer, dh.cost())

        return answer

Implementation Explanation

The DualHeap class maintains the current window.

The small heap stores the lower half of the values and exposes the median through its top element. The large heap stores the upper half.

Because arbitrary deletions from a heap are expensive, the implementation uses lazy deletion. Removed values are recorded in delayed. Whenever such a value reaches the top of a heap, it is physically discarded.

The sums of each heap are maintained continuously. This allows the window cost to be computed in constant time through the median-distance formula.

The main solution initializes the first window, computes its cost, then repeatedly adds the incoming value and removes the outgoing value as the window slides across the array.

class Solution: def minOperations(self, nums: List[int], k: int) -> int: if k == 1: return 0 # any single element is already equal

    # Helper functions for heaps
    def add(num):
        if not lo or num <= -lo[0]:
            heapq.heappush(lo, -num)
            nonlocal lo_sum
            lo_sum += num
        else:
            heapq.heappush(hi, num)
            nonlocal hi_sum
            hi_sum += num
        balance()

    def remove(num):
        if num <= -lo[0]:
            lo.remove(-num)
            heapq.heapify(lo)
            nonlocal lo_sum
            lo_sum -= num
        else:
            hi.remove(num)
            heapq.heapify(hi)
            nonlocal hi_sum
            hi_sum -= num
        balance()

    def balance():
        while len(lo) > len(hi) + 1:
            val = -heapq.heappop(lo)
            lo_sum -= val
            heapq.heappush(hi, val)
            nonlocal hi_sum
            hi_sum += val
        while len(hi) > len(lo):
            val = heapq.heappop(hi)
            hi_sum -= val
            heapq.heappush(lo, -val)
            nonlocal lo_sum
            lo_sum += val

    lo, hi = [], []
    lo_sum, hi_sum = 0, 0
    n = len(nums)
    # Initialize first window
    for i in range(k):
        add(nums[i])
    median = -lo[0]
    min_ops = median * len(lo) - lo_sum + hi_sum - median * len(hi)

    for i in range(k, n):
        remove(nums[i - k])
        add(nums[i])
        median = -lo[0]
        ops = median * len(lo) - lo_sum + hi_sum - median * len(hi)
        min_ops = min(min_ops, ops)

    return min_ops

**Implementation Walkthrough:** The Python solution uses two heaps to maintain the current window's lower and upper halves. The helper functions `add`, `remove`, and `balance` ensure the heaps stay balanced after each insertion or deletion. `lo_sum` and `hi_sum` allow computation of total operations efficiently without iterating over the window each time. Sliding the window from left to right updates the heaps and median continuously, tracking the minimal operations.

## Go Solution

```go
package main

import (
	"container/heap"
)

type MaxHeap []int
type MinHeap []int

func (h MaxHeap) Len() int           { return len(h) }
func (h MaxHeap) Less(i, j int) bool { return h[i] > h[j] }
func (h MaxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *MaxHeap) Push(x interface{}) {
	*h = append(*h, x.(int))
}

func (h *MaxHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func (h MinHeap) Len() int           { return len(h) }
func (h MinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h MinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *MinHeap) Push(x interface{}) {
	*h = append(*h, x.(int))
}

func (h *MinHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

type DualHeap struct {
	small MaxHeap
	large MinHeap

	delayed map[int]int

	smallSize int
	largeSize int

	smallSum int64
	largeSum int64
}

func NewDualHeap() *DualHeap {
	dh := &DualHeap{
		delayed: make(map[int]int),
	}
	heap.Init(&dh.small)
	heap.Init(&dh.large)
	return dh
}

func (dh *DualHeap) pruneSmall() {
	for dh.small.Len() > 0 {
		x := dh.small[0]
		if dh.delayed[x] > 0 {
			heap.Pop(&dh.small)
			dh.delayed[x]--
		} else {
			break
		}
	}
}

func (dh *DualHeap) pruneLarge() {
	for dh.large.Len() > 0 {
		x := dh.large[0]
		if dh.delayed[x] > 0 {
			heap.Pop(&dh.large)
			dh.delayed[x]--
		} else {
			break
		}
	}
}

func (dh *DualHeap) rebalance() {
	if dh.smallSize > dh.largeSize+1 {
		x := heap.Pop(&dh.small).(int)

		dh.smallSize--
		dh.smallSum -= int64(x)

		heap.Push(&dh.large, x)
		dh.largeSize++
		dh.largeSum += int64(x)

		dh.pruneSmall()
	} else if dh.smallSize < dh.largeSize {
		x := heap.Pop(&dh.large).(int)

		dh.largeSize--
		dh.largeSum -= int64(x)

		heap.Push(&dh.small, x)
		dh.smallSize++
		dh.smallSum += int64(x)

		dh.pruneLarge()
	}
}

func (dh *DualHeap) add(num int) {
	if dh.small.Len() == 0 || num <= dh.small[0] {
		heap.Push(&dh.small, num)
		dh.smallSize++
		dh.smallSum += int64(num)
	} else {
		heap.Push(&dh.large, num)
		dh.largeSize++
		dh.largeSum += int64(num)
	}

	dh.rebalance()
}

func (dh *DualHeap) remove(num int) {
	dh.delayed[num]++

	if num <= dh.small[0] {
		dh.smallSize--
		dh.smallSum -= int64(num)

		if num == dh.small[0] {
			dh.pruneSmall()
		}
	} else {
		dh.largeSize--
		dh.largeSum -= int64(num)

		if dh.large.Len() > 0 && num == dh.large[0] {
			dh.pruneLarge()
		}
	}

	dh.rebalance()
}

func (dh *DualHeap) cost() int64 {
	median := int64(dh.small[0])

	leftCost := median*int64(dh.smallSize) - dh.smallSum
	rightCost := dh.largeSum - median*int64(dh.largeSize)

	return leftCost + rightCost
}

func minOperations(nums []int, k int) int64 {
	dh := NewDualHeap()

	for i := 0; i < k; i++ {
		dh.add(nums[i])
	}

	ans := dh.cost()

	for i := k; i < len(nums); i++ {
		dh.add(nums[i])
		dh.remove(nums[i-k])

		cost := dh.cost()
		if cost < ans {
			ans = cost
		}
	}

	return ans
}

Go-Specific Notes

The Go implementation uses container/heap to build both the max-heap and min-heap. Because operation counts can exceed 32-bit limits, all accumulated sums and costs are stored as int64.

The lazy-deletion map works exactly like the Python version and ensures that window removals remain logarithmic.

Worked Examples

Example 1

Input:

nums = [4, -3, 2, 1, -4, 6]
k = 3

Window 1:

[4, -3, 2]

Sorted:

[-3, 2, 4]

Median:

2

Cost:

Value Distance to 2
4 2
-3 5
2 0

Total:

7

Window 2:

[-3, 2, 1]

Sorted:

[-3, 1, 2]

Median:

1

Cost:

Value Distance to 1
-3 4
2 1
1 0

Total:

5

Window 3:

[2, 1, -4]

Median:

1

Cost:

1 + 0 + 5 = 6

Window 4:

[1, -4, 6]

Median:

1

Cost:

0 + 5 + 5 = 10

Minimum:

min(7, 5, 6, 10) = 5

Answer:

5

Example 2

Input:

nums = [-2, -2, 3, 1, 4]
k = 2

First window:

[-2, -2]

Median:

-2

Cost:

0

Since a zero-cost window already exists, the final answer is:

0

Complexity Analysis

Measure Complexity Explanation
Time O(n log k) Each insertion and removal costs O(log k)
Space O(k) Heaps and lazy-deletion map store window elements

The algorithm processes each element a constant number of times. Every heap insertion, removal, and rebalance operation takes logarithmic time in the window size, yielding an overall complexity of O(n log k). The heaps together contain at most k active elements, so memory usage is O(k).

Test Cases

sol = Solution()

assert sol.minOperations([4, -3, 2, 1, -4, 6], 3) == 5  # example 1
assert sol.minOperations([-2, -2, 3, 1, 4], 2) == 0  # example 2

assert sol.minOperations([1, 1], 2) == 0  # smallest valid input
assert sol.minOperations([1, 2], 2) == 1  # one change needed

assert sol.minOperations([5, 5, 5, 5], 3) == 0  # already equal
assert sol.minOperations([1, 2, 3, 4], 4) == 4  # entire array is the window

assert sol.minOperations([-5, -1, -3], 3) == 4  # negative values
assert sol.minOperations([1, 100, 1], 2) == 99  # large gap

assert sol.minOperations([1, 3, 2, 2, 2], 3) == 0  # later window already equal
assert sol.minOperations([10, 0, 10, 0, 10], 3) == 10  # alternating values

assert sol.minOperations([0, 0, 1000000, 0, 0], 2) == 0  # large magnitude values

Test Summary

Test Why
[4,-3,2,1,-4,6], k=3 Official example
[-2,-2,3,1,4], k=2 Official example with answer 0
[1,1], k=2 Smallest valid size
[1,2], k=2 Single operation needed
[5,5,5,5], k=3 All values already equal
[1,2,3,4], k=4 Entire array forms one window
[-5,-1,-3], k=3 Negative numbers
[1,100,1], k=2 Large difference between values
[1,3,2,2,2], k=3 Optimal window appears later
[10,0,10,0,10], k=3 Repeated sliding median updates
[0,0,1000000,0,0], k=2 Large magnitude values and zero-cost window

Edge Cases

Window Already Contains Equal Elements

A common mistake is assuming every window requires modifications. If some window already consists entirely of the same value, the answer should be 0.

For example:

[-2, -2, 3, 1, 4]
k = 2

The implementation naturally handles this because the median-distance cost becomes zero.

k Equals n

When k equals the array length, there is only one valid window. Some sliding-window implementations accidentally assume at least one slide occurs.

For example:

[1, 2, 3, 4]
k = 4

The algorithm computes the first window cost and immediately returns it if no further windows exist.

Large Positive and Negative Values

Values may range from -10^6 to 10^6, and the total operation count can become much larger than a 32-bit integer.

For example:

[-1000000, 1000000, -1000000]

The implementation stores heap sums and costs using 64-bit arithmetic (int64 in Go, arbitrary precision integers in Python), preventing overflow.

Many Duplicate Values

Lazy deletion can become tricky when several identical values exist in both heaps.

For example:

[2, 2, 2, 2, 2]

The delayed-removal map tracks counts rather than individual positions. Each duplicate is removed exactly once when it reaches the heap top, ensuring correctness even with large numbers of repeated values. "sort" )

type IntHeap []int

func (h IntHeap) Len() int { return len(h) } func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] } func (h IntHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *IntHeap) Push(x interface{}) { *h = append(*h, x.(int)) } func (h *IntHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] *h = old[0 : n-1] return x }

func minOperations(nums []int, k int) int64 { n := len(nums) if k == 1 { return 0 } window := make([]int, k) copy(window, nums[:k]) sort.Ints(window) medianIndex := (k - 1) / 2 median := window[medianIndex] var loSum, hiSum int64 for i, val := range window { if i <= medianIndex { loSum += int64(val) } else { hiSum += int64(val) } } minOps := medianint64(len(window[:medianIndex+1])) - loSum + hiSum - medianint64(len(window[medianIndex+1:])) for i := k; i < n; i++ { window = append(window[1:], nums[i]) sort.Ints(window) median = window[medianIndex] loSum, hiSum = 0, 0 for j, val := range window { if j <= medianIndex { loSum += int64(val) } else { hiSum += int64(val) } } ops := medianint64(len(window[:medianIndex+1])) - loSum + hiSum - medianint64(len(window[medianIndex+1:])) if ops < minOps { minOps = ops }