From 9a07958478defa0f5d8b26ca9217a3e36d113c47 Mon Sep 17 00:00:00 2001 From: jbesraa Date: Thu, 9 Jan 2025 17:40:14 +0200 Subject: [PATCH 1/3] Add `Sniffer::includes_message_type` fn --- roles/tests-integration/lib/sniffer.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/roles/tests-integration/lib/sniffer.rs b/roles/tests-integration/lib/sniffer.rs index 72abc8e1e..1ffd6f289 100644 --- a/roles/tests-integration/lib/sniffer.rs +++ b/roles/tests-integration/lib/sniffer.rs @@ -456,6 +456,21 @@ impl Sniffer { sleep(Duration::from_secs(1)).await; } } + + pub async fn includes_message_type( + &self, + message_direction: MessageDirection, + message_type: u8, + ) -> bool { + match message_direction { + MessageDirection::ToDownstream => { + self.messages_from_upstream.has_message_type(message_type) + } + MessageDirection::ToUpstream => { + self.messages_from_downstream.has_message_type(message_type) + } + } + } } // Utility macro to assert that the downstream and upstream roles have sent specific messages. From cc9cd967041ae94ba74b445253b14abd638da60d Mon Sep 17 00:00:00 2001 From: jbesraa Date: Thu, 9 Jan 2025 17:41:09 +0200 Subject: [PATCH 2/3] Add `Sniffer::wait_for_message_type_with_remove`.. fn --- roles/tests-integration/lib/sniffer.rs | 48 +++++++++++++++++++ .../tests/sniffer_integration.rs | 39 ++++++++++++++- 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/roles/tests-integration/lib/sniffer.rs b/roles/tests-integration/lib/sniffer.rs index 1ffd6f289..d2c065636 100644 --- a/roles/tests-integration/lib/sniffer.rs +++ b/roles/tests-integration/lib/sniffer.rs @@ -457,6 +457,38 @@ impl Sniffer { } } + pub async fn wait_for_message_type_with_remove( + &self, + message_direction: MessageDirection, + message_type: u8, + ) -> bool { + let now = std::time::Instant::now(); + loop { + let has_message_type = match message_direction { + MessageDirection::ToDownstream => self + .messages_from_upstream + .has_message_type_with_remove(message_type), + MessageDirection::ToUpstream => self + .messages_from_downstream + .has_message_type_with_remove(message_type), + }; + + // ready to unblock test runtime + if has_message_type { + return true; + } + + // 10 min timeout + // only for worst case, ideally should never be triggered + if now.elapsed().as_secs() > 10 * 60 { + panic!("Timeout waiting for message type"); + } + + // sleep to reduce async lock contention + sleep(Duration::from_secs(1)).await; + } + } + pub async fn includes_message_type( &self, message_direction: MessageDirection, @@ -671,6 +703,22 @@ impl MessagesAggregator { has_message } + fn has_message_type_with_remove(&self, message_type: u8) -> bool { + self.messages + .safe_lock(|messages| { + let mut cloned_messages = messages.clone(); + for (pos, (t, _)) in cloned_messages.iter().enumerate() { + if *t == message_type { + let drained = cloned_messages.drain(pos + 1..).collect(); + *messages = drained; + return true; + } + } + false + }) + .unwrap() + } + // The aggregator queues messages in FIFO order, so this function returns the oldest message in // the queue. // diff --git a/roles/tests-integration/tests/sniffer_integration.rs b/roles/tests-integration/tests/sniffer_integration.rs index 64eeaf649..480f19de7 100644 --- a/roles/tests-integration/tests/sniffer_integration.rs +++ b/roles/tests-integration/tests/sniffer_integration.rs @@ -1,4 +1,7 @@ -use const_sv2::MESSAGE_TYPE_SETUP_CONNECTION_ERROR; +use const_sv2::{ + MESSAGE_TYPE_SETUP_CONNECTION_ERROR, MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS, + MESSAGE_TYPE_SET_NEW_PREV_HASH, +}; use integration_tests_sv2::*; use roles_logic_sv2::{ common_messages_sv2::SetupConnectionError, @@ -10,7 +13,6 @@ use std::convert::TryInto; #[tokio::test] async fn test_sniffer_interrupter() { let (_tp, tp_addr) = start_template_provider(None).await; - use const_sv2::MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS; let message = PoolMessages::Common(CommonMessages::SetupConnectionError(SetupConnectionError { flags: 0, @@ -33,3 +35,36 @@ async fn test_sniffer_interrupter() { assert_common_message!(&sniffer.next_message_from_downstream(), SetupConnection); assert_common_message!(&sniffer.next_message_from_upstream(), SetupConnectionError); } + +#[tokio::test] +async fn test_sniffer_wait_for_message_type_with_remove() { + let (_tp, tp_addr) = start_template_provider(None).await; + let (sniffer, sniffer_addr) = start_sniffer("".to_string(), tp_addr, false, None).await; + let _ = start_pool(Some(sniffer_addr)).await; + assert!( + sniffer + .wait_for_message_type_with_remove( + MessageDirection::ToDownstream, + MESSAGE_TYPE_SET_NEW_PREV_HASH, + ) + .await + ); + assert_eq!( + sniffer + .includes_message_type( + MessageDirection::ToDownstream, + MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS + ) + .await, + false + ); + assert_eq!( + sniffer + .includes_message_type( + MessageDirection::ToDownstream, + MESSAGE_TYPE_SET_NEW_PREV_HASH + ) + .await, + false + ); +} From 020531bede8271e644e6d5a5367714e885963708 Mon Sep 17 00:00:00 2001 From: jbesraa Date: Fri, 10 Jan 2025 13:29:11 +0200 Subject: [PATCH 3/3] Use `wait_for_message_type_with_remove` in `header_timestamp_value_assertion_in_new_extended_mining_job` test. --- .../tests/pool_integration.rs | 29 +++++++------------ 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/roles/tests-integration/tests/pool_integration.rs b/roles/tests-integration/tests/pool_integration.rs index e6f3446d1..498f9da0d 100644 --- a/roles/tests-integration/tests/pool_integration.rs +++ b/roles/tests-integration/tests/pool_integration.rs @@ -1,7 +1,10 @@ use integration_tests_sv2::*; use crate::sniffer::MessageDirection; -use const_sv2::{MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB, MESSAGE_TYPE_NEW_TEMPLATE}; +use const_sv2::{ + MESSAGE_TYPE_MINING_SET_NEW_PREV_HASH, MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB, + MESSAGE_TYPE_NEW_TEMPLATE, +}; use roles_logic_sv2::{ common_messages_sv2::{Protocol, SetupConnection}, parsers::{AnyMessage, CommonMessages, Mining, PoolMessages, TemplateDistribution}, @@ -92,24 +95,12 @@ async fn header_timestamp_value_assertion_in_new_extended_mining_job() { } _ => panic!("SetNewPrevHash not found!"), }; - // Assertions of messages between Pool and Translator Proxy (these are not necessary for the - // test itself, but they are used to pop from the sniffer's message queue) - assert_common_message!( - &pool_translator_sniffer.next_message_from_upstream(), - SetupConnectionSuccess - ); - assert_mining_message!( - &pool_translator_sniffer.next_message_from_upstream(), - OpenExtendedMiningChannelSuccess - ); - assert_mining_message!( - &pool_translator_sniffer.next_message_from_upstream(), - NewExtendedMiningJob - ); - assert_mining_message!( - &pool_translator_sniffer.next_message_from_upstream(), - SetNewPrevHash - ); + pool_translator_sniffer + .wait_for_message_type_with_remove( + MessageDirection::ToDownstream, + MESSAGE_TYPE_MINING_SET_NEW_PREV_HASH, + ) + .await; // Wait for a second NewExtendedMiningJob message pool_translator_sniffer .wait_for_message_type(