AMQ Streams Demo (Kafka on Openshift)
You need a Kafka cluster running on OpenShift.
Creating a topic first_topic
oc exec -it my-cluster-zookeeper-0 -c zookeeper -- bin/kafka-topics.sh --zookeeper localhost:21810 --topic first_topic --create --partitions 3 --replication-factor 3
List all topics
oc exec -it my-cluster-zookeeper-0 -c zookeeper -- bin/kafka-topics.sh --zookeeper localhost:21810 --list
Describe topic first_topic
oc exec -it my-cluster-zookeeper-0 -c zookeeper -- bin/kafka-topics.sh --zookeeper localhost:21810 --topic first_topic --describe
Creating a topic to be deleted
oc exec -it my-cluster-zookeeper-0 -c zookeeper -- bin/kafka-topics.sh --zookeeper localhost:21810 --topic second_topic --create --partitions 3 --replication-factor 3
List the topics
oc exec -it my-cluster-zookeeper-0 -c zookeeper -- bin/kafka-topics.sh --zookeeper localhost:21810 --list
Deleting the topic
oc exec -it my-cluster-zookeeper-0 -c zookeeper -- bin/kafka-topics.sh --zookeeper localhost:21810 --topic second_topic --delete
oc run kafka-producer -ti --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic first_topic
oc run kafka-producer -ti --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic first_topic --producer-property acks=all
oc run kafka-producer -ti --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic new-topic
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic first_topic
Open another terminal, and run:
oc run kafka-producer -ti --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic first_topic
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic first_topic --from-beginning
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic first_topic --group my-first-application
Open another terminal, and run the producer:
oc run kafka-producer -ti --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic first_topic
Using another terminal, open another consumer
oc run kafka-consumer2 -ti --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic first_topic --group my-first-application
Note that when there are two consumers in the same group, messages are load-balanced between them. The balancing is based on the topic's partitions, so the number of partitions limits how many consumers in a group can receive messages.
oc run kafka-consumer3 -ti --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic first_topic --group my-first-application
Run the producer:
oc run kafka-producer -ti --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic first_topic
Run the consumer with a new consumer group:
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic first_topic --group my-second-application --from-beginning
When we run the consumer, we will see all messages from topic first_topic. Now, stop the consumer and run it again:
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic first_topic --group my-second-application --from-beginning
Even though we have specified --from-beginning, the consumer won't show all the messages again. This happens because the offsets have already been committed to Kafka for group my-second-application.
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --describe --group my-first-application
Now run a consumer:
oc run kafka-consumer2 -ti --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic first_topic --group my-first-application
And execute again the describe command:
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --describe --group my-first-application
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic
Now execute the consumer:
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic first_topic --group my-first-application
Then you will see all messages from topic first_topic
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --describe --group my-first-application
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --group my-first-application --reset-offsets --shift-by 2 --execute --topic first_topic
oc run kafka-consumer -ti --rm=true --restart=Never -- bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first_topic
oc exec -it my-cluster-zookeeper-0 -c zookeeper -- bin/kafka-topics.sh --zookeeper localhost:21810 --topic prices --create --partitions 3 --replication-factor 3
mkdir /tmp/demo && cd /tmp/demo
mvn io.quarkus:quarkus-maven-plugin:0.15.0:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=kafka-quickstart \
-Dextensions="reactive-kafka"
code .
mkdir -p src/main/java/org/acme/kafka/quickstart
cat <<EOF > src/main/java/org/acme/kafka/quickstart/PriceGenerator.java
package org.acme.kafka.quickstart;

import io.reactivex.Flowable;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * A bean producing random prices every 5 seconds.
 * The prices are written to a Kafka topic (prices). The Kafka configuration is specified in the application configuration.
 */
@ApplicationScoped
public class PriceGenerator {

    private Random random = new Random();

    @Outgoing("generated-price")
    public Flowable<Integer> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
                .map(tick -> random.nextInt(100));
    }
}
EOF
cat <<EOF > src/main/java/org/acme/kafka/quickstart/PriceConverter.java
package org.acme.kafka.quickstart;

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

/**
 * A bean consuming data from the "prices" Kafka topic and applying some conversion.
 * The result is pushed to the "my-data-stream" stream which is an in-memory stream.
 */
@ApplicationScoped
public class PriceConverter {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("prices")
    @Outgoing("my-data-stream")
    @Broadcast
    public double process(int priceInUsd) {
        return priceInUsd * CONVERSION_RATE;
    }
}
EOF
cat <<EOF > src/main/java/org/acme/kafka/quickstart/PriceResource.java
package org.acme.kafka.quickstart;

import io.smallrye.reactive.messaging.annotations.Stream;
import org.reactivestreams.Publisher;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

/**
 * A simple resource retrieving the in-memory "my-data-stream" and sending the items to a server sent event.
 */
@Path("/prices")
public class PriceResource {

    @Inject
    @Stream("my-data-stream") Publisher<Double> prices;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<Double> stream() {
        return prices;
    }
}
EOF
cat <<EOF > src/main/resources/application.properties
# Configure the Kafka sink (we write to it)
# Configure the Kafka source (we read from it)
cat <<EOF > src/main/resources/META-INF/resources/prices.html
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<link rel="stylesheet" type="text/css"
<link rel="stylesheet" type="text/css"
<div class="container">
<h2>Last price</h2>
<div class="row">
<p class="col-md-12">The last price is <strong><span id="content">N/A</span> €</strong>.</p>
<script src=""></script>
var source = new EventSource("/prices/stream");
source.onmessage = function (event) {
document.getElementById("content").innerHTML = event.data;
Now execute the application:
./mvnw package -DskipTests
And deploy to Openshift
# Import image stream
oc import-image redhat-openjdk-18/openjdk18-openshift --confirm -n myproject
# Create build
oc new-build --name=quarkus-kafka \
--image-stream=openjdk18-openshift \
--env="JAVA_APP_JAR=kafka-quickstart-1.0-SNAPSHOT-runner.jar" \
--binary=true -n myproject
# Start Build
oc start-build quarkus-kafka --from-file=./target -n myproject
# Create app
oc new-app quarkus-kafka -n myproject
# Expose
oc expose svc/quarkus-kafka -n myproject
# Open now this url
oc get --no-headers route -n myproject | awk '{ print $2}'