A distributed task queue built with asyncio and Redis, with a built-in web interface
ReArq is a distributed task queue built on asyncio and Redis. It is a rewrite of arq
that adds improvements and includes a web interface.
You can try the online demo here.
Use MySQL backend:
pip install rearq[mysql]
Use PostgreSQL backend:
pip install rearq[postgres]
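# main.py
from rearq import ReArq
from tortoise import Tortoise

# <password> and <host> are placeholders: substitute your own database settings
rearq = ReArq(db_url='mysql://root:<password>@<host>:3306/rearq')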

@rearq.on_shutdown
async def on_shutdown():
    # do cleanup work here, such as closing database connections
    print("shutdown")


@rearq.on_startup
async def on_startup():
    # do initialization work here
    print("startup")
    # you must initialize Tortoise ORM here;
    # `settings` stands for your own configuration object
    await Tortoise.init(
        db_url=settings.DB_URL,
        modules={"rearq": ["rearq.server.models"]},
    )

@rearq.task(queue="q1")
async def add(self, a, b):
    return a + b


@rearq.task(cron="*/5 * * * * * *")  # run the task every 5 seconds
async def timer(self):
    return "timer"
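The cron expression above has seven fields rather than the usual five, and its comment indicates that the leading field counts seconds. As a rough illustration of which values a step field such as `*/5` selects, here is a minimal sketch. `expand_step_field` is a hypothetical helper written for this example, not part of ReArq, and the 0-59 range assumes the first field is the seconds field.

```python
from typing import List


def expand_step_field(field: str, low: int, high: int) -> List[int]:
    """Expand a cron field of the form '*' or '*/N' into the values it matches.

    Hypothetical helper for illustration only; real cron parsers also
    handle ranges, lists, and names, which are omitted here.
    """
    if field == "*":
        return list(range(low, high + 1))
    if field.startswith("*/"):
        step = int(field[2:])
        if step <= 0:
            raise ValueError("step must be a positive integer")
        return list(range(low, high + 1, step))
    raise ValueError(f"unsupported field: {field!r}")


# Assumption: the first of the seven fields is the seconds field (0-59).
seconds = expand_step_field("*/5", 0, 59)
print(seconds[:4])  # [0, 5, 10, 15]
```

With a 0-59 range, `*/5` matches twelve values per minute, which agrees with the "every 5 seconds" comment on the task.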
> rearq main:rearq worker -q q1 -q q2 # consume tasks from q1 and q2 at the same time
2021-03-29 09:54:50.464 | INFO | rearq.worker:_main:95 - Start worker success with queue: rearq:queue:default
2021-03-29 09:54:50.465 | INFO | rearq.worker:_main:96 - Registered tasks: add, sleep, timer_add
2021-03-29 09:54:50.465 | INFO | rearq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.43M clients_connected=5 db_keys=6
If you have timed (cron) or delayed tasks, you must also run the timer with a second command:
> rearq main:rearq timer
2021-03-29 09:54:43.878 | INFO | rearq.worker:_main:275 - Start timer success
2021-03-29 09:54:43.887 | INFO | rearq.worker:_main:277 - Registered timer tasks: timer_add
2021-03-29 09:54:43.894 | INFO | rearq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.25M clients_connected=2 db_keys=6
You can also run the timer together with the worker by passing -t: rearq main:rearq worker -t
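The worker command shown earlier consumes from q1 and q2 at the same time. The general idea of serving several queues from one event loop can be sketched with plain asyncio. This is only an illustration under stated assumptions: the in-memory `asyncio.Queue` objects stand in for the Redis-backed queues, and `consume` is a hypothetical function, not ReArq's worker implementation.

```python
import asyncio
from typing import Dict, List, Tuple


async def consume(name: str, queue: asyncio.Queue, results: List[Tuple[str, int]]) -> None:
    """Pull (a, b) jobs from one queue until a None sentinel arrives."""
    while True:
        job = await queue.get()
        if job is None:
            break
        a, b = job
        results.append((name, a + b))


async def main() -> List[Tuple[str, int]]:
    # In-memory stand-ins for the Redis-backed queues q1 and q2.
    queues: Dict[str, asyncio.Queue] = {"q1": asyncio.Queue(), "q2": asyncio.Queue()}
    results: List[Tuple[str, int]] = []

    queues["q1"].put_nowait((1, 2))
    queues["q2"].put_nowait((3, 4))
    for q in queues.values():
        q.put_nowait(None)  # sentinel: tell each consumer to stop

    # One consumer coroutine per queue, all running on the same event loop.
    await asyncio.gather(*(consume(n, q, results) for n, q in queues.items()))
    return results


results = asyncio.run(main())
print(sorted(results))  # [('q1', 3), ('q2', 7)]
```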
from fastapi import FastAPI
from tortoise import Tortoise

# assumes the task and ReArq instance are the ones defined in main.py
from main import add, rearq

app = FastAPI()


@app.on_event("startup")
async def startup() -> None:
    # `settings` stands for your own configuration object
    await Tortoise.init(
        db_url=settings.DB_URL,
        modules={"rearq": ["rearq.server.models"]},
    )


@app.on_event("shutdown")
async def shutdown() -> None:
    await rearq.close()


# then run the task in a view
@app.get("/test")
async def test():
    job = await add.delay(args=(1, 2))
    # or
    job = await add.delay(kwargs={"a": 1, "b": 2})
    # or
    job = await add.delay(1, 2)
    # or
    job = await add.delay(a=1, b=2)
    result = await job.result(timeout=5)  # wait up to 5 seconds for the result
    print(result.result)
    return result
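The view above enqueues a job and then waits a bounded time for its result. The waiting half of that pattern can be sketched with plain asyncio. Everything here is hypothetical: `FakeJob` is a stand-in written for this example, not ReArq's job class, and returning `None` on timeout is a choice made for the sketch; what ReArq itself does when the timeout expires is not covered here.

```python
import asyncio


class FakeJob:
    """Hypothetical stand-in for a job handle; not ReArq's job class."""

    def __init__(self, coro):
        # Start the work in the background as soon as the job is created.
        self._task = asyncio.ensure_future(coro)

    async def result(self, timeout: float):
        # Wait at most `timeout` seconds. shield() keeps the underlying
        # task running even if this particular wait times out.
        try:
            return await asyncio.wait_for(asyncio.shield(self._task), timeout)
        except asyncio.TimeoutError:
            return None  # sketch-only convention for "not finished yet"


async def add(a, b, delay=0.0):
    await asyncio.sleep(delay)
    return a + b


async def main():
    fast = FakeJob(add(1, 2))
    slow = FakeJob(add(1, 2, delay=0.5))
    fast_result = await fast.result(timeout=0.2)
    slow_result = await slow.result(timeout=0.05)
    slow._task.cancel()  # clean up the unfinished job
    return fast_result, slow_result


fast_result, slow_result = asyncio.run(main())
print(fast_result, slow_result)  # 3 None
```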
> rearq main:rearq server
Usage: rearq server [OPTIONS]

  Start rest api server.

Options:
  --host TEXT         Listen host.  [default: 0.0.0.0]
  -p, --port INTEGER  Listen port.  [default: 8000]
  -h, --help          Show this message and exit.
Once the server is running, you can visit https://127.0.0.1:8000/docs to see all APIs
and https://127.0.0.1:8000 to see the web interface.
Any other options are passed directly to uvicorn, for example --root-path:
rearq main:rearq server --host 0.0.0.0 --root-path /rearq
You can also mount the ReArq server as a FastAPI sub-application.
from fastapi import FastAPI
from examples.tasks import rearq
from rearq.server.app import app as rearq_app
app = FastAPI()
app.mount("/rearq", rearq_app)
rearq_app.set_rearq(rearq)
You can also start the worker inside your app.
@app.on_event("startup")
async def startup():
    await rearq.init()
    await rearq_app.start_worker(with_timer=True, block=False)
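The `block=False` argument above suggests that the worker is started in the background so that application startup can continue. The general pattern can be sketched with plain asyncio. This makes no claim about how ReArq implements `start_worker`; `worker_loop` is a hypothetical function that only records ticks until it is cancelled.

```python
import asyncio
from typing import List


async def worker_loop(processed: List[int]) -> None:
    """Hypothetical worker loop: records a tick until it is cancelled."""
    tick = 0
    while True:
        processed.append(tick)
        tick += 1
        await asyncio.sleep(0.01)


async def main() -> List[int]:
    processed: List[int] = []

    # "Startup": schedule the worker and return immediately (non-blocking).
    worker = asyncio.create_task(worker_loop(processed))

    # The application keeps serving while the worker runs in the background.
    await asyncio.sleep(0.05)

    # "Shutdown": cancel the worker and wait for it to finish.
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return processed


processed = asyncio.run(main())
print(len(processed) > 0)  # True
```

A blocking start would instead await the worker loop directly, which would keep the startup handler from returning.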
This project is licensed under the Apache-2.0 License.