src/ex/detail/strand_service.cpp

97.8% Lines (89/91) 95.5% Functions (21/22)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #include "src/ex/detail/strand_queue.hpp"
11 #include <boost/capy/ex/detail/strand_service.hpp>
12 #include <atomic>
13 #include <coroutine>
14 #include <mutex>
15 #include <thread>
16 #include <utility>
17
18 namespace boost {
19 namespace capy {
20 namespace detail {
21
22 //----------------------------------------------------------
23
24 /** Implementation state for a strand.
25
26 Each strand_impl provides serialization for coroutines
27 dispatched through strands that share it.
28 */
29 struct strand_impl
30 {
31 std::mutex mutex_;
32 strand_queue pending_;
33 bool locked_ = false;
34 std::atomic<std::thread::id> dispatch_thread_{};
35 void* cached_frame_ = nullptr;
36 };
37
38 //----------------------------------------------------------
39
40 /** Invoker coroutine for strand dispatch.
41
42 Uses custom allocator to recycle frame - one allocation
43 per strand_impl lifetime, stored in trailer for recovery.
44 */
45 struct strand_invoker
46 {
47 struct promise_type
48 {
49 13x void* operator new(std::size_t n, strand_impl& impl)
50 {
51 13x constexpr auto A = alignof(strand_impl*);
52 13x std::size_t padded = (n + A - 1) & ~(A - 1);
53 13x std::size_t total = padded + sizeof(strand_impl*);
54
55 13x void* p = impl.cached_frame_
56 13x ? std::exchange(impl.cached_frame_, nullptr)
57 9x : ::operator new(total);
58
59 // Trailer lets delete recover impl
60 13x *reinterpret_cast<strand_impl**>(
61 13x static_cast<char*>(p) + padded) = &impl;
62 13x return p;
63 }
64
65 13x void operator delete(void* p, std::size_t n) noexcept
66 {
67 13x constexpr auto A = alignof(strand_impl*);
68 13x std::size_t padded = (n + A - 1) & ~(A - 1);
69
70 13x auto* impl = *reinterpret_cast<strand_impl**>(
71 static_cast<char*>(p) + padded);
72
73 13x if (!impl->cached_frame_)
74 13x impl->cached_frame_ = p;
75 else
76 ::operator delete(p);
77 13x }
78
79 13x strand_invoker get_return_object() noexcept
80 13x { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
81
82 13x std::suspend_always initial_suspend() noexcept { return {}; }
83 13x std::suspend_never final_suspend() noexcept { return {}; }
84 13x void return_void() noexcept {}
85 void unhandled_exception() { std::terminate(); }
86 };
87
88 std::coroutine_handle<promise_type> h_;
89 };
90
91 //----------------------------------------------------------
92
93 /** Concrete implementation of strand_service.
94
95 Holds the fixed pool of strand_impl objects.
96 */
97 class strand_service_impl : public strand_service
98 {
99 static constexpr std::size_t num_impls = 211;
100
101 strand_impl impls_[num_impls];
102 std::size_t salt_ = 0;
103 std::mutex mutex_;
104
105 public:
106 explicit
107 21x strand_service_impl(execution_context&)
108 4452x {
109 21x }
110
111 strand_impl*
112 25x get_implementation() override
113 {
114 25x std::lock_guard<std::mutex> lock(mutex_);
115 25x std::size_t index = salt_++;
116 25x index = index % num_impls;
117 25x return &impls_[index];
118 25x }
119
120 protected:
121 void
122 21x shutdown() override
123 {
124 4452x for(std::size_t i = 0; i < num_impls; ++i)
125 {
126 4431x std::lock_guard<std::mutex> lock(impls_[i].mutex_);
127 4431x impls_[i].locked_ = true;
128
129 4431x if(impls_[i].cached_frame_)
130 {
131 9x ::operator delete(impls_[i].cached_frame_);
132 9x impls_[i].cached_frame_ = nullptr;
133 }
134 4431x }
135 21x }
136
137 private:
138 static bool
139 328x enqueue(strand_impl& impl, std::coroutine_handle<> h)
140 {
141 328x std::lock_guard<std::mutex> lock(impl.mutex_);
142 328x impl.pending_.push(h);
143 328x if(!impl.locked_)
144 {
145 13x impl.locked_ = true;
146 13x return true;
147 }
148 315x return false;
149 328x }
150
151 static void
152 18x dispatch_pending(strand_impl& impl)
153 {
154 18x strand_queue::taken_batch batch;
155 {
156 18x std::lock_guard<std::mutex> lock(impl.mutex_);
157 18x batch = impl.pending_.take_all();
158 18x }
159 18x impl.pending_.dispatch_batch(batch);
160 18x }
161
162 static bool
163 18x try_unlock(strand_impl& impl)
164 {
165 18x std::lock_guard<std::mutex> lock(impl.mutex_);
166 18x if(impl.pending_.empty())
167 {
168 13x impl.locked_ = false;
169 13x return true;
170 }
171 5x return false;
172 18x }
173
174 static void
175 18x set_dispatch_thread(strand_impl& impl) noexcept
176 {
177 18x impl.dispatch_thread_.store(std::this_thread::get_id());
178 18x }
179
180 static void
181 13x clear_dispatch_thread(strand_impl& impl) noexcept
182 {
183 13x impl.dispatch_thread_.store(std::thread::id{});
184 13x }
185
186 // Loops until queue empty (aggressive). Alternative: per-batch fairness
187 // (repost after each batch to let other work run) - explore if starvation observed.
188 static strand_invoker
189 13x make_invoker(strand_impl& impl)
190 {
191 strand_impl* p = &impl;
192 for(;;)
193 {
194 set_dispatch_thread(*p);
195 dispatch_pending(*p);
196 if(try_unlock(*p))
197 {
198 clear_dispatch_thread(*p);
199 co_return;
200 }
201 }
202 26x }
203
204 friend class strand_service;
205 };
206
207 //----------------------------------------------------------
208
209 21x strand_service::
210 21x strand_service()
211 21x : service()
212 {
213 21x }
214
215 21x strand_service::
216 ~strand_service() = default;
217
218 bool
219 6x strand_service::
220 running_in_this_thread(strand_impl& impl) noexcept
221 {
222 6x return impl.dispatch_thread_.load() == std::this_thread::get_id();
223 }
224
225 std::coroutine_handle<>
226 5x strand_service::
227 dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
228 {
229 5x if(running_in_this_thread(impl))
230 2x return h;
231
232 3x if(strand_service_impl::enqueue(impl, h))
233 3x ex.post(strand_service_impl::make_invoker(impl).h_);
234 3x return std::noop_coroutine();
235 }
236
237 void
238 325x strand_service::
239 post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
240 {
241 325x if(strand_service_impl::enqueue(impl, h))
242 10x ex.post(strand_service_impl::make_invoker(impl).h_);
243 325x }
244
245 strand_service&
246 25x get_strand_service(execution_context& ctx)
247 {
248 25x return ctx.use_service<strand_service_impl>();
249 }
250
251 } // namespace detail
252 } // namespace capy
253 } // namespace boost
254