Sep 5, 2018

Kafka Consumer Groups and Replication & Failover: How replication helps to create failsafe topic

In this post we did setup multi broker Kafka environment with replication factor 3 and partition 1. We can alter partition count and same can be validated with describe command.
[centos@host01 config]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic Multibroker-App-Devinline --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[centos@host01 config]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibroker-App-Devinline
Topic:Multibroker-App-Devinline PartitionCount:2 ReplicationFactor:3 Configs:
 Topic: Multibroker-App-Devinline Partition: 0 Leader: 103 Replicas: 101,103,102 Isr: 103,102,101
 Topic: Multibroker-App-Devinline Partition: 1 Leader: 102 Replicas: 102,103,101 Isr: 102,103,101

Kafka Producer (Multi broker Kafka Single Node environment):
package com.devinline.kafkasamples;

import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaMultiBrokerProducer {
 public static void main(String[] args) {
  String topicName = "Multibroker-App-Devinline";
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092,localhost:9093");
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("acks", "all");
  Producer<String, String> producer = new KafkaProducer<String, String>(props);
  for (int i = 0; i < 5; i++) {
   String key = "Key" + i;
   String message = "Message from Kafka App Devinline " + i;
   producer.send(new ProducerRecord<String, String>(topicName, key, message));
  }
  System.out.println("Message sent successfully");
  producer.close();
 }
}

Kafka Consumer (Multi broker Kafka Single Node environment) : Below 
Consumer has group.id = "mygroup". Create another copy of below sample program say KafkaMultiBrokerConsumer2.

package com.devinline.kafkasamples;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaMultiBrokerConsumer1 {

 public static void main(String[] args) throws Exception {
  String topicName = "Multibroker-App-Devinline";
  String groupName = "mygroup";
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092,localhost:9093");
  props.put("group.id", groupName);
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  
  KafkaConsumer<String, String> consumer = null;
  try {
   consumer = new KafkaConsumer<String, String>(props);
   consumer.subscribe(Arrays.asList(topicName));
   while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
     System.out.printf("Message received -> partition = %d, offset = %d, key = %s, value = %s\n",
       record.partition(), record.offset(), record.key(), record.value());
    }
   }
  } catch (Exception ex) {
   ex.printStackTrace();
  } finally {
   consumer.close();
  }
 }

}

Run two instance of above consumer and run producer program. We observe that messages produced by producer is consumed by two consumer (from partition = 0 and partition = 1). Here Consumer Group of two consumer instances of same group id so messages are consumed by both the consumers of the consumer group.

Below sample output shows that message 0 and 4 is consumed by one consumer from partition = 1 and message 1, 2 , 3 is consumed by another consumer from partition = 0.

Output of consumer-1:
481 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
481 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
Message received -> partition = 0, offset = 20, key = Key1, value = Message from Kafka App Devinline 1
Message received -> partition = 0, offset = 21, key = Key2, value = Message from Kafka App Devinline 2
Message received -> partition = 0, offset = 22, key = Key3, value = Message from Kafka App Devinline 3

Output of consumer-2: 
288 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
288 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
27607 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
Message received -> partition = 1, offset = 2, key = Key0, value = Message from Kafka App Devinline 0
Message received -> partition = 1, offset = 3, key = Key4, value = Message from Kafka App Devinline 4

Replication & broker failover: In Kafka replication helps broker failover, it means suppose one broker leader and it goes down then another broker takes place of leader and prevents broker failover. Below commands create topic with replication factor of 3 and with describe commands we displays leader.
  • Create a topic Failsafe-Topic with 1 partition and replication factor of 3
[centos@host01 kafka]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Failsafe-Devinline-Topic
Created topic "Failsafe-Devinline-Topic".
  • List all the topics and verify that the new topic is created
[centos@host01 kafka]$ kafka-topics.sh --list --zookeeper localhost:2181
Failsafe-Devinline-Topic
Multibroker-App
Multibroker-App-Devinline
__consumer_offsets
topic-devinline-1
  • Open a new terminal and start producer to the Failsafe-Topic
[centos@host01 kafka]$ kafka-console-producer.sh --broker-list localhost:9091,localhost:9092 --topic Failsafe-Devinline-Topic
>Message-1
>Message-2
>Message-3
  • Open a new terminal and start consumer to the Failsafe-Topic
[centos@host01 kafka]$ kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic Failsafe-Devinline-Topic --from-beginning
Message-1
Message-2
Message-3
  • Describe the topic details and verify the output similar to displayed below
[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic
Topic:Failsafe-Devinline-Topic PartitionCount:1 ReplicationFactor:3 Configs:
 Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 101 Replicas: 101,102,103 Isr: 101,102,103
  • Switch to the terminal where the leader is running and shutdown the same by entering Ctrl + D
  • Describe the topic again and verify that new leader election. Isr (InSync replica) has two entry only. 
[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic
Topic:Failsafe-Devinline-Topic PartitionCount:1 ReplicationFactor:3 Configs:
 Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 102 Replicas: 101,102,103 Isr: 102,103

  • Restart consumer again and describe topic details again. Newly started consumer added in Isr (InSync replica)

[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic
Topic:Failsafe-Devinline-Topic PartitionCount:1 ReplicationFactor:3 Configs:
 Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 102 Replicas: 101,102,103 Isr: 102,103,101

Replication prevents Kafka broker failure by electing leader from available Isr consumer list.



Location: Bengaluru, Karnataka, India