diff --git a/historyKeeper.js b/historyKeeper.js index 07a555e5f..b84bb8bc3 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -8,6 +8,7 @@ const Crypto = require('crypto'); const Once = require("./lib/once"); const Meta = require("./lib/metadata"); const WriteQueue = require("./lib/write-queue"); +const BatchRead = require("./lib/batch-read"); let Log; const now = function () { return (new Date()).getTime(); }; @@ -231,7 +232,7 @@ module.exports.create = function (cfg) { as an added bonus: if the channel exists but its index does not then it caches the index */ - const indexQueues = {}; + const batchIndexReads = BatchRead(); const getIndex = (ctx, channelName, cb) => { const chan = ctx.channels[channelName]; // if there is a channel in memory and it has an index cached, return it @@ -242,40 +243,14 @@ module.exports.create = function (cfg) { }); } - // if a call to computeIndex is already in progress for this channel - // then add the callback for the latest invocation to the queue - // and wait for it to complete - if (Array.isArray(indexQueues[channelName])) { - indexQueues[channelName].push(cb); - return; - } - - // otherwise, make a queue for any 'getIndex' calls made before the following 'computeIndex' call completes - var queue = indexQueues[channelName] = (indexQueues[channelName] || [cb]); - - computeIndex(channelName, (err, ret) => { - if (!Array.isArray(queue)) { - // something is very wrong if there's no callback array - return void Log.error("E_INDEX_NO_CALLBACK", channelName); - } - - - // clean up the queue that you're about to handle, but keep a local copy - delete indexQueues[channelName]; - - // this is most likely an unrecoverable filesystem error - if (err) { - // call back every pending function with the error - return void queue.forEach(function (_cb) { - _cb(err); - }); - } - // cache the computed result if possible - if (chan) { chan.index = ret; } - - // call back every pending function with the result - queue.forEach(function (_cb) { - _cb(void 0, ret); + batchIndexReads(channelName, cb, function (done) { + computeIndex(channelName, (err, ret) => { + // this is most likely an unrecoverable filesystem error + if (err) { return void done(err); } + // cache the computed result if possible + if (chan) { chan.index = ret; } + // return + done(void 0, ret); }); }); }; diff --git a/lib/batch-read.js b/lib/batch-read.js new file mode 100644 index 000000000..66f106c54 --- /dev/null +++ b/lib/batch-read.js @@ -0,0 +1,58 @@ +/* + +## Purpose + +To avoid running expensive IO or computation concurrently. + +If the result of IO or computation is requested while an identical request +is already in progress, wait until the first one completes and provide its +result to every routine that requested it. + +## Usage + +Provide: + +1. a named key for the computation or resource, +2. a callback to handle the result +3. an implementation which calls back with the result + +``` +var batch = Batch(); + +var read = function (path, cb) { + batch(path, cb, function (done) { + console.log("reading %s", path); + fs.readFile(path, 'utf8', done); + }); +}; + +read('./pewpew.txt', function (err, data) { + if (err) { return void console.error(err); } + console.log(data); +}); + +read('./pewpew.txt', function (err, data) { + if (err) { return void console.error(err); } + console.log(data); +}); +``` + +*/ + +module.exports = function () { + var map = {}; + return function (id, cb, impl) { + if (typeof(cb) !== 'function' || typeof(impl) !== 'function') { + throw new Error("expected callback and implementation"); + } + if (map[id]) { return void map[id].push(cb); } + map[id] = [cb]; + impl(function () { + var args = Array.prototype.slice.call(arguments); + map[id].forEach(function (h) { + h.apply(null, args); + }); + delete map[id]; + }); + }; +};