Test ServiceBus Trigger locally by using the Azure Service Bus emulator With Docker
Service Bus emulator with Docker:
Install Docker: Docker Desktop
Installing WSL(Windows Subsystem for Linux):
Open Command Prompt and run the commands below:
Installing WSL: wsl --install
Checking the version: wsl.exe -l -v
Setting the default version: wsl.exe --set-default-version 2
Updating WSL: wsl --update
The Service Bus emulator uses MSSQL as its backing store for queues, so an MSSQL container is required for emulator setup and testing.
SA Password: We have to provide SA password in .env file.
config.json: config.json contains the configuration for the queues and topics.
docker-compose.yaml: This file is used by Docker Compose to start the local Service Bus emulator.
Steps to set up the containers: Create a folder to store the config.json, .env, and docker-compose.yaml files.
1. .env
# Environment file for user defined variables in docker-compose.yml
# 1. CONFIG_PATH: Path to Config.json file
# Ex: CONFIG_PATH="C:\\Config\\Config.json"
CONFIG_PATH="D:\\source\\Practice\\SettingUpDockerForServiceemulator\\config.json"
# 2. ACCEPT_EULA: Pass 'Y' to accept license terms for Azure SQL Edge and
# Azure Service Bus emulator.
ACCEPT_EULA="Y"
# 3. MSSQL_SA_PASSWORD to be filled by user as per policy :
# https://learn.microsoft.com/en-us/sql/relational-databases/
# security/strong-passwords?view=sql-server-linux-ver16
MSSQL_SA_PASSWORD="CodeFirst@123"
2: config.json
{
"UserConfig": {
"Namespaces": [
{
"Name": "sbemulatorns",
"Queues": [
{
"Name": "queue1",
"Properties": {
"DeadLetteringOnMessageExpiration": false,
"DefaultMessageTimeToLive": "PT1H",
"DuplicateDetectionHistoryTimeWindow": "PT20S",
"ForwardDeadLetteredMessagesTo": "",
"ForwardTo": "",
"LockDuration": "PT1M",
"MaxDeliveryCount": 10,
"RequiresDuplicateDetection": false,
"RequiresSession": false
}
}
],
"Topics": [
{
"Name": "topic1",
"Properties": {
"DefaultMessageTimeToLive": "PT1H",
"DuplicateDetectionHistoryTimeWindow": "PT20S",
"RequiresDuplicateDetection": false
},
"Subscriptions": [
{
"Name": "subscription1",
"Properties": {
"DeadLetteringOnMessageExpiration": false,
"DefaultMessageTimeToLive": "PT1H",
"LockDuration": "PT1M",
"MaxDeliveryCount": 10,
"ForwardDeadLetteredMessagesTo": "",
"ForwardTo": "",
"RequiresSession": false
},
"Rules": [
{
"Name": "app-prop-filter-1",
"Properties": {
"FilterType": "Correlation",
"CorrelationFilter": {
"ContentType": "application/text",
"CorrelationId": "id1",
"Label": "subject1",
"MessageId": "msgid1",
"ReplyTo": "someQueue",
"ReplyToSessionId": "sessionId",
"SessionId": "session1",
"To": "xyz"
}
}
}
]
},
{
"Name": "subscription2",
"Properties": {
"DeadLetteringOnMessageExpiration": false,
"DefaultMessageTimeToLive": "PT1H",
"LockDuration": "PT1M",
"MaxDeliveryCount": 10,
"ForwardDeadLetteredMessagesTo": "",
"ForwardTo": "",
"RequiresSession": false
},
"Rules": [
{
"Name": "user-prop-filter-1",
"Properties": {
"FilterType": "Correlation",
"CorrelationFilter": {
"Properties": {
"prop3": "value3"
}
}
}
}
]
},
{
"Name": "subscription3",
"Properties": {
"DeadLetteringOnMessageExpiration": false,
"DefaultMessageTimeToLive": "PT1H",
"LockDuration": "PT1M",
"MaxDeliveryCount": 10,
"ForwardDeadLetteredMessagesTo": "",
"ForwardTo": "",
"RequiresSession": false
}
}
]
}
]
}
],
"Logging": {
"Type": "File"
}
}
}
3: docker-compose.yaml:
# Compose project that runs the Azure Service Bus emulator together with the
# SQL container it is configured to use (SQL_SERVER below).
# CONFIG_PATH, ACCEPT_EULA and MSSQL_SA_PASSWORD are read from the .env file.
name: microsoft-azure-servicebus-emulator
services:
  emulator:
    container_name: "servicebus-emulator"
    image: mcr.microsoft.com/azure-messaging/servicebus-emulator:latest
    volumes:
      # Mount the local config.json over the emulator's Config.json.
      - "${CONFIG_PATH}:/ServiceBus_Emulator/ConfigFiles/Config.json"
    ports:
      # Port used in the connection string (sb://localhost:5672).
      - "5672:5672"
    environment:
      SQL_SERVER: sqledge
      MSSQL_SA_PASSWORD: ${MSSQL_SA_PASSWORD}
      ACCEPT_EULA: ${ACCEPT_EULA}
    depends_on:
      - sqledge
    networks:
      sb-emulator:
        aliases:
          - "sb-emulator"
  sqledge:
    container_name: "sqledge"
    image: "mcr.microsoft.com/azure-sql-edge:latest"
    networks:
      sb-emulator:
        aliases:
          - "sqledge"
    environment:
      ACCEPT_EULA: ${ACCEPT_EULA}
      MSSQL_SA_PASSWORD: ${MSSQL_SA_PASSWORD}
