1
- // Copyright Joyent, Inc. and other Node contributors.
2
- //
3
- // Permission is hereby granted, free of charge, to any person obtaining a
4
- // copy of this software and associated documentation files (the
5
- // "Software"), to deal in the Software without restriction, including
6
- // without limitation the rights to use, copy, modify, merge, publish,
7
- // distribute, sublicense, and/or sell copies of the Software, and to permit
8
- // persons to whom the Software is furnished to do so, subject to the
9
- // following conditions:
10
- //
11
- // The above copyright notice and this permission notice shall be included
12
- // in all copies or substantial portions of the Software.
13
- //
14
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
- // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
- // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
- // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
- // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
- // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
- // USE OR OTHER DEALINGS IN THE SOFTWARE.
1
+ 'use strict' ;
21
2
22
3
module . exports = Readable ;
23
4
@@ -67,10 +48,17 @@ function ReadableState(options, stream) {
67
48
68
49
options = options || { } ;
69
50
51
+ // object stream flag. Used to make read(n) ignore n and to
52
+ // make all the buffer merging and length checks go away
53
+ this . objectMode = ! ! options . objectMode ;
54
+
55
+ if ( stream instanceof Duplex )
56
+ this . objectMode = this . objectMode || ! ! options . readableObjectMode ;
57
+
70
58
// the point at which it stops calling _read() to fill the buffer
71
59
// Note: 0 is a valid value, means "don't call _read preemptively ever"
72
60
var hwm = options . highWaterMark ;
73
- var defaultHwm = options . objectMode ? 16 : 16 * 1024 ;
61
+ var defaultHwm = this . objectMode ? 16 : 16 * 1024 ;
74
62
this . highWaterMark = ( hwm || hwm === 0 ) ? hwm : defaultHwm ;
75
63
76
64
// cast to ints.
@@ -97,14 +85,6 @@ function ReadableState(options, stream) {
97
85
this . emittedReadable = false ;
98
86
this . readableListening = false ;
99
87
100
-
101
- // object stream flag. Used to make read(n) ignore n and to
102
- // make all the buffer merging and length checks go away
103
- this . objectMode = ! ! options . objectMode ;
104
-
105
- if ( stream instanceof Duplex )
106
- this . objectMode = this . objectMode || ! ! options . readableObjectMode ;
107
-
108
88
// Crypto is kind of old and crusty. Historically, its default string
109
89
// encoding is 'binary' so we have to make this configurable.
110
90
// Everything else in the universe uses 'utf8', though.
@@ -168,11 +148,15 @@ Readable.prototype.unshift = function(chunk) {
168
148
return readableAddChunk ( this , state , chunk , '' , true ) ;
169
149
} ;
170
150
151
+ Readable . prototype . isPaused = function ( ) {
152
+ return this . _readableState . flowing === false ;
153
+ } ;
154
+
171
155
function readableAddChunk ( stream , state , chunk , encoding , addToFront ) {
172
156
var er = chunkInvalid ( state , chunk ) ;
173
157
if ( er ) {
174
158
stream . emit ( 'error' , er ) ;
175
- } else if ( util . isNullOrUndefined ( chunk ) ) {
159
+ } else if ( chunk === null ) {
176
160
state . reading = false ;
177
161
if ( ! state . ended )
178
162
onEofChunk ( stream , state ) ;
@@ -261,7 +245,7 @@ function howMuchToRead(n, state) {
261
245
if ( state . objectMode )
262
246
return n === 0 ? 0 : 1 ;
263
247
264
- if ( isNaN ( n ) || util . isNull ( n ) ) {
248
+ if ( util . isNull ( n ) || isNaN ( n ) ) {
265
249
// only flow one buffer at a time
266
250
if ( state . flowing && state . buffer . length )
267
251
return state . buffer [ 0 ] . length ;
@@ -737,10 +721,6 @@ Readable.prototype.resume = function() {
737
721
if ( ! state . flowing ) {
738
722
debug ( 'resume' ) ;
739
723
state . flowing = true ;
740
- if ( ! state . reading ) {
741
- debug ( 'resume read 0' ) ;
742
- this . read ( 0 ) ;
743
- }
744
724
resume ( this , state ) ;
745
725
}
746
726
return this ;
@@ -756,6 +736,11 @@ function resume(stream, state) {
756
736
}
757
737
758
738
function resume_ ( stream , state ) {
739
+ if ( ! state . reading ) {
740
+ debug ( 'resume read 0' ) ;
741
+ stream . read ( 0 ) ;
742
+ }
743
+
759
744
state . resumeScheduled = false ;
760
745
stream . emit ( 'resume' ) ;
761
746
flow ( stream ) ;
@@ -806,7 +791,12 @@ Readable.prototype.wrap = function(stream) {
806
791
debug ( 'wrapped data' ) ;
807
792
if ( state . decoder )
808
793
chunk = state . decoder . write ( chunk ) ;
809
- if ( ! chunk || ! state . objectMode && ! chunk . length )
794
+
795
+ // don't skip over falsy values in objectMode
796
+ //if (state.objectMode && util.isNullOrUndefined(chunk))
797
+ if ( state . objectMode && ( chunk === null || chunk === undefined ) )
798
+ return ;
799
+ else if ( ! state . objectMode && ( ! chunk || ! chunk . length ) )
810
800
return ;
811
801
812
802
var ret = self . push ( chunk ) ;
0 commit comments