using System;
using System.Diagnostics;
using System.Threading;

namespace Amib.Threading.Internal
{
    /// <summary>
    /// Holds a callback delegate and the state for that delegate.
    /// </summary>
    public partial class WorkItem
    {
        #region WorkItemState enum

        /// <summary>
        /// Indicates the state of the work item in the thread pool.
        /// </summary>
        private enum WorkItemState
        {
            InQueue = 0,    // Nexts: InProgress, Canceled
            InProgress = 1, // Nexts: Completed, Canceled
            Completed = 2,  // Stays Completed
            Canceled = 3,   // Stays Canceled
        }

        /// <summary>
        /// Checks whether moving from <paramref name="currentState"/> to
        /// <paramref name="nextState"/> is an allowed transition.
        /// </summary>
        /// <param name="currentState">The state the work item is in now.</param>
        /// <param name="nextState">The state the caller wants to move to.</param>
        /// <returns>
        /// true for InQueue -> InProgress/Canceled and InProgress -> Completed/Canceled;
        /// false for everything else (Completed and Canceled are terminal).
        /// </returns>
        private static bool IsValidStatesTransition(WorkItemState currentState, WorkItemState nextState)
        {
            bool valid = false;

            switch (currentState)
            {
                case WorkItemState.InQueue:
                    valid = (WorkItemState.InProgress == nextState) || (WorkItemState.Canceled == nextState);
                    break;

                case WorkItemState.InProgress:
                    valid = (WorkItemState.Completed == nextState) || (WorkItemState.Canceled == nextState);
                    break;

                case WorkItemState.Completed:
                case WorkItemState.Canceled:
                    // Cannot be changed
                    break;

                default:
                    // Unknown state
                    Debug.Assert(false);
                    break;
            }

            return valid;
        }

        #endregion

        #region Fields

        /// <summary>
        /// Callback delegate for the callback (variant that returns a result).
        /// </summary>
        private WorkItemCallback m_callback;

        /// <summary>
        /// Callback delegate for the callback (variant without a result).
        /// When this is not null it is invoked instead of m_callback.
        /// </summary>
        private WaitCallback m_callbackNoResult;

        /// <summary>
        /// State with which to call the callback delegate.
        /// </summary>
        private object m_state;

        /// <summary>
        /// Stores the caller's context.
        /// </summary>
        private ExecutionContext m_callerContext = null;

        /// <summary>
        /// Holds the result of the method.
        /// </summary>
        private object m_result;

        /// <summary>
        /// Holds the exception if the method threw it.
        /// </summary>
        private Exception m_exception;

        /// <summary>
        /// Holds the state of the work item.
        /// </summary>
        private WorkItemState m_workItemState;

        /// <summary>
        /// A ManualResetEvent to indicate that the result is ready.
        /// Created lazily by GetWaitHandle.
        /// </summary>
        private ManualResetEvent m_workItemCompleted;

        /// <summary>
        /// A reference count to m_workItemCompleted.
        /// When it reaches zero m_workItemCompleted is closed.
        /// </summary>
        private int m_workItemCompletedRefCount;

        /// <summary>
        /// Represents the result state of the work item.
        /// </summary>
        private readonly WorkItemResult m_workItemResult;

        /// <summary>
        /// Work item info.
        /// </summary>
        private readonly WorkItemInfo m_workItemInfo;

        /// <summary>
        /// Called when the WorkItem starts.
        /// </summary>
        private event WorkItemStateCallback m_workItemStartedEvent;

        /// <summary>
        /// Called when the WorkItem completes.
        /// </summary>
        private event WorkItemStateCallback m_workItemCompletedEvent;

        /// <summary>
        /// A reference to an object that indicates whether the
        /// WorkItemsGroup has been canceled.
        /// </summary>
        private CanceledWorkItemsGroup m_canceledWorkItemsGroup = CanceledWorkItemsGroup.NotCanceledWorkItemsGroup;

