simplesigner:jobs

v0.0.6 (published last year)


(inspired heavily by msavin:sjobs)

Run scheduled tasks with the simple jobs queue made just for Meteor. With tight MongoDB integration and fibers-based timing functions, this package is quick, reliable and effortless to use.

  • Jobs run on one server at a time
  • Jobs run predictably and consecutively
  • Job timers are super-efficient
  • Jobs are stored in MongoDB
  • No third party dependencies

It can run hundreds of jobs in seconds with minimal CPU impact, making it a reasonable choice for many applications. To get started, check out the quick start guide and the full API documentation below.

Coming from msavin:sjobs?

This package has an API inspired by msavin:sjobs and in some cases can be a drop-in replacement. If you're coming from msavin:sjobs, read about the potentially breaking API differences.

The main difference between this package and msavin:sjobs is that this package doesn't continuously poll the job queue. Instead, it intelligently sets a single timer for the next due job. This means that most of the time this package is doing absolutely nothing, whereas msavin:sjobs can use significant CPU even when idle. It also means that jobs are executed closer to their due date, instead of potentially late due to the polling interval.
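The single-timer idea can be sketched roughly as follows. This is only an illustration, not the package's actual code: `msUntilNextJob` and the `due` field are hypothetical names, and pending jobs are assumed to be plain objects holding a `Date`.

```javascript
// Hypothetical helper: how long a single timer should wait before the
// next pending job is due. Returns null when there is nothing to wait for.
function msUntilNextJob(pendingJobs, now) {
  if (pendingJobs.length === 0) return null; // idle: no timer needed at all
  const nextDue = Math.min(...pendingJobs.map((job) => job.due.getTime()));
  // A job that is already overdue should run immediately (delay 0).
  return Math.max(0, nextDue - now.getTime());
}

const now = new Date("2024-01-01T00:00:00Z");
const delay = msUntilNextJob(
  [
    { name: "sendReminder", due: new Date("2024-01-01T00:05:00Z") },
    { name: "sendEmail", due: new Date("2024-01-01T00:01:00Z") },
  ],
  now
);
console.log(delay); // 60000
```

Under these assumptions the server would arm one timer for `delay` milliseconds and re-compute it whenever a job is added, rescheduled or removed, rather than querying the queue on a fixed interval.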

Unfortunately, I found the job queue system in msavin:sjobs too deeply built in to modify through a PR, so it was easier to write my own package.

Coming from wildhart:jobs?

This package is a rewritten release of wildhart/meteor.jobs. Unfortunately, that project is largely unmaintained, has broken documentation, does not provide meaningful type safety, actively rejects all attempts to fix compilation errors and improve the status quo, and lacks any test coverage whatsoever. As such, we have taken over the role of maintaining this project for the public good.

Quick Start

First, install the package, and import if necessary:

meteor add simplesigner:jobs
import { Jobs } from 'meteor/simplesigner:jobs'

Then, write your background jobs like you would write your methods:

Jobs.register({
    "sendReminder": function (to, message) {
        var call = HTTP.put("http://www.magic.com/sendEmail", {
            to: to,
            message: message
        });

        if (call.statusCode === 200) {
            this.success(call.result);
        } else {
            this.reschedule({in: {minutes: 5}});
        }
    }
});

Finally, schedule a background job like you would call a method:

Jobs.run("sendReminder", "jony@apple.com", "The future is here!");

One more thing: the function above schedules the job to run at the moment the function was called. However, you can delay it by passing in a special configuration object at the end:

Jobs.run("sendReminder", "jony@apple.com", "The future is here!", {
    in: {
        days: 3,
    },
    on: {
        hour: 9,
        minute: 42
    },
    priority: 9999999999
});

The configuration object supports date, in, on, and priority, all of which are completely optional. See Jobs.run for details.

API Documentation

Jobs.register and Jobs.run are all you need to get started, but that's only the beginning of what the package can do. To explore the rest of the functionality, jump into the documentation:

Jobs.configure

Jobs.configure allows you to configure how the package should work. You can configure one option or all of them. The code below shows the type of each option, with defaults noted in the comments:

Jobs.configure({
    // (milliseconds) specify how long the server could be inactive before another server
    // takes on the master role (default = 5min)
    maxWait: Number,

    // (milliseconds) specify how long after server startup the package should start running
    startupDelay: Number,

    // determine how to set the serverId - see below. (default = random string)
    setServerId: String | Function,

    // determine if/how to log the package outputs (default = console.log)
    error: Boolean | Function,
    log: Boolean | Function,
    warn: Boolean | Function,

    // specify if all job queues should start automatically on first launch (default = true)...
    //  ... after server relaunch the list of paused queues is restored from the database.
    autoStart: Boolean,

    // whether to mark jobs that finish successfully as successful, or remove them;
    // otherwise you have to resolve every job with this.success() or this.remove()
    defaultCompletion: 'success' | 'remove',
})

setServerId - In a multi-server deployment, jobs are only executed on one server. Each server should have a unique ID so that it knows whether it is in control of the job queue or not. You can provide a function which returns a serverId from somewhere (e.g. from an environment variable) or just use the default of a random string. In a single-server deployment, set this to a static string so that the server knows that it is always in control and can take control more quickly after a reboot.
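To see why a static serverId helps after a reboot, here is a rough sketch of the kind of check a server might make. It is not taken from the package source: the function `isInControl` and the `serverId` and `lastPing` fields on the control document are assumptions made for illustration.

```javascript
// Hypothetical check: should this server treat itself as in control of the queue?
// `dominator` stands in for a stored record of the server currently in control.
function isInControl(dominator, serverId, maxWait, now) {
  if (!dominator) return true; // nobody has claimed the queue yet
  if (dominator.serverId === serverId) return true; // same id: resume immediately
  // A different server holds the queue: only take over once it has been
  // silent for longer than maxWait milliseconds.
  return now - dominator.lastPing > maxWait;
}

const FIVE_MINUTES = 5 * 60 * 1000;
const lastSeen = { serverId: "server-A", lastPing: 1000000 };

// Rebooted with the same static id: in control straight away.
console.log(isInControl(lastSeen, "server-A", FIVE_MINUTES, 1000000 + 1000)); // true
// Rebooted with a new random id: must wait out maxWait first.
console.log(isInControl(lastSeen, "x7Gk2", FIVE_MINUTES, 1000000 + 1000)); // false
```

With a random ID, a restarted single server would look like a different machine and would have to wait for `maxWait` to elapse, which is the delay the static string avoids.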

Jobs.register

Jobs.register allows you to register a function for a job.

Jobs.register({
    sendEmail: function (to, content) {
        var send = Magic.sendEmail(to, content);
        if (send) {
            this.success();
        } else {
            this.reschedule({in: {minutes: 5}});
        }
    },
    sendReminder: function (userId, content) {
        var doc = Reminders.insert({
            to: userId,
            content: content
        })

        if (doc) {
            this.remove();
        } else {
            this.reschedule({in: {minutes: 5}});
        }
    }
})

Each job is bound with a set of functions to give you maximum control over how the job runs:

  • this.document - access the job document
  • this.success() - tell the queue the job is completed
  • this.failure() - tell the queue the job failed
  • this.reschedule(config) - tell the queue to schedule the job for a future date
  • this.remove() - remove the job from the queue
  • this.replicate(config) - create a copy of the job with a different due date provided by config (returns the new jobId)

Each job must be resolved with success, failure, reschedule, and/or remove.

See Repeating Jobs and Async Jobs/Promises

Jobs.run

Jobs.run allows you to schedule a job to run. You call it just like you would call a method, by specifying the job name and its arguments. At the end, you can pass in a special configuration object. Otherwise, it will be scheduled to run as soon as possible.

var jobDoc = Jobs.run("sendReminder", "jony@apple.com", "The future is here!", {
    in: {
        days: 3,
    },
    on: {
        hour: 9,
        minute: 42
    },
    priority: 9999999999,
    singular: true
});

Jobs.run returns a jobDoc.

The configuration object supports the following inputs:

  • in - Object
    • The in parameter will schedule the job at a later time, using the current time and your inputs to calculate the due time.
  • on - Object
    • The on parameter overrides the corresponding fields of the current time with your inputs.
  • in and on - Object
    • The supported fields for in and on can be used in singular and/or plural versions:
      • millisecond, second, minute, hour, day, month, and year
      • milliseconds, seconds, minutes, hours, days, months, and years
    • The date object will be updated in the order that is specified. This means that if it is year 2017, and you set in one year, but on 2019, the year 2019 will be the final result. However, if you set on 2019 and in one year, then the year 2020 will be the final result.
  • priority - Number
    • The default priority for each job is 0
    • If you set it to a positive integer, it will run ahead of other jobs.
    • If you set it to a negative integer, it will only run after all the zero or positive jobs have completed.
  • date - Date
    • Provide your own date. This stacks with the in and on operators, and will be applied before they perform their operations.
  • unique - Boolean
    • If a job is marked as unique, it will only be scheduled if no other job exists with the same arguments
  • singular - Boolean
    • If a job is marked as singular, it will only be scheduled if no other job is pending with the same arguments
  • awaitAsync - Boolean
    • If an async job that was run with awaitAsync: true is running, then no other job of the same name will start until the running job has completed.
  • callback - Function
    • Run a callback function after scheduling the job
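The way date, in and on combine can be illustrated with a small standalone sketch. It is not the package's implementation: `computeDueDate` is a hypothetical name, UTC setters are used only to keep the example reproducible, and values given to on are assumed to be passed straight to the matching Date setter.

```javascript
// Map each supported unit (singular form) to its UTC getter/setter on Date.
const UNITS = {
  millisecond: ["getUTCMilliseconds", "setUTCMilliseconds"],
  second: ["getUTCSeconds", "setUTCSeconds"],
  minute: ["getUTCMinutes", "setUTCMinutes"],
  hour: ["getUTCHours", "setUTCHours"],
  day: ["getUTCDate", "setUTCDate"],
  month: ["getUTCMonth", "setUTCMonth"],
  year: ["getUTCFullYear", "setUTCFullYear"],
};

// Hypothetical helper: start from `date` (or now), then apply `in` and `on`
// in the order they appear in the configuration object.
function computeDueDate(config = {}, now = new Date()) {
  const due = new Date(config.date ? config.date.getTime() : now.getTime());
  for (const [key, fields] of Object.entries(config)) {
    if (key !== "in" && key !== "on") continue; // skip priority, date, etc.
    for (const [name, value] of Object.entries(fields)) {
      const unit = name.endsWith("s") ? name.slice(0, -1) : name; // plural -> singular
      const accessors = UNITS[unit];
      if (!accessors) throw new Error(`Unsupported unit: ${name}`);
      const [getter, setter] = accessors;
      // `in` adds to the current value, `on` overrides it.
      due[setter](key === "in" ? due[getter]() + value : value);
    }
  }
  return due;
}

const now = new Date(Date.UTC(2017, 5, 15));
console.log(computeDueDate({ in: { years: 1 }, on: { year: 2019 } }, now).getUTCFullYear()); // 2019
console.log(computeDueDate({ on: { year: 2019 }, in: { years: 1 } }, now).getUTCFullYear()); // 2020
```

The two calls reproduce the ordering example from the list above: the same inputs give 2019 or 2020 depending on whether in or on is written first.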

Jobs.execute

Jobs.execute allows you to run a job ahead of its due date. It can only work on jobs that have not been resolved.

Jobs.execute(docId)

Jobs.reschedule

Jobs.reschedule allows you to reschedule a job. It can only work on jobs that have not been resolved.

Jobs.reschedule(jobId, {
    in: {
        minutes: 5
    },
    priority: 9999999
});

The configuration is passed in as the second argument, and it supports the same inputs as Jobs.run.

Jobs.replicate

Jobs.replicate allows you to replicate a job.

// jobId is the ID of the existing job; newJobId is the ID of the copy
var newJobId = Jobs.replicate(jobId, {
    in: {
        minutes: 5
    }
})

Jobs.replicate returns a jobId.

Jobs.start

Jobs.start allows you to start all or some of the queues. This runs automatically unless autoStart is set to false. If you call the function with no arguments, it will start all the queues. If you pass in a String, it will start a queue with that name. If you pass in an Array, it will start all the queues named in the array.

// Start all the queues
Jobs.start()

// Start just one queue
Jobs.start("sendReminder")

// Start multiple queues
Jobs.start(["sendReminder", "sendEmail"])

Unlike msavin:sjobs, this function can be called on any server and whichever server is currently in control of the job queue will be notified.

Jobs.stop

Jobs.stop allows you to stop all or some of the queues. If you call the function with no arguments, it will stop all the queues. If you pass in a String, it will stop a queue with that name. If you pass in an Array, it will stop all the queues named in the array.

// Stop all the queues
Jobs.stop()

// Stop just one queue
Jobs.stop("sendReminder")

// Stop multiple queues
Jobs.stop(["sendReminder", "sendEmail"])

Unlike msavin:sjobs, this function can be called on any server and whichever server is currently in control of the job queue will be notified.

If you need to stop all jobs via mongo use:

mongo> db.jobs_dominator_3.update({_id:"dominatorId"}, {$set: {pausedJobs: ['*']}});

The in-control server should observe the change and stop instantly. Use {$unset: {pausedJobs: 1}} or {$set: {pausedJobs: []}} to start all the queues again.
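How the pausedJobs value might be interpreted can be sketched in a few lines. The function name `isQueuePaused` is hypothetical and the logic is inferred from the description above (a list of queue names, with '*' meaning every queue), not copied from the package.

```javascript
// Hypothetical check: is the queue for `jobName` paused, given the
// pausedJobs array stored in the control document (may be missing)?
function isQueuePaused(pausedJobs, jobName) {
  const paused = pausedJobs || []; // field removed with $unset -> nothing paused
  return paused.includes("*") || paused.includes(jobName);
}

console.log(isQueuePaused(["*"], "sendEmail")); // true
console.log(isQueuePaused(["sendReminder"], "sendEmail")); // false
console.log(isQueuePaused(undefined, "sendEmail")); // false
```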

Jobs.clear

Jobs.clear allows you to clear all or some of the jobs in your database.

var count = Jobs.clear(state, jobName, ...arguments, callback);
// e.g.:
count = Jobs.clear();       // remove all completed jobs (success or failure)
count = Jobs.clear('*');    // remove all jobs
count = Jobs.clear('failure', 'sendEmail', 'jony@apple.com', function(err, count) {console.log(err, count)});

Parameters:

  • state for selecting a job state (either pending, success, failure, or * to select all of them), or omit it to select all except pending jobs.
  • jobName to only remove jobs with a specific name.
  • provide arguments to match jobs only with the same arguments.
  • callback to provide a callback function with error and result parameters, where result is the number of jobs removed.
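One way to picture how these parameters narrow the set of removed jobs is as a MongoDB-style selector. The sketch below is an assumption for illustration only: `buildClearSelector` is not part of the package, and the field names `state`, `name` and `arguments` are guesses at how a job document might be laid out.

```javascript
// Hypothetical helper: translate Jobs.clear-style parameters into a selector.
function buildClearSelector(state, jobName, ...args) {
  const selector = {};
  if (state === undefined) {
    // No state given: only completed jobs (success or failure).
    selector.state = { $in: ["success", "failure"] };
  } else if (state !== "*") {
    selector.state = state; // a specific state; '*' adds no state filter
  }
  if (jobName !== undefined) selector.name = jobName;
  if (args.length > 0) selector.arguments = args; // match exact argument list
  return selector;
}

console.log(JSON.stringify(buildClearSelector()));
// {"state":{"$in":["success","failure"]}}
console.log(JSON.stringify(buildClearSelector("*")));
// {}
console.log(JSON.stringify(buildClearSelector("failure", "sendEmail", "jony@apple.com")));
// {"state":"failure","name":"sendEmail","arguments":["jony@apple.com"]}
```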

Jobs.remove

Jobs.remove allows you to remove a job from the collection.

var success = Jobs.remove(docId);

Jobs.jobs

Jobs.jobs gives access to an object of defined job functions:

var jobNames = Object.keys(Jobs.jobs);  // ['sendEmail', 'sendReminder']
var nJobTypes = jobNames.length;        // 2

Jobs.collection

Jobs.collection allows you to access the MongoDB collection where the jobs are stored. Ideally, you should not require interaction with the database directly.

Repeating jobs

Repeating jobs can be created by using this.reschedule() in the job function, e.g.:

Jobs.register({
    processMonthlyPayments() {
        this.reschedule({in: {months: 1}});
        processPayments();
    },
});

Jobs.run('processMonthlyPayments', {singular: true});

Since this package doesn't keep a job history (compared with msavin:sjobs), you can use this.reschedule() indefinitely without polluting the jobs database, instead of having to use this.replicate() followed by this.remove().
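The repeating-job example passes singular: true, so calling Jobs.run again while a run is still pending should not add a duplicate. The difference between singular and unique, as described under Jobs.run, can be sketched like this. `shouldSchedule` and the document fields `name`, `arguments` and `state` are hypothetical, and matching on both name and arguments is an assumption.

```javascript
// Compare argument lists by value (good enough for JSON-like arguments).
function sameArgs(a, b) {
  return JSON.stringify(a) === JSON.stringify(b);
}

// Hypothetical check made before inserting a new job.
function shouldSchedule(existingJobs, name, args, config = {}) {
  const matches = existingJobs.filter(
    (job) => job.name === name && sameArgs(job.arguments, args)
  );
  // unique: refuse if any matching job exists, whatever its state.
  if (config.unique && matches.length > 0) return false;
  // singular: refuse only if a matching job is still pending.
  if (config.singular && matches.some((job) => job.state === "pending")) return false;
  return true;
}

const existing = [{ name: "processMonthlyPayments", arguments: [], state: "failure" }];
console.log(shouldSchedule(existing, "processMonthlyPayments", [], { singular: true })); // true
console.log(shouldSchedule(existing, "processMonthlyPayments", [], { unique: true })); // false
```

In this sketch a failed earlier run does not block a singular job, but it does block a unique one.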

Async Jobs

The job function can use async/await or return a promise:

Jobs.register({
    async asyncJob(...args) {
        await new Promise(resolve => Meteor.setTimeout(() => resolve(0), 4000));
        this.remove();
    },
    promiseJob(...args) {
        return new Promise(resolve => Meteor.setTimeout(() => {
            this.remove();
            resolve(0);
        }, 8000));
    },
});

This defers the error message 'Job was not resolved with success, failure, reschedule or remove' until the promise resolves. Note that:

  • While jobs are executing their status is set to 'executing'.
  • Other jobs of the same type will still run when scheduled while asynchronous jobs are executing, unless the running job was configured with awaitAsync: true, in which case the pending job will wait until the previous job of that name has completed.
  • Asynchronous code may need to be wrapped in Meteor.bindEnvironment().
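The deferred "not resolved" check described above can be pictured with a small standalone runner. This is a sketch under assumptions, not the package's code: `runJob` is a hypothetical name, and the bound functions here only record which one was called.

```javascript
// Hypothetical runner: call a job function with a `this` context that records
// how the job resolved itself, deferring the check if a promise is returned.
function runJob(jobFn, args = []) {
  const context = { resolution: null };
  for (const action of ["success", "failure", "reschedule", "remove"]) {
    context[action] = () => {
      context.resolution = action;
    };
  }
  // The point at which an unresolved job would be reported.
  const check = () => context.resolution || "unresolved";

  const result = jobFn.apply(context, args);
  if (result && typeof result.then === "function") {
    return result.then(check); // async job: check only after the promise settles
  }
  return check(); // sync job: check immediately
}

console.log(runJob(function () { this.success(); })); // success
console.log(runJob(function () {})); // unresolved

runJob(async function () {
  await null; // stands in for real asynchronous work
  this.remove();
}).then((outcome) => console.log(outcome)); // remove
```

Without the deferral, the async job would be reported as unresolved because `this.remove()` has not yet run when the function first returns.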

Bulk Operations

The job queue intelligently prevents large numbers of a single job from dominating the job queue, so feel free to use this package to safely schedule bulk operations, e.g., sending 1000s of emails. Although it may take some time to send all of these emails, any other jobs which are scheduled to run while they are being sent will still be run on time. Run each operation as its own job (e.g., 1000 separate "sendSingleEmail" jobs rather than a single "send1000Emails" job). The job queue will run all 1000 "sendSingleEmail" jobs in sequence, but after each job it will check if any other jobs need to run first.
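One possible way to get this behaviour is to let each pass over the queue run at most one job per job name. The simulation below is only a sketch of that idea: `nextDueJob` and `runQueue` are hypothetical, time is a plain counter in milliseconds, and sorting by priority before due time is an assumption rather than a documented rule.

```javascript
// Pick the next due job whose name has not yet run in the current pass.
function nextDueJob(pending, now, excludedNames) {
  const due = pending.filter((job) => job.due <= now && !excludedNames.has(job.name));
  due.sort((a, b) => b.priority - a.priority || a.due - b.due);
  return due[0];
}

// Simulate the queue, assuming every job takes msPerJob to run.
function runQueue(jobs, msPerJob) {
  const pending = [...jobs];
  const order = [];
  let now = 0;
  while (pending.length > 0) {
    const ranThisPass = new Set();
    let job;
    while ((job = nextDueJob(pending, now, ranThisPass))) {
      pending.splice(pending.indexOf(job), 1);
      order.push(job.name);
      ranThisPass.add(job.name);
      now += msPerJob;
    }
    if (ranThisPass.size === 0) {
      // Nothing was due: jump ahead to the next due time (the "single timer").
      now = Math.min(...pending.map((j) => j.due));
    }
  }
  return order;
}

const emails = Array.from({ length: 4 }, () => ({ name: "sendSingleEmail", due: 0, priority: 0 }));
const reminder = { name: "sendReminder", due: 1500, priority: 0 };
console.log(runQueue([...emails, reminder], 1000));
// [ 'sendSingleEmail', 'sendSingleEmail', 'sendReminder', 'sendSingleEmail', 'sendSingleEmail' ]
```

In the simulated run the reminder becomes due while the second email is being sent and runs right after it, instead of waiting behind the remaining emails.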


API Differences From msavin:sjobs

If any of these differences make this package unsuitable for you, please let me know and I'll consider fixing.

  • This package doesn't keep a job history.
  • Failed jobs are not retried, unless they have already been rescheduled.
  • The Job configuration object doesn't support the data attribute - I never found any use for this.
  • The following Jobs.configure() options are not available or different:
    • interval - this package doesn't regularly query the job queue for due jobs, instead it intelligently sets a timer for the next job.
    • getDate
    • disableDevelopmentMode
    • remoteCollection
    • autoStart - only relevant on first launch. On relaunch the list of paused queues is restored from the database.
  • The following Jobs.configure() options have additional options:
    • setServerId can be a String as well as a Function
    • log can be a Boolean as well as a Function
  • In a job function, this.set() and this.get() are not provided - I never found any use for this.
  • In a job function, this.success() and this.failure() do not take a result parameter - this package doesn't keep a job history
  • singular jobs only check for pending jobs of the same name, so they can be run again even if a previous job failed.
  • Jobs.start() and Jobs.stop() can be called on any server and whichever server is in control of the job queue will be notified.
  • Jobs.cancel() doesn't exist. Just remove it with Jobs.remove() - I don't see the point in keeping old jobs lying around.
  • Jobs.clear() can take additional argument parameters to only delete jobs matching those arguments.
  • Jobs.jobs doesn't exist in msavin:sjobs

Version History

See: documentation/HISTORY.md