Java 8 自定义流Collector实现

前文 我们看到 Java 8 Collectors提供了很多内置实现。但有时我们需要实现一些特定功能满足业务需要,本文带你学习如何自定义Collector的实现,计算字符串流中所有单词的长度。

假设有字符串流,利用每个字符串对象有方法length()————计算并返回单词长度。我们想创建自定义Collector,实现reduce操作,计算流中所有单词的长度之和。

使用 Collector.of() 方法

为了创建自定义Collector,需要实现Collector接口。现在,我们不使用传统方法,而是使用Collector.of()静态方法创建自定义Collector。

不仅是为了更精简和增强可读性,还因为这种方法可以忽略部分不必要的实现。实际上,Collector接口仅需要三个必须部分————提供者(supplier), 累加器(accumulator) 以及合并器(combiner)。

结果容器提供者(supplier)

实现Collector,必须提供结果容器,即累加值存储的地方。下面代码提供了结果容器:

() -> new int[1]

你可能会想,为什么是 new int[1],而不是int变量作为初始化值。原因是Collector接口需要结果容器能够以可变的方式进行更新。

累加元素(accumulator)

接下来,我们需要创建函数实现增加元素至结果容器。在我们的示例中,即单词的长度增加至结果容器:

(result, item) -> result[0] += item.length()

该函数是Consumer类型,其不返回任何值,仅以可变的方式更新结果容器————即数组中的第一个元素。

合并器(combiner)

在reduction序列操作中,提供者(supplier) 和 累加器(accumulator) 已经足够了,但为了能够实现并行操作,我们需要实现一个合并器。合并器(combiner)是定义两个结果如何合并的函数。
在并行环境下,流被分为多个部分,每个部分被并行累加。当所有部分都完成时,结果需要使用合并器函数进行合并。下面请看我们的实现代码:

(result1, result2) -> {
  result1[0] += result2[0];
  return result1;

最小的自定义Collector

现在我们已经所有必要组件,整合在一起就是我们的Collector:

 wordStream.collect(Collector.of(
    ()-> new int[1],
    (result, item) -> result[0] += item.length(),
    (result1, result2) -> {
        result1[0] += result2[0];
        return result1;

上面方案有个小问题,其直接返回结果容器即int[]类型。实际我们需要的字符串长度,不是结果容器。

最后一个转换

我们可以很容易实现,增加一个函数,其映射结果容器至我们需要的类型。这里我们仅仅需要数组的第一个元素:

total -> total[0]  

最后完整代码为:

    private List<String> wordList = Arrays.asList("tommy", "is", "a", "java", "developer");
    @Test
    public void wordCountTest() {
        Stream<String> wordStream = wordList.stream();
        int wordCnt = wordStream.collect(Collector.of(
            ()-> new int[1],
            (result, item) -> result[0] += item.length(),
            (result1, result2) -> {
                result1[0] += result2[0];
                return result1;
            total -> total[0]
        System.out.println("wordCnt = " + wordCnt);

如果把我们自定义的Collector赋值给变量,则代码可以简化为:

int wordCount = wordStream.collect(totalWordCountCollector);  

最后,我们看下优化参数。即自定义Collector支持不同类型的优化参数。
使用Collector.of() 可以在参数最后增加 Characteristics 作为可变参数:

Collector.of(  
  // supplier,
  // accumulator,
  // combiner,
  // finisher, 
  Collector.Characteristics.CONCURRENT,
  Collector.Characteristics.IDENTITY_FINISH,
  // ...

有三种 Characteristics 可以使用:

  • CONCURRENT — 指明结果容器可以被多个并发累加器使用
  • IDENTITY_FINISH — 指明结束函数是恒等函数且可以被忽略
  • UNORDERED — 指明collector不依赖元素顺序

本文我们学习了如何自定义Java 8 自定义 流的 Collector实现.更多内容可以参考官方文档

Java 8 自定义 流的 Collector实现前文我们看到 Java 8 Collectors提供了很多内置实现。但有时我们需要实现一些特定功能满足业务需要,本文带你学习如何自定义Collector的实现,计算字符串流中所有单词的长度。需求说明加入我们的字符串流,每个字符串对象有方法length()————返回单词长度。我们想创建自定义Collector,实现reduce操作,计算流中所... 怎么在java中创建一个自定义collector 在之前的java collectors文章里面,我们讲到了stream的collect方法可以调用Collectors里面的toList()或者toMap()方法,将结果转换为特定的集合类。 今天我们介绍一下怎么自定义一个Collector。 Collector介绍 我们先看一下Collector的定义: Collector接口需要实现supplier(),accumulator(),combiner(),finisher(),characteristics()这5个接口
collector collector的简单场景,一般是在处理完毕,想要收集对象的收尾工作。代码如下。 List<String> list = Stream.of("kimmy", "robin", "lisa", "lulu", "mike", "jimmy") .collect(Collectors.toList()); 这个收集器主要是把 水...
Collector.of(() -> { Pagination<T> pagination = new Pagination<>(); pagination.setPage(page); pagination.setSize(size); collect:收集器 Collector:收集器的方法 Collector:是一个接口,一种可变的汇聚操作,它将输入元素累积到可变结果容器中。在处理完所有输入元素后,可以选择将累积的结果转换为最终形式(这是一种可选操作),支持串行,并行操作 Collectors本身提供了关于Collector的常见汇聚实现Collectors本身实际上是一个工厂 为了确保串行与并行操作结果的的等价性,Collector函数需要满足两个条件:identity(同一性)与associativi collect(toList())方法由Stream里的值生成一个列表,是一个及早求值的操作。 Stream的of方法使用一个初始值生成新的Stream。事实上,collect的使用方法不仅限于此,它是一个非常通用的强大结构。 下面我们看一下用法: 运行结果如上图。 由于很多Stream操作都是惰性求值,因此调用Stream上的一...
Stream 的核心在于Collectors,即对处理后的数据进行收集。Collectors 提供了非常多且强大的API,可以将最终的数据收集成List、Set、Map,甚至是更复杂的结构(这三者的嵌套组合)。 Collectors 提供了很多API,有很多都是一些函数的重载,这里我个人将其分为三大类,如下: 数据收集:set、map、list 聚合归约:统计、求和、最值、平均、字符串拼接、规约 前后处理:分区、分组、自定义操作 API 使用 这里会讲到一些常用API 的用
你好,我是看山。 Java8 应该算是业界主版本了,版本中重要性很高的一个更新是Stream处理。关于处理内容比较多,本文主要是说一下Stream中的Collectors工具类的使用。 Collectors是java.util.stream包下的一个工具类,其中各个方法的返回值可以作为java.util.stream.Stream#collect的入参,实现对队列的各种操作,包括:分组、聚合等。官方文档给出一些例子: Implementations of {@link Collector} tha.
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; public class FlinkHDFSExample { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> text = env.readTextFile("hdfs://localhost:9000/path/to/directory1/,hdfs://localhost:9000/path/to/directory2/") .withParameters(new Configuration().setBoolean("recursive.file.enumeration", true)); DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()) .groupBy(0) .sum(1); counts.print(); public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1)); 这个例子使用Flink的`readTextFile`方法读取HDFS下的多个目录中的文件,并使用`Tokenizer`函数对文件进行分词,最后统计每个单词出现的次数。注意,需要在`readTextFile`方法中设置`recursive.file.enumeration`参数为`true`,以便递归地读取所有子目录中的文件。