Apache Storm - Guida rapida

Cos'è Apache Storm?

Apache Storm è un sistema di elaborazione di big data distribuito in tempo reale. Storm è progettato per elaborare grandi quantità di dati in un metodo scalabile orizzontale e con tolleranza agli errori. È un framework di dati in streaming che ha la capacità di raggiungere i più alti tassi di ingestione. Sebbene Storm sia senza stato, gestisce l'ambiente distribuito e lo stato del cluster tramite Apache ZooKeeper. È semplice e puoi eseguire tutti i tipi di manipolazioni sui dati in tempo reale in parallelo.

Apache Storm continua a essere un leader nell'analisi dei dati in tempo reale. Storm è facile da configurare, utilizzare e garantisce che ogni messaggio verrà elaborato attraverso la topologia almeno una volta.

Apache Storm contro Hadoop

Fondamentalmente i framework Hadoop e Storm vengono utilizzati per analizzare i big data. Entrambi si completano a vicenda e differiscono in alcuni aspetti. Apache Storm esegue tutte le operazioni tranne la persistenza, mentre Hadoop è bravo in tutto ma è in ritardo nel calcolo in tempo reale. La tabella seguente confronta gli attributi di Storm e Hadoop.

Tempesta Hadoop
Elaborazione del flusso in tempo reale Elaborazione in lotti
Apolidi Stateful
Architettura Master / Slave con coordinamento basato su ZooKeeper. Il nodo master è chiamato comenimbus e gli schiavi lo sono supervisors. Architettura master-slave con / senza coordinamento basato su ZooKeeper. Il nodo principale èjob tracker e il nodo slave è task tracker.
Un processo di streaming Storm può accedere a decine di migliaia di messaggi al secondo sul cluster. Hadoop Distributed File System (HDFS) utilizza il framework MapReduce per elaborare enormi quantità di dati che richiedono minuti o ore.
La topologia Storm viene eseguita fino all'arresto da parte dell'utente o fino a un errore irreversibile imprevisto. I lavori MapReduce vengono eseguiti in ordine sequenziale e alla fine completati.
Both are distributed and fault-tolerant
Se nimbus / supervisore muore, il riavvio lo fa continuare da dove si era fermato, quindi nulla viene influenzato. Se JobTracker muore, tutti i lavori in esecuzione vengono persi.

Casi d'uso di Apache Storm

Apache Storm è molto famoso per l'elaborazione in tempo reale di grandi flussi di dati. Per questo motivo, la maggior parte delle aziende utilizza Storm come parte integrante del proprio sistema. Alcuni esempi notevoli sono i seguenti:

Twitter- Twitter utilizza Apache Storm per la sua gamma di "prodotti di Publisher Analytics". I "Prodotti di analisi dei publisher" elaborano ogni singolo tweet e clic nella piattaforma Twitter. Apache Storm è profondamente integrato con l'infrastruttura di Twitter.

NaviSite- NaviSite utilizza Storm per il sistema di monitoraggio / verifica del registro eventi. Tutti i registri generati nel sistema passeranno attraverso la tempesta. Storm controllerà il messaggio rispetto al set configurato di espressioni regolari e se c'è una corrispondenza, quel particolare messaggio verrà salvato nel database.

Wego- Wego è un metasearch di viaggio con sede a Singapore. I dati relativi ai viaggi provengono da molte fonti in tutto il mondo con tempi diversi. Storm aiuta Wego a cercare dati in tempo reale, risolve i problemi di concorrenza e trova la migliore corrispondenza per l'utente finale.

Vantaggi di Apache Storm

Ecco un elenco dei vantaggi offerti da Apache Storm:

  • Storm è open source, robusto e facile da usare. Potrebbe essere utilizzato sia nelle piccole aziende che nelle grandi società.

  • Storm è a tolleranza di errore, flessibile, affidabile e supporta qualsiasi linguaggio di programmazione.

  • Consente l'elaborazione del flusso in tempo reale.

  • Storm è incredibilmente veloce perché ha un enorme potere di elaborazione dei dati.

  • Storm può mantenere le prestazioni anche sotto carico crescente aggiungendo risorse in modo lineare. È altamente scalabile.

  • Storm esegue l'aggiornamento dei dati e la risposta di consegna end-to-end in pochi secondi o minuti a seconda del problema. Ha una latenza molto bassa.

  • Storm ha intelligenza operativa.

  • Storm fornisce un'elaborazione dei dati garantita anche se uno dei nodi connessi nel cluster muore o i messaggi vengono persi.

Apache Storm legge il flusso grezzo di dati in tempo reale da un'estremità e lo trasmette attraverso una sequenza di piccole unità di elaborazione e invia le informazioni elaborate / utili all'altra estremità.

Il diagramma seguente illustra il concetto di base di Apache Storm.

Diamo ora uno sguardo più da vicino ai componenti di Apache Storm -

Componenti Descrizione
Tupla Tuple è la struttura dati principale in Storm. È un elenco di elementi ordinati. Per impostazione predefinita, una tupla supporta tutti i tipi di dati. In genere, viene modellato come un insieme di valori separati da virgole e passato a un cluster Storm.
Stream Stream è una sequenza non ordinata di tuple.
Beccucci Fonte del flusso. In genere, Storm accetta dati di input da origini dati non elaborate come Twitter Streaming API, coda Apache Kafka, coda Kestrel, ecc. In caso contrario, è possibile scrivere spout per leggere i dati dalle origini dati. "ISpout" è l'interfaccia principale per l'implementazione degli spout. Alcune delle interfacce specifiche sono IRichSpout, BaseRichSpout, KafkaSpout, ecc.
Bulloni I bulloni sono unità di elaborazione logica. Gli spout passano i dati al processo di bolt e bolt e producono un nuovo flusso di output. Bolts può eseguire le operazioni di filtraggio, aggregazione, unione, interazione con origini dati e database. Bolt riceve i dati ed emette su uno o più bulloni. "IBolt" è l'interfaccia principale per l'implementazione dei bulloni. Alcune delle interfacce comuni sono IRichBolt, IBasicBolt, ecc.

Facciamo un esempio in tempo reale di "Twitter Analysis" e vediamo come può essere modellato in Apache Storm. Il diagramma seguente mostra la struttura.

L'input per l '"analisi di Twitter" proviene dall'API di streaming di Twitter. Spout leggerà i tweet degli utenti che utilizzano l'API di streaming di Twitter e l'output come un flusso di tuple. Una singola tupla dallo spout avrà un nome utente Twitter e un singolo tweet come valori separati da virgole. Quindi, questo flusso di tuple verrà inoltrato al Bolt e il Bolt dividerà il tweet in singole parole, calcolerà il conteggio delle parole e manterrà le informazioni su un'origine dati configurata. Ora possiamo ottenere facilmente il risultato interrogando l'origine dati.

Topologia

I beccucci e i bulloni sono collegati insieme e formano una topologia. La logica dell'applicazione in tempo reale è specificata nella topologia Storm. In parole semplici, una topologia è un grafo diretto in cui i vertici sono il calcolo e i bordi sono flussi di dati.

Una topologia semplice inizia con gli spout. Spout invia i dati a uno o più bulloni. Bolt rappresenta un nodo nella topologia con la logica di elaborazione più piccola e l'output di un bolt può essere emesso in un altro bolt come input.

Storm mantiene la topologia sempre in esecuzione, fino a quando non si termina la topologia. Il compito principale di Apache Storm è eseguire la topologia e eseguirà un numero qualsiasi di topologia in un dato momento.

Compiti

Ora hai un'idea di base su beccucci e bulloni. Sono la più piccola unità logica della topologia e una topologia è costruita utilizzando un singolo beccuccio e una serie di bulloni. Devono essere eseguiti correttamente in un ordine particolare affinché la topologia venga eseguita correttamente. L'esecuzione di ogni singolo beccuccio e catenaccio da parte di Storm è chiamata "Compiti". In parole semplici, un'attività è l'esecuzione di un beccuccio o di un bullone. In un dato momento, ogni beccuccio e bullone possono avere più istanze in esecuzione in più thread separati.

Lavoratori

Una topologia viene eseguita in modo distribuito, su più nodi di lavoro. Storm distribuisce le attività in modo uniforme su tutti i nodi di lavoro. Il ruolo del nodo di lavoro è ascoltare i lavori e avviare o arrestare i processi ogni volta che arriva un nuovo lavoro.

Raggruppamento di flussi

Il flusso di dati scorre dagli spout ai bulloni o da un bullone all'altro. Il raggruppamento dei flussi controlla il modo in cui le tuple vengono instradate nella topologia e ci aiuta a comprendere il flusso delle tuple nella topologia. Ci sono quattro raggruppamenti incorporati come spiegato di seguito.

Raggruppamento casuale

Nel raggruppamento casuale, un numero uguale di tuple viene distribuito casualmente tra tutti i lavoratori che eseguono i bulloni. Il diagramma seguente mostra la struttura.

Raggruppamento di campi

I campi con gli stessi valori nelle tuple vengono raggruppati e le restanti tuple vengono mantenute all'esterno. Quindi, le tuple con gli stessi valori di campo vengono inviate allo stesso worker che esegue i bolt. Ad esempio, se lo stream è raggruppato in base al campo "parola", le tuple con la stessa stringa "Hello" si sposteranno nello stesso worker. Il diagramma seguente mostra come funziona il raggruppamento dei campi.

Raggruppamento globale

Tutti i flussi possono essere raggruppati e inoltrati a un bullone. Questo raggruppamento invia le tuple generate da tutte le istanze dell'origine a una singola istanza di destinazione (in particolare, scegli il worker con l'ID più basso).

Tutti i raggruppamenti

All Grouping invia una singola copia di ogni tupla a tutte le istanze del bolt ricevente. Questo tipo di raggruppamento viene utilizzato per inviare segnali ai bulloni. Tutto il raggruppamento è utile per le operazioni di unione.

Uno dei punti salienti di Apache Storm è che è un'applicazione a tolleranza d'errore, veloce e senza applicazione distribuita "Single Point of Failure" (SPOF). Possiamo installare Apache Storm in tutti i sistemi necessari per aumentare la capacità dell'applicazione.

Diamo un'occhiata a come è progettato il cluster Apache Storm e alla sua architettura interna. Il diagramma seguente mostra il design del cluster.

Apache Storm ha due tipi di nodi, Nimbus (nodo master) e Supervisor(nodo di lavoro). Nimbus è il componente centrale di Apache Storm. Il compito principale di Nimbus è eseguire la topologia Storm. Nimbus analizza la topologia e raccoglie l'attività da eseguire. Quindi, distribuirà l'attività a un supervisore disponibile.

Un supervisore avrà uno o più processi di lavoro. Il supervisore delegherà le attività ai processi di lavoro. Il processo di lavoro genererà tutti gli esecutori necessari ed eseguirà l'attività. Apache Storm utilizza un sistema di messaggistica distribuito interno per la comunicazione tra nimbus e supervisori.

Componenti Descrizione
Nimbus Nimbus è un nodo master del cluster Storm. Tutti gli altri nodi nel cluster vengono chiamati comeworker nodes. Il nodo principale è responsabile della distribuzione dei dati tra tutti i nodi di lavoro, dell'assegnazione di attività ai nodi di lavoro e del monitoraggio degli errori.
Supervisore I nodi che seguono le istruzioni fornite dal nimbus sono chiamati Supervisori. UNsupervisor ha più processi di lavoro e governa i processi di lavoro per completare le attività assegnate dal nimbus.
Processo di lavoro Un processo di lavoro eseguirà attività correlate a una topologia specifica. Un processo di lavoro non eseguirà un'attività da solo, ma creaexecutorse chiede loro di eseguire un compito particolare. Un processo di lavoro avrà più esecutori.
Esecutore Un esecutore non è altro che un singolo thread generato da un processo di lavoro. Un esecutore esegue una o più attività ma solo per uno specifico beccuccio o catenaccio.
Compito Un'attività esegue l'effettiva elaborazione dei dati. Quindi, o è un beccuccio o un bullone.
Framework ZooKeeper

Apache ZooKeeper è un servizio utilizzato da un cluster (gruppo di nodi) per coordinarsi tra loro e mantenere i dati condivisi con solide tecniche di sincronizzazione. Nimbus è senza stato, quindi dipende da ZooKeeper per monitorare lo stato del nodo di lavoro.

ZooKeeper aiuta il supervisore a interagire con il nimbus. È responsabile del mantenimento dello stato di nimbus e supervisore.

Storm è di natura apolide. Anche se la natura apolide presenta i suoi svantaggi, in realtà aiuta Storm a elaborare i dati in tempo reale nel modo migliore e più rapido possibile.

Storm non è però del tutto apolide. Memorizza il suo stato in Apache ZooKeeper. Poiché lo stato è disponibile in Apache ZooKeeper, un nimbus guasto può essere riavviato e fatto funzionare da dove era partito. Di solito, strumenti di monitoraggio del servizio comemonit monitorerà Nimbus e lo riavvierà in caso di guasto.

Apache Storm ha anche una topologia avanzata chiamata Trident Topologycon manutenzione dello stato e fornisce anche un'API di alto livello come Pig. Discuteremo tutte queste caratteristiche nei prossimi capitoli.

