A customizable job/task queue for javascript processes.
npm install @joakin/task-queue
A customizable job/task queue for javascript processes.
import { Queue } from "@joakin/task-queue";
const queue = Queue();
queue.add(hardToDoComputation).then(result => console.log(result));
queueTimeout
: Time a job spends waiting in the queueexecutionTimeout
: Time a job can spend executingconcurrency
: How many jobs should be run concurrentlymaxTaskCount
: How many tasks can the queue holdQueues are created by default with the following arguments:
const defaultQueue = Queue({
queueTimeout: 500,
executionTimeout: 250,
concurrency: 1,
maxTaskCount: 1
});
Feel free to pass in only the ones you want to modify when creating one.
For adding a job to the queue, just call queue.add(fn)
, where fn
is a
function that returns a promise.
The queue will save the closure and run it when it is its turn. When calling
add
, you get back a Promise
with a cancel
method that you can call to
remove the job from the queue.
queue.add(() => printToPdf(url)).then(pdf => res.send(pdf));
When you add a job, you will get back a promise with a cancel
method that you
can call to remove the job from the queue, whatever its state is (waiting or
running).
// Timeout example for illustration purposes. You can do timeouts better with the queue parameters though.
const job = queue.add(() => printToPdf(url));
const timeout = setTimeout(() => job.cancel(), 10000);
job.then(pdf => {
clearTimeout(timeout);
res.send(pdf);
});
You can also specify a callback for when the job is cancelled, by calling
queue.add(job, onCancel)
. In onCancel you can do cleanup work in case your job
has started running:
let browser;
const job = queue
.add(
() => {
browser = launchBrowser();
printToPdf(browser, url);
},
() => {
if (browser) closeBrowser(browser);
}
)
.then(pdf => {
res.send(pdf);
});
You can get the status of the queue by calling queue.stats()
:
console.log(queue.stats());
/*
{
jobs: {
total: 10,
inProgress: 4,
waiting: 6
},
full: true
}
*/
There are different error cases for an enqueued job. The promise returned when
enqueueing a job can reject with different types of errors, if you want fine
grained control over the reason why it failed.
For logging/metrics purposes, see next section about events.
queue.add(job).catch(err => {
if (err instanceof QueueTimeout) {
// Item timed out in the queue (queueTimeout configuration option)
} else if (err instanceof QueueFull) {
// The queue is full
} else if (err instanceof JobCancelled) {
// A job was cancelled
} else if (err instanceof JobTimeout) {
// A job timed out when running (executionTimeout configuration option)
} else {
// Something else failed when running the job function
}
});
The queue instance is also an event emitter, and will emit events for the
activity on the queue that you can listen to.
Events related to queue are under the queue.*
topic, and events related to
jobs are under the job.*
topic.
// New item added to the queue
queue.on("queue.new", ({ id, inProgressCount, waitingCount }) => {});
// Item timed out in the queue (queueTimeout configuration option)
queue.on("queue.timeout", ({ id, addedToTheQueueAt }) => {});
// The queue is full
queue.on("queue.full", ({ waitingCount, inProgressCount }) => {});
// A job started running
queue.on("job.started", ({ id, addedToTheQueueAt }) => {});
// A job successfully finished
queue.on("job.success", ({ id, addedToTheQueueAt, startedProcessingAt }) => {});
// A job timed out when running (executionTimeout configuration option)
queue.on("job.timeout", ({ id, addedToTheQueueAt, startedProcessingAt }) => {});
// A job was cancelled
queue.on(`job.cancel`, ({ id, addedToTheQueueAt }) => {});
// A job failed when run (promise rejected or running the function threw)
queue.on(
"job.failure",
({ id, addedToTheQueueAt, startedProcessingAt, err }) => {}
);
Here’s the type (typescript format) definitions if you want to get a better
understanding of what types the library provides and expects:
import { EventEmitter } from "events";
export { Queue, JobCancelled, QueueTimeout, QueueFull, JobTimeout };
declare function Queue({
queueTimeout,
executionTimeout,
concurrency,
maxTaskCount
}?: {
queueTimeout?: number | undefined;
executionTimeout?: number | undefined;
concurrency?: number | undefined;
maxTaskCount?: number | undefined;
}): QueueInstance;
interface QueueInstance extends EventEmitter {
add: <T>(
fn: () => Promise<T>,
onCancel?: () => void
) => CancellablePromise<T>;
stats(): QueueStats;
}
interface CancellablePromise<T> extends Promise<T> {
cancel: () => void;
}
interface QueueStats {
jobs: {
total: number;
inProgress: number;
waiting: number;
};
full: boolean;
}
/**
* Error thrown when job gets cancelled. The promise returned by the
* queue gets rejected with JobCancelled
*/
declare class JobCancelled extends Error {
constructor();
}
/**
* Thrown when task timeouts in the queue
*/
declare class QueueTimeout extends Error {
addedToTheQueueAt: number;
constructor(addedToTheQueueAt: number);
}
/**
* Thrown when there is no space for new task
*/
declare class QueueFull extends Error {
waitingCount: number;
inProgressCount: number;
constructor(waitingCount: number, inProgressCount: number);
}
/**
* Thrown when task processing takes too much time
*/
declare class JobTimeout extends Error {
constructor();
}
This library is extracted from
wikimedia/mediawiki-services-chromium-render/lib/queue.js,
where we needed an in-process queue for production pdf rendering services.
Initially, async.js/queue
was used, but wanting to use promises, configuring
timeouts and cancelling jobs made the Reading Web team at the Wikimedia
Foundation build their own task queue implementation for their production use
case.
This library is an adaptation of that work, with the following initial changes
to it: