//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21  

21  

22  
#include <boost/corosio/native/detail/epoll/epoll_traits.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_traits.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29  

29  

30  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/except.hpp>
31  

31  

#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
#include <vector>
37  

37  

38  
#include <errno.h>
38  
#include <errno.h>
39  
#include <sys/epoll.h>
39  
#include <sys/epoll.h>
40  
#include <sys/eventfd.h>
40  
#include <sys/eventfd.h>
41  
#include <sys/timerfd.h>
41  
#include <sys/timerfd.h>
42  
#include <unistd.h>
42  
#include <unistd.h>
43  

43  

44  
namespace boost::corosio::detail {
44  
namespace boost::corosio::detail {
45  

45  

46  
/** Linux scheduler using epoll for I/O multiplexing.
46  
/** Linux scheduler using epoll for I/O multiplexing.
47  

47  

48  
    This scheduler implements the scheduler interface using Linux epoll
48  
    This scheduler implements the scheduler interface using Linux epoll
49  
    for efficient I/O event notification. It uses a single reactor model
49  
    for efficient I/O event notification. It uses a single reactor model
50  
    where one thread runs epoll_wait while other threads
50  
    where one thread runs epoll_wait while other threads
51  
    wait on a condition variable for handler work. This design provides:
51  
    wait on a condition variable for handler work. This design provides:
52  

52  

53  
    - Handler parallelism: N posted handlers can execute on N threads
53  
    - Handler parallelism: N posted handlers can execute on N threads
54  
    - No thundering herd: condition_variable wakes exactly one thread
54  
    - No thundering herd: condition_variable wakes exactly one thread
55  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
55  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
56  

56  

57  
    When threads call run(), they first try to execute queued handlers.
57  
    When threads call run(), they first try to execute queued handlers.
58  
    If the queue is empty and no reactor is running, one thread becomes
58  
    If the queue is empty and no reactor is running, one thread becomes
59  
    the reactor and runs epoll_wait. Other threads wait on a condition
59  
    the reactor and runs epoll_wait. Other threads wait on a condition
60  
    variable until handlers are available.
60  
    variable until handlers are available.
61  

61  

62  
    @par Thread Safety
62  
    @par Thread Safety
63  
    All public member functions are thread-safe.
63  
    All public member functions are thread-safe.
64  
*/
64  
*/
65  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler
65  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler
66  
{
66  
{
67  
public:
67  
public:
68  
    /** Construct the scheduler.
68  
    /** Construct the scheduler.
69  

69  

70  
        Creates an epoll instance, eventfd for reactor interruption,
70  
        Creates an epoll instance, eventfd for reactor interruption,
71  
        and timerfd for kernel-managed timer expiry.
71  
        and timerfd for kernel-managed timer expiry.
72  

72  

73  
        @param ctx Reference to the owning execution_context.
73  
        @param ctx Reference to the owning execution_context.
74  
        @param concurrency_hint Hint for expected thread count (unused).
74  
        @param concurrency_hint Hint for expected thread count (unused).
75  
    */
75  
    */
76  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
76  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
77  

77  

78  
    /// Destroy the scheduler.
78  
    /// Destroy the scheduler.
79  
    ~epoll_scheduler() override;
79  
    ~epoll_scheduler() override;
80  

80  

81  
    epoll_scheduler(epoll_scheduler const&)            = delete;
81  
    epoll_scheduler(epoll_scheduler const&)            = delete;
82  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
82  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
83  

83  

84  
    /// Shut down the scheduler, draining pending operations.
84  
    /// Shut down the scheduler, draining pending operations.
85  
    void shutdown() override;
85  
    void shutdown() override;
86  

86  

87  
    /// Apply runtime configuration, resizing the event buffer.
87  
    /// Apply runtime configuration, resizing the event buffer.
88  
    void configure_reactor(
88  
    void configure_reactor(
89  
        unsigned max_events,
89  
        unsigned max_events,
90  
        unsigned budget_init,
90  
        unsigned budget_init,
91  
        unsigned budget_max,
91  
        unsigned budget_max,
92  
        unsigned unassisted) override;
92  
        unsigned unassisted) override;
93  

93  

94  
    /** Return the epoll file descriptor.
94  
    /** Return the epoll file descriptor.
95  

95  

96  
        Used by socket services to register file descriptors
96  
        Used by socket services to register file descriptors
97  
        for I/O event notification.
97  
        for I/O event notification.
98  

98  

99  
        @return The epoll file descriptor.
99  
        @return The epoll file descriptor.
100  
    */
100  
    */
101  
    int epoll_fd() const noexcept
101  
    int epoll_fd() const noexcept
102  
    {
102  
    {
103  
        return epoll_fd_;
103  
        return epoll_fd_;
104  
    }
104  
    }
105  

105  

106  
    /** Register a descriptor for persistent monitoring.
106  
    /** Register a descriptor for persistent monitoring.
107  

107  

108  
        The fd is registered once and stays registered until explicitly
108  
        The fd is registered once and stays registered until explicitly
109  
        deregistered. Events are dispatched via reactor_descriptor_state which
109  
        deregistered. Events are dispatched via reactor_descriptor_state which
110  
        tracks pending read/write/connect operations.
110  
        tracks pending read/write/connect operations.
111  

111  

112  
        @param fd The file descriptor to register.
112  
        @param fd The file descriptor to register.
113  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
113  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
114  
    */
114  
    */
115  
    void register_descriptor(int fd, reactor_descriptor_state* desc) const;
115  
    void register_descriptor(int fd, reactor_descriptor_state* desc) const;
116  

116  

117  
    /** Deregister a persistently registered descriptor.
117  
    /** Deregister a persistently registered descriptor.
118  

118  

119  
        @param fd The file descriptor to deregister.
119  
        @param fd The file descriptor to deregister.
120  
    */
120  
    */
121  
    void deregister_descriptor(int fd) const;
121  
    void deregister_descriptor(int fd) const;
122  

122  

123  
private:
123  
private:
124  
    void
124  
    void
125  
    run_task(lock_type& lock, context_type* ctx,
125  
    run_task(lock_type& lock, context_type* ctx,
126  
        long timeout_us) override;
126  
        long timeout_us) override;
127  
    void interrupt_reactor() const override;
127  
    void interrupt_reactor() const override;
128  
    void update_timerfd() const;
128  
    void update_timerfd() const;
129  

129  

130  
    int epoll_fd_;
130  
    int epoll_fd_;
131  
    int event_fd_;
131  
    int event_fd_;
132  
    int timer_fd_;
132  
    int timer_fd_;
133  

133  

134  
    // Edge-triggered eventfd state
134  
    // Edge-triggered eventfd state
135  
    mutable std::atomic<bool> eventfd_armed_{false};
135  
    mutable std::atomic<bool> eventfd_armed_{false};
136  

136  

137  
    // Set when the earliest timer changes; flushed before epoll_wait
137  
    // Set when the earliest timer changes; flushed before epoll_wait
138  
    mutable std::atomic<bool> timerfd_stale_{false};
138  
    mutable std::atomic<bool> timerfd_stale_{false};
139  

139  

140  
    // Event buffer sized from max_events_per_poll_ (set at construction,
140  
    // Event buffer sized from max_events_per_poll_ (set at construction,
141  
    // resized by configure_reactor via io_context_options).
141  
    // resized by configure_reactor via io_context_options).
142  
    std::vector<epoll_event> event_buffer_;
142  
    std::vector<epoll_event> event_buffer_;
143  
};
143  
};
144  

144  

145  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
145  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
146  
    : epoll_fd_(-1)
146  
    : epoll_fd_(-1)
147  
    , event_fd_(-1)
147  
    , event_fd_(-1)
148  
    , timer_fd_(-1)
148  
    , timer_fd_(-1)
149  
    , event_buffer_(max_events_per_poll_)
149  
    , event_buffer_(max_events_per_poll_)
150  
{
150  
{
151  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
151  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
152  
    if (epoll_fd_ < 0)
152  
    if (epoll_fd_ < 0)
153  
        detail::throw_system_error(make_err(errno), "epoll_create1");
153  
        detail::throw_system_error(make_err(errno), "epoll_create1");
154  

154  

155  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
155  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
156  
    if (event_fd_ < 0)
156  
    if (event_fd_ < 0)
157  
    {
157  
    {
158  
        int errn = errno;
158  
        int errn = errno;
159  
        ::close(epoll_fd_);
159  
        ::close(epoll_fd_);
160  
        detail::throw_system_error(make_err(errn), "eventfd");
160  
        detail::throw_system_error(make_err(errn), "eventfd");
161  
    }
161  
    }
162  

162  

163  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
163  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
164  
    if (timer_fd_ < 0)
164  
    if (timer_fd_ < 0)
165  
    {
165  
    {
166  
        int errn = errno;
166  
        int errn = errno;
167  
        ::close(event_fd_);
167  
        ::close(event_fd_);
168  
        ::close(epoll_fd_);
168  
        ::close(epoll_fd_);
169  
        detail::throw_system_error(make_err(errn), "timerfd_create");
169  
        detail::throw_system_error(make_err(errn), "timerfd_create");
170  
    }
170  
    }
171  

171  

172  
    epoll_event ev{};
172  
    epoll_event ev{};
173  
    ev.events   = EPOLLIN | EPOLLET;
173  
    ev.events   = EPOLLIN | EPOLLET;
174  
    ev.data.ptr = nullptr;
174  
    ev.data.ptr = nullptr;
175  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
175  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
176  
    {
176  
    {
177  
        int errn = errno;
177  
        int errn = errno;
178  
        ::close(timer_fd_);
178  
        ::close(timer_fd_);
179  
        ::close(event_fd_);
179  
        ::close(event_fd_);
180  
        ::close(epoll_fd_);
180  
        ::close(epoll_fd_);
181  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
181  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
182  
    }
182  
    }
183  

183  

184  
    epoll_event timer_ev{};
184  
    epoll_event timer_ev{};
185  
    timer_ev.events   = EPOLLIN | EPOLLERR;
185  
    timer_ev.events   = EPOLLIN | EPOLLERR;
186  
    timer_ev.data.ptr = &timer_fd_;
186  
    timer_ev.data.ptr = &timer_fd_;
187  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
187  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
188  
    {
188  
    {
189  
        int errn = errno;
189  
        int errn = errno;
190  
        ::close(timer_fd_);
190  
        ::close(timer_fd_);
191  
        ::close(event_fd_);
191  
        ::close(event_fd_);
192  
        ::close(epoll_fd_);
192  
        ::close(epoll_fd_);
193  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
193  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
194  
    }
194  
    }
195  

195  

196  
    timer_svc_ = &get_timer_service(ctx, *this);
196  
    timer_svc_ = &get_timer_service(ctx, *this);
197  
    timer_svc_->set_on_earliest_changed(
197  
    timer_svc_->set_on_earliest_changed(
198  
        timer_service::callback(this, [](void* p) {
198  
        timer_service::callback(this, [](void* p) {
199  
            auto* self = static_cast<epoll_scheduler*>(p);
199  
            auto* self = static_cast<epoll_scheduler*>(p);
200  
            self->timerfd_stale_.store(true, std::memory_order_release);
200  
            self->timerfd_stale_.store(true, std::memory_order_release);
201  
            self->interrupt_reactor();
201  
            self->interrupt_reactor();
202  
        }));
202  
        }));
203  

203  

204  
    get_resolver_service(ctx, *this);
204  
    get_resolver_service(ctx, *this);
205  
    get_signal_service(ctx, *this);
205  
    get_signal_service(ctx, *this);
206  
    get_stream_file_service(ctx, *this);
206  
    get_stream_file_service(ctx, *this);
207  
    get_random_access_file_service(ctx, *this);
207  
    get_random_access_file_service(ctx, *this);
208  

208  

209  
    completed_ops_.push(&task_op_);
209  
    completed_ops_.push(&task_op_);
210  
}
210  
}
211  

211  

212  
inline epoll_scheduler::~epoll_scheduler()
212  
inline epoll_scheduler::~epoll_scheduler()
213  
{
213  
{
214  
    if (timer_fd_ >= 0)
214  
    if (timer_fd_ >= 0)
215  
        ::close(timer_fd_);
215  
        ::close(timer_fd_);
216  
    if (event_fd_ >= 0)
216  
    if (event_fd_ >= 0)
217  
        ::close(event_fd_);
217  
        ::close(event_fd_);
218  
    if (epoll_fd_ >= 0)
218  
    if (epoll_fd_ >= 0)
219  
        ::close(epoll_fd_);
219  
        ::close(epoll_fd_);
220  
}
220  
}
221  

221  

222  
inline void
222  
inline void
223  
epoll_scheduler::shutdown()
223  
epoll_scheduler::shutdown()
224  
{
224  
{
225  
    shutdown_drain();
225  
    shutdown_drain();
226  

226  

227  
    if (event_fd_ >= 0)
227  
    if (event_fd_ >= 0)
228  
        interrupt_reactor();
228  
        interrupt_reactor();
229  
}
229  
}
230  

230  

231  
inline void
231  
inline void
232  
epoll_scheduler::configure_reactor(
232  
epoll_scheduler::configure_reactor(
233  
    unsigned max_events,
233  
    unsigned max_events,
234  
    unsigned budget_init,
234  
    unsigned budget_init,
235  
    unsigned budget_max,
235  
    unsigned budget_max,
236  
    unsigned unassisted)
236  
    unsigned unassisted)
237  
{
237  
{
238  
    reactor_scheduler::configure_reactor(
238  
    reactor_scheduler::configure_reactor(
239  
        max_events, budget_init, budget_max, unassisted);
239  
        max_events, budget_init, budget_max, unassisted);
240  
    event_buffer_.resize(max_events_per_poll_);
240  
    event_buffer_.resize(max_events_per_poll_);
241  
}
241  
}
242  

242  

243  
inline void
243  
inline void
244  
epoll_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
244  
epoll_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
245  
{
245  
{
246  
    epoll_event ev{};
246  
    epoll_event ev{};
247  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
247  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
248  
    ev.data.ptr = desc;
248  
    ev.data.ptr = desc;
249  

249  

250  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
250  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
251  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
251  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
252  

252  

253  
    desc->registered_events = ev.events;
253  
    desc->registered_events = ev.events;
254  
    desc->fd                = fd;
254  
    desc->fd                = fd;
255  
    desc->scheduler_        = this;
255  
    desc->scheduler_        = this;
256  
    desc->mutex.set_enabled(!single_threaded_);
256  
    desc->mutex.set_enabled(!single_threaded_);
257  
    desc->ready_events_.store(0, std::memory_order_relaxed);
257  
    desc->ready_events_.store(0, std::memory_order_relaxed);
258  

258  

259  
    conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
259  
    conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
260  
    desc->impl_ref_.reset();
260  
    desc->impl_ref_.reset();
261  
    desc->read_ready  = false;
261  
    desc->read_ready  = false;
262  
    desc->write_ready = false;
262  
    desc->write_ready = false;
263  
}
263  
}
264  

264  

265  
inline void
265  
inline void
266  
epoll_scheduler::deregister_descriptor(int fd) const
266  
epoll_scheduler::deregister_descriptor(int fd) const
267  
{
267  
{
268  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
268  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
269  
}
269  
}
270  

270  

271  
inline void
271  
inline void
272  
epoll_scheduler::interrupt_reactor() const
272  
epoll_scheduler::interrupt_reactor() const
273  
{
273  
{
274  
    bool expected = false;
274  
    bool expected = false;
275  
    if (eventfd_armed_.compare_exchange_strong(
275  
    if (eventfd_armed_.compare_exchange_strong(
276  
            expected, true, std::memory_order_release,
276  
            expected, true, std::memory_order_release,
277  
            std::memory_order_relaxed))
277  
            std::memory_order_relaxed))
278  
    {
278  
    {
279  
        std::uint64_t val       = 1;
279  
        std::uint64_t val       = 1;
280  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
280  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
281  
    }
281  
    }
282  
}
282  
}
283  

283  

284  
inline void
284  
inline void
285  
epoll_scheduler::update_timerfd() const
285  
epoll_scheduler::update_timerfd() const
286  
{
286  
{
287  
    auto nearest = timer_svc_->nearest_expiry();
287  
    auto nearest = timer_svc_->nearest_expiry();
288  

288  

289  
    itimerspec ts{};
289  
    itimerspec ts{};
290  
    int flags = 0;
290  
    int flags = 0;
291  

291  

292  
    if (nearest == timer_service::time_point::max())
292  
    if (nearest == timer_service::time_point::max())
293  
    {
293  
    {
294  
        // No timers — disarm by setting to 0 (relative)
294  
        // No timers — disarm by setting to 0 (relative)
295  
    }
295  
    }
296  
    else
296  
    else
297  
    {
297  
    {
298  
        auto now = std::chrono::steady_clock::now();
298  
        auto now = std::chrono::steady_clock::now();
299  
        if (nearest <= now)
299  
        if (nearest <= now)
300  
        {
300  
        {
301  
            // Use 1ns instead of 0 — zero disarms the timerfd
301  
            // Use 1ns instead of 0 — zero disarms the timerfd
302  
            ts.it_value.tv_nsec = 1;
302  
            ts.it_value.tv_nsec = 1;
303  
        }
303  
        }
304  
        else
304  
        else
305  
        {
305  
        {
306  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
306  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
307  
                            nearest - now)
307  
                            nearest - now)
308  
                            .count();
308  
                            .count();
309  
            ts.it_value.tv_sec  = nsec / 1000000000;
309  
            ts.it_value.tv_sec  = nsec / 1000000000;
310  
            ts.it_value.tv_nsec = nsec % 1000000000;
310  
            ts.it_value.tv_nsec = nsec % 1000000000;
311  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
311  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
312  
                ts.it_value.tv_nsec = 1;
312  
                ts.it_value.tv_nsec = 1;
313  
        }
313  
        }
314  
    }
314  
    }
315  

315  

316  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
316  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
317  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
317  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
318  
}
318  
}
319  

319  

320  
inline void
320  
inline void
321  
epoll_scheduler::run_task(
321  
epoll_scheduler::run_task(
322  
    lock_type& lock, context_type* ctx, long timeout_us)
322  
    lock_type& lock, context_type* ctx, long timeout_us)
323  
{
323  
{
324  
    int timeout_ms;
324  
    int timeout_ms;
325  
    if (task_interrupted_)
325  
    if (task_interrupted_)
326  
        timeout_ms = 0;
326  
        timeout_ms = 0;
327  
    else if (timeout_us < 0)
327  
    else if (timeout_us < 0)
328  
        timeout_ms = -1;
328  
        timeout_ms = -1;
329  
    else
329  
    else
330  
        timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
330  
        timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
331  

331  

332  
    if (lock.owns_lock())
332  
    if (lock.owns_lock())
333  
        lock.unlock();
333  
        lock.unlock();
334  

334  

335  
    task_cleanup on_exit{this, &lock, ctx};
335  
    task_cleanup on_exit{this, &lock, ctx};
336  

336  

337  
    // Flush deferred timerfd programming before blocking
337  
    // Flush deferred timerfd programming before blocking
338  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
338  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
339  
        update_timerfd();
339  
        update_timerfd();
340  

340  

341  
    int nfds = ::epoll_wait(
341  
    int nfds = ::epoll_wait(
342  
        epoll_fd_, event_buffer_.data(),
342  
        epoll_fd_, event_buffer_.data(),
343  
        static_cast<int>(event_buffer_.size()), timeout_ms);
343  
        static_cast<int>(event_buffer_.size()), timeout_ms);
344  

344  

345  
    if (nfds < 0 && errno != EINTR)
345  
    if (nfds < 0 && errno != EINTR)
346  
        detail::throw_system_error(make_err(errno), "epoll_wait");
346  
        detail::throw_system_error(make_err(errno), "epoll_wait");
347  

347  

348  
    bool check_timers = false;
348  
    bool check_timers = false;
349  
    op_queue local_ops;
349  
    op_queue local_ops;
350  

350  

351  
    for (int i = 0; i < nfds; ++i)
351  
    for (int i = 0; i < nfds; ++i)
352  
    {
352  
    {
353  
        if (event_buffer_[i].data.ptr == nullptr)
353  
        if (event_buffer_[i].data.ptr == nullptr)
354  
        {
354  
        {
355  
            std::uint64_t val;
355  
            std::uint64_t val;
356  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
356  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
357  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
357  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
358  
            eventfd_armed_.store(false, std::memory_order_relaxed);
358  
            eventfd_armed_.store(false, std::memory_order_relaxed);
359  
            continue;
359  
            continue;
360  
        }
360  
        }
361  

361  

362  
        if (event_buffer_[i].data.ptr == &timer_fd_)
362  
        if (event_buffer_[i].data.ptr == &timer_fd_)
363  
        {
363  
        {
364  
            std::uint64_t expirations;
364  
            std::uint64_t expirations;
365  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
365  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
366  
            [[maybe_unused]] auto r =
366  
            [[maybe_unused]] auto r =
367  
                ::read(timer_fd_, &expirations, sizeof(expirations));
367  
                ::read(timer_fd_, &expirations, sizeof(expirations));
368  
            check_timers = true;
368  
            check_timers = true;
369  
            continue;
369  
            continue;
370  
        }
370  
        }
371  

371  

372  
        auto* desc =
372  
        auto* desc =
373  
            static_cast<reactor_descriptor_state*>(event_buffer_[i].data.ptr);
373  
            static_cast<reactor_descriptor_state*>(event_buffer_[i].data.ptr);
374  
        desc->add_ready_events(event_buffer_[i].events);
374  
        desc->add_ready_events(event_buffer_[i].events);
375  

375  

376  
        bool expected = false;
376  
        bool expected = false;
377  
        if (desc->is_enqueued_.compare_exchange_strong(
377  
        if (desc->is_enqueued_.compare_exchange_strong(
378  
                expected, true, std::memory_order_release,
378  
                expected, true, std::memory_order_release,
379  
                std::memory_order_relaxed))
379  
                std::memory_order_relaxed))
380  
        {
380  
        {
381  
            local_ops.push(desc);
381  
            local_ops.push(desc);
382  
        }
382  
        }
383  
    }
383  
    }
384  

384  

385  
    if (check_timers)
385  
    if (check_timers)
386  
    {
386  
    {
387  
        timer_svc_->process_expired();
387  
        timer_svc_->process_expired();
388  
        update_timerfd();
388  
        update_timerfd();
389  
    }
389  
    }
390  

390  

391  
    lock.lock();
391  
    lock.lock();
392  

392  

393  
    if (!local_ops.empty())
393  
    if (!local_ops.empty())
394  
        completed_ops_.splice(local_ops);
394  
        completed_ops_.splice(local_ops);
395  
}
395  
}
396  

396  

397  
} // namespace boost::corosio::detail
397  
} // namespace boost::corosio::detail
398  

398  

399  
#endif // BOOST_COROSIO_HAS_EPOLL
399  
#endif // BOOST_COROSIO_HAS_EPOLL
400  

400  

401  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
401  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP