---
title:
- Getting comfy with async await
author:
- Sanchayan Maity
theme:
- default
classoption:
- aspectratio=169
---

# Who

- Who am I?
    * Embedded Systems background
    * Prefer C, Haskell and Rust
    * Organize and speak at Rust and Haskell meet-ups in Bangalore
- Work?
    * Software Engineer @ [asymptotic](https://asymptotic.io/)
    * Open source consulting firm based out of Bangalore and Toronto
    * Work on low level systems software centred around multimedia
    * GStreamer, PipeWire, PulseAudio
    * Language Polyglots

# Agenda

- `Future` trait
- `async`/`await`
- Using futures/Runtime
- Working with multiple futures (`select`, `join`, `FuturesOrdered`)
- Streams
- Pitfalls
- `Pin`/`Unpin`/`pin_project`

# Future[^1]

```rust
// Simplified from `std::future::Future` and `std::task::Poll`
use std::pin::Pin;
use std::task::Context;

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
```

[^1]: [Associated types](https://doc.rust-lang.org/reference/items/associated-items.html)

# Example

```rust
async fn hello() {
    println!("Hello from async");
}

fn main() {
    hello(); // creates a future but never polls it, so its body does not run
    println!("Hello from main");
}
```

# Where's the future

```rust
async fn give_number() -> u32 {
    100
}
```

# Sugar town[^2]

```rust
use std::{future::Future, pin::Pin, task::{Context, Poll}};

fn give_number() -> impl Future<Output = u32> {
    GiveNumberFuture {}
}

struct GiveNumberFuture {}

impl Future for GiveNumberFuture {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready(100)
    }
}
```

[^2]: [Syntactic sugar for Future](https://ruspiro.github.io/ruspiro-async-book/02-03-async.html)

# Runtimes

![*_Run Futures_*](future-meme.jpg){width=80%}

# Runtimes[^3]

- `futures::executor`
- `tokio`
- `smol-rs`
- `embassy`
- `glommio`
- `async-std`

[^3]: [The state of Async Rust: Runtimes](https://corrode.dev/blog/async/)

# Example

```rust
use futures::executor::block_on;

async fn hello() {
    println!("hello, world!");
}

fn main() {
    block_on(hello());
    println!("Hello from main");
}
```

# Example

```rust
async fn hello() {
    println!("Hello from async");
}

#[tokio::main]
async fn main() {
    hello().await;
    println!("Hello from main");
}
```

# Multiple futures

- `join`
- `join_all`
- `select`
- `select!`
- `select_all`
- `FuturesOrdered`
- `FuturesUnordered`
- `JoinSet`

# `join`

```rust
use futures::future;

#[tokio::main]
async fn main() {
    let a = async { "Future 1" };
    let b = async { "Future 2" };
    let pair = future::join(a, b);

    println!("{:?}", pair.await);
}
```

# `join_all`

```rust
use futures::future::join_all;

async fn hello(msg: String) -> String {
    msg
}

#[tokio::main]
async fn main() {
    let futures = vec![
        hello("Future 1".to_string()),
        hello("Future 2".to_string()),
        hello("Future 3".to_string()),
        hello("Future 4".to_string()),
    ];

    println!("{:?}", join_all(futures).await);
}
```

# `JoinSet`

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    for i in 0..10 {
        set.spawn(async move { i });
    }

    while let Some(res) = set.join_next().await {
        println!("{}", res.unwrap());
    }
}
```

# `future::select`

```rust
pub fn select<A, B>(future1: A, future2: B) -> Select<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
```

# `future::select`

```rust
use futures::future::{self, Either, FutureExt};
use tokio::time::{sleep, Duration};

async fn task1(delay: u64) -> u64 {
    sleep(Duration::from_millis(delay)).await;
    delay
}

async fn task2(delay: u64) -> String {
    sleep(Duration::from_millis(delay)).await;
    "Hello".to_string()
}
```

# `future::select`

```rust
#[tokio::main]
async fn main() {
    let t1 = task1(200u64).fuse();
    let t2 = task2(300u64).fuse();
    // Pinning makes both futures `Unpin`, as `future::select` requires
    tokio::pin!(t1, t2);

    match future::select(t1, t2).await {
        Either::Left((value1, _)) => println!("{}", value1),
        Either::Right((value2, _)) => println!("{}", value2),
    };
}
```

# `futures::select!`[^4]

```rust
use futures::{future::FutureExt, pin_mut, select};
use tokio::time::{sleep, Duration};

async fn task(delay: u64) {
    sleep(Duration::from_millis(delay)).await;
}

#[tokio::main]
async fn main() {
    let t1 =
        task(300u64).fuse();
    let t2 = task(200u64).fuse();
    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
```

[^4]: [futures::select!](https://docs.rs/futures/latest/futures/macro.select.html)

# `tokio::select!`[^5]

```rust
use tokio::time::{sleep, Duration};

async fn task(delay: u64) {
    sleep(Duration::from_millis(delay)).await;
}

#[tokio::main]
async fn main() {
    let t1 = task(300u64);
    let t2 = task(200u64);
    tokio::pin!(t1, t2);

    tokio::select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
```

[^5]: [tokio::select!](https://docs.rs/tokio/latest/tokio/macro.select.html)

# `loop tokio::select!`

```rust
#[tokio::main]
async fn main() {
    let mut count = 0;
    let t1 = task(300u64);
    let t2 = task(200u64);
    tokio::pin!(t1, t2);

    loop {
        if count > 5 {
            break;
        }

        // Pitfall: once `t2` has completed, the next iteration polls it
        // again, and an `async fn` future panics when polled after completion.
        tokio::select! {
            () = &mut t1 => println!("task one completed first"),
            () = &mut t2 => println!("task two completed first"),
        }

        count += 1;
    }
}
```

# `loop futures::select!`

```rust
#[tokio::main]
async fn main() {
    let mut count = 0;
    let t1 = task(300u64).fuse();
    let t2 = task(200u64).fuse();
    tokio::pin!(t1, t2);

    loop {
        if count > 5 {
            break;
        }

        // Fused futures that already completed are skipped, but with no
        // `complete =>` branch this panics once both have finished.
        futures::select! {
            () = &mut t1 => println!("task one completed first"),
            () = &mut t2 => println!("task two completed first"),
        }

        count += 1;
    }
}
```

# `Stream`[^6]

```rust
pub trait Stream {
    type Item;

    // Required method
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;
}
```

[^6]: [Guided tour of Streams](https://www.qovery.com/blog/a-guided-tour-of-streams-in-rust/)

# `async-stream`

```rust
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};

fn zero_to_three() -> impl Stream<Item = u32> {
    stream!
    {
        for i in 0..3 {
            yield i;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = zero_to_three();
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}
```

# `futures::select!` vs `tokio::select!`

- [futures::select!](https://docs.rs/futures/latest/futures/macro.select.html)
- [tokio::select!](https://docs.rs/tokio/latest/tokio/macro.select.html)
- [SO - What's the difference between futures::select and tokio::select?](https://stackoverflow.com/questions/60811657/what-is-the-difference-between-futuresselect-and-tokioselect)
- [Provide `select!` macro](https://stackoverflow.com/questions/60811657/what-is-the-difference-between-futuresselect-and-tokioselect)

# Multiple futures

- [FuturesUnordered](https://docs.rs/futures/latest/futures/stream/struct.FuturesUnordered.html)
- [FuturesOrdered](https://docs.rs/futures/latest/futures/stream/struct.FuturesOrdered.html)
- Must read
    - [FuturesUnordered and the order of futures](https://without.boats/blog/futures-unordered/)

# Cancellation

- [futures::future::Abortable](https://docs.rs/futures/latest/futures/future/struct.Abortable.html)

# Pitfalls

- Blocking in `async`
    - [Async: What's blocking](https://ryhl.io/blog/async-what-is-blocking/)
    - TLDR: Async code should never spend a long time without reaching an `.await`
- Cancellation safety
- Holding a `Mutex` across an `await`
- Must read
    - [Async cancellation: a case study of pub-sub in mini-redis](https://smallcultfollowing.com/babysteps/blog/2022/06/13/async-cancellation-a-case-study-of-pub-sub-in-mini-redis/)
    - [Yoshua Wuyts - Async Cancellation](https://blog.yoshuawuyts.com/async-cancellation-1/)
    - [Common mistakes with Rust Async](https://www.qovery.com/blog/common-mistakes-with-rust-async/)
    - [Rust tokio task cancellation patterns](https://cybernetist.com/2024/04/19/rust-tokio-task-cancellation-patterns/)
    - [`for await` and the battle of buffered streams](https://tmandry.gitlab.io/blog/posts/for-await-buffered-streams/)
    - [Mutex without lock, Queue without push: cancel safety in lilos](https://cliffle.com/blog/lilos-cancel-safety/)

# Cancellation safety with `select!`

TLDR:

- Every future in `select!` except the one that yields `Poll::Ready` gets dropped
- A future that owns state is not cancellation safe: that state is dropped with it when another future returns `Poll::Ready` first

# Pinning

```rust
use std::pin::Pin;
use pin_project::pin_project;

#[pin_project]
struct Struct<T, U> {
    #[pin]
    pinned: T,
    unpinned: U,
}

impl<T, U> Struct<T, U> {
    fn method(self: Pin<&mut Self>) {
        let this = self.project();
        let _: Pin<&mut T> = this.pinned; // Pinned reference to the field
        let _: &mut U = this.unpinned; // Normal reference to the field
    }
}
```

# Pinning

- Must read
    - [std::pin](https://doc.rust-lang.org/std/pin/index.html#projections-and-structural-pinning)
    - [Pin and suffering](https://fasterthanli.me/articles/pin-and-suffering)
    - [Pin, Unpin, and why Rust needs them](https://blog.cloudflare.com/pin-and-unpin-in-rust/)
    - [Async Book - Pinning](https://rust-lang.github.io/async-book/04_pinning/01_chapter.html)
    - [pin_project](https://docs.rs/pin-project/latest/pin_project/)

# More references

- [Meetup code samples](https://git.sanchayanmaity.net/sanchayanmaity/async-await-rust-meetup-examples)
- [Tokio tutorial](https://tokio.rs/tokio/tutorial)
- [Tokio select](https://tokio.rs/tokio/tutorial/select)
- [Tokio internals](https://cafbit.com/post/tokio_internals/)
- [How Rust optimizes async/await - I](https://tmandry.gitlab.io/blog/posts/optimizing-await-1/)
- [How Rust optimizes async/await - II](https://tmandry.gitlab.io/blog/posts/optimizing-await-2/)

# Questions

- Reach out on
    * Email: me@sanchayanmaity.net
    * Mastodon: [sanchayanmaity.com](https://sanchayanmaity.com/@sanchayan)
    * Telegram: https://t.me/SanchayanMaity
    * Blog: [sanchayanmaity.net](https://sanchayanmaity.net/)
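
# Appendix: watching a race drop the loser

The cancellation-safety TLDR can be reproduced with the standard library alone. What follows is a minimal sketch, not how `tokio::select!` or `futures::select!` are implemented: `block_on`, `yield_times`, `Buffer`, `collect`, `Either`, `race` and `demo` are all hypothetical helpers written for this slide. `race` polls two futures and drops whichever one did not finish, and the `Drop` impl on `Buffer` records the items that went down with it.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: polls one future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Returns `Pending` `n` times (waking itself each time), then completes.
async fn yield_times(n: u32) {
    let mut left = n;
    poll_fn(move |cx: &mut Context<'_>| {
        if left == 0 {
            return Poll::Ready(());
        }
        left -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    })
    .await
}

/// Owns collected items; on drop, records whatever was still inside.
struct Buffer {
    items: Vec<u32>,
    lost: Arc<Mutex<Vec<u32>>>,
}

impl Drop for Buffer {
    fn drop(&mut self) {
        self.lost.lock().unwrap().extend(self.items.drain(..));
    }
}

/// Gathers 0..5 into a buffer it owns, yielding after every item.
async fn collect(lost: Arc<Mutex<Vec<u32>>>) -> Vec<u32> {
    let mut buf = Buffer { items: Vec::new(), lost };
    for i in 0..5 {
        buf.items.push(i);
        yield_times(1).await;
    }
    std::mem::take(&mut buf.items)
}

enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Two-way race: returns the first output and drops the unfinished future.
async fn race<A: Future, B: Future>(a: A, b: B) -> Either<A::Output, B::Output> {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let winner = poll_fn(|cx: &mut Context<'_>| {
        if let Poll::Ready(out) = a.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(out));
        }
        if let Poll::Ready(out) = b.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(out));
        }
        Poll::Pending
    })
    .await;
    winner // `a` and `b` go out of scope here, finished or not
}

/// Races `collect` (needs 5 yields) against a 2-yield "timeout".
fn demo() -> Vec<u32> {
    let lost = Arc::new(Mutex::new(Vec::new()));
    match block_on(race(collect(lost.clone()), yield_times(2))) {
        Either::Left(items) => println!("collected {:?}", items),
        Either::Right(()) => println!("timeout won, `collect` was dropped"),
    }
    let lost_items = lost.lock().unwrap().clone();
    lost_items
}

fn main() {
    println!("items lost to cancellation: {:?}", demo());
}
```

In this run the timeout finishes on the third poll, so `collect` is dropped while its buffer holds `[0, 1, 2]`. One common remedy is to keep such state outside the future, for example by passing the buffer in by `&mut` reference, so that dropping the future does not drop the data.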