jam:pub-sub

v0.4.2Published last week

PubSub

jam:pub-sub brings four key features to Meteor apps:

  1. Method-based publish / subscribe
  2. Change Streams-based publish / subscribe + server-side data caching
  3. Subscription caching
  4. Configuring server state (optional)

Method-based publish / subscribe

Meteor's traditional publish / subscribe is truly wonderful. However, there is a cost to pushing updates reactively to all connected clients – it's resource intensive and will eventually place limits on your ability to scale your app.

One way to reduce the need for the traditional publish / subscribe is to fetch the data via a Meteor Method but there's a big problem here: the data won't be automatically merged into Minimongo and you completely lose Meteor's magical reactivity. Minimongo is great to work with and makes things easy as the source of truth on the client. Without it, you'll need to create your own stores on the client and essentially duplicate Minimongo.

With jam:pub-sub, you use Meteor.publish.once and the same Meteor.subscribe to have the data fetched via a Meteor Method and merged automatically in Minimongo so you can work with it as you're accustomed to. It also automatically preserves reactivity for the user when they make database writes. Note that these writes will not be broadcast in realtime to all connected clients by design but in many cases you might find that you don't need that feature of Meteor's traditional publish / subscribe.

Change Streams-based publish / subscribe

Beta

With jam:pub-sub and MongoDB Change Streams, you can preserve Meteor's magical reactivity for all clients while opting out of the traditional publish / subscribe and its use of the oplog. Use Meteor.publish.stream instead of using Meteor.publish and subscribe using the same Meteor.subscribe on the client.

Important: Change Streams will work best when the filter you use can be shared. To that end, if you have a publication that includes a userId, this package will filter out that condition when setting up the Change Stream because it will result in too many unique change streams. As an example, lets say you have this publication:

1Meteor.publish.stream('todos', function() {
2  return Todos.find({
3    $or: [
4      { isPrivate: false },
5      { owner: this.userId }
6    ]
7  });
8});

When this publication is invoked, it will fetch all the Todos that match the filter above and then begin watching a Change Stream with this filter:

1  { isPrivate: false }

A good rule of thumb: if you have a filter than relies on this.userId, you should use .once. If needed, you can split a compound publication into two publications, using a .stream with a filter than can be shared and a .once for the userId:

1Meteor.publish.stream('todos.public', function() {
2  return Todos.find({ isPrivate: false });
3});
4
5Meteor.publish.once('todos.private', function() {
6  return Todos.find({ isPrivate: true, owner: this.userId });
7});

The data will be merged correctly into Minimongo so you can keep one compound filter there if you'd like.

Note: In most cases, you'd likely benefit the most from using Meteor.publish.once anywhere you can and using Meteor.publish.stream only when you really need it and with a filter than can be shared.

Note: If you decide to entirely opt-out of using the traditional Meteor.publish, then you'll also want to disable the oplog entirely — add the disable-oplog package with meteor add disable-oplog.

At the moment, this feature is considered in a beta state. Based on previous Change Streams experiments by the Meteor Community, it seems that using Change Streams as a wholesale replacement for the traditional publish / subscribe could "just work". However, in practice it may be a "Your Mileage May Vary" type of situation depending on the frequency of writes, number of connected clients, how you model your data, and how you set up the cursors inside of Meteor.publish.stream. With that said, if you're interested in this feature, I'd encourage you to try it out and share your findings.

Caching .stream data on the server

By default, the intial set of documents requested by a .stream are cached and kept in sync as data changes. For example, let's say you set up a stream for a chat room. When the first person connects, they'll fetch the data for the room, e.g. the most recent 20 messages based on your sort and limit, and establish a change stream for the room.

1Meteor.publish.stream('messages.room', function({ roomId, sort, limit }) {
2  // check the data
3  return Messages.find({ roomId }, { sort, limit });
4});

As new messages are inserted / updated / deleted, that set of the 20 most recent messages will be kept in sync so that when others join the room, they'll pull directly from the cache instead of hitting the database. As users scroll back through the message history, the historical documents will not be cached to avoid expending server resources.

So instead of the traditional Meteor behavior of keeping all state for every client + 1 for the observer, you'll be only keeping the 20 most recent documents. This should be a big help in freeing up server and db resources and reducing latency.

Subscription caching

