Deploying a Production-Ready Kafka Cluster on Kubernetes with Strimzi

What is Event Streaming?

Event streams are similar to digital root systems in business, capturing real-time data from multiple sources for further processing and analysis.

Event Streaming is Used For?

Event streaming is used in a variety of industries for payments, logistics tracking, IoT data analytics, customer communication, healthcare analytics, data integration, and more

Apache Kafka is an event streaming platform. What does that mean?

  • Kafka allows you to publish and subscribe to event streams.
  • It preserves these streams reliably for as long as necessary.
  • You can process streams in real time or in retroactively.
  • Kafka is distributed, scalable, fault tolerant, and secure.
  • It works across platforms and can be monitored or fully managed by vendors.

How does Kafka work?

Kafka is a distributed system with servers and clients communicating via high-performance TCP. It can run on various platforms like bare-metal, VMs, or containers in on-premise or cloud setups.

  • Servers: Form a cluster across datacenters or cloud regions, with some acting as brokers for storage.
  • Clients: Enable building distributed apps and microservices to read, write, and process events at scale, even handling faults or network issues.
  • Broker: A server running Kafka that connects different services.
  • Event: Messages in Kafka stored as bytes on the broker’s disk.
  • Producer and Consumer: Services that send or receive events in Kafka.
  • Topic: A way to categorize events, like folders in a file system.
  • Partition: Parts of a topic used to handle data more efficiently.
  • Replication Factor: Copies of partitions for backup in the Kafka cluster.
  • Offset: Tracks which events a consumer has already read . A producer producing messages to a kafka topic with 3 partitions would look like this:
  • Zookeeper: Manages Kafka cluster status, permissions, and offsets.
  • Consumer Group: A group of consumers that work together to read from topics.

[ Are you looking: Kubernetes Implementation ]

Configuring a Kafka production-ready cluster

Step-2 Configuring kafka cluster

helm repo add strimzi https://strimzi.io/charts/


helm install strimzi-operator strimzi/strimzi-kafka-operator -n kafka

Here node affinity is used to run all kafka pods on a specific node pool.

kafka-cluster.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
  labels:
    app: kafka
    environment: prod-lke
spec:
  kafka:
    version: 3.6.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
      - name: external
        port: 9094
        type: loadbalancer
        tls: false
        authentication:
          type: scram-sha-512
    resources:
      requests:
        memory: "16Gi"
        cpu: "2"
      limits:
        memory: "16Gi"
        cpu: "4"
    template:
      podDisruptionBudget:
        maxUnavailable: 0
      pod:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: lke.linode.com/pool-id
                      operator: In
                      values:
                        - "238182"   

        tolerations:
          - key: "kubernetes.azure.com/scalesetpriority"
            operator: "Equal"
            value: "spot"
            effect: "NoSchedule"
    config:
      default.replication.factor:                3
      min.insync.replicas:                       2
      offsets.topic.replication.factor:          3
      transaction.state.log.min.isr:             2
      transaction.state.log.replication.factor:  3
      zookeeper.connection.timeout.ms:           6000
    jmxOptions: {}
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 25Gi
          deleteClaim: false
          class: linode-block-storage-retain 
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 25Gi 
      class: linode-block-storage-retain 
      deleteClaim: false
    resources:
      requests:
        memory: "4Gi"
        cpu: "1"
      limits:
        memory: "4Gi"
        cpu: "2"
    jvmOptions:
      -Xms: 2048m
      -Xmx: 2048m
    template:
      podDisruptionBudget:
        maxUnavailable: 0
      pod:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: lke.linode.com/pool-id
                      operator: In
                      values:
                        - "238182"   
  entityOperator:
    topicOperator:
      resources:
        requests:
          memory: "256Mi"
          cpu: "200m"
        limits:
          memory: "256Mi"
          cpu: "1"
    userOperator:
      resources:
        requests:
          memory: "512Mi"
          cpu: "200m"
        limits:
          memory: "512Mi"
          cpu: "1"
    template:
      pod:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: lke.linode.com/pool-id
                      operator: In
                      values:
                        - "238182"
kubectl apply -f kafka-cluster.yaml -n kafka

[ Also Read: redis maxmemory best practice ]

