博客 用短平快的方式告诉你:Flink-SQL 的扩展实现

用短平快的方式告诉你:Flink-SQL 的扩展实现

   小美   发表于 2023-01-05 16:31  249  0

2019 年 1 月 28 日,阿里云宣布开源 “计算王牌” 实时计算平台 Blink 回馈给 ApacheFlink 社区。官方称,计算延迟已经降到毫秒级,也就是你在浏览网页的时候,眨了一下眼睛,淘宝、天猫处理的信息已经刷新了 17 亿次。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/acbbc2354e6777fcfd3340aaf7b2fd24..jpg

作为一家对技术有追求、有渴望的公司,怎么少得了为 Flink 社区做些贡献呢?

夫子说

首先,本文所述均基于 flink 1.5.4

我们为什么扩展 Flink-SQL?

由于 Flink 本身 SQL 语法并不提供在对接输入源和输出目的的 SQL 语法。数据开发在使用的过程中需要根据其提供的 Api 接口编写 Source 和 Sink, 异常繁琐,不仅需要了解 FLink 各类 Operator 的 API, 还需要对各个组件的相关调用方式有了解(比如 kafka,redis,mongo,hbase 等),并且在需要关联到外部数据源的时候没有提供 SQL 相关的实现方式,因此数据开发直接使用 Flink 编写 SQL 作为实时的数据分析时需要较大的额外工作量。

我们的目的是在使用 Flink-SQL 的时候只需要关心做什么,而不需要关心怎么做。不需要过多的关心程序的实现,专注于业务逻辑。

接下来,我们一起来看下 Flink-SQL 的扩展实现吧!

 

01 扩展了哪些 flink 相关 sql

(1)创建源表语句

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/f0eda30c589db9564bdc02d887f1394e..jpg

(2)创建输出表语句

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/6902ff8f13f2d440ac9781dca0eaadbc..jpg

(3)创建自定义函数

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/f597e773ab96cb8968372d7c85397a81..jpg

(4)维表关联

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/599128de8209bbd75921b250a3b4346c..jpg

02 各个模块是如何翻译到 flink 的实现

(1) 如何将创建源表的 sql 语句转换为 flink 的 operator;

Flink 中表的都会映射到 Table 这个类。然后调用注册方法将 Table 注册到 environment。

StreamTableEnvironment.registerTable(tableName, table);

当前我们只支持 kafka 数据源。Flink 本身有读取 kafka 的实现类,FlinkKafkaConsumer09, 所以只需要根据指定参数实例化出该对象。并调用注册方法注册即可。

另外需要注意在 flink sql 经常会需要用到 rowtime, proctime, 所以我们在注册表结构的时候额外添加 rowtime,proctime。

当需要用到 rowtime 的使用需要额外指定 DataStream.watermarks (assignTimestampsAndWatermarks),自定义 watermark 主要做两个事情:1:如何从 Row 中获取时间字段。 2:设定最大延迟时间。

(2) 如何将创建的输出表 sql 语句转换为 flink 的 operator;

Flink 输出 Operator 的基类是 OutputFormat, 我们这里继承的是 RichOutputFormat, 该抽象类继承 OutputFormat,额外实现了获取运行环境的方法 getRuntimeContext (), 方便于我们之后自定义 metric 等操作。

我们以输出到 mysql 插件 mysql-sink 为例,分两部分:

  • 将 create table 解析出表名称,字段信息,mysql 连接信息。

该部分使用正则表达式的方式将 create table 语句转换为内部的一个实现类。该类存储了表名称,字段信息,插件类型,插件连接信息。

  • 继承 RichOutputFormat 将数据写到对应的外部数据源。

主要是实现 writeRecord 方法,在 mysql 插件中其实就是调用 jdbc 实现插入或者更新方法。

(3) 如何将自定义函数语句转换为 flink 的 operator;

Flink 对 udf 提供两种类型的实现方式:

(1)继承 ScalarFunction

(2)继承 TableFunction

需要做的将用户提供的 jar 添加到 URLClassLoader, 并加载指定的 class (实现上述接口的类路径), 然后调用 TableEnvironment.registerFunction (funcName, udfFunc);即完成了 udf 的注册。之后即可使用改定义的 udf;

(4) 维表功能是如何实现的?

流计算中一个常见的需求就是为数据流补齐字段。因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度信息补全,但是当前 flink 并未提供 join 外部数据源的 SQL 功能。

实现该功能需要注意的几个问题:

(1)维表的数据是不断变化的

在实现的时候需要支持定时更新内存中的缓存的外部数据源,比如使用 LRU 等策略。

(2)IO 吞吐问题

如果每接收到一条数据就串行到外部数据源去获取对应的关联记录的话,网络延迟将会是系统最大的瓶颈。这里我们选择阿里贡献给 flink 社区的算子 RichAsyncFunction。该算子使用异步的方式从外部数据源获取数据,大大减少了花费在网络请求上的时间。

(3)如何将 sql 中包含的维表解析到 flink operator   

为了从 sql 中解析出指定的维表和过滤条件, 使用正则明显不是一个合适的办法。需要匹配各种可能性。将是一个无穷无尽的过程。查看 flink 本身对 sql 的解析。它使用了 calcite 做为 sql 解析的工作。将 sql 解析出一个语法树,通过迭代的方式,搜索到对应的维表;然后将维表和非维表结构分开。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/5a139b4b0993d23fbc5fd58bc52229ef..jpg

通过上述步骤可以通过 SQL 完成常用的从 kafka 源表,join 外部数据源,写入到指定的外部目的结构中。


想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群