Applicazione in tempo reale (Twitter)

Analizziamo un'applicazione in tempo reale per ottenere gli ultimi feed di Twitter e i suoi hashtag. In precedenza, abbiamo visto l'integrazione di Storm e Spark con Kafka. In entrambi gli scenari, abbiamo creato un Kafka Producer (utilizzando cli) per inviare messaggi all'ecosistema Kafka. Quindi, l'integrazione di tempesta e scintilla legge i messaggi utilizzando il consumatore Kafka e lo inietta rispettivamente nell'ecosistema tempesta e scintilla. Quindi, praticamente dobbiamo creare un produttore Kafka, che dovrebbe:

  • Leggere i feed di Twitter utilizzando "Twitter Streaming API",
  • Elaborare i feed,
  • Estrai gli hashtag e
  • Mandalo a Kafka.

Una volta che gli hashtag vengono ricevuti da Kafka, l'integrazione di Storm / Spark riceve le informazioni e le invia all'ecosistema Storm / Spark.

Twitter Streaming API

È possibile accedere alla "Twitter Streaming API" in qualsiasi linguaggio di programmazione. "Twitter4j" è una libreria Java non ufficiale, open source, che fornisce un modulo basato su Java per accedere facilmente alla "Twitter Streaming API". Il "twitter4j" fornisce un framework basato sull'ascoltatore per accedere ai tweet. Per accedere alla "Twitter Streaming API", dobbiamo accedere all'account sviluppatore Twitter e dovremmo ottenere quanto segueOAuth dettagli di autenticazione.

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

Una volta creato l'account sviluppatore, scarica i file jar "twitter4j" e inseriscili nel percorso della classe java.

La codifica completa del produttore di Twitter Kafka (KafkaTwitterProducer.java) è elencata di seguito:

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
      
      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName() 
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:" 
               + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + 
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + 
            "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

Compilazione

Compilare l'applicazione utilizzando il seguente comando:

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

Esecuzione

Apri due console. Eseguire l'applicazione compilata sopra come mostrato di seguito in una console.

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

Esegui una qualsiasi delle applicazioni Spark / Storm spiegate nel capitolo precedente in un'altra finestra. Il punto principale da notare è che l'argomento utilizzato dovrebbe essere lo stesso in entrambi i casi. In questo caso, abbiamo utilizzato "il mio primo argomento" come nome dell'argomento.

Produzione

L'output di questa applicazione dipenderà dalle parole chiave e dal feed corrente di Twitter. Di seguito è specificato un output di esempio (integrazione storm).

. . .
food : 1
foodie : 2
burger : 1
. . .