From f068857b0b9595d20114b45a3bec661829593673 Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <56turtle56@gmail.com> Date: Sat, 21 Oct 2023 23:36:55 +0200 Subject: [PATCH 1/7] initial async support --- Cargo.toml | 5 ++ integration_tests/Cargo.toml | 3 +- integration_tests/src/main.rs | 32 +++++++++- src/error.rs | 24 +++++++ src/lib.rs | 7 +++ src/subscribe.rs | 54 +++++++++++++--- src/subscribe_async.rs | 114 ++++++++++++++++++++++++++++++++++ 7 files changed, 227 insertions(+), 12 deletions(-) create mode 100644 src/subscribe_async.rs diff --git a/Cargo.toml b/Cargo.toml index 3e953ae..ff3678f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,11 @@ repository = "https://github.com/antonilol/rust-bitcoincore-zmq" keywords = ["bitcoin", "bitcoin-core", "zmq"] categories = ["cryptography::cryptocurrencies", "network-programming"] +[features] +async = ["dep:async_zmq", "dep:futures-util"] + [dependencies] +async_zmq = { version = "0.4.0", optional = true } bitcoin = "0.30.0" +futures-util = { version = "0.3.28", optional = true } zmq = "0.10.0" diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 0dbddf5..c965ee3 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -6,4 +6,5 @@ edition = "2021" [dependencies] bitcoin = "0.30.0" bitcoincore-rpc = "0.17.0" -bitcoincore-zmq = { path = ".." } +bitcoincore-zmq = { path = "..", features = ["async"] } +futures = "0.3.28" diff --git a/integration_tests/src/main.rs b/integration_tests/src/main.rs index 91d70ae..66ba046 100644 --- a/integration_tests/src/main.rs +++ b/integration_tests/src/main.rs @@ -2,8 +2,9 @@ mod endpoints; mod util; use bitcoincore_rpc::Client; -use bitcoincore_zmq::{subscribe_multi, subscribe_single_blocking, Message}; +use bitcoincore_zmq::{subscribe_multi, subscribe_multi_async, subscribe_single_blocking, Message}; use core::{assert_eq, ops::ControlFlow}; +use futures::StreamExt; use std::{sync::mpsc, thread}; use util::{generate, recv_timeout_2, setup_rpc, sleep, RECV_TIMEOUT}; @@ -23,6 +24,7 @@ fn main() { test_hashblock, test_hashtx, test_sub_blocking, + test_hashblock_async, } } @@ -93,3 +95,31 @@ fn test_sub_blocking(rpc: &Client) { assert_eq!(rpc_hash, zmq_hash); } + +fn test_hashblock_async(rpc: &Client) { + let mut stream = subscribe_multi_async(&[endpoints::HASHBLOCK, endpoints::RAWBLOCK]) + .expect("failed to subscribe to Bitcoin Core's ZMQ subscriber"); + + let rpc_hash = generate(rpc, 1).expect("rpc call failed").0[0]; + + let (tx, rx) = mpsc::channel(); + + thread::spawn(move || { + futures::executor::block_on(async { + while let Some(msg) = stream.next().await { + tx.send(msg).unwrap(); + } + }) + }); + + match recv_timeout_2(&rx) { + (Message::Block(block, _), Message::HashBlock(blockhash, _)) + | (Message::HashBlock(blockhash, _), Message::Block(block, _)) => { + assert_eq!(rpc_hash, block.block_hash()); + assert_eq!(rpc_hash, blockhash); + } + (msg1, msg2) => { + panic!("invalid messages received: ({msg1}, {msg2})"); + } + } +} diff --git a/src/error.rs b/src/error.rs index ff32d89..55e4718 100644 --- a/src/error.rs +++ b/src/error.rs @@ -24,6 +24,30 @@ impl From for Error { } } +#[cfg(feature = "async")] +impl From for Error { + #[inline] + fn from(value: async_zmq::SocketError) -> Self { + Self::Zmq(value.into()) + } +} + +#[cfg(feature = "async")] +impl From for Error { + #[inline] + fn from(value: async_zmq::SubscribeError) -> Self { + Self::Zmq(value.into()) + } +} + +#[cfg(feature = "async")] +impl From for Error { + #[inline] + fn from(value: async_zmq::RecvError) -> Self { + Self::Zmq(value.into()) + } +} + impl From for Error { #[inline] fn from(value: consensus::encode::Error) -> Self { diff --git a/src/lib.rs b/src/lib.rs index 01e46cd..08ca3bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,8 @@ mod error; mod message; mod sequence_message; mod subscribe; +#[cfg(feature = "async")] +mod subscribe_async; pub use crate::{ error::Error, @@ -11,3 +13,8 @@ pub use crate::{ subscribe_multi, subscribe_multi_blocking, subscribe_single, subscribe_single_blocking, }, }; + +#[cfg(feature = "async")] +pub use crate::subscribe_async::{ + subscribe_async, subscribe_multi_async, MessageStream, MultiMessageStream, +}; diff --git a/src/subscribe.rs b/src/subscribe.rs index 2e295a4..35e23a7 100644 --- a/src/subscribe.rs +++ b/src/subscribe.rs @@ -3,7 +3,7 @@ use crate::{ message::{Message, SEQUENCE_LEN, TOPIC_MAX_LEN}, Error, DATA_MAX_LEN, }; -use core::{convert::Infallible, ops::ControlFlow}; +use core::{cmp::min, convert::Infallible, ops::ControlFlow, slice}; use std::{ sync::mpsc::{channel, Receiver}, thread, @@ -109,46 +109,80 @@ fn new_socket_internal(context: &Context, endpoint: &str) -> Result { Ok(socket) } +pub(crate) trait ReceiveFrom { + fn has_next(&self) -> Result; + + fn receive_into(&mut self, buf: &mut [u8]) -> Result; +} + +impl ReceiveFrom for &Socket { + fn has_next(&self) -> Result { + Ok(self.get_rcvmore()?) + } + + fn receive_into(&mut self, buf: &mut [u8]) -> Result { + Ok(self.recv_into(buf, 0)?) + } +} + +impl ReceiveFrom for slice::Iter<'_, zmq::Message> { + fn has_next(&self) -> Result { + Ok(!self.as_slice().is_empty()) + } + + fn receive_into(&mut self, buf: &mut [u8]) -> Result { + // TODO better way to handle None than unwrap + let bytes = &**self.next().unwrap(); + let len = bytes.len(); + let copy_len = min(len, buf.len()); + buf[0..copy_len].copy_from_slice(&bytes[0..copy_len]); + Ok(len) + } +} + #[inline] -fn recv_internal(socket: &Socket, data: &mut [u8; DATA_MAX_LEN]) -> Result { +pub(crate) fn recv_internal( + mut socket: R, + data: &mut [u8; DATA_MAX_LEN], +) -> Result { let mut topic = [0u8; TOPIC_MAX_LEN]; let mut sequence = [0u8; SEQUENCE_LEN]; - let topic_len = socket.recv_into(&mut topic, 0)?; + let topic_len = socket.receive_into(&mut topic)?; if topic_len > TOPIC_MAX_LEN { return Err(Error::InvalidTopic(topic_len, topic)); } - if !socket.get_rcvmore()? { + if !socket.has_next()? { return Err(Error::InvalidMutlipartLength(1)); } - let data_len = socket.recv_into(data, 0)?; + let data_len = socket.receive_into(data)?; if data_len > DATA_MAX_LEN { return Err(Error::InvalidDataLength(data_len)); } - if !socket.get_rcvmore()? { + if !socket.has_next()? { return Err(Error::InvalidMutlipartLength(2)); } - let sequence_len = socket.recv_into(&mut sequence, 0)?; + let sequence_len = socket.receive_into(&mut sequence)?; if sequence_len != SEQUENCE_LEN { return Err(Error::InvalidSequenceLength(sequence_len)); } - if !socket.get_rcvmore()? { + if !socket.has_next()? { return Message::from_parts(&topic[0..topic_len], &data[0..data_len], sequence); } let mut len = 3; loop { - socket.recv_into(&mut [], 0)?; + socket.receive_into(&mut [])?; len += 1; - if !socket.get_rcvmore()? { + if !socket.has_next()? { return Err(Error::InvalidMutlipartLength(len)); } } diff --git a/src/subscribe_async.rs b/src/subscribe_async.rs new file mode 100644 index 0000000..6708556 --- /dev/null +++ b/src/subscribe_async.rs @@ -0,0 +1,114 @@ +use crate::{error::Result, message::Message, subscribe::recv_internal, DATA_MAX_LEN}; +use async_zmq::{subscribe, Stream, StreamExt, Subscribe}; +use core::{ + pin::Pin, + task::{Context as AsyncContext, Poll}, +}; +use futures_util::stream::Fuse; +use zmq::Context as ZmqContext; + +/// Stream that asynchronously produces [`Message`]s using a ZMQ subscriber. +pub struct MessageStream { + zmq_stream: Subscribe, + data_cache: Box<[u8; DATA_MAX_LEN]>, +} + +impl MessageStream { + fn new(zmq_stream: Subscribe) -> Self { + Self { + zmq_stream, + data_cache: vec![0; DATA_MAX_LEN].into_boxed_slice().try_into().unwrap(), + } + } +} + +impl Stream for MessageStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut AsyncContext<'_>) -> Poll> { + self.as_mut().zmq_stream.poll_next_unpin(cx).map(|opt| { + opt.map(|res| match res { + Ok(mp) => recv_internal(mp.iter(), &mut self.data_cache), + Err(err) => Err(err.into()), + }) + }) + } +} + +/// Stream that asynchronously produces [`Message`]s using multiple ZMQ subscriber. +pub struct MultiMessageStream { + streams: Vec>, + next: usize, +} + +impl MultiMessageStream { + fn new(buf_capacity: usize) -> Self { + Self { + streams: Vec::with_capacity(buf_capacity), + next: 0, + } + } + + fn push(&mut self, stream: Subscribe) { + // Not sure if fuse is needed, but has to prevent use of closed streams. + self.streams.push(MessageStream::new(stream).fuse()); + } +} + +impl Stream for MultiMessageStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut AsyncContext<'_>) -> Poll> { + let mut any_pending = false; + + let mut index_iter = (self.next..self.streams.len()).chain(0..self.next); + while let Some(i) = index_iter.next() { + match self.as_mut().streams[i].poll_next_unpin(cx) { + msg @ Poll::Ready(Some(_)) => { + if let Some(next) = index_iter.next() { + self.next = next; + } + return msg; + } + Poll::Ready(None) => { + // continue + } + Poll::Pending => { + any_pending = true; + } + } + } + + if any_pending { + Poll::Pending + } else { + Poll::Ready(None) + } + } +} + +pub fn subscribe_multi_async(endpoints: &[&str]) -> Result { + let context = ZmqContext::new(); + let mut res = MultiMessageStream::new(endpoints.len()); + + for endpoint in endpoints { + let socket = new_socket_internal(&context, endpoint)?; + res.push(socket); + } + + Ok(res) +} + +pub fn subscribe_async(endpoint: &str) -> Result { + Ok(MessageStream::new(new_socket_internal( + &ZmqContext::new(), + endpoint, + )?)) +} + +fn new_socket_internal(context: &ZmqContext, endpoint: &str) -> Result { + let socket = subscribe(endpoint)?.with_context(context).connect()?; + socket.set_subscribe("")?; + + Ok(socket) +} From 34c78e17e9099b2b1b712da665aab730766972c1 Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <56turtle56@gmail.com> Date: Sun, 22 Oct 2023 14:26:24 +0200 Subject: [PATCH 2/7] split up subscriber logic in 3 different types --- src/lib.rs | 7 +- src/subscribe.rs | 204 ------------------ src/subscribe/blocking.rs | 63 ++++++ src/subscribe/mod.rs | 112 ++++++++++ src/subscribe/receiver.rs | 48 +++++ .../stream.rs} | 21 +- 6 files changed, 233 insertions(+), 222 deletions(-) delete mode 100644 src/subscribe.rs create mode 100644 src/subscribe/blocking.rs create mode 100644 src/subscribe/mod.rs create mode 100644 src/subscribe/receiver.rs rename src/{subscribe_async.rs => subscribe/stream.rs} (84%) diff --git a/src/lib.rs b/src/lib.rs index 08ca3bc..d53427c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,19 +2,18 @@ mod error; mod message; mod sequence_message; mod subscribe; -#[cfg(feature = "async")] -mod subscribe_async; pub use crate::{ error::Error, message::{Message, DATA_MAX_LEN, SEQUENCE_LEN, TOPIC_MAX_LEN}, sequence_message::SequenceMessage, subscribe::{ - subscribe_multi, subscribe_multi_blocking, subscribe_single, subscribe_single_blocking, + blocking::{subscribe_multi_blocking, subscribe_single_blocking}, + receiver::{subscribe_multi, subscribe_single}, }, }; #[cfg(feature = "async")] -pub use crate::subscribe_async::{ +pub use crate::subscribe::stream::{ subscribe_async, subscribe_multi_async, MessageStream, MultiMessageStream, }; diff --git a/src/subscribe.rs b/src/subscribe.rs deleted file mode 100644 index 35e23a7..0000000 --- a/src/subscribe.rs +++ /dev/null @@ -1,204 +0,0 @@ -use crate::{ - error::Result, - message::{Message, SEQUENCE_LEN, TOPIC_MAX_LEN}, - Error, DATA_MAX_LEN, -}; -use core::{cmp::min, convert::Infallible, ops::ControlFlow, slice}; -use std::{ - sync::mpsc::{channel, Receiver}, - thread, -}; -use zmq::{Context, Socket}; - -fn break_on_err(is_err: bool) -> ControlFlow<()> { - if is_err { - ControlFlow::Break(()) - } else { - ControlFlow::Continue(()) - } -} - -/// Subscribes to a single ZMQ endpoint and returns a [`Receiver`]. -#[inline] -pub fn subscribe_single(endpoint: &str) -> Result>> { - let (tx, rx) = channel(); - let context = Context::new(); - - let socket = new_socket_internal(&context, endpoint)?; - - thread::spawn(move || subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err()))); - - Ok(rx) -} - -/// Subscribes to multiple ZMQ endpoints and returns a [`Receiver`]. -#[inline] -pub fn subscribe_multi(endpoints: &[&str]) -> Result>> { - let (tx, rx) = channel(); - let context = Context::new(); - - for endpoint in endpoints { - let tx = tx.clone(); - - let socket = new_socket_internal(&context, endpoint)?; - - thread::spawn(move || { - subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err())) - }); - } - - Ok(rx) -} - -/// Subscribes to a single ZMQ endpoint and blocks the thread until [`ControlFlow::Break`] is -/// returned by the callback. -#[inline] -pub fn subscribe_single_blocking( - endpoint: &str, - callback: F, -) -> Result> -where - F: Fn(Result) -> ControlFlow, -{ - let context = Context::new(); - - let socket = new_socket_internal(&context, endpoint)?; - - Ok(subscribe_internal(socket, callback)) -} - -/// Subscribes to multiple ZMQ endpoints and blocks the thread until [`ControlFlow::Break`] is -/// returned by the callback. -#[inline] -pub fn subscribe_multi_blocking( - endpoints: &[&str], - callback: F, -) -> Result> -where - F: Fn(Result) -> ControlFlow, -{ - let (tx, rx) = channel(); - let context = Context::new(); - - for endpoint in endpoints { - let tx = tx.clone(); - - let socket = new_socket_internal(&context, endpoint)?; - - thread::spawn(move || { - subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err())) - }); - } - - Ok((|| { - for msg in rx { - callback(msg)?; - } - - // `tx` is dropped at the end of this function - unreachable!(); - })()) -} - -#[inline] -fn new_socket_internal(context: &Context, endpoint: &str) -> Result { - let socket = context.socket(zmq::SUB)?; - socket.connect(endpoint)?; - socket.set_subscribe(b"")?; - - Ok(socket) -} - -pub(crate) trait ReceiveFrom { - fn has_next(&self) -> Result; - - fn receive_into(&mut self, buf: &mut [u8]) -> Result; -} - -impl ReceiveFrom for &Socket { - fn has_next(&self) -> Result { - Ok(self.get_rcvmore()?) - } - - fn receive_into(&mut self, buf: &mut [u8]) -> Result { - Ok(self.recv_into(buf, 0)?) - } -} - -impl ReceiveFrom for slice::Iter<'_, zmq::Message> { - fn has_next(&self) -> Result { - Ok(!self.as_slice().is_empty()) - } - - fn receive_into(&mut self, buf: &mut [u8]) -> Result { - // TODO better way to handle None than unwrap - let bytes = &**self.next().unwrap(); - let len = bytes.len(); - let copy_len = min(len, buf.len()); - buf[0..copy_len].copy_from_slice(&bytes[0..copy_len]); - Ok(len) - } -} - -#[inline] -pub(crate) fn recv_internal( - mut socket: R, - data: &mut [u8; DATA_MAX_LEN], -) -> Result { - let mut topic = [0u8; TOPIC_MAX_LEN]; - let mut sequence = [0u8; SEQUENCE_LEN]; - - let topic_len = socket.receive_into(&mut topic)?; - if topic_len > TOPIC_MAX_LEN { - return Err(Error::InvalidTopic(topic_len, topic)); - } - - if !socket.has_next()? { - return Err(Error::InvalidMutlipartLength(1)); - } - - let data_len = socket.receive_into(data)?; - if data_len > DATA_MAX_LEN { - return Err(Error::InvalidDataLength(data_len)); - } - - if !socket.has_next()? { - return Err(Error::InvalidMutlipartLength(2)); - } - - let sequence_len = socket.receive_into(&mut sequence)?; - if sequence_len != SEQUENCE_LEN { - return Err(Error::InvalidSequenceLength(sequence_len)); - } - - if !socket.has_next()? { - return Message::from_parts(&topic[0..topic_len], &data[0..data_len], sequence); - } - - let mut len = 3; - - loop { - socket.receive_into(&mut [])?; - - len += 1; - - if !socket.has_next()? { - return Err(Error::InvalidMutlipartLength(len)); - } - } -} - -#[inline] -fn subscribe_internal(socket: Socket, callback: F) -> ControlFlow -where - F: Fn(Result) -> ControlFlow, -{ - let mut data: Box<[u8; DATA_MAX_LEN]> = - vec![0; DATA_MAX_LEN].into_boxed_slice().try_into().unwrap(); - - loop { - let msg = recv_internal(&socket, &mut data); - - callback(msg)?; - } -} diff --git a/src/subscribe/blocking.rs b/src/subscribe/blocking.rs new file mode 100644 index 0000000..7baa45e --- /dev/null +++ b/src/subscribe/blocking.rs @@ -0,0 +1,63 @@ +use super::{new_socket_internal, subscribe_internal}; +use crate::{error::Result, message::Message}; +use core::{convert::Infallible, ops::ControlFlow}; +use std::{sync::mpsc::channel, thread}; +use zmq::Context; + +fn break_on_err(is_err: bool) -> ControlFlow<()> { + if is_err { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + } +} + +/// Subscribes to a single ZMQ endpoint and blocks the thread until [`ControlFlow::Break`] is +/// returned by the callback. +#[inline] +pub fn subscribe_single_blocking( + endpoint: &str, + callback: F, +) -> Result> +where + F: Fn(Result) -> ControlFlow, +{ + let context = Context::new(); + + let socket = new_socket_internal(&context, endpoint)?; + + Ok(subscribe_internal(socket, callback)) +} + +/// Subscribes to multiple ZMQ endpoints and blocks the thread until [`ControlFlow::Break`] is +/// returned by the callback. +#[inline] +pub fn subscribe_multi_blocking( + endpoints: &[&str], + callback: F, +) -> Result> +where + F: Fn(Result) -> ControlFlow, +{ + let (tx, rx) = channel(); + let context = Context::new(); + + for endpoint in endpoints { + let tx = tx.clone(); + + let socket = new_socket_internal(&context, endpoint)?; + + thread::spawn(move || { + subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err())) + }); + } + + Ok((|| { + for msg in rx { + callback(msg)?; + } + + // `tx` is dropped at the end of this function + unreachable!(); + })()) +} diff --git a/src/subscribe/mod.rs b/src/subscribe/mod.rs new file mode 100644 index 0000000..8d821b5 --- /dev/null +++ b/src/subscribe/mod.rs @@ -0,0 +1,112 @@ +pub mod blocking; +pub mod receiver; +#[cfg(feature = "async")] +pub mod stream; + +use crate::{ + error::Result, + message::{Message, SEQUENCE_LEN, TOPIC_MAX_LEN}, + Error, DATA_MAX_LEN, +}; +use core::{cmp::min, convert::Infallible, ops::ControlFlow, slice}; +use zmq::{Context, Socket}; + +pub(super) fn new_socket_internal(context: &Context, endpoint: &str) -> Result { + let socket = context.socket(zmq::SUB)?; + socket.connect(endpoint)?; + socket.set_subscribe(b"")?; + + Ok(socket) +} + +pub(super) trait ReceiveFrom { + fn has_next(&self) -> Result; + + fn receive_into(&mut self, buf: &mut [u8]) -> Result; +} + +impl ReceiveFrom for &Socket { + fn has_next(&self) -> Result { + Ok(self.get_rcvmore()?) + } + + fn receive_into(&mut self, buf: &mut [u8]) -> Result { + Ok(self.recv_into(buf, 0)?) + } +} + +impl ReceiveFrom for slice::Iter<'_, zmq::Message> { + fn has_next(&self) -> Result { + Ok(!self.as_slice().is_empty()) + } + + fn receive_into(&mut self, buf: &mut [u8]) -> Result { + // TODO better way to handle None than unwrap + let bytes = &**self.next().unwrap(); + let len = bytes.len(); + let copy_len = min(len, buf.len()); + buf[0..copy_len].copy_from_slice(&bytes[0..copy_len]); + Ok(len) + } +} + +pub(super) fn recv_internal( + mut socket: R, + data: &mut [u8; DATA_MAX_LEN], +) -> Result { + let mut topic = [0u8; TOPIC_MAX_LEN]; + let mut sequence = [0u8; SEQUENCE_LEN]; + + let topic_len = socket.receive_into(&mut topic)?; + if topic_len > TOPIC_MAX_LEN { + return Err(Error::InvalidTopic(topic_len, topic)); + } + + if !socket.has_next()? { + return Err(Error::InvalidMutlipartLength(1)); + } + + let data_len = socket.receive_into(data)?; + if data_len > DATA_MAX_LEN { + return Err(Error::InvalidDataLength(data_len)); + } + + if !socket.has_next()? { + return Err(Error::InvalidMutlipartLength(2)); + } + + let sequence_len = socket.receive_into(&mut sequence)?; + if sequence_len != SEQUENCE_LEN { + return Err(Error::InvalidSequenceLength(sequence_len)); + } + + if !socket.has_next()? { + return Message::from_parts(&topic[0..topic_len], &data[0..data_len], sequence); + } + + let mut len = 3; + + loop { + socket.receive_into(&mut [])?; + + len += 1; + + if !socket.has_next()? { + return Err(Error::InvalidMutlipartLength(len)); + } + } +} + +pub(super) fn subscribe_internal(socket: Socket, callback: F) -> ControlFlow +where + F: Fn(Result) -> ControlFlow, +{ + let mut data: Box<[u8; DATA_MAX_LEN]> = + vec![0; DATA_MAX_LEN].into_boxed_slice().try_into().unwrap(); + + loop { + let msg = recv_internal(&socket, &mut data); + + callback(msg)?; + } +} diff --git a/src/subscribe/receiver.rs b/src/subscribe/receiver.rs new file mode 100644 index 0000000..aba028b --- /dev/null +++ b/src/subscribe/receiver.rs @@ -0,0 +1,48 @@ +use super::{new_socket_internal, subscribe_internal}; +use crate::{error::Result, message::Message}; +use core::ops::ControlFlow; +use std::{ + sync::mpsc::{channel, Receiver}, + thread, +}; +use zmq::Context; + +fn break_on_err(is_err: bool) -> ControlFlow<()> { + if is_err { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + } +} + +/// Subscribes to a single ZMQ endpoint and returns a [`Receiver`]. +#[inline] +pub fn subscribe_single(endpoint: &str) -> Result>> { + let (tx, rx) = channel(); + let context = Context::new(); + + let socket = new_socket_internal(&context, endpoint)?; + + thread::spawn(move || subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err()))); + + Ok(rx) +} + +/// Subscribes to multiple ZMQ endpoints and returns a [`Receiver`]. +#[inline] +pub fn subscribe_multi(endpoints: &[&str]) -> Result>> { + let (tx, rx) = channel(); + let context = Context::new(); + + for endpoint in endpoints { + let tx = tx.clone(); + + let socket = new_socket_internal(&context, endpoint)?; + + thread::spawn(move || { + subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err())) + }); + } + + Ok(rx) +} diff --git a/src/subscribe_async.rs b/src/subscribe/stream.rs similarity index 84% rename from src/subscribe_async.rs rename to src/subscribe/stream.rs index 6708556..548c1df 100644 --- a/src/subscribe_async.rs +++ b/src/subscribe/stream.rs @@ -1,5 +1,6 @@ -use crate::{error::Result, message::Message, subscribe::recv_internal, DATA_MAX_LEN}; -use async_zmq::{subscribe, Stream, StreamExt, Subscribe}; +use super::{new_socket_internal, recv_internal}; +use crate::{error::Result, message::Message, DATA_MAX_LEN}; +use async_zmq::{Stream, StreamExt, Subscribe}; use core::{ pin::Pin, task::{Context as AsyncContext, Poll}, @@ -92,7 +93,7 @@ pub fn subscribe_multi_async(endpoints: &[&str]) -> Result { let mut res = MultiMessageStream::new(endpoints.len()); for endpoint in endpoints { - let socket = new_socket_internal(&context, endpoint)?; + let socket = new_socket_internal(&context, endpoint)?.into(); res.push(socket); } @@ -100,15 +101,7 @@ pub fn subscribe_multi_async(endpoints: &[&str]) -> Result { } pub fn subscribe_async(endpoint: &str) -> Result { - Ok(MessageStream::new(new_socket_internal( - &ZmqContext::new(), - endpoint, - )?)) -} - -fn new_socket_internal(context: &ZmqContext, endpoint: &str) -> Result { - let socket = subscribe(endpoint)?.with_context(context).connect()?; - socket.set_subscribe("")?; - - Ok(socket) + Ok(MessageStream::new( + new_socket_internal(&ZmqContext::new(), endpoint)?.into(), + )) } From 65a1d5a923e4a125cc41a7c2c19b2dcd9b3bbe7f Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <56turtle56@gmail.com> Date: Sun, 22 Oct 2023 14:33:19 +0200 Subject: [PATCH 3/7] impl FusedStream for MultiMessageStream --- src/subscribe/stream.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/subscribe/stream.rs b/src/subscribe/stream.rs index 548c1df..c3ac622 100644 --- a/src/subscribe/stream.rs +++ b/src/subscribe/stream.rs @@ -5,7 +5,7 @@ use core::{ pin::Pin, task::{Context as AsyncContext, Poll}, }; -use futures_util::stream::Fuse; +use futures_util::stream::{Fuse, FusedStream}; use zmq::Context as ZmqContext; /// Stream that asynchronously produces [`Message`]s using a ZMQ subscriber. @@ -88,6 +88,12 @@ impl Stream for MultiMessageStream { } } +impl FusedStream for MultiMessageStream { + fn is_terminated(&self) -> bool { + self.streams.iter().all(|stream| stream.is_terminated()) + } +} + pub fn subscribe_multi_async(endpoints: &[&str]) -> Result { let context = ZmqContext::new(); let mut res = MultiMessageStream::new(endpoints.len()); From efd2a311d6e129235db74b2de9098ae1fc50f6e6 Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <56turtle56@gmail.com> Date: Sun, 22 Oct 2023 21:10:27 +0200 Subject: [PATCH 4/7] add example and rename function for consistency --- Cargo.toml | 5 +++++ examples/subscribe_async.rs | 17 +++++++++++++++++ src/lib.rs | 2 +- src/subscribe/stream.rs | 2 +- 4 files changed, 24 insertions(+), 2 deletions(-) create mode 100644 examples/subscribe_async.rs diff --git a/Cargo.toml b/Cargo.toml index ff3678f..c50fd09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,8 @@ async_zmq = { version = "0.4.0", optional = true } bitcoin = "0.30.0" futures-util = { version = "0.3.28", optional = true } zmq = "0.10.0" + +# dev dependencies can be used in examples +[dev-dependencies] +futures = "0.3.28" +futures-util = "0.3.28" diff --git a/examples/subscribe_async.rs b/examples/subscribe_async.rs new file mode 100644 index 0000000..605ceb3 --- /dev/null +++ b/examples/subscribe_async.rs @@ -0,0 +1,17 @@ +use bitcoincore_zmq::subscribe_single_async; +use futures_util::StreamExt; + +fn main() { + let mut stream = subscribe_single_async("tcp://127.0.0.1:28332").unwrap(); + + // This is a small example to demonstrate subscribe_single_async, it is okay here to use + // block_on, but not in production environments as this defeats the purpose of async. + futures::executor::block_on(async { + while let Some(msg) = stream.next().await { + match msg { + Ok(msg) => println!("Received message: {msg}"), + Err(err) => println!("Error receiving message: {err}"), + } + } + }); +} diff --git a/src/lib.rs b/src/lib.rs index d53427c..fad15bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,5 +15,5 @@ pub use crate::{ #[cfg(feature = "async")] pub use crate::subscribe::stream::{ - subscribe_async, subscribe_multi_async, MessageStream, MultiMessageStream, + subscribe_multi_async, subscribe_single_async, MessageStream, MultiMessageStream, }; diff --git a/src/subscribe/stream.rs b/src/subscribe/stream.rs index c3ac622..3c0d134 100644 --- a/src/subscribe/stream.rs +++ b/src/subscribe/stream.rs @@ -106,7 +106,7 @@ pub fn subscribe_multi_async(endpoints: &[&str]) -> Result { Ok(res) } -pub fn subscribe_async(endpoint: &str) -> Result { +pub fn subscribe_single_async(endpoint: &str) -> Result { Ok(MessageStream::new( new_socket_internal(&ZmqContext::new(), endpoint)?.into(), )) From 9e9f9bc6bf17544bd2b0e80fd34a595d43991755 Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <56turtle56@gmail.com> Date: Sun, 22 Oct 2023 21:15:09 +0200 Subject: [PATCH 5/7] fix a few imports --- examples/subscribe_async.rs | 3 ++- integration_tests/src/main.rs | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/subscribe_async.rs b/examples/subscribe_async.rs index 605ceb3..0cda7e7 100644 --- a/examples/subscribe_async.rs +++ b/examples/subscribe_async.rs @@ -1,4 +1,5 @@ use bitcoincore_zmq::subscribe_single_async; +use futures::executor::block_on; use futures_util::StreamExt; fn main() { @@ -6,7 +7,7 @@ fn main() { // This is a small example to demonstrate subscribe_single_async, it is okay here to use // block_on, but not in production environments as this defeats the purpose of async. - futures::executor::block_on(async { + block_on(async { while let Some(msg) = stream.next().await { match msg { Ok(msg) => println!("Received message: {msg}"), diff --git a/integration_tests/src/main.rs b/integration_tests/src/main.rs index 66ba046..83610aa 100644 --- a/integration_tests/src/main.rs +++ b/integration_tests/src/main.rs @@ -4,7 +4,7 @@ mod util; use bitcoincore_rpc::Client; use bitcoincore_zmq::{subscribe_multi, subscribe_multi_async, subscribe_single_blocking, Message}; use core::{assert_eq, ops::ControlFlow}; -use futures::StreamExt; +use futures::{StreamExt, executor::block_on}; use std::{sync::mpsc, thread}; use util::{generate, recv_timeout_2, setup_rpc, sleep, RECV_TIMEOUT}; @@ -105,7 +105,7 @@ fn test_hashblock_async(rpc: &Client) { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - futures::executor::block_on(async { + block_on(async { while let Some(msg) = stream.next().await { tx.send(msg).unwrap(); } From bf2269c6073a304aa1d9cb5143a00fff0a0676f8 Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <56turtle56@gmail.com> Date: Sun, 22 Oct 2023 21:24:36 +0200 Subject: [PATCH 6/7] add examples to Cargo.toml and specify required-features --- Cargo.toml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index c50fd09..2f502cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,3 +22,16 @@ zmq = "0.10.0" [dev-dependencies] futures = "0.3.28" futures-util = "0.3.28" + +[[example]] +name = "subscribe_async" +required-features = ["async"] + +[[example]] +name = "subscribe_blocking" + +[[example]] +name = "subscribe_receiver_pool" + +[[example]] +name = "subscribe_receiver" From 50a7fbd1aa8206851fc80512ee412c1822c1ce81 Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <56turtle56@gmail.com> Date: Sun, 22 Oct 2023 21:26:38 +0200 Subject: [PATCH 7/7] format integration_tests/src/main.rs --- integration_tests/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/src/main.rs b/integration_tests/src/main.rs index 83610aa..68a708e 100644 --- a/integration_tests/src/main.rs +++ b/integration_tests/src/main.rs @@ -4,7 +4,7 @@ mod util; use bitcoincore_rpc::Client; use bitcoincore_zmq::{subscribe_multi, subscribe_multi_async, subscribe_single_blocking, Message}; use core::{assert_eq, ops::ControlFlow}; -use futures::{StreamExt, executor::block_on}; +use futures::{executor::block_on, StreamExt}; use std::{sync::mpsc, thread}; use util::{generate, recv_timeout_2, setup_rpc, sleep, RECV_TIMEOUT};