Event loop simplified in Rust
This blog post explains the motivation for building an event loop and walks through how I did it, with detailed explanations along the way.
The motivation for creating an event loop is not that I'm bored; it's that Tokio does not have built-in ICMP or raw socket support. We needed an `AsyncRawSocket` that supports the ICMP protocol, so we can send low-overhead pings to tens of thousands of servers simultaneously.
Note that there is no way to have an asynchronous system without operating system support! You either have:
- Asynchronous operating system primitives like `epoll`
- Synchronous calls with multiple threads (not covered by this blog post)
In this tutorial-style post, I'm going to show how Devolutions Gateway solves this problem.
Let's start by appreciating the polling crate by smol-rs. Without their lovely cross-platform abstraction for network I/O, building this would be ten times harder.
The overall idea
The concept is simple:
- Create a socket.
- Create a future to register the socket and your intentions with the event loop.
- Wait until the system tells you something happened.
- Then you resolve the future.
Step by step
1. Create an async socket and register the socket
As mentioned before, there's no way to create an async socket without OS-level support. We can look at the socket implementation file.
In particular, this single line of code is the most significant: `self.poller.add(&socket, Event::all(id))?;`. It tells the OS that we are interested in these events for this socket.
NOTE: Interest in I/O events needs to be re-enabled using `modify()` again after an event is delivered if we're interested in the next event of the same kind. (See the `polling` documentation.)
```rust
// runtime.rs
pub fn new_socket(
    self: &Arc<Self>,
    domain: socket2::Domain,
    ty: socket2::Type,
    protocol: Option<socket2::Protocol>,
) -> anyhow::Result<AsyncRawSocket> {
    let socket = Socket::new(domain, ty, protocol)?;
    let id = self.next_socket_id.fetch_add(1, Ordering::SeqCst);
    unsafe {
        self.poller.add(&socket, Event::all(id))?; // register event listener
    }
    Ok(AsyncRawSocket::from_socket(socket, id, Arc::clone(self))?)
}
```
```rust
// socket.rs
pub(crate) fn from_socket(
    socket: Socket,
    id: usize,
    runtime: Arc<Socket2Runtime>,
) -> std::io::Result<AsyncRawSocket> {
    let socket = Arc::new(socket);
    socket.set_nonblocking(true)?; // a must
    Ok(AsyncRawSocket { socket, id, runtime })
}
```
Here, `set_nonblocking(true)` invokes `fcntl(fd, F_SETFL, O_NONBLOCK)` on Unix systems, while `self.poller.add(&socket, Event::all(id))?` maps to `epoll_ctl(EPOLL_CTL_ADD, fd, &event)`. Both calls are direct system calls; the `poller` and `socket` modules are just thin abstractions provided by the `polling` crate for cross-platform support.
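The effect of nonblocking mode is easy to observe with the standard library alone. The sketch below uses a `UdpSocket` as a stand-in for our raw socket (no raw-socket privileges needed); the function name is mine, not the project's API. Once `set_nonblocking(true)` is applied, a read with no data queued fails immediately with `WouldBlock` instead of hanging the thread.

```rust
use std::io::ErrorKind;
use std::net::UdpSocket;

// Returns the error kind we get when reading a nonblocking socket
// that has no data available yet.
fn probe_empty_socket() -> ErrorKind {
    // Bind to an ephemeral local port; nothing will ever send to it.
    let socket = UdpSocket::bind("127.0.0.1:0").expect("bind failed");
    // The OS-level switch async I/O is built on: with it set,
    // recv_from returns immediately instead of blocking the thread.
    socket.set_nonblocking(true).expect("set_nonblocking failed");

    let mut buf = [0u8; 64];
    match socket.recv_from(&mut buf) {
        Ok(_) => ErrorKind::Other, // unreachable here: no peer exists
        Err(e) => e.kind(),        // expected: WouldBlock
    }
}

fn main() {
    // With no data queued, the kernel reports "try again later".
    assert_eq!(probe_empty_socket(), ErrorKind::WouldBlock);
    println!("nonblocking recv_from returned WouldBlock as expected");
}
```

That `WouldBlock` error is not a failure; it is the signal the whole readiness model is built on, as we'll see below.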
2. Start an event loop and listen for events
We assume you already have some background knowledge about how async works in Rust. If not, I recommend watching the "Crust of Rust" series by Jon Gjengset.
The event loop
How do we build an event loop? At a high level, it's just a loop:
```rust
loop {
    let event = listen_for_event_to_happen();
    // do a few other things in between
    for subscriber in subscribers {
        if subscriber.interested_in(event) {
            subscriber.notify(event)
        }
    }
}
```
It's that simple. In our case, `listen_for_event_to_happen` corresponds to calling the poller, as seen here:
```rust
if let Err(error) = poller.wait(&mut events, Some(Duration::from_millis(200))) {
    error!(%error, "Failed to poll events");
    is_terminated.store(true, Ordering::SeqCst);
    break;
};
```
We'll explain the significance of the 200-millisecond timeout in a later section.
Under the hood, `poller.wait` is also a direct system call that corresponds to `epoll_wait` on Unix (and equivalent APIs on other platforms).
Notifying subscribers
Once we've collected events from the poller, the next job of the event loop is to notify the tasks that are waiting for those events. In our implementation, these subscribers are simply stored as a set of `(EventWrapper, Waker)` pairs. When we detect that a certain event has occurred, we look into our `event_history` to check if it's present, and if so, we call the `wake_by_ref()` method on its corresponding waker:
Note: `EventWrapper` is just a wrapper that implements `Hash` so we can use `Event` as a key in our `HashMap`.
```rust
for (event, waker) in events_registered.iter() {
    if event_history.lock().get(event).is_some() {
        waker.wake_by_ref();
    }
}
```
This is how the waiting future gets scheduled again by the executor. The `Waker` is what Rust's async runtime uses to keep track of which tasks want to be woken up. Once the socket becomes readable or writable (or whatever event you're watching for), the future that was waiting on it is polled again and can now proceed.
It’s a clean and effective design: the loop listens, stores results, and wakes futures that care.
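To make the waker mechanics concrete, here is a minimal, self-contained sketch (std only, no real I/O, and all names are mine rather than the project's). A hand-rolled `block_on` parks its thread until the waker fires, and a toy future stands in for our socket futures: instead of waiting for a real event loop to call `wake_by_ref()`, it wakes itself on first poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// A waker that unparks the thread running block_on, so a Pending
// future puts the thread to sleep instead of busy-spinning.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// A future that is "not ready" on its first poll. Like our socket
// futures, it hands its waker to whoever signals readiness; here we
// cheat and wake immediately, standing in for the event loop.
struct YieldOnce {
    polled: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled {
            Poll::Ready("ready on second poll")
        } else {
            self.polled = true;
            cx.waker().wake_by_ref(); // the "event loop" notifies us
            Poll::Pending
        }
    }
}

// A tiny single-future executor: poll, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let out = block_on(YieldOnce { polled: false });
    println!("{out}");
}
```

In the real runtime the only difference is who calls `wake_by_ref()`: it is the event loop thread, after `poller.wait` reports the socket as ready.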
Subscribing to an event
To allow a future to be woken up, it first needs to register interest in a specific event. This is done through the `register` method:
```rust
pub(crate) fn register(&self, socket: &Socket, event: Event, waker: Waker) -> anyhow::Result<()> {
    if self.is_terminated.load(Ordering::Acquire) {
        Err(ScannnerNetError::AsyncRuntimeError("runtime is terminated".to_owned()))?;
    }
    trace!(?event, ?socket, "Registering event");
    self.poller.modify(socket, event)?;
    self.register_sender
        .try_send(RegisterEvent::Register { event, waker })
        .with_context(|| "failed to send register event to register loop")
}
```
This method notifies the OS that we're now interested in a new event on the socket using `poller.modify`, and then pushes a message to our channel so the event loop can keep track of the waker.
The registration message is then picked up by the event loop on each iteration:
```rust
while let Ok(event) = register_receiver.try_recv() {
    match event {
        RegisterEvent::Register { event, waker } => {
            events_registered.insert(event.into(), waker);
        }
        RegisterEvent::Unregister { event } => {
            events_registered.remove(&event.into());
        }
    }
}
```
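The same drain-the-channel pattern can be sketched with `std::sync::mpsc` alone. The `EventKey` and `FakeWaker` types below are hypothetical stand-ins for the real `Event` and `Waker`; only the `try_recv` loop mirrors the actual code.

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Hypothetical stand-ins for the post's Event and Waker types.
type EventKey = usize;
type FakeWaker = String;

enum RegisterEvent {
    Register { event: EventKey, waker: FakeWaker },
    Unregister { event: EventKey },
}

// Drain all pending registration messages without blocking, like the
// try_recv loop above: the event loop calls this once per iteration
// before dispatching I/O events, so it never stalls waiting for input.
fn drain_registrations(
    rx: &mpsc::Receiver<RegisterEvent>,
    registered: &mut HashMap<EventKey, FakeWaker>,
) {
    while let Ok(msg) = rx.try_recv() {
        match msg {
            RegisterEvent::Register { event, waker } => {
                registered.insert(event, waker);
            }
            RegisterEvent::Unregister { event } => {
                registered.remove(&event);
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut registered = HashMap::new();

    tx.send(RegisterEvent::Register { event: 1, waker: "waker-1".into() }).unwrap();
    tx.send(RegisterEvent::Register { event: 2, waker: "waker-2".into() }).unwrap();
    tx.send(RegisterEvent::Unregister { event: 1 }).unwrap();

    drain_registrations(&rx, &mut registered);
    println!("registered wakers: {}", registered.len());
}
```

Using `try_recv` instead of `recv` is what keeps registration from blocking the I/O thread: if nothing is queued, the loop moves straight on to polling.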
In more performance-sensitive environments, registration could be handled by a separate thread or async task to reduce contention with the I/O thread. But in our case, a 200ms polling loop is acceptable because ping latency isn’t that time-sensitive and network responses are inherently unpredictable.
Where the `Waker` comes from
Let's trace where this waker actually comes from. Consider this method on the `AsyncRawSocket` struct:
```rust
pub fn send_to<'a>(&'a self, data: &'a [u8], addr: &'a SockAddr) -> impl Future<Output = std::io::Result<usize>> + 'a {
    SendToFuture {
        socket: Arc::clone(&self.socket),
        runtime: Arc::clone(&self.runtime),
        data,
        addr,
        id: self.id,
    }
}
```
The `send_to` method only returns a future. The closely related `SendFuture` (for plain `send`) shows how `poll` is implemented:
```rust
impl Future for SendFuture<'_> {
    type Output = std::io::Result<usize>;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        match self.socket.send(self.data) {
            Ok(a) => std::task::Poll::Ready(Ok(a)),
            Err(e) => resolve(e, &self.socket, &self.runtime, Event::writable(self.id), cx.waker()),
        }
    }
}
```
This is easy to understand: if the send completes, we simply return `Ok(size)`. In practice, the first poll returns a `WouldBlock` error 99% of the time, but this is expected. On any error (including the `WouldBlock` variant), we call `resolve`, which handles the event registration:
```rust
fn resolve<T>(
    error: std::io::Error,
    socket: &Arc<Socket>,
    runtime: &Arc<Socket2Runtime>,
    event: Event,
    waker: &std::task::Waker,
) -> std::task::Poll<std::io::Result<T>> {
    if error.kind() == std::io::ErrorKind::WouldBlock {
        if let Err(e) = runtime.register(socket, event, waker.clone()) {
            return std::task::Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("failed to register socket to poller: {}", e),
            )));
        }
        return std::task::Poll::Pending;
    }
    std::task::Poll::Ready(Err(error))
}
```
This is how async readiness works. You try the I/O. If it fails with `WouldBlock`, you register interest and return `Poll::Pending`. The executor takes over, and when the event occurs, the waker gets triggered.
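Here is that try-then-wait-then-retry pattern in synchronous miniature, using only the standard library. The sleep stands in for "register interest and be woken by `epoll_wait`"; everything else (names like `recv_with_retry`, the UDP sockets) is illustrative, not the project's API.

```rust
use std::io::ErrorKind;
use std::net::UdpSocket;
use std::thread;
use std::time::Duration;

// The readiness pattern: try the I/O, and on WouldBlock wait and
// retry. A real event loop replaces the sleep with poller.wait /
// epoll_wait and a waker instead of a busy loop.
fn recv_with_retry(socket: &UdpSocket, buf: &mut [u8]) -> std::io::Result<usize> {
    loop {
        match socket.recv(buf) {
            Ok(n) => return Ok(n),
            Err(e) if e.kind() == ErrorKind::WouldBlock => {
                // Stand-in for "register interest and return Pending":
                // pause until it is worth trying again.
                thread::sleep(Duration::from_millis(10));
            }
            Err(e) => return Err(e),
        }
    }
}

fn main() -> std::io::Result<()> {
    let receiver = UdpSocket::bind("127.0.0.1:0")?;
    receiver.set_nonblocking(true)?;
    let addr = receiver.local_addr()?;

    // A peer that answers only after a delay, so the first recv
    // attempts genuinely hit WouldBlock.
    thread::spawn(move || {
        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        thread::sleep(Duration::from_millis(50));
        sender.send_to(b"pong", addr).unwrap();
    });

    let mut buf = [0u8; 16];
    let n = recv_with_retry(&receiver, &mut buf)?;
    println!("received {n} bytes");
    Ok(())
}
```

The difference in the real runtime is purely in *who* waits: instead of each caller sleeping, one event loop thread waits on all sockets at once and wakes only the futures whose sockets became ready.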
Summary
The event loop is just a dedicated thread that listens for I/O readiness and wakes tasks that need to proceed. Once the event loop is running, you can start using raw sockets in async style — like this:
```rust
socket.send_to(&packet_bytes, &addr).await?;
```
You’ll find this usage in production code here.
One last note: resource cleanup is important. Be sure to check out how we manage the socket lifecycle with `Drop` to avoid resource leaks.
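As a rough illustration of that idea (the names below are invented, not the project's types), a guard can deregister itself in `Drop` so cleanup can't be forgotten. In the real code the equivalent step would presumably be removing the socket's registration from the poller; here a shared set stands in for that registration table.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Stand-in for the poller's registration table.
struct Registry {
    registered: Mutex<HashSet<usize>>,
}

// A socket-like guard: registers itself on creation and
// deregisters itself automatically when dropped.
struct SocketGuard {
    id: usize,
    registry: Arc<Registry>,
}

impl SocketGuard {
    fn new(id: usize, registry: Arc<Registry>) -> Self {
        registry.registered.lock().unwrap().insert(id);
        SocketGuard { id, registry }
    }
}

impl Drop for SocketGuard {
    fn drop(&mut self) {
        // Deregister automatically, so a forgotten socket can never
        // leak an entry in the registration table.
        self.registry.registered.lock().unwrap().remove(&self.id);
    }
}

fn main() {
    let registry = Arc::new(Registry { registered: Mutex::new(HashSet::new()) });
    {
        let _guard = SocketGuard::new(7, Arc::clone(&registry));
        assert!(registry.registered.lock().unwrap().contains(&7));
    } // guard dropped here; its entry is removed
    assert!(registry.registered.lock().unwrap().is_empty());
    println!("socket 7 deregistered on drop");
}
```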