{"id":30170,"date":"2025-12-09T15:09:43","date_gmt":"2025-12-09T09:39:43","guid":{"rendered":"https:\/\/opstree.com\/blog\/?p=30170"},"modified":"2025-12-09T15:18:28","modified_gmt":"2025-12-09T09:48:28","slug":"azure-event-hubs-real-time-streaming","status":"publish","type":"post","link":"https:\/\/opstree.com\/blog\/2025\/12\/09\/azure-event-hubs-real-time-streaming\/","title":{"rendered":"Azure Event Hubs Explained for Real Time Data Streaming"},"content":{"rendered":"<p>Azure Event Hubs is a <strong>fully managed, cloud-based data streaming platform<\/strong> designed for real-time event ingestion and processing at massive scale.<\/p>\n<p>It serves as the &#8220;front door&#8221; for event-driven architectures, capable of handling <strong>millions of events per second<\/strong> with low latency.<!--more--><\/p>\n<p><!-- Table of Contents --><\/p>\n<div class=\"toc-container\" style=\"border: 1px solid #e5e7eb; padding: 16px; border-radius: 10px; background: #f9fafb;\">\n<h3 style=\"margin-top: 0;\">Table of Contents<\/h3>\n<ul style=\"list-style: none; padding-left: 0; margin: 0;\">\n<li><a href=\"#components-of-event-hub\">1. Components of Event Hub<\/a><\/li>\n<li><a href=\"#important-notes\">2. Important Notes<\/a><\/li>\n<li><a href=\"#advanced-features\">3. Advanced Features<\/a><\/li>\n<li><a href=\"#service-tiers\">4. Service Tiers<\/a><\/li>\n<li><a href=\"#goal-reserve-consumer\">5. Goal: Reserve the Consumer for Partition Handling in EventHub<\/a><\/li>\n<li><a href=\"#flaws\">6. Flaws<\/a><\/li>\n<li><a href=\"#checkpointing\">7. Checkpointing<\/a><\/li>\n<li><a href=\"#better-approach\">8. Better Approach<\/a><\/li>\n<li><a href=\"#new-consumer-script\">9. New Consumer Script<\/a><\/li>\n<li><a href=\"#realistic-setup\">10. Realistic Setup<\/a><\/li>\n<li><a href=\"#one-cons-of-eventhub\">11. One Cons of EventHub<\/a><\/li>\n<li><a href=\"#summary\">12. 
Summary<\/a><\/li>\n<\/ul>\n<\/div>\n<h2 id=\"components-of-event-hub\">Components of Event Hub<\/h2>\n<h3>Producer<\/h3>\n<p>We said above that EventHub is a streaming platform, but where does it get all this data from?<\/p>\n<p>A <strong>Producer<\/strong> is responsible for producing this data and sending it to EventHub.<\/p>\n<p>A Producer can be a single source of data or multiple data sources, for example:<\/p>\n<ul>\n<li>Streaming data<\/li>\n<li>Logs<\/li>\n<li>Files<\/li>\n<li>Media<\/li>\n<li>Weather Data<\/li>\n<li>IoT sensor data, etc.<\/li>\n<\/ul>\n<h3>Event Hub<\/h3>\n<p>Next comes the main component that receives all this data: <strong>EventHub<\/strong> itself.<\/p>\n<p>EventHub ingests the incoming events and distributes them across <strong>Partitions<\/strong>.<\/p>\n<p><strong>Partitions<\/strong> are scale units that work like \u201cfreeway lanes\u201d: add more partitions for higher throughput.<\/p>\n<p>In the Basic and Standard tiers, you can assign a maximum of <strong>32 partitions<\/strong> to one EventHub; the Premium and Dedicated tiers allow more.<\/p>\n<p>We are not limited to a single EventHub. We can create <strong>multiple EventHubs<\/strong> if the workload requires it.<\/p>\n<p>Multiple EventHubs can be created under a <strong>Namespace<\/strong>.<\/p>\n<p>A Namespace is a <strong>logical collection of EventHubs<\/strong>.<\/p>\n<h3>Consumers<\/h3>\n<p><strong>Consumers<\/strong> read this data from EventHub.<\/p>\n<p>A consumer can be any application that requires this data, or an Azure service such as:<\/p>\n<ul>\n<li><strong>Azure Stream Analytics<\/strong><\/li>\n<li><strong>Azure Data Explorer<\/strong>, etc.<\/li>\n<\/ul>\n<p>Just as there are multiple partitions under one EventHub, there can be multiple <strong>Consumers<\/strong> in one <strong>Consumer Group<\/strong>, and an EventHub can have multiple <strong>Consumer Groups<\/strong>.<\/p>\n<p>Trusted <a href=\"https:\/\/opstree.com\/services\/database-and-data-engineering\/\" target=\"_blank\" rel=\"noopener\"><strong>data engineering services provider<\/strong><\/a> for scalable Azure Event Hubs pipelines and real time data processing.<\/p>\n<h2 id=\"important-notes\">Important Notes<\/h2>\n<h3>1. 
Within the Same Consumer Group<\/h3>\n<p>Each <strong>partition<\/strong> should be <strong>owned by only one consumer at a time<\/strong> within the same consumer group. Event Hubs permits up to five concurrent readers per partition per consumer group, but a single active reader is the recommended pattern and is what the SDK\u2019s load balancing enforces.<\/p>\n<p>So:<\/p>\n<ul>\n<li>If <strong>Consumer-1<\/strong> in <strong>Group-A<\/strong> owns <strong>Partition-0<\/strong>, then <strong>Consumer-2<\/strong> in <strong>Group-A<\/strong> <strong>does not<\/strong> read from <strong>Partition-0<\/strong> at the same time.<\/li>\n<li>If <strong>Consumer-1<\/strong> stops, then <strong>Partition-0<\/strong> can be reassigned to <strong>Consumer-2<\/strong> (still in <strong>Group-A<\/strong>).<\/li>\n<\/ul>\n<h3><strong>2. Across Different Consumer Groups<\/strong><\/h3>\n<p>Partitions <strong>can be consumed simultaneously<\/strong> by consumers in <strong>different consumer groups<\/strong>.<\/p>\n<p>There\u2019s <strong>no restriction<\/strong> across groups &#8211; Event Hubs (like Kafka) treats each group as an <strong>independent subscription<\/strong> to the same event stream.<\/p>\n<p>So:<\/p>\n<ul>\n<li><strong>Consumer-1 (Group-A)<\/strong> and <strong>Consumer-3 (Group-B)<\/strong> can both read <strong>Partition-0<\/strong> <strong>at the same time<\/strong>, independently.<\/li>\n<li>They each maintain their own <strong>offset\/checkpoint<\/strong>, so they don\u2019t interfere with each other.<\/li>\n<\/ul>\n<h2 id=\"advanced-features\"><strong>Advanced Features<\/strong><\/h2>\n<h3><strong>Event Hubs Capture<\/strong><\/h3>\n<ul>\n<li><strong>Definition:<\/strong> Capture automatically writes the events streaming through an EventHub to Azure Blob Storage or <a href=\"https:\/\/opstree.com\/blog\/2025\/11\/11\/complete-guide-to-data-pipelines\/\" target=\"_blank\" rel=\"noopener\">Data Lake<\/a>. Enable this option if you want to keep a stored copy of the stream.<\/li>\n<li><strong>Automatic Storage:<\/strong> Capture streaming data to Azure Blob Storage or Data Lake.<\/li>\n<li><strong>Retention:<\/strong> Keep captured data in your own storage account beyond the EventHub\u2019s retention period.<\/li>\n<li><strong>Batch 
Processing:<\/strong> Enable micro-batch analytics on captured data.<\/li>\n<\/ul>\n<h3><strong>Throughput Units<\/strong><\/h3>\n<ul>\n<li><strong>Definition:<\/strong> A throughput unit is a pre-purchased unit of capacity. The more throughput units a namespace has, the more data it can ingest and deliver.<\/li>\n<\/ul>\n<p><strong>Standard Tier Metrics:<\/strong><\/p>\n<ul>\n<li><strong>Ingress:<\/strong> Up to 1 MB\/second or 1,000 events\/second per unit.<\/li>\n<li><strong>Egress:<\/strong> Up to 2 MB\/second or 4,096 events\/second per unit.<\/li>\n<\/ul>\n<h3><strong>Auto-Inflate<\/strong><\/h3>\n<ul>\n<li><strong>Definition:<\/strong> If you need more capacity you can increase the number of throughput units yourself, but if you don\u2019t know how many you will need, you can enable this option and it will add throughput units as load increases, up to a maximum that you set.<\/li>\n<li><strong>Dynamic Scaling:<\/strong> Automatically scales throughput units up with demand. It does not scale them back down automatically.<\/li>\n<li><strong>Cost Optimization:<\/strong> Start with a small number of throughput units and scale up only when the load requires it.<\/li>\n<li><strong>No Manual Intervention:<\/strong> Scaling up happens without configuration changes.<\/li>\n<\/ul>\n<div style=\"border: 1px solid #d1d5db; padding: 16px; margin: 20px 0; background-color: #f0f4f8;\">\n<p style=\"margin: 0; font-weight: 600; font-size: 16px;\">Also Read &#8211; <a href=\"https:\/\/opstree.com\/blog\/2025\/11\/15\/real-time-data-processing\/\">Real-Time Data Processing: Why Stream Data is the Future of Business Decisions<\/a><\/p>\n<\/div>\n<h2 id=\"service-tiers\">Service Tiers<\/h2>\n<h3>Standard Tier<\/h3>\n<p>Basic event streaming for moderate throughput requirements.<\/p>\n<h3>Premium Tier<\/h3>\n<p>Enhanced performance with dedicated processing units and network isolation.<\/p>\n<h3>Dedicated Tier<\/h3>\n<p>Single-tenant deployments for maximum performance and compliance requirements.<\/p>\n<h2 id=\"goal-reserve-consumer\">Goal: Reserve the Consumer for Partition Handling in EventHub<\/h2>\n<h3><strong>Pre-requisite<\/strong><\/h3>\n<ul>\n<li>An Azure account.<\/li>\n<li>Basic knowledge of Event 
Hubs.<\/li>\n<\/ul>\n<h3><strong>Project Summary<\/strong><\/h3>\n<p>There are 2 scripts:<\/p>\n<ul>\n<li><code>producer.py<\/code> \u2192 generates sample data<\/li>\n<li><code>consumer.py<\/code> \u2192 acts as a consumer<\/li>\n<\/ul>\n<p>Between them lies the <strong>EventHub<\/strong>.<\/p>\n<div style=\"background: #1e1e1e; padding: 16px; border-radius: 6px; overflow-x: auto; font-family: 'Courier New', monospace; color: #d4d4d4; font-size: 14px; line-height: 1.5; margin: 20px 0;\">\n<pre style=\"margin: 0; white-space: pre;\"># producer.py\r\nimport time\r\nimport uuid\r\nfrom datetime import datetime, timezone\r\nfrom azure.eventhub import EventHubProducerClient, EventData\r\nimport json\r\nimport os\r\n\r\n# ============================================================== \r\n#  CONFIGURATION\r\n# ==============================================================\r\n\r\n# Replace the placeholder with your own key; keep real keys out of source control.\r\nEVENTHUB_CONNECTION_STR = \"Endpoint=sb:\/\/ns-for-testing.servicebus.windows.net\/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=&lt;your-shared-access-key&gt;;EntityPath=trial\"\r\nEVENTHUB_NAME = \"trial\"\r\n\r\n# ============================================================== \r\n#  PRODUCER\r\n# ==============================================================\r\n\r\nclass EventHubProducerDemo:\r\n    def __init__(self, connection_string, eventhub_name):\r\n        self.producer = EventHubProducerClient.from_connection_string(\r\n            conn_str=connection_string,\r\n            eventhub_name=eventhub_name\r\n        )\r\n\r\n    def generate_message(self, msg_id):\r\n        \"\"\"Generate message with unique event_id for idempotency\"\"\"\r\n        return {\r\n            \"event_id\": str(uuid.uuid4()),\r\n            \"message_id\": msg_id,\r\n            \"timestamp\": datetime.now(timezone.utc).isoformat(),\r\n            \"value\": msg_id * 10,\r\n            \"data\": f\"Test message #{msg_id}\"\r\n        }\r\n\r\n    def send_messages(self, 
count=10, interval=1):\r\n        \"\"\"Send messages alternating between partitions\"\"\"\r\n        with self.producer:\r\n            partitions = self.producer.get_partition_ids()\r\n            print(f\" Available partitions: {partitions}\")\r\n            print(f\" Sending {count} messages\")\r\n            print(\"=\"*60)\r\n\r\n            for i in range(1, count + 1):\r\n                data = self.generate_message(i)\r\n                partition_id = partitions[(i - 1) % len(partitions)]\r\n                \r\n                event_data = EventData(json.dumps(data))\r\n                batch = self.producer.create_batch(partition_id=partition_id)\r\n                batch.add(event_data)\r\n                \r\n                self.producer.send_batch(batch)\r\n                \r\n                print(f\" MESSAGE-{i} \u2192 PARTITION-{partition_id} \")\r\n                print(f\"   Event ID: {data['event_id']}\")\r\n                \r\n                time.sleep(interval)\r\n\r\n            print(f\"\\n Sent all {count} messages!\")\r\n            print(f\" Distribution: {len(partitions)} partition(s)\")\r\n\r\n# ============================================================== \r\n#  ENTRY POINT\r\n# ==============================================================\r\n\r\nif __name__ == \"__main__\":\r\n    import sys\r\n\r\n    count = int(sys.argv[1]) if len(sys.argv) &gt; 1 else 10\r\n    interval = float(sys.argv[2]) if len(sys.argv) &gt; 2 else 1\r\n\r\n    print(f\"\\n{'#'*60}\")\r\n    print(f\"Event Hub Producer - POC\")\r\n    print(f\"{'#'*60}\")\r\n    print(f\" Event Hub: {EVENTHUB_NAME}\")\r\n    print(f\" Messages to send: {count}\")\r\n    print(f\" Interval: {interval}s\")\r\n    print(f\"{'#'*60}\\n\")\r\n\r\n    producer = EventHubProducerDemo(EVENTHUB_CONNECTION_STR, EVENTHUB_NAME)\r\n    producer.send_messages(count, interval)<\/pre>\n<\/div>\n<div style=\"background: #1e1e1e; padding: 16px; border-radius: 6px; overflow-x: auto; font-family: 
'Courier New', monospace; color: #d4d4d4; font-size: 14px; line-height: 1.5; margin: 20px 0;\">\n<pre style=\"margin: 0; white-space: pre;\"># checkpoint_consumer.py (Blob Storage Version)\r\nimport asyncio\r\nimport sys\r\nfrom azure.eventhub.aio import EventHubConsumerClient\r\nfrom azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore\r\nimport json\r\n\r\n# ============= CONFIGURATION =============\r\n# Replace the placeholders with your own keys; keep real keys out of source control.\r\nEVENTHUB_CONNECTION_STR = \"Endpoint=sb:\/\/ns-for-testing.servicebus.windows.net\/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=&lt;your-shared-access-key&gt;;EntityPath=trial\"\r\nEVENTHUB_NAME = \"trial\"\r\nSTORAGE_CONNECTION_STR = \"DefaultEndpointsProtocol=https;AccountName=123storagefortesting;AccountKey=&lt;your-storage-account-key&gt;;EndpointSuffix=core.windows.net\"\r\nCHECKPOINT_CONTAINER = \"test\"\r\nCONSUMER_GROUP = \"$Default\"\r\n\r\nclass CheckpointConsumer:\r\n    def __init__(self, partition_id, consumer_name):\r\n        self.partition_id = partition_id\r\n        self.consumer_name = consumer_name\r\n        self.event_count = 0\r\n\r\n    async def on_event(self, partition_context, event):\r\n        \"\"\"Process events\"\"\"\r\n        try:\r\n            if event is None:\r\n                return\r\n\r\n            event_body = event.body_as_str()\r\n            event_data = json.loads(event_body)\r\n            self.event_count += 1\r\n\r\n            print(f\"\\n{'='*60}\")\r\n            print(f\" {self.consumer_name} - Event #{self.event_count}\")\r\n            print(f\"{'='*60}\")\r\n            print(f\" Partition: {partition_context.partition_id}\")\r\n            print(f\" Event ID: {event_data.get('event_id', 'N\/A')}\")\r\n            print(f\" Message ID: {event_data.get('message_id', 'N\/A')}\")\r\n            print(f\" Timestamp: {event_data.get('timestamp', 'N\/A')}\")\r\n            print(f\" Sequence #: 
{event.sequence_number}\")\r\n            print(f\" Offset: {event.offset}\")\r\n            print(f\" Data: {event_data.get('data', event_body[:100])}\")\r\n\r\n            await asyncio.sleep(0.1)\r\n            await partition_context.update_checkpoint(event)\r\n            print(f\" Checkpoint saved to blob (offset: {event.offset})\")\r\n            print(f\"{'='*60}\")\r\n\r\n        except json.JSONDecodeError:\r\n            print(\" Non-JSON message\")\r\n\r\n        except Exception as e:\r\n            print(f\" Error: {e}\")\r\n\r\n    async def start(self):\r\n        \"\"\"Start consumer with blob checkpoint\"\"\"\r\n        print(f\"\\n{'#'*60}\")\r\n        print(f\" Starting {self.consumer_name} (BLOB CHECKPOINTING)\")\r\n        print(f\"{'#'*60}\")\r\n\r\n        checkpoint_store = BlobCheckpointStore.from_connection_string(\r\n            conn_str=STORAGE_CONNECTION_STR,\r\n            container_name=CHECKPOINT_CONTAINER\r\n        )\r\n\r\n        client = EventHubConsumerClient.from_connection_string(\r\n            conn_str=EVENTHUB_CONNECTION_STR,\r\n            consumer_group=CONSUMER_GROUP,\r\n            checkpoint_store=checkpoint_store\r\n        )\r\n\r\n        # Use the client inside the context manager so it is always closed\r\n        async with client:\r\n            props = await client.get_eventhub_properties()\r\n            all_partitions = props['partition_ids']\r\n\r\n            if self.partition_id not in all_partitions:\r\n                print(f\" Partition '{self.partition_id}' not found!\")\r\n                return\r\n\r\n            # starting_position applies only when no checkpoint exists for the partition\r\n            await client.receive(\r\n                on_event=self.on_event,\r\n                partition_id=self.partition_id,\r\n                starting_position=\"-1\",\r\n                max_wait_time=5\r\n            )\r\n\r\nasync def main():\r\n    if len(sys.argv) &lt; 2:\r\n        print(\"\\nUsage: python checkpoint_consumer.py &lt;partition_id&gt; [consumer_name]\")\r\n        sys.exit(1)\r\n\r\n    partition_id = sys.argv[1]\r\n    consumer_name = sys.argv[2] if len(sys.argv) &gt; 2 else 
f\"Consumer-P{partition_id}\"\r\n\r\n    consumer = CheckpointConsumer(partition_id, consumer_name)\r\n    await consumer.start()\r\n\r\nif __name__ == \"__main__\":\r\n    asyncio.run(main())\r\n<\/pre>\n<\/div>\n<h3><strong>How to Use These Scripts<\/strong><\/h3>\n<h5>producer.py<\/h5>\n<p>Run in terminal:<\/p>\n<p><code>python producer.py 10 2<\/code><\/p>\n<p>This will produce <strong>10 events<\/strong> with a delay of <strong>2 seconds<\/strong> between events.<\/p>\n<h5><strong>consumer.py<\/strong><\/h5>\n<p>Open two terminals and run one command in each:<\/p>\n<p><code>python consumer.py 0 Consumer-A<\/code><\/p>\n<p><code>python consumer.py 1 Consumer-B<\/code><\/p>\n<p>This creates two consumers, <strong>Consumer-A<\/strong> and <strong>Consumer-B<\/strong>, and assigns them <strong>Partition-0<\/strong> and <strong>Partition-1<\/strong> respectively.<\/p>\n<h3><strong>Execution<\/strong><\/h3>\n<p>When the scripts run together, <code>producer.py<\/code> sends sample events to EventHub.<\/p>\n<p>Since we have chosen which consumer will read from which partition, the consumers start consuming from their respective partitions.<\/p>\n<p>This is a good demo to see how the data transfer works from <strong>producer \u2192 EventHub \u2192 consumer<\/strong>.<\/p>\n<h2 id=\"flaws\"><strong>Flaws<\/strong><\/h2>\n<ul>\n<li>Nothing is perfect.<\/li>\n<li>Although this demo shows how EventHub works, it has some flaws.<\/li>\n<li>The biggest flaw is that we have <strong>fixed a consumer to a partition<\/strong>.<\/li>\n<li>You might be wondering what is wrong with that.<\/li>\n<li>The issue appears in case of <strong>consumer failure<\/strong>.<\/li>\n<li>If <strong>Consumer-A<\/strong> fails, the events in <strong>Partition-0<\/strong> are no longer consumed and keep piling up.<\/li>\n<li>In that case, we must manually start another consumer and point it to <strong>Partition-0<\/strong>.<\/li>\n<li>This manual start\/stop process and lack of automatic load balancing can lead to <a 
href=\"https:\/\/opstree.com\/blog\/2024\/12\/27\/can-cloud-data-be-hacked\/\" target=\"_blank\" rel=\"noopener\"><strong>data loss<\/strong><\/a>.<\/li>\n<\/ul>\n<p>Now here comes the concept of <strong>Checkpointing<\/strong>.<\/p>\n<h2 id=\"checkpointing\"><strong>Checkpointing<\/strong><\/h2>\n<p>Checkpointing refers to <strong>creating checkpoints in <a href=\"https:\/\/opstree.com\/services\/database-and-data-engineering\/\" target=\"_blank\" rel=\"noopener\">Azure Storage Service<\/a><\/strong> for each partition.<\/p>\n<p>If a consumer fails, a new consumer can <strong>resume from the last checkpoint<\/strong> of the previous one.<\/p>\n<p>A checkpoint is created <strong>after the message is completely consumed<\/strong>.<\/p>\n<p>Checkpointing is implemented in this script, but we still need to <strong>manually configure the new consumer<\/strong>.<\/p>\n<h3><strong>Why Implement This Despite the Flaw?<\/strong><\/h3>\n<p>The only motive of doing this is to show the <strong>consumer-partition relationship<\/strong> and how messages travel from <strong>producer \u2192 partition \u2192 consumer<\/strong>.<\/p>\n<h2 id=\"better-approach\"><strong>Better Approach<\/strong><\/h2>\n<p>This time, we\u2019ll use the same <strong>Producer Script<\/strong>, but the <strong>Consumer Script<\/strong> will let <strong>EventHub manage everything automatically<\/strong>.<\/p>\n<h2 id=\"new-consumer-script\"><strong>New Consumer Script<\/strong><\/h2>\n<p>Below is the updated consumer script (<code>AutoLoadBalancedConsumer<\/code>):<\/p>\n<div style=\"background: #1e1e1e; padding: 16px; border-radius: 6px; overflow-x: auto; font-family: 'Courier New', monospace; color: #d4d4d4; font-size: 14px; line-height: 1.5; margin: 20px 0;\">\n<pre style=\"margin: 0; white-space: pre;\"># Everything is managed by EventHub \r\n\r\nimport asyncio  # Python async programming\r\nimport sys      # System arguments\r\nfrom azure.eventhub.aio import EventHubConsumerClient\r\nfrom 
azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore\r\nimport json\r\n\r\n# ============= CONFIGURATION =============\r\n# Replace the placeholders with your own keys; keep real keys out of source control.\r\nEVENTHUB_CONNECTION_STR = \"Endpoint=sb:\/\/ns-for-testing.servicebus.windows.net\/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=&lt;your-shared-access-key&gt;;EntityPath=trial\"\r\nEVENTHUB_NAME = \"trial\"\r\nSTORAGE_CONNECTION_STR = \"DefaultEndpointsProtocol=https;AccountName=safortestingg;AccountKey=&lt;your-storage-account-key&gt;;EndpointSuffix=core.windows.net\"\r\nCHECKPOINT_CONTAINER = \"test\"\r\nCONSUMER_GROUP = \"$Default\"\r\n\r\nclass AutoLoadBalancedConsumer:\r\n    def __init__(self, consumer_name):\r\n        self.consumer_name = consumer_name\r\n        self.event_count = 0\r\n        self.partition_counts = {}  # Track events per partition\r\n\r\n    async def on_event(self, partition_context, event):\r\n        \"\"\"Event handler - called for each event\"\"\"\r\n        try:\r\n            if event is None:\r\n                return\r\n\r\n            partition_id = partition_context.partition_id\r\n\r\n            if partition_id not in self.partition_counts:\r\n                self.partition_counts[partition_id] = 0\r\n\r\n            self.partition_counts[partition_id] += 1\r\n            self.event_count += 1\r\n\r\n            event_body = event.body_as_str()\r\n            event_data = json.loads(event_body)\r\n\r\n            print(f\"\\n{'='*60}\")\r\n            print(f\" {self.consumer_name} - Event #{self.event_count}\")\r\n            print(f\"{'='*60}\")\r\n            print(f\" Partition: {partition_id} (Auto-Assigned)\")\r\n            print(f\" Event ID: {event_data.get('event_id')}\")\r\n            print(f\" Message ID: {event_data.get('message_id')}\")\r\n            print(f\" Sequence: {event.sequence_number}\")\r\n            print(f\" Offset: {event.offset}\")\r\n\r\n            await 
asyncio.sleep(0.1)\r\n            await partition_context.update_checkpoint(event)\r\n            print(f\"\u2705 Checkpoint saved\")\r\n            print(f\" Partition Distribution: {self.partition_counts}\")\r\n            print(f\"{'='*60}\")\r\n\r\n        except Exception as e:\r\n            print(f\"\u274c Error: {e}\")\r\n\r\n    async def on_partition_initialize(self, partition_context):\r\n        print(f\"\\n {self.consumer_name} - CLAIMED PARTITION {partition_context.partition_id}\")\r\n\r\n    async def on_partition_close(self, partition_context, reason):\r\n        print(f\"\\n {self.consumer_name} - RELEASED PARTITION {partition_context.partition_id}\")\r\n        print(f\"  Reason: {reason}\")\r\n\r\n    async def on_error(self, partition_context, error):\r\n        if partition_context:\r\n            print(f\"\u274c Error on partition {partition_context.partition_id}: {error}\")\r\n        else:\r\n            print(f\"\u274c Error: {error}\")\r\n\r\n    async def start(self):\r\n        print(f\"\\n{'#'*60}\")\r\n        print(f\" {self.consumer_name} - AUTO LOAD-BALANCED CONSUMER\")\r\n        print(f\"{'#'*60}\")\r\n        print(f\" Event Hub: {EVENTHUB_NAME}\")\r\n        print(f\" Checkpoint Store: Azure Blob\")\r\n        print(f\" Load Balancing: FAST MODE (5s intervals)\")\r\n        print(f\" Ownership Expiry: 10s\\n\")\r\n        print(f\" Waiting for Event Hub to assign partitions...\\n\")\r\n\r\n        try:\r\n            checkpoint_store = BlobCheckpointStore.from_connection_string(\r\n                STORAGE_CONNECTION_STR,\r\n                CHECKPOINT_CONTAINER\r\n            )\r\n\r\n            client = EventHubConsumerClient.from_connection_string(\r\n                conn_str=EVENTHUB_CONNECTION_STR,\r\n                consumer_group=CONSUMER_GROUP,\r\n                eventhub_name=EVENTHUB_NAME,\r\n                checkpoint_store=checkpoint_store,\r\n                load_balancing_interval=5,\r\n                
partition_ownership_expiration_interval=10\r\n            )\r\n\r\n            async with client:\r\n                await client.receive(\r\n                    on_event=self.on_event,\r\n                    on_partition_initialize=self.on_partition_initialize,\r\n                    on_partition_close=self.on_partition_close,\r\n                    on_error=self.on_error\r\n                )\r\n\r\n        except KeyboardInterrupt:\r\n            print(f\"\\n\u26a0\ufe0f {self.consumer_name} stopped by user\")\r\n            print(f\" Final Stats: Events={self.event_count}, Distribution={self.partition_counts}\")\r\n\r\n        except Exception as e:\r\n            print(f\"\\n\u274c Error: {e}\")\r\n\r\nasync def main():\r\n    if len(sys.argv) &lt; 2:\r\n        print(\"\\n\u26a0\ufe0f Usage: python eventprocessor_consumer.py &lt;consumer_name&gt;\")\r\n        sys.exit(1)\r\n\r\n    consumer_name = sys.argv[1]\r\n    consumer = AutoLoadBalancedConsumer(consumer_name)\r\n    await consumer.start()\r\n\r\nif __name__ == \"__main__\":\r\n    asyncio.run(main())<\/pre>\n<\/div>\n<p><strong>Key Implementations<\/strong><\/p>\n<p>The above script implements the following:<\/p>\n<ul>\n<li><strong>Checkpointing<\/strong><\/li>\n<li><strong>Automatic Load Balancing<\/strong> and partition reassignment when a consumer fails (handled by <code>EventHubConsumerClient<\/code> together with the checkpoint store)<\/li>\n<li><strong>Full EventHub SDK Integration<\/strong><\/li>\n<\/ul>\n<p>Hence, this script covers the main consumer-side functionality of EventHub.<\/p>\n<h3><strong>Batch Processing<\/strong><\/h3>\n<p>So far, we have consumed one event at a time.<\/p>\n<p>However, there might be a use case where we want to <strong>consume events in batches<\/strong>. The SDK supports this through the <code>receive_batch<\/code> method, which takes an <code>on_event_batch<\/code> callback in place of <code>on_event<\/code>.<\/p>\n<h2 id=\"realistic-setup\"><strong>Realistic Setup<\/strong><\/h2>\n<p>Now let\u2019s briefly discuss what a <strong>production-level setup<\/strong> looks 
like.<\/p>\n<ul>\n<li><strong>Producers:<\/strong> Multiple producers send different types of data.<\/li>\n<li><strong>Namespaces \/ EventHubs:<\/strong> Multiple namespaces to categorize data &#8211; e.g., Payments, Messaging, Delivery, etc. Each namespace can have multiple EventHubs.<\/li>\n<li><strong>Partitions:<\/strong> In production, the number of partitions is a crucial decision because it limits how many consumers can read in parallel and so directly affects <strong>throughput and cost<\/strong>.<\/li>\n<li><strong>Consumer Groups:<\/strong> Multiple consumer groups can consume from the same event stream independently.<\/li>\n<li><strong>Database Integration:<\/strong> We can integrate any <a href=\"https:\/\/opstree.com\/blog\/2023\/12\/07\/top-10-databases-for-web-applications\/\" target=\"_blank\" rel=\"noopener\">database<\/a> or cache such as Redis to make sure each message\/event is processed only once.<\/li>\n<\/ul>\n<h2 id=\"one-cons-of-eventhub\">One Con of EventHub<\/h2>\n<p><a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/event-hubs\/event-hubs-about\" target=\"_blank\" rel=\"noopener\">EventHub<\/a> provides at-least-once delivery: every event reaches the consumers, but there is no guarantee that an event is processed only once. In some scenarios the same event is processed more than once.<\/p>\n<p>Suppose an event is processed, but the consumer fails just before the event is checkpointed. 
When a new consumer takes over the partition, it resumes from the last saved checkpoint.<\/p>\n<p>Because no checkpoint exists for that event, the new consumer processes it again, even though it had already been processed.<\/p>\n<p>As discussed above, to handle this kind of scenario we can use a database or a cache such as Redis to record which events have been processed. This acts as a second checkpoint in case EventHub checkpointing fails.<\/p>\n<p>It works like a two-step verification before processing an event. It adds another engineering component and makes the system more complex, but it prevents duplicate processing, which helps in cases where we cannot afford to process a single event more than once.<\/p>\n<h2 id=\"summary\"><strong>Summary:<\/strong><\/h2>\n<p>This end-to-end setup demonstrates how data flows from <strong>Producer \u2192 EventHub \u2192 Consumer<\/strong>, and how <strong>checkpointing, load balancing, and scaling<\/strong> work together to ensure reliability and high throughput in real-world EventHub systems.<\/p>\n<p>Related Searches &#8211; <a href=\"https:\/\/opstree.com\/\" target=\"_blank\" rel=\"noopener\">DevOps Consulting Services<\/a> | <a href=\"https:\/\/opstree.com\/services\/devops-and-devsecops-services\/\" target=\"_blank\" rel=\"noopener\">Shift-left security solutions<\/a> | <a href=\"https:\/\/opstree.com\/services\/cloud-migration-and-modernization-services\/\" target=\"_blank\" rel=\"noopener\">Hybrid Cloud Migration Services<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Azure Event Hubs is a fully managed, cloud-based data streaming platform designed for real-time event ingestion and processing at massive scale. 
It serves as the &#8220;front door&#8221; for event-driven architectures, capable of handling millions of events per second with low latency.<\/p>\n","protected":false},"author":244582714,"featured_media":30175,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_coblocks_attr":"","_coblocks_dimensions":"","_coblocks_responsive_height":"","_coblocks_accordion_ie_support":"","jetpack_post_was_ever_published":false,"_jetpack_newsletter_access":"","_jetpack_dont_email_post_to_subs":false,"_jetpack_newsletter_tier_id":0,"_jetpack_memberships_contains_paywalled_content":false,"_jetpack_memberships_contains_paid_content":false,"footnotes":"","jetpack_publicize_message":"","jetpack_publicize_feature_enabled":true,"jetpack_social_post_already_shared":true,"jetpack_social_options":{"image_generator_settings":{"template":"highway","enabled":false},"version":2}},"categories":[28070474],"tags":[],"jetpack_publicize_connections":[],"jetpack_featured_media_url":"https:\/\/opstree.com\/blog\/wp-content\/uploads\/2025\/12\/Blog-Image-Template-6.jpg","jetpack_likes_enabled":true,"jetpack_sharing_enabled":true,"jetpack_shortlink":"https:\/\/wp.me\/pfDBOm-7QC","jetpack-related-posts":[],"_links":{"self":[{"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/posts\/30170"}],"collection":[{"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/users\/244582714"}],"replies":[{"embeddable":true,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/comments?post=30170"}],"version-history":[{"count":10,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/posts\/30170\/revisions"}],"predecessor-version":[{"id":30181,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/posts\/30170\/revisions\/30181"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/opstree.com\
/blog\/wp-json\/wp\/v2\/media\/30175"}],"wp:attachment":[{"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/media?parent=30170"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/categories?post=30170"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/opstree.com\/blog\/wp-json\/wp\/v2\/tags?post=30170"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}