diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index beb4cc088839d8..fb1b363cc75d60 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -826,7 +826,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation } // Called on the epoll thread whenever we receive an epoll notification. - public void HandleEvent(SocketAsyncContext context) + public void HandleEvent(SocketAsyncContext context, List toEnqueue) { AsyncOperation op; using (Lock()) @@ -866,7 +866,15 @@ public void HandleEvent(SocketAsyncContext context) } // Dispatch the op so we can try to process it. - op.Dispatch(); + var e = op.Event; + if (e is null) + { + toEnqueue.Add(op); + } + else + { + e.Set(); + } } internal void ProcessAsyncOperation(TOperation op) @@ -1946,7 +1954,7 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co return SocketError.IOPending; } - public unsafe void HandleEvents(Interop.Sys.SocketEvents events) + public unsafe void HandleEvents(Interop.Sys.SocketEvents events, List toEnqueue) { if ((events & Interop.Sys.SocketEvents.Error) != 0) { @@ -1957,12 +1965,12 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) if ((events & Interop.Sys.SocketEvents.Read) != 0) { - _receiveQueue.HandleEvent(this); + _receiveQueue.HandleEvent(this, toEnqueue); } if ((events & Interop.Sys.SocketEvents.Write) != 0) { - _sendQueue.HandleEvent(this); + _sendQueue.HandleEvent(this, toEnqueue); } } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 9d5f48b7045e37..94bab0712273d5 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.Runtime.InteropServices; using System.Threading; @@ -105,7 +106,7 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error) // private static readonly IntPtr MaxHandles = IntPtr.Size == 4 ? (IntPtr)int.MaxValue : (IntPtr)long.MaxValue; #endif - private static readonly IntPtr MinHandlesForAdditionalEngine = s_engineCount == 1 ? MaxHandles : (IntPtr)32; + private static readonly IntPtr MinHandlesForAdditionalEngine = s_engineCount == 1 ? MaxHandles : (IntPtr)int.Parse(Environment.GetEnvironmentVariable("MinHandles")!); // // Sentinel handle value to identify events from the "shutdown pipe," used to signal an event loop to stop @@ -308,6 +309,7 @@ private void EventLoop() try { bool shutdown = false; + List toEnqueue = new List(EventBufferCount); while (!shutdown) { int numEvents = EventBufferCount; @@ -333,11 +335,18 @@ private void EventLoop() _handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context); if (context != null) { - context.HandleEvents(_buffer[i].Events); + context.HandleEvents(_buffer[i].Events, toEnqueue); context = null; } } } + + if (toEnqueue.Count > 0) + { + ThreadPool.UnsafeQueueUserWorkItem(toEnqueue, preferLocal: false); + + toEnqueue.Clear(); + } } FreeNativeResources(); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs index fe1021272a282e..9e3d984a7110e1 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs @@ -424,7 +424,7 @@ private ThreadPoolWorkQueueThreadLocals CreateThreadLocals() return ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this); } - internal void EnsureThreadRequested() + internal bool EnsureThreadRequested() { // // If we have not yet requested #procs threads, then request a new thread. @@ -439,10 +439,12 @@ internal void EnsureThreadRequested() if (prev == count) { ThreadPool.RequestWorkerThread(); - break; + return true; } count = prev; } + + return false; } internal void MarkThreadRequestSatisfied() @@ -489,6 +491,30 @@ public void Enqueue(object callback, bool forceGlobal) EnsureThreadRequested(); } + public bool Enqueue(List callbacks, bool forceGlobal) + { + ThreadPoolWorkQueueThreadLocals? tl = null; + if (!forceGlobal) + tl = ThreadPoolWorkQueueThreadLocals.threadLocals; + + if (null != tl) + { + foreach (var callback in callbacks) + { + tl.workStealingQueue.LocalPush(callback); + } + } + else + { + foreach (var callback in callbacks) + { + workItems.Enqueue(callback); + } + } + + return EnsureThreadRequested(); + } + internal bool LocalFindAndPop(object callback) { ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals; @@ -1179,6 +1205,13 @@ public static bool UnsafeQueueUserWorkItem(IThreadPoolWorkItem callBack, bool pr return true; } + public static bool UnsafeQueueUserWorkItem(List callBacks, bool preferLocal) + { + EnsureInitialized(); + + return ThreadPoolGlobals.workQueue.Enqueue(callBacks, forceGlobal: !preferLocal); + } + internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool preferLocal) { Debug.Assert((callBack is IThreadPoolWorkItem) ^ (callBack is Task)); diff --git a/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.cs b/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.cs index e9188f9ee2739f..8b6ad2bd555215 100644 --- a/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.cs +++ b/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.cs @@ -40,6 +40,7 @@ public static partial class ThreadPool [System.CLSCompliantAttribute(false)] public static unsafe bool UnsafeQueueNativeOverlapped(System.Threading.NativeOverlapped* overlapped) { throw null; } public static bool UnsafeQueueUserWorkItem(System.Threading.IThreadPoolWorkItem callBack, bool preferLocal) { throw null; } + public static bool UnsafeQueueUserWorkItem(System.Collections.Generic.List callBacks, bool preferLocal) { throw null; } public static bool UnsafeQueueUserWorkItem(System.Threading.WaitCallback callBack, object? state) { throw null; } public static bool UnsafeQueueUserWorkItem(System.Action callBack, TState state, bool preferLocal) { throw null; } public static System.Threading.RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(System.Threading.WaitHandle waitObject, System.Threading.WaitOrTimerCallback callBack, object? state, int millisecondsTimeOutInterval, bool executeOnlyOnce) { throw null; } diff --git a/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.csproj b/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.csproj index a86046bb60d448..a380d18ddd033f 100644 --- a/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.csproj +++ b/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.csproj @@ -10,5 +10,6 @@ +