Normally, when a user moves between routes or components, the subscriptions will be stopped. When a user is navigating back and forth in your app, each time will result in a re-subscribe which means more spinners, a slower experience, and is generally a waste.

By caching your subscriptions, you can create a better user experience for your users. Since the subscription itself is being cached, the data in Minimongo will be updated in the background until the cacheDuration expires for that subscription at which point it will be stopped and the data will be removed from Minimongo as expected.

Configuring server state

You can configure the amount of state you want to keep on the server by setting serverState. It can be one of auto | standard | minimal | none. By default, it's set to auto.

Note: The users and roles collections are excluded from the below. They will use the standard Meteor behavior.

auto

  • If you are exclusively using .streams for a collection, it will disable Meteor's mergebox and only keep the set of documents as detailed above in Caching .stream data on the server. The tradeoff we're making here is substanially reduced server memory and CPU usage for potentially higher bandwidth.
  • If you are only using traditional publications, no changes will be made from the current behavior.

standard

  • Preserves the standard Meteor behavior of keeping state for all clients + 1 for the observer for publications and diffing oplog data.

minimal

none

  • Does not keep any server state. Disables mergebox completely. See NO_MERGE_NO_HISTORY for more info.

Usage

Add the package to your app

meteor add jam:pub-sub

Define a Method-based publication

Define a publication using Meteor.publish.once and subscribe just as you do currently. Meteor.publish.once expects you to return a cursor or an array of cursors just like Meteor.publish.

1// server
2Meteor.publish.once('notes.all', function() {
3  return Notes.find();
4});
1// client
2// Since each view layer (Blaze, React, Svelte, Vue, etc) has a different way of using `Tracker.autorun`, I've omitted it for brevity. You'd subscribe just as you do currently in your view layer of choice.
3Meteor.subscribe('notes.all')
4
5// work with the Notes collection in Minimongo as you're accustomed to
6Notes.find().fetch();

That's it. By using Meteor.publish.once, it will fetch the data initally and automatically merge it into Minimongo. Any database writes to the Notes collection will be sent reactively to the user that made the write.

Important: when naming your publications be sure to include the collection name(s) in it. This is generally common practice and this package relies on that convention. If you don't do this and you're caching the subscription, Minimongo data may be unexpectedly removed or retained when the subscription stops. It's recommended that you follow this convention for all publications including Meteor.publish. Here are some examples of including the collection name in the publication name:

1// the name you assign inside Mongo.Collection should be in your publication name(s), in this example 'notes'
2const Notes = new Mongo.Collection('notes')
3
4// as long as it appears somewhere in your publication name, you're good to go. here are some examples:
5Meteor.publish.once('notes');
6Meteor.publish.once('notes.all');
7Meteor.publish.once('notes/all');
8Meteor.publish.once('allNotes');

It also works just as you'd expect for an array of cursors:

1// server
2Meteor.publish.once('notes.todos.all', function() {
3  return [Notes.find(), Todos.find()];
4});
1// client
2Meteor.subscribe('notes.todos.all');
3
4// work with the Notes collection in Minimongo as you're accustomed to
5Notes.find().fetch();
6
7// work with the Todos collection in Minimongo as you're accusomted to
8Todos.find().fetch();

Inside Meteor.publish.once, this.userId and this.added can still be used. The added document will be included in the final result data. The rest of the low-level publish API will be disregarded, as they no longer fit into the context of a Method-based data fetch.

1Meteor.publish.once('notes.all', function() {
2  // ... //
3  const userId = this.userId;
4
5  this.added('notes', _id, fields);
6  // ... //
7  return Notes.find();
8})

Define a Change Streams-based publication

Define a publication using Meteor.publish.stream and subscribe just as you do currently. Meteor.publish.stream expects you to return a cursor or an array of cursors just like Meteor.publish.

1// server
2Meteor.publish.stream('notes.all', function() {
3  return Notes.find();
4});
1// client
2// Since each view layer (Blaze, React, Svelte, Vue, etc) has a different way of using `Tracker.autorun`, I've omitted it for brevity. You'd subscribe just as you do currently in your view layer of choice.
3Meteor.subscribe('notes.all')
4
5// work with the Notes collection in Minimongo as you're accustomed to
6Notes.find().fetch();

That's it. By using Meteor.publish.stream, any database writes to the Notes collection will be sent reactively to all connected clients just as with Meteor.publish.

