src/ex/thread_pool.cpp

90.0% Lines (81/90) 88.9% Functions (16/18)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/detail/intrusive.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <condition_variable>
15 #include <cstdio>
16 #include <mutex>
17 #include <thread>
18 #include <vector>
19
20 /*
21 Thread pool implementation using a shared work queue.
22
23 Work items are coroutine handles wrapped in intrusive list nodes, stored
24 in a single queue protected by a mutex. Worker threads wait on a
25 condition_variable until work is available or stop is requested.
26
27 Threads are started lazily on first post() via std::call_once to avoid
28 spawning threads for pools that are constructed but never used. Each
29 thread is named with a configurable prefix plus index for debugger
30 visibility.
31
32 Shutdown sequence: stop() sets the stop flag and notifies all threads;
33 each worker keeps running queued work and exits once the queue is empty.
34 The destructor joins threads and destroys any work still queued unexecuted.
35 */
36
37 namespace boost {
38 namespace capy {
39
40 //------------------------------------------------------------------------------
41
42 class thread_pool::impl
43 {
44 struct work : detail::intrusive_queue<work>::node
45 {
46 std::coroutine_handle<> h_;
47
48 131x explicit work(std::coroutine_handle<> h) noexcept
49 131x : h_(h)
50 {
51 131x }
52
53 131x void run()
54 {
55 131x auto h = h_;
56 131x delete this;
57 131x h.resume();
58 131x }
59
60 void destroy()
61 {
62 delete this;
63 }
64 };
65
66 std::mutex mutex_;
67 std::condition_variable cv_;
68 detail::intrusive_queue<work> q_;
69 std::vector<std::thread> threads_;
70 bool stop_{false};
71 std::size_t num_threads_;
72 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
73 std::once_flag start_flag_;
74
75 public:
76 63x ~impl()
77 {
78 63x stop();
79 105x for(auto& t : threads_)
80 42x if(t.joinable())
81 t.join();
82
83 63x while(auto* w = q_.pop())
84 w->destroy();
85 63x }
86
87 63x impl(std::size_t num_threads, std::string_view thread_name_prefix)
88 63x : num_threads_(num_threads)
89 {
90 63x if(num_threads_ == 0)
91 2x num_threads_ = std::thread::hardware_concurrency();
92 63x if(num_threads_ == 0)
93 num_threads_ = 1;
94
95 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
96 63x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
97 63x thread_name_prefix_[n] = '\0';
98 63x }
99
100 void
101 131x post(std::coroutine_handle<> h)
102 {
103 131x ensure_started();
104 131x auto* w = new work(h);
105 {
106 131x std::lock_guard<std::mutex> lock(mutex_);
107 131x q_.push(w);
108 131x }
109 131x cv_.notify_one();
110 131x }
111
112 void
113 63x join() noexcept
114 {
115 63x stop();
116 105x for(auto& t : threads_)
117 42x if(t.joinable())
118 42x t.join();
119 63x }
120
121 void
122 126x stop() noexcept
123 {
124 {
125 126x std::lock_guard<std::mutex> lock(mutex_);
126 126x stop_ = true;
127 126x }
128 126x cv_.notify_all();
129 126x }
130
131 private:
132 void
133 131x ensure_started()
134 {
135 131x std::call_once(start_flag_, [this]{
136 24x threads_.reserve(num_threads_);
137 66x for(std::size_t i = 0; i < num_threads_; ++i)
138 84x threads_.emplace_back([this, i]{ run(i); });
139 24x });
140 131x }
141
142 void
143 42x run(std::size_t index)
144 {
145 // Build name; set_current_thread_name truncates to platform limits.
146 char name[16];
147 42x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
148 42x set_current_thread_name(name);
149
150 for(;;)
151 {
152 173x work* w = nullptr;
153 {
154 173x std::unique_lock<std::mutex> lock(mutex_);
155 173x cv_.wait(lock, [this]{
156 345x return !q_.empty() ||
157 345x stop_;
158 });
159 173x if(stop_ && q_.empty())
160 84x return;
161 131x w = q_.pop();
162 173x }
163 131x if(w)
164 131x w->run();
165 131x }
166 }
167 };
168
169 //------------------------------------------------------------------------------
170
171 63x thread_pool::
172 ~thread_pool()
173 {
174 63x impl_->join();
175 63x shutdown();
176 63x destroy();
177 63x delete impl_;
178 63x }
179
180 63x thread_pool::
181 63x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
182 63x : impl_(new impl(num_threads, thread_name_prefix))
183 {
184 63x this->set_frame_allocator(std::allocator<void>{});
185 63x }
186
187 void
188 thread_pool::
189 stop() noexcept
190 {
191 impl_->stop();
192 }
193
194 //------------------------------------------------------------------------------
195
196 thread_pool::executor_type
197 63x thread_pool::
198 get_executor() const noexcept
199 {
200 63x return executor_type(
201 63x const_cast<thread_pool&>(*this));
202 }
203
204 void
205 131x thread_pool::executor_type::
206 post(std::coroutine_handle<> h) const
207 {
208 131x pool_->impl_->post(h);
209 131x }
210
211 } // capy
212 } // boost
213