博客 【Kafka】与【Hadoop】的集成应用案例深度解析

【Kafka】与【Hadoop】的集成应用案例深度解析

   数栈君   发表于 2025-01-17 10:25  450  0

一、引言

1、Kafka简介

Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,并于 2011 年开源,现在由 Apache Software Foundation 进行维护。Kafka 旨在提供一个统一、高吞吐量、低延迟的平台,用于处理实时数据流。它通常用于构建实时数据管道和流式应用。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0b17903ce4ebd153711e27c7860d5e43..png



2、Hadoop简介

Apache Hadoop 是一个开源的分布式计算框架,由 Apache 软件基金会开发和维护。它用于处理和存储大规模数据集,通常被称为“大数据”。Hadoop 的设计目标是提供一个可靠、可扩展和高效的平台,用于分布式数据处理。


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4d4656ec73d1e1a4f5e7be0c8811cf61..png


二、Kafka基础

1、Kafka核心概念

1.Producer: 生产者是将数据发布到 Kafka 主题中的客户端应用程序。生产者负责将数据发送到 Kafka 集群。
2.Consumer: 消费者是从 Kafka 主题中读取数据的客户端应用程序。消费者订阅一个或多个主题,并从中消费数据。
3.Broker: Kafka 集群由多个 Kafka 实例(称为 broker)组成,每个 broker 负责处理和存储一部分数据。Broker 之间通过分区和副本机制实现数据的分布式存储和高可用性。
4.Topic: 主题是 Kafka 中的消息分类或类别。生产者将消息发布到主题,消费者从主题中读取消息。每个主题可以分为多个分区(partition),以实现并行处理和扩展。
5.Partition: 分区是 Kafka 主题的基本单元,每个主题可以包含一个或多个分区。每个分区是一个有序的、不可变的消息队列。分区有助于实现数据的并行处理和负载均衡。
6.Offset: 每条消息在其所在的分区中都有一个唯一的标识符,称为偏移量(offset)。消费者使用偏移量来跟踪已经消费的消息位置。


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/04cc5f2a7885823e88590188cb51befb..png


2、Kafka核心组件

1.Kafka Connect: Kafka Connect 是 Kafka 的一个组件,用于简化将数据从外部系统(例如数据库、文件系统等)导入和导出到 Kafka 的过程。它提供了许多预构建的连接器,可以轻松集成各种数据源和目标。
2.Kafka Streams: Kafka Streams 是一个用于构建流处理应用程序的客户端库。它允许开发者创建高度可扩展、容错的流处理应用程序,以便实时处理和分析数据流。
3.ZooKeeper: Kafka 使用 Apache ZooKeeper 进行分布式协调,管理集群的元数据,包括主题、分区、broker 等信息。


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/364881f8c80eb5f5d3109549551fcb93..png



3、Kafka主要特性

1.高吞吐量: Kafka 可以处理大量的数据流,并且在低延迟下提供高吞吐量的消息传输。
2.可扩展性: 通过增加更多的 broker,可以轻松扩展 Kafka 集群的容量和性能。
3.持久性和容错性: Kafka 将数据持久化到磁盘,并通过复制机制实现高可用性,确保在硬件故障时数据不会丢失。
4.分区和并行处理: 通过将主题划分为多个分区,Kafka 支持高效的并行处理,从而提高数据处理的速度和效率。

4、Kafka使用场景

1.实时流数据管道: Kafka 通常用于构建实时数据管道,将数据从生产者传输到消费者。
2.数据集成: 使用 Kafka Connect,将不同数据源的数据集成到统一的 Kafka 平台。
3.实时分析和监控: 利用 Kafka Streams 或其他流处理框架,可以对实时数据进行分析和监控。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/5fbc55274f01db7f30e1cd40d391988d..png

三、Hadoop生态系统概览

1、Hadoop核心组件

1. Hadoop 分布式文件系统 (HDFS):


    ·HDFS 是 Hadoop 的存储层,专为大规模数据存储而设计。
    ·它将数据分成块(通常为 128 MB 或 256 MB),并在集群中的多个节点上进行复制和存储,以确保数据的高可用性和容错性。
    ·HDFS 的主从架构包括一个 NameNode(管理文件系统的元数据)和多个 DataNode(存储实际数据)。

2. MapReduce:


    ·MapReduce 是 Hadoop 的计算模型和处理引擎,用于大规模数据处理。
    ·它将计算任务分成两个阶段:Map 阶段和 Reduce 阶段。Map 阶段处理输入数据并生成中间结果,Reduce 阶段汇总中间结果并生成最终输出。
    ·MapReduce 编程模型易于扩展,可以在数千个节点上并行处理数据。

