金沙国际官网_金沙国际平台登录

因为这个金沙国际官网_金沙国际平台登录网站与很多的大型澳门赌场都有合作,金沙国际官网_金沙国际平台登录尽职尽责,高效执行,保持好奇心,不断学习,追求卓越,点击进入金沙国际官网_金沙国际平台登录马上体验吧,所以现在也正式地开始了营业。

您的位置:金沙国际官网 > 编程 > 分布式任务队列Celery入门与进阶,中文字符串编

分布式任务队列Celery入门与进阶,中文字符串编

发布时间:2019-11-01 17:41编辑:编程浏览(87)

    (2015年8月5日更新:微软已经修复了Roslyn的这个bug,详见 )

    走进 LINQ 的世界

    一、简介

      Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。特点:

    • 简单:熟悉celery的工作流程后,配置使用简单
    • 高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务
    • 快速:一个单进程的celery每分钟可处理上百万个任务
    • 灵活:几乎celery的各个组件都可以被扩展及自定制

    应用场景举例:

      1.web应用:当用户在网站进行某个操作需要很长时间完成时,我们可以将这种操作交给Celery执行,直接返回给用户,等到Celery执行完成以后通知用户,大大提好网站的并发以及用户的体验感。

      2.任务场景:比如在运维场景下需要批量在几百台机器执行某些命令或者任务,此时Celery可以轻松搞定。

      3.定时任务:向定时导数据报表、定时发送通知类似场景,虽然Linux的计划任务可以帮我实现,但是非常不利于管理,而Celery可以提供管理接口和丰富的API。

    昨天,我们用VS2015编译了博客程序中的一个程序集并发布上线。

      在此之前曾发表过三篇关于 LINQ 的随笔:

        进阶:《LINQ 标准查询操作概述》(强烈推荐)

        技巧:《Linq To Objects - 如何操作字符串》 和 《Linq To Objects - 如何操作文件目录》

      现在,自己打算再整理一篇关于 LINQ 入门的随笔,也是图文并茂的哦。

     

    二、架构&工作原理

      Celery由以下三部分构成:消息中间件(Broker)、任务执行单元Worker、结果存储(Backend),如下图:

      图片 1

    工作原理:

    1. 任务模块Task包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往消息队列,而定时任务由Celery Beat进程周期性地将任务发往消息队列;
    2. 任务执行单元Worker实时监视消息队列获取队列中的任务执行;
    3. Woker执行完任务后将结果保存在Backend中;

    今天有园友反馈向我们反馈,个人博客分页显示随笔列表的页面中,“上一页”“下一页”显示乱码:

    目录

    • LINQ 简介
    • 介绍 LINQ 查询
    • LINQ 基本查询操作
    • 使用 LINQ 进行数据转换
    • LINQ 查询操作的类型关系
    • LINQ 中的查询语法和方法语法

     

    消息中间件Broker

      消息中间件Broker官方提供了很多备选方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推荐RabbitMQ。

    图片 2

    LINQ 简介

      语言集成查询 (LINQ) 是 Visual Studio 2008 和 .NET Framework 3.5 版中引入的一项创新功能。

      传统上,针对数据的查询都是以简单的字符串表示,而没有编译时类型检查或 IntelliSense 支持。此外,您还必须针对以下各种数据源学习一种不同的查询语言:SQL 数据库、XML 文档、各种 Web 服务等等。 通过LINQ, 您可以使用语言关键字和熟悉的运算符针对强类型化对象集合编写查询。

    图片 3

      

      在 Visual Studio 中,可以为以下数据源编写 LINQ 查询:SQL Server 数据库、XML 文档、ADO.NET 数据集,以及支持 IEnumerable 或泛型 IEnumerable<T> 接口的任意对象集合。

      使用要求:项目 ≥ .NET Framework 3.5 。

     

    任务执行单元Worker

      Worker是任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心。

    而这个地方的“上一页”“下一页”字符串恰恰是在我们昨天发布的程序集中定义的:

    一、介绍 LINQ 查询

      查询是一种从数据源检索数据的表达式。随着时间的推移,人们已经为各种数据源开发了不同的语言;例如,用于关系数据库的 SQL 和用于 XML 的 XQuery。因此,开发人员不得不针对他们必须支持的每种数据源或数据格式而学习新的查询语言。LINQ 通过提供一种跨数据源和数据格式使用数据的一致模型,简化了这一情况。在 LINQ 查询中,始终会用到对象。可以使用相同的编码模式来查询和转换 XML 文档、SQL 数据库、ADO.NET 数据集、.NET 集合中的数据以及对其有 LINQ 提供程序可用的任何其他格式的数据。  

     

    结果存储Backend

      Backend结果存储官方也提供了诸多的存储方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch。

    public class Pager : Control
    {
        protected string PreviousText = "上一页";
        protected string NextText = "下一页";
    
        //...
    }
    

      1.1 查询操作的三个部分

      操作三部曲:①取数据源 ②创建查询 ③执行查询

    图片 4图片 5

     1 internal class Program
     2 {
     3         private static void Main(string[] args)
     4         {
     5             //1.获取数据源
     6             var nums = new int[7] { 0, 1, 2, 3, 4, 5, 6 };
     7 
     8             //2.创建查询
     9             var numQuery =
    10                 from num in nums
    11                 where (num % 2) == 0
    12                 select num;
    13 
    14             //3.执行查询
    15             foreach (var num in numQuery)
    16             {
    17                 Console.WriteLine("{0}", num);
    18             }
    19         }
    20 }
    

    View Code

    图片 6

     

       下图显示了完整的查询操作。在 LINQ 中,查询的执行与查询本身截然不同;换句话说,查询本身指的是只创建查询变量,不检索任何数据。

    图片 7

      

    三、安装使用 

      这里我使用的redis作为消息中间件,redis安装可以参考

    Celery安装: 

    pip3 install celery
    

    可是昨天我们并没有更改这部分代码,肯定不是我们昨天代码修改引起的。

      1.2 数据源

      在上一个示例中,由于数据源是数组,因此它隐式支持泛型 IEnumerable<T> 接口。支持 IEnumerable<T> 或派生接口(如泛型 IQueryable<T>)的类型称为可查询类型。  

      可查询类型不需要进行修改或特殊处理就可以用作 LINQ 数据源。如果源数据还没有作为可查询类型出现在内存中,则 LINQ 提供程序必须以此方式表示源数据。例如,LINQ to XML 将 XML 文档加载到可查询的 XElement 类型中:

      //从 XML 中创建数据源
      //using System.Xml.Linq;
      var contacts = XElement.Load(@"c:xxx.xml");
    

      

      在 LINQ to SQL 中,首先需要创建对象关系映射。 针对这些对象编写查询,然后由 LINQ to SQL 在运行时处理与数据库的通信。

    图片 8图片 9

    1     var  db = new Northwnd(@"c:northwnd.mdf");
    2     
    3     //查询在伦敦的客户
    4     var custQuery =
    5         from cust in db.Customers
    6         where cust.City == "London"
    7         select cust;
    

    Customers 表示数据库中的特定表

     

    简单使用

      目录结构:

    project/
    ├── __init__.py  
    ├── config.py
    └── tasks.py
    

    各目录文件说明:

    __init__.py:初始化Celery以及加载配置文件

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    from celery import Celery
    app = Celery('project')                                # 创建 Celery 实例
    app.config_from_object('project.config')               # 加载配置模块
    

    config.py:  Celery相关配置文件,更多配置参考:

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    
    BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件
    
    CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis
    
    CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
    
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
    
    CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置
    
    CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
        'project.tasks',
    )
    

    tasks.py :任务定义文件

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    
    from project import app
    @app.task
    def show_name(name):
        return name
    

    启动Worker:

    celery worker -A project -l debug
    

    各个参数含义:

      worker: 代表第启动的角色是work当然还有beat等其他角色;

      -A :项目路径,这里我的目录是project

      -l:启动的日志级别,更多参数使用celery --help查看

    查看日志输出,会发现我们定义的任务,以及相关配置:

    图片 10

     

      虽然启动了worker,但是我们还需要通过delay或apply_async来将任务添加到worker中,这里我们通过交互式方法添加任务,并返回AsyncResult对象,通过AsyncResult对象获取结果:

    图片 11

    AsyncResult除了get方法用于常用获取结果方法外还提以下常用方法或属性:

    • state: 返回任务状态;
    • task_id: 返回任务id;
    • result: 返回任务结果,同get()方法;
    • ready(): 判断任务是否以及有结果,有结果为True,否则False;
    • info(): 获取任务信息,默认为结果;
    • wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
    • successfu(): 判断任务是否成功,成功为True,否则为False;

    于是,我们改用VS2013重新编译了一下这个程序集,更新之后,乱码立马消失。

      1.3 查询

      查询指定要从数据源中检索的信息。 查询还可以指定在返回这些信息之前如何对其进行排序、分组和结构化。 查询存储在查询变量中,并用查询表达式进行初始化。

      之前的示例中的查询是从整数数组中返回所有的偶数。 该查询表达式包含三个子句:fromwhere 和 select。(如果您熟悉 SQL,您会注意到这些子句的顺序与 SQL 中的顺序相反。)from 子句指定数据源,where 子句指定应用筛选器,select 子句指定返回的元素的类型。 目前需要注意的是,在 LINQ 中,查询变量本身不执行任何操作并且不返回任何数据。 它只是存储在以后某个时刻执行查询时为生成结果而必需的信息。

     

    四、进阶使用

      对于普通的任务来说可能满足不了我们的任务需求,所以还需要了解一些进阶用法,Celery提供了诸多调度方式,例如任务编排、根据任务状态执行不同的操作、重试机制等,以下会对常用高阶用法进行讲述。

    接着,用ILSpy反编译了VS2015所编译出的程序集的IL代码之后,真相大白:

      1.4 查询执行

    定时任务&计划任务

      Celery的提供的定时任务主要靠schedules来完成,通过beat组件周期性将任务发送给woker执行。在示例中,新建文件period_task.py,并添加任务到配置文件中:

    period_task.py:

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    from project import app
    from celery.schedules import crontab
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒执行add
        sender.add_periodic_task(
            crontab(hour=16, minute=56, day_of_week=1),      #每周一下午四点五十六执行sayhai
            sayhi.s('wd'),name='say_hi'
        )
    
    
    
    @app.task
    def add(x,y):
        print(x+y)
        return x+y
    
    
    @app.task
    def sayhi(name):
        return 'hello %s' % name
    

    config.py

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    
    BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件
    
    CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis
    
    CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
    
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
    
    CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置
    
    CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
        'project.tasks',
        'project.period_task', #定时任务
    )
    

    启动worker和beat:

    celery worker -A project -l debug #启动work
    celery beat -A  project.period_task -l  debug #启动beat,注意此时对应的文件路径
    

    我们可以观察worker日志:

    图片 12

    还可以通过配置文件方式指定定时和计划任务,此时的配置文件如下:

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    
    from project import app
    from celery.schedules import crontab
    
    BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件
    
    CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis
    
    CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
    
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
    
    CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置
    
    CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
        'project.tasks',
        'project.period_task',
    )
    
    app.conf.beat_schedule = {
        'period_add_task': {    # 计划任务
            'task': 'project.period_task.add',  #任务路径
            'schedule': crontab(hour=18, minute=16, day_of_week=1),
            'args': (3, 4),
        },
    'add-every-30-seconds': {          # 每10秒执行
            'task': 'project.period_task.sayhi',  #任务路径
            'schedule': 10.0,
            'args': ('wd',)
        },
    }
    

    此时的period_task.py只需要注册到woker中就行了,如下:

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    from project import app
    
    @app.task
    def add(x,y):
        print(x+y)
        return x+y
    
    
    @app.task
    def sayhi(name):
        return 'hello %s' % name
    

    同样启动worker和beat结果和第一种方式一样。更多详细的内容请参考:

    public class Pager : Control
    {
        protected string PreviousText = "ÉÏÒ»Ò³";
        protected string NextText = "ÏÂÒ»Ò³";
        //...
    }
    

      1.延迟执行

        如前所述,查询变量本身只是存储查询命令。  实际的查询执行会延迟到在 foreach 语句中循环访问查询变量时发生。 此概念称为“延迟执行”。

    任务绑定

      Celery可通过任务绑定到实例获取到任务的上下文,这样我们可以在任务运行时候获取到任务的状态,记录相关日志等。

    修改任务中的period_task.py,如下:

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    from project import app
    from celery.utils.log import get_task_logger
    
    logger = get_task_logger(__name__)
    @app.task(bind=True)  # 绑定任务
    def add(self,x,y):
        logger.info(self.request.__dict__)  #打印日志
        try:
            a=[]
            a[10]==1
        except Exception as e:
            raise self.retry(exc=e, countdown=5, max_retries=3) # 出错每5秒尝试一次,总共尝试3次
        return x+y
    

    在以上代码中,通过bind参数将任务绑定,self指任务的上下文,通过self获取任务状态,同时在任务出错时进行任务重试,我们观察日志:

    图片 13

    原来是VS2015所用的编译器惹的祸,而这个编译器就是大名鼎鼎的 Roslyn 。

      2.强制立即执行

        对一系列源元素执行聚合函数的查询必须首先循环访问这些元素。CountMaxAverage 和 First 就属于此类查询。由于查询本身必须使用 foreach 以便返回结果,因此这些查询在执行时不使用显式 foreach 语句。另外还要注意,这些类型的查询返回单个值,而不是 IEnumerable 集合。 

    图片 14图片 15

    1     var numbers = new int[7] { 0, 1, 2, 3, 4, 5, 6 };
    2 
    3     var evenNumQuery =
    4         from num in numbers
    5         where (num % 2) == 0
    6         select num;
    7 
    8     var evenNumCount = evenNumQuery.Count();
    

    View Code

    图片 16

     

      若要强制立即执行任意查询并缓存其结果,可以调用 ToList<TSource> 或 ToArray<TSource> 方法。

    图片 17图片 18

    1     var numQuery2 =
    2            (from num in numbers
    3             where (num % 2) == 0
    4             select num).ToList();
    5 
    6     var numQuery3 =
    7           (from num in numbers
    8            where (num % 2) == 0
    9             select num).ToArray();
    

    View Code

     

      此外,还可以通过在紧跟查询表达式之后的位置放置一个 foreach 循环来强制执行查询。但是,通过调用 ToList 或 ToArray,也可以将所有数据缓存在单个集合对象中。 

     

    内置钩子函数

      Celery在执行任务时候,提供了钩子方法用于在任务执行完成时候进行对应的操作,在Task源码中提供了很多状态钩子函数如:on_success(成功后执行)、on_failure(失败时候执行)、on_retry(任务重试时候执行)、after_return(任务返回时候执行),在进行使用是我们只需要重写这些方法,完成相应的操作即可。

    在以下示例中,我们继续修改period_task.py,分别定义三个任务来演示任务失败、重试、任务成功后执行的操作:

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    from project import app
    from celery.utils.log import get_task_logger
    from celery import Task
    
    logger = get_task_logger(__name__)
    
    class demotask(Task):
    
        def on_success(self, retval, task_id, args, kwargs):   # 任务成功执行
            logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))
    
    
    
        def on_failure(self, exc, task_id, args, kwargs, einfo):  #任务失败执行
            logger.info('task id:{} , arg:{} , failed ! erros : {}' .format(task_id,args,exc))
    
    
        def on_retry(self, exc, task_id, args, kwargs, einfo):    #任务重试执行
            logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))
    
    @app.task(base=demotask,bind=True)
    def add(self,x,y):
        try:
            a=[]
            a[10]==1
        except Exception as e:
            raise self.retry(exc=e, countdown=5, max_retries=1) # 出错每5秒尝试一次,总共尝试1次
        return x+y
    
    @app.task(base=demotask)
    def sayhi(name):
        a=[]
        a[10]==1
        return 'hi {}'.format(name)
    
    @app.task(base=demotask)
    def sum(a,b):
        return 'a+b={} '.format(a+b)
    

    此时的配置文件config.py:

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    
    from project import app
    from celery.schedules import crontab
    
    BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件
    
    CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis
    
    CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
    
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
    
    CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置
    
    CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
        'project.tasks',
        'project.period_task',
    )
    
    app.conf.beat_schedule = {
    'add': {          # 每10秒执行
            'task': 'project.period_task.add',  #任务路径
            'schedule': 10.0,
            'args': (10,12),
        },
    'sayhi': {          # 每10秒执行
            'task': 'project.period_task.sayhi',  #任务路径
            'schedule': 10.0,
            'args': ('wd',),
        },
    'sum': {          # 每10秒执行
            'task': 'project.period_task.sum',  #任务路径
            'schedule': 10.0,
            'args': (1,3),
        },
    }
    

    然后重启worker和beat,查看日志:

    图片 19

     

    本文由金沙国际官网发布于编程,转载请注明出处:分布式任务队列Celery入门与进阶,中文字符串编

    关键词: