JoSk
"JoSk" is a Node.js task manager for horizontally scaled apps and for apps that may need to scale horizontally in the future.
"JoSk" follows the native `setTimeout` and `setInterval` API. Tasks can also be scheduled using CRON expressions. All queued tasks are synced between all running application instances via MongoDB.
The "JoSk" package supports horizontally scaled apps running as clusters, multi-server, and multi-threaded Node.js instances, whether on the same machine, on different machines, or in different data centers. "JoSk" ensures that each task is executed only once across all running instances of the application.
Note: JoSk is a server-only package.
Main features:
- 🏢 Synchronize a single task across multiple servers;
- 🔏 Collection locking to avoid simultaneous task executions across complex infrastructure;
- 📦 Zero dependencies, written from scratch for top performance;
- 👨‍🔬 ~99% test coverage;
- 💪 Bulletproof design, built-in retries, and "zombie" task recovery 🧟🔫.
Prerequisites
- `mongod@>=4.0.0`: MongoDB Server version
- `node@>=14.20.0`: Node.js version
Older releases compatibility
- `mongod@<4.0.0`: use `josk@=1.1.0`
- `node@<14.20.0`: use `josk@=3.0.2`
- `node@<8.9.0`: use `josk@=1.1.0`
Install:
```shell
npm install josk --save
```

```javascript
// ES Module Style
import JoSk from 'josk';

// CommonJS
const JoSk = require('josk');
```
Notes:
- This package is a good fit when you have multiple horizontally scaled servers (for load balancing, durability, an array of micro-services, or any other setup with multiple running copies of the code) and you need to run repeating tasks only once per app/cluster, not once per server;
- Limitation: a task must not run more often than once per two seconds (from 2 to ∞ seconds). Example tasks: email and SMS queues, long-polling requests, periodic application logic, and periodic data fetching or syncing;
- Accuracy: the delay of each task depends on MongoDB and the "de-synchronization delay". The trusted time range of the execution period is `task_delay ± (256 + MongoDB_Connection_And_Request_Delay)` (in milliseconds). That means this package won't fit when you need to run a task with very precise delays. For other cases, if ±256 ms delays are acceptable, this package is a great solution;
- Use `opts.minRevolvingDelay` and `opts.maxRevolvingDelay` to set the range for random delays between executions. The revolving range acts as a safety control to make sure different servers don't pick the same task at the same time. The default values (`128` and `768`) are best suited for a 3-server setup (the most common topology). Tune these options to match the needs of your project. A higher `opts.minRevolvingDelay` will reduce the load on MongoDB;
- To avoid "DB locks", it's recommended to use a separate DB from the "main" application DB (the same MongoDB server can host multiple DBs);
- This package implements "Collection Locking" via a special collection whose name ends with the `.lock` suffix;
- In total, this package adds two new MongoDB collections per each `new JoSk({ prefix })` to the database it's connected to.
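The accuracy formula and the revolving range in the notes above can be made concrete with a little arithmetic. The helpers `executionWindow` and `randomRevolvingDelay` below are hypothetical and not part of JoSk; the second one assumes the delay is drawn uniformly between the two bounds, which the notes do not specify.

```javascript
// Hypothetical helpers for reasoning about JoSk timing; not part of the JoSk API.

// Trusted execution window from the notes:
// task_delay ± (256 + MongoDB_Connection_And_Request_Delay), all in milliseconds.
function executionWindow(taskDelay, mongoDelay) {
  const spread = 256 + mongoDelay;
  return { earliest: taskDelay - spread, latest: taskDelay + spread };
}

// ASSUMPTION: a uniform pick inside [minRevolvingDelay, maxRevolvingDelay].
// `random` is injectable so the result can be checked deterministically.
function randomRevolvingDelay(min = 128, max = 768, random = Math.random) {
  return Math.round(min + (max - min) * random());
}

// A task scheduled every 60 s, with ~44 ms spent on the MongoDB round trip:
console.log(executionWindow(60000, 44)); // { earliest: 59700, latest: 60300 }
console.log(randomRevolvingDelay(128, 768, () => 0.5)); // 448
```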
API:
new JoSk({opts}):
- `opts.db` {Object} - [Required] Connection to MongoDB, like the one returned as an argument from `MongoClient.connect()`
- `opts.prefix` {String} - [Optional] Use to create multiple named instances
- `opts.lockCollectionName` {String} - [Optional] By default all JoSk instances use the same `__JobTasks__.lock` collection for locking
- `opts.debug` {Boolean} - [Optional] Enable debugging messages, useful during development
- `opts.autoClear` {Boolean} - [Optional] Remove (clear) obsolete tasks: any tasks which are not found in the instance memory (runtime) but exist in the database. Obsolete tasks may appear when a task wasn't cleared from the database on process shutdown, and/or was removed or renamed in the app. Obsolete tasks may also appear if multiple app instances running different codebases share the same database, and the task doesn't exist on one of the instances. Default: `false`
- `opts.resetOnInit` {Boolean} - [Optional] Make sure all old tasks are completed before setting a new one. Useful when you run a single instance of an app, or multiple app instances on one machine, in case the machine was restarted while a task was running and the task remained unfinished
- `opts.zombieTime` {Number} - [Optional] Time in milliseconds after which a task is interpreted as a "zombie". This parameter allows rescuing a task from "zombie mode" when `ready()` wasn't called, an exception was thrown during runtime, or the task got stuck due to bad logic. While the `resetOnInit` option makes sure tasks are `done` on startup, the `zombieTime` option solves the same issue during runtime. Default: `900000` (15 minutes). It's not recommended to set this value to less than a minute (`60000` ms)
- `opts.minRevolvingDelay` {Number} - [Optional] Minimum revolving delay: the minimum delay between task executions in milliseconds. Default: `128`
- `opts.maxRevolvingDelay` {Number} - [Optional] Maximum revolving delay: the maximum delay between task executions in milliseconds. Default: `768`
- `opts.onError` {Function} - [Optional] Informational hook, called instead of throwing exceptions. Default: `false`. Called with two arguments:
  - `title` {String}
  - `details` {Object}
  - `details.description` {String}
  - `details.error` {Mix}
  - `details.uid` {String} - Internal `uid`, suitable for `.clearInterval()` and `.clearTimeout()`
- `opts.onExecuted` {Function} - [Optional] Informational hook, called when a task is finished. Default: `false`. Called with two arguments:
  - `uid` {String} - `uid` passed into the `.setImmediate()`, `.setTimeout()`, or `.setInterval()` methods
  - `details` {Object}
  - `details.uid` {String} - Internal `uid`, suitable for `.clearInterval()` and `.clearTimeout()`
  - `details.date` {Date} - Execution timestamp as JS {Date}
  - `details.delay` {Number} - Execution `delay` (e.g. `interval` for `.setInterval()`)
  - `details.timestamp` {Number} - Execution timestamp as unix {Number}
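The options and hook signatures above can be wired together roughly as follows. This is a sketch: `buildJoSkOptions` and the in-memory `events` log are hypothetical names, only the documented option names are used, and `db` stands for whatever `client.db(...)` returned in your app.

```javascript
// Hypothetical in-memory log, only to show what the hooks receive.
const events = [];

// onError(title, details): called instead of throwing exceptions.
function onError(title, details) {
  events.push(`[JoSk error] ${title}: ${details.description} (uid: ${details.uid})`);
}

// onExecuted(uid, details): called when a task is finished.
function onExecuted(uid, details) {
  events.push(`[JoSk done] ${uid} at ${details.date.toISOString()} (delay: ${details.delay} ms)`);
}

// Hypothetical builder returning an options object for `new JoSk(opts)`.
function buildJoSkOptions(db) {
  return {
    db, // [Required] MongoDB DB handle, e.g. client.db('dbName')
    prefix: 'app', // illustrative name for a separate named instance
    debug: false,
    autoClear: false, // documented default
    zombieTime: 15 * 60 * 1000, // documented default: 900000 ms
    minRevolvingDelay: 128, // documented default
    maxRevolvingDelay: 768, // documented default
    onError,
    onExecuted,
  };
}
```

In an app, `new JoSk(buildJoSkOptions(client.db('dbName')))` would create the instance; the in-memory array would normally be replaced by your own logger.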
Initialization:
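```javascript
import { MongoClient } from 'mongodb';
import JoSk from 'josk';

// Callback-style connect() is supported by the `mongodb` driver before v5;
// newer driver versions return a Promise instead
MongoClient.connect('mongodb://url', (error, client) => {
  if (error) {
    throw error;
  }
  // To avoid "DB locks", it's a good idea to use a separate DB from the "main" application DB
  const db = client.db('dbName');
  const job = new JoSk({ db });
});
```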
```javascript
const job = new JoSk({ db: db });

const task = function (ready) {
  /* ...code here... */
  ready();
};

const asyncTask = function (ready) {
  /* ...code here... */
  asyncCall(() => { // `asyncCall` is a placeholder for any async operation
    /* ...more code here...*/
    ready();
  });
};

job.setInterval(task, 60 * 60 * 1000, 'task1h'); // every hour
job.setInterval(asyncTask, 15 * 60 * 1000, 'asyncTask15m'); // every 15 mins
```
setInterval(func, delay, uid)
- `func` {Function} - Function to call on schedule
- `delay` {Number} - Delay for the first run and interval between further executions, in milliseconds
- `uid` {String} - Unique app-wide task id

Sets the task into an interval execution loop. `ready()` is passed as the first argument into the task function.
In the example below, the next run will not be scheduled until the current one calls `ready()`:
```javascript
const syncTask = function (ready) {
  /* ...run sync code... */
  ready();
};

const asyncTask = function (ready) {
  asyncCall(function () {
    /* ...run async code... */
    ready();
  });
};

job.setInterval(syncTask, 60 * 60 * 1000, 'syncTask1h'); // will execute every hour + time to execute the task
job.setInterval(asyncTask, 60 * 60 * 1000, 'asyncTask1h'); // will execute every hour + time to execute the task
```
In the example below, the next run will not wait for the current one to finish:
```javascript
const syncTask = function (ready) {
  ready();
  /* ...run sync code... */
};

const asyncTask = function (ready) {
  ready();
  asyncCall(function () {
    /* ...run async code... */
  });
};

job.setInterval(syncTask, 60 * 60 * 1000, 'syncTask1h'); // will execute every hour
job.setInterval(asyncTask, 60 * 60 * 1000, 'asyncTask1h'); // will execute every hour
```
In this example, we assume a long-running task executed in a loop without delay, where each run starts only after the previous run has fully finished:
```javascript
const longRunningAsyncTask = function (ready) {
  asyncCall((error, result) => {
    if (error) {
      ready(); // <-- Always run `ready()`, even if call was unsuccessful
    } else {
      anotherCall(result.data, ['param'], (error, response) => {
        waitForSomethingElse(response, () => {
          ready(); // <-- End of full execution
        });
      });
    }
  });
};

job.setInterval(longRunningAsyncTask, 0, 'longRunningAsyncTask'); // run in a loop as soon as previous run is finished
```
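The rule from the last example, always calling `ready()` even when a run fails, is easy to break in `async`/`await` code. One way to enforce it is a small adapter that turns a promise-returning function into a task with the `(ready)` signature. `toJoSkTask` is a hypothetical helper, not part of JoSk.

```javascript
// Hypothetical adapter: wraps an async function into a JoSk-style `(ready) => {}` task.
// `ready()` is called exactly once, whether the work resolves, rejects, or throws.
function toJoSkTask(asyncFn, onFailure = (error) => console.error(error)) {
  return function (ready) {
    Promise.resolve()
      .then(() => asyncFn()) // a synchronous throw here also becomes a rejection
      .catch(onFailure) // report the failure instead of leaving the task hanging
      .finally(() => ready()); // always release the task
  };
}
```

For example, `job.setInterval(toJoSkTask(async () => { await syncData(); }), 15 * 60 * 1000, 'syncData15m')`, where `syncData` stands for your own (hypothetical) async function.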
setTimeout(func, delay, uid)
- `func` {Function} - Function to call on schedule
- `delay` {Number} - Delay in milliseconds
- `uid` {String} - Unique app-wide task id

Sets the task into timeout execution. `setTimeout` is useful for clusters when you need to make sure a task is executed only once. `ready()` is passed as the first argument into the task function.
```javascript
const syncTask = function (ready) {
  /* ...run sync code... */
  ready();
};

const asyncTask = function (ready) {
  asyncCall(function () {
    /* ...run async code... */
    ready();
  });
};

job.setTimeout(syncTask, 60 * 1000, 'syncTaskIn1m'); // will run only once across the cluster in a minute
job.setTimeout(asyncTask, 60 * 1000, 'asyncTaskIn1m'); // will run only once across the cluster in a minute
```
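Because `setTimeout` takes a relative delay, running a one-off cluster task at a specific wall-clock moment means converting that moment into milliseconds first. `msUntil` below is a hypothetical helper, not part of JoSk; it clamps past moments to `0`.

```javascript
// Hypothetical helper: milliseconds from `now` until `target` (never negative).
// `target` may be a Date or a unix timestamp in ms; `now` is injectable for testing.
function msUntil(target, now = Date.now()) {
  const targetMs = target instanceof Date ? target.getTime() : Number(target);
  return Math.max(0, targetMs - now);
}

// Usage with a JoSk instance (uid is illustrative):
// job.setTimeout(task, msUntil(new Date('2030-01-01T00:00:00Z')), 'newYear2030');
```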
setImmediate(func, uid)
- `func` {Function} - Function to execute
- `uid` {String} - Unique app-wide task id

Executes the function immediately, and only once. `setImmediate` is useful for clusters when you need to execute a function immediately and only once across all servers. `ready()` is passed as the first argument into the task function.
```javascript
const syncTask = function (ready) {
  //...run sync code
  ready();
};
const asyncTask = function (ready) {
  asyncCall(function () {
    //...run more async code
    ready();
  });
};

job.setImmediate(syncTask, 'syncTask');
job.setImmediate(asyncTask, 'asyncTask');
```
clearInterval(timer [, callback])
- `timer` {String} - Timer id returned from the `JoSk#setInterval()` method
- `[callback]` {Function} - [Optional] Callback function, called with `error` and `result` arguments. `result` is `true` when the task is successfully cleared, or `false` when the task is not found

Cancels the current interval timer. Must be called in a separate event loop from `setInterval`.
```javascript
const timer = job.setInterval(func, 34789, 'unique-taskid');
job.clearInterval(timer);
```
clearTimeout(timer [, callback])
- `timer` {String} - Timer id returned from the `JoSk#setTimeout()` method
- `[callback]` {Function} - [Optional] Callback function, called with `error` and `result` arguments. `result` is `true` when the task is successfully cleared, or `false` when the task is not found

Cancels the current timeout timer. Should be called in a separate event loop from `setTimeout`.
```javascript
const timer = job.setTimeout(func, 34789, 'unique-taskid');
job.clearTimeout(timer);
```
destroy()
Destroys the JoSk instance and stops its internal interval timer. This method shouldn't be called under normal circumstances. After JoSk is destroyed, calls to public methods will be logged to std or, if the `onError` hook was passed to JoSk, the hook will receive an error. The only methods still permitted are `clearTimeout` and `clearInterval`.
```javascript
// EXAMPLE: DESTROY JoSk INSTANCE UPON SERVER PROCESS TERMINATION
const job = new JoSk({ db: db });

const cleanUpBeforeTermination = function () {
  /* ...CLEAN UP AND STOP OTHER THINGS HERE... */
  job.destroy();
  process.exit(1);
};

process.stdin.resume();
process.on('uncaughtException', cleanUpBeforeTermination);
process.on('exit', cleanUpBeforeTermination);
process.on('SIGHUP', cleanUpBeforeTermination);
```
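In the example above, more than one registered event can fire for the same shutdown: calling `process.exit()` inside the `uncaughtException` or `SIGHUP` handler also emits `exit`, so `job.destroy()` may run twice. A small run-once guard avoids that; `runOnce` is a hypothetical helper, not part of JoSk.

```javascript
// Hypothetical helper: returns a function that runs `fn` only on its first call.
function runOnce(fn) {
  let done = false;
  return function (...args) {
    if (done) return undefined; // later calls are ignored
    done = true;
    return fn.apply(this, args);
  };
}

// Usage with the example above:
// const cleanUpBeforeTermination = runOnce(function () {
//   job.destroy();
//   process.exit(1);
// });
```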
Examples
Use cases and usage examples
CRON
Use JoSk to invoke synchronized tasks on a CRON schedule. Use the `cron-parser` package to parse CRON expressions. `createCronTask` example:
```javascript
import parser from 'cron-parser';
import JoSk from 'josk';

// `db` is the MongoDB DB handle, obtained as shown in "Initialization"
const jobCron = new JoSk({
  db: db,
  maxRevolvingDelay: 256, // <- Speed up the timer by lowering its max revolving delay
  zombieTime: 1024, // <- will need to call `done()` right away
  prefix: 'cron'
});

// CREATE HELPER FUNCTION
const createCronTask = (uniqueName, cronTask, task) => {
  const next = +parser.parseExpression(cronTask).next().toDate();
  const timeout = next - Date.now();

  return jobCron.setTimeout(function (done) {
    done(() => { // <- call `done()` right away
      // MAKE SURE FURTHER LOGIC IS EXECUTED
      // INSIDE THE done() CALLBACK
      task(); // <- Execute task
      createCronTask(uniqueName, cronTask, task); // <- Create task for the next iteration
    });
  }, timeout, uniqueName);
};

createCronTask('This task runs every 2 seconds', '*/2 * * * * *', function () {
  console.log(new Date());
});
```
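For a simple schedule such as "once a day at a fixed UTC time", the `timeout` for the next run can be computed without a CRON parser. The sketch below is a hypothetical stand-in for the `parser.parseExpression(...).next()` step of `createCronTask`, covering only that one pattern; it is not part of JoSk or `cron-parser`.

```javascript
// Hypothetical helper: ms from `now` until the next HH:MM (UTC), today or tomorrow.
function msUntilNextDailyUtc(hour, minute, now = Date.now()) {
  const d = new Date(now);
  let next = Date.UTC(d.getUTCFullYear(), d.getUTCMonth(), d.getUTCDate(), hour, minute, 0, 0);
  if (next <= now) {
    next += 24 * 60 * 60 * 1000; // already passed today: same time tomorrow (UTC has no DST)
  }
  return next - now;
}
```

Its result could be passed as the `timeout` argument to `jobCron.setTimeout(...)` in place of the value computed from the CRON expression.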
Pass arguments
```javascript
const job = new JoSk({ db: db });
const myVar = { key: 'value' };
let myLet = 'Some top level or env.variable (can get changed during runtime)';

const task = function (arg1, arg2, ready) {
  //... code here
  ready();
};

const taskA = function (ready) {
  task(myVar, myLet, ready);
};

const taskB = function (ready) {
  task({ otherKey: 'Another Value' }, 'Some other arguments', ready);
};

job.setInterval(taskA, 60 * 60 * 1000, 'taskA');
job.setInterval(taskB, 60 * 60 * 1000, 'taskB');
```
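The wrapper functions above (`taskA`, `taskB`) read `myLet` at call time, so a later change to the variable is picked up on the next run. Binding the arguments up front with `Function.prototype.bind` is shorter, but it freezes the values at the moment `bind` is called. The sketch below shows the difference without JoSk; `fakeReady` is a hypothetical stand-in for the `ready` callback JoSk passes.

```javascript
let myLet = 'initial';
const seen = [];

// Same shape as `task` in the example above, reduced to one argument.
const task = function (arg1, ready) {
  seen.push(arg1);
  ready();
};

const viaClosure = function (ready) { task(myLet, ready); }; // reads myLet on every run
const viaBind = task.bind(null, myLet); // captures 'initial' once

const fakeReady = () => {};
myLet = 'changed at runtime';
viaClosure(fakeReady);
viaBind(fakeReady);
console.log(seen); // [ 'changed at runtime', 'initial' ]
```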
Clean up old tasks
To clean up old tasks via MongoDB, use the following query pattern:
```javascript
// Run directly in MongoDB console (deleteMany() replaces the deprecated remove()):
db.getCollection('__JobTasks__').deleteMany({});
// If you're using multiple JoSk instances with prefix:
db.getCollection('__JobTasks__PrefixHere').deleteMany({});
```
MongoDB connection fine-tuning
```javascript
import { MongoClient } from 'mongodb';
import JoSk from 'josk';

// Recommended MongoDB connection options
// When used with ReplicaSet
const options = {
  writeConcern: {
    j: true,
    w: 'majority',
    wtimeout: 30000
  },
  readConcern: {
    level: 'majority'
  },
  readPreference: 'primary'
};

MongoClient.connect('mongodb://url', options, (error, client) => {
  if (error) {
    throw error;
  }
  // To avoid "DB locks", it's a good idea to use a separate DB from the "main" application DB
  const db = client.db('dbName');
  const job = new JoSk({ db });
});
```
Running Tests
- Clone this package
- In Terminal (Console), go to the directory where the package is cloned
- Then run:
```shell
# Before running tests, make sure NODE_ENV === development
# Install NPM dependencies
npm install --save-dev
# Before running tests, you need to have MongoDB running
MONGO_URL="mongodb://127.0.0.1:27017/npm-josk-test-001" npm test
# Be patient, tests take around 2 mins
```
Why JoSk?
JoSk is Job-Task. It is a randomly generated name from the "uniq" project.
Support our open source contribution:
- Upload and share files using ☄️ meteor-files.com — Continue interrupted file uploads without losing any progress. There is nothing that will stop Meteor from delivering your file to the desired destination
- Use ▲ ostr.io for Server Monitoring, Web Analytics, WebSec, Web-CRON and SEO Pre-rendering of a website
- Star on GitHub
- Star on NPM
- Star on Atmosphere
- Sponsor via GitHub
- Support via PayPal