Apache Kafka - Integrazione con Spark

In questo capitolo, discuteremo su come integrare Apache Kafka con Spark Streaming API.

A proposito di Spark

L'API Spark Streaming consente un'elaborazione del flusso scalabile, ad alta velocità e tolleranza agli errori di flussi di dati in tempo reale. I dati possono essere acquisiti da molte fonti come Kafka, Flume, Twitter, ecc. E possono essere elaborati utilizzando algoritmi complessi come funzioni di alto livello come map, reduce, join e window. Infine, i dati elaborati possono essere inviati a filesystem, database e dashboard live. Resilient Distributed Datasets (RDD) è una struttura dati fondamentale di Spark. È una raccolta distribuita immutabile di oggetti. Ogni set di dati in RDD è suddiviso in partizioni logiche, che possono essere calcolate su diversi nodi del cluster.

Integrazione con Spark

Kafka è una potenziale piattaforma di messaggistica e integrazione per lo streaming di Spark. Kafka funge da hub centrale per i flussi di dati in tempo reale e vengono elaborati utilizzando algoritmi complessi in Spark Streaming. Una volta elaborati i dati, Spark Streaming potrebbe pubblicare i risultati in un altro argomento Kafka o archiviarli in HDFS, database o dashboard. Il diagramma seguente illustra il flusso concettuale.

Ora esaminiamo in dettaglio le API di Kafka-Spark.

API SparkConf

Rappresenta la configurazione per un'applicazione Spark. Utilizzato per impostare vari parametri Spark come coppie chiave-valore.

La classe SparkConf ha i seguenti metodi:

  • set(string key, string value) - imposta la variabile di configurazione.

  • remove(string key) - rimuovere la chiave dalla configurazione.

  • setAppName(string name) - imposta il nome dell'applicazione per la tua applicazione.

  • get(string key) - prendi la chiave

StreamingContext API

Questo è il punto di ingresso principale per la funzionalità Spark. Un SparkContext rappresenta la connessione a un cluster Spark e può essere utilizzato per creare RDD, accumulatori e variabili di trasmissione sul cluster. La firma è definita come mostrato di seguito.

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master - URL del cluster a cui connettersi (ad esempio mesos: // host: port, spark: // host: port, local [4]).

  • appName - un nome per il tuo lavoro, da visualizzare sull'interfaccia utente web del cluster

  • batchDuration - l'intervallo di tempo in cui i dati di streaming verranno suddivisi in batch

public StreamingContext(SparkConf conf, Duration batchDuration)

Crea un StreamingContext fornendo la configurazione necessaria per un nuovo SparkContext.

  • conf - Parametri Spark

  • batchDuration - l'intervallo di tempo in cui i dati di streaming verranno suddivisi in batch

API KafkaUtils

L'API KafkaUtils viene utilizzata per connettere il cluster Kafka allo streaming Spark. Questa API ha la firma del metodo createStream significativa definita come di seguito.

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

Il metodo mostrato sopra viene utilizzato per creare un flusso di input che estrae i messaggi da Kafka Brokers.

  • ssc - Oggetto StreamingContext.

  • zkQuorum - Quorum dello Zookeeper.

  • groupId - L'ID gruppo per questo consumatore.

  • topics - restituire una mappa degli argomenti da consumare.

  • storageLevel - Livello di archiviazione da utilizzare per memorizzare gli oggetti ricevuti.

L'API KafkaUtils ha un altro metodo createDirectStream, che viene utilizzato per creare un flusso di input che estrae direttamente i messaggi da Kafka Brokers senza utilizzare alcun ricevitore. Questo flusso può garantire che ogni messaggio di Kafka sia incluso nelle trasformazioni esattamente una volta.

L'applicazione di esempio viene eseguita in Scala. Per compilare l'applicazione, scarica e installa sbt , scala build tool (simile a maven). Di seguito viene presentato il codice dell'applicazione principale.

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

Crea script

L'integrazione spark-kafka dipende dal jar di integrazione spark, spark streaming e spark Kafka. Crea un nuovo file build.sbt e specifica i dettagli dell'applicazione e la sua dipendenza. Lo sbt scaricherà il jar necessario durante la compilazione e il confezionamento dell'applicazione.

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Compilazione / Confezione

Eseguire il comando seguente per compilare e creare il pacchetto del file jar dell'applicazione. Dobbiamo inviare il file jar nella console Spark per eseguire l'applicazione.

sbt package

Invio a Spark

Avvia Kafka Producer CLI (spiegato nel capitolo precedente), crea un nuovo argomento chiamato my-first-topic e fornisci alcuni messaggi di esempio come mostrato di seguito.

Another spark test message

Eseguire il comando seguente per inviare l'applicazione a Spark Console.

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

L'output di esempio di questa applicazione è mostrato di seguito.

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..