Oct 8, 2018


Set up Kafka and ZooKeeper with brew (Homebrew) on macOS: use Kafka Tool (UI) to publish messages to and monitor a Kafka topic.

Kafka is a publish-subscribe messaging system, most commonly used in asynchronous workflows. Homebrew is a software package management system that simplifies the installation of software on Apple's macOS operating system. Using brew, Kafka and ZooKeeper can be installed and set up with ease.

Install ZooKeeper using brew (Homebrew):
brew install zookeeper
Updating Homebrew...
==> Auto-updated Homebrew!
Updated 2 taps (homebrew/core, homebrew/cask).
==> New Formulae
bundletool         geant4             i2pd               mallet             opensubdiv         opentracing-cpp    stanford-corenlp   tdlib
==> Updated Formulae
....
.....
==> Downloading https://homebrew.bintray.com/bottles/zookeeper-3.4.12.high_sierra.bottle.tar.gz
######################################################################## 100.0%
==> Pouring zookeeper-3.4.12.high_sierra.bottle.tar.gz
==> Caveats
To have launchd start zookeeper now and restart at login:
  brew services start zookeeper
Or, if you don't want/need a background service you can just run:
  zkServer start
==> Summary
🍺  /usr/local/Cellar/zookeeper/3.4.12: 242 files, 32.9MB
Installation directory of ZooKeeper: /usr/local/Cellar/zookeeper

Install Kafka using brew (Homebrew):
➜  brew install kafka
Updating Homebrew...
==> Downloading https://homebrew.bintray.com/bottles/kafka-2.0.0.high_sierra.bottle.tar.gz
######################################################################## 100.0%
==> Pouring kafka-2.0.0.high_sierra.bottle.tar.gz
==> Caveats
To have launchd start kafka now and restart at login:
  brew services start kafka
Or, if you don't want/need a background service you can just run:
  zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
==> Summary
🍺  /usr/local/Cellar/kafka/2.0.0: 160 files, 46.8MB
Installation directory of Kafka: /usr/local/Cellar/kafka
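
Both can also be run as background services, as the caveats above suggest. A quick sketch (brew services manages launchd for us):
brew services start zookeeper
brew services start kafka
brew services list    # both should be reported as started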


Kafka Producer and Consumer (command-line tools)

Create a Kafka topic: create topic "topic-devinline-1" with replication factor 1 and 1 partition.
➜  bin pwd
/usr/local/Cellar/kafka/2.0.0/bin
➜  bin sh kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-devinline-1 
Created topic "topic-devinline-1".
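
Optionally, the new topic can be verified from the same bin directory with the --describe option, for example:
➜  bin kafka-topics --describe --zookeeper localhost:2181 --topic topic-devinline-1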

Start producer:
➜  bin kafka-console-producer --broker-list localhost:9092 --topic topic-devinline-1                       
>Hello, Fist message
>Bye, Message
>
Start consumer:
➜  bin kafka-console-consumer --bootstrap-server localhost:9092 --topic  topic-devinline-1 --from-beginning
Hello, Fist message
Bye, Message
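
Press Ctrl+C to stop the console producer or consumer. All topics known to the broker can also be listed in the same way, for example:
➜  bin kafka-topics --list --zookeeper localhost:2181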

Kafka UI tool (Kafka Tool)

A UI tool to analyse topics and to post/consume messages. Download Kafka Tool.
As we have set up ZooKeeper and Kafka locally, we can use the following configuration to set up Kafka Tool.

Analyse a Kafka topic through the UI:
Open topic "topic-devinline-1" and partition 0. Click the green play button and find both messages posted earlier.

Post a message to the Kafka topic from the UI tool:
1. Select Partition 0, select the Data tab and click +.
2. Type the message in the text box and click Add.
3. Click the green play button; the posted message is now available in the topic.


Note: If a text box for a Key is also shown, provide a key along with the message. This can be configured in the preferences:
Kafka Tool -> Preferences -> Topics -> Default content type (Key = No Key)

Oct 7, 2018


Tableau: Extract Data and Filters - how to create filters and visualise statistics in Tableau

Extracts are saved subsets of data that we can use to improve performance - we can reduce the total amount of data by using filters and configuring other limits.

How to create an extract in Tableau (Desktop version):
1.  Right-click the data source listed under the Data tab and select Extract Data.
2.  Keep the default configuration or modify it to select a specific set of records, then click Extract.

How to apply a filter to all worksheets?
Right-click the filter -> "Apply to Worksheets" -> select "All Using This Data Source".
Now this filter will be available in every worksheet that uses this data source.

Filter data source and visualise in Tableau 
1. Download data source - Unemployment Statistics
2. Open Tableau and import the Excel file as a data source. Create an extract as described above; once created, the extract is used by default.
3. Drag the Unemployed field from the Measures section to the Rows shelf and drag Period from Dimensions to the Columns shelf. By default, a line chart is created in the working area. Change the line chart to an area chart.
4. Drag Age from Dimensions to the Filters area and select at least one age range. Right-click the filter and select Show Filter. Drag the Age filter below the Marks area.

5. Drag Age onto Label and the Color palette, select different age ranges from the filter and observe the area chart. Below, the area chart is created with three age ranges.

Oct 6, 2018


Tableau: Bar chart displaying Representative and TotalSales along with their yearly bonus.

In the previous post, Data visualisation with CSV data source and Tableau: export an image chart with a report generated from CSV data, we visualised Representatives and their TotalSales. In this post we will reuse the same worksheet and enhance it to solve the Try Yourself question posted at the end of that post.

Question: Create a bar chart displaying Representative and TotalSales along with their yearly bonus. Display the yearly bonus at the top of each bar.

Prerequisite:
Data visualisation: Representative and TotalSales along with their yearly bonus
1. Right-click in the Measures section and click "Create Calculated Field".
2. Give the new field the name "TotalBonus"; it is 5% of total sales:
[TotalSales(In $)]*(0.05)

3. Drag TotalBonus from the Measures section onto Label.
4. Right-click the dragged field in the Marks section (below Tooltip) and format it as currency, English (US), to display it in $.
Representative bonus summary along with TotalSales



Data visualisation with CSV data source and Tableau: generate a report from a CSV data source and export it as an image

Tableau is an interactive data visualisation product focused on business intelligence. It supports integrating various data sources such as CSV files, databases (local and cloud), and big data stores like Apache Drill, Hive, etc. It queries the data sources, generates a number of chart types, and stores and retrieves data from its in-memory data engine.

In previous posts we saw how to integrate Tableau with Hive and visualise data, and how to integrate Tableau and Apache Drill (JSON data source). Here we will use a CSV data source and visualise it with Tableau.

1. Download the CSV data source.
2. Click Text file from the Connect section, import the CSV file and view the CSV data in tabular form.

Click on Sheet 1 and open the worksheet. Under the Data tab we have two sections (Dimensions and Measures) which contain the columns from the data source.
Dimensions: independent variables (Order Date, Region, Item and Rep)
Measures: dependent variables (Unit price and Units)

Visualisation 1: total number of units sold by representative in each region
Drag and drop Region & Rep from Dimensions onto the working area, where they act as column data. Similarly, drag and drop Units from Measures onto the working area.
Number of units per region and representative (In decreasing order per region)

Alex sold the most units in the Central region, Richard in the East and James in the West. But is a company interested in the number of units sold by its representatives, or in the total amount in $?

Visualisation 2: total amount (in $) sold by representative in each region
Let's create a new field from the dependent fields (Units and Unit price). Right-click in the Measures section and select "Create Calculated Field". Using Units and Unit price, compute TotalSales, which is then listed in the Measures section.
Custom field TotalSales computed from Units and Unit Price
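For reference, the TotalSales calculated field is simply the product of the two measures; assuming the field names shown above, the formula is along the lines of:
[Units] * [Unit price]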
Delete SUM(Units) from Rows and drag & drop TotalSales instead. Now we have a chart of Rep vs TotalSales (in $).
Representative vs TotalSales (in $)
The above chart shows Matthew is the best Rep in Central, Susan in the East and James in the West region. TotalSales (in $) gives the right insight about sales per representative.

Colourful chart: drag and drop Region & Rep onto the Color palette. This modifies the chart so each bar is coloured: if Rep is dragged, each rep gets a different colour; if Region is dragged, each region is represented with a different colour.
Colourful grouped representation: By Rep and By Region

Format labels: drag TotalSales onto the Label palette (beside Color). This shows the total sales value on each bar of the chart. The currency format can be changed.
Export the worksheet as an image: go to Worksheet -> Export -> Image...

Try yourself: modify the existing worksheet, compute a bonus for each Rep as 5% of the individual's TotalSales and display it at the top of each bar.

Next: Create a bar chart displaying Representative and TotalSales along with their yearly bonus. (Yes, the solution to the above question.)

Oct 5, 2018


Kafka Connect: Setup ElasticSearch Sink Connector to transfer Kafka topic data to ElasticSearch in distributed mode

Elasticsearch is a distributed search and analytics engine built on Apache Lucene (an indexing and search library for high-performance, full-text search).
In previous posts we used Kafka source connectors (FileStreamSourceConnector in standalone and distributed mode & TwitterSourceConnector) - data source to Kafka topic. In this post we will use the Elasticsearch sink connector to transfer data from a Kafka topic to Elasticsearch.

Prerequisite:

Set up the Elasticsearch sink connector
1. Start Docker.
2. Start the Kafka cluster and Elasticsearch on Docker (Landoop fast-data-dev) using docker-compose:
➜  Kafka-connect docker-compose up kafka-cluster elasticsearch 
Creating network "code_default" with the default driver
Pulling kafka-cluster (landoop/fast-data-dev:cp3.3.0)...

3. Elasticsearch connector config: go to http://127.0.0.1:3030 and create an Elasticsearch sink connector with the following configuration.
name=sink-elastic-twitter-distributed
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=2
topics=kafka-connect-distributed-twitter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
connection.url=http://elasticsearch:9200
type.name=kafka-connect
key.ignore=true

4. Topology: source-twitter-distributed (source) connected to sink-elastic-twitter-distributed (sink).
5. Monitor the topic kafka-connect-distributed-twitter and count the indexed messages at http://127.0.0.1:9200/kafka-connect-distributed-twitter/_count
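
The same checks can be run from the command line against the Elasticsearch REST API (exposed on port 9200 in the docker-compose setup), for example:
curl http://127.0.0.1:9200/kafka-connect-distributed-twitter/_count
curl 'http://127.0.0.1:9200/kafka-connect-distributed-twitter/_search?size=1'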

Oct 4, 2018


Twitter Kafka connector: how to fetch tweets from Twitter using the Twitter source connector and publish them to a Kafka topic

In the previous posts we set up a Kafka Connect cluster in Docker (Landoop fast-data-dev) and created a FileStream connector in standalone mode and in distributed mode for transferring file content to a Kafka topic. In this post we will use the existing Docker Kafka Connect setup to fetch tweets from Twitter using the Twitter source connector and publish them to a Kafka topic.

Prerequisite

1. Set up Kafka Connect. I have set it up using the Docker Landoop fast-data-dev image.
2. Create a Twitter developer account and set up an app. Twitter provides four different keys; under the Keys and Access Tokens tab we have the following:
Application Settings
    Consumer Key (API Key)
    Consumer Secret (API Secret)
Your Access Token
    Access Token
    Access Token Secret

I have created an app "ZythamTwitterStreamTest" and have the 4 keys ready. Start the Kafka cluster using docker-compose.
➜  Kafka-connect docker-compose up kafka-cluster 
Starting kafkaconnect_kafka-cluster_1 ... done
Attaching to kafkaconnect_kafka-cluster_1

Create the Kafka topic where tweets will be published by the Twitter Kafka connector.
➜  Kafka-connect kafka-topics --create --topic kafka-connect-distributed-twitter --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181
Created topic "kafka-connect-distributed-twitter".

Create the Twitter Kafka connector in distributed mode for transferring the tweet stream to the Kafka topic: use the configuration below to create the Twitter Kafka connector.

name=source-twitter-distributed
connector.class=com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
topic=kafka-connect-distributed-twitter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
twitter.consumerkey=LXPbdzppfUS51i9v3clDSBCBe
twitter.consumersecret=FkPxDo0TvvnDb9wnlXF7i3MEvRKpCo9BcAfxGXfSRMotss7zyF
twitter.token=67405754-X4LRN3IWJ9CP5JuPxs86iiKWev20o7Zi6g3IuHFpM
twitter.secret=Fi4qhOfXSsPQTEjIfCOwNNVNLxHyXiP3vMHIrUc9OjFCE
track.terms=Bangalore,Kafka,Cyclone
language=en
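
Once the connector has been created through the Kafka Connect UI (http://127.0.0.1:3030), it should appear when listing connectors via the Kafka Connect REST API, which this setup exposes on port 8083:
curl http://127.0.0.1:8083/connectors
The response should include "source-twitter-distributed".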


Start a Kafka consumer listening to topic "kafka-connect-distributed-twitter":
root@fast-data-dev / $ kafka-console-consumer --topic kafka-connect-distributed-twitter --bootstrap-server 127.0.0.1:9092

Message received by the consumer (JSON formatted): the message below was received because the tweet contains the word "Bangalore".

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "created_at"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int64",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "screen_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "location"
          },
          {
            "type": "boolean",
            "optional": false,
            "field": "verified"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "friends_count"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "followers_count"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "statuses_count"
          }
        ],
        "optional": false,
        "name": "com.eneco.trading.kafka.connect.twitter.User",
        "field": "user"
      },
      {
        "type": "string",
        "optional": true,
        "field": "text"
      },
      {
        "type": "string",
        "optional": true,
        "field": "lang"
      },
      {
        "type": "boolean",
        "optional": false,
        "field": "is_retweet"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "text"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.Hashtag"
            },
            "optional": true,
            "field": "hashtags"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "display_url"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "expanded_url"
                },
                {
                  "type": "int64",
                  "optional": false,
                  "field": "id"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "type"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "url"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.Medium"
            },
            "optional": true,
            "field": "media"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "display_url"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "expanded_url"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "url"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.Url"
            },
            "optional": true,
            "field": "urls"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "int64",
                  "optional": false,
                  "field": "id"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "name"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "screen_name"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.UserMention"
            },
            "optional": true,
            "field": "user_mentions"
          }
        ],
        "optional": false,
        "name": "com.eneco.trading.kafka.connect.twitter.Entities",
        "field": "entities"
      }
    ],
    "optional": false,
    "name": "com.eneco.trading.kafka.connect.twitter.Tweet"
  },
  "payload": {
    "id": 1047866309655101400,
    "created_at": "2018-10-04T15:09:31.000+0000",
    "user": {
      "id": 166235967,
      "name": "Bangalore Informer",
      "screen_name": "bangaloreinform",
      "location": "Bangalore",
      "verified": false,
      "friends_count": 27,
      "followers_count": 402,
      "statuses_count": 27755
    },
    "text": "WANTED….? For Bangalore Project|Easy Cinema Studios| – Bangalore Video https://t.co/J5xXw9Eq8I",
    "lang": "en",
    "is_retweet": false,
    "entities": {
      "hashtags": [],
      "media": [],
      "urls": [
        {
          "display_url": "bangaloreinformer.com/49412/wanted-f…",
          "expanded_url": "https://bangaloreinformer.com/49412/wanted-for-bangalore-projecteasy-cinema-studios-bangalore-video/",
          "url": "https://t.co/J5xXw9Eq8I"
        }
      ],
      "user_mentions": []
    }
  }
}


Oct 2, 2018


Kafka Connect: Setup Kafka Connect Cluster(Docker Landoop fast-data-dev) and FileStream Source connector (Standalone and Distributed mode) Part-2

In the previous post we set up a Kafka Connect cluster in Docker (Landoop fast-data-dev) and created a FileStream connector in standalone mode for transferring file content to a Kafka topic. In this post we will use the existing Docker Kafka Connect setup to transfer file content in distributed mode.

Kafka Connect Source (Distributed mode) 

In this section we will see how to configure a connector in distributed mode using the Kafka Connect UI and run the FileStreamSource connector in distributed mode: it reads from a file and publishes the data to a Kafka topic, acting as the carrier.

Step-1: Create Kafka topic "kafka-connect-distributed" with 3 partitions and replication factor 1. The created topic can be validated in the browser at http://127.0.0.1:3030/kafka-topics-ui/#/.
root@fast-data-dev kafka-connect $ kafka-topics --create --topic kafka-connect-distributed --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181
Created topic "kafka-connect-distributed".

Step-2: Go to the Kafka Connect UI at http://127.0.0.1:3030/kafka-connect-ui/#/cluster/fast-data-dev and create a File connector. Click New and select File from the available connectors.
FileStream connector creation from Kafka Connect UI

Step-3: Update the config with the following and click CREATE. On success it creates a connector "file-stream-kafka-connect-distributed" and lists it in the left-side connector panel.
name=file-stream-kafka-connect-distributed
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=source-input.txt
topic=kafka-connect-distributed
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
FileStreamSource Connector configuration
Step-4: Since we are running the connector in distributed mode, we have to create the source-input.txt file inside the Kafka Connect cluster. Make sure Docker is up and running (remember: docker-compose up kafka-cluster).
  • Find the running Docker container ID.
➜  Kafka-connect docker ps
CONTAINER ID        IMAGE                           COMMAND                  CREATED             STATUS              PORTS                                                                                                                                                  NAMES
2ebdc89d7caf        landoop/fast-data-dev:cp3.3.0   "/usr/local/bin/dumb…"   18 hours ago        Up 18 hours         0.0.0.0:2181->2181/tcp, 0.0.0.0:3030->3030/tcp, 0.0.0.0:8081-8083->8081-8083/tcp, 0.0.0.0:9092->9092/tcp, 0.0.0.0:9581-9585->9581-9585/tcp, 3031/tcp   kafkaconnect_kafka-cluster_1
  • Using the above container ID (2ebdc89d7caf), log in to the Docker container and execute the ls command to validate we are inside the container.
➜  Kafka-connect docker exec -it 2ebdc89d7caf bash
root@fast-data-dev / $ pwd
/
root@fast-data-dev / $ ls
bin	    dev			home   mnt   root  srv	usr
build.info  etc			lib    opt   run   sys	var
connectors  extra-connect-jars	media  proc  sbin  tmp
root@fast-data-dev / $ 
Step-5: Action time!! Create source-input.txt and write some messages in it. Once the file is saved, all messages are posted to topic "kafka-connect-distributed".
root@fast-data-dev / $ vi source-input.txt 
root@fast-data-dev / $ cat source-input.txt 
Hello, From distributed mode
Messaege from docker 
Bye, Last message !!

Step-6: Visit the Kafka topics UI and check for the messages copied from the file to the topic by the FileStreamSource connector. Note that messages are stored in JSON format (with the schema embedded), as the connector was created earlier with config value.converter.schemas.enable=true.

Consume messages from the topic and validate that they are retrieved in JSON format

Add some new messages to source-input.txt. Open a new terminal and start consuming messages from topic "kafka-connect-distributed".

Docker terminal
root@fast-data-dev / $ pwd
/
root@fast-data-dev / $ echo "MesageLive-1" >> source-input.txt 
root@fast-data-dev / $ echo "MesageLive-Dist-2" >> source-input.txt 
root@fast-data-dev / $ 

Topic consumer terminal 
➜  Kafka-connect docker run --rm -it --net=host landoop/fast-data-dev bash

root@fast-data-dev / $ kafka-console-consumer --topic kafka-connect-distributed --from-beginning --bootstrap-server 127.0.0.1:9092
{"schema":{"type":"string","optional":false},"payload":"Hello, From distributed mode"}
{"schema":{"type":"string","optional":false},"payload":"Messaege from docker "}
{"schema":{"type":"string","optional":false},"payload":"Bye, Last message !!"}
{"schema":{"type":"string","optional":false},"payload":"MesageLive-1"}
{"schema":{"type":"string","optional":false},"payload":"MesageLive-Dist-2"}



Kafka Connect: Setup Kafka Connect Cluster(Docker Landoop fast-data-dev) and FileStream Source connector (Standalone and Distributed mode) Part-1

Kafka Connect (or the Connect API) is a framework to import/export data from/to other systems, and it internally uses the Producer and Consumer APIs. The Connect API defines the programming interface that is implemented to build a concrete connector containing the actual logic to read/write data from the other system. These other systems can be databases, Couchbase, SAP HANA, blockchains, Cassandra, FTP, Twitter, etc.
Kafka Connect uses both the Producer API (for source -> Kafka) and the Consumer API (for Kafka -> sink). Just as a Kafka cluster consists of multiple brokers, a Kafka Connect cluster is a collection of workers (servers).
Kafka connect - Source and Sink interaction

Kafka Connect workers: standalone vs distributed mode
Standalone mode: a single process runs our connectors and tasks (connector + user configuration => tasks). Configuration is bundled with the process. It's easy to get started, but it is not fault tolerant and does not scale.
Distributed mode: multiple workers run our connectors and tasks. Configuration is submitted using the REST API (see the sketch below). Scalable and fault tolerant (rebalances on worker failure).
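
As a rough sketch of what that REST submission looks like (the connector name, file and topic below are purely illustrative, and the Connect REST endpoint is assumed to be the one exposed on port 8083 by the docker-compose setup used later in this post):
curl -X POST http://127.0.0.1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "file-stream-demo",
        "config": {
          "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
          "tasks.max": "1",
          "file": "demo-input.txt",
          "topic": "demo-topic"
        }
      }'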

Install Docker and set up the Kafka Connect cluster:

1. Refer to this and install Docker as per your operating system. I have installed Docker on Mac and validated that it is up and running. Start Docker and wait a moment for it to get started.
Docker is up and running with version validation in terminal

2. Create a file docker-compose.yml and copy & paste the following config into it (or download docker-compose.yml).
version: '2'

services:
  # this is our kafka cluster.
  kafka-cluster:
    image: landoop/fast-data-dev:cp3.3.0
    environment:
      ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585       # JMX Ports
      - 9092:9092                 # Kafka Broker

  # we will use elasticsearch as one of our sinks.
  # This configuration allows you to start elasticsearch
  elasticsearch:
    image: itzg/elasticsearch:2.4.3
    environment:
      PLUGINS: appbaseio/dejavu
      OPTS: -Dindex.number_of_shards=1 -Dindex.number_of_replicas=0
    ports:
      - "9200:9200"

  # we will use postgres as one of our sinks.
  # This configuration allows you to start postgres
  postgres:
    image: postgres:9.5-alpine
    environment:
      POSTGRES_USER: postgres     # define credentials
      POSTGRES_PASSWORD: postgres # define credentials
      POSTGRES_DB: postgres       # define database
    ports:
      - 5432:5432                 # Postgres port

3. Go to the directory where we created this yaml file and execute the following command to start the Kafka cluster. When the command is run for the very first time it downloads the Landoop fast-data-dev image (highlighted in green). Once the image is downloaded it creates a Kafka cluster, which can be accessed via a browser at 127.0.0.1:3030.

➜  Kafka-connect pwd
/Users/n0r0082/Kafka/Kafka-connect
➜  Kafka-connect docker-compose up kafka-cluster 
Creating network "code_default" with the default driver
Pulling kafka-cluster (landoop/fast-data-dev:cp3.3.0)...
cp3.3.0: Pulling from landoop/fast-data-dev
b56ae66c2937: Pull complete
......
......
990edf28e90a: Pull complete
Digest: sha256:0177ed6416dd0a549a6ec5028e0d19d93b323d03086a452976c251bb9d6a54e4
Status: Downloaded newer image for landoop/fast-data-dev:cp3.3.0
Creating code_kafka-cluster_1 ... done
Attaching to code_kafka-cluster_1
kafka-cluster_1  | Setting advertised host to 127.0.0.1.
kafka-cluster_1  | Starting services.
kafka-cluster_1  | This is landoop’s fast-data-dev. Kafka 0.11.0.0, Confluent OSS 3.3.0.
kafka-cluster_1  | You may visit http://127.0.0.1:3030 in about a minute.
kafka-cluster_1  | 2018-09-29 18:13:33,195 CRIT Supervisor running as root (no user in config file)
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/01-zookeeper.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/02-broker.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/03-schema-registry.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/04-rest-proxy.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/05-connect-distributed.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/06-caddy.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/07-smoke-tests.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/08-logs-to-kafka.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/99-supervisord-sample-data.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,196 INFO supervisord started with pid 7
kafka-cluster_1  | 2018-09-29 18:13:34,205 INFO spawned: 'sample-data' with pid 97
kafka-cluster_1  | 2018-09-29 18:13:34,207 INFO spawned: 'zookeeper' with pid 98
kafka-cluster_1  | 2018-09-29 18:13:34,209 INFO spawned: 'caddy' with pid 99
kafka-cluster_1  | 2018-09-29 18:13:34,211 INFO spawned: 'broker' with pid 101
kafka-cluster_1  | 2018-09-29 18:13:34,213 INFO spawned: 'smoke-tests' with pid 102
kafka-cluster_1  | 2018-09-29 18:13:34,215 INFO spawned: 'connect-distributed' with pid 104
kafka-cluster_1  | 2018-09-29 18:13:34,217 INFO spawned: 'logs-to-kafka' with pid 108
kafka-cluster_1  | 2018-09-29 18:13:34,219 INFO spawned: 'schema-registry' with pid 110
kafka-cluster_1  | 2018-09-29 18:13:34,221 INFO spawned: 'rest-proxy' with pid 111
kafka-cluster_1  | 2018-09-29 18:13:35,220 INFO success: sample-data entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,220 INFO success: zookeeper entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: caddy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: broker entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: smoke-tests entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: connect-distributed entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: logs-to-kafka entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: schema-registry entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: rest-proxy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:36,473 INFO exited: rest-proxy (exit status 1; not expected)
kafka-cluster_1  | 2018-09-29 18:13:36,502 INFO spawned: 'rest-proxy' with pid 281
kafka-cluster_1  | 2018-09-29 18:13:36,788 INFO exited: schema-registry (exit status 1; not expected)
kafka-cluster_1  | 2018-09-29 18:13:36,858 INFO spawned: 'schema-registry' with pid 328
kafka-cluster_1  | 2018-09-29 18:13:37,762 INFO success: rest-proxy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:38,020 INFO success: schema-registry entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)

Open a browser and check that the Landoop fast-data-dev services are running. The UI provides an overview of the Kafka cluster with topic and broker insights.


Kafka Connect Source(Standalone mode) 

Standalone mode is the best way to get started, with minimal config and infrastructure setup. In this section we will see how to configure a connector in standalone mode and transfer file content to a Kafka topic.
Standalone mode Kafka Connect Architecture

Starting any Kafka Connect worker requires a worker config and a connector (file-stream) config. Follow the steps below to set up Kafka Connect in standalone mode with the FileStream connector.

Step-1: Create two files: workers-config.properties  and file-stream-connector-properties. I am creating these files under directory /Users/n0r0082/Kafka/Kafka-connect where docker-compose.yml was created.

➜  Kafka-connect pwd
/Users/n0r0082/Kafka/Kafka-connect
➜  Kafka-connect ls
docker-compose.yml
➜  Kafka-connect touch workers-config.properties
➜  Kafka-connect vi workers-config.properties 
➜  Kafka-connect touch file-stream-connector-properties
➜  Kafka-connect vi file-stream-connector-properties 
➜  Kafka-connect ls 
docker-compose.yml     file-stream-connector-properties workers-config.properties
workers-config.properties
bootstrap.servers=127.0.0.1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# we always leave the internal key to JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
# Rest API
rest.port=8086
rest.host.name=127.0.0.1
# this config is only for standalone workers
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000

file-stream-connector-properties
# These are standard kafka connect parameters, need for ALL connectors
name=file-stream-kafka-connect-standalone
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# Parameters can be found here: https://github.com/apache/kafka/blob/trunk/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
file=source-input.txt
topic=kafka-connect-standalone

Step-2: Create an input file source-input.txt; the content of this file will be transferred to the Kafka topic.
➜  Kafka-connect touch source-input.txt
Step-3: Mount the host directory in a Docker container. Make sure we are in the directory where we created the 2 config files and 1 source file. After mounting, we automatically switch to the fast-data-dev shell.
➜  Kafka-connect pwd
/Users/n0r0082/Kafka/Kafka-connect
➜  Kafka-connect docker run --rm -it -v "$(pwd)":/kafka-connect/ --net=host landoop/fast-data-dev bash
root@fast-data-dev / $ pwd
/
root@fast-data-dev / $ ls
bin         connectors  dev  extra-connect-jars  kafka-connect  lib64  mnt  proc  run   srv  tmp  var
build.info  data        etc  home                lib            media  opt  root  sbin  sys  usr
root@fast-data-dev / $ cd kafka-connect/
root@fast-data-dev kafka-connect $ ls
docker-compose.yml  file-stream-connector-properties  source-input.txt  workers-config.properties
root@fast-data-dev kafka-connect $ 

Step-4: Create Kafka topic "kafka-connect-standalone" with 3 partitions and replication factor 1. The created topic can be validated in the browser at http://127.0.0.1:3030/kafka-topics-ui/#/.
root@fast-data-dev kafka-connect $ kafka-topics --create --topic kafka-connect-standalone --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181
Created topic "kafka-connect-standalone".

Step-5: Create the standalone connector using the "connect-standalone" command. General syntax of connect-standalone:
connect-standalone <worker-properties> <connector1-properties> [<connector2-properties> ...]
Create the standalone connector using workers-config.properties and file-stream-connector-properties.
root@fast-data-dev kafka-connect $ connect-standalone workers-config.properties file-stream-connector-properties
[2018-10-02 08:24:57,859] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:65)
[2018-10-02 08:24:57,886] INFO WorkerInfo values: 
 jvm.args = -Xmx256M, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/opt/la
    
....
[2018-10-02 08:24:57,892] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:74)
[2018-10-02 08:25:04,335] INFO Registered loader: sun.misc.Launcher$AppClassLoader@764c12b6 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:
[2018-10-02 08:25:04,384] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-10-02 08:25:04,384] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-10-02 08:25:04,385] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-10-02 08:25:04,385] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
....
....
[2018-10-02 08:25:04,425] INFO StandaloneConfig values: 
 access.control.allow.methods = 
 access.control.allow.origin = 
 bootstrap.servers = [127.0.0.1:9092]
 internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
 internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
 key.converter = class org.apache.kafka.connect.json.JsonConverter
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 offset.flush.interval.ms = 10000
 offset.flush.timeout.ms = 5000
 offset.storage.file.filename = standalone.offsets
 plugin.path = null
 rest.advertised.host.name = null
 rest.advertised.port = null
 rest.host.name = 127.0.0.1
 rest.port = 8086
 task.shutdown.graceful.timeout.ms = 5000
 value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:238)
.....
.....
[2018-10-02 08:25:06,325] INFO REST server listening at http://127.0.0.1:8086/, advertising URL http://127.0.0.1:8086/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2018-10-02 08:25:06,325] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
....
....
[2018-10-02 08:25:06,469] INFO Kafka version : 1.0.1-L0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-10-02 08:25:06,469] INFO Kafka commitId : c0518aa65f25317e (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-10-02 08:25:06,485] INFO WorkerSourceTask{id=file-stream-kafka-connect-standalone-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:158)
[2018-10-02 08:25:06,485] INFO Created connector file-stream-kafka-connect-standalone (org.apache.kafka.connect.cli.ConnectStandalone:99)
[2018-10-02 08:25:16,484] INFO WorkerSourceTask{id=file-stream-kafka-connect-standalone-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-10-02 08:25:16,485] INFO WorkerSourceTask{id=file-stream-kafka-connect-standalone-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)

Step-6: Action time. Open the file source-input.txt, type some messages into it and save it. The messages should be transferred to the Kafka topic. Open http://127.0.0.1:3030/kafka-topics-ui and monitor the topic. We should see the messages we wrote in source-input.txt.
Message written in source-input.txt transferred to topic by FileStreamSourceConnector
What happened in the background: we wrote data to the source file and the Kafka Connect standalone worker pushed the data to the topic. No programming, just configs!

What is the significance of the file standalone.offsets?
Stop Kafka Connect (press Ctrl/Cmd + C). Once Kafka Connect has stopped gracefully, list all files in the directory. We have a new guest in the form of a file, "standalone.offsets".
root@fast-data-dev kafka-connect $ pwd
/kafka-connect
root@fast-data-dev kafka-connect $ ls
docker-compose.yml  file-stream-connector-properties  source-input.txt  standalone.offsets  workers-config.properties
root@fast-data-dev kafka-connect $ 
This file is created by Kafka Connect to keep track of where it should resume reading messages from the source file on restarts, i.e. when Kafka Connect starts again, it knows from where to resume reading without publishing duplicate messages to the topic. Execute the above command (connect-standalone workers-config.properties file-stream-connector-properties) again and validate in the Kafka topics UI that we do not have duplicate messages.
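
Conversely, if you deliberately want the file re-published from the beginning (duplicates and all), deleting this offsets file before restarting the worker should achieve that; a quick sketch:
root@fast-data-dev kafka-connect $ rm standalone.offsets
root@fast-data-dev kafka-connect $ connect-standalone workers-config.properties file-stream-connector-properties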

 Part-2: Kafka Connect Source (Distributed mode)

Sep 30, 2018

Java client to publish messages to a TIBCO queue

TIBCO Enterprise Message Service is a standards-based messaging platform (fully TCK certified to both the JMS 1.1 and 2.0 standards) that simplifies and accelerates the integration and management of data distribution in high-performance, enterprise environments (source: Wikipedia). In this post we will create a Java client which publishes messages to a TIBCO queue.

1. Create a Maven project and add the following dependencies to pom.xml.
<dependencies>
    <!-- https://mvnrepository.com/artifact/javax.jms/javax.jms-api -->
    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>jms</artifactId>
        <version>1.1</version>
    </dependency>
    <dependency>
        <groupId>tibco.jms</groupId>
        <artifactId>tibco_jms</artifactId>
        <version>6.1</version>
    </dependency>
</dependencies> 

2. Create a Java class TibcoQueue which holds the TIBCO queue connection details.
/**
 * @author www.devinline.com(nikhil)
 *
 */
public class TibcoQueue {
 private String tibcoInstance;
 private String tibcoQueueName;
 private String userName;
 private String passWord;

 /**
  * @param tibcoInstance
  * @param tibcoQueueName
  * @param userName
  * @param passWord
  */
 public TibcoQueue(String tibcoInstance, String tibcoQueueName, 
   String userName, String passWord) {
  this.tibcoInstance = tibcoInstance;
  this.tibcoQueueName = tibcoQueueName;
  this.userName = userName;
  this.passWord = passWord;
 }

 public String getTibcoInstance() {
  return tibcoInstance;
 }

 public void setTibcoInstance(String tibcoInstance) {
  this.tibcoInstance = tibcoInstance;
 }

 public String getTibcoQueueName() {
  return tibcoQueueName;
 }

 public void setTibcoQueueName(String tibcoQueueName) {
  this.tibcoQueueName = tibcoQueueName;
 }

 public String getUserName() {
  return userName;
 }

 public void setUserName(String userName) {
  this.userName = userName;
 }

 public String getPassWord() {
  return passWord;
 }

 public void setPassWord(String passWord) {
  this.passWord = passWord;
 }
}

3. Create a Java class which does all the configuration and sends messages to the queue.
import java.util.List;

import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.tibco.tibjms.TibjmsQueueConnectionFactory;

/**
 * @author www.devinline.com (nikhil)
 *
 */

public class TibcoQueueUtils {
 private QueueConnection queueConnection;
 private QueueSession queueSession;
 private Queue sendQueue;
 private MessageProducer messageProducer;

 private void configureTibcoConnection(TibcoQueue tibcoQueue) throws Exception {
  QueueConnectionFactory queueConnectionFactory = 
    new TibjmsQueueConnectionFactory(tibcoQueue.getTibcoInstance());
  queueConnection = queueConnectionFactory.createQueueConnection(tibcoQueue.getUserName(),
    tibcoQueue.getPassWord());
  queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
  sendQueue = queueSession.createQueue(tibcoQueue.getTibcoQueueName());
 }

 private void configureProducer(TibcoQueue tibcoQueue) throws Exception {
  configureTibcoConnection(tibcoQueue);
  messageProducer = queueSession.createProducer(sendQueue);
  messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  queueConnection.start();
 }

 private void closeProducer() throws Exception {
  messageProducer.close();
  closeTibcoConnection();
 }

 private void closeTibcoConnection() throws Exception {
  queueSession.close();
  queueConnection.stop();
  queueConnection.close();
 }

 public synchronized void sendMessage(String messageText, 
   TibcoQueue tibcoQueue) throws Exception {
  configureProducer(tibcoQueue);
  TextMessage message = queueSession.createTextMessage(messageText);
  messageProducer.send(message);
  closeProducer();
 }

 public synchronized void sendMessages(List<String> messageTexts, 
   TibcoQueue tibcoQueue) throws Exception {
  messageTexts.forEach(messageText -> {
   try {
    sendMessage(messageText, tibcoQueue);
   } catch (Exception e) {
    e.printStackTrace();
   }
  });
 }
}

4. Create a Java class with a main method which calls sendMessage() and initiates sending the message to the queue.
import com.devinline.jms.tibco.utils.TibcoQueue;
import com.devinline.jms.utils.JMSConstants;
import com.devinline.jms.tibco.utils.TibcoQueueUtils;
/**
 * @author www.devinline.com (nikhil)
 *
 */
public class TibcoQueueClientPublisher {
 public static void main(String[] args) throws Exception {
  String message = "{\"order\":{\"orderNo\":\"XYZ1234\",\"status\":\"COMPLETED\"}}";
  /**/
  TibcoQueue tibcoQueue = new TibcoQueue(
    JMSConstants.BROKER_URL, 
    JMSConstants.QUEUE_NAME,
    JMSConstants.QUEUE_USERNAME, 
    JMSConstants.QUEUE_PASSWORD);
  
  TibcoQueueUtils queueUtil = new TibcoQueueUtils();
  System.out.println("Sending message");
  queueUtil.sendMessage(message,tibcoQueue);
  System.out.println("Message sent successfully!!");
 }
}

Note: Create a JMSConstants class with the fields BROKER_URL, QUEUE_NAME, QUEUE_USERNAME and QUEUE_PASSWORD and use it in the above class accordingly.
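
A minimal sketch of such a JMSConstants class is shown below; the server URL, queue name and credentials are placeholder values and must be replaced with the settings of your TIBCO EMS environment.
package com.devinline.jms.utils;

/**
 * Connection constants used by TibcoQueueClientPublisher.
 * All values below are placeholders - replace them with your
 * actual TIBCO EMS server URL, queue name and credentials.
 */
public class JMSConstants {
 public static final String BROKER_URL = "tcp://localhost:7222"; // placeholder EMS server URL
 public static final String QUEUE_NAME = "sample.queue";         // placeholder queue name
 public static final String QUEUE_USERNAME = "admin";            // placeholder username
 public static final String QUEUE_PASSWORD = "";                 // placeholder password
}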