// Shared state of the task queue. (Excerpted listing: the leading number on each
// statement is its line number in the original file; some lines are not shown.)
// Pending callbacks: addTask() appends at the back, getTask() reads the front.
11 std::deque<std::function<void()>> tasks;
// Waited on by getTask(); notified by addTask() (one waiter) and on shutdown (all waiters).
13 std::condition_variable taskCondition;
// Counter polled by waitUntilDone(). Where it is incremented and decremented is not
// visible in this excerpt.
14 std::atomic<int> remaining_tasks = 0;
// Shutdown flag read by getTask() under mutex_.
// NOTE(review): this is a plain bool, and the write at original line 75 shows no lock
// around it - confirm every access is made with mutex_ held.
15 bool shuttingDown =
false;
// Queues `callback` for execution by a worker and wakes one thread waiting on
// taskCondition.
// callback: callable taken by rvalue reference and moved into the queue.
// NOTE(review): original lines 17 and 20-21 are missing from this excerpt; presumably
// they scope the lock and bump remaining_tasks - confirm before editing.
16 void addTask(std::function<
void()>&& callback) {
// Guard the deque against concurrent access from getTask().
18 std::lock_guard<std::mutex> lock(mutex_);
// Move the callable into the deque instead of copying it.
19 tasks.emplace_back(std::move(callback));
// Only one task was added, so waking a single waiter is sufficient.
22 taskCondition.notify_one();
// Blocks until a task is queued or shutdown has been flagged, then moves the front
// task into `task`.
// task: out-parameter that receives the callable taken from the front of the deque.
// Returns false when shutting down or when the deque is empty. The success path
// (presumably pop_front and `return true`) is on lines not shown here - confirm.
25 bool getTask(std::function<
void()>& task) {
// unique_lock rather than lock_guard: condition_variable::wait must be able to
// release and re-acquire the lock.
26 std::unique_lock<std::mutex> lock(mutex_);
// Predicate form of wait: re-checks the condition after every wakeup, so spurious
// wakeups do not fall through.
27 taskCondition.wait(lock, [
this] {
return !tasks.empty() || shuttingDown; });
// Shutdown takes priority: tasks still queued at this point are not handed out.
29 if (shuttingDown || tasks.empty())
return false;
// front() leaves a moved-from element in the deque; its removal is not visible here.
31 task = std::move(tasks.front());
// Returns once remaining_tasks is no longer positive.
// This is a busy-wait: the caller loops and yields its time slice on every iteration
// rather than blocking on a condition variable.
36 void waitUntilDone()
const {
37 while (remaining_tasks > 0) {
// Hint to the scheduler that other threads (e.g. the workers) may run.
38 std::this_thread::yield();
// Handle of the worker thread; assigned at original line 58 and tested with
// joinable() at original line 77.
49 std::thread cur_thread;
// Callable slot, initialised empty.
// NOTE(review): the worker lambda declares its own local `task` (original line 65),
// which hides this member inside the lambda - check whether this member is still used.
50 std::function<void()> task =
nullptr;
// Constructor fragment (the header and the preceding initialisers are not shown):
// stores a pointer to the shared queue and starts the worker thread.
56 , t_queue{ &task_queue_ }
// The lambda captures `this`, so this object must stay at the same address for as
// long as the worker thread is running.
58 cur_thread = std::thread([
this]() {
// Local callable filled in by getTask(). Any enclosing loop (original lines 59-64)
// is not shown in this excerpt.
65 std::function<void()> task;
// getTask() returns false on shutdown or when the queue is empty; the handling of
// that case is not visible here.
66 if (t_queue->getTask(task)) {
// Tell the queue this task has finished. The call that runs `task` is presumably on
// the elided line 67 - confirm.
68 t_queue->completeTask();
// Shutdown fragment (the enclosing function's header is not shown): flags the queue
// as shutting down, wakes every waiting thread, then checks whether the worker thread
// can be joined.
// NOTE(review): no lock on the queue's mutex is visible around this write, while
// getTask() reads shuttingDown with that mutex held. If the write really is unlocked,
// it is a data race on a plain bool, and a worker that has evaluated its wait
// predicate but not yet blocked can miss the notify_all() below and never wake -
// confirm against the elided lines 69-74.
// NOTE(review): t_queue points at a queue that is handed to every worker, so setting
// the flag here presumably stops all workers, not only this one - confirm intent.
75 t_queue->shuttingDown =
true;
76 t_queue->taskCondition.notify_all();
// Only a joinable thread may be joined; the join itself is presumably on the elided
// line 78.
77 if (cur_thread.joinable()) {
// Worker objects owned by the pool.
87 std::vector<Thread> threads;
// Constructor fragment (the header is not shown): records the worker count and
// creates that many Thread objects.
90 : num_threads{ num_threads_ }
// Reserving the full capacity up front means the emplace_back calls below never
// reallocate, so existing Thread objects are not moved.
// NOTE(review): Thread's worker lambda captures `this` (original line 58), so the
// code presumably relies on this - confirm before changing how workers are added.
92 threads.reserve(num_threads_);
93 for (
int i = 0; i < num_threads_; i++)
// Each worker is constructed in place from the shared queue and its index.
94 threads.emplace_back(t_queue, i);
// Runs `callback` over the index range [0, num_obj) split across the workers.
// num_obj:  number of items to cover; 0 returns immediately without calling callback.
// callback: invoked as callback(start, end) once per slice. Each slice's `end` equals
//           the next slice's `start`, so ranges are presumably half-open [start, end).
// One task per worker is queued, any leftover tail is processed on the calling
// thread, and the function then waits via t_queue.waitUntilDone().
// NOTE(review): num_threads == 0 would divide by zero at original line 99; a guard,
// if any, would be in the constructor and is not visible here.
97 void parallel(
int num_obj, std::function<
void(
int start,
int end)>&& callback) {
98 if (num_obj == 0)
return;
// Integer division: when num_obj < num_threads this is 0, every queued task gets an
// empty range, and the tail call below covers the whole range on the calling thread.
99 int slice_size = num_obj / num_threads;
100 for (
int i = 0; i < num_threads; i++) {
101 int start = i * slice_size;
102 int end = start + slice_size;
// `callback` is captured by reference, so it must stay alive until every queued task
// has run. That relies on waitUntilDone() below not returning earlier.
103 t_queue.addTask([start, end, &callback]() { callback(start, end);});
// Remainder left over by the integer division above.
105 if (slice_size * num_threads < num_obj) {
106 int start = slice_size * num_threads;
// The tail runs synchronously on the calling thread, not on a worker.
107 callback(start, num_obj);
// Wait for the queued slices before returning.
110 t_queue.waitUntilDone();