Un cluster Storm funzionante dovrebbe avere un nimbus e uno o più supervisori. Un altro nodo importante è Apache ZooKeeper, che verrà utilizzato per il coordinamento tra il nimbus e i supervisori.

Diamo ora uno sguardo da vicino al flusso di lavoro di Apache Storm -

  • Inizialmente, il nimbus attenderà che la "Topologia tempesta" gli venga sottoposta.

  • Una volta inviata una topologia, elaborerà la topologia e raccoglierà tutte le attività che devono essere eseguite e l'ordine in cui l'attività deve essere eseguita.

  • Quindi, il nimbus distribuirà uniformemente i compiti a tutti i supervisori disponibili.

  • In un determinato intervallo di tempo, tutti i supervisori invieranno i battiti cardiaci all'aureola per informare che sono ancora vivi.

  • Quando un supervisore muore e non invia un battito cardiaco al nimbus, il nimbus assegna i compiti a un altro supervisore.

  • Quando il nimbo stesso muore, i supervisori lavoreranno sul compito già assegnato senza alcun problema.

  • Una volta completate tutte le attività, il supervisore attenderà l'arrivo di una nuova attività.

  • Nel frattempo, il nimbus morto verrà riavviato automaticamente dagli strumenti di monitoraggio del servizio.

  • Il nimbus riavviato continuerà da dove si era fermato. Allo stesso modo, anche il supervisore morto può essere riavviato automaticamente. Poiché sia ​​il nimbus che il supervisore possono essere riavviati automaticamente ed entrambi continueranno come prima, è garantito che Storm elabori tutte le attività almeno una volta.

  • Una volta che tutte le topologie sono state elaborate, il nimbus attende l'arrivo di una nuova topologia e allo stesso modo il supervisore attende nuove attività.

Per impostazione predefinita, ci sono due modalità in un cluster Storm:

  • Local mode- Questa modalità viene utilizzata per lo sviluppo, il test e il debug perché è il modo più semplice per vedere tutti i componenti della topologia che lavorano insieme. In questa modalità, possiamo regolare i parametri che ci consentono di vedere come la nostra topologia viene eseguita in diversi ambienti di configurazione Storm. In modalità locale, le topologie Storm vengono eseguite sulla macchina locale in una singola JVM.

  • Production mode- In questa modalità, sottoponiamo la nostra topologia al cluster working storm, che è composto da molti processi, solitamente in esecuzione su macchine diverse. Come discusso nel flusso di lavoro di storm, un cluster funzionante verrà eseguito a tempo indeterminato fino a quando non verrà arrestato.

Apache Storm elabora i dati in tempo reale e l'input proviene normalmente da un sistema di accodamento dei messaggi. Un sistema di messaggistica distribuito esterno fornirà l'input necessario per il calcolo in tempo reale. Spout leggerà i dati dal sistema di messaggistica e li convertirà in tuple e immetterli in Apache Storm. Il fatto interessante è che Apache Storm utilizza internamente il proprio sistema di messaggistica distribuito per la comunicazione tra il suo nimbus e il supervisore.

Cos'è il sistema di messaggistica distribuita?

La messaggistica distribuita si basa sul concetto di accodamento dei messaggi affidabile. I messaggi vengono accodati in modo asincrono tra le applicazioni client e i sistemi di messaggistica. Un sistema di messaggistica distribuito offre i vantaggi di affidabilità, scalabilità e persistenza.

La maggior parte degli schemi di messaggistica segue il publish-subscribe modello (semplicemente Pub-Sub) dove vengono chiamati i mittenti dei messaggi publishers e vengono chiamati coloro che vogliono ricevere i messaggi subscribers.

Una volta che il messaggio è stato pubblicato dal mittente, gli abbonati possono ricevere il messaggio selezionato con l'aiuto di un'opzione di filtro. Di solito abbiamo due tipi di filtri, uno ètopic-based filtering e un altro lo è content-based filtering.

Notare che il modello pub-sub può comunicare solo tramite messaggi. È un'architettura molto vagamente accoppiata; anche i mittenti non sanno chi sono i loro iscritti. Molti modelli di messaggi consentono al broker di messaggi di scambiare messaggi di pubblicazione per un accesso tempestivo da parte di molti abbonati. Un esempio di vita reale è Dish TV, che pubblica diversi canali come sport, film, musica, ecc. E chiunque può iscriversi al proprio set di canali e ottenerli ogni volta che i canali a cui si è iscritti sono disponibili.

La tabella seguente descrive alcuni dei più diffusi sistemi di messaggistica ad alto rendimento:

Sistema di messaggistica distribuito Descrizione
Apache Kafka Kafka è stato sviluppato presso LinkedIn Corporation e in seguito è diventato un sottoprogetto di Apache. Apache Kafka si basa su un modello di pubblicazione-sottoscrizione distribuito, persistente e abilitato al broker. Kafka è veloce, scalabile e altamente efficiente.
RabbitMQ RabbitMQ è una robusta applicazione di messaggistica distribuita open source. È facile da usare e funziona su tutte le piattaforme.
JMS (Java Message Service) JMS è un'API open source che supporta la creazione, la lettura e l'invio di messaggi da un'applicazione all'altra. Fornisce la consegna dei messaggi garantita e segue il modello di pubblicazione-sottoscrizione.
ActiveMQ Il sistema di messaggistica ActiveMQ è un'API open source di JMS.
ZeroMQ ZeroMQ è l'elaborazione di messaggi peer-peer senza broker. Fornisce modelli di messaggi push-pull, router-dealer.
Gheppio Kestrel è una coda di messaggi distribuita veloce, affidabile e semplice.

Protocollo parsimonioso

Thrift è stato creato su Facebook per lo sviluppo di servizi cross-language e RPC (Remote Procedure Call). Successivamente, è diventato un progetto Apache open source. Apache Thrift è un fileInterface Definition Language e consente di definire nuovi tipi di dati e implementazione di servizi in aggiunta ai tipi di dati definiti in modo semplice.

Apache Thrift è anche un framework di comunicazione che supporta sistemi embedded, applicazioni mobili, applicazioni web e molti altri linguaggi di programmazione. Alcune delle caratteristiche chiave associate ad Apache Thrift sono la sua modularità, flessibilità e prestazioni elevate. Inoltre, può eseguire streaming, messaggistica e RPC in applicazioni distribuite.

Storm utilizza ampiamente il protocollo Thrift per la comunicazione interna e la definizione dei dati. La topologia Storm è sempliceThrift Structs. Storm Nimbus che esegue la topologia in Apache Storm è un fileThrift service.

Vediamo ora come installare il framework Apache Storm sulla tua macchina. Ci sono tre passaggi majo qui -

  • Installa Java sul tuo sistema, se non lo hai già.
  • Installa il framework ZooKeeper.
  • Installa il framework Apache Storm.

