MySQL · Source Code Analysis · InnoDB Asynchronous IO Workflow
A previous monthly report article, "InnoDB IO Subsystem", introduced the synchronous IO and asynchronous IO contained in the InnoDB IO subsystem. This article dissects, at the source-code level, how synchronous and asynchronous IO requests for data pages are actually carried out inside the InnoDB IO subsystem.
In MySQL 5.6, InnoDB's asynchronous IO is mainly used for read-ahead and for write requests to data files, while ordinary data page reads go through synchronous IO. How exactly do the two differ at the code level? In what follows we trace the execution of an IO request, using the Linux native AIO path as the main thread of the walkthrough.
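Before diving into the InnoDB wrappers, it helps to recall the raw Linux native AIO (libaio) interface that the code below builds on: create a context, prepare an iocb, submit it, and reap the completion event. The following is a minimal, self-contained sketch of that interface only (compile with -laio); the file path, block size, and context size are illustrative assumptions, not anything taken from the InnoDB code.
#define _GNU_SOURCE             /* for O_DIRECT */
#include <libaio.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main(void)
{
	io_context_t ctx;
	memset(&ctx, 0, sizeof(ctx));
	int ret = io_setup(8, &ctx);                 /* create an AIO context with room for 8 events */
	if (ret != 0) {
		fprintf(stderr, "io_setup failed: %d\n", ret);
		return 1;
	}

	int fd = open("/tmp/aio_test_file", O_RDONLY | O_DIRECT);
	if (fd < 0) {
		perror("open");
		return 1;
	}

	void* buf;
	if (posix_memalign(&buf, 4096, 4096) != 0) { /* O_DIRECT requires aligned buffers */
		return 1;
	}

	struct iocb  cb;
	struct iocb* cbs[1] = { &cb };
	io_prep_pread(&cb, fd, buf, 4096, 0);        /* read 4 KB at offset 0 */
	cb.data = buf;                               /* InnoDB stores its slot pointer here */

	if (io_submit(ctx, 1, cbs) != 1) {           /* hand the request to the kernel */
		fprintf(stderr, "io_submit failed\n");
		return 1;
	}

	struct io_event ev;
	int n = io_getevents(ctx, 1, 1, &ev, NULL);  /* block until the request completes */
	printf("reaped %d event(s), res = %ld\n", n, (long) ev.res);

	io_destroy(ctx);
	return 0;
}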
Key data structures
- os_aio_array_t
/** Array type recording the asynchronous IO (aio) requests of one category
(ibuf, log, read, write). Every asynchronous IO request registers an InnoDB aio object in the array of its category. */
os_aio_array_t {
os_ib_mutex_t mutex; // protects concurrent access by the asynchronous read/write threads; the ibuf and log categories each have only one thread, so there is no contention for them
os_event_t not_full; // condition event used to tell threads waiting for a slot whether the os_aio_array_t array has free slots available for aio requests
os_event_t is_empty; // condition event used to tell the IO threads whether the os_aio_array_t array has pending IO requests
ulint n_slots; // number of IO requests the array can hold = number of threads * pending requests allowed per segment (256); the slot/segment arithmetic is sketched after these two structures
ulint n_segments; // number of segments that can wait independently, i.e. the maximum number of threads allowed for this category of IO
ulint cur_seg; /* IO requests are distributed over the segments in round-robin fashion; this field points to the segment the next IO request may be assigned to */
ulint n_reserved; // number of IO requests currently pending
os_aio_slot_t* slots; // array recording the individual IO request objects; the n_segments threads share these n_slots slots for pending IO requests
#ifdef __WIN__
HANDLE* handles;
/*!< Pointer to an array of OS native
event handles where we copied the
handles from slots, in the same
order. This can be used in
WaitForMultipleObjects; used only in
Windows */
#endif /* __WIN__ */
#if defined(LINUX_NATIVE_AIO)
io_context_t* aio_ctx; // array of aio contexts, one per segment, through which that segment's IO requests are submitted and reaped
struct io_event* aio_events; // array recording completed IO events; the kernel reports completed asynchronous IO requests to the IO threads through these events
struct iocb** pending; // records the buffered (pending) aio requests
ulint* count; // records the number of buffered aio requests of each segment
#endif /* LINUX_NATIVE_AIO */
}
- os_aio_slot_t
// Object in an os_aio_array_t array that records one asynchronous IO (aio) request
os_aio_slot_t {
ibool is_read; /*!< TRUE if a read operation */
ulint pos; // position of this slot within the os_aio_array_t array
ibool reserved; // TRUE means the slot is already occupied by another IO request
time_t reservation_time; // time at which the slot was reserved
ulint len; // length of the IO request
byte* buf; // buffer to read into or write from, usually pointing at a buffer pool page; compressed pages are handled specially
ulint type; /* request type, i.e. read or write IO */
os_offset_t offset; /*!< file offset in bytes */
os_file_t file; /*!< file where to read or write */
const char* name; /*!< path and name of the file to read */
ibool io_already_done; /* TRUE means the IO has already completed */
fil_node_t* message1; /* InnoDB file descriptor (fil_node_t) of this aio operation */
void* message2; /* records the buffer pool bpage corresponding to this IO request */
#ifdef WIN_ASYNC_IO
HANDLE handle; /*!< handle object we need in the
OVERLAPPED struct */
OVERLAPPED control; /*!< Windows control block for the
aio request */
#elif defined(LINUX_NATIVE_AIO)
struct iocb control; /* the aio request control block (iocb) used by this slot */
int n_bytes; /* bytes read or written */
int ret; /* AIO return code */
#endif /* WIN_ASYNC_IO */
}
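To make the n_slots/n_segments comments above concrete, here is a minimal, self-contained sketch (not InnoDB code) of the slot-to-segment arithmetic that the functions below rely on; the figure of 256 slots per segment and the four read threads are illustrative assumptions.
#include <stdio.h>

int main(void)
{
	unsigned long n_segments        = 4;    /* e.g. innodb_read_io_threads = 4 */
	unsigned long slots_per_segment = 256;  /* pending requests allowed per segment */
	unsigned long n_slots = n_segments * slots_per_segment;

	unsigned long pos     = 700;                      /* a slot position in the array */
	unsigned long segment = pos / slots_per_segment;  /* the IO thread that reaps it  */
	unsigned long local_i = pos % slots_per_segment;  /* index inside that segment    */

	printf("n_slots=%lu, slot %lu -> segment %lu, local index %lu\n",
	       n_slots, pos, segment, local_i);
	return 0;
}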
Flowchart
Source code analysis
- os_aio_func: the entry function for IO on physical data pages
ibool
os_aio_func(
/*========*/
ulint type, /* IO type: READ or WRITE IO */
ulint mode, /* among other things, whether to execute the asynchronous IO request as SIMULATED aio */
const char* name, /* path + name of the tablespace the IO opens */
os_file_t file, /* file on which the IO operates */
void* buf, // buffer to read into or write from, usually pointing at a buffer pool page; compressed pages are handled specially
os_offset_t offset, /*!< in: file offset where to read or write */
ulint n, /* number of bytes to read or write */
fil_node_t* message1, /* InnoDB file descriptor (fil_node_t) of this aio operation; only meaningful for asynchronous IO */
void* message2, /* the buffer pool bpage corresponding to this IO request; only meaningful for asynchronous IO */
ibool should_buffer, // whether to buffer the aio request; mainly used for read-ahead
ibool page_encrypt,
/*!< in: Whether to encrypt */
ulint page_size)
/*!< in: Page size */
{
...
wake_later = mode & OS_AIO_SIMULATED_WAKE_LATER;
mode = mode & (~OS_AIO_SIMULATED_WAKE_LATER);
if (mode == OS_AIO_SYNC
#ifdef WIN_ASYNC_IO
&& !srv_use_native_aio
#endif /* WIN_ASYNC_IO */
) {
/* This is actually an ordinary synchronous read or write:
no need to use an i/o-handler thread. NOTE that if we use
Windows async i/o, Windows does not allow us to use
ordinary synchronous os_file_read etc. on the same file,
therefore we have built a special mechanism for synchronous
wait in the Windows case.
Also note that the Performance Schema instrumentation has
been performed by current os_aio_func()'s wrapper function
pfs_os_aio_func(). So we would no longer need to call
Performance Schema instrumented os_file_read() and
os_file_write(). Instead, we should use os_file_read_func()
and os_file_write_func() */
/* If this is synchronous IO and native IO is not enabled, read or write
directly with os_file_read/os_file_write, without going through the IO threads */
if (type == OS_FILE_READ) {
if (page_encrypt) {
return(os_file_read_decrypt_page(file, buf, offset, n, page_size));
} else {
return(os_file_read_func(file, buf, offset, n));
}
}
ut_ad(!srv_read_only_mode);
ut_a(type == OS_FILE_WRITE);
if (page_encrypt) {
return(os_file_write_encrypt_page(name, file, buf, offset, n, page_size));
} else {
return(os_file_write_func(name, file, buf, offset, n));
}
}
try_again:
switch (mode) {
// locate the IO request array according to the access type
case OS_AIO_NORMAL:
if (type == OS_FILE_READ) {
array = os_aio_read_array;
} else {
ut_ad(!srv_read_only_mode);
array = os_aio_write_array;
}
break;
case OS_AIO_IBUF:
ut_ad(type == OS_FILE_READ);
/* Reduce probability of deadlock bugs in connection with ibuf:
do not let the ibuf i/o handler sleep */
wake_later = FALSE;
if (srv_read_only_mode) {
array = os_aio_read_array;
}
break;
case OS_AIO_LOG:
if (srv_read_only_mode) {
array = os_aio_read_array;
} else {
array = os_aio_log_array;
}
break;
case OS_AIO_SYNC:
array = os_aio_sync_array;
#if defined(LINUX_NATIVE_AIO)
/* In Linux native AIO we don't use sync IO array. */
ut_a(!srv_use_native_aio);
#endif /* LINUX_NATIVE_AIO */
break;
default:
ut_error;
array = NULL; /* Eliminate compiler warning */
}
// block until a slot for executing the asynchronous IO has been reserved for this request (the Linux-native part of the reservation is sketched after this function)
slot = os_aio_array_reserve_slot(type, array, message1, message2, file,
name, buf, offset, n, page_encrypt, page_size);
DBUG_EXECUTE_IF("simulate_slow_aio",
{
os_thread_sleep(1000000);
}
);
if (type == OS_FILE_READ) {
if (srv_use_native_aio) {
os_n_file_reads++;
os_bytes_read_since_printout += n;
#ifdef WIN_ASYNC_IO
// Windows: issue the asynchronous read request here
ret = ReadFile(file, buf, (DWORD) n, &len,
&(slot->control));
#elif defined(LINUX_NATIVE_AIO)
// Linux: dispatch the native IO request here
if (!os_aio_linux_dispatch(array, slot, should_buffer)) {
goto err_exit;
}
#endif /* WIN_ASYNC_IO */
} else {
if (!wake_later) {
// wake up the simulated aio handler thread
os_aio_simulated_wake_handler_thread(
os_aio_get_segment_no_from_slot(
array, slot));
}
}
} else if (type == OS_FILE_WRITE) {
ut_ad(!srv_read_only_mode);
if (srv_use_native_aio) {
os_n_file_writes++;
#ifdef WIN_ASYNC_IO
// Windows: issue the asynchronous write request here
ret = WriteFile(file, buf, (DWORD) n, &len,
&(slot->control));
#elif defined(LINUX_NATIVE_AIO)
// Linux: dispatch the native IO request here
if (!os_aio_linux_dispatch(array, slot, false)) {
goto err_exit;
}
#endif /* WIN_ASYNC_IO */
} else {
if (!wake_later) {
// wake up the simulated aio handler thread
os_aio_simulated_wake_handler_thread(
os_aio_get_segment_no_from_slot(
array, slot));
}
}
} else {
ut_error;
}
...
}
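The call to os_aio_array_reserve_slot() above is not quoted in this article. For the Linux native AIO path, what it essentially does, sketched below from the 5.6 code in a slightly simplified form (not a verbatim quote), is prepare the slot's embedded iocb and stash the slot pointer in iocb->data; that back-pointer is how os_aio_linux_collect() later maps a completed io_event back to its slot.
// Sketch only (simplified): the Linux-native-AIO part of
// os_aio_array_reserve_slot(), executed while holding array->mutex.
#if defined(LINUX_NATIVE_AIO)
if (srv_use_native_aio) {
	struct iocb* iocb = &slot->control;

	if (type == OS_FILE_READ) {
		io_prep_pread(iocb, file, buf, len, (off_t) offset);
	} else {
		ut_a(type == OS_FILE_WRITE);
		io_prep_pwrite(iocb, file, buf, len, (off_t) offset);
	}

	/* This is what os_aio_linux_collect() reads back from
	events[i].obj->data to find the slot on completion. */
	iocb->data = (void*) slot;

	slot->n_bytes = 0;
	slot->ret = 0;
}
#endif /* LINUX_NATIVE_AIO */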
- os_aio_linux_dispatch: the function that hands native IO requests to the Linux kernel
static
ibool
os_aio_linux_dispatch(
/*==================*/
os_aio_array_t* array, /* the IO request array */
os_aio_slot_t* slot, /* the slot that has been reserved */
ibool should_buffer) // whether to buffer the aio request; mainly used for read-ahead
{
...
/* Find out what we are going to work with.
The iocb struct is directly in the slot.
The io_context is one per segment. */
// number of slots per segment; on Linux each segment has 256 slots
slots_per_segment = array->n_slots / array->n_segments;
iocb = &slot->control;
io_ctx_index = slot->pos / slots_per_segment;
if (should_buffer) {
/* as this assertion shows, aio request buffering is only used for read requests */
ut_ad(array == os_aio_read_array);
ulint n;
ulint count;
os_mutex_enter(array->mutex);
/* There are array->n_slots elements in array->pending, which is divided into
* array->n_segments area of equal size. The iocb of each segment are
* buffered in its corresponding area in the pending array consecutively as
* they come. array->count[i] records the number of buffered aio requests in
* the ith segment.*/
n = io_ctx_index * slots_per_segment
+ array->count[io_ctx_index];
array->pending[n] = iocb;
array->count[io_ctx_index] ++;
count = array->count[io_ctx_index];
os_mutex_exit(array->mutex);
// if all buffered slots of the current segment are in use, submit the batch of aio requests now
if (count == slots_per_segment) {
os_aio_linux_dispatch_read_array_submit();
}
// otherwise just return
return (TRUE);
}
// submit the IO request to the kernel directly
ret = io_submit(array->aio_ctx[io_ctx_index], 1, &iocb);
...
}
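For completeness: the batched submit path taken by os_aio_linux_dispatch_read_array_submit() when a segment's buffer fills up amounts to handing that segment's array of buffered iocbs to the kernel in a single io_submit() call. The helper below is a hedged sketch of that idea only, not the InnoDB function; its name and parameters are illustrative.
#include <libaio.h>

/* Sketch: submit every iocb buffered for one segment with a single syscall. */
static int
submit_buffered_reads(io_context_t ctx, struct iocb** pending, long count)
{
	if (count == 0) {
		return(0);	/* nothing buffered for this segment */
	}

	/* io_submit() returns the number of requests accepted by the kernel,
	or a negative errno; a short count would mean some requests still
	have to be resubmitted. */
	long ret = io_submit(ctx, count, pending);

	return(ret == count ? 0 : -1);
}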
- fil_aio_wait: the main function with which the IO threads wait for and complete aio requests
void
fil_aio_wait(
/*=========*/
ulint segment) /*!< in: the number of the segment in the aio
array to wait for */
{
ibool ret;
fil_node_t* fil_node;
void* message;
ulint type;
ut_ad(fil_validate_skip());
if (srv_use_native_aio) { // native IO is used
srv_set_io_thread_op_info(segment, "native aio handle");
#ifdef WIN_ASYNC_IO
ret = os_aio_windows_handle( // Windows reaping entry point
segment, 0, &fil_node, &message, &type);
#elif defined(LINUX_NATIVE_AIO)
ret = os_aio_linux_handle( // Linux native IO reaping entry point
segment, &fil_node, &message, &type);
#else
ut_error;
ret = 0; /* Eliminate compiler warning */
#endif /* WIN_ASYNC_IO */
} else {
srv_set_io_thread_op_info(segment, "simulated aio handle");
ret = os_aio_simulated_handle( // simulated aio reaping entry point
segment, &fil_node, &message, &type);
}
ut_a(ret);
if (fil_node == NULL) {
ut_ad(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS);
return;
}
srv_set_io_thread_op_info(segment, "complete io for fil node");
mutex_enter(&fil_system->mutex);
// reaching this point means at least one IO request has completed; this function updates the related status information
fil_node_complete_io(fil_node, fil_system, type);
mutex_exit(&fil_system->mutex);
ut_ad(fil_validate_skip());
/* Do the i/o handling */
/* IMPORTANT: since i/o handling for reads will read also the insert
buffer in tablespace 0, you have to be very careful not to introduce
deadlocks in the i/o system. We keep tablespace 0 data files always
open, and use a special i/o thread to serve insert buffer requests. */
if (fil_node->space->purpose == FIL_TABLESPACE) { // read/write IO on a data file
srv_set_io_thread_op_info(segment, "complete io for buf page");
// after the IO request completes, update the state of the corresponding buffer pool bpage and verify the data against its checksum
buf_page_io_complete(static_cast<buf_page_t*>(message));
} else { // read/write IO on a log file
srv_set_io_thread_op_info(segment, "complete io for log");
log_io_complete(static_cast<log_group_t*>(message));
}
}
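fil_aio_wait() is driven by the InnoDB IO handler threads; each of them loops over its own segment roughly as sketched below. This is a simplification of io_handler_thread() in srv0start.cc, not a verbatim quote, and the shutdown condition is abbreviated.
/* Sketch of an InnoDB IO handler thread: each thread owns one segment and
keeps reaping completions for it. */
extern "C"
os_thread_ret_t
io_handler_thread_sketch(void* arg)
{
	ulint	segment = *((ulint*) arg);	/* each IO thread owns one segment */

	while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
		/* Blocks inside os_aio_linux_handle()/io_getevents() until a
		request of this segment completes, then finishes the page or
		log IO as shown above. */
		fil_aio_wait(segment);
	}

	return(0);
}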
- os_aio_linux_handle: the function with which the IO threads process completed native IO requests
ibool
os_aio_linux_handle(ulint global_seg, // which global segment this thread serves
fil_node_t**message1, /* the InnoDB file descriptor (fil_node_t) of this aio operation */
void** message2, /* returns the buffer pool bpage corresponding to the completed IO request */
ulint* type){ // read or write IO
// from global_seg, obtain the os_aio_array_t array of this aio and the corresponding local segment
segment = os_aio_get_array_and_local_segment(&array, global_seg);
n = array->n_slots / array->n_segments; // number of io events one thread can monitor
/* Loop until we have found a completed request. */
for (;;) {
ibool any_reserved = FALSE;
os_mutex_enter(array->mutex);
for (i = 0; i < n; ++i) { // walk over all the aio slots belonging to this segment
slot = os_aio_array_get_nth_slot(
array, i + segment * n);
if (!slot->reserved) { // is this slot occupied?
continue;
} else if (slot->io_already_done) { // the IO request has completed; we can hand the result back
/* Something for us to work on. */
goto found;
} else {
any_reserved = TRUE;
}
}
os_mutex_exit(array->mutex);
// no completed IO was found here, so go and collect again
os_aio_linux_collect(array, segment, n);
found: // a completed IO has been found; return its result
*message1 = slot->message1;
*message2 = slot->message2; // return the bpage corresponding to the completed IO
*type = slot->type;
if (slot->ret == 0 && slot->n_bytes == (long) slot->len) {
if (slot->page_encrypt
&& slot->type == OS_FILE_READ) {
os_decrypt_page(slot->buf, slot->len, slot->page_size, FALSE);
}
ret = TRUE;
} else {
errno = -slot->ret;
/* os_file_handle_error does tell us if we should retry
this IO. As it stands now, we don't do this retry when
reaping requests from a different context than
the dispatcher. This non-retry logic is the same for
windows and linux native AIO.
We should probably look into this to transparently
re-submit the IO. */
os_file_handle_error(slot->name, "Linux aio");
ret = FALSE;
}
os_mutex_exit(array->mutex);
os_aio_array_free_slot(array, slot);
return(ret);
}
- os_aio_linux_collect: waiting for native IO requests to complete
os_aio_linux_collect(os_aio_array_t* array,
ulint segment,
ulint seg_size){
events = &array->aio_events[segment * seg_size]; // locate the io_event slots that belong to this segment
/* obtain the aio context of this segment (thread) */
io_ctx = array->aio_ctx[segment];
/* Starting point of the segment we will be working on. */
start_pos = segment * seg_size;
/* End point. */
end_pos = start_pos + seg_size;
retry:
/* Initialize the events. The timeout value is arbitrary.
We probably need to experiment with it a little. */
memset(events, 0, sizeof(*events) * seg_size);
timeout.tv_sec = 0;
timeout.tv_nsec = OS_AIO_REAP_TIMEOUT;
ret = io_getevents(io_ctx, 1, seg_size, events, &timeout); // block waiting for any IO request monitored by this IO thread to complete
if (ret > 0) { // some IO requests have completed
for (i = 0; i < ret; i++) {
// record the information of each completed IO into its corresponding os_aio_slot_t object
os_aio_slot_t* slot;
struct iocb* control;
control = (struct iocb*) events[i].obj; // obtain the iocb of the completed aio, i.e. the iocb that submitted this request
ut_a(control != NULL);
slot = (os_aio_slot_t*) control->data; // obtain, through the data field, the os_aio_slot_t corresponding to this iocb
/* Some sanity checks. */
ut_a(slot != NULL);
ut_a(slot->reserved);
os_mutex_enter(array->mutex);
slot->n_bytes = events[i].res; // save the result of this IO into the slot
slot->ret = events[i].res2;
slot->io_already_done = TRUE; // mark this IO as done; this is the flag checked by the outer loop in os_aio_linux_handle
os_mutex_exit(array->mutex);
}
return;
}
…
}
To sum up, we have analyzed from the source-code perspective how InnoDB uses native IO to read and write data files. Interested readers can go on to study InnoDB's built-in simulated IO; its principle is the same as native IO, only the mechanics are implemented by InnoDB itself. This article has walked through the execution flow of InnoDB IO requests and examined the key data structures and functions involved, which will hopefully help readers who later read or modify the source code.