閱讀730 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《kafka中文手冊》- 構架設計(一)

4. DESIGN 設計

4.1 Motivation 目的

We designed Kafka to be able to act as a unified platform for handling all the real-time data feeds a large company might have. To do this we had to think through a fairly broad set of use cases.

kafka被設計為大公司的實時在線數據處理提供一個統一的平台, 為達到這樣的目標, 我們對相當廣泛的用例進行考慮和衡量.

It would have to have high-throughput to support high volume event streams such as real-time log aggregation.

能具備很高的吞吐量以便支持大容積的事件流, 例如實時日誌匯總係統

It would need to deal gracefully with large data backlogs to be able to support periodic data loads from offline systems.

能非常謹慎處理大量日誌數據備份, 以便能定時從離線係統加載數據

It also meant the system would have to handle low-latency delivery to handle more traditional messaging use-cases.

係統必須有比較低的延遲分發機製, 才能支持傳統的消息係統的使用

We wanted to support partitioned, distributed, real-time processing of these feeds to create new, derived feeds. This motivated our partitioning and consumer model.

希望能夠支持可分區的, 分布式的, 實時的數據反饋處理, 並創建和分發新的反饋.

Finally in cases where the stream is fed into other data systems for serving, we knew the system would have to be able to guarantee fault-tolerance in the presence of machine failures.

最後, 如果流是反饋給其他係統的, 係統需要能在機器宕機的時候提供容錯保障.

Supporting these uses led us to a design with a number of unique elements, more akin to a database log than a traditional messaging system. We will outline some elements of the design in the following sections.

為了支持這些使用情景, 我們需要設計一個更類似於數據庫日誌係統, 而不是傳統的消息係統那樣, 具有更多獨特特性的係統

4.2 Persistence 存儲

Don’t fear the filesystem! 不要對文件係統感到恐懼

Kafka relies heavily on the filesystem for storing and caching messages. There is a general perception that “disks are slow” which makes people skeptical that a persistent structure can offer competitive performance. In fact disks are both much slower and much faster than people expect depending on how they are used; and a properly designed disk structure can often be as fast as the network.

kafka很依賴於底層的文件係統用於保存和緩存消息記錄, 一種普遍的觀念是磁盤很慢, 大家都會懷疑kafka的存儲結構是否能提供有競爭力的存儲性能呢. 但是, 實際上磁盤比人們現象中的還快, 這就看你怎麼用了. 一個合理設計的磁盤存儲結構, 往往可以和網絡一樣快

The key fact about disk performance is that the throughput of hard drives has been diverging from the latency of a disk seek for the last decade. As a result the performance of linear writes on a JBOD configuration with six 7200rpm SATA RAID-5 array is about 600MB/sec but the performance of random writes is only about 100k/sec—a difference of over 6000X. These linear reads and writes are the most predictable of all usage patterns, and are heavily optimized by the operating system. A modern operating system provides read-ahead and write-behind techniques that prefetch data in large block multiples and group smaller logical writes into large physical writes. A further discussion of this issue can be found in this ACM Queue article; they actually find that sequential disk access can in some cases be faster than random memory access!

關於磁盤性能的關鍵事實是,硬盤驅動器的吞吐量在過去十年時, 磁道的尋址延遲就已經達到了極限了。使用jaod方式配置6個7200rpm SATA RAID-5 組的磁盤陣列大概是 600MB/sec, 但是隨即寫性能隻有100k/sec, 差距是6000X萬倍, 線性讀寫在使用上是最容易預測的方式, 所以大部分操作係統都對這方麵做了很多優化措施. 現在的操作係統, 都有提前讀和緩存寫的技術, 從大的數據塊中批量讀取數據, 並匯總小的邏輯寫請求後, 使用一次大的物理寫請求代替. 跟多關於這方麵的套路可以查看 ACM Queue article 這裏, 它指出這一的一個事實, 順序寫在某些情況下比隨機的內存讀取還要快

To compensate for this performance divergence, modern operating systems have become increasingly aggressive in their use of main memory for disk caching. A modern OS will happily divert all free memory to disk caching with little performance penalty when the memory is reclaimed. All disk reads and writes will go through this unified cache. This feature cannot easily be turned off without using direct I/O, so even if a process maintains an in-process cache of the data, this data will likely be duplicated in OS pagecache, effectively storing everything twice.

為了彌補這種性能上的差距, 現代操作係統, 更多使用內存來為磁盤做緩存. 現代的操作係統更樂意使用所有的空閑內存為磁盤做緩存, 在內存的回收上隻需要花費極小的代價. 所有的磁盤讀寫都通過統一的緩存. 如果沒有使用direct I/O這個開關, 這種特性不會很容易被屏蔽掉. 因此,即使一個進程內部獨立維持一個數據緩存, 那麼數據也有可能在係統頁中再被緩存一次, 所有的數據都會被存儲兩次

Furthermore, we are building on top of the JVM, and anyone who has spent any time with Java memory usage knows two things:

此外, 我們基於jvm上麵構建應用, 有花費時間在java內存上的人都知道兩件事

  1. The memory overhead of objects is very high, often doubling the size of the data stored (or worse). 內存中存有大量的對象需要消耗很高, 經常是雙倍於存儲到磁盤時大小(可能更多)
  2. Java garbage collection becomes increasingly fiddly and slow as the in-heap data increases. java的垃圾收集器在內存數據增加是變得很煩瑣的, 很慢

As a result of these factors using the filesystem and relying on pagecache is superior to maintaining an in-memory cache or other structure—we at least double the available cache by having automatic access to all free memory, and likely double again by storing a compact byte structure rather than individual objects. Doing so will result in a cache of up to 28-30GB on a 32GB machine without GC penalties. Furthermore, this cache will stay warm even if the service is restarted, whereas the in-process cache will need to be rebuilt in memory (which for a 10GB cache may take 10 minutes) or else it will need to start with a completely cold cache (which likely means terrible initial performance). This also greatly simplifies the code as all logic for maintaining coherency between the cache and filesystem is now in the OS, which tends to do so more efficiently and more correctly than one-off in-process attempts. If your disk usage favors linear reads then read-ahead is effectively pre-populating this cache with useful data on each disk read.

考慮到這些因素, 使用文件係統並使用頁緩存機製比自己去進行內存緩存或使用其他存儲結構更為有效–我們訪問內存的時候已經起碼至少訪問了兩次緩存, 很有可能在寫字節的時候也是兩次存儲而非單次. 這樣做的話, 在一個緩存達到32GB的機器上, 可以減少GC的代價, 這樣也可以減少代碼在維護緩存和係統文件間的一致性, 比再嚐試新的方法有更高的正確行. 如果你對磁盤的使用充分利用到線性讀, 那麼預取機製將會很有效的在每次磁盤讀取時實現填充好緩存空間.

This suggests a design which is very simple: rather than maintain as much as possible in-memory and flush it all out to the filesystem in a panic when we run out of space, we invert that. All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel’s pagecache.

這意味設計非常簡單, 係統不是更多把數據保存到內存空間, 在內存空間耗盡時才趕緊寫入到文件係統中, 相反的, 所有的數據都被馬上寫入到文件係統的日誌文件中, 但沒有必要馬上進行flush磁盤操作. 隻是把數據傳輸到係統內核的頁麵空間中去了.

This style of pagecache-centric design is described in an article on the design of Varnish here (along with a healthy dose of arrogance).

這種頁麵緩存風格設計可以參考這裏 : article

Constant Time Suffices 常量耗時需求

The persistent data structure used in messaging systems are often a per-consumer queue with an associated BTree or other general-purpose random access data structures to maintain metadata about messages. BTrees are the most versatile data structure available, and make it possible to support a wide variety of transactional and non-transactional semantics in the messaging system. They do come with a fairly high cost, though: Btree operations are O(log N). Normally O(log N) is considered essentially equivalent to constant time, but this is not true for disk operations. Disk seeks come at 10 ms a pop, and each disk can do only one seek at a time so parallelism is limited. Hence even a handful of disk seeks leads to very high overhead. Since storage systems mix very fast cached operations with very slow physical disk operations, the observed performance of tree structures is often superlinear as data increases with fixed cache–i.e. doubling your data makes things much worse than twice as slow.

