Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit be3bb61

Browse files
committedMar 16, 2015
Fix bugs in reader releasing algorithm
As illustrated by the new tests, there were previously issues around auto-releasing not always giving the right result for errored or closed streams, especially with regard to allowing future readers to be acquired. This commit simplifies the logic and ensures consistency across all readers obtained from errored or closed streams.
1 parent f6a2011 commit be3bb61

File tree

5 files changed

+155
-47
lines changed

5 files changed

+155
-47
lines changed
 

‎index.bs

+45-24
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,17 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
539539
<tr>
540540
<td>\[[readRequests]]
541541
<td>A List of promises returned by calls to the reader's <code>read()</code> method that have not yet been resolved,
542-
due to the <a>consumer</a> requesting <a>chunks</a> sooner than they are available.
542+
due to the <a>consumer</a> requesting <a>chunks</a> sooner than they are available
543+
</tr>
544+
<tr>
545+
<td>\[[state]]
546+
<td>A string containing the reader's current state, used internally; one of <code>"readable"</code>,
547+
<code>"closed"</code>, or <code>"errored"</code>
548+
</tr>
549+
<tr>
550+
<td>\[[storedError]]
551+
<td>A value indicating how the reader's stream failed, to be given as a failure reason or exception when trying to
552+
operate on the reader; applicable only when \[[state]] is <code>"errored"</code>
543553
</tr>
544554
</table>
545555

@@ -556,12 +566,12 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
556566
<li> If IsReadableStreamLocked(<var>stream</var>) is <b>true</b>, throw a <b>TypeError</b> exception.
557567
<li> Set <var>stream</var>@\[[reader]] to <b>this</b>.
558568
<li> Set <b>this</b>@\[[ownerReadableStream]] to <var>stream</var>.
569+
<li> Set <b>this</b>@\[[state]] to <code>"readable"</code>.
570+
<li> Set <b>this</b>@\[[storedError]] to <b>undefined</b>.
559571
<li> Set <b>this</b>@\[[readRequests]] to a new empty List.
560572
<li> Set <b>this</b>@\[[closedPromise]] to a new promise.
561-
<li> If <var>stream</var>@\[[state]] is <code>"closed"</code>, resolve <b>this</b>@\[[closedPromise]] with
562-
<b>undefined</b>.
563-
<li> If <var>stream</var>@\[[state]] is <code>"errored"</code>, reject <b>this</b>@\[[closedPromise]] with
564-
<var>stream</bar>@\[[storedError]].
573+
<li> If <var>stream</var>@\[[state]] is <code>"closed"</code> or <code>"errored"</code>, call-with-rethrow
574+
ReleaseReadableStreamReader(<b>this</b>).
565575
</ol>
566576

567577
<h4 id="reader-prototype">Properties of the <code>ReadableStreamReader</code> Prototype</h4>
@@ -589,8 +599,11 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
589599
<ol>
590600
<li> If IsReadableStreamReader(<b>this</b>) is <b>false</b>, return a promise rejected with a <b>TypeError</b>
591601
exception.
592-
<li> If <b>this</b>@\[[ownerReadableStream]] is <b>undefined</b>, return a new promise resolved with
593-
<b>undefined</b>.
602+
<li> If <b>this</b>@\[[state]] is <code>"closed"</code>, return a new promise resolved with <b>undefined</b>.
603+
<li> If <b>this</b>@\[[state]] is <code>"errored"</code>, return a new promise rejected with
604+
<b>this</b>@\[[storedError]].
605+
<li> Assert: <b>this</b>@\[[ownerReadableStream]] is not <b>undefined</b>.
606+
<li> Assert: <b>this</b>@\[[ownerReadableStream]]@\[[state]] is <code>"readable"</code>.
594607
<li> Return CancelReadableStream(<b>this</b>@\[[ownerReadableStream]], <var>reason</var>).
595608
</ol>
596609

@@ -614,11 +627,12 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
614627

