Heap and Priority Queue Pattern
A heap is a tree-based data structure that satisfies the heap property: every parent is ordered relative to its children, so the minimum (or maximum) element is always at the root. A priority queue is typically implemented with a heap and provides efficient access to that minimum or maximum element. This pattern is essential for problems that require frequent access to extremes or that repeatedly extract elements in priority order.
When to Use Heap/Priority Queue
Use this pattern when:
- Repeatedly accessing the minimum or maximum element
- Finding the k largest or smallest elements
- Maintaining a running median or percentile
- Merging k sorted lists or streams
- Scheduling tasks based on priority
- Implementing Dijkstra's shortest path algorithm
Core Approach
Heap Operations
- Min Heap: Parent ≤ Children (smallest at root)
- Max Heap: Parent ≥ Children (largest at root)
Key Operations
- push(x): Insert element - O(log n)
- pop(): Remove min/max - O(log n)
- peek(): View min/max - O(1)
- heapify(): Build heap - O(n)
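A minimal illustration of these operations with Python's heapq, the module used throughout this section; heapq treats a plain list as a min heap, with the minimum always at index 0:
import heapq

nums = [5, 1, 8, 3]

# heapify(): build a valid min heap in place - O(n)
heapq.heapify(nums)          # nums is now [1, 3, 8, 5]

# push(x): insert an element - O(log n)
heapq.heappush(nums, 2)

# peek(): the minimum is always at index 0 - O(1)
smallest = nums[0]           # 1

# pop(): remove and return the minimum - O(log n)
heapq.heappop(nums)          # returns 1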
Common Patterns
- K-th Element Problems: Use heap of size k
- Merge Streams: Use min heap with one element from each stream
- Scheduling: Use heap ordered by deadline or priority
- Running Statistics: Use two heaps (min and max)
Problem 1: Kth Largest Element in an Array
Difficulty: Medium
LeetCode: #215
Problem Statement
Find the kth largest element in an unsorted array. Note that it is the kth largest element in sorted order, not the kth distinct element.
Example:
Input: nums = [3, 2, 1, 5, 6, 4], k = 2
Output: 5
Explanation: Sorted, the array is [1, 2, 3, 4, 5, 6], so the 2nd largest element is 5
Approach
Use a min heap of size k:
- Maintain the k largest elements in the heap
- The smallest of these k elements is at the root (the kth largest overall)
- When the heap exceeds size k, remove the smallest element
This ensures the heap always holds the k largest elements, with the kth largest at the top.
Solution
import heapq
def find_kth_largest(nums, k):
"""
Find kth largest element using min heap.
Time Complexity: O(n log k) - process n elements with heap of size k
Space Complexity: O(k) - heap stores k elements
"""
# Maintain min heap of k largest elements
min_heap = []
for num in nums:
heapq.heappush(min_heap, num)
# Keep heap size at k
if len(min_heap) > k:
heapq.heappop(min_heap)
# Root of min heap is kth largest
return min_heap[0]
# Alternative: Using Python's nlargest
def find_kth_largest_alt(nums, k):
"""Alternative using heapq.nlargest."""
return heapq.nlargest(k, nums)[-1]
# Test
nums = [3, 2, 1, 5, 6, 4]
k = 2
print(find_kth_largest(nums, k)) # Output: 5
Step-by-Step Walkthrough
nums = [3, 2, 1, 5, 6, 4], k = 2
Process nums[0]=3:
heap = [3], size = 1 < k, keep all
Process nums[1]=2:
heap = [2, 3], size = 2 = k, keep all
Min heap: 2 at root
Process nums[2]=1:
heap = [1, 3, 2], size = 3 > k
Pop smallest: heap = [2, 3]
Min heap: 2 at root
Process nums[3]=5:
heap = [2, 3, 5], size = 3 > k
Pop smallest: heap = [3, 5]
Min heap: 3 at root
Process nums[4]=6:
heap = [3, 5, 6], size = 3 > k
Pop smallest: heap = [5, 6]
Min heap: 5 at root
Process nums[5]=4:
heap = [4, 6, 5], size = 3 > k
Pop smallest: heap = [5, 6]
Min heap: 5 at root
Result: 5 (kth largest element)
Heap maintains 2 largest elements: [5, 6]
Smallest of these (at root) is kth largest: 5
Problem 2: Merge K Sorted Lists
Difficulty: Hard
LeetCode: #23
Problem Statement
Merge k sorted linked lists into one sorted linked list.
Example:
Input: lists = [[1, 4, 5], [1, 3, 4], [2, 6]]
Output: [1, 1, 2, 3, 4, 4, 5, 6]
Approach
Use a min heap to track the smallest unprocessed element from each list:
1. Add the first element from each list to the heap
2. Pop the smallest element and append it to the result
3. Add the next element from the same list to the heap
4. Repeat until all elements are processed
The heap efficiently finds the minimum among k candidates.
Solution
import heapq
class ListNode:
def __init__(self, val=0, next=None):
self.val = val
self.next = next
def merge_k_lists(lists):
"""
Merge k sorted linked lists.
Time Complexity: O(n log k) - n total nodes, heap operations on k elements
Space Complexity: O(k) - heap stores at most k nodes
"""
# Min heap: (value, list_index, node)
# Use index to break ties (Python heapq requires comparable elements)
min_heap = []
# Add first node from each list
for i, head in enumerate(lists):
if head:
heapq.heappush(min_heap, (head.val, i, head))
# Dummy head for result list
dummy = ListNode(0)
current = dummy
while min_heap:
# Get smallest element
val, i, node = heapq.heappop(min_heap)
# Add to result
current.next = node
current = current.next
# Add next node from same list
if node.next:
heapq.heappush(min_heap, (node.next.val, i, node.next))
return dummy.next
# Helper to create linked list from array
def create_list(arr):
"""Create linked list from array."""
dummy = ListNode(0)
current = dummy
for val in arr:
current.next = ListNode(val)
current = current.next
return dummy.next
# Helper to convert linked list to array
def list_to_array(head):
"""Convert linked list to array."""
result = []
while head:
result.append(head.val)
head = head.next
return result
# Test
lists = [
create_list([1, 4, 5]),
create_list([1, 3, 4]),
create_list([2, 6])
]
merged = merge_k_lists(lists)
print(list_to_array(merged)) # Output: [1, 1, 2, 3, 4, 4, 5, 6]
Step-by-Step Walkthrough
lists = [[1,4,5], [1,3,4], [2,6]]
Initialize heap with first nodes:
heap = [(1, 0, node1), (1, 1, node1'), (2, 2, node2)]
Heap structure (min): 1 at root
Iteration 1:
Pop (1, 0, node1) - smallest
Add to result: [1]
Push next from list 0: (4, 0, node4)
heap = [(1, 1, node1'), (2, 2, node2), (4, 0, node4)]
Iteration 2:
Pop (1, 1, node1') - smallest
Add to result: [1, 1]
Push next from list 1: (3, 1, node3)
heap = [(2, 2, node2), (3, 1, node3), (4, 0, node4)]
Iteration 3:
Pop (2, 2, node2) - smallest
Add to result: [1, 1, 2]
Push next from list 2: (6, 2, node6)
heap = [(3, 1, node3), (4, 0, node4), (6, 2, node6)]
Iteration 4:
Pop (3, 1, node3)
Add to result: [1, 1, 2, 3]
Push next from list 1: (4, 1, node4')
heap = [(4, 0, node4), (4, 1, node4'), (6, 2, node6)]
Iteration 5:
Pop (4, 0, node4)
Add to result: [1, 1, 2, 3, 4]
Push next from list 0: (5, 0, node5)
heap = [(4, 1, node4'), (5, 0, node5), (6, 2, node6)]
Iteration 6:
Pop (4, 1, node4')
Add to result: [1, 1, 2, 3, 4, 4]
No next from list 1
heap = [(5, 0, node5), (6, 2, node6)]
Iteration 7:
Pop (5, 0, node5)
Add to result: [1, 1, 2, 3, 4, 4, 5]
No next from list 0
heap = [(6, 2, node6)]
Iteration 8:
Pop (6, 2, node6)
Add to result: [1, 1, 2, 3, 4, 4, 5, 6]
No next from list 2
heap = []
Result: [1, 1, 2, 3, 4, 4, 5, 6]
Problem 3: Find Median from Data Stream
Difficulty: Hard
LeetCode: #295
Problem Statement
Design a data structure that supports adding integers and finding the median efficiently.
Example:
MedianFinder mf = new MedianFinder()
mf.addNum(1)
mf.addNum(2)
mf.findMedian() -> 1.5
mf.addNum(3)
mf.findMedian() -> 2
Approach
Use two heaps to maintain balance:
- Max heap (left): stores the smaller half of the numbers
- Min heap (right): stores the larger half of the numbers
The median is either:
- The top of the max heap (if the count is odd)
- The average of the two heap tops (if the count is even)
Maintain the heaps such that:
- Their sizes differ by at most 1
- Every element in the max heap ≤ every element in the min heap
Solution
import heapq
class MedianFinder:
"""
Maintain running median using two heaps.
Time Complexity:
- addNum: O(log n) - heap operations
- findMedian: O(1) - just peek at heap tops
Space Complexity: O(n) - store all numbers
"""
def __init__(self):
# Max heap (smaller half) - use negative values for max heap
self.small = []
# Min heap (larger half)
self.large = []
def addNum(self, num):
"""Add number to data structure."""
# Add to max heap (small) first
heapq.heappush(self.small, -num)
# Balance: ensure max(small) <= min(large)
if self.small and self.large and (-self.small[0] > self.large[0]):
val = -heapq.heappop(self.small)
heapq.heappush(self.large, val)
# Balance sizes: difference should be at most 1
if len(self.small) > len(self.large) + 1:
val = -heapq.heappop(self.small)
heapq.heappush(self.large, val)
elif len(self.large) > len(self.small):
val = heapq.heappop(self.large)
heapq.heappush(self.small, -val)
def findMedian(self):
"""Find current median."""
if len(self.small) > len(self.large):
            # Odd count: median is the top of small (the heap holding the extra element)
return -self.small[0]
else:
# Even count: median is average of both tops
return (-self.small[0] + self.large[0]) / 2.0
# Test
mf = MedianFinder()
mf.addNum(1)
mf.addNum(2)
print(mf.findMedian()) # Output: 1.5
mf.addNum(3)
print(mf.findMedian()) # Output: 2.0
Step-by-Step Walkthrough
Operations: addNum(1), addNum(2), findMedian(), addNum(3), findMedian()
addNum(1):
Push -1 to small: small = [-1], large = []
No balancing needed (large is empty)
Sizes: small=1, large=0
addNum(2):
Push -2 to small: small = [-2, -1], large = []
Check max(small) vs min(large): large is empty, skip
Balance sizes: small=2, large=0 (diff > 1)
Move from small to large: pop -2, push 2
small = [-1], large = [2]
Sizes: small=1, large=1
findMedian():
Even count: (1 + 2) / 2 = 1.5
Result: 1.5
addNum(3):
Push -3 to small: small = [-3, -1], large = [2]
Check max(small) vs min(large): -(-3)=3 > 2
Move max from small to large: pop -3, push 3
small = [-1], large = [2, 3]
Balance sizes: small=1, large=2 (large > small)
Move from large to small: pop 2, push -2
small = [-2, -1], large = [3]
Sizes: small=2, large=1
findMedian():
Odd count: top of small = -(-2) = 2
Result: 2.0
Final state:
small (max heap) = [-2, -1] → represents [2, 1]
large (min heap) = [3]
Numbers in order: 1, 2, 3
Median: 2
Key Takeaways
- Heap Selection: Min heap for k largest, max heap for k smallest
- Size Matters: A heap of size k costs O(n log k), which beats a full O(n log n) sort when k << n
- Two Heap Technique: Powerful for running statistics
- Comparison Tuples: Use tuples in heap for complex comparisons
- Python heapq: Only provides min heap (negate for max heap)
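A quick sketch of the negation trick from the last takeaway; the values here are only for illustration:
import heapq

values = [3, 10, 7]

# Simulate a max heap by storing negated values in a min heap
max_heap = [-v for v in values]
heapq.heapify(max_heap)

largest = -max_heap[0]               # 10: peek at the maximum
heapq.heappush(max_heap, -12)        # insert 12
largest = -heapq.heappop(max_heap)   # 12: pop the maximum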
Common Applications
Single Heap
- K-th largest/smallest elements
- Top K frequent elements
- Task scheduling by priority
- Dijkstra's shortest path
Two Heaps
- Running median
- Sliding window median
- Balanced partitioning
- Stream percentiles
Heap Merging
- Merge K sorted arrays
- Smallest range in K lists
- Find K pairs with smallest sums
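For the merge-style applications above, a short sketch using heapq.merge, which lazily merges inputs that are already sorted; the arrays are illustrative:
import heapq

# Illustrative inputs: k sorted arrays
arrays = [[1, 4, 5], [1, 3, 4], [2, 6]]

# heapq.merge yields the combined elements in sorted order
merged = list(heapq.merge(*arrays))
print(merged)  # [1, 1, 2, 3, 4, 4, 5, 6]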
Practice Tips
- Decide between min heap and max heap based on what needs to be at root
- Consider heap size: keep k elements vs all elements
- For two heap problems, define clear invariants to maintain
- Remember Python heapq is min heap only (use negatives for max)
- Use tuples for complex element comparisons
- Think about time complexity: heap operations are O(log n)
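A small sketch of the tuple tip applied to priority scheduling; the task names and priorities are made up for illustration, and the counter breaks ties so the payloads themselves never need to be comparable:
import heapq
import itertools

# Heap entries are (priority, tie_breaker, task)
tasks = [("send email", 3), ("backup database", 1), ("rotate logs", 2)]
order = itertools.count()

queue = []
for name, priority in tasks:
    heapq.heappush(queue, (priority, next(order), name))

while queue:
    priority, _, name = heapq.heappop(queue)
    print(priority, name)  # 1 backup database, 2 rotate logs, 3 send email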
Template Pattern
import heapq
# Single heap for k-th element
def kth_element_template(arr, k):
"""Find kth largest using min heap."""
heap = []
for num in arr:
heapq.heappush(heap, num)
if len(heap) > k:
heapq.heappop(heap)
return heap[0]
# Two heaps for running median
class MedianTemplate:
"""Maintain running median."""
def __init__(self):
self.small = [] # max heap (negated)
self.large = [] # min heap
def add(self, num):
heapq.heappush(self.small, -num)
# Balance property
if self.small and self.large and (-self.small[0] > self.large[0]):
val = -heapq.heappop(self.small)
heapq.heappush(self.large, val)
# Balance sizes
if len(self.small) > len(self.large) + 1:
val = -heapq.heappop(self.small)
heapq.heappush(self.large, val)
elif len(self.large) > len(self.small):
val = heapq.heappop(self.large)
heapq.heappush(self.small, -val)
def get_median(self):
if len(self.small) > len(self.large):
return -self.small[0]
return (-self.small[0] + self.large[0]) / 2.0
Common Mistakes
- Forgetting Python heapq is min heap only
- Not maintaining heap invariants with two heaps
- Using a heap when the full dataset is already in memory and a one-time sort would be simpler
- Incorrect size balancing in two heap problems
- Not handling edge cases (empty heaps, single element)
- Wrong complexity analysis (remember heapify is O(n), not O(n log n))
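On the last point, a quick sketch showing two ways to build the same heap; both produce a valid min heap, but their costs differ:
import heapq

data = [9, 4, 7, 1, 8, 2]

# Option 1: heapify in place - O(n)
a = list(data)
heapq.heapify(a)

# Option 2: n individual pushes - O(n log n)
b = []
for x in data:
    heapq.heappush(b, x)

# Both satisfy the heap property; heapify is the cheaper way
# to build a heap from an existing collection
assert a[0] == b[0] == 1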
Three Essential Real-World Applications
Beyond basic interview problems, heaps power critical algorithms in production systems. Here are three must-know applications:
Application 1: Finding Largest/Smallest K Elements
Heaps excel at maintaining the k largest or smallest elements from a stream or large dataset.
Real-World Use Cases
- Top trending topics in social media
- Top-performing employees or products
- Monitoring system metrics (top CPU consumers)
- Recommendation systems (top N items)
Why Heaps Win
- O(log k) insertion vs O(k) for sorted list
- O(1) access to the kth element (the heap root) vs scanning an unsorted buffer
- Memory efficient: only store k elements, not entire dataset
Implementation Pattern:
def top_k_frequent(nums, k):
"""
Find k most frequent elements using heap.
Time: O(n log k) - Better than O(n log n) full sort
Space: O(n) - frequency map + O(k) heap
"""
from collections import Counter
import heapq
# Count frequencies
freq = Counter(nums)
# Min heap of size k (stores (freq, num) pairs)
# Keep k largest frequencies
heap = []
for num, count in freq.items():
heapq.heappush(heap, (count, num))
if len(heap) > k:
heapq.heappop(heap)
# Extract numbers (discard frequencies)
return [num for count, num in heap]
# Example: Social media trending
posts = [1, 1, 1, 2, 2, 3, 4, 4, 4, 4]
print(top_k_frequent(posts, 2)) # [1, 4] - top 2 posts
Application 2: Graph Algorithms (Dijkstra's Shortest Path)
Heaps are fundamental to efficient graph algorithms, particularly for finding shortest paths.
Real-World Use Cases
- GPS navigation and route planning
- Network routing protocols
- Game AI pathfinding
- Resource allocation in distributed systems
Dijkstra's Algorithm with Heap:
import heapq
from collections import defaultdict
def dijkstra(graph, start):
"""
Find shortest paths from start to all nodes.
Time: O((V + E) log V) with heap
O(V²) with simple array (worse for sparse graphs)
Space: O(V)
Args:
graph: dict of node -> [(neighbor, weight)]
start: starting node
Returns:
dict of node -> shortest distance from start
"""
# Distance to each node (infinity initially)
distances = {node: float('inf') for node in graph}
distances[start] = 0
# Min heap: (distance, node)
heap = [(0, start)]
visited = set()
while heap:
current_dist, node = heapq.heappop(heap)
# Skip if already processed
if node in visited:
continue
visited.add(node)
# Update neighbors
for neighbor, weight in graph[node]:
distance = current_dist + weight
# Found shorter path
if distance < distances[neighbor]:
distances[neighbor] = distance
heapq.heappush(heap, (distance, neighbor))
return distances
# Example: Road network
roads = {
'A': [('B', 4), ('C', 2)],
'B': [('C', 1), ('D', 5)],
'C': [('D', 8), ('E', 10)],
'D': [('E', 2)],
'E': []
}
distances = dijkstra(roads, 'A')
print(f"Shortest path A to E: {distances['E']}") # 11
Why Heap Matters
- Without a heap: scanning all unvisited nodes to find the minimum distance gives O(V²)
- With a heap: extracting the minimum takes O(log V), for O((V + E) log V) overall
- For sparse graphs (E << V²), heap version is dramatically faster
Application 3: Huffman Encoding (Data Compression)
Huffman encoding builds optimal prefix-free codes for data compression using a heap-based greedy algorithm.
Real-World Use Cases
- File compression (ZIP, GZIP)
- Image formats (JPEG uses variant)
- Network data transmission
- Database compression
How It Works:
1. Count character frequencies
2. Build a min heap of nodes keyed by frequency
3. Repeatedly merge the two smallest nodes into a parent node
4. Result: a binary tree in which frequent characters get shorter codes
Implementation:
import heapq
from collections import Counter, namedtuple
import itertools
Node = namedtuple('Node', ['freq', 'char', 'left', 'right'])
def huffman_encoding(text):
"""
Build Huffman tree and generate codes.
Time: O(n log n) - n chars, heap operations
Space: O(n) - tree nodes
"""
if not text:
return {}, None
# Count frequencies
freq = Counter(text)
    # Build a min heap of leaf nodes keyed by frequency (least frequent first).
    # A tie-breaking counter keeps entries comparable when frequencies match;
    # otherwise comparison would fall through to Node fields such as char=None.
    order = itertools.count()
    heap = [(f, next(order), Node(f, ch, None, None)) for ch, f in freq.items()]
    heapq.heapify(heap)
# Build tree
while len(heap) > 1:
        # Pop the two lowest-frequency nodes
        _, _, left = heapq.heappop(heap)
        _, _, right = heapq.heappop(heap)
# Create parent with combined frequency
parent = Node(
freq=left.freq + right.freq,
char=None, # Internal nodes have no character
left=left,
right=right
)
        heapq.heappush(heap, (parent.freq, next(order), parent))
# Root of tree
    root = heap[0][2]
# Generate codes (DFS on tree)
codes = {}
def generate_codes(node, code=''):
if node.char is not None: # Leaf node
codes[node.char] = code or '0' # Single char gets '0'
else:
if node.left:
generate_codes(node.left, code + '0')
if node.right:
generate_codes(node.right, code + '1')
generate_codes(root)
return codes, root
def encode_text(text, codes):
"""Encode text using Huffman codes."""
return ''.join(codes[ch] for ch in text)
def decode_text(encoded, root):
"""Decode using Huffman tree."""
if not encoded:
return ''
result = []
node = root
for bit in encoded:
# Traverse tree
node = node.left if bit == '0' else node.right
# Reached leaf
if node.char is not None:
result.append(node.char)
node = root # Reset to root
return ''.join(result)
# Example
text = "huffman encoding example"
codes, tree = huffman_encoding(text)
print("Character codes:")
for char, code in sorted(codes.items()):
print(f" '{char}': {code}")
encoded = encode_text(text, codes)
print(f"\nOriginal: {len(text) * 8} bits")
print(f"Encoded: {len(encoded)} bits")
print(f"Compression: {(1 - len(encoded)/(len(text)*8)) * 100:.1f}%")
decoded = decode_text(encoded, tree)
assert decoded == text, "Encoding/decoding failed!"
Compression Analysis
- Fixed-length encoding: Each character = 8 bits (ASCII)
- Huffman encoding: Frequent characters get shorter codes
- Typical compression: 20-90% depending on text redundancy
Why Heap Is Essential
- Greedy algorithm requires repeatedly finding minimum frequency nodes
- Heap makes this O(log n) per merge
- Without heap: O(n) scan each time → O(n²) total
- With heap: O(n log n) total
Further Learning
The applications above cover the most common uses of heaps in real systems. For additional perspectives and examples:
- Advanced Heap Applications: The external resource "Three Must-Know Applications of Heap Data Structures" offers alternative explanations and additional edge cases for these same topics.
- Practice: Implement these algorithms from scratch, then optimize. Understanding why heaps are chosen (vs alternatives) is more valuable than memorizing code.