more changes to smartthreadpool

This commit is contained in:
UbitUmarov
2021-01-13 16:23:49 +00:00
parent ec56517c7c
commit 1db6a541bd
5 changed files with 55 additions and 91 deletions

View File

@@ -7,21 +7,25 @@ namespace Amib.Threading.Internal
{
public const int WaitTimeout = Timeout.Infinite;
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
internal static bool WaitAll(WaitHandle[] waitHandles, int millisecondsTimeout, bool exitContext)
{
return WaitHandle.WaitAll(waitHandles, millisecondsTimeout, exitContext);
}
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
internal static int WaitAny(WaitHandle[] waitHandles)
{
return WaitHandle.WaitAny(waitHandles);
}
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
internal static int WaitAny(WaitHandle[] waitHandles, int millisecondsTimeout, bool exitContext)
{
return WaitHandle.WaitAny(waitHandles, millisecondsTimeout, exitContext);
}
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
internal static bool WaitOne(WaitHandle waitHandle, int millisecondsTimeout, bool exitContext)
{
return waitHandle.WaitOne(millisecondsTimeout, exitContext);

View File

@@ -676,10 +676,6 @@ namespace Amib.Threading
// Process until shutdown.
while (!_shutdown)
{
// Update the last time this thread was seen alive.
// It's good for debugging.
CurrentThreadEntry.IAmAlive();
// The following block handles the when the MaxWorkerThreads has been
// incremented by the user at run-time.
// Double lock for quit.
@@ -693,18 +689,17 @@ namespace Amib.Threading
// This method must be called within this lock or else
// more threads will quit and the thread pool will go
// below the lower limit.
InformCompleted();
//InformCompleted();
break;
}
}
}
CurrentThreadEntry.IAmAlive();
// Wait for a work item, shutdown, or timeout
WorkItem workItem = Dequeue();
// Update the last time this thread was seen alive.
// It's good for debugging.
CurrentThreadEntry.IAmAlive();
// On timeout or shut down.
if (workItem == null)
@@ -720,7 +715,7 @@ namespace Amib.Threading
// This method must be called within this lock or else
// more threads will quit and the thread pool will go
// below the lower limit.
InformCompleted();
//InformCompleted();
break;
}
}
@@ -728,6 +723,8 @@ namespace Amib.Threading
continue;
}
CurrentThreadEntry.IAmAlive();
try
{
// Initialize the value to false
@@ -819,6 +816,7 @@ namespace Amib.Threading
{
InformCompleted();
FireOnThreadTermination();
_workItemsQueue.CloseThreadWaiter();
}
}

View File

