880
windows
redis4.0之利用管道優化aofrewrite
前言
redis的aof持久化本質上是一個redo log,把所有執行過的寫命令追加到aof文件中。那麼隨著redis的運行,aof文件會不斷膨脹,當觸發收縮條件時就要做aofrewrite。
redis是通過fork子進程來做aofrewrite,同時為了保證aof的連續性,父進程把aofrewrite期間的寫命令緩存起來,等收割完子進程之後再追加到新的aof文件。如果期間寫入量較大的話收割時就要有大量的寫磁盤操作,造成性能下降。
為了提高aofrewrite效率,redis通過在父子進程間建立管道,把aofrewrite期間的寫命令通過管道同步給子進程,追加寫盤的操作也就轉交給了子進程。其實利用管道的優化在3.0時期就已經實現了,最近在總結4.0的新特性,索性就都歸到4.0裏,方便查閱。
aofrewrite詳解
1. aofrewrite的基礎實現
上圖是aofrewrite的流程,標注為基本的函數調用關係。
-
1 - 首先,通過命令或是事件觸發aofrewrite,調用rewriteAppendOnlyFileBackground()函數
* 該函數會fork出一個子進程
-
2 - 子進程調用rewriteAppendOnlyFile(tmpfile)函數創建新的aof文件
* 調用rewriteAppendOnlyFileRio()函數遍曆redis把所有key-value以命令的方式寫入新aof文件 * 完成後調用exitFromChild(0)退出
-
3 - 父進程記錄子進程的pid
* 當pid不為-1時就會執行aofRewriteBufferAppend()把寫命令緩存起來
-
4 - 子進程退出後父進程調用backgroundRewriteDoneHandler()來處理
* 調用aofRewriteBufferWrite()函數把積攢的寫命令緩存寫入子進程創建的臨時aof文件 * 最後rename()用新的aof文件替換掉原來的aof文件
在aofrewrite過程中,如果redis本身數據量較大子進程執行時間較長,或者寫入流量較高,就會導致aof-rewrite-buffer積攢較多,父進程就要進行大量寫磁盤操作,這對於redis來說顯然是不夠高效的。
2. 使用pipe優化
為了提高aofrewrite效率,redis使用pipe來優化,下圖中紅色標注即為優化的部分:
優化點:
-
1 - 父進程建立管道
* 共三條管道,分別為一條數據管道,和兩條控製管道 * 數據管道用來傳輸數據,控製管道用來做父子進程交互,控製何時停止數據傳輸
-
2 - 父進程向管道寫數據
* 注冊寫事件aofChildWriteDiffData()向數據管道寫數據
-
3 - 子進程從管道讀數據
* 子進程在生成新aof文件時會定期調用aofReadDiffFromParent()從管道讀取數據,並緩存下來
-
4 - 父子進程交互
* 子進程生成新aof文件後會通過控製管道向父進程發送"!",發起停止數據傳輸請求 * 父進程收到停止信號後激活讀事件處理函數aofChildPipeReadable(),停止數據傳輸,並向子進程回複"!",表示同意停止 * 子進程收到父進程的應答,調用rioWrite()把積攢的數據追加到新的aof文件,最後退出
細心的讀者會發現,aofRewriteBufferAppend()和aofRewriteBufferWrite()這一對函數仍然保留,父進程還是要把aof-rewrite-buffer寫盤嗎?是的,這是因為父子進程是異步結構,父子間總會有那麼一點代溝,aof-rewrite-buffer還是需要保留的,不過這個時候父進程寫盤的數據量就很小了,幾乎可以忽略。
3. aofrewrite代碼剖析
aofrewrite的觸發條件
- 1. 執行bgrewriteaof命令。
- 2. serverCron時間事件檢測到aof文件大小超限。
命令的觸發不必詳述,主要來看下serverCron的觸發:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
/* Trigger an AOF rewrite if needed */
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
...
}
也就是說aof文件大小超過了server.aof_rewrite_min_size,並且增長率大於server.aof_rewrite_perc時就會觸發(增長率計算的基數server.aof_rewrite_base_size是上次aofrewrite完之後aof文件的大小)。
目前雲redis設置server.aof_rewrite_min_size為內存規格的1/4,server.aof_rewrite_perc為100。
管道建立
aofrewrite觸發之後進入rewriteAppendOnlyFileBackground()函數:
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
if (aofCreatePipes() != C_OK) return C_ERR;
openChildInfoPipe();
start = ustime();
if ((childpid = fork()) == 0) {
...
OK,重點來了,在fork之前調用了aofCreatePipes()函數來創建管道(openChildInfoPipe()函數隻是用來收集子進程copy-on-write用到的內存,就不詳細展開了):
int aofCreatePipes(void) {
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;
if (pipe(fds) == -1) goto error; /* parent -> children data. 父進程向子進程寫數據的管道*/
if (pipe(fds+2) == -1) goto error; /* children -> parent ack. 子進程向父進程發起停止傳輸的控製管道*/
if (pipe(fds+4) == -1) goto error; /* parent -> children ack. 父進程向子進程回複的控製管道*/
/* Parent -> children data is non blocking. */
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
//注冊讀事件處理函數,負責處理子進程要求停止數據傳輸的消息
server.aof_pipe_write_data_to_child = fds[1]; //父進程向子進程寫數據的fd
server.aof_pipe_read_data_from_parent = fds[0]; //子進程從父進程讀數據的fd
server.aof_pipe_write_ack_to_parent = fds[3]; //子進程向父進程發起停止消息的fd
server.aof_pipe_read_ack_from_child = fds[2]; //父進程從子進程讀取停止消息的fd
server.aof_pipe_write_ack_to_child = fds[5]; //父進程向子進程回複消息的fd
server.aof_pipe_read_ack_from_parent = fds[4]; //子進程從父進程讀取回複消息的fd
server.aof_stop_sending_diff = 0; //是否停止管道傳輸標記位
return C_OK;
...
}
父進程與管道傳輸
管道建立起來了我們再來看看fork之後父進程和子進程如何工作,首先看下父進程:
/* Parent */
server.stat_fork_time = ustime()-start;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
...
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
updateDictResizePolicy();
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.aof_rewrite_buf will start
* with a SELECT statement and it will be safe to merge. */
server.aof_selected_db = -1;
...
父進程這裏做的事情並不多,主要是信息的記錄和一些標記位設置
- 記錄fork消耗的時間,info命令可以查看上次fork的耗時latest_fork_usec,單位微秒
- 設置server.aof_rewrite_scheduled = 0,防止serverCron再次觸發aofrewrite
- 設置server.aof_child_pid為子進程pid,其不為-1時redis才會向aof-rewrite-buffer緩存寫命令
- updateDictResizePolicy()禁止所有hash數據結構resize,這是為了盡量避免子進程copy-on-write進行內存拷貝
- 設置server.aof_selected_db = -1,下一次的aof日誌會強製加上select,這是為了保證命令執行到正確的db
接下來就是緩存寫命令和管道通信部分了,入口是在feedAppendOnlyFile():
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
...
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
...
}
server.aof_child_pid在這時就生效了,開始緩存寫命令:
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
while(len) {
/* If we already got at least an allocated block, try appending
* at least some piece into it. */
if (block) {
unsigned long thislen = (block->free < len) ? block->free : len;
if (thislen) { /* The current block is not already full. */
memcpy(block->buf+block->used, s, thislen);
block->used += thislen;
block->free -= thislen;
s += thislen;
len -= thislen;
}
}
if (len) { /* First block to allocate, or need another block. */
int numblocks;
block = zmalloc(sizeof(*block));
block->free = AOF_RW_BUF_BLOCK_SIZE;
block->used = 0;
listAddNodeTail(server.aof_rewrite_buf_blocks,block);
/* Log every time we cross more 10 or 100 blocks, respectively
* as a notice or warning. */
numblocks = listLength(server.aof_rewrite_buf_blocks);
if (((numblocks+1) % 10) == 0) {
int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
LL_NOTICE;
serverLog(level,"Background AOF buffer size: %lu MB",
aofRewriteBufferSize()/(1024*1024));
}
}
}
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}
redis用鏈表server.aof_rewrite_buf_blocks來緩存aofrewrite期間的寫命令,鏈表的每個節點最大10MB;重點是在最後的寫事件注冊,當server.aof_pipe_write_data_to_child這個fd沒有注冊事件時,就注冊寫事件函數aofChildWriteDiffData:
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
listNode *ln;
aofrwblock *block;
ssize_t nwritten;
...
while(1) {
ln = listFirst(server.aof_rewrite_buf_blocks);
block = ln ? ln->value : NULL;
if (server.aof_stop_sending_diff || !block) {
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
AE_WRITABLE);
return;
}
if (block->used > 0) {
nwritten = write(server.aof_pipe_write_data_to_child,
block->buf,block->used);
if (nwritten <= 0) return;
memmove(block->buf,block->buf+nwritten,block->used-nwritten);
block->used -= nwritten;
block->free += nwritten;
}
if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
}
}
每次事件循環都會把server.aof_rewrite_buf_blocks積攢的寫命令全部同步給子進程,除非server.aof_stop_sending_diff被設置了停止標記。
子進程和管道傳輸
接下來看下子進程:
...
/* Child */
char tmpfile[256];
closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
...
exitFromChild(0);
} else {
exitFromChild(1);
}
...
子進程首先關閉監聽端口,然後就進入rewriteAppendOnlyFile()函數:
int rewriteAppendOnlyFile(char *filename) {
...
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
...
server.aof_child_diff = sdsempty();
...
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
...
首先打開一個臨時aof文件,並初始化server.aof_child_diff緩存準備從父進程讀數據,然後就調用rewriteAppendOnlyFileRio()來寫aof文件和讀取管道中的數據:
int rewriteAppendOnlyFileRio(rio *aof) {
...
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
processed = aof->processed_bytes;
aofReadDiffFromParent();
}
...
}
在遍曆redis把key-value寫入新aof文件過程中,新aof文件每增長10K就會調用aofReadDiffFromParent()從管道中讀取數據追加到server.aof_child_diff:
ssize_t aofReadDiffFromParent(void) {
char buf[65536]; /* Default pipe buffer size on most Linux systems. */
ssize_t nread, total = 0;
while ((nread =
read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
total += nread;
}
return total;
}
停止管道傳輸
聚散終有時,子進程在遍曆完redis生成好新的aof文件之後就要準備退出了,那麼退出前要先告訴父進程停止管道傳輸,依然回到rewriteAppendOnlyFile()函數來看:
int rewriteAppendOnlyFile(char *filename) {
...
/* Ask the master to stop sending diffs. */
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;
/* We read the ACK from the server using a 10 seconds timeout. Normally
* it should reply ASAP, but just in case we lose its reply, we are sure
* the child will eventually get terminated. */
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
/* Read the final diff if any. */
aofReadDiffFromParent();
/* Write the received diff to the file. */
serverLog(LL_NOTICE,
"Concatenating %.2f MB of AOF diff received from parent.",
(double) sdslen(server.aof_child_diff) / (1024*1024));
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
...
}
這裏寫的就很直接了,使用write向控製管道寫入"!"發起停止請求,然後讀取返回結果,超時時間為10s,超時就goto werr異常退出,10s內讀取到"!"就繼續,此時再次調用aofReadDiffFromParent()從數據管道讀取數據確保管道中沒有遺留,最後就是rioWrite()把server.aof_child_diff積攢的數據寫入新aof文件啦。
那麼父進程是如何處理"!"的呢,還記得之前注冊的讀事件aofChildPipeReadable()吧,子進程向控製管道發送"!"就會激活:
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
char byte;
...
if (read(fd,&byte,1) == 1 && byte == '!') {
serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
server.aof_stop_sending_diff = 1;
if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
/* If we can't send the ack, inform the user, but don't try again
* since in the other side the children will use a timeout if the
* kernel can't buffer our write, or, the children was
* terminated. */
serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
strerror(errno));
}
}
/* Remove the handler since this can be called only one time during a
* rewrite. */
aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}
很簡單,標記server.aof_stop_sending_diff=1,給子進程回複"!",並且把自己從事件循環刪掉,自此父子進程間通信完成,剩下的就是父進程等待子進程退出進行收尾工作。
父進程收尾
serverCron()中會調用wait3()來收割子進程:
/* Check if a background saving or AOF rewrite in progress terminated. */
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
...
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
...
如果收割到的pid是server.aof_child_pid就進入backgroundRewriteDoneHandler():
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
...
/* Flush the differences accumulated by the parent to the
* rewritten AOF. */
latencyStartMonitor(latency);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
(int)server.aof_child_pid);
newfd = open(tmpfile,O_WRONLY|O_APPEND);
if (newfd == -1) {
serverLog(LL_WARNING,
"Unable to open the temporary AOF produced by the child: %s", strerror(errno));
goto cleanup;
}
if (aofRewriteBufferWrite(newfd) == -1) {
serverLog(LL_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
首先會打開子進程生成的新aof文件,並調用aofRewriteBufferWrite()把server.aof_rewrite_buf_blocks中剩餘的數據追加到新aof文件。
/* Rename the temporary file. This will not unlink the target file if
* it exists, because we reference it with "oldfd". */
latencyStartMonitor(latency);
if (rename(tmpfile,server.aof_filename) == -1) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF file %s into %s: %s",
tmpfile,
server.aof_filename,
strerror(errno));
close(newfd);
if (oldfd != -1) close(oldfd);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rename",latency);
之後把新aof文件rename為server.aof_filename記錄的文件名。
/* Asynchronously close the overwritten AOF. */
if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
使用bio後台線程來close原來的aof文件。
cleanup:
aofClosePipes();
aofRewriteBufferReset();
aofRemoveTempFile(server.aof_child_pid);
server.aof_child_pid = -1;
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
server.aof_rewrite_time_start = -1;
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
if (server.aof_state == AOF_WAIT_REWRITE)
server.aof_rewrite_scheduled = 1;
最後是清理工作,包括關閉管道、重置aof-rewrite-buffer、複位server.aof_child_pid=-1等,自此aofrewrite完成。
後記
本文介紹了redis的aofrewrite基礎實現以及利用pipe的優化,基於4.0的雲redis也正在籌備之中,敬請期待。
最後更新:2017-08-24 12:03:34