diff --git a/src/subscribe/stream.rs b/src/subscribe/stream.rs index 548c1df..c3ac622 100644 --- a/src/subscribe/stream.rs +++ b/src/subscribe/stream.rs @@ -5,7 +5,7 @@ use core::{ pin::Pin, task::{Context as AsyncContext, Poll}, }; -use futures_util::stream::Fuse; +use futures_util::stream::{Fuse, FusedStream}; use zmq::Context as ZmqContext; /// Stream that asynchronously produces [`Message`]s using a ZMQ subscriber. @@ -88,6 +88,12 @@ impl Stream for MultiMessageStream { } } +impl FusedStream for MultiMessageStream { + fn is_terminated(&self) -> bool { + self.streams.iter().all(|stream| stream.is_terminated()) + } +} + pub fn subscribe_multi_async(endpoints: &[&str]) -> Result { let context = ZmqContext::new(); let mut res = MultiMessageStream::new(endpoints.len());