Test ServiceBus Trigger locally by using the Azure Service Bus emulator With Docker

 Test ServiceBus Trigger locally by using the Azure Service Bus emulator With Docker

Service Bus emulator with Docker:

Install Docker: Docker Desktop

Installing WSL(Windows Subsystem for Linux)

Open Command Prompt and go with below commands

Installing WSL: wsl --install
checking the version: wsl.exe -l -v
Set the version: wsl.exe --set-default-version 2

Updating WSL: WSL --update

The Service Bus emulator running in Docker uses MSSQL for queue storage, so MSSQL is required for Service Bus emulator setup and testing.

SA Password: We have to provide SA password in .env file.
config.json: config.json will contain the configuration for queues and topics.
docker-compose.yaml: This file is used by Docker Compose to start the local Service Bus emulator.

Steps to set up the container: Create a folder to store the config.json, .env, and docker-compose.yaml files.

1. .env


# Environment file for user-defined variables referenced by docker-compose.yaml.
# Every entry uses the KEY=VALUE form.

# 1. CONFIG_PATH: Path to the config.json file on the host machine.
# Inside a double-quoted value, write each backslash doubled, as in the example.
# Ex: CONFIG_PATH="C:\\Config\\Config.json"
CONFIG_PATH="D:\\source\\Practice\\SettingUpDockerForServiceemulator\\config.json"

# 2. ACCEPT_EULA: Pass 'Y' to accept license terms for Azure SQL Edge and
# Azure Service Bus emulator.
ACCEPT_EULA="Y"

# 3. MSSQL_SA_PASSWORD to be filled by user as per policy :
# https://learn.microsoft.com/en-us/sql/relational-databases/
# security/strong-passwords?view=sql-server-linux-ver16
# NOTE: sample value for local testing only; choose your own password.
MSSQL_SA_PASSWORD="CodeFirst@123"

2: config.json

{
    "UserConfig": {
     "Namespaces": [
       {
         "Name": "sbemulatorns",
         "Queues": [
           {
             "Name": "queue1",
             "Properties": {
               "DeadLetteringOnMessageExpiration": false,
               "DefaultMessageTimeToLive": "PT1H",
               "DuplicateDetectionHistoryTimeWindow": "PT20S",
               "ForwardDeadLetteredMessagesTo": "",
               "ForwardTo": "",
               "LockDuration": "PT1M",
               "MaxDeliveryCount": 10,
               "RequiresDuplicateDetection": false,
               "RequiresSession": false
             }
           }
         ],
   
         "Topics": [
           {
             "Name": "topic1",
             "Properties": {
               "DefaultMessageTimeToLive": "PT1H",
               "DuplicateDetectionHistoryTimeWindow": "PT20S",
               "RequiresDuplicateDetection": false
             },
             "Subscriptions": [
               {
                 "Name": "subscription1",
                 "Properties": {
                   "DeadLetteringOnMessageExpiration": false,
                   "DefaultMessageTimeToLive": "PT1H",
                   "LockDuration": "PT1M",
                   "MaxDeliveryCount": 10,
                   "ForwardDeadLetteredMessagesTo": "",
                   "ForwardTo": "",
                   "RequiresSession": false
                 },
                 "Rules": [
                   {
                     "Name": "app-prop-filter-1",
                     "Properties": {
                       "FilterType": "Correlation",
                       "CorrelationFilter": {
                         "ContentType": "application/text",
                         "CorrelationId": "id1",
                         "Label": "subject1",
                         "MessageId": "msgid1",
                         "ReplyTo": "someQueue",
                         "ReplyToSessionId": "sessionId",
                         "SessionId": "session1",
                         "To": "xyz"
                       }
                     }
                   }
                 ]
               },
               {
                 "Name": "subscription2",
                 "Properties": {
                   "DeadLetteringOnMessageExpiration": false,
                   "DefaultMessageTimeToLive": "PT1H",
                   "LockDuration": "PT1M",
                   "MaxDeliveryCount": 10,
                   "ForwardDeadLetteredMessagesTo": "",
                   "ForwardTo": "",
                   "RequiresSession": false
                 },
                 "Rules": [
                   {
                     "Name": "user-prop-filter-1",
                     "Properties": {
                       "FilterType": "Correlation",
                       "CorrelationFilter": {
                         "Properties": {
                           "prop3": "value3"
                         }
                       }
                     }
                   }
                 ]
               },
               {
                 "Name": "subscription3",
                 "Properties": {
                   "DeadLetteringOnMessageExpiration": false,
                   "DefaultMessageTimeToLive": "PT1H",
                   "LockDuration": "PT1M",
                   "MaxDeliveryCount": 10,
                   "ForwardDeadLetteredMessagesTo": "",
                   "ForwardTo": "",
                   "RequiresSession": false
                 }
               }
             ]
           }
         ]
       }
     ],
     "Logging": {
       "Type": "File"
     }
    }
}


