jeudi 17 août 2017

RxJava : Pattern for temporarily suspending selected source Observables without statefull variables

In a larger RxJava application, I have a number of hot, infinite source Observables. The emissions from these are merged and then processed by downstream Observers. The result of the processing needs then to be used to temporarily suspend emissions from some of the source Observables, while the non-suspended Observables are expected to continue to emit and the Observers to consume the non-suspended emissions. Being hot Observables any events which occur during suspension, can safely be ignored / dropped.

The only solution I was able to come up with so far is to apply filter with global, statefull variables. The code below shows the principle. For simplicity, I moved the logic which source Observable to suspend, out into a while loop and simply assign randomly the suspend / run decision. Also, the source Observables are replaced by simple intervals (in the real application, the events are random and come from external sources, which are wrapped in Observables)

boolean is1running = true;
boolean is2running = true;
boolean is3running = true;

public void multiStream() {

    Observable<String> ob1 = Observable
            .interval(100, TimeUnit.MILLISECONDS)
            .map(s -> "OB1::" + s)
            .filter(s -> keepRunning(1));

    Observable<String> ob2 = Observable
            .interval(100, TimeUnit.MILLISECONDS)
            .map(s -> "OB2::::" + s)
            .filter(s -> keepRunning(2));

     Observable<String> ob3 = Observable
            .interval(100, TimeUnit.MILLISECONDS)
            .map(s -> "OB3:::::" + s)
            .filter(s -> keepRunning(3));

     Observable<String> finalObs = Observable.merge(ob1, ob2, ob3);

     finalObs.subscribe(s -> System.out.println(s));

     Random randomGenerator = new Random();

     while(true)
     {
         sleep(1000);
         is1running = randomGenerator.nextBoolean();
         is2running = randomGenerator.nextBoolean();
         is3running = randomGenerator.nextBoolean();
     }
}

private boolean keepRunning(int i) {
    switch(i)
    {
        case 1: return is1running;
        case 2: return is2running;
        case 3: return is3running;
    }

    return true;
}

The code seems to work, but I am not happy about having to use global, stateful variables.

Is there better pattern for such a situation that also adheres to the functional and reactive paradigms?

Aucun commentaire:

Enregistrer un commentaire