        /// <summary>
        /// A reference to an object that indicates whether the
        /// SmartThreadPool has been canceled.
        /// </summary>
        private CanceledWorkItemsGroup m_canceledSmartThreadPool = CanceledWorkItemsGroup.NotCanceledWorkItemsGroup;

        /// <summary>
        /// The work item group this work item belongs to.
        /// </summary>
        private readonly IWorkItemsGroup m_workItemsGroup;

        /// <summary>
        /// The thread that executes this work item.
        /// This field is available for the period when the work item is executed;
        /// before and after it is null.
        /// </summary>
        private Thread m_executingThread;

        /// <summary>
        /// The absolute time (in UTC ticks) at which the work item times out.
        /// long.MaxValue when no timeout is configured.
        /// </summary>
        private long m_expirationTime;

        #region Performance Counter fields

        /// <summary>
        /// Stores how long the work item waited on the stp queue.
        /// </summary>
        private Stopwatch _waitingOnQueueStopwatch;

        /// <summary>
        /// Stores how much time it took the work item to execute after it went out of the queue.
        /// </summary>
        private Stopwatch _processingStopwatch;

        #endregion

        #endregion

        #region Properties

        /// <summary>
        /// Time measured by the queue-waiting stopwatch.
        /// </summary>
        public TimeSpan WaitingTime
        {
            get { return _waitingOnQueueStopwatch.Elapsed; }
        }

        /// <summary>
        /// Time measured by the processing stopwatch.
        /// </summary>
        public TimeSpan ProcessTime
        {
            get { return _processingStopwatch.Elapsed; }
        }

        /// <summary>
        /// The WorkItemInfo this work item was created with.
        /// </summary>
        internal WorkItemInfo WorkItemInfo
        {
            get { return m_workItemInfo; }
        }

        #endregion

        #region Construction

