Kafka 爲啥那快?kafka 零複製 Zero-copy 如何實現?

Kafka 採用了兩種零拷貝技術

Kafka 之所以能夠快速地處理大量數據,其中一個重要原因就是它採用了零拷貝(Zero-copy)技術。

Kafka 採用了兩種零拷貝技術來提高性能:mmap 和 sendfile。

Kafka 採用了兩種零拷貝技術, 主要有兩個大的場景:

當然,零拷貝並不是 Kafka 的專利,而是操作系統的能力 ,又比如 Netty,Rocketmq 都用到了零拷貝,這個後面尼恩會詳細給大家做展開的介紹。

不過,零拷貝技術可以減少不必要的數據拷貝次數,從而提高數據傳輸效率,所以,也是面試的絕對重點, 各位看官,尤其是要轉架構的, 一定要好好掌握。

回顧一下 mmap 零複製

mmap 可以把文件映射到進程的虛擬內存空間,實現對文件的讀取和修改而不需要用傳統的 read 和 write 系統調用。這樣可以減少一次數據拷貝,並且不同的虛擬內存地址可以指向同一個物理內存,實現數據共享

mmap (memory-map)

mmap (memory-map) 可以把文件映射到進程的虛擬內存空間。通過對這段內存的讀取和修改,可以實現對文件的讀取和修改,而不需要用 read 和 write 系統調用,但是這一切都需要操作系統在幕後工作(異步處理)。如下圖所示,爲 mmap 實現原理的示意圖。

可以看到,通過 mmap ,用戶進程空間中某一塊虛擬內存與內核中的物理內存(PageCache)形成映射,而這塊物理內存與目標文件的某一塊形成映射。

用戶進程讀取文件的過程不是傳統的 read 系統調用,而是直接訪問的 PageCache,如果沒有數據,系統會把文件的內容讀取過來緩存起來,應該說就是利用的內核中的緩存區。

Java 中的 mmap 底層 是通過 JNI 調用 C , C 語言中的 mmap 函數爲:

void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);

addr:指定映射的起始地址,通常設爲NULL,由內核來分配
length:代表將文件中映射到內存的部分的長度。
prot:映射區域的保護方式。可以爲以下幾種方式的組合:
    PROT_EXEC 映射區域可被執行
    PROT_READ 映射區域可被讀取
    PROT_WRITE 映射區域可被寫入
    PROT_NONE 映射區域不能存取
flags:映射區的特性標誌位,常用的兩個選項是:
    MAP_SHARD:寫入映射區的數據會複製迴文件,且運行其他映射文件的進程共享
    MAP_PRIVATE:對映射區的寫入操作會產生一個映射區的複製,對此區域的修改不會寫會原文件
fd:要映射到內存中的文件描述符,有open函數打開文件時返回的值。
offset:文件映射的偏移量,通常設置爲0,代表從文件最前方開始對應,offset必須是分頁大小的整數倍。
函數返回值:實際分配的內存的起始地址。

與 mmap 函數成對使用的,是 munmap 函數,它是用來解除映射的函數,

Java 中的 munmap 底層 是通過 JNI 調用 C , C 語言中的 munmap 函數爲:

int munmap(void *start, size_t length)
    
start:映射的起始地址
length:文件中映射到內存的部分的長度
返回值:解除成功返回0,失敗返回-1。

mmap 零拷貝技術通過內存映射文件的方式,減少了數據在用戶空間和內核空間之間的拷貝次數,從而提高了數據傳輸效率。

以下是 mmap 零拷貝的一般流程:

  1. 打開文件:首先,使用open系統調用打開需要進行內存映射的文件,並獲取文件描述符(file descriptor)。

  2. 創建內存映射:通過mmap系統調用將文件的部分或全部內容映射到進程的地址空間。mmap的典型調用如下:

    cvoid *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
  3. 訪問映射的內存:一旦內存映射建立,進程就可以直接訪問映射的內存區域,就像訪問普通內存一樣。對這部分內存的讀寫操作將直接影響到文件的內容。

  4. 同步數據到磁盤:如果需要確保對映射區域的修改被寫回到文件中,可以使用msync系統調用。這步操作是可選的,取決於應用是否需要立即將數據同步到磁盤。

  5. 解除映射:當不再需要映射時,使用munmap系統調用來解除映射,釋放資源:

    cint munmap(void *start, size_t length);
    • start:映射區域的起始地址。

    • length:映射的長度。

  6. 關閉文件:最後,使用close系統調用關閉文件描述符。

mmap 零拷貝的優勢在於:

尼恩特別提示: mmap 技術在進行文件映射的時候,一般有大小限制,在 1.5GB~2GB 之間

所以,在很多消息中間件,會限制文件的大小。

mmap 技術可以將磁盤文件映射到內存中,用戶通過修改內存就能修改磁盤文件。這樣可以避免將數據從內核態拷貝到用戶態再拷貝回內核態的過程,從而提高數據傳輸效率。

如何通過 mmap 查詢索引找到具體的消息數據

以 kafka_2.13-2.8.0 爲例,分析 Kafka 消息在磁盤上的存儲結構、配置以及如何通過索引找到具體的消息數據。

既然是日誌索引相關的問題,正好以此來分析存儲模塊下的索引文件:

分區目錄

一個分區(Partition)有 1 到多個副本(Replica),是主從結構,

主(Leader)負責處理讀寫請求,從(Follower)只負責同步數據並在主宕機的時候頂替主實現高可用。

在 Kafka 數據目錄下存放着各分區目錄(Partition),名稱格式爲 topic-partitionNo,如 test-0 代表名爲 test 的 Topic 的 0 號分區。分區目錄下存放消息的文件。

分段日誌和索引

Kafka 的消息是分段(Segment)存儲在文件裏的,當達到配置指定的條件就會創建新的分段文件。

每個分段都都對應消息日誌(.log),偏移量索引(.index)和時間索引(.timeindex)三個文件,

文件名爲起始偏移量(Offset),代表這個文件第一條消息的偏移量值。

以下是日誌分段和索引創建的配置項,詳情見 Apache Kafka Broker 配置。

除了 log.index.interval.bytes 隻影響單個索引的創建時機,其他配置都會觸發日誌分段。

qxn3uI

消息日誌與索引關係

Kafka 數據最終都會保持在磁盤上,對於消息有三個關鍵的文件消息日誌(.log),偏移量索引(.index)和時間索引(.timeindex)。

消息日誌保存的是消息的原數據,接收到的生產者(Producer)的消息會以追加的方式順序寫到這個文件中,順序寫效率遠高於隨機寫,減輕了磁盤尋址壓力。這是 Kafka 使用磁盤做存儲卻能保證高性能的原因之一。

每個消息都會有一個自增的偏移量值,從 0 開始,每條消息都遞增這個值,所以偏移量代表即將到來的下一條消息的偏移量值。

Kafka 中索引有偏移量索引和時間索引兩種。

