Skip to content

Latest commit

 

History

History
206 lines (183 loc) · 11 KB

File metadata and controls

206 lines (183 loc) · 11 KB

散列与分层时序轮


上面的单个时序槽内的数据以链表展示,实际上可以采用哈希集,从而可以进一步优化检索、去重的性能。

工业应用实现

Kafka 分层时序轮的代码详解:

/*
 * Source: https://github.com/apache/kafka/blob/b1796ce6d2c04444a62393fbfd7c61811e001d67/server-common/src/main/java/org/apache/kafka/server/util/timer/
 * 分层时序轮在操作在超时之前完成时特别有效。即使所有操作都超时,当定时器中有大量项目时,它仍然具有优势。它的插入成本(包括重新插入)和删除成本分别为 O(m) 和 O(1),m 代表的是时间轮的层数。
 * 大多数实际系统中,时间轮的层数很少超过 3 或 4 层,因此 O(m) 在实践中往往接近于 O(1)。这就是为什么在处理大量定时任务时,特别是当有许多短期任务时,分层时间轮通常比基于优先队列的实现更高效,因为基于优先队列的定时器对于插入和删除都需要 O(logN) 的时间复杂度,其中 N 是队列中的项目数。
 */
public class TimingWheel {
    // 定长数组实现循环队列,来模拟时间轮;
    // 时间轮的每个 bucket(即数组元素)为链表,链表上每个节点对应一个定时任务;
    // 多层时间轮通过单个时间轮的链表来实现。
    private final long tickMs; // tickMs 表示时间轮中每个刻度(或称为"格子")代表的时间长度,单位是毫秒。它是时间轮的最小时间精度。较小的 tickMs 提供更高的精度,但可能增加系统负担(因为需要更频繁地检查时间轮)。
    private final long startMs;
    private final int wheelSize; // wheelSize 定义了时间轮中 bucket 的数量。它决定了时间轮可以直接处理的时间范围。时间轮的总时间范围 = tickMs * wheelSize。较大的 wheelSize 允许时间轮直接处理更长的时间范围,但会占用更多内存。
    private final AtomicInteger taskCounter; // 任务数量,即所有桶(链表)中的节点数量之和
    private final DelayQueue<TimerTaskList> queue; // TimerTaskList 实际上是一个双向链表,用于存储在同一个时间槽(bucket)中的所有定时任务
    private final long interval;
    private final TimerTaskList[] buckets; // 
    private long currentTimeMs;

    private volatile TimingWheel overflowWheel = null; // 对于超出当前时间轮范围的任务,系统会创建一个"溢出轮"(overflow wheel)。这个溢出轮的 tickMs 通常是当前轮的总时间范围。

    TimingWheel(
        long tickMs,
        int wheelSize,
        long startMs,
        AtomicInteger taskCounter,
        DelayQueue<TimerTaskList> queue
    ) {
        this.tickMs = tickMs;
        this.startMs = startMs;
        this.wheelSize = wheelSize;
        this.taskCounter = taskCounter;
        this.queue = queue;
        this.buckets = new TimerTaskList[wheelSize];
        this.interval = tickMs * wheelSize;
        // 向下取整到 tickMs 的倍数
        this.currentTimeMs = startMs - (startMs % tickMs);

        for (int i = 0; i < buckets.length; i++) {
            buckets[i] = new TimerTaskList(taskCounter);
        }
    }

    private synchronized void addOverflowWheel() {
        if (overflowWheel == null) { // Double-Checked Locking 模式
            overflowWheel = new TimingWheel(
                interval,
                wheelSize,
                currentTimeMs,
                taskCounter,
                queue
            );
        }
    }

    public boolean add(TimerTaskEntry timerTaskEntry) {
        long expiration = timerTaskEntry.expirationMs;

        if (timerTaskEntry.cancelled()) {
            return false;
        } else if (expiration < currentTimeMs + tickMs) {
            // 过期时间在第一个桶的范围内,表示已经过期,此时无需加入时间轮
            return false;
        } else if (expiration < currentTimeMs + interval) {
            // 过期时间在当前时间轮能表示的时间范围内,加入到其中一个桶
            // 注意按照这个算法,第一个桶的时间范围是 [c+u,c+u*2),因为 [c,c+u) 范围内被视为已过期
            // 而且第一个桶对应 buckets 的下标并不一定是 0,因为数组只是作为循环队列的存储方式,起始下标无所谓
            long virtualId = expiration / tickMs;
            int bucketId = (int) (virtualId % (long) wheelSize);
            TimerTaskList bucket = buckets[bucketId];
            bucket.add(timerTaskEntry);

            // 设置过期时间,这里也取整了,即可以被 tickMs 整除
            if (bucket.setExpiration(virtualId * tickMs)) { // 仅在新的过期时间和之前的不同才返回 true
                // 由于进行了取整,同一个 bucket 所有节点的过期时间都相同,因此仅在 bucket 的第一个节点加入时才会进入此 if 块
                // 因此保证了每个桶只会被加入一次到 queue 中,queue 存放所有包含定时任务节点的 bucket
                // 借助 DelayQueue 来检测 bucket 是否过期,bucket 时遍历即可取出所有节点
                queue.offer(bucket);
            }
            return true;
        } else {
            // 过期时间在当前时间轮表示的范围之外,即溢出,需要创建高一层时间轮来加入
            if (overflowWheel == null) addOverflowWheel(); // 双重检查上锁的第一层检查
            return overflowWheel.add(timerTaskEntry); // 高一层时间轮也可能无法容纳,因此可能会递归创建更高层级的时间轮
        }
    }

    public void advanceClock(long timeMs) {
        if (timeMs >= currentTimeMs + tickMs) { // timeMs 超过了当前 bucket 的时间范围
            currentTimeMs = timeMs - (timeMs % tickMs); // 修改当前时间,即原先的第一个桶已经失效

            if (overflowWheel != null) overflowWheel.advanceClock(currentTimeMs); // 若存在更高层的时间轮,则也会向前运转
        }
    }
}



public class SystemTimer implements Timer {
    // 全局变量
    private final ExecutorService taskExecutor;
    private final DelayQueue<TimerTaskList> delayQueue;
    private final AtomicInteger taskCounter;
    private final TimingWheel timingWheel;

    // 读写锁,保护时间轮运转(tick)时的相关数据结构
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

    public SystemTimer(String executorName) {
        this(executorName, 1, 20, Time.SYSTEM.hiResClockMs());
    }

    public SystemTimer(
        String executorName,
        long tickMs,
        int wheelSize,
        long startMs
    ) {
        this.taskExecutor = Executors.newFixedThreadPool(1,
            runnable -> KafkaThread.nonDaemon("executor-" + executorName, runnable));
        this.delayQueue = new DelayQueue<>();
        this.taskCounter = new AtomicInteger(0);
        this.timingWheel = new TimingWheel(
            tickMs,
            wheelSize,
            startMs,
            taskCounter,
            delayQueue
        );
    }

    public void add(TimerTask timerTask) {
        readLock.lock();
        try {
            addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs()));
        } finally {
            readLock.unlock();
        }
    }

    private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {
        if (!timingWheel.add(timerTaskEntry)) {
            if (!timerTaskEntry.cancelled()) {
                taskExecutor.submit(timerTaskEntry.timerTask);
            }
        }
    }

    public boolean advanceClock(long timeoutMs) throws InterruptedException {
        // 尝试在 timeoutMs 内取出完成的任务
        TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (bucket != null) { // 取出了过期的 bucket
            writeLock.lock();
            try {
                while (bucket != null) {
                    // 推进当前时间轮,内部可能会递归推进更高一层时间轮,currentTime 被修改
                    timingWheel.advanceClock(bucket.getExpiration());
                    // 取出 bucket 所有任务节点,将其传入 reinsert 方法。TimerTaskList.flush 方法很简单,用内置锁保护,然后依次删除链表(bucket)所有节点,并应用到函数上,最后重置 expiration 以保证下次有任务加入该 bucket 时,该 bucket 会被加入延迟队列。
                    bucket.flush(this::addTimerTaskEntry);
                    // 非阻塞地取出任务,将当前时点所有过期的 bucket 全部取出
                    bucket = delayQueue.poll();
                }
            } finally {
                writeLock.unlock();
            }
            return true;
        } else {
            return false;
        }
    }

    public int size() {
        return taskCounter.get();
    }

    @Override
    public void close() {
        taskExecutor.shutdown();
    }
}

树实现

一个可行的优化方案是按照树结构以及懒更新来进行实现,保证了查询时间复杂度在这种情况下收敛回 O(1):

  • 类似多叉树或者字典树的结构,比如树的根节点为起始,下面有 12 个月的子节点,然后每个月又有大概 30 个日的子节点,如此类推
  • 可以根据具体场景需要,设计哪个级别的槽才真正存储数据以优化空间(比如一般情况下只在秒级槽或毫秒级槽存储任务数据)
  • 而懒更新在只有需要的时候才创建子节点,如此可以优化无数据的查询以及空间使用率,在任务执行完后,如果槽里没有任务了可以将节点删除、回收
  • 另外在数据量极大且希望支持并发执行任务的场景下,可以考虑再对该时序轮树进行 partition,即多颗不同 root 节点的完整树,按任务的 ID 进行均匀哈希然后选择存进其中一颗树
  • 因为可能一个槽内的任务队列会长,因此需要是 schema less 的数据库,且因为 Redis 单个键的 List 数据就可以支持到 2^32 - 1 个元素,且 Redis 读写性能上都更合适刻度较小的任务调度系统