閱讀211 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Marble原理之線程中斷

本章節依賴於【Marble使用】,閱讀本章節前請保證已經充分了解Marble。
中斷特性從Marble-Agent 2.0.5開始支持。

線程中斷使用

  1. 引入marble-agent jar包 xml <dependency> <groupId>com.github.jeff-dong</groupId> <artifactId>marble-agent</artifactId> <version>最新版</version> </dependency>
  2. JOB執行代碼適當位置添加中斷標誌, 下麵給出示例代碼

    @Component("job1")
    public class Job1 extends MarbleJob {
    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(Job1.class);
    
    @Override
    public void execute(String param) throws Exception {
        logger.info("JOB1開始執行 ...");
        int i = 0;
        while (true) {
            i++;
            //1、用中斷狀態碼進行判斷
            if (Thread.interrupted()) {
                logger.info("JOB1-[{}]-[{}]被打斷啦", param, Thread.currentThread().getName());
                return;
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                //2、捕獲終端異常後return結束
                return;
            }
            logger.info("JOB1-[{}]-[{}]-{}-------", param, Thread.currentThread().getName(), i);
        }
    }
    }
    
  3. Marble OFFLINE進行線程中斷

3.1 手動調度線程中斷

4678905_b2f772e124219867

3.2 選擇要中斷的服務器進行終端嚐試
4678905_b192bb1edf275c81

3.3 查看中斷日誌(同步JOB)
4678905_5d008fef53039e9b

中斷實現及原理

Java的線程中斷

Java的線程中斷機製是一種協作機製,線程中斷並不能立即停掉線程執行,相反,可能線程永遠都不會響應。
java的線程中斷模型隻是通過修改線程的中斷標誌(interrupt)進行中斷通知,不會有其它額外操作,因此線程是否最終中斷取決於線程的執行邏輯。因此,如果想讓線程按照自己的想法中斷,要代碼中事先進行中斷的“埋點”處理。

有人可能會想到Thread的stop方法進行中斷,由於此方法可能造成不可預知的結果,已經被拋棄

Marble進行線程中斷實現

需求收集
  1. 以JOB為維度進行線程中斷;
  2. 盡量做到實時響應;
  3. 存在集群中多台機器,要支持指定某台機器中的線程中斷;
  4. 允許多次中斷嚐試;
  5. 中斷請求不能依賴於JOB當前狀態。可能已經停止調度的JOB也要手動中斷執行中的線程;
  6. 透明和擴展不同JOB的中斷(提供用戶中斷的"後處理"擴展);
需求分析及實現

【以JOB為維度進行線程中斷】

Marble的JOB標誌為 schedulerName-appId-jobName組成,目前Marble每個JOB調度時間和頻率都是個性化,目前調度完成就銷毀。但要做到任何時間進行執行中的線程中斷就要:
1.1 存儲JOB的運行線程,隨時準備中斷;
1.2 在緩存的JOB數量/時間和性能間做權衡,不能過多也不能過少;
1.3 製定緩存已滿時的拋棄策略,避免緩存被占滿新的線程永遠無法中斷;
1.4 要同步JOB和異步JOB透明處理(感覺不出差異);

實現:
Marble的線程池中定義支持並發的MAP進行JOB維度的線程緩存,此外指定每個JOB下緩存的線程數量。如下:

public class ThreadPool {
    ...
    private Multimap<String, Object> threadMultimap = Multimaps.synchronizedMultimap(HashMultimap.<String, Object>create());
    //multimap的單個key的最大容量
    private static final int THREADMULTIMAP_SIZE = 50;
    ...
}

Marble-Agent在同步/異步JOB生成新的線程對象時進行放入MAP緩存,如果緩存(50個)已滿采用如下策略進行處理:
1. 嚐試清理當前map中的非活躍線程;
2. 嚐試清理當前map中已經完成的線程(同步線程有效);
3. 如果還未清理出空間,移除最久的線程;

public ThreadPool multimapPut(String key, Object value) {
        if (StringUtils.isNotBlank(key)) {
            Collection collection = threadMultimap.get(key);
            if (collection != null && collection.size() >= THREADMULTIMAP_SIZE) {
                //替換最久的
                Iterator<Object> it = collection.iterator();
                //首先進行 非活躍線程清理
                while (it.hasNext()) {
                    Object tempObj = it.next();
                    if(tempObj instanceof MarbleThread){
                        MarbleThread mt = (MarbleThread)tempObj;
                        //不活躍刪除
                        if(!mt.isThreadAlive()){
                            it.remove();
                        }
                    }else if(tempObj instanceof MarbleThreadFeature){
                        MarbleThreadFeature mf = (MarbleThreadFeature) tempObj;
                        //完成的線程刪除
                        if(mf.isDone()){
                            it.remove();
                        }
                    }
                }
                //仍然>最大值,刪除最久未使用
                if(collection.size() >= THREADMULTIMAP_SIZE){
                    while (it.hasNext()) {
                        it.next();
                        it.remove();
                        break;
                    }
                }
                threadMultimap.put(key, value);
                return this;
            }
        }
        threadMultimap.put(key, value);
        return this;
    }