它沒有爲每一條消息建立索引,那樣索引文件會太過於龐大,而是分段建立,所以一個索引只能指明消息所在位置的範圍,最終要在這個範圍遍歷查找。

時間索引指向的是偏移量索引,偏移量索引指向了消息日誌二進制位置。通過時間戳或者偏移量最終都可以定位到消息的具體位置。

可以通過配置參數 log.index.interval.bytes 控制兩個索引間隔的字節數,超過這個大小就建立新索引。這個值越小,索引越密集,查詢快但是文件體積大。

消息日誌(.log)

通過消息日誌(.log)可以看到每條消息具體的內容。

# 只輸出消息日誌描述信息
kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.log

# 輸出消息日誌完整信息
kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.log --print-data-log

可以看到下圖這個消息日誌起始偏移量(Starting offset)是 23147,代表這個日誌第一條消息的偏移量,這個偏移量同時也是消息日誌和兩個索引文件的文件名。

每 n 條消息組成一批(batch),每一批消息對應有一個描述信息,記錄了這批消息的大小,偏移量範圍 baseOffset 和 lastOffset,位置(position)以及大小(batchSize)等信息。

描述信息下面就是對應這一批具體的消息。如下圖:

偏移量索引(.index)

# 查看偏移量索引內容
kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.index

偏移量索引是稀疏結構,每隔一段記錄一條消息的索引。

Offset 指消息的偏移量,position 指這個偏移量的消息所在的一批(batch)消息在. log 中的起始二進制位置。

時間索引(.timeindex)

# 查看時間索引內容
kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.timeindex1.2.

時間索引也是稀疏結構,每隔一段記錄一條消息的索引。

時間戳(timestamp)指這條消息的創建時間,Offset 指這個消息的偏移量。

上面這條指令同時會輸出根據時間戳索引查找消息的結果,比如創建時間爲 1632390207745 的消息偏移量爲 23388,這條消息所在那一批消息的起始偏移量(Indexed offset / baseOffset:)爲 23388,終止偏移量(found log offset / lastOffset:)爲 23390,這一批消息一起有 23390 ~ 23388 = 3 條消息。

Kafka 通過 MappedByteBuffer 將索引文件映射到內存中,來加快索引的查詢速度。

位移索引

不同索引類型保存不同的 <Key , Value> 對,對 OffsetIndex 位移索引而言,Key 就是消息的相對位移,Value 保存該消息的日誌段文件中該消息第一個字節的物理文件位置。

  定義:

對於偏移量索引文件,保存的是 <相對偏移量,物理地址> 的對應關係,文件中的相對偏移量是單調遞增的。

  查找:

查詢指定偏移量對應的消息時,使用改進的二分查找算法來快速定位偏移量的位置,

如果指定的偏移量不在索引文件中,則會返回文件中小於指定偏移量的最大偏移量及對應的物理地址,該邏輯通過 OffsetIndex.lookup() 方法實現。

一個參考的 稀疏索引.index文件的內容,大致如下

4lkdNC

假設 ,要尋找 offset 爲 115 位點對應的文件 position,

因爲 115 介於「110-120」之間,因此稀疏索引能夠提供的信息就是,110 需要從 8200 的位置開始往後找,這樣也就粗略定位了 115 的大致 position

  索引項:

偏移量索引文件的索引項結構如下圖所示,每個索引項記錄了相對偏移量 relativeOffset 和對應消息的第一個字節在日誌段文件中的物理地址 position,共佔用 8 個字節。

尼恩提示:本質上, 消息的偏移量(offset)如果是 絕對偏移量, 那是一個 long ,是要佔用 8 個字節滴,那麼,爲啥這裏是四個字節呢?

爲啥?

索引項中沒有直接使用 long 類型絕對偏移量,而改爲只佔用 4 個字節 int 的相對偏移量(relativeOffset=offset-baseOffset),這樣可以減小索引文件佔用的空間。

舉個例子看一下:

爲什麼使用相對偏移量?這樣可以節約存儲空間。每條消息的絕對偏移量佔用 8 個字節,而相對偏移量只佔用 4 個字節(relativeOffset=offset-baseOffset)。

在日誌段文件滾動的條件中,有一個是:追加消息的最大偏移量和當前日誌段的 baseOffset 的差值大於 Int.MaxValue(4 個字節),因爲如果 相對偏移量 大於這個 4 個字節值,就無法存儲相對偏移量了。

所以, kafka 有兩個偏移量:

relativeOffset 相對位移是一個整型,佔用 4 個字節,物理文件位置也是一個整型,同樣佔用 4 個字節,因此總共 8 個字節。

總之,Kafka 中的消息位移值是一個長整型,應該佔用 8 個字節纔對,但是,在保存 OffsetIndex<Key , Value> 對,Kafka 做了一些優化,每個 OffsetIndex 對象在創建時,都已經保存了對應日誌段對象的起始位移,因此保存與起始位移的差值就夠了。

  1. 爲了節省空間,一個索引項節省了 4 字節,想想那些日消息處理數萬億的公司。

  2. 因爲內存資源是很寶貴的,索引項越短,內存中能存儲的索引項就越多,索引項多了直接命中的概率就高了。

通過索引查詢消息過程

偏移量索引和時間戳索引對應的類分別爲:OffsetIndex 和 TimeIndex,其公共的抽象父類爲 AbstractIndex:

與之相關的源碼如下:

  1. AbstractIndex.scala:抽象類,封裝了所有索引的公共操作

  2. OffsetIndex.scala:位移索引,保存了位移值和對應磁盤物理位置的關係

  3. TimeIndex.scala:時間戳索引,保存了時間戳和對應位移值的關係

  4. TransactionIndex.scala:事務索引,啓用 Kafka 事務之後纔會出現這個索引

這裏先介紹 OffsetIndex 位移索引 文件。

1. 索引項大小定義:

//偏移量索引文件索引項override def entrySize = 8
//時間戳索引文件索引項override def entrySize = 12

2. 根據絕對偏移量計算相對偏移量:relativeOffset

def relativeOffset(offset: Long): Int = {

  val relativeOffset = toRelative(offset)

  if (relativeOffset.isEmpty)

    throw new IndexOffsetOverflowException(s"Integer overflow for offset: $offset (${file.getAbsoluteFile})")

  relativeOffset.get

}

relativeOffset 方法內部調用了 toRelative 方法:用給定的偏移量 - 日誌段起始偏移量,如果結果合法則返回

private def toRelative(offset: Long): Option[Int] = {

  val relativeOffset = offset - baseOffset

  if (relativeOffset < 0 || relativeOffset > Int.MaxValue)

    None

  else

    Some(relativeOffset.toInt)

}

3. 將相對偏移量還原成絕對偏移量:parseEntry

偏移量索引:

override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {

  OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))

}

這個方法返回一個 OffsetPosition 類型。

該類有兩個方法,分別返回索引項的 Key 和 Value。

