Deduzione dello schema utilizzando la riflessione
Questo metodo utilizza la riflessione per generare lo schema di un RDD che contiene tipi specifici di oggetti. L'interfaccia Scala per Spark SQL supporta la conversione automatica di un RDD contenente classi case in un DataFrame. Ilcase classdefinisce lo schema della tabella. I nomi degli argomenti della classe case vengono letti usando la reflection e diventano i nomi delle colonne.
Le classi case possono anche essere nidificate o contenere tipi complessi come sequenze o array. Questo RDD può essere convertito implicitamente in un DataFrame e quindi registrato come una tabella. Le tabelle possono essere utilizzate nelle successive istruzioni SQL.
Esempio
Consideriamo un esempio di record dei dipendenti in un file di testo denominato employee.txt. Creare un RDD leggendo i dati dal file di testo e convertirlo in DataFrame utilizzando le funzioni SQL predefinite.
Given Data - Dai un'occhiata ai seguenti dati di un file denominato employee.txt posizionato nella rispettiva directory corrente in cui è in esecuzione il punto della shell spark.
1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23
I seguenti esempi spiegano come generare uno schema utilizzando Reflections.
Avvia Spark Shell
Avvia Spark Shell usando il seguente comando.
$ spark-shell
Crea SQLContext
Genera SQLContext utilizzando il seguente comando. Qui,sc significa oggetto SparkContext.
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Importa funzioni SQL
Utilizzare il comando seguente per importare tutte le funzioni SQL utilizzate per convertire implicitamente un RDD in un DataFrame.
scala> import sqlContext.implicts._
Crea classe caso
Successivamente, dobbiamo definire uno schema per i dati dei record dei dipendenti utilizzando una classe case. Il comando seguente viene utilizzato per dichiarare la classe case in base ai dati forniti (id, nome, età).
scala> case class Employee(id: Int, name: String, age: Int)
defined class Employee
Crea RDD e applica trasformazioni
Utilizzare il seguente comando per generare un RDD denominato empl leggendo i dati da employee.txt e convertendolo in DataFrame, utilizzando le funzioni Map.
Qui vengono definite due funzioni della mappa. Uno è per dividere il record di testo in campi (.map(_.split(“,”))) e la seconda funzione map per convertire i singoli campi (id, name, age) in un oggetto di classe case (.map(e(0).trim.toInt, e(1), e(2).trim.toInt)).
Alla fine, toDF() viene utilizzato per convertire l'oggetto della classe case con schema in un DataFrame.
scala> val empl=sc.textFile("employee.txt")
.map(_.split(","))
.map(e⇒ employee(e(0).trim.toInt,e(1), e(2).trim.toInt))
.toDF()
Produzione
empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]
Memorizza i dati DataFrame in una tabella
Utilizzare il comando seguente per archiviare i dati DataFrame in una tabella denominata employee. Dopo questo comando, possiamo applicare tutti i tipi di istruzioni SQL in esso.
scala> empl.registerTempTable("employee")
Il tavolo dei dipendenti è pronto. Passiamo ora alcune query sql sulla tabella utilizzandoSQLContext.sql() metodo.
Seleziona Query su DataFrame
Utilizzare il comando seguente per selezionare tutti i record dal file employeetavolo. Qui usiamo la variabileallrecordsper acquisire tutti i dati dei record. Per visualizzare quei record, chiamashow() metodo su di esso.
scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")
Per vedere i dati dei risultati di allrecords DataFrame, usa il seguente comando.
scala> allrecords.show()
Produzione
+------+---------+----+
| id | name |age |
+------+---------+----+
| 1201 | satish | 25 |
| 1202 | krishna | 28 |
| 1203 | amith | 39 |
| 1204 | javed | 23 |
| 1205 | prudvi | 23 |
+------+---------+----+
Where Clause Query SQL su DataFrame
Utilizzare il seguente comando per l'applicazione wheredichiarazione in una tabella. Qui, la variabileagefilter memorizza le registrazioni dei dipendenti la cui età è compresa tra 20 e 35 anni.
scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")
Per vedere i dati dei risultati di agefilter DataFrame, usa il seguente comando.
scala> agefilter.show()
Produzione
<console>:25, took 0.112757 s
+------+---------+----+
| id | name |age |
+------+---------+----+
| 1201 | satish | 25 |
| 1202 | krishna | 28 |
| 1204 | javed | 23 |
| 1205 | prudvi | 23 |
+------+---------+----+
Le due query precedenti sono state passate sull'intera tabella DataFrame. Ora proviamo a recuperare i dati dal risultato DataFrame applicandoTransformations su di esso.
Recupera i valori ID da agefilter DataFrame utilizzando l'indice di colonna
La seguente istruzione viene utilizzata per recuperare i valori ID da agefilter Risultato RDD, utilizzando l'indice di campo.
scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)
Produzione
<console>:25, took 0.093844 s
ID: 1201
ID: 1202
ID: 1204
ID: 1205
Questo approccio basato sulla riflessione porta a un codice più conciso e funziona bene quando si conosce già lo schema durante la scrittura dell'applicazione Spark.