Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
secondary.cc
1 #include <boost/asio/deadline_timer.hpp>
2 #include <boost/asio/io_service.hpp>
3 #include <boost/asio/ip/tcp.hpp>
4 #include <boost/asio/placeholders.hpp>
5 #include <boost/bind.hpp>
6 
7 #include <unordered_map>
8 #include <unordered_set>
9 
10 #include "ipuptanesecondary.h"
11 #include "secondary.h"
12 #include "secondary_config.h"
13 
14 namespace Primary {
15 
16 using Secondaries = std::vector<std::shared_ptr<Uptane::SecondaryInterface>>;
17 using SecondaryFactoryRegistry = std::unordered_map<std::string, std::function<Secondaries(const SecondaryConfig&)>>;
18 
19 static Secondaries createIPSecondaries(const IPSecondariesConfig& config);
20 
21 static SecondaryFactoryRegistry sec_factory_registry = {
22  {IPSecondariesConfig::Type,
23  [](const SecondaryConfig& config) {
24  auto ip_sec_cgf = dynamic_cast<const IPSecondariesConfig&>(config);
25  return createIPSecondaries(ip_sec_cgf);
26  }},
27  {VirtualSecondaryConfig::Type,
28  [](const SecondaryConfig& config) {
29  auto virtual_sec_cgf = dynamic_cast<const VirtualSecondaryConfig&>(config);
30  return Secondaries({std::make_shared<VirtualSecondary>(virtual_sec_cgf)});
31  }},
32  // {
33  // Add another secondary factory here
34  // }
35 };
36 
37 static Secondaries createSecondaries(const SecondaryConfig& config) {
38  return (sec_factory_registry.at(config.type()))(config);
39 }
40 
41 void initSecondaries(Aktualizr& aktualizr, const boost::filesystem::path& config_file) {
42  if (!boost::filesystem::exists(config_file)) {
43  throw std::invalid_argument("Secondary ECUs config file does not exist: " + config_file.string());
44  }
45 
46  auto secondary_configs = SecondaryConfigParser::parse_config_file(config_file);
47 
48  for (auto& config : secondary_configs) {
49  try {
50  LOG_INFO << "Creating " << config->type() << " secondaries...";
51  Secondaries secondaries = createSecondaries(*config);
52 
53  for (const auto& secondary : secondaries) {
54  LOG_INFO << "Adding Secondary to Aktualizr."
55  << "HW_ID: " << secondary->getHwId() << " Serial: " << secondary->getSerial();
56  aktualizr.AddSecondary(secondary);
57  }
58  } catch (const std::exception& exc) {
59  LOG_ERROR << "Failed to initialize a secondary: " << exc.what();
60  throw exc;
61  }
62  }
63 }
64 
66  public:
67  SecondaryWaiter(uint16_t wait_port, int timeout_s, Secondaries& secondaries)
68  : endpoint_{boost::asio::ip::tcp::v4(), wait_port},
69  timeout_{static_cast<boost::posix_time::seconds>(timeout_s)},
70  timer_{io_context_},
71  connected_secondaries_(secondaries) {}
72 
73  void addSecondary(const std::string& ip, uint16_t port) { secondaries_to_wait_for_.insert(key(ip, port)); }
74 
75  void wait() {
76  if (secondaries_to_wait_for_.empty()) {
77  return;
78  }
79 
80  timer_.expires_from_now(timeout_);
81  timer_.async_wait([&](const boost::system::error_code& error_code) {
82  if (!!error_code) {
83  LOG_ERROR << "Wait for secondaries has failed: " << error_code;
84  throw std::runtime_error("Error while waiting for secondary");
85  } else {
86  LOG_ERROR << "Timeout while waiting for secondary: " << error_code;
87  throw std::runtime_error("Timeout while waiting for secondary");
88  }
89  io_context_.stop();
90  });
91  accept();
92  io_context_.run();
93  }
94 
95  private:
96  void accept() {
97  LOG_INFO << "Waiting for connection from " << secondaries_to_wait_for_.size() << " secondaries...";
98  acceptor_.async_accept(con_socket_,
99  boost::bind(&SecondaryWaiter::connectionHdlr, this, boost::asio::placeholders::error));
100  }
101 
102  void connectionHdlr(const boost::system::error_code& error_code) {
103  if (!error_code) {
104  auto sec_ip = con_socket_.remote_endpoint().address().to_string();
105  auto sec_port = con_socket_.remote_endpoint().port();
106 
107  LOG_INFO << "Accepted connection from a secondary: (" << sec_ip << ":" << sec_port << ")";
108  try {
109  auto secondary = Uptane::IpUptaneSecondary::create(sec_ip, sec_port, con_socket_.native_handle());
110  if (secondary) {
111  connected_secondaries_.push_back(secondary);
112  }
113  } catch (const std::exception& exc) {
114  LOG_ERROR << "Failed to initialize a secondary: " << exc.what();
115  }
116  con_socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
117  con_socket_.close();
118 
119  secondaries_to_wait_for_.erase(key(sec_ip, sec_port));
120  if (!secondaries_to_wait_for_.empty()) {
121  accept();
122  } else {
123  io_context_.stop();
124  }
125  } else {
126  LOG_ERROR << "Failed to accept connection from a secondary";
127  }
128  }
129 
130  static std::string key(const std::string& ip, uint16_t port) { return (ip + std::to_string(port)); }
131 
132  private:
133  boost::asio::io_service io_context_;
134  boost::asio::ip::tcp::endpoint endpoint_;
135  boost::asio::ip::tcp::acceptor acceptor_{io_context_, endpoint_};
136  boost::asio::ip::tcp::socket con_socket_{io_context_};
137  boost::posix_time::seconds timeout_;
138  boost::asio::deadline_timer timer_;
139 
140  Secondaries& connected_secondaries_;
141  std::unordered_set<std::string> secondaries_to_wait_for_;
142 };
143 
144 static Secondaries createIPSecondaries(const IPSecondariesConfig& config) {
145  Secondaries result;
146  SecondaryWaiter sec_waiter{config.secondaries_wait_port, config.secondaries_timeout_s, result};
147 
148  for (auto& ip_sec_cfg : config.secondaries_cfg) {
149  auto secondary = Uptane::IpUptaneSecondary::connectAndCreate(ip_sec_cfg.ip, ip_sec_cfg.port);
150  if (secondary) {
151  result.push_back(secondary);
152  } else {
153  sec_waiter.addSecondary(ip_sec_cfg.ip, ip_sec_cfg.port);
154  }
155  }
156 
157  sec_waiter.wait();
158  return result;
159 }
160 
161 } // namespace Primary
Aktualizr
This class provides the main APIs necessary for launching and controlling libaktualizr.
Definition: aktualizr.h:20
Primary::IPSecondariesConfig
Definition: secondary_config.h:29
Primary::SecondaryWaiter
Definition: secondary.cc:65
result
Results of libaktualizr API calls.
Definition: results.h:13
Aktualizr::AddSecondary
void AddSecondary(const std::shared_ptr< Uptane::SecondaryInterface > &secondary)
Add new secondary to aktualizr.
Definition: aktualizr.cc:100