The Twitter Streaming API provides access to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in
TwitterSource class for establishing a connection to this stream. To use this connector, add the following dependency to your project:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-twitter_2.10</artifactId>
  <version>1.0.3</version>
</dependency>
Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution here.
To connect to the Twitter stream, the user has to register their program and acquire the information needed for authentication. The process is described below.
Acquiring the authentication information
First of all, a Twitter account is needed. Sign up for free at twitter.com/signup or sign in at Twitter’s Application Management and register the application by clicking on the “Create New App” button. Fill out a form about your program and accept the Terms and Conditions.
After selecting the application, the API key and API secret (called consumerKey and consumerSecret in TwitterSource, respectively) are located on the “API Keys” tab. The necessary OAuth Access Token data (token and secret in TwitterSource) can be generated and acquired on the “Keys and Access Tokens” tab.
Remember to keep these pieces of information secret and do not push them to public repositories.
Accessing the authentication information
Create a properties file, and pass its path in the constructor of
TwitterSource. The content of the file should be similar to this:
#properties file for my app
secret=***
consumerSecret=***
token=***-***
consumerKey=***
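Before handing the file to TwitterSource, it can help to confirm that all four keys are present. The following is a minimal sketch using only java.util.Properties; the class name TwitterAuthCheck and its methods are hypothetical helpers written for illustration and are not part of the Flink connector.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Flink: checks a Twitter properties file for the expected keys.
final class TwitterAuthCheck {
    // The four keys that appear in the properties file shown above.
    static final String[] REQUIRED_KEYS = {"consumerKey", "consumerSecret", "token", "secret"};

    /** Parses properties text from any Reader (a FileReader in real use). */
    static Properties load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the required keys that are absent or blank, in REQUIRED_KEYS order. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Sample content with consumerKey deliberately left out.
        String sample = "#properties file for my app\n"
                + "secret=***\n"
                + "consumerSecret=***\n"
                + "token=***-***\n";
        System.out.println(missingKeys(load(new StringReader(sample)))); // prints [consumerKey]
    }
}
```

In a real program the StringReader would be replaced by a reader over the properties file path that is later passed to TwitterSource.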
The TwitterSource class has two constructors:
public TwitterSource(String authPath, int numberOfTweets); to emit a finite number of tweets
public TwitterSource(String authPath); for streaming
Both constructors expect a
String authPath argument that specifies the location of the properties file containing the authentication information. In the first case,
numberOfTweets determines how many tweets the source emits.
In contrast to other connectors, the
TwitterSource depends on no additional services. For example, the following code should run gracefully:
Java:
DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));
Scala:
val streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"))
TwitterSource emits strings containing JSON.
To retrieve information from the JSON, you can add a FlatMap or a Map function that parses it. For example, the examples include an abstract class,
JSONParseFlatMap, which extends
FlatMapFunction and provides the following function:
Java:
String getField(String jsonText, String field);
Scala:
getField(jsonText : String, field : String) : String
This function can be used to acquire the value of a given field.
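To illustrate what such a field lookup does, here is a minimal standalone sketch with the same signature as getField. It is not the JSONParseFlatMap implementation: the class name SimpleFieldExtractor is hypothetical, and the regular expression is a deliberate simplification that only finds the first string-valued occurrence of a field anywhere in the text. A real job should use a proper JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for a JSON field lookup; NOT the Flink JSONParseFlatMap code.
final class SimpleFieldExtractor {

    /**
     * Returns the raw (still escaped) value of the first string-valued field
     * with the given name, or null if no such field is found.
     * Simplification: nesting is ignored and non-string values are not matched.
     */
    static String getField(String jsonText, String field) {
        Pattern pattern = Pattern.compile(
                "\"" + Pattern.quote(field) + "\"\\s*:\\s*\"((?:\\\\.|[^\"\\\\])*)\"");
        Matcher matcher = pattern.matcher(jsonText);
        return matcher.find() ? matcher.group(1) : null;
    }

    public static void main(String[] args) {
        String tweet = "{\"id\":1,\"lang\":\"en\",\"text\":\"hello\"}";
        System.out.println(getField(tweet, "lang")); // prints en
        System.out.println(getField(tweet, "user")); // prints null
    }
}
```

Inside a Map or FlatMap function, a call like this would be applied to each string emitted by the source.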
There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is a delete notice, which reports that a tweet has been deleted.
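Code that processes the stream usually needs to tell the two types apart before reading fields. The sketch below assumes that a delete message is a JSON object whose first key is "delete"; that layout is an assumption about the Twitter payload and is not stated in this document. The class name TweetKind is hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical helper; assumes delete messages look like {"delete":{...}}.
final class TweetKind {
    // Matches an object whose first key is "delete", allowing optional whitespace.
    private static final Pattern DELETE_PREFIX = Pattern.compile("^\\s*\\{\\s*\"delete\"\\s*:");

    /** Returns true if the JSON text looks like a delete message under the assumption above. */
    static boolean isDelete(String jsonText) {
        return DELETE_PREFIX.matcher(jsonText).find();
    }

    public static void main(String[] args) {
        System.out.println(isDelete("{\"delete\":{\"status\":{\"id\":1}}}")); // prints true
        System.out.println(isDelete("{\"id\":1,\"text\":\"delete\"}"));       // prints false
    }
}
```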
TwitterStream is an example of how to use
TwitterSource. It implements a language frequency counter program.
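The idea of a language frequency counter can be sketched without Flink by counting the "lang" field over a list of JSON strings. This is not the TwitterStream example itself: the class name LanguageFrequency is hypothetical, the sample tweets are made up, and the regular expression simply takes the first "lang" field found in each string, which is a simplification.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, Flink-free sketch of counting tweets per language.
final class LanguageFrequency {
    // First string-valued "lang" field in the text; nesting is ignored (simplification).
    private static final Pattern LANG = Pattern.compile("\"lang\"\\s*:\\s*\"([^\"]*)\"");

    /** Counts tweets per language; strings without a "lang" field are skipped. */
    static Map<String, Integer> count(List<String> tweets) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys give stable output
        for (String tweet : tweets) {
            Matcher matcher = LANG.matcher(tweet);
            if (matcher.find()) {
                counts.merge(matcher.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "{\"id\":1,\"lang\":\"en\",\"text\":\"hello\"}",
                "{\"id\":2,\"lang\":\"hu\",\"text\":\"szia\"}",
                "{\"id\":3,\"lang\":\"en\",\"text\":\"bye\"}",
                "{\"id\":4}"); // no lang field, so it is skipped
        System.out.println(count(sample)); // prints {en=2, hu=1}
    }
}
```

In a streaming job the same counting would be expressed as a keyed aggregation over the strings emitted by the source rather than a loop over a list.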