LibraryLink Utilities  3.0.1
Modern C++ wrapper over LibraryLink and WSTP
Queue.h
Go to the documentation of this file.
1 
6 #ifndef LLU_ASYNC_QUEUE_H
7 #define LLU_ASYNC_QUEUE_H
8 
9 #include <condition_variable>
10 #include <memory>
11 #include <mutex>
12 
13 namespace LLU::Async {
20  template<typename T>
22  public:
24  using value_type = T;
25  public:
29  ThreadsafeQueue() : head(new Node), tail(head.get()) {}
30 
36  std::shared_ptr<value_type> tryPop();
37 
44  bool tryPop(value_type& value);
45 
50  std::shared_ptr<value_type> waitPop();
51 
56  void waitPop(value_type& value);
57 
63  void push(value_type new_value);
64 
69  [[nodiscard]] bool empty() const;
70 
71  private:
73  struct Node {
74  std::shared_ptr<value_type> data;
75  std::unique_ptr<Node> next;
76  };
77 
78  mutable std::mutex head_mutex;
79  std::unique_ptr<Node> head;
80  mutable std::mutex tail_mutex;
81  Node* tail;
82  std::condition_variable data_cond;
83 
84  const Node* getTail() const {
85  std::lock_guard<std::mutex> tail_lock(tail_mutex);
86  return tail;
87  }
88 
89  std::unique_ptr<Node> popHead() {
90  std::unique_ptr<Node> old_head = std::move(head);
91  head = std::move(old_head->next);
92  return old_head;
93  }
94 
95  std::unique_lock<std::mutex> waitForData() {
96  std::unique_lock<std::mutex> head_lock(head_mutex);
97  data_cond.wait(head_lock, [&] { return head.get() != getTail(); });
98  return head_lock;
99  }
100 
101  std::unique_ptr<Node> waitPopHead() {
102  std::unique_lock<std::mutex> head_lock(waitForData());
103  return popHead();
104  }
105 
106  std::unique_ptr<Node> waitPopHead(value_type& value) {
107  std::unique_lock<std::mutex> head_lock(waitForData());
108  value = std::move(*head->data);
109  return popHead();
110  }
111 
112  std::unique_ptr<Node> tryPopHead() {
113  std::lock_guard<std::mutex> head_lock(head_mutex);
114  if (head.get() == getTail()) {
115  return std::unique_ptr<Node>();
116  }
117  return popHead();
118  }
119 
120  std::unique_ptr<Node> tryPopHead(value_type& value) {
121  std::lock_guard<std::mutex> head_lock(head_mutex);
122  if (head.get() == getTail()) {
123  return std::unique_ptr<Node>();
124  }
125  value = std::move(*head->data);
126  return popHead();
127  }
128  };
129 
130  template<typename T>
131  void ThreadsafeQueue<T>::push(T new_value) {
132  std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
133  std::unique_ptr<Node> p(new Node);
134  {
135  std::lock_guard<std::mutex> tail_lock(tail_mutex);
136  tail->data = new_data;
137  Node* const new_tail = p.get();
138  tail->next = std::move(p);
139  tail = new_tail;
140  }
141  data_cond.notify_one();
142  }
143 
144  template<typename T>
145  std::shared_ptr<T> ThreadsafeQueue<T>::waitPop() {
146  std::unique_ptr<Node> const old_head = waitPopHead();
147  return old_head->data;
148  }
149 
150  template<typename T>
152  std::unique_ptr<Node> const old_head = waitPopHead(value);
153  }
154 
155  template<typename T>
156  std::shared_ptr<T> ThreadsafeQueue<T>::tryPop() {
157  std::unique_ptr<Node> const old_head = tryPopHead();
158  return old_head ? old_head->data : std::shared_ptr<T>();
159  }
160 
161  template<typename T>
162  bool ThreadsafeQueue<T>::tryPop(T& value) {
163  std::unique_ptr<Node> const old_head = tryPopHead(value);
164  return static_cast<bool>(old_head);
165  }
166 
167  template<typename T>
169  std::lock_guard<std::mutex> head_lock(head_mutex);
170  return (head.get() == getTail());
171  }
172 } // namespace LLU::Async
173 #endif // LLU_ASYNC_QUEUE_H
LLU::Async::ThreadsafeQueue::empty
bool empty() const
Check if the queue is empty.
Definition: Queue.h:168
LLU::Async::ThreadsafeQueue::tryPop
std::shared_ptr< value_type > tryPop()
Get data from the queue if available.
Definition: Queue.h:156
LLU::Async::ThreadsafeQueue::value_type
T value_type
Value type of queue elements.
Definition: Queue.h:24
LLU::Async::ThreadsafeQueue::waitPop
std::shared_ptr< value_type > waitPop()
Get data from the queue, possibly waiting for it.
Definition: Queue.h:145
LLU::Async::ThreadsafeQueue::ThreadsafeQueue
ThreadsafeQueue()
Create new empty queue.
Definition: Queue.h:29
LLU::Async::ThreadsafeQueue
ThreadsafeQueue is a linked list of nodes which supports safe concurrent access to its head (removing...
Definition: Queue.h:21
LLU::Async::ThreadsafeQueue::push
void push(value_type new_value)
Push new value to the end of the queue.
Definition: Queue.h:131