精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Celery Beat 的周期調(diào)度機(jī)制及實(shí)現(xiàn)原理

科技綠洲 ? 來源:Python實(shí)用寶典 ? 作者:Python實(shí)用寶典 ? 2023-10-31 15:24 ? 次閱讀

Celery 是一個(gè)簡(jiǎn)單、靈活且可靠的,處理大量消息的分布式系統(tǒng),它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,同時(shí)也支持任務(wù)調(diào)度。

為了講解 Celery Beat 的周期調(diào)度機(jī)制及實(shí)現(xiàn)原理,我們會(huì)基于Django從制作一個(gè)簡(jiǎn)單的周期任務(wù)開始,然后一步一步拆解 Celery Beat 的源代碼。

相關(guān)前置應(yīng)用知識(shí),可以閱讀以下文章:

  1. 實(shí)戰(zhàn)教程!Django Celery 異步與定時(shí)任務(wù)
  2. Python Celery異步快速下載股票數(shù)據(jù)

1.Celery 簡(jiǎn)單周期任務(wù)示例

在 celery_app.tasks.py 中添加如下任務(wù):

@shared_task
def pythondict_task():
    print("pythondict_task")

在 django.celery.py 文件中添加如下配置:

from celery_django import settings
from datetime import timedelta


app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)

CELERYBEAT_SCHEDULE = {
    'pythondict_task': {
        'task': 'celery_app.tasks.pythondict_task',
        'schedule': timedelta(seconds=3),
    },
}

app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)

至此,配置完成,此時(shí),先啟動(dòng) Celery Beat 定時(shí)任務(wù)命令:

celery beat -A celery_django -S django

然后打開第二個(gè)終端進(jìn)程啟動(dòng)消費(fèi)者:

celery -A celery_django worker

此時(shí)在worker的終端上就會(huì)輸出類似如下的信息

[2021-07-11 16:34:11,546: WARNING/PoolWorker-3] pythondict_task
[2021-07-11 16:34:11,550: WARNING/PoolWorker-4] pythondict_task
[2021-07-11 16:34:11,551: WARNING/PoolWorker-2] pythondict_task
[2021-07-11 16:34:11,560: WARNING/PoolWorker-1] pythondict_task

看到結(jié)果正常輸出,說明任務(wù)成功定時(shí)執(zhí)行。

2.源碼剖析

為了明白 Celery Beat 是如何實(shí)現(xiàn)周期任務(wù)調(diào)度的,我們需要從 Celery 源碼入手。

當(dāng)你執(zhí)行 Celery Beat 啟動(dòng)命令的時(shí)候,到底發(fā)生了什么?

celery beat -A celery_django -S django

當(dāng)你執(zhí)行這個(gè)命令的時(shí)候,Celery/bin/celery.py 中的 CeleryCommand 類接收到命令后,會(huì)選擇 beat 對(duì)應(yīng)的類執(zhí)行如下代碼:

# Python 實(shí)用寶典
# https://pythondict.com

from celery.bin.beat import beat

class CeleryCommand(Command):
    commands = {
        # ...
        'beat': beat,
        # ...
    }
    # ...
    def execute(self, command, argv=None):
        try:
            cls = self.commands[command]
        except KeyError:
            cls, argv = self.commands['help'], ['help']
        cls = self.commands.get(command) or self.commands['help']
        try:
            return cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

此時(shí)cls對(duì)應(yīng)的是beat類,通過查看位于bin/beat.py中的 beat 類可知,該類只重寫了run方法和add_arguments方法。

所以此時(shí)執(zhí)行的 run_from_argv 方法是 beat 繼承的 Command 的 run_from_argv 方法:

# Python 實(shí)用寶典
# https://pythondict.com

def run_from_argv(self, prog_name, argv=None, command=None):
    return self.handle_argv(prog_name, sys.argv if argv is None else argv, command)

該方法中會(huì)調(diào)用 Command 的 handle_argv 方法,而該方法在經(jīng)過相關(guān)參數(shù)處理后會(huì)調(diào)用 self(*args, **options) 到 call 函數(shù):

