- https://www.cnblogs.com/frankcui/p/17160317.html
- http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf
- https://blog.acolyer.org/2015/11/23/hashed-and-hierarchical-timing-wheels/
- https://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels/
- https://www.youtube.com/watch?v=rvZaS3CXicE
- https://www.lpnote.com/2017/11/15/hashed-and-hierarchical-timing-wheels/
上面的单个时序槽内的数据以链表展示,实际上可以采用哈希集,从而可以进一步优化检索、去重的性能。
Kafka 分层时序轮的代码详解:
- https://bewaremypower.github.io/2020/02/04/Kafka%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB11-%E6%97%B6%E9%97%B4%E8%BD%AETimingWheel/
- https://bewaremypower.github.io/2020/02/05/Kafka%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB12-%E9%AB%98%E6%80%A7%E8%83%BD%E5%AE%9A%E6%97%B6%E5%99%A8SystemTimer/
/*
* Source: https://github.com/apache/kafka/blob/b1796ce6d2c04444a62393fbfd7c61811e001d67/server-common/src/main/java/org/apache/kafka/server/util/timer/
* 分层时序轮在操作在超时之前完成时特别有效。即使所有操作都超时,当定时器中有大量项目时,它仍然具有优势。它的插入成本(包括重新插入)和删除成本分别为 O(m) 和 O(1),m 代表的是时间轮的层数。
* 大多数实际系统中,时间轮的层数很少超过 3 或 4 层,因此 O(m) 在实践中往往接近于 O(1)。这就是为什么在处理大量定时任务时,特别是当有许多短期任务时,分层时间轮通常比基于优先队列的实现更高效,因为基于优先队列的定时器对于插入和删除都需要 O(logN) 的时间复杂度,其中 N 是队列中的项目数。
*/
public class TimingWheel {
// 定长数组实现循环队列,来模拟时间轮;
// 时间轮的每个 bucket(即数组元素)为链表,链表上每个节点对应一个定时任务;
// 多层时间轮通过单个时间轮的链表来实现。
private final long tickMs; // tickMs 表示时间轮中每个刻度(或称为"格子")代表的时间长度,单位是毫秒。它是时间轮的最小时间精度。较小的 tickMs 提供更高的精度,但可能增加系统负担(因为需要更频繁地检查时间轮)。
private final long startMs;
private final int wheelSize; // wheelSize 定义了时间轮中 bucket 的数量。它决定了时间轮可以直接处理的时间范围。时间轮的总时间范围 = tickMs * wheelSize。较大的 wheelSize 允许时间轮直接处理更长的时间范围,但会占用更多内存。
private final AtomicInteger taskCounter; // 任务数量,即所有桶(链表)中的节点数量之和
private final DelayQueue<TimerTaskList> queue; // TimerTaskList 实际上是一个双向链表,用于存储在同一个时间槽(bucket)中的所有定时任务
private final long interval;
private final TimerTaskList[] buckets; //
private long currentTimeMs;
private volatile TimingWheel overflowWheel = null; // 对于超出当前时间轮范围的任务,系统会创建一个"溢出轮"(overflow wheel)。这个溢出轮的 tickMs 通常是当前轮的总时间范围。
TimingWheel(
long tickMs,
int wheelSize,
long startMs,
AtomicInteger taskCounter,
DelayQueue<TimerTaskList> queue
) {
this.tickMs = tickMs;
this.startMs = startMs;
this.wheelSize = wheelSize;
this.taskCounter = taskCounter;
this.queue = queue;
this.buckets = new TimerTaskList[wheelSize];
this.interval = tickMs * wheelSize;
// 向下取整到 tickMs 的倍数
this.currentTimeMs = startMs - (startMs % tickMs);
for (int i = 0; i < buckets.length; i++) {
buckets[i] = new TimerTaskList(taskCounter);
}
}
private synchronized void addOverflowWheel() {
if (overflowWheel == null) { // Double-Checked Locking 模式
overflowWheel = new TimingWheel(
interval,
wheelSize,
currentTimeMs,
taskCounter,
queue
);
}
}
public boolean add(TimerTaskEntry timerTaskEntry) {
long expiration = timerTaskEntry.expirationMs;
if (timerTaskEntry.cancelled()) {
return false;
} else if (expiration < currentTimeMs + tickMs) {
// 过期时间在第一个桶的范围内,表示已经过期,此时无需加入时间轮
return false;
} else if (expiration < currentTimeMs + interval) {
// 过期时间在当前时间轮能表示的时间范围内,加入到其中一个桶
// 注意按照这个算法,第一个桶的时间范围是 [c+u,c+u*2),因为 [c,c+u) 范围内被视为已过期
// 而且第一个桶对应 buckets 的下标并不一定是 0,因为数组只是作为循环队列的存储方式,起始下标无所谓
long virtualId = expiration / tickMs;
int bucketId = (int) (virtualId % (long) wheelSize);
TimerTaskList bucket = buckets[bucketId];
bucket.add(timerTaskEntry);
// 设置过期时间,这里也取整了,即可以被 tickMs 整除
if (bucket.setExpiration(virtualId * tickMs)) { // 仅在新的过期时间和之前的不同才返回 true
// 由于进行了取整,同一个 bucket 所有节点的过期时间都相同,因此仅在 bucket 的第一个节点加入时才会进入此 if 块
// 因此保证了每个桶只会被加入一次到 queue 中,queue 存放所有包含定时任务节点的 bucket
// 借助 DelayQueue 来检测 bucket 是否过期,bucket 时遍历即可取出所有节点
queue.offer(bucket);
}
return true;
} else {
// 过期时间在当前时间轮表示的范围之外,即溢出,需要创建高一层时间轮来加入
if (overflowWheel == null) addOverflowWheel(); // 双重检查上锁的第一层检查
return overflowWheel.add(timerTaskEntry); // 高一层时间轮也可能无法容纳,因此可能会递归创建更高层级的时间轮
}
}
public void advanceClock(long timeMs) {
if (timeMs >= currentTimeMs + tickMs) { // timeMs 超过了当前 bucket 的时间范围
currentTimeMs = timeMs - (timeMs % tickMs); // 修改当前时间,即原先的第一个桶已经失效
if (overflowWheel != null) overflowWheel.advanceClock(currentTimeMs); // 若存在更高层的时间轮,则也会向前运转
}
}
}
public class SystemTimer implements Timer {
// 全局变量
private final ExecutorService taskExecutor;
private final DelayQueue<TimerTaskList> delayQueue;
private final AtomicInteger taskCounter;
private final TimingWheel timingWheel;
// 读写锁,保护时间轮运转(tick)时的相关数据结构
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
public SystemTimer(String executorName) {
this(executorName, 1, 20, Time.SYSTEM.hiResClockMs());
}
public SystemTimer(
String executorName,
long tickMs,
int wheelSize,
long startMs
) {
this.taskExecutor = Executors.newFixedThreadPool(1,
runnable -> KafkaThread.nonDaemon("executor-" + executorName, runnable));
this.delayQueue = new DelayQueue<>();
this.taskCounter = new AtomicInteger(0);
this.timingWheel = new TimingWheel(
tickMs,
wheelSize,
startMs,
taskCounter,
delayQueue
);
}
public void add(TimerTask timerTask) {
readLock.lock();
try {
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs()));
} finally {
readLock.unlock();
}
}
private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {
if (!timingWheel.add(timerTaskEntry)) {
if (!timerTaskEntry.cancelled()) {
taskExecutor.submit(timerTaskEntry.timerTask);
}
}
}
public boolean advanceClock(long timeoutMs) throws InterruptedException {
// 尝试在 timeoutMs 内取出完成的任务
TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
if (bucket != null) { // 取出了过期的 bucket
writeLock.lock();
try {
while (bucket != null) {
// 推进当前时间轮,内部可能会递归推进更高一层时间轮,currentTime 被修改
timingWheel.advanceClock(bucket.getExpiration());
// 取出 bucket 所有任务节点,将其传入 reinsert 方法。TimerTaskList.flush 方法很简单,用内置锁保护,然后依次删除链表(bucket)所有节点,并应用到函数上,最后重置 expiration 以保证下次有任务加入该 bucket 时,该 bucket 会被加入延迟队列。
bucket.flush(this::addTimerTaskEntry);
// 非阻塞地取出任务,将当前时点所有过期的 bucket 全部取出
bucket = delayQueue.poll();
}
} finally {
writeLock.unlock();
}
return true;
} else {
return false;
}
}
public int size() {
return taskCounter.get();
}
@Override
public void close() {
taskExecutor.shutdown();
}
}
一个可行的优化方案是按照树结构以及懒更新来进行实现,保证了查询时间复杂度在这种情况下收敛回 O(1):
- 类似多叉树或者字典树的结构,比如树的根节点为起始,下面有 12 个月的子节点,然后每个月又有大概 30 个日的子节点,如此类推
- 可以根据具体场景需要,设计哪个级别的槽才真正存储数据以优化空间(比如一般情况下只在秒级槽或毫秒级槽存储任务数据)
- 而懒更新在只有需要的时候才创建子节点,如此可以优化无数据的查询以及空间使用率,在任务执行完后,如果槽里没有任务了可以将节点删除、回收
- 另外在数据量极大且希望支持并发执行任务的场景下,可以考虑再对该时序轮树进行 partition,即多颗不同 root 节点的完整树,按任务的 ID 进行均匀哈希然后选择存进其中一颗树
- 因为可能一个槽内的任务队列会长,因此需要是 schema less 的数据库,且因为 Redis 单个键的 List 数据就可以支持到 2^32 - 1 个元素,且 Redis 读写性能上都更合适刻度较小的任务调度系统