Skip to content

Commit 4832d1c

Browse files
committed
stream: add isDisturbed helper
Adds a helper util used to determine whether a stream has been disturbed (read or cancelled). Refs: #39627 PR-URL: #39628 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 0e841b4 commit 4832d1c

8 files changed

+169
-11
lines changed

doc/api/stream.md

+26-3
Original file line numberDiff line numberDiff line change
@@ -1261,16 +1261,27 @@ added: v11.4.0
12611261
Is `true` if it is safe to call [`readable.read()`][stream-read], which means
12621262
the stream has not been destroyed or emitted `'error'` or `'end'`.
12631263

1264+
##### `readable.readableAborted`
1265+
<!-- YAML
1266+
added: REPLACEME
1267+
-->
1268+
1269+
> Stability: 1 - Experimental
1270+
1271+
* {boolean}
1272+
1273+
Returns whether the stream was destroyed or errored before emitting `'end'`.
1274+
12641275
##### `readable.readableDidRead`
12651276
<!-- YAML
12661277
added: REPLACEME
12671278
-->
12681279

1280+
> Stability: 1 - Experimental
1281+
12691282
* {boolean}
12701283

1271-
Allows determining if the stream has been or is about to be read.
1272-
Returns true if `'data'`, `'end'`, `'error'` or `'close'` has been
1273-
emitted.
1284+
Returns whether `'data'` has been emitted.
12741285

12751286
##### `readable.readableEncoding`
12761287
<!-- YAML
@@ -2046,6 +2057,18 @@ added: REPLACEME
20462057
* `signal` {AbortSignal}
20472058
* Returns: {stream.Readable}
20482059

2060+
### `stream.Readable.isDisturbed(stream)`
2061+
<!-- YAML
2062+
added: REPLACEME
2063+
-->
2064+
2065+
> Stability: 1 - Experimental
2066+
2067+
* `stream` {stream.Readable|ReadableStream}
2068+
* Returns: `boolean`
2069+
2070+
Returns whether the stream has been read from or cancelled.
2071+
20492072
### `stream.Readable.toWeb(streamReadable)`
20502073
<!-- YAML
20512074
added: REPLACEME

lib/internal/streams/readable.js