3: docker-compose.yaml:

# Compose project: Service Bus emulator plus the SQL Edge container it depends on.
# ${CONFIG_PATH}, ${ACCEPT_EULA} and ${MSSQL_SA_PASSWORD} are supplied by the .env file.
name: microsoft-azure-servicebus-emulator

networks:
  # Network shared by both containers.
  sb-emulator:

services:
  # SQL Edge instance; the emulator reaches it through the "sqledge" alias.
  sqledge:
    image: "mcr.microsoft.com/azure-sql-edge:latest"
    container_name: "sqledge"
    environment:
      MSSQL_SA_PASSWORD: ${MSSQL_SA_PASSWORD}
      ACCEPT_EULA: ${ACCEPT_EULA}
    networks:
      sb-emulator:
        aliases:
          - "sqledge"

  # The Service Bus emulator itself.
  emulator:
    image: mcr.microsoft.com/azure-messaging/servicebus-emulator:latest
    container_name: "servicebus-emulator"
    depends_on:
      - sqledge
    environment:
      ACCEPT_EULA: ${ACCEPT_EULA}
      MSSQL_SA_PASSWORD: ${MSSQL_SA_PASSWORD}
      # Host name of the SQL container (its network alias above).
      SQL_SERVER: sqledge
    ports:
      # Published on the host; the connection string points at localhost:5672.
      - "5672:5672"
    volumes:
      # Mounts the host config.json as the emulator's Config.json.
      - "${CONFIG_PATH}:/ServiceBus_Emulator/ConfigFiles/Config.json"
    networks:
      sb-emulator:
        aliases:
          - "sb-emulator"


After setting up the files run below command:

Syntax:
docker compose -f <PathToDockerComposeFile> up -d

Example:
docker compose -f D:\source\Practice\SettingUpDockerForServiceemulator\docker-compose.yaml up -d





Connection String:

 "SrvBusConnectionString": "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"

Testing Queue using Azure Functions:




Create an Azure Functions project:

file 1: local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "QueueName": "queue1",
    "SrvBusConnectionString": "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
  }
}

ServiceBus.cs
  1. Function 1 (HTTP Trigger): (Send Queue) > Sends messages to the queue.
  2. Function 2 (HTTP Trigger): (Receive Queue) > Receives messages from the queue.
  3. Function 3 (Service Bus Trigger): (runs when a message is sent to the queue) > Receives messages from the queue.

using Azure.Messaging.ServiceBus;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace AzureFunctions
{
    /// <summary>
    /// Azure Functions that exercise a Service Bus queue:
    /// <list type="bullet">
    /// <item><c>SendQueue</c> (HTTP) publishes a small batch of messages.</item>
    /// <item><c>ReceiveQueue</c> (HTTP) runs a processor for a fixed window and completes what it receives.</item>
    /// <item><c>QueueTrigger</c> (Service Bus trigger) logs and completes each incoming message.</item>
    /// </list>
    /// The connection string and queue name are read from the <c>SrvBusConnectionString</c>
    /// and <c>QueueName</c> settings.
    /// </summary>
    public class ServiceBus
    {
        // How long ReceiveQueue keeps its processor running before stopping and replying.
        private static readonly TimeSpan ReceiveWindow = TimeSpan.FromSeconds(30);

        private readonly ILogger<ServiceBus> _logger;
        private readonly Handlers _handlers = new Handlers();
        private readonly string _queueName;
        private readonly string _serviceBusConnectionString;

        /// <summary>
        /// Captures the logger and reads the Service Bus settings from configuration.
        /// No Service Bus objects are created here: each HTTP function creates and disposes
        /// its own client, so QueueTrigger invocations do not construct clients they never use.
        /// </summary>
        /// <param name="logger">Logger used by all three functions.</param>
        /// <param name="_config">Configuration that is bound to <see cref="Values"/>.</param>
        public ServiceBus(ILogger<ServiceBus> logger, IConfiguration _config)
        {
            _logger = logger;

            // Bind the settings once; missing values fall back to empty strings
            // and are reported by the HTTP functions instead of failing construction.
            Values? settings = _config?.Get<Values>();
            _serviceBusConnectionString = settings?.SrvBusConnectionString ?? "";
            _queueName = settings?.QueueName ?? "";
        }

        /// <summary>
        /// Sends a batch of three messages ("Message 1".."Message 3") to the configured queue.
        /// </summary>
        /// <param name="req">Incoming HTTP request; its abort token cancels the send.</param>
        /// <returns>
        /// 200 with a summary text on success, or 400 when the connection string
        /// or queue name setting is missing.
        /// </returns>
        /// <exception cref="InvalidOperationException">A message does not fit in the batch.</exception>
        [Function("SendQueue")]
        public async Task<IActionResult> SendQueue([HttpTrigger(
                AuthorizationLevel.Function, "get", "post")] HttpRequest req)
        {
            const int numOfMessages = 3;

            if (!IsConfigured())
            {
                return new BadRequestResult();
            }

            CancellationToken cancellationToken = req.HttpContext.RequestAborted;

            // The default transport is used (no ServiceBusClientOptions), as in the original constructor.
            // NOTE(review): confirm the emulator supports any other transport before adding client options.
            // "await using" disposes in reverse order: batch, sender, then client.
            await using var client = new ServiceBusClient(_serviceBusConnectionString);
            await using ServiceBusSender sender = client.CreateSender(_queueName);
            using ServiceBusMessageBatch messageBatch =
                await sender.CreateMessageBatchAsync(cancellationToken);

            for (int i = 1; i <= numOfMessages; i++)
            {
                // TryAddMessage returns false when the message is too large for the batch.
                if (!messageBatch.TryAddMessage(new ServiceBusMessage($"Message {i}")))
                {
                    throw new InvalidOperationException(
                        $"The message {i} is too large to fit in the batch.");
                }
            }

            await sender.SendMessagesAsync(messageBatch, cancellationToken);
            _logger.LogInformation(
                "A batch of {Count} messages has been published to the queue.", numOfMessages);

            return new OkObjectResult(numOfMessages +
                    " Messages are sent to Queue: " + _queueName);
        }

        /// <summary>
        /// Runs a <see cref="ServiceBusProcessor"/> on the configured queue for
        /// <see cref="ReceiveWindow"/>, handing messages and errors to <see cref="Handlers"/>.
        /// </summary>
        /// <param name="req">Incoming HTTP request; aborting it ends the receive window early.</param>
        /// <returns>
        /// 200 once the processor has been stopped, or 400 when the connection string
        /// or queue name setting is missing.
        /// </returns>
        [Function("ReceiveQueue")]
        public async Task<IActionResult> ReceiveQueue([HttpTrigger
                (AuthorizationLevel.Function, "get", "post")] HttpRequest req)
        {
            if (!IsConfigured())
            {
                return new BadRequestResult();
            }

            _logger.LogInformation("C# ReceiveQueue function processed a request.");

            CancellationToken cancellationToken = req.HttpContext.RequestAborted;

            await using var client = new ServiceBusClient(_serviceBusConnectionString);
            await using ServiceBusProcessor processor =
                client.CreateProcessor(_queueName, new ServiceBusProcessorOptions());

            // add handlers to process messages and any errors
            processor.ProcessMessageAsync += _handlers.MessageHandler;
            processor.ProcessErrorAsync += _handlers.ErrorHandler;

            await processor.StartProcessingAsync(cancellationToken);
            try
            {
                // An HTTP function cannot rely on an interactive console, so the former
                // Console.ReadKey() wait is replaced by a bounded receive window.
                _logger.LogInformation(
                    "Receiving messages for {Seconds} seconds.", ReceiveWindow.TotalSeconds);
                await Task.Delay(ReceiveWindow, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // The caller went away; fall through and stop the processor.
                _logger.LogInformation("Request aborted; stopping the receiver early.");
            }
            finally
            {
                _logger.LogInformation("Stopping the receiver...");
                await processor.StopProcessingAsync();
                _logger.LogInformation("Stopped receiving messages");
            }

            return new OkObjectResult("Reading Message Queues is Completed!");
        }

        /// <summary>
        /// Service Bus trigger bound to the queue named by the <c>QueueName</c> setting:
        /// logs the id, body and content type of the message, then completes it.
        /// </summary>
        /// <param name="message">The received message.</param>
        /// <param name="messageActions">Used to settle (complete) the message.</param>
        [Function("QueueTrigger")]
        public async Task QueueTrigger(
            [ServiceBusTrigger("%QueueName%",
               Connection = "SrvBusConnectionString")]
            ServiceBusReceivedMessage message,
            ServiceBusMessageActions messageActions)
        {
            _logger.LogInformation("Message ID: {id}", message.MessageId);
            _logger.LogInformation("Message Body: {body}", message.Body);
            _logger.LogInformation("Message Content-Type: {contentType}",
                message.ContentType);

            // Complete the message
            await messageActions.CompleteMessageAsync(message);
        }

        // True when both settings needed to reach the queue are present; logs an error otherwise.
        private bool IsConfigured()
        {
            if (!string.IsNullOrWhiteSpace(_serviceBusConnectionString) &&
                !string.IsNullOrWhiteSpace(_queueName))
            {
                return true;
            }

            _logger.LogError(
                "The SrvBusConnectionString and QueueName settings must both be configured.");
            return false;
        }
    }
}

Handlers.cs

using Azure.Messaging.ServiceBus;

namespace AzureFunctions
{
    /// <summary>
    /// Callbacks that a Service Bus processor invokes for received messages and for errors.
    /// </summary>
    internal class Handlers
    {
        /// <summary>
        /// Prints the body of the received message to the console and then completes it
        /// (completing removes the message from the queue).
        /// </summary>
        /// <param name="args">Event data carrying the received message.</param>
        internal async Task MessageHandler(ProcessMessageEventArgs args)
        {
            ServiceBusReceivedMessage received = args.Message;
            string text = received.Body.ToString();

            Console.WriteLine("Received: " + text);

            await args.CompleteMessageAsync(received);
        }

        /// <summary>
        /// Prints the exception raised while receiving messages to the console.
        /// </summary>
        /// <param name="args">Event data carrying the exception.</param>
        /// <returns>An already-completed task.</returns>
        internal Task ErrorHandler(ProcessErrorEventArgs args)
        {
            string details = args.Exception.ToString();
            Console.WriteLine(details);

            return Task.CompletedTask;
        }
    }
}

Values.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace AzureFunctions
{
    /// <summary>
    /// Settings object whose property names match the keys listed under "Values"
    /// in local.settings.json; every property is optional and defaults to null.
    /// </summary>
    internal class Values
    {
        /// <summary>Name of the Service Bus queue (the "QueueName" setting).</summary>
        public string? QueueName { get; set; }

        /// <summary>Service Bus connection string (the "SrvBusConnectionString" setting).</summary>
        public string? SrvBusConnectionString { get; set; }

        /// <summary>The "FUNCTIONS_WORKER_RUNTIME" setting.</summary>
        public string? FUNCTIONS_WORKER_RUNTIME { get; set; }

        /// <summary>The "AzureWebJobsStorage" setting.</summary>
        public string? AzureWebJobsStorage { get; set; }
    }
}

Testing with Postman:

As soon as we send messages to the queue, the Service Bus trigger fires.

An Azure Service Bus trigger is used to respond to messages from a Service Bus queue or topic. It gets triggered when a new message arrives in the specified queue or topic.




   



Raviteja Mulukuntla

Hello Viewers! My name is RaviTeja Mulukuntla and I am a senior software engineer working in a reputed IT firm. I like sharing my professional experience and knowledge skills with all users across all the Digital Platforms.

Previous Post Next Post

Contact Form