澳门威利斯人_威利斯人娱乐「手机版」

来自 网络资讯 2019-05-02 14:26 的文章
当前位置: 澳门威利斯人 > 网络资讯 > 正文

分布式任务队列Celery入门与进阶,Django使用Cele

二、架构&专业规律

  Celery由以下三局地组成:音信中间件(Broker)、职务实施单元Worker、结果存款和储蓄(Backend),如下图:

  图片 1

行事原理:

  1. 职分模块Task包括异步职务和定时职责。个中,异步任务平日在事情逻辑中被触发并发往音讯队列,而定期职分由Celery Beat进度周期性地将义务发往新闻队列;
  2. 职务奉行单元Worker实时监视音讯队列获取队列中的任务施行;
  3. Woker试行完职务后将结果保存在Backend中;

一.2Celery适用场景

异步职分管理:比如给登记用户发送短音信或然确认邮件义务。 大型职分:推行时间较长的天职,举例录制和图纸管理,增多水印和转码等,必要实行职责时间长。 定期举办的职分:援救职务的定时施行和设定时期实践。比方品质压测按时举办。

5.3.2  启动worker进程

Worker进度运营和眼前运维命令同样。

 

celery –A proj.mycelery worker –l info

图片 2

 

放置钩子函数

  Celery在实行职务时候,提供了钩子方法用于在任务实践到位时候进行相应的操作,在Task源码中提供了成百上千情景钩子函数如:on_success(成功后实施)、on_failure(战败时候实践)、on_retry(义务重试时候执行)、after_return(任务回到时候实践),在进展应用是大家只须要重写这几个措施,完结相应的操作就能够。

在偏下示例中,我们继续修改period_task.py,分别定义多个职分来演示任务战败、重试、职务成功后施行的操作:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.utils.log import get_task_logger
from celery import Task

logger = get_task_logger(__name__)

class demotask(Task):

    def on_success(self, retval, task_id, args, kwargs):   # 任务成功执行
        logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))



    def on_failure(self, exc, task_id, args, kwargs, einfo):  #任务失败执行
        logger.info('task id:{} , arg:{} , failed ! erros : {}' .format(task_id,args,exc))


    def on_retry(self, exc, task_id, args, kwargs, einfo):    #任务重试执行
        logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

@app.task(base=demotask,bind=True)
def add(self,x,y):
    try:
        a=[]
        a[10]==1
    except Exception as e:
        raise self.retry(exc=e, countdown=5, max_retries=1) # 出错每5秒尝试一次,总共尝试1次
    return x y

@app.task(base=demotask)
def sayhi(name):
    a=[]
    a[10]==1
    return 'hi {}'.format(name)

@app.task(base=demotask)
def sum(a,b):
    return 'a b={} '.format(a b)

那儿的安插文件config.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
from celery.schedules import crontab

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task',
)

app.conf.beat_schedule = {
'add': {          # 每10秒执行
        'task': 'project.period_task.add',  #任务路径
        'schedule': 10.0,
        'args': (10,12),
    },
'sayhi': {          # 每10秒执行
        'task': 'project.period_task.sayhi',  #任务路径
        'schedule': 10.0,
        'args': ('wd',),
    },
'sum': {          # 每10秒执行
        'task': 'project.period_task.sum',  #任务路径
        'schedule': 10.0,
        'args': (1,3),
    },
}

接下来重启worker和beat,查看日志:

图片 3

 

3Celery单独实施任务

2.2     Celery安装

应用方式介绍:

Celery的运营重视新闻队列,使用时要求设置redis可能rabbit。

此处大家采用Redis。安装redis库:

sudo yum install redis

  

启动redis:

sudo service redis start

 

安装celery库

sudo pip install celery==4.0.2

 

结果存款和储蓄Backend

  Backend结果存款和储蓄官方也提供了成都百货上千的囤积格局协理:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django OENCOREM、Apache Cassandra、Elasticsearch。

2.2     Celery安装

使用格局介绍:

Celery的运作重视音讯队列,使用时索要设置redis只怕rabbit。

