public interface ChannelStateWriter extends Closeable {

    /** Channel state write result. */
    class ChannelStateWriteResult {
        final CompletableFuture<Collection<InputChannelStateHandle>> inputChannelStateHandles;
        final CompletableFuture<Collection<ResultSubpartitionStateHandle>> resultSubpartitionStateHandles;

        ChannelStateWriteResult() {
            this(new CompletableFuture<>(), new CompletableFuture<>());
        }

        ChannelStateWriteResult(
                CompletableFuture<Collection<InputChannelStateHandle>> inputChannelStateHandles,
                CompletableFuture<Collection<ResultSubpartitionStateHandle>> resultSubpartitionStateHandles) {
            this.inputChannelStateHandles = inputChannelStateHandles;
            this.resultSubpartitionStateHandles = resultSubpartitionStateHandles;
        }

        public CompletableFuture<Collection<InputChannelStateHandle>> getInputChannelStateHandles() {
            return inputChannelStateHandles;
        }

        public CompletableFuture<Collection<ResultSubpartitionStateHandle>> getResultSubpartitionStateHandles() {
            return resultSubpartitionStateHandles;
        }

        public static final ChannelStateWriteResult EMPTY = new ChannelStateWriteResult(
            CompletableFuture.completedFuture(Collections.emptyList()),
            CompletableFuture.completedFuture(Collections.emptyList())
        );

        public void fail(Throwable e) {
            inputChannelStateHandles.completeExceptionally(e);
            resultSubpartitionStateHandles.completeExceptionally(e);
        }

        boolean isDone() {
            return inputChannelStateHandles.isDone() && resultSubpartitionStateHandles.isDone();
        }
    }
    /**
     * Sequence number for the buffers that were saved during the previous execution attempt, then restored,
     * and now are to be saved again (as opposed to the buffers received from the upstream or from the operator).
     */
    int SEQUENCE_NUMBER_RESTORED = -1;

    /** Signifies that the buffer sequence number is unknown (e.g. if passing sequence numbers is not implemented). */
    int SEQUENCE_NUMBER_UNKNOWN = -2;

    /** Initiate write of channel state for the given checkpoint id. */
    void start(long checkpointId, CheckpointOptions checkpointOptions);

    /**
     * Add in-flight buffers from the {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel}.
     * Must be called after {@link #start(long)} and before {@link #finishInput(long)}.
     * Buffers are recycled after they are written or an exception occurs.
     * @param startSeqNum sequence number of the 1st passed buffer.
     *                    It is intended to be used for incremental snapshots.
     *                    If no data is passed it is ignored.
     * @param data zero or more <b>data</b> buffers ordered by their sequence numbers
     * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED
     * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN
     */
    void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data);

    /**
     * Add in-flight buffers from the {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}.
     * Must be called after {@link #start} and before {@link #finishOutput(long)}.
     * Buffers are recycled after they are written or an exception occurs.
     * @param startSeqNum sequence number of the 1st passed buffer.
     *                    It is intended to be used for incremental snapshots.
     *                    If no data is passed it is ignored.
     * @param data zero or more <b>data</b> buffers ordered by their sequence numbers
     * @throws IllegalArgumentException if one or more passed buffers {@link Buffer#isBuffer() isn't a buffer}
     * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED
     * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN
     */
    void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) throws IllegalArgumentException;

    /**
     * Finalize write of channel state data for the given checkpoint id.
     * Must be called after {@link #start(long, CheckpointOptions)} and all of the input data of the given checkpoint added.
     * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained
     * using {@link #getAndRemoveWriteResult}.
     */
    void finishInput(long checkpointId);

    /**
     * Finalize write of channel state data for the given checkpoint id.
     * Must be called after {@link #start(long, CheckpointOptions)} and all of the output data of the given checkpoint added.
     * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained
     * using {@link #getAndRemoveWriteResult}.
     */
    void finishOutput(long checkpointId);

    /**
     * Aborts the checkpoint and fails pending results for this checkpoint.
     * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not supposed to be called afterwards.
     */
    void abort(long checkpointId, Throwable cause, boolean cleanup);

    /**
     * Must be called after {@link #start(long, CheckpointOptions)} once.
     * @throws IllegalArgumentException if the passed checkpointId is not known.
     */
    ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) throws IllegalArgumentException;
    ChannelStateWriter NO_OP = new NoOpChannelStateWriter();

    /** No-op implementation of {@link ChannelStateWriter}. */
    class NoOpChannelStateWriter implements ChannelStateWriter {
        @Override
        public void start(long checkpointId, CheckpointOptions checkpointOptions) {
        }

        @Override
        public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data) {
        }

        @Override
        public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
        }

        @Override
        public void finishInput(long checkpointId) {
        }

        @Override
        public void finishOutput(long checkpointId) {
        }

        @Override
        public void abort(long checkpointId, Throwable cause, boolean cleanup) {
        }

        @Override
        public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
            return new ChannelStateWriteResult(
                CompletableFuture.completedFuture(Collections.emptyList()),
                CompletableFuture.completedFuture(Collections.emptyList()));
        }

        @Override
        public void close() {
        }
    }
}
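Taken together, the interface defines a per-checkpoint call sequence: start, then any number of addInputData/addOutputData calls, then finishInput and finishOutput, and finally getAndRemoveWriteResult (or abort on failure). Below is a minimal sketch of that sequence, assuming a writer instance and already-available buffers; the method name snapshotInFlightData and the surrounding variables are illustrative, not Flink API.

// Sketch only: drives one unaligned checkpoint through the ChannelStateWriter contract.
// `writer`, `options`, channel infos and the buffers are assumed to exist.
void snapshotInFlightData(
        ChannelStateWriter writer,
        long checkpointId,
        CheckpointOptions options,
        InputChannelInfo inputInfo,
        ResultSubpartitionInfo outputInfo,
        CloseableIterator<Buffer> inputBuffers,
        Buffer[] outputBuffers) throws Exception {
    writer.start(checkpointId, options); // register the checkpoint
    try {
        writer.addInputData(checkpointId, inputInfo, ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, inputBuffers);
        writer.addOutputData(checkpointId, outputInfo, ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, outputBuffers);
        writer.finishInput(checkpointId);  // no more input buffers will follow
        writer.finishOutput(checkpointId); // no more output buffers will follow

        // Both futures complete only after finishInput() and finishOutput() have been processed.
        ChannelStateWriter.ChannelStateWriteResult result = writer.getAndRemoveWriteResult(checkpointId);
        result.getInputChannelStateHandles().thenAccept(handles -> { /* report the handles to the checkpoint */ });
    } catch (Exception e) {
        writer.abort(checkpointId, e, true); // fail pending results and drop them
        throw e;
    }
}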
// The concrete implementation.
public class ChannelStateWriterImpl implements ChannelStateWriter {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
    // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via mailbox)
    private static final int DEFAULT_MAX_CHECKPOINTS = 1000;

    private final String taskName;
    private final ChannelStateWriteRequestExecutor executor;
    private final ConcurrentMap<Long, ChannelStateWriteResult> results;
    private final int maxCheckpoints;

    /**
     * Creates a {@link ChannelStateWriterImpl} with {@link #DEFAULT_MAX_CHECKPOINTS} as {@link #maxCheckpoints}.
     */
    public ChannelStateWriterImpl(String taskName, CheckpointStorageWorkerView streamFactoryResolver) {
        this(taskName, streamFactoryResolver, DEFAULT_MAX_CHECKPOINTS);
    }

    /**
     * Creates a {@link ChannelStateWriterImpl} with the {@link ChannelStateSerializerImpl default} {@link ChannelStateSerializer}
     * and a {@link ChannelStateWriteRequestExecutorImpl}.
     * @param taskName
     * @param streamFactoryResolver a factory to obtain the output stream factory for a given checkpoint
     * @param maxCheckpoints maximum number of checkpoints currently being written or finished but not taken yet
     */
    ChannelStateWriterImpl(String taskName, CheckpointStorageWorkerView streamFactoryResolver, int maxCheckpoints) {
        this(
            taskName,
            new ConcurrentHashMap<>(maxCheckpoints),
            new ChannelStateWriteRequestExecutorImpl(taskName, new ChannelStateWriteRequestDispatcherImpl(streamFactoryResolver, new ChannelStateSerializerImpl())),
            maxCheckpoints);
    }
    ChannelStateWriterImpl(
            String taskName,
            ConcurrentMap<Long, ChannelStateWriteResult> results,
            ChannelStateWriteRequestExecutor executor,
            int maxCheckpoints) {
        this.taskName = taskName;
        this.results = results;
        this.maxCheckpoints = maxCheckpoints;
        this.executor = executor;
    }
    @Override
    public void start(long checkpointId, CheckpointOptions checkpointOptions) {
        LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
        ChannelStateWriteResult result = new ChannelStateWriteResult();
        // register the result and enqueue the checkpoint-start request
        ChannelStateWriteResult put = results.computeIfAbsent(checkpointId, id -> {
            Preconditions.checkState(results.size() < maxCheckpoints, String.format("%s can't start %d, results.size() > maxCheckpoints: %d > %d", taskName, checkpointId, results.size(), maxCheckpoints));
            // submit the request to the worker
            enqueue(new CheckpointStartRequest(checkpointId, result, checkpointOptions.getTargetLocation()), false);
            return result;
        });
        Preconditions.checkArgument(put == result, taskName + " result future already present for checkpoint " + checkpointId);
    }
    @Override
    public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
        LOG.debug(
            "{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}",
            taskName,
            checkpointId,
            info,
            startSeqNum);
        // enqueue a write request for the input channel's buffer iterator
        enqueue(write(checkpointId, info, iterator), false);
    }
    @Override
    public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
        LOG.debug(
            "{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}",
            taskName,
            checkpointId,
            info,
            startSeqNum,
            data == null ? 0 : data.length);
        // enqueue the write request
        enqueue(write(checkpointId, info, data), false);
    }
    @Override
    public void finishInput(long checkpointId) {
        LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
        // enqueue the "input complete" request
        enqueue(completeInput(checkpointId), false);
    }

    @Override
    public void finishOutput(long checkpointId) {
        LOG.debug("{} finishing output data, checkpoint {}", taskName, checkpointId);
        enqueue(completeOutput(checkpointId), false);
    }
    @Override
    public void abort(long checkpointId, Throwable cause, boolean cleanup) {
        LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
        // abort the checkpoint whether it has already started or is still waiting in the queue
        enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true);  // abort already started
        enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started
        if (cleanup) {
            results.remove(checkpointId);
        }
    }
    @Override
    public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
        LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
        ChannelStateWriteResult result = results.remove(checkpointId);
        Preconditions.checkArgument(result != null, taskName + " channel state write result not found for checkpoint " + checkpointId);
        return result;
    }

    public void open() {
        executor.start();
    }

    @Override
    public void close() throws IOException {
        LOG.debug("close, dropping checkpoints {}", results.keySet());
        results.clear();
        executor.close();
    }
    private void enqueue(ChannelStateWriteRequest request, boolean atTheFront) {
        // state check and previous errors check are performed inside the worker
        try {
            if (atTheFront) {
                executor.submitPriority(request);
            } else {
                // submit the checkpoint request to the worker thread
                executor.submit(request);
            }
        } catch (Exception e) {
            RuntimeException wrapped = new RuntimeException("unable to send request to worker", e);
            try {
                request.cancel(e);
            } catch (Exception cancelException) {
                wrapped.addSuppressed(cancelException);
            }
            throw wrapped;
        }
    }
    private static String buildBufferTypeErrorMessage(Buffer buffer) {
        try {
            AbstractEvent event = EventSerializer.fromBuffer(buffer, ChannelStateWriterImpl.class.getClassLoader());
            return String.format("Should be buffer but [%s] found", event);
        } catch (Exception ex) {
            return "Should be buffer";
        }
    }
}
public class CheckpointMetadata implements Disposable {
    /** The checkpoint ID. */
    private final long checkpointId;

    /** The operator states. */
    private final Collection<OperatorState> operatorStates;

    /** The states generated by the CheckpointCoordinator. */
    private final Collection<MasterState> masterStates;

    public CheckpointMetadata(long checkpointId, Collection<OperatorState> operatorStates, Collection<MasterState> masterStates) {
        this.checkpointId = checkpointId;
        this.operatorStates = operatorStates;
        this.masterStates = checkNotNull(masterStates, "masterStates");
    }
    public long getCheckpointId() {
        return checkpointId;
    }

    public Collection<OperatorState> getOperatorStates() {
        return operatorStates;
    }

    public Collection<MasterState> getMasterStates() {
        return masterStates;
    }

    @Override
    public void dispose() throws Exception {
        for (OperatorState operatorState : operatorStates) {
            operatorState.discardState();
        }
        operatorStates.clear();
        masterStates.clear();
    }

    @Override
    public String toString() {
        return "Checkpoint Metadata";
    }
}
MetadataSerializer
The unified metadata serializer; it exposes the interface for deserializing savepoints.
public interface MetadataSerializer extends Versioned {
    /**
     * Deserializes a savepoint from an input stream.
     *
     * @param dis Input stream to deserialize savepoint from
     * @param userCodeClassLoader the user code class loader
     * @param externalPointer the external pointer of the given checkpoint
     * @return The deserialized savepoint
     * @throws IOException Serialization failures are forwarded
     */
    CheckpointMetadata deserialize(DataInputStream dis, ClassLoader userCodeClassLoader, String externalPointer) throws IOException;
}
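Because MetadataSerializer extends Versioned, a caller is expected to read the metadata header first and then pick the serializer that matches the stored version. A minimal sketch of that dispatch follows; the serializersByVersion map and the magic-number parameter are illustrative placeholders, not Flink's actual loader code.

// Sketch only: pick a MetadataSerializer by the version stored in the metadata header.
static CheckpointMetadata loadMetadata(
        DataInputStream dis,
        ClassLoader userCodeClassLoader,
        String externalPointer,
        Map<Integer, MetadataSerializer> serializersByVersion,
        int expectedMagicNumber) throws IOException {
    int magicNumber = dis.readInt(); // header: magic number
    if (magicNumber != expectedMagicNumber) {
        throw new IOException("Unexpected magic number " + magicNumber);
    }
    int version = dis.readInt();     // header: serializer version
    MetadataSerializer serializer = serializersByVersion.get(version);
    if (serializer == null) {
        throw new IOException("Unsupported metadata version " + version);
    }
    return serializer.deserialize(dis, userCodeClassLoader, externalPointer);
}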
MetadataV2V3SerializerBase
The basic checkpoint metadata layout:
+--------------+---------------+-----------------+
| checkpointID | master states | operator states |
+--------------+---------------+-----------------+
Master state layout:
+--------------+---------------------+---------+------+---------------+
| magic number | num remaining bytes | version | name | payload bytes |
+--------------+---------------------+---------+------+---------------+
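The master-state layout translates into a sequence of primitive writes: a magic number, the byte length of the rest of the entry, the hook version, the hook name, and the raw payload. Below is a minimal sketch of writing one entry in that shape, assuming the payload is already serialized; the magic-number parameter and the buffering strategy are illustrative, not the exact Flink serializer.

// Sketch only: write one master-state entry following the layout above.
static void writeMasterState(DataOutputStream dos, int magicNumber, int version, String name, byte[] payload) throws IOException {
    // Serialize version | name | payload into a side buffer first, so that
    // "num remaining bytes" can be written before the variable-length part.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream body = new DataOutputStream(buffer);
    body.writeInt(version); // version of the master hook state
    body.writeUTF(name);    // name identifying the master hook
    body.write(payload);    // payload bytes
    body.flush();

    dos.writeInt(magicNumber);   // magic number
    dos.writeInt(buffer.size()); // num remaining bytes
    buffer.writeTo(dos);         // version | name | payload bytes
}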
private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
    ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker);
    final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
    // restore the checkpointed state
    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreLatestCheckpointedStateToAll(
                new HashSet<>(newExecutionGraph.getAllVertices().values()),
                false)) {
            // otherwise check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }
    return newExecutionGraph;
}
// restore from a savepoint
private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
    if (savepointRestoreSettings.restoreSavepoint()) {
        final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator();
        if (checkpointCoordinator != null) {
            // restore the savepoint
            checkpointCoordinator.restoreSavepoint(
                savepointRestoreSettings.getRestorePath(),
                savepointRestoreSettings.allowNonRestoredState(),
                executionGraphToRestore.getAllVertices(),
                userCodeLoader);
        }
    }
}
Restoring a savepoint
public boolean restoreSavepoint(
        String savepointPointer,
        boolean allowNonRestored,
        Map<JobVertexID, ExecutionJobVertex> tasks,
        ClassLoader userClassLoader) throws Exception {
    Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");
    LOG.info("Starting job {} from savepoint {} ({})",
        job, savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
    // resolve the checkpoint path
    final CompletedCheckpointStorageLocation checkpointLocation = checkpointStorage.resolveCheckpoint(savepointPointer);
    // Load the savepoint as a checkpoint into the system, validating it on the way
    CompletedCheckpoint savepoint = Checkpoints.loadAndValidateCheckpoint(
        job, tasks, checkpointLocation, userClassLoader, allowNonRestored);
    // add it to the completed checkpoint store (backed by a deque)
    completedCheckpointStore.addCheckpoint(savepoint);
    // Reset the checkpoint ID counter
    long nextCheckpointId = savepoint.getCheckpointID() + 1;
    // set the id of the next checkpoint
    checkpointIdCounter.setCount(nextCheckpointId);
    LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
    // restore the state of the latest checkpoint
    return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, true, allowNonRestored);
}
Restoring the latest checkpointed state
private boolean restoreLatestCheckpointedStateInternal(
        final Set<ExecutionJobVertex> tasks,
        final boolean restoreCoordinators,
        final boolean errorIfNoCheckpoint,
        final boolean allowNonRestoredState) throws Exception {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalStateException("CheckpointCoordinator is shut down");
        }
        // We create a new shared state registry object, so that all pending async disposal requests from previous
        // runs will go against the old object (where they can do no harm).
        // This must happen under the checkpoint lock.
        sharedStateRegistry.close();
        sharedStateRegistry = sharedStateRegistryFactory.create(executor);

        // Recover the checkpoints. TODO: this could be done only when there is a new leader, not on each recovery
        completedCheckpointStore.recover();

        // Now, we re-register all (shared) states from the checkpoint store with the new registry
        for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) {
            // shared states must be registered again after the restore
            completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
        }
        LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry);

        // Restore from the latest checkpoint
        CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
        if (latest == null) {
            if (errorIfNoCheckpoint) {
                throw new IllegalStateException("No completed checkpoint available");
            } else {
                LOG.debug("Resetting the master hooks.");
                MasterHooks.reset(masterHooks.values(), LOG);
                return false;
            }
        }
        LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest);

        // re-assign the task states
        final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
        // used for both savepoint restore and checkpoint restore
        StateAssignmentOperation stateAssignmentOperation =
            new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);
        // assign the states to the tasks
        stateAssignmentOperation.assignStates();

        // call master hooks for restore. we currently call them also on "regional restore" because
        // there is no other failure notification mechanism in the master hooks;
        // ultimately these should get removed anyway in favor of the operator coordinators
        MasterHooks.restoreMasterHooks(
            masterHooks,
            latest.getMasterHookStates(),
            latest.getCheckpointID(),
            allowNonRestoredState,
            LOG);

        if (restoreCoordinators) {
            restoreStateToCoordinators(operatorStates);
        }

        // update metrics
        if (statsTracker != null) {
            long restoreTimestamp = System.currentTimeMillis();
            RestoredCheckpointStats restored = new RestoredCheckpointStats(
                latest.getCheckpointID(),
                latest.getProperties(),
                restoreTimestamp,
                latest.getExternalPointer());
            statsTracker.reportRestoredCheckpoint(restored);
        }
        return true;
    }
}
The CheckpointCoordinator mechanism
Initializing the CheckpointCoordinator
public CheckpointCoordinator(
        JobID job,
        CheckpointCoordinatorConfiguration chkConfig,
        ExecutionVertex[] tasksToTrigger,
        ExecutionVertex[] tasksToWaitFor,
        ExecutionVertex[] tasksToCommitTo,
        Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore completedCheckpointStore,
        StateBackend checkpointStateBackend,
        Executor executor,
        ScheduledExecutor timer,
        SharedStateRegistryFactory sharedStateRegistryFactory,
        CheckpointFailureManager failureManager,
        Clock clock) {
    // sanity checks
    checkNotNull(checkpointStateBackend);

    // max "in between duration" can be one year - this is to prevent numeric overflows
    // read the minimum pause between checkpoints
    long minPauseBetweenCheckpoints = chkConfig.getMinPauseBetweenCheckpoints();
    if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
        minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
    }

    // it does not make sense to schedule checkpoints more often than the desired
    // time between checkpoints
    long baseInterval = chkConfig.getCheckpointInterval();
    // if the configured interval is smaller than the minimum pause, use the minimum pause as the interval
    if (baseInterval < minPauseBetweenCheckpoints) {
        baseInterval = minPauseBetweenCheckpoints;
    }
    this.job = checkNotNull(job);
    this.baseInterval = baseInterval;
    this.checkpointTimeout = chkConfig.getCheckpointTimeout();
    this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    this.tasksToTrigger = checkNotNull(tasksToTrigger);
    this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
    this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
    this.coordinatorsToCheckpoint = Collections.unmodifiableCollection(coordinatorsToCheckpoint);
    this.pendingCheckpoints = new LinkedHashMap<>();
    this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
    this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
    this.executor = checkNotNull(executor);
    this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
    this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
    this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
    this.failureManager = checkNotNull(failureManager);
    this.clock = checkNotNull(clock);
    this.isExactlyOnceMode = chkConfig.isExactlyOnce();
    this.unalignedCheckpointsEnabled = chkConfig.isUnalignedCheckpointsEnabled();
    this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
    this.masterHooks = new HashMap<>();
    this.timer = timer;
    // parse the checkpoint properties
    this.checkpointProperties = CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());
    try {
        // create the checkpoint storage
        this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
        // initialize the base checkpoint locations
        checkpointStorage.initializeBaseLocations();
    } catch (IOException e) {
        throw new FlinkRuntimeException("Failed to create checkpoint storage at checkpoint coordinator side.", e);
    }
    try {
        // Make sure the checkpoint ID enumerator is running. Possibly
        // issues a blocking call to ZooKeeper.
        // Counter implementations are provided for an in-memory atomic counter and for ZooKeeper.
        checkpointIDCounter.start();
    } catch (Throwable t) {
        throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t);
    }
    this.requestDecider = new CheckpointRequestDecider(
        chkConfig.getMaxConcurrentCheckpoints(),
        this::rescheduleTrigger,
        this.clock,
        this.minPauseBetweenCheckpoints,
        this.pendingCheckpoints::size);
}
Triggering a savepoint
public CompletableFuture<CompletedCheckpoint> triggerSavepoint(@Nullable final String targetLocation) {
    // build the savepoint properties (alignment depends on whether unaligned checkpoints are enabled)
    final CheckpointProperties properties = CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
    // trigger the savepoint
    return triggerSavepointInternal(properties, false, targetLocation);
}

private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
        final CheckpointProperties checkpointProperties,
        final boolean advanceToEndOfEventTime,
        @Nullable final String targetLocation) {
    checkNotNull(checkpointProperties);

    // TODO, call triggerCheckpoint directly after removing timer thread
    // for now, execute the trigger in timer thread to avoid competition
    // the result is completed asynchronously
    final CompletableFuture<CompletedCheckpoint> resultFuture = new CompletableFuture<>();
    // trigger the checkpoint on the timer thread
    timer.execute(() -> triggerCheckpoint(
        checkpointProperties,
        targetLocation,
        false,
        advanceToEndOfEventTime)
        .whenComplete((completedCheckpoint, throwable) -> {
            if (throwable == null) {
                resultFuture.complete(completedCheckpoint);
            } else {
                resultFuture.completeExceptionally(throwable);
            }
        }));
    return resultFuture;
}
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic,
        boolean advanceToEndOfTime) {
    if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
        return FutureUtils.completedExceptionally(new IllegalArgumentException(
            "Only synchronous savepoints are allowed to advance the watermark to MAX."));
    }
    // create the checkpoint trigger request
    CheckpointTriggerRequest request = new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic, advanceToEndOfTime);
    // decide whether the request can run now and, if so, start triggering the checkpoint
    chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
    return request.onCompletionPromise;
}