615628
<ol>
616629
<li> If IsReadableStreamReader(<b>this</b>) is <b>false</b>, throw a <b>TypeError</b> exception.
617-
<li> If <b>this</b>@\[[ownerReadableStream]] is <b>undefined</b> or
618-
<b>this</b>@\[[ownerReadableStream]]@\[[state]] is <code>"closed"</code>, return a new promise resolved with
630+
<li> If <b>this</b>@\[[state]] is <code>"closed"</code>, return a new promise resolved with
619631
CreateIterResultObject(<b>undefined</b>, <b>true</b>).
620-
<li> If <b>this</b>@\[[ownerReadableStream]]@\[[state]] is <code>"errored"</code>, return a new promise
621-
rejected with <b>this</b>@\[[ownerReadableStream]]@\[[storedError]].
632+
<li> If <b>this</b>@\[[state]] is <code>"errored"</code>, return a new promise rejected with
633+
<b>this</b>@\[[storedError]].
634+
<li> Assert: <b>this</b>@\[[ownerReadableStream]] is not <b>undefined</b>.
635+
<li> Assert: <b>this</b>@\[[ownerReadableStream]]@\[[state]] is <code>"readable"</code>.
622636
<li> If <b>this</b>@\[[ownerReadableStream]]@\[[queue]] is not empty,
623637
<ol>
624638
<li> Let <var>chunk</var> be DequeueValue(<b>this</b>@\[[ownerReadableStream]]@\[[queue]]).
@@ -820,16 +834,8 @@ a variable <var>stream</var>, that performs the following steps:
820834
<li> Let <var>stream</var>@\[[queue]] be a new empty List.
821835
<li> Set <var>stream</var>@\[[storedError]] to <var>e</var>.
822836
<li> Set <var>stream</var>@\[[state]] to <code>"errored"</code>.
823-
<li> If IsReadableStreamLocked(<var>stream</var>) is <b>true</b>,
824-
<ol>
825-
<li> Reject <var>stream</var>@\[[reader]]@\[[closedPromise]] with <var>e</var>.
826-
<li> Repeat for each <var>readRequestPromise</var> that is an element of
827-
<var>stream</var>@\[[reader]]@\[[readRequests]],
828-
<ol>
829-
<li> Reject <var>readRequestPromise</var> with <var>e</var>.
830-
</ol>
831-
<li> Set <var>stream</var>@\[[reader]]@\[[readRequests]] to a new empty List.
832-
</ol>
837+
<li> If IsReadableStreamLocked(<var>stream</var>) is <b>true</b>, return
838+
ReleaseReadableStreamReader(<var>stream</var>@\[[reader]]).
833839
</ol>
834840

835841
<h4 id="is-readable-stream">IsReadableStream ( x )</h4>
@@ -865,14 +871,29 @@ a variable <var>stream</var>, that performs the following steps:
865871

866872
<ol>
867873
<li> Assert: <var>reader</var>@\[[ownerReadableStream]] is not <b>undefined</b>.
868-
<li> Repeat for each <var>readRequestPromise</var> that is an element of <var>reader</var>@\[[readRequests]],
874+
<li> If <var>reader</var>@\[[ownerReadableStream]]@\[[state]] is <code>"errored"</code>,
869875
<ol>
870-
<li> Resolve <var>readRequestPromise</var> with CreateIterResultObject(<b>undefined</b>, <b>true</b>).
876+
<li> Set <var>reader</var>@\[[state]] to <code>"errored"</code>.
877+
<li> Let <var>e</var> be <var>reader</var>@\[[ownerReadableStream]]@\[[storedError]].
878+
<li> Set <var>reader</var>@\[[storedError]] to <var>e</var>.
879+
<li> Reject <var>reader</var>@\[[closedPromise]] with <var>e</var>.
880+
<li> Repeat for each <var>readRequestPromise</var> that is an element of <var>reader</var>@\[[readRequests]],
881+
<ol>
882+
<li> Reject <var>readRequestPromise</var> with <var>e</var>.
883+
</ol>
884+
</ol>
885+
<li> Otherwise,
886+
<ol>
887+
<li> Set <var>reader</var>@\[[state]] to <code>"closed"</code>.
888+
<li> Resolve <var>reader</var>@\[[closedPromise]] with <b>undefined</b>.
889+
<li> Repeat for each <var>readRequestPromise</var> that is an element of <var>reader</var>@\[[readRequests]],
890+
<ol>
891+
<li> Resolve <var>readRequestPromise</var> with CreateIterResultObject(<b>undefined</b>, <b>true</b>).
892+
</ol>
871893
</ol>
872894
<li> Set <var>reader</var>@\[[readRequests]] to a new empty List.
873895
<li> Set <var>reader</var>@\[[ownerReadableStream]]@\[[reader]] to <b>undefined</b>.
874896
<li> Set <var>reader</var>@\[[ownerReadableStream]] to <b>undefined</b>.
875-
<li> Resolve <var>reader</var>@\[[closedPromise]] with <b>undefined</b>.
876897
</ol>
877898

