Compare commits

...

11 Commits

Author SHA1 Message Date
Diva Canto
dc88ffc5b4 Delay the enqueueing of non-longpoll requests for 100ms. No need to have these requests actively on the processing queue if it seems they're not ready. 2013-07-18 17:17:20 -07:00
Diva Canto
d1e9beead8 Revert "Now trying DoubleQueue instead of BlockingQueue for the PollServiceRequestManager."
This reverts commit 5f95f4d78e.
2013-07-18 15:52:07 -07:00
Diva Canto
27377194cd Changed the timoeut of EQ 502s (no events) to 50 secs. The viewer post requests timeout in 60 secs.
There's plenty of room for improvement in handling the EQs. Some other time...
2013-07-18 13:48:56 -07:00
Justin Clark-Casey (justincc)
8c6761c152 Do some simple queue empty checks in the main outgoing udp loop instead of always performing these on a separate fired thread.
This appears to improve cpu usage since launching a new thread is more expensive than performing a small amount of inline logic.
However, needs testing at scale.
2013-07-18 21:28:36 +01:00
Diva Canto
553d9cc5d2 Applying the same fix here that dan lake applied to master -- unfortunately I can't cherry-pick because that commit has 2 parents... 2013-07-18 07:52:14 -07:00
Diva Canto
c685cc1799 Revert "This is a completely unreasonable thing to do, effectively defying the purpose of BlockingQueues. Trying this, to see the effect on CPU."
This reverts commit 5232ab0496.
2013-07-17 20:42:38 -07:00
Justin Clark-Casey (justincc)
1ba5a05cf7 try Hacking in an AutoResetEvent to control the outgoing UDP loop instead of a continuous loop with sleeps.
Does appear to have a cpu impact but may need further tweaking
2013-07-18 01:17:46 +01:00
Justin Clark-Casey (justincc)
0af3b5ed9a Revert "Put in temporary hack for performnace 'queue-empty' logic on a persistent thread rather than through fire and forget"
This reverts commit b402220dbb.

Eliminating fire and forget here does not appear to make a significant difference.
2013-07-18 00:51:10 +01:00
Justin Clark-Casey (justincc)
a94a43d249 Revert "Properly remove the hack queue update thread when the voewr shuts down"
This reverts commit 7c544c0d4e.
2013-07-18 00:50:16 +01:00
Justin Clark-Casey (justincc)
7c544c0d4e Properly remove the hack queue update thread when the voewr shuts down
No functional change.
2013-07-18 00:39:28 +01:00
Justin Clark-Casey (justincc)
b402220dbb Put in temporary hack for performnace 'queue-empty' logic on a persistent thread rather than through fire and forget
May not scale since this gives each client its own thread.
2013-07-18 00:30:22 +01:00
9 changed files with 187 additions and 81 deletions

View File

