Author: | Wojciech Muła |
---|---|
Added on: | 2025-01-18 |
Binary heap is a binary tree data structure having some interesting properties. One of them is an array-friendly memory layout, achieved by building (almost) complete binary tree.
A binary heap keeps at index 0 the maximum value, or the minimum one depending on convention — let's stick to maximum heaps. There is exactly one invariant: a child node, if exist, keep a value less than the parent node. For comparison, in the case of binary search trees, we have more strict rules: the left child keeps a smaller value than the parent's value, and the right child keeps a bigger value.
A non-root node stored at index i have the parent node at index floor[(i − 1)/2], and children nodes at indices 2 ⋅ i + 1 and 2 ⋅ i + 2.
In this text we cover two procedures related to heaps:
The procedure is_heap is vectorizable and using SIMD instructions brings profit. We also show that it is possible to define this function using forward iterators rather random iterators, as the C++ standard imposes.
The procedure push_heap can be expressed with gather and scatter, but performance is terrible. For the sake of completeness, we show the AVX-512 implementation.
There is also one more crucial method for heaps: removing the maximum value, pop_heap. However, it is difficult to vectorize, and benefits from vectorization would likely be worse than in the case of push_heap.
The easiest approach to check if the given array is a proper heap is scanning the array from beginning, calculate the left and right children indices and then compare their values according to the invariant. We stop scanning, when both children indices are out of the array bounds.
However, this algorithm is not suitable for vectorization. What we need is sequential scan over array. We need two pointers: one pointing parents and another pointing children nodes. The "parent" pointer start at index 0, and is incremented by 1. The "children" pointer nodes start at index 1, and is incremented by 2.
Below is C++ template using the modified algorithm. What the typename argument suggests, this is suitable for forward iterators, while the standard C++ functions from <algorithm> expect random iterators.
template <typename ForwardIterator, typename Compare = std::less<>> bool is_heap_fwd(ForwardIterator start, ForwardIterator end, Compare cmp) { if (start == end) return true; auto parent = start; auto current = std::next(start); while (current != end) { if (cmp(*parent, *current)) return false; current = std::next(current); if (current == end) break; if (cmp(*parent, *current)) return false; parent = std::next(parent); current = std::next(current); } return true; }
Below is a memory layout of a heap. We need to check if parents are greater than their children, that is: a > b and a > c, then b > d and b > e, then e > h, e > i, etc.
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬┈┈ │ a │ b │ c │ d │ e │ f │ g │ h │ i │ j │ k │ l │ m │ n │ o │ p │ └───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴┈┈ ▲ ▲ ▲ ▲ ▲ ▲ ▲ ▲ │ │ │ │ │ │ │ │ │ ╰───╯ ╰───────────╯ ╰───────────────────────────╯ ╰─╴┈┈ │ level 1 level 2 level 3 ╵ tree level 0
We can observe that a single parent node is used twice, thus our vectorized approach would work in the following manner:
Load vector of parent nodes; let's assume vectors hold 8 elements.
0 1 2 3 4 5 6 7 ┌───┬───┬───┬───┬───┬───┬───┬───┐ p = │ a │ b │ c │ d │ e │ f │ g │ h │ └───┴───┴───┴───┴───┴───┴───┴───┘
Load two vectors of child nodes.
0 1 2 3 4 5 6 7 ┌───┬───┬───┬───┬───┬───┬───┬───┐ c0 = │ b │ c │ d │ e │ f │ g │ h │ i │ └───┴───┴───┴───┴───┴───┴───┴───┘ ┌───┬───┬───┬───┬───┬───┬───┬───┐ c1 = │ j │ k │ l │ m │ n │ o │ p │ r │ └───┴───┴───┴───┴───┴───┴───┴───┘
Double the parent nodes: each item appears them twice.
0 1 2 3 4 5 6 7 ┌───┬───┬───┬───┬───┬───┬───┬───┐ p0 = │ a │ a │ b │ b │ c │ c │ d │ d │ └───┴───┴───┴───┴───┴───┴───┴───┘ 0 1 2 3 4 5 6 7 ┌───┬───┬───┬───┬───┬───┬───┬───┐ p1 = │ e │ e │ f │ f │ g │ g │ h │ h │ └───┴───┴───┴───┴───┴───┴───┴───┘
Now both pairs p0/c0 and p1/c1 contain corresponding parent-child pairs, thus a plain vector comparison yield whether all pairs hold the relation parent-child.
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ┌───┬───┬───┬───┬───┬───┬───┬───┐ ┌───┬───┬───┬───┬───┬───┬───┬───┐ p0 = │ a │ a │ b │ b │ c │ c │ d │ d │ p1 = │ e │ e │ f │ f │ g │ g │ h │ h │ └───┴───┴───┴───┴───┴───┴───┴───┘ └───┴───┴───┴───┴───┴───┴───┴───┘ ┌───┬───┬───┬───┬───┬───┬───┬───┐ ┌───┬───┬───┬───┬───┬───┬───┬───┐ c0 = │ b │ c │ d │ e │ f │ g │ h │ i │ c1 = │ j │ k │ l │ m │ n │ o │ p │ r │ └───┴───┴───┴───┴───┴───┴───┴───┘ └───┴───┴───┴───┴───┴───┴───┴───┘
The vectorized AVX2 code follows this algorithm.
bool is_heap_avx2_epi32(const int32_t* begin, const int32_t* end) { const ssize_t k = 32/4; // words in a vector if (end - begin < 2 * k) { return std::is_heap(begin, end); } const int32_t* parent = begin; const int32_t* current = begin + 1; const __m256i lo = _mm256_setr_epi32(0, 0, 1, 1, 2, 2, 3, 3); const __m256i hi = _mm256_setr_epi32(4, 4, 5, 5, 6, 6, 7, 7); while (end - current >= 2 * k) { // 1. load parents const __m256i tmp = _mm256_loadu_si256((const __m256i*)parent); const __m256i p0 = _mm256_permutevar8x32_epi32(tmp, lo); const __m256i p1 = _mm256_permutevar8x32_epi32(tmp, hi); // 2. load children const __m256i children0 = _mm256_loadu_si256((const __m256i*)(current + 0)); const __m256i children1 = _mm256_loadu_si256((const __m256i*)(current + k)); // 3. compare parents with their children const __m256i lt0 = _mm256_cmpgt_epi32(children0, p0); const __m256i lt1 = _mm256_cmpgt_epi32(children1, p1); const __m256i t0 = _mm256_or_si256(lt0, lt1); if (_mm256_movemask_epi8(t0)) return false; if (current + 2*k > end) break; parent += k; current += 2*k; } for (ssize_t i = current - begin; i < end - begin; i++) { if (begin[(i - 1) / 2] < begin[i]) { return false; } } return true; }
The only downside of AVX2 is lack of comparison of unsigned 32-bit integers. Although it is possible to express relation less-or-equals with the min and equality: a ≤ b ⇔ min(a, b) = a.
The benchmark utility was compiled with -O3 -march=native and invoked with parameters ./benchmarks 1024 4096 8192, where the parameters are input sizes.
Procedure | Comments |
---|---|
std | standatd C++ std::is_heap with random access iterators |
fwd scalar | scalar implementation of the approach using forward iterators |
rnd scalar | reimplementation of std::is_heap with random access iterators |
fwd SSE | SSE implementation of the approach using forward iterators |
fwd AVX2 | AVX2 implementation of the approach using forward iterators |
fwd AVX-512 | AVX-512 implementation of the approach using forward iterators |
procedure | time in cycles per byte | speed-up | ||
---|---|---|---|---|
average | best | |||
Input size 1024 | ||||
std | 1.024 | 1.016 | 1.0 | █████▍ |
fwd scalar | 0.454 | 0.449 | 2.3 | ████████████▏ |
rnd scalar | 0.646 | 0.645 | 1.6 | ████████▍ |
fwd SSE | 0.229 | 0.215 | 4.7 | █████████████████████████▍ |
fwd AVX2 | 0.142 | 0.137 | 7.4 | ████████████████████████████████████████ |
Input size 4096 | ||||
std | 0.886 | 0.879 | 1.0 | ████▍ |
fwd scalar | 0.451 | 0.439 | 2.0 | ████████▉ |
rnd scalar | 0.443 | 0.439 | 2.0 | ████████▉ |
fwd SSE | 0.169 | 0.166 | 5.3 | ███████████████████████▌ |
fwd AVX2 | 0.103 | 0.098 | 9.0 | ████████████████████████████████████████ |
Input size 8192 | ||||
std | 0.892 | 0.881 | 1.0 | ████▍ |
fwd scalar | 0.452 | 0.439 | 2.0 | ████████▉ |
rnd scalar | 0.446 | 0.444 | 2.0 | ████████▊ |
fwd SSE | 0.168 | 0.166 | 5.3 | ███████████████████████▌ |
fwd AVX2 | 0.101 | 0.098 | 9.0 | ████████████████████████████████████████ |
procedure | time in cycles per byte | speed-up | ||
---|---|---|---|---|
average | best | |||
Input size 1024 | ||||
std | 2.529 | 2.509 | 1.0 | ████▍ |
fwd scalar | 2.021 | 2.013 | 1.2 | █████▍ |
rnd scalar | 1.033 | 1.026 | 2.4 | ██████████▊ |
fwd SSE | 0.584 | 0.574 | 4.4 | ███████████████████▏ |
fwd AVX2 | 0.350 | 0.328 | 7.6 | █████████████████████████████████▋ |
fwd AVX-512 | 0.286 | 0.276 | 9.1 | ████████████████████████████████████████ |
Input size 4096 | ||||
std | 2.567 | 2.498 | 1.0 | ██▉ |
fwd scalar | 2.008 | 2.000 | 1.2 | ███▋ |
rnd scalar | 1.021 | 1.005 | 2.5 | ███████▎ |
fwd SSE | 0.547 | 0.541 | 4.6 | █████████████▌ |
fwd AVX2 | 0.289 | 0.280 | 8.9 | ██████████████████████████▎ |
fwd AVX-512 | 0.190 | 0.184 | 13.6 | ████████████████████████████████████████ |
Input size 8192 | ||||
std | 2.566 | 2.497 | 1.0 | ██▋ |
fwd scalar | 2.006 | 1.998 | 1.2 | ███▎ |
rnd scalar | 1.011 | 1.001 | 2.5 | ██████▌ |
fwd SSE | 0.539 | 0.536 | 4.7 | ████████████▏ |
fwd AVX2 | 0.289 | 0.280 | 8.9 | ███████████████████████▍ |
fwd AVX-512 | 0.175 | 0.164 | 15.2 | ████████████████████████████████████████ |
procedure | time in cycles per byte | speed-up | ||
---|---|---|---|---|
average | best | |||
Input size 1024 | ||||
std | 4.499 | 2.584 | 1.0 | ███▋ |
fwd scalar | 1.576 | 1.096 | 2.4 | ████████▌ |
rnd scalar | 1.528 | 0.875 | 3.0 | ██████████▊ |
fwd SSE | 0.976 | 0.543 | 4.8 | █████████████████▍ |
fwd AVX2 | 0.551 | 0.297 | 8.7 | ███████████████████████████████▊ |
fwd AVX-512 | 0.356 | 0.236 | 10.9 | ████████████████████████████████████████ |
Input size 4096 | ||||
std | 4.497 | 2.725 | 1.0 | ██ |
fwd scalar | 1.630 | 1.196 | 2.3 | ████▊ |
rnd scalar | 1.531 | 1.123 | 2.4 | █████ |
fwd SSE | 0.954 | 0.563 | 4.8 | ██████████▏ |
fwd AVX2 | 0.496 | 0.286 | 9.5 | ████████████████████ |
fwd AVX-512 | 0.228 | 0.143 | 19.1 | ████████████████████████████████████████ |
Input size 8192 | ||||
std | 4.516 | 2.900 | 1.0 | ██▌ |
fwd scalar | 1.555 | 1.211 | 2.4 | ██████▏ |
rnd scalar | 1.507 | 1.083 | 2.7 | ██████▉ |
fwd SSE | 0.965 | 0.620 | 4.7 | ████████████ |
fwd AVX2 | 0.475 | 0.355 | 8.2 | █████████████████████ |
fwd AVX-512 | 0.220 | 0.187 | 15.5 | ███████████████████████████████████████▉ |
The procedure push_heap adds a new element to heap. We append the new value to the and of the array. We start from this index, and check whether the parent is greater. If it is, the heap invariant is hold, so no more work is needed. Otherwise, we swap the new item with the parent and recheck the swapped element with its new parent.
In each iteration we go up, until we reach the root.
The vectorized approach is based on the following observation: when we build a list of values from the new node one up to the root, then — for a proper heap — this list is sorted. Otherwise, we need to find position where to put this new value. Taking this form another angle, push_heap performs insertion sort on that list.
For example, when we're adding 11-th element of value X to the heap show below, then its parents are at indices: 5, 2, and 0. Thus, we're considering a list a, b, c, X and we already know that a > b, b > c.
┌───┐ │ a │ ◀─────────╴ level 0 └───┘ 0 ┌───┬───┐ │ b │ c │ ◀───────╴ level 1 └───┴───┘ 1 2 ┌───┬───┬───┬───┐ │ d │ e │ f │ g │ ◀───╴ level 2 └───┴───┴───┴───┘ 3 4 5 6 ┌───┬───┬───┬───┬───┐ │ h │ i │ j │ k │ X │ ◀───╴ level 3 └───┴───┴───┴───┴───┘ 7 8 9 10 11
Vectorized approach consists the following steps.
Prepare list of indices. We start at index n:
idx0 = n; idx1 = (idx0 - 1) >> 1; idx2 = (idx1 - 1) >> 1; idx3 = (idx2 - 1) >> 1; ... idx7 = (idx6 - 1) >> 1;
Perform gather using this list.
idx0 idx1 idx2 idx3 idx4 idx5 idx6 idx7
┌────┬────┬────┬────┬────┬────┬────┬────┐
values = │ 15 │ 2 │ 3 │ 7 │ 10 │ 13 │ 20 │ 30 │
└────┴────┴────┴────┴────┴────┴────┴────┘
▲
│
╰─╴ new value
Locate position where to insert the new value.
┌────┬────┬────┬────┬────┬────┬────┬────┐ new = │ 15 │ 15 │ 15 │ 15 │ 15 │ 15 │ 15 │ 15 │ └────┴────┴────┴────┴────┴────┴────┴────┘ ┌────┬────┬────┬────┬────┬────┬────┬────┐ values = │ 15 │ 2 │ 3 │ 7 │ 10 │ 13 │ 20 │ 30 │ └────┴────┴────┴────┴────┴────┴────┴────┘ mask = new > values ┌────┬────┬────┬────┬────┬────┬────┬────┐ mask = │ 00 │ ff │ ff │ ff │ ff │ ff │ 00 │ 00 │ └────┴────┴────┴────┴────┴────┴────┴────┘ 0 7
Vector mask is converted into a bitmask (0b0011_1110) and the number of leading zeroes uniquely identifies the shuffle pattern.
We load a precomputed shuffle pattern and perform shuffling.
┌────┬────┬────┬────┬────┬────┬────┬────┐
values' = │ 2 │ 3 │ 7 │ 10 │ 13 │ 15 │ 20 │ 30 │
└────┴────┴────┴────┴────┴────┴────┴────┘
▲
│
new value
Finally we scatter the vector to initial positions in the heap.
Repeat when the new item was placed on the last position after shuffle.
First of all, this algorithm is not suitable for AVX2, as AVX2 does not support scatters. The AVX-512 implementation is shown below.
void push_heap_avx2(int32_t* start, size_t size) { if (size <= 255) { std::push_heap(start, start + size); return; } size_t index = size - 1; while (index >= 256) { // 1. Construct indices from the current element to parent nodes 7 levels up ssize_t parent = 0; uint32_t tmp[8]; tmp[0] = index; for (int i=1; i < 8; i++) { parent = (index - 1)/2; tmp[i] = parent; index = parent; } const __m256i indices = _mm256_load_si256((const __m256i*)tmp); // 2. Load values from the selected path const __m256i values = _mm256_i32gather_epi32((const int*)start, indices, sizeof(uint32_t)); // 3. Broadcast 0th element from the vector const __m256i last_value = _mm256_permutevar8x32_epi32(values, _mm256_setzero_si256()); // 3. Check if the heap property is violated. const __m256i mask = _mm256_cmpgt_epi32(values, last_value); const uint8_t any_parent_less = _mm256_movemask_ps(_mm256_castsi256_ps(mask)); if (any_parent_less == 0) { const __m256i sorted = _mm256_permutevar8x32_epi32(values, avx2_sort_values[7]); _mm256_i32scatter_epi32(start, indices, sorted, sizeof(uint32_t)); index = tmp[7]; continue; } // 4. Elements on the path be should sorted, we need to locate where to insert a new value. const int new_index = __builtin_ctz(any_parent_less) - 1; if (new_index > 0) { const __m256i sorted = _mm256_permutevar8x32_epi32(values, avx2_sort_values[new_index]); _mm256_i32scatter_epi32(start, indices, sorted, sizeof(uint32_t)); } } std::push_heap(start, start + index + 1); }
This cannot be faster than a scalar implementation. In the worst case a scalar code execute log2n memory loads, comparisons and swaps. In best case only two memory loads and one comparison is needed.
The vectorized code needs to construct vector of indices, than at least one gather and scatter which are slow. My experiments on quite decent Ice Lake machine showed 10x slowdowns, which makes this approach unpractical (although it is nice).
Sample implementation is available at GitHub.