Aktualizr
C++ SOTA Client
secondary.cc
1 #include <boost/asio/deadline_timer.hpp>
2 #include <boost/asio/io_service.hpp>
3 #include <boost/asio/ip/tcp.hpp>
4 #include <boost/asio/placeholders.hpp>
5 #include <boost/bind.hpp>
6 
7 #include <algorithm>
8 #include <unordered_map>
9 #include <unordered_set>
10 
11 #include "ipuptanesecondary.h"
12 #include "logging/logging.h"
13 #include "secondary.h"
14 #include "secondary_config.h"
15 #include "utilities/utils.h"
16 
17 namespace Primary {
18 
// A list of shared Secondary handles, as returned by one factory call.
19 using Secondaries = std::vector<std::shared_ptr<SecondaryInterface>>;
// Maps a string key to the factory function that builds the Secondaries for a given configuration.
20 using SecondaryFactoryRegistry =
21  std::unordered_map<std::string, std::function<Secondaries(const SecondaryConfig&, Aktualizr& aktualizr)>>;
22 
// Forward declaration: the factory registry below refers to this function before its definition.
23 static Secondaries createIPSecondaries(const IPSecondariesConfig& config, Aktualizr& aktualizr);
24 
25 // NOLINTNEXTLINE(cppcoreguidelines-interfaces-global-init)
26 static SecondaryFactoryRegistry sec_factory_registry = {
27  {IPSecondariesConfig::Type,
28  [](const SecondaryConfig& config, Aktualizr& aktualizr) {
29  auto ip_sec_cgf = dynamic_cast<const IPSecondariesConfig&>(config);
30  return createIPSecondaries(ip_sec_cgf, aktualizr);
31  }},
32  {VirtualSecondaryConfig::Type,
33  [](const SecondaryConfig& config, Aktualizr& aktualizr) {
34  (void)aktualizr;
35  auto virtual_sec_cgf = dynamic_cast<const VirtualSecondaryConfig&>(config);
36  return Secondaries({std::make_shared<VirtualSecondary>(virtual_sec_cgf)});
37  }},
38  // {
39  // Add another secondary factory here
40  // }
41 };
42 
43 static Secondaries createSecondaries(const SecondaryConfig& config, Aktualizr& aktualizr) {
44  return (sec_factory_registry.at(config.type()))(config, aktualizr);
45 }
46 
47 void initSecondaries(Aktualizr& aktualizr, const boost::filesystem::path& config_file) {
48  if (!boost::filesystem::exists(config_file)) {
49  throw std::invalid_argument("Secondary ECUs config file does not exist: " + config_file.string());
50  }
51 
52  auto secondary_configs = SecondaryConfigParser::parse_config_file(config_file);
53 
54  for (auto& config : secondary_configs) {
55  try {
56  LOG_INFO << "Initializing " << config->type() << " Secondaries...";
57  Secondaries secondaries = createSecondaries(*config, aktualizr);
58 
59  for (const auto& secondary : secondaries) {
60  LOG_INFO << "Adding Secondary with ECU serial: " << secondary->getSerial()
61  << " with hardware ID: " << secondary->getHwId();
62  aktualizr.AddSecondary(secondary);
63  }
64  } catch (const std::exception& exc) {
65  LOG_ERROR << "Failed to initialize a Secondary: " << exc.what();
66  throw exc;
67  }
68  }
69 }
70 
72  public:
73  SecondaryWaiter(Aktualizr& aktualizr, uint16_t wait_port, int timeout_s, Secondaries& secondaries)
74  : aktualizr_(aktualizr),
75  endpoint_{boost::asio::ip::tcp::v4(), wait_port},
76  timeout_{static_cast<boost::posix_time::seconds>(timeout_s)},
77  timer_{io_context_},
78  connected_secondaries_{secondaries} {}
79 
80  void addSecondary(const std::string& ip, uint16_t port) { secondaries_to_wait_for_.insert(key(ip, port)); }
81 
82  void wait() {
83  if (secondaries_to_wait_for_.empty()) {
84  return;
85  }
86 
87  timer_.expires_from_now(timeout_);
88  timer_.async_wait([&](const boost::system::error_code& error_code) {
89  if (!!error_code) {
90  LOG_ERROR << "Wait for Secondaries has failed: " << error_code;
91  throw std::runtime_error("Error while waiting for IP Secondaries");
92  } else {
93  LOG_ERROR << "Timeout while waiting for Secondaries: " << error_code;
94  throw std::runtime_error("Timeout while waiting for IP Secondaries");
95  }
96  io_context_.stop();
97  });
98  accept();
99  io_context_.run();
100  }
101 
102  private:
103  void accept() {
104  LOG_INFO << "Waiting for connection from " << secondaries_to_wait_for_.size() << " Secondaries...";
105  acceptor_.async_accept(con_socket_,
106  boost::bind(&SecondaryWaiter::connectionHdlr, this, boost::asio::placeholders::error));
107  }
108 
109  void connectionHdlr(const boost::system::error_code& error_code) {
110  if (!error_code) {
111  auto sec_ip = con_socket_.remote_endpoint().address().to_string();
112  auto sec_port = con_socket_.remote_endpoint().port();
113 
114  LOG_INFO << "Accepted connection from a Secondary: (" << sec_ip << ":" << sec_port << ")";
115  try {
116  auto secondary = Uptane::IpUptaneSecondary::create(sec_ip, sec_port, con_socket_.native_handle());
117  if (secondary) {
118  connected_secondaries_.push_back(secondary);
119  // set ip/port in the db so that we can match everything later
120  Json::Value d;
121  d["ip"] = sec_ip;
122  d["port"] = sec_port;
123  aktualizr_.SetSecondaryData(secondary->getSerial(), Utils::jsonToCanonicalStr(d));
124  }
125  } catch (const std::exception& exc) {
126  LOG_ERROR << "Failed to initialize a Secondary: " << exc.what();
127  }
128  con_socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
129  con_socket_.close();
130 
131  secondaries_to_wait_for_.erase(key(sec_ip, sec_port));
132  if (!secondaries_to_wait_for_.empty()) {
133  accept();
134  } else {
135  io_context_.stop();
136  }
137  } else {
138  LOG_ERROR << "Failed to accept connection from a Secondary";
139  }
140  }
141 
// Builds the set key that identifies a Secondary by its address.
// The ':' separator keeps distinct address/port pairs distinct: without it,
// ("10.0.0.1", 23) and ("10.0.0.12", 3) would both map to "10.0.0.123".
static std::string key(const std::string& ip, uint16_t port) { return ip + ":" + std::to_string(port); }
143 
// Used by connectionHdlr() to store the ip/port of each Secondary that connects.
144  Aktualizr& aktualizr_;
145 
// NOTE: members are initialized in declaration order. io_context_ and endpoint_
// must stay declared before acceptor_, con_socket_ and timer_, which are
// constructed from them.
// Event loop that wait() runs and connectionHdlr() stops.
146  boost::asio::io_service io_context_;
// Local IPv4 endpoint on the wait port (set in the constructor).
147  boost::asio::ip::tcp::endpoint endpoint_;
// Acceptor for incoming Secondary connections on endpoint_.
148  boost::asio::ip::tcp::acceptor acceptor_{io_context_, endpoint_};
// Socket reused for each accepted connection; closed again in connectionHdlr().
149  boost::asio::ip::tcp::socket con_socket_{io_context_};
// Duration armed on timer_ by wait().
150  boost::posix_time::seconds timeout_;
// Timer whose handler aborts wait() by throwing.
151  boost::asio::deadline_timer timer_;
152 
// Caller-owned output list; connectionHdlr() appends each Secondary it creates.
153  Secondaries& connected_secondaries_;
// Keys (see key()) of the Secondaries that have not connected yet.
154  std::unordered_set<std::string> secondaries_to_wait_for_;
155 };
156 
157 // Four options for each Secondary:
158 // 1. Secondary is configured and stored: nothing to do.
159 // 2. Secondary is configured but not stored: it must be new. Try to connect to get information and store it. This will
160 // cause re-registration.
161 // 3. Same as 2 but cannot connect: abort.
162 // 4. Secondary is stored but not configured: it must have been removed. Skip it. This will cause re-registration.
163 static Secondaries createIPSecondaries(const IPSecondariesConfig& config, Aktualizr& aktualizr) {
164  Secondaries result;
165  Secondaries new_secondaries;
166  SecondaryWaiter sec_waiter{aktualizr, config.secondaries_wait_port, config.secondaries_timeout_s, result};
167  auto secondaries_info = aktualizr.GetSecondaries();
168 
169  for (const auto& cfg : config.secondaries_cfg) {
170  SecondaryInterface::Ptr secondary;
171  const SecondaryInfo* info = nullptr;
172 
173  // Try to match the configured Secondaries to stored Secondaries.
174  auto f = std::find_if(secondaries_info.cbegin(), secondaries_info.cend(), [&cfg](const SecondaryInfo& i) {
175  Json::Value d = Utils::parseJSON(i.extra);
176  return d["ip"] == cfg.ip && d["port"] == cfg.port;
177  });
178 
179  if (f == secondaries_info.cend() && config.secondaries_cfg.size() == 1 && secondaries_info.size() == 1 &&
180  secondaries_info[0].extra.empty()) {
181  // /!\ backward compatibility: if we have just one Secondary in the old
182  // storage format (before we had the secondary_ecus table) and the
183  // configuration, migrate it to the new format.
184  info = &secondaries_info[0];
185  Json::Value d;
186  d["ip"] = cfg.ip;
187  d["port"] = cfg.port;
188  aktualizr.SetSecondaryData(info->serial, Utils::jsonToCanonicalStr(d));
189  LOG_INFO << "Migrated a single IP Secondary to new storage format.";
190  } else if (f == secondaries_info.cend()) {
191  // Secondary was not found in storage; it must be new.
192  secondary = Uptane::IpUptaneSecondary::connectAndCreate(cfg.ip, cfg.port);
193  if (secondary == nullptr) {
194  LOG_DEBUG << "Could not connect to IP Secondary at " << cfg.ip << ":" << cfg.port
195  << "; now trying to wait for it.";
196  sec_waiter.addSecondary(cfg.ip, cfg.port);
197  } else {
198  result.push_back(secondary);
199  // set ip/port in the db so that we can match everything later
200  Json::Value d;
201  d["ip"] = cfg.ip;
202  d["port"] = cfg.port;
203  aktualizr.SetSecondaryData(secondary->getSerial(), Utils::jsonToCanonicalStr(d));
204  }
205  continue;
206  } else {
207  // The configured Secondary was found in storage.
208  info = &(*f);
209  }
210 
211  if (secondary == nullptr) {
212  secondary =
213  Uptane::IpUptaneSecondary::connectAndCheck(cfg.ip, cfg.port, info->serial, info->hw_id, info->pub_key);
214  if (secondary == nullptr) {
215  throw std::runtime_error("Unable to connect to or verify IP Secondary at " + cfg.ip + ":" +
216  std::to_string(cfg.port));
217  }
218  }
219 
220  result.push_back(secondary);
221  }
222 
223  sec_waiter.wait();
224 
225  return result;
226 }
227 
228 } // namespace Primary
Aktualizr
This class provides the main APIs necessary for launching and controlling libaktualizr.
Definition: aktualizr.h:24
SecondaryInfo
Definition: types.h:462
Aktualizr::AddSecondary
void AddSecondary(const std::shared_ptr< SecondaryInterface > &secondary)
Add new Secondary to aktualizr.
Definition: aktualizr.cc:103
Primary::IPSecondariesConfig
Definition: secondary_config.h:29
Aktualizr::GetSecondaries
std::vector< SecondaryInfo > GetSecondaries() const
Returns a list of the registered Secondaries, along with some associated metadata.
Definition: aktualizr.cc:111
Primary::SecondaryWaiter
Definition: secondary.cc:71
result
Results of libaktualizr API calls.
Definition: results.h:12
Aktualizr::SetSecondaryData
void SetSecondaryData(const Uptane::EcuSerial &ecu, const std::string &data)
Store some free-form data to be associated with a particular Secondary, to be retrieved later through...
Definition: aktualizr.cc:107