在开发的过程中,有时会需要用到类似下面的这些操作
- 用户注册时发送认证邮件
- 带有Web界面的爬虫
- 定时计划任务
这些任务的共同特点是执行所需的时间较长,但是我们又不希望其阻塞后续的操作。因此我们将这些任务放进任务队列里来运行。
Python常见的异步任务队列实现有功能较丰富的Celery和轻量级的RQ,本文以celery为例。
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统。
Celery经常和RabbitMQ同时提起,但实际上,RabbitMQ和celery并不是同一层面的东西。Celery需要存储介质来存储任务(称为broker),可选的broker有RabbitMQ, redis, mysql, mongodb等。
声明任务
将作为celery任务的函数使用@app.task
修饰器进行修饰。其中@app
是一个celery实例。
#tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost/')
@app.task
def celery_task(x, y):
return x + y
启动worker
使用如下命令启动单个worker,其中tasks表示包含celery任务的python模块
celery -A tasks worker --loglevel=info
启动后出现如下界面,表示celery已经开始监听任务。
或使用celery multi
命令启动多个worker
celery multi start worker1 worker2 worker3 worker4 worker5 -A tasks -l info -c10 --pidfile=pid/task%n.pid --logfile=log/task%n.log
如果需要关闭worker,将上面命令中的start
换成stopwait
。
调用celery任务
from tasks import celery_task
celery_task.apply_async(args=(arg,))
#或
celery_task.delay(arg)
广播任务
正常情况下,一个任务只能有一个消费者,也就是说,当一个任务被一个worker取走后,这个任务就在队列中不再存在了,其他worker无法消费该任务。如果需要让一个任务被所有worker执行,就需要用到广播。
官方文档中称RabbitMQ和Redis均可支持广播。但是,广播在使用Redis作为broker时会遇到一系列莫名其妙的问题。因此,如果需要使用广播,请尽量使用RabbitMQ作为broker。
首先我们需要配置一条队列作为广播队列。
from celery import Celery
from kombu.common import Broadcast
celery = Celery('tasks', broker='amqp://guest:guest@localhost//')
celery.conf.update(
CELERY_QUEUES=(
Broadcast('broadcast'), #此处设置消息队列broadcast为广播模式,及该队列上的消息会发送至所有监听它的worker
),
CELERY_ROUTES={
'*': {
'queue': 'broadcast'
}
}
)
需要执行任务时,将队列指定为刚才配置的广播队列即可。
celery_task.apply_async(args=(arg,),queue='broadcast')
终止任务
celery_task.revoke(terminate=True)
如果上述命令不能有效地终止任务,可以添加一个signal='SIGKILL'
参数。如果不加这个参数,默认发送的信号是SIGTERM
celery_task.revoke(terminate=True,signal='SIGKILL')
RQ
除了celery之外,我们还有一种更轻量级的选择。
RQ(Redis Queue)是一个基于Redis的轻量级任务队列库,可以轻松地与Python应用进行集成。
它的用法非常简单
创建一个任务
import requests
from redis import Redis
from rq import Queue
def count_words_at_url(url):
resp = requests.get(url)
return len(resp.text.split())
q = Queue(connection=Redis())
result = q.enqueue(count_words_at_url, 'http://nvie.com')
在项目目录下启动一个worker,以开始执行已入队的任务。
rq worker
此时rq已经在后台监听新任务,运行上面的python脚本,即可得到如下的结果:
*** Listening for work on default
Got count_words_at_url('http://nvie.com') from default
Job result = 818
*** Listening for work on default
发表回复/Leave a Reply