开发实战:基于最小堆设计任务调度系统

 ( heap )是一种特殊的树结构,能够快速定位 最大值 或 最小值 ,是实现 堆排序 和 优先队列 的关键。优先队列主要应用在 事件处理 和 任务调度 等场景。接下来,我们以 任务调度 为例,抛砖引玉。

Python中的堆

Python 标准库内置了 优先队列 实现,这就是 heapq 模块。我们知道堆是一种完全二叉树,可以保存于数组中;而 list 对象就是一种典型的动态数组结构!因此, heapq 将堆维护于 list 对象中,而不是提供一种新容器对象。相反,heapq 提供了几个关键操作函数,可直接操作 list 对象:

  • heapify ,将 list 对象转化成堆(调整元素顺序以满足堆性质);
  • heappush ,将新元素压入堆中;
  • heappop ,弹出堆顶元素;
  • etc

创建一个列表对象并将其作为一个堆来使用:

1
heap = []

往堆中压入新元素,被压入元素对象必须 可比较 ,自定义类需要实现 lt 等比较方法:

1
heappush(heap, item)

heapqlist 对象维护成 最小堆 ,因此 堆顶 (树的 根节点 )即为最小值:

1
smallest = top = heap[0]

当然了,我们也可以将最小值从堆中弹出:

1
item = heappop(heap)

古典多线程调度

假设我们接到一个需求——设计定时任务执行系统。定时任务由 JobItem 类抽象,executing_ts 是任务执行时间:

1
2
3
4
5
6
class JobItem:
    
    def __init__(self, executing_ts, job):
        self.executing_ts = executing_ts
        self.job = job
        # ...

初学者可能会想到最简单的多线程方案。系统需要同时处理多个定时任务,每个任务由一个线程来执行不就好了吗?这就是古典多线程模型,实例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import time
from threading import Thread

def job_worker(job_item):
    # 先等待执行时间到达
    time.sleep(job_item.executing_ts - time.time())

    # 实际执行任务
    process(job_item.job)
    
def add_job(job_item):
    # 启动一个线程来执行任务
    Thread(target=job_worker, args=(job_item,)).start()
  • job_worker ,工作线程执行函数,线程先睡眠等待执行时间到达( 6 行),然后调用 process 来执行( 8 行);
  • add_job ,添加新定时任务时,启动一个新线程来处理;

这个方案虽然很简洁,但也很鸡肋。一方面,创建、销毁线程的开销很大;另一方面,由于线程需要占用不少资源,而系统能够支持的最大线程数相对有限。假设现在有成千上万的定时任务等待执行,系统能撑得住吗?

调度线程引入

采用多线程方案时,需要合理控制工作线程的 个数 。我们可以将执行时间已到达的任务放进一个 就绪任务队列 ,然后启动若干个工作线程来执行就绪任务。新任务执行时间不定,可能有的是一分钟后执行,有的是一天后才执行。那么,问题就转变成——如何判断任务是否就绪?

这时,我们可以用另一个线程—— 调度线程 来完成这个使命。调度线程不断接收新任务,并在任务到期时将其添加至就绪任务队列。如果我们用另一个队列来保存新任务,那么调度线程便是两个队列间的 任务搬运工 :

  • 新任务队列 ,保存新任务,任务创建后即添加到这个队列;

  • 就绪任务队列 ,保存执行时间已到达的任务;

  • 调度线程 ,订阅 新任务队列 ,当任务时间到达时将其添加至 就绪任务队列 ( 搬运工 );

  • 工作线程 ,从 就绪任务队 列取出任务并执行( 消费者 );

借助 queue 模块,实现方案中的队列只需两行代码:

1
2
3
4
5
6
7
from queue import Queue

# 新任务队列
new_jobs = Queue()

# 就绪任务队列
ready_jobs = Queue()

这样一来,添加新任务时,只需将 JobItem 放入 新任务队列 即可:

1
2
def add_job(job_item):
    new_jobs.put(job_item)

工作线程执行逻辑也很简单,一个永久循环便搞定了:

1
2
3
4
5
6
7
def job_worker():
    while True:
        # 从就绪队列中取任务
    	job_item = ready_jobs.get()

        # 执行任务
    	process(job_item.job)

开始划重点了—— 调度线程 的实现!

由于就绪任务一定是所有任务中执行时间最小的,因此可以用一个 最小堆 来维护任务集。我们希望任务按执行时间排序,因此需要为 JobItem 编写相关比较方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from functools import total_ordering

@total_ordering
class JobItem:
    
    def __init__(self, executing_ts, job):
        self.executing_ts = executing_ts
        self.job = job
        # ...
        
    def __eq__(self, other):
        return self.executing_ts == other.executing_ts
    
    def __lt__(self, other):
        return self.executing_ts < other.executing_ts

注意到,我们只实现了 eq 和 lt 魔术方法,gt 等其他比较方法均由 total_ordering 装饰器代劳。

调度线程只需从新任务队列中取任务并压入最小堆,与此同时检查堆顶任务执行时间是否到达。由于线程需要同时处理两件不同的事情,初学者可能要慌了。不打紧,我们先画一个流程图梳理一下执行逻辑:

线程主体逻辑是一个永久循环,每次循环时:

  1. 先检查堆顶任务,如果执行时间已到,则移到就绪任务队列并进入下次循环;
  2. 等待新任务队列,如有新任务到达,则压入堆中并进入下次循环;
  3. 特别注意,等待新任务时不能永久阻塞,需要根据当前堆顶任务计算等待时间;
  4. 等待超时便进入下次循环再次检查堆顶任务,因此堆中任务不会被耽搁;

理清执行逻辑后,你知道如何实现调度线程了吗?如果想进一步学习,请猛戳 阅读原文 ,获取更多详细信息!

【Python源码剖析】系列文章首发于公众号【小菜学编程】,敬请关注:

【Python源码剖析】系列文章首发于公众号【小菜学编程】,敬请关注: