読者です 読者をやめる 読者になる 読者になる

kagamihogeの日記

kagamihogeの日記です。

The Java TutorialsのAggregate operationsのところをテキトーに訳した

英語の勉強中です。

The Java TutorialsAggregate Operationsのところを読んでテキトーに訳した。並行プログラミングあんま詳しくないんで、やや雑な訳になったが・・・・・・致し方なし。

Lesson: 集約操作(Aggregate Operations)

Note:このセクションのコンセプトをより良く理解するには、Lambda Expressions Method Referencesのセクションを確認してください。

コレクションは何のために使用するのでしょうか? 単にコレクションにオブジェクトを格納して削除することでは無いでしょう。大抵の場合、コレクションに格納された要素を検索することと思います。

Lambda Expressions のセクションで解説されたシナリオを思い返してみてください。いま、ソーシャルネットワーキングアプリケーションを作ろうとしている、と仮定します。管理者がある種のアクション、例えば検索条件を満たすソーシャルネットワーキングアプリケーションのメンバへのメッセージ送信のような、を実行できる機能を作成したいとします。

ソーシャルネットワーキングアプリケーションのメンバは以下のようなPersonクラスで表現される、と仮定します。

public class Person {

    public enum Sex {
        MALE, FEMALE
    }

    String name;
    LocalDate birthday;
    Sex gender;
    String emailAddress;
    
    // ...

    public int getAge() {
        // ...
    }

    public String getName() {
        // ...
    }
}

以下の例は、for-eachループでコレクションrosterに含まれるすべてのメンバ名を表示します。

for (Person p : roster) {
    System.out.println(p.getName());
}

以下の例は、コレクションrosterに含まれるすべてのメンバ名を表示しますが、forEachに集約操作を使用しています。

roster
    .stream()
    .forEach(e -> System.out.println(e.getName());

この例の場合では、集約操作を使用する方がfor-eachループよりも長くなっていますが、bulk-dataオペレーションを使用するやり方は、より複雑なタスクの場合には簡潔な表現となるでしょう。

以降のトピックが扱うのは以下の通りです。

このセクションで説明に使用されるコードの抜粋はBulkDataOperationsExamplesから行っています。

Pipelines and Streams

パイプライン(pipeline)とは、集約操作のシーケンスです。以下の例は、コレクションrosterに含まれる男性メンバを表示するもので、集約操作のfilterforEachで構成されるパイプラインで処理を行っています。

roster
    .stream()
    .filter(e -> e.getGender() == Person.Sex.MALE)
    .forEach(e -> System.out.println(e.getName()));

比較として、for-eachループでコレクションrosterに含まれる男性メンバーを表示するのは以下の通りです。

for (Person p : roster) {
    if (p.getGender() == Person.Sex.MALE) {
        System.out.println(p.getName());
    }
}

パイプラインは以下のコンポーネントから構成されます。

  • ソース(source):コレクション、配列、generator function、I/O channelのことです。この例におけるソースはコレクションrosterのことです。
  • ゼロ個以上の中間操作(intermediate operations):filterのような中間操作は新しいストリームを生成します。
    ストリーム(stream)とは、要素のシーケンスです。コレクションとは異なり、要素を格納するデータ構造ではありません。その代りに、ストリームはソースからパイプラインに値を受け渡します。この例ではコレクションrosterstreamメソッドを呼び出すことでストリームを生成しています。
    filterは述語(このメソッドの引数)にマッチする要素を含む新しいストリームを返します。この例での述語はラムダ式e -> e.getGender() == Person.Sex.MALEです。このラムダ式は、もしオブジェクトegenderフィールドがPerson.Sex.MALEであればtrueを返します。その結果、filterはコレクションrosterの全男性メンバを含むストリームを返します。
  • 終端操作(terminal operation)。終端操作とは、forEachのような、ストリームでないプリミティブ値(double値のような)やコレクションで結果を返すものです。forEachの場合は何も返しません。この例では、forEachの引数はラムダ式e -> System.out.println(e.getName())で、オブジェクトegetNameメソッドを呼び出しています。(Javaランタイムとコンパイルはオブジェクトeの型はPersonと推論します。)

以下の例はコレクションrosterに含まれる全男性の年齢の平均を、集約操作のfilter, mapToInt, averageで構成されるパイプラインを使用して、計算します。

double average = roster
    .stream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble();

mapToIntIntStream型(integerのみで構成される)の新しいストリームを返します。この操作は引数に指定される関数をストリームの各要素に適用します。この例では、関数はPerson::getAgeで、メンバの年齢を返すメソッド参照です。(代わりにラムダ式e -> e.getAge()も使用できます) その結果として、この例はmapToIntはコレクションrosterの全男性メンバの年齢を含むストリームを返します。

averageIntStreamに含まれる要素の平均値を計算します。帰り値の型はOptionalDoubleです。もしストリームに要素が何もなければ、averageOptionalDoubleの空インスタンスを返し、getAsDoubleメソッドNoSuchElementExceptionをスローします。JDKaverageのようなストリームの内容を結合して一つの値を返す終端操作を多数提供しています。これらの操作はreduction operationsと呼ばれ、Reductionセクションで詳細な情報があります。

Differences Between Aggregate Operations and Iterators

forEachのような集約操作はイテレータと似ています。しかし、根本的に異なる要素がいくつかあります。

  • 内部イテレーションの使用(They use internal iteration): 集約操作にはコレクションの次の要素を処理するためのnextのようなメソッドはありません。internal delegationでは、アプリケーションはイテレートするコレクションを指定し、JDKがコレクションのイテレート方法を決定します。外部イテレーション(external iteration)では、アプリケーションは対象のコレクションとイテレート方法の両方とも指定します。しかし、外部イテレーションはコレクションの要素をシーケンシャルにだけイテレート可能です。内部イテレーションにこの制限はありません。このことは、下位要素に分割された問題を同時に解き、それらの解の結果を結合するような、パラレル計算を活用するのが容易になります。詳細な情報についてはParallelismを参照してください。
  • ストリームによる処理(They process elements from a stream): 集約操作はストリーム経由で要素を処理し、コレクションには直接触れません。そのため、ストリーム操作(stream operations)と呼ばれます。
  • 振る舞いのパラメーター化(They support behavior as parameters): 多くの集約操作の引数にはlambda expressionsを指定できます。これは、集約操作それぞれにおける振る舞いをカスタマイズ可能にします。

Reduction

このAggregate Operationsのセクションでは、コレクションrosterの全男性メンバの平均年齢を計算するパイプライン操作について解説します。

double average = roster
    .stream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble();

JDKにはストリームの要素を結合して一つの要素を返す終端操作(たとえばaveragesumminmaxcountなど)が多数含まれています。それらの操作はreduction operationsと呼ばれています。また、JDKには単一値を返すかわりにコレクションを返すreduction operationsもあります。reduction operationsは特定のタスク、たとえば平均値の探索や要素のカテゴリ別グルーピングなど、を実行します。ただ、JDKは汎用のreduction operationsであるreducecollectを提供しており、詳細はこのセクションで扱います。

このセクションは以下のトピックについて説明します。

このセクションで説明されるサンプルコードの抜粋はReductionExamplesを参照してください。

The Stream.reduce Method

Stream.reduceメソッドは汎用のreduction operationです。コレクションrosterの男性メンバの年齢を合計する、以下のパイプラインを見てみます。これはStream.sumというreduction operationを使用しています。

Integer totalAge = roster
    .stream()
    .mapToInt(Person::getAge)
    .sum();

Stream.reduceで同じ値を計算する以下のパイプラインと比較してみます。

Integer totalAgeReduce = roster
   .stream()
   .map(Person::getAge)
   .reduce(
       0,
       (a, b) -> a + b);

この例のreduceは二つの引数を取ります。

  • identity: identityはreductionの初期値で、ストリームに要素が何も無い場合にはデフォルトの結果になります。この例では、identityは0で、年齢の合計の初期値となり、コレクションrosterが空の場合のデフォルトの結果となります。
  • accumulator: accumulator functionは二つの引数を取ります。一つはreductionの部分的な結果(partial result)(この例では、その時点までに処理された整数の合計)で、もう一つはストリームの次の要素(この例では、整数)です。この関数は、次の部分的な結果を返します。この例では、accumulator functionはラムダ式で、二つのIntegerを足してIntegerを返しています。
(a, b) -> a + b

reduceは常に新しい値を返します。また、accumulator functionもストリーム要素を処理する度に新しい値を返します。このとき、ストリームの要素を、コレクションのような複雑なオブジェクトにreduceしたい、と仮定します。これはアプリケーションのパフォーマンスに悪影響を与える可能性があります。もしreduceがコレクションに要素を追加する場合、accumulator functionは要素を処理する度に、新しいコレクションを生成します。これは非効率的です。そうではなく、既存のコレクションを更新するほうが効率的でしょう。次のセクションで説明する、Stream.collectメソッドを使用することでそれが可能になります。

The Stream.collect Method

要素を処理するとき常に新しい値を生成するreduceとは異なり、collectメソッドは既存の値を更新あるいはmutatesします。

ストリームの値を合計する方法を考えます。これには二種類のデータが必要で、それは要素数とその合計です。しかし、reduceメソッドとその他すべてのeduction methods同様、collectメソッドは一つの値を返すだけです。要素数と合計を保持し続けるメンバ変数を含む新しいデータ型を作成する必要があり、たとえばAveragerでは以下のようになります。

class Averager implements IntConsumer
{
    private int total = 0;
    private int count = 0;
        
    public double average() {
        return count > 0 ? ((double) total)/count : 0;
    }
        
    public void accept(int i) { total += i; count++; }
    public void combine(Averager other) {
        total += other.total;
        count += other.count;
    }
}

以下のパイプラインは、すべての男性メンバの平均年齢を計算するために、Averagerクラスcollectメソッドを使用しています。

Averager averageCollect = roster.stream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .map(Person::getAge)
    .collect(Averager::new, Averager::accept, Averager::combine);
                   
System.out.println("Average age of male members: " +
    averageCollect.average());

この例のcollectは三つの引数を取ります。

  • supplier: supplierはfactory functionで、新規のインスタンスを生成します。collectでは、結果を格納するコンテナインスタンスを生成します。上記の例では、Averagerクラスの新しいインスタンスです。
  • accumulator: accumulator functionは、ストリームの要素を結果を格納するコンテナインスタンスに取り込みます。上記の例では、count変数をカウントアップし、男性メンバの年齢を意味するストリームの要素をtotal変数に足すことで、Averagerを修正しています。
  • combiner: combiner functionは、引数に二つの結果を格納するコンテナインスタンスを取りそれらをマージします。上記の例では、引数のAveragerインスタンスcounttotalを、自身のメンバ変数に足しています。

  • supplierは、reduceのidentity elementが値なのとは対照的に、ラムダ式(もしくはmethod reference)です。

  • accumulatorとcombiner functionsには戻り値がありません。
  • parallel streamsでcollectを使用できます。詳細についてはParallelismを参照してください。(parallel streamでcollectメソッドを動作させる場合、JDKはcombiner functionが新しいオブジェクトを生成する度に、新しいスレッドを生成します。上記の例の新しいオブジェクトはAveragerです。その結果、同期化について心配する必要はありません。)

JDKはストリームの要素の平均を算出するaverageを提供しますが、ストリームの要素からなんらかの値を計算する必要があれば、collectとカスタムクラスを使用できます。

collectはコレクションにとって最適です。以下の例はcolletでコレクションの男性メンバの氏名を抽出しています。

List<String> namesOfMaleMembersCollect = roster
    .stream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .map(p -> p.getName())
    .collect(Collectors.toList());

このcollectの使い方は、一つのCollector引数を取ります。このクラスは三つの引数(supplier, accumulator, combiner functions)を要求するcollectで使用される関数をカプセル化しています。

Collectorsクラスは多くの有用なreduction operationsを備えており、たとえば様々な条件で要素をまとめたり、コレクションに要素を蓄積したりできます。そうしたreduction operationsはCollectorクラスのインスタンスを返すので、collectの引数として使用できます。

上記の例が使用しているCollectors.toListは、ストリームの要素をListの新しいインスタンスに蓄積します。Collectorsクラスのメソッドと同様に、toListは、コレクションではなくCollectorインスタンスを返します。

以下の例はコレクションrosterのメンバを性別ごとにグループ化します。

Map<Person.Sex, List<Person>> byGender =
    roster
        .stream()
        .collect(
            Collectors.groupingBy(Person::getGender));

groupingByは、キーが引数で指定されるラムダ式(classification functionと呼ばれます)の適用結果となるマップを返します。この例では、返されるマップは二つのキーPerson.Sex.MALEPerson.Sex.FEMALEを持ちます。キーに当たるclassification functionがストリームの要素を処理するとき、要素はキーに対応するListインスタンスに入れられます。この例では、キーPerson.Sex.MALEに対応する値は、全男性メンバを含むListになります。

以下の例は、コレクションrosterの全メンバを性別でグループ分けしてその氏名のリストを取得します。

Map<Person.Sex, List<String>> namesByGender =
    roster
        .stream()
        .collect(
            Collectors.groupingBy(
                Person::getGender,                      
                Collectors.mapping(
                    Person::getName,
                    Collectors.toList())));

この例はgroupingByは二つの引数を取り、classification functionとCollectorインスタンスです。Collector引数はdownstream collectorと呼ばれます。このcollectorは、別のcollectorの結果が適用されるJava runtimeのcollectorです。従って、groupingByは、このgroupingByが生成するListcollectメソッドを適用できます。この例では、mapping functionPerson::getNameをストリームの各要素で実行するcollector mappingを適用しています。その結果、生成されるストリームはメンバの氏名のみから構成されるようになります。パイプラインは一つ以上のdownstream collectorsを持つことが出来、この例の場合はmultilevel reductionと呼ばれます。

以下の例は性別ごとのメンバの年齢の合計を計算しています。

Map<Person.Sex, Integer> totalAgeByGender =
    roster
        .stream()
        .collect(
            Collectors.groupingBy(
                Person::getGender,                      
                Collectors.reducing(
                    0,
                    Person::getAge,
                    Integer::sum)));

reducingは三つの引数を取ります。

  • identity: Stream.reduceと同様に、、identityは二つの役割があり、reductionの初期値とストリームに要素が無い場合のデフォルト値となります。上記の例では、identityは0で、年齢の合計の初期値でメンバがゼロ人の場合にはデフォルトの結果となります。
  • mapper: reducingはストリームの全要素にこのmapper functionを適用します。上記の例では、mapper functionは各メンバの年齢を取得しています。
  • operation: このoperation functionは、mapperから返される値を集約するために使用されます。上記の例では、operation functionはIntegerを合計していきます。

以下の例は性別ごとのメンバの年齢の平均を計算しています。

Map<Person.Sex, Double> averageAgeByGender = roster
    .stream()
    .collect(
        Collectors.groupingBy(
            Person::getGender,                      
            Collectors.averagingInt(Person::getAge)));

Parallelism

パラレル計算は、問題をサブ要素に分割して、同時(サブ要素を別々のスレッドで動作させるパラレルなやり方で)にサブ要素を解き、サブ要素の結果を最終結果にまとめる、ことを指します。Java SEはFork/Join frameworkを提供しており、アプリケーションでパラレル計算を容易に実装できます。しかし、このフレームワークでは、どのようにある問題を分割する(パーティーション化する)のかを定義しなければなりません。集約操作では、Javaランタイムがパーティショニングと結果の集約を実行します。

コレクションを使用するアプリケーションでパラレル化を実装するのが難しい理由の一つはコレクションが非スレッドセーフなことで、複数のスレッドがThread InterferenceMemory Consistency Errorsを引き起こさずにコレクションを操作できないことを意味します。Collections FrameworkはWrapper Implementationsを提供しており、任意のコレクションへ機械的に同期化を追加します。しかし、同期化はthread contentionを引き起こします。これはパラレルに実行するスレッドを妨害するのでthread contentionは避けたいところです。集約操作とパラレルストリーム(parallel streams)は、メソッド呼び出し中はコレクションを更新できない条件により、非スレッドセーフなコレクションでパラレル化の実装を可能にします。

注意点として、十分なデータ量とCPUコアがあるとしても、パラレル化は逐次的な処理よりも機械的に早くなるわけではありません。集約操作はパラレル化の実装を容易にしますが、アプリケーションがパラレル化に適しているかどうかを判断するのは依然として開発者の役割です。

このセクションでは以下のトピックについて説明します。

このセクションで説明に使用されるソースコードParallelismExamplesにあります。

Executing Streams in Parallel

ストリームはシリアルでもパラレルでも実行できます。ストリームがパラレルに実行されるとき、Javaランタイムはストリームを複数のサブストリームにパーティーション化します。集約操作はイテレートするとパラレルにサブストリームを処理し、結果を結合します。

ストリームを生成するとき、特に指定されない限りは常にシリアルなストリームになります。パラレルなストリームを生成するには、Collection.parallelStreamを呼び出します。もしくは、BaseStream.parallelを呼び出します。たとえば、以下のステートメントはパラレルに全男性メンバの平均年齢を計算します。

double average = roster
    .parallelStream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble();

Concurrent Reduction

再度、性別ごとにメンバをグループ化する以下の例(reductionセクションで解説)を考えてみます。この例はcollectを呼び出し、コレクションrosterMapに集約します。

Map<Person.Sex, List<Person>> byGender =
    roster
        .stream()
        .collect(
            Collectors.groupingBy(Person::getGender));

以下はパラレル化したものです。

ConcurrentMap<Person.Sex, List<Person>> byGender =
    roster
        .parallelStream()
        .collect(
            Collectors.groupingByConcurrent(Person::getGender));

これはconcurrent reductionと呼ばれています。Javaランタイムは、collectを含む個々のパイプラインが以下の条件をすべて満たす場合、concurrent reductionを実行します。

Note: このサンプルはMapではなくConcurrentMapを返し、groupingByではなくgroupingByConcurrentを呼び出します。(ConcurrentMapについてはConcurrent Collectionsセクションを参照してください。) groupingByConcurrentとは異なり、groupingByはパラレルストリームには不十分です。(その理由はキーで二つのマップをマージすることにあり、これはコンピュータ的には高コストな処理です。)同様に、パラレルストリームではCollectors.toMapよりも、Collectors.toConcurrentMapの方が適しています。

Ordering

パイプラインがストリームの要素を処理する順序は、ストリームがシリアルもしくはパラレルに実行されるかどうか・ストリームのソース・中間操作、に依存します。たとえば、ArrayListインスタンスforEachで要素を何回か表示する例を考えてみます。

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List<Integer> listOfIntegers =
    new ArrayList<>(Arrays.asList(intArray));

System.out.println("listOfIntegers:");
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("listOfIntegers sorted in reverse order:");
Comparator<Integer> normal = Integer::compare;
Comparator<Integer> reversed = normal.reversed(); 
Collections.sort(listOfIntegers, reversed);  
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
     
System.out.println("Parallel stream");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
    
System.out.println("Another parallel stream:");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
     
System.out.println("With forEachOrdered:");
listOfIntegers
    .parallelStream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

この例は5つのパイプラインから構成されています。出力は以下のようになります。

listOfIntegers:
1 2 3 4 5 6 7 8
listOfIntegers sorted in reverse order:
8 7 6 5 4 3 2 1
Parallel stream:
3 4 1 6 2 5 7 8
Another parallel stream:
6 3 1 5 7 8 4 2
With forEachOrdered:
8 7 6 5 4 3 2 1

上記の例は以下のような処理をしています。

  • 一つ目のパイプラインlistOfIntegersをリストに要素が追加された順序で要素を表示します。
  • 二つ目のパイプラインCollections.sortメソッドでソートした後でlistOfIntegersの要素を表示しています。
  • 三つ目と四つ目のパイプラインは一見するとランダムな順序でリストの要素を表示しています。ストリームは要素を処理するとき内部イテレーションを使用することを思い出して下さい。それ故に、パラレルにストリームを実行するとき、Javaコンパイラとランタイムは、ストリームへの指定が特になければパラレルコンピューティングによる利点を最大化するために、ストリームの要素の処理順序を決定します。
  • 五つ目のパイプラインforEachOrderedメソッドを使用しており、ストリームをシリアルかパラレルで実行するかどうかにかかわらず、そのソースが指定する順序でストリームの要素を処理します。ただし、パラレルストリームでforEachOrderedのような操作をすることは、パラレル化の利点を損なう可能性があることに注意が必要です。

Side Effects

あるメソッドや式は副作用を持つことがあり、値を生成したり返すことに加えてコンピュータの状態を変更することがあります。例としては、mutable reductions(collectを使用する操作で詳細はReductionを参照)やデバッグ用のSystem.out.println呼び出しも該当します。JDKパイプラインにおける特定の副作用を処理します。特に、collectメソッドは最も一般的なストリーム操作を実行するために設計されており、parallel-safeな副作用を持っています。forEachpeekなどは副作用を起こす操作用に設計されており、たとえばvoidを返すラムダ式や、何も実行しないが副作用はあるSystem.out.printlnなどのことです。よって、注意してforEachpeekを使うべきです。もし、パラレルストリームでそれらの操作をしようする場合、Javaランタイムは引数で指定されるラムダ式複数スレッドでコンカレントに実行する可能性があります。加えて、filtermapのような操作で副作用を持つラムダ式を引数として渡せません。以降のセクションでinterferencestateful lambda expressionsについて説明します。これらは両方とも、特にパラレルストリームにおいて、副作用の原因となり、矛盾の発生(return inconsistent)や予測不可能な結果(unpredictable results)に繋がることがあります。とはいえ、まずはlazinessについて触れますが、その理由はこれが干渉(interference)に直接影響するためです。

Laziness

すべての中間操作はlazyです。値が要求時にだけ評価されるなら、式、メソッドアルゴリズムはlazyです。(アルゴリズムは、直ちに処理もしくは評価される場合にはeagerです) 終端操作が開始するまで中間操作はストリームの処理を開始しないので、中間操作はlazyです。lazyにストリームを処理することは、Javaコンパイラとランタイムがストリームの処理を最適化できるようになります。たとえば、Aggregate Operationsセクションで説明しているfilter-mapToInt-averageのようなパイプラインなどでは、filterから複数の要素を取得し、mapToIntが受け取って何個かの整数をまとめて、averageに渡すことができます。averageは、ストリームから要求される全要素を取得し終わるまでこのプロセスを繰り返し、平均を計算します。

Interference

ストリームにおけるラムダ式は衝突(interfere)すべきではありません。衝突が発生するのは、パイプラインがストリームを処理している時に、ストリームのソースが変更される場合です。たとえば、以下のコードはList listOfStringsの文字列結合を試行しますが、ConcurrentModifiedExceptionをスローします。

try {
    List<String> listOfStrings =
        new ArrayList<>(Arrays.asList("one", "two"));
         
    // 終端操作が開始された後にソースへ"three"文字列を追加しよう
    // とするので失敗する。
     
    String concatenatedString = listOfStrings
        .stream()
        
        // 下記は禁止! ここで衝突が発生する。
        .peek(s -> listOfStrings.add("three"))
        
        .reduce((a, b) -> a + " " + b)
        .get();
                 
    System.out.println("Concatenated string: " + concatenatedString);
         
} catch (Exception e) {
    System.out.println("Exception caught: " + e.toString());
}

この例はlistOfStringsに含まれる文字列を終端操作reduceOptional<String>に結合します。しかし、ここではパイプラインは中間操作peekを呼び出しており、listOfStringsに新しい要素を追加しようとしています。すべての中間操作はlazyなことを思い出して下さい。この例のパイプラインgetが呼び出される時に実行を開始し、getが完了するときに実行を終了します。peekの引数はパイプラインの実行中にストリームのソースを修正するので、JavaランタイムはConcurrentModifiedExceptionをスローします。

Stateful Lambda Expressions

ストリーム操作の引数ではstateful lambda expressionsの使用は避けて下さい。stateful lambda expressionsとは、結果がパイプライン実行中に変更される任意の状態に依存するコードのことです。以下のコードはList listOfIntegersからmap中間操作を経由して新規のListインスタンスに要素を追加しています。二回実行し、最初はシリアル、次はパラレルストリームで実行します。

List<Integer> serialStorage = new ArrayList<>();
     
System.out.println("Serial stream:");
listOfIntegers
    .stream()
    
    // 下記は禁止! stateful lambda expressionを使用している。
    .map(e -> { serialStorage.add(e); return e; })
    
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
     
serialStorage
    .stream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("Parallel stream:");
List<Integer> parallelStorage = Collections.synchronizedList(
    new ArrayList<>());
listOfIntegers
    .parallelStream()
    
    // 下記は禁止! stateful lambda expressionを使用している。
    .map(e -> { parallelStorage.add(e); return e; })
    
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
     
parallelStorage
    .stream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

ラムダ式e -> { parallelStorage.add(e); return e; }はstateful lambda expressionです。このコードは実行するたびに結果が変化します。結果は以下のようになります。

Serial stream:
8 7 6 5 4 3 2 1
8 7 6 5 4 3 2 1
Parallel stream:
8 7 6 5 4 3 2 1
1 3 6 2 4 5 8 7

forEachOrderedはストリームを指定された順序で要素を処理し、これはストリームがシリアルかパラレルかどうかは関係ありません。しかし、ストリームがパラレルに実行されるとき、mapJavaランタイムとコンパイラによって指定されるストリームの要素を処理します。その結果、ラムダ式e -> { parallelStorage.add(e); return e; }List parallelStorageに要素を追加する順序は、コードを実行するたびに変化します。決定論的かつ予想可能な結果のためには、ストリーム操作の引数のラムダ式は非ステートフルなことを守ってください。

Note: 上記の例は、 List parallelStorageをスレッドセーフにするために、synchronizedListメソッドを実行しています。コレクションは非スレッドセーフなことに注意してください。これは、複数スレッドは同時に特定のコレクションにアクセスすべきでないことを意味しています。いま、parallelStorageの生成時にsynchronizedListメソッドを使用しないと仮定します。

List<Integer> parallelStorage = new ArrayList<>();

上記の例は不規則な振る舞いになります。その理由は、個々のスレッドがListインスタンスにアクセスするとき、スケジューリングに同期化などの仕組み無しで、複数スレッドがparallelStorageから取得や更新をするためです。その結果、この例は以下のような出力になります。

Parallel stream:
8 7 6 5 4 3 2 1
null 3 5 4 7 8 1 2