Sep 5, 2018

Apache Kafka : Multi-Broker producer and Kafka SeekingConsumer (Controlling The Consumer's Position)

In previous posts we set up a single-node, multi-broker Kafka cluster and wrote a Kafka producer and consumer in Java that worked against a single broker. In this post we will publish messages to the multi-broker cluster and create a consumer that controls its own read position, so that it always starts reading from a chosen offset.

Prerequisites:
1. Zookeeper is up and running.
2. The Kafka servers set up in the previous post (single node, multiple brokers) are up and running.
[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-1.properties
[centos@host01 ~]$ bin/kafka-server-start.sh config/server-2.properties
[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-3.properties

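How to check whether Zookeeper and Kafka are running?
In the jps output below, QuorumPeerMain is the Zookeeper process and each Kafka entry is one broker. In this case 3 instances of Kafka with ids 101, 102 and 103 are running.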
[centos@host01 kafka]$ jps
11859 Kafka
15204 org.eclipse.equinox.launcher_1.3.200.v20160318-1642.jar
10502 QuorumPeerMain
11543 Kafka
12200 Jps
11211 Kafka
[centos@host01 kafka]$ echo dump | nc localhost 2181 | grep brokers
 /brokers/ids/103
 /brokers/ids/102
 /brokers/ids/101
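
The same check can be scripted. The following is a minimal sketch, assuming the text returned by the dump command has already been captured as a string; the class name BrokerIdParser and the method parseBrokerIds are hypothetical helpers for illustration, not part of Kafka or Zookeeper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts broker ids from the output of
// "echo dump | nc localhost 2181".
final class BrokerIdParser {
    // Matches registration paths such as "/brokers/ids/101".
    private static final Pattern BROKER_PATH = Pattern.compile("/brokers/ids/(\\d+)");

    /** Returns the broker ids found in the text, in order of appearance. */
    static List<Integer> parseBrokerIds(String dumpOutput) {
        List<Integer> ids = new ArrayList<>();
        Matcher m = BROKER_PATH.matcher(dumpOutput);
        while (m.find()) {
            ids.add(Integer.parseInt(m.group(1)));
        }
        return ids;
    }

    public static void main(String[] args) {
        String dump = " /brokers/ids/103\n /brokers/ids/102\n /brokers/ids/101\n";
        System.out.println(parseBrokerIds(dump)); // prints [103, 102, 101]
    }
}
```

If the returned list has fewer entries than expected, one of the brokers has not registered with Zookeeper.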

Create Kafka Topic: Create a topic "Multibroker-App-Devinline" with replication factor 3, as we have three brokers up and running.
[centos@host01 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic Multibroker-App-Devinline
Created topic "Multibroker-App-Devinline".

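Kafka Producer: The sample below publishes messages to the topic "Multibroker-App-Devinline", which has replication factor 3 (brokers at localhost:9091, localhost:9092, localhost:9093). The bootstrap.servers list does not have to name every broker; the client discovers the rest of the cluster from the brokers it can reach.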

package com.devinline.kafkasamples;

import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaMultiBrokerProducer {
 public static void main(String[] args) {
  String topicName = "Multibroker-App-Devinline";
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092,localhost:9093");
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("acks", "all");
  Producer<String, String> producer = new KafkaProducer<String, String>(props);
  for (int i = 0; i < 5; i++) {
   String key = "Key" + i;
   String message = "Message from Kafka App Devinline " + i;
   producer.send(new ProducerRecord<String, String>(topicName, key, message));
  }
  // send() is asynchronous; close() blocks until the buffered records have been sent.
  producer.close();
  System.out.println("Message sent successfully");
 }
}
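
The producer above hard-codes its settings. As a hedged sketch of one alternative, the same keys can be read from properties-format text using only java.util.Properties. The class name ClientConfigLoader and its load method are hypothetical; in practice the text would probably come from a file rather than a string literal.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: parses client settings written in .properties format.
final class ClientConfigLoader {
    /** Parses "key=value" lines into a Properties object. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text = "bootstrap.servers=localhost:9092,localhost:9093\n"
                + "acks=all\n";
        Properties props = load(text);
        System.out.println(props.getProperty("bootstrap.servers")); // prints localhost:9092,localhost:9093
        System.out.println(props.getProperty("acks"));              // prints all
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor in place of the one built by hand.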

Kafka Seeking Consumer: Normally a Kafka consumer simply consumes records from beginning to end, periodically committing its position (either automatically or manually). However, Kafka allows the consumer to control its position manually, moving forwards or backwards in a partition at will.
The sample code below creates a seeking consumer that starts reading from a given offset in the topic's partition. To do this we implement the onPartitionsAssigned() method of the ConsumerRebalanceListener interface and call seek() inside it.
package com.devinline.kafkasamples;

/**
 * @author www.devinline.com (nikhil)
 *
 */
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class KafkaSeekingConsumer {
 public static void main(String[] args) throws Exception {
  String topicName = "Multibroker-App-Devinline";
  String groupName = "mygroup";
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092,localhost:9093");
  props.put("group.id", groupName);
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  final KafkaConsumer<String, String> consumer;
  try {
   consumer = new KafkaConsumer<>(props);
   consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
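    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
     // Runs after each partition assignment: move every assigned
     // partition to offset 5, i.e. the 6th message (offsets start at 0).
     for (TopicPartition partition : partitions) {
      consumer.seek(partition, 5);
     }
    }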
   });
   while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
     System.out.printf("Message received -> offset = %d, key = %s, value = %s\n", record.offset(),
       record.key(), record.value());
    }
   }
  } catch (Exception ex) {
   ex.printStackTrace();
  }
 }
}

Note: Changing the POSITION value in consumer.seek(partition, <POSITION>) in the sample above makes this consumer start reading from that offset. Offsets are zero-based, so offset 5 is the 6th message.
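
To make the effect of the seek position concrete without a running cluster, here is a toy sketch. It is an in-memory stand-in for a single partition, not the Kafka client API: the class SeekDemo and its seek and poll methods are hypothetical and only mimic the idea that the list index plays the role of the offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy in-memory model of one partition; NOT part of the Kafka client API.
final class SeekDemo {
    private final List<String> log; // index in this list == record offset
    private int position = 0;       // offset of the next record to return

    SeekDemo(List<String> records) {
        this.log = new ArrayList<>(records);
    }

    /** Moves the read position, mirroring the idea behind consumer.seek(partition, offset). */
    void seek(int offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be >= 0");
        }
        position = offset;
    }

    /** Returns every record from the current position to the end, then advances the position. */
    List<String> poll() {
        if (position >= log.size()) {
            return new ArrayList<>();
        }
        List<String> out = new ArrayList<>(log.subList(position, log.size()));
        position = log.size();
        return out;
    }

    public static void main(String[] args) {
        SeekDemo partition = new SeekDemo(Arrays.asList("1", "2", "3", "4", "5", "6", "7"));
        partition.seek(5);
        System.out.println(partition.poll()); // prints [6, 7]
        partition.seek(0);
        System.out.println(partition.poll()); // prints [1, 2, 3, 4, 5, 6, 7]
    }
}
```

Seeking to 5 skips the first five records, and seeking back to 0 replays the whole log, which is the forwards and backwards movement described earlier.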

Sample Output (SeekingConsumer): consumption starts from the 6th message (offset 5).
29 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = mygroup
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092, localhost:9093]
....
    ....
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest

460 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
460 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
Message received -> offset = 5, key = null, value = 6
Message received -> offset = 6, key = null, value = 7
Message received -> offset = 12, key = Key0, value = Message from Kafka App Devinline 0
Message received -> offset = 13, key = Key1, value = Message from Kafka App Devinline 1
Message received -> offset = 14, key = Key2, value = Message from Kafka App Devinline 2
Message received -> offset = 15, key = Key3, value = Message from Kafka App Devinline 3
Message received -> offset = 16, key = Key4, value = Message from Kafka App Devinline 4

Let's start a consumer from the terminal and validate the messages in topic "Multibroker-App-Devinline".
[centos@host01 ~]$ kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic Multibroker-App-Devinline --from-beginning
1
2
3
4
5
6
7
Message from Kafka App Devinline 0
Message from Kafka App Devinline 1
Message from Kafka App Devinline 2
Message from Kafka App Devinline 3
Message from Kafka App Devinline 4

Reference (Kafka 0.9.0 Javadoc): https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
Location: Bengaluru, Karnataka, India