networks:
  sb-emulator:
After setting up the files, run the command below:
Syntax:
docker compose -f <PathToDockerComposeFile> up -d
Example:
docker compose -f D:\source\Practice\SettingUpDockerForServiceemulator\docker-compose.yaml up -d
Connection String:
"SrvBusConnectionString": "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
Testing Queue using Azure Functions:
Create an Azure Functions project:
file 1: local.settings.json
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"QueueName": "queue1",
"SrvBusConnectionString": "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
}
}
ServiceBus.cs
- Function 1(Http Trigger): (Send Queue) > Sending messages to queue.
- Function 2(Http Trigger): (Receiving Queue) > Receiving messages from queue.
- Function 3 (Service Bus Trigger): (Receive when a message is sent to the queue) > Runs automatically for each message that arrives on the queue.
using Azure.Messaging.ServiceBus;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace AzureFunctions
{
    /// <summary>
    /// Azure Functions that send messages to, receive messages from, and react to
    /// messages on the Service Bus queue named by the <c>QueueName</c> setting.
    /// </summary>
    public class ServiceBus
    {
        // How long ReceiveQueue lets the processor run before stopping it.
        private static readonly TimeSpan ReceiveWindow = TimeSpan.FromSeconds(10);

        private readonly ILogger<ServiceBus> _logger;
        private readonly Handlers _handlers = new Handlers();
        private readonly ServiceBusClient _client;
        private readonly ServiceBusSender _sender;
        private readonly ServiceBusProcessor _processor;
        private readonly string _queueName;

        /// <summary>
        /// Reads <c>SrvBusConnectionString</c> and <c>QueueName</c> from configuration
        /// and creates the client, processor and sender for that queue.
        /// </summary>
        /// <param name="logger">Logger used by the functions in this class.</param>
        /// <param name="_config">Configuration that is bound to <see cref="Values"/>.</param>
        /// <exception cref="InvalidOperationException">
        /// Thrown when either required setting is missing or empty.
        /// </exception>
        public ServiceBus(ILogger<ServiceBus> logger, IConfiguration _config)
        {
            _logger = logger;

            // Bind the settings once instead of once per value.
            Values? settings = _config?.Get<Values>();
            string connectionString = settings?.SrvBusConnectionString ?? "";
            _queueName = settings?.QueueName ?? "";

            // Fail with a message that names the missing setting rather than
            // letting the SDK reject an empty argument.
            if (string.IsNullOrWhiteSpace(connectionString))
            {
                throw new InvalidOperationException(
                    "The 'SrvBusConnectionString' setting is missing or empty.");
            }

            if (string.IsNullOrWhiteSpace(_queueName))
            {
                throw new InvalidOperationException(
                    "The 'QueueName' setting is missing or empty.");
            }

            // Creating client
            _client = new ServiceBusClient(connectionString);
            // Creating processor
            _processor = _client.CreateProcessor(_queueName, new ServiceBusProcessorOptions());
            _sender = _client.CreateSender(_queueName);
        }

        /// <summary>
        /// HTTP-triggered function that sends a fixed batch of three messages to the queue.
        /// </summary>
        /// <param name="req">The incoming HTTP request (not inspected).</param>
        /// <returns>An OK result stating how many messages were sent and to which queue.</returns>
        /// <exception cref="InvalidOperationException">
        /// Thrown when a message does not fit in the batch.
        /// </exception>
        [Function("SendQueue")]
        public async Task<IActionResult> SendQueue(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post")] HttpRequest req)
        {
            const int numOfMessages = 3;

            // The try covers batch creation as well, so the sender and client are
            // released even when building the batch fails.
            try
            {
                using ServiceBusMessageBatch messageBatch = await _sender.CreateMessageBatchAsync();

                for (int i = 1; i <= numOfMessages; i++)
                {
                    // try adding a message to the batch
                    if (!messageBatch.TryAddMessage(new ServiceBusMessage($"Message {i}")))
                    {
                        // if it is too large for the batch
                        throw new InvalidOperationException(
                            $"The message {i} is too large to fit in the batch.");
                    }
                }

                // Send the batch of messages to the Service Bus queue
                await _sender.SendMessagesAsync(messageBatch);
                Console.WriteLine(
                    $"A batch of {numOfMessages} messages has been published to the queue.");
            }
            finally
            {
                // Calling DisposeAsync on client types is required to ensure that network
                // resources and other unmanaged objects are properly cleaned up.
                await _sender.DisposeAsync();
                await _client.DisposeAsync();
            }

            return new OkObjectResult(numOfMessages + " Messages are sent to Queue: " + _queueName);
        }

        /// <summary>
        /// HTTP-triggered function that runs the queue processor for a fixed window,
        /// handing each received message to <see cref="Handlers.MessageHandler"/>.
        /// </summary>
        /// <param name="req">The incoming HTTP request; its abort token ends the wait early.</param>
        /// <returns>An OK result once processing has been stopped.</returns>
        [Function("ReceiveQueue")]
        public async Task<IActionResult> ReceiveQueue(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post")] HttpRequest req)
        {
            _logger.LogInformation("C# ReceiveQueue function processed a request.");

            try
            {
                // add handler to process messages
                _processor.ProcessMessageAsync += _handlers.MessageHandler;
                // add handler to process any errors
                _processor.ProcessErrorAsync += _handlers.ErrorHandler;

                // start processing
                await _processor.StartProcessingAsync();

                // A function worker has no interactive console to read a key press
                // from, so wait for a fixed window instead.
                try
                {
                    await Task.Delay(ReceiveWindow, req.HttpContext.RequestAborted);
                }
                catch (OperationCanceledException)
                {
                    // The caller went away; fall through and stop the processor.
                }

                // stop processing
                _logger.LogInformation("Stopping the receiver...");
                await _processor.StopProcessingAsync();
                _logger.LogInformation("Stopped receiving messages");
            }
            finally
            {
                await _processor.DisposeAsync();
                await _client.DisposeAsync();
            }

            return new OkObjectResult("Reading Message Queues is Completed!");
        }

        /// <summary>
        /// Service Bus-triggered function bound to the queue named by the
        /// <c>QueueName</c> setting; logs the message and then completes it.
        /// </summary>
        /// <param name="message">The message that triggered the function.</param>
        /// <param name="messageActions">Used to complete the message.</param>
        [Function("QueueTrigger")]
        public async Task QueueTrigger(
            [ServiceBusTrigger("%QueueName%", Connection = "SrvBusConnectionString")]
            ServiceBusReceivedMessage message,
            ServiceBusMessageActions messageActions)
        {
            _logger.LogInformation("Message ID: {id}", message.MessageId);
            _logger.LogInformation("Message Body: {body}", message.Body);
            _logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);

            // Complete the message
            await messageActions.CompleteMessageAsync(message);
        }
    }
}
Handlers.cs
using Azure.Messaging.ServiceBus;
namespace AzureFunctions
{
    /// <summary>
    /// Callbacks attached to a Service Bus processor: one for received messages
    /// and one for errors raised while receiving.
    /// </summary>
    internal class Handlers
    {
        /// <summary>
        /// Writes the body of the received message to the console and then completes
        /// the message, which removes it from the queue.
        /// </summary>
        /// <param name="args">Event data carrying the received message.</param>
        internal async Task MessageHandler(ProcessMessageEventArgs args)
        {
            var received = args.Message;
            Console.WriteLine($"Received: {received.Body.ToString()}");

            await args.CompleteMessageAsync(received);
        }

        /// <summary>
        /// Writes the full text of the reported exception to the console.
        /// </summary>
        /// <param name="args">Event data carrying the exception.</param>
        /// <returns>An already-completed task.</returns>
        internal Task ErrorHandler(ProcessErrorEventArgs args)
        {
            var details = args.Exception.ToString();
            Console.WriteLine(details);

            return Task.CompletedTask;
        }
    }
}
Values.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace AzureFunctions
{
    /// <summary>
    /// Settings object the configuration is bound to. The property names match the
    /// keys under "Values" in local.settings.json.
    /// </summary>
    internal class Values
    {
        /// <summary>Value of the <c>AzureWebJobsStorage</c> setting.</summary>
        public string? AzureWebJobsStorage { get; set; }

        /// <summary>Value of the <c>FUNCTIONS_WORKER_RUNTIME</c> setting.</summary>
        public string? FUNCTIONS_WORKER_RUNTIME { get; set; }

        /// <summary>Connection string used to reach the Service Bus namespace.</summary>
        public string? SrvBusConnectionString { get; set; }

        /// <summary>Name of the queue the functions send to and receive from.</summary>
        public string? QueueName { get; set; }
    }
}
Testing with Postman:
As soon as messages are sent to the queue, the Service Bus trigger function runs.
An Azure Service Bus trigger is used to respond to messages from a Service Bus queue or topic. It gets triggered when a new message arrives in the specified queue or topic.