step:3 – Kafka-ui monitoring tool is deployed which gives more insights about your kafka cluster.

kafka-ui.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-ui-deployment
  labels:
    app: kafka-ui
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-ui
  template:
    metadata:
      labels:
        app: kafka-ui
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: lke.linode.com/pool-id
                operator: In
                values: 
                - "238182"
      containers:
      - name: kafka-ui
        image: provectuslabs/kafka-ui:latest
        env:
        - name: KAFKA_CLUSTERS_0_NAME
          value: "my-cluster"
        - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
          value: my-cluster-kafka-bootstrap.kafka.svc:9092
        - name: AUTH_TYPE
          value: "LOGIN_FORM"
        - name: SPRING_SECURITY_USER_NAME
          value: "admin"  
        - name: SPRING_SECURITY_USER_PASSWORD
          value: "pass"   
        imagePullPolicy: Always
        resources:
          requests:
            memory: "256Mi"
            cpu: "100m"
          limits:
            memory: "1024Mi"
            cpu: "1000m"
        ports:
        - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-ui-service
spec:
  selector:
    app: kafka-ui
  ports:
    - protocol: TCP
      port: 8080  
  type: LoadBalancer  
kubectl apply -f kafka-ui.yaml -n kafka

Output of the above files after applying them


kubectl get pods -n kafka
NAME                                                     READY   STATUS    RESTARTS      AGE
 my-cluster-entity-operator-746bcbb686-8hz2f   2/2     Running   1 (25h ago)   25h
 my-cluster-kafka-0                            1/1     Running   0             25h
 my-cluster-kafka-1                            1/1     Running   0             25h
 my-cluster-kafka-2                            1/1     Running   0             26h
 my-cluster-zookeeper-0                        1/1     Running   0             25h
 my-cluster-zookeeper-1                        1/1     Running   0             25h
 my-cluster-zookeeper-2                        1/1     Running   0             25h
 kafka-ui-deployment-54585c7476-rjg86                     1/1     Running   0             25h
 strimzi-cluster-operator-56587547d6-5phmm                1/1     Running   0             25h

step-4 Created ingress to access kafka-ui on a specific domain

ingress.yaml

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: kafka-ingress
  annotations:
    kubernetes.io/ingress.class: nginx
  labels:
    app: kafka-ingress
    environment: prod-lke
spec:
  rules:
  - host: kafkaui.xyz.com ---> you domain
    http:
      paths:
      - pathType: Prefix
        path: "/"
        backend:
          service:
            name: kafka-ui-service
            port:
              number: 8080

kubectl apply -f ingress.yaml -n kafka

Accessing kafkaui.xyz.com Can access you domain by using username and password (Kindly change the username and password in kafka-ui.yaml) username- admin password- pass

Note:- By using the above kafka-ui configuration, this kafka-ui user admin is having all the admin access of your kafka cluster.

You can find out more about cloud data migration services here.

step-5 Creating kafkauser which can access your kafka cluster.

kafkauser.yaml


apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: shristi
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
kubectl apply -f kafkauser.yaml -n kafka

Now using the below command you can access your cluster details

kubectl describe kafka my-cluster -n kafka


kubectl get svc my-cluster-kafka-external-bootstrap -n kafka -o=jsonpath='{.status.loadBalancer.ingress[0].hostname}:{.spec.ports[0].port}'

---- OUTPUT ----
172-232-86-103.ip.linodeusercontent.com:9094


 kubectl get secret shristi -n kafka -o jsonpath='{.data.sasl\.jaas\.config}' | base64 -d

---- OUTPUT ----
org.apache.kafka.common.security.scram.ScramLoginModule required username="shristi" password="PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21";

Now create a test producer and test consumer to test you kafka cluster. example:- Producer.py



from confluent_kafka import Producer, KafkaError
import json
import time

# Kafka broker configuration
bootstrap_servers = "172-232-86-103.ip.linodeusercontent.com:9094"
sasl_plain_username = "shristi"
sasl_plain_password = "PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21"
max_message_size = 10000  # Example: Set the maximum message size in bytes

# Create producer configuration
conf = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': sasl_plain_username,
    'sasl.password': sasl_plain_password
}

# Create a Kafka producer instance
producer = Producer(**conf)

