PriorityQueue.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. /*
  2. * Copyright (c) Contributors, http://opensimulator.org/
  3. * See CONTRIBUTORS.TXT for a full list of copyright holders.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the OpenSimulator Project nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
  17. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. using System;
  28. using System.Collections;
  29. using System.Collections.Generic;
  30. using System.Reflection;
  31. using OpenSim.Framework;
  32. using OpenSim.Framework.Client;
  33. using log4net;
  34. namespace OpenSim.Framework
  35. {
  36. public class PriorityQueue
  37. {
  38. // private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
  39. public delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity);
  40. /// <summary>
  41. /// Total number of queues (priorities) available
  42. /// </summary>
  43. public const uint NumberOfQueues = 12; // includes immediate queues, m_queueCounts need to be set acording
  44. /// <summary>
  45. /// Number of queuest (priorities) that are processed immediately
  46. /// </summary.
  47. public const uint NumberOfImmediateQueues = 2;
  48. private MinHeap<MinHeapItem>[] m_heaps = new MinHeap<MinHeapItem>[NumberOfQueues];
  49. private Dictionary<uint, LookupItem> m_lookupTable;
  50. // internal state used to ensure the deqeues are spread across the priority
  51. // queues "fairly". queuecounts is the amount to pull from each queue in
  52. // each pass. weighted towards the higher priority queues
  53. private uint m_nextQueue = 0;
  54. private uint m_countFromQueue = 0;
  55. // first queues are imediate, so no counts
  56. // private uint[] m_queueCounts = { 0, 0, 8, 4, 4, 2, 2, 2, 2, 1, 1, 1 };
  57. private uint[] m_queueCounts = {0, 0, 8, 8, 5, 4, 3, 2, 1, 1, 1, 1};
  58. // this is ava, ava, attach, <10m, 20,40,80,160m,320,640,1280, +
  59. // next request is a counter of the number of updates queued, it provides
  60. // a total ordering on the updates coming through the queue and is more
  61. // lightweight (and more discriminating) than tick count
  62. private UInt64 m_nextRequest = 0;
  63. /// <summary>
  64. /// Lock for enqueue and dequeue operations on the priority queue
  65. /// </summary>
  66. private object m_syncRoot = new object();
  67. public object SyncRoot {
  68. get { return this.m_syncRoot; }
  69. }
  70. #region constructor
  71. public PriorityQueue() : this(MinHeap<MinHeapItem>.DEFAULT_CAPACITY) { }
  72. public PriorityQueue(int capacity)
  73. {
  74. m_lookupTable = new Dictionary<uint, LookupItem>(capacity);
  75. for (int i = 0; i < m_heaps.Length; ++i)
  76. m_heaps[i] = new MinHeap<MinHeapItem>(capacity);
  77. m_nextQueue = NumberOfImmediateQueues;
  78. m_countFromQueue = m_queueCounts[m_nextQueue];
  79. }
  80. #endregion Constructor
  81. #region PublicMethods
  82. /// <summary>
  83. /// Return the number of items in the queues
  84. /// </summary>
  85. public int Count
  86. {
  87. get
  88. {
  89. int count = 0;
  90. for (int i = 0; i < m_heaps.Length; ++i)
  91. count += m_heaps[i].Count;
  92. return count;
  93. }
  94. }
  95. /// <summary>
  96. /// Enqueue an item into the specified priority queue
  97. /// </summary>
  98. public bool Enqueue(uint pqueue, EntityUpdate value)
  99. {
  100. LookupItem lookup;
  101. uint localid = value.Entity.LocalId;
  102. UInt64 entry = m_nextRequest++;
  103. if (m_lookupTable.TryGetValue(localid, out lookup))
  104. {
  105. entry = lookup.Heap[lookup.Handle].EntryOrder;
  106. value.Update(lookup.Heap[lookup.Handle].Value);
  107. lookup.Heap.Remove(lookup.Handle);
  108. }
  109. pqueue = Util.Clamp<uint>(pqueue, 0, NumberOfQueues - 1);
  110. lookup.Heap = m_heaps[pqueue];
  111. lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle);
  112. m_lookupTable[localid] = lookup;
  113. return true;
  114. }
  115. public void Remove(List<uint> ids)
  116. {
  117. LookupItem lookup;
  118. foreach (uint localid in ids)
  119. {
  120. if (m_lookupTable.TryGetValue(localid, out lookup))
  121. {
  122. lookup.Heap.Remove(lookup.Handle);
  123. m_lookupTable.Remove(localid);
  124. }
  125. }
  126. }
  127. /// <summary>
  128. /// Remove an item from one of the queues. Specifically, it removes the
  129. /// oldest item from the next queue in order to provide fair access to
  130. /// all of the queues
  131. /// </summary>
  132. public bool TryDequeue(out EntityUpdate value, out Int32 timeinqueue)
  133. {
  134. // If there is anything in imediate queues, return it first no
  135. // matter what else. Breaks fairness. But very useful.
  136. for (int iq = 0; iq < NumberOfImmediateQueues; iq++)
  137. {
  138. if (m_heaps[iq].Count > 0)
  139. {
  140. MinHeapItem item = m_heaps[iq].RemoveMin();
  141. m_lookupTable.Remove(item.Value.Entity.LocalId);
  142. timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime);
  143. value = item.Value;
  144. return true;
  145. }
  146. }
  147. // To get the fair queing, we cycle through each of the
  148. // queues when finding an element to dequeue.
  149. // We pull (NumberOfQueues - QueueIndex) items from each queue in order
  150. // to give lower numbered queues a higher priority and higher percentage
  151. // of the bandwidth.
  152. // Check for more items to be pulled from the current queue
  153. if (m_heaps[m_nextQueue].Count > 0 && m_countFromQueue > 0)
  154. {
  155. m_countFromQueue--;
  156. MinHeapItem item = m_heaps[m_nextQueue].RemoveMin();
  157. m_lookupTable.Remove(item.Value.Entity.LocalId);
  158. timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime);
  159. value = item.Value;
  160. return true;
  161. }
  162. // Find the next non-immediate queue with updates in it
  163. for (uint i = NumberOfImmediateQueues; i < NumberOfQueues; ++i)
  164. {
  165. m_nextQueue++;
  166. if(m_nextQueue >= NumberOfQueues)
  167. m_nextQueue = NumberOfImmediateQueues;
  168. m_countFromQueue = m_queueCounts[m_nextQueue];
  169. if (m_heaps[m_nextQueue].Count > 0)
  170. {
  171. m_countFromQueue--;
  172. MinHeapItem item = m_heaps[m_nextQueue].RemoveMin();
  173. m_lookupTable.Remove(item.Value.Entity.LocalId);
  174. timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime);
  175. value = item.Value;
  176. return true;
  177. }
  178. }
  179. timeinqueue = 0;
  180. value = default(EntityUpdate);
  181. return false;
  182. }
  183. /// <summary>
  184. /// Reapply the prioritization function to each of the updates currently
  185. /// stored in the priority queues.
  186. /// </summary
  187. public void Reprioritize(UpdatePriorityHandler handler)
  188. {
  189. MinHeapItem item;
  190. foreach (LookupItem lookup in new List<LookupItem>(this.m_lookupTable.Values))
  191. {
  192. if (lookup.Heap.TryGetValue(lookup.Handle, out item))
  193. {
  194. uint pqueue = item.PriorityQueue;
  195. uint localid = item.Value.Entity.LocalId;
  196. if (handler(ref pqueue, item.Value.Entity))
  197. {
  198. // unless the priority queue has changed, there is no need to modify
  199. // the entry
  200. pqueue = Util.Clamp<uint>(pqueue, 0, NumberOfQueues - 1);
  201. if (pqueue != item.PriorityQueue)
  202. {
  203. lookup.Heap.Remove(lookup.Handle);
  204. LookupItem litem = lookup;
  205. litem.Heap = m_heaps[pqueue];
  206. litem.Heap.Add(new MinHeapItem(pqueue, item), ref litem.Handle);
  207. m_lookupTable[localid] = litem;
  208. }
  209. }
  210. else
  211. {
  212. // m_log.WarnFormat("[PQUEUE]: UpdatePriorityHandler returned false for {0}",item.Value.Entity.UUID);
  213. lookup.Heap.Remove(lookup.Handle);
  214. this.m_lookupTable.Remove(localid);
  215. }
  216. }
  217. }
  218. }
  219. /// <summary>
  220. /// </summary>
  221. public override string ToString()
  222. {
  223. string s = "";
  224. for (int i = 0; i < NumberOfQueues; i++)
  225. s += String.Format("{0,7} ",m_heaps[i].Count);
  226. return s;
  227. }
  228. #endregion PublicMethods
  229. #region MinHeapItem
  230. private struct MinHeapItem : IComparable<MinHeapItem>
  231. {
  232. private EntityUpdate value;
  233. internal EntityUpdate Value {
  234. get {
  235. return this.value;
  236. }
  237. }
  238. private uint pqueue;
  239. internal uint PriorityQueue {
  240. get {
  241. return this.pqueue;
  242. }
  243. }
  244. private Int32 entrytime;
  245. internal Int32 EntryTime {
  246. get {
  247. return this.entrytime;
  248. }
  249. }
  250. private UInt64 entryorder;
  251. internal UInt64 EntryOrder
  252. {
  253. get {
  254. return this.entryorder;
  255. }
  256. }
  257. internal MinHeapItem(uint pqueue, MinHeapItem other)
  258. {
  259. this.entrytime = other.entrytime;
  260. this.entryorder = other.entryorder;
  261. this.value = other.value;
  262. this.pqueue = pqueue;
  263. }
  264. internal MinHeapItem(uint pqueue, UInt64 entryorder, EntityUpdate value)
  265. {
  266. this.entrytime = Util.EnvironmentTickCount();
  267. this.entryorder = entryorder;
  268. this.value = value;
  269. this.pqueue = pqueue;
  270. }
  271. public override string ToString()
  272. {
  273. return String.Format("[{0},{1},{2}]",pqueue,entryorder,value.Entity.LocalId);
  274. }
  275. public int CompareTo(MinHeapItem other)
  276. {
  277. // I'm assuming that the root part of an SOG is added to the update queue
  278. // before the component parts
  279. return Comparer<UInt64>.Default.Compare(this.EntryOrder, other.EntryOrder);
  280. }
  281. }
  282. #endregion
  283. #region LookupItem
  284. private struct LookupItem
  285. {
  286. internal MinHeap<MinHeapItem> Heap;
  287. internal IHandle Handle;
  288. }
  289. #endregion
  290. }
  291. }