

Redis 4.0: Optimizing aofrewrite with pipes

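Preface

Redis AOF persistence is essentially a redo log: every write command that has been executed is appended to the AOF file. As Redis keeps running, the AOF file keeps growing, so once a shrink condition is met Redis has to perform an aofrewrite.

Redis performs aofrewrite by forking a child process. To keep the AOF continuous, the parent buffers the write commands that arrive during the rewrite and appends them to the new AOF file after it has reaped the child. If the write volume during that window is large, reaping the child involves a lot of disk writes in the parent, which degrades performance.

To make aofrewrite more efficient, Redis sets up pipes between parent and child and streams the write commands that arrive during the rewrite to the child through them, which hands the append-to-disk work over to the child. The pipe optimization was actually already implemented in the 3.0 era; since I have recently been summarizing the new features of 4.0, I am simply filing it under 4.0 for easy reference.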

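aofrewrite in detail

1. Basic implementation of aofrewrite

[Figure: aofrewrite flow]

The figure above shows the aofrewrite flow; the labels are the basic function-call relationships.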

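  • 1 - First, a command or an event triggers aofrewrite, which calls rewriteAppendOnlyFileBackground()

    * This function forks a child process
    
  • 2 - The child calls rewriteAppendOnlyFile(tmpfile) to create the new AOF file

    * It calls rewriteAppendOnlyFileRio() to iterate over the Redis dataset and write every key-value pair into the new AOF file as commands
    * When finished, it calls exitFromChild(0) to exit
    
  • 3 - The parent records the child's pid

    * While the pid is not -1, aofRewriteBufferAppend() is called to buffer write commands
    
  • 4 - After the child exits, the parent calls backgroundRewriteDoneHandler() to handle it

    * It calls aofRewriteBufferWrite() to write the buffered write commands into the temporary AOF file created by the child
    * Finally, rename() replaces the original AOF file with the new one
    

During aofrewrite, if the dataset is large and the child therefore runs for a long time, or if write traffic is high, the aof-rewrite-buffer accumulates a lot of data and the parent has to perform a large number of disk writes, which is clearly not efficient enough for Redis.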

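2. Optimizing with pipes

To make aofrewrite more efficient, Redis uses pipes. The parts marked in red in the figure below are the optimized ones:

[Figure: aofrewrite flow with the pipe optimization; optimized parts marked in red]

Optimization points: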

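  • 1 - The parent creates the pipes

    * Three pipes in total: one data pipe and two control pipes
    * The data pipe carries the data; the control pipes let parent and child coordinate when to stop the data transfer
    
  • 2 - The parent writes data to the pipe

    * It registers the write-event handler aofChildWriteDiffData() to write data to the data pipe
    
  • 3 - The child reads data from the pipe

    * While generating the new AOF file, the child periodically calls aofReadDiffFromParent() to read data from the pipe and buffer it
    
  • 4 - Parent-child handshake

    * After generating the new AOF file, the child sends "!" to the parent over a control pipe to request that the data transfer stop
    * On receiving the stop signal, the parent's read-event handler aofChildPipeReadable() fires; it stops the data transfer and replies "!" to the child to agree
    * On receiving the parent's reply, the child calls rioWrite() to append the accumulated data to the new AOF file, then exits
    

Attentive readers will notice that the pair aofRewriteBufferAppend() and aofRewriteBufferWrite() is still there. Does the parent still have to write the aof-rewrite-buffer to disk? Yes. Parent and child run asynchronously, so there is always a small gap between them, and the aof-rewrite-buffer still has to be kept. By this point, however, the amount of data the parent writes to disk is very small and almost negligible.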

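3. aofrewrite code walkthrough

Trigger conditions for aofrewrite

  • 1. The bgrewriteaof command is executed.
  • 2. The serverCron time event detects that the AOF file has grown past the limit.

The command trigger needs no explanation, so let's look at the serverCron trigger: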

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
         /* Trigger an AOF rewrite if needed */
         if (server.rdb_child_pid == -1 &&
             server.aof_child_pid == -1 &&
             server.aof_rewrite_perc &&
             server.aof_current_size > server.aof_rewrite_min_size)
         {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
         }
    ...
}

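In other words, a rewrite is triggered when the AOF file size exceeds server.aof_rewrite_min_size and the growth percentage is at least server.aof_rewrite_perc (the base for the growth calculation, server.aof_rewrite_base_size, is the size of the AOF file right after the previous aofrewrite).

Cloud Redis currently sets server.aof_rewrite_min_size to 1/4 of the instance's memory specification and server.aof_rewrite_perc to 100.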
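To make the arithmetic concrete, here is a minimal sketch of the same check as a standalone function. The function name and its parameters are hypothetical (they are not part of Redis); the logic simply mirrors the serverCron excerpt above. With a base size of 100 and a current size of 200, growth is 200*100/100 - 100 = 100, which meets a threshold of 100.

```c
#include <assert.h>

/* Hypothetical helper, not part of Redis: returns 1 when the automatic
 * rewrite condition from serverCron would hold for the given sizes (bytes)
 * and growth percentage, 0 otherwise. */
int aof_growth_triggers_rewrite(long long current_size,
                                long long base_size,
                                long long min_size,
                                int rewrite_perc)
{
    assert(current_size >= 0 && base_size >= 0);
    if (rewrite_perc == 0) return 0;             /* automatic rewrite disabled */
    if (current_size <= min_size) return 0;      /* file is still too small */
    long long base = base_size ? base_size : 1;  /* avoid division by zero */
    long long growth = (current_size * 100 / base) - 100;
    return growth >= rewrite_perc;
}
```

Note that the comparison is `>=`, so a file that has exactly doubled already qualifies when the percentage is 100.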

Creating the pipes

Once aofrewrite is triggered, execution enters rewriteAppendOnlyFileBackground():

int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    if (aofCreatePipes() != C_OK) return C_ERR;
    openChildInfoPipe();
    start = ustime();
    if ((childpid = fork()) == 0) {
    ...

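OK, here is the key point: before the fork, aofCreatePipes() is called to create the pipes (openChildInfoPipe() is only used to collect the memory consumed by the child's copy-on-write, so it is not covered in detail here):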

int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error;   /* parent -> children data. Data pipe: the parent writes data to the child */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack.  Control pipe: the child asks the parent to stop the transfer */
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack.  Control pipe: the parent replies to the child */
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
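    // Register the read-event handler that processes the child's request to stop the data transfer

    server.aof_pipe_write_data_to_child = fds[1];      // fd the parent uses to write data to the child
    server.aof_pipe_read_data_from_parent = fds[0];    // fd the child uses to read data from the parent
    server.aof_pipe_write_ack_to_parent = fds[3];      // fd the child uses to send the stop request to the parent
    server.aof_pipe_read_ack_from_child = fds[2];      // fd the parent uses to read the stop request from the child
    server.aof_pipe_write_ack_to_child = fds[5];       // fd the parent uses to send its reply to the child
    server.aof_pipe_read_ack_from_parent = fds[4];     // fd the child uses to read the parent's reply
    server.aof_stop_sending_diff = 0;                  // flag: has the pipe transfer been stopped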
    return C_OK;
    ...
}
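The fd assignments above rely on the pipe() convention that index 0 is the read end and index 1 is the write end, and on the data pipe being non-blocking. The following is a small sketch of both points. set_nonblocking() is a hypothetical stand-in for Redis's anetNonBlock() written with plain fcntl(), and the second function exists only for illustration.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical stand-in for anetNonBlock(): set O_NONBLOCK on fd.
 * Returns 0 on success, -1 on failure. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Creates a pipe, makes both ends non-blocking, and reads from it while it
 * is still empty. Returns the errno seen by that read, or -1 if the setup
 * failed. */
int errno_of_read_on_empty_pipe(void)
{
    int fds[2];
    char byte;
    int err = -1;

    if (pipe(fds) == -1) return -1;
    /* fds[0] is the read end, fds[1] is the write end. */
    if (set_nonblocking(fds[0]) == 0 && set_nonblocking(fds[1]) == 0) {
        ssize_t n = read(fds[0], &byte, 1);
        assert(n == -1);   /* nothing was written, so nothing can be read */
        err = errno;
    }
    close(fds[0]);
    close(fds[1]);
    return err;
}
```

On an empty non-blocking pipe the read fails immediately with EAGAIN instead of blocking, which is what lets a reader simply loop until read() stops returning data.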

The parent process and pipe transfer

With the pipes in place, let's see how the parent and the child work after the fork, starting with the parent:

        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        ...
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        ...

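The parent does not do much here, mostly bookkeeping and setting a few flags:

  • Records the time consumed by fork; the info command shows the duration of the last fork as latest_fork_usec, in microseconds
  • Sets server.aof_rewrite_scheduled = 0 so that serverCron does not trigger aofrewrite again
  • Sets server.aof_child_pid to the child's pid; only while it is not -1 does Redis buffer write commands into the aof-rewrite-buffer
  • updateDictResizePolicy() disables resizing of all hash tables, to avoid as far as possible the memory copying that copy-on-write would cause while the child is running
  • Sets server.aof_selected_db = -1 so that the next AOF entry is forced to start with a SELECT, which guarantees that commands run against the correct db

Next comes the part that buffers write commands and communicates over the pipe. The entry point is feedAppendOnlyFile():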

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    ...
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
    ...
}

This is where server.aof_child_pid takes effect and write commands start being buffered:

void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) {  /* The current block is not already full. */
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }

        if (len) { /* First block to allocate, or need another block. */
            int numblocks;

            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                         LL_NOTICE;
                serverLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}
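The chunking loop in aofRewriteBufferAppend() is easier to follow on a stripped-down structure. Below is a minimal sketch under stated assumptions: the blocklist type and its functions are hypothetical, a hand-rolled singly linked list replaces Redis's list type, and the block size is 8 bytes instead of AOF_RW_BUF_BLOCK_SIZE so the behavior is visible with short strings.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define BLOCK_SIZE 8  /* tiny on purpose, for illustration only */

typedef struct block {
    size_t used, free;
    struct block *next;
    char buf[BLOCK_SIZE];
} block;

typedef struct {
    block *head, *tail;
    size_t nblocks;
} blocklist;

/* Append len bytes: fill the tail block first, then allocate new blocks as
 * needed. Returns 0 on success, -1 if an allocation fails. */
int blocklist_append(blocklist *l, const char *s, size_t len)
{
    assert(l != NULL && (s != NULL || len == 0));
    while (len) {
        block *b = l->tail;
        if (b && b->free) {                  /* room left in the tail block */
            size_t n = b->free < len ? b->free : len;
            memcpy(b->buf + b->used, s, n);
            b->used += n;
            b->free -= n;
            s += n;
            len -= n;
        }
        if (len) {                           /* first block, or tail is full */
            b = malloc(sizeof(*b));
            if (!b) return -1;
            b->used = 0;
            b->free = BLOCK_SIZE;
            b->next = NULL;
            if (l->tail) l->tail->next = b; else l->head = b;
            l->tail = b;
            l->nblocks++;
        }
    }
    return 0;
}

/* Release every block and reset the list to empty. */
void blocklist_free(blocklist *l)
{
    block *b = l->head;
    while (b) {
        block *next = b->next;
        free(b);
        b = next;
    }
    l->head = l->tail = NULL;
    l->nblocks = 0;
}
```

Appending 10 bytes to an empty list yields two blocks (8 bytes and 2 bytes); a following 6-byte append fills the second block without allocating a third.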

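Redis uses the linked list server.aof_rewrite_buf_blocks to buffer the write commands that arrive during aofrewrite; each node of the list holds at most 10 MB. The key part is the write-event registration at the end: when no event is registered on the fd server.aof_pipe_write_data_to_child, the write-event handler aofChildWriteDiffData is registered: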

void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    listNode *ln;
    aofrwblock *block;
    ssize_t nwritten;
    ...
    while(1) {
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        if (server.aof_stop_sending_diff || !block) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
            block->free += nwritten;
        }
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}

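Each time the data pipe becomes writable, the handler sends the write commands accumulated in server.aof_rewrite_buf_blocks to the child until the list is empty or write() makes no progress (the pipe is non-blocking, so when it is full the handler returns and the rest waits for the next event), unless server.aof_stop_sending_diff has been set to stop the transfer.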
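The partial-write handling (write what the pipe accepts, shift the remainder to the front of the buffer, try again later) can be sketched in isolation as follows. Both function names are hypothetical. Unlike the Redis handler, which simply returns when write() yields a value <= 0, this sketch inspects errno to tell a full pipe apart from a real error.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: create a pipe with both ends set to O_NONBLOCK.
 * Returns 0 on success, -1 on failure. */
int make_nonblocking_pipe(int fds[2])
{
    if (pipe(fds) == -1) return -1;
    for (int i = 0; i < 2; i++) {
        int flags = fcntl(fds[i], F_GETFL);
        if (flags == -1 || fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) == -1) {
            close(fds[0]);
            close(fds[1]);
            return -1;
        }
    }
    return 0;
}

/* Write as much of buf[0..*used) as the non-blocking fd accepts, moving the
 * unsent bytes to the front of buf and updating *used. Returns the number of
 * bytes written, or -1 on an error other than "pipe full". */
ssize_t drain_nonblocking(int fd, char *buf, size_t *used)
{
    ssize_t total = 0;
    assert(buf != NULL && used != NULL);
    while (*used > 0) {
        ssize_t n = write(fd, buf, *used);
        if (n < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) break; /* pipe full */
            if (errno == EINTR) continue;                       /* retry */
            return -1;
        }
        if (n == 0) break;
        memmove(buf, buf + n, *used - (size_t)n);
        *used -= (size_t)n;
        total += n;
    }
    return total;
}
```

Draining a buffer that is larger than the pipe's capacity returns after a partial write and leaves the remainder in place, which is the situation the aof-rewrite-buffer blocks are in between two writable events.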

The child process and pipe transfer

Now let's look at the child:

...
        /* Child */
        char tmpfile[256];
        closeListeningSockets(0);
        redisSetProcTitle("redis-aof-rewrite");
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            ...
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
...

The child first closes the listening sockets, then enters rewriteAppendOnlyFile():

int rewriteAppendOnlyFile(char *filename) {
    ...
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    ...
    server.aof_child_diff = sdsempty();
    ...
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    ...

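It first opens a temporary AOF file and initializes the server.aof_child_diff buffer in preparation for reading data from the parent, then calls rewriteAppendOnlyFileRio() to write the AOF file and read data from the pipe: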

int rewriteAppendOnlyFileRio(rio *aof) {
    ...
            if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
                processed = aof->processed_bytes;
                aofReadDiffFromParent();
            }
    ...
}
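The interval check in the loop above can be simulated without any I/O. This is a rough sketch: the function is hypothetical, and the interval constant is assumed to be 10 KB based on the figure given in this article, not taken from a Redis header.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed value, taken from the "10K" mentioned in the article. */
#define READ_DIFF_INTERVAL_BYTES (1024*10)

/* Hypothetical simulation: count how often the periodic pipe read would
 * fire while nrecords records of record_size bytes each are written. */
size_t count_diff_reads(size_t nrecords, size_t record_size)
{
    size_t processed_bytes = 0;  /* bytes written so far (aof->processed_bytes) */
    size_t processed = 0;        /* value of processed_bytes at the last read */
    size_t reads = 0;

    assert(record_size > 0);
    for (size_t i = 0; i < nrecords; i++) {
        processed_bytes += record_size;
        if (processed_bytes > processed + READ_DIFF_INTERVAL_BYTES) {
            processed = processed_bytes;
            reads++;
        }
    }
    return reads;
}
```

Because the comparison is strict, 1 KB records trigger a read on every 11th record rather than every 10th, so 100 such records produce 9 reads.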

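While iterating over the dataset and writing key-value pairs into the new AOF file, every time the new AOF file has grown by more than 10 KB (AOF_READ_DIFF_INTERVAL_BYTES) aofReadDiffFromParent() is called to read data from the pipe and append it to server.aof_child_diff: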

ssize_t aofReadDiffFromParent(void) {
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t nread, total = 0;

    while ((nread =
            read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
        total += nread;
    }
    return total;
}

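Stopping the pipe transfer

All good things come to an end. Once the child has finished iterating over the dataset and has generated the new AOF file, it is ready to exit, but before exiting it must tell the parent to stop the pipe transfer. Back to rewriteAppendOnlyFile():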

int rewriteAppendOnlyFile(char *filename) {
    ...
    /* Ask the master to stop sending diffs. */
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    /* We read the ACK from the server using a 10 seconds timeout. Normally
     * it should reply ASAP, but just in case we lose its reply, we are sure
     * the child will eventually get terminated. */
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

    /* Read the final diff if any. */
    aofReadDiffFromParent();

    /* Write the received diff to the file. */
    serverLog(LL_NOTICE,
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;
    ...
}

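This is straightforward: the child writes "!" to the control pipe with write() to request the stop, then reads the reply. Note that the source comment says 10 seconds, but the timeout actually passed to syncRead() is 5000 ms, i.e. 5 s. On timeout the child jumps to werr and exits abnormally; if "!" arrives in time it continues, calling aofReadDiffFromParent() once more to make sure nothing is left in the data pipe, and finally rioWrite() writes the data accumulated in server.aof_child_diff into the new AOF file.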
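Reading a single acknowledgement byte with a deadline can be illustrated with poll(). This is only a sketch of the idea: read_byte_with_timeout() is a hypothetical helper and is not how syncRead() is implemented in Redis.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper (not Redis's syncRead): wait up to timeout_ms for one
 * byte on fd. Returns 1 if a byte was read into *out, 0 on timeout, and -1
 * on error or end of file. */
int read_byte_with_timeout(int fd, char *out, int timeout_ms)
{
    struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
    assert(out != NULL && timeout_ms >= 0);

    int ready = poll(&pfd, 1, timeout_ms);
    if (ready == 0) return 0;            /* deadline passed, nothing to read */
    if (ready < 0) return -1;            /* poll itself failed */
    return read(fd, out, 1) == 1 ? 1 : -1;
}
```

A caller in the child's position would treat both 0 and -1 as failure and abort the rewrite, the same way the excerpt jumps to werr when the ack does not arrive.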

So how does the parent handle the "!"? Recall the read-event handler aofChildPipeReadable() registered earlier; it fires when the child sends "!" over the control pipe:

void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    ...
    if (read(fd,&byte,1) == 1 && byte == '!') {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            /* If we can't send the ack, inform the user, but don't try again
             * since in the other side the children will use a timeout if the
             * kernel can't buffer our write, or, the children was
             * terminated. */
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
        }
    }
    /* Remove the handler since this can be called only one time during a
     * rewrite. */
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}

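Very simple: it sets server.aof_stop_sending_diff = 1, replies "!" to the child, and removes itself from the event loop. Communication between parent and child is now complete; what remains is for the parent to wait for the child to exit and do the wrap-up.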
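The whole "!" exchange can be reproduced with two pipes and a fork. The sketch below is deliberately simplified and is not Redis code: the function name is hypothetical, there is no data pipe, and the parent uses a blocking read() where Redis reacts to a readable event in its event loop.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical demo of the stop handshake. Returns 0 when the child's
 * request and the parent's reply were both delivered, -1 otherwise. */
int stop_handshake_demo(void)
{
    int to_parent[2], to_child[2];
    char byte = 0;
    int status = 0, acked = 0;

    if (pipe(to_parent) == -1) return -1;
    if (pipe(to_child) == -1) {
        close(to_parent[0]); close(to_parent[1]);
        return -1;
    }

    pid_t pid = fork();
    if (pid == -1) {
        close(to_parent[0]); close(to_parent[1]);
        close(to_child[0]); close(to_child[1]);
        return -1;
    }
    if (pid == 0) {
        /* Child: ask the parent to stop, then wait for its agreement. */
        close(to_parent[0]);
        close(to_child[1]);
        if (write(to_parent[1], "!", 1) != 1) _exit(1);
        if (read(to_child[0], &byte, 1) != 1 || byte != '!') _exit(1);
        _exit(0);
    }

    /* Parent: receive the request and reply with the same byte. */
    assert(pid > 0);
    close(to_parent[1]);
    close(to_child[0]);
    if (read(to_parent[0], &byte, 1) == 1 && byte == '!') {
        /* Redis sets server.aof_stop_sending_diff = 1 at this point. */
        acked = (write(to_child[1], "!", 1) == 1);
    }
    close(to_parent[0]);
    close(to_child[1]);
    if (waitpid(pid, &status, 0) != pid) return -1;
    return (acked && WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

Using two one-directional pipes, one per direction, keeps each side's read end free of its own writes, which is why the control channel in the article needs two pipes rather than one.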

Parent-side wrap-up

serverCron() calls wait3() to reap the child:

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
        ...
        } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
        } else {
        ...

If the reaped pid is server.aof_child_pid, execution enters backgroundRewriteDoneHandler():

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        latencyStartMonitor(latency);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.aof_child_pid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            serverLog(LL_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }

        if (aofRewriteBufferWrite(newfd) == -1) {
            serverLog(LL_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            close(newfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

It first opens the new AOF file generated by the child and calls aofRewriteBufferWrite() to append the data remaining in server.aof_rewrite_buf_blocks to it.

        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
        latencyStartMonitor(latency);
        if (rename(tmpfile,server.aof_filename) == -1) {
            serverLog(LL_WARNING,
                "Error trying to rename the temporary AOF file %s into %s: %s",
                tmpfile,
                server.aof_filename,
                strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rename",latency);

Then the new AOF file is renamed to the file name stored in server.aof_filename.

        /* Asynchronously close the overwritten AOF. */
        if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);

The old AOF file is closed by a bio background thread.
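The comment in the rename step says the old file is not unlinked while it is still referenced through oldfd. That POSIX behavior can be observed with a small experiment. This is a sketch under stated assumptions: the function name and the temporary file names under /tmp are made up for the demo and have nothing to do with Redis's file layout.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical demo: rename a "new" file over an "old" one while an fd to
 * the old file stays open. old_view receives what the old fd still reads,
 * new_view what the path contains afterwards. Returns 0 on success. */
int rename_keeps_old_fd_readable(char *old_view, char *new_view, size_t cap)
{
    char old_path[] = "/tmp/aof-old-XXXXXX";
    char new_path[] = "/tmp/aof-new-XXXXXX";
    int oldfd, newfd, fd, rc = -1;
    ssize_t n;

    assert(old_view != NULL && new_view != NULL && cap > 0);
    if ((oldfd = mkstemp(old_path)) == -1) return -1;
    if ((newfd = mkstemp(new_path)) == -1) {
        close(oldfd);
        unlink(old_path);
        return -1;
    }
    if (write(oldfd, "old", 3) != 3 || write(newfd, "new", 3) != 3) goto done;

    /* Replace the old path; oldfd keeps referring to the old file. */
    if (rename(new_path, old_path) == -1) goto done;

    if (lseek(oldfd, 0, SEEK_SET) == -1) goto done;
    if ((n = read(oldfd, old_view, cap - 1)) < 0) goto done;
    old_view[n] = '\0';

    if ((fd = open(old_path, O_RDONLY)) == -1) goto done;
    n = read(fd, new_view, cap - 1);
    close(fd);
    if (n < 0) goto done;
    new_view[n] = '\0';
    rc = 0;
done:
    close(oldfd);          /* the old file's storage is released here */
    close(newfd);
    unlink(old_path);
    unlink(new_path);      /* fails harmlessly if the rename succeeded */
    return rc;
}
```

The old contents stay readable through the open descriptor until it is closed. Since the old file's storage is only released at that final close, which can be slow for a large file, this is presumably the reason the close is handed to a background thread.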

cleanup:
    aofClosePipes();
    aofRewriteBufferReset();
    aofRemoveTempFile(server.aof_child_pid);
    server.aof_child_pid = -1;
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == AOF_WAIT_REWRITE)
        server.aof_rewrite_scheduled = 1;

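Finally comes the cleanup: closing the pipes, resetting the aof-rewrite-buffer, resetting server.aof_child_pid to -1, and so on. With that, aofrewrite is complete.

Afterword

This article covered the basic implementation of Redis aofrewrite and its pipe-based optimization. A cloud Redis based on 4.0 is also in preparation, so stay tuned.

Last updated: 2017-08-24 12:03:34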
