Skip to content

Commit

Permalink
[CELEBORN-1413] Support Spark 4.0
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Add support for the Spark 4.0.0 preview release.

### Why are the changes needed?
1. Change Scala to 2.13.
2. Introduce a columnar shuffle module for Spark 4.0.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Cluster test.

Closes #2813 from FMX/b1413.

Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
FMX authored and RexXiong committed Dec 24, 2024
1 parent 4b60dae commit fde6365
Show file tree
Hide file tree
Showing 56 changed files with 4,038 additions and 44 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,12 @@ jobs:
run: |
SPARK_BINARY_VERSION=${{ matrix.spark }}
SPARK_MAJOR_VERSION=${SPARK_BINARY_VERSION%%.*}
SPARK_MODULE_NAME=$SPARK_MAJOR_VERSION
if [[ $SPARK_MAJOR_VERSION == "3" ]]; then
SPARK_MODULE_NAME="3-4"
fi
PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
TEST_MODULES="client-spark/common,client-spark/spark-${SPARK_MAJOR_VERSION},client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
TEST_MODULES="client-spark/common,client-spark/spark-${SPARK_MODULE_NAME},client-spark/spark-${SPARK_MAJOR_VERSION}-columnar-common,client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test
- name: Upload test log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.scheduler.DAGScheduler;

public class SparkCommonUtils {
public static void validateAttemptConfig(SparkConf conf) throws IllegalArgumentException {
int DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4;
int maxStageAttempts =
conf.getInt(
"spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS());
conf.getInt("spark.stage.maxConsecutiveAttempts", DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS);
// In Spark 2, the parameter is referred to as MAX_TASK_FAILURES, while in Spark 3, it has been
// changed to TASK_MAX_FAILURES. The default value for both is consistently set to 4.
int maxTaskAttempts = conf.getInt("spark.task.maxFailures", 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>celeborn-client-spark-3_${scala.binary.version}</artifactId>
<artifactId>celeborn-client-spark-3-4_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Celeborn Client for Spark 3</name>
<name>Celeborn Client for Spark 3 and 4</name>

<dependencies>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion client-spark/spark-3-columnar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<dependencies>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-client-spark-3_${scala.binary.version}</artifactId>
<artifactId>celeborn-client-spark-3-4_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion client-spark/spark-3-columnar-shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-client-spark-3_${scala.binary.version}</artifactId>
<artifactId>celeborn-client-spark-3-4_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
Expand Down
2 changes: 1 addition & 1 deletion client-spark/spark-3-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<dependencies>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-client-spark-3_${scala.binary.version}</artifactId>
<artifactId>celeborn-client-spark-3-4_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
Expand Down
68 changes: 68 additions & 0 deletions client-spark/spark-4-columnar-shuffle/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-parent_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>celeborn-spark-4-columnar-shuffle_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Celeborn Client for Spark 4 Columnar Shuffle</name>

<dependencies>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-spark-3-columnar-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-client_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- Test utilities of the shared Spark 3/4 client module. The artifactId must match the
     module's renamed coordinate (celeborn-client-spark-3-4), which the other columnar and
     shaded modules already depend on. -->
<dependency>
  <groupId>org.apache.celeborn</groupId>
  <artifactId>celeborn-client-spark-3-4_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.celeborn;

import java.io.IOException;

import scala.Product2;

import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.TaskContext;
import org.apache.spark.annotation.Private;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.execution.UnsafeRowSerializer;
import org.apache.spark.sql.execution.columnar.CelebornBatchBuilder;
import org.apache.spark.sql.execution.columnar.CelebornColumnarBatchBuilder;
import org.apache.spark.sql.execution.columnar.CelebornColumnarBatchCodeGenBuild;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;

/**
 * Hash-based shuffle writer that can encode incoming {@link UnsafeRow}s into columnar batches
 * before handing them to the parent writer.
 *
 * <p>The columnar path is taken only when the shuffle dependency exposes a schema and {@link
 * CelebornBatchBuilder#supportsColumnarType} accepts it; in every other case the calls are
 * forwarded unchanged to {@link HashBasedShuffleWriter}.
 */
@Private
public class ColumnarHashBasedShuffleWriter<K, V, C> extends HashBasedShuffleWriter<K, V, C> {

  private static final Logger logger =
      LoggerFactory.getLogger(ColumnarHashBasedShuffleWriter.class);

  private final int stageId;
  private final int shuffleId;
  // One slot per partition of the dependency's partitioner; a slot stays null until the first
  // row for that partition arrives.
  private final CelebornBatchBuilder[] celebornBatchBuilders;
  private final StructType schema;
  private final Serializer depSerializer;
  private final boolean isColumnarShuffle;
  private final int columnarShuffleBatchSize;
  private final boolean columnarShuffleCodeGenEnabled;
  private final boolean columnarShuffleDictionaryEnabled;
  private final double columnarShuffleDictionaryMaxFactor;

  /**
   * Builds the writer, reads the columnar shuffle settings from {@code conf} and decides once,
   * from the dependency's schema, whether the columnar path applies.
   *
   * @throws IOException if the parent writer's constructor fails
   */
  public ColumnarHashBasedShuffleWriter(
      int shuffleId,
      CelebornShuffleHandle<K, V, C> handle,
      TaskContext taskContext,
      CelebornConf conf,
      ShuffleClient client,
      ShuffleWriteMetricsReporter metrics,
      SendBufferPool sendBufferPool)
      throws IOException {
    super(shuffleId, handle, taskContext, conf, client, metrics, sendBufferPool);

    // Columnar shuffle settings.
    this.columnarShuffleBatchSize = conf.columnarShuffleBatchSize();
    this.columnarShuffleCodeGenEnabled = conf.columnarShuffleCodeGenEnabled();
    this.columnarShuffleDictionaryEnabled = conf.columnarShuffleDictionaryEnabled();
    this.columnarShuffleDictionaryMaxFactor = conf.columnarShuffleDictionaryMaxFactor();

    // Identifiers used for logging; the shuffle id is taken from the dependency itself.
    final ShuffleDependency<?, ?, ?> dependency = handle.dependency();
    this.stageId = taskContext.stageId();
    this.shuffleId = dependency.shuffleId();

    // Per-dependency state for the columnar path.
    this.schema = CustomShuffleDependencyUtils.getSchema(dependency);
    this.depSerializer = dependency.serializer();
    this.celebornBatchBuilders =
        new CelebornBatchBuilder[dependency.partitioner().numPartitions()];
    this.isColumnarShuffle = schema != null && CelebornBatchBuilder.supportsColumnarType(schema);
  }

  /** Routes the fast-write call to the columnar path when it applies, else to the parent. */
  @Override
  protected void fastWrite0(scala.collection.Iterator iterator)
      throws IOException, InterruptedException {
    if (!isColumnarShuffle) {
      super.fastWrite0(iterator);
      return;
    }
    logger.info("Fast columnar write of columnar shuffle {} for stage {}.", shuffleId, stageId);
    fastColumnarWrite0(iterator);
  }

  /**
   * Appends every (partitionId, row) pair to that partition's batch builder. Once a builder
   * holds at least {@code columnarShuffleBatchSize} rows its bytes are pushed with {@code
   * pushGiantRecord} and the builder is reset.
   */
  private void fastColumnarWrite0(scala.collection.Iterator iterator) throws IOException {
    final scala.collection.Iterator<Product2<Integer, UnsafeRow>> rows = iterator;
    final SQLMetric dataSizeMetric = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);

    while (rows.hasNext()) {
      final Product2<Integer, UnsafeRow> entry = rows.next();
      final int partitionId = entry._1();
      final UnsafeRow row = entry._2();

      CelebornBatchBuilder builder = celebornBatchBuilders[partitionId];
      if (builder == null) {
        // First row for this partition: create and initialise the builder before publishing it
        // into the array.
        builder = newColumnarBatchBuilder();
        builder.newBuilders();
        celebornBatchBuilders[partitionId] = builder;
      }

      builder.writeRow(row);
      if (builder.getRowCnt() >= columnarShuffleBatchSize) {
        final byte[] batchBytes = builder.buildColumnBytes();
        pushGiantRecord(partitionId, batchBytes, batchBytes.length);
        if (dataSizeMetric != null) {
          dataSizeMetric.add(batchBytes.length);
        }
        builder.newBuilders();
      }
      tmpRecordsWritten++;
    }
  }

  /**
   * Creates a batch builder for {@code schema}. The code-generated builder is chosen only when
   * code generation is enabled and dictionary encoding is disabled.
   */
  private CelebornBatchBuilder newColumnarBatchBuilder() {
    if (columnarShuffleCodeGenEnabled && !columnarShuffleDictionaryEnabled) {
      return new CelebornColumnarBatchCodeGenBuild().create(schema, columnarShuffleBatchSize);
    }
    return new CelebornColumnarBatchBuilder(
        schema,
        columnarShuffleBatchSize,
        columnarShuffleDictionaryMaxFactor,
        columnarShuffleDictionaryEnabled);
  }

  /** Flushes columnar builders when the columnar fast path was used, else defers to the parent. */
  @Override
  protected void closeWrite() throws IOException {
    if (!canUseFastWrite() || !isColumnarShuffle) {
      super.closeWrite();
      return;
    }
    closeColumnarWrite();
  }

  /**
   * Serialises the remaining rows of every non-empty builder, hands the bytes to {@code
   * mergeData} and drops the reference to that builder.
   */
  private void closeColumnarWrite() throws IOException {
    final SQLMetric dataSizeMetric = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);
    for (int partitionId = 0; partitionId < celebornBatchBuilders.length; partitionId++) {
      final CelebornBatchBuilder builder = celebornBatchBuilders[partitionId];
      if (builder == null || builder.getRowCnt() <= 0) {
        continue;
      }
      final byte[] remaining = builder.buildColumnBytes();
      if (dataSizeMetric != null) {
        dataSizeMetric.add(remaining.length);
      }
      mergeData(partitionId, remaining, 0, remaining.length);
      // Release the builder for this partition.
      celebornBatchBuilders[partitionId] = null;
    }
  }

  /** Returns whether this writer uses the columnar path. */
  @VisibleForTesting
  public boolean isColumnarShuffle() {
    return isColumnarShuffle;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.celeborn

import org.apache.spark.{ShuffleDependency, TaskContext}
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.shuffle.ShuffleReadMetricsReporter
import org.apache.spark.sql.execution.UnsafeRowSerializer
import org.apache.spark.sql.execution.columnar.{CelebornBatchBuilder, CelebornColumnarBatchSerializer}

import org.apache.celeborn.common.CelebornConf

/**
 * Shuffle reader that substitutes a columnar batch serializer when the shuffle dependency
 * carries a schema accepted by `CelebornBatchBuilder.supportsColumnarType`. All constructor
 * arguments are passed through unchanged to `CelebornShuffleReader`.
 */
class CelebornColumnarShuffleReader[K, C](
    handle: CelebornShuffleHandle[K, _, C],
    startPartition: Int,
    endPartition: Int,
    startMapIndex: Int = 0,
    endMapIndex: Int = Int.MaxValue,
    context: TaskContext,
    conf: CelebornConf,
    metrics: ShuffleReadMetricsReporter,
    shuffleIdTracker: ExecutorShuffleIdTracker)
  extends CelebornShuffleReader[K, C](
    handle,
    startPartition,
    endPartition,
    startMapIndex,
    endMapIndex,
    context,
    conf,
    metrics,
    shuffleIdTracker) {

  /**
   * Returns a `CelebornColumnarBatchSerializer` instance when the dependency has a schema that
   * supports the columnar format; otherwise returns whatever the parent reader provides.
   *
   * @param dep shuffle dependency whose schema and serializer are inspected
   */
  override def newSerializerInstance(dep: ShuffleDependency[K, _, C]): SerializerInstance =
    Option(CustomShuffleDependencyUtils.getSchema(dep)) match {
      case Some(schema) if CelebornBatchBuilder.supportsColumnarType(schema) =>
        logInfo(s"Creating column batch serializer of columnar shuffle ${dep.shuffleId}.")
        val rowSerializer = dep.serializer.asInstanceOf[UnsafeRowSerializer]
        val dataSize = SparkUtils.getDataSize(rowSerializer)
        val columnarSerializer =
          new CelebornColumnarBatchSerializer(schema, conf.columnarShuffleOffHeapEnabled, dataSize)
        columnarSerializer.newInstance()
      case _ =>
        // No schema, or a schema with unsupported column types.
        super.newSerializerInstance(dep)
    }
}
Loading

0 comments on commit fde6365

Please sign in to comment.