Multiple listeners reading from the same stream in NodeJS


I need two different listeners to read the input coming from a readable stream.
They seem to have a race condition, and I want to make sure I get this right...



When I use .on('data') - each listener gets back the same chunk and can read it.



When I use .on('readable') - the first listener to catch the event will read from the buffer, and the second listener will get an empty buffer?



Meaning I cannot use .on('readable') event when I have TWO listeners?




3 Answers



When the 'readable' event is emitted, both listeners are called. However, only the first one to call read() gets the data. The second one gets null (nothing left to read), because both are reading from the same internal buffer and only a single chunk was present.
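To make the contention on read() concrete, here is a minimal sketch. It assumes a hand-fed Readable with a no-op read() implementation, and `readTwice` is a hypothetical helper name, not anything from the question.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: two consumers call read() on the same paused stream.
function readTwice() {
  // A Readable with a no-op read() lets us push data in by hand.
  const source = new Readable({ read() {} });
  source.push('hello');

  const first = source.read();  // takes the buffered chunk
  const second = source.read(); // nothing is left in the internal buffer

  return { first: first && first.toString(), second };
}

console.log(readTwice()); // { first: 'hello', second: null }
```

Whichever consumer calls read() first wins. The stream has no notion of "one copy per listener".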



When the 'data' event is emitted, each listener is passed the chunk that was just read from the stream, so all listeners attached at that moment receive the same data.



[EDIT] In detail how it works:



A Readable stream operates in one of two modes: flowing and paused. All Readable streams begin in paused mode, but they can be switched to flowing mode in any of three ways:
1. Attaching a 'data' event handler to the stream
2. Calling the stream.resume() method
3. Calling the stream.pipe() method to send the data to a Writable.
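The three switches listed above can be observed through the stream's readableFlowing property. This is only a sketch: PassThrough streams stand in for a real source, and `flowingStates` is a hypothetical helper name.

```javascript
const { PassThrough, Writable } = require('stream');

// Hypothetical helper: record readableFlowing after each way of starting the flow.
function flowingStates() {
  const states = {};

  const viaData = new PassThrough();
  states.initial = viaData.readableFlowing;     // null: no consumer attached yet
  viaData.on('data', () => {});                 // 1. attach a 'data' handler
  states.afterData = viaData.readableFlowing;

  const viaResume = new PassThrough();
  viaResume.resume();                           // 2. call resume()
  states.afterResume = viaResume.readableFlowing;

  const viaPipe = new PassThrough();
  const sink = new Writable({ write(chunk, encoding, callback) { callback(); } });
  viaPipe.pipe(sink);                           // 3. pipe into a Writable
  states.afterPipe = viaPipe.readableFlowing;

  return states;
}

console.log(flowingStates());
```

readableFlowing is null before any consumer is attached and true after each of the three switches.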



Once you use any of the above methods, the stream starts to flow. It does not check whether any 'data' listeners are attached, so data can be lost if nothing is listening. Internally, read() is called on the stream, and whatever has accumulated in the internal buffer is read and emitted to the listeners. Memory usage is quite low, since chunks do not pile up in the buffer.



When you attach a 'readable' listener to your stream, it takes priority over any 'data' listener, so the stream stays in paused mode. In paused mode you have to read the data from the internal buffer explicitly by calling read(). Incoming data keeps accumulating in the internal buffer until read() is called or the stream is resumed. You can pass read() a size in bytes to get a chunk of that size; without it, all the available data is returned. Whenever read() returns a chunk, a 'data' event is also emitted with that chunk. Data that has been read is removed from the internal buffer. So when several 'readable' listeners each try to consume from the same internal buffer, they cannot all get the same data.
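A small sketch of the paused-mode behaviour described above, again assuming a hand-fed Readable. `pausedRead` is a hypothetical helper name.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: explicit reads on a stream held in paused mode.
function pausedRead() {
  const source = new Readable({ read() {} });
  const emitted = [];

  source.on('readable', () => {});                 // keeps the stream paused
  source.on('data', (chunk) => emitted.push(chunk.toString()));
  const flowing = source.readableFlowing;          // false despite the 'data' listener

  source.push('abcdef');
  const firstTwo = source.read(2).toString();      // sized read: 2 bytes
  const rest = source.read().toString();           // everything that is left
  const empty = source.read();                     // buffer is empty now

  return { flowing, firstTwo, rest, empty, emitted };
}

console.log(pausedRead());
```

Each successful read() shows up in `emitted` as well, which is the 'data' event firing as a side effect of the explicit read.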



My suggestion would be to have just one 'readable' listener and multiple 'data' listeners. The 'readable' listener gives you the flexibility to read when you want without missing any data, and every 'data' handler receives each chunk that is read.
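One way this suggestion could look in practice is sketched below. `readableWithDataListeners` is a hypothetical helper, and Readable.from is used only as a stand-in for a real source stream.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: one 'readable' listener drives the reads,
// while two 'data' listeners each observe every chunk.
function readableWithDataListeners(source) {
  return new Promise((resolve, reject) => {
    const seenByA = [];
    const seenByB = [];

    // The only place that calls read(); each non-null read() also emits 'data'.
    source.on('readable', () => {
      while (source.read() !== null) { /* drain the internal buffer */ }
    });
    source.on('data', (chunk) => seenByA.push(chunk.toString()));
    source.on('data', (chunk) => seenByB.push(chunk.toString()));

    source.on('end', () => resolve({ seenByA, seenByB }));
    source.on('error', reject);
  });
}

readableWithDataListeners(Readable.from(['one', 'two', 'three']))
  .then(console.log);
```

Because only one place calls read(), the two 'data' listeners never compete for the buffer.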



TL;DR: The 'data' event delivers the data to as many listeners as you like, but only to listeners that were attached before the data was read. So use the pipe method to copy the data into two PassThrough streams and read from each of those separately.



Here's how in detail:



The first thing to understand: a single stream emits the data event only once per chunk of data. This is because the stream object does not "know" who is reading; the response is read and that's it.



Now, just reading your question, I would ask: how come? The handler should run twice, because every data event listener is called... unless you actually attach the listener inside the readable event handler...



Let's assume that you set up something like this:


response.on("data", function A(chunk) { /* ... */ });
response.on("readable", function B() {
  // C is attached only once B runs, so it misses any chunk emitted before that
  response.on("data", function C(chunk) { /* ... */ });
});



Now when chunk 1 arrives, A() receives chunk 1. If the stream drains afterwards (meaning the response is so slow that it cannot supply any new data), it pauses. Only after that may the stream become readable again, and then B() sets up the listener C()... but A() has already consumed that chunk by then, so C() will only see the next chunk.
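The effect of attaching a 'data' listener too late can be reproduced with a hand-fed stream. This is a simplified sketch, not the exact setup above: the late listener is attached after a short wait rather than inside a 'readable' handler, and `lateListenerDemo` is a hypothetical name.

```javascript
const { Readable } = require('stream');
const { once } = require('events');

// Hypothetical demo: listener C is attached after chunk 1 has been delivered.
async function lateListenerDemo() {
  const source = new Readable({ read() {} });
  const seenByA = [];
  const seenByC = [];
  const ended = once(source, 'end');

  source.on('data', (chunk) => seenByA.push(chunk.toString())); // starts the flow
  source.push('1');
  await new Promise((resolve) => setImmediate(resolve)); // let chunk 1 reach A

  // Attached only now, so chunk 1 is already gone for C.
  source.on('data', (chunk) => seenByC.push(chunk.toString()));
  source.push('2');
  source.push(null); // signal end of stream

  await ended;
  return { seenByA, seenByC };
}

lateListenerDemo().then(console.log);
```

A sees both chunks while C sees only the second one. The stream does not replay data for listeners that show up later.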



Now, as Swati Anand mentioned, you could use the same event listener, but in my opinion that would kill the whole beauty of streams. Using pipe instead is much more elegant, like this:



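const { PassThrough } = require('stream');
const data1 = new PassThrough();
const data2 = new PassThrough();

response.pipe(data1).on("data", function A(chunk) { /* ... */ });
response.pipe(data2);

// when you're ready (until then data2 buffers up to its highWaterMark,
// after which backpressure pauses response for both branches)
data2.on("data", function C(chunk) { /* ... */ });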



The idea behind this: instead of listening for events on the original stream, we create two pass-through streams that act like "clones" and read the data from those. This way no data gets lost for either part of the program.
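For a version that can be run without an HTTP response, here is a self-contained sketch of the same fan-out. `fanOut` and `collect` are hypothetical helper names, and Readable.from stands in for `response`.

```javascript
const { PassThrough, Readable } = require('stream');

// Hypothetical helper: pipe one source into n independent PassThrough branches.
function fanOut(source, n = 2) {
  // pipe() returns its destination, so this yields the n branches.
  return Array.from({ length: n }, () => source.pipe(new PassThrough()));
}

// Hypothetical helper: gather everything a branch emits into one string.
function collect(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on('data', (chunk) => chunks.push(chunk.toString()));
    stream.on('end', () => resolve(chunks.join('')));
    stream.on('error', reject);
  });
}

// Readable.from is an assumption standing in for the `response` stream.
const [branchA, branchC] = fanOut(Readable.from(['chunk1 ', 'chunk2']));
Promise.all([collect(branchA), collect(branchC)]).then(([a, c]) => {
  console.log(a === c); // both branches received the same data
});
```

Each branch has its own internal buffer, so one consumer reading from its branch does not take data away from the other.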



The Node.js stream docs are actually the best read about the data flow. Try to read them as a plumbing story, with connected pipes and so on.



You can use RxJS to make it easier, as follows:


const { fromEvent } = require('rxjs');
const { share, takeUntil } = require('rxjs/operators');
// fromEvent turns the stream's 'data' events into an observable;
// share() makes every subscriber reuse the same underlying listener
const stream$ = fromEvent(yourStream, 'data').pipe(
  takeUntil(fromEvent(yourStream, 'end')), // complete when the stream ends
  share()
);

const subscriber1 = stream$.subscribe(console.log);
const subscriber2 = stream$.subscribe(console.log);





