閱讀971 返回首頁    go 阿裏雲 go 技術社區[雲棲]


kudu

Kudu White Paper

https://www.cloudera.com/documentation/betas/kudu/0-5-0/topics/kudu_resources.html

https://getkudu.io/overview.html

 

Kudu is a new storage system designed and implemented from the ground up to fill this gap between high-throughput sequential-access storage systems such as HDFS[27] and low-latency random-access systems such as HBase or Cassandra.

Kudu的定位很明確,就是要填補高吞吐的順序讀與低延遲的隨機讀之間的gap,即,要balance,滿足對兩方麵都有需求的業務;

 

In particular, Kudu o ers a simple API for row-level inserts, updates, and deletes, while providing table scans at throughputs similar to Parquet, a commonly-used columnar format for static data.

什麼是Parquet,一種支持嵌套的列存格式實現,基於Dremel的算法

深入分析Parquet列式存儲格式

Kudu即提供簡單的基於行的讀寫API,也提供基於列存的掃表的支持

 

Kudu at a high level

 

Tables and schemas

Kudu is a storage system for tables of structured data.

A Kudu cluster may have any number of tables, each of which has a well-defined schema consisting of a finite number of columns.

Each such column has a name, type (e.g INT32 or STRING) and optional nullability.

Some ordered subset of those columns are specified to be the table's primary key.

The primary key enforces a uniqueness constraint (at most one row may have a given primary key tuple) and acts as the sole index by which rows may be effieciently updated or deleted.

上麵這些等同於關係型數據庫,除了主鍵作為唯一索引,因為kudu不支持二級索引

 

Unlike most relational databases, Kudu does not currently offer secondary indexes or uniqueness constraints other than the primary key.

Currently, Kudu requires that every table has a primary key defined, though we anticipate that a future version will add automatic generation of surrogate keys.

不支持二級索引,必須要定義主鍵

 

Write operations

After creating a table, the user mutates the table using Insert, Update, and Delete APIs.

Currently, Kudu does not offer any multi-row transactional APIs: each mutation conceptually executes as its own transaction, despite being automatically batched with other mutations for better performance. 
Modifications within a single row are always executed atomically across columns.

寫操作,不支持多行transactions;保證單行的操作原子性

 

Read operations

Kudu offers only a Scan operation to retrieve data from a table. 
On a scan, the user may add any number of predicates to filter the results. 
Currently, we offer only two types of predicates: comparisons between a column and a constant value, and composite primary key ranges.

kudu提供Scan來掃表,用戶可以增加任意個數的謂詞來過濾結果,當前隻支持兩種謂詞,將column和常量比較,或組合的組件range

In addition to applying predicates, the user may specify a projection for a scan. 
A projection consists of a subset of columns to be retrieved. 
Because Kudu's on-disk storage is columnar, specifying such a subset can substantially improve performance for typical analytic workloads.

除了謂詞,還能使用projection,因為kudu是列存,所以進行projection會大大提高效率

 

Other APIs

In addition to data path APIs, the Kudu client library offers other useful functionality. 
In particular, the Hadoop ecosystem gains much of its performance by scheduling for data locality. 
Kudu provides APIs for callers to determine the mapping of data ranges to particular servers to aid distributed execution frameworks such as Spark, MapReduce, or Impala in scheduling.

kudu還提供其它的API,比如提供mapping of data ranges to particular servers,來便於locality

 

Consistency Model

Kudu provides clients the choice between two consistency modes. 
The default consistency mode is snapshot consistency
A scan is guaranteed to yield a snapshot with no anomalies in which causality would be violated. 
As such, it also guarantees read-your-writes consistency from a single client.

snapshot consistency,保證單個client的讀寫一致性,即,單個client,讀寫是可以保證順序和因果關係的

By default, Kudu does not provide an external consistency guarantee.

比如,1完成寫入a,發送消息給2,2收到消息後寫入b,從c看來,隻看到b,而沒有看到a

雖然a在時序上是先的,但由於在不同的client,所以是無法保證這種外部一致性的

Kudu offers the option to manually propagate timestamps between clients: 
after performing a write, the user may ask the client library for a timestamp token. 
This token may be propagated to another client through the external channel, and passed to the Kudu API on the other side, thus preserving the causal relationship between writes made across the two clients.

kudu提供一種可選的手工的在clients之間傳播timestamps的方式:

執行寫操作時,用戶會向client library要一個timestamp token,這個token會被附帶的通過外部的channel傳遞給另一個client,另一個client在調用Kudu API的時候,會帶上這個token,這樣kudu就知道兩個寫操作之間的先後順序

If propagating tokens is too complex, Kudu optionally uses commit-wait as in Spanner[14]. 
After performing a write with commit-wait enabled, the client may be delayed for a period of time to ensure that any later write will be causally ordered correctly. Absent specialized time-keeping hardware, this can introduce signi cant latencies in writes (100-1000ms with default NTP con gurations), so we anticipate that a minority of users will take advantage of this option.

如果嫌propagating tokens太麻煩,kudu可以使用類似spanner的commit-wait,但是由於沒有專門的硬件,在使用NTP的條件下,寫延遲會達到100-1000ms,估計沒啥人用

Given this, it is plausible that within a few years, cloud providers will offer tight global time synchronization as a differentiating service.

The assignment of operation timestamps is based on a clock algorithm termed HybridTime[15]. 
參考,HybridTime - Accessible Global Consistency with High Clock Uncertainty

 

Timestamps

Although Kudu uses timestamps internally to implement concurrency control, Kudu does not allow the user to manually set the timestamp of a write operation. This differs from systems such as Cassandra and HBase, which treat the timestamp of a cell as a first-class part of the data model.

We do, however, allow the user to specify a timestamp for a read operation. 
This allows the user to perform point-in-time queries in the past, as well as to ensure that different distributed tasks that together make up a single query" (e.g. as in Spark or Impala) read a consistent snapshot.

kudu將timestamps用於實現並發控製,所以在寫操作的時候,不允許用戶手工set timestamp

但在讀操作中,用戶可以指定timestamp,來讓用戶執行point-in-time queries;

 

Architecture

Following the design of BigTable and GFS[18] (and their open-source analogues HBase and HDFS), Kudu relies on a single Master server, responsible for metadata, and an arbitrary number of Tablet Servers, responsible for data.

 

Partitioning

As in most distributed database systems, tables in Kudu are horizontally partitioned. Kudu, like BigTable, calls these horizontal partitions tablets.

Any row may be mapped to exactly one tablet based on the value of its primary key, thus ensuring that random access operations such as inserts or updates 
affect only a single tablet.

For large tables where throughputis important, we recommend on the order of 10-100 tablets per machine. Each tabletcan be tens of gigabytes.

 

Unlike BigTable, which offers only key-range-based partitioning, and unlike Cassandra, which is nearly always deployed with hash-based partitioning, Kudu supports a flexible array of partitioning schemes.

kudu的partition策略比較flexible,可以同時兼顧hash-based和key-range-based

When creating a table, the user specifies a partition schema for that table. 
The partition schema acts as a function which can map from a primary key tuple into a binary partition key.

當創建一個table,我們指定一個partition schema,即將primary key轉換成partition key的function

Each tablet covers a contiguous range of these partition keys. Thus, a client, when performing a read or write, can easily determine which tablet should hold the given key and route the request accordingly.

每個tablet都會cover一個連續的partitions key的range,所以client在執行讀寫的時候,可以很容易找到到底要去哪個tablet去讀

The partition schema is made up of zero or more hash-partitioning rules followed by an optional range-partitioning rule:

hash-partitioning rule consists of a subset of the primary key columns and a number of buckets. 
For example, as expressed in our SQL dialect, DISTRIBUTE BY HASH(hostname, ts) INTO 16 BUCKETS. 
These rules convert tuples into binary keys by first concatenating the values of the specified columns, and then computing the hash code of the resulting string modulo the requested number of buckets. This resulting bucket number is encoded as a 32-bit big-endian integer in the resulting partition key.

range-partitioning rule consists of an ordered subset of the primary key columns. 
This rule maps tuples into binary strings by concatenating the values of the specified columns using an order-preserving encoding.

Partition schema可以由0+個hash-partitioning rules和可選的range-partitioning rule組成

 

By employing these partitioning rules, users can easily trade off between query parallelism and query concurrency based on their particular workload.

For example, consider a time series application which stores rows of the form (host, metric, time, value) and in which inserts are almost always done with monotonically increasing time values. Choosing to hash-partition by timestamp optimally spreads the insert load across all servers; however, a query for a specific metric on a specific host during a short time range must scan all tablets, limiting concurrency.

如果隻是用hash-partition,無法應對short time range的查詢

A user might instead choose to range-partition by timestamp while adding separate hash partitioning rules for the metric name and hostname, which 
would provide a good trade-off of parallelism on write and concurrency on read.

做法是選擇對於timestamp進行range-partition,並且對metric name and hostname增加hash partitioning

Although this exibility in partitioning is relatively unique in the NoSQL space, it should be quite familiar to users and administrators of analytic MPP database 
management systems.

 

Replication

In order to provide high availability and durability while running on large commodity clusters, Kudu replicates all of its table data across multiple machines.

Kudu employs the Raft consensus algorithm to replicate its tablets. 
In particular, Kudu uses Raft to agree upon a logical log of operations (e.g. insert/update/delete) for each tablet.

When a client wishes to perform a write, it first locates the leader replica (see Section 3.4.3) and sends a Write RPC to this replica.

If the client's information was stale and the replica is no longer the leader, it rejects the request, causing the client to invalidate and refresh its metadata cache and resend the request to the new leader.

If the replica is in fact still acting as the leader, it employs a local lock manager to serialize the operation against other concurrent operations, picks an MVCC timestamp, and proposes the operation via Raft to its followers. 
If a majority of replicas accept the write and log it to their own local write-ahead logs, the write is considered durably replicated and thus can be committed on 
all replicas.

kudu提供多複本機製,並且通過Raft來保證複本之間的一致性,特別是用於同步複本間operations (e.g. insert/update/delete)的邏輯日誌

client想要寫,會先找到leader replica,然後發起一個寫RPC;如果client meta過期,leader已經遷移,那麼需要更新meta data,並把寫請求發給新的leader

然後leader replica,會用local的鎖管理,來保證concurrent operations的順序執行,並pick一個MVCC時間戳,並通過Raft將operations發送給followers

如果大部分replicas接受該寫操作,並寫入他們自己的local write-ahead logs,就認為這次寫操作被認為durably replicated,並且commit這次寫操作

 

In the case of a failure of a minority of replicas, the leader can continue to propose and commit operations to the tablet's replicated log. 
If the leader itself fails, the Raft algorithm quickly elects a new leader. By default, Kudu uses a 500-millisecond heartbeat interval and a 1500-millisecond election timeout; thus, after a leader fails, a new leader is typically elected within a few seconds.

如果少量的replicas失敗,leader還可以繼續propose或commit操作;並且如果leader fail,Raft算法會重新選出一個新的leader;

 

Kudu implements some minor improvements on the Raft algorithm. In particular: 
1. As proposed in [19] we employ an exponential back-off algorithm after a failed leader election. We found that, as we typically commit Raft's persistent metadata to contended hard disk drives, such an extension was necessary to ensure election convergence on busy clusters.

2. When a new leader contacts a follower whose log diverges from its own, Raft proposes marching backward one operation at a time until discovering the point where they diverged. Kudu instead immediately jumps back to the last known committedIndex, which is always guaranteed to be present on any divergent follower. This minimizes the potential number of round trips at the cost of potentially sending redundant operations over the network. 
We found this simple to implement, and it ensures that divergent operations are aborted after a single round-trip.

 

Kudu不會對tablet本身做replica,而僅僅對operation log做replica 
這樣tablet每個replica的物理存儲都是完全的decoupled,這樣有如下的好處,

這怎麼理解?

不是說tablet沒有replica,而是這些replica不會顯式同步的,是全解耦的,獨立的 
Kudu隻會去同步operation log,當大部分follower,把operation寫入operation log,就認為operation成功 
而後續每個follower都獨立根據operation log去更新每個replica

Kudu does not replicate the on-disk storage of a tablet, but rather just its operation log. 
The physical storage of each replica of a tablet is fully decoupled. This yields several advantages:

When one replica is undergoing physical-layer background operations such as flushes or compactions (see Section 4), it is unlikely that other nodes are operating on the same tablet at the same time. 
Because Raft may commit after an acknowledgment by a majority of replicas, this reduces the impact of such physical-layer operations on the tail latencies experienced by clients for writes. 
In the future, we anticipate implementing techniques such as the speculative read requests described in [16] to further decrease tail latencies for reads in concurrent read/write workloads.

我的理解,在replica做物理層的flush或compaction的時候,其他node是無法同時操作這個tablet replica的 
而operating log的同步是不會影響的,所以不會影響Raft的majority acknowledgment

During development, we discovered some rare race conditions in the physical storage layer of the Kudu tablet. 
Because the storage layer is decoupled across replicas, none of these race conditions resulted in unrecoverable data loss: 
in all cases, we were able to detect that one replica had become corrupt (or silently diverged from the majority) and repair it.

物理存儲層的race conditions或單replica crash不會導致數據丟失

 

Configuration Change

Kudu implements Raft configuration change following the one-by-one algorithm proposed in [24]. 
In this approach, the number of voters in the Raft configuration may change by at most one in each configuration change. 
In order to grow a 3-replica con guration to 5 replicas, two separate configuration changes (3->4, 4->5) must be proposed and committed.

Raft的配置是one by one提交的,比如3 replica到5 replica,需要提交兩次

Kudu implements the addition of new servers through a process called remote bootstrap
In our design, in order to add a new replica, we first add it as a new member in the Raft configuration, even before notifying the destination server that a new replica will be copied to it. 
When this configuration change has been committed, the current Raft leader replica triggers a StartRemoteBootstrap RPC, which causes the destination server to pull a snapshot of the tablet data and log from the current leader. 
When the transfer is complete, the new server opens the tablet following the same process as after a server restart. 
When the tablet has opened the tablet data and replayed any necessary write-ahead logs, it has fully replicated the state of the leader at the time it began the transfer, and may begin responding to Raft RPCs as a fully-functional replica.

如何增加一個新的replica,過程稱為remote bootstrap,做法就是common的方式,能想象到的

In our current implementation, new servers are added immediately as VOTER replicas. 
This has the disadvantage that, after moving from a 3-server configuration to a 4-server configuration, three out of the four servers must acknowledge each operation. 
Because the new server is in the process of copying, it is unable to acknowledge operations. If another server were to crash during the snapshot-transfer process, the tablet would become unavailable for writes until the remote 
bootstrap finished.

當前增加一個replica,會立刻變為Voter,這個有問題的 
比如,配置從3 replica 到4 replica,此時新的replica還沒有完成初始化,但由於replica增加,現在需要3 replica ack才能成功,如果其中任意replica crash,會導致寫失敗

 

To address this issue, we plan to implement a PRE VOTER replica state. 
In this state, the leader will send Raft updates and trigger remote bootstrap on the target replica, but not count it as a voter when calculating the size of the configuration's majority. 
Upon detecting that the PRE VOTER replica has fully caught up to the current logs, the leader will automatically propose and commit another configuration change to transition the new replica to a full VOTER.

解決的方法很簡單,就是增加PRE VOTER狀態,當replica完全ready以後,再切換成Voter狀態

 

When removing replicas from a tablet, we follow a similar approach: 
the current Raft leader proposes an operation to change the configuration to one that does not include the node to be evicted. 
If this is committed, then the remaining nodes will no longer send messages to the evicted node, though the evicted node will not know that it has been removed. 
When the configuration change is committed, the remaining nodes report the configuration change to the Master, which is responsible for cleaning up the orphaned replica (see Section 3.4.2).

