Welcome back to Mastering Messaging with AWS SDK for Python. Today, we're exploring the integration of Amazon Simple Notification Service (SNS) and Simple Queue Service (SQS) to efficiently distribute messages. Employing the fan-out pattern allows a single publisher to communicate with multiple subscribers with ease.
Imagine a scenario involving three services: Service A
, Service B
, and Service C
. Service A
needs to send information to both Service B
and Service C
. Rather than Service A
managing individual SQS queues for each, we can simplify this process using an SNS topic to fan out messages.
Let's quickly illustrate how Service A
publishes a message to an SNS topic, including some advanced publishing options:
Python1import boto3 2 3# Initialize the SNS resource 4sns_resource = boto3.resource('sns') 5 6# Create an SNS topic 7topic = sns_resource.create_topic(Name='ServiceAUpdates') 8topic_arn = topic.arn 9 10# Publish a basic message 11publish_response = topic.publish( 12 Message='Updates for Services B and C', 13) 14 15# Publish a message with additional parameters 16publish_response_advanced = topic.publish( 17 Message='News update', 18 Subject='Weekly News', 19 MessageAttributes={ 20 'Priority': { 21 'DataType': 'String', 22 'StringValue': 'High' 23 }, 24 'Department': { 25 'DataType': 'Number', 26 'StringValue': 'IT' 27 } 28 }, 29)
In these examples, we see a straightforward message broadcast to ServiceAUpdates
, followed by an example with a Subject
and MessageAttributes
, adding context to messages sent.
To gear up Service B
and Service C
for message receipt, we'll direct them to listen to our SNS topic. This requires subscribing their SQS queues to the SNS topic:
Python1import boto3 2 3# Initialize the SNS and SQS resources 4sns_resource = boto3.resource('sns') 5sqs_resource = boto3.resource('sqs') 6 7# Assume topic_arn is retrieved or known from previous steps 8topic = sns_resource.Topic(topic_arn) 9 10# Create SQS queues for Service B and C 11serviceBQueue = sqs_resource.create_queue(QueueName='ServiceBQueue') 12queueB_arn = serviceBQueue.attributes['QueueArn'] 13 14serviceCQueue = sqs_resource.create_queue(QueueName='ServiceCQueue') 15queueC_arn = serviceCQueue.attributes['QueueArn'] 16 17# Subscribe the SQS queues to the SNS topic using the resource interface 18subscriptionB = topic.subscribe( 19 Protocol='sqs', 20 Endpoint=queueB_arn, 21 Attributes={ 22 'RawMessageDelivery': 'true' 23 } 24) 25 26subscriptionC = topic.subscribe( 27 Protocol='sqs', 28 Endpoint=queueC_arn, 29 Attributes={ 30 'RawMessageDelivery': 'true' 31 } 32)
This strategic approach ensures Service B
and Service C
are automatically updated with messages from Service A
, efficiently demonstrating the fan-out pattern’s advantage.
Retrieving and processing messages from the queues remains straightforward, a practice reinforced from our earlier lessons:
Python1# Fetching messages from Service B's queue as an example 2messages = serviceBQueue.receive_messages(MaxNumberOfMessages=10) 3 4for message in messages: 5 print(message.body) 6 message.delete()
This lesson concentrated on using SNS and SQS together for the fan-out messaging pattern, facilitating streamlined communication across multiple services. We've revisited key concepts like topic creation and have detailed the process of publishing messages to a topic and subscribing SQS queues to an SNS topic. Experiment with different SNS topics and SQS queues, exploring the myriad ways in which this powerful integration can be customized to fit various messaging needs.