llcoproceduremanager.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. /**
  2. * @file llcoproceduremanager.cpp
  3. * @author Rider Linden
  4. * @brief Singleton class for managing pools of coprocedures (queued jobs run by pooled coroutines).
  5. *
  6. * $LicenseInfo:firstyear=2015&license=viewergpl$
  7. *
  8. * Copyright (c) 2015, Linden Research, Inc.
  9. *
  10. * Second Life Viewer Source Code
  11. * The source code in this file ("Source Code") is provided by Linden Lab
  12. * to you under the terms of the GNU General Public License, version 2.0
  13. * ("GPL"), unless you have obtained a separate licensing agreement
  14. * ("Other License"), formally executed by you and Linden Lab. Terms of
  15. * the GPL can be found in doc/GPL-license.txt in this distribution, or
  16. * online at http://secondlifegrid.net/programs/open_source/licensing/gplv2
  17. *
  18. * There are special exceptions to the terms and conditions of the GPL as
  19. * it is applied to this Source Code. View the full text of the exception
  20. * in the file doc/FLOSS-exception.txt in this software distribution, or
  21. * online at
  22. * http://secondlifegrid.net/programs/open_source/licensing/flossexception
  23. *
  24. * By copying, modifying or distributing this software, you acknowledge
  25. * that you have read and understood your obligations described above,
  26. * and agree to abide by those obligations.
  27. *
  28. * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
  29. * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
  30. * COMPLETENESS OR PERFORMANCE.
  31. * $/LicenseInfo$
  32. */
  33. #include "linden_common.h"
  34. #include <deque>
  35. #include <mutex>
  36. #include "boost/assign.hpp"
  37. #include "boost/fiber/condition_variable.hpp"
  38. #include "llcoproceduremanager.h"
  39. #include "llatomic.h"
  40. // Map of pool sizes for known pools
  41. static std::map<std::string, U32> sDefaultPoolSizes =
  42. boost::assign::map_list_of
  43. (std::string("Upload"), 1)
  44. (std::string("AssetStorage"), 16)
  45. // Keep AIS serialized to avoid getting COF out-of-sync
  46. (std::string("AIS"), 1);
  47. #define DEFAULT_POOL_SIZE 5
  48. // Made huge so that we do not fail enqueuing new coprocs due to the queue size
  49. #define COPROC_DEFAULT_QUEUE_SIZE (1024 * 1024)
  50. ///////////////////////////////////////////////////////////////////////////////
  51. // LLCoprocedureQueue template class. It used to be LLThreadSafeQueue, but was
  52. // only used here by LLCoprocedurePool, and the "performance viewer" changes
  53. // (i.e. the complexification) to the new LLThreadSafeQueue are of no interest
  54. // to LLCoprocedurePool, much to the contrary, since the new queue can throw()
  55. // (while we thoroughly avoid that with this old implementation), and cannot
  56. // use fiber-aware mutexes (because they break normal mutexes used elsewhere)
  57. // unlike here, where they *are* needed to avoid promises being badly locked
  58. // (which only causes spurious, harmless warnings, but still)... So I moved the
  59. // old queue code here and renamed it LLCoprocedureQueue. HB
  60. ///////////////////////////////////////////////////////////////////////////////
  61. template<typename ElementT>
  62. class LLCoprocedureQueue
  63. {
  64. public:
  65. typedef ElementT value_type;
  66. LLCoprocedureQueue(U32 capacity = 1024)
  67. : mCapacity(capacity)
  68. {
  69. }
  70. // Adds an element to the front of queue (will block if the queue has
  71. // reached its maximum capacity).
  72. void pushFront(const ElementT& element)
  73. {
  74. std::unique_lock<decltype(mLock)> lock1(mLock);
  75. while (true)
  76. {
  77. if (mStorage.size() < mCapacity)
  78. {
  79. mStorage.push_front(element);
  80. mEmptyCond.notify_one();
  81. return;
  82. }
  83. // Storage full. Wait for signal.
  84. mCapacityCond.wait(lock1);
  85. }
  86. }
  87. // Tries to add an element to the front of the queue without blocking.
  88. // Returns true only if the element was actually added.
  89. bool tryPushFront(const ElementT& element)
  90. {
  91. std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock);
  92. if (!lock1.try_lock() || mStorage.size() >= mCapacity)
  93. {
  94. return false;
  95. }
  96. mStorage.push_front(element);
  97. mEmptyCond.notify_one();
  98. return true;
  99. }
  100. // Pops the element at the end of the queue (will block if the queue is
  101. // empty).
  102. ElementT popBack()
  103. {
  104. std::unique_lock<decltype(mLock)> lock1(mLock);
  105. while (true)
  106. {
  107. if (!mStorage.empty())
  108. {
  109. ElementT value = mStorage.back();
  110. mStorage.pop_back();
  111. mCapacityCond.notify_one();
  112. return value;
  113. }
  114. // Storage empty. Wait for signal.
  115. mEmptyCond.wait(lock1);
  116. }
  117. }
  118. // Pops an element from the end of the queue if there is one available.
  119. // Returns true only if an element was popped.
  120. bool tryPopBack(ElementT& element)
  121. {
  122. std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock);
  123. if (!lock1.try_lock() || mStorage.empty())
  124. {
  125. return false;
  126. }
  127. element = mStorage.back();
  128. mStorage.pop_back();
  129. mCapacityCond.notify_one();
  130. return true;
  131. }
  132. // Returns the size of the queue.
  133. size_t size()
  134. {
  135. std::unique_lock<decltype(mLock)> lock(mLock);
  136. return mStorage.size();
  137. }
  138. private:
  139. std::deque<ElementT> mStorage;
  140. boost::fibers::mutex mLock;
  141. boost::fibers::condition_variable mCapacityCond;
  142. boost::fibers::condition_variable mEmptyCond;
  143. U32 mCapacity;
  144. };
  145. ///////////////////////////////////////////////////////////////////////////////
  146. // LLCoprocedurePool class
  147. ///////////////////////////////////////////////////////////////////////////////
  148. class LLCoprocedurePool
  149. {
  150. protected:
  151. LOG_CLASS(LLCoprocedurePool);
  152. public:
  153. // Non-copyable
  154. LLCoprocedurePool(const LLCoprocedurePool&) = delete;
  155. LLCoprocedurePool& operator=(const LLCoprocedurePool&) = delete;
  156. typedef LLCoprocedureManager::coprocedure_t coprocedure_t;
  157. LLCoprocedurePool(const std::string& name, size_t size);
  158. // Places the coprocedure on the queue for processing.
  159. //
  160. // @param name Is used for debugging and should identify this coroutine.
  161. // @param proc Is a bound function to be executed
  162. //
  163. // @return This method returns a UUID that can be used later to cancel
  164. // execution.
  165. LLUUID enqueueCoprocedure(const std::string& name, coprocedure_t proc);
  166. // Requests a shutdown of the upload manager.
  167. void shutdown();
  168. LL_INLINE U32 countActive() { return mNumActiveCoprocs; }
  169. LL_INLINE U32 countPending() { return mNumPendingCoprocs; }
  170. LL_INLINE U32 count()
  171. {
  172. return mNumActiveCoprocs + mNumPendingCoprocs;
  173. }
  174. private:
  175. void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t adapter);
  176. private:
  177. struct QueuedCoproc
  178. {
  179. typedef std::shared_ptr<QueuedCoproc> ptr_t;
  180. QueuedCoproc(const std::string& name, const LLUUID& id,
  181. coprocedure_t proc)
  182. : mName(name),
  183. mId(id),
  184. mProc(proc)
  185. {
  186. }
  187. std::string mName;
  188. LLUUID mId;
  189. coprocedure_t mProc;
  190. };
  191. std::string mPoolName;
  192. LLEventStream mWakeupTrigger;
  193. typedef std::map<std::string,
  194. LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> adapter_map_t;
  195. adapter_map_t mCoroMapping;
  196. typedef LLCoprocedureQueue<QueuedCoproc::ptr_t> coproc_queue_t;
  197. coproc_queue_t mPendingCoprocs;
  198. LLAtomicU32 mNumActiveCoprocs;
  199. LLAtomicU32 mNumPendingCoprocs;
  200. LLCore::HttpRequest::policy_t mHTTPPolicy;
  201. bool mShutdown;
  202. };
  203. LLCoprocedurePool::LLCoprocedurePool(const std::string& pool_name, size_t size)
  204. : mPoolName(pool_name),
  205. mPendingCoprocs(COPROC_DEFAULT_QUEUE_SIZE),
  206. mNumActiveCoprocs(0),
  207. mNumPendingCoprocs(0),
  208. mShutdown(false),
  209. mWakeupTrigger("CoprocedurePool" + pool_name, true),
  210. mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID)
  211. {
  212. std::string adapt_name = mPoolName + "Adapter";
  213. std::string full_name = "LLCoprocedurePool(" + mPoolName +
  214. ")::coprocedureInvokerCoro";
  215. for (size_t count = 0; count < size; ++count)
  216. {
  217. LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t adapter =
  218. std::make_shared<LLCoreHttpUtil::HttpCoroutineAdapter>(adapt_name,
  219. mHTTPPolicy);
  220. std::string pooled_coro =
  221. gCoros.launch(full_name,
  222. boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro,
  223. this, adapter));
  224. mCoroMapping.emplace(pooled_coro, adapter);
  225. }
  226. llinfos << "Created coprocedure pool named \"" << mPoolName << "\" with "
  227. << size << " items." << llendl;
  228. mWakeupTrigger.post(LLSD());
  229. }
  230. void LLCoprocedurePool::shutdown()
  231. {
  232. mShutdown = true;
  233. mWakeupTrigger.post(LLSD());
  234. }
  235. LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string& name,
  236. coprocedure_t proc)
  237. {
  238. LLUUID id;
  239. id.generate();
  240. if (mPendingCoprocs.tryPushFront(std::make_shared<QueuedCoproc>(name, id,
  241. proc)))
  242. {
  243. ++mNumPendingCoprocs;
  244. LL_DEBUGS("CoreHttp") << "Coprocedure(" << name << ") enqueued with id="
  245. << id << " in pool: " << mPoolName << LL_ENDL;
  246. mWakeupTrigger.post(LLSD());
  247. return id;
  248. }
  249. llwarns << "Failure to enqueue new coprocedure " << name << " in pool: "
  250. << mPoolName << llendl;
  251. return LLUUID::null;
  252. }
  253. void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t adapter)
  254. {
  255. while (!mShutdown)
  256. {
  257. llcoro::suspendUntilEventOn(mWakeupTrigger);
  258. while (!mShutdown && mPendingCoprocs.size())
  259. {
  260. QueuedCoproc::ptr_t coproc = mPendingCoprocs.popBack();
  261. if (!coproc)
  262. {
  263. break;
  264. }
  265. ++mNumActiveCoprocs;
  266. --mNumPendingCoprocs;
  267. LL_DEBUGS("CoreHttp") << "Dequeued and invoking coprocedure("
  268. << coproc->mName << ") with id="
  269. << coproc->mId << " in pool: " << mPoolName
  270. << LL_ENDL;
  271. try
  272. {
  273. coproc->mProc(adapter, coproc->mId);
  274. }
  275. catch (std::exception& e)
  276. {
  277. llwarns << "Coprocedure(" << coproc->mName << ") id="
  278. << coproc->mId << " threw an exception ! Message=\""
  279. << e.what() << "\"" << " in pool: " << mPoolName
  280. << llendl;
  281. }
  282. catch (...)
  283. {
  284. llwarns << "A non std::exception was thrown from "
  285. << coproc->mName << " with id=" << coproc->mId
  286. << " in pool: " << mPoolName << llendl;
  287. }
  288. --mNumActiveCoprocs;
  289. LL_DEBUGS("CoreHttp") << "Finished coprocedure("
  290. << coproc->mName << ") in pool: "
  291. << mPoolName
  292. << " - Coprocedures still active: "
  293. << mNumActiveCoprocs
  294. << " - Coprocedures still pending: "
  295. << mNumPendingCoprocs << LL_ENDL;
  296. }
  297. }
  298. llinfos << "Exiting coroutine for pool: " << mPoolName << llendl;
  299. }
  300. ///////////////////////////////////////////////////////////////////////////////
  301. // LLCoprocedureManager class
  302. ///////////////////////////////////////////////////////////////////////////////
  303. LLCoprocedureManager::pool_ptr_t LLCoprocedureManager::initializePool(const std::string& pool_name)
  304. {
  305. // Attempt to look up a pool size in the configuration. If found use it.
  306. std::string key_name = "PoolSize" + pool_name;
  307. size_t size = 0;
  308. if (pool_name.empty())
  309. {
  310. llerrs << "Poolname must not be empty" << llendl;
  311. }
  312. if (mPropertyQueryFn && !mPropertyQueryFn.empty())
  313. {
  314. size = mPropertyQueryFn(key_name);
  315. }
  316. if (size == 0)
  317. {
  318. // If not found grab the known default... If there is no known default
  319. // use a reasonable number like 5.
  320. std::map<std::string, U32>::iterator it =
  321. sDefaultPoolSizes.find(pool_name);
  322. size = it == sDefaultPoolSizes.end() ? DEFAULT_POOL_SIZE : it->second;
  323. if (mPropertyDefineFn && !mPropertyDefineFn.empty())
  324. {
  325. mPropertyDefineFn(key_name, size);
  326. }
  327. llinfos << "No setting for \"" << key_name
  328. << "\" setting pool size to default of " << size << llendl;
  329. }
  330. pool_ptr_t pool = std::make_shared<LLCoprocedurePool>(pool_name, size);
  331. if (!pool)
  332. {
  333. llerrs << "Unable to create pool named \"" << pool_name << "\" FATAL !"
  334. << llendl;
  335. }
  336. mPoolMap.emplace(pool_name, pool);
  337. return pool;
  338. }
  339. LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string& pool,
  340. const std::string& name,
  341. coprocedure_t proc)
  342. {
  343. // Attempt to find the pool and enqueue the procedure. If the pool does not
  344. // exist, create it.
  345. pool_ptr_t target_pool;
  346. pool_map_t::iterator it = mPoolMap.find(pool);
  347. if (it == mPoolMap.end() || !it->second)
  348. {
  349. llwarns << "Pool " << pool
  350. << " was not initialized. Initializing it now (could cause a crash)."
  351. << llendl;
  352. target_pool = initializePool(pool);
  353. }
  354. else
  355. {
  356. target_pool = it->second;
  357. }
  358. return target_pool->enqueueCoprocedure(name, proc);
  359. }
  360. void LLCoprocedureManager::cleanup()
  361. {
  362. for (pool_map_t::const_iterator it = mPoolMap.begin(), end = mPoolMap.end();
  363. it != end; ++it)
  364. {
  365. if (it->second)
  366. {
  367. it->second->shutdown();
  368. }
  369. }
  370. #if 0 // Do NOT destroy pools now: this causes crashes on exit. The map will
  371. // be "naturally" destroyed/cleared on LLCoprocedureManager destruction
  372. // in the compiler generated code (by destructors chaining virtue).
  373. mPoolMap.clear();
  374. #endif
  375. }
  376. void LLCoprocedureManager::setPropertyMethods(setting_query_t queryfn,
  377. setting_upd_t updatefn)
  378. {
  379. mPropertyQueryFn = queryfn;
  380. mPropertyDefineFn = updatefn;
  381. // Workaround until we get mutex into initializePool
  382. initializePool("Upload");
  383. }
  384. U32 LLCoprocedureManager::countPending() const
  385. {
  386. U32 count = 0;
  387. for (pool_map_t::const_iterator it = mPoolMap.begin(), end = mPoolMap.end();
  388. it != end; ++it)
  389. {
  390. if (it->second)
  391. {
  392. count += it->second->countPending();
  393. }
  394. }
  395. return count;
  396. }
  397. U32 LLCoprocedureManager::countPending(const std::string& pool) const
  398. {
  399. pool_map_t::const_iterator it = mPoolMap.find(pool);
  400. return it != mPoolMap.end() && it->second ? it->second->countPending() : 0;
  401. }
  402. U32 LLCoprocedureManager::countActive() const
  403. {
  404. U32 count = 0;
  405. for (pool_map_t::const_iterator it = mPoolMap.begin(), end = mPoolMap.end();
  406. it != end; ++it)
  407. {
  408. if (it->second)
  409. {
  410. count += it->second->countActive();
  411. }
  412. }
  413. return count;
  414. }
  415. U32 LLCoprocedureManager::countActive(const std::string& pool) const
  416. {
  417. pool_map_t::const_iterator it = mPoolMap.find(pool);
  418. return it != mPoolMap.end() && it->second ? it->second->countActive() : 0;
  419. }
  420. U32 LLCoprocedureManager::count() const
  421. {
  422. U32 count = 0;
  423. for (pool_map_t::const_iterator it = mPoolMap.begin(), end = mPoolMap.end();
  424. it != end; ++it)
  425. {
  426. if (it->second)
  427. {
  428. count += it->second->count();
  429. }
  430. }
  431. return count;
  432. }
  433. U32 LLCoprocedureManager::count(const std::string& pool) const
  434. {
  435. pool_map_t::const_iterator it = mPoolMap.find(pool);
  436. return it != mPoolMap.end() && it->second ? it->second->count() : 0;
  437. }