Kafka Connect: Setup Kafka Connect Cluster (Docker Landoop fast-data-dev) and FileStream Source Connector (Standalone and Distributed Mode) Part-1

Kafka Connect (or the Connect API) is a framework to import data into Kafka from other systems and export data from Kafka to other systems; internally it uses the Producer and Consumer APIs. The Connect API defines the programming interface that is implemented to build a concrete connector containing the actual logic to read/write data from/to the other system. These other systems can be databases, Couchbase, SAP HANA, blockchain, Cassandra, FTP, Twitter, etc.
The Connect API covers both the source side (Source -> Kafka, built on the Producer API) and the sink side (Kafka -> Sink, built on the Consumer API). Just as a Kafka cluster consists of multiple brokers, a Kafka Connect cluster is a collection of workers (servers).
Kafka connect - Source and Sink interaction

Kafka Connect workers: Standalone vs Distributed mode
Standalone mode: a single process runs all our connectors and tasks (Connector + user configuration => Tasks). Configuration is bundled with the process. It is easy to get started with, but it is not fault tolerant and does not scale.
Distributed mode: multiple workers run our connectors and tasks. Configuration is submitted via the REST API (see the sketch below). It is scalable and fault tolerant (tasks are rebalanced on worker failure).
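As a preview of distributed mode (covered in Part-2), a connector configuration is submitted to the workers as JSON over the Connect REST API rather than passed on the command line. A minimal sketch, assuming the distributed Connect worker from the setup below is reachable on port 8083; the connector name and topic here are illustrative:

# Submit a FileStream source connector to a distributed Connect cluster
curl -X POST http://127.0.0.1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "file-stream-kafka-connect-distributed",
        "config": {
          "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
          "tasks.max": "1",
          "file": "source-input.txt",
          "topic": "kafka-connect-distributed"
        }
      }'

# List the connectors the cluster knows about
curl http://127.0.0.1:8083/connectors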

Install Docker and set up the Kafka cluster:

1. Refer to this guide and install Docker as per your operating system. I have installed Docker on a Mac and validated that it is up and running. Start Docker and wait a moment for it to come up.
Docker is up and running with version validation in terminal
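The installation can also be sanity-checked from a terminal; the exact output will vary by machine:

docker --version
docker-compose --version
docker info          # fails if the Docker daemon is not running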

2. Create a file docker-compose.yml and copy & paste the following configuration into it (or download docker-compose.yml).
version: '2'

services:
  # this is our kafka cluster.
  kafka-cluster:
    image: landoop/fast-data-dev:cp3.3.0
    environment:
      ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585       # JMX Ports
      - 9092:9092                 # Kafka Broker

  # we will use elasticsearch as one of our sinks.
  # This configuration allows you to start elasticsearch
  elasticsearch:
    image: itzg/elasticsearch:2.4.3
    environment:
      PLUGINS: appbaseio/dejavu
      OPTS: -Dindex.number_of_shards=1 -Dindex.number_of_replicas=0
    ports:
      - "9200:9200"

  # we will use postgres as one of our sinks.
  # This configuration allows you to start postgres
  postgres:
    image: postgres:9.5-alpine
    environment:
      POSTGRES_USER: postgres     # define credentials
      POSTGRES_PASSWORD: postgres # define credentials
      POSTGRES_DB: postgres       # define database
    ports:
      - 5432:5432                 # Postgres port
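Only the kafka-cluster service is needed for this part; elasticsearch and postgres are sinks we will use in later parts. Before starting anything, the compose file can be sanity-checked (illustrative):

docker-compose config --services   # should list kafka-cluster, elasticsearch, postgres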

3. Go to the directory where we created this YAML file and execute the following command to start the Kafka cluster. The very first time this command is run, it downloads the Landoop fast-data-dev image (highlighted in green). Once the image is downloaded it creates the Kafka cluster, which can be accessed in a browser at 127.0.0.1:3030.

➜  Kafka-connect pwd
/Users/n0r0082/Kafka/Kafka-connect
➜  Kafka-connect docker-compose up kafka-cluster 
Creating network "code_default" with the default driver
Pulling kafka-cluster (landoop/fast-data-dev:cp3.3.0)...
cp3.3.0: Pulling from landoop/fast-data-dev
b56ae66c2937: Pull complete
......
......
990edf28e90a: Pull complete
Digest: sha256:0177ed6416dd0a549a6ec5028e0d19d93b323d03086a452976c251bb9d6a54e4
Status: Downloaded newer image for landoop/fast-data-dev:cp3.3.0
Creating code_kafka-cluster_1 ... done
Attaching to code_kafka-cluster_1
kafka-cluster_1  | Setting advertised host to 127.0.0.1.
kafka-cluster_1  | Starting services.
kafka-cluster_1  | This is landoop’s fast-data-dev. Kafka 0.11.0.0, Confluent OSS 3.3.0.
kafka-cluster_1  | You may visit http://127.0.0.1:3030 in about a minute.
kafka-cluster_1  | 2018-09-29 18:13:33,195 CRIT Supervisor running as root (no user in config file)
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/01-zookeeper.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/02-broker.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/03-schema-registry.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/04-rest-proxy.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/05-connect-distributed.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/06-caddy.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/07-smoke-tests.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/08-logs-to-kafka.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/99-supervisord-sample-data.conf" during parsing
kafka-cluster_1  | 2018-09-29 18:13:33,196 INFO supervisord started with pid 7
kafka-cluster_1  | 2018-09-29 18:13:34,205 INFO spawned: 'sample-data' with pid 97
kafka-cluster_1  | 2018-09-29 18:13:34,207 INFO spawned: 'zookeeper' with pid 98
kafka-cluster_1  | 2018-09-29 18:13:34,209 INFO spawned: 'caddy' with pid 99
kafka-cluster_1  | 2018-09-29 18:13:34,211 INFO spawned: 'broker' with pid 101
kafka-cluster_1  | 2018-09-29 18:13:34,213 INFO spawned: 'smoke-tests' with pid 102
kafka-cluster_1  | 2018-09-29 18:13:34,215 INFO spawned: 'connect-distributed' with pid 104
kafka-cluster_1  | 2018-09-29 18:13:34,217 INFO spawned: 'logs-to-kafka' with pid 108
kafka-cluster_1  | 2018-09-29 18:13:34,219 INFO spawned: 'schema-registry' with pid 110
kafka-cluster_1  | 2018-09-29 18:13:34,221 INFO spawned: 'rest-proxy' with pid 111
kafka-cluster_1  | 2018-09-29 18:13:35,220 INFO success: sample-data entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,220 INFO success: zookeeper entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: caddy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: broker entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: smoke-tests entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: connect-distributed entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: logs-to-kafka entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: schema-registry entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:35,221 INFO success: rest-proxy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:36,473 INFO exited: rest-proxy (exit status 1; not expected)
kafka-cluster_1  | 2018-09-29 18:13:36,502 INFO spawned: 'rest-proxy' with pid 281
kafka-cluster_1  | 2018-09-29 18:13:36,788 INFO exited: schema-registry (exit status 1; not expected)
kafka-cluster_1  | 2018-09-29 18:13:36,858 INFO spawned: 'schema-registry' with pid 328
kafka-cluster_1  | 2018-09-29 18:13:37,762 INFO success: rest-proxy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1  | 2018-09-29 18:13:38,020 INFO success: schema-registry entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)

Open a browser and check that the Landoop fast-data-dev services are running at http://127.0.0.1:3030. This UI provides an overview of the Kafka cluster with topic and broker insights.
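The cluster can also be probed from the command line. For example, the Confluent REST Proxy that fast-data-dev exposes on port 8082 (per the port mapping above) can list topics; this is just an optional sanity check:

curl http://127.0.0.1:8082/topics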


Kafka Connect Source (Standalone mode)

Standalone mode is the best way to get started, with minimal configuration and infrastructure setup. In this section we will see how to configure a connector in standalone mode and transfer file content to a Kafka topic.
Standalone mode Kafka Connect Architecture

Starting any Kafka Connect worker requires two pieces of configuration: a worker config and a connector (FileStream) config. Follow the steps below to set up Kafka Connect in standalone mode with the FileStream connector.

Step-1: Create two files: workers-config.properties and file-stream-connector-properties. I am creating these files under the directory /Users/n0r0082/Kafka/Kafka-connect, where docker-compose.yml was created.

➜  Kafka-connect pwd
/Users/n0r0082/Kafka/Kafka-connect
➜  Kafka-connect ls
docker-compose.yml
➜  Kafka-connect touch workers-config.properties
➜  Kafka-connect vi workers-config.properties 
➜  Kafka-connect touch file-stream-connector-properties
➜  Kafka-connect vi file-stream-connector-properties 
➜  Kafka-connect ls 
docker-compose.yml     file-stream-connector-properties workers-config.properties
workers-config.properties
bootstrap.servers=127.0.0.1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# we always leave the internal key to JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
# Rest API
rest.port=8086
rest.host.name=127.0.0.1
# this config is only for standalone workers
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000
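A note on the converter settings above: with schemas.enable=false the JsonConverter writes each record key/value as plain JSON. If schemas were enabled, every value would instead be wrapped in a schema/payload envelope, roughly like this illustrative example for a string value:

{"schema": {"type": "string", "optional": false}, "payload": "hello kafka connect"}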

file-stream-connector-properties
# These are standard kafka connect parameters, need for ALL connectors
name=file-stream-kafka-connect-standalone
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# Parameters can be found here: https://github.com/apache/kafka/blob/trunk/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
file=source-input.txt
topic=kafka-connect-standalone

Step-2: Create an input file source-input.txt; the content of this file will be transferred to the Kafka topic.
➜  Kafka-connect touch source-input.txt
Step-3: Mount the host directory into a Docker container. Make sure we are in the directory where we created the two config files and the source file. After mounting, we are automatically switched to the fast-data-dev shell.
➜  Kafka-connect pwd
/Users/n0r0082/Kafka/Kafka-connect
➜  Kafka-connect docker run --rm -it -v "$(pwd)":/kafka-connect/ --net=host landoop/fast-data-dev bash
root@fast-data-dev / $ pwd
/
root@fast-data-dev / $ ls
bin         connectors  dev  extra-connect-jars  kafka-connect  lib64  mnt  proc  run   srv  tmp  var
build.info  data        etc  home                lib            media  opt  root  sbin  sys  usr
root@fast-data-dev / $ cd kafka-connect/
root@fast-data-dev kafka-connect $ ls
docker-compose.yml  file-stream-connector-properties  source-input.txt  workers-config.properties
root@fast-data-dev kafka-connect $ 

Step-4: Create the Kafka topic "kafka-connect-standalone" with 3 partitions and replication factor 1. The created topic can be validated in the browser at http://127.0.0.1:3030/kafka-topics-ui/#/.
root@fast-data-dev kafka-connect $ kafka-topics --create --topic kafka-connect-standalone --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181
Created topic "kafka-connect-standalone".
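The topic can also be described from the same shell to confirm the partition count and replication factor (output omitted here):

root@fast-data-dev kafka-connect $ kafka-topics --describe --topic kafka-connect-standalone --zookeeper 127.0.0.1:2181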

Step-5: Start the standalone connector using the command "connect-standalone". The general syntax of connect-standalone is:
connect-standalone <worker-properties> <connector-1-properties> [<connector-2-properties> ...]
Start the connector using workers-config.properties and file-stream-connector-properties:
root@fast-data-dev kafka-connect $ connect-standalone workers-config.properties file-stream-connector-properties
[2018-10-02 08:24:57,859] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:65)
[2018-10-02 08:24:57,886] INFO WorkerInfo values: 
 jvm.args = -Xmx256M, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/opt/la
    
....
[2018-10-02 08:24:57,892] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:74)
[2018-10-02 08:25:04,335] INFO Registered loader: sun.misc.Launcher$AppClassLoader@764c12b6 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:
[2018-10-02 08:25:04,384] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-10-02 08:25:04,384] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-10-02 08:25:04,385] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-10-02 08:25:04,385] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
....
....
[2018-10-02 08:25:04,425] INFO StandaloneConfig values: 
 access.control.allow.methods = 
 access.control.allow.origin = 
 bootstrap.servers = [127.0.0.1:9092]
 internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
 internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
 key.converter = class org.apache.kafka.connect.json.JsonConverter
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 offset.flush.interval.ms = 10000
 offset.flush.timeout.ms = 5000
 offset.storage.file.filename = standalone.offsets
 plugin.path = null
 rest.advertised.host.name = null
 rest.advertised.port = null
 rest.host.name = 127.0.0.1
 rest.port = 8086
 task.shutdown.graceful.timeout.ms = 5000
 value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:238)
.....
.....
[2018-10-02 08:25:06,325] INFO REST server listening at http://127.0.0.1:8086/, advertising URL http://127.0.0.1:8086/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2018-10-02 08:25:06,325] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
....
....
[2018-10-02 08:25:06,469] INFO Kafka version : 1.0.1-L0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-10-02 08:25:06,469] INFO Kafka commitId : c0518aa65f25317e (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-10-02 08:25:06,485] INFO WorkerSourceTask{id=file-stream-kafka-connect-standalone-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:158)
[2018-10-02 08:25:06,485] INFO Created connector file-stream-kafka-connect-standalone (org.apache.kafka.connect.cli.ConnectStandalone:99)
[2018-10-02 08:25:16,484] INFO WorkerSourceTask{id=file-stream-kafka-connect-standalone-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-10-02 08:25:16,485] INFO WorkerSourceTask{id=file-stream-kafka-connect-standalone-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)

Step-6: Action time. Open the file source-input.txt, type some messages into it, and save it. The messages should be transferred to the Kafka topic. Open http://127.0.0.1:3030/kafka-topics-ui and monitor the topic; we should see the messages we wrote in source-input.txt.
Message written in source-input.txt transferred to topic by FileStreamSourceConnector
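Instead of an editor, lines can also be appended from another host terminal, and the topic can be read back with the console consumer inside the fast-data-dev shell (commands are illustrative):

# On the host, append lines to the mounted source file
echo "hello kafka connect" >> source-input.txt
echo "another line" >> source-input.txt

# Inside the fast-data-dev shell, read the topic back
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic kafka-connect-standalone --from-beginning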
What happened in the background: we wrote data to the source file, and the Kafka Connect standalone worker pushed that data to the topic. No programming, just configuration!

What is the significance of the file standalone.offsets?
Stop Kafka Connect (press Ctrl/Command + C). Once Kafka Connect has stopped gracefully, list all files in the directory. We have a new guest in the form of a file called "standalone.offsets".
root@fast-data-dev kafka-connect $ pwd
/kafka-connect
root@fast-data-dev kafka-connect $ ls
docker-compose.yml  file-stream-connector-properties  source-input.txt  standalone.offsets  workers-config.properties
root@fast-data-dev kafka-connect $ 
This file is created by Kafka Connect to keep track of where it should resume reading the source file after a restart, i.e. when Kafka Connect starts again it resumes from the recorded position without publishing duplicate messages to the topic. Try executing the above command (connect-standalone workers-config.properties file-stream-connector-properties) again and validate in the Kafka topics UI that there are no duplicate messages.
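Conversely, if standalone.offsets is deleted while Kafka Connect is stopped, the connector loses its recorded position and re-reads source-input.txt from the beginning on the next start, so the same lines show up again in the topic. A quick experiment (illustrative):

# With connect-standalone stopped (Ctrl+C):
rm standalone.offsets
connect-standalone workers-config.properties file-stream-connector-properties
# The existing lines of source-input.txt are published to the topic a second time.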

 Part-2: Kafka Connect Source (Distributed mode)
