include/boost/capy/io/any_write_stream.hpp

94.1% Lines (95/101) 85.7% Functions (30/35)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <coroutine>
20 #include <boost/capy/ex/io_env.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <concepts>
24 #include <coroutine>
25 #include <cstddef>
26 #include <exception>
27 #include <new>
28 #include <span>
29 #include <stop_token>
30 #include <system_error>
31 #include <utility>
32
33 namespace boost {
34 namespace capy {
35
36 /** Type-erased wrapper for any WriteStream.
37
38 This class provides type erasure for any type satisfying the
39 @ref WriteStream concept, enabling runtime polymorphism for
40 write operations. It uses cached awaitable storage to achieve
41 zero steady-state allocation after construction.
42
43 The wrapper supports two construction modes:
44 - **Owning**: Pass by value to transfer ownership. The wrapper
45 allocates storage and owns the stream.
46 - **Reference**: Pass a pointer to wrap without ownership. The
47 pointed-to stream must outlive this wrapper.
48
49 @par Awaitable Preallocation
50 The constructor preallocates storage for the type-erased awaitable.
51 This reserves all virtual address space at server startup
52 so memory usage can be measured up front, rather than
53 allocating piecemeal as traffic arrives.
54
55 @par Immediate Completion
56 Operations complete immediately without suspending when the
57 buffer sequence is empty, or when the underlying stream's
58 awaitable reports readiness via `await_ready`.
59
60 @par Thread Safety
61 Not thread-safe. Concurrent operations on the same wrapper
62 are undefined behavior.
63
64 @par Example
65 @code
66 // Owning - takes ownership of the stream
67 any_write_stream stream(socket{ioc});
68
69 // Reference - wraps without ownership
70 socket sock(ioc);
71 any_write_stream stream(&sock);
72
73 const_buffer buf(data, size);
74 auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
75 @endcode
76
77 @see any_read_stream, any_stream, WriteStream
78 */
79 class any_write_stream
80 {
81 struct vtable;
82
83 template<WriteStream S>
84 struct vtable_for_impl;
85
86 // ordered for cache line coherence
87 void* stream_ = nullptr;
88 vtable const* vt_ = nullptr;
89 void* cached_awaitable_ = nullptr;
90 void* storage_ = nullptr;
91 bool awaitable_active_ = false;
92
93 public:
94 /** Destructor.
95
96 Destroys the owned stream (if any) and releases the cached
97 awaitable storage.
98 */
99 ~any_write_stream();
100
101 /** Default constructor.
102
103 Constructs an empty wrapper. Operations on a default-constructed
104 wrapper result in undefined behavior.
105 */
106 1x any_write_stream() = default;
107
108 /** Non-copyable.
109
110 The awaitable cache is per-instance and cannot be shared.
111 */
112 any_write_stream(any_write_stream const&) = delete;
113 any_write_stream& operator=(any_write_stream const&) = delete;
114
115 /** Move constructor.
116
117 Transfers ownership of the wrapped stream (if owned) and
118 cached awaitable storage from `other`. After the move, `other` is
119 in a default-constructed state.
120
121 @param other The wrapper to move from.
122 */
123 2x any_write_stream(any_write_stream&& other) noexcept
124 2x : stream_(std::exchange(other.stream_, nullptr))
125 2x , vt_(std::exchange(other.vt_, nullptr))
126 2x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127 2x , storage_(std::exchange(other.storage_, nullptr))
128 2x , awaitable_active_(std::exchange(other.awaitable_active_, false))
129 {
130 2x }
131
132 /** Move assignment operator.
133
134 Destroys any owned stream and releases existing resources,
135 then transfers ownership from `other`.
136
137 @param other The wrapper to move from.
138 @return Reference to this wrapper.
139 */
140 any_write_stream&
141 operator=(any_write_stream&& other) noexcept;
142
143 /** Construct by taking ownership of a WriteStream.
144
145 Allocates storage and moves the stream into this wrapper.
146 The wrapper owns the stream and will destroy it.
147
148 @param s The stream to take ownership of.
149 */
150 template<WriteStream S>
151 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
152 any_write_stream(S s);
153
154 /** Construct by wrapping a WriteStream without ownership.
155
156 Wraps the given stream by pointer. The stream must remain
157 valid for the lifetime of this wrapper.
158
159 @param s Pointer to the stream to wrap.
160 */
161 template<WriteStream S>
162 any_write_stream(S* s);
163
164 /** Check if the wrapper contains a valid stream.
165
166 @return `true` if wrapping a stream, `false` if default-constructed
167 or moved-from.
168 */
169 bool
170 21x has_value() const noexcept
171 {
172 21x return stream_ != nullptr;
173 }
174
175 /** Check if the wrapper contains a valid stream.
176
177 @return `true` if wrapping a stream, `false` if default-constructed
178 or moved-from.
179 */
180 explicit
181 3x operator bool() const noexcept
182 {
183 3x return has_value();
184 }
185
186 /** Initiate an asynchronous write operation.
187
188 Writes data from the provided buffer sequence. The operation
189 completes when at least one byte has been written, or an error
190 occurs.
191
192 @param buffers The buffer sequence containing data to write.
193 Passed by value to ensure the sequence lives in the
194 coroutine frame across suspension points.
195
196 @return An awaitable yielding `(error_code,std::size_t)`.
197
198 @par Immediate Completion
199 The operation completes immediately without suspending
200 the calling coroutine when:
201 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
202 @li The underlying stream's awaitable reports immediate
203 readiness via `await_ready`.
204
205 @note This is a partial operation and may not process the
206 entire buffer sequence. Use the composed @ref write algorithm
207 for guaranteed complete transfer.
208
209 @par Preconditions
210 The wrapper must contain a valid stream (`has_value() == true`).
211 */
212 template<ConstBufferSequence CB>
213 auto
214 write_some(CB buffers);
215
216 protected:
217 /** Rebind to a new stream after move.
218
219 Updates the internal pointer to reference a new stream object.
220 Used by owning wrappers after move assignment when the owned
221 object has moved to a new location.
222
223 @param new_stream The new stream to bind to. Must be the same
224 type as the original stream.
225
226 @note Terminates if called with a stream of different type
227 than the original.
228 */
229 template<WriteStream S>
230 void
231 rebind(S& new_stream) noexcept
232 {
233 if(vt_ != &vtable_for_impl<S>::value)
234 std::terminate();
235 stream_ = &new_stream;
236 }
237 };
238
239 //----------------------------------------------------------
240
241 struct any_write_stream::vtable
242 {
243 // ordered by call frequency for cache line coherence
244 void (*construct_awaitable)(
245 void* stream,
246 void* storage,
247 std::span<const_buffer const> buffers);
248 bool (*await_ready)(void*);
249 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
250 io_result<std::size_t> (*await_resume)(void*);
251 void (*destroy_awaitable)(void*) noexcept;
252 std::size_t awaitable_size;
253 std::size_t awaitable_align;
254 void (*destroy)(void*) noexcept;
255 };
256
257 template<WriteStream S>
258 struct any_write_stream::vtable_for_impl
259 {
260 using Awaitable = decltype(std::declval<S&>().write_some(
261 std::span<const_buffer const>{}));
262
263 static void
264 1x do_destroy_impl(void* stream) noexcept
265 {
266 1x static_cast<S*>(stream)->~S();
267 1x }
268
269 static void
270 75x construct_awaitable_impl(
271 void* stream,
272 void* storage,
273 std::span<const_buffer const> buffers)
274 {
275 75x auto& s = *static_cast<S*>(stream);
276 75x ::new(storage) Awaitable(s.write_some(buffers));
277 75x }
278
279 static constexpr vtable value = {
280 &construct_awaitable_impl,
281 75x +[](void* p) {
282 75x return static_cast<Awaitable*>(p)->await_ready();
283 },
284 2x +[](void* p, std::coroutine_handle<> h, io_env const* env) {
285 2x return detail::call_await_suspend(
286 2x static_cast<Awaitable*>(p), h, env);
287 },
288 73x +[](void* p) {
289 73x return static_cast<Awaitable*>(p)->await_resume();
290 },
291 77x +[](void* p) noexcept {
292 12x static_cast<Awaitable*>(p)->~Awaitable();
293 },
294 sizeof(Awaitable),
295 alignof(Awaitable),
296 &do_destroy_impl
297 };
298 };
299
300 //----------------------------------------------------------
301
302 inline
303 95x any_write_stream::~any_write_stream()
304 {
305 95x if(storage_)
306 {
307 1x vt_->destroy(stream_);
308 1x ::operator delete(storage_);
309 }
310 95x if(cached_awaitable_)
311 {
312 85x if(awaitable_active_)
313 1x vt_->destroy_awaitable(cached_awaitable_);
314 85x ::operator delete(cached_awaitable_);
315 }
316 95x }
317
318 inline any_write_stream&
319 5x any_write_stream::operator=(any_write_stream&& other) noexcept
320 {
321 5x if(this != &other)
322 {
323 5x if(storage_)
324 {
325 vt_->destroy(stream_);
326 ::operator delete(storage_);
327 }
328 5x if(cached_awaitable_)
329 {
330 2x if(awaitable_active_)
331 1x vt_->destroy_awaitable(cached_awaitable_);
332 2x ::operator delete(cached_awaitable_);
333 }
334 5x stream_ = std::exchange(other.stream_, nullptr);
335 5x vt_ = std::exchange(other.vt_, nullptr);
336 5x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
337 5x storage_ = std::exchange(other.storage_, nullptr);
338 5x awaitable_active_ = std::exchange(other.awaitable_active_, false);
339 }
340 5x return *this;
341 }
342
343 template<WriteStream S>
344 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
345 1x any_write_stream::any_write_stream(S s)
346 1x : vt_(&vtable_for_impl<S>::value)
347 {
348 struct guard {
349 any_write_stream* self;
350 bool committed = false;
351 1x ~guard() {
352 1x if(!committed && self->storage_) {
353 self->vt_->destroy(self->stream_);
354 ::operator delete(self->storage_);
355 self->storage_ = nullptr;
356 self->stream_ = nullptr;
357 }
358 1x }
359 1x } g{this};
360
361 1x storage_ = ::operator new(sizeof(S));
362 1x stream_ = ::new(storage_) S(std::move(s));
363
364 // Preallocate the awaitable storage
365 1x cached_awaitable_ = ::operator new(vt_->awaitable_size);
366
367 1x g.committed = true;
368 1x }
369
370 template<WriteStream S>
371 86x any_write_stream::any_write_stream(S* s)
372 86x : stream_(s)
373 86x , vt_(&vtable_for_impl<S>::value)
374 {
375 // Preallocate the awaitable storage
376 86x cached_awaitable_ = ::operator new(vt_->awaitable_size);
377 86x }
378
379 //----------------------------------------------------------
380
381 template<ConstBufferSequence CB>
382 auto
383 79x any_write_stream::write_some(CB buffers)
384 {
385 struct awaitable
386 {
387 any_write_stream* self_;
388 const_buffer_array<detail::max_iovec_> ba_;
389
390 79x awaitable(
391 any_write_stream* self,
392 CB const& buffers) noexcept
393 79x : self_(self)
394 79x , ba_(buffers)
395 {
396 79x }
397
398 bool
399 79x await_ready() const noexcept
400 {
401 79x return ba_.to_span().empty();
402 }
403
404 std::coroutine_handle<>
405 75x await_suspend(std::coroutine_handle<> h, io_env const* env)
406 {
407 75x self_->vt_->construct_awaitable(
408 75x self_->stream_,
409 75x self_->cached_awaitable_,
410 75x ba_.to_span());
411 75x self_->awaitable_active_ = true;
412
413 75x if(self_->vt_->await_ready(self_->cached_awaitable_))
414 73x return h;
415
416 2x return self_->vt_->await_suspend(
417 2x self_->cached_awaitable_, h, env);
418 }
419
420 io_result<std::size_t>
421 77x await_resume()
422 {
423 77x if(!self_->awaitable_active_)
424 4x return {{}, 0};
425 struct guard {
426 any_write_stream* self;
427 73x ~guard() {
428 73x self->vt_->destroy_awaitable(self->cached_awaitable_);
429 73x self->awaitable_active_ = false;
430 73x }
431 73x } g{self_};
432 73x return self_->vt_->await_resume(
433 73x self_->cached_awaitable_);
434 73x }
435 };
436 79x return awaitable{this, buffers};
437 }
438
439 } // namespace capy
440 } // namespace boost
441
442 #endif
443