SmartThreadPool.cs 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649
  1. #region Release History
  2. // Smart Thread Pool
  3. // 7 Aug 2004 - Initial release
  4. //
  5. // 14 Sep 2004 - Bug fixes
  6. //
  7. // 15 Oct 2004 - Added new features
  8. // - Work items return result.
  9. // - Support waiting synchronization for multiple work items.
  10. // - Work items can be cancelled.
  11. // - Passage of the caller thread’s context to the thread in the pool.
  12. // - Minimal usage of WIN32 handles.
  13. // - Minor bug fixes.
  14. //
  15. // 26 Dec 2004 - Changes:
  16. // - Removed static constructors.
  17. // - Added finalizers.
  18. // - Changed Exceptions so they are serializable.
  19. // - Fixed the bug in one of the SmartThreadPool constructors.
  20. // - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters.
  21. // The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
  22. // - Added PostExecute with options on which cases to call it.
  23. // - Added option to dispose of the state objects.
  24. // - Added a WaitForIdle() method that waits until the work items queue is empty.
  25. // - Added an STPStartInfo class for the initialization of the thread pool.
  26. // - Changed exception handling so if a work item throws an exception it
  27. // is rethrown at GetResult(), rather than firing an UnhandledException event.
  28. // Note that PostExecute exceptions are always ignored.
  29. //
  30. // 25 Mar 2005 - Changes:
  31. // - Fixed the lost work items bug
  32. //
  33. // 3 Jul 2005: Changes.
  34. // - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed.
  35. //
  36. // 16 Aug 2005: Changes.
  37. // - Fixed bug where the InUseThreads becomes negative when canceling work items.
  38. //
  39. // 31 Jan 2006 - Changes:
  40. // - Added work items priority
  41. // - Removed support of chained delegates in callbacks and post executes (nobody really uses this)
  42. // - Added work items groups
  43. // - Added work items groups idle event
  44. // - Changed SmartThreadPool.WaitAll() behavior so when it gets an empty array
  45. // it returns true rather than throwing an exception.
  46. // - Added option to start the STP and the WIG as suspended
  47. // - Exception behavior changed, the real exception is returned by an
  48. // inner exception
  49. // - Added performance counters
  50. // - Added priority to the threads in the pool
  51. //
  52. // 13 Feb 2006 - Changes:
  53. // - Added a call to the dispose of the Performance Counter so
  54. // there won't be a Performance Counter leak.
  55. // - Added exception catch in case the Performance Counters cannot
  56. // be created.
  57. //
  58. // 17 May 2008 - Changes:
  59. // - Changed the dispose behavior and removed the Finalizers.
  60. // - Enabled the change of the MaxThreads and MinThreads at run time.
  61. // - Enabled the change of the Concurrency of an IWorkItemsGroup at run
  62. // time. If the IWorkItemsGroup is a SmartThreadPool then the Concurrency
  63. // refers to the MaxThreads.
  64. // - Improved the cancel behavior.
  65. // - Added events for thread creation and termination.
  66. // - Fixed the HttpContext context capture.
  67. // - Changed internal collections so they use generic collections
  68. // - Added IsIdle flag to the SmartThreadPool and IWorkItemsGroup
  69. // - Added support for WinCE
  70. // - Added support for Action<T> and Func<T>
  71. //
  72. // 07 April 2009 - Changes:
  73. // - Added support for Silverlight and Mono
  74. // - Added Join, Choice, and Pipe to SmartThreadPool.
  75. // - Added local performance counters (for Mono, Silverlight, and WindowsCE)
  76. // - Changed duration measures from DateTime.Now to Stopwatch.
  77. // - Queues changed from System.Collections.Queue to System.Collections.Generic.LinkedList<T>.
  78. //
  79. // 21 December 2009 - Changes:
  80. // - Added work item timeout (passive)
  81. //
  82. // 20 August 2012 - Changes:
  83. // - Added set name to threads
  84. // - Fixed the WorkItemsQueue.Dequeue.
  85. // Replaced while (!Monitor.TryEnter(this)); with lock(this) { ... }
  86. // - Fixed SmartThreadPool.Pipe
  87. // - Added IsBackground option to threads
  88. // - Added ApartmentState to threads
  89. // - Fixed thread creation when queuing many work items at the same time.
  90. //
  91. // 24 August 2012 - Changes:
  92. // - Enabled cancel abort after cancel. See: http://smartthreadpool.codeplex.com/discussions/345937 by alecswan
  93. // - Added option to set MaxStackSize of threads
  94. #endregion
  95. using System;
  96. using System.Security;
  97. using System.Threading;
  98. using System.Collections;
  99. using System.Collections.Generic;
  100. using System.Diagnostics;
  101. using System.Runtime.CompilerServices;
  102. using Amib.Threading.Internal;
  103. namespace Amib.Threading
  104. {
  105. #region SmartThreadPool class
  106. /// <summary>
  107. /// Smart thread pool class.
  108. /// </summary>
  109. public partial class SmartThreadPool : WorkItemsGroupBase, IDisposable
  110. {
  111. #region Public Default Constants
  /// <summary>
  /// Default lower bound on the number of threads in the pool. (0)
  /// </summary>
  public const int DefaultMinWorkerThreads = 0;

  /// <summary>
  /// Default upper bound on the number of threads in the pool. (25)
  /// </summary>
  public const int DefaultMaxWorkerThreads = 25;

  /// <summary>
  /// Default idle timeout, expressed in milliseconds. (60,000 ms = one minute)
  /// </summary>
  public const int DefaultIdleTimeout = 60 * 1000;

  /// <summary>
  /// Default for whether the caller's context is copied and used when the
  /// work item is called. (false)
  /// </summary>
  public const bool DefaultUseCallerCallContext = false;

  /// <summary>
  /// Default for whether state objects that implement IDisposable are
  /// disposed of. (false)
  /// </summary>
  public const bool DefaultDisposeOfStateObjects = false;

  /// <summary>
  /// Default condition under which the post execute callback is run.
  /// (CallToPostExecute.Always)
  /// </summary>
  public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always;

  /// <summary>
  /// Default post execute callback. It is never assigned here, so it is
  /// null, which means no post execute callback is called.
  /// </summary>
  public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback;

  /// <summary>
  /// Default for starting suspended. With false, work items are worked on
  /// as soon as they arrive instead of waiting for an explicit start. (false)
  /// </summary>
  public const bool DefaultStartSuspended = false;

  /// <summary>
  /// Default instance name for the performance counters. It is never
  /// assigned here, so it is null.
  /// </summary>
  public static readonly string DefaultPerformanceCounterInstanceName;

  /// <summary>
  /// Default priority of the threads in the pool. (ThreadPriority.Normal)
  /// </summary>
  public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal;

  /// <summary>
  /// Default name of the thread pool. ("SmartThreadPool")
  /// </summary>
  public const string DefaultThreadPoolName = "SmartThreadPool";

  /// <summary>
  /// Default maximum stack size of the threads in the pool. (null)
  /// </summary>
  public static readonly int? DefaultMaxStackSize = null;

  /// <summary>
  /// Default for filling the work item state with the call arguments. (false)
  /// Only relevant to QueueWorkItem overloads taking Action&lt;...&gt;/Func&lt;...&gt;.
  /// </summary>
  public const bool DefaultFillStateWithArgs = false;

  /// <summary>
  /// Default for whether pool threads are background threads. (true)
  /// </summary>
  public const bool DefaultAreThreadsBackground = true;

  /// <summary>
  /// Default apartment state of a pool thread. ApartmentState.Unknown means
  /// the pool does not set the apartment state and the .NET default applies.
  /// </summary>
  public const ApartmentState DefaultApartmentState = ApartmentState.Unknown;
  177. #endregion
  178. #region Member Variables
  /// <summary>
  /// All the threads that currently belong to the pool, keyed by thread.
  /// </summary>
  private readonly SynchronizedDictionary<Thread, ThreadEntry> _workerThreads = new SynchronizedDictionary<Thread, ThreadEntry>();

  /// <summary>
  /// The queue the worker threads take their work items from.
  /// </summary>
  private readonly WorkItemsQueue _workItemsQueue = new WorkItemsQueue();

  /// <summary>
  /// Number of work items handled so far; reported to the performance counters.
  /// </summary>
  private int _workItemsProcessed;

  /// <summary>
  /// Number of threads that are currently running a work item (not idle).
  /// </summary>
  private int _inUseWorkerThreads;

  /// <summary>
  /// The pool's own copy of the start info it was constructed with.
  /// It is used to change MinThreads and MaxThreads.
  /// </summary>
  private STPStartInfo _stpStartInfo;

  /// <summary>
  /// Work items waiting in the queue plus work items the pool threads
  /// are currently working on.
  /// </summary>
  private int _currentWorkItemsCount;

  /// <summary>
  /// Signaled while the pool is idle, i.e. no thread is busy and the
  /// work items queue is empty.
  /// </summary>
  private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);

  /// <summary>
  /// Signaled to tell all the threads to quit immediately.
  /// </summary>
  private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false);

  /// <summary>
  /// True while the Smart Thread Pool is suspended.
  /// </summary>
  private bool _isSuspended;

  /// <summary>
  /// True once the threads have been told to quit.
  /// </summary>
  private bool _shutdown;

  /// <summary>
  /// Running count of threads created by the pool; used to name the threads.
  /// </summary>
  private int _threadCounter;

  /// <summary>
  /// True once the SmartThreadPool has been disposed.
  /// </summary>
  private bool _isDisposed;

  /// <summary>
  /// The WorkItemsGroup instances that have at least one work item in
  /// the SmartThreadPool. Used in case of Shutdown.
  /// </summary>
  private readonly SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup> _workItemsGroups = new SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup>();

  /// <summary>
  /// An object shared by all the work items in the STP, so they can all
  /// be marked as canceled in O(1).
  /// </summary>
  private CanceledWorkItemsGroup _canceledSmartThreadPool = new CanceledWorkItemsGroup();

  /// <summary>
  /// Windows performance counters of this STP instance (null object until configured).
  /// </summary>
  private ISTPInstancePerformanceCounters _windowsPCs = NullSTPInstancePerformanceCounters.Instance;

  /// <summary>
  /// Local performance counters of this STP instance (null object until configured).
  /// </summary>
  private ISTPInstancePerformanceCounters _localPCs = NullSTPInstancePerformanceCounters.Instance;

  /// <summary>
  /// Per-thread slot holding the pool entry of the current thread.
  /// </summary>
  [ThreadStatic]
  private static ThreadEntry _threadEntry;

  /// <summary>
  /// Raised after a thread is created, but before its first use.
  /// </summary>
  private event ThreadInitializationHandler _onThreadInitialization;

  /// <summary>
  /// Raised when a thread is about to exit, after it no longer belongs
  /// to the pool.
  /// </summary>
  private event ThreadTerminationHandler _onThreadTermination;
  263. #endregion
  264. #region Per thread properties
  /// <summary>
  /// The pool entry of the calling thread, stored in a thread-static field.
  /// </summary>
  internal static ThreadEntry CurrentThreadEntry
  {
      get { return _threadEntry; }
      set { _threadEntry = value; }
  }
  280. #endregion
  281. #region Construction and Finalization
  /// <summary>
  /// Creates a thread pool from a default-constructed STPStartInfo.
  /// </summary>
  public SmartThreadPool()
  {
      STPStartInfo startInfo = new STPStartInfo();
      _stpStartInfo = startInfo;

      Initialize();
  }
  /// <summary>
  /// Creates a thread pool with a custom idle timeout.
  /// </summary>
  /// <param name="idleTimeout">Idle timeout in milliseconds</param>
  public SmartThreadPool(int idleTimeout)
  {
      STPStartInfo startInfo = new STPStartInfo();
      startInfo.IdleTimeout = idleTimeout;
      _stpStartInfo = startInfo;

      Initialize();
  }
  /// <summary>
  /// Creates a thread pool with a custom idle timeout and thread upper limit.
  /// </summary>
  /// <param name="idleTimeout">Idle timeout in milliseconds</param>
  /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
  public SmartThreadPool(int idleTimeout, int maxWorkerThreads)
  {
      STPStartInfo startInfo = new STPStartInfo();
      startInfo.IdleTimeout = idleTimeout;
      startInfo.MaxWorkerThreads = maxWorkerThreads;
      _stpStartInfo = startInfo;

      Initialize();
  }
  /// <summary>
  /// Creates a thread pool with a custom idle timeout and thread limits.
  /// </summary>
  /// <param name="idleTimeout">Idle timeout in milliseconds</param>
  /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
  /// <param name="minWorkerThreads">Lower limit of threads in the pool</param>
  public SmartThreadPool(int idleTimeout, int maxWorkerThreads, int minWorkerThreads)
  {
      STPStartInfo startInfo = new STPStartInfo();
      startInfo.IdleTimeout = idleTimeout;
      startInfo.MaxWorkerThreads = maxWorkerThreads;
      startInfo.MinWorkerThreads = minWorkerThreads;
      _stpStartInfo = startInfo;

      Initialize();
  }
  /// <summary>
  /// Creates a thread pool configured from the given start info. The pool
  /// keeps its own STPStartInfo built from the argument rather than the
  /// caller's instance.
  /// </summary>
  /// <param name="stpStartInfo">A SmartThreadPool configuration that overrides the default behavior</param>
  public SmartThreadPool(STPStartInfo stpStartInfo)
  {
      STPStartInfo ownCopy = new STPStartInfo(stpStartInfo);
      _stpStartInfo = ownCopy;

      Initialize();
  }
  /// <summary>
  /// Shared construction logic. Takes the pool name from the start info,
  /// validates the thread limits, sets up the performance counters and,
  /// unless the pool starts suspended, starts the initial threads.
  /// </summary>
  private void Initialize()
  {
      Name = _stpStartInfo.ThreadPoolName;
      ValidateSTPStartInfo();

      _isSuspended = _stpStartInfo.StartSuspended;

      // Windows performance counters are created only when an instance name
      // was supplied. A failure to create them is logged and otherwise ignored.
      if (_stpStartInfo.PerformanceCounterInstanceName != null)
      {
          try
          {
              _windowsPCs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName);
          }
          catch (Exception e)
          {
              Debug.WriteLine("Unable to create Performance Counters: " + e);
              _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
          }
      }

      if (_stpStartInfo.EnableLocalPerformanceCounters)
      {
          _localPCs = new LocalSTPInstancePerformanceCounters();
      }

      // A suspended pool does not start any thread here.
      if (_isSuspended)
      {
          return;
      }

      StartOptimalNumberOfThreads();
  }
  /// <summary>
  /// Starts as many threads as are missing to reach the wanted thread count:
  /// the larger of the queued work items count and MinWorkerThreads, capped
  /// at MaxWorkerThreads.
  /// </summary>
  private void StartOptimalNumberOfThreads()
  {
      int wantedThreads = Math.Min(
          Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads),
          _stpStartInfo.MaxWorkerThreads);

      int missingThreads = wantedThreads - _workerThreads.Count;
      if (missingThreads > 0)
      {
          StartThreads(missingThreads);
      }
  }
  /// <summary>
  /// Checks the thread limits stored in the start info.
  /// </summary>
  /// <exception cref="ArgumentOutOfRangeException">
  /// MinWorkerThreads is negative, MaxWorkerThreads is not positive, or
  /// MinWorkerThreads is greater than MaxWorkerThreads.
  /// </exception>
  private void ValidateSTPStartInfo()
  {
      int minThreads = _stpStartInfo.MinWorkerThreads;
      int maxThreads = _stpStartInfo.MaxWorkerThreads;

      if (minThreads < 0)
      {
          throw new ArgumentOutOfRangeException(
              "MinWorkerThreads", "MinWorkerThreads cannot be negative");
      }

      if (maxThreads <= 0)
      {
          throw new ArgumentOutOfRangeException(
              "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero");
      }

      if (minThreads > maxThreads)
      {
          throw new ArgumentOutOfRangeException(
              "MinWorkerThreads, maxWorkerThreads",
              "MaxWorkerThreads must be greater or equal to MinWorkerThreads");
      }
  }
  /// <summary>
  /// Rejects multicast delegates: a callback may have only one target.
  /// </summary>
  /// <param name="callback">The delegate to check</param>
  /// <exception cref="NotSupportedException">
  /// The delegate's invocation list holds more than one entry.
  /// </exception>
  private static void ValidateCallback(Delegate callback)
  {
      int targetsCount = callback.GetInvocationList().Length;
      if (targetsCount <= 1)
      {
          return;
      }

      throw new NotSupportedException("SmartThreadPool doesn't support delegates chains");
  }
  411. #endregion
  412. #region Thread Processing
  /// <summary>
  /// Asks the work items queue for the next work item, passing the idle
  /// timeout and the shutting down event.
  /// </summary>
  /// <returns>
  /// The dequeued work item, or null in case of timeout or shutdown.
  /// </returns>
  private WorkItem Dequeue()
  {
      return _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent);
  }
  /// <summary>
  /// Adds a work item to the queue and, when there are more work items
  /// than threads, asks for one more thread.
  /// </summary>
  /// <param name="workItem">A work item to queue; must not be null</param>
  internal override void Enqueue(WorkItem workItem)
  {
      Debug.Assert(workItem != null);

      // Count the item before it becomes visible in the queue.
      IncrementWorkItemsCount();

      // Tag the item with the pool's shared cancel marker, then queue it.
      workItem.CanceledSmartThreadPool = _canceledSmartThreadPool;
      _workItemsQueue.EnqueueWorkItem(workItem);
      workItem.WorkItemIsQueued();

      // More work items than threads: try to create a new thread.
      bool moreWorkThanThreads = _currentWorkItemsCount > _workerThreads.Count;
      if (moreWorkThanThreads)
      {
          StartThreads(1);
      }
  }
  /// <summary>
  /// Samples the work item counters, then counts one more work item.
  /// When the count goes from zero to one the pool leaves the idle state.
  /// </summary>
  private void IncrementWorkItemsCount()
  {
      _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
      _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);

      if (Interlocked.Increment(ref _currentWorkItemsCount) != 1)
      {
          return;
      }

      // First work item: the pool is no longer idle.
      IsIdle = false;
      _isIdleWaitHandle.Reset();
  }
  /// <summary>
  /// Counts one work item less and one more processed work item.
  /// When the count reaches zero the pool enters the idle state.
  /// The performance counters are sampled unless the pool is shutting down.
  /// </summary>
  private void DecrementWorkItemsCount()
  {
      int remaining = Interlocked.Decrement(ref _currentWorkItemsCount);
      if (remaining == 0)
      {
          // Last work item is done: the pool is idle.
          IsIdle = true;
          _isIdleWaitHandle.Set();
      }

      // Cancelled work items are counted as processed too.
      Interlocked.Increment(ref _workItemsProcessed);

      if (_shutdown)
      {
          return;
      }

      _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
      _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
  }
  /// <summary>
  /// Records a work items group in the pool's dictionary of groups,
  /// using the group itself as both key and value.
  /// </summary>
  /// <param name="workItemsGroup">The group to record</param>
  internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
  {
      _workItemsGroups[workItemsGroup] = workItemsGroup;
  }
  /// <summary>
  /// Removes a work items group from the pool's dictionary of groups.
  /// Does nothing when the group is not in the dictionary.
  /// </summary>
  /// <param name="workItemsGroup">The group to remove</param>
  internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
  {
      if (!_workItemsGroups.Contains(workItemsGroup))
      {
          return;
      }

      _workItemsGroups.Remove(workItemsGroup);
  }
  /// <summary>
  /// Called by a worker thread that is quitting or about to quit: removes
  /// the current thread from the pool's thread dictionary and samples the
  /// thread counters. Calling it again from the same thread does nothing.
  /// </summary>
  private void InformCompleted()
  {
      // Contains and Remove are not locked together: only the current
      // thread removes itself, and the dictionary is a synchronized one.
      Thread self = Thread.CurrentThread;
      if (!_workerThreads.Contains(self))
      {
          return;
      }

      _workerThreads.Remove(self);
      _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
      _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
  }
  /// <summary>
  /// Starts new worker threads, up to the MaxWorkerThreads limit.
  /// Does nothing while the pool is suspended or once it is shut down.
  /// </summary>
  /// <param name="threadsCount">The number of threads to start</param>
  private void StartThreads(int threadsCount)
  {
      if (_isSuspended)
      {
          return;
      }

      lock (_workerThreads.SyncRoot)
      {
          // Don't start threads on shut down
          if (_shutdown)
          {
              return;
          }

          for (int i = 0; i < threadsCount; ++i)
          {
              // Don't create more threads than the upper limit
              if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads)
              {
                  return;
              }

              Thread workerThread;

              // The execution context is captured when the thread is STARTED,
              // not when the Thread object is constructed, so the suppression
              // scope has to cover the Start() call for SuppressFlow to have
              // any effect. SuppressFlow() throws when the flow is already
              // suppressed; in that case nothing flows anyway.
              if (_stpStartInfo.SuppressFlow && !ExecutionContext.IsFlowSuppressed())
              {
                  using (ExecutionContext.SuppressFlow())
                  {
                      workerThread = CreateAndStartWorkerThread();
                  }
              }
              else
              {
                  workerThread = CreateAndStartWorkerThread();
              }

              ++_threadCounter;

              // Add the thread to the dictionary. This happens while
              // _workerThreads.SyncRoot is still held.
              _workerThreads[workerThread] = new ThreadEntry(this);
              _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
              _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
          }
      }
  }

  /// <summary>
  /// Creates one worker thread running ProcessQueuedItems, configures it
  /// from the start info (stack size, background flag, apartment state,
  /// priority, name) and starts it.
  /// </summary>
  /// <returns>The started thread</returns>
  private Thread CreateAndStartWorkerThread()
  {
      Thread workerThread =
          _stpStartInfo.MaxStackSize.HasValue
          ? new Thread(ProcessQueuedItems, _stpStartInfo.MaxStackSize.Value)
          : new Thread(ProcessQueuedItems);

      workerThread.IsBackground = _stpStartInfo.AreThreadsBackground;

      // ApartmentState.Unknown means: leave the .NET default untouched.
      if (_stpStartInfo.ApartmentState != ApartmentState.Unknown)
      {
          workerThread.SetApartmentState(_stpStartInfo.ApartmentState);
      }

      workerThread.Priority = _stpStartInfo.ThreadPriority;
      workerThread.Name = string.Format("STP:{0}:{1}", Name, _threadCounter);
      workerThread.Start();

      return workerThread;
  }
  /// <summary>
  /// The method every worker thread runs: it takes work items from the
  /// work items queue and executes them until the pool shuts down or the
  /// thread decides to quit (too many threads, or idle above the minimum).
  /// </summary>
  private void ProcessQueuedItems()
  {
      // Keep this thread's dictionary entry in the per-thread slot so the
      // loop does not need to go through the synchronized dictionary for it.
      CurrentThreadEntry = _workerThreads[Thread.CurrentThread];

      FireOnThreadInitialization();

      try
      {
          bool inUseCountWasIncremented = false;

          // Process until shutdown.
          while (!_shutdown)
          {
              // Record that this thread is alive (useful when debugging).
              CurrentThreadEntry.IAmAlive();

              // The pool holds more threads than MaxWorkerThreads allows,
              // e.g. after the limit was lowered at run-time. Check, lock,
              // then check again before quitting.
              if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
              {
                  lock (_workerThreads.SyncRoot)
                  {
                      if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
                      {
                          // InformCompleted() must run inside this lock, or
                          // else more threads than necessary may quit.
                          InformCompleted();
                          break;
                      }
                  }
              }

              // Wait for a work item, shutdown, or timeout.
              WorkItem workItem = Dequeue();

              CurrentThreadEntry.IAmAlive();

              // Timeout or shutdown: no work item was returned.
              if (workItem == null)
              {
                  // Check, lock, then check again before quitting.
                  if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
                  {
                      lock (_workerThreads.SyncRoot)
                      {
                          if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
                          {
                              // InformCompleted() must run inside this lock,
                              // or else more threads will quit and the pool
                              // will go below the lower limit.
                              InformCompleted();
                              break;
                          }
                      }
                  }

                  // The thread stays in the pool: go wait for the next item.
                  continue;
              }

              try
              {
                  inUseCountWasIncremented = false;

                  // Publish the work item on the thread entry BEFORE calling
                  // StartingWorkItem(), so WorkItem.Cancel can work while the
                  // item is between the InQueue and InProgress states:
                  // - cancelled before StartingWorkItem(): it returns false;
                  // - cancelled after StartingWorkItem(): the thread is aborted.
                  CurrentThreadEntry.CurrentWorkItem = workItem;

                  // Canceling does not remove items from the queue; it marks
                  // them, and a marked item is skipped here once dequeued,
                  // without touching _inUseWorkerThreads. StartingWorkItem()
                  // still returns true for a canceled item whose post execute
                  // must run (always / when canceled).
                  if (workItem.StartingWorkItem())
                  {
                      // Keep an accurate count of the callbacks in progress.
                      int busyThreads = Interlocked.Increment(ref _inUseWorkerThreads);
                      _windowsPCs.SampleThreads(_workerThreads.Count, busyThreads);
                      _localPCs.SampleThreads(_workerThreads.Count, busyThreads);

                      // Remember to undo the increment in the finally block.
                      inUseCountWasIncremented = true;

                      workItem.FireWorkItemStarted();
                      ExecuteWorkItem(workItem);
                  }
              }
              catch (Exception)
              {
                  // Deliberately ignored.
              }
              finally
              {
                  workItem.DisposeOfState();

                  // User code no longer runs on this thread.
                  CurrentThreadEntry.CurrentWorkItem = null;

                  // Undo the increment only if it happened; skipped
                  // (cancelled) work items never incremented the counter.
                  if (inUseCountWasIncremented)
                  {
                      int busyThreads = Interlocked.Decrement(ref _inUseWorkerThreads);
                      _windowsPCs.SampleThreads(_workerThreads.Count, busyThreads);
                      _localPCs.SampleThreads(_workerThreads.Count, busyThreads);
                  }

                  // Notify completion; a WorkItemsGroup may enqueue its
                  // next work item in response.
                  workItem.FireWorkItemCompleted();

                  // Decrement the work items count only now, so the idle
                  // ManualResetEvent won't fluctuate.
                  DecrementWorkItemsCount();
              }
          }
      }
      catch (ThreadAbortException)
      {
          // Handle the abort gracefully: cancel it so the finally block
          // below runs as part of a normal exit.
          Thread.ResetAbort();
      }
      catch (Exception e)
      {
          Debug.Assert(e != null);
      }
      finally
      {
          InformCompleted();
          FireOnThreadTermination();
      }
  }
  /// <summary>
  /// Executes a single work item and reports its wait and process times to both
  /// the Windows and the local performance counters.
  /// </summary>
  /// <param name="workItem">The work item to execute.</param>
  private void ExecuteWorkItem(WorkItem workItem)
  {
      // Report how long the item waited before it got to run.
      _windowsPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);
      _localPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);

      try
      {
          workItem.Execute();
      }
      finally
      {
          // In a finally block so the process time is sampled even if Execute() throws.
          _windowsPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
          _localPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
      }
  }
  721. #endregion
  722. #region Public Methods
  /// <summary>
  /// Rejects a WaitForIdle call that is made from one of this pool's own threads.
  /// </summary>
  /// <exception cref="NotSupportedException">
  /// The calling thread has a thread entry associated with this SmartThreadPool.
  /// </exception>
  private void ValidateWaitForIdle()
  {
      bool calledFromOwnThread =
          (null != CurrentThreadEntry) &&
          (CurrentThreadEntry.AssociatedSmartThreadPool == this);

      if (!calledFromOwnThread)
      {
          return;
      }

      throw new NotSupportedException(
          "WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
  }
  /// <summary>
  /// Rejects a WaitForIdle call on a work items group when the calling thread is
  /// currently running a work item that was queued by that same group.
  /// </summary>
  /// <param name="workItemsGroup">The group whose WaitForIdle is being called.</param>
  /// <exception cref="NotSupportedException">
  /// The work item running on the calling thread was queued by <paramref name="workItemsGroup"/>.
  /// </exception>
  internal static void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup)
  {
      // No thread entry: the caller is not running inside a pool thread.
      if (null == CurrentThreadEntry)
      {
          return;
      }

      // The helper performs the whole check (and throws). The identical check that
      // used to follow this call could never fire once the helper had returned.
      WorkItem workItem = CurrentThreadEntry.CurrentWorkItem;
      ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, workItem);
  }
  /// <summary>
  /// Throws when <paramref name="workItem"/> was queued by <paramref name="workItemsGroup"/>.
  /// Does nothing if either argument is null.
  /// </summary>
  /// <param name="workItemsGroup">The group being waited on; may be null.</param>
  /// <param name="workItem">The work item running on the calling thread; may be null.</param>
  /// <exception cref="NotSupportedException">
  /// <paramref name="workItem"/> was queued by <paramref name="workItemsGroup"/>.
  /// </exception>
  [MethodImpl(MethodImplOptions.NoInlining)]
  private static void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem)
  {
      if (null == workItemsGroup)
      {
          return;
      }

      if (null == workItem)
      {
          return;
      }

      if (workItem.WasQueuedBy(workItemsGroup))
      {
          throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
      }
  }
  /// <summary>
  /// Shuts the SmartThreadPool down without waiting: equivalent to
  /// Shutdown(true, 0), i.e. forced abort with a zero millisecond timeout.
  /// </summary>
  public void Shutdown()
  {
      const bool forceAbort = true;
      const int millisecondsTimeout = 0;

      Shutdown(forceAbort, millisecondsTimeout);
  }
  /// <summary>
  /// Shuts the SmartThreadPool down, waiting up to <paramref name="timeout"/>
  /// for the worker threads to terminate.
  /// </summary>
  /// <param name="forceAbort">Passed through to the millisecond based overload.</param>
  /// <param name="timeout">Time to wait; its total milliseconds are truncated to an int.</param>
  public void Shutdown(bool forceAbort, TimeSpan timeout)
  {
      int millisecondsTimeout = (int)timeout.TotalMilliseconds;
      Shutdown(forceAbort, millisecondsTimeout);
  }
  /// <summary>
  /// Shuts the pool down: disposes the work items queue, signals the worker
  /// threads to exit and waits for them to terminate.
  /// </summary>
  /// <param name="forceAbort">
  /// true to abort the worker threads that are still alive after the wait timed out.
  /// </param>
  /// <param name="millisecondsTimeout">
  /// Total time, in milliseconds, to wait for all worker threads, or
  /// Timeout.Infinite (-1) to wait indefinitely.
  /// </param>
  /// <exception cref="ObjectDisposedException">The pool has already been disposed.</exception>
  public void Shutdown(bool forceAbort, int millisecondsTimeout)
  {
      ValidateNotDisposed();

      // Swap in the "null" counters so nothing keeps updating the Windows
      // performance counters, then release the real ones.
      ISTPInstancePerformanceCounters previousCounters = _windowsPCs;
      if (NullSTPInstancePerformanceCounters.Instance != _windowsPCs)
      {
          _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
          previousCounters.Dispose();
      }

      Thread[] workers;
      lock (_workerThreads.SyncRoot)
      {
          // Shut the work items queue down.
          _workItemsQueue.Dispose();

          // Tell the worker threads to exit.
          _shutdown = true;
          _shuttingDownEvent.Set();

          // Snapshot the pool's threads so they can be joined outside the lock.
          workers = new Thread[_workerThreads.Count];
          _workerThreads.Keys.CopyTo(workers, 0);
      }

      bool waitForever = (Timeout.Infinite == millisecondsTimeout);
      int remaining = millisecondsTimeout;
      Stopwatch elapsed = Stopwatch.StartNew();
      bool timedOut = false;

      // The timeout is a budget shared by all threads; it shrinks after each join.
      for (int i = 0; (i < workers.Length) && !timedOut; ++i)
      {
          // Join doesn't work with negative numbers, so a negative remainder
          // means the budget has already been spent.
          bool budgetSpent = !waitForever && (remaining < 0);

          if (budgetSpent || !workers[i].Join(remaining))
          {
              timedOut = true;
          }
          else if (!waitForever)
          {
              remaining = millisecondsTimeout - (int)elapsed.ElapsedMilliseconds;
          }
      }

      if (!timedOut || !forceAbort)
      {
          return;
      }

      // Abort whatever is still running.
      foreach (Thread worker in workers)
      {
          if ((null == worker) || !worker.IsAlive)
          {
              continue;
          }

          try
          {
              worker.Abort(); // Shutdown
          }
          catch (SecurityException)
          {
              // Ignored, as before.
          }
          catch (ThreadStateException)
          {
              // The thread terminated after the IsAlive check.
          }
      }
  }
  /// <summary>
  /// Waits indefinitely for all the given work items to complete.
  /// Equivalent to WaitAll(waitableResults, Timeout.Infinite, true).
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <returns>
  /// true when every work item in waitableResults has completed; otherwise false.
  /// </returns>
  public static bool WaitAll( IWaitableResult[] waitableResults)
  {
      const bool exitContext = true;
      bool allCompleted = WaitAll(waitableResults, Timeout.Infinite, exitContext);
      return allCompleted;
  }
  /// <summary>
  /// Waits for all the given work items to complete, up to a time limit.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="timeout">
  /// The time to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
  /// Its total milliseconds are truncated to an int.
  /// </param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <returns>
  /// true when every work item in waitableResults has completed; otherwise false.
  /// </returns>
  public static bool WaitAll( IWaitableResult[] waitableResults, TimeSpan timeout, bool exitContext)
  {
      int millisecondsTimeout = (int)timeout.TotalMilliseconds;
      return WaitAll(waitableResults, millisecondsTimeout, exitContext);
  }
  /// <summary>
  /// Waits for all the given work items to complete, up to a time limit,
  /// with a handle that can interrupt the wait.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="timeout">
  /// The time to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
  /// Its total milliseconds are truncated to an int.
  /// </param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
  /// <returns>
  /// true when every work item in waitableResults has completed; otherwise false.
  /// </returns>
  public static bool WaitAll( IWaitableResult[] waitableResults, TimeSpan timeout,
      bool exitContext, WaitHandle cancelWaitHandle)
  {
      int millisecondsTimeout = (int)timeout.TotalMilliseconds;
      return WaitAll(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
  }
  /// <summary>
  /// Waits for all the given work items to complete, up to a time limit.
  /// Forwards to WorkItem.WaitAll with no cancel wait handle.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <returns>
  /// true when every work item in waitableResults has completed; otherwise false.
  /// </returns>
  public static bool WaitAll( IWaitableResult[] waitableResults, int millisecondsTimeout, bool exitContext)
  {
      bool allCompleted = WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, null);
      return allCompleted;
  }
  /// <summary>
  /// Waits for all the given work items to complete, up to a time limit,
  /// with a handle that can interrupt the wait. Forwards to WorkItem.WaitAll.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
  /// <returns>
  /// true when every work item in waitableResults has completed; otherwise false.
  /// </returns>
  public static bool WaitAll( IWaitableResult[] waitableResults, int millisecondsTimeout,
      bool exitContext, WaitHandle cancelWaitHandle)
  {
      bool allCompleted = WorkItem.WaitAll(
          waitableResults,
          millisecondsTimeout,
          exitContext,
          cancelWaitHandle);
      return allCompleted;
  }
  /// <summary>
  /// Waits indefinitely for any of the given work items to complete or be canceled.
  /// Equivalent to WaitAny(waitableResults, Timeout.Infinite, true).
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <returns>
  /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled.
  /// </returns>
  public static int WaitAny( IWaitableResult[] waitableResults)
  {
      const bool exitContext = true;
      int signaledIndex = WaitAny(waitableResults, Timeout.Infinite, exitContext);
      return signaledIndex;
  }
  /// <summary>
  /// Waits for any of the given work items to complete, cancel, or for the timeout to elapse.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="timeout">
  /// The time to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
  /// Its total milliseconds are truncated to an int.
  /// </param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <returns>
  /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to timeout has passed or the work item has been canceled.
  /// </returns>
  public static int WaitAny( IWaitableResult[] waitableResults, TimeSpan timeout, bool exitContext)
  {
      int millisecondsTimeout = (int)timeout.TotalMilliseconds;
      return WaitAny(waitableResults, millisecondsTimeout, exitContext);
  }
  /// <summary>
  /// Waits for any of the given work items to complete, cancel, or for the timeout to elapse,
  /// with a handle that can interrupt the wait.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="timeout">
  /// The time to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
  /// Its total milliseconds are truncated to an int.
  /// </param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
  /// <returns>
  /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to timeout has passed or the work item has been canceled.
  /// </returns>
  public static int WaitAny( IWaitableResult[] waitableResults, TimeSpan timeout,
      bool exitContext, WaitHandle cancelWaitHandle)
  {
      int millisecondsTimeout = (int)timeout.TotalMilliseconds;
      return WaitAny(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
  }
  /// <summary>
  /// Waits for any of the given work items to complete, cancel, or for the timeout to elapse.
  /// Forwards to WorkItem.WaitAny with no cancel wait handle.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <returns>
  /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
  /// </returns>
  public static int WaitAny( IWaitableResult[] waitableResults, int millisecondsTimeout, bool exitContext)
  {
      int signaledIndex = WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, null);
      return signaledIndex;
  }
  /// <summary>
  /// Waits for any of the given work items to complete, cancel, or for the timeout to elapse,
  /// with a handle that can interrupt the wait. Forwards to WorkItem.WaitAny.
  /// </summary>
  /// <param name="waitableResults">Array of work item result objects</param>
  /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
  /// <param name="exitContext">
  /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
  /// </param>
  /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
  /// <returns>
  /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
  /// </returns>
  public static int WaitAny( IWaitableResult[] waitableResults, int millisecondsTimeout,
      bool exitContext, WaitHandle cancelWaitHandle)
  {
      int signaledIndex = WorkItem.WaitAny(
          waitableResults,
          millisecondsTimeout,
          exitContext,
          cancelWaitHandle);
      return signaledIndex;
  }
  /// <summary>
  /// Creates a new WorkItemsGroup on this pool, configured with the pool's own start info.
  /// </summary>
  /// <param name="concurrency">The number of work items that can be run concurrently</param>
  /// <returns>A reference to the WorkItemsGroup</returns>
  public IWorkItemsGroup CreateWorkItemsGroup(int concurrency)
  {
      return new WorkItemsGroup(this, concurrency, _stpStartInfo);
  }
  /// <summary>
  /// Creates a new WorkItemsGroup on this pool with an explicit configuration.
  /// </summary>
  /// <param name="concurrency">The number of work items that can be run concurrently</param>
  /// <param name="wigStartInfo">A WorkItemsGroup configuration that overrides the default behavior</param>
  /// <returns>A reference to the WorkItemsGroup</returns>
  public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo)
  {
      return new WorkItemsGroup(this, concurrency, wigStartInfo);
  }
  1020. #region Fire Thread's Events
  /// <summary>
  /// Invokes every subscribed thread-initialization handler, one at a time.
  /// A handler that throws trips a debug assertion and the exception is
  /// rethrown, so the remaining handlers are not invoked.
  /// </summary>
  private void FireOnThreadInitialization()
  {
      // Snapshot the delegate: the field can be changed (or set to null) by a
      // concurrent subscribe/unsubscribe between the null check and the use.
      ThreadInitializationHandler handlers = _onThreadInitialization;
      if (null == handlers)
      {
          return;
      }

      foreach (ThreadInitializationHandler tih in handlers.GetInvocationList())
      {
          try
          {
              tih();
          }
          catch (Exception e)
          {
              e.GetHashCode();
              Debug.Assert(false);
              throw;
          }
      }
  }
  /// <summary>
  /// Invokes every subscribed thread-termination handler, one at a time.
  /// A handler that throws trips a debug assertion and the exception is
  /// rethrown, so the remaining handlers are not invoked.
  /// </summary>
  private void FireOnThreadTermination()
  {
      // Snapshot the delegate: the field can be changed (or set to null) by a
      // concurrent subscribe/unsubscribe between the null check and the use.
      ThreadTerminationHandler handlers = _onThreadTermination;
      if (null == handlers)
      {
          return;
      }

      foreach (ThreadTerminationHandler tth in handlers.GetInvocationList())
      {
          try
          {
              tth();
          }
          catch (Exception e)
          {
              e.GetHashCode();
              Debug.Assert(false);
              throw;
          }
      }
  }
  1059. #endregion
  /// <summary>
  /// This event is fired when a thread is created.
  /// Use it to initialize a thread before the work items use it.
  /// </summary>
  public event ThreadInitializationHandler OnThreadInitialization
  {
      add
      {
          _onThreadInitialization += value;
      }
      remove
      {
          _onThreadInitialization -= value;
      }
  }
  /// <summary>
  /// This event is fired when a thread is terminating.
  /// Use it for cleanup.
  /// </summary>
  public event ThreadTerminationHandler OnThreadTermination
  {
      add
      {
          _onThreadTermination += value;
      }
      remove
      {
          _onThreadTermination -= value;
      }
  }
  /// <summary>
  /// Cancels, with abort, every work item that is currently assigned to a worker
  /// thread, was queued by the given group and is not already canceled.
  /// </summary>
  /// <param name="wig">The work items group whose running items should be canceled.</param>
  internal void CancelAbortWorkItemsGroup(WorkItemsGroup wig)
  {
      foreach (ThreadEntry threadEntry in _workerThreads.Values)
      {
          // Read the current work item once. The worker thread resets it to null
          // when the item finishes, so reading the property again after the null
          // check could dereference null or hit a different work item.
          WorkItem workItem = threadEntry.CurrentWorkItem;
          if (null != workItem &&
              workItem.WasQueuedBy(wig) &&
              !workItem.IsCanceled)
          {
              workItem.GetWorkItemResult().Cancel(true);
          }
      }
  }
  1091. #endregion
  1092. #region Properties
  /// <summary>
  /// Get/Set the lower limit of threads in the pool.
  /// Setting a value above the current upper limit raises the upper limit to match,
  /// then the pool is asked to start the optimal number of threads.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The getter was called on a disposed pool.</exception>
  public int MinThreads
  {
      get
      {
          ValidateNotDisposed();
          int lowerLimit = _stpStartInfo.MinWorkerThreads;
          return lowerLimit;
      }
      set
      {
          Debug.Assert(value >= 0);
          Debug.Assert(value <= _stpStartInfo.MaxWorkerThreads);

          // Keep max >= min (only reachable when the asserts are compiled out).
          bool exceedsUpperLimit = value > _stpStartInfo.MaxWorkerThreads;
          if (exceedsUpperLimit)
          {
              _stpStartInfo.MaxWorkerThreads = value;
          }

          _stpStartInfo.MinWorkerThreads = value;
          StartOptimalNumberOfThreads();
      }
  }
  /// <summary>
  /// Get/Set the upper limit of threads in the pool.
  /// Setting a value below the current lower limit lowers the lower limit to match,
  /// then the pool is asked to start the optimal number of threads.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The getter was called on a disposed pool.</exception>
  public int MaxThreads
  {
      get
      {
          ValidateNotDisposed();
          int upperLimit = _stpStartInfo.MaxWorkerThreads;
          return upperLimit;
      }
      set
      {
          Debug.Assert(value > 0);
          Debug.Assert(value >= _stpStartInfo.MinWorkerThreads);

          // Keep min <= max (only reachable when the asserts are compiled out).
          bool belowLowerLimit = value < _stpStartInfo.MinWorkerThreads;
          if (belowLowerLimit)
          {
              _stpStartInfo.MinWorkerThreads = value;
          }

          _stpStartInfo.MaxWorkerThreads = value;
          StartOptimalNumberOfThreads();
      }
  }
  /// <summary>
  /// Get the number of threads in the thread pool.
  /// Should be between the lower and the upper limits.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
  public int ActiveThreads
  {
      get
      {
          ValidateNotDisposed();
          int threadCount = _workerThreads.Count;
          return threadCount;
      }
  }
  /// <summary>
  /// Get the number of busy (not idle) threads in the thread pool.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
  public int InUseThreads
  {
      get
      {
          ValidateNotDisposed();
          int busyCount = _inUseWorkerThreads;
          return busyCount;
      }
  }
  /// <summary>
  /// Returns true if the work item currently running on the calling thread has been cancelled.
  /// Must be used within the work item's callback method.
  /// The work item should sample this value in order to know if it
  /// needs to quit before its completion.
  /// </summary>
  /// <remarks>
  /// NOTE(review): the thread entry and its current work item are dereferenced
  /// without null checks, so calling this outside a running work item presumably
  /// throws NullReferenceException - confirm.
  /// </remarks>
  public static bool IsWorkItemCanceled
  {
      get
      {
          WorkItem runningItem = CurrentThreadEntry.CurrentWorkItem;
          return runningItem.IsCanceled;
      }
  }
  /// <summary>
  /// Aborts the calling thread if the work item it is running has been cancelled;
  /// otherwise does nothing. Can be used with Cancel and timeout.
  /// </summary>
  public static void AbortOnWorkItemCancel()
  {
      if (!IsWorkItemCanceled)
      {
          return;
      }

      Thread.CurrentThread.Abort();
  }
  /// <summary>
  /// Thread pool start information, exposed through AsReadOnly().
  /// </summary>
  public STPStartInfo STPStartInfo
  {
      get
      {
          STPStartInfo readOnlyInfo = _stpStartInfo.AsReadOnly();
          return readOnlyInfo;
      }
  }
  /// <summary>
  /// Gets the shutdown flag; it is set to true by Shutdown.
  /// </summary>
  public bool IsShuttingdown
  {
      get
      {
          return _shutdown;
      }
  }
  /// <summary>
  /// Return the local calculated performance counters.
  /// Available only if STPStartInfo.EnableLocalPerformanceCounters is true.
  /// </summary>
  public ISTPPerformanceCountersReader PerformanceCountersReader
  {
      get
      {
          ISTPPerformanceCountersReader reader = (ISTPPerformanceCountersReader)_localPCs;
          return reader;
      }
  }
  1206. #endregion
  1207. #region IDisposable Members
  /// <summary>
  /// Releases the pool's resources and tells the garbage collector that
  /// finalization is no longer needed for this instance.
  /// </summary>
  public void Dispose()
  {
      const bool disposing = true;
      Dispose(disposing);

      GC.SuppressFinalize(this);
  }
  /// <summary>
  /// Shuts the pool down if that has not happened yet, closes its wait handles,
  /// clears the worker thread table and disposes the local performance counters
  /// when they are enabled. Subsequent calls are no-ops.
  /// </summary>
  /// <param name="disposing">Not consulted by this implementation.</param>
  protected void Dispose(bool disposing)
  {
      if (_isDisposed)
      {
          return;
      }

      if (!_shutdown)
      {
          Shutdown();
      }

      if (null != _shuttingDownEvent)
      {
          _shuttingDownEvent.Close();
          _shuttingDownEvent = null;
      }

      _workerThreads.Clear();

      if (null != _isIdleWaitHandle)
      {
          _isIdleWaitHandle.Close();
          _isIdleWaitHandle = null;
      }

      if (_stpStartInfo.EnableLocalPerformanceCounters)
      {
          _localPCs.Dispose();
      }

      _isDisposed = true;
  }
  /// <summary>
  /// Guards members that must not be used after the pool has been disposed.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
  private void ValidateNotDisposed()
  {
      if (!_isDisposed)
      {
          return;
      }

      throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
  }
  1244. #endregion
  1245. #region WorkItemsGroupBase Overrides
  /// <summary>
  /// Get/Set the maximum number of work items that execute concurrently on the thread pool.
  /// This is an alias for MaxThreads.
  /// </summary>
  public override int Concurrency
  {
      get
      {
          return MaxThreads;
      }
      set
      {
          MaxThreads = value;
      }
  }
  /// <summary>
  /// Get the number of work items in the queue.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
  public override int WaitingCallbacks
  {
      get
      {
          ValidateNotDisposed();
          int queuedCount = _workItemsQueue.Count;
          return queuedCount;
      }
  }
  /// <summary>
  /// Get an array with the state objects reported by the work items queue.
  /// The array represents a snap shot and impacts performance.
  /// </summary>
  /// <returns>The array returned by the work items queue's GetStates().</returns>
  public override object[] GetStates()
  {
      return _workItemsQueue.GetStates();
  }
  /// <summary>
  /// WorkItemsGroup start information, exposed through AsReadOnly() of the
  /// pool's start info.
  /// </summary>
  public override WIGStartInfo WIGStartInfo
  {
      get
      {
          WIGStartInfo readOnlyInfo = _stpStartInfo.AsReadOnly();
          return readOnlyInfo;
      }
  }
  /// <summary>
  /// Start the thread pool if it was started suspended.
  /// If it is already running, this method is ignored.
  /// </summary>
  public override void Start()
  {
      if (_isSuspended)
      {
          _isSuspended = false;

          // Let every work items group know the pool is starting.
          ICollection groups = _workItemsGroups.Values;
          foreach (WorkItemsGroup group in groups)
          {
              group.OnSTPIsStarting();
          }

          StartOptimalNumberOfThreads();
      }
  }
  /// <summary>
  /// Cancels the pool's work items: flags the current canceled-group marker,
  /// replaces it with a fresh one, and cancels every work items group.
  /// When <paramref name="abortExecution"/> is true, the work items currently
  /// assigned to this pool's worker threads are also canceled with abort.
  /// </summary>
  /// <param name="abortExecution">True to stop work items by raising ThreadAbortException</param>
  public override void Cancel(bool abortExecution)
  {
      _canceledSmartThreadPool.IsCanceled = true;
      _canceledSmartThreadPool = new CanceledWorkItemsGroup();

      ICollection workItemsGroups = _workItemsGroups.Values;
      foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
      {
          workItemsGroup.Cancel(abortExecution);
      }

      if (!abortExecution)
      {
          return;
      }

      foreach (ThreadEntry threadEntry in _workerThreads.Values)
      {
          // Read the current work item once. The worker thread resets it to null
          // when the item finishes, so reading the property again after the null
          // check could dereference null or hit a different work item.
          WorkItem workItem = threadEntry.CurrentWorkItem;
          if (null != workItem &&
              threadEntry.AssociatedSmartThreadPool == this &&
              !workItem.IsCanceled)
          {
              workItem.GetWorkItemResult().Cancel(true);
          }
      }
  }
  /// <summary>
  /// Wait for the thread pool to be idle.
  /// </summary>
  /// <param name="millisecondsTimeout">The wait timeout, in milliseconds, passed to the idle wait handle.</param>
  /// <returns>The result of waiting on the pool's idle wait handle.</returns>
  /// <exception cref="NotSupportedException">Called from one of this pool's own threads.</exception>
  public override bool WaitForIdle(int millisecondsTimeout)
  {
      ValidateWaitForIdle();

      const bool exitContext = false;
      bool becameIdle = STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, exitContext);
      return becameIdle;
  }
  /// <summary>
  /// This event is fired when all work items are completed
  /// (when IsIdle changes to true).
  /// This event only works on WorkItemsGroup. On SmartThreadPool both accessors
  /// are empty: subscribing or unsubscribing has no effect, the handler is not
  /// stored, and no exception is thrown.
  /// </summary>
  public override event WorkItemsGroupIdleHandler OnIdle
  {
      add { }
      remove { }
  }
  /// <summary>
  /// Pre-queue hook: only verifies that the pool has not been disposed.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
  internal override void PreQueueWorkItem()
  {
      // Queuing into a disposed pool is an error.
      ValidateNotDisposed();
  }
  1355. #endregion
  1356. #region Join, Choice, Pipe, etc.
  /// <summary>
  /// Executes all actions in parallel.
  /// Returns when they all finish.
  /// </summary>
  /// <param name="actions">Actions to execute</param>
  public void Join(IEnumerable<Action> actions)
  {
      // Create the group suspended so nothing runs until every action is queued.
      WIGStartInfo suspendedStart = new WIGStartInfo();
      suspendedStart.StartSuspended = true;

      IWorkItemsGroup group = CreateWorkItemsGroup(int.MaxValue, suspendedStart);
      foreach (Action pending in actions)
      {
          group.QueueWorkItem(pending);
      }

      group.Start();
      group.WaitForIdle();
  }
  /// <summary>
  /// Executes all actions in parallel.
  /// Returns when they all finish.
  /// </summary>
  /// <param name="actions">Actions to execute</param>
  public void Join(params Action[] actions)
  {
      // Typed as IEnumerable so the call binds to the sequence overload, not back to this one.
      IEnumerable<Action> sequence = actions;
      Join(sequence);
  }
  /// <summary>
  /// Mutable holder for the index of the first action that completed in Choice.
  /// -1 means that no action has completed yet.
  /// </summary>
  private class ChoiceIndex
  {
      public int _index = -1;
  }

  /// <summary>
  /// Executes all actions in parallel.
  /// Returns when the first one completes.
  /// </summary>
  /// <param name="actions">Actions to execute</param>
  /// <returns>
  /// The zero-based position, within <paramref name="actions"/>, of the first
  /// action that completed, or -1 when <paramref name="actions"/> is empty.
  /// </returns>
  public int Choice(IEnumerable<Action> actions)
  {
      // Create the group suspended so nothing runs until every action is queued.
      WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
      IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo);
      ChoiceIndex choiceIndex = new ChoiceIndex();

      using (ManualResetEvent anActionCompleted = new ManualResetEvent(false))
      {
          int queuedCount = 0;
          foreach (Action action in actions)
          {
              // Per-iteration copies so each closure captures its own action and index.
              Action act = action;
              int value = queuedCount;
              workItemsGroup.QueueWorkItem(() =>
              {
                  act();

                  // Only the first action to finish signals the event. The event is
                  // disposed once the waiter wakes up, so later finishers must not
                  // touch it (Set() on a disposed handle throws).
                  if (-1 == Interlocked.CompareExchange(ref choiceIndex._index, value, -1))
                  {
                      anActionCompleted.Set();
                  }
              });
              ++queuedCount;
          }

          // Nothing was queued, so nothing will ever signal the event; waiting
          // would block forever.
          if (0 == queuedCount)
          {
              return -1;
          }

          workItemsGroup.Start();
          anActionCompleted.WaitOne();
      }

      return choiceIndex._index;
  }
  /// <summary>
  /// Executes all actions in parallel.
  /// Returns when the first one completes.
  /// </summary>
  /// <param name="actions">Actions to execute</param>
  /// <returns>The value returned by the sequence based Choice overload.</returns>
  public int Choice(params Action[] actions)
  {
      // Typed as IEnumerable so the call binds to the sequence overload, not back to this one.
      IEnumerable<Action> sequence = actions;
      return Choice(sequence);
  }
  /// <summary>
  /// Executes the actions one after another on a work items group created with
  /// a concurrency of 1, passing the same state to each of them.
  /// Blocks until the group is idle.
  /// </summary>
  /// <param name="pipeState">A state context that is passed to every action</param>
  /// <param name="actions">Actions to execute in the order they should run</param>
  public void Pipe<T>(T pipeState, IEnumerable<Action<T>> actions)
  {
      // Create the group suspended so nothing runs until every action is queued.
      WIGStartInfo suspendedStart = new WIGStartInfo();
      suspendedStart.StartSuspended = true;

      IWorkItemsGroup group = CreateWorkItemsGroup(1, suspendedStart);
      foreach (Action<T> stage in actions)
      {
          // Per-iteration copy so each closure captures its own action.
          Action<T> current = stage;
          group.QueueWorkItem(() => current(pipeState));
      }

      group.Start();
      group.WaitForIdle();
  }
  /// <summary>
  /// Executes the actions in sequence, passing the same state to each of them.
  /// Forwards to the sequence based Pipe overload.
  /// </summary>
  /// <param name="pipeState">A state context that is passed to every action</param>
  /// <param name="actions">Actions to execute in the order they should run</param>
  public void Pipe<T>(T pipeState, params Action<T>[] actions)
  {
      // Typed as IEnumerable so the call binds to the sequence overload, not back to this one.
      IEnumerable<Action<T>> sequence = actions;
      Pipe(pipeState, sequence);
  }
  1447. #endregion
  1448. }
  1449. #endregion
  1450. }