public class StreamExecutionEnvironment extends Object

Constructor and Description |
---|
StreamExecutionEnvironment(StreamExecutionEnvironment javaEnv) |

Modifier and Type | Method and Description |
---|---|
void |
addDefaultKryoSerializer(Class<?> type,
Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializerClass)
Adds a new Kryo default serializer to the Runtime.
|
<T extends com.esotericsoftware.kryo.Serializer<?>> void |
addDefaultKryoSerializer(Class<?> type,
T serializer)
Adds a new Kryo default serializer to the Runtime.
|
<T> DataStream<T> |
addSource(scala.Function1<SourceFunction.SourceContext<T>,scala.runtime.BoxedUnit> function,
TypeInformation<T> evidence$10)
Create a DataStream using a user defined source function for arbitrary
source functionality.
|
<T> DataStream<T> |
addSource(SourceFunction<T> function,
TypeInformation<T> evidence$9)
Create a DataStream using a user defined source function for arbitrary
source functionality.
|
<T> DataStream<T> |
createInput(InputFormat<T,?> inputFormat,
TypeInformation<T> evidence$8)
Generic method to create an input data stream with a specific input format.
|
static StreamExecutionEnvironment |
createLocalEnvironment(int parallelism)
Creates a local execution environment.
|
static StreamExecutionEnvironment |
createLocalEnvironmentWithWebUI(Configuration config)
Creates a
StreamExecutionEnvironment for local program execution that also starts the
web monitoring UI. |
static StreamExecutionEnvironment |
createRemoteEnvironment(String host,
int port,
int parallelism,
scala.collection.Seq<String> jarFiles)
Creates a remote execution environment.
|
static StreamExecutionEnvironment |
createRemoteEnvironment(String host,
int port,
scala.collection.Seq<String> jarFiles)
Creates a remote execution environment.
|
StreamExecutionEnvironment |
disableOperatorChaining()
Disables operator chaining for streaming operators.
|
StreamExecutionEnvironment |
enableCheckpointing()
Deprecated.
|
StreamExecutionEnvironment |
enableCheckpointing(long interval)
Enables checkpointing for the streaming job.
|
StreamExecutionEnvironment |
enableCheckpointing(long interval,
CheckpointingMode mode)
Enables checkpointing for the streaming job.
|
StreamExecutionEnvironment |
enableCheckpointing(long interval,
CheckpointingMode mode,
boolean force)
Deprecated.
|
JobExecutionResult |
execute()
Triggers the program execution.
|
JobExecutionResult |
execute(String jobName)
Triggers the program execution.
|
<T> DataStream<T> |
fromCollection(scala.collection.Iterator<T> data,
TypeInformation<T> evidence$3)
Creates a DataStream from the given
Iterator . |
<T> DataStream<T> |
fromCollection(scala.collection.Seq<T> data,
TypeInformation<T> evidence$2)
Creates a DataStream from the given non-empty
Seq . |
<T> DataStream<T> |
fromElements(scala.collection.Seq<T> data,
TypeInformation<T> evidence$1)
Creates a DataStream that contains the given elements.
|
<T> DataStream<T> |
fromParallelCollection(SplittableIterator<T> data,
TypeInformation<T> evidence$4)
Creates a DataStream from the given
SplittableIterator . |
DataStream<Object> |
generateSequence(long from,
long to)
Creates a new DataStream that contains a sequence of numbers.
|
long |
getBufferTimeout()
Gets the default buffer timeout set for this environment.
|
List<Tuple2<String,DistributedCache.DistributedCacheEntry>> |
getCachedFiles()
Gets cache files.
|
CheckpointConfig |
getCheckpointConfig()
Gets the checkpoint config, which defines values like checkpoint interval, delay between
checkpoints, etc.
|
CheckpointingMode |
getCheckpointingMode() |
ExecutionConfig |
getConfig()
Gets the config object.
|
static int |
getDefaultLocalParallelism()
Gets the default parallelism that will be used for the local execution environment created by
createLocalEnvironment() . |
static StreamExecutionEnvironment |
getExecutionEnvironment()
Creates an execution environment that represents the context in which the program is
currently executed.
|
String |
getExecutionPlan()
Creates the plan with which the system will execute the program, and
returns it as a String using a JSON representation of the execution data
flow graph.
|
StreamExecutionEnvironment |
getJavaEnv() |
int |
getMaxParallelism()
Returns the maximum degree of parallelism defined for the program.
|
int |
getNumberOfExecutionRetries()
Deprecated.
This method will be replaced by
getRestartStrategy . The
FixedDelayRestartStrategyConfiguration contains the number of execution retries. |
int |
getParallelism()
Returns the default parallelism for this execution environment.
|
RestartStrategies.RestartStrategyConfiguration |
getRestartStrategy()
Returns the specified restart strategy configuration.
|
AbstractStateBackend |
getStateBackend()
Returns the state backend that defines how to store and checkpoint state.
|
StreamGraph |
getStreamGraph()
Getter of the
StreamGraph of the streaming job. |
TimeCharacteristic |
getStreamTimeCharacteristic()
Gets the time characteristic.
|
StreamExecutionEnvironment |
getWrappedStreamExecutionEnvironment()
Getter of the wrapped
StreamExecutionEnvironment |
<T> DataStream<T> |
readFile(FileInputFormat<T> inputFormat,
String filePath,
FileProcessingMode watchType,
long interval,
FilePathFilter filter,
TypeInformation<T> evidence$6)
Deprecated.
Use
FileInputFormat#setFilesFilter(FilePathFilter) to set a filter and
StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long) instead. |
<T> DataStream<T> |
readFile(FileInputFormat<T> inputFormat,
String filePath,
FileProcessingMode watchType,
long interval,
TypeInformation<T> evidence$7)
Reads the contents of the user-specified path based on the given
FileInputFormat . |
<T> DataStream<T> |
readFile(FileInputFormat<T> inputFormat,
String filePath,
TypeInformation<T> evidence$5)
Reads the given file with the given input format.
|
DataStream<String> |
readFileStream(String StreamPath,
long intervalMillis,
FileMonitoringFunction.WatchType watchType)
Creates a DataStream that contains the contents of files created while the
system watches the given path.
|
DataStream<String> |
readTextFile(String filePath)
Creates a DataStream that represents the Strings produced by reading the
given file line wise.
|
DataStream<String> |
readTextFile(String filePath,
String charsetName)
Creates a data stream that represents the Strings produced by reading the given file
line wise.
|
void |
registerCachedFile(String filePath,
String name)
Registers a file at the distributed cache under the given name.
|
void |
registerCachedFile(String filePath,
String name,
boolean executable)
Registers a file at the distributed cache under the given name.
|
void |
registerType(Class<?> typeClass)
Registers the given type with the serialization stack.
|
void |
registerTypeWithKryoSerializer(Class<?> clazz,
Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializer)
Registers the given type with the serializer at the
KryoSerializer . |
<T extends com.esotericsoftware.kryo.Serializer<?>> void |
registerTypeWithKryoSerializer(Class<?> clazz,
T serializer)
Registers the given type with the serializer at the
KryoSerializer . |
<F> F |
scalaClean(F f)
Returns a "closure-cleaned" version of the given function.
|
StreamExecutionEnvironment |
setBufferTimeout(long timeoutMillis)
Sets the maximum time frequency (milliseconds) for the flushing of the
output buffers.
|
static void |
setDefaultLocalParallelism(int parallelism)
Sets the default parallelism that will be used for the local execution
environment created by
createLocalEnvironment() . |
void |
setMaxParallelism(int maxParallelism)
Sets the maximum degree of parallelism defined for the program.
|
void |
setNumberOfExecutionRetries(int numRetries)
Deprecated.
This method will be replaced by
setRestartStrategy() . The
FixedDelayRestartStrategyConfiguration contains the number of execution retries. |
void |
setParallelism(int parallelism)
Sets the parallelism for operations executed through this environment.
|
void |
setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)
Sets the restart strategy configuration.
|
StreamExecutionEnvironment |
setStateBackend(AbstractStateBackend backend)
Sets the state backend that describes how to store and checkpoint operator state.
|
void |
setStreamTimeCharacteristic(TimeCharacteristic characteristic)
Sets the time characteristic for all streams created from this environment, e.g., processing
time, event time, or ingestion time.
|
DataStream<String> |
socketTextStream(String hostname,
int port,
char delimiter,
long maxRetry)
Creates a new DataStream that contains the strings received infinitely
from socket.
|
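The methods summarized above combine into a small streaming job. The sketch below is illustrative, not part of the documented API surface: it assumes the Flink 1.x Scala API (`flink-streaming-scala`) on the classpath, and the word-splitting logic is invented for the example.

```scala
// Minimal job sketch: obtain an environment, build a non-parallel source
// from a Seq, transform it, and trigger execution. Assumes Flink 1.x.
import org.apache.flink.streaming.api.scala._

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    // Environment matching the current context (local JVM or cluster).
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // fromCollection(Seq) creates a data source with parallelism one.
    val text: DataStream[String] = env.fromCollection(Seq("to be", "or not to be"))

    val counts = text
      .flatMap(_.toLowerCase.split("\\W+"))  // split lines into words
      .map((_, 1))                           // pair each word with a count
      .keyBy(0)                              // key by the word
      .sum(1)                                // running sum per word

    counts.print()

    // Nothing runs until execute() is called.
    env.execute("wordcount-sketch")
  }
}
```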
public StreamExecutionEnvironment(StreamExecutionEnvironment javaEnv)

public static void setDefaultLocalParallelism(int parallelism)
Sets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().
parallelism - The default parallelism to use for local execution.

public static int getDefaultLocalParallelism()
Gets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().

public static StreamExecutionEnvironment getExecutionEnvironment()
Creates an execution environment that represents the context in which the program is currently executed. If the program is invoked standalone, this method returns a local execution environment. If the program is invoked from within the command line client to be submitted to a cluster, this method returns the execution environment of this cluster.

public static StreamExecutionEnvironment createLocalEnvironment(int parallelism)
Creates a local execution environment. This method sets the environment's default parallelism to the given parameter, which defaults to the value set via setDefaultLocalParallelism(Int).
parallelism - (undocumented)

public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration config)
Creates a StreamExecutionEnvironment for local program execution that also starts the web monitoring UI.
The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. It will use the parallelism specified in the parameter.
If the configuration key 'jobmanager.web.port' was set in the configuration, that particular port will be used for the web UI. Otherwise, the default port (8081) will be used.
config - optional config for the local execution

public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, scala.collection.Seq<String> jarFiles)
Creates a remote execution environment. The execution will use the cluster's default parallelism, unless the parallelism is set explicitly via StreamExecutionEnvironment.setParallelism().
host - The host name or address of the master (JobManager), where the program should be executed.
port - The port of the master (JobManager), where the program should be executed.
jarFiles - The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.

public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, scala.collection.Seq<String> jarFiles)
Creates a remote execution environment.
host - The host name or address of the master (JobManager), where the program should be executed.
port - The port of the master (JobManager), where the program should be executed.
parallelism - The parallelism to use during the execution.
jarFiles - The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.

public StreamExecutionEnvironment getJavaEnv()
public ExecutionConfig getConfig()
Gets the config object.

public List<Tuple2<String,DistributedCache.DistributedCacheEntry>> getCachedFiles()
Gets cache files.

public void setParallelism(int parallelism)
Sets the parallelism for operations executed through this environment. This value can be overridden by specific operations using DataStream.setParallelism(int).
parallelism - (undocumented)

public void setMaxParallelism(int maxParallelism)
Sets the maximum degree of parallelism defined for the program.
maxParallelism - (undocumented)

public int getParallelism()
Returns the default parallelism for this execution environment. Note that this value can be overridden by individual operations using DataStream.setParallelism(int).

public int getMaxParallelism()
Returns the maximum degree of parallelism defined for the program. The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.

public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis)
Sets the maximum time frequency (milliseconds) for the flushing of the output buffers.
timeoutMillis - (undocumented)

public long getBufferTimeout()
Gets the default buffer timeout set for this environment.

public StreamExecutionEnvironment disableOperatorChaining()
Disables operator chaining for streaming operators.

public CheckpointConfig getCheckpointConfig()
Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc.
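The parallelism and buffer-timeout setters above can be sketched as follows; this is a hedged example assuming the Flink 1.x Scala API on the classpath, with the concrete values chosen purely for illustration.

```scala
// Sketch: configuring environment-wide parallelism and buffer flushing.
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(4)        // default parallelism for all operators
env.setMaxParallelism(128)   // upper limit for rescaling; also the key-group count
env.setBufferTimeout(100L)   // flush output buffers at least every 100 ms

// Individual operators may still override the environment default, e.g.:
// stream.map(...).setParallelism(1)
```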
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
Deprecated.
Enables checkpointing for the streaming job. The job draws checkpoints periodically, in the given interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. If the "force" parameter is set to true, the system will execute the job nonetheless.
interval - Time interval between state checkpoints in millis.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
force - If true checkpointing will be enabled for iterative jobs as well.

public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
Enables checkpointing for the streaming job. The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs "at least once"). The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing. To override this mechanism, use the enableCheckpointing(long, CheckpointingMode, boolean) method.
interval - Time interval between state checkpoints in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.

public StreamExecutionEnvironment enableCheckpointing(long interval)
Enables checkpointing for the streaming job. The job draws checkpoints periodically, in the given interval. The program will use CheckpointingMode.EXACTLY_ONCE mode. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing. To override this mechanism, use the enableCheckpointing(long, CheckpointingMode, boolean) method.
interval - Time interval between state checkpoints in milliseconds.

public StreamExecutionEnvironment enableCheckpointing()
Deprecated.
Enables checkpointing for the streaming job. Setting this option assumes that the job is used in production; unless stated otherwise by calling the setRestartStrategy method, the job will be resubmitted to the cluster indefinitely in case of failure.
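The enableCheckpointing overloads above can be sketched like this; a hedged example assuming Flink 1.x on the classpath, with the 10-second interval chosen only for illustration.

```scala
// Sketch: periodic checkpoints with an explicit checkpointing mode.
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Draw a checkpoint every 10 seconds with exactly-once guarantees.
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)

// Finer-grained settings live on the CheckpointConfig:
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L)
```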
public CheckpointingMode getCheckpointingMode()
public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend)
Sets the state backend that describes how to store and checkpoint operator state. It defines how the key/value state of a KeyedStream is maintained (heap, managed memory, externally), and where state snapshots/checkpoints are stored, both for the key/value state and for checkpointed functions (implementing the interface Checkpointed).
The MemoryStateBackend for example maintains the state in heap memory, as objects. It is lightweight without extra dependencies, but can checkpoint only small states (some counters).
In contrast, the FsStateBackend stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc.) this will guarantee that state is not lost upon failures of individual nodes and that the entire streaming program can be executed highly available and strongly consistent (assuming that Flink is run in high-availability mode).
backend - (undocumented)

public AbstractStateBackend getStateBackend()
Returns the state backend that defines how to store and checkpoint state.
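Choosing a state backend and a restart strategy together might look like the sketch below. FsStateBackend and RestartStrategies are real Flink 1.x classes, but the checkpoint path and the retry numbers are placeholders, not recommendations.

```scala
// Sketch: file-system state backend plus a fixed-delay restart strategy.
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Store checkpoints in a (replicated) file system so state survives node failures.
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"))

// Retry the job up to 3 times, waiting 10 s between attempts.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
```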
public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)
Sets the restart strategy configuration. The configuration specifies which restart strategy will be used for the execution graph in case of a restart.
restartStrategyConfiguration - Restart strategy configuration to be set

public RestartStrategies.RestartStrategyConfiguration getRestartStrategy()
Returns the specified restart strategy configuration.

public void setNumberOfExecutionRetries(int numRetries)
Deprecated.
This method will be replaced by setRestartStrategy(). The FixedDelayRestartStrategyConfiguration contains the number of execution retries.
numRetries - (undocumented)

public int getNumberOfExecutionRetries()
Deprecated.
This method will be replaced by getRestartStrategy. The FixedDelayRestartStrategyConfiguration contains the number of execution retries.

public <T extends com.esotericsoftware.kryo.Serializer<?>> void addDefaultKryoSerializer(Class<?> type, T serializer)
Adds a new Kryo default serializer to the Runtime.
type - The class of the types serialized with the given serializer.
serializer - The serializer to use.

public void addDefaultKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializerClass)
Adds a new Kryo default serializer to the Runtime.
type - The class of the types serialized with the given serializer.
serializerClass - The class of the serializer to use.

public <T extends com.esotericsoftware.kryo.Serializer<?>> void registerTypeWithKryoSerializer(Class<?> clazz, T serializer)
Registers the given type with the serializer at the KryoSerializer.
Note that the serializer instance must be serializable (as defined by java.io.Serializable), because it may be distributed to the worker nodes by java serialization.
clazz - (undocumented)
serializer - (undocumented)

public void registerTypeWithKryoSerializer(Class<?> clazz, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializer)
Registers the given type with the serializer at the KryoSerializer.
clazz - (undocumented)
serializer - (undocumented)

public void registerType(Class<?> typeClass)
Registers the given type with the serialization stack.
typeClass - (undocumented)

public void setStreamTimeCharacteristic(TimeCharacteristic characteristic)
Sets the time characteristic for all streams created from this environment, e.g., processing time, event time, or ingestion time.
If you set the characteristic to IngestionTime or EventTime this will set a default watermark update interval of 200 ms. If this is not applicable for your application you should change it using ExecutionConfig.setAutoWatermarkInterval(long).
characteristic - The time characteristic.

public TimeCharacteristic getStreamTimeCharacteristic()
Gets the time characteristic.
See also: setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)

public DataStream<Object> generateSequence(long from, long to)
Creates a new DataStream that contains a sequence of numbers. This is a parallel source; if you manually set the parallelism to 1 the emitted elements are in order.
from - (undocumented)
to - (undocumented)

public <T> DataStream<T> fromElements(scala.collection.Seq<T> data, TypeInformation<T> evidence$1)
Creates a DataStream that contains the given elements.
Note that this operation will result in a non-parallel data source, i.e. a data source with a parallelism of one.
data - (undocumented)
evidence$1 - (undocumented)

public <T> DataStream<T> fromCollection(scala.collection.Seq<T> data, TypeInformation<T> evidence$2)
Creates a DataStream from the given non-empty Seq. The elements need to be serializable because the framework may move the elements into the cluster if needed.
Note that this operation will result in a non-parallel data source, i.e. a data source with a parallelism of one.
data - (undocumented)
evidence$2 - (undocumented)

public <T> DataStream<T> fromCollection(scala.collection.Iterator<T> data, TypeInformation<T> evidence$3)
Creates a DataStream from the given Iterator.
Note that this operation will result in a non-parallel data source, i.e. a data source with a parallelism of one.
data - (undocumented)
evidence$3 - (undocumented)

public <T> DataStream<T> fromParallelCollection(SplittableIterator<T> data, TypeInformation<T> evidence$4)
Creates a DataStream from the given SplittableIterator.
data - (undocumented)
evidence$4 - (undocumented)

public DataStream<String> readTextFile(String filePath)
Creates a DataStream that represents the Strings produced by reading the given file line wise.
filePath - (undocumented)

public DataStream<String> readTextFile(String filePath, String charsetName)
Creates a data stream that represents the Strings produced by reading the given file line wise.
filePath - (undocumented)
charsetName - (undocumented)

public <T> DataStream<T> readFile(FileInputFormat<T> inputFormat, String filePath, TypeInformation<T> evidence$5)
Reads the given file with the given input format.
inputFormat - (undocumented)
filePath - (undocumented)
evidence$5 - (undocumented)

public DataStream<String> readFileStream(String StreamPath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
Creates a DataStream that contains the contents of files created while the system watches the given path.
StreamPath - (undocumented)
intervalMillis - (undocumented)
watchType - (undocumented)

public <T> DataStream<T> readFile(FileInputFormat<T> inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter, TypeInformation<T> evidence$6)
Deprecated.
Use FileInputFormat#setFilesFilter(FilePathFilter) to set a filter and StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long) instead.
Reads the contents of the user-specified path based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source may periodically monitor the path for new data, or process the data currently in the path once and exit.
inputFormat - The input format used to create the data stream
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
watchType - The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
filter - The files to be excluded from the processing
evidence$6 - (undocumented)

public <T> DataStream<T> readFile(FileInputFormat<T> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<T> evidence$7)
Reads the contents of the user-specified path based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). In addition, if the path contains files not to be processed, the user can specify a custom FilePathFilter. As a default implementation you can use FilePathFilter.createDefaultFilter().
** NOTES ON CHECKPOINTING: ** If the watchType is set to FileProcessingMode#PROCESS_ONCE, the source monitors the path ** once **, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
inputFormat - The input format used to create the data stream
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
watchType - The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
evidence$7 - (undocumented)

public DataStream<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
Creates a new DataStream that contains the strings received infinitely from the socket.
hostname - (undocumented)
port - (undocumented)
delimiter - (undocumented)
maxRetry - (undocumented)

public <T> DataStream<T> createInput(InputFormat<T,?> inputFormat, TypeInformation<T> evidence$8)
Generic method to create an input data stream with a specific input format.
inputFormat - (undocumented)
evidence$8 - (undocumented)

public <T> DataStream<T> addSource(SourceFunction<T> function, TypeInformation<T> evidence$9)
Create a DataStream using a user defined source function for arbitrary source functionality.
function - (undocumented)
evidence$9 - (undocumented)

public <T> DataStream<T> addSource(scala.Function1<SourceFunction.SourceContext<T>,scala.runtime.BoxedUnit> function, TypeInformation<T> evidence$10)
Create a DataStream using a user defined source function for arbitrary source functionality.
function - (undocumented)
evidence$10 - (undocumented)

public JobExecutionResult execute()
Triggers the program execution. The program execution will be logged and displayed with a generated default name.

public JobExecutionResult execute(String jobName)
Triggers the program execution. The program execution will be logged and displayed with the provided name.
jobName - (undocumented)

public String getExecutionPlan()
Creates the plan with which the system will execute the program, and returns it as a String using a JSON representation of the execution data flow graph.

public StreamGraph getStreamGraph()
Getter of the StreamGraph of the streaming job.

public StreamExecutionEnvironment getWrappedStreamExecutionEnvironment()
Getter of the wrapped StreamExecutionEnvironment.
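The function-based addSource overload above takes a SourceContext and can be sketched as follows; a hedged example assuming Flink 1.x on the classpath, with the counting logic invented for illustration.

```scala
// Sketch: a custom source built from a plain function over SourceContext[Long].
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Emit the numbers 0..99; the function receives a SourceContext[Long]
// and pushes elements downstream via collect().
val nums = env.addSource[Long] { ctx =>
  var i = 0L
  while (i < 100) {
    ctx.collect(i)
    i += 1
  }
}

nums.print()
env.execute("custom-source-sketch")
```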
public <F> F scalaClean(F f)
Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning is not disabled in the ExecutionConfig.
f - (undocumented)

public void registerCachedFile(String filePath, String name)
Registers a file at the distributed cache under the given name.
The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().
filePath - The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
name - The name under which the file is registered.

public void registerCachedFile(String filePath, String name, boolean executable)
Registers a file at the distributed cache under the given name.
The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().
filePath - The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
name - The name under which the file is registered.
executable - flag indicating whether the file should be executable

Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.