@@ -987,6 +987,12 @@ namespace Amib.Threading.Internal
_callerContext = null;
}
if(_workItemCompleted != null)
{
_workItemCompleted.Dispose();
_workItemCompleted = null;
}
if (_workItemInfo.DisposeOfStateObjects)
{
IDisposable disp = _state as IDisposable;

View File

@@ -335,7 +335,7 @@ namespace Amib.Threading.Internal
{
if (_workItemsInStpQueue < _concurrency)
{
WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
WorkItem nextWorkItem = _workItemsQueue.Dequeue();
try
{
_stp.Enqueue(nextWorkItem);

View File

@@ -36,22 +36,6 @@ namespace Amib.Threading.Internal
[ThreadStatic]
private static WaiterEntry _waiterEntry;
/// <summary>
/// Each thread in the thread pool keeps its own waiter entry.
/// </summary>
private static WaiterEntry CurrentWaiterEntry
{
get
{
return _waiterEntry;
}
set
{
_waiterEntry = value;
}
}
/// <summary>
/// A flag that indicates if the WorkItemsQueue has been disposed.
/// </summary>
@@ -95,13 +79,11 @@ namespace Amib.Threading.Internal
{
// A work item cannot be null, since null is used in the
// WaitForWorkItem() method to indicate timeout or cancel
if (null == workItem)
if (workItem == null)
{
throw new ArgumentNullException("workItem", "workItem cannot be null");
}
bool enqueue = true;
// First check if there is a waiter waiting for work item. During
// the check, timed out waiters are ignored. If there is no
// waiter then the work item is queued.
@@ -110,9 +92,7 @@ namespace Amib.Threading.Internal
ValidateNotDisposed();
if (!_isWorkItemsQueueActive)
{
return false;
}
while (_waitersCount > 0)
{
@@ -121,21 +101,24 @@ namespace Amib.Threading.Internal
// Signal the waiter. On success break the loop
if (waiterEntry.Signal(workItem))
{
enqueue = false;
break;
}
return true;
}
if (enqueue)
{
// Enqueue the work item
_workItems.Enqueue(workItem);
}
// Enqueue the work item
_workItems.Enqueue(workItem);
}
return true;
}
public void CloseThreadWaiter()
{
if(_waiterEntry != null)
{
_waiterEntry.Close();
_waiterEntry = null;
}
}
/// <summary>
/// Waits for a work item or exits on timeout or cancel
@@ -158,17 +141,13 @@ namespace Amib.Threading.Internal
// didn't get a work item.
WaiterEntry waiterEntry;
WorkItem workItem = null;
lock (this)
{
ValidateNotDisposed();
// If there are waiting work items then take one and return.
if (_workItems.Count > 0)
{
workItem = _workItems.Dequeue();
return workItem;
}
return _workItems.Dequeue();
// No waiting work items ...
@@ -193,43 +172,26 @@ namespace Amib.Threading.Internal
lock (this)
{
// success is true if it got a work item.
bool success = (0 == index);
// The timeout variable is used only for readability.
// (We treat cancel as timeout)
bool timeout = !success;
// On timeout update the waiterEntry that it is timed out
if (timeout)
if (index != 0)
{
// The Timeout() fails if the waiter has already been signaled
timeout = waiterEntry.Timeout();
// On timeout remove the waiter from the queue.
// Note that the complexity is O(1).
if (timeout)
if (waiterEntry.Timeout())
{
RemoveWaiter(waiterEntry, false);
return null;
}
// Again readability
success = !timeout;
}
// On success return the work item
if (success)
{
workItem = waiterEntry.WorkItem;
WorkItem workItem = waiterEntry.WorkItem;
if (workItem == null)
workItem = _workItems.Dequeue();
if (null == workItem)
{
workItem = _workItems.Dequeue();
}
}
return workItem;
}
// On failure return null.
return workItem;
}
/// <summary>
@@ -300,12 +262,13 @@ namespace Amib.Threading.Internal
/// objects each thread has its own WaiterEntry object.
private static WaiterEntry GetThreadWaiterEntry()
{
if (null == CurrentWaiterEntry)
if (_waiterEntry == null)
{
CurrentWaiterEntry = new WaiterEntry();
_waiterEntry = new WaiterEntry();
}
CurrentWaiterEntry.Reset();
return CurrentWaiterEntry;
else
_waiterEntry.Reset();
return _waiterEntry;
}
#region Waiters stack methods
@@ -325,7 +288,6 @@ namespace Amib.Threading.Internal
{
_headWaiterEntry._nextWaiterEntry = newWaiterEntry;
newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
}
// If the stack is not empty then put newWaiterEntry as the new head
// of the stack.
@@ -363,7 +325,7 @@ namespace Amib.Threading.Internal
// Update the new stack head
_headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry;
if (null != newHeadWaiterEntry)
if (newHeadWaiterEntry != null)
{
newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry;
}
@@ -381,43 +343,38 @@ namespace Amib.Threading.Internal
{
// Store the prev entry in the list
WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry;
waiterEntry._prevWaiterEntry = null;
// Store the next entry in the list
WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry;
waiterEntry._nextWaiterEntry = null;
// A flag to indicate if we need to decrement the waiters count.
// popDecrement indicate if we need to decrement the waiters count.
// If we got here from PopWaiter then we must decrement.
// If we got here from PushWaiter then we decrement only if
// the waiter was already in the stack.
bool decrementCounter = popDecrement;
// Null the waiter's entry links
waiterEntry._prevWaiterEntry = null;
waiterEntry._nextWaiterEntry = null;
// If the waiter entry had a prev link then update it.
// It also means that the waiter is already in the list and we
// need to decrement the waiters count.
if (null != prevWaiterEntry)
if (prevWaiterEntry != null)
{
prevWaiterEntry._nextWaiterEntry = nextWaiterEntry;
decrementCounter = true;
popDecrement = true;
}
// If the waiter entry had a next link then update it.
// It also means that the waiter is already in the list and we
// need to decrement the waiters count.
if (null != nextWaiterEntry)
if (nextWaiterEntry != null)
{
nextWaiterEntry._prevWaiterEntry = prevWaiterEntry;
decrementCounter = true;
popDecrement = true;
}
// Decrement the waiters count if needed
if (decrementCounter)
{
if (popDecrement)
--_waitersCount;
}
}
#endregion
@@ -434,7 +391,6 @@ namespace Amib.Threading.Internal
/// <summary>
/// Event to signal the waiter that it got the work item.
/// </summary>
//private AutoResetEvent _waitHandle = new AutoResetEvent(false);
private AutoResetEvent _waitHandle = new AutoResetEvent(false);
/// <summary>
@@ -466,7 +422,6 @@ namespace Amib.Threading.Internal
public WaiterEntry()
{
Reset();
}
#endregion
@@ -544,6 +499,7 @@ namespace Amib.Threading.Internal
/// </summary>
public void Close()
{
_workItem = null;
if (null != _waitHandle)
{
_waitHandle.Close();
@@ -562,8 +518,8 @@ namespace Amib.Threading.Internal
if (!_isDisposed)
{
Close();
_isDisposed = true;
}
_isDisposed = true;
}
}