Lock-free, atomic, multi-producer, multi-consumer, in-process and inter-process C++ queue
GitHub repo: http://github.com/erez-strauss/lockfree_mpmc_queue/
The CppCon 2023 presentation on YouTube: https://youtu.be/M3v2GfeGJYs
The slides: CppCon2023_ES_Lockfree_MPMC_Queue_Oct3.pdf
es::lockfree::mpmc_queue<DataT, size_t N, IndexT=uint32_t, bool lazy_push=false, bool lazy_pop=false> queue;
Template parameters:
  DataT - the element type, expected to be small and trivially copyable
  N - the queue capacity; when N is 0, the capacity is provided to the constructor at run time
  IndexT - the type of the internal write/read indexes (default uint32_t)
  lazy_push, lazy_pop - control whether the write/read indexes are advanced eagerly or lazily
Example: two producers each push N integers into the queue, and two consumers each pop N integers from it.
#include <mpmc_queue.h>

#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

int main()
{
    es::lockfree::mpmc_queue<unsigned> q{32};

    constexpr unsigned N{1000000};
    constexpr unsigned P{2};

    std::atomic<uint64_t> prod_sum{0};
    std::atomic<uint64_t> cons_sum{0};

    auto producer = [&]() {
        for (unsigned x = 0; x < N; ++x) {
            while (!q.push(x))
                ;
            prod_sum += x;
        }
    };
    std::vector<std::thread> producers;
    producers.resize(P);
    for (auto& p : producers) p = std::thread{producer};

    auto consumer = [&]() {
        unsigned v{0};
        for (unsigned x = 0; x < N; ++x) {
            while (!q.pop(v))
                ;
            cons_sum += v;
        }
    };
    std::vector<std::thread> consumers;
    consumers.resize(P);
    for (auto& c : consumers) c = std::thread{consumer};

    for (auto& p : producers) p.join();
    for (auto& c : consumers) c.join();

    std::cout << (cons_sum && cons_sum == prod_sum ? "OK" : "ERROR") << " " << cons_sum << '\n';
    return 0;
}
You can try the example using: make build/example2
A push or pop operation is considered successful when its compare_exchange succeeds on one of the internal buffer's entries. The write/read indexes are then advanced, also with an atomic compare_exchange, as the second step of the operation.
The writer threads and the reader threads are cooperative within their group: after any successful operation on an _array[] entry, the second step can be completed by another thread of the same type.
The push/write and pop/read indexes are each incremented by one as part of a successful operation. The sequence number inside each _array entry holds (index<<1), an even value, for an empty entry, and ((index<<1)|1), an odd value, for an entry holding data.
Initialization clears the _array entries and sets each entry's _seq to the (index<<1) even value that marks it empty.
push(value_type d) - as value_type is expected to be small, it is passed by value rather than by const reference; returns true on a successful push(), false if the queue is full
pop(value_type& d) - returns true if an element was popped, false if the queue is empty
empty() - returns true if the queue is empty, advancing the _read_index if needed
consume() - consumes one or more elements; stops on an empty queue or when the functor returns true
peek() - looks at the head of the queue without popping the element
pop_if() - pops the head of the queue if the functor returns true
push_keep_n() - pushes an element; if the push fails, drops the oldest element from the queue and pushes again; always returns true
capacity() - returns the capacity of the queue
size() - returns the number of elements in the queue; its value is not guaranteed, as push/pop may be lazy in incrementing their respective indexes
enqueue() / dequeue() - convenience methods that call push()/pop() respectively
Bandwidth measurements using src/q_bandwidth.cpp show that this queue is two to three times faster than boost::lockfree::queue on some of the runs.
The push(value_type d) operation is composed of two steps: a compare_exchange on the target _array entry, followed by a compare_exchange that advances the write index.
pop(value_type& d) works similarly to push.
In short, push/pop each perform two compare_and_set operations: the first on the array entry, the second on the index.
The es::lockfree::shared_mpmc_queue<> constructor expects a file name under /dev/shm/ or /dev/hugepages/. Multiple processes and their threads can access a shared queue. The shared_mpmc_queue provides a producer and a consumer API.
#include <es/lockfree/shared_mpmc_queue.h>

