Wednesday, January 22, 2020

Is there a more functional implementation of this Writable Stream using RxJS that waits for a promise to resolve before processing?

I've been living under a rock for a few years, so I haven't touched RxJS yet, and I'm curious how it would handle a situation like the one below.

const { Readable, Writable } = require('stream')

class DelayedStream extends Writable {
  constructor(options = {}) {
    // Copy the options instead of mutating the caller's object.
    super({ ...options, objectMode: true })

    // Stand-in for opening a connection: resolves after 2 seconds.
    this.promise = new Promise((resolve, reject) => {
      setTimeout(() => {
        console.log('resolved')
        resolve('with client')
      }, 2000)
    })
  }

  _write(chunk, encoding, done) {
    console.log(`_write("${chunk}") called`)
    // Wait for the connection before processing the chunk.
    this.promise.then((client) => {
      console.log(`processing "${chunk}"`)
      done()
    }, done) // a failed connection surfaces as a stream error
  }

  _final(done) {
    // ...close connection
    done()
  }
}

const readable = Readable.from(['one', 'two', 'three'])
const test = new DelayedStream()
readable.pipe(test)

The use case is waiting for a connection to be established before processing the incoming Readable stream.

I'm not fond of having to create a new class to handle this, but it works. A more functional approach seems like it would be a better fit, but RxJS is low on my list of things to learn as I come back to development after a very, very long hiatus.
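For comparison, a class-free version seems possible in plain Node, without RxJS, by passing `write` and `final` functions directly to the `Writable` constructor. The following is only a rough sketch of that idea, not an RxJS solution: `connect`, `delayedWritable`, and `handle` are hypothetical names made up for illustration, and the timer merely stands in for a real connection.

```javascript
const { Readable, Writable } = require('stream')

// Hypothetical stand-in for opening a connection: resolves after `ms` milliseconds.
const connect = (ms = 2000) =>
  new Promise((resolve) => setTimeout(() => resolve('with client'), ms))

// Builds a Writable that holds each chunk until `clientPromise` resolves.
// `handle(client, chunk)` is a hypothetical per-chunk callback; it may return a promise.
const delayedWritable = (clientPromise, handle) =>
  new Writable({
    objectMode: true,
    write(chunk, encoding, done) {
      clientPromise
        .then((client) => handle(client, chunk))
        .then(() => done(), done) // report failures to the stream as errors
    },
    final(done) {
      // ...close connection
      done()
    },
  })

// Usage: collect what was processed once the (simulated) connection is ready.
const processed = []
const sink = delayedWritable(connect(100), (client, chunk) => {
  processed.push(`${chunk} ${client}`)
})
Readable.from(['one', 'two', 'three']).pipe(sink)
```

Because the stream only calls `write` again after `done()` has run, chunks are still handled one at a time and in order, the same as with the class.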

Thanks!
