1 #include "request_pool.h"
8 #include "logging/logging.h"
11 : rate_controller_(max_curl_requests), running_requests_(0), server_(server), mode_(mode), stopped_(false) {
12 curl_global_init(CURL_GLOBAL_DEFAULT);
13 multi_ = curl_multi_init();
14 curl_multi_setopt(multi_, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
17 RequestPool::~RequestPool() {
21 LOG_INFO <<
"Shutting down RequestPool...";
25 LOG_INFO <<
"...done";
27 curl_multi_cleanup(multi_);
28 curl_global_cleanup();
29 }
catch (std::exception& ex) {
30 LOG_ERROR <<
"Exception in RequestPool dtor: " << ex.what();
32 LOG_ERROR <<
"Unknown exception in RequestPool dtor";
36 void RequestPool::AddQuery(
const OSTreeObject::ptr& request) {
37 request->LaunchNotify();
39 query_queue_.push_back(request);
43 void RequestPool::AddUpload(
const OSTreeObject::ptr& request) {
44 request->LaunchNotify();
46 upload_queue_.push_back(request);
50 void RequestPool::LoopLaunch() {
51 while (running_requests_ < rate_controller_.MaxConcurrency() && (!query_queue_.empty() || !upload_queue_.empty())) {
52 OSTreeObject::ptr cur;
55 if (query_queue_.empty()) {
56 cur = upload_queue_.front();
57 upload_queue_.pop_front();
58 cur->Upload(server_, multi_, mode_);
60 total_object_size_ += cur->GetSize();
64 cur->NotifyParents(*
this);
67 cur = query_queue_.front();
68 query_queue_.pop_front();
69 cur->MakeTestRequest(server_, multi_);
70 head_requests_made_++;
77 void RequestPool::LoopListen() {
91 mc = curl_multi_timeout(multi_, &timeoutms);
93 throw std::runtime_error(
"curl_multi_timeout failed with error");
98 mc = curl_multi_fdset(multi_, &fdread, &fdwrite, &fdexcept, &maxfd);
100 throw std::runtime_error(
"curl_multi_fdset failed with error");
103 struct timeval timeout {};
106 if (timeoutms == -1) {
111 timeout.tv_sec = timeoutms / 1000;
112 timeout.tv_usec = 1000 * (timeoutms % 1000);
114 if (select(maxfd + 1, &fdread, &fdwrite, &fdexcept, &timeout) < 0) {
115 throw std::runtime_error(std::string(
"select failed with error: ") + std::strerror(errno));
117 }
else if (timeoutms > 0) {
119 long nofd_timeoutms = std::min(timeoutms,
static_cast<long>(100));
120 LOG_DEBUG <<
"Waiting " << nofd_timeoutms <<
" ms for curl";
122 timeout.tv_usec = 1000 * (nofd_timeoutms % 1000);
123 if (select(0,
nullptr,
nullptr,
nullptr, &timeout) < 0) {
124 throw std::runtime_error(std::string(
"select failed with error: ") + std::strerror(errno));
130 mc = curl_multi_perform(multi_, &running_requests_);
131 if (mc != CURLM_OK) {
132 throw std::runtime_error(
"curl_multi failed with error");
134 assert(running_requests_ >= 0);
139 CURLMsg* msg = curl_multi_info_read(multi_, &msgs_in_queue);
140 if ((msg !=
nullptr) && msg->msg == CURLMSG_DONE) {
141 OSTreeObject::ptr h = ostree_object_from_curl(msg->easy_handle);
142 h->CurlDone(multi_, *
this);
143 const bool server_responded_ok = h->LastOperationResult() == ServerResponse::kOk;
144 const RateController::clock::time_point start_time = h->RequestStartTime();
145 const RateController::clock::time_point end_time = RateController::clock::now();
146 rate_controller_.RequestCompleted(start_time, end_time, server_responded_ok);
147 if (rate_controller_.ServerHasFailed()) {
150 auto duration = rate_controller_.GetSleepTime();
151 if (duration > RateController::clock::duration(0)) {
152 LOG_DEBUG <<
"Sleeping for " << std::chrono::duration_cast<std::chrono::seconds>(duration).count()
153 <<
" seconds due to server congestion.";
154 std::this_thread::sleep_for(duration);
158 }
while (msgs_in_queue > 0);