@Public public class StreamExecutionEnvironment extends Object implements AutoCloseable
The StreamExecutionEnvironment is the context in which a streaming program is executed. A LocalStreamEnvironment will cause execution in the current JVM; a RemoteStreamEnvironment will cause execution on a remote setup.
The environment provides methods to control the job execution (such as setting the parallelism or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
See also: LocalStreamEnvironment, RemoteStreamEnvironment
Modifier and Type | Field and Description |
---|---|
protected List<Tuple2<String,DistributedCache.DistributedCacheEntry>> |
cacheFile
This field cannot yet be migrated to configuration.
|
protected CheckpointConfig |
checkpointCfg
Settings that control the checkpointing behavior.
|
protected ExecutionConfig |
config
The execution configuration for this environment.
|
protected Configuration |
configuration
Currently, configuration is split across multiple member variables and classes such as
ExecutionConfig or CheckpointConfig . |
static String |
DEFAULT_JOB_NAME
Deprecated.
This constant is not a good fit for batch runtime mode.
|
protected List<Transformation<?>> |
transformations |
Constructor and Description |
---|
StreamExecutionEnvironment() |
StreamExecutionEnvironment(Configuration configuration)
Creates a new
StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor . |
StreamExecutionEnvironment(Configuration configuration,
ClassLoader userClassloader)
Creates a new
StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor . |
StreamExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
ClassLoader userClassloader)
Creates a new
StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor . |
Modifier and Type | Method and Description |
---|---|
void |
addDefaultKryoSerializer(Class<?> type,
Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializerClass)
Deprecated.
Registering data types and serializers in code is deprecated, because the code must be
modified when upgrading the job version. Configure this through the config option
PipelineOptions.SERIALIZATION_CONFIG instead. |
<T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> |
addDefaultKryoSerializer(Class<?> type,
T serializer)
Deprecated.
Registering data types and serializers in code is deprecated, because the code must be
modified when upgrading the job version. Instance-type serializer definitions, where
serializers are serialized and written into the snapshot and deserialized for use, are
deprecated as well. Use a class-type serializer definition via
PipelineOptions.SERIALIZATION_CONFIG instead, where only the class name is written into
the snapshot and a new instance of the serializer is created for use. This is a breaking
change, and it will be removed in Flink 2.0. |
void |
addOperator(Transformation<?> transformation)
Adds an operator to the list of operators that should be executed when calling
execute() . |
<OUT> DataStreamSource<OUT> |
addSource(SourceFunction<OUT> function)
Deprecated.
This method relies on the
SourceFunction API, which is due to be
removed. Use the fromSource(Source, WatermarkStrategy, String) method based on
the new Source API instead. |
<OUT> DataStreamSource<OUT> |
addSource(SourceFunction<OUT> function,
String sourceName)
Deprecated.
This method relies on the
SourceFunction API, which is due to be
removed. Use the fromSource(Source, WatermarkStrategy, String) method based on
the new Source API instead. |
<OUT> DataStreamSource<OUT> |
addSource(SourceFunction<OUT> function,
String sourceName,
TypeInformation<OUT> typeInfo)
Deprecated.
This method relies on the
SourceFunction API, which is due to be
removed. Use the fromSource(Source, WatermarkStrategy, String, TypeInformation)
method based on the new Source API instead. |
<OUT> DataStreamSource<OUT> |
addSource(SourceFunction<OUT> function,
TypeInformation<OUT> typeInfo)
Deprecated.
This method relies on the
SourceFunction API, which is due to be
removed. Use the fromSource(Source, WatermarkStrategy, String, TypeInformation)
method based on the new Source API instead. |
static boolean |
areExplicitEnvironmentsAllowed()
Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment or a
RemoteEnvironment.
|
<F> F |
clean(F f)
Returns a "closure-cleaned" version of the given function.
|
void |
clearJobListeners()
Clear all registered
JobListener s. |
void |
close()
Close and clean up the execution environment.
|
void |
configure(ReadableConfig configuration)
Sets all relevant options contained in the
ReadableConfig such as e.g. |
void |
configure(ReadableConfig configuration,
ClassLoader classLoader)
Sets all relevant options contained in the
ReadableConfig such as e.g. |
<OUT> DataStreamSource<OUT> |
createInput(InputFormat<OUT,?> inputFormat)
Generic method to create an input data stream with
InputFormat . |
<OUT> DataStreamSource<OUT> |
createInput(InputFormat<OUT,?> inputFormat,
TypeInformation<OUT> typeInfo)
Generic method to create an input data stream with
InputFormat . |
static LocalStreamEnvironment |
createLocalEnvironment()
Creates a
LocalStreamEnvironment . |
static LocalStreamEnvironment |
createLocalEnvironment(Configuration configuration)
Creates a
LocalStreamEnvironment . |
static LocalStreamEnvironment |
createLocalEnvironment(int parallelism)
Creates a
LocalStreamEnvironment . |
static LocalStreamEnvironment |
createLocalEnvironment(int parallelism,
Configuration configuration)
Creates a
LocalStreamEnvironment . |
static StreamExecutionEnvironment |
createLocalEnvironmentWithWebUI(Configuration conf)
Creates a
LocalStreamEnvironment for local program execution that also starts the web
monitoring UI. |
static StreamExecutionEnvironment |
createRemoteEnvironment(String host,
int port,
Configuration clientConfig,
String... jarFiles)
Creates a
RemoteStreamEnvironment . |
static StreamExecutionEnvironment |
createRemoteEnvironment(String host,
int port,
int parallelism,
String... jarFiles)
Creates a
RemoteStreamEnvironment . |
static StreamExecutionEnvironment |
createRemoteEnvironment(String host,
int port,
String... jarFiles)
Creates a
RemoteStreamEnvironment . |
StreamExecutionEnvironment |
disableOperatorChaining()
Disables operator chaining for streaming operators.
|
StreamExecutionEnvironment |
enableChangelogStateBackend(boolean enabled)
Enables the changelog for the current state backend.
|
StreamExecutionEnvironment |
enableCheckpointing()
Deprecated.
Use
enableCheckpointing(long) instead. |
StreamExecutionEnvironment |
enableCheckpointing(long interval)
Enables checkpointing for the streaming job.
|
StreamExecutionEnvironment |
enableCheckpointing(long interval,
CheckpointingMode mode)
Deprecated.
Use
enableCheckpointing(long, CheckpointingMode) instead. |
StreamExecutionEnvironment |
enableCheckpointing(long interval,
CheckpointingMode mode)
Enables checkpointing for the streaming job.
|
StreamExecutionEnvironment |
enableCheckpointing(long interval,
CheckpointingMode mode,
boolean force)
Deprecated.
Use
enableCheckpointing(long, CheckpointingMode) instead. Forcing
checkpoints will be removed in the future. |
JobExecutionResult |
execute()
Triggers the program execution.
|
JobExecutionResult |
execute(StreamGraph streamGraph)
Triggers the program execution.
|
JobExecutionResult |
execute(String jobName)
Triggers the program execution.
|
JobClient |
executeAsync()
Triggers the program asynchronously.
|
JobClient |
executeAsync(StreamGraph streamGraph)
Triggers the program execution asynchronously.
|
JobClient |
executeAsync(String jobName)
Triggers the program execution asynchronously.
|
<OUT> DataStreamSource<OUT> |
fromCollection(Collection<OUT> data)
Deprecated.
This method will be removed in a future release, possibly as early as version 2.0.
Use
fromData(Collection) instead. |
<OUT> DataStreamSource<OUT> |
fromCollection(Collection<OUT> data,
TypeInformation<OUT> typeInfo)
Deprecated.
This method will be removed in a future release, possibly as early as version 2.0.
Use
fromData(Collection, TypeInformation) instead. |
<OUT> DataStreamSource<OUT> |
fromCollection(Iterator<OUT> data,
Class<OUT> type)
Deprecated.
This method will be removed in a future release, possibly as early as version 2.0.
Use
fromData(Collection, TypeInformation) instead. For rate-limited data
generation, use DataGeneratorSource with RateLimiterStrategy. If you need
to use a fixed set of elements in such a scenario, combine it with FromElementsGeneratorFunction. |
<OUT> DataStreamSource<OUT> |
fromCollection(Iterator<OUT> data,
TypeInformation<OUT> typeInfo)
Deprecated.
This method will be removed in a future release, possibly as early as version 2.0.
Use
fromData(Collection, TypeInformation) instead. For rate-limited data
generation, use DataGeneratorSource with RateLimiterStrategy. If you need
to use a fixed set of elements in such a scenario, combine it with FromElementsGeneratorFunction. |
<OUT> DataStreamSource<OUT> |
fromData(Class<OUT> type,
OUT... data)
Creates a new data stream that contains the given elements.
|
<OUT> DataStreamSource<OUT> |
fromData(Collection<OUT> data)
Creates a new data stream that contains the given elements. The type of the data stream is
that of the elements in the collection.
|
<OUT> DataStreamSource<OUT> |
fromData(Collection<OUT> data,
TypeInformation<OUT> typeInfo)
Creates a new data stream that contains the given elements.
|
<OUT> DataStreamSource<OUT> |
fromData(OUT... data)
Creates a new data stream that contains the given elements.
|
<OUT> DataStreamSource<OUT> |
fromData(TypeInformation<OUT> typeInfo,
OUT... data)
Creates a new data stream that contains the given elements.
|
<OUT> DataStreamSource<OUT> |
fromElements(Class<OUT> type,
OUT... data)
Deprecated.
This method will be removed in a future release, possibly as early as version 2.0.
Use
fromData(OUT...) instead. |
<OUT> DataStreamSource<OUT> |
fromElements(OUT... data)
Deprecated.
This method will be removed in a future release, possibly as early as version 2.0.
Use
fromData(OUT...) instead. |
<OUT> DataStreamSource<OUT> |
fromParallelCollection(SplittableIterator<OUT> iterator,
Class<OUT> type)
Creates a new data stream that contains elements in the iterator.
|
<OUT> DataStreamSource<OUT> |
fromParallelCollection(SplittableIterator<OUT> iterator,
TypeInformation<OUT> typeInfo)
Creates a new data stream that contains elements in the iterator.
|
DataStreamSource<Long> |
fromSequence(long from,
long to)
Creates a new data stream that contains a sequence of numbers (longs) and is useful for
testing and for cases that just need a stream of N events of any kind.
|
<OUT> DataStreamSource<OUT> |
fromSource(Source<OUT,?,?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName)
Adds a data
Source to the environment to get a DataStream . |
<OUT> DataStreamSource<OUT> |
fromSource(Source<OUT,?,?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName,
TypeInformation<OUT> typeInfo)
Adds a data
Source to the environment to get a DataStream . |
DataStreamSource<Long> |
generateSequence(long from,
long to)
Deprecated.
Use
fromSequence(long, long) instead to create a new data stream that
contains NumberSequenceSource . |
StreamGraph |
generateStreamGraph(List<Transformation<?>> transformations)
Generates a
StreamGraph that consists of the given transformations and is configured with the configuration of this environment. |
long |
getBufferTimeout()
Gets the maximum time frequency (milliseconds) for the flushing of the output buffers.
|
List<Tuple2<String,DistributedCache.DistributedCacheEntry>> |
getCachedFiles()
Get the list of cached files that were registered for distribution among the task managers.
|
CheckpointConfig |
getCheckpointConfig()
Gets the checkpoint config, which defines values like checkpoint interval, delay between
checkpoints, etc.
|
CheckpointingMode |
getCheckpointingConsistencyMode()
Returns the checkpointing consistency mode (exactly-once vs. at-least-once).
|
CheckpointingMode |
getCheckpointingMode()
Deprecated.
Use
getCheckpointingConsistencyMode() instead. |
long |
getCheckpointInterval()
Returns the checkpointing interval or -1 if checkpointing is disabled.
|
ExecutionConfig |
getConfig()
Gets the config object.
|
ReadableConfig |
getConfiguration()
Gives read-only access to the underlying configuration of this environment.
|
static int |
getDefaultLocalParallelism()
Gets the default parallelism that will be used for the local execution environment created by
createLocalEnvironment() . |
Path |
getDefaultSavepointDirectory()
Gets the default savepoint directory for this Job.
|
static StreamExecutionEnvironment |
getExecutionEnvironment()
Creates an execution environment that represents the context in which the program is
currently executed.
|
static StreamExecutionEnvironment |
getExecutionEnvironment(Configuration configuration)
Creates an execution environment that represents the context in which the program is
currently executed.
|
String |
getExecutionPlan()
Creates the plan with which the system will execute the program, and returns it as a String
using a JSON representation of the execution data flow graph.
|
List<JobListener> |
getJobListeners()
Gets the configured JobListeners.
|
int |
getMaxParallelism()
Gets the maximum degree of parallelism defined for the program.
|
int |
getNumberOfExecutionRetries()
Deprecated.
This method will be replaced by
getRestartStrategy() . |
int |
getParallelism()
Gets the parallelism with which operations are executed by default.
|
RestartStrategies.RestartStrategyConfiguration |
getRestartStrategy()
Deprecated.
The method is marked as deprecated because starting from Flink 1.19, the usage of
all complex Java objects related to configuration, including their getter and setter
methods, should be replaced by ConfigOption. In a future major version of Flink, this
method will be removed entirely. It is recommended to switch to using the ConfigOptions
provided by
RestartStrategyOptions for configuring
restart strategies. |
StateBackend |
getStateBackend()
Deprecated.
The method is marked as deprecated because starting from Flink 1.19, the usage of
all complex Java objects related to configuration, including their getter and setter
methods, should be replaced by ConfigOption. In a future major version of Flink, this
method will be removed entirely. It is recommended to find which state backend is used by
state backend ConfigOption. For more details on using ConfigOption for state backend
configuration, please refer to the Flink documentation: state-backends
|
StreamGraph |
getStreamGraph()
Getter of the
StreamGraph of the streaming job. |
StreamGraph |
getStreamGraph(boolean clearTransformations)
Getter of the
StreamGraph of the streaming job with the option to clear previously
registered transformations . |
TimeCharacteristic |
getStreamTimeCharacteristic()
Deprecated.
See
setStreamTimeCharacteristic(TimeCharacteristic) for deprecation
notice. |
List<Transformation<?>> |
getTransformations() |
protected ClassLoader |
getUserClassloader() |
protected static void |
initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) |
void |
invalidateClusterDataset(AbstractID datasetId) |
boolean |
isChainingEnabled()
Returns whether operator chaining is enabled.
|
boolean |
isChainingOfOperatorsWithDifferentMaxParallelismEnabled() |
TernaryBoolean |
isChangelogStateBackendEnabled()
Returns whether the changelog is enabled for the state backend.
|
boolean |
isForceCheckpointing()
Deprecated.
Forcing checkpoints will be removed in a future version.
|
boolean |
isForceUnalignedCheckpoints()
Returns whether unaligned checkpoints are force-enabled.
|
boolean |
isUnalignedCheckpointsEnabled()
Returns whether unaligned checkpoints are enabled.
|
protected Set<AbstractID> |
listCompletedClusterDatasets() |
<OUT> DataStreamSource<OUT> |
readFile(FileInputFormat<OUT> inputFormat,
String filePath)
Deprecated.
Use
FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead . An
example of reading a file using a simple TextLineInputFormat :
|
<OUT> DataStreamSource<OUT> |
readFile(FileInputFormat<OUT> inputFormat,
String filePath,
FileProcessingMode watchType,
long interval)
Deprecated.
Use
FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead . An
example of reading a file using a simple TextLineInputFormat :
|
<OUT> DataStreamSource<OUT> |
readFile(FileInputFormat<OUT> inputFormat,
String filePath,
FileProcessingMode watchType,
long interval,
FilePathFilter filter)
Deprecated.
|
<OUT> DataStreamSource<OUT> |
readFile(FileInputFormat<OUT> inputFormat,
String filePath,
FileProcessingMode watchType,
long interval,
TypeInformation<OUT> typeInformation)
Deprecated.
Use
FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead . An
example of reading a file using a simple TextLineInputFormat :
|
DataStream<String> |
readFileStream(String filePath,
long intervalMillis,
FileMonitoringFunction.WatchType watchType)
Deprecated.
|
DataStreamSource<String> |
readTextFile(String filePath)
Deprecated.
Use
FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead . An
example of reading a file using a simple TextLineInputFormat :
|
DataStreamSource<String> |
readTextFile(String filePath,
String charsetName)
Deprecated.
Use
FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead . An
example of reading a file using a simple TextLineInputFormat :
|
void |
registerCachedFile(String filePath,
String name)
Registers a file at the distributed cache under the given name.
|
void |
registerCachedFile(String filePath,
String name,
boolean executable)
Registers a file at the distributed cache under the given name.
|
<T> void |
registerCacheTransformation(AbstractID intermediateDataSetID,
CacheTransformation<T> t) |
void |
registerCollectIterator(CollectResultIterator<?> iterator) |
void |
registerJobListener(JobListener jobListener)
Register a
JobListener in this environment. |
StreamExecutionEnvironment |
registerSlotSharingGroup(SlotSharingGroup slotSharingGroup)
Register a slot sharing group with its resource spec.
|
void |
registerType(Class<?> type)
Deprecated.
Registering data types and serializers in code is deprecated, because the code must be
modified when upgrading the job version. Configure this through the config option
PipelineOptions.SERIALIZATION_CONFIG instead. |
void |
registerTypeWithKryoSerializer(Class<?> type,
Class<? extends com.esotericsoftware.kryo.Serializer> serializerClass)
Deprecated.
Registering data types and serializers in code is deprecated, because the code must be
modified when upgrading the job version. Configure this through the config option
PipelineOptions.SERIALIZATION_CONFIG instead. |
<T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> |
registerTypeWithKryoSerializer(Class<?> type,
T serializer)
Deprecated.
Registering data types and serializers in code is deprecated, because the code must be
modified when upgrading the job version. Instance-type serializer definitions, where
serializers are serialized and written into the snapshot and deserialized for use, are
deprecated as well. Use a class-type serializer definition via
PipelineOptions.SERIALIZATION_CONFIG instead, where only the class name is written into
the snapshot and a new instance of the serializer is created for use. This is a breaking
change, and it will be removed in Flink 2.0. |
protected static void |
resetContextEnvironment() |
StreamExecutionEnvironment |
setBufferTimeout(long timeoutMillis)
Sets the maximum time frequency (milliseconds) for the flushing of the output buffers.
|
static void |
setDefaultLocalParallelism(int parallelism)
Sets the default parallelism that will be used for the local execution environment created by
createLocalEnvironment() . |
StreamExecutionEnvironment |
setDefaultSavepointDirectory(Path savepointDirectory)
Sets the default savepoint directory, where savepoints will be written to if none is explicitly
provided when triggered.
|
StreamExecutionEnvironment |
setDefaultSavepointDirectory(String savepointDirectory)
Sets the default savepoint directory, where savepoints will be written to if none is explicitly
provided when triggered.
|
StreamExecutionEnvironment |
setDefaultSavepointDirectory(URI savepointDirectory)
Sets the default savepoint directory, where savepoints will be written to if none is explicitly
provided when triggered.
|
StreamExecutionEnvironment |
setMaxParallelism(int maxParallelism)
Sets the maximum degree of parallelism defined for the program.
|
void |
setNumberOfExecutionRetries(int numberOfExecutionRetries)
Deprecated.
This method will be replaced by
setRestartStrategy(org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration) . The RestartStrategies.fixedDelayRestart(int, Duration) contains the number of execution
retries. |
StreamExecutionEnvironment |
setParallelism(int parallelism)
Sets the parallelism for operations executed through this environment.
|
void |
setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)
Deprecated.
The method is marked as deprecated because starting from Flink 1.19, the usage of
all complex Java objects related to configuration, including their getter and setter
methods, should be replaced by ConfigOption. In a future major version of Flink, this
method will be removed entirely. It is recommended to switch to using the ConfigOptions
provided by
RestartStrategyOptions for configuring
restart strategies. |
StreamExecutionEnvironment |
setRuntimeMode(RuntimeExecutionMode executionMode)
Sets the runtime execution mode for the application (see
RuntimeExecutionMode ). |
StreamExecutionEnvironment |
setStateBackend(StateBackend backend)
Deprecated.
The method is marked as deprecated because starting from Flink 1.19, the usage of
all complex Java objects related to configuration, including their getter and setter
methods, should be replaced by ConfigOption. In a future major version of Flink, this
method will be removed entirely. It is recommended to switch to using the ConfigOptions
provided for configuring state backend like the following code snippet:
For more details on using ConfigOption for state backend configuration, please refer to
the Flink documentation: state-backends |
void |
setStreamTimeCharacteristic(TimeCharacteristic characteristic)
Deprecated.
In Flink 1.12 the default stream time characteristic has been changed to
TimeCharacteristic.EventTime , thus you don't need to call this method for enabling
event-time support anymore. Explicitly using processing-time windows and timers works in
event-time mode. If you need to disable watermarks, please use ExecutionConfig.setAutoWatermarkInterval(long). If you are using TimeCharacteristic.IngestionTime, please manually set an appropriate WatermarkStrategy. If you are using generic "time window" operations (for example
through KeyedStream.window(WindowAssigner)) that change
behaviour based on the time characteristic, please use equivalent operations that
explicitly specify processing time or event time. |
DataStreamSource<String> |
socketTextStream(String hostname,
int port)
Creates a new data stream that contains the strings received infinitely from a socket.
|
DataStreamSource<String> |
socketTextStream(String hostname,
int port,
char delimiter)
Deprecated.
Use
socketTextStream(String, int, String) instead. |
DataStreamSource<String> |
socketTextStream(String hostname,
int port,
char delimiter,
long maxRetry)
Deprecated.
Use
socketTextStream(String, int, String, long) instead. |
DataStreamSource<String> |
socketTextStream(String hostname,
int port,
String delimiter)
Creates a new data stream that contains the strings received infinitely from a socket.
|
DataStreamSource<String> |
socketTextStream(String hostname,
int port,
String delimiter,
long maxRetry)
Creates a new data stream that contains the strings received infinitely from a socket.
|
@Deprecated public static final String DEFAULT_JOB_NAME
protected final ExecutionConfig config
protected final CheckpointConfig checkpointCfg
protected final List<Transformation<?>> transformations
protected final List<Tuple2<String,DistributedCache.DistributedCacheEntry>> cacheFile
protected final Configuration configuration
Currently, configuration is split across multiple member variables and classes such as ExecutionConfig or CheckpointConfig. This architecture makes it quite difficult to handle/merge/enrich configuration or restrict access in other APIs.
In the long term, this Configuration object should be the source of truth for newly added ConfigOptions that are relevant for the DataStream API. Make sure to also update configure(ReadableConfig, ClassLoader).
public StreamExecutionEnvironment()
@PublicEvolving public StreamExecutionEnvironment(Configuration configuration)
Creates a new StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor.
@PublicEvolving public StreamExecutionEnvironment(Configuration configuration, ClassLoader userClassloader)
Creates a new StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor.
In addition, this constructor allows specifying the user code ClassLoader.
@PublicEvolving public StreamExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, ClassLoader userClassloader)
Creates a new StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor.
In addition, this constructor allows specifying the PipelineExecutorServiceLoader and user code ClassLoader.
@Internal public void registerCollectIterator(CollectResultIterator<?> iterator)
protected ClassLoader getUserClassloader()
public ExecutionConfig getConfig()
public List<Tuple2<String,DistributedCache.DistributedCacheEntry>> getCachedFiles()
@PublicEvolving public List<JobListener> getJobListeners()
public StreamExecutionEnvironment setParallelism(int parallelism)
Sets the parallelism for operations executed through this environment. The LocalStreamEnvironment uses by default a value equal to the number of hardware contexts (CPU cores / threads). When executing the program via the command line client from a JAR file, the default degree of parallelism is the one configured for that setup.
parallelism - The parallelism
@PublicEvolving public StreamExecutionEnvironment setRuntimeMode(RuntimeExecutionMode executionMode)
Sets the runtime execution mode for the application (see RuntimeExecutionMode). This is equivalent to setting the execution.runtime-mode in your application's configuration file.
We recommend that users NOT use this method but instead set the execution.runtime-mode using the command line when submitting the application. Keeping the application code configuration-free allows for more flexibility, as the same application can then be executed in any execution mode.
executionMode - the desired execution mode.
public StreamExecutionEnvironment setMaxParallelism(int maxParallelism)
Sets the maximum degree of parallelism defined for the program. The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
maxParallelism - Maximum degree of parallelism to be used for the program, with 0 < maxParallelism <= 2^15.
@PublicEvolving public StreamExecutionEnvironment registerSlotSharingGroup(SlotSharingGroup slotSharingGroup)
Registers a slot sharing group with its resource spec.
Note that a slot sharing group hints to the scheduler that the grouped operators CAN be deployed into a shared slot. There is no guarantee that the scheduler always deploys the grouped operators together. In cases where grouped operators are deployed into separate slots, the slot resources will be derived from the specified group requirements.
slotSharingGroup - which contains name and its resource spec.
public int getParallelism()
Gets the parallelism with which operations are executed by default.
public int getMaxParallelism()
The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
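To make the link between the maximum parallelism and key groups more concrete, the following is a minimal plain-Java sketch of how a key could be mapped to a key group and then to a parallel operator instance. It is an illustration under stated assumptions, not Flink's implementation: the class KeyGroupSketch and its methods are hypothetical, and the hashing is deliberately simplified (Flink's runtime applies additional hash mixing internally, so real assignments will differ). Only the bound 0 < maxParallelism <= 2^15 is taken from the description of setMaxParallelism(int).

```java
import java.util.Objects;

// Hypothetical illustration of key-group assignment; not Flink's implementation.
final class KeyGroupSketch {

    // Upper bound from the setMaxParallelism contract: 0 < maxParallelism <= 2^15.
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;

    /** Rejects values outside the documented range. */
    static void checkMaxParallelism(int maxParallelism) {
        if (maxParallelism <= 0 || maxParallelism > UPPER_BOUND_MAX_PARALLELISM) {
            throw new IllegalArgumentException(
                    "maxParallelism must satisfy 0 < maxParallelism <= 2^15, got " + maxParallelism);
        }
    }

    /** Maps a key to one of maxParallelism key groups (simplified hashing, an assumption). */
    static int keyGroupFor(Object key, int maxParallelism) {
        checkMaxParallelism(maxParallelism);
        // floorMod keeps the result in [0, maxParallelism) even for negative hash codes.
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    /** Maps a key group to the index of the parallel instance that owns it. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        checkMaxParallelism(maxParallelism);
        if (parallelism <= 0 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("parallelism must be in (0, maxParallelism]");
        }
        if (keyGroup < 0 || keyGroup >= maxParallelism) {
            throw new IllegalArgumentException("keyGroup must be in [0, maxParallelism)");
        }
        // Contiguous ranges of key groups are owned by the same instance.
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (String key : new String[] {"alice", "bob", "carol"}) {
            int group = keyGroupFor(key, maxParallelism);
            System.out.println(key + " -> key group " + group
                    + ", instance " + operatorIndexFor(group, maxParallelism, 4) + " at parallelism 4"
                    + ", instance " + operatorIndexFor(group, maxParallelism, 8) + " at parallelism 8");
        }
    }
}
```

In this sketch the key group of a key depends only on maxParallelism, while the owning instance also depends on the current parallelism. That is why the maximum parallelism acts as the upper limit for scaling: there can never be more instances than key groups to distribute.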
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis)
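Sets the maximum time frequency (milliseconds) for the flushing of the output buffers.
timeoutMillis - The maximum time between two output flushes.
public long getBufferTimeout()
Gets the maximum time frequency (milliseconds) for the flushing of the output buffers. See setBufferTimeout(long).
@PublicEvolving public StreamExecutionEnvironment disableOperatorChaining()
Disables operator chaining for streaming operators.
@PublicEvolving public boolean isChainingEnabled()
Returns whether operator chaining is enabled: true if chaining is enabled, false otherwise.
@PublicEvolving public boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled()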
public CheckpointConfig getCheckpointConfig()
public StreamExecutionEnvironment enableCheckpointing(long interval)
Enables checkpointing for the streaming job. This method selects CheckpointingMode.EXACTLY_ONCE guarantees.
The job draws checkpoints periodically, in the given interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
interval - Time interval between state checkpoints in milliseconds.
@Deprecated public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
Deprecated.
Use enableCheckpointing(long, CheckpointingMode) instead.
Enables checkpointing for the streaming job.
The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs "at least once"). The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
interval - Time interval between state checkpoints in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
Enables checkpointing for the streaming job.
The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs "at least once"). The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
interval - Time interval between state checkpoints in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
@Deprecated @PublicEvolving public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
Deprecated.
Use enableCheckpointing(long, CheckpointingMode) instead. Forcing checkpoints will be removed in the future.
Enables checkpointing for the streaming job.
The job draws checkpoints periodically, in the given interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. If the "force" parameter is set to true, the system will execute the job nonetheless.
interval - Time interval between state checkpoints in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
force - If true, checkpointing will be enabled for iterative jobs as well.
@Deprecated @PublicEvolving public StreamExecutionEnvironment enableCheckpointing()
Deprecated.
Use enableCheckpointing(long) instead.
Enables checkpointing for the streaming job. This method selects CheckpointingMode.EXACTLY_ONCE guarantees.
The job draws checkpoints periodically, in the default interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
public long getCheckpointInterval()
Returns the checkpointing interval or -1 if checkpointing is disabled.
Shorthand for getCheckpointConfig().getCheckpointInterval().
@Deprecated @PublicEvolving public boolean isForceCheckpointing()
@PublicEvolving public boolean isUnalignedCheckpointsEnabled()
@PublicEvolving public boolean isForceUnalignedCheckpoints()
@Deprecated public CheckpointingMode getCheckpointingMode()
Deprecated.
Use getCheckpointingConsistencyMode() instead.
Shorthand for getCheckpointConfig().getCheckpointingMode().
public CheckpointingMode getCheckpointingConsistencyMode()
Shorthand for getCheckpointConfig().getCheckpointingConsistencyMode().
@Deprecated @PublicEvolving public StreamExecutionEnvironment setStateBackend(StateBackend backend)
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
For more details on using ConfigOption for state backend configuration, please refer to the Flink documentation: state-backends
State managed by the state backend includes both keyed state that is accessible on keyed streams, as well as state maintained directly by the user code that implements CheckpointedFunction.
The HashMapStateBackend maintains state in heap memory, as objects. It is lightweight without extra dependencies, but is limited to JVM heap memory.
In contrast, the EmbeddedRocksDBStateBackend stores its state in an embedded RocksDB instance. This state backend can store very large state that exceeds memory and spills to local disk. All key/value state (including windows) is stored in the key/value index of RocksDB.
In both cases, fault tolerance is managed via the job's CheckpointStorage, which configures how and where state backends persist during a checkpoint.
getStateBackend(), CheckpointConfig.setCheckpointStorage(org.apache.flink.runtime.state.CheckpointStorage)
@Deprecated @PublicEvolving public StateBackend getStateBackend()
setStateBackend(StateBackend)
@PublicEvolving public StreamExecutionEnvironment enableChangelogStateBackend(boolean enabled)
Stateful operators write the state changes to that log (logging the state), in addition to applying them to the state tables in RocksDB or the in-mem Hashtable.
An operator can acknowledge a checkpoint as soon as the changes in the log have reached the durable checkpoint storage.
The state tables are persisted periodically, independent of the checkpoints. We call this the materialization of the state on the checkpoint storage.
Once the state is materialized on checkpoint storage, the state changelog can be truncated to the corresponding point.
This establishes a way to drastically reduce the checkpoint interval for streaming applications across state backends. For more details, see FLIP-158.
If this method is not called explicitly, no preference for enabling the change log is expressed. Change log settings from the different config levels (job/local/cluster) override one another.
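The interplay of state table, changelog, materialization and truncation described above can be illustrated with a small toy model. This is only a sketch under simplifying assumptions, not Flink's implementation: the class `ChangelogSketch`, its in-memory maps and its method names are hypothetical, and "durable storage" is simulated by plain Java collections.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a changelog in front of a state table; hypothetical, not Flink code. */
final class ChangelogSketch {
    // Live state table (stands in for RocksDB or the in-memory hash table).
    private final Map<String, Integer> stateTable = new HashMap<>();
    // Changes recorded since the last materialization.
    private final List<String[]> changelog = new ArrayList<>();
    // Last materialized copy of the state table.
    private Map<String, Integer> materialized = new HashMap<>();

    /** Applies a change to the state table and also appends it to the changelog. */
    void put(String key, int value) {
        stateTable.put(key, value);
        changelog.add(new String[] {key, Integer.toString(value)});
    }

    /** Persists the state table; afterwards the changelog can be truncated. */
    void materialize() {
        materialized = new HashMap<>(stateTable);
        changelog.clear();
    }

    /** Rebuilds the state from the materialized copy plus the remaining changelog. */
    Map<String, Integer> restore() {
        Map<String, Integer> restored = new HashMap<>(materialized);
        for (String[] change : changelog) {
            restored.put(change[0], Integer.parseInt(change[1]));
        }
        return restored;
    }

    int changelogSize() {
        return changelog.size();
    }

    public static void main(String[] args) {
        ChangelogSketch state = new ChangelogSketch();
        state.put("a", 1);
        state.put("b", 2);
        state.materialize();   // changelog is truncated here
        state.put("a", 3);     // only this change remains in the log
        System.out.println(state.changelogSize() + " " + state.restore());
    }
}
```

In this model a checkpoint only needs the (short) changelog to be durable, which is why the checkpoint interval can be reduced independently of how large the state table is.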
enabled
- true to explicitly enable the change log for the state backend, false to explicitly disable it.
isChangelogStateBackendEnabled()
@PublicEvolving public TernaryBoolean isChangelogStateBackendEnabled()
The TernaryBoolean for the enable status of the change log for the state backend. Could be TernaryBoolean.UNDEFINED if the user never specified this by calling enableChangelogStateBackend(boolean).
enableChangelogStateBackend(boolean)
@PublicEvolving public StreamExecutionEnvironment setDefaultSavepointDirectory(String savepointDirectory)
getDefaultSavepointDirectory()
@PublicEvolving public StreamExecutionEnvironment setDefaultSavepointDirectory(URI savepointDirectory)
getDefaultSavepointDirectory()
@PublicEvolving public StreamExecutionEnvironment setDefaultSavepointDirectory(Path savepointDirectory)
getDefaultSavepointDirectory()
@Nullable @PublicEvolving public Path getDefaultSavepointDirectory()
setDefaultSavepointDirectory(Path)
@Deprecated @PublicEvolving public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)
Deprecated. Use RestartStrategyOptions for configuring restart strategies.
restartStrategyConfiguration
- Restart strategy configuration to be set
@Deprecated @PublicEvolving public RestartStrategies.RestartStrategyConfiguration getRestartStrategy()
Deprecated. Use RestartStrategyOptions for configuring restart strategies.
@Deprecated @PublicEvolving public void setNumberOfExecutionRetries(int numberOfExecutionRetries)
Deprecated. See setRestartStrategy(org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration). The RestartStrategies.fixedDelayRestart(int, Duration) contains the number of execution retries.
A value of -1 indicates that the system default value (as defined in the configuration) should be used.
numberOfExecutionRetries
- The number of times the system will try to re-execute failed tasks.
@Deprecated @PublicEvolving public int getNumberOfExecutionRetries()
Deprecated. See getRestartStrategy().
A value of -1 indicates that the system default value (as defined in the configuration) should be used.
@Deprecated public <T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> void addDefaultKryoSerializer(Class<?> type, T serializer)
Deprecated. Use PipelineOptions.SERIALIZATION_CONFIG instead, where only the class name is written into the snapshot and a new instance of the serializer is created for use. This is a breaking change, and it will be removed in Flink 2.0.
Note that the serializer instance must be serializable (as defined by java.io.Serializable), because it may be distributed to the worker nodes by Java serialization.
type
- The class of the types serialized with the given serializer.
serializer
- The serializer to use.
@Deprecated public void addDefaultKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializerClass)
Deprecated. See PipelineOptions.SERIALIZATION_CONFIG.
type
- The class of the types serialized with the given serializer.
serializerClass
- The class of the serializer to use.
@Deprecated public <T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> void registerTypeWithKryoSerializer(Class<?> type, T serializer)
Deprecated. Use PipelineOptions.SERIALIZATION_CONFIG instead, where only the class name is written into the snapshot and a new instance of the serializer is created for use. This is a breaking change, and it will be removed in Flink 2.0.
Note that the serializer instance must be serializable (as defined by java.io.Serializable), because it may be distributed to the worker nodes by Java serialization.
type
- The class of the types serialized with the given serializer.
serializer
- The serializer to use.
@Deprecated public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer> serializerClass)
Deprecated. See PipelineOptions.SERIALIZATION_CONFIG.
type
- The class of the types serialized with the given serializer.
serializerClass
- The class of the serializer to use.
@Deprecated public void registerType(Class<?> type)
Deprecated. See PipelineOptions.SERIALIZATION_CONFIG.
type
- The class of the type to register.
@PublicEvolving @Deprecated public void setStreamTimeCharacteristic(TimeCharacteristic characteristic)
Deprecated. The default stream time characteristic is TimeCharacteristic.EventTime, thus you don't need to call this method for enabling event-time support anymore. Explicitly using processing-time windows and timers works in event-time mode. If you need to disable watermarks, please use ExecutionConfig.setAutoWatermarkInterval(long). If you are using TimeCharacteristic.IngestionTime, please manually set an appropriate WatermarkStrategy. If you are using generic "time window" operations (for example through KeyedStream.window(WindowAssigner)) that change behaviour based on the time characteristic, please use equivalent operations that explicitly specify processing time or event time.
If you set the characteristic to IngestionTime or EventTime, this will set a default watermark update interval of 200 ms. If this is not applicable for your application, you should change it using ExecutionConfig.setAutoWatermarkInterval(long).
characteristic
- The time characteristic.
@PublicEvolving @Deprecated public TimeCharacteristic getStreamTimeCharacteristic()
Deprecated. See setStreamTimeCharacteristic(TimeCharacteristic) for deprecation notice.
@PublicEvolving public void configure(ReadableConfig configuration)
Sets all relevant options contained in the ReadableConfig, such as StreamPipelineOptions.TIME_CHARACTERISTIC. It will reconfigure StreamExecutionEnvironment, ExecutionConfig and CheckpointConfig.
It will change the value of a setting only if a corresponding option was set in the configuration. If a key is not present, the current value of a field will remain untouched.
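The "only overwrite what is present" behaviour can be pictured with a plain-Java sketch. It is a simplified model, not Flink code: the class `ConfigureSketch`, its two fields and the option keys are hypothetical stand-ins for the environment's settings and for a ReadableConfig.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of "apply only the options that are present"; hypothetical, not Flink code. */
final class ConfigureSketch {
    // Hypothetical settings standing in for fields of the environment.
    int parallelism = 4;
    long checkpointIntervalMs = 10_000L;

    /** Overwrites a field only when the corresponding key exists in the given options. */
    void configure(Map<String, String> options) {
        Optional.ofNullable(options.get("parallelism"))
                .map(Integer::parseInt)
                .ifPresent(value -> parallelism = value);
        Optional.ofNullable(options.get("checkpoint.interval.ms"))
                .map(Long::parseLong)
                .ifPresent(value -> checkpointIntervalMs = value);
    }

    public static void main(String[] args) {
        ConfigureSketch env = new ConfigureSketch();
        Map<String, String> options = new HashMap<>();
        options.put("parallelism", "8"); // the interval key is deliberately absent
        env.configure(options);
        // parallelism is overwritten, the checkpoint interval keeps its previous value
        System.out.println(env.parallelism + " " + env.checkpointIntervalMs);
    }
}
```

A consequence of this design is that calling configure repeatedly with partial configurations accumulates settings instead of resetting the ones that are not mentioned.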
configuration
- a configuration to read the values from
@PublicEvolving public void configure(ReadableConfig configuration, ClassLoader classLoader)
Sets all relevant options contained in the ReadableConfig, such as StreamPipelineOptions.TIME_CHARACTERISTIC. It will reconfigure StreamExecutionEnvironment, ExecutionConfig and CheckpointConfig.
It will change the value of a setting only if a corresponding option was set in the configuration. If a key is not present, the current value of a field will remain untouched.
configuration
- a configuration to read the values from
classLoader
- a class loader to use when loading classes
@SafeVarargs public final <OUT> DataStreamSource<OUT> fromData(OUT... data)
The elements must all be of the same type, for example, all String or all Integer.
The framework will try to determine the exact type from the elements. In case of generic elements, it may be necessary to manually supply the type information via fromData(org.apache.flink.api.common.typeinfo.TypeInformation, OUT...).
NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.
OUT
- The type of the returned data stream
data
- The array of elements to create the data stream from.
@SafeVarargs public final <OUT> DataStreamSource<OUT> fromData(TypeInformation<OUT> typeInfo, OUT... data)
The elements must be of the typeInfo type or a subclass of it. The sequence of elements must not be empty.
NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.
OUT
- The type of the returned data stream
typeInfo
- The type information of the elements.
data
- The array of elements to create the data stream from.
public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data, TypeInformation<OUT> typeInfo)
The elements must all be of the same type, for example, all String or all Integer.
The framework will try to determine the exact type from the elements. In case of generic elements, it may be necessary to manually supply the type information via fromData(org.apache.flink.api.common.typeinfo.TypeInformation, OUT...).
NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.
OUT
- The generic type of the returned data stream.
data
- The collection of elements to create the data stream from.
typeInfo
- The type information of the elements.
@SafeVarargs public final <OUT> DataStreamSource<OUT> fromData(Class<OUT> type, OUT... data)
NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.
OUT
- The type of the returned data stream
type
- The base class type of the elements in the collection.
data
- The array of elements to create the data stream from.
public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data)
The framework will try to determine the exact type from the collection elements. In case of generic elements, it may be necessary to manually supply the type information via fromData(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).
NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.
OUT
- The generic type of the returned data stream.
data
- The collection of elements to create the data stream from.
@Deprecated public DataStreamSource<Long> generateSequence(long from, long to)
Deprecated. Use fromSequence(long, long) instead to create a new data stream that contains NumberSequenceSource.
If the parallelism is set to 1 (using SingleOutputStreamOperator.setParallelism(int)), the generated sequence of elements is in order.
from
- The number to start at (inclusive)
to
- The number to stop at (inclusive)
public DataStreamSource<Long> fromSequence(long from, long to)
The generated source splits the sequence into as many parallel sub-sequences as there are parallel source readers. Each sub-sequence will be produced in order. If the parallelism is limited to one, the source will produce one sequence in order.
This source is always bounded. For very long sequences (for example over the entire domain of long integer values), you may consider executing the application in a streaming manner, because the end bound is far away.
Use fromSource(Source, WatermarkStrategy, String) together with NumberSequenceSource if you require more control over the created sources, for example if you want to set a WatermarkStrategy.
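To make the idea of parallel sub-sequences concrete, the following plain-Java sketch splits an inclusive range into contiguous, individually ordered chunks, one per reader. It is an illustration only: the class `SequenceSplitSketch` and its even-split strategy are assumptions, and the actual splitting performed by NumberSequenceSource may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative range splitter; hypothetical, not the NumberSequenceSource algorithm. */
final class SequenceSplitSketch {
    /**
     * Splits [from, to] (both inclusive) into at most {@code parallelism} contiguous
     * ranges. Assumes to - from + 1 does not overflow a long.
     */
    static List<long[]> split(long from, long to, int parallelism) {
        if (from > to || parallelism < 1) {
            throw new IllegalArgumentException("need from <= to and parallelism >= 1");
        }
        List<long[]> ranges = new ArrayList<>();
        long total = to - from + 1;
        long base = total / parallelism;       // minimum size of each range
        long remainder = total % parallelism;  // the first ranges get one extra element
        long start = from;
        for (int i = 0; i < parallelism && start <= to; i++) {
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new long[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (long[] range : split(1, 10, 3)) {
            System.out.println(range[0] + ".." + range[1]);
        }
    }
}
```

Each range is emitted in order by its reader, but elements from different ranges interleave arbitrarily, which is why a globally ordered sequence requires a parallelism of one.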
from
- The number to start at (inclusive)
to
- The number to stop at (inclusive)
@SafeVarargs @Deprecated public final <OUT> DataStreamSource<OUT> fromElements(OUT... data)
Deprecated. Use fromData(OUT...) instead.
The elements must all be of the same type, for example, all String or all Integer.
The framework will try to determine the exact type from the elements. In case of generic elements, it may be necessary to manually supply the type information via fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).
Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a degree of parallelism one.
OUT
- The type of the returned data stream
data
- The array of elements to create the data stream from.
@SafeVarargs @Deprecated public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data)
Deprecated. Use fromData(OUT...) instead.
OUT
- The type of the returned data stream
type
- The base class type of the elements in the collection.
data
- The array of elements to create the data stream from.
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data)
Deprecated. Use fromData(Collection) instead.
The framework will try to determine the exact type from the collection elements. In case of generic elements, it may be necessary to manually supply the type information via fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).
Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with parallelism one.
OUT
- The generic type of the returned data stream.
data
- The collection of elements to create the data stream from.
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo)
Deprecated. Use fromData(Collection, TypeInformation) instead.
Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with parallelism one.
OUT
- The type of the returned data stream
data
- The collection of elements to create the data stream from
typeInfo
- The TypeInformation for the produced data stream
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type)
Deprecated. Use fromData(Collection, TypeInformation) instead. For rate-limited data generation, use DataGeneratorSource with RateLimiterStrategy. If you need to use a fixed set of elements in such a scenario, combine it with FromElementsGeneratorFunction.
Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler erases the generic type information).
Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with a parallelism of one.
OUT
- The type of the returned data stream
data
- The iterator of elements to create the data stream from
type
- The class of the data produced by the iterator. Must not be a generic class.
fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo)
Deprecated. Use fromData(Collection, TypeInformation) instead. For rate-limited data generation, use DataGeneratorSource with RateLimiterStrategy. If you need to use a fixed set of elements in such a scenario, combine it with FromElementsGeneratorFunction.
Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type information. This method is useful for cases where the type is generic. In that case, the type class (as given in fromCollection(java.util.Iterator, Class)) does not supply all type information.
Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with parallelism one.
OUT
- The type of the returned data stream
data
- The iterator of elements to create the data stream from
typeInfo
- The TypeInformation for the produced data stream
public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type)
Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler erases the generic type information).
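The erasure mentioned here can be observed in plain Java. The sketch below is not Flink code: `ErasureSketch` and its `describe` helper are hypothetical and only show why an explicit Class argument has to accompany an iterator whose element type cannot be recovered at runtime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Demonstrates generic type erasure; hypothetical helper names, not Flink code. */
final class ErasureSketch {
    /** Both lists share one runtime class because their type arguments are erased. */
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> numbers = new ArrayList<>();
        return strings.getClass() == numbers.getClass();
    }

    /** The element class is passed explicitly, since the iterator alone cannot reveal it. */
    static <T> String describe(Iterator<T> iterator, Class<T> type) {
        return "elements of " + type.getName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        System.out.println(describe(Arrays.asList(1L, 2L).iterator(), Long.class));
    }
}
```

A Class object cannot describe a parameterized type such as a list of strings, which is the case the TypeInformation-based overload is meant for.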
OUT
- The type of the returned data stream
iterator
- The iterator that produces the elements of the data stream
type
- The class of the data produced by the iterator. Must not be a generic class.
public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo)
Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type information. This method is useful for cases where the type is generic. In that case, the type class (as given in fromParallelCollection(org.apache.flink.util.SplittableIterator, Class)) does not supply all type information.
OUT
- The type of the returned data stream
iterator
- The iterator that produces the elements of the data stream
typeInfo
- The TypeInformation for the produced data stream.
@Deprecated public DataStreamSource<String> readTextFile(String filePath)
Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:
FileSource<String> source =
FileSource.forRecordStreamFormat(
new TextLineInputFormat(), new Path("/foo/bar"))
.build();
NOTES ON CHECKPOINTING: The source monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, so no checkpoints are taken after that point.
filePath
- The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
@Deprecated public DataStreamSource<String> readTextFile(String filePath, String charsetName)
Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:
FileSource<String> source =
FileSource.forRecordStreamFormat(
new TextLineInputFormat("UTF-8"), new Path("/foo/bar"))
.build();
The Charset with the given name will be used to read the files.
NOTES ON CHECKPOINTING: The source monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, so no checkpoints are taken after that point.
filePath
- The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
charsetName
- The name of the character set used to read the file
@Deprecated public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath)
Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:
FileSource<String> source =
FileSource.forRecordStreamFormat(
new TextLineInputFormat(), new Path("/foo/bar"))
.build();
Reads the contents of the user-specified filePath based on the given FileInputFormat.
Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It will attempt to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface. In the latter case, this method will invoke the ResultTypeQueryable.getProducedType() method to determine the data type produced by the input format.
NOTES ON CHECKPOINTING: The source monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, so no checkpoints are taken after that point.
OUT
- The type of the returned data stream
filePath
- The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
inputFormat
- The input format used to create the data stream
@PublicEvolving @Deprecated public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter)
Deprecated. Use FileInputFormat.setFilesFilter(FilePathFilter) to set a filter and readFile(FileInputFormat, String, FileProcessingMode, long).
Reads the contents of the user-specified filePath based on the given FileInputFormat. The behavior depends on the provided FileProcessingMode; see readFile(FileInputFormat, String, FileProcessingMode, long).
OUT
- The type of the returned data stream
inputFormat
- The input format used to create the data stream
filePath
- The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
watchType
- The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
interval
- In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
filter
- The files to be excluded from the processing
@Deprecated @PublicEvolving public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval)
Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:
FileSource<String> source =
FileSource.forRecordStreamFormat(
new TextLineInputFormat(), new Path("/foo/bar"))
.monitorContinuously(Duration.of(10, SECONDS))
.build();
Reads the contents of the user-specified filePath based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). In addition, if the path contains files not to be processed, the user can specify a custom FilePathFilter. As a default implementation you can use FilePathFilter.createDefaultFilter().
Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It will attempt to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface. In the latter case, this method will invoke the ResultTypeQueryable.getProducedType() method to determine the data type produced by the input format.
NOTES ON CHECKPOINTING: If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source monitors the path once, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, so no checkpoints are taken after that point.
OUT
- The type of the returned data stream
inputFormat
- The input format used to create the data stream
filePath
- The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
watchType
- The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
interval
- In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
@Deprecated public DataStream<String> readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
Deprecated. Use readFile(FileInputFormat, String, FileProcessingMode, long) instead.
filePath
- The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path/")
intervalMillis
- The interval of file watching in milliseconds
watchType
- The watch type of file stream. When watchType is FileMonitoringFunction.WatchType.ONLY_NEW_FILES, the system processes only new files. FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED means that the system re-processes all contents of an appended file. FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED means that the system processes only the appended contents of files.
@Deprecated @PublicEvolving public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<OUT> typeInformation)
Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:
FileSource<String> source =
FileSource.forRecordStreamFormat(
new TextLineInputFormat(), new Path("/foo/bar"))
.monitorContinuously(Duration.of(10, SECONDS))
.build();
Reads the contents of the user-specified filePath based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). In addition, if the path contains files not to be processed, the user can specify a custom FilePathFilter. As a default implementation you can use FilePathFilter.createDefaultFilter().
NOTES ON CHECKPOINTING: If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source monitors the path once, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, so no checkpoints are taken after that point.
OUT
- The type of the returned data stream
inputFormat
- The input format used to create the data stream
filePath
- The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
watchType
- The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
typeInformation
- Information on the type of the elements in the output stream
interval
- In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
@Deprecated public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
Deprecated. Use socketTextStream(String, int, String, long) instead.
Note that the socket itself does not report on abort; as a consequence, retries are only initiated when the socket was gracefully terminated.
hostname
- The host name to which a server socket binds
port
- The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
delimiter
- A character which splits received strings into records
maxRetry
- The maximal retry interval in seconds while the program waits for a socket that is temporarily down. Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated, while a negative value ensures retrying forever.
@PublicEvolving public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry)
Note that the socket itself does not report on abort; as a consequence, retries are only initiated when the socket was gracefully terminated.
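The documented meaning of the maxRetry parameter (one reconnection attempt per second, 0 terminates immediately, a negative value retries forever) can be summarized as a small decision function. This is a sketch of the documented contract only, assuming that one attempt corresponds to one second of waiting; `SocketRetrySketch` and `shouldRetry` are hypothetical names, not part of Flink.

```java
/** Models the documented maxRetry contract; hypothetical, not Flink code. */
final class SocketRetrySketch {
    /**
     * Decides whether another reconnection attempt should be made.
     * A negative maxRetry retries forever, 0 never retries, and a positive value
     * allows retries while fewer than maxRetry attempts (one per second) were made.
     */
    static boolean shouldRetry(long maxRetry, long attemptsSoFar) {
        if (maxRetry < 0) {
            return true;
        }
        return attemptsSoFar < maxRetry;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(-1, 1_000)); // negative: keep retrying
        System.out.println(shouldRetry(0, 0));      // zero: terminate immediately
        System.out.println(shouldRetry(3, 2));      // still within the retry budget
        System.out.println(shouldRetry(3, 3));      // budget exhausted
    }
}
```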
hostname
- The host name to which a server socket binds
port
- The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
delimiter
- A string which splits received strings into records
maxRetry
- The maximal retry interval in seconds while the program waits for a socket that is temporarily down. Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated, while a negative value ensures retrying forever.
@Deprecated public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter)
Deprecated. Use socketTextStream(String, int, String) instead.
hostname
- The host name to which a server socket binds
port
- The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
delimiter
- A character which splits received strings into records
@PublicEvolving public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter)
hostname
- The host name to which a server socket binds
port
- The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
delimiter
- A string which splits received strings into records
@PublicEvolving public DataStreamSource<String> socketTextStream(String hostname, int port)
hostname
- The host name to which a server socket binds
port
- The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
@PublicEvolving public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT,?> inputFormat)
Generic method to create an input data stream with InputFormat.
Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It will attempt to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface. In the latter case, this method will invoke the ResultTypeQueryable.getProducedType() method to determine the data type produced by the input format.
NOTES ON CHECKPOINTING: In the case of a FileInputFormat, the source (which executes the ContinuousFileMonitoringFunction) monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, so no checkpoints are taken.
OUT
- The type of the returned data stream
inputFormat
- The input format used to create the data stream
@PublicEvolving public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT,?> inputFormat, TypeInformation<OUT> typeInfo)
Generic method to create an input data stream with InputFormat.
The data stream is typed to the given TypeInformation. This method is intended for input formats where the return type cannot be determined by reflection analysis, and that do not implement the ResultTypeQueryable interface.
NOTES ON CHECKPOINTING: In the case of a FileInputFormat, the source (which executes the ContinuousFileMonitoringFunction) monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, so no checkpoints are taken.
OUT
- The type of the returned data stream
inputFormat
- The input format used to create the data stream
typeInfo
- The information about the type of the output type
@Deprecated public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function)
SourceFunction
API, which is due to be
removed. Use the fromSource(Source, WatermarkStrategy, String)
method based on
the new Source
API instead.
By default, sources have a parallelism of 1. To enable parallel execution, the user-defined source should implement ParallelSourceFunction or extend RichParallelSourceFunction. In these cases the resulting source will have the parallelism of the environment. To change this afterwards, call DataStreamSource.setParallelism(int).
OUT
- type of the returned stream
function
- the user defined function
@Deprecated public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName)
SourceFunction
API, which is due to be
removed. Use the fromSource(Source, WatermarkStrategy, String)
method based on
the new Source
API instead.
DataStream
. Only in very special cases does the user need to supply type information. Otherwise use addSource(org.apache.flink.streaming.api.functions.source.SourceFunction).
OUT
- type of the returned stream
function
- the user defined function
sourceName
- Name of the data source
@Deprecated public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo)
SourceFunction
API, which is due to be
removed. Use the fromSource(Source, WatermarkStrategy, String, TypeInformation)
method based on the new Source
API instead.
DataStream
. Only in very special cases does the user need to supply type information. Otherwise use addSource(org.apache.flink.streaming.api.functions.source.SourceFunction).
OUT
- type of the returned stream
function
- the user defined function
typeInfo
- the user defined type information for the stream
@Deprecated public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo)
SourceFunction
API, which is due to be
removed. Use the fromSource(Source, WatermarkStrategy, String, TypeInformation)
method based on the new Source
API instead.
DataStream
. Only in very special cases does the user need to supply type information. Otherwise use addSource(org.apache.flink.streaming.api.functions.source.SourceFunction).
OUT
- type of the returned stream
function
- the user defined function
sourceName
- Name of the data source
typeInfo
- the user defined type information for the stream
@PublicEvolving public <OUT> DataStreamSource<OUT> fromSource(Source<OUT,?,?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName)
Source
to the environment to get a DataStream
.
The result will be either a bounded data stream (that can be processed in a batch way) or
an unbounded data stream (that must be processed in a streaming way), based on the
boundedness property of the source, as defined by Source.getBoundedness()
.
The result type (that is used to create serializers for the produced data events) will be automatically extracted. This is useful for sources that already describe the produced types in their configuration, to avoid having to declare the type multiple times. For example, the file sources and Kafka sources already define the produced type through their parsers/serializers/formats, and can forward that information.
OUT
- type of the returned stream
source
- the user defined source
sourceName
- Name of the data source
@Experimental public <OUT> DataStreamSource<OUT> fromSource(Source<OUT,?,?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName, TypeInformation<OUT> typeInfo)
Source
to the environment to get a DataStream
.
The result will be either a bounded data stream (that can be processed in a batch way) or
an unbounded data stream (that must be processed in a streaming way), based on the
boundedness property of the source, as defined by Source.getBoundedness()
.
This method takes explicit type information for the produced data stream, so that callers can define directly what type/serializer will be used for the produced stream. For sources that describe their produced type, the method fromSource(Source, WatermarkStrategy, String) can be used to avoid specifying the produced type redundantly.
OUT
- type of the returned stream
source
- the user defined source
sourceName
- Name of the data source
typeInfo
- the user defined type information for the stream
public JobExecutionResult execute() throws Exception
The program execution will be logged and displayed with a generated default name.
Exception
- which occurs during job execution.
public JobExecutionResult execute(String jobName) throws Exception
The program execution will be logged and displayed with the provided name.
jobName
- Desired name of the job
Exception
- which occurs during job execution.
@Internal public JobExecutionResult execute(StreamGraph streamGraph) throws Exception
streamGraph
- the stream graph representing the transformations
Exception
- which occurs during job execution.
@PublicEvolving public void registerJobListener(JobListener jobListener)
JobListener
in this environment. The JobListener will be notified on specific job status changes.
@PublicEvolving public void clearJobListeners()
JobListeners.
@PublicEvolving public final JobClient executeAsync() throws Exception
The program execution will be logged and displayed with a generated default name.
@PublicEvolving public JobClient executeAsync(String jobName) throws Exception
The program execution will be logged and displayed with the provided name.
@Internal public JobClient executeAsync(StreamGraph streamGraph) throws Exception
@Internal public StreamGraph getStreamGraph()
StreamGraph
of the streaming job. This call clears previously registered transformations.
@Internal public StreamGraph getStreamGraph(boolean clearTransformations)
StreamGraph
of the streaming job with the option to clear previously registered transformations. Clearing the transformations prevents, for example, the same operations from being re-executed when execute() is called multiple times.
clearTransformations
- Whether or not to clear previously registered transformations
@Internal public StreamGraph generateStreamGraph(List<Transformation<?>> transformations)
StreamGraph
that consists of the given transformations
and is configured with the configuration of this environment.
This method does not access or clear the previously registered transformations.
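The difference between the three graph-building methods described above can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: TransformationBufferSketch, registeredCount() and the String placeholders are hypothetical stand-ins, not Flink classes, and the model captures nothing beyond the clearing behaviour the text describes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model; it does not use or reproduce any Flink class.
class TransformationBufferSketch {
    // Stands in for the environment's list of registered transformations.
    private final List<String> transformations = new ArrayList<>();

    void addOperator(String transformation) {
        transformations.add(transformation);
    }

    // Models getStreamGraph(boolean): build from the registered
    // transformations and optionally clear them afterwards.
    List<String> getStreamGraph(boolean clearTransformations) {
        List<String> graph = new ArrayList<>(transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    // Models getStreamGraph(): always clears the registered transformations.
    List<String> getStreamGraph() {
        return getStreamGraph(true);
    }

    // Models generateStreamGraph(List): uses only the given list and leaves
    // the registered transformations untouched.
    List<String> generateStreamGraph(List<String> given) {
        return new ArrayList<>(given);
    }

    int registeredCount() {
        return transformations.size();
    }

    public static void main(String[] args) {
        TransformationBufferSketch env = new TransformationBufferSketch();
        env.addOperator("source");
        env.addOperator("map");

        env.generateStreamGraph(List.of("other"));
        System.out.println(env.registeredCount()); // still 2

        env.getStreamGraph(false);
        System.out.println(env.registeredCount()); // still 2

        env.getStreamGraph();
        System.out.println(env.registeredCount()); // now 0
    }
}
```

In this model, only the clearing variants change what a later graph build would see, which is the property that matters when execute() is called more than once.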
transformations
- list of transformations that the graph should contain
public String getExecutionPlan()
@Internal public <F> F clean(F f)
ExecutionConfig
@Internal public void addOperator(Transformation<?> transformation)
@Internal public ReadableConfig getConfiguration()
Note that the returned configuration might not be complete. It only contains options that
have initialized the environment via StreamExecutionEnvironment(Configuration)
or
options that are not represented in dedicated configuration classes such as ExecutionConfig
or CheckpointConfig
.
Use configure(ReadableConfig, ClassLoader)
to set options that are specific to
this environment.
public static StreamExecutionEnvironment getExecutionEnvironment()
createLocalEnvironment().
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration)
createLocalEnvironment(Configuration)
.
When executed from the command line the given configuration is stacked on top of the
global configuration which comes from the config.yaml
, potentially overriding
duplicated options.
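The stacking rule described above can be modelled in plain Java as a map overlay. This is a hedged sketch, not Flink's implementation: ConfigStackingSketch and stack are hypothetical names, the configurations are reduced to string maps, and the option values are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the stacking rule; not Flink's Configuration class.
class ConfigStackingSketch {
    // Start from the global options, then let the given options win on duplicates.
    static Map<String, String> stack(Map<String, String> global, Map<String, String> given) {
        Map<String, String> effective = new HashMap<>(global);
        effective.putAll(given);
        return effective;
    }

    public static void main(String[] args) {
        // Keys and values below are illustrative only.
        Map<String, String> global = Map.of("parallelism.default", "1", "rest.port", "8081");
        Map<String, String> given = Map.of("parallelism.default", "4");

        Map<String, String> effective = stack(global, given);
        System.out.println(effective.get("parallelism.default")); // 4 (overridden)
        System.out.println(effective.get("rest.port"));           // 8081 (inherited)
    }
}
```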
configuration
- The configuration to instantiate the environment with.
public static LocalStreamEnvironment createLocalEnvironment()
LocalStreamEnvironment
. The local execution environment will run the
program in a multi-threaded fashion in the same JVM as the environment was created in. The
default parallelism of the local environment is the number of hardware contexts (CPU cores /
threads), unless it was specified differently by setParallelism(int).
public static LocalStreamEnvironment createLocalEnvironment(int parallelism)
LocalStreamEnvironment
. The local execution environment will run the
program in a multi-threaded fashion in the same JVM as the environment was created in. It
will use the parallelism specified in the parameter.
parallelism
- The parallelism for the local environment.
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration)
LocalStreamEnvironment
. The local execution environment will run the
program in a multi-threaded fashion in the same JVM as the environment was created in. It
will use the parallelism specified in the parameter.
parallelism
- The parallelism for the local environment.
configuration
- Pass a custom configuration into the cluster
public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration)
LocalStreamEnvironment
. The local execution environment will run the
program in a multi-threaded fashion in the same JVM as the environment was created in.
configuration
- Pass a custom configuration into the cluster
@PublicEvolving public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf)
LocalStreamEnvironment
for local program execution that also starts the web
monitoring UI.
The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. It will use the parallelism specified in the parameter.
If the configuration key 'rest.port' was set in the configuration, that particular port will be used for the web UI. Otherwise, the default port (8081) will be used.
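The port selection rule above amounts to a lookup with a fallback. The following plain-Java sketch models only that rule. WebUiPortSketch and resolveRestPort are hypothetical names, the configuration is reduced to a string map, and nothing here calls Flink.

```java
import java.util.Map;

// Hypothetical plain-Java model of the port rule; not Flink code.
class WebUiPortSketch {
    static final int DEFAULT_REST_PORT = 8081; // default named in the text

    // Use 'rest.port' when it is set, otherwise fall back to the default.
    static int resolveRestPort(Map<String, String> configuration) {
        String configured = configuration.get("rest.port");
        return configured == null ? DEFAULT_REST_PORT : Integer.parseInt(configured);
    }

    public static void main(String[] args) {
        System.out.println(resolveRestPort(Map.of()));                    // 8081
        System.out.println(resolveRestPort(Map.of("rest.port", "9091"))); // 9091
    }
}
```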
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles)
RemoteStreamEnvironment
. The remote environment sends (parts of) the
program to a cluster for execution. Note that all file paths used in the program must be
accessible from the cluster. The execution will use no parallelism, unless the parallelism is
set explicitly via setParallelism(int)
.
host
- The host name or address of the master (JobManager), where the program should be executed.
port
- The port of the master (JobManager), where the program should be executed.
jarFiles
- The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
RemoteStreamEnvironment
. The remote environment sends (parts of) the
program to a cluster for execution. Note that all file paths used in the program must be
accessible from the cluster. The execution will use the specified parallelism.
host
- The host name or address of the master (JobManager), where the program should be executed.
port
- The port of the master (JobManager), where the program should be executed.
parallelism
- The parallelism to use during the execution.
jarFiles
- The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles)
RemoteStreamEnvironment
. The remote environment sends (parts of) the
program to a cluster for execution. Note that all file paths used in the program must be
accessible from the cluster. The execution will use the specified parallelism.
host
- The host name or address of the master (JobManager), where the program should be executed.
port
- The port of the master (JobManager), where the program should be executed.
clientConfig
- The configuration used by the client that connects to the remote cluster.
jarFiles
- The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
@PublicEvolving public static int getDefaultLocalParallelism()
createLocalEnvironment().
@PublicEvolving public static void setDefaultLocalParallelism(int parallelism)
createLocalEnvironment().
parallelism
- The parallelism to use as the default local parallelism.
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx)
protected static void resetContextEnvironment()
public void registerCachedFile(String filePath, String name)
The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().
filePath
- The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
name
- The name under which the file is registered.
public void registerCachedFile(String filePath, String name, boolean executable)
The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().
filePath
- The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
name
- The name under which the file is registered.
executable
- flag indicating whether the file should be executable
@Internal public static boolean areExplicitEnvironmentsAllowed()
@Internal public List<Transformation<?>> getTransformations()
@Internal public <T> void registerCacheTransformation(AbstractID intermediateDataSetID, CacheTransformation<T> t)
@Internal public void invalidateClusterDataset(AbstractID datasetId) throws Exception
Exception
protected Set<AbstractID> listCompletedClusterDatasets()
public void close() throws Exception
close
in interface AutoCloseable
Exception
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.