Skip to content

Commit f2ffaba

Browse files
helloyou2012targos
authored andcommitted
stream: the position of _read() is wrong
Fixes: #33940 PR-URL: #38292 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent 16eb078 commit f2ffaba

File tree

2 files changed

+73
-4
lines changed

2 files changed

+73
-4
lines changed

lib/internal/fs/streams.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,10 @@ ReadStream.prototype._read = function(n) {
255255
if (er) {
256256
errorOrDestroy(this, er);
257257
} else if (bytesRead > 0) {
258+
if (this.pos !== undefined) {
259+
this.pos += bytesRead;
260+
}
261+
258262
this.bytesRead += bytesRead;
259263

260264
if (bytesRead !== buf.length) {
@@ -271,10 +275,6 @@ ReadStream.prototype._read = function(n) {
271275
this.push(null);
272276
}
273277
});
274-
275-
if (this.pos !== undefined) {
276-
this.pos += n;
277-
}
278278
};
279279

280280
ReadStream.prototype._destroy = function(err, cb) {
+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
'use strict';
2+
3+
// Refs: https://github.com/nodejs/node/issues/33940
4+
5+
const common = require('../common');
6+
const tmpdir = require('../common/tmpdir');
7+
const fs = require('fs');
8+
const assert = require('assert');
9+
const path = require('path');
10+
11+
tmpdir.refresh();
12+
13+
const file = path.join(tmpdir.path, '/read_stream_pos_test.txt');
14+
15+
fs.writeFileSync(file, '');
16+
17+
let counter = 0;
18+
19+
setInterval(() => {
20+
counter = counter + 1;
21+
const line = `hello at ${counter}\n`;
22+
fs.writeFileSync(file, line, { flag: 'a' });
23+
}, 1);
24+
25+
const hwm = 10;
26+
let bufs = [];
27+
let isLow = false;
28+
let cur = 0;
29+
let stream;
30+
31+
setInterval(() => {
32+
if (stream) return;
33+
34+
stream = fs.createReadStream(file, {
35+
highWaterMark: hwm,
36+
start: cur
37+
});
38+
stream.on('data', common.mustCallAtLeast((chunk) => {
39+
cur += chunk.length;
40+
bufs.push(chunk);
41+
if (isLow) {
42+
const brokenLines = Buffer.concat(bufs).toString()
43+
.split('\n')
44+
.filter((line) => {
45+
const s = 'hello at'.slice(0, line.length);
46+
if (line && !line.startsWith(s)) {
47+
return true;
48+
}
49+
return false;
50+
});
51+
assert.strictEqual(brokenLines.length, 0);
52+
process.exit();
53+
return;
54+
}
55+
if (chunk.length !== hwm) {
56+
isLow = true;
57+
}
58+
}));
59+
stream.on('end', () => {
60+
stream = null;
61+
isLow = false;
62+
bufs = [];
63+
});
64+
}, 10);
65+
66+
// Time longer than 90 seconds to exit safely
67+
setTimeout(() => {
68+
process.exit();
69+
}, 90000);

0 commit comments

Comments
 (0)