3. Yet Another Resource Negotiator (YARN):


    ·YARN 是 Hadoop 的资源管理层,用于集群资源的管理和调度。
    ·它分离了资源管理和作业调度,提供了更好的集群资源利用率和灵活性。
    ·YARN 允许多种数据处理框架(如 MapReduce、Spark、Tez)在同一个 Hadoop 集群上运行。

4. Hadoop Common:


    ·Hadoop Common 包含 Hadoop 框架使用的通用实用工具和库,支持其他 Hadoop 模块的开发和操作。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a6f7ea0bc56407544a42e58a9fffef21..png


2、Hive

Apache Hive 是一个基于 Hadoop 的数据仓库软件,用于处理和查询存储在 Hadoop 分布式文件系统(HDFS)中的大规模数据集。它提供了一种类似 SQL 的查询语言,称为 HiveQL,用于数据分析和处理。

1. 简介


    ·Hive 由 Facebook 开发,随后成为 Apache 软件基金会的顶级项目。它旨在让熟悉 SQL 的用户能够轻松在 Hadoop 上进行数据处理,而不需要编写复杂的 MapReduce 代码。

2. 工作原理


    ·查询解析:用户通过 CLI、Web UI 或 Thrift 服务提交 HiveQL 查询。
    ·查询编译:查询由 Driver 解析,并由 Compiler 编译为执行计划。
    ·优化执行计划:执行计划经过优化,以提高查询效率。
    ·执行查询:优化后的执行计划提交给执行引擎(如 MapReduce、Tez 或 Spark)执行。
    ·返回结果:执行结果通过 Driver 返回给用户。

3. 使用场景

    ·数据仓库:Hive 主要用于构建数据仓库,支持复杂的数据分析和处理任务。

    ·数据分析:Hive 可以进行大规模数据分析,适用于日志分析、业务报告等。
    ·ETL 处理:Hive 支持数据的抽取、转换和加载(ETL)过程,可以将不同来源的数据整合并进行处理。


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/697817d975df12c384b590827c78575d..png

3、HBase

Apache HBase 是一个开源的、分布式的、面向列的数据库,运行在 Hadoop 之上。它旨在提供对大规模结构化数据的随机、实时读写访问,类似于 Google 的 Bigtable。

HBase 是一个非关系型数据库(NoSQL),使用 HDFS 作为底层存储,适用于存储和处理大规模的稀疏表。它为 Hadoop 提供了一个高可靠性、高性能、高伸缩性的数据库服务。

1. 工作原理


    1.数据写入:数据首先写入内存中的 MemStore,然后异步地写入 HDFS 中的 HFile。数据写入时也会记录到 Write-Ahead Log(WAL)以保证数据的可靠性。
    2.数据读取:数据从内存(MemStore)和 HDFS 中的 HFile 读取,读取时会根据行键定位到相应的 RegionServer,再由 RegionServer 进行数据检索。
    3.分区管理:当表的数据量增加到一定程度时,Region 会进行分裂(Split),从而将数据分布到更多的 RegionServer 上以均衡负载。

2. 使用场景


    ·实时分析:适用于需要低延迟、大吞吐量的数据访问场景,如实时数据分析和处理。
    ·数据存储:用于存储大规模的结构化和半结构化数据,如物联网数据、日志数据等。
    ·社交网络:管理社交网络数据,处理用户关系和消息流。
    ·时序数据:存储和查询时序数据,如监控数据、传感器数据等。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6a9079d40ff60c09a1d7e6930a81c1c1..png



4、Hadoop在大数据处理中的应用场景

1. 数据存储与管理

Hadoop 分布式文件系统 (HDFS) 提供了一个高可靠性、高可扩展性和高容错的数据存储系统,适用于存储海量数据。

    ·海量数据存储:能够存储和管理 PB 级的数据,适用于需要存储大规模、结构化和非结构化数据的应用场景,如企业日志、社交媒体数据、传感器数据等。
    ·分布式数据管理:利用 HDFS 可以将数据分布在集群中的多台机器上,提高数据存储和管理的效率和可靠性。

2. 数据处理与分析

MapReduce 是 Hadoop 的核心组件之一,提供了一种分布式数据处理模型,适用于大规模数据处理和分析。

    ·大数据处理:能够处理 TB 级到 PB 级的数据,广泛应用于大数据分析、数据挖掘、机器学习等领域。
    ·批处理:MapReduce 适合处理需要批量处理的任务,如日志处理、网页索引、图像处理等。


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e327962470b25547b0337ddf0f336e80..png



3. 数据仓库与查询

