-
Notifications
You must be signed in to change notification settings - Fork 1
Tutorial
- Basic usage
- Parallel calls
- Parallel forEach and map
- Concurrency limit
- Exception handling
- Extending prototypes
- Write your own async function
Just wrap the asynchronous function you are going to use with flow.wrap
, and use it in a flow block like shown below.
var flow = require("asyncflow");
var fs = require("fs");
// Wrap the callback-style fs.exists so calling it returns a waitable handle.
var exists = flow.wrap(fs.exists);
flow(function() {
// wait() suspends only this flow block; the Node.js event loop keeps running.
if (exists("myfile.txt").wait()) {
// continue here...
}
});
Within the flow block, everything runs normally except when you call the wrapped functions. When you call a wrapped function, it will execute asynchronously and return a handle with which you can call wait
, to "actually" wait for the asynchronous function to finish and return. However, notice that during the wait, the Node.js event loop is not blocked, and other code in the event loop may actually run while we wait here.
You can also wrap an entire module at once, but bear in mind that this facility is just for convenience, not all functions in a module can/should be used with asyncflow.
var flow = require("asyncflow");
var fs = require("fs");
// Wrap the whole fs module at once; each callback-style method on the
// returned object becomes a waitable wrapped function.
var fsw = flow.wrap(fs);
flow(function() {
if (fsw.exists("myfile.txt").wait()) {
var content = fsw.readFile("myfile.txt").wait();
// continue here...
}
});
var flow = require("asyncflow");
var fs = require("fs");
var readdir = flow.wrap(fs.readdir);
flow(function() {
// Both readdir calls start immediately and run concurrently; wait() is
// only invoked after both are already in flight.
var dirA = readdir("dir_a");
var dirB = readdir("dir_b");
var files = dirA.wait().concat(dirB.wait());
// continue here...
});
Here the two readdir
s started running asynchronously when they were called, and they are waited for when their respective handle's wait
is called. In short, parallelism*. :)
* Just to be technically correct, this should really be called concurrency, not parallelism. Because Node.js is single-threaded, it's impossible to have two pieces of code running at the same time.
You can also use flow.forEach
and flow.map
. They are intended to be used outside of a flow block.
var flow = require("asyncflow");
var fs = require("fs");
// The iterator receives the usual (element, index, array) arguments plus a
// trailing callback, mirroring Array.prototype.map's callback signature.
var stat = flow.wrap(function(file, i, a, cb) {
fs.stat(file, cb);
});
var files = ["file_a.txt", "file_b.txt", "file_c.txt"];
// flow.map runs the wrapped iterator over every file (outside a flow block)
// and delivers the collected results to the final callback.
flow.map(files, stat, function(err, stats) {
// use stats here...
});
If you want to use forEach and map inside of a flow block, you can wrap them first with flow.wrap
just as with other async functions, or you can use the already wrapped flow.wrapped.forEach
and flow.wrapped.map
.
var flow = require("asyncflow");
var fs = require("fs");
var stat = flow.wrap(function(file, i, a, cb) {
fs.stat(file, cb);
});
flow(function() {
var files = ["file_a.txt", "file_b.txt", "file_c.txt"];
// flow.wrapped.map is the pre-wrapped map, so it can be wait()ed on
// directly inside a flow block.
var stats = flow.wrapped.map(files, stat).wait();
// use stats here...
});
Notice that you are forced to specify full arguments in function(file, i, a, cb). This is because functions like Array.forEach
and Array.map
provide 3 arguments by default, and we have to add another callback as the last argument of the function, so it can be asynchronous. However, it's cumbersome to write these arguments, so we provided you a special version of flow.wrap
called flow.cwrap
, "cwrap" standing for "custom wrap". With it, you can control how many arguments are passed to the function being wrapped. For example, we can simplify the above source code with flow.cwrap
, like this:
var flow = require("asyncflow");
var fs = require("fs");
flow(function() {
var files = ["file_a.txt", "file_b.txt", "file_c.txt"];
// flow.cwrap(fs.stat) forwards only the first iterator argument (the file
// name) to fs.stat, so no adapter function around fs.stat is needed.
var stats = flow.wrapped.map(files, flow.cwrap(fs.stat)).wait();
// use stats here...
});
This is possible because flow.cwrap
's second argument n
, defaults to 1, meaning that only the first argument (file) is passed to fs.stat
. Notice that we also eliminated the need to add wrapper function around fs.stat
just to accommodate the number of arguments.
Did you know that both parallel calls and parallel forEach/maps have concurrency limit support built-in?
Why we need to limit concurrency you ask? Well, certain system resources will exhaust if we don't limit concurrency, for example, the number of open files, sockets, etc. Consider the following code:
var flow = require("asyncflow");
var fs = require("fs");
var readFile = flow.wrap(fs.readFile);
// Async function computing one file's length: a nested flow block lets us
// wait() on readFile synchronously, then report the result through cb.
var fileLength = flow.wrap(function(file, i, a, cb) {
flow(function() {
cb(null, readFile(file).wait().length);
});
});
flow(function() {
var allDiskFiles = getAllDiskFiles();
// WARNING: no concurrency limit here -- every file is opened at once,
// which can exhaust the OS open-file-handle limit (see text below).
var totalSize = flow.wrapped.map(allDiskFiles, fileLength).wait().
reduce(function(x, y) { return x + y });
// use totalSize here...
});
The problem with the above code is that there are too many files opening at the same time, because all files are being read at once (even though asynchronously). This will, at some point, exhaust the file system's open file handles, and when that happens, the file system will interrupt you with an exception and you won't be able to open more files.
By applying a concurrency limit, we can mitigate this resource exhaustion problem, like this:
var flow = require("asyncflow");
var fs = require("fs");
var readFile = flow.wrap(fs.readFile);
// Same per-file length helper as before; the fix is in the map call below.
var fileLength = flow.wrap(function(file, i, a, cb) {
flow(function() {
cb(null, readFile(file).wait().length);
});
});
flow(function() {
var allDiskFiles = getAllDiskFiles();
// The extra numeric argument (100) caps the number of concurrent
// fileLength calls, avoiding open-file-handle exhaustion.
var totalSize = flow.wrapped.map(allDiskFiles, 100, fileLength).wait().
reduce(function(x, y) { return x + y });
// use totalSize here...
});
Notice that all we needed to do is to put a number in flow.wrapped.map
to indicate an upper limit of concurrent calls.
ALERT: Don't write code like the above one in real world, this implementation is very resource inefficient.
Flow blocks can have an upper limit too, like this:
var flow = require("asyncflow");
var fs = require("fs");
var readFile = flow.wrap(fs.readFile);
// The numeric first argument caps this flow block at 2 concurrent wrapped
// calls; the third readFile waits for one of the first two to finish.
flow(2, function() {
var file1 = readFile("file_a.txt");
var file2 = readFile("file_b.txt");
var file3 = readFile("file_c.txt");
// use file1, file2 and file3 here...
});
So within this flow block, only a max of 2 readFile
s are running at any given time.
Exceptions within a flow block can be caught as usual.
var flow = require("asyncflow");
var fs = require("fs");
var readFile = flow.wrap(fs.readFile);
flow(function() {
try {
// wait() re-throws the wrapped function's async error at this point,
// so an ordinary try/catch works inside the flow block.
var content = readFile("non-existent").wait();
} catch (e) {
console.error(e);
}
});
An exception not caught within a flow block is thrown back to the calling context, but because a flow block is an asynchronous call, these exceptions are actually uncaught exceptions, hence dangerous. You may try to use Node.js' built-in domain
module to contain them.
For example, the following code won't work:
var flow = require("asyncflow");
var fs = require("fs");
var readFile = flow.wrap(fs.readFile);
// ATTENTION, THIS WILL NOT WORK.
// flow() returns immediately and its body runs asynchronously, so this
// try/catch has already exited by the time the exception is thrown.
try {
flow(function() {
var content = readFile("NON-EXISTENT").wait();
});
} catch (e) {
console.error(e);
}
// ATTENTION, ABOVE CODE WILL NOT WORK.
The rule of thumb is that exceptions should not be thrown in any async call. This applies to Node.js in general as well as to asyncflow.
So far, we have avoided extending built-in objects' prototype to achieve a more object oriented feel. But if you think that extending built-in objects' prototypes is not a bad idea for your project, you may try to take advantage of flow.extension
members.
var flow = require("asyncflow");
var fs = require("fs");
// Opt-in prototype extensions: fn.wrap() as sugar for flow.wrap(fn), and
// array.pmap(...) as sugar for flow.wrapped.map(array, ...).
// NOTE(review): extending built-in prototypes is generally discouraged;
// the surrounding text presents this as an explicit trade-off.
Function.prototype.wrap = flow.extension.wrap;
Array.prototype.pmap = flow.extension.collection("map");
var readFile = fs.readFile.wrap();
flow(function() {
var allDiskFiles = getAllDiskFiles();
var totalSize = allDiskFiles.pmap(function(file, i, a, cb) {
flow(function() {
cb(null, readFile(file).wait().length);
});
}.wrap()).wait().reduce(function(x, y) { return x + y });
// use totalSize here...
});
Refer to the API for more extension facilities.
Remember that a flow block is an asynchronous call, but everything within it is synchronous (by using wrapped versions of functions). So you can use a flow block to help you implement an asynchronous function in a synchronous way.
var flow = require("asyncflow");
var fs = require("fs");
var writeFile = flow.wrap(fs.writeFile);
// A conventional Node.js-style async function implemented synchronously
// inside a flow block: success -> callback(null), failure -> callback(e).
function writeRecord(record, file, callback) {
flow(function() {
try {
writeFile(file, record).wait();
callback(null);
} catch (e) {
callback(e);
}
});
}
var someRecord = getSomeRecord();
writeRecord(someRecord, "some_file", function(err) {
if (!err) console.log("Write OK");
});
And this writeRecord
async function we just wrote, is also wrappable for use in another flow block.
// Wrap our own writeRecord(record, file, callback) so it composes with
// other flow blocks just like any library async function.
var writeRecord$ = flow.wrap(writeRecord);
flow(function() {
var someRecord = getSomeRecord();
try {
// Fix: writeRecord takes (record, file, callback); the wrapped version
// must be called with (record, file) -- the original example called it
// with no arguments, so record and file would both be undefined.
writeRecord$(someRecord, "some_file").wait();
} catch (e) {
console.error(e);
}
});