include/boost/capy/when_any.hpp

99.3% Lines (133/134) 93.0% Functions (452/486)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 #define BOOST_CAPY_WHEN_ANY_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/concept/io_awaitable.hpp>
16 #include <coroutine>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/ex/io_env.hpp>
20 #include <boost/capy/task.hpp>
21
22 #include <array>
23 #include <atomic>
24 #include <exception>
25 #include <optional>
26 #include <ranges>
27 #include <stdexcept>
28 #include <stop_token>
29 #include <tuple>
30 #include <type_traits>
31 #include <utility>
32 #include <variant>
33 #include <vector>
34
35 /*
36 when_any - Race multiple tasks, return first completion
37 ========================================================
38
39 OVERVIEW:
40 ---------
41 when_any launches N tasks concurrently and completes when the FIRST task
42 finishes (success or failure). It then requests stop for all siblings and
43 waits for them to acknowledge before returning.
44
45 ARCHITECTURE:
46 -------------
47 The design mirrors when_all but with inverted completion semantics:
48
49 when_all: complete when remaining_count reaches 0 (all done)
50 when_any: complete when has_winner becomes true (first done)
51 BUT still wait for remaining_count to reach 0 for cleanup
52
53 Key components:
54 - when_any_state: Shared state tracking winner and completion
55 - when_any_runner: Wrapper coroutine for each child task
56 - when_any_launcher: Awaitable that starts all runners concurrently
57
58 CRITICAL INVARIANTS:
59 --------------------
60 1. Exactly one task becomes the winner (via atomic compare_exchange)
61 2. All tasks must complete before parent resumes (cleanup safety)
62 3. Stop is requested immediately when winner is determined
63 4. Only the winner's result/exception is stored
64
65 POSITIONAL VARIANT:
66 -------------------
67 The variadic overload returns a std::variant with one alternative per
68 input task, preserving positional correspondence. Use .index() on
69 the variant to identify which task won.
70
71 Example: when_any(task<int>, task<string>, task<int>)
72 - Raw types after void->monostate: int, string, int
73 - Result variant: std::variant<int, string, int>
74 - variant.index() tells you which task won (0, 1, or 2)
75
76 VOID HANDLING:
77 --------------
78 void tasks contribute std::monostate to the variant.
79 All-void tasks result in: variant<monostate, monostate, monostate>
80
81 MEMORY MODEL:
82 -------------
83 Synchronization chain from winner's write to parent's read:
84
85 1. Winner thread writes result_/winner_exception_ (non-atomic)
86 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
87 3. Last task thread (may be winner or non-winner) calls signal_completion()
88 → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
89 4. Last task returns caller_env_->executor.dispatch(continuation_) via symmetric transfer
90 5. Parent coroutine resumes and reads result_/winner_exception_
91
92 Synchronization analysis:
93 - All fetch_sub operations on remaining_count_ form a release sequence
94 - Winner's fetch_sub releases; subsequent fetch_sub operations participate
95 in the modification order of remaining_count_
96 - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
97 modification order, establishing happens-before from winner's writes
98 - Executor dispatch() is expected to provide queue-based synchronization
99 (release-on-post, acquire-on-execute) completing the chain to parent
100 - Even inline executors work (same thread = sequenced-before)
101
102 Alternative considered: Adding winner_ready_ atomic (set with release after
103 storing winner data, acquired before reading) would make synchronization
104 self-contained and not rely on executor implementation details. Current
105 approach is correct but requires careful reasoning about release sequences
106 and executor behavior.
107
108 EXCEPTION SEMANTICS:
109 --------------------
110 Unlike when_all (which captures first exception, discards others), when_any
111 treats exceptions as valid completions. If the winning task threw, that
112 exception is rethrown. Exceptions from non-winners are silently discarded.
113 */
114
115 namespace boost {
116 namespace capy {
117
/** Map `void` to `std::monostate`; every other type is left as-is.

    A `std::variant` cannot have a `void` alternative, so awaitables that
    produce no value are represented by `std::monostate` in the result
    variant.

    @tparam T The result type to map (cv-qualified `void` maps to
              `std::monostate` as well).
*/
template<typename T>
using void_to_monostate_t = std::conditional_t<
    std::is_same_v<std::remove_cv_t<T>, void>,
    std::monostate,
    T>;
128
129 namespace detail {
130
131 /** Core shared state for when_any operations.
132
133 Contains all members and methods common to both heterogeneous (variadic)
134 and homogeneous (range) when_any implementations. State classes embed
135 this via composition to avoid CRTP destructor ordering issues.
136
137 @par Thread Safety
138 Atomic operations protect winner selection and completion count.
139 */
140 struct when_any_core
141 {
142 std::atomic<std::size_t> remaining_count_;
143 std::size_t winner_index_{0};
144 std::exception_ptr winner_exception_;
145 std::stop_source stop_source_;
146
147 // Bridges parent's stop token to our stop_source
148 struct stop_callback_fn
149 {
150 std::stop_source* source_;
151 9x void operator()() const noexcept { source_->request_stop(); }
152 };
153 using stop_callback_t = std::stop_callback<stop_callback_fn>;
154 std::optional<stop_callback_t> parent_stop_callback_;
155
156 std::coroutine_handle<> continuation_;
157 io_env const* caller_env_ = nullptr;
158
159 // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
160 std::atomic<bool> has_winner_{false};
161
162 65x explicit when_any_core(std::size_t count) noexcept
163 65x : remaining_count_(count)
164 {
165 65x }
166
167 /** Atomically claim winner status; exactly one task succeeds. */
168 190x bool try_win(std::size_t index) noexcept
169 {
170 190x bool expected = false;
171 190x if(has_winner_.compare_exchange_strong(
172 expected, true, std::memory_order_acq_rel))
173 {
174 65x winner_index_ = index;
175 65x stop_source_.request_stop();
176 65x return true;
177 }
178 125x return false;
179 }
180
181 /** @pre try_win() returned true. */
182 8x void set_winner_exception(std::exception_ptr ep) noexcept
183 {
184 8x winner_exception_ = ep;
185 8x }
186
187 // Runners signal completion directly via final_suspend; no member function needed.
188 };
189
190 /** Shared state for heterogeneous when_any operation.
191
192 Coordinates winner selection, result storage, and completion tracking
193 for all child tasks in a when_any operation. Uses composition with
194 when_any_core for shared functionality.
195
196 @par Lifetime
197 Allocated on the parent coroutine's frame, outlives all runners.
198
199 @tparam Ts Task result types.
200 */
201 template<typename... Ts>
202 struct when_any_state
203 {
204 static constexpr std::size_t task_count = sizeof...(Ts);
205 using variant_type = std::variant<void_to_monostate_t<Ts>...>;
206
207 when_any_core core_;
208 std::optional<variant_type> result_;
209 std::array<std::coroutine_handle<>, task_count> runner_handles_{};
210
211 43x when_any_state()
212 43x : core_(task_count)
213 {
214 43x }
215
216 // Runners self-destruct in final_suspend. No destruction needed here.
217
218 /** @pre core_.try_win() returned true.
219 @note Uses in_place_index (not type) for positional variant access.
220 */
221 template<std::size_t I, typename T>
222 35x void set_winner_result(T value)
223 noexcept(std::is_nothrow_move_constructible_v<T>)
224 {
225 35x result_.emplace(std::in_place_index<I>, std::move(value));
226 35x }
227
228 /** @pre core_.try_win() returned true. */
229 template<std::size_t I>
230 3x void set_winner_void() noexcept
231 {
232 3x result_.emplace(std::in_place_index<I>, std::monostate{});
233 3x }
234 };
235
236 /** Wrapper coroutine that runs a single child task for when_any.
237
238 Propagates executor/stop_token to the child, attempts to claim winner
239 status on completion, and signals completion for cleanup coordination.
240
241 @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
242 */
243 template<typename StateType>
244 struct when_any_runner
245 {
246 struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
247 {
248 StateType* state_ = nullptr;
249 std::size_t index_ = 0;
250 io_env env_;
251
252 190x when_any_runner get_return_object() noexcept
253 {
254 190x return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
255 }
256
257 // Starts suspended; launcher sets up state/ex/token then resumes
258 190x std::suspend_always initial_suspend() noexcept
259 {
260 190x return {};
261 }
262
263 190x auto final_suspend() noexcept
264 {
265 struct awaiter
266 {
267 promise_type* p_;
268 bool await_ready() const noexcept { return false; }
269 auto await_suspend(std::coroutine_handle<> h) noexcept
270 {
271 // Extract everything needed before self-destruction.
272 auto& core = p_->state_->core_;
273 auto* counter = &core.remaining_count_;
274 auto* caller_env = core.caller_env_;
275 auto cont = core.continuation_;
276
277 h.destroy();
278
279 // If last runner, dispatch parent for symmetric transfer.
280 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
281 if(remaining == 1)
282 return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
283 return detail::symmetric_transfer(std::noop_coroutine());
284 }
285 void await_resume() const noexcept {}
286 };
287 190x return awaiter{this};
288 }
289
290 178x void return_void() noexcept {}
291
292 // Exceptions are valid completions in when_any (unlike when_all)
293 12x void unhandled_exception()
294 {
295 12x if(state_->core_.try_win(index_))
296 8x state_->core_.set_winner_exception(std::current_exception());
297 12x }
298
299 /** Injects executor and stop token into child awaitables. */
300 template<class Awaitable>
301 struct transform_awaiter
302 {
303 std::decay_t<Awaitable> a_;
304 promise_type* p_;
305
306 190x bool await_ready() { return a_.await_ready(); }
307 190x auto await_resume() { return a_.await_resume(); }
308
309 template<class Promise>
310 185x auto await_suspend(std::coroutine_handle<Promise> h)
311 {
312 using R = decltype(a_.await_suspend(h, &p_->env_));
313 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
314 185x return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
315 else
316 return a_.await_suspend(h, &p_->env_);
317 }
318 };
319
320 template<class Awaitable>
321 190x auto await_transform(Awaitable&& a)
322 {
323 using A = std::decay_t<Awaitable>;
324 if constexpr (IoAwaitable<A>)
325 {
326 return transform_awaiter<Awaitable>{
327 380x std::forward<Awaitable>(a), this};
328 }
329 else
330 {
331 static_assert(sizeof(A) == 0, "requires IoAwaitable");
332 }
333 190x }
334 };
335
336 std::coroutine_handle<promise_type> h_;
337
338 190x explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
339 190x : h_(h)
340 {
341 190x }
342
343 // Enable move for all clang versions - some versions need it
344 when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
345
346 // Non-copyable
347 when_any_runner(when_any_runner const&) = delete;
348 when_any_runner& operator=(when_any_runner const&) = delete;
349 when_any_runner& operator=(when_any_runner&&) = delete;
350
351 190x auto release() noexcept
352 {
353 190x return std::exchange(h_, nullptr);
354 }
355 };
356
357 /** Indexed overload for heterogeneous when_any (compile-time index).
358
359 Uses compile-time index I for variant construction via in_place_index.
360 Called from when_any_launcher::launch_one<I>().
361 */
362 template<std::size_t I, IoAwaitable Awaitable, typename StateType>
363 when_any_runner<StateType>
364 105x make_when_any_runner(Awaitable inner, StateType* state)
365 {
366 using T = awaitable_result_t<Awaitable>;
367 if constexpr (std::is_void_v<T>)
368 {
369 co_await std::move(inner);
370 if(state->core_.try_win(I))
371 state->template set_winner_void<I>();
372 }
373 else
374 {
375 auto result = co_await std::move(inner);
376 if(state->core_.try_win(I))
377 {
378 try
379 {
380 state->template set_winner_result<I>(std::move(result));
381 }
382 catch(...)
383 {
384 state->core_.set_winner_exception(std::current_exception());
385 }
386 }
387 }
388 210x }
389
390 /** Runtime-index overload for homogeneous when_any (range path).
391
392 Uses requires-expressions to detect state capabilities:
393 - set_winner_void(): for heterogeneous void tasks (stores monostate)
394 - set_winner_result(): for non-void tasks
395 - Neither: for homogeneous void tasks (no result storage)
396 */
397 template<IoAwaitable Awaitable, typename StateType>
398 when_any_runner<StateType>
399 85x make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
400 {
401 using T = awaitable_result_t<Awaitable>;
402 if constexpr (std::is_void_v<T>)
403 {
404 co_await std::move(inner);
405 if(state->core_.try_win(index))
406 {
407 if constexpr (requires { state->set_winner_void(); })
408 state->set_winner_void();
409 }
410 }
411 else
412 {
413 auto result = co_await std::move(inner);
414 if(state->core_.try_win(index))
415 {
416 try
417 {
418 state->set_winner_result(std::move(result));
419 }
420 catch(...)
421 {
422 state->core_.set_winner_exception(std::current_exception());
423 }
424 }
425 }
426 170x }
427
428 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
429 template<IoAwaitable... Awaitables>
430 class when_any_launcher
431 {
432 using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
433
434 std::tuple<Awaitables...>* tasks_;
435 state_type* state_;
436
437 public:
438 43x when_any_launcher(
439 std::tuple<Awaitables...>* tasks,
440 state_type* state)
441 43x : tasks_(tasks)
442 43x , state_(state)
443 {
444 43x }
445
446 43x bool await_ready() const noexcept
447 {
448 43x return sizeof...(Awaitables) == 0;
449 }
450
451 /** CRITICAL: If the last task finishes synchronously, parent resumes and
452 destroys this object before await_suspend returns. Must not reference
453 `this` after the final launch_one call.
454 */
455 43x std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
456 {
457 43x state_->core_.continuation_ = continuation;
458 43x state_->core_.caller_env_ = caller_env;
459
460 43x if(caller_env->stop_token.stop_possible())
461 {
462 18x state_->core_.parent_stop_callback_.emplace(
463 9x caller_env->stop_token,
464 9x when_any_core::stop_callback_fn{&state_->core_.stop_source_});
465
466 9x if(caller_env->stop_token.stop_requested())
467 3x state_->core_.stop_source_.request_stop();
468 }
469
470 43x auto token = state_->core_.stop_source_.get_token();
471 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
472 (..., launch_one<Is>(caller_env->executor, token));
473 43x }(std::index_sequence_for<Awaitables...>{});
474
475 86x return std::noop_coroutine();
476 43x }
477
478 43x void await_resume() const noexcept
479 {
480 43x }
481
482 private:
483 /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
484 template<std::size_t I>
485 105x void launch_one(executor_ref caller_ex, std::stop_token token)
486 {
487 105x auto runner = make_when_any_runner<I>(
488 105x std::move(std::get<I>(*tasks_)), state_);
489
490 105x auto h = runner.release();
491 105x h.promise().state_ = state_;
492 105x h.promise().index_ = I;
493 105x h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
494
495 105x std::coroutine_handle<> ch{h};
496 105x state_->runner_handles_[I] = ch;
497 105x caller_ex.post(ch);
498 210x }
499 };
500
501 } // namespace detail
502
503 /** Wait for the first awaitable to complete.
504
505 Races multiple heterogeneous awaitables concurrently and returns when the
506 first one completes. The result is a variant with one alternative per
507 input task, preserving positional correspondence.
508
509 @par Suspends
510 The calling coroutine suspends when co_await is invoked. All awaitables
511 are launched concurrently and execute in parallel. The coroutine resumes
512 only after all awaitables have completed, even though the winner is
513 determined by the first to finish.
514
515 @par Completion Conditions
516 @li Winner is determined when the first awaitable completes (success or exception)
517 @li Only one task can claim winner status via atomic compare-exchange
518 @li Once a winner exists, stop is requested for all remaining siblings
519 @li Parent coroutine resumes only after all siblings acknowledge completion
520 @li The winner's result is returned; if the winner threw, the exception is rethrown
521
522 @par Cancellation Semantics
523 Cancellation is supported via stop_token propagated through the
524 IoAwaitable protocol:
525 @li Each child awaitable receives a stop_token derived from a shared stop_source
526 @li When the parent's stop token is activated, the stop is forwarded to all children
527 @li When a winner is determined, stop_source_.request_stop() is called immediately
528 @li Siblings must handle cancellation gracefully and complete before parent resumes
529 @li Stop requests are cooperative; tasks must check and respond to them
530
531 @par Concurrency/Overlap
532 All awaitables are launched concurrently before any can complete.
533 The launcher iterates through the arguments, starting each task on the
534 caller's executor. Tasks may execute in parallel on multi-threaded
535 executors or interleave on single-threaded executors. There is no
536 guaranteed ordering of task completion.
537
538 @par Notable Error Conditions
539 @li Winner exception: if the winning task threw, that exception is rethrown
540 @li Non-winner exceptions: silently discarded (only winner's result matters)
541 @li Cancellation: tasks may complete via cancellation without throwing
542
543 @par Example
544 @code
545 task<void> example() {
546 auto result = co_await when_any(
547 fetch_int(), // task<int>
548 fetch_string() // task<std::string>
549 );
550 // result.index() is 0 or 1
551 if (result.index() == 0)
552 std::cout << "Got int: " << std::get<0>(result) << "\n";
553 else
554 std::cout << "Got string: " << std::get<1>(result) << "\n";
555 }
556 @endcode
557
558 @param as Awaitables to race concurrently (at least one required; each
559 must satisfy IoAwaitable).
560 @return A task yielding a std::variant with one alternative per awaitable.
561 Use .index() to identify the winner. Void awaitables contribute
562 std::monostate.
563
564 @throws Rethrows the winner's exception if the winning task threw an exception.
565
566 @par Remarks
567 Awaitables are moved into the coroutine frame; original objects become
568 empty after the call. The variant preserves one alternative per input
569 task. Use .index() to determine which awaitable completed first.
570 Void awaitables contribute std::monostate to the variant.
571
572 @see when_all, IoAwaitable
573 */
574 template<IoAwaitable... As>
575 requires (sizeof...(As) > 0)
576 43x [[nodiscard]] auto when_any(As... as)
577 -> task<std::variant<void_to_monostate_t<awaitable_result_t<As>>...>>
578 {
579 detail::when_any_state<awaitable_result_t<As>...> state;
580 std::tuple<As...> awaitable_tuple(std::move(as)...);
581
582 co_await detail::when_any_launcher<As...>(&awaitable_tuple, &state);
583
584 if(state.core_.winner_exception_)
585 std::rethrow_exception(state.core_.winner_exception_);
586
587 co_return std::move(*state.result_);
588 86x }
589
590 /** Concept for ranges of full I/O awaitables.
591
592 A range satisfies `IoAwaitableRange` if it is a sized input range
593 whose value type satisfies @ref IoAwaitable. This enables when_any
594 to accept any container or view of awaitables, not just std::vector.
595
596 @tparam R The range type.
597
598 @par Requirements
599 @li `R` must satisfy `std::ranges::input_range`
600 @li `R` must satisfy `std::ranges::sized_range`
601 @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
602
603 @par Syntactic Requirements
604 Given `r` of type `R`:
605 @li `std::ranges::begin(r)` is valid
606 @li `std::ranges::end(r)` is valid
607 @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
608 @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
609
610 @par Example
611 @code
612 template<IoAwaitableRange R>
613 task<void> race_all(R&& awaitables) {
614 auto winner = co_await when_any(std::forward<R>(awaitables));
615 // Process winner...
616 }
617 @endcode
618
619 @see when_any, IoAwaitable
620 */
621 template<typename R>
622 concept IoAwaitableRange =
623 std::ranges::input_range<R> &&
624 std::ranges::sized_range<R> &&
625 IoAwaitable<std::ranges::range_value_t<R>>;
626
627 namespace detail {
628
629 /** Shared state for homogeneous when_any (range overload).
630
631 Uses composition with when_any_core for shared functionality.
632 Simpler than heterogeneous: optional<T> instead of variant, vector
633 instead of array for runner handles.
634 */
635 template<typename T>
636 struct when_any_homogeneous_state
637 {
638 when_any_core core_;
639 std::optional<T> result_;
640 std::vector<std::coroutine_handle<>> runner_handles_;
641
642 19x explicit when_any_homogeneous_state(std::size_t count)
643 19x : core_(count)
644 38x , runner_handles_(count)
645 {
646 19x }
647
648 // Runners self-destruct in final_suspend. No destruction needed here.
649
650 /** @pre core_.try_win() returned true. */
651 17x void set_winner_result(T value)
652 noexcept(std::is_nothrow_move_constructible_v<T>)
653 {
654 17x result_.emplace(std::move(value));
655 17x }
656 };
657
658 /** Specialization for void tasks (no result storage needed). */
659 template<>
660 struct when_any_homogeneous_state<void>
661 {
662 when_any_core core_;
663 std::vector<std::coroutine_handle<>> runner_handles_;
664
665 3x explicit when_any_homogeneous_state(std::size_t count)
666 3x : core_(count)
667 6x , runner_handles_(count)
668 {
669 3x }
670
671 // Runners self-destruct in final_suspend. No destruction needed here.
672
673 // No set_winner_result - void tasks have no result to store
674 };
675
676 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
677 template<IoAwaitableRange Range>
678 class when_any_homogeneous_launcher
679 {
680 using Awaitable = std::ranges::range_value_t<Range>;
681 using T = awaitable_result_t<Awaitable>;
682
683 Range* range_;
684 when_any_homogeneous_state<T>* state_;
685
686 public:
687 22x when_any_homogeneous_launcher(
688 Range* range,
689 when_any_homogeneous_state<T>* state)
690 22x : range_(range)
691 22x , state_(state)
692 {
693 22x }
694
695 22x bool await_ready() const noexcept
696 {
697 22x return std::ranges::empty(*range_);
698 }
699
700 /** CRITICAL: If the last task finishes synchronously, parent resumes and
701 destroys this object before await_suspend returns. Must not reference
702 `this` after dispatching begins.
703
704 Two-phase approach:
705 1. Create all runners (safe - no dispatch yet)
706 2. Dispatch all runners (any may complete synchronously)
707 */
708 22x std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
709 {
710 22x state_->core_.continuation_ = continuation;
711 22x state_->core_.caller_env_ = caller_env;
712
713 22x if(caller_env->stop_token.stop_possible())
714 {
715 14x state_->core_.parent_stop_callback_.emplace(
716 7x caller_env->stop_token,
717 7x when_any_core::stop_callback_fn{&state_->core_.stop_source_});
718
719 7x if(caller_env->stop_token.stop_requested())
720 4x state_->core_.stop_source_.request_stop();
721 }
722
723 22x auto token = state_->core_.stop_source_.get_token();
724
725 // Phase 1: Create all runners without dispatching.
726 // This iterates over *range_ safely because no runners execute yet.
727 22x std::size_t index = 0;
728 107x for(auto&& a : *range_)
729 {
730 85x auto runner = make_when_any_runner(
731 85x std::move(a), state_, index);
732
733 85x auto h = runner.release();
734 85x h.promise().state_ = state_;
735 85x h.promise().index_ = index;
736 85x h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
737
738 85x state_->runner_handles_[index] = std::coroutine_handle<>{h};
739 85x ++index;
740 }
741
742 // Phase 2: Post all runners. Any may complete synchronously.
743 // After last post, state_ and this may be destroyed.
744 // Use raw pointer/count captured before posting.
745 22x std::coroutine_handle<>* handles = state_->runner_handles_.data();
746 22x std::size_t count = state_->runner_handles_.size();
747 107x for(std::size_t i = 0; i < count; ++i)
748 85x caller_env->executor.post(handles[i]);
749
750 44x return std::noop_coroutine();
751 107x }
752
753 22x void await_resume() const noexcept
754 {
755 22x }
756 };
757
758 } // namespace detail
759
760 /** Wait for the first awaitable to complete (range overload).
761
762 Races a range of awaitables with the same result type. Accepts any
763 sized input range of IoAwaitable types, enabling use with arrays,
764 spans, or custom containers.
765
766 @par Suspends
767 The calling coroutine suspends when co_await is invoked. All awaitables
768 in the range are launched concurrently and execute in parallel. The
769 coroutine resumes only after all awaitables have completed, even though
770 the winner is determined by the first to finish.
771
772 @par Completion Conditions
773 @li Winner is determined when the first awaitable completes (success or exception)
774 @li Only one task can claim winner status via atomic compare-exchange
775 @li Once a winner exists, stop is requested for all remaining siblings
776 @li Parent coroutine resumes only after all siblings acknowledge completion
777 @li The winner's index and result are returned; if the winner threw, the exception is rethrown
778
779 @par Cancellation Semantics
780 Cancellation is supported via stop_token propagated through the
781 IoAwaitable protocol:
782 @li Each child awaitable receives a stop_token derived from a shared stop_source
783 @li When the parent's stop token is activated, the stop is forwarded to all children
784 @li When a winner is determined, stop_source_.request_stop() is called immediately
785 @li Siblings must handle cancellation gracefully and complete before parent resumes
786 @li Stop requests are cooperative; tasks must check and respond to them
787
788 @par Concurrency/Overlap
789 All awaitables are launched concurrently before any can complete.
790 The launcher iterates through the range, starting each task on the
791 caller's executor. Tasks may execute in parallel on multi-threaded
792 executors or interleave on single-threaded executors. There is no
793 guaranteed ordering of task completion.
794
795 @par Notable Error Conditions
796 @li Empty range: throws std::invalid_argument immediately (not via co_return)
797 @li Winner exception: if the winning task threw, that exception is rethrown
798 @li Non-winner exceptions: silently discarded (only winner's result matters)
799 @li Cancellation: tasks may complete via cancellation without throwing
800
801 @par Example
802 @code
803 task<void> example() {
804 std::array<task<Response>, 3> requests = {
805 fetch_from_server(0),
806 fetch_from_server(1),
807 fetch_from_server(2)
808 };
809
810 auto [index, response] = co_await when_any(std::move(requests));
811 }
812 @endcode
813
814 @par Example with Vector
815 @code
816 task<Response> fetch_fastest(std::vector<Server> const& servers) {
817 std::vector<task<Response>> requests;
818 for (auto const& server : servers)
819 requests.push_back(fetch_from(server));
820
821 auto [index, response] = co_await when_any(std::move(requests));
822 co_return response;
823 }
824 @endcode
825
826 @tparam R Range type satisfying IoAwaitableRange.
827 @param awaitables Range of awaitables to race concurrently (must not be empty).
828 @return A task yielding a pair of (winner_index, result).
829
830 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
831 @throws Rethrows the winner's exception if the winning task threw an exception.
832
833 @par Remarks
834 The range is forwarded into a frame-owned copy: rvalue ranges are
835 moved, lvalue ranges are copied (the original is left untouched).
836 Elements are then moved out of that frame-owned range. Unlike
837 the variadic overload, no variant wrapper is needed since all tasks
838 share the same return type.
839
840 @see when_any, IoAwaitableRange
841 */
842 template<IoAwaitableRange R>
843 requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
844 21x [[nodiscard]] auto when_any(R&& awaitables)
845 -> task<std::pair<std::size_t, awaitable_result_t<std::ranges::range_value_t<R>>>>
846 {
847 using Awaitable = std::ranges::range_value_t<R>;
848 using T = awaitable_result_t<Awaitable>;
849 using result_type = std::pair<std::size_t, T>;
850 using OwnedRange = std::remove_cvref_t<R>;
851
852 auto count = std::ranges::size(awaitables);
853 if(count == 0)
854 throw std::invalid_argument("when_any requires at least one awaitable");
855
856 // Move/copy range onto coroutine frame to ensure lifetime
857 OwnedRange owned_awaitables = std::forward<R>(awaitables);
858
859 detail::when_any_homogeneous_state<T> state(count);
860
861 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
862
863 if(state.core_.winner_exception_)
864 std::rethrow_exception(state.core_.winner_exception_);
865
866 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
867 42x }
868
869 /** Wait for the first awaitable to complete (void range overload).
870
871 Races a range of void-returning awaitables. Since void awaitables have
872 no result value, only the winner's index is returned.
873
874 @par Suspends
875 The calling coroutine suspends when co_await is invoked. All awaitables
876 in the range are launched concurrently and execute in parallel. The
877 coroutine resumes only after all awaitables have completed, even though
878 the winner is determined by the first to finish.
879
880 @par Completion Conditions
881 @li Winner is determined when the first awaitable completes (success or exception)
882 @li Only one task can claim winner status via atomic compare-exchange
883 @li Once a winner exists, stop is requested for all remaining siblings
884 @li Parent coroutine resumes only after all siblings acknowledge completion
885 @li The winner's index is returned; if the winner threw, the exception is rethrown
886
887 @par Cancellation Semantics
888 Cancellation is supported via stop_token propagated through the
889 IoAwaitable protocol:
890 @li Each child awaitable receives a stop_token derived from a shared stop_source
891 @li When the parent's stop token is activated, the stop is forwarded to all children
892 @li When a winner is determined, stop_source_.request_stop() is called immediately
893 @li Siblings must handle cancellation gracefully and complete before parent resumes
894 @li Stop requests are cooperative; tasks must check and respond to them
895
896 @par Concurrency/Overlap
897 All awaitables are launched concurrently before any can complete.
898 The launcher iterates through the range, starting each task on the
899 caller's executor. Tasks may execute in parallel on multi-threaded
900 executors or interleave on single-threaded executors. There is no
901 guaranteed ordering of task completion.
902
903 @par Notable Error Conditions
904 @li Empty range: throws std::invalid_argument immediately (not via co_return)
905 @li Winner exception: if the winning task threw, that exception is rethrown
906 @li Non-winner exceptions: silently discarded (only winner's result matters)
907 @li Cancellation: tasks may complete via cancellation without throwing
908
909 @par Example
910 @code
911 task<void> example() {
912 std::vector<task<void>> tasks;
913 for (int i = 0; i < 5; ++i)
914 tasks.push_back(background_work(i));
915
916 std::size_t winner = co_await when_any(std::move(tasks));
917 // winner is the index of the first task to complete
918 }
919 @endcode
920
921 @par Example with Timeout
922 @code
923 task<void> with_timeout() {
924 std::vector<task<void>> tasks;
925 tasks.push_back(long_running_operation());
926 tasks.push_back(delay(std::chrono::seconds(5)));
927
928 std::size_t winner = co_await when_any(std::move(tasks));
929 if (winner == 1) {
930 // Timeout occurred
931 }
932 }
933 @endcode
934
935 @tparam R Range type satisfying IoAwaitableRange with void result.
936 @param awaitables Range of void awaitables to race concurrently (must not be empty).
937 @return A task yielding the winner's index (zero-based).
938
939 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
940 @throws Rethrows the winner's exception if the winning task threw an exception.
941
942 @par Remarks
943 The range is forwarded into a frame-owned copy: rvalue ranges are
944 moved, lvalue ranges are copied (the original is left untouched).
945 Elements are then moved out of that frame-owned range. Unlike
946 the non-void overload, no result storage is needed since void tasks
947 produce no value.
948
949 @see when_any, IoAwaitableRange
950 */
951 template<IoAwaitableRange R>
952 requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>
953 3x [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
954 {
955 using OwnedRange = std::remove_cvref_t<R>;
956
957 auto count = std::ranges::size(awaitables);
958 if(count == 0)
959 throw std::invalid_argument("when_any requires at least one awaitable");
960
961 // Move/copy range onto coroutine frame to ensure lifetime
962 OwnedRange owned_awaitables = std::forward<R>(awaitables);
963
964 detail::when_any_homogeneous_state<void> state(count);
965
966 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
967
968 if(state.core_.winner_exception_)
969 std::rethrow_exception(state.core_.winner_exception_);
970
971 co_return state.core_.winner_index_;
972 6x }
973
974 } // namespace capy
975 } // namespace boost
976
977 #endif
978