        /// <summary>
        /// Initialize the callback holding object with a callback that returns a result.
        /// </summary>
        /// <param name="workItemsGroup">The workItemGroup of the work item.</param>
        /// <param name="workItemInfo">The WorkItemInfo of the work item.</param>
        /// <param name="callback">Callback delegate for the callback.</param>
        /// <param name="state">State with which to call the callback delegate.</param>
        /// <remarks>
        /// We assume that the WorkItem object is created within the thread
        /// that is meant to run the callback.
        /// </remarks>
        public WorkItem(IWorkItemsGroup workItemsGroup, WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
        {
            m_workItemsGroup = workItemsGroup;
            m_workItemInfo = workItemInfo;

            // Keep a private copy of the caller's execution context when requested
            // and when flow is not suppressed on the creating thread.
            if (m_workItemInfo.UseCallerCallContext && !ExecutionContext.IsFlowSuppressed())
            {
                ExecutionContext ec = ExecutionContext.Capture();
                if (ec is not null)
                {
                    m_callerContext = ec.CreateCopy();
                    ec.Dispose();
                    ec = null;
                }
            }

            m_callback = callback;
            m_callbackNoResult = null;
            m_state = state;
            m_workItemResult = new WorkItemResult(this);
            Initialize();
        }

        /// <summary>
        /// Initialize the callback holding object with a callback that returns no result.
        /// </summary>
        /// <param name="workItemsGroup">The workItemGroup of the work item.</param>
        /// <param name="workItemInfo">The WorkItemInfo of the work item.</param>
        /// <param name="callback">Callback delegate for the callback.</param>
        /// <param name="state">State with which to call the callback delegate.</param>
        public WorkItem(IWorkItemsGroup workItemsGroup, WorkItemInfo workItemInfo, WaitCallback callback, object state)
        {
            m_workItemsGroup = workItemsGroup;
            m_workItemInfo = workItemInfo;

            // Keep a private copy of the caller's execution context when requested
            // and when flow is not suppressed on the creating thread.
            if (m_workItemInfo.UseCallerCallContext && !ExecutionContext.IsFlowSuppressed())
            {
                ExecutionContext ec = ExecutionContext.Capture();
                if (ec is not null)
                {
                    m_callerContext = ec.CreateCopy();
                    ec.Dispose();
                    ec = null;
                }
            }

            m_callbackNoResult = callback;
            m_state = state;
            m_workItemResult = new WorkItemResult(this);
            Initialize();
        }

        /// <summary>
        /// Resets the work item to its initial InQueue state: clears the completion
        /// event and its reference count, creates fresh stopwatches and computes
        /// the expiration time from the configured timeout.
        /// </summary>
        internal void Initialize()
        {
            // The m_workItemState is changed directly instead of using the SetWorkItemState
            // method since we don't want to go through IsValidStatesTransition.
            m_workItemState = WorkItemState.InQueue;

            m_workItemCompleted = null;
            m_workItemCompletedRefCount = 0;
            _waitingOnQueueStopwatch = new Stopwatch();
            _processingStopwatch = new Stopwatch();

            // A timeout that is not positive means "never expires".
            m_expirationTime = m_workItemInfo.Timeout > 0
                ? DateTime.UtcNow.Ticks + m_workItemInfo.Timeout * TimeSpan.TicksPerMillisecond
                : long.MaxValue;
        }

        /// <summary>
        /// Returns true when this work item was queued by the given work items group
        /// (reference comparison against the group passed to the constructor).
        /// </summary>
        internal bool WasQueuedBy(IWorkItemsGroup workItemsGroup)
        {
            return (workItemsGroup == m_workItemsGroup);
        }

        #endregion

        #region Methods

        /// <summary>
        /// The object that indicates whether the owning WorkItemsGroup has been canceled.
        /// </summary>
        internal CanceledWorkItemsGroup CanceledWorkItemsGroup
        {
            get { return m_canceledWorkItemsGroup; }
            set { m_canceledWorkItemsGroup = value; }
        }

        /// <summary>
        /// The object that indicates whether the owning SmartThreadPool has been canceled.
        /// </summary>
        internal CanceledWorkItemsGroup CanceledSmartThreadPool
        {
            get { return m_canceledSmartThreadPool; }
            set { m_canceledSmartThreadPool = value; }
        }

        /// <summary>
        /// Change the state of the work item to in progress if it wasn't canceled.
        /// </summary>
        /// <returns>
        /// Return true on success or false in case the work item was canceled.
        /// If the work item needs to run a post execute then the method will return true.
        /// </returns>
        public bool StartingWorkItem()
        {
            _waitingOnQueueStopwatch.Stop();
            _processingStopwatch.Start();

            lock (this)
            {
                if (IsCanceled)
                {
                    // A canceled work item is still "started" when its post execute
                    // callback is configured to run on cancellation.
                    if ((m_workItemInfo.PostExecuteWorkItemCallback is not null) &&
                        ((m_workItemInfo.CallToPostExecute & CallToPostExecute.WhenWorkItemCanceled) == CallToPostExecute.WhenWorkItemCanceled))
                    {
                        return true;
                    }

                    return false;
                }

                Debug.Assert(WorkItemState.InQueue == GetWorkItemState());

                // Record the executing thread before the state becomes InProgress
                m_executingThread = Thread.CurrentThread;
                SetWorkItemState(WorkItemState.InProgress);
            }

            return true;
        }

        /// <summary>
        /// Execute the work item and the post execute.
        /// </summary>
        /// <exception cref="NotSupportedException">
        /// The work item is neither InProgress nor Canceled.
        /// </exception>
        public void Execute()
        {
            CallToPostExecute currentCallToPostExecute = 0;

            // Execute the work item if we are in the correct state
            switch (GetWorkItemState())
            {
                case WorkItemState.InProgress:
                    currentCallToPostExecute |= CallToPostExecute.WhenWorkItemNotCanceled;
                    ExecuteWorkItem();
                    break;

                case WorkItemState.Canceled:
                    currentCallToPostExecute |= CallToPostExecute.WhenWorkItemCanceled;
                    break;

                default:
                    Debug.Assert(false);
                    throw new NotSupportedException();
            }

            // Run the post execute as needed
            if ((currentCallToPostExecute & m_workItemInfo.CallToPostExecute) != 0)
            {
                PostExecute();
            }

            _processingStopwatch.Stop();
        }

        /// <summary>
        /// Raises the completed event. Exceptions thrown by handlers are suppressed.
        /// </summary>
        internal void FireWorkItemCompleted()
        {
            try
            {
                m_workItemCompletedEvent?.Invoke(this);
            }
            catch // Suppress exceptions
            {
            }
        }

        /// <summary>
        /// Raises the started event. Exceptions thrown by handlers are suppressed.
        /// </summary>
        internal void FireWorkItemStarted()
        {
            try
            {
                m_workItemStartedEvent?.Invoke(this);
            }
            catch // Suppress exceptions
            {
            }
        }

        /// <summary>
        /// Execute the work item: invokes the callback (inside the captured caller
        /// context when there is one), records its result or exception and stores
        /// them through SetResult unless SmartThreadPool.IsWorkItemCanceled is set.
        /// </summary>
        private void ExecuteWorkItem()
        {
            Exception exception = null;
            object result = null;

            try
            {
                try
                {
                    if (m_callbackNoResult is null)
                    {
                        if (m_callerContext is null)
                        {
                            result = m_callback(m_state);
                        }
                        else
                        {
                            ContextCallback ccb = o => { result = m_callback(o); };
                            ExecutionContext.Run(m_callerContext, ccb, m_state);
                        }
                    }
                    else
                    {
                        if (m_callerContext is null)
                        {
                            m_callbackNoResult(m_state);
                        }
                        else
                        {
                            ContextCallback ccb = o => { m_callbackNoResult(o); };
                            ExecutionContext.Run(m_callerContext, ccb, m_state);
                        }
                    }
                }
                catch (Exception e)
                {
                    // Save the exception so it can be handed to SetResult below
                    exception = e;
                }

                // Remove the value of the execution thread, so it will be impossible to cancel the work item,
                // since it is already completed.
                // Cancelling a work item that already completed may cause the abortion of the next work item!!!
                Thread executionThread = Interlocked.CompareExchange(ref m_executingThread, null, m_executingThread);

                if (executionThread is null)
                {
                    // Oops! we are going to be aborted..., Wait here so we can catch the ThreadAbortException
                    // NOTE(review): Cancel in this file clears m_executingThread but its Abort() calls are
                    // commented out, so unless something outside this file aborts the thread, this simply
                    // blocks the worker for a full minute. Confirm whether the wait is still needed.
                    Thread.Sleep(60 * 1000);

                    // If after 1 minute this thread was not aborted then let it continue working.
                }
            }
            // We must treat the ThreadAbortException or else it will be stored in the exception variable
            catch (ThreadAbortException)
            {
                // If we got a ThreadAbortException and the STP is not shutting down, it means the
                // work item was cancelled. The abort is intentionally not reset here.
            }

            if (!SmartThreadPool.IsWorkItemCanceled)
            {
                SetResult(result, exception);
            }
        }

        /// <summary>
        /// Runs the post execute callback, if one is configured.
        /// Exceptions thrown by the callback are swallowed.
        /// </summary>
        private void PostExecute()
        {
            if (m_workItemInfo.PostExecuteWorkItemCallback is not null)
            {
                try
                {
                    m_workItemInfo.PostExecuteWorkItemCallback(m_workItemResult);
                }
                catch (Exception e)
                {
                    Debug.Assert(e is not null);
                }
            }
        }

        /// <summary>
        /// Set the result of the work item to return and signal completion.
        /// </summary>
        /// <param name="result">The result of the work item.</param>
        /// <param name="exception">
        /// The exception that was thrown while the work item executed, null
        /// if there was no exception.
        /// </param>
        internal void SetResult(object result, Exception exception)
        {
            m_result = result;
            m_exception = exception;
            SignalComplete(false);
        }

        /// <summary>
        /// Returns the work item result.
        /// </summary>
        /// <returns>The work item result.</returns>
        internal IWorkItemResult GetWorkItemResult()
        {
            return m_workItemResult;
        }

        /// <summary>
        /// Wait for all work items to complete.
        /// </summary>
        /// <param name="waitableResults">Array of work item result objects.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
        /// <param name="exitContext">
        /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
        /// </param>
        /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed.</param>
        /// <returns>
        /// true when every work item in waitableResults has completed; otherwise false.
        /// </returns>
        internal static bool WaitAll(
            IWaitableResult[] waitableResults,
            int millisecondsTimeout,
            bool exitContext,
            WaitHandle cancelWaitHandle)
        {
            if (0 == waitableResults.Length)
            {
                return true;
            }

            bool success;
            WaitHandle[] waitHandles = new WaitHandle[waitableResults.Length];
            GetWaitHandles(waitableResults, waitHandles);

            try
            {
                if ((cancelWaitHandle is null) && (waitHandles.Length <= 64))
                {
                    success = STPEventWaitHandle.WaitAll(waitHandles, millisecondsTimeout, exitContext);
                }
                else
                {
                    success = true;
                    int millisecondsLeft = millisecondsTimeout;
                    Stopwatch stopwatch = Stopwatch.StartNew();

                    // Slot 0 is replaced on every iteration with the handle being waited on.
                    WaitHandle[] whs = cancelWaitHandle is null
                        ? new WaitHandle[] { null }
                        : new WaitHandle[] { null, cancelWaitHandle };

                    bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);

                    // Iterate over the wait handles and wait for each one to complete.
                    // We cannot use WaitHandle.WaitAll directly, because the cancelWaitHandle
                    // won't affect it.
                    // Each iteration we update the time left for the timeout.
                    for (int i = 0; i < waitableResults.Length; ++i)
                    {
                        // WaitAny doesn't work with negative numbers
                        if (!waitInfinitely && (millisecondsLeft < 0))
                        {
                            success = false;
                            break;
                        }

                        whs[0] = waitHandles[i];
                        int result = STPEventWaitHandle.WaitAny(whs, millisecondsLeft, exitContext);

                        // result > 0 means the cancel handle was signaled
                        if ((result > 0) || (STPEventWaitHandle.WaitTimeout == result))
                        {
                            success = false;
                            break;
                        }

                        if (!waitInfinitely)
                        {
                            // Update the time left to wait
                            millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds;
                        }
                    }
                }
            }
            finally
            {
                // Release the wait handles even when the wait throws, so the
                // reference counts taken by GetWaitHandles are not leaked.
                ReleaseWaitHandles(waitableResults);
            }

            return success;
        }

