using System; using System.Threading; using System.Diagnostics; namespace Amib.Threading.Internal { /// /// Holds a callback delegate and the state for that delegate. /// public partial class WorkItem { #region WorkItemState enum /// /// Indicates the state of the work item in the thread pool /// private enum WorkItemState { InQueue = 0, // Nexts: InProgress, Canceled InProgress = 1, // Nexts: Completed, Canceled Completed = 2, // Stays Completed Canceled = 3, // Stays Canceled } private static bool IsValidStatesTransition(WorkItemState currentState, WorkItemState nextState) { bool valid = false; switch (currentState) { case WorkItemState.InQueue: valid = (WorkItemState.InProgress == nextState) || (WorkItemState.Canceled == nextState); break; case WorkItemState.InProgress: valid = (WorkItemState.Completed == nextState) || (WorkItemState.Canceled == nextState); break; case WorkItemState.Completed: case WorkItemState.Canceled: // Cannot be changed break; default: // Unknown state Debug.Assert(false); break; } return valid; } #endregion #region Fields /// /// Callback delegate for the callback. /// private WorkItemCallback _callback; private WaitCallback _callbackNoResult; /// /// State with which to call the callback delegate. /// private object _state; /// /// Stores the caller's context /// private ExecutionContext _callerContext = null; /// /// Holds the result of the mehtod /// private object _result; /// /// Hold the exception if the method threw it /// private Exception _exception; /// /// Hold the state of the work item /// private WorkItemState _workItemState; /// /// A ManualResetEvent to indicate that the result is ready /// private ManualResetEvent _workItemCompleted; /// /// A reference count to the _workItemCompleted. /// When it reaches to zero _workItemCompleted is Closed /// private int _workItemCompletedRefCount; /// /// Represents the result state of the work item /// private readonly WorkItemResult _workItemResult; /// /// Work item info /// private readonly WorkItemInfo _workItemInfo; /// /// Called when the WorkItem starts /// private event WorkItemStateCallback _workItemStartedEvent; /// /// Called when the WorkItem completes /// private event WorkItemStateCallback _workItemCompletedEvent; /// /// A reference to an object that indicates whatever the /// WorkItemsGroup has been canceled /// private CanceledWorkItemsGroup _canceledWorkItemsGroup = CanceledWorkItemsGroup.NotCanceledWorkItemsGroup; /// /// A reference to an object that indicates whatever the /// SmartThreadPool has been canceled /// private CanceledWorkItemsGroup _canceledSmartThreadPool = CanceledWorkItemsGroup.NotCanceledWorkItemsGroup; /// /// The work item group this work item belong to. /// private readonly IWorkItemsGroup _workItemsGroup; /// /// The thread that executes this workitem. /// This field is available for the period when the work item is executed, before and after it is null. /// private Thread _executingThread; /// /// The absulote time when the work item will be timeout /// private long _expirationTime; #region Performance Counter fields /// /// Stores how long the work item waited on the stp queue /// private Stopwatch _waitingOnQueueStopwatch; /// /// Stores how much time it took the work item to execute after it went out of the queue /// private Stopwatch _processingStopwatch; #endregion #endregion #region Properties public TimeSpan WaitingTime { get { return _waitingOnQueueStopwatch.Elapsed; } } public TimeSpan ProcessTime { get { return _processingStopwatch.Elapsed; } } internal WorkItemInfo WorkItemInfo { get { return _workItemInfo; } } #endregion #region Construction /// /// Initialize the callback holding object. /// /// The workItemGroup of the workitem /// The WorkItemInfo of te workitem /// Callback delegate for the callback. /// State with which to call the callback delegate. /// /// We assume that the WorkItem object is created within the thread /// that meant to run the callback public WorkItem(IWorkItemsGroup workItemsGroup, WorkItemInfo workItemInfo, WorkItemCallback callback, object state) { _workItemsGroup = workItemsGroup; _workItemInfo = workItemInfo; if (_workItemInfo.UseCallerCallContext && !ExecutionContext.IsFlowSuppressed()) { ExecutionContext ec = ExecutionContext.Capture(); if (ec != null) _callerContext = ec.CreateCopy(); ec.Dispose(); ec = null; } _callback = callback; _callbackNoResult = null; _state = state; _workItemResult = new WorkItemResult(this); Initialize(); } public WorkItem(IWorkItemsGroup workItemsGroup, WorkItemInfo workItemInfo, WaitCallback callback, object state) { _workItemsGroup = workItemsGroup; _workItemInfo = workItemInfo; if (_workItemInfo.UseCallerCallContext && !ExecutionContext.IsFlowSuppressed()) { ExecutionContext ec = ExecutionContext.Capture(); if (ec != null) _callerContext = ec.CreateCopy(); ec.Dispose(); ec = null; } _callbackNoResult = callback; _state = state; _workItemResult = new WorkItemResult(this); Initialize(); } internal void Initialize() { // The _workItemState is changed directly instead of using the SetWorkItemState // method since we don't want to go throught IsValidStateTransition. _workItemState = WorkItemState.InQueue; _workItemCompleted = null; _workItemCompletedRefCount = 0; _waitingOnQueueStopwatch = new Stopwatch(); _processingStopwatch = new Stopwatch(); _expirationTime = _workItemInfo.Timeout > 0 ? DateTime.UtcNow.Ticks + _workItemInfo.Timeout * TimeSpan.TicksPerMillisecond : long.MaxValue; } internal bool WasQueuedBy(IWorkItemsGroup workItemsGroup) { return (workItemsGroup == _workItemsGroup); } #endregion #region Methods internal CanceledWorkItemsGroup CanceledWorkItemsGroup { get { return _canceledWorkItemsGroup; } set { _canceledWorkItemsGroup = value; } } internal CanceledWorkItemsGroup CanceledSmartThreadPool { get { return _canceledSmartThreadPool; } set { _canceledSmartThreadPool = value; } } /// /// Change the state of the work item to in progress if it wasn't canceled. /// /// /// Return true on success or false in case the work item was canceled. /// If the work item needs to run a post execute then the method will return true. /// public bool StartingWorkItem() { _waitingOnQueueStopwatch.Stop(); _processingStopwatch.Start(); lock (this) { if (IsCanceled) { if ((_workItemInfo.PostExecuteWorkItemCallback != null) && ((_workItemInfo.CallToPostExecute & CallToPostExecute.WhenWorkItemCanceled) == CallToPostExecute.WhenWorkItemCanceled)) { return true; } return false; } Debug.Assert(WorkItemState.InQueue == GetWorkItemState()); // No need for a lock yet, only after the state has changed to InProgress _executingThread = Thread.CurrentThread; SetWorkItemState(WorkItemState.InProgress); } return true; } /// /// Execute the work item and the post execute /// public void Execute() { CallToPostExecute currentCallToPostExecute = 0; // Execute the work item if we are in the correct state switch (GetWorkItemState()) { case WorkItemState.InProgress: currentCallToPostExecute |= CallToPostExecute.WhenWorkItemNotCanceled; ExecuteWorkItem(); break; case WorkItemState.Canceled: currentCallToPostExecute |= CallToPostExecute.WhenWorkItemCanceled; break; default: Debug.Assert(false); throw new NotSupportedException(); } // Run the post execute as needed if ((currentCallToPostExecute & _workItemInfo.CallToPostExecute) != 0) { PostExecute(); } _processingStopwatch.Stop(); } internal void FireWorkItemCompleted() { try { _workItemCompletedEvent?.Invoke(this); } catch // Suppress exceptions { } } internal void FireWorkItemStarted() { try { _workItemStartedEvent?.Invoke(this); } catch // Suppress exceptions { } } /// /// Execute the work item /// private void ExecuteWorkItem() { Exception exception = null; object result = null; try { try { if(_callbackNoResult == null) { if(_callerContext == null) result = _callback(_state); else { ContextCallback _ccb = new ContextCallback( o => { result =_callback(o); }); ExecutionContext.Run(_callerContext, _ccb, _state); } } else { if (_callerContext == null) _callbackNoResult(_state); else { ContextCallback _ccb = new ContextCallback(o => { _callbackNoResult(o); }); ExecutionContext.Run(_callerContext, _ccb, _state); } } } catch (Exception e) { // Save the exception so we can rethrow it later exception = e; } // Remove the value of the execution thread, so it will be impossible to cancel the work item, // since it is already completed. // Cancelling a work item that already completed may cause the abortion of the next work item!!! Thread executionThread = Interlocked.CompareExchange(ref _executingThread, null, _executingThread); if (null == executionThread) { // Oops! we are going to be aborted..., Wait here so we can catch the ThreadAbortException Thread.Sleep(60 * 1000); // If after 1 minute this thread was not aborted then let it continue working. } } // We must treat the ThreadAbortException or else it will be stored in the exception variable catch (ThreadAbortException tae) { // Check if the work item was cancelled // If we got a ThreadAbortException and the STP is not shutting down, it means the // work items was cancelled. tae.GetHashCode(); if (!SmartThreadPool.CurrentThreadEntry.AssociatedSmartThreadPool.IsShuttingdown) { Thread.ResetAbort(); } } if (!SmartThreadPool.IsWorkItemCanceled) { SetResult(result, exception); } } /// /// Runs the post execute callback /// private void PostExecute() { if (null != _workItemInfo.PostExecuteWorkItemCallback) { try { _workItemInfo.PostExecuteWorkItemCallback(_workItemResult); } catch (Exception e) { Debug.Assert(null != e); } } } /// /// Set the result of the work item to return /// /// The result of the work item /// The exception that was throw while the workitem executed, null /// if there was no exception. internal void SetResult(object result, Exception exception) { _result = result; _exception = exception; SignalComplete(false); } /// /// Returns the work item result /// /// The work item result internal IWorkItemResult GetWorkItemResult() { return _workItemResult; } /// /// Wait for all work items to complete /// /// Array of work item result objects /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely. /// /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. /// /// A cancel wait handle to interrupt the wait if needed /// /// true when every work item in waitableResults has completed; otherwise false. /// internal static bool WaitAll( IWaitableResult[] waitableResults, int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle) { if (0 == waitableResults.Length) { return true; } bool success; WaitHandle[] waitHandles = new WaitHandle[waitableResults.Length]; GetWaitHandles(waitableResults, waitHandles); if ((null == cancelWaitHandle) && (waitHandles.Length <= 64)) { success = STPEventWaitHandle.WaitAll(waitHandles, millisecondsTimeout, exitContext); } else { success = true; int millisecondsLeft = millisecondsTimeout; Stopwatch stopwatch = Stopwatch.StartNew(); WaitHandle[] whs; if (null != cancelWaitHandle) { whs = new WaitHandle[] { null, cancelWaitHandle }; } else { whs = new WaitHandle[] { null }; } bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout); // Iterate over the wait handles and wait for each one to complete. // We cannot use WaitHandle.WaitAll directly, because the cancelWaitHandle // won't affect it. // Each iteration we update the time left for the timeout. for (int i = 0; i < waitableResults.Length; ++i) { // WaitAny don't work with negative numbers if (!waitInfinitely && (millisecondsLeft < 0)) { success = false; break; } whs[0] = waitHandles[i]; int result = STPEventWaitHandle.WaitAny(whs, millisecondsLeft, exitContext); if ((result > 0) || (STPEventWaitHandle.WaitTimeout == result)) { success = false; break; } if (!waitInfinitely) { // Update the time left to wait millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds; } } } // Release the wait handles ReleaseWaitHandles(waitableResults); return success; } /// /// Waits for any of the work items in the specified array to complete, cancel, or timeout /// /// Array of work item result objects /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely. /// /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. /// /// A cancel wait handle to interrupt the wait if needed /// /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. /// internal static int WaitAny( IWaitableResult[] waitableResults, int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle) { WaitHandle[] waitHandles; if (null != cancelWaitHandle) { waitHandles = new WaitHandle[waitableResults.Length + 1]; GetWaitHandles(waitableResults, waitHandles); waitHandles[waitableResults.Length] = cancelWaitHandle; } else { waitHandles = new WaitHandle[waitableResults.Length]; GetWaitHandles(waitableResults, waitHandles); } int result = STPEventWaitHandle.WaitAny(waitHandles, millisecondsTimeout, exitContext); // Treat cancel as timeout if (null != cancelWaitHandle) { if (result == waitableResults.Length) { result = STPEventWaitHandle.WaitTimeout; } } ReleaseWaitHandles(waitableResults); return result; } /// /// Fill an array of wait handles with the work items wait handles. /// /// An array of work item results /// An array of wait handles to fill private static void GetWaitHandles( IWaitableResult[] waitableResults, WaitHandle[] waitHandles) { for (int i = 0; i < waitableResults.Length; ++i) { WorkItemResult wir = waitableResults[i].GetWorkItemResult() as WorkItemResult; Debug.Assert(null != wir, "All waitableResults must be WorkItemResult objects"); waitHandles[i] = wir.GetWorkItem().GetWaitHandle(); } } /// /// Release the work items' wait handles /// /// An array of work item results private static void ReleaseWaitHandles(IWaitableResult[] waitableResults) { for (int i = 0; i < waitableResults.Length; ++i) { WorkItemResult wir = (WorkItemResult)waitableResults[i].GetWorkItemResult(); wir.GetWorkItem().ReleaseWaitHandle(); } } #endregion #region Private Members private WorkItemState GetWorkItemState() { lock (this) { if (WorkItemState.Completed == _workItemState) { return _workItemState; } if (WorkItemState.Canceled != _workItemState && DateTime.UtcNow.Ticks > _expirationTime) { _workItemState = WorkItemState.Canceled; return _workItemState; } if(WorkItemState.InProgress != _workItemState) { if (CanceledSmartThreadPool.IsCanceled || CanceledWorkItemsGroup.IsCanceled) { return WorkItemState.Canceled; } } return _workItemState; } } /// /// Sets the work item's state /// /// The state to set the work item to private void SetWorkItemState(WorkItemState workItemState) { lock (this) { if (IsValidStatesTransition(_workItemState, workItemState)) { _workItemState = workItemState; } } } /// /// Signals that work item has been completed or canceled /// /// Indicates that the work item has been canceled private void SignalComplete(bool canceled) { SetWorkItemState(canceled ? WorkItemState.Canceled : WorkItemState.Completed); lock (this) { // If someone is waiting then signal. if (null != _workItemCompleted) { _workItemCompleted.Set(); } } } internal void WorkItemIsQueued() { _waitingOnQueueStopwatch.Start(); } #endregion #region Members exposed by WorkItemResult /// /// Cancel the work item if it didn't start running yet. /// /// Returns true on success or false if the work item is in progress or already completed private bool Cancel(bool abortExecution) { bool success = false; bool signalComplete = false; lock (this) { switch (GetWorkItemState()) { case WorkItemState.Canceled: //Debug.WriteLine("Work item already canceled"); if (abortExecution) { Thread executionThread = Interlocked.CompareExchange(ref _executingThread, null, _executingThread); if (null != executionThread) { executionThread.Abort(); // "Cancel" // No need to signalComplete, because we already cancelled this work item // so it already signaled its completion. //signalComplete = true; } } success = true; break; case WorkItemState.Completed: //Debug.WriteLine("Work item cannot be canceled"); break; case WorkItemState.InProgress: if (abortExecution) { Thread executionThread = Interlocked.CompareExchange(ref _executingThread, null, _executingThread); if (null != executionThread) { executionThread.Abort(); // "Cancel" success = true; signalComplete = true; } } else { // ************************** // Stock SmartThreadPool 2.2.3 sets these to true and relies on the thread to check the // WorkItem cancellation status. However, OpenSimulator uses a different mechanism to notify // scripts of co-operative termination and the abort code also relies on this method // returning false in order to implement a small wait. // // Therefore, as was the case previously with STP, we will not signal successful cancellation // here. It's possible that OpenSimulator code could be changed in the future to remove // the need for this change. // ************************** success = false; signalComplete = false; } break; case WorkItemState.InQueue: // Signal to the wait for completion that the work // item has been completed (canceled). There is no // reason to wait for it to get out of the queue signalComplete = true; //Debug.WriteLine("Work item canceled"); success = true; break; } if (signalComplete) { SignalComplete(true); } } return success; } /// /// Get the result of the work item. /// If the work item didn't run yet then the caller waits for the result, timeout, or cancel. /// In case of error the method throws and exception /// /// The result of the work item private object GetResult( int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle) { Exception e; object result = GetResult(millisecondsTimeout, exitContext, cancelWaitHandle, out e); if (null != e) { throw new WorkItemResultException("The work item caused an excpetion, see the inner exception for details", e); } return result; } /// /// Get the result of the work item. /// If the work item didn't run yet then the caller waits for the result, timeout, or cancel. /// In case of error the e argument is filled with the exception /// /// The result of the work item private object GetResult( int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e) { e = null; // Check for cancel if (WorkItemState.Canceled == GetWorkItemState()) { throw new WorkItemCancelException("Work item canceled"); } // Check for completion if (IsCompleted) { e = _exception; return _result; } // If no cancelWaitHandle is provided if (null == cancelWaitHandle) { WaitHandle wh = GetWaitHandle(); bool timeout = !STPEventWaitHandle.WaitOne(wh, millisecondsTimeout, exitContext); ReleaseWaitHandle(); if (timeout) { throw new WorkItemTimeoutException("Work item timeout"); } } else { WaitHandle wh = GetWaitHandle(); int result = STPEventWaitHandle.WaitAny(new WaitHandle[] { wh, cancelWaitHandle }); ReleaseWaitHandle(); switch (result) { case 0: // The work item signaled // Note that the signal could be also as a result of canceling the // work item (not the get result) break; case 1: case STPEventWaitHandle.WaitTimeout: throw new WorkItemTimeoutException("Work item timeout"); default: Debug.Assert(false); break; } } // Check for cancel if (WorkItemState.Canceled == GetWorkItemState()) { throw new WorkItemCancelException("Work item canceled"); } Debug.Assert(IsCompleted); e = _exception; // Return the result return _result; } /// /// A wait handle to wait for completion, cancel, or timeout /// private WaitHandle GetWaitHandle() { lock (this) { if (null == _workItemCompleted) { _workItemCompleted = new ManualResetEvent(IsCompleted); } ++_workItemCompletedRefCount; } return _workItemCompleted; } private void ReleaseWaitHandle() { lock (this) { if (null != _workItemCompleted) { --_workItemCompletedRefCount; if (0 == _workItemCompletedRefCount) { _workItemCompleted.Close(); _workItemCompleted = null; } } } } /// /// Returns true when the work item has completed or canceled /// private bool IsCompleted { get { lock (this) { WorkItemState workItemState = GetWorkItemState(); return ((workItemState == WorkItemState.Completed) || (workItemState == WorkItemState.Canceled)); } } } /// /// Returns true when the work item has canceled /// public bool IsCanceled { get { lock (this) { return (GetWorkItemState() == WorkItemState.Canceled); } } } #endregion internal event WorkItemStateCallback OnWorkItemStarted { add { _workItemStartedEvent += value; } remove { _workItemStartedEvent -= value; } } internal event WorkItemStateCallback OnWorkItemCompleted { add { _workItemCompletedEvent += value; } remove { _workItemCompletedEvent -= value; } } public void DisposeOfState() { if(_callerContext != null) { _callerContext.Dispose(); _callerContext = null; } if(_workItemCompleted != null) { _workItemCompleted.Dispose(); _workItemCompleted = null; } if (_workItemInfo.DisposeOfStateObjects) { IDisposable disp = _state as IDisposable; if (null != disp) { disp.Dispose(); _state = null; } } _callback = null; _callbackNoResult = null; } } }