WorkManager.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. /*
  2. * Copyright (c) Contributors, http://opensimulator.org/
  3. * See CONTRIBUTORS.TXT for a full list of copyright holders.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the OpenSimulator Project nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
  17. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. using System;
  28. using System.Reflection;
  29. using System.Threading;
  30. using log4net;
  31. namespace OpenSim.Framework.Monitoring
  32. {
  33. /// <summary>
  34. /// Manages various work items in the simulator.
  35. /// </summary>
  36. /// <remarks>
  37. /// Currently, here work can be started
  38. /// * As a long-running and monitored thread.
  39. /// * In a thread that will never timeout but where the job is expected to eventually complete.
  40. /// * In a threadpool thread that will timeout if it takes a very long time to complete (> 10 mins).
  41. /// * As a job which will be run in a single-threaded job engine. Such jobs must not incorporate delays (sleeps,
  42. /// network waits, etc.).
  43. ///
  44. /// This is an evolving approach to better manage the work that OpenSimulator is asked to do from a very diverse
  45. /// range of sources (client actions, incoming network, outgoing network calls, etc.).
  46. ///
  47. /// Util.FireAndForget is still available to insert jobs in the threadpool, though this is equivalent to
  48. /// WorkManager.RunInThreadPool().
  49. /// </remarks>
  50. public static class WorkManager
  51. {
  /// <summary>
  /// Logger for this class.
  /// </summary>
  private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

  /// <summary>
  /// The job engine that RunJob() queues work on while it is running. Assigned in the static constructor.
  /// </summary>
  public static JobEngine JobEngine { get; private set; }
  /// <summary>
  /// Creates the shared job engine, registers a "JobsWaiting" stat for it and adds the
  /// "debug jobengine" console command.
  /// </summary>
  static WorkManager()
  {
      JobEngine = new JobEngine("Non-blocking non-critical job engine", "JOB ENGINE");

      // Registered as StatType.Pull with a callback that copies JobEngine.JobsWaiting into the stat's value.
      Stat jobsWaitingStat
          = new Stat(
              "JobsWaiting",
              "Number of jobs waiting for processing.",
              "",
              "",
              "server",
              "jobengine",
              StatType.Pull,
              MeasuresOfInterest.None,
              stat => stat.Value = JobEngine.JobsWaiting,
              StatVerbosity.Debug);

      StatsManager.RegisterStat(jobsWaitingStat);

      // Console control of the job engine; the command is dispatched to HandleControlCommand.
      MainConsole.Instance.Commands.AddCommand(
          "Debug",
          false,
          "debug jobengine",
          "debug jobengine <start|stop|status|log>",
          "Start, stop, get status or set logging level of the job engine.",
          "If stopped then all outstanding jobs are processed immediately.",
          HandleControlCommand);
  }
  /// <summary>
  /// Start a new long-lived thread, using the default watchdog timeout and no alarm method.
  /// </summary>
  /// <param name="start">The method that will be executed in the new thread.</param>
  /// <param name="name">A name to give to the new thread.</param>
  /// <param name="priority">Priority to run the thread at.</param>
  /// <param name="isBackground">True to run this thread as a background thread, otherwise false.</param>
  /// <param name="alarmIfTimeout">Trigger an alarm function if we have timed out.</param>
  /// <param name="log">If true then creation of the thread is logged.</param>
  /// <returns>The newly created Thread object.</returns>
  public static Thread StartThread(
      ThreadStart start, string name, ThreadPriority priority, bool isBackground, bool alarmIfTimeout, bool log = true)
  {
      // Forward to the full overload, filling in the two values this overload does not expose.
      return StartThread(
          start,
          name,
          priority,
          isBackground,
          alarmIfTimeout,
          alarmMethod: null,
          timeout: Watchdog.DEFAULT_WATCHDOG_TIMEOUT_MS,
          log: log);
  }
  /// <summary>
  /// Start a new thread that is tracked by the watchdog.
  /// </summary>
  /// <param name="start">The method that will be executed in the new thread.</param>
  /// <param name="name">A name to give to the new thread.</param>
  /// <param name="priority">Priority to run the thread at.</param>
  /// <param name="isBackground">True to run this thread as a background thread, otherwise false.</param>
  /// <param name="alarmIfTimeout">Trigger an alarm function if we have timed out.</param>
  /// <param name="alarmMethod">
  /// Alarm method to call if alarmIfTimeout is true and there is a timeout.
  /// Normally, this will just return some useful debugging information.
  /// </param>
  /// <param name="timeout">Number of milliseconds to wait until we issue a warning about timeout.</param>
  /// <param name="log">If true then creation of the thread is logged.</param>
  /// <returns>The newly created Thread object, already started.</returns>
  public static Thread StartThread(
      ThreadStart start, string name, ThreadPriority priority, bool isBackground,
      bool alarmIfTimeout, Func<string> alarmMethod, int timeout, bool log = true)
  {
      Thread thread = new Thread(start);
      thread.Priority = priority;
      thread.IsBackground = isBackground;

      Watchdog.ThreadWatchdogInfo twi
          = new Watchdog.ThreadWatchdogInfo(thread, timeout, name)
              { AlarmIfTimeout = alarmIfTimeout, AlarmMethod = alarmMethod };

      Watchdog.AddThread(twi, name, log:log);

      // Name the thread before it starts running. Assigning the name after Start() races with the
      // thread body: the thread would run unnamed for a while, and the assignment can throw if the
      // body has already given the thread a name.
      thread.Name = name;
      thread.Start();

      return thread;
  }
  /// <summary>
  /// Run the callback in a new thread immediately. If the thread exits with an exception log it but do
  /// not propagate it.
  /// </summary>
  /// <remarks>
  /// When Util.FireAndForgetMethod is FireAndForgetMethod.RegressionTest no thread is created; the callback
  /// is invoked synchronously on the calling thread instead.
  /// </remarks>
  /// <param name="callback">Code for the thread to execute.</param>
  /// <param name="obj">Object to pass to the thread.</param>
  /// <param name="name">Name of the thread.</param>
  /// <param name="log">Passed to StartThread(); if true then creation of the thread is logged.</param>
  public static void RunInThread(WaitCallback callback, object obj, string name, bool log = false)
  {
      if (Util.FireAndForgetMethod == FireAndForgetMethod.RegressionTest)
      {
          Culture.SetCurrentCulture();
          callback(obj);
          return;
      }

      ThreadStart ts = new ThreadStart(delegate()
      {
          try
          {
              Culture.SetCurrentCulture();
              callback(obj);
          }
          catch (Exception e)
          {
              m_log.Error(string.Format("[WATCHDOG]: Exception in thread {0}.", name), e);
          }
          finally
          {
              // StartThread() registers this thread with the watchdog, so it must be removed whether the
              // callback returned normally or threw.
              try
              {
                  Watchdog.RemoveThread(log:false);
              }
              catch (Exception e)
              {
                  // Never let a failure here escape the thread; log it like any other thread exception.
                  m_log.Error(string.Format("[WATCHDOG]: Exception in thread {0}.", name), e);
              }
          }
      });

      StartThread(ts, name, ThreadPriority.Normal, true, false, log:log);
  }
  /// <summary>
  /// Queue the callback for execution through Util.FireAndForget().
  /// </summary>
  /// <remarks>
  /// Jobs submitted this way may start after some delay but must always run to completion.
  /// </remarks>
  /// <param name="callback">Code to execute.</param>
  /// <param name="obj">Object to pass to the callback.</param>
  /// <param name="name">The name of the job. This is used in monitoring and debugging.</param>
  public static void RunInThreadPool(WaitCallback callback, object obj, string name)
  {
      // Thin wrapper: all scheduling is done by Util.FireAndForget().
      Util.FireAndForget(callback, obj, name);
  }
  /// <summary>
  /// Run a job.
  /// </summary>
  /// <remarks>
  /// Unlike direct scheduling (e.g. Util.FireAndForget), a job submitted here is queued on the job engine
  /// whenever the engine is running, where all jobs are currently performed in sequence on a single thread.
  /// This is to prevent observed overload and server freeze problems when there are hundreds of connections
  /// which all attempt to perform work at once (e.g. in conference situations). With lower numbers of
  /// connections, the small delay in performing jobs in sequence rather than concurrently has not been
  /// noticeable in testing, though a future more sophisticated implementation could perform jobs concurrently
  /// when the server is under low load.
  ///
  /// However, be advised that some callers of this function rely on all jobs being performed in sequence if
  /// any jobs are performed in sequence (i.e. if jobengine is active or not). Therefore, expanding the
  /// jobengine beyond a single thread will require considerable thought.
  ///
  /// Also, any jobs submitted must be guaranteed to complete within a reasonable timeframe (e.g. they cannot
  /// incorporate a network delay with a long timeout). At the moment, work that could suffer such issues
  /// should still be run directly with RunInThread(), Util.FireAndForget(), etc. This is another area where
  /// the job engine could be improved and so CPU utilization improved by better management of concurrency
  /// within OpenSimulator.
  ///
  /// When the job engine is not running the job is run, in order of preference: in the calling thread
  /// (if canRunInThisThread), in a dedicated thread via RunInThread() (if mustNotTimeout), or via
  /// Util.FireAndForget().
  /// </remarks>
  /// <param name="jobType">
  /// General classification for the job (e.g. "RezAttachments"). Not currently used by this method.
  /// </param>
  /// <param name="callback">Callback for job.</param>
  /// <param name="obj">Object to pass to callback when run.</param>
  /// <param name="name">Specific name of job (e.g. "RezAttachments for Joe Bloggs").</param>
  /// <param name="canRunInThisThread">If set to true then the job may be run in the calling thread.</param>
  /// <param name="mustNotTimeout">If true then the job must never timeout.</param>
  /// <param name="log">If set to true then extra logging is performed.</param>
  public static void RunJob(
      string jobType, WaitCallback callback, object obj, string name,
      bool canRunInThisThread = false, bool mustNotTimeout = false,
      bool log = false)
  {
      // Regression tests run everything synchronously on the caller's thread.
      if (Util.FireAndForgetMethod == FireAndForgetMethod.RegressionTest)
      {
          Culture.SetCurrentCulture();
          callback(obj);
          return;
      }

      // Preferred route: hand the work to the job engine.
      if (JobEngine.IsRunning)
      {
          JobEngine.QueueJob(name, () => callback(obj));
          return;
      }

      // Job engine is stopped: fall back according to the caller's constraints.
      if (canRunInThisThread)
      {
          callback(obj);
          return;
      }

      if (mustNotTimeout)
      {
          RunInThread(callback, obj, name, log);
          return;
      }

      Util.FireAndForget(callback, obj, name);
  }
  /// <summary>
  /// Handles the "debug jobengine" console command.
  /// </summary>
  /// <param name="module">Not used by this handler.</param>
  /// <param name="args">
  /// The words of the command. args[2] is the subcommand (stop, start, status or log); for "log",
  /// args[3] is the new log level.
  /// </param>
  private static void HandleControlCommand(string module, string[] args)
  {
      if (args.Length < 3)
      {
          MainConsole.Instance.Output("Usage: debug jobengine <stop|start|status|log>");
          return;
      }

      string subCommand = args[2];

      switch (subCommand)
      {
          case "stop":
              JobEngine.Stop();
              MainConsole.Instance.OutputFormat("Stopped job engine.");
              break;

          case "start":
              JobEngine.Start();
              MainConsole.Instance.OutputFormat("Started job engine.");
              break;

          case "status":
              MainConsole.Instance.OutputFormat("Job engine running: {0}", JobEngine.IsRunning);

              JobEngine.Job job = JobEngine.CurrentJob;
              MainConsole.Instance.OutputFormat("Current job {0}", job != null ? job.Name : "none");

              MainConsole.Instance.OutputFormat(
                  "Jobs waiting: {0}", JobEngine.IsRunning ? JobEngine.JobsWaiting.ToString() : "n/a");
              MainConsole.Instance.OutputFormat("Log Level: {0}", JobEngine.LogLevel);
              break;

          case "log":
              if (args.Length < 4)
              {
                  MainConsole.Instance.Output("Usage: debug jobengine log <level>");
                  return;
              }

              // The level comes straight from the console, so reject bad input with a message
              // instead of letting a parse exception escape the command handler.
              int logLevel;
              if (!int.TryParse(args[3], out logLevel))
              {
                  MainConsole.Instance.OutputFormat("ERROR: {0} is not a valid integer", args[3]);
                  return;
              }

              JobEngine.LogLevel = logLevel;
              MainConsole.Instance.OutputFormat("Set debug log level to {0}", JobEngine.LogLevel);
              break;

          default:
              MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
              break;
      }
  }
  264. }
  265. }