Geant4 11.3.0
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
UserTaskQueue.cc
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19// ---------------------------------------------------------------
20// Tasking class implementation
21// Class Description:
22// ---------------------------------------------------------------
23// Author: Jonathan Madsen
24// ---------------------------------------------------------------
25
26#include "PTL/UserTaskQueue.hh"
27
28#include "PTL/AutoLock.hh"
30#include "PTL/TaskGroup.hh"
31#include "PTL/ThreadData.hh"
32#include "PTL/ThreadPool.hh"
33
34#include <cassert>
35#include <chrono>
36#include <functional>
37#include <iostream>
38#include <map>
39#include <stdexcept>
40#include <thread>
41#include <utility>
42
43namespace PTL
44{
45//======================================================================================//
46
48: VUserTaskQueue(nworkers)
49, m_is_clone((parent) != nullptr)
50, m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
51, m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
52, m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
53, m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
54, m_mutex((parent) ? parent->m_mutex : new Mutex{})
55, m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
56{
57 // create nthreads + 1 subqueues so there is always a subqueue available
58 if(!parent)
59 {
60 for(intmax_t i = 0; i < nworkers + 1; ++i)
61 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
62 }
63}
64
65//======================================================================================//
66
68{
69 if(!m_is_clone)
70 {
71 for(auto& itr : *m_subqueues)
72 {
73 assert(itr->empty());
74 delete itr;
75 }
76 m_subqueues->clear();
77 delete m_hold;
78 delete m_ntasks;
79 delete m_mutex;
80 delete m_subqueues;
81 }
82}
83
84//======================================================================================//
85
86void
88{
89 if(!m_mutex)
90 throw std::runtime_error("nullptr to mutex");
91 AutoLock lk(m_mutex);
92 if(m_workers < n)
93 {
94 while(m_workers < n)
95 {
96 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
97 ++m_workers;
98 }
99 }
100 else if(m_workers > n)
101 {
102 while(m_workers > n)
103 {
104 delete m_subqueues->back();
105 m_subqueues->pop_back();
106 --m_workers;
107 }
108 }
109}
110
111//======================================================================================//
112
115{
116 return new UserTaskQueue(workers(), this);
117}
118//======================================================================================//
119
120intmax_t
122{
123 // get a thread id number
124 static thread_local intmax_t tl_bin =
125 (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
126 return tl_bin;
127}
128
129//======================================================================================//
130
131intmax_t
133{
134 return (++m_insert_bin % (m_workers + 1));
135}
136
137//======================================================================================//
138
141{
142 intmax_t tbin = GetThreadBin();
143 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
144 task_pointer _task = nullptr;
145
146 //------------------------------------------------------------------------//
147 auto get_task = [&]() {
148 if(task_subq->AcquireClaim())
149 {
150 // run task
151 _task = task_subq->PopTask(true);
152 // release the claim on the bin
153 task_subq->ReleaseClaim();
154 }
155 if(_task)
156 --(*m_ntasks);
157 // return success if valid pointer
158 return (_task != nullptr);
159 };
160 //------------------------------------------------------------------------//
161
162 // while not empty
163 while(!task_subq->empty())
164 {
165 if(get_task())
166 break;
167 }
168 return _task;
169}
170
171//======================================================================================//
172
174UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr)
175{
176 // exit if empty
177 if(this->true_empty())
178 return nullptr;
179
180 // ensure the thread has a bin assignment
181 intmax_t tbin = GetThreadBin();
182 intmax_t n = (subq < 0) ? tbin : subq;
183 if(nitr < 1)
184 nitr = (m_workers + 1); // * m_ntasks->load(std::memory_order_relaxed);
185
186 if(m_hold->load(std::memory_order_relaxed))
187 {
188 return GetThreadBinTask();
189 }
190
191 task_pointer _task = nullptr;
192 //------------------------------------------------------------------------//
193 auto get_task = [&](intmax_t _n) {
194 TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
195 // try to acquire a claim for the bin
196 // if acquired, no other threads will access bin until claim is released
197 if(!task_subq->empty() && task_subq->AcquireClaim())
198 {
199 // pop task out of bin
200 _task = task_subq->PopTask(n == tbin);
201 // release the claim on the bin
202 task_subq->ReleaseClaim();
203 }
204 if(_task)
205 --(*m_ntasks);
206 // return success if valid pointer
207 return (_task != nullptr);
208 };
209 //------------------------------------------------------------------------//
210
211 // there are num_workers+1 bins so there is always a bin that is open
212 // execute num_workers+2 iterations so the thread checks its bin twice
213 // while(!empty())
214 {
215 for(intmax_t i = 0; i < nitr; ++i, ++n)
216 {
217 if(get_task(n % (m_workers + 1)))
218 return _task;
219 }
220 }
221
222 // only reached if looped over all bins (and looked in own bin twice)
223 // and found no work so return an empty task and the thread will be put to
224 // sleep if there is still no work by the time it reaches its
225 // condition variable
226 return _task;
227}
228
229//======================================================================================//
230
231intmax_t
233{
234 // increment number of tasks
235 ++(*m_ntasks);
236
237 bool spin = m_hold->load(std::memory_order_relaxed);
238 intmax_t tbin = GetThreadBin();
239
240 if(data && data->within_task)
241 {
242 subq = tbin;
243 // spin = true;
244 }
245
246 // subq is -1 unless specified so unless specified
247 // GetInsertBin() call increments a counter and returns
248 // counter % (num_workers + 1) so that tasks are distributed evenly
249 // among the bins
250 intmax_t n = (subq < 0) ? GetInsertBin() : subq;
251
252 //------------------------------------------------------------------------//
253 auto insert_task = [&](intmax_t _n) {
254 TaskSubQueue* task_subq = (*m_subqueues)[_n];
255 // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
256 // if not threads bin and size difference, insert into smaller
257 // if(n != tbin && next_subq->size() < task_subq->size())
258 // task_subq = next_subq;
259 // try to acquire a claim for the bin
260 // if acquired, no other threads will access bin until claim is released
261 if(task_subq->AcquireClaim())
262 {
263 // push the task into the bin
264 task_subq->PushTask(std::move(task));
265 // release the claim on the bin
266 task_subq->ReleaseClaim();
267 // return success
268 return true;
269 }
270 return false;
271 };
272 //------------------------------------------------------------------------//
273
274 // if not in "hold/spin mode", where thread only inserts tasks into
275 // specified bin, then move onto next bin
276 //
277 if(spin)
278 {
279 n = n % (m_workers + 1);
280 while(!insert_task(n))
281 ;
282 return n;
283 }
284
285 // there are num_workers+1 bins so there is always a bin that is open
286 // execute num_workers+2 iterations so the thread checks its bin twice
287 while(true)
288 {
289 auto _n = (n++) % (m_workers + 1);
290 if(insert_task(_n))
291 return _n;
292 }
293}
294
295//======================================================================================//
296
297void
299{
300 using task_group_type = TaskGroup<int, int>;
301 using thread_execute_map_t = std::map<int64_t, bool>;
302
303 if(!tp->is_alive())
304 {
305 func();
306 return;
307 }
308
309 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };
310
311 // wait for all threads to finish any work
312 // NOTE: will cause deadlock if called from a task
313 while(tp->get_active_threads_count() > 0)
314 ThisThread::sleep_for(std::chrono::milliseconds(10));
315
316 thread_execute_map_t thread_execute_map{};
317 std::vector<std::shared_ptr<VTask>> _tasks{};
318 _tasks.reserve(m_workers + 1);
319
320 AcquireHold();
321 for(int i = 0; i < (m_workers + 1); ++i)
322 {
323 if(i == GetThreadBin())
324 continue;
325
326 //--------------------------------------------------------------------//
327 auto thread_specific_func = [&]() {
328 ScopeDestructor _dtor = tg.get_scope_destructor();
329 static Mutex _mtx;
330 _mtx.lock();
331 bool& _executed = thread_execute_map[GetThreadBin()];
332 _mtx.unlock();
333 if(!_executed)
334 {
335 func();
336 _executed = true;
337 return 1;
338 }
339 return 0;
340 };
341 //--------------------------------------------------------------------//
342
343 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
344 }
345
346 tp->notify_all();
347 int nexecuted = tg.join();
348 if(nexecuted != m_workers)
349 {
350 std::stringstream msg;
351 msg << "Failure executing routine on all threads! Only " << nexecuted
352 << " threads executed function out of " << m_workers << " workers";
353 std::cerr << msg.str() << std::endl;
354 }
355 ReleaseHold();
356}
357
358//======================================================================================//
359
360void
362 function_type func)
363{
364 using task_group_type = TaskGroup<int, int>;
365 using thread_execute_map_t = std::map<int64_t, bool>;
366
367 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };
368
369 // wait for all threads to finish any work
370 // NOTE: will cause deadlock if called from a task
371 while(tp->get_active_threads_count() > 0)
372 ThisThread::sleep_for(std::chrono::milliseconds(10));
373
374 if(!tp->is_alive())
375 {
376 func();
377 return;
378 }
379
380 thread_execute_map_t thread_execute_map{};
381
382 //========================================================================//
383 // wrap the function so that it will only be executed if the thread
384 // has an ID in the set
385 auto thread_specific_func = [&]() {
386 ScopeDestructor _dtor = tg.get_scope_destructor();
387 static Mutex _mtx;
388 _mtx.lock();
389 bool& _executed = thread_execute_map[GetThreadBin()];
390 _mtx.unlock();
391 if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
392 {
393 func();
394 _executed = true;
395 return 1;
396 }
397 return 0;
398 };
399 //========================================================================//
400
401 if(tid_set.count(ThisThread::get_id()) > 0)
402 func();
403
404 AcquireHold();
405 for(int i = 0; i < (m_workers + 1); ++i)
406 {
407 if(i == GetThreadBin())
408 continue;
409
410 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
411 }
412 tp->notify_all();
413 decltype(tid_set.size()) nexecuted = tg.join();
414 if(nexecuted != tid_set.size())
415 {
416 std::stringstream msg;
417 msg << "Failure executing routine on specific threads! Only " << nexecuted
418 << " threads executed function out of " << tid_set.size() << " workers";
419 std::cerr << msg.str() << std::endl;
420 }
421 ReleaseHold();
422}
423
424//======================================================================================//
425
426void
427UserTaskQueue::AcquireHold()
428{
429 bool _hold;
430 while(!(_hold = m_hold->load(std::memory_order_relaxed)))
431 {
432 m_hold->compare_exchange_strong(_hold, true, std::memory_order_release,
433 std::memory_order_relaxed);
434 }
435}
436
437//======================================================================================//
438
439void
440UserTaskQueue::ReleaseHold()
441{
442 bool _hold;
443 while((_hold = m_hold->load(std::memory_order_relaxed)))
444 {
445 m_hold->compare_exchange_strong(_hold, false, std::memory_order_release,
446 std::memory_order_relaxed);
447 }
448}
449
450//======================================================================================//
451
452} // namespace PTL
void PushTask(task_pointer &&) PTL_NO_SANITIZE_THREAD
task_pointer PopTask(bool front=true) PTL_NO_SANITIZE_THREAD
bool empty() const
static ThreadData *& GetInstance()
Definition ThreadData.cc:31
static uintmax_t get_this_thread_id()
intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) override PTL_NO_SANITIZE_THREAD
intmax_t GetInsertBin() const
UserTaskQueue(intmax_t nworkers=-1, UserTaskQueue *=nullptr)
void ExecuteOnAllThreads(ThreadPool *tp, function_type f) override
void resize(intmax_t) override
VUserTaskQueue * clone() override
task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1) override
void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f) override
std::vector< TaskSubQueue * > TaskSubQueueContainer
intmax_t GetThreadBin() const override
bool true_empty() const override
std::shared_ptr< VTask > task_pointer
task_pointer GetThreadBinTask()
~UserTaskQueue() override
std::set< ThreadId > ThreadIdSet
std::function< void()> function_type
intmax_t workers() const
VUserTaskQueue(intmax_t nworkers=-1)
Backports of C++ language features for use with C++11 compilers.
Definition AutoLock.hh:255
TemplateAutoLock< Mutex > AutoLock
Definition AutoLock.hh:479