Specifica dello schema a livello di codice

Il secondo metodo per creare DataFrame è tramite l'interfaccia programmatica che consente di costruire uno schema e quindi applicarlo a un RDD esistente. Possiamo creare un DataFrame a livello di codice utilizzando i seguenti tre passaggi.

  • Crea un RDD di righe da un RDD originale.

  • Creare lo schema rappresentato da uno StructType corrispondente alla struttura delle righe nell'RDD creato nel passaggio 1.

  • Applicare lo schema all'RDD di righe tramite il metodo createDataFrame fornito da SQLContext.

Esempio

Consideriamo un esempio di record dei dipendenti in un file di testo denominato employee.txt. Crea uno schema utilizzando DataFrame direttamente leggendo i dati dal file di testo.

Given Data - Guarda i seguenti dati di un file denominato employee.txt posizionato nella rispettiva directory corrente in cui è in esecuzione il punto della shell spark.

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

Seguire i passaggi indicati di seguito per generare uno schema a livello di programmazione.

Apri Spark Shell

Avvia la shell Spark usando il seguente esempio.

$ spark-shell

Crea oggetto SQLContext

Genera SQLContext utilizzando il seguente comando. Qui,sc significa oggetto SparkContext.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Leggi input da file di testo

Crea un RDD DataFrame leggendo un dato dal file di testo denominato employee.txt utilizzando il seguente comando.

scala> val employee = sc.textFile("employee.txt")

Crea uno schema codificato in un formato stringa

Utilizzare il seguente comando per creare uno schema codificato in un formato stringa. Ciò significa, assumere la struttura dei campi di una tabella e passare i nomi dei campi utilizzando un delimitatore.

scala> val schemaString = "id name age"

Produzione

schemaString: String = id name age

Importa le rispettive API

Utilizzare il comando seguente per importare funzionalità di riga e tipi di dati SQL.

scala> import org.apache.spark.sql.Row;
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType};

Genera schema

Il comando seguente viene utilizzato per generare uno schema leggendo il file schemaStringvariabile. Significa che devi leggere ogni campo dividendo l'intera stringa con uno spazio come delimitatore e prendere ogni tipo di campo come tipo String, per impostazione predefinita.

scala> val schema = StructType(schemaString.split(" ").map(fieldName ⇒ StructField(fieldName, StringType, true)))

Applica trasformazione per la lettura di dati da file di testo

Utilizzare il comando seguente per convertire un RDD (dipendente) in righe. Significa che qui stiamo specificando la logica per leggere i dati RDD e archiviarli in rowRDD. Qui stiamo usando due funzioni di mappa: una è un delimitatore per dividere la stringa del record (.map(_.split(","))) e la seconda funzione di mappatura per definire una riga con il valore di indice del campo (.map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))).

scala> val rowRDD = employee.map(_.split(",")).map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))

Applica RowRDD nei dati di riga in base allo schema

Utilizzare la seguente istruzione per creare un DataFrame utilizzando rowRDD dati e schema (SCHEMA) variabile.

scala> val employeeDF = sqlContext.createDataFrame(rowRDD, schema)

Produzione

employeeDF: org.apache.spark.sql.DataFrame = [id: string, name: string, age: string]

Memorizza i dati di DataFrame nella tabella

Utilizzare il seguente comando per archiviare DataFrame in una tabella denominata employee.

scala> employeeDF.registerTempTable("employee")

Il employeeil tavolo è ora pronto. Passiamo alcune query SQL nella tabella utilizzando il metodoSQLContext.sql().

Seleziona Query su DataFrame

Utilizzare la seguente istruzione per selezionare tutti i record dal file employeetavolo. Qui usiamo la variabileallrecordsper acquisire tutti i dati dei record. Per visualizzare quei record, chiamashow() metodo su di esso.

scala> val allrecords = sqlContext.sql("SELECT * FROM employee")

Per vedere i dati del risultato di allrecords DataFrame, usa il seguente comando.

scala> allrecords.show()

Produzione

+------+--------+----+
|  id  | name   |age |
+------+--------+----+
| 1201 | satish | 25 |
| 1202 | krishna| 28 |
| 1203 | amith  | 39 |
| 1204 | javed  | 23 |
| 1205 | prudvi | 23 |
+------+--------+----+

Il metodo sqlContext.sqlconsente di costruire DataFrame quando le colonne ei loro tipi non sono noti fino al runtime. Ora puoi eseguire diverse query SQL in esso.