Marble原理之線程中斷
本章節依賴於【Marble使用】,閱讀本章節前請保證已經充分了解Marble。
中斷特性從Marble-Agent 2.0.5開始支持。
線程中斷使用
- 引入marble-agent jar包
xml <dependency> <groupId>com.github.jeff-dong</groupId> <artifactId>marble-agent</artifactId> <version>最新版</version> </dependency>
-
JOB執行代碼適當位置添加中斷標誌, 下麵給出示例代碼
@Component("job1") public class Job1 extends MarbleJob { private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(Job1.class); @Override public void execute(String param) throws Exception { logger.info("JOB1開始執行 ..."); int i = 0; while (true) { i++; //1、用中斷狀態碼進行判斷 if (Thread.interrupted()) { logger.info("JOB1-[{}]-[{}]被打斷啦", param, Thread.currentThread().getName()); return; } try { Thread.sleep(500); } catch (InterruptedException e) { //2、捕獲終端異常後return結束 return; } logger.info("JOB1-[{}]-[{}]-{}-------", param, Thread.currentThread().getName(), i); } } }
Marble OFFLINE進行線程中斷
3.1 手動調度線程中斷
中斷實現及原理
Java的線程中斷
Java的線程中斷機製是一種協作機製,線程中斷並不能立即停掉線程執行,相反,可能線程永遠都不會響應。
java的線程中斷模型隻是通過修改線程的中斷標誌(interrupt)進行中斷通知,不會有其它額外操作,因此線程是否最終中斷取決於線程的執行邏輯。因此,如果想讓線程按照自己的想法中斷,要代碼中事先進行中斷的“埋點”處理。
有人可能會想到Thread的stop方法進行中斷,由於此方法可能造成不可預知的結果,已經被拋棄
Marble進行線程中斷實現
需求收集
- 以JOB為維度進行線程中斷;
- 盡量做到實時響應;
- 存在集群中多台機器,要支持指定某台機器中的線程中斷;
- 允許多次中斷嚐試;
- 中斷請求不能依賴於JOB當前狀態。可能已經停止調度的JOB也要手動中斷執行中的線程;
- 透明和擴展不同JOB的中斷(提供用戶中斷的"後處理"擴展);
需求分析及實現
【以JOB為維度進行線程中斷】
Marble的JOB標誌為 schedulerName-appId-jobName組成,目前Marble每個JOB調度時間和頻率都是個性化,目前調度完成就銷毀。但要做到任何時間進行執行中的線程中斷就要:
1.1 存儲JOB的運行線程,隨時準備中斷;
1.2 在緩存的JOB數量/時間和性能間做權衡,不能過多也不能過少;
1.3 製定緩存已滿時的拋棄策略,避免緩存被占滿新的線程永遠無法中斷;
1.4 要同步JOB和異步JOB透明處理(感覺不出差異);
實現:
Marble的線程池中定義支持並發的MAP進行JOB維度的線程緩存,此外指定每個JOB下緩存的線程數量。如下:
public class ThreadPool {
...
private Multimap<String, Object> threadMultimap = Multimaps.synchronizedMultimap(HashMultimap.<String, Object>create());
//multimap的單個key的最大容量
private static final int THREADMULTIMAP_SIZE = 50;
...
}
Marble-Agent在同步/異步JOB生成新的線程對象時進行放入MAP緩存,如果緩存(50個)已滿采用如下策略進行處理:
1. 嚐試清理當前map中的非活躍線程;
2. 嚐試清理當前map中已經完成的線程(同步線程有效);
3. 如果還未清理出空間,移除最久的線程;
public ThreadPool multimapPut(String key, Object value) {
if (StringUtils.isNotBlank(key)) {
Collection collection = threadMultimap.get(key);
if (collection != null && collection.size() >= THREADMULTIMAP_SIZE) {
//替換最久的
Iterator<Object> it = collection.iterator();
//首先進行 非活躍線程清理
while (it.hasNext()) {
Object tempObj = it.next();
if(tempObj instanceof MarbleThread){
MarbleThread mt = (MarbleThread)tempObj;
//不活躍刪除
if(!mt.isThreadAlive()){
it.remove();
}
}else if(tempObj instanceof MarbleThreadFeature){
MarbleThreadFeature mf = (MarbleThreadFeature) tempObj;
//完成的線程刪除
if(mf.isDone()){
it.remove();
}
}
}
//仍然>最大值,刪除最久未使用
if(collection.size() >= THREADMULTIMAP_SIZE){
while (it.hasNext()) {
it.next();
it.remove();
break;
}
}
threadMultimap.put(key, value);
return this;
}
}
threadMultimap.put(key, value);
return this;
}
此外,為了能在JVM關閉時進行線程中斷,添加JVM hook進行中斷調用處理(包括線程池的銷毀)。
除此之外,還有個小問題,由於線程池使用的是有界的阻塞隊列,此種情況下,線程中斷時可能有的線程存在於阻塞隊列中,單純的中斷無效,對於此類情況,要首先判斷阻塞隊列中是否存在要中斷的線程,存在的話進行隊列的移除操作。
【盡量做到實時響應】
隻能通過用戶在具體的線程邏輯中進行埋點處理,Marble在框架層麵除了及時把用戶的中斷請求送達之外,沒有其它措施。
【存在集群中多台機器,要支持指定某台機器中的線程中斷】
Marble OFFLINE的中斷頁麵支持機器的選擇,用戶進行選擇後,Marble會有針對性的進行機器的中斷RPC發送。
【允許多次中斷嚐試】
OFFLINE未對中斷次數進行限製,目前支持多次中斷請求發送。
【中斷請求不能依賴於JOB當前狀態】
考慮到用戶對曆史線程的中斷請求,Marble未把中斷操作綁定在JOB狀態上,任何JOB都可以進行終端嚐試。
【透明擴展不同JOB的中斷】
Marble目前支持同步和異步JOB,兩類JOB的中斷處理並不一致,比如同步job的中斷是通過FeatureTask的cancel實現,異步JOB是通過Thread的interrupt實現,此外線程被中斷後Marble希望能更進一步提供一個統一的“後處理”操作給用戶自己實現,比如用戶可能需要在線程被中斷後進行一些後續的log記錄等。
為了代碼層麵一致透明,且友好的實現“後處理”的封裝,Marble使用了代理模式,在Thread和FeatureTask上添加了一層“代理類”,由代理進行具體的中斷操作。
同步JOB代理類:
/**
* @author <a href="dongjianxing@aliyun.com">jeff</a>
* @version 2017/4/19 16:31
*/
public class MarbleThreadFeature<V> implements RunnableFuture<V> {
private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThreadFeature.class);
private MarbleJob marbleJob;
private String param;
private FutureTask<Result> futureTask;
public MarbleThreadFeature(final MarbleJob marbleJob, final String param) {
super();
this.marbleJob = marbleJob;
this.param = param;
futureTask = new FutureTask<>(new Callable<Result>() {
@Override
public Result call() throws Exception {
return marbleJob.executeSync(param);
}
});
}
@Override
public void run() {
futureTask.run();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return futureTask.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return futureTask.isCancelled();
}
@Override
public boolean isDone() {
return futureTask.isDone();
}
@Override
public V get() throws InterruptedException, ExecutionException {
return (V) futureTask.get();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return (V) futureTask.get(timeout, unit);
}
public void stop(String operator) {
if (futureTask != null && !futureTask.isCancelled()) {
logger.info("Thread-feature[{}] is interrupted", futureTask.getClass().getName());
futureTask.cancel(true);
}else if(marbleJob != null){
boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(marbleJob);
logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", marbleJob.getClass().getSimpleName(),removeResult);
}
//中斷後處理
if(marbleJob != null){
marbleJob.afterInterruptTreatment();
}
}
}
異步JOB代理類:
/**
* @author <a href="dongjianxing@aliyun.com">jeff</a>
* @version 2017/4/19 16:31
*/
public class MarbleThread implements Runnable {
private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThread.class);
private MarbleJob marbleJob;
private String param;
private Thread runThread;
public MarbleThread(MarbleJob marbleJob, String param) {
super();
this.marbleJob = marbleJob;
this.param = param;
}
@Override
public void run() {
runThread = Thread.currentThread();
try {
marbleJob.execute(param);
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean isThreadAlive() {
return (runThread != null && runThread.isAlive());
}
public String getThreadName() {
return runThread != null ? runThread.getName() : "";
}
public void stop() {
//首先嚐試在阻塞隊列中刪除
boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(this);
logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", this.getClass().getSimpleName(), removeResult);
if (runThread != null && !runThread.isInterrupted()) {
logger.info("Thread[{}] is interrupted", runThread.getName());
runThread.interrupt();
}
//中斷後處理
if (marbleJob != null) {
marbleJob.afterInterruptTreatment();
}
}
}
最後更新:2017-07-11 17:32:47