Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
request_pool.cc
1 #include "request_pool.h"
2 
3 #include <algorithm> // min
4 #include <chrono>
5 #include <exception>
6 #include <thread>
7 
8 #include "logging/logging.h"
9 
10 RequestPool::RequestPool(TreehubServer& server, const int max_curl_requests, const RunMode mode)
11  : rate_controller_(max_curl_requests), running_requests_(0), server_(server), mode_(mode), stopped_(false) {
12  curl_global_init(CURL_GLOBAL_DEFAULT);
13  multi_ = curl_multi_init();
14  curl_multi_setopt(multi_, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
15 }
16 
17 RequestPool::~RequestPool() {
18  try {
19  Abort();
20 
21  LOG_INFO << "Shutting down RequestPool...";
22  while (!is_idle()) {
23  LoopListen();
24  }
25  LOG_INFO << "...done";
26 
27  curl_multi_cleanup(multi_);
28  curl_global_cleanup();
29  } catch (std::exception& ex) {
30  LOG_ERROR << "Exception in RequestPool dtor: " << ex.what();
31  } catch (...) {
32  LOG_ERROR << "Unknown exception in RequestPool dtor";
33  }
34 }
35 
36 void RequestPool::AddQuery(const OSTreeObject::ptr& request) {
37  request->LaunchNotify();
38  if (!stopped_) {
39  query_queue_.push_back(request);
40  }
41 }
42 
43 void RequestPool::AddUpload(const OSTreeObject::ptr& request) {
44  request->LaunchNotify();
45  if (!stopped_) {
46  upload_queue_.push_back(request);
47  }
48 }
49 
50 void RequestPool::LoopLaunch() {
51  while (running_requests_ < rate_controller_.MaxConcurrency() && (!query_queue_.empty() || !upload_queue_.empty())) {
52  OSTreeObject::ptr cur;
53 
54  // Queries first, uploads second
55  if (query_queue_.empty()) {
56  cur = upload_queue_.front();
57  upload_queue_.pop_front();
58  cur->Upload(server_, multi_, mode_);
59  put_requests_made_++;
60  total_object_size_ += cur->GetSize();
61  if (mode_ == RunMode::kDryRun || mode_ == RunMode::kWalkTree) {
62  // Don't send an actual upload message, just skip to the part where we
63  // acknowledge that the object has been uploaded.
64  cur->NotifyParents(*this);
65  }
66  } else {
67  cur = query_queue_.front();
68  query_queue_.pop_front();
69  cur->MakeTestRequest(server_, multi_);
70  head_requests_made_++;
71  }
72 
73  running_requests_++;
74  }
75 }
76 
77 void RequestPool::LoopListen() {
78  // For more information about the timeout logic, read these:
79  // https://curl.haxx.se/libcurl/c/curl_multi_timeout.html
80  // https://curl.haxx.se/libcurl/c/curl_multi_fdset.html
81  CURLMcode mc;
82  // Poll for IO
83  fd_set fdread;
84  fd_set fdwrite;
85  fd_set fdexcept;
86  int maxfd = 0;
87  FD_ZERO(&fdread); // NOLINT(readability-isolate-declaration)
88  FD_ZERO(&fdwrite); // NOLINT(readability-isolate-declaration)
89  FD_ZERO(&fdexcept); // NOLINT(readability-isolate-declaration)
90  long timeoutms = 0; // NOLINT(google-runtime-int)
91  mc = curl_multi_timeout(multi_, &timeoutms);
92  if (mc != CURLM_OK) {
93  throw std::runtime_error("curl_multi_timeout failed with error");
94  }
95  // If timeoutms is 0, "it means you should proceed immediately without waiting
96  // for anything".
97  if (timeoutms != 0) {
98  mc = curl_multi_fdset(multi_, &fdread, &fdwrite, &fdexcept, &maxfd);
99  if (mc != CURLM_OK) {
100  throw std::runtime_error("curl_multi_fdset failed with error");
101  }
102 
103  struct timeval timeout {};
104  if (maxfd != -1) {
105  // "Wait for activities no longer than the set timeout."
106  if (timeoutms == -1) {
107  // "You must not wait too long (more than a few seconds perhaps)".
108  timeout.tv_sec = 3;
109  timeout.tv_usec = 0;
110  } else {
111  timeout.tv_sec = timeoutms / 1000;
112  timeout.tv_usec = 1000 * (timeoutms % 1000);
113  }
114  if (select(maxfd + 1, &fdread, &fdwrite, &fdexcept, &timeout) < 0) {
115  throw std::runtime_error(std::string("select failed with error: ") + std::strerror(errno));
116  }
117  } else if (timeoutms > 0) {
118  // If maxfd == -1, then wait the lesser of timeoutms and 100 ms.
119  long nofd_timeoutms = std::min(timeoutms, static_cast<long>(100)); // NOLINT(google-runtime-int)
120  LOG_DEBUG << "Waiting " << nofd_timeoutms << " ms for curl";
121  timeout.tv_sec = 0;
122  timeout.tv_usec = 1000 * (nofd_timeoutms % 1000);
123  if (select(0, nullptr, nullptr, nullptr, &timeout) < 0) {
124  throw std::runtime_error(std::string("select failed with error: ") + std::strerror(errno));
125  }
126  }
127  }
128 
129  // Ask curl to handle IO
130  mc = curl_multi_perform(multi_, &running_requests_);
131  if (mc != CURLM_OK) {
132  throw std::runtime_error("curl_multi failed with error");
133  }
134  assert(running_requests_ >= 0);
135 
136  // Deal with any completed requests
137  int msgs_in_queue;
138  do {
139  CURLMsg* msg = curl_multi_info_read(multi_, &msgs_in_queue);
140  if ((msg != nullptr) && msg->msg == CURLMSG_DONE) {
141  OSTreeObject::ptr h = ostree_object_from_curl(msg->easy_handle);
142  h->CurlDone(multi_, *this);
143  const bool server_responded_ok = h->LastOperationResult() == ServerResponse::kOk;
144  const RateController::clock::time_point start_time = h->RequestStartTime();
145  const RateController::clock::time_point end_time = RateController::clock::now();
146  rate_controller_.RequestCompleted(start_time, end_time, server_responded_ok);
147  if (rate_controller_.ServerHasFailed()) {
148  Abort();
149  } else {
150  auto duration = rate_controller_.GetSleepTime();
151  if (duration > RateController::clock::duration(0)) {
152  LOG_DEBUG << "Sleeping for " << std::chrono::duration_cast<std::chrono::seconds>(duration).count()
153  << " seconds due to server congestion.";
154  std::this_thread::sleep_for(duration);
155  }
156  }
157  }
158  } while (msgs_in_queue > 0);
159 }
160 
162  LoopLaunch();
163  LoopListen();
164 }
165 // vim: set tabstop=2 shiftwidth=2 expandtab:
RunMode::kWalkTree
@ kWalkTree
Walk the entire tree (without uploading).
RequestPool::Loop
void Loop()
One iteration of request-listen loop, launches multiple requests, then listens for the result.
Definition: request_pool.cc:161
TreehubServer
Definition: treehub_server.h:11
RunMode::kDryRun
@ kDryRun
Dry run.
RunMode
RunMode
Execution mode to run garage tools in.
Definition: garage_common.h:6