Hadoop - MapReduce
MapReduce è un framework utilizzando il quale possiamo scrivere applicazioni per elaborare enormi quantità di dati, in parallelo, su grandi cluster di hardware di base in modo affidabile.
Cos'è MapReduce?
MapReduce è una tecnica di elaborazione e un modello di programma per il calcolo distribuito basato su java. L'algoritmo MapReduce contiene due importanti attività, ovvero Map e Reduce. Map prende un set di dati e lo converte in un altro set di dati, dove i singoli elementi vengono suddivisi in tuple (coppie chiave / valore). In secondo luogo, ridurre l'attività, che prende l'output da una mappa come input e combina quelle tuple di dati in un insieme più piccolo di tuple. Come implica la sequenza del nome MapReduce, l'attività di riduzione viene sempre eseguita dopo il lavoro di mappa.
Il vantaggio principale di MapReduce è che è facile scalare l'elaborazione dei dati su più nodi di elaborazione. Nel modello MapReduce, le primitive di elaborazione dei dati sono chiamate mappatori e riduttori. La scomposizione di un'applicazione di elaborazione dati in mappatori e riduttori a volte non è banale. Tuttavia, una volta che scriviamo un'applicazione nel modulo MapReduce, ridimensionare l'applicazione per eseguirla su centinaia, migliaia o persino decine di migliaia di macchine in un cluster è semplicemente una modifica alla configurazione. Questa semplice scalabilità è ciò che ha attratto molti programmatori a utilizzare il modello MapReduce.
L'algoritmo
Generalmente il paradigma di MapReduce si basa sull'invio al computer dove risiedono i dati!
Il programma MapReduce viene eseguito in tre fasi, ovvero fase della mappa, fase casuale e fase di riduzione.
Map stage- Il compito della mappa o del mappatore consiste nell'elaborare i dati di input. Generalmente i dati di input sono sotto forma di file o directory e sono archiviati nel file system Hadoop (HDFS). Il file di input viene passato riga per riga alla funzione mapper. Il mappatore elabora i dati e crea diversi piccoli blocchi di dati.
Reduce stage - Questa fase è la combinazione di Shuffle stage e il Reducepalcoscenico. Il compito del riduttore è elaborare i dati che provengono dal mappatore. Dopo l'elaborazione, produce un nuovo set di output, che verrà archiviato nell'HDFS.
Durante un processo MapReduce, Hadoop invia le attività di mappa e riduzione ai server appropriati nel cluster.
Il framework gestisce tutti i dettagli del passaggio dei dati come l'emissione di attività, la verifica del completamento delle attività e la copia dei dati nel cluster tra i nodi.
La maggior parte dell'elaborazione avviene su nodi con dati su dischi locali che riduce il traffico di rete.
Dopo il completamento delle attività date, il cluster raccoglie e riduce i dati per formare un risultato appropriato e lo invia di nuovo al server Hadoop.
Input e output (prospettiva Java)
Il framework MapReduce opera su coppie <chiave, valore>, ovvero il framework visualizza l'input per il lavoro come un insieme di coppie <chiave, valore> e produce un insieme di coppie <chiave, valore> come output del lavoro , presumibilmente di diversi tipi.
La chiave e le classi di valore devono essere serializzate dal framework e quindi devono implementare l'interfaccia Writable. Inoltre, le classi chiave devono implementare l'interfaccia Writable-Comparable per facilitare l'ordinamento in base al framework. Tipi di input e output di aMapReduce job - (Input) <k1, v1> → map → <k2, v2> → reduce → <k3, v3> (Output).
Ingresso | Produzione | |
---|---|---|
Carta geografica | <k1, v1> | elenco (<k2, v2>) |
Ridurre | <k2, list (v2)> | elenco (<k3, v3>) |
Terminologia
PayLoad - Le applicazioni implementano le funzioni Mappa e Riduci e costituiscono il nucleo del lavoro.
Mapper - Mapper mappa le coppie chiave / valore di input su un insieme di coppie chiave / valore intermedie.
NamedNode - Nodo che gestisce Hadoop Distributed File System (HDFS).
DataNode - Nodo in cui i dati vengono presentati in anticipo prima che avvenga qualsiasi elaborazione.
MasterNode - Nodo in cui viene eseguito JobTracker e che accetta le richieste di lavoro dai client.
SlaveNode - Nodo in cui viene eseguito il programma Map and Reduce.
JobTracker - Pianifica i lavori e tiene traccia dei lavori assegnati a Task tracker.
Task Tracker - Tiene traccia dell'attività e segnala lo stato a JobTracker.
Job - Un programma è l'esecuzione di un Mapper e Reducer su un set di dati.
Task - Un'esecuzione di un Mapper o di un Reducer su una porzione di dati.
Task Attempt - Una particolare istanza di un tentativo di eseguire un'attività su uno SlaveNode.
Scenario di esempio
Di seguito sono riportati i dati relativi al consumo elettrico di un'organizzazione. Contiene il consumo elettrico mensile e la media annua dei vari anni.
Jan | Feb | Mar | Apr | Maggio | Jun | Lug | Ago | Sep | Ott | Nov | Dic | Media | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Se i dati di cui sopra vengono forniti come input, dobbiamo scrivere applicazioni per elaborarli e produrre risultati come la ricerca dell'anno di utilizzo massimo, dell'anno di utilizzo minimo e così via. Questo è un walkover per i programmatori con un numero finito di record. Scriveranno semplicemente la logica per produrre l'output richiesto e passeranno i dati all'applicazione scritta.
Ma si pensi ai dati che rappresentano il consumo elettrico di tutte le industrie su larga scala di un particolare stato, sin dalla sua formazione.
Quando scriviamo applicazioni per elaborare tali dati in blocco,
Ci vorrà molto tempo per l'esecuzione.
Ci sarà un intenso traffico di rete quando spostiamo i dati dall'origine al server di rete e così via.
Per risolvere questi problemi, abbiamo il framework MapReduce.
Dati in ingresso
I dati di cui sopra vengono salvati come sample.txte dato come input. Il file di input appare come mostrato di seguito.
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45
Programma di esempio
Di seguito è riportato il programma per i dati di esempio utilizzando il framework MapReduce.
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits {
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable ,/*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()) {
lasttoken = s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
//Reduce function
public void reduce( Text key, Iterator <IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int maxavg = 30;
int val = Integer.MIN_VALUE;
while (values.hasNext()) {
if((val = values.next().get())>maxavg) {
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception {
JobConf conf = new JobConf(ProcessUnits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Salva il programma sopra come ProcessUnits.java. Di seguito viene spiegata la compilazione e l'esecuzione del programma.
Compilazione ed esecuzione del programma Process Units
Supponiamo di essere nella directory home di un utente Hadoop (ad esempio / home / hadoop).
Seguire i passaggi indicati di seguito per compilare ed eseguire il programma sopra.
Passo 1
Il comando seguente serve per creare una directory per memorizzare le classi java compilate.
$ mkdir units
Passo 2
Scarica Hadoop-core-1.2.1.jar,che viene utilizzato per compilare ed eseguire il programma MapReduce. Visitare il seguente collegamento mvnrepository.com per scaricare il jar. Supponiamo che la cartella scaricata sia/home/hadoop/.
Passaggio 3
I seguenti comandi vengono utilizzati per compilare il file ProcessUnits.java programma e creando un vaso per il programma.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Passaggio 4
Il comando seguente viene utilizzato per creare una directory di input in HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Passaggio 5
Il seguente comando viene utilizzato per copiare il file di input denominato sample.txtnella directory di input di HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Passaggio 6
Il comando seguente viene utilizzato per verificare i file nella directory di input.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Passaggio 7
Il seguente comando viene utilizzato per eseguire l'applicazione Eleunit_max prendendo i file di input dalla directory di input.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Attendi qualche istante finché il file non viene eseguito. Dopo l'esecuzione, come mostrato di seguito, l'output conterrà il numero di suddivisioni di input, il numero di attività di mappa, il numero di attività di riduzione, ecc.
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read = 61
FILE: Number of bytes written = 279400
FILE: Number of read operations = 0
FILE: Number of large read operations = 0
FILE: Number of write operations = 0
HDFS: Number of bytes read = 546
HDFS: Number of bytes written = 40
HDFS: Number of read operations = 9
HDFS: Number of large read operations = 0
HDFS: Number of write operations = 2 Job Counters
Launched map tasks = 2
Launched reduce tasks = 1
Data-local map tasks = 2
Total time spent by all maps in occupied slots (ms) = 146137
Total time spent by all reduces in occupied slots (ms) = 441
Total time spent by all map tasks (ms) = 14613
Total time spent by all reduce tasks (ms) = 44120
Total vcore-seconds taken by all map tasks = 146137
Total vcore-seconds taken by all reduce tasks = 44120
Total megabyte-seconds taken by all map tasks = 149644288
Total megabyte-seconds taken by all reduce tasks = 45178880
Map-Reduce Framework
Map input records = 5
Map output records = 5
Map output bytes = 45
Map output materialized bytes = 67
Input split bytes = 208
Combine input records = 5
Combine output records = 5
Reduce input groups = 5
Reduce shuffle bytes = 6
Reduce input records = 5
Reduce output records = 5
Spilled Records = 10
Shuffled Maps = 2
Failed Shuffles = 0
Merged Map outputs = 2
GC time elapsed (ms) = 948
CPU time spent (ms) = 5160
Physical memory (bytes) snapshot = 47749120
Virtual memory (bytes) snapshot = 2899349504
Total committed heap usage (bytes) = 277684224
File Output Format Counters
Bytes Written = 40
Passaggio 8
Il comando seguente viene utilizzato per verificare i file risultanti nella cartella di output.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Passaggio 9
Il comando seguente viene utilizzato per visualizzare l'output in Part-00000 file. Questo file è generato da HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Di seguito è riportato l'output generato dal programma MapReduce.
1981 34
1984 40
1985 45
Passaggio 10
Il comando seguente viene utilizzato per copiare la cartella di output da HDFS al file system locale per l'analisi.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
Comandi importanti
Tutti i comandi Hadoop vengono richiamati dal $HADOOP_HOME/bin/hadoopcomando. L'esecuzione dello script Hadoop senza argomenti stampa la descrizione per tutti i comandi.
Usage - hadoop [--config confdir] COMANDO
La tabella seguente elenca le opzioni disponibili e la loro descrizione.
Sr.No. | Opzione e descrizione |
---|---|
1 | namenode -format Formatta il filesystem DFS. |
2 | secondarynamenode Esegue il namenode secondario DFS. |
3 | namenode Esegue il namenode DFS. |
4 | datanode Esegue un codice dati DFS. |
5 | dfsadmin Esegue un client di amministrazione DFS. |
6 | mradmin Esegue un client di amministrazione Map-Reduce. |
7 | fsck Esegue un'utilità di controllo del file system DFS. |
8 | fs Esegue un client utente generico del file system. |
9 | balancer Esegue un'utilità di bilanciamento del cluster. |
10 | oiv Applica il visualizzatore fsimage offline a un'immagine fsimage. |
11 | fetchdt Recupera un token di delega da NameNode. |
12 | jobtracker Esegue il nodo MapReduce Job Tracker. |
13 | pipes Esegue un lavoro Pipes. |
14 | tasktracker Esegue un nodo MapReduce Task Tracker. |
15 | historyserver Esegue i server della cronologia dei lavori come daemon autonomo. |
16 | job Manipola i lavori MapReduce. |
17 | queue Ottiene informazioni su JobQueues. |
18 | version Stampa la versione. |
19 | jar <jar> Esegue un file jar. |
20 | distcp <srcurl> <desturl> Copia file o directory in modo ricorsivo. |
21 | distcp2 <srcurl> <desturl> DistCp versione 2. |
22 | archive -archiveName NAME -p <parent path> <src>* <dest> Crea un archivio hadoop. |
23 | classpath Stampa il percorso di classe necessario per ottenere il jar Hadoop e le librerie richieste. |
24 | daemonlog Ottieni / Imposta il livello di log per ogni daemon |
Come interagire con i lavori di MapReduce
Utilizzo - lavoro hadoop [GENERIC_OPTIONS]
Le seguenti sono le Opzioni generiche disponibili in un lavoro Hadoop.
Sr.No. | GENERIC_OPTION e descrizione |
---|---|
1 | -submit <job-file> Invia il lavoro. |
2 | -status <job-id> Stampa la mappa e riduce la percentuale di completamento e tutti i contatori dei lavori. |
3 | -counter <job-id> <group-name> <countername> Stampa il valore del contatore. |
4 | -kill <job-id> Uccide il lavoro. |
5 | -events <job-id> <fromevent-#> <#-of-events> Stampa i dettagli degli eventi ricevuti da jobtracker per l'intervallo specificato. |
6 | -history [all] <jobOutputDir> - history < jobOutputDir> Stampa i dettagli del lavoro, i dettagli dei suggerimenti non riusciti e terminati. Ulteriori dettagli sul lavoro come le attività riuscite e i tentativi di attività effettuati per ciascuna attività possono essere visualizzati specificando l'opzione [tutti]. |
7 | -list[all] Visualizza tutti i lavori. -list mostra solo i lavori che devono ancora essere completati. |
8 | -kill-task <task-id> Uccide il compito. Le attività terminate NON vengono conteggiate nei tentativi falliti. |
9 | -fail-task <task-id> Fallisce il compito. Le attività non riuscite vengono conteggiate rispetto ai tentativi falliti. |
10 | -set-priority <job-id> <priority> Modifica la priorità del lavoro. I valori di priorità consentiti sono VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW |
Per vedere lo stato del lavoro
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
Per vedere la cronologia del lavoro output-dir
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME>
e.g.
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
Per uccidere il lavoro
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004