Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
request_pool.cc
1 #include "request_pool.h"
2 
3 #include <algorithm> // min
4 #include <chrono>
5 #include <exception>
6 #include <thread>
7 
8 #include "logging/logging.h"
9 
10 RequestPool::RequestPool(TreehubServer& server, const int max_curl_requests, const RunMode mode)
11  : rate_controller_(max_curl_requests), running_requests_(0), server_(server), mode_(mode), stopped_(false) {
12  curl_global_init(CURL_GLOBAL_DEFAULT);
13  multi_ = curl_multi_init();
14  curl_multi_setopt(multi_, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
15 }
16 
17 RequestPool::~RequestPool() {
18  try {
19  Abort();
20 
21  LOG_INFO << "Shutting down RequestPool...";
22  while (!is_idle()) {
23  LoopListen();
24  }
25  LOG_INFO << "...done";
26 
27  curl_multi_cleanup(multi_);
28  curl_global_cleanup();
29  } catch (std::exception& ex) {
30  LOG_ERROR << "Exception in RequestPool dtor: " << ex.what();
31  } catch (...) {
32  LOG_ERROR << "Unknown exception in RequestPool dtor";
33  }
34 }
35 
36 void RequestPool::AddQuery(const OSTreeObject::ptr& request) {
37  request->LaunchNotify();
38  if (!stopped_) {
39  query_queue_.push_back(request);
40  }
41 }
42 
43 void RequestPool::AddUpload(const OSTreeObject::ptr& request) {
44  request->LaunchNotify();
45  if (!stopped_) {
46  upload_queue_.push_back(request);
47  }
48 }
49 
50 void RequestPool::LoopLaunch() {
51  while (running_requests_ < rate_controller_.MaxConcurrency() && (!query_queue_.empty() || !upload_queue_.empty())) {
52  OSTreeObject::ptr cur;
53 
54  // Queries first, uploads second
55  if (query_queue_.empty()) {
56  cur = upload_queue_.front();
57  upload_queue_.pop_front();
58  cur->Upload(server_, multi_, mode_);
59  total_requests_made_++;
60  if (mode_ == RunMode::kDryRun || mode_ == RunMode::kWalkTree) {
61  // Don't send an actual upload message, just skip to the part where we
62  // acknowledge that the object has been uploaded.
63  cur->NotifyParents(*this);
64  }
65  } else {
66  cur = query_queue_.front();
67  query_queue_.pop_front();
68  cur->MakeTestRequest(server_, multi_);
69  total_requests_made_++;
70  }
71 
72  running_requests_++;
73  }
74 }
75 
76 void RequestPool::LoopListen() {
77  // For more information about the timeout logic, read these:
78  // https://curl.haxx.se/libcurl/c/curl_multi_timeout.html
79  // https://curl.haxx.se/libcurl/c/curl_multi_fdset.html
80  CURLMcode mc;
81  // Poll for IO
82  fd_set fdread, fdwrite, fdexcept;
83  int maxfd = 0;
84  FD_ZERO(&fdread);
85  FD_ZERO(&fdwrite);
86  FD_ZERO(&fdexcept);
87  long timeoutms = 0; // NOLINT(google-runtime-int)
88  mc = curl_multi_timeout(multi_, &timeoutms);
89  if (mc != CURLM_OK) {
90  throw std::runtime_error("curl_multi_timeout failed with error");
91  }
92  // If timeoutms is 0, "it means you should proceed immediately without waiting
93  // for anything".
94  if (timeoutms != 0) {
95  mc = curl_multi_fdset(multi_, &fdread, &fdwrite, &fdexcept, &maxfd);
96  if (mc != CURLM_OK) {
97  throw std::runtime_error("curl_multi_fdset failed with error");
98  }
99 
100  struct timeval timeout {};
101  if (maxfd != -1) {
102  // "Wait for activities no longer than the set timeout."
103  if (timeoutms == -1) {
104  // "You must not wait too long (more than a few seconds perhaps)".
105  timeout.tv_sec = 3;
106  timeout.tv_usec = 0;
107  } else {
108  timeout.tv_sec = timeoutms / 1000;
109  timeout.tv_usec = 1000 * (timeoutms % 1000);
110  }
111  if (select(maxfd + 1, &fdread, &fdwrite, &fdexcept, &timeout) < 0) {
112  throw std::runtime_error(std::string("select failed with error: ") + std::strerror(errno));
113  }
114  } else if (timeoutms > 0) {
115  // If maxfd == -1, then wait the lesser of timeoutms and 100 ms.
116  long nofd_timeoutms = std::min(timeoutms, static_cast<long>(100)); // NOLINT(google-runtime-int)
117  LOG_DEBUG << "Waiting " << nofd_timeoutms << " ms for curl";
118  timeout.tv_sec = 0;
119  timeout.tv_usec = 1000 * (nofd_timeoutms % 1000);
120  if (select(0, nullptr, nullptr, nullptr, &timeout) < 0) {
121  throw std::runtime_error(std::string("select failed with error: ") + std::strerror(errno));
122  }
123  }
124  }
125 
126  // Ask curl to handle IO
127  mc = curl_multi_perform(multi_, &running_requests_);
128  if (mc != CURLM_OK) {
129  throw std::runtime_error("curl_multi failed with error");
130  }
131  assert(running_requests_ >= 0);
132 
133  // Deal with any completed requests
134  int msgs_in_queue;
135  do {
136  CURLMsg* msg = curl_multi_info_read(multi_, &msgs_in_queue);
137  if ((msg != nullptr) && msg->msg == CURLMSG_DONE) {
138  OSTreeObject::ptr h = ostree_object_from_curl(msg->easy_handle);
139  h->CurlDone(multi_, *this);
140  const bool server_responded_ok = h->LastOperationResult() == ServerResponse::kOk;
141  const RateController::clock::time_point start_time = h->RequestStartTime();
142  const RateController::clock::time_point end_time = RateController::clock::now();
143  rate_controller_.RequestCompleted(start_time, end_time, server_responded_ok);
144  if (rate_controller_.ServerHasFailed()) {
145  Abort();
146  } else {
147  auto duration = rate_controller_.GetSleepTime();
148  if (duration > RateController::clock::duration(0)) {
149  LOG_DEBUG << "Sleeping for " << std::chrono::duration_cast<std::chrono::seconds>(duration).count()
150  << " seconds due to server congestion.";
151  std::this_thread::sleep_for(duration);
152  }
153  }
154  }
155  } while (msgs_in_queue > 0);
156 }
157 
159  LoopLaunch();
160  LoopListen();
161 }
162 // vim: set tabstop=2 shiftwidth=2 expandtab:
RunMode::kWalkTree
Walk the entire tree (without uploading).
RequestPool::Loop
void Loop()
One iteration of request-listen loop, launches multiple requests, then listens for the result.
Definition: request_pool.cc:158
TreehubServer
Definition: treehub_server.h:11
RunMode::kDryRun
Dry run.
RunMode
RunMode
Execution mode to run garage tools in.
Definition: garage_common.h:6