PySpark - RDD

Ora che abbiamo installato e configurato PySpark sul nostro sistema, possiamo programmare in Python su Apache Spark. Tuttavia, prima di farlo, comprendiamo un concetto fondamentale in Spark - RDD.

RDD sta per Resilient Distributed Dataset, questi sono gli elementi che vengono eseguiti e operano su più nodi per eseguire l'elaborazione parallela su un cluster. Gli RDD sono elementi immutabili, il che significa che una volta creato un RDD non è possibile modificarlo. Anche gli RDD sono tolleranti ai guasti, quindi, in caso di guasto, si ripristinano automaticamente. È possibile applicare più operazioni su questi RDD per ottenere un determinato compito.

Per applicare le operazioni su questi RDD, ci sono due modi:

  • Trasformazione e
  • Action

Cerchiamo di capire questi due modi in dettaglio.

Transformation- Queste sono le operazioni che vengono applicate su un RDD per creare un nuovo RDD. Filter, groupBy e map sono gli esempi di trasformazioni.

Action - Queste sono le operazioni che vengono applicate su RDD, che ordina a Spark di eseguire il calcolo e inviare il risultato al driver.

Per applicare qualsiasi operazione in PySpark, dobbiamo creare un file PySpark RDDprimo. Il seguente blocco di codice contiene i dettagli di una classe RDD PySpark -

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

Vediamo come eseguire alcune operazioni di base utilizzando PySpark. Il codice seguente in un file Python crea parole RDD, che memorizza un insieme di parole menzionate.

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

Ora eseguiremo alcune operazioni sulle parole.

contare()

Viene restituito il numero di elementi nell'RDD.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command - Il comando per count () è -

$SPARK_HOME/bin/spark-submit count.py

Output - L'output per il comando precedente è -

Number of elements in RDD → 8

raccogliere()

Vengono restituiti tutti gli elementi nell'RDD.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command - Il comando per collect () è -

$SPARK_HOME/bin/spark-submit collect.py

Output - L'output per il comando precedente è -

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (f)

Restituisce solo quegli elementi che soddisfano la condizione della funzione all'interno di foreach. Nell'esempio seguente, chiamiamo una funzione di stampa in foreach, che stampa tutti gli elementi nell'RDD.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command - Il comando per foreach (f) è -

$SPARK_HOME/bin/spark-submit foreach.py

Output - L'output per il comando precedente è -

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filtro (f)

Viene restituito un nuovo RDD contenente gli elementi, che soddisfa la funzione all'interno del filtro. Nell'esempio seguente, filtriamo le stringhe contenenti "spark".

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command - Il comando per il filtro (f) è -

$SPARK_HOME/bin/spark-submit filter.py

Output - L'output per il comando precedente è -

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map (f, preservesPartitioning = False)

Viene restituito un nuovo RDD applicando una funzione a ciascun elemento nell'RDD. Nell'esempio seguente, formiamo una coppia di valori chiave e mappiamo ogni stringa con un valore di 1.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command - Il comando per map (f, conservaPartitioning = False) è -

$SPARK_HOME/bin/spark-submit map.py

Output - L'output del comando precedente è -

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

ridurre (f)

Dopo aver eseguito l'operazione binaria commutativa e associativa specificata, viene restituito l'elemento nell'RDD. Nell'esempio seguente, stiamo importando add package dall'operatore e applicandolo su 'num' per eseguire una semplice operazione di addizione.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command - Il comando per reduce (f) è -

$SPARK_HOME/bin/spark-submit reduce.py

Output - L'output del comando precedente è -

Adding all the elements -> 15

join (altro, numPartitions = Nessuno)

Restituisce RDD con una coppia di elementi con le chiavi corrispondenti e tutti i valori per quella particolare chiave. Nell'esempio seguente, ci sono due coppie di elementi in due RDD differenti. Dopo aver unito questi due RDD, otteniamo un RDD con elementi con chiavi corrispondenti e relativi valori.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command - Il comando per join (other, numPartitions = None) è -

$SPARK_HOME/bin/spark-submit join.py

Output - L'output per il comando precedente è -

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

cache ()

Mantieni questo RDD con il livello di archiviazione predefinito (MEMORY_ONLY). Puoi anche verificare se l'RDD è memorizzato nella cache o meno.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command - Il comando per cache () è -

$SPARK_HOME/bin/spark-submit cache.py

Output - L'output per il programma di cui sopra è -

Words got cached -> True

Queste erano alcune delle operazioni più importanti che vengono eseguite su PySpark RDD.