// Ami Bar
// [email protected]
- using System;
- using System.Threading;
- using System.Runtime.CompilerServices;
- using System.Diagnostics;
- namespace Amib.Threading.Internal
- {
- #region WorkItemsGroup class
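/// <summary>
/// A group of work items that are queued through a SmartThreadPool, with a
/// limit (the concurrency) on how many of the group's work items are handed
/// to the pool at once.
/// </summary>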
- public class WorkItemsGroup : IWorkItemsGroup
- {
- #region Private members
/// <summary>
/// Monitor object taken by Cancel, OnWorkItemStartedCallback and
/// EnqueueToSTPNextWorkItem while they touch the queue and the counters.
/// Never reassigned, hence readonly.
/// </summary>
private readonly object _lock = new object();

/// <summary>
/// Contains the name of this instance of WorkItemsGroup.
/// Can be changed by the user.
/// </summary>
private string _name = "WorkItemsGroup";

/// <summary>
/// A reference to the SmartThreadPool instance that created this
/// WorkItemsGroup.
/// </summary>
private SmartThreadPool _stp;

/// <summary>
/// The OnIdle event.
/// </summary>
private event WorkItemsGroupIdleHandler _onIdle;

/// <summary>
/// Defines how many work items of this WorkItemsGroup can run at once.
/// </summary>
private int _concurrency;

/// <summary>
/// Priority queue that holds work items before they are passed
/// to the SmartThreadPool.
/// </summary>
private PriorityQueue _workItemsQueue;

/// <summary>
/// Indicates how many work items are waiting in the SmartThreadPool
/// queue.
/// This value is used to apply the concurrency.
/// </summary>
private int _workItemsInStpQueue;

/// <summary>
/// Indicates how many work items are currently running in the SmartThreadPool.
/// This value is used with the Cancel, to calculate if we can send new
/// work items to the STP.
/// </summary>
private int _workItemsExecutingInStp = 0;

/// <summary>
/// WorkItemsGroup start information.
/// </summary>
private WIGStartInfo _workItemsGroupStartInfo;

/// <summary>
/// Signaled when all of the WorkItemsGroup's work items have completed.
/// Starts out signaled.
/// </summary>
private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);

/// <summary>
/// A common object for all the work items that this work items group
/// generates, so we can mark them as canceled in O(1).
/// </summary>
private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
- #endregion
- #region Construction
/// <summary>
/// Creates a work items group bound to the given SmartThreadPool.
/// </summary>
/// <param name="stp">The SmartThreadPool that the group's work items are sent to</param>
/// <param name="concurrency">Maximum number of the group's work items handed to the pool at once; must be positive</param>
/// <param name="wigStartInfo">Start information; a copy is made with the WIGStartInfo copy constructor</param>
/// <exception cref="ArgumentOutOfRangeException">concurrency is zero or negative</exception>
public WorkItemsGroup(SmartThreadPool stp, int concurrency, WIGStartInfo wigStartInfo)
{
    if (concurrency < 1)
    {
        throw new ArgumentOutOfRangeException(
            "concurrency",
            concurrency,
            "concurrency must be greater than zero");
    }

    this._stp = stp;
    this._concurrency = concurrency;
    this._workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo);
    this._workItemsQueue = new PriorityQueue();

    // Nothing is executing yet (_workItemsExecutingInStp is initialized to 0),
    // so the "in STP queue" counter starts from that same value.
    this._workItemsInStpQueue = this._workItemsExecutingInStp;
}
- #endregion
- #region IWorkItemsGroup implementation
/// <summary>
/// Gets or sets the name of this WorkItemsGroup instance.
/// The name appears in the trace messages written when the group
/// becomes idle or stops being idle.
/// </summary>
public string Name
{
    get { return this._name; }
    set { this._name = value; }
}
/// <summary>
/// Queues a work item built from a callback and the group's start information.
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <returns>The result object of the queued work item</returns>
public IWorkItemResult QueueWorkItem(WorkItemCallback callback)
{
    WorkItem item = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback);

    EnqueueToSTPNextWorkItem(item);

    IWorkItemResult result = item.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queues a work item with an explicit priority.
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="workItemPriority">The priority of the work item</param>
/// <returns>The result object of the queued work item</returns>
public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority)
{
    WorkItem item = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        workItemPriority);

    EnqueueToSTPNextWorkItem(item);

    IWorkItemResult result = item.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queues a work item described by a WorkItemInfo.
/// </summary>
/// <param name="workItemInfo">Work item info</param>
/// <param name="callback">A callback to execute</param>
/// <returns>The result object of the queued work item</returns>
public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback)
{
    WorkItem item = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        workItemInfo,
        callback);

    EnqueueToSTPNextWorkItem(item);

    IWorkItemResult result = item.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queues a work item with a state object.
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <returns>The result object of the queued work item</returns>
public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state)
{
    WorkItem item = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state);

    EnqueueToSTPNextWorkItem(item);

    IWorkItemResult result = item.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queues a work item with a state object and an explicit priority.
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <param name="workItemPriority">The work item priority</param>
/// <returns>The result object of the queued work item</returns>
public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority)
{
    WorkItem item = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state,
        workItemPriority);

    EnqueueToSTPNextWorkItem(item);

    IWorkItemResult result = item.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queues a work item described by a WorkItemInfo, with a state object.
/// </summary>
/// <param name="workItemInfo">Work item information</param>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <returns>The result object of the queued work item</returns>
public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
{
    WorkItem item = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        workItemInfo,
        callback,
        state);

    EnqueueToSTPNextWorkItem(item);

    IWorkItemResult result = item.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queues a work item with a state object and a post-execute callback.
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <param name="postExecuteWorkItemCallback">
/// A delegate to call after the callback completion
/// </param>
/// <returns>The result object of the queued work item</returns>
public IWorkItemResult QueueWorkItem(
    WorkItemCallback callback,
    object state,
    PostExecuteWorkItemCallback postExecuteWorkItemCallback)
{
    WorkItem item = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state,
        postExecuteWorkItemCallback);

    EnqueueToSTPNextWorkItem(item);

    IWorkItemResult result = item.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queues a work item with a state object, a post-execute callback and
/// an explicit priority.
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <param name="postExecuteWorkItemCallback">
/// A delegate to call after the callback completion
/// </param>
/// <param name="workItemPriority">The work item priority</param>
/// <returns>The result object of the queued work item</returns>
public IWorkItemResult QueueWorkItem(
    WorkItemCallback callback,
    object state,
    PostExecuteWorkItemCallback postExecuteWorkItemCallback,
    WorkItemPriority workItemPriority)
{
    WorkItem item = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state,
        postExecuteWorkItemCallback,
        workItemPriority);

    EnqueueToSTPNextWorkItem(item);

    IWorkItemResult result = item.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queues a work item with a state object and a post-execute callback,
/// specifying in which cases the post-execute callback is called.
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <param name="postExecuteWorkItemCallback">
/// A delegate to call after the callback completion
/// </param>
/// <param name="callToPostExecute">Indicates in which cases to call the post execute callback</param>
/// <returns>The result object of the queued work item</returns>
public IWorkItemResult QueueWorkItem(
    WorkItemCallback callback,
    object state,
    PostExecuteWorkItemCallback postExecuteWorkItemCallback,
    CallToPostExecute callToPostExecute)
{
    WorkItem item = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state,
        postExecuteWorkItemCallback,
        callToPostExecute);

    EnqueueToSTPNextWorkItem(item);

    IWorkItemResult result = item.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queues a work item with a state object, a post-execute callback, the
/// cases in which that callback is called, and an explicit priority.
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <param name="postExecuteWorkItemCallback">
/// A delegate to call after the callback completion
/// </param>
/// <param name="callToPostExecute">Indicates in which cases to call the post execute callback</param>
/// <param name="workItemPriority">The work item priority</param>
/// <returns>The result object of the queued work item</returns>
public IWorkItemResult QueueWorkItem(
    WorkItemCallback callback,
    object state,
    PostExecuteWorkItemCallback postExecuteWorkItemCallback,
    CallToPostExecute callToPostExecute,
    WorkItemPriority workItemPriority)
{
    WorkItem item = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state,
        postExecuteWorkItemCallback,
        callToPostExecute,
        workItemPriority);

    EnqueueToSTPNextWorkItem(item);

    IWorkItemResult result = item.GetWorkItemResult();
    return result;
}
/// <summary>
/// Waits, with no time limit, for this work items group to become idle.
/// The boolean result of the timed overload is discarded.
/// </summary>
public void WaitForIdle()
{
    int noTimeout = Timeout.Infinite;
    WaitForIdle(noTimeout);
}
/// <summary>
/// Waits for this work items group to become idle, up to the given time span.
/// </summary>
/// <param name="timeout">Maximum time to wait; its total milliseconds are truncated to an int</param>
/// <returns>The value returned by the millisecond-based overload</returns>
public bool WaitForIdle(TimeSpan timeout)
{
    int wholeMilliseconds = (int)timeout.TotalMilliseconds;
    return WaitForIdle(wholeMilliseconds);
}
/// <summary>
/// Waits for this work items group to become idle, up to the given
/// number of milliseconds.
/// </summary>
/// <param name="millisecondsTimeout">Timeout passed to the idle wait handle</param>
/// <returns>The result of waiting on the group's idle wait handle</returns>
public bool WaitForIdle(int millisecondsTimeout)
{
    // Let the owning pool validate this wait before blocking.
    _stp.ValidateWorkItemsGroupWaitForIdle(this);

    bool signaled = _isIdleWaitHandle.WaitOne(millisecondsTimeout, false);
    return signaled;
}
/// <summary>
/// Gets the number of work items currently held in the group's own queue,
/// i.e. not yet passed to the SmartThreadPool.
/// </summary>
public int WaitingCallbacks
{
    get { return this._workItemsQueue.Count; }
}
/// <summary>
/// Raised through a work item queued to the SmartThreadPool each time the
/// group is found idle. Subscriptions are forwarded to the private
/// _onIdle event.
/// </summary>
public event WorkItemsGroupIdleHandler OnIdle
{
    add { this._onIdle += value; }
    remove { this._onIdle -= value; }
}
/// <summary>
/// Cancels the work items of this group: marks the cancellation object
/// shared by the work items queued so far, empties the group's queue,
/// and resets the "in STP queue" counter.
/// </summary>
public void Cancel()
{
    lock (this._lock)
    {
        // Flag the marker object attached to every work item queued so far.
        this._canceledWorkItemsGroup.IsCanceled = true;

        // Drop whatever has not been handed to the pool yet.
        this._workItemsQueue.Clear();
        this._workItemsInStpQueue = 0;

        // Work items queued from now on get a fresh, un-canceled marker.
        this._canceledWorkItemsGroup = new CanceledWorkItemsGroup();
    }
}
/// <summary>
/// Starts a work items group that was created suspended.
/// Does nothing if the group is not suspended. Otherwise clears the
/// StartSuspended flag and tries to send up to "concurrency" queued
/// work items to the SmartThreadPool.
/// </summary>
public void Start()
{
    // The flag is read by EnqueueToSTPNextWorkItem under _lock, so it must be
    // changed under the same monitor; locking on "this" did not exclude that
    // reader and exposed the monitor to external code.
    lock (_lock)
    {
        if (!_workItemsGroupStartInfo.StartSuspended)
        {
            return;
        }
        _workItemsGroupStartInfo.StartSuspended = false;
    }

    // Called outside the lock; each call dispatches at most one work item.
    for (int i = 0; i < _concurrency; ++i)
    {
        EnqueueToSTPNextWorkItem(null, false);
    }
}
- #endregion
- #region Private methods
/// <summary>
/// Subscribes this group to the started and completed notifications of a
/// work item result.
/// </summary>
/// <param name="wir">
/// The work item result to subscribe to; it must implement IInternalWorkItemResult
/// </param>
/// <exception cref="InvalidCastException">
/// wir does not implement IInternalWorkItemResult
/// </exception>
private void RegisterToWorkItemCompletion(IWorkItemResult wir)
{
    // Direct cast instead of "as": a result of the wrong type now fails here
    // with a descriptive InvalidCastException rather than with a
    // NullReferenceException on the next line.
    IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
    iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback);
    iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback);
}
/// <summary>
/// Notification that the SmartThreadPool is starting.
/// If this group is not suspended, tries to send up to "concurrency"
/// queued work items to the SmartThreadPool; a suspended group is left
/// untouched.
/// </summary>
public void OnSTPIsStarting()
{
    // Read the flag under the same monitor that guards the rest of the
    // group's state (Start writes it and EnqueueToSTPNextWorkItem reads it
    // under _lock); locking on "this" did not synchronize with them.
    lock (_lock)
    {
        if (_workItemsGroupStartInfo.StartSuspended)
        {
            return;
        }
    }

    // Called outside the lock; each call dispatches at most one work item.
    for (int i = 0; i < _concurrency; ++i)
    {
        EnqueueToSTPNextWorkItem(null, false);
    }
}
/// <summary>
/// Work item callback that raises the OnIdle event.
/// </summary>
/// <param name="state">Not used</param>
/// <returns>Always null</returns>
private object FireOnIdle(object state)
{
    // Read the event field once and hand that value to the implementation.
    WorkItemsGroupIdleHandler handlers = _onIdle;
    FireOnIdleImpl(handlers);
    return null;
}
/// <summary>
/// Invokes every handler in the given delegate's invocation list, passing
/// this group. An exception thrown by one handler is ignored so the
/// remaining handlers still run.
/// </summary>
/// <param name="onIdle">The handlers to invoke; may be null</param>
[MethodImpl(MethodImplOptions.NoInlining)]
private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
{
    if (onIdle != null)
    {
        Delegate[] subscribers = onIdle.GetInvocationList();
        for (int index = 0; index < subscribers.Length; ++index)
        {
            WorkItemsGroupIdleHandler subscriber = (WorkItemsGroupIdleHandler)subscribers[index];
            try
            {
                subscriber(this);
            }
            catch
            {
                // Handler failures are ignored on purpose.
            }
        }
    }
}
/// <summary>
/// Handler for a work item's "started" notification: counts one more
/// work item as executing.
/// </summary>
/// <param name="workItem">The work item that started; not used</param>
private void OnWorkItemStartedCallback(WorkItem workItem)
{
    lock (this._lock)
    {
        this._workItemsExecutingInStp += 1;
    }
}
/// <summary>
/// Handler for a work item's "completed" notification: decrements the
/// counters and tries to send the next queued work item to the pool.
/// </summary>
/// <param name="workItem">The work item that completed; not used</param>
private void OnWorkItemCompletedCallback(WorkItem workItem)
{
    WorkItem nothingToEnqueue = null;
    EnqueueToSTPNextWorkItem(nothingToEnqueue, true);
}
/// <summary>
/// Queues a work item in the group without decrementing the counters.
/// </summary>
/// <param name="workItem">The work item to enqueue</param>
private void EnqueueToSTPNextWorkItem(WorkItem workItem)
{
    bool decrementCounters = false;
    EnqueueToSTPNextWorkItem(workItem, decrementCounters);
}
/// <summary>
/// Central bookkeeping routine of the group, executed under _lock.
/// Optionally decrements the counters, optionally adds a new work item to
/// the group's queue, signals idleness when nothing is queued or in the
/// pool, and otherwise sends at most one queued work item to the
/// SmartThreadPool if the group is not suspended and the concurrency
/// limit allows it.
/// </summary>
/// <param name="workItem">A work item to add to the group's queue, or null to add nothing</param>
/// <param name="decrementWorkItemsInStpQueue">
/// True when called from OnWorkItemCompletedCallback, so both counters are decremented first
/// </param>
private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
{
    lock (_lock)
    {
        // A work item has completed: decrement both counters, never below zero.
        if (decrementWorkItemsInStpQueue)
        {
            _workItemsInStpQueue = Math.Max(0, _workItemsInStpQueue - 1);
            _workItemsExecutingInStp = Math.Max(0, _workItemsExecutingInStp - 1);
        }

        // A new work item was supplied: tag it, subscribe to it and queue it.
        if (workItem != null)
        {
            workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
            RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
            _workItemsQueue.Enqueue(workItem);

            // It is the only queued item and nothing is in the pool:
            // the group has just stopped being idle.
            bool leavingIdle = (_workItemsQueue.Count == 1) && (_workItemsInStpQueue == 0);
            if (leavingIdle)
            {
                _stp.RegisterWorkItemsGroup(this);
                Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle");
                _isIdleWaitHandle.Reset();
            }
        }

        // Nothing left in the group's queue: report idleness if the pool
        // holds none of our work items either, then quit.
        if (_workItemsQueue.Count == 0)
        {
            if (_workItemsInStpQueue == 0)
            {
                _stp.UnregisterWorkItemsGroup(this);
                Trace.WriteLine("WorkItemsGroup " + Name + " is idle");
                _isIdleWaitHandle.Set();
                _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle));
            }
            return;
        }

        // Hand one queued work item to the pool, unless the group is
        // suspended or already at its concurrency limit.
        bool mayDispatch = !_workItemsGroupStartInfo.StartSuspended
            && (_workItemsInStpQueue < _concurrency);
        if (mayDispatch)
        {
            WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
            _stp.Enqueue(nextWorkItem, true);
            ++_workItemsInStpQueue;
        }
    }
}
- #endregion
- }
- #endregion
- }
|