詳解 C-- 高性能無鎖隊列的原理與實現

  1. 無鎖隊列原理

1.1. 隊列操作模型

隊列是一種非常重要的數據結構,其特性是先進先出(FIFO),符合流水線業務流程。在進程間通信、網絡通信間經常採用隊列做緩存,緩解數據處理壓力。根據操作隊列的場景分爲:單生產者——單消費者、多生產者——單消費者、單生產者——多消費者、多生產者——多消費者四大模型。根據隊列中數據分爲:隊列中的數據是定長的、隊列中的數據是變長的。

(1)單生產者——單消費者

(2)多生產者——單消費者

(3)單生產者——多消費者

(4)多生產者——多消費者

(5)數據定長隊列

(6)數據變長隊列

1.2. 無鎖隊列簡介

生產環境中廣泛使用生產者和消費者模型,要求生產者在生產的同時,消費者可以進行消費,通常使用互斥鎖保證數據同步。但線程互斥鎖的開銷仍然比較大,因此在要求高性能、低延時場景中,推薦使用無鎖隊列。

1.3.CAS 操作

CAS 即 Compare and Swap,是所有 CPU 指令都支持 CAS 的原子操作(X86 中 CMPXCHG 彙編指令),用於實現實現各種無鎖(lock free)數據結構。

CAS 操作的 C 語言實現如下:

bool compare_and_swap ( int *memory_location, int expected_value, int new_value)
{
    if (*memory_location == expected_value)
    {
        *memory_location = new_value;
        return true;
    }
    return false;
}

CAS 用於檢查一個內存位置是否包含預期值,如果包含,則把新值復賦值到內存位置。成功返回 true,失敗返回 false。

(1)GGC 對 CAS 支持

GCC4.1 + 版本中支持 CAS 原子操作。

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);
 
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);

(2)Windows 對 CAS 支持

Windows 中使用 Windows API 支持 CAS。

LONG InterlockedCompareExchange(
  LONG volatile *Destination,
  LONG          ExChange,
  LONG          Comperand
);

(3)C++11 對 CAS 支持

C++11 STL 中 atomic 函數支持 CAS 並可以跨平臺。

template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );
 
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );

其它原子操作如下:

Fetch-And-Add:一般用來對變量做 + 1 的原子操作

Test-and-set:寫值到某個內存位置並傳回其舊值

  1. 無鎖隊列方案

2.1.boost 方案

boost 提供了三種無鎖方案,分別適用不同使用場景。

boost::lockfree::queue 是支持多個生產者和多個消費者線程的無鎖隊列。

boost::lockfree::stack 是支持多個生產者和多個消費者線程的無鎖棧。

boost::lockfree::spsc_queue 是僅支持單個生產者和單個消費者線程的無鎖隊列,比 boost::lockfree::queue 性能更好。

Boost 無鎖數據結構的 API 通過輕量級原子鎖實現 lock-free,不是真正意義的無鎖。

Boost 提供的 queue 可以設置初始容量,添加新元素時如果容量不夠,則總容量自動增長;但對於無鎖數據結構,添加新元素時如果容量不夠,總容量不會自動增長。

2.2.ConcurrentQueue

ConcurrentQueue 是基於 C++ 實現的工業級無鎖隊列方案。

GitHub:https://github.com/cameron314/concurrentqueue

ReaderWriterQueue 是基於 C++ 實現的單生產者單消費者場景的無鎖隊列方案。

GitHub:https://github.com/cameron314/readerwriterqueue

2.3.Disruptor

Disruptor 是英國外匯交易公司 LMAX 基於 JAVA 開發的一個高性能隊列。

GitHub:https://github.com/LMAX-Exchange/disruptor

  1. 無鎖隊列實現

3.1. 環形緩衝區

RingBuffer 是生產者和消費者模型中常用的數據結構,生產者將數據追加到數組尾端,當達到數組的尾部時,生產者繞回到數組的頭部;消費者從數組頭端取走數據,當到達數組的尾部時,消費者繞回到數組頭部。

如果只有一個生產者和一個消費者,環形緩衝區可以無鎖訪問,環形緩衝區的寫入 index 只允許生產者訪問並修改,只要生產者在更新 index 前將新的值保存到緩衝區中,則消費者將始終看到一致的數據結構;讀取 index 也只允許消費者訪問並修改,消費者只要在取走數據後更新讀 index,則生產者將始終看到一致的數據結構。

空隊列時,front 與 rear 相等;當有元素進隊,則 rear 後移;有元素出隊,則 front 後移。

