Apache Kafka Streams: Set up the Word Count sample in Eclipse with a multi-broker Kafka cluster

Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka clusters. In an earlier post we set up a multi-broker Kafka cluster and performed basic Kafka producer/consumer operations. In this post we use that multi-broker cluster to demonstrate Kafka Streams with a word count example.

Create input and output Kafka topics: Create an input topic and an output topic, each with two partitions, which will be used to publish and consume messages.
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic word-count-input-topic
Created topic "word-count-input-topic".
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic word-count-output-topic
Created topic "word-count-output-topic".

Start Kafka: Start the Kafka brokers on localhost, ports 9091 and 9092.
[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-1.properties

[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-2.properties

Word count Kafka Streams application:

1. Create a Maven project and modify pom.xml with the following dependencies.
2. Create a file log4j.properties in the resources directory and add the following properties.
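log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%p %m (%c:%L) %n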

3. Create a Java class with the following code. The class below reads from the input topic on the configured brokers (bootstrap servers) and publishes its results to the output topic. The Kafka Streams app processes each incoming text line, splits it on spaces, and keeps a running count for each word.
package com.devinline.kafkastream;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class StreamStartupApp {
 public static void main(String[] args) {
  Properties props = new Properties();
  /*
   * ApplicationId is specific to the application and is used as the consumer
   * groupId and the default clientId prefix
   */
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wrod-count");
  /* Comma-separated host:port list of the brokers to connect to (here, the two brokers started above) */
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ",");
  /* Start topic consumption from the earliest offset */
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  /* For serialization and deserialization of data */
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

  KStreamBuilder builder = new KStreamBuilder();
  /* Stream from Kafka : name of topic */
  KStream<String, String> wordCountInputStream = builder.stream("word-count-input-topic");
  /* Convert each text line to lowercase */
  KTable<String, Long> wordCounts = wordCountInputStream.mapValues(textLineInput -> textLineInput.toLowerCase())
     /* Split each line into words */
     .flatMapValues(lowerCaseTextLine -> Arrays.asList(lowerCaseTextLine.split(" ")))
     /* Select the word as the key */
     .selectKey((ignoredKey, value) -> value)
     /* Group by key using the default Serdes.String() serdes */
     .groupByKey()
     /* Compute word count; the store name "Count" is an assumption (the log below shows only a "Count-repartition" suffix) */
     .count("Count");
  /* Publish word counts to the output Kafka topic */
  wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output-topic");
  KafkaStreams kafkaStreams = new KafkaStreams(builder, props);
  kafkaStreams.start();
  /* Print topology details */
  System.out.println(kafkaStreams.toString());
  /* Graceful shutdown of the app - add a shutdown hook */
  Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
 }
}
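
To make the processing steps easier to follow, here is a minimal plain-Java sketch of the same logic (lowercase, split on a space, count per word). It does not use Kafka at all; the class name WordCountSketch and its process method are hypothetical names chosen for illustration, and the in-memory map only stands in for the state that Kafka Streams would keep.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of the word-count steps; no Kafka classes are involved. */
class WordCountSketch {
    /** Running count per word, standing in for the state behind the KTable. */
    private final Map<String, Long> counts = new LinkedHashMap<>();

    /** Processes one text line and returns the "word count" updates it produces. */
    public List<String> process(String textLine) {
        List<String> updates = new ArrayList<>();
        // mapValues: lowercase the line; flatMapValues: split it on a single space
        for (String word : textLine.toLowerCase().split(" ")) {
            // selectKey + groupByKey + count: the word is the key, the running count the value
            long newCount = counts.merge(word, 1L, Long::sum);
            updates.add(word + " " + newCount);
        }
        return updates;
    }

    public static void main(String[] args) {
        WordCountSketch sketch = new WordCountSketch();
        for (String line : new String[] {"Kafka stream works", "Hello Kafka Stream"}) {
            sketch.process(line).forEach(System.out::println);
        }
    }
}
```

The sketch emits one update per word occurrence. A real Kafka Streams application may emit fewer intermediate updates, because record caching can merge several updates for the same key before they are forwarded.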

Start a Kafka consumer: This consumer reads messages from the output topic, where the Kafka Streams Java application publishes its processed results.
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
>         --topic word-count-output-topic \
>         --from-beginning \
>         --formatter kafka.tools.DefaultMessageFormatter \
>         --property print.key=true \
>         --property print.value=true \
>         --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>         --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
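
The consumer needs the Long deserializer for values because the counts are written as 8-byte big-endian longs, not as text. The following plain-Java sketch illustrates that encoding with java.nio.ByteBuffer; the class and method names are hypothetical and it only mimics what a long serializer and deserializer do, without using the Kafka classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrates why a count must be read back as a long and not as a string. */
class LongValueSketch {
    /** Encodes a count as 8 bytes; ByteBuffer is big-endian by default. */
    public static byte[] serialize(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    /** Decodes the 8 bytes back into the original count. */
    public static long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = serialize(3L);
        System.out.println("bytes: " + raw.length);
        System.out.println("as long: " + deserialize(raw));
        // Decoding the same bytes as UTF-8 text gives control characters, not "3"
        String asText = new String(raw, StandardCharsets.UTF_8);
        System.out.println("as text equals \"3\": " + asText.equals("3"));
    }
}
```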

[Figure: WordCount app Eclipse project structure]

Start producers and publish messages to the Kafka input topic:
[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic word-count-input-topic
>Kafka stream works
>Hello Kafka Stream

[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic word-count-input-topic
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
>From producer-1 hello kafka
>Kafka stream    

Kafka consumer output: The consumer displays the word counts computed by the Kafka Streams Java application and published to the output topic. The counts update live as new messages arrive.
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
>         --topic word-count-output-topic \
>         --from-beginning \
>         --formatter kafka.tools.DefaultMessageFormatter \
>         --property print.key=true \
>         --property print.value=true \
>         --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>         --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
kafka 1
stream 1
works 1
kafka 2
hello 1
stream 2
works 2
from 1
kafka 3
producer-1 1
hello 2
kafka 4
stream 3

Kafka Streams application internal topics: Running a Kafka Streams application may create internal intermediary topics. There are two types of internal topic, both created, managed, and used by Kafka Streams. We should never use, delete, or publish messages to them; they are internal and are not meant for the outside world.
  1. Repartitioning topic: When the key of a stream is changed (here by selectKey), Kafka Streams repartitions the data through this topic before a key-based operation such as grouping.
  2. Changelog topic: On aggregation, Kafka Streams saves the compacted data in a topic.
Word count Kafka Streams internal topics: Listing the topics available on the broker, we find two topics prefixed by <APPLICATION_ID_CONFIG>-<count store name>.
[centos@host01 kafka]$ kafka-topics.sh --list --zookeeper localhost:2181
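
As a rough guide to what to look for in that listing, internal topic names follow the pattern <application.id>-<name>-repartition and <application.id>-<name>-changelog. The sketch below only builds those strings; the class and method names are hypothetical, and the store name "Count" is an assumption based on the "Count-repartition" suffix visible in the application log further down.

```java
/** Builds the expected internal topic names for a Kafka Streams application. */
class InternalTopicNames {
    /** Name of the topic used to redistribute records after a key change. */
    public static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    /** Name of the compacted topic that backs up a state store. */
    public static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String appId = "kafka-stream-wrod-count"; // application.id used in this post
        String store = "Count";                   // assumed store name, see the note above
        System.out.println(repartitionTopic(appId, store));
        System.out.println(changelogTopic(appId, store));
    }
}
```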

Scaling the Kafka Streams application: The input topic we created earlier has two partitions, so we can run up to two instances of the Kafka Streams application in parallel. A Kafka Streams application relies on the Kafka consumer, so instances that share the same application ID join the same consumer group. We demonstrate this by launching two instances of the WordCount app from a jar.

  1. Export a runnable jar with dependencies from Eclipse, say KSWordCountApp-1.jar.
  2. Start two instances of the word count app by running the command below in two different terminals.
    [centos@host01 Desktop]$ java -jar KSWordCountApp-1.jar 
  3. Observe the log after starting instance-1 and how it changes after starting instance-2.
    WordCountApp Instance-1 Log: 
    [centos@host01 Desktop]$ java -jar KSWordCountApp-1.jar 
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:rsrc:slf4j-simple-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:rsrc:slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]
    189 [main] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
     application.id = kafka-stream-wrod-count
     application.server = 
     bootstrap.servers = [,]
     buffered.records.per.partition = 1000
     cache.max.bytes.buffering = 10485760
    Count-repartition-0, word-count-input-topic-0] assigned at the end of consumer rebalance.
     current active tasks: [0_0, 1_0, 0_1, 1_1]
     current suspended standby tasks: []
  4. Start instance-2 and again observe the logs of terminal-1 and terminal-2: the active tasks are split between the two word count app instances.
    WordCountApp Instance-1 Log:
    39121 [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] partition assignment took 37 ms.
     current active tasks: [0_0, 1_0]
     current standby tasks: []
    WordCountApp Instance-2 Log: 
    3114 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] partition assignment took 605 ms.
     current active tasks: [0_1, 1_1]
     current standby tasks: []
  5. For the end user the change is not visible. Publish messages and observe the consumer output; it is the same as before. At any time we can shut down instance-1 or instance-2 without affecting the consumer.
  6. Shut down instance-2 and again observe the logs. Instance-2 shuts down gracefully, and the active task list of instance-1 is the same as when it started, with all four tasks.
    WordCountApp Instance-2 Log:
    1280475 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] Removing all standby tasks []
    1280475 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] Stream thread shutdown complete
    1280475 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
    1280475 [kafka-streams-close-thread] INFO org.apache.kafka.streams.KafkaStreams - stream-client [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651] Stopped Kafka Streams process.
    1280475 [Thread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651] State transition from PENDING_SHUTDOWN to NOT_RUNNING.
    WordCountApp Instance-1 Log: 
    1328049 [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] partition assignment took 205 ms.
     current active tasks: [0_0, 1_0, 0_1, 1_1]
     current standby tasks: []
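
The task lists in the logs above can be reproduced with a small plain-Java sketch. Task IDs have the form <sub-topology>_<partition>; with two sub-topologies (one reading the input topic, one reading the repartition topic) and two partitions there are four tasks. The assignment rule used here (partition index modulo instance count) is a deliberate simplification for illustration and is not the actual Kafka Streams assignor; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified illustration of how tasks spread over application instances. */
class TaskAssignmentSketch {
    /** Returns, for each instance, the list of task IDs it would run. */
    public static List<List<String>> assign(int subTopologies, int partitions, int instances) {
        List<List<String>> perInstance = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            perInstance.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            for (int s = 0; s < subTopologies; s++) {
                // Simplified rule: all tasks of one partition go to the same instance
                perInstance.get(p % instances).add(s + "_" + p);
            }
        }
        return perInstance;
    }

    public static void main(String[] args) {
        System.out.println("1 instance:  " + assign(2, 2, 1));
        System.out.println("2 instances: " + assign(2, 2, 2));
        System.out.println("3 instances: " + assign(2, 2, 3));
    }
}
```

With three instances the third list is empty, which illustrates why running more instances than input partitions adds no parallelism.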

