merge server components from staging

This commit is contained in:
ansuz 2020-02-25 10:09:35 -05:00
parent 43693b45f0
commit 542150b775
6 changed files with 406 additions and 75 deletions

View File

@ -14,21 +14,7 @@ module.exports.create = function (config) {
.on('channelClose', historyKeeper.channelClose)
.on('channelMessage', historyKeeper.channelMessage)
.on('channelOpen', historyKeeper.channelOpen)
.on('sessionClose', function (userId, reason) {
if (['BAD_MESSAGE', 'SOCKET_ERROR', 'SEND_MESSAGE_FAIL_2'].indexOf(reason) !== -1) {
if (reason && reason.code === 'ECONNRESET') { return; }
return void log.error('SESSION_CLOSE_WITH_ERROR', {
userId: userId,
reason: reason,
});
}
if (['SOCKET_CLOSED', 'SOCKET_ERROR'].indexOf(reason)) { return; }
log.verbose('SESSION_CLOSE_ROUTINE', {
userId: userId,
reason: reason,
});
})
.on('sessionClose', historyKeeper.sessionClose)
.on('error', function (error, label, info) {
if (!error) { return; }
/* labels:

View File

@ -261,6 +261,8 @@ Channel.writePrivateMessage = function (Env, args, cb, Server) {
msg // the actual message content. Generally a string
];
// XXX RESTRICT respect allow lists
// historyKeeper already knows how to handle metadata and message validation, so we just pass it off here
// if the message isn't valid it won't be stored.
Env.historyKeeper.channelMessage(Server, channelStruct, fullMessage);

View File

@ -2,21 +2,24 @@
const Data = module.exports;
const Meta = require("../metadata");
const BatchRead = require("../batch-read");
const WriteQueue = require("../write-queue");
const Core = require("./core");
const Util = require("../common-util");
const HK = require("../hk-util");
const batchMetadata = BatchRead("GET_METADATA");
Data.getMetadata = function (Env, channel, cb/* , Server */) {
Data.getMetadataRaw = function (Env, channel /* channelName */, _cb) {
const cb = Util.once(Util.mkAsync(_cb));
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length !== 32) { return cb("INVALID_CHAN_LENGTH"); }
if (channel.length !== HK.STANDARD_CHANNEL_LENGTH) { return cb("INVALID_CHAN_LENGTH"); }
// FIXME get metadata from the server cache if it is available
batchMetadata(channel, cb, function (done) {
var cached = Env.metadata_cache[channel];
if (HK.isMetadataMessage(cached)) {
return void cb(void 0, cached);
}
Env.batchMetadata(channel, cb, function (done) {
var ref = {};
var lineHandler = Meta.createLineHandler(ref, Env.Log.error);
return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) {
if (err) {
// stream errors?
@ -27,6 +30,28 @@ Data.getMetadata = function (Env, channel, cb/* , Server */) {
});
};
/* getMetadata
    fetch a channel's metadata and filter it according to access restrictions.
    - unrestricted channels (or channels with no metadata) call back with
      the metadata as-is
    - restricted channels call back with the full metadata only for users
      whose netflux session is on the allow list; everyone else receives a
      redacted object exposing only the restriction flag and the allow list
*/
Data.getMetadata = function (Env, channel, cb, Server, netfluxId) {
    Data.getMetadataRaw(Env, channel, function (err, metadata) {
        if (err) { return void cb(err); }
        // nothing to filter if the channel isn't restricted
        if (!metadata || !metadata.restricted) {
            return void cb(void 0, metadata);
        }
        const allowed = HK.listAllowedUsers(metadata);
        const session = HK.getNetfluxSession(Env, netfluxId);
        if (HK.isUserSessionAllowed(allowed, session)) {
            // authenticated as an owner or allow-listed key: full metadata
            return void cb(void 0, metadata);
        }
        // not allowed: reveal only the restriction state and who is allowed
        cb(void 0, {
            restricted: metadata.restricted,
            allowed: allowed,
        });
    });
};
/* setMetadata
- write a new line to the metadata log if a valid command is provided
- data is an object: {
@ -46,7 +71,7 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
if (Meta.commands.indexOf(command) === -1) { return void cb('UNSUPPORTED_COMMAND'); }
queueMetadata(channel, function (next) {
Data.getMetadata(Env, channel, function (err, metadata) {
Data.getMetadataRaw(Env, channel, function (err, metadata) {
if (err) {
cb(err);
return void next();
@ -108,21 +133,70 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
return void next();
}
// send the message back to the person who changed it
// since we know they're allowed to see it
cb(void 0, metadata);
next();
const metadata_cache = Env.metadata_cache;
const channel_cache = Env.channel_cache;
// update the cached metadata
metadata_cache[channel] = metadata;
// as well as the metadata that's attached to the index...
// XXX determine if we actually need this...
var index = Util.find(channel_cache, [channel, 'index']);
if (index && typeof(index) === 'object') { index.metadata = metadata; }
Server.channelBroadcast(channel, JSON.stringify(metadata), Env.historyKeeper.id);
// it's easy to check if the channel is restricted
const isRestricted = metadata.restricted;
// and these values will be used in any case
const s_metadata = JSON.stringify(metadata);
const hk_id = Env.historyKeeper.id;
if (!isRestricted) {
// pre-allow-list behaviour
// if it's not restricted, broadcast the new metadata to everyone
return void Server.channelBroadcast(channel, s_metadata, hk_id);
}
// otherwise derive the list of users (unsafeKeys) that are allowed to stay
const allowed = HK.listAllowedUsers(metadata);
// anyone who is not allowed will get the same error message
const s_error = JSON.stringify({
error: 'ERESTRICTED',
channel: channel,
});
// iterate over the channel's userlist
const toRemove = [];
Server.getChannelUserList(channel).forEach(function (userId) {
const session = HK.getNetfluxSession(Env, userId);
// if the user is allowed to remain, send them the metadata
if (HK.isUserSessionAllowed(allowed, session)) {
return void Server.send(userId, [
0,
hk_id,
'MSG',
userId,
s_metadata
], function () {});
}
// otherwise they are not in the list.
// send them an error and kick them out!
Server.send(userId, [
0,
hk_id,
'MSG',
userId,
s_error
], function () {});
});
Server.removeFromChannel(channel, toRemove);
});
});
});
};

View File

@ -4,7 +4,7 @@ var HK = module.exports;
const nThen = require('nthen');
const Util = require("./common-util");
const Meta = require("./metadata");
const MetaRPC = require("./commands/metadata");
const Nacl = require('tweetnacl/nacl-fast');
const now = function () { return (new Date()).getTime(); };
@ -71,10 +71,37 @@ const sliceCpIndex = function (cpIndex, line) {
return start.concat(end);
};
const isMetadataMessage = function (parsed) {
// A parsed line from a channel log is treated as metadata (rather than a
// regular message) when it carries a `channel` field.
const isMetadataMessage = HK.isMetadataMessage = function (parsed) {
    if (!parsed) { return false; }
    return Boolean(parsed.channel);
};
// The users permitted to access a restricted channel are its owners
// plus anyone explicitly present in its allow list.
HK.listAllowedUsers = function (metadata) {
    const owners = metadata.owners || [];
    const allowed = metadata.allowed || [];
    return owners.concat(allowed);
};
// Look up the session map (unsafeKey => authentication timestamp) for a
// netflux connection id; undefined if this connection never authenticated.
HK.getNetfluxSession = function (Env, netfluxId) {
    const sessions = Env.netfluxUsers;
    return sessions[netfluxId];
};
// A session grants access if any of the public keys it has authenticated
// appears in the allow list. Returns false for absent sessions.
HK.isUserSessionAllowed = function (allowed, session) {
    if (!session) { return false; }
    return Object.keys(session).some(function (unsafeKey) {
        return allowed.indexOf(unsafeKey) !== -1;
    });
};
// Record that this netflux connection has proven ownership of `unsafeKey`
// (called after a successful RPC signature check). The session map is
// created lazily on first authentication.
HK.authenticateNetfluxSession = function (Env, netfluxId, unsafeKey) {
    var session = Env.netfluxUsers[netfluxId];
    if (!session) {
        session = Env.netfluxUsers[netfluxId] = {};
    }
    // remember when this key was last authenticated
    session[unsafeKey] = Date.now();
};
// Forget every public key authenticated over this netflux connection,
// e.g. when the underlying socket session ends.
HK.closeNetfluxSession = function (Env, netfluxId) {
delete Env.netfluxUsers[netfluxId];
};
// validateKeyStrings supplied by clients must decode to 32-byte Uint8Arrays
const isValidValidateKeyString = function (key) {
try {
@ -151,6 +178,29 @@ const checkExpired = function (Env, Server, channel) {
return true;
};
/* getMetadata
    fetch a channel's metadata, preferring the in-memory cache.
    Falls back to reading the metadata log via MetaRPC.getMetadataRaw and
    caches the result when it looks like valid channel metadata.
    Calls back with (err) on read failure, with no arguments when the
    metadata is absent or malformed, and with (undefined, metadata) on success.
*/
const getMetadata = HK.getMetadata = function (Env, channelName, _cb) {
    const cb = Util.once(Util.mkAsync(_cb));
    // serve cached metadata when it's available
    const cached = Env.metadata_cache[channelName];
    if (cached && typeof(cached) === 'object') {
        return void cb(undefined, cached);
    }
    MetaRPC.getMetadataRaw(Env, channelName, function (err, metadata) {
        if (err) {
            console.error(err);
            return void cb(err);
        }
        const looksValid = metadata &&
            typeof(metadata.channel) === 'string' &&
            metadata.channel.length === STANDARD_CHANNEL_LENGTH;
        // malformed or missing metadata: call back with nothing
        if (!looksValid) { return cb(); }
        // cache it for subsequent lookups
        Env.metadata_cache[channelName] = metadata;
        cb(undefined, metadata);
    });
};
/* computeIndex
can call back with an error or a computed index which includes:
* cpIndex:
@ -180,13 +230,16 @@ const computeIndex = function (Env, channelName, cb) {
let metadata;
let i = 0;
const ref = {};
const CB = Util.once(cb);
const offsetByHash = {};
let size = 0;
nThen(function (w) {
getMetadata(Env, channelName, w(function (err, _metadata) {
//if (err) { console.log(err); }
metadata = _metadata;
}));
}).nThen(function (w) {
// iterate over all messages in the channel log
// old channels can contain metadata as the first message of the log
// remember metadata the first time you encounter it
@ -195,16 +248,14 @@ const computeIndex = function (Env, channelName, cb) {
let msg;
// keep an eye out for the metadata line if you haven't already seen it
// but only check for metadata on the first line
if (!i && !metadata && msgObj.buff.indexOf('{') === 0) {
if (!i && msgObj.buff.indexOf('{') === 0) {
i++; // always increment the message counter
msg = tryParse(Env, msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return readMore(); }
// validate that the current line really is metadata before storing it as such
if (isMetadataMessage(msg)) {
metadata = msg;
return readMore();
}
// skip this, as you already have metadata...
if (isMetadataMessage(msg)) { return readMore(); }
}
i++;
if (msgObj.buff.indexOf('cp|') > -1) {
@ -245,26 +296,8 @@ const computeIndex = function (Env, channelName, cb) {
size = msgObj.offset + msgObj.buff.length + 1;
});
}));
}).nThen(function (w) {
// create a function which will iterate over amendments to the metadata
const handler = Meta.createLineHandler(ref, Log.error);
// initialize the accumulator in case there was a foundational metadata line in the log content
if (metadata) { handler(void 0, metadata); }
// iterate over the dedicated metadata log (if it exists)
// proceed even in the event of a stream error on the metadata log
store.readDedicatedMetadata(channelName, handler, w(function (err) {
if (err) {
return void Log.error("DEDICATED_METADATA_ERROR", err);
}
}));
}).nThen(function () {
// when all is done, cache the metadata in memory
if (ref.index) { // but don't bother if no metadata was found...
metadata = Env.metadata_cache[channelName] = ref.meta;
}
// and return the computed index
// return the computed index
CB(null, {
// Only keep the checkpoints included in the last 100 messages
cpIndex: sliceCpIndex(cpIndex, i),
@ -293,9 +326,7 @@ const getIndex = (Env, channelName, cb) => {
// if there is a channel in memory and it has an index cached, return it
if (chan && chan.index) {
// enforce async behaviour
return void setTimeout(function () {
cb(undefined, chan.index);
});
return void Util.mkAsync(cb)(undefined, chan.index);
}
Env.batchIndexReads(channelName, cb, function (done) {
@ -569,7 +600,7 @@ const handleRPC = function (Env, Server, seq, userId, parsed) {
Server.send(userId, [seq, 'ACK']);
try {
// slice off the sequence number and pass in the rest of the message
Env.rpc(Server, rpc_call, function (err, output) {
Env.rpc(Server, userId, rpc_call, function (err, output) {
if (err) {
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', err])]);
return;
@ -646,6 +677,7 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) {
// And then check if the channel is expired. If it is, send the error and abort
// FIXME this is hard to read because 'checkExpired' has side effects
if (checkExpired(Env, Server, channelName)) { return void waitFor.abort(); }
// always send metadata with GET_HISTORY requests
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(index.metadata)], w);
}));
@ -812,16 +844,62 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) {
return;
}
// If the requested history is for an expired channel, abort
// Note the if we don't have the keys for that channel in metadata_cache, we'll
// have to abort later (once we know the expiration time)
if (checkExpired(Env, Server, parsed[1])) { return; }
var first = parsed[0];
// look up the appropriate command in the map of commands or fall back to RPC
var command = directMessageCommands[parsed[0]] || handleRPC;
if (typeof(directMessageCommands[first]) !== 'function') {
// it's either an unsupported command or an RPC call
// either way, RPC has it covered
return void handleRPC(Env, Server, seq, userId, parsed);
}
// run the command with the standard function signature
command(Env, Server, seq, userId, parsed);
// otherwise it's some kind of history retrieval command...
// go grab its metadata, because unfortunately people can ask for history
// whether or not they have joined the channel, so we can't rely on JOIN restriction
// to stop people from loading history they shouldn't see.
var channelName = parsed[1];
nThen(function (w) {
HK.getMetadata(Env, channelName, w(function (err, metadata) {
if (err) {
// stream errors?
// we should log these, but if we can't load metadata
// then it's probably not restricted or expired
// it's not like anything else will recover from this anyway
return;
}
// likewise, we can't do anything more here if there's no metadata
// jump to the next block
if (!metadata) { return; }
// If the requested history is for an expired channel, abort
// checkExpired has side effects and will disconnect users for you...
if (checkExpired(Env, Server, parsed[1])) {
// if the channel is expired just abort.
w.abort();
return;
}
// jump to handling the command if there's no restriction...
if (!metadata.restricted) { return; }
// check if the user is in the allow list...
const allowed = HK.listAllowedUsers(metadata);
const session = HK.getNetfluxSession(Env, userId);
if (HK.isUserSessionAllowed(allowed, session)) {
return;
}
// XXX NOT ALLOWED
// respond to txid with error as in handleGetHistory
// send the allow list anyway, it might not get used currently
// but will in the future
}));
}).nThen(function () {
// run the appropriate command from the map
directMessageCommands[first](Env, Server, seq, userId, parsed);
});
};
/* onChannelMessage

View File

@ -2,23 +2,169 @@ var Meta = module.exports;
var deduplicate = require("./common-util").deduplicateString;
/* Metadata fields:
/* Metadata fields and the commands that can modify them
we assume that these commands can only be performed
by owners or in some cases pending owners. Thus
the owners field is guaranteed to exist.
* channel <STRING>
* validateKey <STRING>
* owners <ARRAY>
* ADD_OWNERS
* RM_OWNERS
* RESET_OWNERS
* pending_owners <ARRAY>
* ADD_PENDING_OWNERS
* RM_PENDING_OWNERS
* expire <NUMBER>
* UPDATE_EXPIRATION (NOT_IMPLEMENTED)
* restricted <BOOLEAN>
* RESTRICT_ACCESS
* allowed <ARRAY>
* ADD_ALLOWED
* RM_ALLOWED
* RESET_ALLOWED
* ADD_OWNERS
* RESET_OWNERS
* mailbox <STRING|MAP>
* ADD_MAILBOX
* RM_MAILBOX
*/
var commands = {};
var isValidOwner = function (owner) {
// public keys are base64-encoded 32-byte values, which is 44 characters
var isValidPublicKey = function (owner) {
    if (typeof(owner) !== 'string') { return false; }
    return owner.length === 44;
};
// 'isValidOwner' is kept as an alias of isValidPublicKey so that owner
// validation can diverge from generic key validation at a later time
// without touching every call site.
var isValidOwner = isValidPublicKey;
// ["RESTRICT_ACCESS", [true], 1561623438989]
// ["RESTRICT_ACCESS", [false], 1561623438989]
commands.RESTRICT_ACCESS = function (meta, args) {
    if (!Array.isArray(args) || typeof(args[0]) !== 'boolean') {
        throw new Error('INVALID_STATE');
    }
    const restricted = args[0];
    // commands that would not change the state are rejected
    if (meta.restricted === restricted) { return false; }
    meta.restricted = restricted;
    // disabling restrictions leaves any existing allow list untouched;
    // enabling them guarantees that an allow list exists
    if (restricted && !Array.isArray(meta.allowed)) {
        meta.allowed = [];
    }
    return true;
};
// ["ADD_ALLOWED", ["7eEqelGso3EBr5jHlei6av4r9w2B9XZiGGwA1EgZ-5I=", ...], 1561623438989]
commands.ADD_ALLOWED = function (meta, args) {
    if (!Array.isArray(args)) {
        throw new Error("INVALID_ARGS");
    }
    var allowed = meta.allowed || [];
    var additions = 0;
    args.forEach(function (arg) {
        // skip invalid public keys, owners (who are implicitly allowed),
        // and keys already present in the list
        var acceptable = isValidPublicKey(arg) &&
            meta.owners.indexOf(arg) < 0 &&
            allowed.indexOf(arg) < 0;
        if (!acceptable) { return; }
        allowed.push(arg);
        additions++;
    });
    if (additions === 0) { return false; }
    // if the channel had no allow list, attach the one we just built
    meta.allowed = meta.allowed || allowed;
    return true;
};
// ["RM_ALLOWED", ["7eEqelGso3EBr5jHlei6av4r9w2B9XZiGGwA1EgZ-5I=", ...], 1561623438989]
commands.RM_ALLOWED = function (meta, args) {
    if (!Array.isArray(args)) {
        throw new Error("INVALID_ARGS");
    }
    // nothing to remove if the channel has no allow list
    if (!meta.allowed) { return false; }
    var removals = 0;
    args.forEach(function (arg) {
        var at = meta.allowed.indexOf(arg);
        if (at === -1) { return; }
        meta.allowed.splice(at, 1);
        removals++;
    });
    return removals > 0;
};
// Compare two arrays as sets: truthy when either contains an element
// the other lacks. Returns `true` on difference, `undefined` otherwise
// (callers treat the result as a boolean).
var arrayHasChanged = function (A, B) {
    var missingFromB = A.some(function (a) {
        return B.indexOf(a) < 0;
    });
    if (missingFromB) { return true; }
    var missingFromA = B.some(function (b) {
        return A.indexOf(b) < 0;
    });
    if (missingFromA) { return true; }
};
// Mutate A, removing every element for which f(element, index, A) is
// truthy. Iterates backwards so splicing doesn't skip elements.
var filterInPlace = function (A, f) {
    var i = A.length;
    while (i--) {
        if (f(A[i], i, A)) { A.splice(i, 1); }
    }
};
// ["RESET_ALLOWED", ["7eEqelGso3EBr5jHlei6av4r9w2B9XZiGGwA1EgZ-5I=", ...], 1561623438989]
// Replace the allow list wholesale with the provided keys.
// Returns true if the list functionally changed, false otherwise.
commands.RESET_ALLOWED = function (meta, args) {
    if (!Array.isArray(args)) { throw new Error("INVALID_ARGS"); }
    var updated = args.filter(function (arg) {
        // don't allow invalid public keys
        if (!isValidPublicKey(arg)) { return false; }
        // don't ever add owners to the allow list
        // BUG FIX: this was `if (meta.owners.indexOf(arg))`, which is truthy
        // for -1 (absent) and falsy only for owners[0] — i.e. it rejected
        // every key EXCEPT the first owner, the opposite of the intent
        if (meta.owners.indexOf(arg) !== -1) { return false; }
        return true;
    });
    // the channel may not have an allow list yet
    // BUG FIX: previously an undefined meta.allowed was passed straight to
    // arrayHasChanged, which would throw on `A.some`
    var current = Array.isArray(meta.allowed) ? meta.allowed : [];
    // this is strictly an optimization...
    // a change in length is a clear indicator of a functional change
    if (current.length !== updated.length) {
        meta.allowed = updated;
        return true;
    }
    // otherwise we must check that the arrays contain distinct elements
    // if there is no functional change, then return false
    if (!arrayHasChanged(current, updated)) { return false; }
    // otherwise overwrite the in-memory data and indicate that there was a change
    meta.allowed = updated;
    return true;
};
// ["ADD_OWNERS", ["7eEqelGso3EBr5jHlei6av4r9w2B9XZiGGwA1EgZ-5I="], 1561623438989]
commands.ADD_OWNERS = function (meta, args) {
// bail out if args isn't an array
@ -40,6 +186,13 @@ commands.ADD_OWNERS = function (meta, args) {
changed = true;
});
if (changed && Array.isArray(meta.allowed)) {
// make sure owners are not included in the allow list
filterInPlace(meta.allowed, function (member) {
return meta.owners.indexOf(member) !== -1;
});
}
return changed;
};
@ -71,6 +224,10 @@ commands.RM_OWNERS = function (meta, args) {
changed = true;
});
if (meta.owners.length === 0 && meta.restricted) {
meta.restricted = false;
}
return changed;
};
@ -141,6 +298,18 @@ commands.RESET_OWNERS = function (meta, args) {
// overwrite the existing owners with the new one
meta.owners = deduplicate(args.filter(isValidOwner));
if (Array.isArray(meta.allowed)) {
// make sure owners are not included in the allow list
filterInPlace(meta.allowed, function (member) {
return meta.owners.indexOf(member) !== -1;
});
}
if (meta.owners.length === 0 && meta.restricted) {
meta.restricted = false;
}
return true;
};
@ -178,6 +347,25 @@ commands.ADD_MAILBOX = function (meta, args) {
return changed;
};
// ["RM_MAILBOX", [...], 1561623438989]
// Remove mailbox entries. With a string-valued mailbox and no args the
// whole mailbox is removed; with a map-valued mailbox each listed key is
// deleted. Returns true only if something was actually removed.
commands.RM_MAILBOX = function (meta, args) {
    if (!Array.isArray(args)) { throw new Error("INVALID_ARGS"); }
    // nothing to do if there is no mailbox at all
    // (the original `|| typeof(...) === 'undefined'` clause was redundant)
    if (!meta.mailbox) {
        return false;
    }
    // a legacy string-valued mailbox is removed wholesale by passing no args
    if (typeof(meta.mailbox) === 'string' && args.length === 0) {
        delete meta.mailbox;
        return true;
    }
    var changed = false;
    args.forEach(function (arg) {
        // BUG FIX: this compared the stored value against the STRING
        // 'undefined' (`meta.mailbox[arg] === 'undefined'`), so deleting a
        // non-existent key still flipped `changed` and reported a bogus
        // modification; check the type of the entry instead
        if (typeof(meta.mailbox[arg]) === 'undefined') { return; }
        delete meta.mailbox[arg];
        changed = true;
    });
    return changed;
};
// UPDATE_EXPIRATION is declared in the metadata command list (for the
// 'expire' field) but is not yet supported; every invocation is rejected.
commands.UPDATE_EXPIRATION = function () {
throw new Error("E_NOT_IMPLEMENTED");
};

View File

@ -9,6 +9,7 @@ const Block = require("./commands/block");
const Metadata = require("./commands/metadata");
const Channel = require("./commands/channel");
const Upload = require("./commands/upload");
const HK = require("./hk-util");
var RPC = module.exports;
@ -26,7 +27,7 @@ var isUnauthenticateMessage = function (msg) {
return msg && msg.length === 2 && typeof(UNAUTHENTICATED_CALLS[msg[0]]) === 'function';
};
var handleUnauthenticatedMessage = function (Env, msg, respond, Server) {
var handleUnauthenticatedMessage = function (Env, msg, respond, Server, netfluxId) {
Env.Log.silly('LOG_RPC', msg[0]);
var method = UNAUTHENTICATED_CALLS[msg[0]];
@ -36,7 +37,7 @@ var handleUnauthenticatedMessage = function (Env, msg, respond, Server) {
return void respond(err);
}
respond(err, [null, value, null]);
}, Server);
}, Server, netfluxId);
};
const AUTHENTICATED_USER_TARGETED = {
@ -117,7 +118,7 @@ var handleAuthenticatedMessage = function (Env, unsafeKey, msg, respond, Server)
return void Respond('UNSUPPORTED_RPC_CALL', msg);
};
var rpc = function (Env, Server, data, respond) {
var rpc = function (Env, Server, userId, data, respond) {
if (!Array.isArray(data)) {
Env.Log.debug('INVALID_ARG_FORMET', data);
return void respond('INVALID_ARG_FORMAT');
@ -136,15 +137,16 @@ var rpc = function (Env, Server, data, respond) {
}
if (isUnauthenticateMessage(msg)) {
return handleUnauthenticatedMessage(Env, msg, respond, Server);
return handleUnauthenticatedMessage(Env, msg, respond, Server, userId);
}
var signature = msg.shift();
var publicKey = msg.shift();
// make sure a user object is initialized in the cookie jar
var session;
if (publicKey) {
Core.getSession(Env.Sessions, publicKey);
session = Core.getSession(Env.Sessions, publicKey);
} else {
Env.Log.debug("NO_PUBLIC_KEY_PROVIDED", publicKey);
}
@ -174,6 +176,7 @@ var rpc = function (Env, Server, data, respond) {
// check the signature on the message
// refuse the command if it doesn't validate
if (Core.checkSignature(Env, serialized, signature, publicKey) === true) {
HK.authenticateNetfluxSession(Env, userId, publicKey);
return void handleAuthenticatedMessage(Env, publicKey, msg, respond, Server);
}
return void respond("INVALID_SIGNATURE_OR_PUBLIC_KEY");
@ -202,9 +205,9 @@ RPC.create = function (Env, cb) {
Core.expireSessions(Sessions);
}, Core.SESSION_EXPIRATION_TIME);
cb(void 0, function (Server, data, respond) {
cb(void 0, function (Server, userId, data, respond) {
try {
return rpc(Env, Server, data, respond);
return rpc(Env, Server, userId, data, respond);
} catch (e) {
console.log("Error from RPC with data " + JSON.stringify(data));
console.log(e.stack);