I am implementing a very basic producer / consumer pattern implementation in C# with the following:
public class Engine
{
private readonly BlockingCollection<string> _ProcessBuffer;
private readonly BlockingCollection<string> _ProcessOutcome;
public Engine()
{
_ProcessBuffer = new BlockingCollection<string>();
_ProcessOutcome = new BlockingCollection<string>();
}
public void DoIt()
{
Task consumer = Task.Factory.StartNew(() =>
{
Parallel.ForEach(_ProcessBuffer.GetConsumingEnumerable(), item => FooBar(item));
Console.WriteLine($"{DateTime.Now:mm:ss.fff} Consume - CompleteAdding");
_ProcessOutcome.CompleteAdding();
Console.WriteLine($"{DateTime.Now:mm:ss.fff} Consume - /CompleteAdding");
}
);
Task producer = Task.Factory.StartNew(() =>
{
//Use concurrent Q to enable safe enquing from multiple threads
//var exceptions = new ConcurrentQueue<Exception>();
//Some loop
for (int i = 0; i < 10; i++)
{
//Add something to the bag
Console.WriteLine($"{DateTime.Now:mm:ss.fff} Produce - {i}");
_ProcessBuffer.Add(Convert.ToString(i));
Console.WriteLine($"{DateTime.Now:mm:ss.fff} /Produce - {i}");
}
//Show we are all done
Console.WriteLine($"{DateTime.Now:mm:ss.fff} Produce - CompleteAdding");
_ProcessBuffer.CompleteAdding();
Console.WriteLine($"{DateTime.Now:mm:ss.fff} Produce - /CompleteAdding");
}
);
consumer.Wait();
producer.Wait();
foreach (string s in _ProcessOutcome)
{
Console.WriteLine(s);
}
}
private void FooBar(string aItem)
{
Console.WriteLine($"{DateTime.Now:mm:ss.fff} FooBar - {aItem}");
Thread.Sleep(500);
_ProcessOutcome.Add(aItem + "XX");
Console.WriteLine($"{DateTime.Now:mm:ss.fff} /FooBar - {aItem}");
}
}
which produces the following console output:
06:44.090 Produce - 2
06:44.090 /Produce - 2
06:44.090 Produce - 3
06:44.090 /Produce - 3
06:44.090 Produce - 4
06:44.090 /Produce - 4
06:44.090 Produce - 5
06:44.090 /Produce - 5
06:44.090 Produce - 6
06:44.090 /Produce - 6
06:44.091 Produce - 7
06:44.091 /Produce - 7
06:44.091 Produce - 8
06:44.091 /Produce - 8
06:44.091 Produce - 9
06:44.091 /Produce - 9
06:44.091 Produce - CompleteAdding
06:44.091 Produce - /CompleteAdding
06:44.126 FooBar - 3
06:44.126 FooBar - 4
06:44.126 FooBar - 1
06:44.126 FooBar - 0
06:44.126 FooBar - 2
06:44.126 FooBar - 5
06:44.126 FooBar - 6
06:44.126 FooBar - 7
06:44.627 /FooBar - 1
06:44.627 /FooBar - 2
06:44.627 /FooBar - 4
06:44.628 FooBar - 9
06:44.627 /FooBar - 0
06:44.627 /FooBar - 6
06:44.627 /FooBar - 5
06:44.627 /FooBar - 3
06:44.627 /FooBar - 7
06:44.627 FooBar - 8
06:45.128 /FooBar - 9
06:45.129 /FooBar - 8
06:45.129 Consume - CompleteAdding
06:45.129 Consume - /CompleteAdding
1XX
0XX
2XX
4XX
5XX
6XX
3XX
7XX
9XX
8XX
Finish...
- However my
FooBar()
consumer only starts running once the producer has completed. What I cannot seem to figure out is how to get the consumer(s) to process _ProcessBuffer as soon as it gets content? - I am guessing my Paralell.ForEach() is the culprit for 1). So how do I refactor things so that I can have more than one consumer (thread) running? Ultimately I want a single producer and multiple consumers.
Aucun commentaire:
Enregistrer un commentaire