空隊列時,rear 等於 front;滿隊列時,隊列尾部空一個位置,因此判斷循環隊列滿時使用 (rear-front+maxn)%maxn。

入隊操作:

data[rear] = x;
 
rear = (rear+1)%maxn;

出隊操作:

x = data[front];
 
front = (front+1)%maxn;

3.2. 單生產者單消費者

對於單生產者和單消費者場景,由於 read_index 和 write_index 都只會有一個線程寫,因此不需要加鎖也不需要原子操作,直接修改即可,但讀寫數據時需要考慮遇到數組尾部的情況。

線程對 write_index 和 read_index 的讀寫操作如下:

(1)寫操作。先判斷隊列時否爲滿,如果隊列未滿,則先寫數據,寫完數據後再修改 write_index。

(2)讀操作。先判斷隊列是否爲空,如果隊列不爲空,則先讀數據,讀完再修改 read_index。

3.3. 多生產者單消費者

多生產者和單消費者場景中,由於多個生產者都會修改 write_index,所以在不加鎖的情況下必須使用原子操作。

3.4.RingBuffer 實現

RingBuffer.hpp 文件:

#pragma once
 
template <class T>
class RingBuffer
{
public:
    RingBuffer(unsigned size): m_size(size), m_front(0), m_rear(0)
    {
        m_data = new T[size];
    }
 
    ~RingBuffer()
    {
        delete [] m_data;
        m_data = NULL;
    }
 
    inline bool isEmpty() const
    {
        return m_front == m_rear;
    }
 
    inline bool isFull() const
    {
        return m_front == (m_rear + 1) % m_size;
    }
 
    bool push(const T& value)
    {
        if(isFull())
        {
            return false;
        }
        m_data[m_rear] = value;
        m_rear = (m_rear + 1) % m_size;
        return true;
    }
 
    bool push(const T* value)
    {
        if(isFull())
        {
            return false;
        }
        m_data[m_rear] = *value;
        m_rear = (m_rear + 1) % m_size;
        return true;
    }
 
    inline bool pop(T& value)
    {
        if(isEmpty())
        {
            return false;
        }
        value = m_data[m_front];
        m_front = (m_front + 1) % m_size;
        return true;
    }
 
    inline unsigned int front()const
    {
        return m_front;
    }
 
    inline unsigned int rear()const
    {
        return m_rear;
    }
 
    inline unsigned int size()const 
    {
        return m_size;
    }
private:
    unsigned int m_size;// 隊列長度
    int m_front;// 隊列頭部索引
    int m_rear;// 隊列尾部索引
    T* m_data;// 數據緩衝區
};

RingBufferTest.cpp 測試代碼:

#include <stdio.h>
#include <thread>
#include <unistd.h>
#include <sys/time.h>
#include "RingBuffer.hpp"
 
 
class Test
{
public:
   Test(int id = 0, int value = 0)
   {
        this->id = id;
        this->value = value;
        sprintf(data, "id = %d, value = %d\n", this->id, this->value);
   }
 
   void display()
   {
      printf("%s", data);
   }
private:
   int id;
   int value;
   char data[128];
};
 
double getdetlatimeofday(struct timeval *begin, struct timeval *end)
{
    return (end->tv_sec + end->tv_usec * 1.0 / 1000000) -
           (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);
}
 
RingBuffer<Test> queue(1 << 12);2u000
 
#define N (10 * (1 << 20))
 
void produce()
{
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
        if(queue.push(Test(i % 1024, i)))
        {
           i++;
        }
    }
 
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}
 
void consume()
{
    sleep(1);
    Test test;
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
        if(queue.pop(test))
        {
           // test.display();
           i++;
        }
    }
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f, size=%u \n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}
 
int main(int argc, char const *argv[])
{
    std::thread producer1(produce);
    std::thread consumer(consume);
    producer1.join();
    consumer.join();
    return 0;
}

編譯:

g++ --std=c++11  RingBufferTest.cpp -o test -pthread

單生產者單消費者場景下,消息吞吐量爲 350 萬條 / 秒左右。

3.5.LockFreeQueue 實現

LockFreeQueue.hpp:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdbool.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/mman.h>
 
#define SHM_NAME_LEN 128
#define MIN(a, b) ((a) > (b) ? (b) : (a))
#define IS_POT(x) ((x) && !((x) & ((x)-1)))
#define MEMORY_BARRIER __sync_synchronize()
 
