<!doctype html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no">
<title>Kafka Fundamentals</title>
<meta name="description" content="A presentation about Apache Kafka, covering basic knowledge everyone using Kafka for event collaboration should have.">
<meta name="author" content="Martin Grotzke, inoio gmbh">
<link rel="stylesheet" href="dist/reset.css">
<link rel="stylesheet" href="dist/reveal.css">
<link rel="stylesheet" href="dist/theme/moon.css" id="theme">
<!-- Theme used for syntax highlighted code -->
<link rel="stylesheet" href="plugin/highlight/monokai.css">
<!-- Printing and PDF exports -->
<script>
var link = document.createElement( 'link' );
link.rel = 'stylesheet';
link.type = 'text/css';
link.href = window.location.search.match( /print-pdf/gi ) ? 'css/print/pdf.css' : 'css/print/paper.css';
document.getElementsByTagName( 'head' )[0].appendChild( link );
</script>
<style type="text/css">
code {
color: #ccc;
}
</style>
</head>
<body>
<div class="reveal">
<div class="slides">
<section>
<div style="margin-top: 15%">
<h1>Kafka Fundamentals</h1>
<p>
<small>
This is about (java) producers, consumers and the cluster itself, i.e. topic partitions, replicas etc.<br/>
It's <em>not</em> about e.g. Kafka Connect, Kafka Streams or 3rd party client libraries like spring-kafka or reactor-kafka.
</small>
</p>
<p><small>(For Kafka 3.x)</small></p>
<div style="margin-top: 15%">
<small>(if you find a potential improvement please submit a pull request <a href="https://github.com/inoio/slides-kafka-fundamentals/">here</a>)</small>
</div>
<aside class="notes">
<ul>
<li>presentation mode = F11</li>
<li>change font size: CTRL + mouse wheel (CTRL + Plus in normal view)</li>
<li>switch tab: CTRL + Tab</li>
</ul>
</aside>
</div>
</section>
<section>
<h2>agenda</h2>
<ol>
<li>High level overview</li>
<li>Kafka cluster</li>
<li>Producers</li>
<li>Consumers</li>
<li>Transactional Messaging</li>
</ol>
</section>
<section>
<section>
<h2>high level overview</h2>
</section>
<section>
<h3>what is kafka?</h3>
<p>Kafka is an event streaming platform, combining three key capabilities:</p>
<ol>
<li>To <em>publish</em> (write) and <em>subscribe to</em> (read) streams of events</li>
<li>To <em>store</em> streams of events durably and reliably for as long as you want</li>
<li>To <em>process</em> streams of events as they occur or retrospectively</li>
</ol>
<aside class="notes">
<ul>
<li>it's pub/sub, not point to point, i.e. not a message queue</li>
</ul>
note: source https://kafka.apache.org/intro
</aside>
</section>
<section>
<h3>it's a log: an ordered, persistent sequence of events</h3>
<div><img src="img/log-of-records.svg" alt="log"></div>
</section>
<section>
<h3>events are organized in topics</h3>
<div><img src="img/topics.svg" alt="topics"></div>
</section>
<section>
<h3>topics are split into partitions</h3>
<div><img src="img/partitions.svg" alt="partitions"></div>
</section>
<section>
<h3>producers write to topic partitions</h3>
<div><img src="img/producer.svg" alt="producer"></div>
</section>
<section>
<h3>consumers read from topic partitions</h3>
<div><img src="img/consumer.svg" alt="consumer"></div>
</section>
<!--section>
<h3>Compacted Topics</h3>
<div><img width="50%" src="img/compacted-topic.svg" alt="compacted topic"></div>
<p><small><code>cleanup.policy=compact</code> (default: "delete")</small></p>
<p style="font-size: 70%">
<em>"Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition."</em> (<a href="https://kafka.apache.org/documentation/#compaction">Kafka docs</a>)
</p>
</section-->
<section>
<h3>that's it?</h3>
</section>
</section>
<section>
<section>
<h2>kafka cluster</h2>
</section>
<section>
<h3>replicated logs — leaders and replicas</h3>
<div style="margin-left: -3em"><img width="70%" src="img/leaders-and-replicas.svg" alt="leaders and replicas"></div>
<aside class="notes">
<ul style="font-size: 90%">
<li>Each partition is replicated across a configurable number of servers for fault-tolerance. Each of these replicas is designated as either a leader or a follower.</li>
<li>A leader replica is responsible for all read and write requests for a given partition. It receives data from producers, appends it to its local log, and sends the new data to its followers.</li>
<li>Follower replicas replicate the leader, they just receive data and maintain a copy of it.</li>
<li>Kafka guarantees that the replicas of a partition are placed on different brokers</li>
</ul>
</aside>
</section>
<section>
<h3>replicated logs — broker (default) config</h3>
<div><img width="40%" src="img/leaders-and-replicas-2.svg" alt="leaders and replicas"></div>
<div style="font-size: 70%">
<ul>
<li class="fragment fade-up"><code>num.partitions=3</code> – default number of log partitions per topic (default: 1)</li>
<li class="fragment fade-up"><code>default.replication.factor=3</code> – default replication factor for automatically created topics (Kafka default: 1, MSK default: 3 for 3-AZ clusters, 2 for 2-AZ clusters)</li>
<li class="fragment fade-up"><code>log.retention.hours</code> – time to keep a log file (default: 168 = 7d)</li>
<li class="fragment fade-up"><code>auto.create.topics.enable</code> – allow automatic topic creation on the broker when subscribing to or assigning a topic (Kafka default: true, MSK default: false)</li>
</ul>
<p>
<small>
source:
<a href="https://kafka.apache.org/documentation/#config">Kafka config</a>
/ <a href="https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html">MSK default config</a>
</small>
</p>
</div>
</section>
<section>
<h3>replicated logs — topic creation/config</h3>
<div><img width="40%" src="img/leaders-and-replicas-2.svg" alt="leaders and replicas"></div>
<p>$ bin/kafka-topics.sh --bootstrap-server localhost:9092 \<br/>
--create --topic my-topic \<br/>
--partitions 5 \<br/>
--replication-factor 3 \<br/>
--config retention.ms=864000000
</p>
<aside class="notes">
864000000ms = 10d
</aside>
</section>
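<section>
<h3>sketch: computing retention.ms</h3>
<p style="font-size: 60%">
A minimal sketch of the arithmetic behind <code>retention.ms=864000000</code> on the previous slide.
The class and method names are made up for illustration; only the JDK is used.
</p>

```java
import java.time.Duration;

final class RetentionMs {
    // Converts a retention period in days to the millisecond value used by retention.ms.
    static long fromDays(long days) {
        return Duration.ofDays(days).toMillis();
    }

    public static void main(String[] args) {
        // Prints the value to pass as --config retention.ms=... for 10 days
        System.out.println("retention.ms=" + fromDays(10));
    }
}
```

</section>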
<section>
<h3>replicated logs — in-sync replicas (ISR)</h3>
<div><img src="img/insync-replicas_2.svg" alt="insync-replicas"></div>
<div style="font-size: 80%">
<ul>
<li class="fragment fade-up"><code>min.insync.replicas</code> – the minimum number of replicas that must acknowledge a write to be considered successful, if used with "<code>acks=all</code>" (Kafka default: 1, MSK default: 2 for 3-AZ clusters, 1 for 2-AZ clusters)</li>
<li class="fragment fade-up">
A typical scenario would be <code>replication factor 3</code>,
<code>min.insync.replicas=2</code>,
and produce with <code>acks=all</code>
</li>
</ul>
</div>
</section>
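<section>
<h3>sketch: ISR and min.insync.replicas</h3>
<p style="font-size: 60%">
A toy model of the typical scenario from the previous slide (replication factor 3,
<code>min.insync.replicas=2</code>, <code>acks=all</code>). It only illustrates the counting rule;
the class and method names are hypothetical and this is not how the broker is implemented.
</p>

```java
final class IsrModel {
    // With acks=all, a write is accepted only while enough replicas are in sync.
    static boolean acceptsWrite(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }

    // Number of replicas that may drop out of the ISR before acks=all writes are rejected.
    static int tolerableFailures(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor - minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println("all 3 in sync, write accepted: " + acceptsWrite(3, 2));
        System.out.println("only leader left, write accepted: " + acceptsWrite(1, 2));
        System.out.println("tolerable failures: " + tolerableFailures(3, 2));
    }
}
```

<p style="font-size: 60%">With these settings one replica can be unavailable while producers keep writing.</p>
</section>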
<section>
<h3>leader election</h3>
<div><img width="60%" src="img/broker-unavailable.svg" alt="broker unavailable"></div>
<ul>
<li>Is triggered when a leader replica fails, or when a new replica is added to a partition</li>
<li>Controlled by the Kafka broker that acts as the (active) controller for the cluster</li>
</ul>
</section>
<section>
<h3>leader election - related config</h3>
<div>
<ul style="font-size: 80%">
<li><code>replication.factor</code> – the more replicas, the more fault-tolerant the partition is</li>
<li><code>min.insync.replicas</code> – helps prevent data loss in case of leader failure</li>
<li>
<code>unclean.leader.election.enable</code> – controls whether the leader can be elected from an out-of-sync replica
– if <code>true</code>, availability is preferred over consistency/durability (Kafka default: false, MSK default: true)
</li>
</ul>
</div>
</section>
<section>
<h3>that's the cluster (?)</h3>
</section>
</section>
<section>
<section>
<h2>the java producer</h2>
</section>
<section style="width: 106%; margin-left: -0.5em">
<img style="float: left; padding-right: 0.5em" src="img/producer-flow.svg" width="28%" alt="producer flow" />
<h3>java producer overview</h3>
<div>
<ul style="font-size: 96%">
<li>connect to broker(s) and cache metadata</li>
<li>
send records to broker(s) / leaders
<ol>
<li>serialize key/value</li>
<li>select topic partition to write to</li>
<li>maybe compress data</li>
<li>buffer/accumulate messages</li>
<li>send message batches</li>
</ol>
</li>
</ul>
</div>
</section>
<section>
<h3>connect to broker(s) and cache metadata</h3>
<ul style="font-size: 80%">
<li><code>bootstrap.servers</code> – list of host/port pairs for establishing the initial connection to the cluster. Is used to discover the full set of servers/brokers</li>
<li>At least 2 are recommended for fault-tolerance</li>
<li>The bootstrap server returns the list of all brokers in the cluster and all metadata like topics, partitions, partition leaders, replication factor etc.</li>
<li><code>metadata.max.age.ms</code> – period after which a metadata refresh is forced even without any leadership change, to proactively discover new brokers, topics or partitions (default: 5 min)
</li>
</ul>
</section>
<section style="width: 106%; margin-left: -0.5em">
<img style="float: left; padding-right: 0.8em" src="img/producerrecord.svg" width="23%" alt="producer record" />
<h3>producer record</h3>
<div>
<ul style="font-size: 90%; width: 70%">
<li>If <code>timestamp</code> is not provided, the producer will stamp the record with its current time</li>
<li>Headers are useful to additionally describe the message or value, e.g. to allow consumers to decide about the deserializer, or information useful for monitoring</li>
<li>Also <code>value</code> can be <code>null</code> – tombstone for compacted topics</li>
</ul>
</div>
</section>
<section>
<h3>serialize key/value</h3>
<ul style="font-size: 80%">
<li><code>key.serializer</code> – Serializer class for key, e.g. <code>...StringSerializer</code></li>
<li><code>value.serializer</code> – Serializer class for value, e.g. <code>...ByteArraySerializer</code></li>
<li>Not covered here: <a href="https://docs.confluent.io/platform/current/schema-registry/index.html">Confluent Schema Registry</a></li>
</ul>
</section>
<section>
<h3>select topic partition</h3>
<img src="img/select-partition.svg" width="80%" alt="select partition" />
<p style="font-size: 60%">Messages with the same key are written to the same partition, so their relative order is preserved</p>
<aside class="notes">
<ul>
<li>Changing the number of partitions at runtime affects ordering!</li>
</ul>
</aside>
</section>
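<section>
<h3>sketch: key to partition mapping</h3>
<p style="font-size: 60%">
A simplified sketch of why records with the same key end up in the same partition: the key bytes are hashed
and the hash is taken modulo the number of partitions. <code>Arrays.hashCode</code> is only a stand-in here
(the Java client's default partitioner hashes the serialized key with murmur2), and the class and method names are hypothetical.
</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyPartitioning {
    // Maps a record key to a partition index in [0, numPartitions).
    // Arrays.hashCode is a stand-in; it is NOT the hash function Kafka uses.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        // Same key and same partition count always give the same partition
        System.out.println(partitionFor("customer-1", 5));
        System.out.println(partitionFor("customer-1", 5));
        // With a different partition count the same key may map elsewhere
        System.out.println(partitionFor("customer-1", 6));
    }
}
```

<p style="font-size: 60%">Because the partition count is part of the formula, changing it at runtime can move a key to another partition.</p>
</section>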
<section>
<h3>buffer/accumulate messages</h3>
<img src="img/producer-buffer.svg" width="100%" alt="producer buffer" />
<p style="font-size: 60%">The <code>send()</code> method is asynchronous: it adds the record to a buffer of pending record sends and returns immediately. This allows the producer to batch together individual records for efficiency.</p>
</section>
<section>
<h3>buffer/accumulate messages (config)</h3>
<img style="margin-top: 0px" src="img/producer-buffer.svg" width="40%" alt="producer buffer" />
<ul style="font-size: 70%">
<li><code>batch.size</code> – batch size in bytes, i.e. the upper bound of the batch size to be sent (default: 16 kB)</li>
<li><code>linger.ms</code> – time to give for batches to fill, to make batching more likely even under moderate load (default: 0)</li>
<li><code>buffer.memory</code> – the total amount of memory available for buffering. If records are sent faster than they can be transmitted to the server this buffer space will be exhausted (default: 32 MB)</li>
<li><code>max.block.ms</code> – max time to block <code>send()</code> calls when the buffer space is exhausted, after which a <code>TimeoutException</code> is thrown (default: 1 min)</li>
</ul>
<aside class="notes">
<ul>
<li>batch.size
<ul>
<li>Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.</li>
<li>A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely).</li>
<li>A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.</li>
</ul>
</li>
</ul>
</aside>
</section>
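<section>
<h3>sketch: batching config as properties</h3>
<p style="font-size: 60%">
A sketch of how the batching settings from the previous slide could be put into producer properties.
The keys and defaults are the ones listed there; <code>linger.ms=20</code> is an assumed example value,
and the class and method names are hypothetical.
</p>

```java
import java.util.Properties;

final class ProducerBatchingConfig {
    static Properties batchingProps() {
        Properties props = new Properties();
        props.setProperty("batch.size", "16384");       // 16 kB, the default
        props.setProperty("linger.ms", "20");           // assumed value: wait up to 20 ms for batches to fill
        props.setProperty("buffer.memory", "33554432"); // 32 MB, the default
        props.setProperty("max.block.ms", "60000");     // 1 min, the default
        return props;
    }

    public static void main(String[] args) {
        batchingProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

</section>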
<section>
<h3>send message batches</h3>
<img src="img/producer-sender-thread.svg" width="80%" alt="producer sender thread" />
<p style="font-size: 60%">Batches are grouped by broker and sent if either <code>batch.size</code> or <code>linger.ms</code> is reached.</p>
</section>
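<section>
<h3>sketch: when is a batch sent?</h3>
<p style="font-size: 60%">
A toy version of the rule on the previous slide: a batch is sent once it is full (<code>batch.size</code>)
or has waited long enough (<code>linger.ms</code>). The real sender thread has further triggers that this sketch ignores;
the class and method names are hypothetical.
</p>

```java
final class BatchReadiness {
    // A batch is ready when it reached batch.size bytes or has waited for linger.ms.
    static boolean readyToSend(int batchBytes, long waitedMs, int batchSizeBytes, long lingerMs) {
        return batchBytes >= batchSizeBytes || waitedMs >= lingerMs;
    }

    public static void main(String[] args) {
        System.out.println(readyToSend(100, 5, 16384, 20));   // small batch, still lingering
        System.out.println(readyToSend(16384, 5, 16384, 20)); // batch is full
        System.out.println(readyToSend(100, 20, 16384, 20));  // linger time is over
        System.out.println(readyToSend(100, 0, 16384, 0));    // linger.ms=0: no waiting at all
    }
}
```

</section>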
<section>
<h3>send message batches in parallel</h3>
<img src="img/producer-sender-thread_2.svg" width="75%" alt="producer parallel requests" />
<p style="font-size: 60%"><code>max.in.flight.requests.per.connection</code> controls the maximum number of unacknowledged requests the client sends on a single connection before blocking (default: 5)</p>
</section>
<section>
<h3>message batch send failures</h3>
<img src="img/producer-sender-thread_3.svg" width="75%" alt="producer request fails" />
<p style="font-size: 60%">With <code>max.in.flight.requests.per.connection > 1</code> there could be gaps if a request fails</p>
</section>
<section>
<h3>handling send failures (retries)</h3>
<img src="img/producer-sender-thread_4.svg" style="margin-top: 0px" width="70%" alt="producer request retries" />
<ul style="font-size: 60%">
<li><code>retries</code> – number of times to resend a failed request (default: <code>Integer.MAX_VALUE</code>)</li>
<li><code>request.timeout.ms</code> – max time the client will wait for the response (default: 30 sec)</li>
<li><code>delivery.timeout.ms</code> – max time for <code>send()</code> to complete (default: 2 min)</li>
<li>Note: with <code>max.in.flight.requests.per.connection > 1</code> and <code>retries > 0</code> message reordering could occur</li>
</ul>
</section>
<section>
<h3>handling send failures (retries)</h3>
<img src="img/producer-sender-thread_5.svg" width="70%" alt="producer request retries with guaranteed ordering" />
<p style="font-size: 60%">
To allow <code>retries</code> and prevent message reordering set <code>max.in.flight.requests.per.connection = 1</code>
</p>
</section>
<section>
<h3>send failure - duplicates</h3>
<img src="img/producer-sender-thread_6.svg" width="70%" alt="producer send failure duplicates" />
<p style="font-size: 60%">
If the response gets lost, enabled <code>retries</code> could lead to duplicate messages <br/>(<em>at-least-once</em> delivery semantics)
</p>
</section>
<section>
<h3>idempotent producer</h3>
<img src="img/producer-sender-thread_7.svg" style="margin-top: 0px" width="70%" alt="idempotent producer" />
<ul style="font-size: 60%">
<li>
<code>enable.idempotence</code> – prevents message duplication and reordering for a single producer (!),
to achieve <em>exactly-once</em> delivery semantics (default: <code>true</code> for Kafka 3.x)
</li>
<li>Requires <code>max.in.flight.requests.per.connection &lt;= 5</code>
(with message ordering preserved), <code>retries &gt; 0</code> and <code>acks='all'</code></li>
<li>If conflicting configurations are set and idempotence is not explicitly enabled, it is disabled. If explicitly enabled and conflicting configs are set, a <code>ConfigException</code> is thrown</li>
<li>Application-level retries, e.g. after an app/producer crash, could still lead to duplicates</li>
</ul>
</section>
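<section>
<h3>sketch: idempotence requirements</h3>
<p style="font-size: 60%">
A sketch that checks producer properties against the three requirements from the previous slide.
It mirrors the rule only and is not the client's own validation code; the class and method names are hypothetical,
and missing keys fall back to the Kafka 3.x defaults.
</p>

```java
import java.util.Properties;

final class IdempotenceCheck {
    // Returns true if the settings are compatible with enable.idempotence=true.
    static boolean compatible(Properties props) {
        int inFlight = Integer.parseInt(
            props.getProperty("max.in.flight.requests.per.connection", "5"));
        int retries = Integer.parseInt(
            props.getProperty("retries", String.valueOf(Integer.MAX_VALUE)));
        String acks = props.getProperty("acks", "all");
        if (inFlight > 5) return false; // at most 5 in-flight requests
        if (retries == 0) return false; // retries must be enabled
        return acks.equals("all") || acks.equals("-1"); // "-1" is another spelling of "all"
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println("defaults compatible: " + compatible(props));
        props.setProperty("acks", "1");
        System.out.println("acks=1 compatible: " + compatible(props));
    }
}
```

</section>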
<section>
<h3>message durability / consistency</h3>
<img src="img/producer-acks.svg" style="margin-top: 0px" width="80%" alt="producer acks" />
<ul style="font-size: 60%">
<li>
<code>acks</code> – number of acknowledgments the producer requires the leader to have received before considering a request complete (default: <code>all</code> for Kafka 3.x)
</li>
<li><code>acks=0</code> – do not wait for any acknowledgment</li>
<li><code>acks=1</code> – leader writes the record and responds without awaiting acks from followers</li>
<li>
<code>acks=all</code> – leader waits for all in-sync replicas to acknowledge; the write is rejected if fewer than <code>min.insync.replicas</code> replicas are in sync.
This guarantees that the record will not be lost as long as at least one in-sync replica remains alive
</li>
</ul>
</section>
<section>
<h3>that's the producer (?)</h3>
</section>
</section>
<section>
<section>
<h2>the java consumer</h2>
</section>
<section>
<h3>java consumer overview</h3>
<ul>
<li>connect to broker(s) and cache metadata</li>
<li>subscribe to topic(s)</li>
<li>join consumer group</li>
<li>continuously: poll records from broker(s) and commit offsets</li>
</ul>
<pre style="width: 100%">
<code data-trim data-noescape>
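Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
// deserializers are required consumer settings
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
  // poll(long) is deprecated, use the Duration overload
  ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord&lt;String, String&gt; record : records) {
    process(record);
  }
}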
</code>
</pre>
</section>
<section>
<h3>consumer offsets</h3>
<div>
<img src="img/consumer-offsets_2.svg" width="70%" alt="consumer offsets" />
</div>
<ul style="font-size: 60%">
<li>committed offset: where to resume from in the event of a restart</li>
<li>
<code>auto.offset.reset</code> – how to behave when no offsets have been committed,
or when a committed offset is no longer valid (<code>earliest</code>/<code>latest</code>/<code>none</code>, default: <code>latest</code>)
</li>
</ul>
<aside class="notes">
<ul>
<li><code>auto.offset.reset=none</code>: throw exception to the consumer if no previous offset is found for the consumer's group</li>
</ul>
</aside>
</section>
<section>
<h3>consumer offset management</h3>
<ul style="font-size: 74%">
<li>
The default <code>enable.auto.commit=true</code> means that offsets are committed automatically
at the interval set by <code>auto.commit.interval.ms</code> (default: 5 sec)<br/>
<div style="font-size: 80%; background-color: darkslategrey">
⚠️ For <em>at-least-once</em> delivery you must consume all data returned
from <code>poll()</code> before any subsequent calls, or before closing the consumer (otherwise, the
committed offset could get ahead of the consumed position)</div>
</li>
<li>It's also possible to "manually" <code>commitSync</code> or <code>commitAsync</code> an offset
<ul>
<li>
<code>commitSync</code>: the consumer is blocked until the commit request returns successfully,
including retries
</li>
<li>
<code>commitAsync</code>: send the commit request and return immediately to process the next message
or batch of messages
</li>
</ul>
</li>
</ul>
<aside class="notes">
code source: https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
</aside>
</section>
<section>
<h3>consumer offset storage</h3>
<img src="img/consumer-committed_offsets.svg" width="45%" alt="consumer committed offsets" />
<ul style="font-size: 70%">
<li>Kafka stores offsets in a compacted topic called <code>__consumer_offsets</code></li>
<li><code>offsets.retention.minutes</code> – retention period (default: 7 days)</li>
<li>
Alternative: when processing to an external system, the consumer's offset can be stored together with the output (atomically),
to be used with <code>seek(TopicPartition, long)</code> on restart
</li>
</ul>
</section>
<section>
<h3>consumer offset lags</h3>
<img src="img/consumer-offsets_3.svg" width="55%" alt="consumer offset lag" />
<ul style="font-size: 65%">
<li>If consumer lag keeps growing, messages could eventually be removed by log retention before they are consumed</li>
<li>Increasing the log retention time could be a temporary workaround</li>
<li>Adding consumers (and/or partitions) could increase throughput</li>
<li>Fetching larger batches could increase throughput (at the cost of latency)
<ul>
<li><code>fetch.max.wait.ms</code> – sets a maximum threshold for time-based batching (default: 500)</li>
<li><code>fetch.min.bytes</code> – sets a minimum threshold for size-based batching (default: 1)</li>
</ul>
</li>
</ul>
</section>
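<section>
<h3>sketch: computing consumer lag</h3>
<p style="font-size: 60%">
A minimal sketch of the lag shown on the previous slide: per partition, the log end offset minus the committed offset.
Offsets are passed in as plain numbers; how they are obtained is left out, and the class and method names are hypothetical.
</p>

```java
final class ConsumerLag {
    // Lag of one partition: records written to the log but not yet committed by the group.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    // Total lag of a group over all partitions of a topic (arrays are indexed by partition).
    static long totalLag(long[] logEndOffsets, long[] committedOffsets) {
        long total = 0;
        for (int p = 0; p != logEndOffsets.length; p++) {
            total += lag(logEndOffsets[p], committedOffsets[p]);
        }
        return total;
    }

    public static void main(String[] args) {
        long[] logEnd = {120, 80, 95};
        long[] committed = {100, 80, 90};
        System.out.println("total lag: " + totalLag(logEnd, committed));
    }
}
```

</section>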
<section>
<h3>consumer groups</h3>
<p style="font-size: 60%">
A consumer group ensures that each topic partition is processed by exactly <em>1</em> consumer of the group.<br/>
A consumer group is identified by a group id.
</p>
<div>
<img src="img/consumer-groups_2.svg" width="50%" alt="2 consumer groups" />
</div>
</section>
<section>
<h3>consumer group rebalancing</h3>
<div>
<img src="img/consumer-group_1-consumer.svg" style="margin-left: 0.5em; margin-right: 0.5em" width="38%" alt="consumer group 1 consumer" />
<img src="img/consumer-group_2-consumer.svg" style="margin-left: 0.5em; margin-right: 0.5em" width="38%" alt="consumer group 2 consumer" />
<img src="img/consumer-group_3-consumer.svg" style="margin-left: 0.5em; margin-right: 0.5em; vertical-align: top" width="38%" alt="consumer group 3 consumer" />
<img src="img/consumer-group_4-consumer.svg" style="margin-left: 0.5em; margin-right: 0.5em; vertical-align: top" width="38%" alt="consumer group 4 consumer" />
</div>
<p style="font-size: 70%">
The <em>group coordinator</em> is responsible for managing<br/>
<span style="padding: 0.2em">a)</span> group members and
<span style="padding: 0.2em">b)</span> their partition assignments.
</p>
<aside class="notes">
<ul>
<li>Rebalancing is orchestrated by the group coordinator, involves communication with all consumers in the group</li>
<li>The group coordinator is the broker that leads the <code>__consumer_offsets</code> partition to which the group id is mapped</li>
</ul>
</aside>
</section>
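<section>
<h3>sketch: partitions per consumer</h3>
<p style="font-size: 60%">
A toy assignment to illustrate the pictures on the previous slide: every partition gets exactly one owner in the group,
and consumers beyond the partition count stay idle. This simple round-robin is an assumption for illustration only,
it is not one of Kafka's assignors; the class and method names are hypothetical.
</p>

```java
import java.util.Arrays;

final class ToyAssignment {
    // Returns, for each partition index, the index of the consumer that owns it.
    static int[] assign(int numPartitions, int numConsumers) {
        int[] owner = new int[numPartitions];
        for (int p = 0; p != numPartitions; p++) {
            owner[p] = p % numConsumers;
        }
        return owner;
    }

    // Consumers beyond the partition count receive nothing and stay idle.
    static int idleConsumers(int numPartitions, int numConsumers) {
        return Math.max(0, numConsumers - numPartitions);
    }

    public static void main(String[] args) {
        for (int consumers = 1; consumers != 5; consumers++) {
            System.out.println(consumers + " consumer(s): owners=" + Arrays.toString(assign(3, consumers))
                + ", idle=" + idleConsumers(3, consumers));
        }
    }
}
```

</section>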
<section>
<h3>consumer group rebalancing</h3>
<div>
<img src="img/consumer-group_1-consumer.svg" style="margin-left: 0.5em; margin-right: 0.5em" width="25%" alt="consumer group 1 consumer" />
<img src="img/consumer-group_2-consumer.svg" style="margin-left: 0.5em; margin-right: 0.5em" width="25%" alt="consumer group 2 consumer" />
</div>
<ul style="font-size: 65%">
<li>In the past rebalancing was "stop the world"; with Kafka 2.4 <em>incremental cooperative rebalancing</em>
was introduced to reduce consumption downtime
</li>
<li>
<code>partition.assignment.strategy</code> – a list of class names, ordered by preference,
of supported partition assignment strategies to distribute partition ownership (default: <code>[RangeAssignor, CooperativeStickyAssignor]</code>)
</li>
<li>
I.e. the default config (Kafka 3.3) uses <code>RangeAssignor</code>, but allows upgrading to
<code>CooperativeStickyAssignor</code> with just a single rolling bounce (removing <code>RangeAssignor</code> from the list)
(see also <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248">KIP-726</a> to change the default)
</li>
<li>
For newer clusters <code>CooperativeStickyAssignor</code> should be preferred to get cooperative rebalancing
</li>
<li>
A nice explanation / visualization of different strategies can be found in <a href="https://chrzaszcz.dev/2021/09/kafka-assignors/">this blog post</a>
</li>
</ul>
</section>
<section>
<h3>group management: consumer liveness</h3>
<ul style="font-size: 65%">
<li>Each member in the group must send heartbeats to the coordinator to indicate its liveness.
If no heartbeats are received before the expiration of the configured session timeout, the coordinator
will remove this client from the group and initiate a rebalance.
</li>
<li class="fragment fade-up">
<code>session.timeout.ms</code> – max time that can pass without a heartbeat (default: 45 sec,
increased from 10 sec in 3.0, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout">KIP-735</a>)
<div class="fragment fade-up" style="margin-top: 0.3em; margin-bottom: 0.3em; font-size: 94%; background-color: darkslategrey">
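⚠️ Make sure to <code>close()</code> the consumer on shutdown to keep processing latency low: it then leaves the group right away, instead of its partitions staying unprocessed until the session timeout expires!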
</div>
</li>
<li class="fragment fade-up">
<code>heartbeat.interval.ms</code> – time between heartbeats sent to the consumer coordinator;
must be &lt; <code>session.timeout.ms</code>, recommended to be at most 1/3 of it;
also used for discovering that a rebalance is in progress
(default: 3 sec)
</li>
<li class="fragment fade-up">
<code>max.poll.interval.ms</code> – max delay between invocations of <code>poll()</code>.
If exceeded, the consumer is considered failed and the group will rebalance (default: 5 min)
</li>
<li class="fragment fade-up">
<code>max.poll.records</code> – max number of records that <code>poll()</code> will return;
because the consumer will only join a rebalance inside <code>poll()</code>,
reducing this config reduces the delay of a group rebalance (default: 500)
</li>
</ul>
<aside class="notes">
<ul>
<li><code>session.timeout.ms</code> was increased from 10 to 45 sec with Kafka 3 /
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout">KIP-735</a>
to be more tolerant to temporary/spurious member failures
</li>
</ul>
</aside>
</section>
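<section>
<h3>sketch: checking liveness settings</h3>
<p style="font-size: 60%">
A small sketch of two sanity checks derived from the previous slide: the heartbeat interval against the session timeout,
and the average time a record may take before <code>max.poll.interval.ms</code> is exceeded, assuming
<code>poll()</code> returns a full batch of <code>max.poll.records</code>. The class and method names are hypothetical.
</p>

```java
final class LivenessConfig {
    // heartbeat.interval.ms is recommended to be at most a third of session.timeout.ms.
    static boolean heartbeatWithinThird(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return sessionTimeoutMs >= heartbeatIntervalMs * 3;
    }

    // Average processing time per record that still fits into max.poll.interval.ms.
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        // Defaults from the previous slide: 3 sec heartbeat, 45 sec session timeout, 5 min, 500 records
        System.out.println("heartbeat ok: " + heartbeatWithinThird(3_000, 45_000));
        System.out.println("budget per record (ms): " + perRecordBudgetMs(300_000, 500));
    }
}
```

<p style="font-size: 60%">If processing a record regularly takes longer than this budget, lowering <code>max.poll.records</code> is one option.</p>
</section>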
<section>
<h3>that's the consumer (?)</h3>
</section>
</section>
<section>
<section>
<h2>transactional messaging</h2>
</section>
<section>
<h3>transactional messaging "use cases"</h3>
<ol style="font-size: 90%">
<li>Atomic writes across multiple partitions: consumers see all or nothing</li>
<li>
Exactly-once processing in "read-process-write" cycles <em>from</em> Kafka topics <em>to</em> Kafka topics,
committing offsets atomically
</li>
<li>
Exactly-once processing & zombie fencing: if a "zombie instance" (unavailable/dangling) is "replaced" with a new one,
input topics could be processed by both instances, causing duplicates in output topics
</li>
</ol>
<aside class="notes">
<ol style="font-size: 85%">
<li>
Atomic writes across multiple partitions: send a batch of messages to multiple partitions such that
either all messages in the batch are eventually visible to any consumer or none are ever visible to consumers
</li>
<li>
zombie fencing: a zombie instance is e.g. a producer that's temporarily partitioned from the brokers
or undergoing an application pause.
</li>
</ol>
</aside>
</section>
<section>
<h3>transactional producer</h3>
<pre style="width: 100%">
<code data-trim data-noescape>
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("transactional.id", "producer-1");
Producer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(props);
producer.initTransactions();
// ...
producer.beginTransaction();
// ...
producer.commitTransaction();
</code>
</pre>
<ul style="font-size: 75%">
<li>When the <code>transactional.id</code> is specified, all messages sent by the producer <em>must</em> be part of a transaction</li>
<li>There can be only one open transaction per producer</li>
</ul>
</section>
<section>
<h3>1) atomic writes across partitions</h3>
<pre style="width: 100%">
<code data-trim data-noescape>
try {
producer.beginTransaction();
producer.send(topic1Record);
producer.send(topic2Record);
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
// We can't recover, so our only option is to close and exit
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again
producer.abortTransaction();
}
</code>
</pre>
<p style="font-size: 75%">
All messages sent between <code>beginTransaction()</code> and <code>commitTransaction()</code> will be part of a single transaction
</p>
</section>
<section>
<h3>1.1) Reading Transactional Messages</h3>
<ul style="font-size: 85%">
<li>
For atomic writes across partitions to work (from consumer perspective),
consumer <code>isolation.level</code> must be set to <code>read_committed</code> (default: <code>read_uncommitted</code>):
<code>poll()</code> will only return transactional messages which have been committed
</li>
<li>If set to <code>read_uncommitted</code>, <code>poll()</code> will return <em>all</em> messages, even transactional
messages which have been aborted</li>
<li>Non-transactional messages will be returned unconditionally in either mode</li>
</ul>
</section>
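<section>
<h3>sketch: read_committed consumer config</h3>
<p style="font-size: 60%">
A sketch of consumer properties for reading only committed transactional messages, as described on the previous slide.
Only the settings discussed in this deck are shown (a real consumer also needs <code>bootstrap.servers</code> and deserializers);
the class and method names are hypothetical.
</p>

```java
import java.util.Properties;

final class ReadCommittedConfig {
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // default is read_uncommitted, which also returns aborted transactional messages
        props.setProperty("isolation.level", "read_committed");
        // only needed when offsets are committed by the application or through a transaction
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        consumerProps("test").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

</section>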
<section>
<h3>2) committing offsets in "read-process-write"</h3>
<pre style="width: 100%; font-size: 53%">
<code data-trim data-noescape>
KafkaConsumer&lt;String, String&gt; c = new KafkaConsumer&lt;&gt;(consProps);
KafkaProducer&lt;String, String&gt; p = new KafkaProducer&lt;&gt;(prodProps);
p.initTransactions();
while (true) {
  ConsumerRecords&lt;String, String&gt; inputs = c.poll(Duration.ofMillis(100));
  p.beginTransaction();
  process(inputs).forEach(p::send);
  // returns void; the offsets are committed together with the transaction
  p.sendOffsetsToTransaction(
    offsets(inputs), c.groupMetadata()
  );
  p.commitTransaction();
}
</code>
</pre>
<ul style="font-size: 58%">
<li>
<code>sendOffsetsToTransaction</code> – sends a list of specified offsets to the consumer group coordinator,
and also marks those offsets as part of the current transaction</li>
<li>
The committed offset should be the next message your application will consume, i.e. <code>lastProcessedMessageOffset + 1</code>
</li>
<li><em>Note:</em> the consumer should have set <code>enable.auto.commit=false</code></li>
<li>These offsets will be committed only if the transaction is committed</li>
</ul>
</section>
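<section data-markdown>
<textarea data-template>
### 2) which offset to commit?

The `lastProcessedMessageOffset + 1` rule from the previous slide can be sketched with plain Java collections. This is a simplified model: `Rec` is a hypothetical stand-in for a consumed record, and partitions are keyed by a `topic-partition` string, whereas the real `sendOffsetsToTransaction` takes a `Map<TopicPartition, OffsetAndMetadata>`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NextOffsetSketch {

    // Hypothetical stand-in for a consumed record: only the fields needed here.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each topic-partition, returns the offset to commit:
    // the highest processed offset + 1, i.e. the next message to consume.
    static Map<String, Long> offsetsToCommit(List<Rec> processed) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : processed) {
            String tp = r.topic + "-" + r.partition;
            next.merge(tp, r.offset + 1, Long::max);
        }
        return next;
    }
}
```

Committing the last processed offset itself (without `+ 1`) would cause that message to be consumed again after a restart.
</textarea>
</section>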
<section>
<h3>3) exactly-once processing & zombie fencing</h3>
<pre style="width: 100%">
<code data-trim data-noescape>
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("transactional.id", stableUniqueId);
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
// ...
</code>
</pre>
<ul style="font-size: 65%">
<li>
<code>transactional.id</code> – a unique id for transactional delivery that provides
continuity of transactional state across application restarts. Since zombie instances (e.g. a producer
cut off by a network partition) have to be expected, exactly-once semantics require the
<code>transactional.id</code> to stay stable across multiple producer sessions
</li>
<li>
<code>initTransactions()</code> – Ensures any transactions initiated by previous instances of the
producer with the same <code>transactional.id</code> are completed. If the previous instance had failed
with a transaction in progress, it will be aborted. If the last transaction had begun completion, but not
yet finished, this method awaits its completion
</li>
</ul>
</section>
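<section data-markdown>
<textarea data-template>
### 3) zombie fencing, toy model

To illustrate why a stable `transactional.id` matters, here is a toy model of fencing. It is not Kafka's implementation: the `Coordinator` class and its methods are hypothetical, and the real protocol also involves producer ids assigned by the transaction coordinator. The idea it models is that every `initTransactions()` call for the same id bumps an epoch, and writes from an older epoch are rejected.

```java
import java.util.HashMap;
import java.util.Map;

final class EpochFencingSketch {

    // Toy coordinator: remembers the latest epoch handed out per transactional.id.
    static final class Coordinator {
        private final Map<String, Integer> latestEpoch = new HashMap<>();

        // Models initTransactions(): registers the id and bumps its epoch.
        // The first session gets epoch 1, the next one epoch 2, and so on.
        int init(String transactionalId) {
            return latestEpoch.merge(transactionalId, 1, Integer::sum);
        }

        // A write is accepted only from the newest epoch; older sessions are fenced.
        boolean accepts(String transactionalId, int epoch) {
            return latestEpoch.getOrDefault(transactionalId, 0) == epoch;
        }
    }
}
```

In this model a restarted instance that reuses the id fences its predecessor, while an instance with a fresh random id would not. With the real client, a fenced producer fails with a `ProducerFencedException`.
</textarea>
</section>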
<section>
<h3>that's transactional messaging (?)</h3>
<div style="font-size: 70%; background-color: darkslategrey">
⚠️ <em>Disclaimer</em>: this was a high-level overview. Before using transactions, dig deeper
to see how they fit your application, workload and operational model!<br/>
If used incorrectly, you might not get the guarantees you expect...
<p>
Usually a more robust approach is to use <em>at-least-once</em> semantics and let consumers deduplicate.
</p>
</div>
<p>Further reading</p>
<ul style="font-size: 70%;">
<li>
<a href="https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/">
Exactly-Once Semantics Are Possible: Here’s How Kafka Does It
</a> (06/2017)
</li>
<li>
<a href="https://www.confluent.io/blog/transactions-apache-kafka/">
Transactions in Apache Kafka
</a> (11/2017)
</li>
<li>
<a href="https://tgrez.github.io/posts/2019-04-13-kafka-transactions.html">
When Kafka transactions might fail
</a> (04/2019)
</li>
<li>
<a href="https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/">
Improved Robustness and Usability of Exactly-Once Semantics in Apache Kafka
</a> (07/2020)
</li>
<li>
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">
KIP-447: Producer scalability for exactly once semantics
</a> (05/2022)
</li>
</ul>
</section>
</section>
<section>
<section>
<div style="margin-top: 30%"></div>
<h3>the end</h3>
<div style="margin-top: 30%">
<small>p.s. if you found a potential improvement please submit a pull request <a href="https://github.com/inoio/slides-kafka-fundamentals/">here</a></small>
</div>
</section>
<section>
<h3>Further Reading</h3>
<p><small>A collection of links that could be of interest</small></p>
<ul style="font-size: 70%;">
<li>
<a href="https://developers.redhat.com/articles/2022/05/03/fine-tune-kafka-performance-kafka-optimization-theorem">
Fine-tune Kafka performance with the Kafka optimization theorem
</a>
</li>
<li>
<a href="https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/">
How to Choose the Number of Topics/Partitions in a Kafka Cluster?
</a>
</li>
<li><a href="https://strimzi.io/blog/2020/10/15/producer-tuning/">Optimizing Kafka producers</a></li>
<li><a href="https://strimzi.io/blog/2021/01/07/consumer-tuning/">Optimizing Kafka consumers</a></li>
</ul>
</section>
</section>
</div>
</div>
<script src="dist/reveal.js"></script>
<script src="plugin/notes/notes.js"></script>
<script src="plugin/markdown/markdown.js"></script>
<script src="plugin/highlight/highlight.js"></script>
<script>
// More info about initialization & config:
// - https://revealjs.com/initialization/
// - https://revealjs.com/config/
Reveal.initialize({
hash: true,
slideNumber: 'c/t',
// Learn about plugins: https://revealjs.com/plugins/
plugins: [ RevealMarkdown, RevealHighlight, RevealNotes ]
});
</script>
</body>
</html>