JobEngine.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. /*
  2. * Copyright (c) Contributors, http://opensimulator.org/
  3. * See CONTRIBUTORS.TXT for a full list of copyright holders.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the OpenSimulator Project nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
  17. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. using System;
  28. using System.Collections.Concurrent;
  29. using System.Reflection;
  30. using System.Threading;
  31. using log4net;
  32. using OpenSim.Framework;
  33. namespace OpenSim.Framework.Monitoring
  34. {
  35. public class JobEngine
  36. {
  37. private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
  38. public int LogLevel { get; set; }
  39. public string Name { get; private set; }
  40. public string LoggingName { get; private set; }
  41. /// <summary>
  42. /// Is this engine running?
  43. /// </summary>
  44. public bool IsRunning { get; private set; }
  45. /// <summary>
  46. /// The current job that the engine is running.
  47. /// </summary>
  48. /// <remarks>
  49. /// Will be null if no job is currently running.
  50. /// </remarks>
  51. public Job CurrentJob { get; private set; }
  52. /// <summary>
  53. /// Number of jobs waiting to be processed.
  54. /// </summary>
  55. public int JobsWaiting { get { return m_jobQueue.Count; } }
  56. /// <summary>
  57. /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
  58. /// </summary>
  59. public int RequestProcessTimeoutOnStop { get; set; }
  60. /// <summary>
  61. /// Controls whether we need to warn in the log about exceeding the max queue size.
  62. /// </summary>
  63. /// <remarks>
  64. /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
  65. /// order to avoid spamming the log with lots of warnings.
  66. /// </remarks>
  67. private bool m_warnOverMaxQueue = true;
  68. private BlockingCollection<Job> m_jobQueue;
  69. private CancellationTokenSource m_cancelSource;
  70. /// <summary>
  71. /// Used to signal that we are ready to complete stop.
  72. /// </summary>
  73. private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
  74. public JobEngine(string name, string loggingName)
  75. {
  76. Name = name;
  77. LoggingName = loggingName;
  78. RequestProcessTimeoutOnStop = 5000;
  79. }
  80. public void Start()
  81. {
  82. lock (this)
  83. {
  84. if (IsRunning)
  85. return;
  86. IsRunning = true;
  87. m_finishedProcessingAfterStop.Reset();
  88. m_jobQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
  89. m_cancelSource = new CancellationTokenSource();
  90. WorkManager.StartThread(
  91. ProcessRequests,
  92. Name,
  93. ThreadPriority.Normal,
  94. false,
  95. true,
  96. null,
  97. int.MaxValue);
  98. }
  99. }
  100. public void Stop()
  101. {
  102. lock (this)
  103. {
  104. try
  105. {
  106. if (!IsRunning)
  107. return;
  108. IsRunning = false;
  109. int requestsLeft = m_jobQueue.Count;
  110. if (requestsLeft <= 0)
  111. {
  112. m_cancelSource.Cancel();
  113. }
  114. else
  115. {
  116. m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft);
  117. while (requestsLeft > 0)
  118. {
  119. if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
  120. {
  121. // After timeout no events have been written
  122. if (requestsLeft == m_jobQueue.Count)
  123. {
  124. m_log.WarnFormat(
  125. "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests",
  126. LoggingName, RequestProcessTimeoutOnStop, requestsLeft);
  127. break;
  128. }
  129. }
  130. requestsLeft = m_jobQueue.Count;
  131. }
  132. }
  133. }
  134. finally
  135. {
  136. m_cancelSource.Dispose();
  137. }
  138. }
  139. }
  140. /// <summary>
  141. /// Make a job.
  142. /// </summary>
  143. /// <remarks>
  144. /// We provide this method to replace the constructor so that we can later pool job objects if necessary to
  145. /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway.
  146. /// </remarks>
  147. /// <returns></returns>
  148. /// <param name="name">Name.</param>
  149. /// <param name="action">Action.</param>
  150. /// <param name="commonId">Common identifier.</param>
  151. public static Job MakeJob(string name, Action action, string commonId = null)
  152. {
  153. return Job.MakeJob(name, action, commonId);
  154. }
  155. /// <summary>
  156. /// Remove the next job queued for processing.
  157. /// </summary>
  158. /// <remarks>
  159. /// Returns null if there is no next job.
  160. /// Will not remove a job currently being performed.
  161. /// </remarks>
  162. public Job RemoveNextJob()
  163. {
  164. Job nextJob;
  165. m_jobQueue.TryTake(out nextJob);
  166. return nextJob;
  167. }
  168. /// <summary>
  169. /// Queue the job for processing.
  170. /// </summary>
  171. /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns>
  172. /// <param name="name">Name of job. This appears on the console and in logging.</param>
  173. /// <param name="action">Action to perform.</param>
  174. /// <param name="commonId">
  175. /// Common identifier for a set of jobs. This is allows a set of jobs to be removed
  176. /// if required (e.g. all jobs for a given agent. Optional.
  177. /// </param>
  178. public bool QueueJob(string name, Action action, string commonId = null)
  179. {
  180. return QueueJob(MakeJob(name, action, commonId));
  181. }
  182. /// <summary>
  183. /// Queue the job for processing.
  184. /// </summary>
  185. /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns>
  186. /// <param name="job">The job</param>
  187. /// </param>
  188. public bool QueueJob(Job job)
  189. {
  190. if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
  191. {
  192. m_jobQueue.Add(job);
  193. if (!m_warnOverMaxQueue)
  194. m_warnOverMaxQueue = true;
  195. return true;
  196. }
  197. else
  198. {
  199. if (m_warnOverMaxQueue)
  200. {
  201. m_log.WarnFormat(
  202. "[{0}]: Job queue at maximum capacity, not recording job from {1} in {2}",
  203. LoggingName, job.Name, Name);
  204. m_warnOverMaxQueue = false;
  205. }
  206. return false;
  207. }
  208. }
  209. private void ProcessRequests()
  210. {
  211. try
  212. {
  213. while (IsRunning || m_jobQueue.Count > 0)
  214. {
  215. try
  216. {
  217. CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
  218. }
  219. catch (ObjectDisposedException e)
  220. {
  221. // If we see this whilst not running then it may be due to a race where this thread checks
  222. // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
  223. if (IsRunning)
  224. throw e;
  225. else
  226. break;
  227. }
  228. if (LogLevel >= 1)
  229. m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name);
  230. try
  231. {
  232. CurrentJob.Action();
  233. }
  234. catch (Exception e)
  235. {
  236. m_log.Error(
  237. string.Format(
  238. "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e);
  239. }
  240. if (LogLevel >= 1)
  241. m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name);
  242. CurrentJob = null;
  243. }
  244. }
  245. catch (OperationCanceledException)
  246. {
  247. }
  248. m_finishedProcessingAfterStop.Set();
  249. }
  250. public class Job
  251. {
  252. /// <summary>
  253. /// Name of the job.
  254. /// </summary>
  255. /// <remarks>
  256. /// This appears on console and debug output.
  257. /// </remarks>
  258. public string Name { get; private set; }
  259. /// <summary>
  260. /// Common ID for this job.
  261. /// </summary>
  262. /// <remarks>
  263. /// This allows all jobs with a certain common ID (e.g. a client UUID) to be removed en-masse if required.
  264. /// Can be null if this is not required.
  265. /// </remarks>
  266. public string CommonId { get; private set; }
  267. /// <summary>
  268. /// Action to perform when this job is processed.
  269. /// </summary>
  270. public Action Action { get; private set; }
  271. private Job(string name, string commonId, Action action)
  272. {
  273. Name = name;
  274. CommonId = commonId;
  275. Action = action;
  276. }
  277. /// <summary>
  278. /// Make a job. It needs to be separately queued.
  279. /// </summary>
  280. /// <remarks>
  281. /// We provide this method to replace the constructor so that we can pool job objects if necessary to
  282. /// to reduce memory churn. Normally one would directly call JobEngine.QueueJob() with parameters anyway.
  283. /// </remarks>
  284. /// <returns></returns>
  285. /// <param name="name">Name.</param>
  286. /// <param name="action">Action.</param>
  287. /// <param name="commonId">Common identifier.</param>
  288. public static Job MakeJob(string name, Action action, string commonId = null)
  289. {
  290. return new Job(name, commonId, action);
  291. }
  292. }
  293. }
  294. }