Apache Kafka - Esempio di gruppo di consumatori

Il gruppo di consumatori è un consumo multi-thread o multi-macchina degli argomenti Kafka.

Gruppo di consumatori

  • I consumatori possono unirsi a un gruppo utilizzando lo stesso group.id.

  • Il parallelismo massimo di un gruppo è che il numero di consumatori nel gruppo ← no di partizioni.

  • Kafka assegna le partizioni di un argomento al consumatore in un gruppo, in modo che ogni partizione venga consumata da esattamente un consumatore nel gruppo.

  • Kafka garantisce che un messaggio venga letto solo da un singolo consumatore del gruppo.

  • I consumatori possono visualizzare il messaggio nell'ordine in cui sono stati memorizzati nel registro.

Riequilibrio di un consumatore

L'aggiunta di più processi / thread causerà il riequilibrio di Kafka. Se un consumatore o un broker non riesce a inviare heartbeat a ZooKeeper, può essere riconfigurato tramite il cluster Kafka. Durante questo riequilibrio, Kafka assegnerà le partizioni disponibili ai thread disponibili, eventualmente spostando una partizione in un altro processo.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

Compilazione

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Esecuzione

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Qui abbiamo creato un nome di gruppo di esempio come mio-gruppo con due consumatori. Allo stesso modo, puoi creare il tuo gruppo e il numero di consumatori nel gruppo.

Ingresso

Apri la CLI del produttore e invia alcuni messaggi come:

Test consumer group 01
Test consumer group 02

Output del primo processo

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Risultato del secondo processo

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

Ora si spera che tu abbia capito SimpleConsumer e ConsumeGroup utilizzando la demo del client Java. Ora hai un'idea su come inviare e ricevere messaggi utilizzando un client Java. Continuiamo l'integrazione di Kafka con le tecnologie dei big data nel prossimo capitolo.