1 #include "request_pool.h"
8 #include "logging/logging.h"
// RequestPool constructor: member-initializer list and libcurl setup.
// NOTE(review): the signature line is not visible in this view; the
// initializer list reads `max_curl_requests`, `server` and `mode`, which are
// presumably the constructor parameters - confirm against the header.
//
// Initial state: the rate controller is built from max_curl_requests, no
// requests are running, and the pool is not stopped.
11 : rate_controller_(max_curl_requests), running_requests_(0), server_(server), mode_(mode), stopped_(false) {
// Initialize libcurl's process-wide state before creating any handle.
// The return value is not inspected here.
12 curl_global_init(CURL_GLOBAL_DEFAULT);
// Create the multi handle that every transfer in this pool is attached to.
// The result is not checked for null in the visible lines.
13 multi_ = curl_multi_init();
// Request both HTTP/1 pipelining and HTTP/2 multiplexing on the multi handle.
// NOTE(review): libcurl documents CURLPIPE_HTTP1 as having no effect since
// 7.62.0 - confirm whether it is still wanted.
14 curl_multi_setopt(multi_, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
// Destructor: logs the shutdown, releases the curl multi handle and libcurl's
// global state, and logs exceptions instead of letting them leave the
// destructor.
// NOTE(review): several lines of this definition are not visible in this view
// (presumably the opening `try`, whatever runs between the two log messages,
// and the `catch (...)` line) - confirm against the full file.
17 RequestPool::~RequestPool() {
21 LOG_INFO <<
"Shutting down RequestPool...";
25 LOG_INFO <<
"...done";
// Tear down in the reverse order of the constructor's setup: the multi handle
// first, then libcurl's global state.
27 curl_multi_cleanup(multi_);
28 curl_global_cleanup();
29 }
catch (std::exception& ex) {
// Standard exceptions are reported together with their what() message.
30 LOG_ERROR <<
"Exception in RequestPool dtor: " << ex.what();
// Fallback message for anything that is not a std::exception; presumably
// inside a `catch (...)` handler whose opening line is not visible here.
32 LOG_ERROR <<
"Unknown exception in RequestPool dtor";
// Registers `request` with the pool as a query.
//
// @param request  Object to query; LaunchNotify() is called on it before it
//                 is appended to the back of query_queue_.
// NOTE(review): a line between the two statements is not visible in this
// view, so the push may be conditional - confirm against the full file.
36 void RequestPool::AddQuery(
const OSTreeObject::ptr& request) {
37 request->LaunchNotify();
39 query_queue_.push_back(request);
// Registers `request` with the pool as an upload.
//
// @param request  Object to upload; LaunchNotify() is called on it before it
//                 is appended to the back of upload_queue_.
// NOTE(review): a line between the two statements is not visible in this
// view, so the push may be conditional - confirm against the full file.
43 void RequestPool::AddUpload(
const OSTreeObject::ptr& request) {
44 request->LaunchNotify();
46 upload_queue_.push_back(request);
// Starts queued work on the curl multi handle until either the rate
// controller's concurrency limit is reached or both queues are empty.
//
// Each iteration takes one object from the front of a queue. The upload queue
// is only consulted when the query queue is empty, so pending queries are
// started before pending uploads.
// NOTE(review): the condition guarding NotifyParents(), the `else` that
// introduces the query branch, and the tail of the loop are not visible in
// this view - confirm against the full file.
50 void RequestPool::LoopLaunch() {
51 while (running_requests_ < rate_controller_.MaxConcurrency() && (!query_queue_.empty() || !upload_queue_.empty())) {
52 OSTreeObject::ptr cur;
// No queries pending: start the oldest queued upload.
55 if (query_queue_.empty()) {
56 cur = upload_queue_.front();
57 upload_queue_.pop_front();
58 cur->Upload(server_, multi_, mode_);
59 total_requests_made_++;
// Hands this pool to the object so it can notify its parents.
// presumably only reached under a condition on a line that is not visible.
63 cur->NotifyParents(*
this);
// Query branch: start the oldest queued query.
66 cur = query_queue_.front();
67 query_queue_.pop_front();
68 cur->MakeTestRequest(server_, multi_);
69 total_requests_made_++;
// Waits for activity on the curl multi handle, drives the transfers, and
// processes every transfer that has completed.
//
// Phases visible here:
//   1. Ask curl how long to wait and which file descriptors to watch.
//   2. Block in select() on those descriptors or on a short timeout.
//   3. Call curl_multi_perform(), which updates running_requests_.
//   4. Drain curl's message queue; for each finished transfer, notify the
//      object, feed the outcome to the rate controller, and sleep if the
//      rate controller asks for it.
//
// Throws std::runtime_error when curl_multi_timeout, curl_multi_fdset,
// select or curl_multi_perform report failure.
// NOTE(review): many lines of this definition are not visible in this view
// (local declarations, the checks guarding the first two throws, the body of
// the `timeoutms == -1` branch, the opening `do {`, and the body of the
// ServerHasFailed() branch) - confirm against the full file.
76 void RequestPool::LoopListen() {
82 fd_set fdread, fdwrite, fdexcept;
// Ask curl how long it is willing to wait before it must be called again.
88 mc = curl_multi_timeout(multi_, &timeoutms);
90 throw std::runtime_error(
"curl_multi_timeout failed with error");
// Collect the descriptors curl wants watched; maxfd receives the highest one.
95 mc = curl_multi_fdset(multi_, &fdread, &fdwrite, &fdexcept, &maxfd);
97 throw std::runtime_error(
"curl_multi_fdset failed with error");
// Value-initialized, so tv_sec and tv_usec both start at zero.
100 struct timeval timeout {};
// libcurl reports -1 when it has no timeout set; the handling of that case is
// on lines not visible here.
103 if (timeoutms == -1) {
// Split the millisecond timeout into whole seconds plus microseconds.
108 timeout.tv_sec = timeoutms / 1000;
109 timeout.tv_usec = 1000 * (timeoutms % 1000);
// Block until one of curl's descriptors is ready or the timeout expires.
111 if (select(maxfd + 1, &fdread, &fdwrite, &fdexcept, &timeout) < 0) {
112 throw std::runtime_error(std::string(
"select failed with error: ") + std::strerror(errno));
114 }
else if (timeoutms > 0) {
// Wait without descriptors, capped at 100 ms.
116 long nofd_timeoutms = std::min(timeoutms, static_cast<long>(100));
117 LOG_DEBUG <<
"Waiting " << nofd_timeoutms <<
" ms for curl";
// Only tv_usec is set: the wait is at most 100 ms, so the zero-initialized
// tv_sec is already correct.
119 timeout.tv_usec = 1000 * (nofd_timeoutms % 1000);
// select() with no descriptor sets acts as a plain timed wait.
120 if (select(0,
nullptr,
nullptr,
nullptr, &timeout) < 0) {
121 throw std::runtime_error(std::string(
"select failed with error: ") + std::strerror(errno));
// Let curl make progress; it writes the number of still-running transfers
// into running_requests_.
127 mc = curl_multi_perform(multi_, &running_requests_);
128 if (mc != CURLM_OK) {
129 throw std::runtime_error(
"curl_multi failed with error");
131 assert(running_requests_ >= 0);
// Read one message from curl's queue; msgs_in_queue receives the number of
// messages still pending and drives the loop condition at the end.
136 CURLMsg* msg = curl_multi_info_read(multi_, &msgs_in_queue);
// Only completed transfers are handled.
137 if ((msg !=
nullptr) && msg->msg == CURLMSG_DONE) {
// Look up the object that owns this easy handle and let it finish up.
138 OSTreeObject::ptr h = ostree_object_from_curl(msg->easy_handle);
139 h->CurlDone(multi_, *
this);
// Report the request's start time, completion time and success flag to the
// rate controller.
140 const bool server_responded_ok = h->LastOperationResult() == ServerResponse::kOk;
141 const RateController::clock::time_point start_time = h->RequestStartTime();
142 const RateController::clock::time_point end_time = RateController::clock::now();
143 rate_controller_.RequestCompleted(start_time, end_time, server_responded_ok);
// The body of this branch is on lines not visible here.
144 if (rate_controller_.ServerHasFailed()) {
// Back off when the rate controller requests a pause; sleep_for blocks the
// calling thread for that duration.
147 auto duration = rate_controller_.GetSleepTime();
148 if (duration > RateController::clock::duration(0)) {
149 LOG_DEBUG <<
"Sleeping for " << std::chrono::duration_cast<std::chrono::seconds>(duration).count()
150 <<
" seconds due to server congestion.";
151 std::this_thread::sleep_for(duration);
155 }
while (msgs_in_queue > 0);