Skip to content

Commit

Permalink
Merge pull request #7 from antonilol/socket_events
Browse files Browse the repository at this point in the history
allow waiting until zmq finishes handshake using socket monitoring
  • Loading branch information
antonilol authored Dec 13, 2023
2 parents 0906a1d + a065dc6 commit 5f9d018
Show file tree
Hide file tree
Showing 10 changed files with 876 additions and 69 deletions.
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ async_zmq = { version = "0.4.0", optional = true }
bitcoin = "0.30.0"
futures-util = { version = "0.3.28", optional = true }
zmq = "0.10.0"
zmq-sys = "0.12.0"

# dev dependencies can be used in examples
# dependencies used in examples
[dev-dependencies]
futures = "0.3.28"
tokio = { version = "1.35.0", features = ["time", "rt-multi-thread", "macros"] }

[[example]]
name = "subscribe_async_timeout"
required-features = ["async"]

[[example]]
name = "subscribe_async"
Expand Down
42 changes: 42 additions & 0 deletions examples/subscribe_async_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use bitcoincore_zmq::subscribe_async_wait_handshake;
use core::time::Duration;
use futures_util::StreamExt;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
// In this example I use match instead of unwrap to clearly show where errors are produced.
// `timeout` here returns an `impl Future<Output = Result<Result<impl Stream ...>>>`. The outer
// Result is created by tokio's timeout function, and wraps the inner Result created by the
// subscribe function.
let mut stream = match timeout(
Duration::from_millis(2000),
subscribe_async_wait_handshake(&["tcp://127.0.0.1:28332"]),
)
.await
{
Ok(Ok(stream)) => {
// Ok(Ok(_)), ok from both functions.
stream
}
Ok(Err(err)) => {
// Ok(Err(_)), ok from `timeout` but an error from the subscribe function.
panic!("subscribe error: {err}");
}
Err(_) => {
// Err(_), err from `timeout` means that it timed out.
panic!("subscribe_async_wait_handshake timed out");
}
};

// like in other examples, we have a stream we can get messages from
// but this one is different in that it will terminate on disconnection, and return an error just before that
while let Some(msg) = stream.next().await {
match msg {
Ok(msg) => println!("Received message: {msg}"),
Err(err) => println!("Error receiving message: {err}"),
}
}

println!("stream terminated");
}
1 change: 1 addition & 0 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ bitcoin = "0.30.0"
bitcoincore-rpc = "0.17.0"
bitcoincore-zmq = { path = "..", features = ["async"] }
futures = "0.3.28"
tokio = { version = "1.35.0", features = ["full"] }
182 changes: 176 additions & 6 deletions integration_tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,28 @@ mod endpoints;
mod util;

use bitcoincore_rpc::Client;
use bitcoincore_zmq::{subscribe_async, subscribe_blocking, subscribe_receiver, Message};
use core::{assert_eq, ops::ControlFlow};
use bitcoincore_zmq::{
subscribe_async, subscribe_async_monitor, subscribe_async_wait_handshake,
subscribe_async_wait_handshake_timeout, subscribe_blocking, subscribe_receiver, Error, Message,
MonitorMessage, SocketEvent, SocketMessage,
};
use core::{assert_eq, ops::ControlFlow, time::Duration};
use futures::{executor::block_on, StreamExt};
use std::{sync::mpsc, thread};
use util::{generate, recv_timeout_2, setup_rpc, sleep, RECV_TIMEOUT};
use std::{net::SocketAddr, sync::mpsc, thread};
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
runtime,
sync::mpsc::unbounded_channel,
};
use util::{generate, recv_timeout_2, setup_rpc, sleep, static_ref_heap, RECV_TIMEOUT};

macro_rules! test {
($($function:ident,)*) => {
let rpc = setup_rpc();
let rpc = static_ref_heap(setup_rpc());
$(
println!(concat!("Running ", stringify!($function), "..."));
$function(&rpc);
$function(rpc);
println!("ok");
)*
};
Expand All @@ -25,6 +35,10 @@ fn main() {
test_hashtx,
test_sub_blocking,
test_hashblock_async,
test_monitor,
test_subscribe_timeout_tokio,
test_subscribe_timeout_inefficient,
test_disconnect,
}
}