        /// <summary>
        /// Waits for any of the work items in the specified array to complete, cancel, or timeout.
        /// </summary>
        /// <param name="waitableResults">Array of work item result objects.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
        /// <param name="exitContext">
        /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
        /// </param>
        /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed.</param>
        /// <returns>
        /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
        /// </returns>
        internal static int WaitAny(
            IWaitableResult[] waitableResults,
            int millisecondsTimeout,
            bool exitContext,
            WaitHandle cancelWaitHandle)
        {
            WaitHandle[] waitHandles;

            if (cancelWaitHandle is not null)
            {
                // The cancel handle goes in the extra, last slot.
                waitHandles = new WaitHandle[waitableResults.Length + 1];
                GetWaitHandles(waitableResults, waitHandles);
                waitHandles[waitableResults.Length] = cancelWaitHandle;
            }
            else
            {
                waitHandles = new WaitHandle[waitableResults.Length];
                GetWaitHandles(waitableResults, waitHandles);
            }

            int result;
            try
            {
                result = STPEventWaitHandle.WaitAny(waitHandles, millisecondsTimeout, exitContext);

                // Treat cancel as timeout
                if ((cancelWaitHandle is not null) && (result == waitableResults.Length))
                {
                    result = STPEventWaitHandle.WaitTimeout;
                }
            }
            finally
            {
                // Release even when the wait throws, so reference counts are not leaked.
                ReleaseWaitHandles(waitableResults);
            }

            return result;
        }