這裏的 parseEntry 方法,就是要構造 OffsetPosition 所需的 Key 和 Value。

Key 是絕對偏移量,根據索引項中的相對偏移量計算,代碼使用 baseOffset + relativeOffset(buffer, n) 的方式將相對偏移量還原成絕對偏移量;

Value 是這個偏移量上消息在日誌段文件中的物理位置,代碼調用 physical 方法計算這個物理位置並把它作爲 Value。

最後,parseEntry 方法把 Key 和 Value 封裝到一個 OffsetPosition 實例中,然後將這個實例返回。

4. 快速定位消息所在的物理文件位置

e.g. 假設要查找偏移量爲 230 的消息?

第一步: 通過跳 表 ,找 分段的 index 文件

Kafka 中存在一個 ConcurrentSkipListMap 來保存在每個日誌分段,

通過跳躍表方式,定位到在 00000000000000000217.index ,

第二步: 通過 改進的二分查找, 找到不大於 相對偏移量的 最大索引項

通過二分法在偏移量索引文件中找到不大於 230-217 =13 的最大索引項,即 offset 12 那欄,

第三步:找日誌文件,找到相對的目標 記錄

從日誌文件物理位置 456 開始,繼續向後查找找到相對偏移量爲 13 的消息。

 def lookup(targetOffset: Long): OffsetPosition = {

    maybeLock(lock) {

      //複製出整個索引映射區

      val idx = mmap.duplicate

      // largestLowerBoundSlotFor  方法底層使用了改進版的二分查找算法尋找對應的槽

      val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)

      // 如果沒找到,返回一個空的位置,即物理文件位置從0開始,表示從頭讀日誌文件  

      // 否則返回slot槽對應的索引項

      if(slot == -1)

        OffsetPosition(baseOffset, 0)

      else

        parseEntry(idx, slot)

    }

  }

從上面 OffsetIndex.scala#lookup()` 的源,可以看到關鍵處有兩點:

AbstractIndex.scala#largestLowerBoundSlotFor()` 的源碼如下:

protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int =
 indexSlotRangeFor(idx, target, searchEntity)._1

private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = {
// check if the index is empty
if(_entries == 0)
  return (-1, -1)

def binarySearch(begin: Int, end: Int) : (Int, Int) = {
  // binary search for the entry
  var lo = begin
  var hi = end
  while(lo < hi) {
    val mid = (lo + hi + 1) >>> 1
    val found = parseEntry(idx, mid)
    val compareResult = compareIndexEntry(found, target, searchEntity)
    if(compareResult > 0)
      hi = mid - 1
    else if(compareResult < 0)
      lo = mid
    else
      return (mid, mid)
  }
  (lo, if (lo == _entries - 1) -1 else lo + 1)
}

//使用所有索引數據 entry 的總量 _entries 減去熱區數據大小_warmEntries,
// 確定一個熱區索引的起始位置,這樣可以保障只在索引數據的尾部進行二分查找
val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
// check if the target offset is in the warm section of the index
if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
  return binarySearch(firstHotEntry, _entries - 1)
}

// check if the target offset is smaller than the least offset
if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
  return (-1, 0)

binarySearch(0, firstHotEntry)
}

AbstractIndex.scala#largestLowerBoundSlotFor()` 的主要邏輯是從索引數據中二分查找確定消息數據在文件中的物理起始點,這裏需要注意索引文件實際進行了冷熱分區,其中關鍵如下:

  1. 使用所有索引數據 entry 的總量 _entries 減去熱區數據大小_warmEntries,確定一個熱區索引的起始位置,這樣可以保障只在索引數據的尾部進行二分查找

  2. 之所以這樣處理,是因爲 Kafka 的索引是在末尾追加寫入的,並且一般寫入的數據很快就會被讀取,數據熱點集中在尾部。索引數據一般都在頁緩存中,而操作系統的內存是有限的,必然要通過類似 LRU 的機制淘汰頁緩存。

  3. 如果每次二分查找都從頭開始,則索引中間部分的數據所在的頁緩存大概率已經被淘汰掉,從而導致缺頁中斷,必須重新從磁盤上讀文件,影響性能

尼恩提示:頁緩存也叫文件緩衝,是文件系統數據在內存中的緩存結構,Kafka 的消息數據存儲也充分利用了頁緩存,如果消息寫入消費速度相當,則消費時大概率直接命中緩存而不經過磁盤 IO,極大提高性能。但是當某個消費者消費速度落後時,可能會導致 Kafka 節點上的頁緩存頻繁切換,拖累整個集羣的性能

相關視頻:《尼恩 Java 硬核架構班第 16 章:RocketMQ 第一部曲:葵花寶典(高性能祕籍)架構師視角解讀 OS 底層的 mmap、pagecache、zerocopy 等底層的底層知識 》 (點擊此鏈接學習)

偏移量索引文件的查找原理:

假設要查找偏移量爲 230 的消息,查找過程如下:

  注意:

改進的二分查找

就 Kafka 而言,索引是在文件末尾追加的寫入的,並且一般寫入的數據立馬就會被讀取。所以數據的熱點集中在尾部。並且操作系統基本上都是用頁爲單位緩存和管理內存的,內存又是有限的,因此會通過類 LRU 機制淘汰內存。

看起來 LRU 非常適合 Kafka 的場景,但是使用標準的二分查找會有缺頁中斷的情況,畢竟二分是跳着訪問的。

簡單的來講,假設某索引佔 page cache 13 頁,此時數據已經寫到了 12 頁。

按照 kafka 訪問的特性,此時訪問的數據都在第 12 頁,因此二分查找的特性,此時緩存頁的訪問順序依次是 0,6,9,11,12。

因爲頻繁被訪問,所以這幾頁一定存在 page cache 中。

當第 12 頁不斷被填充,滿了之後會申請新頁第 13 頁保存索引項,而按照二分查找的特性,此時緩存頁的訪問順序依次是:0,7,10,12。

這 7 和 10 很久沒被訪問到了,很可能已經不再緩存中了,然後需要從磁盤上讀取數據。

註釋說:在他們的測試中,這會導致至少會產生從幾毫秒跳到 1 秒的延遲。

基於以上問題,Kafka 使用了改進版的二分查找,改的不是二分查找的內部,而且把所有索引項分爲熱區和冷區 這個改進可以讓查詢熱數據部分時,遍歷的 Page 永遠是固定的,這樣能避免缺頁中斷。

看到這裏其實我想到了一致性 hash,一致性 hash 相對於普通的 hash 不就是在 node 新增的時候緩存的訪問固定,或者只需要遷移少部分數據。

Log 類採用跳躍表(SkipList)管理 LogSegment 對象

每個 topic 分區對應一個 Log 類對象(一個 broker 節點上只允許存放分區的一個副本,所以從 broker 視角來看一個分區對應一個 Log 類對象),其中包含了一系列隸屬對應 topic 分區的 LogSegment 對象,Log 類採用跳躍表(SkipList)數據結構對這些 LogSegment 對象進行管理。

