Flink programs can run distributed on clusters of many machines. There are two ways to send a program to a cluster for execution:
The command line interface lets you submit packaged programs (JARs) to a cluster (or single machine setup).
Please refer to the Command Line Interface documentation for details.
The remote environment lets you execute Flink Java programs on a cluster directly. The remote environment points to the cluster on which you want to execute the program.
If you are developing your program as a Maven project, you have to add the flink-clients module using this dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.0.3</version>
</dependency>
The following illustrates the use of the RemoteEnvironment:
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment
        .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");

    DataSet<String> data = env.readTextFile("hdfs://path/to/file");

    data
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("hdfs://path/to/result");

    env.execute();
}
Note that the program contains custom user code and hence requires a JAR file with the classes of the code attached. The constructor of the remote environment takes the path(s) to the JAR file(s).
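For illustration, a minimal sketch of a remote environment that attaches several JAR files and sets a default parallelism could look like the following; the jar paths and the parallelism value are placeholders, not part of the original example:

import org.apache.flink.api.java.ExecutionEnvironment;

public static void main(String[] args) throws Exception {
    // Attach every jar that contains user code classes; the paths below are placeholders.
    ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
            "flink-master", 6123,
            "/home/user/udfs.jar", "/home/user/more-udfs.jar");

    // Default parallelism for all operators of this program (value chosen arbitrarily).
    env.setParallelism(4);

    // ... define the program as usual, then call env.execute() ...
}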
The binary distribution contains jar packages in the lib folder that are automatically provided to the classpath of your distributed programs. Almost all Flink classes are located there, with a few exceptions such as the streaming connectors and some freshly added modules. To run code depending on these modules you need to make them accessible at runtime, for which we suggest two options:

1. Either copy the required jar files into the lib folder on all of your TaskManagers. Note that you have to restart your TaskManagers after this.
2. Or package the required dependencies with your program code.

The latter version is recommended as it respects the classloader management in Flink.
To provide these dependencies not included by Flink we suggest two options with Maven: either build a single fat jar that bundles all of your dependencies (for example with the maven assembly plugin), or unpack the relevant parts of the dependencies and package them together with your own classes (with the maven dependency plugin).
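As a rough sketch of the former option, the assembly plugin can be configured with the predefined jar-with-dependencies descriptor; the execution id below is arbitrary and the snippet is an assumption to adapt to your build, not part of the original text:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <!-- predefined descriptor that bundles all dependencies into one jar -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <!-- the execution id is arbitrary -->
            <id>make-fat-jar</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

The assembly configuration is straightforward, but the resulting jar can become bulky because it contains every transitive dependency.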
Using the latter approach in order to bundle the Kafka connector, flink-connector-kafka, you would need to add the classes from both the connector and the Kafka API itself. Add the following to your plugins section:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>2.9</version>
    <executions>
        <execution>
            <id>unpack</id>
            <!-- executed just before the package phase -->
            <phase>prepare-package</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <artifactItems>
                    <!-- For Flink connector classes -->
                    <artifactItem>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka</artifactId>
                        <version>1.0.3</version>
                        <type>jar</type>
                        <overWrite>false</overWrite>
                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                        <includes>org/apache/flink/**</includes>
                    </artifactItem>
                    <!-- For Kafka API classes -->
                    <artifactItem>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka_<YOUR_SCALA_VERSION></artifactId>
                        <version><YOUR_KAFKA_VERSION></version>
                        <type>jar</type>
                        <overWrite>false</overWrite>
                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                        <includes>kafka/**</includes>
                    </artifactItem>
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>
Now, when running mvn clean package, the produced jar includes the required dependencies.
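A jar packaged this way can then be handed to the remote environment like any other user-code jar, so the bundled connector and Kafka classes are shipped with the job; the artifact path below is a placeholder, not a value from the original text:

// Point the remote environment at the jar produced by mvn clean package
// (the jar path is a placeholder for your project's build output).
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
        "flink-master", 6123,
        "target/my-program-1.0.jar");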