include/boost/capy/io/any_write_sink.hpp

93.2% Lines (164/176) 93.3% Functions (42/45)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/io_awaitable.hpp>
19 #include <boost/capy/concept/write_sink.hpp>
20 #include <coroutine>
21 #include <boost/capy/ex/io_env.hpp>
22 #include <boost/capy/io_result.hpp>
23 #include <boost/capy/io_task.hpp>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <exception>
29 #include <new>
30 #include <span>
31 #include <stop_token>
32 #include <system_error>
33 #include <utility>
34
35 namespace boost {
36 namespace capy {
37
38 /** Type-erased wrapper for any WriteSink.
39
40 This class provides type erasure for any type satisfying the
41 @ref WriteSink concept, enabling runtime polymorphism for
42 sink write operations. It uses cached awaitable storage to achieve
43 zero steady-state allocation after construction.
44
45 The wrapper supports two construction modes:
46 - **Owning**: Pass by value to transfer ownership. The wrapper
47 allocates storage and owns the sink.
48 - **Reference**: Pass a pointer to wrap without ownership. The
49 pointed-to sink must outlive this wrapper.
50
51 @par Awaitable Preallocation
52 The constructor preallocates storage for the type-erased awaitable.
53 This reserves all virtual address space at server startup
54 so memory usage can be measured up front, rather than
55 allocating piecemeal as traffic arrives.
56
57 @par Immediate Completion
58 Operations complete immediately without suspending when the
59 buffer sequence is empty, or when the underlying sink's
60 awaitable reports readiness via `await_ready`.
61
62 @par Thread Safety
63 Not thread-safe. Concurrent operations on the same wrapper
64 are undefined behavior.
65
66 @par Example
67 @code
68 // Owning - takes ownership of the sink
69 any_write_sink ws(some_sink{args...});
70
71 // Reference - wraps without ownership
72 some_sink sink;
73 any_write_sink ws(&sink);
74
75 const_buffer buf(data, size);
76 auto [ec, n] = co_await ws.write(std::span(&buf, 1));
77 auto [ec2] = co_await ws.write_eof();
78 @endcode
79
80 @see any_write_stream, WriteSink
81 */
82 class any_write_sink
83 {
84 struct vtable;
85 struct write_awaitable_ops;
86 struct eof_awaitable_ops;
87
88 template<WriteSink S>
89 struct vtable_for_impl;
90
91 void* sink_ = nullptr;
92 vtable const* vt_ = nullptr;
93 void* cached_awaitable_ = nullptr;
94 void* storage_ = nullptr;
95 write_awaitable_ops const* active_write_ops_ = nullptr;
96 eof_awaitable_ops const* active_eof_ops_ = nullptr;
97
98 public:
99 /** Destructor.
100
101 Destroys the owned sink (if any) and releases the cached
102 awaitable storage.
103 */
104 ~any_write_sink();
105
106 /** Default constructor.
107
108 Constructs an empty wrapper. Operations on a default-constructed
109 wrapper result in undefined behavior.
110 */
111 any_write_sink() = default;
112
113 /** Non-copyable.
114
115 The awaitable cache is per-instance and cannot be shared.
116 */
117 any_write_sink(any_write_sink const&) = delete;
118 any_write_sink& operator=(any_write_sink const&) = delete;
119
120 /** Move constructor.
121
122 Transfers ownership of the wrapped sink (if owned) and
123 cached awaitable storage from `other`. After the move, `other` is
124 in a default-constructed state.
125
126 @param other The wrapper to move from.
127 */
128 1x any_write_sink(any_write_sink&& other) noexcept
129 1x : sink_(std::exchange(other.sink_, nullptr))
130 1x , vt_(std::exchange(other.vt_, nullptr))
131 1x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132 1x , storage_(std::exchange(other.storage_, nullptr))
133 1x , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
134 1x , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
135 {
136 1x }
137
138 /** Move assignment operator.
139
140 Destroys any owned sink and releases existing resources,
141 then transfers ownership from `other`.
142
143 @param other The wrapper to move from.
144 @return Reference to this wrapper.
145 */
146 any_write_sink&
147 operator=(any_write_sink&& other) noexcept;
148
149 /** Construct by taking ownership of a WriteSink.
150
151 Allocates storage and moves the sink into this wrapper.
152 The wrapper owns the sink and will destroy it.
153
154 @param s The sink to take ownership of.
155 */
156 template<WriteSink S>
157 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
158 any_write_sink(S s);
159
160 /** Construct by wrapping a WriteSink without ownership.
161
162 Wraps the given sink by pointer. The sink must remain
163 valid for the lifetime of this wrapper.
164
165 @param s Pointer to the sink to wrap.
166 */
167 template<WriteSink S>
168 any_write_sink(S* s);
169
170 /** Check if the wrapper contains a valid sink.
171
172 @return `true` if wrapping a sink, `false` if default-constructed
173 or moved-from.
174 */
175 bool
176 15x has_value() const noexcept
177 {
178 15x return sink_ != nullptr;
179 }
180
181 /** Check if the wrapper contains a valid sink.
182
183 @return `true` if wrapping a sink, `false` if default-constructed
184 or moved-from.
185 */
186 explicit
187 2x operator bool() const noexcept
188 {
189 2x return has_value();
190 }
191
192 /** Initiate a partial write operation.
193
194 Writes one or more bytes from the provided buffer sequence.
195 May consume less than the full sequence.
196
197 @param buffers The buffer sequence containing data to write.
198
199 @return An awaitable yielding `(error_code,std::size_t)`.
200
201 @par Immediate Completion
202 The operation completes immediately without suspending
203 the calling coroutine when:
204 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
205 @li The underlying sink's awaitable reports immediate
206 readiness via `await_ready`.
207
208 @note This is a partial operation and may not process the
209 entire buffer sequence. Use @ref write for guaranteed
210 complete transfer.
211
212 @par Preconditions
213 The wrapper must contain a valid sink (`has_value() == true`).
214 */
215 template<ConstBufferSequence CB>
216 auto
217 write_some(CB buffers);
218
219 /** Initiate a complete write operation.
220
221 Writes data from the provided buffer sequence. The operation
222 completes when all bytes have been consumed, or an error
223 occurs. Forwards to the underlying sink's `write` operation,
224 windowed through @ref buffer_param when the sequence exceeds
225 the per-call buffer limit.
226
227 @param buffers The buffer sequence containing data to write.
228
229 @return An awaitable yielding `(error_code,std::size_t)`.
230
231 @par Immediate Completion
232 The operation completes immediately without suspending
233 the calling coroutine when:
234 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
235 @li Every underlying `write` call completes
236 immediately (the wrapped sink reports readiness
237 via `await_ready` on each iteration).
238
239 @par Preconditions
240 The wrapper must contain a valid sink (`has_value() == true`).
241 */
242 template<ConstBufferSequence CB>
243 io_task<std::size_t>
244 write(CB buffers);
245
246 /** Atomically write data and signal end-of-stream.
247
248 Writes all data from the buffer sequence and then signals
249 end-of-stream. The implementation decides how to partition
250 the data across calls to the underlying sink's @ref write
251 and `write_eof`. When the caller's buffer sequence is
252 non-empty, the final call to the underlying sink is always
253 `write_eof` with a non-empty buffer sequence. When the
254 caller's buffer sequence is empty, only `write_eof()` with
255 no data is called.
256
257 @param buffers The buffer sequence containing data to write.
258
259 @return An awaitable yielding `(error_code,std::size_t)`.
260
261 @par Immediate Completion
262 The operation completes immediately without suspending
263 the calling coroutine when:
264 @li The buffer sequence is empty. Only the @ref write_eof()
265 call is performed.
266 @li All underlying operations complete immediately (the
267 wrapped sink reports readiness via `await_ready`).
268
269 @par Preconditions
270 The wrapper must contain a valid sink (`has_value() == true`).
271 */
272 template<ConstBufferSequence CB>
273 io_task<std::size_t>
274 write_eof(CB buffers);
275
276 /** Signal end of data.
277
278 Indicates that no more data will be written to the sink.
279 The operation completes when the sink is finalized, or
280 an error occurs.
281
282 @return An awaitable yielding `(error_code)`.
283
284 @par Immediate Completion
285 The operation completes immediately without suspending
286 the calling coroutine when the underlying sink's awaitable
287 reports immediate readiness via `await_ready`.
288
289 @par Preconditions
290 The wrapper must contain a valid sink (`has_value() == true`).
291 */
292 auto
293 write_eof();
294
295 protected:
296 /** Rebind to a new sink after move.
297
298 Updates the internal pointer to reference a new sink object.
299 Used by owning wrappers after move assignment when the owned
300 object has moved to a new location.
301
302 @param new_sink The new sink to bind to. Must be the same
303 type as the original sink.
304
305 @note Terminates if called with a sink of different type
306 than the original.
307 */
308 template<WriteSink S>
309 void
310 rebind(S& new_sink) noexcept
311 {
312 if(vt_ != &vtable_for_impl<S>::value)
313 std::terminate();
314 sink_ = &new_sink;
315 }
316
317 private:
318 auto
319 write_some_(std::span<const_buffer const> buffers);
320
321 auto
322 write_(std::span<const_buffer const> buffers);
323
324 auto
325 write_eof_buffers_(std::span<const_buffer const> buffers);
326 };
327
328 //----------------------------------------------------------
329
330 struct any_write_sink::write_awaitable_ops
331 {
332 bool (*await_ready)(void*);
333 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
334 io_result<std::size_t> (*await_resume)(void*);
335 void (*destroy)(void*) noexcept;
336 };
337
338 struct any_write_sink::eof_awaitable_ops
339 {
340 bool (*await_ready)(void*);
341 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
342 io_result<> (*await_resume)(void*);
343 void (*destroy)(void*) noexcept;
344 };
345
346 struct any_write_sink::vtable
347 {
348 write_awaitable_ops const* (*construct_write_some_awaitable)(
349 void* sink,
350 void* storage,
351 std::span<const_buffer const> buffers);
352 write_awaitable_ops const* (*construct_write_awaitable)(
353 void* sink,
354 void* storage,
355 std::span<const_buffer const> buffers);
356 write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
357 void* sink,
358 void* storage,
359 std::span<const_buffer const> buffers);
360 eof_awaitable_ops const* (*construct_eof_awaitable)(
361 void* sink,
362 void* storage);
363 std::size_t awaitable_size;
364 std::size_t awaitable_align;
365 void (*destroy)(void*) noexcept;
366 };
367
368 template<WriteSink S>
369 struct any_write_sink::vtable_for_impl
370 {
371 using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
372 std::span<const_buffer const>{}));
373 using WriteAwaitable = decltype(std::declval<S&>().write(
374 std::span<const_buffer const>{}));
375 using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
376 std::span<const_buffer const>{}));
377 using EofAwaitable = decltype(std::declval<S&>().write_eof());
378
379 static void
380 6x do_destroy_impl(void* sink) noexcept
381 {
382 6x static_cast<S*>(sink)->~S();
383 6x }
384
385 static write_awaitable_ops const*
386 40x construct_write_some_awaitable_impl(
387 void* sink,
388 void* storage,
389 std::span<const_buffer const> buffers)
390 {
391 40x auto& s = *static_cast<S*>(sink);
392 40x ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
393
394 static constexpr write_awaitable_ops ops = {
395 +[](void* p) {
396 return static_cast<WriteSomeAwaitable*>(p)->await_ready();
397 },
398 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
399 return detail::call_await_suspend(
400 static_cast<WriteSomeAwaitable*>(p), h, env);
401 },
402 +[](void* p) {
403 return static_cast<WriteSomeAwaitable*>(p)->await_resume();
404 },
405 +[](void* p) noexcept {
406 static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
407 }
408 };
409 40x return &ops;
410 }
411
412 static write_awaitable_ops const*
413 78x construct_write_awaitable_impl(
414 void* sink,
415 void* storage,
416 std::span<const_buffer const> buffers)
417 {
418 78x auto& s = *static_cast<S*>(sink);
419 78x ::new(storage) WriteAwaitable(s.write(buffers));
420
421 static constexpr write_awaitable_ops ops = {
422 +[](void* p) {
423 return static_cast<WriteAwaitable*>(p)->await_ready();
424 },
425 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
426 return detail::call_await_suspend(
427 static_cast<WriteAwaitable*>(p), h, env);
428 },
429 +[](void* p) {
430 return static_cast<WriteAwaitable*>(p)->await_resume();
431 },
432 +[](void* p) noexcept {
433 static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
434 }
435 };
436 78x return &ops;
437 }
438
439 static write_awaitable_ops const*
440 16x construct_write_eof_buffers_awaitable_impl(
441 void* sink,
442 void* storage,
443 std::span<const_buffer const> buffers)
444 {
445 16x auto& s = *static_cast<S*>(sink);
446 16x ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
447
448 static constexpr write_awaitable_ops ops = {
449 +[](void* p) {
450 return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
451 },
452 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
453 return detail::call_await_suspend(
454 static_cast<WriteEofBuffersAwaitable*>(p), h, env);
455 },
456 +[](void* p) {
457 return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
458 },
459 +[](void* p) noexcept {
460 static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
461 }
462 };
463 16x return &ops;
464 }
465
466 static eof_awaitable_ops const*
467 17x construct_eof_awaitable_impl(
468 void* sink,
469 void* storage)
470 {
471 17x auto& s = *static_cast<S*>(sink);
472 17x ::new(storage) EofAwaitable(s.write_eof());
473
474 static constexpr eof_awaitable_ops ops = {
475 +[](void* p) {
476 return static_cast<EofAwaitable*>(p)->await_ready();
477 },
478 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
479 return detail::call_await_suspend(
480 static_cast<EofAwaitable*>(p), h, env);
481 },
482 +[](void* p) {
483 return static_cast<EofAwaitable*>(p)->await_resume();
484 },
485 +[](void* p) noexcept {
486 static_cast<EofAwaitable*>(p)->~EofAwaitable();
487 }
488 };
489 17x return &ops;
490 }
491
492 static constexpr std::size_t max4(
493 std::size_t a, std::size_t b,
494 std::size_t c, std::size_t d) noexcept
495 {
496 std::size_t ab = a > b ? a : b;
497 std::size_t cd = c > d ? c : d;
498 return ab > cd ? ab : cd;
499 }
500
501 static constexpr std::size_t max_awaitable_size =
502 max4(sizeof(WriteSomeAwaitable),
503 sizeof(WriteAwaitable),
504 sizeof(WriteEofBuffersAwaitable),
505 sizeof(EofAwaitable));
506
507 static constexpr std::size_t max_awaitable_align =
508 max4(alignof(WriteSomeAwaitable),
509 alignof(WriteAwaitable),
510 alignof(WriteEofBuffersAwaitable),
511 alignof(EofAwaitable));
512
513 static constexpr vtable value = {
514 &construct_write_some_awaitable_impl,
515 &construct_write_awaitable_impl,
516 &construct_write_eof_buffers_awaitable_impl,
517 &construct_eof_awaitable_impl,
518 max_awaitable_size,
519 max_awaitable_align,
520 &do_destroy_impl
521 };
522 };
523
524 //----------------------------------------------------------
525
526 inline
527 129x any_write_sink::~any_write_sink()
528 {
529 129x if(storage_)
530 {
531 6x vt_->destroy(sink_);
532 6x ::operator delete(storage_);
533 }
534 129x if(cached_awaitable_)
535 {
536 124x if(active_write_ops_)
537 1x active_write_ops_->destroy(cached_awaitable_);
538 123x else if(active_eof_ops_)
539 1x active_eof_ops_->destroy(cached_awaitable_);
540 124x ::operator delete(cached_awaitable_);
541 }
542 129x }
543
544 inline any_write_sink&
545 2x any_write_sink::operator=(any_write_sink&& other) noexcept
546 {
547 2x if(this != &other)
548 {
549 2x if(storage_)
550 {
551 vt_->destroy(sink_);
552 ::operator delete(storage_);
553 }
554 2x if(cached_awaitable_)
555 {
556 1x if(active_write_ops_)
557 1x active_write_ops_->destroy(cached_awaitable_);
558 else if(active_eof_ops_)
559 active_eof_ops_->destroy(cached_awaitable_);
560 1x ::operator delete(cached_awaitable_);
561 }
562 2x sink_ = std::exchange(other.sink_, nullptr);
563 2x vt_ = std::exchange(other.vt_, nullptr);
564 2x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
565 2x storage_ = std::exchange(other.storage_, nullptr);
566 2x active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
567 2x active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
568 }
569 2x return *this;
570 }
571
572 template<WriteSink S>
573 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
574 6x any_write_sink::any_write_sink(S s)
575 6x : vt_(&vtable_for_impl<S>::value)
576 {
577 struct guard {
578 any_write_sink* self;
579 bool committed = false;
580 6x ~guard() {
581 6x if(!committed && self->storage_) {
582 self->vt_->destroy(self->sink_);
583 ::operator delete(self->storage_);
584 self->storage_ = nullptr;
585 self->sink_ = nullptr;
586 }
587 6x }
588 6x } g{this};
589
590 6x storage_ = ::operator new(sizeof(S));
591 6x sink_ = ::new(storage_) S(std::move(s));
592
593 // Preallocate the awaitable storage (sized for max of write/eof)
594 6x cached_awaitable_ = ::operator new(vt_->awaitable_size);
595
596 6x g.committed = true;
597 6x }
598
599 template<WriteSink S>
600 119x any_write_sink::any_write_sink(S* s)
601 119x : sink_(s)
602 119x , vt_(&vtable_for_impl<S>::value)
603 {
604 // Preallocate the awaitable storage (sized for max of write/eof)
605 119x cached_awaitable_ = ::operator new(vt_->awaitable_size);
606 119x }
607
608 //----------------------------------------------------------
609
610 inline auto
611 any_write_sink::write_some_(
612 std::span<const_buffer const> buffers)
613 {
614 struct awaitable
615 {
616 any_write_sink* self_;
617 std::span<const_buffer const> buffers_;
618
619 bool
620 await_ready() const noexcept
621 {
622 return false;
623 }
624
625 std::coroutine_handle<>
626 await_suspend(std::coroutine_handle<> h, io_env const* env)
627 {
628 self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
629 self_->sink_,
630 self_->cached_awaitable_,
631 buffers_);
632
633 if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
634 return h;
635
636 return self_->active_write_ops_->await_suspend(
637 self_->cached_awaitable_, h, env);
638 }
639
640 io_result<std::size_t>
641 await_resume()
642 {
643 struct guard {
644 any_write_sink* self;
645 ~guard() {
646 self->active_write_ops_->destroy(self->cached_awaitable_);
647 self->active_write_ops_ = nullptr;
648 }
649 } g{self_};
650 return self_->active_write_ops_->await_resume(
651 self_->cached_awaitable_);
652 }
653 };
654 return awaitable{this, buffers};
655 }
656
657 inline auto
658 78x any_write_sink::write_(
659 std::span<const_buffer const> buffers)
660 {
661 struct awaitable
662 {
663 any_write_sink* self_;
664 std::span<const_buffer const> buffers_;
665
666 bool
667 78x await_ready() const noexcept
668 {
669 78x return false;
670 }
671
672 std::coroutine_handle<>
673 78x await_suspend(std::coroutine_handle<> h, io_env const* env)
674 {
675 156x self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
676 78x self_->sink_,
677 78x self_->cached_awaitable_,
678 buffers_);
679
680 78x if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
681 78x return h;
682
683 return self_->active_write_ops_->await_suspend(
684 self_->cached_awaitable_, h, env);
685 }
686
687 io_result<std::size_t>
688 78x await_resume()
689 {
690 struct guard {
691 any_write_sink* self;
692 78x ~guard() {
693 78x self->active_write_ops_->destroy(self->cached_awaitable_);
694 78x self->active_write_ops_ = nullptr;
695 78x }
696 78x } g{self_};
697 78x return self_->active_write_ops_->await_resume(
698 135x self_->cached_awaitable_);
699 78x }
700 };
701 78x return awaitable{this, buffers};
702 }
703
704 inline auto
705 17x any_write_sink::write_eof()
706 {
707 struct awaitable
708 {
709 any_write_sink* self_;
710
711 bool
712 17x await_ready() const noexcept
713 {
714 17x return false;
715 }
716
717 std::coroutine_handle<>
718 17x await_suspend(std::coroutine_handle<> h, io_env const* env)
719 {
720 // Construct the underlying awaitable into cached storage
721 34x self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
722 17x self_->sink_,
723 17x self_->cached_awaitable_);
724
725 // Check if underlying is immediately ready
726 17x if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
727 16x return h;
728
729 // Forward to underlying awaitable
730 1x return self_->active_eof_ops_->await_suspend(
731 1x self_->cached_awaitable_, h, env);
732 }
733
734 io_result<>
735 16x await_resume()
736 {
737 struct guard {
738 any_write_sink* self;
739 16x ~guard() {
740 16x self->active_eof_ops_->destroy(self->cached_awaitable_);
741 16x self->active_eof_ops_ = nullptr;
742 16x }
743 16x } g{self_};
744 16x return self_->active_eof_ops_->await_resume(
745 27x self_->cached_awaitable_);
746 16x }
747 };
748 17x return awaitable{this};
749 }
750
751 inline auto
752 16x any_write_sink::write_eof_buffers_(
753 std::span<const_buffer const> buffers)
754 {
755 struct awaitable
756 {
757 any_write_sink* self_;
758 std::span<const_buffer const> buffers_;
759
760 bool
761 16x await_ready() const noexcept
762 {
763 16x return false;
764 }
765
766 std::coroutine_handle<>
767 16x await_suspend(std::coroutine_handle<> h, io_env const* env)
768 {
769 32x self_->active_write_ops_ =
770 32x self_->vt_->construct_write_eof_buffers_awaitable(
771 16x self_->sink_,
772 16x self_->cached_awaitable_,
773 buffers_);
774
775 16x if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
776 16x return h;
777
778 return self_->active_write_ops_->await_suspend(
779 self_->cached_awaitable_, h, env);
780 }
781
782 io_result<std::size_t>
783 16x await_resume()
784 {
785 struct guard {
786 any_write_sink* self;
787 16x ~guard() {
788 16x self->active_write_ops_->destroy(self->cached_awaitable_);
789 16x self->active_write_ops_ = nullptr;
790 16x }
791 16x } g{self_};
792 16x return self_->active_write_ops_->await_resume(
793 27x self_->cached_awaitable_);
794 16x }
795 };
796 16x return awaitable{this, buffers};
797 }
798
799 template<ConstBufferSequence CB>
800 auto
801 42x any_write_sink::write_some(CB buffers)
802 {
803 struct awaitable
804 {
805 any_write_sink* self_;
806 const_buffer_array<detail::max_iovec_> ba_;
807
808 42x awaitable(
809 any_write_sink* self,
810 CB const& buffers)
811 42x : self_(self)
812 42x , ba_(buffers)
813 {
814 42x }
815
816 bool
817 42x await_ready() const noexcept
818 {
819 42x return ba_.to_span().empty();
820 }
821
822 std::coroutine_handle<>
823 40x await_suspend(std::coroutine_handle<> h, io_env const* env)
824 {
825 40x self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
826 40x self_->sink_,
827 40x self_->cached_awaitable_,
828 40x ba_.to_span());
829
830 40x if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
831 38x return h;
832
833 2x return self_->active_write_ops_->await_suspend(
834 2x self_->cached_awaitable_, h, env);
835 }
836
837 io_result<std::size_t>
838 40x await_resume()
839 {
840 40x if(ba_.to_span().empty())
841 2x return {{}, 0};
842
843 struct guard {
844 any_write_sink* self;
845 38x ~guard() {
846 38x self->active_write_ops_->destroy(self->cached_awaitable_);
847 38x self->active_write_ops_ = nullptr;
848 38x }
849 38x } g{self_};
850 38x return self_->active_write_ops_->await_resume(
851 38x self_->cached_awaitable_);
852 38x }
853 };
854 42x return awaitable{this, buffers};
855 }
856
857 template<ConstBufferSequence CB>
858 io_task<std::size_t>
859 68x any_write_sink::write(CB buffers)
860 {
861 buffer_param<CB> bp(buffers);
862 std::size_t total = 0;
863
864 for(;;)
865 {
866 auto bufs = bp.data();
867 if(bufs.empty())
868 break;
869
870 auto [ec, n] = co_await write_(bufs);
871 total += n;
872 if(ec)
873 co_return {ec, total};
874 bp.consume(n);
875 }
876
877 co_return {{}, total};
878 136x }
879
880 template<ConstBufferSequence CB>
881 io_task<std::size_t>
882 26x any_write_sink::write_eof(CB buffers)
883 {
884 const_buffer_param<CB> bp(buffers);
885 std::size_t total = 0;
886
887 for(;;)
888 {
889 auto bufs = bp.data();
890 if(bufs.empty())
891 {
892 auto [ec] = co_await write_eof();
893 co_return {ec, total};
894 }
895
896 if(! bp.more())
897 {
898 // Last window — send atomically with EOF
899 auto [ec, n] = co_await write_eof_buffers_(bufs);
900 total += n;
901 co_return {ec, total};
902 }
903
904 auto [ec, n] = co_await write_(bufs);
905 total += n;
906 if(ec)
907 co_return {ec, total};
908 bp.consume(n);
909 }
910 52x }
911
912 } // namespace capy
913 } // namespace boost
914
915 #endif
916