public class StreamExecutionEnvironment extends Object

| Constructor and Description |
|---|
| `StreamExecutionEnvironment(StreamExecutionEnvironment javaEnv)` |

| Modifier and Type | Method and Description |
|---|---|
| `void` | `addDefaultKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializerClass)` Adds a new Kryo default serializer to the Runtime. |
| `<T extends com.esotericsoftware.kryo.Serializer<?>> void` | `addDefaultKryoSerializer(Class<?> type, T serializer)` Adds a new Kryo default serializer to the Runtime. |
| `<T> DataStream<T>` | `addSource(scala.Function1<SourceFunction.SourceContext<T>,scala.runtime.BoxedUnit> function, TypeInformation<T> evidence$9)` Create a DataStream using a user-defined source function for arbitrary source functionality. |
| `<T> DataStream<T>` | `addSource(SourceFunction<T> function, TypeInformation<T> evidence$8)` Create a DataStream using a user-defined source function for arbitrary source functionality. |
| `<T> DataStream<T>` | `createInput(InputFormat<T,?> inputFormat, TypeInformation<T> evidence$7)` Generic method to create an input data stream with a specific input format. |
| `static StreamExecutionEnvironment` | `createLocalEnvironment(int parallelism)` Creates a local execution environment. |
| `static StreamExecutionEnvironment` | `createRemoteEnvironment(String host, int port, int parallelism, scala.collection.Seq<String> jarFiles)` Creates a remote execution environment. |
| `static StreamExecutionEnvironment` | `createRemoteEnvironment(String host, int port, scala.collection.Seq<String> jarFiles)` Creates a remote execution environment. |
| `StreamExecutionEnvironment` | `disableOperatorChaining()` Disables operator chaining for streaming operators. |
| `StreamExecutionEnvironment` | `enableCheckpointing()` Method for enabling fault-tolerance. |
| `StreamExecutionEnvironment` | `enableCheckpointing(long interval)` Enables checkpointing for the streaming job. |
| `StreamExecutionEnvironment` | `enableCheckpointing(long interval, CheckpointingMode mode)` Enables checkpointing for the streaming job. |
| `StreamExecutionEnvironment` | `enableCheckpointing(long interval, CheckpointingMode mode, boolean force)` Enables checkpointing for the streaming job. |
| `JobExecutionResult` | `execute()` Triggers the program execution. |
| `JobExecutionResult` | `execute(String jobName)` Triggers the program execution. |
| `<T> DataStream<T>` | `fromCollection(scala.collection.Iterator<T> data, TypeInformation<T> evidence$3)` Creates a DataStream from the given Iterator. |
| `<T> DataStream<T>` | `fromCollection(scala.collection.Seq<T> data, TypeInformation<T> evidence$2)` Creates a DataStream from the given non-empty Seq. |
| `<T> DataStream<T>` | `fromElements(scala.collection.Seq<T> data, TypeInformation<T> evidence$1)` Creates a DataStream that contains the given elements. |
| `<T> DataStream<T>` | `fromParallelCollection(SplittableIterator<T> data, TypeInformation<T> evidence$4)` Creates a DataStream from the given SplittableIterator. |
| `DataStream<Object>` | `generateSequence(long from, long to)` Creates a new DataStream that contains a sequence of numbers. |
| `long` | `getBufferTimeout()` Gets the default buffer timeout set for this environment. |
| `CheckpointConfig` | `getCheckpointConfig()` Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc. |
| `CheckpointingMode` | `getCheckpointingMode()` |
| `ExecutionConfig` | `getConfig()` Gets the config object. |
| `static StreamExecutionEnvironment` | `getExecutionEnvironment()` Creates an execution environment that represents the context in which the program is currently executed. |
| `String` | `getExecutionPlan()` Creates the plan with which the system will execute the program, and returns it as a String using a JSON representation of the execution data flow graph. |
| `StreamExecutionEnvironment` | `getJavaEnv()` |
| `int` | `getNumberOfExecutionRetries()` Deprecated. This method will be replaced by `getRestartStrategy`. The FixedDelayRestartStrategyConfiguration contains the number of execution retries. |
| `int` | `getParallelism()` Returns the default parallelism for this execution environment. |
| `RestartStrategies.RestartStrategyConfiguration` | `getRestartStrategy()` Returns the specified restart strategy configuration. |
| `AbstractStateBackend` | `getStateBackend()` Returns the state backend that defines how to store and checkpoint state. |
| `StreamGraph` | `getStreamGraph()` Getter of the StreamGraph of the streaming job. |
| `TimeCharacteristic` | `getStreamTimeCharacteristic()` Gets the time characteristic. |
| `StreamExecutionEnvironment` | `getWrappedStreamExecutionEnvironment()` Getter of the wrapped StreamExecutionEnvironment. |
| `<T> DataStream<T>` | `readFile(FileInputFormat<T> inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter, TypeInformation<T> evidence$6)` Reads the contents of the user-specified path based on the given FileInputFormat. |
| `<T> DataStream<T>` | `readFile(FileInputFormat<T> inputFormat, String filePath, TypeInformation<T> evidence$5)` Reads the given file with the given input format. |
| `DataStream<String>` | `readFileStream(String StreamPath, long intervalMillis, FileMonitoringFunction.WatchType watchType)` Creates a DataStream that contains the contents of files created while the system watches the given path. |
| `DataStream<String>` | `readTextFile(String filePath)` Creates a DataStream that represents the Strings produced by reading the given file line-wise. |
| `DataStream<String>` | `readTextFile(String filePath, String charsetName)` Creates a data stream that represents the Strings produced by reading the given file line-wise. |
| `void` | `registerType(Class<?> typeClass)` Registers the given type with the serialization stack. |
| `void` | `registerTypeWithKryoSerializer(Class<?> clazz, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializer)` Registers the given type with the serializer at the KryoSerializer. |
| `<T extends com.esotericsoftware.kryo.Serializer<?>> void` | `registerTypeWithKryoSerializer(Class<?> clazz, T serializer)` Registers the given type with the serializer at the KryoSerializer. |
| `<F> F` | `scalaClean(F f)` Returns a "closure-cleaned" version of the given function. |
| `StreamExecutionEnvironment` | `setBufferTimeout(long timeoutMillis)` Sets the maximum time frequency (milliseconds) for the flushing of the output buffers. |
| `static void` | `setDefaultLocalParallelism(int parallelism)` Sets the default parallelism that will be used for the local execution environment created by `createLocalEnvironment()`. |
| `void` | `setNumberOfExecutionRetries(int numRetries)` Deprecated. This method will be replaced by `setRestartStrategy()`. The FixedDelayRestartStrategyConfiguration contains the number of execution retries. |
| `void` | `setParallelism(int parallelism)` Sets the parallelism for operations executed through this environment. |
| `void` | `setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)` Sets the restart strategy configuration. |
| `StreamExecutionEnvironment` | `setStateBackend(AbstractStateBackend backend)` Sets the state backend that describes how to store and checkpoint operator state. |
| `void` | `setStreamTimeCharacteristic(TimeCharacteristic characteristic)` Sets the time characteristic for all streams created from this environment, e.g., processing time, event time, or ingestion time. |
| `DataStream<String>` | `socketTextStream(String hostname, int port, char delimiter, long maxRetry)` Creates a new DataStream that contains the strings received infinitely from a socket. |
public StreamExecutionEnvironment(StreamExecutionEnvironment javaEnv)
public static void setDefaultLocalParallelism(int parallelism)
Sets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().
Parameters:
parallelism - The parallelism to use as the default local parallelism.

public static StreamExecutionEnvironment getExecutionEnvironment()
Creates an execution environment that represents the context in which the program is currently executed.
public static StreamExecutionEnvironment createLocalEnvironment(int parallelism)
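As an illustrative sketch (not part of the original page; the standard Scala API import is assumed), an environment is typically obtained or created like this:

```scala
import org.apache.flink.streaming.api.scala._

// Picks the right context automatically: local when run from the IDE,
// cluster when submitted as a packaged job.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Alternatively, force a local environment with an explicit parallelism of 4.
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(4)
```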
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, scala.collection.Seq<String> jarFiles)
Creates a remote execution environment. The execution will use the cluster's default parallelism, unless the parallelism is set explicitly via StreamExecutionEnvironment.setParallelism().
Parameters:
host - The host name or address of the master (JobManager), where the program should be executed.
port - The port of the master (JobManager), where the program should be executed.
jarFiles - The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.

public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, scala.collection.Seq<String> jarFiles)
Creates a remote execution environment.
Parameters:
host - The host name or address of the master (JobManager), where the program should be executed.
port - The port of the master (JobManager), where the program should be executed.
parallelism - The parallelism to use during the execution.
jarFiles - The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.

public StreamExecutionEnvironment getJavaEnv()
public ExecutionConfig getConfig()
public void setParallelism(int parallelism)
Sets the parallelism for operations executed through this environment. This value can be overridden by specific operations using DataStream.setParallelism(int).

public int getParallelism()
Returns the default parallelism for this execution environment. Note that individual operations can override this value via DataStream.setParallelism(int).
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis)
public long getBufferTimeout()
public StreamExecutionEnvironment disableOperatorChaining()
public CheckpointConfig getCheckpointConfig()
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
The job draws checkpoints periodically, in the given interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. If the "force" parameter is set to true, the system will execute the job nonetheless.
Parameters:
interval - Time interval between state checkpoints, in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
force - If true, checkpointing will be enabled for iterative jobs as well.

public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs "at least once"). The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing. To override this mechanism, use the enableCheckpointing(long, CheckpointingMode, boolean) method.
Parameters:
interval - Time interval between state checkpoints, in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.

public StreamExecutionEnvironment enableCheckpointing(long interval)
The job draws checkpoints periodically, in the given interval. The program will use CheckpointingMode.EXACTLY_ONCE mode. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing. To override this mechanism, use the enableCheckpointing(long, CheckpointingMode, boolean) method.
Parameters:
interval - Time interval between state checkpoints, in milliseconds.

public StreamExecutionEnvironment enableCheckpointing()
Method for enabling fault-tolerance. Setting this option assumes that the job is used in production; unless a different restart strategy is configured explicitly via the setRestartStrategy method, the job will be resubmitted to the cluster indefinitely in case of failure.
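The checkpointing variants above can be sketched as follows (a hedged example, not from the original page; the interval value is illustrative and a Flink runtime is assumed):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Draw a checkpoint every 10 seconds with exactly-once guarantees.
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
```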
public CheckpointingMode getCheckpointingMode()
public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend)
It defines in what form the key/value state of a KeyedStream is maintained (heap, managed memory, externally), and where state snapshots/checkpoints are stored, both for the key/value state and for checkpointed functions (implementing the interface Checkpointed).
The MemoryStateBackend, for example, maintains the state in heap memory, as objects. It is lightweight without extra dependencies, but can checkpoint only small states (some counters).
In contrast, the FsStateBackend stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc.) this guarantees that state is not lost upon failures of individual nodes and that the entire streaming program can be executed highly available and strongly consistent (assuming that Flink is run in high-availability mode).
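For illustration, configuring the FsStateBackend described above might look like the sketch below; the HDFS URI is a placeholder, and the package location of FsStateBackend is assumed from this Flink release:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpoints go to a replicated file system, so state survives node failures.
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"))
```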
public AbstractStateBackend getStateBackend()
public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)
Parameters:
restartStrategyConfiguration - Restart strategy configuration to be set.

public RestartStrategies.RestartStrategyConfiguration getRestartStrategy()
Returns the specified restart strategy configuration.

public void setNumberOfExecutionRetries(int numRetries)
Deprecated. This method will be replaced by setRestartStrategy(). The FixedDelayRestartStrategyConfiguration contains the number of execution retries.

public int getNumberOfExecutionRetries()
Deprecated. This method will be replaced by getRestartStrategy. The FixedDelayRestartStrategyConfiguration contains the number of execution retries.

public <T extends com.esotericsoftware.kryo.Serializer<?>> void addDefaultKryoSerializer(Class<?> type, T serializer)
Adds a new Kryo default serializer to the Runtime.
Parameters:
type - The class of the types serialized with the given serializer.
serializer - The serializer to use.

public void addDefaultKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializerClass)
Adds a new Kryo default serializer to the Runtime.
Parameters:
type - The class of the types serialized with the given serializer.
serializerClass - The class of the serializer to use.

public <T extends com.esotericsoftware.kryo.Serializer<?>> void registerTypeWithKryoSerializer(Class<?> clazz, T serializer)
Registers the given type with the serializer at the KryoSerializer.
Note that the serializer instance must be serializable (as defined by java.io.Serializable), because it may be distributed to the worker nodes by java serialization.

public void registerTypeWithKryoSerializer(Class<?> clazz, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializer)
Registers the given type with the serializer at the KryoSerializer.

public void registerType(Class<?> typeClass)
Registers the given type with the serialization stack.
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic)
If you set the characteristic to IngestionTime or EventTime, this will set a default watermark update interval of 200 ms. If this is not applicable for your application, you should change it using ExecutionConfig.setAutoWatermarkInterval(long).
Parameters:
characteristic - The time characteristic.

public TimeCharacteristic getStreamTimeCharacteristic()
Gets the time characteristic.
See Also:
setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
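A short sketch of setting the time characteristic (illustrative, not from the original page; the usual Scala API imports are assumed):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Event time enables watermarks; the default auto-watermark interval is 200 ms
// and can be changed via env.getConfig.setAutoWatermarkInterval(...).
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```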
public DataStream<Object> generateSequence(long from, long to)
Creates a new DataStream that contains a sequence of numbers. If the parallelism is 1, the emitted elements are in order.

public <T> DataStream<T> fromElements(scala.collection.Seq<T> data, TypeInformation<T> evidence$1)
Creates a DataStream that contains the given elements.
Note that this operation will result in a non-parallel data source, i.e. a data source with a parallelism of one.
public <T> DataStream<T> fromCollection(scala.collection.Seq<T> data, TypeInformation<T> evidence$2)
Creates a DataStream from the given non-empty Seq. The elements need to be serializable because the framework may move the elements into the cluster if needed.
Note that this operation will result in a non-parallel data source, i.e. a data source with a parallelism of one.
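The collection-based sources (fromElements, fromCollection) can be sketched as below; this is an illustrative example assuming the standard Scala API import, and both sources run non-parallel as noted above:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Both of these sources run with a parallelism of one.
val letters: DataStream[String] = env.fromElements("a", "b", "c")
val numbers: DataStream[Int]    = env.fromCollection(Seq(1, 2, 3))
```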
public <T> DataStream<T> fromCollection(scala.collection.Iterator<T> data, TypeInformation<T> evidence$3)
Creates a DataStream from the given Iterator.
Note that this operation will result in a non-parallel data source, i.e. a data source with a parallelism of one.
public <T> DataStream<T> fromParallelCollection(SplittableIterator<T> data, TypeInformation<T> evidence$4)
Creates a DataStream from the given SplittableIterator.

public DataStream<String> readTextFile(String filePath)
public DataStream<String> readTextFile(String filePath, String charsetName)
public <T> DataStream<T> readFile(FileInputFormat<T> inputFormat, String filePath, TypeInformation<T> evidence$5)
public DataStream<String> readFileStream(String StreamPath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
public <T> DataStream<T> readFile(FileInputFormat<T> inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter, TypeInformation<T> evidence$6)
Reads the contents of the user-specified path based on the given FileInputFormat.
Depending on the provided FileProcessingMode, the source may periodically monitor (every `interval` ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). In addition, if the path contains files not to be processed, the user can specify a custom FilePathFilter. As a default implementation you can use FilePathFilter.createDefaultFilter().
**NOTES ON CHECKPOINTING:** If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source monitors the path **once**, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
Parameters:
inputFormat - The input format used to create the data stream.
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
watchType - The mode in which the source should operate, i.e. monitor the path and react to new data, or process once and exit.
interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans.
filter - The files to be excluded from the processing.

public DataStream<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
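A hedged sketch of socketTextStream usage (host and port are placeholders; a socket server must be listening for the job to receive data):

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Read newline-delimited text from a socket; the last argument
// controls connection retry behavior as described by the API.
val lines: DataStream[String] = env.socketTextStream("localhost", 9999, '\n', 60)
```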
public <T> DataStream<T> createInput(InputFormat<T,?> inputFormat, TypeInformation<T> evidence$7)
public <T> DataStream<T> addSource(SourceFunction<T> function, TypeInformation<T> evidence$8)
public <T> DataStream<T> addSource(scala.Function1<SourceFunction.SourceContext<T>,scala.runtime.BoxedUnit> function, TypeInformation<T> evidence$9)
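The function-based overload above takes the source body as a plain `SourceContext[T] => Unit` function; the type-information evidence is supplied implicitly by the Scala API import. A minimal illustrative sketch (names and bounds are arbitrary):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Emit the numbers 0..99 from a user-defined source body.
val ticks: DataStream[Long] = env.addSource {
  (ctx: SourceFunction.SourceContext[Long]) =>
    var i = 0L
    while (i < 100) { ctx.collect(i); i += 1 }
}
```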
public JobExecutionResult execute()
The program execution will be logged and displayed with a generated default name.
public JobExecutionResult execute(String jobName)
The program execution will be logged and displayed with the provided name.
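Execution is lazy: transformations and sinks only run once execute is called. An illustrative sketch (job name is arbitrary):

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(1, 2, 3).map(_ * 2).print()
// Nothing runs until execute() is called; it returns a JobExecutionResult.
val result = env.execute("doubling-job")
```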
public String getExecutionPlan()
public StreamGraph getStreamGraph()
Getter of the StreamGraph of the streaming job.

public StreamExecutionEnvironment getWrappedStreamExecutionEnvironment()
Getter of the wrapped StreamExecutionEnvironment.
public <F> F scalaClean(F f)
Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning is not disabled in the ExecutionConfig.
Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.