# Python 實(shí)用寶典
    # https://pythondict.com
    
    def handle_argv(self, prog_name, argv, command=None):
        """Parse command-line arguments from ``argv`` and dispatch
        to :meth:`run`.

        :param prog_name: The program name (``argv[0]``).
        :param argv: Command arguments.

        Exits with an error message if :attr:`supports_args` is disabled
        and ``argv`` contains positional arguments.

        """
        options, args = self.prepare_args(
            *self.parse_options(prog_name, argv, command))
        return self(*args, **options)

Command 類的 __call__函數(shù):

# Python 實(shí)用寶典
    # https://pythondict.com
    
    def __call__(self, *args, **kwargs):
        random.seed() # maybe we were forked.
        self.verify_args(args)
        try:
            ret = self.run(*args, **kwargs)
            return ret if ret is not None else EX_OK
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

可見,在該函數(shù)中會(huì)調(diào)用到run方法,此時(shí)調(diào)用的run方法就是beat類中重寫的run方法,查看該方法:

# Python 實(shí)用寶典
# https://pythondict.com
    
class beat(Command):
    """Start the beat periodic task scheduler.

    Examples::

        celery beat -l info
        celery beat -s /var/run/celery/beat-schedule --detach
        celery beat -S djcelery.schedulers.DatabaseScheduler

    """
    doc = __doc__
    enable_config_from_cmdline = True
    supports_args = False

    def run(self, detach=False, logfile=None, pidfile=None, uid=None,
            gid=None, umask=None, working_directory=None, **kwargs):
        # 是否開啟后臺(tái)運(yùn)行
        if not detach:
            maybe_drop_privileges(uid=uid, gid=gid)
        workdir = working_directory
        kwargs.pop('app', None)
        # 設(shè)定偏函數(shù)
        beat = partial(self.app.Beat,
                       logfile=logfile, pidfile=pidfile, **kwargs)

        if detach:
            with detached(logfile, pidfile, uid, gid, umask, workdir):
                return beat().run() # 后臺(tái)運(yùn)行
        else:
            return beat().run() # 立即運(yùn)行

這里引用了偏函數(shù)的知識(shí),偏函數(shù)就是從基函數(shù)創(chuàng)建一個(gè)新的帶默認(rèn)參數(shù)的函數(shù),詳細(xì)可見廖雪峰老師的介紹:
https://www.liaoxuefeng.com/wiki/1016959663602400/1017454145929440

可見,此時(shí)創(chuàng)建了app的Beat方法的偏函數(shù),并通過 .run 函數(shù)執(zhí)行啟動(dòng) beat 進(jìn)程,首先看看這個(gè) beat 方法:

# Python 實(shí)用寶典
    # https://pythondict.com
    @cached_property
    def Beat(self, **kwargs):
        # 導(dǎo)入celery.apps.beat:Beat類
        return self.subclass_with_self('celery.apps.beat:Beat')

可以看到此時(shí)就實(shí)例化了 celery.apps.beat 中的 Beat 類,并調(diào)用了該實(shí)例的 run 方法:

# Python 實(shí)用寶典
    # https://pythondict.com
    def run(self):
        print(str(self.colored.cyan(
            'celery beat v{0} is starting.'.format(VERSION_BANNER))))
        # 初始化loader
        self.init_loader()
        # 設(shè)置進(jìn)程
        self.set_process_title()
        # 開啟任務(wù)調(diào)度
        self.start_scheduler()

init_loader 中,會(huì)導(dǎo)入默認(rèn)的modules,此時(shí)會(huì)引入相關(guān)的定時(shí)任務(wù),這些不是本文重點(diǎn)。我們重點(diǎn)看 start_scheduler 是如何開啟任務(wù)調(diào)度的:

