堆 ( heap )是一种特殊的树结构,能够快速定位 最大值 或 最小值 ,是实现 堆排序 和 优先队列 的关键。优先队列主要应用在 事件处理 和 任务调度 等场景。接下来,我们以 任务调度 为例,抛砖引玉。
Python中的堆
Python 标准库内置了 优先队列 实现,这就是 heapq 模块。我们知道堆是一种完全二叉树,可以保存于数组中;而 list 对象就是一种典型的动态数组结构!因此, heapq 将堆维护于 list 对象中,而不是提供一种新容器对象。相反,heapq 提供了几个关键操作函数,可直接操作 list 对象:
- heapify ,将 list 对象转化成堆(调整元素顺序以满足堆性质);
- heappush ,将新元素压入堆中;
- heappop ,弹出堆顶元素;
- etc
创建一个列表对象并将其作为一个堆来使用:
|
|
往堆中压入新元素,被压入元素对象必须 可比较 ,自定义类需要实现 lt 等比较方法:
|
|
heapq 将 list 对象维护成 最小堆 ,因此 堆顶 (树的 根节点 )即为最小值:
|
|
当然了,我们也可以将最小值从堆中弹出:
|
|
古典多线程调度
假设我们接到一个需求——设计定时任务执行系统。定时任务由 JobItem 类抽象,executing_ts 是任务执行时间:
|
|
初学者可能会想到最简单的多线程方案。系统需要同时处理多个定时任务,每个任务由一个线程来执行不就好了吗?这就是古典多线程模型,实例代码如下:
|
|
- job_worker ,工作线程执行函数,线程先睡眠等待执行时间到达( 6 行),然后调用 process 来执行( 8 行);
- add_job ,添加新定时任务时,启动一个新线程来处理;
这个方案虽然很简洁,但也很鸡肋。一方面,创建、销毁线程的开销很大;另一方面,由于线程需要占用不少资源,而系统能够支持的最大线程数相对有限。假设现在有成千上万的定时任务等待执行,系统能撑得住吗?
调度线程引入
采用多线程方案时,需要合理控制工作线程的 个数 。我们可以将执行时间已到达的任务放进一个 就绪任务队列 ,然后启动若干个工作线程来执行就绪任务。新任务执行时间不定,可能有的是一分钟后执行,有的是一天后才执行。那么,问题就转变成——如何判断任务是否就绪?
这时,我们可以用另一个线程—— 调度线程 来完成这个使命。调度线程不断接收新任务,并在任务到期时将其添加至就绪任务队列。如果我们用另一个队列来保存新任务,那么调度线程便是两个队列间的 任务搬运工 :
-
新任务队列 ,保存新任务,任务创建后即添加到这个队列;
-
就绪任务队列 ,保存执行时间已到达的任务;
-
调度线程 ,订阅 新任务队列 ,当任务时间到达时将其添加至 就绪任务队列 ( 搬运工 );
-
工作线程 ,从 就绪任务队 列取出任务并执行( 消费者 );
借助 queue 模块,实现方案中的队列只需两行代码:
|
|
这样一来,添加新任务时,只需将 JobItem 放入 新任务队列 即可:
|
|
工作线程执行逻辑也很简单,一个永久循环便搞定了:
|
|
开始划重点了—— 调度线程 的实现!
由于就绪任务一定是所有任务中执行时间最小的,因此可以用一个 最小堆 来维护任务集。我们希望任务按执行时间排序,因此需要为 JobItem 编写相关比较方法:
|
|
注意到,我们只实现了 eq 和 lt 魔术方法,gt 等其他比较方法均由 total_ordering 装饰器代劳。
调度线程只需从新任务队列中取任务并压入最小堆,与此同时检查堆顶任务执行时间是否到达。由于线程需要同时处理两件不同的事情,初学者可能要慌了。不打紧,我们先画一个流程图梳理一下执行逻辑:
线程主体逻辑是一个永久循环,每次循环时:
- 先检查堆顶任务,如果执行时间已到,则移到就绪任务队列并进入下次循环;
- 等待新任务队列,如有新任务到达,则压入堆中并进入下次循环;
- 特别注意,等待新任务时不能永久阻塞,需要根据当前堆顶任务计算等待时间;
- 等待超时便进入下次循环再次检查堆顶任务,因此堆中任务不会被耽搁;
理清执行逻辑后,你知道如何实现调度线程了吗?如果想进一步学习,请猛戳 阅读原文 ,获取更多详细信息!
【Python源码剖析】系列文章首发于公众号【小菜学编程】,敬请关注: