1
0

PriorityQueue.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. /*
  2. * Copyright (c) Contributors, http://opensimulator.org/
  3. * See CONTRIBUTORS.TXT for a full list of copyright holders.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the OpenSimulator Project nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
  17. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. using System;
  28. using System.Collections.Generic;
  29. using System.Collections.Concurrent;
  30. using OpenSim.Framework;
  31. namespace OpenSim.Region.Framework.Scenes
  32. {
  33. public class PriorityQueue
  34. {
  35. // private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
  36. public delegate bool UpdatePriorityHandler(ref int priority, ISceneEntity entity);
  37. /// <summary>
  38. /// Total number of queues (priorities) available
  39. /// </summary>
  40. public const int NumberOfQueues = 13; // includes immediate queues, m_queueCounts need to be set acording
  41. /// <summary>
  42. /// Number of queuest (priorities) that are processed immediately
  43. /// </summary.
  44. public const int NumberOfImmediateQueues = 2;
  45. // first queues are immediate, so no counts
  46. private static readonly int[] m_queueCounts = {0, 0, 8, 8, 5, 4, 3, 2, 1, 1, 1, 1, 1 };
  47. // this is ava, ava, attach, <10m, 20,40,80,160m,320,640,1280, +
  48. private PriorityMinHeap[] m_heaps = new PriorityMinHeap[NumberOfQueues];
  49. private ConcurrentDictionary<uint, EntityUpdate> m_lookupTable;
  50. // internal state used to ensure the deqeues are spread across the priority
  51. // queues "fairly". queuecounts is the amount to pull from each queue in
  52. // each pass. weighted towards the higher priority queues
  53. private int m_nextQueue = 0;
  54. private int m_countFromQueue = 0;
  55. // next request is a counter of the number of updates queued, it provides
  56. // a total ordering on the updates coming through the queue and is more
  57. // lightweight (and more discriminating) than tick count
  58. private ulong m_nextRequest = 0;
  59. /// <summary>
  60. /// Lock for enqueue and dequeue operations on the priority queue
  61. /// </summary>
  62. private object m_mainLock = new object();
  63. public object syncRoot
  64. {
  65. get { return m_mainLock; }
  66. }
  67. #region constructor
  68. public PriorityQueue(int capacity)
  69. {
  70. capacity /= 4;
  71. for (int i = 0; i < m_heaps.Length; ++i)
  72. m_heaps[i] = new PriorityMinHeap(capacity);
  73. m_lookupTable = new ConcurrentDictionary<uint, EntityUpdate>();
  74. m_nextQueue = NumberOfImmediateQueues;
  75. m_countFromQueue = m_queueCounts[m_nextQueue];
  76. }
  77. #endregion Constructor
  78. #region PublicMethods
  79. public void Close()
  80. {
  81. PriorityMinHeap[] tmpheaps;
  82. lock (m_mainLock)
  83. {
  84. tmpheaps = m_heaps;
  85. m_heaps = null;
  86. m_lookupTable.Clear();
  87. m_lookupTable = null;
  88. }
  89. for (int i = 0; i < tmpheaps.Length; ++i)
  90. {
  91. tmpheaps[i].Clear();
  92. tmpheaps[i] = null;
  93. }
  94. }
  95. /// <summary>
  96. /// Return the number of items in the queues
  97. /// </summary>
  98. public int Count
  99. {
  100. get
  101. {
  102. return m_lookupTable.Count;
  103. }
  104. }
  105. /// <summary>
  106. /// Enqueue an item into the specified priority queue
  107. /// </summary>
  108. public bool Enqueue(int pqueue, EntityUpdate value)
  109. {
  110. uint localid = value.Entity.LocalId;
  111. try
  112. {
  113. lock (m_mainLock)
  114. {
  115. if (m_lookupTable.TryGetValue(localid, out EntityUpdate existentup))
  116. {
  117. int eqqueue = existentup.PriorityQueue;
  118. existentup.UpdateFromNew(value, pqueue);
  119. value.Free();
  120. if (pqueue != eqqueue)
  121. {
  122. m_heaps[eqqueue].RemoveAt(existentup.PriorityQueueIndex);
  123. m_heaps[pqueue].Add(existentup);
  124. }
  125. return true;
  126. }
  127. ulong entry = m_nextRequest++;
  128. value.Update(pqueue, entry);
  129. m_heaps[pqueue].Add(value);
  130. m_lookupTable[localid] = value;
  131. }
  132. return true;
  133. }
  134. catch
  135. {
  136. return true;
  137. }
  138. }
  139. public void Remove(List<uint> ids)
  140. {
  141. try
  142. {
  143. lock (m_mainLock)
  144. {
  145. foreach (uint localid in ids)
  146. {
  147. if (m_lookupTable.TryRemove(localid, out EntityUpdate lookup))
  148. {
  149. m_heaps[lookup.PriorityQueue].RemoveAt(lookup.PriorityQueueIndex);
  150. lookup.Free();
  151. }
  152. }
  153. }
  154. }
  155. catch
  156. {
  157. }
  158. }
  159. /// <summary>
  160. /// Remove an item from one of the queues. Specifically, it removes the
  161. /// oldest item from the next queue in order to provide fair access to
  162. /// all of the queues
  163. /// </summary>
  164. public bool TryDequeue(out EntityUpdate value)
  165. {
  166. // If there is anything in immediate queues, return it first no
  167. // matter what else. Breaks fairness. But very useful.
  168. try
  169. {
  170. lock(m_mainLock)
  171. {
  172. for (int iq = 0; iq < NumberOfImmediateQueues; iq++)
  173. {
  174. if (m_heaps[iq].Count > 0)
  175. {
  176. value = m_heaps[iq].RemoveNext();
  177. return m_lookupTable.TryRemove(value.Entity.LocalId, out value);
  178. }
  179. }
  180. // To get the fair queing, we cycle through each of the
  181. // queues when finding an element to dequeue.
  182. // We pull (NumberOfQueues - QueueIndex) items from each queue in order
  183. // to give lower numbered queues a higher priority and higher percentage
  184. // of the bandwidth.
  185. PriorityMinHeap curheap = m_heaps[m_nextQueue];
  186. // Check for more items to be pulled from the current queue
  187. if (m_countFromQueue > 0 && curheap.Count > 0)
  188. {
  189. --m_countFromQueue;
  190. value = curheap.RemoveNext();
  191. return m_lookupTable.TryRemove(value.Entity.LocalId, out value);
  192. }
  193. // Find the next non-immediate queue with updates in it
  194. for (int i = NumberOfImmediateQueues; i < NumberOfQueues; ++i)
  195. {
  196. m_nextQueue++;
  197. if(m_nextQueue >= NumberOfQueues)
  198. m_nextQueue = NumberOfImmediateQueues;
  199. curheap = m_heaps[m_nextQueue];
  200. if (curheap.Count == 0)
  201. continue;
  202. m_countFromQueue = m_queueCounts[m_nextQueue];
  203. --m_countFromQueue;
  204. value = curheap.RemoveNext();
  205. return m_lookupTable.TryRemove(value.Entity.LocalId, out value);
  206. }
  207. }
  208. }
  209. catch
  210. {
  211. }
  212. value = null;
  213. return false;
  214. }
  215. public bool TryOrderedDequeue(out EntityUpdate value)
  216. {
  217. try
  218. {
  219. lock(m_mainLock)
  220. {
  221. for (int iq = 0; iq < NumberOfQueues; ++iq)
  222. {
  223. PriorityMinHeap curheap = m_heaps[iq];
  224. if (curheap.Count > 0)
  225. {
  226. value = curheap.RemoveNext();
  227. return m_lookupTable.TryRemove(value.Entity.LocalId, out value);
  228. }
  229. }
  230. }
  231. }
  232. catch
  233. {
  234. }
  235. value = null;
  236. return false;
  237. }
  238. /// <summary>
  239. /// Reapply the prioritization function to each of the updates currently
  240. /// stored in the priority queues.
  241. /// </summary
  242. public void Reprioritize(UpdatePriorityHandler handler)
  243. {
  244. int pqueue = 0;
  245. try
  246. {
  247. lock (m_mainLock)
  248. {
  249. foreach (EntityUpdate currentEU in m_lookupTable.Values)
  250. {
  251. if (handler(ref pqueue, currentEU.Entity))
  252. {
  253. // unless the priority queue has changed, there is no need to modify
  254. // the entry
  255. if (pqueue != currentEU.PriorityQueue)
  256. {
  257. m_heaps[currentEU.PriorityQueue].RemoveAt(currentEU.PriorityQueueIndex);
  258. currentEU.PriorityQueue = pqueue;
  259. m_heaps[pqueue].Add(currentEU);
  260. }
  261. }
  262. else
  263. {
  264. break;
  265. }
  266. }
  267. }
  268. }
  269. catch
  270. {
  271. }
  272. }
  273. /// <summary>
  274. /// </summary>
  275. public override string ToString()
  276. {
  277. string s = "";
  278. for (int i = 0; i < NumberOfQueues; i++)
  279. s += String.Format("{0,7} ", m_heaps[i].Count);
  280. return s;
  281. }
  282. #endregion PublicMethods
  283. }
  284. public class PriorityMinHeap
  285. {
  286. public const int MIN_CAPACITY = 16;
  287. private EntityUpdate[] m_items;
  288. private int m_size;
  289. private int minCapacity;
  290. public PriorityMinHeap(int _capacity)
  291. {
  292. minCapacity = MIN_CAPACITY;
  293. m_items = new EntityUpdate[_capacity];
  294. m_size = 0;
  295. }
  296. public int Count { get { return m_size; } }
  297. private bool BubbleUp(int index)
  298. {
  299. EntityUpdate tmp;
  300. EntityUpdate item = m_items[index];
  301. ulong itemEntryOrder = item.EntryOrder;
  302. int current, parent;
  303. for (current = index, parent = (current - 1) / 2;
  304. (current > 0) && m_items[parent].EntryOrder > itemEntryOrder;
  305. current = parent, parent = (current - 1) / 2)
  306. {
  307. tmp = m_items[parent];
  308. tmp.PriorityQueueIndex = current;
  309. m_items[current] = tmp;
  310. }
  311. if (current != index)
  312. {
  313. item.PriorityQueueIndex = current;
  314. m_items[current] = item;
  315. return true;
  316. }
  317. return false;
  318. }
  319. private void BubbleDown(int index)
  320. {
  321. if(m_size < 2)
  322. return;
  323. EntityUpdate childItem;
  324. EntityUpdate childItemR;
  325. EntityUpdate item = m_items[index];
  326. ulong itemEntryOrder = item.EntryOrder;
  327. int current;
  328. int child;
  329. int childlimit = m_size - 1;
  330. for (current = index, child = (2 * current) + 1;
  331. current < m_size / 2;
  332. current = child, child = (2 * current) + 1)
  333. {
  334. childItem = m_items[child];
  335. if (child < childlimit)
  336. {
  337. childItemR = m_items[child + 1];
  338. if(childItem.EntryOrder > childItemR.EntryOrder)
  339. {
  340. childItem = childItemR;
  341. ++child;
  342. }
  343. }
  344. if (childItem.EntryOrder >= itemEntryOrder)
  345. break;
  346. childItem.PriorityQueueIndex = current;
  347. m_items[current] = childItem;
  348. }
  349. if (current != index)
  350. {
  351. item.PriorityQueueIndex = current;
  352. m_items[current] = item;
  353. }
  354. }
  355. public void Add(EntityUpdate value)
  356. {
  357. if (m_size == m_items.Length)
  358. {
  359. int newcapacity = (int)((m_items.Length * 200L) / 100L);
  360. if (newcapacity < (m_items.Length + MIN_CAPACITY))
  361. newcapacity = m_items.Length + MIN_CAPACITY;
  362. Array.Resize<EntityUpdate>(ref m_items, newcapacity);
  363. }
  364. value.PriorityQueueIndex = m_size;
  365. m_items[m_size] = value;
  366. BubbleUp(m_size);
  367. ++m_size;
  368. }
  369. public void Clear()
  370. {
  371. for (int index = 0; index < m_size; ++index)
  372. m_items[index].Free();
  373. m_size = 0;
  374. }
  375. public void RemoveAt(int index)
  376. {
  377. if (m_size == 0)
  378. throw new InvalidOperationException("Heap is empty");
  379. if (index >= m_size)
  380. throw new ArgumentOutOfRangeException("index");
  381. --m_size;
  382. if (m_size > 0)
  383. {
  384. if (index != m_size)
  385. {
  386. EntityUpdate tmp = m_items[m_size];
  387. tmp.PriorityQueueIndex = index;
  388. m_items[index] = tmp;
  389. m_items[m_size] = null;
  390. if (!BubbleUp(index))
  391. BubbleDown(index);
  392. }
  393. }
  394. else if (m_items.Length > 4 * minCapacity)
  395. m_items = new EntityUpdate[minCapacity];
  396. }
  397. public EntityUpdate RemoveNext()
  398. {
  399. if (m_size == 0)
  400. throw new InvalidOperationException("Heap is empty");
  401. EntityUpdate item = m_items[0];
  402. --m_size;
  403. if (m_size > 0)
  404. {
  405. EntityUpdate tmp = m_items[m_size];
  406. tmp.PriorityQueueIndex = 0;
  407. m_items[0] = tmp;
  408. m_items[m_size] = null;
  409. BubbleDown(0);
  410. }
  411. else if (m_items.Length > 4 * minCapacity)
  412. m_items = new EntityUpdate[minCapacity];
  413. return item;
  414. }
  415. public bool Remove(EntityUpdate value)
  416. {
  417. int index = value.PriorityQueueIndex;
  418. if (index != -1)
  419. {
  420. RemoveAt(index);
  421. return true;
  422. }
  423. return false;
  424. }
  425. }
  426. }