かまたま日記3

プログラミングメイン、たまに日常

Parallel Streamの並列数を調整する

Streamの parallel メソッドを呼ぶとストリームの処理を並列に実行できますが、これは内部的には前回紹介したForkJoinPoolが使われています。ForkJoinPoolは内部でcommon poolと呼ばれる共通プールを持っており、明示的にPoolを指定しない ForkJoinTask#invoke などはこのプールが利用されるようです。

public static void main(String[] args) {
    IntStream.range(0, 16).parallel()
            .forEach(i -> System.out.println(Thread.currentThread().getName() + ": " + i));
}

自分のマシン (MacBook Pro 15 inch) ではプールのサイズは7でした。実装を見るとデフォルトでは "コア数-1" になるようです。

main: 10
ForkJoinPool.commonPool-worker-3: 2
main: 11
ForkJoinPool.commonPool-worker-3: 3
ForkJoinPool.commonPool-worker-3: 1
ForkJoinPool.commonPool-worker-4: 13
ForkJoinPool.commonPool-worker-4: 12
ForkJoinPool.commonPool-worker-1: 5
ForkJoinPool.commonPool-worker-1: 8
ForkJoinPool.commonPool-worker-4: 15
ForkJoinPool.commonPool-worker-3: 6
ForkJoinPool.commonPool-worker-6: 0
main: 9
ForkJoinPool.commonPool-worker-7: 4
ForkJoinPool.commonPool-worker-2: 14
ForkJoinPool.commonPool-worker-5: 7

これは、システムプロパティ -Djava.util.concurrent.ForkJoinPool.common.parallelism で変更可能で、試しに2にしてみると、こんな感じの出力になります。

main: 10
main: 11
ForkJoinPool.commonPool-worker-1: 4
ForkJoinPool.commonPool-worker-1: 5
ForkJoinPool.commonPool-worker-0: 2
ForkJoinPool.commonPool-worker-1: 6
ForkJoinPool.commonPool-worker-1: 7
main: 8
main: 9
ForkJoinPool.commonPool-worker-1: 0
ForkJoinPool.commonPool-worker-1: 1
ForkJoinPool.commonPool-worker-0: 3
ForkJoinPool.commonPool-worker-1: 12
ForkJoinPool.commonPool-worker-1: 13
main: 14
main: 15

また、ParallelStreamで自前のForkJoinPoolを使いたい場合は、少しトリッキーですが、ストリーム処理自体をRunnableでラップしてForkJoinPoolに渡すと言った方法が使えるようです。 参考リンク

public static void main(String[] args) throws Exception {
    ForkJoinPool pool = new ForkJoinPool(2);
    pool.submit(() -> IntStream.range(0, 16).parallel()
            .forEach(i -> System.out.println(Thread.currentThread().getName() + ": " + i))
    ).get();
}
ForkJoinPool-1-worker-1: 10
ForkJoinPool-1-worker-0: 5
ForkJoinPool-1-worker-0: 4
ForkJoinPool-1-worker-0: 7
ForkJoinPool-1-worker-1: 11
ForkJoinPool-1-worker-1: 9
ForkJoinPool-1-worker-0: 6
ForkJoinPool-1-worker-0: 2
ForkJoinPool-1-worker-1: 8
ForkJoinPool-1-worker-1: 14
ForkJoinPool-1-worker-1: 15
ForkJoinPool-1-worker-0: 3
ForkJoinPool-1-worker-0: 1
ForkJoinPool-1-worker-1: 13
ForkJoinPool-1-worker-1: 12
ForkJoinPool-1-worker-0: 0