include/boost/capy/io/any_read_source.hpp

91.2% Lines (83/91) 92.0% Functions (23/25)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/io_awaitable.hpp>
19 #include <boost/capy/concept/read_source.hpp>
20 #include <boost/capy/ex/io_env.hpp>
21 #include <boost/capy/io_result.hpp>
22 #include <boost/capy/io_task.hpp>
23
24 #include <concepts>
25 #include <coroutine>
26 #include <cstddef>
27 #include <exception>
28 #include <new>
29 #include <span>
30 #include <stop_token>
31 #include <system_error>
32 #include <utility>
33
34 namespace boost {
35 namespace capy {
36
37 /** Type-erased wrapper for any ReadSource.
38
39 This class provides type erasure for any type satisfying the
40 @ref ReadSource concept, enabling runtime polymorphism for
41 source read operations. It uses cached awaitable storage to achieve
42 zero steady-state allocation after construction.
43
44 The wrapper supports two construction modes:
45 - **Owning**: Pass by value to transfer ownership. The wrapper
46 allocates storage and owns the source.
47 - **Reference**: Pass a pointer to wrap without ownership. The
48 pointed-to source must outlive this wrapper.
49
50 @par Awaitable Preallocation
51 The constructor preallocates storage for the type-erased awaitable.
52 This reserves all virtual address space at server startup
53 so memory usage can be measured up front, rather than
54 allocating piecemeal as traffic arrives.
55
56 @par Immediate Completion
57 Operations complete immediately without suspending when the
58 buffer sequence is empty, or when the underlying source's
59 awaitable reports readiness via `await_ready`.
60
61 @par Thread Safety
62 Not thread-safe. Concurrent operations on the same wrapper
63 are undefined behavior.
64
65 @par Example
66 @code
67 // Owning - takes ownership of the source
68 any_read_source rs(some_source{args...});
69
70 // Reference - wraps without ownership
71 some_source source;
72 any_read_source rs(&source);
73
74 mutable_buffer buf(data, size);
75 auto [ec, n] = co_await rs.read(std::span(&buf, 1));
76 @endcode
77
78 @see any_read_stream, ReadSource
79 */
80 class any_read_source
81 {
82 struct vtable;
83 struct awaitable_ops;
84
85 template<ReadSource S>
86 struct vtable_for_impl;
87
88 void* source_ = nullptr;
89 vtable const* vt_ = nullptr;
90 void* cached_awaitable_ = nullptr;
91 void* storage_ = nullptr;
92 awaitable_ops const* active_ops_ = nullptr;
93
94 public:
95 /** Destructor.
96
97 Destroys the owned source (if any) and releases the cached
98 awaitable storage.
99 */
100 ~any_read_source();
101
102 /** Default constructor.
103
104 Constructs an empty wrapper. Operations on a default-constructed
105 wrapper result in undefined behavior.
106 */
107 any_read_source() = default;
108
109 /** Non-copyable.
110
111 The awaitable cache is per-instance and cannot be shared.
112 */
113 any_read_source(any_read_source const&) = delete;
114 any_read_source& operator=(any_read_source const&) = delete;
115
116 /** Move constructor.
117
118 Transfers ownership of the wrapped source (if owned) and
119 cached awaitable storage from `other`. After the move, `other` is
120 in a default-constructed state.
121
122 @param other The wrapper to move from.
123 */
124 1x any_read_source(any_read_source&& other) noexcept
125 1x : source_(std::exchange(other.source_, nullptr))
126 1x , vt_(std::exchange(other.vt_, nullptr))
127 1x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
128 1x , storage_(std::exchange(other.storage_, nullptr))
129 1x , active_ops_(std::exchange(other.active_ops_, nullptr))
130 {
131 1x }
132
133 /** Move assignment operator.
134
135 Destroys any owned source and releases existing resources,
136 then transfers ownership from `other`.
137
138 @param other The wrapper to move from.
139 @return Reference to this wrapper.
140 */
141 any_read_source&
142 operator=(any_read_source&& other) noexcept;
143
144 /** Construct by taking ownership of a ReadSource.
145
146 Allocates storage and moves the source into this wrapper.
147 The wrapper owns the source and will destroy it.
148
149 @param s The source to take ownership of.
150 */
151 template<ReadSource S>
152 requires (!std::same_as<std::decay_t<S>, any_read_source>)
153 any_read_source(S s);
154
155 /** Construct by wrapping a ReadSource without ownership.
156
157 Wraps the given source by pointer. The source must remain
158 valid for the lifetime of this wrapper.
159
160 @param s Pointer to the source to wrap.
161 */
162 template<ReadSource S>
163 any_read_source(S* s);
164
165 /** Check if the wrapper contains a valid source.
166
167 @return `true` if wrapping a source, `false` if default-constructed
168 or moved-from.
169 */
170 bool
171 27x has_value() const noexcept
172 {
173 27x return source_ != nullptr;
174 }
175
176 /** Check if the wrapper contains a valid source.
177
178 @return `true` if wrapping a source, `false` if default-constructed
179 or moved-from.
180 */
181 explicit
182 8x operator bool() const noexcept
183 {
184 8x return has_value();
185 }
186
187 /** Initiate a partial read operation.
188
189 Reads one or more bytes into the provided buffer sequence.
190 May fill less than the full sequence.
191
192 @param buffers The buffer sequence to read into.
193
194 @return An awaitable yielding `(error_code,std::size_t)`.
195
196 @par Immediate Completion
197 The operation completes immediately without suspending
198 the calling coroutine when:
199 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
200 @li The underlying source's awaitable reports immediate
201 readiness via `await_ready`.
202
203 @note This is a partial operation and may not process the
204 entire buffer sequence. Use @ref read for guaranteed
205 complete transfer.
206
207 @par Preconditions
208 The wrapper must contain a valid source (`has_value() == true`).
209 The caller must not call this function again after a prior
210 call returned an error (including EOF).
211 */
212 template<MutableBufferSequence MB>
213 auto
214 read_some(MB buffers);
215
216 /** Initiate a complete read operation.
217
218 Reads data into the provided buffer sequence by forwarding
219 to the underlying source's `read` operation. Large buffer
220 sequences are processed in windows, with each window
221 forwarded as a separate `read` call to the underlying source.
222 The operation completes when the entire buffer sequence is
223 filled, end-of-file is reached, or an error occurs.
224
225 @param buffers The buffer sequence to read into.
226
227 @return An awaitable yielding `(error_code,std::size_t)`.
228
229 @par Immediate Completion
230 The operation completes immediately without suspending
231 the calling coroutine when:
232 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
233 @li The underlying source's `read` awaitable reports
234 immediate readiness via `await_ready`.
235
236 @par Postconditions
237 Exactly one of the following is true on return:
238 @li **Success**: `!ec` and `n == buffer_size(buffers)`.
239 The entire buffer was filled.
240 @li **End-of-stream or Error**: `ec` and `n` indicates
241 the number of bytes transferred before the failure.
242
243 @par Preconditions
244 The wrapper must contain a valid source (`has_value() == true`).
245 The caller must not call this function again after a prior
246 call returned an error (including EOF).
247 */
248 template<MutableBufferSequence MB>
249 io_task<std::size_t>
250 read(MB buffers);
251
252 protected:
253 /** Rebind to a new source after move.
254
255 Updates the internal pointer to reference a new source object.
256 Used by owning wrappers after move assignment when the owned
257 object has moved to a new location.
258
259 @param new_source The new source to bind to. Must be the same
260 type as the original source.
261
262 @note Terminates if called with a source of different type
263 than the original.
264 */
265 template<ReadSource S>
266 void
267 rebind(S& new_source) noexcept
268 {
269 if(vt_ != &vtable_for_impl<S>::value)
270 std::terminate();
271 source_ = &new_source;
272 }
273
274 private:
275 auto
276 read_(std::span<mutable_buffer const> buffers);
277 };
278
279 //----------------------------------------------------------
280
281 // ordered by call sequence for cache line coherence
282 struct any_read_source::awaitable_ops
283 {
284 bool (*await_ready)(void*);
285 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
286 io_result<std::size_t> (*await_resume)(void*);
287 void (*destroy)(void*) noexcept;
288 };
289
290 // ordered by call frequency for cache line coherence
291 struct any_read_source::vtable
292 {
293 awaitable_ops const* (*construct_read_some_awaitable)(
294 void* source,
295 void* storage,
296 std::span<mutable_buffer const> buffers);
297 awaitable_ops const* (*construct_read_awaitable)(
298 void* source,
299 void* storage,
300 std::span<mutable_buffer const> buffers);
301 std::size_t awaitable_size;
302 std::size_t awaitable_align;
303 void (*destroy)(void*) noexcept;
304 };
305
306 template<ReadSource S>
307 struct any_read_source::vtable_for_impl
308 {
309 using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
310 std::span<mutable_buffer const>{}));
311 using ReadAwaitable = decltype(std::declval<S&>().read(
312 std::span<mutable_buffer const>{}));
313
314 static void
315 6x do_destroy_impl(void* source) noexcept
316 {
317 6x static_cast<S*>(source)->~S();
318 6x }
319
320 static awaitable_ops const*
321 52x construct_read_some_awaitable_impl(
322 void* source,
323 void* storage,
324 std::span<mutable_buffer const> buffers)
325 {
326 52x auto& s = *static_cast<S*>(source);
327 52x ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
328
329 static constexpr awaitable_ops ops = {
330 +[](void* p) {
331 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
332 },
333 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
334 return detail::call_await_suspend(
335 static_cast<ReadSomeAwaitable*>(p), h, env);
336 },
337 +[](void* p) {
338 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
339 },
340 +[](void* p) noexcept {
341 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
342 }
343 };
344 52x return &ops;
345 }
346
347 static awaitable_ops const*
348 116x construct_read_awaitable_impl(
349 void* source,
350 void* storage,
351 std::span<mutable_buffer const> buffers)
352 {
353 116x auto& s = *static_cast<S*>(source);
354 116x ::new(storage) ReadAwaitable(s.read(buffers));
355
356 static constexpr awaitable_ops ops = {
357 +[](void* p) {
358 return static_cast<ReadAwaitable*>(p)->await_ready();
359 },
360 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
361 return detail::call_await_suspend(
362 static_cast<ReadAwaitable*>(p), h, env);
363 },
364 +[](void* p) {
365 return static_cast<ReadAwaitable*>(p)->await_resume();
366 },
367 +[](void* p) noexcept {
368 static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
369 }
370 };
371 116x return &ops;
372 }
373
374 static constexpr std::size_t max_awaitable_size =
375 sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
376 ? sizeof(ReadSomeAwaitable)
377 : sizeof(ReadAwaitable);
378 static constexpr std::size_t max_awaitable_align =
379 alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
380 ? alignof(ReadSomeAwaitable)
381 : alignof(ReadAwaitable);
382
383 static constexpr vtable value = {
384 &construct_read_some_awaitable_impl,
385 &construct_read_awaitable_impl,
386 max_awaitable_size,
387 max_awaitable_align,
388 &do_destroy_impl
389 };
390 };
391
392 //----------------------------------------------------------
393
394 inline
395 145x any_read_source::~any_read_source()
396 {
397 145x if(storage_)
398 {
399 6x vt_->destroy(source_);
400 6x ::operator delete(storage_);
401 }
402 145x if(cached_awaitable_)
403 {
404 139x if(active_ops_)
405 1x active_ops_->destroy(cached_awaitable_);
406 139x ::operator delete(cached_awaitable_);
407 }
408 145x }
409
410 inline any_read_source&
411 4x any_read_source::operator=(any_read_source&& other) noexcept
412 {
413 4x if(this != &other)
414 {
415 3x if(storage_)
416 {
417 vt_->destroy(source_);
418 ::operator delete(storage_);
419 }
420 3x if(cached_awaitable_)
421 {
422 2x if(active_ops_)
423 1x active_ops_->destroy(cached_awaitable_);
424 2x ::operator delete(cached_awaitable_);
425 }
426 3x source_ = std::exchange(other.source_, nullptr);
427 3x vt_ = std::exchange(other.vt_, nullptr);
428 3x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
429 3x storage_ = std::exchange(other.storage_, nullptr);
430 3x active_ops_ = std::exchange(other.active_ops_, nullptr);
431 }
432 4x return *this;
433 }
434
435 template<ReadSource S>
436 requires (!std::same_as<std::decay_t<S>, any_read_source>)
437 6x any_read_source::any_read_source(S s)
438 6x : vt_(&vtable_for_impl<S>::value)
439 {
440 struct guard {
441 any_read_source* self;
442 bool committed = false;
443 6x ~guard() {
444 6x if(!committed && self->storage_) {
445 self->vt_->destroy(self->source_);
446 ::operator delete(self->storage_);
447 self->storage_ = nullptr;
448 self->source_ = nullptr;
449 }
450 6x }
451 6x } g{this};
452
453 6x storage_ = ::operator new(sizeof(S));
454 6x source_ = ::new(storage_) S(std::move(s));
455
456 // Preallocate the awaitable storage
457 6x cached_awaitable_ = ::operator new(vt_->awaitable_size);
458
459 6x g.committed = true;
460 6x }
461
462 template<ReadSource S>
463 135x any_read_source::any_read_source(S* s)
464 135x : source_(s)
465 135x , vt_(&vtable_for_impl<S>::value)
466 {
467 // Preallocate the awaitable storage
468 135x cached_awaitable_ = ::operator new(vt_->awaitable_size);
469 135x }
470
471 //----------------------------------------------------------
472
473 template<MutableBufferSequence MB>
474 auto
475 54x any_read_source::read_some(MB buffers)
476 {
477 struct awaitable
478 {
479 any_read_source* self_;
480 mutable_buffer_array<detail::max_iovec_> ba_;
481
482 awaitable(any_read_source* self, MB const& buffers)
483 : self_(self)
484 , ba_(buffers)
485 {
486 }
487
488 bool
489 await_ready() const noexcept
490 {
491 return ba_.to_span().empty();
492 }
493
494 std::coroutine_handle<>
495 await_suspend(std::coroutine_handle<> h, io_env const* env)
496 {
497 self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
498 self_->source_,
499 self_->cached_awaitable_,
500 ba_.to_span());
501
502 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
503 return h;
504
505 return self_->active_ops_->await_suspend(
506 self_->cached_awaitable_, h, env);
507 }
508
509 io_result<std::size_t>
510 await_resume()
511 {
512 if(ba_.to_span().empty())
513 return {{}, 0};
514
515 struct guard {
516 any_read_source* self;
517 ~guard() {
518 self->active_ops_->destroy(self->cached_awaitable_);
519 self->active_ops_ = nullptr;
520 }
521 } g{self_};
522 return self_->active_ops_->await_resume(
523 self_->cached_awaitable_);
524 }
525 };
526 54x return awaitable(this, buffers);
527 }
528
529 inline auto
530 116x any_read_source::read_(std::span<mutable_buffer const> buffers)
531 {
532 struct awaitable
533 {
534 any_read_source* self_;
535 std::span<mutable_buffer const> buffers_;
536
537 bool
538 116x await_ready() const noexcept
539 {
540 116x return false;
541 }
542
543 std::coroutine_handle<>
544 116x await_suspend(std::coroutine_handle<> h, io_env const* env)
545 {
546 232x self_->active_ops_ = self_->vt_->construct_read_awaitable(
547 116x self_->source_,
548 116x self_->cached_awaitable_,
549 buffers_);
550
551 116x if(self_->active_ops_->await_ready(self_->cached_awaitable_))
552 116x return h;
553
554 return self_->active_ops_->await_suspend(
555 self_->cached_awaitable_, h, env);
556 }
557
558 io_result<std::size_t>
559 116x await_resume()
560 {
561 struct guard {
562 any_read_source* self;
563 116x ~guard() {
564 116x self->active_ops_->destroy(self->cached_awaitable_);
565 116x self->active_ops_ = nullptr;
566 116x }
567 116x } g{self_};
568 116x return self_->active_ops_->await_resume(
569 200x self_->cached_awaitable_);
570 116x }
571 };
572 116x return awaitable{this, buffers};
573 }
574
575 template<MutableBufferSequence MB>
576 io_task<std::size_t>
577 110x any_read_source::read(MB buffers)
578 {
579 buffer_param bp(buffers);
580 std::size_t total = 0;
581
582 for(;;)
583 {
584 auto bufs = bp.data();
585 if(bufs.empty())
586 break;
587
588 auto [ec, n] = co_await read_(bufs);
589 total += n;
590 if(ec)
591 co_return {ec, total};
592 bp.consume(n);
593 }
594
595 co_return {{}, total};
596 220x }
597
598 } // namespace capy
599 } // namespace boost
600
601 #endif
602