此间大家应用Redis。安装redis库:

sudo yum install redis

启动redis:

sudo service redis start

安装celery库

sudo pip install celery==4.0.2

4.3     配置Celery

大家在mycelery.py文件中证明celery的布署文件在settings.py中,并且是以CELE纳瓦拉Y伊始。

   

app.config_from_object('django.conf:settings', namespace='CELERY')

 

在settings.py文件中增多celery配置:

 图片 4

 

小编们的布署是应用redis作为音讯队列,音信的代办和结果都以用redis,职责的连串化使用json格式。

注重:redis://1二七.0.0.1:6379/0以此证明使用的redis的0号队列,假若有七个celery任务都使用同一个队列,则会变成职分混乱。最棒是celery实例单独选择一个种类。

一、简介

  Celery是由Python开采、简单、灵活、可相信的布满式任务队列,其本质是劳动者消费者模型,生产者发送职责到消息队列,消费者承受管理任务。Celery侧重于实时操作,但对调节补助也很好,其天天能够管理数以百万计的天职。特点:

  • 简简单单:熟谙celery的干活流程后,配置使用不难
  • 高可用:当职务试行倒闭或施行进度中产生一而再中断,celery会自动尝试再次奉行义务
  • 相当慢:八个单进度的celery每分钟可管理上百万个职责
  • 利落:差不多celery的相继零部件都足以被扩展及自定制

动用场景举个例子:

  一.web运用:当用户在网站进行有个别操作要求十分短日子成功时,大家得以将那种操作交给Celery实行,直接再次回到给用户,等到Celery实行到位之后公告用户,大大提好网址的产出以及用户的体验感。

  二.职分场景:举例在运行场景下要求批量在几百台机器实施有个别命令恐怕任务,此时Celery能够轻松消除。

  叁.定期任务:向定期导数据报表、定期发送布告类似现象,即便Linux的安顿职责能够帮小编落成,不过充裕不方便人民群众管理,而Celery能够提供处理接口和加多的API。

三.二 调用职责

直接展开python交互命令行

实行下边代码:

图片 5

能够celery的窗口看看职责的进行新闻

图片 6

任务履市场价格况监察和控制和收获结果:

图片 7

三.三     职责调用方法总括

有二种艺术:

delay和apply_async ,delay方法是apply_async简化版。

add.delay(2, 2)
add.apply_async((2, 2))
add.apply_async((2, 2), queue='lopri')

 

delay方法是apply_async简化版本。

apply_async方法是能够带相当多的安排参数,包蕴钦定队列等

  • Queue 指定队列名称,能够把不一样任务分配到不相同的队列

 

简单的讲利用

  目录结构:

project/
├── __init__.py  
├── config.py
└── tasks.py

各目录文件说明:

__init__.py:起初化Celery以及加载配置文件

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from celery import Celery
app = Celery('project')                                # 创建 Celery 实例
app.config_from_object('project.config')               # 加载配置模块

config.py:  Celery相关配置文件,更多配备参考:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
)

tasks.py :职分定义文件

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
@app.task
def show_name(name):
    return name

启动Worker:

celery worker -A project -l debug

次第参数含义:

  worker: 代表第启动的角色是work当然还有beat等任何剧中人物;

  -A :项目路线,这里我的目录是project

  -l:运转的日记品级,越来越多参数使用celery --help查看

查阅日志输出,会开采大家定义的天职,以及有关布置:

图片 8

 

  即便起步了worker,不过我们还亟需经过delay或apply_async来将任务加多到worker中,这里大家通过交互式方法加多职务,并再次来到AsyncResult对象,通过AsyncResult对象得到结果:

图片 9

AsyncResult除了get方法用于常用获取结果方法外还提以下常用艺术或性质:

  • state: 重回职务景况;
  • task_id: 重临职责id;
  • result: 重临职务结果,同get()方法;
  • ready(): 判定职分是还是不是以及有结果,有结果为True,不然False;
  • info(): 获取职务新闻,默感到结果;
  • wait(t): 等待t秒后获得结果,若职责实施完结,则不等待直接获得结果,若任务在施行中,则wait时期一贯不通,直到超时报错;
  • successfu(): 判定任务是还是不是中标,成功为True,不然为False;

