llthreadsafequeue.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. /**
  2. * @file llthreadsafequeue.h
  3. * @brief Queue protected with mutexes for cross-thread use.
  4. *
  5. * $LicenseInfo:firstyear=2010&license=viewergpl$
  6. *
  7. * Copyright (c) 2010, Linden Research, Inc. (c) 2022 Henri Beauchamp.
  8. *
  9. * Second Life Viewer Source Code
  10. * The source code in this file ("Source Code") is provided by Linden Lab
  11. * to you under the terms of the GNU General Public License, version 2.0
  12. * ("GPL"), unless you have obtained a separate licensing agreement
  13. * ("Other License"), formally executed by you and Linden Lab. Terms of
  14. * the GPL can be found in doc/GPL-license.txt in this distribution, or
  15. * online at http://secondlifegrid.net/programs/open_source/licensing/gplv2
  16. *
  17. * There are special exceptions to the terms and conditions of the GPL as
  18. * it is applied to this Source Code. View the full text of the exception
  19. * in the file doc/FLOSS-exception.txt in this software distribution, or
  20. * online at
  21. * http://secondlifegrid.net/programs/open_source/licensing/flossexception
  22. *
  23. * By copying, modifying or distributing this software, you acknowledge
  24. * that you have read and understood your obligations described above,
  25. * and agree to abide by those obligations.
  26. *
  27. * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
  28. * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
  29. * COMPLETENESS OR PERFORMANCE.
  30. * $/LicenseInfo$
  31. */
  32. #ifndef LL_LLTHREADSAFEQUEUE_H
  33. #define LL_LLTHREADSAFEQUEUE_H
  34. #include <chrono>
  35. #include <functional>
  36. #include <queue>
  37. #include <stdexcept>
  38. #include <string>
  39. #include <utility>
  40. #include <vector>
  41. #include "llatomic.h"
  42. #include "hbfastmap.h"
  43. #include "llthread.h" // Also includes llmutex.h
  44. // Keep this after inclusion of llmutex.h, so that LL_USE_FIBER_AWARE_MUTEX is
  45. // properly set. HB
  46. #if LL_USE_FIBER_AWARE_MUTEX
  47. # include "boost/fiber/timed_mutex.hpp"
  48. # define LL_TIMED_MUTEX_TYPE boost::fibers::timed_mutex
  49. # define LL_CV_STATUS_TYPE boost::fibers::cv_status
  50. #else
  51. # include <condition_variable>
  52. # define LL_TIMED_MUTEX_TYPE std::timed_mutex
  53. # define LL_CV_STATUS_TYPE std::cv_status
  54. #endif
  55. ///////////////////////////////////////////////////////////////////////////////
  56. // LLThreadSafeQueueInterrupt class
  57. ///////////////////////////////////////////////////////////////////////////////
  // Exception type thrown by the blocking LLThreadSafeQueue operations (push()
  // and pop()) when they cannot complete because the queue got closed. It is a
  // std::runtime_error, so it may be caught as such, and it carries a fixed
  // explanatory message.
  class LLThreadSafeQueueInterrupt : public std::runtime_error
  {
  public:
      LLThreadSafeQueueInterrupt()
      :   std::runtime_error{ "queue operation interrupted" }
      {
      }
  };
  66. // Implements a thread-safe FIFO. Let the default std::queue default to
  67. // underlying std::deque. Override if desired.
  68. template<typename ElementT, typename QueueT = std::queue<ElementT> >
  69. class LLThreadSafeQueue
  70. {
  71. public:
  72. typedef ElementT value_type;
  73. typedef std::unique_lock<LL_TIMED_MUTEX_TYPE> lock_t;
  74. // Limiting the number of pending items prevents unbounded growth of the
  75. // underlying queue.
  76. LLThreadSafeQueue(size_t capacity = 1024)
  77. : mCapacity(capacity),
  78. mEmpty(false),
  79. mClosed(false)
  80. {
  81. }
  82. virtual ~LLThreadSafeQueue() = default;
  83. // Add an element to the front of queue (will block if the queue has
  84. // reached capacity).
  85. // This call will raise an interrupt error if the queue is deleted while
  86. // the caller is blocked.
  87. template <typename T> void push(T&& element)
  88. {
  89. if (!pushIfOpen(std::forward<T>(element)))
  90. {
  91. throw(LLThreadSafeQueueInterrupt());
  92. }
  93. }
  94. // Adds an element to the queue (will block if the queue has reached its
  95. // maximum capacity). Returns false if the queue is closed before push is
  96. // possible.
  97. template <typename T> bool pushIfOpen(T&& element)
  98. {
  99. lock_t lock1(mLock);
  100. while (true)
  101. {
  102. // On the producer side, it does not matter whether the queue has
  103. // been drained or not: the moment either end calls close(),
  104. // further push() operations will fail.
  105. if (mClosed)
  106. {
  107. return false;
  108. }
  109. if (push_(lock1, std::forward<T>(element)))
  110. {
  111. return true;
  112. }
  113. // Storage is full. Wait for signal.
  114. mCapacityCond.wait(lock1);
  115. }
  116. }
  117. // Tries to add an element to the front of queue without blocking. Returns
  118. // true only if the element was actually added.
  119. template <typename T> bool tryPush(T&& element)
  120. {
  121. return tryLock([this, element = std::move(element)](lock_t& lock)
  122. {
  123. if (mClosed)
  124. {
  125. return false;
  126. }
  127. return push_(lock, std::move(element));
  128. });
  129. }
  130. // Tries to add an element to the queue, blocking if full but with timeout
  131. // after specified duration. Returns true if the element was added. There
  132. // are potentially two different timeouts involved: how long to try to lock
  133. // the mutex, versus how long to wait for the queue to stop being full.
  134. // Careful settings for each timeout might be orders of magnitude apart.
  135. // However, this method conflates them.
  136. template <typename Rep, typename Period, typename T>
  137. bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
  138. T&& element)
  139. {
  140. // Convert duration to time_point; passing the same timeout duration to
  141. // each of multiple calls is wrong.
  142. return tryPushUntil(std::chrono::steady_clock::now() + timeout,
  143. std::forward<T>(element));
  144. }
  145. // Tries to add an element to the queue, blocking if full but with timeout
  146. // at specified time_point. Returns true if the element was added.
  147. template <typename Clock, typename Duration, typename T>
  148. bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
  149. T&& element)
  150. {
  151. return tryLockUntil(until,
  152. [this, until,
  153. element = std::move(element)](lock_t& lock)
  154. {
  155. while (!mClosed)
  156. {
  157. if (push_(lock, std::move(element)))
  158. {
  159. return true;
  160. }
  161. if (LL_CV_STATUS_TYPE::timeout ==
  162. mCapacityCond.wait_until(lock,
  163. until))
  164. {
  165. // Timed out; formally we might re-
  166. // check both conditions above.
  167. return false;
  168. }
  169. // If we did not time out, we were notified
  170. // for some reason. Loop back to check.
  171. }
  172. return false;
  173. });
  174. }
  175. // Pops the element at the end of the queue (will block if the queue is
  176. // empty). This call will raise an interrupt error if the queue is deleted
  177. // while the caller is blocked.
  178. ElementT pop()
  179. {
  180. lock_t lock1(mLock);
  181. ElementT value;
  182. while (true)
  183. {
  184. // On the consumer side, we always try to pop before checking
  185. // mClosed so we can finish draining the queue.
  186. pop_result popped = pop_(lock1, value);
  187. if (popped == POPPED)
  188. {
  189. return value;
  190. }
  191. // Once the queue is DONE, there will never be any more coming.
  192. if (popped == DONE)
  193. {
  194. throw(LLThreadSafeQueueInterrupt());
  195. }
  196. // If we did not pop because WAITING, i.e. canPop() returned false,
  197. // then even if the producer end has been closed, there is still at
  198. // least one item to drain: wait for it. Or we might be EMPTY, with
  199. // the queue still open. Either way, wait for signal.
  200. mEmptyCond.wait(lock1);
  201. }
  202. }
  203. // Pops an element from the end of the queue if there is one available.
  204. // Returns true only if an element was popped.
  205. bool tryPop(ElementT& element)
  206. {
  207. return tryLock([this, &element](lock_t& lock)
  208. {
  209. // Conflate EMPTY, DONE, WAITING: tryPop() behavior
  210. // when the closed queue is implemented by simple
  211. // inability to push any new elements.
  212. return pop_(lock, element) == POPPED;
  213. });
  214. }
  215. // Pops an element from the end of the queue, blocking if empty, with
  216. // timeout after specified duration. Returns true if an element was popped.
  217. template <typename Rep, typename Period>
  218. bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout,
  219. ElementT& element)
  220. {
  221. // Convert duration to time_point; passing the same timeout duration to
  222. // each of multiple calls is wrong.
  223. return tryPopUntil(std::chrono::steady_clock::now() + timeout,
  224. element);
  225. }
  226. // Pops the element at the head of the queue, blocking if empty, with
  227. // timeout after specified duration. Returns true if an element was popped.
  228. template <typename Clock, typename Duration>
  229. bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
  230. ElementT& element)
  231. {
  232. return tryLockUntil(until,
  233. [this, until, &element](lock_t& lock)
  234. {
  235. // Conflate EMPTY, DONE, WAITING
  236. return tryPopUntil_(lock, until,
  237. element) == POPPED;
  238. });
  239. }
  240. // Returns the size of the queue.
  241. size_t size()
  242. {
  243. lock_t lock(mLock);
  244. return mStorage.size();
  245. }
  246. // Returns true when the storage is empty (lock-less and yet thread-safe
  247. // since mEmpty is atomic). HB
  248. LL_INLINE bool empty() { return mEmpty; }
  249. // Returns the capacity of the queue.
  250. LL_INLINE size_t capacity() { return mCapacity; }
  251. // Closes the queue:
  252. // - Every subsequent push() call will throw LLThreadSafeQueueInterrupt.
  253. // - Every subsequent tryPush() call will return false.
  254. // - pop() calls will return normally until the queue is drained, then
  255. // every subsequent pop() will throw LLThreadSafeQueueInterrupt.
  256. // - tryPop() calls will return normally until the queue is drained,
  257. // then every subsequent tryPop() call will return false.
  258. void close()
  259. {
  260. mClosed = true;
  261. // Wake up any blocked pop() calls
  262. mEmptyCond.notify_all();
  263. // Wake up any blocked push() calls
  264. mCapacityCond.notify_all();
  265. }
  266. // Producer's end: are we prevented from pushing any additional items ?
  267. // Now lock-less and yet thread-safe since I made mClosed atomic. HB
  268. LL_INLINE bool isClosed() { return mClosed; }
  269. // Consumer's end: are we done, is the queue entirely drained ?
  270. // Modified to take an optional 'work_remaining' pointer, that, when non
  271. // NULL, allows to return the size of the queue (saves a lock(), when
  272. // compared to using a separated size() call, and provides the actual size
  273. // as seen by done(), not the size micro seconds later, which could be
  274. // different already). HB
  275. bool done(size_t* work_remaining = NULL)
  276. {
  277. lock_t lock(mLock);
  278. size_t size = mStorage.size();
  279. if (work_remaining)
  280. {
  281. *work_remaining = size;
  282. }
  283. return !size && mClosed;
  284. }
  285. // Return the number of elements popped from the queue by the thread
  286. // calling this method now. HB
  287. U32 getCalls()
  288. {
  289. lock_t lock(mLock);
  290. threads_stats_t::const_iterator it =
  291. mStats.find(LLThread::thisThreadIdHash());
  292. return it != mStats.end() ? it->second : 0;
  293. }
  294. protected:
  295. enum pop_result { EMPTY, DONE, WAITING, POPPED };
  296. // Implementation logic, suitable for passing to tryLockUntil()
  297. template <typename Clock, typename Duration>
  298. pop_result tryPopUntil_(lock_t& lock,
  299. const std::chrono::time_point<Clock, Duration>& until,
  300. ElementT& element)
  301. {
  302. while (true)
  303. {
  304. pop_result popped = pop_(lock, element);
  305. if (popped == POPPED || popped == DONE)
  306. {
  307. // If we succeeded, great ! If we have drained the last item,
  308. // so be it. Either way, break the loop and tell caller.
  309. return popped;
  310. }
  311. // EMPTY or WAITING: wait for signal.
  312. if (LL_CV_STATUS_TYPE::timeout ==
  313. mEmptyCond.wait_until(lock, until))
  314. {
  315. // Timed out: formally we might re-check as it is, break loop.
  316. return popped;
  317. }
  318. // If we did not time out, we were notified for some reason. Loop
  319. // back to check.
  320. }
  321. }
  322. // I we are able to lock immediately, does so and runs the passed callable,
  323. // which must accept lock_t& and return bool.
  324. template <typename CALLABLE>
  325. bool tryLock(CALLABLE&& callable)
  326. {
  327. lock_t lock1(mLock, std::defer_lock);
  328. if (!lock1.try_lock())
  329. {
  330. return false;
  331. }
  332. return std::forward<CALLABLE>(callable)(lock1);
  333. }
  334. // I we are able to lock before the passed time_point, does so and runs the
  335. // passed callable, which must accept lock_t& and return bool.
  336. template <typename Clock, typename Duration, typename CALLABLE>
  337. bool tryLockUntil(const std::chrono::time_point<Clock, Duration>& until,
  338. CALLABLE&& callable)
  339. {
  340. lock_t lock1(mLock, std::defer_lock);
  341. if (!lock1.try_lock_until(until))
  342. {
  343. return false;
  344. }
  345. return std::forward<CALLABLE>(callable)(lock1);
  346. }
  347. // While lock is locked, really pushes the passed element, if possible.
  348. template <typename T>
  349. bool push_(lock_t& lock, T&& element)
  350. {
  351. if (mStorage.size() >= mCapacity)
  352. {
  353. return false;
  354. }
  355. mStorage.push(std::forward<T>(element));
  356. mEmpty = false;
  357. lock.unlock();
  358. // Now that we have pushed, if somebody has been waiting to pop, signal
  359. // them.
  360. mEmptyCond.notify_one();
  361. return true;
  362. }
  363. // While lock is locked, really pops the head element, if possible.
  364. pop_result pop_(lock_t& lock, ElementT& element)
  365. {
  366. if (mStorage.empty())
  367. {
  368. mEmpty = true;
  369. // If mStorage is empty, there is no head element.
  370. return mClosed ? DONE : EMPTY;
  371. }
  372. if (!canPop(mStorage.front()))
  373. {
  374. return WAITING;
  375. }
  376. // QueueT::front() is the element about to pop()
  377. element = mStorage.front();
  378. mStorage.pop();
  379. mEmpty = mStorage.empty();
  380. // Add one to the number of popped elements, for this thread stats. HB
  381. ++mStats[LLThread::thisThreadIdHash()];
  382. lock.unlock();
  383. // Now that we have popped, if somebody has been waiting to push,
  384. // signal them.
  385. mCapacityCond.notify_one();
  386. return POPPED;
  387. }
  388. // Is the current head element ready to pop ? Yes by default and the sub-
  389. // class can override as needed.
  390. virtual bool canPop(const ElementT& head) const { return true; }
  391. protected:
  392. typedef QueueT queue_type;
  393. QueueT mStorage;
  394. // A map to keep track of the elements popping statistics per thread. HB
  395. typedef flat_hmap<U64, U32> threads_stats_t;
  396. threads_stats_t mStats;
  397. LL_TIMED_MUTEX_TYPE mLock;
  398. LL_COND_ANY_TYPE mCapacityCond;
  399. LL_COND_ANY_TYPE mEmptyCond;
  400. LLAtomicBool mClosed;
  401. LLAtomicBool mEmpty;
  402. size_t mCapacity;
  403. };
  404. #if 0 // Not used for now
  405. ///////////////////////////////////////////////////////////////////////////////
  406. // LLPriorityQueueAdapter class template
  407. ///////////////////////////////////////////////////////////////////////////////
  408. // std::priority_queue's API is almost like std::queue, intentionally of
  409. // course, but you must access the element about to pop() as top() rather than
  410. as front(). Make an adapter for use with LLThreadSafeQueue.
  411. template <typename T, typename Container = std::vector<T>,
  412. typename Compare = std::less<typename Container::value_type> >
  413. class LLPriorityQueueAdapter
  414. {
  415. public:
  416. // Publish all the same types
  417. typedef std::priority_queue<T, Container, Compare> queue_type;
  418. typedef typename queue_type::container_type container_type;
  419. typedef typename queue_type::value_type value_type;
  420. typedef typename queue_type::size_type size_type;
  421. typedef typename queue_type::reference reference;
  422. typedef typename queue_type::const_reference const_reference;
  423. // Although std::queue defines both const and non-const front() methods,
  424. // std::priority_queue defines only const top().
  425. LL_INLINE const_reference front() const { return mQ.top(); }
  426. // All the rest of these merely forward to the corresponding queue_type
  427. // methods.
  428. LL_INLINE bool empty() const { return mQ.empty(); }
  429. LL_INLINE size_type size() const { return mQ.size(); }
  430. LL_INLINE void push(const value_type& value) { mQ.push(value); }
  431. LL_INLINE void push(value_type&& value) { mQ.push(std::move(value)); }
  432. LL_INLINE void pop() { mQ.pop(); }
  433. template <typename... Args>
  434. LL_INLINE void emplace(Args&&... args)
  435. {
  436. mQ.emplace(std::forward<Args>(args)...);
  437. }
  438. private:
  439. queue_type mQ;
  440. };
  441. #endif
  442. #endif // LL_LLTHREADSAFEQUEUE_H