英語の勉強中です。
The Java TutorialsのAggregate Operationsのところを読んでテキトーに訳した。並行プログラミングあんま詳しくないんで、やや雑な訳になったが・・・・・・致し方なし。
Lesson: 集約操作(Aggregate Operations)
Note:このセクションのコンセプトをより良く理解するには、Lambda Expressions とMethod Referencesのセクションを確認してください。
コレクションは何のために使用するのでしょうか? 単にコレクションにオブジェクトを格納して削除することでは無いでしょう。大抵の場合、コレクションに格納された要素を検索することと思います。
Lambda Expressions のセクションで解説されたシナリオを思い返してみてください。いま、ソーシャルネットワーキングアプリケーションを作ろうとしている、と仮定します。管理者がある種のアクション、例えば検索条件を満たすソーシャルネットワーキングアプリケーションのメンバへのメッセージ送信のような、を実行できる機能を作成したいとします。
ソーシャルネットワーキングアプリケーションのメンバは以下のようなPersonクラスで表現される、と仮定します。
public class Person { public enum Sex { MALE, FEMALE } String name; LocalDate birthday; Sex gender; String emailAddress; // ... public int getAge() { // ... } public String getName() { // ... } }
以下の例は、for-eachループでコレクションroster
に含まれるすべてのメンバ名を表示します。
for (Person p : roster) {
System.out.println(p.getName());
}
以下の例は、コレクションroster
に含まれるすべてのメンバ名を表示しますが、forEachに集約操作を使用しています。
roster
.stream()
.forEach(e -> System.out.println(e.getName());
この例の場合では、集約操作を使用する方がfor-eachループよりも長くなっていますが、bulk-dataオペレーションを使用するやり方は、より複雑なタスクの場合には簡潔な表現となるでしょう。
以降のトピックが扱うのは以下の通りです。
このセクションで説明に使用されるコードの抜粋はBulkDataOperationsExamplesから行っています。
Pipelines and Streams
パイプライン(pipeline)とは、集約操作のシーケンスです。以下の例は、コレクションroster
に含まれる男性メンバを表示するもので、集約操作のfilter
とforEach
で構成されるパイプラインで処理を行っています。
roster .stream() .filter(e -> e.getGender() == Person.Sex.MALE) .forEach(e -> System.out.println(e.getName()));
比較として、for-eachループでコレクションroster
に含まれる男性メンバーを表示するのは以下の通りです。
for (Person p : roster) { if (p.getGender() == Person.Sex.MALE) { System.out.println(p.getName()); } }
- ソース(source):コレクション、配列、generator function、I/O channelのことです。この例におけるソースはコレクション
roster
のことです。 - ゼロ個以上の中間操作(intermediate operations):
filter
のような中間操作は新しいストリームを生成します。
ストリーム(stream)とは、要素のシーケンスです。コレクションとは異なり、要素を格納するデータ構造ではありません。その代りに、ストリームはソースからパイプラインに値を受け渡します。この例ではコレクションroster
でstream
メソッドを呼び出すことでストリームを生成しています。
filter
は述語(このメソッドの引数)にマッチする要素を含む新しいストリームを返します。この例での述語はラムダ式e -> e.getGender() == Person.Sex.MALE
です。このラムダ式は、もしオブジェクトe
のgender
フィールドがPerson.Sex.MALE
であればtrue
を返します。その結果、filter
はコレクションroster
の全男性メンバを含むストリームを返します。 - 終端操作(terminal operation)。終端操作とは、
forEach
のような、ストリームでないプリミティブ値(double値のような)やコレクションで結果を返すものです。forEach
の場合は何も返しません。この例では、forEach
の引数はラムダ式e -> System.out.println(e.getName())
で、オブジェクトe
のgetName
メソッドを呼び出しています。(Javaランタイムとコンパイルはオブジェクトe
の型はPerson
と推論します。)
以下の例はコレクションroster
に含まれる全男性の年齢の平均を、集約操作のfilter
, mapToInt
, average
で構成されるパイプラインを使用して、計算します。
double average = roster .stream() .filter(p -> p.getGender() == Person.Sex.MALE) .mapToInt(Person::getAge) .average() .getAsDouble();
mapToInt
はIntStream
型(integerのみで構成される)の新しいストリームを返します。この操作は引数に指定される関数をストリームの各要素に適用します。この例では、関数はPerson::getAge
で、メンバの年齢を返すメソッド参照です。(代わりにラムダ式e -> e.getAge()
も使用できます) その結果として、この例はmapToInt
はコレクションroster
の全男性メンバの年齢を含むストリームを返します。
average
はIntStream
に含まれる要素の平均値を計算します。帰り値の型はOptionalDouble
です。もしストリームに要素が何もなければ、average
はOptionalDouble
の空インスタンスを返し、getAsDouble
メソッドはNoSuchElementException
をスローします。JDKはaverage
のようなストリームの内容を結合して一つの値を返す終端操作を多数提供しています。これらの操作はreduction operationsと呼ばれ、Reductionセクションで詳細な情報があります。
Differences Between Aggregate Operations and Iterators
forEach
のような集約操作はイテレータと似ています。しかし、根本的に異なる要素がいくつかあります。
- 内部イテレーションの使用(They use internal iteration): 集約操作にはコレクションの次の要素を処理するための
next
のようなメソッドはありません。internal delegationでは、アプリケーションはイテレートするコレクションを指定し、JDKがコレクションのイテレート方法を決定します。外部イテレーション(external iteration)では、アプリケーションは対象のコレクションとイテレート方法の両方とも指定します。しかし、外部イテレーションはコレクションの要素をシーケンシャルにだけイテレート可能です。内部イテレーションにこの制限はありません。このことは、下位要素に分割された問題を同時に解き、それらの解の結果を結合するような、パラレル計算を活用するのが容易になります。詳細な情報についてはParallelismを参照してください。 - ストリームによる処理(They process elements from a stream): 集約操作はストリーム経由で要素を処理し、コレクションには直接触れません。そのため、ストリーム操作(stream operations)と呼ばれます。
- 振る舞いのパラメーター化(They support behavior as parameters): 多くの集約操作の引数にはlambda expressionsを指定できます。これは、集約操作それぞれにおける振る舞いをカスタマイズ可能にします。
Reduction
このAggregate Operationsのセクションでは、コレクションroster
の全男性メンバの平均年齢を計算するパイプライン操作について解説します。
double average = roster .stream() .filter(p -> p.getGender() == Person.Sex.MALE) .mapToInt(Person::getAge) .average() .getAsDouble();
JDKにはストリームの要素を結合して一つの要素を返す終端操作(たとえばaverage、sum、min、max、countなど)が多数含まれています。それらの操作はreduction operationsと呼ばれています。また、JDKには単一値を返すかわりにコレクションを返すreduction operationsもあります。reduction operationsは特定のタスク、たとえば平均値の探索や要素のカテゴリ別グルーピングなど、を実行します。ただ、JDKは汎用のreduction operationsであるreduceとcollectを提供しており、詳細はこのセクションで扱います。
このセクションは以下のトピックについて説明します。
このセクションで説明されるサンプルコードの抜粋はReductionExamplesを参照してください。
The Stream.reduce Method
Stream.reduceメソッドは汎用のreduction operationです。コレクションroster
の男性メンバの年齢を合計する、以下のパイプラインを見てみます。これはStream.sumというreduction operationを使用しています。
Integer totalAge = roster .stream() .mapToInt(Person::getAge) .sum();
Stream.reduce
で同じ値を計算する以下のパイプラインと比較してみます。
Integer totalAgeReduce = roster .stream() .map(Person::getAge) .reduce( 0, (a, b) -> a + b);
この例のreduce
は二つの引数を取ります。
identity
: identityはreductionの初期値で、ストリームに要素が何も無い場合にはデフォルトの結果になります。この例では、identityは0
で、年齢の合計の初期値となり、コレクションroster
が空の場合のデフォルトの結果となります。accumulator
: accumulator functionは二つの引数を取ります。一つはreductionの部分的な結果(partial result)(この例では、その時点までに処理された整数の合計)で、もう一つはストリームの次の要素(この例では、整数)です。この関数は、次の部分的な結果を返します。この例では、accumulator functionはラムダ式で、二つのInteger
を足してInteger
を返しています。
(a, b) -> a + b
reduce
は常に新しい値を返します。また、accumulator functionもストリーム要素を処理する度に新しい値を返します。このとき、ストリームの要素を、コレクションのような複雑なオブジェクトにreduceしたい、と仮定します。これはアプリケーションのパフォーマンスに悪影響を与える可能性があります。もしreduce
がコレクションに要素を追加する場合、accumulator functionは要素を処理する度に、新しいコレクションを生成します。これは非効率的です。そうではなく、既存のコレクションを更新するほうが効率的でしょう。次のセクションで説明する、Stream.collectメソッドを使用することでそれが可能になります。
The Stream.collect Method
要素を処理するとき常に新しい値を生成するreduce
とは異なり、collectメソッドは既存の値を更新あるいはmutatesします。
ストリームの値を合計する方法を考えます。これには二種類のデータが必要で、それは要素数とその合計です。しかし、reduce
メソッドとその他すべてのeduction methods同様、collect
メソッドは一つの値を返すだけです。要素数と合計を保持し続けるメンバ変数を含む新しいデータ型を作成する必要があり、たとえばAveragerでは以下のようになります。
class Averager implements IntConsumer { private int total = 0; private int count = 0; public double average() { return count > 0 ? ((double) total)/count : 0; } public void accept(int i) { total += i; count++; } public void combine(Averager other) { total += other.total; count += other.count; } }
以下のパイプラインは、すべての男性メンバの平均年齢を計算するために、Averager
クラスcollect
メソッドを使用しています。
Averager averageCollect = roster.stream() .filter(p -> p.getGender() == Person.Sex.MALE) .map(Person::getAge) .collect(Averager::new, Averager::accept, Averager::combine); System.out.println("Average age of male members: " + averageCollect.average());
この例のcollect
は三つの引数を取ります。
supplier
: supplierはfactory functionで、新規のインスタンスを生成します。collect
では、結果を格納するコンテナインスタンスを生成します。上記の例では、Averager
クラスの新しいインスタンスです。accumulator
: accumulator functionは、ストリームの要素を結果を格納するコンテナインスタンスに取り込みます。上記の例では、count
変数をカウントアップし、男性メンバの年齢を意味するストリームの要素をtotal
変数に足すことで、Averager
を修正しています。combiner
: combiner functionは、引数に二つの結果を格納するコンテナインスタンスを取りそれらをマージします。上記の例では、引数のAverager
インスタンスのcount
とtotal
を、自身のメンバ変数に足しています。supplierは、
reduce
のidentity elementが値なのとは対照的に、ラムダ式(もしくはmethod reference)です。- accumulatorとcombiner functionsには戻り値がありません。
- parallel streamsで
collect
を使用できます。詳細についてはParallelismを参照してください。(parallel streamでcollect
メソッドを動作させる場合、JDKはcombiner functionが新しいオブジェクトを生成する度に、新しいスレッドを生成します。上記の例の新しいオブジェクトはAverager
です。その結果、同期化について心配する必要はありません。)
JDKはストリームの要素の平均を算出するaverage
を提供しますが、ストリームの要素からなんらかの値を計算する必要があれば、collect
とカスタムクラスを使用できます。
collect
はコレクションにとって最適です。以下の例はcollet
でコレクションの男性メンバの氏名を抽出しています。
List<String> namesOfMaleMembersCollect = roster .stream() .filter(p -> p.getGender() == Person.Sex.MALE) .map(p -> p.getName()) .collect(Collectors.toList());
このcollect
の使い方は、一つのCollector引数を取ります。このクラスは三つの引数(supplier, accumulator, combiner functions)を要求するcollect
で使用される関数をカプセル化しています。
Collectorsクラスは多くの有用なreduction operationsを備えており、たとえば様々な条件で要素をまとめたり、コレクションに要素を蓄積したりできます。そうしたreduction operationsはCollector
クラスのインスタンスを返すので、collect
の引数として使用できます。
上記の例が使用しているCollectors.toListは、ストリームの要素をList
の新しいインスタンスに蓄積します。Collectors
クラスのメソッドと同様に、toList
は、コレクションではなくCollector
のインスタンスを返します。
以下の例はコレクションroster
のメンバを性別ごとにグループ化します。
Map<Person.Sex, List<Person>> byGender = roster .stream() .collect( Collectors.groupingBy(Person::getGender));
groupingByは、キーが引数で指定されるラムダ式(classification functionと呼ばれます)の適用結果となるマップを返します。この例では、返されるマップは二つのキーPerson.Sex.MALE
とPerson.Sex.FEMALE
を持ちます。キーに当たるclassification functionがストリームの要素を処理するとき、要素はキーに対応するList
のインスタンスに入れられます。この例では、キーPerson.Sex.MALE
に対応する値は、全男性メンバを含むList
になります。
以下の例は、コレクションroster
の全メンバを性別でグループ分けしてその氏名のリストを取得します。
Map<Person.Sex, List<String>> namesByGender = roster .stream() .collect( Collectors.groupingBy( Person::getGender, Collectors.mapping( Person::getName, Collectors.toList())));
この例はgroupingByは二つの引数を取り、classification functionとCollector
のインスタンスです。Collector
引数はdownstream collectorと呼ばれます。このcollectorは、別のcollectorの結果が適用されるJava runtimeのcollectorです。従って、groupingBy
は、このgroupingBy
が生成するList
にcollect
メソッドを適用できます。この例では、mapping functionPerson::getName
をストリームの各要素で実行するcollector mappingを適用しています。その結果、生成されるストリームはメンバの氏名のみから構成されるようになります。パイプラインは一つ以上のdownstream collectorsを持つことが出来、この例の場合はmultilevel reductionと呼ばれます。
以下の例は性別ごとのメンバの年齢の合計を計算しています。
Map<Person.Sex, Integer> totalAgeByGender = roster .stream() .collect( Collectors.groupingBy( Person::getGender, Collectors.reducing( 0, Person::getAge, Integer::sum)));
reducingは三つの引数を取ります。
identity
:Stream.reduce
と同様に、、identityは二つの役割があり、reductionの初期値とストリームに要素が無い場合のデフォルト値となります。上記の例では、identityは0
で、年齢の合計の初期値でメンバがゼロ人の場合にはデフォルトの結果となります。mapper
:reducing
はストリームの全要素にこのmapper functionを適用します。上記の例では、mapper functionは各メンバの年齢を取得しています。operation
: このoperation functionは、mapper
から返される値を集約するために使用されます。上記の例では、operation functionはInteger
を合計していきます。
以下の例は性別ごとのメンバの年齢の平均を計算しています。
Map<Person.Sex, Double> averageAgeByGender = roster
.stream()
.collect(
Collectors.groupingBy(
Person::getGender,
Collectors.averagingInt(Person::getAge)));
Parallelism
パラレル計算は、問題をサブ要素に分割して、同時(サブ要素を別々のスレッドで動作させるパラレルなやり方で)にサブ要素を解き、サブ要素の結果を最終結果にまとめる、ことを指します。Java SEはFork/Join frameworkを提供しており、アプリケーションでパラレル計算を容易に実装できます。しかし、このフレームワークでは、どのようにある問題を分割する(パーティーション化する)のかを定義しなければなりません。集約操作では、Javaランタイムがパーティショニングと結果の集約を実行します。
コレクションを使用するアプリケーションでパラレル化を実装するのが難しい理由の一つはコレクションが非スレッドセーフなことで、複数のスレッドがThread InterferenceやMemory Consistency Errorsを引き起こさずにコレクションを操作できないことを意味します。Collections FrameworkはWrapper Implementationsを提供しており、任意のコレクションへ機械的に同期化を追加します。しかし、同期化はthread contentionを引き起こします。これはパラレルに実行するスレッドを妨害するのでthread contentionは避けたいところです。集約操作とパラレルストリーム(parallel streams)は、メソッド呼び出し中はコレクションを更新できない条件により、非スレッドセーフなコレクションでパラレル化の実装を可能にします。
注意点として、十分なデータ量とCPUコアがあるとしても、パラレル化は逐次的な処理よりも機械的に早くなるわけではありません。集約操作はパラレル化の実装を容易にしますが、アプリケーションがパラレル化に適しているかどうかを判断するのは依然として開発者の役割です。
このセクションでは以下のトピックについて説明します。
このセクションで説明に使用されるソースコードはParallelismExamplesにあります。
Executing Streams in Parallel
ストリームはシリアルでもパラレルでも実行できます。ストリームがパラレルに実行されるとき、Javaランタイムはストリームを複数のサブストリームにパーティーション化します。集約操作はイテレートするとパラレルにサブストリームを処理し、結果を結合します。
ストリームを生成するとき、特に指定されない限りは常にシリアルなストリームになります。パラレルなストリームを生成するには、Collection.parallelStreamを呼び出します。もしくは、BaseStream.parallelを呼び出します。たとえば、以下のステートメントはパラレルに全男性メンバの平均年齢を計算します。
double average = roster .parallelStream() .filter(p -> p.getGender() == Person.Sex.MALE) .mapToInt(Person::getAge) .average() .getAsDouble();
Concurrent Reduction
再度、性別ごとにメンバをグループ化する以下の例(reductionセクションで解説)を考えてみます。この例はcollect
を呼び出し、コレクションroster
をMap
に集約します。
Map<Person.Sex, List<Person>> byGender = roster .stream() .collect( Collectors.groupingBy(Person::getGender));
以下はパラレル化したものです。
ConcurrentMap<Person.Sex, List<Person>> byGender = roster .parallelStream() .collect( Collectors.groupingByConcurrent(Person::getGender));
これはconcurrent reductionと呼ばれています。Javaランタイムは、collect
を含む個々のパイプラインが以下の条件をすべて満たす場合、concurrent reductionを実行します。
- ストリームがパラレルであること。
collect
の引数であるcollectorがCollector.Characteristics.CONCURRENT.特性を有すること。collectorの特性を確認するには、Collector.Characteristicsを呼び出します。- ストリームに順序性が無いかcollectorがCollector.Characteristics.UNORDERED特性を有すること。ストリームに順序性が無いことを保障するには、BaseStream.unorderedを呼び出します。
Note: このサンプルはMap
ではなくConcurrentMapを返し、groupingBy
ではなくgroupingByConcurrentを呼び出します。(ConcurrentMap
についてはConcurrent Collectionsセクションを参照してください。) groupingByConcurrent
とは異なり、groupingBy
はパラレルストリームには不十分です。(その理由はキーで二つのマップをマージすることにあり、これはコンピュータ的には高コストな処理です。)同様に、パラレルストリームではCollectors.toMapよりも、Collectors.toConcurrentMapの方が適しています。
Ordering
パイプラインがストリームの要素を処理する順序は、ストリームがシリアルもしくはパラレルに実行されるかどうか・ストリームのソース・中間操作、に依存します。たとえば、ArrayList
のインスタンスをforEach
で要素を何回か表示する例を考えてみます。
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 }; List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray)); System.out.println("listOfIntegers:"); listOfIntegers .stream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("listOfIntegers sorted in reverse order:"); Comparator<Integer> normal = Integer::compare; Comparator<Integer> reversed = normal.reversed(); Collections.sort(listOfIntegers, reversed); listOfIntegers .stream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("Parallel stream"); listOfIntegers .parallelStream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("Another parallel stream:"); listOfIntegers .parallelStream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("With forEachOrdered:"); listOfIntegers .parallelStream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");
この例は5つのパイプラインから構成されています。出力は以下のようになります。
listOfIntegers: 1 2 3 4 5 6 7 8 listOfIntegers sorted in reverse order: 8 7 6 5 4 3 2 1 Parallel stream: 3 4 1 6 2 5 7 8 Another parallel stream: 6 3 1 5 7 8 4 2 With forEachOrdered: 8 7 6 5 4 3 2 1
上記の例は以下のような処理をしています。
- 一つ目のパイプラインは
listOfIntegers
をリストに要素が追加された順序で要素を表示します。 - 二つ目のパイプラインはCollections.sortメソッドでソートした後で
listOfIntegers
の要素を表示しています。 - 三つ目と四つ目のパイプラインは一見するとランダムな順序でリストの要素を表示しています。ストリームは要素を処理するとき内部イテレーションを使用することを思い出して下さい。それ故に、パラレルにストリームを実行するとき、Javaコンパイラとランタイムは、ストリームへの指定が特になければパラレルコンピューティングによる利点を最大化するために、ストリームの要素の処理順序を決定します。
- 五つ目のパイプラインはforEachOrderedメソッドを使用しており、ストリームをシリアルかパラレルで実行するかどうかにかかわらず、そのソースが指定する順序でストリームの要素を処理します。ただし、パラレルストリームで
forEachOrdered
のような操作をすることは、パラレル化の利点を損なう可能性があることに注意が必要です。
Side Effects
あるメソッドや式は副作用を持つことがあり、値を生成したり返すことに加えてコンピュータの状態を変更することがあります。例としては、mutable reductions(collect
を使用する操作で詳細はReductionを参照)やデバッグ用のSystem.out.println
呼び出しも該当します。JDKはパイプラインにおける特定の副作用を処理します。特に、collect
メソッドは最も一般的なストリーム操作を実行するために設計されており、parallel-safeな副作用を持っています。forEach
とpeek
などは副作用を起こす操作用に設計されており、たとえばvoidを返すラムダ式や、何も実行しないが副作用はあるSystem.out.println
などのことです。よって、注意してforEach
とpeek
を使うべきです。もし、パラレルストリームでそれらの操作をしようする場合、Javaランタイムは引数で指定されるラムダ式を複数スレッドでコンカレントに実行する可能性があります。加えて、filter
とmap
のような操作で副作用を持つラムダ式を引数として渡せません。以降のセクションでinterferenceとstateful lambda expressionsについて説明します。これらは両方とも、特にパラレルストリームにおいて、副作用の原因となり、矛盾の発生(return inconsistent)や予測不可能な結果(unpredictable results)に繋がることがあります。とはいえ、まずはlazinessについて触れますが、その理由はこれが干渉(interference)に直接影響するためです。
Laziness
すべての中間操作はlazyです。値が要求時にだけ評価されるなら、式、メソッド、アルゴリズムはlazyです。(アルゴリズムは、直ちに処理もしくは評価される場合にはeagerです) 終端操作が開始するまで中間操作はストリームの処理を開始しないので、中間操作はlazyです。lazyにストリームを処理することは、Javaコンパイラとランタイムがストリームの処理を最適化できるようになります。たとえば、Aggregate Operationsセクションで説明しているfilter
-mapToInt
-average
のようなパイプラインなどでは、filter
から複数の要素を取得し、mapToInt
が受け取って何個かの整数をまとめて、average
に渡すことができます。average
は、ストリームから要求される全要素を取得し終わるまでこのプロセスを繰り返し、平均を計算します。
Interference
ストリームにおけるラムダ式は衝突(interfere)すべきではありません。衝突が発生するのは、パイプラインがストリームを処理している時に、ストリームのソースが変更される場合です。たとえば、以下のコードはList listOfStrings
の文字列結合を試行しますが、ConcurrentModifiedException
をスローします。
try { List<String> listOfStrings = new ArrayList<>(Arrays.asList("one", "two")); // 終端操作が開始された後にソースへ"three"文字列を追加しよう // とするので失敗する。 String concatenatedString = listOfStrings .stream() // 下記は禁止! ここで衝突が発生する。 .peek(s -> listOfStrings.add("three")) .reduce((a, b) -> a + " " + b) .get(); System.out.println("Concatenated string: " + concatenatedString); } catch (Exception e) { System.out.println("Exception caught: " + e.toString()); }
この例はlistOfStrings
に含まれる文字列を終端操作reduce
でOptional<String>
に結合します。しかし、ここではパイプラインは中間操作peek
を呼び出しており、listOfStrings
に新しい要素を追加しようとしています。すべての中間操作はlazyなことを思い出して下さい。この例のパイプラインはget
が呼び出される時に実行を開始し、get
が完了するときに実行を終了します。peek
の引数はパイプラインの実行中にストリームのソースを修正するので、JavaランタイムはConcurrentModifiedException
をスローします。
Stateful Lambda Expressions
ストリーム操作の引数ではstateful lambda expressionsの使用は避けて下さい。stateful lambda expressionsとは、結果がパイプライン実行中に変更される任意の状態に依存するコードのことです。以下のコードはList listOfIntegers
からmap
中間操作を経由して新規のList
インスタンスに要素を追加しています。二回実行し、最初はシリアル、次はパラレルストリームで実行します。
List<Integer> serialStorage = new ArrayList<>(); System.out.println("Serial stream:"); listOfIntegers .stream() // 下記は禁止! stateful lambda expressionを使用している。 .map(e -> { serialStorage.add(e); return e; }) .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); serialStorage .stream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("Parallel stream:"); List<Integer> parallelStorage = Collections.synchronizedList( new ArrayList<>()); listOfIntegers .parallelStream() // 下記は禁止! stateful lambda expressionを使用している。 .map(e -> { parallelStorage.add(e); return e; }) .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); parallelStorage .stream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");
ラムダ式e -> { parallelStorage.add(e); return e; }
はstateful lambda expressionです。このコードは実行するたびに結果が変化します。結果は以下のようになります。
Serial stream: 8 7 6 5 4 3 2 1 8 7 6 5 4 3 2 1 Parallel stream: 8 7 6 5 4 3 2 1 1 3 6 2 4 5 8 7
forEachOrdered
はストリームを指定された順序で要素を処理し、これはストリームがシリアルかパラレルかどうかは関係ありません。しかし、ストリームがパラレルに実行されるとき、map
はJavaランタイムとコンパイラによって指定されるストリームの要素を処理します。その結果、ラムダ式e -> { parallelStorage.add(e); return e; }
がList parallelStorage
に要素を追加する順序は、コードを実行するたびに変化します。決定論的かつ予想可能な結果のためには、ストリーム操作の引数のラムダ式は非ステートフルなことを守ってください。
Note: 上記の例は、 List parallelStorage
をスレッドセーフにするために、synchronizedListメソッドを実行しています。コレクションは非スレッドセーフなことに注意してください。これは、複数スレッドは同時に特定のコレクションにアクセスすべきでないことを意味しています。いま、parallelStorage
の生成時にsynchronizedList
メソッドを使用しないと仮定します。
List<Integer> parallelStorage = new ArrayList<>();
上記の例は不規則な振る舞いになります。その理由は、個々のスレッドがList
インスタンスにアクセスするとき、スケジューリングに同期化などの仕組み無しで、複数スレッドがparallelStorage
から取得や更新をするためです。その結果、この例は以下のような出力になります。
Parallel stream: 8 7 6 5 4 3 2 1 null 3 5 4 7 8 1 2