在消息係統中, 大部分持久化的數據結構通常使用一個消費者隊列一個btree結構, 或其他隨機讀取的數據結構用於維持消息的元數據信息. btree結構是最通用的數據結構類型, 它在消息係統中, 能夠支廣泛的事物或非事物的語義. 雖然btree操作的代價是 O(log N), 但是實際使用時消耗的代價卻很高. 通常O(log N) 被認為是消耗常量時間, 但是這個對硬盤操作卻不是這樣, 硬盤尋址需要使用10ms的耗時, 每次請求隻能做一次硬盤尋址, 不能並發執行. 所以即使少數的幾次硬盤尋址也會有很高的負載, 因為存儲係統混合和快速緩存操作和慢速的物理磁盤操作, btree樹的性能一般逼近與緩存到硬盤裏麵的數據大小, 當數據量加倍時, 效率可能下降一半, 或更慢.

Intuitively a persistent queue could be built on simple reads and appends to files as is commonly the case with logging solutions. This structure has the advantage that all operations are O(1) and reads do not block writes or each other. This has obvious performance advantages since the performance is completely decoupled from the data size—one server can now take full advantage of a number of cheap, low-rotational speed 1+TB SATA drives. Though they have poor seek performance, these drives have acceptable performance for large reads and writes and come at 1/3 the price and 3x the capacity.

直覺來看, 一個持久化隊列可以使用簡單的讀和追加數據到文件的日誌方式進行實現, 這種結構有一個好處是, 所有操作都是O(1)性能的, 而且讀和寫入數據不會相互阻塞, 這樣性能和數據的大小完全無關, 一台服務器可以完全充分利用了廉價, 低速的1+TB SATA 硬盤, 雖然它們的尋道性能不高, 但是他們以3分之一的價格和3倍的容量接受大量的讀寫請求

Having access to virtually unlimited disk space without any performance penalty means that we can provide some features not usually found in a messaging system. For example, in Kafka, instead of attempting to delete messages as soon as they are consumed, we can retain messages for a relatively long period (say a week). This leads to a great deal of flexibility for consumers, as we will describe.

能夠以微小地 性能代價存取數據到無限的硬盤中, 這意味著我們可以提供一些其他消息係統沒有的特性. 例如, 在kafka中, 不需要在消費者消費了數據後馬上把消息從隊列中刪除掉, 相反的我們可以保留一段很長的時間, 例如一個禮拜. 這對消費者來說提供了很大的靈活性, 下麵我們就會講到

4.3 Efficiency 效率

We have put significant effort into efficiency. One of our primary use cases is handling web activity data, which is very high volume: each page view may generate dozens of writes. Furthermore, we assume each message published is read by at least one consumer (often many), hence we strive to make consumption as cheap as possible.

我們在效率上投入了眾多的努力, 一個我們的主要用例是具有大吞吐量的web活動日誌, 每頁麵的每次訪問都會產生好幾十次的寫, 進一步, 我們假定每次消息發布, 至少會被一個消費者讀取(經常情況下是多個消費者), 因此, 我們努力使消費消息的代價盡可能小.

We have also found, from experience building and running a number of similar systems, that efficiency is a key to effective multi-tenant operations. If the downstream infrastructure service can easily become a bottleneck due to a small bump in usage by the application, such small changes will often create problems. By being very fast we help ensure that the application will tip-over under load before the infrastructure. This is particularly important when trying to run a centralized service that supports dozens or hundreds of applications on a centralized cluster as changes in usage patterns are a near-daily occurrence.

從構建一些相識的係統的經驗中, 我們也發現, 有效的多租戶操作是提升性能的關鍵. 下遊的基礎服務很容易由於程序的很小的使用錯誤成為瓶頸, 例如, 一些小的變化很常導致一些新的問題, 我們可以非常快速在程序發布到基礎平台前, 進行迭代測試, 這對需要在集中式的集群裏跑幾十個, 幾千個應用時, 程序每天都在變動時非常有用.

We discussed disk efficiency in the previous section. Once poor disk access patterns have been eliminated, there are two common causes of inefficiency in this type of system: too many small I/O operations, and excessive byte copying.

前麵一個章節我們討論了磁盤的性能, 沒有效率的磁盤訪問模式就忽略不說了, 這裏在係統上還有兩個可能會導致效率低下的地方: 很多小的I/O操作和過多的字節拷貝

The small I/O problem happens both between the client and the server and in the server’s own persistent operations.

小I/O問題在客戶端和服務器端都會發生, 在服務器端有它自己的存儲操作

To avoid this, our protocol is built around a “message set” abstraction that naturally groups messages together. This allows network requests to group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time. The server in turn appends chunks of messages to its log in one go, and the consumer fetches large linear chunks at a time.

為了避免這個問題, 我們的通訊協議這是基於消息集合這個概念構建的, 很容易把多個消息組合起來. 這樣允許網絡組合消息後進行發送, 而不是每次發送一條信息, 減少網絡的來回開銷. 服務器也是每次寫入一堆數據到日誌中, 消費者也是每次線性讀取一堆數據

This simple optimization produces orders of magnitude speed up. Batching leads to larger network packets, larger sequential disk operations, contiguous memory blocks, and so on, all of which allows Kafka to turn a bursty stream of random message writes into linear writes that flow to the consumers.

這種簡單的優化可以提升大量的性能, 批量處理導致大的網絡數據包, 大的磁盤順序讀寫, 連續的內存塊等等, 所有的這些能把kafka的間接性的隨機消息寫改成線性寫入後, 發送給消費者

The other inefficiency is in byte copying. At low message rates this is not an issue, but under load the impact is significant. To avoid this we employ a standardized binary message format that is shared by the producer, the broker, and the consumer (so data chunks can be transferred without modification between them).

另外一個低效率的地方是字節拷貝。在消息吞吐量不多的時候這不是一個問題,但在高負載下的影響是非常顯著。為了避免這種情況,我們在生產者、服務器和消費者間使用一個標準化的二進製消息格式(這樣數據塊可以在它們之間直接進行傳輸而不需要再做修改)。

The message log maintained by the broker is itself just a directory of files, each populated by a sequence of message sets that have been written to disk in the same format used by the producer and consumer. Maintaining this common format allows optimization of the most important operation: network transfer of persistent log chunks. Modern unix operating systems offer a highly optimized code path for transferring data out of pagecache to a socket; in Linux this is done with the sendfile system call.

服務器端使用文件的形式維護消息日誌, 所有的消息都按提供者和消費者使用的格式順序寫入到磁盤中, 維護這樣的格式需要優化最常用的一些操作: 對持久日誌塊的網絡傳輸. 現在的unix操作係列通常都有提供高效的優化代碼直接把數據從緩存頁發送到socket, 在linux下使用sendfile的係統調用

To understand the impact of sendfile, it is important to understand the common data path for transfer of data from file to socket: 如果要理解一下sendfile調用的功效, 需要了解下正常情況下數據從文件發送到socket的過程

  1. The operating system reads data from the disk into pagecache in kernel space  操作係統從磁盤讀取數據到係統內核空間的緩存頁中
  2. The application reads the data from kernel space into a user-space buffer 應用從內核空間讀取數據到用戶空間緩衝區中
  3. The application writes the data back into kernel space into a socket buffer 應用從把數據寫回到內核空間的socket緩衝區中
  4. The operating system copies the data from the socket buffer to the NIC buffer where it is sent over the network 係統拷貝socket緩衝區的數據到網卡緩衝區, 然後由網卡發送數據到網絡中

This is clearly inefficient, there are four copies and two system calls. Using sendfile, this re-copying is avoided by allowing the OS to send the data from pagecache to the network directly. So in this optimized path, only the final copy to the NIC buffer is needed.

這很明顯很沒效率, 有4次拷貝還有2次係統調用, 如果使用sendfile命令, 重新拷貝運行係統直接把數據從緩存頁拷貝到網絡, 優化後, 最終隻需要一次從緩存頁到網卡緩衝區拷貝

We expect a common use case to be multiple consumers on a topic. Using the zero-copy optimization above, data is copied into pagecache exactly once and reused on each consumption instead of being stored in memory and copied out to kernel space every time it is read. This allows messages to be consumed at a rate that approaches the limit of the network connection.

我們預期一個常見消費方式是使用多個消費者同時消費一個主題, 使用上麵提到的zero-copy的優化方式, 數據隻被拷貝到頁緩存一次, 並被多次消費, 而不是緩存到(用戶空間的)內存中, 然後在每次消費時拷貝到係統內核空間中. 這可以使消費者消費消息的速度達到網絡連接的速度.

This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache.

組合頁緩存和sendfile機製後, kafka集群在跟上消費者消費的同時, 讓你覺得好像沒有多少的磁盤讀活動, 因為大部分的數據響應需求都是從緩存獲取的.

For more background on the sendfile and zero-copy support in Java, see this article.

如果想要知道更多關於java對sendfile和zero-copy的支持, 可以閱讀這篇文章 article.

