一、模块开发——数据预处理
1、分析预处理的数据
在收集的日志文件中,通常情况下,不能直接将日志文件进行数据分析,这是因为日志文件中有许多不合法的数据(比如日志数据在网络传输过程中发送数据丢失)。
在数据预处理阶段,主要目的就是对收集的原始数据进行清洗和筛选,因此使用MapReduce 技术就可以轻松实现。在实际开发中,数据预处理过程通常不会直接将不合法的数据直接删除,而是对每条数据添加标识字段,从而避免其他业务使用时丢失数据。
另外,此次数据预处理只是清洗和筛选不合法的数据信息,会读取每行日志文件数据并最终输出一条数据,不会进行其他操作,因此在使用MapReduce技术进行处理过程中,只会涉及 Map 阶段,不会涉及Reduce 阶段。在默认情况下,ReduceTask值为1,因此在主运行函数中,需要设置 Job.setNumReduceTasks(0)。
2、实现数据的预处理
(1)创建Maven项目,添加相关依赖
pom.xml文件配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast</groupId>
<artifactId>HadoopDataReport</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.10.1</version>
</dependency>
</dependencies>
</project>
(2)创建JavaBean对象,封装日志记录
收集的日志数据中,每一行代表一条日志记录,并且包含有多个用空格分隔的字段信息,为了方便后续数据处理,创建一个 JavaBean 对象对每条数据进行封装。
WebLogBean.java
package cn.itcast.weblog.bean;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 对接外部数据的层,表结构定义最好跟外部数据源保持一致
* 同时实现序列号,方便网络数据传播
*/
public class WebLogBean implements Writable {
private boolean valid = true; //标记数据是否合法
private String remote_addr; //访客IP地址
private String remote_user; //记录访客用户信息,忽略属性“-”
private String time_local; //记录访问时间与时区
private String request; //记录请求的URL
private String status; //请求状态
private String body_bytes_sent; //记录发送给客户端文件主体内容大小
private String http_referer; //记录从哪个页面链接访问过来的
private String http_user_agent; //记录客户浏览器的相关信息
//设置 WebLogBean 进行字段数据封装
public void setBean(boolean valid, String remote_addr, String remote_user,
String time_local, String request, String status,
String body_bytes_sent,
String http_referer, String http_user_agent) {
this.valid = valid;
this.remote_addr = remote_addr;
this.remote_user = remote_user;
this.time_local = time_local;
this.request = request;
this.status = status;
this.body_bytes_sent = body_bytes_sent;
this.http_referer = http_referer;
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
//重写 toString() 方法,使用 Hive 默认分隔符进行分隔,为后续导入 Hive 表提供便利
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(valid);
sb.append("\001").append(this.getRemote_addr());
sb.append("\001").append(this.getRemote_user());
sb.append("\001").append(this.getTime_local());
sb.append("\001").append(this.getRequest());
sb.append("\001").append(this.getStatus());
sb.append("\001").append(this.getBody_bytes_sent());
sb.append("\001").append(this.getHttp_referer());
sb.append("\001").append(this.getHttp_user_agent());
return sb.toString();
}
//序列化方法
@Override
public void readFields(DataInput dataInput) throws IOException {
this.valid = dataInput.readBoolean();
this.remote_addr = dataInput.readUTF();
this.remote_user = dataInput.readUTF();
this.time_local = dataInput.readUTF();
this.request = dataInput.readUTF();
this.status = dataInput.readUTF();
this.body_bytes_sent = dataInput.readUTF();
this.http_referer = dataInput.readUTF();
this.http_user_agent = dataInput.readUTF();
}
// 反序列化方法(注意与序列化方法顺序保持一致)
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeBoolean(this.valid);
dataOutput.writeUTF(this.remote_addr);
dataOutput.writeUTF(this.remote_user);
dataOutput.writeUTF(this.time_local);
dataOutput.writeUTF(this.request);
dataOutput.writeUTF(this.status);
dataOutput.writeUTF(this.body_bytes_sent);
dataOutput.writeUTF(this.http_referer);
dataOutput.writeUTF(this.http_user_agent);
}
}
(3)创建MapReduce程序,执行数据预处理
创建 JavaBean 实体类后,接下来开始编写 MapReduce 程序,进行数据预处理。
WebLogPreProcess.java
package cn.itcast.weblog.preproces;
import cn.itcast.weblog.bean.WebLogBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
* 日志数据处理:数据清洗、日期格式转换、缺失字段填充默认值、字段添加合法标记
*/
public class WebLogPreProcess {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WebLogPreProcess.class);
job.setMapperClass(WebLogPreProcessMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//此次案例测试数据不是非常大,所以使用本地路径
//(实际情况会对 HDFS 上存储的文件进行处理)
FileInputFormat.setInputPaths(job, new Path("/home/huanganchi/Hadoop/实训项目/HadoopDemo/textHadoop/weblog/input"));
FileOutputFormat.setOutputPath(job, new Path("/home/huanganchi/Hadoop/实训项目/HadoopDemo/textHadoop/weblog/output"));
//将 ReduceTask 属设置为 0,不需要 Reduce 阶段
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
//mapreduce 程序 map 阶段
public static class WebLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
//用来存储网站 URL 分类数据
Set<String> pages = new HashSet<String>();
Text k = new Text();
NullWritable v = NullWritable.get();
/**
* 设置初始化方法, 用来表示用户请求的是合法数据
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
pages.add("/about");
pages.add("/black-ip-list");
pages.add("/cassendra-cluster/");
pages.add("/finance-rhive-repurchase");
pages.add("/hadoop-family-roadmad");
pages.add("/hadoop-hive-intro/");
pages.add("/hadoop-zookeeper-intro");
pages.add("/hadoop-mahout-roadmap");
}
/**
* 重写 map()方法,对每行记录重新解析转换并输出
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行数据
String line = value.toString();
//调用解析类 WebLogParser 解析日志数据,最后封装为 WebLogBean 对象
WebLogBean webLogBean = WebLogParser.parser(line);
if (webLogBean != null) {
//过滤 js/图片/css 等静态资源
WebLogParser.filtStaticResource(webLogBean, pages);
k.set(webLogBean.toString());
context.write(k, v);
}
}
}
}
WebLogParser.java
package cn.itcast.weblog.preproces;
import cn.itcast.weblog.bean.WebLogBean;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;
public class WebLogParser {
//定义时间格式
public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
/**
* 根据采集的数据字段信息进行解析封装
*/
public static WebLogBean parser(String line) {
WebLogBean webLogBean = new WebLogBean();
//把一行数据以空格字符切割并存入数组arr中
String[] arr = line.split(" ");
//如果数组长度小于等于11,说明这条数据不完整,因此可以忽略这条数据
if (arr.length > 11) {
//满足条件的数据逐个赋值给webLogBean对象
webLogBean.setRemote_addr(arr[0]);
webLogBean.setRemote_user(arr[1]);
String time_local = formatDate(arr[3].substring(1));
if (null == time_local || "".equals(time_local)) time_local = "-";
webLogBean.setTime_local(time_local);
webLogBean.setRequest(arr[6]);
webLogBean.setStatus(arr[8]);
webLogBean.setBody_bytes_sent(arr[9]);
webLogBean.setHttp_referer(arr[10]);
//如果useragent元素较多,拼接useragent
if (arr.length > 12) {
StringBuilder sb = new StringBuilder();
for (int i = 11; i < arr.length; i++) {
sb.append(arr[i]);
}
webLogBean.setHttp_user_agent(sb.toString());
} else {
webLogBean.setHttp_user_agent(arr[11]);
}
//大于 400,HTTP 错误
if (Integer.parseInt(webLogBean.getStatus()) >= 400) {
webLogBean.setValid(false);
}
if ("-".equals(webLogBean.getTime_local())) {
webLogBean.setValid(false);
}
} else {
webLogBean = null;
}
return webLogBean;
}
//对请求路径资源是否合法进行标记
public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
if (!pages.contains(bean.getRequest())) {
bean.setValid(false);
}
}
//格式化时间方法
public static String formatDate(String time_local) {
try {
return df2.format(df1.parse(time_local));
} catch (ParseException e) {
return null;
}
}
}
运行结果:
二、模块开发——数据仓库开发
1、上传文件
在启动了Hadoop的Linux系统root目录下创建目录weblog,并将预处理产生的结果文件上传到 weblog 目录下。
cd
mkdir weblog
cd weblog
执行 rz 文件上传命令
在HDFS上创建目录,用于存放预处理过的数据,并上传数据到HDFS。
hadoop fs -mkdir -p /weblog/preprocessed
hadoop fs -put part-m-00000 /weblog/preprocessed
2、实现数据仓库
启动Hive数据仓库,执行以下操作:
--创建数据仓库
DROP DATABASE IF EXISTS weblog;
CREATE DATABASE weblog;
USE weblog;
--创建表
CREATE TABLE ods_weblog_origin (
valid string, --有效标志
remote_addr string, --来源IP
remote_user string, --用户标志
time_local string, --访问完整时间
request string, --请求的URL
status string, --响应码
body_bytes_sent string, --传输字节数
http_referer string, --来源URL
http_user_agent string --客户终端标志
)
partitioned by (datestr string)
row format delimited fields terminated by '\001';
--导入数据
load data inpath '/weblog/preprocessed' overwrite into table ods_weblog_origin partition(datestr='20130918');
--生成明细表
--1. 创建明细表 ods_weblog_detwail
CREATE TABLE ods_weblog_detwail (
valid string, --有效标志
remote_addr string, --来源IP
remote_user string, --用户标志
time_local string, --访问完整时间
daystr string, --访问日期
timestr string, --访问时间
month string, --访问月
day string, --访问日
hour string, --访问时
request string, --请求的URL
status string, --响应码
body_bytes_sent string, --传输字节数
http_referer string, --来源URL
ref_host string, --来源的host
ref_path string, --来源路径
ref_query string, --来源参数query
ref_query_id string, --来源参数query的值
http_user_agent string --客户终端标志
)
partitioned by (datestr string);
--2. 创建临时中间表 t_ods_tmp_referurl
CREATE TABLE t_ods_tmp_referurl as SELECT a.*, b.*
FROM ods_weblog_origin a LATERAL VIEW
parse_url_tuple(regexp_replace(http_referer, "\"", ""),'HOST', 'PATH', 'QUERY', 'QUERY:id') b
as host, path, query, query_id;
--3. 创建临时中间表 t_ods_tmp_detail
CREATE TABLE t_ods_tmp_detail as
SELECT b.*, substring(time_local, 0, 10) as daystr,
substring(time_local, 12) as tmstr,
substring(time_local, 6, 2) as month,
substring(time_local, 9, 2) as day,
substring(time_local, 11, 3) as hour
FROM t_ods_tmp_referurl b;
--4. 修改默认动态分区参数
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
--5. 向 ods_weblog_detwail 表中加载数据
insert overwrite table ods_weblog_detwail partition(datestr)
SELECT DISTINCT otd.valid, otd.remote_addr, otd.remote_user,
otd.time_local, otd.daystr, otd.tmstr, otd.month, otd.day, otd.hour,
otr.request, otr.status, otr.body_bytes_sent,
otr.http_referer, otr.host, otr.path,
otr.query, otr.query_id, otr.http_user_agent, otd.daystr
FROM t_ods_tmp_detail as otd, t_ods_tmp_referurl as otr
WHERE otd.remote_addr = otr.remote_addr
AND otd.time_local = otr.time_local
AND otd.body_bytes_sent = otr.body_bytes_sent
AND otd.request = otr.request;
三、模块开发——数据分析
--数据分析
--流量分析
--创建每日访问量表dw_pvs_everyday
CREATE TABLE IF NOT EXISTS dw_pvs_everyday(pvs bigint, month string, day string);
--从宽表 ods_weblog_detwail 获取每日访问量数据并插入维度表 dw_pvs_everyday
INSERT INTO TABLE dw_pvs_everyday
SELECT COUNT(*) AS pvs, owd.month AS month, owd.day AS day
FROM ods_weblog_detwail owd GROUP BY owd.month, owd.day;
--人均浏览量分析
--创建维度表dw_avgpv_user_everyday
CREATE TABLE IF NOT EXISTS dw_avgpv_user_everyday (day string, avgpv string);
--从宽表 ods_weblog_detwail 获取相关数据并插入维度表 dw_avgpv_user_everyday
INSERT INTO TABLE dw_avgpv_user_everyday
SELECT '2013-09-18', SUM(b.pvs)/COUNT(b.remote_addr) FROM
(SELECT remote_addr, COUNT(1) AS pvs FROM ods_weblog_detwail WHERE
datestr = '2013-09-18' GROUP by remote_addr) b;
INSERT INTO TABLE dw_avgpv_user_everyday
SELECT '2013-09-19', SUM(b.pvs)/COUNT(b.remote_addr) FROM
(SELECT remote_addr, COUNT(1) AS pvs FROM ods_weblog_detwail WHERE
datestr = '2013-09-19' GROUP by remote_addr) b;
INSERT INTO TABLE dw_avgpv_user_everyday
SELECT '2013-09-20', SUM(b.pvs)/COUNT(b.remote_addr) FROM
(SELECT remote_addr, COUNT(1) AS pvs FROM ods_weblog_detwail WHERE
datestr = '2013-09-20' GROUP by remote_addr) b;
INSERT INTO TABLE dw_avgpv_user_everyday
SELECT '2013-09-21', SUM(b.pvs)/COUNT(b.remote_addr) FROM
(SELECT remote_addr, COUNT(1) AS pvs FROM ods_weblog_detwail WHERE
datestr = '2013-09-21' GROUP by remote_addr) b;
INSERT INTO TABLE dw_avgpv_user_everyday
SELECT '2013-09-22', SUM(b.pvs)/COUNT(b.remote_addr) FROM
(SELECT remote_addr, COUNT(1) AS pvs FROM ods_weblog_detwail WHERE
datestr = '2013-09-22' GROUP by remote_addr) b;
INSERT INTO TABLE dw_avgpv_user_everyday
SELECT '2013-09-23', SUM(b.pvs)/COUNT(b.remote_addr) FROM
(SELECT remote_addr, COUNT(1) AS pvs FROM ods_weblog_detwail WHERE
datestr = '2013-09-23' GROUP by remote_addr) b;
INSERT INTO TABLE dw_avgpv_user_everyday
SELECT '2013-09-24', SUM(b.pvs)/COUNT(b.remote_addr) FROM
(SELECT remote_addr, COUNT(1) AS pvs FROM ods_weblog_detwail WHERE
datestr = '2013-09-24' GROUP by remote_addr) b;
四、模块开发——数据导出
1. 创建 MySql 数据库和表
--数据导出
--创建数据仓库
DROP DATABASE IF EXISTS sqoopdb;
CREATE DATABASE sqoopdb;
USE sqoopdb;
--创建表
CREATE TABLE t_avgpv_num (
dateStr VARCHAR(255) DEFAULT NULL,
avgPvNum DECIMAL(6,2) DEFAULT NULL
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
2. 执行数据导出命令
sqoop export \
--connect jdbc:mysql://hadoop01.bgd01:3306/sqoopdb \
--username root \
--password 123456 \
--table t_avgpv_num \
--columns "dateStr,avgPvNum" \
--fields-terminated-by '\001' \
--export-dir /user/hive/warehouse/weblog.db/dw_avgpv_user_everyday
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack