tunguska:reactive-aggregate

v1.3.16Published 3 weeks ago

tunguska-reactive-aggregate v1.3.15

Reactively publish aggregations.

Originally based on jcbernack:reactive-aggregate.

This version removes the dependency on meteorhacks:reactive-aggregate and instead uses the underlying MongoDB Nodejs library. In addition, it uses ES6/7 coding, including Promises and import/export syntax, so should be imported into your (server) codebase where it's needed.

In spite of those changes, the API is basically unchanged and is backwards compatible, as far as I know. However, there are several additional properties of the options parameter. See the notes in the Usage section.

Changed behaviour in v1.2.3: See https://github.com/robfallows/tunguska-reactive-aggregate/issues/23 for more information.

History

See changelog.

meteor add tunguska:reactive-aggregate

This helper can be used to reactively publish the results of an aggregation.

Mongo.ObjectID support

If your collections use the Meteor default of String for MongoDB document ids, you can skip this section and may want to set options.specificWarnings.objectId = false and options.loadObjectIdModules = false

However, if you use the Mongo.ObjectID type for document ids, full support for handling Mongo.ObjectIDs is only enabled if simpl-schema and either lodash-es or lodash are installed. For backward compatibility, they are not required. (Only the set functionality of lodash-es/lodash is imported, if you're concerned about the full package bloating your code size).

You can install them in your project with:

meteor npm i simpl-schema

meteor npm i lodash-es or meteor npm i lodash

Additionally, unless you have defined SimpleSchemas for your collections, you still won't have full support for handling Mongo.ObjectIDs. The _id field of your primary collection will be handled properly without installing these packages and without having SimpleSchemas defined, but any embedded Mongo.ObjectID fields will not be handled properly unless you set up full support with these packages and schema definitions. Defining SimpleSchemas is beyond the scope of this writeup, but you can learn about it at simple-schema on GitHub.

If you're curious about why Mongo.ObjectIDs require special support at all, it's because in Meteor, aggregate must use the low-level MongoDB Nodejs library, which doesn't know the Mongo.ObjectID type and so performs conversions that break Mongo.ObjectIDs. That's what 'full support' here is working around.

Usage

1import { ReactiveAggregate } from 'meteor/tunguska:reactive-aggregate';
2
3Meteor.publish('nameOfPublication', function() {
4  ReactiveAggregate(context, collection, pipeline, options);
5});
  • context should always be this in a publication.

  • collection is the Mongo.Collection instance to query. To preserve backwards compatibility, an observer is automatically added on this collection, unless options.noAutomaticObserver is set to true.

    The backwards-compatible options observeSelector and observeOptions are now deprecated, but will continue to be honoured on an automatically added observer. However, the recommended approach is to set options.noAutomaticObserver to true and define your own oberver(s) in options.observers. There is no guarantee that deprecated options will continue to be honoured in future releases.

  • pipeline is the aggregation pipeline to execute.

  • options provides further options:

    • aggregationOptions can be used to add further, aggregation-specific options. See standard aggregation options for more information. The additional aggregation options shown in this example are not necessarily sane!

      1  ReactiveAggregate(this, collection, pipeline, {
      2    aggregationOptions: { maxTimeMS: 500, bypassDocumentValidation: true },
      3  });
    • capturePipeline: A callback function having one parameter which will return the array of documents comprising the the current pipeline execution. :warning: Use with caution: this callback will be executed each time the pipeline re-runs.

      1  ReactiveAggregate(this, collection, pipeline, {
      2    capturePipeline(docs) {
      3      console.log(docs);
      4    },
      5  });
    • clientCollection defaults to the same name as the original collection, but can be overridden to send the results to a differently named client-side collection.

      1  ReactiveAggregate(this, collection, pipeline, {
      2    clientCollection: "clientCollectionName",
      3  });
    • debounceCount: An integer representing the number of observer changes across all observers before the aggregation will be re-run. Defaults to 0 (do not count) for backwards compatibility with the original API. Used in conjunction with debounceDelay to fine-tune reactivity. The first of the two debounce options to be reached will re-run the aggregation.

    • debounceDelay: An integer representing the maximum number of milli-seconds to wait for observer changes before the aggregation is re-run. Defaults to 0 (do not wait) for backwards compatibility with the original API. Used in conjunction with debounceCount to fine-tune reactivity. The first of the two debounce options to be reached will re-run the aggregation.

    • debug: A boolean (true or false), or a callback function having one parameter which will return the aggregate#cursor.explain() result. Defaults to false (no debugging).

    • objectIDKeysToRepair: An array of SimpleSchema-style dotted path keys to fields of the schema that are Mongo.ObjectIDs. This is not needed by default and should not be used unless the default behaviour of the code fails in some way. If your schemas use Mongo.ObjectID or Mongo.Collection.ObjectID as the type for object ids, rather than the Meteor default strings, and the code does not automatically handle your object ids properly (which may happen in rare cases, based on your schemas), then you can specify schema keys here to tell the code that they are Mongo.ObjectIDs as an alternative way to get your schemas to aggregate and return properly typed object ids. For example, if your BlogPosts collection schema has a parentID key that contains the object id of a parent post, and it also has a comments field that is an array of objects, one field of which, id, is a Mongo.ObjectID of a comment document in another collection, then if your aggregations don't return properly typed Mongo.ObjectIDs in those fields automatically, you could try providing ['parentID', 'comments.$.id']. But this is a last resort, and you should expect your aggregations to return Mongo.ObjectID values properly, including the _id of your primary collection. Defaults to [].

    • noAutomaticObserver: set this to true to prevent the backwards-compatible behaviour of an observer on the given collection.

    • observers: An array of cursors. Each cursor is the result of a Collection.find(). Each of the supplied cursors will have an observer attached, so any change detected (based on the selection criteria in the find) will re-run the aggregation pipeline.

    • loadObjectIdModules: A boolean (true or false) that if true, tries to load modules necessary for ObjectId support. Defaults to true.

    • warnings: A boolean (true or false) that if false, suppresses all warnings, regardless of any specificWarnings. Defaults to true (warning messages are logged).

    • specificWarnings object, allows you to suppress specific types of warnings: (they all default to true, warning messages are logged)

      • deprecations: Warnings about deprecations.
      • objectId: Warnings related to ObjectID and dependencies for using it.

    :hand: The following parameters are deprecated and will be removed in a later version. Both these parameters are now effectively absorbed into the observers option and if required should be replaced by adding a cursor (or cursors) to the array of cursors in observers. Setting either of these to anything other than the empty object {} will result in a deprecation notice to the server console (for example: tunguska:reactive-aggregate: observeSelector is deprecated).

    • observeSelector can be given to improve efficiency. This selector is used for observing the collection.

    (e.g. { authorId: { $exists: 1 } })

    • observeOptions can be given to limit fields, further improving efficiency. Ideally used to limit fields on your query.

    If none are given any change to the collection will cause the aggregation to be re-evaluated. (e.g. { limit: 10, sort: { createdAt: -1 } })

Quick Example

A publication for one of the examples in the MongoDB docs would look like this:

1Meteor.publish("booksByAuthor", function () {
2  ReactiveAggregate(this, Books, [{
3    $group: {
4      _id: "$author",
5      books: { $push: "$$ROOT" }
6    }
7  }]);
8});

Extended Example

Define the parent collection you want to run an aggregation on. Let's say:

1import { Mongo } from 'meteor/mongo';
2export const Reports = new Mongo.Collection('Reports');

...in a location where all your other collections are defined, say /imports/both/Reports.js

Next, prepare to publish the aggregation on the Reports collection into another client-side-only collection we'll call clientReport.

Create the clientReport in the client (it's needed only for client use). This collection will be the destination into which the aggregation will be put upon completion.

Publish the aggregation on the server:

1Meteor.publish("reportTotals", function() {
2  ReactiveAggregate(this, Reports, [{
3    // assuming our Reports collection have the fields: hours, books
4    $group: {
5      '_id': this.userId,
6      'hours': {
7      // In this case, we're running summation.
8        $sum: '$hours'
9      },
10      'books': {
11        $sum: 'books'
12      }
13    }
14  }, {
15    $project: {
16      // an id can be added here, but when omitted,
17      // it is created automatically on the fly for you
18      hours: '$hours',
19      books: '$books'
20    } // Send the aggregation to the 'clientReport' collection available for client use by using the clientCollection property of options.
21  }], { clientCollection: 'clientReport' });
22});

Subscribe to the above publication on the client:

1import { Mongo } from 'meteor/mongo';
2
3// Define a named, client-only collection, matching the publication's clientCollection.
4const clientReport = new Mongo.Collection('clientReport');
5
6Template.statsBrief.onCreated(function() {
7  // subscribe to the aggregation
8  this.subscribe('reportTotals');
9
10// Then in our Template helper:
11
12Template.statsBrief.helpers({
13  reportTotals() {
14    return clientReport.find();
15  },
16});

Finally, in your template:

{{#each report in reportTotals}}
  <div>Total Hours: {{report.hours}}</div>
  <div>Total Books: {{report.books}}</div>
{{/each}}

Your aggregated values will therefore be available in the client and behave reactively just as you'd expect.

Using $lookup

The use of $lookup in an aggregation pipeline introduces the eventuality that the aggregation pipeline will need to re-run when any or all of the collections involved in the aggregation change.

By default, only the base collection is observed for changes. However, it's possible to specify an arbitrary number of observers on disparate collections. In fact, it's possible to observe a collection which is not part of the aggregation pipeline to trigger a re-run of the aggregation. This introduces some interesting approaches towards optimising "heavy" pipelines on very active collections (although perhaps you shouldn't be doing that in the first place :wink:).

1Meteor.publish("biographiesByWelshAuthors", function () {
2  ReactiveAggregate(this, Authors, [{
3    $lookup: {
4      from: "books",
5      localField: "_id",
6      foreignField: "author_id",
7      as: "author_books"
8    }
9  }], {
10    noAutomaticObserver: true,
11    debounceCount: 100,
12    debounceDelay: 100,
13    observers: [
14      Authors.find({ nationality: 'welsh'}),
15      Books.find({ category: 'biography' })
16    ]
17  });
18});

The aggregation will re-run whenever there is a change to the "welsh" authors in the authors collection or if there is a change to the biographies in the books collection.

The debounce parameters were specified, so any changes will only be made available to the client when 100 changes have been seen across both collections (in total), or after 100ms, whichever occurs first.

Non-Reactive Aggregations

Like a Meteor Method, but the results come back in a Minimongo collection.

1Meteor.publish("biographiesByWelshAuthors", function () {
2  ReactiveAggregate(this, Authors, [{
3    $lookup: {
4      from: "books",
5      localField: "_id",
6      foreignField: "author_id",
7      as: "author_books"
8    }
9  }], {
10    noAutomaticObserver: true
11  });
12});

No observers were specified and noAutomaticObserver was enabled, so the publication runs once only.

On-Demand Aggregations

Also like a Meteor Method, but the results come back in a Minimongo collection and re-running of the aggregation can be triggered by observing an arbitrary, independent collection.

1Meteor.publish("biographiesByWelshAuthors", function () {
2  ReactiveAggregate(this, Authors, [{
3    $lookup: {
4      from: "books",
5      localField: "_id",
6      foreignField: "author_id",
7      as: "author_books"
8    }
9  }], {
10    noAutomaticObserver: true,
11    observers: [
12      Reruns.find({ _id: 'welshbiographies' })
13    ]
14  });
15});

By mutating the Reruns collection on a specific _id we cause the aggregation to re-run. The mutation could be done using a Meteor Method, or using Meteor's pub/sub.


Enjoy aggregating reactively, but use sparingly. Remember, with great reactivity comes great responsibility!