Hadoop - Streaming

Lo streaming di Hadoop è un'utilità fornita con la distribuzione di Hadoop. Questa utilità consente di creare ed eseguire lavori di mappatura / riduzione con qualsiasi eseguibile o script come mappatore e / o riduttore.

Esempio di utilizzo di Python

Per lo streaming Hadoop, stiamo considerando il problema del conteggio delle parole. Qualsiasi lavoro in Hadoop deve avere due fasi: mappatore e riduttore. Abbiamo scritto codici per il mappatore e il riduttore in script Python per eseguirlo sotto Hadoop. Si può anche scrivere lo stesso in Perl e Ruby.

Codice fase mappatore

!/usr/bin/python

import sys

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Break the line into words 
   words = myline.split() 

   # Iterate the words list
   for myword in words:
      # Write the results to standard output 
      print '%s\t%s' % (myword, 1)

Assicurati che questo file abbia il permesso di esecuzione (chmod + x / home / expert / hadoop-1.2.1 / mapper.py).

Codice fase riduttore

#!/usr/bin/python

from operator import itemgetter 
import sys 

current_word = ""
current_count = 0 
word = "" 

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Split the input we got from mapper.py word, 
   count = myline.split('\t', 1) 

   # Convert count variable to integer 
   try: 
      count = int(count) 

   except ValueError: 
      # Count was not a number, so silently ignore this line continue

   if current_word == word: 
   current_count += count 
   else: 
      if current_word: 
         # Write result to standard output print '%s\t%s' % (current_word, current_count) 
   
      current_count = count
      current_word = word

# Do not forget to output the last word if needed! 
if current_word == word: 
   print '%s\t%s' % (current_word, current_count)

Salva i codici mappatore e riduttore in mapper.py e riduttore.py nella directory principale di Hadoop. Assicurati che questi file abbiano il permesso di esecuzione (chmod + x mapper.py e chmod + x riduttore.py). Poiché Python è sensibile all'indentazione, lo stesso codice può essere scaricato dal collegamento sottostante.

Esecuzione del programma WordCount

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \ 
   -output output_dir \ 
   -mapper <path/mapper.py \ 
   -reducer <path/reducer.py

Dove "\" viene utilizzato per la continuazione della riga per una chiara leggibilità.

Per esempio,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

Come funziona lo streaming

Nell'esempio precedente, sia il mapper che il riduttore sono script Python che leggono l'input dallo standard input ed emettono l'output nell'output standard. L'utilità creerà un lavoro Mappa / Riduci, inoltrerà il lavoro a un cluster appropriato e monitorerà l'avanzamento del lavoro fino al suo completamento.

Quando viene specificato uno script per i mappatori, ogni attività del mapper avvierà lo script come processo separato quando il mapper viene inizializzato. Durante l'esecuzione dell'attività di mappatura, converte i suoi input in linee e inserisce le linee nell'input standard (STDIN) del processo. Nel frattempo, il mappatore raccoglie gli output orientati alla riga dallo standard output (STDOUT) del processo e converte ogni riga in una coppia chiave / valore, che viene raccolta come output del mappatore. Per impostazione predefinita, il prefisso di una riga fino al primo carattere di tabulazione è la chiave e il resto della riga (escluso il carattere di tabulazione) sarà il valore. Se non è presente alcun carattere di tabulazione nella riga, l'intera riga viene considerata come chiave e il valore è nullo. Tuttavia, questo può essere personalizzato, secondo una necessità.

Quando viene specificato uno script per i riduttori, ciascuna attività del riduttore avvierà lo script come processo separato, quindi il riduttore viene inizializzato. Durante l'esecuzione dell'attività del riduttore, converte le sue coppie chiave / valori di input in righe e le invia allo standard input (STDIN) del processo. Nel frattempo, il riduttore raccoglie gli output orientati alla linea dallo standard output (STDOUT) del processo, converte ogni riga in una coppia chiave / valore, che viene raccolta come output del riduttore. Per impostazione predefinita, il prefisso di una riga fino al primo carattere di tabulazione è la chiave e il resto della riga (escluso il carattere di tabulazione) è il valore. Tuttavia, questo può essere personalizzato secondo requisiti specifici.

Comandi importanti

Parametri Opzioni Descrizione
-indirizzario di ingresso / nome-file necessario Posizione di input per mappatore.
-output nome-directory necessario Posizione di uscita per riduttore.
-mapper eseguibile o script o JavaClassName necessario Eseguibile del mapping.
-Riduttore eseguibile o script o JavaClassName necessario Riduttore eseguibile.
-file nome-file Opzionale Rende l'eseguibile mappatore, riduttore o combinatore disponibile localmente sui nodi di calcolo.
-inputformat JavaClassName Opzionale La classe fornita deve restituire le coppie chiave / valore della classe Text. Se non specificato, TextInputFormat viene utilizzato come impostazione predefinita.
-outputformat JavaClassName Opzionale La classe che fornisci dovrebbe prendere coppie chiave / valore della classe Text. Se non specificato, TextOutputformat viene utilizzato come impostazione predefinita.
-partitioner JavaClassName Opzionale Classe che determina a quale riduzione viene inviata una chiave.
-combiner streamingCommand o JavaClassName Opzionale Eseguibile del combinatore per l'output della mappa.
-cmdenv nome = valore Opzionale Passa la variabile di ambiente ai comandi di streaming.
-inputreader Opzionale Per compatibilità con le versioni precedenti: specifica una classe di lettore di record (invece di una classe di formato di input).
-verbose Opzionale Output dettagliato.
-lazyOutput Opzionale Crea output pigramente. Ad esempio, se il formato di output è basato su FileOutputFormat, il file di output viene creato solo alla prima chiamata a output.collect (o Context.write).
-numReduceTasks Opzionale Specifica il numero di riduttori.
-mapdebug Opzionale Script da chiamare quando l'attività della mappa fallisce.
-reducedebug Opzionale Script da chiamare quando l'attività di riduzione non riesce.