I have a situation in which I want to prevent zombie transactions, while also supporting multiple concurrent open transactions distributed over multiple replicas of my app.
Most of the documentation and blog posts I have read mention that you typically want a single IProducer instance, registered as a singleton, and use that to publish to Kafka.
However, I saw that you need to assign a TransactionalId when building the IProducer. That would mean the same transactional id is shared by multiple concurrent workloads, which I cannot do (each workload should commit only its own events).
I came up with an IProducer pool that rotates through available producers, each with a predefined transactional id based on its slot in the pool (very basic example, not tested):
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

public class ProducerPool
{
    private const int PoolSize = 5;

    private readonly ConcurrentQueue<IProducer<string, byte[]>> producers =
        new ConcurrentQueue<IProducer<string, byte[]>>();

    // One semaphore slot per pooled producer, so GetProducer blocks until a
    // producer has been returned to the pool.
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(PoolSize, PoolSize);

    public ProducerPool(ProducerConfig producerConfig)
    {
        for (var i = 0; i < PoolSize; i++)
        {
            // Predefined transactional id, so the same app uses the same ids when it
            // restarts, which then closes any zombie transactions left behind by the
            // previous run (they carry the same id).
            producerConfig.TransactionalId = "my-app-name-" + i;

            var producer = new ProducerBuilder<string, byte[]>(producerConfig).Build();
            producer.InitTransactions(TimeSpan.FromSeconds(30));
            producers.Enqueue(producer);
        }
    }

    public async Task<IProducer<string, byte[]>> GetProducer()
    {
        // Wait until at least one producer is available, then take it.
        await semaphore.WaitAsync();
        producers.TryDequeue(out var producer);
        return producer;
    }

    public void ReleaseProducer(IProducer<string, byte[]> producer)
    {
        producers.Enqueue(producer);
        semaphore.Release();
    }
}
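The intended per-workload usage would look roughly like this (a sketch only, not tested; it assumes the pool above and uses a placeholder topic, key, and payload, with delivery-report handling omitted):

// Each workload checks out a dedicated producer, so it only commits its own events.
var producer = await producerPool.GetProducer();
try
{
    producer.BeginTransaction();

    var payload = new byte[] { 1, 2, 3 }; // placeholder value
    producer.Produce("my-topic", new Message<string, byte[]> { Key = "some-key", Value = payload });

    // CommitTransaction waits for outstanding messages to be delivered before committing.
    producer.CommitTransaction();
}
catch
{
    // Abort so the producer goes back to the pool in a clean state.
    producer.AbortTransaction();
    throw;
}
finally
{
    producerPool.ReleaseProducer(producer);
}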
Problem
The pool will use the SAME transactional ids in every instance of this app. That means that if I run 2 instances of this app, each instance will close the other's transactions, even though both should keep working.
To solve this I would have to introduce randomness into the transaction id generation. However, if I do that, only the instance that generated an id will ever use it; any transaction that is left open due to a program crash would then not be closed when a new instance is spun up again...
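For reference, the randomized variant I am describing would be along these lines (sketch only; the Guid suffix is just one way to get per-instance uniqueness):

// Random per-instance suffix: ids no longer collide between replicas, but they are
// also never reused after a crash, so that instance's open transactions are only
// aborted once the broker-side transaction timeout expires.
var instanceSuffix = Guid.NewGuid().ToString("N");
producerConfig.TransactionalId = $"my-app-name-{instanceSuffix}-{i}";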
Question
How can I solve this problem?
Also, I want to be able to scale dynamically, so the number of running instances varies based on the workload.
Checklist info
Confluent.Kafka nuget version: 2.3.0
Kafka version: confluentinc/cp-kafka:7.3.2
Client configuration (for example localhost):
Operating system: Windows (linux on the kubernetes cluster)
Checklist
Please provide the following information:
A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
Confluent.Kafka nuget version.
Apache Kafka version.
Client configuration.
Operating system.
Provide logs (with "debug" : "..." as necessary in configuration). -> not applicable
Provide broker log excerpts. -> not applicable
Critical issue.