WorkItemsGroup.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Runtime.CompilerServices;
  5. using System.Diagnostics;
  6. namespace Amib.Threading.Internal
  7. {
  8. #region WorkItemsGroup class
  /// <summary>
  /// A group of work items that is executed by a <see cref="SmartThreadPool"/>.
  /// Work items wait in a local FIFO queue and are handed to the pool one at a
  /// time, and only while the group's in-pool counter is below
  /// <see cref="Concurrency"/>.
  /// </summary>
  public class WorkItemsGroup : WorkItemsGroupBase
  {
      #region Private members

      /// <summary>
      /// Monitor taken by GetStates, Cancel, the work-item-started callback and
      /// the dispatch routine while they touch the queue, the counters and the
      /// cancellation marker.
      /// </summary>
      private readonly object _lock = new object();

      /// <summary>
      /// A reference to the SmartThreadPool instance that created this
      /// WorkItemsGroup.
      /// </summary>
      private readonly SmartThreadPool _stp;

      /// <summary>
      /// The OnIdle event
      /// </summary>
      private event WorkItemsGroupIdleHandler _onIdle;

      /// <summary>
      /// A flag to indicate if the Work Items Group is now suspended.
      /// While it is set, queued work items are not handed to the pool.
      /// </summary>
      private bool _isSuspended;

      /// <summary>
      /// Defines how many work items of this WorkItemsGroup can run at once.
      /// </summary>
      private int _concurrency;

      /// <summary>
      /// FIFO queue that holds work items before they are passed
      /// to the SmartThreadPool.
      /// </summary>
      private readonly Queue<WorkItem> _workItemsQueue;

      /// <summary>
      /// Number of work items handed to the SmartThreadPool and not yet reported
      /// as completed: incremented on dispatch, decremented (never below zero)
      /// by the completion callback, and reset to zero by Cancel.
      /// This value is used to apply the concurrency.
      /// </summary>
      private int _workItemsInStpQueue;

      /// <summary>
      /// Indicate how many work items are currently running in the SmartThreadPool:
      /// incremented by the started callback, decremented (never below zero)
      /// by the completion callback.
      /// </summary>
      private int _workItemsExecutingInStp = 0;

      /// <summary>
      /// WorkItemsGroup start information
      /// </summary>
      private readonly WIGStartInfo _workItemsGroupStartInfo;

      /// <summary>
      /// Signaled while the group is idle; created signaled, reset when the
      /// first work item arrives and set again by the dispatch routine once the
      /// local queue is empty and the in-pool counter is zero.
      /// </summary>
      private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);

      /// <summary>
      /// A common object for all the work items that this work items group
      /// generate so we can mark them to cancel in O(1)
      /// </summary>
      private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();

      #endregion

      #region Construction

      /// <summary>
      /// Creates a work items group bound to the given pool.
      /// </summary>
      /// <param name="stp">The SmartThreadPool that will execute the work items.</param>
      /// <param name="concurrency">Concurrency limit of the group; must be greater than zero.</param>
      /// <param name="wigStartInfo">Start information; a read-only copy of it is kept.</param>
      /// <exception cref="ArgumentOutOfRangeException">
      /// <paramref name="concurrency"/> is zero or negative.
      /// </exception>
      public WorkItemsGroup(
          SmartThreadPool stp,
          int concurrency,
          WIGStartInfo wigStartInfo)
      {
          if (concurrency <= 0)
          {
              throw new ArgumentOutOfRangeException(
                  "concurrency",
                  concurrency,
                  "concurrency must be greater than zero");
          }

          _stp = stp;
          _concurrency = concurrency;
          _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
          _workItemsQueue = new Queue<WorkItem>();
          Name = "WorkItemsGroup";

          // The _workItemsInStpQueue gets the number of currently executing work items,
          // because once a work item is executing, it cannot be cancelled.
          // (At construction time that number is zero.)
          _workItemsInStpQueue = _workItemsExecutingInStp;

          _isSuspended = _workItemsGroupStartInfo.StartSuspended;
      }

      #endregion

      #region WorkItemsGroupBase Overrides

      /// <summary>
      /// Gets or sets the concurrency limit of the group. When the limit is
      /// raised, one dispatch attempt is made for every added slot.
      /// </summary>
      /// <remarks>
      /// The value is only checked with <see cref="Debug.Assert(bool)"/>, so a
      /// non-positive value is not rejected in builds without the DEBUG symbol.
      /// </remarks>
      public override int Concurrency
      {
          get { return _concurrency; }
          set
          {
              Debug.Assert(value > 0);

              int addedSlots = value - _concurrency;
              _concurrency = value;
              if (addedSlots > 0)
              {
                  EnqueueToSTPNextNWorkItem(addedSlots);
              }
          }
      }

      /// <summary>
      /// Gets the number of work items still waiting in the group's local queue.
      /// </summary>
      public override int WaitingCallbacks
      {
          get { return _workItemsQueue.Count; }
      }

      /// <summary>
      /// Returns the state objects of the work items that are still waiting in
      /// the group's local queue, in queue order.
      /// </summary>
      /// <returns>A new array with one entry per queued work item.</returns>
      public override object[] GetStates()
      {
          lock (_lock)
          {
              object[] states = new object[_workItemsQueue.Count];
              int index = 0;
              foreach (WorkItem queuedItem in _workItemsQueue)
              {
                  states[index++] = queuedItem.GetWorkItemResult().State;
              }
              return states;
          }
      }

      /// <summary>
      /// WorkItemsGroup start information
      /// </summary>
      public override WIGStartInfo WIGStartInfo
      {
          get { return _workItemsGroupStartInfo; }
      }

      /// <summary>
      /// Start the Work Items Group if it was started suspended
      /// </summary>
      public override void Start()
      {
          // If the Work Items Group already started then quit
          if (!_isSuspended)
          {
              return;
          }
          _isSuspended = false;

          // One dispatch attempt per queued item, capped by the concurrency limit.
          EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency));
      }

      /// <summary>
      /// Cancels the work items of the group: marks the shared cancellation
      /// object as canceled, clears the local queue, resets the in-pool counter
      /// and installs a fresh cancellation object for work items queued later.
      /// </summary>
      /// <param name="abortExecution">
      /// When true, the pool is additionally asked to cancel/abort this group
      /// via <c>CancelAbortWorkItemsGroup</c>.
      /// </param>
      public override void Cancel(bool abortExecution)
      {
          lock (_lock)
          {
              _canceledWorkItemsGroup.IsCanceled = true;
              _workItemsQueue.Clear();
              _workItemsInStpQueue = 0;
              _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
          }

          if (abortExecution)
          {
              _stp.CancelAbortWorkItemsGroup(this);
          }
      }

      /// <summary>
      /// Wait for the work items group to become idle, i.e. for its idle wait
      /// handle to be signaled.
      /// </summary>
      /// <param name="millisecondsTimeout">Timeout passed on to the wait call.</param>
      /// <returns>The result of the wait on the idle handle.</returns>
      public override bool WaitForIdle(int millisecondsTimeout)
      {
          SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);
          return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
      }

      /// <summary>
      /// Raised (through a work item queued on the pool) when a work item
      /// completion leaves the group with an empty queue and nothing in the pool.
      /// </summary>
      public override event WorkItemsGroupIdleHandler OnIdle
      {
          add { _onIdle += value; }
          remove { _onIdle -= value; }
      }

      /// <summary>
      /// Adds a work item to the group and dispatches the next queued work item
      /// to the pool if the group is allowed to.
      /// </summary>
      /// <param name="workItem">The work item to queue.</param>
      internal override void Enqueue(WorkItem workItem)
      {
          EnqueueToSTPNextWorkItem(workItem);
      }

      #endregion

      #region Pool integration

      /// <summary>
      /// Makes up to <see cref="Concurrency"/> dispatch attempts, unless the
      /// group is suspended.
      /// </summary>
      public void OnSTPIsStarting()
      {
          if (_isSuspended)
          {
              return;
          }

          EnqueueToSTPNextNWorkItem(_concurrency);
      }

      /// <summary>
      /// Runs the dispatch routine <paramref name="count"/> times without
      /// adding a new work item.
      /// </summary>
      /// <param name="count">Number of dispatch attempts; nothing happens when it is not positive.</param>
      public void EnqueueToSTPNextNWorkItem(int count)
      {
          for (int attempt = 0; attempt < count; ++attempt)
          {
              EnqueueToSTPNextWorkItem(null, false);
          }
      }

      #endregion

      #region Private methods

      /// <summary>
      /// Subscribes the group's started/completed callbacks to the work item result.
      /// </summary>
      private void RegisterToWorkItemCompletion(IWorkItemResult wir)
      {
          IInternalWorkItemResult internalResult = (IInternalWorkItemResult)wir;
          internalResult.OnWorkItemStarted += OnWorkItemStartedCallback;
          internalResult.OnWorkItemCompleted += OnWorkItemCompletedCallback;
      }

      /// <summary>
      /// Work item callback that raises the OnIdle event.
      /// </summary>
      /// <param name="state">Not used.</param>
      /// <returns>Always null.</returns>
      private object FireOnIdle(object state)
      {
          FireOnIdleImpl(_onIdle);
          return null;
      }

      /// <summary>
      /// Invokes every subscriber of the given handler with this group as the
      /// argument. Each subscriber is invoked separately so that an exception
      /// thrown by one of them does not prevent the others from running.
      /// </summary>
      [MethodImpl(MethodImplOptions.NoInlining)]
      private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
      {
          if (null == onIdle)
          {
              return;
          }

          foreach (WorkItemsGroupIdleHandler subscriber in onIdle.GetInvocationList())
          {
              try
              {
                  subscriber(this);
              }
              catch { } // Suppress exceptions
          }
      }

      /// <summary>
      /// Counts a work item of this group that started executing.
      /// </summary>
      private void OnWorkItemStartedCallback(WorkItem workItem)
      {
          lock (_lock)
          {
              ++_workItemsExecutingInStp;
          }
      }

      /// <summary>
      /// Releases the slot of a completed work item and tries to dispatch the next one.
      /// </summary>
      private void OnWorkItemCompletedCallback(WorkItem workItem)
      {
          EnqueueToSTPNextWorkItem(null, true);
      }

      /// <summary>
      /// Queues the work item and runs the dispatch routine without releasing a slot.
      /// </summary>
      private void EnqueueToSTPNextWorkItem(WorkItem workItem)
      {
          EnqueueToSTPNextWorkItem(workItem, false);
      }

      /// <summary>
      /// The dispatch routine of the group. Under the lock it, in this order:
      /// releases a slot (when asked to), queues <paramref name="workItem"/>
      /// (when not null), updates the idle state, and hands the next queued
      /// work item to the pool if the group is not suspended and the in-pool
      /// counter is below the concurrency limit.
      /// </summary>
      /// <param name="workItem">Work item to add to the local queue, or null to only dispatch.</param>
      /// <param name="decrementWorkItemsInStpQueue">
      /// True when called from the completion callback: both counters are
      /// decremented and the OnIdle event may be queued.
      /// </param>
      private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
      {
          lock (_lock)
          {
              // Got here from OnWorkItemCompletedCallback().
              // The counters are clamped at zero (Cancel may already have reset
              // the in-pool counter).
              if (decrementWorkItemsInStpQueue)
              {
                  _workItemsInStpQueue = Math.Max(0, _workItemsInStpQueue - 1);
                  _workItemsExecutingInStp = Math.Max(0, _workItemsExecutingInStp - 1);
              }

              // If the work item is not null then enqueue it
              if (null != workItem)
              {
                  workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;

                  RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
                  _workItemsQueue.Enqueue(workItem);

                  // First pending item while nothing is in the pool:
                  // the group leaves the idle state.
                  if ((1 == _workItemsQueue.Count) &&
                      (0 == _workItemsInStpQueue))
                  {
                      _stp.RegisterWorkItemsGroup(this);
                      IsIdle = false;
                      _isIdleWaitHandle.Reset();
                  }
              }

              // If the work items queue of the group is empty then quit
              if (0 == _workItemsQueue.Count)
              {
                  if (0 == _workItemsInStpQueue)
                  {
                      _stp.UnregisterWorkItemsGroup(this);
                      IsIdle = true;
                      _isIdleWaitHandle.Set();

                      // A non-null delegate always has at least one subscriber,
                      // so the null check alone decides whether to raise OnIdle.
                      if (decrementWorkItemsInStpQueue && _onIdle != null)
                      {
                          _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
                      }
                  }
                  return;
              }

              if (_isSuspended || _workItemsInStpQueue >= _concurrency)
              {
                  return;
              }

              WorkItem nextWorkItem = _workItemsQueue.Dequeue();
              try
              {
                  _stp.Enqueue(nextWorkItem);
              }
              catch (ObjectDisposedException)
              {
                  // The STP has been shutdown
              }

              // Counted even when the pool rejected the work item above.
              ++_workItemsInStpQueue;
          }
      }

      #endregion
  }
  301. #endregion
  302. }