Kale is a Python task worker library that supports priority queues on Amazon SQS.
Check out our blog post - Nextdoor Taskworker: Simple, Efficient & Scalable
Like other distributed task queue systems, publishers send task messages to queues and workers fetch messages from queues. For now, Kale supports only Amazon SQS as the queue.
A publisher can be any Python program that imports a Kale-based task class and invokes the publish function of this class. For example, if a task class looks like this:
# tasks.py
from kale import task

class MyTask(task.Task):
    def run_task(self, arg1, arg2, *args, **kwargs):
        # Do something
        pass
Then the publisher publishes a task to Amazon SQS, which normally takes tens of milliseconds to return:
import tasks
tasks.MyTask.publish(None, arg1, arg2)
The publish() function is a static method of a task class. Other than the first parameter, which can usually be None, it has the same signature as the run_task() method. A worker process, which may run on a different machine, will pick up the message and execute the run_task() method of the task.
While ndkale is usable out of the box, the first parameter in publish(app_data, *args, **kwargs) is designed for more complex situations where certain state needs to be passed outside the context of the actual task parameters. One example is passing the environment. The app_data must be picklable so that it can be encoded and inserted into the SQS message.
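Before publishing, it can help to confirm that a candidate app_data value can actually be pickled. The following is a minimal sketch using only the standard pickle module. The helper name is_picklable and the sample values are illustrative and are not part of Kale.

```python
import pickle


def is_picklable(value):
    """Return True if value can be serialized with pickle (hypothetical helper)."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


# A plain dict of strings is a safe choice for app_data.
print(is_picklable({'environment': 'staging'}))

# A lambda cannot be pickled, so it would not work as app_data.
print(is_picklable(lambda: 'staging'))
```

Plain containers of strings and numbers are the simplest values to pass, since they pickle the same way on the publisher and the worker.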
The default task object will be populated with an app_data attribute, but the default worker will not use it. You will need to extend the default Worker or Task class to take advantage of app_data.
A worker process runs an infinite loop. For each iteration, it does the following things:
Code: kale/queue_selector.py
A good queue selection algorithm has these requirements:
We experimented with and benchmarked quite a few queue selection algorithms. We ended up using an
improved version of the lottery algorithm, ReducedLottery, which fulfills the above requirements.
ReducedLottery works like this:
Initialize the lottery pool with all queues
while lottery pool is not empty:
    Run lottery based on queue priority to get a queue that wins the jackpot
    Short poll SQS to see if the selected queue is empty
    if the selected queue is not empty:
        return queue
    else:
        Remove this queue from the lottery pool
Reset the lottery pool with all queues
Return whatever queue wins the jackpot
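The steps above can be sketched in Python as follows. This is an illustration, not the code in kale/queue_selector.py. It assumes that the chance of winning is proportional to the queue priority. The function name reduced_lottery and the is_empty callback, which stands in for the SQS short poll, are hypothetical.

```python
import random


def reduced_lottery(priorities, is_empty, rng=random):
    """Pick a queue name following the ReducedLottery steps.

    priorities: dict mapping queue name -> positive priority (lottery weight).
    is_empty:   callable(name) -> bool; a stand-in for the SQS short poll.
    rng:        the random module or a random.Random instance.
    """
    def run_lottery(pool):
        # Weighted draw; sorting keeps the candidate order stable.
        names = sorted(pool)
        weights = [priorities[name] for name in names]
        return rng.choices(names, weights=weights, k=1)[0]

    pool = set(priorities)
    while pool:
        winner = run_lottery(pool)
        if not is_empty(winner):
            return winner
        # The winner looked empty: drop it and draw again.
        pool.remove(winner)

    # Every queue looked empty: reset the pool and return any winner.
    return run_lottery(priorities)


priorities = {'high_priority': 100, 'default': 40, 'low_priority': 5}

# Only low_priority has messages, so it is always the queue returned.
print(reduced_lottery(priorities, lambda name: name != 'low_priority'))
# prints low_priority
```

Removing empty queues from the pool means each queue is polled at most once per selection, so the loop ends after at most as many short polls as there are queues.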
The beauty of ReducedLottery:
There are two types of settings, worker config and queue config.
Settings are specified in settings modules, including AWS credentials, queue config, queue selection
algorithm, …
Settings modules are loaded in this order:
Here’s an example:
import os
AWS_REGION = 'us-west-2'
#
# Production settings
# (use this for prod to talk to Amazon SQS)
# MESSAGE_QUEUE_USE_PROXY = False
# AWS_ACCESS_KEY_ID = 'AWS KEY ID'
# AWS_SECRET_ACCESS_KEY = 'AWS SECRET KEY'
#
# Development settings
# (use this for dev to talk to ElasticMQ, which is SQS emulator)
# Using elasticmq to emulate SQS locally
MESSAGE_QUEUE_USE_PROXY = True
MESSAGE_QUEUE_PROXY_PORT = 9324
MESSAGE_QUEUE_PROXY_HOST = os.getenv('MESSAGE_QUEUE_PROXY_HOST', '0.0.0.0')
AWS_ACCESS_KEY_ID = 'x'
AWS_SECRET_ACCESS_KEY = 'x'
QUEUE_CONFIG = 'taskworker/queue_config.yaml'
# SQS limits per message size, bytes
# It can be set anywhere from 1024 bytes (1KB), up to 262144 bytes (256KB).
# See http://aws.amazon.com/sqs/faqs/
SQS_TASK_SIZE_LIMIT = 256000
QUEUE_SELECTOR = 'kale.queue_selector.ReducedLottery'
Settings in later-loaded modules override those in earlier-loaded modules.
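The override rule can be pictured with a small sketch. This is not Kale's settings loader. The merge_settings helper and the two stand-in modules are hypothetical, and SimpleNamespace objects are used in place of real settings modules to keep the example self-contained.

```python
from types import SimpleNamespace


def merge_settings(modules):
    """Merge uppercase attributes in load order; later modules win."""
    merged = {}
    for module in modules:
        for name, value in vars(module).items():
            if name.isupper():
                merged[name] = value
    return merged


# Stand-ins for an early-loaded and a later-loaded settings module.
defaults = SimpleNamespace(AWS_REGION='us-west-2',
                           MESSAGE_QUEUE_USE_PROXY=False)
local = SimpleNamespace(MESSAGE_QUEUE_USE_PROXY=True,
                        MESSAGE_QUEUE_PROXY_PORT=9324)

settings = merge_settings([defaults, local])
print(settings['MESSAGE_QUEUE_USE_PROXY'])  # prints True
print(settings['AWS_REGION'])               # prints us-west-2
```

Values that only appear in the early-loaded module survive, while any name defined in both takes the value from the module loaded last.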
All queues and their properties are defined in a queue config YAML file whose path is specified in the above
settings modules.
Here’s an example:
# task SLA: 60 / 10 = 6 seconds
high_priority:
    name: high_priority
    priority: 100
    batch_size: 10
    visibility_timeout_sec: 60
    long_poll_time_sec: 1
    num_iterations: 10
# task SLA: 60 / 10 = 6 seconds
default:
    name: default
    priority: 40
    batch_size: 10
    visibility_timeout_sec: 60
    long_poll_time_sec: 1
    num_iterations: 5
# task SLA: 60 / 10 = 6 seconds
low_priority:
    name: low_priority
    priority: 5
    batch_size: 10
    visibility_timeout_sec: 60
    long_poll_time_sec: 5
    num_iterations: 5
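The arithmetic behind this config can be worked through in a few lines of Python. The sketch copies the relevant values into a plain dict instead of parsing the YAML file. It reads the "task SLA" comments as visibility_timeout_sec divided by batch_size, and it assumes that a queue's chance of winning the first lottery draw is proportional to its priority. The helper names are illustrative and not part of Kale.

```python
queues = {
    'high_priority': {'priority': 100, 'batch_size': 10,
                      'visibility_timeout_sec': 60},
    'default': {'priority': 40, 'batch_size': 10,
                'visibility_timeout_sec': 60},
    'low_priority': {'priority': 5, 'batch_size': 10,
                     'visibility_timeout_sec': 60},
}


def task_time_budget(queue):
    """Seconds available per task if a full batch must finish in time."""
    return queue['visibility_timeout_sec'] / queue['batch_size']


def lottery_shares(queues):
    """Share of first draws per queue, assuming weight == priority."""
    total = sum(q['priority'] for q in queues.values())
    return {name: q['priority'] / total for name, q in queues.items()}


shares = lottery_shares(queues)
for name, queue in queues.items():
    print(f'{name}: {task_time_budget(queue):.0f}s per task, '
          f'{shares[name]:.1%} of first draws')
# high_priority: 6s per task, 69.0% of first draws
# default: 6s per task, 27.6% of first draws
# low_priority: 6s per task, 3.4% of first draws
```

Under these assumptions, raising a queue's priority changes how often it is picked first but not its per-task time budget, which depends only on the visibility timeout and batch size.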
From source code
python setup.py install
Using pip (from the GitHub repo)
#
# Put this in requirements.txt, then run
# pip install -r requirements.txt
#
# If you want the latest build
git+https://github.com/Nextdoor/ndkale.git#egg=ndkale
# Or put this if you want a specific commit
git+https://github.com/Nextdoor/ndkale.git@67f873ed7b0a8131cc8d72453d749ffb389d695f
#
# Run from command line
#
pip install -e git+https://github.com/Nextdoor/ndkale.git#egg=ndkale
(We’ll upload the package to PyPI soon.)
See code in the example/ directory.