MapReduce 分区器
分区器的工作方式类似于处理输入数据集的条件。分区阶段发生在 Map 阶段之后和 Reduce 阶段之前。
partitioner 的数量等于 reducer 的数量。这意味着分区器将根据减速器的数量划分数据。因此,从单个 partitioner 传递的数据由单个 Reducer 处理。
分区器
分区器对中间映射输出的键值对进行分区。它使用用户定义的条件对数据进行分区,其工作方式类似于哈希函数。分区总数与作业的 Reducer 任务数相同。让我们举个例子来了解分区器是如何工作的。
MapReduce 分区器实现
为了方便起见,让我们假设我们有一个名为 Employee 的小表,其中包含以下数据。我们将使用此示例数据作为输入数据集来演示分区器的工作原理。
Id | Name | Age | Gender | Salary |
---|---|---|---|---|
1201 | gopal | 45 | Male | 50,000 |
1202 | manisha | 40 | Female | 50,000 |
1203 | khalil | 34 | Male | 30,000 |
1204 | prasanth | 30 | Male | 30,000 |
1205 | kiran | 20 | Male | 40,000 |
1206 | laxmi | 25 | Female | 35,000 |
1207 | bhavya | 20 | Female | 15,000 |
1208 | reshma | 19 | Female | 15,000 |
1209 | kranthi | 22 | Male | 22,000 |
1210 | Satish | 24 | Male | 25,000 |
1211 | Krishna | 25 | Male | 25,000 |
1212 | Arshad | 28 | Male | 20,000 |
1213 | lavanya | 18 | Female | 8,000 |
我们必须编写一个应用程序来处理输入数据集,以找到不同年龄组(例如,20 岁以下、21 到 30 岁之间、30 岁以上)中按性别计算的最高工资员工。
输入数据
以上数据保存为 输入.txt 在“/home/hadoop/hadoopPartitioner”目录中并作为输入给出。
1201 | gopal | 45 | Male | 50000 |
1202 | manisha | 40 | Female | 51000 |
1203 | khaleel | 34 | Male | 30000 |
1204 | prasanth | 30 | Male | 31000 |
1205 | kiran | 20 | Male | 40000 |
1206 | laxmi | 25 | Female | 35000 |
1207 | bhavya | 20 | Female | 15000 |
1208 | reshma | 19 | Female | 14000 |
1209 | kranthi | 22 | Male | 22000 |
1210 | Satish | 24 | Male | 25000 |
1211 | Krishna | 25 | Male | 26000 |
1212 | Arshad | 28 | Male | 20000 |
1213 | lavanya | 18 | Female | 8000 |
基于给定的输入,以下是程序的算法解释。
地图任务
当我们将文本数据保存在文本文件中时,map 任务接受键值对作为输入。该地图任务的输入如下:
输入 : key 可以是“任意特殊键 + 文件名 + 行号”(例如:key = @input1),value 是该行中的数据(例如:value = 1201 \t gopal \t 45 \ t 男性 \t 50000)。
Method :这个地图任务的操作如下:
-
Read the value (记录数据),它来自字符串中参数列表的输入值。
-
使用 split 函数,将性别分开并存储在字符串变量中。
String[] str = value.toString().split("\t", -3); String gender=str[3];
-
发送性别信息和记录数据 value 作为映射任务的输出键值对到 分区任务 .
context.write(new Text(gender), new Text(value));
-
对文本文件中的所有记录重复上述所有步骤。
输出 : 你会得到性别数据和记录数据值作为键值对。
分区任务
partitioner 任务接受来自 map 任务的键值对作为其输入。分区意味着将数据分成段。根据给定的分区条件标准,输入的键值对数据可以根据年龄标准分为三部分。
输入 : 键值对集合中的全部数据。
key = 记录中的性别字段值。
value = 该性别的整个记录数据值。
Method : 分区逻辑的流程运行如下。
- 从输入键值对中读取年龄字段值。
String[] str = value.toString().split("\t"); int age = Integer.parseInt(str[2]);
-
使用以下条件检查年龄值。
- 年龄小于或等于 20
- 年龄大于 20 岁且小于或等于 30 岁。
- 年龄大于 30。
if(age<=20) { return 0; } else if(age>20 && age<=30) { return 1 % numReduceTasks; } else { return 2 % numReduceTasks; }
输出 :整个key-value对数据被分割成三个key-value对集合。 Reducer 在每个集合上单独工作。
减少任务
partitioner 任务的数量等于 reducer 任务的数量。这里我们有三个 partitioner 任务,因此我们有三个 Reducer 任务要执行。
输入 :Reducer 会使用不同的键值对集合执行 3 次。
key = 记录中的性别字段值。
value = 该性别的整个记录数据。
Method : 以下逻辑将应用于每个集合。
- 读取每条记录的 Salary 字段值。
String [] str = val.toString().split("\t", -3); 注意: str[4] have the salary field value.
-
使用 max 变量检查薪水。如果 str[4] 是最大薪水,则将 str[4] 分配给 max,否则跳过该步骤。
if(Integer.parseInt(str[4])>max) { max=Integer.parseInt(str[4]); }
-
对每个密钥集合重复步骤 1 和 2(男性和女性是密钥集合)。执行完这三个步骤后,你将在 Male 键集合中找到一项最高薪水,从女性键集合中找到一项最高薪水。
context.write(new Text(key), new IntWritable(max));
输出 : 最后,你会得到一组不同年龄段的三个集合的键值对数据。它分别包含每个年龄段的男性集合的最高工资和女性集合的最高工资。
执行 Map、Partitioner 和 Reduce 任务后,三个键值对数据集合作为输出存储在三个不同的文件中。
所有这三个任务都被视为 MapReduce 作业。这些作业的以下要求和规范应在配置中指定:
- Job name
- 键和值的输入和输出格式
- Map、Reduce 和 Partitioner 任务的各个类
Configuration conf = getConf(); // 创建作业 Job job = new Job(conf, "topsal"); job.setJarByClass(Partitioner例子.class); // 文件输入输出路径 File输入Format.set输入Paths(job, new Path(arg[0])); File输出Format.set输出Path(job,new Path(arg[1])); // 为键值对设置 Mapper 类和输出格式。 job.setMapperClass(MapClass.class); job.setMap输出KeyClass(Text.class); job.setMap输出ValueClass(Text.class); // 设置分区器语句 job.setPartitionerClass(CaderPartitioner.class); // 为键值对设置 Reducer 类和输入/输出格式。 job.setReducerClass(ReduceClass.class); // Reducer 任务数。 job.setNumReduceTasks(3); // 数据的输入输出格式 job.set输入FormatClass(Text输入Format.class); job.set输出FormatClass(Text输出Format.class); job.set输出KeyClass(Text.class); job.set输出ValueClass(Text.class);
示例程序
下面的程序展示了如何在 MapReduce 程序中实现给定条件的分区器。
package partitionerexample; import java.io.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class Partitioner例子 extends Configured implements Tool { // 地图类 public static class MapClass extends Mapper<LongWritable,Text,Text,Text> { public void map(LongWritable key, Text value, Context context) { try{ String[] str = value.toString().split("\t", -3); String gender=str[3]; context.write(new Text(gender), new Text(value)); } catch(Exception e) { System.out.println(e.getMessage()); } } } // 减速器类 public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable> { public int max = -1; public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException { max = -1; for (Text val : values) { String [] str = val.toString().split("\t", -3); if(Integer.parseInt(str[4])>max) max=Integer.parseInt(str[4]); } context.write(new Text(key), new IntWritable(max)); } } // 分区器类 public static class CaderPartitioner extends Partitioner < Text, Text > { @Override public int getPartition(Text key, Text value, int numReduceTasks) { String[] str = value.toString().split("\t"); int age = Integer.parseInt(str[2]); if(numReduceTasks == 0) { return 0; } if(age<=20) { return 0; } else if(age>20 && age<=30) { return 1 % numReduceTasks; } else { return 2 % numReduceTasks; } } } @Override public int run(String[] arg) throws Exception { Configuration conf = getConf(); Job job = new Job(conf, "topsal"); job.setJarByClass(Partitioner例子.class); File输入Format.set输入Paths(job, new Path(arg[0])); File输出Format.set输出Path(job,new Path(arg[1])); job.setMapperClass(MapClass.class); job.setMap输出KeyClass(Text.class); job.setMap输出ValueClass(Text.class); // 设置分区器语句 job.setPartitionerClass(CaderPartitioner.class); job.setReducerClass(ReduceClass.class); job.setNumReduceTasks(3); job.set输入FormatClass(Text输入Format.class); job.set输出FormatClass(Text输出Format.class); job.set输出KeyClass(Text.class); job.set输出ValueClass(Text.class); System.exit(job.waitForCompletion(true)? 0 : 1); return 0; } public static void main(String ar[]) throws Exception { int res = ToolRunner.run(new Configuration(), new Partitioner例子(),ar); System.exit(0); } }
将上面的代码另存为 Partitioner例子.java 在“/home/hadoop/hadoopPartitioner”中。下面给出程序的编译和执行。
编译和执行
假设我们在 Hadoop 用户的主目录中(例如,/home/hadoop)。
按照下面给出的步骤编译和执行上述程序。
步骤 1 : 下载Hadoop-core-1.2.1.jar,用于编译执行MapReduce程序。你可以从以下位置下载 jar mvnrepository.com .
假设下载的文件夹是“/home/hadoop/hadoopPartitioner”
步骤 2 : 以下命令用于编译程序 Partitioner例子.java 并为程序创建一个 jar。
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf Partitioner例子.jar -C .
步骤 3 :使用以下命令在HDFS中创建一个输入目录。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步骤 4 :使用以下命令复制输入文件名为 输入.txt 在 HDFS 的输入目录中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
步骤 5 : 使用以下命令验证输入目录下的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步骤 6 : 使用以下命令,通过从输入目录中获取输入文件来运行最高薪水应用程序。
$HADOOP_HOME/bin/hadoop jar Partitioner例子.jar partitionerexample.Partitioner例子 input_dir/input.txt output_dir
等待一段时间,直到文件被执行。执行后,输出包含多个输入拆分、map 任务和 Reducer 任务。
15/02/04 15发送性别信息和记录数据51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully 15/02/04 15发送性别信息和记录数据52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=467 FILE: Number of bytes written=426777 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=480 HDFS: Number of bytes written=72 HDFS: Number of read operations=12 HDFS: Number of large read operations=0 HDFS: Number of write operations=6 Job Counters Launched map tasks=1 Launched reduce tasks=3 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=8212 Total time spent by all reduces in occupied slots (ms)=59858 Total time spent by all map tasks (ms)=8212 Total time spent by all reduce tasks (ms)=59858 Total vcore-seconds taken by all map tasks=8212 Total vcore-seconds taken by all reduce tasks=59858 Total megabyte-seconds taken by all map tasks=8409088 Total megabyte-seconds taken by all reduce tasks=61294592 Map-Reduce Framework Map input records=13 Map output records=13 Map output bytes=423 Map output materialized bytes=467 输入 split bytes=119 Combine input records=0 Combine output records=0 Reduce input groups=6 Reduce shuffle bytes=467 Reduce input records=13 Reduce output records=6 Spilled Records=26 Shuffled Maps =3 Failed Shuffles=0 Merged Map outputs=3 GC time elapsed (ms)=224 CPU time spent (ms)=3690 Physical memory (bytes) snapshot=553816064 Virtual memory (bytes) snapshot=3441266688 Total committed heap usage (bytes)=334102528 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File 输入 Format Counters Bytes Read=361 File 输出 Format Counters Bytes Written=72
步骤 7 : 使用以下命令验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
你将在三个文件中找到输出,因为你在程序中使用了三个分区器和三个 Reducer。
步骤 8 : 使用下面的命令查看中的输出 Part-00000 文件。该文件由 HDFS 生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
在 Part-00000 中输出
Female 15000 Male 40000
使用以下命令查看输出 零件-00001 file.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Part-00001 中的输出
Female 35000 Male 31000
使用以下命令查看输出 Part-00002 file.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Part-00002 中的输出
Female 51000 Male 50000