Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
reportqueue_test.cc
1 #include <gtest/gtest.h>
2 
3 #include <unistd.h>
4 #include <future>
5 #include <memory>
6 #include <string>
7 
8 #include <json/json.h>
9 
10 #include "httpfake.h"
11 #include "libaktualizr/config.h"
12 #include "reportqueue.h"
13 #include "storage/invstorage.h"
14 #include "storage/sqlstorage.h"
15 #include "utilities/utils.h"
16 
17 class HttpFakeRq : public HttpFake {
18  public:
19  HttpFakeRq(const boost::filesystem::path &test_dir_in, size_t expected_events)
20  : HttpFake(test_dir_in, ""), expected_events_(expected_events) {}
21 
22  HttpResponse handle_event(const std::string &url, const Json::Value &data) override {
23  (void)data;
24  if (url == "reportqueue/SingleEvent/events") {
25  EXPECT_EQ(data[0]["eventType"]["id"], "EcuDownloadCompleted");
26  EXPECT_EQ(data[0]["event"]["ecu"], "SingleEvent");
27  ++events_seen;
28  if (events_seen == expected_events_) {
29  expected_events_received.set_value(true);
30  }
31  return HttpResponse("", 200, CURLE_OK, "");
32  } else if (url.find("reportqueue/MultipleEvents") == 0) {
33  for (int i = 0; i < static_cast<int>(data.size()); ++i) {
34  EXPECT_EQ(data[i]["eventType"]["id"], "EcuDownloadCompleted");
35  EXPECT_EQ(data[i]["event"]["ecu"], "MultipleEvents" + std::to_string(events_seen++));
36  }
37  if (events_seen == expected_events_) {
38  expected_events_received.set_value(true);
39  }
40  return HttpResponse("", 200, CURLE_OK, "");
41  } else if (url.find("reportqueue/FailureRecovery") == 0) {
42  if (data.size() < 10) {
43  return HttpResponse("", 400, CURLE_OK, "");
44  } else {
45  for (int i = 0; i < static_cast<int>(data.size()); ++i) {
46  EXPECT_EQ(data[i]["eventType"]["id"], "EcuDownloadCompleted");
47  EXPECT_EQ(data[i]["event"]["ecu"], "FailureRecovery" + std::to_string(i));
48  }
49  events_seen = data.size();
50  if (events_seen == expected_events_) {
51  expected_events_received.set_value(true);
52  }
53  return HttpResponse("", 200, CURLE_OK, "");
54  }
55  } else if (url.find("reportqueue/StoreEvents") == 0) {
56  for (int i = 0; i < static_cast<int>(data.size()); ++i) {
57  EXPECT_EQ(data[i]["eventType"]["id"], "EcuDownloadCompleted");
58  EXPECT_EQ(data[i]["event"]["ecu"], "StoreEvents" + std::to_string(events_seen++));
59  }
60  if (events_seen == expected_events_) {
61  expected_events_received.set_value(true);
62  }
63  return HttpResponse("", 200, CURLE_OK, "");
64  }
65  LOG_ERROR << "Unexpected event: " << data;
66  return HttpResponse("", 400, CURLE_OK, "");
67  }
68 
69  size_t events_seen{0};
70  size_t expected_events_;
71  std::promise<bool> expected_events_received{};
72 };
73 
74 /* Test one event. */
75 TEST(ReportQueue, SingleEvent) {
76  TemporaryDirectory temp_dir;
77  Config config;
78  config.storage.path = temp_dir.Path();
79  config.tls.server = "reportqueue/SingleEvent";
80 
81  size_t num_events = 1;
82  auto http = std::make_shared<HttpFakeRq>(temp_dir.Path(), num_events);
83  auto sql_storage = std::make_shared<SQLStorage>(config.storage, false);
84  ReportQueue report_queue(config, http, sql_storage);
85 
86  report_queue.enqueue(std_::make_unique<EcuDownloadCompletedReport>(Uptane::EcuSerial("SingleEvent"), "", true));
87 
88  // Wait at most 30 seconds for the message to get processed.
89  http->expected_events_received.get_future().wait_for(std::chrono::seconds(20));
90  EXPECT_EQ(http->events_seen, num_events);
91 }
92 
93 /* Test ten events. */
94 TEST(ReportQueue, MultipleEvents) {
95  TemporaryDirectory temp_dir;
96  Config config;
97  config.storage.path = temp_dir.Path();
98  config.tls.server = "reportqueue/MultipleEvents";
99 
100  size_t num_events = 10;
101  auto http = std::make_shared<HttpFakeRq>(temp_dir.Path(), num_events);
102  auto sql_storage = std::make_shared<SQLStorage>(config.storage, false);
103  ReportQueue report_queue(config, http, sql_storage);
104 
105  for (int i = 0; i < 10; ++i) {
106  report_queue.enqueue(std_::make_unique<EcuDownloadCompletedReport>(
107  Uptane::EcuSerial("MultipleEvents" + std::to_string(i)), "", true));
108  }
109 
110  // Wait at most 30 seconds for the messages to get processed.
111  http->expected_events_received.get_future().wait_for(std::chrono::seconds(20));
112  EXPECT_EQ(http->events_seen, num_events);
113 }
114 
115 /* Test ten events, but the "server" returns an error the first nine times. The
116  * tenth time should succeed with an array of all ten events. */
117 TEST(ReportQueue, FailureRecovery) {
118  TemporaryDirectory temp_dir;
119  Config config;
120  config.storage.path = temp_dir.Path();
121  config.tls.server = "reportqueue/FailureRecovery";
122 
123  size_t num_events = 10;
124  auto http = std::make_shared<HttpFakeRq>(temp_dir.Path(), num_events);
125  auto sql_storage = std::make_shared<SQLStorage>(config.storage, false);
126  ReportQueue report_queue(config, http, sql_storage);
127 
128  for (size_t i = 0; i < num_events; ++i) {
129  report_queue.enqueue(std_::make_unique<EcuDownloadCompletedReport>(
130  Uptane::EcuSerial("FailureRecovery" + std::to_string(i)), "", true));
131  }
132 
133  // Wait at most 20 seconds for the messages to get processed.
134  http->expected_events_received.get_future().wait_for(std::chrono::seconds(20));
135  EXPECT_EQ(http->events_seen, num_events);
136 }
137 
138 /* Test persistent storage of unsent events in the database across
139  * ReportQueue instantiations. */
140 TEST(ReportQueue, StoreEvents) {
141  TemporaryDirectory temp_dir;
142  Config config;
143  config.storage.path = temp_dir.Path();
144  config.tls.server = "";
145 
146  auto sql_storage = std::make_shared<SQLStorage>(config.storage, false);
147  size_t num_events = 10;
148  auto check_sql = [sql_storage](size_t count) {
149  int64_t max_id = 0;
150  Json::Value report_array{Json::arrayValue};
151  sql_storage->loadReportEvents(&report_array, &max_id);
152  EXPECT_EQ(max_id, count);
153  };
154 
155  {
156  auto http = std::make_shared<HttpFakeRq>(temp_dir.Path(), num_events);
157  ReportQueue report_queue(config, http, sql_storage);
158  for (size_t i = 0; i < num_events; ++i) {
159  report_queue.enqueue(std_::make_unique<EcuDownloadCompletedReport>(
160  Uptane::EcuSerial("StoreEvents" + std::to_string(i)), "", true));
161  }
162  check_sql(num_events);
163  }
164 
165  config.tls.server = "reportqueue/StoreEvents";
166  auto http = std::make_shared<HttpFakeRq>(temp_dir.Path(), num_events);
167  ReportQueue report_queue(config, http, sql_storage);
168  // Wait at most 20 seconds for the messages to get processed.
169  http->expected_events_received.get_future().wait_for(std::chrono::seconds(20));
170  EXPECT_EQ(http->events_seen, num_events);
171  sleep(1);
172  check_sql(0);
173 }
174 
175 #ifndef __NO_MAIN__
176 int main(int argc, char **argv) {
177  ::testing::InitGoogleTest(&argc, argv);
178  logger_set_threshold(boost::log::trivial::trace);
179  return RUN_ALL_TESTS();
180 }
181 #endif
HttpFakeRq
Definition: reportqueue_test.cc:17
HttpFake
Definition: httpfake.h:20
data
General data structures.
Definition: types.h:217
HttpResponse
Definition: httpinterface.h:17
Config
Configuration object for an aktualizr instance running on a Primary ECU.
Definition: config.h:208
Uptane::EcuSerial
Definition: types.h:346
TemporaryDirectory
Definition: utils.h:82
ReportQueue
Definition: reportqueue.h:86