Big Data Tutorial (9.6): Map-Side Join Implementation
Published: 2019-06-19

This article contains about 6,938 characters; estimated reading time is 23 minutes.


        The previous article showed how to implement a join with the map and reduce phases working together; in this post I will walk through the map-side join implementation.

        1. Requirements

               Implement a join of two "tables" where one table is small and the other is large. This scenario is very common in practice, for example "order log" join "product info".

        2. Analysis

               --Principle: this approach suits the case where one of the joined tables is small. The small table can be shipped to every map node, so each map node joins the big-table records it reads against its local copy and emits the final result directly. This greatly raises the parallelism of the join and speeds up processing.

               --Approach: preload the small table inside the mapper class, then perform the join right in the mapper.

               --The distributed cache mechanism is used to ship the small table's data to every maptask execution node; each maptask can then load the small table from its local working directory and complete the join locally, with no shuffle needed. (A sample layout of the two input files is sketched below.)
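               For reference, here is a plausible layout of the two input files, inferred from the field indices used in the code in section 3 and from the output in section 6. The middle column of product.txt (labelled "category id") is an assumption; only the product id (field 0) and product name (field 2) are actually read by the mapper.

product.txt (tab-separated: product id, category id [assumed], product name):
P0001	c1	小米5
P0002	c1	锤子T1
P0003	c2	锤子

order.txt (tab-separated: order id, date, product id, amount):
1001	20150710	P0001	2
1002	20150710	P0001	3
1002	20150710	P0002	3
1003	20150710	P0003	3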

        3. Code Implementation

package com.empire.hadoop.mr.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // A HashMap that holds the entire product-info table in memory
        Map<String, String> pdInfoMap = new HashMap<String, String>();
        Text k = new Text();

        /**
         * Reading the source of the parent Mapper class shows that setup() is
         * called once before the maptask processes any data, so it can be used
         * for initialization work such as loading the small table.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // "product.txt" is the symlink that the distributed cache creates in
            // the task's working directory for the file registered in main()
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product.txt")));
            String line;
            while (StringUtils.isNotEmpty(line = br.readLine())) {
                String[] fields = line.split("\t");
                // product id -> product name
                pdInfoMap.put(fields[0], fields[2]);
            }
            br.close();
        }

        // Since the mapper already holds the complete product table, the join
        // logic can be carried out right inside the map() method
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String orderLine = value.toString();
            String[] fields = orderLine.split("\t");
            // The index here must point at the product-id column of the order file;
            // with the sample data in section 6 (order id, date, product id, amount)
            // the product id sits at index 2
            String pdName = pdInfoMap.get(fields[2]);
            k.set(orderLine + "\t" + pdName);
            context.write(k, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapSideJoin.class);
        //job.setJar("D:/mapsidejoin.jar");
        job.setMapperClass(MapSideJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Ways to cache a file onto every maptask execution node:
        /* job.addArchiveToClassPath(archive); */ // cache a jar on the task nodes' classpath
        /* job.addFileToClassPath(file); */       // cache a plain file on the task nodes' classpath
        /* job.addCacheArchive(uri); */           // cache an archive into the task working directory
        /* job.addCacheFile(uri); */              // cache a plain file into the task working directory

        // Cache the product table file into each task node's working directory
        //job.addCacheFile(new URI("file:/D:/srcdata/mapjoincache/pdts.txt"));
        job.addCacheFile(new URI("hdfs://centos-aaron-h1:9000/rjoin/mapjoincache/product.txt"));

        // The map-side join needs no reduce phase, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
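        The setup() above works because addCacheFile() materializes the cached HDFS file as a symlink named product.txt in each task's working directory. As a minimal sketch (assuming the cache file was registered exactly as in main() above), here is a drop-in variant of setup() that falls back to opening the original URI through the FileSystem API when that symlink is absent, e.g. in local test runs; it also replaces the isNotEmpty loop condition with an explicit null check, so a blank line in the middle of the file does not cut the load short:

    // Sketch only: a more defensive variant of setup() for MapSideJoinMapper;
    // assumes the small table was registered via job.addCacheFile(...) as in main()
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        java.io.File localCopy = new java.io.File("product.txt");
        BufferedReader br;
        if (localCopy.exists()) {
            // Normal cluster case: read the symlink created by the distributed cache
            br = new BufferedReader(new InputStreamReader(new FileInputStream(localCopy)));
        } else {
            // Fallback: open the first registered cache file directly from HDFS
            Path cached = new Path(context.getCacheFiles()[0]);
            org.apache.hadoop.fs.FileSystem fs = cached.getFileSystem(context.getConfiguration());
            br = new BufferedReader(new InputStreamReader(fs.open(cached)));
        }
        String line;
        while ((line = br.readLine()) != null) { // explicit null check: tolerate blank lines
            if (line.isEmpty()) {
                continue;
            }
            String[] fields = line.split("\t");
            pdInfoMap.put(fields[0], fields[2]);
        }
        br.close();
    }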

        4. Running the Program

               

#Upload the jar
Alt+p
lcd d:/
put mapsidejoin.jar

#Prepare the data files for Hadoop to process
cd /home/hadoop/apps/hadoop-2.9.1
hadoop fs -mkdir -p /rjoin/mapjoinsideinput
hadoop fs -mkdir -p /rjoin/mapjoincache
hdfs dfs -put order.txt /rjoin/mapjoinsideinput
hdfs dfs -put product.txt /rjoin/mapjoincache

#Run the mapsidejoin program
hadoop jar mapsidejoin.jar com.empire.hadoop.mr.mapsidejoin.MapSideJoin /rjoin/mapjoinsideinput /rjoin/mapjoinsideoutput
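        Before submitting, it can help to confirm that both inputs landed where the job expects them; a quick check with the standard HDFS shell (same paths as above):

hadoop fs -ls /rjoin/mapjoinsideinput /rjoin/mapjoincache
hadoop fs -cat /rjoin/mapjoincache/product.txt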

        5. Run Log

[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop sending #87 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getCounters
[IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop got value #87
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getCounters took 36ms
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=189612
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=218
		HDFS: Number of bytes written=108
		HDFS: Number of read operations=5
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=3057
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=3057
		Total vcore-milliseconds taken by all map tasks=3057
		Total megabyte-milliseconds taken by all map tasks=3130368
	Map-Reduce Framework
		Map input records=4
		Map output records=4
		Input split bytes=125
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=99
		CPU time spent (ms)=350
		Physical memory (bytes) snapshot=117669888
		Virtual memory (bytes) snapshot=845942784
		Total committed heap usage (bytes)=16121856
	File Input Format Counters
		Bytes Read=93
	File Output Format Counters
		Bytes Written=108
[main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:328)
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop sending #88 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getJobReport
[IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop got value #88
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getJobReport took 0ms
[pool-4-thread-1] DEBUG org.apache.hadoop.ipc.Client - stopping client from cache: org.apache.hadoop.ipc.Client@303c7016
[Thread-3] DEBUG org.apache.hadoop.util.ShutdownHookManager - ShutdownHookManger complete shutdown.

        6. Results

[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /rjoin/mapjoinsideoutput/part-m-00000
1001	20150710	P0001	2	小米5
1002	20150710	P0001	3	小米5
1002	20150710	P0002	3	锤子T1
1003	20150710	P0003	3	锤子

Each order record now carries its product name. Because the job ran with zero reduce tasks, the output file is named part-m-00000 (direct map output) rather than the usual part-r-00000.

            A closing word: that is all for this post. If you found it worthwhile, please give it a like; if you are interested in my other server and big-data articles, or in the author, please follow this blog, and feel free to reach out and exchange ideas any time.

Reposted from: https://my.oschina.net/u/2371923/blog/2987721
