1
0

WorkItemsGroup.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. // Ami Bar
  2. // [email protected]
  3. using System;
  4. using System.Threading;
  5. using System.Runtime.CompilerServices;
  6. using System.Diagnostics;
  7. namespace Amib.Threading.Internal
  8. {
  9. #region WorkItemsGroup class
  /// <summary>
  /// A named group of work items that runs on top of a SmartThreadPool.
  /// Work items are first held in the group's own priority queue and are handed
  /// to the pool so that at most <c>concurrency</c> of them are in the pool at once.
  /// </summary>
  public class WorkItemsGroup : IWorkItemsGroup
  {
      #region Private members

      /// <summary>
      /// Guards the group's queue, its counters, the cancel token and the
      /// StartSuspended flag. A private object is used so that code outside
      /// this class cannot contend on (or deadlock with) the same monitor.
      /// </summary>
      private readonly object _lock = new object();

      /// <summary>
      /// Contains the name of this WorkItemsGroup instance.
      /// Can be changed by the user; it is only used in trace messages here.
      /// </summary>
      private string _name = "WorkItemsGroup";

      /// <summary>
      /// A reference to the SmartThreadPool instance that created this
      /// WorkItemsGroup.
      /// </summary>
      private SmartThreadPool _stp;

      /// <summary>
      /// The OnIdle event
      /// </summary>
      private event WorkItemsGroupIdleHandler _onIdle;

      /// <summary>
      /// Defines how many work items of this WorkItemsGroup can run at once.
      /// </summary>
      private readonly int _concurrency;

      /// <summary>
      /// Priority queue to hold work items before they are passed
      /// to the SmartThreadPool.
      /// </summary>
      private PriorityQueue _workItemsQueue;

      /// <summary>
      /// Indicates how many work items of this group are currently held by the
      /// SmartThreadPool. Incremented when an item is handed to the pool and
      /// decremented when an item completes.
      /// This value is used to apply the concurrency.
      /// </summary>
      private int _workItemsInStpQueue;

      /// <summary>
      /// Indicates how many work items are currently running in the SmartThreadPool.
      /// Incremented when a work item reports that it started and decremented
      /// when it reports completion.
      /// </summary>
      private int _workItemsExecutingInStp = 0;

      /// <summary>
      /// WorkItemsGroup start information (a private copy of the caller's instance).
      /// </summary>
      private WIGStartInfo _workItemsGroupStartInfo;

      /// <summary>
      /// Signaled when all of the WorkItemsGroup's work items completed.
      /// Starts signaled because a new group has no work.
      /// </summary>
      private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);

      /// <summary>
      /// A common object for all the work items that this work items group
      /// generates, so we can mark them all as canceled in O(1).
      /// Cancel() flags the current instance and then replaces it with a fresh one.
      /// </summary>
      private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();

      #endregion

      #region Construction

      /// <summary>
      /// Creates a work items group on top of the given pool.
      /// </summary>
      /// <param name="stp">The SmartThreadPool that executes the group's work items</param>
      /// <param name="concurrency">Maximum number of work items handed to the pool at once; must be positive</param>
      /// <param name="wigStartInfo">Start information; it is copied, not referenced</param>
      /// <exception cref="ArgumentOutOfRangeException">concurrency is zero or negative</exception>
      /// <exception cref="ArgumentNullException">stp is null</exception>
      public WorkItemsGroup(
          SmartThreadPool stp,
          int concurrency,
          WIGStartInfo wigStartInfo)
      {
          if (concurrency <= 0)
          {
              throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero");
          }

          // Fail here rather than with a NullReferenceException on the first enqueue.
          if (null == stp)
          {
              throw new ArgumentNullException("stp");
          }

          _stp = stp;
          _concurrency = concurrency;
          _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo);
          _workItemsQueue = new PriorityQueue();

          // The _workItemsInStpQueue gets the number of currently executing work items,
          // because once a work item is executing, it cannot be cancelled.
          // (Both counters are zero at this point.)
          _workItemsInStpQueue = _workItemsExecutingInStp;
      }

      #endregion

      #region IWorkItemsGroup implementation

      /// <summary>
      /// Get/Set the name of this WorkItemsGroup instance
      /// </summary>
      public string Name
      {
          get
          {
              return _name;
          }
          set
          {
              _name = value;
          }
      }

      /// <summary>
      /// Queue a work item
      /// </summary>
      /// <param name="callback">A callback to execute</param>
      /// <returns>Returns a work item result</returns>
      public IWorkItemResult QueueWorkItem(WorkItemCallback callback)
      {
          return EnqueueAndGetResult(
              WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback));
      }

      /// <summary>
      /// Queue a work item
      /// </summary>
      /// <param name="callback">A callback to execute</param>
      /// <param name="workItemPriority">The priority of the work item</param>
      /// <returns>Returns a work item result</returns>
      public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority)
      {
          return EnqueueAndGetResult(
              WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority));
      }

      /// <summary>
      /// Queue a work item
      /// </summary>
      /// <param name="workItemInfo">Work item info</param>
      /// <param name="callback">A callback to execute</param>
      /// <returns>Returns a work item result</returns>
      public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback)
      {
          return EnqueueAndGetResult(
              WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback));
      }

      /// <summary>
      /// Queue a work item
      /// </summary>
      /// <param name="callback">A callback to execute</param>
      /// <param name="state">
      /// The context object of the work item. Used for passing arguments to the work item.
      /// </param>
      /// <returns>Returns a work item result</returns>
      public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state)
      {
          return EnqueueAndGetResult(
              WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state));
      }

      /// <summary>
      /// Queue a work item
      /// </summary>
      /// <param name="callback">A callback to execute</param>
      /// <param name="state">
      /// The context object of the work item. Used for passing arguments to the work item.
      /// </param>
      /// <param name="workItemPriority">The work item priority</param>
      /// <returns>Returns a work item result</returns>
      public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority)
      {
          return EnqueueAndGetResult(
              WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority));
      }

      /// <summary>
      /// Queue a work item
      /// </summary>
      /// <param name="workItemInfo">Work item information</param>
      /// <param name="callback">A callback to execute</param>
      /// <param name="state">
      /// The context object of the work item. Used for passing arguments to the work item.
      /// </param>
      /// <returns>Returns a work item result</returns>
      public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
      {
          return EnqueueAndGetResult(
              WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state));
      }

      /// <summary>
      /// Queue a work item
      /// </summary>
      /// <param name="callback">A callback to execute</param>
      /// <param name="state">
      /// The context object of the work item. Used for passing arguments to the work item.
      /// </param>
      /// <param name="postExecuteWorkItemCallback">
      /// A delegate to call after the callback completion
      /// </param>
      /// <returns>Returns a work item result</returns>
      public IWorkItemResult QueueWorkItem(
          WorkItemCallback callback,
          object state,
          PostExecuteWorkItemCallback postExecuteWorkItemCallback)
      {
          return EnqueueAndGetResult(
              WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback));
      }

      /// <summary>
      /// Queue a work item
      /// </summary>
      /// <param name="callback">A callback to execute</param>
      /// <param name="state">
      /// The context object of the work item. Used for passing arguments to the work item.
      /// </param>
      /// <param name="postExecuteWorkItemCallback">
      /// A delegate to call after the callback completion
      /// </param>
      /// <param name="workItemPriority">The work item priority</param>
      /// <returns>Returns a work item result</returns>
      public IWorkItemResult QueueWorkItem(
          WorkItemCallback callback,
          object state,
          PostExecuteWorkItemCallback postExecuteWorkItemCallback,
          WorkItemPriority workItemPriority)
      {
          return EnqueueAndGetResult(
              WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority));
      }

      /// <summary>
      /// Queue a work item
      /// </summary>
      /// <param name="callback">A callback to execute</param>
      /// <param name="state">
      /// The context object of the work item. Used for passing arguments to the work item.
      /// </param>
      /// <param name="postExecuteWorkItemCallback">
      /// A delegate to call after the callback completion
      /// </param>
      /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
      /// <returns>Returns a work item result</returns>
      public IWorkItemResult QueueWorkItem(
          WorkItemCallback callback,
          object state,
          PostExecuteWorkItemCallback postExecuteWorkItemCallback,
          CallToPostExecute callToPostExecute)
      {
          return EnqueueAndGetResult(
              WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute));
      }

      /// <summary>
      /// Queue a work item
      /// </summary>
      /// <param name="callback">A callback to execute</param>
      /// <param name="state">
      /// The context object of the work item. Used for passing arguments to the work item.
      /// </param>
      /// <param name="postExecuteWorkItemCallback">
      /// A delegate to call after the callback completion
      /// </param>
      /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
      /// <param name="workItemPriority">The work item priority</param>
      /// <returns>Returns a work item result</returns>
      public IWorkItemResult QueueWorkItem(
          WorkItemCallback callback,
          object state,
          PostExecuteWorkItemCallback postExecuteWorkItemCallback,
          CallToPostExecute callToPostExecute,
          WorkItemPriority workItemPriority)
      {
          return EnqueueAndGetResult(
              WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority));
      }

      /// <summary>
      /// Wait, without a time limit, for the work items group to be idle
      /// </summary>
      public void WaitForIdle()
      {
          WaitForIdle(Timeout.Infinite);
      }

      /// <summary>
      /// Wait for the work items group to be idle
      /// </summary>
      /// <param name="timeout">
      /// Maximum time to wait. After truncation to whole milliseconds it must be
      /// -1 (wait indefinitely) or between 0 and Int32.MaxValue.
      /// </param>
      /// <returns>true if the group became idle, false if the timeout elapsed first</returns>
      /// <exception cref="ArgumentOutOfRangeException">timeout is outside the accepted range</exception>
      public bool WaitForIdle(TimeSpan timeout)
      {
          // Truncate to whole milliseconds first and only then range check, the same
          // way WaitHandle.WaitOne(TimeSpan, bool) does. A bare (int) cast of an
          // out-of-range double yields an unspecified value that could silently
          // become a valid (and wrong) timeout.
          long totalMilliseconds = (long)timeout.TotalMilliseconds;
          if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
          {
              throw new ArgumentOutOfRangeException(
                  "timeout",
                  timeout,
                  "timeout must be -1 millisecond (infinite) or between 0 and Int32.MaxValue milliseconds");
          }

          return WaitForIdle((int)totalMilliseconds);
      }

      /// <summary>
      /// Wait for the work items group to be idle
      /// </summary>
      /// <param name="millisecondsTimeout">
      /// Maximum time to wait in milliseconds, or Timeout.Infinite
      /// </param>
      /// <returns>true if the group became idle, false if the timeout elapsed first</returns>
      public bool WaitForIdle(int millisecondsTimeout)
      {
          // The pool gets a chance to reject the wait before we block.
          // NOTE(review): presumably this guards against a work item of this group
          // waiting on its own group - confirm in SmartThreadPool.
          _stp.ValidateWorkItemsGroupWaitForIdle(this);
          return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false);
      }

      /// <summary>
      /// The number of work items still held in this group's own queue,
      /// i.e. not yet handed to the SmartThreadPool.
      /// </summary>
      public int WaitingCallbacks
      {
          get
          {
              return _workItemsQueue.Count;
          }
      }

      /// <summary>
      /// Raised (from a pool work item) each time the group is found idle.
      /// </summary>
      public event WorkItemsGroupIdleHandler OnIdle
      {
          add
          {
              _onIdle += value;
          }
          remove
          {
              _onIdle -= value;
          }
      }

      /// <summary>
      /// Cancel the work items of this group: flags the shared cancel token,
      /// empties the group's queue and resets the in-pool counter.
      /// </summary>
      public void Cancel()
      {
          lock (_lock)
          {
              // Every work item enqueued so far references this token.
              _canceledWorkItemsGroup.IsCanceled = true;
              _workItemsQueue.Clear();
              _workItemsInStpQueue = 0;

              // Work items enqueued from now on get a fresh, not canceled token.
              _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
          }
      }

      /// <summary>
      /// Start a group that was created suspended. Does nothing when the group
      /// is not suspended, so only the first call has an effect.
      /// </summary>
      public void Start()
      {
          // Use the same lock that EnqueueToSTPNextWorkItem holds while reading
          // StartSuspended, otherwise the flag update is not synchronized with it.
          lock (_lock)
          {
              if (!_workItemsGroupStartInfo.StartSuspended)
              {
                  return;
              }
              _workItemsGroupStartInfo.StartSuspended = false;
          }

          EnqueueUpToConcurrency();
      }

      #endregion

      #region SmartThreadPool notifications

      /// <summary>
      /// Called when the owning SmartThreadPool starts. Unless this group is
      /// itself suspended, pushes queued work items to the pool.
      /// </summary>
      public void OnSTPIsStarting()
      {
          lock (_lock)
          {
              if (_workItemsGroupStartInfo.StartSuspended)
              {
                  return;
              }
          }

          EnqueueUpToConcurrency();
      }

      #endregion

      #region Private methods

      /// <summary>
      /// Puts the work item into the group and returns its result object.
      /// Shared tail of all the QueueWorkItem overloads.
      /// </summary>
      private IWorkItemResult EnqueueAndGetResult(WorkItem workItem)
      {
          EnqueueToSTPNextWorkItem(workItem);
          return workItem.GetWorkItemResult();
      }

      /// <summary>
      /// Tries, once per concurrency slot, to move a queued work item to the pool.
      /// </summary>
      private void EnqueueUpToConcurrency()
      {
          // NOTE(review): when the queue is empty and nothing is in the pool, every
          // pass takes the idle branch of EnqueueToSTPNextWorkItem, so OnIdle is
          // queued once per pass. Behavior kept as is - confirm whether intended.
          for (int i = 0; i < _concurrency; ++i)
          {
              EnqueueToSTPNextWorkItem(null, false);
          }
      }

      /// <summary>
      /// Subscribes this group to the started/completed notifications of a work item.
      /// </summary>
      private void RegisterToWorkItemCompletion(IWorkItemResult wir)
      {
          // A direct cast fails with a descriptive InvalidCastException if the result
          // is of an unexpected type, instead of an anonymous NullReferenceException.
          IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
          iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback);
          iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback);
      }

      /// <summary>
      /// Work item body queued to the pool when the group becomes idle;
      /// raises the OnIdle event.
      /// </summary>
      private object FireOnIdle(object state)
      {
          FireOnIdleImpl(_onIdle);
          return null;
      }

      /// <summary>
      /// Invokes every OnIdle subscriber. A failing subscriber does not prevent
      /// the remaining subscribers from being called.
      /// </summary>
      [MethodImpl(MethodImplOptions.NoInlining)]
      private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
      {
          if (null == onIdle)
          {
              return;
          }

          Delegate[] delegates = onIdle.GetInvocationList();
          foreach (WorkItemsGroupIdleHandler eh in delegates)
          {
              try
              {
                  eh(this);
              }
              catch (Exception e)
              {
                  // Still best effort: the exception is not propagated, but it is
                  // traced so that a broken handler can be diagnosed.
                  Trace.WriteLine("WorkItemsGroup " + Name + " OnIdle handler failed: " + e);
              }
          }
      }

      /// <summary>
      /// Work item started notification: counts one more executing work item.
      /// </summary>
      private void OnWorkItemStartedCallback(WorkItem workItem)
      {
          lock (_lock)
          {
              ++_workItemsExecutingInStp;
          }
      }

      /// <summary>
      /// Work item completed notification: releases its slot and tries to push
      /// the next queued work item to the pool.
      /// </summary>
      private void OnWorkItemCompletedCallback(WorkItem workItem)
      {
          EnqueueToSTPNextWorkItem(null, true);
      }

      /// <summary>
      /// Enqueues a new work item without releasing a concurrency slot.
      /// </summary>
      private void EnqueueToSTPNextWorkItem(WorkItem workItem)
      {
          EnqueueToSTPNextWorkItem(workItem, false);
      }

      /// <summary>
      /// The group's single scheduling step, executed under the group lock:
      /// optionally releases the slot of a completed work item, optionally adds a
      /// new work item to the group's queue, detects the idle state, and hands at
      /// most one queued work item to the pool if a concurrency slot is free.
      /// </summary>
      /// <param name="workItem">A new work item to add to the group, or null to only schedule</param>
      /// <param name="decrementWorkItemsInStpQueue">
      /// true when called because a work item completed, so the counters are decremented
      /// </param>
      private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
      {
          lock (_lock)
          {
              // Got here from OnWorkItemCompletedCallback()
              if (decrementWorkItemsInStpQueue)
              {
                  // Clamp at zero: Cancel() resets the in-pool counter while
                  // work items may still complete afterwards.
                  _workItemsInStpQueue = Math.Max(0, _workItemsInStpQueue - 1);
                  _workItemsExecutingInStp = Math.Max(0, _workItemsExecutingInStp - 1);
              }

              // If the work item is not null then enqueue it
              if (null != workItem)
              {
                  workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;

                  RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
                  _workItemsQueue.Enqueue(workItem);
                  //_stp.IncrementWorkItemsCount();

                  // First work item after the group had nothing queued and nothing
                  // in the pool: the group leaves the idle state.
                  if ((1 == _workItemsQueue.Count) &&
                      (0 == _workItemsInStpQueue))
                  {
                      _stp.RegisterWorkItemsGroup(this);
                      Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle");
                      _isIdleWaitHandle.Reset();
                  }
              }

              // If the work items queue of the group is empty then quit
              if (0 == _workItemsQueue.Count)
              {
                  // Nothing queued and nothing in the pool: the group is idle.
                  if (0 == _workItemsInStpQueue)
                  {
                      _stp.UnregisterWorkItemsGroup(this);
                      Trace.WriteLine("WorkItemsGroup " + Name + " is idle");
                      _isIdleWaitHandle.Set();

                      // The OnIdle subscribers run on a pool thread, not under this lock.
                      _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle));
                  }
                  return;
              }

              // A suspended group only accumulates work items in its own queue.
              if (!_workItemsGroupStartInfo.StartSuspended)
              {
                  if (_workItemsInStpQueue < _concurrency)
                  {
                      WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
                      _stp.Enqueue(nextWorkItem, true);
                      ++_workItemsInStpQueue;
                  }
              }
          }
      }

      #endregion
  }
  450. #endregion
  451. }