WorkItemsQueue.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. namespace Amib.Threading.Internal
  5. {
  6. #region WorkItemsQueue class
  7. /// <summary>
  8. /// WorkItemsQueue class.
  9. /// </summary>
  10. public class WorkItemsQueue : IDisposable
  11. {
  12. #region Member variables
  13. /// <summary>
  14. /// Waiters queue (implemented as stack).
  15. /// </summary>
  16. private readonly WaiterEntry _headWaiterEntry = new WaiterEntry();
  17. /// <summary>
  18. /// Waiters count
  19. /// </summary>
  20. private int _waitersCount = 0;
  21. /// <summary>
  22. /// Work items queue
  23. /// </summary>
  24. private readonly Queue<WorkItem> _workItems = new Queue<WorkItem>();
  25. /// <summary>
  26. /// Indicate that work items are allowed to be queued
  27. /// </summary>
  28. private bool _isWorkItemsQueueActive = true;
  29. [ThreadStatic]
  30. private static WaiterEntry _waiterEntry;
  31. /// <summary>
  32. /// Each thread in the thread pool keeps its own waiter entry.
  33. /// </summary>
  34. private static WaiterEntry CurrentWaiterEntry
  35. {
  36. get
  37. {
  38. return _waiterEntry;
  39. }
  40. set
  41. {
  42. _waiterEntry = value;
  43. }
  44. }
  45. /// <summary>
  46. /// A flag that indicates if the WorkItemsQueue has been disposed.
  47. /// </summary>
  48. private bool _isDisposed = false;
  49. #endregion
  50. #region Public properties
  51. /// <summary>
  52. /// Returns the current number of work items in the queue
  53. /// </summary>
  54. public int Count
  55. {
  56. get
  57. {
  58. return _workItems.Count;
  59. }
  60. }
  61. /// <summary>
  62. /// Returns the current number of waiters
  63. /// </summary>
  64. public int WaitersCount
  65. {
  66. get
  67. {
  68. return _waitersCount;
  69. }
  70. }
  71. #endregion
  72. #region Public methods
  73. /// <summary>
  74. /// Enqueue a work item to the queue.
  75. /// </summary>
  76. public bool EnqueueWorkItem(WorkItem workItem)
  77. {
  78. // A work item cannot be null, since null is used in the
  79. // WaitForWorkItem() method to indicate timeout or cancel
  80. if (null == workItem)
  81. {
  82. throw new ArgumentNullException("workItem", "workItem cannot be null");
  83. }
  84. bool enqueue = true;
  85. // First check if there is a waiter waiting for work item. During
  86. // the check, timed out waiters are ignored. If there is no
  87. // waiter then the work item is queued.
  88. lock (this)
  89. {
  90. ValidateNotDisposed();
  91. if (!_isWorkItemsQueueActive)
  92. {
  93. return false;
  94. }
  95. while (_waitersCount > 0)
  96. {
  97. // Dequeue a waiter.
  98. WaiterEntry waiterEntry = PopWaiter();
  99. // Signal the waiter. On success break the loop
  100. if (waiterEntry.Signal(workItem))
  101. {
  102. enqueue = false;
  103. break;
  104. }
  105. }
  106. if (enqueue)
  107. {
  108. // Enqueue the work item
  109. _workItems.Enqueue(workItem);
  110. }
  111. }
  112. return true;
  113. }
  114. /// <summary>
  115. /// Waits for a work item or exits on timeout or cancel
  116. /// </summary>
  117. /// <param name="millisecondsTimeout">Timeout in milliseconds</param>
  118. /// <param name="cancelEvent">Cancel wait handle</param>
  119. /// <returns>Returns true if the resource was granted</returns>
  120. public WorkItem DequeueWorkItem( int millisecondsTimeout, WaitHandle cancelEvent)
  121. {
  122. // This method cause the caller to wait for a work item.
  123. // If there is at least one waiting work item then the
  124. // method returns immidiately with it.
  125. //
  126. // If there are no waiting work items then the caller
  127. // is queued between other waiters for a work item to arrive.
  128. //
  129. // If a work item didn't come within millisecondsTimeout or
  130. // the user canceled the wait by signaling the cancelEvent
  131. // then the method returns null to indicate that the caller
  132. // didn't get a work item.
  133. WaiterEntry waiterEntry;
  134. WorkItem workItem = null;
  135. lock (this)
  136. {
  137. ValidateNotDisposed();
  138. // If there are waiting work items then take one and return.
  139. if (_workItems.Count > 0)
  140. {
  141. workItem = _workItems.Dequeue();
  142. return workItem;
  143. }
  144. // No waiting work items ...
  145. // Get the waiter entry for the waiters queue
  146. waiterEntry = GetThreadWaiterEntry();
  147. // Put the waiter with the other waiters
  148. PushWaiter(waiterEntry);
  149. }
  150. // Prepare array of wait handle for the WaitHandle.WaitAny()
  151. WaitHandle[] waitHandles = new WaitHandle[] { waiterEntry.WaitHandle, cancelEvent };
  152. // Wait for an available resource, cancel event, or timeout.
  153. // During the wait we are supposes to exit the synchronization
  154. // domain. (Placing true as the third argument of the WaitAny())
  155. // It just doesn't work, I don't know why, so I have two lock(this)
  156. // statments instead of one.
  157. int index = STPEventWaitHandle.WaitAny( waitHandles, millisecondsTimeout, true);
  158. lock (this)
  159. {
  160. // success is true if it got a work item.
  161. bool success = (0 == index);
  162. // The timeout variable is used only for readability.
  163. // (We treat cancel as timeout)
  164. bool timeout = !success;
  165. // On timeout update the waiterEntry that it is timed out
  166. if (timeout)
  167. {
  168. // The Timeout() fails if the waiter has already been signaled
  169. timeout = waiterEntry.Timeout();
  170. // On timeout remove the waiter from the queue.
  171. // Note that the complexity is O(1).
  172. if (timeout)
  173. {
  174. RemoveWaiter(waiterEntry, false);
  175. }
  176. // Again readability
  177. success = !timeout;
  178. }
  179. // On success return the work item
  180. if (success)
  181. {
  182. workItem = waiterEntry.WorkItem;
  183. if (null == workItem)
  184. {
  185. workItem = _workItems.Dequeue();
  186. }
  187. }
  188. }
  189. // On failure return null.
  190. return workItem;
  191. }
  192. /// <summary>
  193. /// Cleanup the work items queue, hence no more work
  194. /// items are allowed to be queue
  195. /// </summary>
  196. private void Cleanup()
  197. {
  198. lock (this)
  199. {
  200. // Deactivate only once
  201. if (!_isWorkItemsQueueActive)
  202. {
  203. return;
  204. }
  205. // Don't queue more work items
  206. _isWorkItemsQueueActive = false;
  207. foreach (WorkItem workItem in _workItems)
  208. {
  209. workItem.DisposeOfState();
  210. }
  211. // Clear the work items that are already queued
  212. _workItems.Clear();
  213. // Note:
  214. // I don't iterate over the queue and dispose of work items's states,
  215. // since if a work item has a state object that is still in use in the
  216. // application then I must not dispose it.
  217. // Tell the waiters that they were timed out.
  218. // It won't signal them to exit, but to ignore their
  219. // next work item.
  220. while (_waitersCount > 0)
  221. {
  222. WaiterEntry waiterEntry = PopWaiter();
  223. waiterEntry.Timeout();
  224. }
  225. }
  226. }
  227. public object[] GetStates()
  228. {
  229. lock (this)
  230. {
  231. object[] states = new object[_workItems.Count];
  232. int i = 0;
  233. foreach (WorkItem workItem in _workItems)
  234. {
  235. states[i] = workItem.GetWorkItemResult().State;
  236. ++i;
  237. }
  238. return states;
  239. }
  240. }
  241. #endregion
  242. #region Private methods
  243. /// <summary>
  244. /// Returns the WaiterEntry of the current thread
  245. /// </summary>
  246. /// <returns></returns>
  247. /// In order to avoid creation and destuction of WaiterEntry
  248. /// objects each thread has its own WaiterEntry object.
  249. private static WaiterEntry GetThreadWaiterEntry()
  250. {
  251. if (null == CurrentWaiterEntry)
  252. {
  253. CurrentWaiterEntry = new WaiterEntry();
  254. }
  255. CurrentWaiterEntry.Reset();
  256. return CurrentWaiterEntry;
  257. }
  258. #region Waiters stack methods
  259. /// <summary>
  260. /// Push a new waiter into the waiter's stack
  261. /// </summary>
  262. /// <param name="newWaiterEntry">A waiter to put in the stack</param>
  263. public void PushWaiter(WaiterEntry newWaiterEntry)
  264. {
  265. // Remove the waiter if it is already in the stack and
  266. // update waiter's count as needed
  267. RemoveWaiter(newWaiterEntry, false);
  268. // If the stack is empty then newWaiterEntry is the new head of the stack
  269. if (null == _headWaiterEntry._nextWaiterEntry)
  270. {
  271. _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
  272. newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
  273. }
  274. // If the stack is not empty then put newWaiterEntry as the new head
  275. // of the stack.
  276. else
  277. {
  278. // Save the old first waiter entry
  279. WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
  280. // Update the links
  281. _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
  282. newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry;
  283. newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
  284. oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry;
  285. }
  286. // Increment the number of waiters
  287. ++_waitersCount;
  288. }
  289. /// <summary>
  290. /// Pop a waiter from the waiter's stack
  291. /// </summary>
  292. /// <returns>Returns the first waiter in the stack</returns>
  293. private WaiterEntry PopWaiter()
  294. {
  295. // Store the current stack head
  296. WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
  297. // Store the new stack head
  298. WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry;
  299. // Update the old stack head list links and decrement the number
  300. // waiters.
  301. RemoveWaiter(oldFirstWaiterEntry, true);
  302. // Update the new stack head
  303. _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry;
  304. if (null != newHeadWaiterEntry)
  305. {
  306. newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry;
  307. }
  308. // Return the old stack head
  309. return oldFirstWaiterEntry;
  310. }
  311. /// <summary>
  312. /// Remove a waiter from the stack
  313. /// </summary>
  314. /// <param name="waiterEntry">A waiter entry to remove</param>
  315. /// <param name="popDecrement">If true the waiter count is always decremented</param>
  316. private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
  317. {
  318. // Store the prev entry in the list
  319. WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry;
  320. // Store the next entry in the list
  321. WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry;
  322. // A flag to indicate if we need to decrement the waiters count.
  323. // If we got here from PopWaiter then we must decrement.
  324. // If we got here from PushWaiter then we decrement only if
  325. // the waiter was already in the stack.
  326. bool decrementCounter = popDecrement;
  327. // Null the waiter's entry links
  328. waiterEntry._prevWaiterEntry = null;
  329. waiterEntry._nextWaiterEntry = null;
  330. // If the waiter entry had a prev link then update it.
  331. // It also means that the waiter is already in the list and we
  332. // need to decrement the waiters count.
  333. if (null != prevWaiterEntry)
  334. {
  335. prevWaiterEntry._nextWaiterEntry = nextWaiterEntry;
  336. decrementCounter = true;
  337. }
  338. // If the waiter entry had a next link then update it.
  339. // It also means that the waiter is already in the list and we
  340. // need to decrement the waiters count.
  341. if (null != nextWaiterEntry)
  342. {
  343. nextWaiterEntry._prevWaiterEntry = prevWaiterEntry;
  344. decrementCounter = true;
  345. }
  346. // Decrement the waiters count if needed
  347. if (decrementCounter)
  348. {
  349. --_waitersCount;
  350. }
  351. }
  352. #endregion
  353. #endregion
  354. #region WaiterEntry class
  355. // A waiter entry in the _waiters queue.
  356. public sealed class WaiterEntry : IDisposable
  357. {
  358. #region Member variables
  359. /// <summary>
  360. /// Event to signal the waiter that it got the work item.
  361. /// </summary>
  362. //private AutoResetEvent _waitHandle = new AutoResetEvent(false);
  363. private AutoResetEvent _waitHandle = new AutoResetEvent(false);
  364. /// <summary>
  365. /// Flag to know if this waiter already quited from the queue
  366. /// because of a timeout.
  367. /// </summary>
  368. private bool _isTimedout = false;
  369. /// <summary>
  370. /// Flag to know if the waiter was signaled and got a work item.
  371. /// </summary>
  372. private bool _isSignaled = false;
  373. /// <summary>
  374. /// A work item that passed directly to the waiter withou going
  375. /// through the queue
  376. /// </summary>
  377. private WorkItem _workItem = null;
  378. private bool _isDisposed = false;
  379. // Linked list members
  380. internal WaiterEntry _nextWaiterEntry = null;
  381. internal WaiterEntry _prevWaiterEntry = null;
  382. #endregion
  383. #region Construction
  384. public WaiterEntry()
  385. {
  386. Reset();
  387. }
  388. #endregion
  389. #region Public methods
  390. public WaitHandle WaitHandle
  391. {
  392. get { return _waitHandle; }
  393. }
  394. public WorkItem WorkItem
  395. {
  396. get
  397. {
  398. return _workItem;
  399. }
  400. }
  401. /// <summary>
  402. /// Signal the waiter that it got a work item.
  403. /// </summary>
  404. /// <returns>Return true on success</returns>
  405. /// The method fails if Timeout() preceded its call
  406. public bool Signal(WorkItem workItem)
  407. {
  408. lock (this)
  409. {
  410. if (!_isTimedout)
  411. {
  412. _workItem = workItem;
  413. _isSignaled = true;
  414. _waitHandle.Set();
  415. return true;
  416. }
  417. }
  418. return false;
  419. }
  420. /// <summary>
  421. /// Mark the wait entry that it has been timed out
  422. /// </summary>
  423. /// <returns>Return true on success</returns>
  424. /// The method fails if Signal() preceded its call
  425. public bool Timeout()
  426. {
  427. lock (this)
  428. {
  429. // Time out can happen only if the waiter wasn't marked as
  430. // signaled
  431. if (!_isSignaled)
  432. {
  433. // We don't remove the waiter from the queue, the DequeueWorkItem
  434. // method skips _waiters that were timed out.
  435. _isTimedout = true;
  436. return true;
  437. }
  438. }
  439. return false;
  440. }
  441. /// <summary>
  442. /// Reset the wait entry so it can be used again
  443. /// </summary>
  444. public void Reset()
  445. {
  446. _workItem = null;
  447. _isTimedout = false;
  448. _isSignaled = false;
  449. _waitHandle.Reset();
  450. }
  451. /// <summary>
  452. /// Free resources
  453. /// </summary>
  454. public void Close()
  455. {
  456. if (null != _waitHandle)
  457. {
  458. _waitHandle.Close();
  459. _waitHandle = null;
  460. }
  461. }
  462. #endregion
  463. #region IDisposable Members
  464. public void Dispose()
  465. {
  466. lock (this)
  467. {
  468. if (!_isDisposed)
  469. {
  470. Close();
  471. }
  472. _isDisposed = true;
  473. }
  474. }
  475. #endregion
  476. }
  477. #endregion
  478. #region IDisposable Members
  479. public void Dispose()
  480. {
  481. Dispose(true);
  482. GC.SuppressFinalize(this);
  483. }
  484. protected virtual void Dispose(bool disposing)
  485. {
  486. if (!_isDisposed)
  487. {
  488. _isDisposed = true;
  489. Cleanup();
  490. _headWaiterEntry.Close();
  491. }
  492. }
  493. private void ValidateNotDisposed()
  494. {
  495. if (_isDisposed)
  496. {
  497. throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
  498. }
  499. }
  500. #endregion
  501. }
  502. #endregion
  503. }