閱讀529 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Apache Storm 官方文檔 —— Storm 與 Kestrel

本文說明了如何使用 Storm 從 Kestrel 集群中消費數據。

前言

Storm

本教程中使用了 storm-kestrel 項目和 storm-starter 項目中的例子。建議讀者將這幾個項目 clone 到本地,並動手運行其中的例子。

Kestrel

本文假定讀者可以如此項目所述在本地運行一個 Kestrel 集群。

Kestrel 服務器與隊列

Kestrel 服務中包含有一組消息隊列。Kestrel 隊列是一種非常簡單的消息隊列,可以運行於 JVM 上,並使用 memcache 協議(以及一些擴展)與客戶端交互。詳情可以參考 storm-kestrel 項目中的 KestrelThriftClient 類的實現。

每個隊列均嚴格遵循先入先出的規則。為了提高服務性能,數據都是緩存在係統內存中的;不過,隻有開頭的 128MB 是保存在內存中的。在服務停止的時候,隊列的狀態會保存到一個日誌文件中。

請參閱此文了解更多詳細信息。

Kestrel 具有 * 快速 * 小巧 * 持久 * 可靠 等特點。

例如,Twitter 就使用 Kestrel 作為消息係統的核心環節,此文中介紹了相關信息。

** 向 Kestrel 中添加數據

首先,我們需要一個可以向 Kestrel 的隊列添加數據的程序。下述方法使用了 storm-kestrel 項目中的 KestrelClient 的實現。該方法從一個包含 5 個句子的數組中隨機選擇一個句子添加到 Kestrel 的隊列中。

  private static void queueSentenceItems(KestrelClient kestrelClient, String queueName)
            throws ParseError, IOException {

        String[] sentences = new String[] {
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"};

        Random _rand = new Random();

        for(int i=1; i<=10; i++){

            String sentence = sentences[_rand.nextInt(sentences.length)];

            String val = "ID " + i + " " + sentence;

            boolean queueSucess = kestrelClient.queue(queueName, val);

            System.out.println("queueSucess=" +queueSucess+ " [" + val +"]");
        }
    }

從 Kestrel 中移除數據

此方法從一個隊列中取出一個數據,但並不把該數據從隊列中刪除:

private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){

        Item item = kestrelClient.dequeue(queueName);

        if(item==null){
            System.out.println("The queue (" + queueName + ") contains no items.");
        }
        else
        {
            byte[] data = item._data;

            String receivedVal = new String(data);

            System.out.println("receivedItem=" + receivedVal);
        }
    }

此方法會從隊列中取出並移除數據:

private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName)
throws IOException, ParseError
     {
        for(int i=1; i<=12; i++){

            Item item = kestrelClient.dequeue(queueName);


            if(item==null){
                System.out.println("The queue (" + queueName + ") contains no items.");
            }
            else
            {
                int itemID = item._id;


                byte[] data = item._data;

                String receivedVal = new String(data);

                kestrelClient.ack(queueName, itemID);

                System.out.println("receivedItem=" + receivedVal);
            }
        }
}

向 Kestrel 中連續添加數據

下麵的程序可以向本地 Kestrel 服務的一個 sentence_queue 隊列中連續添加句子,這也是我們的最後一個程序。

可以在命令行窗口中輸入一個右中括號 ] 並回車來停止程序。

import java.io.IOException;
import java.io.InputStream;
import java.util.Random;

import backtype.storm.spout.KestrelClient;
import backtype.storm.spout.KestrelClient.Item;
import backtype.storm.spout.KestrelClient.ParseError;

public class AddSentenceItemsToKestrel {

    /**
     * @param args
     */
    public static void main(String[] args) {

        InputStream is = System.in;

        char closing_bracket = ']';

        int val = closing_bracket;

        boolean aux = true;

        try {

            KestrelClient kestrelClient = null;
            String queueName = "sentence_queue";

            while(aux){

                kestrelClient = new KestrelClient("localhost",22133);

                queueSentenceItems(kestrelClient, queueName);

                kestrelClient.close();

                Thread.sleep(1000);

                if(is.available()>0){
                 if(val==is.read())
                     aux=false;
                }
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        catch (ParseError e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("end");

    }
}

使用 KestrelSpout

下麵的拓撲使用 KestrelSpout 從一個 Kestrel 隊列中讀取句子,並將句子分割成若幹個單詞(Bolt:SplitSentence),然後輸出每個單詞出現的次數(Bolt:WordCount)。數據處理的細節可以參考消息的可靠性保證一文。

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
            .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));

運行

首先,以生產模式或者開發者模式啟動你的本地 Kestrel 服務。

然後,等待大約 5 秒鍾以防出現網絡連接異常。

現在可以運行向隊列中添加數據的程序,並啟動 Storm 拓撲。程序啟動的順序並不重要。

如果你以 TOPOLOGY_DEBUG 模式運行拓撲你會觀察到拓撲中 tuple 發送的細節信息。

最後更新:2017-05-22 13:32:22

  上一篇:go  Apache Storm 官方文檔 —— 常用模式
  下一篇:go  Apache Storm 官方文檔 —— 分布式 RPC