        /// <summary>
        /// Fill an array of wait handles with the work items' wait handles.
        /// Each handle obtained here must later be released with ReleaseWaitHandles.
        /// </summary>
        /// <param name="waitableResults">An array of work item results.</param>
        /// <param name="waitHandles">An array of wait handles to fill.</param>
        private static void GetWaitHandles(IWaitableResult[] waitableResults, WaitHandle[] waitHandles)
        {
            for (int i = 0; i < waitableResults.Length; ++i)
            {
                WorkItemResult wir = waitableResults[i].GetWorkItemResult() as WorkItemResult;
                Debug.Assert(wir is not null, "All waitableResults must be WorkItemResult objects");

                waitHandles[i] = wir.GetWorkItem().GetWaitHandle();
            }
        }

        /// <summary>
        /// Release the work items' wait handles.
        /// </summary>
        /// <param name="waitableResults">An array of work item results.</param>
        private static void ReleaseWaitHandles(IWaitableResult[] waitableResults)
        {
            for (int i = 0; i < waitableResults.Length; ++i)
            {
                WorkItemResult wir = (WorkItemResult)waitableResults[i].GetWorkItemResult();

                wir.GetWorkItem().ReleaseWaitHandle();
            }
        }

        #endregion

        #region Private Members

        /// <summary>
        /// Returns the effective state of the work item.
        /// A work item that is not Completed and whose expiration time has passed is
        /// moved to Canceled. A work item that is not InProgress is reported as
        /// Canceled (without changing the stored state) when its thread pool or
        /// work items group has been canceled.
        /// </summary>
        private WorkItemState GetWorkItemState()
        {
            lock (this)
            {
                if (WorkItemState.Completed == m_workItemState)
                {
                    return m_workItemState;
                }

                // Expired: set directly, bypassing IsValidStatesTransition
                if (WorkItemState.Canceled != m_workItemState && DateTime.UtcNow.Ticks > m_expirationTime)
                {
                    m_workItemState = WorkItemState.Canceled;
                    return m_workItemState;
                }

                if (WorkItemState.InProgress != m_workItemState)
                {
                    if (CanceledSmartThreadPool.IsCanceled || CanceledWorkItemsGroup.IsCanceled)
                    {
                        return WorkItemState.Canceled;
                    }
                }

                return m_workItemState;
            }
        }

