Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
reportqueue_test.cc
1 #include <gtest/gtest.h>
2 
3 #include <unistd.h>
4 #include <future>
5 #include <memory>
6 #include <string>
7 
8 #include <json/json.h>
9 
10 #include "config/config.h"
11 #include "httpfake.h"
12 #include "reportqueue.h"
13 #include "storage/invstorage.h"
14 #include "storage/sqlstorage.h"
15 #include "utilities/types.h" // TimeStamp
16 #include "utilities/utils.h"
17 
18 class HttpFakeRq : public HttpFake {
19  public:
20  HttpFakeRq(const boost::filesystem::path &test_dir_in, size_t expected_events)
21  : HttpFake(test_dir_in, ""), expected_events_(expected_events) {}
22 
23  HttpResponse handle_event(const std::string &url, const Json::Value &data) override {
24  (void)data;
25  if (url == "reportqueue/SingleEvent/events") {
26  EXPECT_EQ(data[0]["eventType"]["id"], "EcuDownloadCompleted");
27  EXPECT_EQ(data[0]["event"]["ecu"], "SingleEvent");
28  ++events_seen;
29  if (events_seen == expected_events_) {
30  expected_events_received.set_value(true);
31  }
32  return HttpResponse("", 200, CURLE_OK, "");
33  } else if (url.find("reportqueue/MultipleEvents") == 0) {
34  for (int i = 0; i < static_cast<int>(data.size()); ++i) {
35  EXPECT_EQ(data[i]["eventType"]["id"], "EcuDownloadCompleted");
36  EXPECT_EQ(data[i]["event"]["ecu"], "MultipleEvents" + std::to_string(events_seen++));
37  }
38  if (events_seen == expected_events_) {
39  expected_events_received.set_value(true);
40  }
41  return HttpResponse("", 200, CURLE_OK, "");
42  } else if (url.find("reportqueue/FailureRecovery") == 0) {
43  if (data.size() < 10) {
44  return HttpResponse("", 400, CURLE_OK, "");
45  } else {
46  for (int i = 0; i < static_cast<int>(data.size()); ++i) {
47  EXPECT_EQ(data[i]["eventType"]["id"], "EcuDownloadCompleted");
48  EXPECT_EQ(data[i]["event"]["ecu"], "FailureRecovery" + std::to_string(i));
49  }
50  events_seen = data.size();
51  if (events_seen == expected_events_) {
52  expected_events_received.set_value(true);
53  }
54  return HttpResponse("", 200, CURLE_OK, "");
55  }
56  } else if (url.find("reportqueue/StoreEvents") == 0) {
57  for (int i = 0; i < static_cast<int>(data.size()); ++i) {
58  EXPECT_EQ(data[i]["eventType"]["id"], "EcuDownloadCompleted");
59  EXPECT_EQ(data[i]["event"]["ecu"], "StoreEvents" + std::to_string(events_seen++));
60  }
61  if (events_seen == expected_events_) {
62  expected_events_received.set_value(true);
63  }
64  return HttpResponse("", 200, CURLE_OK, "");
65  }
66  LOG_ERROR << "Unexpected event: " << data;
67  return HttpResponse("", 400, CURLE_OK, "");
68  }
69 
70  size_t events_seen{0};
71  size_t expected_events_;
72  std::promise<bool> expected_events_received{};
73 };
74 
75 /* Test one event. */
76 TEST(ReportQueue, SingleEvent) {
77  TemporaryDirectory temp_dir;
78  Config config;
79  config.storage.path = temp_dir.Path();
80  config.tls.server = "reportqueue/SingleEvent";
81 
82  size_t num_events = 1;
83  auto http = std::make_shared<HttpFakeRq>(temp_dir.Path(), num_events);
84  auto sql_storage = std::make_shared<SQLStorage>(config.storage, false);
85  ReportQueue report_queue(config, http, sql_storage);
86 
87  report_queue.enqueue(std_::make_unique<EcuDownloadCompletedReport>(Uptane::EcuSerial("SingleEvent"), "", true));
88 
89  // Wait at most 30 seconds for the message to get processed.
90  http->expected_events_received.get_future().wait_for(std::chrono::seconds(20));
91  EXPECT_EQ(http->events_seen, num_events);
92 }
93 
94 /* Test ten events. */
95 TEST(ReportQueue, MultipleEvents) {
96  TemporaryDirectory temp_dir;
97  Config config;
98  config.storage.path = temp_dir.Path();
99  config.tls.server = "reportqueue/MultipleEvents";
100 
101  size_t num_events = 10;
102  auto http = std::make_shared<HttpFakeRq>(temp_dir.Path(), num_events);
103  auto sql_storage = std::make_shared<SQLStorage>(config.storage, false);
104  ReportQueue report_queue(config, http, sql_storage);
105 
106  for (int i = 0; i < 10; ++i) {
107  report_queue.enqueue(std_::make_unique<EcuDownloadCompletedReport>(
108  Uptane::EcuSerial("MultipleEvents" + std::to_string(i)), "", true));
109  }
110 
111  // Wait at most 30 seconds for the messages to get processed.
112  http->expected_events_received.get_future().wait_for(std::chrono::seconds(20));
113  EXPECT_EQ(http->events_seen, num_events);
114 }
115 
116 /* Test ten events, but the "server" returns an error the first nine times. The
117  * tenth time should succeed with an array of all ten events. */
118 TEST(ReportQueue, FailureRecovery) {
119  TemporaryDirectory temp_dir;
120  Config config;
121  config.storage.path = temp_dir.Path();
122  config.tls.server = "reportqueue/FailureRecovery";
123 
124  size_t num_events = 10;
125  auto http = std::make_shared<HttpFakeRq>(temp_dir.Path(), num_events);
126  auto sql_storage = std::make_shared<SQLStorage>(config.storage, false);
127  ReportQueue report_queue(config, http, sql_storage);
128 
129  for (size_t i = 0; i < num_events; ++i) {
130  report_queue.enqueue(std_::make_unique<EcuDownloadCompletedReport>(
131  Uptane::EcuSerial("FailureRecovery" + std::to_string(i)), "", true));
132  }
133 
134  // Wait at most 20 seconds for the messages to get processed.
135  http->expected_events_received.get_future().wait_for(std::chrono::seconds(20));
136  EXPECT_EQ(http->events_seen, num_events);
137 }
138 
139 /* Test persistent storage of unsent events in the database across
140  * ReportQueue instantiations. */
141 TEST(ReportQueue, StoreEvents) {
142  TemporaryDirectory temp_dir;
143  Config config;
144  config.storage.path = temp_dir.Path();
145  config.tls.server = "";
146 
147  auto sql_storage = std::make_shared<SQLStorage>(config.storage, false);
148  size_t num_events = 10;
149  auto check_sql = [sql_storage](size_t count) {
150  int64_t max_id = 0;
151  Json::Value report_array{Json::arrayValue};
152  sql_storage->loadReportEvents(&report_array, &max_id);
153  EXPECT_EQ(max_id, count);
154  };
155 
156  {
157  auto http = std::make_shared<HttpFakeRq>(temp_dir.Path(), num_events);
158  ReportQueue report_queue(config, http, sql_storage);
159  for (size_t i = 0; i < num_events; ++i) {
160  report_queue.enqueue(std_::make_unique<EcuDownloadCompletedReport>(
161  Uptane::EcuSerial("StoreEvents" + std::to_string(i)), "", true));
162  }
163  check_sql(num_events);
164  }
165 
166  config.tls.server = "reportqueue/StoreEvents";
167  auto http = std::make_shared<HttpFakeRq>(temp_dir.Path(), num_events);
168  ReportQueue report_queue(config, http, sql_storage);
169  // Wait at most 20 seconds for the messages to get processed.
170  http->expected_events_received.get_future().wait_for(std::chrono::seconds(20));
171  EXPECT_EQ(http->events_seen, num_events);
172  sleep(1);
173  check_sql(0);
174 }
175 
176 #ifndef __NO_MAIN__
177 int main(int argc, char **argv) {
178  ::testing::InitGoogleTest(&argc, argv);
179  logger_set_threshold(boost::log::trivial::trace);
180  return RUN_ALL_TESTS();
181 }
182 #endif
General data structures.
Definition: types.cc:55
Configuration object for an aktualizr instance running on a Primary ECU.
Definition: config.h:74