Distributed task queue with full async support
Documentation: https://taskiq-python.github.io/
Taskiq is an asynchronous distributed task queue for python.
This project takes inspiration from big projects such as Celery and Dramatiq.
But taskiq can send and run both the sync and async functions, has
integration with popular async frameworks, such as FastAPI and AioHTTP.
Also, we use PEP-612 to provide the best autosuggestions possible. All code is type-hinted.
This project can be installed using pip:
pip install taskiq
Or it can be installed directly from git:
pip install git+https://github.com/taskiq-python/taskiq
At first you need to create a broker. Broker is an object that can communicate to workers using distributed queues.
We have differet brokers for different queue backends. For example, we have a broker for NATS, Redis, RabbitMQ, Kafka and even more. Choose the one that fits you and create an instance.
from taskiq_nats import JetStreamBroker
broker = JetStreamBroker("nats://localhost:4222", queue="my_queue")
Declaring tasks is as easy as declaring a function. Just add a decorator to your function and you are ready to go.
import asyncio
from taskiq_nats import JetStreamBroker
broker = JetStreamBroker("nats://localhost:4222", queue="my_queue2")
@broker.task
async def my_task(a: int, b: int) -> None:
print("AB", a + b)
async def main():
await broker.startup()
await my_task.kiq(1, 2)
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
The message is going to be sent to the broker and then to the worker. The worker will execute the function. To start worker processes, just run the following command:
taskiq worker path.to.the.module:broker
Where path.to.the.module
is the path to the module where the broker is defined and broker
is the name of the broker variable.
If you have tasks in different modules, you can ask taskiq to automatically import them by passing the --fs-discover
flag:
taskiq worker path.to.the.module:broker --fs-discover
It will import all modules called tasks.py
in the current directory and all subdirectories.
Also, we support hot reload for workers. To enable it, just pass the --reload
flag. It will reload the worker when the code changes (To use it, install taskiq with reload extra. E.g pip install taskiq[reload]
).
Also, we have cool integrations with popular async frameworks. For example, we have an integration with FastAPI or AioHTTP. You can use it to reuse dependencies from your web app in your tasks.
Read about all features in our documentation: https://taskiq-python.github.io/
We use pre-commit to do linting locally.
After cloning this project, please install pre-commit. It helps fix files before committing changes.
pre-commit install
Pytest can run without any additional actions or options.
pytest
To run docs locally, you need to install yarn.
First, you need to install dependencies.
yarn install
After that you can set up a docs server by running:
yarn docs:dev