mercredi 17 juillet 2019

How to get consumers to start as soon as there is data in producer consumer?

I am implementing a very basic producer / consumer pattern implementation in C# with the following:

public class Engine
{
    private readonly BlockingCollection<string> _ProcessBuffer;
    private readonly BlockingCollection<string> _ProcessOutcome;

    public Engine()
    {
         _ProcessBuffer = new BlockingCollection<string>();
         _ProcessOutcome = new BlockingCollection<string>();
    }

    public void DoIt()
    {


        Task consumer = Task.Factory.StartNew(() =>
                                              {
                                                  Parallel.ForEach(_ProcessBuffer.GetConsumingEnumerable(), item => FooBar(item));
                                                  Console.WriteLine($"{DateTime.Now:mm:ss.fff}  Consume - CompleteAdding");
                                                  _ProcessOutcome.CompleteAdding();
                                                  Console.WriteLine($"{DateTime.Now:mm:ss.fff}  Consume - /CompleteAdding");
                                              }
        );

        Task producer = Task.Factory.StartNew(() =>
                                              {
                                                  //Use concurrent Q to enable safe enquing from multiple threads
                                                  //var exceptions = new ConcurrentQueue<Exception>();

                                                  //Some loop
                                                  for (int i = 0; i < 10; i++)
                                                  {
                                                      //Add something to the bag
                                                      Console.WriteLine($"{DateTime.Now:mm:ss.fff}  Produce - {i}");
                                                      _ProcessBuffer.Add(Convert.ToString(i));
                                                      Console.WriteLine($"{DateTime.Now:mm:ss.fff}  /Produce - {i}");
                                                  }

                                                  //Show we are all done
                                                  Console.WriteLine($"{DateTime.Now:mm:ss.fff}  Produce - CompleteAdding");
                                                  _ProcessBuffer.CompleteAdding();
                                                  Console.WriteLine($"{DateTime.Now:mm:ss.fff}  Produce - /CompleteAdding");
                                              }
        );

        consumer.Wait();
        producer.Wait();


        foreach (string s in _ProcessOutcome)
        {
            Console.WriteLine(s);
        }
    }

    private void FooBar(string aItem)
    {
        Console.WriteLine($"{DateTime.Now:mm:ss.fff}  FooBar - {aItem}");
        Thread.Sleep(500);
        _ProcessOutcome.Add(aItem + "XX");
        Console.WriteLine($"{DateTime.Now:mm:ss.fff} /FooBar - {aItem}");

    }
}

which produces the following console output:

06:44.090  Produce - 2
06:44.090  /Produce - 2
06:44.090  Produce - 3
06:44.090  /Produce - 3
06:44.090  Produce - 4
06:44.090  /Produce - 4
06:44.090  Produce - 5
06:44.090  /Produce - 5
06:44.090  Produce - 6
06:44.090  /Produce - 6
06:44.091  Produce - 7
06:44.091  /Produce - 7
06:44.091  Produce - 8
06:44.091  /Produce - 8
06:44.091  Produce - 9
06:44.091  /Produce - 9
06:44.091  Produce - CompleteAdding
06:44.091  Produce - /CompleteAdding
06:44.126  FooBar - 3
06:44.126  FooBar - 4
06:44.126  FooBar - 1
06:44.126  FooBar - 0
06:44.126  FooBar - 2
06:44.126  FooBar - 5
06:44.126  FooBar - 6
06:44.126  FooBar - 7
06:44.627 /FooBar - 1
06:44.627 /FooBar - 2
06:44.627 /FooBar - 4
06:44.628  FooBar - 9
06:44.627 /FooBar - 0
06:44.627 /FooBar - 6
06:44.627 /FooBar - 5
06:44.627 /FooBar - 3
06:44.627 /FooBar - 7
06:44.627  FooBar - 8
06:45.128 /FooBar - 9
06:45.129 /FooBar - 8
06:45.129  Consume - CompleteAdding
06:45.129  Consume - /CompleteAdding
1XX
0XX
2XX
4XX
5XX
6XX
3XX
7XX
9XX
8XX
Finish...

  1. However my FooBar() consumer only starts running once the producer has completed. What I cannot seem to figure out is how to get the consumer(s) to process _ProcessBuffer as soon as it gets content?
  2. I am guessing my Paralell.ForEach() is the culprit for 1). So how do I refactor things so that I can have more than one consumer (thread) running? Ultimately I want a single producer and multiple consumers.

Aucun commentaire:

Enregistrer un commentaire