Skip to content

Commit

Permalink
update readme + update async api
Browse files Browse the repository at this point in the history
add functions to the messagestreams to access (low level) zmq function
  • Loading branch information
antonilol committed Oct 30, 2023
1 parent ab59811 commit b559f54
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 19 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ fn main() {

For more examples, have a look in the [examples directory](examples).

### Features

- Minimal dependencies: the 2 crates `bitcoin` and `zmq`, optionally 2 additional crates are needed for the async subscriber, `async_zmq` and `futures-util`.
- Handles all message types from Bitcoin Core: `hashblock`, `hashtx`, `block`, `tx` and `sequence`.
- Flexible: choose between blocking functions with a callback, reading from a [Receiver](https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html) or reading from an asynchronous [Stream](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html) without locking to a specific async runtime.

### Testing

Tests run on every push and pull request.
Expand All @@ -35,7 +41,7 @@ TODO:
- This README
- Message test
- SequenceMessage itest
- Easy addEventListener like functionality with help of the `getzmqnotifications` rpc (bitcoincore-rpc PR: #295)
- Easy addEventListener like functionality with help of the `getzmqnotifications` rpc (bitcoincore-rpc PR: [#295](https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/295))
- raw messages
- zmq publisher
- async I/O ([pr](https://github.com/antonilol/rust-bitcoincore-zmq/pull/4))
- include source in message
4 changes: 2 additions & 2 deletions src/sequence_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ impl SequenceMessage {
/// adds or removes a transaction to the mempool.
///
/// Note that transactions that got removed from the mempool because they were included in a
/// block increment Bitcoin Core's mempool sequence, they do not produce a [`MempoolRemoval`]
/// message.
/// block increment Bitcoin Core's mempool sequence, but they do not produce a
/// [`MempoolRemoval`] message.
///
/// [`MempoolAcceptance`]: SequenceMessage::MempoolAcceptance
/// [`MempoolRemoval`]: SequenceMessage::MempoolRemoval
Expand Down
55 changes: 40 additions & 15 deletions src/subscribe/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use core::{
pin::Pin,
task::{Context as AsyncContext, Poll},
};
use futures_util::stream::{Fuse, FusedStream};
use futures_util::stream::FusedStream;
use zmq::Context as ZmqContext;

/// Stream that asynchronously produces [`Message`]s using a ZMQ subscriber.
Expand All @@ -21,6 +21,15 @@ impl MessageStream {
data_cache: vec![0; DATA_MAX_LEN].into_boxed_slice().try_into().unwrap(),
}
}

/// Returns a reference to the ZMQ socket used by this stream. To get the [`zmq::Socket`], use
/// [`as_raw_socket`] on the result. This is useful to set socket options or use other
/// functions provided by [`zmq`] or [`async_zmq`].
///
/// [`as_raw_socket`]: Subscribe::as_raw_socket
pub fn as_zmq_socket(&self) -> &Subscribe {
&self.zmq_stream
}
}

impl Stream for MessageStream {
Expand All @@ -36,9 +45,16 @@ impl Stream for MessageStream {
}
}

/// Stream that asynchronously produces [`Message`]s using multiple ZMQ subscriber.
impl FusedStream for MessageStream {
fn is_terminated(&self) -> bool {
false
}
}

/// Stream that asynchronously produces [`Message`]s using multiple ZMQ subscribers. The ZMQ
/// sockets are polled in a round-robin fashion.
pub struct MultiMessageStream {
streams: Vec<Fuse<MessageStream>>,
streams: Vec<MessageStream>,
next: usize,
}

Expand All @@ -51,17 +67,28 @@ impl MultiMessageStream {
}

fn push(&mut self, stream: Subscribe) {
// Not sure if fuse is needed, but has to prevent use of closed streams.
self.streams.push(MessageStream::new(stream).fuse());
self.streams.push(MessageStream::new(stream));
}

/// Returns a reference to the separate [`MessageStream`]s this [`MultiMessageStream`] is made
/// of. This is useful to set socket options or use other functions provided by [`zmq`] or
/// [`async_zmq`]. (See [`MessageStream::as_zmq_socket`])
pub fn as_streams(&self) -> &[MessageStream] {
&self.streams
}

/// Returns the separate [`MessageStream`]s this [`MultiMessageStream`] is made of. This is
/// useful to set socket options or use other functions provided by [`zmq`] or [`async_zmq`].
/// (See [`MessageStream::as_zmq_socket`])
pub fn into_streams(self) -> Vec<MessageStream> {
self.streams
}
}

impl Stream for MultiMessageStream {
type Item = Result<Message>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut AsyncContext<'_>) -> Poll<Option<Self::Item>> {
let mut any_pending = false;

let mut index_iter = (self.next..self.streams.len()).chain(0..self.next);
while let Some(i) = index_iter.next() {
match self.as_mut().streams[i].poll_next_unpin(cx) {
Expand All @@ -72,28 +99,25 @@ impl Stream for MultiMessageStream {
return msg;
}
Poll::Ready(None) => {
// continue
// should never be returned by async_zmq
}
Poll::Pending => {
any_pending = true;
// continue, poll others and eventually return Poll::Pending
}
}
}

if any_pending {
Poll::Pending
} else {
Poll::Ready(None)
}
Poll::Pending
}
}

impl FusedStream for MultiMessageStream {
fn is_terminated(&self) -> bool {
self.streams.iter().all(|stream| stream.is_terminated())
false
}
}

/// Subscribes to multiple ZMQ endpoints and returns a [`MultiMessageStream`].
pub fn subscribe_multi_async(endpoints: &[&str]) -> Result<MultiMessageStream> {
let context = ZmqContext::new();
let mut res = MultiMessageStream::new(endpoints.len());
Expand All @@ -106,6 +130,7 @@ pub fn subscribe_multi_async(endpoints: &[&str]) -> Result<MultiMessageStream> {
Ok(res)
}

/// Subscribes to a single ZMQ endpoint and returns a [`MessageStream`].
pub fn subscribe_single_async(endpoint: &str) -> Result<MessageStream> {
Ok(MessageStream::new(
new_socket_internal(&ZmqContext::new(), endpoint)?.into(),
Expand Down

0 comments on commit b559f54

Please sign in to comment.