# Python 實(shí)用寶典
    # https://pythondict.com
    def start_scheduler(self):
        c = self.colored
        if self.pidfile: # 是否設(shè)定了pid文件
            platforms.create_pidlock(self.pidfile) # 創(chuàng)建pid文件
        # 初始化service
        beat = self.Service(app=self.app,
                            max_interval=self.max_interval,
                            scheduler_cls=self.scheduler_cls,
                            schedule_filename=self.schedule)
        
        # 打印啟動(dòng)信息
        print(str(c.blue('__ ', c.magenta('-'),
                  c.blue(' ... __ '), c.magenta('-'),
                  c.blue(' _n'),
                  c.reset(self.startup_info(beat)))))
        # 開啟日志
        self.setup_logging()
        if self.socket_timeout:
            logger.debug('Setting default socket timeout to %r',
                         self.socket_timeout)
            # 設(shè)置超時(shí)
            socket.setdefaulttimeout(self.socket_timeout)
        try:
            # 注冊(cè)handler
            self.install_sync_handler(beat)
            # 開啟beat
            beat.start()
        except Exception as exc:
            logger.critical('beat raised exception %s: %r',
                            exc.__class__, exc,
                            exc_info=True)

我們看下beat是如何開啟的:

# Python 實(shí)用寶典
    # https://pythondict.com
    def start(self, embedded_process=False, drift=-0.010):
        info('beat: Starting...')
        # 打印最大間隔時(shí)間
        debug('beat: Ticking with max interval- >%s',
              humanize_seconds(self.scheduler.max_interval))
        
        # 通知注冊(cè)該signal的函數(shù)
        signals.beat_init.send(sender=self)
        if embedded_process:
            signals.beat_embedded_init.send(sender=self)
            platforms.set_process_title('celery beat')

        try:
            while not self._is_shutdown.is_set():
                # 調(diào)用scheduler.tick()函數(shù)檢查還剩多余時(shí)間
                interval = self.scheduler.tick()
                interval = interval + drift if interval else interval
                # 如果大于0
                if interval and interval > 0:
                    debug('beat: Waking up %s.',
                          humanize_seconds(interval, prefix='in '))
                    # 休眠
                    time.sleep(interval)
                    if self.scheduler.should_sync():
                        self.scheduler._do_sync()
        except (KeyboardInterrupt, SystemExit):
            self._is_shutdown.set()
        finally:
            self.sync()

這里重點(diǎn)看 self.scheduler.tick() 方法:

# Python 實(shí)用寶典
    # https://pythondict.com
    def tick(self):
        """Run a tick, that is one iteration of the scheduler.

        Executes all due tasks.

        """
        remaining_times = []
        try:
            # 遍歷每個(gè)周期任務(wù)設(shè)定
            for entry in values(self.schedule):
                # 下次運(yùn)行時(shí)間
                next_time_to_run = self.maybe_due(entry, self.publisher)
                if next_time_to_run:
                    remaining_times.append(next_time_to_run)
        except RuntimeError:
            pass

        return min(remaining_times + [self.max_interval])

這里通過 self.schedule 拿到了所有存放在用 shelve 寫入的 celerybeat-schedule 文件的定時(shí)任務(wù),遍歷所有定時(shí)任務(wù),調(diào)用 self.maybe_due 方法:

# Python 實(shí)用寶典
    # https://pythondict.com
    def maybe_due(self, entry, publisher=None):
        # 是否到達(dá)運(yùn)行時(shí)間
        is_due, next_time_to_run = entry.is_due()

        if is_due:
            # 打印任務(wù)發(fā)送日志
            info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
            try:
                # 執(zhí)行任務(wù)
                result = self.apply_async(entry, publisher=publisher)
            except Exception as exc:
                error('Message Error: %sn%s',
                      exc, traceback.format_stack(), exc_info=True)
            else:
                debug('%s sent. id- >%s', entry.task, result.id)
        return next_time_to_run

