diff --git a/Cargo.toml b/Cargo.toml index ab6f6e8..3e953ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "bitcoincore-zmq" version = "1.2.0" edition = "2021" license = "MIT" -description = "Bitcoin Core ZMQ subscriber" +description = "Bitcoin Core ZMQ subscriber with minimal dependencies" homepage = "https://github.com/antonilol/rust-bitcoincore-zmq" repository = "https://github.com/antonilol/rust-bitcoincore-zmq" keywords = ["bitcoin", "bitcoin-core", "zmq"] diff --git a/README.md b/README.md index e5c2531..20c5582 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Integration tests](https://github.com/antonilol/rust-bitcoincore-zmq/actions/workflows/integration_tests.yml/badge.svg)](https://github.com/antonilol/rust-bitcoincore-zmq/actions/workflows/integration_tests.yml) [![crates.io](https://img.shields.io/crates/v/bitcoincore-zmq.svg)](https://crates.io/crates/bitcoincore-zmq) -# Rust Bitcoin Core ZMQ +# Rust Bitcoin Core ZMQ Subscriber ### Usage example @@ -17,11 +17,17 @@ fn main() { } ``` +For more examples, have a look in the [examples directory](examples). + ### Testing Tests run on every push and pull request. Integration tests use the latest version of the 3 most recent major Bitcoin Core versions, see [integration_tests.yml](.github/workflows/integration_tests.yml#L19-L21). +### Useful resources + +- [Bitcoin Core ZMQ documentation](https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md) + --- TODO: @@ -31,3 +37,4 @@ TODO: - Easy addEventListener like functionality with help of the `getzmqnotifications` rpc (bitcoincore-rpc PR: #295) - raw messages - zmq publisher +- async I/O ([pr](https://github.com/antonilol/rust-bitcoincore-zmq/pull/4)) diff --git a/examples/subscribe_blocking.rs b/examples/subscribe_blocking.rs new file mode 100644 index 0000000..6fe2adc --- /dev/null +++ b/examples/subscribe_blocking.rs @@ -0,0 +1,30 @@ +use bitcoincore_zmq::subscribe_single_blocking; +use core::ops::ControlFlow; + +fn main() { + let callback = |msg| { + match msg { + Ok(msg) => println!("Received message: {msg}"), + Err(err) => { + // Do this to exit and return the error + return ControlFlow::Break(err); + } + } + + ControlFlow::Continue(()) + }; + + match subscribe_single_blocking("tcp://127.0.0.1:28359", callback) { + Ok(ControlFlow::Break(err)) => { + // Callback exited by returning ControlFlow::Break + println!("Error receiving message: {err}"); + } + Err(err) => { + println!("Unable to connect: {err}"); + } + Ok(ControlFlow::Continue(v)) => { + // unreachable + match v {} + } + } +} diff --git a/examples/subscribe_receiver.rs b/examples/subscribe_receiver.rs new file mode 100644 index 0000000..e06cc92 --- /dev/null +++ b/examples/subscribe_receiver.rs @@ -0,0 +1,12 @@ +use bitcoincore_zmq::subscribe_multi; + +fn main() { + let rx = subscribe_multi(&["tcp://127.0.0.1:28332", "tcp://127.0.0.1:28333"]).unwrap(); + + for msg in rx { + match msg { + Ok(msg) => println!("Received message: {msg}"), + Err(err) => println!("Error receiving message: {err}"), + } + } +} diff --git a/examples/subscribe_receiver_pool.rs b/examples/subscribe_receiver_pool.rs new file mode 100644 index 0000000..f519510 --- /dev/null +++ b/examples/subscribe_receiver_pool.rs @@ -0,0 +1,33 @@ +use bitcoincore_zmq::subscribe_multi; +use std::{ + sync::{Arc, Mutex}, + thread, +}; + +/// Use 4 threads to handle messages +const POOL_THREADS: usize = 4; + +fn main() { + let mut threads = Vec::new(); + + let rx = Arc::new(Mutex::new( + subscribe_multi(&["tcp://127.0.0.1:28332", "tcp://127.0.0.1:28333"]).unwrap(), + )); + + for id in 0..POOL_THREADS { + let rx = rx.clone(); + + threads.push(thread::spawn(move || { + while let Ok(msg) = { rx.lock().unwrap().recv() } { + match msg { + Ok(msg) => println!("Thread {id}: Received message: {msg}"), + Err(err) => println!("Thread {id}: Error receiving message: {err}"), + } + } + })); + } + + for t in threads { + t.join().unwrap(); + } +}