Async Runtimes
Here’s a link to the code on a GitHub repo
I’m trying to understand async
runtimes better, specifically in Rust. This post is a short attempt to build the most basic example of a future and execute it.
A better and more detailed version of this can be found in this book.
Futures
Creating a future in Rust is very straightforward. You just need to implement the Future
trait, which requires one method, poll
, to be implemented.
Let’s build a simple future.
use std::{
future::Future,
sync::{
mpsc::{sync_channel, Receiver, SyncSender},
Arc, Mutex,
},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
use futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
};
struct SharedState {
completed: bool,
waker: Option<Waker>,
}
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
The Waker
referenced above is a way for the task to inform it’s executor that it should be polled again. You can read more from the docs here.
Now, let’s make it a future and construct an instance of it.
Note: If you want to understand more about what Pin
is and why it’s required, look here. Long story short - it’s a way for Rust to ensure that the data being pointed at isn’t moved around in memory
impl Future for TimerFuture {
type Output = ();
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
return Poll::Ready(());
}
shared_state.waker = Some(cx.waker().clone());
return Poll::Pending;
}
}
impl TimerFuture {
fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
let thread_shared_state = Arc::clone(&shared_state);
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake();
}
});
TimerFuture { shared_state }
}
}
There we go, we now have a future. Now, we need to run this future to completion.
Executing A Future
Let’s get some conceptual modeling out of the way first. We have the following terms - Spawner
, Executor
, Task
, Future
& Waker
. Let’s build the mental model bottom up.
A Future
is something that will complete at some point in time.
A Waker
is a way for the future to ensure that it is polled again (typically by placing the future back onto the executor queue)
A Task
is a wrapper around a Future
and a Waker
.
A Spawner
is a component that constructs a Task
and provides it to the Executor
.
An Executor
constantly checks it’s list of Tasks
and polls their futures for completion
A picture is worth a thousand words, so here’s one
Let’s write some code to represent this.
Overall Structure
Here’s the basic structs for Task
, Spawner
& Executor
.
struct Task {
future: Mutex<BoxFuture<'static, ()>>,
task_sender: SyncSender<Arc<Task>>,
}
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
struct Executor {
task_queue: Receiver<Arc<Task>>,
}
Task
and Spawner
keep putting tasks onto the channel and the Executor
has the receiving end of the channel and keeps polling the futures within these tasks.
Task
struct Task {
future: Mutex<BoxFuture<'static, ()>>,
task_sender: SyncSender<Arc<Task>>,
}
The BoxFuture
is an aliased type for pub type BoxFuture<'a, T> = Pin<alloc::boxed::Box<dyn Future<Output = T> + Send + 'a>>
.
It’s a complicated type but it essentially means that I have a Box
container around a dynamic Future
and I do not want the data within the Box
container to be moved around in memory so I wrap it in a Pin
.
The easiest way to understand
Pin
is to work through a simple example. Let’s create a simplePin
wrapper and try to move it. The example below compiles with an error becauseString
does not haveCopy
semantics, so when it tries to move the value, it cannot because the container has been pinned in place.#[derive(Debug)] struct MyData { value: String, } let my_data = Pin::new(Box::new(MyData { value: String::from("hello"), })); let moved_data = Box::new(MyData { value: my_data.value, }); error[E0507]: cannot move out of dereference of `Pin<Box<MyData>>` --> src/main.rs:40:16 | 40 | value: my_data.value, | ^^^^^^^^^^^^^ move occurs because value has type `String`, which does not implement the `Copy` trait | help: consider cloning the value if the performance cost is acceptable | 40 | value: my_data.value.clone(), | ++++++++
Now, we said earlier that Task
is a wrapper for a Future
and a Wake
object. By implementing the trait below, we allow a Waker
object to be constructed from a Task
. More on that below.
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
let arc_clone = Arc::clone(&arc_self);
arc_self.task_sender.send(arc_clone).unwrap();
}
}
Spawner
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
This component pushes new tasks onto the channel.
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
let box_future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(box_future),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).unwrap();
}
}
Executor
struct Executor {
task_queue: Receiver<Arc<Task>>,
}
This component blocks on the receiver until it receives new tasks. Once it gets these tasks, it polls them to determine if they are complete.
There is also a convenience method for constructing an Executor
and Spawner
.
impl Executor {
fn run(&self) {
while let Ok(task) = self.task_queue.recv() {
let mut fut = task.future.lock().unwrap();
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&waker);
if fut.as_mut().poll(context).is_ready() {
println!("The future is done running");
}
}
}
fn executor_and_spawner() -> (Executor, Spawner) {
let (sync_sender, receiver) = sync_channel(10000);
let executor = Executor {
task_queue: receiver,
};
let spawner = Spawner {
task_sender: sync_sender,
};
(executor, spawner)
}
}
In the run
method, you can see the Waker
object being constructed from the Task
. This is possible because we implemented the ArcWake
trait for the Task
above, and we are providing it in the Context
object to the poll
method.
Now, when the future is ready and it calls the wake
method on the Wake
object, the method in the ArcWake
trait is executed.
Putting It Together
Here’s a simple test to demonstrate how this works together
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn spawn_tasks() {
let (executor, spawner) = Executor::executor_and_spawner();
spawner.spawn(async {
println!("Hello");
TimerFuture::new(Duration::from_secs(2)).await;
println!("World");
});
drop(spawner);
executor.run();
}
}
The output should be Hello
, followed by a 2 second pause and then a World
.
Here’s a flow diagram for what is going on
-> Spawner creates task
-> Spawner puts task into execution queue
-> Executor polls the future in the task and finds it pending
-> The future will keep track of the Waker object
-> Duration elapses
-> The future calls the `.wake` method
-> The `ArcWake` implementation on Task is triggered
-> The task is again placed on the queue
-> Executor polls the future and finds it completed
Conclusion
So, there you have it - a very, very simple implementation of a single threaded executor.