@@ -497,6 +497,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
497
497
var ondrain = pipeOnDrain ( src ) ;
498
498
dest . on ( 'drain' , ondrain ) ;
499
499
500
+ var cleanedUp = false ;
500
501
function cleanup ( ) {
501
502
debug ( 'cleanup' ) ;
502
503
// cleanup event handlers once the pipe is broken
@@ -509,6 +510,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
509
510
src . removeListener ( 'end' , cleanup ) ;
510
511
src . removeListener ( 'data' , ondata ) ;
511
512
513
+ cleanedUp = true ;
514
+
512
515
// if the reader is waiting for a drain event from this
513
516
// specific writer, then it would cause it to never start
514
517
// flowing again.
@@ -524,9 +527,16 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
524
527
debug ( 'ondata' ) ;
525
528
var ret = dest . write ( chunk ) ;
526
529
if ( false === ret ) {
527
- debug ( 'false write response, pause' ,
528
- src . _readableState . awaitDrain ) ;
529
- src . _readableState . awaitDrain ++ ;
530
+ // If the user unpiped during `dest.write()`, it is possible
531
+ // to get stuck in a permanently paused state if that write
532
+ // also returned false.
533
+ if ( state . pipesCount === 1 &&
534
+ state . pipes [ 0 ] === dest &&
535
+ src . listenerCount ( 'data' ) === 1 &&
536
+ ! cleanedUp ) {
537
+ debug ( 'false write response, pause' , src . _readableState . awaitDrain ) ;
538
+ src . _readableState . awaitDrain ++ ;
539
+ }
530
540
src . pause ( ) ;
531
541
}
532
542
}
0 commit comments