547
王者榮耀
基於Consul的分布式鎖實現
我們在構建分布式係統的時候,經常需要控製對共享資源的互斥訪問。這個時候我們就涉及到分布式鎖(也稱為全局鎖)的實現,基於目前的各種工具,我們已經有了大量的實現方式,比如:基於Redis的實現、基於Zookeeper的實現。本文將介紹一種基於Consul 的Key/Value存儲來實現分布式鎖以及信號量的方法。
分布式鎖實現
基於Consul的分布式鎖主要利用Key/Value存儲API中的acquire和release操作來實現。acquire和release操作是類似Check-And-Set的操作:
- acquire操作隻有當鎖不存在持有者時才會返回true,並且set設置的Value值,同時執行操作的session會持有對該Key的鎖,否則就返回false。
- release操作則是使用指定的session來釋放某個Key的鎖,如果指定的session無效,那麼會返回false,否則就會set設置Value值,並返回true。
具體實現中主要使用了這幾個Key/Value的API:
- create session:https://www.consul.io/api/session.html
- delete session:https://www.consul.io/api/session.html
- KV acquire/release:https://www.consul.io/api/kv.html
基本流程
具體實現
public class Lock {
private static final String prefix = "lock/"; // 同步鎖參數前綴
private ConsulClient consulClient;
private String sessionName;
private String sessionId = null;
private String lockKey;
/**
*
* @param consulClient
* @param sessionName 同步鎖的session名稱
* @param lockKey 同步鎖在consul的KV存儲中的Key路徑,會自動增加prefix前綴,方便歸類查詢
*/
public Lock(ConsulClient consulClient, String sessionName, String lockKey) {
this.consulClient = consulClient;
this.sessionName = sessionName;
this.lockKey = prefix + lockKey;
}
/**
* 獲取同步鎖
*
* @param block 是否阻塞,直到獲取到鎖為止
* @return
*/
public Boolean lock(boolean block) {
if (sessionId != null) {
throw new RuntimeException(sessionId + " - Already locked!");
}
sessionId = createSession(sessionName);
while(true) {
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) {
return true;
} else if(block) {
continue;
} else {
return false;
}
}
}
/**
* 釋放同步鎖
*
* @return
*/
public Boolean unlock() {
PutParams putParams = new PutParams();
putParams.setReleaseSession(sessionId);
boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue();
consulClient.sessionDestroy(sessionId, null);
return result;
}
/**
* 創建session
* @param sessionName
* @return
*/
private String createSession(String sessionName) {
NewSession newSession = new NewSession();
newSession.setName(sessionName);
return consulClient.sessionCreate(newSession, null).getValue();
}
}
單元測試
下麵單元測試的邏輯:通過線程的方式來模擬不同的分布式服務來競爭鎖。多個處理線程同時以阻塞方式來申請分布式鎖,當處理線程獲得鎖之後,Sleep一段隨機事件,以模擬處理業務邏輯,處理完畢之後釋放鎖。
public class TestLock {
private Logger logger = Logger.getLogger(getClass());
@Test
public void testLock() throws Exception {
new Thread(new LockRunner(1)).start();
new Thread(new LockRunner(2)).start();
new Thread(new LockRunner(3)).start();
new Thread(new LockRunner(4)).start();
new Thread(new LockRunner(5)).start();
Thread.sleep(200000L);
}
class LockRunner implements Runnable {
private Logger logger = Logger.getLogger(getClass());
private int flag;
public LockRunner(int flag) {
this.flag = flag;
}
@Override
public void run() {
Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key");
try {
if (lock.lock(true)) {
logger.info("Thread " + flag + " start!");
Thread.sleep(new Random().nextInt(3000L));
logger.info("Thread " + flag + " end!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
單元測試執行結果如下:
2017-04-12 21:28:09,698 INFO [Thread-0] LockRunner – Thread 1 start!
2017-04-12 21:28:12,717 INFO [Thread-0] LockRunner – Thread 1 end!
2017-04-12 21:28:13,219 INFO [Thread-2] LockRunner – Thread 3 start!
2017-04-12 21:28:15,672 INFO [Thread-2] LockRunner – Thread 3 end!
2017-04-12 21:28:15,735 INFO [Thread-1] LockRunner – Thread 2 start!
2017-04-12 21:28:17,788 INFO [Thread-1] LockRunner – Thread 2 end!
2017-04-12 21:28:18,249 INFO [Thread-4] LockRunner – Thread 5 start!
2017-04-12 21:28:19,573 INFO [Thread-4] LockRunner – Thread 5 end!
2017-04-12 21:28:19,757 INFO [Thread-3] LockRunner – Thread 4 start!
2017-04-12 21:28:21,353 INFO [Thread-3] LockRunner – Thread 4 end!
|
從測試結果我們可以看到,通過分布式鎖的形式來控製並發時,多個同步操作隻會有一個操作能夠被執行,其他操作隻有在等鎖釋放之後才有機會去執行,所以通過這樣的分布式鎖,我們可以控製共享資源同時隻能被一個操作進行執行,以保障數據處理時的分布式並發問題。
優化建議
本文我們實現了基於Consul的簡單分布式鎖,但是在實際運行時,可能會因為各種各樣的意外情況導致unlock操作沒有得到正確地執行,從而使得分布式鎖無法釋放。所以為了更完善的使用分布式鎖,我們還必須實現對鎖的超時清理等控製,保證即使出現了未正常解鎖的情況下也能自動修複,以提升係統的健壯性。那麼如何實現呢?請持續關注我的後續分解!
參考文檔
- Key/Value的API:https://www.consul.io/api/kv.html
- 選舉機製:https://www.consul.io/docs/guides/leader-election.html
實現代碼
- GitHub:https://github.com/dyc87112/consul-distributed-lock
- 開源中國:https://git.oschina.net/didispace/consul-distributed-lock
最後更新:2017-05-18 20:31:31