diff --git a/README.md b/README.md index 78f3bd9..98a3c70 100644 --- a/README.md +++ b/README.md @@ -17,14 +17,23 @@ ServiceStack.Azure includes implementation of the following ServiceStack provide The code to configure and start an ServiceBus MQ Server is similar to other MQ Servers: ```csharp -container.Register(c => new ServiceBusMqServer(ConnectionString)); + +container.Register(c => new ServiceBusMqServer(ConnectionString)); //prefetch defaults to 0 (Service Bus default) if not provided +// var prefetchCount = 10; +// container.Register(c => new ServiceBusMqServer(ConnectionString)); var mqServer = container.Resolve(); -mqServer.RegisterHandler(ExecuteMessage); + +mqServer.RegisterHandler(ExecuteMessage, 4); // 4 is the max concurrent calls (threads) + mqServer.Start(); ``` -Where ConnectionString is connection string to Service Bus, how to obtain it from Azure Portal you can find in [Get Started with Service Bus queues](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues) article +Where ConnectionString is connection string to Service Bus, how to obtain it from Azure Portal you can find in [Get Started with Service Bus queues](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues) article. + +The prefetch account defaults to 0 and can be used to allow the clients to load additional messages from the service when it receives a read operation. You can find out more from [Best Practices for performance improvements using Service Bus Messaging](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements#prefetching). + +The number of thread parameter to the RegisterHandler gets or sets the maximum number of concurrent calls to the callback the message pump should initiate. When an MQ Server is registered, ServiceStack automatically publishes Requests accepted on the "One Way" pre-defined route to the registered MQ broker. The message is later picked up and executed by a Message Handler on a background Thread. diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs index 2e0f1bb..6bac441 100644 --- a/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs +++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqMessageFactory.cs @@ -1,5 +1,6 @@ using ServiceStack.Messaging; using System; +using System.CodeDom; using System.Threading.Tasks; using System.Collections.Generic; using System.Collections.Concurrent; @@ -16,6 +17,7 @@ namespace ServiceStack.Azure.Messaging public class ServiceBusMqMessageFactory : IMessageFactory { protected internal readonly string address; + protected internal readonly int prefetchCount; #if !NETSTANDARD2_0 protected internal readonly NamespaceManager namespaceManager; #endif @@ -26,9 +28,10 @@ public class ServiceBusMqMessageFactory : IMessageFactory // A list of all Service Bus QueueClients - one per type & queue (priorityq, inq, outq, and dlq) private static readonly ConcurrentDictionary sbClients = new ConcurrentDictionary(); - public ServiceBusMqMessageFactory(string address) + public ServiceBusMqMessageFactory(string address, int prefetchCount=0) { this.address = address; + this.prefetchCount = prefetchCount; #if !NETSTANDARD2_0 this.namespaceManager = NamespaceManager.CreateFromConnectionString(address); #endif @@ -49,7 +52,7 @@ public void Dispose() } - protected internal void StartQueues(Dictionary handlerMap) + protected internal void StartQueues(Dictionary handlerMap, Dictionary handlerThreadCountMap) { // Create queues for each registered type this.handlerMap = handlerMap; @@ -59,6 +62,7 @@ protected internal void StartQueues(Dictionary han var mqSuffixes = new [] { ".inq", ".outq", ".priorityq", ".dlq" }; foreach (var type in this.handlerMap.Keys) { + foreach (var mqSuffix in mqSuffixes) { var queueName = QueueNames.ResolveQueueNameFn(type.Name, mqSuffix); @@ -74,12 +78,12 @@ protected internal void StartQueues(Dictionary han } var mqNames = new QueueNames(type); - AddQueueHandler(mqNames.In); - AddQueueHandler(mqNames.Priority); + AddQueueHandler(mqNames.In, handlerThreadCountMap[type]); + AddQueueHandler(mqNames.Priority, handlerThreadCountMap[type]); } } - private void AddQueueHandler(string queueName) + private void AddQueueHandler(string queueName, int threadCount=1) { queueName = queueName.SafeQueueName(); @@ -101,13 +105,16 @@ private void AddQueueHandler(string queueName) AutoComplete = false, //AutoRenewTimeout = new TimeSpan() - MaxConcurrentCalls = 1 + MaxConcurrentCalls = threadCount + }; var sbClient = QueueClient.CreateFromConnectionString(address, queueName, ReceiveMode.PeekLock); + var sbWorker = new ServiceBusMqWorker(this, CreateMessageQueueClient(), queueName, sbClient); sbClient.OnMessage(sbWorker.HandleMessage, options); #endif + sbClient.PrefetchCount = prefetchCount; sbClients.GetOrAdd(queueName, sbClient); } diff --git a/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs b/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs index e7e759d..99afec3 100644 --- a/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs +++ b/src/ServiceStack.Azure/Messaging/ServiceBusMqServer.cs @@ -20,9 +20,9 @@ public int RetryCount } } - public ServiceBusMqServer(string connectionString) + public ServiceBusMqServer(string connectionString, int prefetchCount=0 ) { - MessageFactory = new ServiceBusMqMessageFactory(connectionString); + MessageFactory = new ServiceBusMqMessageFactory(connectionString, prefetchCount); } public IMessageFactory MessageFactory { get; } @@ -45,8 +45,7 @@ public ServiceBusMqServer(string connectionString) protected internal Dictionary HandlerMap => handlerMap; - //private readonly Dictionary handlerThreadCountMap - // = new Dictionary(); + private readonly Dictionary handlerThreadCountMap= new Dictionary(); public List RegisteredTypes => handlerMap.Keys.ToList(); @@ -83,7 +82,7 @@ public string GetStatus() public void RegisterHandler(Func, object> processMessageFn) { - RegisterHandler(processMessageFn, null, noOfThreads: 1); + RegisterHandler(processMessageFn, null, 1); } public void RegisterHandler(Func, object> processMessageFn, int noOfThreads) @@ -93,7 +92,7 @@ public void RegisterHandler(Func, object> processMessageFn, int n public void RegisterHandler(Func, object> processMessageFn, Action, Exception> processExceptionEx) { - RegisterHandler(processMessageFn, processExceptionEx, noOfThreads: 1); + RegisterHandler(processMessageFn, processExceptionEx, 1); } public void RegisterHandler(Func, object> processMessageFn, Action, Exception> processExceptionEx, int noOfThreads) @@ -104,7 +103,7 @@ public void RegisterHandler(Func, object> processMessageFn, Actio } handlerMap[typeof(T)] = CreateMessageHandlerFactory(processMessageFn, processExceptionEx); - //handlerThreadCountMap[typeof(T)] = noOfThreads; + handlerThreadCountMap[typeof(T)] = noOfThreads; LicenseUtils.AssertValidUsage(LicenseFeature.ServiceStack, QuotaType.Operations, handlerMap.Count); } @@ -123,7 +122,7 @@ protected IMessageHandlerFactory CreateMessageHandlerFactory(Func public void Start() { // Create the queues (if they don't exist) and start the listeners - ((ServiceBusMqMessageFactory)MessageFactory).StartQueues(this.handlerMap); + ((ServiceBusMqMessageFactory)MessageFactory).StartQueues(this.handlerMap, this.handlerThreadCountMap); } public void Stop()