template <class T>
class LockFreeQueue
{
protected:
    typedef struct
    {
        int m_lock;
        inline void spinlock_init()
        {
            m_lock = 0;
        }
 
        inline void spinlock_lock()
        {
            while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {}
        }
 
        inline void spinlock_unlock()
        {
            __sync_lock_release(&m_lock);
        }
    } spinlock_t;
 
public:
    // size:隊列大小
    // name:共享內存key的路徑名稱,默認爲NULL,使用數組作爲底層緩衝區。
    LockFreeQueue(unsigned int size, const char* name = NULL)
    {
        memset(shm_name, 0, sizeof(shm_name));
        createQueue(name, size);
    }
 
    ~LockFreeQueue()
    {
        if(shm_name[0] == 0)
        {
            delete [] m_buffer;
            m_buffer = NULL;
        }
        else
        {
            if (munmap(m_buffer, m_size * sizeof(T)) == -1) {
                perror("munmap");
            }
            if (shm_unlink(shm_name) == -1) {
                perror("shm_unlink");
            }
        }
    }
 
    bool isFull()const
    {
#ifdef USE_POT
        return m_head == (m_tail + 1) & (m_size - 1);
#else
        return m_head == (m_tail + 1) % m_size;
#endif
    }
 
    bool isEmpty()const
    {
        return m_head == m_tail;
    }
 
    unsigned int front()const
    {
        return m_head;
    }
 
    unsigned int tail()const
    {
        return m_tail;
    }
 
    bool push(const T& value)
    {
#ifdef USE_LOCK
        m_spinLock.spinlock_lock();
#endif
        if(isFull())
        {
#ifdef USE_LOCK
            m_spinLock.spinlock_unlock();
#endif
            return false;
        }
        memcpy(m_buffer + m_tail, &value, sizeof(T));
#ifdef USE_MB
        MEMORY_BARRIER;
#endif
 
#ifdef USE_POT
        m_tail = (m_tail + 1) & (m_size - 1);
#else
        m_tail = (m_tail + 1) % m_size;
#endif
 
#ifdef USE_LOCK
        m_spinLock.spinlock_unlock();
#endif
        return true;
    }
 
    bool pop(T& value)
    {
#ifdef USE_LOCK
        m_spinLock.spinlock_lock();
#endif
        if (isEmpty())
        {
#ifdef USE_LOCK
            m_spinLock.spinlock_unlock();
#endif
            return false;
        }
        memcpy(&value, m_buffer + m_head, sizeof(T));
#ifdef USE_MB
        MEMORY_BARRIER;
#endif
 
#ifdef USE_POT
        m_head = (m_head + 1) & (m_size - 1);
#else
        m_head = (m_head + 1) % m_size;
#endif
 
#ifdef USE_LOCK
        m_spinLock.spinlock_unlock();
#endif
        return true;
    }
 
protected:
    virtual void createQueue(const char* name, unsigned int size)
    {
#ifdef USE_POT
        if (!IS_POT(size))
        {
            size = roundup_pow_of_two(size);
        }
#endif
        m_size = size;
        m_head = m_tail = 0;
        if(name == NULL)
        {
            m_buffer = new T[m_size];
        }
        else
        {
            int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666);
            if (shm_fd < 0)
            {
                perror("shm_open");
            }
 
            if (ftruncate(shm_fd, m_size * sizeof(T)) < 0)
            {
                perror("ftruncate");
                close(shm_fd);
            }
 
            void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
            if (addr == MAP_FAILED)
            {
                perror("mmap");
                close(shm_fd);
            }
            if (close(shm_fd) == -1)
            {
                perror("close");
                exit(1);
            }
 
            m_buffer = static_cast<T*>(addr);
            memcpy(shm_name, name, SHM_NAME_LEN - 1);
        }
#ifdef USE_LOCK
    spinlock_init(m_lock);
#endif
    }
    inline unsigned int roundup_pow_of_two(size_t size)
    {
        size |= size >> 1;
        size |= size >> 2;
        size |= size >> 4;
        size |= size >> 8;
        size |= size >> 16;
        size |= size >> 32;
        return size + 1;
    }
protected:
    char shm_name[SHM_NAME_LEN];
    volatile unsigned int m_head;
    volatile unsigned int m_tail;
    unsigned int m_size;
#ifdef USE_LOCK
    spinlock_t m_spinLock;
#endif
    T* m_buffer;
};

#define USE_LOCK

開啓 spinlock 鎖,多生產者多消費者場景

#define USE_MB

開啓 Memory Barrier

#define USE_POT

