博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于folly的AtomicIntrusiveLinkedList无锁队列进行简单封装的多生产多消费模型
阅读量:4554 次
发布时间:2019-06-08

本文共 12592 字,大约阅读时间需要 41 分钟。

1.基于folly的AtomicIntrusiveLinkedList略微修改的无锁队列代码:

#ifndef FOLLY_REVISE_H
#define FOLLY_REVISE_H

#include <atomic>
#include <cassert>
#include <utility>

namespace folly {

/**
 * Convenience node for storing a value of type T in the intrusive list.
 * Any type exposing a public `T* next` member works with AtomicForwardList;
 * this wrapper is provided for callers that don't have such a type.
 */
template <typename T>
struct node {
    T data;
    node* next;
    node(const T& data) : data(data), next(nullptr) {}
    node(T&& data) : data(std::move(data)), next(nullptr) {}
};

/**
 * A very simple atomic singly-linked list primitive (a lightly revised
 * folly::AtomicIntrusiveLinkedList).
 *
 * T must expose a `T* next` member used as the intrusive hook. The list
 * never owns its elements; callers manage node lifetime and must leave
 * the list empty before destroying it.
 */
template <typename T>
class AtomicForwardList {
public:
    AtomicForwardList() {}
    AtomicForwardList(const AtomicForwardList&) = delete;
    AtomicForwardList& operator=(const AtomicForwardList&) = delete;

    // Moves are NOT thread-safe: `other` must not be accessed concurrently
    // while being moved from.
    AtomicForwardList(AtomicForwardList&& other) noexcept
        : head_(other.head_.load()) {
        other.head_ = nullptr;
    }

    AtomicForwardList& operator=(AtomicForwardList&& other) noexcept {
        // BUG FIX: the original built a temporary and called unqualified
        // swap(*this, tmp), which does not resolve (no swap overload exists
        // for this class and std::atomic is not swappable). Steal the head
        // directly instead; the destination must be empty, matching the
        // destructor contract the original's temporary would have enforced.
        if (this != &other) {
            assert(empty());
            head_.store(other.head_.load());
            other.head_ = nullptr;
        }
        return *this;
    }

    /** Note: the list must be empty on destruction (elements are not owned). */
    ~AtomicForwardList() {
        assert(empty());
    }

    bool empty() const {
        return head_.load() == nullptr;
    }

    /**
     * Atomically insert t at the head of the list.
     * @return true if the inserted element is the only one in the list
     *         after the call.
     */
    bool insertHead(T* t) {
        assert(t->next == nullptr);
        auto oldHead = head_.load(std::memory_order_relaxed);
        do {
            // oldHead is refreshed by a failed CAS below. We assign t->next
            // from oldHead directly (rather than through a helper) to dodge
            // historical compiler bugs around compare_exchange (GCC 60272,
            // clang 18899, MSVC 819819); see
            // http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
            t->next = oldHead;
        } while (!head_.compare_exchange_weak(
            oldHead, t,
            std::memory_order_release,    // success: publish node contents
            std::memory_order_relaxed));  // failure: only need fresh head
        return oldHead == nullptr;
    }

    /**
     * Replaces the head with nullptr and calls func() on the removed
     * elements in insertion (tail-to-head) order.
     * @return false if the list was empty.
     */
    template <typename F>
    bool sweepOnce(F&& func) {
        // acquire pairs with the release CAS in insertHead(), making the
        // producers' writes to the nodes visible here. The original used the
        // seq_cst default (with a "why seq_cst?" comment); acquire suffices.
        if (auto head = head_.exchange(nullptr, std::memory_order_acquire)) {
            auto rhead = reverse(head);
            unlinkAll(rhead, std::forward<F>(func));
            return true;
        }
        return false;
    }

    /**
     * Repeatedly replaces the head with nullptr and calls func() on the
     * removed elements in insertion order. Stops when the list is empty.
     */
    template <typename F>
    void sweep(F&& func) {
        while (sweepOnce(std::forward<F>(func))) {
        }
    }

