There is a worker thread that is responsible for executing a certain task and obtaining its result. The task needs input data, which is produced by multiple clients, and each result must be returned to the client that supplied the data. The solution I am considering is to put the data into a thread-safe queue along with a std::promise, whose std::future is retrieved in the client code and kept so the result can be read from it later. The worker thread then fetches the data and the std::promise, executes the task, and sets the promise's value. The client, which is waiting on the std::future, then gets its corresponding result. The implementation technically works, but my questions are:
- Is there a better way to get the same outcome?
- Is there any issue or performance penalty with this approach?
- What are the best practices for this kind of problem?
Thanks
The code below shows the method described:
std::condition_variable_any workerCv;
std::mutex workerMutex;
SomeThreadSafeQueue<std::pair<int, std::promise<int>>> threadSafeQueue;
int process(int data) {
    // Create the promise
    std::promise<int> promise;
    // Get the future and keep it
    auto future = promise.get_future();
    // Move the promise along with the data into the thread safe queue
    threadSafeQueue.emplace({ data, std::move(promise) });
    // Notify the worker about the new data
    workerCv.notify_one();
    // Get the result
    return future.get();
}
void worker(std::stop_token stopToken) {
    while (!stopToken.stop_requested()) {
        // Construct the unique lock needed for condition variable
        std::unique_lock lock(workerMutex);
        // Wait for notification
        workerCv.wait(lock, stopToken, []() { return !threadSafeQueue.empty(); });
        // Unlock since we don't need the lock for anything other than the condition variable
        lock.unlock();
        // Check for stop condition
        if (stopToken.stop_requested()) {
            break;
        }
        // Fetch the data and promise from the queue. This specific queue front method
        // moves the item out of the queue
        auto&& [data, promise] = threadSafeQueue.front();
        try {
            // Use the data to run some operation and set the result in the corresponding promise
            promise.set_value(someOperation(data));
        }
        catch (...) {
            // Set any possible exception into the promise
            promise.set_exception(std::current_exception());
        }
    }
}
void client(int start, int end) {
    for (int data = start; data < end; ++data) {
        try {
            // Call the process function that puts the data into the
            // thread safe queue and gets the result when it is ready
            // then use it
            useTheResult(process(data));
        }
        catch (const std::exception& exception) {
            // Handle the exception
            // Not thread safe but ok for demo
            std::cout << exception.what() << std::endl;
        }
    }
}
int main() {
    // Stop source for the worker thread
    std::stop_source stopSource;
    // Invoke the worker thread
    std::jthread workerThread(worker, stopSource.get_token());
    // Invoke the clients
    std::thread client1(client, 0, 10);
    std::thread client2(client, 10, 20);
    // Join the clients
    client1.join();
    client2.join();
    // Signal the worker thread to stop
    stopSource.request_stop();
    workerCv.notify_one();
}
As mentioned before, the solution works as expected, but I have never seen std::promise and std::future used this way, so I assume there is some issue with it?
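For anyone who wants to compile and run the pattern described above, here is a minimal self-contained sketch. It is not the code from my project: it assumes a plain std::queue guarded by a std::mutex and std::condition_variable in place of SomeThreadSafeQueue, a bool flag in place of the stop token, and a doubling function as a stand-in for someOperation. The Worker class and its submit method are hypothetical names used only for illustration.

```cpp
#include <condition_variable>
#include <exception>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for someOperation from the question.
int someOperation(int data) { return data * 2; }

// A single worker that owns its queue; all names here are illustrative.
class Worker {
public:
    Worker() : thread_([this] { run(); }) {}

    ~Worker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_one();
        thread_.join();
    }

    // Queue the data with a promise and hand the matching future to the caller.
    std::future<int> submit(int data) {
        std::promise<int> promise;
        std::future<int> future = promise.get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.emplace(data, std::move(promise));
        }
        cv_.notify_one();
        return future;
    }

private:
    void run() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
            if (jobs_.empty()) return;  // only reached once stopping_ is set
            auto job = std::move(jobs_.front());
            jobs_.pop();
            lock.unlock();  // run the task without holding the queue lock
            try {
                job.second.set_value(someOperation(job.first));
            } catch (...) {
                // Forward any exception to the waiting client
                job.second.set_exception(std::current_exception());
            }
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::pair<int, std::promise<int>>> jobs_;
    bool stopping_ = false;
    std::thread thread_;  // declared last so the other members exist before run() starts
};
```

A client would call worker.submit(data).get() to block for its result, which mirrors process() above. In this sketch the push happens while the same mutex the worker waits on is held, so a notification cannot arrive between the worker's predicate check and its wait.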