llworkqueue.h

/**
 * @file llworkqueue.h
 * @brief Queue used for inter-thread work passing.
 * @author Nat Goodspeed
 * @date 2021-09-30
 *
 * $LicenseInfo:firstyear=2021&license=viewergpl$
 *
 * Copyright (c) 2021, Linden Research, Inc. (c) 2022 Henri Beauchamp.
 *
 * Second Life Viewer Source Code
 * The source code in this file ("Source Code") is provided by Linden Lab
 * to you under the terms of the GNU General Public License, version 2.0
 * ("GPL"), unless you have obtained a separate licensing agreement
 * ("Other License"), formally executed by you and Linden Lab. Terms of
 * the GPL can be found in doc/GPL-license.txt in this distribution, or
 * online at http://secondlifegrid.net/programs/open_source/licensing/gplv2
 *
 * There are special exceptions to the terms and conditions of the GPL as
 * it is applied to this Source Code. View the full text of the exception
 * in the file doc/FLOSS-exception.txt in this software distribution, or
 * online at
 * http://secondlifegrid.net/programs/open_source/licensing/flossexception
 *
 * By copying, modifying or distributing this software, you acknowledge
 * that you have read and understood your obligations described above,
 * and agree to abide by those obligations.
 *
 * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
 * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
 * COMPLETENESS OR PERFORMANCE.
 * $/LicenseInfo$
 */

#ifndef LL_WORKQUEUE_H
#define LL_WORKQUEUE_H

#include <exception>            // For std::current_exception

#include "llcoros.h"
#include "llthreadsafequeue.h"

class LLWorkQueue
:   public LLInstanceTracker<LLWorkQueue, std::string,
                             // Allow replacing an old, deleted work queue
                             // with a new one (needed for restoreGL() and
                             // the GL image worker). HB
                             LLInstanceTrackerReplaceOnCollision>
{
protected:
    LOG_CLASS(LLWorkQueue);

public:
    using Work = std::function<void()>;

private:
    using super = LLInstanceTracker<LLWorkQueue, std::string,
                                    LLInstanceTrackerReplaceOnCollision>;
    // Changed from LLThreadSafeSchedule to LLThreadSafeQueue, to get rid of
    // the useless and slow std::chrono timestamps. HB
    using Queue = LLThreadSafeQueue<Work>;

public:
    using Closed = LLThreadSafeQueueInterrupt;
    // For runFor(), runUntil()
    using TimePoint = std::chrono::steady_clock::time_point;

    struct Error : public std::runtime_error
    {
        Error(const std::string& what)
        :   std::runtime_error(what)
        {
        }
    };

    // You may omit the LLWorkQueue name, in which case a unique name is
    // synthesized; for practical purposes that makes it anonymous.
    LLWorkQueue(const std::string& name = std::string(),
                // The default capacity is huge to avoid blocking the main
                // thread due to starvation.
                U32 capacity = 1024 * 1024)
    :   super(makeName(name)),
        mQueue(capacity)
    {
    }

    // Since the point of LLWorkQueue is to pass work to some other worker
    // thread(s) asynchronously, it is important that the LLWorkQueue
    // continues to exist until the worker thread(s) have drained it. To
    // communicate that it is time for them to quit, close() the queue.
    LL_INLINE void close()                  { mQueue.close(); }
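
    // A minimal lifecycle sketch (for illustration only: 'doSomeWork' is a
    // hypothetical function, and std::thread stands in for whatever thread
    // class actually hosts the worker): the worker drains the queue with
    // runUntilClose(), and close() tells it to quit once drained.
    //
    //   LLWorkQueue queue("example");
    //   std::thread worker([&queue]() { queue.runUntilClose(); });
    //   queue.post([]() { doSomeWork(); });   // Fire and forget
    //   queue.close();                        // No further work accepted
    //   worker.join();                        // Returns once the queue is drained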

    // LLWorkQueue supports multiple producers and multiple consumers. In the
    // general case it is misleading to test size(), since any other thread
    // might change it the nanosecond the lock is released. On that basis,
    // some might argue against publishing a size() method at all. But there
    // are two specific cases in which a test based on size() might be
    // reasonable:
    // - If you are the only producer, noticing that size() == 0 is
    //   meaningful.
    // - If you are the only consumer, noticing that size() > 0 is
    //   meaningful.
    LL_INLINE U32 size()                    { return mQueue.size(); }

    // Returns true when the storage is empty (lock-less and yet thread-safe
    // since based on a cached atomic boolean). HB
    LL_INLINE bool empty()                  { return mQueue.empty(); }

    // Producer's end: are we prevented from pushing any additional items?
    LL_INLINE bool isClosed()               { return mQueue.isClosed(); }

    // Consumer's end: are we done, is the queue entirely drained?
    LL_INLINE bool done()                   { return mQueue.done(); }

    // Statistics (number of completed operations) for the thread calling
    // this.
    LL_INLINE U32 getCalls()                { return mQueue.getCalls(); }

    //------------------------ Fire and forget API --------------------------//

    // Fire and forget
    template <typename CALLABLE>
    LL_INLINE void post(CALLABLE&& callable)
    {
        mQueue.push(std::move(callable));
    }

    // Posts work, unless the queue is closed before we can post.
    template <typename CALLABLE>
    LL_INLINE bool postIfOpen(CALLABLE&& callable)
    {
        return mQueue.pushIfOpen(std::move(callable));
    }

    // Posts work to another LLWorkQueue, which may or may not still exist
    // and be open. Returns true if we were able to post.
    template <typename CALLABLE>
    static bool postMaybe(weak_t target, CALLABLE&& callable)
    {
        // We are being asked to post to the LLWorkQueue at 'target', which
        // is a weak_ptr: we have to lock it to check it.
        auto tptr = target.lock();
        if (tptr)
        {
            try
            {
                tptr->post(std::forward<CALLABLE>(callable));
                return true;    // We were able to post()
            }
            catch (const Closed&)
            {
                // The LLWorkQueue still exists, but is Closed
            }
        }
        // Either target no longer exists, or its LLWorkQueue is Closed.
        return false;
    }

    // Tries to post work without blocking; returns true only when the work
    // was actually queued.
    template <typename CALLABLE>
    LL_INLINE bool tryPost(CALLABLE&& callable)
    {
        return mQueue.tryPush(std::move(callable));
    }
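
    // A minimal producer-side sketch of the calls above ('workerQueue' and
    // 'weakWorker' are hypothetical names used only for illustration):
    //
    //   workerQueue.post([]() { /* ... */ });    // Throws Closed if the
    //                                            // queue is already closed.
    //   if (!workerQueue.postIfOpen([]() { /* ... */ }))
    //   {
    //       // The queue was already closed: the work was not queued.
    //   }
    //   if (!LLWorkQueue::postMaybe(weakWorker, []() { /* ... */ }))
    //   {
    //       // The target queue no longer exists, or is closed.
    //   }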

    //-------------------------- Handshaking API ----------------------------//

    // Posts work to another LLWorkQueue, requesting a specific callback to
    // be run on this LLWorkQueue on completion. Returns true if we were able
    // to post, false if the other LLWorkQueue is inaccessible.
    template <typename CALLABLE, typename FOLLOWUP>
    bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback)
    {
        // We are being asked to post to the LLWorkQueue at 'target', which
        // is a weak_ptr: we have to lock it to check it.
        auto tptr = target.lock();
        if (!tptr)
        {
            // Cannot post() if the target LLWorkQueue has been destroyed.
            return false;
        }
        // Here we believe the target LLWorkQueue still exists. Post to it a
        // lambda that packages our callable, our callback and a weak_ptr to
        // this originating LLWorkQueue.
        tptr->post([reply = super::getWeak(), callable = std::move(callable),
                    callback = std::move(callback)]() mutable
        {
            // Use postMaybe() below in case this originating LLWorkQueue has
            // been closed or destroyed. Remember, the outer lambda is now
            // running on a thread servicing the target LLWorkQueue, and real
            // time has elapsed since postTo()'s tptr->post() call.
            try
            {
                // Make a reply lambda to repost to THIS LLWorkQueue.
                // Delegate to makeReplyLambda() so we can partially
                // specialize on void return.
                postMaybe(reply, makeReplyLambda(std::move(callable),
                                                 std::move(callback)));
            }
            catch (...)
            {
                // Either variant of makeReplyLambda() is responsible for
                // calling the caller's callable. If that throws, return the
                // exception to the originating thread: bind the current
                // exception to transport it back to the originating
                // LLWorkQueue, and rethrow it once there.
                postMaybe(reply, [exc = std::current_exception()]()
                                 { std::rethrow_exception(exc); });
            }
        });
        // It looks like we were able to post()...
        return true;
    }
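
    // A minimal handshake sketch, as seen from the thread servicing the
    // originating queue ('mainQueue', 'weakWorker', 'TextureData',
    // 'loadTexture' and 'useTexture' are hypothetical names used only for
    // illustration): the callable runs on the thread servicing the target
    // queue, and the callback later runs back on whatever thread services
    // 'mainQueue'.
    //
    //   bool ok = mainQueue.postTo(weakWorker,
    //                              []() -> TextureData
    //                              { return loadTexture(); },
    //                              [](TextureData result)
    //                              { useTexture(result); });
    //   // 'ok' is false when the worker queue no longer exists.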

    // Not currently in use: define LL_WAIT_FOR_RESULT to non-zero at compile
    // time if at all needed. HB
#if LL_WAIT_FOR_RESULT
    // Posts work to another LLWorkQueue, blocking the calling coroutine
    // until the work completes, then returns its result to the caller.
    // In general, we assume that each thread's default coroutine is busy
    // servicing its LLWorkQueue or whatever. To try to prevent mistakes, we
    // forbid calling waitForResult() from a thread's default coroutine.
    template <typename CALLABLE>
    LL_INLINE auto waitForResult(CALLABLE&& callable)
    {
        checkCoroutine("waitForResult()");
        return WaitForResult<CALLABLE,
                             decltype(std::forward<CALLABLE>(callable)())>()
            (this, std::forward<CALLABLE>(callable));
    }
#endif
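
    // A minimal sketch, only relevant when LL_WAIT_FOR_RESULT is defined to
    // non-zero, and only legal from a coroutine that is not the thread's
    // default one ('workerQueue' and 'computeChecksum' are hypothetical
    // names used only for illustration):
    //
    //   auto sum = workerQueue.waitForResult([]() { return computeChecksum(); });
    //   // The calling coroutine is suspended until the worker has run the
    //   // lambda; any exception the lambda throws is rethrown here.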

    //----------------------------- Worker API ------------------------------//

    // Pulls work items off this LLWorkQueue until the queue is closed, at
    // which point it returns. This would be the typical entry point for a
    // simple worker thread.
    void runUntilClose();

    // Runs all work items that are ready to run. Returns true if the queue
    // remains open, false if the queue has been closed. This could be used
    // by a thread whose primary purpose is to serve the queue, but also
    // wants to do other things with its idle time.
    bool runPending();

    // Runs at most one ready work item (zero if none are ready). Returns
    // true if the queue remains open, false if it has been closed.
    bool runOne();

    // Runs a subset of ready work items, until the timeslice has been
    // exceeded. Returns true if the queue remains open, false if the queue
    // has been closed. This could be used by a busy main thread to lend a
    // bounded few CPU cycles to this LLWorkQueue without risking it blowing
    // out the length of any one frame.
    // Modified to take an optional 'work_remaining' pointer which, when non
    // NULL, receives the remaining size of the queue. HB
    template <typename Rep, typename Period>
    LL_INLINE bool runFor(const std::chrono::duration<Rep, Period>& timeslice,
                          size_t* work_remaining = NULL)
    {
        return runUntil(TimePoint::clock::now() + timeslice, work_remaining);
    }

    // Just like runFor(), only with a specific end time instead of a
    // timeslice duration.
    // Modified to take an optional 'work_remaining' pointer which, when non
    // NULL, receives the remaining size of the queue. HB
    bool runUntil(const TimePoint& until, size_t* work_remaining = NULL);
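
    // A minimal sketch of a busy main loop lending a bounded timeslice to
    // the queue each frame ('mainQueue' is a hypothetical variable used only
    // for illustration):
    //
    //   size_t remaining = 0;
    //   bool open = mainQueue.runFor(std::chrono::milliseconds(1), &remaining);
    //   // 'remaining' now holds the number of work items still queued, and
    //   // 'open' is false once the queue has been closed.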

private:
    // General case: arbitrary C++ return type
    template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
    struct MakeReplyLambda;

    // Specialize for CALLABLE returning void
    template <typename CALLABLE, typename FOLLOWUP>
    struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>;

#if LL_WAIT_FOR_RESULT
    // General case: arbitrary C++ return type
    template <typename CALLABLE, typename RETURNTYPE>
    struct WaitForResult;

    // Specialize for CALLABLE returning void
    template <typename CALLABLE>
    struct WaitForResult<CALLABLE, void>;

    static void checkCoroutine(const std::string& method);
#endif

    template <typename CALLABLE, typename FOLLOWUP>
    LL_INLINE static auto makeReplyLambda(CALLABLE&& callable,
                                          FOLLOWUP&& callback)
    {
        return MakeReplyLambda<CALLABLE, FOLLOWUP,
                               decltype(std::forward<CALLABLE>(callable)())>()
            (std::move(callable), std::move(callback));
    }

    void callWork(const Work& work);

    static std::string makeName(const std::string& name);

    static void error(const std::string& msg);

private:
    Queue mQueue;
};

// General case: arbitrary C++ return type
template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
struct LLWorkQueue::MakeReplyLambda
{
    auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
    {
        // Call the callable in any case, but to minimize copying the result,
        // immediately bind it into the reply lambda. The reply lambda also
        // binds the original callback, so that when we, the originating
        // LLWorkQueue, finally receive and process the reply lambda, we will
        // call the bound callback with the bound result, on the same thread
        // that originally called postTo().
        return [result = std::forward<CALLABLE>(callable)(),
                callback = std::move(callback)]() mutable
               {
                   callback(std::move(result));
               };
    }
};

// Specialize for CALLABLE returning void
template <typename CALLABLE, typename FOLLOWUP>
struct LLWorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void>
{
    auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
    {
        // Call the callable, which produces no result.
        std::forward<CALLABLE>(callable)();
        // Our completion callback is simply the caller's callback.
        return std::move(callback);
    }
};

#if LL_WAIT_FOR_RESULT
// General case: arbitrary C++ return type
template <typename CALLABLE, typename RETURNTYPE>
struct LLWorkQueue::WaitForResult
{
    auto operator()(LLWorkQueue* self, CALLABLE&& callable)
    {
        LLCoros::Promise<RETURNTYPE> promise;
        // We dare to bind a reference to Promise because it is specifically
        // designed for cross-thread communication.
        self->post([&promise, callable = std::move(callable)]() mutable
                   {
                       try
                       {
                           // Call the caller's callable and trigger promise
                           // with result
                           promise.set_value(callable());
                       }
                       catch (...)
                       {
                           promise.set_exception(std::current_exception());
                       }
                   });
        auto future{ LLCoros::getFuture(promise) };
        // Now, on the calling thread, wait for that result
        return future.get();
    }
};

// Specialize for CALLABLE returning void
template <typename CALLABLE>
struct LLWorkQueue::WaitForResult<CALLABLE, void>
{
    auto operator()(LLWorkQueue* self, CALLABLE&& callable)
    {
        LLCoros::Promise<void> promise;
        // We dare to bind a reference to Promise because it is specifically
        // designed for cross-thread communication.
        self->post([&promise, callable = std::move(callable)]() mutable
                   {
                       try
                       {
                           callable();
                           promise.set_value();
                       }
                       catch (...)
                       {
                           promise.set_exception(std::current_exception());
                       }
                   });
        auto future{ LLCoros::getFuture(promise) };
        // Now, on the calling thread, wait for that result
        return future.get();
    }
};
#endif  // LL_WAIT_FOR_RESULT

#endif  // LL_WORKQUEUE_H