本文共 3265 字,大约阅读时间需要 10 分钟。
MapReduce Java API实例-统计单词出现频率
在已实现单个文件的单词频率统计的基础上,本文将详细说明如何处理多个数据集文件(如多个txt文件)的输入问题。
多文件输入的实现方法如下:
多文件输入可以通过Hadoop提供的MultipleInputs类的addInputPath方法来实现。这种方法允许将多个文件路径添加到MapReduce任务中,从而支持批量处理。
以下是完整的代码实现:
package com.badao.multinput;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;import java.util.StringTokenizer;public class MultInputMapper extends Mapper{ private Text word = new Text(); private IntWritable one = new IntWritable(1); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer stringTokenizer = new StringTokenizer(value.toString()); while (stringTokenizer.hasMoreTokens()) { word.set(stringTokenizer.nextToken()); context.write(word, one); } }}
package com.badao.multinput;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer for the multi-input word-count job: sums all counts for a word.
 *
 * <p>Input: {@code (word, [1, 1, ...])}. Output: {@code (word, total)}.
 *
 * <p>Note: the generic type parameters {@code <Text, IntWritable, Text, IntWritable>}
 * and the typed {@code Iterable<IntWritable>} are required — with raw types the
 * enhanced-for over {@code IntWritable} does not compile without a cast.
 */
public class MultInputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Reused Writable instance to avoid per-key allocation.
    private final IntWritable result = new IntWritable();

    /**
     * Sums the counts for a single word and emits the total.
     *
     * @param key     the word
     * @param values  the partial counts for this word (1s from the mapper,
     *                or partial sums when used as a combiner)
     * @param context Hadoop context used to emit the final key/value pair
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
package com.badao.multinput;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MultInputJob { public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { wordCountLocal(); } public static void wordCountLocal() throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "multinputwordcount"); job.setJarByClass(MultInputJob.class); job.setMapperClass(MultInputMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(MultInputReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path path1 = new Path("D:\\words.txt"); Path path2 = new Path("D:\\words2.txt"); MultipleInputs.addInputPath(job, path1, TextInputFormat.class, MultInputMapper.class); MultipleInputs.addInputPath(job, path2, TextInputFormat.class, MultInputMapper.class); FileOutputFormat.setOutputPath(job, new Path("D:\\mulinputOut")); job.waitForCompletion(true); }}
运行Job类查看效果:
以上实现展示了如何在MapReduce框架中处理多个输入文件。通过MultipleInputs.addInputPath方法,可以轻松地将多个输入路径添加到同一个MapReduce任务中,从而实现多文件批量处理。
转载地址:http://trufk.baihongyu.com/