Setting the maxPoolSize for Change Streams

maxPoolSize defaults to 100 which may not need adjusting. If you need to adjust it, you can set it in Meteor.settings like this:

1{
2  //...//
3  "packages": {
4    "mongo": {
5      "options": {
6        "maxPoolSize": 200 // or whatever is appropriate for your application
7      }
8    }
9  }
10  // ... //
11}

Turn on subscription caching

With jam:pub-sub, you can enable subscription caching globally or at a per-subscription level. Subscription caching is turned off by default to preserve the current behavior in Meteor. Any subscription can be cached, regardless of how it's published.

To enable subscription caching globally for every subscription:

1// put this in a file that's imported on the client at a minimum. it can be used isomorphically but the configuration only applies to the client.
2import { PubSub } from 'meteor/jam:pub-sub';
3
4PubSub.configure({
5  cache: true // defaults to false
6});

The global cacheDuration is set to 60 seconds by default. This is from when the subscription was originally set to be stopped, i.e. when the component housing the subscription was destroyed because the user navigated away. If the user comes right back, then the cache will be used. If they don't, after 60 seconds, the subscription cache will be removed. If you want to change the global cacheDuration, change it with a value in seconds:

1import { PubSub } from 'meteor/jam:pub-sub';
2
3PubSub.configure({
4  cacheDuration: 5 * 60 // sets the cacheDuration to 5 minutes. defaults to 1 min
5});

You can also configure cache and cacheDuration for each individual subscription when you use Meteor.subscribe. For example:

1Meteor.subscribe('todos.single', _id, { cacheDuration: 30 }) // caches for 30 seconds, overriding the global default
2Meteor.subscribe('notes.all', { cache: true }) // turns caching on, overriding the global default, and uses the global default cacheDuration

Note: the rest of the Meteor.subscribe API (e.g. onStop, onReady) works just as you'd expect.

Note: Because the data will remain in Minimongo while the subscription is cached, you should be mindful of your Minimongo .find selectors. Be sure to use specific selectors to .find the data you need for that particular subscription. This is generally considered best practice so this is mainly a helpful reminder.

Infinite duration

You can set a subscription so that it stays alive for the user's session by setting cacheDuration: Infinity. You'll likely want a good reason for doing so and it'd probably be wise to limit its use to .once publications. A potentially good use case would be for some particular current user info that you want to keep cached in minimongo.

Infinite scroll / pagination with .stream and .once

In general, Mongo recommends using range-based pagination rather than using skip and limit, which can be slow. Both ways are shown below.

Note: .stream can be swapped for .once in these examples. It depends on the needs of your app. Note: we’re using svelte in these examples but you can use your frontend of choice.

Set up the client

Infinite scroll

1const limit = 10;
2let max = limit;
3let sort = { updatedAt: -1 };
4let updatedAt;
5
6$m: sub = Meteor.subscribe('todos.shared', { updatedAt, sort, limit }); // we don't want to pass in a dynamic limit when we filter using updatedAt, that way we ensure we fetch at most whatever the limit is set to
7
8$m: todos = Todos.find({ isPrivate: false }, { sort, limit: max }).fetch(); // we want a dynamic limit here
9
10function next() {
11  updatedAt = todos.at(-1)?.updatedAt; // get the last updatedAt so we can use it for our publication filter
12  max += limit;
13}

Pagination

Same set up as above but we introduce skip on the client only and tweak the next() function.

1// max, limit, sort, updatedAt remain the same as the infinite scroll example above
2let skip = 0;
3
4$m: sub = Meteor.subscribe('todos.shared', { updatedAt, sort, limit });
5
6$m: todos = Todos.find({ isPrivate: false }, { sort, skip, limit }).fetch(); // if you're not caching the subscription, you'll want to remove skip here
7
8function next() {
9  updatedAt = todos.at(-1)?.updatedAt;
10  skip += limit;
11}

Set up your publication on the server

The publication is the same for both of the above client setups:

1Meteor.publish.stream('todos.shared', function({ updatedAt, sort, limit }) {
2  check(updatedAt, Match.Maybe(Date));
3  check(sort, { updatedAt: Number });
4  check(limit, Match.Maybe(Number));
5
6  return Todos.find({ isPrivate: false }, { sort, limit }); // will paginate efficiently by updatedAt automatically
7});

