chatra:redpubsub

v1.0.1•Published 3 months ago

chatra:redpubsub

Version License

Custom pub/sub interface for Meteor on top of Redis, updated for Meteor 3 compatibility.

Table of Contents


Introduction

The chatra:redpubsub package provides a custom publish/subscribe interface for Meteor applications, leveraging Redis for real-time data synchronization across multiple server instances. This package is especially useful for scaling Meteor applications horizontally, ensuring that all instances remain in sync.


Features

  • Real-Time Data Synchronization: Uses Redis channels to synchronize data changes across multiple Meteor server instances.
  • Custom Channels: Configure custom channels for fine-grained control over data publication.
  • Asynchronous Operations: Fully supports asynchronous MongoDB operations introduced in Meteor 3.
  • No Fibers Dependency

Installation

meteor add chatra:redpubsub

Ensure that you have Redis installed and running. Set the RPS_REDIS_URL environment variable to point to your Redis instance:

export RPS_REDIS_URL=redis://localhost:6379

Compatibility

  • Meteor version 3 and above: Fully compatible, using the new asynchronous Meteor collections’ methods.

Configuration

Setting Up Redis Connection

The package uses the RPS_REDIS_URL environment variable to connect to your Redis instance. The URL should be in the format:

redis://:[password]@[hostname]:[port]

Example:

export RPS_REDIS_URL=redis://localhost:6379

Configuring Channels

You can configure channels on a per-collection basis using RPS.config:

1// server/main.js
2import { RPS } from 'meteor/chatra:redpubsub';
3
4RPS.config['collectionName'] = {
5  channels: (doc, selector, fields) => {
6    // Return a channel name or an array of channel names
7    return `custom_channel_${doc.userId}`;
8  },
9};

Usage

Server Side

Publishing Data

Use RPS.publish to publish data to clients:

1// server/main.js
2import { Meteor } from 'meteor/meteor';
3import { RPS } from 'meteor/chatra:redpubsub';
4import { CollectionName } from '/imports/api/collectionName';
5
6Meteor.publish('collectionName', function () {
7  return RPS.publish(this, {
8    collection: CollectionName,
9    options: {
10      selector: {}, // MongoDB selector
11      options: {},  // Find options
12    },
13  });
14});

Configuring Channels

You can configure custom channels for a collection:

1// server/main.js
2RPS.config['collectionName'] = {
3  channels: (doc) => `channel_${doc.userId}`,
4};

Using RPS.write on the Server

Perform write operations and automatically publish changes:

1// server/main.js
2import { RPS } from 'meteor/chatra:redpubsub';
3import { CollectionName } from '/imports/api/collectionName';
4
5async function updateDocument(docId, updateFields) {
6  const options = {
7    selector: { _id: docId },
8    modifier: { $set: updateFields },
9    options: {}, // MongoDB update options
10  };
11
12  try {
13    const result = await RPS.write(CollectionName, 'update', options);
14    console.log('Document updated:', result);
15  } catch (err) {
16    console.error('Error updating document:', err);
17  }
18}

Publishing with withoutMongo Option

This option allows you to publish changes without querying the database after the write operation.

1// server/main.js
2import { Meteor } from 'meteor/meteor';
3import { RPS } from 'meteor/chatra:redpubsub';
4import { MyCollection } from '/imports/api/myCollection';
5
6Meteor.publish('withoutMongoPub', function () {
7  return RPS.publish(this, {
8    collection: MyCollection,
9    options: {
10      selector: { active: true },
11      withoutMongo: true,  // Disable additional Mongo query after write
12    },
13  });
14});

Publishing Multiple Collections Simultaneously

You can pass an array of publication requests to RPS.publish to publish multiple collections at once:

1// server/main.js
2import { Meteor } from 'meteor/meteor';
3import { RPS } from 'meteor/chatra:redpubsub';
4import { CollectionOne } from '/imports/api/collectionOne';
5import { CollectionTwo } from '/imports/api/collectionTwo';
6
7Meteor.publish('multiCollections', function () {
8  return RPS.publish(this, [
9    {
10      collection: CollectionOne,
11      options: { selector: {} },
12    },
13    {
14      collection: CollectionTwo,
15      options: { selector: {} },
16    },
17  ]);
18});

Server-Side observeChanges

You can directly call RPS.observeChanges on the server to perform custom actions when data changes occur:

1// server/observe.js
2import { RPS } from 'meteor/chatra:redpubsub';
3import { CollectionName } from '/imports/api/collectionName';
4
5async function observeServerChanges() {
6  const handler = await RPS.observeChanges(
7    CollectionName,
8    { selector: {} },
9    {
10      added: (id, fields) => {
11        console.log('Document added:', id, fields);
12      },
13      changed: (id, fields) => {
14        console.log('Document changed:', id, fields);
15      },
16      removed: (id) => {
17        console.log('Document removed:', id);
18      },
19    }
20  );
21
22  // To stop observing:
23  // handler.stop();
24}
25
26observeServerChanges();

Client Side

Subscribing to Data

Subscribe to the published data:

1// client/main.js
2import { Meteor } from 'meteor/meteor';
3import { CollectionName } from '/imports/api/collectionName';
4
5Meteor.subscribe('collectionName');

Using RPS.write on the Client

Perform write operations from the client:

1// client/main.js
2import { RPS } from 'meteor/chatra:redpubsub';
3import { CollectionName } from '/imports/api/collectionName';
4
5async function insertDocument(doc) {
6  try {
7    const result = await RPS.write(CollectionName, 'insert', { doc });
8    console.log('Document inserted:', result);
9  } catch (err) {
10    console.error('Error inserting document:', err);
11  }
12}

Examples

Full Example

Server

1// server/main.js
2import { Meteor } from 'meteor/meteor';
3import { RPS } from 'meteor/chatra:redpubsub';
4import { Messages } from '/imports/api/messages';
5
6RPS.config['messages'] = {
7  channels: (doc) => `user_${doc.userId}_channel`,
8};
9
10Meteor.publish('userMessages', function () {
11  const userId = this.userId;
12  if (!userId) {
13    return this.ready();
14  }
15
16  return RPS.publish(this, {
17    collection: Messages,
18    options: {
19      selector: { userId },
20    },
21  });
22});
23
24Meteor.methods({
25  async 'messages.insert'(text) {
26    const userId = this.userId;
27    if (!userId) {
28      throw new Meteor.Error('Not authorized');
29    }
30
31    const doc = {
32      text,
33      userId,
34      createdAt: new Date(),
35    };
36
37    return await RPS.write(Messages, 'insert', { doc });
38  },
39});

Client

1// client/main.js
2import { Meteor } from 'meteor/meteor';
3import { Messages } from '/imports/api/messages';
4
5Meteor.subscribe('userMessages');
6
7Messages.find().observeChanges({
8  added(id, fields) {
9    console.log('Message added:', id, fields);
10  },
11  changed(id, fields) {
12    console.log('Message changed:', id, fields);
13  },
14  removed(id) {
15    console.log('Message removed:', id);
16  },
17});
18
19async function sendMessage(text) {
20  try {
21    await Meteor.callAsync('messages.insert', text);
22    console.log('Message sent');
23  } catch (err) {
24    console.error('Error sending message:', err);
25  }
26}

License

This package is licensed under the MIT License.