并行无锁基数排序

2018-03-13

概述

以 LSD(least significant digit)为例,单线程的基数排序可以分为三个阶段:

  • Histogram Phase: Scan a subset of the bits (length of the radix) of all elements in the list, and count the number of occurrences of each digit – place these in a histogram.
  • Prefix-sum Phase: Perform a prefix-sum on the histogram to find out where each digit should go inside the overall list.
  • Data-distribution Phase: Distribute each key-value pair to the position indicated by the prefix-sum array, and update this array so the next occurrence of that digit goes in the subsequent location.

单线程实现

以 256($2^8$) 作为基数,单线程的基数排序 C++ 实现如下:

void singlethread(int dim, int *src, int *dst) {
    const int MASK = (1 << 8) - 1;
    const int BUCKET_SIZE = 257;
    int *_src = src, *_dst = dst;
    unsigned int buckets[BUCKET_SIZE];
    unsigned int sum[BUCKET_SIZE];
    int index = 0, out_index;

    for (int iter = 0, offset = 0; iter < 4; iter++, offset += 8) {
        memset(buckets, 0, BUCKET_SIZE * sizeof(int));
        memset(sum, 0, BUCKET_SIZE * sizeof(int));
        //1. Count elements in bucket
        for (int i = 0; i < dim; i++) {
            index = ((_src[i] >> offset) & MASK) + 1;
            buckets[index]++;
        }
        //2. Perform scan
        for (int i = 1; i < BUCKET_SIZE; i++)
            sum[i] = buckets[i] + sum[i - 1];

        //3. Move data
        for (int i = 0; i < dim; i++) {
            index = (_src[i] >> offset) & MASK;
            out_index = sum[index]++;
            _dst[out_index] = _src[i];
        }
        int *temp = _src;
        _src = _dst;
        _dst = temp;
    }
    for (int i = 0; i < dim; i++)
        dst[i] = src[i];
}

多线程实现

我们可以使用多线程对基数排序的阶段一和阶段三并行计算。计算过程如下:

  1. 对输入数组的进行分段,每一个线程独立计算自己这部分的 histogram
  2. 主线程等待线程计算完成 histogram,然后计算各自线程需要的 prefix sum
  3. 各个线程独立进行数据复制

这要求每个线程都有自己的 bucketssum 数组,可以使用 pthread_barrier_t 进行同步,实现如下:

typedef struct {
    int row; // which row does this thread use
    int startIndex;
    int endIndex;
} thread_args;

pthread_t threads[32];
int *_gsrc;
int *_gdst;
int _gbuckets[32][256];
int _gsum[32][256];
thread_args args[32];
pthread_barrier_t barrier;
const int BUCKET_SIZE = 1 << 8;
const int LOG_RADIX = 8;

void *thread_sort(void *sort_arg) {
    thread_args *args = (thread_args *) sort_arg;
    int row = args->row;
    int startIndex = args->startIndex;
    int endIndex = args->endIndex;
    for (int iter = 0; iter < 4; iter++) {
        const int offset = iter * LOG_RADIX;
        memset(_gbuckets[row], 0, BUCKET_SIZE * sizeof(int));
        memset(_gsum[row], 0, BUCKET_SIZE * sizeof(int));
        for (int i = startIndex, index = 0; i < endIndex; i++) {
            index = ((_gsrc[i] >> offset) & 255);
            _gbuckets[row][index]++;
        }
        pthread_barrier_wait(&barrier);
        pthread_barrier_wait(&barrier);

        int index = 0, out_index = 0;
        for (int i = startIndex; i < endIndex; i++) {
            index = ((_gsrc[i] >> offset) & 255);
            out_index = _gsum[row][index];
            _gdst[out_index] = _gsrc[i];
            _gsum[row][index]++;
        }
        pthread_barrier_wait(&barrier);
        pthread_barrier_wait(&barrier);
    }
    for (int i = startIndex; i < endIndex; i++)
        _gdst[i] = _gsrc[i];
    return 0;
}

const int segment_threshold = 65536;

void multithread(int dim, int *src, int *dst) {
    if (dim <= segment_threshold) { // size too small, no need to create threads
        singlethread(dim, src, dst);
        return;
    }
    int max_threads_num = 32;
    _gsrc = src;
    _gdst = dst;
    pthread_barrier_init(&barrier, NULL, max_threads_num + 1);
    int segment = dim / max_threads_num, j = 0;
    for (j = 0; j < max_threads_num; j++) {
        thread_args arg = {j, j * segment, MIN((j + 1) * segment, dim)};
        args[j] = arg;
        pthread_create(&threads[j], NULL, thread_sort, &args[j]);
    }
    for (int i = 0; i < 4; i++) {
        pthread_barrier_wait(&barrier);
        int index = 0;
        for (j = 0; j < BUCKET_SIZE; j++) {
            for (int col = 0; col < max_threads_num; col++) {
                _gsum[col][j] = index;
                index += _gbuckets[col][j];
            }
        }
        pthread_barrier_wait(&barrier);
        pthread_barrier_wait(&barrier);
        int *temp = _gdst;
        _gdst = _gsrc;
        _gsrc = temp;
        pthread_barrier_wait(&barrier);
    }
    for (j = 0; j < max_threads_num; j++)
        pthread_join(threads[j], NULL);
    pthread_barrier_destroy(&barrier);
}

参考

  1. Radix sort
  2. pthread_barrier_wait