PySpark - Guida rapida

In questo capitolo, conosceremo cos'è Apache Spark e come è stato sviluppato PySpark.

Spark - Panoramica

Apache Spark è un framework di elaborazione in tempo reale velocissimo. Esegue calcoli in memoria per analizzare i dati in tempo reale. È entrato in foto comeApache Hadoop MapReducestava eseguendo solo l'elaborazione in batch e mancava una funzione di elaborazione in tempo reale. Pertanto, è stato introdotto Apache Spark in quanto può eseguire l'elaborazione del flusso in tempo reale e può anche occuparsi dell'elaborazione in batch.

Oltre all'elaborazione in tempo reale e in batch, Apache Spark supporta anche query interattive e algoritmi iterativi. Apache Spark dispone di un proprio gestore cluster, dove può ospitare la propria applicazione. Sfrutta Apache Hadoop sia per l'archiviazione che per l'elaborazione. UtilizzaHDFS (Hadoop Distributed File system) per l'archiviazione e può eseguire applicazioni Spark su YARN anche.

PySpark - Panoramica

Apache Spark è scritto in formato Scala programming language. Per supportare Python con Spark, la comunità di Apache Spark ha rilasciato uno strumento, PySpark. Usando PySpark, puoi lavorare conRDDsanche in linguaggio di programmazione Python. È a causa di una libreria chiamataPy4j che sono in grado di raggiungere questo obiettivo.

Offerte PySpark PySpark Shellche collega l'API Python al core Spark e inizializza il contesto Spark. La maggior parte dei data scientist e degli esperti di analisi oggi utilizza Python per via del suo ricco set di librerie. L'integrazione di Python con Spark è un vantaggio per loro.

In questo capitolo capiremo la configurazione dell'ambiente di PySpark.

Note - Questo considerando che hai Java e Scala installati sul tuo computer.

Ora scarichiamo e configuriamo PySpark con i seguenti passaggi.

Step 1- Vai alla pagina di download ufficiale di Apache Spark e scarica l'ultima versione di Apache Spark disponibile lì. In questo tutorial, stiamo usandospark-2.1.0-bin-hadoop2.7.

Step 2- Ora, estrai il file tar Spark scaricato. Per impostazione predefinita, verrà scaricato nella directory Download.

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

Creerà una directory spark-2.1.0-bin-hadoop2.7. Prima di avviare PySpark, è necessario impostare i seguenti ambienti per impostare il percorso Spark e il filePy4j path.

export SPARK_HOME = /home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH = $SPARK_HOME/python:$PATH

Oppure, per impostare gli ambienti di cui sopra a livello globale, inseriscili nel file .bashrc file. Quindi eseguire il comando seguente affinché gli ambienti funzionino.

# source .bashrc

Ora che abbiamo impostato tutti gli ambienti, andiamo alla directory Spark e richiamiamo la shell PySpark eseguendo il seguente comando:

# ./bin/pyspark

Questo avvierà la tua shell PySpark.

Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
<<<

SparkContext è il punto di ingresso per qualsiasi funzionalità Spark. Quando eseguiamo un'applicazione Spark, viene avviato un programma driver, che ha la funzione principale e il tuo SparkContext viene avviato qui. Il programma driver esegue quindi le operazioni all'interno degli esecutori sui nodi di lavoro.

SparkContext usa Py4J per avviare un file JVM e crea un file JavaSparkContext. Per impostazione predefinita, PySpark ha SparkContext disponibile come‘sc’, quindi la creazione di un nuovo SparkContext non funzionerà.

Il blocco di codice seguente contiene i dettagli di una classe PySpark e i parametri che possono essere accettati da SparkContext.

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

Parametri

