Sunday, 25 October 2020

Simple worker thread in a C++ class

Assume that there is a class which contains some data and calculates some results given queries, and the queries take a relatively large amount of time.

An example class (everything dummy) is:

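#include <chrono>   // std::chrono::milliseconds
#include <cstddef>  // size_t
#include <numeric>  // std::accumulate, std::iota
#include <thread>   // std::this_thread::sleep_for
#include <utility>  // std::move
#include <vector>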

struct do_some_work
{
    do_some_work(std::vector<int> data) 
        : _data(std::move(data))
        , _current_query(0)
        , _last_calculated_result(0) 
    {}
    void update_query(size_t x) {
        if (x < _data.size()) {
            _current_query = x;
            recalculate_result();
        }
    }
    int get_result() const {
        return _last_calculated_result;
    }
private:
    void recalculate_result() {
        //dummy some work here     
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        _last_calculated_result = std::accumulate(_data.cbegin(), _data.cbegin() + _current_query, 0);
    }

    std::vector<int> const _data;
    size_t _current_query;
    int _last_calculated_result;
};

and this can be used in the main code like:

#include <iostream> // std::cout

int main()
{
    //make some dummy data
    std::vector<int> test_data(20, 0);
    std::iota(test_data.begin(), test_data.end(), 0);

    {
        do_some_work work(test_data);
        for (size_t i = 0; i < test_data.size(); ++i) {
            work.update_query(i);
            std::cout << "result = {" << i << "," <<  work.get_result() << "}" << std::endl;
        }
    }
}

The loop above blocks the main function for about a second per query, because update_query does the calculation synchronously.

Now assume we want to run these queries in a tight loop (say, in a GUI) and only care about getting a "recent" result quickly whenever we ask for one.

So we want to move the work to a separate thread that calculates results and stores the latest one, so that get_result returns the last calculated value without blocking. That is, we want to change the do_some_work class to do its work on a thread with minimal changes (essentially, find a pattern of changes that can be applied to (mostly) any class of this type).

My stab at this is the following:

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <numeric>
#include <thread>
#include <utility>
#include <vector>

struct do_lots_of_work
{
    do_lots_of_work(std::vector<int> data) 
        : _data(std::move(data))        
        , _current_query(0)
        , _last_calculated_result(0)
        , _worker()
        , _data_mtx()
        , _result_mtx()
        , _cv()
        , _do_exit(false)
        , _work_available(false)
    {
        start_worker();
    }
    void update_query(size_t x) {
        {
            if (x < _data.size()) {
                std::lock_guard<std::mutex> lck(_data_mtx);
                _current_query = x;
                _work_available = true;
                _cv.notify_one();
            }
        }        
    }
    int get_result() const {
        std::lock_guard<std::mutex> lck(_result_mtx);
        return _last_calculated_result;
    }

    ~do_lots_of_work() {
        stop_worker();
    }

private:
    void start_worker() {
        if (!_worker.joinable()) {
            std::cout << "starting worker..." << std::endl;
            _worker = std::thread(&do_lots_of_work::worker_loop, this);
        }
    }

    void stop_worker() {
        std::cout << "worker stopping..." << std::endl;
        if (_worker.joinable()) {
            std::unique_lock<std::mutex> lck(_data_mtx);
            _do_exit = true;
            lck.unlock();
            _cv.notify_one();            
            _worker.join();
        }
        std::cout << "worker stopped" << std::endl;
    }

    void worker_loop() {
        std::cout << "worker started" << std::endl;
        while (true) {
            std::unique_lock<std::mutex> lck(_data_mtx);
            _cv.wait(lck, [this]() {return _work_available || _do_exit; });
            if (_do_exit) { break; }
            if (_work_available) {
                _work_available = false;
                size_t const query = _current_query; //take local copy (same type as the member, no narrowing)
                lck.unlock(); //unlock before doing lots of work.
                recalculate_result(query);
            }
        }
    }

    void recalculate_result(size_t query) {
        //dummy lots of work here
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        int const result = std::accumulate(_data.cbegin(), _data.cbegin() + query, 0);    
        set_result(result);
    }

    void set_result(int result) {
        std::lock_guard<std::mutex> lck(_result_mtx);
        _last_calculated_result = result;
    }
    
    std::vector<int> const  _data;
    size_t                  _current_query;
    int                     _last_calculated_result;
    
    std::thread             _worker;
    mutable std::mutex      _data_mtx;
    mutable std::mutex      _result_mtx;
    std::condition_variable _cv;
    bool                    _do_exit;
    bool                    _work_available;
};

and the usage is (example):

#include <algorithm>

int main()
{
    //make some dummy data
    std::vector<int> test_data(20, 0);
    std::iota(test_data.begin(), test_data.end(), 0);

    {
        do_lots_of_work work(test_data);
        for (size_t i = 0; i < test_data.size(); ++i) {            
            work.update_query(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            std::cout << "result = {" << i << "," << work.get_result() << "}" << std::endl;
        }
    }
}

This seems to work: get_result returns the last calculated result, and the main function is not blocked by the calculation.

But it looks like a LOT of changes are required to add a worker thread to a simple class like do_some_work: two mutexes (one for the data shared between the worker and main, and one for the result), one condition_variable, one work-available flag and one do-exit flag. That is quite a bit. I guess we don't want an async kind of mechanism, because we don't want to potentially launch a new thread every time.
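For comparison, here is a rough sketch of what the async kind of mechanism mentioned above might look like. It is only an illustration: the name do_work_async and the wait_result helper are made up for this sketch, and the slow part of the work is left out. It needs far fewer moving parts, but every query launches a new task, and replacing a future that is still running blocks until that task has finished.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical variant, for comparison only; not part of the original classes.
struct do_work_async
{
    explicit do_work_async(std::vector<int> data) : _data(std::move(data)) {}

    // Launches one task per accepted query. With std::launch::async each task
    // runs as if on its own new thread.
    void update_query(std::size_t x) {
        if (x < _data.size()) {
            // Replacing the old future blocks until its task has finished,
            // so this call can still stall while a previous query is running.
            _pending = std::async(std::launch::async, [this, x] { return calculate(x); });
        }
    }

    // Non-blocking: picks up the pending result only if it is already ready.
    int get_result() {
        if (_pending.valid() &&
            _pending.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
            _last_calculated_result = _pending.get();
        }
        return _last_calculated_result;
    }

    // Blocking helper (an assumption added for this sketch): waits for the pending task.
    int wait_result() {
        if (_pending.valid()) { _last_calculated_result = _pending.get(); }
        return _last_calculated_result;
    }

private:
    int calculate(std::size_t query) const {
        assert(query <= _data.size());
        // the slow part of the work would go here
        return std::accumulate(_data.cbegin(),
                               _data.cbegin() + static_cast<std::ptrdiff_t>(query), 0);
    }

    std::vector<int> const _data;
    int                    _last_calculated_result = 0;
    std::future<int>       _pending; // declared last: destroyed first, which waits for the task
};
```

Note that get_result is no longer const here, and that the object is only meant to be used from one calling thread, since nothing protects _pending or _last_calculated_result.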

Now, I am not sure if there is a MUCH simpler pattern to make this kind of change, but it feels like there should be. A kind of pattern that can be used to off-load work to a thread.

So finally, my question is, can do_some_work be converted into do_lots_of_work in a much simpler way than the implementation above?
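One direction that might reduce the per-class changes is to pull the thread, mutex, condition variable and flags into a small reusable helper, so that a class like do_some_work only supplies the calculation. The following is a minimal sketch under that assumption, not a definitive answer: latest_worker and do_lots_of_work_v2 are hypothetical names, the helper uses a single mutex for both query and result, and the query type must be default-constructible.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <numeric>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: runs fn on one worker thread, always on the most
// recently submitted query, and keeps the last result.
template <typename Query, typename Result>
class latest_worker
{
public:
    latest_worker(std::function<Result(Query const&)> fn, Result initial)
        : _fn(std::move(fn)), _result(std::move(initial)), _thread([this] { run(); })
    {
        assert(_fn); // fn must be callable
    }
    ~latest_worker() {
        {
            std::lock_guard<std::mutex> lck(_mtx);
            _exit = true;
        }
        _cv.notify_one();
        _thread.join();
    }
    latest_worker(latest_worker const&) = delete;
    latest_worker& operator=(latest_worker const&) = delete;

    // Overwrites any query that the worker has not picked up yet.
    void submit(Query q) {
        {
            std::lock_guard<std::mutex> lck(_mtx);
            _query = std::move(q);
            _pending = true;
        }
        _cv.notify_one();
    }
    Result last_result() const {
        std::lock_guard<std::mutex> lck(_mtx);
        return _result;
    }

private:
    void run() {
        std::unique_lock<std::mutex> lck(_mtx);
        while (true) {
            _cv.wait(lck, [this] { return _pending || _exit; });
            if (_exit) { return; }
            Query q = std::move(_query);
            _pending = false;
            lck.unlock();          // do the slow work without holding the lock
            Result r = _fn(q);
            lck.lock();
            _result = std::move(r);
        }
    }

    std::function<Result(Query const&)> _fn;
    Result                  _result;
    Query                   _query{};
    bool                    _pending = false;
    bool                    _exit = false;
    mutable std::mutex      _mtx;
    std::condition_variable _cv;
    std::thread             _thread; // declared last: starts after the rest is initialized
};

// The class itself then only keeps its data and its calculation.
struct do_lots_of_work_v2
{
    explicit do_lots_of_work_v2(std::vector<int> data)
        : _data(std::move(data))
        , _worker([this](std::size_t const& q) { return calculate(q); }, 0)
    {}
    void update_query(std::size_t x) { if (x < _data.size()) { _worker.submit(x); } }
    int get_result() const { return _worker.last_result(); }

private:
    int calculate(std::size_t query) const {
        // the slow part of the work would go here
        return std::accumulate(_data.cbegin(),
                               _data.cbegin() + static_cast<std::ptrdiff_t>(query), 0);
    }
    std::vector<int> const          _data;
    latest_worker<std::size_t, int> _worker; // after _data, so it is joined before _data goes away
};
```

The usage stays the same as for do_lots_of_work: call update_query in the loop and read get_result whenever a recent value is needed. Because the lambda captures this, do_lots_of_work_v2 must not be copied or moved, which the deleted copy operations of the helper already enforce.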