Passaggio 1: verifica dell'installazione di Java

Utilizzare il seguente comando per verificare se Java è già installato sul sistema.

$ java -version

Se Java è già presente, vedrai il suo numero di versione. Altrimenti, scarica l'ultima versione di JDK.

Passaggio 1.1: scarica JDK

Scarica l'ultima versione di JDK utilizzando il seguente collegamento - www.oracle.com

L'ultima versione è JDK 8u 60 e il file è “jdk-8u60-linux-x64.tar.gz”. Scarica il file sulla tua macchina.

Passaggio 1.2: estrai i file

Generalmente i file vengono scaricati nel file downloadscartella. Estrai il setup di tar usando i seguenti comandi.

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

Passaggio 1.3: passare alla directory opt

Per rendere Java disponibile a tutti gli utenti, spostare il contenuto java estratto nella cartella "/ usr / local / java".

$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/

Passaggio 1.4 - Imposta percorso

Per impostare il percorso e le variabili JAVA_HOME, aggiungi i seguenti comandi al file ~ / .bashrc.

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

Ora applica tutte le modifiche al sistema in esecuzione corrente.

$ source ~/.bashrc

Passaggio 1.5 - Alternative Java

Utilizzare il seguente comando per modificare le alternative Java.

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Passaggio 1.6

Ora verifica l'installazione di Java utilizzando il comando di verifica (java -version) spiegato nel passaggio 1.

Passaggio 2: installazione di ZooKeeper Framework

Passaggio 2.1 - Scarica ZooKeeper

Per installare il framework ZooKeeper sul tuo computer, visita il seguente link e scarica l'ultima versione di ZooKeeper http://zookeeper.apache.org/releases.html

Al momento, l'ultima versione di ZooKeeper è la 3.4.6 (ZooKeeper-3.4.6.tar.gz).

Passaggio 2.2: estrai il file tar

Estrai il file tar utilizzando i seguenti comandi:

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

Passaggio 2.3: creare il file di configurazione

Aprire il file di configurazione denominato "conf / zoo.cfg" utilizzando il comando "vi conf / zoo.cfg" e impostando tutti i seguenti parametri come punto di partenza.

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

Una volta che il file di configurazione è stato salvato con successo, puoi avviare il server ZooKeeper.

Passaggio 2.4 - Avvia il server ZooKeeper

Usa il seguente comando per avviare il server ZooKeeper.

$ bin/zkServer.sh start

Dopo aver eseguito questo comando, riceverai una risposta come segue:

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

Passaggio 2.5: avviare la CLI

Utilizzare il seguente comando per avviare la CLI.

$ bin/zkCli.sh

Dopo aver eseguito il comando precedente, sarai connesso al server ZooKeeper e riceverai la seguente risposta.

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

Passaggio 2.6 - Arresta il server ZooKeeper

Dopo aver connesso il server ed eseguito tutte le operazioni, puoi arrestare il server ZooKeeper utilizzando il seguente comando.

bin/zkServer.sh stop

Hai installato correttamente Java e ZooKeeper sulla tua macchina. Vediamo ora i passaggi per installare il framework Apache Storm.

Passaggio 3: installazione di Apache Storm Framework

Passaggio 3.1 Scarica Storm

Per installare Storm Framework sulla tua macchina, visita il seguente link e scarica l'ultima versione di Storm http://storm.apache.org/downloads.html

Al momento, l'ultima versione di Storm è "apache-storm-0.9.5.tar.gz".

Passaggio 3.2: estrai il file tar

Estrai il file tar utilizzando i seguenti comandi:

$ cd opt/
$ tar -zxf apache-storm-0.9.5.tar.gz
$ cd apache-storm-0.9.5
$ mkdir data

Passaggio 3.3 - Apri il file di configurazione

La versione corrente di Storm contiene un file in "conf / storm.yaml" che configura i daemon di Storm. Aggiungi le seguenti informazioni a quel file.

$ vi conf/storm.yaml
storm.zookeeper.servers:
 - "localhost"
storm.local.dir: “/path/to/storm/data(any path)”
nimbus.host: "localhost"
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703

Dopo aver applicato tutte le modifiche, salva e torna al terminale.

Passaggio 3.4: avviare Nimbus

$ bin/storm nimbus

Passaggio 3.5: avviare il supervisore

$ bin/storm supervisor

Passaggio 3.6 Avvia l'interfaccia utente

$ bin/storm ui

Dopo aver avviato l'applicazione dell'interfaccia utente di Storm, digitare l'URL http://localhost:8080nel tuo browser preferito e potresti vedere le informazioni sul cluster Storm e la sua topologia in esecuzione. La pagina dovrebbe essere simile allo screenshot seguente.

Abbiamo esaminato i dettagli tecnici principali di Apache Storm e ora è il momento di codificare alcuni semplici scenari.

Scenario - Analizzatore del registro delle chiamate mobili

La chiamata mobile e la sua durata verranno fornite come input ad Apache Storm e Storm elaborerà e raggrupperà la chiamata tra lo stesso chiamante e destinatario e il numero totale di chiamate.

Creazione beccuccio

Spout è un componente utilizzato per la generazione di dati. Fondamentalmente, uno spout implementerà un'interfaccia IRichSpout. L'interfaccia "IRichSpout" ha i seguenti metodi importanti:

  • open- Fornisce al beccuccio un ambiente da eseguire. Gli esecutori eseguiranno questo metodo per inizializzare lo spout.

  • nextTuple - Emette i dati generati tramite il collector.

  • close - Questo metodo viene chiamato quando uno spout sta per spegnersi.

  • declareOutputFields - Dichiara lo schema di output della tupla.

  • ack - Riconosce che una tupla specifica viene elaborata

  • fail - Specifica che una tupla specifica non viene elaborata e non deve essere rielaborata.

Aperto

La firma del open il metodo è il seguente:

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Fornisce la configurazione tempesta per questo beccuccio.

  • context - Fornisce informazioni complete sulla posizione del beccuccio all'interno della topologia, il suo ID attività, informazioni di input e output.

  • collector - Ci consente di emettere la tupla che verrà elaborata dai bolt.

nextTuple

La firma del nextTuple il metodo è il seguente:

nextTuple()

nextTuple () viene chiamato periodicamente dallo stesso ciclo dei metodi ack () e fail (). Deve rilasciare il controllo del thread quando non c'è lavoro da fare, in modo che gli altri metodi abbiano la possibilità di essere chiamati. Quindi la prima riga di nextTuple controlla se l'elaborazione è terminata. In tal caso, dovrebbe restare inattivo per almeno un millisecondo per ridurre il carico sul processore prima di tornare.

vicino

La firma del close il metodo è il seguente:

close()

