Examples for async await talk at Rust meetup

This commit is contained in:
Sanchayan Maity 2024-09-15 19:39:20 +05:30
commit daa6d55fb1
4 changed files with 555 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target

349
Cargo.lock generated Normal file
View file

@ -0,0 +1,349 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375"
dependencies = [
"gimli",
]
[[package]]
name = "adler2"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "async-example"
version = "0.1.0"
dependencies = [
"async-stream",
"futures",
"tokio",
]
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
name = "backtrace"
version = "0.3.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
dependencies = [
"addr2line",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
"windows-targets",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-util"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "gimli"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64"
[[package]]
name = "libc"
version = "0.2.158"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
[[package]]
name = "memchr"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miniz_oxide"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
dependencies = [
"adler2",
]
[[package]]
name = "object"
version = "0.36.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a"
dependencies = [
"memchr",
]
[[package]]
name = "pin-project-lite"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]]
name = "syn"
version = "2.0.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "tokio"
version = "1.40.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
dependencies = [
"backtrace",
"pin-project-lite",
"tokio-macros",
]
[[package]]
name = "tokio-macros"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "unicode-ident"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"

10
Cargo.toml Normal file
View file

@ -0,0 +1,10 @@
[package]
name = "async-example"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.36.0", default-features = false, features = ["time", "rt-multi-thread", "macros"] }
futures = "0.3.30"
async-stream = "0.3.5"

195
src/main.rs Normal file
View file

@ -0,0 +1,195 @@
#![allow(unused_imports)]
#![allow(dead_code)]
use async_stream::stream;
use futures::{
future::BoxFuture,
future::FutureExt,
select,
stream::{FuturesOrdered, FuturesUnordered, Stream},
StreamExt,
};
use std::future::Future;
use std::pin::Pin;
use tokio::task::{JoinError, JoinHandle};
use tokio::time::{sleep, Duration};
async fn sleep_then_print(timer: i32) {
println!("Start timer {}.", timer);
std::thread::sleep(Duration::from_secs(timer as u64));
// tokio::time::sleep(Duration::from_secs(timer as u64)).await;
// tokio::time::sleep(Duration::from_secs(1)).await;
println!("Timer {} done.", timer);
}
async fn blocking() {
tokio::join!(
sleep_then_print(100),
sleep_then_print(2),
sleep_then_print(3),
);
// tokio::join!(
// tokio::spawn(sleep_then_print(1)),
// tokio::spawn(sleep_then_print(2)),
// tokio::spawn(sleep_then_print(3)),
// );
}
async fn task(delay: u64, val: u64) -> u64 {
sleep(Duration::from_millis(delay)).await;
val
}
fn zero_to_three() -> impl Stream<Item = u64> {
stream! {
for i in 0..3 {
yield i;
}
}
}
async fn stream() {
let t = zero_to_three();
tokio::pin!(t);
while let Some(value) = t.next().await {
println!("got {}", value);
}
}
async fn tokio_select_main() {
let mut count = 0;
let t1 = task(300u64, 1);
let t2 = task(200u64, 2);
let t3 = zero_to_three();
tokio::pin!(t1, t2, t3);
loop {
if count > 5 {
break;
}
tokio::select! {
t1_val = &mut t1 => println!("task one completed first {}", t1_val),
t2_val = &mut t2 => println!("task two completed first {}", t2_val),
Some(val) = &mut t3.next() => println!("{:?}", val),
else => break,
};
count += 1;
}
}
async fn futures_select_main() {
let mut count = 0;
let t1 = task(300u64, 1).fuse();
let t2 = task(200u64, 2).fuse();
let t3 = zero_to_three().fuse();
tokio::pin!(t1, t2, t3);
loop {
if count > 5 {
break;
}
futures::select! {
t1_val = &mut t1 => println!("task one completed first {}", t1_val),
t2_val = &mut t2 => println!("task two completed first {}", t2_val),
val = &mut t3.next() => println!("{:?}", val),
complete => break,
};
count += 1;
}
}
async fn tokio_select_main_cond() {
let mut count = 0;
let t1 = task(300u64, 1);
let t2 = task(200u64, 2);
let t3 = zero_to_three();
tokio::pin!(t1, t2, t3);
let mut done1 = false;
let mut done2 = false;
loop {
if count > 5 {
break;
}
tokio::select! {
t1_val = &mut t1, if !done1 => {
done1 = true;
println!("task one completed first {}", t1_val);
}
t2_val = &mut t2, if !done2 => {
done2 = true;
println!("task two completed first {}", t2_val);
}
Some(val) = &mut t3.next() => println!("{:?}", val),
else => break,
};
count += 1;
}
}
async fn unordered() {
let mut tasks = FuturesUnordered::<Pin<Box<dyn Future<Output = u64>>>>::new();
tasks.push(Box::pin(task(1000, 1)));
tasks.push(Box::pin(task(1000, 2)));
tasks.push(Box::pin(task(1000, 3)));
while let Some(result) = tasks.next().await {
println!("Task result {result}");
}
}
async fn ordered() {
let mut tasks = FuturesOrdered::<Pin<Box<dyn Future<Output = u64>>>>::new();
tasks.push_back(Box::pin(task(1000, 1)));
tasks.push_back(Box::pin(task(1000, 2)));
tasks.push_back(Box::pin(task(1000, 3)));
while let Some(result) = tasks.next().await {
println!("Task result {result}");
}
}
async fn unordered_handle() {
// let mut tasks =
// FuturesUnordered::<Pin<Box<dyn Future<Output = Result<u64, JoinError>>>>>::new();
// let mut tasks = FuturesUnordered::<BoxFuture<Result<u64, JoinError>>>::new();
let mut tasks = FuturesUnordered::<JoinHandle<u64>>::new();
tasks.push(tokio::spawn(task(1000, 1)));
tasks.push(tokio::spawn(task(1000, 2)));
tasks.push(tokio::spawn(task(1000, 3)));
while let Some(result) = tasks.next().await {
println!("Task result {result:?}");
}
}
async fn hello() {
println!("Hello from async");
}
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
println!("Hello Rust Bangalore!!");
// hello();
// stream().await;
// tokio_select_main().await;
// futures_main().await;
// tokio_select_main_cond().await;
// ordered().await;
// unordered().await;
// unordered_handle().await;
// blocking().await;
}