@@ -96,7 +96,17 @@ namespace OpenSim.Framework.Servers.HttpServer
private void ReQueueEvent(PollServiceHttpRequest req)
{
if (m_running)
m_requests.Enqueue(req);
{
// delay the enqueueing for 100ms. There's no need to have the event
// actively on the queue
Timer t = new Timer(self => {
((Timer)self).Dispose();
m_requests.Enqueue(req);
});
t.Change(100, Timeout.Infinite);
}
}
public void Enqueue(PollServiceHttpRequest req)
@@ -118,7 +128,7 @@ namespace OpenSim.Framework.Servers.HttpServer
// The only purpose of this thread is to check the EQs for events.
// If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests.
// If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
// All other types of tasks (Inventory handlers) don't have the long-poll nature,
// All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature,
// so if they aren't ready to be served by a worker thread (no events), they are placed
// directly back in the "ready-to-serve" queue by the worker thread.
while (m_running)
@@ -185,33 +195,35 @@ namespace OpenSim.Framework.Servers.HttpServer
{
while (m_running)
{
PollServiceHttpRequest req = m_requests.Dequeue(5000);
//m_log.WarnFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString()));
Watchdog.UpdateThread();
PollServiceHttpRequest req = null;
lock (m_requests)
if (req != null)
{
if (m_requests.Count() > 0)
req = m_requests.Dequeue();
}
if (req == null)
Thread.Sleep(100);
else
{
//PollServiceHttpRequest req = m_requests.Dequeue(5000);
//m_log.WarnFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString()));
if (req != null)
try
{
try
if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
{
if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
if (responsedata == null)
continue;
if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) // This is the event queue
{
Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
if (responsedata == null)
continue;
if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) // This is the event queue
try
{
req.DoHTTPGruntWork(m_server, responsedata);
}
catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
{
// Ignore it, no need to reply
}
}
else
{
m_threadPool.QueueWorkItem(x =>
{
try
{
@@ -221,41 +233,27 @@ namespace OpenSim.Framework.Servers.HttpServer
{
// Ignore it, no need to reply
}
}
else
{
m_threadPool.QueueWorkItem(x =>
{
try
{
req.DoHTTPGruntWork(m_server, responsedata);
}
catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
{
// Ignore it, no need to reply
}
return null;
}, null);
}
return null;
}, null);
}
}
else
{
if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
{
req.DoHTTPGruntWork(
m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
}
else
{
if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
{
req.DoHTTPGruntWork(
m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
}
else
{
ReQueueEvent(req);
}
ReQueueEvent(req);
}
}
catch (Exception e)
{
m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
}
}
catch (Exception e)
{
m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
}
}
}

View File

@@ -1779,10 +1779,12 @@ namespace OpenSim.Framework
FireAndForget(callback, null);
}
public static void InitThreadPool(int maxThreads)
public static void InitThreadPool(int minThreads, int maxThreads)
{
if (maxThreads < 2)
throw new ArgumentOutOfRangeException("maxThreads", "maxThreads must be greater than 2");
if (minThreads > maxThreads || minThreads < 2)
throw new ArgumentOutOfRangeException("minThreads", "minThreads must be greater than 2 and less than or equal to maxThreads");
if (m_ThreadPool != null)
throw new InvalidOperationException("SmartThreadPool is already initialized");
@@ -1791,6 +1793,7 @@ namespace OpenSim.Framework
startInfo.IdleTimeout = 2000;
startInfo.MaxWorkerThreads = maxThreads;
startInfo.MinWorkerThreads = 2;
startInfo.MinWorkerThreads = minThreads;
m_ThreadPool = new SmartThreadPool(startInfo);
}
@@ -1865,7 +1868,7 @@ namespace OpenSim.Framework
break;
case FireAndForgetMethod.SmartThreadPool:
if (m_ThreadPool == null)
InitThreadPool(15);
InitThreadPool(2, 15);
m_ThreadPool.QueueWorkItem((cb, o) => cb(o), realCallback, obj);
break;
case FireAndForgetMethod.Thread:

View File

@@ -86,6 +86,7 @@ namespace OpenSim
IConfig startupConfig = Config.Configs["Startup"];
IConfig networkConfig = Config.Configs["Network"];
int stpMinThreads = 2;
int stpMaxThreads = 15;
if (startupConfig != null)
@@ -112,12 +113,13 @@ namespace OpenSim
if (!String.IsNullOrEmpty(asyncCallMethodStr) && Utils.EnumTryParse<FireAndForgetMethod>(asyncCallMethodStr, out asyncCallMethod))
Util.FireAndForgetMethod = asyncCallMethod;
stpMinThreads = startupConfig.GetInt("MinPoolThreads", 15);
stpMaxThreads = startupConfig.GetInt("MaxPoolThreads", 15);
m_consolePrompt = startupConfig.GetString("ConsolePrompt", @"Region (\R) ");
}
if (Util.FireAndForgetMethod == FireAndForgetMethod.SmartThreadPool)
Util.InitThreadPool(stpMaxThreads);
Util.InitThreadPool(stpMinThreads, stpMaxThreads);
m_log.Info("[OPENSIM MAIN]: Using async_call_method " + Util.FireAndForgetMethod);
}

