Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
secondary.cc
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>

#include <algorithm>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "ipuptanesecondary.h"
#include "secondary.h"
#include "secondary_config.h"
14 
15 namespace Primary {
16 
17 using Secondaries = std::vector<std::shared_ptr<Uptane::SecondaryInterface>>;
18 using SecondaryFactoryRegistry =
19  std::unordered_map<std::string, std::function<Secondaries(const SecondaryConfig&, Aktualizr& aktualizr)>>;
20 
21 static Secondaries createIPSecondaries(const IPSecondariesConfig& config, Aktualizr& aktualizr);
22 
23 static SecondaryFactoryRegistry sec_factory_registry = {
24  {IPSecondariesConfig::Type,
25  [](const SecondaryConfig& config, Aktualizr& aktualizr) {
26  auto ip_sec_cgf = dynamic_cast<const IPSecondariesConfig&>(config);
27  return createIPSecondaries(ip_sec_cgf, aktualizr);
28  }},
29  {VirtualSecondaryConfig::Type,
30  [](const SecondaryConfig& config, Aktualizr& /* unused */) {
31  auto virtual_sec_cgf = dynamic_cast<const VirtualSecondaryConfig&>(config);
32  return Secondaries({std::make_shared<VirtualSecondary>(virtual_sec_cgf)});
33  }},
34  // {
35  // Add another secondary factory here
36  // }
37 };
38 
39 static Secondaries createSecondaries(const SecondaryConfig& config, Aktualizr& aktualizr) {
40  return (sec_factory_registry.at(config.type()))(config, aktualizr);
41 }
42 
43 void initSecondaries(Aktualizr& aktualizr, const boost::filesystem::path& config_file) {
44  if (!boost::filesystem::exists(config_file)) {
45  throw std::invalid_argument("Secondary ECUs config file does not exist: " + config_file.string());
46  }
47 
48  auto secondary_configs = SecondaryConfigParser::parse_config_file(config_file);
49 
50  for (auto& config : secondary_configs) {
51  try {
52  LOG_INFO << "Registering " << config->type() << " Secondaries...";
53  Secondaries secondaries = createSecondaries(*config, aktualizr);
54 
55  for (const auto& secondary : secondaries) {
56  LOG_INFO << "Adding Secondary with ECU serial: " << secondary->getSerial()
57  << " with hardware ID: " << secondary->getHwId();
58  aktualizr.AddSecondary(secondary);
59  }
60  } catch (const std::exception& exc) {
61  LOG_ERROR << "Failed to initialize a Secondary: " << exc.what();
62  throw exc;
63  }
64  }
65 }
66 
68  public:
69  SecondaryWaiter(uint16_t wait_port, int timeout_s, Secondaries& secondaries)
70  : endpoint_{boost::asio::ip::tcp::v4(), wait_port},
71  timeout_{static_cast<boost::posix_time::seconds>(timeout_s)},
72  timer_{io_context_},
73  connected_secondaries_(secondaries) {}
74 
75  void addSecondary(const std::string& ip, uint16_t port) { secondaries_to_wait_for_.insert(key(ip, port)); }
76 
77  void wait() {
78  if (secondaries_to_wait_for_.empty()) {
79  return;
80  }
81 
82  timer_.expires_from_now(timeout_);
83  timer_.async_wait([&](const boost::system::error_code& error_code) {
84  if (!!error_code) {
85  LOG_ERROR << "Wait for Secondaries has failed: " << error_code;
86  throw std::runtime_error("Error while waiting for Secondaries");
87  } else {
88  LOG_ERROR << "Timeout while waiting for Secondaries: " << error_code;
89  throw std::runtime_error("Timeout while waiting for Secondaries");
90  }
91  io_context_.stop();
92  });
93  accept();
94  io_context_.run();
95  }
96 
97  private:
98  void accept() {
99  LOG_INFO << "Waiting for connection from " << secondaries_to_wait_for_.size() << " Secondaries...";
100  acceptor_.async_accept(con_socket_,
101  boost::bind(&SecondaryWaiter::connectionHdlr, this, boost::asio::placeholders::error));
102  }
103 
104  void connectionHdlr(const boost::system::error_code& error_code) {
105  if (!error_code) {
106  auto sec_ip = con_socket_.remote_endpoint().address().to_string();
107  auto sec_port = con_socket_.remote_endpoint().port();
108 
109  LOG_INFO << "Accepted connection from a Secondary: (" << sec_ip << ":" << sec_port << ")";
110  try {
111  auto secondary = Uptane::IpUptaneSecondary::create(sec_ip, sec_port, con_socket_.native_handle());
112  if (secondary) {
113  connected_secondaries_.push_back(secondary);
114  }
115  } catch (const std::exception& exc) {
116  LOG_ERROR << "Failed to initialize a Secondary: " << exc.what();
117  }
118  con_socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
119  con_socket_.close();
120 
121  secondaries_to_wait_for_.erase(key(sec_ip, sec_port));
122  if (!secondaries_to_wait_for_.empty()) {
123  accept();
124  } else {
125  io_context_.stop();
126  }
127  } else {
128  LOG_ERROR << "Failed to accept connection from a Secondary";
129  }
130  }
131 
// Builds the lookup key for a Secondary address as "<ip>:<port>".
// The separator keeps distinct addresses distinct: without it ("10.0.0.1", 23) and
// ("10.0.0.12", 3) would both produce "10.0.0.123".
static std::string key(const std::string& ip, uint16_t port) { return ip + ":" + std::to_string(port); }
133 
134  private:
135  boost::asio::io_service io_context_;
136  boost::asio::ip::tcp::endpoint endpoint_;
137  boost::asio::ip::tcp::acceptor acceptor_{io_context_, endpoint_};
138  boost::asio::ip::tcp::socket con_socket_{io_context_};
139  boost::posix_time::seconds timeout_;
140  boost::asio::deadline_timer timer_;
141 
142  Secondaries& connected_secondaries_;
143  std::unordered_set<std::string> secondaries_to_wait_for_;
144 };
145 
146 static Secondaries createIPSecondaries(const IPSecondariesConfig& config, Aktualizr& aktualizr) {
147  Secondaries result;
148  const bool provision = !aktualizr.IsRegistered();
149 
150  if (provision) {
151  SecondaryWaiter sec_waiter{config.secondaries_wait_port, config.secondaries_timeout_s, result};
152 
153  for (const auto& ip_sec_cfg : config.secondaries_cfg) {
154  auto secondary = Uptane::IpUptaneSecondary::connectAndCreate(ip_sec_cfg.ip, ip_sec_cfg.port);
155  if (secondary) {
156  result.push_back(secondary);
157  } else {
158  sec_waiter.addSecondary(ip_sec_cfg.ip, ip_sec_cfg.port);
159  }
160  }
161 
162  sec_waiter.wait();
163 
164  // set ip/port in the db so that we can match everything later
165  for (size_t k = 0; k < config.secondaries_cfg.size(); k++) {
166  const auto cfg = config.secondaries_cfg[k];
167  const auto sec = result[k];
168  Json::Value d;
169  d["ip"] = cfg.ip;
170  d["port"] = cfg.port;
171  aktualizr.SetSecondaryData(sec->getSerial(), Utils::jsonToCanonicalStr(d));
172  }
173  } else {
174  auto secondaries_info = aktualizr.GetSecondaries();
175 
176  for (const auto& cfg : config.secondaries_cfg) {
177  Uptane::SecondaryInterface::Ptr secondary;
178  const SecondaryInfo* info = nullptr;
179 
180  auto f = std::find_if(secondaries_info.cbegin(), secondaries_info.cend(), [&cfg](const SecondaryInfo& i) {
181  Json::Value d = Utils::parseJSON(i.extra);
182  return d["ip"] == cfg.ip && d["port"] == cfg.port;
183  });
184 
185  if (f == secondaries_info.cend() && config.secondaries_cfg.size() == 1 && secondaries_info.size() == 1) {
186  // /!\ backward compatibility: handle the case with one secondary, but
187  // store the info for later anyway
188  info = &secondaries_info[0];
189  Json::Value d;
190  d["ip"] = cfg.ip;
191  d["port"] = cfg.port;
192  aktualizr.SetSecondaryData(info->serial, Utils::jsonToCanonicalStr(d));
193  LOG_INFO << "Migrated single IP Secondary to new storage format";
194  } else if (f == secondaries_info.cend()) {
195  // Match the other way if we can
196  secondary = Uptane::IpUptaneSecondary::connectAndCreate(cfg.ip, cfg.port);
197  if (secondary == nullptr) {
198  LOG_ERROR << "Could not instantiate Secondary " << cfg.ip << ":" << cfg.port;
199  continue;
200  }
201  auto f_serial =
202  std::find_if(secondaries_info.cbegin(), secondaries_info.cend(),
203  [&secondary](const SecondaryInfo& i) { return i.serial == secondary->getSerial(); });
204  if (f_serial == secondaries_info.cend()) {
205  LOG_ERROR << "Could not instantiate Secondary " << cfg.ip << ":" << cfg.port;
206  continue;
207  }
208  info = &(*f_serial);
209  } else {
210  info = &(*f);
211  }
212 
213  if (secondary == nullptr) {
214  secondary =
215  Uptane::IpUptaneSecondary::connectAndCheck(cfg.ip, cfg.port, info->serial, info->hw_id, info->pub_key);
216  }
217 
218  if (secondary != nullptr) {
219  result.push_back(secondary);
220  } else {
221  LOG_ERROR << "Could not instantiate Secondary " << info->serial;
222  }
223  }
224  }
225 
226  return result;
227 }
228 
229 } // namespace Primary
Aktualizr::IsRegistered
bool IsRegistered() const
Returns true if the device has been registered to the backend successfully.
Definition: aktualizr.cc:33
Aktualizr
This class provides the main APIs necessary for launching and controlling libaktualizr.
Definition: aktualizr.h:20
SecondaryInfo
Definition: invstorage.h:106
Primary::IPSecondariesConfig
Definition: secondary_config.h:29
Aktualizr::GetSecondaries
std::vector< SecondaryInfo > GetSecondaries() const
Returns a list of the registered secondaries, along with some associated metadata.
Definition: aktualizr.cc:108
Primary::SecondaryWaiter
Definition: secondary.cc:67
result
Results of libaktualizr API calls.
Definition: results.h:13
Aktualizr::SetSecondaryData
void SetSecondaryData(const Uptane::EcuSerial &ecu, const std::string &data)
Store some free-form data to be associated with a particular secondary, to be retrieved later through...
Definition: aktualizr.cc:104
Aktualizr::AddSecondary
void AddSecondary(const std::shared_ptr< Uptane::SecondaryInterface > &secondary)
Add new secondary to aktualizr.
Definition: aktualizr.cc:100