Skip to content

Commit

Permalink
[CELEBORN-1800] Introduce ApplicationTotalCount and ApplicationFallbackCount metric to record the total and fallback count of application
Browse files Browse the repository at this point in the history
  • Loading branch information
SteNicholas committed Dec 24, 2024
1 parent 6028a04 commit 95503c3
Show file tree
Hide file tree
Showing 22 changed files with 482 additions and 124 deletions.
182 changes: 181 additions & 1 deletion assets/grafana/celeborn-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The total count of shuffle including celeborn shuffle and spark built-in shuffle.",
"description": "The total count of shuffle including celeborn shuffle and engine built-in shuffle.",
"fieldConfig": {
"defaults": {
"color": {
Expand Down Expand Up @@ -1646,6 +1646,186 @@
],
"title": "metrics_ShuffleFallbackCount_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The total count of applications running with celeborn shuffle and engine built-in shuffle.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 90
},
"id": 234,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"maxHeight": 600,
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_ApplicationTotalCount_Value{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_ApplicationTotalCount_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The count of application fallbacks.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 90
},
"id": 235,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"maxHeight": 600,
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_ApplicationFallbackCount_Value{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_ApplicationFallbackCount_Value",
"type": "timeseries"
}
],
"title": "Master",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void registerJob(JobShuffleContext context) {
celebornAppId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
LOG.info("CelebornAppId: {}", celebornAppId);
lifecycleManager = new LifecycleManager(celebornAppId, conf);
lifecycleManager.applicationCount().increment();
this.shuffleResourceTracker = new ShuffleResourceTracker(executor, lifecycleManager);
}
}
Expand All @@ -113,7 +114,10 @@ public void registerJob(JobShuffleContext context) {
ShuffleFallbackPolicyRunner.getActivatedFallbackPolicy(context, conf, lifecycleManager);
if (shuffleFallbackPolicy.isPresent()) {
LOG.warn("Fallback to vanilla Flink NettyShuffleMaster for job: {}.", jobID);
jobFallbackPolicies.put(jobID, shuffleFallbackPolicy.get().getClass().getName());
String jobFallbackPolicy = shuffleFallbackPolicy.get().getClass().getName();
jobFallbackPolicies.put(jobID, jobFallbackPolicy);
lifecycleManager.computeFallbackCounts(
lifecycleManager.applicationFallbackCounts(), jobFallbackPolicy);
nettyShuffleMaster().registerJob(context);
return;
}
Expand Down Expand Up @@ -161,9 +165,8 @@ public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
String jobFallbackPolicy = jobFallbackPolicies.get(jobID);
if (jobFallbackPolicy != null) {
try {
lifecycleManager
.shuffleFallbackCounts()
.compute(jobFallbackPolicy, (key, value) -> value == null ? 1L : value + 1L);
lifecycleManager.computeFallbackCounts(
lifecycleManager.shuffleFallbackCounts(), jobFallbackPolicy);
return nettyShuffleMaster()
.registerPartitionWithProducer(jobID, partitionDescriptor, producerDescriptor)
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;

import scala.Int;
import scala.Option;

import org.apache.spark.*;
import org.apache.spark.internal.config.package$;
Expand Down Expand Up @@ -58,6 +59,8 @@ public class SparkShuffleManager implements ShuffleManager {
private volatile SortShuffleManager _sortShuffleManager;
private final ConcurrentHashMap.KeySetView<Integer, Boolean> sortShuffleIds =
ConcurrentHashMap.newKeySet();
private final ConcurrentHashMap.KeySetView<String, Boolean> fallbackApps =
ConcurrentHashMap.newKeySet();
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;

private long sendBufferPoolCheckInterval;
Expand Down Expand Up @@ -97,6 +100,7 @@ private void initializeLifecycleManager(String appId) {
if (lifecycleManager == null) {
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
lifecycleManager.applicationCount().increment();
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
if (celebornConf.clientStageRerunEnabled()) {
MapOutputTrackerMaster mapOutputTracker =
Expand All @@ -119,9 +123,19 @@ public <K, V, C> ShuffleHandle registerShuffle(
initializeLifecycleManager(appId);

lifecycleManager.shuffleCount().increment();
if (fallbackPolicyRunner.applyFallbackPolicies(dependency, lifecycleManager)) {
Option<ShuffleFallbackPolicy> fallbackPolicyOpt =
fallbackPolicyRunner.getActivatedFallbackPolicy(dependency, lifecycleManager);
if (fallbackPolicyOpt.isDefined()) {
logger.warn("Fallback to SortShuffleManager!");
sortShuffleIds.add(shuffleId);
String shuffleFallbackPolicy = fallbackPolicyOpt.get().getClass().getName();
lifecycleManager.computeFallbackCounts(
lifecycleManager.shuffleFallbackCounts(), shuffleFallbackPolicy);
if (!fallbackApps.contains(appId)) {
fallbackApps.add(appId);
lifecycleManager.computeFallbackCounts(
lifecycleManager.applicationFallbackCounts(), shuffleFallbackPolicy);
}
return sortShuffleManager().registerShuffle(shuffleId, numMaps, dependency);
} else {
lifecycleManager.registerAppShuffleDeterminate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.shuffle.celeborn

import java.util.function.BiFunction

import scala.collection.JavaConverters._

import org.apache.spark.ShuffleDependency
Expand All @@ -34,29 +32,15 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
private val shuffleFallbackPolicies =
ShuffleFallbackPolicyFactory.getShuffleFallbackPolicies.asScala

def applyFallbackPolicies[K, V, C](
def getActivatedFallbackPolicy[K, V, C](
dependency: ShuffleDependency[K, V, C],
lifecycleManager: LifecycleManager): Boolean = {
lifecycleManager: LifecycleManager): Option[ShuffleFallbackPolicy] = {
val fallbackPolicy =
shuffleFallbackPolicies.find(_.needFallback(dependency, conf, lifecycleManager))
if (fallbackPolicy.isDefined) {
if (FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
throw new CelebornIOException(
"Fallback to spark built-in shuffle implementation is prohibited.")
} else {
lifecycleManager.shuffleFallbackCounts.compute(
fallbackPolicy.get.getClass.getName,
new BiFunction[String, java.lang.Long, java.lang.Long] {
override def apply(k: String, v: java.lang.Long): java.lang.Long = {
if (v == null) {
1L
} else {
v + 1L
}
}
})
}
if (fallbackPolicy.isDefined && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
throw new CelebornIOException(
"Fallback to spark built-in shuffle implementation is prohibited.")
}
fallbackPolicy.isDefined
fallbackPolicy
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import scala.Option;

import org.apache.spark.*;
import org.apache.spark.internal.config.package$;
import org.apache.spark.launcher.SparkLauncher;
Expand Down Expand Up @@ -82,6 +84,8 @@ public class SparkShuffleManager implements ShuffleManager {
private volatile SortShuffleManager _sortShuffleManager;
private final ConcurrentHashMap.KeySetView<Integer, Boolean> sortShuffleIds =
ConcurrentHashMap.newKeySet();
private final ConcurrentHashMap.KeySetView<String, Boolean> fallbackApps =
ConcurrentHashMap.newKeySet();
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;

private long sendBufferPoolCheckInterval;
Expand Down Expand Up @@ -139,6 +143,7 @@ private void initializeLifecycleManager(String appId) {
if (lifecycleManager == null) {
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
lifecycleManager.applicationCount().increment();
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
if (celebornConf.clientStageRerunEnabled()) {
MapOutputTrackerMaster mapOutputTracker =
Expand All @@ -162,7 +167,9 @@ public <K, V, C> ShuffleHandle registerShuffle(
initializeLifecycleManager(appId);

lifecycleManager.shuffleCount().increment();
if (fallbackPolicyRunner.applyFallbackPolicies(dependency, lifecycleManager)) {
Option<ShuffleFallbackPolicy> fallbackPolicyOpt =
fallbackPolicyRunner.getActivatedFallbackPolicy(dependency, lifecycleManager);
if (fallbackPolicyOpt.isDefined()) {
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
&& !conf.getBoolean("spark.shuffle.service.enabled", false)) {
logger.error(
Expand All @@ -174,6 +181,14 @@ public <K, V, C> ShuffleHandle registerShuffle(
logger.warn("Fallback to vanilla Spark SortShuffleManager for shuffle: {}", shuffleId);
}
sortShuffleIds.add(shuffleId);
String shuffleFallbackPolicy = fallbackPolicyOpt.get().getClass().getName();
lifecycleManager.computeFallbackCounts(
lifecycleManager.shuffleFallbackCounts(), shuffleFallbackPolicy);
if (!fallbackApps.contains(appId)) {
fallbackApps.add(appId);
lifecycleManager.computeFallbackCounts(
lifecycleManager.applicationFallbackCounts(), shuffleFallbackPolicy);
}
return sortShuffleManager().registerShuffle(shuffleId, dependency);
} else {
lifecycleManager.registerAppShuffleDeterminate(
Expand Down
Loading

0 comments on commit 95503c3

Please sign in to comment.