    /**
     * Similar to sweepOnce() but calls func() on elements in LIFO order.
     *
     * func() is called for all elements in the list at the moment
     * reverseSweepOnce() is called.
     */
    template <typename F>
    bool reverseSweepOnce(F&& func) {
        // We don't loop like sweep() does because the overall order of the
        // callbacks would be strand-wise LIFO, which is meaningless to
        // callers.
        if (auto head = head_.exchange(nullptr, std::memory_order_acquire)) {
            unlinkAll(head, std::forward<F>(func));
            return true;
        }
        return false;
    }

    /**
     * Replaces the head with nullptr and returns the detached chain in
     * insertion order (nodes remain linked through `next`).
     */
    T* getInputList() {
        if (auto head = head_.exchange(nullptr, std::memory_order_acquire)) {
            return reverse(head);
        }
        return nullptr;
    }

    /**
     * Replaces the head with nullptr and returns the detached chain in
     * reverse insertion order.
     */
    T* getList() {
        // acquire for the same reason as sweepOnce(); the original mixed the
        // seq_cst default here with acquire in getInputList() — unify them.
        return head_.exchange(nullptr, std::memory_order_acquire);
    }

private:
    std::atomic<T*> head_{nullptr};

    /* Reverses a linked list, returning the new head (the old tail). */
    static T* reverse(T* head) {
        T* rhead = nullptr;
        while (head != nullptr) {
            auto t = head;
            head = t->next;
            t->next = rhead;
            rhead = t;
        }
        return rhead;
    }

    /* Unlinks every element of the chain at `head`, calling func() on each. */
    template <typename F>
    static void unlinkAll(T* head, F&& func) {
        while (head != nullptr) {
            auto t = head;
            head = t->next;
            t->next = nullptr;
            func(t);
        }
    }
};

} // namespace folly

#endif // FOLLY_REVISE_H

2.基于上面无锁队列的封装

#ifndef COMPOSITE_ATOMIC_LIST_H#define COMPOSITE_ATOMIC_LIST_H/*** Compose a multiple-producers and multiple-consumers atomic list* through given consumer number AtomicForwardList*/#include 
#include
#include "folly_revise.h"namespace folly{ template
class CompositeAtomicList { public: using size_type = typename std::vector
>::size_type; public: CompositeAtomicList(size_type producerNum, size_type consumerNum) : m_producerNum(producerNum), m_consumerNum(consumerNum) { // it is meanless if there is no producer or consumer assert(producerNum > 0); assert(consumerNum > 0); // the number of composite list is equal to consumer number m_compositeList.resize(consumerNum); // initialize the first insertion index of the producers m_producerIdxs.resize(producerNum); for (std::vector
::size_type si = 0; si != m_producerIdxs.size(); ++si) { m_producerIdxs[si] = si % consumerNum; } } CompositeAtomicList(const CompositeAtomicList&) = delete; CompositeAtomicList& operator=(const CompositeAtomicList&) = delete; //CompositeAtomicList(CompositeAtomicList&& other) noexcept // : m_producerNum(other.m_producerNum), m_consumerNum(other.m_consumerNum), // m_producerIdxs(std::move(other.m_producerIdxs), // m_compositeList(std::move(other.m_compositeList) //{ //} CompositeAtomicList(CompositeAtomicList&& other) noexcept = default; CompositeAtomicList& operator=(CompositeAtomicList&& other) noexcept = default; ~CompositeAtomicList() = default; // producer num size_type getProducerNum() const { return m_producerNum; } // consumer num size_type getConsumerNum() const { return m_consumerNum; } bool empty() const { // if there is one consumer list is not empty, the CompositeList is not empty for (const auto& item : m_compositeList) { if (!item.empty()) { return false; } } return true; } // insert node for producer number producer_num bool insertHead(size_type producer_num, T* t) { auto ret = m_compositeList[m_producerIdxs[producer_num]].insertHead(t); m_producerIdxs[producer_num] = (++m_producerIdxs[producer_num]) % m_consumerNum; return ret; } /** * A consumer function * consume nodes for consumer number consumer_num through * invoking function func for every list node in consumer_num. * You should invoke all consumer function for a particular consumer_num * within just one thread to evenly distribute the tasks. * * Recommend calling std::this_thread::yield() when this function returns false */ template
bool sweepOnce(size_type consumer_num, F&& func) { return m_compositeList[consumer_num].sweepOnce(std::forward
(func)); } /** * A consumer function * repeat consume nodes for consumer number consumer_num through * invoking function func for every list node in consumer_sum. * You should invoke all consumer function for a particular consumer_num * within just one thread to evenly distribute the tasks. * * Recommend calling std::this_thread::yield() after calling this function */ template
void sweep(size_type consumer_num, F&& func) { m_compositeList[consumer_num].sweep(std::forward
(func)); } /** * A consumer function * consume nodes for all consumer numbers once through * invoking function func for every list node in consumer_num. * You could invoke this function after all task handler threads terminated * to ensure all nodes have been consumed */ template
void sweepAll(F&& func) { for (size_type si = 0; si != m_consumerNum; ++si) { sweepOnce(si, std::forward
(func)); } } /** * A consumer function * Similar to sweepOnce() but calls func() on elements in LIFO order * * func() is called for all elements in the list at the moment * reverseSweepOnce() is called. * * Recommend calling std::this_thread::yield() when this function returns false */ template
bool reverseSweepOnce(size_type consumer_num, F&& func) { return m_compositeList[consumer_num].reverseSweepOnce(std::forward
(func)); } /** * A consumer function * get all the nodes from consumer list consumer_num in input order * @ return a list of node * * Recommend calling std::this_thread::yield() when this function returns nullptr */ T* getInputList(size_type consumer_num) { return m_compositeList[consumer_num].getInputList(); } /** * A consumer function * get all the nodes from consumer list consumer_num in reversed input order * @ return a list of node * * Recommend calling std::this_thread::yield() when this function returns nullptr */ T* getList() { return m_compositeList[consumer_num].getList(); } private: // the producer and consumer count size_type m_producerNum; size_type m_consumerNum; // the next inserted list for producers std::vector
m_producerIdxs; // the composite atomic lists std::vector
> m_compositeList; };}#endif // COMPOSITE_ATOMIC_LIST_H

