博客 Flink的单元测试介绍及示例

Flink的单元测试介绍及示例

   沸羊羊   发表于 2024-12-27 11:36  452  0

一、Flink测试概述

Apache Flink 同样提供了在测试金字塔的多个级别上测试应用程序代码的工具。
本文示例的maven依赖


UTF-8
UTF-8
1.8
1.8
1.8
2.12
1.17.0




org.apache.flink
flink-clients
${flink.version}
provided


org.apache.flink
flink-java
${flink.version}
provided


org.apache.flink
flink-streaming-java
${flink.version}
provided


org.apache.flink
flink-csv
${flink.version}
provided


org.apache.flink
flink-json
${flink.version}
provided


junit
junit
4.13


org.mockito
mockito-core
4.0.0
test

二、测试用户自定义函数

可以假设 Flink 在用户自定义函数之外产生了正确的结果。因此,建议尽可能多的用单元测试来测试那些包含主要业务逻辑的类。

1、单元测试无状态、无时间限制的 UDF

1)、示例-mapFunction

以下无状态的 MapFunction 为例

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f550a6e4434a5c8aec8d355ee281a0a2..png

通过传递合适地参数并验证输出,可以很容易的使用你喜欢的测试框架对这样的函数进行单元测试。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/8a8755ff2b80bc6a75aa1de46f12ad9b..png

2)、示例-flatMapFunction
对于使用 org.apache.flink.util.Collector 的用户自定义函数(例如FlatMapFunction 或者 ProcessFunction),可以通过提供模拟对象而不是真正的 collector 来轻松测试。具有与 IncrementMapFunction 相同功能的 FlatMapFunction 可以按照以下方式进行单元测试。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/cb1ae09feaf2a0b98cb8d5be0671c70b..png

2、对有状态或及时 UDF 和自定义算子进行单元测试
对使用管理状态或定时器的用户自定义函数的功能测试会更加困难,因为它涉及到测试用户代码和 Flink 运行时的交互。 为此,Flink 提供了一组所谓的测试工具,可用于测试用户自定义函数和自定义算子:

OneInputStreamOperatorTestHarness (适用于 DataStream 上的算子)
KeyedOneInputStreamOperatorTestHarness (适用于 KeyedStream 上的算子)
TwoInputStreamOperatorTestHarness (f适用于两个 DataStream 的 ConnectedStreams 算子)
KeyedTwoInputStreamOperatorTestHarness (适用于两个 KeyedStream 上的 ConnectedStreams 算子)
要使用测试工具,还需要一组其他的依赖项,比如DataStream和TableAPI的依赖。

1)、DataStream API 测试依赖
如果要为使用 DataStream API 构建的作业开发测试用例,则需要添加以下依赖项:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a5c93f427e5804dd47370888b2fc3509..png

在各种测试实用程序中,该模块提供了 MiniCluster (一个可配置的轻量级 Flink 集群,能在 JUnit 测试中运行),可以直接执行作业。

2)、Table API 测试依赖
如果您想在您的 IDE 中本地测试 Table API 和 SQL 程序,除了前述提到的 flink-test-utils 之外,您还要添加以下依赖项:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c0bd2ca23c15f55f0a19eea837b7bce6..png

这将自动引入查询计划器和运行时,分别用于计划和执行查询。

flink-table-test-utils 模块已在 Flink 1.15 中引入,截至Flink 1.17版本被认为是实验性的。

3)、flatmap function 单元测试
现在,可以使用测试工具将记录和 watermark 推送到用户自定义函数或自定义算子中,控制处理时间,最后对算子的输出(包括旁路输出)进行校验。

示例如下

/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description: 单元测试flatmap,如果是偶数则存储原值及平方数
*/
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

