Apache Kafka – KIP-32/33 Time Index
Both KIP-32 and KIP-33 are about time.
KIP-32 - Add timestamps to Kafka message
Introduced in version 0.10.0.0.
This KIP adds a timestamp to every Kafka message, which makes things like retention, rolling, and searching messages by time much more convenient.
Two settings can be configured:
- message.timestamp.type - This topic level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime or LogAppendTime.
- max.message.time.difference.ms - This configuration only works when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs no more than max.message.time.difference.ms from the broker local time.
message.timestamp.type chooses whether the timestamp is the user-supplied event time (CreateTime) or Kafka's processing time (LogAppendTime).
If CreateTime is used, the first problem is that the timestamps are not necessarily monotonically increasing.
max.message.time.difference.ms defaults to Long.MaxValue; if it is set, the broker discards messages whose timestamps are anomalous, i.e. too old or too new relative to the broker's clock.
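A minimal sketch of that acceptance rule (illustrative only, not the actual broker code; maxTimestampDiffMs stands in for max.message.time.difference.ms):

```scala
// Minimal sketch of the acceptance rule; maxTimestampDiffMs stands in for
// max.message.time.difference.ms (illustrative, not the actual broker code).
def isTimestampAcceptable(messageTimestamp: Long,
                          brokerTimeMs: Long,
                          maxTimestampDiffMs: Long): Boolean =
  // Reject messages that are too old or too new relative to the broker clock.
  math.abs(messageTimestamp - brokerTimeMs) <= maxTimestampDiffMs
```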
A timestamp field is added to ProducerRecord and ConsumerRecord.
For CreateTime, the user must set the timestamp when constructing the ProducerRecord.
For LogAppendTime, the broker fills in the timestamp automatically when it receives the message.
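For example, with CreateTime the timestamp is supplied through the new ProducerRecord constructor added by this KIP (the topic name, key, and value below are made up):

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// With CreateTime, the user supplies the event time explicitly via the
// (topic, partition, timestamp, key, value) constructor added by this KIP.
val eventTimeMs: java.lang.Long = System.currentTimeMillis()
val record = new ProducerRecord[String, String](
  "clicks",     // hypothetical topic
  null,         // partition: let the default partitioner decide
  eventTimeMs,  // CreateTime timestamp carried in the message
  "user-42",    // key (made up)
  "page=/home"  // value (made up)
)
// With LogAppendTime, any timestamp set here is overwritten by the broker.
```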
What is the difference between CreateTime and LogAppendTime?
The key decision we made in this KIP is whether to use LogAppendTime (Broker Time) or CreateTime (Application Time)
The good things about LogAppendTime are:
- Broker is more robust.
- Monotonically increasing.
- Deterministic behavior for log rolling and retention.
- If CreateTime is required, it can always be put into the message payload.
The good things about CreateTime are:
- More intuitive to users.
- User may want to have log retention based on when the message is created instead of when the message enters the pipeline.
- Immutable after entering the pipeline.
The use cases differ: CreateTime matters mainly for analytics scenarios.
In practice the message payload often already contains an event time, so for a pure message queue LogAppendTime is sufficient, and it makes all kinds of broker-side logic much simpler.
OK, now messages carry timestamps; how do we use them?
Hence the follow-up proposal:
KIP-33 - Add a time based log index
Introduced in version 0.10.1.0.
Motivation:
Kafka has a few timestamp based functions, including
- Searching message by timestamp
- Time based log rolling
- Time based log retention.
Currently these operations depend on the create time / modification time of the log segment file. This has a few issues.
- Searching offset by timestamp has very coarse granularity (log segment level), it also does not work well when replica is reassigned.
- The time based log rolling and retention does not work well when replica is reassigned.
Retention, rolling, and message search all rely on time.
But previously the time used was the creation/modification time of the log segment file, which is problematic:
in particular, after a replica is reassigned, the segment files get fresh timestamps, so they are no longer accurate.
This KIP therefore introduces a time-based log index.
The files under a log directory thus go from two (log file, offset index) to three, with the time-based log index added.
The log index works for both LogAppendTime and CreateTime.
Create another index file for each log segment with name SegmentBaseOffset.timeindex. The density of the index is upper bounded by index.interval.bytes configuration.
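For a segment whose base offset is 0, the log directory would then contain, for example:

```
00000000000000000000.log        # the log segment itself
00000000000000000000.index      # offset index
00000000000000000000.timeindex  # the new time index
```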
Format:
```
Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset    => int32
```
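Each entry is thus 12 bytes: an 8-byte timestamp plus a 4-byte offset (relative to the segment base offset, which is why int32 suffices), i.e. 1.5x the 8-byte offset index entry, matching the sizing note below. A minimal serialization sketch, purely illustrative:

```scala
import java.nio.ByteBuffer

// One time index entry: int64 timestamp + int32 relative offset = 12 bytes,
// 1.5x the 8-byte offset index entry (illustrative sketch, not Kafka's TimeIndex).
def writeTimeIndexEntry(buf: ByteBuffer, timestamp: Long, relativeOffset: Int): Unit = {
  buf.putLong(timestamp)      // Timestamp => int64
  buf.putInt(relativeOffset)  // Offset => int32, relative to the segment base offset
}

val entry = ByteBuffer.allocate(12)
writeTimeIndexEntry(entry, System.currentTimeMillis(), 42)
```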
The process of building the time index:
- When a new log segment is created, the broker will create a time index file for the log segment.
- The default initial / max size of the time index files is the same as the offset index files. (time index entry is 1.5x of the size of offset index entry, user should set the configuration accordingly).
- Each log segment maintains the largest timestamp so far in that segment. The initial value of the largest timestamp is -1 for a newly created segment.
- When a broker receives a message, if the message is not rejected because its timestamp exceeds the threshold, the message will be appended to the log. (The timestamp will either be LogAppendTime or CreateTime depending on the configuration)
- When broker appends the message to the log segment, if an offset index entry is inserted, it will also insert a time index entry if the max timestamp so far is greater than the timestamp in the last time index entry.
- For message format v0, the timestamp is always -1, so no time index entry will be inserted when message is appended.
- When the current active segment is rolled or closed, a time index entry will be inserted into the time index to ensure the last time index entry has the largest timestamp of the log segment.
- If the largest timestamp in the segment is non-negative (at least one message has a timestamp), the entry will be (largest_timestamp_in_the_segment -> base_offset_of_the_next_segment)
- If the largest timestamp in the segment is -1 (no message in the segment has a timestamp), the time index will be empty and the largest timestamp defaults to the segment's last modification time.
The time index is not monotonically increasing across the segments of a partition; it is only monotonically increasing within each individual time index file. I.e. it is possible that the time index file for a later log segment contains smaller timestamps than some timestamps in the time index file of an earlier segment.
When a new log segment is created, a time index file is created and initialized along with it.
When the broker appends a message to the log segment, each time an offset index entry is inserted (index inserts are spaced out by index.interval.bytes), a time index entry is inserted as well, but only if the max timestamp so far is greater than the timestamp of the last entry in the time index.
In addition, when the active segment is rolled or closed, one more time index entry is inserted (because index inserts are spaced out, the last record must be written into the index on close or roll).
Since every insert checks the timestamp against the last entry in the current time index, time is monotonically increasing within a single time index file.
Across multiple time index files, however, there is no such guarantee: a newer time index file may contain timestamps older than those in an earlier one.
If messages use CreateTime, this situation will come up frequently.
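A minimal sketch of that insert rule, with made-up names (not the actual LogSegment/TimeIndex code):

```scala
// Illustrative sketch of the append rule: names are made up, not the
// actual LogSegment/TimeIndex code.
final case class TimeIndexEntry(timestamp: Long, offset: Long)

final class TimeIndexSketch {
  private var entries = Vector.empty[TimeIndexEntry]

  // Called whenever an offset index entry is inserted, and once more on roll/close.
  def maybeAppend(maxTimestampSoFar: Long, offset: Long): Unit = {
    val lastTimestamp = entries.lastOption.map(_.timestamp).getOrElse(-1L)
    // Append only if it keeps this file monotonically increasing.
    if (maxTimestampSoFar > lastTimestamp)
      entries = entries :+ TimeIndexEntry(maxTimestampSoFar, offset)
  }
}
```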
Given the above, rolling, retention, and search all use this time index.
The procedures are as follows.
Enforce time based log retention
To enforce time based log retention, the broker will check from the oldest segment forward to the latest segment. For each segment, the broker checks the last time index entry of a log segment. The timestamp will be the latest timestamp of the messages in the log segment. So if that timestamp expires, the broker will delete the log segment. The broker will stop at the first segment which is not expired. i.e. the broker will not expire a segment even if it is expired, unless all the older segments have expired.
The broker iterates from the oldest segment; if a segment's last time index entry has expired, the segment is deleted.
At the first non-expired segment the scan stops, so any expired segments behind it will not be removed.
As a result, using CreateTime makes log retention somewhat nondeterministic.
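A minimal sketch of the retention scan described above, where Segment is a made-up stand-in carrying the timestamp of a segment's last time index entry:

```scala
// Illustrative sketch of the retention scan; Segment is a made-up stand-in.
final case class Segment(baseOffset: Long, lastTimeIndexTimestamp: Long)

def segmentsToDelete(segments: Seq[Segment], retentionMs: Long, nowMs: Long): Seq[Segment] =
  // Walk from the oldest segment and stop at the first non-expired one,
  // so an expired segment behind it is never deleted.
  segments.takeWhile(seg => nowMs - seg.lastTimeIndexTimestamp > retentionMs)
```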
Enforce time based log rolling
Currently time based log rolling is based on the creating time of the log segment.
With this KIP, the time based rolling would be changed to only based on the message timestamp.
More specifically, if the first message in the log segment has a timestamp, a new log segment will be rolled out if the timestamp of the message about to be appended is greater than the timestamp of the first message in the segment + log.roll.ms.
When message.timestamp.type=CreateTime, user should set max.message.time.difference.ms appropriately together with log.roll.ms to avoid frequent log segment roll out.
During the migration phase, if the first message in a segment does not have a timestamp, the log rolling will still be based on the (current time - create time of the segment).
Segment rolling is thus also driven by message timestamps, so when message.timestamp.type=CreateTime, max.message.time.difference.ms must be set carefully together with log.roll.ms to avoid frequent rolling.
The above is easy to follow; the one caveat is that with message.timestamp.type=CreateTime, basing these decisions on message timestamps introduces a lot of nondeterminism.
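A minimal sketch of the roll check described in the KIP text, with illustrative parameter names:

```scala
// Illustrative sketch of the time-based roll check (parameter names made up).
def shouldRoll(firstTimestampInSegment: Long,
               incomingTimestamp: Long,
               logRollMs: Long): Boolean =
  // Roll when the message about to be appended is more than log.roll.ms
  // ahead of the first message's timestamp in the active segment.
  incomingTimestamp > firstTimestampInSegment + logRollMs
```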
Search message by timestamp
When searching by timestamp, broker will start from the earliest log segment and check the last time index entry. If the timestamp of the last time index entry is greater than the target timestamp, the broker will do binary search on that time index to find the closest index entry and scan the log from there. Otherwise it will move on to the next log segment.
Searching by timestamp will have better accuracy. The guarantees provided are:
- The messages whose timestamps are after the searched timestamp will be consumed.
- Some messages with earlier timestamp might also be consumed.
The OffsetRequest behaves almost the same as before. If timestamp T is set in the OffsetRequest, the first offset in the returned offset sequence is the offset to start with if a user wants to consume from T. The guarantee is that any message whose timestamp is greater than T has a bigger offset, i.e. any message before this offset has a timestamp < T.
The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching.
This feature is great: Kafka can finally replay by time.
Previously replay was only possible at segment granularity.
The procedure is as follows.
The broker iterates from the earliest log segment and checks its last time index entry; if it is smaller than the target time, every message in that segment predates the target, so the segment is skipped.
This continues until a segment whose last time index entry is greater than the target time is found, which means that segment contains the data we need.
Then a binary search is performed on that segment's time index to find the closest entry, and reading starts from the corresponding offset.
This guarantees that all data after the target time will be read, but note that earlier data may be read as well:
first, the lookup only finds the closest time index entry, so it is not exact and we always read a little extra;
second, with CreateTime timestamps, messages are not monotonically increasing in time, so older messages may still appear later in the log.
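On the client side, this search is exposed through KafkaConsumer.offsetsForTimes (added in 0.10.1.0). A sketch of replaying a partition from one hour ago; the topic, partition, and bootstrap address are made up:

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // assumed broker address
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("clicks", 0) // hypothetical topic/partition
consumer.assign(Collections.singletonList(tp))

// Ask the broker for the earliest offset whose timestamp is >= targetTime.
val targetTime = System.currentTimeMillis() - 3600 * 1000L // replay from one hour ago
val offsets = consumer.offsetsForTimes(
  Collections.singletonMap(tp, java.lang.Long.valueOf(targetTime)))

// The entry is null if no message at or after targetTime exists in the partition.
Option(offsets.get(tp)).foreach(ot => consumer.seek(tp, ot.offset()))
// ...consumer.poll(...) now starts from the first message with timestamp >= targetTime.
```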
In the source code,
KafkaApis.handleOffsetRequestV1
calls:
```scala
fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match {
  case Some(timestampOffset) if allowed(timestampOffset) => timestampOffset
  case _ => TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP,
                            ListOffsetResponse.UNKNOWN_OFFSET)
}
```
fetchOffsetForTimestamp is implemented as follows:
```scala
private def fetchOffsetForTimestamp(logManager: LogManager,
                                    topicPartition: TopicPartition,
                                    timestamp: Long): Option[TimestampOffset] = {
  logManager.getLog(topicPartition) match {
    case Some(log) => log.fetchOffsetsByTimestamp(timestamp)
    case None =>
      throw new UnknownTopicOrPartitionException(s"$topicPartition does not exist on the broker.")
  }
}
```
logManager.getLog returns the Log for the partition; a Log is a sequence of LogSegments.
The implementation of Log.fetchOffsetsByTimestamp:
```scala
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
val segmentsCopy = logSegments.toBuffer
// For the earliest and latest, we do not need to return the timestamp.
if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
  // earliest: return the base offset of the first segment
  return Some(TimestampOffset(Record.NO_TIMESTAMP, segmentsCopy.head.baseOffset))
else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
  // latest: return the logEndOffset
  return Some(TimestampOffset(Record.NO_TIMESTAMP, logEndOffset))

val targetSeg = {
  // Get all the segments whose largest timestamp is smaller than target timestamp
  val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
  // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
  if (earlierSegs.length < segmentsCopy.length)
    // the first segment whose largest timestamp is not smaller than targetTimestamp
    Some(segmentsCopy(earlierSegs.length))
  else
    None
}

targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp))
```
It compares the largestTimestamp kept in each segment's time index against targetTimestamp
to find the first segment whose largest timestamp is not smaller than targetTimestamp.
LogSegment.findOffsetByTimestamp
then does a binary search within that segment's time index to find the offset corresponding to the closest timestamp.
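The lookup itself is a standard binary search over (timestamp, offset) pairs sorted by timestamp: find the last entry whose timestamp is at most the target, then scan the log forward from that offset. A simplified sketch (not the actual AbstractIndex code):

```scala
// Simplified lookup over a segment's time index: entries are (timestamp, offset)
// pairs sorted by timestamp; return the offset of the last entry with
// timestamp <= target, from which the log is then scanned forward.
def lookup(entries: IndexedSeq[(Long, Long)], targetTimestamp: Long): Option[Long] = {
  var lo = 0
  var hi = entries.length - 1
  var result: Option[Long] = None
  while (lo <= hi) {
    val mid = (lo + hi) / 2
    if (entries(mid)._1 <= targetTimestamp) {
      result = Some(entries(mid)._2) // candidate; a later entry may still qualify
      lo = mid + 1
    } else {
      hi = mid - 1
    }
  }
  result
}
```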