MapReduce - Implementazione di Hadoop

MapReduce è un framework utilizzato per scrivere applicazioni per elaborare enormi volumi di dati su grandi cluster di hardware di base in modo affidabile. Questo capitolo illustra il funzionamento di MapReduce nel framework Hadoop utilizzando Java.

Algoritmo MapReduce

Generalmente il paradigma MapReduce si basa sull'invio di programmi di riduzione della mappa ai computer in cui risiedono i dati effettivi.

  • Durante un processo MapReduce, Hadoop invia le attività di mappa e riduzione ai server appropriati nel cluster.

  • Il framework gestisce tutti i dettagli del passaggio dei dati come l'emissione di attività, la verifica del completamento delle attività e la copia dei dati nel cluster tra i nodi.

  • La maggior parte dell'elaborazione avviene sui nodi con dati su dischi locali che riducono il traffico di rete.

  • Dopo aver completato una determinata attività, il cluster raccoglie e riduce i dati per formare un risultato appropriato e li invia di nuovo al server Hadoop.

Input e output (prospettiva Java)

Il framework MapReduce opera su coppie chiave-valore, ovvero vede l'input al lavoro come un insieme di coppie chiave-valore e produce un insieme di coppie chiave-valore come output del lavoro, concepibilmente di diversi tipi.

Le classi chiave e valore devono essere serializzabili dal framework e quindi è necessario implementare l'interfaccia Writable. Inoltre, le classi chiave devono implementare l'interfaccia WritableComparable per facilitare l'ordinamento in base al framework.

Sia il formato di input che quello di output di un lavoro MapReduce sono sotto forma di coppie chiave-valore -

(Input) <k1, v1> -> map -> <k2, v2> -> reduce -> <k3, v3> (Output).

Ingresso Produzione
Carta geografica <k1, v1> elenco (<k2, v2>)
Ridurre <k2, list (v2)> elenco (<k3, v3>)

Implementazione di MapReduce

La tabella seguente mostra i dati relativi al consumo elettrico di un'organizzazione. La tabella include il consumo elettrico mensile e la media annuale per cinque anni consecutivi.

Jan Feb Mar Apr Maggio Jun Lug Ago Sep Ott Nov Dic Media
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Abbiamo bisogno di scrivere applicazioni per elaborare i dati di input nella tabella data per trovare l'anno di utilizzo massimo, l'anno di utilizzo minimo e così via. Questo compito è facile per i programmatori con una quantità finita di record, poiché scriveranno semplicemente la logica per produrre l'output richiesto e passeranno i dati all'applicazione scritta.

Alziamo ora la scala dei dati di input. Supponiamo di dover analizzare il consumo elettrico di tutte le industrie su larga scala di un particolare stato. Quando scriviamo applicazioni per elaborare tali dati in blocco,

  • Ci vorrà molto tempo per l'esecuzione.

  • Il traffico di rete sarà intenso quando spostiamo i dati dall'origine al server di rete.

Per risolvere questi problemi, abbiamo il framework MapReduce.

Dati in ingresso

I dati di cui sopra vengono salvati come sample.txte dato come input. Il file di input appare come mostrato di seguito.

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Programma di esempio

Il seguente programma per i dati di esempio utilizza il framework MapReduce.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

Salva il programma sopra in ProcessUnits.java. Di seguito viene fornita la compilazione e l'esecuzione del programma.

Compilazione ed esecuzione del programma ProcessUnits

Supponiamo di essere nella directory home dell'utente Hadoop (ad esempio / home / hadoop).

Seguire i passaggi indicati di seguito per compilare ed eseguire il programma sopra.

Step 1 - Utilizzare il seguente comando per creare una directory in cui memorizzare le classi java compilate.

$ mkdir units

Step 2- Scarica Hadoop-core-1.2.1.jar, che viene utilizzato per compilare ed eseguire il programma MapReduce. Scarica il jar da mvnrepository.com . Supponiamo che la cartella di download sia / home / hadoop /.

Step 3 - I seguenti comandi vengono utilizzati per compilare il file ProcessUnits.java programma e per creare un vaso per il programma.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - Il comando seguente viene utilizzato per creare una directory di input in HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - Il seguente comando viene utilizzato per copiare il file di input denominato sample.txt nella directory di input di HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - Il seguente comando viene utilizzato per verificare i file nella directory di input

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - Il seguente comando viene utilizzato per eseguire l'applicazione Eleunit_max prendendo i file di input dalla directory di input.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Attendi qualche istante finché il file non viene eseguito. Dopo l'esecuzione, l'output contiene una serie di suddivisioni di input, attività di mappa, attività di riduzione e così via.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - Il seguente comando viene utilizzato per verificare i file risultanti nella cartella di output.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - Il seguente comando viene utilizzato per vedere l'output in Part-00000file. Questo file è generato da HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Di seguito è riportato l'output generato dal programma MapReduce:

1981 34
1984 40
1985 45

Step 10 - Il seguente comando viene utilizzato per copiare la cartella di output da HDFS al file system locale.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop