Skip to content

Commit af7047a

Browse files
ronagtargos
authored andcommitted
stream: add isDisturbed helper
Adds a helper util used to determine whether a stream has been disturbed (read or cancelled). Refs: #39627 PR-URL: #39628 Backport-PR-URL: #39819 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 3a8399e commit af7047a

8 files changed

+171
-11
lines changed

doc/api/stream.md

+26-3
Original file line numberDiff line numberDiff line change
@@ -1259,16 +1259,27 @@ added: v11.4.0
12591259
Is `true` if it is safe to call [`readable.read()`][stream-read], which means
12601260
the stream has not been destroyed or emitted `'error'` or `'end'`.
12611261

1262+
##### `readable.readableAborted`
1263+
<!-- YAML
1264+
added: REPLACEME
1265+
-->
1266+
1267+
> Stability: 1 - Experimental
1268+
1269+
* {boolean}
1270+
1271+
Returns whether the stream was destroyed or errored before emitting `'end'`.
1272+
12621273
##### `readable.readableDidRead`
12631274
<!-- YAML
12641275
added: v16.7.0
12651276
-->
12661277

1278+
> Stability: 1 - Experimental
1279+
12671280
* {boolean}
12681281

1269-
Allows determining if the stream has been or is about to be read.
1270-
Returns true if `'data'`, `'end'`, `'error'` or `'close'` has been
1271-
emitted.
1282+
Returns whether `'data'` has been emitted.
12721283

12731284
##### `readable.readableEncoding`
12741285
<!-- YAML
@@ -1943,6 +1954,18 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
19431954
the strings or buffers be iterated to match the other streams semantics
19441955
for performance reasons.
19451956

1957+
### `stream.Readable.isDisturbed(stream)`
1958+
<!-- YAML
1959+
added: REPLACEME
1960+
-->
1961+
1962+
> Stability: 1 - Experimental
1963+
1964+
* `stream` {stream.Readable|ReadableStream}
1965+
* Returns: `boolean`
1966+
1967+
Returns whether the stream has been read from or cancelled.
1968+
19461969
### `stream.addAbortSignal(signal, stream)`
19471970
<!-- YAML
19481971
added: v15.4.0

lib/internal/streams/readable.js

