Apache Storm - Esempio di lavoro

Abbiamo esaminato i dettagli tecnici principali di Apache Storm e ora è il momento di codificare alcuni semplici scenari.

Scenario - Analizzatore del registro delle chiamate mobili

La chiamata mobile e la sua durata verranno fornite come input ad Apache Storm e Storm elaborerà e raggrupperà la chiamata tra lo stesso chiamante e destinatario e il loro numero totale di chiamate.

Creazione beccuccio

Spout è un componente utilizzato per la generazione di dati. Fondamentalmente, uno spout implementerà un'interfaccia IRichSpout. L'interfaccia "IRichSpout" ha i seguenti metodi importanti:

  • open- Fornisce al beccuccio un ambiente da eseguire. Gli esecutori eseguiranno questo metodo per inizializzare lo spout.

  • nextTuple - Emette i dati generati tramite il collector.

  • close - Questo metodo viene chiamato quando uno spout sta per spegnersi.

  • declareOutputFields - Dichiara lo schema di output della tupla.

  • ack - Riconosce che una tupla specifica viene elaborata

  • fail - Specifica che una tupla specifica non viene elaborata e non deve essere rielaborata.

Aperto

La firma del open metodo è il seguente:

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Fornisce la configurazione tempesta per questo beccuccio.

  • context - Fornisce informazioni complete sulla posizione del beccuccio all'interno della topologia, il suo ID attività, informazioni di input e output.

  • collector - Ci consente di emettere la tupla che verrà elaborata dai bolt.

nextTuple

La firma del nextTuple metodo è il seguente:

nextTuple()

nextTuple () viene chiamato periodicamente dallo stesso ciclo dei metodi ack () e fail (). Deve rilasciare il controllo del thread quando non c'è lavoro da fare, in modo che gli altri metodi abbiano la possibilità di essere chiamati. Quindi la prima riga di nextTuple controlla se l'elaborazione è terminata. In tal caso, dovrebbe restare inattivo per almeno un millisecondo per ridurre il carico sul processore prima di tornare.

vicino

La firma del close metodo è il seguente:

close()

declareOutputFields

La firma del declareOutputFields metodo è il seguente:

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Viene utilizzato per dichiarare gli ID del flusso di output, i campi di output, ecc.

Questo metodo viene utilizzato per specificare lo schema di output della tupla.

ack

La firma del ack metodo è il seguente:

ack(Object msgId)

Questo metodo riconosce che una tupla specifica è stata elaborata.

fallire

La firma del nextTuple metodo è il seguente:

ack(Object msgId)

Questo metodo informa che una tupla specifica non è stata completamente elaborata. Storm rielaborerà la tupla specifica.

FakeCallLogReaderSpout

Nel nostro scenario, dobbiamo raccogliere i dettagli del registro delle chiamate. Le informazioni del registro delle chiamate contengono.

  • numero del chiamante
  • numero del ricevitore
  • duration

Poiché non disponiamo di informazioni in tempo reale sui registri delle chiamate, genereremo falsi registri delle chiamate. Le informazioni false verranno create utilizzando la classe Random. Di seguito viene fornito il codice completo del programma.

Codifica - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Creazione di bulloni

Bolt è un componente che accetta le tuple come input, elabora la tupla e produce nuove tuple come output. Bolts implementeràIRichBoltinterfaccia. In questo programma, due classi di bulloniCallLogCreatorBolt e CallLogCounterBolt vengono utilizzati per eseguire le operazioni.

L'interfaccia IRichBolt ha i seguenti metodi:

  • prepare- Fornisce al bolt un ambiente da eseguire. Gli esecutori eseguiranno questo metodo per inizializzare lo spout.

  • execute - Elabora una singola tupla di input.

  • cleanup - Chiamato quando un chiavistello sta per spegnersi.

  • declareOutputFields - Dichiara lo schema di output della tupla.

Preparare

La firma del prepare metodo è il seguente:

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Fornisce la configurazione Storm per questo bolt.

  • context - Fornisce informazioni complete sulla posizione del bullone all'interno della topologia, il suo ID attività, informazioni di input e output, ecc.

  • collector - Ci consente di emettere la tupla elaborata.

eseguire

La firma del execute metodo è il seguente:

execute(Tuple tuple)

Qui tuple è la tupla di input da elaborare.

Il executemetodo elabora una singola tupla alla volta. È possibile accedere ai dati della tupla tramite il metodo getValue della classe Tuple. Non è necessario elaborare immediatamente la tupla di input. Più tupla possono essere elaborate e restituite come una singola tupla di output. La tupla elaborata può essere emessa utilizzando la classe OutputCollector.

pulire

La firma del cleanup metodo è il seguente:

cleanup()

declareOutputFields

La firma del declareOutputFields metodo è il seguente:

declareOutputFields(OutputFieldsDeclarer declarer)

Qui il parametro declarer viene utilizzato per dichiarare gli ID del flusso di output, i campi di output, ecc.

Questo metodo viene utilizzato per specificare lo schema di output della tupla

Registro chiamate Creator Bolt

Il bolt creatore del registro chiamate riceve la tupla del registro chiamate. La tupla del registro delle chiamate ha il numero del chiamante, il numero del destinatario e la durata della chiamata. Questo bullone crea semplicemente un nuovo valore combinando il numero del chiamante e il numero del ricevitore. Il formato del nuovo valore è "Numero chiamante - Numero destinatario" ed è denominato come nuovo campo, "chiamata". Di seguito viene fornito il codice completo.

Codifica - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Registro chiamate Counter Bolt

Call log counter bolt riceve la chiamata e la sua durata come tupla. Questo bullone inizializza un oggetto dizionario (mappa) nel metodo prepare. Nelexecute, controlla la tupla e crea una nuova voce nell'oggetto dizionario per ogni nuovo valore di “chiamata” nella tupla e imposta un valore 1 nell'oggetto dizionario. Per la voce già disponibile nel dizionario, incrementa semplicemente il suo valore. In termini semplici, questo bolt salva la chiamata e il suo conteggio nell'oggetto dizionario. Invece di salvare la chiamata e il suo conteggio nel dizionario, possiamo anche salvarla in un'origine dati. Il codice completo del programma è il seguente:

Codifica - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Creazione della topologia

La topologia Storm è fondamentalmente una struttura Thrift. La classe TopologyBuilder fornisce metodi semplici e facili per creare topologie complesse. La classe TopologyBuilder dispone di metodi per impostare lo spout(setSpout) e per fissare il bullone (setBolt). Infine, TopologyBuilder ha createTopology per creare la topologia. Usa il seguente frammento di codice per creare una topologia:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping e fieldsGrouping i metodi aiutano a impostare il raggruppamento del flusso per beccuccio e bulloni.

Cluster locale

A scopo di sviluppo, possiamo creare un cluster locale utilizzando l'oggetto "LocalCluster" e quindi inviare la topologia utilizzando il metodo "submitTopology" della classe "LocalCluster". Uno degli argomenti per "submitTopology" è un'istanza della classe "Config". La classe "Config" viene utilizzata per impostare le opzioni di configurazione prima di inviare la topologia. Questa opzione di configurazione verrà unita alla configurazione del cluster in fase di esecuzione e inviata a tutte le attività (spout e bolt) con il metodo prepare. Una volta inviata la topologia al cluster, attenderemo 10 secondi affinché il cluster calcoli la topologia inviata e quindi spegneremo il cluster utilizzando il metodo "shutdown" di "LocalCluster". Il codice completo del programma è il seguente:

Codifica - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Creazione ed esecuzione dell'applicazione

L'applicazione completa ha quattro codici Java. Sono -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

L'applicazione può essere creata utilizzando il seguente comando:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

L'applicazione può essere eseguita utilizzando il seguente comando:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Produzione

Una volta avviata l'applicazione, verranno visualizzati i dettagli completi sul processo di avvio del cluster, l'elaborazione di spout e bolt e, infine, il processo di arresto del cluster. In "CallLogCounterBolt", abbiamo stampato la chiamata e i dettagli del conteggio. Queste informazioni verranno visualizzate sulla console come segue:

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Linguaggi non JVM

Le topologie Storm sono implementate da interfacce Thrift che semplificano l'invio di topologie in qualsiasi lingua. Storm supporta Ruby, Python e molti altri linguaggi. Diamo un'occhiata al binding di Python.

Python Binding

Python è un linguaggio di programmazione interpretato per scopi generali, interattivo, orientato agli oggetti e di alto livello. Storm supporta Python per implementare la sua topologia. Python supporta le operazioni di emissione, ancoraggio, riconoscimento e registrazione.

Come sapete, i bulloni possono essere definiti in qualsiasi lingua. I bolt scritti in un'altra lingua vengono eseguiti come processi secondari e Storm comunica con tali processi secondari con messaggi JSON su stdin / stdout. Per prima cosa prendi un esempio di Bolt WordCount che supporta l'associazione Python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Qui la classe WordCount implementa il IRichBoltinterfaccia e in esecuzione con l'implementazione di Python specificato come argomento del super metodo "splitword.py". Ora crea un'implementazione di Python chiamata "splitword.py".

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

Questa è l'implementazione di esempio per Python che conta le parole in una determinata frase. Allo stesso modo puoi collegarti anche ad altre lingue di supporto.