Flink comes with an integrated interactive Scala Shell. It can be used in a local setup as well as in a cluster setup. To get started with downloading Flink and setting up a cluster, please refer to the local setup or cluster setup instructions.
To use the shell with an integrated Flink cluster, just execute:
bin/start-scala-shell.sh local
from the root directory of your Flink binary distribution.
To use it with a running cluster, start the Scala shell with the keyword remote
and supply the host and port of the JobManager:
bin/start-scala-shell.sh remote <hostname> <portnumber>
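For example, assuming a JobManager running on a (hypothetical) host named flink-master and listening on Flink's default RPC port 6123:
bin/start-scala-shell.sh remote flink-master 6123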
The shell prebinds the ExecutionEnvironment as "env". So far, only batch mode is supported.
The following example will execute the wordcount program in the Scala shell:
Scala-Flink> val text = env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()
The print() command automatically sends the specified tasks to the JobManager for execution and shows the result of the computation in the terminal.
It is also possible to write results to a file. However, in this case you need to call execute to run your program:
Scala-Flink> env.execute("MyProgram")
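As a minimal sketch, assuming the counts DataSet from the word count example above and a hypothetical output path:
Scala-Flink> counts.writeAsText("/tmp/wordcount-out")
Scala-Flink> env.execute("MyProgram")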
The Flink Shell comes with command history and autocompletion.
It is possible to add external classpaths to the Scala shell. These will be sent to the JobManager automatically alongside your shell program when calling execute.
Use the parameter -a <path/to/jar.jar> or --addclasspath <path/to/jar.jar> to load additional classes.
bin/start-scala-shell.sh [local | remote <host> <port>] --addclasspath <path/to/jar.jar>
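For example, to connect to a remote cluster and ship an additional jar (host, port, and jar path are placeholders):
bin/start-scala-shell.sh remote flink-master 6123 --addclasspath /path/to/my-udfs.jar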