閱讀856 返回首頁    go 阿裏雲 go 技術社區[雲棲]


線程任務的取消

  當外部代碼能夠在活動自然完成之前,把它的狀態更改為完成狀態,那麼這個活動被稱為可取消(cancellable)。取消任務是一個很常見的需求,無論是由於用戶請求還是係統錯誤引起的服務關閉等等原因。最簡單的任務取消策略就是在線程中維持一個bool變量,在run方法中判斷此變量的bool值來決定是否取消任務。顯然,這個bool變量需要聲明為volatile,以保持多線程環境下可見性(所謂可見性,就是當一個線程修改共享對象的某個狀態變量後,另一個線程可以馬上看到修改結果)。下麵是一個來自《java並發編程實踐》的例子:
package net.rubyeye.concurrency.chapter7;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PrimeGenerator implements Runnable {
    
private final List<BigInteger> primes = new ArrayList<BigInteger>();

    
private volatile boolean cancelled;
   
public void run() {
        BigInteger p 
= BigInteger.ONE;
        
while (!cancelled) {
            p 
= p.nextProbablePrime();
            
synchronized (this) {
                primes.add(p);
            }
        }
    }
   
public void cancel() {
        cancelled 
= true;
    }
   
public synchronized List<BigInteger> get() {
        
return new ArrayList<BigInteger>(primes);
    }

   
public static void main(String args[]) throws InterruptedException {
        PrimeGenerator generator 
= new PrimeGenerator();
        
new Thread(generator).start();
        
try {
            TimeUnit.SECONDS.sleep(
1);
        } 
finally {
            generator.cancel();
        }
    }
}
    main中啟動一個素數生成的任務,線程運行一秒就取消掉。通過線程中的cancelled變量來表征任務是否繼續執行。既然是最簡單的策略,那麼什麼是例外情況?顯然,阻塞操作下(比如調用join,wait,sleep方法),這樣的策略會出問題。任務因為調用這些阻塞方法而被阻塞,它將不會去檢查volatile變量,導致取消操作失效。那麼解決辦法是什麼?中斷!考慮我們用BlockingQueue去保存生成的素數,BlockingQueue的put方法是阻塞的(當BlockingQueue滿的時候,put操作會阻塞直到有元素被take),讓我們看看不采用中斷,仍然采用簡單策略會出現什麼情況:
package net.rubyeye.concurrency.chapter7;

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BrokenPrimeProducer extends Thread {
    
static int i = 1000;

    
private final BlockingQueue<BigInteger> queue;

    
private volatile boolean cancelled = false;

    BrokenPrimeProducer(BlockingQueue
<BigInteger> queue) {
        
this.queue = queue;
    }

    
public void run() {
        BigInteger p 
= BigInteger.ONE;
        
try {
            
while (!cancelled) {
                p 
= p.nextProbablePrime();
                queue.put(p);
            }
        } 
catch (InterruptedException cusumed) {
        }
    }

    
public void cancel() {
        
this.cancelled = false;
    }

    
public static void main(String args[]) throws InterruptedException {
        BlockingQueue
<BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
                
10);
        BrokenPrimeProducer producer 
= new BrokenPrimeProducer(queue);
        producer.start();
        
try {
            
while (needMorePrimes())
                queue.take();
        } 
finally {
            producer.cancel();
        }
    }

    
public static boolean needMorePrimes() throws InterruptedException {
        
boolean result = true;
        i
--;
        
if (i == 0)
            result 
= false;
        
return result;
    }
}
    我們在main中通過queue.take來消費產生的素數(雖然僅僅是取出扔掉),我們隻消費了1000個素數,然後嚐試取消產生素數的任務,很遺憾,取消不了,因為產生素數的線程產生素數的速度大於我們消費的速度,我們在消費1000後就停止消費了,那麼任務將被queue的put方法阻塞,永遠也不會去判斷cancelled狀態變量,任務取消不了。正確的做法應當是使用中斷(interrupt):
package net.rubyeye.concurrency.chapter7;

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PrimeProducer extends Thread {
    
static int i = 1000;

    
private final BlockingQueue<BigInteger> queue;

    
private volatile boolean cancelled = false;

    PrimeProducer(BlockingQueue
<BigInteger> queue) {
        
this.queue = queue;
    }

    
public void run() {
        BigInteger p 
= BigInteger.ONE;
        
try {
            
while (!Thread.currentThread().isInterrupted()) {
                p 
= p.nextProbablePrime();
                queue.put(p);
            }
        } 
catch (InterruptedException cusumed) {
        }
    }

    
public void cancel() {
        interrupt();
    }

    
public static void main(String args[]) throws InterruptedException {
        BlockingQueue
<BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
                
10);
        PrimeProducer producer 
= new PrimeProducer(queue);
        producer.start();
        
try {
            
while (needMorePrimes())
                queue.take();
        } 
finally {
            producer.cancel();
        }
    }

    
public static boolean needMorePrimes() throws InterruptedException {
        
boolean result = true;
        i
--;
        
if (i == 0)
            result 
= false;
        
return result;
    }
}
   在run方法中,通過Thread的isInterrupted來判斷interrupt status是否已經被修改,從而正確實現了任務的取消。關於interrupt,有一點需要特別說明,調用interrupt並不意味著必然停止目標線程的正在進行的工作,它僅僅是傳遞一個請求中斷的信號給目標線程,目標線程會在下一個方便的時刻中斷。而對於阻塞方法產生的InterruptedException的處理,兩種選擇:要麼重新拋出讓上層代碼來處理,要麼在catch塊中調用Thread的interrupt來保存中斷狀態。除非你確定要讓工作線程終止(如上所示代碼),否則不要僅僅是catch而不做任務處理工作(生吞了InterruptedException),更詳細可以參考這裏。如果不清楚外部線程的中斷策略,生搬硬套地調用interrupt可能產生不可預料的後果,可參見書中7.1.4例子。

   另外一個取消任務的方法就是采用Future來管理任務,這是JDK5引入的,用於管理任務的生命周期,處理異常等。比如調用ExecutorService的sumit方法會返回一個Future來描述任務,而Future有一個cancel方法用於取消任務。
   那麼,如果任務調用了不可中斷的阻塞方法,比如Socket的read、write方法,java.nio中的同步I/O,那麼該怎麼處理呢?簡單地,關閉它們!參考下麵的例子:
package net.rubyeye.concurrency.chapter7;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;

/**
 * 展示對於不可中斷阻塞的取消任務 通過關閉socket引發異常來中斷
 * 
 * 
@author Admin
 * 
 
*/
public abstract class ReaderThread extends Thread {
    
private final Socket socket;

    
private final InputStream in;

    
public ReaderThread(Socket socket) throws IOException {
        
this.socket = socket;
        
this.in = socket.getInputStream();
    }

    
// 重寫interrupt方法
    public void interrupt() {
        
try {
            socket.close();
        } 
catch (IOException e) {
        } 
finally {
            
super.interrupt();
        }
    }

    
public void run() {
        
try {
            
byte[] buf = new byte[1024];
            
while (true) {
                
int count = in.read(buf);
                
if (count < 0)
                    
break;
                
else if (count > 0)
                    processBuff(buf, count);
            }
        } 
catch (IOException e) {
        }
    }

    
public abstract void processBuff(byte[] buf, int count);
}

    Reader線程重寫了interrupt方法,其中調用了socket的close方法用於中斷read方法,最後,又調用了super.interrupt(),防止當調用可中斷的阻塞方法時不能正常中斷。文章轉自莊周夢蝶  ,原文發布時間 2007-09-03

文章轉自莊周夢蝶  ,原文發布時間

最後更新:2017-05-18 10:32:45

  上一篇:go  《Maven官方指南》指南第三方部署到遠程倉庫
  下一篇:go  《工作流管理——模型、方法和係統》筆記2:Petri網對工作流建模