WorkManager.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. /*
  2. * Copyright (c) Contributors, http://opensimulator.org/
  3. * See CONTRIBUTORS.TXT for a full list of copyright holders.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the OpenSimulator Project nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
  17. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. using System;
  28. using System.Reflection;
  29. using System.Threading;
  30. using log4net;
  31. namespace OpenSim.Framework.Monitoring
  32. {
  33. /// <summary>
  34. /// Manages various work items in the simulator.
  35. /// </summary>
  36. /// <remarks>
  37. /// Currently, work can be started in the following ways:
  38. /// * As a long-running and monitored thread.
  39. /// * In a thread that will never timeout but where the job is expected to eventually complete.
  40. /// * In a threadpool thread that will timeout if it takes a very long time to complete (> 10 mins).
  41. /// * As a job which will be run in a single-threaded job engine. Such jobs must not incorporate delays (sleeps,
  42. /// network waits, etc.).
  43. ///
  44. /// This is an evolving approach to better manage the work that OpenSimulator is asked to do from a very diverse
  45. /// range of sources (client actions, incoming network, outgoing network calls, etc.).
  46. ///
  47. /// Util.FireAndForget is still available to insert jobs in the threadpool, though this is equivalent to
  48. /// WorkManager.RunInThreadPool().
  49. /// </remarks>
  50. public static class WorkManager
  51. {
  /// <summary>
  /// Logger for this class.
  /// </summary>
  private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

  /// <summary>
  /// The job engine that RunJob() queues work on while it is running. Created by the static constructor.
  /// </summary>
  public static JobEngine JobEngine { get; private set; }

  /// <summary>
  /// Creates the job engine, registers a pull stat that reports the number of jobs waiting in it and
  /// adds the "debug jobengine" console command.
  /// </summary>
  static WorkManager()
  {
      JobEngine = new JobEngine("Non-blocking non-critical job engine", "JOB ENGINE", 30000);

      // The stat is evaluated on demand: each pull reads the engine's current JobsWaiting value.
      Stat jobsWaitingStat
          = new Stat(
              "JobsWaiting",
              "Number of jobs waiting for processing.",
              "",
              "",
              "server",
              "jobengine",
              StatType.Pull,
              MeasuresOfInterest.None,
              stat => stat.Value = JobEngine.JobsWaiting,
              StatVerbosity.Debug);

      StatsManager.RegisterStat(jobsWaitingStat);

      MainConsole.Instance.Commands.AddCommand(
          "Debug",
          false,
          "debug jobengine",
          "debug jobengine <start|stop|status|log>",
          "Start, stop, get status or set logging level of the job engine.",
          "If stopped then all outstanding jobs are processed immediately.",
          HandleControlCommand);
  }
  /// <summary>
  /// Stop the work manager: the job engine is stopped first, then the watchdog.
  /// </summary>
  public static void Stop()
  {
      JobEngine.Stop();
      Watchdog.Stop();
  }
  /// <summary>
  /// Start a new long-lived thread.
  /// </summary>
  /// <remarks>
  /// Convenience overload: delegates to the full StartThread overload with no alarm method and
  /// Watchdog.DEFAULT_WATCHDOG_TIMEOUT_MS as the timeout.
  /// </remarks>
  /// <param name="start">The method that will be executed in a new thread</param>
  /// <param name="name">A name to give to the new thread</param>
  /// <param name="priority">Priority to run the thread at</param>
  /// <param name="isBackground">True to run this thread as a background thread, otherwise false</param>
  /// <param name="alarmIfTimeout">Trigger an alarm function if we have timed out</param>
  /// <param name="log">If true then creation of thread is logged.</param>
  /// <returns>The newly created Thread object</returns>
  public static Thread StartThread(
      ThreadStart start, string name, ThreadPriority priority, bool isBackground, bool alarmIfTimeout, bool log = true)
  {
      return StartThread(
          start,
          name,
          priority,
          isBackground,
          alarmIfTimeout,
          alarmMethod: null,
          timeout: Watchdog.DEFAULT_WATCHDOG_TIMEOUT_MS,
          log: log);
  }
  /// <summary>
  /// Start a new thread that is tracked by the watchdog.
  /// </summary>
  /// <param name="start">The method that will be executed in a new thread</param>
  /// <param name="name">A name to give to the new thread. Also passed to the watchdog.</param>
  /// <param name="priority">Priority to run the thread at</param>
  /// <param name="isBackground">True to run this thread as a background thread, otherwise false</param>
  /// <param name="alarmIfTimeout">Trigger an alarm function if we have timed out</param>
  /// <param name="alarmMethod">
  /// Alarm method to call if alarmIfTimeout is true and there is a timeout.
  /// Normally, this will just return some useful debugging information.
  /// </param>
  /// <param name="timeout">Number of milliseconds to wait until we issue a warning about timeout.</param>
  /// <param name="log">If true then creation of thread is logged.</param>
  /// <returns>The newly created Thread object, already started</returns>
  public static Thread StartThread(
      ThreadStart start, string name, ThreadPriority priority, bool isBackground,
      bool alarmIfTimeout, Func<string> alarmMethod, int timeout, bool log = true)
  {
      Thread newThread = new Thread(start)
      {
          Priority = priority,
          IsBackground = isBackground,
          Name = name
      };

      Watchdog.ThreadWatchdogInfo watchdogInfo = new Watchdog.ThreadWatchdogInfo(newThread, timeout, name);
      watchdogInfo.AlarmIfTimeout = alarmIfTimeout;
      watchdogInfo.AlarmMethod = alarmMethod;

      // The thread is registered with the watchdog before it is started.
      Watchdog.AddThread(watchdogInfo, name, log: log);
      newThread.Start();

      return newThread;
  }
  /// <summary>
  /// Run the callback in a new thread immediately. If the callback throws an exception, log it but do
  /// not propagate it.
  /// </summary>
  /// <remarks>
  /// When Util.FireAndForgetMethod is FireAndForgetMethod.RegressionTest no thread is created: the
  /// callback is run synchronously on the calling thread and any exception it throws reaches the caller.
  /// </remarks>
  /// <param name="callback">Code for the thread to execute.</param>
  /// <param name="obj">Object to pass to the callback.</param>
  /// <param name="name">Name of the thread</param>
  /// <param name="log">If true then creation of the thread is logged.</param>
  public static void RunInThread(WaitCallback callback, object obj, string name, bool log = false)
  {
      if (Util.FireAndForgetMethod == FireAndForgetMethod.RegressionTest)
      {
          Culture.SetCurrentCulture();
          callback(obj);
          return;
      }

      ThreadStart ts = delegate()
      {
          try
          {
              Culture.SetCurrentCulture();
              callback(obj);
          }
          catch (Exception e)
          {
              m_log.Error(string.Format("[WATCHDOG]: Exception in thread {0}.", name), e);
          }
          finally
          {
              // Deregister from the watchdog whether or not the callback threw; previously a failing
              // callback left this (finished) thread registered with the watchdog.
              try
              {
                  Watchdog.RemoveThread(log:false);
              }
              catch (Exception e)
              {
                  // Keep the "log but do not propagate" contract for failures during removal too.
                  m_log.Error(
                      string.Format("[WATCHDOG]: Exception removing thread {0} from watchdog.", name), e);
              }
          }
      };

      StartThread(ts, name, ThreadPriority.Normal, true, false, log:log);
  }
  /// <summary>
  /// Run the callback via a threadpool thread.
  /// </summary>
  /// <remarks>
  /// Such jobs may run after some delay but must always complete. This simply forwards all of its
  /// arguments to Util.FireAndForget().
  /// </remarks>
  /// <param name="callback">Code to execute.</param>
  /// <param name="obj">Object to pass to the callback.</param>
  /// <param name="name">The name of the job. This is used in monitoring and debugging.</param>
  /// <param name="timeout">
  /// Passed unchanged to Util.FireAndForget(); presumably controls whether the job is subject to the
  /// threadpool timeout (to be confirmed against Util).
  /// </param>
  public static void RunInThreadPool(WaitCallback callback, object obj, string name, bool timeout = true)
  {
      Util.FireAndForget(callback, obj, name, timeout);
  }
  /// <summary>
  /// Run a job.
  /// </summary>
  /// <remarks>
  /// This differs from direct scheduling (e.g. Util.FireAndForget) in that a job can be run in the job
  /// engine if it is running, where all jobs are currently performed in sequence on a single thread. This is
  /// to prevent observed overload and server freeze problems when there are hundreds of connections which all
  /// attempt to perform work at once (e.g. in conference situations). With lower numbers of connections, the
  /// small delay in performing jobs in sequence rather than concurrently has not been noticeable in testing,
  /// though a future more sophisticated implementation could perform jobs concurrently when the server is
  /// under low load.
  ///
  /// However, be advised that some callers of this function rely on all jobs being performed in sequence if any
  /// jobs are performed in sequence (i.e. if jobengine is active or not). Therefore, expanding the jobengine
  /// beyond a single thread will require considerable thought.
  ///
  /// Also, any jobs submitted must be guaranteed to complete within a reasonable timeframe (e.g. they cannot
  /// incorporate a network delay with a long timeout). At the moment, work that could suffer such issues
  /// should still be run directly with RunInThread(), Util.FireAndForget(), etc. This is another area where
  /// the job engine could be improved and so CPU utilization improved by better management of concurrency
  /// within OpenSimulator.
  ///
  /// Dispatch order: in regression-test mode the callback runs synchronously in the calling thread;
  /// otherwise it is queued on the job engine if that is running; otherwise it runs in the calling thread
  /// if canRunInThisThread is true; otherwise it is handed to Util.FireAndForget().
  /// </remarks>
  /// <param name="jobType">
  /// General classification for the job (e.g. "RezAttachments"). Not referenced by the current implementation.
  /// </param>
  /// <param name="callback">Callback for job.</param>
  /// <param name="obj">Object to pass to callback when run</param>
  /// <param name="name">Specific name of job (e.g. "RezAttachments for Joe Bloggs")</param>
  /// <param name="canRunInThisThread">If set to true then the job may be run in the calling thread.</param>
  /// <param name="mustNotTimeout">If true then the job must never timeout.</param>
  /// <param name="log">
  /// If set to true then extra logging is performed. Not referenced by the current implementation.
  /// </param>
  public static void RunJob(
      string jobType, WaitCallback callback, object obj, string name,
      bool canRunInThisThread = false, bool mustNotTimeout = false,
      bool log = false)
  {
      // Regression tests run everything synchronously in the calling thread.
      if (Util.FireAndForgetMethod == FireAndForgetMethod.RegressionTest)
      {
          Culture.SetCurrentCulture();
          callback(obj);
          return;
      }

      // Preferred path: hand the work to the job engine.
      if (JobEngine.IsRunning)
      {
          JobEngine.QueueJob(name, () => callback(obj));
          return;
      }

      // Job engine stopped: run inline if the caller allows it.
      if (canRunInThisThread)
      {
          callback(obj);
          return;
      }

      // Last resort: fire and forget, with a timeout unless the caller forbids one.
      bool allowTimeout = !mustNotTimeout;
      Util.FireAndForget(callback, obj, name, allowTimeout);
  }
  /// <summary>
  /// Console handler for "debug jobengine &lt;stop|start|status|log&gt;".
  /// </summary>
  /// <remarks>
  /// The subcommand is read from args[2]; for "log" the level is read from args[3]. Invalid or missing
  /// arguments produce a console message rather than an exception.
  /// </remarks>
  /// <param name="module">Module name supplied by the console; not used.</param>
  /// <param name="args">The full command split into words, e.g. { "debug", "jobengine", "log", "2" }.</param>
  private static void HandleControlCommand(string module, string[] args)
  {
      if (args.Length < 3)
      {
          MainConsole.Instance.Output("Usage: debug jobengine <stop|start|status|log>");
          return;
      }

      string subCommand = args[2];

      switch (subCommand)
      {
          case "stop":
              JobEngine.Stop();
              MainConsole.Instance.OutputFormat("Stopped job engine.");
              break;

          case "start":
              JobEngine.Start();
              MainConsole.Instance.OutputFormat("Started job engine.");
              break;

          case "status":
              MainConsole.Instance.OutputFormat("Job engine running: {0}", JobEngine.IsRunning);

              JobEngine.Job job = JobEngine.CurrentJob;
              MainConsole.Instance.OutputFormat("Current job {0}", job != null ? job.Name : "none");

              MainConsole.Instance.OutputFormat(
                  "Jobs waiting: {0}", JobEngine.IsRunning ? JobEngine.JobsWaiting.ToString() : "n/a");
              MainConsole.Instance.OutputFormat("Log Level: {0}", JobEngine.LogLevel);
              break;

          case "log":
              if (args.Length < 4)
              {
                  MainConsole.Instance.Output("Usage: debug jobengine log <level>");
                  return;
              }

              // The level is typed by an operator, so parse defensively instead of letting
              // int.Parse throw FormatException/OverflowException out of the console handler.
              int logLevel;
              if (!int.TryParse(args[3], out logLevel))
              {
                  MainConsole.Instance.OutputFormat("Invalid log level {0}: an integer is required.", args[3]);
                  MainConsole.Instance.Output("Usage: debug jobengine log <level>");
                  return;
              }

              JobEngine.LogLevel = logLevel;
              MainConsole.Instance.OutputFormat("Set debug log level to {0}", JobEngine.LogLevel);
              break;

          default:
              MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
              break;
      }
  }
  267. }
  268. }