Skip to content

Commit 226ff00

Browse files
committed
stream: fix fromAsyncGen
Fixes: nodejs#40497
1 parent c0a7020 commit 226ff00

File tree

2 files changed

+31
-2
lines changed

2 files changed

+31
-2
lines changed

lib/internal/streams/duplexify.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,13 @@ function fromAsyncGen(fn) {
210210
const value = fn(async function*() {
211211
while (true) {
212212
const { chunk, done, cb } = await promise;
213+
promise = null;
214+
resolve = null;
213215
process.nextTick(cb);
214216
if (done) return;
215217
if (signal.aborted) throw new AbortError();
216-
yield chunk;
217218
({ promise, resolve } = createDeferredPromise());
219+
yield chunk;
218220
}
219221
}(), { signal });
220222

test/parallel/test-stream-duplex-from.js

+28-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
const common = require('../common');
44
const assert = require('assert');
5-
const { Duplex, Readable, Writable } = require('stream');
5+
const { Duplex, Readable, Writable, pipeline } = require('stream');
66

77
{
88
const d = Duplex.from({
@@ -118,3 +118,30 @@ const { Duplex, Readable, Writable } = require('stream');
118118
assert.strictEqual(d.readable, false);
119119
}));
120120
}
121+
122+
{
123+
// https://github.com/nodejs/node/issues/40497
124+
pipeline(
125+
['abc\ndef\nghi'],
126+
Duplex.from(async function * (source) {
127+
let rest = ''
128+
for await (const chunk of source) {
129+
console.error(0)
130+
const lines = (rest + chunk.toString()).split('\n')
131+
rest = lines.pop()
132+
for (const line of lines) {
133+
yield line
134+
}
135+
}
136+
yield rest
137+
}),
138+
async function * (source) {
139+
let ret = ''
140+
for await (const x of source) {
141+
ret += x
142+
}
143+
assert.strictEqual(ret, 'abcdefghi')
144+
},
145+
common.mustCall(() => {}),
146+
)
147+
}

0 commit comments

Comments
 (0)