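Celery is a simple, flexible and reliable distributed system for processing large volumes of messages. It is a task queue focused on real-time processing that also supports task scheduling.
To explain Celery Beat's periodic scheduling mechanism and how it is implemented, we will start by building a simple periodic task on Django and then take apart the Celery Beat source code step by step.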
For the prerequisite background on using Celery, see the following articles:
1. A Simple Celery Periodic Task Example
Add the following task to celery_app/tasks.py:
from celery import shared_task

@shared_task
def pythondict_task():
    print("pythondict_task")
Add the following configuration to the project's celery.py (celery_django/celery.py), where the Celery app instance is defined:
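from datetime import timedelta

from celery_django import settings

# `app` is the Celery instance created earlier in this file
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# Run celery_app.tasks.pythondict_task every 3 seconds.
# CELERYBEAT_SCHEDULE is the old-style setting name; Celery 4+ calls it beat_schedule.
CELERYBEAT_SCHEDULE = {
    'pythondict_task': {
        'task': 'celery_app.tasks.pythondict_task',
        'schedule': timedelta(seconds=3),
    },
}

app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)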
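To make the meaning of the `schedule` field concrete, here is a plain-Python sketch (no Celery involved) that takes a dict shaped like `CELERYBEAT_SCHEDULE` above and lists when each entry would fire within the first 10 seconds. It assumes each entry first fires one full interval after start; the helper `fire_times` is hypothetical and only illustrates the fixed-interval idea, not how Celery itself computes due times.

```python
from datetime import timedelta

SCHEDULE = {
    'pythondict_task': {
        'task': 'celery_app.tasks.pythondict_task',
        'schedule': timedelta(seconds=3),
    },
}


def fire_times(schedule, horizon):
    """Return {entry_name: [offsets in seconds]} for runs within `horizon`.

    Assumption: each entry first fires one full interval after start,
    then once every interval after that.
    """
    result = {}
    for name, entry in schedule.items():
        step = entry['schedule'].total_seconds()
        times = []
        t = step
        while t <= horizon.total_seconds():
            times.append(t)
            t += step
        result[name] = times
    return result


print(fire_times(SCHEDULE, timedelta(seconds=10)))
# {'pythondict_task': [3.0, 6.0, 9.0]}
```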
The configuration is now complete. First, start Celery Beat:
celery beat -A celery_django -S django
Then open a second terminal and start a worker to consume the tasks:
celery -A celery_django worker
The worker's terminal will then print output similar to the following:
[2021-07-11 16:34:11,546: WARNING/PoolWorker-3] pythondict_task
[2021-07-11 16:34:11,550: WARNING/PoolWorker-4] pythondict_task
[2021-07-11 16:34:11,551: WARNING/PoolWorker-2] pythondict_task
[2021-07-11 16:34:11,560: WARNING/PoolWorker-1] pythondict_task
The task output appears as expected, which shows that the task is being run on schedule.
2. Source Code Analysis
To understand how Celery Beat schedules periodic tasks, we need to dig into the Celery source code.
What actually happens when you run the Celery Beat start command?
celery beat -A celery_django -S django
When you run this command, the CeleryCommand class in celery/bin/celery.py receives it, picks the class registered for beat, and runs the following code:
# Python 實用寶典
# https://pythondict.com
from celery.bin.beat import beat

class CeleryCommand(Command):
    commands = {
        # ...
        'beat': beat,
        # ...
    }
    # ...

    def execute(self, command, argv=None):
        try:
            cls = self.commands[command]
        except KeyError:
            cls, argv = self.commands['help'], ['help']
        cls = self.commands.get(command) or self.commands['help']
        try:
            return cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status
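The lookup in `execute` is a plain dispatch table: the subcommand name selects a class, unknown names fall back to `help`, and the chosen class receives the remaining arguments. Below is a stripped-down sketch of that pattern; `BeatCommand`, `HelpCommand`, `COMMANDS` and `execute` are hypothetical stand-ins, not Celery's own classes.

```python
class BeatCommand:
    def run_from_argv(self, prog_name, argv, command=None):
        return f'{prog_name} {command}: starting beat with {argv}'


class HelpCommand:
    def run_from_argv(self, prog_name, argv, command=None):
        return f'{prog_name} {command}: available commands are beat, help'


# subcommand name -> class that handles it
COMMANDS = {'beat': BeatCommand, 'help': HelpCommand}


def execute(prog_name, argv):
    """Dispatch on argv[0], falling back to 'help' for unknown commands."""
    if not argv:
        argv = ['help']
    cls = COMMANDS.get(argv[0])
    if cls is None:
        # unknown subcommand: behave as if the user had typed `help`
        cls, argv = COMMANDS['help'], ['help']
    # the chosen command only sees the arguments after its own name
    return cls().run_from_argv(prog_name, argv[1:], command=argv[0])


print(execute('celery', ['beat', '-A', 'celery_django']))
```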
Here cls is the beat class. Looking at the beat class in bin/beat.py shows that it only overrides the run and add_arguments methods.
So the run_from_argv being called is the one beat inherits from Command:
def run_from_argv(self, prog_name, argv=None, command=None):
    return self.handle_argv(prog_name, sys.argv if argv is None else argv, command)
run_from_argv calls Command's handle_argv, which, after processing the arguments, calls self(*args, **options), that is, the instance's __call__ method:
def handle_argv(self, prog_name, argv, command=None):
    """Parse command-line arguments from ``argv`` and dispatch
    to :meth:`run`.

    :param prog_name: The program name (``argv[0]``).
    :param argv: Command arguments.

    Exits with an error message if :attr:`supports_args` is disabled
    and ``argv`` contains positional arguments.

    """
    options, args = self.prepare_args(
        *self.parse_options(prog_name, argv, command))
    return self(*args, **options)
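handle_argv follows a common CLI pattern: parse argv into an options dict, reject positional arguments when `supports_args` is off, then call the instance with those options as keyword arguments. The sketch below reproduces that flow with the standard library's argparse instead of Celery's own option parser; the `--detach`/`--logfile` options and the `EchoBeat` class are illustrative assumptions.

```python
import argparse


class Command:
    """Hypothetical base class reproducing the parse-then-dispatch flow."""

    supports_args = False

    def build_parser(self, prog_name):
        parser = argparse.ArgumentParser(prog=prog_name)
        # illustrative options only; Celery registers its own option set
        parser.add_argument('--detach', action='store_true')
        parser.add_argument('--logfile', default=None)
        parser.add_argument('args', nargs='*')
        return parser

    def handle_argv(self, prog_name, argv):
        options = vars(self.build_parser(prog_name).parse_args(argv))
        args = options.pop('args')
        if args and not self.supports_args:
            raise SystemExit(f'{prog_name}: unexpected arguments {args}')
        # same hand-off as handle_argv above: self(*args, **options)
        return self(*args, **options)

    def __call__(self, *args, **options):
        return self.run(*args, **options)

    def run(self, *args, **options):
        raise NotImplementedError


class EchoBeat(Command):
    """Returns the options it received, so the parsing result is visible."""

    def run(self, detach=False, logfile=None):
        return {'detach': detach, 'logfile': logfile}


print(EchoBeat().handle_argv('beat', ['--detach', '--logfile', 'beat.log']))
# {'detach': True, 'logfile': 'beat.log'}
```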
The __call__ method of the Command class:
def __call__(self, *args, **kwargs):
    random.seed()  # maybe we were forked.
    self.verify_args(args)
    try:
        ret = self.run(*args, **kwargs)
        return ret if ret is not None else EX_OK
    except self.UsageError as exc:
        self.on_usage_error(exc)
        return exc.status
    except self.Error as exc:
        self.on_error(exc)
        return exc.status
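__call__ is a template method: the base class wraps run() with success and error handling, and a subclass such as beat only has to override run(). A minimal sketch of that split follows; `UsageError`, `Command` and `Beat` here are hypothetical, and the exit codes are chosen to mirror the Unix `EX_OK` (0) and `EX_USAGE` (64) values.

```python
EX_OK = 0      # same value as os.EX_OK on Unix, defined here for portability
EX_USAGE = 64  # same value as os.EX_USAGE on Unix


class UsageError(Exception):
    """Hypothetical error type carrying the exit status to return."""

    status = EX_USAGE


class Command:
    def __call__(self, *args, **kwargs):
        try:
            ret = self.run(*args, **kwargs)
            # run() returning None is treated as success
            return ret if ret is not None else EX_OK
        except UsageError as exc:
            print(f'usage error: {exc}')
            return exc.status

    def run(self, *args, **kwargs):
        raise NotImplementedError


class Beat(Command):
    # only run() is overridden; __call__ and its error handling are inherited
    def run(self, detach=False, **kwargs):
        if kwargs:
            raise UsageError(f'unexpected options: {sorted(kwargs)}')
        print('beat started' + (' (detached)' if detach else ''))


status = Beat()(detach=True)  # prints "beat started (detached)"; status is 0
```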
This method calls run, and since cls is beat, the run invoked here is the one overridden in the beat class:
class beat(Command):
    """Start the beat periodic task scheduler.

    Examples::

        celery beat -l info
        celery beat -s /var/run/celery/beat-schedule --detach
        celery beat -S djcelery.schedulers.DatabaseScheduler

    """
    doc = __doc__
    enable_config_from_cmdline = True
    supports_args = False

    def run(self, detach=False, logfile=None, pidfile=None, uid=None,
            gid=None, umask=None, working_directory=None, **kwargs):
        # drop privileges now if not running in the background
        if not detach:
            maybe_drop_privileges(uid=uid, gid=gid)
        workdir = working_directory
        kwargs.pop('app', None)
        # build a partial of app.Beat with the common arguments pre-filled
        beat = partial(self.app.Beat,
                       logfile=logfile, pidfile=pidfile, **kwargs)

        if detach:
            with detached(logfile, pidfile, uid, gid, umask, workdir):
                return beat().run()  # run in the background
        else:
            return beat().run()  # run in the foreground
This uses a partial function: a partial function creates a new function from a base function with some of its arguments fixed in advance. For details, see Liao Xuefeng's introduction:
https://www.liaoxuefeng.com/wiki/1016959663602400/1017454145929440
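In the run method above, `partial(self.app.Beat, logfile=..., pidfile=..., **kwargs)` only fixes the constructor arguments; nothing is instantiated until `beat()` is called inside one of the two branches. The same mechanism with `functools.partial` and a hypothetical stand-in `Beat` class:

```python
from functools import partial


class Beat:
    """Hypothetical stand-in for the class returned by app.Beat."""

    def __init__(self, max_interval=300, logfile=None, pidfile=None, **kwargs):
        self.max_interval = max_interval
        self.logfile = logfile
        self.pidfile = pidfile
        self.extra = kwargs

    def run(self):
        return f'beat(logfile={self.logfile}, pidfile={self.pidfile})'


# pre-fill the arguments now; the object itself is created later
make_beat = partial(Beat, logfile='beat.log', pidfile='beat.pid')

beat = make_beat()  # same as Beat(logfile='beat.log', pidfile='beat.pid')
print(beat.run())   # beat(logfile=beat.log, pidfile=beat.pid)

# keyword arguments passed at call time override the pre-filled ones
print(make_beat(pidfile='other.pid').pidfile)  # other.pid
```

Building the partial first lets both branches share one set of arguments, and in the detached branch the object is only created inside the `detached(...)` context.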
So a partial of the app's Beat is created here, and calling it and then .run() starts the beat process. Let's first look at Beat itself:
@cached_property
def Beat(self, **kwargs):
    # resolve the celery.apps.beat:Beat class as a subclass bound to this app
    return self.subclass_with_self('celery.apps.beat:Beat')
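Beat is not an ordinary method: `@cached_property` computes the value on first access and caches it, and `subclass_with_self` returns a subclass of `celery.apps.beat:Beat` whose `app` attribute points back to the current app. So `self.app.Beat` evaluates to a class, and calling it (as `beat()` does) creates the instance. A sketch of that pattern using the standard library's `functools.cached_property` (Python 3.8+); `App` and its `subclass_with_self` are simplified hypothetical versions that take a class directly instead of an import path:

```python
from functools import cached_property


class Beat:
    """Hypothetical base class; `app` is filled in by the bound subclass."""

    app = None

    def __init__(self, **kwargs):
        self.options = kwargs


class App:
    def __init__(self, name):
        self.name = name

    def subclass_with_self(self, base):
        # create a subclass whose class attribute `app` is this App instance
        return type(base.__name__, (base,), {'app': self})

    @cached_property
    def Beat(self):
        # evaluated on first access, then stored on the instance
        return self.subclass_with_self(Beat)


app = App('celery_django')
beat = app.Beat(logfile='beat.log')  # the class is built once, then instantiated
print(app.Beat is app.Beat, beat.app.name)  # True celery_django
```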
So self.app.Beat is the Beat class from celery.apps.beat (as a subclass bound to the current app); beat() instantiates it, and then the instance's run method is called:
def run(self):
    print(str(self.colored.cyan(
        'celery beat v{0} is starting.'.format(VERSION_BANNER))))
    # initialize the loader
    self.init_loader()
    # set the process title
    self.set_process_title()
    # start the task scheduler
    self.start_scheduler()
init_loader imports the default modules, which also pulls in the configured periodic tasks; that is not the focus of this article. Let's concentrate on how start_scheduler starts task scheduling:
def start_scheduler(self):
    c = self.colored
    if self.pidfile:  # was a pidfile requested?
        platforms.create_pidlock(self.pidfile)  # create the pidfile lock
    # create the beat Service
    beat = self.Service(app=self.app,
                        max_interval=self.max_interval,
                        scheduler_cls=self.scheduler_cls,
                        schedule_filename=self.schedule)

    # print the startup banner
    print(str(c.blue('__ ', c.magenta('-'),
              c.blue(' ... __ '), c.magenta('-'),
              c.blue(' _\n'),
              c.reset(self.startup_info(beat)))))
    # set up logging
    self.setup_logging()
    if self.socket_timeout:
        logger.debug('Setting default socket timeout to %r',
                     self.socket_timeout)
        # set the default socket timeout
        socket.setdefaulttimeout(self.socket_timeout)
    try:
        # register signal handlers that sync the schedule before exiting
        self.install_sync_handler(beat)
        # start beat
        beat.start()
    except Exception as exc:
        logger.critical('beat raised exception %s: %r',
                        exc.__class__, exc,
                        exc_info=True)
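The first step of start_scheduler, `platforms.create_pidlock(self.pidfile)`, guards against two beat processes running at once, which would otherwise send every periodic task twice. The core idea can be sketched with an atomic exclusive file create. This hypothetical `create_pidlock` is a simplification, not Celery's implementation, which additionally checks for and removes stale pidfiles left by dead processes.

```python
import os
import tempfile


def create_pidlock(path):
    """Write the current PID to `path`, failing if the file already exists.

    Simplified sketch: it does not detect stale pidfiles left behind
    by a crashed process.
    """
    try:
        # O_CREAT | O_EXCL makes creation atomic: only one process can win
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    except FileExistsError:
        raise RuntimeError(f'pidfile {path} exists; is beat already running?') from None
    with os.fdopen(fd, 'w') as fh:
        fh.write(f'{os.getpid()}\n')
    return path


demo_path = os.path.join(tempfile.mkdtemp(), 'celerybeat.pid')
create_pidlock(demo_path)  # succeeds; a second call would raise RuntimeError
```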
Now let's see how beat is started:
def start(self, embedded_process=False, drift=-0.010):
    info('beat: Starting...')
    # log the maximum sleep interval
    debug('beat: Ticking with max interval->%s',
          humanize_seconds(self.scheduler.max_interval))

    # notify receivers connected to the beat_init signal
    signals.beat_init.send(sender=self)
    if embedded_process:
        signals.beat_embedded_init.send(sender=self)
        platforms.set_process_title('celery beat')

    try:
        while not self._is_shutdown.is_set():
            # tick() sends any due tasks and returns the seconds until the next check
            interval = self.scheduler.tick()
            interval = interval + drift if interval else interval
            # only sleep for a positive interval
            if interval and interval > 0:
                debug('beat: Waking up %s.',
                      humanize_seconds(interval, prefix='in '))
                # sleep until the next entry is due
                time.sleep(interval)
                if self.scheduler.should_sync():
                    self.scheduler._do_sync()
    except (KeyboardInterrupt, SystemExit):
        self._is_shutdown.set()
    finally:
        self.sync()
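Stripped of logging and signals, the loop in start boils down to: ask the scheduler how long until something is due, sleep that long (with the drift, -10 ms by default, shortening each sleep slightly), and repeat until the shutdown event is set. A runnable sketch with a hypothetical `FakeScheduler` whose tick() replays canned intervals; the `max_ticks` stop condition exists only so the demo terminates.

```python
import threading
import time


class FakeScheduler:
    """Hypothetical scheduler: tick() returns seconds until the next check."""

    def __init__(self, intervals):
        self.intervals = list(intervals)
        self.ticks = 0

    def tick(self):
        self.ticks += 1
        return self.intervals.pop(0) if self.intervals else 0.0


def run_beat_loop(scheduler, shutdown, drift=-0.010, max_ticks=None):
    """Simplified version of the start() loop; returns the sleeps it took."""
    slept = []
    while not shutdown.is_set():
        interval = scheduler.tick()
        # apply the drift only to a non-zero interval, as start() does
        interval = interval + drift if interval else interval
        if interval and interval > 0:
            slept.append(round(interval, 3))
            time.sleep(interval)
        if max_ticks is not None and scheduler.ticks >= max_ticks:
            shutdown.set()  # demo-only stop; real beat runs until a signal arrives
    return slept


slept = run_beat_loop(FakeScheduler([0.05, 0.02, 0.0]), threading.Event(), max_ticks=3)
print(slept)  # [0.04, 0.01]
```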
The key call here is self.scheduler.tick():
def tick(self):
    """Run a tick, that is one iteration of the scheduler.

    Executes all due tasks.

    """
    remaining_times = []
    try:
        # iterate over every periodic task entry
        for entry in values(self.schedule):
            # seconds until this entry should be checked again
            next_time_to_run = self.maybe_due(entry, self.publisher)
            if next_time_to_run:
                remaining_times.append(next_time_to_run)
    except RuntimeError:
        pass

    return min(remaining_times + [self.max_interval])
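The return value of tick is what drives the sleep in start: the smallest time-until-due across all entries, capped at max_interval so the loop still wakes up regularly when nothing is due soon. Here is that rule in isolation, as a hypothetical helper `tick_interval` that takes the values maybe_due would return (the 300-second default is an arbitrary choice for the sketch):

```python
def tick_interval(next_times, max_interval=300.0):
    """Seconds to sleep, given each entry's time until its next run."""
    # falsy values (0 or None) are skipped, as in tick()
    remaining_times = [t for t in next_times if t]
    return min(remaining_times + [max_interval])


print(tick_interval([3.0, 10.0]))  # 3.0
print(tick_interval([]))           # 300.0
```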
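self.schedule holds all the periodic task entries; with the default file-based scheduler they are persisted with shelve in the celerybeat-schedule file (a database-backed scheduler such as djcelery's DatabaseScheduler reads them from the database instead). tick iterates over every entry and calls self.maybe_due on it: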
def maybe_due(self, entry, publisher=None):
    # ask the entry whether it is due, and how long until its next run
    is_due, next_time_to_run = entry.is_due()

    if is_due:
        # log that the task is being sent
        info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
        try:
            # send the task message to the broker for a worker to execute
            result = self.apply_async(entry, publisher=publisher)
        except Exception as exc:
            error('Message Error: %s\n%s',
                  exc, traceback.format_stack(), exc_info=True)
        else:
            debug('%s sent. id->%s', entry.task, result.id)
    return next_time_to_run
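So maybe_due checks whether the entry has reached its scheduled time; if it has, it calls apply_async to send the task to the broker, where a worker picks it up and runs it. In either case it returns the time until the entry's next run; tick takes the smallest of these values (capped at max_interval), and the Beat process sleeps for that long, which keeps it from wasting resources while it waits.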
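entry.is_due() delegates to the entry's schedule. For a fixed interval such as timedelta(seconds=3), the decision amounts to comparing the last run time plus the interval with the current time. Below is a simplified sketch returning an (is_due, next) pair like the one unpacked in maybe_due; `SchedState` and `is_due` are hypothetical names, and Celery's real schedule classes also deal with time-zone awareness.

```python
from datetime import datetime, timedelta
from typing import NamedTuple


class SchedState(NamedTuple):
    is_due: bool
    next: float  # seconds until the scheduler should check this entry again


def is_due(last_run_at, run_every, now):
    """Fixed-interval due check, modeled loosely on an interval schedule."""
    remaining = (last_run_at + run_every - now).total_seconds()
    if remaining <= 0:
        # due now; once sent, the next run is a full interval away
        return SchedState(True, run_every.total_seconds())
    return SchedState(False, remaining)


start = datetime(2021, 7, 11, 16, 34, 0)
every = timedelta(seconds=3)
print(is_due(start, every, start + timedelta(seconds=1)))  # SchedState(is_due=False, next=2.0)
print(is_due(start, every, start + timedelta(seconds=3)))  # SchedState(is_due=True, next=3.0)
```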
That completes our walkthrough of how Celery Beat detects and dispatches periodic tasks. Any questions? Feel free to leave a comment below and join the discussion.