From 094fe2813debbfd28e51fa7e035fa57f962fcf7a Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Thu, 21 Nov 2024 16:23:26 +0800 Subject: [PATCH] [CELEBORN-1728] Fix NPE when failing to connect to celeborn worker ### What changes were proposed in this pull request? Fix an NPE: when the client fails to connect to a Celeborn worker, `currentReader` might be `null`. ### Why are the changes needed? While testing https://github.com/apache/celeborn/pull/2921 in a Celeborn cluster, I set `celeborn.data.io.connectionTimeout` to 30s for fetch-failure testing. The client failed to connect to the Celeborn worker 3 times, after which `currentReader` was null. image https://github.com/turboFei/incubator-celeborn/blob/2be9682a34f97ff10b90f22f60d9fea2bc5b81b7/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L672 ``` 24/11/20 16:15:41 ERROR Executor: Exception in task 16238.0 in stage 9.0 (TID 108550) java.lang.NullPointerException at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.fillBuffer(CelebornInputStream.java:672) at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.read(CelebornInputStream.java:515) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at java.io.DataInputStream.read(DataInputStream.java:149) at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899) at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110) at scala.collection.Iterator$$anon$11.next(Iterator.scala:496) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29) at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225) at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA. Closes #2933 from turboFei/npe_reader. 
Authored-by: Wang, Fei Signed-off-by: mingji --- .../org/apache/celeborn/client/read/CelebornInputStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java index dfbb7c502b8..e63eaae621d 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java +++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java @@ -664,7 +664,7 @@ private boolean fillBuffer() throws IOException { appShuffleId, shuffleId, partitionId, - currentReader.getLocation(), + Optional.ofNullable(currentReader).map(PartitionReader::getLocation).orElse(null), e); IOException ioe; if (e instanceof IOException) { @@ -692,7 +692,7 @@ private boolean fillBuffer() throws IOException { appShuffleId, shuffleId, partitionId, - currentReader.getLocation(), + Optional.ofNullable(currentReader).map(PartitionReader::getLocation).orElse(null), e); throw e; }