Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
secondary_tcp_server.cc
1 #include "secondary_tcp_server.h"
2 
3 #include <netinet/tcp.h>
4 
5 #include "AKInstallationResultCode.h"
6 #include "AKIpUptaneMes.h"
7 #include "asn1/asn1_message.h"
8 #include "logging/logging.h"
9 #include "msg_handler.h"
10 #include "utilities/dequeue_buffer.h"
11 
12 SecondaryTcpServer::SecondaryTcpServer(MsgHandler &msg_handler, const std::string &primary_ip, in_port_t primary_port,
13  in_port_t port, bool reboot_after_install)
14  : msg_handler_(msg_handler),
15  listen_socket_(port),
16  keep_running_(true),
17  reboot_after_install_(reboot_after_install),
18  is_running_(false) {
19  if (primary_ip.empty()) {
20  return;
21  }
22 
23  ConnectionSocket conn_socket(primary_ip, primary_port, listen_socket_.port());
24  if (conn_socket.connect() == 0) {
25  LOG_INFO << "Connected to Primary, sending info about this Secondary.";
26  HandleOneConnection(*conn_socket);
27  } else {
28  LOG_INFO << "Failed to connect to Primary.";
29  }
30 }
31 
33  if (listen(*listen_socket_, SOMAXCONN) < 0) {
34  throw std::system_error(errno, std::system_category(), "listen");
35  }
36  LOG_INFO << "Secondary TCP server listening on " << listen_socket_.toString();
37 
38  {
39  std::unique_lock<std::mutex> lock(running_condition_mutex_);
40  is_running_ = true;
41  running_condition_.notify_all();
42  }
43 
44  bool first_connection = true;
45 
46  while (keep_running_.load()) {
47  sockaddr_storage peer_sa{};
48  socklen_t peer_sa_size = sizeof(sockaddr_storage);
49 
50  LOG_DEBUG << "Waiting for connection from Primary...";
51  int con_fd = accept(*listen_socket_, reinterpret_cast<sockaddr *>(&peer_sa), &peer_sa_size);
52  if (con_fd == -1) {
53  // Accept can fail if a client closes connection/client socket before a TCP handshake completes or
54  // a network connection goes down in the middle of a TCP handshake procedure. At first glance it looks like
55  // we can just continue listening/accepting new connections in such cases instead of exiting from the server loop
56  // which leads to exiting of the overall daemon process.
57  // But, accept() failure, potentially can be caused by some incorrect state of the listening socket
58  // which means that it will keep returning error, so, exiting from the daemon process and letting
59  // systemd to restart it looks like the most reliable solution that covers all edge cases.
60  LOG_INFO << "Socket accept failed, aborting.";
61  break;
62  }
63 
64  if (first_connection) {
65  LOG_INFO << "Primary connected.";
66  first_connection = false;
67  } else {
68  LOG_DEBUG << "Primary reconnected.";
69  }
70  auto continue_running = HandleOneConnection(*Socket(con_fd));
71  if (!continue_running) {
72  keep_running_.store(false);
73  }
74  LOG_DEBUG << "Primary disconnected.";
75  }
76 
77  {
78  std::unique_lock<std::mutex> lock(running_condition_mutex_);
79  is_running_ = false;
80  running_condition_.notify_all();
81  }
82 
83  LOG_INFO << "Secondary TCP server exiting.";
84 }
85 
86 void SecondaryTcpServer::stop() {
87  LOG_DEBUG << "Stopping Secondary TCP server...";
88  keep_running_.store(false);
89  // unblock accept
90  ConnectionSocket("localhost", listen_socket_.port()).connect();
91 }
92 
93 in_port_t SecondaryTcpServer::port() const { return listen_socket_.port(); }
94 SecondaryTcpServer::ExitReason SecondaryTcpServer::exit_reason() const { return exit_reason_; }
95 
96 static bool sendResponseMessage(int socket_fd, const Asn1Message::Ptr &resp_msg);
97 
98 bool SecondaryTcpServer::HandleOneConnection(int socket) {
99  // Outside the message loop, because one recv() may have parts of 2 messages
100  // Note that one recv() call returning 2+ messages doesn't work at the
101  // moment. This shouldn't be a problem until we have messages that aren't
102  // strictly request/response
103  DequeueBuffer buffer;
104  bool keep_running_server = true;
105  bool keep_running_current_session = true;
106 
107  while (keep_running_current_session) { // Keep reading until we get an error
108  // Read an incomming message
109  AKIpUptaneMes_t *m = nullptr;
110  asn_dec_rval_t res;
111  asn_codec_ctx_s context{};
112  ssize_t received;
113 
114  do {
115  received = recv(socket, buffer.Tail(), buffer.TailSpace(), 0);
116  buffer.HaveEnqueued(static_cast<size_t>(received));
117  res = ber_decode(&context, &asn_DEF_AKIpUptaneMes, reinterpret_cast<void **>(&m), buffer.Head(), buffer.Size());
118  buffer.Consume(res.consumed);
119  } while (res.code == RC_WMORE && received > 0);
120  // Note that ber_decode allocates *m even on failure, so this must always be done
121  Asn1Message::Ptr request_msg = Asn1Message::FromRaw(&m);
122 
123  if (received == 0) {
124  LOG_TRACE << "Primary has closed a connection socket";
125  break;
126  }
127 
128  if (received < 0) {
129  LOG_ERROR << "Error while reading message data from a socket: " << strerror(errno);
130  break;
131  }
132 
133  if (res.code != RC_OK) {
134  LOG_ERROR << "Failed to decode a message received from Primary";
135  break;
136  }
137 
138  LOG_DEBUG << "Received a request from Primary: " << request_msg->toStr();
139  Asn1Message::Ptr response_msg = Asn1Message::Empty();
140  MsgHandler::ReturnCode handle_status_code = msg_handler_.handleMsg(request_msg, response_msg);
141 
142  switch (handle_status_code) {
143  case MsgHandler::ReturnCode::kRebootRequired: {
144  exit_reason_ = ExitReason::kRebootNeeded;
145  keep_running_current_session = sendResponseMessage(socket, response_msg);
146  if (reboot_after_install_) {
147  keep_running_server = keep_running_current_session = false;
148  }
149  break;
150  }
151  case MsgHandler::ReturnCode::kOk: {
152  keep_running_current_session = sendResponseMessage(socket, response_msg);
153  break;
154  }
155  case MsgHandler::ReturnCode::kUnkownMsg:
156  default: {
157  // TODO: consider sending NOT_SUPPORTED/Unknown message and closing connection socket
158  keep_running_current_session = false;
159  LOG_INFO << "Unknown message received from Primary!";
160  }
161  } // switch
162 
163  } // Go back round and read another message
164 
165  return keep_running_server;
166  // Parse error => Shutdown the socket
167  // write error => Shutdown the socket
168  // Timeout on write => shutdown
169 }
170 
171 void SecondaryTcpServer::wait_until_running(int timeout) {
172  std::unique_lock<std::mutex> lock(running_condition_mutex_);
173  running_condition_.wait_for(lock, std::chrono::seconds(timeout), [&] { return is_running_; });
174 }
175 
176 bool sendResponseMessage(int socket_fd, const Asn1Message::Ptr &resp_msg) {
177  LOG_DEBUG << "Encoding and sending response message";
178 
179  int optval = 0;
180  setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int));
181  asn_enc_rval_t encode_result = der_encode(&asn_DEF_AKIpUptaneMes, &resp_msg->msg_, Asn1SocketWriteCallback,
182  reinterpret_cast<void *>(&socket_fd));
183  if (encode_result.encoded == -1) {
184  LOG_ERROR << "Failed to encode a response message";
185  return false; // write error
186  }
187  optval = 1;
188  setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int));
189 
190  return true;
191 }
Asn1Message::Empty
static Asn1Message::Ptr Empty()
Create a new Asn1Message, in order to fill it with data and send it.
Definition: asn1_message.h:46
DequeueBuffer::Tail
char * Tail()
A pointer to the next place to write data to.
Definition: dequeue_buffer.cc:32
DequeueBuffer::Consume
void Consume(size_t bytes)
Called after bytes have been read from Head().
Definition: dequeue_buffer.cc:16
ConnectionSocket
Definition: utils.h:124
MsgHandler
Definition: msg_handler.h:10
SecondaryTcpServer::run
void run()
Accept connections on the socket, decode requests and respond using the secondary implementation.
Definition: secondary_tcp_server.cc:32
DequeueBuffer::Size
size_t Size() const
The number of elements that are valid (have been written) after Head()
Definition: dequeue_buffer.cc:11
DequeueBuffer::TailSpace
size_t TailSpace()
The number of bytes beyond Tail() that are allocated and may be written to.
Definition: dequeue_buffer.cc:38
Asn1Message::FromRaw
static Asn1Message::Ptr FromRaw(AKIpUptaneMes_t **msg)
Destructively copy from a raw msg pointer created by parsing an incoming message.
Definition: asn1_message.h:53
Socket
Definition: utils.h:105
DequeueBuffer
A dequeue based on a contiguous buffer in memory.
Definition: dequeue_buffer.h:11
DequeueBuffer::Head
char * Head()
A pointer to the first element that has not been Consumed().
Definition: dequeue_buffer.cc:6
DequeueBuffer::HaveEnqueued
void HaveEnqueued(size_t bytes)
Call to indicate that bytes have been written in the range Tail() ...
Definition: dequeue_buffer.cc:43