878899
<h4 id="should-readable-stream-apply-backpressure">ShouldReadableStreamApplyBackpressure ( stream )</h4>

‎reference-implementation/lib/readable-stream.js

+40-21
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ class ReadableStreamReader {
154154

155155
stream._reader = this;
156156
this._ownerReadableStream = stream;
157+
this._state = 'readable';
158+
this._storedError = undefined;
157159

158160
this._readRequests = [];
159161

@@ -162,12 +164,8 @@ class ReadableStreamReader {
162164
this._closedPromise_reject = reject;
163165
});
164166

165-
if (stream._state === 'closed') {
166-
this._closedPromise_resolve(undefined);
167-
}
168-
169-
if (stream._state === 'errored') {
170-
this._closedPromise_reject(stream._storedError);
167+
if (stream._state === 'closed' || stream._state === 'errored') {
168+
ReleaseReadableStreamReader(this);
171169
}
172170
}
173171

@@ -186,10 +184,17 @@ class ReadableStreamReader {
186184
new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader'));
187185
}
188186

189-
if (this._ownerReadableStream === undefined) {
187+
if (this._state === 'closed') {
190188
return Promise.resolve(undefined);
191189
}
192190

191+
if (this._state === 'errored') {
192+
return Promise.reject(this._storedError);
193+
}
194+
195+
assert(this._ownerReadableStream !== undefined);
196+
assert(this._ownerReadableStream._state === 'readable');
197+
193198
return CancelReadableStream(this._ownerReadableStream, reason);
194199
}
195200

@@ -199,14 +204,17 @@ class ReadableStreamReader {
199204
new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader'));
200205
}
201206

202-
if (this._ownerReadableStream === undefined || this._ownerReadableStream._state === 'closed') {
207+
if (this._state === 'closed') {
203208
return Promise.resolve(CreateIterResultObject(undefined, true));
204209
}
205210

206-
if (this._ownerReadableStream._state === 'errored') {
207-
return Promise.reject(this._ownerReadableStream._storedError);
211+
if (this._state === 'errored') {
212+
return Promise.reject(this._storedError);
208213
}
209214

215+
assert(this._ownerReadableStream !== undefined);
216+
assert(this._ownerReadableStream._state === 'readable');
217+
210218
if (this._ownerReadableStream._queue.length > 0) {
211219
const chunk = DequeueValue(this._ownerReadableStream._queue);
212220

@@ -242,7 +250,7 @@ class ReadableStreamReader {
242250
throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled');
243251
}
244252

245-
ReleaseReadableStreamReader(this);
253+
return ReleaseReadableStreamReader(this);
246254
}
247255
}
248256

@@ -387,12 +395,7 @@ function CreateReadableStreamErrorFunction(stream) {
387395
stream._state = 'errored';
388396

389397
if (IsReadableStreamLocked(stream) === true) {
390-
stream._reader._closedPromise_reject(e);
391-
392-
for (const { _reject } of stream._reader._readRequests) {
393-
_reject(e);
394-
}
395-
stream._reader._readRequests = [];
398+
return ReleaseReadableStreamReader(stream._reader);
396399
}
397400
};
398401
}
@@ -432,14 +435,30 @@ function IsReadableStreamReader(x) {
432435
}
433436

