Event loop simplified in Rust

This blog post walks through the motivation for creating an event loop and how I built one, with detailed explanations along the way.

The motivation for creating an event loop is not because I'm bored; it's because Tokio does not have built-in ICMP or raw socket support. We needed an AsyncRawSocket that supports the ICMP protocol, so we can send low-overhead pings to tens of thousands of servers simultaneously.

Note that there is no way to have an asynchronous system without operating system support! You either have:

  1. Asynchronous operating system primitives like epoll
  2. Synchronous calls with multiple threads (not covered by this blog post)

In this tutorial-style post, I'm going to show how Devolutions Gateway solves this problem.

Let's start by appreciating the polling crate by smol-rs. Without their lovely cross-platform abstraction for network I/O, building this would be ten times harder.

The overall idea

The concept is simple:

  1. Create a socket.
  2. Create a future to register the socket and your intentions with the event loop.
  3. Wait until the system tells you something happened.
  4. Then you resolve the future.

Step by step

1. Create an async socket and register the socket

As mentioned before, there's no way to create an async socket without OS-level support. We can look at the socket implementation file.

In particular, this single line of code is the most significant: self.poller.add(&socket, Event::all(id))?. This tells the OS that we are interested in those events for this socket.

NOTE: "Interest in I/O events needs to be re-enabled using modify() again after an event is delivered if we're interested in the next event of the same kind." — from the polling crate docs.

// runtime.rs
pub fn new_socket(
    self: &Arc<Self>,
    domain: socket2::Domain,
    ty: socket2::Type,
    protocol: Option<socket2::Protocol>,
) -> anyhow::Result<AsyncRawSocket> {
    let socket = Socket::new(domain, ty, protocol)?;
    let id = self.next_socket_id.fetch_add(1, Ordering::SeqCst);

    unsafe {
        self.poller.add(&socket, Event::all(id))?; // register event listener
    }
    Ok(AsyncRawSocket::from_socket(socket, id, Arc::clone(self))?)
}

// socket.rs
pub(crate) fn from_socket(
    socket: Socket,
    id: usize,
    runtime: Arc<Socket2Runtime>,
) -> std::io::Result<AsyncRawSocket> {
    let socket = Arc::new(socket);
    socket.set_nonblocking(true)?; // a must
    Ok(AsyncRawSocket { socket, id, runtime })
}

Here, set_nonblocking(true) invokes fcntl(fd, F_SETFL, O_NONBLOCK) on Unix systems, while self.poller.add(&socket, Event::all(id))? maps to epoll_ctl(EPOLL_CTL_ADD, fd, &event). Both calls are direct system calls; the poller and socket modules are just thin abstractions provided by the polling crate for cross-platform support.
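You can observe the effect of O_NONBLOCK with nothing but the standard library. This sketch (using a TCP listener instead of a raw socket, so it runs without privileges) shows that once a socket is non-blocking, an operation that cannot complete immediately fails with WouldBlock instead of parking the thread:

```rust
use std::io::ErrorKind;
use std::net::TcpListener;

// Returns true if a non-blocking accept on an idle listener
// immediately reports WouldBlock rather than blocking the thread.
fn accept_would_block() -> std::io::Result<bool> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    listener.set_nonblocking(true)?; // fcntl(fd, F_SETFL, O_NONBLOCK) on Unix
    match listener.accept() {
        // No client is connecting, so the call returns right away.
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(true),
        Ok(_) => Ok(false),
        Err(e) => Err(e),
    }
}

fn main() -> std::io::Result<()> {
    println!("would block: {}", accept_would_block()?);
    Ok(())
}
```

This WouldBlock error is not a failure; it's the signal the whole design is built around, as we'll see when implementing the futures.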

2. Start an event loop and listen for events

We assume you already have some background knowledge about how async works in Rust. If not, I recommend watching "The Crust of Rust" by Jon Gjengset.

The event loop

How do we build an event loop? At a high level, it's just a loop:

loop {
    let event = listen_for_event_to_happen();

    // do a few other things in between
    for subscriber in subscribers {
        if subscriber.interested_in(event) {
            subscriber.notify(event)
        }
    }
}

It's that simple. In our case, listen_for_event_to_happen corresponds to calling the poller, as seen here:

if let Err(error) = poller.wait(&mut events, Some(Duration::from_millis(200))) {
    error!(%error, "Failed to poll events");
    is_terminated.store(true, Ordering::SeqCst);
    break;
};

We'll explain the significance of the 200-millisecond timeout in a later section.

Under the hood, poller.wait is also a direct system call that corresponds to epoll_wait on Unix (and equivalent APIs on other platforms).

Notifying subscribers

Once we've collected events from the poller, the next job of the event loop is to notify the tasks that are waiting for those events. In our implementation, these subscribers are simply stored as a set of (EventWrapper, Waker) pairs. When we detect that a certain event has occurred, we look into our event_history to check if it’s present, and if so, we call the wake_by_ref() method on its corresponding waker:

Note: EventWrapper is just a wrapper that implements Hash so we can use Event as a key in our HashMap.

for (event, waker) in events_registered.iter() {
    if event_history.lock().get(event).is_some() {
        waker.wake_by_ref();
    }
}
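The EventWrapper mentioned in the note could be sketched like this. This is a hypothetical reconstruction (the field names are assumptions): polling's Event type does not implement Hash/Eq, so a newtype mirroring the fields we care about lets us key a HashMap by event:

```rust
use std::collections::HashMap;

// Hypothetical sketch: mirror the relevant polling::Event fields in a
// type that derives Hash/Eq, so events can serve as HashMap keys.
#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug)]
struct EventWrapper {
    key: usize,     // the socket id passed to Event::all(id)
    readable: bool,
    writable: bool,
}

fn main() {
    let mut registered: HashMap<EventWrapper, &str> = HashMap::new();
    let writable = EventWrapper { key: 7, readable: false, writable: true };
    registered.insert(writable, "waker for socket 7");

    // Lookup succeeds because Hash and Eq are derived over all fields.
    assert!(registered.contains_key(&writable));
}
```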

This is how the waiting future gets scheduled again by the executor. The Waker is what Rust’s async runtime uses to keep track of which tasks want to be woken up. Once the socket becomes readable or writable (or any other event you're watching for), the future that was waiting on it will be polled again — and will now be able to proceed.

It’s a clean and effective design: the loop listens, stores results, and wakes futures that care.
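If Wakers feel abstract, here is a minimal, self-contained demonstration using the standard library's std::task::Wake trait. A Waker is just a handle whose wake call tells the executor "poll this task again"; here we count wake-ups instead of scheduling anything:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// A toy waker that records how many times it was woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
    fn wake_by_ref(self: &Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker: Waker = Waker::from(Arc::clone(&counter));

    // This is exactly what the event loop does for each matching subscriber.
    waker.wake_by_ref();
    waker.wake_by_ref();

    assert_eq!(counter.wakes.load(Ordering::SeqCst), 2);
}
```

In a real executor, wake schedules the task to be polled again rather than bumping a counter, but the mechanism is the same.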

Subscribing to an event

To allow a future to be woken up, it first needs to register interest in a specific event. This is done through the register method:

pub(crate) fn register(&self, socket: &Socket, event: Event, waker: Waker) -> anyhow::Result<()> {
    if self.is_terminated.load(Ordering::Acquire) {
        Err(ScannnerNetError::AsyncRuntimeError("runtime is terminated".to_owned()))?;
    }

    trace!(?event, ?socket, "Registering event");
    self.poller.modify(socket, event)?;

    self.register_sender
        .try_send(RegisterEvent::Register { event, waker })
        .with_context(|| "failed to send register event to register loop")
}

This method notifies the OS that we're now interested in a new event on the socket using poller.modify, and then pushes a message to our channel so the event loop can keep track of the waker.

The registration message is then picked up by the event loop on each iteration:

while let Ok(event) = register_receiver.try_recv() {
    match event {
        RegisterEvent::Register { event, waker } => {
            events_registered.insert(event.into(), waker);
        }
        RegisterEvent::Unregister { event } => {
            events_registered.remove(&event.into());
        }
    }
}

In more performance-sensitive environments, registration could be handled by a separate thread or async task to reduce contention with the I/O thread. But in our case, a 200ms polling loop is acceptable because ping latency isn’t that time-sensitive and network responses are inherently unpredictable.
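The registration channel can be reproduced as a self-contained sketch with std::sync::mpsc. This is a simplification of the pattern above (a plain usize key stands in for the real EventWrapper/Waker pair): futures send registration messages from poll(), and the loop drains them without blocking via try_recv:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver};

// Simplified stand-in for the real messages (usize instead of
// EventWrapper, a label instead of a Waker).
enum RegisterEvent {
    Register { key: usize, label: &'static str },
    Unregister { key: usize },
}

// What the event loop runs on each iteration: drain all pending
// registration messages without ever blocking.
fn drain_registrations(
    rx: &Receiver<RegisterEvent>,
    registered: &mut HashMap<usize, &'static str>,
) {
    while let Ok(msg) = rx.try_recv() {
        match msg {
            RegisterEvent::Register { key, label } => {
                registered.insert(key, label);
            }
            RegisterEvent::Unregister { key } => {
                registered.remove(&key);
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut registered = HashMap::new();

    // A future registers interest, then later unregisters it...
    tx.send(RegisterEvent::Register { key: 1, label: "waker for socket 1" }).unwrap();
    tx.send(RegisterEvent::Unregister { key: 1 }).unwrap();

    // ...and the loop picks both messages up on its next turn.
    drain_registrations(&rx, &mut registered);
    assert!(registered.is_empty());
}
```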

Where the Waker comes from

Let’s trace where this waker actually comes from. Consider this method on the AsyncRawSocket struct:

There are a few other methods, like `recv` and `connect`; you can refer to the code for those.

pub fn send_to(&self, data: &'a [u8], addr: &'a SockAddr) -> impl Future<Output = std::io::Result<usize>> + 'a {
    SendToFuture {
        socket: Arc::clone(&self.socket),
        runtime: Arc::clone(&self.runtime),
        data,
        addr,
        id: self.id,
    }
}

The send_to method only constructs and returns a future, which is implemented as below.

impl Future for SendToFuture<'_> {
    type Output = std::io::Result<usize>;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        match self.socket.send_to(self.data, self.addr) {
            Ok(size) => std::task::Poll::Ready(Ok(size)),
            Err(e) => resolve(e, &self.socket, &self.runtime, Event::writable(self.id), cx.waker()),
        }
    }
}

The logic is easy to follow: if the send completes, we simply return Poll::Ready(Ok(size)). In practice, the first execution almost always fails with a WouldBlock error, and that's expected.

On error, we call resolve, which checks for the WouldBlock variant and handles the event registration.

fn resolve<T>(
    error: std::io::Error,
    socket: &Arc<Socket>,
    runtime: &Arc<Socket2Runtime>,
    event: Event,
    waker: &std::task::Waker,
) -> std::task::Poll<std::io::Result<T>> {
    if error.kind() == std::io::ErrorKind::WouldBlock {
        if let Err(e) = runtime.register(socket, event, waker.clone()) {
            return std::task::Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("failed to register socket to poller: {}", e),
            )));
        }
        return std::task::Poll::Pending;
    }
    std::task::Poll::Ready(Err(error))
}

This is how async readiness works. You try the I/O. If it fails with WouldBlock, you register interest and return Poll::Pending. The executor takes over, and when the event occurs, the waker gets triggered.
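The whole cycle can be demonstrated end to end with nothing but the standard library. This is a toy sketch, not the Gateway's code: TryThenReady simulates an I/O call that "fails with WouldBlock" on its first poll, and block_on is a minimal single-task executor built on thread parking. The self-wake inside poll stands in for the event loop firing later:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Simulates the try-then-register pattern: the first poll cannot
// complete, wakes itself (standing in for the event loop noticing
// readiness later), and returns Pending; the second poll succeeds.
struct TryThenReady {
    attempted: bool,
}

impl Future for TryThenReady {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.attempted {
            Poll::Ready("sent")
        } else {
            self.attempted = true;
            cx.waker().wake_by_ref(); // in real code, the event loop does this
            Poll::Pending
        }
    }
}

// A minimal single-task executor: park the thread until woken, re-poll.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(mut fut: F) -> F::Output {
    // Safety: `fut` lives on this stack frame and is never moved again.
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(), // wait for wake() -> unpark()
        }
    }
}

fn main() {
    assert_eq!(block_on(TryThenReady { attempted: false }), "sent");
}
```

Swap TryThenReady for a future that actually hits a non-blocking socket, and the self-wake for the event loop's wake_by_ref, and you have the architecture described in this post.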

Summary

The event loop is just a dedicated thread that listens for I/O readiness and wakes tasks that need to proceed. Once the event loop is running, you can start using raw sockets in async style — like this:

socket.send_to(&packet_bytes, &addr).await?;

You’ll find this usage in production code here.

One last note: resource cleanup is important. Be sure to check out how we manage socket lifecycle with Drop to avoid resource leaks.