# Define the topic to produce messages to
topic = 'coto_mysql.frontend_prod_db.Category'

# Track message offsets and timestamps
message_offsets = {}
message_timestamps = {}

# Callback function to handle message delivery reports from Kafka broker
def delivery_report(err, msg):
    if err is not None:
        if err.code() == KafkaError._TRANSPORT:
            print("Kafka broker connection issue detected. Possible downtime.")
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
        message_offsets[msg.key().decode('utf-8')] = msg.offset()
        message_timestamps[msg.key().decode('utf-8')] = time.time()

# Function to partition messages if they exceed the maximum message size
def produce_messages(messages):
    partitioned_messages = []
    current_message = {'key': '', 'value': ''}
    for i, message in enumerate(messages):
        if len(json.dumps(message).encode('utf-8')) > max_message_size:
            print(f"Message {i+1} exceeds maximum message size. It won't be sent.")
            continue
        if len(json.dumps(current_message).encode('utf-8')) + len(json.dumps(message).encode('utf-8')) <= max_message_size:
            current_message['key'] += f" {message['key']}"
            current_message['value'] += f" {message['value']}"
        else:
            partitioned_messages.append(current_message)
            current_message = {'key': message['key'], 'value': message['value']}
    partitioned_messages.append(current_message)
    return partitioned_messages

# Produce messages continuously
while True:
    messages = [{'key': f'key_{i}', 'value': f'value_{i}'} for i in range(10)]
    partitioned_messages = produce_messages(messages)
    for message in partitioned_messages:
        producer.produce(topic, key=json.dumps(message['key']).encode('utf-8'), value=json.dumps(message['value']).encode('utf-8'), callback=delivery_report)
    
    # Flush any remaining messages in the producer buffer
    producer.flush()
    
    # Wait for a while before producing the next set of messages
    time.sleep(5)  # Adjust the time interval as needed

run python3 Producer.py

Consumer.py

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': "172-232-86-103.ip.linodeusercontent.com:9094",
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest',  # Start consuming from the earliest message
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'shristi',
    'sasl.password': 'PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21',
    'security.protocol': 'SASL_PLAINTEXT'
}

consumer = Consumer(conf)

topic = 'test-topic'
consumer.subscribe([topic])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition
                print('%% %s [%d] reached end at offset %d\n' %
                      (msg.topic(), msg.partition(), msg.offset()))
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            # Proper message
            print('Received message: {}'.format(msg.value().decode('utf-8')))

except KeyboardInterrupt:
    pass

finally:
    # Clean up on exit
    consumer.close()

run python3 Consumer.py

Kindly observe the consumer lag by using kafka-ui

Note: I’ve hardcoded the above code. However, by utilizing the GitHub repository and GitHub Actions, one can incorporate environment secrets and variables and employ GitHub Actions workflow to dynamically configure the entire Kafka cluster. https://github.com/shristiopstree/Kafka

Stress Test Your Kafka cluster

To verify that kafka can handle the expected load and sustain performance under high traffic, you need to stress test your Kafka cluster. Here is a generic way of doing it:

  • Test Scenarios Definition: For instance, different sizes of messages, network latencies or combinations of producer and consumer configurations.
  • Prepare Test Environment: Create a test environment that imitates your production environment closely. This should include hardware; network configuration; kafka cluster configuration etc., make sure there are enough resources for simulating expected loads.
  • Load Generation: Use Apache Kafka’s built-in tools such as kafka-producer-perf-test.sh and kafka-consumer-perf-test.sh or any third party tool like JMeter, Gatling or custom scripts for generating loads. Configure these tools so that they mimic real world situations based on defined test scenarios.
  • Scale Up: If your cluster doesn’t meet the performance requirements, consider scaling up your Kafka cluster by adding more brokers, increasing the number of partitions, or tuning Kafka configuration parameters such as num.io.threads, num.network.threads,log.segment.bytes etc.
  • To ensure the reliability and availability of data, perform failure scenario simulations.
  • Carry out extended trials to discover stability and performance problems.
  • For strong security measures, do a security stress test.
  • Combine methods to evaluate robustness, scalability, and performance comprehensively.

Author: Shristi Gupta

Devops, Devops Solutioning, Linux, Terraform, AWS

Leave a Reply