Spark SQL - DataFrames

Un DataFrame è una raccolta distribuita di dati, organizzata in colonne denominate. Concettualmente, è equivalente a tabelle relazionali con buone tecniche di ottimizzazione.

Un DataFrame può essere costruito da una matrice di origini diverse come tabelle Hive, file di dati strutturati, database esterni o RDD esistenti. Questa API è stata progettata per applicazioni moderne di Big Data e data science prendendo ispirazioneDataFrame in R Programming e Pandas in Python.

Caratteristiche di DataFrame

Ecco una serie di alcune caratteristiche caratteristiche di DataFrame:

  • Capacità di elaborare i dati nella dimensione da Kilobyte a Petabyte su un cluster a nodo singolo in cluster di grandi dimensioni.

  • Supporta diversi formati di dati (Avro, csv, ricerca elastica e Cassandra) e sistemi di archiviazione (HDFS, tabelle HIVE, mysql, ecc.).

  • Ottimizzazione dello stato dell'arte e generazione di codice tramite l'ottimizzatore Spark SQL Catalyst (framework di trasformazione dell'albero).

  • Può essere facilmente integrato con tutti gli strumenti e framework di Big Data tramite Spark-Core.

  • Fornisce API per la programmazione Python, Java, Scala e R.

SQLContext

SQLContext è una classe e viene utilizzata per inizializzare le funzionalità di Spark SQL. SparkContext classe oggetto (sc) è necessario per inizializzare l'oggetto classe SQLContext.

Il comando seguente viene utilizzato per inizializzare SparkContext tramite spark-shell.

$ spark-shell

Per impostazione predefinita, l'oggetto SparkContext viene inizializzato con il nome sc quando parte la scintilla.

Utilizzare il comando seguente per creare SQLContext.

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

Esempio

Consideriamo un esempio di record dei dipendenti in un file JSON denominato employee.json. Utilizzare i seguenti comandi per creare un DataFrame (df) e leggere un documento JSON denominatoemployee.json con il seguente contenuto.

employee.json - Posiziona questo file nella directory in cui si trova il file scala> il puntatore si trova.

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

Operazioni DataFrame

DataFrame fornisce un linguaggio specifico del dominio per la manipolazione dei dati strutturati. Qui, includiamo alcuni esempi di base di elaborazione di dati strutturati utilizzando DataFrame.

Seguire i passaggi indicati di seguito per eseguire le operazioni DataFrame -

Leggi il documento JSON

Innanzitutto, dobbiamo leggere il documento JSON. Sulla base di ciò, genera un DataFrame denominato (dfs).

Utilizza il seguente comando per leggere il documento JSON denominato employee.json. I dati vengono visualizzati come una tabella con i campi: id, nome ed età.

scala> val dfs = sqlContext.read.json("employee.json")

Output - I nomi dei campi vengono presi automaticamente da employee.json.

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

Mostra i dati

Se vuoi vedere i dati nel DataFrame, usa il seguente comando.

scala> dfs.show()

Output - Puoi vedere i dati dei dipendenti in formato tabulare.

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

Usa il metodo printSchema

Se vuoi vedere la struttura (schema) del DataFrame, usa il seguente comando.

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Usa metodo di selezione

Usa il seguente comando per recuperare name-colonna tra tre colonne dal DataFrame.

scala> dfs.select("name").show()

Output - Puoi vedere i valori di name colonna.

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

Usa filtro età

Utilizzare il seguente comando per trovare i dipendenti la cui età è maggiore di 23 (età> 23).

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

Usa il metodo groupBy

Utilizzare il seguente comando per contare il numero di dipendenti che hanno la stessa età.

scala> dfs.groupBy("age").count().show()

Output - due dipendenti hanno 23 anni.

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

Esecuzione di query SQL a livello di programmazione

Un SQLContext consente alle applicazioni di eseguire query SQL a livello di codice durante l'esecuzione di funzioni SQL e restituisce il risultato come DataFrame.

Generalmente, in background, SparkSQL supporta due diversi metodi per convertire RDD esistenti in DataFrame:

Sr. No Metodi e descrizione
1 Deduzione dello schema utilizzando la riflessione

Questo metodo utilizza la riflessione per generare lo schema di un RDD che contiene tipi specifici di oggetti.

2 Specifica dello schema a livello di codice

Il secondo metodo per creare DataFrame è tramite l'interfaccia programmatica che consente di costruire uno schema e quindi applicarlo a un RDD esistente.