Twitter Kafka connector: How to fetch tweets from Twitter using the Kafka Connect Twitter connector and publish them to another Kafka topic

In the previous post we set up a Kafka Connect cluster in Docker (Landoop fast-data-dev) and created a FileStreamConnector, in both standalone and distributed mode, to transfer file content to a Kafka topic. In this post we will use the existing Docker Kafka Connect setup to fetch tweets from Twitter using the TwitterSourceConnector and publish them to another topic.

Prerequisites

1. Set up Kafka Connect. I set it up using the Landoop fast-data-dev Docker image.
2. Create a Twitter developer account and set up an app. Twitter provides four keys, listed under the "Keys and Access Tokens" tab:
Application Settings
    Consumer Key (API Key)
    Consumer Secret (API Secret)
Your Access Token
    Access Token
    Access Token Secret

I have created an app "ZythamTwitterStreamTest" and have all four keys ready. Start the Kafka cluster using docker-compose:
➜  Kafka-connect docker-compose up kafka-cluster 
Starting kafkaconnect_kafka-cluster_1 ... done
Attaching to kafkaconnect_kafka-cluster_1

Create the Kafka topic to which the Twitter connector will publish tweets:
➜  Kafka-connect kafka-topics --create --topic kafka-connect-distributed-twitter --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181
Created topic "kafka-connect-distributed-twitter".

Create the Twitter source connector in distributed mode to stream tweets into that Kafka topic, using the configuration below:

name=source-twitter-distributed
connector.class=com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
topic=kafka-connect-distributed-twitter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
twitter.consumerkey=<your-consumer-key>
twitter.consumersecret=<your-consumer-secret>
twitter.token=<your-access-token>
twitter.secret=<your-access-token-secret>
track.terms=Bangalore,Kafka,Cyclone
language=en
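In distributed mode, a Connect worker accepts new connectors through its REST API as JSON rather than as a properties file. The sketch below converts the properties above into the {"name": ..., "config": {...}} body that POST /connectors expects and submits it with Python's standard library. It assumes the worker's REST endpoint is reachable at http://127.0.0.1:8083 (the usual Kafka Connect default port, which must be exposed from the Docker container); the CONNECTOR_PROPERTIES string and the helper names are illustrative, and the credentials are placeholders.

```python
import json
import urllib.request

# The connector configuration from this post, with placeholder credentials
# (replace them with the four keys from your own Twitter app).
CONNECTOR_PROPERTIES = """
name=source-twitter-distributed
connector.class=com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
topic=kafka-connect-distributed-twitter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
twitter.consumerkey=<your-consumer-key>
twitter.consumersecret=<your-consumer-secret>
twitter.token=<your-access-token>
twitter.secret=<your-access-token-secret>
track.terms=Bangalore,Kafka,Cyclone
language=en
"""


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got: {raw!r}")
        props[key.strip()] = value.strip()
    return props


def build_payload(props: dict) -> dict:
    """Wrap the settings in the {"name": ..., "config": {...}} shape used by POST /connectors."""
    return {"name": props["name"], "config": dict(props)}


def create_connector(payload: dict, connect_url: str = "http://127.0.0.1:8083") -> int:
    """POST the connector definition to the Connect worker and return the HTTP status."""
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    print(create_connector(build_payload(parse_properties(CONNECTOR_PROPERTIES))))
```

A 201 status means the connector was created. If a connector with that name already exists, the worker answers with an error status (409 Conflict), which urlopen raises as HTTPError; an existing connector's settings can be replaced with PUT /connectors/<name>/config instead.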


Start a Kafka console consumer listening on the topic "kafka-connect-distributed-twitter":
root@fast-data-dev / $ kafka-console-consumer --topic kafka-connect-distributed-twitter --bootstrap-server 127.0.0.1:9092
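As an alternative to kafka-console-consumer, a small program can unwrap each record. With schemas.enable=true, JsonConverter writes every message as an envelope of the form {"schema": ..., "payload": ...}, so the tweet itself sits under "payload". The sketch below assumes the third-party kafka-python package (imported only inside consume(), so the parsing helper works without it) and the same broker address as the console command; summarize_tweet and consume are illustrative names.

```python
import json


def summarize_tweet(message: str) -> dict:
    """Unwrap a JsonConverter envelope ({"schema": ..., "payload": ...}) and pick a few tweet fields."""
    payload = json.loads(message)["payload"]
    user = payload.get("user") or {}
    entities = payload.get("entities") or {}
    return {
        "id": payload.get("id"),
        "screen_name": user.get("screen_name"),
        "text": payload.get("text"),
        "is_retweet": payload.get("is_retweet", False),
        "urls": [u.get("expanded_url") for u in entities.get("urls") or []],
    }


def consume(topic: str = "kafka-connect-distributed-twitter",
            servers: str = "127.0.0.1:9092") -> None:
    """Print a summary of every tweet on the topic (runs until interrupted)."""
    # Assumption: the third-party kafka-python package is installed (pip install kafka-python).
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        auto_offset_reset="earliest",  # also read tweets already stored in the topic
        value_deserializer=lambda raw: raw.decode("utf-8"),
    )
    for record in consumer:
        print(summarize_tweet(record.value))


if __name__ == "__main__":
    consume()
```

Python's json module parses the 64-bit tweet id as an exact integer; parsers that store every number as a double, such as JavaScript's JSON.parse, can round ids this large.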

The consumer receives the following JSON message; it was delivered because the tweet contains the track term "Bangalore":

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "created_at"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int64",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "screen_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "location"
          },
          {
            "type": "boolean",
            "optional": false,
            "field": "verified"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "friends_count"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "followers_count"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "statuses_count"
          }
        ],
        "optional": false,
        "name": "com.eneco.trading.kafka.connect.twitter.User",
        "field": "user"
      },
      {
        "type": "string",
        "optional": true,
        "field": "text"
      },
      {
        "type": "string",
        "optional": true,
        "field": "lang"
      },
      {
        "type": "boolean",
        "optional": false,
        "field": "is_retweet"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "text"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.Hashtag"
            },
            "optional": true,
            "field": "hashtags"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "display_url"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "expanded_url"
                },
                {
                  "type": "int64",
                  "optional": false,
                  "field": "id"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "type"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "url"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.Medium"
            },
            "optional": true,
            "field": "media"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "display_url"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "expanded_url"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "url"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.Url"
            },
            "optional": true,
            "field": "urls"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "int64",
                  "optional": false,
                  "field": "id"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "name"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "screen_name"
                }
              ],
              "optional": false,
              "name": "com.eneco.trading.kafka.connect.twitter.UserMention"
            },
            "optional": true,
            "field": "user_mentions"
          }
        ],
        "optional": false,
        "name": "com.eneco.trading.kafka.connect.twitter.Entities",
        "field": "entities"
      }
    ],
    "optional": false,
    "name": "com.eneco.trading.kafka.connect.twitter.Tweet"
  },
  "payload": {
    "id": 1047866309655101400,
    "created_at": "2018-10-04T15:09:31.000+0000",
    "user": {
      "id": 166235967,
      "name": "Bangalore Informer",
      "screen_name": "bangaloreinform",
      "location": "Bangalore",
      "verified": false,
      "friends_count": 27,
      "followers_count": 402,
      "statuses_count": 27755
    },
    "text": "WANTED….? For Bangalore Project|Easy Cinema Studios| – Bangalore Video https://t.co/J5xXw9Eq8I",
    "lang": "en",
    "is_retweet": false,
    "entities": {
      "hashtags": [],
      "media": [],
      "urls": [
        {
          "display_url": "bangaloreinformer.com/49412/wanted-f…",
          "expanded_url": "https://bangaloreinformer.com/49412/wanted-for-bangalore-projecteasy-cinema-studios-bangalore-video/",
          "url": "https://t.co/J5xXw9Eq8I"
        }
      ],
      "user_mentions": []
    }
  }
}
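The track.terms value is used as the keyword filter on Twitter's streaming API. Twitter documents its track parameter as a comma-separated list of phrases (commas act as OR), where every space-separated word of a phrase must appear (spaces act as AND), ignoring case. The function below is a simplified, hypothetical local approximation of that rule, handy for checking which term a received tweet matched; Twitter's real matching also looks at URLs, hashtags and mentions and tokenizes punctuation differently.

```python
import re


def parse_track_terms(track: str) -> list:
    """Split a track value into phrases (comma = OR), each a list of terms (space = AND)."""
    return [phrase.lower().split() for phrase in track.split(",") if phrase.strip()]


def matching_phrases(text: str, track: str) -> list:
    """Return the phrases whose terms all appear as whole words in the text (case-insensitive)."""
    words = set(re.findall(r"\w+", text.lower()))
    return [" ".join(phrase) for phrase in parse_track_terms(track)
            if all(term in words for term in phrase)]
```

For example, matching_phrases(tweet_text, "Bangalore,Kafka,Cyclone") on the text of the message above returns only "bangalore".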



Comments

  1. How do you debug a situation where you don't receive any messages? All the logs look right, but no messages are received. I know for sure the terms are popular and there are lots of tweets, but nothing comes.