I am investigating how to develop a plugin framework for a project and Rx seems like a good fit for what i am trying to achieve. Ultimately, the project will be a set of plugins (modular functionality) that can be configured via xml to do different things. The requirements are as follows
- Enforce a modular architecture even within a plugin. This encourages loose coupling and potentially minimizes complexity. This hopefully should make individual plugin functionality easier to model and test
- Enforce immutability with respect to data to reduce complexity and ensure that state management within modules is kept to a minimum
- Discourage manual thread creation by providing thread pool threads to do work within modules wherever possible
In my mind, a plugin is essentially a data transformation entity (I'm trying to think functional here). This means a plugin either
- Takes in some data and transforms it in some way to produce new data (Not shown here)
- Generates data in itself and pushes it out to observers
- Takes in some data and does some work on the data without notifying outsiders
If you take the concept further, a plugin can consist of a number of all three types above.For example within a plugin you can have an IntGenerator module that generates some data to a ConsoleWorkUnit module etc. So what I am trying to model in the main function is the wiring that a plugin would have to do its work.
To that end, I have the following base classes using the Immutable nuget from Microsoft. What I am trying to achieve is to abstract away the Rx calls so they can be used in modules so the ultimate aim would be to wrap up calls to buffer etc in abstract classes that can be used to compose complex queries and modules. This way the code is a bit more self documenting than having to actually read all the code within a module to find out it subscribes to a buffer or window of type x etc.
public abstract class OutputBase<TOutput> : SendOutputBase<TOutput>
{
public abstract void Work();
}
public interface IBufferedBase<TOutput>
{
void Work(IList<ImmutableList<Data<TOutput>>> list);
}
public abstract class BufferedWorkBase<TInput> : IBufferedBase<TInput>
{
public abstract void Work(IList<ImmutableList<Data<TInput>>> input);
}
public abstract class SendOutputBase<TOutput>
{
private readonly ReplaySubject<ImmutableList<Data<TOutput>>> _outputNotifier;
private readonly IObservable<ImmutableList<Data<TOutput>>> _observable;
protected SendOutputBase()
{
_outputNotifier = new ReplaySubject<ImmutableList<Data<TOutput>>>(10);
_observable = _outputNotifier.SubscribeOn(ThreadPoolScheduler.Instance);
_observable = _outputNotifier.ObserveOn(ThreadPoolScheduler.Instance);
}
protected void SetOutputTo(ImmutableList<Data<TOutput>> output)
{
_outputNotifier.OnNext(output);
}
public void ConnectOutputTo(IWorkBase<TOutput> unit)
{
_observable.Subscribe(unit.Work);
}
public void BufferOutputTo(int count, IBufferedBase<TOutput> unit)
{
_observable.Buffer(count).Subscribe(unit.Work);
}
}
public abstract class WorkBase<TInput> : IWorkBase<TInput>
{
public abstract void Work(ImmutableList<Data<TInput>> input);
}
public interface IWorkBase<TInput>
{
void Work(ImmutableList<Data<TInput>> input);
}
public class Data<T>
{
private readonly T _value;
private Data(T value)
{
_value = value;
}
public static Data<TData> Create<TData>(TData value)
{
return new Data<TData>(value);
}
public T Value { get { return _value; } }
}
These base classes are used to create three classes; one for generating some int data, one to print out the data when they occur and the last to buffer the data as it comes in and sum the values in threes.
public class IntGenerator : OutputBase<int>
{
public override void Work()
{
var list = ImmutableList<Data<int>>.Empty;
var builder = list.ToBuilder();
for (var i = 0; i < 1000; i++)
{
builder.Add(Data<int>.Create(i));
}
SetOutputTo(builder.ToImmutable());
}
}
public class ConsoleWorkUnit : WorkBase<int>
{
public override void Work(ImmutableList<Data<int>> input)
{
foreach (var data in input)
{
Console.WriteLine("ConsoleWorkUnit printing {0}", data.Value);
}
}
}
public class SumPrinter : WorkBase<int>
{
public override void Work(ImmutableList<Data<int>> input)
{
input.ToObservable().Buffer(2).Subscribe(PrintSum);
}
private void PrintSum(IList<Data<int>> obj)
{
Console.WriteLine("Sum of {0}, {1} is {2} ", obj.First().Value,obj.Last().Value ,obj.Sum(x=>x.Value) );
}
}
These are run in a main like this
var intgen = new IntGenerator();
var cons = new ConsoleWorkUnit();
var sumPrinter = new SumPrinter();
intgen.ConnectOutputTo(cons);
intgen.BufferOutputTo(3,sumPrinter);
Task.Factory.StartNew(intgen.Work);
Console.ReadLine();
Is this architecture sound?
Aucun commentaire:
Enregistrer un commentaire