Kafka Connect (or the Connect API) is a framework for importing/exporting data from/to other systems, and it internally uses the Producer and Consumer APIs. The Connect API defines the programming interface that is implemented to build a concrete connector containing the actual logic to read/write data from the other system. These other systems can be databases, Couchbase, SAP HANA, blockchain, Cassandra, FTP, Twitter, etc.
Kafka Connect uses the Producer API for the Source -> Kafka direction and the Consumer API for the Kafka -> Sink direction. Just as a Kafka cluster consists of multiple brokers, a Kafka Connect cluster is a collection of workers (servers).
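To get a feel for that programming interface, here is a heavily trimmed sketch of a custom source connector. The class names MySourceConnector and MySourceTask are hypothetical and used purely for illustration; real connectors (JDBC, Elasticsearch, etc.) implement these same hooks with actual I/O logic.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical connector that would read from some external system into Kafka.
public class MySourceConnector extends SourceConnector {
    private Map<String, String> config;

    @Override public void start(Map<String, String> props) { this.config = props; }
    @Override public Class<? extends Task> taskClass() { return MySourceTask.class; }
    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Split the work into at most maxTasks task configurations (here: a single task).
        return Collections.singletonList(config);
    }
    @Override public void stop() { }
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public String version() { return "0.1"; }
}

// The task does the actual reading; the framework takes the returned records
// and writes them to Kafka using the Producer API internally.
class MySourceTask extends SourceTask {
    @Override public void start(Map<String, String> props) { /* open connection to the source */ }
    @Override public List<SourceRecord> poll() throws InterruptedException {
        return null; // read from the external system and return records; null = nothing right now
    }
    @Override public void stop() { /* close the connection */ }
    @Override public String version() { return "0.1"; }
}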
[Image: Kafka Connect - Source and Sink interaction]
Kafka Connect workers: Standalone vs. Distributed mode
Standalone mode: a single process runs our connectors and tasks (connector + user configuration => task). Configuration is bundled with the process. It is easy to get started with, but it is not fault tolerant and offers no scalability.
Distributed mode: multiple workers run our connectors and tasks. Configuration is submitted using a REST API (a sketch follows below). It is scalable and fault tolerant (the cluster rebalances on worker failure).
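For illustration, submitting a connector configuration to a distributed worker's REST API could look like the sketch below (Java 11+ HttpClient). The worker address 127.0.0.1:8083, connector name file-stream-demo, file demo.txt and topic demo-topic are assumptions made up for this example; the actual Kafka Connect REST endpoint for creating a connector is POST /connectors.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitConnector {
    public static void main(String[] args) throws Exception {
        // Connector name and config are sent as JSON to any worker in the Connect cluster.
        String body = "{ \"name\": \"file-stream-demo\","
                    + "  \"config\": {"
                    + "    \"connector.class\": \"org.apache.kafka.connect.file.FileStreamSourceConnector\","
                    + "    \"tasks.max\": \"1\","
                    + "    \"file\": \"demo.txt\","
                    + "    \"topic\": \"demo-topic\" } }";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8083/connectors")) // assumed worker address
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}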
Install Docker and set up the Kafka connector:
1. Refer to this and install Docker for your operating system. I have installed Docker on a Mac and validated that it is up and running. Start Docker and wait a moment for it to come up.
[Image: Docker is up and running, with version validation in the terminal]
2. Create a file docker-compose.yml and copy & paste the following configuration into it (or download docker-compose.yml).
version: '2'
services:
  # this is our kafka cluster.
  kafka-cluster:
    image: landoop/fast-data-dev:cp3.3.0
    environment:
      ADV_HOST: 127.0.0.1       # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0               # Disable running tests so the cluster starts faster
    ports:
      - 2181:2181               # Zookeeper
      - 3030:3030               # Landoop UI
      - 8081-8083:8081-8083     # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585     # JMX Ports
      - 9092:9092               # Kafka Broker
  # we will use elasticsearch as one of our sinks.
  # This configuration allows you to start elasticsearch
  elasticsearch:
    image: itzg/elasticsearch:2.4.3
    environment:
      PLUGINS: appbaseio/dejavu
      OPTS: -Dindex.number_of_shards=1 -Dindex.number_of_replicas=0
    ports:
      - "9200:9200"
  # we will use postgres as one of our sinks.
  # This configuration allows you to start postgres
  postgres:
    image: postgres:9.5-alpine
    environment:
      POSTGRES_USER: postgres     # define credentials
      POSTGRES_PASSWORD: postgres # define credentials
      POSTGRES_DB: postgres       # define database
    ports:
      - 5432:5432                 # Postgres port
3. Go to the directory where we created this YAML file and execute the following command to start the Kafka cluster. When the command below is run for the very first time, it downloads the Landoop fast-data-dev image. Once the image is downloaded, it creates a Kafka cluster that can be accessed in the browser at 127.0.0.1:3030.
➜ Kafka-connect pwd
/Users/n0r0082/Kafka/Kafka-connect
➜ Kafka-connect docker-compose up kafka-cluster
Creating network "code_default" with the default driver
Pulling kafka-cluster (landoop/fast-data-dev:cp3.3.0)...
cp3.3.0: Pulling from landoop/fast-data-dev
b56ae66c2937: Pull complete
......
......
990edf28e90a: Pull complete
Digest: sha256:0177ed6416dd0a549a6ec5028e0d19d93b323d03086a452976c251bb9d6a54e4
Status: Downloaded newer image for landoop/fast-data-dev:cp3.3.0
Creating code_kafka-cluster_1 ... done
Attaching to code_kafka-cluster_1
kafka-cluster_1 | Setting advertised host to 127.0.0.1.
kafka-cluster_1 | Starting services.
kafka-cluster_1 | This is landoop’s fast-data-dev. Kafka 0.11.0.0, Confluent OSS 3.3.0.
kafka-cluster_1 | You may visit http://127.0.0.1:3030 in about a minute.
kafka-cluster_1 | 2018-09-29 18:13:33,195 CRIT Supervisor running as root (no user in config file)
kafka-cluster_1 | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/01-zookeeper.conf" during parsing
kafka-cluster_1 | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/02-broker.conf" during parsing
kafka-cluster_1 | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/03-schema-registry.conf" during parsing
kafka-cluster_1 | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/04-rest-proxy.conf" during parsing
kafka-cluster_1 | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/05-connect-distributed.conf" during parsing
kafka-cluster_1 | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/06-caddy.conf" during parsing
kafka-cluster_1 | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/07-smoke-tests.conf" during parsing
kafka-cluster_1 | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/08-logs-to-kafka.conf" during parsing
kafka-cluster_1 | 2018-09-29 18:13:33,195 WARN Included extra file "/etc/supervisord.d/99-supervisord-sample-data.conf" during parsing
kafka-cluster_1 | 2018-09-29 18:13:33,196 INFO supervisord started with pid 7
kafka-cluster_1 | 2018-09-29 18:13:34,205 INFO spawned: 'sample-data' with pid 97
kafka-cluster_1 | 2018-09-29 18:13:34,207 INFO spawned: 'zookeeper' with pid 98
kafka-cluster_1 | 2018-09-29 18:13:34,209 INFO spawned: 'caddy' with pid 99
kafka-cluster_1 | 2018-09-29 18:13:34,211 INFO spawned: 'broker' with pid 101
kafka-cluster_1 | 2018-09-29 18:13:34,213 INFO spawned: 'smoke-tests' with pid 102
kafka-cluster_1 | 2018-09-29 18:13:34,215 INFO spawned: 'connect-distributed' with pid 104
kafka-cluster_1 | 2018-09-29 18:13:34,217 INFO spawned: 'logs-to-kafka' with pid 108
kafka-cluster_1 | 2018-09-29 18:13:34,219 INFO spawned: 'schema-registry' with pid 110
kafka-cluster_1 | 2018-09-29 18:13:34,221 INFO spawned: 'rest-proxy' with pid 111
kafka-cluster_1 | 2018-09-29 18:13:35,220 INFO success: sample-data entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-09-29 18:13:35,220 INFO success: zookeeper entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-09-29 18:13:35,221 INFO success: caddy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-09-29 18:13:35,221 INFO success: broker entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-09-29 18:13:35,221 INFO success: smoke-tests entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-09-29 18:13:35,221 INFO success: connect-distributed entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-09-29 18:13:35,221 INFO success: logs-to-kafka entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-09-29 18:13:35,221 INFO success: schema-registry entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-09-29 18:13:35,221 INFO success: rest-proxy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-09-29 18:13:36,473 INFO exited: rest-proxy (exit status 1; not expected)
kafka-cluster_1 | 2018-09-29 18:13:36,502 INFO spawned: 'rest-proxy' with pid 281
kafka-cluster_1 | 2018-09-29 18:13:36,788 INFO exited: schema-registry (exit status 1; not expected)
kafka-cluster_1 | 2018-09-29 18:13:36,858 INFO spawned: 'schema-registry' with pid 328
kafka-cluster_1 | 2018-09-29 18:13:37,762 INFO success: rest-proxy entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-09-29 18:13:38,020 INFO success: schema-registry entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
Open the browser at http://127.0.0.1:3030 and check that the Landoop fast-data-dev services are running. This UI provides an overview of the Kafka cluster with topic and broker insights.
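The services can also be probed programmatically instead of through the UI. As a rough sketch (assuming Java 11+ and the Kafka Connect port 8083 mapped in docker-compose.yml above), listing the connector plugins installed in the Connect worker:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListConnectorPlugins {
    public static void main(String[] args) throws Exception {
        // GET /connector-plugins returns the connector classes available on the worker.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8083/connector-plugins"))
                .GET()
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}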
Kafka Connect Source (Standalone mode)
Standalone mode is the best way to get started, with minimal configuration and infrastructure setup. In this section we will see how to configure a connector in standalone mode and transfer file content to a Kafka topic.
[Image: Standalone mode Kafka Connect architecture]
Starting any Kafka Connect cluster requires a worker configuration and a connector (file-stream) configuration. Follow the steps below to set up Kafka Connect in standalone mode with the FileStream connector.
Step-1: Create two files: workers-config.properties and file-stream-connector-properties. I am creating these files under the directory /Users/n0r0082/Kafka/Kafka-connect where docker-compose.yml was created.
➜ Kafka-connect pwd
/Users/n0r0082/Kafka/Kafka-connect
➜ Kafka-connect ls
docker-compose.yml
➜ Kafka-connect touch workers-config.properties
➜ Kafka-connect vi workers-config.properties
➜ Kafka-connect touch file-stream-connector-properties
➜ Kafka-connect vi file-stream-connector-properties
➜ Kafka-connect ls
docker-compose.yml file-stream-connector-properties workers-config.properties
workers-config.properties
bootstrap.servers=127.0.0.1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# we always leave the internal converters set to JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
# Rest API
rest.port=8086
rest.host.name=127.0.0.1
# this config is only for standalone workers
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000
file-stream-connector-properties
# These are standard kafka connect parameters, needed for ALL connectors
name=file-stream-kafka-connect-standalone
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# Parameters can be found here: https://github.com/apache/kafka/blob/trunk/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
file=source-input.txt
topic=kafka-connect-standalone
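The converter settings in workers-config.properties decide how Connect serializes record keys and values onto the topic. With schemas.enable=false the JsonConverter writes plain JSON without the schema/payload envelope. A small sketch showing this behaviour directly (assuming the org.apache.kafka:connect-json artifact is on the classpath):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;

public class ConverterDemo {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();

        Map<String, Object> config = new HashMap<>();
        config.put("schemas.enable", false);   // same flag as in workers-config.properties
        converter.configure(config, false);    // false = configure as a value converter

        // FileStreamSource hands each file line to the converter as a STRING value.
        byte[] bytes = converter.fromConnectData(
                "kafka-connect-standalone", Schema.STRING_SCHEMA, "hello kafka connect");
        System.out.println(new String(bytes)); // "hello kafka connect" (a bare JSON string)
        // With schemas.enable=true the same value would instead be wrapped as
        // {"schema":{"type":"string","optional":false},"payload":"hello kafka connect"}
    }
}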
Step-2: Create an input file source-input.txt; the content of this file will be transferred to the Kafka topic.
➜ Kafka-connect touch source-input.txt
Step-3: Mount the host directory in a Docker container. Make sure we are in the directory where we created the 2 config files and 1 source file. After the mount we automatically switch to the fast-data-dev shell.
➜ Kafka-connect pwd
/Users/n0r0082/Kafka/Kafka-connect
➜ Kafka-connect docker run --rm -it -v "$(pwd)":/kafka-connect/ --net=host landoop/fast-data-dev bash
root@fast-data-dev / $ pwd
/
root@fast-data-dev / $ ls
bin connectors dev extra-connect-jars kafka-connect lib64 mnt proc run srv tmp var
build.info data etc home lib media opt root sbin sys usr
root@fast-data-dev / $ cd kafka-connect/
root@fast-data-dev kafka-connect $ ls
docker-compose.yml file-stream-connector-properties source-input.txt workers-config.properties
root@fast-data-dev kafka-connect $
Step-4: Create the Kafka topic "kafka-connect-standalone" with 3 partitions and replication factor 1. The created topic can be validated in the browser at http://127.0.0.1:3030/kafka-topics-ui/#/.
root@fast-data-dev kafka-connect $ kafka-topics --create --topic kafka-connect-standalone --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181
Created topic "kafka-connect-standalone".
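The same topic could also be created from code with Kafka's AdminClient instead of the kafka-topics CLI. A rough sketch, assuming the kafka-clients library and the broker advertised at 127.0.0.1:9092:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Same topic name, partition count and replication factor as the CLI command above.
            NewTopic topic = new NewTopic("kafka-connect-standalone", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}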
Step-5: Create the standalone connector using the command "connect-standalone". General syntax of connect-standalone:
connect-standalone <worker.properties> <connector1.properties> [<connector2.properties> ...]
Create the standalone connector using workers-config.properties and file-stream-connector-properties.
root@fast-data-dev kafka-connect $ connect-standalone workers-config.properties file-stream-connector-properties
[2018-10-02 08:24:57,859] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:65)
[2018-10-02 08:24:57,886] INFO WorkerInfo values:
jvm.args = -Xmx256M, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/opt/la
....
[2018-10-02 08:24:57,892] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:74)
[2018-10-02 08:25:04,335] INFO Registered loader: sun.misc.Launcher$AppClassLoader@764c12b6 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:
[2018-10-02 08:25:04,384] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-10-02 08:25:04,384] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-10-02 08:25:04,385] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-10-02 08:25:04,385] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
....
....
[2018-10-02 08:25:04,425] INFO StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
bootstrap.servers = [127.0.0.1:9092]
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 10000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = standalone.offsets
plugin.path = null
rest.advertised.host.name = null
rest.advertised.port = null
rest.host.name = 127.0.0.1
rest.port = 8086
task.shutdown.graceful.timeout.ms = 5000
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:238)
.....
.....
[2018-10-02 08:25:06,325] INFO REST server listening at http://127.0.0.1:8086/, advertising URL http://127.0.0.1:8086/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2018-10-02 08:25:06,325] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
....
....
[2018-10-02 08:25:06,469] INFO Kafka version : 1.0.1-L0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-10-02 08:25:06,469] INFO Kafka commitId : c0518aa65f25317e (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-10-02 08:25:06,485] INFO WorkerSourceTask{id=file-stream-kafka-connect-standalone-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:158)
[2018-10-02 08:25:06,485] INFO Created connector file-stream-kafka-connect-standalone (org.apache.kafka.connect.cli.ConnectStandalone:99)
[2018-10-02 08:25:16,484] INFO WorkerSourceTask{id=file-stream-kafka-connect-standalone-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-10-02 08:25:16,485] INFO WorkerSourceTask{id=file-stream-kafka-connect-standalone-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
Step-6: Action time! Open the file source-input.txt, type some messages into it, and save it. The messages should be transferred to the Kafka topic. Open http://127.0.0.1:3030/kafka-topics-ui and monitor the topic; we should see the messages we wrote in source-input.txt.
[Image: Message written in source-input.txt transferred to the topic by FileStreamSourceConnector]
What happened in the background: we wrote data to the source file, and the Kafka Connect standalone worker pushed the data to the topic. No programming, just configuration!
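If we want to double-check from code rather than the topics UI, a minimal consumer sketch can read the topic back (assuming a recent kafka-clients library, the broker at 127.0.0.1:9092, and a made-up group id verify-file-stream):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class VerifyTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "verify-file-stream");   // hypothetical consumer group id
        props.put("auto.offset.reset", "earliest");    // read the topic from the beginning
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("kafka-connect-standalone"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // Each line written to source-input.txt should show up as a JSON string value.
                System.out.printf("partition=%d offset=%d value=%s%n",
                                  record.partition(), record.offset(), record.value());
            }
        }
    }
}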
What is the significance of the file standalone.offsets?
Stop Kafka Connect (press Ctrl/Command + C). Once Kafka Connect has gracefully stopped, list all files in the directory. We have a new guest in the form of a file, "standalone.offsets".
root@fast-data-dev kafka-connect $ pwd
/kafka-connect
root@fast-data-dev kafka-connect $ ls
docker-compose.yml file-stream-connector-properties source-input.txt standalone.offsets workers-config.properties
root@fast-data-dev kafka-connect $
This file is created by Kafka Connect to keep track of where it should resume reading messages from the source file on restart, i.e. when Kafka Connect starts again, it knows from where to resume reading without publishing duplicate messages to the topic. Try executing the above command (connect-standalone workers-config.properties file-stream-connector-properties) again and validate in the Kafka topics UI that we do not have duplicate messages.
Part-2: Kafka Connect Source (Distributed mode)