Aktualizr
C++ SOTA Client
apiqueue.h
1 #ifndef AKTUALIZR_APIQUEUE_H
2 #define AKTUALIZR_APIQUEUE_H
3 
4 #include <atomic>
5 #include <condition_variable>
6 #include <functional>
7 #include <future>
8 #include <mutex>
9 #include <queue>
10 #include <thread>
11 #include <utility>
12 
13 namespace api {
14 
///
/// Provides a thread-safe way to pause and terminate task execution.
/// A task must call canContinue() method to check the current state.
///
/// Two parties use a token: a controlling thread, which calls setPause(),
/// setAbort() and reset(), and a controlled thread (the task), which polls
/// canContinue().
///
class FlowControlToken {
 public:
  ///
  /// Called by the controlling thread to request the task to pause or resume.
  /// Has no effect if the task was aborted.
  /// @param set_paused `true` to request a pause, `false` to request a resume.
  /// @return `true` if the state was changed, `false` otherwise.
  ///
  bool setPause(bool set_paused);

  ///
  /// Called by the controlling thread to request the task to abort.
  /// @return `false` if the task was already aborted, `true` otherwise.
  ///
  bool setAbort();

  ///
  /// Called by the controlled thread to query the currently requested state.
  /// Sleeps if the state is `Paused` and `blocking == true`.
  /// @param blocking whether the call may sleep while the state is `Paused`.
  /// @return `true` for `Running` state, `false` for `Aborted`,
  /// and also `false` for the `Paused` state, if the call is non-blocking.
  ///
  bool canContinue(bool blocking = true) const;

  ///
  /// Sets token to the initial state (`Running`, see the initializer of
  /// `state_` below).
  ///
  void reset();

 private:
  /// Requested state of the controlled task.
  enum class State {
    kRunning,  // transitions: ->Paused, ->Aborted
    kPaused,   // transitions: ->Running, ->Aborted
    kAborted   // transitions: none
  } state_{State::kRunning};

  // Both are `mutable` so that the const-qualified canContinue() can use
  // them; that method is documented to sleep while the token is paused.
  mutable std::mutex m_;
  mutable std::condition_variable cv_;
};
56 
57 class CommandQueue {
58  public:
59  ~CommandQueue();
60  void run();
61  bool pause(bool do_pause); // returns true iff pause→resume or resume→pause
62  void abort(bool restart_thread = true);
63 
64  template <class R>
65  std::future<R> enqueue(const std::function<R()>& f) {
66  std::packaged_task<R()> task(f);
67  auto r = task.get_future();
68  {
69  std::lock_guard<std::mutex> lock(m_);
70  queue_.push(std::packaged_task<void()>(std::move(task)));
71  }
72  cv_.notify_all();
73  return r;
74  }
75 
76  template <class R>
77  std::future<R> enqueue(const std::function<R(const api::FlowControlToken*)>& f) {
78  std::packaged_task<R()> task(std::bind(f, &token_));
79  auto r = task.get_future();
80  {
81  std::lock_guard<std::mutex> lock(m_);
82  queue_.push(std::packaged_task<void()>(std::move(task)));
83  }
84  cv_.notify_all();
85  return r;
86  }
87 
88  private:
89  std::atomic_bool shutdown_{false};
90  std::atomic_bool paused_{false};
91 
92  std::thread thread_;
93  std::mutex thread_m_;
94 
95  std::queue<std::packaged_task<void()>> queue_;
96  std::mutex m_;
97  std::condition_variable cv_;
98  FlowControlToken token_;
99 };
100 
101 } // namespace api
102 #endif // AKTUALIZR_APIQUEUE_H
api::FlowControlToken::setAbort
bool setAbort()
Called by the controlling thread to request the task to abort.
Definition: apiqueue.cc:21
api::FlowControlToken::canContinue
bool canContinue(bool blocking=true) const
Called by the controlled thread to query the currently requested state.
Definition: apiqueue.cc:33
api::FlowControlToken
Provides a thread-safe way to pause and terminate task execution.
Definition: apiqueue.h:19
api::CommandQueue
Definition: apiqueue.h:57
api::FlowControlToken::setPause
bool setPause(bool set_paused)
Called by the controlling thread to request the task to pause or resume.
Definition: apiqueue.cc:6