Compare commits

...

2 commits

Author SHA1 Message Date
de707f1c07 Test stream close with pad release 2024-06-24 19:00:03 +05:30
9db4c7eeb2 Use compositor to composite the streams
Instead of using four video sinks, use a compositor to composite the
incoming video streams.
2024-06-24 11:46:08 +05:30

View file

@ -63,32 +63,30 @@ fn video_bin(text: String) -> gst::Bin {
bin
}
fn depay_bin() -> gst::Bin {
fn depay_bin(bin_name: String) -> gst::Bin {
let queue = gst::ElementFactory::make("queue").build().unwrap();
let capsf = gst::ElementFactory::make("capsfilter").build().unwrap();
let parse = gst::ElementFactory::make("h264parse").build().unwrap();
let dec = gst::ElementFactory::make("avdec_h264").build().unwrap();
let convert = gst::ElementFactory::make("videoconvert").build().unwrap();
let sink = gst::ElementFactory::make("autovideosink").build().unwrap();
sink.set_property("sync", false);
parse.set_property("config-interval", -1);
let caps = gst::Caps::from_str(
"video/x-h264,width=640,height=480,format=I420,framerate=25/1,stream-format=byte-stream",
)
.unwrap();
capsf.set_property("caps", caps);
dec.set_property("discard-corrupted-frames", true);
let bin = gst::Bin::new();
let bin = gst::Bin::builder().name(bin_name).build();
bin.add_many([&queue, &capsf, &parse, &dec, &convert, &sink])
bin.add_many([&queue, &capsf, &parse, &dec, &convert])
.unwrap();
queue.link(&capsf).unwrap();
capsf.link(&parse).unwrap();
parse.link(&dec).unwrap();
dec.link(&convert).unwrap();
convert.link(&sink).unwrap();
let queue_sinkpad = queue.static_pad("sink").unwrap();
let sinkpad = gst::GhostPad::builder(gst::PadDirection::Sink)
@ -96,7 +94,14 @@ fn depay_bin() -> gst::Bin {
.build();
sinkpad.set_target(Some(&queue_sinkpad)).unwrap();
let convert_srcpad = convert.static_pad("src").unwrap();
let srcpad = gst::GhostPad::builder(gst::PadDirection::Src)
.name("src")
.build();
srcpad.set_target(Some(&convert_srcpad)).unwrap();
bin.add_pad(&sinkpad).unwrap();
bin.add_pad(&srcpad).unwrap();
bin
}
@ -104,6 +109,14 @@ fn depay_bin() -> gst::Bin {
fn receive_pipeline(pipeline: &gst::Pipeline) {
let quicsrc = gst::ElementFactory::make("quinnquicsrc").build().unwrap();
let demux = gst::ElementFactory::make("quinnquicdemux").build().unwrap();
let compositor = gst::ElementFactory::make("compositor")
.name("compositor")
.build()
.unwrap();
let sink = gst::ElementFactory::make("autovideosink")
.name("video-sink")
.build()
.unwrap();
quicsrc.set_property("initial-mtu", 1200u32);
quicsrc.set_property("min-mtu", 1200u32);
@ -113,9 +126,12 @@ fn receive_pipeline(pipeline: &gst::Pipeline) {
quicsrc.set_property("secure-connection", false);
quicsrc.set_property("server-name", "sanchayanmaity.net");
pipeline.add_many([&quicsrc, &demux]).unwrap();
pipeline
.add_many([&quicsrc, &demux, &compositor, &sink])
.unwrap();
quicsrc.link(&demux).unwrap();
compositor.link(&sink).unwrap();
let pipeline_weak = pipeline.downgrade();
demux.connect("pad-added", false, move |args| {
@ -129,15 +145,40 @@ fn receive_pipeline(pipeline: &gst::Pipeline) {
.expect("Second argument to demux pad-added must be pad");
gst::info!(CAT, "QUIC demuxer pad {} added", pad.name());
let bin = depay_bin();
let bin = depay_bin(pad.name().to_string());
pipeline.add(&bin).unwrap();
bin.sync_state_with_parent().unwrap();
let sinkpad = bin.static_pad("sink").unwrap();
let srcpad = bin.static_pad("src").unwrap();
let compositor = pipeline.by_name("compositor").unwrap();
let compositor_sink_pad = compositor.request_pad_simple("sink_%u").unwrap();
match pad.name().as_str() {
"stream_uni_0" => {
compositor_sink_pad.set_property("xpos", 0);
compositor_sink_pad.set_property("ypos", 0);
}
"stream_uni_1" => {
compositor_sink_pad.set_property("xpos", 640);
compositor_sink_pad.set_property("ypos", 0);
}
"stream_uni_2" => {
compositor_sink_pad.set_property("xpos", 0);
compositor_sink_pad.set_property("ypos", 480);
}
"datagram" => {
compositor_sink_pad.set_property("xpos", 640);
compositor_sink_pad.set_property("ypos", 480);
}
_ => (),
}
pad.link(&sinkpad).unwrap();
srcpad.link(&compositor_sink_pad).unwrap();
let weak_pipeline = pipeline.downgrade();
// Capture pipeline graph 5 secs later to correctly capture STATE changes.
@ -152,6 +193,55 @@ fn receive_pipeline(pipeline: &gst::Pipeline) {
None
});
let pipeline_weak = pipeline.downgrade();
demux.connect("pad-removed", false, move |args| {
let pipeline = match pipeline_weak.upgrade() {
Some(self_) => self_,
None => return None,
};
let pad = args[1]
.get::<gst::Pad>()
.expect("Second argument to demux pad-removed must be pad");
let pad_name = pad.name();
gst::info!(CAT, "QUIC demuxer pad {} removed", pad_name);
if let Some(bin) = pipeline.by_name(&pad_name) {
let bin_srcpad = bin.static_pad("src").unwrap();
if let Some(peer) = bin_srcpad.peer() {
bin_srcpad.unlink(&peer).unwrap();
gst::info!(CAT, "Unlinked {}", peer.name());
let compositor = pipeline.by_name("compositor").unwrap();
compositor.release_request_pad(&peer);
gst::info!(CAT, "Released pad {} from compositor", peer.name());
}
pipeline.remove(&bin).unwrap();
let pipeline_weak = pipeline.downgrade();
glib::idle_add_once(move || {
let _ = match pipeline_weak.upgrade() {
Some(self_) => self_,
None => return,
};
let _ = bin.set_state(gst::State::Null);
});
pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!("Removed bin downstream to {pad_name}"),
);
}
None
});
}
fn send_pipeline(pipeline: &gst::Pipeline) {
@ -270,6 +360,66 @@ fn main() {
video3.sync_state_with_parent().unwrap();
pipeline.debug_to_dot_file_with_ts(gst::DebugGraphDetails::all(), "setup-quic-mux");
// Test releasing stream/request pad from muxer
let weak_pipeline = pipeline.downgrade();
glib::timeout_add_seconds_once(15, move || {
let pipeline = match weak_pipeline.upgrade() {
Some(pipeline) => pipeline,
None => return,
};
gst::info!(CAT, "Adding probe to remove Stream 3....");
video3_pad.add_probe(gst::PadProbeType::IDLE, move |pad, _probe_info| {
if let Some(peer) = pad.peer() {
gst::info!(CAT, "Removing Stream 3....");
pad.unlink(&peer).unwrap();
if let Some(parent) = peer
.parent()
.and_then(|p| p.downcast::<gst::Element>().ok())
{
gst::log!(
CAT,
"Releasing request pad {} from parent {}",
peer.name(),
parent.name()
);
parent.release_request_pad(&peer);
gst::log!(
CAT,
"Released request pad {} from parent {}",
peer.name(),
parent.name()
);
}
if let Some(parent) =
pad.parent().and_then(|p| p.downcast::<gst::Element>().ok())
{
pipeline.remove(&parent).unwrap();
let weak_pipeline = pipeline.downgrade();
glib::idle_add_once(move || {
let _ = match weak_pipeline.upgrade() {
Some(self_) => self_,
None => return,
};
let _ = parent.set_state(gst::State::Null);
});
}
}
gst::info!(CAT, "Removed Stream 3");
pipeline
.debug_to_dot_file_with_ts(gst::DebugGraphDetails::all(), "removed-Stream-3");
gst::PadProbeReturn::Drop
});
});
}
let weak_pipeline = pipeline.downgrade();