3.测试用代码:

#include <atomic>
#include <ctime>
#include <future>
#include <iostream>
#include <random>
#include <thread>
#include <vector>

#include "folly_revise.h"
#include "composite_atomic_list.h"

using namespace folly;

// Intrusive element type: AtomicForwardList requires a public `next` member.
struct student_name
{
    student_name(int age = 0) : age(age), next(nullptr) {}
    int age;
    student_name* next;
};

using ATOMIC_STUDENT_LIST = CompositeAtomicList<student_name>;

constexpr int PRODUCE_THREAD_NUM = 10; // producing thread number
constexpr int CONSUME_THREAD_NUM = 5;  // consuming thread number

ATOMIC_STUDENT_LIST g_students(PRODUCE_THREAD_NUM, CONSUME_THREAD_NUM);

std::atomic<int> g_inserts{0};  // insert num (successful)
std::atomic<int> g_drops{0};    // drop num (successful)
std::atomic<int> g_printNum{0}; // must end up equal to g_drops
// long long for the sums: 10 threads * 2,000,000 inserts * max age 99
// (~1.98e9) brushes against INT_MAX, as the original comment noted.
std::atomic<long long> g_ageInSum{0};  // age sum when producing
std::atomic<long long> g_ageOutSum{0}; // age sum when consuming
std::atomic<bool> goOn{true};          // producers still running?

constexpr int ONE_THREAD_PRODUCE_NUM = 2000000; // inserts per producer thread

// Consume one node: update the statistics, then free it.
inline void printOne(student_name* t)
{
    g_printNum.fetch_add(1, std::memory_order_relaxed);
    g_ageOutSum.fetch_add(t->age, std::memory_order_relaxed);
    g_drops.fetch_add(1, std::memory_order_relaxed);
    delete t;
}

