Aktualizr
C++ SOTA Client
apiqueue.cc
1 #include "apiqueue.h"
2 #include "logging/logging.h"
3 
4 namespace api {
5 
6 bool FlowControlToken::setPause(bool set_paused) {
7  {
8  std::lock_guard<std::mutex> lock(m_);
9  if (set_paused && state_ == State::kRunning) {
10  state_ = State::kPaused;
11  } else if (!set_paused && state_ == State::kPaused) {
12  state_ = State::kRunning;
13  } else {
14  return false;
15  }
16  }
17  cv_.notify_all();
18  return true;
19 }
20 
22  {
23  std::lock_guard<std::mutex> g(m_);
24  if (state_ == State::kAborted) {
25  return false;
26  }
27  state_ = State::kAborted;
28  }
29  cv_.notify_all();
30  return true;
31 }
32 
33 bool FlowControlToken::canContinue(bool blocking) const {
34  std::unique_lock<std::mutex> lk(m_);
35  if (blocking) {
36  cv_.wait(lk, [this] { return state_ != State::kPaused; });
37  }
38  return state_ == State::kRunning;
39 }
40 
41 void FlowControlToken::reset() {
42  std::lock_guard<std::mutex> g(m_);
43  state_ = State::kRunning;
44 }
45 
46 CommandQueue::~CommandQueue() {
47  try {
48  abort(false);
49  } catch (std::exception& ex) {
50  LOG_ERROR << "~CommandQueue() exception: " << ex.what() << std::endl;
51  } catch (...) {
52  LOG_ERROR << "~CommandQueue() unknown exception" << std::endl;
53  }
54 }
55 
56 void CommandQueue::run() {
57  std::lock_guard<std::mutex> g(thread_m_);
58  if (!thread_.joinable()) {
59  thread_ = std::thread([this] {
60  std::unique_lock<std::mutex> lock(m_);
61  for (;;) {
62  cv_.wait(lock, [this] { return (!queue_.empty() && !paused_) || shutdown_; });
63  if (shutdown_) {
64  break;
65  }
66  auto task = std::move(queue_.front());
67  queue_.pop();
68  lock.unlock();
69  task();
70  lock.lock();
71  }
72  });
73  }
74 }
75 
76 bool CommandQueue::pause(bool do_pause) {
77  bool has_effect;
78  {
79  std::lock_guard<std::mutex> lock(m_);
80  has_effect = paused_ != do_pause;
81  paused_ = do_pause;
82  token_.setPause(do_pause);
83  }
84  cv_.notify_all();
85 
86  return has_effect;
87 }
88 
89 void CommandQueue::abort(bool restart_thread) {
90  {
91  std::lock_guard<std::mutex> thread_g(thread_m_);
92  {
93  std::lock_guard<std::mutex> g(m_);
94  token_.setAbort();
95  shutdown_ = true;
96  }
97  cv_.notify_all();
98  if (thread_.joinable()) {
99  thread_.join();
100  }
101  {
102  // Flush the queue and reset to initial state
103  std::lock_guard<std::mutex> g(m_);
104  std::queue<std::packaged_task<void()>>().swap(queue_);
105  token_.reset();
106  shutdown_ = false;
107  }
108  }
109  if (restart_thread) {
110  run();
111  }
112 }
113 } // namespace api
api::FlowControlToken::setAbort
bool setAbort()
Called by the controlling thread to request the task to abort.
Definition: apiqueue.cc:21
api::FlowControlToken::canContinue
bool canContinue(bool blocking=true) const
Called by the controlled thread to query the currently requested state.
Definition: apiqueue.cc:33
api::FlowControlToken::setPause
bool setPause(bool set_paused)
Called by the controlling thread to request the task to pause or resume.
Definition: apiqueue.cc:6