meteor-job-collection v2.0
Persistent Reactive Job Queue for Meteor 3.x
A powerful and easy-to-use job queue for Meteor, supporting distributed workers that can run anywhere. Version 2.0 is completely rewritten in TypeScript with full async/await support for Meteor 3.x and Node.js 18+.
Note: This is a modernized TypeScript fork of the original vsivsi:job-collection. All credit for the original design and implementation goes to Vaughn Iverson.
✨ What's New in v2.0
- 🔄 Full async/await support - Modern promise-based API
- 📘 TypeScript - Complete type definitions included
- 🚀 Meteor 3.x compatible - No Fibers dependency (100% removed)
- ⚡ Node.js 22 ready - Supports Node 18, 20, and 22
- 🔙 Backward compatible - Callback APIs still supported
- 🎯 Better performance - Faster with native promises
🚨 Upgrading from v1.x?
Key Changes:
- Meteor 3.0+ and Node.js 18+ now required
- All methods now return Promises - use `async/await` or callbacks
- No Fibers dependency - pure async/await throughout
- Use `setJobAllow`/`setJobDeny` for permissions (or `allow`/`deny` for backward compat)
```javascript
// OLD (v1.x with Fibers)
const job = myJobs.getJob(id);
job.done();

// NEW (v2.x)
const job = await myJobs.getJob(id);
await job.done();
```
📦 Installation
For Meteor Apps (Recommended)
```bash
meteor add strigops:job-collection
```
Use this in your Meteor application (server and client code).
For Standalone Node.js Workers
```bash
npm install @strigo/meteor-job-collection
```
Use this for remote workers that connect to your Meteor app via DDP.
Requirements:
- Meteor 3.0+ (for Meteor apps)
- Node.js 18+ (20+ or 22 recommended)
- MongoDB 5.0+
Which one should I use?
- 🏠 Meteor App: Use `meteor add strigops:job-collection`
- 🚀 Remote Workers: Use `npm install @strigo/meteor-job-collection`
- 📊 Both: Meteor package in your app + npm package for scaled workers
🚀 Quick Start
Server Setup
```javascript
import { Meteor } from 'meteor/meteor';
import { JobCollection } from 'meteor/strigops:job-collection';

// Create job collection
const myJobs = new JobCollection('myJobQueue');

// Set up permissions
myJobs.setJobAllow({
  admin: (userId) => !!userId // Authenticated users only
});

// Start server on startup
Meteor.startup(async () => {
  await myJobs.startJobServer();
});

// Publish jobs (optional)
Meteor.publish('allJobs', function () {
  return myJobs.find({});
});
```
Create and Submit Jobs
```javascript
import { Job } from 'meteor/strigops:job-collection';

// Create a job with async/await
async function scheduleEmail() {
  const job = new Job(myJobs, 'sendEmail', {
    to: 'user@example.com',
    subject: 'Hello',
    body: 'Welcome!'
  });

  const id = await job
    .priority('normal')
    .retry({ retries: 5, wait: 15 * 60 * 1000 }) // 15 min between retries
    .save();

  return id;
}
```
Process Jobs (Workers)
```javascript
// Worker with async/await
myJobs.processJobs(
  'sendEmail',
  { concurrency: 4 },
  async (job, callback) => {
    try {
      await sendEmail(job.data);
      await job.done();
    } catch (error) {
      await job.fail(error.message);
    }
    callback(); // Always call callback!
  }
);
```
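Since the worker above must always call `callback()`, one option is to centralize that in a small wrapper built on `try`/`finally`. This is only a sketch: `MinimalJob` and `withCallback` are hypothetical names, not part of the package, and the job shape is reduced to the members the worker above uses.

```typescript
// Hypothetical minimal shape of the job object handed to a worker.
interface MinimalJob<T> {
  data: T;
  done(result?: unknown): Promise<unknown>;
  fail(reason: string): Promise<unknown>;
}

type WorkerFn<T> = (job: MinimalJob<T>, callback: () => void) => Promise<void>;

// Wraps plain business logic so that done()/fail() are reported and
// callback() runs exactly once, even when reporting itself throws.
function withCallback<T>(handler: (data: T) => Promise<unknown>): WorkerFn<T> {
  return async (job, callback) => {
    try {
      const result = await handler(job.data);
      await job.done(result);
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      try {
        await job.fail(message);
      } catch (_reportError) {
        // Reporting the failure also failed; fall through to callback().
      }
    } finally {
      callback();
    }
  };
}
```

With this in place, the worker registration could shrink to `myJobs.processJobs('sendEmail', { concurrency: 4 }, withCallback(sendEmail))`, where `sendEmail` is the same helper used above.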
📖 Core Concepts
Job Lifecycle
```
waiting → ready → running → completed
   ↓        ↓        ↓
 paused   ready    failed → waiting (retry)
   ↓                 ↓
waiting          cancelled
```
Job Configuration
```javascript
// job1 and job2 are previously saved Job instances
const jobId = await new Job(myJobs, 'processImage', data)
  .priority('high')              // low, normal, medium, high, critical
  .retry({
    retries: 5,
    wait: 5 * 60 * 1000,         // 5 minutes between retries
    backoff: 'exponential'       // or 'constant'
  })
  .repeat({
    repeats: 10,                 // or Job.forever
    wait: 60 * 60 * 1000         // 1 hour between repeats
  })
  .delay(30 * 1000)              // Delay 30 seconds
  .depends([job1, job2])         // Wait for dependencies
  .after(new Date('2024-01-01')) // Run after date
  .save();                       // Resolves to the new job's ID
```
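To make the `backoff` option concrete, here is a rough sketch of how the wait could grow between retries. It assumes that `'exponential'` doubles the base `wait` on every retry, which is how the original vsivsi:job-collection behaved; this fork may differ, and `retryDelayMs` is a hypothetical helper, not a package function.

```typescript
type Backoff = 'constant' | 'exponential';

// Delay before the next attempt, given how many attempts have failed so far.
// Assumption: 'exponential' means wait * 2^(failedAttempts - 1).
function retryDelayMs(wait: number, failedAttempts: number, backoff: Backoff): number {
  if (failedAttempts < 1) {
    throw new RangeError('failedAttempts must be at least 1');
  }
  return backoff === 'exponential'
    ? wait * Math.pow(2, failedAttempts - 1)
    : wait;
}

// Minutes to wait before each of 5 retries with a 5 minute base wait.
const baseWait = 5 * 60 * 1000;
const minutes = [1, 2, 3, 4, 5].map((n) => retryDelayMs(baseWait, n, 'exponential') / 60000);
console.log(minutes); // [ 5, 10, 20, 40, 80 ]
```

Under that assumption the fifth retry waits 80 minutes, so with exponential backoff a modest base `wait` is usually enough.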
🔧 TypeScript Support
Full type safety out of the box:
```typescript
import { Job, JobCollection } from 'meteor/strigops:job-collection';

// Define job data types
interface EmailJobData {
  to: string;
  subject: string;
  body: string;
}

// Type-safe job creation
const job = new Job<EmailJobData>(myJobs, 'sendEmail', {
  to: 'user@example.com',
  subject: 'Hello',
  body: 'Welcome!'
});

// Type-safe workers
myJobs.processJobs<EmailJobData>(
  'sendEmail',
  async (job, callback) => {
    const { to, subject, body } = job.data; // Fully typed!
    await sendEmail(to, subject, body);
    await job.done();
    callback();
  }
);
```
🎯 API Reference
Job Methods
All methods support both async/await and callbacks:
```javascript
// With async/await
const id = await job.save();
await job.done(result);
await job.fail(error);
await job.log('message', { level: 'info' });
await job.progress(50, 100);
await job.pause();
await job.resume();
await job.cancel();
await job.restart();
await job.remove();

// With callbacks (backward compatible)
job.save((err, id) => { /* ... */ });
job.done(result, (err, success) => { /* ... */ });
```
Job Creation:
- `new Job(collection, type, data)` - Create new job
- `.priority(level)` - Set priority (low/normal/medium/high/critical)
- `.retry(options)` - Configure retry behavior
- `.repeat(options)` - Configure repeat behavior
- `.delay(ms)` - Delay before first run
- `.after(date)` - Run after specific date
- `.depends(jobs)` - Set job dependencies
- `.save([options])` - Save to collection
Job Control:
- `job.refresh()` - Reload from server
- `job.done([result])` - Mark as completed
- `job.fail([error])` - Mark as failed
- `job.pause()` - Pause job
- `job.resume()` - Resume paused job
- `job.cancel()` - Cancel job
- `job.restart()` - Restart failed/cancelled job
- `job.rerun()` - Clone and rerun completed job
- `job.remove()` - Remove from collection
- `job.ready()` - Force to ready state
Job Monitoring:
- `job.log(message, [options])` - Add log entry
- `job.progress(completed, total)` - Update progress
JobCollection Methods
```javascript
// Get jobs
await myJobs.getJob(id)
await myJobs.getJobs([id1, id2, id3])
await myJobs.getWork(type, options)

// Bulk operations
await myJobs.readyJobs([id1, id2])
await myJobs.pauseJobs([id1, id2])
await myJobs.resumeJobs([id1, id2])
await myJobs.cancelJobs([id1, id2])
await myJobs.restartJobs([id1, id2])
await myJobs.removeJobs([id1, id2])

// Server control
await myJobs.startJobServer()
await myJobs.shutdownJobServer({ timeout: 60000 })

// Worker queue
const workers = myJobs.processJobs(type, options, worker)
```
JobQueue Methods (Workers)
```javascript
const workers = myJobs.processJobs('type', options, worker);

workers.pause()    // Pause processing
workers.resume()   // Resume processing
workers.trigger()  // Manually trigger work check
workers.length()   // Jobs waiting in queue
workers.running()  // Jobs currently running
workers.idle()     // True if no jobs
workers.full()     // True if at max concurrency

// Graceful shutdown
workers.shutdown({ level: 'soft' }, () => {
  console.log('Shutdown complete');
});
```
🔒 Security
Fine-grained permission control:
```javascript
// Use setJobAllow/setJobDeny (or allow/deny for backward compatibility).
// Roles is assumed to come from a roles package installed in your app.
myJobs.setJobAllow({
  // Admin: full control
  admin: (userId, method, params) => {
    return Roles.userIsInRole(userId, 'admin');
  },

  // Manager: can manage existing jobs
  manager: (userId, method, params) => {
    return Roles.userIsInRole(userId, 'manager');
  },

  // Creator: can create new jobs
  creator: (userId, method, params) => {
    return userId !== null;
  },

  // Worker: can get work and update status
  worker: (userId, method, params) => {
    return Roles.userIsInRole(userId, 'worker');
  }
});

// Deny rules (override allow)
myJobs.setJobDeny({
  admin: (userId) => userId === 'bannedUserId'
});

// Note: You can also use allow/deny for backward compatibility
// myJobs.allow({ ... }) and myJobs.deny({ ... }) work the same way
```
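The statement that deny rules override allow rules can be illustrated with a small, self-contained evaluator. This is a sketch of the general allow/deny idea only, assuming a call passes when at least one allow rule returns true and no deny rule does. `Rule` and `isPermitted` are hypothetical names, and the package's real check also maps each method to permission levels, which is left out here.

```typescript
type Rule = (userId: string | null, method: string, params: unknown[]) => boolean;

// Assumed semantics: any matching deny rule rejects the call outright;
// otherwise at least one allow rule must accept it.
function isPermitted(
  userId: string | null,
  method: string,
  params: unknown[],
  allowRules: Rule[],
  denyRules: Rule[]
): boolean {
  if (denyRules.some((rule) => rule(userId, method, params))) {
    return false;
  }
  return allowRules.some((rule) => rule(userId, method, params));
}

// Example rules mirroring the configuration above.
const allowLoggedIn: Rule = (userId) => userId !== null;
const denyBanned: Rule = (userId) => userId === 'bannedUserId';

console.log(isPermitted('alice', 'jobSave', [], [allowLoggedIn], [denyBanned]));        // true
console.log(isPermitted('bannedUserId', 'jobSave', [], [allowLoggedIn], [denyBanned])); // false
console.log(isPermitted(null, 'jobSave', [], [allowLoggedIn], [denyBanned]));           // false
```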
📡 Remote Workers (Node.js)
Run workers outside Meteor:
```javascript
// worker.js
import DDP from 'ddp';
import DDPLogin from 'ddp-login';
import { Job } from '@strigo/meteor-job-collection';

const ddp = new DDP({
  host: 'localhost',
  port: 3000,
  use_ejson: true
});

Job.setDDP(ddp);

ddp.connect(async (err) => {
  if (err) throw err;

  await DDPLogin(ddp, { /* credentials */ });

  const workers = Job.processJobs(
    'myQueue',
    'sendEmail',
    { concurrency: 10 },
    async (job, callback) => {
      await sendEmail(job.data);
      await job.done();
      callback();
    }
  );
});
```
🔄 Advanced Features
Job Dependencies
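```javascript
// Job 2 waits for Job 1.
// Assumption: as in the original vsivsi API, depends() takes saved Job
// instances, while save() resolves to the new job's ID.
const job1 = new Job(myJobs, 'download', data);
await job1.save();

const job2 = new Job(myJobs, 'process', data).depends([job1]);
await job2.save();
```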
Scheduled Jobs (Later.js)
```javascript
// save() returns a Promise that resolves to the new job's ID
const jobId = await new Job(myJobs, 'dailyReport', {})
  .repeat({
    schedule: myJobs.later.parse.text('at 9:00 am every weekday')
  })
  .save();
```
Progress Tracking
```javascript
myJobs.processJobs('process', async (job, callback) => {
  const total = 100;
  for (let i = 0; i < total; i++) {
    await processItem(i);
    await job.progress(i + 1, total);
  }
  await job.done();
  callback();
});
```
Logging
```javascript
// Set up file logging
import fs from 'fs';
const logStream = fs.createWriteStream('jobs.log', { flags: 'a' });
myJobs.setLogStream(logStream);

// Or use event-based logging
myJobs.events.on('call', (msg) => {
  console.log(`${msg.method}:`, msg.returnVal);
});
```
🧹 Maintenance
Cleaning Up Old Jobs
```javascript
// Periodic cleanup
async function cleanupJobs() {
  const weekAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
  const oldJobs = await myJobs
    .find({
      status: { $in: ['completed', 'cancelled'] },
      updated: { $lt: weekAgo }
    })
    .fetchAsync();

  const ids = oldJobs.map(j => j._id);
  // Skip the call when there is nothing to remove
  if (ids.length > 0) {
    await myJobs.removeJobs(ids);
  }
}

// Schedule cleanup job
new Job(myJobs, 'cleanup', {})
  .repeat({
    schedule: myJobs.later.parse.text('at 3:00 am')
  })
  .save();

myJobs.processJobs('cleanup', async (job, callback) => {
  await cleanupJobs();
  await job.done();
  callback();
});
```
Monitoring
```javascript
// Count jobs by status
const stats = await Promise.all([
  myJobs.find({ status: 'waiting' }).countAsync(),
  myJobs.find({ status: 'running' }).countAsync(),
  myJobs.find({ status: 'completed' }).countAsync(),
  myJobs.find({ status: 'failed' }).countAsync()
]);

console.log('Jobs:', {
  waiting: stats[0],
  running: stats[1],
  completed: stats[2],
  failed: stats[3]
});
```
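The same counting idea can be generalized to every status without index bookkeeping. The sketch below is deliberately decoupled from Meteor: `countByStatus` and its `countFn` parameter are hypothetical, and in an app `countFn` would presumably be something like `(status) => myJobs.find({ status }).countAsync()`.

```typescript
type JobStatus =
  | 'waiting' | 'paused' | 'ready' | 'running'
  | 'failed' | 'cancelled' | 'completed';

const ALL_STATUSES: JobStatus[] = [
  'waiting', 'paused', 'ready', 'running', 'failed', 'cancelled', 'completed'
];

// Runs one count per status in parallel and returns them keyed by status name.
async function countByStatus(
  countFn: (status: JobStatus) => Promise<number>
): Promise<Record<JobStatus, number>> {
  const counts = await Promise.all(ALL_STATUSES.map((status) => countFn(status)));
  const result = {} as Record<JobStatus, number>;
  ALL_STATUSES.forEach((status, index) => {
    result[status] = counts[index];
  });
  return result;
}
```

Keying the result by status name avoids the positional `stats[0]`, `stats[1]` lookups, which are easy to get out of sync when a status is added.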
⚡ Performance Tips
- Use appropriate concurrency
  ```javascript
  myJobs.processJobs('type', { concurrency: 10 }, worker);
  ```
- Batch operations
  ```javascript
  await myJobs.pauseJobs(manyIds); // Better than loop
  ```
- Add indexes for custom queries
  ```javascript
  await myJobs.createIndexAsync({ type: 1, created: -1 });
  ```
- Clean up old jobs regularly
- Use prefetch to reduce latency
  ```javascript
  myJobs.processJobs('type', { prefetch: 5 }, worker);
  ```
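To see how `concurrency` and `prefetch` interact, here is a rough model of how many jobs a worker queue might request on each poll. It is an assumption based on how the original vsivsi:job-collection queue topped itself up (free worker slots plus the prefetch buffer, minus jobs already queued locally); this fork's internals may differ, and `jobsToRequest` is a hypothetical helper.

```typescript
interface QueueState {
  concurrency: number; // maximum jobs running at once
  prefetch: number;    // extra jobs to keep buffered locally
  running: number;     // jobs currently running
  queued: number;      // jobs fetched but not yet started
}

// Assumed model: request enough jobs to fill the free slots and the buffer.
function jobsToRequest(state: QueueState): number {
  const freeSlots = state.concurrency - state.running;
  return Math.max(0, state.prefetch + freeSlots - state.queued);
}

console.log(jobsToRequest({ concurrency: 10, prefetch: 5, running: 0, queued: 0 }));  // 15
console.log(jobsToRequest({ concurrency: 10, prefetch: 5, running: 10, queued: 5 })); // 0
```

In this model a larger `prefetch` hides fetch latency at the cost of holding jobs that other workers could have picked up.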
🐛 Troubleshooting
Jobs Not Processing?
```javascript
// Check job status
const ready = await myJobs.find({ status: 'ready' }).countAsync();
console.log('Ready jobs:', ready);

// Check worker status
console.log('Running:', workers.running());
console.log('Queued:', workers.length());

// Monitor errors
myJobs.events.on('error', (msg) => {
  console.error('Error:', msg.error);
});
```
Jobs Stuck in Running?
- Check `workTimeout` configuration
- Look for worker crashes
- Auto-fail for expired jobs is built-in
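When diagnosing stuck jobs, it can help to list which running jobs have already passed their deadline. The sketch below works on plain documents and assumes that a running job is overdue once its `expiresAfter` field (see the job document schema) lies in the past. `RunningJobLike` and `findExpired` are hypothetical names, and this is not the package's own auto-fail logic.

```typescript
// Subset of the job document that matters for the expiry check.
interface RunningJobLike {
  _id: string;
  status: string;
  expiresAfter?: Date;
}

// Returns running jobs whose expiresAfter deadline is before `now`.
function findExpired(jobs: RunningJobLike[], now: Date = new Date()): RunningJobLike[] {
  return jobs.filter(
    (job) =>
      job.status === 'running' &&
      job.expiresAfter !== undefined &&
      job.expiresAfter.getTime() < now.getTime()
  );
}

const sampleJobs: RunningJobLike[] = [
  { _id: 'a', status: 'running', expiresAfter: new Date(1000) },
  { _id: 'b', status: 'running', expiresAfter: new Date(5000) },
  { _id: 'c', status: 'ready' }
];
console.log(findExpired(sampleJobs, new Date(2000)).map((job) => job._id)); // [ 'a' ]
```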
Performance Issues?
- Add database indexes
- Increase concurrency
- Use prefetch option
- Clean up old jobs
🏗️ Building from Source
```bash
# Clone repository
git clone https://github.com/strigo/meteor-job-collection.git
cd meteor-job-collection

# Install dependencies
npm install

# Build TypeScript
npm run build

# Test
meteor test-packages ./
```
📚 Documentation
Job Document Schema
```typescript
{
  _id: JobId,
  runId: JobId | null,
  type: string,
  status: 'waiting' | 'paused' | 'ready' | 'running' | 'failed' | 'cancelled' | 'completed',
  data: object,
  result?: object,
  failures?: object[],
  priority: number,
  depends: JobId[],
  resolved: JobId[],
  after: Date,
  updated: Date,
  created: Date,
  workTimeout?: number,
  expiresAfter?: Date,
  log?: LogEntry[],
  progress: { completed: number, total: number, percent: number },
  retries: number,
  retried: number,
  retryUntil: Date,
  retryWait: number,
  retryBackoff: 'constant' | 'exponential',
  repeats: number,
  repeated: number,
  repeatUntil: Date,
  repeatWait: number | LaterJSSchedule
}
```
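As an illustration of the `progress` field, the sketch below builds a progress object from `completed` and `total`. It assumes `percent` is simply `100 * completed / total`, as in the original vsivsi:job-collection; `JobProgress` and `makeProgress` are hypothetical names used only for this example.

```typescript
interface JobProgress {
  completed: number;
  total: number;
  percent: number;
}

// Builds a progress record, rejecting values that cannot form a valid ratio.
function makeProgress(completed: number, total: number): JobProgress {
  if (!(total > 0) || completed < 0 || completed > total) {
    throw new RangeError('expected 0 <= completed <= total and total > 0');
  }
  return { completed, total, percent: (100 * completed) / total };
}

console.log(makeProgress(50, 100)); // { completed: 50, total: 100, percent: 50 }
console.log(makeProgress(1, 4));    // { completed: 1, total: 4, percent: 25 }
```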
DDP Methods
All DDP methods are prefixed with the collection name (e.g., myQueue_getWork):
- `startJobServer(options)` - Start server
- `shutdownJobServer(options)` - Stop server
- `getJob(ids, options)` - Get job(s) by ID
- `getWork(type, options)` - Get ready jobs
- `jobSave(doc, options)` - Save job
- `jobRemove(ids)` - Remove jobs
- `jobPause(ids)` - Pause jobs
- `jobResume(ids)` - Resume jobs
- `jobReady(ids, options)` - Ready jobs
- `jobCancel(ids, options)` - Cancel jobs
- `jobRestart(ids, options)` - Restart jobs
- `jobRerun(id, options)` - Rerun job
- `jobLog(id, runId, message, options)` - Add log
- `jobProgress(id, runId, completed, total)` - Update progress
- `jobDone(id, runId, result, options)` - Mark complete
- `jobFail(id, runId, error, options)` - Mark failed
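For clients that call these methods over raw DDP, the full method name follows the prefix rule stated above. A minimal sketch, where `ddpMethodName` is a hypothetical helper and not part of the package:

```typescript
// Joins the collection name and the method name with an underscore,
// following the myQueue_getWork example.
function ddpMethodName(collectionName: string, method: string): string {
  return `${collectionName}_${method}`;
}

console.log(ddpMethodName('myQueue', 'getWork')); // myQueue_getWork
console.log(ddpMethodName('myQueue', 'jobDone')); // myQueue_jobDone
```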
📄 License
MIT License - Copyright (C) 2014-2024 by Vaughn Iverson
This is a modernized fork with TypeScript and async/await support. Original project: vsivsi/meteor-job-collection
🤝 Contributing
Contributions welcome! Please:
- Write tests in TypeScript
- Use async/await patterns
- Follow existing code style
- Add documentation
- Update changelog
Development Setup
```bash
git clone YOUR_REPO_URL
cd meteor-job-collection
npm install
npm run watch  # Auto-compile on changes
```
Running Tests
```bash
meteor test-packages ./
```
🔗 Links
- Changelog
- Test Guide
- Publishing Guide
- Original Package: vsivsi:job-collection
Made with ❤️ for the Meteor community