8 #include <boost/thread/latch.hpp>
15 #include "logging/logging.h"
18 namespace timer = std::chrono;
// Pure virtual: asks the controller to stop handing out further work.
// The overrides visible in this file either raise a stop flag or reset
// the remaining iteration counter to zero.
23 virtual void stop() = 0;
// Pure virtual: reports whether the caller may go on with another task.
// The worker loop in this file keeps running while this returns true.
25 virtual bool claim() = 0;
30 std::atomic_bool stopped;
35 void stop()
override { stopped =
true; }
37 bool claim()
override {
return !stopped; }
42 std::atomic_uint iterations;
47 void stop()
override { iterations.store(0); }
49 bool claim()
override {
51 auto i = iterations.load();
54 }
else if (iterations.compare_exchange_strong(i, i - 1)) {
63 std::atomic_bool stopped;
64 static std::atomic_bool interrupted;
65 static void handleSignal(
int) {
66 LOG_INFO <<
"SIGINT received";
72 std::signal(SIGINT, InterruptableExecutionController::handleSignal);
75 bool claim()
override {
return !(interrupted || stopped); }
77 void stop()
override { stopped =
true; }
80 typedef timer::steady_clock::time_point TimePoint;
83 const timer::duration<int, std::milli> taskInterval;
84 std::atomic_ulong taskIndex;
87 TaskStartTimeCalculator(
const unsigned rate) : startTime{}, taskInterval{std::milli::den / rate}, taskIndex{0} {}
89 void start() { startTime = timer::steady_clock::now(); }
91 TimePoint operator()() {
93 return startTime + taskInterval * i;
97 template <
typename TaskStream>
99 std::unique_ptr<ExecutionController> controller;
100 std::vector<std::thread> workers;
101 std::vector<Statistics> statistics;
103 boost::latch threadCountDown;
104 boost::latch starter;
105 const std::string label;
107 void runWorker(TaskStream &tasks,
Statistics &stats) {
109 GMainContext *thread_context = g_main_context_new();
110 g_main_context_push_thread_default(thread_context);
112 using clock = std::chrono::steady_clock;
113 LOG_DEBUG << label <<
": Worker created: " << std::this_thread::get_id();
114 threadCountDown.count_down();
116 while (controller->claim()) {
117 auto task = tasks.nextTask();
118 const auto intendedStartTime = calculateTaskStartTime();
119 if (timer::steady_clock::now() < intendedStartTime) {
120 std::this_thread::sleep_until(intendedStartTime);
122 const clock::time_point start = clock::now();
124 const clock::time_point end = clock::now();
125 std::chrono::milliseconds executionTime = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
126 stats.recordSuccess(executionTime);
128 LOG_DEBUG << label <<
": Worker finished execution: " << std::this_thread::get_id();
130 g_main_context_pop_thread_default(thread_context);
131 g_main_context_unref(thread_context);
136 Executor(std::vector<TaskStream> &feeds,
const unsigned rate, std::unique_ptr<ExecutionController> ctrl,
137 const std::string lbl)
138 : controller{std::move(ctrl)},
140 statistics(feeds.size()),
141 calculateTaskStartTime{rate},
142 threadCountDown{feeds.size()},
145 workers.reserve(feeds.size());
147 for (
size_t i = 0; i < feeds.size(); i++) {
148 workers.push_back(std::thread(&Executor::runWorker,
this, std::ref(feeds[i]), std::ref(statistics[i])));
159 LOG_INFO << label <<
": Waiting for threads to start";
160 threadCountDown.wait();
161 calculateTaskStartTime.start();
163 LOG_INFO << label <<
": Starting tests";
165 starter.count_down();
167 for (
size_t i = 0; i < workers.size(); i++) {
168 if (workers[i].joinable()) {
174 for (
size_t i = 0; i < statistics.size(); i++) {
175 summary += statistics[i];
177 std::cout <<
"Results for: " << label << std::endl;