PySpark - Trasmissione e accumulatore

Per l'elaborazione parallela, Apache Spark utilizza variabili condivise. Una copia della variabile condivisa va su ogni nodo del cluster quando il driver invia un'attività all'esecutore sul cluster, in modo che possa essere utilizzata per eseguire attività.

Esistono due tipi di variabili condivise supportate da Apache Spark:

  • Broadcast
  • Accumulator

Cerchiamo di capirli in dettaglio.

Trasmissione

Le variabili di trasmissione vengono utilizzate per salvare la copia dei dati su tutti i nodi. Questa variabile viene memorizzata nella cache su tutte le macchine e non viene inviata su macchine con attività. Il seguente blocco di codice contiene i dettagli di una classe Broadcast per PySpark.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

L'esempio seguente mostra come utilizzare una variabile Broadcast. Una variabile Broadcast ha un attributo chiamato valore, che memorizza i dati e viene utilizzato per restituire un valore trasmesso.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - Il comando per una variabile broadcast è il seguente:

$SPARK_HOME/bin/spark-submit broadcast.py

Output - Di seguito è riportato l'output per il seguente comando.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Accumulatore

Le variabili accumulatrici vengono utilizzate per aggregare le informazioni tramite operazioni associative e commutative. Ad esempio, è possibile utilizzare un accumulatore per un'operazione di somma o contatori (in MapReduce). Il seguente blocco di codice contiene i dettagli di una classe Accumulator per PySpark.

class pyspark.Accumulator(aid, value, accum_param)

L'esempio seguente mostra come utilizzare una variabile Accumulator. Una variabile accumulatore ha un attributo chiamato valore che è simile a quello che ha una variabile di trasmissione. Memorizza i dati e serve per restituire il valore dell'accumulatore, ma utilizzabile solo in un programma driver.

In questo esempio, una variabile accumulatore viene utilizzata da più worker e restituisce un valore accumulato.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - Il comando per una variabile accumulatore è il seguente -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - L'output per il comando precedente è fornito di seguito.

Accumulated value is -> 150