yotaq - Your Own Task Queue for Python
So you need a task queue for your Python project. Sure you could check celery, and after three months trying to understand the basic configuration options you’ll be good to go. Or you could use a simpler task queue like huey or rq.
Why don’t you try building your own task queue? Well, now you can!
First, we’ll use redis as our message broker. There’s no need to install redis, we’ll use docker so we keep our environment clean. Open a terminal and run:
docker run -p 6379:6379 redis
There you go. Now let’s create a Python virtual environment to handle our dependencies, which are the redis python library and dill:
virtualenv env
source env/bin/activate
pip install redis dill
Pretty good. Our python code will use dill to serialize the functions to be run and redis to store the tasks.
The client will issue the tasks to be enqueued, so open up an editor, create a file called client.py
. There, we’ll define the task that will be sent to the workers, for example:
import random
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def do_something(arg1, arg2):
""" Dummy function that just waits a random amount of time """
logger.info("Performing task with arg1=%s and arg2=%s", arg1, arg2)
time.sleep(random.uniform(0.0, 1))
Now we need to configure our redis client:
import redis
r = redis.Redis(
host='localhost',
port=6379
)
Once that’s done, we’re ready to generate and enqueue some tasks:
import dill
# Generate N tasks
NUM_TASKS = 100
logger.info("Generating %i tasks", NUM_TASKS)
for i in range(NUM_TASKS):
# Generate two random arguments
a1 = random.randrange(0, 100)
a2 = random.randrange(0, 100)
# Serialize the task and its arguments
data = dill.dumps((do_something, [a1, a2]))
# Store it in the message broker
r.lpush('tasks', data)
The worker will do the work (who would’ve guessed?) by keeping an eye on the task queue and fetching the available tasks to run. Pretty simple. So open up an editor to create our worker.py
file and write the following:
# Configure our redis client
r = redis.Redis(
host='localhost',
port=6379
)
while True:
# Wait until there's an element in the 'tasks' queue
key, data = r.brpop('tasks')
# Deserialize the task
d_fun, d_args = dill.loads(data)
# Run the task
d_fun(*d_args)
Boom! You’re done! Run some workers with:
python worker.py
You can even run them in other machines, such scaling, very distributed. And then run the client to create some tasks.
python client.py
How’s that for less than 50 lines of code?