smartthreadpool: place a global throttle on threads creation if pool as less than min number of threads ( 20/s versus .net 1/s in all cases)

This commit is contained in:
UbitUmarov
2021-01-02 12:38:25 +00:00
parent 4b7502c1dc
commit d72f19111c
2 changed files with 38 additions and 30 deletions

View File

@@ -264,6 +264,8 @@ namespace Amib.Threading
/// </summary>
private bool _isDisposed;
private static long _lastThreadCreateTS = long.MinValue;
/// <summary>
/// Holds all the WorkItemsGroup instaces that have at least one
/// work item int the SmartThreadPool
@@ -431,13 +433,19 @@ namespace Amib.Threading
private void StartOptimalNumberOfThreads()
{
int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads);
threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads);
threadsCount -= _workerThreads.Count;
if (threadsCount > 0)
int threadsCount;
lock (_workerThreads.SyncRoot)
{
StartThreads(threadsCount);
threadsCount = _workItemsQueue.Count;
if (threadsCount == _stpStartInfo.MinWorkerThreads)
return;
if (threadsCount < _stpStartInfo.MinWorkerThreads)
threadsCount = _stpStartInfo.MinWorkerThreads;
else if (threadsCount > _stpStartInfo.MaxWorkerThreads)
threadsCount = _stpStartInfo.MaxWorkerThreads;
threadsCount -= _workerThreads.Count;
}
StartThreads(threadsCount);
}
private void ValidateSTPStartInfo()
@@ -582,26 +590,28 @@ namespace Amib.Threading
private void StartThreads(int threadsCount)
{
if (_isSuspended)
{
return;
}
lock (_workerThreads.SyncRoot)
{
// Don't start threads on shut down
if (_shutdown)
{
return;
int tmpcount = _workerThreads.Count;
if(tmpcount > _stpStartInfo.MinWorkerThreads)
{
long last = Interlocked.Read(ref _lastThreadCreateTS);
if (DateTime.UtcNow.Ticks - last < 50 * TimeSpan.TicksPerMillisecond)
return;
}
for (int i = 0; i < threadsCount; ++i)
{
// Don't create more threads then the upper limit
if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads)
{
return;
}
tmpcount = _stpStartInfo.MaxWorkerThreads - tmpcount;
if (threadsCount > tmpcount)
threadsCount = tmpcount;
while(threadsCount > 0)
{
// Create a new thread
Thread workerThread;
if(_stpStartInfo.SuppressFlow)
@@ -621,20 +631,21 @@ namespace Amib.Threading
? new Thread(ProcessQueuedItems, _stpStartInfo.MaxStackSize.Value)
: new Thread(ProcessQueuedItems);
}
// Configure the new thread and start it
workerThread.IsBackground = _stpStartInfo.AreThreadsBackground;
if (_stpStartInfo.ApartmentState != ApartmentState.Unknown)
{
workerThread.SetApartmentState(_stpStartInfo.ApartmentState);
}
workerThread.Priority = _stpStartInfo.ThreadPriority;
workerThread.Name = string.Format("STP:{0}:{1}", Name, _threadCounter);
workerThread.Start();
Interlocked.Exchange(ref _lastThreadCreateTS, DateTime.UtcNow.Ticks);
++_threadCounter;
--threadsCount;
// Add it to the dictionary and update its creation time.
_workerThreads[workerThread] = new ThreadEntry(this);
@@ -659,6 +670,8 @@ namespace Amib.Threading
try
{
bool bInUseWorkerThreadsWasIncremented = false;
int maxworkers = _stpStartInfo.MaxWorkerThreads;
int minworkers = _stpStartInfo.MinWorkerThreads;
// Process until shutdown.
while (!_shutdown)
@@ -670,11 +683,11 @@ namespace Amib.Threading
// The following block handles the when the MaxWorkerThreads has been
// incremented by the user at run-time.
// Double lock for quit.
if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
if (_workerThreads.Count > maxworkers)
{
lock (_workerThreads.SyncRoot)
{
if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
if (_workerThreads.Count > maxworkers)
{
// Inform that the thread is quiting and then quit.
// This method must be called within this lock or else
@@ -694,14 +707,14 @@ namespace Amib.Threading
CurrentThreadEntry.IAmAlive();
// On timeout or shut down.
if (null == workItem)
if (workItem == null)
{
// Double lock for quit.
if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
if (_workerThreads.Count > minworkers)
{
lock (_workerThreads.SyncRoot)
{
if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
if (_workerThreads.Count > minworkers)
{
// Inform that the thread is quiting and then quit.
// This method must be called within this lock or else
@@ -712,11 +725,6 @@ namespace Amib.Threading
}
}
}
}
// If we didn't quit then skip to the next iteration.
if (null == workItem)
{
continue;
}
@@ -746,6 +754,7 @@ namespace Amib.Threading
// will return true, so the post execute can run.
if (!workItem.StartingWorkItem())
{
CurrentThreadEntry.CurrentWorkItem = null;
continue;
}

View File

@@ -292,14 +292,13 @@ namespace Amib.Threading.Internal
{
if (IsCanceled)
{
bool result = false;
if ((_workItemInfo.PostExecuteWorkItemCallback != null) &&
((_workItemInfo.CallToPostExecute & CallToPostExecute.WhenWorkItemCanceled) == CallToPostExecute.WhenWorkItemCanceled))
{
result = true;
return true;
}
return result;
return false;
}
Debug.Assert(WorkItemState.InQueue == GetWorkItemState());