Fix unexpected sink side unsubscribe behavior #562

Merged
merged 3 commits into from
Oct 9, 2023

Conversation

Andyz26
Collaborator

@Andyz26 Andyz26 commented Oct 6, 2023

Context

There is a race condition on the subscription handler where the current state can be empty if the subscription is established before the handler service is started, which causes non-perpetual jobs to silently fail at startup.

  • Added handling and logging for this case.
  • Added an extra test to the integration tests.
  • Removed sinkSubscriptionHandlerFactory on TaskExecutor level (not used anymore).

Checklist

  • ./gradlew build compiles code correctly
  • Added new tests where applicable
  • ./gradlew test passes all tests
  • Extended README or added javadocs where applicable

@Andyz26 Andyz26 had a problem deploying to Integrate Pull Request October 6, 2023 23:31 — with GitHub Actions Failure
@Andyz26 Andyz26 changed the title fix sink unsubscrib Fix unexpected sink side un-subscription Oct 6, 2023
@Andyz26 Andyz26 changed the title Fix unexpected sink side un-subscription Fix unexpected sink side unsubscribe behavior Oct 6, 2023
@github-actions

github-actions bot commented Oct 6, 2023

Test Results

545 tests (+1): 537 passed (+1), 8 skipped (±0), 0 failed (±0)
128 suites (±0)
128 files (±0)
Duration: 6m 46s (-1m 8s)

Results for commit 7210270. ± Comparison against base commit 49cd280.

This pull request removes 1 and adds 2 tests. Note that renamed tests count towards both.
TestContainerHelloWorld ‑ helloWorld
TestContainerHelloWorld ‑ testQuickSubmitJob
TestContainerHelloWorld ‑ testRegularSubmitJob

♻️ This comment has been updated with latest results.

Contributor

@liuml07 liuml07 left a comment

LGTM, but a second review would be appreciated; I have limited context.

@@ -53,7 +53,9 @@ class SubscriptionStateHandlerImpl extends AbstractScheduledService implements S

@Override
public void startUp() {
currentState.set(SubscriptionState.of(clock));
if (currentState.get() == null) {
Contributor

Is this if-check needed?

Collaborator

It seems like you are expecting scenarios where startUp can be called multiple times?

Collaborator Author

currentState can be set either at startUp or on the first update from a subscribe event. Currently nothing sets it on subscribe, so if subscribe happens before startUp it hits an NPE.
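
To illustrate the ordering described in this comment, here is a minimal sketch, not the PR's actual code. EitherOrderInit and its string states are hypothetical stand-ins for the handler and SubscriptionState; the only point is that initializing through compareAndSet(null, ...) in both places lets either event arrive first without one overwriting the other.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the handler; plain strings replace SubscriptionState.
class EitherOrderInit {
    private final AtomicReference<String> currentState = new AtomicReference<>();

    // Initializes the state only if no subscribe event has done so already.
    void startUp() {
        currentState.compareAndSet(null, "INITIAL");
    }

    // Safe to call before startUp: initialize if needed, then apply the transition.
    void onSinkSubscribed() {
        currentState.compareAndSet(null, "INITIAL");
        currentState.updateAndGet(previous -> "SUBSCRIBED");
    }

    String state() {
        return currentState.get();
    }

    public static void main(String[] args) {
        EitherOrderInit handler = new EitherOrderInit();
        handler.onSinkSubscribed(); // subscribe arrives before the service starts
        handler.startUp();          // must not overwrite the subscribed state
        System.out.println(handler.state());
    }
}
```

With an unconditional set in startUp, the late startUp call in main would instead discard the subscribed state.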

Contributor

We need to ensure that none of the members of the class are called before startUp, as this would violate the semantics of a Service class. To prevent this, I suggest setting up preconditions in the listener methods rather than working around the issue.
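
A rough sketch of what the precondition approach suggested here could look like, for comparison with the null-handling workaround. GuardedHandler and its string states are hypothetical; the check is written in plain Java so the sketch stays self-contained, and Guava's Preconditions.checkState would express the same thing.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical handler that rejects listener calls made before startUp.
class GuardedHandler {
    private final AtomicReference<String> currentState = new AtomicReference<>();

    void startUp() {
        currentState.set("INITIAL");
    }

    void onSinkSubscribed() {
        checkStarted("onSinkSubscribed");
        currentState.updateAndGet(previous -> "SUBSCRIBED");
    }

    String state() {
        return currentState.get();
    }

    // Fails fast instead of silently tolerating an uninitialized state.
    private void checkStarted(String caller) {
        if (currentState.get() == null) {
            throw new IllegalStateException(caller + " called before startUp");
        }
    }

    public static void main(String[] args) {
        GuardedHandler handler = new GuardedHandler();
        try {
            handler.onSinkSubscribed();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The trade-off is that an early subscribe now fails loudly, so this only works if the caller guarantees the handler is started before any sink can subscribe.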

Contributor

I think it might be a better idea to start the subscription handler in

private void setupSubscriptionStateHandler(ExecuteStageRequest executeStageRequest) {

@@ -81,11 +83,20 @@ protected void runOneIteration() {

@Override
public void onSinkUnsubscribed() {
if (currentState.get() == null) {
Contributor

The two operations in this method (get, then update) are not atomic. Is that OK?

Collaborator Author

currentState is never reset to null in this instance, so there is no race where a non-null read is followed by a null read.
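
The argument above can be sketched as follows, under the stated assumption that the reference only ever moves from null to non-null. UnsubscribeSketch and its State class are hypothetical, and the early return stands in for whatever handling and logging the real method does.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: get-then-update is safe when the reference never returns to null.
class UnsubscribeSketch {
    static final class State {
        final boolean subscribed;

        State(boolean subscribed) {
            this.subscribed = subscribed;
        }

        State onSinkUnsubscribed() {
            return new State(false);
        }
    }

    final AtomicReference<State> currentState = new AtomicReference<>();

    // Returns false when the state is not initialized yet (real code would log here).
    boolean onSinkUnsubscribed() {
        if (currentState.get() == null) {
            return false;
        }
        // Once non-null was observed, the value cannot be null here: nothing resets it.
        currentState.updateAndGet(State::onSinkUnsubscribed);
        return true;
    }

    public static void main(String[] args) {
        UnsubscribeSketch handler = new UnsubscribeSketch();
        System.out.println(handler.onSinkUnsubscribed()); // uninitialized: skipped
        handler.currentState.set(new State(true));
        System.out.println(handler.onSinkUnsubscribed()); // initialized: applied
    }
}
```

Without the null check, updateAndGet would invoke State::onSinkUnsubscribed on a null receiver and throw a NullPointerException.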

currentState.updateAndGet(SubscriptionState::onSinkUnsubscribed);
}

@Override
public void onSinkSubscribed() {
// sink subscription can happen before the startup of this service.
if (currentState.get() == null) {
currentState.compareAndSet(null, SubscriptionState.of(clock));
Contributor

Same question: I think the if-check is not necessary. currentState.compareAndSet(null, SubscriptionState.of(clock)); works just fine on its own.

Collaborator Author

Yes, it's not for thread safety; it's just a minor saving that avoids creating an extra SubscriptionState.of(clock).
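
A small sketch of the saving being discussed, with hypothetical names throughout. AllocationGuard counts how many state objects get built: the argument to compareAndSet is evaluated before the call, so the unguarded form constructs a new object on every call even when the swap fails.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch comparing guarded and unguarded lazy initialization.
class AllocationGuard {
    private final AtomicReference<Object> currentState = new AtomicReference<>();
    private final AtomicInteger created = new AtomicInteger();

    // Stand-in for SubscriptionState.of(clock); counts every construction.
    private Object newState() {
        created.incrementAndGet();
        return new Object();
    }

    // Always builds a candidate, even if the state is already set.
    void initUnguarded() {
        currentState.compareAndSet(null, newState());
    }

    // Builds a candidate only when the state looks unset; compareAndSet keeps it correct.
    void initGuarded() {
        if (currentState.get() == null) {
            currentState.compareAndSet(null, newState());
        }
    }

    int createdCount() {
        return created.get();
    }

    Object state() {
        return currentState.get();
    }

    public static void main(String[] args) {
        AllocationGuard guarded = new AllocationGuard();
        AllocationGuard unguarded = new AllocationGuard();
        for (int i = 0; i < 3; i++) {
            guarded.initGuarded();
            unguarded.initUnguarded();
        }
        System.out.println(guarded.createdCount() + " vs " + unguarded.createdCount());
    }
}
```

Both variants end up with the same stored state; the guard only changes how many throwaway objects are created along the way.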

if (onUnsubscribeAction != null)
onUnsubscribeAction.call();
}
.doOnCompleted(() -> logger.info("Sink observable subscription completed."))
Contributor

nit: would other information, like subscribe etc., be useful in the log?

Collaborator Author

subscribe?

@Andyz26 Andyz26 had a problem deploying to Integrate Pull Request October 9, 2023 19:49 — with GitHub Actions Failure
@@ -53,7 +53,10 @@ class SubscriptionStateHandlerImpl extends AbstractScheduledService implements S

@Override
public void startUp() {
currentState.set(SubscriptionState.of(clock));
if (currentState.get() == null) {
Contributor

I would convert this into a Precondition instead.

Collaborator Author

I don't think it has to fail if the service gets started again. Let me just remove the check here (an atomic set from null should be sufficient).

@@ -478,6 +478,12 @@ private void setupSubscriptionStateHandler(ExecuteStageRequest executeStageReque
};

this.subscriptionStateHandler = subscriptionStateHandler;
try {
this.subscriptionStateHandler.startAsync().awaitRunning(Duration.of(5, ChronoUnit.SECONDS));
Collaborator

This should be TimeUnit, no?

Collaborator Author

TimeUnit is in java.util.concurrent, whereas Duration.of requires a TemporalUnit, the interface from java.time.temporal that ChronoUnit implements.
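
For reference, a short sketch of the unit types involved, using only the JDK. TimeoutUnits is a hypothetical class name, and TimeUnit.toChronoUnit() assumes Java 9 or later.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing equivalent ways to express a five-second timeout.
class TimeoutUnits {
    // Duration.of takes a java.time.temporal.TemporalUnit; ChronoUnit implements it.
    static Duration viaChronoUnit() {
        return Duration.of(5, ChronoUnit.SECONDS);
    }

    // A TimeUnit can be bridged to ChronoUnit (Java 9+), but cannot be passed directly.
    static Duration viaTimeUnit() {
        return Duration.of(5, TimeUnit.SECONDS.toChronoUnit());
    }

    // Shorthand that avoids naming a unit type at all.
    static Duration shorthand() {
        return Duration.ofSeconds(5);
    }

    public static void main(String[] args) {
        System.out.println(viaChronoUnit().equals(viaTimeUnit()));
        System.out.println(viaChronoUnit().equals(shorthand()));
    }
}
```

As far as I know, Guava's Service also offers an awaitRunning(long, TimeUnit) overload alongside the Duration one, so either style would work for the call under review.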

@Andyz26 Andyz26 had a problem deploying to Integrate Pull Request October 9, 2023 21:41 — with GitHub Actions Failure
@Andyz26 Andyz26 merged commit d94a7b0 into master Oct 9, 2023
1 of 2 checks passed
@Andyz26 Andyz26 deleted the andyz/fixSinkUnsubError branch October 9, 2023 23:07
4 participants