434437
function ReleaseReadableStreamReader(reader) {
435-
for (const { _resolve } of reader._readRequests) {
436-
_resolve(CreateIterResultObject(undefined, true));
438+
assert(reader._ownerReadableStream !== undefined);
439+
440+
if (reader._ownerReadableStream._state === 'errored') {
441+
reader._state = 'errored';
442+
443+
const e = reader._ownerReadableStream._storedError;
444+
reader._storedError = e;
445+
reader._closedPromise_reject(e);
446+
447+
for (const { _reject } of reader._readRequests) {
448+
_reject(e);
449+
}
450+
} else {
451+
reader._state = 'closed';
452+
reader._closedPromise_resolve(undefined);
453+
454+
for (const { _resolve } of reader._readRequests) {
455+
_resolve(CreateIterResultObject(undefined, true));
456+
}
437457
}
438-
reader._readRequests = [];
439458

459+
reader._readRequests = [];
440460
reader._ownerReadableStream._reader = undefined;
441461
reader._ownerReadableStream = undefined;
442-
reader._closedPromise_resolve(undefined);
443462
}
444463

445464
function ShouldReadableStreamApplyBackpressure(stream) {

‎reference-implementation/test/readable-stream-reader.js

+34
Original file line numberDiff line numberDiff line change
@@ -198,3 +198,37 @@ test('cancel() on a released reader is a no-op and does not pass through', t =>
198198

199199
setTimeout(() => t.end(), 50);
200200
});
201+
202+
test('Getting a second reader after erroring the stream should succeed', t => {
203+
t.plan(5);
204+
205+
let doError;
206+
const theError = new Error('bad');
207+
const rs = new ReadableStream({
208+
start(enqueue, close, error) {
209+
doError = error;
210+
}
211+
});
212+
213+
const reader1 = rs.getReader();
214+
215+
reader1.closed.catch(e => {
216+
t.equal(e, theError, 'the first reader closed getter should be rejected with the error');
217+
});
218+
219+
reader1.read().catch(e => {
220+
t.equal(e, theError, 'the first reader read() should be rejected with the error');
221+
});
222+
223+
t.throws(() => rs.getReader(), /TypeError/, 'trying to get another reader before erroring should throw');
224+
225+
doError(theError);
226+
227+
rs.getReader().closed.catch(e => {
228+
t.equal(e, theError, 'the second reader closed getter should be rejected with the error');
229+
});
230+
231+
rs.getReader().read().catch(e => {
232+
t.equal(e, theError, 'the third reader read() should be rejected with the error');
233+
});
234+
});

‎reference-implementation/test/templated/readable-stream-closed.js

+13-2
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,21 @@ export default (label, factory) => {
4747
startPromise.then(() => {
4848
t.equal(ws.state, 'writable', 'writable stream should start in writable state');
4949

50-
rs.pipeTo(ws).then(() => {
50+
return rs.pipeTo(ws).then(() => {
5151
t.pass('pipeTo promise should be fulfilled');
5252
t.equal(ws.state, 'closed', 'writable stream should become closed');
5353
});
54-
});
54+
})
55+
.catch(e => t.error(e));
56+
});
57+
58+
test('should be able to acquire multiple readers, since they are all auto-released', t => {
59+
const rs = factory();
60+
61+
rs.getReader();
62+
63+
t.doesNotThrow(() => rs.getReader(), 'getting a second reader should not throw');
64+
t.doesNotThrow(() => rs.getReader(), 'getting a third reader should not throw');
65+
t.end();
5566
});
5667
};

‎reference-implementation/test/templated/readable-stream-errored-sync-only.js

+23
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,27 @@ export default (label, factory, error) => {
1616
cancelPromise2.catch(e => t.equal(e, error, 'second cancel() call should reject with the error'));
1717
t.notEqual(cancelPromise1, cancelPromise2, 'cancel() calls should return distinct promises');
1818
});
19+
20+
test('reader cancel() should return a distinct rejected promise each time', t => {
21+
t.plan(3);
22+
const rs = factory();
23+
const reader = rs.getReader();
24+
25+
const cancelPromise1 = reader.cancel();
26+
const cancelPromise2 = reader.cancel();
27+
28+
cancelPromise1.catch(e => t.equal(e, error, 'first cancel() call should reject with the error'));
29+
cancelPromise2.catch(e => t.equal(e, error, 'second cancel() call should reject with the error'));
30+
t.notEqual(cancelPromise1, cancelPromise2, 'cancel() calls should return distinct promises');
31+
});
32+
33+
test('should be able to acquire multiple readers, since they are all auto-released', t => {
34+
const rs = factory();
35+
36+
rs.getReader();
37+
38+
t.doesNotThrow(() => rs.getReader(), 'getting a second reader should not throw');
39+
t.doesNotThrow(() => rs.getReader(), 'getting a third reader should not throw');
40+
t.end();
41+
});
1942
};

0 commit comments

Comments
 (0)
Please sign in to comment.