APScheduler的使用是怎么样的-创新互联

今天就跟大家聊聊有关APScheduler的使用是怎么样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

成都创新互联总部坐落于成都市区,致力网站建设服务有成都做网站、成都网站建设、网络营销策划、网页设计、网站维护、公众号搭建、成都小程序开发、软件开发等为企业提供一整套的信息化建设解决方案。创造真正意义上的网站建设,为互联网品牌在互动行销领域创造价值而不懈努力!

  1.简介

  APScheduler 是一款Python开发的定时任务工具, 跨平台运行, 不依赖Linux系统的crontab服务, 在windows上也可以运行

  官方文档的地址是 https://apscheduler.readthedocs.io/en/latest/index.html

  简单介绍

  APScheduler具有四种组件

  触发器(triggers) 指定定时任务的执行的时机

  存储器(job stores) 可以定时持久化存储, 可以保存在数据库中或redis

  # 存储在redis中

  from apscheduler.jobstores.redis import RedisJobStore

  # 存储在mongo中

  from apscheduler.jobstores.mongodb import MongoDBJobStore

  # 存储在数据库中

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  执行器(executors) 在定时任务执行时, 进程或者线程的方式执行任务

  调度器(schedulers)

  # 以后台的方式运行

  from apscheduler.schedulers.background import BackgroundScheduler

  # 以阻塞的方式运行, 前台运行

  from apscheduler.schedulers.background import BlockingScheduler

  对添加的任务可以做持久保存

  2.安装

  pip install apscheduler

  3. 触发器 Trigger

  date在特定的时间日期执行

  from datetime import date

  from apscheduler.schedulers.blocking import BlockingScheduler

  sched = BlockingScheduler()

  def my_job(text):

  print(text)

  # 在2019年11月6日00:00:00执行

  sched.add_job(my_job, 'date', run_date=date(2019, 11, 6))

  # 在2019年11月6日16:30:05, 可以指定运行的详细时间

  sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5))

  # 运行时间也可以是字符串的形式

  sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text])

  # 立即执行

  sched.add_job(my_job, 'date')

  sched.start()

  interval:以固定的时间间隔运行作业时使用

  weeks (int) – 间隔的周数

  days (int) – 间隔的天数

  hours (int) – 间隔的小时

  minutes (int) –间隔的分钟

  seconds (int) – 间隔的秒

  start_date (datetime|str) – 间隔时间的起点

  end_date (datetime|str) – 间隔时间的结束点

  timezone (datetime.tzinfo|str) – 时区

  jitter (int|None) – 将作业执行 延迟的时间

  from datetime import datetime

  # 每两小时执行一次

  sched.add_job(job_function, 'interval', hours=2)

  # 在2018年10月10日09:30:00 到2019年6月15日11:00:00的时间内,每两小时执行一次

  sched.add_job(job_function, 'interval', hours=2, start_date='2018-10-10 09:30:00', end_date='2019-06-15 11:00:00')

  cron:在一天中的特定时间定期运行作业时使用

  常见的参数

  year (int|str) – 4位数的年份

  month (int|str) – month (1-12)

  day (int|str) – day (1-31)

  week (int|str) – ISO week (1-53)

  day_of_week (int|str) –工作日的编号或名称(0-6或周一,周二,周三,周四,周五,周六,周日)

  hour (int|str) – 小时(0-23)

  minute (int|str) – 分钟 (0-59)

  second (int|str) – 秒 (0-59)

  start_date (datetime|str) –最早触发的日期/时间(包括)

  end_date (datetime|str) – 结束触发的日期/时间(包括)

  timezone (datetime.tzinfo|str) – 时区

  jitter (int|None) – 将执行作业延迟几秒执行

  常见的表达式类型

  # 在6、7、8、11、12月的第三个周五的00:00, 01:00, 02:00和03:00 执行

  sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

  # 在2014年5月30日前的周一到周五的5:30执行

  sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

  # 执行的方式 用装饰器的形式, 每个月的最后一个星期日执行

  @sched.scheduled_job('cron', id='my_job_id', day='last sun')

  def some_decorated_task():

  print("I am printed at 00:00:00 on the last Sunday of every month!")

  # 可以使用标准的crontab表达式执行

  sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))

  # 延迟120秒执行

  sched.add_job(job_function, 'cron', hour='*', jitter=120)

  calendarinterval:在一天的特定时间以日历为基础的间隔运行作业时使用

  参数和 interval 中的参数设置相同

  from datetime import datetime

  from apscheduler.schedulers.blocking import BlockingScheduler

  def job_function():

  print("Hello World")

  sched = BlockingScheduler()

  # 每个月的15:36:00 执行这个任务

  sched.add_job(job_function, 'calendarinterval', months=1, hour=15, minute=36)

  # 从今天开始 每两个月的 15点36分执行, 时间范围是 2019-6-16到 2020-3-26

  sched.add_job(job_function, 'calendarinterval', months=2, start_date='2019-06-16',

  end_date='2020-03-16', hour=15, minute=36)

  sched.start()

  4. 储存器

  REDIS_CONF = {

  "password": "xxxxx",

  "host": "192.168.137.120",

  "port": 6379,

  "db": 0}

  from apscheduler.jobstores.redis import RedisJobStore

  from apscheduler.jobstores.mongodb import MongoDBJobStore

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  # 存储器

  job_stores = {

  # 使用redis存储

  'redis': RedisJobStore(jobs_key=jobs_key, run_times_key=run_times_key, **REDIS_CONF),

  # 使用mongo存储

  'mongo': MongoDBJobStore(),

  # 数据库存储

  'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

  }

  # 执行器

  executors = {

  'default': ThreadPoolExecutor(20), # 20个线程

  'processpool': ProcessPoolExecutor(5) # 5个进程

  }

  job_defaults = {

  'coalesce': False, # 相同任务触发多次

  'max_instances': 3 # 每个任务最多同时触发三次

  }

  # 使用配置, 启动

  scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

  5. 执行器 在定时任务该执行时,以进程或线程方式执行任务

  # 线程的方式执行

  from apscheduler.executors.pool import ThreadPoolExecutor

  executors = {

  'default': ThreadPoolExecutor(20) # 最多20个线程同时执行

  }

  scheduler = BackgroundScheduler(executors=executors)

  # 进程的方式

  executors = {

  'default': ProcessPoolExecutor(5) # 最多5个进程同时执行

  }

  6.调度器

  BlockingScheduler: 作为独立进程时使用

  from apscheduler.schedulers.blocking import BlockingScheduler

  scheduler = BlockingScheduler()

  scheduler.start()

  # 此处程序会发生阻塞复制代码

  BackgroundScheduler 后台运行, 在框架中使用

  from apscheduler.schedulers.background import BackgroundScheduler

  scheduler = BackgroundScheduler()

  scheduler.start()

  # 此处程序不会发生阻塞复制代码

  AsyncIOScheduler : 当你的程序使用了asyncio的时候使用。

  GeventScheduler : 当你的程序使用了gevent的时候使用。

  TornadoScheduler : 当你的程序基于Tornado的时候使用。

  TwistedScheduler : 当你的程序使用了Twisted的时候使用

  QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。

  7. 配置的三中方法

  方法1

  from pytz import utc

  from apscheduler.schedulers.background import BackgroundScheduler

  from apscheduler.jobstores.mongodb import MongoDBJobStore

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

  jobstores = {

  'mongo': MongoDBJobStore(),

  'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

  }

  executors = {

  'default': ThreadPoolExecutor(20), # 大线程数

  'processpool': ProcessPoolExecutor(5) # 大进程数

  }

  job_defaults = {

  'coalesce': False,

  'max_instances': 3 # 同一个任务启动实例的大个数

  }

  # 配置的使用方式

  scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)郑州妇科检查哪家好 http://www.zzkdfk.com/

  方法2

  from apscheduler.schedulers.background import BackgroundScheduler

  # 使用字典的形式添加配置

  scheduler = BackgroundScheduler({

  'apscheduler.jobstores.mongo': {

  'type': 'mongodb'

  },

  'apscheduler.jobstores.default': {

  'type': 'sqlalchemy',

  'url': 'sqlite:///jobs.sqlite'

  },

  'apscheduler.executors.default': {

  'class': 'apscheduler.executors.pool:ThreadPoolExecutor',

  'max_workers': '20'

  },

  'apscheduler.executors.processpool': {

  'type': 'processpool',

  'max_workers': '5'

  },

  'apscheduler.job_defaults.coalesce': 'false',

  'apscheduler.job_defaults.max_instances': '3',

  'apscheduler.timezone': 'UTC',

  })

  方法3

  from pytz import utc

  from apscheduler.schedulers.background import BackgroundScheduler

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  from apscheduler.executors.pool import ProcessPoolExecutor

  jobstores = {

  'mongo': {'type': 'mongodb'},

  'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

  }

  executors = {

  'default': {'type': 'threadpool', 'max_workers': 20},

  'processpool': ProcessPoolExecutor(max_workers=5)

  }

  job_defaults = {

  'coalesce': False,

  'max_instances': 3

  }

  scheduler = BackgroundScheduler()

  # 使用调度器对象的 configure属性增加 存储器, 执行器 存储器 的配置

  scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

  8. 定时任务启动

  scheduler.start()

  对于BlockingScheduler ,程序会阻塞在这,防止退出,作为独立进程时使用。

  对于BackgroundScheduler,可以在应用程序中使用。不再以单独的进程使用。

  9. 任务管理

  方式1

  job = scheduler.add_job(myfunc, 'interval', minutes=2) # 添加任务

  job.remove() # 删除任务

  job.pause() # 暂定任务

  job.resume() # 恢复任务

  job.shutdown() # 关闭调度

  job.shutdown(wait=False) # 不等待正在运行的任务

  方式2

  scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 添加任务

  scheduler.remove_job('my_job_id') # 删除任务

  scheduler.pause_job('my_job_id') # 暂定任务

  scheduler.resume_job('my_job_id') # 恢复任务

  修改调度, 修改调度的配置属性

  job.modify(max_instances=6, name='Alternate name')

  # 更改触发器

  scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

  获取作业列表 get_jobs() 方法, 返回的是Job实例列表

  10.日志的使用

  项目中没有使用日志记录,

  import logging

  logging.basicConfig()

  logging.getLogger('apscheduler').setLevel(logging.DEBUG)

  集成到项目中的日志中

  logger = logging.getLogger("django")

  ......

  scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)

  scheduler._logger = logger

  11.完整的例子

  REDIS_CONF = {

  "password": "xxxxx",

  "host": "192.168.137.120",

  "port": 6379,

  "db": 0}

  logger = logging.getLogger("django")

  jobs_key = 'collection_api_apscheduler.jobs'

  run_times_key = 'collection_api_apscheduler.run_times'

  job_stores = {

  'default': RedisJobStore(jobs_key=jobs_key, run_times_key=run_times_key, **REDIS_CONF)

  }

  executors = {

  'default': {'type': 'threadpool', 'max_workers': 60}

  }

  job_defaults = {

  'coalesce': True, # 相同任务同时触发多次时,只运行一次

  'max_instances': 3,

  'misfire_grace_time': 30, # 过期30秒依然执行该任务

  }

  scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)

  scheduler._logger = logger

  # 如果持久化的调度器中作业列表, 调度器继续执行

  if scheduler.get_jobs():

  scheduler.resume()

  # 添加定时任务

  scheduler.add_job(handle_news_task, 'date', id='handle_news_task', replace_existing=True)

  scheduler.add_job(......)

  scheduler.start()

看完上述内容,你们对APScheduler的使用是怎么样的有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联-成都网站建设公司行业资讯频道,感谢大家的支持。


名称栏目:APScheduler的使用是怎么样的-创新互联
分享链接:http://pcwzsj.com/article/ipecc.html