implement shared environment between historyKeeper and RPC

This commit is contained in:
ansuz 2020-02-17 09:01:10 -05:00
parent 1fc8c1de16
commit f86196e40a
4 changed files with 114 additions and 122 deletions

View File

@ -23,7 +23,7 @@ Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
if (e) { return void cb(e); }
cb();
const channel_cache = Env.historyKeeper.channel_cache;
const channel_cache = Env.channel_cache;
const clear = function () {
// delete the channel cache because it will have been invalidated
@ -117,8 +117,8 @@ Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
}
cb(void 0, 'OK');
const channel_cache = Env.historyKeeper.channel_cache;
const metadata_cache = Env.historyKeeper.metadata_cache;
const channel_cache = Env.channel_cache;
const metadata_cache = Env.metadata_cache;
const clear = function () {
delete channel_cache[channelId];
@ -187,8 +187,8 @@ Channel.trimHistory = function (Env, safeKey, data, cb) {
// clear historyKeeper's cache for this channel
Env.historyKeeper.channelClose(channelId);
cb(void 0, 'OK');
delete Env.historyKeeper.channel_cache[channelId];
delete Env.historyKeeper.metadata_cache[channelId];
delete Env.channel_cache[channelId];
delete Env.metadata_cache[channelId];
});
});
};

View File

@ -111,8 +111,8 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
cb(void 0, metadata);
next();
const metadata_cache = Env.historyKeeper.metadata_cache;
const channel_cache = Env.historyKeeper.channel_cache;
const metadata_cache = Env.metadata_cache;
const channel_cache = Env.channel_cache;
metadata_cache[channel] = metadata;

View File

@ -7,8 +7,20 @@ const BatchRead = require("./batch-read");
const RPC = require("./rpc");
const HK = require("./hk-util.js");
const Store = require("./storage/file");
const BlobStore = require("./storage/blob");
module.exports.create = function (config, cb) {
const Log = config.log;
var WARN = function (e, output) {
if (e && output) {
Log.warn(e, {
output: output,
message: String(e),
stack: new Error(e).stack,
});
}
};
Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE');
@ -25,9 +37,63 @@ module.exports.create = function (config, cb) {
channel_cache: {},
queueStorage: WriteQueue(),
batchIndexReads: BatchRead("HK_GET_INDEX"),
//historyKeeper: config.historyKeeper,
intervals: config.intervals || {},
maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024),
Sessions: {},
paths: {},
//msgStore: config.store,
pinStore: undefined,
pinnedPads: {},
pinsLoaded: false,
pendingPinInquiries: {},
pendingUnpins: {},
pinWorkers: 5,
limits: {},
admins: [],
Log: Log,
WARN: WARN,
flushCache: config.flushCache,
adminEmail: config.adminEmail,
allowSubscriptions: config.allowSubscriptions,
myDomain: config.myDomain,
mySubdomain: config.mySubdomain,
customLimits: config.customLimits,
// FIXME this attribute isn't in the default conf
// but it is referenced in Quota
domain: config.domain
};
config.historyKeeper = {
var paths = Env.paths;
var keyOrDefaultString = function (key, def) {
return typeof(config[key]) === 'string'? config[key]: def;
};
var pinPath = paths.pin = keyOrDefaultString('pinPath', './pins');
paths.block = keyOrDefaultString('blockPath', './block');
paths.data = keyOrDefaultString('filePath', './datastore');
paths.staging = keyOrDefaultString('blobStagingPath', './blobstage');
paths.blob = keyOrDefaultString('blobPath', './blob');
Env.defaultStorageLimit = typeof(config.defaultStorageLimit) === 'number' && config.defaultStorageLimit > 0?
config.defaultStorageLimit:
Core.DEFAULT_LIMIT;
try {
Env.admins = (config.adminKeys || []).map(function (k) {
k = k.replace(/\/+$/, '');
var s = k.split('/');
return s[s.length-1];
});
} catch (e) {
console.error("Can't parse admin keys. Please update or fix your config.js file!");
}
config.historyKeeper = Env.historyKeeper = {
metadata_cache: Env.metadata_cache,
channel_cache: Env.channel_cache,
@ -63,11 +129,34 @@ module.exports.create = function (config, cb) {
Log.verbose('HK_ID', 'History keeper ID: ' + Env.id);
nThen(function (w) {
require('./storage/file').create(config, w(function (_store) {
// create a pin store
Store.create({
filePath: pinPath,
}, w(function (s) {
Env.pinStore = s;
}));
// create a channel store
Store.create(config, w(function (_store) {
config.store = _store;
Env.store = _store;
Env.msgStore = _store; // API used by rpc
Env.store = _store; // API used by historyKeeper
}));
// create a blob store
BlobStore.create({
blobPath: config.blobPath,
blobStagingPath: config.blobStagingPath,
archivePath: config.archivePath,
getSession: function (safeKey) {
return Core.getSession(Sessions, safeKey);
},
}, w(function (err, blob) {
if (err) { throw new Error(err); }
Env.blobStore = blob;
}));
}).nThen(function (w) {
// create a task store
require("./storage/tasks").create(config, w(function (e, tasks) {
if (e) {
throw e;
@ -87,7 +176,7 @@ module.exports.create = function (config, cb) {
}, 1000 * 60 * 5); // run every five minutes
}));
}).nThen(function () {
RPC.create(config, function (err, _rpc) {
RPC.create(Env, function (err, _rpc) {
if (err) { throw err; }
Env.rpc = _rpc;

View File

@ -1,6 +1,4 @@
/*jshint esversion: 6 */
const nThen = require("nthen");
const Util = require("./common-util");
const Core = require("./commands/core");
@ -14,9 +12,6 @@ const Upload = require("./commands/upload");
var RPC = module.exports;
const Store = require("./storage/file");
const BlobStore = require("./storage/blob");
const UNAUTHENTICATED_CALLS = {
GET_FILE_SIZE: Pinning.getFileSize,
GET_MULTIPLE_FILE_SIZE: Pinning.getMultipleFileSize,
@ -187,86 +182,12 @@ var rpc = function (Env, Server, data, respond) {
return void respond("INVALID_RPC_CALL");
};
RPC.create = function (config, cb) {
var Log = config.log;
// load pin-store...
Log.silly('LOADING RPC MODULE');
var keyOrDefaultString = function (key, def) {
return typeof(config[key]) === 'string'? config[key]: def;
};
var WARN = function (e, output) {
if (e && output) {
Log.warn(e, {
output: output,
message: String(e),
stack: new Error(e).stack,
});
}
};
if (typeof(config.domain) !== 'undefined') {
throw new Error('fuck');
}
var Env = {
historyKeeper: config.historyKeeper,
intervals: config.intervals || {},
maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024),
Sessions: {},
paths: {},
msgStore: config.store,
pinStore: undefined,
pinnedPads: {},
pinsLoaded: false,
pendingPinInquiries: {},
pendingUnpins: {},
pinWorkers: 5,
limits: {},
admins: [],
Log: Log,
WARN: WARN,
flushCache: config.flushCache,
adminEmail: config.adminEmail,
allowSubscriptions: config.allowSubscriptions,
myDomain: config.myDomain,
mySubdomain: config.mySubdomain,
customLimits: config.customLimits,
// FIXME this attribute isn't in the default conf
// but it is referenced in Quota
domain: config.domain
};
Env.defaultStorageLimit = typeof(config.defaultStorageLimit) === 'number' && config.defaultStorageLimit > 0?
config.defaultStorageLimit:
Core.DEFAULT_LIMIT;
try {
Env.admins = (config.adminKeys || []).map(function (k) {
k = k.replace(/\/+$/, '');
var s = k.split('/');
return s[s.length-1];
});
} catch (e) {
console.error("Can't parse admin keys. Please update or fix your config.js file!");
}
RPC.create = function (Env, cb) {
var Sessions = Env.Sessions;
var paths = Env.paths;
var pinPath = paths.pin = keyOrDefaultString('pinPath', './pins');
paths.block = keyOrDefaultString('blockPath', './block');
paths.data = keyOrDefaultString('filePath', './datastore');
paths.staging = keyOrDefaultString('blobStagingPath', './blobstage');
paths.blob = keyOrDefaultString('blobPath', './blob');
var updateLimitDaily = function () {
Quota.updateCachedLimits(Env, function (e) {
if (e) {
WARN('limitUpdate', e);
Env.WARN('limitUpdate', e);
}
});
};
@ -276,24 +197,11 @@ RPC.create = function (config, cb) {
Pinning.loadChannelPins(Env);
nThen(function (w) {
Store.create({
filePath: pinPath,
}, w(function (s) {
Env.pinStore = s;
}));
BlobStore.create({
blobPath: config.blobPath,
blobStagingPath: config.blobStagingPath,
archivePath: config.archivePath,
getSession: function (safeKey) {
return Core.getSession(Sessions, safeKey);
},
}, w(function (err, blob) {
if (err) { throw new Error(err); }
Env.blobStore = blob;
}));
}).nThen(function () {
// expire old sessions once per minute
Env.intervals.sessionExpirationInterval = setInterval(function () {
Core.expireSessions(Sessions);
}, Core.SESSION_EXPIRATION_TIME);
cb(void 0, function (Server, data, respond) {
try {
return rpc(Env, Server, data, respond);
@ -302,9 +210,4 @@ RPC.create = function (config, cb) {
console.log(e.stack);
}
});
// expire old sessions once per minute
Env.intervals.sessionExpirationInterval = setInterval(function () {
Core.expireSessions(Sessions);
}, Core.SESSION_EXPIRATION_TIME);
});
};