/* * Copyright (c) Contributors, http://opensimulator.org/ * See CONTRIBUTORS.TXT for a full list of copyright holders. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of the OpenSimulator Project nor the * names of its contributors may be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ // A pool of jobs (work items) that share the same method (callback) but take different arguments (passed as object), run on the main thread pool. // It can use up to m_concurrency execution threads. // It holds each thread for up to m_threadsHoldtime ms waiting for more work before releasing it back to the pool.
using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading;
using log4net;

namespace OpenSim.Framework
{
    /// <summary>
    /// Queue of jobs that all share one callback but carry a different argument object.
    /// Worker routines are started on demand through Util.FireAndForget, at most
    /// <c>concurrency</c> at a time, and each one exits after waiting
    /// <c>threadsHoldtime</c> milliseconds without receiving a job.
    /// </summary>
    public class ObjectJobEngine : IDisposable
    {
        private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        // Guards m_numberThreads, the running-state transition and the token source swap.
        private readonly object m_mainLock = new object();
        private readonly string m_name;
        private readonly int m_threadsHoldtime;
        private readonly int m_concurrency = 1;

        private BlockingCollection<object> m_jobQueue;
        private CancellationTokenSource m_cancelSource;
        private WaitCallback m_callback;
        private int m_numberThreads = 0;

        // Read by workers and callers without taking the lock, hence volatile.
        private volatile bool m_isRunning;

        /// <summary>
        /// Creates the engine. When <paramref name="callback"/> is null the engine stays
        /// stopped and <see cref="Enqueue"/> always returns false.
        /// </summary>
        /// <param name="callback">Method invoked once for every queued object.</param>
        /// <param name="name">Name used in log messages and passed to Util.FireAndForget.</param>
        /// <param name="threadsHoldtime">Milliseconds an idle worker waits for a job before exiting.</param>
        /// <param name="concurrency">Maximum number of simultaneous workers; values below 1 are treated as 1.</param>
        public ObjectJobEngine(WaitCallback callback, string name, int threadsHoldtime = 1000, int concurrency = 1)
        {
            m_name = name;
            m_threadsHoldtime = threadsHoldtime;
            m_concurrency = concurrency < 1 ? 1 : concurrency;

            if (callback != null)
            {
                m_callback = callback;
                m_jobQueue = new BlockingCollection<object>();
                m_cancelSource = new CancellationTokenSource();
                m_isRunning = true;
            }
        }

        ~ObjectJobEngine()
        {
            Dispose(false);
        }

        /// <summary>
        /// Stops the engine, wakes waiting workers and releases the queue and the cancellation source.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void Dispose(bool disposing)
        {
            if (!disposing)
            {
                // Finalizer path: do not lock or touch other managed objects, just mark as stopped.
                m_isRunning = false;
                return;
            }

            lock (m_mainLock)
            {
                if (!m_isRunning)
                    return;
                m_isRunning = false;
                m_cancelSource.Cancel();
            }

            // Give the workers a bounded chance to notice the cancellation and leave.
            int spinsLeft = 100;
            while (Volatile.Read(ref m_numberThreads) > 0 && --spinsLeft > 0)
                Thread.Yield();

            if (m_jobQueue != null)
            {
                m_jobQueue.Dispose();
                m_jobQueue = null;
            }
            if (m_cancelSource != null)
            {
                m_cancelSource.Dispose();
                m_cancelSource = null;
            }
            m_callback = null;
        }

        /// <summary>
        /// Number of jobs waiting to be processed.
        /// </summary>
        public int Count
        {
            get
            {
                // Snapshot the field: Dispose may null or dispose it concurrently.
                BlockingCollection<object> queue = m_jobQueue;
                if (queue == null)
                    return 0;
                try
                {
                    return queue.Count;
                }
                catch (ObjectDisposedException)
                {
                    return 0;
                }
            }
        }

        /// <summary>
        /// Discards every job still waiting in the queue and wakes the workers that are
        /// blocked waiting for one. The engine keeps accepting new jobs afterwards.
        /// Does nothing when the engine is stopped or the queue is empty.
        /// </summary>
        public void Cancel()
        {
            if (!m_isRunning || Count == 0)
                return;

            CancellationTokenSource oldSource;
            lock (m_mainLock)
            {
                // While m_isRunning is true under the lock the queue and source are alive.
                if (!m_isRunning)
                    return;

                while (m_jobQueue.TryTake(out object discarded)) { }

                // A cancelled source stays cancelled forever, so install a fresh one
                // for later waits and cancel only the one current workers hold.
                oldSource = m_cancelSource;
                m_cancelSource = new CancellationTokenSource();
            }

            try
            {
                oldSource.Cancel();
            }
            catch (Exception e)
            {
                m_log.WarnFormat("[ObjectJob {0}]: Exception while cancelling: {1}", m_name, e);
            }
            finally
            {
                oldSource.Dispose();
            }
        }

        /// <summary>
        /// Queue the job for processing.
        /// </summary>
        /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns>
        /// <param name="o">The job</param>
        public bool Enqueue(object o)
        {
            if (!m_isRunning)
                return false;

            lock (m_mainLock)
            {
                // Re-check under the lock so we never add to a queue that Dispose is tearing down.
                if (!m_isRunning)
                    return false;

                m_jobQueue.Add(o);

                // Start another worker only if allowed and there is more work than workers.
                if (m_numberThreads < m_concurrency && m_numberThreads < m_jobQueue.Count)
                {
                    Util.FireAndForget(ProcessRequests, null, m_name, false);
                    ++m_numberThreads;
                }
            }
            return true;
        }

        /// <summary>
        /// Worker loop: takes jobs and runs the callback on each until the engine stops,
        /// the wait is cancelled, or no job arrives within the hold time.
        /// </summary>
        /// <param name="o">Unused; present to match the delegate signature expected by Util.FireAndForget.</param>
        private void ProcessRequests(object o)
        {
            // Cleared only when TryTake fails in an unexpected way, so that a
            // persistent failure cannot turn into an endless respawn loop.
            bool mayRespawn = true;

            while (m_isRunning)
            {
                object job;
                try
                {
                    if (!m_jobQueue.TryTake(out job, m_threadsHoldtime, m_cancelSource.Token))
                        break; // idle for the whole hold time
                }
                catch (OperationCanceledException)
                {
                    break; // Cancel() or Dispose() woke us
                }
                catch (ObjectDisposedException)
                {
                    break; // queue or token source was disposed under us
                }
                catch
                {
                    mayRespawn = false;
                    break;
                }

                // Snapshot: Dispose clears the field concurrently.
                WaitCallback callback = m_callback;
                if (!m_isRunning || callback == null)
                    break;

                try
                {
                    callback.Invoke(job);
                }
                catch (Exception e)
                {
                    m_log.ErrorFormat(
                        "[ObjectJob {0}]: Job failed, continuing. Exception {1}",m_name, e);
                }
            }

            lock (m_mainLock)
            {
                --m_numberThreads;

                // A job may have been queued after this worker decided to leave but before
                // the count was decremented; Enqueue then saw the pool as full and started
                // nobody. Hand such work to a replacement so it is not stranded.
                if (mayRespawn && m_isRunning
                    && m_numberThreads < m_concurrency && m_numberThreads < m_jobQueue.Count)
                {
                    Util.FireAndForget(ProcessRequests, null, m_name, false);
                    ++m_numberThreads;
                }
            }
        }
    }
}