From 94d1cf1205f00bec99e63bf2b8f6fb104469f6dd Mon Sep 17 00:00:00 2001 From: UbitUmarov Date: Fri, 3 Sep 2021 14:36:57 +0100 Subject: [PATCH] smartthreadpool: replace its locked dictionary --- ThirdParty/SmartThreadPool/Interfaces.cs | 2 + .../SmartThreadPool.ThreadEntry.cs | 15 +++- ThirdParty/SmartThreadPool/SmartThreadPool.cs | 81 +++++++++-------- .../SmartThreadPool/SynchronizedDictionary.cs | 89 ------------------- ThirdParty/SmartThreadPool/WorkItem.cs | 2 +- .../SmartThreadPool/WorkItemsGroupBase.cs | 1 + 6 files changed, 59 insertions(+), 131 deletions(-) delete mode 100644 ThirdParty/SmartThreadPool/SynchronizedDictionary.cs diff --git a/ThirdParty/SmartThreadPool/Interfaces.cs b/ThirdParty/SmartThreadPool/Interfaces.cs index b2e19968b4..e709aa80c2 100644 --- a/ThirdParty/SmartThreadPool/Interfaces.cs +++ b/ThirdParty/SmartThreadPool/Interfaces.cs @@ -56,6 +56,8 @@ namespace Amib.Threading /// string Name { get; set; } + int localID { get; set; } + /// /// Get/Set the maximum number of workitem that execute cocurrency on the thread pool /// diff --git a/ThirdParty/SmartThreadPool/SmartThreadPool.ThreadEntry.cs b/ThirdParty/SmartThreadPool/SmartThreadPool.ThreadEntry.cs index c1fa5a87a3..c43825bbba 100644 --- a/ThirdParty/SmartThreadPool/SmartThreadPool.ThreadEntry.cs +++ b/ThirdParty/SmartThreadPool/SmartThreadPool.ThreadEntry.cs @@ -1,6 +1,7 @@ -using System; using Amib.Threading.Internal; +using System; +using System.Threading; namespace Amib.Threading { @@ -29,19 +30,21 @@ namespace Amib.Threading /// With this variable a thread can know whatever it belongs to a /// SmartThreadPool. /// - private readonly SmartThreadPool _associatedSmartThreadPool; + private SmartThreadPool _associatedSmartThreadPool; /// /// A reference to the current work item a thread from the thread pool /// is executing. /// public WorkItem CurrentWorkItem { get; set; } + public Thread WorkThread; - public ThreadEntry(SmartThreadPool stp) + public ThreadEntry(SmartThreadPool stp, Thread th) { _associatedSmartThreadPool = stp; _creationTime = DateTime.UtcNow; _lastAliveTime = DateTime.MinValue; + WorkThread = th; } public SmartThreadPool AssociatedSmartThreadPool @@ -53,6 +56,12 @@ namespace Amib.Threading { _lastAliveTime = DateTime.UtcNow; } + + public void Clean() + { + WorkThread = null; + _associatedSmartThreadPool = null; + } } #endregion diff --git a/ThirdParty/SmartThreadPool/SmartThreadPool.cs b/ThirdParty/SmartThreadPool/SmartThreadPool.cs index 40394a6c7d..262dd03ed2 100644 --- a/ThirdParty/SmartThreadPool/SmartThreadPool.cs +++ b/ThirdParty/SmartThreadPool/SmartThreadPool.cs @@ -99,6 +99,7 @@ using System; using System.Security; using System.Threading; using System.Collections; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; @@ -202,7 +203,8 @@ namespace Amib.Threading /// /// Dictionary of all the threads in the thread pool. /// - private readonly SynchronizedDictionary _workerThreads = new SynchronizedDictionary(); + private readonly ConcurrentDictionary _workerThreads = new ConcurrentDictionary(); + private readonly object _workerThreadsLock = new object(); /// /// Queue of work items. @@ -271,7 +273,7 @@ namespace Amib.Threading /// work item int the SmartThreadPool /// This variable is used in case of Shutdown /// - private readonly SynchronizedDictionary _workItemsGroups = new SynchronizedDictionary(); + private readonly ConcurrentDictionary _workItemsGroups = new ConcurrentDictionary(); /// /// A common object for all the work items int the STP @@ -434,7 +436,7 @@ namespace Amib.Threading private void StartOptimalNumberOfThreads() { int threadsCount; - lock (_workerThreads.SyncRoot) + lock (_workerThreadsLock) { threadsCount = _workItemsQueue.Count; if (threadsCount == _stpStartInfo.MinWorkerThreads) @@ -553,17 +555,20 @@ namespace Amib.Threading } + private int baseWorkIDs = Environment.TickCount; internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) { - _workItemsGroups[workItemsGroup] = workItemsGroup; + int localID = Interlocked.Increment(ref baseWorkIDs); + while (_workItemsGroups.ContainsKey(localID)) + localID = Interlocked.Increment(ref baseWorkIDs); + + workItemsGroup.localID = localID; + _workItemsGroups[localID] = workItemsGroup; } internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) { - if (_workItemsGroups.Contains(workItemsGroup)) - { - _workItemsGroups.Remove(workItemsGroup); - } + _workItemsGroups.TryRemove(workItemsGroup.localID, out IWorkItemsGroup dummy); } /// @@ -575,9 +580,9 @@ namespace Amib.Threading // There is no need to lock the two methods together // since only the current thread removes itself // and the _workerThreads is a synchronized dictionary - if (_workerThreads.Contains(Thread.CurrentThread)) + if (_workerThreads.TryRemove(Thread.CurrentThread.ManagedThreadId, out ThreadEntry te)) { - _workerThreads.Remove(Thread.CurrentThread); + te.Clean(); _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); } @@ -592,7 +597,7 @@ namespace Amib.Threading if (_isSuspended) return; - lock (_workerThreads.SyncRoot) + lock (_workerThreadsLock) { // Don't start threads on shut down if (_shutdown) @@ -648,7 +653,7 @@ namespace Amib.Threading --threadsCount; // Add it to the dictionary and update its creation time. - _workerThreads[workerThread] = new ThreadEntry(this); + _workerThreads[workerThread.ManagedThreadId] = new ThreadEntry(this, workerThread); _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); @@ -663,7 +668,7 @@ namespace Amib.Threading { // Keep the entry of the dictionary as thread's variable to avoid the synchronization locks // of the dictionary. - CurrentThreadEntry = _workerThreads[Thread.CurrentThread]; + CurrentThreadEntry = _workerThreads[Thread.CurrentThread.ManagedThreadId]; bool informedCompleted = false; FireOnThreadInitialization(); @@ -682,7 +687,7 @@ namespace Amib.Threading // Double lock for quit. if (_workerThreads.Count > maxworkers) { - lock (_workerThreads.SyncRoot) + lock (_workerThreadsLock) { if (_workerThreads.Count > maxworkers) { @@ -708,7 +713,7 @@ namespace Amib.Threading // Double lock for quit. if (_workerThreads.Count > minworkers) { - lock (_workerThreads.SyncRoot) + lock (_workerThreadsLock) { if (_workerThreads.Count > minworkers) { @@ -914,8 +919,8 @@ namespace Amib.Threading pcs.Dispose(); } - Thread[] threads; - lock (_workerThreads.SyncRoot) + ThreadEntry[] threadEntries; + lock (_workerThreadsLock) { // Shutdown the work items queue _workItemsQueue.Dispose(); @@ -925,8 +930,9 @@ namespace Amib.Threading _shuttingDownEvent.Set(); // Make a copy of the threads' references in the pool - threads = new Thread[_workerThreads.Count]; - _workerThreads.Keys.CopyTo(threads, 0); + threadEntries = new ThreadEntry[_workerThreads.Count]; + _workerThreads.Values.CopyTo(threadEntries, 0); + _workerThreads.Clear(); } int millisecondsLeft = millisecondsTimeout; @@ -936,8 +942,10 @@ namespace Amib.Threading bool timeout = false; // Each iteration we update the time left for the timeout. - foreach (Thread thread in threads) + foreach (ThreadEntry te in threadEntries) { + Thread thread = te.WorkThread; + // Join don't work with negative numbers if (!waitInfinitely && (millisecondsLeft < 0)) { @@ -959,19 +967,21 @@ namespace Amib.Threading //TimeSpan ts = DateTime.UtcNow - start; millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds; } + te.WorkThread = null; } if (timeout && forceAbort) { // Abort the threads in the pool - foreach (Thread thread in threads) + foreach (ThreadEntry te in threadEntries) { - + Thread thread = te.WorkThread; if ((thread != null) && thread.IsAlive ) { try { thread.Abort(); // Shutdown + te.WorkThread = null; } catch (SecurityException e) { @@ -1184,9 +1194,8 @@ namespace Amib.Threading { tih(); } - catch (Exception e) + catch { - e.GetHashCode(); Debug.Assert(false); throw; } @@ -1204,9 +1213,8 @@ namespace Amib.Threading { tth(); } - catch (Exception e) + catch { - e.GetHashCode(); Debug.Assert(false); throw; } @@ -1242,9 +1250,7 @@ namespace Amib.Threading foreach (ThreadEntry threadEntry in _workerThreads.Values) { WorkItem workItem = threadEntry.CurrentWorkItem; - if (null != workItem && - workItem.WasQueuedBy(wig) && - !workItem.IsCanceled) + if (null != workItem && !workItem.IsCanceled && workItem.WasQueuedBy(wig)) { threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true); } @@ -1478,8 +1484,7 @@ namespace Amib.Threading } _isSuspended = false; - ICollection workItemsGroups = _workItemsGroups.Values; - foreach (WorkItemsGroup workItemsGroup in workItemsGroups) + foreach (WorkItemsGroup workItemsGroup in _workItemsGroups.Values) { workItemsGroup.OnSTPIsStarting(); } @@ -1496,8 +1501,7 @@ namespace Amib.Threading _canceledSmartThreadPool.IsCanceled = true; _canceledSmartThreadPool = new CanceledWorkItemsGroup(); - ICollection workItemsGroups = _workItemsGroups.Values; - foreach (WorkItemsGroup workItemsGroup in workItemsGroups) + foreach (WorkItemsGroup workItemsGroup in _workItemsGroups.Values) { workItemsGroup.Cancel(abortExecution); } @@ -1506,12 +1510,13 @@ namespace Amib.Threading { foreach (ThreadEntry threadEntry in _workerThreads.Values) { - WorkItem workItem = threadEntry.CurrentWorkItem; - if (null != workItem && - threadEntry.AssociatedSmartThreadPool == this && - !workItem.IsCanceled) + if(threadEntry.AssociatedSmartThreadPool == this) { - threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true); + WorkItem workItem = threadEntry.CurrentWorkItem; + if (null != workItem && !workItem.IsCanceled) + { + threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true); + } } } } diff --git a/ThirdParty/SmartThreadPool/SynchronizedDictionary.cs b/ThirdParty/SmartThreadPool/SynchronizedDictionary.cs deleted file mode 100644 index 0cce19ffac..0000000000 --- a/ThirdParty/SmartThreadPool/SynchronizedDictionary.cs +++ /dev/null @@ -1,89 +0,0 @@ -using System.Collections.Generic; - -namespace Amib.Threading.Internal -{ - internal class SynchronizedDictionary - { - private readonly Dictionary _dictionary; - private readonly object _lock; - - public SynchronizedDictionary() - { - _lock = new object(); - _dictionary = new Dictionary(); - } - - public int Count - { - get { return _dictionary.Count; } - } - - public bool Contains(TKey key) - { - lock (_lock) - { - return _dictionary.ContainsKey(key); - } - } - - public void Remove(TKey key) - { - lock (_lock) - { - _dictionary.Remove(key); - } - } - - public object SyncRoot - { - get { return _lock; } - } - - public TValue this[TKey key] - { - get - { - lock (_lock) - { - return _dictionary[key]; - } - } - set - { - lock (_lock) - { - _dictionary[key] = value; - } - } - } - - public Dictionary.KeyCollection Keys - { - get - { - lock (_lock) - { - return _dictionary.Keys; - } - } - } - - public Dictionary.ValueCollection Values - { - get - { - lock (_lock) - { - return _dictionary.Values; - } - } - } - public void Clear() - { - lock (_lock) - { - _dictionary.Clear(); - } - } - } -} diff --git a/ThirdParty/SmartThreadPool/WorkItem.cs b/ThirdParty/SmartThreadPool/WorkItem.cs index 96afae23f8..1a2cc7eccb 100644 --- a/ThirdParty/SmartThreadPool/WorkItem.cs +++ b/ThirdParty/SmartThreadPool/WorkItem.cs @@ -426,10 +426,10 @@ namespace Amib.Threading.Internal // We must treat the ThreadAbortException or else it will be stored in the exception variable catch (ThreadAbortException tae) { - tae.GetHashCode(); // Check if the work item was cancelled // If we got a ThreadAbortException and the STP is not shutting down, it means the // work items was cancelled. + tae.GetHashCode(); if (!SmartThreadPool.CurrentThreadEntry.AssociatedSmartThreadPool.IsShuttingdown) { Thread.ResetAbort(); diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroupBase.cs b/ThirdParty/SmartThreadPool/WorkItemsGroupBase.cs index 473adc3645..c7db400d20 100644 --- a/ThirdParty/SmartThreadPool/WorkItemsGroupBase.cs +++ b/ThirdParty/SmartThreadPool/WorkItemsGroupBase.cs @@ -33,6 +33,7 @@ namespace Amib.Threading.Internal set { _name = value; } } + public int localID { get; set;} #endregion #region Abstract Methods