Skip to content

Commit

Permalink
Merge pull request #4 from antonilol/async
Browse files Browse the repository at this point in the history
initial async support
  • Loading branch information
antonilol authored Oct 22, 2023
2 parents a4dd04a + 50a7fbd commit d293960
Show file tree
Hide file tree
Showing 11 changed files with 441 additions and 173 deletions.
23 changes: 23 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,29 @@ repository = "https://github.com/antonilol/rust-bitcoincore-zmq"
keywords = ["bitcoin", "bitcoin-core", "zmq"]
categories = ["cryptography::cryptocurrencies", "network-programming"]

[features]
async = ["dep:async_zmq", "dep:futures-util"]

[dependencies]
async_zmq = { version = "0.4.0", optional = true }
bitcoin = "0.30.0"
futures-util = { version = "0.3.28", optional = true }
zmq = "0.10.0"

# dev dependencies can be used in examples
[dev-dependencies]
futures = "0.3.28"
futures-util = "0.3.28"

[[example]]
name = "subscribe_async"
required-features = ["async"]

[[example]]
name = "subscribe_blocking"

[[example]]
name = "subscribe_receiver_pool"

[[example]]
name = "subscribe_receiver"
18 changes: 18 additions & 0 deletions examples/subscribe_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use bitcoincore_zmq::subscribe_single_async;
use futures::executor::block_on;
use futures_util::StreamExt;

fn main() {
let mut stream = subscribe_single_async("tcp://127.0.0.1:28332").unwrap();

// This is a small example to demonstrate subscribe_single_async, it is okay here to use
// block_on, but not in production environments as this defeats the purpose of async.
block_on(async {
while let Some(msg) = stream.next().await {
match msg {
Ok(msg) => println!("Received message: {msg}"),
Err(err) => println!("Error receiving message: {err}"),
}
}
});
}
3 changes: 2 additions & 1 deletion integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ edition = "2021"
[dependencies]
bitcoin = "0.30.0"
bitcoincore-rpc = "0.17.0"
bitcoincore-zmq = { path = ".." }
bitcoincore-zmq = { path = "..", features = ["async"] }
futures = "0.3.28"
32 changes: 31 additions & 1 deletion integration_tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ mod endpoints;
mod util;

use bitcoincore_rpc::Client;
use bitcoincore_zmq::{subscribe_multi, subscribe_single_blocking, Message};
use bitcoincore_zmq::{subscribe_multi, subscribe_multi_async, subscribe_single_blocking, Message};
use core::{assert_eq, ops::ControlFlow};
use futures::{executor::block_on, StreamExt};
use std::{sync::mpsc, thread};
use util::{generate, recv_timeout_2, setup_rpc, sleep, RECV_TIMEOUT};

Expand All @@ -23,6 +24,7 @@ fn main() {
test_hashblock,
test_hashtx,
test_sub_blocking,
test_hashblock_async,
}
}

Expand Down Expand Up @@ -93,3 +95,31 @@ fn test_sub_blocking(rpc: &Client) {

assert_eq!(rpc_hash, zmq_hash);
}

fn test_hashblock_async(rpc: &Client) {
let mut stream = subscribe_multi_async(&[endpoints::HASHBLOCK, endpoints::RAWBLOCK])
.expect("failed to subscribe to Bitcoin Core's ZMQ subscriber");

let rpc_hash = generate(rpc, 1).expect("rpc call failed").0[0];

let (tx, rx) = mpsc::channel();

thread::spawn(move || {
block_on(async {
while let Some(msg) = stream.next().await {
tx.send(msg).unwrap();
}
})
});

match recv_timeout_2(&rx) {
(Message::Block(block, _), Message::HashBlock(blockhash, _))
| (Message::HashBlock(blockhash, _), Message::Block(block, _)) => {
assert_eq!(rpc_hash, block.block_hash());
assert_eq!(rpc_hash, blockhash);
}
(msg1, msg2) => {
panic!("invalid messages received: ({msg1}, {msg2})");
}
}
}
24 changes: 24 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,30 @@ impl From<zmq::Error> for Error {
}
}

#[cfg(feature = "async")]
impl From<async_zmq::SocketError> for Error {
#[inline]
fn from(value: async_zmq::SocketError) -> Self {
Self::Zmq(value.into())
}
}

#[cfg(feature = "async")]
impl From<async_zmq::SubscribeError> for Error {
#[inline]
fn from(value: async_zmq::SubscribeError) -> Self {
Self::Zmq(value.into())
}
}

#[cfg(feature = "async")]
impl From<async_zmq::RecvError> for Error {
#[inline]
fn from(value: async_zmq::RecvError) -> Self {
Self::Zmq(value.into())
}
}

impl From<consensus::encode::Error> for Error {
#[inline]
fn from(value: consensus::encode::Error) -> Self {
Expand Down
8 changes: 7 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ pub use crate::{
message::{Message, DATA_MAX_LEN, SEQUENCE_LEN, TOPIC_MAX_LEN},
sequence_message::SequenceMessage,
subscribe::{
subscribe_multi, subscribe_multi_blocking, subscribe_single, subscribe_single_blocking,
blocking::{subscribe_multi_blocking, subscribe_single_blocking},
receiver::{subscribe_multi, subscribe_single},
},
};

#[cfg(feature = "async")]
pub use crate::subscribe::stream::{
subscribe_multi_async, subscribe_single_async, MessageStream, MultiMessageStream,
};
170 changes: 0 additions & 170 deletions src/subscribe.rs

This file was deleted.

63 changes: 63 additions & 0 deletions src/subscribe/blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use super::{new_socket_internal, subscribe_internal};
use crate::{error::Result, message::Message};
use core::{convert::Infallible, ops::ControlFlow};
use std::{sync::mpsc::channel, thread};
use zmq::Context;

fn break_on_err(is_err: bool) -> ControlFlow<()> {
if is_err {
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
}

/// Subscribes to a single ZMQ endpoint and blocks the thread until [`ControlFlow::Break`] is
/// returned by the callback.
#[inline]
pub fn subscribe_single_blocking<F, B>(
endpoint: &str,
callback: F,
) -> Result<ControlFlow<B, Infallible>>
where
F: Fn(Result<Message>) -> ControlFlow<B>,
{
let context = Context::new();

let socket = new_socket_internal(&context, endpoint)?;

Ok(subscribe_internal(socket, callback))
}

/// Subscribes to multiple ZMQ endpoints and blocks the thread until [`ControlFlow::Break`] is
/// returned by the callback.
#[inline]
pub fn subscribe_multi_blocking<F, B>(
endpoints: &[&str],
callback: F,
) -> Result<ControlFlow<B, Infallible>>
where
F: Fn(Result<Message>) -> ControlFlow<B>,
{
let (tx, rx) = channel();
let context = Context::new();

for endpoint in endpoints {
let tx = tx.clone();

let socket = new_socket_internal(&context, endpoint)?;

thread::spawn(move || {
subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err()))
});
}

Ok((|| {
for msg in rx {
callback(msg)?;
}

// `tx` is dropped at the end of this function
unreachable!();
})())
}
Loading

0 comments on commit d293960

Please sign in to comment.