Aktualizr
C++ SOTA Client
All Classes Namespaces Files Functions Variables Enumerations Enumerator Pages
secondary_tcp_server.cc
1 #include "secondary_tcp_server.h"
2 
3 #include <netinet/tcp.h>
4 
5 #include "AKInstallationResultCode.h"
6 #include "AKIpUptaneMes.h"
7 #include "asn1/asn1_message.h"
8 #include "logging/logging.h"
9 #include "msg_dispatcher.h"
10 #include "utilities/dequeue_buffer.h"
11 
12 SecondaryTcpServer::SecondaryTcpServer(MsgDispatcher &msg_dispatcher, const std::string &primary_ip,
13  in_port_t primary_port, in_port_t port, bool reboot_after_install)
14  : msg_dispatcher_(msg_dispatcher),
15  listen_socket_(port),
16  keep_running_(true),
17  reboot_after_install_(reboot_after_install),
18  is_running_(false) {
19  if (primary_ip.empty()) {
20  return;
21  }
22 
23  ConnectionSocket conn_socket(primary_ip, primary_port, listen_socket_.port());
24  if (conn_socket.connect() == 0) {
25  LOG_INFO << "Connected to Primary, sending info about this Secondary.";
26  HandleOneConnection(*conn_socket);
27  } else {
28  LOG_INFO << "Failed to connect to Primary.";
29  }
30 }
31 
33  if (listen(*listen_socket_, SOMAXCONN) < 0) {
34  throw std::system_error(errno, std::system_category(), "listen");
35  }
36  LOG_INFO << "Secondary TCP server listening on " << listen_socket_.toString();
37 
38  {
39  std::unique_lock<std::mutex> lock(running_condition_mutex_);
40  is_running_ = true;
41  running_condition_.notify_all();
42  }
43 
44  while (keep_running_.load()) {
45  sockaddr_storage peer_sa{};
46  socklen_t peer_sa_size = sizeof(sockaddr_storage);
47 
48  LOG_DEBUG << "Waiting for connection from Primary...";
49  int con_fd = accept(*listen_socket_, reinterpret_cast<sockaddr *>(&peer_sa), &peer_sa_size);
50  if (con_fd == -1) {
51  // Accept can fail if a client closes connection/client socket before a TCP handshake completes or
52  // a network connection goes down in the middle of a TCP handshake procedure. At first glance it looks like
53  // we can just continue listening/accepting new connections in such cases instead of exiting from the server loop
54  // which leads to exiting of the overall daemon process.
55  // But, accept() failure, potentially can be caused by some incorrect state of the listening socket
56  // which means that it will keep returning error, so, exiting from the daemon process and letting
57  // systemd to restart it looks like the most reliable solution that covers all edge cases.
58  LOG_INFO << "Socket accept failed, aborting.";
59  break;
60  }
61 
62  LOG_DEBUG << "Primary connected.";
63  auto continue_running = HandleOneConnection(*Socket(con_fd));
64  if (!continue_running) {
65  keep_running_.store(false);
66  }
67  LOG_DEBUG << "Primary disconnected.";
68  }
69 
70  {
71  std::unique_lock<std::mutex> lock(running_condition_mutex_);
72  is_running_ = false;
73  running_condition_.notify_all();
74  }
75 
76  LOG_INFO << "Secondary TCP server exiting.";
77 }
78 
79 void SecondaryTcpServer::stop() {
80  LOG_DEBUG << "Stopping Secondary TCP server...";
81  keep_running_.store(false);
82  // unblock accept
83  ConnectionSocket("localhost", listen_socket_.port()).connect();
84 }
85 
86 in_port_t SecondaryTcpServer::port() const { return listen_socket_.port(); }
87 SecondaryTcpServer::ExitReason SecondaryTcpServer::exit_reason() const { return exit_reason_; }
88 
89 static bool sendResponseMessage(int socket_fd, Asn1Message::Ptr &resp_msg);
90 
91 bool SecondaryTcpServer::HandleOneConnection(int socket) {
92  // Outside the message loop, because one recv() may have parts of 2 messages
93  // Note that one recv() call returning 2+ messages doesn't work at the
94  // moment. This shouldn't be a problem until we have messages that aren't
95  // strictly request/response
96  DequeueBuffer buffer;
97  bool keep_running_server = true;
98  bool keep_running_current_session = true;
99 
100  while (keep_running_current_session) { // Keep reading until we get an error
101  // Read an incomming message
102  AKIpUptaneMes_t *m = nullptr;
103  asn_dec_rval_t res;
104  asn_codec_ctx_s context{};
105  ssize_t received;
106 
107  do {
108  received = recv(socket, buffer.Tail(), buffer.TailSpace(), 0);
109  buffer.HaveEnqueued(static_cast<size_t>(received));
110  res = ber_decode(&context, &asn_DEF_AKIpUptaneMes, reinterpret_cast<void **>(&m), buffer.Head(), buffer.Size());
111  buffer.Consume(res.consumed);
112  } while (res.code == RC_WMORE && received > 0);
113  // Note that ber_decode allocates *m even on failure, so this must always be done
114  Asn1Message::Ptr request_msg = Asn1Message::FromRaw(&m);
115 
116  if (received <= 0) {
117  LOG_DEBUG << "Primary has closed a connection socket";
118  break;
119  }
120 
121  if (res.code != RC_OK) {
122  LOG_ERROR << "Failed to receive and/or decode a message from Primary";
123  break;
124  }
125 
126  LOG_DEBUG << "Received message from Primary, try to decode it...";
127  Asn1Message::Ptr response_msg = Asn1Message::Empty();
128  MsgDispatcher::HandleStatusCode handle_status_code = msg_dispatcher_.handleMsg(request_msg, response_msg);
129 
130  switch (handle_status_code) {
131  case MsgDispatcher::HandleStatusCode::kRebootRequired: {
132  exit_reason_ = ExitReason::kRebootNeeded;
133  keep_running_current_session = sendResponseMessage(socket, response_msg);
134  if (reboot_after_install_) {
135  keep_running_server = keep_running_current_session = false;
136  }
137  break;
138  }
139  case MsgDispatcher::HandleStatusCode::kOk: {
140  keep_running_current_session = sendResponseMessage(socket, response_msg);
141  break;
142  }
143  case MsgDispatcher::HandleStatusCode::kUnkownMsg:
144  default: {
145  // TODO: consider sending NOT_SUPPORTED/Unknown message and closing connection socket
146  keep_running_current_session = false;
147  LOG_INFO << "Unknown message received from Primary!";
148  }
149  } // switch
150 
151  } // Go back round and read another message
152 
153  return keep_running_server;
154  // Parse error => Shutdown the socket
155  // write error => Shutdown the socket
156  // Timeout on write => shutdown
157 }
158 
159 void SecondaryTcpServer::wait_until_running(int timeout) {
160  std::unique_lock<std::mutex> lock(running_condition_mutex_);
161  running_condition_.wait_for(lock, std::chrono::seconds(timeout), [&] { return is_running_; });
162 }
163 
164 bool sendResponseMessage(int socket_fd, Asn1Message::Ptr &resp_msg) {
165  LOG_DEBUG << "Encoding and sending response message";
166 
167  int optval = 0;
168  setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int));
169  asn_enc_rval_t encode_result = der_encode(&asn_DEF_AKIpUptaneMes, &resp_msg->msg_, Asn1SocketWriteCallback,
170  reinterpret_cast<void *>(&socket_fd));
171  if (encode_result.encoded == -1) {
172  LOG_ERROR << "Failed to encode a response message";
173  return false; // write error
174  }
175  optval = 1;
176  setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int));
177 
178  return true;
179 }
Asn1Message::Empty
static Asn1Message::Ptr Empty()
Create a new Asn1Message, in order to fill it with data and send it.
Definition: asn1_message.h:46
DequeueBuffer::Tail
char * Tail()
A pointer to the next place to write data to.
Definition: dequeue_buffer.cc:32
DequeueBuffer::Consume
void Consume(size_t bytes)
Called after bytes have been read from Head().
Definition: dequeue_buffer.cc:16
ConnectionSocket
Definition: utils.h:146
SecondaryTcpServer::run
void run()
Accept connections on the socket, decode requests and respond using the secondary implementation.
Definition: secondary_tcp_server.cc:32
DequeueBuffer::Size
size_t Size() const
The number of elements that are valid (have been written) after Head()
Definition: dequeue_buffer.cc:11
DequeueBuffer::TailSpace
size_t TailSpace()
The number of bytes beyond Tail() that are allocated and may be written to.
Definition: dequeue_buffer.cc:38
MsgDispatcher
Definition: msg_dispatcher.h:12
Asn1Message::FromRaw
static Asn1Message::Ptr FromRaw(AKIpUptaneMes_t **msg)
Destructively copy from a raw msg pointer created by parsing an incomming message.
Definition: asn1_message.h:53
Socket
Definition: utils.h:127
DequeueBuffer
A dequeue based on a contiguous buffer in memory.
Definition: dequeue_buffer.h:11
DequeueBuffer::Head
char * Head()
A pointer to the first element that has not been Consumed().
Definition: dequeue_buffer.cc:6
DequeueBuffer::HaveEnqueued
void HaveEnqueued(size_t bytes)
Call to indicate that bytes have been written in the range Tail() ...
Definition: dequeue_buffer.cc:43