diff --git a/futures-async-await/Makefile b/futures-async-await/Makefile new file mode 100644 index 0000000..10e9758 --- /dev/null +++ b/futures-async-await/Makefile @@ -0,0 +1,11 @@ +all: + make slide slideshow + +slide: + pandoc -t beamer async.md -f markdown-implicit_figures -V colorlinks=true -V linkcolor=blue -V urlcolor=red -o async.pdf + +slideshow: + pandoc -t beamer async.md -f markdown-implicit_figures -V colorlinks=true -V linkcolor=blue -V urlcolor=red -i -o async-slideshow.pdf + +view: + zathura --mode=presentation async.pdf & diff --git a/futures-async-await/async-slideshow.pdf b/futures-async-await/async-slideshow.pdf new file mode 100644 index 0000000..0445ad9 Binary files /dev/null and b/futures-async-await/async-slideshow.pdf differ diff --git a/futures-async-await/async.md b/futures-async-await/async.md new file mode 100644 index 0000000..50a47d9 --- /dev/null +++ b/futures-async-await/async.md @@ -0,0 +1,474 @@ +--- +title: +- Getting comfy with async await +author: +- Sanchayan Maity +theme: +- default +classoption: +- aspectratio=169 +--- + +# Who + +- Who am I? + * Embedded Systems background + * Prefer C, Haskell and Rust + * Organize and speak at Rust and Haskell meet-ups in Bangalore +- Work? + * Software Engineer @ [asymptotic](https://asymptotic.io/) + * Open source consulting firm based out of Bangalore and Toronto + * Work on low level systems software centred around multimedia + * GStreamer, PipeWire, PulseAudio + * Language Polyglots + +# Agenda + +- `Future` trait +- `async`/`await` +- Using futures/Runtime +- Working with multiple futures (`select`, `join`, `FuturesOrdered`) +- Streams +- Pitfalls +- `Pin`/`Unpin`/`pin_project` + +# Future[^1] + +```rust +use std::future::Future; +use std::pin::Pin; +use std::task::Context; + +pub trait Future { + type Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll; +} + +pub enum Poll { + Ready(T), + Pending, +} +``` + +[^1]: [Associated types](https://doc.rust-lang.org/reference/items/associated-items.html) + +# Example + +```rust +async fn hello() { + println!("Hello from async"); +} + +fn main() { + hello(); + println!("Hello from main"); +} +``` + +# Where's the future + +```rust +async fn give_number() -> u32 { + 100 +} +``` + +# Sugar town[^2] + +```rust +fn give_number() -> impl Future { + GiveNumberFuture +} + +struct GiveNumberFuture {} + +impl Future for GiveNumberFuture { + type Output = u32; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll { + Poll::Ready(100) + } +} +``` + +[^2]: [Syntactic sugar for Future](https://ruspiro.github.io/ruspiro-async-book/02-03-async.html) + + +# Runtimes + +![*_Run Futures_*](future-meme.jpg){width=80%} + + +# Runtimes[^3] + +- `futures::executor` +- `tokio` +- `smol-rs` +- `embassy` +- `glommio` +- `async-std` + +[^3]: [The state of Async Rust: Runtimes](https://corrode.dev/blog/async/) + + +# Example + +```rust +use futures::executor::block_on; + +async fn hello() { + println!("hello, world!"); +} + +fn main() { + block_on(hello()); + println!("Hello from main"); +} +``` + +# Example + +```rust +async fn hello() { + println!("Hello from async"); +} + +#[tokio::main] +async fn main() { + hello().await; + println!("Hello from main"); +} +``` + +# Multiple futures + +- `join` +- `join_all` +- `select` +- `select!` +- `select_all` +- `FuturesOrdered` +- `FuturesUnordered` +- `JoinSet` + +# `join` + +```rust +use futures::future; + +#[tokio::main] +async fn main() { + let a = async { "Future 1" }; + let b = async { "Future 2" }; + let pair = future::join(a, b); + + println!("{:?}", pair.await); +} +``` + +# `join_all` + +```rust +use futures::future::join_all; +async fn hello(msg: String) -> String { + msg +} + +#[tokio::main] +async fn main() { + let futures = vec![ + hello("Future 1".to_string()), + hello("Future 2".to_string()), + hello("Future 3".to_string()), + hello("Future 4".to_string()), + ]; + + println!("{:?}", join_all(futures).await); +} +``` + +# `JoinSet` + +```rust +use tokio::task::JoinSet; + +#[tokio::main] +async fn main() { + let mut set = JoinSet::new(); + for i in 0..10 { + set.spawn(async move { i }); + } + + while let Some(res) = set.join_next().await { + println!("{}", res.unwrap()); + } +} +``` + +# `future::select` + +```rust +pub fn select(future1: A, future2: B) -> Select +where + A: Future + Unpin, + B: Future + Unpin, +``` + + +# `future::select` + + +```rust +use futures::{future, future::Either, future::FutureExt, select}; +use tokio::time::{sleep, Duration}; + +async fn task1(delay: u64) -> u64 { + sleep(Duration::from_millis(delay)).await; + delay +} + +async fn task2(delay: u64) -> String { + sleep(Duration::from_millis(delay)).await; + "Hello".to_string() +} +``` + +# `future::select` + +```rust +#[tokio::main] +async fn main() { + let t1 = task1(200u64).fuse(); + let t2 = task2(300u64).fuse(); + + tokio::pin!(t1, t2); + + match future::select(t1, t2).await { + Either::Left((value1, _)) => println!("{}", value1), + Either::Right((value2, _)) => println!("{}", value2), + }; +} +``` + + +# `futures::select!`[^4] + +```rust +use futures::{future::FutureExt, pin_mut, select}; +use tokio::time::{sleep, Duration}; +async fn task(delay: u64) { + sleep(Duration::from_millis(delay)).await; +} + +#[tokio::main] +async fn main() { + let t1 = task(300u64).fuse(); + let t2 = task(200u64).fuse(); + pin_mut!(t1, t2); + select! { + () = t1 => println!("task one completed first"), + () = t2 => println!("task two completed first"), + } +} +``` + +[^4]: [futures::select!](https://docs.rs/futures/latest/futures/macro.select.html) + + +# `tokio::select!`[^5] + +```rust +use tokio::time::{sleep, Duration}; +async fn task(delay: u64) { + sleep(Duration::from_millis(delay)).await; +} + +#[tokio::main] +async fn main() { + let t1 = task(300u64); + let t2 = task(200u64); + tokio::pin!(t1, t2); + tokio::select! { + () = t1 => println!("task one completed first"), + () = t2 => println!("task two completed first"), + } +} +``` + +[^5]: [tokio::select!](https://docs.rs/tokio/latest/tokio/macro.select.html) + +# `loop tokio::select!` + +```rust +#[tokio::main] +async fn main() { + let mut count = 0; + let t1 = task(300u64); + let t2 = task(200u64); + tokio::pin!(t1, t2); + loop { + if count > 5 { + break; + } + tokio::select! { + () = &mut t1 => println!("task one completed first"), + () = &mut t2 => println!("task two completed first"), + } + count += 1; + } +} +``` + + +# `loop futures::select!` + +```rust +#[tokio::main] +async fn main() { + let mut count = 0; + let t1 = task(300u64).fuse(); + let t2 = task(200u64).fuse(); + tokio::pin!(t1, t2); + loop { + if count > 5 { + break; + } + futures::select! { + () = &mut t1 => println!("task one completed first"), + () = &mut t2 => println!("task two completed first"), + } + count += 1; + } +} +``` + +# `Stream`[^6] + +```rust +pub trait Stream { + type Item; + + // Required method + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_> + ) -> Poll>; +} +``` + +[^6]: [Guided tour of Streams](https://www.qovery.com/blog/a-guided-tour-of-streams-in-rust/) + + +# `async-stream` + +```rust +fn zero_to_three() -> impl Stream { + stream! { + for i in 0..3 { + yield i; + } + } +} + +#[tokio::main] +async fn main() { + let s = zero_to_three(); + pin_mut!(s); // needed for iteration + + while let Some(value) = s.next().await { + println!("got {}", value); + } +} +``` + + +# `futures::select!` vs `tokio::select!` + +- [futures::select!](https://docs.rs/futures/latest/futures/macro.select.html) +- [tokio::select!](https://docs.rs/tokio/latest/tokio/macro.select.html) +- [SO - What's the difference between futures::select and tokio::select?](https://stackoverflow.com/questions/60811657/what-is-the-difference-between-futuresselect-and-tokioselect) +- [Provide `select!` macro](https://stackoverflow.com/questions/60811657/what-is-the-difference-between-futuresselect-and-tokioselect) + +# Multiple futures + +- [FuturesUnordered](https://docs.rs/futures/latest/futures/stream/struct.FuturesUnordered.html) +- [FuturesOrdered](https://docs.rs/futures/latest/futures/stream/struct.FuturesOrdered.html) +- Must read + - [FuturesUnordered and the order of futures](https://without.boats/blog/futures-unordered/) + +# Cancellation + +- [futures::future::Abortable](https://docs.rs/futures/latest/futures/future/struct.Abortable.html) + +# Pitfalls + +- Blocking in `async` + - [Async: What's blocking](https://ryhl.io/blog/async-what-is-blocking/) + - TLDR: Async code should never spend a long time without reaching an `.await` +- Cancellation safety +- Holding a `Mutex` across an `await` +- Must read + - [Async cancellation: a case study of pub-sub in mini-redis](https://smallcultfollowing.com/babysteps/blog/2022/06/13/async-cancellation-a-case-study-of-pub-sub-in-mini-redis/) + - [Yoshua Wuyts - Async Cancellation](https://blog.yoshuawuyts.com/async-cancellation-1/) + - [Common mistakes with Rust Async](https://www.qovery.com/blog/common-mistakes-with-rust-async/) + - [Rust tokio task cancellation patterns](https://cybernetist.com/2024/04/19/rust-tokio-task-cancellation-patterns/) + - [`for await` and the battle of buffered streams](https://tmandry.gitlab.io/blog/posts/for-await-buffered-streams/) + - [Mutex without lock, Queue without push: cancel safety in lilos](https://cliffle.com/blog/lilos-cancel-safety/) + + +# Cancellation safety with `select!` + +So the TLDR + +- futures in `select!` other than the future that yields `Poll::Ready` get dropped +- futures which own some form of state aren't cancellation safe, since the + owned state gets dropped when another future returns `Poll::Ready` + + +# Pinning + +```rust +use std::pin::Pin; +use pin_project::pin_project; + +#[pin_project] +struct Struct { + #[pin] + pinned: T, + unpinned: U, +} + +impl Struct { + fn method(self: Pin<&mut Self>) { + let this = self.project(); + let _: Pin<&mut T> = this.pinned; // Pinned reference to the field + let _: &mut U = this.unpinned; // Normal reference to the field + } +} +``` + +# Pinning + +- Must read + - [std::pin](https://doc.rust-lang.org/std/pin/index.html#projections-and-structural-pinning) + - [pin_project](https://docs.rs/pin-project/latest/pin_project/) + - [Pin and suffering](https://fasterthanli.me/articles/pin-and-suffering) + - [Pin, Unpin, and why Rust needs them](https://blog.cloudflare.com/pin-and-unpin-in-rust/) + +# More references + +- [Meetup code samples](https://git.sanchayanmaity.net/sanchayanmaity/async-await-rust-meetup-examples) +- [Tokio tutorial](https://tokio.rs/tokio/tutorial) +- [Tokio internals](https://cafbit.com/post/tokio_internals/) +- [How Rust optimizes async/await - I](https://tmandry.gitlab.io/blog/posts/optimizing-await-1/) +- [How Rust optimizes async/await - II](https://tmandry.gitlab.io/blog/posts/optimizing-await-2/) + +# Questions + +- Reach out on + * Email: me@sanchayanmaity.net + * Mastodon: [sanchayanmaity.com](https://sanchayanmaity.com/@sanchayan) + * Telegram: https://t.me/SanchayanMaity + * Blog: [sanchayanmaity.net](https://sanchayanmaity.net/) diff --git a/futures-async-await/async.pdf b/futures-async-await/async.pdf new file mode 100644 index 0000000..113c7ec Binary files /dev/null and b/futures-async-await/async.pdf differ diff --git a/futures-async-await/future-meme.jpg b/futures-async-await/future-meme.jpg new file mode 100644 index 0000000..6093329 Binary files /dev/null and b/futures-async-await/future-meme.jpg differ