6 Celery深入

Celery义务帮助多元的运营格局:

  1. 支撑动态钦点并发数 --autoscale=10,三 (always keep 3 processes, but grow to 拾 if necessary).
  2. 支撑链式职务
  3. 支持Group任务
  4. 支撑任务差别优先级
  5. 支撑钦命任务队列
  6. 支撑采取eventlet格局运作worker

譬如说:内定并发数为1000

celery -A proj.mycelery worker -c 1000

那些足以依赖使用的深深机关驾驭和上学。

上述正是本文的全体内容,希望对大家的上学抱有扶助,也可望大家多多扶助脚本之家。

4.5     编写task任务

编辑职分文件

tasks.py

在tasks.py文件中增加上面代码

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x   y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

 

启动celery:

celery -A proj.mycelery worker -l info

 

证实:proj为模块名称,mycelery为celery的实例所在的文书。

初始成功打字与印刷:

 图片 10

 

任务绑定

  Celery可由此职务绑定到实例获取到职分的上下文,那样我们能够在任务局营时候得到到职责的情况,记录相关日志等。

修改职分中的period_task.py,如下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
@app.task(bind=True)  # 绑定任务
def add(self,x,y):
    logger.info(self.request.__dict__)  #打印日志
    try:
        a=[]
        a[10]==1
    except Exception as e:
        raise self.retry(exc=e, countdown=5, max_retries=3) # 出错每5秒尝试一次,总共尝试3次
    return x y

在上述代码中,通过bind参数将任务绑定,self指职务的上下文,通过self获取职责状态,同时在职务出错开上下班时间开始展览职分重试,大家观望日志:

图片 11

4.2 成立项目文件

成立3个等级次序:名字称为proj

- proj/
 - proj/__init__.py
 - proj/settings.py
 - proj/urls.py
 - proj/wsgi.py
- manage.py

成立贰个新的文书: proj/proj/mycelery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

在proj/proj/__init__.py:添加

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .mycelery import app as celery_app

__all__ = ['celery_app']

5.二     定期施行

定期每日上午七:27分运营。

注意:设置职务时间时只顾时间格式,UTC时间依然地面时间。

#crontab任务
#每天7:30调用task.add
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30),
        'args': (16, 16),
    },
}

 

定期职分&铺排职分

  Celery的提供的定时职责至关心重视要靠schedules来形成,通过beat组件周期性将职务发送给woker执行。在示范中,新建文件period_task.py,并充分职责到布置文件中:

period_task.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, add.s(1,3), name='1 3=') # 每10秒执行add
    sender.add_periodic_task(
        crontab(hour=16, minute=56, day_of_week=1),      #每周一下午四点五十六执行sayhai
        sayhi.s('wd'),name='say_hi'
    )



@app.task
def add(x,y):
    print(x y)
    return x y


@app.task
def sayhi(name):
    return 'hello %s' % name

config.py

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task', #定时任务
)

启动worker和beat:

celery worker -A project -l debug #启动work
celery beat -A  project.period_task -l  debug #启动beat,注意此时对应的文件路径

小编们可以观测worker日志:

图片 12

还是能通过安插文件方式钦命定时和安排任务,此时的配备文件如下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
from celery.schedules import crontab

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task',
)

app.conf.beat_schedule = {
    'period_add_task': {    # 计划任务
        'task': 'project.period_task.add',  #任务路径
        'schedule': crontab(hour=18, minute=16, day_of_week=1),
        'args': (3, 4),
    },
'add-every-30-seconds': {          # 每10秒执行
        'task': 'project.period_task.sayhi',  #任务路径
        'schedule': 10.0,
        'args': ('wd',)
    },
}

此时的period_task.py只需求注册到woker中就行了,如下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app

@app.task
def add(x,y):
    print(x y)
    return x y


@app.task
def sayhi(name):
    return 'hello %s' % name

