Friday, 2 June 2023

Is it acceptable to put std::promise objects into a container to be set by a worker thread?

There is a worker thread responsible for executing a certain task and acquiring its result. The task needs input data, which is produced by multiple clients, and each result must be returned to the client that supplied the data. The solution I am considering is to put the data into a thread-safe queue together with a std::promise. The client retrieves the promise's future beforehand and keeps it to obtain the result later. The worker thread then fetches the data and the std::promise, executes the task, and sets the promise's value. The client, which is waiting on the std::future, then receives its corresponding result. The implementation works, but my questions are:

  • Is there a better way to get the same outcome?
  • Is there any issue or performance penalty with this approach?
  • What are the best practices for this kind of problem?

Thanks

The code below shows the method described:

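#include <condition_variable>
#include <exception>
#include <future>
#include <iostream>
#include <mutex>
#include <stop_token>
#include <thread>
#include <utility>

// SomeThreadSafeQueue, someOperation and useTheResult are placeholders
// that are defined elsewhere
std::condition_variable_any workerCv;
std::mutex workerMutex;
SomeThreadSafeQueue<std::pair<int, std::promise<int>>> threadSafeQueue;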

int process(int data) {
  // Create the promise
  std::promise<int> promise;
  // Get the future and keep it
  auto future = promise.get_future();
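  // Move the promise along with the data into the thread safe queue.
  // Enqueue while holding workerMutex: otherwise the worker could find the
  // queue empty, then miss this notification just before it starts waiting
  {
    std::lock_guard lock(workerMutex);
    threadSafeQueue.emplace(data, std::move(promise));
  }
  // Notify the worker about the new data
  workerCv.notify_one();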
  // Get the result
  return future.get();
}

void worker(std::stop_token stopToken) {
  while (!stopToken.stop_requested()) {
    
    // Construct the unique lock needed for condition variable
    std::unique_lock lock(workerMutex);
    
    // Wait for notification
    workerCv.wait(lock, stopToken, []() {return !threadSafeQueue.empty(); });
    // unlock since we don't need the lock for anything other than the condition variable
    lock.unlock();

    // Check for stop condition
    if (stopToken.stop_requested()) {
      break;
    }
    // Fetch the data and promise from the queue. This specific queue front method
    // moves the item out of the queue
    auto&& [data, promise] = threadSafeQueue.front();
    try {
      // Use the data to run some operation and set the result in the corresponding promise
      promise.set_value(someOperation(data));
    }
    catch (...) {
      // Set any possible exception into the promise
      promise.set_exception(std::current_exception());
    }
  }
}

void client(int start, int end) {
  for (int data = start; data < end; ++data) {
   
    try {
      // Call the process function that puts the data into the
      // thread safe queue and gets the result when it is ready
      // then use it
      useTheResult(process(data));
    }
    catch (const std::exception& exception) {
      // Handle the exception
      // Output from different threads may interleave, but ok for demo
      std::cout << exception.what() << std::endl;
    }
  }
}

int main() {
  // Stop source for the worker thread
  std::stop_source stopSource;
  // Invoke the worker thread
  std::jthread workerThread(worker, stopSource.get_token());
  // Invoke the clients
  std::thread client1(client, 0, 10);
  std::thread client2(client, 10, 20);
  // Join the clients
  client1.join();
  client2.join();
  // Signal the worker thread to stop
  stopSource.request_stop();
  workerCv.notify_one();
}


As mentioned, the solution works as expected, but I have never seen std::promise and std::future used this way. Is there some issue with it that I am missing?
