I've been living under a rock for a few years so I haven't touched RxJS yet and I'm curious how it handles a situation like below.
const {Readable, Writable} = require('stream')
class DelayedStream extends Writable {
constructor(options = {}){
options.objectMode = true;
super(options);
this.promise = new Promise((resolve, reject) => {
// open connection
setTimeout( function() {
console.log('resolved')
resolve('with client')
}, 2000)
})
}
_write = function(chunk, encoding, done){
console.log(`_write("${chunk}") called`)
// waiting for connection.
this.promise.then((client) => {
console.log(`processing "${chunk}"`)
done()
});
}
_final = function(done) { //...close connection }
}
var readable = Readable.from(['one', 'two', 'three'])
var test = new DelayedStream();
readable.pipe(test)
The usage of the above is for a scenario where I am waiting for a connection to establish before processing the incoming Readable stream.
I'm not fond of having to create a new class that handles this but it works. It seems like a more functional approach would be optimal but RxJS is lower on my list of things to learn coming back to dev after a very, very long hiatus..
Thanks!
Aucun commentaire:
Enregistrer un commentaire