Flink与HadApache Flink 与 Hadoop Hadoop Distributed File System (HDFS) 的交互主要体现在数据的读取与写入上。Flink 作为一个分布式流处理和批处理框架,可以很好地与 HDFS 集成,实现数据的输入输出。
**数据读取**:
- **批处理模式**:在批处理场景下,Flink 可以通过 FileSystem 输入格式从 HDFS 中读取静态数据集,如文本文件、序列化文件等。Flink 提供了多种文件系统的 connector,其中包括 Hadoop 文件系统 connector,可以直接读取 HDFS 上的数据。
- **流处理模式**:尽管 Flink 更多地是以流式处理为核心,但在某些场景下,也可以读取 HDFS 上持续追加的文件作为流式数据源。例如,可以通过监视目录或文件结束符(如 `_SUCCESS` 文件)来读取新增的数据块。
**数据写入**:
- **批处理结果输出**:Flink 完成批处理任务后,可以将结果数据以文件形式持久化到 HDFS,便于后续的离线分析或其它批处理作业继续处理。
- **流处理结果实时写入**:在流处理场景下,Flink 可以将处理后的实时结果数据实时写入到 HDFS 中,便于其他系统或服务后续读取和使用。Flink 提供了多种 Sink(接收器)用于将数据写入 HDFS,比如 `HadoopOutputFormatSinkFunction` 或 `HadoopFileSystemSink`。
**示例**:
使用 Flink DataStream API 时,可以通过以下方式与 HDFS 交互:
```java
// 读取 HDFS 数据
env.readFile(new TextInputFormat(new Path("hdfs://namenode:port/path/to/input")), "input", new SimpleStringSchema());
// 写入 HDFS 数据
DataStream<String> stream = ... // 处理后的数据流
stream.addSink(new HadoopFsSink<>(new Path("hdfs://namenode:port/path/to/output"),
new SimpleStringEncoder<>("UTF-8"), new TextOutputFormat<>()));
// 或者使用 OutputFormatSinkFunction
stream.addSink(new OutputFormatSinkFunction<>(new TextOutputFormat<>(new Path("hdfs://namenode:port/path/to/output")) {
@Override
public void writeRecord(Object record) throws IOException {
getOutputFormat().writeRecord(record);
}
}));
```
通过上述方式,Flink 便能与 HDFS 实现高效、稳定的读写交互,满足大规模数据处理的需求。