扫一扫
关注公众号
kafka中的层级时间轮是一个存储定时任务的环形队列,底层使用数组实现,数据中每个元素是一个定时任务双向链表TimerTaskList,链表中的值才是真正的定时任务TimerTaskEntry。
如果直接使用数组来存储定时任务,当精度要求是秒级时,数组长度无限大,内存肯定无法支撑的。其实timeWheel就是一个不存在hash冲突的数据结构,类似我们现实生活中的手表,任何时间中可以落入0-59的槽中,假设有秒级、分钟级、小时级、一天级别、一月级别、一年级别……的精度“手表”,那么很长很长的时间就很方便用多个”手表”直观表示了,也不需要占用太多的内存
public class TimeWheel {
/** 时间槽的刻度 */
private long tickMs;
/** 一圈有多少刻度 */
private int wheelSize;
/** 一圈能表示多长时间 */
private long interval;
/** 槽 */
private Bucket[] buckets;
/** 时间轮指针 */
private long currentTimestamp;
/** 上层时间轮 */
private volatile TimeWheel overflowWheel;
public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
this.currentTimestamp = currentTimestamp;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs * wheelSize;
this.buckets = new Bucket[wheelSize];
this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);
for (int i = 0; i < wheelSize; i++) {
buckets[i] = new Bucket();
}
}
}
需要注意的是kafka中的定时器只持有第一层timeWheel的引用。通过DelayQueue少量空间换时间的方法,将每个TimerTaskList按过期时间(TaskEntry的最短过期时间,添加时记录)来排序,最短的放在队头。kafka中有一个expireOperationReaper线程,根据过期时间获取到对应的TimerTaskEntry进行操作或者降级到下一层timeWheel
kafka中是使用timeWheel实现延迟操作,比如延迟生产、延迟拉取、延迟删除等,本文不进行深入讨论。此处用客户端主动发送心跳来说明如何使用timeWheel来管理连接-心跳。假设我们的精度是1s,一圈能表示的时间是60s。当有新连接加入时,就将连接放到现在指针位置的上一位,当60秒后客户端还没有心跳过来则连接断开。如果在60秒,此连接有心跳发送,则将这个连接位置重置为当前指针的上一位