Learning ZooKeeper by Example: Queues

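Curator can also simplify working with ephemeral nodes. Curator also provides distributed queue implementations as ZK recipes. By using ZK's PERSISTENT_SEQUENTIAL nodes, items placed in the queue are guaranteed to be ordered. If a single consumer takes data from the queue, the order is first in, first out, which is the defining property of a queue. If you need strict ordering, you must use a single consumer; you can use leader election so that only the leader acts as the sole consumer.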
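The ordering guarantee comes from the sequence suffix that ZooKeeper appends to the names of sequential nodes. Below is a minimal JDK-only sketch, assuming children named with a hypothetical `queue-` prefix followed by ZooKeeper's zero-padded 10-digit counter. It only shows how a consumer could recover FIFO order from an unordered child listing; it is not Curator's internal code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SequentialOrderSketch {
    // Hypothetical node-name prefix chosen for this illustration.
    static final String PREFIX = "queue-";

    // Extracts the numeric sequence suffix, e.g. "queue-0000000012" -> 12.
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(PREFIX.length()));
    }

    // Returns the child names sorted by sequence number (oldest first).
    static List<String> fifoOrder(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(SequentialOrderSketch::sequenceOf));
        return sorted;
    }

    public static void main(String[] args) {
        // A child listing is not guaranteed to arrive in creation order.
        List<String> children =
                List.of("queue-0000000002", "queue-0000000000", "queue-0000000001");
        System.out.println(fifoOrder(children));
    }
}
```

With several consumers, each one may process its item at a different speed, which is why strict ordering needs the single-consumer setup described above.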

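However, according to the author of Curator at Netflix, ZooKeeper is really not suited to acting as a queue, or put differently, ZK does not make for a good queue. See Tech Note 4 for details. There are five reasons: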

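  1. ZK has a 1 MB transport limit. In practice ZNodes must be relatively small, while a queue can hold many thousands of messages and become very large.
  2. ZK starts up very slowly when there are many nodes, and using it as a queue produces a lot of ZNodes. You need to increase initLimit and syncLimit significantly.
  3. A ZNode that has grown very large is hard to clean up. Netflix had to write a special-purpose program to do it.
  4. ZK performance degrades when there are many ZNodes that each have thousands of children.
  5. The ZK database is kept entirely in memory. A large number of queued items therefore consumes a lot of memory.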
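One practical consequence of the first point is that a producer may want to reject oversized payloads before they ever reach ZooKeeper. The sketch below is a hypothetical guard, not part of Curator; `MAX_BYTES` is an assumption that approximates the 1 MB figure cited above, and the real limit depends on the server configuration.

```java
import java.nio.charset.StandardCharsets;

class PayloadSizeGuard {
    // Assumption: roughly the 1 MB transport limit cited in the list above.
    static final int MAX_BYTES = 1024 * 1024;

    // Returns true when the serialized payload fits under the assumed limit.
    static boolean fits(byte[] payload) {
        return payload.length <= MAX_BYTES;
    }

    public static void main(String[] args) {
        byte[] small = "test-0".getBytes(StandardCharsets.UTF_8);
        byte[] large = new byte[2 * 1024 * 1024];
        System.out.println("small fits: " + fits(small));
        System.out.println("large fits: " + fits(large));
    }
}
```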

Even so, Curator provides several queue implementations. If the queue holds only a modest number of items and each item is small, they can still be used with some care.

DistributedQueue

DistributedQueue is the most basic kind of queue. It involves the following four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

Queues are created with QueueBuilder, which is also the builder for the other queue types.

public static <T> QueueBuilder<T> builder(CuratorFramework client,
                                          QueueConsumer<T> consumer,
                                          QueueSerializer<T> serializer,
                                          java.lang.String queuePath)

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedQueue<MessageType> queue = builder.buildQueue();

Once the queue has been created, you can put data into it:

queue.put(aMessage);

QueueSerializer handles serialization and deserialization of the objects in the queue.
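A serializer for String messages is usually safer with an explicit charset, since the no-argument `getBytes()` and `new String(bytes)` depend on the platform default. The sketch below is JDK-only: `Serializer<T>` is a hypothetical stand-in that mirrors the two methods a QueueSerializer implementation overrides, so the same method bodies could be dropped into a real implementation.

```java
import java.nio.charset.StandardCharsets;

class Utf8SerializerSketch {
    // Hypothetical stand-in with the same two methods as a queue serializer.
    interface Serializer<T> {
        byte[] serialize(T item);
        T deserialize(byte[] bytes);
    }

    // Encodes and decodes with UTF-8 so producers and consumers agree on bytes.
    static final Serializer<String> UTF8 = new Serializer<String>() {
        @Override
        public byte[] serialize(String item) {
            return item.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };

    public static void main(String[] args) {
        String original = "test-\u00e9";
        String roundTripped = UTF8.deserialize(UTF8.serialize(original));
        System.out.println("round trip ok: " + original.equals(roundTripped));
    }
}
```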

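QueueConsumer is the consumer; it receives the data from the queue. The logic that processes queue items goes in QueueConsumer.consumeMessage().

Normally a message is first removed from the queue and then handed to the consumer. These are two separate steps, so the operation is not atomic. You can call the builder's lockPath() to make consumers take a lock: a consumer holds the lock while it processes a message, so no other consumer can process that message. If processing fails or the process dies, the message can be handed to another process. This costs some performance. It is still best to use the queue with a single consumer.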
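The difference between the two orderings can be illustrated with an in-memory simulation. This is not Curator's code and uses no locks; the `Consumer` interface and both method names are hypothetical. It only contrasts "remove, then consume" with "consume while the item is still queued, then remove on success" when the consumer fails.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ConsumeOrderSketch {
    // Hypothetical consumer callback for this simulation.
    interface Consumer {
        void consume(String message) throws Exception;
    }

    // Default order: remove first, then consume. A failure loses the message.
    static void removeThenConsume(Deque<String> queue, Consumer consumer) {
        String message = queue.pollFirst();
        if (message == null) {
            return;
        }
        try {
            consumer.consume(message);
        } catch (Exception e) {
            // The message is already gone from the queue at this point.
        }
    }

    // Lock-style order: consume while still queued, remove only on success.
    static void consumeThenRemove(Deque<String> queue, Consumer consumer) {
        String message = queue.peekFirst();
        if (message == null) {
            return;
        }
        try {
            consumer.consume(message);
            queue.pollFirst();
        } catch (Exception e) {
            // The message stays queued and can be picked up again.
        }
    }

    // Runs one failing consume attempt and returns what is left in the queue.
    static List<String> remainingAfterFailure(boolean lockStyle) {
        Deque<String> queue = new ArrayDeque<>(List.of("test-0"));
        Consumer failing = message -> {
            throw new IllegalStateException("consumer crashed");
        };
        if (lockStyle) {
            consumeThenRemove(queue, failing);
        } else {
            removeThenConsume(queue, failing);
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println("remove-then-consume leaves: " + remainingAfterFailure(false));
        System.out.println("consume-then-remove leaves: " + remainingAfterFailure(true));
    }
}
```

In the first mode the failed message is gone; in the second it is still available for another attempt, which is the behavior the lock path is meant to provide at the cost of extra coordination.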

Example:

package com.colobu.zkrecipe.queue;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class DistributedQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i);
                Thread.sleep((long)(3 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
            // Report the failure instead of silently swallowing it.
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>(){

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>(){

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);                
            }

        };
    }
}

DistributedIdQueue

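DistributedIdQueue is similar to the queue above, but it lets you assign an ID to each element. Any element can then be removed from the queue by its ID. It involves these classes: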

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedIdQueue

Create it with the following method:

builder.buildIdQueue()

To put an element:

queue.put(aMessage, messageId);

To remove an element:

int numberRemoved = queue.remove(messageId);

In the example below, some elements are removed before the consumer has consumed them, so the consumer never receives those removed messages.
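The put and remove semantics can be pictured with a small in-memory model. This is a hypothetical JDK-only sketch, not Curator's implementation: it keeps entries in a list, and `remove` returns the number of entries removed, mirroring the `int numberRemoved` result shown above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class IdQueueSketch {
    // One queued item: a caller-chosen ID plus the message.
    static final class Entry {
        final String id;
        final String message;

        Entry(String id, String message) {
            this.id = id;
            this.message = message;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void put(String message, String id) {
        entries.add(new Entry(id, message));
    }

    // Removes every pending entry with the given ID; returns how many were removed.
    int remove(String id) {
        int removed = 0;
        for (Iterator<Entry> it = entries.iterator(); it.hasNext();) {
            if (it.next().id.equals(id)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // Messages a consumer would still receive, in insertion order.
    List<String> pending() {
        List<String> messages = new ArrayList<>();
        for (Entry e : entries) {
            messages.add(e.message);
        }
        return messages;
    }

    // Puts three items, removes one by ID, and returns what is still pending.
    static List<String> demo() {
        IdQueueSketch queue = new IdQueueSketch();
        queue.put("test-0", "Id0");
        queue.put("test-1", "Id1");
        queue.put("test-2", "Id2");
        queue.remove("Id1");
        return queue.pending();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```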

package com.colobu.zkrecipe.queue;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedIdQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class DistributedIdQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long)(50 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
            // Report the failure instead of silently swallowing it.
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>(){

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>(){

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);                
            }

        };
    }
}

DistributedPriorityQueue

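A priority queue orders its elements by priority. The lower the priority value, the closer an element is to the front and the sooner it is consumed. It involves the following classes: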

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

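Create it with builder.buildPriorityQueue(minItemsBeforeRefresh). When the priority queue is notified that elements have been added or removed, it stops processing its current list of elements and refreshes the list. minItemsBeforeRefresh specifies the minimum number of items from the currently active list that are processed before a refresh. In essence, it sets how much out-of-order processing your program can tolerate.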

A priority must be given when putting an element into the queue:

queue.put(aMessage, priority);
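The "lower value first" rule can be tried out locally with the JDK's own `java.util.PriorityQueue`. The sketch below is only an analogy for the ordering, not the distributed implementation; the `Item` class is hypothetical, and breaking ties by insertion order is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class PriorityOrderSketch {
    // One queued item; seq records insertion order so equal priorities stay FIFO.
    static final class Item {
        final String message;
        final int priority;
        final long seq;

        Item(String message, int priority, long seq) {
            this.message = message;
            this.priority = priority;
            this.seq = seq;
        }
    }

    // Lower priority value first; ties broken by insertion order (assumption).
    static final Comparator<Item> ORDER =
            Comparator.comparingInt((Item i) -> i.priority).thenComparingLong(i -> i.seq);

    // Drains the items in the order a single consumer would see them.
    static List<String> consumeOrder(String[] messages, int[] priorities) {
        PriorityQueue<Item> queue = new PriorityQueue<>(ORDER);
        for (int i = 0; i < messages.length; i++) {
            queue.add(new Item(messages[i], priorities[i], i));
        }
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll().message);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] messages = {"test-0", "test-1", "test-2"};
        int[] priorities = {50, 7, 50};
        System.out.println(consumeOrder(messages, priorities));
    }
}
```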

Example:

package com.colobu.zkrecipe.queue;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class DistributedPriorityQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int)(Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long)(50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
            // Report the failure instead of silently swallowing it.
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>(){

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>(){

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);                
            }

        };
    }
}

DistributedDelayQueue

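The JDK also has a DelayQueue, which you may already know. DistributedDelayQueue offers similar functionality: each element carries a delay value, and the consumer receives the element only after that time has passed. It involves the following four classes: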

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

Create it with the following statements:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

When putting an element, you can specify delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

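Note that delayUntilEpoch is not an interval measured from now (such as 20 milliseconds) but a timestamp in the future, for example System.currentTimeMillis() plus 10 seconds. If the delayUntilEpoch time has already passed, the message is delivered to the consumer immediately.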
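The timestamp semantics can be checked locally with the JDK's `DelayQueue` mentioned above. The sketch below is an analogy only: `DelayedMessage` and the helper methods are hypothetical, and it shows that an element whose epoch lies in the future is not yet available while one whose epoch has passed is returned right away.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayUntilEpochSketch {
    // An element that becomes available at an absolute epoch time in milliseconds.
    static final class DelayedMessage implements Delayed {
        final String message;
        final long delayUntilEpoch;

        DelayedMessage(String message, long delayUntilEpoch) {
            this.message = message;
            this.delayUntilEpoch = delayUntilEpoch;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long remainingMillis = delayUntilEpoch - System.currentTimeMillis();
            return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    // Converts a relative delay into the absolute timestamp a delay queue expects.
    static long delayUntilEpoch(long delayMillis) {
        return System.currentTimeMillis() + delayMillis;
    }

    // Returns the message if it is already due, or null if it is still delayed.
    static String pollOnce(long delayUntilEpoch) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("test-0", delayUntilEpoch));
        DelayedMessage due = queue.poll();
        return due == null ? null : due.message;
    }

    public static void main(String[] args) {
        System.out.println("due in 10 s: " + pollOnce(delayUntilEpoch(10_000)));
        System.out.println("already past: " + pollOnce(System.currentTimeMillis() - 1));
    }
}
```

Passing a bare interval such as 10000 as the timestamp would describe a moment shortly after the epoch in 1970, so the element would count as already due.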

Example:

package com.colobu.zkrecipe.queue;

import java.util.Date;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class DistributedDelayQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {
            // Report the failure instead of silently swallowing it.
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

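Several queues have been covered above, but you may have noticed that none of them exposes a JDK-style interface. SimpleDistributedQueue provides an interface consistent with the JDK's (although it does not implement the Queue interface). Creating one is simple: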

public SimpleDistributedQueue(CuratorFramework client,
                              String path)

Add an element:

public boolean offer(byte[] data) throws Exception

Remove an element:

public byte[] take() throws Exception

It also provides other methods:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method, but there is an additional take method.

The take method blocks until it can return successfully, whereas poll returns null immediately when the queue is empty.
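The same contrast exists in the JDK's `BlockingQueue`, which makes it easy to try locally. The sketch below is a JDK analogy using `LinkedBlockingQueue<byte[]>`, not SimpleDistributedQueue itself; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TakeVersusPollSketch {
    // poll() on an empty queue returns null immediately.
    static byte[] pollEmpty() {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        return queue.poll();
    }

    // take() blocks until a producer thread offers an element.
    static String takeAfterOffer() {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            queue.offer("test-0".getBytes(StandardCharsets.UTF_8));
        });
        producer.start();
        try {
            byte[] data = queue.take(); // waits here until the producer delivers
            producer.join();
            return new String(data, StandardCharsets.UTF_8);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("poll on empty queue: " + pollEmpty());
        System.out.println("take after offer: " + takeAfterOffer());
    }
}
```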


Last updated: 2017-05-23 10:31:57