如何刪除一個replica?過程也比較common

 

The Kudu Master

Kudu's central master process has several key responsibilities: 
1. Act as a catalog manager, keeping track of which tables and tablets exist, as well as their schemas, desired replication levels, and other metadata. 
When tables are created, altered, or deleted, the Master coordinates these actions across the tablets and ensures their eventual completion. 
2. Act as a cluster coordinator, keeping track of which servers in the cluster are alive and coordinating redistribution of data after server failures. 
3. Act as a tablet directory, keeping track of which tablet servers are hosting replicas of each tablet.

 

Catalog Manager

The master itself hosts a single-tablet table which is restricted from direct access by users. 
The master internally writes catalog information to this tablet, while keeping a full write through cache of the catalog in memory at all times.

master會創建一個內存中的表用於存放目錄元數據,這種設計是可能有擴展性問題的,但是當前由於元數據比較小,問題不大

The catalog table maintains a small amount of state for each table in the system. 
In particular, it keeps the current version of the table schema, the state of the table (creating, running, deleting, etc), and the set of tablets which comprise the table.

catalog table包含,table schema當前的版本,table的狀態(creating, running, deleting, etc),以及table所包含的tablets集合

The master services a request to create a table by first writing a table record to the catalog table indicating a CREATING state. 
Asynchronously, it selects tablet servers to host tablet replicas, creates the Master-side tablet metadata, and sends asynchronous requests to create the replicas on the tablet servers. 
If the replica creation fails or times out on a majority of replicas, the tablet can be safely deleted and a new tablet created with a new set of replicas.  
If the Master fails in the middle of this operation, the table record indicates that a roll-forward is necessary and the master can resume where it left off.

table創建的過程,先往catalog table裏麵寫條table record,並將狀態設為,CREATING 
然後選擇一堆tablet servers來放他的tablet replicas,於是先把tablets的metadata,存到catalog裏麵,最後異步的發送請求給各個tablet servers去創建replica 
如果某個replica失敗,隻需要重新選擇創建一個tablet 
如果在過程中master fails,由於在catalog的已經記錄下來了,後續master resume後還能繼續

A similar approach is used for other operations such as schema changes and deletion, where the Master ensures that the change is propagated to the relevant tablet servers before writing the new state to its own storage. 
In all cases, the messages from the Master to the tablet servers are designed to be idempotent, such that on a crash and restart, they can be safely resent.

其他操作的過程是大致相同,比如schema changes and deletion,但我們需要保證從Master到tablet servers的message都是idempotent的,即crash and restart後,可以反複重複發的

Because the catalog table is itself persisted in a Kudu tablet, the Master supports using Raft to replicate its persistent state to backup master processes. 
Currently, the backup masters act only as Raft followers and do not serve client requests. Upon becoming elected leader by the Raft algorithm, a backup master scans its catalog table, loads its in-memory cache, and begins acting as an active master following the same process as a master restart.

當然catalog table本身是Kudu tablet,所以也可以用Raft來保證replicas之間的一致性; 
但backup master無法處理client requests,隻有被raft選為leader,backup master才會把catalog table加載到內存,從而成為真正的master

 

Cluster Coordination

Each of the tablet servers in a Kudu cluster is statically configured with a list of host names for the Kudu masters. 
Upon startup, the tablet servers register with the Masters and proceed to send tablet reports indicating the total set of tablets which they are hosting. 
The first such tablet report contains information about all tablets. 
All future tablet reports are incremental, only containing reports for tablets that have been newly created, deleted, or modified (e.g. processed a schema change or Raft configuration change).

tablet servers是通過靜態配置一係列hosts列表,來讓master知道 
完成啟動後,tablets server會到master上完成注冊,並會定期發送他hosting的所有的tablets的report

第一次的report是全量的,後續的report會是增量的 

A critical design point of Kudu is that, while the Master is the source of truth about catalog information, it is only an observer of the dynamic cluster state. 
The tablet servers themselves are always authoritative about the location of tablet replicas, the current Raft configuration, the current schema version of a tablet, etc. 
Because tablet replicas agree on all state changes via Raft, every such change can be mapped to a specific Raft operation index in which it was committed. 
This allows the Master to ensure that all tablet state updates are idempotent and resilient to transmission delays: the Master simply compares the Raft operation index of a tablet state update and discards it if the index is not newer than the Master's current view of the world.

Kudu的一個核心的設計思路是,master是作為一個觀察者,而非一個決策者,雖然他catalog表中保存了所有的tablets的元數據 
tablets所做的所有change都是通過Raft協議來決策的,每個change都有對應到一個Raft operation index 
所以master可以保證所有tablet狀態的更新是冪等的,並可以接受傳輸延遲,因為master隻要發現Raft operation index,比master當前的狀態要老,就可以discard掉

 

This design choice leaves much responsibility in the hands of the tablet servers themselves.

For example, rather than detecting tablet server crashes from the Master, Kudu instead delegates that responsibility to the Raft LEADER replicas of any tablets with replicas on the crashed machine. 
The leader keeps track of the last time it successfully communicated with each follower, and if it has failed to communicate for a significant period of time, it declares the follower dead and proposes a Raft configuration change to evict the follower from the Raft configuration. When this configuration change is successfully committed, the remaining tablet servers will issue a tablet report to the Master to advise it of the decision made by the leader.