es::lockfree::shared_mpmc_queue<es::lockfree::mpmc_queue<unsigned, 128, unsigned>> shared_q("/dev/shm/mpmc_q000");

// multiple producer processes/threads
auto prod = shared_q.get_producer();
prod.push(123);
...
// multiple consumer processes/threads
auto con = shared_q.get_consumer();
unsigned value{};
if (con.pop(value))
    std::cout << (value == 123) << '\n';
The mpmc_queue<> performs best, with the highest bandwidth and lowest latency, in the single-producer single-consumer case, where it is wait-free. The mpmc_queue_pack<> groups several mpmc_queue<>s (G of them) into a single queue. The number of internal queues is a compile-time template parameter, but it does not limit the number of run-time producers or consumers. When the run-time number of producers is at most G, the performance of the mpmc_queue_pack<> approaches the best-case performance of a single internal mpmc_queue<> multiplied by the number of producers.
The mpmc_queue_pack.h defines:
// Q - The basic mpmc queue type to hold
// G - How many queues are in a group/pack of mpmc_queues.
// K - Every how many successful pop, force going to the next internal queue with data
template<typename Q, unsigned G, unsigned K> class mpmc_queue_pack {
// ....
template<typename QT> struct producer_accessor;
template<typename QT> struct consumer_accessor;
In order to use the pack of queues as a single queue, enqueue and dequeue through the accessors. Each producer uses a single internal mpmc_queue<> for sending, which guarantees that ordering is preserved per producer. Each consumer consumes up to K elements from an internal mpmc_queue<> before checking the other internal mpmc_queue<>s in the pack for available data.
The tradeoffs of using an mpmc_queue_pack<> vs. an mpmc_queue<> are:
mpmc_queue<>
mpmc_queue_pack<>
both the mpmc_queue<> and mpmc_queue_pack<> are:
The accessor for a single mpmc_queue<> is simply a reference type to the mpmc_queue<>.
Using the accessor types for a given queue type makes it possible to write the same template code against either a single mpmc_queue<> or a group of them in an mpmc_queue_pack<>.
Example:
#include <mpmc_queue_pack.h>

#include <cstdint>
#include <iostream>
#include <memory>

using queue_pack_type = es::lockfree::mpmc_queue_pack<es::lockfree::mpmc_queue<uint64_t, 0, uint32_t, false, false>, 4, 16>;
using producer_type = typename es::lockfree::producer_accessor<queue_pack_type>::type;
using consumer_type = typename es::lockfree::consumer_accessor<queue_pack_type>::type;

auto qpack = std::make_unique<queue_pack_type>(16);

producer_type p1{*qpack};
producer_type p2{*qpack};
consumer_type c1{*qpack};
consumer_type c2{*qpack};

p1.push(1024);
p2.push(1025);

queue_pack_type::value_type v0{0};
queue_pack_type::value_type v1{0};
c1.pop(v0);
c2.pop(v1);
std::cout << "v0: " << v0 << " v1: " << v1 << '\n';
The example above shows an mpmc_queue_pack<> holding a group of four mpmc_queue<>s.
The mpmc_queue_pack<> requires its user to obtain an accessor, producer or consumer, in order to push/pop data into the mpmc_queue_pack<>.
The pop() operation on an mpmc_queue_pack<> scans the grouped mpmc_queue<>s for data. Once it finds data to consume, it delivers it to the caller and continues to pop elements from the same mpmc_queue<>, in the example above up to 16 pop()s in a row, before scanning the other queues in the group.
The src/pack_benchmark.cpp benchmark runs the same templated benchmark code on mpmc_queue<> and mpmc_queue_pack<> and compares the gain from using the group of queues.
To run the Google Test suite for the queues, run:
make gtest_run
To generate a performance report, including the boost queue for comparison:
make report
Use 'make report' to run q_bandwidth.cpp and generate the performance report. The report can then be processed to produce the per-data-size summary below.
The benchmark tests the four flavors of the mpmc_queue<> (the lazy_push/lazy_pop combinations), which differ in how they increment the write and read indexes.
Here are results from a laptop with hyper-threading enabled.
$ ./scripts/report-processing.pl reports/q-bw-report.20191126-235824.txt
report for data size: 4
fastest 1-to-1: data_sz: 4 index_sz: 4 queue_name: mpmc_queue<ff> capacity: 64 bandwidth: 49,415,117
fastest 2-to-2: data_sz: 4 index_sz: 4 queue_name: mpmc_queue<ff> capacity: 64 bandwidth: 12,147,207 (24.58)
fastest 1-to-2: data_sz: 4 index_sz: 4 queue_name: mpmc_queue<tf> capacity: 64 bandwidth: 14,316,094 (28.97)
fastest 2-to-1: data_sz: 4 index_sz: 4 queue_name: mpmc_queue<ft> capacity: 64 bandwidth: 14,071,685 (28.48)
boostq 1-to-1: data_sz: 4 index_sz: - queue_name: boost:lf:queue capacity: 64 bandwidth: 6,074,995 (12.29)
boostq 2-to-2: data_sz: 4 index_sz: - queue_name: boost:lf:queue capacity: 64 bandwidth: 5,328,217 (10.78)
report for data size: 8
fastest 1-to-1: data_sz: 8 index_sz: 8 queue_name: mpmc_queue<tf> capacity: 64 bandwidth: 27,853,442
fastest 2-to-2: data_sz: 8 index_sz: 8 queue_name: mpmc_queue<tf> capacity: 64 bandwidth: 9,294,286 (33.37)
fastest 1-to-2: data_sz: 8 index_sz: 8 queue_name: mpmc_queue<tf> capacity: 8 bandwidth: 11,121,625 (39.93)
fastest 2-to-1: data_sz: 8 index_sz: 8 queue_name: mpmc_queue<ft> capacity: 8 bandwidth: 13,297,038 (47.74)
boostq 1-to-1: data_sz: 8 index_sz: - queue_name: boost:lf:queue capacity: 2 bandwidth: 5,802,189 (20.83)
boostq 2-to-2: data_sz: 8 index_sz: - queue_name: boost:lf:queue capacity: 64 bandwidth: 5,179,598 (18.60)
report for data size: 12
fastest 1-to-1: data_sz: 12 index_sz: 4 queue_name: mpmc_queue<tf> capacity: 64 bandwidth: 28,228,040
fastest 2-to-2: data_sz: 12 index_sz: 4 queue_name: mpmc_queue<tf> capacity: 8 bandwidth: 9,419,344 (33.37)
fastest 1-to-2: data_sz: 12 index_sz: 4 queue_name: mpmc_queue<tf> capacity: 64 bandwidth: 11,157,596 (39.53)
fastest 2-to-1: data_sz: 12 index_sz: 4 queue_name: mpmc_queue<ft> capacity: 8 bandwidth: 13,770,559 (48.78)
boostq 1-to-1: data_sz: 12 index_sz: - queue_name: boost:lf:queue capacity: 64 bandwidth: 6,110,304 (21.65)
boostq 2-to-2: data_sz: 12 index_sz: - queue_name: boost:lf:queue capacity: 64 bandwidth: 5,336,504 (18.90)
Relax the strong ordering of the atomic operations.
In push/pop, attempt to complete the operations using transactional memory (xbegin/xend), falling back to atomic compare_exchange operations only on failure.
Detailed / summary benchmarks.
Collecting performance reports from different machines and platforms.
You can run and test the queues and send me performance report files.
Please send your feedback to Erez Strauss [email protected] with subject line MPMCQ ...
Thanks also go to the following queue implementations:
Dmitry Vyukov MPMC Queue
Boost MPMC lockfree
Lock free:
License: MIT, see the LICENSE file.