
[Question] How to choose transactionid for producing events in distributed systems with multiple concurrent workloads and instances #2322

matijs-toonen opened this issue Oct 8, 2024 · 0 comments


Description

I have a situation in which I want to prevent zombie transactions, while also supporting multiple concurrent open transactions distributed over multiple replicas of my app.

Most of the documentation and blog posts I have read say that you typically want only one IProducer instance, used as a singleton, to publish to Kafka.
However, a transactionId has to be assigned when building the IProducer, and a producer can only have one transaction open at a time. With a single producer, the same transactional id would therefore be shared by multiple concurrent workloads, which I cannot allow (each workload should commit only its own events).

I came up with an IProducer pool that rotates through a fixed set of producers, each with a predefined transaction id derived from its slot in the pool (a very basic, untested example):

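using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

public class ProducerPool
{
    private const int PoolSize = 5;

    private readonly ConcurrentQueue<IProducer<string, byte[]>> producers = new ConcurrentQueue<IProducer<string, byte[]>>();

    // Counts the producers currently sitting in the queue: starts at 0 and is released once per producer added.
    private readonly SemaphoreSlim available = new SemaphoreSlim(0, PoolSize);

    public ProducerPool(ProducerConfig producerConfig)
    {
        for (var i = 0; i < PoolSize; i++)
        {
            // predefined transaction id so the same app uses the same transaction id when it restarts;
            // InitTransactions then fences any zombie producer with that id and aborts its open transaction
            producerConfig.TransactionalId = "my-app-name-" + i;
            var producer = new ProducerBuilder<string, byte[]>(producerConfig).Build();
            producer.InitTransactions(TimeSpan.FromSeconds(30));
            producers.Enqueue(producer);
            available.Release();
        }
    }

    public async Task<IProducer<string, byte[]>> GetProducer()
    {
        // Wait (without blocking a thread) until a producer is free. Every permit
        // corresponds to an enqueued producer, so the dequeue cannot fail here.
        await available.WaitAsync();
        producers.TryDequeue(out var producer);
        return producer;
    }

    public void ReleaseProducer(IProducer<string, byte[]> producer)
    {
        // Return the producer before signalling, so a woken waiter always finds it in the queue.
        producers.Enqueue(producer);
        available.Release();
    }
}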

Problem

The pool uses the SAME transaction ids in every instance of this app. If I run two instances, each one fences the other's producers (and aborts their open transactions) when it initializes, even though both should be able to work side by side.

To solve this I would have to introduce randomness into the transaction id generation. However, random ids do not survive a restart: a replacement instance would get new ids, so any transaction left open by a crashed instance would not be closed when the new instance is spun up...
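One possible way out, sketched here under stated assumptions, is to derive the ids from an identity that is unique per instance yet stable across restarts. If the app runs as a Kubernetes StatefulSet (an assumption; a plain Deployment gives pods random names), each pod's hostname has the form `<statefulset-name>-<ordinal>` and a replacement pod keeps the same ordinal. Ids built from that ordinal are then disjoint between replicas but reused by a restarted replica, whose `InitTransactions` call fences its own zombie. `TransactionalIdScheme`, `ParseOrdinal` and `IdsFor` are hypothetical names, not Confluent.Kafka APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of Confluent.Kafka): builds transactional ids that
// differ between replicas but are identical across restarts of the same replica.
public static class TransactionalIdScheme
{
    // Assumes a StatefulSet-style pod name such as "my-app-3", whose trailing
    // number is the pod's stable ordinal.
    public static int ParseOrdinal(string hostname)
    {
        var dash = hostname.LastIndexOf('-');
        if (dash < 0 || !int.TryParse(hostname.Substring(dash + 1), out var ordinal))
        {
            throw new ArgumentException($"No ordinal suffix in '{hostname}'", nameof(hostname));
        }
        return ordinal;
    }

    // One id per pooled producer, of the form "<app>-<ordinal>-<slot>".
    public static IReadOnlyList<string> IdsFor(string appName, int ordinal, int poolSize) =>
        Enumerable.Range(0, poolSize)
                  .Select(slot => $"{appName}-{ordinal}-{slot}")
                  .ToList();
}
```

In the pool constructor, `"my-app-name-" + i` would then become something like `TransactionalIdScheme.IdsFor("my-app-name", TransactionalIdScheme.ParseOrdinal(Environment.MachineName), 5)[i]`; inside a Kubernetes pod, `Environment.MachineName` should return the pod's hostname.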

Question

How can I solve this problem?
Also, I want to be able to scale dynamically, so the number of running instances varies with the workload.
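Scaling down is where stable per-instance ids alone fall short: when a StatefulSet shrinks, the pods with the highest ordinals are removed and not restarted, so nothing re-initializes their transactional ids. The broker's transaction coordinator does abort a transaction that stays open longer than the producer's `transaction.timeout.ms`, but until then `read_committed` consumers of the affected partitions cannot read past it. The sketch below, assuming a hypothetical `<app>-<ordinal>-<slot>` id scheme and StatefulSet scale-down ordering, computes which ids are left without an owner; `ScaleDown` and `OrphanedIds` are illustrative names only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: lists the transactional ids whose owning replica disappears
// when a StatefulSet scales from oldReplicas down to newReplicas. Scale-down removes
// the highest ordinals, i.e. newReplicas .. oldReplicas - 1.
public static class ScaleDown
{
    public static IReadOnlyList<string> OrphanedIds(string appName, int oldReplicas, int newReplicas, int poolSize) =>
        Enumerable.Range(newReplicas, Math.Max(0, oldReplicas - newReplicas))
                  .SelectMany(ordinal => Enumerable.Range(0, poolSize)
                                                   .Select(slot => $"{appName}-{ordinal}-{slot}"))
                  .ToList();
}
```

A surviving replica could then (an assumption, not a documented recipe) build a short-lived producer for each orphaned id and call `InitTransactions` on it, which fences the old producer and aborts its pending transaction, before disposing it. Scaling up needs no such step, since new ordinals bring fresh ids.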

Checklist info

  • Confluent.Kafka nuget version: 2.3.0
  • Kafka version: confluentinc/cp-kafka:7.3.2
  • Client configuration: attached as a screenshot (not reproduced here)
  • Operating system: Windows (Linux on the Kubernetes cluster)

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • Confluent.Kafka nuget version.
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (with "debug" : "..." as necessary in configuration). -> not applicable
  • Provide broker log excerpts. -> not applicable
  • Critical issue.