End-to-end Batch Compression 端到端的數據壓縮

In some cases the bottleneck is actually not CPU or disk but network bandwidth. This is particularly true for a data pipeline that needs to send messages between data centers over a wide-area network. Of course, the user can always compress its messages one at a time without any support needed from Kafka, but this can lead to very poor compression ratios as much of the redundancy is due to repetition between messages of the same type (e.g. field names in JSON or user agents in web logs or common string values). Efficient compression requires compressing multiple messages together rather than compressing each message individually.

在大部分情況下, 瓶頸不會是cpu或磁盤, 而是網絡帶寬. 這個在數據中心之間建立需要跨越廣域網發送消息的數據管道時更為明顯, 當然用戶可以獨立於kafka自己做消息壓縮, 但是這有可能由於消息類型冗餘, 導致壓縮比例很低(例如, json的字段名, 或web中的用戶代理日誌, 或常用的字符串值), 有效的壓縮方式應該是允許壓縮重複的消息, 而不是分別壓縮單個消息

Kafka supports this by allowing recursive message sets. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will remain compressed in the log and will only be decompressed by the consumer.

kafka通過遞歸的消息集合支持這樣的操作. 一批的消息可以被收集在一起後壓縮, 並發送到服務器端. 這樣被壓縮的一批數據, 在日誌也是使用壓縮的格式, 隻有在消費者消費的時候才會被解壓

Kafka supports GZIP, Snappy and LZ4 compression protocols. More details on compression can be found here.

kafka支持 GZIP, Snappy and LZ4 壓縮協議, 更多關於壓縮的細節可以查看這裏 here.

4.4 The Producer 發布者

Load balancing 負載均衡

The producer sends data directly to the broker that is the leader for the partition without any intervening routing tier. To help the producer do this all Kafka nodes can answer a request for metadata about which servers are alive and where the leaders for the partitions of a topic are at any given time to allow the producer to appropriately direct its requests. 為了讓生產者實現這個功能, 所有的kafka服務器節點都能響應這樣的元數據請求: 哪些服務器是活著的, 主題的哪些分區是主分區, 分配在哪個服務器上, 這樣提供者就能適當地直接發送它的請求到服務器上.

生產者之間發送數據到主分區的服務器上, 不需要經過任何中間路由.

The client controls which partition it publishes messages to. This can be done at random, implementing a kind of random load balancing, or it can be done by some semantic partitioning function. We expose the interface for semantic partitioning by allowing the user to specify a key to partition by and using this to hash to a partition (there is also an option to override the partition function if need be). For example if the key chosen was a user id then all data for a given user would be sent to the same partition. This in turn will allow consumers to make locality assumptions about their consumption. This style of partitioning is explicitly designed to allow locality-sensitive processing in consumers.

客戶端控製消息發送數據到哪個分區,  這個可以實現隨機的負載均衡方式. 或者使用一些特定語義的分區函數, 我們有提供特定分區的接口讓用於根據指定的鍵值進行hash分區(當然也有選項可以重寫分區函數), 例如, 如果鍵值使用用戶ID, 則用戶相關的所有數據都會被分發到同一個分區上. 這允許消費者, 在消費數據時做一些特定的本地化處理. 這樣的分區風格經常被設計用於一些本地處理比較敏感的消費者

Asynchronous send 異步發送

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.

批處理是提升性能的一個主要驅動, 為了允許批量處理, kafka提供者會嚐試在內存中匯總數據, 並用一次請求批次提交信息. 批處理, 不僅僅可以配置指定的消息數量, 也可以指定等待特定的延遲時間(如64k 或10ms), 這允許匯總更多的數據後再發送, 在服務器端也會減少更多的IO操作. 該緩衝是可配置的,並給出了一個機製,通過權衡少量額外的延遲時間獲取更好的吞吐量.

Details on configuration and the api for the producer can be found elsewhere in the documentation.

更多的細節信息可以在提供者的 configuration 和 api 這裏找到.

4.5 The Consumer 訂閱者

The Kafka consumer works by issuing “fetch” requests to the brokers leading the partitions it wants to consume. The consumer specifies its offset in the log with each request and receives back a chunk of log beginning from that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.
消費者通過從主分區的服務器獲取數據進行消費. 消費者指定每次請求時日誌的偏移量, 然後從這個位置開啟批量獲取數據. 消費者對位移量有絕對的控製權, 這樣消費者可以重新設置位移位置, 並在有需要的時重新消費.

