{{format('0')}} {{format('189')}} {{format('4394')}}

【PY模块】优化 apscheduler 模块,追踪 Job 状态 [ Python极客 ]

大数据男孩 文章 正文

想做一个技术博客,奈何实力不够
分享

明妃

{{nature("2023-01-10 21:47:51")}}更新

ps:好久没发文章,悄悄翻一个

说明

最近在使用apscheduler过程中,为了控制每个 Job 的状态,发现有点困难,所以就尝试改造一下

目标

为 Job 类添加一个 dict 类型的属性,用来保存用户 自定义的数据,例如 当 job 在执行过程发生错误,就可以把状态改成 error 并停止 job

改造

地方一

apscheduler/job.py

# 增加 state 属性  
__slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref',
                 'args', 'kwargs', 'name', 'misfire_grace_time', 'coalesce', 'max_instances',
                 'next_run_time', '__weakref__', 'state')

mark

地方二

# 在返回值中 增加 state
def __getstate__(self):
    ...
    return {
            'version': 1,
            'id': self.id,
            'func': self.func_ref,
            'trigger': self.trigger,
            'executor': self.executor,
            'args': args,
            'kwargs': self.kwargs,
            'name': self.name,
            'misfire_grace_time': self.misfire_grace_time,
            'coalesce': self.coalesce,
            'max_instances': self.max_instances,
            'next_run_time': self.next_run_time,
            'state': self.state ####
        }

地方三

# 在设置 job 属性时,增加 state
def __setstate__(self, state):
    if state.get('version', 1) > 1:
        raise ValueError('Job has version %s, but only version 1 can be handled' %
                         state['version'])

    self.id = state['id']
    self.func_ref = state['func']
    self.func = ref_to_obj(self.func_ref)
    self.trigger = state['trigger']
    self.executor = state['executor']
    self.args = state['args']
    self.kwargs = state['kwargs']
    self.name = state['name']
    self.misfire_grace_time = state['misfire_grace_time']
    self.coalesce = state['coalesce']
    self.max_instances = state['max_instances']
    self.next_run_time = state['next_run_time']
    self.state = state['state'] ####

地方四

# 在 add_job 函数上,添加上 state 参数
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, state={}, ####
            misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
            next_run_time=undefined, jobstore='default', executor='default',
            replace_existing=False, **trigger_args):

            ...

            # 把 state 参数添加到 job_kwargs 字典
            job_kwargs = {
            'trigger': self._create_trigger(trigger, trigger_args),
            'executor': executor,
            'func': func,
            'args': tuple(args) if args is not None else (),
            'kwargs': dict(kwargs) if kwargs is not None else {},
            'id': id,
            'name': name,
            'misfire_grace_time': misfire_grace_time,
            'coalesce': coalesce,
            'max_instances': max_instances,
            'next_run_time': next_run_time,
            'state': state ####
        }

地方五

# 在参数修复函数里,添加上 state ,并进行检测
def _modify(self, **changes):

    ...

    if 'state' in changes:
    state = changes.pop('state')
    if not isinstance(state, dict):
        raise TypeError('state must be a dict')
    approved['state'] = state

验证

在以上五个地方添加上,就可以实现为 每个 job 开辟一个用户自定义属性的存储

添加任务

可以看到 job 能够正常获取到 state

mark

获取任务

获取任务时 state 也是存在的

mark

当任务发生异常时

按照官网的方案,监控任务状态需要添加监听

# 调度监听
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, \
    EVENT_JOB_ADDED, EVENT_JOB_REMOVED, EVENT_JOB_EXECUTED, EVENT_JOB_MODIFIED

scheduler = BackgroundScheduler(timezone='Asia/Shanghai')
def apscheduler_listener(event):
    if event.code == EVENT_JOB_ERROR:
        # 任务 发送错误时
        job = scheduler.get_job(event.job_id)
        job.state['state'] = 'error'
        job.pause()
        logger.error(f'job id: {event.job_id} traceback: {event.traceback}')

scheduler.add_listener(apscheduler_listener,
                       EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED | EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
scheduler.start()

可以看到当任务发生错误时,就可以改变任务状态,并停止该任务

mark

评论 0
0
{{userInfo.data?.nickname}}
{{userInfo.data?.email}}
TOP 2
Spark 2.0 单机模式与集群模式 安装

{{nature('2020-01-02 16:47:07')}} {{format('12501')}}人已阅读

TOP 3
Office 2016 Pro Plus 激活

{{nature('2019-12-11 20:43:10')}} {{format('9386')}}人已阅读

TOP 4
Linux上 MySQL 开启远程登陆的两种方法

{{nature('2019-12-26 17:20:52')}} {{format('7374')}}人已阅读

TOP 5
Linux 安装 MySQL 5.7

{{nature('2019-12-26 16:03:55')}} {{format('4853')}}人已阅读

目录

标签云

模块

一言

# {{hitokoto.data.from || '来自'}} #
{{hitokoto.data.hitokoto || '内容'}}
作者:{{hitokoto.data.from_who || '作者'}}
自定义UI
配色方案

侧边栏