Kafka Consumer Groups, Replication and Failover: How replication helps create a failsafe topic

In an earlier post we set up a multi-broker Kafka environment with a topic that has a replication factor of 3 and 1 partition. The partition count can be increased with the alter command, and the change can be validated with the describe command.
[centos@host01 config]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic Multibroker-App-Devinline --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[centos@host01 config]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibroker-App-Devinline
Topic:Multibroker-App-Devinline PartitionCount:2 ReplicationFactor:3 Configs:
 Topic: Multibroker-App-Devinline Partition: 0 Leader: 103 Replicas: 101,103,102 Isr: 103,102,101
 Topic: Multibroker-App-Devinline Partition: 1 Leader: 102 Replicas: 102,103,101 Isr: 102,103,101
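
The warning printed by the alter command matters because a keyed record is routed by hashing its key modulo the partition count, so the same key can land on a different partition once the count changes. Below is a minimal sketch of that effect. It assumes a stand-in hash (String.hashCode) instead of the murmur2 hash used by Kafka's default partitioner, and the class and method names are hypothetical, so the concrete partition numbers will not match what Kafka picks.

```java
// Illustration only: Kafka's default partitioner hashes the serialized key with
// murmur2; String.hashCode() is a stand-in so the sketch needs no Kafka jars.
final class KeyPartitionSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String[] keys = {"Key0", "Key1", "Key2", "Key3", "Key4"};
        for (String key : keys) {
            // With 1 partition every key maps to 0; with 2 partitions some keys move.
            System.out.printf("%s: 1 partition -> %d, 2 partitions -> %d%n",
                    key, partitionFor(key, 1), partitionFor(key, 2));
        }
    }
}
```

Records written before the change stay where they are, so after adding partitions the history of a single key may be spread over two partitions and per-key ordering is no longer guaranteed.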

Kafka Producer (Multi broker Kafka Single Node environment):
package com.devinline.kafkasamples;

import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaMultiBrokerProducer {
 public static void main(String[] args) {
  String topicName = "Multibroker-App-Devinline";
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092,localhost:9093");
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("acks", "all");
  Producer<String, String> producer = new KafkaProducer<String, String>(props);
  for (int i = 0; i < 5; i++) {
   String key = "Key" + i;
   String message = "Message from Kafka App Devinline " + i;
   producer.send(new ProducerRecord<String, String>(topicName, key, message));
  }
  // send() is asynchronous; close() blocks until buffered records have been sent
  producer.close();
  System.out.println("Message sent successfully");
 }
}

Kafka Consumer (Multi broker Kafka Single Node environment): The consumer below has group.id = "mygroup". Create another copy of the sample program below, say KafkaMultiBrokerConsumer2.

package com.devinline.kafkasamples;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaMultiBrokerConsumer1 {

 public static void main(String[] args) throws Exception {
  String topicName = "Multibroker-App-Devinline";
  String groupName = "mygroup";
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092,localhost:9093");
  props.put("group.id", groupName);
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  
  KafkaConsumer<String, String> consumer = null;
  try {
   consumer = new KafkaConsumer<String, String>(props);
   consumer.subscribe(Arrays.asList(topicName));
   while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
     System.out.printf("Message received -> partition = %d, offset = %d, key = %s, value = %s\n",
       record.partition(), record.offset(), record.key(), record.value());
    }
   }
  } catch (Exception ex) {
   ex.printStackTrace();
  } finally {
   // consumer is still null if the constructor threw, so guard the close
   if (consumer != null) {
    consumer.close();
   }
  }
 }

}

Run two instances of the consumer above and then run the producer program. The messages produced are split between the two consumers: one reads from partition = 0 and the other from partition = 1. Both instances share the same group.id, so they form a single consumer group. Within a group each partition is assigned to exactly one consumer, so every message is consumed by only one of the two instances.
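
The way a group divides partitions among its members can be sketched as follows. This is a simplified, range-style split for a single topic, not Kafka's own assignor code; the class name, method name, and consumer labels are hypothetical, and which real consumer ends up with which partition depends on the member ids the group coordinator hands out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupAssignmentSketch {

    // Splits partitions 0..partitionCount-1 into contiguous ranges, one per consumer.
    // Each partition gets exactly one owner; surplus consumers receive an empty list.
    // Assumes the consumer list is not empty.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int perConsumer = partitionCount / consumers.size();
        int extra = partitionCount % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            // The first 'extra' consumers take one additional partition each.
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            assignment.put(consumers.get(i), owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers, two partitions: one partition each.
        System.out.println(assign(Arrays.asList("consumer-1", "consumer-2"), 2));
        // Three consumers, two partitions: the third consumer stays idle.
        System.out.println(assign(Arrays.asList("consumer-1", "consumer-2", "consumer-3"), 2));
    }
}
```

With more consumers than partitions the extra instances receive nothing, which is why starting a third copy of the consumer against this two-partition topic would not increase throughput.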

The sample output below shows that messages 0 and 4 are consumed by one consumer from partition = 1, and messages 1, 2 and 3 are consumed by the other consumer from partition = 0.

Output of consumer-1:
481 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
481 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
Message received -> partition = 0, offset = 20, key = Key1, value = Message from Kafka App Devinline 1
Message received -> partition = 0, offset = 21, key = Key2, value = Message from Kafka App Devinline 2
Message received -> partition = 0, offset = 22, key = Key3, value = Message from Kafka App Devinline 3

Output of consumer-2: 
288 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
288 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
27607 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
Message received -> partition = 1, offset = 2, key = Key0, value = Message from Kafka App Devinline 0
Message received -> partition = 1, offset = 3, key = Key4, value = Message from Kafka App Devinline 4

Replication & broker failover: Replication is what makes broker failover possible in Kafka. If the broker that leads a partition goes down, another broker holding an in-sync replica takes over as leader, so the topic stays available. The commands below create a topic with a replication factor of 3 and use the describe command to display its leader.
  • Create a topic Failsafe-Devinline-Topic with 1 partition and a replication factor of 3
[centos@host01 kafka]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic Failsafe-Devinline-Topic
Created topic "Failsafe-Devinline-Topic".
  • List all the topics and verify that the new topic is created
[centos@host01 kafka]$ kafka-topics.sh --list --zookeeper localhost:2181
Failsafe-Devinline-Topic
Multibroker-App
Multibroker-App-Devinline
__consumer_offsets
topic-devinline-1
  • Open a new terminal and start a producer for Failsafe-Devinline-Topic
[centos@host01 kafka]$ kafka-console-producer.sh --broker-list localhost:9091,localhost:9092 --topic Failsafe-Devinline-Topic
>Message-1
>Message-2
>Message-3
  • Open a new terminal and start a consumer for Failsafe-Devinline-Topic
[centos@host01 kafka]$ kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic Failsafe-Devinline-Topic --from-beginning
Message-1
Message-2
Message-3
  • Describe the topic and verify that the output is similar to the one shown below
[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic
Topic:Failsafe-Devinline-Topic PartitionCount:1 ReplicationFactor:3 Configs:
 Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 101 Replicas: 101,102,103 Isr: 101,102,103
  • Switch to the terminal where the leader broker (101 here) is running and shut it down by pressing Ctrl + C
  • Describe the topic again and verify that a new leader has been elected. The Isr (in-sync replicas) list now has only two entries.
[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic
Topic:Failsafe-Devinline-Topic PartitionCount:1 ReplicationFactor:3 Configs:
 Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 102 Replicas: 101,102,103 Isr: 102,103

  • Restart the stopped broker and describe the topic again. The restarted broker is added back to the Isr (in-sync replicas) list.

[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic
Topic:Failsafe-Devinline-Topic PartitionCount:1 ReplicationFactor:3 Configs:
 Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 102 Replicas: 101,102,103 Isr: 102,103,101
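
The check done by eye in the steps above (is the Isr list shorter than the Replicas list?) can also be scripted. The following is a rough sketch that parses one partition line of the describe output; the class and method names are hypothetical, and the regular expression assumes the line layout shown in this post, which may differ between Kafka versions.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class DescribeLineSketch {

    // Matches the partition line printed by kafka-topics.sh --describe as shown above.
    private static final Pattern LINE = Pattern.compile(
            "Partition:\\s*(\\d+)\\s+Leader:\\s*(-?\\d+)\\s+Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]*)");

    // Returns true when fewer brokers are in sync than are assigned as replicas.
    static boolean isUnderReplicated(String describeLine) {
        Matcher m = LINE.matcher(describeLine);
        if (!m.find()) {
            throw new IllegalArgumentException("Not a partition line: " + describeLine);
        }
        int replicaCount = m.group(3).split(",").length;
        String isrField = m.group(4);
        int isrCount = isrField.isEmpty() ? 0 : isrField.split(",").length;
        return isrCount < replicaCount;
    }

    public static void main(String[] args) {
        String afterFailure =
                " Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 102 Replicas: 101,102,103 Isr: 102,103";
        String afterRestart =
                " Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 102 Replicas: 101,102,103 Isr: 102,103,101";
        System.out.println(isUnderReplicated(afterFailure));  // true: one replica is out of sync
        System.out.println(isUnderReplicated(afterRestart));  // false: all three replicas are in sync
    }
}
```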

Replication lets a topic survive a Kafka broker failure: when the leader goes down, a new leader is elected from the brokers that remain in the Isr (in-sync replicas) list.
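
The election seen in the describe output can be sketched in a few lines. This is a simplified model, assuming the new leader is the first broker in the assigned replica list that is still in the Isr; it is not Kafka's controller code, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class LeaderElectionSketch {

    // Returns the first broker in the assigned replica list that is still in
    // the ISR, or -1 when no in-sync replica is available.
    static int electLeader(List<Integer> replicas, Set<Integer> isr) {
        for (int brokerId : replicas) {
            if (isr.contains(brokerId)) {
                return brokerId;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Values taken from the describe output after broker 101 was stopped.
        List<Integer> replicas = Arrays.asList(101, 102, 103);
        Set<Integer> isrAfterFailure = new HashSet<>(Arrays.asList(102, 103));
        System.out.println("New leader: " + electLeader(replicas, isrAfterFailure)); // New leader: 102
    }
}
```

The result matches the walkthrough: with Replicas 101,102,103 and Isr 102,103, broker 102 becomes the leader.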


