include/boost/capy/ex/async_mutex.hpp

98.9% Lines (93/94) 100.0% Functions (20/20)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 #define BOOST_CAPY_ASYNC_MUTEX_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/intrusive.hpp>
15 #include <boost/capy/concept/executor.hpp>
16 #include <boost/capy/error.hpp>
17 #include <boost/capy/ex/io_env.hpp>
18 #include <boost/capy/io_result.hpp>
19
20 #include <stop_token>
21
22 #include <atomic>
23 #include <coroutine>
24 #include <new>
25 #include <utility>
26
27 /* async_mutex implementation notes
28 ================================
29
30 Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
31 inherits intrusive_list<lock_awaiter>::node; the list is owned by
32 async_mutex::waiters_.
33
34 Cancellation via stop_token
35 ---------------------------
36 A std::stop_callback is registered in await_suspend. Two actors can
37 race to resume the suspended coroutine: unlock() and the stop callback.
38 An atomic bool `claimed_` resolves the race -- whoever does
39 claimed_.exchange(true) and reads false wins. The loser does nothing.
40
41 The stop callback calls ex_.post(h_). The stop_callback is
42 destroyed later in await_resume. cancel_fn touches no members
43 after post returns (same pattern as delete-this).
44
45 unlock() pops waiters from the front. If the popped waiter was
46 already claimed by the stop callback, unlock() skips it and tries
47 the next. await_resume removes the (still-linked) canceled waiter
48 via waiters_.remove(this).
49
50 The stop_callback lives in a union to suppress automatic
51 construction/destruction. Placement new in await_suspend, explicit
52 destructor call in await_resume and ~lock_awaiter.
53
54 Member ordering constraint
55 --------------------------
56 The union containing stop_cb_ must be declared AFTER the members
57 the callback accesses (h_, ex_, claimed_, canceled_). If the
58 stop_cb_ destructor blocks waiting for a concurrent callback, those
59 members must still be alive (C++ destroys in reverse declaration
60 order).
61
62 active_ flag
63 ------------
64 Tracks both list membership and stop_cb_ lifetime (they are always
65 set and cleared together). Used by the destructor to clean up if the
66 coroutine is destroyed while suspended (e.g. execution_context
67 shutdown).
68
69 Cancellation scope
70 ------------------
71 Cancellation only takes effect while the coroutine is suspended in
72 the wait queue. If the mutex is unlocked, await_ready acquires it
73 immediately without checking the stop token. This is intentional:
74 the fast path has no token access and no overhead.
75
76 Threading assumptions
77 ---------------------
78 - All list mutations happen on the executor thread (await_suspend,
79 await_resume, unlock, ~lock_awaiter).
80 - The stop callback may fire from any thread, but only touches
81 claimed_ (atomic) and then calls post. It never touches the
82 list.
83 - ~lock_awaiter must be called from the executor thread. This is
84 guaranteed during normal shutdown but NOT if the coroutine frame
85 is destroyed from another thread while a stop callback could
86 fire (precondition violation, same as cppcoro/folly).
87 */
88
89 namespace boost {
90 namespace capy {
91
92 /** An asynchronous mutex for coroutines.
93
94 This mutex provides mutual exclusion for coroutines without blocking.
95 When a coroutine attempts to acquire a locked mutex, it suspends and
96 is added to an intrusive wait queue. When the holder unlocks, the next
97 waiter is resumed with the lock held.
98
99 @par Cancellation
100
101 When a coroutine is suspended waiting for the mutex and its stop
102 token is triggered, the waiter completes with `error::canceled`
103 instead of acquiring the lock.
104
105 Cancellation only applies while the coroutine is suspended in the
106 wait queue. If the mutex is unlocked when `lock()` is called, the
107 lock is acquired immediately even if the stop token is already
108 signaled.
109
110 @par Zero Allocation
111
112 No heap allocation occurs for lock operations.
113
114 @par Thread Safety
115
116 The mutex operations are designed for single-threaded use on one
117 executor. The stop callback may fire from any thread.
118
119 @par Example
120 @code
121 async_mutex cm;
122
123 task<> protected_operation() {
124 auto [ec] = co_await cm.lock();
125 if(ec)
126 co_return;
127 // ... critical section ...
128 cm.unlock();
129 }
130
131 // Or with RAII:
132 task<> protected_operation() {
133 auto [ec, guard] = co_await cm.scoped_lock();
134 if(ec)
135 co_return;
136 // ... critical section ...
137 // unlocks automatically
138 }
139 @endcode
140 */
141 class async_mutex
142 {
143 public:
144 class lock_awaiter;
145 class lock_guard;
146 class lock_guard_awaiter;
147
148 private:
149 bool locked_ = false;
150 detail::intrusive_list<lock_awaiter> waiters_;
151
152 public:
153 /** Awaiter returned by lock().
154 */
155 class lock_awaiter
156 : public detail::intrusive_list<lock_awaiter>::node
157 {
158 friend class async_mutex;
159
160 async_mutex* m_;
161 std::coroutine_handle<> h_;
162 executor_ref ex_;
163
164 // These members must be declared before stop_cb_
165 // (see comment on the union below).
166 std::atomic<bool> claimed_{false};
167 bool canceled_ = false;
168 bool active_ = false;
169
170 struct cancel_fn
171 {
172 lock_awaiter* self_;
173
174 6x void operator()() const noexcept
175 {
176 6x if(!self_->claimed_.exchange(
177 true, std::memory_order_acq_rel))
178 {
179 6x self_->canceled_ = true;
180 6x self_->ex_.post(self_->h_);
181 }
182 6x }
183 };
184
185 using stop_cb_t =
186 std::stop_callback<cancel_fn>;
187
188 // Aligned storage for stop_cb_t. Declared last:
189 // its destructor may block while the callback
190 // accesses the members above.
191 #ifdef _MSC_VER
192 # pragma warning(push)
193 # pragma warning(disable: 4324) // padded due to alignas
194 #endif
195 alignas(stop_cb_t)
196 unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
197 #ifdef _MSC_VER
198 # pragma warning(pop)
199 #endif
200
201 17x stop_cb_t& stop_cb_() noexcept
202 {
203 return *reinterpret_cast<stop_cb_t*>(
204 17x stop_cb_buf_);
205 }
206
207 public:
208 70x ~lock_awaiter()
209 {
210 70x if(active_)
211 {
212 3x stop_cb_().~stop_cb_t();
213 3x m_->waiters_.remove(this);
214 }
215 70x }
216
217 35x explicit lock_awaiter(async_mutex* m) noexcept
218 35x : m_(m)
219 {
220 35x }
221
222 35x lock_awaiter(lock_awaiter&& o) noexcept
223 35x : m_(o.m_)
224 35x , h_(o.h_)
225 35x , ex_(o.ex_)
226 35x , claimed_(o.claimed_.load(
227 std::memory_order_relaxed))
228 35x , canceled_(o.canceled_)
229 35x , active_(std::exchange(o.active_, false))
230 {
231 35x }
232
233 lock_awaiter(lock_awaiter const&) = delete;
234 lock_awaiter& operator=(lock_awaiter const&) = delete;
235 lock_awaiter& operator=(lock_awaiter&&) = delete;
236
237 35x bool await_ready() const noexcept
238 {
239 35x if(!m_->locked_)
240 {
241 16x m_->locked_ = true;
242 16x return true;
243 }
244 19x return false;
245 }
246
247 /** IoAwaitable protocol overload. */
248 std::coroutine_handle<>
249 19x await_suspend(
250 std::coroutine_handle<> h,
251 io_env const* env) noexcept
252 {
253 19x if(env->stop_token.stop_requested())
254 {
255 2x canceled_ = true;
256 2x return h;
257 }
258 17x h_ = h;
259 17x ex_ = env->executor;
260 17x m_->waiters_.push_back(this);
261 51x ::new(stop_cb_buf_) stop_cb_t(
262 17x env->stop_token, cancel_fn{this});
263 17x active_ = true;
264 17x return std::noop_coroutine();
265 }
266
267 32x io_result<> await_resume() noexcept
268 {
269 32x if(active_)
270 {
271 14x stop_cb_().~stop_cb_t();
272 14x if(canceled_)
273 {
274 6x m_->waiters_.remove(this);
275 6x active_ = false;
276 6x return {make_error_code(
277 6x error::canceled)};
278 }
279 8x active_ = false;
280 }
281 26x if(canceled_)
282 2x return {make_error_code(
283 2x error::canceled)};
284 24x return {{}};
285 }
286 };
287
288 /** RAII lock guard for async_mutex.
289
290 Automatically unlocks the mutex when destroyed.
291 */
292 class [[nodiscard]] lock_guard
293 {
294 async_mutex* m_;
295
296 public:
297 5x ~lock_guard()
298 {
299 5x if(m_)
300 2x m_->unlock();
301 5x }
302
303 2x lock_guard() noexcept
304 2x : m_(nullptr)
305 {
306 2x }
307
308 2x explicit lock_guard(async_mutex* m) noexcept
309 2x : m_(m)
310 {
311 2x }
312
313 1x lock_guard(lock_guard&& o) noexcept
314 1x : m_(std::exchange(o.m_, nullptr))
315 {
316 1x }
317
318 lock_guard& operator=(lock_guard&& o) noexcept
319 {
320 if(this != &o)
321 {
322 if(m_)
323 m_->unlock();
324 m_ = std::exchange(o.m_, nullptr);
325 }
326 return *this;
327 }
328
329 lock_guard(lock_guard const&) = delete;
330 lock_guard& operator=(lock_guard const&) = delete;
331 };
332
333 /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
334 */
335 class lock_guard_awaiter
336 {
337 async_mutex* m_;
338 lock_awaiter inner_;
339
340 public:
341 4x explicit lock_guard_awaiter(async_mutex* m) noexcept
342 4x : m_(m)
343 4x , inner_(m)
344 {
345 4x }
346
347 4x bool await_ready() const noexcept
348 {
349 4x return inner_.await_ready();
350 }
351
352 /** IoAwaitable protocol overload. */
353 std::coroutine_handle<>
354 2x await_suspend(
355 std::coroutine_handle<> h,
356 io_env const* env) noexcept
357 {
358 2x return inner_.await_suspend(h, env);
359 }
360
361 4x io_result<lock_guard> await_resume() noexcept
362 {
363 4x auto r = inner_.await_resume();
364 4x if(r.ec)
365 2x return {r.ec, {}};
366 2x return {{}, lock_guard(m_)};
367 4x }
368 };
369
370 async_mutex() = default;
371
372 // Non-copyable, non-movable
373 async_mutex(async_mutex const&) = delete;
374 async_mutex& operator=(async_mutex const&) = delete;
375
376 /** Returns an awaiter that acquires the mutex.
377
378 @return An awaitable yielding `(error_code)`.
379 */
380 31x lock_awaiter lock() noexcept
381 {
382 31x return lock_awaiter{this};
383 }
384
385 /** Returns an awaiter that acquires the mutex with RAII.
386
387 @return An awaitable yielding `(error_code,lock_guard)`.
388 */
389 4x lock_guard_awaiter scoped_lock() noexcept
390 {
391 4x return lock_guard_awaiter(this);
392 }
393
394 /** Releases the mutex.
395
396 If waiters are queued, the next eligible waiter is
397 resumed with the lock held. Canceled waiters are
398 skipped. If no eligible waiter remains, the mutex
399 becomes unlocked.
400 */
401 24x void unlock() noexcept
402 {
403 for(;;)
404 {
405 24x auto* waiter = waiters_.pop_front();
406 24x if(!waiter)
407 {
408 16x locked_ = false;
409 16x return;
410 }
411 8x if(!waiter->claimed_.exchange(
412 true, std::memory_order_acq_rel))
413 {
414 8x waiter->ex_.post(waiter->h_);
415 8x return;
416 }
417 }
418 }
419
420 /** Returns true if the mutex is currently locked.
421 */
422 26x bool is_locked() const noexcept
423 {
424 26x return locked_;
425 }
426 };
427
428 } // namespace capy
429 } // namespace boost
430
431 #endif
432