HCatalog - Reader Writer

HCatalog contiene un'API di trasferimento dati per input e output paralleli senza utilizzare MapReduce. Questa API utilizza un'astrazione di archiviazione di base di tabelle e righe per leggere i dati dal cluster Hadoop e scrivere i dati in esso.

L'API di trasferimento dati contiene principalmente tre classi; quelli sono -

  • HCatReader - Legge i dati da un cluster Hadoop.

  • HCatWriter - Scrive i dati in un cluster Hadoop.

  • DataTransferFactory - Genera istanze di lettore e scrittore.

Questa API è adatta per la configurazione del nodo master-slave. Parliamo di più suHCatReader e HCatWriter.

HCatReader

HCatReader è una classe astratta interna a HCatalog e astrae le complessità del sistema sottostante da cui devono essere recuperati i record.

Sr.No. Nome e descrizione del metodo
1

Public abstract ReaderContext prepareRead() throws HCatException

Questo dovrebbe essere chiamato al nodo master per ottenere ReaderContext che poi dovrebbe essere serializzato e inviato ai nodi slave.

2

Public abstract Iterator <HCatRecorder> read() throws HCaException

Questo dovrebbe essere chiamato ai nodi slave per leggere HCatRecords.

3

Public Configuration getConf()

Restituirà l'oggetto della classe di configurazione.

La classe HCatReader viene utilizzata per leggere i dati da HDFS. La lettura è un processo in due fasi in cui il primo passaggio avviene sul nodo master di un sistema esterno. La seconda fase viene eseguita in parallelo su più nodi slave.

Le letture vengono eseguite su un file ReadEntity. Prima di iniziare a leggere, è necessario definire una ReadEntity da cui leggere. Questo può essere fatto tramiteReadEntity.Builder. È possibile specificare un nome di database, un nome di tabella, una partizione e una stringa di filtro. Ad esempio:

ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10.

Lo snippet di codice precedente definisce un oggetto ReadEntity ("entità"), che comprende una tabella denominata mytbl in un database denominato mydb, che può essere utilizzato per leggere tutte le righe di questa tabella. Notare che questa tabella deve esistere in HCatalog prima dell'inizio di questa operazione.

Dopo aver definito un ReadEntity, ottieni un'istanza di HCatReader utilizzando ReadEntity e la configurazione del cluster -

HCatReader reader = DataTransferFactory.getHCatReader(entity, config);

Il passaggio successivo consiste nell'ottenere un ReaderContext dal lettore come segue:

ReaderContext cntxt = reader.prepareRead();

HCatWriter

Questa astrazione è interna a HCatalog. Questo per facilitare la scrittura su HCatalog da sistemi esterni. Non provare a creare un'istanza direttamente. Utilizza invece DataTransferFactory.

Sr.No. Nome e descrizione del metodo
1

Public abstract WriterContext prepareRead() throws HCatException

Il sistema esterno dovrebbe richiamare questo metodo esattamente una volta da un nodo master. Restituisce un fileWriterContext. Questo dovrebbe essere serializzato e inviato ai nodi slave per la costruzioneHCatWriter Là.

2

Public abstract void write(Iterator<HCatRecord> recordItr) throws HCaException

Questo metodo dovrebbe essere utilizzato sui nodi slave per eseguire le scritture. RecordItr è un oggetto iteratore che contiene la raccolta di record da scrivere in HCatalog.

3

Public abstract void abort(WriterContext cntxt) throws HCatException

Questo metodo dovrebbe essere chiamato nel nodo master. Lo scopo principale di questo metodo è eseguire le pulizie in caso di errori.

4

public abstract void commit(WriterContext cntxt) throws HCatException

Questo metodo dovrebbe essere chiamato nel nodo master. Lo scopo di questo metodo è eseguire il commit dei metadati.

Simile alla lettura, anche la scrittura è un processo in due fasi in cui il primo passaggio avviene sul nodo master. Successivamente, la seconda fase avviene in parallelo sui nodi slave.

Le scritture vengono eseguite su un file WriteEntity che può essere costruito in un modo simile alle letture -

WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();

Il codice precedente crea un oggetto WriteEntity entityche può essere utilizzato per scrivere in una tabella denominatamytbl nel database mydb.

Dopo aver creato un WriteEntity, il passaggio successivo è ottenere un WriterContext -

HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();

Tutti i passaggi precedenti si verificano sul nodo master. Il nodo master serializza quindi l'oggetto WriterContext e lo rende disponibile a tutti gli slave.

Sui nodi slave, è necessario ottenere un HCatWriter utilizzando WriterContext come segue:

