跟著實例學習ZooKeeper的用法: 隊列
使用Curator也可以簡化Ephemeral Node (臨時節點)的操作。Curator也提供ZK Recipe的分布式隊列實現。 利用ZK的 PERSISTENTSEQUENTIAL節點, 可以保證放入到隊列中的項目是按照順序排隊的。 如果單一的消費者從隊列中取數據, 那麼它是先入先出的,這也是隊列的特點。 如果你嚴格要求順序,你就的使用單一的消費者,可以使用leader選舉隻讓leader作為唯一的消費者。
但是, 根據Netflix的Curator作者所說, ZooKeeper真心不適合做Queue,或者說ZK沒有實現一個好的Queue,詳細內容可以看 Tech Note 4, 原因有五:
- ZK有1MB 的傳輸限製。 實踐中ZNode必須相對較小,而隊列包含成千上萬的消息,非常的大。
- 如果有很多節點,ZK啟動時相當的慢。 而使用queue會導致好多ZNode. 你需要顯著增大 initLimit 和 syncLimit.
- ZNode很大的時候很難清理。Netflix不得不創建了一個專門的程序做這事。
- 當很大量的包含成千上萬的子節點的ZNode時, ZK的性能變得不好
- ZK的數據庫完全放在內存中。 大量的Queue意味著會占用很多的內存空間。
盡管如此, Curator還是創建了各種Queue的實現。 如果Queue的數據量不太多,數據量不太大的情況下,酌情考慮,還是可以使用的。
DistributedQueue
DistributedQueue是最普通的一種隊列。 它設計以下四個類:
- QueueBuilder
- QueueConsumer
- QueueSerializer
- DistributedQueue
創建隊列使用QueueBuilder,它也是其它隊列的創建類。
public static <T> QueueBuilder<T> builder(CuratorFramework client,
QueueConsumer<T> consumer,
QueueSerializer<T> serializer,
java.lang.String queuePath)
QueueBuilder<MessageType> builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedQueue<MessageType queue = builder.build(); `
創建好queue就可以往裏麵放入數據了:
queue.put(aMessage);
QueueSerializer
提供了對隊列中的對象的序列化和反序列化。
QueueConsumer
是消費者, 它可以接收隊列的數據。 處理隊列中的數據的代碼邏輯可以放在QueueConsumer.consumeMessage()
中。
正常情況下先將消息從隊列中移除,再交給消費者消費。 但這是兩個步驟,不是原子的。 可以調用Builder的lockPath()
消費者加鎖, 當消費者消費數據時持有鎖,這樣其它消費者不能消費此消息。 如果消費失敗或者進程死掉,消息可以交給其它進程。這會帶來一點性能的損失。 最好還是單消費者模式使用隊列。
例子:
package com.colobu.zkrecipe.queue;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
public class DistributedQueueExample {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
DistributedQueue<String> queue = null;
try {
client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("CuratorEvent: " + event.getType().name());
}
});
client.start();
QueueConsumer<String> consumer = createQueueConsumer();
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildQueue();
queue.start();
for (int i = 0; i < 10; i++) {
queue.put(" test-" + i);
Thread.sleep((long)(3 * Math.random()));
}
Thread.sleep(20000);
} catch (Exception ex) {
} finally {
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>(){
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
private static QueueConsumer<String> createQueueConsumer() {
return new QueueConsumer<String>(){
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("connection new state: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
System.out.println("consume one message: " + message);
}
};
}
}
DistributedIdQueue
DistributedIdQueue和上麵的隊列類似, 但是可以為隊列中的每一個元素設置一個ID。 可以通過ID把隊列中任意的元素移除。 它涉及幾個類:
- QueueBuilder
- QueueConsumer
- QueueSerializer
- DistributedQueue
通過下麵方法創建:
builder.buildIdQueue()
放入元素時:
queue.put(aMessage, messageId);
移除元素時:
int numberRemoved = queue.remove(messageId);
在這個例子中, 有些元素還沒有被消費者消費時就移除了,這樣消費者不會收到刪除的消息。
package com.colobu.zkrecipe.queue;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedIdQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
public class DistributedIdQueueExample {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
DistributedIdQueue<String> queue = null;
try {
client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("CuratorEvent: " + event.getType().name());
}
});
client.start();
QueueConsumer<String> consumer = createQueueConsumer();
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildIdQueue();
queue.start();
for (int i = 0; i < 10; i++) {
queue.put(" test-" + i, "Id" + i);
Thread.sleep((long)(50 * Math.random()));
queue.remove("Id" + i);
}
Thread.sleep(20000);
} catch (Exception ex) {
} finally {
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>(){
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
private static QueueConsumer<String> createQueueConsumer() {
return new QueueConsumer<String>(){
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("connection new state: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
System.out.println("consume one message: " + message);
}
};
}
}
DistributedPriorityQueue
優先級隊列對隊列中的元素按照優先級進行排序。 Priority越小, 元素月靠前, 越先被消費掉。 它涉及下麵幾個類:
- QueueBuilder
- QueueConsumer
- QueueSerializer
- DistributedPriorityQueue
通過builder.buildPriorityQueue(minItemsBeforeRefresh)方法創建。 當優先級隊列得到元素增刪消息時,它會暫停處理當前的元素隊列,然後刷新隊列。minItemsBeforeRefresh指定刷新前當前活動的隊列的最小數量。 主要設置你的程序可以容忍的不排序的最小值。
放入隊列時需要指定優先級:
queue.put(aMessage, priority);
例子:
package com.colobu.zkrecipe.queue;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
public class DistributedPriorityQueueExample {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
DistributedPriorityQueue<String> queue = null;
try {
client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("CuratorEvent: " + event.getType().name());
}
});
client.start();
QueueConsumer<String> consumer = createQueueConsumer();
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildPriorityQueue(0);
queue.start();
for (int i = 0; i < 10; i++) {
int priority = (int)(Math.random() * 100);
System.out.println("test-" + i + " priority:" + priority);
queue.put("test-" + i, priority);
Thread.sleep((long)(50 * Math.random()));
}
Thread.sleep(20000);
} catch (Exception ex) {
} finally {
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>(){
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
private static QueueConsumer<String> createQueueConsumer() {
return new QueueConsumer<String>(){
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("connection new state: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
System.out.println("consume one message: " + message);
}
};
}
}
DistributedDelayQueue
JDK中也有DelayQueue,不知道你是否熟悉。 DistributedDelayQueue也提供了類似的功能, 元素有個delay值, 消費者隔一段時間才能收到元素。 涉及到下麵四個類。
- QueueBuilder
- QueueConsumer
- QueueSerializer
- DistributedDelayQueue
通過下麵的語句創建:
QueueBuilder<MessageType> builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();
放入元素時可以指定delayUntilEpoch
:
queue.put(aMessage, delayUntilEpoch);
注意delayUntilEpoch
不是離現在的一個時間間隔, 比如20毫秒,而是未來的一個時間戳,如 System.currentTimeMillis() + 10秒。 如果delayUntilEpoch的時間已經過去,消息會立刻被消費者接收。
例子:
package com.colobu.zkrecipe.queue;
import java.util.Date;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
public class DistributedDelayQueueExample {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
DistributedDelayQueue<String> queue = null;
try {
client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("CuratorEvent: " + event.getType().name());
}
});
client.start();
QueueConsumer<String> consumer = createQueueConsumer();
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildDelayQueue();
queue.start();
for (int i = 0; i < 10; i++) {
queue.put("test-" + i, System.currentTimeMillis() + 10000);
}
System.out.println(new Date().getTime() + ": already put all items");
Thread.sleep(20000);
} catch (Exception ex) {
} finally {
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
private static QueueConsumer<String> createQueueConsumer() {
return new QueueConsumer<String>() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("connection new state: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
System.out.println(new Date().getTime() + ": consume one message: " + message);
}
};
}
}
SimpleDistributedQueue
前麵雖然實現了各種隊列,但是你注意到沒有,這些隊列並沒有實現類似JDK一樣的接口。 SimpleDistributedQueue
提供了和JDK一致性的接口(但是沒有實現Queue接口)。 創建很簡單:
public SimpleDistributedQueue(CuratorFramework client,
String path)
增加元素:
public boolean offer(byte[] data) throws Exception
刪除元素:
public byte[] take() throws Exception
另外還提供了其它方法:
public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception
沒有add
方法, 多了take
方法。
take
方法在成功返回之前會被阻塞。 而poll
在隊列為空時直接返回null。w String(client.getData().forPath(PATH2)));
} catch (Exception ex) {
ex.printStackTrace();
} finally {
CloseableUtils.closeQuietly(node);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
} “`
最後更新:2017-05-23 10:31:57