上圖展示了 LogSegment 在 Log 中基於 SkipList 的組織形式(其中青色小圓圈表示單個 LogSegment 對象)。

寫入索引項的方法

偏移量索引:append

寫入索引項 append 方法的實現, 通過 mmap 實現 idex 文件讀寫的 零複製,流程圖如下

寫入索引項 append 方法的實現, 通過 mmap 實現 idex 文件讀寫的 零複製,代碼如下

 def append(offset: Long, position: Int): Unit = {
    inLock(lock) {
     // 索引文件如果已經寫滿,直接拋出異常
      require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
    // 要保證待寫入的位移offset比當前索引文件中所存的位移值要大
  // 這主要是爲了維護索引的單調性
      if (_entries == 0 || offset > _lastOffset) {
        trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
        mmap.putInt(relativeOffset(offset))//向mmap寫入相對位移值
        mmap.putInt(position)//向mmap寫入物理文件位置
        _entries += 1//更新索引項個數
        _lastOffset = offset//更新當前索引文件最大位移值
      // 確保寫入索引項格式符合要求
        require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
      } else {
        throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
          s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
      }
    }
  }

時間戳索引

TimeIndex 保存的是 <時間戳,相對位移值>,時間戳需要長整型來保存,相對位移值使用 Integer 來保存。因此 TimeIndex 單個索引項需要佔用 12 個字節。

寫入時間戳索引的索引項

def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
    inLock(lock) {
      if (!skipFullCheck)
  // 索引文件如果已經寫滿,直接拋出異常
        require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
    // 這主要是爲了維護索引的單調性
      if (_entries != 0 && offset < lastEntry.offset)
        throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
          s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
// 這主要是爲了維護索引的單調性
      if (_entries != 0 && timestamp < lastEntry.timestamp)
        throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
          s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")

      if (timestamp > lastEntry.timestamp) {
        trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
        mmap.putLong(timestamp)//向mmap寫入時間戳
        mmap.putInt(relativeOffset(offset))//向mmap寫入相對位移值
        _entries += 1
        _lastEntry = TimestampOffset(timestamp, offset)
        require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
      }
    }
  }

位移索引和時間戳索引的區別是什麼?

Kafka 中有三大類索引:位移索引、時間戳索引和已中止事務索引。分別對應了. index、.timeindex、.txnindex 文件。

與之相關的源碼如下:

  1. AbstractIndex.scala:抽象類,封裝了所有索引的公共操作

  2. OffsetIndex.scala:位移索引,保存了位移值和對應磁盤物理位置的關係

  3. TimeIndex.scala:時間戳索引,保存了時間戳和對應位移值的關係

  4. TransactionIndex.scala:事務索引,啓用 Kafka 事務之後纔會出現這個索引

sendfile 是最高性能的零複製技術

sendfile 技術可以直接在內核完成輸入和輸出,不需要拷貝到用戶空間再寫出去。這樣可以減少不必要的數據拷貝次數,提高數據傳輸效率。

並且,sendfile 是最高性能的零複製技術。 具體請參見 :《尼恩 Java 硬核架構班第 16 章:RocketMQ 第一部曲:葵花寶典(高性能祕籍)架構師視角解讀 OS 底層的 mmap、pagecache、zerocopy 等底層的底層知識 》 (點擊此鏈接學習)

傳統拷貝,從磁盤讀取文件併發送到網絡的流程

如果您的應用程序要從磁盤讀取文件並通過網絡發送它,則可能會進行一堆不必要的拷貝,以及用戶態 / 內核態的切換。

來一個更復雜的 傳統拷貝,從磁盤讀取文件併發送到網絡的流程圖

一些術語:

在這個例子中,我們有 4 次模式切換(用戶態和內核態之間的切換)和 4 次數據拷貝。

於是,爲了解決這一問題,DMA 技術就出現了,每個 I/O 設備都有自己的 DMA 控制器,通過這個 DMA 控制器,CPU 只需要告訴 DMA 控制器,我們要傳輸什麼數據,從哪裏來,到哪裏去,就可以放心離開了。

後續的實際數據傳輸工作,都會由 DMA 控制器來完成,CPU 不需要參與數據傳輸的工作。

零拷貝,從磁盤讀取文件併發送到網絡的流程

來一個更復雜的 零拷貝,從磁盤讀取文件併發送到網絡的流程圖

爲了減少拷貝,把數據從磁盤直接發向網絡,那 Kafka 在存儲數據的時候,就要保證存儲的數據格式和將要發出的 response 格式一致。

在傳統拷貝模式下,第二步、第三步沒啥意義,因爲 Kafka 沒有對數據做額外處理,只是簡單轉發。那能否從磁盤直接發向網絡呢?答案是肯定的。

通過零拷貝技術,磁盤上的數據還是要先進入 read buffer,然後不用再拷貝到應用程序的緩存區,而是直接拷貝到 NIC buffer,圖上的步驟 2:Appends just file descriptors,只是把文件描述符交給了 Socket buffer,實際數據並沒有拷貝給 Socket buffer。這就是所謂的 scatter-gather 操作(也稱爲 Vectorized I/O),scatter-gather 是僅將 read buffer 數據指針存儲在 socket buffer 中,並讓 DMA 直接從內存讀取數據的行爲。

最終結果如何呢?

Kafka 的 log 文件 與日誌格式的演變

先看一下 Kafka 的 log 文件。 這個關係到 log 文件的 外邊結構。

Kafka 引入了日誌分段 LogSegment 的概念,將 Log 切分爲多個 LogSegment, 一個 LogSegment 一個 log 文件。

Log 中追加消息時是順序寫入的,且只能寫入最後一個 LogSegment,此前的都不能寫入。

每個 LogSegment 對應於磁盤上的一個日誌文件和兩個索引文件 和 其他文件。

偏移量索引文件 (.index) 和時間戳索引文件(.timeindex)。

每個 LogSegment 有基準偏移 baseOffset, 表示當前 LogSegment 中第一條消息的 offset。

每個 LogSegment 還可能會包含 ".delete",".clean" 等臨時文件

接下來,再看一下 Kafka 消息協議。 這個關係到 log 文件的 內部結構。

v0 版本

Kafka 消息格式的第一個版本通常稱爲 v0 版本,在 Kafka 0.10.0 之前都採用的這個消息格式

v1 版本

Kafka 從 0.10.0 版本開始到 0.11.0 版本之前所使用的消息格式版本爲 v1

v1 比 v0 版本就多了一個 timestamp 字段,表示消息的時間戳。

v2 版本

Kafka 0.11.0 版本開始所使用的消息格式版本爲 v2,這個版本的消息相 vO v1 版本而言改動很大,同時還參考了 Protocol Buffer 引入了變長整型( Varints )和 ZigZag 編碼。

第一個特點:v2 版本的 消息壓縮

