前文
我们看到 Java 8 Collectors提供了很多内置实现。但有时我们需要实现一些特定功能满足业务需要,本文带你学习如何自定义Collector的实现,计算字符串流中所有单词的长度。
假设有字符串流,利用每个字符串对象有方法length()————计算并返回单词长度。我们想创建自定义Collector,实现reduce操作,计算流中所有单词的长度之和。
为了创建自定义Collector,需要实现Collector接口。现在,我们不使用传统方法,而是使用Collector.of()静态方法创建自定义Collector。
不仅是为了更精简和增强可读性,还因为这种方法可以忽略部分不必要的实现。实际上,Collector接口仅需要三个必须部分————提供者(supplier), 累加器(accumulator) 以及合并器(combiner)。
实现Collector,必须提供结果容器,即累加值存储的地方。下面代码提供了结果容器:
() -> new int[1]
你可能会想,为什么是 new int[1],而不是int变量作为初始化值。原因是Collector接口需要结果容器能够以可变的方式进行更新。
接下来,我们需要创建函数实现增加元素至结果容器。在我们的示例中,即单词的长度增加至结果容器:
(result, item) -> result[0] += item.length()
该函数是Consumer类型,其不返回任何值,仅以可变的方式更新结果容器————即数组中的第一个元素。
在reduction序列操作中,提供者(supplier) 和 累加器(accumulator) 已经足够了,但为了能够实现并行操作,我们需要实现一个合并器。合并器(combiner)是定义两个结果如何合并的函数。
在并行环境下,流被分为多个部分,每个部分被并行累加。当所有部分都完成时,结果需要使用合并器函数进行合并。下面请看我们的实现代码:
(result1, result2) -> {
result1[0] += result2[0];
return result1;
现在我们已经所有必要组件,整合在一起就是我们的Collector:
wordStream.collect(Collector.of(
()-> new int[1],
(result, item) -> result[0] += item.length(),
(result1, result2) -> {
result1[0] += result2[0];
return result1;
上面方案有个小问题,其直接返回结果容器即int[]类型。实际我们需要的字符串长度,不是结果容器。
我们可以很容易实现,增加一个函数,其映射结果容器至我们需要的类型。这里我们仅仅需要数组的第一个元素:
total -> total[0]
最后完整代码为:
private List<String> wordList = Arrays.asList("tommy", "is", "a", "java", "developer");
@Test
public void wordCountTest() {
Stream<String> wordStream = wordList.stream();
int wordCnt = wordStream.collect(Collector.of(
()-> new int[1],
(result, item) -> result[0] += item.length(),
(result1, result2) -> {
result1[0] += result2[0];
return result1;
total -> total[0]
System.out.println("wordCnt = " + wordCnt);
如果把我们自定义的Collector赋值给变量,则代码可以简化为:
int wordCount = wordStream.collect(totalWordCountCollector);
最后,我们看下优化参数。即自定义Collector支持不同类型的优化参数。
使用Collector.of()
可以在参数最后增加 Characteristics 作为可变参数:
Collector.of(
// supplier,
// accumulator,
// combiner,
// finisher,
Collector.Characteristics.CONCURRENT,
Collector.Characteristics.IDENTITY_FINISH,
// ...
有三种 Characteristics 可以使用:
- CONCURRENT — 指明结果容器可以被多个并发累加器使用
- IDENTITY_FINISH — 指明结束函数是恒等函数且可以被忽略
- UNORDERED — 指明collector不依赖元素顺序
本文我们学习了如何自定义Java 8 自定义 流的 Collector实现.更多内容可以参考官方文档。
Java 8 自定义 流的 Collector实现前文我们看到 Java 8 Collectors提供了很多内置实现。但有时我们需要实现一些特定功能满足业务需要,本文带你学习如何自定义Collector的实现,计算字符串流中所有单词的长度。需求说明加入我们的字符串流,每个字符串对象有方法length()————返回单词长度。我们想创建自定义Collector,实现reduce操作,计算流中所...
怎么在java中创建一个自定义的collector
在之前的java collectors文章里面,我们讲到了stream的collect方法可以调用Collectors里面的toList()或者toMap()方法,将结果转换为特定的集合类。
今天我们介绍一下怎么自定义一个Collector。
Collector介绍
我们先看一下Collector的定义:
Collector接口需要实现supplier(),accumulator(),combiner(),finisher(),characteristics()这5个接口
collector
collector的简单场景,一般是在流处理完毕,想要收集对象的收尾工作。代码如下。
List<String> list = Stream.of("kimmy", "robin", "lisa", "lulu", "mike", "jimmy")
.collect(Collectors.toList());
这个收集器主要是把 流水...
Collector.of(() -> {
Pagination<T> pagination = new Pagination<>();
pagination.setPage(page);
pagination.setSize(size);
collect:收集器
Collector:收集器的方法
Collector:是一个接口,一种可变的汇聚操作,它将输入元素累积到可变结果容器中。在处理完所有输入元素后,可以选择将累积的结果转换为最终形式(这是一种可选操作),支持串行,并行操作
Collectors本身提供了关于Collector的常见汇聚实现,Collectors本身实际上是一个工厂
为了确保串行与并行操作结果的的等价性,Collector函数需要满足两个条件:identity(同一性)与associativi
collect(toList())方法由Stream里的值生成一个列表,是一个及早求值的操作。
Stream的of方法使用一个初始值生成新的Stream。事实上,collect的使用方法不仅限于此,它是一个非常通用的强大结构。
下面我们看一下用法:
运行结果如上图。
由于很多Stream操作都是惰性求值,因此调用Stream上的一...
Stream 的核心在于Collectors,即对处理后的数据进行收集。Collectors 提供了非常多且强大的API,可以将最终的数据收集成List、Set、Map,甚至是更复杂的结构(这三者的嵌套组合)。
Collectors 提供了很多API,有很多都是一些函数的重载,这里我个人将其分为三大类,如下:
数据收集:set、map、list
聚合归约:统计、求和、最值、平均、字符串拼接、规约
前后处理:分区、分组、自定义操作
API 使用
这里会讲到一些常用API 的用
你好,我是看山。
Java8 应该算是业界主版本了,版本中重要性很高的一个更新是Stream流处理。关于流处理内容比较多,本文主要是说一下Stream中的Collectors工具类的使用。
Collectors是java.util.stream包下的一个工具类,其中各个方法的返回值可以作为java.util.stream.Stream#collect的入参,实现对队列的各种操作,包括:分组、聚合等。官方文档给出一些例子:
Implementations of {@link Collector} tha.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;
public class FlinkHDFSExample {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("hdfs://localhost:9000/path/to/directory1/,hdfs://localhost:9000/path/to/directory2/")
.withParameters(new Configuration().setBoolean("recursive.file.enumeration", true));
DataSet<Tuple2<String, Integer>> counts =
text.flatMap(new Tokenizer())
.groupBy(0)
.sum(1);
counts.print();
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
这个例子使用Flink的`readTextFile`方法读取HDFS下的多个目录中的文件,并使用`Tokenizer`函数对文件进行分词,最后统计每个单词出现的次数。注意,需要在`readTextFile`方法中设置`recursive.file.enumeration`参数为`true`,以便递归地读取所有子目录中的文件。