Sun’s Blog-专注于阅读资料

本文主要介绍Sun’s Blog-专注于阅读资料 方法和在新技术下所面对的“挑战”,方便大家深入理解Sun’s Blog-专注于阅读资料 过程。本文也将分享Sun’s Blog-专注于阅读资料 所遇到的问题和应对策略,怎么解决怎么做的问题。
通过深入本文可以理解代码原理,进行代码文档的下载,也可以查看相应 Demo 部署效果。

在企业开发中,Hadoop框架自带的InputFormat类型不能满足所有应用场景,需要自定义InputFormat来解决实际问题。
自定义InputFormat步骤如下:

  • (1)自定义一个类继承FilelnputFormat
  • (2)自定义一个类继承RecordReader,实现一次读取一个完整文件,将文件名为key,文件内容为value。
  • (3)在输出时使用SequenceFileOutPutFormat输出合并文件。

无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并

1. 需求

将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value(bytes) 对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value

(1)输入数据
Sun's Blog-专注于阅读
(2)期望输出文件格式
Sun's Blog-专注于阅读

2. 需求分析

  1. 自定义一个类继承FileInputFormat
    (1)重写isSplitable()方法,返回false,让文件不可切,整个文件作为1片
    (2)重写createRecordReader(),返回自定义的RecordReader对象

  2. 自定义一个类继承RecordReader
    在RecordReader中,nextKeyValue()是最重要的方法,返回当前读取到的key-value,如果读到返回true,调用Mapper的map()来处理,否则返回false

3. 编写程序

MyInputFormat.java

/*  * 1. 改变切片策略,一个文件固定切1片,通过指定文件不可切  *   * 2. 提供RR ,这个RR读取切片的文件名作为key,读取切片的内容封装到bytes作为value  */ public class MyInputFormat extends FileInputFormat {  	@Override 	public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 		return new MyRecordReader(); 	} 	 	@Override 	protected boolean isSplitable(JobContext context, Path filename) { 		return false; 	} } 

MyRecordReader.java

/*  * RecordReader从MapTask处理的当前切片中读取数据  *   * XXXContext都是Job的上下文,通过XXXContext可以获取Job的配置Configuration对象  */ public class MyRecordReader extends RecordReader { 	 	private Text key; 	private BytesWritable value; 	 	private String filename; 	private int length; 	 	private FileSystem fs; 	private Path path; 	 	private FSDataInputStream is; 	 	private boolean flag=true;  	// MyRecordReader在创建后,在进入Mapper的run()之前,自动调用 	// 文件的所有内容设置为1个切片,切片的长度等于文件的长度 	@Override 	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {  		FileSplit fileSplit=(FileSplit) split; 		 		filename=fileSplit.getPath().getName(); 		 		length=(int) fileSplit.getLength(); 		 		path=fileSplit.getPath(); 		 		//获取当前Job的配置对象 		Configuration conf = context.getConfiguration(); 		 		//获取当前Job使用的文件系统 		fs=FileSystem.get(conf); 		 		 is = fs.open(path); 		 	}  	// 读取一组输入的key-value,读到返回true,否则返回false 	// 将文件的名称封装为key,将文件的内容封装为BytesWritable类型的value,返回true 	// 第二次调用nextKeyValue()返回false 	@Override 	public boolean nextKeyValue() throws IOException, InterruptedException { 		 		if (flag) { 			 			//实例化对象 			if (key==null) { 				key=new Text(); 			} 			 			if (value==null) { 				value=new BytesWritable(); 			} 			 			//赋值 			//将文件名封装到key中 			key.set(filename); 			 			// 将文件的内容读取到BytesWritable中 			byte [] content=new byte[length]; 			 			IOUtils.readFully(is, content, 0, length); 			 			value.set(content, 0, length); 			 			flag=false; 			 			return true; 			 		} 		return false; 	}  	//返回当前读取到的key-value中的key 	@Override 	public Object getCurrentKey() throws IOException, InterruptedException { 		return key; 	}  	//返回当前读取到的key-value中的value 	@Override 	public Object getCurrentValue() throws IOException, InterruptedException { 		return value; 	}  	//返回读取切片的进度 	@Override 	public float getProgress() throws IOException, InterruptedException { 		return 0; 	}  	// 在Mapper的输入关闭时调用,清理工作 	@Override 	public void close() throws IOException { 		if (is != null) { 			IOUtils.closeStream(is); 		}	 		if (fs !=null) { 			fs.close(); 		} 	} } 

CustomIFMapper.java

public class CustomIFMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{  } 

CustomIFReducer.java

public class CustomIFReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{  } 

CustomIFDriver.java

public class CustomIFDriver { 	 	public static void main(String[] args) throws Exception { 		 		Path inputPath=new Path("e:/mrinput/custom"); 		Path outputPath=new Path("e:/mroutput/custom"); 		 		//作为整个Job的配置 		Configuration conf = new Configuration(); 		//保证输出目录不存在 		FileSystem fs=FileSystem.get(conf); 		 		if (fs.exists(outputPath)) { 			fs.delete(outputPath, true); 		} 		 		// 创建Job 		Job job = Job.getInstance(conf);  		// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型 		job.setMapperClass(CustomIFMapper.class); 		job.setReducerClass(CustomIFReducer.class); 		 		// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化 		// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型 		job.setOutputKeyClass(Text.class); 		job.setOutputValueClass(BytesWritable.class); 		 		// 设置输入目录和输出目录 		FileInputFormat.setInputPaths(job, inputPath); 		FileOutputFormat.setOutputPath(job, outputPath); 		 		// 设置输入和输出格式 		job.setInputFormatClass(MyInputFormat.class); 		job.setOutputFormatClass(SequenceFileOutputFormat.class); 		 		// ③运行Job 		job.waitForCompletion(true); 			 	} } 

Sun’s Blog-专注于阅读资料部分资料来自网络,侵权毕设源码联系删除

区块链毕设网(www.qklbishe.com)全网最靠谱的原创区块链毕设代做网站
部分资料来自网络,侵权联系删除!
资源收费仅为搬运整理打赏费用,用户自愿支付 !
qklbishe.com区块链毕设代做网专注|以太坊fabric-计算机|java|毕业设计|代做平台 » Sun’s Blog-专注于阅读资料

提供最优质的资源集合

立即查看 了解详情