// WorkItemsQueue.cs
  1. // Ami Bar
  2. // [email protected]
  3. using System;
  4. using System.Threading;
  5. namespace Amib.Threading.Internal
  6. {
  7. #region WorkItemsQueue class
  8. /// <summary>
  9. /// WorkItemsQueue class.
  10. /// </summary>
  11. public class WorkItemsQueue : IDisposable
  12. {
  13. #region Member variables
  /// <summary>
  /// Sentinel head of the waiters stack. The most recently pushed waiter
  /// is linked from its _nextWaiterEntry field.
  /// </summary>
  private WaiterEntry _headWaiterEntry = new WaiterEntry();

  /// <summary>
  /// Number of waiters currently linked in the waiters stack.
  /// </summary>
  private int _waitersCount;

  /// <summary>
  /// Work items that have been queued and not yet handed to a waiter.
  /// </summary>
  private PriorityQueue _workItems = new PriorityQueue();

  /// <summary>
  /// True while new work items may be queued; cleared by Cleanup().
  /// </summary>
  private bool _isWorkItemsQueueActive = true;

  /// <summary>
  /// Per-thread waiter entry, created on first use and reused afterwards
  /// by GetThreadWaiterEntry().
  /// </summary>
  [ThreadStatic]
  private static WaiterEntry _waiterEntry;

  /// <summary>
  /// Set once Dispose() has run; checked by ValidateNotDisposed().
  /// </summary>
  private bool _isDisposed;
  39. #endregion
  40. #region Public properties
  /// <summary>
  /// Gets the number of work items currently held in the queue.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
  public int Count
  {
      get
      {
          int queuedItems;

          // Read under the queue lock so the value is consistent with
          // concurrent enqueue/dequeue calls.
          lock (this)
          {
              ValidateNotDisposed();
              queuedItems = _workItems.Count;
          }

          return queuedItems;
      }
  }
  /// <summary>
  /// Gets the number of waiters currently registered in the waiters stack.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
  public int WaitersCount
  {
      get
      {
          int registeredWaiters;

          // Read under the queue lock, the same lock that guards every
          // push/pop of the waiters stack.
          lock (this)
          {
              ValidateNotDisposed();
              registeredWaiters = _waitersCount;
          }

          return registeredWaiters;
      }
  }
  69. #endregion
  70. #region Public methods
  /// <summary>
  /// Hands a work item to a waiting consumer if there is one, otherwise
  /// stores it in the queue.
  /// </summary>
  /// <param name="workItem">The work item to deliver; must not be null.</param>
  /// <returns>
  /// false if the queue has been deactivated and the item was rejected;
  /// true if the item was delivered to a waiter or queued.
  /// </returns>
  /// <exception cref="ArgumentNullException">workItem is null.</exception>
  /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
  public bool EnqueueWorkItem(WorkItem workItem)
  {
      // null is reserved: DequeueWorkItem() returns it to report a
      // timeout or a cancel, so it can never be a valid work item.
      if (null == workItem)
      {
          throw new ArgumentNullException("workItem" , "workItem cannot be null");
      }

      lock (this)
      {
          ValidateNotDisposed();

          if (!_isWorkItemsQueueActive)
          {
              return false;
          }

          // Offer the item to waiters, newest first. Signal() fails for a
          // waiter that has already timed out, in which case the next
          // waiter is tried.
          bool deliveredToWaiter = false;
          while (!deliveredToWaiter && _waitersCount > 0)
          {
              deliveredToWaiter = PopWaiter().Signal(workItem);
          }

          // Nobody took it directly, so park it in the queue.
          if (!deliveredToWaiter)
          {
              _workItems.Enqueue(workItem);
          }

          return true;
      }
  }
  /// <summary>
  /// Takes the next work item from the queue. If the queue is empty the
  /// caller is registered as a waiter and blocks until a work item is
  /// handed to it, the timeout elapses, or the cancel handle is signaled.
  /// </summary>
  /// <param name="millisecondsTimeout">
  /// Maximum time to wait, in milliseconds (passed to WaitHandle.WaitAny).
  /// </param>
  /// <param name="cancelEvent">
  /// Wait handle that aborts the wait when signaled. It is only used when
  /// the call actually has to wait.
  /// </param>
  /// <returns>
  /// The work item, or null if the wait timed out or was canceled.
  /// </returns>
  /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
  /// <exception cref="ArgumentNullException">
  /// cancelEvent is null and no work item is immediately available.
  /// </exception>
  public WorkItem DequeueWorkItem(
      int millisecondsTimeout,
      WaitHandle cancelEvent)
  {
      WaiterEntry waiterEntry = null;
      WorkItem workItem = null;

      lock (this)
      {
          ValidateNotDisposed();

          // Fast path: a work item is already queued, no waiting needed.
          if (_workItems.Count > 0)
          {
              workItem = _workItems.Dequeue() as WorkItem;
              return workItem;
          }

          // WaitHandle.WaitAny() rejects a null handle with an
          // ArgumentNullException. Fail here, before the waiter is
          // registered, so the throw does not leave a waiter in the stack
          // that nobody is actually waiting on.
          if (null == cancelEvent)
          {
              throw new ArgumentNullException("cancelEvent", "cancelEvent cannot be null");
          }

          // Register this thread's (reset) waiter entry with the other waiters.
          waiterEntry = GetThreadWaiterEntry();
          PushWaiter(waiterEntry);
      }

      // Index 0 is the waiter's own handle, index 1 is the cancel handle.
      WaitHandle [] waitHandles = new WaitHandle [] {
          waiterEntry.WaitHandle,
          cancelEvent };

      // The wait is done outside the lock so that EnqueueWorkItem() can
      // take the lock and signal this waiter. That is why the method uses
      // two separate lock blocks instead of one.
      int index = WaitHandle.WaitAny(
          waitHandles,
          millisecondsTimeout,
          true);

      lock (this)
      {
          // Only index 0 means the waiter's handle was signaled. A cancel
          // or a timeout are both treated as a timeout.
          bool success = (0 == index);

          if (!success)
          {
              // Timeout() fails if the waiter was signaled in the meantime;
              // in that case a work item was delivered and must be returned.
              bool timedOut = waiterEntry.Timeout();

              // A waiter that really timed out is unlinked from the stack.
              if (timedOut)
              {
                  RemoveWaiter(waiterEntry, false);
              }

              success = !timedOut;
          }

          if (success)
          {
              workItem = waiterEntry.WorkItem;

              // Fallback for a signaled waiter that carries no work item:
              // take one from the queue instead.
              if (null == workItem)
              {
                  workItem = _workItems.Dequeue() as WorkItem;
              }
          }
      }

      // Still null here when the wait timed out or was canceled.
      return workItem;
  }
  /// <summary>
  /// Deactivates the queue so that no more work items can be queued,
  /// disposes of the state of the work items still queued, discards them,
  /// and marks every registered waiter as timed out. Only the first call
  /// has an effect.
  /// </summary>
  protected virtual void Cleanup()
  {
      lock (this)
      {
          // Deactivate only once; later calls fall straight through.
          if (_isWorkItemsQueueActive)
          {
              // From now on EnqueueWorkItem() rejects new work items.
              _isWorkItemsQueueActive = false;

              // Dispose of the state of each work item that never ran ...
              foreach (WorkItem pendingItem in _workItems)
              {
                  pendingItem.DisposeOfState();
              }

              // ... and drop the items themselves.
              _workItems.Clear();

              // Flag the waiters as timed out. This does not wake them up;
              // it only makes a later Signal() on them fail.
              while (_waitersCount > 0)
              {
                  PopWaiter().Timeout();
              }
          }
      }
  }
  235. #endregion
  236. #region Private methods
  /// <summary>
  /// Returns the waiter entry that belongs to the calling thread, reset
  /// and ready for a new wait.
  /// </summary>
  /// <returns>The calling thread's waiter entry.</returns>
  /// <remarks>
  /// Each thread keeps a single entry in a thread-static field and reuses
  /// it, which avoids creating and destroying an entry for every wait.
  /// </remarks>
  private WaiterEntry GetThreadWaiterEntry()
  {
      WaiterEntry threadEntry = _waiterEntry;

      // First wait on this thread: create the entry and remember it.
      if (threadEntry == null)
      {
          threadEntry = new WaiterEntry();
          _waiterEntry = threadEntry;
      }

      // Clear whatever the previous wait left behind.
      threadEntry.Reset();
      return threadEntry;
  }
  252. #region Waiters stack methods
  /// <summary>
  /// Puts a waiter on top of the waiters stack. A waiter that is already
  /// linked in the stack is unlinked first, so it is never counted twice.
  /// </summary>
  /// <param name="newWaiterEntry">The waiter to put on the stack.</param>
  public void PushWaiter(WaiterEntry newWaiterEntry)
  {
      // Unlink the waiter if it is still in the stack; this also fixes up
      // the waiters count.
      RemoveWaiter(newWaiterEntry, false);

      // The entry currently on top (null when the stack is empty).
      WaiterEntry previousTop = _headWaiterEntry._nextWaiterEntry;

      // The new waiter always sits directly behind the sentinel head.
      _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
      newWaiterEntry._prevWaiterEntry = _headWaiterEntry;

      // Chain the former top entry, if any, behind the new waiter.
      if (previousTop != null)
      {
          newWaiterEntry._nextWaiterEntry = previousTop;
          previousTop._prevWaiterEntry = newWaiterEntry;
      }

      _waitersCount++;
  }
  /// <summary>
  /// Takes the top waiter off the waiters stack.
  /// </summary>
  /// <returns>The waiter that was on top of the stack.</returns>
  /// <remarks>
  /// The stack must not be empty; callers check _waitersCount first.
  /// </remarks>
  private WaiterEntry PopWaiter()
  {
      // The entry on top of the stack and the one right below it.
      WaiterEntry topEntry = _headWaiterEntry._nextWaiterEntry;
      WaiterEntry entryBelow = topEntry._nextWaiterEntry;

      // Unlink the top entry; 'true' forces the waiters count down.
      RemoveWaiter(topEntry, true);

      // Make the entry below (possibly null) the new top of the stack.
      _headWaiterEntry._nextWaiterEntry = entryBelow;
      if (entryBelow != null)
      {
          entryBelow._prevWaiterEntry = _headWaiterEntry;
      }

      return topEntry;
  }
  /// <summary>
  /// Unlinks a waiter from the waiters stack and updates the waiters count.
  /// </summary>
  /// <param name="waiterEntry">The waiter entry to unlink.</param>
  /// <param name="popDecrement">
  /// If true the waiters count is always decremented. If false it is
  /// decremented only when the waiter was actually linked in the stack.
  /// </param>
  private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
  {
      // Remember the neighbours, then detach the entry from both of them.
      WaiterEntry before = waiterEntry._prevWaiterEntry;
      WaiterEntry after = waiterEntry._nextWaiterEntry;
      waiterEntry._prevWaiterEntry = null;
      waiterEntry._nextWaiterEntry = null;

      // Having any neighbour means the entry was part of the stack.
      bool wasLinked = (before != null) || (after != null);

      // Close the gap by pointing the neighbours at each other.
      if (before != null)
      {
          before._nextWaiterEntry = after;
      }

      if (after != null)
      {
          after._prevWaiterEntry = before;
      }

      // A pop always counts; otherwise count only a real removal.
      if (popDecrement || wasLinked)
      {
          _waitersCount--;
      }
  }
  346. #endregion
  347. #endregion
  348. #region WaiterEntry class
  /// <summary>
  /// A waiter entry in the waiters stack. It carries the event a waiting
  /// thread blocks on, the work item handed to it, and the signaled /
  /// timed-out state that arbitrates between Signal() and Timeout().
  /// </summary>
  public class WaiterEntry : IDisposable
  {
      #region Member variables

      /// <summary>
      /// Guards the signaled / timed-out state and the work item. A private
      /// object is used so that code outside this class cannot contend for
      /// the same monitor by locking on the entry itself.
      /// </summary>
      private readonly object _stateLock = new object();

      /// <summary>
      /// Event that tells the waiter it got a work item.
      /// </summary>
      private AutoResetEvent _waitHandle = new AutoResetEvent(false);

      /// <summary>
      /// True once the waiter has been marked as timed out.
      /// </summary>
      private bool _isTimedout = false;

      /// <summary>
      /// True once the waiter has been signaled with a work item.
      /// </summary>
      private bool _isSignaled = false;

      /// <summary>
      /// The work item passed directly to the waiter without going
      /// through the queue.
      /// </summary>
      private WorkItem _workItem = null;

      /// <summary>
      /// True once Dispose() has released the wait handle.
      /// </summary>
      private bool _isDisposed = false;

      // Linked list members, maintained by the owning WorkItemsQueue.
      internal WaiterEntry _nextWaiterEntry = null;
      internal WaiterEntry _prevWaiterEntry = null;

      #endregion

      #region Construction

      /// <summary>
      /// Creates a waiter entry in its reset (not signaled, not timed out) state.
      /// </summary>
      public WaiterEntry()
      {
          Reset();
      }

      #endregion

      #region Public methods

      /// <summary>
      /// Gets the handle that is set when the waiter is signaled.
      /// </summary>
      public WaitHandle WaitHandle
      {
          get { return _waitHandle; }
      }

      /// <summary>
      /// Gets the work item handed to this waiter by Signal(), or null if
      /// none was handed over since the last Reset().
      /// </summary>
      public WorkItem WorkItem
      {
          get
          {
              lock (_stateLock)
              {
                  return _workItem;
              }
          }
      }

      /// <summary>
      /// Signals the waiter that it got a work item.
      /// </summary>
      /// <param name="workItem">The work item handed to the waiter.</param>
      /// <returns>
      /// true on success; false if Timeout() preceded this call.
      /// </returns>
      public bool Signal(WorkItem workItem)
      {
          lock (_stateLock)
          {
              if (!_isTimedout)
              {
                  _workItem = workItem;
                  _isSignaled = true;
                  _waitHandle.Set();
                  return true;
              }
          }
          return false;
      }

      /// <summary>
      /// Marks the waiter as timed out.
      /// </summary>
      /// <returns>
      /// true on success; false if Signal() preceded this call.
      /// </returns>
      public bool Timeout()
      {
          lock (_stateLock)
          {
              // A waiter that was already signaled owns a work item and
              // must not be turned into a timed-out one.
              if (!_isSignaled)
              {
                  // Only the flag is set here; unlinking the entry from
                  // the waiters stack is left to the caller.
                  _isTimedout = true;
                  return true;
              }
          }
          return false;
      }

      /// <summary>
      /// Resets the waiter entry so it can be used for another wait.
      /// </summary>
      public void Reset()
      {
          _workItem = null;
          _isTimedout = false;
          _isSignaled = false;
          _waitHandle.Reset();
      }

      /// <summary>
      /// Closes the wait handle. Calling it more than once is harmless.
      /// </summary>
      public void Close()
      {
          if (null != _waitHandle)
          {
              _waitHandle.Close();
              _waitHandle = null;
          }
      }

      #endregion

      #region IDisposable Members

      /// <summary>
      /// Releases the wait handle. Only the first call has an effect.
      /// </summary>
      public void Dispose()
      {
          if (!_isDisposed)
          {
              Close();
              _isDisposed = true;

              // The handle is released, so there is nothing left for the
              // finalizer to do; keep the entry out of the finalization queue.
              GC.SuppressFinalize(this);
          }
      }

      /// <summary>
      /// Releases the wait handle of an entry that was never disposed.
      /// </summary>
      ~WaiterEntry()
      {
          Dispose();
      }

      #endregion
  }
  474. #endregion
  475. #region IDisposable Members
  /// <summary>
  /// Shuts the queue down by running Cleanup() and marks it as disposed.
  /// Only the first call has an effect.
  /// </summary>
  public void Dispose()
  {
      if (_isDisposed)
      {
          return;
      }

      Cleanup();
      _isDisposed = true;

      // Cleanup has already run, so the finalizer is no longer needed.
      GC.SuppressFinalize(this);
  }

  /// <summary>
  /// Runs Cleanup() for a queue that was never disposed explicitly.
  /// </summary>
  ~WorkItemsQueue()
  {
      Cleanup();
  }
  /// <summary>
  /// Throws if the queue has already been disposed.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
  private void ValidateNotDisposed()
  {
      if (!_isDisposed)
      {
          return;
      }

      throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
  }
  496. #endregion
  497. }
  498. #endregion
  499. }