smartthreadpool: replace its locked dictionary

This commit is contained in:
UbitUmarov
2021-09-03 14:36:57 +01:00
parent 30c479db94
commit 94d1cf1205
6 changed files with 59 additions and 131 deletions

View File

@@ -56,6 +56,8 @@ namespace Amib.Threading
/// </summary> /// </summary>
string Name { get; set; } string Name { get; set; }
int localID { get; set; }
/// <summary> /// <summary>
/// Get/Set the maximum number of workitem that execute cocurrency on the thread pool /// Get/Set the maximum number of workitem that execute cocurrency on the thread pool
/// </summary> /// </summary>

View File

@@ -1,6 +1,7 @@
using System;
using Amib.Threading.Internal; using Amib.Threading.Internal;
using System;
using System.Threading;
namespace Amib.Threading namespace Amib.Threading
{ {
@@ -29,19 +30,21 @@ namespace Amib.Threading
/// With this variable a thread can know whatever it belongs to a /// With this variable a thread can know whatever it belongs to a
/// SmartThreadPool. /// SmartThreadPool.
/// </summary> /// </summary>
private readonly SmartThreadPool _associatedSmartThreadPool; private SmartThreadPool _associatedSmartThreadPool;
/// <summary> /// <summary>
/// A reference to the current work item a thread from the thread pool /// A reference to the current work item a thread from the thread pool
/// is executing. /// is executing.
/// </summary> /// </summary>
public WorkItem CurrentWorkItem { get; set; } public WorkItem CurrentWorkItem { get; set; }
public Thread WorkThread;
public ThreadEntry(SmartThreadPool stp) public ThreadEntry(SmartThreadPool stp, Thread th)
{ {
_associatedSmartThreadPool = stp; _associatedSmartThreadPool = stp;
_creationTime = DateTime.UtcNow; _creationTime = DateTime.UtcNow;
_lastAliveTime = DateTime.MinValue; _lastAliveTime = DateTime.MinValue;
WorkThread = th;
} }
public SmartThreadPool AssociatedSmartThreadPool public SmartThreadPool AssociatedSmartThreadPool
@@ -53,6 +56,12 @@ namespace Amib.Threading
{ {
_lastAliveTime = DateTime.UtcNow; _lastAliveTime = DateTime.UtcNow;
} }
public void Clean()
{
WorkThread = null;
_associatedSmartThreadPool = null;
}
} }
#endregion #endregion

View File

