llpumpio.cpp 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933
  1. /**
  2. * @file llpumpio.cpp
  3. * @author Phoenix
  4. * @date 2004-11-21
  5. * @brief Implementation of the i/o pump and related functions.
  6. *
  7. * $LicenseInfo:firstyear=2004&license=viewergpl$
  8. *
  9. * Copyright (c) 2004-2009, Linden Research, Inc.
  10. *
  11. * Second Life Viewer Source Code
  12. * The source code in this file ("Source Code") is provided by Linden Lab
  13. * to you under the terms of the GNU General Public License, version 2.0
  14. * ("GPL"), unless you have obtained a separate licensing agreement
  15. * ("Other License"), formally executed by you and Linden Lab. Terms of
  16. * the GPL can be found in doc/GPL-license.txt in this distribution, or
  17. * online at http://secondlifegrid.net/programs/open_source/licensing/gplv2
  18. *
  19. * There are special exceptions to the terms and conditions of the GPL as
  20. * it is applied to this Source Code. View the full text of the exception
  21. * in the file doc/FLOSS-exception.txt in this software distribution, or
  22. * online at
  23. * http://secondlifegrid.net/programs/open_source/licensing/flossexception
  24. *
  25. * By copying, modifying or distributing this software, you acknowledge
  26. * that you have read and understood your obligations described above,
  27. * and agree to abide by those obligations.
  28. *
  29. * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
  30. * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
  31. * COMPLETENESS OR PERFORMANCE.
  32. * $/LicenseInfo$
  33. */
  34. #include "linden_common.h"
  35. #include "apr_poll.h"
  36. #include "llpumpio.h"
  37. #include "llapr.h"
  38. #include "llfasttimer.h"
  39. #include "llstl.h"
  40. // These should not be enabled in production, but they can be intensely useful
  41. // during development for finding certain kinds of bugs.
  42. #if LL_LINUX
  43. # define LL_DEBUG_PIPE_TYPE_IN_PUMP 0
  44. # define LL_DEBUG_POLL_FILE_DESCRIPTORS 0
  45. # if LL_DEBUG_POLL_FILE_DESCRIPTORS
  46. # include "apr_portable.h"
  47. # endif
  48. #endif
  49. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  50. #include <typeinfo>
  51. #endif
  52. // Constants for poll timeout.
  53. constexpr S32 DEFAULT_POLL_TIMEOUT = 0;
  54. constexpr F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.f;
  55. // Sorta spammy debug modes.
  56. #define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR 0
  57. #define LL_DEBUG_PROCESS_LINK 0
  58. #define LL_DEBUG_PROCESS_RETURN_VALUE 0
  59. // Super spammy debug mode.
  60. #define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 0
  61. #define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 0
  62. // Helper function
  63. void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* pollp)
  64. {
  65. #if LL_DEBUG_POLL_FILE_DESCRIPTORS
  66. if (!pollp)
  67. {
  68. LL_DEBUGS("PumpIO") << "Poll -- " << (msg ? msg : "") << ": no pollfd."
  69. << LL_ENDL;
  70. return;
  71. }
  72. if (pollp->desc.s)
  73. {
  74. apr_os_sock_t os_sock;
  75. if (APR_SUCCESS == apr_os_sock_get(&os_sock, pollp->desc.s))
  76. {
  77. LL_DEBUGS("PumpIO") << "Poll -- " << (msg?msg:"") << " on fd "
  78. << os_sock << " at " << pollp->desc.s
  79. << LL_ENDL;
  80. }
  81. else
  82. {
  83. LL_DEBUGS("PumpIO") << "Poll -- " << (msg?msg:"") << " no fd "
  84. << " at " << pollp->desc.s << LL_ENDL;
  85. }
  86. }
  87. else if (pollp->desc.f)
  88. {
  89. apr_os_file_t os_file;
  90. if (APR_SUCCESS == apr_os_file_get(&os_file, pollp->desc.f))
  91. {
  92. LL_DEBUGS("PumpIO") << "Poll -- " << (msg?msg:"") << " on fd "
  93. << os_file << " at " << pollp->desc.f << LL_ENDL;
  94. }
  95. else
  96. {
  97. LL_DEBUGS("PumpIO") << "Poll -- " << (msg?msg:"") << " no fd "
  98. << " at " << pollp->desc.f << LL_ENDL;
  99. }
  100. }
  101. else
  102. {
  103. LL_DEBUGS("PumpIO") << "Poll -- " << (msg?msg:"") << ": no descriptor."
  104. << LL_ENDL;
  105. }
  106. #endif
  107. }
  108. /**
  109. * @struct ll_delete_apr_pollset_fd_client_data
  110. * @brief This is a simple helper class to clean up our client data.
  111. */
  112. struct ll_delete_apr_pollset_fd_client_data
  113. {
  114. typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
  115. void operator()(const pipe_conditional_t& conditional)
  116. {
  117. S32* client_id = (S32*)conditional.second.client_data;
  118. delete client_id;
  119. }
  120. };
  121. ///////////////////////////////////////////////////////////////////////////////
  122. // LLPumpIO class
  123. ///////////////////////////////////////////////////////////////////////////////
  124. LLPumpIO::LLPumpIO()
  125. : mRebuildPollset(false),
  126. mPollset(NULL),
  127. mPollsetClientID(0),
  128. mCurrentPool(NULL),
  129. mCurrentPoolReallocCount(0),
  130. mCurrentChain(mRunningChains.end())
  131. {
  132. mCurrentChain = mRunningChains.end();
  133. }
  134. LLPumpIO::~LLPumpIO()
  135. {
  136. if (mPollset)
  137. {
  138. apr_pollset_destroy(mPollset);
  139. mPollset = NULL;
  140. }
  141. if (mCurrentPool)
  142. {
  143. apr_pool_destroy(mCurrentPool);
  144. mCurrentPool = NULL;
  145. }
  146. }
  147. bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
  148. {
  149. if (chain.empty()) return false;
  150. LLChainInfo info;
  151. info.setTimeoutSeconds(timeout);
  152. info.mData = std::make_shared<LLBufferArray>();
  153. info.mData->setThreaded(false);
  154. LLLinkInfo link;
  155. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  156. LL_DEBUGS("PumpIO") << chain[0] << "Add chain '"
  157. << typeid(*(chain[0])).name() << "'" << LL_ENDL;
  158. #else
  159. LL_DEBUGS("PumpIO") << "Add chain: " << chain[0] << LL_ENDL;
  160. #endif
  161. chain_t::const_iterator it = chain.begin();
  162. chain_t::const_iterator end = chain.end();
  163. for ( ; it != end; ++it)
  164. {
  165. link.mPipe = *it;
  166. link.mChannels = info.mData->nextChannel();
  167. info.mChainLinks.push_back(link);
  168. }
  169. mPendingChains.push_back(info);
  170. return true;
  171. }
  172. bool LLPumpIO::addChain(const LLPumpIO::links_t& links,
  173. LLIOPipe::buffer_ptr_t datap, LLSD context,
  174. F32 timeout)
  175. {
  176. // Remember that if the caller is providing a full link description, we
  177. // need to have that description matched to a particular buffer.
  178. if (!datap || links.empty())
  179. {
  180. return false;
  181. }
  182. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  183. LL_DEBUGS("PumpIO") << "Add chain: " << links[0].mPipe << " '"
  184. << typeid(*(links[0].mPipe)).name() << "'" << LL_ENDL;
  185. #else
  186. LL_DEBUGS("PumpIO") << "Add chain: " << links[0].mPipe << LL_ENDL;
  187. #endif
  188. LLChainInfo info;
  189. info.setTimeoutSeconds(timeout);
  190. info.mChainLinks = links;
  191. info.mData = datap;
  192. info.mContext = context;
  193. mPendingChains.push_back(info);
  194. return true;
  195. }
  196. bool LLPumpIO::setTimeoutSeconds(F32 timeout)
  197. {
  198. // If no chain is running, return failure.
  199. if (mRunningChains.end() == mCurrentChain)
  200. {
  201. return false;
  202. }
  203. mCurrentChain->setTimeoutSeconds(timeout);
  204. return true;
  205. }
  206. void LLPumpIO::adjustTimeoutSeconds(F32 delta)
  207. {
  208. // Ensure a chain is running
  209. if (mRunningChains.end() != mCurrentChain)
  210. {
  211. mCurrentChain->adjustTimeoutSeconds(delta);
  212. }
  213. }
  214. static std::string events_2_string(apr_int16_t events)
  215. {
  216. std::ostringstream ostr;
  217. if (events & APR_POLLIN)
  218. {
  219. ostr << "read,";
  220. }
  221. if (events & APR_POLLPRI)
  222. {
  223. ostr << "priority,";
  224. }
  225. if (events & APR_POLLOUT)
  226. {
  227. ostr << "write,";
  228. }
  229. if (events & APR_POLLERR)
  230. {
  231. ostr << "error,";
  232. }
  233. if (events & APR_POLLHUP)
  234. {
  235. ostr << "hangup,";
  236. }
  237. if (events & APR_POLLNVAL)
  238. {
  239. ostr << "invalid,";
  240. }
  241. return chop_tail_copy(ostr.str(), 1);
  242. }
  243. bool LLPumpIO::setConditional(LLIOPipe* pipep, const apr_pollfd_t* pollp)
  244. {
  245. if (!pipep)
  246. {
  247. return false;
  248. }
  249. ll_debug_poll_fd("Set conditional", pollp);
  250. LL_DEBUGS("PumpIO") << "Setting conditionals ("
  251. << (pollp ? events_2_string(pollp->reqevents) : "NULL")
  252. << ") "
  253. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  254. << "on pipe " << typeid(*pipep).name()
  255. #endif
  256. << " at " << std::hex << pipep << std::dec << LL_ENDL;
  257. // Remove any matching poll file descriptors for this pipe.
  258. LLIOPipe::ptr_t pipe_ptr(pipep);
  259. LLChainInfo::conditionals_t::iterator it =
  260. mCurrentChain->mDescriptors.begin();
  261. while (it != mCurrentChain->mDescriptors.end())
  262. {
  263. LLChainInfo::pipe_conditional_t& value = (*it);
  264. if (pipe_ptr == value.first)
  265. {
  266. ll_delete_apr_pollset_fd_client_data()(value);
  267. it = mCurrentChain->mDescriptors.erase(it);
  268. mRebuildPollset = true;
  269. }
  270. else
  271. {
  272. ++it;
  273. }
  274. }
  275. if (!pollp)
  276. {
  277. mRebuildPollset = true;
  278. return true;
  279. }
  280. LLChainInfo::pipe_conditional_t value;
  281. value.first = pipe_ptr;
  282. value.second = *pollp;
  283. value.second.rtnevents = 0;
  284. if (!pollp->p)
  285. {
  286. // Each fd needs a pool to work with, so if one was not specified, use
  287. // this pool. *FIXME: should it always be this pool ?
  288. value.second.p = gAPRPoolp;
  289. }
  290. value.second.client_data = new S32(++mPollsetClientID);
  291. mCurrentChain->mDescriptors.push_back(value);
  292. mRebuildPollset = true;
  293. return true;
  294. }
  295. void LLPumpIO::pump()
  296. {
  297. pump(DEFAULT_POLL_TIMEOUT);
  298. #if LL_PUMPIO_RESPOND
  299. callback();
  300. #endif
  301. }
  302. LLPumpIO::current_chain_t LLPumpIO::removeRunningChain(LLPumpIO::current_chain_t& run_chain)
  303. {
  304. std::for_each(run_chain->mDescriptors.begin(),
  305. run_chain->mDescriptors.end(),
  306. ll_delete_apr_pollset_fd_client_data());
  307. return mRunningChains.erase(run_chain);
  308. }
  309. // Timeout is in microseconds
  310. void LLPumpIO::pump(S32 poll_timeout)
  311. {
  312. LL_FAST_TIMER(FTM_PUMP_IO);
  313. // We need to move all of the pending heads over to the running chains.
  314. PUMP_DEBUG;
  315. // Move the pending chains over to the running chaings
  316. if (!mPendingChains.empty())
  317. {
  318. PUMP_DEBUG;
  319. std::copy(mPendingChains.begin(), mPendingChains.end(),
  320. std::back_insert_iterator<running_chains_t>(mRunningChains));
  321. mPendingChains.clear();
  322. PUMP_DEBUG;
  323. }
  324. PUMP_DEBUG;
  325. // Rebuild the pollset if necessary
  326. if (mRebuildPollset)
  327. {
  328. PUMP_DEBUG;
  329. rebuildPollset();
  330. mRebuildPollset = false;
  331. }
  332. // Poll based on the last known pollset
  333. // *TODO: may want to pass in a poll timeout so it works correctly in
  334. // single and multi threaded processes.
  335. PUMP_DEBUG;
  336. typedef std::map<S32, S32> signal_client_t;
  337. signal_client_t signalled_client;
  338. const apr_pollfd_t* poll_fd = NULL;
  339. if (mPollset)
  340. {
  341. PUMP_DEBUG;
  342. S32 count = 0;
  343. S32 client_id = 0;
  344. {
  345. LL_TRACY_TIMER(TRC_PUMP_POLL);
  346. apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);
  347. }
  348. PUMP_DEBUG;
  349. for (S32 ii = 0; ii < count; ++ii)
  350. {
  351. ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]);
  352. client_id = *((S32*)poll_fd[ii].client_data);
  353. signalled_client[client_id] = ii;
  354. }
  355. PUMP_DEBUG;
  356. }
  357. PUMP_DEBUG;
  358. // set up for a check to see if each one was signalled
  359. signal_client_t::iterator not_signalled = signalled_client.end();
  360. // Process everything as appropriate
  361. running_chains_t::iterator run_chain = mRunningChains.begin();
  362. bool process_this_chain = false;
  363. while (run_chain != mRunningChains.end())
  364. {
  365. PUMP_DEBUG;
  366. if (run_chain->mInit && run_chain->mTimer.getStarted() &&
  367. run_chain->mTimer.hasExpired())
  368. {
  369. PUMP_DEBUG;
  370. if (handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED))
  371. {
  372. // the pipe probably handled the error. If the handler forgot
  373. // to reset the expiration then we need to do that here.
  374. if (run_chain->mTimer.getStarted() &&
  375. run_chain->mTimer.hasExpired())
  376. {
  377. PUMP_DEBUG;
  378. llinfos << "Error handler forgot to reset timeout. "
  379. << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS
  380. << " seconds." << llendl;
  381. run_chain->setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS);
  382. }
  383. }
  384. else
  385. {
  386. PUMP_DEBUG;
  387. // it timed out and no one handled it, so we need to
  388. // retire the chain
  389. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  390. LL_DEBUGS("PumpIO") << "Removing chain "
  391. << run_chain->mChainLinks[0].mPipe
  392. << " '"
  393. << typeid(*(run_chain->mChainLinks[0].mPipe)).name()
  394. << "' because it timed out." << LL_ENDL;
  395. #endif
  396. run_chain = removeRunningChain(run_chain);
  397. continue;
  398. }
  399. }
  400. mCurrentChain = run_chain;
  401. if (run_chain->mDescriptors.empty())
  402. {
  403. // if there are no conditionals, just process this chain.
  404. process_this_chain = true;
  405. }
  406. else
  407. {
  408. PUMP_DEBUG;
  409. // Check if this run chain was signalled. If any file descriptor is
  410. // ready for something, then go ahead and process this chain.
  411. process_this_chain = false;
  412. if (!signalled_client.empty())
  413. {
  414. PUMP_DEBUG;
  415. LLChainInfo::conditionals_t::iterator it;
  416. it = run_chain->mDescriptors.begin();
  417. LLChainInfo::conditionals_t::iterator end;
  418. end = run_chain->mDescriptors.end();
  419. S32 client_id = 0;
  420. signal_client_t::iterator signal;
  421. for (; it != end; ++it)
  422. {
  423. PUMP_DEBUG;
  424. client_id = *((S32*)(it->second.client_data));
  425. signal = signalled_client.find(client_id);
  426. if (signal == not_signalled) continue;
  427. constexpr apr_int16_t POLL_CHAIN_ERROR =
  428. APR_POLLHUP | APR_POLLNVAL | APR_POLLERR;
  429. const apr_pollfd_t* pollp = &(poll_fd[signal->second]);
  430. if (pollp->rtnevents & POLL_CHAIN_ERROR)
  431. {
  432. // Potential eror condition has been returned. If HUP
  433. // was one of them, we pass that as the error even
  434. // though there may be more. If there are in fact more
  435. // errors, we'll just wait for that detection until the
  436. // next pump() cycle to catch it so that the logic here
  437. // gets no more strained than it already is.
  438. LLIOPipe::EStatus error_status;
  439. if (pollp->rtnevents & APR_POLLHUP)
  440. {
  441. error_status = LLIOPipe::STATUS_LOST_CONNECTION;
  442. }
  443. else
  444. {
  445. error_status = LLIOPipe::STATUS_ERROR;
  446. }
  447. if (handleChainError(*run_chain, error_status)) break;
  448. ll_debug_poll_fd("Removing pipe", pollp);
  449. llwarns << "Removing pipe "
  450. << run_chain->mChainLinks[0].mPipe << " '"
  451. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  452. << typeid(*(run_chain->mChainLinks[0].mPipe)).name()
  453. #endif
  454. << "' because: "
  455. << events_2_string(pollp->rtnevents) << llendl;
  456. run_chain->mHead = run_chain->mChainLinks.end();
  457. break;
  458. }
  459. // At least 1 fd got signalled, and there were no errors.
  460. // That means we process this chain.
  461. process_this_chain = true;
  462. break;
  463. }
  464. }
  465. }
  466. if (process_this_chain)
  467. {
  468. PUMP_DEBUG;
  469. if (!run_chain->mInit)
  470. {
  471. run_chain->mHead = run_chain->mChainLinks.begin();
  472. run_chain->mInit = true;
  473. }
  474. PUMP_DEBUG;
  475. processChain(*run_chain);
  476. }
  477. PUMP_DEBUG;
  478. if (run_chain->mHead == run_chain->mChainLinks.end())
  479. {
  480. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  481. LL_DEBUGS("PumpIO") << "Removing chain "
  482. << run_chain->mChainLinks[0].mPipe
  483. << " '"
  484. << typeid(*(run_chain->mChainLinks[0].mPipe)).name()
  485. << "' because we reached the end." << LL_ENDL;
  486. #else
  487. LL_DEBUGS("PumpIO") << "Removing chain "
  488. << run_chain->mChainLinks[0].mPipe
  489. << " because we reached the end." << LL_ENDL;
  490. #endif
  491. PUMP_DEBUG;
  492. // This chain is done. Clean up any allocated memory and erase the
  493. // chain info.
  494. run_chain = removeRunningChain(run_chain);
  495. // *NOTE: may not always need to rebuild the pollset.
  496. mRebuildPollset = true;
  497. }
  498. else
  499. {
  500. PUMP_DEBUG;
  501. // This chain needs more processing: just go to the next chain.
  502. ++run_chain;
  503. }
  504. }
  505. PUMP_DEBUG;
  506. // null out the chain
  507. mCurrentChain = mRunningChains.end();
  508. END_PUMP_DEBUG;
  509. }
  510. #if LL_PUMPIO_RESPOND
  511. bool LLPumpIO::respond(LLIOPipe* pipep)
  512. {
  513. if (!pipep)
  514. {
  515. return false;
  516. }
  517. LLChainInfo info;
  518. LLLinkInfo link;
  519. link.mPipe = pipep;
  520. info.mChainLinks.push_back(link);
  521. mPendingCallbacks.push_back(info);
  522. return true;
  523. }
  524. bool LLPumpIO::respond(const links_t& links, LLIOPipe::buffer_ptr_t datap,
  525. LLSD context)
  526. {
  527. // If the caller is providing a full link description, we need to have that
  528. // description matched to a particular buffer.
  529. if (!datap || links.empty())
  530. {
  531. return false;
  532. }
  533. // Add the callback response
  534. LLChainInfo info;
  535. info.mChainLinks = links;
  536. info.mData = datap;
  537. info.mContext = context;
  538. mPendingCallbacks.push_back(info);
  539. return true;
  540. }
  541. void LLPumpIO::callback()
  542. {
  543. {
  544. std::copy(mPendingCallbacks.begin(), mPendingCallbacks.end(),
  545. std::back_insert_iterator<callbacks_t>(mCallbacks));
  546. mPendingCallbacks.clear();
  547. }
  548. if (!mCallbacks.empty())
  549. {
  550. LL_FAST_TIMER(FTM_PUMP_CALLBACK_CHAIN);
  551. for (callbacks_t::iterator it = mCallbacks.begin(),
  552. end = mCallbacks.end();
  553. it != end; ++it)
  554. {
  555. // It is always the first and last time for respone chains
  556. it->mHead = it->mChainLinks.begin();
  557. it->mInit = true;
  558. it->mEOS = true;
  559. processChain(*it);
  560. }
  561. mCallbacks.clear();
  562. }
  563. }
  564. #endif
  565. void LLPumpIO::rebuildPollset()
  566. {
  567. if (mPollset)
  568. {
  569. apr_pollset_destroy(mPollset);
  570. mPollset = NULL;
  571. }
  572. U32 size = 0;
  573. running_chains_t::iterator run_it = mRunningChains.begin();
  574. running_chains_t::iterator run_end = mRunningChains.end();
  575. for ( ; run_it != run_end; ++run_it)
  576. {
  577. size += run_it->mDescriptors.size();
  578. }
  579. if (size)
  580. {
  581. // Recycle the memory pool
  582. constexpr S32 POLLSET_POOL_RECYCLE_COUNT = 100;
  583. if (mCurrentPool &&
  584. (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT) == 0)
  585. {
  586. apr_pool_destroy(mCurrentPool);
  587. mCurrentPool = NULL;
  588. mCurrentPoolReallocCount = 0;
  589. }
  590. if (!mCurrentPool)
  591. {
  592. apr_status_t status = apr_pool_create(&mCurrentPool, gAPRPoolp);
  593. (void)ll_apr_warn_status(status);
  594. }
  595. // Add all of the file descriptors
  596. run_it = mRunningChains.begin();
  597. LLChainInfo::conditionals_t::iterator fd_it;
  598. LLChainInfo::conditionals_t::iterator fd_end;
  599. apr_pollset_create(&mPollset, size, mCurrentPool, 0);
  600. for ( ; run_it != run_end; ++run_it)
  601. {
  602. fd_it = run_it->mDescriptors.begin();
  603. fd_end = run_it->mDescriptors.end();
  604. for ( ; fd_it != fd_end; ++fd_it)
  605. {
  606. apr_pollset_add(mPollset, &(fd_it->second));
  607. }
  608. }
  609. }
  610. }
  611. void LLPumpIO::processChain(LLChainInfo& chain)
  612. {
  613. PUMP_DEBUG;
  614. LLIOPipe::EStatus status = LLIOPipe::STATUS_OK;
  615. links_t::iterator it = chain.mHead;
  616. links_t::iterator end = chain.mChainLinks.end();
  617. if (it == end) return;
  618. bool need_process_signaled = false;
  619. bool keep_going = true;
  620. do
  621. {
  622. #if LL_DEBUG_PROCESS_LINK
  623. # if LL_DEBUG_PIPE_TYPE_IN_PUMP
  624. llinfos << "Processing " << typeid(*(it->mPipe)).name() << "."
  625. << llendl;
  626. # else
  627. llinfos << "Processing link " << it->mPipe << "." << llendl;
  628. # endif
  629. #endif
  630. #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN
  631. if (chain.mData)
  632. {
  633. char* buf = NULL;
  634. S32 bytes = chain.mData->countAfter(it->mChannels.in(), NULL);
  635. if (bytes)
  636. {
  637. buf = new char[bytes + 1];
  638. chain.mData->readAfter(it->mChannels.in(), NULL, (U8*)buf,
  639. bytes);
  640. buf[bytes] = '\0';
  641. llinfos << "CHANNEL IN(" << it->mChannels.in() << "): "
  642. << buf << llendl;
  643. delete[] buf;
  644. buf = NULL;
  645. }
  646. else
  647. {
  648. llinfos << "CHANNEL IN(" << it->mChannels.in()<< "): (null)"
  649. << llendl;
  650. }
  651. }
  652. #endif
  653. PUMP_DEBUG;
  654. status = it->mPipe->process(it->mChannels, chain.mData, chain.mEOS,
  655. chain.mContext, this);
  656. #if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT
  657. if (chain.mData)
  658. {
  659. char* buf = NULL;
  660. S32 bytes = chain.mData->countAfter(it->mChannels.out(), NULL);
  661. if (bytes)
  662. {
  663. buf = new char[bytes + 1];
  664. chain.mData->readAfter(it->mChannels.out(), NULL, (U8*)buf,
  665. bytes);
  666. buf[bytes] = '\0';
  667. llinfos << "CHANNEL OUT(" << it->mChannels.out()<< "): "
  668. << buf << llendl;
  669. delete[] buf;
  670. buf = NULL;
  671. }
  672. else
  673. {
  674. llinfos << "CHANNEL OUT(" << it->mChannels.out()<< "): (null)"
  675. << llendl;
  676. }
  677. }
  678. #endif
  679. #if LL_DEBUG_PROCESS_RETURN_VALUE
  680. // Only bother with the success codes - error codes are logged
  681. // below.
  682. if (LLIOPipe::isSuccess(status))
  683. {
  684. llinfos << "Pipe returned: '"
  685. # if LL_DEBUG_PIPE_TYPE_IN_PUMP
  686. << typeid(*(it->mPipe)).name() << "':'"
  687. # endif
  688. << LLIOPipe::lookupStatusString(status) << "'" << llendl;
  689. }
  690. #endif
  691. PUMP_DEBUG;
  692. switch (status)
  693. {
  694. case LLIOPipe::STATUS_OK:
  695. // no-op
  696. break;
  697. case LLIOPipe::STATUS_STOP:
  698. PUMP_DEBUG;
  699. status = LLIOPipe::STATUS_OK;
  700. chain.mHead = end;
  701. keep_going = false;
  702. break;
  703. case LLIOPipe::STATUS_DONE:
  704. PUMP_DEBUG;
  705. status = LLIOPipe::STATUS_OK;
  706. chain.mHead = (it + 1);
  707. chain.mEOS = true;
  708. break;
  709. case LLIOPipe::STATUS_BREAK:
  710. PUMP_DEBUG;
  711. status = LLIOPipe::STATUS_OK;
  712. keep_going = false;
  713. break;
  714. case LLIOPipe::STATUS_NEED_PROCESS:
  715. PUMP_DEBUG;
  716. status = LLIOPipe::STATUS_OK;
  717. if (!need_process_signaled)
  718. {
  719. need_process_signaled = true;
  720. chain.mHead = it;
  721. }
  722. break;
  723. default:
  724. {
  725. PUMP_DEBUG;
  726. if (LLIOPipe::isError(status))
  727. {
  728. llinfos << "Pump generated pipe err: '"
  729. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  730. << typeid(*(it->mPipe)).name() << "':'"
  731. #endif
  732. << LLIOPipe::lookupStatusString(status) << "'"
  733. << llendl;
  734. #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR
  735. if (chain.mData)
  736. {
  737. char* buf = NULL;
  738. S32 bytes = chain.mData->countAfter(it->mChannels.in(),
  739. NULL);
  740. if (bytes)
  741. {
  742. buf = new char[bytes + 1];
  743. chain.mData->readAfter(it->mChannels.in(), NULL,
  744. (U8*)buf, bytes);
  745. buf[bytes] = '\0';
  746. llinfos << "Input After Error: " << buf << llendl;
  747. delete[] buf;
  748. buf = NULL;
  749. }
  750. else
  751. {
  752. llinfos << "Input After Error: (null)" << llendl;
  753. }
  754. }
  755. else
  756. {
  757. llinfos << "Input After Error: (null)" << llendl;
  758. }
  759. #endif
  760. keep_going = false;
  761. chain.mHead = it;
  762. if (!handleChainError(chain, status))
  763. {
  764. chain.mHead = end;
  765. }
  766. }
  767. else
  768. {
  769. llinfos << "Unhandled status code: " << status << ":"
  770. << LLIOPipe::lookupStatusString(status) << llendl;
  771. }
  772. break;
  773. }
  774. }
  775. PUMP_DEBUG;
  776. }
  777. while (keep_going && ++it != end);
  778. PUMP_DEBUG;
  779. }
  780. bool LLPumpIO::handleChainError(LLChainInfo& chain,
  781. LLIOPipe::EStatus error)
  782. {
  783. links_t::reverse_iterator rit;
  784. if (chain.mHead == chain.mChainLinks.end())
  785. {
  786. rit = links_t::reverse_iterator(chain.mHead);
  787. }
  788. else
  789. {
  790. rit = links_t::reverse_iterator(chain.mHead + 1);
  791. }
  792. links_t::reverse_iterator rend = chain.mChainLinks.rend();
  793. bool handled = false;
  794. bool keep_going = true;
  795. do
  796. {
  797. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  798. LL_DEBUGS("PumpIO") << "Passing error to "
  799. << typeid(*(rit->mPipe)).name() << "."
  800. << LL_ENDL;
  801. #endif
  802. error = rit->mPipe->handleError(error, this);
  803. switch (error)
  804. {
  805. case LLIOPipe::STATUS_OK:
  806. handled = true;
  807. chain.mHead = rit.base();
  808. break;
  809. case LLIOPipe::STATUS_STOP:
  810. case LLIOPipe::STATUS_DONE:
  811. case LLIOPipe::STATUS_BREAK:
  812. case LLIOPipe::STATUS_NEED_PROCESS:
  813. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  814. LL_DEBUGS("PumpIO") << "Pipe "
  815. << typeid(*(rit->mPipe)).name()
  816. << " returned code to stop error handler."
  817. << LL_ENDL;
  818. #endif
  819. keep_going = false;
  820. break;
  821. case LLIOPipe::STATUS_EXPIRED:
  822. keep_going = false;
  823. break;
  824. default:
  825. if (LLIOPipe::isSuccess(error))
  826. {
  827. llinfos << "Unhandled status code: " << error << ":"
  828. << LLIOPipe::lookupStatusString(error) << llendl;
  829. error = LLIOPipe::STATUS_ERROR;
  830. keep_going = false;
  831. }
  832. break;
  833. }
  834. }
  835. while (keep_going && !handled && ++rit != rend);
  836. return handled;
  837. }
  838. ///////////////////////////////////////////////////////////////////////////////
  839. // LLPumpIO::LLChainInfo structure
  840. ///////////////////////////////////////////////////////////////////////////////
  841. LLPumpIO::LLChainInfo::LLChainInfo()
  842. : mInit(false),
  843. mEOS(false)
  844. {
  845. mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
  846. }
  847. void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout)
  848. {
  849. if (timeout > 0.f)
  850. {
  851. mTimer.start();
  852. mTimer.reset();
  853. mTimer.setTimerExpirySec(timeout);
  854. }
  855. else
  856. {
  857. mTimer.stop();
  858. }
  859. }
  860. void LLPumpIO::LLChainInfo::adjustTimeoutSeconds(F32 delta)
  861. {
  862. if (mTimer.getStarted())
  863. {
  864. mTimer.setExpiryAt(mTimer.expiresAt() + (F64)delta);
  865. }
  866. }