一样运营worker和beat结果和率先种方法一样。愈多详细的始末请参考:

您可能感兴趣的稿子:

  • Django中使用celery达成异步职务的言传身教代码
  • 异步任务队列Celery在Django中的使用办法

7      参考资料

Celery官网:

Celery与Django:

celery定期职分:

音信中间件Broker

  音讯中间件Broker官方提供了成千上万备选方案,帮衬RabbitMQ、Redis、AmazonSQS、MongoDB、Memcached 等,官方推荐RabbitMQ。

5Celery定期职分

Celery作为异步任务队列,大家能够依据大家设置的日子,定期的进行一些职分,举例天天数据库备份,日志转存等。

Celery的定期职分安排万分轻巧:

定期任务的安插依然在setting.py文件中。

表达:假若以为celery 的多少配置文件和Django 的都在setting.py 2个文件中不方便人民群众,能够分拆出来,只须要在mycelery.py 的文书中指明就可以。

app.config_from_object('django.conf:yoursettingsfile', namespace='CELERY')

6      Celery深入

Celery任务帮助多元的周转格局:

  • 帮助动态内定并发数 --autoscale=10,三 (always keep 3 processes, but grow to 拾 if necessary).
  • 支撑链式职责
  • 支持Group任务
  • 支撑职分不一致优先级
  • 援助钦命职责队列
  • 帮助选择eventlet形式运作worker

举个例子:钦赐并发数为一千

celery -A proj.mycelery worker -c 1000

 

这几个足以依赖使用的中肯机关掌握和读书。

 

 

delay &apply_async

  对于delay和apply_async都能够用来张开任务的调整,本质上是delay对apply_async举办了再1遍封装(或许能够说是飞快格局),两者都回到AsyncResult对象,以下是七个点子源码。

图片 13图片 14

    def delay(self, *args, **kwargs):
        """Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        Arguments:
            *args (Any): Positional arguments passed on to the task.
            **kwargs (Any): Keyword arguments passed on to the task.
        Returns:
            celery.result.AsyncResult: Future promise.
        """
        return self.apply_async(args, kwargs)

delay源码

图片 15图片 16

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """Apply tasks asynchronously by sending a message.

        Arguments:
            args (Tuple): The positional arguments to pass on to the task.

            kwargs (Dict): The keyword arguments to pass on to the task.

            countdown (float): Number of seconds into the future that the
                task should execute.  Defaults to immediate execution.

            eta (~datetime.datetime): Absolute time and date of when the task
                should be executed.  May not be specified if `countdown`
                is also supplied.

            expires (float, ~datetime.datetime): Datetime or
                seconds in the future for the task should expire.
                The task won't be executed after the expiration time.

            shadow (str): Override task name used in logs/monitoring.
                Default is retrieved from :meth:`shadow_name`.

            connection (kombu.Connection): Re-use existing broker connection
                instead of acquiring one from the connection pool.

            retry (bool): If enabled sending of the task message will be
                retried in the event of connection loss or failure.
                Default is taken from the :setting:`task_publish_retry`
                setting.  Note that you need to handle the
                producer/connection manually for this to work.

            retry_policy (Mapping): Override the retry policy used.
                See the :setting:`task_publish_retry_policy` setting.

            queue (str, kombu.Queue): The queue to route the task to.
                This must be a key present in :setting:`task_queues`, or
                :setting:`task_create_missing_queues` must be
                enabled.  See :ref:`guide-routing` for more
                information.

            exchange (str, kombu.Exchange): Named custom exchange to send the
                task to.  Usually not used in combination with the ``queue``
                argument.

            routing_key (str): Custom routing key used to route the task to a
                worker server.  If in combination with a ``queue`` argument
                only used to specify custom routing keys to topic exchanges.

            priority (int): The task priority, a number between 0 and 9.
                Defaults to the :attr:`priority` attribute.

            serializer (str): Serialization method to use.
                Can be `pickle`, `json`, `yaml`, `msgpack` or any custom
                serialization method that's been registered
                with :mod:`kombu.serialization.registry`.
                Defaults to the :attr:`serializer` attribute.

            compression (str): Optional compression method
                to use.  Can be one of ``zlib``, ``bzip2``,
                or any custom compression methods registered with
                :func:`kombu.compression.register`.
                Defaults to the :setting:`task_compression` setting.

            link (Signature): A single, or a list of tasks signatures
                to apply if the task returns successfully.

            link_error (Signature): A single, or a list of task signatures
                to apply if an error occurs while executing the task.

            producer (kombu.Producer): custom producer to use when publishing
                the task.

            add_to_parent (bool): If set to True (default) and the task
                is applied while executing another task, then the result
                will be appended to the parent tasks ``request.children``
                attribute.  Trailing can also be disabled by default using the
                :attr:`trail` attribute

            publisher (kombu.Producer): Deprecated alias to ``producer``.

            headers (Dict): Message headers to be included in the message.

        Returns:
            celery.result.AsyncResult: Promise of future evaluation.

        Raises:
            TypeError: If not enough arguments are passed, or too many
                arguments are passed.  Note that signature checks may
                be disabled by specifying ``@task(typing=False)``.
            kombu.exceptions.OperationalError: If a connection to the
               transport cannot be made, or if the connection is lost.

        Note:
            Also supports all keyword arguments supported by
            :meth:`kombu.Producer.publish`.
        """
        if self.typing:
            try:
                check_arguments = self.__header__
            except AttributeError:  # pragma: no cover
                pass
            else:
                check_arguments(*(args or ()), **(kwargs or {}))

        app = self._get_app()
        if app.conf.task_always_eager:
            with denied_join_result():
                return self.apply(args, kwargs, task_id=task_id or uuid(),
                                  link=link, link_error=link_error, **options)

        if self.__v2_compat__:
            shadow = shadow or self.shadow_name(self(), args, kwargs, options)
        else:
            shadow = shadow or self.shadow_name(args, kwargs, options)

        preopts = self._get_exec_options()
        options = dict(preopts, **options) if options else preopts

        options.setdefault('ignore_result', self.ignore_result)

        return app.send_task(
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, result_cls=self.AsyncResult,
            shadow=shadow, task_type=self,
            **options
        )

apply_async源码

对于其利用,apply_async帮衬常用参数:

  • eta:钦定职务执行时间,类型为datetime时间档期的顺序;
  • countdown:倒计时,单位秒,浮点类型;
  • expires:职分过期时间,假使职责在跨超越期时刻还未实施则回收任务,浮点类型获取datetime类型;
  • retry:职责奉行停业时候是不是尝试,布尔类型。;
  • serializer:体系化方案,协助pickle、json、yaml、msgpack;
  • priority:职责优先级,有0~九优先级可设置,int类型;
  • retry_policy:职分重试机制,在那之中蕴涵多少个重试参数,类型是dict如下:

图片 17图片 18

max_retries:最大重试次数

interval_start:重试等待时间

interval_step:每次重试叠加时长,假设第一重试等待1s,第二次等待1+n秒

interval_max:最大等待时间

####示例
 add.apply_async((1, 3), retry=True, retry_policy={
        'max_retries': 1,
        'interval_start': 0,
        'interval_step': 0.8,
        'interval_max': 5,
    })

View Code

越来越多参数参考:

 

  

5.三按时任务运维

安插了按期职务,除了worker进度外,还需求运维二个beat进度。

Beat进度的效劳就相当于一个按期任务,依据配置来执行相应的任务。

5.3.1  启动beat进程

指令如下:celery -A proj.mycelery beat -l info

图片 19

5.3.2  启动worker进程

Worker进度运行和前面运维命令同样。celery –A proj.mycelery worker –l info

图片 20

5.3     定期义务运转

布置了定期职务,除了worker进度外,还须求运维2个beat进度。

Beat进度的功力就一定于2个按期职责,依据陈设来施行相应的天职。

本文由澳门威利斯人发布于网络资讯,转载请注明出处:分布式任务队列Celery入门与进阶,Django使用Cele

关键词: 澳门威利斯人