HCatalog - Formato input output

Il HCatInputFormat e HCatOutputFormatle interfacce vengono utilizzate per leggere i dati da HDFS e, dopo l'elaborazione, scrivere i dati risultanti in HDFS utilizzando il lavoro MapReduce. Elaboriamo le interfacce dei formati di input e output.

HCatInputFormat

Il HCatInputFormatviene utilizzato con i lavori MapReduce per leggere i dati dalle tabelle gestite da HCatalog. HCatInputFormat espone un'API MapReduce di Hadoop 0.20 per la lettura dei dati come se fossero stati pubblicati in una tabella.

Sr.No. Nome e descrizione del metodo
1

public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException

Imposta gli input da utilizzare per il lavoro. Interroga il metastore con la specifica di input fornita e serializza le partizioni corrispondenti nella configurazione del lavoro per le attività di MapReduce.

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException

Imposta gli input da utilizzare per il lavoro. Interroga il metastore con la specifica di input fornita e serializza le partizioni corrispondenti nella configurazione del lavoro per le attività di MapReduce.

3

public HCatInputFormat setFilter(String filter)throws IOException

Imposta un filtro nella tabella di input.

4

public HCatInputFormat setProperties(Properties properties) throws IOException

Imposta le proprietà per il formato di input.

Il HCatInputFormat L'API include i seguenti metodi:

  • setInput
  • setOutputSchema
  • getTableSchema

Usare HCatInputFormat per leggere i dati, creare prima un'istanza di un file InputJobInfo con le informazioni necessarie dalla tabella in fase di lettura e quindi chiamare setInput con il InputJobInfo.

Puoi usare il file setOutputSchema metodo per includere un file projection schema, per specificare i campi di output. Se non viene specificato uno schema, verranno restituite tutte le colonne nella tabella. È possibile utilizzare il metodo getTableSchema per determinare lo schema della tabella per una tabella di input specificata.

HCatOutputFormat

HCatOutputFormat viene utilizzato con i lavori MapReduce per scrivere i dati nelle tabelle gestite da HCatalog. HCatOutputFormat espone un'API MapReduce di Hadoop 0.20 per la scrittura di dati in una tabella. Quando un lavoro MapReduce utilizza HCatOutputFormat per scrivere l'output, viene utilizzato il formato OutputFormat predefinito configurato per la tabella e la nuova partizione viene pubblicata nella tabella al termine del lavoro.

Sr.No. Nome e descrizione del metodo
1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException

Impostare le informazioni sull'output da scrivere per il lavoro. Interroga il server dei metadati per trovare lo StorageHandler da utilizzare per la tabella. Genera un errore se la partizione è già pubblicata.

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException

Imposta lo schema per i dati da scrivere nella partizione. Lo schema della tabella viene utilizzato per impostazione predefinita per la partizione se non viene chiamato.

3

public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException

Ottieni lo scrittore di dischi per il lavoro. Utilizza l'OutputFormat predefinito di StorageHandler per ottenere il writer di record.

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException

Ottieni il committer di output per questo formato di output. Assicura che l'output venga eseguito correttamente.

Il HCatOutputFormat L'API include i seguenti metodi:

  • setOutput
  • setSchema
  • getTableSchema

La prima chiamata su HCatOutputFormat deve essere setOutput; qualsiasi altra chiamata genererà un'eccezione dicendo che il formato di output non è inizializzato.

Lo schema per i dati da scrivere è specificato da setSchemametodo. Devi chiamare questo metodo, fornendo lo schema dei dati che stai scrivendo. Se i tuoi dati hanno lo stesso schema dello schema della tabella, puoi usareHCatOutputFormat.getTableSchema() per ottenere lo schema della tabella e quindi passarlo a setSchema().

Esempio

Il seguente programma MapReduce legge i dati da una tabella che presume abbia un numero intero nella seconda colonna ("colonna 1") e conta quante istanze di ciascun valore distinto trova. Cioè, fa l'equivalente di "select col1, count(*) from $table group by col1;".

Ad esempio, se i valori nella seconda colonna sono {1, 1, 1, 3, 3, 5}, il programma produrrà il seguente output di valori e conteggi:

1, 3
3, 2
5, 1

Diamo ora un'occhiata al codice del programma:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable, 
      HCatRecord, IntWritable, IntWritable> {
      int age;
		
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
	
   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
			
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
			
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
	
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
		
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System
		
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
		
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
		
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
	
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

Prima di compilare il programma sopra, devi scaricarne alcuni jars e aggiungili al file classpathper questa applicazione. Devi scaricare tutti i jar Hive e HCatalog (HCatalog-core-0.5.0.jar, hive-metastore-0.10.0.jar, libthrift-0.7.0.jar, hive-exec-0.10.0.jar, libfb303-0.7.0.jar, jdo2-api-2.3-ec.jar, slf4j-api-1.6.1.jar).

Usa i seguenti comandi per copiarli jar file da local per HDFS e aggiungili al file classpath.

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

Utilizzare il seguente comando per compilare ed eseguire il programma dato.

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

Ora controlla la tua directory di output (hdfs: user / tmp / hive) per l'output (part_0000, part_0001).