Skip to content

Commit

Permalink
chore: do not reply for broadcast msg (#3595)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored Mar 27, 2024
1 parent 9428cb8 commit 92a8e86
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 46 deletions.
4 changes: 0 additions & 4 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ pub enum InstructionReply {
OpenRegion(SimpleReply),
CloseRegion(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
InvalidateTableCache(SimpleReply),
DowngradeRegion(DowngradeRegionReply),
}

Expand All @@ -213,9 +212,6 @@ impl Display for InstructionReply {
Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
Self::InvalidateTableCache(reply) => {
write!(f, "InstructionReply::Invalidate({})", reply)
}
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
Expand Down
41 changes: 13 additions & 28 deletions src/frontend/src/heartbeat/handler/invalidate_table_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_telemetry::error;
use common_meta::instruction::Instruction;
use common_telemetry::debug;

#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
Expand All @@ -36,35 +36,20 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
}

async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let mailbox = ctx.mailbox.clone();
let cache_invalidator = self.cache_invalidator.clone();

let (meta, invalidator) = match ctx.incoming_message.take() {
Some((meta, Instruction::InvalidateCaches(caches))) => (meta, async move {
cache_invalidator
.invalidate(&Context::default(), caches)
.await
}),
_ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"),
let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else {
unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'")
};

let _handle = common_runtime::spawn_bg(async move {
// Local cache invalidation always succeeds.
let _ = invalidator.await;
debug!(
"InvalidateTableCacheHandler: invalidating caches: {:?}",
caches
);

if let Err(e) = mailbox
.send((
meta,
InstructionReply::InvalidateTableCache(SimpleReply {
result: true,
error: None,
}),
))
.await
{
error!(e; "Failed to send reply to mailbox");
}
});
// Invalidate local cache always success
let _ = self
.cache_invalidator
.invalidate(&Context::default(), caches)
.await?;

Ok(HandleControl::Done)
}
Expand Down
16 changes: 2 additions & 14 deletions src/frontend/src/heartbeat/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

Expand All @@ -22,7 +21,7 @@ use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{CacheIdent, Instruction, InstructionReply, SimpleReply};
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::TableMetaKey;
use partition::manager::TableRouteCacheInvalidator;
Expand Down Expand Up @@ -67,7 +66,7 @@ async fn test_invalidate_table_cache_handler() {
InvalidateTableCacheHandler::new(backend.clone()),
)]));

let (tx, mut rx) = mpsc::channel(8);
let (tx, _) = mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));

// removes a valid key
Expand All @@ -78,11 +77,6 @@ async fn test_invalidate_table_cache_handler() {
)
.await;

let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::InvalidateTableCache(SimpleReply { result: true, .. })
);
assert!(!backend
.inner
.lock()
Expand All @@ -96,12 +90,6 @@ async fn test_invalidate_table_cache_handler() {
Instruction::InvalidateCaches(vec![CacheIdent::TableId(0)]),
)
.await;

let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::InvalidateTableCache(SimpleReply { result: true, .. })
);
}

pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta {
Expand Down

0 comments on commit 92a8e86

Please sign in to comment.