Sun’s Blog-专注于阅读资料
本文主要介绍Sun’s Blog-专注于阅读资料 方法和在新技术下所面对的“挑战”,方便大家深入理解Sun’s Blog-专注于阅读资料 过程。本文也将分享Sun’s Blog-专注于阅读资料 所遇到的问题和应对策略,怎么解决怎么做的问题。
通过深入本文可以理解代码原理,进行代码文档的下载,也可以查看相应 Demo 部署效果。
在企业开发中,Hadoop框架自带的
InputFormat
类型不能满足所有应用场景,需要自定义InputFormat来解决实际问题。
自定义InputFormat步骤如下:
- (1)自定义一个类继承
FilelnputFormat
。 - (2)自定义一个类继承
RecordReader
,实现一次读取一个完整文件,将文件名为key,文件内容为value。 - (3)在输出时使用
SequenceFileOutPutFormat
输出合并文件。
无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。
1. 需求
将多个小文件合并成一个SequenceFile
文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value(bytes) 对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
(1)输入数据
(2)期望输出文件格式
2. 需求分析
-
自定义一个类继承
FileInputFormat
(1)重写isSplitable()
方法,返回false
,让文件不可切,整个文件作为1片。
(2)重写createRecordReader(),返回自定义的RecordReader对象 -
自定义一个类继承
RecordReader
在RecordReader中,nextKeyValue
()是最重要的方法,返回当前读取到的key-value
,如果读到返回true
,调用Mapper的map()来处理,否则返回false
3. 编写程序
MyInputFormat.java
/* * 1. 改变切片策略,一个文件固定切1片,通过指定文件不可切 * * 2. 提供RR ,这个RR读取切片的文件名作为key,读取切片的内容封装到bytes作为value */ public class MyInputFormat extends FileInputFormat { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new MyRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } }
MyRecordReader.java
/* * RecordReader从MapTask处理的当前切片中读取数据 * * XXXContext都是Job的上下文,通过XXXContext可以获取Job的配置Configuration对象 */ public class MyRecordReader extends RecordReader { private Text key; private BytesWritable value; private String filename; private int length; private FileSystem fs; private Path path; private FSDataInputStream is; private boolean flag=true; // MyRecordReader在创建后,在进入Mapper的run()之前,自动调用 // 文件的所有内容设置为1个切片,切片的长度等于文件的长度 @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { FileSplit fileSplit=(FileSplit) split; filename=fileSplit.getPath().getName(); length=(int) fileSplit.getLength(); path=fileSplit.getPath(); //获取当前Job的配置对象 Configuration conf = context.getConfiguration(); //获取当前Job使用的文件系统 fs=FileSystem.get(conf); is = fs.open(path); } // 读取一组输入的key-value,读到返回true,否则返回false // 将文件的名称封装为key,将文件的内容封装为BytesWritable类型的value,返回true // 第二次调用nextKeyValue()返回false @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (flag) { //实例化对象 if (key==null) { key=new Text(); } if (value==null) { value=new BytesWritable(); } //赋值 //将文件名封装到key中 key.set(filename); // 将文件的内容读取到BytesWritable中 byte [] content=new byte[length]; IOUtils.readFully(is, content, 0, length); value.set(content, 0, length); flag=false; return true; } return false; } //返回当前读取到的key-value中的key @Override public Object getCurrentKey() throws IOException, InterruptedException { return key; } //返回当前读取到的key-value中的value @Override public Object getCurrentValue() throws IOException, InterruptedException { return value; } //返回读取切片的进度 @Override public float getProgress() throws IOException, InterruptedException { return 0; } // 在Mapper的输入关闭时调用,清理工作 @Override public void close() throws IOException { if (is != null) { IOUtils.closeStream(is); } if (fs !=null) { fs.close(); } } }
CustomIFMapper.java
public class CustomIFMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{ }
CustomIFReducer.java
public class CustomIFReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{ }
CustomIFDriver.java
public class CustomIFDriver { public static void main(String[] args) throws Exception { Path inputPath=new Path("e:/mrinput/custom"); Path outputPath=new Path("e:/mroutput/custom"); //作为整个Job的配置 Configuration conf = new Configuration(); //保证输出目录不存在 FileSystem fs=FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } // 创建Job Job job = Job.getInstance(conf); // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型 job.setMapperClass(CustomIFMapper.class); job.setReducerClass(CustomIFReducer.class); // Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化 // 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); // 设置输入目录和输出目录 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 设置输入和输出格式 job.setInputFormatClass(MyInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); // ③运行Job job.waitForCompletion(true); } }
Sun’s Blog-专注于阅读资料部分资料来自网络,侵权毕设源码联系删除
qklbishe.com区块链毕设代做网专注|以太坊fabric-计算机|java|毕业设计|代做平台 » Sun’s Blog-专注于阅读资料