HCatWriter writer = DataTransferFactory.getHCatWriter(context);

Poi il writeraccetta un iteratore come argomento per il writemetodo -

writer.write(hCatRecordItr);

Il writer poi chiama getNext() su questo iteratore in un ciclo e scrive tutti i record allegati all'iteratore.

Il TestReaderWriter.javaviene utilizzato per testare le classi HCatreader e HCatWriter. Il seguente programma dimostra come utilizzare HCatReader e HCatWriter API per leggere i dati da un file di origine e successivamente scriverli su un file di destinazione.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hive.HCatalog.common.HCatException;
import org.apache.hive.HCatalog.data.transfer.DataTransferFactory;
import org.apache.hive.HCatalog.data.transfer.HCatReader;
import org.apache.hive.HCatalog.data.transfer.HCatWriter;
import org.apache.hive.HCatalog.data.transfer.ReadEntity;
import org.apache.hive.HCatalog.data.transfer.ReaderContext;
import org.apache.hive.HCatalog.data.transfer.WriteEntity;
import org.apache.hive.HCatalog.data.transfer.WriterContext;
import org.apache.hive.HCatalog.mapreduce.HCatBaseTest;

import org.junit.Assert;
import org.junit.Test;

public class TestReaderWriter extends HCatBaseTest {
   @Test
   public void test() throws MetaException, CommandNeedRetryException,
      IOException, ClassNotFoundException {
		
      driver.run("drop table mytbl");
      driver.run("create table mytbl (a string, b int)");
		
      Iterator<Entry<String, String>> itr = hiveConf.iterator();
      Map<String, String> map = new HashMap<String, String>();
		
      while (itr.hasNext()) {
         Entry<String, String> kv = itr.next();
         map.put(kv.getKey(), kv.getValue());
      }
		
      WriterContext cntxt = runsInMaster(map);
      File writeCntxtFile = File.createTempFile("hcat-write", "temp");
      writeCntxtFile.deleteOnExit();
		
      // Serialize context.
      ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
      oos.writeObject(cntxt);
      oos.flush();
      oos.close();
		
      // Now, deserialize it.
      ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
      cntxt = (WriterContext) ois.readObject();
      ois.close();
      runsInSlave(cntxt);
      commit(map, true, cntxt);
		
      ReaderContext readCntxt = runsInMaster(map, false);
      File readCntxtFile = File.createTempFile("hcat-read", "temp");
      readCntxtFile.deleteOnExit();
      oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
      oos.writeObject(readCntxt);
      oos.flush();
      oos.close();
		
      ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
      readCntxt = (ReaderContext) ois.readObject();
      ois.close();
		
      for (int i = 0; i < readCntxt.numSplits(); i++) {
         runsInSlave(readCntxt, i);
      }
   }
	
   private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
		
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
      WriterContext info = writer.prepareWrite();
      return info;
   }
	
   private ReaderContext runsInMaster(Map<String, String> config, 
      boolean bogus) throws HCatException {
      ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
      HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
      ReaderContext cntxt = reader.prepareRead();
      return cntxt;
   }
	
   private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
      HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
      Iterator<HCatRecord> itr = reader.read();
      int i = 1;
		
      while (itr.hasNext()) {
         HCatRecord read = itr.next();
         HCatRecord written = getRecord(i++);
			
         // Argh, HCatRecord doesnt implement equals()
         Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
         written.get(0).equals(read.get(0)));
			
         Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
         written.get(1).equals(read.get(1)));
			
         Assert.assertEquals(2, read.size());
      }
		
      //Assert.assertFalse(itr.hasNext());
   }
	
   private void runsInSlave(WriterContext context) throws HCatException {
      HCatWriter writer = DataTransferFactory.getHCatWriter(context);
      writer.write(new HCatRecordItr());
   }
	
   private void commit(Map<String, String> config, boolean status,
      WriterContext context) throws IOException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
		
      if (status) {
         writer.commit(context);
      } else {
         writer.abort(context);
      }
   }
	
   private static HCatRecord getRecord(int i) {
      List<Object> list = new ArrayList<Object>(2);
      list.add("Row #: " + i);
      list.add(i);
      return new DefaultHCatRecord(list);
   }
	
   private static class HCatRecordItr implements Iterator<HCatRecord> {
      int i = 0;
		
      @Override
      public boolean hasNext() {
         return i++ < 100 ? true : false;
      }
		
      @Override
      public HCatRecord next() {
         return getRecord(i);
      }
		
      @Override
      public void remove() {
         throw new RuntimeException();
      }
   }
}

Il programma precedente legge i dati dall'HDFS sotto forma di record e scrive i dati del record in mytable