Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,23 @@ ServiceStack.Azure includes implementation of the following ServiceStack provide
The code to configure and start an ServiceBus MQ Server is similar to other MQ Servers:

```csharp
container.Register<IMessageService>(c => new ServiceBusMqServer(ConnectionString));

container.Register<IMessageService>(c => new ServiceBusMqServer(ConnectionString)); //prefetch defaults to 0 (Service Bus default) if not provided
// var prefetchCount = 10;
// container.Register<IMessageService>(c => new ServiceBusMqServer(ConnectionString));

var mqServer = container.Resolve<IMessageService>();
mqServer.RegisterHandler<ServiceDto>(ExecuteMessage);

mqServer.RegisterHandler<ServiceDto>(ExecuteMessage, 4); // 4 is the max concurrent calls (threads)

mqServer.Start();
```

Where ConnectionString is connection string to Service Bus, how to obtain it from Azure Portal you can find in [Get Started with Service Bus queues](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues) article
Where ConnectionString is connection string to Service Bus, how to obtain it from Azure Portal you can find in [Get Started with Service Bus queues](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues) article.

The prefetch account defaults to 0 and can be used to allow the clients to load additional messages from the service when it receives a read operation. You can find out more from [Best Practices for performance improvements using Service Bus Messaging](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements#prefetching).

The number of thread parameter to the RegisterHandler gets or sets the maximum number of concurrent calls to the callback the message pump should initiate.

When an MQ Server is registered, ServiceStack automatically publishes Requests accepted on the "One Way" pre-defined route to the registered MQ broker. The message is later picked up and executed by a Message Handler on a background Thread.

Expand Down
19 changes: 13 additions & 6 deletions src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using ServiceStack.Messaging;
using System;
using System.CodeDom;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;
Expand All @@ -16,6 +17,7 @@ namespace ServiceStack.Azure.Messaging
public class ServiceBusMqMessageFactory : IMessageFactory
{
protected internal readonly string address;
protected internal readonly int prefetchCount;
#if !NETSTANDARD2_0
protected internal readonly NamespaceManager namespaceManager;
#endif
Expand All @@ -26,9 +28,10 @@ public class ServiceBusMqMessageFactory : IMessageFactory
// A list of all Service Bus QueueClients - one per type & queue (priorityq, inq, outq, and dlq)
private static readonly ConcurrentDictionary<string, QueueClient> sbClients = new ConcurrentDictionary<string, QueueClient>();

public ServiceBusMqMessageFactory(string address)
public ServiceBusMqMessageFactory(string address, int prefetchCount=0)
{
this.address = address;
this.prefetchCount = prefetchCount;
#if !NETSTANDARD2_0
this.namespaceManager = NamespaceManager.CreateFromConnectionString(address);
#endif
Expand All @@ -49,7 +52,7 @@ public void Dispose()

}

protected internal void StartQueues(Dictionary<Type, IMessageHandlerFactory> handlerMap)
protected internal void StartQueues(Dictionary<Type, IMessageHandlerFactory> handlerMap, Dictionary<Type, int> handlerThreadCountMap)
{
// Create queues for each registered type
this.handlerMap = handlerMap;
Expand All @@ -59,6 +62,7 @@ protected internal void StartQueues(Dictionary<Type, IMessageHandlerFactory> han
var mqSuffixes = new [] { ".inq", ".outq", ".priorityq", ".dlq" };
foreach (var type in this.handlerMap.Keys)
{

foreach (var mqSuffix in mqSuffixes)
{
var queueName = QueueNames.ResolveQueueNameFn(type.Name, mqSuffix);
Expand All @@ -74,12 +78,12 @@ protected internal void StartQueues(Dictionary<Type, IMessageHandlerFactory> han
}

var mqNames = new QueueNames(type);
AddQueueHandler(mqNames.In);
AddQueueHandler(mqNames.Priority);
AddQueueHandler(mqNames.In, handlerThreadCountMap[type]);
AddQueueHandler(mqNames.Priority, handlerThreadCountMap[type]);
}
}

private void AddQueueHandler(string queueName)
private void AddQueueHandler(string queueName, int threadCount=1)
{
queueName = queueName.SafeQueueName();

Expand All @@ -101,13 +105,16 @@ private void AddQueueHandler(string queueName)

AutoComplete = false,
//AutoRenewTimeout = new TimeSpan()
MaxConcurrentCalls = 1
MaxConcurrentCalls = threadCount

};

var sbClient = QueueClient.CreateFromConnectionString(address, queueName, ReceiveMode.PeekLock);

var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient);
sbClient.OnMessage(sbWorker.HandleMessage, options);
#endif
sbClient.PrefetchCount = prefetchCount;
sbClients.GetOrAdd(queueName, sbClient);
}

Expand Down
15 changes: 7 additions & 8 deletions src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public int RetryCount
}
}

public ServiceBusMqServer(string connectionString)
public ServiceBusMqServer(string connectionString, int prefetchCount=0 )
{
MessageFactory = new ServiceBusMqMessageFactory(connectionString);
MessageFactory = new ServiceBusMqMessageFactory(connectionString, prefetchCount);
}

public IMessageFactory MessageFactory { get; }
Expand All @@ -45,8 +45,7 @@ public ServiceBusMqServer(string connectionString)

protected internal Dictionary<Type, IMessageHandlerFactory> HandlerMap => handlerMap;

//private readonly Dictionary<Type, int> handlerThreadCountMap
// = new Dictionary<Type, int>();
private readonly Dictionary<Type, int> handlerThreadCountMap= new Dictionary<Type, int>();

public List<Type> RegisteredTypes => handlerMap.Keys.ToList();

Expand Down Expand Up @@ -83,7 +82,7 @@ public string GetStatus()

public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn)
{
RegisterHandler(processMessageFn, null, noOfThreads: 1);
RegisterHandler(processMessageFn, null, 1);
}

public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, int noOfThreads)
Expand All @@ -93,7 +92,7 @@ public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, int n

public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, Action<IMessageHandler, IMessage<T>, Exception> processExceptionEx)
{
RegisterHandler(processMessageFn, processExceptionEx, noOfThreads: 1);
RegisterHandler(processMessageFn, processExceptionEx, 1);
}

public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, Action<IMessageHandler, IMessage<T>, Exception> processExceptionEx, int noOfThreads)
Expand All @@ -104,7 +103,7 @@ public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, Actio
}

handlerMap[typeof(T)] = CreateMessageHandlerFactory(processMessageFn, processExceptionEx);
//handlerThreadCountMap[typeof(T)] = noOfThreads;
handlerThreadCountMap[typeof(T)] = noOfThreads;

LicenseUtils.AssertValidUsage(LicenseFeature.ServiceStack, QuotaType.Operations, handlerMap.Count);
}
Expand All @@ -123,7 +122,7 @@ protected IMessageHandlerFactory CreateMessageHandlerFactory<T>(Func<IMessage<T>
public void Start()
{
// Create the queues (if they don't exist) and start the listeners
((ServiceBusMqMessageFactory)MessageFactory).StartQueues(this.handlerMap);
((ServiceBusMqMessageFactory)MessageFactory).StartQueues(this.handlerMap, this.handlerThreadCountMap);
}

public void Stop()
Expand Down