diff --git a/messenger.js b/messenger.js index 083bb4a..7e6f73c 100644 --- a/messenger.js +++ b/messenger.js @@ -1,3 +1,21 @@ +/** + * Match a wildcard rule against and input string. + * + * "a*b" => everything that starts with "a" and ends with "b" + " a*" => everything that starts with "a" + " *b" => everything that ends with "b" + " *a*" => everything that has a "a" in it + " *a*b*" => everything that has a "a" in it, followed by anything, followed by a "b", followed by anything + + * @param str - The full string to be matched against, e.g. my::channel::value + * @param rule - A string to match with, which can include wildcards, e.g. my::channel::* + * @returns {boolean} + * @private + */ +RPS._matchRuleShort = function(str, rule) { + return new RegExp("^" + rule.replace(/\*/g, ".*") + "$").test(str); +}; + var Fiber = Npm.require('fibers'); RPS._messenger = { @@ -30,17 +48,23 @@ RPS._messenger = { }, onMessage: function (channel, message, runWithFiber) { //console.log('RPS._messenger.onMessage; channel, message:', channel, message); - _.each(RPS._messenger.channels[channel], function (flag, observerKey) { - var observer = RPS._observers[observerKey]; - if (observer) { - var messageClone = EJSON.clone(message); - if (runWithFiber) { - Fiber(function () { - observer.onMessage(messageClone); - }).run(); - } else { - observer.onMessage(messageClone); - } + var channels = Object.keys(RPS._messenger.channels); + _.each(channels, function (openChannel, idx) { + //Support wildcard matching for open channels. + if (RPS._matchRuleShort(channel, openChannel)) { + _.each(RPS._messenger.channels[openChannel], function (flag, observerKey) { + var observer = RPS._observers[observerKey]; + if (observer) { + var messageClone = EJSON.clone(message); + if (runWithFiber) { + Fiber(function () { + observer.onMessage(messageClone); + }).run(); + } else { + observer.onMessage(messageClone); + } + } + }) } }); } diff --git a/observe-changes.js b/observe-changes.js index a5edc2a..8ffb2ac 100644 --- a/observe-changes.js +++ b/observe-changes.js @@ -169,6 +169,19 @@ RPS._observer.prototype.onMessage = function (message) { RPS._observer.prototype.handleMessage = function (message) { //noPause || this.pause(); + // fight against race condition + var badTS = this.lastTS >= message.ts; + this.lastTS = badTS ? this.lastTS : message.ts; + + //if (badTS) { + // console.warn('RPS: RACE CONDITION! Don’t worry will fix it'); + //} + + //If a message skips Mongo, it wont have any ID, yet the routien that follows relies on it. Forcing the issue for now. + if (message.withoutMongo && !message.id) { + message.id = message.ts + message._serverId; + } + //console.log('RPS._observer.handleMessage; message, this.selector:', message, this.selector); var rightIds, ids = !message.id || _.isArray(message.id) ? message.id : [message.id]; @@ -350,4 +363,4 @@ RPS._observer.prototype.kill = function () { delete RPS._observers[this.key]; this.initialized = false; //delete this.docs; -}; \ No newline at end of file +}; diff --git a/package.js b/package.js index a345b5e..5947c60 100644 --- a/package.js +++ b/package.js @@ -1,10 +1,10 @@ Package.describe({ - name: 'chatra:redpubsub', + name: 'beeby:redpubsub', version: '0.8.6', // Brief, one-line summary of the package. summary: 'Custom pub/sub interface for Meteor on top of Redis', // URL to the Git repository containing the source code for this package. - git: 'https://github.com/chatr/redpubsub.git', + git: 'https://github.com/ggn06awu/redpubsub.git', // By default, Meteor will default to using README.md for documentation. // To avoid submitting documentation, set this field to null. documentation: 'README.md'