LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 99.3 % 147 146 1
Test Date: 2026-03-04 22:59:25 Functions: 90.6 % 556 504 52

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      11                 : #define BOOST_CAPY_WHEN_ANY_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/concept/executor.hpp>
      15                 : #include <boost/capy/concept/io_awaitable.hpp>
      16                 : #include <coroutine>
      17                 : #include <boost/capy/ex/executor_ref.hpp>
      18                 : #include <boost/capy/ex/frame_allocator.hpp>
      19                 : #include <boost/capy/ex/io_env.hpp>
      20                 : #include <boost/capy/task.hpp>
      21                 : 
      22                 : #include <array>
      23                 : #include <atomic>
      24                 : #include <exception>
      25                 : #include <optional>
      26                 : #include <ranges>
      27                 : #include <stdexcept>
      28                 : #include <stop_token>
      29                 : #include <tuple>
      30                 : #include <type_traits>
      31                 : #include <utility>
      32                 : #include <variant>
      33                 : #include <vector>
      34                 : 
      35                 : /*
      36                 :    when_any - Race multiple tasks, return first completion
      37                 :    ========================================================
      38                 : 
      39                 :    OVERVIEW:
      40                 :    ---------
      41                 :    when_any launches N tasks concurrently and completes when the FIRST task
      42                 :    finishes (success or failure). It then requests stop for all siblings and
      43                 :    waits for them to acknowledge before returning.
      44                 : 
      45                 :    ARCHITECTURE:
      46                 :    -------------
      47                 :    The design mirrors when_all but with inverted completion semantics:
      48                 : 
      49                 :      when_all:  complete when remaining_count reaches 0 (all done)
      50                 :      when_any:  complete when has_winner becomes true (first done)
      51                 :                 BUT still wait for remaining_count to reach 0 for cleanup
      52                 : 
      53                 :    Key components:
      54                 :      - when_any_state:    Shared state tracking winner and completion
      55                 :      - when_any_runner:   Wrapper coroutine for each child task
      56                 :      - when_any_launcher: Awaitable that starts all runners concurrently
      57                 : 
      58                 :    CRITICAL INVARIANTS:
      59                 :    --------------------
      60                 :    1. Exactly one task becomes the winner (via atomic compare_exchange)
      61                 :    2. All tasks must complete before parent resumes (cleanup safety)
      62                 :    3. Stop is requested immediately when winner is determined
      63                 :    4. Only the winner's result/exception is stored
      64                 : 
      65                 :    POSITIONAL VARIANT:
      66                 :    -------------------
      67                 :    The variadic overload returns a std::variant with one alternative per
      68                 :    input task, preserving positional correspondence. Use .index() on
      69                 :    the variant to identify which task won.
      70                 : 
      71                 :    Example: when_any(task<int>, task<string>, task<int>)
      72                 :      - Raw types after void->monostate: int, string, int
      73                 :      - Result variant: std::variant<int, string, int>
      74                 :      - variant.index() tells you which task won (0, 1, or 2)
      75                 : 
      76                 :    VOID HANDLING:
      77                 :    --------------
      78                 :    void tasks contribute std::monostate to the variant.
      79                 :    All-void tasks result in: variant<monostate, monostate, monostate>
      80                 : 
      81                 :    MEMORY MODEL:
      82                 :    -------------
      83                 :    Synchronization chain from winner's write to parent's read:
      84                 : 
      85                 :    1. Winner thread writes result_/winner_exception_ (non-atomic)
      86                 :    2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
      87                 :    3. Last task thread (may be winner or non-winner) calls signal_completion()
      88                 :       → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
      89                 :    4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
      90                 :    5. Parent coroutine resumes and reads result_/winner_exception_
      91                 : 
      92                 :    Synchronization analysis:
      93                 :    - All fetch_sub operations on remaining_count_ form a release sequence
      94                 :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      95                 :      in the modification order of remaining_count_
      96                 :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
      97                 :      modification order, establishing happens-before from winner's writes
      98                 :    - Executor dispatch() is expected to provide queue-based synchronization
      99                 :      (release-on-post, acquire-on-execute) completing the chain to parent
     100                 :    - Even inline executors work (same thread = sequenced-before)
     101                 : 
     102                 :    Alternative considered: Adding winner_ready_ atomic (set with release after
     103                 :    storing winner data, acquired before reading) would make synchronization
     104                 :    self-contained and not rely on executor implementation details. Current
     105                 :    approach is correct but requires careful reasoning about release sequences
     106                 :    and executor behavior.
     107                 : 
     108                 :    EXCEPTION SEMANTICS:
     109                 :    --------------------
     110                 :    Unlike when_all (which captures first exception, discards others), when_any
     111                 :    treats exceptions as valid completions. If the winning task threw, that
     112                 :    exception is rethrown. Exceptions from non-winners are silently discarded.
     113                 : */
     114                 : 
     115                 : namespace boost {
     116                 : namespace capy {
     117                 : 
     118                 : /** Convert void to monostate for variant storage.
     119                 : 
     120                 :     std::variant<void, ...> is ill-formed, so void tasks contribute
     121                 :     std::monostate to the result variant instead. Non-void types
     122                 :     pass through unchanged.
     123                 : 
     124                 :     @tparam T The type to potentially convert (void becomes monostate).
     125                 : */
     126                 : template<typename T>
     127                 : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
     128                 : 
     129                 : namespace detail {
     130                 : 
     131                 : /** Core shared state for when_any operations.
     132                 : 
     133                 :     Contains all members and methods common to both heterogeneous (variadic)
     134                 :     and homogeneous (range) when_any implementations. State classes embed
     135                 :     this via composition to avoid CRTP destructor ordering issues.
     136                 : 
     137                 :     @par Thread Safety
     138                 :     Atomic operations protect winner selection and completion count.
     139                 : */
     140                 : struct when_any_core
     141                 : {
     142                 :     std::atomic<std::size_t> remaining_count_;
     143                 :     std::size_t winner_index_{0};
     144                 :     std::exception_ptr winner_exception_;
     145                 :     std::stop_source stop_source_;
     146                 : 
     147                 :     // Bridges parent's stop token to our stop_source
     148                 :     struct stop_callback_fn
     149                 :     {
     150                 :         std::stop_source* source_;
     151 HIT           9 :         void operator()() const noexcept { source_->request_stop(); }
     152                 :     };
     153                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     154                 :     std::optional<stop_callback_t> parent_stop_callback_;
     155                 : 
     156                 :     std::coroutine_handle<> continuation_;
     157                 :     io_env const* caller_env_ = nullptr;
     158                 : 
     159                 :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     160                 :     std::atomic<bool> has_winner_{false};
     161                 : 
     162              65 :     explicit when_any_core(std::size_t count) noexcept
     163              65 :         : remaining_count_(count)
     164                 :     {
     165              65 :     }
     166                 : 
     167                 :     /** Atomically claim winner status; exactly one task succeeds. */
     168             190 :     bool try_win(std::size_t index) noexcept
     169                 :     {
     170             190 :         bool expected = false;
     171             190 :         if(has_winner_.compare_exchange_strong(
     172                 :             expected, true, std::memory_order_acq_rel))
     173                 :         {
     174              65 :             winner_index_ = index;
     175              65 :             stop_source_.request_stop();
     176              65 :             return true;
     177                 :         }
     178             125 :         return false;
     179                 :     }
     180                 : 
     181                 :     /** @pre try_win() returned true. */
     182               8 :     void set_winner_exception(std::exception_ptr ep) noexcept
     183                 :     {
     184               8 :         winner_exception_ = ep;
     185               8 :     }
     186                 : 
     187                 :     // Runners signal completion directly via final_suspend; no member function needed.
     188                 : };
     189                 : 
     190                 : /** Shared state for heterogeneous when_any operation.
     191                 : 
     192                 :     Coordinates winner selection, result storage, and completion tracking
     193                 :     for all child tasks in a when_any operation. Uses composition with
     194                 :     when_any_core for shared functionality.
     195                 : 
     196                 :     @par Lifetime
     197                 :     Allocated on the parent coroutine's frame, outlives all runners.
     198                 : 
     199                 :     @tparam Ts Task result types.
     200                 : */
     201                 : template<typename... Ts>
     202                 : struct when_any_state
     203                 : {
     204                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     205                 :     using variant_type = std::variant<void_to_monostate_t<Ts>...>;
     206                 : 
     207                 :     when_any_core core_;
     208                 :     std::optional<variant_type> result_;
     209                 :     std::array<std::coroutine_handle<>, task_count> runner_handles_{};
     210                 : 
     211              43 :     when_any_state()
     212              43 :         : core_(task_count)
     213                 :     {
     214              43 :     }
     215                 : 
     216                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     217                 : 
     218                 :     /** @pre core_.try_win() returned true.
     219                 :         @note Uses in_place_index (not type) for positional variant access.
     220                 :     */
     221                 :     template<std::size_t I, typename T>
     222              35 :     void set_winner_result(T value)
     223                 :         noexcept(std::is_nothrow_move_constructible_v<T>)
     224                 :     {
     225              35 :         result_.emplace(std::in_place_index<I>, std::move(value));
     226              35 :     }
     227                 : 
     228                 :     /** @pre core_.try_win() returned true. */
     229                 :     template<std::size_t I>
     230               3 :     void set_winner_void() noexcept
     231                 :     {
     232               3 :         result_.emplace(std::in_place_index<I>, std::monostate{});
     233               3 :     }
     234                 : };
     235                 : 
     236                 : /** Wrapper coroutine that runs a single child task for when_any.
     237                 : 
     238                 :     Propagates executor/stop_token to the child, attempts to claim winner
     239                 :     status on completion, and signals completion for cleanup coordination.
     240                 : 
     241                 :     @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
     242                 : */
     243                 : template<typename StateType>
     244                 : struct when_any_runner
     245                 : {
     246                 :     struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
     247                 :     {
     248                 :         StateType* state_ = nullptr;
     249                 :         std::size_t index_ = 0;
     250                 :         io_env env_;
     251                 : 
     252             190 :         when_any_runner get_return_object() noexcept
     253                 :         {
     254             190 :             return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
     255                 :         }
     256                 : 
     257                 :         // Starts suspended; launcher sets up state/ex/token then resumes
     258             190 :         std::suspend_always initial_suspend() noexcept
     259                 :         {
     260             190 :             return {};
     261                 :         }
     262                 : 
     263             190 :         auto final_suspend() noexcept
     264                 :         {
     265                 :             struct awaiter
     266                 :             {
     267                 :                 promise_type* p_;
     268             190 :                 bool await_ready() const noexcept { return false; }
     269             190 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     270                 :                 {
     271                 :                     // Extract everything needed before self-destruction.
     272             190 :                     auto& core = p_->state_->core_;
     273             190 :                     auto* counter = &core.remaining_count_;
     274             190 :                     auto* caller_env = core.caller_env_;
     275             190 :                     auto cont = core.continuation_;
     276                 : 
     277             190 :                     h.destroy();
     278                 : 
     279                 :                     // If last runner, dispatch parent for symmetric transfer.
     280             190 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     281             190 :                     if(remaining == 1)
     282              65 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     283             125 :                     return detail::symmetric_transfer(std::noop_coroutine());
     284                 :                 }
     285 MIS           0 :                 void await_resume() const noexcept {}
     286                 :             };
     287 HIT         190 :             return awaiter{this};
     288                 :         }
     289                 : 
     290             178 :         void return_void() noexcept {}
     291                 : 
     292                 :         // Exceptions are valid completions in when_any (unlike when_all)
     293              12 :         void unhandled_exception()
     294                 :         {
     295              12 :             if(state_->core_.try_win(index_))
     296               8 :                 state_->core_.set_winner_exception(std::current_exception());
     297              12 :         }
     298                 : 
     299                 :         /** Injects executor and stop token into child awaitables. */
     300                 :         template<class Awaitable>
     301                 :         struct transform_awaiter
     302                 :         {
     303                 :             std::decay_t<Awaitable> a_;
     304                 :             promise_type* p_;
     305                 : 
     306             190 :             bool await_ready() { return a_.await_ready(); }
     307             190 :             auto await_resume() { return a_.await_resume(); }
     308                 : 
     309                 :             template<class Promise>
     310             185 :             auto await_suspend(std::coroutine_handle<Promise> h)
     311                 :             {
     312                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     313                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
     314             185 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     315                 :                 else
     316                 :                     return a_.await_suspend(h, &p_->env_);
     317                 :             }
     318                 :         };
     319                 : 
     320                 :         template<class Awaitable>
     321             190 :         auto await_transform(Awaitable&& a)
     322                 :         {
     323                 :             using A = std::decay_t<Awaitable>;
     324                 :             if constexpr (IoAwaitable<A>)
     325                 :             {
     326                 :                 return transform_awaiter<Awaitable>{
     327             380 :                     std::forward<Awaitable>(a), this};
     328                 :             }
     329                 :             else
     330                 :             {
     331                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     332                 :             }
     333             190 :         }
     334                 :     };
     335                 : 
     336                 :     std::coroutine_handle<promise_type> h_;
     337                 : 
     338             190 :     explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
     339             190 :         : h_(h)
     340                 :     {
     341             190 :     }
     342                 : 
     343                 :     // Enable move for all clang versions - some versions need it
     344                 :     when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
     345                 : 
     346                 :     // Non-copyable
     347                 :     when_any_runner(when_any_runner const&) = delete;
     348                 :     when_any_runner& operator=(when_any_runner const&) = delete;
     349                 :     when_any_runner& operator=(when_any_runner&&) = delete;
     350                 : 
     351             190 :     auto release() noexcept
     352                 :     {
     353             190 :         return std::exchange(h_, nullptr);
     354                 :     }
     355                 : };
     356                 : 
     357                 : /** Indexed overload for heterogeneous when_any (compile-time index).
     358                 : 
     359                 :     Uses compile-time index I for variant construction via in_place_index.
     360                 :     Called from when_any_launcher::launch_one<I>().
     361                 : */
     362                 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
     363                 : when_any_runner<StateType>
     364             105 : make_when_any_runner(Awaitable inner, StateType* state)
     365                 : {
     366                 :     using T = awaitable_result_t<Awaitable>;
     367                 :     if constexpr (std::is_void_v<T>)
     368                 :     {
     369                 :         co_await std::move(inner);
     370                 :         if(state->core_.try_win(I))
     371                 :             state->template set_winner_void<I>();
     372                 :     }
     373                 :     else
     374                 :     {
     375                 :         auto result = co_await std::move(inner);
     376                 :         if(state->core_.try_win(I))
     377                 :         {
     378                 :             try
     379                 :             {
     380                 :                 state->template set_winner_result<I>(std::move(result));
     381                 :             }
     382                 :             catch(...)
     383                 :             {
     384                 :                 state->core_.set_winner_exception(std::current_exception());
     385                 :             }
     386                 :         }
     387                 :     }
     388             210 : }
     389                 : 
     390                 : /** Runtime-index overload for homogeneous when_any (range path).
     391                 : 
     392                 :     Uses requires-expressions to detect state capabilities:
     393                 :     - set_winner_void(): for heterogeneous void tasks (stores monostate)
     394                 :     - set_winner_result(): for non-void tasks
     395                 :     - Neither: for homogeneous void tasks (no result storage)
     396                 : */
     397                 : template<IoAwaitable Awaitable, typename StateType>
     398                 : when_any_runner<StateType>
     399              85 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
     400                 : {
     401                 :     using T = awaitable_result_t<Awaitable>;
     402                 :     if constexpr (std::is_void_v<T>)
     403                 :     {
     404                 :         co_await std::move(inner);
     405                 :         if(state->core_.try_win(index))
     406                 :         {
     407                 :             if constexpr (requires { state->set_winner_void(); })
     408                 :                 state->set_winner_void();
     409                 :         }
     410                 :     }
     411                 :     else
     412                 :     {
     413                 :         auto result = co_await std::move(inner);
     414                 :         if(state->core_.try_win(index))
     415                 :         {
     416                 :             try
     417                 :             {
     418                 :                 state->set_winner_result(std::move(result));
     419                 :             }
     420                 :             catch(...)
     421                 :             {
     422                 :                 state->core_.set_winner_exception(std::current_exception());
     423                 :             }
     424                 :         }
     425                 :     }
     426             170 : }
     427                 : 
     428                 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     429                 : template<IoAwaitable... Awaitables>
     430                 : class when_any_launcher
     431                 : {
     432                 :     using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
     433                 : 
     434                 :     std::tuple<Awaitables...>* tasks_;
     435                 :     state_type* state_;
     436                 : 
     437                 : public:
     438              43 :     when_any_launcher(
     439                 :         std::tuple<Awaitables...>* tasks,
     440                 :         state_type* state)
     441              43 :         : tasks_(tasks)
     442              43 :         , state_(state)
     443                 :     {
     444              43 :     }
     445                 : 
     446              43 :     bool await_ready() const noexcept
     447                 :     {
     448              43 :         return sizeof...(Awaitables) == 0;
     449                 :     }
     450                 : 
     451                 :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     452                 :         destroys this object before await_suspend returns. Must not reference
     453                 :         `this` after the final launch_one call.
     454                 :     */
     455              43 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     456                 :     {
     457              43 :         state_->core_.continuation_ = continuation;
     458              43 :         state_->core_.caller_env_ = caller_env;
     459                 : 
     460              43 :         if(caller_env->stop_token.stop_possible())
     461                 :         {
     462              18 :             state_->core_.parent_stop_callback_.emplace(
     463               9 :                 caller_env->stop_token,
     464               9 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     465                 : 
     466               9 :             if(caller_env->stop_token.stop_requested())
     467               3 :                 state_->core_.stop_source_.request_stop();
     468                 :         }
     469                 : 
     470              43 :         auto token = state_->core_.stop_source_.get_token();
     471              86 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     472              43 :             (..., launch_one<Is>(caller_env->executor, token));
     473              43 :         }(std::index_sequence_for<Awaitables...>{});
     474                 : 
     475              86 :         return std::noop_coroutine();
     476              43 :     }
     477                 : 
     478              43 :     void await_resume() const noexcept
     479                 :     {
     480              43 :     }
     481                 : 
     482                 : private:
     483                 :     /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
     484                 :     template<std::size_t I>
     485             105 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     486                 :     {
     487             105 :         auto runner = make_when_any_runner<I>(
     488             105 :             std::move(std::get<I>(*tasks_)), state_);
     489                 : 
     490             105 :         auto h = runner.release();
     491             105 :         h.promise().state_ = state_;
     492             105 :         h.promise().index_ = I;
     493             105 :         h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
     494                 : 
     495             105 :         std::coroutine_handle<> ch{h};
     496             105 :         state_->runner_handles_[I] = ch;
     497             105 :         caller_ex.post(ch);
     498             210 :     }
     499                 : };
     500                 : 
     501                 : } // namespace detail
     502                 : 
     503                 : /** Wait for the first awaitable to complete.
     504                 : 
     505                 :     Races multiple heterogeneous awaitables concurrently and returns when the
     506                 :     first one completes. The result is a variant with one alternative per
     507                 :     input task, preserving positional correspondence.
     508                 : 
     509                 :     @par Suspends
     510                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     511                 :     are launched concurrently and execute in parallel. The coroutine resumes
     512                 :     only after all awaitables have completed, even though the winner is
     513                 :     determined by the first to finish.
     514                 : 
     515                 :     @par Completion Conditions
     516                 :     @li Winner is determined when the first awaitable completes (success or exception)
     517                 :     @li Only one task can claim winner status via atomic compare-exchange
     518                 :     @li Once a winner exists, stop is requested for all remaining siblings
     519                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     520                 :     @li The winner's result is returned; if the winner threw, the exception is rethrown
     521                 : 
     522                 :     @par Cancellation Semantics
     523                 :     Cancellation is supported via stop_token propagated through the
     524                 :     IoAwaitable protocol:
     525                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     526                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     527                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     528                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     529                 :     @li Stop requests are cooperative; tasks must check and respond to them
     530                 : 
     531                 :     @par Concurrency/Overlap
     532                 :     All awaitables are launched concurrently before any can complete.
     533                 :     The launcher iterates through the arguments, starting each task on the
     534                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     535                 :     executors or interleave on single-threaded executors. There is no
     536                 :     guaranteed ordering of task completion.
     537                 : 
     538                 :     @par Notable Error Conditions
     539                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     540                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     541                 :     @li Cancellation: tasks may complete via cancellation without throwing
     542                 : 
     543                 :     @par Example
     544                 :     @code
     545                 :     task<void> example() {
     546                 :         auto result = co_await when_any(
     547                 :             fetch_int(),      // task<int>
     548                 :             fetch_string()    // task<std::string>
     549                 :         );
     550                 :         // result.index() is 0 or 1
     551                 :         if (result.index() == 0)
     552                 :             std::cout << "Got int: " << std::get<0>(result) << "\n";
     553                 :         else
     554                 :             std::cout << "Got string: " << std::get<1>(result) << "\n";
     555                 :     }
     556                 :     @endcode
     557                 : 
     558                 :     @param as Awaitables to race concurrently (at least one required; each
     559                 :         must satisfy IoAwaitable).
     560                 :     @return A task yielding a std::variant with one alternative per awaitable.
     561                 :         Use .index() to identify the winner. Void awaitables contribute
     562                 :         std::monostate.
     563                 : 
     564                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     565                 : 
     566                 :     @par Remarks
     567                 :     Awaitables are moved into the coroutine frame; original objects become
     568                 :     empty after the call. The variant preserves one alternative per input
     569                 :     task. Use .index() to determine which awaitable completed first.
     570                 :     Void awaitables contribute std::monostate to the variant.
     571                 : 
     572                 :     @see when_all, IoAwaitable
     573                 : */
     574                 : template<IoAwaitable... As>
     575                 :     requires (sizeof...(As) > 0)
     576              43 : [[nodiscard]] auto when_any(As... as)
     577                 :     -> task<std::variant<void_to_monostate_t<awaitable_result_t<As>>...>>
     578                 : {
     579                 :     detail::when_any_state<awaitable_result_t<As>...> state;
     580                 :     std::tuple<As...> awaitable_tuple(std::move(as)...);
     581                 : 
     582                 :     co_await detail::when_any_launcher<As...>(&awaitable_tuple, &state);
     583                 : 
     584                 :     if(state.core_.winner_exception_)
     585                 :         std::rethrow_exception(state.core_.winner_exception_);
     586                 : 
     587                 :     co_return std::move(*state.result_);
     588              86 : }
     589                 : 
     590                 : /** Concept for ranges of full I/O awaitables.
     591                 : 
     592                 :     A range satisfies `IoAwaitableRange` if it is a sized input range
     593                 :     whose value type satisfies @ref IoAwaitable. This enables when_any
     594                 :     to accept any container or view of awaitables, not just std::vector.
     595                 : 
     596                 :     @tparam R The range type.
     597                 : 
     598                 :     @par Requirements
     599                 :     @li `R` must satisfy `std::ranges::input_range`
     600                 :     @li `R` must satisfy `std::ranges::sized_range`
     601                 :     @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
     602                 : 
     603                 :     @par Syntactic Requirements
     604                 :     Given `r` of type `R`:
     605                 :     @li `std::ranges::begin(r)` is valid
     606                 :     @li `std::ranges::end(r)` is valid
     607                 :     @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
     608                 :     @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
     609                 : 
     610                 :     @par Example
     611                 :     @code
     612                 :     template<IoAwaitableRange R>
     613                 :     task<void> race_all(R&& awaitables) {
     614                 :         auto winner = co_await when_any(std::forward<R>(awaitables));
     615                 :         // Process winner...
     616                 :     }
     617                 :     @endcode
     618                 : 
     619                 :     @see when_any, IoAwaitable
     620                 : */
     621                 : template<typename R>
     622                 : concept IoAwaitableRange =
     623                 :     std::ranges::input_range<R> &&
     624                 :     std::ranges::sized_range<R> &&
     625                 :     IoAwaitable<std::ranges::range_value_t<R>>;
     626                 : 
     627                 : namespace detail {
     628                 : 
     629                 : /** Shared state for homogeneous when_any (range overload).
     630                 : 
     631                 :     Uses composition with when_any_core for shared functionality.
     632                 :     Simpler than heterogeneous: optional<T> instead of variant, vector
     633                 :     instead of array for runner handles.
     634                 : */
     635                 : template<typename T>
     636                 : struct when_any_homogeneous_state
     637                 : {
     638                 :     when_any_core core_;
     639                 :     std::optional<T> result_;
     640                 :     std::vector<std::coroutine_handle<>> runner_handles_;
     641                 : 
     642              19 :     explicit when_any_homogeneous_state(std::size_t count)
     643              19 :         : core_(count)
     644              38 :         , runner_handles_(count)
     645                 :     {
     646              19 :     }
     647                 : 
     648                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     649                 : 
     650                 :     /** @pre core_.try_win() returned true. */
     651              17 :     void set_winner_result(T value)
     652                 :         noexcept(std::is_nothrow_move_constructible_v<T>)
     653                 :     {
     654              17 :         result_.emplace(std::move(value));
     655              17 :     }
     656                 : };
     657                 : 
     658                 : /** Specialization for void tasks (no result storage needed). */
     659                 : template<>
     660                 : struct when_any_homogeneous_state<void>
     661                 : {
     662                 :     when_any_core core_;
     663                 :     std::vector<std::coroutine_handle<>> runner_handles_;
     664                 : 
     665               3 :     explicit when_any_homogeneous_state(std::size_t count)
     666               3 :         : core_(count)
     667               6 :         , runner_handles_(count)
     668                 :     {
     669               3 :     }
     670                 : 
     671                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     672                 : 
     673                 :     // No set_winner_result - void tasks have no result to store
     674                 : };
     675                 : 
     676                 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     677                 : template<IoAwaitableRange Range>
     678                 : class when_any_homogeneous_launcher
     679                 : {
     680                 :     using Awaitable = std::ranges::range_value_t<Range>;
     681                 :     using T = awaitable_result_t<Awaitable>;
     682                 : 
     683                 :     Range* range_;
     684                 :     when_any_homogeneous_state<T>* state_;
     685                 : 
     686                 : public:
     687              22 :     when_any_homogeneous_launcher(
     688                 :         Range* range,
     689                 :         when_any_homogeneous_state<T>* state)
     690              22 :         : range_(range)
     691              22 :         , state_(state)
     692                 :     {
     693              22 :     }
     694                 : 
     695              22 :     bool await_ready() const noexcept
     696                 :     {
     697              22 :         return std::ranges::empty(*range_);
     698                 :     }
     699                 : 
     700                 :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     701                 :         destroys this object before await_suspend returns. Must not reference
     702                 :         `this` after dispatching begins.
     703                 : 
     704                 :         Two-phase approach:
     705                 :         1. Create all runners (safe - no dispatch yet)
     706                 :         2. Dispatch all runners (any may complete synchronously)
     707                 :     */
     708              22 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     709                 :     {
     710              22 :         state_->core_.continuation_ = continuation;
     711              22 :         state_->core_.caller_env_ = caller_env;
     712                 : 
     713              22 :         if(caller_env->stop_token.stop_possible())
     714                 :         {
     715              14 :             state_->core_.parent_stop_callback_.emplace(
     716               7 :                 caller_env->stop_token,
     717               7 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     718                 : 
     719               7 :             if(caller_env->stop_token.stop_requested())
     720               4 :                 state_->core_.stop_source_.request_stop();
     721                 :         }
     722                 : 
     723              22 :         auto token = state_->core_.stop_source_.get_token();
     724                 : 
     725                 :         // Phase 1: Create all runners without dispatching.
     726                 :         // This iterates over *range_ safely because no runners execute yet.
     727              22 :         std::size_t index = 0;
     728             107 :         for(auto&& a : *range_)
     729                 :         {
     730              85 :             auto runner = make_when_any_runner(
     731              85 :                 std::move(a), state_, index);
     732                 : 
     733              85 :             auto h = runner.release();
     734              85 :             h.promise().state_ = state_;
     735              85 :             h.promise().index_ = index;
     736              85 :             h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
     737                 : 
     738              85 :             state_->runner_handles_[index] = std::coroutine_handle<>{h};
     739              85 :             ++index;
     740                 :         }
     741                 : 
     742                 :         // Phase 2: Post all runners. Any may complete synchronously.
     743                 :         // After last post, state_ and this may be destroyed.
     744                 :         // Use raw pointer/count captured before posting.
     745              22 :         std::coroutine_handle<>* handles = state_->runner_handles_.data();
     746              22 :         std::size_t count = state_->runner_handles_.size();
     747             107 :         for(std::size_t i = 0; i < count; ++i)
     748              85 :             caller_env->executor.post(handles[i]);
     749                 : 
     750              44 :         return std::noop_coroutine();
     751             107 :     }
     752                 : 
     753              22 :     void await_resume() const noexcept
     754                 :     {
     755              22 :     }
     756                 : };
     757                 : 
     758                 : } // namespace detail
     759                 : 
     760                 : /** Wait for the first awaitable to complete (range overload).
     761                 : 
     762                 :     Races a range of awaitables with the same result type. Accepts any
     763                 :     sized input range of IoAwaitable types, enabling use with arrays,
     764                 :     spans, or custom containers.
     765                 : 
     766                 :     @par Suspends
     767                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     768                 :     in the range are launched concurrently and execute in parallel. The
     769                 :     coroutine resumes only after all awaitables have completed, even though
     770                 :     the winner is determined by the first to finish.
     771                 : 
     772                 :     @par Completion Conditions
     773                 :     @li Winner is determined when the first awaitable completes (success or exception)
     774                 :     @li Only one task can claim winner status via atomic compare-exchange
     775                 :     @li Once a winner exists, stop is requested for all remaining siblings
     776                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     777                 :     @li The winner's index and result are returned; if the winner threw, the exception is rethrown
     778                 : 
     779                 :     @par Cancellation Semantics
     780                 :     Cancellation is supported via stop_token propagated through the
     781                 :     IoAwaitable protocol:
     782                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     783                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     784                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     785                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     786                 :     @li Stop requests are cooperative; tasks must check and respond to them
     787                 : 
     788                 :     @par Concurrency/Overlap
     789                 :     All awaitables are launched concurrently before any can complete.
     790                 :     The launcher iterates through the range, starting each task on the
     791                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     792                 :     executors or interleave on single-threaded executors. There is no
     793                 :     guaranteed ordering of task completion.
     794                 : 
     795                 :     @par Notable Error Conditions
     796                 :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     797                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     798                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     799                 :     @li Cancellation: tasks may complete via cancellation without throwing
     800                 : 
     801                 :     @par Example
     802                 :     @code
     803                 :     task<void> example() {
     804                 :         std::array<task<Response>, 3> requests = {
     805                 :             fetch_from_server(0),
     806                 :             fetch_from_server(1),
     807                 :             fetch_from_server(2)
     808                 :         };
     809                 : 
     810                 :         auto [index, response] = co_await when_any(std::move(requests));
     811                 :     }
     812                 :     @endcode
     813                 : 
     814                 :     @par Example with Vector
     815                 :     @code
     816                 :     task<Response> fetch_fastest(std::vector<Server> const& servers) {
     817                 :         std::vector<task<Response>> requests;
     818                 :         for (auto const& server : servers)
     819                 :             requests.push_back(fetch_from(server));
     820                 : 
     821                 :         auto [index, response] = co_await when_any(std::move(requests));
     822                 :         co_return response;
     823                 :     }
     824                 :     @endcode
     825                 : 
     826                 :     @tparam R Range type satisfying IoAwaitableRange.
     827                 :     @param awaitables Range of awaitables to race concurrently (must not be empty).
     828                 :     @return A task yielding a pair of (winner_index, result).
     829                 : 
     830                 :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     831                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     832                 : 
     833                 :     @par Remarks
     834                 :     Elements are moved from the range; for lvalue ranges, the original
     835                 :     container will have moved-from elements after this call. The range
     836                 :     is moved onto the coroutine frame to ensure lifetime safety. Unlike
     837                 :     the variadic overload, no variant wrapper is needed since all tasks
     838                 :     share the same return type.
     839                 : 
     840                 :     @see when_any, IoAwaitableRange
     841                 : */
     842                 : template<IoAwaitableRange R>
     843                 :     requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
     844              21 : [[nodiscard]] auto when_any(R&& awaitables)
     845                 :     -> task<std::pair<std::size_t, awaitable_result_t<std::ranges::range_value_t<R>>>>
     846                 : {
     847                 :     using Awaitable = std::ranges::range_value_t<R>;
     848                 :     using T = awaitable_result_t<Awaitable>;
     849                 :     using result_type = std::pair<std::size_t, T>;
     850                 :     using OwnedRange = std::remove_cvref_t<R>;
     851                 : 
     852                 :     auto count = std::ranges::size(awaitables);
     853                 :     if(count == 0)
     854                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     855                 : 
     856                 :     // Move/copy range onto coroutine frame to ensure lifetime
     857                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     858                 : 
     859                 :     detail::when_any_homogeneous_state<T> state(count);
     860                 : 
     861                 :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
     862                 : 
     863                 :     if(state.core_.winner_exception_)
     864                 :         std::rethrow_exception(state.core_.winner_exception_);
     865                 : 
     866                 :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
     867              42 : }
     868                 : 
     869                 : /** Wait for the first awaitable to complete (void range overload).
     870                 : 
     871                 :     Races a range of void-returning awaitables. Since void awaitables have
     872                 :     no result value, only the winner's index is returned.
     873                 : 
     874                 :     @par Suspends
     875                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     876                 :     in the range are launched concurrently and execute in parallel. The
     877                 :     coroutine resumes only after all awaitables have completed, even though
     878                 :     the winner is determined by the first to finish.
     879                 : 
     880                 :     @par Completion Conditions
     881                 :     @li Winner is determined when the first awaitable completes (success or exception)
     882                 :     @li Only one task can claim winner status via atomic compare-exchange
     883                 :     @li Once a winner exists, stop is requested for all remaining siblings
     884                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     885                 :     @li The winner's index is returned; if the winner threw, the exception is rethrown
     886                 : 
     887                 :     @par Cancellation Semantics
     888                 :     Cancellation is supported via stop_token propagated through the
     889                 :     IoAwaitable protocol:
     890                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     891                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     892                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     893                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     894                 :     @li Stop requests are cooperative; tasks must check and respond to them
     895                 : 
     896                 :     @par Concurrency/Overlap
     897                 :     All awaitables are launched concurrently before any can complete.
     898                 :     The launcher iterates through the range, starting each task on the
     899                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     900                 :     executors or interleave on single-threaded executors. There is no
     901                 :     guaranteed ordering of task completion.
     902                 : 
     903                 :     @par Notable Error Conditions
     904                 :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     905                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     906                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     907                 :     @li Cancellation: tasks may complete via cancellation without throwing
     908                 : 
     909                 :     @par Example
     910                 :     @code
     911                 :     task<void> example() {
     912                 :         std::vector<task<void>> tasks;
     913                 :         for (int i = 0; i < 5; ++i)
     914                 :             tasks.push_back(background_work(i));
     915                 : 
     916                 :         std::size_t winner = co_await when_any(std::move(tasks));
     917                 :         // winner is the index of the first task to complete
     918                 :     }
     919                 :     @endcode
     920                 : 
     921                 :     @par Example with Timeout
     922                 :     @code
     923                 :     task<void> with_timeout() {
     924                 :         std::vector<task<void>> tasks;
     925                 :         tasks.push_back(long_running_operation());
     926                 :         tasks.push_back(delay(std::chrono::seconds(5)));
     927                 : 
     928                 :         std::size_t winner = co_await when_any(std::move(tasks));
     929                 :         if (winner == 1) {
     930                 :             // Timeout occurred
     931                 :         }
     932                 :     }
     933                 :     @endcode
     934                 : 
     935                 :     @tparam R Range type satisfying IoAwaitableRange with void result.
     936                 :     @param awaitables Range of void awaitables to race concurrently (must not be empty).
     937                 :     @return A task yielding the winner's index (zero-based).
     938                 : 
     939                 :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     940                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     941                 : 
     942                 :     @par Remarks
     943                 :     Elements are moved from the range; for lvalue ranges, the original
     944                 :     container will have moved-from elements after this call. The range
     945                 :     is moved onto the coroutine frame to ensure lifetime safety. Unlike
     946                 :     the non-void overload, no result storage is needed since void tasks
     947                 :     produce no value.
     948                 : 
     949                 :     @see when_any, IoAwaitableRange
     950                 : */
     951                 : template<IoAwaitableRange R>
     952                 :     requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>
     953               3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
     954                 : {
     955                 :     using OwnedRange = std::remove_cvref_t<R>;
     956                 : 
     957                 :     auto count = std::ranges::size(awaitables);
     958                 :     if(count == 0)
     959                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     960                 : 
     961                 :     // Move/copy range onto coroutine frame to ensure lifetime
     962                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     963                 : 
     964                 :     detail::when_any_homogeneous_state<void> state(count);
     965                 : 
     966                 :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
     967                 : 
     968                 :     if(state.core_.winner_exception_)
     969                 :         std::rethrow_exception(state.core_.winner_exception_);
     970                 : 
     971                 :     co_return state.core_.winner_index_;
     972               6 : }
     973                 : 
     974                 : } // namespace capy
     975                 : } // namespace boost
     976                 : 
     977                 : #endif
        

Generated by: LCOV version 2.3