Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: transactions #323

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open

Conversation

rkrishn7
Copy link

@rkrishn7 rkrishn7 commented Aug 17, 2024

This PR adds client support for Pulsar Transactions.

Resolves #253

Notes:

  • This PR tries to mimic the Java client where possible. More details on transactions in Pulsar can be found here.
  • Will likely need to edit the Pulsar service container in CI to enable the TC and bootstrap transaction metadata
  • This PR necessitates a breaking version change as it adds a new error variant

Open Questions:

  • Should these changes be gated behind a feature flag?
  • This change likely has implications when buffering in the producer. Thoughts on how we should handle that?
  • The main Transaction API uses internal synchronization to provide a "no mut" public interface. Are there any alternative approaches that may be better?

@rkrishn7
Copy link
Author

@FlorentinDUBOIS Tagging you for visibility. I know this is a lot so no rush here!

@rkrishn7
Copy link
Author

Bumping and tagging some additional folks - @freeznet @BewareMyPower

@BewareMyPower
Copy link

I'm going to review this PR soon.

@FlorentinDUBOIS
Copy link
Collaborator

Thanks you @rkrishn7, I will take a look at this pull request and test it in production next week. Sorry for the delay for my answer

Comment on lines 95 to 97
current_backoff = std::cmp::min(
Self::OP_MAX_BACKOFF * 2u32.saturating_pow(current_retries),
Self::OP_MAX_BACKOFF,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use cmp::min here? OP_MAX_BACKOFF should always be the smaller one. It looks like you should use OP_MIN_BACKOFF * 2u32.saturating_pow(current_retries).

BTW, could you reuse the operation_retry_parameters in Pulsar::new instead of hard coding backoff parameters? And it would be better to abstract a Backoff class like Java so that we can reuse the logic in connect_inner rather than rewriting the same logic again

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use cmp::min here? OP_MAX_BACKOFF should always be the smaller one. It looks like you should use OP_MIN_BACKOFF * 2u32.saturating_pow(current_retries).

Thanks for catching that! Updated to use OP_MIN_BACKOFF there.

BTW, could you reuse the operation_retry_parameters in Pulsar::new instead of hard coding backoff parameters?

The same operation_retry_parameters as declared in Pulsar::new should already be getting used here because these operations rely on ConnectionSender::send_message .

The retry logic here is solely for TransactionCoordinatorNotFound errors. I think it's possible to make the backoff parameters here configurable, but they should be different from operation_retry_parameters.

And it would be better to abstract a Backoff class like Java so that we can reuse the logic in connect_inner rather than rewriting the same logic again

Agreed, but I think maybe we can make another issue for this? May not be a great idea to increase the scope of this PR since it's already quite large.

@BewareMyPower
Copy link

Should these changes be gated behind a feature flag?

I don't think so.

This change likely has implications when buffering in the producer. Thoughts on how we should handle that?

Sorry I don't get it. It only registers the partition with the transaction id via the ADD_PARTITION_TO_TXN command before adding the message to the buffer. It's necessary for transactional messages and has no impact on the regular send without transaction.

@rkrishn7
Copy link
Author

Should these changes be gated behind a feature flag?

I don't think so.

👍🏾

This change likely has implications when buffering in the producer. Thoughts on how we should handle that?

Sorry I don't get it. It only registers the partition with the transaction id via the ADD_PARTITION_TO_TXN command before adding the message to the buffer. It's necessary for transactional messages and has no impact on the regular send without transaction.

Sorry! To clarify, I think there's a potential footgun lurking here. For example:

Let's say we've enabled transactions and start producing transactional messages with a batching producer. If we haven't hit the batching threshold before the transaction's timeout, then all those buffered messages will fail to be produced. This is subtle, but definitely seems undesirable.

It seems like either transactional messages should bypass batching or we should warn the user if both transactions and batching are enabled.

@rkrishn7
Copy link
Author

Hi @BewareMyPower @FlorentinDUBOIS

Just bumping this PR! Would love to get this in soon 😄

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Transaction API support
3 participants