Skip to content

Commit

Permalink
feat: invalidate table cache after updating metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 29, 2023
1 parent 884ce63 commit 6896f2d
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 10 deletions.
13 changes: 13 additions & 0 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,19 @@ impl HeartbeatMailbox {
serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
}

/// Parses the [Instruction] from [MailboxMessage].
#[cfg(test)]
pub(crate) fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
let Payload::Json(payload) =
msg.payload
.as_ref()
.with_context(|| UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: format!("empty payload, msg: {msg:?}"),
})?;
serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
}

pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
let mailbox = Arc::new(Self::new(pushers, sequence));

Expand Down
52 changes: 51 additions & 1 deletion src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use std::any::Any;
use std::fmt::Debug;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::instruction::Instruction;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
Expand All @@ -43,7 +45,7 @@ use self::migration_start::RegionMigrationStart;
use crate::error::{self, Error, Result};
use crate::procedure::utils::region_lock_key;
use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef};
use crate::service::mailbox::MailboxRef;
use crate::service::mailbox::{BroadcastChannel, MailboxRef};

/// It's shared in each step and available even after recovering.
///
Expand Down Expand Up @@ -235,6 +237,27 @@ impl Context {
pub fn region_id(&self) -> RegionId {
self.persistent_ctx.region_id
}

/// Broadcasts the invalidate table cache message.
pub async fn invalidate_table_cache(&self) -> Result<()> {
let table_id = self.region_id().table_id();
let instruction = Instruction::InvalidateTableIdCache(table_id);

let msg = &MailboxMessage::json_message(
"Invalidate Table Cache",
&format!("Metasrv@{}", self.server_addr()),
"Frontend broadcast",
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;

self.mailbox
.broadcast(&BroadcastChannel::Frontend, msg)
.await
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -346,7 +369,9 @@ mod tests {

use super::migration_end::RegionMigrationEnd;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::service::mailbox::Channel;

fn new_persistent_context() -> PersistentContext {
PersistentContext {
Expand Down Expand Up @@ -446,4 +471,29 @@ mod tests {
assert_eq!(procedure.context.persistent_ctx.cluster_id, 2);
assert_matches!(status.unwrap(), Status::Done);
}

#[tokio::test]
async fn test_broadcast_invalidate_table_cache() {
let mut env = TestingEnv::new();
let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();

// No receivers.
ctx.invalidate_table_cache().await.unwrap();

let (tx, mut rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
.await;

ctx.invalidate_table_cache().await.unwrap();

let resp = rx.recv().await.unwrap().unwrap();
let msg = resp.mailbox_message.unwrap();

let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
.await;

// Sends an incorrect reply.
Expand Down Expand Up @@ -337,7 +337,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
.await;

// Sends an timeout error.
Expand Down Expand Up @@ -372,7 +372,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
.await;

send_mock_reply(mailbox, rx, |id| {
Expand Down Expand Up @@ -424,7 +424,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(1);

mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
.await;

send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
Expand Down
5 changes: 2 additions & 3 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::sequence::Sequence;
use common_meta::DatanodeId;
use common_procedure::{Context as ProcedureContext, ProcedureId};
use common_procedure_test::MockContextProvider;
use common_time::util::current_time_millis;
Expand Down Expand Up @@ -55,10 +54,10 @@ impl MailboxContext {
/// Inserts a pusher for `datanode_id`
pub async fn insert_heartbeat_response_receiver(
&mut self,
datanode_id: DatanodeId,
channel: Channel,
tx: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
) {
let pusher_id = Channel::Datanode(datanode_id).pusher_id();
let pusher_id = channel.pusher_id();
let pusher = Pusher::new(tx, &RequestHeader::default());
let _ = self.pushers.insert(pusher_id, pusher).await;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub(crate) mod upgrade_candidate_region;

use std::any::Any;

use common_telemetry::warn;
use serde::{Deserialize, Serialize};

use super::migration_end::RegionMigrationEnd;
Expand Down Expand Up @@ -49,13 +50,17 @@ impl State for UpdateMetadata {
UpdateMetadata::Upgrade => {
self.upgrade_candidate_region(ctx).await?;

// TODO(weny): invalidate fe cache.
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}");
};
Ok(Box::new(RegionMigrationEnd))
}
UpdateMetadata::Rollback => {
self.rollback_downgraded_region(ctx).await?;

// TODO(weny): invalidate fe cache.
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}");
};
Ok(Box::new(RegionMigrationEnd))
}
}
Expand Down

0 comments on commit 6896f2d

Please sign in to comment.