android網絡業務的封裝與調度
https://ikeepu.com/bar/10266838
手機客戶端程序由於網絡寬帶的約束,尤其在GPRS網絡環境下,大數據量的網絡交互很大程度上降低應用的響應,影響用戶體驗。比如,如果做一個手機網盤客戶端,在後台上傳文件時(大數據量的交互),獲取文件列表(命令類的交互)這個過程就顯得太別慢。而我們的要求是希望這些命令類操作能盡快得到響應。
通常,在手機客戶端,我們設計一個網絡操作的管理器,來統一管理這些需要聯網的操作。
具體做法是把網絡操作封裝成一個Command(或者說是Task),管理器實現特定的調度規則來調度運行這些Task。
這樣做的好處至少有三:
一. 用Command封裝了網絡操作,使得這些操作與上傳的業務分離,解除了強耦合。
二. 可以根據網絡情況來確定來采用不同的調度規則,提高用戶體驗。
三. 重用,這些Task和TaskManager的代碼在別的手機應用上基本上能照搬過去。
四. 擴展,當應用需要擴展新的業務時,隻有擴展一個新的Command(或者說是Task),接受調度即可,易於擴展。
例子:
還是以上文提到的微盤為例,可以概括我們對管理器的設計要求有:
在Wifi網絡環境下:
一:各種網絡操作可以並行運行。
在GPRS網絡環境下:
二:支持優先級搶占調度,命令類操作的優先級比數據傳輸類的優先級高,當命令類的Task(獲取文件列表)提交後,打斷數據傳輸的Task(如上傳,下載),等命令類的任務運行完畢,再接著運行數據類任務(斷點上傳,下載)。
二:同一個優先級的任務可以並行運行,如多個命令一起在網絡上傳輸。
實現思路:
TaskManager :
1. TaskManager開辟一個後台線程進行調度工作。
2. 由於要支持多個優先級的搶占調度,我們需要兩個隊列來維護運行中的Task和等待中的Task。
3. 由於Task的調度是基於優先級的,我們可以使用優先級隊列,運行隊列采用PriorityQueue,等待隊列使用PriorityBlockingQueue,當沒有網絡業務需要運行時,調度線程阻塞掛起,避免空轉。
4. TaskManager設計為單一實例(單一模式)。
5. 每個Task被調度運行時,該Task被從等待隊列移動運行隊列,當Task執行完畢時,從運行隊列刪除,喚醒調度線程進行新的調度。
下麵是簡單的設計代碼:
public final class TaskEngine implements Runnable{ private PriorityQueue<Task> runningQueue;//運行的task隊列 private PriorityBlockingQueue<Task> readyQueue;//就緒的task隊列,準備接受調度的task列表 private final AtomicLong taskIdProducer = new AtomicLong(1);//Task Id生成器 private Object sheduleLock = new Object();//同步鎖 private static TaskEngine instance; public long addTask(BusinessObject bo){ Task task = new Task(bo); long newTaskId = taskIdProducer.incrementAndGet(); task.setTaskId(newTaskId); if(this.isWifiNetWork()){ //WIFI網絡 synchronized(sheduleLock){ runningQueue.add(task); } new Thread(task).start(); }else{ //GPRS網絡 if(readyQueue.offer(task)){ //task入就緒隊列 final ReentrantLock lock = this.lock; lock.lock(); try{ needSchedule.signal(); //喚醒調度線程重新調度 }finally{ lock.unlock();} } } return newTaskId; } public final void run(){//task調度邏輯 .... .... } //掛起調度線程 當不需要調度時 private void waitUntilNeedSchedule() throws InterruptedException { ..... } }
Task:
1. 對要執行的網絡操作的封裝。
2. Task執行完畢時,發Task結束信號,喚醒調度線程進行新的調度
3. Task要實現Comparable接口,才能讓TaskManager的兩個隊列自動其包含的Task排序。
下麵是簡單的設計代碼:
public class Task implements Runnable,Comparable<Task>{ private long taskId; private BusinessObject bo;//封裝網絡操作的業務對象的抽象父類, //它封裝了具體的Command,保證了業務擴展中Task的接口不變 @Override public void run() { this.onTaskStart(); this.bo.execute(); this.onTaskEnd(); } private voidonTaskStart() {...} public int getPriority() {...} public void setPriority(intpriority) {...} @Override public int compareTo(Task object1) { return this.getPriority()>object1.getPriority()?-1:1; } }
小注意事項:
Android對PriorityQueue的實現和Jdk中的實現有點不一樣。
(PriorityQueue.java在android中的代碼路徑是usr\dalvik\libcore\luni\src\main\java\java\util)
PriorityQueue.remove(Object o) ;//從PriorityQueue中刪除一個元素。
對於完成這個刪除操作android和jdk都是分兩個過程實現,一,找出待刪除元素的索引index,二,刪除index所在元素。
在JDK中,是通過調用元素的equals方法來找到待刪除元素的索引,
private int indexOf(Object o) { if (o != null) { for (int i = 0; i < size; i++) if (o.equals(queue[i])) return i; } return -1; }
在android中,是間接調用元素的compareTo方法判斷結果是否為0來找到待刪除元素的索引,
int targetIndex; for (targetIndex = 0; targetIndex < size; targetIndex++) { if (0 == this.compare((E) o, elements[targetIndex])) { break; } } private int compare(E o1, E o2) { if (null != comparator) { return comparator.compare(o1, o2); } return ((Comparable<? super E>) o1).compareTo(o2); }
所以為了Task能在執行完畢時從PriorityQueue找到這個Task並刪除之,需要在compareTo方法裏在優先級相等時返回0。
@Override public int compareTo(Task object1) { if(this.getPriority()==object1.getPriority()) return 0; return this.getPriority()>object1.getPriority()?-1:1; }
當是這樣運行PriorityQueue.remove(Object o) 邏輯上隻能刪除PriorityQueue裏第一個優先級與被刪除的元素優先級相等的元素(可能是待刪除的元素也可能不是),有誤刪的可能,需要做如下修改:
@Override public int compareTo(Task object1) { if(this.getPriority()==object1.getPriority() && this.equals(object1)) return 0; return this.getPriority()>object1.getPriority()?-1:1; }
這樣才能正確執行remove(Object o),刪除指定的對象o。
個人覺得android這樣設計使得remove(Object o)複雜化了,不然JDK中那麼簡潔。語義上也不是那麼好懂了,
因為按通常理解,equals是判斷兩個對象是否相等,compareTo可能是對象的某個屬性的比較(類別數據庫中的order by),
而現在執行PriorityQueue.remove(Object o),這個對象o明明在容器PriorityQueue中,卻刪除不了,除非去翻看android中PriorityQueue的實現代碼,然後重寫compareTo這個方法。這樣的API使用時比較容易出錯,應該不符號良好的API設計規範
吧。
完整的代碼實現如下:
* @類名:TaskEngine * @創建:baiyingjun (devXiaobai@gmail.com) * @創建日期:2011-7-7 * @說明:task調度引擎 ***************************************************/ public final class TaskEngine implements Runnable{ private static final String TAG=Log.makeTag(TaskEngine.class); private PriorityQueue<Task> runningQueue;//運行的task隊列 private PriorityBlockingQueue<Task> readyQueue;//就緒的task隊列,準備接受調度的task列表 private final AtomicLong taskIdProducer = new AtomicLong(1); private Object sheduleLock = new Object();//調度鎖 private static TaskEngine instance; private final ReentrantLock lock = new ReentrantLock(true); private final Condition needSchedule = lock.newCondition(); private Task currentTask;//準備接受調度的task private Context mAppContext; /** * add BusinessObject to taskEngine */ public long addTask(BusinessObject bo) throws NetworkNotConnectException{ Task task = new Task(bo); long newTaskId = taskIdProducer.incrementAndGet(); task.setTaskId(newTaskId); if(Log.DBG){ Log.d(TAG, "Add task with task id "+newTaskId+", priority "+task.getPriority()); } if(this.isWifiNetWork()){ //WIFI網絡 synchronized(sheduleLock){ runningQueue.add(task); } new Thread(task).start(); }else{ //GPRS網絡 if(readyQueue.offer(task)){ //task入就緒隊列 if(Log.DBG) Log.d(TAG, "add task " +task.bo.methodName+" "+task.taskId+" to ready queue"); final ReentrantLock lock = this.lock; lock.lock(); try{ needSchedule.signal(); //喚醒調度線程重新調度 }finally{ lock.unlock(); } } //schedule(); } return newTaskId; } private TaskEngine(Context context){ mAppContext = context; runningQueue = new PriorityQueue<Task>(); readyQueue = new PriorityBlockingQueue<Task>(); new Thread(this).start(); Log.i(TAG, "shedule thread working"); } public synchronized static TaskEngine getInstance(Context context){ Context appContext = context.getApplicationContext(); if(instance==null || instance.mAppContext!=appContext){ instance=new TaskEngine(appContext); } return instance; } protected boolean isWifiNetWork() throws NetworkNotConnectException{ return NetworkManager.isWIFINetWork(mAppContext); } /** * task調度邏輯 */ public final void run(){ Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); while(true){ try { if(this.isWifiNetWork()){ Task task = this.readyQueue.take(); if(task !=null){ synchronized(sheduleLock){ runningQueue.add(task); } new Thread(task).start(); } } else{//非wifi網絡 //空就緒隊列,空運行隊列,等待直到有任務到來 if(this.readyQueue.size()==0 && runningQueue.size()==0){ currentTask=readyQueue.take(); synchronized(sheduleLock){ runningQueue.add(currentTask); } new Thread(currentTask).start(); } //搶占式調度(就緒隊列非空,運行隊列優先級比就緒隊列優先級低) else if(readyQueue.size()>0 && this.readyQueue.element().getPriority()>=this.getMaxPriority()){ currentTask = readyQueue.take(); if(currentTask.getPriority()>this.getMaxPriority()){//暫停低優先級的任務運行 synchronized(sheduleLock){ for(int i=0;i<runningQueue.size();i++){ Task toStopTask =runningQueue.remove(); //因為任務調度,將低優先級的任務暫時給凍結起來 toStopTask.setState(Task.STATE_FROST); readyQueue.add(toStopTask); } } } //運行被調度的任務 runningQueue.add(currentTask); new Thread(currentTask).start(); }else {//等高優先級的任務運行完畢 waitUntilNeedSchedule(); } } }catch (InterruptedException e) { Log.e(TAG, "Schedule error "+e.getMessage()); } catch (NetworkNotConnectException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /* * 等待,直到就緒隊列裏的最高優先級比當前運行優先級高,或者就緒隊列為空時等待運行隊列運行完畢 */ private void waitUntilNeedSchedule() throws InterruptedException{ final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try{ while ((readyQueue.size()>0 && readyQueue.element().getPriority()<getMaxPriority())//等高優先級的任務運行完畢 || (readyQueue.size()==0 && runningQueue.size()>0) ){//或者等運行隊列運行完畢 if(Log.DBG) Log.d(TAG, "waiting sheduling........"); needSchedule.await(); } } catch (InterruptedException ie) { needSchedule.signal(); // propagate to non-interrupted thread throw ie; } } finally { lock.unlock(); } } /** * Hand the specified task ,such as pause,delete and so on */ public boolean handTask(long taskId,int handType) { Log.i(TAG, "set task`s state with taskId "+taskId); synchronized(this.sheduleLock){ //如果在運行隊列裏,取消該任務 Iterator<Task> runningItor= this.runningQueue.iterator(); while(runningItor.hasNext()){ Task task = runningItor.next(); boolean b = task.equals(this); if(task.getTaskId()==taskId){ runningQueue.remove(task); task.setState(handType); Log.i(TAG, "set runningQueue taskId = "+taskId + " state " + handType); return true; } } //如果在就緒隊列裏,刪除 Iterator<Task> readyItor= this.readyQueue.iterator(); while(readyItor.hasNext()){ Task task = readyItor.next(); if(task.getTaskId()==taskId){ // readyQueue.remove(task); task.setState(handType); Log.i(TAG, "set readyQueue taskId = "+taskId + " state " + handType); return true; } } return false; } } /*** * 獲取運行隊列任務的最高優先級 */ private int getMaxPriority(){ if(this.runningQueue==null || this.runningQueue.size()==0) return -1; else{ return this.runningQueue.element().getPriority(); } } /*************************************************** * @類名:Task * @創建:baiyingjun (devXiaobai@gmail.com) * @創建日期:2011-7-7 * @說明:業務對象的包裝成可運行實體 ***************************************************/ public class Task implements Runnable,Comparable<Task>{ //運行 public static final int STATE_INIT = -1; //運行 public static final int STATE_RUN = 0; //停止 public static final int STATE_STOP = 1; //暫停 public static final int STATE_PAUSE = 2; //取消 public static final int STATE_CANCLE = 3; //凍結,如果在GPRS下,因為線程調度的時候低優先級的被放readyqueue裏的時候,要把這個任務暫時給“凍結”起來 public static final int STATE_FROST = 4; private long taskId; private BusinessObject bo; public Task(){ } public Task(BusinessObject bo){ this.bo=bo; } @Override public void run() { this.onTaskStart(); this.bo.execute(); this.onTaskEnd(); } private void onTaskStart(){ this.bo.setmState(STATE_RUN); } public long getTaskId() { return taskId; } public void setTaskId(long taskId) { this.taskId = taskId; } public int getPriority() { return this.bo.getPriority(); } public void setPriority(int priority) { this.bo.setPriority(priority); } /* * compare task priority */ @Override public int compareTo(Task object1) { if(this.getPriority()==object1.getPriority()&& this.equals(object1)) return 0; return this.getPriority()>object1.getPriority()?-1:1; } public void setState(int state){//設置當前運行的task的state this.bo.setmState(state); Log.d(TAG, "Set task "+this.bo.methodName+" "+this.taskId + " state " + state); } private void onTaskEnd(){//運行完畢後從taskengine運行隊列裏刪除 if(Log.DBG){ Log.d(TAG, "task "+this.bo.methodName+" "+taskId+" End"); } if(this.bo.getmState() == STATE_FROST)//因為調度停止了該業務 return; final ReentrantLock lock = TaskEngine.this.lock; lock.lock(); try{ boolean removed = runningQueue.remove(this); //remove from running queue assert removed; if(Log.DBG) Log.d(TAG, this.bo.methodName+" "+this.taskId+" remove from runningQueue"); needSchedule.signal(); //喚醒調度線程重新調度 }finally{ lock.unlock(); } } @Override public boolean equals(Object o) { // TODO Auto-generated method stub if(this==o){ return true; } if(o instanceof Task){ return taskId==((Task)o).taskId; } return false; } } }
拾漏補遺:
1.補充最初的設計類圖(可能與代碼不太一致,但能說明問題)
2. BusinessObject的實現代碼:
/*************************************************** * @類名:BusinessObject * @創建:baiyingjun (devXiaobai@gmail.com) * @創建日期:2011-7-6 * @說明:抽象的業務,擴展的網絡業務要繼承這個類並實現抽象方法execute() ***************************************************/ public abstract class BusinessObject { public static final int LOWEST_PRIORITY=1;//最低優先級 public static final int LOW_PRIORITY=2;//低優先級 public static final int NORMAL_PRIORITY=3;//正常優先級 public static final int HIGH_PRIORITY=4;//高優先級 public static final int HIGHEST_PRIORITY=5;//最高優先級 protected BusinessListener listnener;//運行業務的回調 protected Context mContext; private long taskId; protected Map<String,Object> params;//運行業務需要的參數 private int priority; public int getPriority() { return priority; } public void setPriority(int priority) { this.priority = priority; } //設置回調 public void addBusinessListener(BusinessListener listnener){ this.listnener=listnener; } public long doBusiness() throws NetworkNotConnectException{ taskId= TaskEngine.getInstance(mContext).addTask(this); return taskId; } public abstract void execute(); }
最後更新:2017-04-02 16:47:44