Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove support for logical tables in the create table procedure #3592

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@ impl StartCommand {
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
table_metadata_manager.table_name_manager().clone(),
));

let ddl_task_executor = Self::create_ddl_task_executor(
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use self::table_meta::TableMetadataAllocatorRef;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::error::Result;
use crate::key::table_route::TableRouteValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
Expand Down Expand Up @@ -86,7 +86,7 @@ pub struct TableMetadata {
/// Table id.
pub table_id: TableId,
/// Route information for each region of the table.
pub table_route: TableRouteValue,
pub table_route: PhysicalTableRouteValue,
/// The encoded wal options for regions of the table.
// If a region does not have an associated wal options, no key for the region would be found in the map.
pub region_wal_options: HashMap<RegionNumber, String>,
Expand Down
60 changes: 18 additions & 42 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ use table::table_reference::TableReference;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
Expand Down Expand Up @@ -69,7 +69,7 @@ impl CreateTableProcedure {
};

// Only registers regions if the table route is allocated.
if let Some(TableRouteValue::Physical(x)) = &creator.data.table_route {
if let Some(x) = &creator.data.table_route {
creator.opening_regions = creator
.register_opening_regions(&context, &x.region_routes)
.map_err(BoxedError::new)
Expand Down Expand Up @@ -97,7 +97,7 @@ impl CreateTableProcedure {
})
}

fn table_route(&self) -> Result<&TableRouteValue> {
fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
self.creator
.data
.table_route
Expand All @@ -111,7 +111,7 @@ impl CreateTableProcedure {
pub fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: TableRouteValue,
table_route: PhysicalTableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) {
self.creator
Expand Down Expand Up @@ -192,47 +192,23 @@ impl CreateTableProcedure {
/// - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
/// - [Code::Unavailable](tonic::status::Code::Unavailable)
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
// Safety: the table route must be allocated.
match self.table_route()?.clone() {
TableRouteValue::Physical(x) => {
let request_builder = self.new_region_request_builder(None)?;
self.create_regions(&x.region_routes, request_builder).await
}
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();

let physical_table_route = self
.context
.table_metadata_manager
.table_route_manager()
.try_get_physical_table_route(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
let region_routes = &physical_table_route.region_routes;

let request_builder = self.new_region_request_builder(Some(physical_table_id))?;

self.create_regions(region_routes, request_builder).await
}
}
let table_route = self.table_route()?.clone();
let request_builder = self.new_region_request_builder(None)?;
self.create_regions(&table_route.region_routes, request_builder)
.await
}

async fn create_regions(
&mut self,
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
// Safety: the table_route must be allocated.
if self.table_route()?.is_physical() {
// Registers opening regions
let guards = self
.creator
.register_opening_regions(&self.context, region_routes)?;
if !guards.is_empty() {
self.creator.opening_regions = guards;
}
// Registers opening regions
let guards = self
.creator
.register_opening_regions(&self.context, region_routes)?;
if !guards.is_empty() {
self.creator.opening_regions = guards;
}

let create_table_data = &self.creator.data;
Expand Down Expand Up @@ -303,7 +279,7 @@ impl CreateTableProcedure {
// Safety: the region_wal_options must be allocated.
let region_wal_options = self.region_wal_options()?.clone();
// Safety: the table_route must be allocated.
let table_route = self.table_route()?.clone();
let table_route = TableRouteValue::Physical(self.table_route()?.clone());
manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
.await?;
Expand Down Expand Up @@ -400,7 +376,7 @@ impl TableCreator {
fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: TableRouteValue,
table_route: PhysicalTableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) {
self.data.task.table_info.ident.table_id = table_id;
Expand All @@ -424,7 +400,7 @@ pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
/// None stands for not allocated yet.
table_route: Option<TableRouteValue>,
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
table_route: Option<PhysicalTableRouteValue>,
/// None stands for not allocated yet.
pub region_wal_options: Option<HashMap<RegionNumber, String>>,
pub cluster_id: ClusterId,
Expand Down
117 changes: 34 additions & 83 deletions src/common/meta/src/ddl/table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@ use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_telemetry::{debug, info};
use snafu::{ensure, OptionExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use snafu::ensure;
use store_api::storage::{RegionId, RegionNumber, TableId};

use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result, TableNotFoundSnafu, UnsupportedSnafu};
use crate::key::table_name::{TableNameKey, TableNameManager};
use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue};
use crate::error::{self, Result, UnsupportedSnafu};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{Region, RegionRoute};
Expand All @@ -38,34 +35,29 @@ pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
pub struct TableMetadataAllocator {
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_name_manager: TableNameManager,
peer_allocator: PeerAllocatorRef,
}

impl TableMetadataAllocator {
pub fn new(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_name_manager: TableNameManager,
) -> Self {
Self::with_peer_allocator(
table_id_sequence,
wal_options_allocator,
table_name_manager,
Arc::new(NoopPeerAllocator),
)
}

pub fn with_peer_allocator(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_name_manager: TableNameManager,
peer_allocator: PeerAllocatorRef,
) -> Self {
Self {
table_id_sequence,
wal_options_allocator,
table_name_manager,
peer_allocator,
}
}
Expand Down Expand Up @@ -102,27 +94,22 @@ impl TableMetadataAllocator {

fn create_wal_options(
&self,
table_route: &TableRouteValue,
table_route: &PhysicalTableRouteValue,
) -> Result<HashMap<RegionNumber, String>> {
match table_route {
TableRouteValue::Physical(x) => {
let region_numbers = x
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}
TableRouteValue::Logical(_) => Ok(HashMap::new()),
}
let region_numbers = table_route
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}

async fn create_table_route(
&self,
ctx: &TableMetadataAllocatorContext,
table_id: TableId,
task: &CreateTableTask,
) -> Result<TableRouteValue> {
) -> Result<PhysicalTableRouteValue> {
let regions = task.partitions.len();
ensure!(
regions > 0,
Expand All @@ -131,56 +118,29 @@ impl TableMetadataAllocator {
}
);

let table_route = if task.create_table.engine == METRIC_ENGINE
&& let Some(physical_table_name) = task
.create_table
.table_options
.get(LOGICAL_TABLE_METADATA_KEY)
{
let physical_table_id = self
.table_name_manager
.get(TableNameKey::new(
&task.create_table.catalog_name,
&task.create_table.schema_name,
physical_table_name,
))
.await?
.context(TableNotFoundSnafu {
table_name: physical_table_name,
})?
.table_id();

let region_ids = (0..regions)
.map(|i| RegionId::new(table_id, i as RegionNumber))
.collect();

TableRouteValue::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
} else {
let peers = self.peer_allocator.alloc(ctx, regions).await?;

let region_routes = task
.partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition: Some(partition.clone().into()),
..Default::default()
};

let peer = peers[i % peers.len()].clone();

RegionRoute {
region,
leader_peer: Some(peer),
..Default::default()
}
})
.collect::<Vec<_>>();
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes))
};
Ok(table_route)
let peers = self.peer_allocator.alloc(ctx, regions).await?;
let region_routes = task
.partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition: Some(partition.clone().into()),
..Default::default()
};

let peer = peers[i % peers.len()].clone();

RegionRoute {
region,
leader_peer: Some(peer),
..Default::default()
}
})
.collect::<Vec<_>>();

Ok(PhysicalTableRouteValue::new(region_routes))
}

pub async fn create(
Expand All @@ -203,15 +163,6 @@ impl TableMetadataAllocator {
region_wal_options,
})
}

/// Sets table ids with all tasks.
pub async fn set_table_ids_on_logic_create(&self, tasks: &mut [CreateTableTask]) -> Result<()> {
for task in tasks {
let table_id = self.allocate_table_id(task).await?;
task.table_info.ident.table_id = table_id;
}
Ok(())
}
}

pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub async fn create_physical_table(
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
TableRouteValue::Physical(table_route),
)
.await;

Expand Down
Loading