Push vs. pull 推送vs拉取

An initial question we considered is whether consumers should pull data from brokers or brokers should push data to the consumer. In this respect Kafka follows a more traditional design, shared by most messaging systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer. Some logging-centric systems, such as Scribe and Apache Flume, follow a very different push-based path where data is pushed downstream. There are pros and cons to both approaches. However, a push-based system has difficulty dealing with diverse consumers as the broker controls the rate at which data is transferred. The goal is generally for the consumer to be able to consume at the maximum possible rate; unfortunately, in a push system this means the consumer tends to be overwhelmed when its rate of consumption falls below the rate of production (a denial of service attack, in essence). A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can. This can be mitigated with some kind of backoff protocol by which the consumer can indicate it is overwhelmed, but getting the rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it seems. Previous attempts at building systems in this fashion led us to go with a more traditional pull model.

一個基本的問題是, 我們在考慮, 消費者是否主動從服務器那裏拉去數據, 還是服務器應該主動推送數據到消費者端. 在這方麵, kafka和傳統的消息吸引設計一樣, 生產者推送消息到服務器, 消費者從服務器拉去消息. 在一些日誌中心係統, 像 Scribe and Apache Flume, 使用一種特殊的推送流數據推送機製, 這些方式都有利有弊, 但是, 在一個基於推送方式消息係統, 很難處理大量的消費者, 因為服務器需要控製數據的傳輸速率. 目標是為了讓消費者盡可能多消費數據;不幸的是,在一個推送係統,這意味著消費者往往被消息淹沒,如果消費率低於生產速度(例如密集的服務攻擊). 基於拉去的係統往往比較優雅些, 消息處理隻是落後, 消費者在後麵盡可能趕上.

Another advantage of a pull-based system is that it lends itself to aggressive batching of data sent to the consumer. A push-based system must choose to either send a request immediately or accumulate more data and then send it later without knowledge of whether the downstream consumer will be able to immediately process it. If tuned for low latency, this will result in sending a single message at a time only for the transfer to end up being buffered anyway, which is wasteful. A pull-based design fixes this as the consumer always pulls all available messages after its current position in the log (or up to some configurable max size). So one gets optimal batching without introducing unnecessary latency.

使用基於拉取方式的係統還有一個好處就是容易匯集批量數據後發給消費者. 基於推送的係統, 要麼馬上發送請求, 要麼匯總數據後再發送, 而不光下遊的消費者是否能夠處理得上. 如果為了進一步降低延遲, 這會導致緩存還沒有結束時就傳輸單條數據過去, 這樣很浪費. 基於拉的方式可以從當前日誌位置拉去可用的消息(或者根據配置的大小). 這樣能在沒有引入不必要的延遲的情況下, 獲取到比較好的批處理性能.

The deficiency of a naive pull-based system is that if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this we have parameters in our pull request that allow the consumer request to block in a “long poll” waiting until data arrives (and optionally waiting until a given number of bytes is available to ensure large transfer sizes).

基於拉取方式的係統不足的地方是如果沒有任何數據, 消費者就要循環檢測, 使用空輪詢的繁忙檢測方式等候數據到來.為了避免這一點,我們可以設置拉請求的參數,允許消費者請求在“長輪詢”時阻塞,直到數據到達.

You could imagine other possible designs which would be only pull, end-to-end. The producer would locally write to a local log, and brokers would pull from that with consumers pulling from them. A similar type of “store-and-forward” producer is often proposed. This is intriguing but we felt not very suitable for our target use cases which have thousands of producers. Our experience running persistent data systems at scale led us to feel that involving thousands of disks in the system across many applications would not actually make things more reliable and would be a nightmare to operate. And in practice we have found that we can run a pipeline with strong SLAs at large scale without a need for producer persistence.

你可以想象一些其他從端到端的一些可能性設計. 生產者把記錄寫入到本地日誌中, 服務器將從消費者拉取的數據中拉取. 一種類似的儲存和轉發的生產者模型經常被提議. 這雖然挺有趣的, 但不適合有成千上萬生產者的情況. 在我們大規模運行數據儲存係統的經驗來看, 成千上萬的磁盤跨越多個應用並不讓係統更為可靠, 操作起來將會是一個噩夢. 在實踐中, 我們發現可以創建具有很強壯的SLAs保障的, 大規模的管道, 並且不需要提供者有持久化能力.

