diff --git a/examples/subscribe_async.rs b/examples/subscribe_async.rs index 0cda7e7..02880be 100644 --- a/examples/subscribe_async.rs +++ b/examples/subscribe_async.rs @@ -1,9 +1,9 @@ -use bitcoincore_zmq::subscribe_single_async; +use bitcoincore_zmq::subscribe_async; use futures::executor::block_on; use futures_util::StreamExt; fn main() { - let mut stream = subscribe_single_async("tcp://127.0.0.1:28332").unwrap(); + let mut stream = subscribe_async(&["tcp://127.0.0.1:28332"]).unwrap(); // This is a small example to demonstrate subscribe_single_async, it is okay here to use // block_on, but not in production environments as this defeats the purpose of async. diff --git a/examples/subscribe_blocking.rs b/examples/subscribe_blocking.rs index 6fe2adc..8363df4 100644 --- a/examples/subscribe_blocking.rs +++ b/examples/subscribe_blocking.rs @@ -1,4 +1,4 @@ -use bitcoincore_zmq::subscribe_single_blocking; +use bitcoincore_zmq::subscribe_blocking; use core::ops::ControlFlow; fn main() { @@ -14,7 +14,7 @@ fn main() { ControlFlow::Continue(()) }; - match subscribe_single_blocking("tcp://127.0.0.1:28359", callback) { + match subscribe_blocking(&["tcp://127.0.0.1:28359"], callback) { Ok(ControlFlow::Break(err)) => { // Callback exited by returning ControlFlow::Break println!("Error receiving message: {err}"); diff --git a/examples/subscribe_receiver.rs b/examples/subscribe_receiver.rs index e06cc92..dc47ee9 100644 --- a/examples/subscribe_receiver.rs +++ b/examples/subscribe_receiver.rs @@ -1,7 +1,7 @@ -use bitcoincore_zmq::subscribe_multi; +use bitcoincore_zmq::subscribe_receiver; fn main() { - let rx = subscribe_multi(&["tcp://127.0.0.1:28332", "tcp://127.0.0.1:28333"]).unwrap(); + let rx = subscribe_receiver(&["tcp://127.0.0.1:28332", "tcp://127.0.0.1:28333"]).unwrap(); for msg in rx { match msg { diff --git a/examples/subscribe_receiver_pool.rs b/examples/subscribe_receiver_pool.rs index f519510..04d1650 100644 --- a/examples/subscribe_receiver_pool.rs +++ b/examples/subscribe_receiver_pool.rs @@ -1,4 +1,4 @@ -use bitcoincore_zmq::subscribe_multi; +use bitcoincore_zmq::subscribe_receiver; use std::{ sync::{Arc, Mutex}, thread, @@ -11,7 +11,7 @@ fn main() { let mut threads = Vec::new(); let rx = Arc::new(Mutex::new( - subscribe_multi(&["tcp://127.0.0.1:28332", "tcp://127.0.0.1:28333"]).unwrap(), + subscribe_receiver(&["tcp://127.0.0.1:28332", "tcp://127.0.0.1:28333"]).unwrap(), )); for id in 0..POOL_THREADS { diff --git a/integration_tests/src/main.rs b/integration_tests/src/main.rs index 42acc13..4c5e370 100644 --- a/integration_tests/src/main.rs +++ b/integration_tests/src/main.rs @@ -2,7 +2,7 @@ mod endpoints; mod util; use bitcoincore_rpc::Client; -use bitcoincore_zmq::{subscribe_multi, subscribe_multi_async, subscribe_single_blocking, Message}; +use bitcoincore_zmq::{subscribe_async, subscribe_blocking, subscribe_receiver, Message}; use core::{assert_eq, ops::ControlFlow}; use futures::{executor::block_on, StreamExt}; use std::{sync::mpsc, thread}; @@ -29,7 +29,7 @@ fn main() { } fn test_hashblock(rpc: &Client) { - let receiver = subscribe_multi(&[endpoints::HASHBLOCK, endpoints::RAWBLOCK]) + let receiver = subscribe_receiver(&[endpoints::HASHBLOCK, endpoints::RAWBLOCK]) .expect("failed to subscribe to Bitcoin Core's ZMQ publisher"); let rpc_hash = generate(rpc, 1).expect("rpc call failed").0[0]; @@ -47,7 +47,7 @@ fn test_hashblock(rpc: &Client) { } fn test_hashtx(rpc: &Client) { - let receiver = subscribe_multi(&[endpoints::HASHTX, endpoints::RAWTX]) + let receiver = subscribe_receiver(&[endpoints::HASHTX, endpoints::RAWTX]) .expect("failed to subscribe to Bitcoin Core's ZMQ publisher"); generate(rpc, 1).expect("rpc call failed"); @@ -69,7 +69,7 @@ fn test_sub_blocking(rpc: &Client) { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - subscribe_single_blocking(endpoints::HASHBLOCK, |msg| { + subscribe_blocking(&[endpoints::HASHBLOCK], |msg| { let msg = msg.expect("zmq message error"); match msg { @@ -99,7 +99,7 @@ fn test_sub_blocking(rpc: &Client) { } fn test_hashblock_async(rpc: &Client) { - let mut stream = subscribe_multi_async(&[endpoints::HASHBLOCK, endpoints::RAWBLOCK]) + let mut stream = subscribe_async(&[endpoints::HASHBLOCK, endpoints::RAWBLOCK]) .expect("failed to subscribe to Bitcoin Core's ZMQ subscriber"); let rpc_hash = generate(rpc, 1).expect("rpc call failed").0[0]; diff --git a/src/lib.rs b/src/lib.rs index b162e6a..972a9d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,20 @@ pub use crate::{ error::Error, message::{Message, DATA_MAX_LEN, SEQUENCE_LEN, TOPIC_MAX_LEN}, sequence_message::SequenceMessage, - subscribe::{ - blocking::{subscribe_multi_blocking, subscribe_single_blocking}, - receiver::{subscribe_multi, subscribe_single}, - }, + subscribe::{blocking::subscribe_blocking, receiver::subscribe_receiver}, }; #[cfg(feature = "async")] +pub use crate::subscribe::stream::{subscribe_async, MessageStream}; + +#[allow(deprecated)] +pub use crate::subscribe::{ + blocking::{subscribe_multi_blocking, subscribe_single_blocking}, + receiver::{subscribe_multi, subscribe_single}, +}; + +#[cfg(feature = "async")] +#[allow(deprecated)] pub use crate::subscribe::stream::{ - subscribe_multi_async, subscribe_single_async, MessageStream, MultiMessageStream, + subscribe_multi_async, subscribe_single_async, MultiMessageStream, }; diff --git a/src/subscribe/blocking.rs b/src/subscribe/blocking.rs index 7baa45e..c150d73 100644 --- a/src/subscribe/blocking.rs +++ b/src/subscribe/blocking.rs @@ -1,20 +1,14 @@ use super::{new_socket_internal, subscribe_internal}; use crate::{error::Result, message::Message}; use core::{convert::Infallible, ops::ControlFlow}; -use std::{sync::mpsc::channel, thread}; -use zmq::Context; - -fn break_on_err(is_err: bool) -> ControlFlow<()> { - if is_err { - ControlFlow::Break(()) - } else { - ControlFlow::Continue(()) - } -} /// Subscribes to a single ZMQ endpoint and blocks the thread until [`ControlFlow::Break`] is /// returned by the callback. #[inline] +#[deprecated( + since = "1.3.2", + note = "Use subscribe_blocking. This function has no performance benefit over subscribe_multi_blocking anymore." +)] pub fn subscribe_single_blocking( endpoint: &str, callback: F, @@ -22,16 +16,16 @@ pub fn subscribe_single_blocking( where F: Fn(Result) -> ControlFlow, { - let context = Context::new(); - - let socket = new_socket_internal(&context, endpoint)?; - - Ok(subscribe_internal(socket, callback)) + subscribe_blocking(&[endpoint], callback) } /// Subscribes to multiple ZMQ endpoints and blocks the thread until [`ControlFlow::Break`] is /// returned by the callback. #[inline] +#[deprecated( + since = "1.3.2", + note = "Use subscribe_blocking. The name changed because there is no distinction made anymore between subscribing to 1 or more endpoints." +)] pub fn subscribe_multi_blocking( endpoints: &[&str], callback: F, @@ -39,25 +33,20 @@ pub fn subscribe_multi_blocking( where F: Fn(Result) -> ControlFlow, { - let (tx, rx) = channel(); - let context = Context::new(); - - for endpoint in endpoints { - let tx = tx.clone(); - - let socket = new_socket_internal(&context, endpoint)?; - - thread::spawn(move || { - subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err())) - }); - } + subscribe_blocking(endpoints, callback) +} - Ok((|| { - for msg in rx { - callback(msg)?; - } +/// Subscribes to multiple ZMQ endpoints and blocks the thread until [`ControlFlow::Break`] is +/// returned by the callback. +#[inline] +pub fn subscribe_blocking( + endpoints: &[&str], + callback: F, +) -> Result> +where + F: Fn(Result) -> ControlFlow, +{ + let (_context, socket) = new_socket_internal(endpoints)?; - // `tx` is dropped at the end of this function - unreachable!(); - })()) + Ok(subscribe_internal(socket, callback)) } diff --git a/src/subscribe/mod.rs b/src/subscribe/mod.rs index 8d821b5..68dd41a 100644 --- a/src/subscribe/mod.rs +++ b/src/subscribe/mod.rs @@ -11,12 +11,17 @@ use crate::{ use core::{cmp::min, convert::Infallible, ops::ControlFlow, slice}; use zmq::{Context, Socket}; -pub(super) fn new_socket_internal(context: &Context, endpoint: &str) -> Result { +pub(super) fn new_socket_internal(endpoints: &[&str]) -> Result<(Context, Socket)> { + let context = Context::new(); + let socket = context.socket(zmq::SUB)?; - socket.connect(endpoint)?; socket.set_subscribe(b"")?; - Ok(socket) + for endpoint in endpoints { + socket.connect(endpoint)?; + } + + Ok((context, socket)) } pub(super) trait ReceiveFrom { diff --git a/src/subscribe/receiver.rs b/src/subscribe/receiver.rs index aba028b..6ddcfe8 100644 --- a/src/subscribe/receiver.rs +++ b/src/subscribe/receiver.rs @@ -5,7 +5,6 @@ use std::{ sync::mpsc::{channel, Receiver}, thread, }; -use zmq::Context; fn break_on_err(is_err: bool) -> ControlFlow<()> { if is_err { @@ -17,32 +16,32 @@ fn break_on_err(is_err: bool) -> ControlFlow<()> { /// Subscribes to a single ZMQ endpoint and returns a [`Receiver`]. #[inline] +#[deprecated( + since = "1.3.2", + note = "Use subscribe_receiver. This function has no performance benefit over subscribe_multi anymore." +)] pub fn subscribe_single(endpoint: &str) -> Result>> { - let (tx, rx) = channel(); - let context = Context::new(); - - let socket = new_socket_internal(&context, endpoint)?; - - thread::spawn(move || subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err()))); - - Ok(rx) + subscribe_receiver(&[endpoint]) } /// Subscribes to multiple ZMQ endpoints and returns a [`Receiver`]. #[inline] +#[deprecated( + since = "1.3.2", + note = "Use subscribe_receiver. The name changed because there is no distinction made anymore between subscribing to 1 or more endpoints." +)] pub fn subscribe_multi(endpoints: &[&str]) -> Result>> { - let (tx, rx) = channel(); - let context = Context::new(); + subscribe_receiver(endpoints) +} - for endpoint in endpoints { - let tx = tx.clone(); +/// Subscribes to multiple ZMQ endpoints and returns a [`Receiver`]. +#[inline] +pub fn subscribe_receiver(endpoints: &[&str]) -> Result>> { + let (tx, rx) = channel(); - let socket = new_socket_internal(&context, endpoint)?; + let (_context, socket) = new_socket_internal(endpoints)?; - thread::spawn(move || { - subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err())) - }); - } + thread::spawn(move || subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err()))); Ok(rx) } diff --git a/src/subscribe/stream.rs b/src/subscribe/stream.rs index 1be4e68..3157df5 100644 --- a/src/subscribe/stream.rs +++ b/src/subscribe/stream.rs @@ -3,10 +3,10 @@ use crate::{error::Result, message::Message, DATA_MAX_LEN}; use async_zmq::{Stream, StreamExt, Subscribe}; use core::{ pin::Pin, + slice, task::{Context as AsyncContext, Poll}, }; use futures_util::stream::FusedStream; -use zmq::Context as ZmqContext; /// Stream that asynchronously produces [`Message`]s using a ZMQ subscriber. pub struct MessageStream { @@ -53,64 +53,39 @@ impl FusedStream for MessageStream { /// Stream that asynchronously produces [`Message`]s using multiple ZMQ subscribers. The ZMQ /// sockets are polled in a round-robin fashion. -pub struct MultiMessageStream { - streams: Vec, - next: usize, -} +#[deprecated( + since = "1.3.2", + note = "This struct is only used by deprecated functions." +)] +pub struct MultiMessageStream(pub MessageStream); +#[allow(deprecated)] impl MultiMessageStream { - fn new(buf_capacity: usize) -> Self { - Self { - streams: Vec::with_capacity(buf_capacity), - next: 0, - } - } - - fn push(&mut self, stream: Subscribe) { - self.streams.push(MessageStream::new(stream)); - } - /// Returns a reference to the separate [`MessageStream`]s this [`MultiMessageStream`] is made /// of. This is useful to set socket options or use other functions provided by [`zmq`] or /// [`async_zmq`]. (See [`MessageStream::as_zmq_socket`]) pub fn as_streams(&self) -> &[MessageStream] { - &self.streams + slice::from_ref(&self.0) } /// Returns the separate [`MessageStream`]s this [`MultiMessageStream`] is made of. This is /// useful to set socket options or use other functions provided by [`zmq`] or [`async_zmq`]. /// (See [`MessageStream::as_zmq_socket`]) pub fn into_streams(self) -> Vec { - self.streams + vec![self.0] } } +#[allow(deprecated)] impl Stream for MultiMessageStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut AsyncContext<'_>) -> Poll> { - let mut index_iter = (self.next..self.streams.len()).chain(0..self.next); - while let Some(i) = index_iter.next() { - match self.streams[i].poll_next_unpin(cx) { - msg @ Poll::Ready(Some(_)) => { - if let Some(next) = index_iter.next() { - self.next = next; - } - return msg; - } - Poll::Ready(None) => { - // should never be returned by async_zmq - } - Poll::Pending => { - // continue, poll others and eventually return Poll::Pending - } - } - } - - Poll::Pending + self.0.poll_next_unpin(cx) } } +#[allow(deprecated)] impl FusedStream for MultiMessageStream { fn is_terminated(&self) -> bool { false @@ -118,21 +93,27 @@ impl FusedStream for MultiMessageStream { } /// Subscribes to multiple ZMQ endpoints and returns a [`MultiMessageStream`]. +#[deprecated( + since = "1.3.2", + note = "Use subscribe_async. This function has no performance benefit over subscribe_single_async anymore." +)] +#[allow(deprecated)] pub fn subscribe_multi_async(endpoints: &[&str]) -> Result { - let context = ZmqContext::new(); - let mut res = MultiMessageStream::new(endpoints.len()); - - for endpoint in endpoints { - let socket = new_socket_internal(&context, endpoint)?.into(); - res.push(socket); - } - - Ok(res) + subscribe_async(endpoints).map(MultiMessageStream) } /// Subscribes to a single ZMQ endpoint and returns a [`MessageStream`]. +#[deprecated( + since = "1.3.2", + note = "Use subscribe_async. The name changed because there is no distinction made anymore between subscribing to 1 or more endpoints." +)] pub fn subscribe_single_async(endpoint: &str) -> Result { - Ok(MessageStream::new( - new_socket_internal(&ZmqContext::new(), endpoint)?.into(), - )) + subscribe_async(&[endpoint]) +} + +/// Subscribes to multiple ZMQ endpoints and returns a [`MessageStream`]. +pub fn subscribe_async(endpoints: &[&str]) -> Result { + let (_context, socket) = new_socket_internal(endpoints)?; + + Ok(MessageStream::new(socket.into())) }