What is Event Streaming?
Event streaming acts as a business's digital central nervous system, capturing real-time data from multiple sources for further processing and analysis.
What is Event Streaming Used For?
Event streaming is used across industries for payments, logistics tracking, IoT data analytics, customer communication, healthcare analytics, data integration, and more.
Apache Kafka is an event streaming platform. What does that mean?
- Kafka allows you to publish and subscribe to event streams.
- It preserves these streams reliably for as long as necessary.
- You can process streams in real time or retrospectively.
- Kafka is distributed, scalable, fault tolerant, and secure.
- It works across platforms and can be monitored or fully managed by vendors.
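To make the publish/subscribe idea concrete, here is a minimal sketch using the confluent_kafka Python client. The broker address (localhost:9092) and the payments topic are placeholders for illustration, not part of the setup described later in this post.

from confluent_kafka import Producer, Consumer

# Publish an event to a topic
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('payments', key=b'order-42', value=b'{"amount": 99.5}')
producer.flush()

# Subscribe to the same topic and process events as they arrive
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'payments-processor',
    'auto.offset.reset': 'earliest',  # also read events stored before this consumer started
})
consumer.subscribe(['payments'])
msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    print(msg.key(), msg.value())
consumer.close()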
How does Kafka work?
Kafka is a distributed system with servers and clients communicating via high-performance TCP. It can run on various platforms like bare-metal, VMs, or containers in on-premise or cloud setups.
- Servers: Form a cluster across datacenters or cloud regions, with some acting as brokers for storage.
- Clients: Enable building distributed apps and microservices to read, write, and process events at scale, even handling faults or network issues.
- Broker: A server running Kafka that connects different services.
- Event: Messages in Kafka stored as bytes on the broker’s disk.
- Producer and Consumer: Services that send or receive events in Kafka.
- Topic: A way to categorize events, like folders in a file system.
- Partition: A topic is split into partitions so its data can be spread across brokers and processed in parallel.
- Replication Factor: The number of copies of each partition kept across the cluster for fault tolerance.
- Offset: Tracks which events a consumer has already read. (For an illustration of a producer writing to a topic with 3 partitions, see the sketch after this list.)
- Zookeeper: Manages Kafka cluster status, permissions, and offsets.
- Consumer Group: A group of consumers that work together to read from topics.
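To show how topics, partitions, and the replication factor fit together, here is a minimal sketch using confluent_kafka's AdminClient. The bootstrap address and the demo-topic name are placeholders; adjust them to your own cluster.

from confluent_kafka.admin import AdminClient, NewTopic

# Connect to the cluster (placeholder address)
admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# A topic with 3 partitions, each replicated to 3 brokers (replication factor 3)
futures = admin.create_topics([NewTopic('demo-topic', num_partitions=3, replication_factor=3)])
for topic, future in futures.items():
    try:
        future.result()  # block until the brokers confirm creation
        print(f'Created topic {topic}')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')

# Each partition has a leader broker and a set of replica brokers
metadata = admin.list_topics(topic='demo-topic', timeout=10)
for pid, p in metadata.topics['demo-topic'].partitions.items():
    print(f'partition {pid}: leader=broker {p.leader}, replicas={p.replicas}')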
Configuring a production-ready Kafka cluster
Step-1: Add the Strimzi repo and install the Strimzi operator using Helm. The kafka namespace must already exist; create it with kubectl create namespace kafka or pass --create-namespace to helm install.
helm repo add strimzi https://strimzi.io/charts/
helm install strimzi-operator strimzi/strimzi-kafka-operator -n kafka
Step-2: Configure the Kafka cluster
Node affinity is used here to run all Kafka pods on a specific node pool.
kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
  labels:
    app: kafka
    environment: prod-lke
spec:
  kafka:
    version: 3.6.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
      - name: external
        port: 9094
        type: loadbalancer
        tls: false
        authentication:
          type: scram-sha-512
    resources:
      requests:
        memory: "16Gi"
        cpu: "2"
      limits:
        memory: "16Gi"
        cpu: "4"
    template:
      podDisruptionBudget:
        maxUnavailable: 0
      pod:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: lke.linode.com/pool-id
                      operator: In
                      values:
                        - "238182"
        tolerations:
          - key: "kubernetes.azure.com/scalesetpriority"
            operator: "Equal"
            value: "spot"
            effect: "NoSchedule"
    config:
      default.replication.factor: 3
      min.insync.replicas: 2
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
      zookeeper.connection.timeout.ms: 6000
    jmxOptions: {}
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 25Gi
          deleteClaim: false
          class: linode-block-storage-retain
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 25Gi
      class: linode-block-storage-retain
      deleteClaim: false
    resources:
      requests:
        memory: "4Gi"
        cpu: "1"
      limits:
        memory: "4Gi"
        cpu: "2"
    jvmOptions:
      -Xms: 2048m
      -Xmx: 2048m
    template:
      podDisruptionBudget:
        maxUnavailable: 0
      pod:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: lke.linode.com/pool-id
                      operator: In
                      values:
                        - "238182"
  entityOperator:
    topicOperator:
      resources:
        requests:
          memory: "256Mi"
          cpu: "200m"
        limits:
          memory: "256Mi"
          cpu: "1"
    userOperator:
      resources:
        requests:
          memory: "512Mi"
          cpu: "200m"
        limits:
          memory: "512Mi"
          cpu: "1"
    template:
      pod:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: lke.linode.com/pool-id
                      operator: In
                      values:
                        - "238182"
kubectl apply -f kafka-cluster.yaml -n kafka
Step-3: Deploy Kafka UI, a monitoring tool that gives more insight into your Kafka cluster.
kafka-ui.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-ui-deployment
  labels:
    app: kafka-ui
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-ui
  template:
    metadata:
      labels:
        app: kafka-ui
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: lke.linode.com/pool-id
                    operator: In
                    values:
                      - "238182"
      containers:
        - name: kafka-ui
          image: provectuslabs/kafka-ui:latest
          env:
            - name: KAFKA_CLUSTERS_0_NAME
              value: "my-cluster"
            - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
              value: my-cluster-kafka-bootstrap.kafka.svc:9092
            - name: AUTH_TYPE
              value: "LOGIN_FORM"
            - name: SPRING_SECURITY_USER_NAME
              value: "admin"
            - name: SPRING_SECURITY_USER_PASSWORD
              value: "pass"
          imagePullPolicy: Always
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "1024Mi"
              cpu: "1000m"
          ports:
            - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-ui-service
spec:
  selector:
    app: kafka-ui
  ports:
    - protocol: TCP
      port: 8080
  type: LoadBalancer
kubectl apply -f kafka-ui.yaml -n kafka
Output after applying the above files:
kubectl get pods -n kafka
NAME                                          READY   STATUS    RESTARTS      AGE
my-cluster-entity-operator-746bcbb686-8hz2f   2/2     Running   1 (25h ago)   25h
my-cluster-kafka-0                            1/1     Running   0             25h
my-cluster-kafka-1                            1/1     Running   0             25h
my-cluster-kafka-2                            1/1     Running   0             26h
my-cluster-zookeeper-0                        1/1     Running   0             25h
my-cluster-zookeeper-1                        1/1     Running   0             25h
my-cluster-zookeeper-2                        1/1     Running   0             25h
kafka-ui-deployment-54585c7476-rjg86          1/1     Running   0             25h
strimzi-cluster-operator-56587547d6-5phmm     1/1     Running   0             25h
Step-4: Create an Ingress to access Kafka UI on a specific domain
ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: kafka-ingress
  annotations:
    kubernetes.io/ingress.class: nginx
  labels:
    app: kafka-ingress
    environment: prod-lke
spec:
  rules:
    - host: kafkaui.xyz.com   # replace with your domain
      http:
        paths:
          - pathType: Prefix
            path: "/"
            backend:
              service:
                name: kafka-ui-service
                port:
                  number: 8080
kubectl apply -f ingress.yaml -n kafka
You can now access kafkaui.xyz.com (your domain) with the username and password set in kafka-ui.yaml (kindly change these defaults): username: admin, password: pass.
Note: With the above Kafka UI configuration, the admin user has full administrative access to your Kafka cluster.
Step-5: Create a KafkaUser that can access your Kafka cluster.
kafkauser.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: shristi
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
kubectl apply -f kafkauser.yaml -n kafka
Now you can view your cluster details, external bootstrap address, and user credentials with the commands below:
kubectl describe kafka my-cluster -n kafka
kubectl get svc my-cluster-kafka-external-bootstrap -n kafka -o=jsonpath='{.status.loadBalancer.ingress[0].hostname}:{.spec.ports[0].port}'
---- OUTPUT ----
172-232-86-103.ip.linodeusercontent.com:9094
kubectl get secret shristi -n kafka -o jsonpath='{.data.sasl\.jaas\.config}' | base64 -d
---- OUTPUT ----
org.apache.kafka.common.security.scram.ScramLoginModule required username="shristi" password="PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21";
Now create a test producer and a test consumer to verify your Kafka cluster. Example: Producer.py
from confluent_kafka import Producer, KafkaError
import json
import time

# Kafka broker configuration
bootstrap_servers = "172-232-86-103.ip.linodeusercontent.com:9094"
sasl_plain_username = "shristi"
sasl_plain_password = "PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21"
max_message_size = 10000  # Example: Set the maximum message size in bytes

# Create producer configuration
conf = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': sasl_plain_username,
    'sasl.password': sasl_plain_password
}

# Create a Kafka producer instance
producer = Producer(**conf)

# Define the topic to produce messages to
topic = 'coto_mysql.frontend_prod_db.Category'

# Track message offsets and timestamps
message_offsets = {}
message_timestamps = {}

# Callback function to handle message delivery reports from the Kafka broker
def delivery_report(err, msg):
    if err is not None:
        if err.code() == KafkaError._TRANSPORT:
            print("Kafka broker connection issue detected. Possible downtime.")
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
        message_offsets[msg.key().decode('utf-8')] = msg.offset()
        message_timestamps[msg.key().decode('utf-8')] = time.time()

# Function to partition messages if they exceed the maximum message size
def produce_messages(messages):
    partitioned_messages = []
    current_message = {'key': '', 'value': ''}
    for i, message in enumerate(messages):
        if len(json.dumps(message).encode('utf-8')) > max_message_size:
            print(f"Message {i+1} exceeds maximum message size. It won't be sent.")
            continue
        if len(json.dumps(current_message).encode('utf-8')) + len(json.dumps(message).encode('utf-8')) <= max_message_size:
            current_message['key'] += f" {message['key']}"
            current_message['value'] += f" {message['value']}"
        else:
            partitioned_messages.append(current_message)
            current_message = {'key': message['key'], 'value': message['value']}
    partitioned_messages.append(current_message)
    return partitioned_messages

# Produce messages continuously
while True:
    messages = [{'key': f'key_{i}', 'value': f'value_{i}'} for i in range(10)]
    partitioned_messages = produce_messages(messages)
    for message in partitioned_messages:
        producer.produce(topic, key=json.dumps(message['key']).encode('utf-8'), value=json.dumps(message['value']).encode('utf-8'), callback=delivery_report)
    # Flush any remaining messages in the producer buffer
    producer.flush()
    # Wait for a while before producing the next set of messages
    time.sleep(5)  # Adjust the time interval as needed
Run python3 Producer.py
Consumer.py
from confluent_kafka import Consumer, KafkaError, KafkaException

conf = {
    'bootstrap.servers': "172-232-86-103.ip.linodeusercontent.com:9094",
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest',  # Start consuming from the earliest message
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'shristi',
    'sasl.password': 'PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21',
    'security.protocol': 'SASL_PLAINTEXT'
}

consumer = Consumer(conf)
topic = 'test-topic'
consumer.subscribe([topic])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition
                print('%% %s [%d] reached end at offset %d\n' %
                      (msg.topic(), msg.partition(), msg.offset()))
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            # Proper message
            print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    pass
finally:
    # Clean up on exit
    consumer.close()
Run python3 Consumer.py
Observe the consumer lag in Kafka UI. You can also check lag programmatically, as in the sketch below.
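Here is a minimal sketch that computes lag per partition by comparing each partition's committed offset with its end offset. It reuses the same consumer group, credentials, and test-topic from Consumer.py above; adjust them for your environment.

from confluent_kafka import Consumer, TopicPartition

# Same connection settings as Consumer.py above
conf = {
    'bootstrap.servers': "172-232-86-103.ip.linodeusercontent.com:9094",
    'group.id': 'my_consumer_group',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'shristi',
    'sasl.password': 'PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21',
    'security.protocol': 'SASL_PLAINTEXT'
}

consumer = Consumer(conf)
topic = 'test-topic'

# Discover the topic's partitions, then compare committed offsets with end offsets
metadata = consumer.list_topics(topic, timeout=10)
partitions = [TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
for tp in consumer.committed(partitions, timeout=10):
    low, high = consumer.get_watermark_offsets(tp, timeout=10)
    committed = tp.offset if tp.offset >= 0 else low  # no commit yet: assume start of partition
    print(f'partition {tp.partition}: committed={committed}, end={high}, lag={high - committed}')

consumer.close()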

Note: I’ve hardcoded values in the code above. However, by keeping the code in a GitHub repository and using GitHub Actions with environment secrets and variables, you can configure the entire Kafka cluster dynamically through a workflow. https://github.com/shristiopstree/Kafka
Stress Test Your Kafka Cluster
To verify that Kafka can handle the expected load and sustain performance under high traffic, you need to stress test your Kafka cluster. Here is a generic way of doing it:
- Define test scenarios: for instance, different message sizes, network latencies, or combinations of producer and consumer configurations.
- Prepare the test environment: Create a test environment that closely imitates production, including hardware, network configuration, and Kafka cluster configuration, and make sure there are enough resources to simulate the expected load.
- Generate load: Use Apache Kafka's built-in tools such as kafka-producer-perf-test.sh and kafka-consumer-perf-test.sh, or a third-party tool like JMeter, Gatling, or custom scripts (see the sketch after this list). Configure these tools to mimic real-world situations based on the defined test scenarios.
- Scale up: If your cluster doesn't meet the performance requirements, consider scaling it up by adding more brokers, increasing the number of partitions, or tuning Kafka configuration parameters such as num.io.threads, num.network.threads, and log.segment.bytes.
- Simulate failure scenarios (for example, killing a broker pod) to verify data reliability and availability.
- Run extended tests to uncover stability and performance problems that only appear over time.
- Stress test your security setup (authentication, authorization, encryption) as well.
- Combine these methods to evaluate robustness, scalability, and performance comprehensively.
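As a starting point for custom load generation, here is a minimal Python sketch that measures producer throughput against the cluster. The broker address and credentials are the placeholders used earlier in this post, and stress-test-topic is a hypothetical topic created for testing; adjust all of them for your environment.

import time
from confluent_kafka import Producer

# Placeholder connection details -- reuse the bootstrap address and KafkaUser
# credentials from the steps above, and point at a topic created for testing.
conf = {
    'bootstrap.servers': "172-232-86-103.ip.linodeusercontent.com:9094",
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'shristi',
    'sasl.password': 'PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21',
    'linger.ms': 5,               # allow small batches to form for higher throughput
    'batch.num.messages': 10000,
}

producer = Producer(conf)
topic = 'stress-test-topic'       # hypothetical test topic
num_records = 100000
payload = b'x' * 1024             # 1 KiB per message
delivered = 0

def on_delivery(err, msg):
    global delivered
    if err is None:
        delivered += 1

start = time.time()
for _ in range(num_records):
    while True:
        try:
            producer.produce(topic, value=payload, callback=on_delivery)
            break
        except BufferError:
            producer.poll(0.5)    # local queue is full: wait for in-flight deliveries
producer.flush()

elapsed = time.time() - start
print(f'{delivered} messages in {elapsed:.1f}s '
      f'({delivered / elapsed:.0f} msg/s, {delivered * len(payload) / elapsed / 1e6:.1f} MB/s)')

Vary the payload size, num_records, and producer settings to match your defined test scenarios, and watch broker CPU, disk, and consumer lag in Kafka UI while the test runs.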