Sep 26, 2018

Textual description of firstImageUrl

Favourite Mobile Kafka Stream Application: User activity on e-commerce website captures Username & Mobile Brand as Input and produces output most popular mobile brand with count (MobileBrand: count)

Favourite(Popular) Mobile application: In previous post we did setup and wrote word count program. In this post we will write a Kafka stream app which give favourite colour - count of each colour favoured by number of user.

Prerequisite: Zookeeper and Single node Kafka setup should be up and running. Refer this to setup Setup Single Node Single/Multi Broker Configuration (Kafka and Zookeeper).

User and its favourite mobile is captured as input (producer submit in Input topic) and Kafka Stream listener (Java application that need to be developed) processes the request and gives output most popular mobile brand and associated count.

Input: <user_name>,<favourite_mobile_brand> -- coma is delimiter
Output: <Mobile_brand> : <Count>

Development strategy: Topology and Operations involved

1. Read input topic from Kafka as Kstream
2. Filter invalid input - consider only mobile brands {"iphone", "vivo", "xiaomi","nokia"}
3. Use SelectKey to fetch userId
4. Use MapValues to extract mobile brand in Lowercase
5. Filter and remove unwanted input (Input with userId and mobile from different brand)
6. Write Output to intermediary topic - For converting KStream to KTtable
7. Read from Kafka as KTable
8. Group by Mobile brand
9. Count to total mobile brands in KTable
10. Write output to Output topic

Create Kafka topic : Create Input/output and intermediary topic.
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic favourite-colour-input
Created topic "favourite-mobile-input".

[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact
Created topic "user-detail-and-mobile".

[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact
Created topic "favourite-mobile-output".

Java Kafka Stream client: 

Create a Java class similar to WordCount example and copy & paste below sample code. Below is Java Kafka Stream client for processing Input <User, MobileBrand>:
package com.devinline.kafkastream;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

/**
 * @author www.devinline.com (nikhil)
 *
 */

public class FavouriteMobileApp {
 public static void main(String[] args) {
  Properties props = new Properties();
  /*
   * ApplicationId is specific to application which is used for consumer
   * groupId/default clientId prefix
   */
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favourite-mobile-app");
  /* List of IP address:port that need to be connected */
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9091");
  /* Start topic consumption from Start */
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  /* For serialization and desrialization of data */
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
  
  KStreamBuilder builder = new KStreamBuilder();
  /* Stream from Kafka : name of topic */
  KStream<String, String> textInputLines = builder.stream("favourite-mobile-input");
  String[] phones = {"iphone", "vivo", "xiaomi","nokia"};  
  /*favColorInputStream.filter((key,value) -> Arrays.asList(words).contains(value))*/  
  
  KStream<String, String> userAndMobilesStream = textInputLines.filter((key,value) -> value.contains(","))
    .selectKey((key, value) -> value.split(",")[0].toLowerCase())
    .mapValues( value -> value.split(",")[1].toLowerCase())
    .filter((key,value) -> Arrays.asList(phones).contains(value));
    //.filter((user,mobile) -> Arrays.stream(phones).parallel().anyMatch(mobile::contains));
  
  /* Publish in intermediary topic */
  userAndMobilesStream.to("user-detail-and-mobile");
  
  /* Read topic in KTable */
  KTable<String, String> userMobileTable = builder.table("user-detail-and-mobile");
  
  /* Perform GroupBy Operation and followed by count colour */
  KTable<String, Long> favouriteColors = userMobileTable.groupBy((user,mobile) -> new KeyValue<>(mobile, mobile))
    .count("CountByMobileBrand");
  
  /* Publish in output topic */
  favouriteColors.to(Serdes.String(),Serdes.Long(),"favourite-mobile-output");
  
  KafkaStreams streams = new KafkaStreams(builder, props);
  streams.cleanUp();
  streams.start();
  
  /*Print topology details*/
  System.out.println(streams.toString());
  /*Add shutdownhook for graceful shutdown */
  Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  
 }
}

Open terminal Start Zookeeper and Kafka broker on localhost:9091. Use jps command to validate zookeeper and Kafka is updated and running.
Start Zookeeper
[centos@host01 zookeeper]$ zkServer.sh start
Start Kafka broker
[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-1.properties
[centos@host01 zookeeper]$ jps
3169 Kafka
2649 QuorumPeerMain
10169 Jps

Start Kafka producer(Input to Kafka stream client) and consumer(Output topic): Input (userid,mobile) is passed constantly in input Kafka topic and Kafka stream client process that and produces output(mobile,count>) in output topic.
Start Kafka Producer : Producer produces input in topic "favourite-mobile-input"
[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic favourite-mobile-input
Start Kafka Consumer: Consumer consume message from topic "favourite-mobile-output"
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 \
    --topic favourite-mobile-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Sample Output : Below are sample input and output of producer and consumer.
Producer :
[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic favourite-mobile-input
>nik,iphone
>ranjan,iphone
>Jack,Nokia
>Susi,lg
>Kavi,xiaomi
>Raj,vivo
>Rajni,Vivo
>Roy,Nokia
>Nik,iphone
>Ranjan,Nokia
>Kavi,Iphone
>

Consumer:
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 \
>     --topic favourite-mobile-output \
>     --from-beginning \
>     --formatter kafka.tools.DefaultMessageFormatter \
>     --property print.key=true \
>     --property print.value=true \
>     --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>     --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
iphone 1
iphone 2
nokia 1
xiaomi 1
vivo 1
vivo 2
nokia 2
iphone 1
iphone 2
iphone 1
nokia 3
xiaomi 0
iphone 2


Location: Bengaluru, Karnataka, India