View File

@@ -65,6 +65,13 @@ namespace OpenSim.Region.ClientStack.Linden
/// </value>
public int DebugLevel { get; set; }
// Viewer post requests timeout in 60 secs
// https://bitbucket.org/lindenlab/viewer-release/src/421c20423df93d650cc305dc115922bb30040999/indra/llmessage/llhttpclient.cpp?at=default#cl-44
//
private const int VIEWER_TIMEOUT = 60 * 1000;
// Just to be safe, we work on a 10 sec shorter cycle
private const int SERVER_EQ_TIME_NO_EVENTS = VIEWER_TIMEOUT - (10 * 1000);
protected Scene m_scene;
private Dictionary<UUID, int> m_ids = new Dictionary<UUID, int>();
@@ -363,8 +370,8 @@ namespace OpenSim.Region.ClientStack.Linden
}
caps.RegisterPollHandler(
"EventQueueGet",
new PollServiceEventArgs(null, GenerateEqgCapPath(eventQueueGetUUID), HasEvents, GetEvents, NoEvents, agentID, 40000));
"EventQueueGet",
new PollServiceEventArgs(null, GenerateEqgCapPath(eventQueueGetUUID), HasEvents, GetEvents, NoEvents, agentID, SERVER_EQ_TIME_NO_EVENTS));
Random rnd = new Random(Environment.TickCount);
lock (m_ids)

View File

