// WorkItemsGroup.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Runtime.CompilerServices;
  5. using System.Diagnostics;
  6. namespace Amib.Threading.Internal
  7. {
  8. #region WorkItemsGroup class
  9. /// <summary>
  10. /// Summary description for WorkItemsGroup.
  11. /// </summary>
  12. public class WorkItemsGroup : WorkItemsGroupBase
  13. {
  14. #region Private members
  15. private readonly object _lock = new();
  16. /// <summary>
  17. /// A reference to the SmartThreadPool instance that created this
  18. /// WorkItemsGroup.
  19. /// </summary>
  20. private readonly SmartThreadPool _stp;
  21. /// <summary>
  22. /// The OnIdle event
  23. /// </summary>
  24. private event WorkItemsGroupIdleHandler _onIdle;
  25. /// <summary>
  26. /// A flag to indicate if the Work Items Group is now suspended.
  27. /// </summary>
  28. private bool _isSuspended;
  29. /// <summary>
  30. /// Defines how many work items of this WorkItemsGroup can run at once.
  31. /// </summary>
  32. private int _concurrency;
  33. /// <summary>
  34. /// Priority queue to hold work items before they are passed
  35. /// to the SmartThreadPool.
  36. /// </summary>
  37. private readonly Queue<WorkItem> _workItemsQueue;
  38. /// <summary>
  39. /// Indicate how many work items are waiting in the SmartThreadPool
  40. /// queue.
  41. /// This value is used to apply the concurrency.
  42. /// </summary>
  43. private int _workItemsInStpQueue;
  44. /// <summary>
  45. /// Indicate how many work items are currently running in the SmartThreadPool.
  46. /// This value is used with the Cancel, to calculate if we can send new
  47. /// work items to the STP.
  48. /// </summary>
  49. private int _workItemsExecutingInStp = 0;
  50. /// <summary>
  51. /// WorkItemsGroup start information
  52. /// </summary>
  53. private readonly WIGStartInfo _workItemsGroupStartInfo;
  54. /// <summary>
  55. /// Signaled when all of the WorkItemsGroup's work item completed.
  56. /// </summary>
  57. private readonly ManualResetEvent _isIdleWaitHandle = new(true);
  58. /// <summary>
  59. /// A common object for all the work items that this work items group
  60. /// generate so we can mark them to cancel in O(1)
  61. /// </summary>
  62. private CanceledWorkItemsGroup _canceledWorkItemsGroup = new();
  63. #endregion
  64. #region Construction
  65. public WorkItemsGroup(SmartThreadPool stp, int concurrency, WIGStartInfo wigStartInfo)
  66. {
  67. if (concurrency <= 0)
  68. {
  69. throw new ArgumentOutOfRangeException(
  70. "concurrency",
  71. concurrency,
  72. "concurrency must be greater than zero");
  73. }
  74. _stp = stp;
  75. _concurrency = concurrency;
  76. _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
  77. _workItemsQueue = new Queue<WorkItem>();
  78. Name = "WorkItemsGroup";
  79. // The _workItemsInStpQueue gets the number of currently executing work items,
  80. // because once a work item is executing, it cannot be cancelled.
  81. _workItemsInStpQueue = _workItemsExecutingInStp;
  82. _isSuspended = _workItemsGroupStartInfo.StartSuspended;
  83. }
  84. #endregion
  85. #region WorkItemsGroupBase Overrides
  86. public override int Concurrency
  87. {
  88. get { return _concurrency; }
  89. set
  90. {
  91. Debug.Assert(value > 0);
  92. int diff = value - _concurrency;
  93. _concurrency = value;
  94. if (diff > 0)
  95. {
  96. EnqueueToSTPNextNWorkItem(diff);
  97. }
  98. }
  99. }
  100. public override int WaitingCallbacks
  101. {
  102. get { return _workItemsQueue.Count; }
  103. }
  104. public override object[] GetStates()
  105. {
  106. lock (_lock)
  107. {
  108. object[] states = new object[_workItemsQueue.Count];
  109. int i = 0;
  110. foreach (WorkItem workItem in _workItemsQueue)
  111. {
  112. states[i] = workItem.GetWorkItemResult().State;
  113. ++i;
  114. }
  115. return states;
  116. }
  117. }
  118. /// <summary>
  119. /// WorkItemsGroup start information
  120. /// </summary>
  121. public override WIGStartInfo WIGStartInfo
  122. {
  123. get { return _workItemsGroupStartInfo; }
  124. }
  125. /// <summary>
  126. /// Start the Work Items Group if it was started suspended
  127. /// </summary>
  128. public override void Start()
  129. {
  130. // If the Work Items Group already started then quit
  131. if (!_isSuspended)
  132. {
  133. return;
  134. }
  135. _isSuspended = false;
  136. EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency));
  137. }
  138. public override void Cancel(bool abortExecution)
  139. {
  140. lock (_lock)
  141. {
  142. _canceledWorkItemsGroup.IsCanceled = true;
  143. _workItemsQueue.Clear();
  144. _workItemsInStpQueue = 0;
  145. _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
  146. }
  147. if (abortExecution)
  148. {
  149. _stp.CancelAbortWorkItemsGroup(this);
  150. }
  151. }
  152. /// <summary>
  153. /// Wait for the thread pool to be idle
  154. /// </summary>
  155. public override bool WaitForIdle(int millisecondsTimeout)
  156. {
  157. SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);
  158. return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
  159. }
  160. public override event WorkItemsGroupIdleHandler OnIdle
  161. {
  162. add { _onIdle += value; }
  163. remove { _onIdle -= value; }
  164. }
  165. #endregion
  166. #region Private methods
  167. private void RegisterToWorkItemCompletion(IWorkItemResult wir)
  168. {
  169. IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
  170. iwir.OnWorkItemStarted += OnWorkItemStartedCallback;
  171. iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback;
  172. }
  173. public void OnSTPIsStarting()
  174. {
  175. if (_isSuspended)
  176. {
  177. return;
  178. }
  179. EnqueueToSTPNextNWorkItem(_concurrency);
  180. }
  181. public void EnqueueToSTPNextNWorkItem(int count)
  182. {
  183. for (int i = 0; i < count; ++i)
  184. {
  185. EnqueueToSTPNextWorkItem(null, false);
  186. }
  187. }
  188. private object FireOnIdle(object state)
  189. {
  190. FireOnIdleImpl(_onIdle);
  191. return null;
  192. }
  193. [MethodImpl(MethodImplOptions.NoInlining)]
  194. private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
  195. {
  196. if(onIdle is null)
  197. return;
  198. Delegate[] delegates = onIdle.GetInvocationList();
  199. foreach(WorkItemsGroupIdleHandler eh in delegates)
  200. {
  201. try
  202. {
  203. eh(this);
  204. }
  205. catch { } // Suppress exceptions
  206. }
  207. }
  208. private void OnWorkItemStartedCallback(WorkItem workItem)
  209. {
  210. lock(_lock)
  211. {
  212. ++_workItemsExecutingInStp;
  213. }
  214. }
  215. private void OnWorkItemCompletedCallback(WorkItem workItem)
  216. {
  217. EnqueueToSTPNextWorkItem(null, true);
  218. }
  219. internal override void Enqueue(WorkItem workItem)
  220. {
  221. EnqueueToSTPNextWorkItem(workItem);
  222. }
  223. private void EnqueueToSTPNextWorkItem(WorkItem workItem)
  224. {
  225. EnqueueToSTPNextWorkItem(workItem, false);
  226. }
  227. private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
  228. {
  229. lock(_lock)
  230. {
  231. // Got here from OnWorkItemCompletedCallback()
  232. if (decrementWorkItemsInStpQueue)
  233. {
  234. --_workItemsInStpQueue;
  235. if(_workItemsInStpQueue < 0)
  236. {
  237. _workItemsInStpQueue = 0;
  238. }
  239. --_workItemsExecutingInStp;
  240. if(_workItemsExecutingInStp < 0)
  241. {
  242. _workItemsExecutingInStp = 0;
  243. }
  244. }
  245. // If the work item is not null then enqueue it
  246. if (workItem is not null)
  247. {
  248. workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
  249. RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
  250. _workItemsQueue.Enqueue(workItem);
  251. //_stp.IncrementWorkItemsCount();
  252. if ((1 == _workItemsQueue.Count) &&
  253. (0 == _workItemsInStpQueue))
  254. {
  255. _stp.RegisterWorkItemsGroup(this);
  256. IsIdle = false;
  257. _isIdleWaitHandle.Reset();
  258. }
  259. }
  260. // If the work items queue of the group is empty than quit
  261. if (0 == _workItemsQueue.Count)
  262. {
  263. if (0 == _workItemsInStpQueue)
  264. {
  265. _stp.UnregisterWorkItemsGroup(this);
  266. IsIdle = true;
  267. _isIdleWaitHandle.Set();
  268. if (decrementWorkItemsInStpQueue && _onIdle is not null && _onIdle.GetInvocationList().Length > 0)
  269. {
  270. _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
  271. }
  272. }
  273. return;
  274. }
  275. if (!_isSuspended)
  276. {
  277. if (_workItemsInStpQueue < _concurrency)
  278. {
  279. WorkItem nextWorkItem = _workItemsQueue.Dequeue();
  280. try
  281. {
  282. _stp.Enqueue(nextWorkItem);
  283. }
  284. catch (ObjectDisposedException e)
  285. {
  286. e.GetHashCode();
  287. // The STP has been shutdown
  288. }
  289. ++_workItemsInStpQueue;
  290. }
  291. }
  292. }
  293. }
  294. #endregion
  295. }
  296. #endregion
  297. }