Friday, 24 March 2023

Observer design pattern using boost asio concurrent channel

My program blocks when it tries to send data from a producer to a consumer through a Boost.Asio channel.

The async_send method does not appear to be asynchronous, even though the documentation says that it sends data asynchronously.

I tried to implement the observer design pattern using Boost.Asio channels to send data between threads.

But I'm a little surprised by the behaviour of async_send.

struct Subject
{
    using Channel = asio::experimental::concurrent_channel<void(
        std::error_code, std::shared_ptr<Response>)>;
    std::list<Channel> channels;
};
asio::awaitable<void> for_each_channel(Subject& subject, auto action)
{
    for(auto it = subject.channels.begin();
        it != subject.channels.end();)
    {
        if(it->is_open())
        {
            co_await action(*it);
            ++it;
        } else
        {
            it = subject.channels.erase(it);
        }
    }
}
asio::awaitable<void> notify_all(
    Subject& subject, std::shared_ptr<Response> response)
{
    co_await for_each_channel(
        subject,
        [&](Subject::Channel& channel)
        {
            return channel.async_send(
                std::error_code{},
                response,
                asio::use_awaitable); // blocks here
        });
}
asio::awaitable<void> close(Subject& subject)
{
    co_await for_each_channel(
        subject,
        [&](Subject::Channel& channel)
        {
            return channel.async_send(
                std::error_code{asio::error::operation_aborted},
                nullptr,
                asio::use_awaitable);
        });
}
auto& add_observer(Subject& subject, auto executor)
{
    return subject.channels.emplace_back(executor);
}
void remove_observer(Subject::Channel& observer)
{
    observer.close();
}
asio::awaitable<void> producer(Subject& subject)
{
    for(;;)
    {
        auto data = std::make_shared<Response>();
        co_await notify_all(subject, std::move(data));
    }
    // Note: unreachable as written, because the loop above never exits.
    co_await close(subject);
}
asio::awaitable<void> consumer(Subject& subject)
{
    bool ok{true};
    auto& observer =
        add_observer(subject, co_await asio::this_coro::executor);
    while(ok)
    {
        auto const [ec, response] = co_await observer.async_receive(
            asio::as_tuple(asio::use_awaitable));
        if(ec)
        {
            break;
        }
        co_await treatment(); // treat the response
    }
}

My question is: why is async_send not asynchronous?

How can I avoid blocking the producer threads?

Is there more helpful documentation for Boost.Asio channels than the official Boost documentation?