        /// <summary>
        /// Sets the work item's state. Invalid transitions are silently ignored.
        /// </summary>
        /// <param name="workItemState">The state to set the work item to.</param>
        private void SetWorkItemState(WorkItemState workItemState)
        {
            lock (this)
            {
                if (IsValidStatesTransition(m_workItemState, workItemState))
                {
                    m_workItemState = workItemState;
                }
            }
        }

        /// <summary>
        /// Signals that work item has been completed or canceled.
        /// </summary>
        /// <param name="canceled">Indicates that the work item has been canceled.</param>
        private void SignalComplete(bool canceled)
        {
            SetWorkItemState(canceled ? WorkItemState.Canceled : WorkItemState.Completed);

            lock (this)
            {
                // If someone is waiting then signal.
                m_workItemCompleted?.Set();
            }
        }

        /// <summary>
        /// Starts measuring the time the work item spends waiting in the queue.
        /// </summary>
        internal void WorkItemIsQueued()
        {
            _waitingOnQueueStopwatch.Start();
        }

        #endregion

        #region Members exposed by WorkItemResult

        /// <summary>
        /// Cancel the work item if it didn't start running yet.
        /// </summary>
        /// <param name="abortExecution">
        /// When true, an in-progress work item is also marked canceled (the executing
        /// thread reference is cleared; the thread itself is not aborted here).
        /// </param>
        /// <returns>Returns true on success or false if the work item is in progress or already completed.</returns>
        private bool Cancel(bool abortExecution)
        {
            bool success = false;
            bool signalComplete = false;

            lock (this)
            {
                switch (GetWorkItemState())
                {
                    case WorkItemState.Canceled:
                        if (abortExecution)
                        {
                            // Clear the executing thread. The thread is no longer aborted here, and there
                            // is no need to signalComplete, because we already cancelled this work item
                            // so it already signaled its completion.
                            Interlocked.CompareExchange(ref m_executingThread, null, m_executingThread);
                        }
                        success = true;
                        break;

                    case WorkItemState.Completed:
                        // Work item cannot be canceled
                        break;

                    case WorkItemState.InProgress:
                        if (abortExecution)
                        {
                            Thread executionThread = Interlocked.CompareExchange(ref m_executingThread, null, m_executingThread);
                            if (executionThread is not null)
                            {
                                // The thread is no longer aborted here; only the state is canceled.
                                success = true;
                                signalComplete = true;
                            }
                        }
                        else
                        {
                            // **************************
                            // Stock SmartThreadPool 2.2.3 sets these to true and relies on the thread to check the
                            // WorkItem cancellation status. However, OpenSimulator uses a different mechanism to notify
                            // scripts of co-operative termination and the abort code also relies on this method
                            // returning false in order to implement a small wait.
                            //
                            // Therefore, as was the case previously with STP, we will not signal successful cancellation
                            // here. It's possible that OpenSimulator code could be changed in the future to remove
                            // the need for this change.
                            // **************************
                            success = false;
                            signalComplete = false;
                        }
                        break;

                    case WorkItemState.InQueue:
                        // Signal to the wait for completion that the work
                        // item has been completed (canceled). There is no
                        // reason to wait for it to get out of the queue
                        signalComplete = true;
                        success = true;
                        break;
                }

                if (signalComplete)
                {
                    SignalComplete(true);
                }
            }

            return success;
        }

