博客
关于我
MapReduce Java API-多输入路径方式
阅读量:794 次
发布时间:2023-02-06

本文共 3265 字,大约阅读时间需要 10 分钟。

MapReduce Java API实例-统计单词出现频率

在已实现单次单词频率统计的基础上,本文将详细说明如何处理多个数据集文件(如多个txt文件)的输入问题。

多文件输入的实现方法如下:

多文件输入可以通过Hadoop提供的MultipleInputs类的addInputPath方法来实现。这种方法允许将多个文件路径添加到MapReduce任务中,从而支持批量处理。

以下是完整的代码实现:

  • Map类
  • package com.badao.multinput;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;import java.util.StringTokenizer;public class MultInputMapper extends Mapper
    { private Text word = new Text(); private IntWritable one = new IntWritable(1); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer stringTokenizer = new StringTokenizer(value.toString()); while (stringTokenizer.hasMoreTokens()) { word.set(stringTokenizer.nextToken()); context.write(word, one); } }}
    1. Reduce类
    2. package com.badao.multinput;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MultInputReducer extends Reducer
      { private IntWritable result = new IntWritable(); @Override public void reduce(Text key, Iterable
      values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); }}
      1. Job类
      2. package com.badao.multinput;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MultInputJob {    public static void main(String[] args)            throws InterruptedException, IOException, ClassNotFoundException {        wordCountLocal();    }    public static void wordCountLocal()            throws IOException, ClassNotFoundException, InterruptedException {        Configuration conf = new Configuration();        Job job = Job.getInstance(conf, "multinputwordcount");        job.setJarByClass(MultInputJob.class);        job.setMapperClass(MultInputMapper.class);        job.setCombinerClass(IntSumReducer.class);        job.setReducerClass(MultInputReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        Path path1 = new Path("D:\\words.txt");        Path path2 = new Path("D:\\words2.txt");        MultipleInputs.addInputPath(job, path1, TextInputFormat.class, MultInputMapper.class);        MultipleInputs.addInputPath(job, path2, TextInputFormat.class, MultInputMapper.class);        FileOutputFormat.setOutputPath(job, new Path("D:\\mulinputOut"));        job.waitForCompletion(true);    }}

        运行Job类查看效果:

        以上实现展示了如何在MapReduce框架中处理多个输入文件。通过MultipleInputs.addInputPath方法,可以轻松地将多个输入路径添加到同一个MapReduce任务中,从而实现多文件批量处理。

    转载地址:http://trufk.baihongyu.com/

    你可能感兴趣的文章
    Logstash input jdbc连接数据库
    查看>>
    logstash mysql 准实时同步到 elasticsearch
    查看>>
    Logstash是什么,干什么用的?带你详细认识
    查看>>
    Logstash简介和部署---ElasticStack(ELK)工作笔记019
    查看>>
    logstash设置开机自启动
    查看>>
    logstash详解
    查看>>
    Logstash语法入门
    查看>>
    loj #6485. LJJ 学二项式定理 (模板qwq)
    查看>>
    Loj 6285. 数列分块入门 9
    查看>>
    Lombok的使用及其原理
    查看>>
    Lua,Lua API,配置文件
    查看>>
    lumen开发问题:Call to undefined method Illuminate\Validation\Validator::make()
    查看>>
    luogu P1268 树的重量
    查看>>
    Luogu P2326 AKN's PPAP【按位贪心】
    查看>>
    LUOGU P2575 高手过招
    查看>>
    LUOGU P4095 [HEOI2013]Eden 的新背包问题
    查看>>
    Luogu2973:[USACO10HOL]赶小猪
    查看>>
    luogu3172 [CQOI2015]选数 莫比乌斯反演+杜教筛
    查看>>
    LVM: Logical Volume Manager 逻辑卷管理
    查看>>
    lvm基本知识与常用命令
    查看>>