Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resource Isolation Feature: Extend CACHE SELECT to allow warming up backup replicas #23

Open
wants to merge 5 commits into
base: pinterest-integration-3.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class DataCacheSelectExecutor {
private static final Logger LOG = LogManager.getLogger(DataCacheSelectExecutor.class);

public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statement,
ConnectContext connectContext) throws Exception {
ConnectContext connectContext) throws Exception {
// backup original session variable
SessionVariable sessionVariableBackup = connectContext.getSessionVariable();
// clone an new session variable
Expand All @@ -54,14 +54,9 @@ public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statem
tmpSessionVariable.setEnableCacheSelect(true);
// Note that although setting these values in the SessionVariable is not ideal, it's way more disruptive to pipe
// this information to where it needs to be through the insertStmt.
if (statement.getNumReplicasDesired() > 1) {
// We only set this value if it is larger than the default assumption.
tmpSessionVariable.setNumDesiredDatacacheReplicas(statement.getNumReplicasDesired());
}
if (statement.getResourceIsolationGroups() != null && !statement.getResourceIsolationGroups().isEmpty()) {
// We only set this value if it is the non-default.
tmpSessionVariable.setDatacacheSelectResourceGroups(statement.getResourceIsolationGroups());
}
tmpSessionVariable.setNumDesiredDatacacheReplicas(statement.getNumReplicasDesired());
tmpSessionVariable.setNumDesiredDatacacheBackupReplicas(statement.getNumBackupReplicasDesired());
tmpSessionVariable.setDatacacheSelectResourceGroups(statement.getResourceIsolationGroups());
connectContext.setSessionVariable(tmpSessionVariable);

InsertStmt insertStmt = statement.getInsertStmt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package com.starrocks.qe;

import com.starrocks.common.DdlException;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReportException;
import com.starrocks.common.UserException;
import com.starrocks.lake.qe.scheduler.DefaultSharedDataWorkerProvider;
import com.starrocks.planner.ScanNode;
Expand All @@ -27,6 +29,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -64,24 +67,55 @@ public CacheSelectBackendSelector(ScanNode scanNode, List<TScanRangeLocations> l
private Set<Long> assignedCnByTabletId(SystemInfoService systemInfoService, Long tabletId,
String resourceIsolationGroupId) throws UserException {
TabletComputeNodeMapper mapper = systemInfoService.internalTabletMapper();
int count = Math.max(props.numReplicasDesired, props.numBackupReplicasDesired);
// The skipCount variable is used during backup-cache-replica CN node selection
// to skip the first (primary) CN node among the selected nodes
int skipCount = props.numBackupReplicasDesired > 0 ? 1 : 0;
List<Long> cnIdsOrderedByPreference =
mapper.computeNodesForTablet(tabletId, props.numReplicasDesired, resourceIsolationGroupId);
if (cnIdsOrderedByPreference.size() < props.numReplicasDesired) {
throw new DdlException(String.format("Requesting more replicas than we have available CN" +
" for the specified resource group. desiredReplicas: %d, resourceGroup: %s, tabletId: %d",
props.numReplicasDesired, resourceIsolationGroupId, tabletId));
mapper.computeNodesForTablet(tabletId, count, resourceIsolationGroupId, skipCount);
if (cnIdsOrderedByPreference.isEmpty()) {
throw new DdlException(
String.format("No CN nodes available for the specified resource group." +
" resourceGroup: %s, tabletId: %d",
resourceIsolationGroupId, tabletId));
}
if (cnIdsOrderedByPreference.size() < count) {
throw new DdlException(
String.format("Requesting more replicas than we have available CN" +
" for the specified resource group. desiredReplicas: %d," +
" desiredBackupReplicas: %d, resourceGroup: %s, tabletId: %d",
props.numReplicasDesired, props.numBackupReplicasDesired,
resourceIsolationGroupId, tabletId));
}
return new HashSet<>(cnIdsOrderedByPreference);
}

private Set<Long> assignedCnByBackupWorker(Long mainTargetCnId, String resourceIsolationGroupId)
throws UserException {
Set<Long> selectedCn = new HashSet<>();
DefaultSharedDataWorkerProvider workerProvider =
new DefaultSharedDataWorkerProvider.Factory().captureAvailableWorkers(warehouseId,
resourceIsolationGroupId);
DefaultSharedDataWorkerProvider workerProvider;
try {
workerProvider = new DefaultSharedDataWorkerProvider.Factory().captureAvailableWorkers(warehouseId,
resourceIsolationGroupId);
} catch (ErrorReportException ex) {
// captureAvailableWorkers() can throw an ErrorReportException (RuntimeException) with
// error code as ERR_NO_NODES_IN_WAREHOUSE, which should be considered as
// expected behaviour in this particular case, so transforming it to checked DdlException
// would be consistent with this class logic.
if (ex.getErrorCode() == ErrorCode.ERR_NO_NODES_IN_WAREHOUSE) {
throw new DdlException(
String.format("No CN nodes available for the specified resource group. resourceGroup: %s",
resourceIsolationGroupId));
} else {
throw ex;
}
}
List<Long> selectedCn = new ArrayList<>();
int count = Math.max(props.numReplicasDesired, props.numBackupReplicasDesired);
// The skipCount variable is used during backup-cache-replica CN node selection
// to skip the first (primary) CN node among the selected nodes
int skipCount = props.numBackupReplicasDesired > 0 ? 1 : 0;
long targetBackendId = mainTargetCnId;
while (selectedCn.size() < props.numReplicasDesired) {
while (selectedCn.size() < count + skipCount) {
if (selectedCn.contains(targetBackendId) || !workerProvider.isDataNodeAvailable(targetBackendId)) {
targetBackendId = workerProvider.selectBackupWorker(targetBackendId, Optional.empty());
if (targetBackendId < 0 || selectedCn.contains(targetBackendId)) {
Expand All @@ -93,17 +127,24 @@ private Set<Long> assignedCnByBackupWorker(Long mainTargetCnId, String resourceI
}
selectedCn.add(targetBackendId);
}
return selectedCn;
if (selectedCn.isEmpty()) {
throw new DdlException(
String.format("No CN nodes available for the specified resource group. resourceGroup: %s",
resourceIsolationGroupId));
}
return selectedCn.stream().skip(skipCount).collect(Collectors.toSet());
}

@Override
public void computeScanRangeAssignment() throws UserException {
if (props.resourceIsolationGroups == null || props.resourceIsolationGroups.isEmpty()) {
if (props.resourceIsolationGroups.isEmpty()) {
throw new UserException("Should not have constructed CacheSelectBackendSelector with no" +
" resourceIsolationGroups specified.");
}
if (props.numReplicasDesired < 1) {
throw new UserException("Num replicas desired in cache must be at least 1: " + props.numReplicasDesired);
if (props.numReplicasDesired < 1 && props.numBackupReplicasDesired < 1) {
throw new UserException(String.format(
"Num replicas or backup replicas desired in cache must be at least 1: replicas [%d] backup replicas [%d]",
props.numReplicasDesired, props.numBackupReplicasDesired));
}

SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
Expand Down Expand Up @@ -146,11 +187,10 @@ public void computeScanRangeAssignment() throws UserException {
// Note that although we're not using the provided callerWorkerProvider above, the caller assumes that we used
// it to note the selected backend ids. This is used for things like checking if the worker has died
// and cancelling queries.
for (long workerId : allSelectedWorkerIds) {
callerWorkerProvider.selectWorkerUnchecked(workerId);
}
allSelectedWorkerIds.forEach(callerWorkerProvider::selectWorkerUnchecked);

// Also, caller upstream will use the workerProvider to get ComputeNode references corresponding to the compute
// nodes chosen in this function, so we must enable getting any worker regardless of availability.
callerWorkerProvider.setAllowGetAnyWorker(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.starrocks.server.GlobalStateMgr;

import java.util.ArrayList;
import java.util.List;

// Describes how a CACHE SELECT statement should choose compute nodes to populate with the data.
Expand All @@ -26,15 +25,19 @@
public class CacheSelectComputeNodeSelectionProperties {
public List<String> resourceIsolationGroups;
public int numReplicasDesired;
public int numBackupReplicasDesired;
anatoly2 marked this conversation as resolved.
Show resolved Hide resolved

public CacheSelectComputeNodeSelectionProperties(List<String> resourceIsolationGroups, int numReplicasDesired) {
if (resourceIsolationGroups == null || resourceIsolationGroups.isEmpty()) {
this.resourceIsolationGroups = new ArrayList<>();
this.resourceIsolationGroups.add(GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf()
.getResourceIsolationGroup());
public CacheSelectComputeNodeSelectionProperties(List<String> resourceIsolationGroups,
int numReplicasDesired,
int numBackupReplicasDesired) {
if (resourceIsolationGroups.isEmpty()) {
anatoly2 marked this conversation as resolved.
Show resolved Hide resolved
this.resourceIsolationGroups = List.of(
GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf().getResourceIsolationGroup()
);
} else {
this.resourceIsolationGroups = resourceIsolationGroups;
}
this.numReplicasDesired = Math.max(numReplicasDesired, 1);
this.numReplicasDesired = Math.max(numReplicasDesired, 0);
this.numBackupReplicasDesired = Math.max(numBackupReplicasDesired, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@ public String toDebugString() {
}
return sb.toString();
}
}
}
37 changes: 26 additions & 11 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -245,7 +246,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {

public static final String ENABLE_PIPELINE_ENGINE = "enable_pipeline_engine";

public static final String MAX_BUCKETS_PER_BE_TO_USE_BALANCER_ASSIGNMENT = "max_buckets_per_be_to_use_balancer_assignment";
public static final String MAX_BUCKETS_PER_BE_TO_USE_BALANCER_ASSIGNMENT =
anatoly2 marked this conversation as resolved.
Show resolved Hide resolved
"max_buckets_per_be_to_use_balancer_assignment";

public static final String ENABLE_MV_PLANNER = "enable_mv_planner";
public static final String ENABLE_INCREMENTAL_REFRESH_MV = "enable_incremental_mv";
Expand Down Expand Up @@ -522,7 +524,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
"enable_materialized_view_transparent_union_rewrite";
public static final String ENABLE_MATERIALIZED_VIEW_REWRITE_PARTITION_COMPENSATE =
"enable_materialized_view_rewrite_partition_compensate";
public static final String ENABLE_MATERIALIZED_VIEW_AGG_PUSHDOWN_REWRITE = "enable_materialized_view_agg_pushdown_rewrite";
public static final String ENABLE_MATERIALIZED_VIEW_AGG_PUSHDOWN_REWRITE =
"enable_materialized_view_agg_pushdown_rewrite";

public static final String ENABLE_MATERIALIZED_VIEW_TEXT_MATCH_REWRITE =
"enable_materialized_view_text_match_rewrite";
Expand Down Expand Up @@ -1060,7 +1063,6 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = ENABLE_CONNECTOR_SINK_GLOBAL_SHUFFLE, flag = VariableMgr.INVISIBLE)
private boolean enableConnectorSinkGlobalShuffle = true;


@VariableMgr.VarAttr(name = ENABLE_CONNECTOR_SINK_SPILL, flag = VariableMgr.INVISIBLE)
private boolean enableConnectorSinkSpill = true;

Expand Down Expand Up @@ -1665,8 +1667,9 @@ public String getCatalog() {

private boolean enableCacheSelect = false;

private List<String> datacacheSelectResourceGroups = null;
private int numDesiredDatacacheReplicas = -1;
private List<String> datacacheSelectResourceGroups = Collections.emptyList();
private int numDesiredDatacacheReplicas = 0;
private int numDesiredDatacacheBackupReplicas = 0;

@VariableMgr.VarAttr(name = ENABLE_DYNAMIC_PRUNE_SCAN_RANGE)
private boolean enableDynamicPruneScanRange = true;
Expand Down Expand Up @@ -2396,17 +2399,19 @@ public void setPreferComputeNode(boolean preferComputeNode) {
public void setComputationFragmentSchedulingPolicy(String computationFragmentSchedulingPolicy) {
SessionVariableConstants.ComputationFragmentSchedulingPolicy result =
Enums.getIfPresent(SessionVariableConstants.ComputationFragmentSchedulingPolicy.class,
StringUtils.upperCase(computationFragmentSchedulingPolicy)).orNull();
StringUtils.upperCase(computationFragmentSchedulingPolicy)).orNull();
if (result == null) {
String legalValues = Joiner.on(" | ").join(SessionVariableConstants.ComputationFragmentSchedulingPolicy.values());
throw new IllegalArgumentException("Legal values of computation_fragment_scheduling_policy are " + legalValues);
String legalValues =
Joiner.on(" | ").join(SessionVariableConstants.ComputationFragmentSchedulingPolicy.values());
throw new IllegalArgumentException(
"Legal values of computation_fragment_scheduling_policy are " + legalValues);
}
this.computationFragmentSchedulingPolicy = StringUtils.upperCase(computationFragmentSchedulingPolicy);
}

public SessionVariableConstants.ComputationFragmentSchedulingPolicy getComputationFragmentSchedulingPolicy() {
return Enums.getIfPresent(SessionVariableConstants.ComputationFragmentSchedulingPolicy.class,
StringUtils.upperCase(computationFragmentSchedulingPolicy))
StringUtils.upperCase(computationFragmentSchedulingPolicy))
.or(SessionVariableConstants.ComputationFragmentSchedulingPolicy.COMPUTE_NODES_ONLY);
}

Expand Down Expand Up @@ -3492,15 +3497,17 @@ public boolean isEnableMaterializedViewRewritePartitionCompensate() {
return enableMaterializedViewRewritePartitionCompensate;
}

public void setEnableMaterializedViewRewritePartitionCompensate(boolean enableMaterializedViewRewritePartitionCompensate) {
public void setEnableMaterializedViewRewritePartitionCompensate(
boolean enableMaterializedViewRewritePartitionCompensate) {
this.enableMaterializedViewRewritePartitionCompensate = enableMaterializedViewRewritePartitionCompensate;
}

/**
 * Whether transparent UNION rewrite is enabled for materialized views
 * in this session.
 *
 * @return the current value of the session flag
 */
public boolean isEnableMaterializedViewTransparentUnionRewrite() {
    return this.enableMaterializedViewTransparentUnionRewrite;
}

public void setEnableMaterializedViewTransparentUnionRewrite(boolean enableMaterializedViewTransparentUnionRewrite) {
public void setEnableMaterializedViewTransparentUnionRewrite(
boolean enableMaterializedViewTransparentUnionRewrite) {
this.enableMaterializedViewTransparentUnionRewrite = enableMaterializedViewTransparentUnionRewrite;
}

Expand Down Expand Up @@ -4017,10 +4024,18 @@ public int getNumDesiredDatacacheReplicas() {
return numDesiredDatacacheReplicas;
}

/**
 * Number of backup datacache replicas requested for CACHE SELECT
 * scheduling in this session.
 *
 * @return the desired backup replica count
 */
public int getNumDesiredDatacacheBackupReplicas() {
    return this.numDesiredDatacacheBackupReplicas;
}

/**
 * Sets the number of primary datacache replicas requested for
 * CACHE SELECT scheduling in this session.
 *
 * @param numDesiredDatacacheReplicas the desired replica count
 */
public void setNumDesiredDatacacheReplicas(int numDesiredDatacacheReplicas) {
    this.numDesiredDatacacheReplicas = numDesiredDatacacheReplicas;
}

/**
 * Sets the number of backup datacache replicas requested for
 * CACHE SELECT scheduling in this session.
 *
 * @param numDesiredDatacacheBackupReplicas the desired backup replica count
 */
public void setNumDesiredDatacacheBackupReplicas(int numDesiredDatacacheBackupReplicas) {
    this.numDesiredDatacacheBackupReplicas = numDesiredDatacacheBackupReplicas;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
public class Utils {
// We can only get the tablet id for an internal scan.
public static Optional<Long> getOptionalTabletId(TScanRange scanRange) {
Optional<Long> optTabletId = Optional.empty();
if (scanRange.internal_scan_range != null) {
optTabletId = Optional.of(scanRange.internal_scan_range.tablet_id);
}
return optTabletId;
return Optional.ofNullable(scanRange.internal_scan_range).map(isr -> isr.tablet_id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public static BackendSelector create(ScanNode scanNode,
FragmentScanRangeAssignment assignment = execFragment.getScanRangeAssignment();

int desiredDatacacheReplicas = sessionVariable.getNumDesiredDatacacheReplicas();
int desiredDatacacheBackupReplicas = sessionVariable.getNumDesiredDatacacheBackupReplicas();
List<String> datacacheSelectResourceGroups = sessionVariable.getDatacacheSelectResourceGroups();

if (scanNode instanceof SchemaScanNode) {
Expand All @@ -74,13 +75,19 @@ public static BackendSelector create(ScanNode scanNode,
return new HDFSBackendSelector(scanNode, locations, assignment, workerProvider,
sessionVariable.getForceScheduleLocal(),
sessionVariable.getHDFSBackendSelectorScanRangeShuffle());
} else if (desiredDatacacheReplicas > 1 || datacacheSelectResourceGroups != null) {
} else if (desiredDatacacheReplicas > 1 ||
desiredDatacacheBackupReplicas > 0 ||
!datacacheSelectResourceGroups.isEmpty()) {
// Note that a cacheSelect should never be hasReplicated (because currently shared-data mode otherwise
// doesn't support multiple replicas in cache), and it should never be hasColocate (because a cache select
// statement is for a single table).
return new CacheSelectBackendSelector(
scanNode, locations, assignment, workerProvider, new CacheSelectComputeNodeSelectionProperties(
datacacheSelectResourceGroups, desiredDatacacheReplicas), connectContext.getCurrentWarehouseId());
scanNode, locations, assignment, workerProvider,
new CacheSelectComputeNodeSelectionProperties(
datacacheSelectResourceGroups,
desiredDatacacheReplicas,
desiredDatacacheBackupReplicas
), connectContext.getCurrentWarehouseId());
} else {
boolean hasColocate = execFragment.isColocated();
boolean hasBucket = execFragment.isLocalBucketShuffleJoin();
Expand Down
Loading
Loading