|
3 | 3 | const common = require('../common');
|
4 | 4 | const assert = require('assert');
|
5 | 5 | const { Duplex, Readable, Writable, pipeline } = require('stream');
|
| 6 | +const { Blob } = require('buffer'); |
6 | 7 |
|
7 | 8 | {
|
8 | 9 | const d = Duplex.from({
|
@@ -144,3 +145,136 @@ const { Duplex, Readable, Writable, pipeline } = require('stream');
|
144 | 145 | common.mustCall(() => {}),
|
145 | 146 | );
|
146 | 147 | }
|
| 148 | + |
| 149 | +// Ensure that isDuplexNodeStream was called |
| 150 | +{ |
| 151 | + const duplex = new Duplex(); |
| 152 | + assert.strictEqual(Duplex.from(duplex), duplex); |
| 153 | +} |
| 154 | + |
| 155 | +// Ensure that Duplex.from works for blobs |
| 156 | +{ |
| 157 | + const blob = new Blob(['blob']); |
| 158 | + const expecteByteLength = blob.size; |
| 159 | + const duplex = Duplex.from(blob); |
| 160 | + duplex.on('data', common.mustCall((arrayBuffer) => { |
| 161 | + assert.strictEqual(arrayBuffer.byteLength, expecteByteLength); |
| 162 | + })); |
| 163 | +} |
| 164 | + |
| 165 | +// Ensure that given a promise rejection it emits an error |
| 166 | +{ |
| 167 | + const myErrorMessage = 'myCustomError'; |
| 168 | + Duplex.from(Promise.reject(myErrorMessage)) |
| 169 | + .on('error', common.mustCall((error) => { |
| 170 | + assert.strictEqual(error, myErrorMessage); |
| 171 | + })); |
| 172 | +} |
| 173 | + |
| 174 | +// Ensure that given a promise rejection on an async function it emits an error |
| 175 | +{ |
| 176 | + const myErrorMessage = 'myCustomError'; |
| 177 | + async function asyncFn() { |
| 178 | + return Promise.reject(myErrorMessage); |
| 179 | + } |
| 180 | + |
| 181 | + Duplex.from(asyncFn) |
| 182 | + .on('error', common.mustCall((error) => { |
| 183 | + assert.strictEqual(error, myErrorMessage); |
| 184 | + })); |
| 185 | +} |
| 186 | + |
| 187 | +// Ensure that Duplex.from throws an Invalid return value when function is void |
| 188 | +{ |
| 189 | + assert.throws(() => Duplex.from(() => {}), { |
| 190 | + code: 'ERR_INVALID_RETURN_VALUE', |
| 191 | + }); |
| 192 | +} |
| 193 | + |
| 194 | +// Ensure data if a sub object has a readable stream it's duplexified |
| 195 | +{ |
| 196 | + const msg = Buffer.from('hello'); |
| 197 | + const duplex = Duplex.from({ |
| 198 | + readable: Readable({ |
| 199 | + read() { |
| 200 | + this.push(msg); |
| 201 | + this.push(null); |
| 202 | + } |
| 203 | + }) |
| 204 | + }).on('data', common.mustCall((data) => { |
| 205 | + assert.strictEqual(data, msg); |
| 206 | + })); |
| 207 | + |
| 208 | + assert.strictEqual(duplex.writable, false); |
| 209 | +} |
| 210 | + |
| 211 | +// Ensure data if a sub object has a writable stream it's duplexified |
| 212 | +{ |
| 213 | + const msg = Buffer.from('hello'); |
| 214 | + const duplex = Duplex.from({ |
| 215 | + writable: Writable({ |
| 216 | + write: common.mustCall((data) => { |
| 217 | + assert.strictEqual(data, msg); |
| 218 | + }) |
| 219 | + }) |
| 220 | + }); |
| 221 | + |
| 222 | + duplex.write(msg); |
| 223 | + assert.strictEqual(duplex.readable, false); |
| 224 | +} |
| 225 | + |
| 226 | +// Ensure data if a sub object has a writable and readable stream it's duplexified |
| 227 | +{ |
| 228 | + const msg = Buffer.from('hello'); |
| 229 | + |
| 230 | + const duplex = Duplex.from({ |
| 231 | + readable: Readable({ |
| 232 | + read() { |
| 233 | + this.push(msg); |
| 234 | + this.push(null); |
| 235 | + } |
| 236 | + }), |
| 237 | + writable: Writable({ |
| 238 | + write: common.mustCall((data) => { |
| 239 | + assert.strictEqual(data, msg); |
| 240 | + }) |
| 241 | + }) |
| 242 | + }); |
| 243 | + |
| 244 | + duplex.pipe(duplex) |
| 245 | + .on('data', common.mustCall((data) => { |
| 246 | + assert.strictEqual(data, msg); |
| 247 | + assert.strictEqual(duplex.readable, true); |
| 248 | + assert.strictEqual(duplex.writable, true); |
| 249 | + })) |
| 250 | + .on('end', common.mustCall()); |
| 251 | +} |
| 252 | + |
| 253 | +// Ensure that given readable stream that throws an error it calls destroy |
| 254 | +{ |
| 255 | + const myErrorMessage = 'error!'; |
| 256 | + const duplex = Duplex.from(Readable({ |
| 257 | + read() { |
| 258 | + throw new Error(myErrorMessage); |
| 259 | + } |
| 260 | + })); |
| 261 | + duplex.on('error', common.mustCall((msg) => { |
| 262 | + assert.strictEqual(msg.message, myErrorMessage); |
| 263 | + })); |
| 264 | +} |
| 265 | + |
| 266 | +// Ensure that given writable stream that throws an error it calls destroy |
| 267 | +{ |
| 268 | + const myErrorMessage = 'error!'; |
| 269 | + const duplex = Duplex.from(Writable({ |
| 270 | + write(chunk, enc, cb) { |
| 271 | + cb(myErrorMessage); |
| 272 | + } |
| 273 | + })); |
| 274 | + |
| 275 | + duplex.on('error', common.mustCall((msg) => { |
| 276 | + assert.strictEqual(msg, myErrorMessage); |
| 277 | + })); |
| 278 | + |
| 279 | + duplex.write('test'); |
| 280 | +} |
0 commit comments