[centos@host01 config]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic Multibroker-App-Devinline --partitions 2 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! [centos@host01 config]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibroker-App-Devinline Topic:Multibroker-App-Devinline PartitionCount:2 ReplicationFactor:3 Configs: Topic: Multibroker-App-Devinline Partition: 0 Leader: 103 Replicas: 101,103,102 Isr: 103,102,101 Topic: Multibroker-App-Devinline Partition: 1 Leader: 102 Replicas: 102,103,101 Isr: 102,103,101
Kafka Producer (Multi broker Kafka Single Node environment):
package com.devinline.kafkasamples; import java.util.Properties; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaMultiBrokerProducer { public static void main(String[] args) { String topicName = "Multibroker-App-Devinline"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("acks", "all"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 5; i++) { String key = "Key" + i; String message = "Message from Kafka App Devinline " + i; producer.send(new ProducerRecord<String, String>(topicName, key, message)); } System.out.println("Message sent successfully"); producer.close(); } }
Kafka Consumer (Multi broker Kafka Single Node environment) : Below Consumer has group.id = "mygroup". Create another copy of below sample program say KafkaMultiBrokerConsumer2.
package com.devinline.kafkasamples; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class KafkaMultiBrokerConsumer1 { public static void main(String[] args) throws Exception { String topicName = "Multibroker-App-Devinline"; String groupName = "mygroup"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("group.id", groupName); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = null; try { consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topicName)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Message received -> partition = %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value()); } } } catch (Exception ex) { ex.printStackTrace(); } finally { consumer.close(); } } }
Run two instance of above consumer and run producer program. We observe that messages produced by producer is consumed by two consumer (from partition = 0 and partition = 1). Here Consumer Group of two consumer instances of same group id so messages are consumed by both the consumers of the consumer group.
Below sample output shows that message 0 and 4 is consumed by one consumer from partition = 1 and message 1, 2 , 3 is consumed by another consumer from partition = 0.
Output of consumer-1:
481 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
481 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
Message received -> partition = 0, offset = 20, key = Key1, value = Message from Kafka App Devinline 1
Message received -> partition = 0, offset = 21, key = Key2, value = Message from Kafka App Devinline 2
Message received -> partition = 0, offset = 22, key = Key3, value = Message from Kafka App Devinline 3
Output of consumer-2:
288 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
288 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
27607 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
Message received -> partition = 1, offset = 2, key = Key0, value = Message from Kafka App Devinline 0
Message received -> partition = 1, offset = 3, key = Key4, value = Message from Kafka App Devinline 4
- Create a topic Failsafe-Topic with 1 partition and replication factor of 3
[centos@host01 kafka]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Failsafe-Devinline-Topic Created topic "Failsafe-Devinline-Topic".
- List all the topics and verify that the new topic is created
[centos@host01 kafka]$ kafka-topics.sh --list --zookeeper localhost:2181 Failsafe-Devinline-Topic Multibroker-App Multibroker-App-Devinline __consumer_offsets topic-devinline-1
- Open a new terminal and start producer to the Failsafe-Topic
[centos@host01 kafka]$ kafka-console-producer.sh --broker-list localhost:9091,localhost:9092 --topic Failsafe-Devinline-Topic >Message-1 >Message-2 >Message-3
- Open a new terminal and start consumer to the Failsafe-Topic
[centos@host01 kafka]$ kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic Failsafe-Devinline-Topic --from-beginning Message-1 Message-2 Message-3
- Describe the topic details and verify the output similar to displayed below
[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic Topic:Failsafe-Devinline-Topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 101 Replicas: 101,102,103 Isr: 101,102,103
- Switch to the terminal where the leader is running and shutdown the same by entering Ctrl + D
- Describe the topic again and verify that new leader election. Isr (InSync replica) has two entry only.
[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic Topic:Failsafe-Devinline-Topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 102 Replicas: 101,102,103 Isr: 102,103
- Restart consumer again and describe topic details again. Newly started consumer added in Isr (InSync replica)
[centos@host01 kafka]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic Failsafe-Devinline-Topic Topic:Failsafe-Devinline-Topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: Failsafe-Devinline-Topic Partition: 0 Leader: 102 Replicas: 101,102,103 Isr: 102,103,101
Replication prevents Kafka broker failure by electing leader from available Isr consumer list.
0 comments:
Post a Comment