include/boost/corosio/local_stream_socket.hpp

87.5% Lines (14/16) 85.7% List of functions (6/7)
local_stream_socket.hpp
f(x) Functions (7)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
11 #define BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/platform.hpp>
15 #include <boost/corosio/detail/except.hpp>
16 #include <boost/corosio/detail/native_handle.hpp>
17 #include <boost/corosio/detail/op_base.hpp>
18 #include <boost/corosio/io/io_stream.hpp>
19 #include <boost/capy/io_result.hpp>
20 #include <boost/corosio/detail/buffer_param.hpp>
21 #include <boost/corosio/local_endpoint.hpp>
22 #include <boost/corosio/local_stream.hpp>
23 #include <boost/corosio/shutdown_type.hpp>
24 #include <boost/capy/ex/executor_ref.hpp>
25 #include <boost/capy/ex/execution_context.hpp>
26 #include <boost/capy/ex/io_env.hpp>
27 #include <boost/capy/concept/executor.hpp>
28
29 #include <system_error>
30
31 #include <concepts>
32 #include <coroutine>
33 #include <cstddef>
34 #include <stop_token>
35 #include <type_traits>
36
37 namespace boost::corosio {
38
39 /** An asynchronous Unix stream socket for coroutine I/O.
40
41 This class provides asynchronous Unix domain stream socket
42 operations that return awaitable types. Each operation
43 participates in the affine awaitable protocol, ensuring
44 coroutines resume on the correct executor.
45
46 The socket must be opened before performing I/O operations.
47 Operations support cancellation through `std::stop_token` via
48 the affine protocol, or explicitly through the `cancel()`
49 member function.
50
51 @par Thread Safety
52 Distinct objects: Safe.@n
53 Shared objects: Unsafe. A socket must not have concurrent
54 operations of the same type (e.g., two simultaneous reads).
55 One read and one write may be in flight simultaneously.
56
57 @par Semantics
58 Wraps the platform Unix domain socket stack. Operations
59 dispatch to OS socket APIs via the io_context backend
60 (epoll, kqueue, select, or IOCP). Satisfies @ref capy::Stream.
61
62 @par Example
63 @code
64 io_context ioc;
65 local_stream_socket s(ioc);
66 s.open();
67
68 auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock"));
69 if (ec)
70 co_return;
71
72 char buf[1024];
73 auto [read_ec, n] = co_await s.read_some(
74 capy::mutable_buffer(buf, sizeof(buf)));
75 @endcode
76 */
77 class BOOST_COROSIO_DECL local_stream_socket : public io_stream
78 {
79 public:
80 /// The endpoint type used by this socket.
81 using endpoint_type = corosio::local_endpoint;
82
83 using shutdown_type = corosio::shutdown_type;
84 using enum corosio::shutdown_type;
85
86 /** Define backend hooks for local stream socket operations.
87
88 Platform backends (epoll, kqueue, select) derive from this
89 to implement socket I/O, connection, and option management.
90 */
91 struct implementation : io_stream::implementation
92 {
93 /** Initiate an asynchronous connect to the given endpoint.
94
95 @param h Coroutine handle to resume on completion.
96 @param ex Executor for dispatching the completion.
97 @param ep The local endpoint (path) to connect to.
98 @param token Stop token for cancellation.
99 @param ec Output error code.
100
101 @return Coroutine handle to resume immediately.
102 */
103 virtual std::coroutine_handle<> connect(
104 std::coroutine_handle<> h,
105 capy::executor_ref ex,
106 corosio::local_endpoint ep,
107 std::stop_token token,
108 std::error_code* ec) = 0;
109
110 /** Shut down the socket for the given direction(s).
111
112 @param what The shutdown direction.
113
114 @return Error code on failure, empty on success.
115 */
116 virtual std::error_code shutdown(shutdown_type what) noexcept = 0;
117
118 /// Return the platform socket descriptor.
119 virtual native_handle_type native_handle() const noexcept = 0;
120
121 /** Release ownership of the native socket handle.
122
123 Deregisters the socket from the reactor without closing
124 the descriptor. The caller takes ownership.
125
126 @return The native handle.
127 */
128 virtual native_handle_type release_socket() noexcept = 0;
129
130 /** Request cancellation of pending asynchronous operations.
131
132 All outstanding operations complete with operation_canceled error.
133 Check `ec == cond::canceled` for portable comparison.
134 */
135 virtual void cancel() noexcept = 0;
136
137 /** Set a socket option.
138
139 @param level The protocol level (e.g. `SOL_SOCKET`).
140 @param optname The option name (e.g. `SO_KEEPALIVE`).
141 @param data Pointer to the option value.
142 @param size Size of the option value in bytes.
143 @return Error code on failure, empty on success.
144 */
145 virtual std::error_code set_option(
146 int level,
147 int optname,
148 void const* data,
149 std::size_t size) noexcept = 0;
150
151 /** Get a socket option.
152
153 @param level The protocol level (e.g. `SOL_SOCKET`).
154 @param optname The option name (e.g. `SO_KEEPALIVE`).
155 @param data Pointer to receive the option value.
156 @param size On entry, the size of the buffer. On exit,
157 the size of the option value.
158 @return Error code on failure, empty on success.
159 */
160 virtual std::error_code
161 get_option(int level, int optname, void* data, std::size_t* size)
162 const noexcept = 0;
163
164 /// Return the cached local endpoint.
165 virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
166
167 /// Return the cached remote endpoint.
168 virtual corosio::local_endpoint remote_endpoint() const noexcept = 0;
169 };
170
171 /// Represent the awaitable returned by @ref connect.
172 struct connect_awaitable
173 : detail::void_op_base<connect_awaitable>
174 {
175 local_stream_socket& s_;
176 corosio::local_endpoint endpoint_;
177
178 4x connect_awaitable(
179 local_stream_socket& s, corosio::local_endpoint ep) noexcept
180 4x : s_(s), endpoint_(ep) {}
181
182 4x std::coroutine_handle<> dispatch(
183 std::coroutine_handle<> h, capy::executor_ref ex) const
184 {
185 4x return s_.get().connect(h, ex, endpoint_, token_, &ec_);
186 }
187 };
188
189 public:
190 /** Destructor.
191
192 Closes the socket if open, cancelling any pending operations.
193 */
194 ~local_stream_socket() override;
195
196 /** Construct a socket from an execution context.
197
198 @param ctx The execution context that will own this socket.
199 */
200 explicit local_stream_socket(capy::execution_context& ctx);
201
202 /** Construct a socket from an executor.
203
204 The socket is associated with the executor's context.
205
206 @param ex The executor whose context will own the socket.
207 */
208 template<class Ex>
209 requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_socket>) &&
210 capy::Executor<Ex>
211 explicit local_stream_socket(Ex const& ex) : local_stream_socket(ex.context())
212 {
213 }
214
215 /** Move constructor.
216
217 Transfers ownership of the socket resources.
218
219 @param other The socket to move from.
220
221 @pre No awaitables returned by @p other's methods exist.
222 @pre The execution context associated with @p other must
223 outlive this socket.
224 */
225 22x local_stream_socket(local_stream_socket&& other) noexcept
226 22x : io_object(std::move(other))
227 {
228 22x }
229
230 /** Move assignment operator.
231
232 Closes any existing socket and transfers ownership.
233
234 @param other The socket to move from.
235
236 @pre No awaitables returned by either `*this` or @p other's
237 methods exist.
238 @pre The execution context associated with @p other must
239 outlive this socket.
240
241 @return Reference to this socket.
242 */
243 local_stream_socket& operator=(local_stream_socket&& other) noexcept
244 {
245 if (this != &other)
246 {
247 close();
248 io_object::operator=(std::move(other));
249 }
250 return *this;
251 }
252
253 local_stream_socket(local_stream_socket const&) = delete;
254 local_stream_socket& operator=(local_stream_socket const&) = delete;
255
256 /** Open the socket.
257
258 Creates a Unix stream socket and associates it with
259 the platform reactor.
260
261 @param proto The protocol. Defaults to local_stream{}.
262
263 @throws std::system_error on failure.
264 */
265 void open(local_stream proto = {});
266
267 /** Close the socket.
268
269 Releases socket resources. Any pending operations complete
270 with `errc::operation_canceled`.
271 */
272 void close();
273
274 /** Check if the socket is open.
275
276 @return `true` if the socket is open and ready for operations.
277 */
278 118x bool is_open() const noexcept
279 {
280 #if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
281 return h_ && get().native_handle() != ~native_handle_type(0);
282 #else
283 118x return h_ && get().native_handle() >= 0;
284 #endif
285 }
286
287 /** Initiate an asynchronous connect operation.
288
289 If the socket is not already open, it is opened automatically.
290
291 @param ep The local endpoint (path) to connect to.
292
293 @return An awaitable that completes with io_result<>.
294
295 @throws std::system_error if the socket needs to be opened
296 and the open fails.
297 */
298 4x auto connect(corosio::local_endpoint ep)
299 {
300 4x if (!is_open())
301 open();
302 4x return connect_awaitable(*this, ep);
303 }
304
305 /** Cancel any pending asynchronous operations.
306
307 All outstanding operations complete with `errc::operation_canceled`.
308 Check `ec == cond::canceled` for portable comparison.
309 */
310 void cancel();
311
312 /** Get the native socket handle.
313
314 Returns the underlying platform-specific socket descriptor.
315 On POSIX systems this is an `int` file descriptor.
316
317 @return The native socket handle, or an invalid sentinel
318 if not open.
319 */
320 native_handle_type native_handle() const noexcept;
321
322 /** Query the number of bytes available for reading.
323
324 @return The number of bytes that can be read without blocking.
325
326 @throws std::logic_error if the socket is not open.
327 @throws std::system_error on ioctl failure.
328 */
329 std::size_t available() const;
330
331 /** Release ownership of the native socket handle.
332
333 Deregisters the socket from the backend and cancels pending
334 operations without closing the descriptor. The caller takes
335 ownership of the returned handle.
336
337 @return The native handle.
338
339 @throws std::logic_error if the socket is not open.
340
341 @post is_open() == false
342 */
343 native_handle_type release();
344
345 /** Disable sends or receives on the socket.
346
347 Unix stream connections are full-duplex: each direction
348 (send and receive) operates independently. This function
349 allows you to close one or both directions without
350 destroying the socket.
351
352 @param what Determines what operations will no longer
353 be allowed.
354
355 @throws std::system_error on failure.
356 */
357 void shutdown(shutdown_type what);
358
359 /** Shut down part or all of the socket (non-throwing).
360
361 @param what Which direction to shut down.
362 @param ec Set to the error code on failure.
363 */
364 void shutdown(shutdown_type what, std::error_code& ec) noexcept;
365
366 /** Set a socket option.
367
368 Applies a type-safe socket option to the underlying socket.
369 The option type encodes the protocol level and option name.
370
371 @param opt The option to set.
372
373 @throws std::logic_error if the socket is not open.
374 @throws std::system_error on failure.
375 */
376 template<class Option>
377 void set_option(Option const& opt)
378 {
379 if (!is_open())
380 detail::throw_logic_error("set_option: socket not open");
381 std::error_code ec = get().set_option(
382 Option::level(), Option::name(), opt.data(), opt.size());
383 if (ec)
384 detail::throw_system_error(ec, "local_stream_socket::set_option");
385 }
386
387 /** Get a socket option.
388
389 Retrieves the current value of a type-safe socket option.
390
391 @return The current option value.
392
393 @throws std::logic_error if the socket is not open.
394 @throws std::system_error on failure.
395 */
396 template<class Option>
397 Option get_option() const
398 {
399 if (!is_open())
400 detail::throw_logic_error("get_option: socket not open");
401 Option opt{};
402 std::size_t sz = opt.size();
403 std::error_code ec =
404 get().get_option(Option::level(), Option::name(), opt.data(), &sz);
405 if (ec)
406 detail::throw_system_error(ec, "local_stream_socket::get_option");
407 opt.resize(sz);
408 return opt;
409 }
410
411 /** Assign an existing file descriptor to this socket.
412
413 The socket must not already be open. The fd is adopted
414 and registered with the platform reactor. Used by
415 make_local_stream_pair() to wrap socketpair() fds.
416
417 @param fd The file descriptor to adopt. Must be a valid,
418 open, non-blocking Unix stream socket.
419
420 @throws std::system_error on failure.
421 */
422 void assign(native_handle_type fd);
423
424 /** Get the local endpoint of the socket.
425
426 Returns the local address (path) to which the socket is bound.
427 The endpoint is cached when the connection is established.
428
429 @return The local endpoint, or a default endpoint if the socket
430 is not connected.
431 */
432 corosio::local_endpoint local_endpoint() const noexcept;
433
434 /** Get the remote endpoint of the socket.
435
436 Returns the remote address (path) to which the socket is connected.
437 The endpoint is cached when the connection is established.
438
439 @return The remote endpoint, or a default endpoint if the socket
440 is not connected.
441 */
442 corosio::local_endpoint remote_endpoint() const noexcept;
443
444 protected:
445 local_stream_socket() noexcept = default;
446
447 explicit local_stream_socket(handle h) noexcept : io_object(std::move(h)) {}
448
449 private:
450 friend class local_stream_acceptor;
451
452 void open_for_family(int family, int type, int protocol);
453
454 104x inline implementation& get() const noexcept
455 {
456 104x return *static_cast<implementation*>(h_.get());
457 }
458 };
459
460 } // namespace boost::corosio
461
462 #endif // BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
463