Di seguito sono riportati i parametri di SparkContext.

  • Master - È l'URL del cluster a cui si connette.

  • appName - Nome del tuo lavoro.

  • sparkHome - Directory di installazione di Spark.

  • pyFiles - I file .zip o .py da inviare al cluster e da aggiungere a PYTHONPATH.

  • Environment - Variabili di ambiente dei nodi di lavoro.

  • batchSize- Il numero di oggetti Python rappresentati come un singolo oggetto Java. Impostare 1 per disabilitare il batch, 0 per scegliere automaticamente la dimensione del batch in base alle dimensioni degli oggetti o -1 per utilizzare una dimensione del batch illimitata.

  • Serializer - serializzatore RDD.

  • Conf - Un oggetto di L {SparkConf} per impostare tutte le proprietà di Spark.

  • Gateway - Utilizza un gateway e una JVM esistenti, altrimenti inizializza una nuova JVM.

  • JSC - L'istanza JavaSparkContext.

  • profiler_cls - Una classe di Profiler personalizzato utilizzata per eseguire la profilazione (l'impostazione predefinita è pyspark.profiler.BasicProfiler).

Tra i parametri di cui sopra, master e appnamesono principalmente usati. Le prime due righe di qualsiasi programma PySpark hanno l'aspetto mostrato di seguito:

from pyspark import SparkContext
sc = SparkContext("local", "First App")

Esempio di SparkContext - PySpark Shell

Ora che ne sai abbastanza di SparkContext, eseguiamo un semplice esempio sulla shell di PySpark. In questo esempio, conteremo il numero di righe con il carattere "a" o "b" nel fileREADME.mdfile. Quindi, diciamo se ci sono 5 righe in un file e 3 righe hanno il carattere 'a', l'output sarà →Line with a: 3. Lo stesso sarà fatto per il carattere "b".

Note- Non stiamo creando alcun oggetto SparkContext nell'esempio seguente perché, per impostazione predefinita, Spark crea automaticamente l'oggetto SparkContext denominato sc, all'avvio della shell PySpark. Nel caso in cui provi a creare un altro oggetto SparkContext, riceverai il seguente errore:"ValueError: Cannot run multiple SparkContexts at once".

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

Esempio di SparkContext - Programma Python

Eseguiamo lo stesso esempio usando un programma Python. Crea un file Python chiamatofirstapp.py e inserisci il codice seguente in quel file.

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"  
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

Quindi eseguiremo il seguente comando nel terminale per eseguire questo file Python. Otterremo lo stesso output di cui sopra.

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30

Ora che abbiamo installato e configurato PySpark sul nostro sistema, possiamo programmare in Python su Apache Spark. Tuttavia, prima di farlo, comprendiamo un concetto fondamentale in Spark - RDD.

RDD sta per Resilient Distributed Dataset, questi sono gli elementi che vengono eseguiti e operano su più nodi per eseguire l'elaborazione parallela su un cluster. Gli RDD sono elementi immutabili, il che significa che una volta creato un RDD non è possibile modificarlo. Anche gli RDD sono tolleranti ai guasti, quindi, in caso di guasto, si ripristinano automaticamente. È possibile applicare più operazioni su questi RDD per ottenere un determinato compito.

Per applicare le operazioni su questi RDD, ci sono due modi:

  • Trasformazione e
  • Action

Cerchiamo di capire questi due modi in dettaglio.

Transformation- Queste sono le operazioni che vengono applicate su un RDD per creare un nuovo RDD. Filter, groupBy e map sono gli esempi di trasformazioni.

Action - Queste sono le operazioni che vengono applicate su RDD, che ordina a Spark di eseguire il calcolo e inviare il risultato al driver.

Per applicare qualsiasi operazione in PySpark, dobbiamo creare un file PySpark RDDprimo. Il seguente blocco di codice contiene i dettagli di una classe RDD PySpark -

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

Vediamo come eseguire alcune operazioni di base utilizzando PySpark. Il codice seguente in un file Python crea parole RDD, che memorizza un insieme di parole menzionate.

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

Ora eseguiremo alcune operazioni sulle parole.

contare()

Viene restituito il numero di elementi nell'RDD.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command - Il comando per count () è -

$SPARK_HOME/bin/spark-submit count.py

Output - L'output per il comando precedente è -

Number of elements in RDD → 8

raccogliere()

Vengono restituiti tutti gli elementi nell'RDD.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command - Il comando per collect () è -

$SPARK_HOME/bin/spark-submit collect.py

Output - L'output per il comando precedente è -

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (f)

Restituisce solo quegli elementi che soddisfano la condizione della funzione all'interno di foreach. Nell'esempio seguente, chiamiamo una funzione di stampa in foreach, che stampa tutti gli elementi nell'RDD.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command - Il comando per foreach (f) è -

$SPARK_HOME/bin/spark-submit foreach.py

Output - L'output per il comando precedente è -

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filtro (f)

Viene restituito un nuovo RDD contenente gli elementi, che soddisfa la funzione all'interno del filtro. Nell'esempio seguente, filtriamo le stringhe contenenti "spark".

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command - Il comando per il filtro (f) è -

$SPARK_HOME/bin/spark-submit filter.py

Output - L'output per il comando precedente è -

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map (f, preservesPartitioning = False)

Viene restituito un nuovo RDD applicando una funzione a ciascun elemento nell'RDD. Nell'esempio seguente, formiamo una coppia di valori chiave e mappiamo ogni stringa con un valore di 1.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command - Il comando per map (f, conservaPartitioning = False) è -

$SPARK_HOME/bin/spark-submit map.py

Output - L'output del comando precedente è -

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

ridurre (f)

Dopo aver eseguito l'operazione binaria commutativa e associativa specificata, viene restituito l'elemento nell'RDD. Nell'esempio seguente, stiamo importando add package dall'operatore e applicandolo su 'num' per eseguire una semplice operazione di addizione.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command - Il comando per reduce (f) è -

$SPARK_HOME/bin/spark-submit reduce.py

Output - L'output del comando precedente è -

Adding all the elements -> 15

join (altro, numPartitions = Nessuno)

Restituisce RDD con una coppia di elementi con le chiavi corrispondenti e tutti i valori per quella particolare chiave. Nell'esempio seguente, ci sono due coppie di elementi in due RDD differenti. Dopo aver unito questi due RDD, otteniamo un RDD con elementi con chiavi corrispondenti e relativi valori.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command - Il comando per join (other, numPartitions = None) è -

$SPARK_HOME/bin/spark-submit join.py

Output - L'output per il comando precedente è -

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

cache ()

Mantieni questo RDD con il livello di archiviazione predefinito (MEMORY_ONLY). Puoi anche verificare se l'RDD è memorizzato nella cache o meno.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command - Il comando per cache () è -

$SPARK_HOME/bin/spark-submit cache.py

Output - L'output per il programma di cui sopra è -

Words got cached -> True

Queste erano alcune delle operazioni più importanti che vengono eseguite su PySpark RDD.

Per l'elaborazione parallela, Apache Spark utilizza variabili condivise. Una copia della variabile condivisa va su ogni nodo del cluster quando il driver invia un'attività all'esecutore sul cluster, in modo che possa essere utilizzata per eseguire attività.

Esistono due tipi di variabili condivise supportate da Apache Spark:

  • Broadcast
  • Accumulator

Cerchiamo di capirli in dettaglio.

Trasmissione

Le variabili di trasmissione vengono utilizzate per salvare la copia dei dati su tutti i nodi. Questa variabile viene memorizzata nella cache su tutte le macchine e non viene inviata su macchine con attività. Il seguente blocco di codice contiene i dettagli di una classe Broadcast per PySpark.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

L'esempio seguente mostra come utilizzare una variabile Broadcast. Una variabile Broadcast ha un attributo chiamato valore, che memorizza i dati e viene utilizzato per restituire un valore trasmesso.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - Il comando per una variabile broadcast è il seguente:

$SPARK_HOME/bin/spark-submit broadcast.py

Output - Di seguito è riportato l'output per il seguente comando.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Accumulatore

Le variabili accumulatrici vengono utilizzate per aggregare le informazioni tramite operazioni associative e commutative. Ad esempio, è possibile utilizzare un accumulatore per un'operazione di somma o contatori (in MapReduce). Il seguente blocco di codice contiene i dettagli di una classe Accumulator per PySpark.

class pyspark.Accumulator(aid, value, accum_param)

L'esempio seguente mostra come utilizzare una variabile Accumulator. Una variabile accumulatore ha un attributo chiamato valore che è simile a quello che ha una variabile di trasmissione. Memorizza i dati e serve per restituire il valore dell'accumulatore, ma utilizzabile solo in un programma driver.

In questo esempio, una variabile accumulatore viene utilizzata da più worker e restituisce un valore accumulato.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - Il comando per una variabile accumulatore è il seguente -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - L'output per il comando precedente è fornito di seguito.

Accumulated value is -> 150

Per eseguire un'applicazione Spark sul locale / cluster, è necessario impostare alcune configurazioni e parametri, questo è ciò con cui SparkConf aiuta. Fornisce configurazioni per eseguire un'applicazione Spark. Il blocco di codice seguente contiene i dettagli di una classe SparkConf per PySpark.

class pyspark.SparkConf (
   loadDefaults = True, 
   _jvm = None, 
   _jconf = None
)

Inizialmente, creeremo un oggetto SparkConf con SparkConf (), che caricherà i valori da spark.*Anche le proprietà del sistema Java. Ora puoi impostare diversi parametri usando l'oggetto SparkConf ei loro parametri avranno la priorità sulle proprietà di sistema.

In una classe SparkConf sono presenti metodi setter che supportano il concatenamento. Ad esempio, puoi scrivereconf.setAppName(“PySpark App”).setMaster(“local”). Una volta passato un oggetto SparkConf ad Apache Spark, non può essere modificato da nessun utente.

Di seguito sono riportati alcuni degli attributi più comunemente usati di SparkConf:

  • set(key, value) - Per impostare una proprietà di configurazione.

  • setMaster(value) - Per impostare l'URL principale.

  • setAppName(value) - Per impostare il nome di un'applicazione.

  • get(key, defaultValue=None) - Per ottenere un valore di configurazione di una chiave.

  • setSparkHome(value) - Per impostare il percorso di installazione di Spark sui nodi di lavoro.

Consideriamo il seguente esempio di utilizzo di SparkConf in un programma PySpark. In questo esempio, stiamo impostando il nome dell'applicazione Spark comePySpark App e impostando l'URL principale per un'applicazione Spark su → spark://master:7077.

Il seguente blocco di codice ha le linee, quando vengono aggiunte nel file Python, imposta le configurazioni di base per l'esecuzione di un'applicazione PySpark.

---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------

In Apache Spark, puoi caricare i tuoi file usando sc.addFile (sc è il tuo SparkContext predefinito) e ottieni il percorso su un worker usando SparkFiles.get. Pertanto, SparkFiles risolve i percorsi dei file aggiunti tramiteSparkContext.addFile().

SparkFiles contengono i seguenti metodi di classe:

  • get(filename)
  • getrootdirectory()

Cerchiamo di capirli in dettaglio.

get (nome file)

Specifica il percorso del file aggiunto tramite SparkContext.addFile ().

getrootdirectory ()

Specifica il percorso della directory root, che contiene il file aggiunto tramite SparkContext.addFile ().

----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------

Command - Il comando è il seguente -

$SPARK_HOME/bin/spark-submit sparkfiles.py

Output - L'output per il comando precedente è -

Absolute Path -> 
   /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R

StorageLevel decide come memorizzare RDD. In Apache Spark, StorageLevel decide se RDD deve essere archiviato nella memoria o deve essere archiviato sul disco o entrambi. Decide inoltre se serializzare RDD e se replicare le partizioni RDD.

Il seguente blocco di codice ha la definizione di classe di StorageLevel -

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

Ora, per decidere l'archiviazione di RDD, ci sono diversi livelli di archiviazione, che sono indicati di seguito:

  • DISK_ONLY = StorageLevel (True, False, False, False, 1)

  • DISK_ONLY_2 = StorageLevel (True, False, False, False, 2)

  • MEMORY_AND_DISK = StorageLevel (True, True, False, False, 1)

  • MEMORY_AND_DISK_2 = StorageLevel (True, True, False, False, 2)

  • MEMORY_AND_DISK_SER = StorageLevel (True, True, False, False, 1)

  • MEMORY_AND_DISK_SER_2 = StorageLevel (True, True, False, False, 2)

  • MEMORY_ONLY = StorageLevel (False, True, False, False, 1)

  • MEMORY_ONLY_2 = StorageLevel (False, True, False, False, 2)

  • MEMORY_ONLY_SER = StorageLevel (False, True, False, False, 1)

  • MEMORY_ONLY_SER_2 = StorageLevel (False, True, False, False, 2)

  • OFF_HEAP = StorageLevel (True, True, True, False, 1)

Consideriamo il seguente esempio di StorageLevel, in cui utilizziamo il livello di archiviazione MEMORY_AND_DISK_2, il che significa che le partizioni RDD avranno la replica di 2.

------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
   "local", 
   "storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
rdd1.getStorageLevel()
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------

Command - Il comando è il seguente -

$SPARK_HOME/bin/spark-submit storagelevel.py

Output - L'output per il comando precedente è fornito di seguito -

Disk Memory Serialized 2x Replicated

Apache Spark offre un'API di Machine Learning chiamata MLlib. PySpark ha anche questa API di machine learning in Python. Supporta diversi tipi di algoritmi, che sono menzionati di seguito:

  • mllib.classification - Il spark.mllibIl pacchetto supporta vari metodi per la classificazione binaria, la classificazione multiclasse e l'analisi di regressione. Alcuni degli algoritmi più popolari nella classificazione sonoRandom Forest, Naive Bayes, Decision Tree, eccetera.

  • mllib.clustering - Il clustering è un problema di apprendimento senza supervisione, in base al quale si mira a raggruppare sottoinsiemi di entità tra loro sulla base di una qualche nozione di somiglianza.

  • mllib.fpm- Il pattern matching frequente consiste nell'estrazione di elementi, set di elementi, sottosequenze o altre sottostrutture frequenti che di solito sono tra i primi passaggi per analizzare un set di dati su larga scala. Questo è stato un argomento di ricerca attivo nel data mining per anni.

  • mllib.linalg - Utilità MLlib per algebra lineare.

  • mllib.recommendation- Il filtraggio collaborativo è comunemente utilizzato per i sistemi di raccomandazione. Queste tecniche mirano a riempire le voci mancanti di una matrice di associazione di elementi utente.

  • spark.mllib- Supporta attualmente il filtraggio collaborativo basato su modello, in cui utenti e prodotti vengono descritti da un piccolo insieme di fattori latenti che possono essere utilizzati per prevedere le voci mancanti. spark.mllib utilizza l'algoritmo ALS (Alternating Least Squares) per apprendere questi fattori latenti.

  • mllib.regression- La regressione lineare appartiene alla famiglia degli algoritmi di regressione. L'obiettivo della regressione è trovare relazioni e dipendenze tra le variabili. L'interfaccia per lavorare con modelli di regressione lineare e riepiloghi dei modelli è simile al caso di regressione logistica.

Ci sono anche altri algoritmi, classi e funzioni come parte del pacchetto mllib. A partire da ora, cerchiamo di capire una dimostrazione supyspark.mllib.

L'esempio seguente riguarda il filtraggio collaborativo che utilizza l'algoritmo ALS per creare il modello di raccomandazione e valutarlo sui dati di addestramento.

Dataset used - test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
   sc = SparkContext(appName="Pspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   
   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)
   
   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))
   
   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------

