寫一個簡單的工作流(四)資源的處理
昨天晚上搞到深夜,終於將資源模塊搞定。到今天已經完成的功能包括:1.四種基本路由:順序、選擇、並行、循環
2.流程定義文件和係統配置文件的讀取和解析
3.使用內存作為流程數據和案例數據存儲的MemoryWorkFlowDAO的開發
4.資源模塊的開發
5.並發情況下的正確性測試等
計劃中的功能:
1.一個GUI的流程定義工具,這個不急,也還沒想好用什麼做,web還是桌麵?
2.各個數據庫版本的WorkFlowDAO的開發,將流程數據和案例數據保存在數據庫中。
3.更多的測試和example試驗。
回到資源這個概念,工作流中工作項(work item)的由資源來驅動的,這個資源(resource)可能是用戶、角色、定時時間或者某個事件消息。在標準petri網中,工作項對應於transition(變遷),變遷都是自動的,不需要所謂資源來驅動,顯然,這與工作流係統不同。具體到insect workflow(我取的名字,小巧之意),每個transition都有一個resource,用於驅動自身的firing,所有的resource都實現Resource接口:
public interface Resource extends Serializable {
public void start(Transition transition, Token token, Object args);
public ResourceType getType();
public long getId();
}
每個資源都有一個類型,以及這個類型中獨一無二的id,start方法用於驅動transtion的firing。一般情況下,你不需要實現這個接口,隻要繼承這個接口的抽象實現類AbstractResource,AbstractResource的start方法默認實現是首先調用模板方法doAction(稍後解釋),然後檢查觸發條件,如果通過就直接調用transition的fire方法:public void start(Transition transition, Token token, Object args);
public ResourceType getType();
public long getId();
}
public abstract class AbstractResource implements Resource {
public void start(Transition transition, Token token, Object args) {
doAction(transition, token, args);
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition沒有滿足觸發條件");
transition.fire(token, args);
}
public abstract void doAction(Transition transition, Token token,
Object
args) ;
}
public void start(Transition transition, Token token, Object args) {
doAction(transition, token, args);
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition沒有滿足觸發條件");
transition.fire(token, args);
}
public abstract void doAction(Transition transition, Token token,
Object

}
Transtion類的fire方法有三個操作組成:從輸入庫所移走token,往輸出庫所放入token,回調handler:
public void fire(Token token, Object
args) {
removeTokenFromInputs(token);
addTokenToOutputs(token);
invokeHandler(token, args);
}
那麼具體的資源顯然要實現AbstractResource中的doAction抽象方法,係統內置了五種資源:自動資源(AutoResource)、用戶(User)、用戶組(Group)、定時器(TimerResource)和事件監聽器(ObserverResource)。顯然,AutoResource、User和Group的doAction方法不需要做任何事情:
removeTokenFromInputs(token);
addTokenToOutputs(token);
invokeHandler(token, args);
}
public class User extends AbstractResource {
protected Group group;

@Override
public void doAction(Transition transition, Token token, Object
arg){
}
}
protected Group group;


@Override
public void doAction(Transition transition, Token token, Object

}
}
而TimerResource就需要做特殊處理了,比如我們要達到這樣的效果:節點1狀態已經處於就緒,可以被觸發,可我們希望在就緒後延遲半分鍾再觸發,或者在晚上10點觸發等等。這樣的定時需求很常見,我采用了jdk5引入的ScheduledExecutorService來處理。係統中啟動這樣一個線程池,每個類似上麵的請求都提交給這個線程池來處理,那麼TimerResource就需要進行相應的修改:
public abstract class TimerResource extends AbstractResource {
protected int pool_size;
protected static ScheduledExecutorService scheduledExecutorService;
@Override
public long getId() {
// TODO Auto-generated method stub
return Common.TIMER_RESOURCE_ID;
}
public TimerResource() {
this.pool_size = 5;
scheduledExecutorService = Executors.newScheduledThreadPool(pool_size);
}
public static void shutdownPool() {
if (scheduledExecutorService != null)
scheduledExecutorService.shutdown();
}
public final void start(Transition transition, Token token, Object
args)
throws InterruptedException {
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition沒有滿足觸發條件");
transition.removeTokenFromInputs(token);
doAction(transition, token, args);
}
protected class ChangeRunner implements Runnable {
private Transition transition;
private Token token;
private Object[] args;
public ChangeRunner(Transition transition, Token token, Object
args) {
this.transition = transition;
this.token = token;
this.args = args;
}
public void run() {
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition沒有滿足觸發條件");
transition.addTokenToOutputs(token);
Object real_args[] = new Object[args.length - 2];
for (int i = 0; i < real_args.length; i++)
real_args[i] = args[i + 2];
transition.invokeHandler(token, real_args);
try {
// 回調
((WorkFlowAlgorithm) args[1]).enabledTraversing(token
.getWorkFlow());
((WorkFlowManager) args[0]).doAction(token.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
注意到,start方法不再是直接調用transition的fire方法,而僅僅是進行了第一步操作:移除輸入庫所的place防止重複提交。後兩步操作都延遲到了提交給線程池的任務中,也就是代碼中的ChangeRunner類中的run方法。例如TimerResource的子類DelayTimerResource用於處理延遲的觸發,doAction就像這樣:protected int pool_size;
protected static ScheduledExecutorService scheduledExecutorService;
@Override
public long getId() {
// TODO Auto-generated method stub
return Common.TIMER_RESOURCE_ID;
}
public TimerResource() {
this.pool_size = 5;
scheduledExecutorService = Executors.newScheduledThreadPool(pool_size);
}
public static void shutdownPool() {
if (scheduledExecutorService != null)
scheduledExecutorService.shutdown();
}
public final void start(Transition transition, Token token, Object

throws InterruptedException {
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition沒有滿足觸發條件");
transition.removeTokenFromInputs(token);
doAction(transition, token, args);
}
protected class ChangeRunner implements Runnable {
private Transition transition;
private Token token;
private Object[] args;
public ChangeRunner(Transition transition, Token token, Object

this.transition = transition;
this.token = token;
this.args = args;
}
public void run() {
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition沒有滿足觸發條件");
transition.addTokenToOutputs(token);
Object real_args[] = new Object[args.length - 2];
for (int i = 0; i < real_args.length; i++)
real_args[i] = args[i + 2];
transition.invokeHandler(token, real_args);
try {
// 回調
((WorkFlowAlgorithm) args[1]).enabledTraversing(token
.getWorkFlow());
((WorkFlowManager) args[0]).doAction(token.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
public class DelayTimerResource extends TimerResource {


@Override
public void doAction(Transition transition, Token token, Object
args){
scheduledExecutorService.schedule(new ChangeRunner(transition, token,
args), this.delay, this.timeUnit);
}
}


@Override
public void doAction(Transition transition, Token token, Object

scheduledExecutorService.schedule(new ChangeRunner(transition, token,
args), this.delay, this.timeUnit);
}
}
延遲的時間,時間單位這些信息都可以在流程定義文件中設置。事件監聽器資源與此類似,ObserverResource實現了java.util.Observer接口,往輸出庫所放入token和回調handler兩步操作被放在了update方法中提供給Subject回調。
文章轉自莊周夢蝶 ,原文發布時間2007-10-13
最後更新:2017-05-17 17:01:51