mardi 17 décembre 2019

Connection state and error metadata best practice with Rx

System

I'm building a server-side streaming calculation engine to perform live calculations on ticking market data for a financial services firm.

The idea is that the calculation server obtains connections to various market data sources (mostly Bloomberg) and recalculates various relatively complex calculations whenever new inputs (prices, FX rates, etc) arrive.

Those calculation results will then get pushed to the various clients (WPF desktop) connected to the server using SignalR. On both the client and the server, I'll be using Rx and IObservable to implement a pipeline for the calculations and cascading downstream activity from them.

Problem

The thing is, a valid state for the calculations might be 0.3995, but it might be "Waiting for price" or even "Server down".

I don't want to use the built-in Rx error system, as it seems brittle. As soon as any exception is encountered, an IObservable terminates forever and must be re-subscribed to be recovered. I would like my streams to be robust and persistent and for the expected possible recoverable error states to be pieces of data rather than exceptions.

My Current Thought

I'm toying with wrapping the return data in the stream in a sort of monad, something like this:

Does anyone have any better suggestions? Or some pointers that would help me correctly frame the problem?

    public enum TransientState 
    {
        NotReady = 0,
        Ok = 1,
        Faulted = 2,
        Fatal = 3
    }

    public struct RxMessage<T> 
    {
        public TransientState State { get; set; }
        public DateTime TimestampUtc { get; set; }
        public string Fault { get; set; }
        public T Payload { get; set; }
    }

Aucun commentaire:

Enregistrer un commentaire