此外,為了能在JVM關閉時進行線程中斷,添加JVM hook進行中斷調用處理(包括線程池的銷毀)。
除此之外,還有個小問題,由於線程池使用的是有界的阻塞隊列,此種情況下,線程中斷時可能有的線程存在於阻塞隊列中,單純的中斷無效,對於此類情況,要首先判斷阻塞隊列中是否存在要中斷的線程,存在的話進行隊列的移除操作。

【盡量做到實時響應】
隻能通過用戶在具體的線程邏輯中進行埋點處理,Marble在框架層麵除了及時把用戶的中斷請求送達之外,沒有其它措施。

【存在集群中多台機器,要支持指定某台機器中的線程中斷】
Marble OFFLINE的中斷頁麵支持機器的選擇,用戶進行選擇後,Marble會有針對性的進行機器的中斷RPC發送。

【允許多次中斷嚐試】
OFFLINE未對中斷次數進行限製,目前支持多次中斷請求發送。

【中斷請求不能依賴於JOB當前狀態】
考慮到用戶對曆史線程的中斷請求,Marble未把中斷操作綁定在JOB狀態上,任何JOB都可以進行終端嚐試。

【透明擴展不同JOB的中斷】
Marble目前支持同步和異步JOB,兩類JOB的中斷處理並不一致,比如同步job的中斷是通過FeatureTask的cancel實現,異步JOB是通過Thread的interrupt實現,此外線程被中斷後Marble希望能更進一步提供一個統一的“後處理”操作給用戶自己實現,比如用戶可能需要在線程被中斷後進行一些後續的log記錄等。

為了代碼層麵一致透明,且友好的實現“後處理”的封裝,Marble使用了代理模式,在Thread和FeatureTask上添加了一層“代理類”,由代理進行具體的中斷操作。
同步JOB代理類:


/**
 * @author <a href="dongjianxing@aliyun.com">jeff</a>
 * @version 2017/4/19 16:31
 */
public class MarbleThreadFeature<V> implements RunnableFuture<V> {

    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThreadFeature.class);
    private MarbleJob marbleJob;
    private String param;
    private FutureTask<Result> futureTask;


    public MarbleThreadFeature(final MarbleJob marbleJob, final String param) {
        super();
        this.marbleJob = marbleJob;
        this.param = param;
        futureTask = new FutureTask<>(new Callable<Result>() {
            @Override
            public Result call() throws Exception {
                return marbleJob.executeSync(param);
            }
        });
    }


    @Override
    public void run() {
        futureTask.run();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return futureTask.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return futureTask.isCancelled();
    }

    @Override
    public boolean isDone() {
        return futureTask.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return (V) futureTask.get();
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return (V) futureTask.get(timeout, unit);
    }

    public void stop(String operator) {
        if (futureTask != null && !futureTask.isCancelled()) {
            logger.info("Thread-feature[{}] is interrupted", futureTask.getClass().getName());
            futureTask.cancel(true);
        }else if(marbleJob != null){
            boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(marbleJob);
            logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", marbleJob.getClass().getSimpleName(),removeResult);
        }
        //中斷後處理
        if(marbleJob != null){
            marbleJob.afterInterruptTreatment();
        }
    }

}

異步JOB代理類:


/**
 * @author <a href="dongjianxing@aliyun.com">jeff</a>
 * @version 2017/4/19 16:31
 */
public class MarbleThread implements Runnable {

    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThread.class);
    private MarbleJob marbleJob;
    private String param;
    private Thread runThread;


    public MarbleThread(MarbleJob marbleJob, String param) {
        super();
        this.marbleJob = marbleJob;
        this.param = param;
    }

    @Override
    public void run() {
        runThread = Thread.currentThread();
        try {
            marbleJob.execute(param);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean isThreadAlive() {
        return (runThread != null && runThread.isAlive());
    }

    public String getThreadName() {
        return runThread != null ? runThread.getName() : "";
    }

    public void stop() {
        //首先嚐試在阻塞隊列中刪除
        boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(this);
        logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", this.getClass().getSimpleName(), removeResult);
        if (runThread != null && !runThread.isInterrupted()) {
            logger.info("Thread[{}] is interrupted", runThread.getName());
            runThread.interrupt();
        }
        //中斷後處理
        if (marbleJob != null) {
            marbleJob.afterInterruptTreatment();
        }
    }
}

最後更新:2017-07-11 17:32:47

  上一篇:go  mybatis中動態sql常用的標簽
  下一篇:go  IT眾包不養技術人員該怎麼玩?