API Kernel: HeapTopK
Core Mechanism: Maintain a bounded collection of elements with efficient access to extreme values (min/max).
This document presents the canonical heap templates for top-k selection, streaming median, k-way merge, and greedy scheduling problems. Each implementation follows consistent naming conventions and includes detailed logic explanations.
import heapq
from typing import Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")


def heap_top_k(elements: Iterable[T], k: int, key: Optional[Callable] = None) -> List[T]:
    """
    Find top-k elements efficiently.

    Strategy: Maintain a min-heap of size k.
    - Elements larger than heap root replace the root
    - After processing, heap contains k largest elements

    Time: O(n log k), Space: O(k)

    Why min-heap for max problem?
    - Min-heap root is the smallest of the k largest
    - Easy to check if new element should enter the set
    - If new_element > root, it belongs in top-k; evict root
    """
    if k <= 0:
        return []  # avoids indexing an empty heap below

    min_heap = []
    for index, element in enumerate(elements):
        val = key(element) if key else element
        # index breaks ties, so elements themselves are never compared
        if len(min_heap) < k:
            heapq.heappush(min_heap, (val, index, element))
        elif val > min_heap[0][0]:
            heapq.heapreplace(min_heap, (val, index, element))

    return [item[2] for item in min_heap]
1.4 Pattern Variants
| Variant | API Kernel | Use When | Key Insight |
|---|---|---|---|
| Kth Element | HeapTopK | Find kth largest/smallest | Min-heap of size k, root is answer |
| Top-K | HeapTopK | Find k largest/smallest elements | Same as kth, return all k elements |
| Streaming Median | HeapTopK | Maintain median over stream | Two heaps: max-heap for lower, min-heap for upper |
| K-Way Merge | KWayMerge | Merge k sorted sequences | Min-heap of k heads, always pop smallest |
| Interval Scheduling | HeapTopK | Meeting rooms, overlapping intervals | Min-heap of end times for greedy assignment |
| Task Scheduler | HeapTopK | Schedule with cooldown constraints | Max-heap for greedy selection by frequency |
| Greedy Simulation | HeapTopK | Repeatedly process largest/smallest | Max-heap for repeated extraction |
1.5 Heap Size Strategies
| Strategy | When to Use | Time Complexity |
|---|---|---|
| Size k min-heap | Top-k largest, kth largest | O(n log k) |
| Size k max-heap | Top-k smallest, kth smallest | O(n log k) |
| Full heap | Need all elements ordered | O(n log n) |
| Two heaps | Running median, balanced partition | O(n log n) |
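The "size k max-heap" row can be sketched as follows. This is a minimal illustration, not part of the original templates: the function name `k_smallest` is hypothetical, and it simulates a max-heap by negating values because Python's `heapq` only provides a min-heap.

```python
import heapq
from typing import Iterable, List


def k_smallest(values: Iterable[int], k: int) -> List[int]:
    """Return the k smallest values in ascending order using a size-k max-heap."""
    if k <= 0:
        return []
    max_heap: List[int] = []  # negated values, so the root is the largest value kept
    for value in values:
        if len(max_heap) < k:
            heapq.heappush(max_heap, -value)
        elif value < -max_heap[0]:
            # value beats the largest of the current k smallest: evict that one
            heapq.heapreplace(max_heap, -value)
    return sorted(-v for v in max_heap)


print(k_smallest([7, 2, 9, 4, 1, 8], 3))  # [1, 2, 4]
```

The logic mirrors the size-k min-heap used for "k largest": only the comparison direction and the sign of the stored values change.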
2. Base Template: Kth Largest Element (LeetCode 215)
Problem: Find the kth largest element in an unsorted array.
Invariant: Min-heap of size k contains the k largest elements; root is the kth largest.
Role: BASE TEMPLATE for HeapTopK API Kernel.
2.1 Pattern Recognition
| Signal | Indicator |
|---|---|
| "kth largest/smallest" | → Heap of size k |
| "without sorting" | → Quickselect or heap |
| "stream of numbers" | → Online algorithm with heap |
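For the "stream of numbers" signal, the same size-k min-heap can be kept alive between insertions. The sketch below assumes k >= 1; the class name `KthLargestStream` and its `add` method are hypothetical names chosen for illustration.

```python
import heapq
from typing import Iterable, List, Optional


class KthLargestStream:
    """Hypothetical helper: tracks the kth largest value of a growing stream."""

    def __init__(self, k: int, initial: Iterable[int] = ()) -> None:
        self.k = k
        self.min_heap: List[int] = []  # holds at most k values, the largest seen
        for value in initial:
            self.add(value)

    def add(self, value: int) -> Optional[int]:
        """Insert value; return the kth largest so far, or None if fewer than k seen."""
        if len(self.min_heap) < self.k:
            heapq.heappush(self.min_heap, value)
        elif value > self.min_heap[0]:
            heapq.heapreplace(self.min_heap, value)
        return self.min_heap[0] if len(self.min_heap) == self.k else None


stream = KthLargestStream(3, [4, 5, 8, 2])
print([stream.add(x) for x in (3, 5, 10, 9, 4)])  # [4, 5, 5, 8, 8]
```

Each insertion costs O(log k), and the structure never stores more than k values regardless of stream length.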
2.2 Implementation
# Pattern: heap_kth_element
# See: docs/patterns/heap/templates.md Section 1 (Base Template)
import heapq
import random
from typing import List


class SolutionHeap:
    def findKthLargest(self, nums: List[int], k: int) -> int:
        """
        Find kth largest using min-heap of size k.

        Key Insight:
        - Maintain k largest elements seen so far in a min-heap
        - Root of min-heap = smallest of the k largest = kth largest

        Why min-heap for max problem?
        - When new element > root, it belongs in top-k
        - Replace root (smallest of k largest) with new element
        - After n elements, root is exactly the kth largest
        """
        # Min-heap stores k largest elements
        min_heap: list[int] = []

        for num in nums:
            if len(min_heap) < k:
                # Phase 1: Fill heap until size k
                heapq.heappush(min_heap, num)
            elif num > min_heap[0]:
                # Phase 2: New element larger than kth largest
                # Replace the current kth largest
                heapq.heapreplace(min_heap, num)

        # Root is the kth largest element
        return min_heap[0]


class SolutionQuickselect:
    def findKthLargest(self, nums: List[int], k: int) -> int:
        """
        Quickselect algorithm: O(n) average, O(n²) worst.

        Partition-based selection without full sorting.
        Use random pivot to avoid worst-case on sorted input.
        Note: partitions nums in place.
        """
        target_idx = k - 1  # kth largest at index k-1 in descending order

        def partition(left: int, right: int) -> int:
            # Random pivot to avoid O(n²) on sorted arrays
            pivot_idx = random.randint(left, right)
            pivot_val = nums[pivot_idx]
            nums[pivot_idx], nums[right] = nums[right], nums[pivot_idx]

            store_idx = left
            for i in range(left, right):
                if nums[i] >= pivot_val:  # >= for descending order
                    nums[store_idx], nums[i] = nums[i], nums[store_idx]
                    store_idx += 1

            nums[store_idx], nums[right] = nums[right], nums[store_idx]
            return store_idx

        left, right = 0, len(nums) - 1
        while left <= right:
            pivot_idx = partition(left, right)
            if pivot_idx == target_idx:
                return nums[pivot_idx]
            elif pivot_idx < target_idx:
                left = pivot_idx + 1
            else:
                right = pivot_idx - 1

        return nums[left]
2.5 Complexity Comparison
| Approach | Time (Average) | Time (Worst) | Space |
|---|---|---|---|
| Min-heap size k | O(n log k) | O(n log k) | O(k) |
| Quickselect | O(n) | O(n²) | O(1) |
| Sorting | O(n log n) | O(n log n) | O(n) |
2.6 When to Use Each
| Scenario | Best Approach |
|---|---|
| k is small (k << n) | Min-heap O(n log k) ≈ O(n) |
| k ≈ n/2 | Quickselect O(n) |
| Need top-k elements, not just kth | Min-heap |
| Streaming data | Min-heap (online algorithm) |
| Memory constrained | Quickselect (in-place) |
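When the input is already in memory and a hand-written heap is not required, the standard library's `heapq.nlargest` covers the "need top-k elements" row directly. The values of `nums` and `k` below are illustrative only.

```python
import heapq

nums = [3, 2, 1, 5, 6, 4]
k = 2

# nlargest returns the k largest values in descending order
top_k = heapq.nlargest(k, nums)
kth_largest = top_k[-1]  # the last of the k largest is the kth largest

print(top_k)        # [6, 5]
print(kth_largest)  # 5
```

`heapq.nsmallest` is the mirror image for the k smallest values, and both accept an optional `key` function.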
3. Top K Frequent Elements (LeetCode 347)
Problem: Given an integer array, return the k most frequent elements.
Variant: heap_top_k with frequency as sort key
Delta from 215: Sort by frequency instead of value; bucket sort alternative.
3.1 Pattern Recognition
| Signal | Indicator |
|---|---|
| "k most frequent" | → Count frequencies, then top-k selection |
| "top k by some metric" | → Heap with custom comparator |
| "unique answer guaranteed" | → No tie-breaking needed |
3.2 Implementation
# Pattern: heap_top_k
# See: docs/patterns/heap/templates.md Section 2
from collections import Counter
import heapq
from typing import List


class SolutionHeap:
    def topKFrequent(self, nums: List[int], k: int) -> List[int]:
        """
        Find k most frequent elements using min-heap.

        Two-Phase Approach:
        1. Count frequencies: O(n)
        2. Top-k selection: O(m log k) where m = unique elements

        Why min-heap for max-frequency?
        - Store (frequency, element) tuples
        - Heap root = smallest frequency among top-k
        - Elements with higher frequency replace root
        """
        # Phase 1: Count frequencies
        frequency_map: dict[int, int] = Counter(nums)

        # Phase 2: Maintain min-heap of size k
        min_heap: list[tuple[int, int]] = []  # (frequency, element)

        for element, freq in frequency_map.items():
            if len(min_heap) < k:
                heapq.heappush(min_heap, (freq, element))
            elif freq > min_heap[0][0]:
                heapq.heapreplace(min_heap, (freq, element))

        # Extract elements (frequency is index 0, element is index 1)
        return [element for freq, element in min_heap]


class SolutionBucket:
    def topKFrequent(self, nums: List[int], k: int) -> List[int]:
        """
        Bucket sort approach: O(n) time.

        Key Insight:
        - Frequency is bounded: 1 <= freq <= n
        - Use array index as frequency bucket
        - Iterate from highest frequency bucket

        Why O(n)?
        - At most n distinct elements
        - At most n buckets (frequency 1 to n)
        - Single pass through buckets
        """
        frequency_map: dict[int, int] = Counter(nums)

        # Bucket: index = frequency, value = list of elements with that frequency
        # Frequency can be at most len(nums)
        buckets: list[list[int]] = [[] for _ in range(len(nums) + 1)]

        for element, freq in frequency_map.items():
            buckets[freq].append(element)

        # Collect k elements starting from highest frequency
        result: list[int] = []
        for freq in range(len(nums), 0, -1):
            for element in buckets[freq]:
                result.append(element)
                if len(result) == k:
                    return result

        return result
3.5 Complexity Comparison
| Approach | Time | Space | Notes |
|---|---|---|---|
| Min-heap | O(n + m log k) | O(m + k) | m = unique elements |
| Bucket sort | O(n) | O(n) | Better when m is large |
| Quickselect | O(n) avg | O(m) | In-place on frequency array |
| Sorting | O(n + m log m) | O(m) | Simple but slower |
3.6 When to Use Each
| Scenario | Best Approach |
|---|---|
| General case | Heap O(n + m log k) |
| k is large (k ≈ m) | Bucket sort O(n) |
| Memory constrained | Quickselect |
| Need sorted output | Sort by frequency |
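For the "need sorted output" row, one option is the standard library's `Counter.most_common`, which returns (element, count) pairs ordered from most to least frequent. The input below is illustrative only.

```python
from collections import Counter

nums = [1, 1, 1, 2, 2, 3]
k = 2

# most_common(k) yields the k highest-count (element, count) pairs, most frequent first
ranked = Counter(nums).most_common(k)
top_elements = [element for element, _count in ranked]

print(ranked)        # [(1, 3), (2, 2)]
print(top_elements)  # [1, 2]
```

This trades the explicit heap for a single library call; it is convenient when the frequency order of the output matters.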
4. Find Median from Data Stream (LeetCode 295)
Problem: Design a data structure that supports adding numbers and finding the median.
Pattern: Two heaps (max-heap for lower half, min-heap for upper half)
Role: BASE TEMPLATE for streaming median problems.
4.1 Pattern Recognition
| Signal | Indicator |
|---|---|
| "median" | → Two heaps or sorted structure |
| "data stream" / "online" | → Efficient insertion, O(log n) |
| "running median" | → Maintain sorted halves |
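The "sorted structure" alternative named in the table can be sketched with `bisect.insort` on a plain list. This is a baseline for comparison under stated assumptions, not the two-heap template: the class and method names are hypothetical, each insertion costs O(n) because of element shifting, and `find_median` assumes at least one number has been added.

```python
import bisect
from typing import List


class SortedListMedian:
    """Hypothetical baseline: keep every value in one sorted list."""

    def __init__(self) -> None:
        self.values: List[int] = []

    def add_num(self, num: int) -> None:
        # Binary search finds the slot in O(log n); the list insert shifts O(n) items
        bisect.insort(self.values, num)

    def find_median(self) -> float:
        n = len(self.values)
        mid = n // 2
        if n % 2 == 1:
            return float(self.values[mid])
        return (self.values[mid - 1] + self.values[mid]) / 2.0


finder = SortedListMedian()
finder.add_num(1)
finder.add_num(2)
print(finder.find_median())  # 1.5
finder.add_num(3)
print(finder.find_median())  # 2.0
```

A baseline like this is also handy for cross-checking a two-heap implementation on random input.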
4.2 The Two-Heap Insight
All numbers seen: [1, 2, 3, 4, 5]

Lower half: [1, 2, 3]  |  Upper half: [4, 5]
     max-heap root ↑      ↑ min-heap root

Odd count:  median = 3 (root of the max-heap, which holds the extra element)
Even count: e.g. [1, 2, 3, 4] → median = (2 + 3) / 2 = 2.5 (average of both roots)
4.3 Implementation
# Pattern: heap_median_stream
# See: docs/patterns/heap/templates.md Section 3
import heapq


class MedianFinder:
    """
    Maintain median of a data stream using two heaps.

    Data Structure Design:
    - lower_half (max-heap): stores smaller half of numbers
    - upper_half (min-heap): stores larger half of numbers

    Invariants:
    1. All elements in lower_half <= all elements in upper_half
    2. Size difference: |lower_half| - |upper_half| <= 1
    3. If odd count, lower_half has one more element

    Why Two Heaps?
    - Median requires knowing the middle element(s)
    - Heaps give O(1) access to extreme values
    - Max-heap for lower half → O(1) access to largest of smaller numbers
    - Min-heap for upper half → O(1) access to smallest of larger numbers
    """

    def __init__(self):
        # Max-heap for lower half (store negatives for max-heap behavior)
        self.lower_half: list[int] = []  # max-heap via negation
        # Min-heap for upper half
        self.upper_half: list[int] = []  # min-heap

    def addNum(self, num: int) -> None:
        """
        Add number while maintaining heap invariants.

        Strategy:
        1. Always add to lower_half first (max-heap)
        2. Move largest from lower to upper (balances the halves)
        3. If upper becomes larger, move smallest back to lower

        This ensures:
        - Elements are correctly partitioned (lower <= upper)
        - Sizes are balanced (lower has equal or one more)
        """
        # Step 1: Add to lower half (max-heap, store negative)
        heapq.heappush(self.lower_half, -num)

        # Step 2: Move largest from lower to upper
        # This ensures lower_half max <= upper_half min
        largest_lower = -heapq.heappop(self.lower_half)
        heapq.heappush(self.upper_half, largest_lower)

        # Step 3: Rebalance if upper has more elements
        if len(self.upper_half) > len(self.lower_half):
            smallest_upper = heapq.heappop(self.upper_half)
            heapq.heappush(self.lower_half, -smallest_upper)

    def findMedian(self) -> float:
        """
        Return median in O(1) time.

        Cases:
        - Odd count: median is the root of lower_half (the extra element)
        - Even count: median is average of both roots
        """
        if len(self.lower_half) > len(self.upper_half):
            # Odd count: lower has one more
            return float(-self.lower_half[0])
        else:
            # Even count: average of two middle elements
            return (-self.lower_half[0] + self.upper_half[0]) / 2.0
5. Merge k Sorted Lists (LeetCode 23)
Problem: Merge k sorted linked lists into one sorted list.
Pattern: K-way merge using min-heap
Role: BASE TEMPLATE for KWayMerge API Kernel.
5.1 Pattern Recognition
| Signal | Indicator |
|---|---|
| "merge k sorted" | → K-way merge with heap |
| "k sorted arrays/lists" | → Min-heap of k heads |
| "external merge sort" | → Same pattern at scale |
5.2 The K-Way Merge Insight
k=3 sorted lists:
List 0: 1 → 4 → 5
List 1: 1 → 3 → 4
List 2: 2 → 6
Min-heap of heads: [(1, 0), (1, 1), (2, 2)]
↑
Pop smallest, push successor
Result: 1 → 1 → 2 → 3 → 4 → 4 → 5 → 6
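The pop-smallest, push-successor loop above can be sketched on plain Python lists instead of linked lists. The function name `merge_sorted_arrays` is hypothetical; the result is compared against the standard library's `heapq.merge`, which performs the same k-way merge lazily.

```python
import heapq
from typing import List


def merge_sorted_arrays(lists: List[List[int]]) -> List[int]:
    """K-way merge of sorted lists using a min-heap of (value, list_idx, elem_idx)."""
    heap = [(lst[0], i, 0) for i, lst in enumerate(lists) if lst]
    heapq.heapify(heap)

    result: List[int] = []
    while heap:
        value, list_idx, elem_idx = heapq.heappop(heap)  # smallest current head
        result.append(value)
        nxt = elem_idx + 1
        if nxt < len(lists[list_idx]):
            # Push the successor from the same list
            heapq.heappush(heap, (lists[list_idx][nxt], list_idx, nxt))
    return result


lists = [[1, 4, 5], [1, 3, 4], [2, 6]]
print(merge_sorted_arrays(lists))  # [1, 1, 2, 3, 4, 4, 5, 6]
print(list(heapq.merge(*lists)))   # [1, 1, 2, 3, 4, 4, 5, 6]
```

The heap never holds more than k entries, so each pop or push costs O(log k) and the whole merge is O(N log k) for N total elements.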
5.3 Implementation
# Pattern: merge_k_sorted_heap
# See: docs/patterns/heap/templates.md Section 4
import heapq
from typing import List, Optional


class ListNode:
    def __init__(self, val: int = 0, next: Optional['ListNode'] = None):
        self.val = val
        self.next = next


class SolutionHeap:
    def mergeKLists(self, lists: List[Optional[ListNode]]) -> Optional[ListNode]:
        """
        Merge k sorted lists using min-heap.

        Key Insight:
        - At any point, the next element in the merged list is the
          smallest among all current list heads
        - Min-heap gives O(log k) access to the smallest head
        - After popping, push the successor node

        Why (val, index, node) tuple?
        - val: primary sort key
        - index: tie-breaker (ListNode not comparable)
        - node: reference to actual node for result building
        """
        if not lists:
            return None

        # Min-heap: (node_value, list_index, node)
        # list_index breaks ties when values are equal
        min_heap: list[tuple[int, int, ListNode]] = []

        # Initialize heap with all non-empty list heads
        for i, node in enumerate(lists):
            if node:
                heapq.heappush(min_heap, (node.val, i, node))

        # Dummy head simplifies list construction
        dummy = ListNode(0)
        tail = dummy

        while min_heap:
            # Pop the smallest node
            val, i, node = heapq.heappop(min_heap)

            # Append to result
            tail.next = node
            tail = tail.next

            # Push successor if exists
            if node.next:
                heapq.heappush(min_heap, (node.next.val, i, node.next))

        return dummy.next
6. Meeting Rooms II (LeetCode 253)
Problem: Given an array of meeting time intervals, find the minimum number of conference rooms required.
Pattern: Greedy assignment with min-heap of end times
Variant: Interval scheduling with heap
6.1 Pattern Recognition
| Signal | Indicator |
|---|---|
| "minimum rooms/resources" | → Greedy with heap |
| "overlapping intervals" | → Sort + track end times |
| "schedule tasks" | → Min-heap for earliest available |
6.2 The Greedy Insight
Meetings: [[0,30], [5,10], [15,20]]
Timeline:
0    5    10   15   20   25   30
|--------- Meeting 0 ---------|
     |-M1-|
               |-M2-|
At t=5: Meeting 1 starts, Meeting 0 still running → need 2 rooms
At t=15: Meeting 2 starts, Meeting 0 still running → need 2 rooms
Minimum rooms = 2 (max concurrent meetings)
6.3 Implementation
# Pattern: heap_interval_scheduling
# See: docs/patterns/heap/templates.md Section 5
import heapq
from typing import List


class SolutionHeap:
    def minMeetingRooms(self, intervals: List[List[int]]) -> int:
        """
        Find minimum meeting rooms using min-heap of end times.

        Key Insight:
        - Sort meetings by start time (process chronologically)
        - Min-heap tracks when each room becomes free (end times)
        - For each meeting: if starts after earliest end, reuse room
        - Otherwise, allocate new room

        Why min-heap of end times?
        - We need the room that frees up earliest
        - If new meeting starts >= earliest end, room can be reused
        - Pop the old end time, push new end time
        """
        if not intervals:
            return 0

        # Sort by start time (note: sorts the caller's list in place)
        intervals.sort(key=lambda x: x[0])

        # Min-heap of end times (when rooms become free)
        end_times: list[int] = []

        for start, end in intervals:
            # Check if earliest-ending room is now free
            if end_times and end_times[0] <= start:
                # Room is free, reuse it (pop old end, push new end)
                heapq.heapreplace(end_times, end)
            else:
                # All rooms busy, allocate new room
                heapq.heappush(end_times, end)

        # Number of rooms = size of heap
        return len(end_times)
6.4 Trace Example
Input: intervals = [[0,30], [5,10], [15,20]]
After sorting (already sorted by start): [[0,30], [5,10], [15,20]]
Process [0, 30]:
end_times = [30]
Rooms needed: 1
Process [5, 10]:
Earliest end = 30, but meeting starts at 5 (5 < 30)
Cannot reuse, allocate new room
end_times = [10, 30]
Rooms needed: 2
Process [15, 20]:
Earliest end = 10, meeting starts at 15 (15 >= 10)
Reuse room! Replace 10 with 20
end_times = [20, 30]
Rooms needed: 2
Result: 2 rooms
6.5 Alternative: Sweep Line
from typing import List


class SolutionSweepLine:
    def minMeetingRooms(self, intervals: List[List[int]]) -> int:
        """
        Sweep line: track events at each time point.

        Events:
        - +1 at start time (meeting begins)
        - -1 at end time (meeting ends)

        Max concurrent events = max rooms needed.
        """
        events = []
        for start, end in intervals:
            events.append((start, 1))   # Meeting starts
            events.append((end, -1))    # Meeting ends

        # Sort by time; if same time, process ends before starts
        # (room frees up before new meeting uses it)
        events.sort(key=lambda x: (x[0], x[1]))

        max_rooms = 0
        current_rooms = 0

        for time, delta in events:
            current_rooms += delta
            max_rooms = max(max_rooms, current_rooms)

        return max_rooms
6.6 Complexity Comparison
| Approach | Time | Space | Notes |
|---|---|---|---|
| Min-heap | O(n log n) | O(n) | Sort + heap ops |
| Sweep line | O(n log n) | O(n) | Sort events |
| Brute force | O(n²) | O(1) | Check all pairs |
6.7 Related Problems
| Problem | Variation |
|---|---|
| LC 252: Meeting Rooms | Just check if any overlap |
| LC 56: Merge Intervals | Merge overlapping intervals |
| LC 435: Non-overlapping Intervals | Min removals for no overlap |
| LC 1094: Car Pooling | Range update + capacity check |
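The simplest variation in the table, "just check if any overlap", needs no heap at all: after sorting by start time, only adjacent meetings have to be compared. A minimal sketch, with the hypothetical function name `can_attend_all` and the same convention as above that a meeting ending at time t does not conflict with one starting at t:

```python
from typing import List


def can_attend_all(intervals: List[List[int]]) -> bool:
    """Return True if no two meetings overlap."""
    ordered = sorted(intervals, key=lambda iv: iv[0])  # leaves the input untouched
    for (_, prev_end), (next_start, _) in zip(ordered, ordered[1:]):
        if next_start < prev_end:
            return False  # next meeting begins before the previous one ends
    return True


print(can_attend_all([[0, 30], [5, 10], [15, 20]]))  # False
print(can_attend_all([[7, 10], [2, 4]]))             # True
```

If this check returns True, the minimum number of rooms for a non-empty input is 1, which matches what the heap solution computes.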
7. Task Scheduler (LeetCode 621)
Problem: Schedule tasks with cooldown constraint n between same tasks. Return minimum time units.
Pattern: Greedy scheduling with max-heap
Variant: Heap + greedy for optimal ordering
7.1 Pattern Recognition
| Signal | Indicator |
|---|---|
| "minimum time to complete" | → Greedy scheduling |
| "cooldown period" | → Track availability |
| "most frequent first" | → Max-heap by count |
7.2 The Greedy Insight
Tasks: A A A B B B, n = 2
Greedy: Always schedule most frequent available task
Optimal schedule:
A B _ A B _ A B
1 2 3 4 5 6 7 8
Total time = 8 units
Why most frequent first?
- Reduces idle time by distributing high-frequency tasks
- Idle slots appear when no task is available (all in cooldown)
7.3 Implementation
# Pattern: heap_task_scheduler
# See: docs/patterns/heap/templates.md Section 6
import heapq
from collections import Counter, deque
from typing import List


class SolutionHeap:
    def leastInterval(self, tasks: List[str], n: int) -> int:
        """
        Schedule tasks with cooldown using max-heap.

        Strategy:
        1. Greedily pick most frequent available task
        2. Track cooldown using a queue (task becomes available at time t)
        3. If no task available, idle (time still advances)

        Why max-heap?
        - We want to process high-frequency tasks first
        - This minimizes idle time at the end
        - Max-heap gives O(log k) access to most frequent
        """
        # Count task frequencies
        task_counts = Counter(tasks)

        # Max-heap of remaining counts (negate for max-heap)
        max_heap = [-count for count in task_counts.values()]
        heapq.heapify(max_heap)

        # Queue of (available_time, remaining_count) for tasks in cooldown
        cooldown_queue: deque[tuple[int, int]] = deque()

        time = 0

        while max_heap or cooldown_queue:
            time += 1

            # Check if any task exits cooldown
            if cooldown_queue and cooldown_queue[0][0] == time:
                available_time, remaining = cooldown_queue.popleft()
                heapq.heappush(max_heap, -remaining)

            if max_heap:
                # Execute most frequent available task
                count = -heapq.heappop(max_heap)
                count -= 1
                if count > 0:
                    # Task has more instances, put in cooldown
                    cooldown_queue.append((time + n + 1, count))
            # else: idle (no task available)

        return time
7.4 Trace Example
Input: tasks = ['A','A','A','B','B','B'], n = 2
Initial:
Counts: A=3, B=3
max_heap: [-3, -3]
cooldown_queue: []
Time 1: Pop A (count=3→2), cooldown until time=4
heap: [-3], queue: [(4, 2)]
Time 2: Pop B (count=3→2), cooldown until time=5
heap: [], queue: [(4, 2), (5, 2)]
Time 3: Idle (heap empty, queue not ready)
heap: [], queue: [(4, 2), (5, 2)]
Time 4: A exits cooldown (pushed back onto heap), then pop A (count=2→1), cooldown until time=7
heap: [], queue: [(5, 2), (7, 1)]
... continue ...
Final time = 8
7.5 Alternative: Math Formula
from collections import Counter
from typing import List


class SolutionMath:
    def leastInterval(self, tasks: List[str], n: int) -> int:
        """
        Mathematical approach based on most frequent task.

        Observation:
        - Most frequent task determines the "frame"
        - Frame: chunks of size (n+1) with one instance of max-freq task

        Formula:
        - frames = (max_freq - 1)
        - frame_size = n + 1
        - base_time = frames * frame_size
        - Add tasks with max_freq (they fill the last partial frame)

        Edge case:
        - If total tasks > formula result, no idle needed
        """
        task_counts = Counter(tasks)
        max_freq = max(task_counts.values())
        max_freq_count = sum(1 for count in task_counts.values() if count == max_freq)

        # Formula: (max_freq - 1) frames × (n + 1) slots + max_freq_count final slots
        formula_time = (max_freq - 1) * (n + 1) + max_freq_count

        # Actual time is max of formula and total tasks
        return max(formula_time, len(tasks))
7.6 Visual Explanation of Formula
tasks = [A,A,A,A,B,B,B,C,C], n = 2, max_freq = 4 (task A)
Formula visualization:
Frame 1: A _ _
Frame 2: A _ _
Frame 3: A _ _
Frame 4: A
Slots per frame = n + 1 = 3
Frames (excluding last) = max_freq - 1 = 3
Base slots = 3 × 3 = 9
Plus final A = 1
Total slots = 10 (minimum, may need more if many tasks)
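The arithmetic above can be checked with a few lines of Python. The helper name `least_interval_formula` is hypothetical; it simply evaluates the frame formula and takes the maximum with the task count, so the second and third calls show the cases where idle slots are and are not needed.

```python
from collections import Counter
from typing import List


def least_interval_formula(tasks: List[str], n: int) -> int:
    """Evaluate (max_freq - 1) * (n + 1) + max_freq_count, floored at len(tasks)."""
    counts = Counter(tasks)
    max_freq = max(counts.values())
    max_freq_count = sum(1 for c in counts.values() if c == max_freq)
    formula_time = (max_freq - 1) * (n + 1) + max_freq_count
    return max(formula_time, len(tasks))


# Example from this section: max_freq = 4, one task with that frequency
print(least_interval_formula(list("AAAABBBCC"), 2))     # 3 * 3 + 1 = 10

# Example from section 7.2: max_freq = 3, two tasks with that frequency
print(least_interval_formula(list("AAABBB"), 2))        # 2 * 3 + 2 = 8

# Enough distinct tasks to fill every slot: the task count dominates
print(least_interval_formula(list("AAABBBCCCDDD"), 2))  # max(10, 12) = 12
```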
7.7 Complexity Comparison
| Approach | Time | Space | Notes |
|---|---|---|---|
| Max-heap | O(n × m) | O(k) | n = cooldown, m = total tasks, k = unique tasks |
| Math formula | O(m) | O(k) | Count frequencies only; m = total tasks |
7.8 Related Problems
| Problem | Variation |
|---|---|
| LC 767: Reorganize String | No same adjacent chars |
| LC 358: Rearrange String k Distance | k-distance apart |
| LC 1834: Single-Threaded CPU | Task scheduling with priority |
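The "no same adjacent chars" variation can be read as a task scheduler with a cooldown of one slot and no idling allowed. The sketch below is one possible approach under that reading, not a reference solution; the function name `reorganize_string` is hypothetical, and it returns an empty string when no valid arrangement exists.

```python
import heapq
from collections import Counter


def reorganize_string(s: str) -> str:
    """Rearrange s so no two adjacent characters are equal, or return ''."""
    # Max-heap of (negated remaining count, char)
    max_heap = [(-count, ch) for ch, count in Counter(s).items()]
    heapq.heapify(max_heap)

    result = []
    held = None  # the character just used, kept out of the heap for one step
    while max_heap:
        neg_count, ch = heapq.heappop(max_heap)
        result.append(ch)
        if held is not None:
            heapq.heappush(max_heap, held)  # its one-step cooldown is over
            held = None
        if neg_count + 1 < 0:
            held = (neg_count + 1, ch)      # still has copies left

    # A character still held here could only be placed next to itself
    return "" if held is not None else "".join(result)


print(reorganize_string("aab"))   # aba
print(reorganize_string("aaab"))  # prints an empty line: no valid arrangement
```

Holding the last-used character out of the heap for one iteration plays the same role as the cooldown queue in the task scheduler template.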
8. Last Stone Weight (LeetCode 1046)
Problem: Smash two heaviest stones repeatedly, return weight of last stone (or 0).
Pattern: Greedy simulation with max-heap
Variant: Simple heap operations for repeated selection
8.1 Pattern Recognition
| Signal | Indicator |
|---|---|
| "repeatedly pick largest/smallest" | → Max/min-heap |
| "simulation" | → Heap for efficient selection |
| "until one/none left" | → Process until heap size ≤ 1 |
8.2 Implementation
# Pattern: heap_greedy_simulation
# See: docs/patterns/heap/templates.md Section 7
import heapq
from typing import List


class SolutionHeap:
    def lastStoneWeight(self, stones: List[int]) -> int:
        """
        Simulate stone smashing using max-heap.

        Game Rules:
        1. Pick two heaviest stones x, y (x <= y)
        2. If x == y: both destroyed
        3. If x != y: stone of weight y - x remains

        Why max-heap?
        - Need repeated access to two largest elements
        - Python heapq is min-heap, so negate values
        - O(log n) per operation vs O(n) for linear search
        """
        # Convert to max-heap (negate all values)
        max_heap = [-stone for stone in stones]
        heapq.heapify(max_heap)  # O(n) heapify

        # Simulate until 0 or 1 stone remains
        while len(max_heap) > 1:
            # Pop two largest (remember to negate)
            largest = -heapq.heappop(max_heap)
            second = -heapq.heappop(max_heap)

            # If different weights, push the difference
            if largest != second:
                heapq.heappush(max_heap, -(largest - second))

        # Return last stone weight (or 0 if no stones left)
        return -max_heap[0] if max_heap else 0