開啓隊列大小的 2 的冪對齊

LockFreeQueueTest.cpp 測試文件:

#include "LockFreeQueue.hpp"
#include <thread>
 
//#define USE_LOCK
 
class Test
{
public:
   Test(int id = 0, int value = 0)
   {
        this->id = id;
        this->value = value;
        sprintf(data, "id = %d, value = %d\n", this->id, this->value);
   }
 
   void display()
   {
      printf("%s", data);
   }
private:
   int id;
   int value;
   char data[128];
};
 
double getdetlatimeofday(struct timeval *begin, struct timeval *end)
{
    return (end->tv_sec + end->tv_usec * 1.0 / 1000000) -
           (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);
}
 
LockFreeQueue<Test> queue(1 << 10, "/shm");
 
#define N ((1 << 20))
 
void produce()
{
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
        if(queue.push(Test(i >> 10, i)))
            i++;
    }
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}
 
void consume()
{
    Test test;
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
        if(queue.pop(test))
        {
           //test.display();
           i++;
        }
    }
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}
 
int main(int argc, char const *argv[])
{
    std::thread producer1(produce);
    //std::thread producer2(produce);
    std::thread consumer(consume);
    producer1.join();
    //producer2.join();
    consumer.join();
 
    return 0;
}

多線程場景下,需要定義 USE_LOCK 宏,開啓鎖保護。

編譯:

g++ --std=c++11 -O3 LockFreeQueueTest.cpp -o test -lrt -pthread

4.kfifo 內核隊列


4.1.kfifo 內核隊列簡介

kfifo 是 Linux 內核的一個 FIFO 數據結構,採用環形循環隊列的數據結構來實現,提供一個無邊界的字節流服務,並且使用並行無鎖編程技術,即單生產者單消費者場景下兩個線程可以併發操作,不需要任何加鎖行爲就可以保證 kfifo 線程安全。

4.2.kfifo 內核隊列實現

kfifo 數據結構定義如下:

struct kfifo
{
    unsigned char *buffer;
    unsigned int size;
    unsigned int in;
    unsigned int out;
    spinlock_t *lock;
};
 
// 創建隊列
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    struct kfifo *fifo;
    // 判斷是否爲2的冪
    BUG_ON(!is_power_of_2(size));
    fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
    if (!fifo)
        return ERR_PTR(-ENOMEM);
    fifo->buffer = buffer;
    fifo->size = size;
    fifo->in = fifo->out = 0;
    fifo->lock = lock;
 
    return fifo;
}
 
// 分配空間
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    unsigned char *buffer;
    struct kfifo *ret;
    // 判斷是否爲2的冪
    if (!is_power_of_2(size))
    {
        BUG_ON(size > 0x80000000);
        // 向上擴展成2的冪
        size = roundup_pow_of_two(size);
    }
 
    buffer = kmalloc(size, gfp_mask);
    if (!buffer)
        return ERR_PTR(-ENOMEM);
    ret = kfifo_init(buffer, size, gfp_mask, lock);
 
    if (IS_ERR(ret))
        kfree(buffer);
    return ret;
}
 
void kfifo_free(struct kfifo *fifo)
{
    kfree(fifo->buffer);
    kfree(fifo);
}
 
// 入隊操作
static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_put(fifo, buffer, len);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}
 
// 出隊操作
static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_get(fifo, buffer, len);
    //當fifo->in == fifo->out時,buufer爲空
    if (fifo->in == fifo->out)
        fifo->in = fifo->out = 0;
 
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}
 
// 入隊操作
unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    //buffer中空的長度
    len = min(len, fifo->size - fifo->in + fifo->out);
    // 內存屏障:smp_mb(),smp_rmb(), smp_wmb()來保證對方觀察到的內存操作順序
    smp_mb();
    // 將數據追加到隊列尾部
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
    memcpy(fifo->buffer, buffer + l, len - l);
 
    smp_wmb();
    //每次累加,到達最大值後溢出,自動轉爲0
    fifo->in += len;
    return len;
}
 
// 出隊操作
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    //有數據的緩衝區的長度
    len = min(len, fifo->in - fifo->out);
    smp_rmb();
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
    memcpy(buffer + l, fifo->buffer, len - l);
    smp_mb();
    fifo->out += len; //每次累加,到達最大值後溢出,自動轉爲0
 
    return len;
}
 
static inline void __kfifo_reset(struct kfifo *fifo)
{
    fifo->in = fifo->out = 0;
}
 