declareOutputFields

La firma del declareOutputFields il metodo è il seguente:

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Viene utilizzato per dichiarare gli ID del flusso di output, i campi di output, ecc.

Questo metodo viene utilizzato per specificare lo schema di output della tupla.

ack

La firma del ack il metodo è il seguente:

ack(Object msgId)

Questo metodo riconosce che una tupla specifica è stata elaborata.

fallire

La firma del nextTuple il metodo è il seguente:

ack(Object msgId)

Questo metodo informa che una tupla specifica non è stata completamente elaborata. Storm rielaborerà la tupla specifica.

FakeCallLogReaderSpout

Nel nostro scenario, dobbiamo raccogliere i dettagli del registro delle chiamate. Le informazioni del registro delle chiamate contengono.

  • numero del chiamante
  • numero del ricevitore
  • duration

Poiché non disponiamo di informazioni in tempo reale sui registri delle chiamate, genereremo falsi registri delle chiamate. Le informazioni false verranno create utilizzando la classe Random. Di seguito viene fornito il codice completo del programma.

Codifica - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Creazione di bulloni

Bolt è un componente che accetta le tuple come input, elabora la tupla e produce nuove tuple come output. Bolts implementeràIRichBoltinterfaccia. In questo programma, due classi di bulloniCallLogCreatorBolt e CallLogCounterBolt vengono utilizzati per eseguire le operazioni.

L'interfaccia IRichBolt ha i seguenti metodi:

  • prepare- Fornisce al bolt un ambiente da eseguire. Gli esecutori eseguiranno questo metodo per inizializzare lo spout.

  • execute - Elabora una singola tupla di input.

  • cleanup - Chiamato quando un chiavistello sta per spegnersi.

  • declareOutputFields - Dichiara lo schema di output della tupla.

Preparare

La firma del prepare il metodo è il seguente:

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Fornisce la configurazione Storm per questo bolt.

  • context - Fornisce informazioni complete sulla posizione del bullone all'interno della topologia, il suo ID attività, informazioni di input e output, ecc.

  • collector - Ci consente di emettere la tupla elaborata.

eseguire

La firma del execute il metodo è il seguente:

execute(Tuple tuple)

Qui tuple è la tupla di input da elaborare.

Il executemetodo elabora una singola tupla alla volta. È possibile accedere ai dati della tupla tramite il metodo getValue della classe Tuple. Non è necessario elaborare immediatamente la tupla di input. Più tupla possono essere elaborate e restituite come una singola tupla di output. La tupla elaborata può essere emessa utilizzando la classe OutputCollector.

pulire

La firma del cleanup il metodo è il seguente:

cleanup()

declareOutputFields

La firma del declareOutputFields il metodo è il seguente:

declareOutputFields(OutputFieldsDeclarer declarer)

Qui il parametro declarer viene utilizzato per dichiarare gli ID del flusso di output, i campi di output, ecc.

Questo metodo viene utilizzato per specificare lo schema di output della tupla

Registro chiamate Creator Bolt

Il bolt creatore del registro chiamate riceve la tupla del registro chiamate. La tupla del registro delle chiamate ha il numero del chiamante, il numero del destinatario e la durata della chiamata. Questo bullone crea semplicemente un nuovo valore combinando il numero del chiamante e il numero del ricevitore. Il formato del nuovo valore è "Numero chiamante - Numero destinatario" ed è denominato come nuovo campo, "chiamata". Di seguito viene fornito il codice completo.

Codifica - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Registro chiamate Counter Bolt

Call log counter bolt riceve la chiamata e la sua durata come tupla. Questo bullone inizializza un oggetto dizionario (mappa) nel metodo prepare. Inexecutemetodo, controlla la tupla e crea una nuova voce nell'oggetto dizionario per ogni nuovo valore di “chiamata” nella tupla e imposta un valore 1 nell'oggetto dizionario. Per la voce già disponibile nel dizionario, incrementa semplicemente il suo valore. In termini semplici, questo bolt salva la chiamata e il suo conteggio nell'oggetto dizionario. Invece di salvare la chiamata e il suo conteggio nel dizionario, possiamo anche salvarla in un'origine dati. Il codice completo del programma è il seguente:

Codifica - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Creazione della topologia

La topologia Storm è fondamentalmente una struttura Thrift. La classe TopologyBuilder fornisce metodi semplici e facili per creare topologie complesse. La classe TopologyBuilder dispone di metodi per impostare lo spout(setSpout) e per fissare il bullone (setBolt). Infine, TopologyBuilder ha createTopology per creare la topologia. Usa il seguente frammento di codice per creare una topologia:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping e fieldsGrouping i metodi aiutano a impostare il raggruppamento del flusso per beccuccio e bulloni.

Cluster locale

A scopo di sviluppo, possiamo creare un cluster locale utilizzando l'oggetto "LocalCluster" e quindi inviare la topologia utilizzando il metodo "submitTopology" della classe "LocalCluster". Uno degli argomenti per "submitTopology" è un'istanza della classe "Config". La classe "Config" viene utilizzata per impostare le opzioni di configurazione prima di inviare la topologia. Questa opzione di configurazione verrà unita alla configurazione del cluster in fase di esecuzione e inviata a tutte le attività (spout e bolt) con il metodo prepare. Una volta inviata la topologia al cluster, attenderemo 10 secondi affinché il cluster calcoli la topologia inviata e quindi spegneremo il cluster utilizzando il metodo "shutdown" di "LocalCluster". Il codice completo del programma è il seguente:

Codifica - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Creazione ed esecuzione dell'applicazione

L'applicazione completa ha quattro codici Java. Sono -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

L'applicazione può essere creata utilizzando il seguente comando:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

L'applicazione può essere eseguita utilizzando il seguente comando:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Produzione

Una volta avviata l'applicazione, verranno visualizzati i dettagli completi sul processo di avvio del cluster, l'elaborazione di spout e bolt e, infine, il processo di arresto del cluster. In "CallLogCounterBolt", abbiamo stampato la chiamata e i dettagli del conteggio. Queste informazioni verranno visualizzate sulla console come segue:

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Linguaggi non JVM

Le topologie Storm sono implementate da interfacce Thrift che semplificano l'invio di topologie in qualsiasi lingua. Storm supporta Ruby, Python e molti altri linguaggi. Diamo un'occhiata al binding di Python.

Python Binding

Python è un linguaggio di programmazione interpretato per scopi generali, interattivo, orientato agli oggetti e di alto livello. Storm supporta Python per implementare la sua topologia. Python supporta le operazioni di emissione, ancoraggio, riconoscimento e registrazione.

