Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
secondary.cc
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>

#include <algorithm>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>

#include "ipuptanesecondary.h"
#include "secondary.h"
#include "secondary_config.h"
14 
15 namespace Primary {
16 
17 using Secondaries = std::vector<std::shared_ptr<SecondaryInterface>>;
18 using SecondaryFactoryRegistry =
19  std::unordered_map<std::string, std::function<Secondaries(const SecondaryConfig&, Aktualizr& aktualizr)>>;
20 
21 static Secondaries createIPSecondaries(const IPSecondariesConfig& config, Aktualizr& aktualizr);
22 
23 static SecondaryFactoryRegistry sec_factory_registry = {
24  {IPSecondariesConfig::Type,
25  [](const SecondaryConfig& config, Aktualizr& aktualizr) {
26  auto ip_sec_cgf = dynamic_cast<const IPSecondariesConfig&>(config);
27  return createIPSecondaries(ip_sec_cgf, aktualizr);
28  }},
29  {VirtualSecondaryConfig::Type,
30  [](const SecondaryConfig& config, Aktualizr& aktualizr) {
31  (void)aktualizr;
32  auto virtual_sec_cgf = dynamic_cast<const VirtualSecondaryConfig&>(config);
33  return Secondaries({std::make_shared<VirtualSecondary>(virtual_sec_cgf)});
34  }},
35  // {
36  // Add another secondary factory here
37  // }
38 };
39 
40 static Secondaries createSecondaries(const SecondaryConfig& config, Aktualizr& aktualizr) {
41  return (sec_factory_registry.at(config.type()))(config, aktualizr);
42 }
43 
44 void initSecondaries(Aktualizr& aktualizr, const boost::filesystem::path& config_file) {
45  if (!boost::filesystem::exists(config_file)) {
46  throw std::invalid_argument("Secondary ECUs config file does not exist: " + config_file.string());
47  }
48 
49  auto secondary_configs = SecondaryConfigParser::parse_config_file(config_file);
50 
51  for (auto& config : secondary_configs) {
52  try {
53  LOG_INFO << "Initializing " << config->type() << " Secondaries...";
54  Secondaries secondaries = createSecondaries(*config, aktualizr);
55 
56  for (const auto& secondary : secondaries) {
57  LOG_INFO << "Adding Secondary with ECU serial: " << secondary->getSerial()
58  << " with hardware ID: " << secondary->getHwId();
59  aktualizr.AddSecondary(secondary);
60  }
61  } catch (const std::exception& exc) {
62  LOG_ERROR << "Failed to initialize a Secondary: " << exc.what();
63  throw exc;
64  }
65  }
66 }
67 
69  public:
70  SecondaryWaiter(Aktualizr& aktualizr, uint16_t wait_port, int timeout_s, Secondaries& secondaries)
71  : aktualizr_(aktualizr),
72  endpoint_{boost::asio::ip::tcp::v4(), wait_port},
73  timeout_{static_cast<boost::posix_time::seconds>(timeout_s)},
74  timer_{io_context_},
75  connected_secondaries_{secondaries} {}
76 
77  void addSecondary(const std::string& ip, uint16_t port) { secondaries_to_wait_for_.insert(key(ip, port)); }
78 
79  void wait() {
80  if (secondaries_to_wait_for_.empty()) {
81  return;
82  }
83 
84  timer_.expires_from_now(timeout_);
85  timer_.async_wait([&](const boost::system::error_code& error_code) {
86  if (!!error_code) {
87  LOG_ERROR << "Wait for Secondaries has failed: " << error_code;
88  throw std::runtime_error("Error while waiting for IP Secondaries");
89  } else {
90  LOG_ERROR << "Timeout while waiting for Secondaries: " << error_code;
91  throw std::runtime_error("Timeout while waiting for IP Secondaries");
92  }
93  io_context_.stop();
94  });
95  accept();
96  io_context_.run();
97  }
98 
99  private:
100  void accept() {
101  LOG_INFO << "Waiting for connection from " << secondaries_to_wait_for_.size() << " Secondaries...";
102  acceptor_.async_accept(con_socket_,
103  boost::bind(&SecondaryWaiter::connectionHdlr, this, boost::asio::placeholders::error));
104  }
105 
106  void connectionHdlr(const boost::system::error_code& error_code) {
107  if (!error_code) {
108  auto sec_ip = con_socket_.remote_endpoint().address().to_string();
109  auto sec_port = con_socket_.remote_endpoint().port();
110 
111  LOG_INFO << "Accepted connection from a Secondary: (" << sec_ip << ":" << sec_port << ")";
112  try {
113  auto secondary = Uptane::IpUptaneSecondary::create(sec_ip, sec_port, con_socket_.native_handle());
114  if (secondary) {
115  connected_secondaries_.push_back(secondary);
116  // set ip/port in the db so that we can match everything later
117  Json::Value d;
118  d["ip"] = sec_ip;
119  d["port"] = sec_port;
120  aktualizr_.SetSecondaryData(secondary->getSerial(), Utils::jsonToCanonicalStr(d));
121  }
122  } catch (const std::exception& exc) {
123  LOG_ERROR << "Failed to initialize a Secondary: " << exc.what();
124  }
125  con_socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
126  con_socket_.close();
127 
128  secondaries_to_wait_for_.erase(key(sec_ip, sec_port));
129  if (!secondaries_to_wait_for_.empty()) {
130  accept();
131  } else {
132  io_context_.stop();
133  }
134  } else {
135  LOG_ERROR << "Failed to accept connection from a Secondary";
136  }
137  }
138 
// Builds the lookup key identifying a Secondary by address and port.
// The ':' separator keeps distinct endpoints distinct: without it,
// ("10.0.0.1", 23) and ("10.0.0.12", 3) would both map to "10.0.0.123".
static std::string key(const std::string& ip, uint16_t port) { return ip + ':' + std::to_string(port); }
140 
141  private:
142  Aktualizr& aktualizr_;
143 
144  boost::asio::io_service io_context_;
145  boost::asio::ip::tcp::endpoint endpoint_;
146  boost::asio::ip::tcp::acceptor acceptor_{io_context_, endpoint_};
147  boost::asio::ip::tcp::socket con_socket_{io_context_};
148  boost::posix_time::seconds timeout_;
149  boost::asio::deadline_timer timer_;
150 
151  Secondaries& connected_secondaries_;
152  std::unordered_set<std::string> secondaries_to_wait_for_;
153 };
154 
155 // Four options for each Secondary:
156 // 1. Secondary is configured and stored: nothing to do.
157 // 2. Secondary is configured but not stored: it must be new. Try to connect to get information and store it. This will
158 // cause re-registration.
159 // 3. Same as 2 but cannot connect: abort.
160 // 4. Secondary is stored but not configured: it must have been removed. Skip it. This will cause re-registration.
161 static Secondaries createIPSecondaries(const IPSecondariesConfig& config, Aktualizr& aktualizr) {
162  Secondaries result;
163  Secondaries new_secondaries;
164  SecondaryWaiter sec_waiter{aktualizr, config.secondaries_wait_port, config.secondaries_timeout_s, result};
165  auto secondaries_info = aktualizr.GetSecondaries();
166 
167  for (const auto& cfg : config.secondaries_cfg) {
168  SecondaryInterface::Ptr secondary;
169  const SecondaryInfo* info = nullptr;
170 
171  // Try to match the configured Secondaries to stored Secondaries.
172  auto f = std::find_if(secondaries_info.cbegin(), secondaries_info.cend(), [&cfg](const SecondaryInfo& i) {
173  Json::Value d = Utils::parseJSON(i.extra);
174  return d["ip"] == cfg.ip && d["port"] == cfg.port;
175  });
176 
177  if (f == secondaries_info.cend() && config.secondaries_cfg.size() == 1 && secondaries_info.size() == 1 &&
178  secondaries_info[0].extra.empty()) {
179  // /!\ backward compatibility: if we have just one Secondary in the old
180  // storage format (before we had the secondary_ecus table) and the
181  // configuration, migrate it to the new format.
182  info = &secondaries_info[0];
183  Json::Value d;
184  d["ip"] = cfg.ip;
185  d["port"] = cfg.port;
186  aktualizr.SetSecondaryData(info->serial, Utils::jsonToCanonicalStr(d));
187  LOG_INFO << "Migrated a single IP Secondary to new storage format.";
188  } else if (f == secondaries_info.cend()) {
189  // Secondary was not found in storage; it must be new.
190  secondary = Uptane::IpUptaneSecondary::connectAndCreate(cfg.ip, cfg.port);
191  if (secondary == nullptr) {
192  LOG_DEBUG << "Could not connect to IP Secondary at " << cfg.ip << ":" << cfg.port
193  << "; now trying to wait for it.";
194  sec_waiter.addSecondary(cfg.ip, cfg.port);
195  } else {
196  result.push_back(secondary);
197  // set ip/port in the db so that we can match everything later
198  Json::Value d;
199  d["ip"] = cfg.ip;
200  d["port"] = cfg.port;
201  aktualizr.SetSecondaryData(secondary->getSerial(), Utils::jsonToCanonicalStr(d));
202  }
203  continue;
204  } else {
205  // The configured Secondary was found in storage.
206  info = &(*f);
207  }
208 
209  if (secondary == nullptr) {
210  secondary =
211  Uptane::IpUptaneSecondary::connectAndCheck(cfg.ip, cfg.port, info->serial, info->hw_id, info->pub_key);
212  if (secondary == nullptr) {
213  throw std::runtime_error("Unable to connect to or verify IP Secondary at " + cfg.ip + ":" +
214  std::to_string(cfg.port));
215  }
216  }
217 
218  result.push_back(secondary);
219  }
220 
221  sec_waiter.wait();
222 
223  return result;
224 }
225 
226 } // namespace Primary
void AddSecondary(const std::shared_ptr< SecondaryInterface > &secondary)
Add new Secondary to aktualizr.
Definition: aktualizr.cc:99
std::vector< SecondaryInfo > GetSecondaries() const
Returns a list of the registered Secondaries, along with some associated metadata.
Definition: aktualizr.cc:107
void SetSecondaryData(const Uptane::EcuSerial &ecu, const std::string &data)
Store some free-form data to be associated with a particular Secondary, to be retrieved later through GetSecondaries.
Definition: aktualizr.cc:103
Results of libaktualizr API calls.
Definition: results.h:12
This class provides the main APIs necessary for launching and controlling libaktualizr.
Definition: aktualizr.h:20