LibraryLink Utilities  3.0.1
Modern C++ wrapper over LibraryLink and WSTP
ThreadPool.h
Go to the documentation of this file.
1 
7 #ifndef LLU_ASYNC_THREADPOOL_H
8 #define LLU_ASYNC_THREADPOOL_H
9 
10 #include <atomic>
11 #include <deque>
12 #include <functional>
13 #include <future>
14 #include <thread>
15 #include <type_traits>
16 #include <vector>
17 
18 #include "LLU/Async/Queue.h"
19 #include "LLU/Async/Utilities.h"
21 
22 namespace LLU::Async {
23 
28  template<typename Queue>
29  class BasicThreadPool {
30  public:
32  using TaskType = typename Queue::value_type;
33 
34  public:
36  BasicThreadPool() : BasicThreadPool(std::thread::hardware_concurrency()) {};
37 
42  explicit BasicThreadPool(unsigned threadCount) : joiner(threads) {
43  try {
44  for (unsigned i = 0; i < threadCount; ++i) {
45  threads.emplace_back(&BasicThreadPool::workerThread, this);
46  }
47  } catch (...) {
48  done = true;
49  throw;
50  }
51  }
52 
53  // Thread pool is non-copyable
54  BasicThreadPool(const BasicThreadPool&) = delete;
55  BasicThreadPool& operator=(const BasicThreadPool&) = delete;
56  BasicThreadPool(BasicThreadPool&&) = delete;
57  BasicThreadPool& operator=(BasicThreadPool&&) = delete;
58 
64  done = true;
65  for ([[maybe_unused]] auto& t : threads) {
66  workQueue.push(TaskType {[] {}});
67  }
68  }
69 
79  template<typename FunctionType, typename... Args>
80  std::future<std::invoke_result_t<FunctionType, Args...>> submit(FunctionType&& f, Args&&... args) {
81  auto task = Async::getPackagedTask(std::forward<FunctionType>(f), std::forward<Args>(args)...);
82  auto res = task.get_future();
83  workQueue.push(TaskType {std::move(task)});
84  return res;
85  }
86 
88  void runPendingTask() {
89  TaskType task;
90  workQueue.waitPop(task);
91  task();
92  }
93 
94  private:
95  std::atomic_bool done = false;  // set to true by the destructor (and by the constructor on failure); workerThread() loops while it is false
96  Queue workQueue;  // single task queue shared by submit() and all worker threads
97  std::vector<std::thread> threads;  // worker threads, each started on workerThread()
98  Async::ThreadJoiner joiner;  // constructed from `threads`; declared after it, so it is destroyed before the vector itself
99 
100  void workerThread() {
101  while (!done) {
102  runPendingTask();
103  }
104  }
105  };
106 
112  template<typename PoolQueue, typename LocalQueue>
113  class GenericThreadPool : public Pausable {
114  public:
116  using TaskType = typename PoolQueue::value_type;
117 
118  public:
120  GenericThreadPool() : GenericThreadPool(std::thread::hardware_concurrency()) {};
121 
126  explicit GenericThreadPool(unsigned threadCount) : joiner(threads) {
127  try {
128  for (unsigned i = 0; i < threadCount; ++i) {
129  queues.emplace_back(std::make_unique<LocalQueue>());
130  }
131  for (unsigned i = 0; i < threadCount; ++i) {
132  threads.emplace_back(&GenericThreadPool::workerThread, this, i);
133  }
134  } catch (...) {
135  done = true;
136  throw;
137  }
138  }
139 
140  // Thread pool is non-copyable
141  GenericThreadPool(const GenericThreadPool&) = delete;
142  GenericThreadPool& operator=(const GenericThreadPool&) = delete;
144  GenericThreadPool& operator=(GenericThreadPool&&) = delete;
145 
151  done = true;
152  resume();
153  }
154 
164  template<typename FunctionType, typename... Args>
165  std::future<std::invoke_result_t<FunctionType, Args...>> submit(FunctionType&& f, Args&&... args) {
166  auto task = Async::getPackagedTask(std::forward<FunctionType>(f), std::forward<Args>(args)...);
167  auto res = task.get_future();
168  if (localWorkQueue) {
169  localWorkQueue->push(TaskType {std::move(task)});
170  } else {
171  poolWorkQueue.push(TaskType {std::move(task)});
172  }
173  return res;
174  }
175 
177  void runPendingTask() {
178  TaskType task;
179  if (popTaskFromLocalQueue(task) || popTaskFromPoolQueue(task) || popTaskFromOtherThreadQueue(task)) {
180  task();
181  } else {
182  std::this_thread::yield();
183  }
184  }
185 
186  private:
187  std::atomic_bool done = false;  // set to true by the destructor (and by the constructor on failure); workerThread() loops while it is false
188  PoolQueue poolWorkQueue;  // shared queue; submit() uses it when the calling thread has no local queue
189  std::vector<std::unique_ptr<LocalQueue>> queues;  // one local queue per worker thread, indexed by the worker's index
190  std::vector<std::thread> threads;  // worker threads, each started on workerThread(i)
191  Async::ThreadJoiner joiner;  // constructed from `threads`; declared after it, so it is destroyed before the vector itself
192  inline static thread_local LocalQueue* localWorkQueue = nullptr;  // per-thread; nullptr unless workerThread() has set it on this thread
193  inline static thread_local unsigned myIndex = 0;  // per-thread; index of the current worker in `queues`, assigned in workerThread()
194 
195  void workerThread(unsigned my_index_) {
196  myIndex = my_index_;
197  localWorkQueue = queues[myIndex].get();
198  while (!done) {
199  runPendingTask();
200  checkPause();
201  }
202  }
203  bool popTaskFromLocalQueue(TaskType& task) {
204  return localWorkQueue && localWorkQueue->tryPop(task);
205  }
206  bool popTaskFromPoolQueue(TaskType& task) {
207  return poolWorkQueue.tryPop(task);
208  }
209  bool popTaskFromOtherThreadQueue(TaskType& task) {
210  for (unsigned i = 0; i < queues.size(); ++i) {
211  unsigned const index = (myIndex + i + 1) % queues.size();
212  if (queues[index]->trySteal(task)) {
213  return true;
214  }
215  }
216  return false;
217  }
218  };
219 
220 } // namespace LLU::Async
221 
222 namespace LLU {
226 
230 }// namespace LLU
231 
232 #endif // LLU_ASYNC_THREADPOOL_H
LLU::Async::GenericThreadPool::runPendingTask
void runPendingTask()
This is the function that each worker thread runs in a loop.
Definition: ThreadPool.h:177
LLU::Async::GenericThreadPool::GenericThreadPool
GenericThreadPool(unsigned threadCount)
Create a GenericThreadPool with given number of threads.
Definition: ThreadPool.h:126
LLU
Main namespace of LibraryLink Utilities.
Definition: Queue.h:13
LLU::Async::GenericThreadPool
Thread pool class with support of per-thread queues and work stealing.
Definition: ThreadPool.h:113
LLU::Async::Pausable
Utility class for pausable task queues.
Definition: Async/Utilities.h:136
LLU::Async::BasicThreadPool::BasicThreadPool
BasicThreadPool(unsigned threadCount)
Create a BasicThreadPool with given number of threads.
Definition: ThreadPool.h:42
WorkStealingQueue.h
Definition and implementation of a thread-safe queue, taken from A. Williams "C++ Concurrency in Action".
Utilities.h
Set of small utility classes and functions used in the Async part of LLU.
LLU::Async::GenericThreadPool::TaskType
typename PoolQueue::value_type TaskType
Type of the tasks processed by the Queue.
Definition: ThreadPool.h:116
LLU::Async::BasicThreadPool::TaskType
typename Queue::value_type TaskType
Type of the tasks processed by the Queue.
Definition: ThreadPool.h:32
LLU::Async::BasicThreadPool
Simple thread pool class with a single queue.
Definition: ThreadPool.h:29
LLU::Async::BasicThreadPool::submit
std::future< std::invoke_result_t< FunctionType, Args... > > submit(FunctionType &&f, Args &&... args)
Main function of the pool which accepts tasks to be evaluated by the worker threads.
Definition: ThreadPool.h:80
Queue.h
Definition and implementation of a thread-safe queue, taken from A. Williams "C++ Concurrency in Action".
LLU::Async::BasicThreadPool::runPendingTask
void runPendingTask()
This is the function that each worker thread runs in a loop.
Definition: ThreadPool.h:88
LLU::Async::BasicThreadPool::BasicThreadPool
BasicThreadPool()
Create a BasicThreadPool with the default number of threads (equal to the hardware concurrency)
Definition: ThreadPool.h:36
LLU::Async::GenericThreadPool::submit
std::future< std::invoke_result_t< FunctionType, Args... > > submit(FunctionType &&f, Args &&... args)
Main function of the pool which accepts tasks to be evaluated by the worker threads.
Definition: ThreadPool.h:165
LLU::Async::Pausable::resume
void resume() noexcept
Signal to resume work and notify waiting worker threads.
Definition: Async/Utilities.h:157
LLU::Async::BasicThreadPool::~BasicThreadPool
~BasicThreadPool()
Destructor sets the "done" flag and unblocks all blocked threads by queuing a proper number of special (empty) tasks.
Definition: ThreadPool.h:63
LLU::Async::Pausable::checkPause
void checkPause()
This is the function worker threads will call to see if the work has been paused.
Definition: Async/Utilities.h:144
LLU::Async::GenericThreadPool::GenericThreadPool
GenericThreadPool()
Create a GenericThreadPool with the default number of threads (equal to the hardware concurrency)
Definition: ThreadPool.h:120
LLU::Async::WorkStealingQueue
Wrapper class around a queue, that provides the interface for work stealing.
Definition: WorkStealingQueue.h:20
LLU::Async::ThreadJoiner
A guard for a vector of threads to make sure they are joined before their destruction....
Definition: Async/Utilities.h:91
LLU::Async::GenericThreadPool::~GenericThreadPool
~GenericThreadPool()
Destructor sets the "done" flag and notifies all paused threads.
Definition: ThreadPool.h:150