Hive 提供了一个基于 Hadoop 的数据仓库解决方案,可以使用类似 SQL 的查询语言 (HiveQL) 对存储在 HDFS 上的数据进行查询和分析。

    ·数据仓库:适用于构建大规模数据仓库,用于存储和管理企业的大量历史数据。
    ·数据查询和分析:用户可以使用 HiveQL 进行复杂的数据查询和分析,而无需了解底层的 MapReduce 实现。

4. 流数据处理

Hadoop 与流处理框架(如 Apache Storm、Apache Flink)集成,提供了实时数据处理能力。

    ·实时数据处理:适用于需要实时处理和分析的数据场景,如实时日志分析、在线交易监控、社交媒体数据分析等。
    ·事件驱动处理:处理实时流数据中的事件,能够快速响应数据变化和事件触发。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0adeed5b8c679832dd72f37c24d12eb2..png



四、Kafka与Hadoop集成的必要性

1、集成的优势

1. 实时与离线处理结合:


    ·混合处理架构:通过将 Kafka 和 Hadoop 集成,可以构建一个既能处理实时数据流,又能进行批量数据分析的混合处理架构。Kafka 负责实时数据流的处理,Hadoop 负责离线数据的存储和批处理。
    ·数据流的持久化:实时数据流通过 Kafka 进入系统后,可以定期将数据导入 Hadoop 中进行持久化和深度分析。

2. 扩展性和灵活性:


    ·高扩展性:Kafka 和 Hadoop 都具有高扩展性,能够处理大规模的数据。Kafka 可以处理百万级别的消息吞吐量,Hadoop 可以处理 PB 级别的数据存储和分析。
    ·灵活的数据管道:通过 Kafka,可以灵活地构建数据管道,将数据从生产者传输到消费者,并最终导入 Hadoop 中进行处理。

3. 简化数据集成和管理:


    ·统一的数据平台:集成 Kafka 和 Hadoop 可以构建一个统一的数据平台,简化数据的管理和集成。Kafka 负责实时数据的传输,Hadoop 负责数据的存储和批处理,形成一个完整的数据处理链条。
    ·数据一致性:通过统一的数据平台,可以确保数据的一致性和完整性,从而提高数据处理的效率和质量。


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e1ad58bf2aab9be5a045eecf12610a18..png




2、实际应用场景

1. 日志收集和分析:


   · 实时日志收集:通过 Kafka 收集各类系统日志、应用日志等,实现实时的日志监控和处理。
   · 批量日志分析:将日志数据定期导入 Hadoop 中,通过 MapReduce 或者 Hive 进行离线的日志分析,挖掘日志中的有价值信息。

2. 实时数据分析:


    ·实时流处理:使用 Kafka 和流处理框架(如 Apache Storm、Apache Flink)进行实时数据分析和处理,如实时报警、实时推荐等。
    ·离线数据存储和分析:将实时处理后的数据存储在 HDFS 中,通过 Hadoop 进行离线的数据分析和挖掘,提供长期的历史数据分析能力。


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/5d1d2a78ef21703cb8af2327f74e26ed..png



五、Kafka与Hadoop集成案例

1、使用Logstash从Kafka到Hadoop的数据传输

使用 Logstash 从 Kafka 到 Hadoop 的数据传输是一个常见的场景,能够实现实时数据流处理和批量数据存储分析的结合。下面是本次案例的流量示意图


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/5971649a1ae23e5d1aa5cbaa8dadd7ba..png



1. 环境准备

安装 Logstash:确保在你的系统上已经安装了 Logstash。

    ·如果还未安装,请参考《Logstash 的三种安装部署方式(YUM、二进制、Docker)详解》这篇文章
Kafka 集群:确保Kafka 集群正常运行,并且能够从中消费消息。

    ·如果还未搭建Kafka集群,请参考《Linux平台Kafka高可用集群部署全攻略》这篇文章
Hadoop 环境:确保 HDFS 正常运行,可以将数据写入 HDFS。

    ·如果还未搭建Hadoop环境,请参考《【Hadoop】集群搭建实战:超详细保姆级教程》这篇文章

2. 创建topic

登录到Kafka服务器,进入到Kafka的bin目录下,输入如下命令创建一个topic
./kafka-topics.sh --bootstrap-server kafka0:9092 --topic test  --create

3. 创建logstash配置文件

进入到logstash的conf目录下,创建配置文件 test.conf

input{
kafka {
bootstrap_servers => "192.168.40.100:9092,192.168.40.101:9092,192.168.40.102:9092"
topics => ["test"]
group_id => "logstashGroup"
codec => "json"
}
}

output {
hdfs {
path => "hdfs://192.168.40.130:8020/user/logs/%{+YYYY}/%{+MM}/%{+dd}/%{+HH}/logstash-%{+YYYY-MM-dd-HH}.log"
codec => "json_lines"
idle_flush_time => 10
}
}

    ·bootstrap_servers:指定 Kafka 集群的地址和端口。Logstash 会连接到这些 Kafka broker 来消费数据。这里提供了三个 Kafka broker 的地址,确保 Kafka 集群的高可用性。

    ·topics:指定要消费的 Kafka 主题。在此配置中,test 是要从 Kafka 消费的主题名。Logstash 会从此主题获取消息。
    ·group_id:定义 Kafka 消费者组 ID。多个 Logstash 实例如果使用相同的 group_id,它们将共享消费数据,从而避免重复消费。每个消费者组中的每个消费者会处理 Kafka 分区中的一部分数据。
    ·codec:json 编解码器指定 Logstash 会把接收到的 Kafka 消息解析为 JSON 格式。这意味着 Kafka 中的每条消息必须是有效的 JSON 格式数据。
    ·path:定义了输出日志的路径。这个路径指向 Hadoop HDFS 的一个位置,其中的 %{+YYYY}, %{+MM}, %{+dd}, %{+HH} 是动态替换的时间格式标记。日志文件会根据当前日期和小时被分区和命名。例如,文件路径可能是 hdfs://192.168.40.130:8020/user/logs/2024/11/18/14/logstash-2024-11-18-14.log,每小时生成一个日志文件。
    ·hdfs://192.168.40.130:8020 是 HDFS 的 URI 和端口。
    ·%{+YYYY}, %{+MM}, %{+dd}, %{+HH} 是时间格式化标记,Logstash 会根据事件发生的时间动态填充这些字段。
    ·codec:json_lines 指定输出数据的编码格式为 JSON 行格式(每一行是一个独立的 JSON 对象)。这种格式适合处理大规模的日志文件,因为它支持逐行解析和处理。
    ·idle_flush_time:指定 Logstash 在没有新数据到来时,等待的时间(以秒为单位)在将数据写入 HDFS 前进行刷新。这里设置为 10 秒,意味着如果 10 秒内没有新的数据,Logstash 会将已经积累的数据写入 HDFS。


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/8188ba5c261136a3b06e1636d2219ef2..png




4. 安装HDFS插件

进入到logstash的bin目录下,执行如下命令安装hdfs插件
./logstash-plugin install logstash-output-hdfs

5. 启动logstash

进入到logstash的bin目录下,执行如下命令启动logstash

./logstash -f /opt/logstash/config/test.conf

2、Apache Spark作为中间层:从Kafka读取数据并写入Hadoop

使用 Apache Spark 作为中间层,从 Kafka 读取数据并将其写入 Hadoop,是处理大规模数据流的常见架构。这种方法通常用于实时数据处理和存储,尤其适用于需要高吞吐量和低延迟的数据流平台。

1. 从 Kafka 读取数据

Spark 提供了一个专门的连接器,可以从 Kafka 读取实时流数据。你可以使用 Structured Streaming 来读取 Kafka 中的数据流,并将其转化为 Spark 的数据帧(DataFrame)或数据集(Dataset)。配置 Kafka 读取源,以下是一个python示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# 创建 SparkSession
spark = SparkSession.builder \
.appName("KafkaToHadoop") \
.getOrCreate()

# 从 Kafka 中读取数据
kafka_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "your_topic") \
.load()

# 转换数据
kafka_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

在上述代码中,readStream 使得我们可以从 Kafka 消费消息流,并通过 option("subscribe", "your_topic") 来指定要订阅的 Kafka 主题。


2. 数据处理

在 Spark 中,我们可以对从 Kafka 获取的数据流进行实时处理。Spark 提供了强大的数据处理能力,可以对流数据进行清洗、转换、聚合等操作。
processed_df = kafka_df.select("key", "value")
# 可以进行更多的数据转换或处理

3. 写入 Hadoop HDFS

Spark 也提供了对 Hadoop HDFS 的支持,可以将处理后的数据写入 HDFS。我们可以选择批处理模式或流处理模式,根据需求来选择合适的方式。

配置写入 HDFS
processed_df \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("path", "hdfs://namenode_host:8020/user/hadoop/logs/") \
.start()

在这里:
   · outputMode("append") 表示数据是以追加的方式写入 HDFS。
   · 使用 Parquet 格式是因为它支持高效的数据压缩和列式存储,非常适合大数据处理。
   · checkpointLocation 是存储流处理检查点信息的路径,确保数据的一致性和容错。


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/134e9b824040cc4810e6605481ba304d..png


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群