WorkItemsQueue.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. namespace Amib.Threading.Internal
  5. {
  6. #region WorkItemsQueue class
  7. /// <summary>
  8. /// WorkItemsQueue class.
  9. /// </summary>
  10. public class WorkItemsQueue : IDisposable
  11. {
  12. #region Member variables
  13. /// <summary>
  14. /// Waiters queue (implemented as stack).
  15. /// </summary>
  16. private readonly WaiterEntry _headWaiterEntry = new WaiterEntry();
  17. /// <summary>
  18. /// Waiters count
  19. /// </summary>
  20. private int _waitersCount = 0;
  21. /// <summary>
  22. /// Work items queue
  23. /// </summary>
  24. private readonly Queue<WorkItem> _workItems = new Queue<WorkItem>();
  25. /// <summary>
  26. /// Indicate that work items are allowed to be queued
  27. /// </summary>
  28. private bool _isWorkItemsQueueActive = true;
  29. [ThreadStatic]
  30. private static WaiterEntry _waiterEntry;
  31. /// <summary>
  32. /// A flag that indicates if the WorkItemsQueue has been disposed.
  33. /// </summary>
  34. private bool _isDisposed = false;
  35. #endregion
  36. #region Public properties
  37. /// <summary>
  38. /// Returns the current number of work items in the queue
  39. /// </summary>
  40. public int Count
  41. {
  42. get
  43. {
  44. return _workItems.Count;
  45. }
  46. }
  47. /// <summary>
  48. /// Returns the current number of waiters
  49. /// </summary>
  50. public int WaitersCount
  51. {
  52. get
  53. {
  54. return _waitersCount;
  55. }
  56. }
  57. #endregion
  58. #region Public methods
  59. /// <summary>
  60. /// Enqueue a work item to the queue.
  61. /// </summary>
  62. public bool EnqueueWorkItem(WorkItem workItem)
  63. {
  64. // A work item cannot be null, since null is used in the
  65. // WaitForWorkItem() method to indicate timeout or cancel
  66. if (workItem == null)
  67. {
  68. throw new ArgumentNullException("workItem", "workItem cannot be null");
  69. }
  70. // First check if there is a waiter waiting for work item. During
  71. // the check, timed out waiters are ignored. If there is no
  72. // waiter then the work item is queued.
  73. lock (this)
  74. {
  75. ValidateNotDisposed();
  76. if (!_isWorkItemsQueueActive)
  77. return false;
  78. while (_waitersCount > 0)
  79. {
  80. // Dequeue a waiter.
  81. WaiterEntry waiterEntry = PopWaiter();
  82. // Signal the waiter. On success break the loop
  83. if (waiterEntry.Signal(workItem))
  84. return true;
  85. }
  86. // Enqueue the work item
  87. _workItems.Enqueue(workItem);
  88. }
  89. return true;
  90. }
  91. public void CloseThreadWaiter()
  92. {
  93. if(_waiterEntry != null)
  94. {
  95. _waiterEntry.Close();
  96. _waiterEntry = null;
  97. }
  98. }
  99. /// <summary>
  100. /// Waits for a work item or exits on timeout or cancel
  101. /// </summary>
  102. /// <param name="millisecondsTimeout">Timeout in milliseconds</param>
  103. /// <param name="cancelEvent">Cancel wait handle</param>
  104. /// <returns>Returns true if the resource was granted</returns>
  105. public WorkItem DequeueWorkItem( int millisecondsTimeout, WaitHandle cancelEvent)
  106. {
  107. // This method cause the caller to wait for a work item.
  108. // If there is at least one waiting work item then the
  109. // method returns immidiately with it.
  110. //
  111. // If there are no waiting work items then the caller
  112. // is queued between other waiters for a work item to arrive.
  113. //
  114. // If a work item didn't come within millisecondsTimeout or
  115. // the user canceled the wait by signaling the cancelEvent
  116. // then the method returns null to indicate that the caller
  117. // didn't get a work item.
  118. WaiterEntry waiterEntry;
  119. lock (this)
  120. {
  121. ValidateNotDisposed();
  122. // If there are waiting work items then take one and return.
  123. if (_workItems.Count > 0)
  124. return _workItems.Dequeue();
  125. // No waiting work items ...
  126. // Get the waiter entry for the waiters queue
  127. waiterEntry = GetThreadWaiterEntry();
  128. // Put the waiter with the other waiters
  129. PushWaiter(waiterEntry);
  130. }
  131. // Prepare array of wait handle for the WaitHandle.WaitAny()
  132. WaitHandle[] waitHandles = new WaitHandle[] { waiterEntry.WaitHandle, cancelEvent };
  133. // Wait for an available resource, cancel event, or timeout.
  134. // During the wait we are supposes to exit the synchronization
  135. // domain. (Placing true as the third argument of the WaitAny())
  136. // It just doesn't work, I don't know why, so I have two lock(this)
  137. // statments instead of one.
  138. int index = STPEventWaitHandle.WaitAny( waitHandles, millisecondsTimeout, true);
  139. lock (this)
  140. {
  141. // On timeout update the waiterEntry that it is timed out
  142. if (index != 0)
  143. {
  144. // The Timeout() fails if the waiter has already been signaled
  145. // On timeout remove the waiter from the queue.
  146. // Note that the complexity is O(1).
  147. if (waiterEntry.Timeout())
  148. {
  149. RemoveWaiter(waiterEntry, false);
  150. return null;
  151. }
  152. }
  153. // On success return the work item
  154. WorkItem workItem = waiterEntry.WorkItem;
  155. if (workItem == null)
  156. workItem = _workItems.Dequeue();
  157. return workItem;
  158. }
  159. }
  160. /// <summary>
  161. /// Cleanup the work items queue, hence no more work
  162. /// items are allowed to be queue
  163. /// </summary>
  164. private void Cleanup()
  165. {
  166. lock (this)
  167. {
  168. // Deactivate only once
  169. if (!_isWorkItemsQueueActive)
  170. {
  171. return;
  172. }
  173. // Don't queue more work items
  174. _isWorkItemsQueueActive = false;
  175. foreach (WorkItem workItem in _workItems)
  176. {
  177. workItem.DisposeOfState();
  178. }
  179. // Clear the work items that are already queued
  180. _workItems.Clear();
  181. // Note:
  182. // I don't iterate over the queue and dispose of work items's states,
  183. // since if a work item has a state object that is still in use in the
  184. // application then I must not dispose it.
  185. // Tell the waiters that they were timed out.
  186. // It won't signal them to exit, but to ignore their
  187. // next work item.
  188. while (_waitersCount > 0)
  189. {
  190. WaiterEntry waiterEntry = PopWaiter();
  191. waiterEntry.Timeout();
  192. }
  193. }
  194. }
  195. public object[] GetStates()
  196. {
  197. lock (this)
  198. {
  199. object[] states = new object[_workItems.Count];
  200. int i = 0;
  201. foreach (WorkItem workItem in _workItems)
  202. {
  203. states[i] = workItem.GetWorkItemResult().State;
  204. ++i;
  205. }
  206. return states;
  207. }
  208. }
  209. #endregion
  210. #region Private methods
  211. /// <summary>
  212. /// Returns the WaiterEntry of the current thread
  213. /// </summary>
  214. /// <returns></returns>
  215. /// In order to avoid creation and destuction of WaiterEntry
  216. /// objects each thread has its own WaiterEntry object.
  217. private static WaiterEntry GetThreadWaiterEntry()
  218. {
  219. if (_waiterEntry == null)
  220. {
  221. _waiterEntry = new WaiterEntry();
  222. }
  223. else
  224. _waiterEntry.Reset();
  225. return _waiterEntry;
  226. }
  227. #region Waiters stack methods
  228. /// <summary>
  229. /// Push a new waiter into the waiter's stack
  230. /// </summary>
  231. /// <param name="newWaiterEntry">A waiter to put in the stack</param>
  232. public void PushWaiter(WaiterEntry newWaiterEntry)
  233. {
  234. // Remove the waiter if it is already in the stack and
  235. // update waiter's count as needed
  236. RemoveWaiter(newWaiterEntry, false);
  237. // If the stack is empty then newWaiterEntry is the new head of the stack
  238. if (null == _headWaiterEntry._nextWaiterEntry)
  239. {
  240. _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
  241. newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
  242. }
  243. // If the stack is not empty then put newWaiterEntry as the new head
  244. // of the stack.
  245. else
  246. {
  247. // Save the old first waiter entry
  248. WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
  249. // Update the links
  250. _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
  251. newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry;
  252. newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
  253. oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry;
  254. }
  255. // Increment the number of waiters
  256. ++_waitersCount;
  257. }
  258. /// <summary>
  259. /// Pop a waiter from the waiter's stack
  260. /// </summary>
  261. /// <returns>Returns the first waiter in the stack</returns>
  262. private WaiterEntry PopWaiter()
  263. {
  264. // Store the current stack head
  265. WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
  266. // Store the new stack head
  267. WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry;
  268. // Update the old stack head list links and decrement the number
  269. // waiters.
  270. RemoveWaiter(oldFirstWaiterEntry, true);
  271. // Update the new stack head
  272. _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry;
  273. if (newHeadWaiterEntry != null)
  274. {
  275. newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry;
  276. }
  277. // Return the old stack head
  278. return oldFirstWaiterEntry;
  279. }
  280. /// <summary>
  281. /// Remove a waiter from the stack
  282. /// </summary>
  283. /// <param name="waiterEntry">A waiter entry to remove</param>
  284. /// <param name="popDecrement">If true the waiter count is always decremented</param>
  285. private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
  286. {
  287. // Store the prev entry in the list
  288. WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry;
  289. waiterEntry._prevWaiterEntry = null;
  290. // Store the next entry in the list
  291. WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry;
  292. waiterEntry._nextWaiterEntry = null;
  293. // popDecrement indicate if we need to decrement the waiters count.
  294. // If we got here from PopWaiter then we must decrement.
  295. // If we got here from PushWaiter then we decrement only if
  296. // the waiter was already in the stack.
  297. // If the waiter entry had a prev link then update it.
  298. // It also means that the waiter is already in the list and we
  299. // need to decrement the waiters count.
  300. if (prevWaiterEntry != null)
  301. {
  302. prevWaiterEntry._nextWaiterEntry = nextWaiterEntry;
  303. popDecrement = true;
  304. }
  305. // If the waiter entry had a next link then update it.
  306. // It also means that the waiter is already in the list and we
  307. // need to decrement the waiters count.
  308. if (nextWaiterEntry != null)
  309. {
  310. nextWaiterEntry._prevWaiterEntry = prevWaiterEntry;
  311. popDecrement = true;
  312. }
  313. // Decrement the waiters count if needed
  314. if (popDecrement)
  315. --_waitersCount;
  316. }
  317. #endregion
  318. #endregion
  319. #region WaiterEntry class
  320. // A waiter entry in the _waiters queue.
  321. public sealed class WaiterEntry : IDisposable
  322. {
  323. #region Member variables
  324. /// <summary>
  325. /// Event to signal the waiter that it got the work item.
  326. /// </summary>
  327. private AutoResetEvent _waitHandle = new AutoResetEvent(false);
  328. /// <summary>
  329. /// Flag to know if this waiter already quited from the queue
  330. /// because of a timeout.
  331. /// </summary>
  332. private bool _isTimedout = false;
  333. /// <summary>
  334. /// Flag to know if the waiter was signaled and got a work item.
  335. /// </summary>
  336. private bool _isSignaled = false;
  337. /// <summary>
  338. /// A work item that passed directly to the waiter withou going
  339. /// through the queue
  340. /// </summary>
  341. private WorkItem _workItem = null;
  342. private bool _isDisposed = false;
  343. // Linked list members
  344. internal WaiterEntry _nextWaiterEntry = null;
  345. internal WaiterEntry _prevWaiterEntry = null;
  346. #endregion
  347. #region Construction
  348. public WaiterEntry()
  349. {
  350. }
  351. #endregion
  352. #region Public methods
  353. public WaitHandle WaitHandle
  354. {
  355. get { return _waitHandle; }
  356. }
  357. public WorkItem WorkItem
  358. {
  359. get
  360. {
  361. return _workItem;
  362. }
  363. }
  364. /// <summary>
  365. /// Signal the waiter that it got a work item.
  366. /// </summary>
  367. /// <returns>Return true on success</returns>
  368. /// The method fails if Timeout() preceded its call
  369. public bool Signal(WorkItem workItem)
  370. {
  371. lock (this)
  372. {
  373. if (!_isTimedout)
  374. {
  375. _workItem = workItem;
  376. _isSignaled = true;
  377. _waitHandle.Set();
  378. return true;
  379. }
  380. }
  381. return false;
  382. }
  383. /// <summary>
  384. /// Mark the wait entry that it has been timed out
  385. /// </summary>
  386. /// <returns>Return true on success</returns>
  387. /// The method fails if Signal() preceded its call
  388. public bool Timeout()
  389. {
  390. lock (this)
  391. {
  392. // Time out can happen only if the waiter wasn't marked as
  393. // signaled
  394. if (!_isSignaled)
  395. {
  396. // We don't remove the waiter from the queue, the DequeueWorkItem
  397. // method skips _waiters that were timed out.
  398. _isTimedout = true;
  399. return true;
  400. }
  401. }
  402. return false;
  403. }
  404. /// <summary>
  405. /// Reset the wait entry so it can be used again
  406. /// </summary>
  407. public void Reset()
  408. {
  409. _workItem = null;
  410. _isTimedout = false;
  411. _isSignaled = false;
  412. _waitHandle.Reset();
  413. }
  414. /// <summary>
  415. /// Free resources
  416. /// </summary>
  417. public void Close()
  418. {
  419. _workItem = null;
  420. if (null != _waitHandle)
  421. {
  422. _waitHandle.Close();
  423. _waitHandle = null;
  424. }
  425. }
  426. #endregion
  427. #region IDisposable Members
  428. public void Dispose()
  429. {
  430. lock (this)
  431. {
  432. if (!_isDisposed)
  433. {
  434. Close();
  435. _isDisposed = true;
  436. }
  437. }
  438. }
  439. #endregion
  440. }
  441. #endregion
  442. #region IDisposable Members
  443. public void Dispose()
  444. {
  445. Dispose(true);
  446. GC.SuppressFinalize(this);
  447. }
  448. protected virtual void Dispose(bool disposing)
  449. {
  450. if (!_isDisposed)
  451. {
  452. _isDisposed = true;
  453. Cleanup();
  454. _headWaiterEntry.Close();
  455. }
  456. }
  457. private void ValidateNotDisposed()
  458. {
  459. if (_isDisposed)
  460. {
  461. throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
  462. }
  463. }
  464. #endregion
  465. }
  466. #endregion
  467. }