Apache Kafka - Esempio di gruppo di consumatori
Il gruppo di consumatori è un consumo multi-thread o multi-macchina degli argomenti Kafka.
Gruppo di consumatori
- I consumatori possono unirsi a un gruppo utilizzando lo stesso - group.id. 
- Il parallelismo massimo di un gruppo è che il numero di consumatori nel gruppo ← no di partizioni. 
- Kafka assegna le partizioni di un argomento al consumatore in un gruppo, in modo che ogni partizione venga consumata da esattamente un consumatore nel gruppo. 
- Kafka garantisce che un messaggio venga letto solo da un singolo consumatore del gruppo. 
- I consumatori possono visualizzare il messaggio nell'ordine in cui sono stati memorizzati nel registro. 
Riequilibrio di un consumatore
L'aggiunta di più processi / thread causerà il riequilibrio di Kafka. Se un consumatore o un broker non riesce a inviare heartbeat a ZooKeeper, può essere riconfigurato tramite il cluster Kafka. Durante questo riequilibrio, Kafka assegnerà le partizioni disponibili ai thread disponibili, eventualmente spostando una partizione in un altro processo.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}Compilazione
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.javaEsecuzione
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-groupQui abbiamo creato un nome di gruppo di esempio come mio-gruppo
 con due consumatori. Allo stesso modo, puoi creare il tuo gruppo e il numero di consumatori nel gruppo.
Ingresso
Apri la CLI del produttore e invia alcuni messaggi come:
Test consumer group 01
Test consumer group 02Output del primo processo
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01Risultato del secondo processo
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02Ora si spera che tu abbia capito SimpleConsumer e ConsumeGroup utilizzando la demo del client Java. Ora hai un'idea su come inviare e ricevere messaggi utilizzando un client Java. Continuiamo l'integrazione di Kafka con le tecnologie dei big data nel prossimo capitolo.
