From 56fdb954ebae09cbdc2e1b0e18ec10810b084019 Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <56turtle56@gmail.com> Date: Sun, 22 Oct 2023 14:33:19 +0200 Subject: [PATCH] impl FusedStream for MultiMessageStream --- src/subscribe/stream.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/subscribe/stream.rs b/src/subscribe/stream.rs index 548c1df..c3ac622 100644 --- a/src/subscribe/stream.rs +++ b/src/subscribe/stream.rs @@ -5,7 +5,7 @@ use core::{ pin::Pin, task::{Context as AsyncContext, Poll}, }; -use futures_util::stream::Fuse; +use futures_util::stream::{Fuse, FusedStream}; use zmq::Context as ZmqContext; /// Stream that asynchronously produces [`Message`]s using a ZMQ subscriber. @@ -88,6 +88,12 @@ impl Stream for MultiMessageStream { } } +impl FusedStream for MultiMessageStream { + fn is_terminated(&self) -> bool { + self.streams.iter().all(|stream| stream.is_terminated()) + } +} + pub fn subscribe_multi_async(endpoints: &[&str]) -> Result { let context = ZmqContext::new(); let mut res = MultiMessageStream::new(endpoints.len());