From b559f54766d3d0922b9f25551d53b6aa9b56c350 Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <56turtle56@gmail.com> Date: Mon, 30 Oct 2023 21:48:33 +0100 Subject: [PATCH] update readme + update async api add functions to the messagestreams to access (low level) zmq function --- README.md | 10 ++++++-- src/sequence_message.rs | 4 +-- src/subscribe/stream.rs | 55 ++++++++++++++++++++++++++++++----------- 3 files changed, 50 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 1e4d4f6..b8f7c05 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,12 @@ fn main() { For more examples, have a look in the [examples directory](examples). +### Features + +- Minimal dependencies: the 2 crates `bitcoin` and `zmq`, optionally 2 additional crates are needed for the async subscriber, `async_zmq` and `futures-util`. +- Handles all message types from Bitcoin Core: `hashblock`, `hashtx`, `block`, `tx` and `sequence`. +- Flexible: choose between blocking functions with a callback, reading from a [Receiver](https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html) or reading from an asynchronous [Stream](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html) without locking to a specific async runtime. + ### Testing Tests run on every push and pull request. @@ -35,7 +41,7 @@ TODO: - This README - Message test - SequenceMessage itest -- Easy addEventListener like functionality with help of the `getzmqnotifications` rpc (bitcoincore-rpc PR: #295) +- Easy addEventListener like functionality with help of the `getzmqnotifications` rpc (bitcoincore-rpc PR: [#295](https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/295)) - raw messages - zmq publisher -- async I/O ([pr](https://github.com/antonilol/rust-bitcoincore-zmq/pull/4)) +- include source in message diff --git a/src/sequence_message.rs b/src/sequence_message.rs index 8e9d01f..a052a6f 100644 --- a/src/sequence_message.rs +++ b/src/sequence_message.rs @@ -57,8 +57,8 @@ impl SequenceMessage { /// adds or removes a transaction to the mempool. /// /// Note that transactions that got removed from the mempool because they were included in a - /// block increment Bitcoin Core's mempool sequence, they do not produce a [`MempoolRemoval`] - /// message. + /// block increment Bitcoin Core's mempool sequence, but they do not produce a + /// [`MempoolRemoval`] message. /// /// [`MempoolAcceptance`]: SequenceMessage::MempoolAcceptance /// [`MempoolRemoval`]: SequenceMessage::MempoolRemoval diff --git a/src/subscribe/stream.rs b/src/subscribe/stream.rs index 3c0d134..220ce12 100644 --- a/src/subscribe/stream.rs +++ b/src/subscribe/stream.rs @@ -5,7 +5,7 @@ use core::{ pin::Pin, task::{Context as AsyncContext, Poll}, }; -use futures_util::stream::{Fuse, FusedStream}; +use futures_util::stream::FusedStream; use zmq::Context as ZmqContext; /// Stream that asynchronously produces [`Message`]s using a ZMQ subscriber. @@ -21,6 +21,15 @@ impl MessageStream { data_cache: vec![0; DATA_MAX_LEN].into_boxed_slice().try_into().unwrap(), } } + + /// Returns a reference to the ZMQ socket used by this stream. To get the [`zmq::Socket`], use + /// [`as_raw_socket`] on the result. This is useful to set socket options or use other + /// functions provided by [`zmq`] or [`async_zmq`]. + /// + /// [`as_raw_socket`]: Subscribe::as_raw_socket + pub fn as_zmq_socket(&self) -> &Subscribe { + &self.zmq_stream + } } impl Stream for MessageStream { @@ -36,9 +45,16 @@ impl Stream for MessageStream { } } -/// Stream that asynchronously produces [`Message`]s using multiple ZMQ subscriber. +impl FusedStream for MessageStream { + fn is_terminated(&self) -> bool { + false + } +} + +/// Stream that asynchronously produces [`Message`]s using multiple ZMQ subscribers. The ZMQ +/// sockets are polled in a round-robin fashion. pub struct MultiMessageStream { - streams: Vec>, + streams: Vec, next: usize, } @@ -51,8 +67,21 @@ impl MultiMessageStream { } fn push(&mut self, stream: Subscribe) { - // Not sure if fuse is needed, but has to prevent use of closed streams. - self.streams.push(MessageStream::new(stream).fuse()); + self.streams.push(MessageStream::new(stream)); + } + + /// Returns a reference to the separate [`MessageStream`]s this [`MultiMessageStream`] is made + /// of. This is useful to set socket options or use other functions provided by [`zmq`] or + /// [`async_zmq`]. (See [`MessageStream::as_zmq_socket`]) + pub fn as_streams(&self) -> &[MessageStream] { + &self.streams + } + + /// Returns the separate [`MessageStream`]s this [`MultiMessageStream`] is made of. This is + /// useful to set socket options or use other functions provided by [`zmq`] or [`async_zmq`]. + /// (See [`MessageStream::as_zmq_socket`]) + pub fn into_streams(self) -> Vec { + self.streams } } @@ -60,8 +89,6 @@ impl Stream for MultiMessageStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut AsyncContext<'_>) -> Poll> { - let mut any_pending = false; - let mut index_iter = (self.next..self.streams.len()).chain(0..self.next); while let Some(i) = index_iter.next() { match self.as_mut().streams[i].poll_next_unpin(cx) { @@ -72,28 +99,25 @@ impl Stream for MultiMessageStream { return msg; } Poll::Ready(None) => { - // continue + // should never be returned by async_zmq } Poll::Pending => { - any_pending = true; + // continue, poll others and eventually return Poll::Pending } } } - if any_pending { - Poll::Pending - } else { - Poll::Ready(None) - } + Poll::Pending } } impl FusedStream for MultiMessageStream { fn is_terminated(&self) -> bool { - self.streams.iter().all(|stream| stream.is_terminated()) + false } } +/// Subscribes to multiple ZMQ endpoints and returns a [`MultiMessageStream`]. pub fn subscribe_multi_async(endpoints: &[&str]) -> Result { let context = ZmqContext::new(); let mut res = MultiMessageStream::new(endpoints.len()); @@ -106,6 +130,7 @@ pub fn subscribe_multi_async(endpoints: &[&str]) -> Result { Ok(res) } +/// Subscribes to a single ZMQ endpoint and returns a [`MessageStream`]. pub fn subscribe_single_async(endpoint: &str) -> Result { Ok(MessageStream::new( new_socket_internal(&ZmqContext::new(), endpoint)?.into(),