Welcome back to our journey through lock-free programming in C++. In the last lesson, you learned about implementing a lock-free stack. Now, we're progressing to another essential data structure: the queue. This lesson will focus on implementing a thread-safe lock-free queue, building upon the foundation of atomic operations explored in the previous lessons. By the end of this unit, you'll be equipped with the skills to implement a robust, efficient queue that operates without traditional locking mechanisms.
In this lesson, you'll learn how to build a lock-free queue using atomic operations to handle concurrency efficiently.
The following code example is a sneak peek into what you'll be creating:
```cpp
#include <atomic>
#include <iostream>
#include <thread>
#include <memory>
#include <chrono>

template<typename T>
class LockFreeQueue {
private:
    struct Node {
        T data;
        std::atomic<Node*> next;
        Node(const T& value) : data(value), next(nullptr) {}
    };

    std::atomic<Node*> head;
    std::atomic<Node*> tail;

public:
    LockFreeQueue() {
        Node* dummy = new Node(T{});  // Dummy node to simplify push/pop logic
        head.store(dummy);
        tail.store(dummy);
    }

    ~LockFreeQueue() {
        while (Node* old_head = head.load()) {  // Clean up all remaining nodes
            head.store(old_head->next.load());
            delete old_head;
        }
    }

    void push(const T& value) {
        Node* new_node = new Node(value);
        Node* old_tail;

        while (true) {
            old_tail = tail.load();
            Node* tail_next = old_tail->next.load();

            if (old_tail == tail.load()) {       // Check that tail hasn't changed
                if (tail_next == nullptr) {      // Tail is truly the last node
                    if (old_tail->next.compare_exchange_weak(tail_next, new_node)) {
                        break;                   // Successfully linked the new node
                    }
                } else {                         // Tail is lagging; help advance it
                    tail.compare_exchange_weak(old_tail, tail_next);
                }
            }
        }
        // Swing tail to the new node (another thread may already have helped)
        tail.compare_exchange_weak(old_tail, new_node);
        std::cout << "Pushed: " << value << "\n";
    }

    bool pop(T& result) {
        Node* old_head;

        while (true) {
            old_head = head.load();
            Node* old_tail = tail.load();
            Node* head_next = old_head->next.load();

            if (old_head == head.load()) {        // Consistency check
                if (old_head == old_tail) {       // Queue might be empty
                    if (head_next == nullptr) {   // Queue is empty
                        std::cout << "Pop failed - queue is empty.\n";
                        return false;
                    }
                    tail.compare_exchange_weak(old_tail, head_next);  // Help advance tail
                } else {                          // Queue is not empty
                    // Read the value before the CAS: once head moves on, another
                    // consumer could pop and delete this node out from under us.
                    result = head_next->data;
                    if (head.compare_exchange_weak(old_head, head_next)) {
                        delete old_head;          // Free the old dummy head node
                        std::cout << "Popped: " << result << "\n";
                        return true;
                    }
                }
            }
        }
    }
};
```
Let's break down the implementation step by step:
- Node Structure: The queue is implemented as a linked list of nodes, where each node contains the data and an atomic pointer to the next node. The node constructor initializes the data with the provided value and sets the next pointer to null.
- Head and Tail Pointers: The queue maintains two atomic pointers, `head` and `tail`, representing the front and back of the queue, respectively. Both pointers initially refer to a dummy node, which simplifies the push and pop logic.
- Push Operation: The `push` method adds a new node to the queue. It first creates a node with the provided value, then loops to link it after the current tail, handling concurrent updates to the tail pointer and advancing a lagging tail when necessary, before finally swinging `tail` to the new node.
- Pop Operation: The `pop` method removes the front element and returns it through its output parameter. It also loops to handle concurrent updates to the head and tail pointers, distinguishing the empty case from the non-empty one, in which it reads the value, advances `head`, and deletes the old dummy node (a short single-threaded sketch after this list walks through both operations).
- Cleanup: The destructor cleans up the queue by deleting all remaining nodes, starting from the head.
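To see the pieces working together before any threads are involved, here is a minimal single-threaded sketch (a standalone program; the values are purely illustrative). It relies on the `LockFreeQueue` template defined above and shows that elements come out in FIFO order and that popping an empty queue reports failure:

```cpp
int main() {
    LockFreeQueue<int> queue;   // Uses the LockFreeQueue template defined above
    int value = 0;

    queue.push(10);             // Prints "Pushed: 10"
    queue.push(20);             // Prints "Pushed: 20"

    queue.pop(value);           // Prints "Popped: 10" -- FIFO: oldest element first
    queue.pop(value);           // Prints "Popped: 20"
    queue.pop(value);           // Prints "Pop failed - queue is empty."

    return 0;
}
```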
Let's now see how to use this lock-free queue in a multithreaded environment:
```cpp
void pushItems(LockFreeQueue<int>& queue, int start, int end) {
    for (int i = start; i < end; ++i) {
        queue.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(50));  // Delay to encourage interleaving
    }
}

void popItems(LockFreeQueue<int>& queue, int count) {
    int value;
    for (int i = 0; i < count; ++i) {
        queue.pop(value);
        std::this_thread::sleep_for(std::chrono::milliseconds(30));  // Delay to encourage interleaving
    }
}

int main() {
    LockFreeQueue<int> queue;

    // Create threads that push and pop concurrently
    std::thread t1(pushItems, std::ref(queue), 1, 6);
    std::thread t2(popItems, std::ref(queue), 3);
    std::thread t3(pushItems, std::ref(queue), 6, 11);
    std::thread t4(popItems, std::ref(queue), 5);

    t1.join();
    t2.join();
    t3.join();
    t4.join();

    return 0;
}
```
In this example, we create a lock-free queue of integers and spawn multiple threads to push and pop items concurrently. The `pushItems` and `popItems` functions perform the enqueue and dequeue operations, respectively, with small delays to encourage interleaving. The main function launches the threads and waits for them to complete before exiting. Note that the output may vary from run to run, and the printed messages may appear mixed together, since we don't synchronize the console output.
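If the interleaved console messages make the trace hard to read, one simple option (a sketch, not part of the queue itself; the `log_line` helper name is just illustrative) is to guard only the printing with a mutex. This serializes the diagnostic output while leaving the queue's push and pop logic lock-free:

```cpp
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>

// Hypothetical helper: serializes access to std::cout without touching the queue logic.
inline void log_line(const std::string& message) {
    static std::mutex print_mutex;                  // Protects std::cout only
    std::lock_guard<std::mutex> lock(print_mutex);
    std::cout << message << "\n";
}

// Inside push(), for example, the trace line could become:
//   std::ostringstream oss;
//   oss << "Pushed: " << value;
//   log_line(oss.str());
```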
Lock-free queues are vital for high-performance systems that require concurrent processing without the delays introduced by mutual exclusion locks. They scale better than lock-based queues because multiple threads can perform enqueue and dequeue operations simultaneously without blocking one another. By mastering the design of these data structures, you'll enhance your ability to write multithreaded applications that fully utilize modern processor capabilities for better responsiveness and throughput.
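To make the contrast concrete, here is a minimal sketch of the lock-based alternative (names are illustrative): a queue that guards every operation with a single `std::mutex`. It is simple and correct, but every thread that touches it must acquire the same lock, which is exactly the serialization point the lock-free design removes:

```cpp
#include <mutex>
#include <queue>

template<typename T>
class LockedQueue {
private:
    std::queue<T> items;
    std::mutex mtx;                     // One lock serializes every push and pop

public:
    void push(const T& value) {
        std::lock_guard<std::mutex> lock(mtx);
        items.push(value);
    }

    bool pop(T& result) {
        std::lock_guard<std::mutex> lock(mtx);
        if (items.empty()) {
            return false;               // Nothing to pop
        }
        result = items.front();
        items.pop();
        return true;
    }
};
```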
Exciting, isn't it? Let's move on to the practice section to solidify your understanding through hands-on implementation!