Expand Down Expand Up @@ -126,3 +140,159 @@ fn test_hashblock_async(rpc: &Client) {

h.join().unwrap();
}

fn test_monitor(rpc: &Client) {
let mut stream = subscribe_async_monitor(&[endpoints::HASHBLOCK])
.expect("failed to subscribe to Bitcoin Core's ZMQ publisher");

block_on(async {
while let Some(msg) = stream.next().await {
let msg = msg.unwrap();
match msg {
SocketMessage::Message(_msg) => {
break;
}
SocketMessage::Event(MonitorMessage { event, .. }) => {
if event == SocketEvent::HandshakeSucceeded {
// there is a zmq publisher on the other side!
// generate a block to generate a message
generate(rpc, 1).expect("rpc call failed");
}
}
}
}
});
}

fn test_subscribe_timeout_tokio(_rpc: &Client) {
const TIMEOUT: Duration = Duration::from_millis(500);

runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let _ = tokio::time::timeout(
TIMEOUT,
subscribe_async_wait_handshake(&[endpoints::HASHBLOCK]),
)
.await
.unwrap()
.unwrap();

tokio::time::timeout(
TIMEOUT,
subscribe_async_wait_handshake(&["tcp://localhost:18443"]),
)
.await
.map(|_| ())
.expect_err("an http server will not make a zmtp handshake");

tokio::time::timeout(
TIMEOUT,
subscribe_async_wait_handshake(&[endpoints::HASHBLOCK, "tcp://localhost:18443"]),
)
.await
.map(|_| ())
.expect_err("an http server will not make a zmtp handshake");
});
}

fn test_subscribe_timeout_inefficient(_rpc: &Client) {
const TIMEOUT: Duration = Duration::from_millis(500);

block_on(async {
let _ = subscribe_async_wait_handshake_timeout(&[endpoints::HASHBLOCK], TIMEOUT)
.await
.unwrap()
.unwrap();

subscribe_async_wait_handshake_timeout(&["tcp://localhost:18443"], TIMEOUT)
.await
.map(|_| ())
.expect_err("an http server will not make a zmtp handshake");

subscribe_async_wait_handshake_timeout(
&[endpoints::HASHBLOCK, "tcp://localhost:18443"],
TIMEOUT,
)
.await
.map(|_| ())
.expect_err("an http server will not make a zmtp handshake");
});
}

fn test_disconnect(rpc: &'static Client) {
runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let (tx, mut rx) = unbounded_channel();

let h = tokio::spawn(async move {
let mut stream = tokio::time::timeout(
Duration::from_millis(2000),
subscribe_async_wait_handshake(&["tcp://127.0.0.1:29999"]),
)
.await
.unwrap()
.unwrap();

tokio::time::sleep(Duration::from_millis(500)).await;

let rpc_hash = generate(rpc, 1).expect("rpc call failed").0[0];

match stream.next().await {
Some(Ok(Message::HashBlock(zmq_hash, _seq))) if rpc_hash == zmq_hash => {}
other => panic!("unexpected response: {other:?}"),
}

// send the signal to close the proxy
tx.send(()).unwrap();

match stream.next().await {
Some(Err(Error::Disconnected(endpoint)))
if endpoint == "tcp://127.0.0.1:29999" => {}
other => panic!("unexpected response: {other:?}"),
}

match stream.next().await {
None => {}
other => panic!("unexpected response: {other:?}"),
}
});

// proxy endpoints::HASHBLOCK to 127.0.0.1:29999 to simulate a disconnect
// stopping bitcoin core is not a good idea as other tests may follow this one
// taken from https://github.com/tokio-rs/tokio/discussions/3173, it is not perfect but ok for this test
let ss = TcpListener::bind("127.0.0.1:29999".parse::<SocketAddr>().unwrap())
.await
.unwrap();
let (cs, _) = ss.accept().await.unwrap();
// [6..] splits off "tcp://"
let g = TcpStream::connect(endpoints::HASHBLOCK[6..].parse::<SocketAddr>().unwrap())
.await
.unwrap();
let (mut gr, mut gw) = g.into_split();
let (mut csr, mut csw) = cs.into_split();
let h1 = tokio::spawn(async move {
let _ = tokio::io::copy(&mut gr, &mut csw).await;
let _ = csw.shutdown().await;
});
let h2 = tokio::spawn(async move {
let _ = tokio::io::copy(&mut csr, &mut gw).await;
let _ = gw.shutdown().await;
});

// wait for the signal
rx.recv().await.unwrap();

// close the proxy
h1.abort();
h2.abort();

// wait on other spawned tasks
h.await.unwrap();
});
}
4 changes: 4 additions & 0 deletions integration_tests/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ pub fn setup_rpc() -> Client {
.expect("unable to connect to Bitcoin Core regtest RPC")
}