Come sapete, i bulloni possono essere definiti in qualsiasi lingua. I bulloni scritti in un'altra lingua vengono eseguiti come processi secondari e Storm comunica con tali processi secondari con messaggi JSON su stdin / stdout. Per prima cosa prendi un esempio di Bolt WordCount che supporta l'associazione Python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Qui la classe WordCount implementa il IRichBoltinterfaccia e in esecuzione con l'implementazione di Python specificato come argomento del super metodo "splitword.py". Ora crea un'implementazione di Python chiamata "splitword.py".

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

Questa è l'implementazione di esempio per Python che conta le parole in una determinata frase. Allo stesso modo puoi collegarti anche ad altre lingue di supporto.

Trident è un'estensione di Storm. Come Storm, anche Trident è stato sviluppato da Twitter. Il motivo principale alla base dello sviluppo di Trident è fornire un'astrazione di alto livello su Storm insieme all'elaborazione del flusso con stato e alle query distribuite a bassa latenza.

Trident utilizza beccuccio e bullone, ma questi componenti di basso livello vengono generati automaticamente da Trident prima dell'esecuzione. Trident ha funzioni, filtri, join, raggruppamenti e aggregazioni.

Trident elabora i flussi come una serie di batch che vengono indicati come transazioni. Generalmente la dimensione di quei piccoli batch sarà dell'ordine di migliaia o milioni di tuple, a seconda del flusso di input. In questo modo, Trident è diverso da Storm, che esegue l'elaborazione tupla per tupla.

Il concetto di elaborazione batch è molto simile alle transazioni di database. Ad ogni transazione viene assegnato un ID transazione. La transazione è considerata riuscita, una volta completata tutta la sua elaborazione. Tuttavia, un errore nell'elaborazione di una delle tuple della transazione causerà la ritrasmissione dell'intera transazione. Per ogni batch, Trident chiamerà beginCommit all'inizio della transazione e si impegnerà alla fine di essa.

Topologia tridente

Trident API espone una semplice opzione per creare topologia Trident utilizzando la classe "TridentTopology". Fondamentalmente, la topologia Trident riceve il flusso di input dallo spout ed esegue una sequenza di operazioni ordinata (filtro, aggregazione, raggruppamento, ecc.) Sul flusso. Storm Tuple è sostituito da Trident Tuple e Bolts sono sostituiti da operazioni. Una semplice topologia Trident può essere creata come segue:

TridentTopology topology = new TridentTopology();

Tuple Trident

La tupla trident è un elenco di valori denominato. L'interfaccia TridentTuple è il modello di dati di una topologia Trident. L'interfaccia TridentTuple è l'unità di base dei dati che può essere elaborata da una topologia Trident.

Trident Beccuccio

Il beccuccio Trident è simile al beccuccio Storm, con opzioni aggiuntive per utilizzare le funzionalità di Trident. In realtà, possiamo ancora utilizzare IRichSpout, che abbiamo utilizzato nella topologia Storm, ma sarà di natura non transazionale e non saremo in grado di utilizzare i vantaggi forniti da Trident.

Il beccuccio di base con tutte le funzionalità per utilizzare le caratteristiche di Trident è "ITridentSpout". Supporta semantica transazionale e opaca. Gli altri beccucci sono IBatchSpout, IPartitionedTridentSpout e IOpaquePartitionedTridentSpout.

Oltre a questi beccucci generici, Trident ha molti esempi di implementazione del beccuccio trident. Uno di questi è FeederBatchSpout beccuccio, che possiamo utilizzare per inviare facilmente un elenco denominato di tuple trident senza preoccuparci dell'elaborazione batch, del parallelismo, ecc.

La creazione di FeederBatchSpout e l'alimentazione dei dati possono essere eseguite come mostrato di seguito:

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident Operations

Trident si basa sull '"operazione Trident" per elaborare il flusso di input delle tuple trident. L'API Trident ha una serie di operazioni integrate per gestire l'elaborazione del flusso da semplice a complessa. Queste operazioni vanno dalla semplice convalida al raggruppamento e aggregazione complessi di tuple tridenti. Esaminiamo le operazioni più importanti e utilizzate di frequente.

Filtro

Il filtro è un oggetto utilizzato per eseguire l'attività di convalida dell'input. Un filtro Trident ottiene un sottoinsieme di campi di tupla trident come input e restituisce true o false a seconda che determinate condizioni siano soddisfatte o meno. Se viene restituito true, la tupla viene mantenuta nel flusso di output; in caso contrario, la tupla viene rimossa dal flusso. Il filtro erediterà fondamentalmente daBaseFilter class e implementare il isKeepmetodo. Ecco un'implementazione di esempio dell'operazione di filtro:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

La funzione di filtro può essere chiamata nella topologia utilizzando il metodo "each". La classe "Fields" può essere utilizzata per specificare l'input (sottoinsieme della tupla tridente). Il codice di esempio è il seguente:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Funzione

Functionè un oggetto utilizzato per eseguire una semplice operazione su una singola tupla tridente. Prende un sottoinsieme di campi di tupla trident ed emette zero o più nuovi campi di tupla trident.

Function fondamentalmente eredita da BaseFunction class e implementa il executemetodo. Di seguito viene fornita un'implementazione di esempio:

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Proprio come l'operazione Filter, l'operazione Function può essere chiamata in una topologia utilizzando il eachmetodo. Il codice di esempio è il seguente:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Aggregazione

L'aggregazione è un oggetto utilizzato per eseguire operazioni di aggregazione su un batch di input, una partizione o un flusso. Trident ha tre tipi di aggregazione. Sono i seguenti:

  • aggregate- Aggrega ogni batch di tupla trident in isolamento. Durante il processo di aggregazione, le tuple vengono inizialmente ripartizionate utilizzando il raggruppamento globale per combinare tutte le partizioni dello stesso batch in una singola partizione.

  • partitionAggregate- Aggrega ogni partizione invece dell'intero batch di tupla tridente. L'output dell'aggregazione di partizioni sostituisce completamente la tupla di input. L'output dell'aggregato della partizione contiene una singola tupla di campo.

  • persistentaggregate - Aggrega tutte le tupla trident in tutto il batch e archivia il risultato in memoria o database.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

L'operazione di aggregazione può essere creata utilizzando CombinerAggregator, ReducerAggregator o l'interfaccia generica dell'aggregatore. L'aggregatore "count" utilizzato nell'esempio precedente è uno degli aggregatori incorporati. È implementato utilizzando "CombinerAggregator". L'implementazione è la seguente:

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Raggruppamento

L'operazione di raggruppamento è un'operazione incorporata e può essere chiamata da groupBymetodo. Il metodo groupBy ripartiziona il flusso eseguendo una partitionBy sui campi specificati, quindi all'interno di ciascuna partizione raggruppa le tuple i cui campi di gruppo sono uguali. Normalmente, usiamo "groupBy" insieme a "persistentAggregate" per ottenere l'aggregazione raggruppata. Il codice di esempio è il seguente:

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Fusione e unione

La fusione e l'unione possono essere eseguite utilizzando rispettivamente il metodo "merge" e "join". L'unione combina uno o più flussi. L'unione è simile all'unione, tranne per il fatto che l'unione utilizza il campo della tupla tridente da entrambi i lati per controllare e unire due flussi. Inoltre, l'unione funzionerà solo a livello di batch. Il codice di esempio è il seguente:

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Stato di manutenzione

Trident fornisce un meccanismo per la manutenzione dello stato. Le informazioni sullo stato possono essere memorizzate nella topologia stessa, altrimenti è possibile memorizzarle anche in un database separato. Il motivo è mantenere uno stato che se una tupla fallisce durante l'elaborazione, la tupla fallita viene ritentata. Ciò crea un problema durante l'aggiornamento dello stato perché non si è sicuri se lo stato di questa tupla sia stato aggiornato in precedenza o meno. Se la tupla non è riuscita prima dell'aggiornamento dello stato, riprovare la tupla renderà lo stato stabile. Tuttavia, se la tupla non è riuscita dopo l'aggiornamento dello stato, riprovare la stessa tupla aumenterà di nuovo il conteggio nel database e renderà lo stato instabile. È necessario eseguire i seguenti passaggi per garantire che un messaggio venga elaborato una sola volta:

  • Elaborare le tuple in piccoli lotti.

  • Assegna un ID univoco a ogni batch. Se il batch viene ritentato, viene assegnato lo stesso ID univoco.

  • Gli aggiornamenti di stato vengono ordinati tra batch. Ad esempio, l'aggiornamento dello stato del secondo batch non sarà possibile fino al completamento dell'aggiornamento dello stato del primo batch.

RPC distribuito

RPC distribuito viene utilizzato per interrogare e recuperare il risultato dalla topologia Trident. Storm ha un server RPC distribuito integrato. Il server RPC distribuito riceve la richiesta RPC dal client e la passa alla topologia. La topologia elabora la richiesta e invia il risultato al server RPC distribuito, che viene reindirizzato dal server RPC distribuito al client. La query RPC distribuita di Trident viene eseguita come una normale query RPC, tranne per il fatto che queste query vengono eseguite in parallelo.

Quando usare Trident?

Come in molti casi d'uso, se il requisito è elaborare una query una sola volta, possiamo ottenerlo scrivendo una topologia in Trident. D'altra parte, sarà difficile ottenere una volta esattamente l'elaborazione nel caso di Storm. Quindi Trident sarà utile per quei casi d'uso in cui è necessaria una sola elaborazione. Trident non è adatto a tutti i casi d'uso, in particolare i casi d'uso ad alte prestazioni perché aggiunge complessità a Storm e gestisce lo stato.

Esempio di lavoro di Trident

Convertiremo la nostra applicazione di analisi del registro delle chiamate elaborata nella sezione precedente al framework Trident. L'applicazione Trident sarà relativamente semplice rispetto alla normale tempesta, grazie alla sua API di alto livello. A Storm sarà fondamentalmente richiesto di eseguire una qualsiasi delle operazioni di funzione, filtro, aggregazione, raggruppamento, unione e unione in Trident. Infine avvieremo il server DRPC utilizzando ilLocalDRPC class e cerca qualche parola chiave usando il execute metodo della classe LocalDRPC.

Formattazione delle informazioni sulla chiamata

Lo scopo della classe FormatCall è formattare le informazioni sulla chiamata comprendendo "Numero chiamante" e "Numero destinatario". Il codice completo del programma è il seguente:

Codifica: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Lo scopo della classe CSVSplit è dividere la stringa di input in base a "virgola (,)" ed emettere ogni parola nella stringa. Questa funzione viene utilizzata per analizzare l'argomento di input delle query distribuite. Il codice completo è il seguente:

Codifica: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

Questa è l'applicazione principale. Inizialmente, l'applicazione inizializzerà TridentTopology e fornirà informazioni sul chiamante utilizzandoFeederBatchSpout. Il flusso di topologia Trident può essere creato utilizzando ilnewStreammetodo della classe TridentTopology. Allo stesso modo, il flusso DRPC della topologia Trident può essere creato utilizzando l'estensionenewDRCPStreammetodo della classe TridentTopology. È possibile creare un semplice server DRCP utilizzando la classe LocalDRPC.LocalDRPCha il metodo di esecuzione per cercare qualche parola chiave. Di seguito viene fornito il codice completo.

Codifica: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Creazione ed esecuzione dell'applicazione

L'applicazione completa ha tre codici Java. Sono i seguenti:

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

L'applicazione può essere creata utilizzando il seguente comando:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

L'applicazione può essere eseguita utilizzando il seguente comando:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Produzione

Una volta avviata l'applicazione, l'applicazione produrrà i dettagli completi sul processo di avvio del cluster, sull'elaborazione delle operazioni, sul server DRPC e sulle informazioni sul client e, infine, sul processo di arresto del cluster. Questo output verrà visualizzato sulla console come mostrato di seguito.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends

In questo capitolo, discuteremo un'applicazione in tempo reale di Apache Storm. Vedremo come Storm viene utilizzato su Twitter.

Twitter

Twitter è un servizio di social networking online che fornisce una piattaforma per inviare e ricevere tweet degli utenti. Gli utenti registrati possono leggere e pubblicare tweet, ma gli utenti non registrati possono solo leggere i tweet. L'hashtag viene utilizzato per classificare i tweet per parola chiave aggiungendo # prima della parola chiave pertinente. Ora prendiamo uno scenario in tempo reale per trovare l'hashtag più utilizzato per argomento.

Creazione beccuccio

Lo scopo di spout è ottenere i tweet inviati dalle persone il prima possibile. Twitter fornisce "Twitter Streaming API", uno strumento basato su servizi web per recuperare i tweet inviati dalle persone in tempo reale. È possibile accedere all'API Twitter Streaming in qualsiasi linguaggio di programmazione.

twitter4j è una libreria Java open source, non ufficiale, che fornisce un modulo basato su Java per accedere facilmente all'API Twitter Streaming. twitter4jfornisce un framework basato sull'ascoltatore per accedere ai tweet. Per accedere all'API di streaming di Twitter, è necessario accedere all'account sviluppatore di Twitter e ottenere i seguenti dettagli di autenticazione OAuth.

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

Storm fornisce un beccuccio Twitter, TwitterSampleSpout,nel suo starter kit. Lo useremo per recuperare i tweet. Lo spout richiede i dettagli di autenticazione OAuth e almeno una parola chiave. Il beccuccio emetterà tweet in tempo reale basati su parole chiave. Di seguito viene fornito il codice completo del programma.

Codifica: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;
		
   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;
		
   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }
		
   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }
		
   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }
					
            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}
					
            @Override
            public void onTrackLimitationNotice(int i) {}
					
            @Override
            public void onScrubGeo(long l, long l1) {}
					
            @Override
            public void onException(Exception ex) {}
					
            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };
				
         ConfigurationBuilder cb = new ConfigurationBuilder();
				
         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);
					
         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);
				
         if (keyWords.length == 0) {
            _twitterStream.sample();
         }else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }
			
   @Override
   public void nextTuple() {
      Status ret = queue.poll();
				
      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }
			
   @Override
   public void close() {
      _twitterStream.shutdown();
   }
			
   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }
			
   @Override
   public void ack(Object id) {}
			
   @Override
   public void fail(Object id) {}
			
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

Hashtag Reader Bolt

Il tweet emesso dallo spout verrà inoltrato a HashtagReaderBolt, che elaborerà il tweet ed emetterà tutti gli hashtag disponibili. HashtagReaderBolt utilizzagetHashTagEntitiesmetodo fornito da twitter4j. getHashTagEntities legge il tweet e restituisce l'elenco degli hashtag. Il codice completo del programma è il seguente:

Codifica: HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Bullone contatore hashtag

L'hashtag emesso verrà inoltrato a HashtagCounterBolt. Questo bullone elaborerà tutti gli hashtag e salverà ogni hashtag e il suo conteggio in memoria utilizzando l'oggetto Java Map. Di seguito viene fornito il codice completo del programma.

Codifica: HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Invio di una topologia

L'invio di una topologia è l'applicazione principale. La topologia di Twitter è composta daTwitterSampleSpout, HashtagReaderBolt, e HashtagCounterBolt. Il codice di programma seguente mostra come inviare una topologia.

Codifica: TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];
		
      String accessToken = args[2];
      String accessTokenSecret = args[3];
		
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
		
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Creazione ed esecuzione dell'applicazione

L'applicazione completa ha quattro codici Java. Sono i seguenti:

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

È possibile compilare l'applicazione utilizzando il seguente comando:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

Eseguire l'applicazione utilizzando i seguenti comandi:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

Produzione

L'applicazione stamperà l'hashtag attualmente disponibile e il suo conteggio. L'output dovrebbe essere simile al seguente:

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1

Yahoo! Finance è il sito Web di notizie economiche e dati finanziari leader in Internet. Fa parte di Yahoo! e fornisce informazioni su notizie finanziarie, statistiche di mercato, dati di mercato internazionale e altre informazioni sulle risorse finanziarie a cui chiunque può accedere.

Se sei un utente Yahoo! registrato utente, puoi personalizzare Yahoo! Finanza per trarre vantaggio dalle sue determinate offerte. Yahoo! L'API Finance viene utilizzata per eseguire query sui dati finanziari da Yahoo!

Questa API visualizza i dati in ritardo di 15 minuti rispetto al tempo reale e aggiorna il database ogni minuto per accedere alle informazioni correnti relative alle scorte. Ora prendiamo uno scenario in tempo reale di un'azienda e vediamo come generare un avviso quando il valore delle sue azioni scende al di sotto di 100.

Creazione beccuccio

Lo scopo del beccuccio è quello di ottenere i dettagli dell'azienda ed emettere i prezzi a bulloni. È possibile utilizzare il seguente codice di programma per creare uno spout.

Codifica: YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;
	
   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         Stock stock = YahooFinance.get("INTC");
         BigDecimal price = stock.getQuote().getPrice();

         this.collector.emit(new Values("INTC", price.doubleValue()));
         stock = YahooFinance.get("GOOGL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("GOOGL", price.doubleValue()));
         stock = YahooFinance.get("AAPL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("AAPL", price.doubleValue()));
      } catch(Exception e) {}
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}
	
   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Creazione di bulloni

Qui lo scopo del bullone è elaborare i prezzi della società data quando i prezzi scendono al di sotto di 100. Utilizza l'oggetto Java Map per impostare l'avviso del limite di prezzo di interruzione come truequando i prezzi delle azioni scendono al di sotto di 100; altrimenti falso. Il codice completo del programma è il seguente:

Codifica: PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   Map<String, Integer> cutOffMap;
   Map<String, Boolean> resultMap;
	
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap <String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if(this.cutOffMap.containsKey(company)){
         Integer cutOffPrice = this.cutOffMap.get(company);

         if(price < cutOffPrice) {
            this.resultMap.put(company, true);
         } else {
            this.resultMap.put(company, false);
         }
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Invio di una topologia

Questa è l'applicazione principale in cui YahooFinanceSpout.java e PriceCutOffBolt.java sono collegati insieme e producono una topologia. Il codice di programma seguente mostra come inviare una topologia.

Codifica: YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Creazione ed esecuzione dell'applicazione

L'applicazione completa ha tre codici Java. Sono i seguenti:

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • YahooFinanceStorm.java

L'applicazione può essere creata utilizzando il seguente comando:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java

L'applicazione può essere eseguita utilizzando il seguente comando:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm

Produzione

L'output sarà simile al seguente:

GOOGL : false
AAPL : false
INTC : true

Il framework Apache Storm supporta molte delle migliori applicazioni industriali odierne. Forniremo una breve panoramica di alcune delle applicazioni più importanti di Storm in questo capitolo.

Klout

Klout è un'applicazione che utilizza l'analisi dei social media per classificare i propri utenti in base all'influenza sociale online Klout Score, che è un valore numerico compreso tra 1 e 100. Klout utilizza l'astrazione Trident di Apache Storm per creare topologie complesse che trasmettono dati.

Il canale meteo

The Weather Channel utilizza le topologie Storm per acquisire i dati meteorologici. Ha collaborato con Twitter per abilitare la pubblicità informata sul tempo su Twitter e sulle applicazioni mobili.OpenSignal è un'azienda specializzata nella mappatura della copertura wireless. StormTag e WeatherSignalsono progetti basati sulle condizioni meteorologiche creati da OpenSignal. StormTag è una stazione meteorologica Bluetooth che si collega a un portachiavi. I dati meteo raccolti dal dispositivo vengono inviati all'app WeatherSignal e ai server OpenSignal.

Industria delle telecomunicazioni

I fornitori di telecomunicazioni elaborano milioni di telefonate al secondo. Eseguono analisi forensi su chiamate interrotte e scarsa qualità del suono. I record dei dettagli delle chiamate fluiscono a una velocità di milioni al secondo e Apache Storm li elabora in tempo reale e identifica eventuali schemi problematici. L'analisi della tempesta può essere utilizzata per migliorare continuamente la qualità delle chiamate.