這篇文章主要講解了“Python實(shí)現(xiàn)定時(shí)任務(wù)之a(chǎn)pscheduler怎么使用”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Python實(shí)現(xiàn)定時(shí)任務(wù)之a(chǎn)pscheduler怎么使用”吧!
創(chuàng)新互聯(lián)于2013年成立,是專(zhuān)業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目網(wǎng)站建設(shè)、網(wǎng)站制作網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元天元做網(wǎng)站,已為上家服務(wù),為天元各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:18982081108
來(lái)個(gè)簡(jiǎn)單的例子看看apscheduler是如何使用的。
#encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test():
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('時(shí)間:{}, 測(cè)試apscheduler'.format(now))
task = BlockingScheduler()
task.add_job(func=sch_test, trigger='cron', second='*/10')
task.start()
上述例子很簡(jiǎn)單,我們首先要定義一個(gè)apscheduler的對(duì)象,然后add_job添加任務(wù),最后start開(kāi)啟任務(wù)就行了。
例子是每隔10秒運(yùn)行一次sch_test任務(wù),運(yùn)行結(jié)果如下:
時(shí)間:2022-10-08 15:16:30, 測(cè)試apscheduler
時(shí)間:2022-10-08 15:16:40, 測(cè)試apscheduler
時(shí)間:2022-10-08 15:16:50, 測(cè)試apscheduler
時(shí)間:2022-10-08 15:17:00, 測(cè)試apscheduler
如果我們要在執(zhí)行任務(wù)函數(shù)時(shí)攜帶參數(shù),只要在add_job函數(shù)中添加args就行,比如task.add_job(func=sch_test, args=('a'), trigger='cron', second='*/10')。
上面例子中我們初步了解到如何使用apschedulerl了,接下來(lái)需要知道apscheduler的設(shè)計(jì)框架。apscheduler有四個(gè)主要模塊,分別是:觸發(fā)器triggers、任務(wù)存儲(chǔ)器job_stores、執(zhí)行器executors、調(diào)度器schedulers。
1. 觸發(fā)器triggers:
觸發(fā)器指的是任務(wù)指定的觸發(fā)方式,例子中我們用的是“cron”方式。我們可以選擇cron、date、interval中的一個(gè)。
cron表示的是定時(shí)任務(wù),類(lèi)似linux crontab,在指定的時(shí)間觸發(fā)。
可用參數(shù)如下:
除此之外,我們還可用表達(dá)式類(lèi)型去設(shè)置cron。比如常用的有:
使用方法示例,在每天7點(diǎn)20分執(zhí)行一次:
task.add_job(func=sch_test, args=('定時(shí)任務(wù)',), trigger='cron',
hour='7', minute='20')
date表示具體到某個(gè)時(shí)間的一次性任務(wù);
使用方法示例:
# 使用run_date指定運(yùn)行時(shí)間
task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))
# 或者用next_run_time
task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
interval表示的是循環(huán)任務(wù),指定一個(gè)間隔時(shí)間,每過(guò)間隔時(shí)間執(zhí)行一次。
interval可設(shè)置如下的參數(shù):
使用方法示例,每隔3秒執(zhí)行一次sch_test任務(wù):
task.add_job(func=sch_test, args=('循環(huán)任務(wù)',), trigger='interval', seconds=3)。
來(lái)個(gè)例子把3種觸發(fā)器都使用一遍:
# encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test(job_type):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('時(shí)間:{}, {}測(cè)試apscheduler'.format(now, job_type))
task = BlockingScheduler()
task.add_job(func=sch_test, args=('一次性任務(wù)',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
task.add_job(func=sch_test, args=('定時(shí)任務(wù)',), trigger='cron', second='*/5')
task.add_job(func=sch_test, args=('循環(huán)任務(wù)',), trigger='interval', seconds=3)
task.start()
打印部分結(jié)果:
時(shí)間:2022-10-08 15:45:49, 一次性任務(wù)測(cè)試apscheduler
時(shí)間:2022-10-08 15:45:49, 循環(huán)任務(wù)測(cè)試apscheduler
時(shí)間:2022-10-08 15:45:50, 定時(shí)任務(wù)測(cè)試apscheduler
時(shí)間:2022-10-08 15:45:52, 循環(huán)任務(wù)測(cè)試apscheduler
時(shí)間:2022-10-08 15:45:55, 定時(shí)任務(wù)測(cè)試apscheduler
時(shí)間:2022-10-08 15:45:55, 循環(huán)任務(wù)測(cè)試apscheduler
時(shí)間:2022-10-08 15:45:58, 循環(huán)任務(wù)測(cè)試apscheduler
通過(guò)代碼示例和結(jié)果展示,我們可清晰的知道不同觸發(fā)器的使用區(qū)別。
2. 任務(wù)存儲(chǔ)器job_stores
顧名思義,任務(wù)存儲(chǔ)器是存儲(chǔ)任務(wù)的地方,默認(rèn)都是存儲(chǔ)在內(nèi)存中。我們也可自定義存儲(chǔ)方式,比如將任務(wù)存到MySQL中。這里有以下幾種選擇:
通常默認(rèn)存儲(chǔ)在內(nèi)存即可,但若程序故障重啟的話,會(huì)重新拉取任務(wù)運(yùn)行了,如果你對(duì)任務(wù)的執(zhí)行要求高,那么可以選擇其他的存儲(chǔ)器。
使用SQLAlchemyJobStore存儲(chǔ)器示例:
from apscheduler.schedulers.blocking import BlockingScheduler
def sch_test(job_type):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('時(shí)間:{}, {}測(cè)試apscheduler'.format(now, job_type))
sched = BlockingScheduler()
# 使用mysql存儲(chǔ)任務(wù)
sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任務(wù)
sched.add_job(func=sch_test, args=('定時(shí)任務(wù)',), trigger='cron', second='*/5')
sched.start()
3. 執(zhí)行器executors
執(zhí)行器的功能就是將任務(wù)放到線程池或進(jìn)程池中運(yùn)行。有以下幾種選擇:
默認(rèn)是ThreadPoolExecutor, 常用的也就是第線程和進(jìn)程池執(zhí)行器。如果應(yīng)用是CPU密集型操作,可用ProcessPoolExecutor來(lái)執(zhí)行。
4. 調(diào)度器schedulers
調(diào)度器屬于apscheduler的核心,它扮演著統(tǒng)籌整個(gè)apscheduler系統(tǒng)的角色,存儲(chǔ)器、執(zhí)行器、觸發(fā)器在它的調(diào)度下正常運(yùn)行。調(diào)度器有以下幾個(gè):
不是特定場(chǎng)景下,我們最常用的是BlockingScheduler調(diào)度器。
異常監(jiān)聽(tīng)
定時(shí)任務(wù)在運(yùn)行時(shí),若出現(xiàn)錯(cuò)誤,需要設(shè)置監(jiān)聽(tīng)機(jī)制,我們通常結(jié)合logging模塊記錄錯(cuò)誤信息。
使用示例:
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
# logging日志配置打印格式及保存位置
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='sche.log',
filemode='a')
def log_listen(event):
if event.exception :
print ( '任務(wù)出錯(cuò),報(bào)錯(cuò)信息:{}'.format(event.exception))
else:
print ( '任務(wù)正常運(yùn)行...' )
def sch_test(job_type):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('時(shí)間:{}, {}測(cè)試apscheduler'.format(now, job_type))
print(1/0)
sched = BlockingScheduler()
# 使用mysql存儲(chǔ)任務(wù)
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 添加任務(wù)
sched.add_job(func=sch_test, args=('定時(shí)任務(wù)',), trigger='cron', second='*/5')
# 配置任務(wù)執(zhí)行完成及錯(cuò)誤時(shí)的監(jiān)聽(tīng)
sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 配置日志監(jiān)聽(tīng)
sched._logger = logging
sched.start()
apscheduler的封裝使用
上面介紹了apscheduler框架的主要模塊,我們基本能掌握怎樣使用apscheduler了。下面就來(lái)封裝一下apscheduler吧,以后要用直接在這份代碼上修改就行了。
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime
class LoggerUtils():
def init_logger(self, logger_name):
# 日志格式
formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
log_obj = logging.getLogger(logger_name)
log_obj.setLevel(logging.INFO)
# 設(shè)置log存儲(chǔ)位置
path = '/data/logs/'
filename = '{}{}.log'.format(path, logger_name)
if not os.path.exists(path):
os.makedirs(path)
# 設(shè)置日志按照時(shí)間分割
timeHandler = logging.handlers.TimedRotatingFileHandler(
filename,
when='D', # 按照什么維度切割, S:秒,M:分,H:小時(shí),D:天,W:周
interval=1, # 多少天切割一次
backupCount=10 # 保留幾天
)
timeHandler.setLevel(logging.INFO)
timeHandler.setFormatter(formatter)
log_obj.addHandler(timeHandler)
return log_obj
class Scheduler(LoggerUtils):
def __init__(self):
# 執(zhí)行器設(shè)置
executors = {
'default': ThreadPoolExecutor(10), # 設(shè)置一個(gè)名為“default”的ThreadPoolExecutor,其worker值為10
'processpool': ProcessPoolExecutor(5) # 設(shè)置一個(gè)名為“processpool”的ProcessPoolExecutor,其worker值為5
}
self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
# 存儲(chǔ)器設(shè)置
# 這里使用sqlalchemy存儲(chǔ)器,將任務(wù)存儲(chǔ)在mysql
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
self.scheduler.add_jobstore('sqlalchemy',url=sql_url)
def log_listen(event):
if event.exception:
# 日志記錄
self.scheduler._logger.error(event.traceback)
# 配置任務(wù)執(zhí)行完成及錯(cuò)誤時(shí)的監(jiān)聽(tīng)
self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 配置日志監(jiān)聽(tīng)
self.scheduler._logger = self.init_logger('sche_test')
def add_job(self, *args, **kwargs):
"""添加任務(wù)"""
self.scheduler.add_job(*args, **kwargs)
def start(self):
"""開(kāi)啟任務(wù)"""
self.scheduler.start()
# 測(cè)試任務(wù)
def sch_test(job_type):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('時(shí)間:{}, {}測(cè)試apscheduler'.format(now, job_type))
print(1/0)
# 添加任務(wù),開(kāi)啟任務(wù)
sched = Scheduler()
# 添加任務(wù)
sched.add_job(func=sch_test, args=('定時(shí)任務(wù)',), trigger='cron', second='*/5')
# 開(kāi)啟任務(wù)
sched.start()
感謝各位的閱讀,以上就是“Python實(shí)現(xiàn)定時(shí)任務(wù)之a(chǎn)pscheduler怎么使用”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Python實(shí)現(xiàn)定時(shí)任務(wù)之a(chǎn)pscheduler怎么使用這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
新聞名稱(chēng):Python實(shí)現(xiàn)定時(shí)任務(wù)之a(chǎn)pscheduler怎么使用
文章分享:http://www.ekvhdxd.cn/article28/jsjccp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供服務(wù)器托管、品牌網(wǎng)站制作、商城網(wǎng)站、外貿(mào)網(wǎng)站建設(shè)、靜態(tài)網(wǎng)站、網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)