// Producer body; idNo is this producer's index in [0, PRODUCE_THREAD_NUM).
void insert_students(int idNo)
{
    // Mix idNo into the seed: the original seeded every thread with the same
    // time(nullptr), so threads started in the same second produced
    // identical sequences.
    std::default_random_engine dre(
        static_cast<unsigned>(time(nullptr)) + static_cast<unsigned>(idNo));
    std::uniform_int_distribution<int> ageDi(1, 99);
    for (int i = 0; i < ONE_THREAD_PRODUCE_NUM; ++i)
    {
        int newAge = ageDi(dre);
        g_ageInSum.fetch_add(newAge, std::memory_order_relaxed);
        g_students.insertHead(idNo, new student_name(newAge));
        // relaxed: statistics only; avoid imposing ordering on the list ops
        g_inserts.fetch_add(1, std::memory_order_relaxed);
    }
}

// Consumer body; idNo is this consumer's shard index in [0, CONSUME_THREAD_NUM).
void drop_students(int idNo)
{
    while (goOn.load(std::memory_order_relaxed))
    {
        g_students.sweep(idNo, printOne);
        std::this_thread::yield();
    }
}

int main()
{
    std::vector<std::future<void>> insert_threads;
    insert_threads.reserve(PRODUCE_THREAD_NUM);
    for (int i = 0; i != PRODUCE_THREAD_NUM; ++i)
    {
        insert_threads.push_back(
            std::async(std::launch::async, insert_students, i));
    }
    std::vector<std::future<void>> drop_threads;
    drop_threads.reserve(CONSUME_THREAD_NUM);
    for (int i = 0; i != CONSUME_THREAD_NUM; ++i)
    {
        drop_threads.push_back(
            std::async(std::launch::async, drop_students, i));
    }
    for (auto& item : insert_threads)
    {
        item.get();
    }
    // BUG FIX: the original wrote goOn.store(std::memory_order_relaxed),
    // passing the memory-order enum as the *value* to store. It only worked
    // because memory_order_relaxed happens to convert to false. Pass the
    // value and the ordering explicitly.
    goOn.store(false, std::memory_order_relaxed);
    for (auto& item : drop_threads)
    {
        item.get();
    }
    // Consumers may have exited before a producer's very last inserts were
    // swept; drain any leftovers on the main thread.
    g_students.sweepAll(printOne);

    std::cout << "insert count1: " << g_inserts.load() << std::endl;
    std::cout << "drop count1: " << g_drops.load() << std::endl;
    std::cout << "print num1: " << g_printNum.load() << std::endl;
    std::cout << "age in1: " << g_ageInSum.load() << std::endl;
    std::cout << "age out1: " << g_ageOutSum.load() << std::endl;
    std::cout << std::endl;
}

4. 基于AtomicIntrusiveLinkedList，插入操作可以一次插入一个节点，而移出操作则会一次移出多个节点。如果每个消费队列都使用一个AtomicIntrusiveLinkedList来存储，只要生产均匀分布到各个消费队列中，应该可以实现比较好的效果。不过，由于生产均匀分布到各个消费队列中并不那么容易实现，通过使用随机化之类的方式，可以防止人为导致的不均匀。但这都不能从根本上解决问题，所以，上述方法只有在比较容易实现生产均匀分布到各个消费队列时，才适合采用。

转载于:https://www.cnblogs.com/albizzia/p/8535790.html

你可能感兴趣的文章
机器学习课程笔记 (1)
查看>>
基础数据类型 格式化输出
查看>>
第九周作业
查看>>
解析大型.NET ERP系统 单据编码功能实现
查看>>
互联网创业应该如何找到创意 - RethinkDB创始人Slava Akhmechet的几点建议
查看>>
互联网技术架构给我们的启示
查看>>
hbase redis mysql重要知识点总结
查看>>
取数字(dp优化)
查看>>
web app builder arcgis 自定义弹窗
查看>>
第六天冲刺
查看>>
Golang学习 - strconv 包
查看>>
ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path
查看>>
imx6 system boot
查看>>
[SDOI2017]硬币游戏
查看>>
Azure 网站、云服务和虚拟机比较
查看>>
Windows 10在2018四月更新中默认安装了OpenSSH客户端
查看>>
jQuery常用函数
查看>>
一个忙着找实习工作的大三在校生的真实感受!!!
查看>>
Linux 下的 scp
查看>>
理解同步,异步和延迟脚本
查看>>