samedi 23 mars 2019

Trying to build a simple Pipe and Filter

I am trying to implement the Pipe and Filter pattern integrating TPL Dataflow within it. I am having issues where not all my results are churned out. For example, i put 99999 items into the pipeline and only 85238 came out.

EmployeeModel.cs

public class EmployeeModel
{

    public String FirstName { get; set; }

    public String LastName { get; set; }

    public String FullName { get; set; }

    public override String ToString()
    {
        return $"FirstName: {FirstName}\nLastName: {LastName}\nFullName: {FullName}\n";
    }
}

IFilter.cs

public interface IFilter<T>
{
    T Execute(T input);
}

AbstractParallelFilter.cs

public abstract class AbstractParallelFilter<T> : IFilter<T>
{
    public AbstractParallelFilter()
    {
        TransformBlock = new TransformBlock<T, T>(new Func<T, T>(Execute), new ExecutionDataflowBlockOptions()
        {
             BoundedCapacity = DataflowBlockOptions.Unbounded,
             MaxDegreeOfParallelism = Environment.ProcessorCount

        });
    }


    public abstract T Execute(T input);

    internal TransformBlock<T, T> TransformBlock { get; private set; }
}

IParallelPipeline.cs

public interface IParallelPipeline<T>
{

    IParallelPipeline<T> Register(AbstractParallelFilter<T> filter);

    IParallelPipeline<T> CompleteRegisteration();

    IParallelPipeline<T> Process(T input);

    Task CompleteProcessing();


    ConcurrentBag<T> Results { get; set; }
}

AbstractParallelPipeline.cs

public abstract class AbstractParallelPipeline<T>: IParallelPipeline<T>
{

    public AbstractParallelPipeline()
    {
        filters = new List<AbstractParallelFilter<T>>();

        Results = new ConcurrentBag<T>();
    }

    public IParallelPipeline<T> Register(AbstractParallelFilter<T> filter)
    {
        filters.Add(filter);
        return this;
    }

    public abstract IParallelPipeline<T> Process(T input);

    public Task CompleteProcessing()
    {
        if (filters.Count == 0)
            throw new Exception("No filters have been registered");

        filters.First().TransformBlock.Complete();

        return filters.Last().TransformBlock.Completion;
    }

    public IParallelPipeline<T> CompleteRegisteration()
    {
        if (filters.Count < 2)
        {
            return this;
        }
        else
        {
            for (int i = filters.Count - 2; i >= 0; i--)
            {
                filters[i].TransformBlock.LinkTo(filters[i + 1].TransformBlock, new DataflowLinkOptions() { PropagateCompletion = true });
            }

            ActionBlock<T> dumpBlock = new ActionBlock<T>(x => Results.Add(x));
            filters.Last().TransformBlock.LinkTo(dumpBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        }

        return this;
    }

    public IList<AbstractParallelFilter<T>> filters;

    public ConcurrentBag<T> Results { get; set; }
}

ParallelPipeline.cs

public class ParallelPipeline<T> : AbstractParallelPipeline<T>
{
    public override IParallelPipeline<T> Process(T input)
    {
        filters.First().TransformBlock.Post(input);
        return this;
    }
}

Program.cs

class Program
{
    static void Main(string[] args)
    {
        List<EmployeeModel> employeeModels = new List<EmployeeModel>();
        int count = 99999;

        for (int i = 0; i < count; i++)
        {
            EmployeeModel employee = new EmployeeModel()
            {
                FirstName = NameGenerator.GenerateFirstName(Gender.Female),
                LastName = NameGenerator.GenerateLastName()
            };
            employeeModels.Add(employee);
        }

        IParallelPipeline<EmployeeModel> parallelPipeline = new ParallelPipeline<EmployeeModel>()
            .Register(new ParallelFirstNameToUpperFilter())
            .Register(new ParallelLastNameToUpperFilter())
            .Register(new ParallelFullNameConcatFilter())
            .CompleteRegisteration();

        for (int i = 0; i < count; i++)
        {
            parallelPipeline.Process(employeeModels[i]);
        }
        parallelPipeline
            .CompleteProcessing()
            .Wait();


        Console.WriteLine(parallelPipeline.Results.Count);

        Console.Read();
    }
}

Aucun commentaire:

Enregistrer un commentaire