@@ -99,6 +99,7 @@ using System;
using System.Security; using System.Security;
using System.Threading; using System.Threading;
using System.Collections; using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
@@ -202,7 +203,8 @@ namespace Amib.Threading
/// <summary> /// <summary>
/// Dictionary of all the threads in the thread pool. /// Dictionary of all the threads in the thread pool.
/// </summary> /// </summary>
private readonly SynchronizedDictionary<Thread, ThreadEntry> _workerThreads = new SynchronizedDictionary<Thread, ThreadEntry>(); private readonly ConcurrentDictionary<int, ThreadEntry> _workerThreads = new ConcurrentDictionary<int, ThreadEntry>();
private readonly object _workerThreadsLock = new object();
/// <summary> /// <summary>
/// Queue of work items. /// Queue of work items.
@@ -271,7 +273,7 @@ namespace Amib.Threading
/// work item int the SmartThreadPool /// work item int the SmartThreadPool
/// This variable is used in case of Shutdown /// This variable is used in case of Shutdown
/// </summary> /// </summary>
private readonly SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup> _workItemsGroups = new SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup>(); private readonly ConcurrentDictionary<int, IWorkItemsGroup> _workItemsGroups = new ConcurrentDictionary<int, IWorkItemsGroup>();
/// <summary> /// <summary>
/// A common object for all the work items int the STP /// A common object for all the work items int the STP
@@ -434,7 +436,7 @@ namespace Amib.Threading
private void StartOptimalNumberOfThreads() private void StartOptimalNumberOfThreads()
{ {
int threadsCount; int threadsCount;
lock (_workerThreads.SyncRoot) lock (_workerThreadsLock)
{ {
threadsCount = _workItemsQueue.Count; threadsCount = _workItemsQueue.Count;
if (threadsCount == _stpStartInfo.MinWorkerThreads) if (threadsCount == _stpStartInfo.MinWorkerThreads)
@@ -553,17 +555,20 @@ namespace Amib.Threading
} }
private int baseWorkIDs = Environment.TickCount;
internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
{ {
_workItemsGroups[workItemsGroup] = workItemsGroup; int localID = Interlocked.Increment(ref baseWorkIDs);
while (_workItemsGroups.ContainsKey(localID))
localID = Interlocked.Increment(ref baseWorkIDs);
workItemsGroup.localID = localID;
_workItemsGroups[localID] = workItemsGroup;
} }
internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
{ {
if (_workItemsGroups.Contains(workItemsGroup)) _workItemsGroups.TryRemove(workItemsGroup.localID, out IWorkItemsGroup dummy);
{
_workItemsGroups.Remove(workItemsGroup);
}
} }
/// <summary> /// <summary>
@@ -575,9 +580,9 @@ namespace Amib.Threading
// There is no need to lock the two methods together // There is no need to lock the two methods together
// since only the current thread removes itself // since only the current thread removes itself
// and the _workerThreads is a synchronized dictionary // and the _workerThreads is a synchronized dictionary
if (_workerThreads.Contains(Thread.CurrentThread)) if (_workerThreads.TryRemove(Thread.CurrentThread.ManagedThreadId, out ThreadEntry te))
{ {
_workerThreads.Remove(Thread.CurrentThread); te.Clean();
_windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
_localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
} }
@@ -592,7 +597,7 @@ namespace Amib.Threading
if (_isSuspended) if (_isSuspended)
return; return;
lock (_workerThreads.SyncRoot) lock (_workerThreadsLock)
{ {
// Don't start threads on shut down // Don't start threads on shut down
if (_shutdown) if (_shutdown)
@@ -648,7 +653,7 @@ namespace Amib.Threading
--threadsCount; --threadsCount;
// Add it to the dictionary and update its creation time. // Add it to the dictionary and update its creation time.
_workerThreads[workerThread] = new ThreadEntry(this); _workerThreads[workerThread.ManagedThreadId] = new ThreadEntry(this, workerThread);
_windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
_localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
@@ -663,7 +668,7 @@ namespace Amib.Threading
{ {
// Keep the entry of the dictionary as thread's variable to avoid the synchronization locks // Keep the entry of the dictionary as thread's variable to avoid the synchronization locks
// of the dictionary. // of the dictionary.
CurrentThreadEntry = _workerThreads[Thread.CurrentThread]; CurrentThreadEntry = _workerThreads[Thread.CurrentThread.ManagedThreadId];
bool informedCompleted = false; bool informedCompleted = false;
FireOnThreadInitialization(); FireOnThreadInitialization();
@@ -682,7 +687,7 @@ namespace Amib.Threading
// Double lock for quit. // Double lock for quit.
if (_workerThreads.Count > maxworkers) if (_workerThreads.Count > maxworkers)
{ {
lock (_workerThreads.SyncRoot) lock (_workerThreadsLock)
{ {
if (_workerThreads.Count > maxworkers) if (_workerThreads.Count > maxworkers)
{ {
@@ -708,7 +713,7 @@ namespace Amib.Threading
// Double lock for quit. // Double lock for quit.
if (_workerThreads.Count > minworkers) if (_workerThreads.Count > minworkers)
{ {
lock (_workerThreads.SyncRoot) lock (_workerThreadsLock)
{ {
if (_workerThreads.Count > minworkers) if (_workerThreads.Count > minworkers)
{ {
@@ -914,8 +919,8 @@ namespace Amib.Threading
pcs.Dispose(); pcs.Dispose();
} }
Thread[] threads; ThreadEntry[] threadEntries;
lock (_workerThreads.SyncRoot) lock (_workerThreadsLock)
{ {
// Shutdown the work items queue // Shutdown the work items queue
_workItemsQueue.Dispose(); _workItemsQueue.Dispose();
@@ -925,8 +930,9 @@ namespace Amib.Threading
_shuttingDownEvent.Set(); _shuttingDownEvent.Set();
// Make a copy of the threads' references in the pool // Make a copy of the threads' references in the pool
threads = new Thread[_workerThreads.Count]; threadEntries = new ThreadEntry[_workerThreads.Count];
_workerThreads.Keys.CopyTo(threads, 0); _workerThreads.Values.CopyTo(threadEntries, 0);
_workerThreads.Clear();
} }
int millisecondsLeft = millisecondsTimeout; int millisecondsLeft = millisecondsTimeout;
@@ -936,8 +942,10 @@ namespace Amib.Threading
bool timeout = false; bool timeout = false;
// Each iteration we update the time left for the timeout. // Each iteration we update the time left for the timeout.
foreach (Thread thread in threads) foreach (ThreadEntry te in threadEntries)
{ {
Thread thread = te.WorkThread;
// Join don't work with negative numbers // Join don't work with negative numbers
if (!waitInfinitely && (millisecondsLeft < 0)) if (!waitInfinitely && (millisecondsLeft < 0))
{ {
@@ -959,19 +967,21 @@ namespace Amib.Threading
//TimeSpan ts = DateTime.UtcNow - start; //TimeSpan ts = DateTime.UtcNow - start;
millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds; millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds;
} }
te.WorkThread = null;
} }
if (timeout && forceAbort) if (timeout && forceAbort)
{ {
// Abort the threads in the pool // Abort the threads in the pool
foreach (Thread thread in threads) foreach (ThreadEntry te in threadEntries)
{ {
Thread thread = te.WorkThread;
if ((thread != null) && thread.IsAlive ) if ((thread != null) && thread.IsAlive )
{ {
try try
{ {
thread.Abort(); // Shutdown thread.Abort(); // Shutdown
te.WorkThread = null;
} }
catch (SecurityException e) catch (SecurityException e)
{ {
@@ -1184,9 +1194,8 @@ namespace Amib.Threading
{ {
tih(); tih();
} }
catch (Exception e) catch
{ {
e.GetHashCode();
Debug.Assert(false); Debug.Assert(false);
throw; throw;
} }
@@ -1204,9 +1213,8 @@ namespace Amib.Threading
{ {
tth(); tth();
} }
catch (Exception e) catch
{ {
e.GetHashCode();
Debug.Assert(false); Debug.Assert(false);
throw; throw;
} }
@@ -1242,9 +1250,7 @@ namespace Amib.Threading
foreach (ThreadEntry threadEntry in _workerThreads.Values) foreach (ThreadEntry threadEntry in _workerThreads.Values)
{ {
WorkItem workItem = threadEntry.CurrentWorkItem; WorkItem workItem = threadEntry.CurrentWorkItem;
if (null != workItem && if (null != workItem && !workItem.IsCanceled && workItem.WasQueuedBy(wig))
workItem.WasQueuedBy(wig) &&
!workItem.IsCanceled)
{ {
threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true); threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true);
} }
@@ -1478,8 +1484,7 @@ namespace Amib.Threading
} }
_isSuspended = false; _isSuspended = false;
ICollection workItemsGroups = _workItemsGroups.Values; foreach (WorkItemsGroup workItemsGroup in _workItemsGroups.Values)
foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
{ {
workItemsGroup.OnSTPIsStarting(); workItemsGroup.OnSTPIsStarting();
} }
@@ -1496,8 +1501,7 @@ namespace Amib.Threading
_canceledSmartThreadPool.IsCanceled = true; _canceledSmartThreadPool.IsCanceled = true;
_canceledSmartThreadPool = new CanceledWorkItemsGroup(); _canceledSmartThreadPool = new CanceledWorkItemsGroup();
ICollection workItemsGroups = _workItemsGroups.Values; foreach (WorkItemsGroup workItemsGroup in _workItemsGroups.Values)
foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
{ {
workItemsGroup.Cancel(abortExecution); workItemsGroup.Cancel(abortExecution);
} }
@@ -1506,12 +1510,13 @@ namespace Amib.Threading
{ {
foreach (ThreadEntry threadEntry in _workerThreads.Values) foreach (ThreadEntry threadEntry in _workerThreads.Values)
{ {
WorkItem workItem = threadEntry.CurrentWorkItem; if(threadEntry.AssociatedSmartThreadPool == this)
if (null != workItem &&
threadEntry.AssociatedSmartThreadPool == this &&
!workItem.IsCanceled)
{ {
threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true); WorkItem workItem = threadEntry.CurrentWorkItem;
if (null != workItem && !workItem.IsCanceled)
{
threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true);
}
} }
} }
} }

View File

@@ -1,89 +0,0 @@
using System.Collections.Generic;
namespace Amib.Threading.Internal
{
internal class SynchronizedDictionary<TKey, TValue>
{
private readonly Dictionary<TKey, TValue> _dictionary;
private readonly object _lock;
public SynchronizedDictionary()
{
_lock = new object();
_dictionary = new Dictionary<TKey, TValue>();
}
public int Count
{
get { return _dictionary.Count; }
}
public bool Contains(TKey key)
{
lock (_lock)
{
return _dictionary.ContainsKey(key);
}
}
public void Remove(TKey key)
{
lock (_lock)
{
_dictionary.Remove(key);
}
}
public object SyncRoot
{
get { return _lock; }
}
public TValue this[TKey key]
{
get
{
lock (_lock)
{
return _dictionary[key];
}
}
set
{
lock (_lock)
{
_dictionary[key] = value;
}
}
}
public Dictionary<TKey, TValue>.KeyCollection Keys
{
get
{
lock (_lock)
{
return _dictionary.Keys;
}
}
}
public Dictionary<TKey, TValue>.ValueCollection Values
{
get
{
lock (_lock)
{
return _dictionary.Values;
}
}
}
public void Clear()
{
lock (_lock)
{
_dictionary.Clear();
}
}
}
}

View File

@@ -426,10 +426,10 @@ namespace Amib.Threading.Internal
// We must treat the ThreadAbortException or else it will be stored in the exception variable // We must treat the ThreadAbortException or else it will be stored in the exception variable
catch (ThreadAbortException tae) catch (ThreadAbortException tae)
{ {
tae.GetHashCode();
// Check if the work item was cancelled // Check if the work item was cancelled
// If we got a ThreadAbortException and the STP is not shutting down, it means the // If we got a ThreadAbortException and the STP is not shutting down, it means the
// work items was cancelled. // work items was cancelled.
tae.GetHashCode();
if (!SmartThreadPool.CurrentThreadEntry.AssociatedSmartThreadPool.IsShuttingdown) if (!SmartThreadPool.CurrentThreadEntry.AssociatedSmartThreadPool.IsShuttingdown)
{ {
Thread.ResetAbort(); Thread.ResetAbort();

View File

@@ -33,6 +33,7 @@ namespace Amib.Threading.Internal
set { _name = value; } set { _name = value; }
} }
public int localID { get; set;}
#endregion #endregion
#region Abstract Methods #region Abstract Methods