vendredi 17 mars 2017

How to implement a Producer-Consumer pattern in which the producer

I'm trying to write a windows service whose producers and consumers work like this:

  • Producer: At scheduled times, get all unprocessed items (Processed = 0 on their row in the db) and add each one to the work queue that isn't already in the work queue
  • Consumer: Constantly pull items from the work queue and process them and update the db (Processed = 1 on their row)

I've tried to look for examples of this exact data flow in C#.NET so I can leverage the existing libraries. But so far I haven't found exactly that.

I see on http://ift.tt/2nNfVlv the example

private static void Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
    foreach (var value in values)
    {
        queue.Post(value);
    }

    queue.Complete();
}

private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
    var ret = new List<int>();
    while (await queue.OutputAvailableAsync())
    {
        ret.Add(await queue.ReceiveAsync());
    }

    return ret;
}

Here's the "idea" of what I'm trying to modify that to do:

while(true)
{
    if(await WorkQueue.OutputAvailableAsync())
    {
        ProcessItem(await WorkQueue.ReceiveAsync());
    }
    else
    {
        await Task.Delay(5000);
    }
}

would be how the Consumer works and

MyTimer.Elapsed += Produce;

static async void Produce(object source, ElapsedEventArgs e)
{
     IEnumerable<Item> items = GetUnprocessedItemsFromDb();
     foreach(var item in items)
         if(!WorkQueue.Contains(w => w.Id == item.Id))
             WorkQueue.Enqueue(item);  
}

would be how the Producer works.

That's a rough idea of what I'm trying to do. Can any of you show me the right way to do it, or link me to the proper documentation for solving this type of problem?

Aucun commentaire:

Enregistrer un commentaire