chatra:redpubsub
Custom pub/sub interface for Meteor on top of Redis, updated for Meteor 3 compatibility.
Table of Contents
Introduction
The chatra:redpubsub
package provides a custom publish/subscribe interface for Meteor applications, leveraging Redis for real-time data synchronization across multiple server instances. This package is especially useful for scaling Meteor applications horizontally, ensuring that all instances remain in sync.
Features
- Real-Time Data Synchronization: Uses Redis channels to synchronize data changes across multiple Meteor server instances.
- Custom Channels: Configure custom channels for fine-grained control over data publication.
- Asynchronous Operations: Fully supports asynchronous MongoDB operations introduced in Meteor 3.
- No Fibers Dependency
Installation
meteor add chatra:redpubsub
Ensure that you have Redis installed and running. Set the RPS_REDIS_URL
environment variable to point to your Redis instance:
export RPS_REDIS_URL=redis://localhost:6379
Compatibility
- Meteor version 3 and above: Fully compatible, using the new asynchronous Meteor collections’ methods.
Configuration
Setting Up Redis Connection
The package uses the RPS_REDIS_URL
environment variable to connect to your Redis instance. The URL should be in the format:
redis://:[password]@[hostname]:[port]
Example:
export RPS_REDIS_URL=redis://localhost:6379
Configuring Channels
You can configure channels on a per-collection basis using RPS.config
:
1// server/main.js 2import { RPS } from 'meteor/chatra:redpubsub'; 3 4RPS.config['collectionName'] = { 5 channels: (doc, selector, fields) => { 6 // Return a channel name or an array of channel names 7 return `custom_channel_${doc.userId}`; 8 }, 9};
Usage
Server Side
Publishing Data
Use RPS.publish
to publish data to clients:
1// server/main.js 2import { Meteor } from 'meteor/meteor'; 3import { RPS } from 'meteor/chatra:redpubsub'; 4import { CollectionName } from '/imports/api/collectionName'; 5 6Meteor.publish('collectionName', function () { 7 return RPS.publish(this, { 8 collection: CollectionName, 9 options: { 10 selector: {}, // MongoDB selector 11 options: {}, // Find options 12 }, 13 }); 14});
Configuring Channels
You can configure custom channels for a collection:
1// server/main.js 2RPS.config['collectionName'] = { 3 channels: (doc) => `channel_${doc.userId}`, 4};
Using RPS.write
on the Server
Perform write operations and automatically publish changes:
1// server/main.js 2import { RPS } from 'meteor/chatra:redpubsub'; 3import { CollectionName } from '/imports/api/collectionName'; 4 5async function updateDocument(docId, updateFields) { 6 const options = { 7 selector: { _id: docId }, 8 modifier: { $set: updateFields }, 9 options: {}, // MongoDB update options 10 }; 11 12 try { 13 const result = await RPS.write(CollectionName, 'update', options); 14 console.log('Document updated:', result); 15 } catch (err) { 16 console.error('Error updating document:', err); 17 } 18}
Publishing with withoutMongo
Option
This option allows you to publish changes without querying the database after the write operation.
1// server/main.js 2import { Meteor } from 'meteor/meteor'; 3import { RPS } from 'meteor/chatra:redpubsub'; 4import { MyCollection } from '/imports/api/myCollection'; 5 6Meteor.publish('withoutMongoPub', function () { 7 return RPS.publish(this, { 8 collection: MyCollection, 9 options: { 10 selector: { active: true }, 11 withoutMongo: true, // Disable additional Mongo query after write 12 }, 13 }); 14});
Publishing Multiple Collections Simultaneously
You can pass an array of publication requests to RPS.publish
to publish multiple collections at once:
1// server/main.js 2import { Meteor } from 'meteor/meteor'; 3import { RPS } from 'meteor/chatra:redpubsub'; 4import { CollectionOne } from '/imports/api/collectionOne'; 5import { CollectionTwo } from '/imports/api/collectionTwo'; 6 7Meteor.publish('multiCollections', function () { 8 return RPS.publish(this, [ 9 { 10 collection: CollectionOne, 11 options: { selector: {} }, 12 }, 13 { 14 collection: CollectionTwo, 15 options: { selector: {} }, 16 }, 17 ]); 18});
Server-Side observeChanges
You can directly call RPS.observeChanges
on the server to perform custom actions when data changes occur:
1// server/observe.js 2import { RPS } from 'meteor/chatra:redpubsub'; 3import { CollectionName } from '/imports/api/collectionName'; 4 5async function observeServerChanges() { 6 const handler = await RPS.observeChanges( 7 CollectionName, 8 { selector: {} }, 9 { 10 added: (id, fields) => { 11 console.log('Document added:', id, fields); 12 }, 13 changed: (id, fields) => { 14 console.log('Document changed:', id, fields); 15 }, 16 removed: (id) => { 17 console.log('Document removed:', id); 18 }, 19 } 20 ); 21 22 // To stop observing: 23 // handler.stop(); 24} 25 26observeServerChanges();
Client Side
Subscribing to Data
Subscribe to the published data:
1// client/main.js 2import { Meteor } from 'meteor/meteor'; 3import { CollectionName } from '/imports/api/collectionName'; 4 5Meteor.subscribe('collectionName');
Using RPS.write
on the Client
Perform write operations from the client:
1// client/main.js 2import { RPS } from 'meteor/chatra:redpubsub'; 3import { CollectionName } from '/imports/api/collectionName'; 4 5async function insertDocument(doc) { 6 try { 7 const result = await RPS.write(CollectionName, 'insert', { doc }); 8 console.log('Document inserted:', result); 9 } catch (err) { 10 console.error('Error inserting document:', err); 11 } 12}
Examples
Full Example
Server
1// server/main.js 2import { Meteor } from 'meteor/meteor'; 3import { RPS } from 'meteor/chatra:redpubsub'; 4import { Messages } from '/imports/api/messages'; 5 6RPS.config['messages'] = { 7 channels: (doc) => `user_${doc.userId}_channel`, 8}; 9 10Meteor.publish('userMessages', function () { 11 const userId = this.userId; 12 if (!userId) { 13 return this.ready(); 14 } 15 16 return RPS.publish(this, { 17 collection: Messages, 18 options: { 19 selector: { userId }, 20 }, 21 }); 22}); 23 24Meteor.methods({ 25 async 'messages.insert'(text) { 26 const userId = this.userId; 27 if (!userId) { 28 throw new Meteor.Error('Not authorized'); 29 } 30 31 const doc = { 32 text, 33 userId, 34 createdAt: new Date(), 35 }; 36 37 return await RPS.write(Messages, 'insert', { doc }); 38 }, 39});
Client
1// client/main.js 2import { Meteor } from 'meteor/meteor'; 3import { Messages } from '/imports/api/messages'; 4 5Meteor.subscribe('userMessages'); 6 7Messages.find().observeChanges({ 8 added(id, fields) { 9 console.log('Message added:', id, fields); 10 }, 11 changed(id, fields) { 12 console.log('Message changed:', id, fields); 13 }, 14 removed(id) { 15 console.log('Message removed:', id); 16 }, 17}); 18 19async function sendMessage(text) { 20 try { 21 await Meteor.callAsync('messages.insert', text); 22 console.log('Message sent'); 23 } catch (err) { 24 console.error('Error sending message:', err); 25 } 26}
License
This package is licensed under the MIT License.