
Transforms that convert one chunk into many chunks may stop reading when readableHighWaterMark exceeded #49938

Open
IanAtOcucom opened this issue Sep 28, 2023 · 3 comments

Comments


IanAtOcucom commented Sep 28, 2023

Version

18.18.0

Platform

Microsoft Windows NT 10.0.19045.0 x64

Subsystem

stream

What steps will reproduce the bug?

I am only mostly certain this isn't caused by a misreading of the API documentation, but:

If you have a Transform whose writable side accepts a byte stream and may convert one incoming chunk into multiple output chunks (that is, one call to _transform() results in many calls to push()), and a push() returns false because of back-pressure, no 'drain' event is ever emitted for the readable side, so the stream stalls before the incoming chunk's callback() is ever invoked. This shows up especially when using for-await-of to consume the stream.

(based on this discussion: nodejs/help#2695)


const { Transform } = require('node:stream');

/*
 * Transform a "bytestream" into objects. [example uses strings]
 * The incoming byte stream chunk may contain 0..n objects.
 * For each _transform() call, convert the chunk into its many outputs
 * and attempt to push them to the readable side.
 */
class TransformStream extends Transform {
  _decode(chunk, pos, callback) {
    if (pos >= 0 && pos < chunk.length) {
      let end = chunk.indexOf(':', pos);
      if (end < 0) end = chunk.length;
      const substr = chunk.substring(pos, end);

      const noPressure = this.push(substr);

      if (noPressure) { // finish with the chunk or continue processing it
        if (end >= chunk.length) {
          return callback();
        }
        setImmediate(() => this._decode(chunk, end + 1, callback));
      } else {
        // BUG DEMONSTRATED HERE: 'drain' only fires for the writable side,
        // so this listener never runs and the stream stalls.
        this.once('drain', () => {
          this._decode(chunk, end + 1, callback);
        });
      }
    } else {
      return callback();
    }
  }
  _transform(chunk, encoding, callback) {
    console.log('transforming chunk', chunk);
    this._decode(chunk, 0, callback);
  }
}

async function wait() {
  return new Promise(resolve => {
    setTimeout(() => resolve(), 1000);
  });
}
(async () => {
  const transformStream = new TransformStream({ objectMode: true, readableHighWaterMark: 1 });

  transformStream.write('hello1:hello2:hello3:hello4:hello5');
  transformStream.write('hello6:hello7:hello8:hello9:hello10');
  transformStream.write('hello11:hello12:hello13:hello14:hello15');
  transformStream.end();

  for await (const text of transformStream) {
    await wait();
    console.error(text);
  }
})();

How often does it reproduce? Is there a required condition?

We have a case where an input stream can deliver a large chunk that gets turned into many output objects, and it would be more memory-efficient to pause the input stream than to buffer the output objects. But with readableHighWaterMark set, the Transform stream stalls once push() has filled the readable buffer.

What is the expected behavior? Why is that the expected behavior?

There should be a way for an "amplifying" Transform or Duplex to pause reads from its input side until writes to its output side have been consumed. There should be a 'readableDrained' event of some kind to show that the buffer is empty on the output end.

What do you see instead?

Reading from the Transform stops once readableHighWaterMark is reached, because the callback()s from _transform() are withheld while waiting for back-pressure to clear.

There seems to be no way to detect that the output readable side is drained, because the 'drain' event only fires for the input writable stream.

While it IS possible to peek at the "private" _readableState and check that its buffer.length < highWaterMark and then call push() again, this isn't exactly a publicly documented "API".

Additional information

Example of peeking at private values:

let interval = setInterval(() => {
  if (this._readableState.buffer.length < this._readableState.highWaterMark) {
    clearInterval(interval);
    this._decode(chunk, end + 1, callback);
  }
}, 1);
mscdex (Contributor) commented Sep 28, 2023

This sounds like the same underlying issue as #49302 (no event to listen for after push() returns false since 'drain' is for write()).

IanAtOcucom (Author) commented

@mscdex I would agree. Your comment on that issue would also address this one: Duplex streams and Transforms should fire an event when the readable side's buffer is no longer full. Unfortunately 'drain' is already taken and only applies to the writable side.

Based on that bug, I suppose the "Right" way is to check the _readableState.buffer and highWaterMark. I feel very strongly that there should be a public (or at least well-defined) way to do that.


lukiano commented Oct 2, 2023

It seems that Duplex.from() can convert an asynchronous generator that takes an (asynchronous) iterable into a Transform object that respects the highWaterMark on the readable side. It will push data that the generator yields but won't continue execution of the generator if push returns false.

That said, this Transform object is operating with objectMode set even if the generator yields objects of type Buffer.

In your example, the iterable argument would output the original chunks (e.g. 'hello1:hello2:hello3:hello4:hello5'), and the generator would yield them split by colon.