+9-6
Original file line numberDiff line numberDiff line change
@@ -1180,12 +1180,15 @@ ObjectDefineProperties(Readable.prototype, {
11801180
readableDidRead: {
11811181
enumerable: false,
11821182
get: function() {
1183-
return (
1184-
this._readableState.dataEmitted ||
1185-
this._readableState.endEmitted ||
1186-
this._readableState.errorEmitted ||
1187-
this._readableState.closeEmitted
1188-
);
1183+
return this._readableState.dataEmitted;
1184+
}
1185+
},
1186+
1187+
readableAborted: {
1188+
enumerable: false,
1189+
get: function() {
1190+
return !!(this._readableState.destroyed || this._readableState.errored) &&
1191+
!this._readableState.endEmitted;
11891192
}
11901193
},
11911194

lib/internal/streams/utils.js

+11
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const {
77
} = primordials;
88

99
const kDestroyed = Symbol('kDestroyed');
10+
const kIsDisturbed = Symbol('kIsDisturbed');
1011

1112
function isReadableNodeStream(obj) {
1213
return !!(
@@ -204,8 +205,18 @@ function willEmitClose(stream) {
204205
);
205206
}
206207

208+
function isDisturbed(stream) {
209+
return !!(stream && (
210+
stream.readableDidRead ||
211+
stream.readableAborted ||
212+
stream[kIsDisturbed]
213+
));
214+
}
215+
207216
module.exports = {
208217
kDestroyed,
218+
isDisturbed,
219+
kIsDisturbed,
209220
isClosed,
210221
isDestroyed,
211222
isDuplexNodeStream,

lib/internal/webstreams/readablestream.js

+9
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ const {
8080
queueMicrotask,
8181
} = require('internal/process/task_queues');
8282

83+
const {
84+
kIsDisturbed,
85+
} = require('internal/streams/utils');
86+
8387
const {
8488
ArrayBufferViewGetBuffer,
8589
ArrayBufferViewGetByteLength,
@@ -200,6 +204,7 @@ class ReadableStream {
200204
promise: undefined,
201205
}
202206
};
207+
203208
// The spec requires handling of the strategy first
204209
// here. Specifically, if getting the size and
205210
// highWaterMark from the strategy fail, that has
@@ -232,6 +237,10 @@ class ReadableStream {
232237
return makeTransferable(this);
233238
}
234239

240+
get [kIsDisturbed]() {
241+
return this[kState].disturbed;
242+
}
243+
235244
/**
236245
* @readonly
237246
* @type {boolean}

lib/stream.js

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const internalBuffer = require('internal/buffer');
3838
const promises = require('stream/promises');
3939

4040
const Stream = module.exports = require('internal/streams/legacy').Stream;
41+
Stream.isDisturbed = require('internal/streams/utils').isDisturbed;
4142
Stream.Readable = require('internal/streams/readable');
4243
Stream.Writable = require('internal/streams/writable');
4344
Stream.Duplex = require('internal/streams/duplex');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { Readable } = require('stream');
6+
7+
{
8+
const readable = new Readable({
9+
read() {
10+
}
11+
});
12+
assert.strictEqual(readable.readableAborted, false);
13+
readable.destroy();
14+
assert.strictEqual(readable.readableAborted, true);
15+
}
16+
17+
{
18+
const readable = new Readable({
19+
read() {
20+
}
21+
});
22+
assert.strictEqual(readable.readableAborted, false);
23+
readable.push(null);
24+
readable.destroy();
25+
assert.strictEqual(readable.readableAborted, true);
26+
}
27+
28+
{
29+
const readable = new Readable({
30+
read() {
31+
}
32+
});
33+
assert.strictEqual(readable.readableAborted, false);
34+
readable.push('asd');
35+
readable.destroy();
36+
assert.strictEqual(readable.readableAborted, true);
37+
}
38+
39+
{
40+
const readable = new Readable({
41+
read() {
42+
}
43+
});
44+
assert.strictEqual(readable.readableAborted, false);
45+
readable.push('asd');
46+
readable.push(null);
47+
assert.strictEqual(readable.readableAborted, false);
48+
readable.on('end', common.mustCall(() => {
49+
assert.strictEqual(readable.readableAborted, false);
50+
readable.destroy();
51+
assert.strictEqual(readable.readableAborted, false);
52+
queueMicrotask(() => {
53+
assert.strictEqual(readable.readableAborted, false);
54+
});
55+
}));
56+
readable.resume();
57+
}

test/parallel/test-stream-readable-didRead.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
'use strict';
22
const common = require('../common');
33
const assert = require('assert');
4-
const Readable = require('stream').Readable;
4+
const { isDisturbed, Readable } = require('stream');
55

66
function noop() {}
77

88
function check(readable, data, fn) {
99
assert.strictEqual(readable.readableDidRead, false);
10+
assert.strictEqual(isDisturbed(readable), false);
1011
if (data === -1) {
1112
readable.on('error', common.mustCall());
1213
readable.on('data', common.mustNotCall());
@@ -27,7 +28,10 @@ function check(readable, data, fn) {
2728
readable.on('close', common.mustCall());
2829
fn();
2930
setImmediate(() => {
30-
assert.strictEqual(readable.readableDidRead, true);
31+
assert.strictEqual(readable.readableDidRead, data > 0);
32+
if (data > 0) {
33+
assert.strictEqual(isDisturbed(readable), true);
34+
}
3135
});
3236
}
3337

test/parallel/test-whatwg-readablestream.js

+50
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
'use strict';
33

44
const common = require('../common');
5+
const { isDisturbed } = require('stream');
56
const assert = require('assert');
67
const {
78
isPromise,
@@ -1520,3 +1521,52 @@ class Source {
15201521
readableByteStreamControllerClose(controller);
15211522
readableByteStreamControllerEnqueue(controller);
15221523
}
1524+
1525+
{
1526+
const stream = new ReadableStream({
1527+
start(controller) {
1528+
controller.enqueue('a');
1529+
controller.close();
1530+
},
1531+
pull: common.mustNotCall(),
1532+
});
1533+
1534+
const reader = stream.getReader();
1535+
(async () => {
1536+
isDisturbed(stream, false);
1537+
await reader.read();
1538+
isDisturbed(stream, true);
1539+
})().then(common.mustCall());
1540+
}
1541+
1542+
{
1543+
const stream = new ReadableStream({
1544+
start(controller) {
1545+
controller.close();
1546+
},
1547+
pull: common.mustNotCall(),
1548+
});
1549+
1550+
const reader = stream.getReader();
1551+
(async () => {
1552+
isDisturbed(stream, false);
1553+
await reader.read();
1554+
isDisturbed(stream, true);
1555+
})().then(common.mustCall());
1556+
}
1557+
1558+
{
1559+
const stream = new ReadableStream({
1560+
start(controller) {
1561+
},
1562+
pull: common.mustNotCall(),
1563+
});
1564+
stream.cancel();
1565+
1566+
const reader = stream.getReader();
1567+
(async () => {
1568+
isDisturbed(stream, false);
1569+
await reader.read();
1570+
isDisturbed(stream, true);
1571+
})().then(common.mustCall());
1572+
}

0 commit comments

Comments
 (0)