123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646 |
- using System;
- using System.Collections.Generic;
- using System.Threading;
- namespace Amib.Threading.Internal
- {
- #region WorkItemsQueue class
- /// <summary>
- /// WorkItemsQueue class.
- /// </summary>
- public class WorkItemsQueue : IDisposable
- {
- #region Member variables
/// <summary>
/// Sentinel node of the waiters list. Waiters are pushed and popped
/// right after it, so the list behaves as a stack.
/// </summary>
private readonly WaiterEntry _headWaiterEntry = new WaiterEntry();

/// <summary>
/// Number of waiters currently linked into the waiters stack.
/// </summary>
private int _waitersCount;

/// <summary>
/// Work items that were queued because no waiter took them.
/// </summary>
private readonly PriorityQueue _workItems = new PriorityQueue();

/// <summary>
/// True while work items may still be queued; cleared by Cleanup().
/// </summary>
private bool _isWorkItemsQueueActive = true;

#if (WINDOWS_PHONE)
// Waiter entries keyed by managed thread id.
// NOTE(review): no code visible in this file removes entries from this map - confirm.
private static readonly Dictionary<int, WaiterEntry> _waiterEntries = new Dictionary<int, WaiterEntry>();
#elif (_WINDOWS_CE)
// Thread data slot that stores the calling thread's waiter entry.
private static LocalDataStoreSlot _waiterEntrySlot = Thread.AllocateDataSlot();
#else
// One waiter entry per thread.
[ThreadStatic]
private static WaiterEntry _waiterEntry;
#endif
/// <summary>
/// The waiter entry that belongs to the calling thread, or null if the
/// thread has none yet. The backing storage depends on the target platform.
/// </summary>
private static WaiterEntry CurrentWaiterEntry
{
#if (WINDOWS_PHONE)
    get
    {
        int threadId = Thread.CurrentThread.ManagedThreadId;
        WaiterEntry entry;
        lock (_waiterEntries)
        {
            // On a miss TryGetValue leaves 'entry' as null, which is what we return.
            _waiterEntries.TryGetValue(threadId, out entry);
        }
        return entry;
    }
    set
    {
        int threadId = Thread.CurrentThread.ManagedThreadId;
        lock (_waiterEntries)
        {
            _waiterEntries[threadId] = value;
        }
    }
#elif (_WINDOWS_CE)
    get
    {
        object slotValue = Thread.GetData(_waiterEntrySlot);
        return slotValue as WaiterEntry;
    }
    set
    {
        Thread.SetData(_waiterEntrySlot, value);
    }
#else
    get { return _waiterEntry; }
    set { _waiterEntry = value; }
#endif
}
/// <summary>
/// Set to true once Dispose() has completed on this WorkItemsQueue.
/// </summary>
private bool _isDisposed;
- #endregion
- #region Public properties
/// <summary>
/// Number of work items currently held in the queue.
/// The value is read without taking the queue lock.
/// </summary>
public int Count
{
    get { return _workItems.Count; }
}
/// <summary>
/// Number of waiters currently linked into the waiters stack.
/// The value is read without taking the queue lock.
/// </summary>
public int WaitersCount
{
    get { return _waitersCount; }
}
- #endregion
- #region Public methods
/// <summary>
/// Hands a work item to a waiting thread, or queues it when no waiter
/// accepts it.
/// </summary>
/// <param name="workItem">The work item to deliver; must not be null.</param>
/// <returns>
/// False if the queue no longer accepts work items; otherwise true.
/// </returns>
/// <exception cref="ArgumentNullException">workItem is null.</exception>
/// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
public bool EnqueueWorkItem(WorkItem workItem)
{
    // null is reserved: DequeueWorkItem() returns it to report a timeout
    // or a cancel, so it can never be a valid work item.
    if (workItem == null)
    {
        throw new ArgumentNullException("workItem" , "workItem cannot be null");
    }

    lock (this)
    {
        ValidateNotDisposed();

        if (!_isWorkItemsQueueActive)
        {
            return false;
        }

        // Offer the work item to the waiters, one at a time. A waiter that
        // has already timed out rejects the signal and is simply dropped.
        bool delivered = false;
        while (!delivered && _waitersCount > 0)
        {
            delivered = PopWaiter().Signal(workItem);
        }

        // Nobody took it: keep it in the queue for a later DequeueWorkItem().
        if (!delivered)
        {
            _workItems.Enqueue(workItem);
        }
    }

    return true;
}
/// <summary>
/// Takes a work item from the queue, waiting for one to arrive if the
/// queue is empty.
/// </summary>
/// <param name="millisecondsTimeout">
/// Timeout in milliseconds, passed to STPEventWaitHandle.WaitAny().
/// </param>
/// <param name="cancelEvent">Wait handle that cancels the wait when signaled.</param>
/// <returns>
/// The work item, or null if the wait ended by timeout or cancel before
/// a work item was handed to the caller.
/// </returns>
/// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
public WorkItem DequeueWorkItem(
    int millisecondsTimeout,
    WaitHandle cancelEvent)
{
    WaiterEntry waiterEntry;

    lock (this)
    {
        ValidateNotDisposed();

        // A work item is already queued: return immediately with it.
        if (_workItems.Count > 0)
        {
            return _workItems.Dequeue() as WorkItem;
        }

        // Nothing queued: register the calling thread's waiter entry so that
        // EnqueueWorkItem() can hand the next work item straight to it.
        waiterEntry = GetThreadWaiterEntry();
        PushWaiter(waiterEntry);
    }

    // Index 0 is the waiter's own event, index 1 is the cancel event.
    WaitHandle[] waitHandles = new WaitHandle[] { waiterEntry.WaitHandle, cancelEvent };

    // The wait runs outside the lock. Per the original author's note,
    // relying on the third argument (true) to leave the synchronization
    // domain during the wait did not work, hence two separate lock(this)
    // statements instead of one.
    int index = STPEventWaitHandle.WaitAny(
        waitHandles,
        millisecondsTimeout,
        true);

    lock (this)
    {
        bool gotWorkItem = (index == 0);

        // Anything other than the waiter's own event is treated as a timeout
        // (cancel included).
        if (!gotWorkItem)
        {
            if (waiterEntry.Timeout())
            {
                // Really timed out: unlink the waiter from the stack.
                RemoveWaiter(waiterEntry, false);
            }
            else
            {
                // Timeout() fails when the waiter has already been signaled,
                // so a work item was delivered after all.
                gotWorkItem = true;
            }
        }

        if (!gotWorkItem)
        {
            return null;
        }

        WorkItem workItem = waiterEntry.WorkItem;
        if (workItem == null)
        {
            // Signaled without a directly attached work item: take it from the queue.
            workItem = _workItems.Dequeue() as WorkItem;
        }
        return workItem;
    }
}
/// <summary>
/// Deactivates the queue: no more work items are accepted, the queued
/// ones are discarded and every registered waiter is marked as timed out.
/// Only the first call has an effect.
/// </summary>
private void Cleanup()
{
    lock (this)
    {
        if (_isWorkItemsQueueActive)
        {
            // From now on EnqueueWorkItem() returns false.
            _isWorkItemsQueueActive = false;

            // NOTE(review): an earlier comment here claimed the states of
            // queued work items are deliberately NOT disposed because the
            // application may still use them, yet the code does call
            // DisposeOfState() on each one - confirm which is intended.
            foreach (WorkItem queuedItem in _workItems)
            {
                queuedItem.DisposeOfState();
            }

            // Drop the work items that were still queued.
            _workItems.Clear();

            // Mark every waiter as timed out. This does not wake the
            // waiting threads; it only makes a later Signal() on them fail.
            while (_waitersCount > 0)
            {
                PopWaiter().Timeout();
            }
        }
    }
}
/// <summary>
/// Collects the state object of every work item currently in the queue.
/// </summary>
/// <returns>
/// A new array with one entry per queued work item, in the queue's
/// enumeration order.
/// </returns>
public object[] GetStates()
{
    lock (this)
    {
        object[] states = new object[_workItems.Count];
        int index = 0;

        foreach (WorkItem queuedItem in _workItems)
        {
            states[index++] = queuedItem.GetWorkItemResult().State;
        }

        return states;
    }
}
- #endregion
- #region Private methods
/// <summary>
/// Returns the calling thread's WaiterEntry, reset and ready for a new wait.
/// </summary>
/// <returns>The waiter entry that belongs to the calling thread.</returns>
/// <remarks>
/// Each thread keeps one WaiterEntry and reuses it, so entries are not
/// created and destroyed on every wait.
/// </remarks>
private static WaiterEntry GetThreadWaiterEntry()
{
    WaiterEntry entry = CurrentWaiterEntry;

    // First wait on this thread: create the entry and remember it.
    if (entry == null)
    {
        entry = new WaiterEntry();
        CurrentWaiterEntry = entry;
    }

    entry.Reset();
    return entry;
}
- #region Waiters stack methods
/// <summary>
/// Puts a waiter on top of the waiters stack.
/// </summary>
/// <param name="newWaiterEntry">The waiter to put on the stack.</param>
/// <remarks>
/// The method takes no lock itself; DequeueWorkItem() calls it while
/// holding lock(this).
/// </remarks>
public void PushWaiter(WaiterEntry newWaiterEntry)
{
    // If the waiter is still linked from an earlier wait, unlink it first;
    // RemoveWaiter() adjusts the waiters count in that case and always
    // leaves both links of the entry null.
    RemoveWaiter(newWaiterEntry, false);

    WaiterEntry previousTop = _headWaiterEntry._nextWaiterEntry;

    // Link the new waiter directly behind the sentinel head.
    newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
    _headWaiterEntry._nextWaiterEntry = newWaiterEntry;

    // When the stack was not empty, chain the former top behind the new waiter.
    if (previousTop != null)
    {
        newWaiterEntry._nextWaiterEntry = previousTop;
        previousTop._prevWaiterEntry = newWaiterEntry;
    }

    _waitersCount++;
}
/// <summary>
/// Removes the waiter on top of the waiters stack and returns it.
/// </summary>
/// <returns>The waiter that was on top of the stack.</returns>
/// <remarks>
/// The stack must not be empty; the callers in this file check
/// _waitersCount &gt; 0 before calling.
/// </remarks>
private WaiterEntry PopWaiter()
{
    WaiterEntry top = _headWaiterEntry._nextWaiterEntry;
    WaiterEntry successor = top._nextWaiterEntry;

    // Unlink the top entry; 'true' makes the waiters count always go down.
    RemoveWaiter(top, true);

    // Make the successor (possibly null) the new top of the stack.
    _headWaiterEntry._nextWaiterEntry = successor;
    if (successor != null)
    {
        successor._prevWaiterEntry = _headWaiterEntry;
    }

    return top;
}
/// <summary>
/// Unlinks a waiter from the waiters stack, if it is linked, and updates
/// the waiters count.
/// </summary>
/// <param name="waiterEntry">The waiter entry to remove.</param>
/// <param name="popDecrement">
/// If true the waiters count is always decremented (used by PopWaiter()).
/// If false it is decremented only when the entry was actually linked.
/// </param>
private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
{
    // Remember the neighbours before the entry's links are cleared.
    WaiterEntry before = waiterEntry._prevWaiterEntry;
    WaiterEntry after = waiterEntry._nextWaiterEntry;

    waiterEntry._prevWaiterEntry = null;
    waiterEntry._nextWaiterEntry = null;

    // Close the gap by connecting the neighbours to each other.
    if (before != null)
    {
        before._nextWaiterEntry = after;
    }
    if (after != null)
    {
        after._prevWaiterEntry = before;
    }

    // Having any neighbour means the entry was part of the list.
    bool wasLinked = (before != null) || (after != null);
    if (popDecrement || wasLinked)
    {
        --_waitersCount;
    }
}
- #endregion
- #endregion
- #region WaiterEntry class
/// <summary>
/// A node in the waiters stack. It carries the event a waiting thread
/// blocks on, the signaled / timed-out state of the wait, and the work
/// item handed to the waiter.
/// </summary>
public sealed class WaiterEntry : IDisposable
{
    #region Member variables

    /// <summary>
    /// Event that is set when the waiter is given a work item.
    /// </summary>
    private AutoResetEvent _waitHandle = EventWaitHandleFactory.CreateAutoResetEvent();

    /// <summary>
    /// True once the waiter gave up waiting (timeout or cancel).
    /// </summary>
    private bool _isTimedout;

    /// <summary>
    /// True once the waiter was signaled with a work item.
    /// </summary>
    private bool _isSignaled;

    /// <summary>
    /// The work item passed directly to the waiter, without going
    /// through the queue.
    /// </summary>
    private WorkItem _workItem;

    private bool _isDisposed;

    // Doubly linked list members, maintained by the owning WorkItemsQueue.
    internal WaiterEntry _nextWaiterEntry;
    internal WaiterEntry _prevWaiterEntry;

    #endregion

    #region Construction

    public WaiterEntry()
    {
        Reset();
    }

    #endregion

    #region Public methods

    /// <summary>
    /// The handle a waiting thread blocks on.
    /// </summary>
    public WaitHandle WaitHandle
    {
        get { return _waitHandle; }
    }

    /// <summary>
    /// The work item stored by the last successful Signal(), or null.
    /// </summary>
    public WorkItem WorkItem
    {
        get { return _workItem; }
    }

    /// <summary>
    /// Gives the waiter a work item and sets its event.
    /// </summary>
    /// <param name="workItem">The work item to hand over.</param>
    /// <returns>
    /// True on success; false if Timeout() already succeeded on this entry.
    /// </returns>
    public bool Signal(WorkItem workItem)
    {
        lock (this)
        {
            if (_isTimedout)
            {
                return false;
            }

            _workItem = workItem;
            _isSignaled = true;
            _waitHandle.Set();
            return true;
        }
    }

    /// <summary>
    /// Marks the entry as timed out.
    /// </summary>
    /// <returns>
    /// True on success; false if Signal() already succeeded on this entry.
    /// </returns>
    public bool Timeout()
    {
        lock (this)
        {
            // A waiter that was already signaled owns a work item and
            // cannot time out any more.
            if (_isSignaled)
            {
                return false;
            }

            // The entry is not unlinked here. From now on Signal() fails
            // on it, so a caller that pops it just moves on to the next
            // waiter.
            _isTimedout = true;
            return true;
        }
    }

    /// <summary>
    /// Clears the work item and both state flags and resets the event,
    /// so the entry can be used for another wait.
    /// </summary>
    public void Reset()
    {
        _workItem = null;
        _isTimedout = false;
        _isSignaled = false;
        _waitHandle.Reset();
    }

    /// <summary>
    /// Closes the wait handle. Calling it again has no effect.
    /// </summary>
    public void Close()
    {
        if (_waitHandle == null)
        {
            return;
        }

        _waitHandle.Close();
        _waitHandle = null;
    }

    #endregion

    #region IDisposable Members

    /// <summary>
    /// Closes the entry once; later calls do nothing.
    /// </summary>
    public void Dispose()
    {
        lock (this)
        {
            if (!_isDisposed)
            {
                Close();
                _isDisposed = true;
            }
        }
    }

    #endregion
}
- #endregion
- #region IDisposable Members
/// <summary>
/// Deactivates the queue via Cleanup() and closes the head waiter entry.
/// Calls made after the first completed one do nothing.
/// </summary>
public void Dispose()
{
    if (_isDisposed)
    {
        return;
    }

    Cleanup();
    _headWaiterEntry.Close();
    _isDisposed = true;
}
/// <summary>
/// Throws if this queue has already been disposed.
/// </summary>
/// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
private void ValidateNotDisposed()
{
    if (!_isDisposed)
    {
        return;
    }

    throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
}
- #endregion
- }
- #endregion
- }
|