From daa6d55fb1ca546cbfe922141eb0002194f85e0b Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Sun, 15 Sep 2024 19:39:20 +0530 Subject: [PATCH] Examples for async await talk at Rust meetup --- .gitignore | 1 + Cargo.lock | 349 ++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 10 ++ src/main.rs | 195 +++++++++++++++++++++++++++++ 4 files changed, 555 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..7f067e7 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,349 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "async-example" +version = "0.1.0" +dependencies = [ + "async-stream", + "futures", + "tokio", +] + +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "autocfg" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" + +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "gimli" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" + +[[package]] +name = "libc" +version = "0.2.158" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + +[[package]] +name = "object" +version = "0.36.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" +dependencies = [ + "memchr", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "syn" +version = "2.0.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +dependencies = [ + "backtrace", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..2be4053 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "async-example" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1.36.0", default-features = false, features = ["time", "rt-multi-thread", "macros"] } +futures = "0.3.30" +async-stream = "0.3.5" + diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..c9e64bf --- /dev/null +++ b/src/main.rs @@ -0,0 +1,195 @@ +#![allow(unused_imports)] +#![allow(dead_code)] + +use async_stream::stream; +use futures::{ + future::BoxFuture, + future::FutureExt, + select, + stream::{FuturesOrdered, FuturesUnordered, Stream}, + StreamExt, +}; +use std::future::Future; +use std::pin::Pin; +use tokio::task::{JoinError, JoinHandle}; +use tokio::time::{sleep, Duration}; + +async fn sleep_then_print(timer: i32) { + println!("Start timer {}.", timer); + + std::thread::sleep(Duration::from_secs(timer as u64)); + // tokio::time::sleep(Duration::from_secs(timer as u64)).await; + // tokio::time::sleep(Duration::from_secs(1)).await; + + println!("Timer {} done.", timer); +} + +async fn blocking() { + tokio::join!( + sleep_then_print(100), + sleep_then_print(2), + sleep_then_print(3), + ); + // tokio::join!( + // tokio::spawn(sleep_then_print(1)), + // tokio::spawn(sleep_then_print(2)), + // tokio::spawn(sleep_then_print(3)), + // ); +} + +async fn task(delay: u64, val: u64) -> u64 { + sleep(Duration::from_millis(delay)).await; + val +} + +fn zero_to_three() -> impl Stream { + stream! { + for i in 0..3 { + yield i; + } + } +} + +async fn stream() { + let t = zero_to_three(); + tokio::pin!(t); + + while let Some(value) = t.next().await { + println!("got {}", value); + } +} + +async fn tokio_select_main() { + let mut count = 0; + let t1 = task(300u64, 1); + let t2 = task(200u64, 2); + let t3 = zero_to_three(); + + tokio::pin!(t1, t2, t3); + + loop { + if count > 5 { + break; + } + + tokio::select! { + t1_val = &mut t1 => println!("task one completed first {}", t1_val), + t2_val = &mut t2 => println!("task two completed first {}", t2_val), + Some(val) = &mut t3.next() => println!("{:?}", val), + else => break, + }; + + count += 1; + } +} + +async fn futures_select_main() { + let mut count = 0; + let t1 = task(300u64, 1).fuse(); + let t2 = task(200u64, 2).fuse(); + let t3 = zero_to_three().fuse(); + + tokio::pin!(t1, t2, t3); + + loop { + if count > 5 { + break; + } + + futures::select! { + t1_val = &mut t1 => println!("task one completed first {}", t1_val), + t2_val = &mut t2 => println!("task two completed first {}", t2_val), + val = &mut t3.next() => println!("{:?}", val), + complete => break, + }; + + count += 1; + } +} + +async fn tokio_select_main_cond() { + let mut count = 0; + let t1 = task(300u64, 1); + let t2 = task(200u64, 2); + let t3 = zero_to_three(); + + tokio::pin!(t1, t2, t3); + + let mut done1 = false; + let mut done2 = false; + + loop { + if count > 5 { + break; + } + + tokio::select! { + t1_val = &mut t1, if !done1 => { + done1 = true; + println!("task one completed first {}", t1_val); + } + t2_val = &mut t2, if !done2 => { + done2 = true; + println!("task two completed first {}", t2_val); + } + Some(val) = &mut t3.next() => println!("{:?}", val), + else => break, + }; + + count += 1; + } +} + +async fn unordered() { + let mut tasks = FuturesUnordered::>>>::new(); + tasks.push(Box::pin(task(1000, 1))); + tasks.push(Box::pin(task(1000, 2))); + tasks.push(Box::pin(task(1000, 3))); + + while let Some(result) = tasks.next().await { + println!("Task result {result}"); + } +} + +async fn ordered() { + let mut tasks = FuturesOrdered::>>>::new(); + tasks.push_back(Box::pin(task(1000, 1))); + tasks.push_back(Box::pin(task(1000, 2))); + tasks.push_back(Box::pin(task(1000, 3))); + + while let Some(result) = tasks.next().await { + println!("Task result {result}"); + } +} + +async fn unordered_handle() { + // let mut tasks = + // FuturesUnordered::>>>>::new(); + // let mut tasks = FuturesUnordered::>>::new(); + let mut tasks = FuturesUnordered::>::new(); + tasks.push(tokio::spawn(task(1000, 1))); + tasks.push(tokio::spawn(task(1000, 2))); + tasks.push(tokio::spawn(task(1000, 3))); + + while let Some(result) = tasks.next().await { + println!("Task result {result:?}"); + } +} + +async fn hello() { + println!("Hello from async"); +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 4)] +async fn main() { + println!("Hello Rust Bangalore!!"); + // hello(); + // stream().await; + // tokio_select_main().await; + // futures_main().await; + // tokio_select_main_cond().await; + // ordered().await; + // unordered().await; + // unordered_handle().await; + // blocking().await; +}