博客 MapReduce基础入门7

MapReduce基础入门7

   数栈君   发表于 2023-07-07 11:16  192  0

1、MapReduce会将一个大的计算任务进行拆分,拆分成小任务,让这些小任务在不同的计算机中进行处理,最后再将这些小任务的结果记性整体汇总

2、MapReduce分为两个阶段,一个Map阶段负责任务的拆分,一个是Reduce阶段,负责任务的汇总

3、整个MapReduce工作流程可以分为3个阶段:map、shuffle、reduce。

作者这里用又一个简单的案例来说明如何用MapReduce实现大表和小表之间的Join操作。也称之为MapJoin

一、MapJoin是什么?
1、将小表存入分布式缓存中,然后把分布式缓存的小表数据读取到本地每一个MapTask的Map集合
2、将大表通过正常的MapReduce流程进行读取,然后和Map集合中的数据进行join
3、Map端join一般用于小表join大表,而且Map端join没有Reduce

二、使用步骤
1.1.数据准备
第一个商品表(小表)


第二个订单表(大表)

任务很简单,两个表都有商品ID这个字段,需求:通过商品ID将两个表关联起来。

2.Map阶段
通过重写setup方法将缓存中的小表放入map集合里,再用大表进行关联。

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;

public class Mapper_demo extends Mapper<LongWritable, Text,Text,NullWritable> {

HashMap<String, String> goodsMap = new HashMap<>();
/**
* setup方法会在map方法执行之前先执行,而且只会执行一次,主要用来做初始化工作
* @param context
* @throws IOException
* @throws InterruptedException
*/
//将小表从分布式缓存中读取,存入Map集合
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//1:获取分布式缓存中文件的输入流
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("itheima_goods.txt")));
String line = null;
while ((line = bufferedReader.readLine()) != null){
String[] array = line.split("\\|");

goodsMap.put(array[0], array[2]);
}

/*
{100101,四川果冻橙6个约180g/个}
{100102,鲜丰水果秭归脐橙中华红}
*/
}

@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
//1:得到K2
String[] array = value.toString().split("\\|");
String k2 = array[1];
String v2 = array[0] + "\t" + array[2];

//2:将K2和Map集合进行Join
String mapValue = goodsMap.get(k2);

context.write(new Text(v2 + "\t" + mapValue), NullWritable.get());
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2.Driver运行入口
没有Reducer阶段,直接写Driver进行测试。
这里主要是将小表放入缓存中,大表正常读取数据,走map阶段。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;

public class Driver_demo {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "Reducer_Join");

job.setJarByClass(Driver_demo.class);

FileInputFormat.addInputPath(job,new Path("hdfs://node1:8020/input/goods/itheima_order_goods.txt"));

job.setMapperClass(Mapper_demo.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

//将小表放入分布式缓存
job.addCacheFile(new URI("hdfs://node1:8020/input/goods/itheima_goods.txt"));

//2.5指定输出路径
Path outPath = new Path("hdfs://node1:8020/output/goods_map_join");
FileOutputFormat.setOutputPath(job,outPath);

FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());
boolean exists = fileSystem.exists(outPath);
if (exists){
fileSystem.delete(outPath,true);
}

//3.提交yarn执行
boolean bl = job.waitForCompletion(true);

//退出
System.exit(bl ? 0 : 1);
}
}


免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
上一篇:Yarn概述
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群