Spark SQL - DataFrames
Un DataFrame è una raccolta distribuita di dati, organizzata in colonne denominate. Concettualmente, è equivalente a tabelle relazionali con buone tecniche di ottimizzazione.
Un DataFrame può essere costruito da una matrice di origini diverse come tabelle Hive, file di dati strutturati, database esterni o RDD esistenti. Questa API è stata progettata per applicazioni moderne di Big Data e data science prendendo ispirazioneDataFrame in R Programming e Pandas in Python.
Caratteristiche di DataFrame
Ecco una serie di alcune caratteristiche caratteristiche di DataFrame:
Capacità di elaborare i dati nella dimensione da Kilobyte a Petabyte su un cluster a nodo singolo in cluster di grandi dimensioni.
Supporta diversi formati di dati (Avro, csv, ricerca elastica e Cassandra) e sistemi di archiviazione (HDFS, tabelle HIVE, mysql, ecc.).
Ottimizzazione dello stato dell'arte e generazione di codice tramite l'ottimizzatore Spark SQL Catalyst (framework di trasformazione dell'albero).
Può essere facilmente integrato con tutti gli strumenti e framework di Big Data tramite Spark-Core.
Fornisce API per la programmazione Python, Java, Scala e R.
SQLContext
SQLContext è una classe e viene utilizzata per inizializzare le funzionalità di Spark SQL. SparkContext classe oggetto (sc) è necessario per inizializzare l'oggetto classe SQLContext.
Il comando seguente viene utilizzato per inizializzare SparkContext tramite spark-shell.
$ spark-shell
Per impostazione predefinita, l'oggetto SparkContext viene inizializzato con il nome sc quando parte la scintilla.
Utilizzare il comando seguente per creare SQLContext.
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
Esempio
Consideriamo un esempio di record dei dipendenti in un file JSON denominato employee.json. Utilizzare i seguenti comandi per creare un DataFrame (df) e leggere un documento JSON denominatoemployee.json con il seguente contenuto.
employee.json - Posiziona questo file nella directory in cui si trova il file scala> il puntatore si trova.
{
{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}
}
Operazioni DataFrame
DataFrame fornisce un linguaggio specifico del dominio per la manipolazione dei dati strutturati. Qui, includiamo alcuni esempi di base di elaborazione di dati strutturati utilizzando DataFrame.
Seguire i passaggi indicati di seguito per eseguire le operazioni DataFrame -
Leggi il documento JSON
Innanzitutto, dobbiamo leggere il documento JSON. Sulla base di ciò, genera un DataFrame denominato (dfs).
Utilizza il seguente comando per leggere il documento JSON denominato employee.json. I dati vengono visualizzati come una tabella con i campi: id, nome ed età.
scala> val dfs = sqlContext.read.json("employee.json")
Output - I nomi dei campi vengono presi automaticamente da employee.json.
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
Mostra i dati
Se vuoi vedere i dati nel DataFrame, usa il seguente comando.
scala> dfs.show()
Output - Puoi vedere i dati dei dipendenti in formato tabulare.
<console>:22, took 0.052610 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
| 23 | 1204 | javed |
| 23 | 1205 | prudvi |
+----+------+--------+
Usa il metodo printSchema
Se vuoi vedere la struttura (schema) del DataFrame, usa il seguente comando.
scala> dfs.printSchema()
Output
root
|-- age: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
Usa metodo di selezione
Usa il seguente comando per recuperare name-colonna tra tre colonne dal DataFrame.
scala> dfs.select("name").show()
Output - Puoi vedere i valori di name colonna.
<console>:22, took 0.044023 s
+--------+
| name |
+--------+
| satish |
| krishna|
| amith |
| javed |
| prudvi |
+--------+
Usa filtro età
Utilizzare il seguente comando per trovare i dipendenti la cui età è maggiore di 23 (età> 23).
scala> dfs.filter(dfs("age") > 23).show()
Output
<console>:22, took 0.078670 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
+----+------+--------+
Usa il metodo groupBy
Utilizzare il seguente comando per contare il numero di dipendenti che hanno la stessa età.
scala> dfs.groupBy("age").count().show()
Output - due dipendenti hanno 23 anni.
<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 | 2 |
| 25 | 1 |
| 28 | 1 |
| 39 | 1 |
+----+-----+
Esecuzione di query SQL a livello di programmazione
Un SQLContext consente alle applicazioni di eseguire query SQL a livello di codice durante l'esecuzione di funzioni SQL e restituisce il risultato come DataFrame.
Generalmente, in background, SparkSQL supporta due diversi metodi per convertire RDD esistenti in DataFrame:
Sr. No | Metodi e descrizione |
---|---|
1 | Deduzione dello schema utilizzando la riflessione Questo metodo utilizza la riflessione per generare lo schema di un RDD che contiene tipi specifici di oggetti. |
2 | Specifica dello schema a livello di codice Il secondo metodo per creare DataFrame è tramite l'interfaccia programmatica che consente di costruire uno schema e quindi applicarlo a un RDD esistente. |