测试是每个软件开发过程中不可或缺的一部分, Apache Flink 同样提供了在测试金字塔的多个级别上测试应用程序代码的工具。
测试用户自定义函数
通常,我们可以假设 Flink 在用户自定义函数之外产生了正确的结果。因此,建议尽可能多的用单元测试来测试那些包含主要业务逻辑的类。
单元测试无状态、无时间限制的 UDF
例如,让我们以以下无状态的
MapFunction
为例。
public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
public Long map(Long record) throws Exception {
return record + 1;
通过传递合适地参数并验证输出,你可以很容易的使用你喜欢的测试框架对这样的函数进行单元测试。
public class IncrementMapFunctionTest {
@Test
public void testIncrement() throws Exception {
// instantiate your function
IncrementMapFunction incrementer = new IncrementMapFunction();
// call the methods that you have implemented
assertEquals(3L, incrementer.map(2L));
类似地,对于使用 org.apache.flink.util.Collector
的用户自定义函数(例如FlatMapFunction
或者 ProcessFunction
),可以通过提供模拟对象而不是真正的 collector 来轻松测试。具有与 IncrementMapFunction
相同功能的 FlatMapFunction
可以按照以下方式进行单元测试。
public class IncrementFlatMapFunctionTest {
@Test
public void testIncrement() throws Exception {
// instantiate your function
IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
Collector<Integer> collector = mock(Collector.class);
// call the methods that you have implemented
incrementer.flatMap(2L, collector);
//verify collector was called with the right output
Mockito.verify(collector, times(1)).collect(3L);
对有状态或及时 UDF 和自定义算子进行单元测试
对使用管理状态或定时器的用户自定义函数的功能测试会更加困难,因为它涉及到测试用户代码和 Flink 运行时的交互。 为此,Flink 提供了一组所谓的测试工具,可用于测试用户自定义函数和自定义算子:
OneInputStreamOperatorTestHarness
(适用于 DataStream
上的算子)KeyedOneInputStreamOperatorTestHarness
(适用于 KeyedStream
上的算子)TwoInputStreamOperatorTestHarness
(f适用于两个 DataStream
的 ConnectedStreams
算子)KeyedTwoInputStreamOperatorTestHarness
(适用于两个 KeyedStream
上的 ConnectedStreams
算子)
要使用测试工具,还需要一组其他的依赖项,请查阅配置小节了解更多细节。
现在,可以使用测试工具将记录和 watermark 推送到用户自定义函数或自定义算子中,控制处理时间,最后对算子的输出(包括旁路输出)进行校验。
public class StatefulFlatMapTest {
private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
private StatefulFlatMap statefulFlatMapFunction;
@Before
public void setupTestHarness() throws Exception {
//instantiate user-defined function
statefulFlatMapFunction = new StatefulFlatMapFunction();
// wrap user defined function into a the corresponding operator
testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));
// optionally configured the execution environment
testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
// open the test harness (will also call open() on RichFunctions)
testHarness.open();
@Test
public void testingStatefulFlatMapFunction() throws Exception {
//push (timestamped) elements into the operator (and hence user defined function)
testHarness.processElement(2L, 100L);
//trigger event time timers by advancing the event time of the operator with a watermark
testHarness.processWatermark(100L);
//trigger processing time timers by advancing the processing time of the operator directly
testHarness.setProcessingTime(100L);
//retrieve list of emitted records for assertions
assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
//retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
//assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
KeyedOneInputStreamOperatorTestHarness
和 KeyedTwoInputStreamOperatorTestHarness
可以通过为键的类另外提供一个包含 TypeInformation
的 KeySelector
来实例化。
public class StatefulFlatMapFunctionTest {
private OneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
private StatefulFlatMap statefulFlatMapFunction;
@Before
public void setupTestHarness() throws Exception {
//instantiate user-defined function
statefulFlatMapFunction = new StatefulFlatMapFunction();
// wrap user defined function into a the corresponding operator
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);
// open the test harness (will also call open() on RichFunctions)
testHarness.open();
//tests
在 Flink 代码库里可以找到更多使用这些测试工具的示例,例如:
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest
是测试算子和用户自定义函数(取决于处理时间和事件时间)的一个很好的例子。org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest
展示了如何使用 AbstractStreamOperatorTestHarness
测试自定义 sink。具体来说,它使用 AbstractStreamOperatorTestHarness.snapshot
和 AbstractStreamOperatorTestHarness.initializeState
来测试它与 Flink checkpoint 机制的交互。
注意 AbstractStreamOperatorTestHarness
及其派生类目前不属于公共 API,可以进行更改。
单元测试 Process Function #
考虑到它的重要性,除了之前可以直接用于测试 ProcessFunction
的测试工具之外,Flink 还提供了一个名为 ProcessFunctionTestHarnesses
的测试工具工厂类,可以简化测试工具的实例化。考虑以下示例:
注意 要使用此测试工具,还需要引入上一节中介绍的依赖项。
public static class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value);
通过传递合适的参数并验证输出,对使用 ProcessFunctionTestHarnesses
是很容易进行单元测试并验证输出。
public class PassThroughProcessFunctionTest {
@Test
public void testPassThrough() throws Exception {
//instantiate user-defined function
PassThroughProcessFunction processFunction = new PassThroughProcessFunction();
// wrap user defined function into a the corresponding operator
OneInputStreamOperatorTestHarness<Integer, Integer> harness = ProcessFunctionTestHarnesses
.forProcessFunction(processFunction);
//push (timestamped) elements into the operator (and hence user defined function)
harness.processElement(1, 10);
//retrieve list of emitted records for assertions
assertEquals(harness.extractOutputValues(), Collections.singletonList(1));
有关如何使用 ProcessFunctionTestHarnesses
来测试 ProcessFunction
不同风格的更多示例,, 例如 KeyedProcessFunction
,KeyedCoProcessFunction
,BroadcastProcessFunction
等,鼓励用户自行查看 ProcessFunctionTestHarnessesTest
。
测试 Flink 作业
JUnit 规则 MiniClusterWithClientResource
Apache Flink 提供了一个名为 MiniClusterWithClientResource
的 Junit 规则,用于针对本地嵌入式小型集群测试完整的作业。 叫做 MiniClusterWithClientResource
.
要使用 MiniClusterWithClientResource
,需要添加一个额外的依赖项(测试范围)。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>1.15.0</version>
<scope>test</scope>
</dependency>
让我们采用与前面几节相同的简单 MapFunction
来做示例。
public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
public Long map(Long record) throws Exception {
return record + 1;
现在,可以在本地 Flink 集群使用这个 MapFunction
的简单 pipeline,如下所示。
public class ExampleIntegrationTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
@Test
public void testIncrementPipeline() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure your test environment
env.setParallelism(2);
// values are collected in a static variable
CollectSink.values.clear();
// create a stream of custom elements and apply transformations
env.fromElements(1L, 21L, 22L)
.map(new IncrementMapFunction())
.addSink(new CollectSink());
// execute
env.execute();
// verify your results
assertTrue(CollectSink.values.containsAll(2L, 22L, 23L));
// create a testing sink
private static class CollectSink implements SinkFunction<Long> {
// must be static
public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(Long value, SinkFunction.Context context) throws Exception {
values.add(value);
关于使用 MiniClusterWithClientResource
进行集成测试的几点备注:
-
为了不将整个 pipeline 代码从生产复制到测试,请将你的 source 和 sink 在生产代码中设置成可插拔的,并在测试中注入特殊的测试 source 和测试 sink。
-
这里使用 CollectSink
中的静态变量,是因为Flink 在将所有算子分布到整个集群之前先对其进行了序列化。 解决此问题的一种方法是与本地 Flink 小型集群通过实例化算子的静态变量进行通信。 或者,你可以使用测试的 sink 将数据写入临时目录的文件中。
-
如果你的作业使用事件时间计时器,则可以实现自定义的 并行 源函数来发出 watermark。
-
建议始终以 parallelism > 1 的方式在本地测试 pipeline,以识别只有在并行执行 pipeline 时才会出现的 bug。
-
优先使用 @ClassRule
而不是 @Rule
,这样多个测试可以共享同一个 Flink 集群。这样做可以节省大量的时间,因为 Flink 集群的启动和关闭通常会占用实际测试的执行时间。
-
如果你的 pipeline 包含自定义状态处理,则可以通过启用 checkpoint 并在小型集群中重新启动作业来测试其正确性。为此,你需要在 pipeline 中(仅测试)抛出用户自定义函数的异常来触发失败。
目录测试测试用户自定义函数单元测试无状态、无时间限制的 UDF测试是每个软件开发过程中不可或缺的一部分, Apache Flink 同样提供了在测试金字塔的多个级别上测试应用程序代码的工具。通常,我们可以假设 Flink 在用户自定义函数之外产生了正确的结果。因此,建议尽可能多的用单元测试来测试那些包含主要业务逻辑的类。例如,让我们以以下无状态的 为例。通过传递合适地参数并验证输出,你可以很容易的使用你喜欢的测试框架对这样的函数进行单元测试。类似地,对于使用 的用户自定义函数(例如 或者 )
DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStream 命名。对于批处理和流处理,我们都可以用这同一套 API 来实现。
DataStream 在用法上有些类似于常规的 Java 集合,但又有所不同。我们在代码中往往并不关心集合中具体的数据,而只是用 API 定义出一连串的操作来处理它们;这就叫作数据流的“转换”(transformations)。
Flink 根据抽象程度分层,提供了三种不同的 API。每一种 API 在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景。
DataStream API 是 Flink 中用于编写流处理作业的 API,为许多通用的流处理操作提供了处理原语。它支持 Java 和 Scala 语言,预先定义了例如 map()、reduce()、aggregate() 等函数。
在 DataStream API 中,Flink 应用程序同样包含以下步骤:
获取 execution environment
Flink 中的 DataStream 程序是对数据流进行转换(例如过滤、更新状态、定义窗口、聚合)的常规程序。数据流最初是从各种来源(例如,消息队列、套接字流、文件)创建的。结果通过接收器返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其他程序中。执行可以在本地 JVM 中发生,也可以在许多机器的集群上发生。
DataStream API 既可以处理无解流数据, 也可以处理有界的批量数据. 意思是既支持流处理,也支持离线的批
软硬兼施虾米:
(二十三)Flink Table API & SQL 编程指南 Table API
Jackie_GP:
(六)flink1.15 概念透析
普通网友:
(三)Flink1.15 发布最新版本说明
weixin_40134075: