Java8 Stream 教程学习
😄 @Auther: sizaif
📆 2021-05-29 20:37:59
🔗 转载翻译于 https://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/
[TOC]
说明
这个示例驱动的教程深入概述了Java 8流。当我第一次读到Stream
API时,我对它的名字感到困惑,因为它听起来类似于Java I/O中的InputStream
和OutputStream
。但是Java 8流是完全不同的东西。Stream是Monads,因此在将函数式编程引入Java中扮演着重要的角色:
在函数式编程,
Monads
是一个结构,表示计算定义为一系列的步骤。一个具有单子结构的类型定义了将操作链起来的含义,或将该类型的函数嵌套在一起。
本指南教你如何使用Java 8流以及如何使用不同类型的可用流操作。您将了解处理顺序以及流操作的顺序如何影响运行时性能。更强大的流操作reduce
,collect
和flatMap
将详细介绍。本教程最后对并行流进行了深入研究。
流是如何工作的(How streams work)
一个流表示一个元素序列,并支持不同类型的操作来对这些元素执行计算:
List<String> myList =
Arrays.asList("a1", "a2", "b1", "c2", "c1");
myList
.stream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);
// C1
// C2
流操作可以是中间操作,也可以是终端操作。中间操作返回一个流,因此我们可以不使用分号链接多个中间操作.终端操作要么为空,要么返回非流结果.上面的过滤器示例中,map
和sorted
是中间操作,而forEach
是终端操作。有关所有可用流操作的完整列表,请参阅stream Javadoc。如上例中所示的这种流操作链也称为操作管道。
大多数流操作接受某种类型的lambda表达式参数,一个指定操作确切行为的函数接口。大多数操作必须是无干扰和无状态的。这是什么意思?
当一个函数不修改流的基础数据源时,它就是无干扰的,例如,在上面的例子中,没有一个lambda表达式
通过从集合中添加或删除元素来修改myList
。
当操作的执行是确定的时,函数就是无状态 的,例如,在上面的例子中,没有lambda
表达式依赖于在执行过程中可能改变的外部作用域的任何可变变量或状态。
不同类型的流(Different kind of streams)
可以从各种数据源创建流,特别是集合。列表和集合支持新的方法stream()
和parallelStream()
来创建顺序流或并行流。并行流能够在多个线程上操作,本教程的后面一节将对此进行介绍。我们现在主要关注顺序流:
Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println); // a1
在对象列表上调用方法stream()
将返回一个常规对象流。但我们不需要创建集合来处理流,就像我们在下一个代码示例中看到的:
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println); // a1
只需使用stream .of()
从一堆对象引用创建一个流。
除了常规的对象流之外,Java 8
还提供了特殊类型的流,用于处理基本数据类型int
、long
和double
。你可能已经猜到是IntStream
, LongStream
和DoubleStream
。
IntStreams
可以使用IntStream.range()
替换常规的for循环
:
IntStream.range(1, 4)
.forEach(System.out::println);
// 1
// 2
// 3
所有这些基元流都像普通的对象流一样工作,有以下不同:基元流使用特化的lambda表达式
,例如用IntFunction
代替Function
,用IntPredicate
代替Predicate
。原始流支持附加的终端聚合操作sum()
和average()
:
Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println); // 5.0
有时将常规对象流转换为基本流很有用,反之亦然。为此,对象流支持特殊的映射操作mapToInt()
、mapToLong()
和mapToDouble
:
Stream.of("a1", "a2", "a3")
.map(s -> s.substring(1))
.mapToInt(Integer::parseInt)
.max()
.ifPresent(System.out::println); // 3
原始流可以通过mapToObj()转换为对象流:
IntStream.range(1, 4)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a3
下面是一个组合的例子:double类型
的流首先映射到int类型
的流,然后再映射到string类型
的对象流:
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a3
处理次序(Processing Order)
现在,我们已经学习了如何创建和使用不同类型的流,让我们深入了解如何在幕后处理流操作。中间操作的一个重要特征是惰性。看看这个例子,这里缺少了一个终端操作:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});
在执行此代码片段时,不会将任何内容打印到控制台。这是因为只有在存在终端操作时才会执行中间操作。
让我们通过终端操作forEach来扩展上面的示例:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));
执行此代码片段会在控制台上产生所需的输出:
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c
结果的顺序可能会令人惊讶。一种简单的方法是在流的所有元素上一个接一个地水平执行操作.但是每个元素都沿着链垂直移动。第一个字符串“d2”
传递filter
然后forEach
,只有这样第二个字符串“a2”
才会被处理。
这种行为可以减少对每个元素执行的实际操作数,在下一个例子中我们可以看到:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println("anyMatch: " + s);
return s.startsWith("A");
});
// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
只要谓词应用于给定的输入元素,anyMatch
操作就返回true
。对于传递给“A2”
的第二个元素是这样的。由于流链是垂直执行的,所以在这种情况下只需要执行两次map
。因此,不是映射流的所有元素,而是尽可能少地调用map。
Why order matters
下一个示例包含两个中间操作map
和filter
以及终端操作forEach
。让我们再次检查这些操作是如何执行的:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));
// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
您可能已经猜到,对于底层集合中的每个字符串,map
和filter
都被调用5次,而forEach
只被调用一次。
如果我们改变操作的顺序,将filter
移到链的开头,就可以大大减少实际执行的次数:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
现在,map
只被调用一次,所以对于大量输入元素,操作管道的执行速度要快得多。在编写复杂的方法链时,请记住这一点。
让我们通过一个额外的操作来扩展上面的例子,sorted
:
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
排序是一种特殊的中间操作。这是一个所谓的有状态操作,因为为了对元素集合排序,您必须在排序期间保持状态。
执行这个示例会得到以下控制台输出:
sort: a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2
首先,对整个输入集合执行排序操作。换句话说,排序
是水平执行的。所以在这种情况下,对于输入集合中的每个元素的多个组合,sorted被调用8次。
再一次,我们可以通过重新排序链来优化性能:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
在本例中,从未调用sorted,因为filter将输入集合减少到只有一个元素。因此,对于更大的输入集合,性能会大大提高
复用流(Reusing Streams)
Java 8流不能被重用。当你调用任何终端操作时,流就会被关闭:
Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true); // ok
stream.noneMatch(s -> true); // exception
在同一个流的anyMatch
之后调用nonmatch会
导致以下异常:
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)
为了克服这个限制,我们必须为想要执行的每个终端操作创建一个新的流链,例如,我们可以创建一个流供应商来构造一个所有中间操作都已经设置好的新流:
Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a"));streamSupplier.get().anyMatch(s -> true); // okstreamSupplier.get().noneMatch(s -> true); // ok
对get()
的每次调用都会构造一个新流,我们将在其上保存以调用所需的终端操作。
高级操作(Advanced Operations)
流支持许多不同的操作。我们已经学习了最重要的操作,如filter
或map
。我把所有其他可用的操作留给您去发现(请参阅Stream Javadoc)。相反,让我们深入研究更复杂的操作收集、flatMap
和reduce
。
本节中的大多数代码示例使用以下人员列表进行演示:
class Person {
String name;
int age;
Person(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return name;
}
}
List<Person> persons =
Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
Collect
Collect是一个非常有用的终端操作,可以将流的元素转换为另一种结果,例如: list
,map
,set
.Collect接受Collector
,收集器由四种不同的操作组成:供应商(supplier)、累加器( accumulator)、合并器(combiner)和收尾器(finisher)。乍一听,这听起来非常复杂,但好的部分是Java 8通过collector类支持各种内置收集器。所以对于最常见的操作,你不需要自己实现收集器。
让我们从一个非常常见的用例开始:
List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("P")) .collect(Collectors.toList());System.out.println(filtered); // [Peter, Pamela]
如您所见,从流的元素构造一个列表非常简单。需要一个集合而不是列表-只需使用collections . toset()。
下一个例子将所有人按年龄分组:
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.age));
personsByAge
.forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
Collectors
是非常多才多艺的。你也可以创建流元素的聚合,
例如确定所有人的平均年龄:
Double averageAge = persons
.stream()
.collect(Collectors.averagingInt(p -> p.age));
System.out.println(averageAge); // 19.0
如果您对更全面的统计信息感兴趣,汇总收集器将返回一个特殊的内置汇总统计信息对象。因此,我们可以简单地确定人员的最小年龄,最大年龄和算术平均年龄以及总和和计数。
IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age));System.out.println(ageSummary);// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
下一个例子将所有人连接到一个字符串中:
String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));System.out.println(phrase);// In Germany Max and Peter and Pamela are of legal age.
连接收集器接受分隔符以及可选的前缀和后缀。
为了将流元素转换为映射,我们必须指定键和值应该如何映射。请记住,映射的键必须是唯一的,否则抛出IllegalStateException
。您可以选择将merge
函数作为附加参数传递,以绕过异常:
Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2));System.out.println(map);// {18=Max, 23=Peter;Pamela, 12=David}
现在我们已经了解了一些最强大的内置收集器,让我们尝试构建自己的特殊收集器。我们想要将流中的所有人转换为单个字符串,该字符串由以|
管道字符分隔的所有大写字母组成。
为了实现这一点,我们通过collector .of()
创建一个新的收集器。我们必须传递收集器的四个组成部分:供应商(supplier)、累加器( accumulator)、合并器(combiner)和收尾器(finisher)
Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "), // supplier (j, p) -> j.add(p.name.toUpperCase()), // accumulator (j1, j2) -> j1.merge(j2), // combiner StringJoiner::toString); // finisherString names = persons .stream() .collect(personNameCollector);System.out.println(names); // MAX | PETER | PAMELA | DAVID
由于Java中的字符串是不可变的,我们需要一个像StringJoiner
这样的helper类
来让收集器构造我们的字符串。提供者最初构造这样一个带有适当分隔符的StringJoiner
。累加器用于将每个人的大写名称添加到StringJoiner
中。组合器知道如何将两个StringJoiners
合并为一个。在最后一步中,完成器从StringJoiner构造所需的字符串。
FlatMap
我们已经学习了如何利用map
操作将流的对象转换为另一种类型的对象。Map
有一定的局限性,因为每个对象只能映射到另一个对象。但是,如果我们想将一个对象转换为多个其他对象,或者根本不转换该怎么办呢?这就是flatMap
的救星。
FlatMap
将流的每个元素转换为其他对象的流。因此,每个对象将被转换为0个、一个或多个由流支持的其他对象。这些流的内容将被放置到flatMap
操作返回的流中。
在我们看到flatMap的实际应用之前,我们需要一个适当的类型层次结构:
class Foo {
String name;
List<Bar> bars = new ArrayList<>();
Foo(String name) {
this.name = name;
}
}
class Bar {
String name;
Bar(String name) {
this.name = name;
}
}
我们利用我们关于流的知识来实例化几个对象:
List<Foo> foos = new ArrayList<>();
// create foos
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));
// create bars
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
现在我们有一个包含三个foo
的列表,每个foo
由三个bar
组成。
FlatMap
接受一个必须返回对象流的函数。因此,为了解析每个foo的bar对象,我们只需传递适当的函数:
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
正如您所看到的,我们已经成功地将三个foo对象的流转换为九个bar对象的流。
最后,上面的代码示例可以简化为一个流操作的单一管道:
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
.forEach(f.bars::add))
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
FlatMap
也适用于Java 8中引入的Optional
类。flatMap
操作返回另一种类型的可选对象。因此可以利用它来防止讨厌的空检查。
想想这样一个高度分层的结构:
class Outer { Nested nested;}class Nested { Inner inner;}class Inner { String foo;}
为了解析外部实例的内部字符串foo
,你必须添加多个空检查来防止可能的nullpointerexception
:
Outer outer = new Outer();if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo);}
同样的行为可以通过使用可选的flatMap
操作获得:
Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println);
对flatMap的每次调用返回一个可选的包装(如果存在)所需对象,如果不存在则返回null。
Reduce
reduce
操作将流的所有元素合并为一个结果。Java 8支持三种不同的reduce方法。第一种方法将元素流缩减为流中的一个元素。
让我们看看如何使用这个方法来确定最年长的人:
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println); // Pamela
reduce
方法接受一个BinaryOperator
累加器函数。这实际上是一个BiFunction
,其中两个操作数共享同一类型,在这里是Person
。BiFunctions
类似于Function
,但接受两个参数。示例函数比较两个人的年龄,以便返回具有最大年龄的人。
第二个reduce方法同时接受标识值和BinaryOperator
累加器。该方法可用于构造一个新的Person,
该Person具有来自流中所有其他人员的聚合名称和年龄:
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});
System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
第三个reduce方法接受三个参数:一个标识值
、一个BiFunction累加器
和一个BinaryOperator类型
的组合函数。因为身份值类型不限于Person类型,我们可以利用这个约简来确定所有人的年龄总和:
Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);System.out.println(ageSum); // 76
正如你所看到的,结果是76,但在引擎盖下到底发生了什么?让我们通过一些调试输出来扩展上面的代码:
Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; });// accumulator: sum=0; person=Max// accumulator: sum=18; person=Peter// accumulator: sum=41; person=Pamela// accumulator: sum=64; person=David
正如你所看到的,累加器函数完成了所有的工作。它首先被调用,初始值为0,第一个人是Max。在接下来的三个步骤中,总和随着最后一步的年龄不断增加,直到76岁。
Wait what?合成器(combiner)从来没有被调用?并行执行同一个流将解除秘密:
Integer ageSum = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
并行执行此流会导致完全不同的执行行为。现在合并器被调用了。由于累加器是并行调用的,因此需要合并器将单独累加的值相加。
让我们在下一章更深入地探讨并行流。
并行流(Parallel Streams)
流可以并行执行,以提高大量输入元素的运行时性能。并行流使用通过静态ForkJoinPool. commonpool()
方法可用的公共ForkJoinPool
。底层线程池的大小最多使用5个线程—取决于可用的物理CPU内核的数量:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3
在我的机器上,公共池初始化时的并行度默认为3。这个值可以通过设置以下JVM参数来增加或减少:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
集合支持parallelStream()
方法来创建元素的并行流。或者,您可以在给定流上调用中间方法parallel()
来将顺序流转换为并行对应流。
为了少描述并行流的并行执行行为,下面的例子将当前线程的信息打印到sout:
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));
通过研究调试输出,我们应该更好地理解哪些线程实际用于执行流操作:
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
正如您可以看到的,并行流利用了公共ForkJoinPool
中的所有可用线程来执行流操作。连续运行时的输出可能不同,因为实际使用的特定线程的行为是不确定的。
让我们通过一个额外的流操作——sort
来扩展这个示例:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
乍一看,结果可能很奇怪:
filter: c2 [ForkJoinPool.commonPool-worker-3]filter: c1 [ForkJoinPool.commonPool-worker-2]map: c1 [ForkJoinPool.commonPool-worker-2]filter: a2 [ForkJoinPool.commonPool-worker-1]map: a2 [ForkJoinPool.commonPool-worker-1]filter: b1 [main]map: b1 [main]filter: a1 [ForkJoinPool.commonPool-worker-2]map: a1 [ForkJoinPool.commonPool-worker-2]map: c2 [ForkJoinPool.commonPool-worker-3]sort: A2 <> A1 [main]sort: B1 <> A2 [main]sort: C2 <> B1 [main]sort: C1 <> C2 [main]sort: C1 <> B1 [main]sort: C1 <> C2 [main]forEach: A1 [ForkJoinPool.commonPool-worker-1]forEach: C2 [ForkJoinPool.commonPool-worker-3]forEach: B1 [main]forEach: A2 [ForkJoinPool.commonPool-worker-2]forEach: C1 [ForkJoinPool.commonPool-worker-1]
排序似乎只在主线程上顺序执行。实际上,在并行流上排序使用了新的Java 8方法Arrays.parallelSort()
。如Javadoc中所述,如果排序是顺序执行还是并行执行,该方法决定数组的长度:
如果指定数组的长度小于最小粒度,则使用适当的
Arrays
对其进行排序。排序方法。
回到上一节的reduce
示例。我们已经发现,combiner
函数只在并行中调用,而不是在顺序流中调用。让我们看看哪些线程真正涉及:
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]\n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});
控制台输出显示累加器和组合器函数在所有可用线程上并行执行:
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
总之,可以这样说,并行流可以为具有大量输入元素的流带来很好的性能提升。但是请记住,像reduce
和collect
这样的并行流操作需要额外的计算(合并操作),这在顺序执行时是不需要的。
此外,我们还了解到所有并行流操作共享同一个jvm范围的公共ForkJoinPool
。因此,您可能希望避免实现缓慢阻塞的流操作,因为这可能会降低应用程序中严重依赖并行流的其他部分的速度。