pub fn static_ref_heap<T>(val: T) -> &'static T {
Box::leak(Box::new(val))
}

fn get_cookie_path() -> String {
env::var("BITCOIN_CORE_COOKIE_PATH").expect(
"env var BITCOIN_CORE_COOKIE_PATH probably not set, \
Expand Down
20 changes: 18 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::message::{DATA_MAX_LEN, SEQUENCE_LEN, TOPIC_MAX_LEN};
use crate::{
message::{DATA_MAX_LEN, SEQUENCE_LEN, TOPIC_MAX_LEN},
monitor::MonitorMessageError,
};
use bitcoin::consensus;
use core::{cmp::min, fmt};

Expand All @@ -15,6 +18,8 @@ pub enum Error {
Invalid256BitHashLength(usize),
BitcoinDeserialization(consensus::encode::Error),
Zmq(zmq::Error),
MonitorMessage(MonitorMessageError),
Disconnected(String),
}

impl Error {
Expand Down Expand Up @@ -69,6 +74,13 @@ impl From<consensus::encode::Error> for Error {
}
}

impl From<MonitorMessageError> for Error {
#[inline]
fn from(value: MonitorMessageError) -> Self {
Self::MonitorMessage(value)
}
}

impl fmt::Display for Error {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down Expand Up @@ -115,6 +127,8 @@ impl fmt::Display for Error {
write!(f, "bitcoin consensus deserialization error: {e}")
}
Self::Zmq(e) => write!(f, "ZMQ Error: {e}"),
Self::MonitorMessage(err) => write!(f, "unable to parse monitor message: {err}"),
Self::Disconnected(url) => write!(f, "disconnected from {url}"),
}
}
}
Expand All @@ -125,13 +139,15 @@ impl std::error::Error for Error {
Some(match self {
Self::BitcoinDeserialization(e) => e,
Self::Zmq(e) => e,
Self::MonitorMessage(e) => e,
Self::InvalidMutlipartLength(_)
| Self::InvalidTopic(_, _)
| Self::InvalidDataLength(_)
| Self::InvalidSequenceLength(_)
| Self::InvalidSequenceMessageLength(_)
| Self::InvalidSequenceMessageLabel(_)
| Self::Invalid256BitHashLength(_) => return None,
| Self::Invalid256BitHashLength(_)
| Self::Disconnected(_) => return None,
})
}
}
12 changes: 11 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,28 @@

mod error;
mod message;
mod monitor;
mod sequence_message;
mod subscribe;

pub use crate::{
error::Error,
message::{Message, DATA_MAX_LEN, SEQUENCE_LEN, TOPIC_MAX_LEN},
monitor::{
event::{HandshakeFailure, SocketEvent},
MonitorMessage,
},
sequence_message::SequenceMessage,
subscribe::{blocking::subscribe_blocking, receiver::subscribe_receiver},
};

#[cfg(feature = "async")]
pub use crate::subscribe::stream::{subscribe_async, MessageStream};
pub use crate::subscribe::stream::{
subscribe_async, subscribe_async_monitor, subscribe_async_monitor_stream,
subscribe_async_stream::{self, MessageStream},
subscribe_async_wait_handshake, subscribe_async_wait_handshake_stream,
subscribe_async_wait_handshake_timeout, SocketMessage,
};

#[allow(deprecated)]
pub use crate::subscribe::{
Expand Down
Loading

0 comments on commit 5f9d018

Please sign in to comment.