@@ -485,6 +485,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_udpServer = udpServer;
m_udpClient = udpClient;
m_udpClient.OnQueueEmpty += HandleQueueEmpty;
m_udpClient.HasUpdates += HandleHasUpdates;
m_udpClient.OnPacketStats += PopulateStats;
m_prioritizer = new Prioritizer(m_scene);
@@ -3776,6 +3777,17 @@ namespace OpenSim.Region.ClientStack.LindenUDP
ResendPrimUpdate(update);
}
// OpenSim.Framework.Lazy<List<ObjectUpdatePacket.ObjectDataBlock>> objectUpdateBlocks = new OpenSim.Framework.Lazy<List<ObjectUpdatePacket.ObjectDataBlock>>();
// OpenSim.Framework.Lazy<List<ObjectUpdateCompressedPacket.ObjectDataBlock>> compressedUpdateBlocks = new OpenSim.Framework.Lazy<List<ObjectUpdateCompressedPacket.ObjectDataBlock>>();
// OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>> terseUpdateBlocks = new OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>>();
// OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>> terseAgentUpdateBlocks = new OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>>();
//
// OpenSim.Framework.Lazy<List<EntityUpdate>> objectUpdates = new OpenSim.Framework.Lazy<List<EntityUpdate>>();
// OpenSim.Framework.Lazy<List<EntityUpdate>> compressedUpdates = new OpenSim.Framework.Lazy<List<EntityUpdate>>();
// OpenSim.Framework.Lazy<List<EntityUpdate>> terseUpdates = new OpenSim.Framework.Lazy<List<EntityUpdate>>();
// OpenSim.Framework.Lazy<List<EntityUpdate>> terseAgentUpdates = new OpenSim.Framework.Lazy<List<EntityUpdate>>();
private void ProcessEntityUpdates(int maxUpdates)
{
OpenSim.Framework.Lazy<List<ObjectUpdatePacket.ObjectDataBlock>> objectUpdateBlocks = new OpenSim.Framework.Lazy<List<ObjectUpdatePacket.ObjectDataBlock>>();
@@ -3788,6 +3800,15 @@ namespace OpenSim.Region.ClientStack.LindenUDP
OpenSim.Framework.Lazy<List<EntityUpdate>> terseUpdates = new OpenSim.Framework.Lazy<List<EntityUpdate>>();
OpenSim.Framework.Lazy<List<EntityUpdate>> terseAgentUpdates = new OpenSim.Framework.Lazy<List<EntityUpdate>>();
// objectUpdateBlocks.Value.Clear();
// compressedUpdateBlocks.Value.Clear();
// terseUpdateBlocks.Value.Clear();
// terseAgentUpdateBlocks.Value.Clear();
// objectUpdates.Value.Clear();
// compressedUpdates.Value.Clear();
// terseUpdates.Value.Clear();
// terseAgentUpdates.Value.Clear();
// Check to see if this is a flush
if (maxUpdates <= 0)
{
@@ -4113,8 +4134,14 @@ namespace OpenSim.Region.ClientStack.LindenUDP
void HandleQueueEmpty(ThrottleOutPacketTypeFlags categories)
{
// if (!m_udpServer.IsRunningOutbound)
// return;
if ((categories & ThrottleOutPacketTypeFlags.Task) != 0)
{
// if (!m_udpServer.IsRunningOutbound)
// return;
if (m_maxUpdates == 0 || m_LastQueueFill == 0)
{
m_maxUpdates = m_udpServer.PrimUpdatesPerCallback;
@@ -4140,6 +4167,27 @@ namespace OpenSim.Region.ClientStack.LindenUDP
ImageManager.ProcessImageQueue(m_udpServer.TextureSendLimit);
}
internal bool HandleHasUpdates(ThrottleOutPacketTypeFlags categories)
{
bool hasUpdates = false;
if ((categories & ThrottleOutPacketTypeFlags.Task) != 0)
{
if (m_entityUpdates.Count > 0)
hasUpdates = true;
else if (m_entityProps.Count > 0)
hasUpdates = true;
}
if ((categories & ThrottleOutPacketTypeFlags.Texture) != 0)
{
if (ImageManager.HasUpdates())
hasUpdates = true;
}
return hasUpdates;
}
public void SendAssetUploadCompleteMessage(sbyte AssetType, bool Success, UUID AssetFullID)
{
AssetUploadCompletePacket newPack = new AssetUploadCompletePacket();

View File

@@ -206,6 +206,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
}
public bool HasUpdates()
{
J2KImage image = GetHighestPriorityImage();
return image != null && image.IsDecoded;
}
public bool ProcessImageQueue(int packetsToSend)
{
int packetsSent = 0;

View File

@@ -31,6 +31,7 @@ using System.Net;
using System.Threading;
using log4net;
using OpenSim.Framework;
using OpenSim.Framework.Monitoring;
using OpenMetaverse;
using OpenMetaverse.Packets;
@@ -81,6 +82,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// hooked to put more data on the empty queue</summary>
public event QueueEmpty OnQueueEmpty;
public event Func<ThrottleOutPacketTypeFlags, bool> HasUpdates;
/// <summary>AgentID for this client</summary>
public readonly UUID AgentID;
/// <summary>The remote address of the connected client</summary>
@@ -613,15 +616,38 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <param name="categories">Throttle categories to fire the callback for</param>
private void BeginFireQueueEmpty(ThrottleOutPacketTypeFlags categories)
{
if (m_nextOnQueueEmpty != 0 && (Environment.TickCount & Int32.MaxValue) >= m_nextOnQueueEmpty)
// if (m_nextOnQueueEmpty != 0 && (Environment.TickCount & Int32.MaxValue) >= m_nextOnQueueEmpty)
if (!m_isQueueEmptyRunning && (Environment.TickCount & Int32.MaxValue) >= m_nextOnQueueEmpty)
{
m_isQueueEmptyRunning = true;
int start = Environment.TickCount & Int32.MaxValue;
const int MIN_CALLBACK_MS = 30;
m_nextOnQueueEmpty = start + MIN_CALLBACK_MS;
if (m_nextOnQueueEmpty == 0)
m_nextOnQueueEmpty = 1;
// Use a value of 0 to signal that FireQueueEmpty is running
m_nextOnQueueEmpty = 0;
// Asynchronously run the callback
Util.FireAndForget(FireQueueEmpty, categories);
// m_nextOnQueueEmpty = 0;
m_categories = categories;
if (HasUpdates(m_categories))
{
// Asynchronously run the callback
Util.FireAndForget(FireQueueEmpty, categories);
}
else
{
m_isQueueEmptyRunning = false;
}
}
}
private bool m_isQueueEmptyRunning;
private ThrottleOutPacketTypeFlags m_categories = 0;
/// <summary>
/// Fires the OnQueueEmpty callback and sets the minimum time that it
/// can be called again
@@ -631,22 +657,31 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// signature</param>
private void FireQueueEmpty(object o)
{
const int MIN_CALLBACK_MS = 30;
// int start = Environment.TickCount & Int32.MaxValue;
// const int MIN_CALLBACK_MS = 30;
ThrottleOutPacketTypeFlags categories = (ThrottleOutPacketTypeFlags)o;
QueueEmpty callback = OnQueueEmpty;
int start = Environment.TickCount & Int32.MaxValue;
// if (m_udpServer.IsRunningOutbound)
// {
ThrottleOutPacketTypeFlags categories = (ThrottleOutPacketTypeFlags)o;
QueueEmpty callback = OnQueueEmpty;
if (callback != null)
{
try { callback(categories); }
catch (Exception e) { m_log.Error("[LLUDPCLIENT]: OnQueueEmpty(" + categories + ") threw an exception: " + e.Message, e); }
}
if (callback != null)
{
// if (m_udpServer.IsRunningOutbound)
// {
try { callback(categories); }
catch (Exception e) { m_log.Error("[LLUDPCLIENT]: OnQueueEmpty(" + categories + ") threw an exception: " + e.Message, e); }
// }
}
// }
m_nextOnQueueEmpty = start + MIN_CALLBACK_MS;
if (m_nextOnQueueEmpty == 0)
m_nextOnQueueEmpty = 1;
// m_nextOnQueueEmpty = start + MIN_CALLBACK_MS;
// if (m_nextOnQueueEmpty == 0)
// m_nextOnQueueEmpty = 1;
// }
m_isQueueEmptyRunning = false;
}
/// <summary>

View File

@@ -806,8 +806,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
PacketPool.Instance.ReturnPacket(packet);
m_dataPresentEvent.Set();
}
private AutoResetEvent m_dataPresentEvent = new AutoResetEvent(false);
/// <summary>
/// Start the process of sending a packet to the client.
/// </summary>
@@ -1658,7 +1662,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// Action generic every round
Action<IClientAPI> clientPacketHandler = ClientOutgoingPacketHandler;
while (base.IsRunningOutbound)
while (true)
// while (base.IsRunningOutbound)
{
try
{
@@ -1718,8 +1723,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// If nothing was sent, sleep for the minimum amount of time before a
// token bucket could get more tokens
if (!m_packetSent)
Thread.Sleep((int)TickCountResolution);
//if (!m_packetSent)
// Thread.Sleep((int)TickCountResolution);
m_dataPresentEvent.WaitOne(100);
Watchdog.UpdateThread();
}

View File

@@ -308,8 +308,8 @@ namespace OpenMetaverse
public void AsyncBeginSend(UDPPacketBuffer buf)
{
if (IsRunningOutbound)
{
// if (IsRunningOutbound)
// {
try
{
m_udpSocket.BeginSendTo(
@@ -323,7 +323,7 @@ namespace OpenMetaverse
}
catch (SocketException) { }
catch (ObjectDisposedException) { }
}
// }
}
void AsyncEndSend(IAsyncResult result)