{"id":18421,"date":"2024-05-14T13:55:11","date_gmt":"2024-05-14T08:25:11","guid":{"rendered":"https:\/\/opstree.com\/blog\/?p=18421"},"modified":"2026-03-05T18:06:14","modified_gmt":"2026-03-05T12:36:14","slug":"deploying-a-production-ready-kafka-cluster-on-kubernetes-with-strimzi","status":"publish","type":"post","link":"https:\/\/opstree.com\/blog\/2024\/05\/14\/deploying-a-production-ready-kafka-cluster-on-kubernetes-with-strimzi\/","title":{"rendered":"Kafka on Kubernetes Made Easy with Strimzi (Step-by-Step Deployment)"},"content":{"rendered":"\r\n<p>The open-source Strimzi operator simplifies deploying Apache Kafka on Kubernetes by treating Kafka components as native Kubernetes resources.<\/p>\r\n<p class=\"wp-block-image size-full\"><img decoding=\"async\" class=\"wp-image-18626\" src=\"https:\/\/blog.opstree.com\/wp-content\/uploads\/2024\/05\/image.png\" alt=\"\" \/>Deploying <strong>Kafka on Kubernetes with Strimzi<\/strong> has become a widely used approach for building scalable, production-ready data streaming platforms. As modern applications generate massive volumes of real-time data, organizations need a reliable, flexible, and automated way to manage Kafka clusters. 
Kubernetes provides a solid foundation with its built-in orchestration, scaling, and self-healing capabilities, making it well suited to running stateful distributed systems like Kafka.<\/p>\r\n<h2 class=\"wp-block-image size-full\"><strong>What is Event Streaming?<\/strong><\/h2>\r\n\r\n\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Event streams are similar to digital root systems in business, capturing real-time data from multiple sources for further processing and analysis.<!--more--><\/p>\r\n\r\n\r\n\r\n<h2 class=\"is-style-default has-medium-font-size\"><strong>What Is Event Streaming Used For?<\/strong><\/h2>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Event streaming is used in a variety of industries for payments, logistics tracking, IoT data analytics, customer communication, healthcare analytics, <a href=\"https:\/\/opstree.com\/services\/middleware-database-and-data-engineering\/\"><strong>data integration<\/strong><\/a>, and more.<\/p>\r\n\r\n\r\n\r\n<h2 class=\"has-medium-font-size\"><strong>Apache Kafka is an event streaming platform. What does that mean?<\/strong><\/h2>\r\n\r\n\r\n\r\n<ul>\r\n<li class=\"has-small-font-size\">Kafka allows you to publish and subscribe to event streams.<\/li>\r\n\r\n\r\n\r\n<li class=\"has-small-font-size\">It stores these streams durably and reliably for as long as necessary.<\/li>\r\n\r\n\r\n\r\n<li class=\"has-small-font-size\">You can process streams in real time or retroactively.<\/li>\r\n\r\n\r\n\r\n<li class=\"has-small-font-size\">Kafka is distributed, scalable, fault tolerant, and secure.<\/li>\r\n\r\n\r\n\r\n<li class=\"has-small-font-size\">It works across platforms and can be self-managed or run as a fully managed service from a vendor.<\/li>\r\n<\/ul>\r\n\r\n\r\n\r\n<h2 class=\"wp-block-heading has-medium-font-size\"><strong>How does Kafka work?<\/strong><\/h2>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Kafka is a distributed system of servers and clients that communicate over a high-performance TCP network protocol. 
It can run on various platforms like bare-metal, VMs, or containers in on-premises or <a href=\"https:\/\/opstree.com\/blog\/2024\/06\/13\/devops-cloud-migration\/\">cloud setups<\/a>.<\/p>\r\n\r\n\r\n\r\n<ul>\r\n<li class=\"has-small-font-size\"><strong>Servers<\/strong>: Run as a cluster that can span multiple datacenters or cloud regions, with some acting as brokers that form the storage layer.<\/li>\r\n\r\n\r\n\r\n<li class=\"has-small-font-size\"><strong>Clients<\/strong>: Let you build distributed apps and microservices that read, write, and process events at scale, even in the face of faults or network issues.<\/li>\r\n<li class=\"has-small-font-size\"><strong>Broker<\/strong>: A server running Kafka that stores events and serves producers and consumers.<\/li>\r\n<li class=\"has-small-font-size\"><strong>Event<\/strong>: A message in Kafka, stored as bytes on the broker&#8217;s disk.<\/li>\r\n<li class=\"has-small-font-size\"><strong>Producer and Consumer<\/strong>: Services that send events to Kafka or receive events from it.<\/li>\r\n<li class=\"has-small-font-size\"><strong>Topic<\/strong>: A way to categorize events, like folders in a file system.<\/li>\r\n<li class=\"has-small-font-size\"><strong>Partition<\/strong>: A topic is split into partitions, which are spread across brokers so that data can be written and read in parallel.<\/li>\r\n<li class=\"has-small-font-size\"><strong>Replication Factor<\/strong>: The number of copies of each partition kept in the Kafka cluster for fault tolerance.<\/li>\r\n<li class=\"has-small-font-size\"><strong>Offset<\/strong>: The position of an event within a partition; consumers use it to track which events they have already read. 
For example, a producer writing to a Kafka topic with 3 partitions spreads its messages across those partitions, and each partition keeps its own sequence of offsets.<\/li>\r\n<li class=\"has-small-font-size\"><strong>ZooKeeper<\/strong>: Stores Kafka cluster metadata such as broker membership, topic configuration, and ACLs, and coordinates controller election. Consumer offsets are kept in Kafka itself, in the internal <code>__consumer_offsets<\/code> topic.<\/li>\r\n<li class=\"has-small-font-size\"><strong>Consumer Group<\/strong>: A group of consumers that work together to read from topics.<\/li>\r\n<\/ul>\r\n<p><strong>[ Are you looking for: <a href=\"https:\/\/opstree.com\/services\/devsecops-transformation-and-automation\/\">Kubernetes Implementation<\/a> ]<\/strong><\/p>\r\n\r\n\r\n\r\n<h2 class=\"has-medium-font-size\"><strong>Configuring a production-ready Kafka cluster<\/strong><\/h2>\r\n\r\n\r\n\r\n<p class=\"has-dark-gray-color has-text-color has-link-color has-small-font-size wp-elements-3a5a90491f4dbc46fe66eb245bad6017\"><strong>Step-1 Add the Strimzi repo and install the Strimzi operator using Helm<\/strong><\/p>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>helm repo add strimzi https:\/\/strimzi.io\/charts\/\r\n\r\n\r\nhelm install strimzi-operator strimzi\/strimzi-kafka-operator -n kafka --create-namespace\r\n<\/code><\/pre>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\"><strong>Step-2 Configure the Kafka cluster<\/strong><\/p>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Here node affinity is used to run all <a href=\"https:\/\/opstree.com\/blog\/2024\/12\/31\/stream-and-analyze-postgresql-data-from-s3-using-kafka-and-ksqldb-part-2\/\">kafka<\/a> pods on a specific node pool.<\/p>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">kafka-cluster.yaml<\/p>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>apiVersion: kafka.strimzi.io\/v1beta2\r\nkind: Kafka\r\nmetadata:\r\n  name: my-cluster\r\n  namespace: kafka\r\n  labels:\r\n    app: kafka\r\n    environment: prod-lke\r\nspec:\r\n  kafka:\r\n    version: 3.6.1\r\n    replicas: 3\r\n    
listeners:\r\n      - name: plain\r\n        port: 9092\r\n        type: internal\r\n        tls: false\r\n      - name: tls\r\n        port: 9093\r\n        type: internal\r\n        tls: true\r\n      - name: external\r\n        port: 9094\r\n        type: loadbalancer\r\n        tls: false\r\n        authentication:\r\n          type: scram-sha-512\r\n    resources:\r\n      requests:\r\n        memory: \"16Gi\"\r\n        cpu: \"2\"\r\n      limits:\r\n        memory: \"16Gi\"\r\n        cpu: \"4\"\r\n    template:\r\n      podDisruptionBudget:\r\n        maxUnavailable: 0\r\n      pod:\r\n        affinity:\r\n          nodeAffinity:\r\n            requiredDuringSchedulingIgnoredDuringExecution:\r\n              nodeSelectorTerms:\r\n                - matchExpressions:\r\n                    - key: lke.linode.com\/pool-id\r\n                      operator: In\r\n                      values:\r\n                        - \"238182\"\r\n\r\n        tolerations:\r\n          - key: \"kubernetes.azure.com\/scalesetpriority\"\r\n            operator: \"Equal\"\r\n            value: \"spot\"\r\n            effect: \"NoSchedule\"\r\n    config:\r\n      default.replication.factor:                3\r\n      min.insync.replicas:                       2\r\n      offsets.topic.replication.factor:          3\r\n      transaction.state.log.min.isr:             2\r\n      transaction.state.log.replication.factor:  3\r\n      zookeeper.connection.timeout.ms:           6000\r\n    jmxOptions: {}\r\n    storage:\r\n      type: jbod\r\n      volumes:\r\n        - id: 0\r\n          type: persistent-claim\r\n          size: 25Gi\r\n          deleteClaim: false\r\n          class: linode-block-storage-retain\r\n  zookeeper:\r\n    replicas: 3\r\n    storage:\r\n      type: persistent-claim\r\n      size: 25Gi\r\n      class: linode-block-storage-retain\r\n      deleteClaim: false\r\n    resources:\r\n      requests:\r\n        memory: \"4Gi\"\r\n        cpu: \"1\"\r\n      
limits:\r\n        memory: \"4Gi\"\r\n        cpu: \"2\"\r\n    jvmOptions:\r\n      -Xms: 2048m\r\n      -Xmx: 2048m\r\n    template:\r\n      podDisruptionBudget:\r\n        maxUnavailable: 0\r\n      pod:\r\n        affinity:\r\n          nodeAffinity:\r\n            requiredDuringSchedulingIgnoredDuringExecution:\r\n              nodeSelectorTerms:\r\n                - matchExpressions:\r\n                    - key: lke.linode.com\/pool-id\r\n                      operator: In\r\n                      values:\r\n                        - \"238182\"\r\n  entityOperator:\r\n    topicOperator:\r\n      resources:\r\n        requests:\r\n          memory: \"256Mi\"\r\n          cpu: \"200m\"\r\n        limits:\r\n          memory: \"256Mi\"\r\n          cpu: \"1\"\r\n    userOperator:\r\n      resources:\r\n        requests:\r\n          memory: \"512Mi\"\r\n          cpu: \"200m\"\r\n        limits:\r\n          memory: \"512Mi\"\r\n          cpu: \"1\"\r\n    template:\r\n      pod:\r\n        affinity:\r\n          nodeAffinity:\r\n            requiredDuringSchedulingIgnoredDuringExecution:\r\n              nodeSelectorTerms:\r\n                - matchExpressions:\r\n                    - key: lke.linode.com\/pool-id\r\n                      operator: In\r\n                      values:\r\n                        - \"238182\"\r\n<\/code><\/pre>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>kubectl apply -f kafka-cluster.yaml -n kafka<\/code><\/pre>\r\n\r\n\r\n\r\n<p><strong>[ Also Read: <a href=\"https:\/\/opstree.com\/blog\/2019\/04\/16\/redis-best-practices-and-performance-tuning\/\">redis maxmemory best practice<\/a> ]<\/strong><\/p>\r\n<p class=\"has-small-font-size\"><strong>Step-3 Deploy the kafka-ui monitoring tool, which gives more insight into your Kafka cluster<\/strong><\/p>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">kafka-ui.yaml<\/p>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code 
has-small-font-size\"><code>apiVersion: apps\/v1\r\nkind: Deployment\r\nmetadata:\r\n  name: kafka-ui-deployment\r\n  labels:\r\n    app: kafka-ui\r\nspec:\r\n  replicas: 1\r\n  selector:\r\n    matchLabels:\r\n      app: kafka-ui\r\n  template:\r\n    metadata:\r\n      labels:\r\n        app: kafka-ui\r\n    spec:\r\n      affinity:\r\n        nodeAffinity:\r\n          requiredDuringSchedulingIgnoredDuringExecution:\r\n            nodeSelectorTerms:\r\n            - matchExpressions:\r\n              - key: lke.linode.com\/pool-id\r\n                operator: In\r\n                values: \r\n                - \"238182\"\r\n      containers:\r\n      - name: kafka-ui\r\n        image: provectuslabs\/kafka-ui:latest\r\n        env:\r\n        - name: KAFKA_CLUSTERS_0_NAME\r\n          value: \"my-cluster\"\r\n        - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS\r\n          value: my-cluster-kafka-bootstrap.kafka.svc:9092\r\n        - name: AUTH_TYPE\r\n          value: \"LOGIN_FORM\"\r\n        - name: SPRING_SECURITY_USER_NAME\r\n          value: \"admin\"  \r\n        - name: SPRING_SECURITY_USER_PASSWORD\r\n          value: \"pass\"   \r\n        imagePullPolicy: Always\r\n        resources:\r\n          requests:\r\n            memory: \"256Mi\"\r\n            cpu: \"100m\"\r\n          limits:\r\n            memory: \"1024Mi\"\r\n            cpu: \"1000m\"\r\n        ports:\r\n        - containerPort: 8080\r\n---\r\napiVersion: v1\r\nkind: Service\r\nmetadata:\r\n  name: kafka-ui-service\r\nspec:\r\n  selector:\r\n    app: kafka-ui\r\n  ports:\r\n    - protocol: TCP\r\n      port: 8080  \r\n  type: LoadBalancer  \r\n<\/code><\/pre>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>kubectl apply -f kafka-ui.yaml -n kafka<\/code><\/pre>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Output of the above files after applying them<\/p>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>\r\nkubectl get pods -n kafka\r\nNAME    
                                                 READY   STATUS    RESTARTS      AGE\r\n my-cluster-entity-operator-746bcbb686-8hz2f   2\/2     Running   1 (25h ago)   25h\r\n my-cluster-kafka-0                            1\/1     Running   0             25h\r\n my-cluster-kafka-1                            1\/1     Running   0             25h\r\n my-cluster-kafka-2                            1\/1     Running   0             26h\r\n my-cluster-zookeeper-0                        1\/1     Running   0             25h\r\n my-cluster-zookeeper-1                        1\/1     Running   0             25h\r\n my-cluster-zookeeper-2                        1\/1     Running   0             25h\r\n kafka-ui-deployment-54585c7476-rjg86                     1\/1     Running   0             25h\r\n strimzi-cluster-operator-56587547d6-5phmm                1\/1     Running   0             25h<\/code><\/pre>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\"><strong>Step-4 Create an ingress to access kafka-ui on a specific domain<\/strong><\/p>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">ingress.yaml<\/p>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>apiVersion: networking.k8s.io\/v1\r\nkind: Ingress\r\nmetadata:\r\n  name: kafka-ingress\r\n  annotations:\r\n    kubernetes.io\/ingress.class: nginx\r\n  labels:\r\n    app: kafka-ingress\r\n    environment: prod-lke\r\nspec:\r\n  rules:\r\n  - host: kafkaui.xyz.com  # replace with your domain\r\n    http:\r\n      paths:\r\n      - pathType: Prefix\r\n        path: \"\/\"\r\n        backend:\r\n          service:\r\n            name: kafka-ui-service\r\n            port:\r\n              number: 8080\r\n<\/code><\/pre>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>\r\nkubectl apply -f ingress.yaml -n kafka<\/code><\/pre>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Open <strong>kafkaui.xyz.com<\/strong> (your own domain) and log in with the username and password set in kafka-ui.yaml. The example uses username <code>admin<\/code> and password <code>pass<\/code>; change both before deploying.<\/p>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Note: With the kafka-ui configuration above, the kafka-ui user admin has full admin access to your Kafka cluster.<\/p>\r\n<p>You can find out more about <a href=\"https:\/\/opstree.com\/services\/cloud-engineering-modernisation-migrations\/\">cloud data migration services<\/a> here.<\/p>\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\"><strong>Step-5 Create a KafkaUser that can access your Kafka cluster<\/strong><\/p>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">kafkauser.yaml<\/p>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>\r\napiVersion: kafka.strimzi.io\/v1beta2\r\nkind: KafkaUser\r\nmetadata:\r\n  name: shristi\r\n  labels:\r\n    strimzi.io\/cluster: my-cluster\r\nspec:\r\n  authentication:\r\n    type: scram-sha-512\r\n<\/code><\/pre>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>kubectl apply -f kafkauser.yaml -n kafka<\/code><\/pre>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Use the commands below to get your cluster details, the external bootstrap address, and the generated credentials for the user:<\/p>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>kubectl describe kafka my-cluster -n kafka\r\n\r\n\r\nkubectl get svc my-cluster-kafka-external-bootstrap -n kafka -o=jsonpath='{.status.loadBalancer.ingress[0].hostname}:{.spec.ports[0].port}'\r\n\r\n---- OUTPUT ----\r\n<strong>172-232-86-103.ip.linodeusercontent.com:9094<\/strong>\r\n\r\n\r\nkubectl get secret shristi -n kafka -o jsonpath='{.data.sasl\\.jaas\\.config}' | base64 -d\r\n\r\n---- OUTPUT ----\r\norg.apache.kafka.common.security.scram.ScramLoginModule required username=\"<strong>shristi<\/strong>\" password=\"<strong>PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21<\/strong>\";\r\n\r\n<\/code><\/pre>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Now create a test producer and a test consumer to test your Kafka cluster. 
example:- Producer.py<\/p>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>\r\n\r\nfrom confluent_kafka import Producer, KafkaError\r\nimport json\r\nimport time\r\n\r\n# Kafka broker configuration\r\nbootstrap_servers = \"<strong>172-232-86-103.ip.linodeusercontent.com:9094\"\r\n<\/strong>sasl_plain_username<strong> = \"shristi\"\r\n<\/strong>sasl_plain_password<strong> = \"PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21\"<\/strong>\r\nmax_message_size = 10000  # Example: Set the maximum message size in bytes\r\n\r\n# Create producer configuration\r\nconf = {\r\n    'bootstrap.servers': bootstrap_servers,\r\n    'security.protocol': 'SASL_PLAINTEXT',\r\n    'sasl.mechanism': 'SCRAM-SHA-512',\r\n    'sasl.username': sasl_plain_username,\r\n    'sasl.password': sasl_plain_password\r\n}\r\n\r\n# Create a Kafka producer instance\r\nproducer = Producer(**conf)\r\n\r\n# Define the topic to produce messages to\r\ntopic = 'coto_mysql.frontend_prod_db.Category'\r\n\r\n# Track message offsets and timestamps\r\nmessage_offsets = {}\r\nmessage_timestamps = {}\r\n\r\n# Callback function to handle message delivery reports from Kafka broker\r\ndef delivery_report(err, msg):\r\n    if err is not None:\r\n        if err.code() == KafkaError._TRANSPORT:\r\n            print(\"Kafka broker connection issue detected. 
Possible downtime.\")\r\n        print(f'Message delivery failed: {err}')\r\n    else:\r\n        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')\r\n        message_offsets[msg.key().decode('utf-8')] = msg.offset()\r\n        message_timestamps[msg.key().decode('utf-8')] = time.time()\r\n\r\n# Function to partition messages if they exceed the maximum message size\r\ndef produce_messages(messages):\r\n    partitioned_messages = []\r\n    current_message = {'key': '', 'value': ''}\r\n    for i, message in enumerate(messages):\r\n        if len(json.dumps(message).encode('utf-8')) &gt; max_message_size:\r\n            print(f\"Message {i+1} exceeds maximum message size. It won't be sent.\")\r\n            continue\r\n        if len(json.dumps(current_message).encode('utf-8')) + len(json.dumps(message).encode('utf-8')) &lt;= max_message_size:\r\n            current_message['key'] += f\" {message['key']}\"\r\n            current_message['value'] += f\" {message['value']}\"\r\n        else:\r\n            partitioned_messages.append(current_message)\r\n            current_message = {'key': message['key'], 'value': message['value']}\r\n    partitioned_messages.append(current_message)\r\n    return partitioned_messages\r\n\r\n# Produce messages continuously\r\nwhile True:\r\n    messages = [{'key': f'key_{i}', 'value': f'value_{i}'} for i in range(10)]\r\n    partitioned_messages = produce_messages(messages)\r\n    for message in partitioned_messages:\r\n        producer.produce(topic, key=json.dumps(message['key']).encode('utf-8'), value=json.dumps(message['value']).encode('utf-8'), callback=delivery_report)\r\n    \r\n    # Flush any remaining messages in the producer buffer\r\n    producer.flush()\r\n    \r\n    # Wait for a while before producing the next set of messages\r\n    time.sleep(5)  # Adjust the time interval as needed\r\n\r\n<\/code><\/pre>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">run <code> python3 
Producer.py<\/code><\/p>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Consumer.py<\/p>\r\n\r\n\r\n\r\n<pre class=\"wp-block-code has-small-font-size\"><code>from confluent_kafka import Consumer, KafkaError, KafkaException\r\n\r\nconf = {\r\n    'bootstrap.servers': \"<strong>172-232-86-103.ip.linodeusercontent.com:9094<\/strong>\",\r\n    'group.id': 'my_consumer_group',\r\n    'auto.offset.reset': 'earliest',  # Start consuming from the earliest message\r\n    'sasl.mechanism': 'SCRAM-SHA-512',\r\n    'sasl.username': '<strong>shristi<\/strong>',\r\n    'sasl.password': '<strong>PhOjMkJNG2w3t1SJbgc2xDCHU6ACxt21<\/strong>',\r\n    'security.protocol': 'SASL_PLAINTEXT'\r\n}\r\n\r\nconsumer = Consumer(conf)\r\n\r\n# Must match the topic that Producer.py writes to\r\ntopic = 'coto_mysql.frontend_prod_db.Category'\r\nconsumer.subscribe([topic])\r\n\r\ntry:\r\n    while True:\r\n        msg = consumer.poll(timeout=1.0)\r\n        if msg is None:\r\n            # No message arrived within the poll timeout\r\n            continue\r\n        if msg.error():\r\n            if msg.error().code() == KafkaError._PARTITION_EOF:\r\n                # End of partition\r\n                print('%% %s [%d] reached end at offset %d\\n' %\r\n                      (msg.topic(), msg.partition(), msg.offset()))\r\n            else:\r\n                # Any other error is unexpected, so stop consuming\r\n                raise KafkaException(msg.error())\r\n        else:\r\n            # Proper message\r\n            print('Received message: {}'.format(msg.value().decode('utf-8')))\r\n\r\nexcept KeyboardInterrupt:\r\n    pass\r\n\r\nfinally:\r\n    # Clean up on exit\r\n    consumer.close()\r\n<\/code><\/pre>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">Run <code>python3 Consumer.py<\/code><\/p>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">You can then observe the consumer lag in kafka-ui.<\/p>\r\n\r\n\r\n\r\n<figure class=\"wp-block-image size-large\"><img decoding=\"async\" class=\"wp-image-18506\" src=\"https:\/\/blog.opstree.com\/wp-content\/uploads\/2024\/04\/image-11-1024x309.png\" alt=\"\" \/><\/figure>\r\n\r\n\r\n\r\n<p class=\"is-style-info has-small-font-size\">Note: I&#8217;ve 
hardcoded the values in the code above. However, with a GitHub repository and GitHub Actions, you can store them as environment secrets and variables and use a GitHub Actions workflow to configure the entire Kafka cluster dynamically. <a href=\"https:\/\/github.com\/shristiopstree\/Kafka\" target=\"_blank\" rel=\"noopener\">https:\/\/github.com\/shristiopstree\/Kafka<\/a><\/p>\r\n\r\n\r\n\r\n<h3 class=\"has-medium-font-size\"><strong>Stress Test Your Kafka Cluster<\/strong><\/h3>\r\n\r\n\r\n\r\n<p class=\"has-small-font-size\">To verify that Kafka can handle the expected load and sustain performance under high traffic, you need to stress test your Kafka cluster. Here is a generic way of doing it:<\/p>\r\n\r\n\r\n\r\n<ul>\r\n<li class=\"has-small-font-size\"><strong>Define Test Scenarios<\/strong>: Decide what to vary, for instance message sizes, network latencies, or combinations of producer and consumer configurations.<\/li>\r\n<\/ul>\r\n\r\n\r\n\r\n<ul>\r\n<li class=\"has-small-font-size\"><strong>Prepare Test Environment<\/strong>: Create a test environment that closely imitates your production environment, including hardware, network configuration, and Kafka cluster configuration. Make sure there are enough resources to simulate the expected load.<\/li>\r\n<\/ul>\r\n\r\n\r\n\r\n<ul>\r\n<li class=\"has-small-font-size\"><strong>Load Generation<\/strong>: Use <a href=\"https:\/\/opstree.com\/blog\/2024\/09\/27\/apache-flink-for-real-time-stream-processing\/\">Apache Kafka&#8217;s<\/a> built-in tools such as kafka-producer-perf-test.sh and kafka-consumer-perf-test.sh, third-party tools like JMeter or Gatling, or custom scripts to generate load. 
Configure these tools so that they mimic real-world situations based on the defined test scenarios.<\/li>\r\n<\/ul>\r\n\r\n\r\n\r\n<ul>\r\n<li class=\"has-small-font-size\"><strong>Scale Up<\/strong>: If your cluster doesn&#8217;t meet the performance requirements, consider scaling your Kafka cluster by adding more brokers, increasing the number of partitions, or tuning Kafka configuration parameters such as <code>num.io.threads<\/code>, <code>num.network.threads<\/code>, and <code>log.segment.bytes<\/code>.<\/li>\r\n<\/ul>\r\n\r\n\r\n\r\n<ul class=\"has-small-font-size\">\r\n<li>Simulate failure scenarios to check the reliability and availability of your data.<\/li>\r\n\r\n\r\n\r\n<li>Run extended, long-duration tests to uncover stability and performance problems.<\/li>\r\n\r\n\r\n\r\n<li>Stress test your security measures as well.<\/li>\r\n\r\n\r\n\r\n<li>Combine these methods to evaluate robustness, scalability, and performance comprehensively.<\/li>\r\n<\/ul>\r\n","protected":false},"excerpt":{"rendered":"<p>Event streams are similar to digital root systems in business, capturing real-time data from multiple sources for further processing and 
analysis.<\/p>\n","protected":false},"author":242684335,"featured_media":18465,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_coblocks_attr":"","_coblocks_dimensions":"","_coblocks_responsive_height":"","_coblocks_accordion_ie_support":"","jetpack_post_was_ever_published":false,"_jetpack_newsletter_access":"","_jetpack_dont_email_post_to_subs":false,"_jetpack_newsletter_tier_id":0,"_jetpack_memberships_contains_paywalled_content":false,"_jetpack_memberships_contains_paid_content":false,"footnotes":"","jetpack_publicize_message":"","jetpack_publicize_feature_enabled":true,"jetpack_social_post_already_shared":true,"jetpack_social_options":{"image_generator_settings":{"template":"highway","enabled":false},"version":2}},"categories":[768739351],"tags":[768739344,207392,343865,768739407],"jetpack_publicize_connections":[],"jetpack_featured_media_url":"https:\/\/opstree.com\/blog\/wp-content\/uploads\/2024\/05\/Deploying-a-Production-Ready-Kafka-Cluster-on-Kubernetes-with-Strimzi-4.png","jetpack_likes_enabled":true,"jetpack_sharing_enabled":true,"jetpack_shortlink":"https:\/\/wp.me\/pfDBOm-4N7","jetpack-related-posts":[],"_links":{"self":[{"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/posts\/18421"}],"collection":[{"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/users\/242684335"}],"replies":[{"embeddable":true,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/comments?post=18421"}],"version-history":[{"count":16,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/posts\/18421\/revisions"}],"predecessor-version":[{"id":30907,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/posts\/18421\/revisions\/30907"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/media\/18465"}],"wp:attach
ment":[{"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/media?parent=18421"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/categories?post=18421"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/tags?post=18421"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}