用KeySelector wordAndOne . keyBy ( new KeySelector < Tuple2 < String , Integer > , Tuple2 < String , Integer > > ( ) { @Override public Tuple2 < String , Integer > getKey ( Tuple2 < String , Integer > value ) throws Exception { return Tuple2 . of ( value . f0 , value . f1 ) ; } ) ;

上述可用lambda简化

wordAndOne.keyBy(
        (KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>) value -> 
                Tuple2.of(value.f0, value.f1)

二、POJO

假设有个流

DataStream<PeopleCount> source = ...

PeopleCount的类定义是

public class PeopleCount {
    private String province;
    private String city;
    private Integer counts;
    public PeopleCount() {
	省略其他代码。。。

1. 单个字段keyBy

source.keyBy(a -> a.getProvince())
source.keyBy(PeopleCount::getProvince)

2. 多个字段keyBy

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> getKey(PeopleCount value) throws Exception {
        return Tuple2.of(value.getProvince(), value.getCity());
});

上述可用lambda简化

map.keyBy(
        (KeySelector<PeopleCount, Tuple2<String, String>>) value -> 
                Tuple2.of(value.getProvince(), value.getCity())
                    一、元组假设有个流DataStream&lt;Tuple2&lt;String, Integer&gt;&gt; wordAndOne = ....1. 单个字段keyBy用字段位置wordAndOne.keyBy(0)用字段表达式wordAndOne.keyBy(v -&gt; v.f0)2. 多个字段keyBy用字段位置wordAndOne.keyBy(0, 1)用KeySelectorwordAndOne.keyBy(new KeySelector&lt;Tuple2&
				
// 自定义 KeySelector public static class MyKeySelector implements KeySelector<MarketingUserBehavior,Tuple2<String,String>>{ @Override public Tuple2<String, String> getKey(MarketingUserBehavior marketingUserBehavior) thr
public class RecordSeclectId implements KeySelector<Record, String> { private static final long serialVersionUID = 4780234853172462378L; @Override public String getKey(Record value) throw...
本文主要研究一下flink KeyedStream的KeySelector KeyedStream flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java @Public public class KeyedStream&lt;T, KEY&g...
Flink 一个job , 多个keyBy , 多个sink. 最近楼主在开发flink 流程的时候, 碰到了一个八哥, 一个source ,多个keyBy , 多个sink.但是总是出现ClassCastException . 函数的原型大概是这样: DataStream source = env.addSource(); source.map(return AModel).fliter(Objects::nonNull).keyBy().sink(); source.map(return B..
1 按照元组的元素来分区 . //流处理环境 StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text =
Flink的Transformation转换主要包括四种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。本文主要介绍基于Key的分组转换,关于时间和窗口将在后续文章中介绍。读者可以使用Flink Scala Shell或者Intellij Idea来进行练习: Flink Scala Shell使用教程 Intellij Idea开发环境搭建教程 Flink单数...
可以使用 CONCAT 函数将多个字段合并为一个字段,示例代码如下: SELECT CONCAT(column1, column2, column3) as new_column FROM table_name; 其中,column1, column2, column3 分别为要合并的字段名,new_column 为合并后的字段名,table_name 为对应的表名。