使用stream操作表達更高級的數據處理請求, Part 1
使用stream操作表達更高級的數據處理請求,Part 1
原文鏈接 作者:Raoul-Gabriel Urma 譯者:石頭獅子(v1.lion@qq.com) 校對:吳京潤
沒有了集合你會怎麼做?幾乎每一個Java應用都建立和處理集合。對於許多編程任務而言,這是基礎的技術:集合分組和處理數據。例如,你可能想要建立一個銀行交易集合來代表用戶的賬戶記錄。然後,你想要處理所有的集合找出用戶花費了多少金額。盡管集合如此重要,但是Java的實現遠非完美。
首先,典型的集合處理模式有點像SQL操作,例如”查找”(查找最大值的交易)或”分組”(編組所有與雜貨購買有關的交易)。大部分的數據庫可以允許我們聲明式地指定這些操作。例如,後麵的SQL查詢可以讓我們找出最高值的交易ID:”SELECT id, MAX(value) from transactions”。
正如所見,我們並不需要去實現如何計算最大值(例如,使用循環,一個變量跟蹤最大的值)。我僅需要表達我們需要的。這個原則意味著,你並不需要擔憂如何明確地實現這些查詢–這完全不需要你處理。為什麼我們不能讓集合做相同的事情呢?想想你使用循環一次又一次的重新實現了這些操作幾次?
其次,我們怎樣才能有效率的處理大型的集合?理論上講,需要加快處理的速度,可能要使用多核架構。然而,寫出並行處理的代碼並不容易,而且也容易出錯。
Java SE 8 解決了這個問題。 Java API的設計者使用新的稱為Stream的抽象更新了API,使得可以聲明式的處理數據。此外,streams可以使用多核心架構,而你並不需要寫任何一行關於多核心處理的代碼。聽起來很美,確實是這樣嗎?這就是本係列文章要表述的內容。
在我們詳細表述使用streams可以做什麼之前,先讓我們看看一個例子。以便有一個使用Java SE 8 streams新的編程方式的概念。假設我們需要找出所有類型為grocery的交易,返回以交易金額為降序的交易ID列表。Java SE 7中,我們所做的如Listing 1。Java SE 8中,我們所做的如Listing 2。
01 |
List<Transaction> groceryTransactions = new Arraylist<>();
|
02 |
for (Transaction t: transactions){
|
03 |
if (t.getType() == Transaction.GROCERY){
|
04 |
groceryTransactions.add(t);
|
05 |
}
|
06 |
} |
07 |
Collections.sort(groceryTransactions, new Comparator(){
|
08 |
public int compare(Transaction t1, Transaction t2){
|
09 |
return t2.getValue().compareTo(t1.getValue());
|
10 |
}
|
11 |
}); |
12 |
List<Integer> transactionIds = new ArrayList<>();
|
13 |
for (Transaction t: groceryTransactions){
|
14 |
transactionsIds.add(t.getId());
|
15 |
} |
Listing 1
1 |
List<Integer> transactionsIds = |
2 |
transactions.stream()
|
3 |
.filter(t -> t.getType() == Transaction.GROCERY)
|
4 |
.sorted(comparing(Transaction::getValue).reversed())
|
5 |
.map(Transaction::getId)
|
6 |
.collect(toList());
|
Listing 2
Figure 1 描述了Java SE 8的代碼。首先,我們使用List上可用的stream()方法從transactions(數據源)列表上取到stream。隨後,幾個操作(filter,sorted,map,collect)串聯起來形成pipeline(管道),pipeline可以看成是對數據查詢的一種形式。
Figure 1
可是,如何才能並行執行代碼呢?對於Java SE 8來說,這是否容易做到:隻要使用parallelStream()替換stream()方法,正如Listing 3所示,Streams API內部會分解你的查詢,使用你電腦上的多個核心。
1 |
</pre> |
2 |
List<Integer> transactionsIds = transactions.parallelStream() |
3 |
.filter(t -> t.getType() == Transaction.GROCERY)
|
4 |
.sorted(comparing(Transaction::getValue).reversed())
|
5 |
.map(Transaction::getId)
|
6 |
.collect(toList());
|
Listing 3
不必擔憂這段代碼是否無法理解。我們會在下一章中繼續探究代碼是如何工作的。注意到lambda 表達式(例如, t-> t.getCategory() == Transaction.GROCERY),和方法引用(例如,Transaction::getId)的使用。這些概念目前你應該是熟悉的。
現在,已經看到stream作為有效表達的抽象,就像集合數據上的SQL操作。此外,這些操作可以簡潔的使用lambda 表達式參數化。
在學習Java SE 8 streams係列文章之後,你應該能夠使用Streams API寫出類似Listing 3上的代碼,表達出強有力的查詢。
使用Streams基礎
我們先從一些理論開始。一個stream的定義是什麼?簡短的定義是”從一個支持聚集操作的源上獲取的一序列元素”。讓我們逐個解釋:
序列元素:stream為特定元素類型值集合提供了一個接口。但是,stream並不實際存儲元素;元素隻在需要的時候被計算。
源:Stream從數據提供源上消費數據,源可以是集合、數組、I/O資源等。
聚集操作,Stream支持類SQL的操作,和函數式編程語言的共通操作,例如 filter, map, reduce, find, match, sorted等等。
此外,stream操作有兩個基本的特征,使得其和集合操作有極大的不同。
管道:許多stream 操作返回stream自身。這可以讓操作串聯成一個大的管道。這也使得某些優化技術,例如惰性(laziness)和短路(short-circuiting)得以實現,這些概念我們都會在後麵闡釋。
內部迭代:與集合相比,集合的迭代是明確地(外部迭代),而stream操作執行的迭代你無法感知到。
讓我們重新看看之前的代碼來闡述這個概念。Figure 2表述了Listing 2的更多細節。
Figure 2
首先,我們從transactions list上調用stream()獲取到stream。數據源是transaction list,並且提供元素序列給stream。接下來,我們使用一係列stream上的聚合操作:filter (使用給定的predicate過濾元素), sorted (使用給定的comparator排序元素), and map (抽取信息)。所有這些操作除了collect之外,都返回stream。所以,這些操作可以串聯形成一個管道,管道可以看成是對源查詢的視圖。
所有的操作隻有在調用collect的時候才會執行。collect操作會開始處理管道,返回結果(一些不是stream;例子上是List)。不要太關心collect;我們會在之後的文章中詳細闡述。現在,你可以把collect看成一個需要指定如何聚集stream元素匯總成結果的操作。例子中,toList()則描述了需要從Stream轉換為List。
在我們闡述stream的方法之前,暫停並回顧一下stream 和collection之間的不同。
Streams Versus Collections
集合與stream在序列元素上所提供接口的新概念,都同時在java上存在。所以,不同的是什麼?簡而言之,集合是關於數據的,stream是關於計算的。想想存儲在DVD上的電影。這就是集合(可能是字節,又可能是幀–這裏,我們並不關心),因為其包含所有的數據結構。現在我們想想相同的視頻,當視頻是互聯網上的流的情況。則這個時候就是stream(比特或幀)。視頻流播放器隻需要下載用戶現在觀看位置之前的幾幀,所以你才可以從流的起始開始播放,在這之前,流裏麵的數據已經是被計算過了(想象下足球直播流)。
粗略的講,集合和stream之間的不同則是在處理計算的事情時。集合是一個內存上的數據結構,持有所有的這個數據結構的值–集合上的每個元素在要添加進集合之前都需要被計算。相反,stream概念上是固定的數據結構,流內的每個元素隻在需要的時候計算。
使用Collection接口則需要用戶來完成迭代(例如,使用稱為foreach的增強for循環);這個被叫做外部迭代。
相反,Streams庫使用內部迭代–為你執行迭代操作並且在某處維護執行結果;你僅僅隻要提供一個函數說我要完成這個。Listing 4裏麵的的代碼(使用集合的外部迭代)和Listing 5(使用stream的內部迭代)則闡述了這點不同。
1 |
List<String> transactionIds = new ArrayList<>();
|
2 |
for (Transaction t: transactions){
|
3 |
transactionIds.add(t.getId());
|
4 |
}
|
Listing 4
1 |
List<Integer> transactionIds = |
2 |
transactions.stream()
|
3 |
.map(Transaction::getId)
|
4 |
.collect(toList());
|
Listing 5
Listing 4上,我們明確地順序迭代transactions list,抽取出每個交易ID並添加給聚集器。相反,當使用stream,並沒有明確地迭代。Listing 5上的代碼建立一個查詢,其中map操作參數化為抽取交易ID,collect操作轉換結果Stream到List。
到目前為止,你應該明確知道stream是什麼,並且你可以使用它。現在,讓我們看看stream提供的其他操作,這些操作可以讓你表達你自己的數據處理查詢。
Stream Operations: Exploiting Streams to Process Data
java.util .stream.Stream中的Stream接口定義了許多操作,主要可以分成兩類。正如Figure 1裏麵的例子,可以看到如下的操作:
filter, sorted, 和map, 這些可以從管道上連接在一起的。
collect 關閉管道並放回結果。
Stream 上可以連接的操作稱為中間操作。因為其返回的類型是Stream。關閉stream管道的操作稱為結束操作。其從管道上產生結果,例如List,一個整數,甚至是void(任何非stream類型)。
你也許會疑惑這些物質的重要性。當然,中間操作在stream管道上執行結束之前是不會執行;中間操作是惰性的(Lazy),主要是因為中間操作通常是合並的,並且被結束操作處理進通道。
01 |
List<Integer> numbers = Arrays.asList( 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 );
|
02 |
List<Integer> twoEvenSquares = |
03 |
numbers.stream()
|
04 |
.filter(n -> {
|
05 |
System.out.println( "filtering " + n);
|
06 |
return n % 2 == 0 ;
|
07 |
})
|
08 |
.map(n -> {
|
09 |
System.out.println( "mapping " + n);
|
10 |
return n * n;
|
11 |
})
|
12 |
.limit( 2 )
|
13 |
.collect(toList());
|
Listing 6
例如,看看Listing 6上的代碼,計算給定number list上兩個偶數的平方:
filtering 1
filtering 2
mapping 2
filtering 3
filtering 4
mapping 4
因為limit(2)使用短路特性;我們需要隻處理stream的部分,並非全部地返回結果。這和計算用and串聯操作的布爾表達式有點類似:隻要一個表達式返回false,我們可以推斷出整個表達式返回false,而不用全部計算。這裏,limit操作返回大小為2的stream。
當然,filter和map操作合並到相同的通道中。
總結下我們目前學習到的,宏觀上處理stream包括這三件事:
一個數據源(例如集合),在數據源上執行的查詢
串聯的中間操作,這些操作形成stream管道
一個結束操作, 執行stream管道,並且產生結果。
現在,先看看stream上可用的一些操作。查閱java.util .stream.Stream接口獲取全部的列表,同樣也是這篇文章後麵引用的資源。
Filtering. 有幾個操作可以用來從stream中過濾元素:
filter(Predicate): 使用predicate (java.util.function.Predicate)作為參數,並返回包含所有匹配給定predict元素的stream。
distinct: 返回一個有唯一元素的stream(根據stream中元素的equals實現)。
limit(n): 返回一個不長於給定大小n的stream。
skip(n): 返回一個丟棄了前麵n個元素的stream。
Finding and matching. 一個通常的數據處理模式是決定是否某些元素匹配給定的屬性。你可以使用anyMatch,allMatch和noneMatch操作來幫助你完成這些操作。所有這些操作使用Predicate作為參數,返回一個布爾值作為結果(因此,這些是決定式的操作)。例如,你可以使用allMatch檢查transaction stream中所有交易額大於100的元素,如 Listing 7所示的。
1 |
boolean expensive = transactions.stream()
|
2 |
.allMatch(t -> t.getValue() > 100 );
|
Listing 7
Stream接口提供 findFirst 和findAny操作,用於從stream中取回任意的元素。主要可以用於連接其他的stream操作,例如filter。
findFirst 和findAny返回Optional對象,如Listing 8所示。
1 |
Optional<Transaction> = transactions.stream() |
2 |
.filter(t -> t.getType() == Transaction.GROCERY)
|
3 |
.findAny();
|
Listing 8
Optional<T>類(java.util .Optional)是一個容器類,用於代表一個值存在或不存在。Listing 8中,findAny可能並不會返回任何grocery類型的交易。
Optional類有一些方法用於測試元素是否存在。例如,如果有交易存在,我們可以選擇使用ifPresent方法選擇對optional對象上應用操作,如Listing 9(我們隻是打印交易)。
1 |
transactions.stream() |
2 |
.filter(t -> t.getType() == Transaction.GROCERY)
|
3 |
.findAny()
|
4 |
.ifPresent(System.out::println);
|
Listing 9
Mapping. Stream支持map方法,使用function(java.util.function.Function)作為參數用於映射stream中的元素到另外一種形式。function會應用到每一個元素,映射元素到新的元素。
例如,你可能想要從stream的每個元素中抽出信息。Listing 10的例子中,我們從一個list上返回每個詞長度的list。Reducing. 目前,我們所見的結束操作返回boolean(allMatch等),void(forEach),或一個Optional對象(findAny等)。並且同樣已經使用collect組合所有stream中的元素為List。
1 |
List<String> words = Arrays.asList( "Oracle" , "Java" , "Magazine" );
|
2 |
List<Integer> wordLengths = |
3 |
words.stream()
|
4 |
.map(String::length)
|
5 |
.collect(toList());
|
Listing 10
當然,你同樣可以組合stream中的所有元素表述成更複雜的處理請求,例如,最高ID的交易是什麼?或計算所有交易額的總數。
這可以使用stream上的reduce操作,這個操作重複地為每個元素應用操作(例如,添加兩個數字),直到產生結果。函數式程序中一般稱這操作為折疊操作(fold),你可以把這個操作看成是重複地折疊紙張的一部分(你的stream),直到形成一個小正方形,這就是折疊操作的結果。
先看下我們如何使用for循環計算list的和:
1 |
int sum = 0 ;
|
2 |
for ( int x : numbers) {
|
3 |
sum += x;
|
4 |
} |
Numbers list上的每個元素重複地使用添加操作來產生一個結果。實際上,我們縮小numbers list到一個數值。代碼中則有兩個參數:sum變量的初始值,例子上為0,和組合所有list元素的操作,例子上為+。
使用stream的reduce方法,我們可以累加所有的stream元素。如 Listing 11所示的。
reduce方法使用兩個參數:
1 |
int sum = numbers.stream().reduce( 0 , (a, b) -> a + b);
|
Listing 11
一個初始值,0
BinaryOperator<T>,用於組合兩個元素並產生一個新的值。
reduce方法本質上抽象了重複的應用模式。其他查詢例如”計算產品”或”計算最大值(見Listing 12)”則是成為reduce方法的特定例子。
1 |
int product = numbers.stream().reduce( 1 , (a, b) -> a * b);
|
2 |
int product = numbers.stream().reduce( 1 , Integer::max);
|
Listing 12
Numeric Streams
現在,已經看過了使用reduce方法用於計算整數stream和的例子。但是,這其中還是有一定的開銷:我們執行多次裝箱(boxing)操作,重複的在integer對象上求和。如果可以調用一個sum方法,可能會更好一點,正如Listing 13所示,是否更明確我們代碼的目的?
1 |
int statement = transactions.stream()
|
2 |
.map(Transaction::getValue)
|
3 |
.sum(); // error since Stream has no sum method
|
Listing 13
Java SE 8 引入3個特定的primitive stream接口用於處理這個問題–IntStream,DoubleStream和LongStream–各自代表stream中的元素是int,double和long。
通常要轉換stream到特定版本的stream所執行的方法是mapToInt,mapToDouble和mapToLong。這些方法工作起來完全像是我們之前見到的map方法,不同的是這些方法返回特定的stream而不是Stream<T>。例如,我們可以改進Listing 13的代碼,如Listing 14所展示的。你同樣可以通過裝箱(boxed)操作從primitive stream轉換為某個對象stream。
1 |
int statementSum =
|
2 |
transactions.stream()
|
3 |
.mapToInt(Transaction::getValue)
|
4 |
.sum(); // works!
|
Listing 14
最後,另一個numeric streams有用的形式是數字範圍(numeric ranges)。例如,你可能想要產生所有1到100之間的數值。Java SE 8則引入了 IntStream, DoubleStream, 和LongStream上可用的2個靜態方法輔助產生這樣的範圍:range和rangeClosed。
這兩個方法都使用範圍的起始作為首個參數,範圍的結束作為第二個參數。range方法是開區間,而rangeClosed是閉區間的。 Listing 15則是一個使用rangeClose方法的例子,返回10到30之間數值的stream。
1 |
IntStream oddNumbers = |
2 |
IntStream.rangeClosed( 10 , 30 )
|
3 |
.filter(n -> n % 2 == 1 );
|
Listing 15
Building Streams
有幾種方式用於構建stream。我們已經看到如何從集合上獲取到stream。同樣,我也使用了number stream。你同樣可以從值、數組或文件上建立stream。此外甚至可以從一個函數上獲取stream 來產生無限的stream。
從值或從數組上建立stream十分簡單:隻要為值調用Stream.of的靜態方法和為數組調用Arrays.stream生成。如 Listing 16所示。
1 |
Stream<Integer> numbersFromValues = Stream.of( 1 , 2 , 3 , 4 );
|
2 |
int [] numbers = { 1 , 2 , 3 , 4 };
|
3 |
IntStream numbersFromArray = Arrays.stream(numbers); |
Listing 16
同樣也可以使用Files.lines靜態方法將文件轉換為一個stream。例如,Listing 17計算文件中的行數。
1 |
long numberOfLines =
|
2 |
Files.lines(Paths.get(“yourFile.txt”), Charset.defaultCharset())
|
3 |
.count();
|
Listing 17
Infinite streams. 最後,在我們結束關於stream的這篇文章之前,還有一個令人興奮的概念。到目前為止,應該理解stream內的元素是按需產生的。這裏有兩個靜態方法–Stream.iterate 和 Stream.generate可以從函數上建立stream。然而,由於元素是按需計算的,這兩個操作可以一直產生元素。這就是為什麼稱為 infinite stream:沒有固定大小的stream,與我們從固定集合建立的流相比。
Listing 18 是使用iterate的例子,創建一個所有10倍數的數字stream。Iterate方法使用一個初始值(例子上是,0)和一個用於連續地產生每個新值的lambda(類型為UnaryOperator<T>)。
1 |
Stream<Integer> numbers = Stream.iterate( 0 , n -> n + 10 );
|
Listing 18
我們可以把這個無限的stream轉換成固定大小的stream,通過使用limit操作。例如,我們可以限製stream的大小為5,如Listing 19所示。
1 |
numbers.limit( 5 ).forEach(System.out::println); // 0, 10, 20, 30, 40
|
Listing 19
Conclusion
Java SE 8 引入的stream API,可以讓我們表達更複雜的數據處理邏輯。本文中,你已經看到stream支持許多方法,例如filter,map,reduce和iterate,這些方法組合可以寫出簡潔的代碼並表達數據處理查詢。這種新的代碼編寫方式與Java SE8 之前你要處理的集合十分的不同。顯然,這有許多好處。首先,Stream API使用了許多技術,例如惰性和短路來優化數據處理查詢。其次,stream可以是並行自動地使用多核心架構。本係列的下一章節中,我們會表述更高級的操作,例如flatMap和collect。
最後更新:2017-05-22 17:01:26