From da11b2a23538af908d933c9a3d3ee3be6dec5d21 Mon Sep 17 00:00:00 2001 From: ggn06awu Date: Wed, 13 Jan 2016 09:15:53 +0000 Subject: [PATCH 1/6] Added wildcard support to channel listeners. We need to be able to listen on channel with a wildcard, the work around would have meant us creating far too many channels. This code lets us listen to channels like so: RPS.publish(this, { collection: Comments, options: { selector: {contextId: contextId}, channel: 'comments::' + contextId + '::*' } }); Creating a pull request to merge this to chatra's original version. --- messenger.js | 91 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/messenger.js b/messenger.js index dfee2a4..9550e40 100644 --- a/messenger.js +++ b/messenger.js @@ -1,38 +1,61 @@ +/** + * Match a wildcard rule against and input string. + * + * "a*b" => everything that starts with "a" and ends with "b" + " a*" => everything that starts with "a" + " *b" => everything that ends with "b" + " *a*" => everything that has a "a" in it + " *a*b*" => everything that has a "a" in it, followed by anything, followed by a "b", followed by anything + * @param str - The full string to be matched against, e.g. my::channel::value + * @param rule - A string to match with, which can include wildcards, e.g. my::channel::* + * @returns {boolean} + * @private + */ +RPS._matchRuleShort = function(str, rule) { + return new RegExp("^" + rule.replace("*", ".*") + "$").test(str); +}; + RPS._messenger = { - channels: {}, - observers: {}, - addObserver: function (observerKey, channel) { - //console.log('RPS._messenger.addObserver; observerKey, channel:', observerKey, channel); - if (!RPS._messenger.channels[channel]) { - //console.log('RPS._messenger.addObserver → add channel; channel:', channel); - RPS._messenger.channels[channel] = {}; - } + channels: {}, + observers: {}, + addObserver: function (observerKey, channel) { + //console.log('RPS._messenger.addObserver; observerKey, channel:', observerKey, channel); + if (!RPS._messenger.channels[channel]) { + //console.log('RPS._messenger.addObserver → add channel; channel:', channel); + RPS._messenger.channels[channel] = {}; + } - RPS._messenger.channels[channel][observerKey] = true; - RPS._messenger.observers[observerKey] = channel; + RPS._messenger.channels[channel][observerKey] = true; + RPS._messenger.observers[observerKey] = channel; - RPS._sub(channel); - }, - removeObserver: function (observerKey) { - //console.log('RPS._messenger.removeObserver; observerKey:', observerKey); - var channel = RPS._messenger.observers[observerKey]; - if (channel) { - delete RPS._messenger.channels[channel][observerKey]; - if (_.isEmpty(RPS._messenger.channels[channel])) { - //console.log('RPS._messenger.removeObserver → remove channel; channel:', channel); - RPS._unsub(channel); - delete RPS._messenger.channels[channel]; - } - } - delete RPS._messenger.observers[observerKey]; - }, - onMessage: function (channel, message) { - //console.log('RPS._messenger.onMessage; channel, message:', channel, message); - _.each(RPS._messenger.channels[channel], function (flag, observerKey) { - var observer = RPS._observers[observerKey]; - if (observer) { - observer.onMessage(EJSON.clone(message)); - } - }); + RPS._sub(channel); + }, + removeObserver: function (observerKey) { + //console.log('RPS._messenger.removeObserver; observerKey:', observerKey); + var channel = RPS._messenger.observers[observerKey]; + if (channel) { + delete RPS._messenger.channels[channel][observerKey]; + if (_.isEmpty(RPS._messenger.channels[channel])) { + //console.log('RPS._messenger.removeObserver → remove channel; channel:', channel); + RPS._unsub(channel); + delete RPS._messenger.channels[channel]; + } } -}; \ No newline at end of file + delete RPS._messenger.observers[observerKey]; + }, + onMessage: function (channel, message) { + //console.log('RPS._messenger.onMessage; channel, message:', channel, message); + var channels = Object.keys(RPS._messenger.channels); + _.each(channels, function (openChannel, idx) { + //Support wildcard matching for open channels. + if (RPS._matchRuleShort(channel, openChannel)) { + _.each(RPS._messenger.channels[openChannel], function (flag, observerKey) { + var observer = RPS._observers[observerKey]; + if (observer) { + observer.onMessage(EJSON.clone(message)); + } + }); + } + }); + } +}; From 69f831a239da2852bf7b58b6af2458b675bc098c Mon Sep 17 00:00:00 2001 From: ggn06awu Date: Wed, 13 Jan 2016 09:19:04 +0000 Subject: [PATCH 2/6] Returning to 4 space indents ready for a pull. --- messenger.js | 91 ++++++++++++++++++++++++++-------------------------- 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/messenger.js b/messenger.js index 9550e40..f34d254 100644 --- a/messenger.js +++ b/messenger.js @@ -1,61 +1,62 @@ /** * Match a wildcard rule against and input string. * - * "a*b" => everything that starts with "a" and ends with "b" - " a*" => everything that starts with "a" - " *b" => everything that ends with "b" - " *a*" => everything that has a "a" in it - " *a*b*" => everything that has a "a" in it, followed by anything, followed by a "b", followed by anything + * "a*b" => everything that starts with "a" and ends with "b" + " a*" => everything that starts with "a" + " *b" => everything that ends with "b" + " *a*" => everything that has a "a" in it + " *a*b*" => everything that has a "a" in it, followed by anything, followed by a "b", followed by anything + * @param str - The full string to be matched against, e.g. my::channel::value * @param rule - A string to match with, which can include wildcards, e.g. my::channel::* * @returns {boolean} * @private */ RPS._matchRuleShort = function(str, rule) { - return new RegExp("^" + rule.replace("*", ".*") + "$").test(str); + return new RegExp("^" + rule.replace("*", ".*") + "$").test(str); }; RPS._messenger = { - channels: {}, - observers: {}, - addObserver: function (observerKey, channel) { - //console.log('RPS._messenger.addObserver; observerKey, channel:', observerKey, channel); - if (!RPS._messenger.channels[channel]) { - //console.log('RPS._messenger.addObserver → add channel; channel:', channel); - RPS._messenger.channels[channel] = {}; - } + channels: {}, + observers: {}, + addObserver: function (observerKey, channel) { + //console.log('RPS._messenger.addObserver; observerKey, channel:', observerKey, channel); + if (!RPS._messenger.channels[channel]) { + //console.log('RPS._messenger.addObserver → add channel; channel:', channel); + RPS._messenger.channels[channel] = {}; + } - RPS._messenger.channels[channel][observerKey] = true; - RPS._messenger.observers[observerKey] = channel; + RPS._messenger.channels[channel][observerKey] = true; + RPS._messenger.observers[observerKey] = channel; - RPS._sub(channel); - }, - removeObserver: function (observerKey) { - //console.log('RPS._messenger.removeObserver; observerKey:', observerKey); - var channel = RPS._messenger.observers[observerKey]; - if (channel) { - delete RPS._messenger.channels[channel][observerKey]; - if (_.isEmpty(RPS._messenger.channels[channel])) { - //console.log('RPS._messenger.removeObserver → remove channel; channel:', channel); - RPS._unsub(channel); - delete RPS._messenger.channels[channel]; - } - } - delete RPS._messenger.observers[observerKey]; - }, - onMessage: function (channel, message) { - //console.log('RPS._messenger.onMessage; channel, message:', channel, message); - var channels = Object.keys(RPS._messenger.channels); - _.each(channels, function (openChannel, idx) { - //Support wildcard matching for open channels. - if (RPS._matchRuleShort(channel, openChannel)) { - _.each(RPS._messenger.channels[openChannel], function (flag, observerKey) { - var observer = RPS._observers[observerKey]; - if (observer) { - observer.onMessage(EJSON.clone(message)); - } + RPS._sub(channel); + }, + removeObserver: function (observerKey) { + //console.log('RPS._messenger.removeObserver; observerKey:', observerKey); + var channel = RPS._messenger.observers[observerKey]; + if (channel) { + delete RPS._messenger.channels[channel][observerKey]; + if (_.isEmpty(RPS._messenger.channels[channel])) { + //console.log('RPS._messenger.removeObserver → remove channel; channel:', channel); + RPS._unsub(channel); + delete RPS._messenger.channels[channel]; + } + } + delete RPS._messenger.observers[observerKey]; + }, + onMessage: function (channel, message) { + //console.log('RPS._messenger.onMessage; channel, message:', channel, message); + var channels = Object.keys(RPS._messenger.channels); + _.each(channels, function (openChannel, idx) { + //Support wildcard matching for open channels. + if (RPS._matchRuleShort(channel, openChannel)) { + _.each(RPS._messenger.channels[openChannel], function (flag, observerKey) { + var observer = RPS._observers[observerKey]; + if (observer) { + observer.onMessage(EJSON.clone(message)); + } + }); + } }); - } - }); - } + } }; From 23642a42f50c1d98d537b8bd68b455306c490233 Mon Sep 17 00:00:00 2001 From: ggn06awu Date: Wed, 13 Jan 2016 11:07:32 +0000 Subject: [PATCH 3/6] Added an ID for mongo'less published documents Seems like the Redis routine wont work when sending messages through without some sort of ID. For something like an ephemeral chat message, there's no reason for an ID to exist, added this patch to let them fall through. --- observe-changes.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/observe-changes.js b/observe-changes.js index 6586eb6..888421f 100644 --- a/observe-changes.js +++ b/observe-changes.js @@ -153,6 +153,11 @@ RPS._observer.prototype.handleMessage = function (message, noPause) { //if (badTS) { // console.warn('RPS: RACE CONDITION! Don’t worry will fix it'); //} + + //If a message skips Mongo, it wont have any ID, yet the routien that follows relies on it. Forcing the issue for now. + if (message.withoutMongo && !message.id) { + message.id = message.ts + message._serverId; + } //console.log('RPS._observer.handleMessage; message, this.selector:', message, this.selector); var rightIds = this.needToFetchAlways && _.pluck(this.collection.find(this.selector, this.quickFindOptions).fetch(), '_id'), @@ -317,4 +322,4 @@ RPS._observer.prototype.kill = function () { delete RPS._observers[this.key]; this.initialized = false; //delete this.docs; -}; \ No newline at end of file +}; From e56e30f77ce0d8777a0c9e821d08e3e50427e4c9 Mon Sep 17 00:00:00 2001 From: ggn06awu Date: Sun, 3 Apr 2016 14:10:25 +0100 Subject: [PATCH 4/6] Update messenger.js The wildcard support was only working for one wildcard, this change allows support of multiple wildcards in the pattern as originally intended. --- messenger.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/messenger.js b/messenger.js index f34d254..7ac89ff 100644 --- a/messenger.js +++ b/messenger.js @@ -13,7 +13,7 @@ * @private */ RPS._matchRuleShort = function(str, rule) { - return new RegExp("^" + rule.replace("*", ".*") + "$").test(str); + return new RegExp("^" + rule.replace(/\*/g, ".*") + "$").test(str); }; RPS._messenger = { From 8b781545088511b89bf2e697b0cb0f9c5ae4f1a6 Mon Sep 17 00:00:00 2001 From: ggn06awu Date: Sun, 3 Apr 2016 14:23:59 +0100 Subject: [PATCH 5/6] Update package.js Restoring a unique package name from chatr's original code, to avoid meteor collisions. --- package.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.js b/package.js index a345b5e..331e696 100644 --- a/package.js +++ b/package.js @@ -1,5 +1,5 @@ Package.describe({ - name: 'chatra:redpubsub', + name: 'beeby:redpubsub', version: '0.8.6', // Brief, one-line summary of the package. summary: 'Custom pub/sub interface for Meteor on top of Redis', From a3c587004454c1ee5c3b3337775a14e2eb23d580 Mon Sep 17 00:00:00 2001 From: ggn06awu Date: Sun, 3 Apr 2016 14:24:34 +0100 Subject: [PATCH 6/6] Update package.js Correcting the GIT source. --- package.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.js b/package.js index 331e696..5947c60 100644 --- a/package.js +++ b/package.js @@ -4,7 +4,7 @@ Package.describe({ // Brief, one-line summary of the package. summary: 'Custom pub/sub interface for Meteor on top of Redis', // URL to the Git repository containing the source code for this package. - git: 'https://github.com/chatr/redpubsub.git', + git: 'https://github.com/ggn06awu/redpubsub.git', // By default, Meteor will default to using README.md for documentation. // To avoid submitting documentation, set this field to null. documentation: 'README.md'