        /// <summary>
        /// Get the result of the work item.
        /// If the work item didn't run yet then the caller waits for the result, timeout, or cancel.
        /// In case of error the method throws an exception.
        /// </summary>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
        /// <param name="exitContext">Passed through to the underlying wait.</param>
        /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed; may be null.</param>
        /// <returns>The result of the work item.</returns>
        /// <exception cref="WorkItemResultException">The work item's callback threw; the original exception is the inner exception.</exception>
        private object GetResult(int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle)
        {
            object result = GetResult(millisecondsTimeout, exitContext, cancelWaitHandle, out Exception e);
            if (e is not null)
            {
                throw new WorkItemResultException("The work item caused an excpetion, see the inner exception for details", e);
            }

            return result;
        }

        /// <summary>
        /// Get the result of the work item.
        /// If the work item didn't run yet then the caller waits for the result, timeout, or cancel.
        /// In case of error the e argument is filled with the exception.
        /// </summary>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
        /// <param name="exitContext">Passed through to the underlying wait.</param>
        /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed; may be null.</param>
        /// <param name="e">Receives the exception thrown by the work item's callback, or null.</param>
        /// <returns>The result of the work item.</returns>
        /// <exception cref="WorkItemCancelException">The work item is canceled.</exception>
        /// <exception cref="WorkItemTimeoutException">The wait timed out or the cancel handle was signaled.</exception>
        private object GetResult(
            int millisecondsTimeout,
            bool exitContext,
            WaitHandle cancelWaitHandle,
            out Exception e)
        {
            e = null;

            // Check for cancel
            if (WorkItemState.Canceled == GetWorkItemState())
            {
                throw new WorkItemCancelException("Work item canceled");
            }

            // Check for completion
            if (IsCompleted)
            {
                e = m_exception;
                return m_result;
            }

            // If no cancelWaitHandle is provided
            if (cancelWaitHandle is null)
            {
                WaitHandle wh = GetWaitHandle();
                bool timeout;
                try
                {
                    timeout = !STPEventWaitHandle.WaitOne(wh, millisecondsTimeout, exitContext);
                }
                finally
                {
                    // Release even when the wait throws, so the reference count is not leaked
                    ReleaseWaitHandle();
                }

                if (timeout)
                {
                    throw new WorkItemTimeoutException("Work item timeout");
                }
            }
            else
            {
                WaitHandle wh = GetWaitHandle();
                int result;
                try
                {
                    // Honor the caller's timeout and exitContext here as well; waiting without
                    // them would block indefinitely until the work item or the cancel handle signals.
                    result = STPEventWaitHandle.WaitAny(new WaitHandle[] { wh, cancelWaitHandle }, millisecondsTimeout, exitContext);
                }
                finally
                {
                    // Release even when the wait throws, so the reference count is not leaked
                    ReleaseWaitHandle();
                }

                switch (result)
                {
                    case 0:
                        // The work item signaled
                        // Note that the signal could be also as a result of canceling the
                        // work item (not the get result)
                        break;

                    case 1: // The cancel handle signaled; treated like a timeout
                    case STPEventWaitHandle.WaitTimeout:
                        throw new WorkItemTimeoutException("Work item timeout");

                    default:
                        Debug.Assert(false);
                        break;
                }
            }

            // Check for cancel
            if (WorkItemState.Canceled == GetWorkItemState())
            {
                throw new WorkItemCancelException("Work item canceled");
            }

            Debug.Assert(IsCompleted);

            e = m_exception;

            // Return the result
            return m_result;
        }

