【Python】Celery基本使用

【Python】Celery基本使用,第1张

目录

初识celery

说明

celery的框架由三部分组成:

使用场景

使用优点

安装及使用

安装

案例演示

1. 创建项目celerypro

2. 创建异步任务执行文件celery_task

3. 使用命令启动celery

4. 执行测试函数

5. 异步获取结果


初识celery

celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于处理异步任务队列,同时也支持任务调度。


说明
  1. 分布式系统:一个系统应用由不同的组件构成,我们将不同组件架构在不同的服务器中,不同组件之间通过消息通信的方式来实现协调工作。


  2. 图中user指的是django或者flask这样的框架,接收请求之后需要进行处理。


  3. 图中AMQPbroker指的是中间件,我们可以用RabbitMQ或者Redis来承担相关工作。


  4. 图中celery workers值得是celery,它作为消费者监听中间件中的任务,这里就需要用到并发的 *** 作了,celery实现了并发(进程+协程),我们只需要调用就可以了。


celery的框架由三部分组成:
  • 消息中间件(message broker)
    •  Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。


      包括,RabbitMQ, Redis等等。


    •     官方推荐用rabbitMQ,因为它持久稳定。


  • 任务执行单元(worker)
    •  Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。


  • 任务执行结果存储(task result store)
    •  Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

另外, Celery还支持不同的并发和序列化的手段

  • 并发:Prefork, Eventlet, gevent, threads/single threaded
  • 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等

使用场景

celery是一个强大分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。


我们通常使用它来实现异步任务(async task)和定时任务(crontab)。


异步任务:将耗时 *** 作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

使用优点

Simple(简单)
Celery 使用和维护都非常简单,并且不需要配置文件。


Highly Available(高可用)
woker和client会在网络连接丢失或者失败时,自动进行重试。


并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。


Fast(快速)
单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)

Flexible(灵活)
Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。


安装及使用 安装
pip install celery

使用过程很简单

1. 确定异步任务函数,通过命令确定之后,生产者会把函数名和相关参数传给消息中间件。


2. 通过一条命令把celery启动,实现celery workers对消息队列的监听。


案例演示

这里为了方便,我们使用redis来进行演示,redis在使用的时候充当两个角色,一个是消息中间件,一个是存储结果的数据库。


1. 创建项目celerypro 2. 创建异步任务执行文件celery_task
import celery
import time
backend = 'redis://127.0.0.1:6379/1'    # 设置redis的1数据库来存放结果
broker = 'redis://127.0.0.1:6379/2'      # 设置redis的2数据库存放消息中间件
cel = celery.Celery('test', backend=backend, broker=broker)
    # 参数说明:第一个是celery的名字,这个celery和哪个项目相关就命名哪个
    # 后面两个关键字参数则是指定消息中间件和结果存放位置。


@cel.task def send_email(name): print("向%s发送邮件..." % name) time.sleep(5) print("向%s发送邮件完成" % name) return "ok" @cel.task def send_msg(name): print("向%s发送短信..." % name) time.sleep(5) print("向%s发送短信完成" % name) return "ok"

3. 使用命令启动celery
celery --app=demo worker -l INFO

 下面是执行之后的效果:

在使用过程中也可能会出现下面问题:

 这是因为在celery用到了协程,协程在使用的使用需要用猴子补丁,具体解决方式是,首先下载eventlet:

pip install eventlet

然后对启动命令进行修改:

celery --app=celery_task worker -l INFO -P eventlet
4. 执行测试函数

建立一个文件,放入下面代码进行celery异步函数的测试:

from celery_task import send_email, send_msg

result1 = send_email.delay("张三")
print(result1.id)
result2 = send_email.delay("李四")
print(result2.id)
result3 = send_email.delay("王五")
print(result3.id)
result4 = send_email.delay("赵六")
print(result4.id)

下面是运行结果,而在celery的界面中也可以看到对应的日志信息:

运行的结果不是异步函数的返回值,而是一个id值,因为celery会将函数进行异步处理,结果会存放到指定的数据库中,而我们取值的时候就需要用id值了。


5. 异步获取结果
from celery.result import AsyncResult
from celery_task import cel

async_result=AsyncResult(id="275f43a8-a5bb-4822-9a90-8be3feeb3b4", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

说明:执行失败效果是代码有错但是异步不会停止,还是会执行获得id,但是当获取结果的时候,async_result.failed()结果为真。


如果要演示记得重启celery,否则修改不生效。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/langs/570375.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-09
下一篇 2022-04-09

发表评论

登录后才能评论

评论列表(0条)

保存