Azure Event Hubs is a fully managed, cloud-based data streaming platform designed for real-time event ingestion and processing at massive scale.
It serves as the “front door” for event-driven architectures, capable of handling millions of events per second with low latency.
Components of Event Hub
Producer
We said in the definition above that EventHub is a streaming platform — but where does all this data come from?
The Producer is responsible for producing and sending this data.
A Producer can be a single data source or many data sources, for example:
- Streaming data
- Logs
- Files
- Media
- Weather Data
- IoT sensor data, etc.
Event Hub
Now comes the main component that processes all this data – i.e., EventHub itself.
EventHub is the central unit that ingests all this data and distributes it across multiple Partitions.
Partitions are scale units that work like “freeway lanes” — add more partitions for higher throughput.
In the Standard tier, a single EventHub can have up to 32 partitions.
We are not limited to a single EventHub — multiple EventHubs can be created when separate streams need their own processing.
Multiple EventHubs can be created under a Namespace.
A Namespace is nothing but a logical collection of EventHubs.
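For reference, the partition count and the namespace/EventHub hierarchy are fixed when the resources are created. Below is a minimal, hedged sketch of doing that from Python with the azure-mgmt-eventhub management SDK (it is not part of the demo scripts later in this article); the subscription ID and resource group are placeholders, and the namespace/EventHub names simply reuse the ones from this article.

# create_eventhub.py (illustrative sketch only)
from azure.identity import DefaultAzureCredential
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.eventhub.models import EHNamespace, Sku, Eventhub

SUBSCRIPTION_ID = "<subscription-id>"      # placeholder
RESOURCE_GROUP = "<resource-group>"        # placeholder

mgmt = EventHubManagementClient(DefaultAzureCredential(), SUBSCRIPTION_ID)

# Create (or update) the Namespace — the logical collection of EventHubs
mgmt.namespaces.begin_create_or_update(
    RESOURCE_GROUP,
    "ns-for-testing",
    EHNamespace(location="eastus", sku=Sku(name="Standard", tier="Standard")),
).result()

# Create an EventHub inside the namespace with 4 partitions
mgmt.event_hubs.create_or_update(
    RESOURCE_GROUP,
    "ns-for-testing",
    "triall",
    Eventhub(partition_count=4, message_retention_in_days=1),
)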
Consumers
Now there are Consumers who will consume this data.
These can be any application that requires this data, or any Azure service such as:
- Azure Stream Analytics
- Azure Data Explorer, etc.
Just like there are multiple partitions under one EventHub, there can be multiple Consumers under one Consumer Group, and similarly, multiple Consumer Groups can exist as well.
Important Notes
1. Within the Same Consumer Group
Each partition can be owned by only one consumer at a time within the same consumer group.
So:
- If Consumer-1 in Group-A is reading from Partition-0, then Consumer-2 in Group-A cannot read from Partition-0 at the same time.
- If Consumer-1 stops, then Partition-0 can be reassigned to Consumer-2 (still in Group-A).
2. Across Different Consumer Groups
Partitions can be consumed simultaneously by consumers in different consumer groups.
There’s no restriction across groups – Event Hubs (or Kafka) treat each group as an independent subscription to the same event stream.
So:
- Consumer-1 (Group-A) and Consumer-3 (Group-B) can both read Partition-0 at the same time, independently.
- They each maintain their own offset/checkpoint, so they don’t interfere with each other.
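To make the cross-group behaviour concrete, here is a small, hedged sketch using the azure-eventhub SDK that appears later in this article: two consumers in different consumer groups read Partition-0 at the same time without conflicting. The connection string is a placeholder, and the consumer group "group-b" is assumed to have been created alongside $Default in the portal.

# two_groups_demo.py (illustrative sketch only)
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

CONN_STR = "<event-hub-connection-string>"   # placeholder
EVENTHUB = "triall"

async def read(group):
    client = EventHubConsumerClient.from_connection_string(
        conn_str=CONN_STR, consumer_group=group, eventhub_name=EVENTHUB
    )

    async def on_event(ctx, event):
        # Each group tracks its own offset, so both groups see every event
        print(f"[{group}] partition {ctx.partition_id}: {event.body_as_str()}")

    async with client:
        await client.receive(on_event=on_event, partition_id="0", starting_position="-1")

async def main():
    # Both coroutines read Partition-0 concurrently without conflicting
    await asyncio.gather(read("$Default"), read("group-b"))

asyncio.run(main())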
Advanced Features
Event Hubs Capture
- Definition: Enable Capture when you want the incoming stream to be automatically persisted to Azure Blob Storage or Azure Data Lake Storage.
- Automatic Storage: Capture streaming data to Azure Blob Storage or Data Lake.
- Retention: Configure time-based data retention policies.
- Batch Processing: Enable micro-batch analytics on captured data.
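Capture writes the stream as Apache Avro files into the configured storage account. As a hedged illustration (assuming the fastavro package and a capture blob already downloaded to a local file named capture.avro — both my assumptions), the records can be read back like this; the field names Body, Offset and SequenceNumber follow the documented Capture schema.

# read_capture.py (illustrative sketch only)
import json
from fastavro import reader

with open("capture.avro", "rb") as f:        # a downloaded Capture blob
    for record in reader(f):
        body = record["Body"]                # raw event body as bytes
        print(record["SequenceNumber"], record["Offset"], json.loads(body))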
Throughput Units
- Definition: Throughput Units (TUs) are the capacity units of a namespace — the more TUs you provision, the more data the EventHub can ingest and serve.
Standard Tier Metrics:
- Ingress: Up to 1 MB/second or 1,000 events/second per unit.
- Egress: Up to 2 MB/second per unit.
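As a quick worked example using the numbers above, the required TU count is driven by whichever dimension is the bottleneck. The small helper below only illustrates that arithmetic:

# tu_sizing.py (illustrative sketch only)
import math

def required_throughput_units(ingress_mb_s, ingress_events_s, egress_mb_s):
    # 1 TU = 1 MB/s or 1,000 events/s ingress, and 2 MB/s egress
    return max(
        math.ceil(ingress_mb_s / 1.0),
        math.ceil(ingress_events_s / 1000.0),
        math.ceil(egress_mb_s / 2.0),
    )

# Example: 5 MB/s in, 4,000 events/s, 6 MB/s out -> ingress bandwidth dominates -> 5 TUs
print(required_throughput_units(5, 4000, 6))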
Auto-Inflate
- Definition: You can raise capacity by adding throughput units manually, but if you are unsure how many you need, enable Auto-Inflate and the TU count will adjust automatically based on usage.
- Dynamic Scaling: Automatically adjust throughput units based on demand.
- Cost Optimization: Pay only for what you use.
- No Manual Intervention: Seamless scaling without configuration changes.
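For completeness, here is a hedged sketch of switching Auto-Inflate on from Python with the azure-mgmt-eventhub SDK; the field names is_auto_inflate_enabled and maximum_throughput_units are my reading of the current model, and the subscription/resource-group values are placeholders.

# enable_auto_inflate.py (illustrative sketch only)
from azure.identity import DefaultAzureCredential
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.eventhub.models import EHNamespace, Sku

mgmt = EventHubManagementClient(DefaultAzureCredential(), "<subscription-id>")
mgmt.namespaces.begin_create_or_update(
    "<resource-group>",
    "ns-for-testing",
    EHNamespace(
        location="eastus",
        sku=Sku(name="Standard", tier="Standard", capacity=1),
        is_auto_inflate_enabled=True,        # let Azure scale TUs up on demand
        maximum_throughput_units=10,         # upper bound for auto-inflate
    ),
).result()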
Service Tiers
Standard Tier
Basic event streaming for moderate throughput requirements.
Premium Tier
Enhanced performance with dedicated processing units and network isolation.
Dedicated Tier
Single-tenant deployments for maximum performance and compliance requirements.
Goal: Dedicate a consumer to each partition of an EventHub.
Pre-requisite
- An Azure account.
- Basic knowledge of Event Hubs.
Project Summary
There are 2 scripts:
- producer.py → generates sample data
- consumer.py → acts as a consumer
Between them lies the EventHub.
# producer.py
import time
import uuid
from datetime import datetime, timezone
from azure.eventhub import EventHubProducerClient, EventData
import json
import os

# ==============================================================
# CONFIGURATION
# ==============================================================
EVENTHUB_CONNECTION_STR = "Endpoint=sb://ns-for-testing.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=qniF/Ih8uLQEKGxcQLiFQhdsHcx8X6vku+AEhIpSrtU=;EntityPath=triall"
EVENTHUB_NAME = "triall"

# ==============================================================
# PRODUCER
# ==============================================================
class EventHubProducerDemo:
    def __init__(self, connection_string, eventhub_name):
        self.producer = EventHubProducerClient.from_connection_string(
            conn_str=connection_string,
            eventhub_name=eventhub_name
        )

    def generate_message(self, msg_id):
        """Generate message with unique event_id for idempotency"""
        return {
            "event_id": str(uuid.uuid4()),
            "message_id": msg_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "value": msg_id * 10,
            "data": f"Test message #{msg_id}"
        }

    def send_messages(self, count=10, interval=1):
        """Send messages alternating between partitions"""
        with self.producer:
            partitions = self.producer.get_partition_ids()
            print(f" Available partitions: {partitions}")
            print(f" Sending {count} messages")
            print("=" * 60)
            for i in range(1, count + 1):
                data = self.generate_message(i)
                partition_id = partitions[(i - 1) % len(partitions)]
                event_data = EventData(json.dumps(data))
                batch = self.producer.create_batch(partition_id=partition_id)
                batch.add(event_data)
                self.producer.send_batch(batch)
                print(f" MESSAGE-{i} → PARTITION-{partition_id} ")
                print(f" Event ID: {data['event_id']}")
                time.sleep(interval)
            print(f"\n Sent all {count} messages!")
            print(f" Distribution: {len(partitions)} partition(s)")

# ==============================================================
# ENTRY POINT
# ==============================================================
if __name__ == "__main__":
    import sys
    count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
    interval = float(sys.argv[2]) if len(sys.argv) > 2 else 1
    print(f"\n{'#'*60}")
    print("Event Hub Producer - POC")
    print(f"{'#'*60}")
    print(f" Event Hub: {EVENTHUB_NAME}")
    print(f" Messages to send: {count}")
    print(f" Interval: {interval}s")
    print(f"{'#'*60}\n")
    producer = EventHubProducerDemo(EVENTHUB_CONNECTION_STR, EVENTHUB_NAME)
    producer.send_messages(count, interval)
# checkpoint_consumer.py (Blob Storage Version)
import asyncio
import sys
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
import json

# ============= CONFIGURATION =============
EVENTHUB_CONNECTION_STR = "Endpoint=sb://namespace-for-testing.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=h5d6nthEG9eZid6SowfOWMGoq5FehOn7g+AEhBuY6VU=;EntityPath=trial"
EVENTHUB_NAME = "trial"
STORAGE_CONNECTION_STR = "DefaultEndpointsProtocol=https;AccountName=123storagefortesting;AccountKey=+0g3j1xa8ZNdCaf38lcpmc5aBwX9AE/jc2DlmFjV0om5d2tgcgJlFvkHltiHz3Be4RHiu3U1CdUt+AStAM+5Ew==;EndpointSuffix=core.windows.net"
CHECKPOINT_CONTAINER = "test"
CONSUMER_GROUP = "$Default"

class CheckpointConsumer:
    def __init__(self, partition_id, consumer_name):
        self.partition_id = partition_id
        self.consumer_name = consumer_name
        self.event_count = 0

    async def on_event(self, partition_context, event):
        """Process events"""
        try:
            if event is None:
                return
            event_body = event.body_as_str()
            event_data = json.loads(event_body)
            self.event_count += 1
            print(f"\n{'='*60}")
            print(f" {self.consumer_name} - Event #{self.event_count}")
            print(f"{'='*60}")
            print(f" Partition: {partition_context.partition_id}")
            print(f" Event ID: {event_data.get('event_id', 'N/A')}")
            print(f" Message ID: {event_data.get('message_id', 'N/A')}")
            print(f" Timestamp: {event_data.get('timestamp', 'N/A')}")
            print(f" Sequence #: {event.sequence_number}")
            print(f" Offset: {event.offset}")
            print(f" Data: {event_data.get('data', event_body[:100])}")
            await asyncio.sleep(0.1)
            await partition_context.update_checkpoint(event)
            print(f" Checkpoint saved to blob (offset: {event.offset})")
            print(f"{'='*60}")
        except json.JSONDecodeError:
            print(" Non-JSON message")
        except Exception as e:
            print(f" Error: {e}")

    async def start(self):
        """Start consumer with blob checkpoint"""
        print(f"\n{'#'*60}")
        print(f" Starting {self.consumer_name} (BLOB CHECKPOINTING)")
        print(f"{'#'*60}")
        checkpoint_store = BlobCheckpointStore.from_connection_string(
            conn_str=STORAGE_CONNECTION_STR,
            container_name=CHECKPOINT_CONTAINER
        )
        client = EventHubConsumerClient.from_connection_string(
            conn_str=EVENTHUB_CONNECTION_STR,
            consumer_group=CONSUMER_GROUP,
            checkpoint_store=checkpoint_store
        )
        props = await client.get_eventhub_properties()
        all_partitions = props['partition_ids']
        if self.partition_id not in all_partitions:
            print(f" Partition '{self.partition_id}' not found!")
            return
        async with client:
            await client.receive(
                on_event=self.on_event,
                partition_id=self.partition_id,
                starting_position="-1",
                max_wait_time=5
            )

async def main():
    if len(sys.argv) < 2:
        print("\nUsage: python checkpoint_consumer.py <partition_id> [consumer_name]")
        sys.exit(1)
    partition_id = sys.argv[1]
    consumer_name = sys.argv[2] if len(sys.argv) > 2 else f"Consumer-P{partition_id}"
    consumer = CheckpointConsumer(partition_id, consumer_name)
    await consumer.start()

if __name__ == "__main__":
    asyncio.run(main())
How to Use These Scripts
producer.py
Run in terminal:
python producer.py 10 2
This will produce 10 events with a delay of 2 seconds between them.
consumer.py
Open two terminals:
python consumer.py 0 Consumer-A
python consumer.py 1 Consumer-B
This creates two consumers — Consumer-A and Consumer-B — and assigns them Partition-0 and Partition-1 respectively.
Execution
As we can see on the left side, our producer.py script runs and sends sample events to EventHub.
Since we have chosen which consumer will read from which partition, the consumers start consuming from their respective partitions.
This is a good demo to see how the data transfer works from producer → EventHub → consumer.
Flaws
- Nothing is perfect.
- Though this demo demonstrates how EventHub works, it has some flaws.
- The biggest flaw is that we have fixed a consumer to a partition.
- Now you might be thinking — what’s wrong in that?
- The issue appears in case of consumer failure.
- If Consumer-A fails, then data from Partition-0 has nowhere to go.
- In that case, we must manually start another consumer and point it to Partition-0.
- This manual start/stop process and the lack of automatic load balancing delay processing and, if an outage outlasts the retention period, can even lead to data loss.
Now here comes the concept of Checkpointing.
Checkpointing
Checkpointing means persisting, per partition, the position (offset) of the last processed event — here in an Azure Storage container.
If a consumer fails, a new consumer can resume from the last checkpoint of the previous one.
A checkpoint is created after the message is completely consumed.
Checkpointing is implemented in this script, but we still need to manually configure the new consumer.
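If you are curious what those checkpoints actually look like, the small sketch below lists the checkpoint blobs in the container (the storage connection string is a placeholder for the value in the consumer script). The blob layout and the offset/sequencenumber metadata keys reflect how the current BlobCheckpointStore stores them, but treat that as an assumption rather than a guaranteed contract.

# inspect_checkpoints.py (illustrative sketch only)
from azure.storage.blob import ContainerClient

STORAGE_CONNECTION_STR = "<storage-connection-string>"   # same value as in consumer.py
CHECKPOINT_CONTAINER = "test"

container = ContainerClient.from_connection_string(
    STORAGE_CONNECTION_STR, container_name=CHECKPOINT_CONTAINER
)
# BlobCheckpointStore writes one (empty) blob per partition under
# <namespace>/<eventhub>/<consumer-group>/checkpoint/<partition-id>,
# carrying the offset and sequence number as blob metadata.
for blob in container.list_blobs(include=["metadata"]):
    if "/checkpoint/" in blob.name:
        meta = blob.metadata or {}
        print(blob.name, "offset:", meta.get("offset"), "sequence:", meta.get("sequencenumber"))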
Why Implement This Despite the Flaw?
The only motive of doing this is to show the consumer-partition relationship and how messages travel from producer → partition → consumer.
Better Approach
This time, we’ll use the same Producer Script, but the Consumer Script will let EventHub manage everything automatically.
New Consumer Script
Below is the updated consumer script (AutoLoadBalancedConsumer):
# Everything is managed by EventHub
import asyncio  # Python async programming
import sys      # System arguments
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
import json

# ============= CONFIGURATION =============
EVENTHUB_CONNECTION_STR = "Endpoint=sb://ns-for-testing.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=K7/9j0Tf+/zngmgONFMub/0F3uPxqBAjz+AEhMtbYPI=;EntityPath=trial"
EVENTHUB_NAME = "trial"
STORAGE_CONNECTION_STR = "DefaultEndpointsProtocol=https;AccountName=safortestingg;AccountKey=n4VUZDhmJjni/XnOS5NSsGnXYOEMKwhevkDh1RvrSZ5OIDHriTH2rvoBCpR98n1t+5t9pyAXDxfq+AStNty0wg==;EndpointSuffix=core.windows.net"
CHECKPOINT_CONTAINER = "test"
CONSUMER_GROUP = "$Default"

class AutoLoadBalancedConsumer:
    def __init__(self, consumer_name):
        self.consumer_name = consumer_name
        self.event_count = 0
        self.partition_counts = {}  # Track events per partition

    async def on_event(self, partition_context, event):
        """Event handler - called for each event"""
        try:
            if event is None:
                return
            partition_id = partition_context.partition_id
            if partition_id not in self.partition_counts:
                self.partition_counts[partition_id] = 0
            self.partition_counts[partition_id] += 1
            self.event_count += 1
            event_body = event.body_as_str()
            event_data = json.loads(event_body)
            print(f"\n{'='*60}")
            print(f" {self.consumer_name} - Event #{self.event_count}")
            print(f"{'='*60}")
            print(f" Partition: {partition_id} (Auto-Assigned)")
            print(f" Event ID: {event_data.get('event_id')}")
            print(f" Message ID: {event_data.get('message_id')}")
            print(f" Sequence: {event.sequence_number}")
            print(f" Offset: {event.offset}")
            await asyncio.sleep(0.1)
            await partition_context.update_checkpoint(event)
            print("✅ Checkpoint saved")
            print(f" Partition Distribution: {self.partition_counts}")
            print(f"{'='*60}")
        except Exception as e:
            print(f"❌ Error: {e}")

    async def on_partition_initialize(self, partition_context):
        print(f"\n {self.consumer_name} - CLAIMED PARTITION {partition_context.partition_id}")

    async def on_partition_close(self, partition_context, reason):
        print(f"\n {self.consumer_name} - RELEASED PARTITION {partition_context.partition_id}")
        print(f" Reason: {reason}")

    async def on_error(self, partition_context, error):
        if partition_context:
            print(f"❌ Error on partition {partition_context.partition_id}: {error}")
        else:
            print(f"❌ Error: {error}")

    async def start(self):
        print(f"\n{'#'*60}")
        print(f" {self.consumer_name} - AUTO LOAD-BALANCED CONSUMER")
        print(f"{'#'*60}")
        print(f" Event Hub: {EVENTHUB_NAME}")
        print(f" Checkpoint Store: Azure Blob")
        print(f" Load Balancing: FAST MODE (5s intervals)")
        print(f" Ownership Expiry: 10s\n")
        print(f" Waiting for Event Hub to assign partitions...\n")
        try:
            checkpoint_store = BlobCheckpointStore.from_connection_string(
                STORAGE_CONNECTION_STR,
                CHECKPOINT_CONTAINER
            )
            client = EventHubConsumerClient.from_connection_string(
                conn_str=EVENTHUB_CONNECTION_STR,
                consumer_group=CONSUMER_GROUP,
                eventhub_name=EVENTHUB_NAME,
                checkpoint_store=checkpoint_store,
                load_balancing_interval=5,
                partition_ownership_expiration_interval=10
            )
            async with client:
                await client.receive(
                    on_event=self.on_event,
                    on_partition_initialize=self.on_partition_initialize,
                    on_partition_close=self.on_partition_close,
                    on_error=self.on_error
                )
        except KeyboardInterrupt:
            print(f"\n⚠️ {self.consumer_name} stopped by user")
            print(f" Final Stats: Events={self.event_count}, Distribution={self.partition_counts}")
        except Exception as e:
            print(f"\n❌ Error: {e}")

async def main():
    if len(sys.argv) < 2:
        print("\n⚠️ Usage: python eventprocessor_consumer.py <consumer_name>")
        sys.exit(1)
    consumer_name = sys.argv[1]
    consumer = AutoLoadBalancedConsumer(consumer_name)
    await consumer.start()

if __name__ == "__main__":
    asyncio.run(main())
Key Implementations
The above script implements the following:
- Checkpointing
- Automatic Load Balancing in case of any failure (handled by EventHubConsumerClient)
- Full EventHub SDK Integration
Hence, this is a complete script that implements all the main functionalities of EventHub.
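To see the load balancing in action, open two terminals and run (eventprocessor_consumer.py is the filename used in the script's own usage message):
python eventprocessor_consumer.py Consumer-A
python eventprocessor_consumer.py Consumer-B
EventHub splits the partitions between the two consumers. Stop one of them and, within roughly the 10-second ownership-expiration interval configured above, the surviving consumer claims the released partitions automatically.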
Batch Processing
Till now, we were consuming only single events.
However, there might be a use case where we want to consume events in batches instead of one at a time — the same SDK supports this, and a hedged sketch follows below.
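The sketch keeps the same blob checkpoint store but swaps client.receive(...) for client.receive_batch(...), which hands the handler a list of events per partition. The connection values are placeholders for the ones used earlier.

# batch_consumer.py (illustrative sketch only)
import asyncio
import json
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

EVENTHUB_CONNECTION_STR = "<event-hub-connection-string>"   # placeholder
EVENTHUB_NAME = "trial"
STORAGE_CONNECTION_STR = "<storage-connection-string>"      # placeholder
CHECKPOINT_CONTAINER = "test"
CONSUMER_GROUP = "$Default"

async def on_event_batch(partition_context, events):
    """Called with a list of up to max_batch_size events per partition."""
    if not events:
        return
    for event in events:
        print(partition_context.partition_id, json.loads(event.body_as_str()))
    # One checkpoint per batch instead of one per event
    await partition_context.update_checkpoint(events[-1])

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        STORAGE_CONNECTION_STR, CHECKPOINT_CONTAINER
    )
    client = EventHubConsumerClient.from_connection_string(
        conn_str=EVENTHUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store,
    )
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            max_batch_size=50,     # up to 50 events per callback
            max_wait_time=5,       # flush a partial batch after 5s
        )

asyncio.run(main())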
Realistic Setup
Now let’s discuss in short what a production-level setup looks like.
- Producers: Multiple producers send different types of data.
- Namespaces / EventHubs: Multiple namespaces to categorize data – e.g., Payments, Messaging, Delivery, etc. Each namespace can have multiple EventHubs.
- Partitions: In production, the number of partitions is a crucial decision because it directly affects throughput and cost.
- Consumer Groups: Multiple consumer groups can consume from the same event stream independently.
- Database Integration: We can integrate a database or a cache such as Redis to make sure each message/event is consumed only once.
One Drawback of EventHub
EventHub guarantees at-least-once delivery: every event will be processed, but an event may be processed more than once.
Suppose an event is processed, but the consumer fails just before the checkpoint is written. When a new consumer starts, it resumes from the last recorded checkpoint.
Since no checkpoint exists for that event, the new consumer processes it again, even though it was already handled — only its checkpoint was missing.
As discussed above, to counter such scenarios we can use a database or a cache like Redis as a second checkpoint for the case where EventHub checkpointing fails.
It is like a two-step verification before processing an event. It adds an extra engineering component and makes the system more complex, but it ensures only-once processing — helpful when we cannot afford to process a single event multiple times.
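As a hedged illustration of that second checkpoint, the sketch below uses the redis-py package: each event_id is written with SET NX before processing, so a replayed event is detected and skipped. The key prefix, expiry, and Redis host are arbitrary assumptions for the example.

# dedupe_with_redis.py (illustrative sketch only)
import redis

r = redis.Redis(host="localhost", port=6379, db=0)

def process_once(event_id, handler):
    # SET ... NX returns a truthy value only for the first writer of this id;
    # the 24h expiry keeps the key set from growing forever.
    first_time = r.set(f"processed:{event_id}", 1, nx=True, ex=86400)
    if not first_time:
        print(f"Duplicate event {event_id} skipped")
        return
    handler()  # the actual business logic for this event

Inside on_event, you would call process_once(event_data["event_id"], handler) before updating the EventHub checkpoint.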
Summary:
This end-to-end setup demonstrates how data flows from Producer → EventHub → Consumer, and how checkpointing, load balancing, and scaling work together to ensure reliability and high throughput in real-world EventHub systems.