生產者消費者問題-轉自維基百科
生產者消費者問題(英語:Producer-consumer problem),也稱有限緩衝問題(英語:Bounded-buffer problem),是一個多線程同步問題的經典案例。該問題描述了兩個共享固定大小緩衝區的線程——即所謂的“生產者”和“消費者”——在實際運行時會發生的問題。生產者的主要作用是生成一定量的數據放到緩衝區中,然後重複此過程。與此同時,消費者也在緩衝區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入數據,消費者也不會在緩衝區中空時消耗數據。
要解決該問題,就必須讓生產者在緩衝區滿時休眠(要麼幹脆就放棄數據),等到下次消費者消耗緩衝區中的數據的時候,生產者才能被喚醒,開始往緩衝區添加數據。同樣,也可以讓消費者在緩衝區空時進入休眠,等到生產者往緩衝區添加數據之後,再喚醒消費者。通常采用進程間通信的方法解決該問題,常用的方法有信號燈法[1]等。如果解決方法不夠完善,則容易出現死鎖的情況。出現死鎖時,兩個線程都會陷入休眠,等待對方喚醒自己。該問題也能被推廣到多個生產者和消費者的情形。
int itemCount = 0; procedure producer() { while (true) { item = produceItem(); if (itemCount == BUFFER_SIZE) { sleep(); } putItemIntoBuffer(item); itemCount = itemCount + 1; if (itemCount == 1) { wakeup(consumer); } } } procedure consumer() { while (true) { if (itemCount == 0) { sleep(); } item = removeItemFromBuffer(); itemCount = itemCount - 1; if (itemCount == BUFFER_SIZE - 1) { wakeup(producer); } consumeItem(item); } }
上麵代碼中的問題在於它可能導致競爭條件,進而引發死鎖。考慮下麵的情形:
- 消費者把最後一個 itemCount 的內容讀出來,注意它現在是零。消費者返回到while的起始處,現在進入 if 塊;
- 就在調用sleep之前,CPU決定將時間讓給生產者,於是消費者在執行 sleep 之前就被中斷了,生產者開始執行;
- 生產者生產出一項數據後將其放入緩衝區,然後在 itemCount 上加 1;
- 由於緩衝區在上一步加 1 之前為空,生產者嚐試喚醒消費者;
- 遺憾的是,消費者並沒有在休眠,喚醒指令不起作用。當消費者恢複執行的時候,執行 sleep,一覺不醒。出現這種情況的原因在於,消費者隻能被生產者在 itemCount 為 1 的情況下喚醒;
- 生產者不停地循環執行,直到緩衝區滿,隨後進入休眠。
由於兩個進程都進入了永遠的休眠,死鎖情況出現了。因此,該算法是不完善的。
使用信號燈的算法[編輯]
信號燈可以避免上述喚醒指令不起作用的情況。該方法(見下麵的代碼)使用了兩個信號燈,fillCount 和 emptyCount。fillCount 用於記錄緩衝區中將被讀取的數據項數,emptyCount 用於記錄緩衝區中空閑空間數。當有新數據項被放入緩衝區時,fillCount 增加,emptyCount 減少。如果在生產者嚐試減少 emptyCount 的時候發現其值為零,那麼生產者就進入休眠。等到有數據項被消耗,emptyCount 增加的時候,生產者才被喚醒。消費者的行為類似。
semaphore fillCount = 0; // 生產的項目 semaphore emptyCount = BUFFER_SIZE; // 剩餘空間 procedure producer() { while (true) { item = produceItem(); down(emptyCount); putItemIntoBuffer(item); up(fillCount); } } procedure consumer() { while (true) { down(fillCount); item = removeItemFromBuffer(); up(emptyCount); consumeItem(item); } }
上述方法在隻有一個生產者和一個消費者時能解決問題。對於多個生產者或者多個消費者共享緩衝區的情況,該算法也會導致競爭條件,出現兩個或以上的線程同時讀或寫同一個緩衝區槽的情況。為了說明這種情況是如何發生的,可以假設 putItemIntoBuffer() 的一種可能的實現:先尋找下一個可用空槽,然後寫入數據項。下列情形是可能出現的:
- 兩個生產者都增加 emptyCount 的值;
- 某一生產者尋找到下一個可用空槽;
- 另一生產者也找到了下一個可用空槽,結果和上一步被找到的是同一個空槽;
- 兩個生產者向可用空槽寫入數
- 據。
為了解決這個問題,需要在保證同一時刻隻有一個生產者能夠執行 putItemIntoBuffer()。也就是說,需要尋找一種方法來互斥地執行臨界區的代碼。為了達到這個目的,可引入一個二值信號燈 mutex,其值隻能為 1 或者 0。如果把線程放入 down(mutex) 和 up(mutex) 之間,就可以限製隻有一個線程能被執行。多生產者、消費者的解決算法如下:
semaphore mutex = 1; semaphore fillCount = 0; semaphore emptyCount = BUFFER_SIZE; procedure producer() { while (true) { item = produceItem(); down(emptyCount); down(mutex); putItemIntoBuffer(item); up(mutex); up(fillCount); } } procedure consumer() { while (true) { down(fillCount); down(mutex); item = removeItemFromBuffer(); up(mutex); up(emptyCount); consumeItem(item); } }
使用管程的算法[編輯]
下列偽代碼展示的是使用管程來解決生產者消費者問題的辦法。由於管程一定能保證互斥,不必特地考慮保護臨界區[2]。也就是說,下麵這個方法不用修改就可以推廣適用於任意數量的生產者和消費者的情況。
monitor ProducerConsumer { int itemCount condition full; condition empty; procedure add(item) { while (itemCount == BUFFER_SIZE) { wait(full); } putItemIntoBuffer(item); itemCount = itemCount + 1; if (itemCount == 1) { notify(empty); } } procedure remove() { while (itemCount == 0) { wait(empty); } item = removeItemFromBuffer(); itemCount = itemCount - 1; if (itemCount == BUFFER_SIZE - 1) { notify(full); } return item; } } procedure producer() { while (true) { item = produceItem() ProducerConsumer.add(item) } } procedure consumer() { while (true) { item = ProducerConsumer.remove() consumeItem(item) } }
注意代碼中 while 語句的用法,都是用
在測試緩衝區是否已滿或空的時候。當存在多個消費者時,有可能造成競爭條件的情況是:某一消費者在一項數據被放入緩衝區中時被喚醒,但是另一消費者已經在管程上等待了一段時間並移除了這項數據。如果 while 語句被改成 if,則會出現放入緩衝區的數據項過多,或移除空緩衝區中的元素的情況。
不使用信號燈或者管程[編輯]
對於生產者消費者問題來說,特別是當隻有一個生產者和一個消費者時,實現一個先進先出結構或者通信通道非常重要。這樣,生產者-消費者模式就可以在不依賴信號燈、互斥變量或管程的的情況下高效地傳輸數據。但如果采用這種模式,性能可能下降,因為實現這種模式的代價比較高。人們喜歡用先進先出結構或者通信通道,隻是因為可以避免端與端之間的原子性同步。用 C 語言舉例如下,請注意:
- 該例繞開了對共享變量的原子性“讀-改-寫”訪問:每個 Count 變量都由單線程更新;
- 該例並不使線程休眠,這種做法依據係統不同是合理的。變量 sched_yield 是一個無關緊要可以不要的變量。線程庫通常會要求信號燈或者條件變量控製線程的休眠和喚起,在多處理器環境中,線程的休眠和喚起發生的頻率比傳遞數據符號要小,因此避開對數據原子性操作是有利的。
volatile unsigned int produceCount, consumeCount; TokenType buffer[BUFFER_SIZE]; void producer(void) { while (1) { while (produceCount - consumeCount == BUFFER_SIZE) sched_yield(); // 緩衝區滿 buffer[produceCount % BUFFER_SIZE] = produceToken(); produceCount += 1; } } void consumer(void) { while (1) { while (produceCount - consumeCount == 0) sched_yield(); // 緩衝區空 consumeToken( buffer[consumeCount % BUFFER_SIZE]); consumeCount += 1; } }
最後更新:2017-04-03 20:19:15