可以看到,此處會(huì)判斷任務(wù)是否到達(dá)定時(shí)時(shí)間,如果是的話,會(huì)調(diào)用 apply_async 調(diào)用Worker執(zhí)行任務(wù)。如果不是,則返回下次運(yùn)行時(shí)間,讓 Beat 進(jìn)程進(jìn)行 Sleep,減少進(jìn)程資源消耗。

到此,我們就講解完了 Celery Beat 在周期定時(shí)任務(wù)的檢測(cè)調(diào)度機(jī)制,怎么樣,小伙伴們有沒有什么疑惑?可以在下方留言區(qū)留言一起討論哦。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 終端
    +關(guān)注

    關(guān)注

    1

    文章

    1115

    瀏覽量

    29831
  • 源代碼
    +關(guān)注

    關(guān)注

    96

    文章

    2944

    瀏覽量

    66668
  • python
    +關(guān)注

    關(guān)注

    56

    文章

    4782

    瀏覽量

    84453
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    uc/os任務(wù)調(diào)度機(jī)制

    uc/os任務(wù)調(diào)度機(jī)制uc/OS 任務(wù)調(diào)度機(jī)制 內(nèi)核的核心任務(wù)是任務(wù)調(diào)度機(jī)制,為了對(duì)uC/OS進(jìn)行分析,我們從任務(wù)調(diào)度開始。在uC/OS中,一個(gè)任務(wù)通常是一個(gè)無限循環(huán),程序具有如下的結(jié)
    發(fā)表于 07-07 09:46

    UCOS之任務(wù)調(diào)度機(jī)制

    UCOS之任務(wù)調(diào)度機(jī)制
    發(fā)表于 05-30 07:56

    VxWorks系統(tǒng)的任務(wù)調(diào)度機(jī)制

    針對(duì)多任務(wù)系統(tǒng)而言,調(diào)度是指根據(jù)一定的算法.將CPU 分配給符合條件的任務(wù)使用,不同的系統(tǒng)任務(wù)調(diào)度機(jī)制不同。本文介紹VxWorks系統(tǒng)的任務(wù)調(diào)度策略和算法.分析優(yōu)先級(jí)倒置產(chǎn)
    發(fā)表于 12-16 14:11 ?10次下載

    Linux與VxWorks任務(wù)調(diào)度機(jī)制分析

    Linux與VxWorks任務(wù)調(diào)度機(jī)制分析
    發(fā)表于 03-28 09:52 ?19次下載

    嵌入式實(shí)時(shí)操作系統(tǒng)VxWorks內(nèi)核調(diào)度機(jī)制研究

    嵌入式實(shí)時(shí)操作系統(tǒng)VxWorks內(nèi)核調(diào)度機(jī)制研究
    發(fā)表于 03-29 12:26 ?13次下載

    μC/OS-II 任務(wù)調(diào)度機(jī)制的改進(jìn)

    介紹μC/OS-II 任務(wù)調(diào)度機(jī)制,并提出一種改進(jìn)方法,使μC/OS-II變成一個(gè)兼?zhèn)鋵?shí)時(shí)與分時(shí)任務(wù)調(diào)度機(jī)制的操作系統(tǒng); 論述改進(jìn)后系統(tǒng)的特點(diǎn)和要注意的問題,給出部分源代碼。
    發(fā)表于 04-15 11:21 ?14次下載

    高可信賴實(shí)時(shí)操作系統(tǒng)的防危調(diào)度機(jī)制

    為增強(qiáng)實(shí)時(shí)操作系統(tǒng)的防危性,在分析現(xiàn)有調(diào)度機(jī)制的基礎(chǔ)上,探討了最大關(guān)鍵度優(yōu)先的調(diào)度算法,該算法是一種混合型的優(yōu)先級(jí)實(shí)時(shí)調(diào)度算法,由靜態(tài)優(yōu)先級(jí)、動(dòng)態(tài)子優(yōu)先級(jí)和
    發(fā)表于 05-16 11:52 ?10次下載

    基于輪循機(jī)制和RED的語音流調(diào)度機(jī)制

           本文提出了一種新的VoIP業(yè)務(wù)流調(diào)度機(jī)制(RR-RED),通過隨機(jī)早期檢測(cè)(RED)和輪循機(jī)制(Round Robin)控制主動(dòng)丟包。該機(jī)制很好的繼承了R
    發(fā)表于 09-03 08:58 ?7次下載

    Li nux與VxWorks任務(wù)調(diào)度機(jī)制分析

    分析了Linux和VxWorks兩種多任務(wù)操作系統(tǒng)任務(wù)調(diào)度機(jī)制的異同,從任務(wù)控制塊、調(diào)度的時(shí)機(jī)、調(diào)度的優(yōu)先級(jí)和調(diào)度的策略方面進(jìn)行了詳細(xì)的分析和對(duì)比。分析了VxWorks和Linux在P
    發(fā)表于 11-13 17:54 ?10次下載

    VxWorks系統(tǒng)的任務(wù)調(diào)度機(jī)制

    針對(duì)多任務(wù)系統(tǒng)而言,調(diào)度是指根據(jù)一定的算法.將CPU 分配給符合條件的任務(wù)使用,不同的系統(tǒng)任務(wù)調(diào)度機(jī)制不同。本文介紹VxWorks系統(tǒng)的任務(wù)調(diào)度策略和算法.分析優(yōu)先級(jí)倒置產(chǎn)生
    發(fā)表于 11-27 16:26 ?13次下載

    嵌入式實(shí)時(shí)操作系統(tǒng)VxWorks內(nèi)核調(diào)度機(jī)制分析

    本文簡(jiǎn)要介紹了多任務(wù)內(nèi)核,重點(diǎn)分析了嵌入式實(shí)時(shí)操作系統(tǒng)VxWorks的內(nèi)核調(diào)度機(jī)制——優(yōu)先級(jí)搶占調(diào)度和時(shí)間片輪轉(zhuǎn)調(diào)度算法。
    發(fā)表于 12-11 16:15 ?14次下載

    Windows CE陷阱調(diào)度機(jī)制

     一.什么是陷阱調(diào)度機(jī)制?        一般來說,嵌入式操作系統(tǒng)主要由兩部分組成:運(yùn)行在核心態(tài)的內(nèi)核系統(tǒng)和運(yùn)行在用戶態(tài)的環(huán)境子系統(tǒng)組成。因
    發(fā)表于 08-27 14:38 ?627次閱讀

    基于動(dòng)態(tài)概率休眠調(diào)度機(jī)制的WSNs拓?fù)淇刂扑惴╛韓瑞艷

    基于動(dòng)態(tài)概率休眠調(diào)度機(jī)制的WSNs拓?fù)淇刂扑惴╛韓瑞艷
    發(fā)表于 03-19 19:19 ?0次下載

    虛擬計(jì)算資源調(diào)度機(jī)制研究

    針對(duì)基于Xen的vCPU調(diào)度機(jī)制對(duì)虛擬機(jī)網(wǎng)絡(luò)性能的影響進(jìn)行了深入研究和分析。提出一種高效、準(zhǔn)確、輕量級(jí)的網(wǎng)絡(luò)排隊(duì)敏感類型虛擬機(jī)(NSVM)識(shí)別方法,可根據(jù)當(dāng)前虛擬機(jī)I/O傳輸特征將容易受到影響
    發(fā)表于 02-08 17:08 ?0次下載
    虛擬計(jì)算資源<b class='flag-5'>調(diào)度機(jī)制</b>研究

    NB―IoT物理控制信道NB―PDCCH及資源調(diào)度機(jī)制

    NB―IoT物理控制信道NB―PDCCH及資源調(diào)度機(jī)制(現(xiàn)代電源技術(shù)試題及答案)-NB―IoT物理控制信道NB―PDCCH及資源調(diào)度機(jī)制 ? ? ? ? ?
    發(fā)表于 08-31 19:56 ?13次下載
    NB―IoT物理控制信道NB―PDCCH及資源<b class='flag-5'>調(diào)度機(jī)制</b>