Command - Il comando sarà il seguente -

$SPARK_HOME/bin/spark-submit recommend.py

Output - L'output del comando precedente sarà -

Mean Squared Error = 1.20536041839e-05

La serializzazione viene utilizzata per l'ottimizzazione delle prestazioni su Apache Spark. Tutti i dati inviati in rete o scritti sul disco o conservati nella memoria devono essere serializzati. La serializzazione gioca un ruolo importante nelle operazioni costose.

PySpark supporta serializzatori personalizzati per l'ottimizzazione delle prestazioni. I seguenti due serializzatori sono supportati da PySpark:

MarshalSerializer

Serializza gli oggetti utilizzando Marshal Serializer di Python. Questo serializzatore è più veloce di PickleSerializer, ma supporta meno tipi di dati.

class pyspark.MarshalSerializer

PickleSerializer

Serializza gli oggetti usando Pickle Serializer di Python. Questo serializzatore supporta quasi tutti gli oggetti Python, ma potrebbe non essere veloce quanto i serializzatori più specializzati.

class pyspark.PickleSerializer

Vediamo un esempio sulla serializzazione di PySpark. Qui, serializziamo i dati utilizzando MarshalSerializer.

--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------

Command - Il comando è il seguente -

$SPARK_HOME/bin/spark-submit serializing.py

Output - L'output del comando precedente è -

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]