HCatalog 输入输出格式


The HCat输入Format and HCat输出格式 interfaces are used to read data from HDFS and after processing, write the resultant data into HDFS using MapReduce job. Let us elaborate the 输入 and 输出 format interfaces.

HCat输入Format


The HCat输入Format 与 MapReduce 作业一起用于从 HCatalog 管理的表中读取数据。 HCat输入Format 公开了一个 Hadoop 0.20 MapReduce API,用于读取数据,就好像它已发布到表中一样。

序号. 方法名称和描述
1

public static HCat输入Format set输入(Job job, String dbName, String tableName)throws IOException

设置用于作业的输入。它使用给定的输入规范查询元存储,并将匹配的分区序列化到 MapReduce 任务的作业配置中。

2

public static HCat输入Format set输入(Configuration conf, String dbName, String tableName) throws IOException

设置用于作业的输入。它使用给定的输入规范查询元存储,并将匹配的分区序列化到 MapReduce 任务的作业配置中。

3

public HCat输入Format setFilter(String filter)throws IOException

在输入表上设置过滤器。

4

public HCat输入Format setProperties(Properties properties) throws IOException

设置输入格式的属性。

The HCat输入Format API包括以下方法:

  • set输入
  • set输出Schema
  • getTableSchema

To use HCat输入Format 要读取数据,首先实例化一个 输入作业信息 从表中读取必要的信息,然后调用 set输入 with the 输入作业信息 .

你可以使用 set输出Schema 方法包括一个 投影模式 , 指定输出字段。如果未指定架构,则将返回表中的所有列。你可以使用 getTableSchema 方法来确定指定输入表的表架构。

HCat输出格式


HCat输出Format 与 MapReduce 作业一起使用,以将数据写入 HCatalog 管理的表。 HCat输出Format 公开了用于将数据写入表的 Hadoop 0.20 MapReduce API。当 MapReduce 作业使用 HCat输出Format 写入输出时,将使用为表配置的默认 输出Format,并在作业完成后将新分区发布到表中。

序号. 方法名称和描述
1

public static void set输出 (Configuration conf, Credentials credentials, 输出JobInfo outputJobInfo) throws IOException

设置有关要为作业写入的输出的信息。它查询元数据服务器以查找要用于表的 StorageHandler。如果分区已发布,则会引发错误。

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException

设置要写入分区的数据的架构。如果未调用,则默认情况下将表模式用于分区。

3

public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException

为这份工作找唱片作家。它使用 StorageHandler 的默认 输出Format 来获取记录写入器。

4

public 输出Committer get输出Committer (TaskAttemptContext context) throws IOException, InterruptedException

获取此输出格式的输出提交者。它确保正确提交输出。

The HCat输出格式 API包括以下方法:

  • 设置输出
  • 设置模式
  • getTableSchema

HCat输出Format 的第一次调用必须是 设置输出 ;任何其他调用都会抛出异常,说明输出格式未初始化。

被写出的数据的模式由 设置模式 方法。你必须调用此方法,并提供你正在编写的数据架构。如果你的数据与表架构具有相同的架构,则可以使用 HCat输出Format.getTableSchema() 获取表模式,然后将其传递给 设置架构() .

例子

下面的 MapReduce 程序从一个表中读取数据,它假定在第二列(“列 1”)中有一个整数,并计算它找到的每个不同值的实例数。也就是说,它相当于“ select col1, count(*) from $table group by col1; ".

例如,如果第二列中的值为 {1, 1, 1, 3, 3, 5},那么程序将产生以下值和计数的输出:

1, 3
3, 2
5, 1

现在让我们看一下程序代码:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCat输入Format;
import org.apache.HCatalog.mapreduce.HCat输出Format;
import org.apache.HCatalog.mapreduce.输入JobInfo;
import org.apache.HCatalog.mapreduce.输出JobInfo;

public class GroupByAge extends Configured implements Tool {

    public static class Map extends Mapper<WritableComparable,
        HCatRecord, IntWritable, IntWritable> {
        int age;
		
        @Override
        protected void map(
            WritableComparable key, HCatRecord value,
            org.apache.hadoop.mapreduce.Mapper<WritableComparable,
            HCatRecord, IntWritable, IntWritable>.Context context
        )throws IOException, InterruptedException {
            age = (Integer) value.get(1);
            context.write(new IntWritable(age), new IntWritable(1));
        }
    }
	
    public static class Reduce extends Reducer<IntWritable, IntWritable,
        WritableComparable, HCatRecord> {
        @Override
        protected void reduce(
            IntWritable key, java.lang.Iterable<IntWritable> values,
            org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
            WritableComparable, HCatRecord>.Context context
        )throws IOException ,InterruptedException {
            int sum = 0;
            Iterator<IntWritable> iter = values.iterator();
			
            while (iter.hasNext()) {
                sum++;
                iter.next();
            }
			
            HCatRecord record = new DefaultHCatRecord(2);
            record.set(0, key.get());
            record.set(1, sum);
            context.write(null, record);
        }
    }
	
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
		
        String serverUri = args[0];
        String inputTableName = args[1];
        String outputTableName = args[2];
        String dbName = null;
        String principalID = System
		
        .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
        if (principalID != null)
        conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
        Job job = new Job(conf, "GroupByAge");
        HCat输入Format.set输入(job, 输入JobInfo.create(dbName, inputTableName, null));

        // 初始化 HCat输出Format
        job.set输入FormatClass(HCat输入Format.class);
        job.setJarByClass(GroupByAge.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
		
        job.setMap输出KeyClass(IntWritable.class);
        job.setMap输出ValueClass(IntWritable.class);
        job.set输出KeyClass(WritableComparable.class);
        job.set输出ValueClass(DefaultHCatRecord.class);
		
        HCat输出Format.set输出(job, 输出JobInfo.create(dbName, outputTableName, null));
        HCatSchema s = HCat输出Format.getTableSchema(job);
        System.err.println("INFO: output schema explicitly set for writing:" + s);
        HCat输出Format.setSchema(job, s);
        job.set输出FormatClass(HCat输出Format.class);
        return (job.waitForCompletion(true) ? 0 : 1);
    }
	
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new GroupByAge(), args);
        System.exit(exitCode);
    }
}

在编译上述程序之前,你必须下载一些 jars 并将它们添加到 类路径 对于这个应用程序。你需要下载所有 Hive jar 和 HCatalog jar(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、 libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。

使用以下命令复制这些 jar 文件来自 local to HDFS 并将它们添加到 类路径 .

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:// /tmp/HCatalog-core-0.5.0.jar,
hdfs:// /tmp/hive-metastore-0.10.0.jar,
hdfs:// /tmp/libthrift-0.7.0.jar,
hdfs:// /tmp/hive-exec-0.10.0.jar,
hdfs:// /tmp/libfb303-0.7.0.jar,
hdfs:// /tmp/jdo2-api-2.3-ec.jar,
hdfs:// /tmp/slf4j-api-1.6.1.jar

使用以下命令编译并执行给定的程序。

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

现在,检查输出HCatalog (hdfs: user/tmp/hive) 的输出 (part_0000, part_0001)。