Keeping track of what has been consumed is, surprisingly, one of the key performance points of a messaging system.
令人驚訝的是,跟蹤已消耗的內容是消息傳遞係統的關鍵性能點之一.。

Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or it may wait for acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single machine server it is not clear where else this state could go. Since the data structures used for storage in many messaging systems scale poorly, this is also a pragmatic choice–since the broker knows what is consumed it can immediately delete it, keeping the data size small.

大部分的消息係統在服務器端記錄哪些消息被消費的元數據信息.  那就是, 消息被發送給消費者時, 服務器要麼在本地馬上記錄日誌, 要麼等待消費者反饋後記錄. 這樣的話相當不直觀, 事實上,對於一台服務器, 很難理清楚這個狀態到底去哪裏了. 因為在大部分的消息儲存係統中, 數據結構很難被擴展, 這也依賴於編程的語義, 如果服務器知道消息被消費後可以馬上刪除, 那麼就可以維持比較小的數據集.

What is perhaps not obvious is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out or whatever) that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature which means that messages are only marked as sent not consumed when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems. First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice. The second problem is around performance, now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed). Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged.

碰巧不太明顯的是, 讓服務器和消費者對已經消費的數據達成一致並不是一件簡單的事情. 如果服務器在每次數據分發出去後, 馬上標記消息已經被消費了, 如果消費者處理消息失敗了(例如宕機了), 那麼消息可能會丟失. 為了解決這個問題, 很多消息係統添加了反饋機製, 用於標記消息已經被發送, 而不是被消費, 服務器等待消費者發送一個反饋來確認消息已經 被消費. 這個策略解決消息丟失的問題, 但是同時也引發新的問題. 首先, 如果消費者已經消費了記錄, 但是在反饋時失敗, 則有可能重複消費兩次. 其次, 是多一個來回的性能損耗, 現在服務器就要為每個消息保存不同的狀態(先鎖定, 這樣不會發送第二次, 然後標記為永久消費後, 才能把它刪除). 還有些麻煩的問題需要處理, 比如消息被發送 了, 但是從來沒有接受到反饋.

Kafka handles this differently. Our topic is divided into a set of totally ordered partitions, each of which is consumed by exactly one consumer within each subscribing consumer group at any given time. This means that the position of a consumer in each partition is just a single integer, the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap.

kafka使用不一樣的處理方式, 主題被劃分成一係列有序的分區集合, 每個分區在一個時刻僅被訂閱分組中的一個消費者消費. 這意味這每個消費者在一個分區位置就隻是一個數值, 用於記錄下一次消息要被消費的位置. 這意味著記錄消費者狀態的代價非常小, 隻是每個分區一個數值. 這個狀態可以定期做檢查點, 這使等價的消息反饋代價非常小.

There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.

這個方案還有另外的好處, 消費者可以優雅地重新指定一個舊的位移位置, 並重新消費數據. 這個和通常的隊列觀念有點相悖, 但是對很多消費者來說是一個很重要的特性. 例如,如果消費代碼有bug,並且在一些消息被消費後發現,一旦bug被修複,消費者可以重新使用這些消息.。

Offline Data Load 離線數據加載

Scalable persistence allows for the possibility of consumers that only periodically consume such as batch data loads that periodically bulk-load data into an offline system such as Hadoop or a relational data warehouse.
可擴展的持久性儲存能力, 使得消費者能定期批量把數據導入到離線係統中, 如:Hadoop 或關係型數據倉庫.

In the case of Hadoop we parallelize the data load by splitting the load over individual map tasks, one for each node/topic/partition combination, allowing full parallelism in the loading. Hadoop provides the task management, and tasks which fail can restart without danger of duplicate data—they simply restart from their original position.

在hadoop的例子中, 我們通過把數據分發到獨立的任務集中進行並行處理, 每個的單位是按服務器/主題/分區, 這樣可以允許很好的並發數據加載處理. Hadoop 提供任務管理, 任務可以在失敗是重新啟動, 而不用擔心會重複處理數據–隻需要簡單從他們原來處理的位置重新開始.

轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 10:24:32

  上一篇:go  《kafka中文手冊》- 構架設計(二)
  下一篇:go  《OSGi官方文檔》使用OSGi的好處