LLPacketQueue.cs 23 KB


  1. /*
  2. * Copyright (c) Contributors, http://opensimulator.org/
  3. * See CONTRIBUTORS.TXT for a full list of copyright holders.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the OpenSim Project nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
  17. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. using System;
  28. using System.Collections.Generic;
  29. using System.Threading;
  30. using System.Timers;
  31. using libsecondlife;
  32. using libsecondlife.Packets;
  33. using OpenSim.Framework;
  34. using OpenSim.Framework.Statistics;
  35. using OpenSim.Framework.Statistics.Interfaces;
  36. using Timer=System.Timers.Timer;
  37. namespace OpenSim.Region.ClientStack.LindenUDP
  38. {
  39. public class LLPacketQueue : IPullStatsProvider
  40. {
  41. private static readonly log4net.ILog m_log
  42. = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
  43. private bool m_enabled = true;
  44. private BlockingQueue<LLQueItem> SendQueue;
  45. private Queue<LLQueItem> IncomingPacketQueue;
  46. private Queue<LLQueItem> OutgoingPacketQueue;
  47. private Queue<LLQueItem> ResendOutgoingPacketQueue;
  48. private Queue<LLQueItem> LandOutgoingPacketQueue;
  49. private Queue<LLQueItem> WindOutgoingPacketQueue;
  50. private Queue<LLQueItem> CloudOutgoingPacketQueue;
  51. private Queue<LLQueItem> TaskOutgoingPacketQueue;
  52. private Queue<LLQueItem> TextureOutgoingPacketQueue;
  53. private Queue<LLQueItem> AssetOutgoingPacketQueue;
  54. private Dictionary<uint, uint> PendingAcks = new Dictionary<uint, uint>();
  55. private Dictionary<uint, Packet> NeedAck = new Dictionary<uint, Packet>();
  56. // All throttle times and number of bytes are calculated by dividing by this value
  57. // This value also determines how many times per throttletimems the timer will run
  58. // If throttleimems is 1000 ms, then the timer will fire every 1000/7 milliseconds
  59. private int throttleTimeDivisor = 7;
  60. private int throttletimems = 1000;
  61. private LLPacketThrottle ResendThrottle;
  62. private LLPacketThrottle LandThrottle;
  63. private LLPacketThrottle WindThrottle;
  64. private LLPacketThrottle CloudThrottle;
  65. private LLPacketThrottle TaskThrottle;
  66. private LLPacketThrottle AssetThrottle;
  67. private LLPacketThrottle TextureThrottle;
  68. private LLPacketThrottle TotalThrottle;
  69. // private long LastThrottle;
  70. // private long ThrottleInterval;
  71. private Timer throttleTimer;
  72. private LLUUID m_agentId;
  73. public LLPacketQueue(LLUUID agentId)
  74. {
  75. // While working on this, the BlockingQueue had me fooled for a bit.
  76. // The Blocking queue causes the thread to stop until there's something
  77. // in it to process. it's an on-purpose threadlock though because
  78. // without it, the clientloop will suck up all sim resources.
  79. SendQueue = new BlockingQueue<LLQueItem>();
  80. IncomingPacketQueue = new Queue<LLQueItem>();
  81. OutgoingPacketQueue = new Queue<LLQueItem>();
  82. ResendOutgoingPacketQueue = new Queue<LLQueItem>();
  83. LandOutgoingPacketQueue = new Queue<LLQueItem>();
  84. WindOutgoingPacketQueue = new Queue<LLQueItem>();
  85. CloudOutgoingPacketQueue = new Queue<LLQueItem>();
  86. TaskOutgoingPacketQueue = new Queue<LLQueItem>();
  87. TextureOutgoingPacketQueue = new Queue<LLQueItem>();
  88. AssetOutgoingPacketQueue = new Queue<LLQueItem>();
  89. // Set up the throttle classes (min, max, current) in bytes
  90. ResendThrottle = new LLPacketThrottle(5000, 100000, 16000);
  91. LandThrottle = new LLPacketThrottle(1000, 100000, 2000);
  92. WindThrottle = new LLPacketThrottle(1000, 100000, 1000);
  93. CloudThrottle = new LLPacketThrottle(1000, 100000, 1000);
  94. TaskThrottle = new LLPacketThrottle(1000, 800000, 3000);
  95. AssetThrottle = new LLPacketThrottle(1000, 800000, 1000);
  96. TextureThrottle = new LLPacketThrottle(1000, 800000, 4000);
  97. // Total Throttle trumps all
  98. // Number of bytes allowed to go out per second. (256kbps per client)
  99. TotalThrottle = new LLPacketThrottle(0, 1500000, 28000);
  100. throttleTimer = new Timer((int) (throttletimems/throttleTimeDivisor));
  101. throttleTimer.Elapsed += new ElapsedEventHandler(ThrottleTimerElapsed);
  102. throttleTimer.Start();
  103. // TIMERS needed for this
  104. // LastThrottle = DateTime.Now.Ticks;
  105. // ThrottleInterval = (long)(throttletimems/throttleTimeDivisor);
  106. m_agentId = agentId;
  107. if (StatsManager.SimExtraStats != null)
  108. {
  109. StatsManager.SimExtraStats.RegisterPacketQueueStatsProvider(m_agentId, this);
  110. }
  111. }
  112. /* STANDARD QUEUE MANIPULATION INTERFACES */
  113. public void Enqueue(LLQueItem item)
  114. {
  115. if (!m_enabled)
  116. {
  117. return;
  118. }
  119. // We could micro lock, but that will tend to actually
  120. // probably be worse than just synchronizing on SendQueue
  121. if (item == null)
  122. {
  123. SendQueue.Enqueue(item);
  124. return;
  125. }
  126. lock (this)
  127. {
  128. switch (item.throttleType)
  129. {
  130. case ThrottleOutPacketType.Resend:
  131. ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item);
  132. break;
  133. case ThrottleOutPacketType.Texture:
  134. ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item);
  135. break;
  136. case ThrottleOutPacketType.Task:
  137. ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item);
  138. break;
  139. case ThrottleOutPacketType.Land:
  140. ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item);
  141. break;
  142. case ThrottleOutPacketType.Asset:
  143. ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item);
  144. break;
  145. case ThrottleOutPacketType.Cloud:
  146. ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item);
  147. break;
  148. case ThrottleOutPacketType.Wind:
  149. ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item);
  150. break;
  151. default:
  152. // Acknowledgements and other such stuff should go directly to the blocking Queue
  153. // Throttling them may and likely 'will' be problematic
  154. SendQueue.Enqueue(item);
  155. break;
  156. }
  157. }
  158. }
  159. public LLQueItem Dequeue()
  160. {
  161. return SendQueue.Dequeue();
  162. }
  163. public void Flush()
  164. {
  165. lock (this)
  166. {
  167. while (PacketsWaiting())
  168. {
  169. //Now comes the fun part.. we dump all our elements into m_packetQueue that we've saved up.
  170. if (ResendOutgoingPacketQueue.Count > 0)
  171. {
  172. SendQueue.Enqueue(ResendOutgoingPacketQueue.Dequeue());
  173. }
  174. if (LandOutgoingPacketQueue.Count > 0)
  175. {
  176. SendQueue.Enqueue(LandOutgoingPacketQueue.Dequeue());
  177. }
  178. if (WindOutgoingPacketQueue.Count > 0)
  179. {
  180. SendQueue.Enqueue(WindOutgoingPacketQueue.Dequeue());
  181. }
  182. if (CloudOutgoingPacketQueue.Count > 0)
  183. {
  184. SendQueue.Enqueue(CloudOutgoingPacketQueue.Dequeue());
  185. }
  186. if (TaskOutgoingPacketQueue.Count > 0)
  187. {
  188. SendQueue.Enqueue(TaskOutgoingPacketQueue.Dequeue());
  189. }
  190. if (TextureOutgoingPacketQueue.Count > 0)
  191. {
  192. SendQueue.Enqueue(TextureOutgoingPacketQueue.Dequeue());
  193. }
  194. if (AssetOutgoingPacketQueue.Count > 0)
  195. {
  196. SendQueue.Enqueue(AssetOutgoingPacketQueue.Dequeue());
  197. }
  198. }
  199. // m_log.Info("[THROTTLE]: Processed " + throttleLoops + " packets");
  200. }
  201. }
  202. public void Close()
  203. {
  204. Flush();
  205. m_enabled = false;
  206. throttleTimer.Stop();
  207. if (StatsManager.SimExtraStats != null)
  208. {
  209. StatsManager.SimExtraStats.DeregisterPacketQueueStatsProvider(m_agentId);
  210. }
  211. }
  212. private void ResetCounters()
  213. {
  214. ResendThrottle.Reset();
  215. LandThrottle.Reset();
  216. WindThrottle.Reset();
  217. CloudThrottle.Reset();
  218. TaskThrottle.Reset();
  219. AssetThrottle.Reset();
  220. TextureThrottle.Reset();
  221. TotalThrottle.Reset();
  222. }
  223. private bool PacketsWaiting()
  224. {
  225. return (ResendOutgoingPacketQueue.Count > 0 ||
  226. LandOutgoingPacketQueue.Count > 0 ||
  227. WindOutgoingPacketQueue.Count > 0 ||
  228. CloudOutgoingPacketQueue.Count > 0 ||
  229. TaskOutgoingPacketQueue.Count > 0 ||
  230. AssetOutgoingPacketQueue.Count > 0 ||
  231. TextureOutgoingPacketQueue.Count > 0);
  232. }
  233. public void ProcessThrottle()
  234. {
  235. // I was considering this.. Will an event fire if the thread it's on is blocked?
  236. // Then I figured out.. it doesn't really matter.. because this thread won't be blocked for long
  237. // The General overhead of the UDP protocol gets sent to the queue un-throttled by this
  238. // so This'll pick up about around the right time.
  239. int MaxThrottleLoops = 4550; // 50*7 packets can be dequeued at once.
  240. int throttleLoops = 0;
  241. // We're going to dequeue all of the saved up packets until
  242. // we've hit the throttle limit or there's no more packets to send
  243. lock (this)
  244. {
  245. ResetCounters();
  246. // m_log.Info("[THROTTLE]: Entering Throttle");
  247. while (TotalThrottle.UnderLimit() && PacketsWaiting() &&
  248. (throttleLoops <= MaxThrottleLoops))
  249. {
  250. throttleLoops++;
  251. //Now comes the fun part.. we dump all our elements into m_packetQueue that we've saved up.
  252. if (ResendThrottle.UnderLimit() && ResendOutgoingPacketQueue.Count > 0)
  253. {
  254. LLQueItem qpack = ResendOutgoingPacketQueue.Dequeue();
  255. SendQueue.Enqueue(qpack);
  256. TotalThrottle.Add(qpack.Packet.ToBytes().Length);
  257. ResendThrottle.Add(qpack.Packet.ToBytes().Length);
  258. }
  259. if (LandThrottle.UnderLimit() && LandOutgoingPacketQueue.Count > 0)
  260. {
  261. LLQueItem qpack = LandOutgoingPacketQueue.Dequeue();
  262. SendQueue.Enqueue(qpack);
  263. TotalThrottle.Add(qpack.Packet.ToBytes().Length);
  264. LandThrottle.Add(qpack.Packet.ToBytes().Length);
  265. }
  266. if (WindThrottle.UnderLimit() && WindOutgoingPacketQueue.Count > 0)
  267. {
  268. LLQueItem qpack = WindOutgoingPacketQueue.Dequeue();
  269. SendQueue.Enqueue(qpack);
  270. TotalThrottle.Add(qpack.Packet.ToBytes().Length);
  271. WindThrottle.Add(qpack.Packet.ToBytes().Length);
  272. }
  273. if (CloudThrottle.UnderLimit() && CloudOutgoingPacketQueue.Count > 0)
  274. {
  275. LLQueItem qpack = CloudOutgoingPacketQueue.Dequeue();
  276. SendQueue.Enqueue(qpack);
  277. TotalThrottle.Add(qpack.Packet.ToBytes().Length);
  278. CloudThrottle.Add(qpack.Packet.ToBytes().Length);
  279. }
  280. if (TaskThrottle.UnderLimit() && TaskOutgoingPacketQueue.Count > 0)
  281. {
  282. LLQueItem qpack = TaskOutgoingPacketQueue.Dequeue();
  283. SendQueue.Enqueue(qpack);
  284. TotalThrottle.Add(qpack.Packet.ToBytes().Length);
  285. TaskThrottle.Add(qpack.Packet.ToBytes().Length);
  286. }
  287. if (TextureThrottle.UnderLimit() && TextureOutgoingPacketQueue.Count > 0)
  288. {
  289. LLQueItem qpack = TextureOutgoingPacketQueue.Dequeue();
  290. SendQueue.Enqueue(qpack);
  291. TotalThrottle.Add(qpack.Packet.ToBytes().Length);
  292. TextureThrottle.Add(qpack.Packet.ToBytes().Length);
  293. }
  294. if (AssetThrottle.UnderLimit() && AssetOutgoingPacketQueue.Count > 0)
  295. {
  296. LLQueItem qpack = AssetOutgoingPacketQueue.Dequeue();
  297. SendQueue.Enqueue(qpack);
  298. TotalThrottle.Add(qpack.Packet.ToBytes().Length);
  299. AssetThrottle.Add(qpack.Packet.ToBytes().Length);
  300. }
  301. }
  302. // m_log.Info("[THROTTLE]: Processed " + throttleLoops + " packets");
  303. }
  304. }
  305. private void ThrottleTimerElapsed(object sender, ElapsedEventArgs e)
  306. {
  307. // just to change the signature, and that ProcessThrottle
  308. // will be used elsewhere possibly
  309. ProcessThrottle();
  310. }
  311. private void ThrottleCheck(ref LLPacketThrottle throttle, ref Queue<LLQueItem> q, LLQueItem item)
  312. {
  313. // The idea.. is if the packet throttle queues are empty
  314. // and the client is under throttle for the type. Queue
  315. // it up directly. This basically short cuts having to
  316. // wait for the timer to fire to put things into the
  317. // output queue
  318. if ((q.Count == 0) && (throttle.UnderLimit()))
  319. {
  320. Monitor.Enter(this);
  321. throttle.Add(item.Packet.ToBytes().Length);
  322. TotalThrottle.Add(item.Packet.ToBytes().Length);
  323. SendQueue.Enqueue(item);
  324. Monitor.Pulse(this);
  325. Monitor.Exit(this);
  326. }
  327. else
  328. {
  329. q.Enqueue(item);
  330. }
  331. }
  332. private static int ScaleThrottle(int value, int curmax, int newmax)
  333. {
  334. return (value / curmax) * newmax;
  335. }
  336. public byte[] GetThrottlesPacked(float multiplier)
  337. {
  338. int singlefloat = 4;
  339. float tResend = ResendThrottle.Throttle*multiplier;
  340. float tLand = LandThrottle.Throttle*multiplier;
  341. float tWind = WindThrottle.Throttle*multiplier;
  342. float tCloud = CloudThrottle.Throttle*multiplier;
  343. float tTask = TaskThrottle.Throttle*multiplier;
  344. float tTexture = TextureThrottle.Throttle*multiplier;
  345. float tAsset = AssetThrottle.Throttle*multiplier;
  346. byte[] throttles = new byte[singlefloat*7];
  347. int i = 0;
  348. Buffer.BlockCopy(BitConverter.GetBytes(tResend), 0, throttles, singlefloat*i, singlefloat);
  349. i++;
  350. Buffer.BlockCopy(BitConverter.GetBytes(tLand), 0, throttles, singlefloat*i, singlefloat);
  351. i++;
  352. Buffer.BlockCopy(BitConverter.GetBytes(tWind), 0, throttles, singlefloat*i, singlefloat);
  353. i++;
  354. Buffer.BlockCopy(BitConverter.GetBytes(tCloud), 0, throttles, singlefloat*i, singlefloat);
  355. i++;
  356. Buffer.BlockCopy(BitConverter.GetBytes(tTask), 0, throttles, singlefloat*i, singlefloat);
  357. i++;
  358. Buffer.BlockCopy(BitConverter.GetBytes(tTexture), 0, throttles, singlefloat*i, singlefloat);
  359. i++;
  360. Buffer.BlockCopy(BitConverter.GetBytes(tAsset), 0, throttles, singlefloat*i, singlefloat);
  361. return throttles;
  362. }
  363. public void SetThrottleFromClient(byte[] throttle)
  364. {
  365. // From mantis http://opensimulator.org/mantis/view.php?id=1374
  366. // it appears that sometimes we are receiving empty throttle byte arrays.
  367. // TODO: Investigate this behaviour
  368. if (throttle.Length == 0)
  369. {
  370. m_log.Warn("[PACKET QUEUE]: SetThrottleFromClient unexpectedly received a throttle byte array containing no elements!");
  371. return;
  372. }
  373. int tResend = -1;
  374. int tLand = -1;
  375. int tWind = -1;
  376. int tCloud = -1;
  377. int tTask = -1;
  378. int tTexture = -1;
  379. int tAsset = -1;
  380. int tall = -1;
  381. int singlefloat = 4;
  382. //Agent Throttle Block contains 7 single floatingpoint values.
  383. int j = 0;
  384. // Some Systems may be big endian...
  385. // it might be smart to do this check more often...
  386. if (!BitConverter.IsLittleEndian)
  387. for (int i = 0; i < 7; i++)
  388. Array.Reverse(throttle, j + i*singlefloat, singlefloat);
  389. // values gotten from libsecondlife.org/wiki/Throttle. Thanks MW_
  390. // bytes
  391. // Convert to integer, since.. the full fp space isn't used.
  392. tResend = (int) BitConverter.ToSingle(throttle, j);
  393. j += singlefloat;
  394. tLand = (int) BitConverter.ToSingle(throttle, j);
  395. j += singlefloat;
  396. tWind = (int) BitConverter.ToSingle(throttle, j);
  397. j += singlefloat;
  398. tCloud = (int) BitConverter.ToSingle(throttle, j);
  399. j += singlefloat;
  400. tTask = (int) BitConverter.ToSingle(throttle, j);
  401. j += singlefloat;
  402. tTexture = (int) BitConverter.ToSingle(throttle, j);
  403. j += singlefloat;
  404. tAsset = (int) BitConverter.ToSingle(throttle, j);
  405. tall = tResend + tLand + tWind + tCloud + tTask + tTexture + tAsset;
  406. /*
  407. m_log.Info("[CLIENT]: Client AgentThrottle - Got throttle:resendbytes=" + tResend +
  408. " landbytes=" + tLand +
  409. " windbytes=" + tWind +
  410. " cloudbytes=" + tCloud +
  411. " taskbytes=" + tTask +
  412. " texturebytes=" + tTexture +
  413. " Assetbytes=" + tAsset +
  414. " Allbytes=" + tall);
  415. */
  416. // Total Sanity
  417. // Make sure that the client sent sane total values.
  418. // If the client didn't send acceptable values....
  419. // Scale the clients values down until they are acceptable.
  420. if (tall <= TotalThrottle.Max)
  421. {
  422. ResendThrottle.Throttle = tResend;
  423. LandThrottle.Throttle = tLand;
  424. WindThrottle.Throttle = tWind;
  425. CloudThrottle.Throttle = tCloud;
  426. TaskThrottle.Throttle = tTask;
  427. TextureThrottle.Throttle = tTexture;
  428. AssetThrottle.Throttle = tAsset;
  429. TotalThrottle.Throttle = tall;
  430. }
  431. else if (tall < 1)
  432. {
  433. // client is stupid, penalize him by minning everything
  434. ResendThrottle.Throttle = ResendThrottle.Min;
  435. LandThrottle.Throttle = LandThrottle.Min;
  436. WindThrottle.Throttle = WindThrottle.Min;
  437. CloudThrottle.Throttle = CloudThrottle.Min;
  438. TaskThrottle.Throttle = TaskThrottle.Min;
  439. TextureThrottle.Throttle = TextureThrottle.Min;
  440. AssetThrottle.Throttle = AssetThrottle.Min;
  441. TotalThrottle.Throttle = TotalThrottle.Min;
  442. }
  443. else
  444. {
  445. // we're over so figure out percentages and use those
  446. ResendThrottle.Throttle = tResend;
  447. LandThrottle.Throttle = ScaleThrottle(tLand, tall, TotalThrottle.Max);
  448. WindThrottle.Throttle = ScaleThrottle(tWind, tall, TotalThrottle.Max);
  449. CloudThrottle.Throttle = ScaleThrottle(tCloud, tall, TotalThrottle.Max);
  450. TaskThrottle.Throttle = ScaleThrottle(tTask, tall, TotalThrottle.Max);
  451. TextureThrottle.Throttle = ScaleThrottle(tTexture, tall, TotalThrottle.Max);
  452. AssetThrottle.Throttle = ScaleThrottle(tAsset, tall, TotalThrottle.Max);
  453. TotalThrottle.Throttle = TotalThrottle.Max;
  454. }
  455. // effectively wiggling the slider causes things reset
  456. ResetCounters();
  457. }
  458. // See IPullStatsProvider
  459. public string GetStats()
  460. {
  461. return string.Format("{0,7} {1,7} {2,7} {3,7} {4,7} {5,7} {6,7} {7,7} {8,7} {9,7}",
  462. SendQueue.Count(),
  463. IncomingPacketQueue.Count,
  464. OutgoingPacketQueue.Count,
  465. ResendOutgoingPacketQueue.Count,
  466. LandOutgoingPacketQueue.Count,
  467. WindOutgoingPacketQueue.Count,
  468. CloudOutgoingPacketQueue.Count,
  469. TaskOutgoingPacketQueue.Count,
  470. TextureOutgoingPacketQueue.Count,
  471. AssetOutgoingPacketQueue.Count);
  472. }
  473. public LLQueItem[] GetQueueArray()
  474. {
  475. return SendQueue.GetQueueArray();
  476. }
  477. }
  478. }