public class TestStatefulFlatMapDemo3 {

static class AlanFlatMapFunction implements FlatMapFunction {
@Override
public void flatMap(Integer value, Collector out) throws Exception {
if (value % 2 == 0) {
out.collect(value);
out.collect(value * value);
}
}

}

OneInputStreamOperatorTestHarness testHarness;

@Before
public void setupTestHarness() throws Exception {
StreamFlatMap operator = new StreamFlatMap(new AlanFlatMapFunction());

testHarness = new OneInputStreamOperatorTestHarness(operator);
testHarness.open();
}

@Test
public void testFlatMap2() throws Exception {
long initialTime = 0L;
ConcurrentLinkedQueue

KeyedOneInputStreamOperatorTestHarness 和 KeyedTwoInputStreamOperatorTestHarness 可以通过为键的类另外提供一个包含 TypeInformation 的 KeySelector 来实例化。

  • 示例如下
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description: 按照城市分类,并将城市缩写变成大写
*/
import com.google.common.collect.Lists;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestStatefulFlatMapDemo2 {
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private int id;
private String name;
private int age;
private String city;
}

static class AlanFlatMapFunction extends RichFlatMapFunction {
// The state is only accessible by functions applied on a {@code KeyedStream}
ValueState previousInput;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
previousInput = getRuntimeContext()
.getState(new ValueStateDescriptor("previousInput", User.class));
}

@Override
public void flatMap(User input, Collector out) throws Exception {
previousInput.update(input);
input.setCity(input.getCity().toUpperCase());
out.collect(input);
}
}

AlanFlatMapFunction alanFlatMapFunction = new AlanFlatMapFunction();
OneInputStreamOperatorTestHarness testHarness;

@Before
public void setupTestHarness() throws Exception {
alanFlatMapFunction = new AlanFlatMapFunction();

testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(alanFlatMapFunction),
new KeySelector() {

@Override
public String getKey(User value) throws Exception {
return value.getCity();
}
}, Types.STRING);

testHarness.open();
}

@Test
public void testFlatMap() throws Exception {
testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);

ValueState previousInput = alanFlatMapFunction.getRuntimeContext().getState(
new ValueStateDescriptor<>("previousInput", User.class));
User stateValue = previousInput.value();

Assert.assertEquals(
Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
testHarness.extractOutputStreamRecords());

Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);

testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);
Assert.assertEquals(
Lists.newArrayList(
new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
testHarness.extractOutputStreamRecords());
Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());

}
}

4)、Process Function 单元测试

除了之前可以直接用于测试 ProcessFunction 的测试工具之外,Flink 还提供了一个名为 ProcessFunctionTestHarnesses 的测试工具工厂类,可以简化测试工具的实例化。

  • OneInputStreamOperatorTestHarness示例
import com.google.common.collect.Lists;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class TestProcessOperatorDemo1 {
// public abstract class KeyedProcessFunction
static class AlanProcessFunction extends KeyedProcessFunction {

@Override
public void processElement(String value, KeyedProcessFunction.Context ctx,
Collector out) throws Exception {
ctx.timerService().registerProcessingTimeTimer(50);
out.collect("vx->" + value);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
// 到达时间点触发事件操作
out.collect(String.format("定时器在 %d 被触发", timestamp));
}

}

private OneInputStreamOperatorTestHarness testHarness;
private AlanProcessFunction processFunction;

@Before
public void setupTestHarness() throws Exception {
processFunction = new AlanProcessFunction();

testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
new KeyedProcessOperator<>(processFunction),
x -> "1",
Types.STRING);
// Function time is initialized to 0
testHarness.open();
}

@Test
public void testProcessElement() throws Exception {
testHarness.processElement("alanchanchn", 10);
Assert.assertEquals(
Lists.newArrayList(
new StreamRecord<>("vx->alanchanchn", 10)),
testHarness.extractOutputStreamRecords());
}

@Test
public void testOnTimer() throws Exception {
// test first record
testHarness.processElement("alanchanchn", 10);
Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

// Function time 设置为 100
testHarness.setProcessingTime(100);
Assert.assertEquals(
Lists.newArrayList(
new StreamRecord<>("vx->alanchanchn", 10),
new StreamRecord<>("定时器在 100 被触发")),
testHarness.extractOutputStreamRecords());
}
}


————————————————

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack


0条评论
下一篇:
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群