Programmazione Spark avanzata

Spark contiene due diversi tipi di variabili condivise: una è broadcast variables e il secondo è accumulators.

  • Broadcast variables - utilizzato per distribuire in modo efficiente grandi valori.

  • Accumulators - utilizzato per aggregare le informazioni di una particolare raccolta.

Variabili di trasmissione

Le variabili di trasmissione consentono al programmatore di mantenere una variabile di sola lettura memorizzata nella cache su ogni macchina invece di spedirne una copia con le attività. Possono essere utilizzati, ad esempio, per fornire a ogni nodo una copia di un ampio set di dati di input, in modo efficiente. Spark tenta inoltre di distribuire le variabili di trasmissione utilizzando algoritmi di trasmissione efficienti per ridurre i costi di comunicazione.

Le azioni Spark vengono eseguite attraverso una serie di fasi, separate da operazioni di "shuffle" distribuite. Spark trasmette automaticamente i dati comuni necessari alle attività all'interno di ciascuna fase.

I dati trasmessi in questo modo vengono memorizzati nella cache in formato serializzato e vengono deserializzati prima di eseguire ciascuna attività. Ciò significa che la creazione esplicita di variabili di trasmissione è utile solo quando le attività in più fasi richiedono gli stessi dati o quando è importante memorizzare nella cache i dati in forma deserializzata.

Le variabili di trasmissione vengono create da una variabile v a chiamata SparkContext.broadcast(v). La variabile broadcast è un wrapperv, ed è possibile accedere al suo valore chiamando il valuemetodo. Il codice riportato di seguito mostra questo:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

Output -

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

Dopo che la variabile di trasmissione è stata creata, dovrebbe essere utilizzata al posto del valore v in tutte le funzioni eseguite sul cluster, quindi vnon viene spedito ai nodi più di una volta. Inoltre, l'oggettov non deve essere modificato dopo la sua trasmissione, in modo da garantire che tutti i nodi ricevano lo stesso valore della variabile di trasmissione.

Accumulatori

Gli accumulatori sono variabili che vengono "aggiunte" solo tramite un'operazione associativa e possono quindi essere supportate in modo efficiente in parallelo. Possono essere usati per implementare contatori (come in MapReduce) o somme. Spark supporta nativamente gli accumulatori di tipi numerici ei programmatori possono aggiungere il supporto per nuovi tipi. Se gli accumulatori vengono creati con un nome, verranno visualizzati in formatoSpark’s UI. Questo può essere utile per comprendere lo stato di avanzamento delle fasi in esecuzione (NOTA: non è ancora supportato in Python).

Un accumulatore viene creato da un valore iniziale v a chiamata SparkContext.accumulator(v). Le attività in esecuzione sul cluster possono quindi aggiungersi ad esso utilizzando iladdo l'operatore + = (in Scala e Python). Tuttavia, non possono leggere il suo valore. Solo il programma del driver può leggere il valore dell'accumulatore, utilizzando il suovalue metodo.

Il codice riportato di seguito mostra un accumulatore utilizzato per sommare gli elementi di un array:

scala> val accum = sc.accumulator(0) 
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

Se vuoi vedere l'output del codice sopra, usa il seguente comando:

scala> accum.value

Produzione

res2: Int = 10

Operazioni RDD numeriche

Spark ti consente di eseguire diverse operazioni sui dati numerici, utilizzando uno dei metodi API predefiniti. Le operazioni numeriche di Spark vengono implementate con un algoritmo di streaming che consente di costruire il modello, un elemento alla volta.

Queste operazioni vengono calcolate e restituite come file StatusCounter oggetto chiamando status() metodo.

Di seguito è riportato un elenco di metodi numerici disponibili in StatusCounter.

S.No Metodi e significato
1

count()

Numero di elementi nell'RDD.

2

Mean()

Media degli elementi nella RDD.

3

Sum()

Valore totale degli elementi nella RDD.

4

Max()

Valore massimo tra tutti gli elementi nell'RDD.

5

Min()

Valore minimo tra tutti gli elementi nell'RDD.

6

Variance()

Varianza degli elementi.

7

Stdev()

Deviazione standard.

Se vuoi usare solo uno di questi metodi, puoi chiamare il metodo corrispondente direttamente su RDD.