 |
LibraryLink Utilities
3.0.1
Modern C++ wrapper over LibraryLink and WSTP
|
Go to the documentation of this file.
7 #ifndef LLU_ASYNC_THREADPOOL_H
8 #define LLU_ASYNC_THREADPOOL_H
15 #include <type_traits>
22 namespace LLU::Async {
28 template<
typename Queue>
44 for (
unsigned i = 0; i < threadCount; ++i) {
45 threads.emplace_back(&BasicThreadPool::workerThread,
this);
65 for ([[maybe_unused]]
auto& t : threads) {
79 template<
typename FunctionType,
typename... Args>
80 std::future<std::invoke_result_t<FunctionType, Args...>>
submit(FunctionType&& f, Args&&... args) {
81 auto task = Async::getPackagedTask(std::forward<FunctionType>(f), std::forward<Args>(args)...);
82 auto res = task.get_future();
83 workQueue.push(
TaskType {std::move(task)});
90 workQueue.waitPop(task);
95 std::atomic_bool done =
false;
97 std::vector<std::thread> threads;
100 void workerThread() {
112 template<
typename PoolQueue,
typename LocalQueue>
128 for (
unsigned i = 0; i < threadCount; ++i) {
129 queues.emplace_back(std::make_unique<LocalQueue>());
131 for (
unsigned i = 0; i < threadCount; ++i) {
132 threads.emplace_back(&GenericThreadPool::workerThread,
this, i);
164 template<
typename FunctionType,
typename... Args>
165 std::future<std::invoke_result_t<FunctionType, Args...>>
submit(FunctionType&& f, Args&&... args) {
166 auto task = Async::getPackagedTask(std::forward<FunctionType>(f), std::forward<Args>(args)...);
167 auto res = task.get_future();
168 if (localWorkQueue) {
169 localWorkQueue->push(
TaskType {std::move(task)});
171 poolWorkQueue.push(
TaskType {std::move(task)});
179 if (popTaskFromLocalQueue(task) || popTaskFromPoolQueue(task) || popTaskFromOtherThreadQueue(task)) {
182 std::this_thread::yield();
187 std::atomic_bool done =
false;
188 PoolQueue poolWorkQueue;
189 std::vector<std::unique_ptr<LocalQueue>> queues;
190 std::vector<std::thread> threads;
192 inline static thread_local LocalQueue* localWorkQueue =
nullptr;
193 inline static thread_local
unsigned myIndex = 0;
195 void workerThread(
unsigned my_index_) {
197 localWorkQueue = queues[myIndex].get();
203 bool popTaskFromLocalQueue(
TaskType& task) {
204 return localWorkQueue && localWorkQueue->tryPop(task);
206 bool popTaskFromPoolQueue(
TaskType& task) {
207 return poolWorkQueue.tryPop(task);
209 bool popTaskFromOtherThreadQueue(
TaskType& task) {
210 for (
unsigned i = 0; i < queues.size(); ++i) {
211 unsigned const index = (myIndex + i + 1) % queues.size();
212 if (queues[index]->trySteal(task)) {
232 #endif // LLU_ASYNC_THREADPOOL_H
void runPendingTask()
This is the function that each worker thread runs in a loop.
Definition: ThreadPool.h:177
GenericThreadPool(unsigned threadCount)
Create a GenericThreadPool with given number of threads.
Definition: ThreadPool.h:126
Main namespace of LibraryLink Utilities.
Definition: Queue.h:13
Thread pool class with support of per-thread queues and work stealing.
Definition: ThreadPool.h:113
Utility class for pausable task queues.
Definition: Async/Utilities.h:136
BasicThreadPool(unsigned threadCount)
Create a BasicThreadPool with given number of threads.
Definition: ThreadPool.h:42
Definition and implementation of a thread-safe queue, taken from A. Williams "C++ Concurrency in Action"...
Set of small utility classes and functions used in the Async part of LLU.
typename PoolQueue::value_type TaskType
Type of the tasks processed by the Queue.
Definition: ThreadPool.h:116
typename Queue::value_type TaskType
Type of the tasks processed by the Queue.
Definition: ThreadPool.h:32
Simple thread pool class with a single queue.
Definition: ThreadPool.h:29
std::future< std::invoke_result_t< FunctionType, Args... > > submit(FunctionType &&f, Args &&... args)
Main function of the pool which accepts tasks to be evaluated by the worker threads.
Definition: ThreadPool.h:80
Definition and implementation of a thread-safe queue, taken from A. Williams "C++ Concurrency in Action"...
void runPendingTask()
This is the function that each worker thread runs in a loop.
Definition: ThreadPool.h:88
BasicThreadPool()
Create a BasicThreadPool with the default number of threads (equal to the hardware concurrency).
Definition: ThreadPool.h:36
std::future< std::invoke_result_t< FunctionType, Args... > > submit(FunctionType &&f, Args &&... args)
Main function of the pool which accepts tasks to be evaluated by the worker threads.
Definition: ThreadPool.h:165
void resume() noexcept
Signal to resume work and notify waiting worker threads.
Definition: Async/Utilities.h:157
~BasicThreadPool()
Destructor sets the "done" flag and unblocks all blocked threads by queuing a proper number of special tasks.
Definition: ThreadPool.h:63
void checkPause()
This is the function worker threads will call to see if the work has been paused.
Definition: Async/Utilities.h:144
GenericThreadPool()
Create a GenericThreadPool with the default number of threads (equal to the hardware concurrency).
Definition: ThreadPool.h:120
Wrapper class around a queue, that provides the interface for work stealing.
Definition: WorkStealingQueue.h:20
A guard for a vector of threads to make sure they are joined before their destruction....
Definition: Async/Utilities.h:91
~GenericThreadPool()
Destructor sets the "done" flag and notifies all paused threads.
Definition: ThreadPool.h:150