Do transform streams have to handle backpressure as well? #2695

Closed
vlopp opened this issue May 11, 2020 · 22 comments
@vlopp

vlopp commented May 11, 2020

Do Node's Transform streams have to watch push's return value and wait for the drain event as well? Since under the hood they're just a read stream and a write stream connected together, I'd assume so; nevertheless, all the online implementations I've seen seem to just push at will.

export class MyTransform extends stream.Transform {
  constructor() {
    super({ objectMode: true });
  }

  async _transform(chunk: any, encoding: string, callback: TransformCallback) {
    const arrayOfStrings = extractStrings(chunk);
    for (const string of arrayOfStrings) {
      if (!this.push(string)) {
        await new Promise((res) => this.once("drain", res));
      }
    }
    callback();
  }
}
@squarewav

Yes. Transform and Duplex are just glue for Readable / Writable, so with respect to the readable side, just think of the rules for Readable.

If you post a link to what you saw online we can dissect it. Maybe there's some nuance to it. But it's equally likely that it's just wrong. Streams take some time to grasp but, like most things, if you think about it long enough it starts to make sense. Ignoring push (or write) returning false is very bad. It can cause chunks to get queued and just pile up uncontrollably.

Note that since a Promise is really just a callback wrapped in an object, and you already have an object ("this"), you could just do things the declarative way:

_pushString(arrayOfString, ai, callback) {
    if (ai === arrayOfString.length)
        return callback();

    if (this.push(arrayOfString[ai]) === false) {
        return this.once('drain', () => {
            this._pushString(arrayOfString, ai + 1, callback);
        });
    }

    this._pushString(arrayOfString, ai + 1, callback);
}

_transform(chunk, encoding, callback) {
    const arrayOfString = extractString(chunk);
    this._pushString(arrayOfString, 0, callback);
}

At least this could be a lot faster if the reader is slow, because you're not creating lots of Promise objects and doing unnecessary context switches.

@dschnare

dschnare commented Jun 19, 2020

This would be even easier to read IMO.

export class MyTransform extends stream.Transform {
  constructor() {
    super({ objectMode: true });
  }

  _pushWithBackpressure(chunk, encoding) {
    if (!this.push(chunk, encoding)) {
      this.once('drain', () => this._pushWithBackpressure(chunk, encoding))
    }
  }

  _transform(chunk: any, encoding: string, callback: TransformCallback) {
    const arrayOfStrings = extractStrings(chunk);
    for (const string of arrayOfStrings) {
      this._pushWithBackpressure(string)
    }
    callback();
  }
}

@squarewav

No! That will queue up all strings AND install extraneous drain handlers.

When doing I/O, JS is more declarative than functional. It's a fundamentally different way of thinking. That is what Promises try to solve but fail at, IMO.

@dschnare

Yeah, I've tested this and I get a memory leak warning due to the max event listeners limit being reached. However, the solution I described above is no different from the solution you provided; we're both still adding event listeners to drain when push returns false.

Also, regarding Promises: fundamentally, adding listeners and using promises do the same thing.

TBH I have come to realize that the streams documentation and the stream API around backpressure handling are woefully inadequate. There's talk about backpressure handling, and yet there are no fundamental mechanisms that can be leveraged to handle it gracefully.

I get the fact that when push returns false you're supposed to stop accepting data from your source, thereby adding backpressure to your source. However, how do you do this when you write a Transform that can push several times from a single chunk? At the end of the day you have to buffer it, so why in the heck isn't there a mechanism to handle this for you? This is a very common situation. At least have a mechanism that you can optionally opt into.

@dschnare

Or, have documentation for handling common backpressure scenarios (i.e. Transform that pushes multiple times from a single chunk, etc.).

@dschnare

For example, with my solution and your solution we'll still get the max listener warning for, say, a stream of 72K records in object mode coming through. Your solution is optimized for pushing multiple chunks/objects sequentially, but it still ends up adding an event listener for drain, potentially dozens, hundreds or thousands of times (depending on how fast the objects come from the source). With my solution, there would be many more event listeners added, for sure.

So the question is: how are we supposed to handle this so that we don't get the event listener warning nonsense and yet still respect backpressure? I guess just randomly set your maxListeners=900?

For some context I have a scenario where I'm reading a .xlsx file and streaming each row as a JSON object. I have 72K rows or so. So I hit the max listeners pretty quickly. To me this doesn't feel like a great solution.

@dschnare

dschnare commented Jun 21, 2020

@squarewav I see the genius of your solution; you only add a single drain listener and wait until that event is fired BEFORE pushing more chunks. For a really fast moving source stream you will still hit the max listener warning, but it will be less likely to happen.

NOTE: I've made some corrections to my original.

Would this be a more generalized approach to your solution?

const { Duplex } = require('stream')

/**
 * Pushes one or more chunks to a Duplex stream while handling backpressure.
 * 
 * @example
 * const chunks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
 * pushWithBackpressure(stream, chunks, () => ...)
 * pushWithBackpressure(stream, ['hi', 'hello'], 'utf8', () => ...)
 * @param {Duplex} stream The Duplex stream the chunks will be pushed to
 * @param {any} chunks The chunk or array of chunks to push to the stream
 * @param {string|Function} [encoding] The encoding of each string chunk, or the callback function
 * @param {Function} [callback] Callback called after all chunks have been pushed to the stream
 * @return {Duplex}
 */
const pushWithBackpressure = (stream, chunks, encoding, callback, { $index = 0 } = {}) => {
  if (!(stream instanceof Duplex)) {
    throw new TypeError('Argument "stream" must be an instance of Duplex')
  }
  chunks = [].concat(chunks).filter(x => x !== undefined)
  if (typeof encoding === 'function') {
    callback = encoding
    encoding = undefined
  }
  if ($index >= chunks.length) {
    if (typeof callback === 'function') {
      callback()
    }
    return stream
  } else if (!stream.push(chunks[$index], ...([encoding].filter(Boolean)))) {
    console.error('BACKPRESSURE', $index)
    return stream.once('drain', () => {
      console.error('DRAIN')
      pushWithBackpressure(stream, chunks, encoding, callback, { $index: $index + 1 })
    })
  }
  return pushWithBackpressure(stream, chunks, encoding, callback, { $index: $index + 1 })
}

@dschnare

dschnare commented Jun 21, 2020

I've made several attempts to use this function in a Transform stream, and I've found that the drain event doesn't actually get emitted, so my callback never gets called.

My Transform only works if I iterate over the objects I want to push to the stream then call the callback.

I noticed this passage in the Backpressure guidelines:

https://nodejs.org/es/docs/guides/backpressuring-in-streams/

[screenshot of the quoted passage from the backpressure guide]

Could it be that Transform streams automatically handle backpressure when calling push multiple times?

@squarewav

squarewav commented Jun 21, 2020

I didn't really read through all of your comments, but your understanding of write() is not accurate. There are two ways to call write(). One is with a callback, in which case you should not call write() again until after the callback is called. For a transform, this means that _transform will not be called repeatedly and therefore no drain events will build up (using my code). The other way to call write() is to not use the callback and to monitor the return value of write(). When it returns false, you stop writing. For an object-mode stream, you would normally set writableHighWaterMark to something low (the default is 16, I think), but in practice it might be like 2 or 3 depending on what exactly the transform is doing. So again, no major drain events will build up.

You can call push multiple times for one chunk. That's what buffering is for. Streams are a relatively sophisticated feature. It takes some time to "get" it. Just keep going.
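
To illustrate the two ways of calling write(), here's a rough sketch against a made-up slow object-mode sink (the sink, the chunk values and the timings are hypothetical, just for demonstration):

const { Writable } = require('stream');

// A hypothetical slow sink with a tiny buffer, so backpressure kicks in early.
const slowSink = new Writable({
    objectMode: true,
    highWaterMark: 2,
    write(chunk, encoding, callback) {
        setTimeout(callback, 10); // pretend each chunk takes 10ms to process
    }
});

// Way 1: pass a callback and don't call write() again until it fires.
function writeSequentially(chunks, i) {
    if (i === chunks.length)
        return slowSink.end();
    slowSink.write(chunks[i], () => writeSequentially(chunks, i + 1));
}

// Way 2: no callback; watch the return value and wait for 'drain'.
function writeWithDrain(chunks, i) {
    while (i < chunks.length) {
        if (!slowSink.write(chunks[i++])) {
            return slowSink.once('drain', () => writeWithDrain(chunks, i));
        }
    }
    slowSink.end();
}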

The real problem with streams is error handling. You can propagate errors up but not down. If an error occurs in a stream that is not at the end of a pipeline of streams, it is borderline impossible to clean up the downstream streams. The result is resource leaks. That is the Achilles' heel of Node.js streams, actually (and maybe of Node.js in general, since HTTP clients are effectively downstream streams and you have no control over what HTTP clients do or don't do).

@dschnare

@squarewav I honestly can't get your solution to work though (or mine, for that matter). In my Transform stream, push() returns false after a few calls (expected), but the stream never emits drain, so only the first "batch" of pushes is ever made available to the readable side of the stream and I lose the rest of the data. In fact, because drain is never emitted, _transform() is only called once, since the callback never gets called.

Agreed, error handling with streams is a crapshoot; however, using stream.pipeline can solve that (at least from my experience and from reading the documentation).
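
For reference, the shape I mean is roughly this (a minimal sketch with hypothetical file names); pipeline destroys every stream in the chain when any of them errors, which is what takes care of the cleanup problem you described:

const { pipeline } = require('stream')
const FS = require('fs')
const Zlib = require('zlib')

pipeline(
  FS.createReadStream('input.txt'),      // hypothetical input file
  Zlib.createGzip(),
  FS.createWriteStream('input.txt.gz'),  // hypothetical output file
  error => {
    if (error) {
      console.error('Pipeline failed:', error)
    } else {
      console.log('Pipeline succeeded.')
    }
  }
)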

@dschnare

Hate to draw this out. However, I was able to write a contrived test script that doesn't have the drain event issue I'm encountering. All works as expected.

However, a problem arises when I read from a CSV text file, parse it in Transform#_transform and call push for each parsed row/record in the file; drain is never emitted in my case.

/**
 * Converts a CSV file into a stream of JSON records.
 * 
 * The CSV stream will have its encoding set to "utf-8" when calling this function.
 * 
 * Each JSON record will be keyed with the first CSV record in the file (i.e. the
 * column/field names will be taken from the first line of the file).
 * 
 * The returned Readable stream is readable in object mode, where each record is
 * available on the stream as a JSON object.
 * 
 * @example
 * const r = FS.createReadStream('data.csv')
 * csvToJsonStream(r).on('data', record => console.log(record))
 * @param {import('stream').Readable} csvStream The CSV string stream to convert into a JSON stream
 * @param {{ columns?: string[] }} [options]
 * @return {import('stream').Readable} The readable stream of JSON records
 */
const csvToJsonStream = (csvStream, { columns = null } = {}) => {
  if (!(csvStream instanceof Readable)) {
    throw new TypeError('CSV stream must be an instance of Readable')
  }

  return pipeline(
    csvStream,
    new CsvRowStream(),
    new JsonStream(columns),
    error => {
      if (error) {
        console.error('CsvExtractor error :: ' + error)
      }
    }
  )
}

// Converts a CSV text stream into an object-mode stream of string arrays (one array per row).
// The CSV headers are read from the first line in the text stream.
class CsvRowStream extends Transform {
  constructor () {
    super({ readableObjectMode: true, defaultEncoding: 'utf8' })
    this._buffer = ''
    this._columnCount = 0
  }

  _transform (chunk, encoding, done) {
    try {
      this._buffer += chunk
      const { records, lastIndex } = parseCsv(this._buffer, { colCount: this._columnCount })
      if (lastIndex && this._buffer[lastIndex - 1] === '\n') {
        this._buffer = this._buffer.slice(lastIndex)
      }
      if (!this._columnCount) {
        this._columnCount = (records[0] || []).length
      }
      // Drain is never emitted! So this doesn't work.
      pushWithBackpressure(this, records, done)
      // for (const r of records) {
      //   this.push(r)
      // }
      // done()
    } catch (error) {
      done(error)
    }
  }
}

// Converts an object-mode stream of string arrays into an object-mode stream of JSON objects.
// Object field names are read from the first record/entry in the stream.
class JsonStream extends Transform {
  constructor (columns) {
    super({ objectMode: true })
    this._columns = columns ? columns.slice() : columns
  }

  get columns () {
    return this._columns ? this._columns.slice() : null
  }

  set columns (value) {
    if (!Array.isArray(value) || value.some(c => c && typeof c !== 'string')) {
      throw new TypeError('Columns must be an array of non-empty strings')
    }
    this._columns = value.slice()
  }

  _transform (fields, _, done) {
    if (!Array.isArray(fields)) {
      done(null, fields)
    } else if (this._columns && this._columns.length) {
      try {
        if (fields.length !== this._columns.length) {
          throw Object.assign(
            new Error(`Column count mismatch. Expected ${this._columns.length} fields and only got ${fields.length}`),
            { name: 'ColumnMismatchError', fields: fields.slice(), columns: this._columns.slice() }
          )
        }

        const entity = fields.reduce((obj, value, col) => {
          return Object.assign(obj, { [this._columns[col]]: value })
        }, {})
        done(null, entity)
      } catch (error) {
        done(error)
      }
    } else if (!this._columns) {
      try {
        this.columns = fields
        done(null)
      } catch (error) {
        done(error)
      }
    } else {
      done(null, fields)
    }
  }
}

@dschnare

@squarewav I've narrowed my problem down to a completely different transform. So, to conclude my blunder here: your solution and my generalized solution both work as advertised. My problem was in some other area of my pipeline.

I will report back when I discover my issue.

@dschnare

@squarewav My problem came from creating the source stream, then running some async code (awaiting) before I piped the source to the transforms and the destination stream. My guess is that the source stream was filling the writable side of the stream too quickly.

Moving the async work to before the source stream is opened solved my problem of no drain events being fired in my Transform instances.

However, I have a few questions, if you don't mind helping me with them:

  1. The docs say all readable streams start out in paused mode. So why did opening the readable stream, waiting for some async work to finish, and only then piping the source to the transforms and the destination stream make any difference? Shouldn't the source have been paused, waiting for calls to read() or for a data listener to be added before entering flowing mode? Shouldn't data have been buffered for that short window? I know the order of things was not optimal, but still, in principle, the data should have been buffered, no?

  2. I have an isolated test that demonstrates things working as expected; however, I can "break" things by piping to a file on disk instead of stdout. Then I'm back to no drain event being emitted.

Example:

// works as expected; writes all CSV records to stdout
pipe(source, ...transforms, process.stdout, error => ...)
// does not work as expected; only writes the first "batch" of parsed CSV records
pipe(source, ...transforms, FS.createWriteStream('./out.csv', { encoding: 'utf8' }), error => ...)

@dschnare

@squarewav Ok. After changing the order of my async operations I thought I was out of the dark, but the fundamental problem remains: Transform classes DO NOT emit the drain event. Your solution just doesn't work. I thought it was working, but that was ONLY when I piped to stdout. The moment I pipe to disk, when backpressure actually occurs, I'm stuck again; no drain event.

I have found that in order to listen to the drain event when needing to push multiple chunks to the read queue you need to use: Transform#_readableState.pipes.on('drain', () => ...)

https://stackoverflow.com/questions/20769132/whats-the-proper-way-to-handle-back-pressure-in-a-node-js-transform-stream

This was the point I was trying to make above, although I got a bit wordy about it; there is inadequate documentation on how to properly handle backpressure when implementing a Transform class that needs to push several chunks on the read queue for every call to _transform().

I get the whole backpressure dance when calling Writable#write() directly, or when calling Readable#push during an invocation of Readable#_read(), but Transforms are a bit different because they come with their own _read and _write methods. You shouldn't need to override those just to call push multiple times, in my opinion. Or at least there's some missing documentation around this use case, since you can't even leverage the drain event!
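
To be concrete, this is the Readable-side pattern I mean (a minimal sketch; RecordSource and its input are made up for illustration):

const { Readable } = require('stream')

// Keep pushing until push() returns false or we run out of records;
// the stream calls _read() again once the consumer has drained the buffer.
class RecordSource extends Readable {
  constructor (records) {
    super({ objectMode: true })
    this._records = records
    this._index = 0
  }

  _read () {
    while (this._index < this._records.length) {
      if (!this.push(this._records[this._index++])) {
        return
      }
    }
    this.push(null) // signal end of stream
  }
}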

@dschnare

Here's an updated version of pushWithBackpressure that does work as expected:

const { Duplex } = require('stream')

/**
 * Pushes one or more chunks to a Duplex stream while handling backpressure.
 * 
 * @example
 * const chunks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
 * pushWithBackpressure(stream, chunks, () => ...)
 * pushWithBackpressure(stream, ['hi', 'hello'], 'utf8', () => ...)
 * @param {Duplex} stream The Duplex stream the chunks will be pushed to
 * @param {any} chunks The chunk or array of chunks to push to the stream
 * @param {string|Function} [encoding] The encoding of each string chunk, or the callback function
 * @param {Function} [callback] Callback called after all chunks have been pushed to the stream
 * @return {Duplex}
 */
const pushWithBackpressure = (stream, chunks, encoding, callback = null, $index = 0) => {
  if (!(stream instanceof Duplex)) {
    throw new TypeError('Argument "stream" must be an instance of Duplex')
  }
  chunks = [].concat(chunks).filter(x => x !== undefined)
  if (typeof encoding === 'function') {
    callback = encoding
    encoding = undefined
  }
  if ($index >= chunks.length) {
    if (typeof callback === 'function') {
      callback()
    }
    return stream
  } else if (!stream.push(chunks[$index], ...([encoding].filter(Boolean)))) {
    const pipedStreams = [].concat(
      (stream._readableState || {}).pipes || stream
    ).filter(Boolean)
    let listenerCalled = false
    const drainListener = () => {
      if (listenerCalled) {
        return
      }
      listenerCalled = true
      for (const stream of pipedStreams) {
        stream.removeListener('drain', drainListener)
      }
      pushWithBackpressure(stream, chunks, encoding, callback, $index + 1)
    }
    for (const stream of pipedStreams) {
      stream.once('drain', drainListener)
    }
    return stream
  }
  return pushWithBackpressure(stream, chunks, encoding, callback, $index + 1)
}

exports.pushWithBackpressure = pushWithBackpressure
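
For completeness, a hypothetical usage sketch, mirroring how I call the helper from CsvRowStream above (the LineSplitter class and the module path are made up for illustration):

const { Transform } = require('stream')
const { pushWithBackpressure } = require('./push-with-backpressure') // hypothetical module path

// Split each incoming text chunk into lines, push every line, and hand `done`
// to the helper so it is only called once all lines have been pushed.
class LineSplitter extends Transform {
  constructor () {
    super({ readableObjectMode: true })
  }

  _transform (chunk, encoding, done) {
    const lines = String(chunk).split('\n').filter(Boolean)
    pushWithBackpressure(this, lines, done)
  }
}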

@dschnare

@squarewav I couldn't have done this without your help. Thanks, just wish there was a better way to go about this.

@summer-ji-eng

OMG, I'm having exactly the same issue and went through the same painful process. I really wish someone would update the documentation on how to properly handle backpressure, or add a mechanism for handling it easily.

@alexandremacedo

alexandremacedo commented Oct 30, 2023

It works for me, using .on("data"), .on("end") and .on("drain"):

readStream.pipe(transformer1).pipe(slowTransformer2).on("data", (csv) => {
    if (!writeStream.write(csv)) {
        readStream.pause();
    }
}).on("end", () => {
    writeStream.end();
});

writeStream.on("drain", () => {
    readStream.resume();
});

@lio-mengxiang

lio-mengxiang commented Feb 26, 2024


Do Node's Transform streams have to watch push's return value and wait for the drain event as well? Since under the hood they're just a read stream and a write stream connected together, I'd assume so; nevertheless, all the online implementations I've seen seem to just push at will.

export class MyTransform extends stream.Transform {
  constructor() {
    super({ objectMode: true });
  }

  async _transform(chunk: any, encoding: string, callback: TransformCallback) {
    const arrayOfStrings = extractStrings(chunk);
    for (const string of arrayOfStrings) {
      if (!this.push(string)) {
        await new Promise((res) => this.once("drain", res));
      }
    }
    callback();
  }
}

Hello, I have carefully debugged the Readable and Writable source code (Transform streams rely on them). You don't need to check the return value of this.push in a Transform stream, because the pipe method handles this for you.

You only need to use the Transform stream like the following code:

export class MyTransform extends stream.Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk: any, encoding: string, callback: TransformCallback) {
    const arrayOfStrings = extractStrings(chunk);
    for (const string of arrayOfStrings) {
      this.push(string);
    }
    callback();
  }
}

It's very important to call this.push and the callback synchronously.

Why is that?

Put simply, this.push only places the data into the readable stream's buffer, and callback() lets the next chunk flow through immediately without piling up in the writable stream's buffer, so it fits naturally with how the pipe method controls backpressure. I can implement a simplified pipe method in Node.js:

pipe(ws) {
  // Write each chunk to the destination; if its buffer is full, pause the source.
  this.on('data', (chunk) => {
    let flag = ws.write(chunk);
    if (!flag) {
      this.pause();
    }
  });
  // Once the destination drains, resume the source.
  ws.on('drain', () => {
    this.resume();
  });
}
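
For example, a minimal wiring sketch (the file names are hypothetical); with pipe managing pause/resume as above, MyTransform never needs to check this.push's return value itself:

const fs = require('fs');

fs.createReadStream('./input.txt')   // hypothetical source file
  .pipe(new MyTransform())           // the transform from above
  .pipe(fs.createWriteStream('./output.txt'));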

I wrote a Chinese article exploring a similar principle, so forgive me for not being able to provide it in English.

@avivkeller (Member)

@vlopp is your issue resolved?


github-actions bot commented Jan 7, 2025

It seems there has been no activity on this issue for a while, and it is being closed in 30 days. If you believe this issue should remain open, please leave a comment.
If you need further assistance or have questions, you can also search for similar issues on Stack Overflow.
Make sure to look at the README file for the most updated links.

@github-actions github-actions bot added the stale label Jan 7, 2025

github-actions bot commented Feb 6, 2025

It seems there has been no activity on this issue for a while, and it is being closed. If you believe this issue should remain open, please leave a comment.
If you need further assistance or have questions, you can also search for similar issues on Stack Overflow.
Make sure to look at the README file for the most updated links.

@github-actions github-actions bot closed this as not planned Feb 6, 2025