這樣設計會將很多責任推給tablet servers,比如,發現tablet servers crash不是由master,而是由Raft LEADER replicas,leader當發現和follower無法通信的時候,就會認為該follower dead,並通過Raft過程去修改配置,當修改成功後,通知master即可

 

In order to regain the desired replication count for the tablet, the Master selects a tablet server to host a new replica based on its global view of the cluster. 
After selecting a server, the Master suggests a configuration change to the current leader replica for the tablet. However, the Master itself is powerless to change a tablet configuration - it must wait for the leader replica to propose and commit the configuration change operation, at which point the Master is notified of the configuration change's success via a tablet report. 
If the Master's suggestion failed (e.g. because the message was lost) it will stubbornly retry periodically until successful. 
Because these operations are tagged with the unique index of the degraded configuration, they are fully idempotent and conflict-free, even if the Master issues several conflicting suggestions, as might happen soon after a master fail-over.

當一個tablet server crash,Raft LEADER完成配置更改後,Master為了保證desired replication count,需要根據globe view重新選一個tablet server 來放新replica 
但是Master本身是沒有權利去改配置的,他隻能建議Raft leader去change配置,如果leader change成功會通過report通知master 
如果失敗,它會一直嚐試,因為這個request是冪等的,所以發多次不會有問題。。。

 

The master responds similarly to extra replicas of tablets. 
If the Master receives a tablet report which indicates that a replica has been removed from a tablet configuration, it stubbornly sends DeleteTablet RPCs to the removed node until the RPC succeeds. 
To ensure eventual cleanup even in the case of a master crash, the Master also sends such RPCs in response to a tablet report which identifies that a tablet server is hosting a replica which is not in the newest committed Raft configuration.

當發現一個replica已經被刪除或多餘時,過程也是一樣的

 

Tablet Directory

In order to efficiently perform read and write operations without intermediate network hops, clients query the Master for tablet location information. 
Clients are “thick" and maintain a local metadata cache which includes their most recent information about each tablet they have previously accessed, including the tablet's partition key range and its Raft configuration. 
At any point in time, the client's cache may be stale; if the client attempts to send a write to a server which is no longer the leader for a tablet, the server will reject the request. The client then contacts the Master to learn about the new leader. In the case that the client receives a network error communicating with its presumed leader, it follows the same strategy, assuming that the tablet has likely elected a new leader.

clients會從master去同步metadata,並且作為thick client會去cache,然後就不需要訪問master,而直接訪問tablet server,當然如果metadata發生變化,client會訪問失敗,這是會再次去同步metadata

In the future, we plan to piggy-back the current Raft configuration on the error response if a client contacts a non-leader replica. 
This will prevent extra round-trips to the master after leader elections, since typically the followers will have up-to-date information.

後麵,會試圖在error response中隨便返回Raft configuration,這樣client就不需要再去查一次了,因為follower應該是知道最新的信息的

 

Tablet storage

Within a tablet server, each tablet replica operates as an entirely separate entity, significantly decoupled from the partitioning and replication systems described in sections 3.2 and 3.3. 
During development of Kudu, we found that it was convenient to develop the storage layer somewhat independently from the higher-level distributed system, and in fact many of our functional and unit tests operate entirely within the confines of the tablet implementation. 
Due to this decoupling, we are exploring the idea of providing the ability to select an underlying storage layout on a per-table, per-tablet or even per-replica basis - a distributed analogue of Fractured Mirrors, as proposed in [26]. However, we currently offer only a single storage layout, described in this section.

在tablet server,每個tablet replica是作為一個完全separate的存在,是和higher-level 的分布式係統decoupled 的,你可以對tablet replica進行單獨的UT 
這樣的好處是,你甚至可以為per-table, per-tablet or even per-replica basis去選擇不同的storage layout

 

Overview

The implementation of tablet storage in Kudu addresses several goals: 
1. Fast columnar scans - In order to provide analytic performance comparable to best-of-breed immutable data formats such as Parquet and ORCFile[7], it's critical that the majority of scans can be serviced from efficiently encoded columnar data files. 
2. Low-latency random updates - In order to provide fast access to update or read arbitrary rows, we require O(lg n) lookup complexity for random access. 
3. Consistency of performance - Based on our experiences supporting other data storage systems, we have found that users are willing to trade o peak performance in order to achieve predictability. 
In order to provide these characteristics simultaneously, Kudu does not reuse any pre-existing storage engine, but rather chooses to implement a new hybrid columnar store architecture.

為了實現tablet storage,有以下幾個目標:

1. 快速的columnar scans,為了滿足analytic performance 
2. 低延遲的隨機更新 
3. Consistency of performance ,怎麼翻?根據經驗,用戶往往希望通過犧牲peak性能來達到更多的可預見性,所以我沒有用現有的storage engine,而是重新實現一套hybrid columnar store architecture。

 

RowSets

Tablets in Kudu are themselves subdivided into smaller units called RowSets
Some RowSets exist in memory only, termed MemRowSets, while others exist in a combination of disk and memory, termedDiskRowSets
Any given live (not deleted) row exists in exactly one RowSet; thus, RowSets form disjoint sets of rows. However, note that the primary key intervals of different RowSets may intersect.

