TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 : #define BOOST_CAPY_WHEN_ANY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/concept/executor.hpp>
15 : #include <boost/capy/concept/io_awaitable.hpp>
16 : #include <coroutine>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/ex/frame_allocator.hpp>
19 : #include <boost/capy/ex/io_env.hpp>
20 : #include <boost/capy/task.hpp>
21 :
22 : #include <array>
23 : #include <atomic>
24 : #include <exception>
25 : #include <optional>
26 : #include <ranges>
27 : #include <stdexcept>
28 : #include <stop_token>
29 : #include <tuple>
30 : #include <type_traits>
31 : #include <utility>
32 : #include <variant>
33 : #include <vector>
34 :
35 : /*
36 : when_any - Race multiple tasks, return first completion
37 : ========================================================
38 :
39 : OVERVIEW:
40 : ---------
41 : when_any launches N tasks concurrently and completes when the FIRST task
42 : finishes (success or failure). It then requests stop for all siblings and
43 : waits for them to acknowledge before returning.
44 :
45 : ARCHITECTURE:
46 : -------------
47 : The design mirrors when_all but with inverted completion semantics:
48 :
49 : when_all: complete when remaining_count reaches 0 (all done)
50 : when_any: complete when has_winner becomes true (first done)
51 : BUT still wait for remaining_count to reach 0 for cleanup
52 :
53 : Key components:
54 : - when_any_state: Shared state tracking winner and completion
55 : - when_any_runner: Wrapper coroutine for each child task
56 : - when_any_launcher: Awaitable that starts all runners concurrently
57 :
58 : CRITICAL INVARIANTS:
59 : --------------------
60 : 1. Exactly one task becomes the winner (via atomic compare_exchange)
61 : 2. All tasks must complete before parent resumes (cleanup safety)
62 : 3. Stop is requested immediately when winner is determined
63 : 4. Only the winner's result/exception is stored
64 :
65 : POSITIONAL VARIANT:
66 : -------------------
67 : The variadic overload returns a std::variant with one alternative per
68 : input task, preserving positional correspondence. Use .index() on
69 : the variant to identify which task won.
70 :
71 : Example: when_any(task<int>, task<string>, task<int>)
72 : - Raw types after void->monostate: int, string, int
73 : - Result variant: std::variant<int, string, int>
74 : - variant.index() tells you which task won (0, 1, or 2)
75 :
76 : VOID HANDLING:
77 : --------------
78 : void tasks contribute std::monostate to the variant.
79 : All-void tasks result in: variant<monostate, monostate, monostate>
80 :
81 : MEMORY MODEL:
82 : -------------
83 : Synchronization chain from winner's write to parent's read:
84 :
85 : 1. Winner thread writes result_/winner_exception_ (non-atomic)
86 : 2. Winner's runner reaches final_suspend → fetch_sub(acq_rel) on remaining_count_
87 : 3. Last runner (may be winner or non-winner) reaches final_suspend
88 : → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
89 : 4. Last runner returns caller_env_->executor.dispatch(continuation_) via symmetric transfer
90 : 5. Parent coroutine resumes and reads result_/winner_exception_
91 :
92 : Synchronization analysis:
93 : - All fetch_sub operations on remaining_count_ form a release sequence
94 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
95 : in the modification order of remaining_count_
96 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
97 : modification order, establishing happens-before from winner's writes
98 : - Executor dispatch() is expected to provide queue-based synchronization
99 : (release-on-post, acquire-on-execute) completing the chain to parent
100 : - Even inline executors work (same thread = sequenced-before)
101 :
102 : Alternative considered: Adding winner_ready_ atomic (set with release after
103 : storing winner data, acquired before reading) would make synchronization
104 : self-contained and not rely on executor implementation details. Current
105 : approach is correct but requires careful reasoning about release sequences
106 : and executor behavior.
107 :
108 : EXCEPTION SEMANTICS:
109 : --------------------
110 : Unlike when_all (which captures first exception, discards others), when_any
111 : treats exceptions as valid completions. If the winning task threw, that
112 : exception is rethrown. Exceptions from non-winners are silently discarded.
113 : */
114 :
115 : namespace boost {
116 : namespace capy {
117 :
/** Map `void` onto `std::monostate`; every other type is left as-is.

    A `std::variant` cannot hold `void`, so a void-returning task is
    represented by `std::monostate` in the result variant.

    @tparam T The type to map.
*/
template<typename T>
using void_to_monostate_t =
    typename std::conditional<std::is_void<T>::value, std::monostate, T>::type;
128 :
129 : namespace detail {
130 :
131 : /** Core shared state for when_any operations.
132 :
133 : Contains all members and methods common to both heterogeneous (variadic)
134 : and homogeneous (range) when_any implementations. State classes embed
135 : this via composition to avoid CRTP destructor ordering issues.
136 :
137 : @par Thread Safety
138 : Atomic operations protect winner selection and completion count.
139 : */
140 : struct when_any_core
141 : {
142 : std::atomic<std::size_t> remaining_count_;
143 : std::size_t winner_index_{0};
144 : std::exception_ptr winner_exception_;
145 : std::stop_source stop_source_;
146 :
147 : // Bridges parent's stop token to our stop_source
148 : struct stop_callback_fn
149 : {
150 : std::stop_source* source_;
151 HIT 9 : void operator()() const noexcept { source_->request_stop(); }
152 : };
153 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
154 : std::optional<stop_callback_t> parent_stop_callback_;
155 :
156 : std::coroutine_handle<> continuation_;
157 : io_env const* caller_env_ = nullptr;
158 :
159 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
160 : std::atomic<bool> has_winner_{false};
161 :
162 65 : explicit when_any_core(std::size_t count) noexcept
163 65 : : remaining_count_(count)
164 : {
165 65 : }
166 :
167 : /** Atomically claim winner status; exactly one task succeeds. */
168 190 : bool try_win(std::size_t index) noexcept
169 : {
170 190 : bool expected = false;
171 190 : if(has_winner_.compare_exchange_strong(
172 : expected, true, std::memory_order_acq_rel))
173 : {
174 65 : winner_index_ = index;
175 65 : stop_source_.request_stop();
176 65 : return true;
177 : }
178 125 : return false;
179 : }
180 :
181 : /** @pre try_win() returned true. */
182 8 : void set_winner_exception(std::exception_ptr ep) noexcept
183 : {
184 8 : winner_exception_ = ep;
185 8 : }
186 :
187 : // Runners signal completion directly via final_suspend; no member function needed.
188 : };
189 :
190 : /** Shared state for heterogeneous when_any operation.
191 :
192 : Coordinates winner selection, result storage, and completion tracking
193 : for all child tasks in a when_any operation. Uses composition with
194 : when_any_core for shared functionality.
195 :
196 : @par Lifetime
197 : Allocated on the parent coroutine's frame, outlives all runners.
198 :
199 : @tparam Ts Task result types.
200 : */
201 : template<typename... Ts>
202 : struct when_any_state
203 : {
204 : static constexpr std::size_t task_count = sizeof...(Ts);
205 : using variant_type = std::variant<void_to_monostate_t<Ts>...>;
206 :
207 : when_any_core core_;
208 : std::optional<variant_type> result_;
209 : std::array<std::coroutine_handle<>, task_count> runner_handles_{};
210 :
211 43 : when_any_state()
212 43 : : core_(task_count)
213 : {
214 43 : }
215 :
216 : // Runners self-destruct in final_suspend. No destruction needed here.
217 :
218 : /** @pre core_.try_win() returned true.
219 : @note Uses in_place_index (not type) for positional variant access.
220 : */
221 : template<std::size_t I, typename T>
222 35 : void set_winner_result(T value)
223 : noexcept(std::is_nothrow_move_constructible_v<T>)
224 : {
225 35 : result_.emplace(std::in_place_index<I>, std::move(value));
226 35 : }
227 :
228 : /** @pre core_.try_win() returned true. */
229 : template<std::size_t I>
230 3 : void set_winner_void() noexcept
231 : {
232 3 : result_.emplace(std::in_place_index<I>, std::monostate{});
233 3 : }
234 : };
235 :
236 : /** Wrapper coroutine that runs a single child task for when_any.
237 :
238 : Propagates executor/stop_token to the child, attempts to claim winner
239 : status on completion, and signals completion for cleanup coordination.
240 :
241 : @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
242 : */
243 : template<typename StateType>
244 : struct when_any_runner
245 : {
246 : struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
247 : {
248 : StateType* state_ = nullptr;
249 : std::size_t index_ = 0;
250 : io_env env_;
251 :
252 190 : when_any_runner get_return_object() noexcept
253 : {
254 190 : return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
255 : }
256 :
257 : // Starts suspended; launcher sets up state/ex/token then resumes
258 190 : std::suspend_always initial_suspend() noexcept
259 : {
260 190 : return {};
261 : }
262 :
263 190 : auto final_suspend() noexcept
264 : {
265 : struct awaiter
266 : {
267 : promise_type* p_;
268 190 : bool await_ready() const noexcept { return false; }
269 190 : auto await_suspend(std::coroutine_handle<> h) noexcept
270 : {
271 : // Extract everything needed before self-destruction.
272 190 : auto& core = p_->state_->core_;
273 190 : auto* counter = &core.remaining_count_;
274 190 : auto* caller_env = core.caller_env_;
275 190 : auto cont = core.continuation_;
276 :
277 190 : h.destroy();
278 :
279 : // If last runner, dispatch parent for symmetric transfer.
280 190 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
281 190 : if(remaining == 1)
282 65 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
283 125 : return detail::symmetric_transfer(std::noop_coroutine());
284 : }
285 MIS 0 : void await_resume() const noexcept {}
286 : };
287 HIT 190 : return awaiter{this};
288 : }
289 :
290 178 : void return_void() noexcept {}
291 :
292 : // Exceptions are valid completions in when_any (unlike when_all)
293 12 : void unhandled_exception()
294 : {
295 12 : if(state_->core_.try_win(index_))
296 8 : state_->core_.set_winner_exception(std::current_exception());
297 12 : }
298 :
299 : /** Injects executor and stop token into child awaitables. */
300 : template<class Awaitable>
301 : struct transform_awaiter
302 : {
303 : std::decay_t<Awaitable> a_;
304 : promise_type* p_;
305 :
306 190 : bool await_ready() { return a_.await_ready(); }
307 190 : auto await_resume() { return a_.await_resume(); }
308 :
309 : template<class Promise>
310 185 : auto await_suspend(std::coroutine_handle<Promise> h)
311 : {
312 : using R = decltype(a_.await_suspend(h, &p_->env_));
313 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
314 185 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
315 : else
316 : return a_.await_suspend(h, &p_->env_);
317 : }
318 : };
319 :
320 : template<class Awaitable>
321 190 : auto await_transform(Awaitable&& a)
322 : {
323 : using A = std::decay_t<Awaitable>;
324 : if constexpr (IoAwaitable<A>)
325 : {
326 : return transform_awaiter<Awaitable>{
327 380 : std::forward<Awaitable>(a), this};
328 : }
329 : else
330 : {
331 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
332 : }
333 190 : }
334 : };
335 :
336 : std::coroutine_handle<promise_type> h_;
337 :
338 190 : explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
339 190 : : h_(h)
340 : {
341 190 : }
342 :
343 : // Enable move for all clang versions - some versions need it
344 : when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
345 :
346 : // Non-copyable
347 : when_any_runner(when_any_runner const&) = delete;
348 : when_any_runner& operator=(when_any_runner const&) = delete;
349 : when_any_runner& operator=(when_any_runner&&) = delete;
350 :
351 190 : auto release() noexcept
352 : {
353 190 : return std::exchange(h_, nullptr);
354 : }
355 : };
356 :
357 : /** Indexed overload for heterogeneous when_any (compile-time index).
358 :
359 : Uses compile-time index I for variant construction via in_place_index.
360 : Called from when_any_launcher::launch_one<I>().
361 : */
362 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
363 : when_any_runner<StateType>
364 105 : make_when_any_runner(Awaitable inner, StateType* state)
365 : {
366 : using T = awaitable_result_t<Awaitable>;
367 : if constexpr (std::is_void_v<T>)
368 : {
369 : co_await std::move(inner);
370 : if(state->core_.try_win(I))
371 : state->template set_winner_void<I>();
372 : }
373 : else
374 : {
375 : auto result = co_await std::move(inner);
376 : if(state->core_.try_win(I))
377 : {
378 : try
379 : {
380 : state->template set_winner_result<I>(std::move(result));
381 : }
382 : catch(...)
383 : {
384 : state->core_.set_winner_exception(std::current_exception());
385 : }
386 : }
387 : }
388 210 : }
389 :
390 : /** Runtime-index overload for homogeneous when_any (range path).
391 :
392 : Uses requires-expressions to detect state capabilities:
393 : - set_winner_void(): for heterogeneous void tasks (stores monostate)
394 : - set_winner_result(): for non-void tasks
395 : - Neither: for homogeneous void tasks (no result storage)
396 : */
397 : template<IoAwaitable Awaitable, typename StateType>
398 : when_any_runner<StateType>
399 85 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
400 : {
401 : using T = awaitable_result_t<Awaitable>;
402 : if constexpr (std::is_void_v<T>)
403 : {
404 : co_await std::move(inner);
405 : if(state->core_.try_win(index))
406 : {
407 : if constexpr (requires { state->set_winner_void(); })
408 : state->set_winner_void();
409 : }
410 : }
411 : else
412 : {
413 : auto result = co_await std::move(inner);
414 : if(state->core_.try_win(index))
415 : {
416 : try
417 : {
418 : state->set_winner_result(std::move(result));
419 : }
420 : catch(...)
421 : {
422 : state->core_.set_winner_exception(std::current_exception());
423 : }
424 : }
425 : }
426 170 : }
427 :
428 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
429 : template<IoAwaitable... Awaitables>
430 : class when_any_launcher
431 : {
432 : using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
433 :
434 : std::tuple<Awaitables...>* tasks_;
435 : state_type* state_;
436 :
437 : public:
438 43 : when_any_launcher(
439 : std::tuple<Awaitables...>* tasks,
440 : state_type* state)
441 43 : : tasks_(tasks)
442 43 : , state_(state)
443 : {
444 43 : }
445 :
446 43 : bool await_ready() const noexcept
447 : {
448 43 : return sizeof...(Awaitables) == 0;
449 : }
450 :
451 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
452 : destroys this object before await_suspend returns. Must not reference
453 : `this` after the final launch_one call.
454 : */
455 43 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
456 : {
457 43 : state_->core_.continuation_ = continuation;
458 43 : state_->core_.caller_env_ = caller_env;
459 :
460 43 : if(caller_env->stop_token.stop_possible())
461 : {
462 18 : state_->core_.parent_stop_callback_.emplace(
463 9 : caller_env->stop_token,
464 9 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
465 :
466 9 : if(caller_env->stop_token.stop_requested())
467 3 : state_->core_.stop_source_.request_stop();
468 : }
469 :
470 43 : auto token = state_->core_.stop_source_.get_token();
471 86 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
472 43 : (..., launch_one<Is>(caller_env->executor, token));
473 43 : }(std::index_sequence_for<Awaitables...>{});
474 :
475 86 : return std::noop_coroutine();
476 43 : }
477 :
478 43 : void await_resume() const noexcept
479 : {
480 43 : }
481 :
482 : private:
483 : /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
484 : template<std::size_t I>
485 105 : void launch_one(executor_ref caller_ex, std::stop_token token)
486 : {
487 105 : auto runner = make_when_any_runner<I>(
488 105 : std::move(std::get<I>(*tasks_)), state_);
489 :
490 105 : auto h = runner.release();
491 105 : h.promise().state_ = state_;
492 105 : h.promise().index_ = I;
493 105 : h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
494 :
495 105 : std::coroutine_handle<> ch{h};
496 105 : state_->runner_handles_[I] = ch;
497 105 : caller_ex.post(ch);
498 210 : }
499 : };
500 :
501 : } // namespace detail
502 :
503 : /** Wait for the first awaitable to complete.
504 :
505 : Races multiple heterogeneous awaitables concurrently and returns when the
506 : first one completes. The result is a variant with one alternative per
507 : input task, preserving positional correspondence.
508 :
509 : @par Suspends
510 : The calling coroutine suspends when co_await is invoked. All awaitables
511 : are launched concurrently and execute in parallel. The coroutine resumes
512 : only after all awaitables have completed, even though the winner is
513 : determined by the first to finish.
514 :
515 : @par Completion Conditions
516 : @li Winner is determined when the first awaitable completes (success or exception)
517 : @li Only one task can claim winner status via atomic compare-exchange
518 : @li Once a winner exists, stop is requested for all remaining siblings
519 : @li Parent coroutine resumes only after all siblings acknowledge completion
520 : @li The winner's result is returned; if the winner threw, the exception is rethrown
521 :
522 : @par Cancellation Semantics
523 : Cancellation is supported via stop_token propagated through the
524 : IoAwaitable protocol:
525 : @li Each child awaitable receives a stop_token derived from a shared stop_source
526 : @li When the parent's stop token is activated, the stop is forwarded to all children
527 : @li When a winner is determined, stop_source_.request_stop() is called immediately
528 : @li Siblings must handle cancellation gracefully and complete before parent resumes
529 : @li Stop requests are cooperative; tasks must check and respond to them
530 :
531 : @par Concurrency/Overlap
532 : All awaitables are launched concurrently before any can complete.
533 : The launcher iterates through the arguments, starting each task on the
534 : caller's executor. Tasks may execute in parallel on multi-threaded
535 : executors or interleave on single-threaded executors. There is no
536 : guaranteed ordering of task completion.
537 :
538 : @par Notable Error Conditions
539 : @li Winner exception: if the winning task threw, that exception is rethrown
540 : @li Non-winner exceptions: silently discarded (only winner's result matters)
541 : @li Cancellation: tasks may complete via cancellation without throwing
542 :
543 : @par Example
544 : @code
545 : task<void> example() {
546 : auto result = co_await when_any(
547 : fetch_int(), // task<int>
548 : fetch_string() // task<std::string>
549 : );
550 : // result.index() is 0 or 1
551 : if (result.index() == 0)
552 : std::cout << "Got int: " << std::get<0>(result) << "\n";
553 : else
554 : std::cout << "Got string: " << std::get<1>(result) << "\n";
555 : }
556 : @endcode
557 :
558 : @param as Awaitables to race concurrently (at least one required; each
559 : must satisfy IoAwaitable).
560 : @return A task yielding a std::variant with one alternative per awaitable.
561 : Use .index() to identify the winner. Void awaitables contribute
562 : std::monostate.
563 :
564 : @throws Rethrows the winner's exception if the winning task threw an exception.
565 :
566 : @par Remarks
567 : Awaitables are moved into the coroutine frame; original objects become
568 : empty after the call. The variant preserves one alternative per input
569 : task. Use .index() to determine which awaitable completed first.
570 : Void awaitables contribute std::monostate to the variant.
571 :
572 : @see when_all, IoAwaitable
573 : */
574 : template<IoAwaitable... As>
575 : requires (sizeof...(As) > 0)
576 43 : [[nodiscard]] auto when_any(As... as)
577 : -> task<std::variant<void_to_monostate_t<awaitable_result_t<As>>...>>
578 : {
579 : detail::when_any_state<awaitable_result_t<As>...> state;
580 : std::tuple<As...> awaitable_tuple(std::move(as)...);
581 :
582 : co_await detail::when_any_launcher<As...>(&awaitable_tuple, &state);
583 :
584 : if(state.core_.winner_exception_)
585 : std::rethrow_exception(state.core_.winner_exception_);
586 :
587 : co_return std::move(*state.result_);
588 86 : }
589 :
590 : /** Concept for ranges of full I/O awaitables.
591 :
592 : A range satisfies `IoAwaitableRange` if it is a sized input range
593 : whose value type satisfies @ref IoAwaitable. This enables when_any
594 : to accept any container or view of awaitables, not just std::vector.
595 :
596 : @tparam R The range type.
597 :
598 : @par Requirements
599 : @li `R` must satisfy `std::ranges::input_range`
600 : @li `R` must satisfy `std::ranges::sized_range`
601 : @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
602 :
603 : @par Syntactic Requirements
604 : Given `r` of type `R`:
605 : @li `std::ranges::begin(r)` is valid
606 : @li `std::ranges::end(r)` is valid
607 : @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
608 : @li `std::ranges::range_value_t<R>` satisfies @ref IoAwaitable
609 :
610 : @par Example
611 : @code
612 : template<IoAwaitableRange R>
613 : task<void> race_all(R&& awaitables) {
614 : auto winner = co_await when_any(std::forward<R>(awaitables));
615 : // Process winner...
616 : }
617 : @endcode
618 :
619 : @see when_any, IoAwaitable
620 : */
621 : template<typename R>
622 : concept IoAwaitableRange =
623 : std::ranges::input_range<R> &&
624 : std::ranges::sized_range<R> &&
625 : IoAwaitable<std::ranges::range_value_t<R>>;
626 :
627 : namespace detail {
628 :
629 : /** Shared state for homogeneous when_any (range overload).
630 :
631 : Uses composition with when_any_core for shared functionality.
632 : Simpler than heterogeneous: optional<T> instead of variant, vector
633 : instead of array for runner handles.
634 : */
635 : template<typename T>
636 : struct when_any_homogeneous_state
637 : {
638 : when_any_core core_;
639 : std::optional<T> result_;
640 : std::vector<std::coroutine_handle<>> runner_handles_;
641 :
642 19 : explicit when_any_homogeneous_state(std::size_t count)
643 19 : : core_(count)
644 38 : , runner_handles_(count)
645 : {
646 19 : }
647 :
648 : // Runners self-destruct in final_suspend. No destruction needed here.
649 :
650 : /** @pre core_.try_win() returned true. */
651 17 : void set_winner_result(T value)
652 : noexcept(std::is_nothrow_move_constructible_v<T>)
653 : {
654 17 : result_.emplace(std::move(value));
655 17 : }
656 : };
657 :
658 : /** Specialization for void tasks (no result storage needed). */
659 : template<>
660 : struct when_any_homogeneous_state<void>
661 : {
662 : when_any_core core_;
663 : std::vector<std::coroutine_handle<>> runner_handles_;
664 :
665 3 : explicit when_any_homogeneous_state(std::size_t count)
666 3 : : core_(count)
667 6 : , runner_handles_(count)
668 : {
669 3 : }
670 :
671 : // Runners self-destruct in final_suspend. No destruction needed here.
672 :
673 : // No set_winner_result - void tasks have no result to store
674 : };
675 :
676 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
677 : template<IoAwaitableRange Range>
678 : class when_any_homogeneous_launcher
679 : {
680 : using Awaitable = std::ranges::range_value_t<Range>;
681 : using T = awaitable_result_t<Awaitable>;
682 :
683 : Range* range_;
684 : when_any_homogeneous_state<T>* state_;
685 :
686 : public:
687 22 : when_any_homogeneous_launcher(
688 : Range* range,
689 : when_any_homogeneous_state<T>* state)
690 22 : : range_(range)
691 22 : , state_(state)
692 : {
693 22 : }
694 :
695 22 : bool await_ready() const noexcept
696 : {
697 22 : return std::ranges::empty(*range_);
698 : }
699 :
700 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
701 : destroys this object before await_suspend returns. Must not reference
702 : `this` after dispatching begins.
703 :
704 : Two-phase approach:
705 : 1. Create all runners (safe - no dispatch yet)
706 : 2. Dispatch all runners (any may complete synchronously)
707 : */
708 22 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
709 : {
710 22 : state_->core_.continuation_ = continuation;
711 22 : state_->core_.caller_env_ = caller_env;
712 :
713 22 : if(caller_env->stop_token.stop_possible())
714 : {
715 14 : state_->core_.parent_stop_callback_.emplace(
716 7 : caller_env->stop_token,
717 7 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
718 :
719 7 : if(caller_env->stop_token.stop_requested())
720 4 : state_->core_.stop_source_.request_stop();
721 : }
722 :
723 22 : auto token = state_->core_.stop_source_.get_token();
724 :
725 : // Phase 1: Create all runners without dispatching.
726 : // This iterates over *range_ safely because no runners execute yet.
727 22 : std::size_t index = 0;
728 107 : for(auto&& a : *range_)
729 : {
730 85 : auto runner = make_when_any_runner(
731 85 : std::move(a), state_, index);
732 :
733 85 : auto h = runner.release();
734 85 : h.promise().state_ = state_;
735 85 : h.promise().index_ = index;
736 85 : h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
737 :
738 85 : state_->runner_handles_[index] = std::coroutine_handle<>{h};
739 85 : ++index;
740 : }
741 :
742 : // Phase 2: Post all runners. Any may complete synchronously.
743 : // After last post, state_ and this may be destroyed.
744 : // Use raw pointer/count captured before posting.
745 22 : std::coroutine_handle<>* handles = state_->runner_handles_.data();
746 22 : std::size_t count = state_->runner_handles_.size();
747 107 : for(std::size_t i = 0; i < count; ++i)
748 85 : caller_env->executor.post(handles[i]);
749 :
750 44 : return std::noop_coroutine();
751 107 : }
752 :
753 22 : void await_resume() const noexcept
754 : {
755 22 : }
756 : };
757 :
758 : } // namespace detail
759 :
760 : /** Wait for the first awaitable to complete (range overload).
761 :
762 : Races a range of awaitables with the same result type. Accepts any
763 : sized input range of IoAwaitable types, enabling use with arrays,
764 : spans, or custom containers.
765 :
766 : @par Suspends
767 : The calling coroutine suspends when co_await is invoked. All awaitables
768 : in the range are launched concurrently and execute in parallel. The
769 : coroutine resumes only after all awaitables have completed, even though
770 : the winner is determined by the first to finish.
771 :
772 : @par Completion Conditions
773 : @li Winner is determined when the first awaitable completes (success or exception)
774 : @li Only one task can claim winner status via atomic compare-exchange
775 : @li Once a winner exists, stop is requested for all remaining siblings
776 : @li Parent coroutine resumes only after all siblings acknowledge completion
777 : @li The winner's index and result are returned; if the winner threw, the exception is rethrown
778 :
779 : @par Cancellation Semantics
780 : Cancellation is supported via stop_token propagated through the
781 : IoAwaitable protocol:
782 : @li Each child awaitable receives a stop_token derived from a shared stop_source
783 : @li When the parent's stop token is activated, the stop is forwarded to all children
784 : @li When a winner is determined, stop_source_.request_stop() is called immediately
785 : @li Siblings must handle cancellation gracefully and complete before parent resumes
786 : @li Stop requests are cooperative; tasks must check and respond to them
787 :
788 : @par Concurrency/Overlap
789 : All awaitables are launched concurrently before any can complete.
790 : The launcher iterates through the range, starting each task on the
791 : caller's executor. Tasks may execute in parallel on multi-threaded
792 : executors or interleave on single-threaded executors. There is no
793 : guaranteed ordering of task completion.
794 :
795 : @par Notable Error Conditions
796 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
797 : @li Winner exception: if the winning task threw, that exception is rethrown
798 : @li Non-winner exceptions: silently discarded (only winner's result matters)
799 : @li Cancellation: tasks may complete via cancellation without throwing
800 :
801 : @par Example
802 : @code
803 : task<void> example() {
804 : std::array<task<Response>, 3> requests = {
805 : fetch_from_server(0),
806 : fetch_from_server(1),
807 : fetch_from_server(2)
808 : };
809 :
810 : auto [index, response] = co_await when_any(std::move(requests));
811 : }
812 : @endcode
813 :
814 : @par Example with Vector
815 : @code
816 : task<Response> fetch_fastest(std::vector<Server> const& servers) {
817 : std::vector<task<Response>> requests;
818 : for (auto const& server : servers)
819 : requests.push_back(fetch_from(server));
820 :
821 : auto [index, response] = co_await when_any(std::move(requests));
822 : co_return response;
823 : }
824 : @endcode
825 :
826 : @tparam R Range type satisfying IoAwaitableRange.
827 : @param awaitables Range of awaitables to race concurrently (must not be empty).
828 : @return A task yielding a pair of (winner_index, result).
829 :
830 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
831 : @throws Rethrows the winner's exception if the winning task threw an exception.
832 :
833 : @par Remarks
834 : Elements are moved from the range; for lvalue ranges, the original
835 : container will have moved-from elements after this call. The range
836 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
837 : the variadic overload, no variant wrapper is needed since all tasks
838 : share the same return type.
839 :
840 : @see when_any, IoAwaitableRange
841 : */
842 : template<IoAwaitableRange R>
843 : requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
844 21 : [[nodiscard]] auto when_any(R&& awaitables)
845 : -> task<std::pair<std::size_t, awaitable_result_t<std::ranges::range_value_t<R>>>>
846 : {
847 : using Awaitable = std::ranges::range_value_t<R>;
848 : using T = awaitable_result_t<Awaitable>;
849 : using result_type = std::pair<std::size_t, T>;
850 : using OwnedRange = std::remove_cvref_t<R>;
851 :
852 : auto count = std::ranges::size(awaitables);
853 : if(count == 0)
854 : throw std::invalid_argument("when_any requires at least one awaitable");
855 :
856 : // Move/copy range onto coroutine frame to ensure lifetime
857 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
858 :
859 : detail::when_any_homogeneous_state<T> state(count);
860 :
861 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
862 :
863 : if(state.core_.winner_exception_)
864 : std::rethrow_exception(state.core_.winner_exception_);
865 :
866 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
867 42 : }
868 :
869 : /** Wait for the first awaitable to complete (void range overload).
870 :
871 : Races a range of void-returning awaitables. Since void awaitables have
872 : no result value, only the winner's index is returned.
873 :
874 : @par Suspends
875 : The calling coroutine suspends when co_await is invoked. All awaitables
876 : in the range are launched concurrently (see Concurrency/Overlap). The
877 : coroutine resumes only after all awaitables have completed, even though
878 : the winner is determined by the first to finish.
879 :
880 : @par Completion Conditions
881 : @li Winner is determined when the first awaitable completes (success or exception)
882 : @li Only one task can claim winner status via atomic compare-exchange
883 : @li Once a winner exists, stop is requested for all remaining siblings
884 : @li Parent coroutine resumes only after all siblings acknowledge completion
885 : @li The winner's index is returned; if the winner threw, the exception is rethrown
886 :
887 : @par Cancellation Semantics
888 : Cancellation is supported via stop_token propagated through the
889 : IoAwaitable protocol:
890 : @li Each child awaitable receives a stop_token derived from a shared stop_source
891 : @li When the parent's stop token is activated, the stop is forwarded to all children
892 : @li When a winner is determined, stop_source_.request_stop() is called immediately
893 : @li Siblings must handle cancellation gracefully and complete before parent resumes
894 : @li Stop requests are cooperative; tasks must check and respond to them
895 :
896 : @par Concurrency/Overlap
897 : All awaitables are launched concurrently before any can complete.
898 : The launcher iterates through the range, starting each task on the
899 : caller's executor. Tasks may execute in parallel on multi-threaded
900 : executors or interleave on single-threaded executors. There is no
901 : guaranteed ordering of task completion.
902 :
903 : @par Notable Error Conditions
904 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
905 : @li Winner exception: if the winning task threw, that exception is rethrown
906 : @li Non-winner exceptions: silently discarded (only winner's result matters)
907 : @li Cancellation: tasks may complete via cancellation without throwing
908 :
909 : @par Example
910 : @code
911 : task<void> example() {
912 : std::vector<task<void>> tasks;
913 : for (int i = 0; i < 5; ++i)
914 : tasks.push_back(background_work(i));
915 :
916 : std::size_t winner = co_await when_any(std::move(tasks));
917 : // winner is the index of the first task to complete
918 : }
919 : @endcode
920 :
921 : @par Example with Timeout
922 : @code
923 : task<void> with_timeout() {
924 : std::vector<task<void>> tasks;
925 : tasks.push_back(long_running_operation());
926 : tasks.push_back(delay(std::chrono::seconds(5)));
927 :
928 : std::size_t winner = co_await when_any(std::move(tasks));
929 : if (winner == 1) {
930 : // Timeout occurred
931 : }
932 : }
933 : @endcode
934 :
935 : @tparam R Range type satisfying IoAwaitableRange with void result.
936 : @param awaitables Range of void awaitables to race concurrently (must not be empty).
937 : @return A task yielding the winner's index (zero-based).
938 :
939 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
940 : @throws Rethrows the winner's exception if the winning task threw an exception.
941 :
942 : @par Remarks
943 : An rvalue range is moved onto the coroutine frame; an lvalue range
944 : is copied there instead, so the caller's container is not modified.
945 : Owning the range on the frame keeps it alive while suspended. Unlike
946 : the non-void overload, no result storage is needed since void tasks
947 : produce no value.
948 :
949 : @see when_any, IoAwaitableRange
950 : */
951 : template<IoAwaitableRange R>
952 : requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>> // selects this overload when the element result type is void
953 3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
954 : {
955 : using OwnedRange = std::remove_cvref_t<R>; // R without cv/ref qualifiers, so the local below holds a range by value
956 :
957 : auto count = std::ranges::size(awaitables); // size is read before the range is forwarded away below
958 : if(count == 0)
959 : throw std::invalid_argument("when_any requires at least one awaitable"); // NOTE(review): thrown inside the coroutine body; when callers observe it depends on how task starts and stores exceptions - confirm
960 :
961 : // Own the range in a coroutine local: std::forward<R> moves an rvalue argument and copies an lvalue one
962 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
963 :
964 : detail::when_any_homogeneous_state<void> state(count); // its winner index and winner exception are read after the co_await
965 :
966 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state); // launcher receives pointers to the two locals above
967 :
968 : if(state.core_.winner_exception_) // an exception was recorded for the winner: rethrow it instead of returning an index
969 : std::rethrow_exception(state.core_.winner_exception_);
970 :
971 : co_return state.core_.winner_index_; // only the winner's index is returned by this overload
972 6 : }
973 :
974 : } // namespace capy
975 : } // namespace boost
976 :
977 : #endif
|