Sep 30, 2018

Java Client to publish message in tibco queue

TIBCO Enterprise Message Service is a standards-based messaging platform (fully TCK certified to both the JMS 1.1 and 2.0 standards) that simplifies and accelerates the integration and management of data distribution in high-performance, enterprise environments (Source: Wikipedia). In this post we will create a Java client which publishes messages to a TIBCO queue.

1. Create a Maven project and add following dependencies in pom.xml.
<dependencies>
    <!-- JMS API -->
    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>jms</artifactId>
        <version>1.1</version>
    </dependency>
    <dependency>
        <groupId>tibco.jms</groupId>
        <artifactId>tibco_jms</artifactId>
        <version>6.1</version>
    </dependency>
</dependencies> 
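Note: The TIBCO EMS client jar (tibjms.jar) is not published to Maven Central; it ships with the EMS installation. Assuming the jar is available locally, one way to make the above coordinates resolvable is to install it into the local Maven repository (the file path below is a placeholder):
mvn install:install-file -Dfile=/path/to/tibjms.jar -DgroupId=tibco.jms -DartifactId=tibco_jms -Dversion=6.1 -Dpackaging=jar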

2. Create a Java class TibcoQueue which holds the TIBCO queue connection details.
/**
 * @author www.devinline.com(nikhil)
 *
 */
public class TibcoQueue {
 private String tibcoInstance;
 private String tibcoQueueName;
 private String userName;
 private String passWord;

 /**
  * @param tibcoInstance
  * @param tibcoQueueName
  * @param userName
  * @param passWord
  */
 public TibcoQueue(String tibcoInstance, String tibcoQueueName, 
   String userName, String passWord) {
  this.tibcoInstance = tibcoInstance;
  this.tibcoQueueName = tibcoQueueName;
  this.userName = userName;
  this.passWord = passWord;
 }

 public String getTibcoInstance() {
  return tibcoInstance;
 }

 public void setTibcoInstance(String tibcoInstance) {
  this.tibcoInstance = tibcoInstance;
 }

 public String getTibcoQueueName() {
  return tibcoQueueName;
 }

 public void setTibcoQueueName(String tibcoQueueName) {
  this.tibcoQueueName = tibcoQueueName;
 }

 public String getUserName() {
  return userName;
 }

 public void setUserName(String userName) {
  this.userName = userName;
 }

 public String getPassWord() {
  return passWord;
 }

 public void setPassWord(String passWord) {
  this.passWord = passWord;
 }
}

3. Create a Java class which does all the configuration and sends messages to the queue.
import java.util.List;

import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.tibco.tibjms.TibjmsQueueConnectionFactory;

/**
 * @author www.devinline.com (nikhil)
 *
 */

public class TibcoQueueUtils {
 private QueueConnection queueConnection;
 private QueueSession queueSession;
 private Queue sendQueue;
 private MessageProducer messageProducer;

 private void configureTibcoConnection(TibcoQueue tibcoQueue) throws Exception {
  QueueConnectionFactory queueConnectionFactory = 
    new TibjmsQueueConnectionFactory(tibcoQueue.getTibcoInstance());
  queueConnection = queueConnectionFactory.createQueueConnection(tibcoQueue.getUserName(),
    tibcoQueue.getPassWord());
  queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
  sendQueue = queueSession.createQueue(tibcoQueue.getTibcoQueueName());
 }

 private void configureProducer(TibcoQueue tibcoQueue) throws Exception {
  configureTibcoConnection(tibcoQueue);
  messageProducer = queueSession.createProducer(sendQueue);
  messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  queueConnection.start();
 }

 private void closeProducer() throws Exception {
  messageProducer.close();
  closeTibcoConnection();
 }

 private void closeTibcoConnection() throws Exception {
  queueSession.close();
  queueConnection.stop();
  queueConnection.close();
 }

 public synchronized void sendMessage(String messageText, 
   TibcoQueue tibcoQueue) throws Exception {
  configureProducer(tibcoQueue);
  TextMessage message = queueSession.createTextMessage(messageText);
  messageProducer.send(message);
  closeProducer();
 }

 public synchronized void sendMessages(List<String> messageTexts, 
   TibcoQueue tibcoQueue) throws Exception {
  messageTexts.forEach(messageText -> {
   try {
    sendMessage(messageText, tibcoQueue);
   } catch (Exception e) {
    e.printStackTrace();
   }
  });
 }
}

4. Create a Java class with a main method which invokes sendMessage() and publishes a message to the queue.
import com.devinline.jms.tibco.utils.TibcoQueue;
import com.devinline.jms.utils.JMSConstants;
import com.devinline.jms.tibco.utils.TibcoQueueUtils;
/**
 * @author www.devinline.com (nikhil)
 *
 */
public class TibcoQueueClientPublisher {
 public static void main(String[] args) throws Exception {
  String message = "{\"order\":{\"orderNo\":\"XYZ1234\",\"status\":\"COMPLETED\"}}";
  TibcoQueue tibcoQueue = new TibcoQueue(
    JMSConstants.BROKER_URL, 
    JMSConstants.QUEUE_NAME,
    JMSConstants.QUEUE_USERNAME, 
    JMSConstants.QUEUE_PASSWORD);
  
  TibcoQueueUtils queueUtil = new TibcoQueueUtils();
  System.out.println("Sending message");
  queueUtil.sendMessage(message,tibcoQueue);
  System.out.println("Message sent successfully!!");
 }
}

Note: Create a JMSConstants class with the broker URL, queue name, username and password constants and use it in the above class accordingly.
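A minimal sketch of such a constants class (all values below are placeholders; substitute your own EMS server URL, queue name and credentials):
package com.devinline.jms.utils;

/**
 * Connection constants used by TibcoQueueClientPublisher.
 * All values are placeholders - replace with your EMS server details.
 */
public class JMSConstants {
 public static final String BROKER_URL = "tcp://localhost:7222"; // EMS server URL (placeholder)
 public static final String QUEUE_NAME = "sample.order.queue";   // target queue name (placeholder)
 public static final String QUEUE_USERNAME = "admin";            // EMS username (placeholder)
 public static final String QUEUE_PASSWORD = "admin";            // EMS password (placeholder)
}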

Sep 26, 2018


Favourite Mobile Kafka Stream Application: user activity on an e-commerce website is captured as Username & Mobile Brand input, and the application produces the most popular mobile brands with their counts (MobileBrand: count).

Favourite (popular) mobile application: In the previous post we did the setup and wrote a word count program. In this post we will write a Kafka Streams app which gives the favourite mobile brand - a count of each brand favoured by users.

Prerequisite: Zookeeper and a single node Kafka setup should be up and running. Refer to this post to set up Single Node Single/Multi Broker Configuration (Kafka and Zookeeper).

A user and their favourite mobile brand are captured as input (the producer submits to the input topic), and the Kafka Streams listener (the Java application we develop below) processes each record and outputs the mobile brand with its running count.

Input: <user_name>,<favourite_mobile_brand> -- comma is the delimiter
Output: <Mobile_brand> : <Count>

Development strategy: Topology and Operations involved

1. Read the input topic from Kafka as a KStream
2. Filter invalid input - keep only records that contain a comma
3. Use SelectKey to set the username as the key
4. Use MapValues to extract the mobile brand in lowercase
5. Filter and remove unwanted input - keep only the allowed mobile brands {"iphone", "vivo", "xiaomi", "nokia"}
6. Write output to an intermediary topic - for converting the KStream to a KTable
7. Read from Kafka as a KTable
8. Group by mobile brand
9. Count the total for each mobile brand in the KTable
10. Write output to the output topic

Create Kafka topics: Create the input, output and intermediary topics.
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic favourite-mobile-input
Created topic "favourite-mobile-input".

[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user-detail-and-mobile --config cleanup.policy=compact
Created topic "user-detail-and-mobile".

[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic favourite-mobile-output --config cleanup.policy=compact
Created topic "favourite-mobile-output".

Java Kafka Stream client: 

Create a Java class similar to the WordCount example and copy & paste the sample code below. It is a Java Kafka Streams client for processing input <User, MobileBrand>:
package com.devinline.kafkastream;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

/**
 * @author www.devinline.com (nikhil)
 *
 */

public class FavouriteMobileApp {
 public static void main(String[] args) {
  Properties props = new Properties();
  /*
   * ApplicationId is specific to application which is used for consumer
   * groupId/default clientId prefix
   */
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favourite-mobile-app");
  /* List of IP address:port that need to be connected */
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9091");
  /* Start topic consumption from Start */
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  /* For serialization and deserialization of data */
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
  
  KStreamBuilder builder = new KStreamBuilder();
  /* Stream from Kafka : name of topic */
  KStream<String, String> textInputLines = builder.stream("favourite-mobile-input");
  String[] phones = {"iphone", "vivo", "xiaomi","nokia"};  
  
  KStream<String, String> userAndMobilesStream = textInputLines.filter((key,value) -> value.contains(","))
    .selectKey((key, value) -> value.split(",")[0].toLowerCase())
    .mapValues( value -> value.split(",")[1].toLowerCase())
    .filter((key,value) -> Arrays.asList(phones).contains(value));
    //.filter((user,mobile) -> Arrays.stream(phones).parallel().anyMatch(mobile::contains));
  
  /* Publish in intermediary topic */
  userAndMobilesStream.to("user-detail-and-mobile");
  
  /* Read topic in KTable */
  KTable<String, String> userMobileTable = builder.table("user-detail-and-mobile");
  
  /* Perform GroupBy operation on mobile brand followed by a count per brand */
  KTable<String, Long> favouriteMobiles = userMobileTable.groupBy((user,mobile) -> new KeyValue<>(mobile, mobile))
    .count("CountByMobileBrand");
  
  /* Publish in output topic */
  favouriteMobiles.to(Serdes.String(), Serdes.Long(), "favourite-mobile-output");
  
  KafkaStreams streams = new KafkaStreams(builder, props);
  streams.cleanUp();
  streams.start();
  
  /*Print topology details*/
  System.out.println(streams.toString());
  /*Add shutdownhook for graceful shutdown */
  Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  
 }
}

Open a terminal, start Zookeeper and a Kafka broker on localhost:9091. Use the jps command to validate that Zookeeper and Kafka are up and running.
Start Zookeeper
[centos@host01 zookeeper]$ zkServer.sh start
Start Kafka broker
[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-1.properties
[centos@host01 zookeeper]$ jps
3169 Kafka
2649 QuorumPeerMain
10169 Jps

Start Kafka producer (input to the Kafka Streams client) and consumer (output topic): input (userid,mobile) is continuously published to the input Kafka topic; the Kafka Streams client processes it and produces output (mobile,count) in the output topic.
Start Kafka Producer : Producer produces input in topic "favourite-mobile-input"
[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic favourite-mobile-input
Start Kafka Consumer: Consumer consumes messages from topic "favourite-mobile-output"
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 \
    --topic favourite-mobile-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Sample Output : Below are sample input and output of producer and consumer.
Producer :
[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic favourite-mobile-input
>nik,iphone
>ranjan,iphone
>Jack,Nokia
>Susi,lg
>Kavi,xiaomi
>Raj,vivo
>Rajni,Vivo
>Roy,Nokia
>Nik,iphone
>Ranjan,Nokia
>Kavi,Iphone
>

Consumer:
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 \
>     --topic favourite-mobile-output \
>     --from-beginning \
>     --formatter kafka.tools.DefaultMessageFormatter \
>     --property print.key=true \
>     --property print.value=true \
>     --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>     --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
iphone 1
iphone 2
nokia 1
xiaomi 1
vivo 1
vivo 2
nokia 2
iphone 1
iphone 2
iphone 1
nokia 3
xiaomi 0
iphone 2

Sep 25, 2018

How to Become a Data Scientist: An overview by Active Wizards

There is no doubt that the topic of big data is the trending subject in organisations today. Businesses have no choice other than to embrace the concept and stay up-to-date. Therefore, data scientists and firms like Active Wizards are in high demand in the 21st century. Many people are eyeing the career, especially those who are still in school. But the big concern is how to become a data scientist. In this ultimate guide, we will tell you more about the profession and what it takes to become one. Read on to know more.

Who Is a Data Scientist?

Most people still get lost somewhere in the complexity of data science. In fact, it is hard to define what data science is by looking at it from one angle. A data scientist’s primary role is to collect and analyze data in an organization or project to make a sensible conclusion.
One of the results can be in the form of the visualization of data, where the data reveals specific patterns. Data scientists can conduct a graphical presentation with charts. This is a clearer way to display the patterns. Data scientists use specific algorithms that analyze the data in specific ways.

What You Need to Have

Before qualifying to become a data scientist, there are various steps you will have to go through. They all revolve around gaining the necessary knowledge, skills and experience. Here are the steps.
  • Earning a bachelor’s degree – most universities and colleges around the world usually offer data science courses. They all teach the same concept even though they may have different names. However, earning a degree from a reputable university will advance your career.
  • Earning a master’s degree – to further improve your skills and chances of becoming a professional in data science, a master’s degree will be a prudent idea. There are numerous universities that offer you a great opportunity to conduct the best research and write a compelling thesis.
  • Working for experience – before becoming an expert, you will need to get exposure in the real world. Some people start this exposure during their internship programs while others wait until they get their first job. The best industries in which to learn more about data science include the banking and healthcare industries.
  • Write a portfolio – after gathering enough experience as an employee in one of the best data science firms, it may be a great opportunity to start your own firm. Write a compelling portfolio and start looking for clients. It is better to target small companies that are not so complex and graduate to bigger ones as you gain experience.

Data Science Paperwork

Before settling down completely as an experienced data scientist running your own company, it is a prudent idea to know whether your state requires you to have a certificate to practice. This can differ depending on where you come from. Some states have authorized bodies to approve all the data scientists. By now, you already know what you need to become a data scientist. Start gathering your skills and experience now by following the above insights.

Author: Jack Botsford





Sep 22, 2018


Apache Kafka Stream : Setup Word Count Sample in eclipse and multi broker Kafka cluster

Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka clusters. Earlier we set up a Kafka Cluster Multi Broker Configuration and performed basic Kafka producer/consumer operations. In this post we will use the multi broker Kafka cluster and demonstrate how to use Kafka Streams with a word count example.

Create Input and Output Kafka Topics: Create input & output topics with two partitions, which will be used to publish & consume messages.
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic word-count-input-topic
Created topic "word-count-input-topic".
[centos@host01 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic word-count-output-topic
Created topic "word-count-output-topic".

Start Kafka: Starting Kafka brokers on localhost, ports 9091 and 9092.
[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-1.properties

[centos@host01 kafka]$ bin/kafka-server-start.sh config/server-2.properties

Word count Kafka stream application :

1. Create a Maven project and modify pom.xml with following dependencies.
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.4</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
2. Create a file log4j.properties in the resources directory and add the following properties.
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%p %m (%c:%L) %n

3. Create a Java class with the following code. The class below listens to the input topic on the registered brokers (bootstrap servers) and publishes messages to the output topic. This Kafka Streams app processes each incoming text line, splits it on spaces and keeps a running count of each word.
package com.devinline.kafkastream;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class StreamStartupApp {
 public static void main(String[] args) {
  Properties props = new Properties();
  /*
   * ApplicationId is specific to application which is used for consumer
   * groupId/default clientId prefix
   */
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wrod-count");
  /* List of IP address:port that need to be connected */
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9091, 127.0.0.1:9092");
  /* Start topic consumption from Start */
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  /* For serialization and deserialization of data */
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

  KStreamBuilder builder = new KStreamBuilder();
  /* Stream from Kafka : name of topic */
  KStream<String, String> wordCountInputStream = builder.stream("word-count-input-topic");
  
  /* Convert word to lowercase */
  KTable<String, Long> wordCounts = wordCountInputStream.mapValues(textLineInput -> textLineInput.toLowerCase())
     /* Split each text line into words */
     .flatMapValues(lowerCaseTextLine -> Arrays.asList(lowerCaseTextLine.split(" ")))
     /* Select key */
     .selectKey((ignoredKey, value) -> value)
     /* Using default grouping using Serdes.String().getClass() */
     .groupByKey()
     /* Compute word count*/
     .count("Count");
  /* Publish wordcounts output to output kafka */
  wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output-topic");
  
  KafkaStreams kafkaStreams = new KafkaStreams(builder, props);
  kafkaStreams.start();
  
  /*Print topology details*/
  System.out.println(kafkaStreams);
  
  /* Graceful degradation of app - add Shutdownhook */
  Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
 }
}

Start a Kafka consumer: This consumer reads messages from the output topic, where processed messages are published by the Kafka Streams Java client.
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
>         --topic word-count-output-topic \
>         --from-beginning \
>         --formatter kafka.tools.DefaultMessageFormatter \
>         --property print.key=true \
>         --property print.value=true \
>         --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>         --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
(Image: WordCount App Eclipse project structure)

Start Producer and publish message to Kafka input topic:
[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic word-count-input-topic
>Kafka stream works
>Hello Kafka Stream
>WOrks

[centos@host01 kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic word-count-input-topic
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
>From producer-1 hello kafka
>Kafka stream    
>

Kafka Consumer message consumption: The consumer displays word counts processed by the Java Kafka Streams application and published to the output topic. It is live stream processing.
[centos@host01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
>         --topic word-count-output-topic \
>         --from-beginning \
>         --formatter kafka.tools.DefaultMessageFormatter \
>         --property print.key=true \
>         --property print.value=true \
>         --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>         --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
kafka 1
stream 1
works 1
kafka 2
hello 1
stream 2
works 2
from 1
kafka 3
producer-1 1
hello 2
kafka 4
stream 3

Kafka Streams application internal topics: Running a Kafka Streams application may create internal intermediary topics. These are managed and used by Kafka Streams itself; we should never read from, delete or publish messages to them, as they are internal and do not exist for the outside world. Two types of internal topic are created:
  1. Repartitioning topic: when the key of a stream is transformed, repartitioning happens at some processor.
  2. Change-log topic: on aggregation, Kafka Streams saves compacted data in the topic. 
Word count Kafka Streams internal topics: Listing the available topics on the broker, we find two topics prefixed by the application.id and the state store name:
[centos@host01 kafka]$ kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
kafka-stream-wrod-count-Count-changelog
kafka-stream-wrod-count-Count-repartition
word-count-input
word-count-input-topic
word-count-output
word-count-output-topic

Scaling the Kafka Streams application: The input topic we created earlier has two partitions, so we can spin up to 2 instances of the Kafka Streams application. Most importantly, a Kafka Streams application relies on the Kafka consumer, and we can add multiple consumers to a consumer group. We will demonstrate this by launching two instances of the WordCount app from a runnable jar.

  1. Export runnable jar from eclipse with dependencies say KSWordCountApp.jar.
  2. Spin two instance of word count app. Run below command in two different terminal.
    [centos@host01 Desktop]$ java -jar KSWordCountApp-1.jar 
    
  3. Observe the log after starting instance-1 and how it changes after starting instance-2.
    WordCountApp Instance-1 Log: 
    [centos@host01 Desktop]$ java -jar KSWordCountApp-1.jar 
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:rsrc:slf4j-simple-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:rsrc:slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]
    189 [main] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
     application.id = kafka-stream-wrod-count
     application.server = 
     bootstrap.servers = [127.0.0.1:9091, 127.0.0.1:9092]
     buffered.records.per.partition = 1000
     cache.max.bytes.buffering = 10485760
    .....
    ......
    Count-repartition-0, word-count-input-topic-0] assigned at the end of consumer rebalance.
     
     current active tasks: [0_0, 1_0, 0_1, 1_1]
     current suspended standby tasks: []
     
  4. Start instance-2 and again observe the logs of terminal-1 and terminal-2: active tasks are split between the two WordCount app instances.
    WordCountApp Instance-1 Log:
    39121 [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] partition assignment took 37 ms.
     current active tasks: [0_0, 1_0]
     current standby tasks: []
    
    
    WordCountApp Instance-2 Log: 
    3114 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] partition assignment took 605 ms.
     current active tasks: [0_1, 1_1]
     current standby tasks: []
    
  5. For the end user the change is not visible. Publish messages and observe the response in the consumer; it will be the same as earlier. At any time we can shut down instance-1 or instance-2 and it will not affect the consumer.
  6. Shut down instance-2 and again observe the logs. Instance-2 shuts down gracefully, and the active task list of instance-1 returns to what it was when it started alone, with all four tasks.
    WordCountApp Instance-2 Log:
    1280475 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] Removing all standby tasks []
    1280475 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] Stream thread shutdown complete
    1280475 [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
    1280475 [kafka-streams-close-thread] INFO org.apache.kafka.streams.KafkaStreams - stream-client [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651] Stopped Kafka Streams process.
    1280475 [Thread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [kafka-stream-wrod-count-94b574f5-d603-42d4-92da-cc9b74f4a651] State transition from PENDING_SHUTDOWN to NOT_RUNNING.
    
    WordCountApp Instance-1 Log: 
    1328049 [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-stream-wrod-count-e4f45acc-9b87-4eed-a0a1-b6a1c0bedf26-StreamThread-1] partition assignment took 205 ms.
     current active tasks: [0_0, 1_0, 0_1, 1_1]
     current standby tasks: []
    

Sep 20, 2018

Internals of Cassandra data storage: Visualise Cassandra node handshaking and how data is persisted on disk (Simulate flush and compaction using nodetool and sstabledump)

In the previous post we set up a multi node Cassandra cluster on Google Compute Engine. In this post we analyse Cassandra startup logs and visualise how handshaking happens when a new node joins the cluster. Later we will see how data is persisted to the file system after flushing, and after deleting a record we will look at how tombstone information is stored and how a deleted record is maintained in Cassandra.

Cassandra node running status and Handshake visualisation 

On starting Cassandra on any node we are interested in three things.
  • On which node the messaging service starts (its IP address and interface), the IP address of the node where Cassandra started, and its final status (highlighted in pink)
  • Which (starting) node does handshaking with which (already running) node (highlighted in blue)
  • Which topologies are updated on starting Cassandra on the given node (highlighted in yellow)
Start Cassandra on Instance-1 and capture log: IP address of Instance-1 is 10.128.0.2
INFO  [main] 2018-09-20 16:15:57,812 StorageService.java:618 - Cassandra version: 3.11.3
INFO  [main] 2018-09-20 16:15:57,813 StorageService.java:619 - Thrift API version: 20.1.0
INFO  [main] 2018-09-20 16:15:57,813 StorageService.java:620 - CQL supported versions: 3.4.4 (default: 3.4.4)
INFO  [main] 2018-09-20 16:15:57,813 StorageService.java:622 - Native protocol supported versions: 3/v3, 4/v4, 5/v5-beta (default: 4/v4)
INFO  [main] 2018-09-20 16:15:57,893 IndexSummaryManager.java:85 - Initializing index summary manager with a memory pool size of 50 MB and a resize interval of 60 minutes
INFO  [main] 2018-09-20 16:15:57,914 MessagingService.java:761 - Starting Messaging Service on /10.128.0.2:7000 (eth0)
INFO  [main] 2018-09-20 16:16:03,012 OutboundTcpConnection.java:108 - OutboundTcpConnection using coalescing strategy DISABLED
INFO  [main] 2018-09-20 16:16:29,114 StorageService.java:704 - Loading persisted ring state
INFO  [main] 2018-09-20 16:16:29,137 StorageService.java:822 - Starting up server gossip
INFO  [main] 2018-09-20 16:16:29,250 TokenMetadata.java:479 - Updating topology for /10.128.0.2
INFO  [main] 2018-09-20 16:16:29,251 TokenMetadata.java:479 - Updating topology for /10.128.0.2
INFO  [main] 2018-09-20 16:16:29,370 StorageService.java:1446 - JOINING: Finish joining ring
INFO  [main] 2018-09-20 16:16:29,426 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='keyspace1', ColumnFamily='standard1')
INFO  [main] 2018-09-20 16:16:29,432 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='keyspace1', ColumnFamily='counter1')
INFO  [main] 2018-09-20 16:16:29,439 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='stockdb', ColumnFamily='user')
INFO  [main] 2018-09-20 16:16:29,519 StorageService.java:2289 - Node /10.128.0.2 state jump to NORMAL

Observation: From the above log we can see the messaging service started on /10.128.0.2:7000 (eth0), the IP address of the node is /10.128.0.2, and after starting its state is NORMAL (last highlighted line of the log).
Since it is the very first node started in the data centre, only the topology of this node is updated.

Start Cassandra on Instance-2 and capture log: IP address of Instance-2 is 10.128.0.3
INFO  [main] 2018-09-20 16:18:26,317 QueryProcessor.java:163 - Preloaded 1 prepared statements
INFO  [main] 2018-09-20 16:18:26,318 StorageService.java:618 - Cassandra version: 3.11.3
INFO  [main] 2018-09-20 16:18:26,318 StorageService.java:619 - Thrift API version: 20.1.0
INFO  [main] 2018-09-20 16:18:26,318 StorageService.java:620 - CQL supported versions: 3.4.4 (default: 3.4.4)
INFO  [main] 2018-09-20 16:18:26,319 StorageService.java:622 - Native protocol supported versions: 3/v3, 4/v4, 5/v5-beta (default: 4/v4)
INFO  [main] 2018-09-20 16:18:26,403 IndexSummaryManager.java:85 - Initializing index summary manager with a memory pool size of 50 MB and a resize interval of 60 minutes
INFO  [main] 2018-09-20 16:18:26,423 MessagingService.java:761 - Starting Messaging Service on /10.128.0.3:7000 (eth0)
INFO  [main] 2018-09-20 16:18:26,545 OutboundTcpConnection.java:108 - OutboundTcpConnection using coalescing strategy DISABLED
INFO  [HANDSHAKE-/10.128.0.2] 2018-09-20 16:18:26,582 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.2
INFO  [main] 2018-09-20 16:18:27,582 StorageService.java:704 - Loading persisted ring state
INFO  [main] 2018-09-20 16:18:27,607 StorageService.java:822 - Starting up server gossip
INFO  [main] 2018-09-20 16:18:27,715 TokenMetadata.java:479 - Updating topology for /10.128.0.3
INFO  [main] 2018-09-20 16:18:27,716 TokenMetadata.java:479 - Updating topology for /10.128.0.3
INFO  [main] 2018-09-20 16:18:27,845 StorageService.java:1446 - JOINING: Finish joining ring
INFO  [main] 2018-09-20 16:18:27,896 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='keyspace1', ColumnFamily='standard1')
INFO  [main] 2018-09-20 16:18:27,900 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='keyspace1', ColumnFamily='counter1')
INFO  [main] 2018-09-20 16:18:27,901 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='stockdb', ColumnFamily='user')
INFO  [main] 2018-09-20 16:18:27,991 StorageService.java:2289 - Node /10.128.0.3 state jump to NORMAL
INFO  [main] 2018-09-20 16:18:28,033 AuthCache.java:172 - (Re)initializing CredentialsCache (validity period/update interval/max entries) (2000/2000/1000)
INFO  [main] 2018-09-20 16:18:28,043 Gossiper.java:1692 - Waiting for gossip to settle...
INFO  [GossipStage:1] 2018-09-20 16:18:28,731 Gossiper.java:1053 - Node /10.128.0.2 has restarted, now UP
INFO  [GossipStage:1] 2018-09-20 16:18:28,748 StorageService.java:2289 - Node /10.128.0.2 state jump to NORMAL
INFO  [GossipStage:1] 2018-09-20 16:18:28,775 TokenMetadata.java:479 - Updating topology for /10.128.0.2
INFO  [GossipStage:1] 2018-09-20 16:18:28,776 TokenMetadata.java:479 - Updating topology for /10.128.0.2
INFO  [RequestResponseStage-5] 2018-09-20 16:18:28,803 Gossiper.java:1019 - InetAddress /10.128.0.2 is now UP
INFO  [HANDSHAKE-/10.128.0.2] 2018-09-20 16:18:28,822 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.2

Observation: From the above log we can see the messaging service started on /10.128.0.3:7000 (eth0), the IP address of the node is /10.128.0.3, and after starting its state is NORMAL.
Since Instance-1 is already running when Cassandra starts on this node (10.128.0.3), it does a handshake with the already running node (10.128.0.2), and the topology of both Instance-1 (10.128.0.2) and Instance-2 (10.128.0.3) is updated.

Start Cassandra on Instance-3 and capture log: IP address of Instance-3 is 10.128.0.4
INFO  [ScheduledTasks:1] 2018-09-20 17:12:45,013 TokenMetadata.java:498 - Updating topology for all endpoints that have changed
INFO  [main] 2018-09-20 17:12:45,213 StorageService.java:600 - Populating token metadata from system tables
INFO  [main] 2018-09-20 17:12:45,370 StorageService.java:607 - Token metadata: Normal Tokens:
/10.128.0.2:[......... ]
/10.128.0.3:[..........]
/10.128.0.4:[..........]
INFO  [main] 2018-09-20 17:12:46,849 MessagingService.java:761 - Starting Messaging Service on /10.128.0.4:7000 (eth0)
INFO  [main] 2018-09-20 17:12:46,960 OutboundTcpConnection.java:108 - OutboundTcpConnection using coalescing strategy DISABLED
INFO  [HANDSHAKE-/10.128.0.2] 2018-09-20 17:12:47,004 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.2
INFO  [main] 2018-09-20 17:12:48,000 StorageService.java:704 - Loading persisted ring state
INFO  [main] 2018-09-20 17:12:48,026 StorageService.java:822 - Starting up server gossip
INFO  [main] 2018-09-20 17:12:48,125 TokenMetadata.java:479 - Updating topology for /10.128.0.4
INFO  [main] 2018-09-20 17:12:48,125 TokenMetadata.java:479 - Updating topology for /10.128.0.4
INFO  [main] 2018-09-20 17:12:48,228 StorageService.java:1446 - JOINING: Finish joining ring
INFO  [main] 2018-09-20 17:12:48,274 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='keyspace1', ColumnFamily='standard1')
INFO  [main] 2018-09-20 17:12:48,280 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='keyspace1', ColumnFamily='counter1')
INFO  [main] 2018-09-20 17:12:48,280 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='stockdb', ColumnFamily='user')
INFO  [main] 2018-09-20 17:12:48,361 StorageService.java:2289 - Node /10.128.0.4 state jump to NORMAL
INFO  [main] 2018-09-20 17:12:48,394 AuthCache.java:172 - (Re)initializing CredentialsCache (validity period/update interval/max entries) (2000/2000/1000)
INFO  [main] 2018-09-20 17:12:48,401 Gossiper.java:1692 - Waiting for gossip to settle...
INFO  [HANDSHAKE-/10.128.0.3] 2018-09-20 17:12:48,979 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.3
INFO  [HANDSHAKE-/10.128.0.3] 2018-09-20 17:12:48,993 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.3
INFO  [GossipStage:1] 2018-09-20 17:12:49,015 Gossiper.java:1053 - Node /10.128.0.2 has restarted, now UP
INFO  [GossipStage:1] 2018-09-20 17:12:49,030 StorageService.java:2289 - Node /10.128.0.2 state jump to NORMAL
INFO  [GossipStage:1] 2018-09-20 17:12:49,045 TokenMetadata.java:479 - Updating topology for /10.128.0.2
INFO  [GossipStage:1] 2018-09-20 17:12:49,047 TokenMetadata.java:479 - Updating topology for /10.128.0.2
INFO  [GossipStage:1] 2018-09-20 17:12:49,050 Gossiper.java:1053 - Node /10.128.0.3 has restarted, now UP
INFO  [GossipStage:1] 2018-09-20 17:12:49,063 StorageService.java:2289 - Node /10.128.0.3 state jump to NORMAL
INFO  [RequestResponseStage-3] 2018-09-20 17:12:49,073 Gossiper.java:1019 - InetAddress /10.128.0.3 is now UP
INFO  [RequestResponseStage-3] 2018-09-20 17:12:49,074 Gossiper.java:1019 - InetAddress /10.128.0.2 is now UP
INFO  [GossipStage:1] 2018-09-20 17:12:49,078 TokenMetadata.java:479 - Updating topology for /10.128.0.3
INFO  [GossipStage:1] 2018-09-20 17:12:49,079 TokenMetadata.java:479 - Updating topology for /10.128.0.3
INFO  [HANDSHAKE-/10.128.0.2] 2018-09-20 17:12:49,304 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.2

Observation: From the above log we can see the messaging service started on /10.128.0.4:7000 (eth0), the IP address of the node is /10.128.0.4, and after starting its state is NORMAL.
Since Instance-1 and Instance-2 are already running when Cassandra starts on this node (10.128.0.4), it does handshakes with the already running nodes (10.128.0.2 and 10.128.0.3), and the topology of all three instances, Instance-1 (10.128.0.2), Instance-2 (10.128.0.3) and Instance-3 (10.128.0.4), is updated.
We have highlighted the token ranges for all running nodes in green. When a new node joins, virtual tokens are assigned to it.

Start Cassandra on Instance-4 and capture log: IP address of Instance-4 is 10.128.0.5
INFO  [HANDSHAKE-/10.128.0.2] 2018-09-20 16:08:40,370 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.2
INFO  [GossipStage:1] 2018-09-20 16:08:40,373 Gossiper.java:1053 - Node /10.128.0.4 has restarted, now UP
INFO  [main] 2018-09-20 16:08:40,399 StorageService.java:1446 - JOINING: Finish joining ring
INFO  [GossipStage:1] 2018-09-20 16:08:40,402 StorageService.java:2289 - Node /10.128.0.4 state jump to NORMAL
INFO  [HANDSHAKE-/10.128.0.4] 2018-09-20 16:08:40,403 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.4
INFO  [GossipStage:1] 2018-09-20 16:08:40,444 TokenMetadata.java:479 - Updating topology for /10.128.0.4
INFO  [GossipStage:1] 2018-09-20 16:08:40,447 TokenMetadata.java:479 - Updating topology for /10.128.0.4
INFO  [GossipStage:1] 2018-09-20 16:08:40,447 Gossiper.java:1053 - Node /10.128.0.2 has restarted, now UP
INFO  [RequestResponseStage-3] 2018-09-20 16:08:40,468 Gossiper.java:1019 - InetAddress /10.128.0.4 is now UP
INFO  [GossipStage:1] 2018-09-20 16:08:40,472 StorageService.java:2289 - Node /10.128.0.2 state jump to NORMAL
INFO  [RequestResponseStage-2] 2018-09-20 16:08:40,483 Gossiper.java:1019 - InetAddress /10.128.0.2 is now UP
INFO  [GossipStage:1] 2018-09-20 16:08:40,485 TokenMetadata.java:479 - Updating topology for /10.128.0.2
INFO  [GossipStage:1] 2018-09-20 16:08:40,486 TokenMetadata.java:479 - Updating topology for /10.128.0.2
INFO  [GossipStage:1] 2018-09-20 16:08:40,491 Gossiper.java:1053 - Node /10.128.0.3 has restarted, now UP
INFO  [GossipStage:1] 2018-09-20 16:08:40,518 StorageService.java:2289 - Node /10.128.0.3 state jump to NORMAL
INFO  [HANDSHAKE-/10.128.0.3] 2018-09-20 16:08:40,536 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.3
INFO  [main] 2018-09-20 16:08:40,540 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='keyspace1', ColumnFamily='standard1')
INFO  [GossipStage:1] 2018-09-20 16:08:40,537 TokenMetadata.java:479 - Updating topology for /10.128.0.3
INFO  [GossipStage:1] 2018-09-20 16:08:40,550 TokenMetadata.java:479 - Updating topology for /10.128.0.3
INFO  [main] 2018-09-20 16:08:40,551 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='keyspace1', ColumnFamily='counter1')
INFO  [main] 2018-09-20 16:08:40,556 SecondaryIndexManager.java:509 - Executing pre-join tasks for: CFS(Keyspace='stockdb', ColumnFamily='user')
INFO  [RequestResponseStage-1] 2018-09-20 16:08:40,599 Gossiper.java:1019 - InetAddress /10.128.0.3 is now UP
INFO  [main] 2018-09-20 16:08:40,643 StorageService.java:2289 - Node /10.128.0.5 state jump to NORMAL
INFO  [main] 2018-09-20 16:08:40,668 AuthCache.java:172 - (Re)initializing CredentialsCache (validity period/update interval/max entries) (2000/2000/1000)
INFO  [main] 2018-09-20 16:08:40,676 Gossiper.java:1692 - Waiting for gossip to settle...
INFO  [HANDSHAKE-/10.128.0.3] 2018-09-20 16:08:41,284 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.3
INFO  [HANDSHAKE-/10.128.0.4] 2018-09-20 16:08:41,297 OutboundTcpConnection.java:561 - Handshaking version with /10.128.0.4

Observation: From the above log we can see the IP address of the node is /10.128.0.5 and after starting its state is NORMAL.
Since Instance-1, Instance-2 and Instance-3 are already running when Cassandra starts on this node (10.128.0.5), it does handshakes with the already running nodes (10.128.0.2, 10.128.0.3 and 10.128.0.4), and the topology of all four instances, Instance-1 (10.128.0.2), Instance-2 (10.128.0.3), Instance-3 (10.128.0.4) and Instance-4 (10.128.0.5), is updated.


Cassandra Data storage visualisation

All four instances of Cassandra are up and running. Run the below command and validate the data centre (DC) configuration: two nodes in rack1 and two nodes in rack2.
nikhilranjan234@instance-1:~$ nodetool status
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens       Owns    Host ID                               Rack
UN  10.128.0.2  38.99 MiB  256          ?       02b41029-cacc-47d8-91ca-44a579071529  r1
UN  10.128.0.3  44.05 MiB  256          ?       94b6296c-f1d2-4817-af32-8ae8e7ea07fc  r1
UN  10.128.0.4  61.11 MiB  256          ?       0ec021b0-0ae9-47fc-bd5b-894287d78a0b  r2
UN  10.128.0.5  85.07 MiB  256          ?       0828fce5-715c-4482-a909-e9e1fd40e26a  r2

4 instances of Cassandra are up and running ([rack-1: I1 & I2] [rack-2: I3 & I4])

Keyspace and table (column family) creation: Run the cqlsh utility in one of the terminals.
nikhilranjan234@instance-1:~$ cqlsh `hostname -I` -u cassandra -p cassandra
Connected to wm-cluster at 10.128.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cassandra@cqlsh> 

Create keyspace with replication factor 3: We have four instances of Cassandra running, so we can keep 3 copies of the data on different instances in the given DC.
On successful keyspace creation the prompt does not give any success message; describe keyspaces and see that it has been created.
cassandra@cqlsh> CREATE KEYSPACE IF NOT EXISTS "OCProc" WITH REPLICATION = {'class':'NetworkTopologyStrategy', 'dc1' : 3};
cassandra@cqlsh> describe keyspaces;
system_schema  system     "OCProc"            system_traces
system_auth    keyspace1  system_distributed  stockdb      

Select and use a keyspace: Since column families (like tables in an RDBMS) are created in the context of a keyspace, select the keyspace we want to work with.
cassandra@cqlsh> use "OCProc" ;
cassandra@cqlsh:OCProc>

Create table (column family) in the selected keyspace:
cassandra@cqlsh:OCProc> CREATE TABLE user (
          ...            username text,
          ...            email text,
          ...            city text,
          ...            phone varint,
          ...            encrypted_password blob,
          ...            PRIMARY KEY (username, city)
          ...            )WITH comment = 'Creating USERS Table to store users details';
cassandra@cqlsh:OCProc> describe tables;
user
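With PRIMARY KEY (username, city), username is the partition key and city is a clustering column, so a row can be looked up by its partition key alone. An illustrative query (runnable after the inserts below):
cassandra@cqlsh:OCProc> SELECT * FROM user WHERE username = 'zytham';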

Insert data into the user table (column family) and select from it to display the data:
cassandra@cqlsh:OCProc> INSERT INTO  user 
          ...             ( username ,  email ,  city ,  phone ,  encrypted_password )
          ...             VALUES (
          ...               'zytham',
          ...               'zytham@gmail.com',
          ...               'Patna',
          ...               9999888800,
          ...               0x9792977ed729792e403da53024c6069a9158b8c4
          ...             );
cassandra@cqlsh:OCProc> INSERT INTO  user 
          ...             ( username ,  email ,  city ,  phone ,  encrypted_password )
          ...             VALUES(
          ...               'ranjan',
          ...               'ranjan@gmail.com',
          ...               'Bangalore',
          ...                678998800,
          ...               0x8914977ed729792e403da53024c6069a9158b8c4
          ...             );
cassandra@cqlsh:OCProc> INSERT INTO  user 
          ...             ( username ,  email ,  city ,  phone ,  encrypted_password )
          ...             VALUES(
          ...               'mishra',
          ...               'zytham@gmail.com',
          ...               'Torento',
          ...                00980099766,
          ...               0x891497745729792e403da53024c6069a9158b8c4
          ...             );
cassandra@cqlsh:OCProc> SELECT * FROM user;

 username | city      | email            | encrypted_password                         | phone
----------+-----------+------------------+--------------------------------------------+------------
   zytham |     Patna | zytham@gmail.com | 0x9792977ed729792e403da53024c6069a9158b8c4 | 9999888800
   ranjan | Bangalore | ranjan@gmail.com | 0x8914977ed729792e403da53024c6069a9158b8c4 |  678998800
   mishra |   Torento | zytham@gmail.com | 0x891497745729792e403da53024c6069a9158b8c4 |  980099766

Replication of data on Cassandra nodes: Which nodes store data for username 'zytham'?
The below command shows one copy is stored on Instance-1 (where we are running the query) and two copies are stored on Instance-3 and Instance-4 (rack2).
nikhilranjan234@instance-1:~$ nodetool getendpoints OCProc user zytham
10.128.0.2
10.128.0.4
10.128.0.5
Run it for some other user; it is not necessary that a copy is always stored on the instance where the query is being run.
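For example (illustrative only; output not shown here, and the returned replica set may or may not include the local node):
nikhilranjan234@instance-1:~$ nodetool getendpoints OCProc user ranjan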

Data directory and its storage : Default data storage directory is /opt/apache-cassandra-3.11.3/data/
Data storage hierarchy : <KEYSPACE>/<TABLE_UNIQUE_ID>/<GENERATION_DATA>
nikhilranjan234@instance-1:/$ cd /opt/apache-cassandra-3.11.3/data/data
nikhilranjan234@instance-1:/opt/apache-cassandra-3.11.3/data/data$ ls -l
total 32
drwxr-xr-x  4 nikhilranjan234 nikhilranjan234 4096 Sep 18 10:22 keyspace1
drwxr-xr-x  3 nikhilranjan234 nikhilranjan234 4096 Sep 20 18:07 OCProc
drwxr-xr-x  3 nikhilranjan234 nikhilranjan234 4096 Sep 18 07:03 stockdb
drwxr-xr-x 26 nikhilranjan234 nikhilranjan234 4096 Sep 18 06:36 system
drwxr-xr-x  6 nikhilranjan234 nikhilranjan234 4096 Sep 18 06:36 system_auth
drwxr-xr-x  5 nikhilranjan234 nikhilranjan234 4096 Sep 18 06:36 system_distributed
drwxr-xr-x 12 nikhilranjan234 nikhilranjan234 4096 Sep 18 06:36 system_schema
drwxr-xr-x  4 nikhilranjan234 nikhilranjan234 4096 Sep 18 06:36 system_traces

We have the OCProc keyspace created as a directory. Run the below commands to find the persisted data.
nikhilranjan234@instance-1:/opt/apache-cassandra-3.11.3/data/data$ cd OCProc/
nikhilranjan234@instance-1:/opt/apache-cassandra-3.11.3/data/data/OCProc$ cd user-f82692c0bcff11e8a9c80961902fe681/
nikhilranjan234@instance-1:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ ls
backups

Where is our data? No data has been persisted yet.
Answer: The memtable holds all our user data; it has not yet been persisted to disk (SSTable).

How does data get from the memtable to disk?
Answer: When the memtable is flushed, the data is stored on disk. Let us use nodetool to forcefully flush and see what changes it brings to the directory "user-f82692c0bcff11e8a9c80961902fe681".
nikhilranjan234@instance-1:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ ls
backups
nikhilranjan234@instance-1:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ nodetool flush
nikhilranjan234@instance-1:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ ls -l
total 40
drwxr-xr-x 2 nikhilranjan234 nikhilranjan234 4096 Sep 20 18:07 backups
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   43 Sep 20 18:44 mc-1-big-CompressionInfo.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   83 Sep 20 18:44 mc-1-big-Data.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   10 Sep 20 18:44 mc-1-big-Digest.crc32
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   16 Sep 20 18:44 mc-1-big-Filter.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   10 Sep 20 18:44 mc-1-big-Index.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234 4772 Sep 20 18:44 mc-1-big-Statistics.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   62 Sep 20 18:44 mc-1-big-Summary.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   92 Sep 20 18:44 mc-1-big-TOC.txt

On flush the memtable writes its data to disk as an SSTable. The file "mc-1-big-Data.db" contains our data, and the flush also creates files for the index and filter.

View User data in "mc-1-big-Data.db" :  Using sstabledump on instance-1 we can visualise user data in file "mc-1-big-Data.db".
nikhilranjan234@instance-1:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ sstabledump -d mc-1-big-Data.db
WARN  18:55:20,083 Small commitlog volume detected at /opt/apache-cassandra-3.11.3/data/commitlog; setting commitlog_total_space_in_mb to 2503.  You can override this in cassandra.yaml
WARN  18:55:20,100 Small cdc volume detected at /opt/apache-cassandra-3.11.3/data/cdc_raw; setting cdc_total_space_in_mb to 1251.  You can override this in cassandra.yaml
WARN  18:55:20,267 Only 7.951GiB free across all data volumes. Consider adding more capacity to your cluster or removing obsolete snapshots
[zytham]@0 Row[info=[ts=1537467165740069] ]: Patna | [email=zytham@gmail.com ts=1537467165740069], [encrypted_password=9792977ed729792e403da53024c6069a9158b8c4 ts=1537467165740069], [p
hone=9999888800 ts=1537467165740069]
Instance-1 stores only one record with username = zytham. We can see that each cell is stored with timestamp and column value.

Where are the other user records? Apply nodetool flush on Instance-3 and display the data stored in the first generation data file "mc-1-big-Data.db". Below we can see one copy of each user record is stored on Instance-3.
nikhilranjan234@instance-3:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ sstabledump -d mc-1-big-Data.db
WARN  18:55:50,970 Small commitlog volume detected at /opt/apache-cassandra-3.11.3/data/commitlog; setting commitlog_total_space_in_mb to 2503.  You can override this in cassandra.yaml
WARN  18:55:50,977 Small cdc volume detected at /opt/apache-cassandra-3.11.3/data/cdc_raw; setting cdc_total_space_in_mb to 1251.  You can override this in cassandra.yaml
WARN  18:55:51,146 Only 7.946GiB free across all data volumes. Consider adding more capacity to your cluster or removing obsolete snapshots
[zytham]@0 Row[info=[ts=1537467165740069] ]: Patna | [email=zytham@gmail.com ts=1537467165740069], [encrypted_password=9792977ed729792e403da53024c6069a9158b8c4 ts=1537467165740069], [p
hone=9999888800 ts=1537467165740069]
[ranjan]@79 Row[info=[ts=1537467165763097] ]: Bangalore | [email=ranjan@gmail.com ts=1537467165763097], [encrypted_password=8914977ed729792e403da53024c6069a9158b8c4 ts=1537467165763097
], [phone=678998800 ts=1537467165763097]
[mishra]@163 Row[info=[ts=1537467168375384] ]: Torento | [email=zytham@gmail.com ts=1537467168375384], [encrypted_password=891497745729792e403da53024c6069a9158b8c4 ts=1537467168375384]
, [phone=980099766 ts=1537467168375384]

Display JSON form of the data: Execute the above command without the -d switch.
nikhilranjan234@instance-3:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ sstabledump  mc-1-big-Data.db
[
  {
    "partition" : {
      "key" : [ "zytham" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 78,
        "clustering" : [ "Patna" ],
        "liveness_info" : { "tstamp" : "2018-09-20T18:12:45.740069Z" },
        "cells" : [
          { "name" : "email", "value" : "zytham@gmail.com" },
          { "name" : "encrypted_password", "value" : "0x9792977ed729792e403da53024c6069a9158b8c4" },
          { "name" : "phone", "value" : 9999888800 }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "ranjan" ],
      "position" : 79
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 162,
        "clustering" : [ "Bangalore" ],
        "liveness_info" : { "tstamp" : "2018-09-20T18:12:45.763097Z" },
        "cells" : [
          { "name" : "email", "value" : "ranjan@gmail.com" },
          { "name" : "encrypted_password", "value" : "0x8914977ed729792e403da53024c6069a9158b8c4" },
          { "name" : "phone", "value" : 678998800 }
                  ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "mishra" ],
      "position" : 163
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 245,
        "clustering" : [ "Torento" ],
        "liveness_info" : { "tstamp" : "2018-09-20T18:12:48.375384Z" },
        "cells" : [
          { "name" : "email", "value" : "zytham@gmail.com" },
          { "name" : "encrypted_password", "value" : "0x891497745729792e403da53024c6069a9158b8c4" },
          { "name" : "phone", "value" : 980099766 }
        ]
      }
    ]
  }
]

Delete a record from the user table: delete the row from user where username = 'mishra' and city='Torento';
nikhilranjan234@instance-3:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ cqlsh `hostname -I` -u cassandra -p 
cassandra
Connected to wm-cluster at 10.128.0.4:9042.
[cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cassandra@cqlsh> 
cassandra@cqlsh> 
cassandra@cqlsh> 
cassandra@cqlsh> 
cassandra@cqlsh> describe keyspaces
system_schema  system     "OCProc"            system_traces
system_auth    keyspace1  system_distributed  stockdb      
cassandra@cqlsh> use "OCProc";
cassandra@cqlsh:OCProc> delete from user where username = 'mishra' and city='Torento';
cassandra@cqlsh:OCProc> exit

See the data file again: no change. Where has the change happened so far?
Answer: In the memtable.
nikhilranjan234@instance-3:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ sstabledump -d mc-1-big-Data.db 
WARN  19:13:18,553 Small commitlog volume detected at /opt/apache-cassandra-3.11.3/data/commitlog; setting commitlog_total_space_in_mb to 2503.  You can override this
 in cassandra.yaml
WARN  19:13:18,559 Small cdc volume detected at /opt/apache-cassandra-3.11.3/data/cdc_raw; setting cdc_total_space_in_mb to 1251.  You can override this in cassandra.
yaml
WARN  19:13:18,729 Only 7.945GiB free across all data volumes. Consider adding more capacity to your cluster or removing obsolete snapshots
[zytham]@0 Row[info=[ts=1537467165740069] ]: Patna | [email=zytham@gmail.com ts=1537467165740069], [encrypted_password=9792977ed729792e403da53024c6069a9158b8c4 ts=153
7467165740069], [phone=9999888800 ts=1537467165740069]
[ranjan]@79 Row[info=[ts=1537467165763097] ]: Bangalore | [email=ranjan@gmail.com ts=1537467165763097], [encrypted_password=8914977ed729792e403da53024c6069a9158b8c4 t
s=1537467165763097], [phone=678998800 ts=1537467165763097]
[mishra]@163 Row[info=[ts=1537467168375384] ]: Torento | [email=zytham@gmail.com ts=1537467168375384], [encrypted_password=891497745729792e403da53024c6069a9158b8c4 ts
=1537467168375384], [phone=980099766 ts=1537467168375384]

Flush using nodetool: New generation files are persisted alongside the old ones; the old files are not deleted.
nikhilranjan234@instance-3:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ nodetool flush
nikhilranjan234@instance-3:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ ls -l
total 76
drwxr-xr-x 2 nikhilranjan234 nikhilranjan234 4096 Sep 20 18:07 backups
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   43 Sep 20 18:54 mc-1-big-CompressionInfo.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234  173 Sep 20 18:54 mc-1-big-Data.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   10 Sep 20 18:54 mc-1-big-Digest.crc32
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   16 Sep 20 18:54 mc-1-big-Filter.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   31 Sep 20 18:54 mc-1-big-Index.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234 4786 Sep 20 18:54 mc-1-big-Statistics.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   62 Sep 20 18:54 mc-1-big-Summary.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   92 Sep 20 18:54 mc-1-big-TOC.txt
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   43 Sep 20 19:15 mc-2-big-CompressionInfo.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   42 Sep 20 19:15 mc-2-big-Data.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   10 Sep 20 19:15 mc-2-big-Digest.crc32
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   16 Sep 20 19:15 mc-2-big-Filter.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   10 Sep 20 19:15 mc-2-big-Index.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234 4637 Sep 20 19:15 mc-2-big-Statistics.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   62 Sep 20 19:15 mc-2-big-Summary.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   92 Sep 20 19:15 mc-2-big-TOC.txt

When will the old (old generation) files be deleted?
Answer: During compaction.

Apply compaction: Old generation files are deleted, and the old & new generation files are merged.
nikhilranjan234@instance-3:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ nodetool compact
nikhilranjan234@instance-3:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ ls -l
total 40
drwxr-xr-x 2 nikhilranjan234 nikhilranjan234 4096 Sep 20 18:07 backups
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   51 Sep 20 19:17 mc-3-big-CompressionInfo.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234  171 Sep 20 19:17 mc-3-big-Data.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   10 Sep 20 19:17 mc-3-big-Digest.crc32
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   16 Sep 20 19:17 mc-3-big-Filter.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   31 Sep 20 19:17 mc-3-big-Index.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234 4805 Sep 20 19:17 mc-3-big-Statistics.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   62 Sep 20 19:17 mc-3-big-Summary.db
-rw-r--r-- 1 nikhilranjan234 nikhilranjan234   92 Sep 20 19:17 mc-3-big-TOC.txt

Deleted and compacted: Now visualise the merged data file. The deleted row has not been removed; it has been marked as deleted (why?): if some failed node rejoins the cluster after recovery with old data, this deletion marker helps update that node and remove stale data which has already been deleted.
nikhilranjan234@instance-3:/opt/apache-cassandra-3.11.3/data/data/OCProc/user-f82692c0bcff11e8a9c80961902fe681$ sstabledump -d mc-3-big-Data.db 
WARN  19:26:45,958 Small commitlog volume detected at /opt/apache-cassandra-3.11.3/data/commitlog; setting commitlog_total_space_in_mb to 2503.  You can override this
 in cassandra.yaml
WARN  19:26:45,968 Small cdc volume detected at /opt/apache-cassandra-3.11.3/data/cdc_raw; setting cdc_total_space_in_mb to 1251.  You can override this in cassandra.
yaml
WARN  19:26:46,151 Only 7.966GiB free across all data volumes. Consider adding more capacity to your cluster or removing obsolete snapshots
[zytham]@0 Row[info=[ts=1537467165740069] ]: Patna | [email=zytham@gmail.com ts=1537467165740069], [encrypted_password=9792977ed729792e403da53024c6069a9158b8c4 ts=153
7467165740069], [phone=9999888800 ts=1537467165740069]
[ranjan]@79 Row[info=[ts=1537467165763097] ]: Bangalore | [email=ranjan@gmail.com ts=1537467165763097], [encrypted_password=8914977ed729792e403da53024c6069a9158b8c4 t
s=1537467165763097], [phone=678998800 ts=1537467165763097]
[mishra]@163 Row[info=[ts=-9223372036854775808] del=deletedAt=1537470698672680, localDeletion=1537470698 ]: Torento | 

Here is a nice post from The Last Pickle which explains this in detail - Deletes and Tombstones in Cassandra.