When there's a value for updatedAt, it will automatically be included in the filter because we're sorting by that same field. Alternatively, you can explicitly set it in the filter if you like:

1// same set up as the above Meteor.publish.stream with these tweaks
2const operator = sort.updatedAt === -1 ? '$lt' : '$gt';
3
4return Todos.find(
5  { isPrivate: false, ...(updatedAt && {updatedAt: {[operator]: updatedAt}}) },
6  { sort, limit }
7);

Either way, the result will be the same. Fast, efficient pagination without using skip when fetching the data from Mongo.

Flipping the sort direction

If you wanted to allow flipping the sort direction, you can add a function like this on the client:

1function flipSort() {
2    // to clean up minimongo as expected, you'll want to do this instead of mutating sort.updatedAt directly, e.g. sort.updatedAt = -sort.updatedAt
3    sort = { updatedAt: -sort.updatedAt }
4    // we'll want to reset these two
5    max = limit;
6    updatedAt = undefined;
7  }

Pagination with go-to-page using skip

If your app needs to be able to go to a specific page when paginating, here's one way to approach it. The UX below reveals the pages as your user goes to the next so they can easily jump to any page they've previously been to rather than laying all the pages out initially for them to randomly jump to.

1let page = 1;
2let pages = new Set([page]);
3const total = 10; // you'd probably fetch this number from the db initially or use a reactive counter
4const maxPage = Math.round(total / limit);
5
6$: skip = (page - 1) * limit;
7$m: sub = Meteor.subscribe('todos.shared', { sort, skip, limit });
8
9$m: todos = Todos.find({ isPrivate: false }, { sort, skip, limit }).fetch(); // if you're not caching the subscription, you'll want to remove skip here
10
11/* if you didn't want to fetch the total, you could do something like this instead and then make some adjustments to your template as needed to support it
12  $m: if (page !== 1 && sub.ready() && !todos.length) {
13    maxPage = page;
14  }
15*/
16
17function prev() {
18  if (page > 1) {
19    page--;
20  }
21}
22
23function next() {
24  if (page < maxPage) {
25    page++;
26    pages = pages.add(page);
27  }
28}
29
30function goToPage(p) {
31  if (page === p) return;
32
33  page = p;
34}
35
36function flipSort() {
37  sort = { updatedAt: -sort.updatedAt }
38  page = 1;
39}
1  <button on:click={prev} disabled={page === 1}>&larr;</button>
2  <button on:click={next} disabled={page === maxPage}>&rarr;</button>
3  {#each [...pages] as p}
4    <button class:current={page === p} on:click={() => goToPage(p)}>{p}</button>
5  {/each}
1// on the server
2Meteor.publish.stream('todos.shared', function({ sort, skip, limit }) {
3  check(sort, { updatedAt: Number });
4  check(skip, Match.Maybe(Number));
5  check(limit, Match.Maybe(Number));
6
7  return Todos.find({ isPrivate: false }, { sort, skip, limit });
8});

Note: remember, if you prefer not to cache all the data on the client while paginating and have the data fetched for each page but still cache your other subscriptions, you can always turn off caching for a particular subscription by passing in { cache: false } as the last argument to Meteor.subscribe

Clearing the cache

On the client, each individual subcription will be automatically removed from the cache when its cacheDuration elapses.

On the server, the cache for a .stream will be cleared when there are no longer any clients subscribed to it.

Though it shouldn't be necessary, you can programmatically clear all cached subscriptions and / or the server cache with:

1import { PubSub } from 'meteor/jam:pub-sub';
2
3PubSub.clearCache();

Configuring (optional)

The package is intended to be nearly zero config needed, but there is some flexibility in how you use it.

Note: You'd likely benefit from either caching all subscriptions by setting cache: true or at a minimum caching some.

Here are the defaults:

1const config = {
2  cache: false, // globally turn subscription caching on or off, off by default.
3  cacheDuration: 1 * 60, // globally set the cacheDuration in seconds, 1 min is the default.
4  serverState: 'auto', // configure how much state you want to keep on the server
5  debug: Meteor.isDevelopment // will provide some console.logs to help make sure you have things set up as you expect
6};

To change the defaults, use:

1// put this in a file that's imported on both the client and server
2import { PubSub } from 'meteor/jam:pub-sub';
3
4PubSub.configure({
5  // ... change the defaults here ... //
6});