kafka 將多條消息一起壓縮。v2 版本中消息集稱爲 Record Batch, 而不是先前的 MessageSet, 其內部也包含了一條或多條消息 , 即一個 Record Batch 可能含有 1-N 條消息

消息壓縮 通過參數 compression.type 配置。默認值爲 producer,表示保留生產者使用的壓縮方式。

參數還可以配置爲 gzip、snapp、lz4

第二個特點:v2 版本的 變長字段

Varints 是使用一個或多個字節來序列化整數的一種方法。Record 內部字段大量採用了 Varints 變長字段

消息的批量生產

消息生產端 Producer 這裏沒有太多需要同步的,一言蔽之就是將消息封裝後發送給 Broker 端,不過讀者這裏想強調一下 Record Batch 的概念

在默認情況下,單 Batch 的上限是 16K,一個 Batch 可以存儲 1 條或者多條消息,這個取決於 Producer 端的配置,如果 Producer 設置了黏性分區策略,linger.ms 聚批時間設置足夠長(例如 1000ms),那麼很容易將 Batch 填滿;又或者 linger.ms 配置了默認值(linger.ms=0),那麼聚批將不會被觸發,那一個 Batch 上就只有一條消息。

因此無論怎樣,Record Batch 是消息的載體,也是消息讀取的最小單位(注意不是消息本身,這裏在後文還會提及)

上圖表明瞭,某個 Record Batch 中可能只有一條消息,也有可能存在多條,甚至將 16K 全部填充滿;無論哪種 case,Producer 都是以 Record Batch 粒度將消息發送至 Broker 的

消息的 log 文件存儲

消息的存儲,包括 log 文件和 index 文件 總體邏輯上的關係,映射到實際代碼中在磁盤上的關係則是如下圖所示:

每個分區對應一個 Log 對象,在磁盤中就是一個子目錄,子目錄下面會有多組日誌段即多 Log Segment,每組日誌段包含:消息日誌文件 (以 log 結尾)、位移索引文件 (以 index 結尾)、時間戳索引文件 (以 timeindex 結尾)。

其實還有其它後綴的文件,例,例如. txnindex、.deleted 等等。篇幅有限,暫不提起。

其中 log 文件是用來存儲消息的,而 index 文件則是用來存儲稀疏索引的

注:這裏爲什麼 ndex 文件 要隔 4K 做一次稀疏索引,而不是 3K 或者 5K 呢?

其實這裏主要是與硬件兼容,現在多數廠商的硬件,單次掃數據的大小一般都是 4K 對齊的,很多硬件都提升到了 8K 甚至 16K,稀疏索引設置爲 4K,能保證即便是當前的 Record Batch 只有 1 個字節,後續的內容也能緩存在 Page Cache 中,下次掃描的時候,可以直接從緩存中讀取,而不用掃描磁盤

另外,基於 V2 的存儲版本,消息的查詢都是以 Record Batch 作爲最小粒度查詢的,而 Producer 設置的 Record Batch 的默認值爲 16K,即如果消息攢批合理的話,稀疏索引可能是每隔 16K 構建起來的

Kafka 寫入日誌的步驟

服務端將生產者產生的消息集存儲到日誌文件,要考慮對消息集進行分段存儲。

如圖 6-3 所示,服務端將消息追加到日誌文件,具體步驟如下。

  1. 每個分區對應的日誌對象管理了分區的所有日誌分段。

  2. 將消息集追加到當前活動的日誌分段,任何時刻,都只會有一個活動的日誌分段

  3. 每個日誌分段對應一個數據文件和索引文件,消息內容會追加到 Log 數據文件中。

  4. 操作底層數據的接口是文件通道,消息集提供一個 writeFullyTo() 方法,參數是文件通道

  5. 消息集 (ByteBufferMessageSet) 的 writeFullyTo()方法,調用文件通道的 write()方法,將底層包含消息內容的字節緩衝區 (ByteBuffer) 寫到 File 文件通道中。

  6. 字節緩衝區寫到 File 文件通道中,消息就持久化到日誌分段對應的 log 分段 數據文件中了

生產者發送消息時的 消息集

生產者發送消息時,會在客戶端將屬於同一個分區的一批消息,作爲一個生產請求發送給服務端。

底層是字節緩衝區的 ByteBufferMessageSet 對象。

僞代碼如下:

// Java版本的生產者客戶端傳遞的消息內容是ByteBuffer,無需額外處理
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
	// Scala版本的客戶端傳遞Message對象,要將消息集填充到字節緩衝區中
	def this(codec: CompressionCodeccounter:LongRef,messages: Message*) {
		// create()的返回值是ByteBuffer,通過this()再調用類級別的構造函數
		this(create(0ffsetAssigner(counter,messages.size),messages:_*))
	}	
}

消息集中的每條消息 (Message) 都會被分配一個相對偏移量,而每一批消息的相對偏移量都是從 0 開始的。

下圖給出了一個示例,生產者寫到分區 P 批匹消息:

客戶端每次發送給服務端的一批消息,它的字節緩衝區只屬於這一批消息,字節緩衝區不是共享的數據結構。

消息集中的每條消息由 3 部分組成: 偏移量、數據大小、消息內容。

Kafka 存儲消息時,會爲每條消息都指定一個唯一的偏移量。

同一個分區的所有日誌分段,它們的偏移量從 0 開始不斷遞增。不同分區的偏移量之間沒有關係,所以說 Kaka 只保證同一個分區的消息有序性,但是不保證跨分區消息的有序性。

消息長度通常不固定,而且在讀取文件時客戶端可能期望直接定位到指定的偏移量。

記錄消息長度的好處是: 如果不希望讀取這條消息,只需要讀取出消息長度這個字段的值,然後跳過這些大小的字節,這樣就可以定位到下一條數據的起始位置。

和消息集的第二部分類似,每條消息的鍵值之前也都會先記錄鍵的長度和值的長度。

注意: 消息格式是在客戶端定義的消息集在傳給服務端之前, 就用 ByteBufferMessageSet 封裝好。服務端接收的每個分區消息就是 ByteBufferMessageSet。

另外,如所示,每條消息除了保存消息的鍵值內容外,還保存一些其他數據,比如校驗值、魔數、鍵的長度、值的長度等。

消息集的 writeMessage()方法將每條消息 (Message) 填充到字節緩衝區中,緩衝區會暫存每個分區的一批消息, 這個方法實際上是在客戶端調用的,填充消息 纔會爲這批消息設置從開始遞增的偏移量,

如下所示,在服務端調用文件通道的寫方法時,纔會將消息集字節緩衝區的內容刷寫到文件中。

Kafka 日誌追加方式

服務端將每個分區的消息追加到日誌中,是以日誌分段爲單位的。

當日志分段累加的消息達到闕值大小 (文件大小達到 1GB) 時,會新創建一個日誌分段保存新的消息,而分區的消息總是追加到最新的日誌分段中。

每個日誌分段都有一個基準偏移量 (segmentBaseoffset,或者叫 baseoffset),這個基準偏移量是分區級別的絕對偏移量,而且這個值在日誌分段中是固定的。有了這個基準偏移量,就可以計算出每條消息在分區中的絕對偏移量,最後把消息以及對應的絕對偏移量寫到日誌文件中。

日誌追加方法中的 messages 參數是客戶端創建的消息集,這裏面的偏移量是相對偏移量。

在追加到日誌分段時,validMessages 變量已經是絕對偏移量了,具體步驟如下。

  1. 對客戶端傳遞的消息集進行驗證,確保每條消息的 (相對) 偏移量都是單調遞增的。

  2. 刪除消息集中無效的消息。如果大小一致,直接返回 messages,否則會進行截斷。

  3. 爲有效消息集的每條消息分配 (絕對) 偏移量。

  4. 將更新了偏移量值的消息集追加到當前日誌分段中。

  5. 更新日誌的偏移量 (下一個偏移量 nextOffsetMetadata) 必要時調用 flush()方法刷寫磁盤。

Log 類定義了 Log#append 方法,用於往 Log 對象中追加消息數據。

需要注意的一點是,Log 對象使用 SkipList 管理多個 LogSegment,我們在執行追加消息時是不能夠往 SkipList 中的任意 LogSegment 對象執行追加操作的,Kafka 設計僅允許往 activeSegment 對象中追加消息。

方法 Log#append 實現如下:

def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
    // 1. 解析、校驗待追加的消息數據,封裝成 LogAppendInfo 對象
    val appendInfo = this.analyzeAndValidateRecords(records)
    // 如果消息數據個數爲 0,則直接返回
    if (appendInfo.shallowCount == 0) return appendInfo

    // 2. 剔除待追加消息中未通過驗證的字節部分
    var validRecords = this.trimInvalidBytes(records, appendInfo)

    try {
        // 將待追加消息中剩餘有效的字節追加到 Log 對象中
        lock synchronized {
            // 3.1 如果指定需要分配 offset
            if (assignOffsets) {
                // 獲取當前 Log 對象對應的最後一個 offset 值,以此開始向後分配 offset
                val offset = new LongRef(nextOffsetMetadata.messageOffset)
                // 更新待追加消息的 firstOffset 爲 Log 對象最後一個 offset 值
                appendInfo.firstOffset = offset.value
                val now = time.milliseconds
                val validateAndOffsetAssignResult = try {
                    // 對消息(包括壓縮後的)的 magic 值進行統一
                    // 驗證數據完整性,並分配 offset,同時按要求更新消息的時間戳
                    LogValidator.validateMessagesAndAssignOffsets(
                        validRecords,
                        offset,
                        now,
                        appendInfo.sourceCodec,
                        appendInfo.targetCodec,
                        config.compact,
                        config.messageFormatVersion.messageFormatVersion,
                        config.messageTimestampType,
                        config.messageTimestampDifferenceMaxMs)
                } catch {
                    case e: IOException =>
                       .....
                }
                validRecords = validateAndOffsetAssignResult.validatedRecords
                appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
                appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
                // 更新待追加消息的 lastOffset 值
                appendInfo.lastOffset = offset.value - 1
                // 如果時間戳類型爲 LOG_APPEND_TIME,則修改時間戳
                if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
                    appendInfo.logAppendTime = now

                // 如果在執行 validateMessagesAndAssignOffsets 操作時修改了消息的長度,
                //則需要重新驗證,防止消息過長
                if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
                    for (logEntry <- validRecords.shallowEntries.asScala) {
                        if (logEntry.sizeInBytes > config.maxMessageSize) {
                           .....
                        }
                    }
                }
            }
            // 3.2 不需要分配 offset
            else {
                // 如果消息的 offset 不是單調遞增,或者消息的 firstOffset 小於 Log 中記錄的下一條消息 offset,則說明 appendInfo 非法
                if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
                  ...
            }

            // 4. 校驗待追加消息的長度,保證不超過了單個 LogSegment 所允許的最大長度(對應 segment.bytes 配置)
            if (validRecords.sizeInBytes > config.segmentSize) {
                throw new RecordBatchTooLargeException(
                    "Message set size is %d bytes which exceeds the maximum configured segment size of %s.".format(validRecords.sizeInBytes, config.segmentSize))
            }

            // 5. 獲取 activeSegment 對象,如果需要則創建新的 activeSegment 對象
            val segment = this.maybeRoll(
                messagesSize = validRecords.sizeInBytes,
                maxTimestampInMessages = appendInfo.maxTimestamp,
                maxOffsetInMessages = appendInfo.lastOffset)


            // 6. 往 activeSegment 中追加消息
            segment.append(
                firstOffset = appendInfo.firstOffset,
                largestOffset = appendInfo.lastOffset,
                largestTimestamp = appendInfo.maxTimestamp,
                shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
                records = validRecords)

            // 7. 更新 LEO 中記錄的當前 Log 最後一個 offset 值
            this.updateLogEndOffset(appendInfo.lastOffset + 1)
              ...

            // 8. 如果刷盤時間間隔達到閾值(對應 flush.messages 配置),則執行刷盤
            if (unflushedMessages >= config.flushInterval)
                this.flush() // 將 [recoveryPoint, logEndOffset) 之間的數據刷盤

            appendInfo
        }
    } catch {
       ...
    }
}

nextOffsetMetadata 讀寫操作發生在 務端處理生產請求和拉取請求時,具體步驟如下:

  1. 生產者發送消息集給服務端,服務端會將這一批消息追加到日誌中。

  2. 每條消息需要指定絕對偏移量,服務端會用 nextoffsetMetadata 的值作爲起始偏移量。

  3. 服務端將每條帶有偏移量的消息寫入到日誌分段中。

  4. 服務端會獲取這一批消息中最後一條消息的偏移量,加上一後更新 nextoffsetMetadata。

  5. 消費線程 (消費者或備份副本) 會根據這個變量的最新值拉取消息。一旦變量值發生變化消費線程就能拉取到新寫入的消息。

nextoffsetMetadata 變量是一個關於日誌的偏移量元數據對象 (LogoffsetMetadata)。

日誌的偏移量元數據都是從當前活動的日誌分段 (activeSegment) 獲取相關的信息: 下一條消息的偏移量、當前日誌分段的基準偏移量、當前日誌分段的大小。

LogSegment#append 方法的實現,該方法用於往當前 LogSegment 對應的 log 文件中追加消息數據,並在需要時更新對應的 index 和 timeindex 索引數據。

LogSegment#append 方法實現如下:

