进入flink sql命令行
sql-client.sh
与所有 SQL 引擎一样,Flink 查询操作是在表上进行。与传统数据库不同,Flink 不在本地管理静态数据;相反,它的查询在外部表上连续运行。
Flink 数据处理流水线开始于 source 表。source 表产生在查询执行期间可以被操作的行;它们是查询时 FROM
子句中引用的表。这些表可能是 Kafka 的 topics,数据库,文件系统,或者任何其它 Flink 知道如何消费的系统。
可以通过 SQL 客户端或使用环境配置文件来定义表。SQL 客户端支持类似于传统 SQL 的 SQL DDL 命令。标准 SQL DDL 用于创建,修改,删除表。Flink 支持不同的连接器和格式相结合以定义表。
# Source表 -- hdfs source
-- 将数据上传到hdfs
hadoop dfs -mkdir -p /data/peoples
hadoop dfs -put student.txt /data/peoples
CREATE TABLE student (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://master:9000/data/peoples',
'format' = 'csv'
);
虽然最初设计时没有考虑流语义,但 SQL 是用于构建连续数据流水线的强大工具。Flink SQL 与传统数据库查询的不同之处在于,Flink SQL 持续消费到达的行并对其结果进行更新。
一个连续查询永远不会终止,并会产生一个动态表作为结果。动态表是 Flink 中 Table API 和 SQL 对流数据支持的核心概念。连续流上的聚合需要在查询执行期间不断地存储聚合的结果。
select clazz,count(1) as c
from student
group by clazz;
当运行此查询时,SQL 客户端实时但是以只读方式提供输出。存储结果,作为报表或仪表板的数据来源,需要写到另一个表。这可以使用 INSERT INTO
语句来实现。本节中引用的表称为 sink 表。INSERT INTO
语句将作为一个独立查询被提交到 Flink 集群中。
# sink表(保存查询数据) -- hdfs sink
CREATE TABLE clazz_num (
clazz STRING,
c BIGINT
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://master:9000/data/clazz_num',
'format' = 'csv'
);
# 将连续查询的结果插入到sink表中
batch是输出最终的结果,streamg模式输出连续结果
-- 如果连续查询的返回的动态表是一个更新的表
-- 插入语句的返回的字段和类型和sink表一致
SET 'execution.runtime-mode' = 'batch'; 使用批处理模式写入hdfs
insert into clazz_num
select clazz,count(1) as c
from student
group by clazz;
hadoop dfs -ls /data/clazz_num
hadoop dfs -cat /data/clazz_num/*
提交后,它将运行并将结果直接存储到 sink 表中,而不是将结果加载到系统内存中。
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack