strigops:job-collection

v2.0.6 (published last week)

meteor-job-collection v2.0


Persistent Reactive Job Queue for Meteor 3.x

A powerful and easy-to-use job queue for Meteor, supporting distributed workers that can run anywhere. Version 2.0 is completely rewritten in TypeScript with full async/await support for Meteor 3.x and Node.js 18+.

Note: This is a modernized TypeScript fork of the original vsivsi:job-collection. All credit for the original design and implementation goes to Vaughn Iverson.

✨ What's New in v2.0

  • 🔄 Full async/await support - Modern promise-based API
  • 📘 TypeScript - Complete type definitions included
  • 🚀 Meteor 3.x compatible - No Fibers dependency (100% removed)
  • Node.js 22 ready - Supports Node 18, 20, and 22
  • 🔙 Backward compatible - Callback APIs still supported
  • 🎯 Better performance - Faster with native promises

🚨 Upgrading from v1.x?

Key Changes:

  • Meteor 3.0+ and Node.js 18+ now required
  • All methods now return Promises - use async/await or callbacks
  • No Fibers dependency - pure async/await throughout
  • Use setJobAllow/setJobDeny for permissions (or allow/deny for backward compat)
// OLD (v1.x with Fibers)
const job = myJobs.getJob(id);
job.done();

// NEW (v2.x)
const job = await myJobs.getJob(id);
await job.done();

📦 Installation

meteor add strigops:job-collection

Use this in your Meteor application (server and client code).

For Standalone Node.js Workers

npm install @strigo/meteor-job-collection

Use this for remote workers that connect to your Meteor app via DDP.

Requirements:

  • Meteor 3.0+ (for Meteor apps)
  • Node.js 18+ (20 or 22 recommended)
  • MongoDB 5.0+

Which one should I use?

  • 🏠 Meteor App: Use meteor add strigops:job-collection
  • 🚀 Remote Workers: Use npm install @strigo/meteor-job-collection
  • 📊 Both: Meteor package in your app + npm package for scaled workers

🚀 Quick Start

Server Setup

import { Meteor } from 'meteor/meteor';
import { JobCollection } from 'meteor/strigops:job-collection';

// Create job collection
const myJobs = new JobCollection('myJobQueue');

// Set up permissions
myJobs.setJobAllow({
  admin: (userId) => !!userId  // Authenticated users only
});

// Start server on startup
Meteor.startup(async () => {
  await myJobs.startJobServer();
});

// Publish jobs (optional)
Meteor.publish('allJobs', function() {
  return myJobs.find({});
});

Create and Submit Jobs

import { Job } from 'meteor/strigops:job-collection';

// Create a job with async/await
async function scheduleEmail() {
  const job = new Job(myJobs, 'sendEmail', {
    to: 'user@example.com',
    subject: 'Hello',
    body: 'Welcome!'
  });

  const id = await job
    .priority('normal')
    .retry({ retries: 5, wait: 15*60*1000 })  // 15 min between retries
    .save();

  return id;
}

Process Jobs (Workers)

// Worker with async/await
myJobs.processJobs(
  'sendEmail',
  { concurrency: 4 },
  async (job, callback) => {
    try {
      await sendEmail(job.data);
      await job.done();
    } catch (error) {
      await job.fail(error.message);
    }
    callback();  // Always call callback!
  }
);
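
Because the callback has to be called on every path, it can help to centralize that logic. The following is a minimal sketch, not part of the package: safeWorker and the WorkerJob interface are hypothetical names, and only done(), fail() and the callback come from the example above.

```typescript
// Minimal shape of the job object used here; the real Job class has more members.
interface WorkerJob<T> {
  data: T;
  done(result?: unknown): Promise<unknown>;
  fail(error: string): Promise<unknown>;
}

type Worker<T> = (job: WorkerJob<T>, callback: () => void) => Promise<void>;

// Hypothetical helper: wraps plain async logic so that done()/fail() and the
// callback are handled in one place.
function safeWorker<T>(handler: (data: T) => Promise<unknown>): Worker<T> {
  return async (job, callback) => {
    try {
      const result = await handler(job.data);
      await job.done(result);
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      await job.fail(message);
    } finally {
      callback(); // runs whether the handler succeeded or threw
    }
  };
}
```

Under these assumptions, a worker could be registered as myJobs.processJobs('sendEmail', { concurrency: 4 }, safeWorker(sendEmail)).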

📖 Core Concepts

Job Lifecycle

waiting → ready → running → completed
   ↓        ↓        ↓
paused   ready    failed → waiting (retry)
   ↓                 ↓
waiting          cancelled
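
One way to make the diagram checkable in code is a transition table. The sketch below encodes only the arrows drawn above, as one reading of the diagram; it is not the library's internal rule set, and control methods such as restart() or cancel() allow transitions that the diagram does not show. The names transitions and canTransition are hypothetical.

```typescript
type JobStatus =
  | 'waiting' | 'paused' | 'ready' | 'running'
  | 'failed' | 'cancelled' | 'completed';

// Transitions as read from the diagram (an interpretation, not the
// library's own rules).
const transitions: Record<JobStatus, JobStatus[]> = {
  waiting: ['ready', 'paused'],
  paused: ['waiting'],
  ready: ['running'],
  running: ['completed', 'failed'],
  failed: ['waiting', 'cancelled'], // back to waiting when a retry is left
  cancelled: [],
  completed: [],
};

// True when the diagram draws an arrow from `from` to `to`.
function canTransition(from: JobStatus, to: JobStatus): boolean {
  return transitions[from].indexOf(to) !== -1;
}
```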

Job Configuration

// save() resolves to the new job's id
const id = await new Job(myJobs, 'processImage', data)
  .priority('high')              // low, normal, medium, high, critical
  .retry({
    retries: 5,
    wait: 5*60*1000,             // 5 minutes between retries
    backoff: 'exponential'        // or 'constant'
  })
  .repeat({
    repeats: 10,                  // or Job.forever
    wait: 60*60*1000             // 1 hour between repeats
  })
  .delay(30*1000)                // Delay 30 seconds
  .depends([job1, job2])         // Wait for dependencies
  .after(new Date('2024-01-01')) // Run after date
  .save();
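
To get a feel for how the two backoff settings affect retry timing, here is a small sketch. It assumes that exponential backoff doubles the wait on each further retry; the package's exact formula may differ, and retryDelay is a hypothetical helper rather than a library function.

```typescript
type Backoff = 'constant' | 'exponential';

// Hypothetical helper. `attempt` is 1 for the first retry.
// Assumption: exponential backoff doubles the wait on each further retry.
function retryDelay(wait: number, attempt: number, backoff: Backoff): number {
  if (backoff === 'constant') return wait;
  return wait * Math.pow(2, attempt - 1);
}

const fiveMinutes = 5 * 60 * 1000;
// Delays in milliseconds for retries 1 to 5: 5, 10, 20, 40 and 80 minutes.
const schedule = [1, 2, 3, 4, 5].map((n) => retryDelay(fiveMinutes, n, 'exponential'));
```

Under this assumption, five retries with a 5 minute base wait spread over roughly two and a half hours, which is worth keeping in mind when choosing retries and wait together.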

🔧 TypeScript Support

Full type safety out of the box:

import { Job, JobCollection } from 'meteor/strigops:job-collection';

// Define job data types
interface EmailJobData {
  to: string;
  subject: string;
  body: string;
}

// Type-safe job creation
const job = new Job<EmailJobData>(myJobs, 'sendEmail', {
  to: 'user@example.com',
  subject: 'Hello',
  body: 'Welcome!'
});

// Type-safe workers
myJobs.processJobs<EmailJobData>(
  'sendEmail',
  async (job, callback) => {
    const { to, subject, body } = job.data;  // Fully typed!
    await sendEmail(to, subject, body);
    await job.done();
    callback();
  }
);
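
TypeScript types are erased at compile time, so the generic parameter does not validate data that was stored by older code or submitted by a remote client. A runtime guard can close that gap. This is a sketch only: isEmailJobData is a hypothetical helper, not something the package provides.

```typescript
interface EmailJobData {
  to: string;
  subject: string;
  body: string;
}

// Hypothetical runtime guard: checks the shape before the worker trusts it.
function isEmailJobData(value: unknown): value is EmailJobData {
  if (typeof value !== 'object' || value === null) return false;
  const record = value as Record<string, unknown>;
  return (
    typeof record.to === 'string' &&
    typeof record.subject === 'string' &&
    typeof record.body === 'string'
  );
}
```

A worker could call this guard on job.data first and fail the job with a descriptive message when it returns false.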

🎯 API Reference

Job Methods

All methods support both async/await and callbacks:

// With async/await
const id = await job.save();
await job.done(result);
await job.fail(error);
await job.log('message', { level: 'info' });
await job.progress(50, 100);
await job.pause();
await job.resume();
await job.cancel();
await job.restart();
await job.remove();

// With callbacks (backward compatible)
job.save((err, id) => { /* ... */ });
job.done(result, (err, success) => { /* ... */ });
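
For readers wondering how one method can serve both styles, the general pattern looks like the sketch below. It illustrates the idea only and is not the package's implementation; withOptionalCallback and NodeCallback are hypothetical names.

```typescript
type NodeCallback<T> = (err: Error | null, result?: T) => void;

// Hypothetical helper: runs an async operation, always returns its promise,
// and additionally reports to a Node-style callback when one is supplied.
function withOptionalCallback<T>(
  operation: () => Promise<T>,
  callback?: NodeCallback<T>
): Promise<T> {
  const promise = operation();
  if (callback) {
    promise.then(
      (result) => callback(null, result),
      (err) => callback(err instanceof Error ? err : new Error(String(err)))
    );
  }
  return promise;
}
```

With this shape, a caller can either await the returned promise or pass a callback, which matches how the two groups of calls above are used.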

Job Creation:

  • new Job(collection, type, data) - Create new job
  • .priority(level) - Set priority (low/normal/medium/high/critical)
  • .retry(options) - Configure retry behavior
  • .repeat(options) - Configure repeat behavior
  • .delay(ms) - Delay before first run
  • .after(date) - Run after specific date
  • .depends(jobs) - Set job dependencies
  • .save([options]) - Save to collection

Job Control:

  • job.refresh() - Reload from server
  • job.done([result]) - Mark as completed
  • job.fail([error]) - Mark as failed
  • job.pause() - Pause job
  • job.resume() - Resume paused job
  • job.cancel() - Cancel job
  • job.restart() - Restart failed/cancelled job
  • job.rerun() - Clone and rerun completed job
  • job.remove() - Remove from collection
  • job.ready() - Force to ready state

Job Monitoring:

  • job.log(message, [options]) - Add log entry
  • job.progress(completed, total) - Update progress

JobCollection Methods

1// Get jobs
2await myJobs.getJob(id)
3await myJobs.getJobs([id1, id2, id3])
4await myJobs.getWork(type, options)
5
6// Bulk operations
7await myJobs.readyJobs([id1, id2])
8await myJobs.pauseJobs([id1, id2])
9await myJobs.resumeJobs([id1, id2])
10await myJobs.cancelJobs([id1, id2])
11await myJobs.restartJobs([id1, id2])
12await myJobs.removeJobs([id1, id2])
13
14// Server control
15await myJobs.startJobServer()
16await myJobs.shutdownJobServer({ timeout: 60000 })
17
18// Worker queue
19const workers = myJobs.processJobs(type, options, worker)

JobQueue Methods (Workers)

const workers = myJobs.processJobs('type', options, worker);

workers.pause()          // Pause processing
workers.resume()         // Resume processing
workers.trigger()        // Manually trigger work check
workers.length()         // Jobs waiting in queue
workers.running()        // Jobs currently running
workers.idle()           // True if no jobs
workers.full()           // True if at max concurrency

// Graceful shutdown
workers.shutdown({ level: 'soft' }, () => {
  console.log('Shutdown complete');
});

🔒 Security

Fine-grained permission control:

1// Use setJobAllow/setJobDeny (or allow/deny for backward compatibility)
2myJobs.setJobAllow({
3  // Admin: full control
4  admin: (userId, method, params) => {
5    return Roles.userIsInRole(userId, 'admin');
6  },
7  
8  // Manager: can manage existing jobs
9  manager: (userId, method, params) => {
10    return Roles.userIsInRole(userId, 'manager');
11  },
12  
13  // Creator: can create new jobs
14  creator: (userId, method, params) => {
15    return userId !== null;
16  },
17  
18  // Worker: can get work and update status
19  worker: (userId, method, params) => {
20    return Roles.userIsInRole(userId, 'worker');
21  }
22});
23
24// Deny rules (override allow)
25myJobs.setJobDeny({
26  admin: (userId) => userId === 'bannedUserId'
27});
28
29// Note: You can also use allow/deny for backward compatibility
30// myJobs.allow({ ... }) and myJobs.deny({ ... }) work the same way
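
The statement that deny rules override allow rules can be modeled with a small evaluator. This is a sketch under assumptions: isPermitted, Rule and RuleSet are hypothetical names, and how the library maps each method to the roles that may call it is not modeled, so the relevant roles are passed in by the caller.

```typescript
type Rule = (userId: string | null, method: string, params: unknown[]) => boolean;
type RuleSet = { [role: string]: Rule[] };

// Hypothetical evaluator: a call is permitted when at least one allow rule for
// any of the given roles passes and no deny rule for those roles matches.
function isPermitted(
  roles: string[],
  allow: RuleSet,
  deny: RuleSet,
  userId: string | null,
  method: string,
  params: unknown[]
): boolean {
  const matches = (rules: RuleSet): boolean =>
    roles.some((role) =>
      (rules[role] || []).some((rule) => rule(userId, method, params))
    );
  return !matches(deny) && matches(allow);
}
```

In this model, a user who passes an allow rule is still rejected as soon as any deny rule for the same role returns true.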

📡 Remote Workers (Node.js)

Run workers outside Meteor:

1// worker.js
2import DDP from 'ddp';
3import DDPLogin from 'ddp-login';
4import { Job } from '@strigo/meteor-job-collection';
5
6const ddp = new DDP({
7  host: 'localhost',
8  port: 3000,
9  use_ejson: true
10});
11
12Job.setDDP(ddp);
13
14ddp.connect(async (err) => {
15  if (err) throw err;
16  
17  await DDPLogin(ddp, { /* credentials */ });
18  
19  const workers = Job.processJobs(
20    'myQueue',
21    'sendEmail',
22    { concurrency: 10 },
23    async (job, callback) => {
24      await sendEmail(job.data);
25      await job.done();
26      callback();
27    }
28  );
29});

🔄 Advanced Features

Job Dependencies

1// Job 2 waits for Job 1
2const job1 = await new Job(myJobs, 'download', data).save();
3const job2 = await new Job(myJobs, 'process', data)
4  .depends([job1])
5  .save();

Scheduled Jobs (Later.js)

// save() resolves to the new job's id
const id = await new Job(myJobs, 'dailyReport', {})
  .repeat({
    schedule: myJobs.later.parse.text('at 9:00 am every weekday')
  })
  .save();

Progress Tracking

myJobs.processJobs('process', async (job, callback) => {
  const total = 100;
  for (let i = 0; i < total; i++) {
    await processItem(i);
    await job.progress(i + 1, total);
  }
  await job.done();
  callback();
});
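
Calling job.progress() for every item means one update per item, which may be more than a long loop needs. A simple way to reduce that is to report only every few items. The sketch below assumes nothing about the package beyond the progress(completed, total) call shown above; makeProgressReporter and ProgressJob are hypothetical names.

```typescript
// Minimal shape needed here; the real Job class has more members.
interface ProgressJob {
  progress(completed: number, total: number): Promise<unknown>;
}

// Hypothetical helper: forwards progress only on every `step`-th item and on
// the final item, skipping all other calls.
function makeProgressReporter(job: ProgressJob, total: number, step: number) {
  return async (completed: number): Promise<void> => {
    if (completed % step === 0 || completed === total) {
      await job.progress(completed, total);
    }
  };
}
```

Inside the loop above, await report(i + 1) would replace the direct job.progress call, where report is created once per job with makeProgressReporter(job, total, 10).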

Logging

1// Set up file logging
2import fs from 'fs';
3const logStream = fs.createWriteStream('jobs.log', { flags: 'a' });
4myJobs.setLogStream(logStream);
5
6// Or use event-based logging
7myJobs.events.on('call', (msg) => {
8  console.log(`${msg.method}:`, msg.returnVal);
9});

🧹 Maintenance

Cleaning Up Old Jobs

// Periodic cleanup
async function cleanupJobs() {
  const weekAgo = new Date(Date.now() - 7*24*60*60*1000);
  const oldJobs = await myJobs
    .find({
      status: { $in: ['completed', 'cancelled'] },
      updated: { $lt: weekAgo }
    })
    .fetchAsync();

  const ids = oldJobs.map(j => j._id);
  await myJobs.removeJobs(ids);
}

// Schedule cleanup job
new Job(myJobs, 'cleanup', {})
  .repeat({
    schedule: myJobs.later.parse.text('at 3:00 am')
  })
  .save();

myJobs.processJobs('cleanup', async (job, callback) => {
  await cleanupJobs();
  await job.done();
  callback();
});
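
If a cleanup run can match a very large number of jobs, passing every id to removeJobs in one call may produce an unwieldy request. Splitting the ids into batches is one option. This is a sketch: chunk is a hypothetical helper, and any particular batch size is an arbitrary choice rather than a limit documented by the package.

```typescript
// Hypothetical helper: split a list into batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (size < 1) throw new RangeError('size must be at least 1');
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```

In cleanupJobs, the single removeJobs(ids) call could then become a loop over chunk(ids, 500) that awaits removeJobs for each batch.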

Monitoring

1// Count jobs by status
2const stats = await Promise.all([
3  myJobs.find({ status: 'waiting' }).countAsync(),
4  myJobs.find({ status: 'running' }).countAsync(),
5  myJobs.find({ status: 'completed' }).countAsync(),
6  myJobs.find({ status: 'failed' }).countAsync()
7]);
8
9console.log('Jobs:', {
10  waiting: stats[0],
11  running: stats[1],
12  completed: stats[2],
13  failed: stats[3]
14});

⚡ Performance Tips

  1. Use appropriate concurrency

     myJobs.processJobs('type', { concurrency: 10 }, worker);

  2. Batch operations

     await myJobs.pauseJobs(manyIds);  // Better than loop

  3. Add indexes for custom queries

     await myJobs.createIndexAsync({ type: 1, created: -1 });

  4. Clean up old jobs regularly

  5. Use prefetch to reduce latency

     myJobs.processJobs('type', { prefetch: 5 }, worker);

🐛 Troubleshooting

Jobs Not Processing?

// Check job status
const ready = await myJobs.find({ status: 'ready' }).countAsync();
console.log('Ready jobs:', ready);

// Check worker status
console.log('Running:', workers.running());
console.log('Queued:', workers.length());

// Monitor errors
myJobs.events.on('error', (msg) => {
  console.error('Error:', msg.error);
});

Jobs Stuck in Running?

  • Check workTimeout configuration
  • Look for worker crashes
  • Auto-fail for expired jobs is built-in
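
When checking these points, it can help to classify the running job documents. The sketch below works on plain documents that use the workTimeout and expiresAfter fields from the job document schema. It assumes that expiresAfter marks the moment after which a running job counts as expired, and that a running job without a workTimeout never expires on its own. diagnoseRunningJob is a hypothetical helper.

```typescript
// Subset of the job document fields needed for this check.
interface RunningJobDoc {
  _id: string;
  status: string;
  workTimeout?: number;
  expiresAfter?: Date;
}

type Diagnosis = 'ok' | 'no-timeout' | 'expired';

// Assumption: expiresAfter is the time after which a running job is treated
// as expired; without a workTimeout a crashed worker leaves the job running.
function diagnoseRunningJob(doc: RunningJobDoc, now: Date): Diagnosis {
  if (doc.status !== 'running') return 'ok';
  if (doc.workTimeout === undefined || doc.expiresAfter === undefined) {
    return 'no-timeout';
  }
  return doc.expiresAfter.getTime() < now.getTime() ? 'expired' : 'ok';
}
```

Jobs reported as 'no-timeout' are the ones to look at first after a worker crash, since nothing will move them out of the running state automatically under these assumptions.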

Performance Issues?

  • Add database indexes
  • Increase concurrency
  • Use prefetch option
  • Clean up old jobs

🏗️ Building from Source

# Clone repository
git clone https://github.com/strigo/meteor-job-collection.git
cd meteor-job-collection

# Install dependencies
npm install

# Build TypeScript
npm run build

# Test
meteor test-packages ./

📚 Documentation

Job Document Schema

{
  _id: JobId,
  runId: JobId | null,
  type: string,
  status: 'waiting' | 'paused' | 'ready' | 'running' | 'failed' | 'cancelled' | 'completed',
  data: object,
  result?: object,
  failures?: object[],
  priority: number,
  depends: JobId[],
  resolved: JobId[],
  after: Date,
  updated: Date,
  created: Date,
  workTimeout?: number,
  expiresAfter?: Date,
  log?: LogEntry[],
  progress: { completed: number, total: number, percent: number },
  retries: number,
  retried: number,
  retryUntil: Date,
  retryWait: number,
  retryBackoff: 'constant' | 'exponential',
  repeats: number,
  repeated: number,
  repeatUntil: Date,
  repeatWait: number | LaterJSSchedule
}

DDP Methods

All DDP methods are prefixed with the collection name (e.g., myQueue_getWork):

  • startJobServer(options) - Start server
  • shutdownJobServer(options) - Stop server
  • getJob(ids, options) - Get job(s) by ID
  • getWork(type, options) - Get ready jobs
  • jobSave(doc, options) - Save job
  • jobRemove(ids) - Remove jobs
  • jobPause(ids) - Pause jobs
  • jobResume(ids) - Resume jobs
  • jobReady(ids, options) - Ready jobs
  • jobCancel(ids, options) - Cancel jobs
  • jobRestart(ids, options) - Restart jobs
  • jobRerun(id, options) - Rerun job
  • jobLog(id, runId, message, options) - Add log
  • jobProgress(id, runId, completed, total) - Update progress
  • jobDone(id, runId, result, options) - Mark complete
  • jobFail(id, runId, error, options) - Mark failed
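
The prefixing rule can be captured in a small typed helper, which is handy when calling these methods from a raw DDP client. This is a sketch: ddpMethodName, jobMethods and JobMethod are hypothetical names, and the method list simply mirrors the bullets above.

```typescript
// Method names copied from the list above.
const jobMethods = [
  'startJobServer', 'shutdownJobServer', 'getJob', 'getWork',
  'jobSave', 'jobRemove', 'jobPause', 'jobResume', 'jobReady',
  'jobCancel', 'jobRestart', 'jobRerun', 'jobLog', 'jobProgress',
  'jobDone', 'jobFail',
] as const;

type JobMethod = typeof jobMethods[number];

// Builds the full DDP method name: "<collection name>_<method>".
function ddpMethodName(collectionName: string, method: JobMethod): string {
  return `${collectionName}_${method}`;
}
```

For a collection created as 'myQueue', ddpMethodName('myQueue', 'getWork') yields the myQueue_getWork name used in the example above, and a misspelled method name is rejected at compile time.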

📄 License

MIT License - Copyright (C) 2014-2024 by Vaughn Iverson

This is a modernized fork with TypeScript and async/await support. Original project: vsivsi/meteor-job-collection


🤝 Contributing

Contributions welcome! Please:

  1. Write tests in TypeScript
  2. Use async/await patterns
  3. Follow existing code style
  4. Add documentation
  5. Update changelog

Development Setup

git clone YOUR_REPO_URL
cd meteor-job-collection
npm install
npm run watch  # Auto-compile on changes

Running Tests

meteor test-packages ./


Made with ❤️ for the Meteor community