def append(firstOffset: Long, // 待追加消息的起始 offset
           largestOffset: Long, // 待追加消息中的最大 offset
           largestTimestamp: Long, // 待追加消息中的最大時間戳
           shallowOffsetOfMaxTimestamp: Long, // 最大時間戳消息對應的 offset
           records: MemoryRecords) { // 待追加的消息數據
    if (records.sizeInBytes > 0) {
       ...
        // 獲取物理位置(當前分片的大小)
        val physicalPosition = log.sizeInBytes()
        if (physicalPosition == 0) rollingBasedTimestamp = Some(largestTimestamp)

        require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")

        // 將消息數據追加到 log 文件
        val appendedBytes = log.append(records)
        trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")

        // 更新已追加的消息對應的最大時間戳,及其 offset
        if (largestTimestamp > maxTimestampSoFar) {
            maxTimestampSoFar = largestTimestamp
            offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
        }

        // 如果當前累計追加的日誌字節數超過閾值(對應 index.interval.bytes 配置)
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
            // 更新 index 和 timeindex 文件
            index.append(firstOffset, physicalPosition)
            timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
            bytesSinceLastIndexEntry = 0 // 重置當前累計追加的日誌字節數
        }
        // 更新累計加入的日誌字節數
        bytesSinceLastIndexEntry += records.sizeInBytes
    }
}

如果當前追加的消息數據是有效的,則 LogSegment 會調用 FileRecords#append 方法將消息數據追加到對應的 log 文件中,並更新本地記錄的已追加消息的最大時間戳及其 offset。

前面我們介紹了 Kafka 並不會對每條消息都建立索引,而是採用稀疏索引的策略間隔指定大小的字節數(對應 index.interval.bytes 配置)建立索引項,如果當前累計追加的消息字節數超過該配置值,則 Kafka 會更新對應的 index 和 timeindex 數據。

broker 如何 分析和驗證消息集?

對消息集進行分析和驗證,主要利用了 Kafka 中 “分區的消息必須有序” 這個特性。

分析和驗證方法的返回值是一個日誌追加信息 (LogAppendInfo) 對象,該對象的內容包括: 消息集第一條和最後條消息的偏移量、消息集的總字節大小、偏移量是否單調遞增。

日誌追加信息表示消息集的概要信息,但並不包括消息內容。

日誌追加信息對象也是追加日誌方法的最後返回值。

