Demo code to test stream multiplexing with quinnquic
This commit is contained in:
commit
09be7f749d
1140
Cargo.lock
generated
Normal file
1140
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
16
Cargo.toml
Normal file
16
Cargo.toml
Normal file
|
@ -0,0 +1,16 @@
|
|||
[package]
|
||||
name = "gst-quinn-quic-mux"
|
||||
license = "MPL-2.0"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_22"] }
|
||||
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
||||
gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
||||
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
||||
ctrlc = "3.4"
|
||||
tokio = { version = "1.17", features = ["full"] }
|
||||
once_cell = "1.19"
|
||||
clap = { version = "4.5", features = ["derive"] }
|
||||
|
297
src/main.rs
Normal file
297
src/main.rs
Normal file
|
@ -0,0 +1,297 @@
|
|||
use clap::Parser;
|
||||
use gst::glib;
|
||||
use gst::prelude::*;
|
||||
use once_cell::sync::Lazy;
|
||||
use std::str::FromStr;
|
||||
|
||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||
gst::DebugCategory::new(
|
||||
"gst-quinn-quic-mux",
|
||||
gst::DebugColorFlags::empty(),
|
||||
Some("gst-quinn-quic-mux"),
|
||||
)
|
||||
});
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(name = "gst-quinn-quic-mux")]
|
||||
#[command(version = "0.1")]
|
||||
#[command(about = "Code for testing multiplexing with quinnquic", long_about = None)]
|
||||
struct Cli {
|
||||
#[clap(long, short, action)]
|
||||
receiver: bool,
|
||||
}
|
||||
|
||||
fn video_bin(text: String) -> gst::Bin {
|
||||
let videosrc = gst::ElementFactory::make("videotestsrc").build().unwrap();
|
||||
let overlay = gst::ElementFactory::make("clockoverlay").build().unwrap();
|
||||
let convert = gst::ElementFactory::make("videoconvert").build().unwrap();
|
||||
let x264enc = gst::ElementFactory::make("x264enc").build().unwrap();
|
||||
let capsf = gst::ElementFactory::make("capsfilter").build().unwrap();
|
||||
let parse = gst::ElementFactory::make("h264parse").build().unwrap();
|
||||
|
||||
let bin = gst::Bin::builder().name(text.clone()).build();
|
||||
|
||||
parse.set_property("config-interval", -1);
|
||||
let caps = gst::Caps::from_str(
|
||||
"video/x-h264,width=640,height=480,format=I420,framerate=25/1,stream-format=byte-stream",
|
||||
)
|
||||
.unwrap();
|
||||
capsf.set_property("caps", caps);
|
||||
|
||||
overlay.set_property("text", text);
|
||||
overlay.set_property_from_str("valignment", "top");
|
||||
overlay.set_property_from_str("halignment", "left");
|
||||
overlay.set_property_from_str("font-desc", "Sans, 72");
|
||||
|
||||
bin.add_many([&videosrc, &overlay, &convert, &x264enc, &capsf, &parse])
|
||||
.expect("Failed to add elements");
|
||||
|
||||
videosrc.link(&overlay).unwrap();
|
||||
overlay.link(&convert).unwrap();
|
||||
convert.link(&x264enc).unwrap();
|
||||
x264enc.link(&capsf).unwrap();
|
||||
capsf.link(&parse).unwrap();
|
||||
|
||||
let parse_srcpad = parse.static_pad("src").unwrap();
|
||||
let srcpad = gst::GhostPad::builder(gst::PadDirection::Src)
|
||||
.name("src")
|
||||
.build();
|
||||
srcpad.set_target(Some(&parse_srcpad)).unwrap();
|
||||
|
||||
bin.add_pad(&srcpad).unwrap();
|
||||
|
||||
bin
|
||||
}
|
||||
|
||||
fn depay_bin() -> gst::Bin {
|
||||
let queue = gst::ElementFactory::make("queue").build().unwrap();
|
||||
let capsf = gst::ElementFactory::make("capsfilter").build().unwrap();
|
||||
let parse = gst::ElementFactory::make("h264parse").build().unwrap();
|
||||
let dec = gst::ElementFactory::make("avdec_h264").build().unwrap();
|
||||
let convert = gst::ElementFactory::make("videoconvert").build().unwrap();
|
||||
let sink = gst::ElementFactory::make("autovideosink").build().unwrap();
|
||||
|
||||
sink.set_property("sync", false);
|
||||
parse.set_property("config-interval", -1);
|
||||
let caps = gst::Caps::from_str(
|
||||
"video/x-h264,width=640,height=480,format=I420,framerate=25/1,stream-format=byte-stream",
|
||||
)
|
||||
.unwrap();
|
||||
capsf.set_property("caps", caps);
|
||||
|
||||
let bin = gst::Bin::new();
|
||||
|
||||
bin.add_many([&queue, &capsf, &parse, &dec, &convert, &sink])
|
||||
.unwrap();
|
||||
|
||||
queue.link(&capsf).unwrap();
|
||||
capsf.link(&parse).unwrap();
|
||||
parse.link(&dec).unwrap();
|
||||
dec.link(&convert).unwrap();
|
||||
convert.link(&sink).unwrap();
|
||||
|
||||
let queue_sinkpad = queue.static_pad("sink").unwrap();
|
||||
let sinkpad = gst::GhostPad::builder(gst::PadDirection::Sink)
|
||||
.name("sink")
|
||||
.build();
|
||||
sinkpad.set_target(Some(&queue_sinkpad)).unwrap();
|
||||
|
||||
bin.add_pad(&sinkpad).unwrap();
|
||||
|
||||
bin
|
||||
}
|
||||
|
||||
fn receive_pipeline(pipeline: &gst::Pipeline) {
|
||||
let quicsrc = gst::ElementFactory::make("quinnquicsrc").build().unwrap();
|
||||
let demux = gst::ElementFactory::make("quinnquicdemux").build().unwrap();
|
||||
|
||||
quicsrc.set_property("initial-mtu", 1200u32);
|
||||
quicsrc.set_property("min-mtu", 1200u32);
|
||||
quicsrc.set_property("upper-bound-mtu", 65527u32);
|
||||
quicsrc.set_property("max-udp-payload-size", 65527u32);
|
||||
quicsrc.set_property("use-datagram", false);
|
||||
quicsrc.set_property("secure-connection", false);
|
||||
quicsrc.set_property("server-name", "sanchayanmaity.net");
|
||||
|
||||
pipeline.add_many([&quicsrc, &demux]).unwrap();
|
||||
|
||||
quicsrc.link(&demux).unwrap();
|
||||
|
||||
let pipeline_weak = pipeline.downgrade();
|
||||
demux.connect("pad-added", false, move |args| {
|
||||
let pipeline = match pipeline_weak.upgrade() {
|
||||
Some(self_) => self_,
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let pad = args[1]
|
||||
.get::<gst::Pad>()
|
||||
.expect("Second argument to demux pad-added must be pad");
|
||||
gst::info!(CAT, "QUIC demuxer pad {} added", pad.name());
|
||||
|
||||
let bin = depay_bin();
|
||||
|
||||
pipeline.add(&bin).unwrap();
|
||||
|
||||
bin.sync_state_with_parent().unwrap();
|
||||
|
||||
let sinkpad = bin.static_pad("sink").unwrap();
|
||||
|
||||
pad.link(&sinkpad).unwrap();
|
||||
|
||||
let weak_pipeline = pipeline.downgrade();
|
||||
// Capture pipeline graph 5 secs later to correctly capture STATE changes.
|
||||
glib::timeout_add_seconds_once(10, move || {
|
||||
let pipeline = match weak_pipeline.upgrade() {
|
||||
Some(pipeline) => pipeline,
|
||||
None => return,
|
||||
};
|
||||
|
||||
pipeline.debug_to_dot_file_with_ts(gst::DebugGraphDetails::all(), "demux-pad-added");
|
||||
});
|
||||
|
||||
None
|
||||
});
|
||||
}
|
||||
|
||||
fn send_pipeline(pipeline: &gst::Pipeline) {
|
||||
let video1 = video_bin("Stream 1".to_string());
|
||||
let video2 = video_bin("Stream 2".to_string());
|
||||
let video3 = video_bin("Stream 3".to_string());
|
||||
let video4 = video_bin("Datagram".to_string());
|
||||
let mux = gst::ElementFactory::make("quinnquicmux")
|
||||
.name("quic-mux")
|
||||
.build()
|
||||
.unwrap();
|
||||
let sink = gst::ElementFactory::make("quinnquicsink")
|
||||
.name("quic-sink")
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
sink.set_property("drop-buffer-for-datagram", true);
|
||||
sink.set_property("initial-mtu", 1200u32);
|
||||
sink.set_property("min-mtu", 1200u32);
|
||||
sink.set_property("upper-bound-mtu", 65527u32);
|
||||
sink.set_property("max-udp-payload-size", 65527u32);
|
||||
sink.set_property("use-datagram", false);
|
||||
sink.set_property("secure-connection", false);
|
||||
sink.set_property("server-name", "sanchayanmaity.net");
|
||||
|
||||
pipeline
|
||||
.add_many([&video1, &video2, &video3, &video4])
|
||||
.unwrap();
|
||||
pipeline.add_many([&mux, &sink]).unwrap();
|
||||
|
||||
mux.link(&sink).unwrap();
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let cli = Cli::parse();
|
||||
|
||||
gst::init().unwrap();
|
||||
|
||||
let pipeline = gst::Pipeline::new();
|
||||
let context = glib::MainContext::default();
|
||||
let main_loop = glib::MainLoop::new(Some(&context), false);
|
||||
|
||||
let _ = pipeline.set_state(gst::State::Ready);
|
||||
|
||||
if !cli.receiver {
|
||||
send_pipeline(&pipeline);
|
||||
pipeline.debug_to_dot_file_with_ts(gst::DebugGraphDetails::all(), "quic-mux-send-pipeline");
|
||||
} else {
|
||||
receive_pipeline(&pipeline);
|
||||
pipeline.debug_to_dot_file_with_ts(gst::DebugGraphDetails::all(), "quic-mux-recv-pipeline");
|
||||
}
|
||||
|
||||
let _ = pipeline.set_state(gst::State::Playing);
|
||||
|
||||
let bus = pipeline.bus().unwrap();
|
||||
let l_clone = main_loop.clone();
|
||||
let _bus_watch = bus
|
||||
.add_watch(move |_, msg| {
|
||||
use gst::MessageView;
|
||||
match msg.view() {
|
||||
MessageView::Eos(..) => {
|
||||
println!("\nReceived End of Stream, quitting...");
|
||||
l_clone.quit();
|
||||
}
|
||||
MessageView::Error(err) => {
|
||||
gst::error!(CAT, obj: &err.src().unwrap(),
|
||||
"Error from {:?}: {} ({:?})",
|
||||
err.src().map(|s| s.path_string()),
|
||||
err.error(),
|
||||
err.debug()
|
||||
);
|
||||
l_clone.quit();
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
glib::ControlFlow::Continue
|
||||
})
|
||||
.expect("Failed to add bus watch");
|
||||
|
||||
ctrlc::set_handler({
|
||||
let pipeline_weak = pipeline.downgrade();
|
||||
move || {
|
||||
let pipeline = pipeline_weak.upgrade().unwrap();
|
||||
println!("\nReceived Ctrl-c, sending EOS...");
|
||||
pipeline.send_event(gst::event::Eos::new());
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
if !cli.receiver {
|
||||
let mux = pipeline.by_name("quic-mux").unwrap();
|
||||
|
||||
let video1 = pipeline.by_name("Stream 1").unwrap();
|
||||
let video2 = pipeline.by_name("Stream 2").unwrap();
|
||||
let video3 = pipeline.by_name("Stream 3").unwrap();
|
||||
let video4 = pipeline.by_name("Datagram").unwrap();
|
||||
|
||||
let video1_mux_pad = mux.request_pad_simple("stream_uni_%u").unwrap();
|
||||
let video2_mux_pad = mux.request_pad_simple("stream_uni_%u").unwrap();
|
||||
let video3_mux_pad = mux.request_pad_simple("stream_uni_%u").unwrap();
|
||||
let video4_mux_pad = mux.request_pad_simple("datagram").unwrap();
|
||||
|
||||
let video1_pad = video1.static_pad("src").unwrap();
|
||||
let video2_pad = video2.static_pad("src").unwrap();
|
||||
let video3_pad = video3.static_pad("src").unwrap();
|
||||
let video4_pad = video4.static_pad("src").unwrap();
|
||||
|
||||
video1_pad.link(&video1_mux_pad).unwrap();
|
||||
video2_pad.link(&video2_mux_pad).unwrap();
|
||||
video3_pad.link(&video3_mux_pad).unwrap();
|
||||
video4_pad.link(&video4_mux_pad).unwrap();
|
||||
|
||||
mux.sync_state_with_parent().unwrap();
|
||||
video1.sync_state_with_parent().unwrap();
|
||||
video2.sync_state_with_parent().unwrap();
|
||||
video3.sync_state_with_parent().unwrap();
|
||||
|
||||
pipeline.debug_to_dot_file_with_ts(gst::DebugGraphDetails::all(), "setup-quic-mux");
|
||||
}
|
||||
|
||||
let weak_pipeline = pipeline.downgrade();
|
||||
// Capture pipeline graph 5 secs later to correctly capture STATE changes.
|
||||
glib::timeout_add_seconds_once(10, move || {
|
||||
let pipeline = match weak_pipeline.upgrade() {
|
||||
Some(pipeline) => pipeline,
|
||||
None => return,
|
||||
};
|
||||
|
||||
let name = if !cli.receiver {
|
||||
"gst-quinnquicmux-pipeline"
|
||||
} else {
|
||||
"gst-quinnquicdemux-pipeline"
|
||||
};
|
||||
|
||||
pipeline.debug_to_dot_file_with_ts(gst::DebugGraphDetails::all(), name);
|
||||
});
|
||||
|
||||
let _ = pipeline.set_state(gst::State::Playing);
|
||||
|
||||
main_loop.run();
|
||||
|
||||
pipeline.set_state(gst::State::Null).unwrap();
|
||||
}
|
Loading…
Reference in a new issue