-
Notifications
You must be signed in to change notification settings - Fork 201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix unexpected sink side unsubscribe behavior #562
Conversation
Test Results545 tests +1 537 ✔️ +1 6m 46s ⏱️ - 1m 8s Results for commit 7210270. ± Comparison against base commit 49cd280. This pull request removes 1 and adds 2 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM but a second review is appreciated. I have limited context.
@@ -53,7 +53,9 @@ class SubscriptionStateHandlerImpl extends AbstractScheduledService implements S | |||
|
|||
@Override | |||
public void startUp() { | |||
currentState.set(SubscriptionState.of(clock)); | |||
if (currentState.get() == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this if check needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like you are expecting scenarios where startUp can be called multiple times?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The currentState can be set at startUp or the first update from subscribe event (currently there is no set at subscribe so if subscribe happens before startup it hits NPE).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to ensure that none of the members of the class are called before startUp, as this would violate the semantics of a Service class. To prevent this, I suggest setting up preconditions in the listener methods rather than working around the issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it might be a better idea to start the subscription handler in
private void setupSubscriptionStateHandler(ExecuteStageRequest executeStageRequest) {
@@ -81,11 +83,20 @@ protected void runOneIteration() { | |||
|
|||
@Override | |||
public void onSinkUnsubscribed() { | |||
if (currentState.get() == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two operations in the method i.e. get-then-update are not atomic, is it ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currentState should never be unset back to null in this instance, thus no race on getting non-null then reading null.
currentState.updateAndGet(SubscriptionState::onSinkUnsubscribed); | ||
} | ||
|
||
@Override | ||
public void onSinkSubscribed() { | ||
// sink subscription can happen before the startup of this service. | ||
if (currentState.get() == null) { | ||
currentState.compareAndSet(null, SubscriptionState.of(clock)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same question, I was thinking the if-check is not necessary. currentState.compareAndSet(null, SubscriptionState.of(clock));
works just fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah it's not for thread safety but just a minor save to avoid the extra SubscriptionState.of(clock) creation.
if (onUnsubscribeAction != null) | ||
onUnsubscribeAction.call(); | ||
} | ||
.doOnCompleted(() -> logger.info("Sink observable subscription completed.")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: is other information useful in the log like subscribe etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subscribe?
@@ -53,7 +53,10 @@ class SubscriptionStateHandlerImpl extends AbstractScheduledService implements S | |||
|
|||
@Override | |||
public void startUp() { | |||
currentState.set(SubscriptionState.of(clock)); | |||
if (currentState.get() == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would convert this into a Precondition instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it has to fail if the service got started again. Let me remove the check here directly (atomic set with null should be sufficient.)
@@ -478,6 +478,12 @@ private void setupSubscriptionStateHandler(ExecuteStageRequest executeStageReque | |||
}; | |||
|
|||
this.subscriptionStateHandler = subscriptionStateHandler; | |||
try { | |||
this.subscriptionStateHandler.startAsync().awaitRunning(Duration.of(5, ChronoUnit.SECONDS)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be TimeUnit no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TimeUnit is in java.util.concurrent and this Duration.of requires the temporal unit interface which is from java.time.
Context
There is a race condition on the subscription handler where the current state can be empty if the subscription is established before the handler service is started, which causes non-perpetual jobs to silently fail at startup.
Checklist
./gradlew build
compiles code correctly./gradlew test
passes all tests