閱讀901 返回首頁    go 阿裏雲 go 技術社區[雲棲]


使用stream操作表達更高級的數據處理請求, Part 1

使用stream操作表達更高級的數據處理請求,Part 1

原文鏈接 作者:Raoul-Gabriel Urma 譯者:石頭獅子(v1.lion@qq.com) 校對:吳京潤

沒有了集合你會怎麼做?幾乎每一個Java應用都建立和處理集合。對於許多編程任務而言,這是基礎的技術:集合分組和處理數據。例如,你可能想要建立一個銀行交易集合來代表用戶的賬戶記錄。然後,你想要處理所有的集合找出用戶花費了多少金額。盡管集合如此重要,但是Java的實現遠非完美。

首先,典型的集合處理模式有點像SQL操作,例如”查找”(查找最大值的交易)或”分組”(編組所有與雜貨購買有關的交易)。大部分的數據庫可以允許我們聲明式地指定這些操作。例如,後麵的SQL查詢可以讓我們找出最高值的交易ID:”SELECT id, MAX(value) from transactions”。

正如所見,我們並不需要去實現如何計算最大值(例如,使用循環,一個變量跟蹤最大的值)。我僅需要表達我們需要的。這個原則意味著,你並不需要擔憂如何明確地實現這些查詢–這完全不需要你處理。為什麼我們不能讓集合做相同的事情呢?想想你使用循環一次又一次的重新實現了這些操作幾次?

其次,我們怎樣才能有效率的處理大型的集合?理論上講,需要加快處理的速度,可能要使用多核架構。然而,寫出並行處理的代碼並不容易,而且也容易出錯。

Java SE 8 解決了這個問題。 Java API的設計者使用新的稱為Stream的抽象更新了API,使得可以聲明式的處理數據。此外,streams可以使用多核心架構,而你並不需要寫任何一行關於多核心處理的代碼。聽起來很美,確實是這樣嗎?這就是本係列文章要表述的內容。

在我們詳細表述使用streams可以做什麼之前,先讓我們看看一個例子。以便有一個使用Java SE 8 streams新的編程方式的概念。假設我們需要找出所有類型為grocery的交易,返回以交易金額為降序的交易ID列表。Java SE 7中,我們所做的如Listing 1Java SE 8中,我們所做的如Listing 2

01 List<Transaction> groceryTransactions = new Arraylist<>();
02 for(Transaction t: transactions){
03   if(t.getType() == Transaction.GROCERY){
04     groceryTransactions.add(t);
05   }
06 }
07 Collections.sort(groceryTransactions, new Comparator(){
08   public int compare(Transaction t1, Transaction t2){
09     return t2.getValue().compareTo(t1.getValue());
10   }
11 });
12 List<Integer> transactionIds = new ArrayList<>();
13 for(Transaction t: groceryTransactions){
14   transactionsIds.add(t.getId());
15 }

Listing 1

1 List<Integer> transactionsIds =
2       transactions.stream()
3                   .filter(t -> t.getType() == Transaction.GROCERY)
4                   .sorted(comparing(Transaction::getValue).reversed())
5                   .map(Transaction::getId)
6                   .collect(toList());

Listing 2

Figure 1 描述了Java SE 8的代碼。首先,我們使用List上可用的stream()方法從transactions(數據源)列表上取到stream。隨後,幾個操作(filter,sorted,map,collect)串聯起來形成pipeline(管道),pipeline可以看成是對數據查詢的一種形式。

Figure 1

可是,如何才能並行執行代碼呢?對於Java SE 8來說,這是否容易做到:隻要使用parallelStream()替換stream()方法,正如Listing 3所示Streams API內部會分解你的查詢,使用你電腦上的多個核心。

1 </pre>
2 List<Integer> transactionsIds = transactions.parallelStream()
3     .filter(t -> t.getType() == Transaction.GROCERY)
4     .sorted(comparing(Transaction::getValue).reversed())
5     .map(Transaction::getId)
6     .collect(toList());

Listing 3

不必擔憂這段代碼是否無法理解。我們會在下一章中繼續探究代碼是如何工作的。注意到lambda 表達式(例如, t-> t.getCategory() == Transaction.GROCERY),和方法引用(例如,Transaction::getId)的使用。這些概念目前你應該是熟悉的。

現在,已經看到stream作為有效表達的抽象,就像集合數據上的SQL操作。此外,這些操作可以簡潔的使用lambda 表達式參數化。

在學習Java SE 8 streams係列文章之後,你應該能夠使用Streams API寫出類似Listing 3上的代碼,表達出強有力的查詢。

使用Streams基礎
我們先從一些理論開始。一個stream的定義是什麼?簡短的定義是”從一個支持聚集操作的源上獲取的一序列元素”。讓我們逐個解釋:

序列元素:stream為特定元素類型值集合提供了一個接口。但是,stream並不實際存儲元素;元素隻在需要的時候被計算。
:Stream從數據提供源上消費數據,源可以是集合、數組、I/O資源等。
聚集操作,Stream支持類SQL的操作,和函數式編程語言的共通操作,例如 filter, map, reduce, find, match, sorted等等。

此外,stream操作有兩個基本的特征,使得其和集合操作有極大的不同。

管道:許多stream 操作返回stream自身。這可以讓操作串聯成一個大的管道。這也使得某些優化技術,例如惰性(laziness)和短路(short-circuiting)得以實現,這些概念我們都會在後麵闡釋。
內部迭代:與集合相比,集合的迭代是明確地(外部迭代),而stream操作執行的迭代你無法感知到。

讓我們重新看看之前的代碼來闡述這個概念。Figure 2表述了Listing 2的更多細節。

Figure 2
首先,我們從transactions list上調用stream()獲取到stream。數據源是transaction list,並且提供元素序列給stream。接下來,我們使用一係列stream上的聚合操作:filter (使用給定的predicate過濾元素), sorted (使用給定的comparator排序元素), and map (抽取信息)。所有這些操作除了collect之外,都返回stream。所以,這些操作可以串聯形成一個管道,管道可以看成是對源查詢的視圖。

所有的操作隻有在調用collect的時候才會執行。collect操作會開始處理管道,返回結果(一些不是stream;例子上是List)。不要太關心collect;我們會在之後的文章中詳細闡述。現在,你可以把collect看成一個需要指定如何聚集stream元素匯總成結果的操作。例子中,toList()則描述了需要從Stream轉換為List。

在我們闡述stream的方法之前,暫停並回顧一下stream 和collection之間的不同。

Streams Versus Collections

集合與stream在序列元素上所提供接口的新概念,都同時在java上存在。所以,不同的是什麼?簡而言之,集合是關於數據的,stream是關於計算的。想想存儲在DVD上的電影。這就是集合(可能是字節,又可能是幀–這裏,我們並不關心),因為其包含所有的數據結構。現在我們想想相同的視頻,當視頻是互聯網上的流的情況。則這個時候就是stream(比特或幀)。視頻流播放器隻需要下載用戶現在觀看位置之前的幾幀,所以你才可以從流的起始開始播放,在這之前,流裏麵的數據已經是被計算過了(想象下足球直播流)。

粗略的講,集合和stream之間的不同則是在處理計算的事情時。集合是一個內存上的數據結構,持有所有的這個數據結構的值–集合上的每個元素在要添加進集合之前都需要被計算。相反,stream概念上是固定的數據結構,流內的每個元素隻在需要的時候計算。

使用Collection接口則需要用戶來完成迭代(例如,使用稱為foreach的增強for循環);這個被叫做外部迭代。

相反,Streams庫使用內部迭代–為你執行迭代操作並且在某處維護執行結果;你僅僅隻要提供一個函數說我要完成這個。Listing 4裏麵的的代碼(使用集合的外部迭代)和Listing 5(使用stream的內部迭代)則闡述了這點不同。

1 List<String> transactionIds = new ArrayList<>();
2     for(Transaction t: transactions){
3         transactionIds.add(t.getId());
4     }

Listing 4

1 List<Integer> transactionIds =
2     transactions.stream()
3     .map(Transaction::getId)
4     .collect(toList());

Listing 5

Listing 4上,我們明確地順序迭代transactions list,抽取出每個交易ID並添加給聚集器。相反,當使用stream,並沒有明確地迭代。Listing 5上的代碼建立一個查詢,其中map操作參數化為抽取交易ID,collect操作轉換結果Stream到List。

到目前為止,你應該明確知道stream是什麼,並且你可以使用它。現在,讓我們看看stream提供的其他操作,這些操作可以讓你表達你自己的數據處理查詢。

Stream Operations: Exploiting Streams to Process Data

java.util .stream.Stream中的Stream接口定義了許多操作,主要可以分成兩類。正如Figure 1裏麵的例子,可以看到如下的操作:

filter, sorted, 和map, 這些可以從管道上連接在一起的。
collect 關閉管道並放回結果。

Stream 上可以連接的操作稱為中間操作。因為其返回的類型是Stream。關閉stream管道的操作稱為結束操作。其從管道上產生結果,例如List,一個整數,甚至是void(任何非stream類型)。

你也許會疑惑這些物質的重要性。當然,中間操作在stream管道上執行結束之前是不會執行;中間操作是惰性的(Lazy),主要是因為中間操作通常是合並的,並且被結束操作處理進通道。

01 List<Integer> numbers = Arrays.asList(12345678);
02 List<Integer> twoEvenSquares =
03     numbers.stream()
04             .filter(n -> {
05             System.out.println("filtering " + n);
06             return n % 2 == 0;
07             })
08         .map(n -> {
09         System.out.println("mapping " + n);
10         return n * n;
11     })
12     .limit(2)
13     .collect(toList());

Listing 6

例如,看看Listing 6上的代碼,計算給定number list上兩個偶數的平方:

filtering 1
filtering 2
mapping 2
filtering 3
filtering 4
mapping 4

因為limit(2)使用短路特性;我們需要隻處理stream的部分,並非全部地返回結果。這和計算用and串聯操作的布爾表達式有點類似:隻要一個表達式返回false,我們可以推斷出整個表達式返回false,而不用全部計算。這裏,limit操作返回大小為2的stream。

當然,filter和map操作合並到相同的通道中。

總結下我們目前學習到的,宏觀上處理stream包括這三件事:

一個數據源(例如集合),在數據源上執行的查詢
串聯的中間操作,這些操作形成stream管道
一個結束操作, 執行stream管道,並且產生結果。

現在,先看看stream上可用的一些操作。查閱java.util .stream.Stream接口獲取全部的列表,同樣也是這篇文章後麵引用的資源。

Filtering. 有幾個操作可以用來從stream中過濾元素:
filter(Predicate): 使用predicate (java.util.function.Predicate)作為參數,並返回包含所有匹配給定predict元素的stream。

distinct: 返回一個有唯一元素的stream(根據stream中元素的equals實現)。
limit(n): 返回一個不長於給定大小n的stream。
skip(n): 返回一個丟棄了前麵n個元素的stream。

Finding and matching. 一個通常的數據處理模式是決定是否某些元素匹配給定的屬性。你可以使用anyMatch,allMatch和noneMatch操作來幫助你完成這些操作。所有這些操作使用Predicate作為參數,返回一個布爾值作為結果(因此,這些是決定式的操作)。例如,你可以使用allMatch檢查transaction stream中所有交易額大於100的元素,如 Listing 7所示的。

1 boolean expensive = transactions.stream()
2     .allMatch(t -> t.getValue() > 100);

Listing 7

Stream接口提供 findFirst 和findAny操作,用於從stream中取回任意的元素。主要可以用於連接其他的stream操作,例如filter。
findFirst 和findAny返回Optional對象,如Listing 8所示。

1 Optional<Transaction> = transactions.stream()
2     .filter(t -> t.getType() == Transaction.GROCERY)
3     .findAny();

Listing 8

Optional<T>類(java.util .Optional)是一個容器類,用於代表一個值存在或不存在。Listing 8中,findAny可能並不會返回任何grocery類型的交易。

Optional類有一些方法用於測試元素是否存在。例如,如果有交易存在,我們可以選擇使用ifPresent方法選擇對optional對象上應用操作,如Listing 9(我們隻是打印交易)。

1 transactions.stream()
2     .filter(t -> t.getType() == Transaction.GROCERY)
3     .findAny()
4     .ifPresent(System.out::println);

Listing 9

Mapping. Stream支持map方法,使用function(java.util.function.Function)作為參數用於映射stream中的元素到另外一種形式。function會應用到每一個元素,映射元素到新的元素。

例如,你可能想要從stream的每個元素中抽出信息。Listing 10的例子中,我們從一個list上返回每個詞長度的list。Reducing. 目前,我們所見的結束操作返回boolean(allMatch等),void(forEach),或一個Optional對象(findAny等)。並且同樣已經使用collect組合所有stream中的元素為List。

1 List<String> words = Arrays.asList("Oracle""Java""Magazine");
2 List<Integer> wordLengths =
3     words.stream()
4     .map(String::length)
5     .collect(toList());

Listing 10

當然,你同樣可以組合stream中的所有元素表述成更複雜的處理請求,例如,最高ID的交易是什麼?或計算所有交易額的總數。

這可以使用stream上的reduce操作,這個操作重複地為每個元素應用操作(例如,添加兩個數字),直到產生結果。函數式程序中一般稱這操作為折疊操作(fold),你可以把這個操作看成是重複地折疊紙張的一部分(你的stream),直到形成一個小正方形,這就是折疊操作的結果。

先看下我們如何使用for循環計算list的和:

1 int sum = 0;
2 for (int x : numbers) {
3     sum += x;
4 }

Numbers list上的每個元素重複地使用添加操作來產生一個結果。實際上,我們縮小numbers list到一個數值。代碼中則有兩個參數:sum變量的初始值,例子上為0,和組合所有list元素的操作,例子上為+。

使用stream的reduce方法,我們可以累加所有的stream元素。如 Listing 11所示的。

reduce方法使用兩個參數:

1 int sum = numbers.stream().reduce(0, (a, b) -> a + b);

Listing 11

一個初始值,0

BinaryOperator<T>,用於組合兩個元素並產生一個新的值。

reduce方法本質上抽象了重複的應用模式。其他查詢例如”計算產品”或”計算最大值(見Listing 12)”則是成為reduce方法的特定例子。

1 int product = numbers.stream().reduce(1, (a, b) -> a * b);
2 int product = numbers.stream().reduce(1, Integer::max);

Listing 12

Numeric Streams

現在,已經看過了使用reduce方法用於計算整數stream和的例子。但是,這其中還是有一定的開銷:我們執行多次裝箱(boxing)操作,重複的在integer對象上求和。如果可以調用一個sum方法,可能會更好一點,正如Listing 13所示,是否更明確我們代碼的目的?

1 int statement = transactions.stream()
2     .map(Transaction::getValue)
3     .sum(); // error since Stream has no sum method

Listing 13

Java SE 8 引入3個特定的primitive stream接口用於處理這個問題–IntStream,DoubleStream和LongStream–各自代表stream中的元素是int,double和long。

通常要轉換stream到特定版本的stream所執行的方法是mapToInt,mapToDouble和mapToLong。這些方法工作起來完全像是我們之前見到的map方法,不同的是這些方法返回特定的stream而不是Stream<T>。例如,我們可以改進Listing 13的代碼,如Listing 14所展示的。你同樣可以通過裝箱(boxed)操作從primitive stream轉換為某個對象stream。

1 int statementSum =
2     transactions.stream()
3     .mapToInt(Transaction::getValue)
4     .sum(); // works!

Listing 14

最後,另一個numeric streams有用的形式是數字範圍(numeric ranges)。例如,你可能想要產生所有1到100之間的數值。Java SE 8則引入了 IntStream, DoubleStream, 和LongStream上可用的2個靜態方法輔助產生這樣的範圍:range和rangeClosed。

這兩個方法都使用範圍的起始作為首個參數,範圍的結束作為第二個參數。range方法是開區間,而rangeClosed是閉區間的。 Listing 15則是一個使用rangeClose方法的例子,返回10到30之間數值的stream。

1 IntStream oddNumbers =
2     IntStream.rangeClosed(1030)
3     .filter(n -> n % 2 == 1);

Listing 15

Building Streams

有幾種方式用於構建stream。我們已經看到如何從集合上獲取到stream。同樣,我也使用了number stream。你同樣可以從值、數組或文件上建立stream。此外甚至可以從一個函數上獲取stream 來產生無限的stream。

從值或從數組上建立stream十分簡單:隻要為值調用Stream.of的靜態方法和為數組調用Arrays.stream生成。如 Listing 16所示。

1 Stream<Integer> numbersFromValues = Stream.of(1234);
2 int[] numbers = {1234};
3 IntStream numbersFromArray = Arrays.stream(numbers);

Listing 16

同樣也可以使用Files.lines靜態方法將文件轉換為一個stream。例如,Listing 17計算文件中的行數。

1 long numberOfLines =
2      Files.lines(Paths.get(“yourFile.txt”), Charset.defaultCharset())
3          .count();

Listing 17

Infinite streams. 最後,在我們結束關於stream的這篇文章之前,還有一個令人興奮的概念。到目前為止,應該理解stream內的元素是按需產生的。這裏有兩個靜態方法–Stream.iterate 和 Stream.generate可以從函數上建立stream。然而,由於元素是按需計算的,這兩個操作可以一直產生元素。這就是為什麼稱為 infinite stream:沒有固定大小的stream,與我們從固定集合建立的流相比。

Listing 18 是使用iterate的例子,創建一個所有10倍數的數字stream。Iterate方法使用一個初始值(例子上是,0)和一個用於連續地產生每個新值的lambda(類型為UnaryOperator<T>)。

1 Stream<Integer> numbers = Stream.iterate(0, n -> n + 10);

Listing 18
我們可以把這個無限的stream轉換成固定大小的stream,通過使用limit操作。例如,我們可以限製stream的大小為5,如Listing 19所示。

1 numbers.limit(5).forEach(System.out::println); // 0, 10, 20, 30, 40

Listing 19

Conclusion

Java SE 8 引入的stream API,可以讓我們表達更複雜的數據處理邏輯。本文中,你已經看到stream支持許多方法,例如filter,map,reduce和iterate,這些方法組合可以寫出簡潔的代碼並表達數據處理查詢。這種新的代碼編寫方式與Java SE8 之前你要處理的集合十分的不同。顯然,這有許多好處。首先,Stream API使用了許多技術,例如惰性和短路來優化數據處理查詢。其次,stream可以是並行自動地使用多核心架構。本係列的下一章節中,我們會表述更高級的操作,例如flatMap和collect。

最後更新:2017-05-22 17:01:26

  上一篇:go  企業網站推廣的方法有哪些?
  下一篇:go  並發編程圖書