@Public public abstract class StreamExecutionEnvironment extends Object
The StreamExecutionEnvironment is the context in which a streaming program is executed. A LocalStreamEnvironment will cause execution in the current JVM; a RemoteStreamEnvironment will cause execution on a remote setup.
The environment provides methods to control the job execution (such as setting the parallelism or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
See Also:
LocalStreamEnvironment, RemoteStreamEnvironment
Modifier and Type | Field and Description |
---|---|
protected List<Tuple2<String,DistributedCache.DistributedCacheEntry>> | cacheFile |
static String | DEFAULT_JOB_NAME The default name to use for a streaming job if no other name has been specified. |
protected boolean | isChainingEnabled |
protected List<StreamTransformation<?>> | transformations |
Constructor and Description |
---|
StreamExecutionEnvironment() |
Modifier and Type | Method and Description |
---|---|
void | addDefaultKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializerClass) Adds a new Kryo default serializer to the Runtime. |
<T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> void | addDefaultKryoSerializer(Class<?> type, T serializer) Adds a new Kryo default serializer to the Runtime. |
void | addOperator(StreamTransformation<?> transformation) Adds an operator to the list of operators that should be executed when calling execute(). |
<OUT> DataStreamSource<OUT> | addSource(SourceFunction<OUT> function) Adds a data source to the streaming topology. |
<OUT> DataStreamSource<OUT> | addSource(SourceFunction<OUT> function, String sourceName) Adds a data source with a custom name, thus opening a DataStream. |
<OUT> DataStreamSource<OUT> | addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) Adds a data source with a custom name and custom type information, thus opening a DataStream. |
<OUT> DataStreamSource<OUT> | addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo) Adds a data source with custom type information, thus opening a DataStream. |
<F> F | clean(F f) Returns a "closure-cleaned" version of the given function. |
<OUT> DataStreamSource<OUT> | createInput(InputFormat<OUT,?> inputFormat) Generic method to create an input data stream with InputFormat. |
<OUT> DataStreamSource<OUT> | createInput(InputFormat<OUT,?> inputFormat, TypeInformation<OUT> typeInfo) Generic method to create an input data stream with InputFormat. |
static LocalStreamEnvironment | createLocalEnvironment() Creates a LocalStreamEnvironment. |
static LocalStreamEnvironment | createLocalEnvironment(int parallelism) Creates a LocalStreamEnvironment. |
static LocalStreamEnvironment | createLocalEnvironment(int parallelism, Configuration configuration) Creates a LocalStreamEnvironment. |
static StreamExecutionEnvironment | createLocalEnvironmentWithWebUI(Configuration conf) Creates a LocalStreamEnvironment for local program execution that also starts the web monitoring UI. |
static StreamExecutionEnvironment | createRemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles) Creates a RemoteStreamEnvironment. |
static StreamExecutionEnvironment | createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles) Creates a RemoteStreamEnvironment. |
static StreamExecutionEnvironment | createRemoteEnvironment(String host, int port, String... jarFiles) Creates a RemoteStreamEnvironment. |
StreamExecutionEnvironment | disableOperatorChaining() Disables operator chaining for streaming operators. |
StreamExecutionEnvironment | enableCheckpointing() Deprecated. Use enableCheckpointing(long) instead. |
StreamExecutionEnvironment | enableCheckpointing(long interval) Enables checkpointing for the streaming job. |
StreamExecutionEnvironment | enableCheckpointing(long interval, CheckpointingMode mode) Enables checkpointing for the streaming job. |
StreamExecutionEnvironment | enableCheckpointing(long interval, CheckpointingMode mode, boolean force) Deprecated. Use enableCheckpointing(long, CheckpointingMode) instead. Forcing checkpoints will be removed in the future. |
JobExecutionResult | execute() Triggers the program execution. |
abstract JobExecutionResult | execute(String jobName) Triggers the program execution. |
<OUT> DataStreamSource<OUT> | fromCollection(Collection<OUT> data) Creates a data stream from the given non-empty collection. |
<OUT> DataStreamSource<OUT> | fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) Creates a data stream from the given non-empty collection. |
<OUT> DataStreamSource<OUT> | fromCollection(Iterator<OUT> data, Class<OUT> type) Creates a data stream from the given iterator. |
<OUT> DataStreamSource<OUT> | fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo) Creates a data stream from the given iterator. |
<OUT> DataStreamSource<OUT> | fromElements(Class<OUT> type, OUT... data) Creates a new data stream that contains the given elements. |
<OUT> DataStreamSource<OUT> | fromElements(OUT... data) Creates a new data stream that contains the given elements. |
<OUT> DataStreamSource<OUT> | fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type) Creates a new data stream that contains elements in the iterator. |
<OUT> DataStreamSource<OUT> | fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo) Creates a new data stream that contains elements in the iterator. |
DataStreamSource<Long> | generateSequence(long from, long to) Creates a new data stream that contains a sequence of numbers. |
long | getBufferTimeout() Gets the maximum time (in milliseconds) between two flushes of the output buffers. |
List<Tuple2<String,DistributedCache.DistributedCacheEntry>> | getCachedFiles() Gets the list of cached files that were registered for distribution among the task managers. |
CheckpointConfig | getCheckpointConfig() Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc. |
CheckpointingMode | getCheckpointingMode() Returns the checkpointing mode (exactly-once vs. at-least-once). |
long | getCheckpointInterval() Returns the checkpointing interval or -1 if checkpointing is disabled. |
ExecutionConfig | getConfig() Gets the config object. |
static int | getDefaultLocalParallelism() Gets the default parallelism that will be used for the local execution environment created by createLocalEnvironment(). |
static StreamExecutionEnvironment | getExecutionEnvironment() Creates an execution environment that represents the context in which the program is currently executed. |
String | getExecutionPlan() Creates the plan with which the system will execute the program, and returns it as a String using a JSON representation of the execution data flow graph. |
int | getMaxParallelism() Gets the maximum degree of parallelism defined for the program. |
int | getNumberOfExecutionRetries() Deprecated. This method will be replaced by getRestartStrategy(). |
int | getParallelism() Gets the parallelism with which operations are executed by default. |
RestartStrategies.RestartStrategyConfiguration | getRestartStrategy() Returns the specified restart strategy configuration. |
StateBackend | getStateBackend() Gets the state backend that defines how to store and checkpoint state. |
StreamGraph | getStreamGraph() Getter of the StreamGraph of the streaming job. |
TimeCharacteristic | getStreamTimeCharacteristic() Gets the time characteristic. |
protected static void | initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) |
boolean | isChainingEnabled() Returns whether operator chaining is enabled. |
boolean | isForceCheckpointing() Deprecated. Forcing checkpoints will be removed in a future version. |
<OUT> DataStreamSource<OUT> | readFile(FileInputFormat<OUT> inputFormat, String filePath) Reads the contents of the user-specified filePath based on the given FileInputFormat. |
<OUT> DataStreamSource<OUT> | readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval) Reads the contents of the user-specified filePath based on the given FileInputFormat. |
<OUT> DataStreamSource<OUT> | readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter) Deprecated. |
<OUT> DataStreamSource<OUT> | readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<OUT> typeInformation) Reads the contents of the user-specified filePath based on the given FileInputFormat. |
DataStream<String> | readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType) Deprecated. |
DataStreamSource<String> | readTextFile(String filePath) Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such line. |
DataStreamSource<String> | readTextFile(String filePath, String charsetName) Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such line. |
void | registerCachedFile(String filePath, String name) Registers a file at the distributed cache under the given name. |
void | registerCachedFile(String filePath, String name, boolean executable) Registers a file at the distributed cache under the given name. |
void | registerType(Class<?> type) Registers the given type with the serialization stack. |
void | registerTypeWithKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer> serializerClass) Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer. |
<T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> void | registerTypeWithKryoSerializer(Class<?> type, T serializer) Registers the given type with a Kryo Serializer. |
protected static void | resetContextEnvironment() |
StreamExecutionEnvironment | setBufferTimeout(long timeoutMillis) Sets the maximum time (in milliseconds) between two flushes of the output buffers. |
static void | setDefaultLocalParallelism(int parallelism) Sets the default parallelism that will be used for the local execution environment created by createLocalEnvironment(). |
StreamExecutionEnvironment | setMaxParallelism(int maxParallelism) Sets the maximum degree of parallelism defined for the program. |
void | setNumberOfExecutionRetries(int numberOfExecutionRetries) Deprecated. This method will be replaced by setRestartStrategy(org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration). The RestartStrategies.fixedDelayRestart(int, Time) contains the number of execution retries. |
StreamExecutionEnvironment | setParallelism(int parallelism) Sets the parallelism for operations executed through this environment. |
void | setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) Sets the restart strategy configuration. |
StreamExecutionEnvironment | setStateBackend(AbstractStateBackend backend) Deprecated. Use setStateBackend(StateBackend) instead. |
StreamExecutionEnvironment | setStateBackend(StateBackend backend) Sets the state backend that describes how to store and checkpoint operator state. |
void | setStreamTimeCharacteristic(TimeCharacteristic characteristic) Sets the time characteristic for all streams created from this environment, e.g., processing time, event time, or ingestion time. |
DataStreamSource<String> | socketTextStream(String hostname, int port) Creates a new data stream that contains the strings received infinitely from a socket. |
DataStreamSource<String> | socketTextStream(String hostname, int port, char delimiter) Deprecated. Use socketTextStream(String, int, String) instead. |
DataStreamSource<String> | socketTextStream(String hostname, int port, char delimiter, long maxRetry) Deprecated. Use socketTextStream(String, int, String, long) instead. |
DataStreamSource<String> | socketTextStream(String hostname, int port, String delimiter) Creates a new data stream that contains the strings received infinitely from a socket. |
DataStreamSource<String> | socketTextStream(String hostname, int port, String delimiter, long maxRetry) Creates a new data stream that contains the strings received infinitely from a socket. |
public static final String DEFAULT_JOB_NAME
The default name to use for a streaming job if no other name has been specified.
protected final List<StreamTransformation<?>> transformations
protected boolean isChainingEnabled
protected final List<Tuple2<String,DistributedCache.DistributedCacheEntry>> cacheFile
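public ExecutionConfig getConfig()
Gets the config object.

public List<Tuple2<String,DistributedCache.DistributedCacheEntry>> getCachedFiles()
Gets the list of cached files that were registered for distribution among the task managers.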
public StreamExecutionEnvironment setParallelism(int parallelism)
Sets the parallelism for operations executed through this environment. The LocalStreamEnvironment uses by default a value equal to the number of hardware contexts (CPU cores / threads). When executing the program via the command line client from a JAR file, the default degree of parallelism is the one configured for that setup.
Parameters:
parallelism - The parallelism

public StreamExecutionEnvironment setMaxParallelism(int maxParallelism)
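Sets the maximum degree of parallelism defined for the program. The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
Parameters:
maxParallelism - Maximum degree of parallelism to be used for the program, with 0 < maxParallelism <= 2^15 - 1

public int getParallelism()
Gets the parallelism with which operations are executed by default.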
public int getMaxParallelism()
Gets the maximum degree of parallelism defined for the program. The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
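The relationship between maxParallelism, key groups, and parallel instances can be pictured with a small Java sketch. It is a simplified model under stated assumptions, not Flink's implementation. The class and method names are hypothetical. A key is mapped to a key group with Object.hashCode() modulo maxParallelism (Flink's own assignment may additionally scramble the hash). Key groups are then assigned to instances in contiguous ranges via keyGroup * parallelism / maxParallelism.

```java
import java.util.Objects;

/**
 * Simplified model (hypothetical, not Flink code) of how maxParallelism defines key groups
 * and how key groups are spread over the parallel instances of an operator.
 */
final class KeyGroupSketch {
    /** Upper bound documented above: 0 < maxParallelism <= 2^15 - 1. */
    static final int UPPER_BOUND = (1 << 15) - 1;

    private KeyGroupSketch() {}

    static void checkMaxParallelism(int maxParallelism) {
        if (maxParallelism <= 0 || maxParallelism > UPPER_BOUND) {
            throw new IllegalArgumentException(
                    "maxParallelism must satisfy 0 < maxParallelism <= " + UPPER_BOUND + ", got " + maxParallelism);
        }
    }

    /** Key -> key group in [0, maxParallelism). Uses the plain hashCode (assumption). */
    static int keyGroupFor(Object key, int maxParallelism) {
        checkMaxParallelism(maxParallelism);
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    /** Key group -> index of the instance owning it; each instance owns a contiguous range. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        checkMaxParallelism(maxParallelism);
        if (parallelism <= 0 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("parallelism must satisfy 0 < parallelism <= maxParallelism");
        }
        if (keyGroup < 0 || keyGroup >= maxParallelism) {
            throw new IllegalArgumentException("key group out of range: " + keyGroup);
        }
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With maxParallelism = 128 and parallelism = 4, key groups 0-31 belong to instance 0, 32-63 to instance 1, and so on. At parallelism 2, instance 0 owns key groups 0-63. Rescaling moves whole key groups and never splits one. In this model, that is why maxParallelism is the upper limit for dynamic scaling.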
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis)
Sets the maximum time (in milliseconds) between two flushes of the output buffers.
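A toy model of a single output buffer shows what the timeout does. This is a hypothetical sketch, not Flink's network stack. It assumes the following meaning for the timeout value:

- a positive value flushes at most that many milliseconds after the previous flush;
- 0 flushes after every record;
- -1 flushes only when the buffer is full.

Time is passed in explicitly so the behavior is deterministic, and tick stands in for a periodic background flusher.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of an output buffer with a flush timeout (hypothetical, not Flink code). */
final class BufferTimeoutSketch {
    private final int capacity;
    private final long timeoutMillis; // assumed: > 0 periodic, 0 every record, -1 only when full
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushes = new ArrayList<>();
    private long lastFlushMillis;

    BufferTimeoutSketch(int capacity, long timeoutMillis, long startMillis) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        if (timeoutMillis < -1) throw new IllegalArgumentException("timeout must be >= -1");
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
        this.lastFlushMillis = startMillis;
    }

    /** Appends a record and flushes if the buffer is full or the timeout has elapsed. */
    void write(String record, long nowMillis) {
        buffer.add(record);
        if (buffer.size() >= capacity || timedOut(nowMillis)) {
            flush(nowMillis);
        }
    }

    /** Periodic check, like a background flusher: flushes a non-empty buffer once timed out. */
    void tick(long nowMillis) {
        if (!buffer.isEmpty() && timedOut(nowMillis)) {
            flush(nowMillis);
        }
    }

    /** Every flushed batch so far, oldest first. */
    List<List<String>> flushes() {
        return Collections.unmodifiableList(flushes);
    }

    private boolean timedOut(long nowMillis) {
        return timeoutMillis >= 0 && nowMillis - lastFlushMillis >= timeoutMillis;
    }

    private void flush(long nowMillis) {
        flushes.add(new ArrayList<>(buffer));
        buffer.clear();
        lastFlushMillis = nowMillis;
    }
}
```

In this model, smaller timeouts mean lower latency but more and smaller flushes. A value of -1 batches as much as the buffer capacity allows.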
Parameters:
timeoutMillis - The maximum time between two output flushes.

public long getBufferTimeout()
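Gets the maximum time (in milliseconds) between two flushes of the output buffers. See setBufferTimeout(long).

@PublicEvolving public StreamExecutionEnvironment disableOperatorChaining()
Disables operator chaining for streaming operators.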
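@PublicEvolving public boolean isChainingEnabled()
Returns whether operator chaining is enabled.
Returns:
true if chaining is enabled, false otherwise.

public CheckpointConfig getCheckpointConfig()
Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc.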
public StreamExecutionEnvironment enableCheckpointing(long interval)
Enables checkpointing for the streaming job. This method selects CheckpointingMode.EXACTLY_ONCE guarantees.
The job draws checkpoints periodically, in the given interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing. To override this mechanism, use the enableCheckpointing(long, CheckpointingMode, boolean) method.
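A toy simulation can illustrate the periodic behavior described above. It is hypothetical and deliberately simplified: Flink coordinates checkpoints with barriers flowing through the dataflow, which is not modeled here. In the sketch, a running counter stands in for operator state. A checkpoint is drawn whenever simulated time crosses the next multiple of the interval. An in-memory list stands in for the configured state backend.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy simulation of periodic checkpointing (hypothetical, not Flink's checkpoint coordinator). */
final class PeriodicCheckpointSketch {
    private final long intervalMillis;
    private long nextCheckpointAt;
    private long counter;                                 // stands in for operator state
    private final List<Long> backend = new ArrayList<>(); // stands in for the state backend

    PeriodicCheckpointSketch(long intervalMillis, long startMillis) {
        if (intervalMillis <= 0) throw new IllegalArgumentException("interval must be positive");
        this.intervalMillis = intervalMillis;
        this.nextCheckpointAt = startMillis + intervalMillis;
    }

    /** Processes one record arriving at the given simulated time. */
    void process(long nowMillis) {
        // First draw every checkpoint that became due before this record, then apply the record.
        while (nowMillis >= nextCheckpointAt) {
            backend.add(counter);
            nextCheckpointAt += intervalMillis;
        }
        counter++;
    }

    /** Snapshots drawn so far, oldest first. */
    List<Long> checkpoints() {
        return Collections.unmodifiableList(backend);
    }
}
```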
Parameters:
interval - Time interval between state checkpoints in milliseconds.

public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
Enables checkpointing for the streaming job.
The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs. "at least once"). The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing. To override this mechanism, use the enableCheckpointing(long, CheckpointingMode, boolean) method.
Parameters:
interval - Time interval between state checkpoints in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.

@Deprecated @PublicEvolving public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
Deprecated. Use enableCheckpointing(long, CheckpointingMode) instead. Forcing checkpoints will be removed in the future.
Enables checkpointing for the streaming job. The job draws checkpoints periodically, in the given interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. If the "force" parameter is set to true, the system will execute the job nonetheless.
Parameters:
interval - Time interval between state checkpoints in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
force - If true, checkpointing will be enabled for iterative jobs as well.

@Deprecated @PublicEvolving public StreamExecutionEnvironment enableCheckpointing()
Deprecated. Use enableCheckpointing(long) instead.
Enables checkpointing for the streaming job. This method selects CheckpointingMode.EXACTLY_ONCE guarantees.
The job draws checkpoints periodically, in the default interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing. To override this mechanism, use the enableCheckpointing(long, CheckpointingMode, boolean) method.
public long getCheckpointInterval()
Returns the checkpointing interval or -1 if checkpointing is disabled.
Shorthand for getCheckpointConfig().getCheckpointInterval().
@Deprecated @PublicEvolving public boolean isForceCheckpointing()
Deprecated. Forcing checkpoints will be removed in a future version.
public CheckpointingMode getCheckpointingMode()
Returns the checkpointing mode (exactly-once vs. at-least-once).
Shorthand for getCheckpointConfig().getCheckpointingMode().
@PublicEvolving public StreamExecutionEnvironment setStateBackend(StateBackend backend)
Sets the state backend that describes how to store and checkpoint operator state. State managed by the state backend includes both keyed state that is accessible on keyed streams, as well as state maintained directly by the user code that implements CheckpointedFunction.
The MemoryStateBackend, for example, maintains the state in heap memory, as objects. It is lightweight and has no extra dependencies, but can checkpoint only small states (some counters).
In contrast, the FsStateBackend stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc.), this guarantees two things. First, state is not lost upon failures of individual nodes. Second, the streaming program can be executed highly available and strongly consistent (assuming that Flink is run in high-availability mode).
See Also:
getStateBackend()
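Plain JDK classes are enough to sketch the contrast between keeping checkpoints as heap objects and writing them to files. All names here (SnapshotStore, HeapSnapshotStore, FileSnapshotStore) are hypothetical and are not Flink types. In both variants the working state is an ordinary HashMap on the heap. Only the place where snapshots are kept differs.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Where snapshots of the heap-resident working state are kept (hypothetical interface). */
interface SnapshotStore {
    void store(long checkpointId, HashMap<String, Long> state);
    HashMap<String, Long> load(long checkpointId);
}

/** Keeps copies in memory: lightweight, but lost together with the JVM. */
final class HeapSnapshotStore implements SnapshotStore {
    private final Map<Long, HashMap<String, Long>> snapshots = new HashMap<>();

    @Override
    public void store(long checkpointId, HashMap<String, Long> state) {
        snapshots.put(checkpointId, new HashMap<>(state)); // copy, so later updates do not leak in
    }

    @Override
    public HashMap<String, Long> load(long checkpointId) {
        HashMap<String, Long> snapshot = snapshots.get(checkpointId);
        if (snapshot == null) throw new IllegalArgumentException("no checkpoint " + checkpointId);
        return new HashMap<>(snapshot);
    }
}

/** Serializes each snapshot to a file; durable only if the directory is on replicated storage. */
final class FileSnapshotStore implements SnapshotStore {
    private final Path directory;

    FileSnapshotStore(Path directory) {
        this.directory = directory;
    }

    static FileSnapshotStore inTempDirectory() {
        try {
            return new FileSnapshotStore(Files.createTempDirectory("snapshots"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void store(long checkpointId, HashMap<String, Long> state) {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file(checkpointId)))) {
            out.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public HashMap<String, Long> load(long checkpointId) {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file(checkpointId)))) {
            return (HashMap<String, Long>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    private Path file(long checkpointId) {
        return directory.resolve("checkpoint-" + checkpointId);
    }
}
```

Only the file-based variant can outlive the JVM. It survives the loss of a node only if its directory lives on a replicated file system, matching the trade-off described above.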
@Deprecated @PublicEvolving public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend)
Deprecated. Use setStateBackend(StateBackend) instead.

@PublicEvolving public StateBackend getStateBackend()
Gets the state backend that defines how to store and checkpoint state.
See Also:
setStateBackend(StateBackend)
@PublicEvolving public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)
Sets the restart strategy configuration.
Parameters:
restartStrategyConfiguration - Restart strategy configuration to be set

@PublicEvolving public RestartStrategies.RestartStrategyConfiguration getRestartStrategy()
Returns the specified restart strategy configuration.
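The deprecation note on setNumberOfExecutionRetries points to RestartStrategies.fixedDelayRestart(int, Time), which combines a number of restart attempts with a delay between them. The retry loop this implies is sketched below. It is a hypothetical illustration, not Flink's restart logic. A failing task is modeled as a Supplier that throws, and the delay is passed to an injectable sleeper so the example runs instantly.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** Toy fixed-delay restart loop (hypothetical, not Flink's restart strategy). */
final class FixedDelayRestartSketch {
    private FixedDelayRestartSketch() {}

    /**
     * Runs the task. After each failure it waits delayMillis and tries again; once
     * maxRestarts restarts are used up, the last failure is rethrown.
     */
    static <T> T runWithRestarts(Supplier<T> task, int maxRestarts, long delayMillis, LongConsumer sleeper) {
        if (maxRestarts < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("restart settings must be non-negative");
        }
        int restarts = 0;
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException failure) {
                if (restarts >= maxRestarts) {
                    throw failure;
                }
                restarts++;
                sleeper.accept(delayMillis); // a real caller would block here, e.g. via Thread.sleep
            }
        }
    }
}
```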
@Deprecated @PublicEvolving public void setNumberOfExecutionRetries(int numberOfExecutionRetries)
Deprecated. This method will be replaced by setRestartStrategy(org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration). The RestartStrategies.fixedDelayRestart(int, Time) contains the number of execution retries.
Sets the number of times that failed tasks are re-executed. A value of -1 indicates that the system default value (as defined in the configuration) should be used.
Parameters:
numberOfExecutionRetries - The number of times the system will try to re-execute failed tasks.

@Deprecated @PublicEvolving public int getNumberOfExecutionRetries()
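Deprecated. This method will be replaced by getRestartStrategy().
Gets the number of times the system will try to re-execute failed tasks. A value of -1 indicates that the system default value (as defined in the configuration) should be used.

public <T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> void addDefaultKryoSerializer(Class<?> type, T serializer)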
Adds a new Kryo default serializer to the Runtime.
Note that the serializer instance must be serializable (as defined by java.io.Serializable), because it may be distributed to the worker nodes by java serialization.
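The serializer instance may be shipped to the worker nodes with Java serialization. A cheap way to catch a non-serializable field before submitting a job is therefore to round-trip the instance through ObjectOutputStream. A minimal JDK-only sketch follows; the helper name is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper: verifies that an object survives a Java serialization round trip. */
final class SerializabilityCheck {
    private SerializabilityCheck() {}

    /** Returns a deserialized copy, or throws IllegalArgumentException if obj (or a field) is not serializable. */
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj); // a NotSerializableException surfaces here
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException("not serializable: " + obj.getClass().getName(), e);
        }
    }
}
```

Applied to a custom Kryo serializer instance before it is passed to addDefaultKryoSerializer, the check surfaces any NotSerializableException locally, wrapped in an IllegalArgumentException.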
Parameters:
type - The class of the types serialized with the given serializer.
serializer - The serializer to use.

public void addDefaultKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializerClass)
Adds a new Kryo default serializer to the Runtime.
Parameters:
type - The class of the types serialized with the given serializer.
serializerClass - The class of the serializer to use.

public <T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> void registerTypeWithKryoSerializer(Class<?> type, T serializer)
Registers the given type with a Kryo Serializer.
Note that the serializer instance must be serializable (as defined by java.io.Serializable), because it may be distributed to the worker nodes by java serialization.
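Parameters:
type - The class of the types serialized with the given serializer.
serializer - The serializer to use.

public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer> serializerClass)
Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer.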
Parameters:
type - The class of the types serialized with the given serializer.
serializerClass - The class of the serializer to use.

public void registerType(Class<?> type)
Registers the given type with the serialization stack.
Parameters:
type - The class of the type to register.

@PublicEvolving public void setStreamTimeCharacteristic(TimeCharacteristic characteristic)
Sets the time characteristic for all streams created from this environment, e.g., processing time, event time, or ingestion time.
If you set the characteristic to IngestionTime or EventTime, this will set a default watermark update interval of 200 ms. If this is not applicable for your application, you should change it using ExecutionConfig.setAutoWatermarkInterval(long).
Parameters:
characteristic - The time characteristic.

@PublicEvolving public TimeCharacteristic getStreamTimeCharacteristic()
Gets the time characteristic.
See Also:
setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
public DataStreamSource<Long> generateSequence(long from, long to)
Creates a new data stream that contains a sequence of numbers. This is a parallel source. If you manually set the parallelism to 1 (using SingleOutputStreamOperator.setParallelism(int)), the generated sequence of elements is in order.
Parameters:
from - The number to start at (inclusive)
to - The number to stop at (inclusive)

@SafeVarargs public final <OUT> DataStreamSource<OUT> fromElements(OUT... data)
Creates a new data stream that contains the given elements. The elements must all be of the same type, for example, all String or all Integer.
The framework will try to determine the exact type from the elements. In case of generic elements, it may be necessary to manually supply the type information via fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).
Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a parallelism of one.
Type Parameters:
OUT - The type of the returned data stream
Parameters:
data - The array of elements to create the data stream from.

@SafeVarargs public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data)
Creates a new data stream that contains the given elements.
Type Parameters:
OUT - The type of the returned data stream
Parameters:
type - The base class type in the collection.
data - The array of elements to create the data stream from.

public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data)
Creates a data stream from the given non-empty collection.
The framework will try to determine the exact type from the collection elements. In case of generic elements, it may be necessary to manually supply the type information via fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).
Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a parallelism of one.
Type Parameters:
OUT - The generic type of the returned data stream.
Parameters:
data - The collection of elements to create the data stream from.

public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo)
Creates a data stream from the given non-empty collection.
Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with parallelism one.
Type Parameters:
OUT - The type of the returned data stream
Parameters:
data - The collection of elements to create the data stream from
typeInfo - The TypeInformation for the produced data stream

public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type)
Creates a data stream from the given iterator.
Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler erases the generic type information).
Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with a parallelism of one.
Type Parameters:
OUT - The type of the returned data stream
Parameters:
data - The iterator of elements to create the data stream from
type - The class of the data produced by the iterator. Must not be a generic class.
See Also:
fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
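Plain Java can check the erasure argument directly. Differently parameterized lists share a single runtime class, so an Iterator cannot report its element type. An explicit Class token is what makes a runtime type check possible. The helper below is hypothetical and only illustrates why the type parameter is requested.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Demonstrates why an explicit Class token is needed for an Iterator's element type. */
final class ErasureSketch {
    private ErasureSketch() {}

    /** True: List<String> and List<Integer> have the same runtime class after erasure. */
    static boolean parameterizedListsShareClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> integers = new ArrayList<>();
        return strings.getClass() == integers.getClass();
    }

    /** Hypothetical helper: drains the iterator, checking every element against the type token. */
    static <T> List<T> drain(Iterator<?> data, Class<T> type) {
        List<T> out = new ArrayList<>();
        while (data.hasNext()) {
            out.add(type.cast(data.next())); // ClassCastException on the first element of the wrong type
        }
        return out;
    }
}
```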
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo)
Creates a data stream from the given iterator.
Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type information. This method is useful for cases where the type is generic. In that case, the type class (as given in fromCollection(java.util.Iterator, Class)) does not supply all type information.
Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with a parallelism of one.
Type Parameters:
OUT - The type of the returned data stream
Parameters:
data - The iterator of elements to create the data stream from
typeInfo - The TypeInformation for the produced data stream

public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type)
Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the framework to create a parallel data stream source that returns the elements in the iterator.
Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler erases the generic type information).
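A splittable iterator lets each parallel source instance work on its own share of the elements. For a numeric range, one simple scheme is to cut the inclusive range into contiguous sub-ranges, one per instance. The sketch is hypothetical: it is not Flink's SplittableIterator or NumberSequenceIterator. It assumes the range size fits in a long.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical splitting of an inclusive numeric range into contiguous parallel shares. */
final class RangeSplitSketch {
    private RangeSplitSketch() {}

    /**
     * Splits [from, to] (both inclusive) into at most numSplits contiguous {start, end} pairs.
     * When the size does not divide evenly, the first splits get one extra element.
     * Assumes to - from + 1 does not overflow a long.
     */
    static List<long[]> split(long from, long to, int numSplits) {
        if (from > to) throw new IllegalArgumentException("from must be <= to");
        if (numSplits <= 0) throw new IllegalArgumentException("numSplits must be positive");
        long total = to - from + 1;
        long base = total / numSplits;
        long remainder = total % numSplits;
        List<long[]> splits = new ArrayList<>();
        long start = from;
        for (int i = 0; i < numSplits; i++) {
            long size = base + (i < remainder ? 1 : 0);
            if (size == 0) {
                break; // more splits requested than elements
            }
            splits.add(new long[] {start, start + size - 1});
            start += size;
        }
        return splits;
    }
}
```

Different instances emit their sub-ranges concurrently, so their elements interleave nondeterministically downstream. This is consistent with generateSequence producing in-order output only at a parallelism of 1.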
Type Parameters:
OUT - The type of the returned data stream
Parameters:
iterator - The iterator that produces the elements of the data stream
type - The class of the data produced by the iterator. Must not be a generic class.

public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo)
Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the framework to create a parallel data stream source that returns the elements in the iterator.
Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type information. This method is useful for cases where the type is generic. In that case, the type class (as given in fromParallelCollection(org.apache.flink.util.SplittableIterator, Class)) does not supply all type information.
Type Parameters:
OUT - The type of the returned data stream
Parameters:
iterator - The iterator that produces the elements of the data stream
typeInfo - The TypeInformation for the produced data stream.

public DataStreamSource<String> readTextFile(String filePath)
Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such line.
NOTES ON CHECKPOINTING: The source monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
Parameters:
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").

public DataStreamSource<String> readTextFile(String filePath, String charsetName)
Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such line. The Charset with the given name will be used to read the files.
NOTES ON CHECKPOINTING: The source monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
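JDK classes alone are enough to show how the charset name affects line-by-line decoding. This is a local, single-threaded sketch with a hypothetical class name. Flink reads files through its input formats and splits, which are not modeled here.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

/** Local sketch of line-by-line decoding with a named charset (hypothetical class). */
final class LineReaderSketch {
    private LineReaderSketch() {}

    static List<String> readLines(byte[] content, String charsetName) {
        Charset charset = Charset.forName(charsetName); // unknown names throw UnsupportedCharsetException
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(content), charset))) {
            String line;
            while ((line = reader.readLine()) != null) { // readLine strips \n, \r and \r\n
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}
```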
Parameters:
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
charsetName - The name of the character set used to read the file

public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath)
Reads the contents of the user-specified filePath based on the given FileInputFormat.
Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It will attempt to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface. In the latter case, this method will invoke the ResultTypeQueryable.getProducedType() method to determine the data type produced by the input format.
NOTES ON CHECKPOINTING: The source monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
Type Parameters:
OUT - The type of the returned data stream
Parameters:
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
inputFormat - The input format used to create the data stream

@PublicEvolving @Deprecated public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter)
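Deprecated. Use FileInputFormat.setFilesFilter(FilePathFilter) to set a filter and readFile(FileInputFormat, String, FileProcessingMode, long).
Reads the contents of the user-specified filePath based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source may periodically monitor the path for new data or process the data currently in the path once and exit.
See readFile(FileInputFormat, String, FileProcessingMode, long).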
Type Parameters:
OUT - The type of the returned data stream
Parameters:
inputFormat - The input format used to create the data stream
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
watchType - The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
filter - The files to be excluded from the processing

@PublicEvolving public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval)
Reads the contents of the user-specified filePath based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). In addition, if the path contains files not to be processed, the user can specify a custom FilePathFilter. As a default implementation you can use FilePathFilter.createDefaultFilter().
Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It will attempt to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface. In the latter case, this method will invoke the ResultTypeQueryable.getProducedType() method to determine the data type produced by the input format.
NOTES ON CHECKPOINTING: If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source monitors the path once, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
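The two processing modes can be pictured as running a path scan once (PROCESS_ONCE) or every interval milliseconds (PROCESS_CONTINUOUSLY). The toy scan below is hypothetical. It remembers the newest modification time seen so far and selects only files modified after it. Its filter skips names starting with "." or "_". That filter is an assumed stand-in for a FilePathFilter and is not meant to describe FilePathFilter.createDefaultFilter().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy model of one path scan (hypothetical; not Flink's file monitoring source). */
final class DirectoryScanSketch {
    private long newestSeen = Long.MIN_VALUE;

    /** Assumed stand-in filter: skip hidden or temporary names. */
    static boolean isExcluded(String fileName) {
        return fileName.startsWith(".") || fileName.startsWith("_");
    }

    /**
     * files maps file name -> modification time in ms. Returns the names modified after the
     * newest modification time seen by earlier scans, oldest first, and advances that mark.
     */
    List<String> scan(Map<String, Long> files) {
        List<Map.Entry<String, Long>> fresh = new ArrayList<>();
        for (Map.Entry<String, Long> entry : files.entrySet()) {
            if (!isExcluded(entry.getKey()) && entry.getValue() > newestSeen) {
                fresh.add(entry);
            }
        }
        fresh.sort(Map.Entry.comparingByValue());
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Long> entry : fresh) {
            names.add(entry.getKey());
            newestSeen = Math.max(newestSeen, entry.getValue());
        }
        return names;
    }
}
```

In this model, a file whose modification time changes is selected again in full, so appending to a file leads to reprocessing the whole file.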
Type Parameters:
OUT - The type of the returned data stream
Parameters:
inputFormat - The input format used to create the data stream
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
watchType - The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans

@Deprecated public DataStream<String> readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
Deprecated. Use readFile(FileInputFormat, String, FileProcessingMode, long) instead.
Parameters:
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path/")
intervalMillis - The interval of file watching in milliseconds
watchType - The watch type of file stream:
- FileMonitoringFunction.WatchType.ONLY_NEW_FILES: the system processes only new files.
- FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED: the system re-processes all contents of appended files.
- FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED: the system processes only the appended contents of files.

@PublicEvolving public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<OUT> typeInformation)
Reads the contents of the user-specified filePath based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process the data currently in the path once and exit (FileProcessingMode.PROCESS_ONCE). In addition, if the path contains files that should not be processed, the user can specify a custom FilePathFilter. As a default implementation, you can use FilePathFilter.createDefaultFilter().
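The idea behind a path filter can be sketched without Flink as a predicate over file names that returns `true` for files to skip. This is a hypothetical, stdlib-only illustration: the skip rule below (names starting with `.` or `_`) is an assumption chosen for the example and does not describe what `FilePathFilter.createDefaultFilter()` actually does; `PathFilterSketch` and its members are made-up names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class PathFilterSketch {
    // Hypothetical skip rule for this example only: hidden files (".name")
    // and marker or temporary files ("_name") are not processed.
    static final Predicate<String> SKIP_HIDDEN_OR_MARKER =
            name -> name.startsWith(".") || name.startsWith("_");

    /** Returns the file names the skip predicate does not reject, preserving order. */
    static List<String> filesToProcess(List<String> names, Predicate<String> skip) {
        List<String> kept = new ArrayList<>();
        for (String name : names) {
            if (!skip.test(name)) {
                kept.add(name);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList("part-0", ".part-1.crc", "_SUCCESS", "part-2");
        System.out.println(filesToProcess(listing, SKIP_HIDDEN_OR_MARKER)); // [part-0, part-2]
    }
}
```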
NOTES ON CHECKPOINTING: If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source monitors the path once, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This means that no further checkpoint barriers are forwarded after the source exits, so no checkpoints are taken after that point.
OUT - The type of the returned data stream
inputFormat - The input format used to create the data stream
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
watchType - The mode in which the source should operate, i.e., monitor the path and react to new data, or process once and exit
typeInformation - Information on the type of the elements in the output stream
interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
@Deprecated public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
Deprecated. Use socketTextStream(String, int, String, long) instead.
Note that the socket itself does not report an abort; as a consequence, retries are only initiated when the socket was terminated gracefully.
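The maxRetry rule documented for this method and for its String-delimiter replacement can be written down as a small decision function. This is a hypothetical, stdlib-only model of the documented semantics, not Flink's implementation; it assumes one reconnection attempt per second, so a budget of maxRetry seconds allows at most maxRetry attempts. `RetryPolicySketch` and `mayReconnect` are made-up names.

```java
class RetryPolicySketch {
    /**
     * Decides whether another reconnection attempt is allowed after a graceful disconnect.
     *
     * @param maxRetrySeconds the documented maxRetry value: 0 stops at once, negative retries forever
     * @param attemptsSoFar   reconnection attempts already made (one per second, by assumption)
     */
    static boolean mayReconnect(long maxRetrySeconds, long attemptsSoFar) {
        if (maxRetrySeconds < 0) {
            return true;                        // negative value: keep retrying forever
        }
        return attemptsSoFar < maxRetrySeconds; // 0 means no attempt is ever made
    }

    public static void main(String[] args) {
        System.out.println(mayReconnect(0, 0));     // false
        System.out.println(mayReconnect(3, 2));     // true
        System.out.println(mayReconnect(3, 3));     // false
        System.out.println(mayReconnect(-1, 1000)); // true
    }
}
```

Because an aborted socket is not reported, this decision would only be reached after a graceful termination of the connection.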
hostname - The host name to which a server socket binds
port - The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
delimiter - A character which splits received strings into records
maxRetry - The maximal retry interval in seconds while the program waits for a socket that is temporarily down. Reconnection is initiated every second. A value of 0 means that the reader is immediately terminated, while a negative value ensures retrying forever.
@PublicEvolving public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry)
Note that the socket itself does not report an abort; as a consequence, retries are only initiated when the socket was terminated gracefully.
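Because records can arrive split across network reads, a reader that splits on a string delimiter has to buffer the unfinished tail of the input. The following is a minimal, stdlib-only sketch of that buffering technique, assuming text arrives as already-decoded String chunks; it is not Flink's socket source, and `DelimiterSplitter`, `feed`, and `pending` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class DelimiterSplitter {
    private final String delimiter;
    // Text received after the last delimiter; it becomes part of the next record.
    private final StringBuilder buffer = new StringBuilder();

    DelimiterSplitter(String delimiter) {
        if (delimiter == null || delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must be non-empty");
        }
        this.delimiter = delimiter;
    }

    /** Appends one received chunk and returns every record that it completes. */
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> records = new ArrayList<>();
        int pos;
        while ((pos = buffer.indexOf(delimiter)) != -1) {
            records.add(buffer.substring(0, pos));      // record text without the delimiter
            buffer.delete(0, pos + delimiter.length()); // drop the record and its delimiter
        }
        return records;
    }

    /** Returns buffered text that is not yet terminated by a delimiter. */
    String pending() {
        return buffer.toString();
    }

    public static void main(String[] args) {
        DelimiterSplitter splitter = new DelimiterSplitter("||");
        System.out.println(splitter.feed("a||b|")); // [a]
        System.out.println(splitter.feed("|c"));    // [b]
        System.out.println(splitter.pending());     // c
    }
}
```

In this run the delimiter `||` straddles the two chunks, yet `b` is still emitted as one complete record.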
hostname - The host name to which a server socket binds
port - The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
delimiter - A string which splits received strings into records
maxRetry - The maximal retry interval in seconds while the program waits for a socket that is temporarily down. Reconnection is initiated every second. A value of 0 means that the reader is immediately terminated, while a negative value ensures retrying forever.
@Deprecated public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter)
Deprecated. Use socketTextStream(String, int, String) instead.
hostname - The host name to which a server socket binds
port - The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
delimiter - A character which splits received strings into records
@PublicEvolving public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter)
hostname - The host name to which a server socket binds
port - The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
delimiter - A string which splits received strings into records
@PublicEvolving public DataStreamSource<String> socketTextStream(String hostname, int port)
hostname - The host name to which a server socket binds
port - The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
@PublicEvolving public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT,?> inputFormat)
Generic method to create an input data stream with InputFormat.
Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It will attempt to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface. In the latter case, this method will invoke the ResultTypeQueryable.getProducedType() method to determine the data type produced by the input format.
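The two-step type determination described above (ask the format whether it can report its type, otherwise fall back to reflection) can be sketched in plain Java. This is a hypothetical model, not Flink's type extraction: `SketchInputFormat` and `ProducesType` are made-up stand-ins for InputFormat and ResultTypeQueryable, and the reflection step only handles a format that binds its type argument directly in its superclass declaration.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for an input format that produces elements of type T.
abstract class SketchInputFormat<T> { }

// Hypothetical stand-in for ResultTypeQueryable: the format reports its own output type.
interface ProducesType {
    Type producedType();
}

final class TypeResolution {
    private TypeResolution() { }

    static Type determineOutputType(SketchInputFormat<?> format) {
        // Step 1: if the format can report its type, use that answer.
        if (format instanceof ProducesType) {
            return ((ProducesType) format).producedType();
        }
        // Step 2: otherwise, fall back to reflection on the generic superclass,
        // which only works if a subclass binds T to a concrete class.
        Type superType = format.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
            if (arg instanceof Class) {
                return arg;
            }
        }
        throw new IllegalStateException(
                "Cannot determine the output type of " + format.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(determineOutputType(new LineFormat()));                  // class java.lang.String
        System.out.println(determineOutputType(new OpaqueFormat<>(Integer.class))); // class java.lang.Integer
    }
}

// T is bound to String in the class declaration, so reflection can recover it.
class LineFormat extends SketchInputFormat<String> { }

// T is only known at runtime, so this format reports it explicitly.
class OpaqueFormat<T> extends SketchInputFormat<T> implements ProducesType {
    private final Class<T> type;

    OpaqueFormat(Class<T> type) {
        this.type = type;
    }

    @Override
    public Type producedType() {
        return type;
    }
}
```

Reflection alone would fail for `OpaqueFormat`, since its superclass type argument is the type variable `T`; that is exactly the case the explicit interface covers.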
NOTES ON CHECKPOINTING: In the case of a FileInputFormat, the source (which executes the ContinuousFileMonitoringFunction) monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream ContinuousFileReaderOperator to read the actual data, and exits, without waiting for the readers to finish reading. This means that no further checkpoint barriers are forwarded after the source exits, so no checkpoints are taken after that point.
OUT - The type of the returned data stream
inputFormat - The input format used to create the data stream
@PublicEvolving public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT,?> inputFormat, TypeInformation<OUT> typeInfo)
Generic method to create an input data stream with InputFormat.
The data stream is typed to the given TypeInformation. This method is intended for input formats where the return type cannot be determined by reflection analysis and that do not implement the ResultTypeQueryable interface.
NOTES ON CHECKPOINTING: In the case of a FileInputFormat, the source (which executes the ContinuousFileMonitoringFunction) monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream ContinuousFileReaderOperator to read the actual data, and exits, without waiting for the readers to finish reading. This means that no further checkpoint barriers are forwarded after the source exits, so no checkpoints are taken after that point.
OUT - The type of the returned data stream
inputFormat - The input format used to create the data stream
typeInfo - Information about the type of the elements in the output stream
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function)
By default, sources have a parallelism of 1. To enable parallel execution, the user-defined source should implement ParallelSourceFunction or extend RichParallelSourceFunction. In these cases, the resulting source will have the parallelism of the environment. To change this afterwards, call DataStreamSource.setParallelism(int).
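The parallelism rule in this paragraph can be summarized as a small function. This is a hypothetical, stdlib-only model, not Flink code; `SourceParallelismSketch` is a made-up name, and the check that rejects a parallelism other than 1 for a non-parallel source is an assumption of the sketch about how DataStreamSource.setParallelism(int) treats such sources.

```java
class SourceParallelismSketch {
    /**
     * Hypothetical model of the documented rule: a non-parallel source runs with parallelism 1,
     * a parallel source inherits the environment's parallelism, and a value set later on the
     * source (explicitOverride, null if unset) takes precedence.
     */
    static int effectiveParallelism(boolean parallelSource, int environmentParallelism,
                                    Integer explicitOverride) {
        if (explicitOverride != null) {
            // Assumption of this sketch: a non-parallel source cannot be scaled beyond 1.
            if (!parallelSource && explicitOverride != 1) {
                throw new IllegalArgumentException("A non-parallel source must keep parallelism 1");
            }
            return explicitOverride;
        }
        return parallelSource ? environmentParallelism : 1;
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(false, 8, null)); // 1
        System.out.println(effectiveParallelism(true, 8, null));  // 8
        System.out.println(effectiveParallelism(true, 8, 2));     // 2
    }
}
```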
OUT - type of the returned stream
function - the user-defined function
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName)
Adds a data source with custom type information, thus opening a DataStream. Only in very special cases does the user need to supply type information. Otherwise, use addSource(org.apache.flink.streaming.api.functions.source.SourceFunction).
OUT - type of the returned stream
function - the user-defined function
sourceName - Name of the data source
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo)
Adds a data source with custom type information, thus opening a DataStream. Only in very special cases does the user need to supply type information. Otherwise, use addSource(org.apache.flink.streaming.api.functions.source.SourceFunction).
OUT - type of the returned stream
function - the user-defined function
typeInfo - the user-defined type information for the stream
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo)
Adds a data source with custom type information, thus opening a DataStream. Only in very special cases does the user need to supply type information. Otherwise, use addSource(org.apache.flink.streaming.api.functions.source.SourceFunction).
OUT - type of the returned stream
function - the user-defined function
sourceName - Name of the data source
typeInfo - the user-defined type information for the stream
public JobExecutionResult execute() throws Exception
The program execution will be logged and displayed with a generated default name.
Exception - Thrown if an error occurs during job execution.
public abstract JobExecutionResult execute(String jobName) throws Exception
The program execution will be logged and displayed with the provided name.
jobName - Desired name of the job
Exception - Thrown if an error occurs during job execution.
@Internal public StreamGraph getStreamGraph()
Returns the StreamGraph of the streaming job.
public String getExecutionPlan()
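@Internal public <F> F clean(F f)
Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning is not disabled in the ExecutionConfig.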
@Internal public void addOperator(StreamTransformation<?> transformation)
public static StreamExecutionEnvironment getExecutionEnvironment()
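Creates an execution environment that represents the context in which the program is currently executed. If the program is invoked standalone, this method returns a local execution environment, as returned by createLocalEnvironment().
public static LocalStreamEnvironment createLocalEnvironment()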
Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. The default parallelism of the local environment is the number of hardware contexts (CPU cores / threads), unless it was specified differently by setParallelism(int).
public static LocalStreamEnvironment createLocalEnvironment(int parallelism)
Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. It will use the parallelism specified in the parameter.
parallelism - The parallelism for the local environment.
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration)
Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. It will use the parallelism specified in the parameter.
parallelism - The parallelism for the local environment.
configuration - A custom configuration to pass into the cluster
@PublicEvolving public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf)
Creates a LocalStreamEnvironment for local program execution that also starts the web monitoring UI.
The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. Since this method takes no parallelism parameter, it uses the default local parallelism (see getDefaultLocalParallelism()).
If the configuration key 'rest.port' was set in the configuration, that particular port will be used for the web UI. Otherwise, the default port (8081) will be used.
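The port selection described above amounts to a lookup with a fallback. The following stdlib-only sketch models it with a plain Map standing in for Flink's Configuration; the key `rest.port` and the default 8081 come from the text above, while `WebUiPortSketch` and `resolveWebUiPort` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;

class WebUiPortSketch {
    static final String REST_PORT_KEY = "rest.port"; // key named in the text above
    static final int DEFAULT_REST_PORT = 8081;       // default named in the text above

    /** Returns the configured web UI port, or the default when the key is absent. */
    static int resolveWebUiPort(Map<String, String> conf) {
        String value = conf.get(REST_PORT_KEY);
        return value == null ? DEFAULT_REST_PORT : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        System.out.println(resolveWebUiPort(Collections.emptyMap()));                         // 8081
        System.out.println(resolveWebUiPort(Collections.singletonMap("rest.port", "9000"))); // 9000
    }
}
```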
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles)
Creates a RemoteStreamEnvironment. The remote environment sends (parts of) the program to a cluster for execution. Note that all file paths used in the program must be accessible from the cluster. The execution does not set a parallelism of its own, so the cluster's default applies, unless the parallelism is set explicitly via setParallelism(int).
host - The host name or address of the master (JobManager), where the program should be executed.
port - The port of the master (JobManager), where the program should be executed.
jarFiles - The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
Creates a RemoteStreamEnvironment. The remote environment sends (parts of) the program to a cluster for execution. Note that all file paths used in the program must be accessible from the cluster. The execution will use the specified parallelism.
host - The host name or address of the master (JobManager), where the program should be executed.
port - The port of the master (JobManager), where the program should be executed.
parallelism - The parallelism to use during the execution.
jarFiles - The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles)
Creates a RemoteStreamEnvironment. The remote environment sends (parts of) the program to a cluster for execution. Note that all file paths used in the program must be accessible from the cluster. This overload takes no parallelism argument; the parallelism can be set explicitly via setParallelism(int).
host - The host name or address of the master (JobManager), where the program should be executed.
port - The port of the master (JobManager), where the program should be executed.
clientConfig - The configuration used by the client that connects to the remote cluster.
jarFiles - The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
@PublicEvolving public static int getDefaultLocalParallelism()
Gets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().
@PublicEvolving public static void setDefaultLocalParallelism(int parallelism)
Sets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().
parallelism - The parallelism to use as the default local parallelism.
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx)
protected static void resetContextEnvironment()
public void registerCachedFile(String filePath, String name)
The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().
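The register-then-look-up-by-name contract can be illustrated with a small in-memory registry. This is a hypothetical, stdlib-only model, not Flink's DistributedCache: it only records names, paths, and the executable flag, and it does not copy any files. `CachedFileRegistrySketch` and its methods are made-up names, rejecting duplicate names is a choice of the sketch, and the sketch assumes the two-argument form registers a non-executable file.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class CachedFileRegistrySketch {
    /** Hypothetical record of one registration: where the file lives and whether it is executable. */
    static final class Entry {
        final String filePath;
        final boolean executable;

        Entry(String filePath, boolean executable) {
            this.filePath = filePath;
            this.executable = executable;
        }
    }

    private final Map<String, Entry> entries = new LinkedHashMap<>();

    /** Models registerCachedFile(filePath, name, executable); duplicate names are rejected here. */
    void register(String filePath, String name, boolean executable) {
        if (entries.putIfAbsent(name, new Entry(filePath, executable)) != null) {
            throw new IllegalArgumentException("A file is already registered under the name " + name);
        }
    }

    /** Models the two-argument overload, assumed here to register a non-executable file. */
    void register(String filePath, String name) {
        register(filePath, name, false);
    }

    /** What a user function would look up by name at runtime. */
    Optional<Entry> lookup(String name) {
        return Optional.ofNullable(entries.get(name));
    }

    public static void main(String[] args) {
        CachedFileRegistrySketch registry = new CachedFileRegistrySketch();
        registry.register("hdfs://host:port/and/path", "lookupTable");
        registry.lookup("lookupTable").ifPresent(e ->
                System.out.println(e.filePath + " executable=" + e.executable));
    }
}
```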
filePath - The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
name - The name under which the file is registered.
public void registerCachedFile(String filePath, String name, boolean executable)
The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().
filePath - The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
name - The name under which the file is registered.
executable - Flag indicating whether the file should be executable
Copyright © 2014–2019 The Apache Software Foundation. All rights reserved.