The Apache Flink project supports multiple file systems that can be used as backing stores for input and output connectors.
Apache Flink allows users to access many different systems as data sources or sinks.
The system is designed to be easily extensible. Similar to Apache Hadoop, Flink has the concept of so-called InputFormats and OutputFormats.
One implementation of these InputFormats is the HadoopInputFormat. This is a wrapper that allows users to use all existing Hadoop input formats with Flink.
This section shows some examples of connecting Flink to other systems. Read more about Hadoop compatibility in Flink.
Flink has extensive built-in support for Apache Avro. This makes it easy to read Avro files with Flink. Also, Flink's serialization framework is able to handle classes generated from Avro schemas. Be sure to include the Flink Avro dependency in the pom.xml of your project.
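In order to read data from an Avro file, you have to specify an AvroInputFormat for the Avro-generated type (for example, User) and pass it to ExecutionEnvironment.createInput(), which returns a DataSet of that type.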
Note that User is a POJO generated by Avro. Flink also supports string-based key selection on these POJOs; for example, assuming User has a name field, calling groupBy("name") on a DataSet<User> groups the users by name.
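The idea behind string-based key selection can be illustrated in plain Java: the key is named by a string and resolved to a field reflectively. This is only a sketch of the concept, not how Flink implements field expressions. The User class below is a hand-written, hypothetical stand-in for an Avro-generated POJO, and its fields (name, favoriteNumber) are assumptions.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an Avro-generated POJO (field names assumed).
class User {
    public String name;
    public int favoriteNumber;

    User(String name, int favoriteNumber) {
        this.name = name;
        this.favoriteNumber = favoriteNumber;
    }
}

class KeySelection {
    // Groups records by the value of the public field named fieldName,
    // resolved via reflection. Key insertion order is preserved.
    static <T> Map<Object, List<T>> groupByField(List<T> records, String fieldName, Class<T> type) {
        Field field;
        try {
            field = type.getField(fieldName); // public fields only
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No public field: " + fieldName, e);
        }
        Map<Object, List<T>> groups = new LinkedHashMap<>();
        for (T record : records) {
            Object key;
            try {
                key = field.get(record); // primitives come back boxed
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
            new User("alice", 7), new User("bob", 3), new User("alice", 42));
        Map<Object, List<User>> byName = groupByField(users, "name", User.class);
        byName.forEach((k, v) -> System.out.println(k + " -> " + v.size()));
    }
}
```

Running main prints one line per distinct name with its group size (alice -> 2, then bob -> 1).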
Note that using the GenericData.Record type is possible with Flink, but not recommended. Since each record contains the full schema, it is very data-intensive and thus probably slow to use.
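To get a rough feel for why carrying the schema with every record is costly, here is a deliberately simplified model. It does not use Avro or Flink's serializers; it just writes the same values once with a (hypothetical) schema JSON string in front and once without, using java.io.DataOutputStream, and compares byte counts.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class SchemaOverhead {
    // Hypothetical Avro-style schema in JSON form (illustration only).
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"favorite_number\",\"type\":\"int\"}]}";

    // Serializes one record's values, optionally prefixed by the schema text,
    // and returns the number of bytes written.
    static int serializedSize(String name, int favoriteNumber, boolean includeSchema) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (includeSchema) {
                out.writeUTF(SCHEMA_JSON); // 2-byte length prefix + schema text
            }
            out.writeUTF(name);            // 2-byte length prefix + name
            out.writeInt(favoriteNumber);  // 4 bytes
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for in-memory streams
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        long records = 1_000_000L;
        long withSchema = records * serializedSize("alice", 7, true);
        long valuesOnly = records * serializedSize("alice", 7, false);
        System.out.println("with schema per record: " + withSchema + " bytes");
        System.out.println("values only:            " + valuesOnly + " bytes");
    }
}
```

In this model the values of one record take 11 bytes, while the schema text alone is several times larger, so shipping it per record multiplies the data volume.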
Flink's POJO field selection also works with POJOs generated from Avro. However, this is only possible if the field types are written correctly to the generated class. If a field is of type Object, you cannot use it as a join or grouping key.
Specifying a field in Avro like this, {"name": "type_double_test", "type": "double"}, works fine; however, specifying it as a UNION type with only one member type ({"name": "type_double_test", "type": ["double"]}) will generate a field of type Object. Note that specifying nullable types ({"name": "type_double_test", "type": ["null", "double"]}) is possible!
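The three declarations above can be contrasted with a small reflection check. The classes below are hand-written, hypothetical stand-ins that mirror the field types the text describes (they are not produced by the Avro compiler here; the nullable form is assumed to map to the boxed java.lang.Double). usableAsKey is a hypothetical helper encoding only the rule stated above: a field declared as Object cannot serve as a key.

```java
import java.lang.reflect.Field;

// "type": "double"  -> primitive double field
class PlainDouble {
    public double type_double_test;
}

// "type": ["double"]  -> single-member union, generated as Object per the text
class SingleBranchUnion {
    public Object type_double_test;
}

// "type": ["null", "double"]  -> assumed to be the boxed java.lang.Double
class NullableDouble {
    public Double type_double_test;
}

class KeyFieldCheck {
    // Hypothetical rule from the text: a field whose declared type is Object
    // carries no usable type information, so it cannot be a join/grouping key.
    static boolean usableAsKey(Class<?> pojo, String fieldName) {
        try {
            Field f = pojo.getField(fieldName);
            return f.getType() != Object.class;
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No public field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        Class<?>[] pojos = {PlainDouble.class, SingleBranchUnion.class, NullableDouble.class};
        for (Class<?> c : pojos) {
            System.out.println(c.getSimpleName() + ": usable as key = "
                + usableAsKey(c, "type_double_test"));
        }
    }
}
```

Only the single-member union fails the check; both the plain and the nullable declarations keep a concrete field type.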
Note: This example works starting from Flink 0.6-incubating.
This example uses the HadoopInputFormat wrapper with an existing Hadoop input format implementation to access Azure's Table Storage.
First, download and build the azure-tables-hadoop project. The input format developed by the project is not yet available in Maven Central; therefore, we have to build the project ourselves.
Build the project with Maven and install it into your local Maven repository. Then add the following two dependencies to the <dependencies> section of your Flink project's pom.xml file: flink-hadoop-compatibility is a Flink package that provides the Hadoop input format wrappers, and microsoft-hadoop-azure adds the project we built before to our project.
The project is now ready for coding. We recommend importing the project into an IDE such as Eclipse or IntelliJ (import it as a Maven project).
Open the Job.java file. It is an empty skeleton for a Flink job.
Paste the following code into it:
The example shows how to access an Azure table and turn its data into a Flink DataSet (more specifically, a DataSet<Tuple2<Text, WritableEntity>>). You can then apply all the usual DataSet transformations to it.
This GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating).