Apache Kafka - Integrazione con Storm

In questo capitolo impareremo come integrare Kafka con Apache Storm.

A proposito di Storm

Storm è stato originariamente creato da Nathan Marz e dal team di BackType. In breve tempo, Apache Storm è diventato uno standard per il sistema di elaborazione distribuito in tempo reale che consente di elaborare un enorme volume di dati. Storm è molto veloce e un benchmark lo ha registrato a oltre un milione di tuple elaborate al secondo per nodo. Apache Storm viene eseguito continuamente, consumando i dati dalle origini configurate (Spouts) e trasmette i dati alla pipeline di elaborazione (Bolt). Combinati, beccucci e bulloni creano una topologia.

Integrazione con Storm

Kafka e Storm si completano naturalmente a vicenda e la loro potente collaborazione consente analisi in streaming in tempo reale per big data in rapido movimento. L'integrazione di Kafka e Storm ha lo scopo di rendere più facile per gli sviluppatori inserire e pubblicare flussi di dati dalle topologie Storm.

Flusso concettuale

Un beccuccio è una fonte di flussi. Ad esempio, uno spout può leggere tuple da un argomento Kafka ed emetterle come flusso. Un bolt consuma flussi di input, elabora ed eventualmente emette nuovi flussi. Bolts può fare qualsiasi cosa, dall'esecuzione di funzioni, al filtraggio di tuple, alle aggregazioni di streaming, ai join di streaming, alla comunicazione con i database e altro ancora. Ogni nodo in una topologia Storm viene eseguito in parallelo. Una topologia viene eseguita a tempo indeterminato finché non viene terminata. Storm riassegnerà automaticamente tutte le attività non riuscite. Inoltre, Storm garantisce che non si verificherà alcuna perdita di dati, anche se le macchine si arrestano e i messaggi vengono eliminati.

Esaminiamo in dettaglio le API di integrazione di Kafka-Storm. Esistono tre classi principali per integrare Kafka con Storm. Sono i seguenti:

BrokerHosts - ZkHosts e StaticHosts

BrokerHosts è un'interfaccia e ZkHosts e StaticHosts sono le sue due implementazioni principali. ZkHosts viene utilizzato per tracciare dinamicamente i broker Kafka mantenendo i dettagli in ZooKeeper, mentre StaticHosts viene utilizzato per impostare manualmente / staticamente i broker Kafka ei relativi dettagli. ZkHosts è il modo semplice e veloce per accedere al broker Kafka.

La firma di ZkHosts è la seguente:

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

Dove brokerZkStr è l'host di ZooKeeper e brokerZkPath è il percorso di ZooKeeper per mantenere i dettagli del broker Kafka.

API KafkaConfig

Questa API viene utilizzata per definire le impostazioni di configurazione per il cluster Kafka. La firma di Kafka Con-fig è definita come segue

public KafkaConfig(BrokerHosts hosts, string topic)

    Hosts - I BrokerHost possono essere ZkHosts / StaticHosts.

    Topic - nome dell'argomento.

API SpoutConfig

Spoutconfig è un'estensione di KafkaConfig che supporta informazioni aggiuntive su ZooKeeper.

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Hosts - BrokerHosts può essere qualsiasi implementazione dell'interfaccia BrokerHosts

  • Topic - nome dell'argomento.

  • zkRoot - Percorso radice di ZooKeeper.

  • id −Il beccuccio memorizza lo stato degli offset consumati in Zookeeper. L'ID dovrebbe identificare in modo univoco il tuo beccuccio.

SchemeAsMultiScheme

SchemeAsMultiScheme è un'interfaccia che determina come il ByteBuffer consumato da Kafka viene trasformato in una tupla tempesta. È derivato da MultiScheme e accetta l'implementazione della classe Scheme. Ci sono molte implementazioni della classe Scheme e una di queste implementazioni è StringScheme, che analizza il byte come una semplice stringa. Controlla anche la denominazione del campo di output. La firma è definita come segue.

public SchemeAsMultiScheme(Scheme scheme)
  • Scheme - buffer di byte consumato da kafka.

API KafkaSpout

KafkaSpout è la nostra implementazione dello spout, che si integrerà con Storm. Recupera i messaggi dall'argomento kafka e li emette nell'ecosistema Storm come tuple. KafkaSpout ottiene i dettagli di configurazione da SpoutConfig.

Di seguito è riportato un codice di esempio per creare un semplice beccuccio Kafka.

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Creazione di bulloni

Bolt è un componente che accetta le tuple come input, elabora la tupla e produce nuove tuple come output. Bolts implementerà l'interfaccia IRichBolt. In questo programma, per eseguire le operazioni vengono utilizzate due classi di bulloni WordSplitter-Bolt e WordCounterBolt.

L'interfaccia IRichBolt ha i seguenti metodi:

  • Prepare- Fornisce al bolt un ambiente da eseguire. Gli esecutori eseguiranno questo metodo per inizializzare lo spout.

  • Execute - Elabora una singola tupla di input.

  • Cleanup - Chiamato quando un catenaccio sta per spegnersi.

  • declareOutputFields - Dichiara lo schema di output della tupla.

Creiamo SplitBolt.java, che implementa la logica per dividere una frase in parole e CountBolt.java, che implementa la logica per separare parole uniche e contare la sua occorrenza.

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Invio alla topologia

La topologia Storm è fondamentalmente una struttura Thrift. La classe TopologyBuilder fornisce metodi semplici e facili per creare topologie complesse. La classe TopologyBuilder dispone di metodi per impostare spout (setSpout) e per impostare bolt (setBolt). Infine, TopologyBuilder ha createTopology per creare to-pology. I metodi shuffleGrouping e fieldsGrouping aiutano a impostare il raggruppamento del flusso per spout e bolt.

Local Cluster- Ai fini di sviluppo, siamo in grado di creare un cluster locale utilizzando LocalCluster oggetto e quindi inviare la topologia utilizzando submitTopology metodo LocalCluster di classe.

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

Prima di spostare la compilation, l'integrazione di Kakfa-Storm necessita della libreria java del client ZooKeeper del curatore. La versione 2.9.1 del curatore supporta la versione 0.9.5 di Apache Storm (che usiamo in questo tutorial). Scarica i file jar specificati di seguito e inseriscili nel percorso di classe java.

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

Dopo aver incluso i file delle dipendenze, compilare il programma utilizzando il seguente comando,

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

Esecuzione

Avvia Kafka Producer CLI (spiegato nel capitolo precedente), crea un nuovo argomento chiamato my-first-topic e fornisci alcuni messaggi di esempio come mostrato di seguito -

hello
kafka
storm
spark
test message
another test message

Ora esegui l'applicazione utilizzando il seguente comando:

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample

L'output di esempio di questa applicazione è specificato di seguito:

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2