Introduction
This connector uses the twitter streaming api to listen for status update messages and convert them to a Kafka Connect struct on the fly. The goal is to match as much of the Twitter Status object as possible.
Configuration
TwitterSourceConnector
This Twitter Source connector is used to pull data from Twitter in realtime.
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector
# Set these required values
twitter.oauth.accessTokenSecret=
process.deletes=
filter.keywords=
kafka.status.topic=
kafka.delete.topic=
twitter.oauth.consumerSecret=
twitter.oauth.accessToken=
twitter.oauth.consumerKey=
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
filter.keywords | Twitter keywords to filter for. | list | high | ||
filter.userIds | Twitter user IDs to follow. | list | "" | low | |
kafka.delete.topic | Kafka topic to write delete events to. | string | high | ||
kafka.status.topic | Kafka topic to write the statuses to. | string | high | ||
process.deletes | Should this connector process deletes. | boolean | high | ||
twitter.oauth.accessToken | OAuth access token | password | high | ||
twitter.oauth.accessTokenSecret | OAuth access token secret | password | high | ||
twitter.oauth.consumerKey | OAuth consumer key | password | high | ||
twitter.oauth.consumerSecret | OAuth consumer secret | password | high | ||
twitter.debug | Flag to enable debug logging for the twitter api. | boolean | false | low |
Schemas
com.github.jcustenborder.kafka.connect.twitter.Place
Returns the place attached to this status
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
Name | true | String | ||
StreetAddress | true | String | ||
CountryCode | true | String | ||
Id | true | String | ||
Country | true | String | ||
PlaceType | true | String | ||
URL | true | String | ||
FullName | true | String |
com.github.jcustenborder.kafka.connect.twitter.GeoLocation
Returns The location that this tweet refers to if available.
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
Latitude | false | Float64 | returns the latitude of the geo location | |
Longitude | false | Float64 | returns the longitude of the geo location |
com.github.jcustenborder.kafka.connect.twitter.StatusDeletionNotice
Message that is received when a status is deleted from Twitter.
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
StatusId | false | Int64 | ||
UserId | false | Int64 |
com.github.jcustenborder.kafka.connect.twitter.StatusDeletionNoticeKey
Key for a message that is received when a status is deleted from Twitter.
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
StatusId | false | Int64 |
com.github.jcustenborder.kafka.connect.twitter.StatusKey
Key for a twitter status.
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
Id | true | Int64 |
com.github.jcustenborder.kafka.connect.twitter.Status
Twitter status message.
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
CreatedAt | true | Timestamp | Return the created_at | |
Id | true | Int64 | Returns the id of the status | |
Text | true | String | Returns the text of the status | |
Source | true | String | Returns the source | |
Truncated | true | Boolean | Test if the status is truncated | |
InReplyToStatusId | true | Int64 | Returns the in_reply_tostatus_id | |
InReplyToUserId | true | Int64 | Returns the in_reply_user_id | |
InReplyToScreenName | true | String | Returns the in_reply_to_screen_name | |
GeoLocation | true | com.github.jcustenborder.kafka.connect.twitter.GeoLocation | Returns The location that this tweet refers to if available. | |
Place | true | com.github.jcustenborder.kafka.connect.twitter.Place | Returns the place attached to this status | |
Favorited | true | Boolean | Test if the status is favorited | |
Retweeted | true | Boolean | Test if the status is retweeted | |
FavoriteCount | true | Int32 | Indicates approximately how many times this Tweet has been "favorited" by Twitter users. | |
User | false | com.github.jcustenborder.kafka.connect.twitter.User | Return the user associated with the status. | |
This can be null if the instance is from User.getStatus(). | ||||
Retweet | true | Boolean | ||
Contributors | false | Array of Int64 | Returns an array of contributors, or null if no contributor is associated with this status. | |
RetweetCount | true | Int32 | Returns the number of times this tweet has been retweeted, or -1 when the tweet was created before this feature was enabled. | |
RetweetedByMe | true | Boolean | ||
CurrentUserRetweetId | true | Int64 | Returns the authenticating user's retweet's id of this tweet, or -1L when the tweet was created before this feature was enabled. | |
PossiblySensitive | true | Boolean | ||
Lang | true | String | Returns the lang of the status text if available. | |
WithheldInCountries | false | Array of String | Returns the list of country codes where the tweet is withheld | |
HashtagEntities | true | Array of com.github.jcustenborder.kafka.connect.twitter.HashtagEntity | Returns an array if hashtag mentioned in the tweet. | |
UserMentionEntities | true | Array of com.github.jcustenborder.kafka.connect.twitter.UserMentionEntity | Returns an array of user mentions in the tweet. | |
MediaEntities | true | Array of com.github.jcustenborder.kafka.connect.twitter.MediaEntity | Returns an array of MediaEntities if medias are available in the tweet. | |
SymbolEntities | true | Array of com.github.jcustenborder.kafka.connect.twitter.SymbolEntity | Returns an array of SymbolEntities if medias are available in the tweet. | |
URLEntities | true | Array of com.github.jcustenborder.kafka.connect.twitter.URLEntity | Returns an array if URLEntity mentioned in the tweet. |
com.github.jcustenborder.kafka.connect.twitter.User
Return the user associated with the status. This can be null if the instance is from User.getStatus().
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
Id | true | Int64 | Returns the id of the user | |
Name | true | String | Returns the name of the user | |
ScreenName | true | String | Returns the screen name of the user | |
Location | true | String | Returns the location of the user | |
Description | true | String | Returns the description of the user | |
ContributorsEnabled | true | Boolean | Tests if the user is enabling contributors | |
ProfileImageURL | true | String | Returns the profile image url of the user | |
BiggerProfileImageURL | true | String | ||
MiniProfileImageURL | true | String | ||
OriginalProfileImageURL | true | String | ||
ProfileImageURLHttps | true | String | ||
BiggerProfileImageURLHttps | true | String | ||
MiniProfileImageURLHttps | true | String | ||
OriginalProfileImageURLHttps | true | String | ||
DefaultProfileImage | true | Boolean | Tests if the user has not uploaded their own avatar | |
URL | true | String | Returns the url of the user | |
Protected | true | Boolean | Test if the user status is protected | |
FollowersCount | true | Int32 | Returns the number of followers | |
ProfileBackgroundColor | true | String | ||
ProfileTextColor | true | String | ||
ProfileLinkColor | true | String | ||
ProfileSidebarFillColor | true | String | ||
ProfileSidebarBorderColor | true | String | ||
ProfileUseBackgroundImage | true | Boolean | ||
DefaultProfile | true | Boolean | Tests if the user has not altered the theme or background | |
ShowAllInlineMedia | true | Boolean | ||
FriendsCount | true | Int32 | Returns the number of users the user follows (AKA "followings") | |
CreatedAt | true | Timestamp | ||
FavouritesCount | true | Int32 | ||
UtcOffset | true | Int32 | ||
TimeZone | true | String | ||
ProfileBackgroundImageURL | true | String | ||
ProfileBackgroundImageUrlHttps | true | String | ||
ProfileBannerURL | true | String | ||
ProfileBannerRetinaURL | true | String | ||
ProfileBannerIPadURL | true | String | ||
ProfileBannerIPadRetinaURL | true | String | ||
ProfileBannerMobileURL | true | String | ||
ProfileBannerMobileRetinaURL | true | String | ||
ProfileBackgroundTiled | true | Boolean | ||
Lang | true | String | Returns the preferred language of the user | |
StatusesCount | true | Int32 | ||
GeoEnabled | true | Boolean | ||
Verified | true | Boolean | ||
Translator | true | Boolean | ||
ListedCount | true | Int32 | Returns the number of public lists the user is listed on, or -1 if the count is unavailable. | |
FollowRequestSent | true | Boolean | Returns true if the authenticating user has requested to follow this user, otherwise false. | |
WithheldInCountries | false | Array of String | Returns the list of country codes where the user is withheld |
com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity.Variant
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
Url | true | String | ||
Bitrate | true | Int32 | ||
ContentType | true | String |
com.github.jcustenborder.kafka.connect.twitter.MediaEntity.Size
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
Resize | true | Int32 | ||
Width | true | Int32 | ||
Height | true | Int32 |
com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
VideoAspectRatioWidth | true | Int32 | ||
VideoAspectRatioHeight | true | Int32 | ||
VideoDurationMillis | true | Int64 | ||
VideoVariants | true | Array of com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity.Variant | ||
ExtAltText | true | String | ||
Id | true | Int64 | Returns the id of the media. | |
Type | true | String | Returns the media type photo, video, animated_gif. | |
MediaURL | true | String | Returns the media URL. | |
Sizes | false | Map of <Int32, com.github.jcustenborder.kafka.connect.twitter.MediaEntity.Size> | Returns size variations of the media. | |
MediaURLHttps | true | String | Returns the media secure URL. | |
URL | true | String | Returns the URL mentioned in the tweet. | |
Text | true | String | Returns the URL mentioned in the tweet. | |
ExpandedURL | true | String | Returns the expanded URL if mentioned URL is shorten. | |
Start | true | Int32 | Returns the index of the start character of the URL mentioned in the tweet. | |
End | true | Int32 | Returns the index of the end character of the URL mentioned in the tweet. | |
DisplayURL | true | String | Returns the display URL if mentioned URL is shorten. |
com.github.jcustenborder.kafka.connect.twitter.HashtagEntity
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
Text | true | String | Returns the text of the hashtag without #. | |
Start | true | Int32 | Returns the index of the start character of the hashtag. | |
End | true | Int32 | Returns the index of the end character of the hashtag. |
com.github.jcustenborder.kafka.connect.twitter.MediaEntity
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
Id | true | Int64 | Returns the id of the media. | |
Type | true | String | Returns the media type photo, video, animated_gif. | |
MediaURL | true | String | Returns the media URL. | |
Sizes | false | Map of <Int32, com.github.jcustenborder.kafka.connect.twitter.MediaEntity.Size> | ||
MediaURLHttps | true | String | Returns the media secure URL. | |
VideoAspectRatioWidth | true | Int32 | ||
VideoAspectRatioHeight | true | Int32 | ||
VideoDurationMillis | true | Int64 | ||
VideoVariants | true | Array of com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity.Variant | Returns size variations of the media. | |
ExtAltText | true | String | ||
URL | true | String | Returns the URL mentioned in the tweet. | |
Text | true | String | Returns the URL mentioned in the tweet. | |
ExpandedURL | true | String | Returns the expanded URL if mentioned URL is shorten. | |
Start | true | Int32 | Returns the index of the start character of the URL mentioned in the tweet. | |
End | true | Int32 | Returns the index of the end character of the URL mentioned in the tweet. | |
DisplayURL | true | String | Returns the display URL if mentioned URL is shorten. |
com.github.jcustenborder.kafka.connect.twitter.SymbolEntity
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
Start | true | Int32 | Returns the index of the start character of the symbol. | |
End | true | Int32 | Returns the index of the end character of the symbol. | |
Text | true | String | Returns the text of the entity |
com.github.jcustenborder.kafka.connect.twitter.URLEntity
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
URL | true | String | Returns the URL mentioned in the tweet. | |
Text | true | String | Returns the URL mentioned in the tweet. | |
ExpandedURL | true | String | Returns the expanded URL if mentioned URL is shorten. | |
Start | true | Int32 | Returns the index of the start character of the URL mentioned in the tweet. | |
End | true | Int32 | Returns the index of the end character of the URL mentioned in the tweet. | |
DisplayURL | true | String | Returns the display URL if mentioned URL is shorten. |
com.github.jcustenborder.kafka.connect.twitter.UserMentionEntity
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
Name | true | String | Returns the name mentioned in the status. | |
Id | true | Int64 | Returns the user id mentioned in the status. | |
Text | true | String | Returns the screen name mentioned in the status. | |
ScreenName | true | String | Returns the screen name mentioned in the status. | |
Start | true | Int32 | Returns the index of the start character of the user mention. | |
End | true | Int32 | Returns the index of the end character of the user mention. |
Running in development
mvn clean package
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
$CONFLUENT_HOME/bin/connect-standalone connect/connect-avro-docker.properties config/TwitterSourceConnector.properties