include/boost/capy/io/any_read_stream.hpp

87.4% Lines (83/95) 81.6% Functions (31/38)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/read_stream.hpp>
19 #include <boost/capy/ex/io_env.hpp>
20 #include <boost/capy/io_result.hpp>
21
22 #include <concepts>
23 #include <coroutine>
24 #include <cstddef>
25 #include <exception>
26 #include <new>
27 #include <span>
28 #include <stop_token>
29 #include <system_error>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any ReadStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref ReadStream concept, enabling runtime polymorphism for
39 read operations. It uses cached awaitable storage to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Awaitable Preallocation
49 The constructor preallocates storage for the type-erased awaitable.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Immediate Completion
55 When the underlying stream's awaitable reports ready immediately
56 (e.g. buffered data already available), the wrapper skips
57 coroutine suspension entirely and returns the result inline.
58
59 @par Thread Safety
60 Not thread-safe. Concurrent operations on the same wrapper
61 are undefined behavior.
62
63 @par Example
64 @code
65 // Owning - takes ownership of the stream
66 any_read_stream stream(socket{ioc});
67
68 // Reference - wraps without ownership
69 socket sock(ioc);
70 any_read_stream stream(&sock);
71
72 mutable_buffer buf(data, size);
73 auto [ec, n] = co_await stream.read_some(buf);
74 @endcode
75
76 @see any_write_stream, any_stream, ReadStream
77 */
78 class any_read_stream
79 {
80 struct vtable;
81
82 template<ReadStream S>
83 struct vtable_for_impl;
84
85 // ordered for cache line coherence
86 void* stream_ = nullptr;
87 vtable const* vt_ = nullptr;
88 void* cached_awaitable_ = nullptr;
89 void* storage_ = nullptr;
90 bool awaitable_active_ = false;
91
92 public:
93 /** Destructor.
94
95 Destroys the owned stream (if any) and releases the cached
96 awaitable storage.
97 */
98 ~any_read_stream();
99
100 /** Default constructor.
101
102 Constructs an empty wrapper. Operations on a default-constructed
103 wrapper result in undefined behavior.
104 */
105 1x any_read_stream() = default;
106
107 /** Non-copyable.
108
109 The awaitable cache is per-instance and cannot be shared.
110 */
111 any_read_stream(any_read_stream const&) = delete;
112 any_read_stream& operator=(any_read_stream const&) = delete;
113
114 /** Move constructor.
115
116 Transfers ownership of the wrapped stream (if owned) and
117 cached awaitable storage from `other`. After the move, `other` is
118 in a default-constructed state.
119
120 @param other The wrapper to move from.
121 */
122 2x any_read_stream(any_read_stream&& other) noexcept
123 2x : stream_(std::exchange(other.stream_, nullptr))
124 2x , vt_(std::exchange(other.vt_, nullptr))
125 2x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126 2x , storage_(std::exchange(other.storage_, nullptr))
127 2x , awaitable_active_(std::exchange(other.awaitable_active_, false))
128 {
129 2x }
130
131 /** Move assignment operator.
132
133 Destroys any owned stream and releases existing resources,
134 then transfers ownership from `other`.
135
136 @param other The wrapper to move from.
137 @return Reference to this wrapper.
138 */
139 any_read_stream&
140 operator=(any_read_stream&& other) noexcept;
141
142 /** Construct by taking ownership of a ReadStream.
143
144 Allocates storage and moves the stream into this wrapper.
145 The wrapper owns the stream and will destroy it.
146
147 @param s The stream to take ownership of.
148 */
149 template<ReadStream S>
150 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
151 any_read_stream(S s);
152
153 /** Construct by wrapping a ReadStream without ownership.
154
155 Wraps the given stream by pointer. The stream must remain
156 valid for the lifetime of this wrapper.
157
158 @param s Pointer to the stream to wrap.
159 */
160 template<ReadStream S>
161 any_read_stream(S* s);
162
163 /** Check if the wrapper contains a valid stream.
164
165 @return `true` if wrapping a stream, `false` if default-constructed
166 or moved-from.
167 */
168 bool
169 25x has_value() const noexcept
170 {
171 25x return stream_ != nullptr;
172 }
173
174 /** Check if the wrapper contains a valid stream.
175
176 @return `true` if wrapping a stream, `false` if default-constructed
177 or moved-from.
178 */
179 explicit
180 3x operator bool() const noexcept
181 {
182 3x return has_value();
183 }
184
185 /** Initiate an asynchronous read operation.
186
187 Reads data into the provided buffer sequence. The operation
188 completes when at least one byte has been read, or an error
189 occurs.
190
191 @param buffers The buffer sequence to read into. Passed by
192 value to ensure the sequence lives in the coroutine frame
193 across suspension points.
194
195 @return An awaitable yielding `(error_code,std::size_t)`.
196
197 @par Immediate Completion
198 The operation completes immediately without suspending
199 the calling coroutine when the underlying stream's
200 awaitable reports immediate readiness via `await_ready`.
201
202 @note This is a partial operation and may not process the
203 entire buffer sequence. Use the composed @ref read algorithm
204 for guaranteed complete transfer.
205
206 @par Preconditions
207 The wrapper must contain a valid stream (`has_value() == true`).
208 The caller must not call this function again after a prior
209 call returned an error (including EOF).
210 */
211 template<MutableBufferSequence MB>
212 auto
213 read_some(MB buffers);
214
215 protected:
216 /** Rebind to a new stream after move.
217
218 Updates the internal pointer to reference a new stream object.
219 Used by owning wrappers after move assignment when the owned
220 object has moved to a new location.
221
222 @param new_stream The new stream to bind to. Must be the same
223 type as the original stream.
224
225 @note Terminates if called with a stream of different type
226 than the original.
227 */
228 template<ReadStream S>
229 void
230 rebind(S& new_stream) noexcept
231 {
232 if(vt_ != &vtable_for_impl<S>::value)
233 std::terminate();
234 stream_ = &new_stream;
235 }
236 };
237
238 //----------------------------------------------------------
239
240 struct any_read_stream::vtable
241 {
242 // ordered by call frequency for cache line coherence
243 void (*construct_awaitable)(
244 void* stream,
245 void* storage,
246 std::span<mutable_buffer const> buffers);
247 bool (*await_ready)(void*);
248 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
249 io_result<std::size_t> (*await_resume)(void*);
250 void (*destroy_awaitable)(void*) noexcept;
251 std::size_t awaitable_size;
252 std::size_t awaitable_align;
253 void (*destroy)(void*) noexcept;
254 };
255
256 template<ReadStream S>
257 struct any_read_stream::vtable_for_impl
258 {
259 using Awaitable = decltype(std::declval<S&>().read_some(
260 std::span<mutable_buffer const>{}));
261
262 static void
263 1x do_destroy_impl(void* stream) noexcept
264 {
265 1x static_cast<S*>(stream)->~S();
266 1x }
267
268 static void
269 91x construct_awaitable_impl(
270 void* stream,
271 void* storage,
272 std::span<mutable_buffer const> buffers)
273 {
274 91x auto& s = *static_cast<S*>(stream);
275 91x ::new(storage) Awaitable(s.read_some(buffers));
276 91x }
277
278 static constexpr vtable value = {
279 &construct_awaitable_impl,
280 91x +[](void* p) {
281 91x return static_cast<Awaitable*>(p)->await_ready();
282 },
283 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
284 return detail::call_await_suspend(
285 static_cast<Awaitable*>(p), h, env);
286 },
287 89x +[](void* p) {
288 89x return static_cast<Awaitable*>(p)->await_resume();
289 },
290 93x +[](void* p) noexcept {
291 16x static_cast<Awaitable*>(p)->~Awaitable();
292 },
293 sizeof(Awaitable),
294 alignof(Awaitable),
295 &do_destroy_impl
296 };
297 };
298
299 //----------------------------------------------------------
300
301 inline
302 101x any_read_stream::~any_read_stream()
303 {
304 101x if(storage_)
305 {
306 1x vt_->destroy(stream_);
307 1x ::operator delete(storage_);
308 }
309 101x if(cached_awaitable_)
310 {
311 91x if(awaitable_active_)
312 1x vt_->destroy_awaitable(cached_awaitable_);
313 91x ::operator delete(cached_awaitable_);
314 }
315 101x }
316
317 inline any_read_stream&
318 5x any_read_stream::operator=(any_read_stream&& other) noexcept
319 {
320 5x if(this != &other)
321 {
322 5x if(storage_)
323 {
324 vt_->destroy(stream_);
325 ::operator delete(storage_);
326 }
327 5x if(cached_awaitable_)
328 {
329 2x if(awaitable_active_)
330 1x vt_->destroy_awaitable(cached_awaitable_);
331 2x ::operator delete(cached_awaitable_);
332 }
333 5x stream_ = std::exchange(other.stream_, nullptr);
334 5x vt_ = std::exchange(other.vt_, nullptr);
335 5x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
336 5x storage_ = std::exchange(other.storage_, nullptr);
337 5x awaitable_active_ = std::exchange(other.awaitable_active_, false);
338 }
339 5x return *this;
340 }
341
342 template<ReadStream S>
343 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
344 1x any_read_stream::any_read_stream(S s)
345 1x : vt_(&vtable_for_impl<S>::value)
346 {
347 struct guard {
348 any_read_stream* self;
349 bool committed = false;
350 1x ~guard() {
351 1x if(!committed && self->storage_) {
352 self->vt_->destroy(self->stream_);
353 ::operator delete(self->storage_);
354 self->storage_ = nullptr;
355 self->stream_ = nullptr;
356 }
357 1x }
358 1x } g{this};
359
360 1x storage_ = ::operator new(sizeof(S));
361 1x stream_ = ::new(storage_) S(std::move(s));
362
363 // Preallocate the awaitable storage
364 1x cached_awaitable_ = ::operator new(vt_->awaitable_size);
365
366 1x g.committed = true;
367 1x }
368
369 template<ReadStream S>
370 92x any_read_stream::any_read_stream(S* s)
371 92x : stream_(s)
372 92x , vt_(&vtable_for_impl<S>::value)
373 {
374 // Preallocate the awaitable storage
375 92x cached_awaitable_ = ::operator new(vt_->awaitable_size);
376 92x }
377
378 //----------------------------------------------------------
379
380 template<MutableBufferSequence MB>
381 auto
382 91x any_read_stream::read_some(MB buffers)
383 {
384 // VFALCO in theory, we could use if constexpr to detect a
385 // span and then pass that through to read_some without the array
386 struct awaitable
387 {
388 any_read_stream* self_;
389 mutable_buffer_array<detail::max_iovec_> ba_;
390
391 bool
392 14x await_ready()
393 {
394 14x self_->vt_->construct_awaitable(
395 14x self_->stream_,
396 14x self_->cached_awaitable_,
397 14x ba_.to_span());
398 14x self_->awaitable_active_ = true;
399
400 28x return self_->vt_->await_ready(
401 14x self_->cached_awaitable_);
402 }
403
404 std::coroutine_handle<>
405 await_suspend(std::coroutine_handle<> h, io_env const* env)
406 {
407 return self_->vt_->await_suspend(
408 self_->cached_awaitable_, h, env);
409 }
410
411 io_result<std::size_t>
412 14x await_resume()
413 {
414 struct guard {
415 any_read_stream* self;
416 14x ~guard() {
417 14x self->vt_->destroy_awaitable(self->cached_awaitable_);
418 14x self->awaitable_active_ = false;
419 14x }
420 14x } g{self_};
421 14x return self_->vt_->await_resume(
422 24x self_->cached_awaitable_);
423 14x }
424 };
425 return awaitable{this,
426 91x mutable_buffer_array<detail::max_iovec_>(buffers)};
427 91x }
428
429 } // namespace capy
430 } // namespace boost
431
432 #endif
433