Oct 4, 2018


Twitter Kafka connector: How to fetch tweets from Twitter using a Kafka connector and publish them to another Kafka topic

In the previous post we set up a Kafka Connect cluster in Docker (Landoop fast-data-dev) and created a FileStreamConnector in standalone mode and in distributed mode to transfer file content to a Kafka topic. In this post we will use the existing Docker Kafka Connect setup to fetch tweets from Twitter using the Twitter source connector (TwitterSourceConnector) and publish them to another topic.

Prerequisites

1. Set up Kafka Connect. I have set it up using Docker with Landoop fast-data-dev.
2. Create a Twitter app account and set up an app. Twitter provides four different keys. The "Keys and access token" tab lists the following keys:
Application Settings
    Consumer Key (API Key)
    Consumer Secret (API Secret)
Your Access Token
    Access Token
    Access Token Secret

I have created an app "ZythamTwitterStreamTest" and have the four keys ready. Start the Kafka cluster using docker-compose:
➜  Kafka-connect docker-compose up kafka-cluster 
Starting kafkaconnect_kafka-cluster_1 ... done
Attaching to kafkaconnect_kafka-cluster_1

Create the Kafka topic to which the Twitter Kafka connector will publish tweets:
➜  Kafka-connect kafka-topics --create --topic kafka-connect-distributed-twitter --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181
Created topic "kafka-connect-distributed-twitter".

Create the Twitter Kafka connector in distributed mode to stream tweets to the Kafka topic. Use the configuration below to create the connector.

name=source-twitter-distributed
connector.class=com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
topic=kafka-connect-distributed-twitter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
twitter.consumerkey=LXPbdzppfUS51i9v3clDSBCBe
twitter.consumersecret=FkPxDo0TvvnDb9wnlXF7i3MEvRKpCo9BcAfxGXfSRMotss7zyF
twitter.token=67405754-X4LRN3IWJ9CP5JuPxs86iiKWev20o7Zi6g3IuHFpM
twitter.secret=Fi4qhOfXSsPQTEjIfCOwNNVNLxHyXiP3vMHIrUc9OjFCE
track.terms=Bangalore,Kafka,Cyclone
language=en
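
In distributed mode, one common way to register a connector is to POST its configuration as JSON to the Kafka Connect REST API. The following is a minimal sketch of that step, assuming the REST endpoint is reachable at http://127.0.0.1:8083 (the Kafka Connect default port; adjust it to your Docker port mapping). The helper names properties_to_payload and build_create_request are hypothetical, and the credential values are placeholders to be replaced with your own four keys.

```python
import json
import urllib.request

# Assumption: Kafka Connect REST API listening on its default port 8083.
CONNECT_URL = "http://127.0.0.1:8083/connectors"

# Same keys as the configuration above; credentials replaced by placeholders.
PROPERTIES = """
name=source-twitter-distributed
connector.class=com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
topic=kafka-connect-distributed-twitter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
twitter.consumerkey=<consumer-key>
twitter.consumersecret=<consumer-secret>
twitter.token=<access-token>
twitter.secret=<access-token-secret>
track.terms=Bangalore,Kafka,Cyclone
language=en
"""


def properties_to_payload(text):
    """Convert key=value lines into the {"name": ..., "config": {...}} body
    expected by POST /connectors."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    name = config.pop("name")  # the connector name goes at the top level
    return {"name": name, "config": config}


def build_create_request(payload, url=CONNECT_URL):
    """Build (but do not send) the HTTP request that registers the connector."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


payload = properties_to_payload(PROPERTIES)
request = build_create_request(payload)
print(json.dumps(payload, indent=2))
# To actually register the connector against a running cluster:
#     urllib.request.urlopen(request)
```

The request is only built here, not sent, so the sketch can be run without a cluster. Keeping real credentials out of the script (for example by reading them from environment variables) avoids publishing them by accident.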


Start a Kafka console consumer listening on the topic "kafka-connect-distributed-twitter":
root@fast-data-dev / $ kafka-console-consumer --topic kafka-connect-distributed-twitter --bootstrap-server 127.0.0.1:9092

Message received by the consumer (JSON formatted). The message below was received because the tweet contains the word "Bangalore".

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "created_at"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int64",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "screen_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "location"
          },
          {
            "type": "boolean",
            "optional": false,
            "field": "verified"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "friends_count"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "followers_count"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "statuses_count"
          }
        ],
        "optional": false,
        "name": "com.eneco.trading.kafka.connect.twitter.User",
        "field": "user"
      },
      {
        "type": "string",
        "optional": true,
        "field": "text"
      },
      {
        "type": "string",
        "optional": true,
        "field": "lang"
      },
      {
        "type": "boolean",
        "optional": false,
        "field": "is_retweet"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "text"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.Hashtag"
            },
            "optional": true,
            "field": "hashtags"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "display_url"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "expanded_url"
                },
                {
                  "type": "int64",
                  "optional": false,
                  "field": "id"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "type"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "url"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.Medium"
            },
            "optional": true,
            "field": "media"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "display_url"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "expanded_url"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "url"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.Url"
            },
            "optional": true,
            "field": "urls"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "int64",
                  "optional": false,
                  "field": "id"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "name"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "screen_name"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.UserMention"
            },
            "optional": true,
            "field": "user_mentions"
          }
        ],
        "optional": false,
        "name": "com.eneco.trading.kafka.connect.twitter.Entities",
        "field": "entities"
      }
    ],
    "optional": false,
    "name": "com.eneco.trading.kafka.connect.twitter.Tweet"
  },
  "payload": {
    "id": 1047866309655101400,
    "created_at": "2018-10-04T15:09:31.000+0000",
    "user": {
      "id": 166235967,
      "name": "Bangalore Informer",
      "screen_name": "bangaloreinform",
      "location": "Bangalore",
      "verified": false,
      "friends_count": 27,
      "followers_count": 402,
      "statuses_count": 27755
    },
    "text": "WANTED….? For Bangalore Project|Easy Cinema Studios| – Bangalore Video https://t.co/J5xXw9Eq8I",
    "lang": "en",
    "is_retweet": false,
    "entities": {
      "hashtags": [],
      "media": [],
      "urls": [
        {
          "display_url": "bangaloreinformer.com/49412/wanted-f…",
          "expanded_url": "https://bangaloreinformer.com/49412/wanted-for-bangalore-projecteasy-cinema-studios-bangalore-video/",
          "url": "https://t.co/J5xXw9Eq8I"
        }
      ],
      "user_mentions": []
    }
  }
}
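
Because value.converter.schemas.enable=true is set, every message carries both a "schema" and a "payload" section, and the tweet itself sits under "payload". Below is a minimal sketch of pulling out the interesting fields, assuming each consumed message is one JSON document shaped like the one above. The function names are hypothetical, and the case-insensitive substring check is only an illustration, not Twitter's actual track-matching rules.

```python
import json

# Taken from track.terms in the connector configuration.
TRACK_TERMS = ["Bangalore", "Kafka", "Cyclone"]


def summarize_tweet(message):
    """Return a flat summary of one schema+payload message from the topic."""
    payload = json.loads(message)["payload"]
    return {
        "id": payload["id"],
        "screen_name": payload["user"]["screen_name"],
        "text": payload["text"],
        "urls": [u["expanded_url"] for u in payload["entities"]["urls"]],
    }


def matched_terms(text, terms=TRACK_TERMS):
    """Naive check of which tracked terms appear in the tweet text."""
    lowered = text.lower()
    return [t for t in terms if t.lower() in lowered]


# Trimmed-down copy of the message above (schema omitted for brevity).
sample = json.dumps({
    "schema": {},
    "payload": {
        "id": 1047866309655101400,
        "user": {"screen_name": "bangaloreinform"},
        "text": "WANTED….? For Bangalore Project|Easy Cinema Studios| – Bangalore Video https://t.co/J5xXw9Eq8I",
        "entities": {
            "urls": [
                {"expanded_url": "https://bangaloreinformer.com/49412/wanted-for-bangalore-projecteasy-cinema-studios-bangalore-video/"}
            ]
        },
    },
})

summary = summarize_tweet(sample)
print(summary["screen_name"], matched_terms(summary["text"]))
# prints: bangaloreinform ['Bangalore']
```

If the downstream consumer does not need the schema, setting value.converter.schemas.enable=false on the connector makes each message contain only the payload object, which shortens the messages considerably.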



Location: Bengaluru, Karnataka, India