Writing an asynchronous ‘thing-doer’ using RQ, Flask (… in less than 5 mins..)
A task queue is an important tool in your arsenal. It allows you to run background work asynchronously outside of a user request. This is an important pattern for writing performant apps.
I have implemented a few custom job managers and task executors for various use cases. Although a full featured job manager is out of scope of this article, the core functionality is surprisingly easy to setup. By marrying RQ (Redis Queue) library with a micro web framework like Flask you can easily build out REST backed APIs to manage and execute long running tasks.
“Talk is cheap. Show me the code.” — Linus Torvalds
Setup development environment
# Setup a virtual environment to pull in all the required packages...$ python3 -m venv venv
$ source venv/bin/activate# Ensure the venv is activated before installing dependencies...
$ (venv) pip install flask rq
Building the Flask application
Set up the basic directory structure
├── README.md
├── app
│ ├── __init__.py
│ ├── routes.py
│ └── task.py
├── main.py
app/__init__.py
app/routes.py
main.py
That’s it, you have a flask web server. To test everything is working well…
# Start the development server bundled with flask...
(venv) $ python main.py
* Serving Flask app ‘app’ (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
* Running on all addresses.
WARNING: This is a development server. Do not use it in a production deployment.
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 269–294–915
# In a new terminal, try hitting the endpoint..$ curl -ik http://127.0.0.1:5000
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 10
Server: Werkzeug/2.0.2 Python/3.6.9
Date: Fri, 28 Jan 2022 16:12:07 GMTStatus: OK
Add a task that you intend to carry out asynchronously
app/task.py
app/routes.py
Install and start the redis-server and an RQ worker
$ sudo apt install redis-server
$ redis-server
2750:C 28 Jan 20:50:50.127 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
2750:C 28 Jan 20:50:50.128 # Redis version=4.0.9, bits=64, commit=00000000, modified=0, pid=2750, just started
2750:C 28 Jan 20:50:50.128 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
2750:M 28 Jan 20:50:50.130 * Increased maximum number of open files to 10032 (it was originally set to 1024).
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 4.0.9 (00000000/0) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in standalone mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 6379
| `-._ `._ / _.-' | PID: 2750
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | http://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'
2750:M 28 Jan 20:50:50.139 # Server initialized
2750:M 28 Jan 20:50:50.140 * Ready to accept connections(venv) $ rq worker
20:56:26 Worker rq:worker:b799fb5f954d418d9743478bc2e052ab: started, version 1.10.1
20:56:26 Subscribing to channel rq:pubsub:b799fb5f954d418d9743478bc2e052ab
20:56:26 *** Listening on default…
20:56:26 Cleaning registries for queue: default
At this point you have 3 terminals running…
.. a redis-server
.. a worker
.. a flask web server..
4. Congrats! You are ready to test things out..
# In a new terminal, hit the route mapped to the task...
# The flask server returns a task ID indicating that the task is queued successfully...$ curl -ik http://127.0.0.1:5000/task/long-running-task/foobar
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 77
Server: Werkzeug/2.0.2 Python/3.6.9
Date: Fri, 28 Jan 2022 16:17:30 GMT{
"status": "success",
"task": "a33035c5-332b-468c-8558-07c8c8a160b6"
}# In the worker terminal, observe that the worker picks up the task and executes it..(venv) $ rq worker
21:47:26 Worker rq:worker:ad16093b89964f4c8bf2244e8b5bd129: started, version 1.10.1
21:47:26 Subscribing to channel rq:pubsub:ad16093b89964f4c8bf2244e8b5bd129
21:47:26 *** Listening on default...
21:47:26 Cleaning registries for queue: default
21:47:30 default: app.task.long_running_task('foobar') (a33035c5-332b-468c-8558-07c8c8a160b6)
Waiting for 10 seconds...
Message : foobar
Finished executing long running task.
21:47:41 default: Job OK (a33035c5-332b-468c-8558-07c8c8a160b6)
21:47:41 Result is kept for 500 seconds