松花皮蛋的黑板报
  • 分享在京东工作的技术感悟,还有JAVA技术和业内最佳实践,大部分都是务实的、能看懂的、可复现的

扫一扫
关注公众号

Kafka中的时间轮算法

博客首页文章列表 松花皮蛋me 2019-03-24 12:16

一、定义

kafka中的层级时间轮是一个存储定时任务的环形队列,底层使用数组实现,数据中每个元素是一个定时任务双向链表TimerTaskList,链表中的值才是真正的定时任务TimerTaskEntry。

二、如何表示无限的时间

如果直接使用数组来存储定时任务,当精度要求是秒级时,数组长度无限大,内存肯定无法支撑的。其实timeWheel就是一个不存在hash冲突的数据结构,类似我们现实生活中的手表,任何时间中可以落入0-59的槽中,假设有秒级、分钟级、小时级、一天级别、一月级别、一年级别……的精度“手表”,那么很长很长的时间就很方便用多个”手表”直观表示了,也不需要占用太多的内存

public class TimeWheel {

/** 时间槽的刻度 */
private long tickMs;

/** 一圈有多少刻度 */
private int wheelSize;

/** 一圈能表示多长时间 */
private long interval;

/** 槽 */
private Bucket[] buckets;

/** 时间轮指针 */
private long currentTimestamp;

/** 上层时间轮 */
private volatile TimeWheel overflowWheel;

public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
    this.currentTimestamp = currentTimestamp;
    this.tickMs = tickMs;
    this.wheelSize = wheelSize;
    this.interval = tickMs * wheelSize;
    this.buckets = new Bucket[wheelSize];
    this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);

    for (int i = 0; i < wheelSize; i++) {
        buckets[i] = new Bucket();
    }
}
}

三、如何推进任务

需要注意的是kafka中的定时器只持有第一层timeWheel的引用。通过DelayQueue少量空间换时间的方法,将每个TimerTaskList按过期时间(TaskEntry的最短过期时间,添加时记录)来排序,最短的放在队头。kafka中有一个expireOperationReaper线程,根据过期时间获取到对应的TimerTaskEntry进行操作或者降级到下一层timeWheel

四、应用

kafka中是使用timeWheel实现延迟操作,比如延迟生产、延迟拉取、延迟删除等,本文不进行深入讨论。此处用客户端主动发送心跳来说明如何使用timeWheel来管理连接-心跳。假设我们的精度是1s,一圈能表示的时间是60s。当有新连接加入时,就将连接放到现在指针位置的上一位,当60秒后客户端还没有心跳过来则连接断开。如果在60秒,此连接有心跳发送,则将这个连接位置重置为当前指针的上一位