WorkManager.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. /*
  2. * Copyright (c) Contributors, http://opensimulator.org/
  3. * See CONTRIBUTORS.TXT for a full list of copyright holders.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the OpenSimulator Project nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
  17. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. using System;
  28. using System.Runtime.CompilerServices;
  29. using System.Threading;
  30. using log4net;
  31. namespace OpenSim.Framework.Monitoring
  32. {
  33. /// <summary>
  34. /// Manages various work items in the simulator.
  35. /// </summary>
  36. /// <remarks>
  37. /// Currently, here work can be started
  38. /// * As a long-running and monitored thread.
  39. /// * In a thread that will never timeout but where the job is expected to eventually complete.
  40. /// * In a threadpool thread that will timeout if it takes a very long time to complete (> 10 mins).
  41. /// * As a job which will be run in a single-threaded job engine. Such jobs must not incorporate delays (sleeps,
  42. /// network waits, etc.).
  43. ///
  44. /// This is an evolving approach to better manage the work that OpenSimulator is asked to do from a very diverse
  45. /// range of sources (client actions, incoming network, outgoing network calls, etc.).
  46. ///
  47. /// </remarks>
  48. public static class WorkManager
  49. {
  50. private static readonly ILog m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
  51. public static JobEngine JobEngine { get; private set; }
  52. static WorkManager()
  53. {
  54. JobEngine = new JobEngine("Non-blocking non-critical job engine", "JOB ENGINE", 30000);
  55. StatsManager.RegisterStat(
  56. new Stat(
  57. "JobsWaiting",
  58. "Number of jobs waiting for processing.",
  59. "",
  60. "",
  61. "server",
  62. "jobengine",
  63. StatType.Pull,
  64. MeasuresOfInterest.None,
  65. stat => stat.Value = JobEngine == null ? 0 : JobEngine.JobsWaiting,
  66. StatVerbosity.Debug));
  67. MainConsole.Instance.Commands.AddCommand(
  68. "Debug",
  69. false,
  70. "debug jobengine",
  71. "debug jobengine <start|stop|status|log>",
  72. "Start, stop, get status or set logging level of the job engine.",
  73. "If stopped then all outstanding jobs are processed immediately.",
  74. HandleControlCommand);
  75. }
  76. public static void Stop()
  77. {
  78. JobEngine.Stop();
  79. Watchdog.Stop();
  80. }
  81. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  82. public static Thread StartThread(ThreadStart start, string name, bool alarmIfTimeout = false, bool log = true)
  83. {
  84. return StartThread(start, name, ThreadPriority.Normal, true, alarmIfTimeout, null, Watchdog.DEFAULT_WATCHDOG_TIMEOUT_MS, log);
  85. }
  86. /// <summary>
  87. /// Start a new long-lived thread.
  88. /// </summary>
  89. /// <param name="start">The method that will be executed in a new thread</param>
  90. /// <param name="name">A name to give to the new thread</param>
  91. /// <param name="priority">Priority to run the thread at</param>
  92. /// <param name="isBackground">True to run this thread as a background thread, otherwise false</param>
  93. /// <param name="alarmIfTimeout">Trigger an alarm function is we have timed out</param>
  94. /// <param name="log">If true then creation of thread is logged.</param>
  95. /// <returns>The newly created Thread object</returns>
  96. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  97. public static Thread StartThread(
  98. ThreadStart start, string name, ThreadPriority priority, bool alarmIfTimeout, bool log = true)
  99. {
  100. return StartThread(start, name, priority, true, alarmIfTimeout, null, Watchdog.DEFAULT_WATCHDOG_TIMEOUT_MS, log);
  101. }
  102. /// <summary>
  103. /// Start a new thread that is tracked by the watchdog
  104. /// </summary>
  105. /// <param name="start">The method that will be executed in a new thread</param>
  106. /// <param name="name">A name to give to the new thread</param>
  107. /// <param name="priority">Priority to run the thread at</param>
  108. /// <param name="isBackground">True to run this thread as a background
  109. /// thread, otherwise false</param>
  110. /// <param name="alarmIfTimeout">Trigger an alarm function is we have timed out</param>
  111. /// <param name="alarmMethod">
  112. /// Alarm method to call if alarmIfTimeout is true and there is a timeout.
  113. /// Normally, this will just return some useful debugging information.
  114. /// </param>
  115. /// <param name="timeout">Number of milliseconds to wait until we issue a warning about timeout.</param>
  116. /// <param name="log">If true then creation of thread is logged.</param>
  117. /// <returns>The newly created Thread object</returns>
  118. public static Thread StartThread(
  119. ThreadStart start, string name, ThreadPriority priority, bool isBackground,
  120. bool alarmIfTimeout, Func<string> alarmMethod, int timeout, bool log = true, bool SuspendFlow = true)
  121. {
  122. Thread thread;
  123. if(SuspendFlow)
  124. {
  125. using (ExecutionContext.SuppressFlow())
  126. {
  127. thread = new Thread(start);
  128. }
  129. }
  130. else
  131. {
  132. thread = new Thread(start);
  133. }
  134. thread.Priority = priority;
  135. thread.IsBackground = isBackground;
  136. thread.Name = name;
  137. Watchdog.ThreadWatchdogInfo twi = new Watchdog.ThreadWatchdogInfo(thread, timeout, name)
  138. {
  139. AlarmIfTimeout = alarmIfTimeout,
  140. AlarmMethod = alarmMethod
  141. };
  142. Watchdog.AddThread(twi, name, log);
  143. thread.Start();
  144. return thread;
  145. }
  146. public static Thread StartThread(
  147. ThreadStart start, string name, ThreadPriority priority, int stackSize = -1, bool suspendflow = true)
  148. {
  149. Thread thread;
  150. if (suspendflow)
  151. {
  152. using (ExecutionContext.SuppressFlow())
  153. {
  154. if (stackSize > 0)
  155. thread = new Thread(start, stackSize);
  156. else
  157. thread = new Thread(start);
  158. }
  159. }
  160. else
  161. {
  162. if (stackSize > 0)
  163. thread = new Thread(start, stackSize);
  164. else
  165. thread = new Thread(start);
  166. }
  167. thread.Priority = priority;
  168. thread.IsBackground = true;
  169. thread.Name = name;
  170. Watchdog.ThreadWatchdogInfo twi = new Watchdog.ThreadWatchdogInfo(thread, Watchdog.DEFAULT_WATCHDOG_TIMEOUT_MS, name)
  171. {
  172. AlarmIfTimeout = false,
  173. AlarmMethod = null
  174. };
  175. Watchdog.AddThread(twi, name, false);
  176. thread.Start();
  177. return thread;
  178. }
  179. /// <summary>
  180. /// Run the callback in a new thread immediately. If the thread exits with an exception log it but do
  181. /// not propogate it.
  182. /// </summary>
  183. /// <param name="callback">Code for the thread to execute.</param>
  184. /// <param name="obj">Object to pass to the thread.</param>
  185. /// <param name="name">Name of the thread</param>
  186. public static void RunInThread(WaitCallback callback, object obj, string name, bool log = false)
  187. {
  188. if (Util.FireAndForgetMethod == FireAndForgetMethod.RegressionTest)
  189. {
  190. Culture.SetCurrentCulture();
  191. callback(obj);
  192. return;
  193. }
  194. ThreadStart ts = new ThreadStart(delegate()
  195. {
  196. try
  197. {
  198. Culture.SetCurrentCulture();
  199. callback(obj);
  200. }
  201. catch (Exception e)
  202. {
  203. m_log.Error(string.Format("[WATCHDOG]: Exception in thread {0}.", name), e);
  204. }
  205. finally
  206. {
  207. try
  208. {
  209. Watchdog.RemoveThread(log: false);
  210. }
  211. catch { }
  212. }
  213. });
  214. StartThread(ts, name, false, log);
  215. }
  216. /// <summary>
  217. /// Run the callback via a threadpool thread.
  218. /// </summary>
  219. /// <remarks>
  220. /// Such jobs may run after some delay but must always complete.
  221. /// </remarks>
  222. /// <param name="callback"></param>
  223. /// <param name="obj"></param>
  224. /// <param name="name">The name of the job. This is used in monitoring and debugging.</param>
  225. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  226. public static void RunInThreadPool(WaitCallback callback, object obj, string name, bool timeout = true)
  227. {
  228. Util.FireAndForget(callback, obj, name, timeout);
  229. }
  230. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  231. public static void RunInThreadPool(WaitCallback callback, string name, bool timeout = true)
  232. {
  233. Util.FireAndForget(callback, null, name, timeout);
  234. }
  235. private static void HandleControlCommand(string module, string[] args)
  236. {
  237. // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
  238. // return;
  239. if (args.Length < 3)
  240. {
  241. MainConsole.Instance.Output("Usage: debug jobengine <stop|start|status|log>");
  242. return;
  243. }
  244. string subCommand = args[2];
  245. if (subCommand == "stop")
  246. {
  247. JobEngine.Stop();
  248. MainConsole.Instance.Output("Stopped job engine.");
  249. }
  250. else if (subCommand == "start")
  251. {
  252. JobEngine.Start();
  253. MainConsole.Instance.Output("Started job engine.");
  254. }
  255. else if (subCommand == "status")
  256. {
  257. MainConsole.Instance.Output("Job engine running: {0}", JobEngine.IsRunning);
  258. MainConsole.Instance.Output(
  259. "Jobs waiting: {0}", JobEngine.IsRunning ? JobEngine.JobsWaiting.ToString() : "n/a");
  260. MainConsole.Instance.Output("Log Level: {0}", JobEngine.LogLevel);
  261. }
  262. else if (subCommand == "log")
  263. {
  264. if (args.Length < 4)
  265. {
  266. MainConsole.Instance.Output("Usage: debug jobengine log <level>");
  267. return;
  268. }
  269. // int logLevel;
  270. int logLevel = int.Parse(args[3]);
  271. // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
  272. // {
  273. JobEngine.LogLevel = logLevel;
  274. MainConsole.Instance.Output("Set debug log level to {0}", JobEngine.LogLevel);
  275. // }
  276. }
  277. else
  278. {
  279. MainConsole.Instance.Output("Unrecognized job engine subcommand {0}", subCommand);
  280. }
  281. }
  282. }
  283. }