At any point in time, a tablet has a single MemRowSet which stores all recently-inserted rows. Because these stores are entirely in-memory, a background thread periodically flushes MemRowSets to disk. 
The scheduling of these flushes is described in further detail in Section 4.11. 
When a MemRowSet has been selected to be flushed, a new, empty MemRowSet is swapped in to replace it. The previous MemRowSet is written to disk, and becomes one or more DiskRowSets.

This flush process is fully concurrent: readers can continue to access the old MemRowSet while it is being flushed, and updates and deletes of rows in the flushing MemRowSet are carefully tracked and rolled forward into the on-disk data upon completion of the flush process.

 

Tablets可以分為更小的單元,RowSets,兩種類型,放在內存的,MemRowSets;和放在磁盤的DiskRowSets,這個和Hbase的思路一樣

每一個live的行都會exactly的存在於某一個RowSet中,所以RowSets 是一係列不相交的rows的集合;但是主鍵的範圍會相交

可以想象,MemRowSet 會定期不停的flush到DiskRowSets,並且flush過程是concurrent,不會影響readers對老MemRowSet 的訪問

MemRowSet Implementation

MemRowSets are implemented by an in-memory concurrent B-tree with optimistic locking, broadly based off the design of MassTree[22], with the following changes:

1. We do not support removal of elements from the tree. Instead, we use MVCC records to represent deletions.

MemRowSets eventually flush to other storage, so we can defer removal of these records to other parts of the system.

2. Similarly, we do not support arbitrary in-place updates of records in the tree. Instead, we allow only modifications which do not change the value's size: this permits atomic compare-and-swap operations to append mutations to a 
per-record linked list.

3. We link together leaf nodes with a next pointer, as in the B+-tree[13]. This improves our sequential scan performance, a critical operation.

4. We do not implement the full ”trie of trees", but rather just a single tree, since we are less concerned about extremely high random access throughput compared to the original application.

MemRowSets,是以在內存中加上樂觀鎖的B-tree來實現的,基於MassTree 
1. 不支持刪除,用MVCC 
2. 不支持arbitrary in-place updates,permits atomic compare-and-swap operations to append mutations to aper-record linked list 
3. 在葉節點間增加,next pointer,來提供scan的性能 
4. 不用多樹,而用single tree,提高random access 的throughput

 

In order to optimize for scan performance over random access, we use slightly larger internal and leaf nodes sized at four cache-lines (256 bytes) each. 
Unlike most data in Kudu, MemRowSets store rows in a row-wise layout. This still provides acceptable performance, since the data is always in memory. 
To maximize throughput despite the choice of row storage, we utilize SSE2 memory prefetch instructions to prefetch one leaf node ahead of our scanner, and JIT-compile record projection operations using LLVM[5]. These optimizations provide significant performance boosts relative to the naive implementation. 
In order to form the key for insertion into the B-tree, we encode each row's primary key using an order-preserving encoding as described in Section 3.2. This allows efficient tree traversal using only memcmp operations for comparison, and the sorted nature of the MemRowSet allows for efficient scans over primary key ranges or individual key lookups.

為了在random acess的基礎上優化批量scan的性能,我們適當的加大internal and leaf nodes的大小到256k; 
而且不像kudu中的大部分數據,在MemRowSets中的數據是行存的,由於數據在內存中,讀取很快,所以行存的性能是可以接受的 
並且為了最大化throughput,使用SSE2指令集中的內存預取指令在scanner之前預先讀出葉節點,並且用LLVM編譯器框架來JIT complie記錄的projection操作;這些優化極大的提升了性能

LLVM每日談之一 LLVM是什麼

深入淺出 JIT 編譯器

為了產生插入到B-tree中的key,我們通過order-preserving encoding的方式將primary key 進行encoding;這樣會使遍曆樹很高效,僅僅通過memcmp 操作進行比較即可;

 

DiskRowSet Implementation

When MemRowSets flush to disk, they become DiskRowSets. 
While flushing a MemRowSet, we roll the DiskRowSet after each 32 MB of IO. This ensures that no DiskRowSet is too large, thus allowing efficient incremental compaction as described later in Section 4.10. Because a MemRowSet is in sorted order, the flushed DiskRowSets will themselves also be in sorted order, and each rolled segment will have a disjoint interval of primary keys.

MemRowSets 每32M會flush到一個DiskRowSet,這樣保證DiskRowSet 不會太大;MemRowSet 是有序的,所以DiskRowSets 也是有序的,並且每個rolled segment 主鍵不相交

A DiskRowSet is made up of two main components: base data and delta stores.

DiskRowSet 分為base data 和 delta stores 
The base data is a column-organized representation of the rows in the DiskRowSet. 
Each column is separately written to disk in a single contiguous block of data. 
The column itself is subdivided into small pages to allow for granular random reads, and an embedded B-tree index allows efficient seeking to each page based on its ordinal offset within the rowset. Column pages are encoded using a variety of encodings, such as dictionary encoding, bitshue[23], or front coding, and is optionally compressed using generic binary compression schemes such as LZ4, gzip, or bzip2.

These encodings and compression options may be specified explicitly by the user on a per-column basis, for example to designate that a large infrequently-accessed text column should be gzipped, while a column that typically stores small integers should be bit-packed. Several of the page formats supported 
by Kudu are common with those supported by Parquet, and our implementation shares much code with Impala's Parquet library.