+9-6
Original file line numberDiff line numberDiff line change
@@ -1191,12 +1191,15 @@ ObjectDefineProperties(Readable.prototype, {
11911191
readableDidRead: {
11921192
enumerable: false,
11931193
get: function() {
1194-
return (
1195-
this._readableState.dataEmitted ||
1196-
this._readableState.endEmitted ||
1197-
this._readableState.errorEmitted ||
1198-
this._readableState.closeEmitted
1199-
);
1194+
return this._readableState.dataEmitted;
1195+
}
1196+
},
1197+
1198+
readableAborted: {
1199+
enumerable: false,
1200+
get: function() {
1201+
return !!(this._readableState.destroyed || this._readableState.errored) &&
1202+
!this._readableState.endEmitted;
12001203
}
12011204
},
12021205

lib/internal/streams/utils.js

+13
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
'use strict';
22

33
const {
4+
Symbol,
45
SymbolAsyncIterator,
56
SymbolIterator,
67
} = primordials;
78

9+
const kIsDisturbed = Symbol('kIsDisturbed');
10+
811
function isReadable(obj) {
912
return !!(obj && typeof obj.pipe === 'function' &&
1013
typeof obj.on === 'function');
@@ -27,7 +30,17 @@ function isIterable(obj, isAsync) {
2730
typeof obj[SymbolIterator] === 'function';
2831
}
2932

33+
function isDisturbed(stream) {
34+
return !!(stream && (
35+
stream.readableDidRead ||
36+
stream.readableAborted ||
37+
stream[kIsDisturbed]
38+
));
39+
}
40+
3041
module.exports = {
42+
isDisturbed,
43+
kIsDisturbed,
3144
isIterable,
3245
isReadable,
3346
isStream,

lib/internal/webstreams/readablestream.js

+9
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ const {
8080
queueMicrotask,
8181
} = require('internal/process/task_queues');
8282

83+
const {
84+
kIsDisturbed,
85+
} = require('internal/streams/utils');
86+
8387
const {
8488
ArrayBufferViewGetBuffer,
8589
ArrayBufferViewGetByteLength,
@@ -200,6 +204,7 @@ class ReadableStream {
200204
promise: undefined,
201205
}
202206
};
207+
203208
// The spec requires handling of the strategy first
204209
// here. Specifically, if getting the size and
205210
// highWaterMark from the strategy fail, that has
@@ -232,6 +237,10 @@ class ReadableStream {
232237
return makeTransferable(this);
233238
}
234239

240+
get [kIsDisturbed]() {
241+
return this[kState].disturbed;
242+
}
243+
235244
/**
236245
* @readonly
237246
* @type {boolean}

lib/stream.js

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const internalBuffer = require('internal/buffer');
3636
const promises = require('stream/promises');
3737

3838
const Stream = module.exports = require('internal/streams/legacy').Stream;
39+
Stream.isDisturbed = require('internal/streams/utils').isDisturbed;
3940
Stream.Readable = require('internal/streams/readable');
4041
Stream.Writable = require('internal/streams/writable');
4142
Stream.Duplex = require('internal/streams/duplex');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { Readable } = require('stream');
6+
7+
{
8+
const readable = new Readable({
9+
read() {
10+
}
11+
});
12+
assert.strictEqual(readable.readableAborted, false);
13+
readable.destroy();
14+
assert.strictEqual(readable.readableAborted, true);
15+
}
16+
17+
{
18+
const readable = new Readable({
19+
read() {
20+
}
21+
});
22+
assert.strictEqual(readable.readableAborted, false);
23+
readable.push(null);
24+
readable.destroy();
25+
assert.strictEqual(readable.readableAborted, true);
26+
}
27+
28+
{
29+
const readable = new Readable({
30+
read() {
31+
}
32+
});
33+
assert.strictEqual(readable.readableAborted, false);
34+
readable.push('asd');
35+
readable.destroy();
36+
assert.strictEqual(readable.readableAborted, true);
37+
}
38+
39+
{
40+
const readable = new Readable({
41+
read() {
42+
}
43+
});
44+
assert.strictEqual(readable.readableAborted, false);
45+
readable.push('asd');
46+
readable.push(null);
47+
assert.strictEqual(readable.readableAborted, false);
48+
readable.on('end', common.mustCall(() => {
49+
assert.strictEqual(readable.readableAborted, false);
50+
readable.destroy();
51+
assert.strictEqual(readable.readableAborted, false);
52+
queueMicrotask(() => {
53+
assert.strictEqual(readable.readableAborted, false);
54+
});
55+
}));
56+
readable.resume();
57+
}

test/parallel/test-stream-readable-didRead.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
'use strict';
22
const common = require('../common');
33
const assert = require('assert');
4-
const Readable = require('stream').Readable;
4+
const { isDisturbed, Readable } = require('stream');
55

66
function noop() {}
77

88
function check(readable, data, fn) {
99
assert.strictEqual(readable.readableDidRead, false);
10+
assert.strictEqual(isDisturbed(readable), false);
1011
if (data === -1) {
1112
readable.on('error', common.mustCall());
1213
readable.on('data', common.mustNotCall());
@@ -27,7 +28,10 @@ function check(readable, data, fn) {
2728
readable.on('close', common.mustCall());
2829
fn();
2930
setImmediate(() => {
30-
assert.strictEqual(readable.readableDidRead, true);
31+
assert.strictEqual(readable.readableDidRead, data > 0);
32+
if (data > 0) {
33+
assert.strictEqual(isDisturbed(readable), true);
34+
}
3135
});
3236
}
3337

test/parallel/test-whatwg-readablestream.js

+50
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
'use strict';
33

44
const common = require('../common');
5+
const { isDisturbed } = require('stream');
56
const assert = require('assert');
67
const {
78
isPromise,
@@ -1520,3 +1521,52 @@ class Source {
15201521
readableByteStreamControllerClose(controller);
15211522
readableByteStreamControllerEnqueue(controller);
15221523
}
1524+
1525+
{
1526+
const stream = new ReadableStream({
1527+
start(controller) {
1528+
controller.enqueue('a');
1529+
controller.close();
1530+
},
1531+
pull: common.mustNotCall(),
1532+
});
1533+
1534+
const reader = stream.getReader();
1535+
(async () => {
1536+
isDisturbed(stream, false);
1537+
await reader.read();
1538+
isDisturbed(stream, true);
1539+
})().then(common.mustCall());
1540+
}
1541+
1542+
{
1543+
const stream = new ReadableStream({
1544+
start(controller) {
1545+
controller.close();
1546+
},
1547+
pull: common.mustNotCall(),
1548+
});
1549+
1550+
const reader = stream.getReader();
1551+
(async () => {
1552+
isDisturbed(stream, false);
1553+
await reader.read();
1554+
isDisturbed(stream, true);
1555+
})().then(common.mustCall());
1556+
}
1557+
1558+
{
1559+
const stream = new ReadableStream({
1560+
start(controller) {
1561+
},
1562+
pull: common.mustNotCall(),
1563+
});
1564+
stream.cancel();
1565+
1566+
const reader = stream.getReader();
1567+
(async () => {
1568+
isDisturbed(stream, false);
1569+
await reader.read();
1570+
isDisturbed(stream, true);
1571+
})().then(common.mustCall());
1572+
}

0 commit comments

Comments
 (0)