Assume that there is a class which contains some data and calculates some results given queries, and the queries take a relatively large amount of time.
An example class (everything dummy) is:
#include <vector>
#include <numeric>
#include <thread>
struct do_some_work
{
do_some_work(std::vector<int> data)
: _data(std::move(data))
, _current_query(0)
, _last_calculated_result(0)
{}
void update_query(size_t x) {
if (x < _data.size()) {
_current_query = x;
recalculate_result();
}
}
int get_result() const {
return _last_calculated_result;
}
private:
void recalculate_result() {
//dummy some work here
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
_last_calculated_result = std::accumulate(_data.cbegin(), _data.cbegin() + _current_query, 0);
}
std::vector<int> const _data;
size_t _current_query;
int _last_calculated_result;
};
and this can be used in the main code like:
#include <algorithm>
int main()
{
//make some dummy data
std::vector<int> test_data(20, 0);
std::iota(test_data.begin(), test_data.end(), 0);
{
do_some_work work(test_data);
for (size_t i = 0; i < test_data.size(); ++i) {
work.update_query(i);
std::cout << "result = {" << i << "," << work.get_result() << "}" << std::endl;
}
}
}
The above will wait in the main function a lot.
Now, assuming we want to run this querying in a tight loop (say GUI) and only care about about getting a "recent" result quickly when we query.
So, we want to move the work to a separate thread which calculates the results, and updates it, and when we get result, we get the last calculated one. That is, we want to change do_some_work
class to do its work on a thread, with minimal changes (essentially find a pattern of changes that can be applied to (mostly) any class of this type).
My stab at this is the following:
#include <vector>
#include <numeric>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>
struct do_lots_of_work
{
do_lots_of_work(std::vector<int> data)
: _data(std::move(data))
, _current_query(0)
, _last_calculated_result(0)
, _worker()
, _data_mtx()
, _result_mtx()
, _cv()
, _do_exit(false)
, _work_available(false)
{
start_worker();
}
void update_query(size_t x) {
{
if (x < _data.size()) {
std::lock_guard<std::mutex> lck(_data_mtx);
_current_query = x;
_work_available = true;
_cv.notify_one();
}
}
}
int get_result() const {
std::lock_guard<std::mutex> lck(_result_mtx);
return _last_calculated_result;
}
~do_lots_of_work() {
stop_worker();
}
private:
void start_worker() {
if (!_worker.joinable()) {
std::cout << "starting worker..." << std::endl;
_worker = std::thread(&do_lots_of_work::worker_loop, this);
}
}
void stop_worker() {
std::cout << "worker stopping..." << std::endl;
if (_worker.joinable()) {
std::unique_lock<std::mutex> lck(_data_mtx);
_do_exit = true;
lck.unlock();
_cv.notify_one();
_worker.join();
}
std::cout << "worker stopped" << std::endl;
}
void worker_loop() {
std::cout << "worker started" << std::endl;
while (true) {
std::unique_lock<std::mutex> lck(_data_mtx);
_cv.wait(lck, [this]() {return _work_available || _do_exit; });
if (_do_exit) { break; }
if (_work_available) {
_work_available = false;
int query = _current_query; //take local copy
lck.unlock(); //unlock before doing lots of work.
recalculate_result(query);
}
}
}
void recalculate_result(int query) {
//dummy lots of work here
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
int const result = std::accumulate(_data.cbegin(), _data.cbegin() + query, 0);
set_result(result);
}
void set_result(int result) {
std::lock_guard<std::mutex> lck(_result_mtx);
_last_calculated_result = result;
}
std::vector<int> const _data;
size_t _current_query;
int _last_calculated_result;
std::thread _worker;
mutable std::mutex _data_mtx;
mutable std::mutex _result_mtx;
std::condition_variable _cv;
bool _do_exit;
bool _work_available;
};
and the usage is (example):
#include <algorithm>
int main()
{
//make some dummy data
std::vector<int> test_data(20, 0);
std::iota(test_data.begin(), test_data.end(), 0);
{
do_lots_of_work work(test_data);
for (size_t i = 0; i < test_data.size(); ++i) {
work.update_query(i);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "result = {" << i << "," << work.get_result() << "}" << std::endl;
}
}
}
This seems to work, giving the last result, not stopping the main function etc.
But, this looks a LOT of changes are required to add a worker thread to a simple class like do_some_work
. Items like two mutexes
(one for the worker/main interaction data, and one for the result), one condition_variable
, one more-work-available
flag and one do-exit
flag, that is quite a bit. I guess we don't want an async
kind of mechanism because we don't want to potentially launch a new thread every time.
Now, I am not sure if there is a MUCH simpler pattern to make this kind of change, but it feels like there should be. A kind of pattern that can be used to off-load work to a thread.
So finally, my question is, can do_some_work
be converted into do_lots_of_work
in a much simpler way than the implementation above?
Aucun commentaire:
Enregistrer un commentaire