In addition to flushing columns for each of the user-specified columns in the table, we also write a primary key index column, which stores the encoded primary key for each row. We also flush a chunked Bloom filter[10] which can be used to test for the possible presence of a row based on its encoded 
primary key.

base data就是將rows按column-organized 進行存儲; 
column會被分成small pages 以允許細粒度的隨機讀,embedded的B-tree索引讓我們可以很容易的在rowset中找到這個page; 
column可以被用各種格式encoding,或選擇各種壓縮,取決於應用的場景。

除了寫入column本身,還需要寫入row主鍵的索引列,以及相應的bloom filter來快速發現是否包含某行的primary key

Because columnar encodings are diffcult to update in place, the columns within the base data are considered immutable once flushed. 
Instead, updates and deletes are tracked through structures termed delta stores. Delta stores are either in-memory DeltaMemStores, or on-disk DeltaFiles.

A DeltaMemStore is a concurrent B-tree which shares the implementation described above.

A DeltaFile is a binary-typed column block. In both cases, delta stores maintain a mapping from (row offset, timestamp) tuples to RowChange-List records. The row offset is simply the ordinal index of a row within the RowSet - for example, the row with the lowest primary key has offset 0. The timestamp is the MVCC timestamp assigned when the operation was originally written. The RowChangeList is a binary-encoded list of changes to a row, for example indicating SET column id 3 = `foo' or DELETE.

When servicing an update to data within a DiskRowSet, 
we first consult the primary key index column. By using its embedded B-tree index, we can efficiently seek to the page containing the target row. 
Using page-level metadata, we can determine the row offset for the first cell within that page. By searching within the page (eg via in-memory binary search) 
we can then calculate the target row's offset within the entire DiskRowSet. Upon determining this offset, we insert a new delta record into the rowset's DeltaMemStore.

base data一旦flushed,就認為是不可變更的;所以update和delete就需要通過delta stores來存儲,delta stores也分為DeltaMemStores和DeltaFiles 
delta stores通過maintain一個(row offset, timestamp)的mapping的RowChange-List。注意這裏是row offset,而不是row主鍵

所以後麵當你做更新的時候,先通過row主鍵找到相應的rowset,然後通過embedded B-tree找到相應的page,然後再在page裏麵通過二分查找找到對應的column,這樣可以算出對於整個DiskRowSet的offset是多少,然後用這個offset去插入一條delta數據到DeltaMemStore

 

Delta Flushes

Because the DeltaMemStore is an in-memory store, it has finite capacity. 
The same background process which schedules flushes of MemRowSets also schedules flushes of DeltaMemStores. 
When  flushing a DeltaMemStore, a new empty store is swapped in while the existing one is written to disk and becomes a DeltaFile. 
A DeltaFile is a simple binary column which contains an immutable copy of the data that was previously in memory.

 

INSERT path

As described previously, each tablet has a single MemRowSet which is holds recently inserted data; 
however, it is not sufficient to simply write all inserts directly to the current MemRowSet, since Kudu enforces a primary key uniqueness constraint. 
In other words, unlike many NoSQL stores, Kudu differentiates INSERT from UPSERT.

In order to enforce the uniqueness constraint, Kudu must consult all of the existing DiskRowSets before inserting the new row. 
Because there may be hundreds or thousands of DiskRowSets per tablet, it is important that this be done efficiently, both by culling the number of DiskRowSets to consult and by making the lookup within a DiskRowSet efficient.

In order to cull the set of DiskRowSets to consult on an INSERT operation, each DiskRowSet stores a Bloom filter of the set of keys present. 
Because new keys are never inserted into an existing DiskRowSet, this Bloom filter is static data. 
We chunk the Bloom filter into 4KB pages, each corresponding to a small range of keys, and index those pages using an immutable B-tree structure. These pages as well as their index are cached in a server-wide LRU page cache, ensuring that most Bloom filter accesses do not require a physical disk seek.

Additionally, for each DiskRowSet, we store the minimum and maximum primary key, and use these key bounds to index the DiskRowSets in an interval tree. This further culls the set of DiskRowSets to consult on any given key lookup. 
A background compaction process, described in Section 4.10 reorganizes DiskRowSets to improve the effectiveness of the interval tree-based culling. 
For any DiskRowSets that are not able to be culled, we must fall back to looking up the key to be inserted within its encoded primary key column. This is done via the embedded B-tree index in that column, which ensures a logarithmic number of disk seeks in the worst case. Again, this data access is performed through the page cache, ensuring that for hot areas of key space, no physical disk seeks are needed.

由於kudu強製的主鍵唯一約束,所以在insert前,必須知道這個key是否存在 
論文中說,所以在insert前,必須要問所有的DiskRowSets,而DiskRowSets 很多,所以為了效率,每個DiskRowSets 都會存儲一個bloom filter; 
這裏將bloom filter chunk成4kb的page(這樣每個bl都隻需要讀一個page cache),每個隻對於小範圍的keys,用B-tree來索引這些bloom filter pages;

這些pages像其他索引一樣會被cache在server端的LRU page cache中;這樣來保證大部分對bloom filter的訪問,不需要訪問磁盤;

同時每個DiskRowSet都會記錄minimum and maximum primary key,作為進一步的篩選條件

這樣設計的目的,

因為bloom filter,判無是精確的,而判有是有可能錯的

所以我們用bl隻是篩選掉大部分判無的case,而對於判有的case,需要真正去search看看是不是確實有

 

Read path

Similar to systems like X100[11], Kudu's read path always operates in batches of rows in order to amortize function call cost and provide better opportunities for loop unrolling and SIMD instructions.

Kudu's in-memory batch format consists of a top-level structure which contains pointers to smaller blocks for each column being read. Thus, the batch itself is columnar in memory, which avoids any offset calculation cost when copying from columnar on-disk stores into the batch.

When reading data from a DiskRowSet, Kudu first determines if a range predicate on the scan can be used to cull the range of rows within this DiskRowSet. 
For example, if the scan has set a primary key lower bound, we perform a seek within the primary key column in order to determine a lower bound row offset; we do the same with any upper bound key. 
This converts the key range predicate into a row offset range predicate, which is simpler to satisfy as it requires no expensive string comparisons.

Next, Kudu performs the scan one column at a time. 
First, it seeks the target column to the correct row offset (0, if no predicate was provided, or the start row, if it previously determined a lower bound). 
Next, it copies cells from the source column into our row batch using the page-encoding specific decoder. 
Last, it consult the delta stores to see if any later updates have replaced cells with newer versions, based on the current scan's MVCC snapshot, applying those changes to our in-memory batch as necessary.

Because deltas are stored based on numerical row offsets rather than primary keys, this delta application process is extremely effcient: it does not require any per-row branching or expensive string comparisons.

這是deltas為何使用row offsets而不是主鍵的原因

After performing this process for each row in the projection, it returns the batch results, which will likely be copied into an RPC response and sent back to the client. 
The tablet server maintains stateful iterators on the server side for each scanner so that successive requests do not need to re-seek, but rather can continue from the previous point in each column file.

 

Lazy Materialization

If predicates have been specified for the scanner, we perform lazy materialization[9] of column data. In particular, we prefer to read columns which have associated range predicates before reading any other columns. 
After reading each such column, we evaluate the associated predicate. In the case that the predicate filters all rows in this batch, we short circuit the reading of other columns. 
This provides a significant speed boost when applying selective predicates, as the majority of data from the other selected columns will never be read from disk.

scanner會去檢測predicate,會先去讀有range predicates的columns,這樣可以先對row做過濾,減少其他column的讀取,成為lazy materialization

 

Delta Compaction

Because deltas are not stored in a columnar format, the scan speed of a tablet will degrade as ever more deltas are applied to the base data. Thus, Kudu's background maintenance manager periodically scans DiskRowSets to find any cases where a large number of deltas (as identified by the ratio between base data row count and delta count) have accumulated, and schedules a delta compaction operation which merges those deltas back into the base data columns. 
In particular, the delta compaction operation identifies the common case where the majority of deltas only apply to a subset of columns: for example, it is common for a SQL batch operation to update just one column out of a wide table. In this case, the delta compaction will only rewrite that single column, avoiding IO on the other unmodified columns.

deltas不是以columnar的格式存儲的,所以如果deltas太大會大大降低scan速度。所以Kudu後台會去定期把過大的deltas merge到base data中去

 

RowSet Compaction

In addition to compacting deltas into base data, Kudu also periodically compacts different DiskRowSets together in a process called RowSet compaction. This process performs a key-based merge of two or more DiskRowSets, resulting in a sorted stream of output rows. The output is written back to new DiskRowSets, again rolling every 32 MB, to ensure that no DiskRowSet in the system is too large. 
RowSet compaction has two goals: 
1. We take this opportunity to remove deleted rows. 
2. This process reduces the number of DiskRowSets that overlap in key range. By reducing the amount by which RowSets overlap, we reduce the number of RowSets which are expected to contain a randomly selected key in the tablet. This value acts as an upper bound for the number of Bloom lter lookups, and thus disk seeks, expected to service a write operation within the tablet.

會定期merge DiskRowSets,好處是remove刪除行,reduce多個DiskRowSets之間key range的overlap

 

Scheduling maintenance

As described in the sections above, Kudu has several different background maintenance operations that it performs to reduce memory usage and improve performance of the on-disk layout.


These operations are performed by a pool of maintenance threads that run within the tablet server process. 
Toward the design goal of consistent performance, these threads run all the time, rather than being triggered by specific events or conditions. 
Upon the completion of one maintenance operation, a scheduler process evaluates the state of the on-disk storage and picks the next operation to perform based on a set of heuristics meant to balance memory usage, write-ahead log retention, and the performance of future read and write operations. 
In order to select DiskRowSets to compact, the maintenance scheduler solves an optimization problem: 
given an IO budget (typically 128 MB), select a set of DiskRowSets such that compacting them would reduce the expected number of seeks, as described above. 
This optimization turns out to be a series of instances of the well-known integer knapsack problem, and is able to be solved effciently in a few milliseconds. 
Because the maintenance threads are always running small units of work, the operations can react quickly to changes in workload behavior. 
For example, when insertion workload increases, the scheduler quickly reacts and flushes in-memory stores to disk. When the insertion workload reduces, the server performs compactions in the background to increase performance for future writes. This provides smooth transitions in performance, making it easier for developers and operators to perform capacity planning and estimate the latency profile of their workloads.

會根據實際情況,啟發式的調度各種maintenance線程

最後更新:2017-04-07 21:05:50

  上一篇:go Running Kafka At Scale
  下一篇:go Flink - Working with State