
NodeJS Stream Meeting Notes - Async Iterators #74

Closed
benjamingr opened this issue Jan 13, 2017 · 13 comments

Comments


benjamingr commented Jan 13, 2017

Hey,

Node discussed Symbol.asyncIterator support for streams in yesterday's WG meeting (I had to miss it at the last minute :( ).

Symbol.asyncIterator can be supported inside streams without internal changes; however, merging it into core depends on which approach to Promises core adopts. We are happy to maintain the prototype and expand it if needed. We should also take care with the story for writing to streams, as it should somehow match the story for reading, i.e. we should be able to write with promises as well.
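
As a sketch of why no internal changes are needed, here is one way Symbol.asyncIterator could be layered on top of the existing 'data'/'end'/'error' events in userland (toAsyncIterator is an illustrative name, not the prototype's actual API):

function toAsyncIterator (stream) {
  return {
    [Symbol.asyncIterator] () { return this },
    // Intended for a single for-await pass; calling next() again
    // after 'end' has fired is not handled here.
    next () {
      return new Promise((resolve, reject) => {
        const onError = (err) => { cleanup(); reject(err) }
        const onEnd = () => { cleanup(); resolve({ value: undefined, done: true }) }
        const onData = (chunk) => {
          cleanup()
          stream.pause() // stop flowing until next() is called again
          resolve({ value: chunk, done: false })
        }
        function cleanup () {
          stream.removeListener('error', onError)
          stream.removeListener('end', onEnd)
          stream.removeListener('data', onData)
        }
        stream.on('error', onError)
        stream.on('end', onEnd)
        stream.on('data', onData)
        stream.resume()
      })
    }
  }
}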

One issue raised was how to make async iterators work with writable streams - that is, there needs to be a way to do:

async function* foo() {
   yield 1;
   yield 2;
   yield 3;
}

And have that coerced into a Node.js stream. I don't think this should be too problematic to do in Node, and I'm excited that the streams WG is keeping a prototype for interop. Pinging @mcollina, who attended the meeting.
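
For the other direction, from an async iterator to a Readable, a minimal sketch could pull from the iterator inside read() (fromAsyncIterator is a made-up name; objectMode is assumed because foo() yields numbers):

const { Readable } = require('stream')

// Illustrative only: one iterator.next() per read request, so the
// source is pulled no faster than the stream's consumer reads.
function fromAsyncIterator (iter) {
  const iterator = iter[Symbol.asyncIterator]()
  return new Readable({
    objectMode: true,
    read () {
      iterator.next().then(
        ({ value, done }) => this.push(done ? null : value),
        (err) => this.destroy(err)
      )
    }
  })
}

// fromAsyncIterator(foo()).on('data', console.log) // 1, 2, 3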

@mcollina

Thanks @benjamingr. I hope this will clarify things further.

Node.js Streams are composed of four types/classes:

  • Readable, the source of the data
  • Writable, the destination of the data
  • Duplex, both Readable and Writable, e.g. for sockets
  • Transform, both Readable and Writable, used to transform data as it passes through

If we are not interested in controlling the amount of data (backpressure) that the Node.js process accepts, we can use the 'data' event:

stream.on('data', function (chunk) {
  // do something with chunk
})

stream.on('end', function () {
  // data finished
})

If we are interested in controlling the amount of data that the Node.js process accepts, we can use pipe():

readable.pipe(transform).pipe(writable)

We can also concatenate multiple transforms:

readable.pipe(transform1).pipe(transform2).pipe(writable)

AsyncIterator is another way we can interact with streams: without using pipe(), we can still control the amount of data we accept, respecting backpressure.

In @calvinmetcalf's prototype, we can do:

async function test() {
  for await (let x of new MyStream()) {
    console.log(x);
  }
}

test().then(() => console.log('done'), (e) => console.log(e));

This allows us to read from a stream using just a language feature. However, it is not clear how we could write() to another stream, or create a processing pipeline like pipe().

Ideally we should be able to do:

async function* doubleStream (iter) {
  for await (let x of iter) {
    yield x * 2
  }
}

// Note: this assumes write() and end() return promises that resolve
// when it is safe to continue, which today's Writable does not do.
async function toDest (iter, dest) {
  for await (let x of iter) {
    await dest.write(x)
  }

  await dest.end()
}

toDest(doubleStream(doubleStream(stream)), writable)

Error handling was also a topic in the meeting, but it is not clear what happens if something inside the for await loop throws. As far as we understand, it should land in the async iterator's throw() method, but we do not see that happening in our demo. We might want to destroy the underlying resources if that happens; however, we should probably discuss this further between ourselves.
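
For illustration, one way to make the cleanup explicit today is to catch around the loop and destroy the stream ourselves (a sketch reusing the toAsyncIterator wrapper from earlier in the thread; handleChunk is hypothetical):

async function consume (stream) {
  try {
    for await (let chunk of toAsyncIterator(stream)) {
      handleChunk(chunk) // may throw
    }
  } catch (err) {
    // Whether the loop body or the stream itself threw, release the
    // underlying resource explicitly instead of relying on the
    // iterator's throw()/return() semantics.
    stream.destroy()
    throw err
  }
}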

@RangerMauve

Would it make sense to allow Writable.write to take an async (or sync) iterator and write the contents to the stream by iterating over it?

async function *messagesFromSomewhere(somewhere){
 // etc...
}

var messages = messagesFromSomewhere("here");

fs.createWriteStream("messages.txt").write(messages);

This would make it easy to consume sequences with streams, and I think it would be intuitive to use without breaking backwards compatibility.
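
For reference, a userland helper along these lines could look like the following sketch, which honors backpressure by waiting for 'drain' whenever write() returns false (writeIterable is a made-up name; error handling on dest is omitted for brevity):

async function writeIterable (dest, iterable) {
  // for await also accepts plain sync iterables such as arrays
  for await (let chunk of iterable) {
    if (!dest.write(chunk)) {
      // the internal buffer is full; wait before writing more
      await new Promise((resolve) => dest.once('drain', resolve))
    }
  }
  dest.end()
}

// writeIterable(fs.createWriteStream("messages.txt"), messages)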

@mcollina

async function *messagesFromSomewhere(somewhere){
 // etc...
}

var source = Readable.wrap(messagesFromSomewhere("here"))
var dest = fs.createWriteStream("messages.txt")

source.pipe(dest)

Can I tell when messagesFromSomewhere has finished?

@calvinmetcalf

Yes, the promise result includes a done member. Having the ability to create a readable from an async iterator seems like a good idea.
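
Concretely, driving the iterator by hand shows where done surfaces (a sketch; messagesFromSomewhere is the generator from the comments above):

async function drain (iterable) {
  const iter = iterable[Symbol.asyncIterator]()
  let result = await iter.next()
  while (!result.done) {
    console.log(result.value)
    result = await iter.next()
  }
  // result.done === true here: the source generator has returned
}

// drain(messagesFromSomewhere("here"))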

@RangerMauve

Would it make sense to call it Readable.from() to be similar to Array.from and similar methods in other types like Observable?

Would it make sense to allow regular iterables like Array or Set, then? This would potentially help get rid of the need for user libraries like stream-array.

@calvinmetcalf

We'd probably need it to be more like Readable.prototype.wrap, so that options could be specified (i.e. objectMode, highWaterMark and encoding).
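
In other words, the call shape might look something like this (purely hypothetical; no such factory exists yet):

const source = Readable.from(messagesFromSomewhere("here"), {
  objectMode: true,   // messages are objects, not buffers
  highWaterMark: 16,  // buffer at most 16 messages
  encoding: 'utf8'    // only meaningful for string data
})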

@mcollina

@RangerMauve I agree. However, that's not the point of this discussion; we can have it in our readable-stream repo.

I think this issue is only about async iterators: whatever the method name is, we need to make sure we can wrap them easily, and possibly assemble them into a transform pipeline.

@calvinmetcalf do you know how you would implement Readable.wrap()?

@calvinmetcalf

Yes, I'll open a pull request for async-iter.

@littledan

This is all very interesting; it's really great to hear that async iteration may fit seamlessly with Node Streams. Do you have any feedback that could affect the semantics of this proposal? Do the currently proposed semantics work well for Node?

@calvinmetcalf

I've been following this spec for a while; I'm a big fan, and I think it's a great fit.

@mcollina

I didn't know about it, and I'm really happy with the spec.

@rauschma

What is the officially recommended way to use for-await-of with streams in Node.js at the moment? An npm package such as async-iterate-stream?

@vsemozhetbyt

See nodejs/node#17755
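
(For context: that pull request added experimental Symbol.asyncIterator support directly to readable streams, so on a Node.js build that includes it no userland wrapper is needed. A minimal usage sketch:)

const fs = require('fs')

async function printFile (path) {
  // yields Buffer chunks until the file is exhausted
  for await (let chunk of fs.createReadStream(path)) {
    console.log(chunk)
  }
}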
