Blackfyre

Distributed asynchronous task queue/job queue

Installation

npm install blackfyre --save

Or

yarn add blackfyre

Features

  • Distribution of parallel workloads
  • Real-time operation
  • Delayed jobs
  • Priority jobs
  • Backend store for results
  • Task retry with different strategies & task abort
  • Process function with pre & post hooks
  • Wrap function for APM

TODO

  • Redis backend store & broker
  • More tests

Overview

Performance

In our production environment:

[Figure: RabbitMQ management snapshot]

Basic

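import { Consumer, Producer, Task, TaskMeta } from 'blackfyre';

// Any name works, as long as the consumer and the producer agree on it
const taskName = 'example-task';

const consumer = new Consumer();

// Handle up to 20 tasks of this type concurrently
consumer.registerTask(<TaskMeta>{
    name: taskName,
    concurrency: 20,
}, async (data) => {
    console.log(data);
});

// Publish a task; `body` is passed to the handler as `data`
await (new Producer())
    .createTask(<Task>{
        name: taskName,
        body: { test: 'test' }
    });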

Task delay

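await (new Producer())
    .createTask(<Task>{
        name: taskName,
        // Delay for one hour. `Date.now()` gives epoch milliseconds, so plain
        // addition works; `new Date() + n` would concatenate strings instead.
        // If `eta` is typed as a Date in your version, wrap this in `new Date(...)`.
        eta: Date.now() + 60 * 60 * 1000,
        body: { test: 'test' }
    });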
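
The delay arithmetic above is easy to get wrong, so it can help to isolate it. The following is a minimal sketch, assuming `eta` denotes a point in time; `etaAfter` and `ONE_HOUR_MS` are hypothetical names for illustration and are not part of Blackfyre.

```typescript
// Hypothetical helper (not part of Blackfyre): returns the epoch-millisecond
// timestamp that lies `delayMs` after `now`.
function etaAfter(delayMs: number, now: number = Date.now()): number {
    if (!isFinite(delayMs) || delayMs < 0) {
        throw new RangeError('delayMs must be a non-negative finite number');
    }
    return now + delayMs;
}

const ONE_HOUR_MS = 60 * 60 * 1000;

// Numeric timestamp one hour from now.
const etaTimestamp = etaAfter(ONE_HOUR_MS);

// The same instant as a Date object, in case a Date is expected instead.
const etaDate = new Date(etaTimestamp);
```

Passing `now` explicitly keeps the helper deterministic, which makes it easy to check in unit tests.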

Task retry

class CustomError extends Error {
  noRetry: boolean = true;
}

const consumer = (new Consumer())
    .registerTask(<TaskMeta>{
        name: taskName,
        concurrency: 20,
        maxRetry: 10,
    }, async (data) => {
        if (Math.random() > 0.5) {
            // Will not be retried, because the error has `noRetry` set
            throw new CustomError('ignorable error');
        } else {
            // Will be retried, up to `maxRetry` times
            throw new Error('non-ignorable error');
        }
    });
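
The retry rules in this example can be summarised as a small decision function. This is only a sketch of the behaviour described above, not Blackfyre's actual implementation: `shouldRetry`, `retryDelayMs` and the `RetryStrategy` names are hypothetical, and the two delay strategies are assumptions chosen to illustrate what "different strategies" could mean.

```typescript
// Hypothetical sketch, not Blackfyre's actual implementation.
type RetryStrategy = 'fixed' | 'exponential';

class CustomError extends Error {
    noRetry: boolean = true;
}

// Decide whether a failed task should be attempted again.
// `retryCount` is the number of retries already made.
function shouldRetry(
    error: Error & { noRetry?: boolean },
    retryCount: number,
    maxRetry: number,
): boolean {
    if (error.noRetry === true) {
        return false; // the error opted out of retries
    }
    return retryCount < maxRetry; // stop once the retry budget is used up
}

// Delay before the next attempt: constant for 'fixed',
// doubling with every retry for 'exponential'.
function retryDelayMs(strategy: RetryStrategy, retryCount: number, baseMs: number): number {
    return strategy === 'exponential' ? baseMs * Math.pow(2, retryCount) : baseMs;
}
```

With `maxRetry: 10` as in the example, a plain `Error` would be retried until ten retries have been made, while a `CustomError` stops immediately.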

Using newrelic in process wrap

import * as newrelic from 'newrelic';

const consumer = new Consumer(<ConsumerOptions>{
    processWrap(taskName: string, func: ProcessFunc): ProcessFunc {
        // Return a new process function that runs each task inside its own
        // New Relic background transaction.
        return (data: any, task: Task) =>
            newrelic.startBackgroundTransaction(taskName, async () => {
                try {
                    return await func(data, task);
                } catch (e) {
                    // Report the failure, then rethrow so the consumer still sees it
                    newrelic.noticeError(e);
                    throw e;
                }
            });
    }
});

Using prom-client

import * as promClient from 'prom-client';

const summary = new promClient.Summary({
    name: 'job_summary',
    help: 'Summary of jobs',
    percentiles: [0.5, 0.75, 0.9, 0.99, 0.999],
    labelNames: ['state', 'taskName'],
});

const consumer = new Consumer(<ConsumerOptions>{
    preProcess(task: Task) {
        // `this` is bound to the process wrap function,
        // so you can share variables with `postProcess`
        this.endTimer = summary.startTimer({ taskName: task.name });
    },
    postProcess(task: Task, state: TaskState, errorOrResult: any) {
        // Stop the timer and record the final state as a label
        this.endTimer({ state });
    },
});
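
To make the `this` sharing between the two hooks concrete, here is a self-contained sketch of how a wrapper could call both hooks with one shared context object. It illustrates the idea only and is not Blackfyre's actual code: `wrapWithHooks`, `Hooks`, `HookContext` and the state names `'succeeded'` and `'failed'` are all hypothetical.

```typescript
// Hypothetical types and wrapper, not Blackfyre's actual implementation.
type State = 'succeeded' | 'failed';
type HookContext = Record<string, any>;

interface Hooks {
    preProcess?(this: HookContext, taskName: string): void;
    postProcess?(this: HookContext, taskName: string, state: State, errorOrResult: unknown): void;
}

function wrapWithHooks<T>(
    taskName: string,
    hooks: Hooks,
    processFunc: (data: T) => Promise<unknown>,
): (data: T) => Promise<unknown> {
    return async (data: T) => {
        // A fresh context per invocation, so concurrent runs do not share state.
        const context: HookContext = {};
        const post = (state: State, errorOrResult: unknown): void => {
            if (hooks.postProcess) {
                hooks.postProcess.call(context, taskName, state, errorOrResult);
            }
        };

        if (hooks.preProcess) {
            hooks.preProcess.call(context, taskName);
        }
        let result: unknown;
        try {
            result = await processFunc(data);
        } catch (e) {
            post('failed', e);
            throw e;
        }
        post('succeeded', result);
        return result;
    };
}

// Usage: the value stored on `this` in preProcess is visible in postProcess.
const log: string[] = [];
const wrapped = wrapWithHooks<number>('example-task', {
    preProcess(taskName) {
        this.startedAt = Date.now();
        log.push(`pre:${taskName}`);
    },
    postProcess(taskName, state) {
        log.push(`post:${taskName}:${state}:${typeof this.startedAt}`);
    },
}, async (data) => data * 2);
```

Creating the context inside the returned function, rather than once per wrapper, is a design choice that keeps timers from different concurrent tasks apart.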

Testing

const producer = new Producer(<ProducerOptions>{
    isTestMode: true,
});

await producer
    .createTask(<Task>{
        name: 'example-task',
        body: { test: 'test' }
    });

console.log(producer.createdTasks[0].body);

// Output: { test: 'test' }

More examples are in the examples and test folders.

License

MIT