ostrio:cron-jobs

v4.0.0, published 4 weeks ago

JoSk

"JoSk" is a Node.js task manager for horizontally scaled apps, apps planning for horizontal scaling, and apps that may need to scale horizontally in the future.

"JoSk" mimics the native API of setTimeout and setInterval. Tasks can also be scheduled using CRON expressions. All queued tasks are synced between all running application instances via Redis, MongoDB, or a custom adapter.

"JoSk" is made for a variety of horizontally scaled apps, such as clusters, multi-server setups, and multi-threaded Node.js instances, running on the same machine, on different machines, or even in different data centers. "JoSk" ensures that each task is executed only once across all running instances of the application.

Although "JoSk" is made with multi-instance apps in mind, it works seamlessly in single-instance applications.

Note: JoSk is a server-only package.

Main features

  • 🏢 Synchronize a single task across multiple servers;
  • 🔏 Read locking to avoid simultaneous task executions across complex infrastructure;
  • 📦 Zero dependencies, written from scratch for top performance;
  • 👨‍🔬 ~99% test coverage;
  • 💪 Bulletproof design, built-in retries, and "zombie" task recovery 🧟🔫.

Prerequisites

  • redis-server@>=5.0.0 — Redis Server Version (if used with RedisAdapter)
  • mongod@>=4.0.0 — MongoDB Server Version (if used with MongoAdapter)
  • node@>=14.20.0 — Node.js version

Older releases compatibility

  • mongod@<4.0.0 — use josk@=1.1.0
  • node@<14.20.0 — use josk@=3.0.2
  • node@<8.9.0 — use josk@=1.1.0

Install:

npm install josk --save
// ES Module Style
import { JoSk, RedisAdapter, MongoAdapter } from 'josk';

// CommonJS
const { JoSk, RedisAdapter, MongoAdapter } = require('josk');

API:

new JoSk({opts}):

  • opts.adapter {RedisAdapter|MongoAdapter} - [Required] RedisAdapter or MongoAdapter or custom adapter
  • opts.client {RedisClient} - [Required for RedisAdapter] RedisClient instance, like one returned from await redis.createClient().connect() method
  • opts.db {Db} - [Required for MongoAdapter] Mongo's Db instance, like one returned from MongoClient#db() method
  • opts.lockCollectionName {String} - [Optional for MongoAdapter] By default all JoSk instances use the same __JobTasks__.lock collection for locking
  • opts.prefix {String} - [Optional] use to create multiple named instances
  • opts.debug {Boolean} - [Optional] Enable debugging messages, useful during development
  • opts.autoClear {Boolean} - [Optional] Remove (clear) obsolete tasks, that is, any tasks that are not found in the instance memory (runtime) but exist in the database. Obsolete tasks may appear when a task wasn't cleared from the database on process shutdown, or was removed or renamed in the app. They may also appear when multiple app instances run different codebases against the same database, so a task may not exist on one of the instances. Default: false
  • opts.resetOnInit {Boolean} - [Optional] (use with caution) Make sure all old tasks are completed during initialization. Useful for single-instance apps to clean up unfinished tasks left by an unexpected shutdown, reboot, or exception. Default: false
  • opts.zombieTime {Number} - [Optional] Time in milliseconds after which a task is considered a "zombie". This option rescues tasks from "zombie mode" when ready() wasn't called, an exception was thrown at runtime, or the task logic is faulty. While the resetOnInit option makes sure tasks are completed on startup, zombieTime solves the same issue during runtime. Default: 900000 (15 minutes). Setting this value below 60000 (one minute) is not recommended
  • opts.minRevolvingDelay {Number} - [Optional] Minimum revolving delay — the minimum delay between tasks executions in milliseconds. Default: 128
  • opts.maxRevolvingDelay {Number} - [Optional] Maximum revolving delay — the maximum delay between tasks executions in milliseconds. Default: 768
  • opts.onError {Function} - [Optional] Informational hook, called instead of throwing exceptions. Default: false. Called with two arguments:
    • title {String}
    • details {Object}
    • details.description {String}
    • details.error {Mix}
    • details.uid {String} - Internal uid, suitable for .clearInterval() and .clearTimeout()
  • opts.onExecuted {Function} - [Optional] Informational hook, called when task is finished. Default: false. Called with two arguments:
    • uid {String} - uid passed into .setImmediate(), .setTimeout(), or .setInterval() methods
    • details {Object}
    • details.uid {String} - Internal uid, suitable for .clearInterval() and .clearTimeout()
    • details.date {Date} - Execution timestamp as JS {Date}
    • details.delay {Number} - Execution delay (e.g. interval for .setInterval())
    • details.timestamp {Number} - Execution timestamp as unix {Number}
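
The two hook signatures listed above can be sketched as a pair of plain functions. This is a minimal sketch, not part of JoSk: createHooks and its log parameter are hypothetical names chosen for illustration, and only the argument shapes (title, details) and (uid, details) come from the option list.

```javascript
// Hypothetical helper (not part of JoSk): builds `onError` and `onExecuted`
// hooks that forward the hook arguments to a logging function.
function createHooks(log) {
  return {
    // Called with (title, details); details carries description, error, and uid
    onError(title, details) {
      log(`[JoSk error] ${title}: ${details.description} (uid: ${details.uid})`);
    },
    // Called with (uid, details); details carries uid, date, delay, and timestamp
    onExecuted(uid, details) {
      log(`[JoSk executed] ${uid} at ${details.date.toISOString()} (delay: ${details.delay} ms)`);
    },
  };
}

// Usage, assuming an adapter and client set up as described under Initialization:
// new JoSk({ adapter: RedisAdapter, client: redisClient, ...createHooks(console.log) });
```

Keeping the hooks in one factory makes it easy to swap console.log for an application logger without touching the JoSk options.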

Initialization

JoSk is storage-agnostic (since v4.0.0). It ships with Redis and MongoDB "adapters" out of the box, with the option to extend its capabilities by creating and passing a custom adapter.

Redis Adapter

JoSk has no dependencies, so make sure the redis NPM package is installed in order to use the Redis Storage Adapter. RedisAdapter utilizes a basic set of commands: SET, GET, DEL, EXISTS, HSET, HGETALL, and SCAN. RedisAdapter is compatible with all Redis-like databases and was well-tested with Redis and KeyDB.

import { JoSk, RedisAdapter } from 'josk';
import { createClient } from 'redis';

const redisClient = await createClient({
  url: 'redis://127.0.0.1:6379'
}).connect();

const jobs = new JoSk({
  adapter: RedisAdapter,
  client: redisClient,
});

MongoDB Adapter

JoSk has no dependencies, so make sure the mongodb NPM package is installed in order to use the MongoDB Storage Adapter. Note: this package adds two new MongoDB collections for each new JoSk({ prefix }): one collection for tasks and a second one for "Read Locking" with the .lock suffix.

import { JoSk, MongoAdapter } from 'josk';
import { MongoClient } from 'mongodb';

const client = new MongoClient('mongodb://127.0.0.1:27017');
// To avoid "DB locks", it's a good idea to use a DB separate from the "main" DB
const mongoDb = client.db('joskdb');
const jobs = new JoSk({
  adapter: MongoAdapter,
  db: mongoDb,
});

Create the first task

After JoSk is initialized, call JoSk#setInterval to create a recurring task:

const jobs = new JoSk({ /*...*/ });

const task = function (ready) {
  /* ...code here... */
  ready();
};

const asyncTask = function (ready) {
  /* ...code here... */
  asyncCall(() => {
    /* ...more code here...*/
    ready();
  });
};

const asyncAwaitTask = async function (ready) {
  try {
    /* ...code here... */
    await asyncMethod();
    /* ...more code here...*/
    ready();
  } catch (err) {
    ready(); // <-- Always run `ready()`, even if error is thrown
  }
};

jobs.setInterval(task, 60 * 60 * 1000, 'task1h'); // every hour
jobs.setInterval(asyncTask, 15 * 60 * 1000, 'asyncTask15m'); // every 15 mins
jobs.setInterval(asyncAwaitTask, 30 * 60 * 1000, 'asyncAwaitTask30m'); // every 30 mins

setInterval(func, delay, uid)

  • func {Function} - Function to call on schedule
  • delay {Number} - Delay for the first run and interval between further executions in milliseconds
  • uid {String} - Unique app-wide task id
  • Returns: {String}

Adds a task to the interval execution loop. The ready() callback is passed as the first argument into the task function.

In the example below, the next task will not be scheduled until the current is ready:

const syncTask = function (ready) {
  /* ...run sync code... */
  ready();
};

const asyncAwaitTask = async function (ready) {
  try {
    /* ...code here... */
    await asyncMethod();
    /* ...more code here...*/
    ready();
  } catch (err) {
    ready(); // <-- Always run `ready()`, even if error is thrown
  }
};

jobs.setInterval(syncTask, 60 * 60 * 1000, 'syncTask1h'); // will execute every hour + time to execute the task
jobs.setInterval(asyncAwaitTask, 60 * 60 * 1000, 'asyncAwaitTask1h'); // will execute every hour + time to execute the task

In the example below, the next task will not wait for the current task to finish:

const syncTask = function (ready) {
  ready();
  /* ...run sync code... */
};

const asyncAwaitTask = async function (ready) {
  ready();
  /* ...code here... */
  await asyncMethod();
  /* ...more code here...*/
};

jobs.setInterval(syncTask, 60 * 60 * 1000, 'syncTask1h'); // will execute every hour
jobs.setInterval(asyncAwaitTask, 60 * 60 * 1000, 'asyncAwaitTask1h'); // will execute every hour

In the next example, a long-running task is executed in a loop, with each run starting as soon as the previous one has fully finished:

const longRunningAsyncTask = function (ready) {
  asyncCall((error, result) => {
    if (error) {
      ready(); // <-- Always run `ready()`, even if call was unsuccessful
    } else {
      anotherCall(result.data, ['param'], (error, response) => {
        if (error) {
          ready(); // <-- Always run `ready()`, even if call was unsuccessful
          return;
        }

        waitForSomethingElse(response, () => {
          ready(); // <-- End of the full execution
        });
      });
    }
  });
};

jobs.setInterval(longRunningAsyncTask, 0, 'longRunningAsyncTask'); // run in a loop as soon as previous run is finished

setTimeout(func, delay, uid)

  • func {Function} - Function to call after delay
  • delay {Number} - Delay in milliseconds
  • uid {String} - Unique app-wide task id
  • Returns: {String}

Runs a task after a delay in milliseconds. setTimeout is useful in a cluster when you need to make sure a task is executed only once. The ready() callback is passed as the first argument into the task function.

const syncTask = function (ready) {
  /* ...run sync code... */
  ready();
};

const asyncTask = function (ready) {
  asyncCall(function () {
    /* ...run async code... */
    ready();
  });
};

const asyncAwaitTask = async function (ready) {
  try {
    /* ...code here... */
    await asyncMethod();
    /* ...more code here...*/
    ready();
  } catch (err) {
    ready(); // <-- Always run `ready()`, even if error is thrown
  }
};

jobs.setTimeout(syncTask, 60 * 1000, 'syncTaskIn1m'); // will run only once across the cluster in a minute
jobs.setTimeout(asyncTask, 60 * 1000, 'asyncTaskIn1m'); // will run only once across the cluster in a minute
jobs.setTimeout(asyncAwaitTask, 60 * 1000, 'asyncAwaitTaskIn1m'); // will run only once across the cluster in a minute

setImmediate(func, uid)

  • func {Function} - Function to execute
  • uid {String} - Unique app-wide task id
  • Returns: {String}

Executes the function immediately, and only once. setImmediate is useful in a cluster when you need to execute a function immediately and only once across all servers. The ready() callback is passed as the first argument into the task function.

const syncTask = function (ready) {
  //...run sync code
  ready();
};

const asyncTask = function (ready) {
  asyncCall(function () {
    //...run more async code
    ready();
  });
};

const asyncAwaitTask = async function (ready) {
  try {
    /* ...code here... */
    await asyncMethod();
    /* ...more code here...*/
    ready();
  } catch (err) {
    ready(); // <-- Always run `ready()`, even if error is thrown
  }
};

jobs.setImmediate(syncTask, 'syncTask'); // will run immediately and only once across the cluster
jobs.setImmediate(asyncTask, 'asyncTask'); // will run immediately and only once across the cluster
jobs.setImmediate(asyncAwaitTask, 'asyncAwaitTask'); // uid must be unique app-wide, so it differs from 'asyncTask'

clearInterval(timer [, callback])

  • timer {String} — Timer id returned from JoSk#setInterval() method
  • [callback] {Function} — [Optional] callback function, called with error and result arguments. result is true when task is successfully cleared, or false when task is not found

Cancel current interval timer. Must be called in a separate event loop from setInterval.

const timer = jobs.setInterval(func, 34789, 'unique-taskid');
jobs.clearInterval(timer);

clearTimeout(timer [, callback])

  • timer {String} — Timer id returned from JoSk#setTimeout() method
  • [callback] {Function} — [Optional] callback function, called with error and result arguments. result is true when task is successfully cleared, or false when task is not found

Cancel current timeout timer. Must be called in a separate event loop from setTimeout.

const timer = jobs.setTimeout(func, 34789, 'unique-taskid');
jobs.clearTimeout(timer);
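
The optional callback of clearInterval and clearTimeout can be wrapped into a promise when the calling code uses async/await. This is a minimal sketch: clearTimer is a hypothetical helper, not part of JoSk, and it assumes only the documented callback contract (error first, then a result that is true when the task was cleared and false when it was not found).

```javascript
// Hypothetical helper (not part of JoSk): turns the callback form of
// `clearInterval` / `clearTimeout` into a promise.
function clearTimer(jobs, method, timerId) {
  return new Promise((resolve, reject) => {
    jobs[method](timerId, (error, result) => {
      if (error) {
        reject(error);
      } else {
        resolve(result === true); // true: cleared, false: task not found
      }
    });
  });
}

// Usage, assuming `jobs` and `timer` as in the examples above:
// const wasCleared = await clearTimer(jobs, 'clearInterval', timer);
```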

destroy()

Destroys the JoSk instance and stops its internal interval timer. This method shouldn't be called under normal circumstances. After JoSk is destroyed, calling public methods results in an error that is logged to standard output or, if an onError hook was passed to JoSk, delivered to that hook. The only permitted methods are clearTimeout and clearInterval.

// EXAMPLE: DESTROY JoSk INSTANCE UPON SERVER PROCESS TERMINATION
const jobs = new JoSk({ /* ... */ });

const cleanUpBeforeTermination = function () {
  /* ...CLEAN UP AND STOP OTHER THINGS HERE... */
  jobs.destroy();
  process.exit(1);
};

process.stdin.resume();
process.on('uncaughtException', cleanUpBeforeTermination);
process.on('exit', cleanUpBeforeTermination);
process.on('SIGHUP', cleanUpBeforeTermination);
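
Because the same handler is registered for several process events, it may fire more than once during a single shutdown. A guarded variant is sketched below; createShutdownHandler and its exit parameter are hypothetical names, not part of JoSk, and the only JoSk call used is destroy().

```javascript
// Hypothetical helper (not part of JoSk): returns a handler that runs the
// clean-up once, even if several process events fire in a row.
function createShutdownHandler(jobs, exit) {
  let isShuttingDown = false;
  return function shutdown() {
    if (isShuttingDown) {
      return; // clean-up already started by an earlier event
    }
    isShuttingDown = true;
    jobs.destroy(); // stop JoSk's internal timer
    exit(1);
  };
}

// Usage, assuming a `jobs` instance:
// const shutdown = createShutdownHandler(jobs, (code) => process.exit(code));
// process.on('SIGHUP', shutdown);
// process.on('uncaughtException', shutdown);
```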

Examples

Use cases and usage examples

CRON

Use JoSk to invoke synchronized tasks on a CRON schedule. Use the cron-parser package to parse a CRON expression into a timestamp. To simplify CRON scheduling, grab and use the createCronTask function below:

import { JoSk } from 'josk';
import parser from 'cron-parser';

const jobsCron = new JoSk({
  /* ...adapter options (see Initialization)... */
  prefix: 'cron'
});

// CRON HELPER FUNCTION
const createCronTask = (uniqueName, cronTask, task) => {
  // Timestamp of the next run according to the CRON expression
  const next = +parser.parseExpression(cronTask).next().toDate();
  const timeout = next - Date.now();

  return jobsCron.setTimeout(function (done) {
    done(() => {
      task(); // <- Execute task
      createCronTask(uniqueName, cronTask, task); // <- Create task for the next iteration
    });
  }, timeout, uniqueName);
};

createCronTask('This task runs every 2 seconds', '*/2 * * * * *', function () {
  console.log(new Date());
});

Pass arguments

Arguments can be passed to a task via a wrapper function:

const jobs = new JoSk({ /* ... */ });
const myVar = { key: 'value' };
let myLet = 'Some top level or env.variable (can get changed during runtime)';

const task = function (arg1, arg2, ready) {
  //... code here
  ready();
};

const taskA = function (ready) {
  task(myVar, myLet, ready);
};

const taskB = function (ready) {
  task({ otherKey: 'Another Value' }, 'Some other string', ready);
};

jobs.setInterval(taskA, 60 * 60 * 1000, 'taskA');
jobs.setInterval(taskB, 60 * 60 * 1000, 'taskB');
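
When many tasks share one implementation, the wrapper pattern above can be generated by a small factory. This is a minimal sketch: withArgs is a hypothetical helper, not part of JoSk, and it assumes the task takes ready as its last parameter, as in the example above.

```javascript
// Hypothetical helper (not part of JoSk): returns a task function that calls
// `task` with fixed leading arguments followed by `ready`.
function withArgs(task, ...args) {
  return function boundTask(ready) {
    return task(...args, ready);
  };
}

// Usage, assuming `jobs` and `task` as in the example above:
// jobs.setInterval(withArgs(task, { otherKey: 'Another Value' }, 'Some other string'), 60 * 60 * 1000, 'taskB');
```

Note one difference from a hand-written wrapper: withArgs captures argument values when it is called, so a variable reassigned later (like myLet above) keeps its old value inside the bound task. Use an explicit wrapper function when the latest value is needed at each run.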

Clean up old tasks

During development and testing, you may want to clean up the adapter's storage.

Clean up Redis

To clean up old tasks via the Redis CLI, use the following command pattern:

redis-cli --no-auth-warning KEYS "josk:default:*" | xargs redis-cli --raw --no-auth-warning DEL

# If you're using multiple JoSk instances with prefix:
redis-cli --no-auth-warning KEYS "josk:prefix:*" | xargs redis-cli --raw --no-auth-warning DEL

Clean up MongoDB

To clean up old tasks via MongoDB, use the following query pattern:

// Run directly in MongoDB console:
db.getCollection('__JobTasks__').deleteMany({});
// If you're using multiple JoSk instances with prefix:
db.getCollection('__JobTasks__PrefixHere').deleteMany({});

MongoDB connection fine tuning

// Recommended MongoDB connection options
// When used with ReplicaSet
const options = {
  writeConcern: {
    j: true,
    w: 'majority',
    wtimeout: 30000
  },
  readConcern: {
    level: 'majority'
  },
  readPreference: 'primary'
};

// Promise form; callback support was removed in recent `mongodb` driver releases
const client = await MongoClient.connect('mongodb://url', options);
// To avoid "DB locks", it's a good idea to use a DB separate from the "main" application DB
const db = client.db('dbName');
const jobs = new JoSk({
  adapter: MongoAdapter,
  db: db,
});

Notes

  • This package is a good fit when you have multiple horizontally scaled servers for load balancing or durability, an array of microservices, or any other setup with multiple running copies of the code, where repeating tasks need to run only once per application/cluster, not per server/instance;
  • Limitation: a task must run no more often than once every two seconds (from 2 to ∞ seconds). Example tasks: email or SMS queues, long-polling requests, periodic application logic, or periodic data fetch and sync;
  • Accuracy: the delay of each task depends on storage and "de-synchronization delay". The trusted time range of the execution period is task_delay ± (256 + Storage_Request_Delay). This means the package won't fit when you need to run a task with very precise delays. In other cases, if ±256 ms deviations are acceptable, this package is a great solution;
  • Use opts.minRevolvingDelay and opts.maxRevolvingDelay to set the range for random delays between executions. The revolving range acts as a safety control to make sure different servers do not pick the same task at the same time. Default values (128 and 768) are the best for a 3-server setup (the most common topology). Tune these options to match the needs of your project. A higher opts.minRevolvingDelay will reduce storage reads and writes;
  • This package implements "Read Locking" via "RedLock" for Redis and a dedicated .lock collection for MongoDB.
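
The accuracy and revolving-delay notes above can be made concrete with a little arithmetic. The sketch below is an illustration only and not JoSk's internal code: pickRevolvingDelay and trustedWindow are hypothetical names, the defaults 128 and 768 come from the option list, and the ±(256 + Storage_Request_Delay) margin comes from the accuracy note.

```javascript
// Illustration only, not JoSk's internal code.
// Picks a random whole number of milliseconds between min and max (inclusive).
function pickRevolvingDelay(min = 128, max = 768, random = Math.random) {
  return min + Math.floor(random() * (max - min + 1));
}

// Trusted execution window from the accuracy note:
// task_delay ± (256 + storage request delay), all in milliseconds.
function trustedWindow(taskDelay, storageRequestDelay) {
  const margin = 256 + storageRequestDelay;
  return { earliest: taskDelay - margin, latest: taskDelay + margin };
}

// Usage: a 60-second task with an assumed 20 ms storage round trip
// may run anywhere between 59724 ms and 60276 ms after scheduling.
// const { earliest, latest } = trustedWindow(60000, 20);
```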

Running Tests

  1. Clone this package
  2. In Terminal (Console) go to directory where package is cloned
  3. Then run:
# Before running tests make sure NODE_ENV === development
# Install NPM dependencies
npm install --save-dev

# Before running tests you need
# to have access to MongoDB and Redis servers
REDIS_URL="redis://127.0.0.1:6379" MONGO_URL="mongodb://127.0.0.1:27017/npm-josk-test-001" npm test

# Be patient, tests are taking around 4 mins

Run Redis tests only

Run Redis-related tests only

# Before running Redis tests you need to have Redis server installed and running
REDIS_URL="redis://127.0.0.1:6379" npm run test-redis

# Be patient, tests are taking around 2 mins

Run MongoDB tests only

Run MongoDB-related tests only

# Before running Mongo tests you need to have MongoDB server installed and running
MONGO_URL="mongodb://127.0.0.1:27017/npm-josk-test-001" npm run test-mongo

# Be patient, tests are taking around 2 mins

Why JoSk?

JoSk stands for Job-Task. The name was randomly generated by the "uniq" project.

Support our open source contribution: