1
0

WorkManager.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. /*
  2. * Copyright (c) Contributors, http://opensimulator.org/
  3. * See CONTRIBUTORS.TXT for a full list of copyright holders.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the OpenSimulator Project nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
  17. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. using System;
  28. using System.Reflection;
  29. using System.Threading;
  30. using log4net;
  31. namespace OpenSim.Framework.Monitoring
  32. {
  33. /// <summary>
  34. /// Manages various work items in the simulator.
  35. /// </summary>
  36. /// <remarks>
  37. /// Currently, here work can be started
  38. /// * As a long-running and monitored thread.
  39. /// * In a thread that will never timeout but where the job is expected to eventually complete.
  40. /// * In a threadpool thread that will timeout if it takes a very long time to complete (> 10 mins).
  41. /// * As a job which will be run in a single-threaded job engine. Such jobs must not incorporate delays (sleeps,
  42. /// network waits, etc.).
  43. ///
  44. /// This is an evolving approach to better manage the work that OpenSimulator is asked to do from a very diverse
  45. /// range of sources (client actions, incoming network, outgoing network calls, etc.).
  46. ///
  47. /// Util.FireAndForget is still available to insert jobs in the threadpool, though this is equivalent to
  48. /// WorkManager.RunInThreadPool().
  49. /// </remarks>
  50. public static class WorkManager
  51. {
  52. private static readonly ILog m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
  53. public static JobEngine JobEngine { get; private set; }
  54. static WorkManager()
  55. {
  56. JobEngine = new JobEngine("Non-blocking non-critical job engine", "JOB ENGINE", 30000);
  57. StatsManager.RegisterStat(
  58. new Stat(
  59. "JobsWaiting",
  60. "Number of jobs waiting for processing.",
  61. "",
  62. "",
  63. "server",
  64. "jobengine",
  65. StatType.Pull,
  66. MeasuresOfInterest.None,
  67. stat => stat.Value = JobEngine.JobsWaiting,
  68. StatVerbosity.Debug));
  69. MainConsole.Instance.Commands.AddCommand(
  70. "Debug",
  71. false,
  72. "debug jobengine",
  73. "debug jobengine <start|stop|status|log>",
  74. "Start, stop, get status or set logging level of the job engine.",
  75. "If stopped then all outstanding jobs are processed immediately.",
  76. HandleControlCommand);
  77. }
  78. public static void Stop()
  79. {
  80. JobEngine.Stop();
  81. Watchdog.Stop();
  82. }
  83. public static Thread StartThread(ThreadStart start, string name, bool alarmIfTimeout = false, bool log = true)
  84. {
  85. return StartThread(start, name, ThreadPriority.Normal, true, alarmIfTimeout, null, Watchdog.DEFAULT_WATCHDOG_TIMEOUT_MS, log);
  86. }
  87. /// <summary>
  88. /// Start a new long-lived thread.
  89. /// </summary>
  90. /// <param name="start">The method that will be executed in a new thread</param>
  91. /// <param name="name">A name to give to the new thread</param>
  92. /// <param name="priority">Priority to run the thread at</param>
  93. /// <param name="isBackground">True to run this thread as a background thread, otherwise false</param>
  94. /// <param name="alarmIfTimeout">Trigger an alarm function is we have timed out</param>
  95. /// <param name="log">If true then creation of thread is logged.</param>
  96. /// <returns>The newly created Thread object</returns>
  97. public static Thread StartThread(
  98. ThreadStart start, string name, ThreadPriority priority, bool alarmIfTimeout, bool log = true)
  99. {
  100. return StartThread(start, name, priority, true, alarmIfTimeout, null, Watchdog.DEFAULT_WATCHDOG_TIMEOUT_MS, log);
  101. }
  102. /// <summary>
  103. /// Start a new thread that is tracked by the watchdog
  104. /// </summary>
  105. /// <param name="start">The method that will be executed in a new thread</param>
  106. /// <param name="name">A name to give to the new thread</param>
  107. /// <param name="priority">Priority to run the thread at</param>
  108. /// <param name="isBackground">True to run this thread as a background
  109. /// thread, otherwise false</param>
  110. /// <param name="alarmIfTimeout">Trigger an alarm function is we have timed out</param>
  111. /// <param name="alarmMethod">
  112. /// Alarm method to call if alarmIfTimeout is true and there is a timeout.
  113. /// Normally, this will just return some useful debugging information.
  114. /// </param>
  115. /// <param name="timeout">Number of milliseconds to wait until we issue a warning about timeout.</param>
  116. /// <param name="log">If true then creation of thread is logged.</param>
  117. /// <returns>The newly created Thread object</returns>
  118. public static Thread StartThread(
  119. ThreadStart start, string name, ThreadPriority priority, bool isBackground,
  120. bool alarmIfTimeout, Func<string> alarmMethod, int timeout, bool log = true)
  121. {
  122. Thread thread = new Thread(start);
  123. thread.Priority = priority;
  124. thread.IsBackground = isBackground;
  125. thread.Name = name;
  126. Watchdog.ThreadWatchdogInfo twi
  127. = new Watchdog.ThreadWatchdogInfo(thread, timeout, name)
  128. { AlarmIfTimeout = alarmIfTimeout, AlarmMethod = alarmMethod };
  129. Watchdog.AddThread(twi, name, log:log);
  130. thread.Start();
  131. return thread;
  132. }
  133. /// <summary>
  134. /// Run the callback in a new thread immediately. If the thread exits with an exception log it but do
  135. /// not propogate it.
  136. /// </summary>
  137. /// <param name="callback">Code for the thread to execute.</param>
  138. /// <param name="obj">Object to pass to the thread.</param>
  139. /// <param name="name">Name of the thread</param>
  140. public static void RunInThread(WaitCallback callback, object obj, string name, bool log = false)
  141. {
  142. if (Util.FireAndForgetMethod == FireAndForgetMethod.RegressionTest)
  143. {
  144. Culture.SetCurrentCulture();
  145. callback(obj);
  146. return;
  147. }
  148. ThreadStart ts = new ThreadStart(delegate()
  149. {
  150. try
  151. {
  152. Culture.SetCurrentCulture();
  153. callback(obj);
  154. }
  155. catch (Exception e)
  156. {
  157. m_log.Error(string.Format("[WATCHDOG]: Exception in thread {0}.", name), e);
  158. }
  159. finally
  160. {
  161. try
  162. {
  163. Watchdog.RemoveThread(log: false);
  164. }
  165. catch { }
  166. }
  167. });
  168. StartThread(ts, name, false, log:log);
  169. }
  170. /// <summary>
  171. /// Run the callback via a threadpool thread.
  172. /// </summary>
  173. /// <remarks>
  174. /// Such jobs may run after some delay but must always complete.
  175. /// </remarks>
  176. /// <param name="callback"></param>
  177. /// <param name="obj"></param>
  178. /// <param name="name">The name of the job. This is used in monitoring and debugging.</param>
  179. public static void RunInThreadPool(System.Threading.WaitCallback callback, object obj, string name, bool timeout = true)
  180. {
  181. Util.FireAndForget(callback, obj, name, timeout);
  182. }
  183. private static void HandleControlCommand(string module, string[] args)
  184. {
  185. // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
  186. // return;
  187. if (args.Length < 3)
  188. {
  189. MainConsole.Instance.Output("Usage: debug jobengine <stop|start|status|log>");
  190. return;
  191. }
  192. string subCommand = args[2];
  193. if (subCommand == "stop")
  194. {
  195. JobEngine.Stop();
  196. MainConsole.Instance.Output("Stopped job engine.");
  197. }
  198. else if (subCommand == "start")
  199. {
  200. JobEngine.Start();
  201. MainConsole.Instance.Output("Started job engine.");
  202. }
  203. else if (subCommand == "status")
  204. {
  205. MainConsole.Instance.Output("Job engine running: {0}", JobEngine.IsRunning);
  206. JobEngine.Job job = JobEngine.CurrentJob;
  207. MainConsole.Instance.Output("Current job {0}", job != null ? job.Name : "none");
  208. MainConsole.Instance.Output(
  209. "Jobs waiting: {0}", JobEngine.IsRunning ? JobEngine.JobsWaiting.ToString() : "n/a");
  210. MainConsole.Instance.Output("Log Level: {0}", JobEngine.LogLevel);
  211. }
  212. else if (subCommand == "log")
  213. {
  214. if (args.Length < 4)
  215. {
  216. MainConsole.Instance.Output("Usage: debug jobengine log <level>");
  217. return;
  218. }
  219. // int logLevel;
  220. int logLevel = int.Parse(args[3]);
  221. // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
  222. // {
  223. JobEngine.LogLevel = logLevel;
  224. MainConsole.Instance.Output("Set debug log level to {0}", JobEngine.LogLevel);
  225. // }
  226. }
  227. else
  228. {
  229. MainConsole.Instance.Output("Unrecognized job engine subcommand {0}", subCommand);
  230. }
  231. }
  232. }
  233. }