LLUDPClient.cs 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874
  1. /*
  2. * Copyright (c) Contributors, http://opensimulator.org/
  3. * See CONTRIBUTORS.TXT for a full list of copyright holders.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the OpenSimulator Project nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
  17. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. using System;
  28. using System.Collections.Generic;
  29. using System.Net;
  30. using System.Threading;
  31. using log4net;
  32. using OpenSim.Framework;
  33. using OpenSim.Framework.Monitoring;
  34. using OpenMetaverse;
  35. using OpenMetaverse.Packets;
  36. using TokenBucket = OpenSim.Region.ClientStack.LindenUDP.TokenBucket;
  37. namespace OpenSim.Region.ClientStack.LindenUDP
  38. {
  39. #region Delegates
  40. /// <summary>
  41. /// Fired when updated networking stats are produced for this client
  42. /// </summary>
  43. /// <param name="inPackets">Number of incoming packets received since this
  44. /// event was last fired</param>
  45. /// <param name="outPackets">Number of outgoing packets sent since this
  46. /// event was last fired</param>
  47. /// <param name="unAckedBytes">Current total number of bytes in packets we
  48. /// are waiting on ACKs for</param>
  49. public delegate void PacketStats(int inPackets, int outPackets, int unAckedBytes);
  50. /// <summary>
  51. /// Fired when the queue for one or more packet categories is empty. This
  52. /// event can be hooked to put more data on the empty queues
  53. /// </summary>
  54. /// <param name="category">Categories of the packet queues that are empty</param>
  55. public delegate void QueueEmpty(ThrottleOutPacketTypeFlags categories);
  56. #endregion Delegates
  57. /// <summary>
  58. /// Tracks state for a client UDP connection and provides client-specific methods
  59. /// </summary>
  60. public sealed class LLUDPClient
  61. {
  62. private static readonly ILog m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
  63. /// <summary>The number of packet categories to throttle on. If a throttle category is added
  64. /// or removed, this number must also change</summary>
  65. const int THROTTLE_CATEGORY_COUNT = 8;
  66. /// <summary>
  67. /// Controls whether information is logged about each outbound packet immediately before it is sent. For debug purposes.
  68. /// </summary>
  69. /// <remarks>Any level above 0 will turn on logging.</remarks>
  70. public int DebugDataOutLevel { get; set; }
  71. /// <summary>
  72. /// Controls whether information is logged about each outbound packet immediately before it is sent. For debug purposes.
  73. /// </summary>
  74. /// <remarks>Any level above 0 will turn on logging.</remarks>
  75. public int ThrottleDebugLevel
  76. {
  77. get
  78. {
  79. return m_throttleDebugLevel;
  80. }
  81. set
  82. {
  83. m_throttleDebugLevel = value;
  84. /*
  85. m_throttleClient.DebugLevel = m_throttleDebugLevel;
  86. foreach (TokenBucket tb in m_throttleCategories)
  87. tb.DebugLevel = m_throttleDebugLevel;
  88. */
  89. }
  90. }
  91. private int m_throttleDebugLevel;
  92. /// <summary>Fired when updated networking stats are produced for this client</summary>
  93. public event PacketStats OnPacketStats;
  94. /// <summary>Fired when the queue for a packet category is empty. This event can be
  95. /// hooked to put more data on the empty queue</summary>
  96. public event QueueEmpty OnQueueEmpty;
  97. public event Func<ThrottleOutPacketTypeFlags, bool> HasUpdates;
  98. /// <summary>AgentID for this client</summary>
  99. public readonly UUID AgentID;
  100. /// <summary>The remote address of the connected client</summary>
  101. public readonly IPEndPoint RemoteEndPoint;
  102. /// <summary>Circuit code that this client is connected on</summary>
  103. public readonly uint CircuitCode;
  104. /// <summary>Sequence numbers of packets we've received (for duplicate checking)</summary>
  105. public IncomingPacketHistoryCollection PacketArchive = new IncomingPacketHistoryCollection(1024);
  106. /// <summary>Packets we have sent that need to be ACKed by the client</summary>
  107. public UnackedPacketCollection NeedAcks = new UnackedPacketCollection();
  108. /// <summary>ACKs that are queued up, waiting to be sent to the client</summary>
  109. public DoubleLocklessQueue<uint> PendingAcks = new DoubleLocklessQueue<uint>();
  110. public int AckStalls;
  111. /// <summary>Current packet sequence number</summary>
  112. public int CurrentSequence;
  113. /// <summary>Current ping sequence number</summary>
  114. public byte CurrentPingSequence;
  115. /// <summary>True when this connection is alive, otherwise false</summary>
  116. public bool IsConnected = true;
  117. /// <summary>True when this connection is paused, otherwise false</summary>
  118. public bool IsPaused;
  119. /// <summary>Environment.TickCount when the last packet was received for this client</summary>
  120. public int TickLastPacketReceived;
  121. /// <summary>Smoothed round-trip time. A smoothed average of the round-trip time for sending a
  122. /// reliable packet to the client and receiving an ACK</summary>
  123. public float SRTT;
  124. /// <summary>Round-trip time variance. Measures the consistency of round-trip times</summary>
  125. public float RTTVAR;
  126. /// <summary>Retransmission timeout. Packets that have not been acknowledged in this number of
  127. /// milliseconds or longer will be resent</summary>
  128. /// <remarks>Calculated from <seealso cref="SRTT"/> and <seealso cref="RTTVAR"/> using the
  129. /// guidelines in RFC 2988</remarks>
  130. public int m_RTO;
  131. /// <summary>Number of bytes received since the last acknowledgement was sent out. This is used
  132. /// to loosely follow the TCP delayed ACK algorithm in RFC 1122 (4.2.3.2)</summary>
  133. public int BytesSinceLastACK;
  134. /// <summary>Number of packets received from this client</summary>
  135. public int PacketsReceived;
  136. /// <summary>Number of packets sent to this client</summary>
  137. public int PacketsSent;
  138. /// <summary>Number of packets resent to this client</summary>
  139. public int PacketsResent;
  140. /// <summary>Total byte count of unacked packets sent to this client</summary>
  141. public int UnackedBytes;
  142. private int m_packetsUnAckReported;
  143. /// <summary>Total number of received packets that we have reported to the OnPacketStats event(s)</summary>
  144. private int m_packetsReceivedReported;
  145. /// <summary>Total number of sent packets that we have reported to the OnPacketStats event(s)</summary>
  146. private int m_packetsSentReported;
  147. /// <summary>Holds the Environment.TickCount value of when the next OnQueueEmpty can be fired</summary>
  148. private double m_nextOnQueueEmpty = 0;
  149. /// <summary>Throttle bucket for this agent's connection</summary>
  150. private AdaptiveTokenBucket m_throttleClient;
  151. public AdaptiveTokenBucket FlowThrottle
  152. {
  153. get { return m_throttleClient; }
  154. }
  155. /// <summary>Throttle buckets for each packet category</summary>
  156. private readonly TokenBucket[] m_throttleCategories;
  157. /// <summary>Outgoing queues for throttled packets</summary>
  158. private DoubleLocklessQueue<OutgoingPacket>[] m_packetOutboxes = new DoubleLocklessQueue<OutgoingPacket>[THROTTLE_CATEGORY_COUNT];
  159. /// <summary>A container that can hold one packet for each outbox, used to store
  160. /// dequeued packets that are being held for throttling</summary>
  161. private OutgoingPacket[] m_nextPackets = new OutgoingPacket[THROTTLE_CATEGORY_COUNT];
  162. /// <summary>A reference to the LLUDPServer that is managing this client</summary>
  163. private readonly LLUDPServer m_udpServer;
  164. /// <summary>Caches packed throttle information</summary>
  165. private byte[] m_packedThrottles;
  166. private int m_defaultRTO = 1000; // 1sec is the recommendation in the RFC
  167. private int m_maxRTO = 3000;
  168. private int m_minRTO = 250;
  169. private float m_burstTime;
  170. private int m_maxRate;
  171. public double m_lastStartpingTimeMS;
  172. public int m_pingMS;
  173. public int PingTimeMS
  174. {
  175. get
  176. {
  177. if (m_pingMS < 10)
  178. return 10;
  179. if(m_pingMS > 2000)
  180. return 2000;
  181. return m_pingMS;
  182. }
  183. }
  184. private ClientInfo m_info = new ClientInfo();
  185. /// <summary>
  186. /// Default constructor
  187. /// </summary>
  188. /// <param name="server">Reference to the UDP server this client is connected to</param>
  189. /// <param name="rates">Default throttling rates and maximum throttle limits</param>
  190. /// <param name="parentThrottle">Parent HTB (hierarchical token bucket)
  191. /// that the child throttles will be governed by</param>
  192. /// <param name="circuitCode">Circuit code for this connection</param>
  193. /// <param name="agentID">AgentID for the connected agent</param>
  194. /// <param name="remoteEndPoint">Remote endpoint for this connection</param>
  195. /// <param name="defaultRTO">
  196. /// Default retransmission timeout for unacked packets. The RTO will never drop
  197. /// beyond this number.
  198. /// </param>
  199. /// <param name="maxRTO">
  200. /// The maximum retransmission timeout for unacked packets. The RTO will never exceed this number.
  201. /// </param>
  202. public LLUDPClient(
  203. LLUDPServer server, ThrottleRates rates, TokenBucket parentThrottle, uint circuitCode, UUID agentID,
  204. IPEndPoint remoteEndPoint, int defaultRTO, int maxRTO)
  205. {
  206. AgentID = agentID;
  207. RemoteEndPoint = remoteEndPoint;
  208. CircuitCode = circuitCode;
  209. m_udpServer = server;
  210. if (defaultRTO != 0)
  211. m_defaultRTO = defaultRTO;
  212. if (maxRTO != 0)
  213. m_maxRTO = maxRTO;
  214. m_burstTime = rates.BurstTime;
  215. m_maxRate = rates.ClientMaxRate;
  216. // Create a token bucket throttle for this client that has the scene token bucket as a parent
  217. m_throttleClient = new AdaptiveTokenBucket(parentThrottle, m_maxRate, m_maxRate * m_burstTime, rates.AdaptiveThrottlesEnabled);
  218. // Create an array of token buckets for this clients different throttle categories
  219. m_throttleCategories = new TokenBucket[THROTTLE_CATEGORY_COUNT];
  220. for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++)
  221. {
  222. ThrottleOutPacketType type = (ThrottleOutPacketType)i;
  223. // Initialize the packet outboxes, where packets sit while they are waiting for tokens
  224. m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
  225. // Initialize the token buckets that control the throttling for each category
  226. float rate = rates.GetRate(type);
  227. float burst = rate * m_burstTime;
  228. m_throttleCategories[i] = new TokenBucket(m_throttleClient, rate , burst);
  229. }
  230. m_RTO = m_defaultRTO;
  231. // Initialize this to a sane value to prevent early disconnects
  232. TickLastPacketReceived = Environment.TickCount & Int32.MaxValue;
  233. m_pingMS = 20; // so filter doesnt start at 0;
  234. }
  235. /// <summary>
  236. /// Shuts down this client connection
  237. /// </summary>
  238. public void Shutdown()
  239. {
  240. IsConnected = false;
  241. for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++)
  242. {
  243. m_packetOutboxes[i].Clear();
  244. m_nextPackets[i] = null;
  245. }
  246. // pull the throttle out of the scene throttle
  247. m_throttleClient.Parent.UnregisterRequest(m_throttleClient);
  248. PendingAcks.Clear();
  249. NeedAcks.Clear();
  250. }
  251. /// <summary>
  252. /// Gets information about this client connection
  253. /// </summary>
  254. /// <returns>Information about the client connection</returns>
  255. public ClientInfo GetClientInfo()
  256. {
  257. // TODO: This data structure is wrong in so many ways. Locking and copying the entire lists
  258. // of pending and needed ACKs for every client every time some method wants information about
  259. // this connection is a recipe for poor performance
  260. m_info.resendThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Resend].DripRate;
  261. m_info.landThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Land].DripRate;
  262. m_info.windThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Wind].DripRate;
  263. m_info.cloudThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Cloud].DripRate;
  264. m_info.taskThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Task].DripRate;
  265. m_info.assetThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Asset].DripRate;
  266. m_info.textureThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Texture].DripRate;
  267. m_info.totalThrottle = (int)m_throttleClient.DripRate;
  268. return m_info;
  269. }
  270. /// <summary>
  271. /// Modifies the UDP throttles
  272. /// </summary>
  273. /// <param name="info">New throttling values</param>
  274. public void SetClientInfo(ClientInfo info)
  275. {
  276. // TODO: Allowing throttles to be manually set from this function seems like a reasonable
  277. // idea. On the other hand, letting external code manipulate our ACK accounting is not
  278. // going to happen
  279. throw new NotImplementedException();
  280. }
  281. /// <summary>
  282. /// Get the total number of pakcets queued for this client.
  283. /// </summary>
  284. /// <returns></returns>
  285. public int GetTotalPacketsQueuedCount()
  286. {
  287. int total = 0;
  288. for (int i = 0; i <= (int)ThrottleOutPacketType.Asset; i++)
  289. total += m_packetOutboxes[i].Count;
  290. return total;
  291. }
  292. /// <summary>
  293. /// Get the number of packets queued for the given throttle type.
  294. /// </summary>
  295. /// <returns></returns>
  296. /// <param name="throttleType"></param>
  297. public int GetPacketsQueuedCount(ThrottleOutPacketType throttleType)
  298. {
  299. int icat = (int)throttleType;
  300. if (icat > 0 && icat < THROTTLE_CATEGORY_COUNT)
  301. return m_packetOutboxes[icat].Count;
  302. else
  303. return 0;
  304. }
  305. /// <summary>
  306. /// Return statistics information about client packet queues.
  307. /// </summary>
  308. /// <remarks>
  309. /// FIXME: This should really be done in a more sensible manner rather than sending back a formatted string.
  310. /// </remarks>
  311. /// <returns></returns>
  312. public string GetStats()
  313. {
  314. return string.Format(
  315. "{0,7} {1,7} {2,7} {3,9} {4,7} {5,7} {6,7} {7,7} {8,7} {9,8} {10,7} {11,7}",
  316. Util.EnvironmentTickCountSubtract(TickLastPacketReceived),
  317. PacketsReceived,
  318. PacketsSent,
  319. PacketsResent,
  320. UnackedBytes,
  321. m_packetOutboxes[(int)ThrottleOutPacketType.Resend].Count,
  322. m_packetOutboxes[(int)ThrottleOutPacketType.Land].Count,
  323. m_packetOutboxes[(int)ThrottleOutPacketType.Wind].Count,
  324. m_packetOutboxes[(int)ThrottleOutPacketType.Cloud].Count,
  325. m_packetOutboxes[(int)ThrottleOutPacketType.Task].Count,
  326. m_packetOutboxes[(int)ThrottleOutPacketType.Texture].Count,
  327. m_packetOutboxes[(int)ThrottleOutPacketType.Asset].Count);
  328. }
  329. public void SendPacketStats()
  330. {
  331. PacketStats callback = OnPacketStats;
  332. if (callback != null)
  333. {
  334. int newPacketsReceived = PacketsReceived - m_packetsReceivedReported;
  335. int newPacketsSent = PacketsSent - m_packetsSentReported;
  336. int newPacketUnAck = UnackedBytes - m_packetsUnAckReported;
  337. callback(newPacketsReceived, newPacketsSent, UnackedBytes);
  338. m_packetsReceivedReported += newPacketsReceived;
  339. m_packetsSentReported += newPacketsSent;
  340. m_packetsUnAckReported += newPacketUnAck;
  341. }
  342. }
  343. public void SetThrottles(byte[] throttleData)
  344. {
  345. SetThrottles(throttleData, 1.0f);
  346. }
  347. public void SetThrottles(byte[] throttleData, float factor)
  348. {
  349. byte[] adjData;
  350. int pos = 0;
  351. if (!BitConverter.IsLittleEndian)
  352. {
  353. byte[] newData = new byte[7 * 4];
  354. Buffer.BlockCopy(throttleData, 0, newData, 0, 7 * 4);
  355. for (int i = 0; i < 7; i++)
  356. Array.Reverse(newData, i * 4, 4);
  357. adjData = newData;
  358. }
  359. else
  360. {
  361. adjData = throttleData;
  362. }
  363. // 0.125f converts from bits to bytes
  364. float scale = 0.125f * factor;
  365. int resend = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
  366. int land = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
  367. int wind = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
  368. int cloud = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
  369. int task = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
  370. int texture = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
  371. int asset = (int)(BitConverter.ToSingle(adjData, pos) * scale);
  372. int total = resend + land + wind + cloud + task + texture + asset;
  373. if(total > m_maxRate)
  374. {
  375. scale = (float)total / m_maxRate;
  376. resend = (int)(resend * scale);
  377. land = (int)(land * scale);
  378. wind = (int)(wind * scale);
  379. cloud = (int)(cloud * scale);
  380. task = (int)(task * scale);
  381. texture = (int)(texture * scale);
  382. asset = (int)(texture * scale);
  383. int ntotal = resend + land + wind + cloud + task + texture + asset;
  384. m_log.DebugFormat("[LLUDPCLIENT]: limiting {0} bandwith from {1} to {2}",AgentID, ntotal, total);
  385. total = ntotal;
  386. }
  387. if (ThrottleDebugLevel > 0)
  388. {
  389. m_log.DebugFormat(
  390. "[LLUDPCLIENT]: {0} is setting throttles in {1} to Resend={2}, Land={3}, Wind={4}, Cloud={5}, Task={6}, Texture={7}, Asset={8}, TOTAL = {9}",
  391. AgentID, m_udpServer.Scene.Name, resend, land, wind, cloud, task, texture, asset, total);
  392. }
  393. TokenBucket bucket;
  394. bucket = m_throttleCategories[(int)ThrottleOutPacketType.Resend];
  395. bucket.RequestedDripRate = resend;
  396. bucket.RequestedBurst = resend * m_burstTime;
  397. bucket = m_throttleCategories[(int)ThrottleOutPacketType.Land];
  398. bucket.RequestedDripRate = land;
  399. bucket.RequestedBurst = land * m_burstTime;
  400. bucket = m_throttleCategories[(int)ThrottleOutPacketType.Wind];
  401. bucket.RequestedDripRate = wind;
  402. bucket.RequestedBurst = wind * m_burstTime;
  403. bucket = m_throttleCategories[(int)ThrottleOutPacketType.Cloud];
  404. bucket.RequestedDripRate = cloud;
  405. bucket.RequestedBurst = cloud * m_burstTime;
  406. bucket = m_throttleCategories[(int)ThrottleOutPacketType.Asset];
  407. bucket.RequestedDripRate = asset;
  408. bucket.RequestedBurst = asset * m_burstTime;
  409. bucket = m_throttleCategories[(int)ThrottleOutPacketType.Task];
  410. bucket.RequestedDripRate = task;
  411. bucket.RequestedBurst = task * m_burstTime;
  412. bucket = m_throttleCategories[(int)ThrottleOutPacketType.Texture];
  413. bucket.RequestedDripRate = texture;
  414. bucket.RequestedBurst = texture * m_burstTime;
  415. m_packedThrottles = null;
  416. }
  417. public byte[] GetThrottlesPacked(float multiplier)
  418. {
  419. byte[] data = m_packedThrottles;
  420. if (data == null)
  421. {
  422. float rate;
  423. data = new byte[7 * 4];
  424. int i = 0;
  425. // multiply by 8 to convert bytes back to bits
  426. multiplier *= 8;
  427. rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Resend].RequestedDripRate * multiplier;
  428. Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
  429. rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Land].RequestedDripRate * multiplier;
  430. Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
  431. rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Wind].RequestedDripRate * multiplier;
  432. Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
  433. rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Cloud].RequestedDripRate * multiplier;
  434. Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
  435. rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Task].RequestedDripRate * multiplier;
  436. Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
  437. rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Texture].RequestedDripRate * multiplier;
  438. Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
  439. rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Asset].RequestedDripRate * multiplier;
  440. Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
  441. m_packedThrottles = data;
  442. }
  443. return data;
  444. }
  445. public int GetCatBytesCanSend(ThrottleOutPacketType cat, int timeMS)
  446. {
  447. int icat = (int)cat;
  448. if (icat > 0 && icat < THROTTLE_CATEGORY_COUNT)
  449. {
  450. TokenBucket bucket = m_throttleCategories[icat];
  451. return bucket.GetCatBytesCanSend(timeMS);
  452. }
  453. else
  454. return 0;
  455. }
  456. /// <summary>
  457. /// Queue an outgoing packet if appropriate.
  458. /// </summary>
  459. /// <param name="packet"></param>
  460. /// <param name="forceQueue">Always queue the packet if at all possible.</param>
  461. /// <returns>
  462. /// true if the packet has been queued,
  463. /// false if the packet has not been queued and should be sent immediately.
  464. /// </returns>
  465. public bool EnqueueOutgoing(OutgoingPacket packet)
  466. {
  467. int category = (int)packet.Category;
  468. if (category >= 0 && category < m_packetOutboxes.Length)
  469. {
  470. DoubleLocklessQueue<OutgoingPacket> queue = m_packetOutboxes[category];
  471. queue.Enqueue(packet, false);
  472. return true;
  473. }
  474. else
  475. {
  476. // We don't have a token bucket for this category, so it will not be queued
  477. return false;
  478. }
  479. }
  480. /// <summary>
  481. /// Loops through all of the packet queues for this client and tries to send
  482. /// an outgoing packet from each, obeying the throttling bucket limits
  483. /// </summary>
  484. ///
  485. /// <remarks>
  486. /// Packet queues are inspected in ascending numerical order starting from 0. Therefore, queues with a lower
  487. /// ThrottleOutPacketType number will see their packet get sent first (e.g. if both Land and Wind queues have
  488. /// packets, then the packet at the front of the Land queue will be sent before the packet at the front of the
  489. /// wind queue).
  490. ///
  491. /// This function is only called from a synchronous loop in the
  492. /// UDPServer so we don't need to bother making this thread safe
  493. /// </remarks>
  494. ///
  495. /// <returns>True if any packets were sent, otherwise false</returns>
  496. public bool DequeueOutgoing()
  497. {
  498. // if (m_deliverPackets == false) return false;
  499. OutgoingPacket packet = null;
  500. DoubleLocklessQueue<OutgoingPacket> queue;
  501. bool packetSent = false;
  502. ThrottleOutPacketTypeFlags emptyCategories = 0;
  503. //string queueDebugOutput = String.Empty; // Serious debug business
  504. // do resends
  505. packet = m_nextPackets[0];
  506. if (packet != null)
  507. {
  508. if (packet.Buffer != null)
  509. {
  510. if (m_throttleCategories[0].RemoveTokens(packet.Buffer.DataLength))
  511. {
  512. // Send the packet
  513. m_udpServer.SendPacketFinal(packet);
  514. packetSent = true;
  515. m_nextPackets[0] = null;
  516. }
  517. }
  518. else
  519. m_nextPackets[0] = null;
  520. }
  521. else
  522. {
  523. queue = m_packetOutboxes[0];
  524. if (queue != null)
  525. {
  526. if(queue.Dequeue(out packet))
  527. {
  528. // A packet was pulled off the queue. See if we have
  529. // enough tokens in the bucket to send it out
  530. if (packet.Buffer != null)
  531. {
  532. if (m_throttleCategories[0].RemoveTokens(packet.Buffer.DataLength))
  533. {
  534. // Send the packet
  535. m_udpServer.SendPacketFinal(packet);
  536. packetSent = true;
  537. }
  538. else
  539. {
  540. // Save the dequeued packet for the next iteration
  541. m_nextPackets[0] = packet;
  542. }
  543. }
  544. }
  545. }
  546. else
  547. {
  548. m_packetOutboxes[0] = new DoubleLocklessQueue<OutgoingPacket>();
  549. }
  550. }
  551. if(NeedAcks.Count() > 50)
  552. {
  553. Interlocked.Increment(ref AckStalls);
  554. return true;
  555. }
  556. for (int i = 1; i < THROTTLE_CATEGORY_COUNT; i++)
  557. {
  558. //queueDebugOutput += m_packetOutboxes[i].Count + " "; // Serious debug business
  559. packet = m_nextPackets[i];
  560. if (packet != null)
  561. {
  562. if(packet.Buffer == null)
  563. {
  564. if (m_packetOutboxes[i].Count < 5)
  565. emptyCategories |= CategoryToFlag(i);
  566. m_nextPackets[i] = null;
  567. continue;
  568. }
  569. if (m_throttleCategories[i].RemoveTokens(packet.Buffer.DataLength))
  570. {
  571. // Send the packet
  572. m_udpServer.SendPacketFinal(packet);
  573. m_nextPackets[i] = null;
  574. packetSent = true;
  575. if (m_packetOutboxes[i].Count < 5)
  576. emptyCategories |= CategoryToFlag(i);
  577. }
  578. }
  579. else
  580. {
  581. // No dequeued packet waiting to be sent, try to pull one off
  582. // this queue
  583. queue = m_packetOutboxes[i];
  584. if(queue.Dequeue(out packet))
  585. {
  586. if (packet.Buffer == null)
  587. {
  588. // packet canceled elsewhere (by a ack for example)
  589. if (queue.Count < 5)
  590. emptyCategories |= CategoryToFlag(i);
  591. continue;
  592. }
  593. if (m_throttleCategories[i].RemoveTokens(packet.Buffer.DataLength))
  594. {
  595. // Send the packet
  596. m_udpServer.SendPacketFinal(packet);
  597. packetSent = true;
  598. if (queue.Count < 5)
  599. emptyCategories |= CategoryToFlag(i);
  600. }
  601. else
  602. {
  603. // Save the dequeued packet for the next iteration
  604. m_nextPackets[i] = packet;
  605. }
  606. }
  607. else
  608. {
  609. // No packets in this queue. Fire the queue empty callback
  610. // if it has not been called recently
  611. emptyCategories |= CategoryToFlag(i);
  612. }
  613. }
  614. }
  615. if (emptyCategories != 0)
  616. BeginFireQueueEmpty(emptyCategories);
  617. //m_log.Info("[LLUDPCLIENT]: Queues: " + queueDebugOutput); // Serious debug business
  618. return packetSent;
  619. }
  620. /// <summary>
  621. /// Called when we get a ping update
  622. /// </summary>
  623. /// <param name="r"> ping time in ms
  624. /// acknowledgement</param>
  625. public void UpdateRoundTrip(int p)
  626. {
  627. p *= 5;
  628. if( p> m_maxRTO)
  629. p = m_maxRTO;
  630. else if(p < m_minRTO)
  631. p = m_minRTO;
  632. m_RTO = p;
  633. }
  634. const double MIN_CALLBACK_MS = 20.0;
  635. public bool QueueEmptyRunning;
  636. /// <summary>
  637. /// Does an early check to see if this queue empty callback is already
  638. /// running, then asynchronously firing the event
  639. /// </summary>
  640. /// <param name="categories">Throttle categories to fire the callback for</param>
  641. private void BeginFireQueueEmpty(ThrottleOutPacketTypeFlags categories)
  642. {
  643. if (!QueueEmptyRunning && HasUpdates(categories) && OnQueueEmpty != null)
  644. {
  645. double start = Util.GetTimeStampMS();
  646. if (start < m_nextOnQueueEmpty)
  647. return;
  648. QueueEmptyRunning = true;
  649. m_nextOnQueueEmpty = start + MIN_CALLBACK_MS;
  650. // Asynchronously run the callback
  651. if (m_udpServer.OqrEngine.IsRunning)
  652. {
  653. LLUDPClient udpcli = this;
  654. ThrottleOutPacketTypeFlags cats = categories;
  655. Action<LLUDPClient, ThrottleOutPacketTypeFlags> act = delegate
  656. {
  657. QueueEmpty callback = udpcli.OnQueueEmpty;
  658. if (callback != null)
  659. {
  660. try
  661. {
  662. callback(cats);
  663. }
  664. catch { }
  665. if (callback != null)
  666. udpcli.QueueEmptyRunning = false;
  667. }
  668. udpcli = null;
  669. callback = null;
  670. };
  671. m_udpServer.OqrEngine.QueueJob(AgentID.ToString(), () => act(udpcli, cats));
  672. }
  673. else
  674. Util.FireAndForget(FireQueueEmpty, categories, "LLUDPClient.BeginFireQueueEmpty");
  675. }
  676. }
  677. /// <summary>
  678. /// Fires the OnQueueEmpty callback and sets the minimum time that it
  679. /// can be called again
  680. /// </summary>
  681. /// <param name="o">Throttle categories to fire the callback for,
  682. /// stored as an object to match the WaitCallback delegate
  683. /// signature</param>
  684. public void FireQueueEmpty(object o)
  685. {
  686. QueueEmpty callback = OnQueueEmpty;
  687. if (callback != null)
  688. {
  689. ThrottleOutPacketTypeFlags categories = (ThrottleOutPacketTypeFlags)o;
  690. try { callback(categories); }
  691. catch (Exception e) { m_log.Error("[LLUDPCLIENT]: OnQueueEmpty(" + categories + ") threw an exception: " + e.Message, e); }
  692. }
  693. QueueEmptyRunning = false;
  694. }
  695. internal void ForceThrottleSetting(int throttle, int setting)
  696. {
  697. if (throttle > 0 && throttle < THROTTLE_CATEGORY_COUNT)
  698. m_throttleCategories[throttle].RequestedDripRate = Math.Max(setting, LLUDPServer.MTU);
  699. }
  700. internal int GetThrottleSetting(int throttle)
  701. {
  702. if (throttle > 0 && throttle < THROTTLE_CATEGORY_COUNT)
  703. return (int)m_throttleCategories[throttle].RequestedDripRate;
  704. else
  705. return 0;
  706. }
  707. public void FreeUDPBuffer(UDPPacketBuffer buf)
  708. {
  709. m_udpServer.FreeUDPBuffer(buf);
  710. }
  711. /// <summary>
  712. /// Converts a <seealso cref="ThrottleOutPacketType"/> integer to a
  713. /// flag value
  714. /// </summary>
  715. /// <param name="i">Throttle category to convert</param>
  716. /// <returns>Flag representation of the throttle category</returns>
  717. private static ThrottleOutPacketTypeFlags CategoryToFlag(int i)
  718. {
  719. ThrottleOutPacketType category = (ThrottleOutPacketType)i;
  720. switch (category)
  721. {
  722. case ThrottleOutPacketType.Land:
  723. return ThrottleOutPacketTypeFlags.Land; // Terrain data
  724. case ThrottleOutPacketType.Wind:
  725. return ThrottleOutPacketTypeFlags.Wind; // Wind data
  726. case ThrottleOutPacketType.Cloud:
  727. return ThrottleOutPacketTypeFlags.Cloud; // Cloud data
  728. case ThrottleOutPacketType.Task:
  729. return ThrottleOutPacketTypeFlags.Task; // Object updates and everything not on the other categories
  730. case ThrottleOutPacketType.Texture:
  731. return ThrottleOutPacketTypeFlags.Texture; // Textures data (also impacts http texture and mesh by default)
  732. case ThrottleOutPacketType.Asset:
  733. return ThrottleOutPacketTypeFlags.Asset; // Non-texture Assets data
  734. default:
  735. return 0;
  736. }
  737. }
  738. }
  739. public class DoubleLocklessQueue<T> : OpenSim.Framework.LocklessQueue<T>
  740. {
  741. OpenSim.Framework.LocklessQueue<T> highQueue = new OpenSim.Framework.LocklessQueue<T>();
  742. public override int Count
  743. {
  744. get
  745. {
  746. return base.Count + highQueue.Count;
  747. }
  748. }
  749. public override bool Dequeue(out T item)
  750. {
  751. if (highQueue.Dequeue(out item))
  752. return true;
  753. return base.Dequeue(out item);
  754. }
  755. public void Enqueue(T item, bool highPriority)
  756. {
  757. if (highPriority)
  758. highQueue.Enqueue(item);
  759. else
  760. Enqueue(item);
  761. }
  762. }
  763. }