大数据技术实验三 —— Mapreduce数据处理实验

1 实验目的

了解MapReduce中“Map”和“Reduce”基本概念和主要思想;掌握基本的MapReduce API编程,并实现合并、去重、排序等基本功能;

2 实验环境

实验平台:基于实验一搭建的虚拟机Hadoop大数据实验平台上的MapReduce集群;
编程语言:JAVA

3 实验内容

  1. 编程实现文件合并和去重操作;对于每行至少具有三个字段的两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。
  2. 编写程序实现对输入文件的排序;现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。
  3. 对以上两个任务撰写实验报告,并提交相关实验代码。

4 任务一

编程实现文件合并和去重操作;对于每行至少具有三个字段的两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。

4.1 编写代码Merge.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;

public class Merge {
    // Map类,继承自Mapper类--一个抽象类
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static Text text = new Text();

        // 重写map方法
        public void map(Object key, Text value, Context content) throws IOException, InterruptedException {
            text = value;
            // 底层通过Context content传递信息(即key value)
            content.write(text, new Text(""));
        }
    }

    // Reduce类,继承自Reducer类--一个抽象类
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // 对于所有的相同的key,只写入一个,相当于对于所有Iterable<Text> values,只执行一次write操作
            context.write(key, new Text(""));
        }
    }

    // main方法
    public static void main(String[] args) throws Exception {
        final String INPUT_PATH = "ghb_lab3_input";// 输入目录
        final String OUTPUT_PATH = "ghb_lab3_output";// 输出目录
        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://localhost:9000");
        Path path = new Path(OUTPUT_PATH);

        // 加载配置文件
        FileSystem fileSystem = path.getFileSystem(conf);
        // 输出目录若存在则删除
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job = Job.getInstance(conf, "Merge");
        job.setJarByClass(Merge.class);
        job.setMapperClass(Map.class); // 初始化为自定义Map类
        job.setReducerClass(Reduce.class); // 初始化为自定义Reduce类
        job.setOutputKeyClass(Text.class); // 指定输出的key的类型,Text相当于String类
        job.setOutputValueClass(Text.class); // 指定输出的Value的类型,Text相当于String类
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); // FileInputFormat指将输入的文件(若大于64M)进行切片划分,每个split切片对应一个Mapper任务
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4.2 传输代码和实验数据

打开WinSCP,将Merge.java及实验数据复制到虚拟机cluster1的/home/hadoop路径下。

在cluster1中使用用户hadoop,输入以下命令

cd ~
// 创建输入文件夹
mkdir ghb_lab3_input
// 创建输出文件夹
mkdir ghb_lab3_output
// 将数据文件移动到输入文件夹内
mv 附录实验数据一.csv ghb_lab3_output/
mv 附录实验数据二.csv ghb_lab3_output/
// 查看文件是否移动成功
ls ghb_lab3_output

4.3 编译运行

// 构造新的命令ghb_javac,注意下面是一行,不要写成多行
alias ghb_javac="javac -cp /usr/local/hadoop-2.6.5/share/hadoop/common/*:/usr/local/hadoop-2.6.5/share/hadoop/common/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/hdfs/*:/usr/local/hadoop-2.6.5/share/hadoop/mapreduce/*:/usr/local/hadoop-2.6.5/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/yarn/*:"

// 构造新的命令ghb_java,注意下面是一行,不要写成多行
alias ghb_java="java -cp /usr/local/hadoop-2.6.5/share/hadoop/common/*:/usr/local/hadoop-2.6.5/share/hadoop/common/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/hdfs/*:/usr/local/hadoop-2.6.5/share/hadoop/mapreduce/*:/usr/local/hadoop-2.6.5/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/yarn/*:"
// 编译
ghb_javac Merge.java
// 运行
ghb_java Merge

4.4 查看输出结果

ls ghb_lab3_output/
cat ghb_lab3_output/part-r-00000

5 任务二

编写程序实现对输入文件的排序;现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。

5.1 编写代码Sort.java

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.FileSystem;

public class Sort {
    public static class Partition extends Partitioner<IntWritable, IntWritable> {

        @Override
        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            int MaxNumber = 65223;
            int bound = MaxNumber / numPartitions + 1;
            int keynumber = key.get();
            for (int i = 0; i < numPartitions; i++) {
                if (keynumber < bound * i && keynumber >= bound * (i - 1))
                    return i - 1;
            }
            return 0;
        }
    }

    public static class SortMapper extends Mapper<Object, Text, IntWritable, IntWritable> {

        private static IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
    }

    public static class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable linenum = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(linenum, key);
                linenum = new IntWritable(linenum.get() + 1);
            }

        }
    }

    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        final String INPUT_PATH = "ghb_lab3_input2";// 输入目录
        final String OUTPUT_PATH = "ghb_lab3_output2";// 输出目录
        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://localhost:9000");

        Path path = new Path(OUTPUT_PATH);

        // 加载配置文件
        FileSystem fileSystem = path.getFileSystem(conf);

        // 输出目录若存在则删除
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }

        Job job = new Job(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(SortMapper.class);
        job.setPartitionerClass(Partition.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); // FileInputFormat指将输入的文件(若大于64M)进行切片划分,每个split切片对应一个Mapper任务
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

5.2 传输代码和实验数据

打开WinSCP,将Sort.java及实验数据复制到虚拟机cluster1的/home/hadoop路径下。


在cluster1中使用用户hadoop,输入以下命令

cd ~
// 创建输入文件夹
mkdir ghb_lab3_input2
// 创建输出文件夹
mkdir ghb_lab3_output2
// 将数据文件移动到输入文件夹内
mv 附录实验数据三.txt ghb_lab3_input2/
mv 附录实验数据四.txt ghb_lab3_input2/
// 查看文件是否移动成功
ls ghb_lab3_input2

5.3 编译运行

// 编译
ghb_javac Sort.java
// 运行
ghb_java Sort

5.4 查看输出结果

ls ghb_lab3_output2/
cat ghb_lab3_output2/part-r-00000

发表评论

电子邮件地址不会被公开。 必填项已用*标注

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部