百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

如何让你的Python程序,定时定点地去执行任务?

mhr18 2024-12-05 15:08 30 浏览 0 评论

apscheduler 的使用

??我们项目中总是避免不了要使用一些定时任务,比如说最近的项目,用户点击报名考试以后需要在考试日期临近的时候推送小程序消息提醒到客户微信上,翻了翻 fastapi 中的实现,虽然方法和包也不少,但是要不就是太重了(比如需要再开服务,还要依赖 redis,都不好用),虽然也可以使用 time 模块的 time.sleep()机上 fastapi 的后台任务变相实现,但是相对简单的功能还行,复杂点的代码起来就麻烦了,所以还是专人专事找个负责这个额的包吧。找来找去发现 APScheduler 就挺适合,代码简单,实现效果也很好,这里做个记录!

安装

pip install apscheduler

主要组成部分

概念性东西,混个脸熟,代码比这些定义好理解。

触发器(trigger)包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。说人话就是你指定那种方式触发当前的任务。

干货主要有:

① 200 多本 Python 电子书(和经典的书籍)应该有

② Python标准库资料(最全中文版)

③ 项目源码(四五十个有趣且可靠的练手项目及源码)

④ Python基础入门、爬虫、网络开发、大数据分析方面的视频(适合小白学习)

⑤ Python学习路线图(告别不入流的学习)
私信小编01即可获取大量Python学习资源

类型

解释

DateTrigger

到期执行(到xxxx年x月x日 x时x分x秒执行) 对应DateTrigger

IntervalTrigger

间隔执行(每5秒执行一次)

CronTrigger

一个crontab类型的条件(这个比较复杂,比如周一到周四的4-5点每5秒执行一次)

作业存储(job store)存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。

Jobstore在scheduler中初始化,另外也可通过scheduler的add_jobstore动态添加Jobstore。每个jobstore
都会绑定一个alias,scheduler在Add Job时,根据指定的jobstore在scheduler中找到相应的jobstore,并
将job添加到jobstore中。

Jobstore主要是通过pickle库的loads和dumps【实现核心是通过python的__getstate__和__setstate__重写
实现】,每次变更时将Job动态保存到存储中,使用时再动态的加载出来,作为存储的可以是redis,也可以
是数据库【通过sqlarchemy这个库集成多种数据库】,也可以是mongodb等
目前APScheduler支持的Jobstore:

MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore

执行器(executor)处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

- 说人话就是添加任务时候用它来包装的,executor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor,如果选择tornado作为调度的库,选择TornadoExecutor,如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以Executor的选择需要根据实际的scheduler来选择不同的执行器
目前APScheduler支持的Executor:
AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor

调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业.

Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。
除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。

scheduler可根据自身的需求选择不同的组件,如果是使用AsyncIO则选择AsyncIOScheduler,使用tornado则
选择TornadoScheduler。
目前APScheduler支持的Scheduler:

AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler

简单应用

import time
from apscheduler.schedulers.blocking import BlockingScheduler # 引入后台

def my_job():
    print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5)
sched.start()

完整代码

# trigeers 触发器
# job stores job 存储
# executors 执行器
# schedulers 调度器

from pytz import utc
from sqlalchemy import func

from apscheduler.schedulers.background import BackgroundScheduler,AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    # 可以配置多个存储
    #'mongo': {'type': 'mongodb'}, 
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore指定存储链接
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},     # 最大工作线程数20
    'processpool': ProcessPoolExecutor(max_workers=5)         # 最大工作进程数为5
}
job_defaults = {
    'coalesce': False,   # 关闭新job的合并,当job延误或者异常原因未执行时
    'max_instances': 3   # 并发运行新job默认最大实例多少
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc作为调度程序的时区


import os
import time

def print_time(name):
    print(f'{name} - {time.ctime()}')

def add_job(job_id, func, args, seconds):
    """添加job"""
    print(f"添加间隔执行任务job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)

def add_coun_job(job_id, func, args, start_time):
    """添加job"""
    print(f"添加一次执行任务job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='date',timezone='Asia/Shanghai', run_date=start_time)
    # scheduler.add_job(func=print_time, trigger='date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2'])

def remove_job(job_id):
    """移除job"""
    scheduler.remove_job(job_id)
    print(f"移除job - {job_id}")

def pause_job(job_id):
    """停止job"""
    scheduler.pause_job(job_id)
    print(f"停止job - {job_id}")

def resume_job(job_id):
    """恢复job"""
    scheduler.resume_job(job_id)
    print(f"恢复job - {job_id}")

def get_jobs():
    """获取所有job信息,包括已停止的"""
    res = scheduler.get_jobs()
    print(f"所有job - {res}")

def print_jobs():
    print(f"详细job信息")
    scheduler.print_jobs()

def start():
    """启动调度器"""
    scheduler.start()

def shutdown():
    """关闭调度器"""
    scheduler.shutdown()

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    # start()
    # print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C'))
    # add_job('job_A', func=print_time, args=("A", ), seconds=1)
    # add_job('job_B', func=print_time, args=("B", ), seconds=2)
    # time.sleep(6)
    # pause_job('job_A') # 停止a
    # get_jobs()   #得到所有job
    # time.sleep(6)
    # print_jobs()
    # resume_job('job_A')
    # time.sleep(6)
    # remove_job('job_A')
    # time.sleep(6)
    from datetime import datetime
    import pytz
    start()
    
    date_temp = datetime(2022, 2, 19, 17, 30, 5)
    # scheduler.add_job(print_time, 'date', run_date=datetime.now(pytz.timezone('America/Manaus')), args=['text'])
    # scheduler.add_job(print_time, 'date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2'])
    add_coun_job(job_id="job_C",func=print_time,args=('一次性执行任务',),start_time=datetime(2022, 2, 19, 18, 4, 0).astimezone())
    time.sleep(130)
    try:
        shutdown()
    except RuntimeError:
        pass

相关推荐

【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库

如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...

Pure Storage推出统一数据管理云平台及新闪存阵列

PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...

对Java学习的10条建议(对java课程的建议)

不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...

SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!

官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...

JDK21有没有什么稳定、简单又强势的特性?

佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...

「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了

在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...

Java面试题及答案最全总结(2025版)

大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...

数据库日常运维工作内容(数据库日常运维 工作内容)

#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...

分布式之系统底层原理(上)(底层分布式技术)

作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...

oracle 死锁了怎么办?kill 进程 直接上干货

1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...

SpringBoot 各种分页查询方式详解(全网最全)

一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...

《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略

《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...

LoadRunner(loadrunner录制不到脚本)

一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...

Redis数据类型介绍(redis 数据类型)

介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...

RMAN备份监控及优化总结(rman备份原理)

今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...

取消回复欢迎 发表评论: