濮阳杆衣贸易有限公司

主頁 > 知識庫 > Python使用signal定時結(jié)束AsyncIOScheduler任務的問題

Python使用signal定時結(jié)束AsyncIOScheduler任務的問題

熱門標簽:旅游廁所地圖標注怎么弄 宿州電話機器人哪家好 南昌地圖標注 西青語音電銷機器人哪家好 電梯新時達系統(tǒng)外呼顯示e 地圖標注與注銷 百應電話機器人總部 無錫智能外呼系統(tǒng)好用嗎 成都呼叫中心外呼系統(tǒng)哪家強

在使用aiohttp結(jié)合apscheduler的AsyncIOScheduler模擬定點并發(fā)的時候遇到兩個問題

  1. 在調(diào)度器scheduler.start()后,程序直接退出(在Jupiter中任務可以正常啟動)
  2. 如何在指定時間調(diào)用scheduler.shutdown()? (因為程序直接退出了)

原調(diào)試代碼如下:

from datetime import datetime, timedelta

import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()

async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()
async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)

if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10個任務
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()

Google后發(fā)現(xiàn)AsyncIOScheduler的使用需要在scheduler啟動后,需要自己調(diào)用asyncio.get_event_loop().run_forever()來啟動協(xié)程任務。
但是一旦run_forever()則就會阻塞至死。除非有KeyboardInterrupt, SystemExit等異常或者強殺來停止其運行。
此時想到使用Python的signal來定時發(fā)送信號,修改后程序如下,可以正常延遲停止(感覺有點像模擬Go的defer)。

# -*- coding: utf-8 -*-
"""
@Time : 2021/7/23
@Auth : hanzhichao
@Desc:
"""
from datetime import datetime, timedelta
import signal
import asyncio

import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()

async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()

async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)

if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10個任務
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()
    signal.alarm(20)  # 20秒后終止程序
    asyncio.get_event_loop().run_forever()  # 永遠運行

到此這篇關(guān)于Python使用signal定時結(jié)束AsyncIOScheduler任務的文章就介紹到這了,更多相關(guān)Python定時結(jié)束AsyncIOScheduler任務內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • 將Python腳本打包成MACOSAPP程序過程
  • [項目布局配置]Nosql與PythonWeb-Flask框架組合
  • Python連接Postgres/Mysql/Mongo數(shù)據(jù)庫基本操作大全
  • python生成可執(zhí)行exe控制Microsip自動填寫號碼并撥打功能
  • Python之os模塊案例詳解

標簽:雅安 辛集 濰坊 許昌 贛州 渭南 七臺河 西安

巨人網(wǎng)絡通訊聲明:本文標題《Python使用signal定時結(jié)束AsyncIOScheduler任務的問題》,本文關(guān)鍵詞  Python,使用,signal,定時,結(jié)束,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡,涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《Python使用signal定時結(jié)束AsyncIOScheduler任務的問題》相關(guān)的同類信息!
  • 本頁收集關(guān)于Python使用signal定時結(jié)束AsyncIOScheduler任務的問題的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章
    华阴市| 石渠县| 蒲江县| 屏东市| 涿鹿县| 湛江市| 泸溪县| 杨浦区| 清水县| 吉林省| 枣庄市| 榆中县| 衡阳县| 怀集县| 青河县| 会理县| 油尖旺区| 孝昌县| 出国| 大宁县| 鹰潭市| 拜泉县| 望城县| 师宗县| 金溪县| 邹平县| 隆子县| 临沭县| 桦川县| 北辰区| 海盐县| 白水县| 寻甸| 邢台县| 丹寨县| 枣庄市| 罗定市| 施甸县| 梨树县| 东海县| 灵川县|