精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

Crontab:簡單實用的Python 周期任務調度工具

科技綠洲 ? 來源:Python實用寶典 ? 作者:Python實用寶典 ? 2023-11-01 09:40 ? 次閱讀

如果你想周期性地執(zhí)行某個 Python 腳本,最出名的選擇應該是 Crontab 腳本,但是 Crontab 具有以下缺點:

  • 1.不方便執(zhí)行 秒級任務 。
  • 2.當需要執(zhí)行的定時任務有上百個的時候,Crontab 的 管理就會特別不方便 。

還有一個選擇是 Celery,但是 Celery 的配置比較麻煩,如果你只是需要一個輕量級的調度工具,Celery 不會是一個好選擇。

在你想要使用一個輕量級的任務調度工具,而且希望它盡量簡單、容易使用、不需要外部依賴,最好能夠容納 Crontab 的所有基本功能,那么 Schedule 模塊是你的不二之選。

使用它來調度任務可能只需要幾行代碼,感受一下:

# Python 實用寶典
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

上面的代碼表示每10分鐘執(zhí)行一次 job 函數,非常簡單方便。你只需要引入 schedule 模塊,通過調用** scedule.every(時間數).時間類型.do(job) ** 發(fā)布周期任務。

發(fā)布后的周期任務需要用** run_pending函數來檢測是否執(zhí)行,因此需要一個While **循環(huán)不斷地輪詢這個函數。

下面具體講講Schedule模塊的安裝和初級、進階使用方法。

1.準備

開始之前,你要確保Python和pip已經成功安裝在電腦上,如果沒有,可以訪問這篇文章:超詳細Python安裝指南 進行安裝。

(可選1) 如果你用Python的目的是數據分析,可以直接安裝Anaconda:Python數據分析與挖掘好幫手—Anaconda,它內置了Python和pip.

(可選2) 此外,推薦大家用VSCode編輯器,它有許多的優(yōu)點:Python 編程的最好搭檔—VSCode 詳細指南。

請選擇以下任一種方式輸入命令安裝依賴

  1. Windows 環(huán)境 打開 Cmd (開始-運行-CMD)。
  2. MacOS 環(huán)境 打開 Terminal (command+空格輸入Terminal)。
  3. 如果你用的是 VSCode編輯器 或 Pycharm,可以直接使用界面下方的Terminal.
pip install schedule

2.基本使用

最基本的使用在文首已經提到過,下面給大家展示更多的調度任務例子:

# Python 實用寶典
import schedule
import time

def job():
    print("I'm working...")

# 每十分鐘執(zhí)行任務
schedule.every(10).minutes.do(job)
# 每個小時執(zhí)行任務
schedule.every().hour.do(job)
# 每天的10:30執(zhí)行任務
schedule.every().day.at("10:30").do(job)
# 每個月執(zhí)行任務
schedule.every().monday.do(job)
# 每個星期三的13:15分執(zhí)行任務
schedule.every().wednesday.at("13:15").do(job)
# 每分鐘的第17秒執(zhí)行任務
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

可以看到,從月到秒的配置,上面的例子都覆蓋到了。不過如果你想只運行一次任務的話,可以這么配:

# Python 實用寶典
import schedule
import time

def job_that_executes_once():
    # 此處編寫的任務只會執(zhí)行一次...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)

參數傳遞

如果你有參數需要傳遞給作業(yè)去執(zhí)行,你只需要這么做:

# Python 實用寶典
import schedule

def greet(name):
    print('Hello', name)

# do() 將額外的參數傳遞給job函數
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

獲取目前所有的作業(yè)

如果你想獲取目前所有的作業(yè):

# Python 實用寶典
import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()

取消所有作業(yè)

如果某些機制觸發(fā)了,你需要立即清除當前程序的所有作業(yè):

# Python 實用寶典
import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().second.do(greet)

schedule.clear()

標簽功能

在設置作業(yè)的時候,為了后續(xù)方便管理作業(yè),你可以給作業(yè)打個標簽,這樣你可以通過標簽過濾獲取作業(yè)或取消作業(yè)。

# Python 實用寶典
import schedule

def greet(name):
    print('Hello {}'.format(name))

# .tag 打標簽
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_jobs(標簽):可以獲取所有該標簽的任務
friends = schedule.get_jobs('friend')

# 取消所有 daily-tasks 標簽的任務
schedule.clear('daily-tasks')

**
設定作業(yè)截止時間**

如果你需要讓某個作業(yè)到某個時間截止,你可以通過這個方法:

# Python 實用寶典
import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# 每個小時運行作業(yè),18:30后停止
schedule.every(1).hours.until("18:30").do(job)

# 每個小時運行作業(yè),2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# 每個小時運行作業(yè),8個小時后停止
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# 每個小時運行作業(yè),11:32:42后停止
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# 每個小時運行作業(yè),2020-5-17 11:36:20后停止
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)

截止日期之后,該作業(yè)將無法運行。

立即運行所有作業(yè),而不管其安排如何

如果某個機制觸發(fā)了,你需要立即運行所有作業(yè),可以調用 ** schedule.run_all() ** :

# Python 實用寶典
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 立即運行所有作業(yè),每次作業(yè)間隔10schedule.run_all(delay_seconds=10)

3.高級使用

裝飾器安排作業(yè)

如果你覺得設定作業(yè)這種形式太啰嗦了,也可以使用裝飾器模式:

# Python 實用寶典
from schedule import every, repeat, run_pending
import time

# 此裝飾器效果等同于 schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)

并行執(zhí)行

默認情況下,Schedule 按順序執(zhí)行所有作業(yè)。其背后的原因是,很難找到讓每個人都高興的并行執(zhí)行模型。

不過你可以通過多線程的形式來運行每個作業(yè)以解決此限制:

# Python 實用寶典
import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)

日志記錄

Schedule 模塊同時也支持 logging 日志記錄,這么使用:

# Python 實用寶典
import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# 日志級別為DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()

效果如下:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs

異常處理

Schedule 不會自動捕捉異常,它遇到異常會直接拋出,這會導致一個嚴重的問題: 后續(xù)所有的作業(yè)都會被中斷執(zhí)行 ,因此我們需要捕捉到這些異常。

你可以手動捕捉,但是某些你預料不到的情況需要程序進行自動捕獲,加一個裝飾器就能做到了:

# Python 實用寶典
import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

這樣,**bad_task在執(zhí)行時遇到的任何錯誤,都會被catch_exceptions **捕獲,這點在保證調度任務正常運轉的時候非常關鍵。

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規(guī)問題,請聯系本站處理。 舉報投訴
  • 函數
    +關注

    關注

    3

    文章

    4307

    瀏覽量

    62434
  • 代碼
    +關注

    關注

    30

    文章

    4751

    瀏覽量

    68359
  • python
    +關注

    關注

    56

    文章

    4782

    瀏覽量

    84463
  • 腳本
    +關注

    關注

    1

    文章

    387

    瀏覽量

    14833
收藏 人收藏

    評論

    相關推薦

    STM32 簡單任務調度

    );int64u uNextTick;int32u uLenTick;}sTask; 其中fTask為任務指針,指向具體的任務,uNextTick為該任務下一次執(zhí)行的時間,uLenTick為
    發(fā)表于 03-09 15:59

    crontab 任務調度用不了啊

    為什么我的任務調度執(zhí)行不了啊?????
    發(fā)表于 10-17 21:07

    樹莓派 python SimpleCV crontab 拍照

    1.用python寫一個定時拍照的程序使用了SimpleCV庫,安裝方法:1)sudo apt-get install python-opencv python-scipy python
    發(fā)表于 10-07 22:47

    簡單任務調度代碼

    通過定時器節(jié)拍控制任務執(zhí)行周期,此代碼的中斷函數時AVR的簡單任務調度.rar (2.4 KB )
    發(fā)表于 06-12 04:35

    使用SAFECheckpoints驗證任務調度性能

    在基于任務優(yōu)先級的搶占式調度機制中,會選擇就緒的最高優(yōu)先級任務執(zhí)行,因此,需要仔細考慮分配給每個任務的優(yōu)先級,它將直接影響任務何時被執(zhí)行。
    發(fā)表于 12-11 10:01

    任務調度器有何作用

    背景我們在做項目的時候,有時候會遇到對周期比較敏感的任務比如周期發(fā)送報文,由于對時間比較敏感我們需要此任務放在比較高的優(yōu)先級,為方便任務管理
    發(fā)表于 11-23 08:12

    busybox用crontab/crond在嵌入式系統中添加定時任務的方法

    busybox 用crontab、crond在嵌入式系統中添加定時任務:參考文檔:在嵌入式系統中,定時任務通過crond和cronttab兩個系統命令來聯合執(zhí)行。其中crond是定時任務
    發(fā)表于 12-14 06:40

    DVS系統硬實時周期任務動態(tài)調度算法

    與實時任務的可調度分析不同,實時DVS調度在保證任務截止時間限制同時,還要關注任務執(zhí)行的處理器功耗。功耗研究一段時間的累積效果,傳統基于最壞
    發(fā)表于 12-16 23:55 ?12次下載

    VxWorks下周期任務調度任務周期選擇

    本文介紹了VxWorks操作系統下任務調度的策略,分析了實際設計應用中,周期任務調度的需求。介紹了一種在VxWorks操作系統下優(yōu)化
    發(fā)表于 06-25 14:08 ?18次下載

    Linux任務調度crontab時間規(guī)則介紹

    sudo crontab -e5 * * * *每小時第5分鐘執(zhí)行*/5 * * * *每5分鐘執(zhí)行0 2 * * * 每天凌晨2點執(zhí)行cron是一個linux下的定時執(zhí)行工具,可以
    發(fā)表于 04-02 14:33 ?269次閱讀

    Python定時任務的實現方式

    在日常工作中,我們常常會用到需要周期性執(zhí)行的任務,一種方式是采用 Linux 系統自帶的 crond 結合命令行實現。另外一種方式是直接使用Python。接下來整理的是常見的Python
    的頭像 發(fā)表于 10-08 15:20 ?5704次閱讀

    單片機簡單任務調度框架

    背景我們在做項目的時候,有時候會遇到對周期比較敏感的任務比如周期發(fā)送報文,由于對時間比較敏感我們需要此任務放在比較高的優(yōu)先級,為方便任務管理
    發(fā)表于 11-15 12:21 ?19次下載
    單片機<b class='flag-5'>簡單任務</b><b class='flag-5'>調度</b>框架

    簡單實用的Python周期任務調度工具

    如果你想周期性地執(zhí)行某個 Python 腳本,最出名的選擇應該是 Crontab 腳本,
    的頭像 發(fā)表于 02-24 10:46 ?636次閱讀

    Schedule:簡單實用的 Python 周期任務調度工具

    如果你想在Linux服務器上周期性地執(zhí)行某個 Python 腳本,最出名的選擇應該是 Crontab 腳本,但是 Crontab 具有以下缺點: ** 1.不方便執(zhí)行 秒級的
    的頭像 發(fā)表于 10-30 11:18 ?657次閱讀

    Celery Beat 的周期調度機制及實現原理

    Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統,它是一個專注于實時處理的任務隊列,同時也支持任務調度。 為了講解 Celery Beat 的
    的頭像 發(fā)表于 10-31 15:24 ?667次閱讀