v0.14.4, published 4 months ago


A custom pub/sub system that works through channels, so that not every oplog change hits every Meteor instance (which creates an exponential scaling problem). It uses Redis to communicate between Meteor processes.

This package implements custom APIs for:

  1. Writing data into the database and notifying the pub/sub channel(s) about the change.
  2. Publishing data through a mechanism that subscribes to a pub/sub channel instead of using Meteor's oplog tailing.
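The two pieces above can be pictured with a small stand-in. This is only a sketch: Node's `EventEmitter` plays the role of Redis pub/sub, and `notify` and `listen` are hypothetical names, not part of the package. The point it illustrates is that a listener sees only the changes sent to its own channel.

```javascript
const { EventEmitter } = require('events');

// Stand-in for Redis pub/sub: each event name plays the role of a channel.
const bus = new EventEmitter();

// Hypothetical writer side: after a write, tell the channel what changed.
function notify(channel, method, selector, modifier) {
  bus.emit(channel, { method: method, selector: selector, modifier: modifier });
}

// Hypothetical publication side: collect messages from one channel only.
function listen(channel) {
  const received = [];
  bus.on(channel, (message) => received.push(message));
  return received;
}

const clientA = listen('messagesByClientId:a');
const clientB = listen('messagesByClientId:b');

notify('messagesByClientId:a', 'update', { _id: 'm1' }, { $set: { updated: true } });

console.log(clientA.length, clientB.length); // prints "1 0"
```

The listener for client `b` never has to inspect the change made for client `a`, which is the saving compared with processing every change.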

Most of the performance improvement comes from splitting changes into separate channels, so server publications process only the changes from the channels they are interested in rather than every single change, as Meteor does by default. The package also queries the database as little as possible: every observer receives the method, selector, and modifier and tries to apply the change to its documents in memory. It falls back to fetching from the database only when it cannot be sure the in-memory result would be accurate (a complicated modifier, a race condition, or limit, skip, or sort options). In addition, redpubsub subscriptions reuse observers with the same options, and observers reuse Redis channels.
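The in-memory path can be sketched roughly as follows. This is not the package's code: `needsRefetch` and `applyInMemory` are hypothetical names, only a plain top-level `$set` is treated as safe, and the real fallback rules (race conditions, for instance) are more involved than this.

```javascript
// Hypothetical check: can an observer patch its cached docs in memory,
// or does it have to re-fetch from the database?
function needsRefetch(modifier, queryOptions) {
  const operators = Object.keys(modifier);
  const onlySet = operators.length === 1 && operators[0] === '$set';
  if (!onlySet) return true; // anything but a plain $set counts as "complicated" here

  // Dotted keys ("a.b") would need nested updates, so treat them as uncertain too.
  const dottedKey = Object.keys(modifier.$set).some((key) => key.includes('.'));

  // limit, skip, or sort can change which docs belong in the result set.
  const opts = queryOptions || {};
  const ordered = ['limit', 'skip', 'sort'].some((name) => opts[name] !== undefined);

  return dottedKey || ordered;
}

// Apply a plain {$set: {...}} modifier to a cached document (top-level fields only).
function applyInMemory(doc, modifier) {
  return Object.assign({}, doc, modifier.$set);
}

const cached = { _id: 'm1', message: 'hi', updated: false };
const modifier = { $set: { message: 'hello', updated: true } };

if (!needsRefetch(modifier, {})) {
  console.log(applyInMemory(cached, modifier));
}
console.log(needsRefetch({ $inc: { views: 1 } }, {})); // prints true
console.log(needsRefetch(modifier, { limit: 10 }));    // prints true
```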

This all works well in Chatra. Performance improved to the point where we no longer worry about it (not any time soon, at least). Right now ≈300 active sessions produce about 5% CPU load on a single machine; before this implementation, ≈150 sessions cost us about 75% CPU.


meteor add chatra:redpubsub


This package uses Redis pub/sub as the communication channel between nodes. You need redis-server running locally during development and the RPS_REDIS_URL environment variable set in production.
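As a rough illustration of that setup, the sketch below resolves a Redis endpoint from the environment. `redisTarget` is a hypothetical helper, and both the `redis://` URL form and the fallback to a local server on port 6379 are assumptions; how the package itself reads RPS_REDIS_URL is not shown in this document.

```javascript
// Hypothetical helper: pick the Redis endpoint from the environment.
// Assumption: fall back to a local redis-server when RPS_REDIS_URL is unset.
function redisTarget(env) {
  const url = new URL(env.RPS_REDIS_URL || 'redis://127.0.0.1:6379');
  return { host: url.hostname, port: Number(url.port) || 6379 };
}

console.log(redisTarget(process.env));
```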

If you are new to Redis, read this guide.


RPS.write(collection, methodName, [options], [callback]) (server & client simulation)

Insert a doc synchronously:

var newMessageId = RPS.write(Messages, 'insert', {
    doc: {
      message: messageString,
      ts: Date.now(), // value missing in the source; Date.now() is an assumption
      clientId: clientId
    }
});

Update asynchronously (callback is passed):

RPS.write(Messages, 'update', {
    selector: {_id: messageId},
    modifier: {$set: {message: messageString, updated: true}}
}, function (error, result) {
  if (error) console.warn(error);
});

Send ephemeral DB-less typing signal to listeners:

RPS.write(Typings, 'upsert', {
    selector: {_id: clientId},
    modifier: {$set: {isTyping: true}},
    withoutMongo: true // do not touch Mongo at all
});

Note that if you call RPS.write only on the client (for example, outside of a universal method), channels won’t be notified about the change.

RPS.config[collectionName] = options; (server)

Configure which channel(s) to notify via the RPS.config object:

RPS.config.testCollection = {
  channels: ['testCollection', 'anotherStaticChannel']
};

Define a channel dynamically:

RPS.config.Clients = {
  channels: function (doc, selector) {
    return 'clientById:' + doc._id;
  }
};

Note that the selector in the example above is taken from the RPS.write call.

To compute the channel name, use the doc and selector arguments:

RPS.config.Clients = {
  channels: function (doc, selector) {
    return doc && doc.hostId && 'clientsByHostId:' + doc.hostId;
  }
};
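The `channels` option appears in several shapes here: an array of names and a function returning a name (or a falsy value when the doc has no hostId). One way to picture how such a value could be turned into a list of channels to notify is the sketch below. `resolveChannels` is a hypothetical helper, not the package's internal code, and dropping falsy results is an assumption about how a missing channel name is handled.

```javascript
// Hypothetical helper: normalize a `channels` option (string, array, or
// function) into a flat list of channel names, dropping falsy results.
function resolveChannels(channels, doc, selector) {
  const value = typeof channels === 'function' ? channels(doc, selector) : channels;
  const list = Array.isArray(value) ? value : [value];
  return list.filter(Boolean);
}

const staticConfig = { channels: ['testCollection', 'anotherStaticChannel'] };
const dynamicConfig = {
  channels: function (doc, selector) {
    return doc && doc.hostId && 'clientsByHostId:' + doc.hostId;
  }
};

console.log(resolveChannels(staticConfig.channels, {}, {}));
// prints [ 'testCollection', 'anotherStaticChannel' ]
console.log(resolveChannels(dynamicConfig.channels, { _id: 'c1', hostId: 'h7' }, { _id: 'c1' }));
// prints [ 'clientsByHostId:h7' ]
console.log(resolveChannels(dynamicConfig.channels, { _id: 'c2' }, { _id: 'c2' }));
// prints []
```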

RPS.publish(subscription, [request1, request2...]) (server)

Use it inside Meteor.publish:

Meteor.publish('messages', function (clientId) {
    RPS.publish(this, {
        collection: Messages,
        options: {
            selector: {clientId: clientId},
            options: {fields: {secretAdminNote: 0}},

            // channel to listen to
            channel: 'messagesByClientId:' + clientId
        }
    });
});

Publish two or more subscriptions:

Meteor.publish('client', function (clientId) {
    RPS.publish(this, [
        {
            collection: Clients,
            options: {
                selector: {_id: clientId},
                channel: 'clientById:' + clientId
            }
        },
        {
            collection: Typings,
            options: {
                selector: {_id: clientId},
                channel: 'typingByClientId:' + clientId,
                withoutMongo: true
            }
        }
    ]);
});
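The overview mentions that subscriptions with the same options reuse observers. A minimal way to picture that kind of sharing is a reference-counted pool keyed by collection and options. `ObserverPool` is a hypothetical name and this is a sketch of the general idea, not the package's implementation; in particular, keying on `JSON.stringify` is a simplification that depends on property order.

```javascript
// Hypothetical sketch of observer reuse: one observer per distinct
// (collection, options) key, shared by every subscription that asks for it.
class ObserverPool {
  constructor() {
    this.entries = new Map(); // key -> { refs, observer }
  }

  acquire(collectionName, options) {
    const key = collectionName + ':' + JSON.stringify(options);
    let entry = this.entries.get(key);
    if (!entry) {
      entry = { refs: 0, observer: { key: key } }; // stand-in for a real observer
      this.entries.set(key, entry);
    }
    entry.refs += 1;

    let released = false;
    return {
      observer: entry.observer,
      release: () => {
        if (released) return; // releasing twice must not double-decrement
        released = true;
        entry.refs -= 1;
        if (entry.refs === 0) this.entries.delete(key); // last user gone
      }
    };
  }
}

const pool = new ObserverPool();
const first = pool.acquire('Clients', { selector: { _id: 'c1' }, channel: 'clientById:c1' });
const second = pool.acquire('Clients', { selector: { _id: 'c1' }, channel: 'clientById:c1' });

console.log(first.observer === second.observer); // prints true
first.release();
second.release();
console.log(pool.entries.size); // prints 0
```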

RPS.observeChanges(collection, options, callbacks) (server)

It behaves just like Meteor’s cursor.observeChanges:

var count = 0;
var handler = RPS.observeChanges(Hits, {selector: {siteId: siteId}, options: {fields: {_id: 1}}}, {
    added: function (id, fields) {
      count++;
    },
    removed: function (id) {
      count--;
    }
    // don't care about changed
});

// stop it when you need:
// handler.stop();

To test in your local app while developing the package:

ln -s ~/Projects/Chatra/redpubsub packages/redpubsub