SmartThreadPool.cs 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654
  1. #region Release History
  2. // Smart Thread Pool
  3. // 7 Aug 2004 - Initial release
  4. //
  5. // 14 Sep 2004 - Bug fixes
  6. //
  7. // 15 Oct 2004 - Added new features
  8. // - Work items return result.
  9. // - Support waiting synchronization for multiple work items.
  10. // - Work items can be cancelled.
  11. // - Passage of the caller thread’s context to the thread in the pool.
  12. // - Minimal usage of WIN32 handles.
  13. // - Minor bug fixes.
  14. //
  15. // 26 Dec 2004 - Changes:
  16. // - Removed static constructors.
  17. // - Added finalizers.
  18. // - Changed Exceptions so they are serializable.
  19. // - Fixed the bug in one of the SmartThreadPool constructors.
  20. // - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters.
  21. // The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
  22. // - Added PostExecute with options on which cases to call it.
  23. // - Added option to dispose of the state objects.
  24. // - Added a WaitForIdle() method that waits until the work items queue is empty.
  25. // - Added an STPStartInfo class for the initialization of the thread pool.
  26. // - Changed exception handling so if a work item throws an exception it
  27. // is rethrown at GetResult(), rather than firing an UnhandledException event.
  28. // Note that PostExecute exceptions are always ignored.
  29. //
  30. // 25 Mar 2005 - Changes:
  31. // - Fixed loss of work items bug
  32. //
  33. // 3 Jul 2005: Changes.
  34. // - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed.
  35. //
  36. // 16 Aug 2005: Changes.
  37. // - Fixed bug where the InUseThreads becomes negative when canceling work items.
  38. //
  39. // 31 Jan 2006 - Changes:
  40. // - Added work items priority
  41. // - Removed support of chained delegates in callbacks and post executes (nobody really uses this)
  42. // - Added work items groups
  43. // - Added work items groups idle event
  44. // - Changed SmartThreadPool.WaitAll() behavior so when it gets an empty array
  45. // it returns true rather than throwing an exception.
  46. // - Added option to start the STP and the WIG as suspended
  47. // - Exception behavior changed, the real exception is returned by an
  48. // inner exception
  49. // - Added performance counters
  50. // - Added priority to the threads in the pool
  51. //
  52. // 13 Feb 2006 - Changes:
  53. // - Added a call to the dispose of the Performance Counter so
  54. // there won't be a Performance Counter leak.
  55. // - Added exception catch in case the Performance Counters cannot
  56. // be created.
  57. //
  58. // 17 May 2008 - Changes:
  59. // - Changed the dispose behavior and removed the Finalizers.
  60. // - Enabled the change of the MaxThreads and MinThreads at run time.
  61. // - Enabled the change of the Concurrency of a IWorkItemsGroup at run
  62. // time If the IWorkItemsGroup is a SmartThreadPool then the Concurrency
  63. // refers to the MaxThreads.
  64. // - Improved the cancel behavior.
  65. // - Added events for thread creation and termination.
  66. // - Fixed the HttpContext context capture.
  67. // - Changed internal collections so they use generic collections
  68. // - Added IsIdle flag to the SmartThreadPool and IWorkItemsGroup
  69. // - Added support for WinCE
  70. // - Added support for Action<T> and Func<T>
  71. //
  72. // 07 April 2009 - Changes:
  73. // - Added support for Silverlight and Mono
  74. // - Added Join, Choice, and Pipe to SmartThreadPool.
  75. // - Added local performance counters (for Mono, Silverlight, and WindowsCE)
  76. // - Changed duration measures from DateTime.Now to Stopwatch.
  77. // - Queues changed from System.Collections.Queue to System.Collections.Generic.LinkedList<T>.
  78. //
  79. // 21 December 2009 - Changes:
  80. // - Added work item timeout (passive)
  81. //
  82. // 20 August 2012 - Changes:
  83. // - Added set name to threads
  84. // - Fixed the WorkItemsQueue.Dequeue.
  85. // Replaced while (!Monitor.TryEnter(this)); with lock(this) { ... }
  86. // - Fixed SmartThreadPool.Pipe
  87. // - Added IsBackground option to threads
  88. // - Added ApartmentState to threads
  89. // - Fixed thread creation when queuing many work items at the same time.
  90. //
  91. // 24 August 2012 - Changes:
  92. // - Enabled cancel abort after cancel. See: http://smartthreadpool.codeplex.com/discussions/345937 by alecswan
  93. // - Added option to set MaxStackSize of threads
  94. #endregion
  95. using System;
  96. using System.Security;
  97. using System.Threading;
  98. using System.Collections;
  99. using System.Collections.Generic;
  100. using System.Diagnostics;
  101. using System.Runtime.CompilerServices;
  102. using Amib.Threading.Internal;
  103. namespace Amib.Threading
  104. {
  105. #region SmartThreadPool class
  106. /// <summary>
  107. /// Smart thread pool class.
  108. /// </summary>
  109. public partial class SmartThreadPool : WorkItemsGroupBase, IDisposable
  110. {
  111. #region Public Default Constants
  112. /// <summary>
  113. /// Default minimum number of threads the thread pool contains. (0)
  114. /// </summary>
  115. public const int DefaultMinWorkerThreads = 0;
  116. /// <summary>
  117. /// Default maximum number of threads the thread pool contains. (25)
  118. /// </summary>
  119. public const int DefaultMaxWorkerThreads = 25;
  120. /// <summary>
  121. /// Default idle timeout in milliseconds. (One minute)
  122. /// </summary>
  123. public const int DefaultIdleTimeout = 60 * 1000; // One minute
  124. /// <summary>
  125. /// Indicate to copy the security context of the caller and then use it in the call. (false)
  126. /// </summary>
  127. public const bool DefaultUseCallerCallContext = false;
  128. /// <summary>
  129. /// Indicate to dispose of the state objects if they support the IDispose interface. (false)
  130. /// </summary>
  131. public const bool DefaultDisposeOfStateObjects = false;
  132. /// <summary>
  133. /// The default option to run the post execute (CallToPostExecute.Always)
  134. /// </summary>
  135. public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always;
  136. /// <summary>
  137. /// The default post execute method to run. (None)
  138. /// When null it means not to call it.
  139. /// </summary>
  140. public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback;
  141. /// <summary>
  142. /// The default work item priority (WorkItemPriority.Normal)
  143. /// </summary>
  144. public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal;
  145. /// <summary>
  146. /// The default is to work on work items as soon as they arrive
  147. /// and not to wait for the start. (false)
  148. /// </summary>
  149. public const bool DefaultStartSuspended = false;
  150. /// <summary>
  151. /// The default name to use for the performance counters instance. (null)
  152. /// </summary>
  153. public static readonly string DefaultPerformanceCounterInstanceName;
  154. /// <summary>
  155. /// The default thread priority (ThreadPriority.Normal)
  156. /// </summary>
  157. public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal;
  158. /// <summary>
  159. /// The default thread pool name. (SmartThreadPool)
  160. /// </summary>
  161. public const string DefaultThreadPoolName = "SmartThreadPool";
  162. /// <summary>
  163. /// The default Max Stack Size. (null)
  164. /// </summary>
  165. public static readonly int? DefaultMaxStackSize = null;
  166. /// <summary>
  167. /// The default fill state with params. (false)
  168. /// It is relevant only to QueueWorkItem of Action&lt;...&gt;/Func&lt;...&gt;
  169. /// </summary>
  170. public const bool DefaultFillStateWithArgs = false;
  171. /// <summary>
  172. /// The default thread backgroundness. (true)
  173. /// </summary>
  174. public const bool DefaultAreThreadsBackground = true;
  175. /// <summary>
  176. /// The default apartment state of a thread in the thread pool.
  177. /// The default is ApartmentState.Unknown which means the STP will not
  178. /// set the apartment of the thread. It will use the .NET default.
  179. /// </summary>
  180. public const ApartmentState DefaultApartmentState = ApartmentState.Unknown;
  181. #endregion
  182. #region Member Variables
  183. /// <summary>
  184. /// Dictionary of all the threads in the thread pool.
  185. /// </summary>
  186. private readonly SynchronizedDictionary<Thread, ThreadEntry> _workerThreads = new SynchronizedDictionary<Thread, ThreadEntry>();
  187. /// <summary>
  188. /// Queue of work items.
  189. /// </summary>
  190. private readonly WorkItemsQueue _workItemsQueue = new WorkItemsQueue();
  191. /// <summary>
  192. /// Count the work items handled.
  193. /// Used by the performance counter.
  194. /// </summary>
  195. private int _workItemsProcessed;
  196. /// <summary>
  197. /// Number of threads that currently work (not idle).
  198. /// </summary>
  199. private int _inUseWorkerThreads;
  200. /// <summary>
  201. /// Stores a copy of the original STPStartInfo.
  202. /// It is used to change the MinThread and MaxThreads
  203. /// </summary>
  204. private STPStartInfo _stpStartInfo;
  205. /// <summary>
  206. /// Total number of work items that are stored in the work items queue
  207. /// plus the work items that the threads in the pool are working on.
  208. /// </summary>
  209. private int _currentWorkItemsCount;
  210. /// <summary>
  211. /// Signaled when the thread pool is idle, i.e. no thread is busy
  212. /// and the work items queue is empty
  213. /// </summary>
  214. private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
  215. /// <summary>
  216. /// An event to signal all the threads to quit immediately.
  217. /// </summary>
  218. private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false);
  219. /// <summary>
  220. /// A flag to indicate if the Smart Thread Pool is now suspended.
  221. /// </summary>
  222. private bool _isSuspended;
  223. /// <summary>
  224. /// A flag to indicate the threads to quit.
  225. /// </summary>
  226. private bool _shutdown;
  227. /// <summary>
  228. /// Counts the threads created in the pool.
  229. /// It is used to name the threads.
  230. /// </summary>
  231. private int _threadCounter;
  232. /// <summary>
  233. /// Indicate that the SmartThreadPool has been disposed
  234. /// </summary>
  235. private bool _isDisposed;
  236. /// <summary>
  237. /// Holds all the WorkItemsGroup instances that have at least one
  238. /// work item in the SmartThreadPool.
  239. /// This variable is used in case of Shutdown
  240. /// </summary>
  241. private readonly SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup> _workItemsGroups = new SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup>();
  242. /// <summary>
  243. /// A common object for all the work items in the STP
  244. /// so we can mark them to cancel in O(1)
  245. /// </summary>
  246. private CanceledWorkItemsGroup _canceledSmartThreadPool = new CanceledWorkItemsGroup();
  247. /// <summary>
  248. /// Windows STP performance counters
  249. /// </summary>
  250. private ISTPInstancePerformanceCounters _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
  251. /// <summary>
  252. /// Local STP performance counters
  253. /// </summary>
  254. private ISTPInstancePerformanceCounters _localPCs = NullSTPInstancePerformanceCounters.Instance;
  255. [ThreadStatic]
  256. private static ThreadEntry _threadEntry;
  257. /// <summary>
  258. /// An event to call after a thread is created, but before
  259. /// its first use.
  260. /// </summary>
  261. private event ThreadInitializationHandler _onThreadInitialization;
  262. /// <summary>
  263. /// An event to call when a thread is about to exit, after
  264. /// it no longer belongs to the pool.
  265. /// </summary>
  266. private event ThreadTerminationHandler _onThreadTermination;
  267. #endregion
  268. #region Per thread properties
  /// <summary>
  /// Gets or sets the <see cref="ThreadEntry"/> kept in the thread-static
  /// <c>_threadEntry</c> field, so every thread reads and writes its own value.
  /// </summary>
  internal static ThreadEntry CurrentThreadEntry
  {
      get { return _threadEntry; }
      set { _threadEntry = value; }
  }
  284. #endregion
  285. #region Construction and Finalization
  /// <summary>
  /// Creates a thread pool configured from a default-constructed
  /// <see cref="STPStartInfo"/> and initializes it.
  /// </summary>
  public SmartThreadPool()
  {
      STPStartInfo defaults = new STPStartInfo();
      _stpStartInfo = defaults;
      Initialize();
  }
  /// <summary>
  /// Creates a thread pool with a custom idle timeout; every other setting
  /// keeps the value a default-constructed <see cref="STPStartInfo"/> has.
  /// </summary>
  /// <param name="idleTimeout">Idle timeout in milliseconds</param>
  public SmartThreadPool(int idleTimeout)
  {
      STPStartInfo startInfo = new STPStartInfo();
      startInfo.IdleTimeout = idleTimeout;

      _stpStartInfo = startInfo;
      Initialize();
  }
  /// <summary>
  /// Creates a thread pool with a custom idle timeout and upper thread limit.
  /// </summary>
  /// <param name="idleTimeout">Idle timeout in milliseconds</param>
  /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
  public SmartThreadPool(int idleTimeout, int maxWorkerThreads)
  {
      STPStartInfo startInfo = new STPStartInfo();
      startInfo.IdleTimeout = idleTimeout;
      startInfo.MaxWorkerThreads = maxWorkerThreads;

      _stpStartInfo = startInfo;
      Initialize();
  }
  /// <summary>
  /// Creates a thread pool with a custom idle timeout and both thread limits.
  /// The limits are checked by <c>ValidateSTPStartInfo</c> during initialization.
  /// </summary>
  /// <param name="idleTimeout">Idle timeout in milliseconds</param>
  /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
  /// <param name="minWorkerThreads">Lower limit of threads in the pool</param>
  public SmartThreadPool(int idleTimeout, int maxWorkerThreads, int minWorkerThreads)
  {
      STPStartInfo startInfo = new STPStartInfo();
      startInfo.IdleTimeout = idleTimeout;
      startInfo.MaxWorkerThreads = maxWorkerThreads;
      startInfo.MinWorkerThreads = minWorkerThreads;

      _stpStartInfo = startInfo;
      Initialize();
  }
  /// <summary>
  /// Creates a thread pool from caller-supplied settings. The pool keeps its
  /// own <see cref="STPStartInfo"/> built from the argument via the copy constructor.
  /// </summary>
  /// <param name="stpStartInfo">A SmartThreadPool configuration that overrides the default behavior</param>
  public SmartThreadPool(STPStartInfo stpStartInfo)
  {
      STPStartInfo privateCopy = new STPStartInfo(stpStartInfo);
      _stpStartInfo = privateCopy;
      Initialize();
  }
  /// <summary>
  /// Shared constructor tail: names the pool, validates the start info,
  /// sets up the performance counters and, unless the pool starts suspended,
  /// spins up the initial worker threads.
  /// </summary>
  private void Initialize()
  {
      Name = _stpStartInfo.ThreadPoolName;
      ValidateSTPStartInfo();

      _isSuspended = _stpStartInfo.StartSuspended;

      // Windows performance counters are requested by supplying an instance name.
      if (_stpStartInfo.PerformanceCounterInstanceName != null)
      {
          try
          {
              _windowsPCs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName);
          }
          catch (Exception e)
          {
              // Counter creation is best effort: report it and fall back to the null implementation.
              Debug.WriteLine("Unable to create Performance Counters: " + e);
              _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
          }
      }

      if (_stpStartInfo.EnableLocalPerformanceCounters)
      {
          _localPCs = new LocalSTPInstancePerformanceCounters();
      }

      // A pool that starts suspended creates no threads here.
      if (_isSuspended)
      {
          return;
      }

      StartOptimalNumberOfThreads();
  }
  /// <summary>
  /// Starts as many threads as are missing to reach
  /// min(max(queued work items, MinWorkerThreads), MaxWorkerThreads).
  /// </summary>
  private void StartOptimalNumberOfThreads()
  {
      int wanted = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads);
      int capped = Math.Min(wanted, _stpStartInfo.MaxWorkerThreads);
      int missing = capped - _workerThreads.Count;

      if (missing <= 0)
      {
          return;
      }

      StartThreads(missing);
  }
  /// <summary>
  /// Checks the thread limits held in <c>_stpStartInfo</c>.
  /// </summary>
  /// <exception cref="ArgumentOutOfRangeException">
  /// MinWorkerThreads is negative, MaxWorkerThreads is not positive,
  /// or MinWorkerThreads exceeds MaxWorkerThreads.
  /// </exception>
  private void ValidateSTPStartInfo()
  {
      int minThreads = _stpStartInfo.MinWorkerThreads;
      if (minThreads < 0)
      {
          throw new ArgumentOutOfRangeException(
              "MinWorkerThreads", "MinWorkerThreads cannot be negative");
      }

      int maxThreads = _stpStartInfo.MaxWorkerThreads;
      if (maxThreads <= 0)
      {
          throw new ArgumentOutOfRangeException(
              "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero");
      }

      if (minThreads > maxThreads)
      {
          throw new ArgumentOutOfRangeException(
              "MinWorkerThreads, maxWorkerThreads",
              "MaxWorkerThreads must be greater or equal to MinWorkerThreads");
      }
  }
  /// <summary>
  /// Rejects multicast delegates: a callback may have only a single target.
  /// </summary>
  /// <param name="callback">The delegate to inspect.</param>
  /// <exception cref="NotSupportedException">
  /// The invocation list of <paramref name="callback"/> holds more than one entry.
  /// </exception>
  private static void ValidateCallback(Delegate callback)
  {
      Delegate[] targets = callback.GetInvocationList();
      if (targets.Length <= 1)
      {
          return;
      }

      throw new NotSupportedException("SmartThreadPool doesn't support delegates chains");
  }
  415. #endregion
  416. #region Thread Processing
  /// <summary>
  /// Asks the work items queue for the next work item, passing the pool's
  /// idle timeout and the shutting-down event.
  /// </summary>
  /// <returns>
  /// The value returned by the queue; the worker loop treats null as
  /// "timeout or shutdown".
  /// </returns>
  private WorkItem Dequeue()
  {
      return _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent);
  }
  /// <summary>
  /// Adds a work item to the pool's queue and, when there are more pending
  /// work items than worker threads, tries to start one more thread.
  /// </summary>
  /// <param name="workItem">A work item to queue</param>
  internal override void Enqueue(WorkItem workItem)
  {
      // A null work item is a programming error in the caller.
      Debug.Assert(null != workItem);

      IncrementWorkItemsCount();

      // Every item shares the pool's cancel marker object.
      workItem.CanceledSmartThreadPool = _canceledSmartThreadPool;
      _workItemsQueue.EnqueueWorkItem(workItem);
      workItem.WorkItemIsQueued();

      // More outstanding items than threads: ask for one more thread.
      bool moreItemsThanThreads = _currentWorkItemsCount > _workerThreads.Count;
      if (moreItemsThanThreads)
      {
          StartThreads(1);
      }
  }
  /// <summary>
  /// Samples the work item counters, then atomically increments the
  /// outstanding work item count. On the transition from 0 to 1 the pool
  /// is flagged as not idle and the idle wait handle is reset.
  /// </summary>
  private void IncrementWorkItemsCount()
  {
      _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
      _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);

      if (Interlocked.Increment(ref _currentWorkItemsCount) != 1)
      {
          return;
      }

      // First outstanding work item: the pool is no longer idle.
      IsIdle = false;
      _isIdleWaitHandle.Reset();
  }
  /// <summary>
  /// Atomically decrements the outstanding work item count and increments
  /// the processed count. When the outstanding count reaches 0 the pool is
  /// flagged idle and the idle wait handle is set. The performance counters
  /// are sampled unless the pool is shutting down.
  /// </summary>
  private void DecrementWorkItemsCount()
  {
      if (Interlocked.Decrement(ref _currentWorkItemsCount) == 0)
      {
          // Last outstanding work item finished: the pool is idle again.
          IsIdle = true;
          _isIdleWaitHandle.Set();
      }

      Interlocked.Increment(ref _workItemsProcessed);

      if (_shutdown)
      {
          return;
      }

      // The counter counts even if the work item was cancelled
      _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
      _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
  }
  /// <summary>
  /// Records a work items group in the pool's group dictionary,
  /// using the group itself as both key and value.
  /// </summary>
  /// <param name="workItemsGroup">The group to record.</param>
  internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
  {
      IWorkItemsGroup group = workItemsGroup;
      _workItemsGroups[group] = group;
  }
  /// <summary>
  /// Removes a work items group from the pool's group dictionary
  /// if it is currently recorded there; otherwise does nothing.
  /// </summary>
  /// <param name="workItemsGroup">The group to remove.</param>
  internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
  {
      if (!_workItemsGroups.Contains(workItemsGroup))
      {
          return;
      }

      _workItemsGroups.Remove(workItemsGroup);
  }
  /// <summary>
  /// Removes the calling thread from the worker thread dictionary and samples
  /// the thread counters. Calling it again from the same thread is harmless,
  /// because the thread is then no longer in the dictionary.
  /// </summary>
  private void InformCompleted()
  {
      Thread self = Thread.CurrentThread;

      // Contains and Remove are not locked together: only the current thread
      // removes itself, and the dictionary is a synchronized one.
      if (!_workerThreads.Contains(self))
      {
          return;
      }

      _workerThreads.Remove(self);
      _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
      _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
  }
  /// <summary>
  /// Starts up to <paramref name="threadsCount"/> new worker threads that run
  /// <c>ProcessQueuedItems</c>.
  /// </summary>
  /// <remarks>
  /// Does nothing while the pool is suspended or shut down, and stops early as
  /// soon as the pool holds MaxWorkerThreads threads. The whole loop runs under
  /// the worker thread dictionary's SyncRoot lock.
  /// </remarks>
  /// <param name="threadsCount">The number of threads to start</param>
  private void StartThreads(int threadsCount)
  {
      if (_isSuspended)
      {
          return;
      }

      lock (_workerThreads.SyncRoot)
      {
          // Don't start threads on shut down
          if (_shutdown)
          {
              return;
          }

          for (int i = 0; i < threadsCount; ++i)
          {
              // Don't create more threads than the upper limit
              if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads)
              {
                  return;
              }

              // The execution context is captured when the thread is started, not
              // when the Thread object is constructed, so the suppression must stay
              // active until Start() has returned. SuppressFlow() throws when flow is
              // already suppressed; in that case there is nothing left to suppress.
              bool suppressFlow = _stpStartInfo.SuppressFlow && !ExecutionContext.IsFlowSuppressed();
              AsyncFlowControl flowControl = suppressFlow
                  ? ExecutionContext.SuppressFlow()
                  : default(AsyncFlowControl);

              Thread workerThread;
              try
              {
                  // Create a new thread, with a custom stack size when one is configured
                  workerThread = _stpStartInfo.MaxStackSize.HasValue
                      ? new Thread(ProcessQueuedItems, _stpStartInfo.MaxStackSize.Value)
                      : new Thread(ProcessQueuedItems);

                  // Configure the new thread and start it
                  workerThread.IsBackground = _stpStartInfo.AreThreadsBackground;

                  // ApartmentState.Unknown means "leave the runtime default untouched"
                  if (_stpStartInfo.ApartmentState != ApartmentState.Unknown)
                  {
                      workerThread.SetApartmentState(_stpStartInfo.ApartmentState);
                  }

                  workerThread.Priority = _stpStartInfo.ThreadPriority;
                  workerThread.Name = string.Format("STP:{0}:{1}", Name, _threadCounter);
                  workerThread.Start();
              }
              finally
              {
                  // Restore the flow on the calling thread, even if thread setup failed
                  if (suppressFlow)
                  {
                      flowControl.Undo();
                  }
              }

              ++_threadCounter;

              // Add it to the dictionary and update its creation time.
              _workerThreads[workerThread] = new ThreadEntry(this);

              _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
              _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
          }
      }
  }
  563. /// <summary>
  564. /// Worker thread entry point: dequeues and executes work items in a loop until shutdown, until an empty dequeue while the pool holds more than MinWorkerThreads threads, or until the pool holds more than MaxWorkerThreads threads.
  565. /// </summary>
  566. private void ProcessQueuedItems()
  567. {
  568. // Keep the entry of the dictionary as thread's variable to avoid the synchronization locks
  569. // of the dictionary.
  570. CurrentThreadEntry = _workerThreads[Thread.CurrentThread];
  571. FireOnThreadInitialization();
  572. try
  573. {
  574. bool bInUseWorkerThreadsWasIncremented = false;
  575. // Process until shutdown.
  576. while (!_shutdown)
  577. {
  578. // Update the last time this thread was seen alive.
  579. // It's good for debugging.
  580. CurrentThreadEntry.IAmAlive();
  581. // The following block handles the case where MaxWorkerThreads has been
  582. // lowered by the user at run-time, leaving more threads than allowed.
  583. // Check, take the lock, then check again before quitting.
  584. if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
  585. {
  586. lock (_workerThreads.SyncRoot)
  587. {
  588. if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
  589. {
  590. // Inform that the thread is quitting and then quit.
  591. // This method must be called within this lock or else
  592. // more threads than necessary may quit and the thread pool
  593. // may drop below the upper limit.
  594. InformCompleted();
  595. break;
  596. }
  597. }
  598. }
  599. // Wait for a work item, shutdown, or timeout
  600. WorkItem workItem = Dequeue();
  601. // Update the last time this thread was seen alive.
  602. // It's good for debugging.
  603. CurrentThreadEntry.IAmAlive();
  604. // On timeout or shut down.
  605. if (null == workItem)
  606. {
  607. // Check, take the lock, then check again before quitting.
  608. if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
  609. {
  610. lock (_workerThreads.SyncRoot)
  611. {
  612. if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
  613. {
  614. // Inform that the thread is quitting and then quit.
  615. // This method must be called within this lock or else
  616. // more threads will quit and the thread pool will go
  617. // below the lower limit.
  618. InformCompleted();
  619. break;
  620. }
  621. }
  622. }
  623. }
  624. // If we didn't quit then skip to the next iteration.
  625. if (null == workItem)
  626. {
  627. continue;
  628. }
  629. try
  630. {
  631. // Initialize the value to false
  632. bInUseWorkerThreadsWasIncremented = false;
  633. // Set the Current Work Item of the thread.
  634. // Store the Current Work Item before the workItem.StartingWorkItem() is called,
  635. // so WorkItem.Cancel can work when the work item is between InQueue and InProgress
  636. // states.
  637. // If the work item has been cancelled BEFORE the workItem.StartingWorkItem()
  638. // (work item is in InQueue state) then workItem.StartingWorkItem() will return false.
  639. // If the work item has been cancelled AFTER the workItem.StartingWorkItem() then
  640. // (work item is in InProgress state) then the thread will be aborted
  641. CurrentThreadEntry.CurrentWorkItem = workItem;
  642. // Change the state of the work item to 'in progress' if possible.
  643. // We do it here so if the work item has been canceled we won't
  644. // increment the _inUseWorkerThreads.
  645. // The cancel mechanism doesn't delete items from the queue,
  646. // it marks the work item as canceled, and when the work item
  647. // is dequeued, we just skip it.
  648. // If the post execute of work item is set to always or to
  649. // call when the work item is canceled then the StartingWorkItem()
  650. // will return true, so the post execute can run.
  651. if (!workItem.StartingWorkItem())
  652. {
  653. continue;
  654. }
  655. // Execute the callback. Make sure to accurately
  656. // record how many callbacks are currently executing.
  657. int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads);
  658. _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
  659. _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
  660. // Mark that the _inUseWorkerThreads incremented, so in the finally{}
  661. // statement we will decrement it correctly.
  662. bInUseWorkerThreadsWasIncremented = true;
  663. workItem.FireWorkItemStarted();
  664. ExecuteWorkItem(workItem);
  665. }
  666. catch (Exception ex)
  667. {
  668. ex.GetHashCode();
  669. // Deliberately swallowed so the worker loop keeps running; the GetHashCode() result is discarded (presumably the call only silences an unused-variable warning).
  670. }
  671. finally
  672. {
  673. workItem.DisposeOfState();
  674. // Set the CurrentWorkItem to null, since we
  675. // no longer run user's code.
  676. CurrentThreadEntry.CurrentWorkItem = null;
  677. // Decrement the _inUseWorkerThreads only if we had
  678. // incremented it. Note the cancelled work items don't
  679. // increment _inUseWorkerThreads.
  680. if (bInUseWorkerThreadsWasIncremented)
  681. {
  682. int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads);
  683. _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
  684. _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
  685. }
  686. // Notify that the work item has been completed.
  687. // WorkItemsGroup may enqueue their next work item.
  688. workItem.FireWorkItemCompleted();
  689. // Decrement the number of work items here so the idle
  690. // ManualResetEvent won't fluctuate.
  691. DecrementWorkItemsCount();
  692. }
  693. }
  694. }
  695. catch (ThreadAbortException tae)
  696. {
  697. tae.GetHashCode();
  698. // Handle the abort exception gracefully.
  699. Thread.ResetAbort();
  700. }
  701. catch (Exception e)
  702. {
  703. Debug.Assert(null != e);
  704. }
  705. finally
  706. {
  707. InformCompleted();
  708. FireOnThreadTermination();
  709. }
  710. }
  /// <summary>
  /// Executes a work item, sampling its waiting time beforehand and its
  /// processing time afterwards on both performance counter sinks.
  /// </summary>
  /// <param name="workItem">The work item to run.</param>
  private void ExecuteWorkItem(WorkItem workItem)
  {
      // Report how long the item waited before its callback starts.
      _windowsPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);
      _localPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);

      try
      {
          workItem.Execute();
      }
      finally
      {
          // Sampled in a finally block so the processing time is reported
          // even when Execute() throws.
          _windowsPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
          _localPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
      }
  }
  725. #endregion
  726. #region Public Methods
  /// <summary>
  /// Guards WaitForIdle against being called from one of this pool's own threads.
  /// </summary>
  /// <exception cref="NotSupportedException">
  /// The calling thread has a thread entry associated with this SmartThreadPool.
  /// </exception>
  private void ValidateWaitForIdle()
  {
      bool calledFromOwnThread =
          null != CurrentThreadEntry &&
          CurrentThreadEntry.AssociatedSmartThreadPool == this;

      if (!calledFromOwnThread)
      {
          return;
      }

      throw new NotSupportedException(
          "WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
  }
  /// <summary>
  /// Rejects a WaitForIdle call on a work items group when the calling thread is
  /// currently running a work item that was queued by that same group.
  /// </summary>
  /// <param name="workItemsGroup">The group whose WaitForIdle is about to be called.</param>
  /// <exception cref="NotSupportedException">
  /// The calling thread's current work item was queued by <paramref name="workItemsGroup"/>.
  /// </exception>
  internal static void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup)
  {
      // No thread entry for the calling thread: there is nothing to validate.
      if (null == CurrentThreadEntry)
      {
          return;
      }

      WorkItem workItem = CurrentThreadEntry.CurrentWorkItem;

      // The Impl method performs the complete check. The original repeated the
      // same condition inline afterwards (re-reading CurrentWorkItem instead of
      // using the null-checked local); that copy could never throw once the
      // Impl had returned, so it has been removed.
      ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, workItem);
  }
  /// <summary>
  /// Throws when <paramref name="workItem"/> was queued by <paramref name="workItemsGroup"/>.
  /// Does nothing when either argument is null.
  /// </summary>
  /// <param name="workItemsGroup">The group being validated; may be null.</param>
  /// <param name="workItem">The work item to test; may be null.</param>
  /// <exception cref="NotSupportedException">
  /// Both arguments are non-null and the work item was queued by the group.
  /// </exception>
  [MethodImpl(MethodImplOptions.NoInlining)]
  private static void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem)
  {
      if (null == workItemsGroup)
      {
          return;
      }

      if (null == workItem)
      {
          return;
      }

      if (workItem.WasQueuedBy(workItemsGroup))
      {
          throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
      }
  }
  /// <summary>
  /// Shuts the SmartThreadPool down immediately: forwards to
  /// <c>Shutdown(bool, int)</c> with forced abort enabled and a zero timeout.
  /// </summary>
  public void Shutdown()
  {
      const bool forceAbort = true;
      const int noWaitMilliseconds = 0;
      Shutdown(forceAbort, noWaitMilliseconds);
  }
  /// <summary>
  /// Shuts the SmartThreadPool down, waiting up to the given time span.
  /// </summary>
  /// <param name="forceAbort">Passed through to the millisecond-based overload.</param>
  /// <param name="timeout">
  /// Time to wait; its TotalMilliseconds value is truncated to an int.
  /// </param>
  public void Shutdown(bool forceAbort, TimeSpan timeout)
  {
      int millisecondsTimeout = (int)timeout.TotalMilliseconds;
      Shutdown(forceAbort, millisecondsTimeout);
  }
  /// <summary>
  /// Disposes the work items queue, signals the pool's threads to exit, waits
  /// for them to terminate and optionally aborts the ones still alive.
  /// </summary>
  /// <param name="forceAbort">
  /// When true, threads are aborted if the wait did not complete for all of them.
  /// </param>
  /// <param name="millisecondsTimeout">
  /// Total time to wait for all threads, or Timeout.Infinite to wait without limit.
  /// </param>
  public void Shutdown(bool forceAbort, int millisecondsTimeout)
  {
      ValidateNotDisposed();

      // Replace the Windows performance counters with the null implementation
      // so no further samples are recorded, then release the real ones.
      ISTPInstancePerformanceCounters previousCounters = _windowsPCs;
      if (NullSTPInstancePerformanceCounters.Instance != _windowsPCs)
      {
          _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
          previousCounters.Dispose();
      }

      Thread[] workers;
      lock (_workerThreads.SyncRoot)
      {
          // Close the queue, raise the shutdown flag/event, then snapshot
          // the threads currently registered in the pool.
          _workItemsQueue.Dispose();
          _shutdown = true;
          _shuttingDownEvent.Set();

          workers = new Thread[_workerThreads.Count];
          _workerThreads.Keys.CopyTo(workers, 0);
      }

      int remainingMilliseconds = millisecondsTimeout;
      Stopwatch elapsed = Stopwatch.StartNew();
      bool infiniteWait = (Timeout.Infinite == millisecondsTimeout);
      bool timedOut = false;

      // The timeout is a budget shared by all threads; it is recomputed
      // after each successful join.
      for (int i = 0; i < workers.Length; i++)
      {
          // Join does not accept negative values other than Timeout.Infinite.
          if (!infiniteWait && (remainingMilliseconds < 0))
          {
              timedOut = true;
              break;
          }

          if (!workers[i].Join(remainingMilliseconds))
          {
              timedOut = true;
              break;
          }

          if (!infiniteWait)
          {
              remainingMilliseconds = millisecondsTimeout - (int)elapsed.ElapsedMilliseconds;
          }
      }

      if (!timedOut || !forceAbort)
      {
          return;
      }

      // The wait ran out: abort every thread that is still alive.
      foreach (Thread worker in workers)
      {
          if ((worker == null) || !worker.IsAlive)
          {
              continue;
          }

          try
          {
              worker.Abort(); // Shutdown
          }
          catch (SecurityException e)
          {
              e.GetHashCode();
          }
          catch (ThreadStateException ex)
          {
              // The thread may have terminated after the IsAlive check.
              ex.GetHashCode();
          }
      }
  }
  /// <summary>
  /// Waits, without a time limit, for all the given work items to complete.
  /// Forwards to the millisecond-based overload with Timeout.Infinite and exitContext set to true.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <returns>
  /// true when every work item in waitableResults has completed; otherwise false.
  /// </returns>
  public static bool WaitAll( IWaitableResult[] waitableResults)
  {
      const bool exitContext = true;
      return WaitAll(waitableResults, Timeout.Infinite, exitContext);
  }
  /// <summary>
  /// Waits for all the given work items to complete, bounded by a time span.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="timeout">
  /// The time to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
  /// Its TotalMilliseconds value is truncated to an int.
  /// </param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <returns>
  /// true when every work item in waitableResults has completed; otherwise false.
  /// </returns>
  public static bool WaitAll( IWaitableResult[] waitableResults, TimeSpan timeout, bool exitContext)
  {
      int millisecondsTimeout = (int)timeout.TotalMilliseconds;
      return WaitAll(waitableResults, millisecondsTimeout, exitContext);
  }
  /// <summary>
  /// Waits for all the given work items to complete, bounded by a time span,
  /// with a handle that can interrupt the wait.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="timeout">
  /// The time to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
  /// Its TotalMilliseconds value is truncated to an int.
  /// </param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
  /// <returns>
  /// true when every work item in waitableResults has completed; otherwise false.
  /// </returns>
  public static bool WaitAll( IWaitableResult[] waitableResults, TimeSpan timeout,
      bool exitContext, WaitHandle cancelWaitHandle)
  {
      int millisecondsTimeout = (int)timeout.TotalMilliseconds;
      return WaitAll(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
  }
  /// <summary>
  /// Waits for all the given work items to complete, bounded by a millisecond timeout.
  /// Delegates to WorkItem.WaitAll with no cancel wait handle.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <returns>
  /// true when every work item in waitableResults has completed; otherwise false.
  /// </returns>
  public static bool WaitAll( IWaitableResult[] waitableResults, int millisecondsTimeout, bool exitContext)
  {
      bool allCompleted = WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, null);
      return allCompleted;
  }
  /// <summary>
  /// Waits for all the given work items to complete, bounded by a millisecond
  /// timeout, with a handle that can interrupt the wait.
  /// Delegates to WorkItem.WaitAll.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
  /// <returns>
  /// true when every work item in waitableResults has completed; otherwise false.
  /// </returns>
  public static bool WaitAll( IWaitableResult[] waitableResults, int millisecondsTimeout,
      bool exitContext, WaitHandle cancelWaitHandle)
  {
      bool allCompleted = WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
      return allCompleted;
  }
  /// <summary>
  /// Waits, without a time limit, for any of the work items in the specified array to complete or cancel.
  /// Forwards to the millisecond-based overload with Timeout.Infinite and exitContext set to true.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <returns>
  /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled.
  /// </returns>
  public static int WaitAny( IWaitableResult[] waitableResults)
  {
      const bool exitContext = true;
      return WaitAny(waitableResults, Timeout.Infinite, exitContext);
  }
  /// <summary>
  /// Waits for any of the work items in the specified array to complete, cancel, or timeout,
  /// bounded by a time span.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="timeout">
  /// The time to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
  /// Its TotalMilliseconds value is truncated to an int.
  /// </param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <returns>
  /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to the timeout has passed or the work item has been canceled.
  /// </returns>
  public static int WaitAny( IWaitableResult[] waitableResults, TimeSpan timeout, bool exitContext)
  {
      int millisecondsTimeout = (int)timeout.TotalMilliseconds;
      return WaitAny(waitableResults, millisecondsTimeout, exitContext);
  }
  /// <summary>
  /// Waits for any of the work items in the specified array to complete, cancel, or timeout,
  /// bounded by a time span, with a handle that can interrupt the wait.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="timeout">
  /// The time to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
  /// Its TotalMilliseconds value is truncated to an int.
  /// </param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
  /// <returns>
  /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to the timeout has passed or the work item has been canceled.
  /// </returns>
  public static int WaitAny( IWaitableResult[] waitableResults, TimeSpan timeout,
      bool exitContext, WaitHandle cancelWaitHandle)
  {
      int millisecondsTimeout = (int)timeout.TotalMilliseconds;
      return WaitAny(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
  }
  /// <summary>
  /// Waits for any of the work items in the specified array to complete, cancel, or timeout,
  /// bounded by a millisecond timeout. Delegates to WorkItem.WaitAny with no cancel wait handle.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <returns>
  /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
  /// </returns>
  public static int WaitAny( IWaitableResult[] waitableResults, int millisecondsTimeout, bool exitContext)
  {
      int satisfiedIndex = WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, null);
      return satisfiedIndex;
  }
  /// <summary>
  /// Waits for any of the work items in the specified array to complete, cancel, or timeout,
  /// bounded by a millisecond timeout, with a handle that can interrupt the wait.
  /// Delegates to WorkItem.WaitAny.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
  /// <returns>
  /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
  /// </returns>
  public static int WaitAny( IWaitableResult[] waitableResults, int millisecondsTimeout,
      bool exitContext, WaitHandle cancelWaitHandle)
  {
      int satisfiedIndex = WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
      return satisfiedIndex;
  }
  /// <summary>
  /// Creates a new WorkItemsGroup on this pool, configured with the pool's own start information.
  /// </summary>
  /// <param name="concurrency">The number of work items that can be run concurrently</param>
  /// <returns>A reference to the WorkItemsGroup</returns>
  public IWorkItemsGroup CreateWorkItemsGroup(int concurrency)
  {
      return new WorkItemsGroup(this, concurrency, _stpStartInfo);
  }
  /// <summary>
  /// Creates a new WorkItemsGroup on this pool, configured with the supplied start information.
  /// </summary>
  /// <param name="concurrency">The number of work items that can be run concurrently</param>
  /// <param name="wigStartInfo">A WorkItemsGroup configuration that overrides the default behavior</param>
  /// <returns>A reference to the WorkItemsGroup</returns>
  public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo)
  {
      return new WorkItemsGroup(this, concurrency, wigStartInfo);
  }
  1024. #region Fire Thread's Events
  /// <summary>
  /// Invokes every subscriber of the thread-initialization event, one at a time.
  /// An exception thrown by a subscriber trips a debug assertion and is rethrown,
  /// so the remaining subscribers are not called.
  /// </summary>
  private void FireOnThreadInitialization()
  {
      if (null == _onThreadInitialization)
      {
          return;
      }

      Delegate[] subscribers = _onThreadInitialization.GetInvocationList();
      foreach (Delegate subscriber in subscribers)
      {
          ThreadInitializationHandler handler = (ThreadInitializationHandler)subscriber;
          try
          {
              handler();
          }
          catch (Exception e)
          {
              e.GetHashCode();
              Debug.Assert(false);
              throw;
          }
      }
  }
  /// <summary>
  /// Invokes every subscriber of the thread-termination event, one at a time.
  /// An exception thrown by a subscriber trips a debug assertion and is rethrown,
  /// so the remaining subscribers are not called.
  /// </summary>
  private void FireOnThreadTermination()
  {
      if (null == _onThreadTermination)
      {
          return;
      }

      Delegate[] subscribers = _onThreadTermination.GetInvocationList();
      foreach (Delegate subscriber in subscribers)
      {
          ThreadTerminationHandler handler = (ThreadTerminationHandler)subscriber;
          try
          {
              handler();
          }
          catch (Exception e)
          {
              e.GetHashCode();
              Debug.Assert(false);
              throw;
          }
      }
  }
  1063. #endregion
  /// <summary>
  /// This event is fired when a thread is created.
  /// Use it to initialize a thread before the work items use it.
  /// </summary>
  public event ThreadInitializationHandler OnThreadInitialization
  {
      add
      {
          _onThreadInitialization =
              (ThreadInitializationHandler)Delegate.Combine(_onThreadInitialization, value);
      }
      remove
      {
          _onThreadInitialization =
              (ThreadInitializationHandler)Delegate.Remove(_onThreadInitialization, value);
      }
  }
  /// <summary>
  /// This event is fired when a thread is terminating.
  /// Use it for cleanup.
  /// </summary>
  public event ThreadTerminationHandler OnThreadTermination
  {
      add
      {
          _onThreadTermination =
              (ThreadTerminationHandler)Delegate.Combine(_onThreadTermination, value);
      }
      remove
      {
          _onThreadTermination =
              (ThreadTerminationHandler)Delegate.Remove(_onThreadTermination, value);
      }
  }
  /// <summary>
  /// Cancels, with abort, every work item that is currently assigned to one of
  /// the pool's threads, was queued by the given group and is not already canceled.
  /// </summary>
  /// <param name="wig">The work items group whose running items should be canceled.</param>
  internal void CancelAbortWorkItemsGroup(WorkItemsGroup wig)
  {
      foreach (ThreadEntry threadEntry in _workerThreads.Values)
      {
          // Snapshot the current work item once. The owning thread resets
          // CurrentWorkItem to null when it finishes an item, so reading the
          // property a second time (as the original did) could yield null
          // or a different item than the one that was just checked.
          WorkItem workItem = threadEntry.CurrentWorkItem;
          if (null != workItem &&
              workItem.WasQueuedBy(wig) &&
              !workItem.IsCanceled)
          {
              workItem.GetWorkItemResult().Cancel(true);
          }
      }
  }
  1095. #endregion
  1096. #region Properties
  /// <summary>
  /// Get/Set the lower limit of threads in the pool.
  /// Setting a value above the current upper limit raises the upper limit to
  /// the same value; the pool then re-evaluates how many threads to start.
  /// </summary>
  public int MinThreads
  {
      get
      {
          ValidateNotDisposed();
          return _stpStartInfo.MinWorkerThreads;
      }
      set
      {
          Debug.Assert(value >= 0);
          Debug.Assert(value <= _stpStartInfo.MaxWorkerThreads);

          // Keep the limits consistent: the maximum may never be below the minimum.
          bool exceedsMaximum = value > _stpStartInfo.MaxWorkerThreads;
          if (exceedsMaximum)
          {
              _stpStartInfo.MaxWorkerThreads = value;
          }

          _stpStartInfo.MinWorkerThreads = value;
          StartOptimalNumberOfThreads();
      }
  }
  /// <summary>
  /// Get/Set the upper limit of threads in the pool.
  /// Setting a value below the current lower limit lowers the lower limit to
  /// the same value; the pool then re-evaluates how many threads to start.
  /// </summary>
  public int MaxThreads
  {
      get
      {
          ValidateNotDisposed();
          return _stpStartInfo.MaxWorkerThreads;
      }
      set
      {
          Debug.Assert(value > 0);
          Debug.Assert(value >= _stpStartInfo.MinWorkerThreads);

          // Keep the limits consistent: the minimum may never be above the maximum.
          bool belowMinimum = value < _stpStartInfo.MinWorkerThreads;
          if (belowMinimum)
          {
              _stpStartInfo.MinWorkerThreads = value;
          }

          _stpStartInfo.MaxWorkerThreads = value;
          StartOptimalNumberOfThreads();
      }
  }
  /// <summary>
  /// Get the number of threads in the thread pool.
  /// Should be between the lower and the upper limits.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
  public int ActiveThreads
  {
      get
      {
          ValidateNotDisposed();
          int threadCount = _workerThreads.Count;
          return threadCount;
      }
  }
  /// <summary>
  /// Get the number of busy (not idle) threads in the thread pool.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
  public int InUseThreads
  {
      get
      {
          ValidateNotDisposed();
          int busyCount = _inUseWorkerThreads;
          return busyCount;
      }
  }
  /// <summary>
  /// Returns true if the current running work item has been cancelled.
  /// Must be used within the work item's callback method: the getter
  /// dereferences the calling thread's entry and its current work item
  /// without null checks.
  /// The work item should sample this value in order to know if it
  /// needs to quit before its completion.
  /// </summary>
  public static bool IsWorkItemCanceled
  {
      get
      {
          WorkItem runningItem = CurrentThreadEntry.CurrentWorkItem;
          return runningItem.IsCanceled;
      }
  }
  /// <summary>
  /// Checks if the work item has been cancelled, and if yes then aborts the calling thread.
  /// Can be used with Cancel and timeout
  /// </summary>
  public static void AbortOnWorkItemCancel()
  {
      if (!IsWorkItemCanceled)
      {
          return;
      }

      Thread.CurrentThread.Abort();
  }
  /// <summary>
  /// Thread Pool start information (readonly).
  /// Each read returns the result of calling AsReadOnly() on the pool's start information.
  /// </summary>
  public STPStartInfo STPStartInfo
  {
      get
      {
          STPStartInfo readOnlyInfo = _stpStartInfo.AsReadOnly();
          return readOnlyInfo;
      }
  }
  /// <summary>
  /// Returns the pool's shutdown flag, which Shutdown sets to true.
  /// </summary>
  public bool IsShuttingdown
  {
      get
      {
          return _shutdown;
      }
  }
  /// <summary>
  /// Return the local calculated performance counters
  /// Available only if STPStartInfo.EnableLocalPerformanceCounters is true.
  /// The local counters object is cast to the reader interface on each read.
  /// </summary>
  public ISTPPerformanceCountersReader PerformanceCountersReader
  {
      get
      {
          ISTPPerformanceCountersReader reader = (ISTPPerformanceCountersReader)_localPCs;
          return reader;
      }
  }
  1210. #endregion
  1211. #region IDisposable Members
  /// <summary>
  /// Releases the pool's resources and asks the garbage collector not to
  /// run a finalizer for this instance.
  /// </summary>
  public void Dispose()
  {
      const bool disposing = true;
      Dispose(disposing);
      GC.SuppressFinalize(this);
  }
  /// <summary>
  /// Shuts the pool down if that has not happened yet, closes its wait
  /// handles, clears the thread table and disposes the local performance
  /// counters when they are enabled. Subsequent calls do nothing.
  /// </summary>
  /// <param name="disposing">Not read by this method.</param>
  protected void Dispose(bool disposing)
  {
      if (_isDisposed)
      {
          return;
      }

      if (!_shutdown)
      {
          Shutdown();
      }

      if (_shuttingDownEvent != null)
      {
          _shuttingDownEvent.Close();
          _shuttingDownEvent = null;
      }

      _workerThreads.Clear();

      if (_isIdleWaitHandle != null)
      {
          _isIdleWaitHandle.Close();
          _isIdleWaitHandle = null;
      }

      if (_stpStartInfo.EnableLocalPerformanceCounters)
      {
          _localPCs.Dispose();
      }

      _isDisposed = true;
  }
  /// <summary>
  /// Throws when the pool has already been disposed; otherwise does nothing.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
  private void ValidateNotDisposed()
  {
      if (!_isDisposed)
      {
          return;
      }

      string objectName = GetType().ToString();
      throw new ObjectDisposedException(objectName, "The SmartThreadPool has been shutdown");
  }
  1248. #endregion
  1249. #region WorkItemsGroupBase Overrides
  /// <summary>
  /// Get/Set the maximum number of work items that execute concurrently on the thread pool.
  /// This is an alias for <see cref="MaxThreads"/>.
  /// </summary>
  public override int Concurrency
  {
      get
      {
          return MaxThreads;
      }
      set
      {
          MaxThreads = value;
      }
  }
  /// <summary>
  /// Get the number of work items in the queue.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
  public override int WaitingCallbacks
  {
      get
      {
          ValidateNotDisposed();
          int queuedCount = _workItemsQueue.Count;
          return queuedCount;
      }
  }
  /// <summary>
  /// Get an array with the state objects reported by the work items queue.
  /// The array represents a snap shot and impact performance.
  /// </summary>
  /// <returns>The array returned by the work items queue's GetStates().</returns>
  public override object[] GetStates()
  {
      return _workItemsQueue.GetStates();
  }
  /// <summary>
  /// WorkItemsGroup start information (readonly).
  /// Each read returns the result of calling AsReadOnly() on the pool's start information.
  /// </summary>
  public override WIGStartInfo WIGStartInfo
  {
      get
      {
          return _stpStartInfo.AsReadOnly();
      }
  }
  /// <summary>
  /// Start the thread pool if it was started suspended.
  /// If it is already running, this method is ignored.
  /// </summary>
  public override void Start()
  {
      if (_isSuspended)
      {
          _isSuspended = false;

          // Let every registered work items group know the pool is starting
          // before the pool's own threads are started.
          ICollection registeredGroups = _workItemsGroups.Values;
          foreach (WorkItemsGroup registeredGroup in registeredGroups)
          {
              registeredGroup.OnSTPIsStarting();
          }

          StartOptimalNumberOfThreads();
      }
  }
  /// <summary>
  /// Cancels the pool's work items: marks the current cancellation token object
  /// as canceled, installs a fresh one, and cancels every registered work items group.
  /// When requested, also cancels with abort the work items currently assigned
  /// to this pool's threads.
  /// </summary>
  /// <param name="abortExecution">True to stop work items by raising ThreadAbortException</param>
  public override void Cancel(bool abortExecution)
  {
      _canceledSmartThreadPool.IsCanceled = true;
      _canceledSmartThreadPool = new CanceledWorkItemsGroup();

      ICollection workItemsGroups = _workItemsGroups.Values;
      foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
      {
          workItemsGroup.Cancel(abortExecution);
      }

      if (abortExecution)
      {
          foreach (ThreadEntry threadEntry in _workerThreads.Values)
          {
              // Snapshot the current work item once. The owning thread resets
              // CurrentWorkItem to null when it finishes an item, so reading the
              // property a second time (as the original did) could yield null
              // or a different item than the one that was just checked.
              WorkItem workItem = threadEntry.CurrentWorkItem;
              if (null != workItem &&
                  threadEntry.AssociatedSmartThreadPool == this &&
                  !workItem.IsCanceled)
              {
                  workItem.GetWorkItemResult().Cancel(true);
              }
          }
      }
  }
  /// <summary>
  /// Wait for the thread pool to be idle
  /// </summary>
  /// <param name="millisecondsTimeout">Timeout, in milliseconds, passed to the wait on the idle handle.</param>
  /// <returns>The result of waiting on the pool's idle wait handle.</returns>
  /// <exception cref="NotSupportedException">Called from one of this pool's own threads.</exception>
  public override bool WaitForIdle(int millisecondsTimeout)
  {
      ValidateWaitForIdle();

      const bool exitContext = false;
      bool becameIdle = STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, exitContext);
      return becameIdle;
  }
  /// <summary>
  /// This event is fired when all work items are completed.
  /// (When IsIdle changes to true)
  /// This event only works on WorkItemsGroup. On SmartThreadPool both
  /// accessors are empty: handlers are not stored and nothing is thrown.
  /// </summary>
  public override event WorkItemsGroupIdleHandler OnIdle
  {
      add
      {
          // No-op: the handler is not recorded.
      }
      remove
      {
          // No-op: there is nothing to unsubscribe.
      }
  }
  /// <summary>
  /// Verifies that the pool has not been disposed.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
  internal override void PreQueueWorkItem()
  {
      // The only precondition checked here is that the pool is still usable.
      ValidateNotDisposed();
  }
  1359. #endregion
  1360. #region Join, Choice, Pipe, etc.
  /// <summary>
  /// Executes all actions in parallel.
  /// Returns when they all finish.
  /// </summary>
  /// <param name="actions">Actions to execute</param>
  public void Join(IEnumerable<Action> actions)
  {
      // Queue everything on a suspended group first, then release it at once.
      WIGStartInfo suspendedStart = new WIGStartInfo();
      suspendedStart.StartSuspended = true;
      IWorkItemsGroup group = CreateWorkItemsGroup(int.MaxValue, suspendedStart);

      foreach (Action pending in actions)
      {
          group.QueueWorkItem(pending);
      }

      group.Start();
      group.WaitForIdle();
  }
  /// <summary>
  /// Executes all actions in parallel.
  /// Returns when they all finish.
  /// Convenience overload that forwards the array to the IEnumerable-based Join.
  /// </summary>
  /// <param name="actions">Actions to execute</param>
  public void Join(params Action[] actions)
  {
      IEnumerable<Action> sequence = actions;
      Join(sequence);
  }
  /// <summary>
  /// Mutable holder for the index recorded by Choice; starts at -1.
  /// </summary>
  private class ChoiceIndex
  {
      /// <summary>Recorded index; -1 until a value is stored.</summary>
      public int _index = -1;
  }
  /// <summary>
  /// Executes all actions in parallel
  /// Returns when the first one completes
  /// </summary>
  /// <param name="actions">Actions to execute</param>
  /// <returns>
  /// The zero-based position, within <paramref name="actions"/>, of the first
  /// action to complete, or -1 when <paramref name="actions"/> is empty.
  /// </returns>
  public int Choice(IEnumerable<Action> actions)
  {
      WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
      IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo);
      ChoiceIndex choiceIndex = new ChoiceIndex();

      using (ManualResetEvent anActionCompleted = new ManualResetEvent(false))
      {
          int queuedCount = 0;
          foreach (Action action in actions)
          {
              Action act = action;
              int value = queuedCount;
              workItemsGroup.QueueWorkItem(() =>
              {
                  act();

                  // Only the action that wins the race signals the event.
                  // The original signalled from every action, so the ones
                  // finishing after the caller had disposed the event hit
                  // an ObjectDisposedException.
                  if (Interlocked.CompareExchange(ref choiceIndex._index, value, -1) == -1)
                  {
                      anActionCompleted.Set();
                  }
              });
              ++queuedCount;
          }

          workItemsGroup.Start();

          // With nothing queued nobody would ever signal the event, so
          // waiting would block forever; return the initial -1 instead.
          if (queuedCount > 0)
          {
              anActionCompleted.WaitOne();
          }
      }

      return choiceIndex._index;
  }
  /// <summary>
  /// Executes all actions in parallel
  /// Returns when the first one completes
  /// Convenience overload that forwards the array to the IEnumerable-based Choice.
  /// </summary>
  /// <param name="actions">Actions to execute</param>
  /// <returns>The value returned by the IEnumerable-based Choice.</returns>
  public int Choice(params Action[] actions)
  {
      IEnumerable<Action> sequence = actions;
      return Choice(sequence);
  }
  /// <summary>
  /// Executes actions in sequence on a work items group with a concurrency of one.
  /// The call blocks until the group reports idle (it ends with WaitForIdle()).
  /// </summary>
  /// <param name="pipeState">A state context that is passed to every action</param>
  /// <param name="actions">Actions to execute in the order they should run</param>
  public void Pipe<T>(T pipeState, IEnumerable<Action<T>> actions)
  {
      // Queue everything on a suspended, single-concurrency group, then release it.
      WIGStartInfo suspendedStart = new WIGStartInfo();
      suspendedStart.StartSuspended = true;
      IWorkItemsGroup group = CreateWorkItemsGroup(1, suspendedStart);

      foreach (Action<T> stage in actions)
      {
          // Copy to a local so each closure captures its own action.
          Action<T> current = stage;
          group.QueueWorkItem(() => current(pipeState));
      }

      group.Start();
      group.WaitForIdle();
  }
  /// <summary>
  /// Executes actions in sequence.
  /// Convenience overload that forwards the array to the IEnumerable-based Pipe.
  /// </summary>
  /// <param name="pipeState">A state context that is passed to every action</param>
  /// <param name="actions">Actions to execute in the order they should run</param>
  public void Pipe<T>(T pipeState, params Action<T>[] actions)
  {
      IEnumerable<Action<T>> sequence = actions;
      Pipe(pipeState, sequence);
  }
  1451. #endregion
  1452. }
  1453. #endregion
  1454. }