Web hook task queue.
nTorque is a task queue service that uses web hooks.
It is free, open source software released into the public domain that
you can use from any programming language (that speaks HTTP) to queue
up and reliably execute idempotent tasks. For example, in Python:
import os
import requests
params = {'url': 'http://example.com/myhooks/send_email'}
data = {'user_id': 1234}
endpoint = os.environ.get('NTORQUE_URL')
response = requests.post(endpoint, data=data, params=params)
nTorque is designed to be a good solution when you need more reliability than
fire-and-forget but you don’t need an AMQP / ESB sledgehammer to crack
your “do this later” nut.
Because it uses web hooks, you can:
nTorque provides the following endpoints:
POST / to enqueue a task
GET /tasks/:id to view task status
And the following features:
nTorque is a Python application comprising a web application and one or more
worker processes. These use a PostgreSQL database to persist tasks and a
Redis database as a notification channel.
Request flow (the Frontend, Web app, Postgres, Redis and Worker sit inside
nTorque; the POST / client and the web hook are external):
POST / (url, data) --> Frontend (auth, rate limits) --> Web app (store, notify) --> Postgres (tasks)
Web app --rpush--> Redis --blpop--> Worker
Worker --get--> Postgres --url, data--> Worker
Worker --POST data--> Web hook (perform task)
In the event of a response with status code:
Hack here if you’d like a different strategy.
The real crux of nTorque is a trade-off between request timeout and retry delay.
It’s worth understanding this before deploying – and how to simply mitigate
it by a) specifying an appropriate default timeout and b) overriding this as
necessary on a task by task basis.
Like RQ and Resque, nTorque uses Redis as a push messaging channel. A
request comes in, a notification is rpush’d onto a channel and blpop’d off.
This means that tasks are executed immediately, with a nice evented / push
notification pattern.
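As a rough sketch of this push pattern, the two sides could look like the following. The function names and the choice of pushing the task id are assumptions made for illustration rather than nTorque's actual code. The client argument is assumed to be a redis-py client (for example one created with redis.Redis.from_url), of which only the rpush and blpop methods are used.

```python
def notify(client, channel, task_id):
    """Producer side: push a task id onto the Redis list used as a channel."""
    client.rpush(channel, str(task_id))


def wait_for_task_id(client, channel, timeout=5):
    """Consumer side: block until an id arrives or the timeout expires.

    redis-py's blpop returns a (key, value) pair, or None if nothing
    arrived within `timeout` seconds.
    """
    item = client.blpop(channel, timeout=timeout)
    if item is None:
        return None
    _key, value = item
    # Redis clients return bytes unless decode_responses=True is set.
    if isinstance(value, bytes):
        value = value.decode('utf-8')
    return int(value)
```

Because the consumer blocks in blpop rather than polling, it wakes up as soon as a notification is pushed.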
Unlike RQ and Resque, nTorque doesn’t trust Redis as a persistence layer.
Instead, it relies on good-old-fashioned PostgreSQL: the first thing nTorque does
when a new task arrives is write it to disk. It then notifies a consumer process
using Redis BLPOP. The consumer then reads the data from disk and performs
the task by making an HTTP request to its url.
In most cases, this request will succeed, the task will be marked as completed
and no more needs to be done. However, this won’t happen every time, e.g.: when
there’s a network error or the webhook server is temporarily down. Because there
are edge case failure scenarios where the web hook response is unreliable, nTorque
refuses to rely on it as the source of truth™ about a task’s status. Instead,
the single source of truth is the PostgreSQL database.
This is achieved by automatically setting a task to retry every time it’s read
(“acquired”) from the database. Specifically, the query that reads the task data
is performed within a transaction that also updates the task’s due date and retry
count. This means that in any failure scenario, nTorque can always just be restarted
(potentially on a new server as long as it connects to the same database) and you
can be sure that tasks will be performed at least once no matter where they were
in the pipeline when whatever it was fell over.
Incidentally, tasks due to be retried are picked up by a background process that
polls the database every NTORQUE_REQUEUE_INTERVAL seconds.
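A minimal sketch of this acquire-and-reschedule step and of the requeue poll, using Python's built-in sqlite3 module as a stand-in for PostgreSQL so that it runs without a server. The table layout, column names and function names are hypothetical and are not nTorque's actual schema.

```python
import sqlite3


def connect(path=':memory:'):
    # isolation_level=None leaves transaction control to explicit BEGIN/COMMIT.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY,
            url TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'pending',
            retry_count INTEGER NOT NULL DEFAULT 0,
            due REAL NOT NULL
        )
    """)
    return conn


def acquire(conn, task_id, now, retry_delay):
    """Read a pending task and schedule its retry in one transaction.

    Returns the url to call, or None if the task is missing or completed.
    """
    conn.execute('BEGIN IMMEDIATE')
    try:
        row = conn.execute(
            "SELECT url FROM tasks WHERE id = ? AND status = 'pending'",
            (task_id,),
        ).fetchone()
        if row is not None:
            conn.execute(
                'UPDATE tasks SET due = ?, retry_count = retry_count + 1 '
                'WHERE id = ?',
                (now + retry_delay, task_id),
            )
        conn.execute('COMMIT')
    except Exception:
        conn.execute('ROLLBACK')
        raise
    return None if row is None else row[0]


def complete(conn, task_id):
    """Mark a task as completed once the web hook has confirmed success."""
    conn.execute("UPDATE tasks SET status = 'completed' WHERE id = ?",
                 (task_id,))


def due_task_ids(conn, now):
    """What a requeue poll would find: pending tasks whose due date passed."""
    rows = conn.execute(
        "SELECT id FROM tasks WHERE status = 'pending' AND due <= ? "
        'ORDER BY id',
        (now,),
    )
    return [row[0] for row in rows]
```

Calling acquire(conn, 1, now=100.0, retry_delay=62.0) returns the task's url and moves its due date to 162.0. A poll at 120.0 then finds nothing to requeue, while a poll at 162.0 or later picks the task up again unless complete was called in between.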
More important, and where this description has been heading, is the relation
between the due date of the task as it lies, gloriously in repose, and the
timeout of the web hook call. For there is one thing we don’t want to do, and
that is keep retrying tasks before they’ve had a chance to complete.
In order to prevent this behaviour, we impose a simple constraint:
The due date set when the task is transactionally read and incremented must
be later than the moment the web hook call would time out.
This means that, in the worst case (when a web hook request does time out or
fail to respond), you must wait for the full timeout duration before your task
is retried. So whilst you may naturally want to set a relatively high timeout
for long running tasks, you may want to keep it shorter for simpler tasks like
sending your new user’s welcome or reset password email, so that they’re
retried faster.
The good news is that, in addition to the global NTORQUE_DEFAULT_TIMEOUT
configuration variable, you can set an appropriate timeout for different tasks
using the timeout query parameter.
Simple – once you know how the system works.
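To make the constraint concrete, here is one way a retry delay could be computed so that it always exceeds the timeout. This is a sketch under stated assumptions: the growth formulas, the margin parameter and the function name are invented for illustration, and the default values simply echo numbers mentioned in the configuration list. It is not nTorque's actual algorithm.

```python
def due_delay(retry_count, timeout, backoff='exponential',
              min_delay=2, max_delay=7200, margin=1):
    """Seconds to wait before an acquired task becomes due again."""
    if backoff == 'exponential':
        delay = min_delay * (2 ** retry_count)  # assumed growth formula
    elif backoff == 'linear':
        delay = min_delay * (retry_count + 1)   # assumed growth formula
    else:
        raise ValueError('unknown backoff: %r' % (backoff,))
    delay = min(delay, max_delay)
    # The constraint from the text: never fall due again before the
    # web hook call has had a chance to time out.
    return max(delay, timeout + margin)
```

With a 60 second timeout the first retry is due after 61 seconds, whereas a task enqueued with a timeout of 10 would be retried after 11. This is why a short per-task timeout gets simple tasks retried faster.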
Clone the repo, install the Python app using:
bash pip_install.sh
You need Redis and Postgres running. If necessary, create the database:
createdb -T template0 -E UTF8 ntorque
If you like, install Foreman, to run the multiple processes, using:
bundle install
Run the migrations:
foreman run alembic upgrade head
Bootstrap an app (if you’d like to authenticate access with an API key):
foreman run python alembic/scripts/create_application.py --name YOURAPP
You should then be able to:
foreman start
Alternatively, skip the Foreman stuff and run the commands listed in Processes
manually / using a Docker / Chef / init.d wrapper. Or push to Heroku, run the
migrations and it should just work.
Algorithm / Behaviour:
NTORQUE_BACKOFF: exponential (default) or linear
NTORQUE_CLEANUP_AFTER_DAYS: how many days to leave tasks in the db for, defaults to 7
NTORQUE_DEFAULT_TIMEOUT: how long, in seconds, to wait before treating a web hook request as having timed out; defaults to 60, see the algorithm section
NTORQUE_MIN_DUE_DELAY: minimum delay before retrying – don’t set any lower [...] 2
NTORQUE_MAX_DUE_DELAY: maximum retry delay – defaults to 7200 but you [...] NTORQUE_DEFAULT_TIMEOUT
NTORQUE_MAX_RETRIES: how many attempts before giving up on a task – defaults to 36
NTORQUE_REQUEUE_INTERVAL: how often, in seconds, to poll the database for tasks due to be retried
NTORQUE_TRANSIENT_REQUEST_ERRORS: 4xx errors which nTorque should retry – defaults to ‘408,423,429,449’
Deployment:
NTORQUE_AUTHENTICATE: whether to require authentication; defaults to True
NTORQUE_ENABLE_HSTS: set this to True if you’re using HSTS
HSTS_PROTOCOL_HEADER: set this to, e.g.: X-Forwarded-Proto if you’re running [...]
MODE: if set to development this will run Gunicorn in watch mode (so the app [...]) [...] production it will run Gunicorn [...] run.sh and / or gunicorn.py
Redis:
NTORQUE_REDIS_CHANNEL: name of your Redis list used as a notification channel; defaults to ntorque
REDIS_URL, etc.: see pyramid_redis for details on how to configure your Redis connection
Database:
DATABASE_URL, defaults to postgresql:///ntorque
SQLALCHEMY_MAX_OVERFLOW, SQLALCHEMY_POOL_CLASS, SQLALCHEMY_POOL_SIZE and SQLALCHEMY_POOL_RECYCLE – see the SQLAlchemy docs on engine configuration [...] SQLALCHEMY_POOL_CLASS=sqlalchemy.pool.NullPool
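For illustration, a small sketch of how a few of these variables might be read from the environment. The function name, the dictionary keys and the sanity check are assumptions, the defaults are taken from the list above as far as they can be read, and nTorque's own configuration code may well differ.

```python
import os


def load_settings(environ=None):
    """Read some nTorque settings, falling back to the listed defaults."""
    env = os.environ if environ is None else environ
    backoff = env.get('NTORQUE_BACKOFF', 'exponential')
    if backoff not in ('exponential', 'linear'):
        raise ValueError('NTORQUE_BACKOFF must be exponential or linear')
    errors = env.get('NTORQUE_TRANSIENT_REQUEST_ERRORS', '408,423,429,449')
    settings = {
        'backoff': backoff,
        'cleanup_after_days': int(env.get('NTORQUE_CLEANUP_AFTER_DAYS', '7')),
        'default_timeout': int(env.get('NTORQUE_DEFAULT_TIMEOUT', '60')),
        'max_due_delay': int(env.get('NTORQUE_MAX_DUE_DELAY', '7200')),
        'max_retries': int(env.get('NTORQUE_MAX_RETRIES', '36')),
        'transient_errors': {int(c) for c in errors.split(',') if c.strip()},
        'redis_channel': env.get('NTORQUE_REDIS_CHANNEL', 'ntorque'),
        'database_url': env.get('DATABASE_URL', 'postgresql:///ntorque'),
    }
    # Hypothetical sanity check, echoing the timeout / retry delay trade-off.
    if settings['max_due_delay'] <= settings['default_timeout']:
        raise ValueError('NTORQUE_MAX_DUE_DELAY should exceed the timeout')
    return settings
```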
If you set NTORQUE_AUTHENTICATE to True then you need to create at least one
application (e.g.: using the alembic/scripts/create_application.py script) and
provide its API key in the NTORQUE_API_KEY header when enqueuing a task.
POST /
To enqueue a task, make a POST request to the root path of your nTorque
installation.
Required:
url query parameter; this is the url to your web hook that you want nTorque to call
Optional:
method query parameter; which HTTP method to use when calling the webhook – [...]
timeout query parameter; how long, in seconds, to wait before treating the web hook call as having timed out
Data:
This aside, you can pass through any POST data, encoded as any content type you
like. The data, content type and character encoding will be passed on in the POST
(or DELETE, PUT or PATCH) request to your web hook.
Headers:
Aside from the content type, length and charset headers, derived from your
request, you can specify headers to pass through to your web hook, by prefixing
the header name with NTORQUE-PASSTHROUGH-. So, for example, to pass through
a FOO: Bar header, you would provide NTORQUE-PASSTHROUGH-FOO: Bar in your
request headers.
Response:
You should receive a 201 response with the url to the task in the Location header.
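Putting the query parameters, API key and pass-through headers together, an enqueue request could be assembled as in the sketch below. It uses only the standard library so it has no dependencies. The function name and its arguments are hypothetical; the parameter and header names are the ones described above.

```python
import urllib.parse
import urllib.request


def build_enqueue_request(endpoint, hook_url, body, content_type,
                          timeout=None, method=None, api_key=None,
                          passthrough=None):
    """Build (but do not send) a POST request that enqueues a task."""
    params = {'url': hook_url}
    if timeout is not None:
        params['timeout'] = str(timeout)
    if method is not None:
        params['method'] = method
    headers = {'Content-Type': content_type}
    if api_key is not None:
        headers['NTORQUE_API_KEY'] = api_key
    # Prefix each header that should be forwarded to the web hook.
    for name, value in (passthrough or {}).items():
        headers['NTORQUE-PASSTHROUGH-' + name] = value
    full_url = endpoint + '?' + urllib.parse.urlencode(params)
    return urllib.request.Request(
        full_url, data=body, headers=headers, method='POST')
```

Passing the result to urllib.request.urlopen sends it. On success the response status should be 201 and response.headers['Location'] should hold the url of the task.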
GET /task/:id
Returns a JSON data dict with status information about a task.
POST /task/:id/push
Pushes a task onto the Redis notification channel to be consumed, acquired and
performed. You should not normally need to use this. It’s exposed as an
optimisation for hybrid integrations.
nTorque is a system for reliably calling web hook task handlers: not for
implementing them. You are responsible for implementing and exposing your own
web hooks. In most languages and frameworks this is simple, e.g.: in Ruby
using Sinatra:
post '/hooks/foo' do
# your code here
end
Or in Python using Flask:
@app.route('/hooks/foo', methods=['POST'])
def foo():
    # your code here
    return '', 200  # a 200 or 201 response marks the task as done
Key things to bear in mind are:
After successfully performing their task, your web hooks are expected to return
an HTTP response with a 200 or 201 status code. If not, nTorque will keep
retrying the task.
Your web server must be configured with a high enough timeout to allow tasks
enough time to complete. If not, you may be responding with an error when tasks
are actually being performed successfully.
For example, for a 30 minute timeout with Apache as a proxy:
Timeout 1800
ProxyTimeout 1800
Or with Nginx:
send_timeout 1800;
proxy_send_timeout 1800;
proxy_read_timeout 1800;
If your web hooks are exposed on a public IP, you are likely to want to secure
them, e.g.: using HTTPS and an authentication credential like an API key.
It’s also worth noting that you may need to turn off CSRF validation.
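Because tasks are performed at least once, a hook may be called again for work it has already done. The following sketch shows one way to keep a hook idempotent while returning the status codes nTorque expects. The function name, the already_sent set and the deliver callable are hypothetical; a real hook would record completed work in persistent storage rather than in memory.

```python
def handle_welcome_email(user_id, already_sent, deliver):
    """Idempotent task body: returns the HTTP status the hook should send."""
    if user_id in already_sent:
        # A retry of work that already succeeded: report success again.
        return 200
    try:
        deliver(user_id)
    except Exception:
        # Anything other than 200 / 201 tells nTorque to retry later.
        return 500
    already_sent.add(user_id)
    return 200
```

A framework view would then only need to parse the request, call this function and respond with the returned status code.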
Raise bugs / issues on GitHub.