Skip to content

Commit

Permalink
feat(issue1179): accelerate partition reassignments on load balancing (
Browse files Browse the repository at this point in the history
…#1181)

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Apr 26, 2024
1 parent 1854f7a commit b862c3c
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ protected void init() {
RecordClusterModel clusterModel = new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId())));
this.loadRetriever = new LoadRetriever(config, quorumController, clusterModel,
new LogContext(String.format("[LoadRetriever id=%d] ", quorumController.nodeId())));
this.actionExecutorService = new ControllerActionExecutorService(config, quorumController,
this.actionExecutorService = new ControllerActionExecutorService(quorumController,
new LogContext(String.format("[ExecutionManager id=%d] ", quorumController.nodeId())));
this.actionExecutorService.start();

this.anomalyDetector = new AnomalyDetectorBuilder()
.logContext(new LogContext(String.format("[AnomalyDetector id=%d] ", quorumController.nodeId())))
.maxActionsNumPerExecution(config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS))
.detectIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS))
.maxTolerateMetricsDelayMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS))
.coolDownIntervalPerActionMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS))
.executionConcurrency(config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY))
.executionIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS))
.clusterModel(clusterModel)
.executor(this.actionExecutorService)
.addGoals(config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class))
Expand Down Expand Up @@ -125,7 +125,6 @@ public void validateReconfiguration(Map<String, ?> configs) throws ConfigExcepti
ConfigUtils.getBoolean(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE);
}
this.anomalyDetector.validateReconfiguration(objectConfigs);
this.actionExecutorService.validateReconfiguration(objectConfigs);
} catch (ConfigException e) {
throw e;
} catch (Exception e) {
Expand All @@ -149,7 +148,6 @@ public void reconfigure(Map<String, ?> configs) {
}
}
this.anomalyDetector.reconfigure(objectConfigs);
this.actionExecutorService.reconfigure(objectConfigs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public class AutoBalancerControllerConfig extends AbstractConfig {
public static final String AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION = PREFIX + "network.in.distribution.detect.avg.deviation";
public static final String AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD = PREFIX + "network.out.usage.distribution.detect.threshold";
public static final String AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION = PREFIX + "network.out.distribution.detect.avg.deviation";
public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY = PREFIX + "execution.concurrency";
public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS = PREFIX + "execution.interval.ms";
public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS = PREFIX + "execution.steps";
public static final String AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS = PREFIX + "exclude.broker.ids";
public static final String AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS = PREFIX + "exclude.topics";
/* Default values */
Expand All @@ -61,8 +61,8 @@ public class AutoBalancerControllerConfig extends AbstractConfig {
public static final double DEFAULT_AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION = 0.2;
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD = 1024 * 1024;
public static final double DEFAULT_AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION = 0.2;
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS = 1000;
public static final int DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS = 60;
public static final int DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY = 50;
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS = 5000;
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS = "";
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS = "";
/* Documents */
Expand All @@ -82,8 +82,8 @@ public class AutoBalancerControllerConfig extends AbstractConfig {
public static final String AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION_DOC = "The acceptable range of deviation for average network input bandwidth usage. Default is 0.2, which means the expected network traffic range will be [0.8 * loadAvg, 1.2 * loadAvg]";
public static final String AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD_DOC = "The detection threshold of NetworkOutUsageDistributionGoal. If a broker's outbound network bandwidth usage is below this configured value, the broker will not be proactively scheduled for rebalancing.";
public static final String AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION_DOC = "The acceptable range of deviation for average network out bandwidth usage. Default is 0.2, which means the expected network traffic range will be [0.8 * loadAvg, 1.2 * loadAvg]";
public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS_DOC = "Time interval between reassignments per broker in milliseconds";
public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS_DOC = "The max number of reassignments per broker in one execution";
public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY_DOC = "The maximum number of actions that can be executed in a single batch per broker, including partition closing and openning";
public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS_DOC = "Time interval between each action batch in milliseconds";
public static final String AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS_DOC = "Broker ids that auto balancer will ignore during balancing";
public static final String AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS_DOC = "Topics that auto balancer will ignore during balancing";

Expand All @@ -96,8 +96,8 @@ public class AutoBalancerControllerConfig extends AbstractConfig {
AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION,
AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD,
AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION,
AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY,
AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS,
AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS,
AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS,
AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS
);
Expand Down Expand Up @@ -149,9 +149,9 @@ public class AutoBalancerControllerConfig extends AbstractConfig {
.define(AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS, ConfigDef.Type.LONG,
DEFAULT_AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS, ConfigDef.Importance.HIGH,
AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS_DOC)
.define(AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS, ConfigDef.Type.INT,
DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS, ConfigDef.Importance.HIGH,
AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS_DOC)
.define(AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY, ConfigDef.Type.INT,
DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY, ConfigDef.Importance.HIGH,
AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY_DOC)
.define(AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS, ConfigDef.Type.LIST,
DEFAULT_AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS, ConfigDef.Importance.HIGH,
AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS_DOC)
Expand Down
Loading

0 comments on commit b862c3c

Please sign in to comment.