// WorkItemsGroup.cs
  1. using System;
  2. using System.Threading;
  3. using System.Runtime.CompilerServices;
  4. using System.Diagnostics;
  5. namespace Amib.Threading.Internal
  6. {
  7. #region WorkItemsGroup class
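  /// <summary>
  /// A group of work items that share one concurrency limit. Work items are held in the
  /// group's own priority queue and handed to the owning SmartThreadPool only while fewer
  /// than <c>Concurrency</c> of them are outstanding in the pool.
  /// </summary>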
  11. public class WorkItemsGroup : WorkItemsGroupBase
  12. {
  13. #region Private members
  /// <summary>
  /// Monitor object taken by GetStates, Cancel and the enqueue/started/completed
  /// paths whenever they touch the queue or the counters below.
  /// </summary>
  private readonly object _lock = new object();

  /// <summary>
  /// The SmartThreadPool instance this WorkItemsGroup hands its work items to.
  /// </summary>
  private readonly SmartThreadPool _stp;

  /// <summary>
  /// Backing delegate list for the public OnIdle event.
  /// </summary>
  private event WorkItemsGroupIdleHandler _onIdle;

  /// <summary>
  /// True while the group is suspended; a suspended group keeps work items
  /// in its own queue instead of passing them to the SmartThreadPool.
  /// </summary>
  private bool _isSuspended;

  /// <summary>
  /// Upper bound on how many of this group's work items may be outstanding
  /// in the SmartThreadPool at the same time.
  /// </summary>
  private int _concurrency;

  /// <summary>
  /// Holds the group's work items until they are passed to the SmartThreadPool.
  /// </summary>
  private readonly PriorityQueue _workItemsQueue;

  /// <summary>
  /// Count of work items that were passed to the SmartThreadPool and have not
  /// yet reported completion. Compared against <see cref="_concurrency"/> to
  /// decide whether another work item may be passed on.
  /// </summary>
  private int _workItemsInStpQueue;

  /// <summary>
  /// Count of work items that reported "started" and have not yet reported
  /// completion. Starts at zero (the default for an int field).
  /// </summary>
  private int _workItemsExecutingInStp;

  /// <summary>
  /// Read-only copy of the start information supplied at construction.
  /// </summary>
  private readonly WIGStartInfo _workItemsGroupStartInfo;

  /// <summary>
  /// Signaled while the group has no queued and no outstanding work items;
  /// created in the signaled state.
  /// </summary>
  private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);

  /// <summary>
  /// Marker object attached to every work item the group enqueues. Cancel flags
  /// the current marker as canceled and swaps in a fresh one, so all previously
  /// enqueued work items are marked in O(1).
  /// </summary>
  private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
  62. #endregion
  63. #region Construction
  /// <summary>
  /// Creates a work items group that feeds the given SmartThreadPool.
  /// </summary>
  /// <param name="stp">The SmartThreadPool that will receive the group's work items.</param>
  /// <param name="concurrency">
  /// How many of the group's work items may be outstanding in the pool at once; must be positive.
  /// </param>
  /// <param name="wigStartInfo">Start information; a read-only copy of it is kept.</param>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="concurrency"/> is zero or negative.
  /// </exception>
  public WorkItemsGroup(SmartThreadPool stp, int concurrency, WIGStartInfo wigStartInfo)
  {
      if (concurrency < 1)
      {
          throw new ArgumentOutOfRangeException(
              "concurrency",
              concurrency,
              "concurrency must be greater than zero");
      }

      _stp = stp;
      _concurrency = concurrency;

      // Keep a private, read-only snapshot so later changes by the caller do not leak in.
      WIGStartInfo startInfoCopy = new WIGStartInfo(wigStartInfo);
      _workItemsGroupStartInfo = startInfoCopy.AsReadOnly();

      _workItemsQueue = new PriorityQueue();
      Name = "WorkItemsGroup";

      // Seed the "in pool" counter from the "executing" counter: a work item that is
      // already executing cannot be cancelled, so it still occupies a slot.
      _workItemsInStpQueue = _workItemsExecutingInStp;

      _isSuspended = _workItemsGroupStartInfo.StartSuspended;
  }
  86. #endregion
  87. #region WorkItemsGroupBase Overrides
  /// <summary>
  /// Gets or sets how many of the group's work items may be outstanding in the
  /// SmartThreadPool at once. Raising the value immediately tries to pass on one
  /// additional queued work item per newly available slot.
  /// </summary>
  /// <exception cref="ArgumentOutOfRangeException">
  /// The assigned value is zero or negative.
  /// </exception>
  public override int Concurrency
  {
      get { return _concurrency; }
      set
      {
          // Enforce the same rule as the constructor in every build configuration.
          // A Debug.Assert alone vanishes from release builds and would let a
          // non-positive limit through, after which no work item is ever dispatched.
          if (value <= 0)
          {
              throw new ArgumentOutOfRangeException(
                  "value",
                  value,
                  "concurrency must be greater than zero");
          }

          int addedSlots = value - _concurrency;
          _concurrency = value;

          if (addedSlots > 0)
          {
              EnqueueToSTPNextNWorkItem(addedSlots);
          }
      }
  }
  /// <summary>
  /// Gets the number of work items currently held in the group's own queue,
  /// i.e. not yet passed to the SmartThreadPool.
  /// </summary>
  public override int WaitingCallbacks
  {
      get
      {
          int queuedCount = _workItemsQueue.Count;
          return queuedCount;
      }
  }
  /// <summary>
  /// Returns the state objects of the work items currently held in the group's queue.
  /// </summary>
  /// <returns>
  /// A new array with one entry per queued work item, in the queue's enumeration order.
  /// </returns>
  public override object[] GetStates()
  {
      lock (_lock)
      {
          int queuedCount = _workItemsQueue.Count;
          object[] result = new object[queuedCount];

          int slot = 0;
          foreach (WorkItem queued in _workItemsQueue)
          {
              result[slot] = queued.GetWorkItemResult().State;
              slot++;
          }

          return result;
      }
  }
  /// <summary>
  /// Gets the read-only start information this group was constructed with.
  /// </summary>
  public override WIGStartInfo WIGStartInfo
  {
      get
      {
          return _workItemsGroupStartInfo;
      }
  }
  /// <summary>
  /// Resumes a suspended group: clears the suspended flag and tries to pass queued
  /// work items to the SmartThreadPool, up to the concurrency limit. Does nothing
  /// when the group is not suspended.
  /// </summary>
  public override void Start()
  {
      if (_isSuspended)
      {
          _isSuspended = false;

          // No point in asking for more slots than there are queued items or allowed slots.
          int toRelease = Math.Min(_workItemsQueue.Count, _concurrency);
          EnqueueToSTPNextNWorkItem(toRelease);
      }
  }
  /// <summary>
  /// Cancels the group's work items: flags the marker shared by all previously
  /// enqueued work items as canceled, empties the group's queue, resets the
  /// "in pool" counter and installs a fresh marker for future work items.
  /// </summary>
  /// <param name="abortExecution">
  /// When true, additionally asks the SmartThreadPool to cancel/abort this group's
  /// work items via <c>CancelAbortWorkItemsGroup</c>.
  /// </param>
  public override void Cancel(bool abortExecution)
  {
      lock (_lock)
      {
          CanceledWorkItemsGroup retiredMarker = _canceledWorkItemsGroup;
          retiredMarker.IsCanceled = true;

          _workItemsQueue.Clear();
          _workItemsInStpQueue = 0;

          // Work items enqueued from now on get a marker that is not canceled.
          _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
      }

      if (!abortExecution)
      {
          return;
      }

      _stp.CancelAbortWorkItemsGroup(this);
  }
  /// <summary>
  /// Waits on this group's idle handle, i.e. until the group has no queued and no
  /// outstanding work items, or until the timeout elapses.
  /// </summary>
  /// <param name="millisecondsTimeout">Timeout passed through to the wait call.</param>
  /// <returns>
  /// The result of the wait on the idle handle (presumably true when the group
  /// became idle in time — confirm against STPEventWaitHandle.WaitOne).
  /// </returns>
  public override bool WaitForIdle(int millisecondsTimeout)
  {
      SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);

      bool waitResult = STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
      return waitResult;
  }
  /// <summary>
  /// Raised (through a work item queued to the SmartThreadPool) when a completing
  /// work item leaves the group with nothing queued and nothing outstanding.
  /// </summary>
  public override event WorkItemsGroupIdleHandler OnIdle
  {
      add
      {
          _onIdle += value;
      }
      remove
      {
          _onIdle -= value;
      }
  }
  167. #endregion
  168. #region Private methods
  /// <summary>
  /// Subscribes the group's started/completed callbacks to the given work item result.
  /// </summary>
  /// <param name="wir">
  /// Result object of the work item; must also implement IInternalWorkItemResult.
  /// </param>
  private void RegisterToWorkItemCompletion(IWorkItemResult wir)
  {
      IInternalWorkItemResult internalResult = (IInternalWorkItemResult)wir;

      internalResult.OnWorkItemStarted += OnWorkItemStartedCallback;
      internalResult.OnWorkItemCompleted += OnWorkItemCompletedCallback;
  }
  /// <summary>
  /// Tries to pass up to <c>Concurrency</c> queued work items to the SmartThreadPool,
  /// unless the group is suspended.
  /// NOTE(review): presumably called by the owning pool while it starts — confirm
  /// against SmartThreadPool.
  /// </summary>
  public void OnSTPIsStarting()
  {
      if (!_isSuspended)
      {
          EnqueueToSTPNextNWorkItem(_concurrency);
      }
  }
  /// <summary>
  /// Makes <paramref name="count"/> attempts to pass the next queued work item to the
  /// SmartThreadPool. Each attempt is subject to the usual suspended/concurrency checks.
  /// </summary>
  /// <param name="count">Number of attempts; zero or negative means no attempt.</param>
  public void EnqueueToSTPNextNWorkItem(int count)
  {
      int attemptsLeft = count;
      while (attemptsLeft > 0)
      {
          EnqueueToSTPNextWorkItem(null, false);
          --attemptsLeft;
      }
  }
  /// <summary>
  /// Work item callback that notifies the OnIdle subscribers; it is queued to the
  /// SmartThreadPool when the group becomes idle.
  /// </summary>
  /// <param name="state">Unused.</param>
  /// <returns>Always null.</returns>
  private object FireOnIdle(object state)
  {
      WorkItemsGroupIdleHandler subscribers = _onIdle;
      FireOnIdleImpl(subscribers);
      return null;
  }
  /// <summary>
  /// Invokes every handler in <paramref name="onIdle"/> with this group as the argument.
  /// A handler that throws does not prevent the remaining handlers from running.
  /// </summary>
  /// <param name="onIdle">The handlers to invoke; may be null, in which case nothing happens.</param>
  [MethodImpl(MethodImplOptions.NoInlining)]
  private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
  {
      if (onIdle != null)
      {
          Delegate[] subscribers = onIdle.GetInvocationList();
          for (int index = 0; index < subscribers.Length; ++index)
          {
              WorkItemsGroupIdleHandler handler = (WorkItemsGroupIdleHandler)subscribers[index];
              try
              {
                  handler(this);
              }
              catch
              {
                  // Deliberately ignored: one failing subscriber must not stop the others.
              }
          }
      }
  }
  /// <summary>
  /// Callback for a work item's "started" notification; counts one more executing work item.
  /// </summary>
  /// <param name="workItem">The work item that started (not inspected).</param>
  private void OnWorkItemStartedCallback(WorkItem workItem)
  {
      lock (_lock)
      {
          _workItemsExecutingInStp += 1;
      }
  }
  /// <summary>
  /// Callback for a work item's "completed" notification; releases the slot the work
  /// item occupied and tries to pass the next queued work item to the SmartThreadPool.
  /// </summary>
  /// <param name="workItem">The work item that completed (not inspected).</param>
  private void OnWorkItemCompletedCallback(WorkItem workItem)
  {
      WorkItem nothingToAdd = null;
      EnqueueToSTPNextWorkItem(nothingToAdd, true);
  }
  /// <summary>
  /// Adds a work item to the group and, if the group's limits allow, passes the next
  /// queued work item to the SmartThreadPool.
  /// </summary>
  /// <param name="workItem">The work item to add.</param>
  internal override void Enqueue(WorkItem workItem)
  {
      // A new work item is being added, so no completed slot is released.
      EnqueueToSTPNextWorkItem(workItem, false);
  }
  /// <summary>
  /// Convenience overload: adds <paramref name="workItem"/> to the group without
  /// releasing a completed slot.
  /// </summary>
  /// <param name="workItem">The work item to add.</param>
  private void EnqueueToSTPNextWorkItem(WorkItem workItem)
  {
      const bool releaseCompletedSlot = false;
      EnqueueToSTPNextWorkItem(workItem, releaseCompletedSlot);
  }
  /// <summary>
  /// Central bookkeeping step of the group, executed under the group's lock. In order, it:
  /// (1) optionally releases the slot of a completed work item,
  /// (2) optionally adds a new work item to the group's queue,
  /// (3) switches the group to idle when nothing is queued and nothing is outstanding,
  /// (4) otherwise passes the next queued work item to the SmartThreadPool if the group
  ///     is not suspended and the concurrency limit leaves a free slot.
  /// </summary>
  /// <param name="workItem">The work item to add to the group, or null to add nothing.</param>
  /// <param name="decrementWorkItemsInStpQueue">
  /// True when called because a work item completed; the outstanding and executing
  /// counters are then decremented (never below zero).
  /// </param>
  private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
  {
      lock (_lock)
      {
          // (1) Got here from OnWorkItemCompletedCallback(): release the completed slot.
          // The counters are clamped at zero because Cancel() resets _workItemsInStpQueue
          // while completion callbacks of earlier work items may still arrive.
          if (decrementWorkItemsInStpQueue)
          {
              _workItemsInStpQueue = _workItemsInStpQueue > 0 ? _workItemsInStpQueue - 1 : 0;
              _workItemsExecutingInStp = _workItemsExecutingInStp > 0 ? _workItemsExecutingInStp - 1 : 0;
          }

          // (2) Add the new work item, if any, to the group's queue.
          if (workItem != null)
          {
              workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
              RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
              _workItemsQueue.Enqueue(workItem);

              // First work item of a group with nothing outstanding: leave the idle state.
              if (_workItemsQueue.Count == 1 && _workItemsInStpQueue == 0)
              {
                  _stp.RegisterWorkItemsGroup(this);
                  IsIdle = false;
                  _isIdleWaitHandle.Reset();
              }
          }

          // (3) Nothing queued: there is nothing to dispatch. If nothing is outstanding
          // either, the group is idle.
          if (_workItemsQueue.Count == 0)
          {
              if (_workItemsInStpQueue == 0)
              {
                  _stp.UnregisterWorkItemsGroup(this);
                  IsIdle = true;
                  _isIdleWaitHandle.Set();

                  // Only a completing work item raises OnIdle. A non-null delegate always
                  // has at least one subscriber, so a single null check is sufficient and
                  // avoids reading _onIdle a second time.
                  if (decrementWorkItemsInStpQueue && _onIdle != null)
                  {
                      _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
                  }
              }
              return;
          }

          // (4) Dispatch the next queued work item when allowed.
          if (_isSuspended || _workItemsInStpQueue >= _concurrency)
          {
              return;
          }

          // Only WorkItem instances are ever enqueued (see step 2), so a direct cast is safe
          // and fails loudly if that invariant is ever broken.
          WorkItem nextWorkItem = (WorkItem)_workItemsQueue.Dequeue();
          try
          {
              _stp.Enqueue(nextWorkItem);
          }
          catch (ObjectDisposedException)
          {
              // The STP has been shut down; the work item cannot be passed on.
          }

          // NOTE(review): the counter is incremented even when the pool rejected the work
          // item above (original behavior, kept as-is). Confirm whether a shut-down pool
          // should leave the slot counted as outstanding.
          ++_workItemsInStpQueue;
      }
  }
  298. #endregion
  299. }
  300. #endregion
  301. }