Apache Kafka - Semplice esempio di produttore

Creiamo un'applicazione per la pubblicazione e il consumo di messaggi utilizzando un client Java. Il client del produttore Kafka è costituito dalle seguenti API.

API KafkaProducer

Cerchiamo di capire il set più importante di API del produttore di Kafka in questa sezione. La parte centrale dell'API KafkaProducer è la classe KafkaProducer . La classe KafkaProducer fornisce un'opzione per connettere un broker Kafka nel suo costruttore con i seguenti metodi.

  • La classe KafkaProducer fornisce il metodo di invio per inviare messaggi in modo asincrono a un argomento. La firma di send () è la seguente

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - Il produttore gestisce un buffer di record in attesa di essere inviati.

  • Callback - Un callback fornito dall'utente da eseguire quando il record è stato riconosciuto dal server (null indica nessuna richiamata).

  • La classe KafkaProducer fornisce un metodo di svuotamento per garantire che tutti i messaggi inviati in precedenza siano stati effettivamente completati. La sintassi del metodo flush è la seguente:

public void flush()
  • La classe KafkaProducer fornisce il metodo partitionFor, che aiuta a ottenere i metadati della partizione per un determinato argomento. Questo può essere utilizzato per il partizionamento personalizzato. La firma di questo metodo è la seguente:

public Map metrics()

Restituisce la mappa delle metriche interne gestite dal produttore.

  • public void close () - La classe KafkaProducer fornisce blocchi di metodi di chiusura fino al completamento di tutte le richieste inviate in precedenza.

Producer API

La parte centrale dell'API Producer è la classe Producer . La classe Producer fornisce un'opzione per connettere il broker Kafka nel suo costruttore con i seguenti metodi.

La classe del produttore

La classe producer fornisce il metodo di invio a send messaggi a uno o più argomenti utilizzando le seguenti firme.

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

Esistono due tipi di produttori: Sync e Async.

La stessa configurazione API si applica anche al produttore di sincronizzazione . La differenza tra loro è che un produttore di sincronizzazione invia i messaggi direttamente, ma invia i messaggi in background. Il produttore asincrono è preferito quando si desidera una velocità effettiva maggiore. Nelle versioni precedenti come la 0.8, un produttore asincrono non dispone di una callback per send () per registrare i gestori di errori. Questo è disponibile solo nella versione corrente di 0.9.

public void close ()

La classe del produttore fornisce close metodo per chiudere le connessioni del pool di produttori a tutti i broker Kafka.

Impostazioni di configurazione

Le principali impostazioni di configurazione dell'API Producer sono elencate nella tabella seguente per una migliore comprensione:

S.No Impostazioni e descrizione della configurazione
1

client.id

identifica l'applicazione del produttore

2

producer.type

sincronizzato o asincrono

3

acks

La configurazione acks controlla i criteri in base alle richieste del produttore sono considerati completi.

4

retries

Se la richiesta del produttore non riesce, riprova automaticamente con un valore specifico.

5

bootstrap.servers

elenco bootstrap di broker.

6

linger.ms

se vuoi ridurre il numero di richieste puoi impostare linger.ms su qualcosa di più grande di un certo valore.

7

key.serializer

Chiave per l'interfaccia del serializzatore.

8

value.serializer

valore per l'interfaccia del serializzatore.

9

batch.size

Dimensione buffer.

10

buffer.memory

controlla la quantità totale di memoria disponibile per il produttore per il buff-ering.

API ProducerRecord

ProducerRecord è una coppia chiave / valore che viene inviata al costruttore di classi Kafka cluster.ProducerRecord per la creazione di un record con coppie di partizione, chiave e valore utilizzando la seguente firma.

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic - nome dell'argomento definito dall'utente che verrà aggiunto alla registrazione.

  • Partition - conteggio delle partizioni

  • Key - La chiave che verrà inclusa nel record.

  • Value - Registra i contenuti
public ProducerRecord (string topic, k key, v value)

Il costruttore della classe ProducerRecord viene utilizzato per creare un record con chiavi, coppie di valori e senza partizione.

  • Topic - Crea un argomento per assegnare il record.

  • Key - chiave per la cronaca.

  • Value - registrare i contenuti.

public ProducerRecord (string topic, v value)

La classe ProducerRecord crea un record senza partizione e chiave.

  • Topic - crea un argomento.

  • Value - registrare i contenuti.

I metodi della classe ProducerRecord sono elencati nella tabella seguente:

S.No Metodi di classe e descrizione
1

public string topic()

L'argomento verrà aggiunto al record.

2

public K key()

Chiave che verrà inclusa nel record. In assenza di tale chiave, qui verrà restituito null.

3

public V value()

Registra i contenuti.

4

partition()

Conteggio delle partizioni per il record

Applicazione SimpleProducer

Prima di creare l'applicazione, avvia prima ZooKeeper e il broker Kafka, quindi crea il tuo argomento nel broker Kafka utilizzando il comando crea argomento. Successivamente crea una classe java chiamata Sim-pleProducer.java e digita la seguente codifica.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation - L'applicazione può essere compilata utilizzando il seguente comando.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution - L'applicazione può essere eseguita utilizzando il seguente comando.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

Semplice esempio di consumatore

A partire da ora abbiamo creato un produttore per inviare messaggi al cluster Kafka. Ora creiamo un consumatore per consumare i messaggi dal cluster Kafka. L'API KafkaConsumer viene utilizzata per consumare i messaggi dal cluster Kafka. Il costruttore della classe KafkaConsumer è definito di seguito.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - Restituire una mappa delle configurazioni dei consumatori.

La classe KafkaConsumer dispone dei seguenti metodi significativi elencati nella tabella seguente.

S.No Metodo e descrizione
1

public java.util.Set<TopicPar-tition> assignment()

Ottieni il set di partizioni attualmente assegnato dal consumatore.

2

public string subscription()

Sottoscrivi l'elenco di argomenti fornito per ottenere partizioni dinamicamente con firma.

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

Sottoscrivi l'elenco di argomenti fornito per ottenere partizioni dinamicamente con firma.

4

public void unsubscribe()

Annulla la sottoscrizione degli argomenti dall'elenco di partizioni fornito.

5

public void sub-scribe(java.util.List<java.lang.String> topics)

Sottoscrivi l'elenco di argomenti fornito per ottenere partizioni dinamicamente con firma. Se l'elenco di argomenti fornito è vuoto, viene considerato come unsubscribe ().

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

Il modello di argomento si riferisce al modello di sottoscrizione nel formato dell'espressione regolare e l'argomento del listener riceve le notifiche dal modello di sottoscrizione.

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

Assegna manualmente un elenco di partizioni al cliente.

8

poll()

Recupera i dati per gli argomenti o le partizioni specificate utilizzando una delle API di sottoscrizione / assegnazione. Questo restituirà un errore, se gli argomenti non vengono sottoscritti prima del polling dei dati.

9

public void commitSync()

Gli offset di commit restituiti nell'ultimo sondaggio () per tutti gli elenchi di argomenti e partizioni con sottoscrizione. La stessa operazione viene applicata a commitAsyn ().

10

public void seek(TopicPartition partition, long offset)

Recupera il valore di offset corrente che il consumatore utilizzerà nel successivo metodo poll ().

11

public void resume()

Riprendi le partizioni sospese.

12

public void wakeup()

Sveglia il consumatore.

API ConsumerRecord

L'API ConsumerRecord viene utilizzata per ricevere record dal cluster Kafka. Questa API è composta da un nome di argomento, un numero di partizione, da cui viene ricevuto il record e un offset che punta al record in una partizione Kafka. La classe ConsumerRecord viene utilizzata per creare un record consumer con nome argomento specifico, conteggio partizioni e coppie <chiave, valore>. Ha la seguente firma.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic - Il nome dell'argomento per il record del consumatore ricevuto dal cluster Kafka.

  • Partition - Partizione per l'argomento.

  • Key - La chiave del record, se non esiste alcuna chiave verrà restituito null.

  • Value - Registra i contenuti.

API ConsumerRecords

L'API ConsumerRecords funge da contenitore per ConsumerRecord. Questa API viene utilizzata per mantenere l'elenco di ConsumerRecord per partizione per un particolare argomento. Il suo costruttore è definito di seguito.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - Restituisce una mappa della partizione per un particolare argomento.

  • Records - Elenco di restituzione di ConsumerRecord.

La classe ConsumerRecords ha i seguenti metodi definiti.

S.No Metodi e descrizione
1

public int count()

Il numero di record per tutti gli argomenti.

2

public Set partitions()

Il set di partizioni con i dati in questo set di record (se non sono stati restituiti dati, il set è vuoto).

3

public Iterator iterator()

Iterator ti consente di scorrere una raccolta, ottenendo o rimuovendo elementi.

4

public List records()

Ottieni l'elenco dei record per la partizione data.

Impostazioni di configurazione

Le impostazioni di configurazione per le impostazioni di configurazione principali dell'API client consumer sono elencate di seguito:

S.No Impostazioni e descrizione
1

bootstrap.servers

Elenco di bootstrap dei broker.

2

group.id

Assegna un singolo consumatore a un gruppo.

3

enable.auto.commit

Abilita il commit automatico per gli offset se il valore è vero, altrimenti non confermato.

4

auto.commit.interval.ms

Restituisce la frequenza con cui gli offset consumati aggiornati vengono scritti in ZooKeeper.

5

session.timeout.ms

Indica quanti millisecondi Kafka attenderà affinché ZooKeeper risponda a una richiesta (in lettura o scrittura) prima di rinunciare e continuare a consumare messaggi.

Applicazione SimpleConsumer

I passaggi dell'applicazione del produttore rimangono gli stessi qui. Per prima cosa, avvia il tuo broker ZooKeeper e Kafka. Quindi creare un'applicazione SimpleConsumer con la classe java denominata SimpleCon-sumer.java e digitare il codice seguente.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation - L'applicazione può essere compilata utilizzando il seguente comando.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − L'applicazione può essere eseguita utilizzando il seguente comando

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input- Apri la CLI del produttore e invia alcuni messaggi all'argomento. Puoi inserire l'input semplice come "Hello Consumer".

Output - Di seguito sarà l'output.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer