llqueuedthread.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. /**
  2. * @file llqueuedthread.cpp
  3. *
  4. * $LicenseInfo:firstyear=2004&license=viewergpl$
  5. *
  6. * Copyright (c) 2004-2009, Linden Research, Inc. (c) 2022 Henri Beauchamp.
  7. *
  8. * Second Life Viewer Source Code
  9. * The source code in this file ("Source Code") is provided by Linden Lab
  10. * to you under the terms of the GNU General Public License, version 2.0
  11. * ("GPL"), unless you have obtained a separate licensing agreement
  12. * ("Other License"), formally executed by you and Linden Lab. Terms of
  13. * the GPL can be found in doc/GPL-license.txt in this distribution, or
  14. * online at http://secondlifegrid.net/programs/open_source/licensing/gplv2
  15. *
  16. * There are special exceptions to the terms and conditions of the GPL as
  17. * it is applied to this Source Code. View the full text of the exception
  18. * in the file doc/FLOSS-exception.txt in this software distribution, or
  19. * online at
  20. * http://secondlifegrid.net/programs/open_source/licensing/flossexception
  21. *
  22. * By copying, modifying or distributing this software, you acknowledge
  23. * that you have read and understood your obligations described above,
  24. * and agree to abide by those obligations.
  25. *
  26. * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
  27. * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
  28. * COMPLETENESS OR PERFORMANCE.
  29. * $/LicenseInfo$
  30. */
  31. #include "linden_common.h"
  32. #include "llqueuedthread.h"
  33. #include "llstl.h"
  34. #include "lltimer.h" // For ms_sleep()
  35. ///////////////////////////////////////////////////////////////////////////////
  36. // LLQueuedThread class
  37. ///////////////////////////////////////////////////////////////////////////////
  38. // Main thread
  39. LLQueuedThread::LLQueuedThread(const std::string& name)
  40. : LLThread(name),
  41. mNextHandle(0),
  42. mIdleThread(true)
  43. {
  44. // Always set the LLThread to the paused state before starting it, so that
  45. // the calling thread may fully initialize before actually starting the
  46. // LLQueuedThread processing, which shall be done either with an explicit
  47. // call to unpause() or via update(). HB
  48. pause();
  49. // Launch the LLThread
  50. start();
  51. }
  52. // Main thread
  53. LLQueuedThread::~LLQueuedThread()
  54. {
  55. shutdown();
  56. // ~LLThread() will be called here
  57. }
  58. // Main thread
  59. void LLQueuedThread::shutdown()
  60. {
  61. llinfos << "Shutting down: " << mName << llendl;
  62. setQuitting();
  63. llinfos << mName << " has been set quitting." << llendl;
  64. unpause(); // Main thread
  65. llinfos << "Waiting for "<< mName << " to stop..." << llendl;
  66. S32 timeout = 1000;
  67. for ( ; timeout > 0; --timeout)
  68. {
  69. if (isStopped())
  70. {
  71. break;
  72. }
  73. ms_sleep(10);
  74. }
  75. if (timeout == 0)
  76. {
  77. llwarns << mName << " timed out !" << llendl;
  78. }
  79. else
  80. {
  81. llinfos << mName << " stopped." << llendl;
  82. }
  83. LLMutexTrylock locker(mDataLock, 100); // Try for 1 second. HB
  84. if (!locker.isLocked())
  85. {
  86. llwarns << "Data lock busy for: " << mName << llendl;
  87. }
  88. // Continue nonetheless... The thread is stopped at this point, so... HB
  89. S32 active_count = 0;
  90. request_map_t::iterator it = mRequestMap.begin();
  91. while (it != mRequestMap.end())
  92. {
  93. QueuedRequest* req = it->second;
  94. S32 status = req->getStatus();
  95. if (status == STATUS_QUEUED || status == STATUS_INPROGRESS)
  96. {
  97. ++active_count;
  98. req->setStatus(STATUS_ABORTED);
  99. }
  100. it = mRequestMap.erase(it);
  101. req->deleteRequest();
  102. }
  103. if (active_count)
  104. {
  105. llwarns << "Called with " << active_count << " active requests for "
  106. << mName << llendl;
  107. }
  108. }
  109. // Main thread
  110. //virtual
  111. size_t LLQueuedThread::update()
  112. {
  113. size_t pending = getPending();
  114. if (pending)
  115. {
  116. unpause();
  117. }
  118. return pending;
  119. }
  120. // May be called from any thread
  121. //virtual
  122. size_t LLQueuedThread::getPending()
  123. {
  124. size_t res;
  125. lockData();
  126. res = mRequestQueue.size();
  127. unlockData();
  128. return res;
  129. }
  130. // Main thread
  131. void LLQueuedThread::waitOnPending()
  132. {
  133. while (true)
  134. {
  135. update();
  136. if (mIdleThread)
  137. {
  138. break;
  139. }
  140. yield();
  141. }
  142. }
  143. // Main thread
  144. void LLQueuedThread::printQueueStats()
  145. {
  146. lockData();
  147. if (!mRequestQueue.empty())
  148. {
  149. QueuedRequest* req = *mRequestQueue.begin();
  150. llinfos << llformat("Pending requests:%d Current status:%d",
  151. mRequestQueue.size(), req->getStatus()) << llendl;
  152. }
  153. else
  154. {
  155. llinfos << "Queued thread idle" << llendl;
  156. }
  157. unlockData();
  158. }
  159. // Main thread
  160. LLQueuedThread::handle_t LLQueuedThread::generateHandle()
  161. {
  162. lockData();
  163. handle_t res = ++mNextHandle;
  164. unlockData();
  165. return res;
  166. }
  167. // Main thread
  168. bool LLQueuedThread::addRequest(QueuedRequest* req)
  169. {
  170. if (mStatus == QUITTING)
  171. {
  172. return false;
  173. }
  174. lockData();
  175. req->setStatus(STATUS_QUEUED);
  176. mRequestQueue.insert(req);
  177. mRequestMap.emplace(req->mHandle, req);
  178. unlockData();
  179. // Something has been added to the queue
  180. if (!isPaused())
  181. {
  182. wake(); // Wake the thread up if necessary.
  183. }
  184. return true;
  185. }
  186. // Main thread
  187. LLQueuedThread::QueuedRequest* LLQueuedThread::getRequest(handle_t handle)
  188. {
  189. QueuedRequest* res = NULL;
  190. lockData();
  191. request_map_t::iterator it = mRequestMap.find(handle);
  192. if (it != mRequestMap.end())
  193. {
  194. res = it->second;
  195. }
  196. unlockData();
  197. return res;
  198. }
  199. LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle)
  200. {
  201. status_t res = STATUS_EXPIRED;
  202. lockData();
  203. request_map_t::iterator it = mRequestMap.find(handle);
  204. if (it != mRequestMap.end())
  205. {
  206. res = it->second->getStatus();
  207. }
  208. unlockData();
  209. return res;
  210. }
  211. void LLQueuedThread::abortRequest(handle_t handle, bool autocomplete)
  212. {
  213. lockData();
  214. request_map_t::iterator it = mRequestMap.find(handle);
  215. if (it != mRequestMap.end())
  216. {
  217. it->second->setFlags(FLAG_ABORT |
  218. (autocomplete ? FLAG_AUTO_COMPLETE : 0));
  219. }
  220. unlockData();
  221. }
  222. // Main thread
  223. void LLQueuedThread::setFlags(handle_t handle, U32 flags)
  224. {
  225. lockData();
  226. request_map_t::iterator it = mRequestMap.find(handle);
  227. if (it != mRequestMap.end())
  228. {
  229. it->second->setFlags(flags);
  230. }
  231. unlockData();
  232. }
  233. void LLQueuedThread::setPriority(handle_t handle, U32 priority)
  234. {
  235. lockData();
  236. request_map_t::iterator it = mRequestMap.find(handle);
  237. if (it != mRequestMap.end())
  238. {
  239. QueuedRequest* req = it->second;
  240. if (req->getPriority() != priority)
  241. {
  242. if (req->getStatus() == STATUS_INPROGRESS)
  243. {
  244. // Not in list
  245. req->setPriority(priority);
  246. }
  247. else if (req->getStatus() == STATUS_QUEUED)
  248. {
  249. // Remove from list then re-insert
  250. if (mRequestQueue.erase(req) != 1)
  251. {
  252. llwarns << "Request " << mName
  253. << " was not in the requests queue !" << llendl;
  254. llassert(false);
  255. }
  256. req->setPriority(priority);
  257. mRequestQueue.insert(req);
  258. }
  259. }
  260. }
  261. unlockData();
  262. }
  263. bool LLQueuedThread::completeRequest(handle_t handle)
  264. {
  265. bool res = false;
  266. lockData();
  267. request_map_t::iterator it = mRequestMap.find(handle);
  268. if (it != mRequestMap.end())
  269. {
  270. QueuedRequest* req = it->second;
  271. S32 status = req->getStatus();
  272. llassert_always(status != STATUS_QUEUED &&
  273. status != STATUS_INPROGRESS);
  274. mRequestMap.erase(it);
  275. req->deleteRequest();
  276. res = true;
  277. }
  278. unlockData();
  279. return res;
  280. }
  281. ///////////////////////////////////////////////////////////////////////////////
  282. // LLQueuedThread class: runs on its *own* thread
  283. ///////////////////////////////////////////////////////////////////////////////
  284. size_t LLQueuedThread::processNextRequest()
  285. {
  286. QueuedRequest* req;
  287. // Get next request from pool
  288. lockData();
  289. while (true)
  290. {
  291. req = NULL;
  292. if (mRequestQueue.empty())
  293. {
  294. break;
  295. }
  296. req = *mRequestQueue.begin();
  297. mRequestQueue.erase(mRequestQueue.begin());
  298. if (!req) continue;
  299. if (mStatus == QUITTING || (req->getFlags() & FLAG_ABORT))
  300. {
  301. LL_DEBUGS("QueuedThread") << mName << ": aborting request "
  302. << std::hex << (uintptr_t)req << std::dec
  303. << LL_ENDL;
  304. req->setStatus(STATUS_ABORTED);
  305. req->finishRequest(false);
  306. if (req->getFlags() & FLAG_AUTO_COMPLETE)
  307. {
  308. LL_DEBUGS("QueuedThread") << mName
  309. << ": deleting auto-complete request "
  310. << std::hex << (uintptr_t)req << std::dec
  311. << LL_ENDL;
  312. mRequestMap.erase(req->mHandle);
  313. req->deleteRequest();
  314. }
  315. continue;
  316. }
  317. llassert_always(req->getStatus() == STATUS_QUEUED);
  318. break;
  319. }
  320. U32 start_priority = 0;
  321. if (req)
  322. {
  323. LL_DEBUGS("QueuedThread") << mName << ": flagging request "
  324. << std::hex << (uintptr_t)req << std::dec
  325. << " as being in progress" << LL_ENDL;
  326. req->setStatus(STATUS_INPROGRESS);
  327. start_priority = req->getPriority();
  328. }
  329. unlockData();
  330. if (req)
  331. {
  332. // Process request
  333. bool ok = req->processRequest();
  334. setRequestResult(req, ok);
  335. if (!ok && start_priority < PRIORITY_NORMAL)
  336. {
  337. yield();
  338. }
  339. }
  340. return getPending();
  341. }
  342. void LLQueuedThread::setRequestResult(QueuedRequest* req, bool result)
  343. {
  344. if (result)
  345. {
  346. lockData();
  347. LL_DEBUGS("QueuedThread") << mName << ": flagging request "
  348. << std::hex << (uintptr_t)req << std::dec
  349. << " as complete" << LL_ENDL;
  350. req->setStatus(STATUS_COMPLETE);
  351. req->finishRequest(true);
  352. if (req->getFlags() & FLAG_AUTO_COMPLETE)
  353. {
  354. LL_DEBUGS("QueuedThread") << mName
  355. << ": deleting auto-complete request "
  356. << std::hex << (uintptr_t)req << std::dec
  357. << LL_ENDL;
  358. mRequestMap.erase(req->mHandle);
  359. req->deleteRequest();
  360. }
  361. unlockData();
  362. }
  363. else
  364. {
  365. lockData();
  366. LL_DEBUGS("QueuedThread") << mName << ": decreasing request "
  367. << std::hex << (uintptr_t)req << std::dec
  368. << " priority" << LL_ENDL;
  369. req->setStatus(STATUS_QUEUED);
  370. mRequestQueue.insert(req);
  371. unlockData();
  372. }
  373. }
  374. //virtual
  375. bool LLQueuedThread::runCondition()
  376. {
  377. // mRunCondition must be locked here
  378. return !mIdleThread || !mRequestQueue.empty();
  379. }
  380. //virtual
  381. void LLQueuedThread::run()
  382. {
  383. // Call checkPause() immediately so we do not try to do anything before the
  384. // class is fully constructed
  385. checkPause();
  386. startThread();
  387. while (!isQuitting())
  388. {
  389. mIdleThread = false;
  390. threadedUpdate();
  391. if (!processNextRequest())
  392. {
  393. mIdleThread = true;
  394. yield();
  395. }
  396. // This will block on the condition until runCondition() returns true,
  397. // the thread is unpaused, or the thread leaves the RUNNING state.
  398. checkPause();
  399. }
  400. endThread();
  401. llinfos << "Queued thread " << mName << " exiting." << llendl;
  402. }
  403. ///////////////////////////////////////////////////////////////////////////////
  404. // LLQueuedThread::QueuedRequest sub-class
  405. ///////////////////////////////////////////////////////////////////////////////
  406. //virtual
  407. LLQueuedThread::QueuedRequest::~QueuedRequest()
  408. {
  409. llassert_always(mStatus == STATUS_DELETE);
  410. }
  411. //virtual
  412. void LLQueuedThread::QueuedRequest::deleteRequest()
  413. {
  414. if (mStatus == STATUS_INPROGRESS)
  415. {
  416. llwarns << "Deleting a request in progress !" << llendl;
  417. }
  418. setStatus(STATUS_DELETE);
  419. delete this;
  420. }