static inline void kfifo_reset(struct kfifo *fifo)
{
    unsigned long flags;
    spin_lock_irqsave(fifo->lock, flags);
    __kfifo_reset(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
}
 
static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
    return fifo->in - fifo->out;
}
 
static inline unsigned int kfifo_len(struct kfifo *fifo)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_len(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}

4.3.kfifo 設計要點

(1)保證 buffer size 爲 2 的冪

kfifo->size 值在調用者傳遞參數 size 的基礎上向 2 的冪擴展,目的是使 kfifo->size 取模運算可以轉化爲位與運算(提高運行效率)。kfifo->in % kfifo->size 轉化爲 kfifo->in & (kfifo->size – 1)

保證 size 是 2 的冪可以通過位運算的方式求餘,在頻繁操作隊列的情況下可以大大提高效率。

(2)使用 spin_lock_irqsave 與 spin_unlock_irqrestore 實現同步。

Linux 內核中有 spin_lock、spin_lock_irq 和 spin_lock_irqsave 保證同步。

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
 
static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{
    local_irq_disable();
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

spin_lock 比 spin_lock_irq 速度快,但並不是線程安全的。spin_lock_irq 增加調用 local_irq_disable 函數,即禁止本地中斷,是線程安全的,既禁止本地中斷,又禁止內核搶佔。

spin_lock_irqsave 是基於 spin_lock_irq 實現的一個輔助接口,在進入和離開臨界區後,不會改變中斷的開啓、關閉狀態。

如果自旋鎖在中斷處理函數中被用到,在獲取自旋鎖前需要關閉本地中斷,spin_lock_irqsave 實現如下:

A、保存本地中斷狀態;

B、關閉本地中斷;

C、獲取自旋鎖。

解鎖時通過 spin_unlock_irqrestore 完成釋放鎖、恢復本地中斷到原來狀態等工作。

(3)線性代碼結構

代碼中沒有任何 if-else 分支來判斷是否有足夠的空間存放數據,kfifo 每次入隊或出隊只是簡單的 +len 判斷剩餘空間,並沒有對 kfifo->size 進行取模運算,所以 kfifo->in 和 kfifo->out 總是一直增大,直到 unsigned in 超過最大值時繞回到 0 這一起始端,但始終滿足:kfifo->in - kfifo->out <= kfifo->size。

(4)使用 Memory Barrier

mb():適用於多處理器和單處理器的內存屏障。

rmb():適用於多處理器和單處理器的讀內存屏障。

wmb():適用於多處理器和單處理器的寫內存屏障。

smp_mb():適用於多處理器的內存屏障。

smp_rmb():適用於多處理器的讀內存屏障。

smp_wmb():適用於多處理器的寫內存屏障。

Memory Barrier 使用場景如下:

A、實現同步原語(synchronization primitives)

B、實現無鎖數據結構(lock-free data structures)

C、驅動程序

程序在運行時內存實際訪問順序和程序代碼編寫的訪問順序不一定一致,即內存亂序訪問。內存亂序訪問行爲出現是爲了提升程序運行時的性能。內存亂序訪問主要發生在兩個階段:

A、編譯時,編譯器優化導致內存亂序訪問(指令重排)。

B、運行時,多 CPU 間交互引起內存亂序訪問。

Memory Barrier 能夠讓 CPU 或編譯器在內存訪問上有序。Memory barrier 前的內存訪問操作必定先於其後的完成。Memory Barrier 包括兩類:

A、編譯器 Memory Barrier。

B、CPU Memory Barrier。

通常,編譯器和 CPU 引起內存亂序訪問不會帶來問題,但如果程序邏輯的正確性依賴於內存訪問順序,內存亂序訪問會帶來邏輯上的錯誤。

在編譯時,編譯器對代碼做出優化時可能改變實際執行指令的順序(如 GCC 的 O2 或 O3 都會改變實際執行指令的順序)。

在運行時,CPU 雖然會亂序執行指令,但在單個 CPU 上,硬件能夠保證程序執行時所有的內存訪問操作都是按程序代碼編寫的順序執行的,Memory Barrier 沒有必要使用(不考慮編譯器優化)。爲了更快執行指令,CPU 採取流水線的執行方式,編譯器在編譯代碼時爲了使指令更適合 CPU 的流水線執行方式以及多 CPU 執行,原本指令就會出現亂序的情況。在亂序執行時,CPU 真正執行指令的順序由可用的輸入數據決定,而非程序員編寫的順序。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/2eg1Mco-h0G0epkNTKbeQw