        /// <summary>
        /// A wait handle to wait for completion, cancel, or timeout.
        /// The handle is created on first use and reference counted; every call
        /// must be paired with a call to ReleaseWaitHandle.
        /// </summary>
        private WaitHandle GetWaitHandle()
        {
            lock (this)
            {
                if (m_workItemCompleted is null)
                {
                    // Created already signaled when the work item has finished
                    m_workItemCompleted = new ManualResetEvent(IsCompleted);
                }

                ++m_workItemCompletedRefCount;

                // Return the value seen under the lock rather than re-reading the field
                return m_workItemCompleted;
            }
        }

        /// <summary>
        /// Drops one reference to the completion wait handle and closes it
        /// when the last reference is released.
        /// </summary>
        private void ReleaseWaitHandle()
        {
            lock (this)
            {
                if (m_workItemCompleted is not null)
                {
                    --m_workItemCompletedRefCount;
                    if (0 == m_workItemCompletedRefCount)
                    {
                        m_workItemCompleted.Close();
                        m_workItemCompleted = null;
                    }
                }
            }
        }

        /// <summary>
        /// Returns true when the work item has completed or canceled.
        /// </summary>
        private bool IsCompleted
        {
            get
            {
                lock (this)
                {
                    WorkItemState workItemState = GetWorkItemState();
                    return ((workItemState == WorkItemState.Completed) ||
                            (workItemState == WorkItemState.Canceled));
                }
            }
        }

        /// <summary>
        /// Returns true when the work item has canceled.
        /// </summary>
        public bool IsCanceled
        {
            get
            {
                lock (this)
                {
                    return (GetWorkItemState() == WorkItemState.Canceled);
                }
            }
        }

        #endregion

        /// <summary>
        /// Subscribes to / unsubscribes from the notification raised by FireWorkItemStarted.
        /// </summary>
        internal event WorkItemStateCallback OnWorkItemStarted
        {
            add { m_workItemStartedEvent += value; }
            remove { m_workItemStartedEvent -= value; }
        }

        /// <summary>
        /// Subscribes to / unsubscribes from the notification raised by FireWorkItemCompleted.
        /// </summary>
        internal event WorkItemStateCallback OnWorkItemCompleted
        {
            add { m_workItemCompletedEvent += value; }
            remove { m_workItemCompletedEvent -= value; }
        }

        /// <summary>
        /// Releases what the work item holds: the captured caller context, the
        /// completion event, the state object (only when it is disposable and
        /// DisposeOfStateObjects is set) and the callback delegates.
        /// </summary>
        public void DisposeOfState()
        {
            if (m_callerContext is not null)
            {
                m_callerContext.Dispose();
                m_callerContext = null;
            }

            if (m_workItemCompleted is not null)
            {
                m_workItemCompleted.Dispose();
                m_workItemCompleted = null;
            }

            if (m_workItemInfo.DisposeOfStateObjects)
            {
                if (m_state is IDisposable disp)
                {
                    disp.Dispose();
                    m_state = null;
                }
            }

            m_callback = null;
            m_callbackNoResult = null;
        }
    }
}