1 #include "request_pool.h"
8 #include "logging/logging.h"
11 : rate_controller_(max_curl_requests), running_requests_(0), server_(server), mode_(mode), stopped_(false) {
// Constructor body: sets up libcurl for this pool.
// Initialise libcurl's global state; the destructor calls curl_global_cleanup() to undo this.
12 curl_global_init(CURL_GLOBAL_DEFAULT);
// Create the multi handle that the other methods pass to curl_multi_* and to the request objects.
13 multi_ = curl_multi_init();
// Set the multi handle's pipelining option to both the HTTP/1 and the multiplex flag.
// NOTE(review): none of the three libcurl return values is checked here (curl_multi_init() can
// return a null handle) — confirm whether that is acceptable.
14 curl_multi_setopt(multi_, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
// Destructor: logs the shutdown, then releases the curl multi handle and libcurl's global state.
// The handlers at the end log exceptions; no rethrow is visible in them.
17 RequestPool::~RequestPool() {
21 LOG_INFO <<
"Shutting down RequestPool...";
25 LOG_INFO <<
"...done";
// Release the multi handle first, then the global state set up by curl_global_init().
27 curl_multi_cleanup(multi_);
28 curl_global_cleanup();
29 }
// NOTE(review): the exception is only read (ex.what()); catching by const reference would be the
// usual form.
catch (std::exception& ex) {
30 LOG_ERROR <<
"Exception in RequestPool dtor: " << ex.what();
32 LOG_ERROR <<
"Unknown exception in RequestPool dtor";
// Registers an object for the query phase.
// The statements here call LaunchNotify() on the request and push it onto the back of
// query_queue_; LoopLaunch() pops entries from the front of that queue and calls
// MakeTestRequest() on them.
// @param request  the object to enqueue, passed as a const reference to OSTreeObject::ptr.
36 void RequestPool::AddQuery(
const OSTreeObject::ptr& request) {
37 request->LaunchNotify();
39 query_queue_.push_back(request);
// Registers an object for the upload phase.
// The statements here call LaunchNotify() on the request and push it onto the back of
// upload_queue_; LoopLaunch() pops entries from the front of that queue and calls Upload()
// on them.
// @param request  the object to enqueue, passed as a const reference to OSTreeObject::ptr.
43 void RequestPool::AddUpload(
const OSTreeObject::ptr& request) {
44 request->LaunchNotify();
46 upload_queue_.push_back(request);
// Starts queued requests on the curl multi handle.
// Keeps launching while running_requests_ is below rate_controller_.MaxConcurrency() and at
// least one of query_queue_ / upload_queue_ still has entries.
50 void RequestPool::LoopLaunch() {
51 while (running_requests_ < rate_controller_.MaxConcurrency() && (!query_queue_.empty() || !upload_queue_.empty())) {
52 OSTreeObject::ptr cur;
// Queries take priority: an upload is only dequeued once query_queue_ is empty.
55 if (query_queue_.empty()) {
56 cur = upload_queue_.front();
57 upload_queue_.pop_front();
58 cur->Upload(server_, multi_, mode_);
// Accumulate the reported size of each object handed to Upload().
60 total_object_size_ += cur->GetSize();
64 cur->NotifyParents(*
this);
// Query path: take the oldest queued query, issue its test request against server_, and count
// it in head_requests_made_.
67 cur = query_queue_.front();
68 query_queue_.pop_front();
69 cur->MakeTestRequest(server_, multi_);
70 head_requests_made_++;
// Waits for activity on the curl multi handle, lets libcurl advance its transfers, and then
// processes the completion messages libcurl has queued.
// For every completed transfer the owning OSTreeObject is told via CurlDone(), the request's
// start/end time and success flag are reported to rate_controller_, and the thread sleeps for
// whatever back-off rate_controller_.GetSleepTime() returns when that is positive.
// @throws std::runtime_error when curl_multi_timeout, curl_multi_fdset, select or
//         curl_multi_perform reports a failure.
77 void RequestPool::LoopListen() {
83 fd_set fdread, fdwrite, fdexcept;
// Ask libcurl for the longest time, in milliseconds, to wait before calling
// curl_multi_perform() again.
89 mc = curl_multi_timeout(multi_, &timeoutms);
91 throw std::runtime_error(
"curl_multi_timeout failed with error");
// Collect the descriptors libcurl wants watched; maxfd receives the highest one.
96 mc = curl_multi_fdset(multi_, &fdread, &fdwrite, &fdexcept, &maxfd);
98 throw std::runtime_error(
"curl_multi_fdset failed with error");
101 struct timeval timeout {};
// libcurl documents -1 from curl_multi_timeout() as "no timeout set".
104 if (timeoutms == -1) {
// Split the millisecond timeout into the seconds / microseconds fields of struct timeval.
109 timeout.tv_sec = timeoutms / 1000;
110 timeout.tv_usec = 1000 * (timeoutms % 1000);
// Block until one of the collected descriptors is ready or the timeout elapses.
112 if (select(maxfd + 1, &fdread, &fdwrite, &fdexcept, &timeout) < 0) {
113 throw std::runtime_error(std::string(
"select failed with error: ") + std::strerror(errno));
115 }
else if (timeoutms > 0) {
// Cap the wait at 100 ms.
117 long nofd_timeoutms = std::min(timeoutms, static_cast<long>(100));
118 LOG_DEBUG <<
"Waiting " << nofd_timeoutms <<
" ms for curl";
120 timeout.tv_usec = 1000 * (nofd_timeoutms % 1000);
// select() with no descriptor sets is used here purely as a timed wait.
121 if (select(0,
nullptr,
nullptr,
nullptr, &timeout) < 0) {
122 throw std::runtime_error(std::string(
"select failed with error: ") + std::strerror(errno));
// Let libcurl make progress on its transfers; it stores the number of transfers still running
// in running_requests_.
128 mc = curl_multi_perform(multi_, &running_requests_);
129 if (mc != CURLM_OK) {
130 throw std::runtime_error(
"curl_multi failed with error");
132 assert(running_requests_ >= 0);
// Read one message from libcurl's queue; msgs_in_queue receives how many remain, which drives
// the while condition at the end of this block.
137 CURLMsg* msg = curl_multi_info_read(multi_, &msgs_in_queue);
138 if ((msg !=
nullptr) && msg->msg == CURLMSG_DONE) {
// Map the finished easy handle back to its OSTreeObject and let it handle the completion.
139 OSTreeObject::ptr h = ostree_object_from_curl(msg->easy_handle);
140 h->CurlDone(multi_, *
this);
// Report how long the request took, and whether the result was ServerResponse::kOk, to the
// rate controller.
141 const bool server_responded_ok = h->LastOperationResult() == ServerResponse::kOk;
142 const RateController::clock::time_point start_time = h->RequestStartTime();
143 const RateController::clock::time_point end_time = RateController::clock::now();
144 rate_controller_.RequestCompleted(start_time, end_time, server_responded_ok);
145 if (rate_controller_.ServerHasFailed()) {
// Back off for the duration the rate controller asks for, if any. The log message converts the
// duration to whole seconds, so a sub-second sleep is reported as 0 seconds.
148 auto duration = rate_controller_.GetSleepTime();
149 if (duration > RateController::clock::duration(0)) {
150 LOG_DEBUG <<
"Sleeping for " << std::chrono::duration_cast<std::chrono::seconds>(duration).count()
151 <<
" seconds due to server congestion.";
152 std::this_thread::sleep_for(duration);
156 }
while (msgs_in_queue > 0);