定時器概念
什么是定時器呢?它是指從指定的時刻開始,經過一個指定時間,然后觸發(fā)一個事件,用戶可以自定義定時器的周期與頻率。
實現(xiàn)一個簡單的定時程序
方案一
在 Python 中,如何定義一個定時器函數(shù)呢?我們先看第一種方法。假設我們需要執(zhí)行一個函數(shù)userCountFunc,這個函數(shù)需要每隔一個小時被執(zhí)行一次。那么,我們可以這樣寫:
def main():
startCronTask(userCountFunc, minutes=60)
if __name__ == '__main__':
main()
如上面的代碼,我們在定義了一個 main 函數(shù)后,便定義了一個定時函數(shù) startCronTask。第一個參數(shù)為函數(shù)名,第二個參數(shù)為時間,第二個參數(shù)表示多長時間后調用后面第一個參數(shù)的函數(shù)。第一個參數(shù)注意是函數(shù)對象,進行參數(shù)傳遞,用函數(shù)名(如 userCountFunc)表示該對象,不是函數(shù)執(zhí)行語句 userCountFunc(),不然會報錯。那么,在實現(xiàn)這個函數(shù)時,需要引入定時功能,Python 中有一個定時任務模塊 BlockingScheduler:
from apscheduler.schedulers.blocking import BlockingScheduler
def startCronTask(task, **config):
# BlockingScheduler
scheduler = BlockingScheduler()
scheduler.add_job(task, 'interval', **config)
scheduler.start()
定義完一個調度模塊之后,實際的定時調度功能就完成了。接下來,需要具體實現(xiàn)定時執(zhí)行的邏輯函數(shù) userCountFunc:
def userCountFunc():
logger.info('count user')
...
這樣,對于方案一,實現(xiàn)的簡單的定時功能就完成了。
方案二
方案一中介紹的是 Python 自帶的 BlockingScheduler 模塊,Python 中除了可以通過 BlockingScheduler,還通過線程實現(xiàn)定時器 timer,來簡單的看下代碼:
import threading
def timerFunc():
print('Hello World~')
timer = threading.Timer(1, timerFunc)
timer.start()
在上面的代碼中,定時器函數(shù) threading.Timer 主要有2個參數(shù),參數(shù)意義與方案一類似,接下來執(zhí)行這段程序:
Hello World~
Process finished with exit code 0
我們發(fā)現(xiàn)只執(zhí)行一遍,程序就結束了,但顯然不是我們想要的結果。其實,我們看下 Time 類,有這樣的一句解釋性注釋:Call a function after a specified number of seconds,我們發(fā)現(xiàn)上面在執(zhí)行后并未循環(huán)執(zhí)行,所以需要修改下:
import threading
def timerFunc():
print('Hello World~')
global timer
timer = threading.Timer(10.5, timerFunc)
timer.start()
timer = threading.Timer(3, timerFunc)
timer.start()
此時,我們可以看到輸出結果:
Hello World~
Hello World~
Hello World~
...
這里需要注意的是:必須在定時器執(zhí)行函數(shù)內部重復構造定時器,因為定時器構造后只執(zhí)行1次,必須循環(huán)調用。
另外,在上面的代碼中,我們其實還可以看到:threading.Timer(5.5, timerFunc),定時器間隔單位是秒,可以是浮點數(shù),如5.5,0.9等,在執(zhí)行函數(shù) timerFunc 內部和外部中給的值可以不同。如上例中第一次執(zhí)行 timerFunc 是3秒后,后面的都是10.5秒后執(zhí)行。
接下來,我們再看看如何再一定時間結束定時功能。我們可以使用cancel停止定時器的工作,如下例:
import threading
def timerFunc():
print('Hello World~')
global timer
timer = threading.Timer(10.5, timerFunc)
timer.start()
timer = threading.Timer(3, timerFunc)
timer.start()
time.sleep(60)
timer.cancel()
上面的代碼表示:在定時器按照一定時間執(zhí)行后,執(zhí)行過程耗時60秒后停止定時操作功能,退出。顯示結果為:
Hello World~
Hello World~
Hello World~
Hello World~
Hello World~
...
Process finished with exit code 0
到此這篇關于利用Python實現(xiàn)定時程序的方法的文章就介紹到這了,更多相關Python 定時程序內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- python實現(xiàn)簡單的聊天小程序
- 基于Python+Pyqt5開發(fā)一個應用程序
- Python如何實現(xiàn)的簡單購物車程序
- 結合Python網絡爬蟲做一個今日新聞小程序