diff --git a/lib/api.js b/lib/api.js index 9201599e4..8e6725039 100644 --- a/lib/api.js +++ b/lib/api.js @@ -14,21 +14,7 @@ module.exports.create = function (config) { .on('channelClose', historyKeeper.channelClose) .on('channelMessage', historyKeeper.channelMessage) .on('channelOpen', historyKeeper.channelOpen) - .on('sessionClose', function (userId, reason) { - if (['BAD_MESSAGE', 'SOCKET_ERROR', 'SEND_MESSAGE_FAIL_2'].indexOf(reason) !== -1) { - if (reason && reason.code === 'ECONNRESET') { return; } - return void log.error('SESSION_CLOSE_WITH_ERROR', { - userId: userId, - reason: reason, - }); - } - - if (['SOCKET_CLOSED', 'SOCKET_ERROR'].indexOf(reason)) { return; } - log.verbose('SESSION_CLOSE_ROUTINE', { - userId: userId, - reason: reason, - }); - }) + .on('sessionClose', historyKeeper.sessionClose) .on('error', function (error, label, info) { if (!error) { return; } /* labels: diff --git a/lib/commands/channel.js b/lib/commands/channel.js index 6296aee0e..da4fb6685 100644 --- a/lib/commands/channel.js +++ b/lib/commands/channel.js @@ -261,6 +261,8 @@ Channel.writePrivateMessage = function (Env, args, cb, Server) { msg // the actual message content. Generally a string ]; + // XXX RESTRICT respect allow lists + // historyKeeper already knows how to handle metadata and message validation, so we just pass it off here // if the message isn't valid it won't be stored. Env.historyKeeper.channelMessage(Server, channelStruct, fullMessage); diff --git a/lib/commands/metadata.js b/lib/commands/metadata.js index 5b5e28f7e..a5bca0dca 100644 --- a/lib/commands/metadata.js +++ b/lib/commands/metadata.js @@ -2,21 +2,24 @@ const Data = module.exports; const Meta = require("../metadata"); -const BatchRead = require("../batch-read"); const WriteQueue = require("../write-queue"); const Core = require("./core"); const Util = require("../common-util"); +const HK = require("../hk-util"); -const batchMetadata = BatchRead("GET_METADATA"); -Data.getMetadata = function (Env, channel, cb/* , Server */) { +Data.getMetadataRaw = function (Env, channel /* channelName */, _cb) { + const cb = Util.once(Util.mkAsync(_cb)); if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } - if (channel.length !== 32) { return cb("INVALID_CHAN_LENGTH"); } + if (channel.length !== HK.STANDARD_CHANNEL_LENGTH) { return cb("INVALID_CHAN_LENGTH"); } - // FIXME get metadata from the server cache if it is available - batchMetadata(channel, cb, function (done) { + var cached = Env.metadata_cache[channel]; + if (HK.isMetadataMessage(cached)) { + return void cb(void 0, cached); + } + + Env.batchMetadata(channel, cb, function (done) { var ref = {}; var lineHandler = Meta.createLineHandler(ref, Env.Log.error); - return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) { if (err) { // stream errors? @@ -27,6 +30,28 @@ Data.getMetadata = function (Env, channel, cb/* , Server */) { }); }; +Data.getMetadata = function (Env, channel, cb, Server, netfluxId) { + Data.getMetadataRaw(Env, channel, function (err, metadata) { + if (err) { return void cb(err); } + + if (!(metadata && metadata.restricted)) { + // if it's not restricted then just call back + return void cb(void 0, metadata); + } + + const session = HK.getNetfluxSession(Env, netfluxId); + const allowed = HK.listAllowedUsers(metadata); + + if (!HK.isUserSessionAllowed(allowed, session)) { + return void cb(void 0, { + restricted: metadata.restricted, + allowed: allowed, + }); + } + cb(void 0, metadata); + }); +}; + /* setMetadata - write a new line to the metadata log if a valid command is provided - data is an object: { @@ -46,7 +71,7 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) { if (Meta.commands.indexOf(command) === -1) { return void cb('UNSUPPORTED_COMMAND'); } queueMetadata(channel, function (next) { - Data.getMetadata(Env, channel, function (err, metadata) { + Data.getMetadataRaw(Env, channel, function (err, metadata) { if (err) { cb(err); return void next(); @@ -108,21 +133,70 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) { return void next(); } + // send the message back to the person who changed it + // since we know they're allowed to see it cb(void 0, metadata); next(); const metadata_cache = Env.metadata_cache; const channel_cache = Env.channel_cache; + // update the cached metadata metadata_cache[channel] = metadata; + // as well as the metadata that's attached to the index... + // XXX determine if we actually need this... var index = Util.find(channel_cache, [channel, 'index']); if (index && typeof(index) === 'object') { index.metadata = metadata; } - Server.channelBroadcast(channel, JSON.stringify(metadata), Env.historyKeeper.id); + // it's easy to check if the channel is restricted + const isRestricted = metadata.restricted; + // and these values will be used in any case + const s_metadata = JSON.stringify(metadata); + const hk_id = Env.historyKeeper.id; + + if (!isRestricted) { + // pre-allow-list behaviour + // if it's not restricted, broadcast the new metadata to everyone + return void Server.channelBroadcast(channel, s_metadata, hk_id); + } + + // otherwise derive the list of users (unsafeKeys) that are allowed to stay + const allowed = HK.listAllowedUsers(metadata); + // anyone who is not allowed will get the same error message + const s_error = JSON.stringify({ + error: 'ERESTRICTED', + channel: channel, + }); + + // iterate over the channel's userlist + const toRemove = []; + Server.getChannelUserList(channel).forEach(function (userId) { + const session = HK.getNetfluxSession(Env, userId); + + // if the user is allowed to remain, send them the metadata + if (HK.isUserSessionAllowed(allowed, session)) { + return void Server.send(userId, [ + 0, + hk_id, + 'MSG', + userId, + s_metadata + ], function () {}); + } + // otherwise they are not in the list. + // send them an error and kick them out! + Server.send(userId, [ + 0, + hk_id, + 'MSG', + userId, + s_error + ], function () {}); + }); + + Server.removeFromChannel(channel, toRemove); }); }); }); }; - - diff --git a/lib/hk-util.js b/lib/hk-util.js index 5ba923809..41d305172 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -4,7 +4,7 @@ var HK = module.exports; const nThen = require('nthen'); const Util = require("./common-util"); -const Meta = require("./metadata"); +const MetaRPC = require("./commands/metadata"); const Nacl = require('tweetnacl/nacl-fast'); const now = function () { return (new Date()).getTime(); }; @@ -71,10 +71,37 @@ const sliceCpIndex = function (cpIndex, line) { return start.concat(end); }; -const isMetadataMessage = function (parsed) { +const isMetadataMessage = HK.isMetadataMessage = function (parsed) { return Boolean(parsed && parsed.channel); }; +HK.listAllowedUsers = function (metadata) { + return (metadata.owners || []).concat((metadata.allowed || [])); +}; + +HK.getNetfluxSession = function (Env, netfluxId) { + return Env.netfluxUsers[netfluxId]; +}; + +HK.isUserSessionAllowed = function (allowed, session) { + if (!session) { return false; } + for (var unsafeKey in session) { + if (allowed.indexOf(unsafeKey) !== -1) { + return true; + } + } + return false; +}; + +HK.authenticateNetfluxSession = function (Env, netfluxId, unsafeKey) { + var user = Env.netfluxUsers[netfluxId] = Env.netfluxUsers[netfluxId] || {}; + user[unsafeKey] = +new Date(); +}; + +HK.closeNetfluxSession = function (Env, netfluxId) { + delete Env.netfluxUsers[netfluxId]; +}; + // validateKeyStrings supplied by clients must decode to 32-byte Uint8Arrays const isValidValidateKeyString = function (key) { try { @@ -151,6 +178,29 @@ const checkExpired = function (Env, Server, channel) { return true; }; +const getMetadata = HK.getMetadata = function (Env, channelName, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + + var metadata = Env.metadata_cache[channelName]; + if (metadata && typeof(metadata) === 'object') { + return void cb(undefined, metadata); + } + + MetaRPC.getMetadataRaw(Env, channelName, function (err, metadata) { + if (err) { + console.error(err); + return void cb(err); + } + if (!(metadata && typeof(metadata.channel) === 'string' && metadata.channel.length === STANDARD_CHANNEL_LENGTH)) { + return cb(); + } + + // cache it + Env.metadata_cache[channelName] = metadata; + cb(undefined, metadata); + }); +}; + /* computeIndex can call back with an error or a computed index which includes: * cpIndex: @@ -180,13 +230,16 @@ const computeIndex = function (Env, channelName, cb) { let metadata; let i = 0; - const ref = {}; - const CB = Util.once(cb); const offsetByHash = {}; let size = 0; nThen(function (w) { + getMetadata(Env, channelName, w(function (err, _metadata) { + //if (err) { console.log(err); } + metadata = _metadata; + })); + }).nThen(function (w) { // iterate over all messages in the channel log // old channels can contain metadata as the first message of the log // remember metadata the first time you encounter it @@ -195,16 +248,14 @@ const computeIndex = function (Env, channelName, cb) { let msg; // keep an eye out for the metadata line if you haven't already seen it // but only check for metadata on the first line - if (!i && !metadata && msgObj.buff.indexOf('{') === 0) { + if (!i && msgObj.buff.indexOf('{') === 0) { i++; // always increment the message counter msg = tryParse(Env, msgObj.buff.toString('utf8')); if (typeof msg === "undefined") { return readMore(); } // validate that the current line really is metadata before storing it as such - if (isMetadataMessage(msg)) { - metadata = msg; - return readMore(); - } + // skip this, as you already have metadata... + if (isMetadataMessage(msg)) { return readMore(); } } i++; if (msgObj.buff.indexOf('cp|') > -1) { @@ -245,26 +296,8 @@ const computeIndex = function (Env, channelName, cb) { size = msgObj.offset + msgObj.buff.length + 1; }); })); - }).nThen(function (w) { - // create a function which will iterate over amendments to the metadata - const handler = Meta.createLineHandler(ref, Log.error); - - // initialize the accumulator in case there was a foundational metadata line in the log content - if (metadata) { handler(void 0, metadata); } - - // iterate over the dedicated metadata log (if it exists) - // proceed even in the event of a stream error on the metadata log - store.readDedicatedMetadata(channelName, handler, w(function (err) { - if (err) { - return void Log.error("DEDICATED_METADATA_ERROR", err); - } - })); }).nThen(function () { - // when all is done, cache the metadata in memory - if (ref.index) { // but don't bother if no metadata was found... - metadata = Env.metadata_cache[channelName] = ref.meta; - } - // and return the computed index + // return the computed index CB(null, { // Only keep the checkpoints included in the last 100 messages cpIndex: sliceCpIndex(cpIndex, i), @@ -293,9 +326,7 @@ const getIndex = (Env, channelName, cb) => { // if there is a channel in memory and it has an index cached, return it if (chan && chan.index) { // enforce async behaviour - return void setTimeout(function () { - cb(undefined, chan.index); - }); + return void Util.mkAsync(cb)(undefined, chan.index); } Env.batchIndexReads(channelName, cb, function (done) { @@ -569,7 +600,7 @@ const handleRPC = function (Env, Server, seq, userId, parsed) { Server.send(userId, [seq, 'ACK']); try { // slice off the sequence number and pass in the rest of the message - Env.rpc(Server, rpc_call, function (err, output) { + Env.rpc(Server, userId, rpc_call, function (err, output) { if (err) { Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', err])]); return; @@ -646,6 +677,7 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) { // And then check if the channel is expired. If it is, send the error and abort // FIXME this is hard to read because 'checkExpired' has side effects if (checkExpired(Env, Server, channelName)) { return void waitFor.abort(); } + // always send metadata with GET_HISTORY requests Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(index.metadata)], w); })); @@ -812,16 +844,62 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { return; } - // If the requested history is for an expired channel, abort - // Note the if we don't have the keys for that channel in metadata_cache, we'll - // have to abort later (once we know the expiration time) - if (checkExpired(Env, Server, parsed[1])) { return; } + var first = parsed[0]; - // look up the appropriate command in the map of commands or fall back to RPC - var command = directMessageCommands[parsed[0]] || handleRPC; + if (typeof(directMessageCommands[first]) !== 'function') { + // it's either an unsupported command or an RPC call + // either way, RPC has it covered + return void handleRPC(Env, Server, seq, userId, parsed); + } - // run the command with the standard function signature - command(Env, Server, seq, userId, parsed); + // otherwise it's some kind of history retrieval command... + // go grab its metadata, because unfortunately people can ask for history + // whether or not they have joined the channel, so we can't rely on JOIN restriction + // to stop people from loading history they shouldn't see. + var channelName = parsed[1]; + nThen(function (w) { + HK.getMetadata(Env, channelName, w(function (err, metadata) { + if (err) { + // stream errors? + // we should log these, but if we can't load metadata + // then it's probably not restricted or expired + // it's not like anything else will recover from this anyway + return; + } + + + // likewise, we can't do anything more here if there's no metadata + // jump to the next block + if (!metadata) { return; } + + // If the requested history is for an expired channel, abort + // checkExpired has side effects and will disconnect users for you... + if (checkExpired(Env, Server, parsed[1])) { + // if the channel is expired just abort. + w.abort(); + return; + } + + // jump to handling the command if there's no restriction... + if (!metadata.restricted) { return; } + + // check if the user is in the allow list... + const allowed = HK.listAllowedUsers(metadata); + const session = HK.getNetfluxSession(Env, userId); + + if (HK.isUserSessionAllowed(allowed, session)) { + return; + } + + // XXX NOT ALLOWED + // respond to txid with error as in handleGetHistory + // send the allow list anyway, it might not get used currently + // but will in the future + })); + }).nThen(function () { + // run the appropriate command from the map + directMessageCommands[first](Env, Server, seq, userId, parsed); + }); }; /* onChannelMessage diff --git a/lib/metadata.js b/lib/metadata.js index 560e12bb1..97f2e484a 100644 --- a/lib/metadata.js +++ b/lib/metadata.js @@ -2,23 +2,169 @@ var Meta = module.exports; var deduplicate = require("./common-util").deduplicateString; -/* Metadata fields: +/* Metadata fields and the commands that can modify them + +we assume that these commands can only be performed +by owners or in some cases pending owners. Thus +the owners field is guaranteed to exist. * channel * validateKey * owners * ADD_OWNERS * RM_OWNERS + * RESET_OWNERS + * pending_owners + * ADD_PENDING_OWNERS + * RM_PENDING_OWNERS * expire + * UPDATE_EXPIRATION (NOT_IMPLEMENTED) + * restricted + * RESTRICT_ACCESS + * allowed + * ADD_ALLOWED + * RM_ALLOWED + * RESET_ALLOWED + * ADD_OWNERS + * RESET_OWNERS + * mailbox + * ADD_MAILBOX + * RM_MAILBOX */ var commands = {}; -var isValidOwner = function (owner) { +var isValidPublicKey = function (owner) { return typeof(owner) === 'string' && owner.length === 44; }; +// isValidPublicKey is a better indication of what the above function does +// I'm preserving this function name in case we ever want to expand its +// criteria at a later time... +var isValidOwner = isValidPublicKey; + +// ["RESTRICT_ACCESS", [true], 1561623438989] +// ["RESTRICT_ACCESS", [false], 1561623438989] +commands.RESTRICT_ACCESS = function (meta, args) { + if (!Array.isArray(args) || typeof(args[0]) !== 'boolean') { + throw new Error('INVALID_STATE'); + } + + var bool = args[0]; + + // reject the proposed command if there is no change in state + if (meta.restricted === bool) { return false; } + + // apply the new state + meta.restricted = args[0]; + + // if you're disabling access restrictions then you can assume + // then there is nothing more to do. Leave the existing list as-is + if (!bool) { return true; } + + // you're all set if an allow list already exists + if (Array.isArray(meta.allowed)) { return true; } + + // otherwise define it + meta.allowed = []; + + return true; +}; + +// ["ADD_ALLOWED", ["7eEqelGso3EBr5jHlei6av4r9w2B9XZiGGwA1EgZ-5I=", ...], 1561623438989] +commands.ADD_ALLOWED = function (meta, args) { + if (!Array.isArray(args)) { + throw new Error("INVALID_ARGS"); + } + + var allowed = meta.allowed || []; + + var changed = false; + args.forEach(function (arg) { + // don't add invalid public keys + if (!isValidPublicKey(arg)) { return; } + // don't add owners to the allow list + if (meta.owners.indexOf(arg) >= 0) { return; } + // don't duplicate entries in the allow list + if (allowed.indexOf(arg) >= 0) { return; } + allowed.push(arg); + changed = true; + }); + + if (changed) { + meta.allowed = meta.allowed || allowed; + } + + return changed; +}; + +// ["RM_ALLOWED", ["7eEqelGso3EBr5jHlei6av4r9w2B9XZiGGwA1EgZ-5I=", ...], 1561623438989] +commands.RM_ALLOWED = function (meta, args) { + if (!Array.isArray(args)) { + throw new Error("INVALID_ARGS"); + } + + // there may not be anything to remove + if (!meta.allowed) { return false; } + + var changed = false; + args.forEach(function (arg) { + var index = meta.allowed.indexOf(arg); + if (index < 0) { return; } + meta.allowed.splice(index, 1); + changed = true; + }); + + return changed; +}; + +var arrayHasChanged = function (A, B) { + var changed; + A.some(function (a) { + if (B.indexOf(a) < 0) { return (changed = true); } + }); + if (changed) { return true; } + B.some(function (b) { + if (A.indexOf(b) < 0) { return (changed = true); } + }); + return changed; +}; + +var filterInPlace = function (A, f) { + for (var i = A.length - 1; i >= 0; i--) { + if (f(A[i], i, A)) { A.splice(i, 1); } + } +}; + +// ["RESET_ALLOWED", ["7eEqelGso3EBr5jHlei6av4r9w2B9XZiGGwA1EgZ-5I=", ...], 1561623438989] +commands.RESET_ALLOWED = function (meta, args) { + if (!Array.isArray(args)) { throw new Error("INVALID_ARGS"); } + + var updated = args.filter(function (arg) { + // don't allow invalid public keys + if (!isValidPublicKey(arg)) { return false; } + // don't ever add owners to the allow list + if (meta.owners.indexOf(arg)) { return false; } + return true; + }); + + // this is strictly an optimization... + // a change in length is a clear indicator of a functional change + if (meta.allowed && meta.allowed.length !== updated.length) { + meta.allowed = updated; + return true; + } + + // otherwise we must check that the arrays contain distinct elements + // if there is no functional change, then return false + if (!arrayHasChanged(meta.allowed, updated)) { return false; } + + // otherwise overwrite the in-memory data and indicate that there was a change + meta.allowed = updated; + return true; +}; + // ["ADD_OWNERS", ["7eEqelGso3EBr5jHlei6av4r9w2B9XZiGGwA1EgZ-5I="], 1561623438989] commands.ADD_OWNERS = function (meta, args) { // bail out if args isn't an array @@ -40,6 +186,13 @@ commands.ADD_OWNERS = function (meta, args) { changed = true; }); + if (changed && Array.isArray(meta.allowed)) { + // make sure owners are not included in the allow list + filterInPlace(meta.allowed, function (member) { + return meta.owners.indexOf(member) !== -1; + }); + } + return changed; }; @@ -71,6 +224,10 @@ commands.RM_OWNERS = function (meta, args) { changed = true; }); + if (meta.owners.length === 0 && meta.restricted) { + meta.restricted = false; + } + return changed; }; @@ -141,6 +298,18 @@ commands.RESET_OWNERS = function (meta, args) { // overwrite the existing owners with the new one meta.owners = deduplicate(args.filter(isValidOwner)); + + if (Array.isArray(meta.allowed)) { + // make sure owners are not included in the allow list + filterInPlace(meta.allowed, function (member) { + return meta.owners.indexOf(member) !== -1; + }); + } + + if (meta.owners.length === 0 && meta.restricted) { + meta.restricted = false; + } + return true; }; @@ -178,6 +347,25 @@ commands.ADD_MAILBOX = function (meta, args) { return changed; }; +commands.RM_MAILBOX = function (meta, args) { + if (!Array.isArray(args)) { throw new Error("INVALID_ARGS"); } + if (!meta.mailbox || typeof(meta.mailbox) === 'undefined') { + return false; + } + if (typeof(meta.mailbox) === 'string' && args.length === 0) { + delete meta.mailbox; + return true; + } + + var changed = false; + args.forEach(function (arg) { + if (meta.mailbox[arg] === 'undefined') { return; } + delete meta.mailbox[arg]; + changed = true; + }); + return changed; +}; + commands.UPDATE_EXPIRATION = function () { throw new Error("E_NOT_IMPLEMENTED"); }; diff --git a/lib/rpc.js b/lib/rpc.js index 875032b74..c54edf31d 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -9,6 +9,7 @@ const Block = require("./commands/block"); const Metadata = require("./commands/metadata"); const Channel = require("./commands/channel"); const Upload = require("./commands/upload"); +const HK = require("./hk-util"); var RPC = module.exports; @@ -26,7 +27,7 @@ var isUnauthenticateMessage = function (msg) { return msg && msg.length === 2 && typeof(UNAUTHENTICATED_CALLS[msg[0]]) === 'function'; }; -var handleUnauthenticatedMessage = function (Env, msg, respond, Server) { +var handleUnauthenticatedMessage = function (Env, msg, respond, Server, netfluxId) { Env.Log.silly('LOG_RPC', msg[0]); var method = UNAUTHENTICATED_CALLS[msg[0]]; @@ -36,7 +37,7 @@ var handleUnauthenticatedMessage = function (Env, msg, respond, Server) { return void respond(err); } respond(err, [null, value, null]); - }, Server); + }, Server, netfluxId); }; const AUTHENTICATED_USER_TARGETED = { @@ -117,7 +118,7 @@ var handleAuthenticatedMessage = function (Env, unsafeKey, msg, respond, Server) return void Respond('UNSUPPORTED_RPC_CALL', msg); }; -var rpc = function (Env, Server, data, respond) { +var rpc = function (Env, Server, userId, data, respond) { if (!Array.isArray(data)) { Env.Log.debug('INVALID_ARG_FORMET', data); return void respond('INVALID_ARG_FORMAT'); @@ -136,15 +137,16 @@ var rpc = function (Env, Server, data, respond) { } if (isUnauthenticateMessage(msg)) { - return handleUnauthenticatedMessage(Env, msg, respond, Server); + return handleUnauthenticatedMessage(Env, msg, respond, Server, userId); } var signature = msg.shift(); var publicKey = msg.shift(); // make sure a user object is initialized in the cookie jar + var session; if (publicKey) { - Core.getSession(Env.Sessions, publicKey); + session = Core.getSession(Env.Sessions, publicKey); } else { Env.Log.debug("NO_PUBLIC_KEY_PROVIDED", publicKey); } @@ -174,6 +176,7 @@ var rpc = function (Env, Server, data, respond) { // check the signature on the message // refuse the command if it doesn't validate if (Core.checkSignature(Env, serialized, signature, publicKey) === true) { + HK.authenticateNetfluxSession(Env, userId, publicKey); return void handleAuthenticatedMessage(Env, publicKey, msg, respond, Server); } return void respond("INVALID_SIGNATURE_OR_PUBLIC_KEY"); @@ -202,9 +205,9 @@ RPC.create = function (Env, cb) { Core.expireSessions(Sessions); }, Core.SESSION_EXPIRATION_TIME); - cb(void 0, function (Server, data, respond) { + cb(void 0, function (Server, userId, data, respond) { try { - return rpc(Env, Server, data, respond); + return rpc(Env, Server, userId, data, respond); } catch (e) { console.log("Error from RPC with data " + JSON.stringify(data)); console.log(e.stack);