Skip to content

Commit

Permalink
Merge pull request #8 from antonilol/reuse_socket
Browse files Browse the repository at this point in the history
use 1 socket to subscribe to multiple endpoints
  • Loading branch information
antonilol authored Nov 9, 2023
2 parents 8b5ac91 + 7b79333 commit ddf3604
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 122 deletions.
4 changes: 2 additions & 2 deletions examples/subscribe_async.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use bitcoincore_zmq::subscribe_single_async;
use bitcoincore_zmq::subscribe_async;
use futures::executor::block_on;
use futures_util::StreamExt;

fn main() {
let mut stream = subscribe_single_async("tcp://127.0.0.1:28332").unwrap();
let mut stream = subscribe_async(&["tcp://127.0.0.1:28332"]).unwrap();

// This is a small example to demonstrate subscribe_single_async, it is okay here to use
// block_on, but not in production environments as this defeats the purpose of async.
Expand Down
4 changes: 2 additions & 2 deletions examples/subscribe_blocking.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bitcoincore_zmq::subscribe_single_blocking;
use bitcoincore_zmq::subscribe_blocking;
use core::ops::ControlFlow;

fn main() {
Expand All @@ -14,7 +14,7 @@ fn main() {
ControlFlow::Continue(())
};

match subscribe_single_blocking("tcp://127.0.0.1:28359", callback) {
match subscribe_blocking(&["tcp://127.0.0.1:28359"], callback) {
Ok(ControlFlow::Break(err)) => {
// Callback exited by returning ControlFlow::Break
println!("Error receiving message: {err}");
Expand Down
4 changes: 2 additions & 2 deletions examples/subscribe_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bitcoincore_zmq::subscribe_multi;
use bitcoincore_zmq::subscribe_receiver;

fn main() {
let rx = subscribe_multi(&["tcp://127.0.0.1:28332", "tcp://127.0.0.1:28333"]).unwrap();
let rx = subscribe_receiver(&["tcp://127.0.0.1:28332", "tcp://127.0.0.1:28333"]).unwrap();

for msg in rx {
match msg {
Expand Down
4 changes: 2 additions & 2 deletions examples/subscribe_receiver_pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bitcoincore_zmq::subscribe_multi;
use bitcoincore_zmq::subscribe_receiver;
use std::{
sync::{Arc, Mutex},
thread,
Expand All @@ -11,7 +11,7 @@ fn main() {
let mut threads = Vec::new();

let rx = Arc::new(Mutex::new(
subscribe_multi(&["tcp://127.0.0.1:28332", "tcp://127.0.0.1:28333"]).unwrap(),
subscribe_receiver(&["tcp://127.0.0.1:28332", "tcp://127.0.0.1:28333"]).unwrap(),
));

for id in 0..POOL_THREADS {
Expand Down
10 changes: 5 additions & 5 deletions integration_tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod endpoints;
mod util;

use bitcoincore_rpc::Client;
use bitcoincore_zmq::{subscribe_multi, subscribe_multi_async, subscribe_single_blocking, Message};
use bitcoincore_zmq::{subscribe_async, subscribe_blocking, subscribe_receiver, Message};
use core::{assert_eq, ops::ControlFlow};
use futures::{executor::block_on, StreamExt};
use std::{sync::mpsc, thread};
Expand All @@ -29,7 +29,7 @@ fn main() {
}

fn test_hashblock(rpc: &Client) {
let receiver = subscribe_multi(&[endpoints::HASHBLOCK, endpoints::RAWBLOCK])
let receiver = subscribe_receiver(&[endpoints::HASHBLOCK, endpoints::RAWBLOCK])
.expect("failed to subscribe to Bitcoin Core's ZMQ publisher");

let rpc_hash = generate(rpc, 1).expect("rpc call failed").0[0];
Expand All @@ -47,7 +47,7 @@ fn test_hashblock(rpc: &Client) {
}

fn test_hashtx(rpc: &Client) {
let receiver = subscribe_multi(&[endpoints::HASHTX, endpoints::RAWTX])
let receiver = subscribe_receiver(&[endpoints::HASHTX, endpoints::RAWTX])
.expect("failed to subscribe to Bitcoin Core's ZMQ publisher");

generate(rpc, 1).expect("rpc call failed");
Expand All @@ -69,7 +69,7 @@ fn test_sub_blocking(rpc: &Client) {
let (tx, rx) = mpsc::channel();

let h = thread::spawn(move || {
subscribe_single_blocking(endpoints::HASHBLOCK, |msg| {
subscribe_blocking(&[endpoints::HASHBLOCK], |msg| {
let msg = msg.expect("zmq message error");

match msg {
Expand Down Expand Up @@ -99,7 +99,7 @@ fn test_sub_blocking(rpc: &Client) {
}

fn test_hashblock_async(rpc: &Client) {
let mut stream = subscribe_multi_async(&[endpoints::HASHBLOCK, endpoints::RAWBLOCK])
let mut stream = subscribe_async(&[endpoints::HASHBLOCK, endpoints::RAWBLOCK])
.expect("failed to subscribe to Bitcoin Core's ZMQ subscriber");

let rpc_hash = generate(rpc, 1).expect("rpc call failed").0[0];
Expand Down
17 changes: 12 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ pub use crate::{
error::Error,
message::{Message, DATA_MAX_LEN, SEQUENCE_LEN, TOPIC_MAX_LEN},
sequence_message::SequenceMessage,
subscribe::{
blocking::{subscribe_multi_blocking, subscribe_single_blocking},
receiver::{subscribe_multi, subscribe_single},
},
subscribe::{blocking::subscribe_blocking, receiver::subscribe_receiver},
};

#[cfg(feature = "async")]
pub use crate::subscribe::stream::{subscribe_async, MessageStream};

#[allow(deprecated)]
pub use crate::subscribe::{
blocking::{subscribe_multi_blocking, subscribe_single_blocking},
receiver::{subscribe_multi, subscribe_single},
};

#[cfg(feature = "async")]
#[allow(deprecated)]
pub use crate::subscribe::stream::{
subscribe_multi_async, subscribe_single_async, MessageStream, MultiMessageStream,
subscribe_multi_async, subscribe_single_async, MultiMessageStream,
};
57 changes: 23 additions & 34 deletions src/subscribe/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,52 @@
use super::{new_socket_internal, subscribe_internal};
use crate::{error::Result, message::Message};
use core::{convert::Infallible, ops::ControlFlow};
use std::{sync::mpsc::channel, thread};
use zmq::Context;

fn break_on_err(is_err: bool) -> ControlFlow<()> {
if is_err {
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
}

/// Subscribes to a single ZMQ endpoint and blocks the thread until [`ControlFlow::Break`] is
/// returned by the callback.
#[inline]
#[deprecated(
since = "1.3.2",
note = "Use subscribe_blocking. This function has no performance benefit over subscribe_multi_blocking anymore."
)]
pub fn subscribe_single_blocking<F, B>(
endpoint: &str,
callback: F,
) -> Result<ControlFlow<B, Infallible>>
where
F: Fn(Result<Message>) -> ControlFlow<B>,
{
let context = Context::new();

let socket = new_socket_internal(&context, endpoint)?;

Ok(subscribe_internal(socket, callback))
subscribe_blocking(&[endpoint], callback)
}

/// Subscribes to multiple ZMQ endpoints and blocks the thread until [`ControlFlow::Break`] is
/// returned by the callback.
#[inline]
#[deprecated(
since = "1.3.2",
note = "Use subscribe_blocking. The name changed because there is no distinction made anymore between subscribing to 1 or more endpoints."
)]
pub fn subscribe_multi_blocking<F, B>(
endpoints: &[&str],
callback: F,
) -> Result<ControlFlow<B, Infallible>>
where
F: Fn(Result<Message>) -> ControlFlow<B>,
{
let (tx, rx) = channel();
let context = Context::new();

for endpoint in endpoints {
let tx = tx.clone();

let socket = new_socket_internal(&context, endpoint)?;

thread::spawn(move || {
subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err()))
});
}
subscribe_blocking(endpoints, callback)
}

Ok((|| {
for msg in rx {
callback(msg)?;
}
/// Subscribes to multiple ZMQ endpoints and blocks the thread until [`ControlFlow::Break`] is
/// returned by the callback.
#[inline]
pub fn subscribe_blocking<F, B>(
endpoints: &[&str],
callback: F,
) -> Result<ControlFlow<B, Infallible>>
where
F: Fn(Result<Message>) -> ControlFlow<B>,
{
let (_context, socket) = new_socket_internal(endpoints)?;

// `tx` is dropped at the end of this function
unreachable!();
})())
Ok(subscribe_internal(socket, callback))
}
11 changes: 8 additions & 3 deletions src/subscribe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ use crate::{
use core::{cmp::min, convert::Infallible, ops::ControlFlow, slice};
use zmq::{Context, Socket};

pub(super) fn new_socket_internal(context: &Context, endpoint: &str) -> Result<Socket> {
pub(super) fn new_socket_internal(endpoints: &[&str]) -> Result<(Context, Socket)> {
let context = Context::new();

let socket = context.socket(zmq::SUB)?;
socket.connect(endpoint)?;
socket.set_subscribe(b"")?;

Ok(socket)
for endpoint in endpoints {
socket.connect(endpoint)?;
}

Ok((context, socket))
}

pub(super) trait ReceiveFrom {
Expand Down
35 changes: 17 additions & 18 deletions src/subscribe/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
sync::mpsc::{channel, Receiver},
thread,
};
use zmq::Context;

fn break_on_err(is_err: bool) -> ControlFlow<()> {
if is_err {
Expand All @@ -17,32 +16,32 @@ fn break_on_err(is_err: bool) -> ControlFlow<()> {

/// Subscribes to a single ZMQ endpoint and returns a [`Receiver`].
#[inline]
#[deprecated(
since = "1.3.2",
note = "Use subscribe_receiver. This function has no performance benefit over subscribe_multi anymore."
)]
pub fn subscribe_single(endpoint: &str) -> Result<Receiver<Result<Message>>> {
let (tx, rx) = channel();
let context = Context::new();

let socket = new_socket_internal(&context, endpoint)?;

thread::spawn(move || subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err())));

Ok(rx)
subscribe_receiver(&[endpoint])
}

/// Subscribes to multiple ZMQ endpoints and returns a [`Receiver`].
#[inline]
#[deprecated(
since = "1.3.2",
note = "Use subscribe_receiver. The name changed because there is no distinction made anymore between subscribing to 1 or more endpoints."
)]
pub fn subscribe_multi(endpoints: &[&str]) -> Result<Receiver<Result<Message>>> {
let (tx, rx) = channel();
let context = Context::new();
subscribe_receiver(endpoints)
}

for endpoint in endpoints {
let tx = tx.clone();
/// Subscribes to multiple ZMQ endpoints and returns a [`Receiver`].
#[inline]
pub fn subscribe_receiver(endpoints: &[&str]) -> Result<Receiver<Result<Message>>> {
let (tx, rx) = channel();

let socket = new_socket_internal(&context, endpoint)?;
let (_context, socket) = new_socket_internal(endpoints)?;

thread::spawn(move || {
subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err()))
});
}
thread::spawn(move || subscribe_internal(socket, |msg| break_on_err(tx.send(msg).is_err())));

Ok(rx)
}
Loading

0 comments on commit ddf3604

Please sign in to comment.