服務端上層類 (比如分區、副本管理器調用追加日誌的方法,期望得到這一批消息的概要信息,比如第一個偏移量和最後一個偏移量。

這樣,它們就可以根據偏移量計算出一共追加了多少條消息 (服務端接收的消息集和最後真正被追加的消息數量可能會不一樣)。

上層類甚至還可以做一些複雜的業務邏輯處理,比如根據最後一個偏移量判斷被延遲的生產請求是否可以完成。相關代碼如下:

//對要追加的消息集進行分析和驗證,消息太大或者無效會被丟棄
def analyzeAndValidateMessageSet(messages:ByteBufferMessageSet)={
	var shallowMessageCount = 0 //消息數量
	var validBytesCount =0 //有效字節數
	//第一條消息和最後一條 (循環時表示上一條消息的偏移量)消息的偏移量
	var firstOffset,astoffset = -1L
	var monotonic =true // 是否單調遞增
	for(messageAndOffset <- messages,shallowIterator) {
		// 在第一條消息中更新firstoffset
		if(firstOffset <0) firstOffset = messageAndOffset.offset
		if(lastOffset >= messageAndOffset.offset) monotonic = false
		//每循環一條消息,就更新
		lastoffsetLastOffset = messageAndOffset.offset
		val m= messageAndoffset.message
		val messageSize = MessageSet.entrySize(m)
		m.ensureValid()//檢查消息是否有效
		shallowMessageCount +=1
		validBytesCount += messageSize
	}
	LogAppendInfo(firstOffset,lastOffset, sourceCodec,targetCodec,shallowMessageCount,validBytesCount,monotonic)
}

前面說過,消息集對象中消息的偏移量是從 0 開始的相對偏移量,並且它的底層是一個字節緩衝區。

那麼要獲得消息集中第一條消息和最後一條消息的偏移量,只能再把字節緩衝區解析出來,讀取每一條消息的偏移量。

這裏因爲還要對每條消息進行分析和驗證,所以讀取消息是不可避免的。

分析消息集的每條消息時,都會更新最近的偏移量 (lastoffset) 但只會在分析第一條消息時更新起始偏移量(firstoffset)。

判斷消息集中所有消息的偏移量是否單調遞增,只需要比較最近的偏移量和當前消息的偏移量。

如果每次處理一條消息時,當前消息的偏移量都比最近的偏移量值 (上 - 條消息的偏移量) 大,說明消息集是單調遞增的。

對消息集的每條消息都驗證和分析後, 下一步要爲消息分配絕對偏移量, 最後才能追加到日誌分段

broker 如何爲消息集分配絕對偏移量?

存儲到日誌文件中的消息,必須是分區 的絕對偏移量。

爲消息集分配絕對偏移量時,以 nextoffsetMetadata 的偏移量作爲起始偏移量。

分配完成後還要更新 nextoffsetMetadata 的偏移量值。

爲了保證在分配過程中,獲取偏移量的值並加一是一個原子操作,起始偏移量會作爲原子變量傳入 validateMessagesAndAssignoffsets() 方法。

相關代碼如下:

// 消息集 加到 志,獲取最近的偏移量作爲初始佳
class Log{ 
	def append(l'lessages : ByteBuffe MessageSet) {
		// nextOffsetMetadata 表示最近 一條消息的偏移量
		val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
		//offset參數作爲原子變量,在分配偏移量時,先獲取出值再加一
		validMessages=validMessages.validateMessagesAndAssignOffsets(offset)
		//offset的返回值是最後一條消息的偏移量再加一,那麼最後一條消息就要減一
		appendInfo.lastOffset = offset.get - 1
		segment.append(appendInfo.firstoffset,validMessages) // 追加消息集
		//更新nextoffsetMetadata,用最後一條消息的偏移量加一表示最近下一條
		updateLogEndOffset(appendInfo.lastOffset +1)
	}
}

//字節緩衝區消息集根據指定的偏移量計數器、更新每條消息的偏移量
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
	def validateMessagesAndAssignOffsets(offsetCounter:AtomicLong)={
		var messagePosition =0
		buffer.mark()//先標記
		while(messagePosition < sizeInBytes - MessageSet.LogOverhead){
		buffer.position(messagePosition)// 定位到每條消息的起始位置
		//以最新的偏移量計數器爲基礎,每條消息的偏移量都在此基礎上不斷加一	
		buffer.putLong(offsetCounter.getAndIncrement())
		val messageSize= buffer.getInt()
		// 消息的大小//更新消息的起始位置,爲下一條消息做準備 (12+消息大小,表示一條完整的消息)
		messagePosition += MessageSet.LogOverhead + messageSize
		buffer.reset()//重置的時候,回到最開始標記的地方
		this// 還是返回字節緩衝區消息集。除了偏移量改了,其他均沒有變化
	}
}

根據 “1. 消息集” 中消息集的格式,爲消息分配偏移量,實際上是更新每條消息的偏移量數據 (offset)。消息的大小(size) 和消息內容 (Message) 都不需要變動。

現在的問題主要是: 如何在字節緩衝區中定位到每條消息的偏移量所在位置。

定位消息偏移量的方式有兩種:

因爲底層字節緩衝區和消息集對象是一一對應的,所以消息集中第一條消息的偏移量一定是從字節緩衝區的位置 0 開始的。

每條消息的長度計算方式是: 8 + 4 + 消息大小。

其中,消息大小的值可以從第二部分讀取。

如表 6-2 所示,第一條消息中 “消息的大小” 存的值是 3,表示消息本身的內容長度是 3,整個消息佔用的大小就是: 8+4+3=15。

假設偏移量計數器初始值爲 10(即 nextoffsetMetadata 的值) 第一條消息的偏移量就等於 10。

分配第一條消息的偏移量時,修改前面 8 字節的內容爲 10。

接下來要修改第二條消息的偏移量爲 11,通過讀取第一條消息的大小 (等於 3) 再加上 12 字節,就定位到第二條消息起始位置(等於 15)。

修改第三條消息的偏移量爲 12 也是類似的,通過讀取第二條消息的大小 (等於 5) 再加上 12 字節 (第二條消息總共佔用了 17 字節) 就可以定位到第三條消息的起始位置 (15 再加上 17 等於 32) 以此類推,第四條消息的起始位置等於第三條消息佔用的 12 字節再加上 32,等於 48。

在寫人每條消息的絕對偏移量後,只會讀取消息的大小,不會讀取這條消息的實際內容。

消息集經過分配絕對偏移量後,纔可以追加到日誌分段中,日誌分段接收消息集並寫到文件中。

Kafak 通過順序寫實現寫入的高性能

通過上面的分析可以看到 ,kafka 寫 log 數據的時候,是以磁盤順序寫的方式來寫的,也就是說僅僅將數據追加到文件的末尾,不是在文件的隨機位置來修改數據。

爲啥 kafka 要順序寫,而不是隨機寫,是因爲硬盤速度慢,尤其 機械硬盤。

機械硬盤的性能爲啥那麼慢? 看看結構就知道:

機械磁盤上的每個磁道被等分爲若干個弧段,這些弧段稱之爲扇區。

如何在磁盤中讀 / 寫數據? 需要 物理動作,去移動 “磁頭” 到目標 扇區

機械磁盤的讀寫以扇區爲基本單位。完成一次磁盤 IO,需要經過尋道旋轉數據傳輸三個步驟。

看經典大圖:

爲什麼要採用磁盤順序寫?

正因爲 完成一次磁盤 IO,需要經過尋道、旋轉和數據傳輸三個步驟:

  1. 尋道(時間):磁頭移動定位到指定磁道;

  2. 旋轉延遲(時間):等待指定扇區從磁頭下旋轉經過;

  3. 數據傳輸(時間):數據在磁盤、內存與網絡之間的實際傳輸。

首先必須找到柱面,即磁頭需要移動對準相應磁道,這個過程叫做尋道,所耗費時間叫做尋道時間,然後目標扇區旋轉到磁頭下, 這個過程耗費的時間叫做旋轉時間。

怎麼樣才能提高磁盤的讀寫效率呢?

即採用 順序寫,這樣就不需要尋道時間,

而且,只需很少的旋轉時間,將數據追加到文件的末尾,不是在文件的隨機位置來修改數據。

因此,順序寫 省去尋道旋轉可以極大地提高磁盤讀寫的性能。

Kafka 採用順序寫文件的方式來提高磁盤寫入性能。

順序寫文件,基本減少了磁盤尋道旋轉的次數。磁頭再也不用在磁道上亂舞了,而是一路向前飛速前行。

Kafka 中每個分區是一個有序的,不可變的消息序列,新的消息不斷追加到 Partition 的末尾。

在 Kafka 中 Partition 只是一個邏輯概念,Kafka 將 Partition 劃分爲多個 Segment,每個 Segment 對應一個物理文件,Kafka 對 segment 文件追加寫,這就是順序寫文件。

Kafak 通過 sendfile 零拷貝實現發送的高性能

Kafak 通過 sendfile 零拷貝實現發送消息,從磁盤讀取文件併發送到網絡的流程圖

爲了使用 sendfile 零拷貝,把數據從磁盤直接發向網絡,那 Kafka 在存儲數據的時候,就要保證存儲的數據格式和將要發出的 response 格式一致。

所以, Kafka 沒有對數據做額外處理,只是簡單轉發。

通過零拷貝技術,磁盤上的數據還是要先進入 read buffer,然後不用再拷貝到應用程序的緩存區,而是直接拷貝到 NIC buffer,圖上的步驟 2:Appends just file descriptors,只是把文件描述符交給了 Socket buffer,實際數據並沒有拷貝給 Socket buffer。

這就是所謂的 scatter-gather 操作(也稱爲 Vectorized I/O),scatter-gather 是僅將 read buffer 數據指針存儲在 socket buffer 中,並讓 DMA 直接從內存讀取數據的行爲。

最終結果如何呢?

消費者從 broker 拉取數據

消費者從 broker 拉取數據,broker 把數據寫入 SOCKET channel,服務端的代碼如下:

@Override
    public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
        long newSize = Math.min(channel.size(), end) - start;
        int oldSize = sizeInBytes();
        if (newSize < oldSize)
            throw new KafkaException(String.format(
                    "Size of FileRecords %s has been truncated during write: old size %d, new size %d",
                    file.getAbsolutePath(), oldSize, newSize));

        long position = start + offset;
        long count = Math.min(length, oldSize - offset);
        return destChannel.transferFrom(channel, position, count);
    }

@Override
    public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
        return fileChannel.transferTo(position, count, socketChannel);
    }

fileChannel.transferTo() 方法直接將當前通道內容傳輸到另一個通道,沒有涉及到 Buffer 的任何操作,NIO 中的 Buffer 是 JVM 堆或者堆外內存,但不論如何他們都是操作系統內核空間的內存。

也就是說這種方式不會有內核緩衝區到用戶緩衝區的讀寫問題

transferTo() 的實現方式就是通過系統調用 sendfile()(當然這是 Linux 中的系統調用)

相當於直接把請求數據的 ByteBuffer(內核態,數據還沒複製到用戶態) 通過 FileChannel 不用用戶態和內核態相互之間的複製,直接轉到 socketChannel

Kafka 的數據傳輸通過 TransportLayer 來完成,其子類 PlaintextTransportLayer 通過 Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法實現零拷貝

Kafka 爲啥達到 100Wtps 高性能?

Kafka 之 重要原因就是它採用了零拷貝(Zero-copy)技術 + 順序寫。

Kafka 採用了兩種零拷貝技術來提高性能:mmap 零拷貝 和 sendfile 零拷貝 。

Kafka 採用了 順序寫 技術來提高性能:順序寫 log 文件。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/jqlyBfOWHabGBb5WefsYAA