Hadoop - MapReduce

MapReduce è un framework utilizzando il quale possiamo scrivere applicazioni per elaborare enormi quantità di dati, in parallelo, su grandi cluster di hardware di base in modo affidabile.

Cos'è MapReduce?

MapReduce è una tecnica di elaborazione e un modello di programma per il calcolo distribuito basato su java. L'algoritmo MapReduce contiene due importanti attività, ovvero Map e Reduce. Map prende un set di dati e lo converte in un altro set di dati, dove i singoli elementi vengono suddivisi in tuple (coppie chiave / valore). In secondo luogo, ridurre l'attività, che prende l'output da una mappa come input e combina quelle tuple di dati in un insieme più piccolo di tuple. Come implica la sequenza del nome MapReduce, l'attività di riduzione viene sempre eseguita dopo il lavoro di mappa.

Il vantaggio principale di MapReduce è che è facile scalare l'elaborazione dei dati su più nodi di elaborazione. Nel modello MapReduce, le primitive di elaborazione dei dati sono chiamate mappatori e riduttori. La scomposizione di un'applicazione di elaborazione dati in mappatori e riduttori a volte non è banale. Tuttavia, una volta che scriviamo un'applicazione nel modulo MapReduce, ridimensionare l'applicazione per eseguirla su centinaia, migliaia o persino decine di migliaia di macchine in un cluster è semplicemente una modifica alla configurazione. Questa semplice scalabilità è ciò che ha attratto molti programmatori a utilizzare il modello MapReduce.

L'algoritmo

  • Generalmente il paradigma di MapReduce si basa sull'invio al computer dove risiedono i dati!

  • Il programma MapReduce viene eseguito in tre fasi, ovvero fase della mappa, fase casuale e fase di riduzione.

    • Map stage- Il compito della mappa o del mappatore consiste nell'elaborare i dati di input. Generalmente i dati di input sono sotto forma di file o directory e sono archiviati nel file system Hadoop (HDFS). Il file di input viene passato riga per riga alla funzione mapper. Il mappatore elabora i dati e crea diversi piccoli blocchi di dati.

    • Reduce stage - Questa fase è la combinazione di Shuffle stage e il Reducepalcoscenico. Il compito del riduttore è elaborare i dati che provengono dal mappatore. Dopo l'elaborazione, produce un nuovo set di output, che verrà archiviato nell'HDFS.

  • Durante un processo MapReduce, Hadoop invia le attività di mappa e riduzione ai server appropriati nel cluster.

  • Il framework gestisce tutti i dettagli del passaggio dei dati come l'emissione di attività, la verifica del completamento delle attività e la copia dei dati nel cluster tra i nodi.

  • La maggior parte dell'elaborazione avviene su nodi con dati su dischi locali che riduce il traffico di rete.

  • Dopo il completamento delle attività date, il cluster raccoglie e riduce i dati per formare un risultato appropriato e lo invia di nuovo al server Hadoop.

Input e output (prospettiva Java)

Il framework MapReduce opera su coppie <chiave, valore>, ovvero il framework visualizza l'input per il lavoro come un insieme di coppie <chiave, valore> e produce un insieme di coppie <chiave, valore> come output del lavoro , presumibilmente di diversi tipi.

La chiave e le classi di valore devono essere serializzate dal framework e quindi devono implementare l'interfaccia Writable. Inoltre, le classi chiave devono implementare l'interfaccia Writable-Comparable per facilitare l'ordinamento in base al framework. Tipi di input e output di aMapReduce job - (Input) <k1, v1> → map → <k2, v2> → reduce → <k3, v3> (Output).

Ingresso Produzione
Carta geografica <k1, v1> elenco (<k2, v2>)
Ridurre <k2, list (v2)> elenco (<k3, v3>)

Terminologia

  • PayLoad - Le applicazioni implementano le funzioni Mappa e Riduci e costituiscono il nucleo del lavoro.

  • Mapper - Mapper mappa le coppie chiave / valore di input su un insieme di coppie chiave / valore intermedie.

  • NamedNode - Nodo che gestisce Hadoop Distributed File System (HDFS).

  • DataNode - Nodo in cui i dati vengono presentati in anticipo prima che avvenga qualsiasi elaborazione.

  • MasterNode - Nodo in cui viene eseguito JobTracker e che accetta le richieste di lavoro dai client.

  • SlaveNode - Nodo in cui viene eseguito il programma Map and Reduce.

  • JobTracker - Pianifica i lavori e tiene traccia dei lavori assegnati a Task tracker.

  • Task Tracker - Tiene traccia dell'attività e segnala lo stato a JobTracker.

  • Job - Un programma è l'esecuzione di un Mapper e Reducer su un set di dati.

  • Task - Un'esecuzione di un Mapper o di un Reducer su una porzione di dati.

  • Task Attempt - Una particolare istanza di un tentativo di eseguire un'attività su uno SlaveNode.

Scenario di esempio

Di seguito sono riportati i dati relativi al consumo elettrico di un'organizzazione. Contiene il consumo elettrico mensile e la media annua dei vari anni.

Jan Feb Mar Apr Maggio Jun Lug Ago Sep Ott Nov Dic Media
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Se i dati di cui sopra vengono forniti come input, dobbiamo scrivere applicazioni per elaborarli e produrre risultati come la ricerca dell'anno di utilizzo massimo, dell'anno di utilizzo minimo e così via. Questo è un walkover per i programmatori con un numero finito di record. Scriveranno semplicemente la logica per produrre l'output richiesto e passeranno i dati all'applicazione scritta.

Ma si pensi ai dati che rappresentano il consumo elettrico di tutte le industrie su larga scala di un particolare stato, sin dalla sua formazione.

Quando scriviamo applicazioni per elaborare tali dati in blocco,

  • Ci vorrà molto tempo per l'esecuzione.

  • Ci sarà un intenso traffico di rete quando spostiamo i dati dall'origine al server di rete e così via.

Per risolvere questi problemi, abbiamo il framework MapReduce.

Dati in ingresso

I dati di cui sopra vengono salvati come sample.txte dato come input. Il file di input appare come mostrato di seguito.

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

Programma di esempio

Di seguito è riportato il programma per i dati di esempio utilizzando il framework MapReduce.

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
}

Salva il programma sopra come ProcessUnits.java. Di seguito viene spiegata la compilazione e l'esecuzione del programma.

Compilazione ed esecuzione del programma Process Units

Supponiamo di essere nella directory home di un utente Hadoop (ad esempio / home / hadoop).

Seguire i passaggi indicati di seguito per compilare ed eseguire il programma sopra.

Passo 1

Il comando seguente serve per creare una directory per memorizzare le classi java compilate.

$ mkdir units

Passo 2

Scarica Hadoop-core-1.2.1.jar,che viene utilizzato per compilare ed eseguire il programma MapReduce. Visitare il seguente collegamento mvnrepository.com per scaricare il jar. Supponiamo che la cartella scaricata sia/home/hadoop/.

Passaggio 3

I seguenti comandi vengono utilizzati per compilare il file ProcessUnits.java programma e creando un vaso per il programma.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ .

Passaggio 4

Il comando seguente viene utilizzato per creare una directory di input in HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Passaggio 5

Il seguente comando viene utilizzato per copiare il file di input denominato sample.txtnella directory di input di HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Passaggio 6

Il comando seguente viene utilizzato per verificare i file nella directory di input.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Passaggio 7

Il seguente comando viene utilizzato per eseguire l'applicazione Eleunit_max prendendo i file di input dalla directory di input.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Attendi qualche istante finché il file non viene eseguito. Dopo l'esecuzione, come mostrato di seguito, l'output conterrà il numero di suddivisioni di input, il numero di attività di mappa, il numero di attività di riduzione, ecc.

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40

Passaggio 8

Il comando seguente viene utilizzato per verificare i file risultanti nella cartella di output.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Passaggio 9

Il comando seguente viene utilizzato per visualizzare l'output in Part-00000 file. Questo file è generato da HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Di seguito è riportato l'output generato dal programma MapReduce.

1981    34 
1984    40 
1985    45

Passaggio 10

Il comando seguente viene utilizzato per copiare la cartella di output da HDFS al file system locale per l'analisi.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

Comandi importanti

Tutti i comandi Hadoop vengono richiamati dal $HADOOP_HOME/bin/hadoopcomando. L'esecuzione dello script Hadoop senza argomenti stampa la descrizione per tutti i comandi.

Usage - hadoop [--config confdir] COMANDO

La tabella seguente elenca le opzioni disponibili e la loro descrizione.

Sr.No. Opzione e descrizione
1

namenode -format

Formatta il filesystem DFS.

2

secondarynamenode

Esegue il namenode secondario DFS.

3

namenode

Esegue il namenode DFS.

4

datanode

Esegue un codice dati DFS.

5

dfsadmin

Esegue un client di amministrazione DFS.

6

mradmin

Esegue un client di amministrazione Map-Reduce.

7

fsck

Esegue un'utilità di controllo del file system DFS.

8

fs

Esegue un client utente generico del file system.

9

balancer

Esegue un'utilità di bilanciamento del cluster.

10

oiv

Applica il visualizzatore fsimage offline a un'immagine fsimage.

11

fetchdt

Recupera un token di delega da NameNode.

12

jobtracker

Esegue il nodo MapReduce Job Tracker.

13

pipes

Esegue un lavoro Pipes.

14

tasktracker

Esegue un nodo MapReduce Task Tracker.

15

historyserver

Esegue i server della cronologia dei lavori come daemon autonomo.

16

job

Manipola i lavori MapReduce.

17

queue

Ottiene informazioni su JobQueues.

18

version

Stampa la versione.

19

jar <jar>

Esegue un file jar.

20

distcp <srcurl> <desturl>

Copia file o directory in modo ricorsivo.

21

distcp2 <srcurl> <desturl>

DistCp versione 2.

22

archive -archiveName NAME -p <parent path> <src>* <dest>

Crea un archivio hadoop.

23

classpath

Stampa il percorso di classe necessario per ottenere il jar Hadoop e le librerie richieste.

24

daemonlog

Ottieni / Imposta il livello di log per ogni daemon

Come interagire con i lavori di MapReduce

Utilizzo - lavoro hadoop [GENERIC_OPTIONS]

Le seguenti sono le Opzioni generiche disponibili in un lavoro Hadoop.

Sr.No. GENERIC_OPTION e descrizione
1

-submit <job-file>

Invia il lavoro.

2

-status <job-id>

Stampa la mappa e riduce la percentuale di completamento e tutti i contatori dei lavori.

3

-counter <job-id> <group-name> <countername>

Stampa il valore del contatore.

4

-kill <job-id>

Uccide il lavoro.

5

-events <job-id> <fromevent-#> <#-of-events>

Stampa i dettagli degli eventi ricevuti da jobtracker per l'intervallo specificato.

6

-history [all] <jobOutputDir> - history < jobOutputDir>

Stampa i dettagli del lavoro, i dettagli dei suggerimenti non riusciti e terminati. Ulteriori dettagli sul lavoro come le attività riuscite e i tentativi di attività effettuati per ciascuna attività possono essere visualizzati specificando l'opzione [tutti].

7

-list[all]

Visualizza tutti i lavori. -list mostra solo i lavori che devono ancora essere completati.

8

-kill-task <task-id>

Uccide il compito. Le attività terminate NON vengono conteggiate nei tentativi falliti.

9

-fail-task <task-id>

Fallisce il compito. Le attività non riuscite vengono conteggiate rispetto ai tentativi falliti.

10

-set-priority <job-id> <priority>

Modifica la priorità del lavoro. I valori di priorità consentiti sono VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

Per vedere lo stato del lavoro

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

Per vedere la cronologia del lavoro output-dir

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output

Per uccidere il lavoro

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004