LeetCode 3422 - Minimum Operations to Make Subarray Elements Equal
We are given an array nums and an integer k. In a single operation, we may increase or decrease any element by exactly 1. Our goal is not to make the entire array equal. Instead, we only need at least one contiguous subarray of length k to consist entirely of the same value.
Difficulty: 🟡 Medium
Topics: Array, Hash Table, Math, Sliding Window, Heap (Priority Queue)
Solution
LeetCode 3422 - Minimum Operations to Make Subarray Elements Equal
Problem Understanding
We are given an array nums and an integer k. In a single operation, we may increase or decrease any element by exactly 1.
Our goal is not to make the entire array equal. Instead, we only need at least one contiguous subarray of length k to consist entirely of the same value.
For any chosen subarray of length k, we may freely modify its elements. The cost of modifying that subarray is the total number of increment and decrement operations required to transform all of its values into a single common value.
The task is therefore:
- Consider every contiguous subarray of length
k. - Compute the minimum cost needed to make all elements in that subarray equal.
- Return the smallest such cost among all windows.
The constraints are large:
n = nums.lengthcan be as large as100000- Values range from
-10^6to10^6
Because there are up to one hundred thousand elements, any solution that recomputes costs from scratch for every window will be far too slow.
Several edge cases are worth noticing immediately:
- A window may already contain all equal values, producing cost
0. - Numbers may be negative.
kmay equaln, meaning there is only one window.- Large positive and negative values require using 64-bit arithmetic for the operation count.
- Multiple copies of the median may exist, which can complicate sliding-window median maintenance.
Approaches
Brute Force
For every subarray of length k, we could sort the window and compute the cost of converting all values to the median.
The median is optimal because minimizing
$$\sum |x_i - t|$$
is achieved by choosing a median value.
For each window:
- Extract the
kelements. - Sort them.
- Find the median.
- Sum all absolute differences from the median.
This produces the correct answer because every window is evaluated exactly and the minimum cost is returned.
Unfortunately, there are O(n) windows, and sorting each window costs O(k log k), leading to:
$$O(nk\log k)$$
which is far too slow when both n and k can reach 100000.
Key Insight
For a fixed window, the minimum cost equals the sum of distances to the window's median.
Therefore, the problem becomes:
Find the minimum median-distance cost among all sliding windows of size
k.
This is a classic sliding-window median problem.
We maintain:
- A max-heap containing the smaller half of the window.
- A min-heap containing the larger half.
- The sum of elements stored in each heap.
Once we know:
- median = largest element in the left heap
- count and sum of each side
the cost can be computed in constant time.
Suppose:
- left heap contains
Lelements with sumleftSum - right heap contains
Relements with sumrightSum - median =
m
Then:
$$\text{cost} = m \cdot L - leftSum + rightSum - m \cdot R$$
because every left-side value contributes m - x, and every right-side value contributes x - m.
The remaining challenge is supporting insertion and deletion while the window slides. We use lazy deletion together with two heaps, giving an overall complexity of O(n log k).
Approach Comparison
| Approach | Time Complexity | Space Complexity | Notes |
|---|---|---|---|
| Brute Force | O(nk log k) | O(k) | Sort every window independently |
| Optimal | O(n log k) | O(k) | Sliding median with two heaps and lazy deletion |
Algorithm Walkthrough
- Maintain two heaps:
smallis a max-heap storing the smaller half of the window.largeis a min-heap storing the larger half.
- Maintain:
smallSum, sum of values currently belonging tosmall.largeSum, sum of values currently belonging tolarge.
- Keep the invariant that:
smallcontains either the same number of valid elements aslarge, or exactly one more.- Therefore, the median is always the top of
small.
- Insert elements into the appropriate heap:
- If the value is less than or equal to the current median, place it in
small. - Otherwise place it in
large.
- When the window slides, remove the outgoing element.
- Direct heap deletion is expensive.
- Instead, mark the value in a hash map called
delayed. - When a marked value reaches the top of its heap, physically remove it.
- After every insertion or deletion, rebalance the heaps.
- Move elements between heaps until the size invariant is restored.
- Once a full window of size
kexists:
- Let
mbe the median. - Let
LandRbe the numbers of valid elements in the two heaps. - Compute
$$mL - leftSum + rightSum - mR$$
which equals the minimum cost for that window. 8. Track the minimum cost over all windows.
Why it works
For any set of values, the median minimizes the sum of absolute deviations. Therefore the optimal target value for each window is always a median of that window. The two-heap structure continuously maintains the median while the window slides. Since the cost formula exactly computes the sum of distances to the median, every window's optimal cost is evaluated correctly, and taking the minimum over all windows yields the answer.
Problem Understanding
The problem asks us to determine the minimum number of operations required to make all elements equal in at least one contiguous subarray of size k from the input array nums. An operation consists of either incrementing or decrementing a single element by 1. The input array nums contains integers that can be negative, zero, or positive, and k is guaranteed to be at least 2 and at most the length of the array. The expected output is a single integer representing the minimum total operations required.
Restated, we are trying to find a window of size k in which the elements can be equalized with the fewest total increments/decrements. The constraints indicate that the input array can be large (nums.length up to 10^5), so a brute-force approach that checks all possible value conversions for each window would be too slow. The elements themselves can range from -10^6 to 10^6, so we must handle negative numbers carefully. Edge cases include subarrays that are already equal, arrays with all identical elements, and windows with extreme values that require careful sum calculations.
Approaches
Brute-Force Approach
A naive solution is to examine every subarray of length k and calculate the number of operations to make all elements equal for each subarray. For a given subarray, the optimal target value to equalize the elements is the median, since the sum of absolute differences from the median is minimized. For each subarray:
- Extract the subarray of length
k. - Sort the subarray to find its median.
- Compute the sum of absolute differences between each element and the median.
- Track the minimum sum across all subarrays.
While correct, this approach is too slow because sorting each subarray individually is O(k log k), and there are O(n) subarrays, giving a total complexity of O(n k log k), which is infeasible for n up to 10^5.
Optimal Approach
The key observation is that we do not need to sort every subarray from scratch. By maintaining a sliding window of size k and using a balanced data structure (like a sorted list or two heaps) to keep track of the current window, we can efficiently find the median and the total absolute difference as the window moves.
The steps are:
- Maintain a multiset or a pair of heaps (max-heap for the left half, min-heap for the right half) to quickly access the median of the current window.
- Maintain a running sum of differences relative to the median to compute the number of operations.
- Slide the window by removing the leftmost element and inserting the next element, updating the heaps and median accordingly.
- Track the minimum number of operations across all windows.
This reduces redundant sorting and enables O(n log k) time complexity because each insertion/removal into the balanced data structure costs O(log k).
| Approach | Time Complexity | Space Complexity | Notes |
|---|---|---|---|
| Brute Force | O(n k log k) | O(k) | Sorts every window to find median; too slow for large n |
| Optimal | O(n log k) | O(k) | Sliding window with median tracking via heaps or sorted list |
Algorithm Walkthrough
- Initialize two heaps,
lo(max-heap for left half) andhi(min-heap for right half), to maintain the lower and upper halves of the current window. This ensures we can access the median in O(1) time. - Initialize variables
lo_sumandhi_sumto store the sum of elements in each heap. This allows efficient calculation of the sum of absolute differences from the median. - Populate the first window of size
kinto the heaps, balancing them so thatlomay have one more element thanhiifkis odd. Compute the initial number of operations using the median. - Slide the window through the array:
- Remove the element that is leaving the window from the appropriate heap and update
lo_sumorhi_sum. - Insert the new element into the appropriate heap and rebalance if necessary.
- Update
lo_sumandhi_sum. - Compute the total operations for the current median using
median * len(lo) - lo_sum + hi_sum - median * len(hi). - Update the running minimum of operations.
- After processing all windows, return the minimum operations.
Why it works: The median minimizes the sum of absolute differences in a set. By maintaining a balanced data structure as we slide the window, we guarantee that at each step we compute the optimal number of operations for the current window efficiently. The invariant is that the heaps always represent the lower and upper halves of the window, ensuring the median is correct.
Python Solution
from typing import List
import heapq
from collections import defaultdict
class DualHeap:
def __init__(self):
self.small = [] # max heap via negatives
self.large = [] # min heap
self.delayed = defaultdict(int)
self.small_size = 0
self.large_size = 0
self.small_sum = 0
self.large_sum = 0
def prune_small(self) -> None:
while self.small:
num = -self.small[0]
if self.delayed[num]:
heapq.heappop(self.small)
self.delayed[num] -= 1
else:
break
def prune_large(self) -> None:
while self.large:
num = self.large[0]
if self.delayed[num]:
heapq.heappop(self.large)
self.delayed[num] -= 1
else:
break
def rebalance(self) -> None:
if self.small_size > self.large_size + 1:
value = -heapq.heappop(self.small)
self.small_size -= 1
self.small_sum -= value
heapq.heappush(self.large, value)
self.large_size += 1
self.large_sum += value
self.prune_small()
elif self.small_size < self.large_size:
value = heapq.heappop(self.large)
self.large_size -= 1
self.large_sum -= value
heapq.heappush(self.small, -value)
self.small_size += 1
self.small_sum += value
self.prune_large()
def add(self, num: int) -> None:
if not self.small or num <= -self.small[0]:
heapq.heappush(self.small, -num)
self.small_size += 1
self.small_sum += num
else:
heapq.heappush(self.large, num)
self.large_size += 1
self.large_sum += num
self.rebalance()
def remove(self, num: int) -> None:
self.delayed[num] += 1
if num <= -self.small[0]:
self.small_size -= 1
self.small_sum -= num
if num == -self.small[0]:
self.prune_small()
else:
self.large_size -= 1
self.large_sum -= num
if self.large and num == self.large[0]:
self.prune_large()
self.rebalance()
def cost(self) -> int:
median = -self.small[0]
left_cost = median * self.small_size - self.small_sum
right_cost = self.large_sum - median * self.large_size
return left_cost + right_cost
class Solution:
def minOperations(self, nums: List[int], k: int) -> int:
dh = DualHeap()
for i in range(k):
dh.add(nums[i])
answer = dh.cost()
for i in range(k, len(nums)):
dh.add(nums[i])
dh.remove(nums[i - k])
answer = min(answer, dh.cost())
return answer
Implementation Explanation
The DualHeap class maintains the current window.
The small heap stores the lower half of the values and exposes the median through its top element. The large heap stores the upper half.
Because arbitrary deletions from a heap are expensive, the implementation uses lazy deletion. Removed values are recorded in delayed. Whenever such a value reaches the top of a heap, it is physically discarded.
The sums of each heap are maintained continuously. This allows the window cost to be computed in constant time through the median-distance formula.
The main solution initializes the first window, computes its cost, then repeatedly adds the incoming value and removes the outgoing value as the window slides across the array.
class Solution: def minOperations(self, nums: List[int], k: int) -> int: if k == 1: return 0 # any single element is already equal
# Helper functions for heaps
def add(num):
if not lo or num <= -lo[0]:
heapq.heappush(lo, -num)
nonlocal lo_sum
lo_sum += num
else:
heapq.heappush(hi, num)
nonlocal hi_sum
hi_sum += num
balance()
def remove(num):
if num <= -lo[0]:
lo.remove(-num)
heapq.heapify(lo)
nonlocal lo_sum
lo_sum -= num
else:
hi.remove(num)
heapq.heapify(hi)
nonlocal hi_sum
hi_sum -= num
balance()
def balance():
while len(lo) > len(hi) + 1:
val = -heapq.heappop(lo)
lo_sum -= val
heapq.heappush(hi, val)
nonlocal hi_sum
hi_sum += val
while len(hi) > len(lo):
val = heapq.heappop(hi)
hi_sum -= val
heapq.heappush(lo, -val)
nonlocal lo_sum
lo_sum += val
lo, hi = [], []
lo_sum, hi_sum = 0, 0
n = len(nums)
# Initialize first window
for i in range(k):
add(nums[i])
median = -lo[0]
min_ops = median * len(lo) - lo_sum + hi_sum - median * len(hi)
for i in range(k, n):
remove(nums[i - k])
add(nums[i])
median = -lo[0]
ops = median * len(lo) - lo_sum + hi_sum - median * len(hi)
min_ops = min(min_ops, ops)
return min_ops
**Implementation Walkthrough:** The Python solution uses two heaps to maintain the current window's lower and upper halves. The helper functions `add`, `remove`, and `balance` ensure the heaps stay balanced after each insertion or deletion. `lo_sum` and `hi_sum` allow computation of total operations efficiently without iterating over the window each time. Sliding the window from left to right updates the heaps and median continuously, tracking the minimal operations.
## Go Solution
```go
package main
import (
"container/heap"
)
type MaxHeap []int
type MinHeap []int
func (h MaxHeap) Len() int { return len(h) }
func (h MaxHeap) Less(i, j int) bool { return h[i] > h[j] }
func (h MaxHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *MaxHeap) Push(x interface{}) {
*h = append(*h, x.(int))
}
func (h *MaxHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[:n-1]
return x
}
func (h MinHeap) Len() int { return len(h) }
func (h MinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h MinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *MinHeap) Push(x interface{}) {
*h = append(*h, x.(int))
}
func (h *MinHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[:n-1]
return x
}
type DualHeap struct {
small MaxHeap
large MinHeap
delayed map[int]int
smallSize int
largeSize int
smallSum int64
largeSum int64
}
func NewDualHeap() *DualHeap {
dh := &DualHeap{
delayed: make(map[int]int),
}
heap.Init(&dh.small)
heap.Init(&dh.large)
return dh
}
func (dh *DualHeap) pruneSmall() {
for dh.small.Len() > 0 {
x := dh.small[0]
if dh.delayed[x] > 0 {
heap.Pop(&dh.small)
dh.delayed[x]--
} else {
break
}
}
}
func (dh *DualHeap) pruneLarge() {
for dh.large.Len() > 0 {
x := dh.large[0]
if dh.delayed[x] > 0 {
heap.Pop(&dh.large)
dh.delayed[x]--
} else {
break
}
}
}
func (dh *DualHeap) rebalance() {
if dh.smallSize > dh.largeSize+1 {
x := heap.Pop(&dh.small).(int)
dh.smallSize--
dh.smallSum -= int64(x)
heap.Push(&dh.large, x)
dh.largeSize++
dh.largeSum += int64(x)
dh.pruneSmall()
} else if dh.smallSize < dh.largeSize {
x := heap.Pop(&dh.large).(int)
dh.largeSize--
dh.largeSum -= int64(x)
heap.Push(&dh.small, x)
dh.smallSize++
dh.smallSum += int64(x)
dh.pruneLarge()
}
}
func (dh *DualHeap) add(num int) {
if dh.small.Len() == 0 || num <= dh.small[0] {
heap.Push(&dh.small, num)
dh.smallSize++
dh.smallSum += int64(num)
} else {
heap.Push(&dh.large, num)
dh.largeSize++
dh.largeSum += int64(num)
}
dh.rebalance()
}
func (dh *DualHeap) remove(num int) {
dh.delayed[num]++
if num <= dh.small[0] {
dh.smallSize--
dh.smallSum -= int64(num)
if num == dh.small[0] {
dh.pruneSmall()
}
} else {
dh.largeSize--
dh.largeSum -= int64(num)
if dh.large.Len() > 0 && num == dh.large[0] {
dh.pruneLarge()
}
}
dh.rebalance()
}
func (dh *DualHeap) cost() int64 {
median := int64(dh.small[0])
leftCost := median*int64(dh.smallSize) - dh.smallSum
rightCost := dh.largeSum - median*int64(dh.largeSize)
return leftCost + rightCost
}
func minOperations(nums []int, k int) int64 {
dh := NewDualHeap()
for i := 0; i < k; i++ {
dh.add(nums[i])
}
ans := dh.cost()
for i := k; i < len(nums); i++ {
dh.add(nums[i])
dh.remove(nums[i-k])
cost := dh.cost()
if cost < ans {
ans = cost
}
}
return ans
}
Go-Specific Notes
The Go implementation uses container/heap to build both the max-heap and min-heap. Because operation counts can exceed 32-bit limits, all accumulated sums and costs are stored as int64.
The lazy-deletion map works exactly like the Python version and ensures that window removals remain logarithmic.
Worked Examples
Example 1
Input:
nums = [4, -3, 2, 1, -4, 6]
k = 3
Window 1:
[4, -3, 2]
Sorted:
[-3, 2, 4]
Median:
2
Cost:
| Value | Distance to 2 |
|---|---|
| 4 | 2 |
| -3 | 5 |
| 2 | 0 |
Total:
7
Window 2:
[-3, 2, 1]
Sorted:
[-3, 1, 2]
Median:
1
Cost:
| Value | Distance to 1 |
|---|---|
| -3 | 4 |
| 2 | 1 |
| 1 | 0 |
Total:
5
Window 3:
[2, 1, -4]
Median:
1
Cost:
1 + 0 + 5 = 6
Window 4:
[1, -4, 6]
Median:
1
Cost:
0 + 5 + 5 = 10
Minimum:
min(7, 5, 6, 10) = 5
Answer:
5
Example 2
Input:
nums = [-2, -2, 3, 1, 4]
k = 2
First window:
[-2, -2]
Median:
-2
Cost:
0
Since a zero-cost window already exists, the final answer is:
0
Complexity Analysis
| Measure | Complexity | Explanation |
|---|---|---|
| Time | O(n log k) | Each insertion and removal costs O(log k) |
| Space | O(k) | Heaps and lazy-deletion map store window elements |
The algorithm processes each element a constant number of times. Every heap insertion, removal, and rebalance operation takes logarithmic time in the window size, yielding an overall complexity of O(n log k). The heaps together contain at most k active elements, so memory usage is O(k).
Test Cases
sol = Solution()
assert sol.minOperations([4, -3, 2, 1, -4, 6], 3) == 5 # example 1
assert sol.minOperations([-2, -2, 3, 1, 4], 2) == 0 # example 2
assert sol.minOperations([1, 1], 2) == 0 # smallest valid input
assert sol.minOperations([1, 2], 2) == 1 # one change needed
assert sol.minOperations([5, 5, 5, 5], 3) == 0 # already equal
assert sol.minOperations([1, 2, 3, 4], 4) == 4 # entire array is the window
assert sol.minOperations([-5, -1, -3], 3) == 4 # negative values
assert sol.minOperations([1, 100, 1], 2) == 99 # large gap
assert sol.minOperations([1, 3, 2, 2, 2], 3) == 0 # later window already equal
assert sol.minOperations([10, 0, 10, 0, 10], 3) == 10 # alternating values
assert sol.minOperations([0, 0, 1000000, 0, 0], 2) == 0 # large magnitude values
Test Summary
| Test | Why |
|---|---|
[4,-3,2,1,-4,6], k=3 |
Official example |
[-2,-2,3,1,4], k=2 |
Official example with answer 0 |
[1,1], k=2 |
Smallest valid size |
[1,2], k=2 |
Single operation needed |
[5,5,5,5], k=3 |
All values already equal |
[1,2,3,4], k=4 |
Entire array forms one window |
[-5,-1,-3], k=3 |
Negative numbers |
[1,100,1], k=2 |
Large difference between values |
[1,3,2,2,2], k=3 |
Optimal window appears later |
[10,0,10,0,10], k=3 |
Repeated sliding median updates |
[0,0,1000000,0,0], k=2 |
Large magnitude values and zero-cost window |
Edge Cases
Window Already Contains Equal Elements
A common mistake is assuming every window requires modifications. If some window already consists entirely of the same value, the answer should be 0.
For example:
[-2, -2, 3, 1, 4]
k = 2
The implementation naturally handles this because the median-distance cost becomes zero.
k Equals n
When k equals the array length, there is only one valid window. Some sliding-window implementations accidentally assume at least one slide occurs.
For example:
[1, 2, 3, 4]
k = 4
The algorithm computes the first window cost and immediately returns it if no further windows exist.
Large Positive and Negative Values
Values may range from -10^6 to 10^6, and the total operation count can become much larger than a 32-bit integer.
For example:
[-1000000, 1000000, -1000000]
The implementation stores heap sums and costs using 64-bit arithmetic (int64 in Go, arbitrary precision integers in Python), preventing overflow.
Many Duplicate Values
Lazy deletion can become tricky when several identical values exist in both heaps.
For example:
[2, 2, 2, 2, 2]
The delayed-removal map tracks counts rather than individual positions. Each duplicate is removed exactly once when it reaches the heap top, ensuring correctness even with large numbers of repeated values. "sort" )
type IntHeap []int
func (h IntHeap) Len() int { return len(h) } func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] } func (h IntHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *IntHeap) Push(x interface{}) { *h = append(*h, x.(int)) } func (h *IntHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] *h = old[0 : n-1] return x }
func minOperations(nums []int, k int) int64 { n := len(nums) if k == 1 { return 0 } window := make([]int, k) copy(window, nums[:k]) sort.Ints(window) medianIndex := (k - 1) / 2 median := window[medianIndex] var loSum, hiSum int64 for i, val := range window { if i <= medianIndex { loSum += int64(val) } else { hiSum += int64(val) } } minOps := medianint64(len(window[:medianIndex+1])) - loSum + hiSum - medianint64(len(window[medianIndex+1:])) for i := k; i < n; i++ { window = append(window[1:], nums[i]) sort.Ints(window) median = window[medianIndex] loSum, hiSum = 0, 0 for j, val := range window { if j <= medianIndex { loSum += int64(val) } else { hiSum += int64(val) } } ops := medianint64(len(window[:medianIndex+1])) - loSum + hiSum - medianint64(len(window[medianIndex+1:])) if ops < minOps { minOps = ops }