SIMD binary heap operations

Author: Wojciech Muła
Added on: 2025-01-18

Contents

  • Introduction
  • is_heap
  • Experiments
  • push_heap
  • See also
  • Source code

Introduction

A binary heap is a binary tree data structure having some interesting properties. One of them is an array-friendly memory layout, achieved by keeping the tree (almost) complete.

A binary heap keeps at index 0 the maximum value, or the minimum one depending on convention; let's stick to maximum heaps. There is exactly one invariant: a child node, if it exists, keeps a value not greater than the parent's value. For comparison, in the case of binary search trees we have stricter rules: the left child keeps a smaller value than the parent's value, and the right child keeps a bigger value.

A non-root node stored at index i has the parent node at index floor[(i − 1)/2], and children nodes at indices 2 ⋅ i + 1 and 2 ⋅ i + 2. For example, the node at index 5 has its parent at index 2 and children at indices 11 and 12.
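
This index arithmetic can be written down directly; a tiny sketch (the helper names below are ours):

#include <cstddef>

size_t parent(size_t i)      { return (i - 1) / 2; } // defined for i > 0
size_t left_child(size_t i)  { return 2*i + 1; }
size_t right_child(size_t i) { return 2*i + 2; }
// e.g. parent(5) == 2, left_child(5) == 11, right_child(5) == 12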

In this text we cover two procedures related to heaps:

The procedure is_heap is vectorizable, and using SIMD instructions brings a clear profit. We also show that it is possible to define this function using forward iterators rather than the random access iterators that the C++ standard imposes.

The procedure push_heap can be expressed with gather and scatter instructions, but its performance is terrible. For the sake of completeness, we show the AVX-512 implementation.

There is also one more crucial method for heaps: removing the maximum value, pop_heap. However, it is difficult to vectorize, and the benefits of vectorization would likely be even smaller than in the case of push_heap.

is_heap

The easiest approach to check if a given array is a proper heap is to scan the array from the beginning, calculate the left and right child indices, and compare their values according to the invariant. We stop scanning when both child indices are out of the array bounds.
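
A direct implementation of this check might look as follows (a sketch of the idea, not tuned code):

#include <cstddef>
#include <cstdint>

bool is_heap_naive(const int32_t* data, size_t n) {
    for (size_t i = 0; ; i++) {
        const size_t left  = 2*i + 1;
        const size_t right = 2*i + 2;
        if (left >= n)          // both children out of bounds: we are done
            return true;
        if (data[i] < data[left])
            return false;
        if (right < n && data[i] < data[right])
            return false;
    }
}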

However, this algorithm is not suitable for vectorization. What we need is a sequential scan over the array, with two pointers: one pointing at parent nodes and another at children nodes. The "parent" pointer starts at index 0 and is incremented by 1; the "children" pointer starts at index 1 and is incremented by 2.

Below is a C++ template using the modified algorithm. As the typename argument suggests, it is suitable for forward iterators, while the standard C++ functions from <algorithm> expect random access iterators.

template <typename ForwardIterator, typename Compare = std::less<>>
bool is_heap_fwd(ForwardIterator start, ForwardIterator end, Compare cmp = Compare{}) {
    if (start == end)
        return true;

    auto parent  = start;
    auto current = std::next(start);
    while (current != end) {
        if (cmp(*parent, *current))
            return false;

        current = std::next(current);
        if (current == end)
            break;

        if (cmp(*parent, *current))
            return false;

        parent = std::next(parent);
        current = std::next(current);
    }

    return true;
}
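
Since only forward iteration is required, the template also works with a singly-linked list, something std::is_heap cannot do. A quick usage example:

#include <cassert>
#include <forward_list>
#include <functional>

int main() {
    // a valid max-heap laid out level by level: 9 | 7 8 | 3 5 6 4
    std::forward_list<int> good{9, 7, 8, 3, 5, 6, 4};
    assert(is_heap_fwd(good.begin(), good.end(), std::less<>{}));

    std::forward_list<int> bad{1, 7, 8}; // 7 and 8 are greater than their parent
    assert(!is_heap_fwd(bad.begin(), bad.end(), std::less<>{}));
}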

Below is the memory layout of a heap. We need to check that parents are greater than their children, that is: a > b and a > c, then b > d and b > e, then c > f and c > g, then d > h and d > i, etc.

  0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15
┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬┈┈
│ a │ b │ c │ d │ e │ f │ g │ h │ i │ j │ k │ l │ m │ n │ o │ p │┈┈
└───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴┈┈
  ▲   ▲   ▲   ▲           ▲   ▲                           ▲   ▲
  │   │   │   │           │   │                           │   │
  │   ╰───╯   ╰───────────╯   ╰───────────────────────────╯   ╰─╴┈┈
  │  level 1     level 2                level 3
  ╵
 tree level 0

We can observe that each parent node is used twice, thus our vectorized approach works in the following manner:

  1. Load vector of parent nodes; let's assume vectors hold 8 elements.

           0   1   2   3   4   5   6   7
         ┌───┬───┬───┬───┬───┬───┬───┬───┐
    p  = │ a │ b │ c │ d │ e │ f │ g │ h │
         └───┴───┴───┴───┴───┴───┴───┴───┘
  2. Load two vectors of child nodes.

           0   1   2   3   4   5   6   7
         ┌───┬───┬───┬───┬───┬───┬───┬───┐
    c0 = │ b │ c │ d │ e │ f │ g │ h │ i │
         └───┴───┴───┴───┴───┴───┴───┴───┘
    
         ┌───┬───┬───┬───┬───┬───┬───┬───┐
    c1 = │ j │ k │ l │ m │ n │ o │ p │ r │
         └───┴───┴───┴───┴───┴───┴───┴───┘
  3. Duplicate the parent nodes: each item appears twice.

           0   1   2   3   4   5   6   7
         ┌───┬───┬───┬───┬───┬───┬───┬───┐
    p0 = │ a │ a │ b │ b │ c │ c │ d │ d │
         └───┴───┴───┴───┴───┴───┴───┴───┘
    
           0   1   2   3   4   5   6   7
         ┌───┬───┬───┬───┬───┬───┬───┬───┐
    p1 = │ e │ e │ f │ f │ g │ g │ h │ h │
         └───┴───┴───┴───┴───┴───┴───┴───┘
  4. Now both pairs p0/c0 and p1/c1 contain corresponding parent-child lanes, thus plain vector comparisons yield whether all pairs satisfy the parent-child relation.

           0   1   2   3   4   5   6   7            0   1   2   3   4   5   6   7
         ┌───┬───┬───┬───┬───┬───┬───┬───┐        ┌───┬───┬───┬───┬───┬───┬───┬───┐
    p0 = │ a │ a │ b │ b │ c │ c │ d │ d │   p1 = │ e │ e │ f │ f │ g │ g │ h │ h │
         └───┴───┴───┴───┴───┴───┴───┴───┘        └───┴───┴───┴───┴───┴───┴───┴───┘
    
         ┌───┬───┬───┬───┬───┬───┬───┬───┐        ┌───┬───┬───┬───┬───┬───┬───┬───┐
    c0 = │ b │ c │ d │ e │ f │ g │ h │ i │   c1 = │ j │ k │ l │ m │ n │ o │ p │ r │
         └───┴───┴───┴───┴───┴───┴───┴───┘        └───┴───┴───┴───┴───┴───┴───┴───┘

The vectorized AVX2 code follows this algorithm.

bool is_heap_avx2_epi32(const int32_t* begin, const int32_t* end) {
    const ssize_t k = 32/4; // words in a vector

    if (end - begin < 2 * k) {
        return std::is_heap(begin, end);
    }

    const int32_t* parent = begin;
    const int32_t* current = begin + 1;

    const __m256i lo = _mm256_setr_epi32(0, 0, 1, 1, 2, 2, 3, 3);
    const __m256i hi = _mm256_setr_epi32(4, 4, 5, 5, 6, 6, 7, 7);

    while (end - current >= 2 * k) {
        // 1. load parents
        const __m256i tmp = _mm256_loadu_si256((const __m256i*)parent);
        const __m256i p0  = _mm256_permutevar8x32_epi32(tmp, lo);
        const __m256i p1  = _mm256_permutevar8x32_epi32(tmp, hi);

        // 2. load children
        const __m256i children0 = _mm256_loadu_si256((const __m256i*)(current + 0));
        const __m256i children1 = _mm256_loadu_si256((const __m256i*)(current + k));

        // 3. compare parents with their children
        const __m256i lt0 = _mm256_cmpgt_epi32(children0, p0);
        const __m256i lt1 = _mm256_cmpgt_epi32(children1, p1);
        const __m256i t0 = _mm256_or_si256(lt0, lt1);

        if (_mm256_movemask_epi8(t0))
            return false;

        parent  += k;
        current += 2*k;
    }

    for (ssize_t i = current - begin; i < end - begin; i++) {
        if (begin[(i - 1) / 2] < begin[i]) {
            return false;
        }
    }

    return true;
}

The only downside of AVX2 is the lack of comparison of unsigned 32-bit integers. However, it is possible to express the less-or-equal relation with min and equality: a ≤ b ⇔ min(a, b) = a.
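
For example, an unsigned check for one parents/children pair could be sketched like this (the helper is ours, not part of the benchmarked code):

#include <immintrin.h>

// a <= b holds exactly when min(a, b) == a, so a lane violates
// the heap invariant when min(child, parent) != child
bool any_child_greater_epu32(__m256i children, __m256i parents) {
    const __m256i le = _mm256_cmpeq_epi32(
        _mm256_min_epu32(children, parents), children); // all-ones where child <= parent
    return _mm256_movemask_epi8(le) != -1;              // some lane has child > parent
}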

Experiments

The benchmark utility was compiled with -O3 -march=native and invoked as ./benchmarks 1024 4096 8192, where the parameters are input sizes.

Benchmarked procedures

Procedure     Comments
std           standard C++ std::is_heap with random access iterators
fwd scalar    scalar implementation of the approach using forward iterators
rnd scalar    reimplementation of std::is_heap with random access iterators
fwd SSE       SSE implementation of the approach using forward iterators
fwd AVX2      AVX2 implementation of the approach using forward iterators
fwd AVX-512   AVX-512 implementation of the approach using forward iterators

Ryzen 7

  • Compiler: gcc (Debian 14.1.0-5) 14.1.0
  • CPU: AMD Ryzen 7 7730U with Radeon Graphics
               time in cycles per byte
procedure      average    best   speed-up
Input size 1024
std             1.024    1.016     1.0   █████▍
fwd scalar      0.454    0.449     2.3   ████████████▏
rnd scalar      0.646    0.645     1.6   ████████▍
fwd SSE         0.229    0.215     4.7   █████████████████████████▍
fwd AVX2        0.142    0.137     7.4   ████████████████████████████████████████
Input size 4096
std             0.886    0.879     1.0   ████▍
fwd scalar      0.451    0.439     2.0   ████████▉
rnd scalar      0.443    0.439     2.0   ████████▉
fwd SSE         0.169    0.166     5.3   ███████████████████████▌
fwd AVX2        0.103    0.098     9.0   ████████████████████████████████████████
Input size 8192
std             0.892    0.881     1.0   ████▍
fwd scalar      0.452    0.439     2.0   ████████▉
rnd scalar      0.446    0.444     2.0   ████████▊
fwd SSE         0.168    0.166     5.3   ███████████████████████▌
fwd AVX2        0.101    0.098     9.0   ████████████████████████████████████████

Skylake-X

  • Compiler: gcc (GCC) 11.2.0
  • CPU: Intel(R) Xeon(R) W-2104 CPU @ 3.20GHz
               time in cycles per byte
procedure      average    best   speed-up
Input size 1024
std             2.529    2.509     1.0   ████▍
fwd scalar      2.021    2.013     1.2   █████▍
rnd scalar      1.033    1.026     2.4   ██████████▊
fwd SSE         0.584    0.574     4.4   ███████████████████▏
fwd AVX2        0.350    0.328     7.6   █████████████████████████████████▋
fwd AVX-512     0.286    0.276     9.1   ████████████████████████████████████████
Input size 4096
std             2.567    2.498     1.0   ██▉
fwd scalar      2.008    2.000     1.2   ███▋
rnd scalar      1.021    1.005     2.5   ███████▎
fwd SSE         0.547    0.541     4.6   █████████████▌
fwd AVX2        0.289    0.280     8.9   ██████████████████████████▎
fwd AVX-512     0.190    0.184    13.6   ████████████████████████████████████████
Input size 8192
std             2.566    2.497     1.0   ██▋
fwd scalar      2.006    1.998     1.2   ███▎
rnd scalar      1.011    1.001     2.5   ██████▌
fwd SSE         0.539    0.536     4.7   ████████████▏
fwd AVX2        0.289    0.280     8.9   ███████████████████████▍
fwd AVX-512     0.175    0.164    15.2   ████████████████████████████████████████

Ice Lake

  • Compiler: gcc (GCC) 13.3.1 20240611 (Red Hat 13.3.1-2)
  • CPU: Intel(R) Xeon(R) Gold 6338 CPU @ 2.00GHz
               time in cycles per byte
procedure      average    best   speed-up
Input size 1024
std             4.499    2.584     1.0   ███▋
fwd scalar      1.576    1.096     2.4   ████████▌
rnd scalar      1.528    0.875     3.0   ██████████▊
fwd SSE         0.976    0.543     4.8   █████████████████▍
fwd AVX2        0.551    0.297     8.7   ███████████████████████████████▊
fwd AVX-512     0.356    0.236    10.9   ████████████████████████████████████████
Input size 4096
std             4.497    2.725     1.0   ██
fwd scalar      1.630    1.196     2.3   ████▊
rnd scalar      1.531    1.123     2.4   █████
fwd SSE         0.954    0.563     4.8   ██████████▏
fwd AVX2        0.496    0.286     9.5   ████████████████████
fwd AVX-512     0.228    0.143    19.1   ████████████████████████████████████████
Input size 8192
std             4.516    2.900     1.0   ██▌
fwd scalar      1.555    1.211     2.4   ██████▏
rnd scalar      1.507    1.083     2.7   ██████▉
fwd SSE         0.965    0.620     4.7   ████████████
fwd AVX2        0.475    0.355     8.2   █████████████████████
fwd AVX-512     0.220    0.187    15.5   ███████████████████████████████████████▉

push_heap

The procedure push_heap adds a new element to the heap. We append the new value at the end of the array. We start from this index and check whether the parent is greater. If it is, the heap invariant holds, so no more work is needed. Otherwise, we swap the new item with its parent and recheck the swapped element against its new parent.

In each iteration we go one level up, until we reach the root.
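
For reference, a scalar sketch of this procedure, assuming the new value was already appended at index size − 1:

#include <cstddef>
#include <cstdint>
#include <utility>

void push_heap_scalar(int32_t* heap, size_t size) {
    size_t index = size - 1;                  // the freshly appended element
    while (index > 0) {
        const size_t parent = (index - 1) / 2;
        if (heap[parent] >= heap[index])      // the invariant holds: done
            return;
        std::swap(heap[parent], heap[index]); // bubble the new value one level up
        index = parent;
    }
}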

The vectorized approach is based on the following observation: when we build a list of values from the new node up to the root, then, for a proper heap, this list is sorted. Otherwise, we need to find the position where to put the new value. Looking at this from another angle, push_heap performs a step of insertion sort on that list.

For example, when we're adding a new element of value X at index 11 to the heap shown below, its parents are at indices 5, 2, and 0. Thus, we're considering the list a, c, f, X, and we already know that a > c and c > f.

          ┌───┐
          │ a │  ◀────────╴ level 0
          └───┘
            0
        ┌───┬───┐
        │ b │ c │  ◀──────╴ level 1
        └───┴───┘
          1   2
    ┌───┬───┬───┬───┐
    │ d │ e │ f │ g │  ◀──╴ level 2
    └───┴───┴───┴───┘
      3   4   5   6
┌───┬───┬───┬───┬───┐
│ h │ i │ j │ k │ X │  ◀──╴ level 3
└───┴───┴───┴───┴───┘
  7   8   9  10  11
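
In scalar terms the observation could be expressed with a plain insertion step; the sketch below uses std::upper_bound and made-up values matching the gather example further down:

#include <algorithm>
#include <cstdio>
#include <functional>
#include <vector>

int main() {
    // ancestors of the new element listed from the root down: a > c > f
    std::vector<int> path{30, 20, 13};
    const int x = 15; // the new value X

    // find the first position where x compares greater: x belongs between
    // 20 and 13, and every ancestor past that point moves one level down
    auto pos = std::upper_bound(path.begin(), path.end(), x, std::greater<>{});
    std::printf("insert at path position %td\n", pos - path.begin()); // prints 2
}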

The vectorized approach consists of the following steps.

  1. Prepare the list of indices. We start at index n:

    idx0 = n;
    idx1 = (idx0 - 1) >> 1;
    idx2 = (idx1 - 1) >> 1;
    idx3 = (idx2 - 1) >> 1;
    ...
    idx7 = (idx6 - 1) >> 1;
    
  2. Perform gather using this list.

              idx0 idx1 idx2 idx3 idx4 idx5 idx6 idx7
             ┌────┬────┬────┬────┬────┬────┬────┬────┐
    values = │ 15 │  2 │  3 │  7 │ 10 │ 13 │ 20 │ 30 │
             └────┴────┴────┴────┴────┴────┴────┴────┘
               ▲
               │
               ╰─╴ new value
  3. Locate the position where to insert the new value.

             ┌────┬────┬────┬────┬────┬────┬────┬────┐
    new    = │ 15 │ 15 │ 15 │ 15 │ 15 │ 15 │ 15 │ 15 │
             └────┴────┴────┴────┴────┴────┴────┴────┘
             ┌────┬────┬────┬────┬────┬────┬────┬────┐
    values = │ 15 │  2 │  3 │  7 │ 10 │ 13 │ 20 │ 30 │
             └────┴────┴────┴────┴────┴────┴────┴────┘
    
                    mask = new > values
    
             ┌────┬────┬────┬────┬────┬────┬────┬────┐
    mask   = │ 00 │ ff │ ff │ ff │ ff │ ff │ 00 │ 00 │
             └────┴────┴────┴────┴────┴────┴────┴────┘
                0                                  7
  4. The vector mask is converted into a bitmask (0b0011_1110), and the number of leading zeroes uniquely identifies the shuffle pattern.

  5. We load a precomputed shuffle pattern and perform shuffling.

              ┌────┬────┬────┬────┬────┬────┬────┬────┐
    values' = │  2 │  3 │  7 │ 10 │ 13 │ 15 │ 20 │ 30 │
              └────┴────┴────┴────┴────┴────┴────┴────┘
                                          ▲
                                          │
                                       new value
  6. Finally we scatter the vector to initial positions in the heap.

  7. We repeat the procedure only when the new item was placed at the last position after the shuffle.

First of all, this algorithm is not suitable for AVX2, as AVX2 does not support scatters. The AVX-512 implementation is shown below.

void push_heap_avx512(int32_t* start, size_t size) {
    if (size <= 255) {
        std::push_heap(start, start + size);
        return;
    }

    size_t index = size - 1;
    while (index >= 256) {
        // 1. Construct indices from the current element to parent nodes 7 levels up
        ssize_t parent = 0;

        uint32_t tmp[8];
        tmp[0] = index;
        for (int i=1; i < 8; i++) {
            parent = (index - 1)/2;
            tmp[i] = parent;
            index = parent;
        }

        // tmp is not guaranteed to be 32-byte aligned, use an unaligned load
        const __m256i indices = _mm256_loadu_si256((const __m256i*)tmp);

        // 2. Load values from the selected path
        const __m256i values = _mm256_i32gather_epi32((const int*)start, indices, sizeof(uint32_t));

        // 3. Broadcast 0th element from the vector
        const __m256i last_value = _mm256_permutevar8x32_epi32(values, _mm256_setzero_si256());

        // 4. Check if the heap property is violated.
        const __m256i mask = _mm256_cmpgt_epi32(values, last_value);

        // avx2_sort_values[k] holds a precomputed shuffle pattern that moves
        // element 0 (the new value) to position k and shifts the rest down.
        const uint8_t any_parent_greater = _mm256_movemask_ps(_mm256_castsi256_ps(mask));
        if (any_parent_greater == 0) {
            // no ancestor is greater: move the new value to the topmost
            // position on the path and continue from there
            const __m256i sorted = _mm256_permutevar8x32_epi32(values, avx2_sort_values[7]);
            _mm256_i32scatter_epi32(start, indices, sorted, sizeof(uint32_t));
            index = tmp[7];
            continue;
        }

        // 5. Elements on the path should be sorted; locate where to insert the new value.
        const int new_index = __builtin_ctz(any_parent_greater) - 1;
        if (new_index > 0) {
            const __m256i sorted = _mm256_permutevar8x32_epi32(values, avx2_sort_values[new_index]);
            _mm256_i32scatter_epi32(start, indices, sorted, sizeof(uint32_t));
        }

        return; // the new value has reached its final position
    }

    std::push_heap(start, start + index + 1);
}

This cannot be faster than a scalar implementation. In the worst case the scalar code executes log2(n) memory loads, comparisons, and swaps. In the best case only two memory loads and one comparison are needed.

The vectorized code needs to construct a vector of indices, then perform at least one gather and one scatter, which are slow. My experiments on a quite decent Ice Lake machine showed a 10x slowdown, which makes this approach impractical (although it is nice).

See also

Source code

A sample implementation is available at GitHub.