WorkItemsQueue.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. namespace Amib.Threading.Internal
  5. {
  6. #region WorkItemsQueue class
  7. /// <summary>
  8. /// WorkItemsQueue class.
  9. /// </summary>
  10. public class WorkItemsQueue : IDisposable
  11. {
  12. #region Member variables
  13. /// <summary>
  14. /// Waiters queue (implemented as stack).
  15. /// </summary>
  16. private readonly WaiterEntry _headWaiterEntry = new WaiterEntry();
  17. /// <summary>
  18. /// Waiters count
  19. /// </summary>
  20. private int _waitersCount = 0;
  21. /// <summary>
  22. /// Work items queue
  23. /// </summary>
  24. private readonly PriorityQueue _workItems = new PriorityQueue();
  25. /// <summary>
  26. /// Indicate that work items are allowed to be queued
  27. /// </summary>
  28. private bool _isWorkItemsQueueActive = true;
  29. #if (WINDOWS_PHONE)
  30. private static readonly Dictionary<int, WaiterEntry> _waiterEntries = new Dictionary<int, WaiterEntry>();
  31. #elif (_WINDOWS_CE)
  32. private static LocalDataStoreSlot _waiterEntrySlot = Thread.AllocateDataSlot();
  33. #else
  34. [ThreadStatic]
  35. private static WaiterEntry _waiterEntry;
  36. #endif
  37. /// <summary>
  38. /// Each thread in the thread pool keeps its own waiter entry.
  39. /// </summary>
  40. private static WaiterEntry CurrentWaiterEntry
  41. {
  42. #if (WINDOWS_PHONE)
  43. get
  44. {
  45. lock (_waiterEntries)
  46. {
  47. WaiterEntry waiterEntry;
  48. if (_waiterEntries.TryGetValue(Thread.CurrentThread.ManagedThreadId, out waiterEntry))
  49. {
  50. return waiterEntry;
  51. }
  52. }
  53. return null;
  54. }
  55. set
  56. {
  57. lock (_waiterEntries)
  58. {
  59. _waiterEntries[Thread.CurrentThread.ManagedThreadId] = value;
  60. }
  61. }
  62. #elif (_WINDOWS_CE)
  63. get
  64. {
  65. return Thread.GetData(_waiterEntrySlot) as WaiterEntry;
  66. }
  67. set
  68. {
  69. Thread.SetData(_waiterEntrySlot, value);
  70. }
  71. #else
  72. get
  73. {
  74. return _waiterEntry;
  75. }
  76. set
  77. {
  78. _waiterEntry = value;
  79. }
  80. #endif
  81. }
  82. /// <summary>
  83. /// A flag that indicates if the WorkItemsQueue has been disposed.
  84. /// </summary>
  85. private bool _isDisposed = false;
  86. #endregion
  87. #region Public properties
  88. /// <summary>
  89. /// Returns the current number of work items in the queue
  90. /// </summary>
  91. public int Count
  92. {
  93. get
  94. {
  95. return _workItems.Count;
  96. }
  97. }
  98. /// <summary>
  99. /// Returns the current number of waiters
  100. /// </summary>
  101. public int WaitersCount
  102. {
  103. get
  104. {
  105. return _waitersCount;
  106. }
  107. }
  108. #endregion
  109. #region Public methods
  110. /// <summary>
  111. /// Enqueue a work item to the queue.
  112. /// </summary>
  113. public bool EnqueueWorkItem(WorkItem workItem)
  114. {
  115. // A work item cannot be null, since null is used in the
  116. // WaitForWorkItem() method to indicate timeout or cancel
  117. if (null == workItem)
  118. {
  119. throw new ArgumentNullException("workItem" , "workItem cannot be null");
  120. }
  121. bool enqueue = true;
  122. // First check if there is a waiter waiting for work item. During
  123. // the check, timed out waiters are ignored. If there is no
  124. // waiter then the work item is queued.
  125. lock(this)
  126. {
  127. ValidateNotDisposed();
  128. if (!_isWorkItemsQueueActive)
  129. {
  130. return false;
  131. }
  132. while(_waitersCount > 0)
  133. {
  134. // Dequeue a waiter.
  135. WaiterEntry waiterEntry = PopWaiter();
  136. // Signal the waiter. On success break the loop
  137. if (waiterEntry.Signal(workItem))
  138. {
  139. enqueue = false;
  140. break;
  141. }
  142. }
  143. if (enqueue)
  144. {
  145. // Enqueue the work item
  146. _workItems.Enqueue(workItem);
  147. }
  148. }
  149. return true;
  150. }
  151. /// <summary>
  152. /// Waits for a work item or exits on timeout or cancel
  153. /// </summary>
  154. /// <param name="millisecondsTimeout">Timeout in milliseconds</param>
  155. /// <param name="cancelEvent">Cancel wait handle</param>
  156. /// <returns>Returns true if the resource was granted</returns>
  157. public WorkItem DequeueWorkItem(
  158. int millisecondsTimeout,
  159. WaitHandle cancelEvent)
  160. {
  161. // This method cause the caller to wait for a work item.
  162. // If there is at least one waiting work item then the
  163. // method returns immidiately with it.
  164. //
  165. // If there are no waiting work items then the caller
  166. // is queued between other waiters for a work item to arrive.
  167. //
  168. // If a work item didn't come within millisecondsTimeout or
  169. // the user canceled the wait by signaling the cancelEvent
  170. // then the method returns null to indicate that the caller
  171. // didn't get a work item.
  172. WaiterEntry waiterEntry;
  173. WorkItem workItem = null;
  174. lock (this)
  175. {
  176. ValidateNotDisposed();
  177. // If there are waiting work items then take one and return.
  178. if (_workItems.Count > 0)
  179. {
  180. workItem = _workItems.Dequeue() as WorkItem;
  181. return workItem;
  182. }
  183. // No waiting work items ...
  184. // Get the waiter entry for the waiters queue
  185. waiterEntry = GetThreadWaiterEntry();
  186. // Put the waiter with the other waiters
  187. PushWaiter(waiterEntry);
  188. }
  189. // Prepare array of wait handle for the WaitHandle.WaitAny()
  190. WaitHandle [] waitHandles = new WaitHandle[] {
  191. waiterEntry.WaitHandle,
  192. cancelEvent };
  193. // Wait for an available resource, cancel event, or timeout.
  194. // During the wait we are supposes to exit the synchronization
  195. // domain. (Placing true as the third argument of the WaitAny())
  196. // It just doesn't work, I don't know why, so I have two lock(this)
  197. // statments instead of one.
  198. int index = STPEventWaitHandle.WaitAny(
  199. waitHandles,
  200. millisecondsTimeout,
  201. true);
  202. lock(this)
  203. {
  204. // success is true if it got a work item.
  205. bool success = (0 == index);
  206. // The timeout variable is used only for readability.
  207. // (We treat cancel as timeout)
  208. bool timeout = !success;
  209. // On timeout update the waiterEntry that it is timed out
  210. if (timeout)
  211. {
  212. // The Timeout() fails if the waiter has already been signaled
  213. timeout = waiterEntry.Timeout();
  214. // On timeout remove the waiter from the queue.
  215. // Note that the complexity is O(1).
  216. if(timeout)
  217. {
  218. RemoveWaiter(waiterEntry, false);
  219. }
  220. // Again readability
  221. success = !timeout;
  222. }
  223. // On success return the work item
  224. if (success)
  225. {
  226. workItem = waiterEntry.WorkItem;
  227. if (null == workItem)
  228. {
  229. workItem = _workItems.Dequeue() as WorkItem;
  230. }
  231. }
  232. }
  233. // On failure return null.
  234. return workItem;
  235. }
  236. /// <summary>
  237. /// Cleanup the work items queue, hence no more work
  238. /// items are allowed to be queue
  239. /// </summary>
  240. private void Cleanup()
  241. {
  242. lock(this)
  243. {
  244. // Deactivate only once
  245. if (!_isWorkItemsQueueActive)
  246. {
  247. return;
  248. }
  249. // Don't queue more work items
  250. _isWorkItemsQueueActive = false;
  251. foreach(WorkItem workItem in _workItems)
  252. {
  253. workItem.DisposeOfState();
  254. }
  255. // Clear the work items that are already queued
  256. _workItems.Clear();
  257. // Note:
  258. // I don't iterate over the queue and dispose of work items's states,
  259. // since if a work item has a state object that is still in use in the
  260. // application then I must not dispose it.
  261. // Tell the waiters that they were timed out.
  262. // It won't signal them to exit, but to ignore their
  263. // next work item.
  264. while(_waitersCount > 0)
  265. {
  266. WaiterEntry waiterEntry = PopWaiter();
  267. waiterEntry.Timeout();
  268. }
  269. }
  270. }
  271. public object[] GetStates()
  272. {
  273. lock (this)
  274. {
  275. object[] states = new object[_workItems.Count];
  276. int i = 0;
  277. foreach (WorkItem workItem in _workItems)
  278. {
  279. states[i] = workItem.GetWorkItemResult().State;
  280. ++i;
  281. }
  282. return states;
  283. }
  284. }
  285. #endregion
  286. #region Private methods
  287. /// <summary>
  288. /// Returns the WaiterEntry of the current thread
  289. /// </summary>
  290. /// <returns></returns>
  291. /// In order to avoid creation and destuction of WaiterEntry
  292. /// objects each thread has its own WaiterEntry object.
  293. private static WaiterEntry GetThreadWaiterEntry()
  294. {
  295. if (null == CurrentWaiterEntry)
  296. {
  297. CurrentWaiterEntry = new WaiterEntry();
  298. }
  299. CurrentWaiterEntry.Reset();
  300. return CurrentWaiterEntry;
  301. }
  302. #region Waiters stack methods
  303. /// <summary>
  304. /// Push a new waiter into the waiter's stack
  305. /// </summary>
  306. /// <param name="newWaiterEntry">A waiter to put in the stack</param>
  307. public void PushWaiter(WaiterEntry newWaiterEntry)
  308. {
  309. // Remove the waiter if it is already in the stack and
  310. // update waiter's count as needed
  311. RemoveWaiter(newWaiterEntry, false);
  312. // If the stack is empty then newWaiterEntry is the new head of the stack
  313. if (null == _headWaiterEntry._nextWaiterEntry)
  314. {
  315. _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
  316. newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
  317. }
  318. // If the stack is not empty then put newWaiterEntry as the new head
  319. // of the stack.
  320. else
  321. {
  322. // Save the old first waiter entry
  323. WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
  324. // Update the links
  325. _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
  326. newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry;
  327. newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
  328. oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry;
  329. }
  330. // Increment the number of waiters
  331. ++_waitersCount;
  332. }
  333. /// <summary>
  334. /// Pop a waiter from the waiter's stack
  335. /// </summary>
  336. /// <returns>Returns the first waiter in the stack</returns>
  337. private WaiterEntry PopWaiter()
  338. {
  339. // Store the current stack head
  340. WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
  341. // Store the new stack head
  342. WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry;
  343. // Update the old stack head list links and decrement the number
  344. // waiters.
  345. RemoveWaiter(oldFirstWaiterEntry, true);
  346. // Update the new stack head
  347. _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry;
  348. if (null != newHeadWaiterEntry)
  349. {
  350. newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry;
  351. }
  352. // Return the old stack head
  353. return oldFirstWaiterEntry;
  354. }
  355. /// <summary>
  356. /// Remove a waiter from the stack
  357. /// </summary>
  358. /// <param name="waiterEntry">A waiter entry to remove</param>
  359. /// <param name="popDecrement">If true the waiter count is always decremented</param>
  360. private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
  361. {
  362. // Store the prev entry in the list
  363. WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry;
  364. // Store the next entry in the list
  365. WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry;
  366. // A flag to indicate if we need to decrement the waiters count.
  367. // If we got here from PopWaiter then we must decrement.
  368. // If we got here from PushWaiter then we decrement only if
  369. // the waiter was already in the stack.
  370. bool decrementCounter = popDecrement;
  371. // Null the waiter's entry links
  372. waiterEntry._prevWaiterEntry = null;
  373. waiterEntry._nextWaiterEntry = null;
  374. // If the waiter entry had a prev link then update it.
  375. // It also means that the waiter is already in the list and we
  376. // need to decrement the waiters count.
  377. if (null != prevWaiterEntry)
  378. {
  379. prevWaiterEntry._nextWaiterEntry = nextWaiterEntry;
  380. decrementCounter = true;
  381. }
  382. // If the waiter entry had a next link then update it.
  383. // It also means that the waiter is already in the list and we
  384. // need to decrement the waiters count.
  385. if (null != nextWaiterEntry)
  386. {
  387. nextWaiterEntry._prevWaiterEntry = prevWaiterEntry;
  388. decrementCounter = true;
  389. }
  390. // Decrement the waiters count if needed
  391. if (decrementCounter)
  392. {
  393. --_waitersCount;
  394. }
  395. }
  396. #endregion
  397. #endregion
  398. #region WaiterEntry class
  399. // A waiter entry in the _waiters queue.
  400. public sealed class WaiterEntry : IDisposable
  401. {
  402. #region Member variables
  403. /// <summary>
  404. /// Event to signal the waiter that it got the work item.
  405. /// </summary>
  406. //private AutoResetEvent _waitHandle = new AutoResetEvent(false);
  407. private AutoResetEvent _waitHandle = EventWaitHandleFactory.CreateAutoResetEvent();
  408. /// <summary>
  409. /// Flag to know if this waiter already quited from the queue
  410. /// because of a timeout.
  411. /// </summary>
  412. private bool _isTimedout = false;
  413. /// <summary>
  414. /// Flag to know if the waiter was signaled and got a work item.
  415. /// </summary>
  416. private bool _isSignaled = false;
  417. /// <summary>
  418. /// A work item that passed directly to the waiter withou going
  419. /// through the queue
  420. /// </summary>
  421. private WorkItem _workItem = null;
  422. private bool _isDisposed = false;
  423. // Linked list members
  424. internal WaiterEntry _nextWaiterEntry = null;
  425. internal WaiterEntry _prevWaiterEntry = null;
  426. #endregion
  427. #region Construction
  428. public WaiterEntry()
  429. {
  430. Reset();
  431. }
  432. #endregion
  433. #region Public methods
  434. public WaitHandle WaitHandle
  435. {
  436. get { return _waitHandle; }
  437. }
  438. public WorkItem WorkItem
  439. {
  440. get
  441. {
  442. return _workItem;
  443. }
  444. }
  445. /// <summary>
  446. /// Signal the waiter that it got a work item.
  447. /// </summary>
  448. /// <returns>Return true on success</returns>
  449. /// The method fails if Timeout() preceded its call
  450. public bool Signal(WorkItem workItem)
  451. {
  452. lock(this)
  453. {
  454. if (!_isTimedout)
  455. {
  456. _workItem = workItem;
  457. _isSignaled = true;
  458. _waitHandle.Set();
  459. return true;
  460. }
  461. }
  462. return false;
  463. }
  464. /// <summary>
  465. /// Mark the wait entry that it has been timed out
  466. /// </summary>
  467. /// <returns>Return true on success</returns>
  468. /// The method fails if Signal() preceded its call
  469. public bool Timeout()
  470. {
  471. lock(this)
  472. {
  473. // Time out can happen only if the waiter wasn't marked as
  474. // signaled
  475. if (!_isSignaled)
  476. {
  477. // We don't remove the waiter from the queue, the DequeueWorkItem
  478. // method skips _waiters that were timed out.
  479. _isTimedout = true;
  480. return true;
  481. }
  482. }
  483. return false;
  484. }
  485. /// <summary>
  486. /// Reset the wait entry so it can be used again
  487. /// </summary>
  488. public void Reset()
  489. {
  490. _workItem = null;
  491. _isTimedout = false;
  492. _isSignaled = false;
  493. _waitHandle.Reset();
  494. }
  495. /// <summary>
  496. /// Free resources
  497. /// </summary>
  498. public void Close()
  499. {
  500. if (null != _waitHandle)
  501. {
  502. _waitHandle.Close();
  503. _waitHandle = null;
  504. }
  505. }
  506. #endregion
  507. #region IDisposable Members
  508. public void Dispose()
  509. {
  510. lock (this)
  511. {
  512. if (!_isDisposed)
  513. {
  514. Close();
  515. }
  516. _isDisposed = true;
  517. }
  518. }
  519. #endregion
  520. }
  521. #endregion
  522. #region IDisposable Members
  523. public void Dispose()
  524. {
  525. if (!_isDisposed)
  526. {
  527. Cleanup();
  528. }
  529. _isDisposed = true;
  530. }
  531. private void ValidateNotDisposed()
  532. {
  533. if(_isDisposed)
  534. {
  535. throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
  536. }
  537. }
  538. #endregion
  539. }
  540. #endregion
  541. }