Class StreamExecutionEnvironment
- java.lang.Object
  - org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
- All Implemented Interfaces:
  AutoCloseable
- Direct Known Subclasses:
  DummyStreamExecutionEnvironment, LocalStreamEnvironment, RemoteStreamEnvironment, SqlGatewayStreamExecutionEnvironment, StreamContextEnvironment, StreamPlanEnvironment, TestStreamEnvironment

@Public
public class StreamExecutionEnvironment extends Object implements AutoCloseable

The StreamExecutionEnvironment is the context in which a streaming program is executed. A LocalStreamEnvironment will cause execution in the current JVM; a RemoteStreamEnvironment will cause execution on a remote setup. The environment provides methods to control the job execution (such as setting the parallelism or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
- See Also:
  LocalStreamEnvironment, RemoteStreamEnvironment
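For orientation, a minimal usage sketch (assuming the standard DataStream API dependency is on the classpath; the job name is illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class Example {
        public static void main(String[] args) throws Exception {
            // Obtain the environment matching the current execution context
            // (local JVM when run from an IDE, cluster when submitted).
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Build a trivial pipeline and run it.
            env.fromData("a", "b", "c").print();
            env.execute("example-job");
        }
    }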
-
-
Field Summary
- protected List<Tuple2<String,DistributedCache.DistributedCacheEntry>> cacheFile
  This field cannot currently be migrated to the configuration.
- protected CheckpointConfig checkpointCfg
  Settings that control the checkpointing behavior.
- protected ExecutionConfig config
  The execution configuration for this environment.
- protected Configuration configuration
  Currently, configuration is split across multiple member variables and classes such as ExecutionConfig or CheckpointConfig.
- protected List<Transformation<?>> transformations
-
Constructor Summary
- StreamExecutionEnvironment()
- StreamExecutionEnvironment(Configuration configuration)
  Creates a new StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor.
- StreamExecutionEnvironment(Configuration configuration, ClassLoader userClassloader)
  Creates a new StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor.
- StreamExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, ClassLoader userClassloader)
  Creates a new StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor.
-
Method Summary
- void addOperator(Transformation<?> transformation)
  Adds an operator to the list of operators that should be executed when calling execute().
- <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function)
  Deprecated. This method relies on the SourceFunction API, which is due to be removed.
- <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName)
  Deprecated. This method relies on the SourceFunction API, which is due to be removed.
- <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo)
  Deprecated. This method relies on the SourceFunction API, which is due to be removed.
- <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo)
  Deprecated. This method relies on the SourceFunction API, which is due to be removed.
- static boolean areExplicitEnvironmentsAllowed()
  Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment or a RemoteEnvironment.
- <F> F clean(F f)
  Returns a "closure-cleaned" version of the given function.
- void clearJobListeners()
  Clears all registered JobListeners.
- void close()
  Closes and cleans up the execution environment.
- void configure(ReadableConfig configuration)
  Sets all relevant options contained in the ReadableConfig.
- void configure(ReadableConfig configuration, ClassLoader classLoader)
  Sets all relevant options contained in the ReadableConfig.
- <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT,?> inputFormat)
  Generic method to create an input data stream with an InputFormat.
- <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT,?> inputFormat, TypeInformation<OUT> typeInfo)
  Generic method to create an input data stream with an InputFormat.
- static LocalStreamEnvironment createLocalEnvironment()
  Creates a LocalStreamEnvironment.
- static LocalStreamEnvironment createLocalEnvironment(int parallelism)
  Creates a LocalStreamEnvironment.
- static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration)
  Creates a LocalStreamEnvironment.
- static LocalStreamEnvironment createLocalEnvironment(Configuration configuration)
  Creates a LocalStreamEnvironment.
- static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf)
  Creates a LocalStreamEnvironment for local program execution that also starts the web monitoring UI.
- static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
  Creates a RemoteStreamEnvironment.
- static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles)
  Creates a RemoteStreamEnvironment.
- static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles)
  Creates a RemoteStreamEnvironment.
- StreamExecutionEnvironment disableOperatorChaining()
  Disables operator chaining for streaming operators.
- StreamExecutionEnvironment enableChangelogStateBackend(boolean enabled)
  Enables the changelog for the current state backend.
- StreamExecutionEnvironment enableCheckpointing(long interval)
  Enables checkpointing for the streaming job.
- StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
  Enables checkpointing for the streaming job.
- StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
  Deprecated. Use the enableCheckpointing(long, CheckpointingMode) overload that takes the non-deprecated CheckpointingMode instead.
- JobExecutionResult execute()
  Triggers the program execution.
- JobExecutionResult execute(String jobName)
  Triggers the program execution.
- JobExecutionResult execute(StreamGraph streamGraph)
  Triggers the program execution.
- JobClient executeAsync()
  Triggers the program execution asynchronously.
- JobClient executeAsync(String jobName)
  Triggers the program execution asynchronously.
- JobClient executeAsync(StreamGraph streamGraph)
  Triggers the program execution asynchronously.
- <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data)
  Deprecated. This method will be removed in a future release, possibly as early as version 2.0.
- <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo)
  Deprecated. This method will be removed in a future release, possibly as early as version 2.0.
- <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type)
  Deprecated. This method will be removed in a future release, possibly as early as version 2.0.
- <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo)
  Deprecated. This method will be removed in a future release, possibly as early as version 2.0.
- <OUT> DataStreamSource<OUT> fromData(Class<OUT> type, OUT... data)
  Creates a new data stream that contains the given elements.
- <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data)
  Creates a new data stream that contains the given elements. The type of the data stream is that of the elements in the collection.
- <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data, TypeInformation<OUT> typeInfo)
  Creates a new data stream that contains the given elements.
- <OUT> DataStreamSource<OUT> fromData(TypeInformation<OUT> typeInfo, OUT... data)
  Creates a new data stream that contains the given elements.
- <OUT> DataStreamSource<OUT> fromData(OUT... data)
  Creates a new data stream that contains the given elements.
- <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data)
  Deprecated. This method will be removed in a future release, possibly as early as version 2.0.
- <OUT> DataStreamSource<OUT> fromElements(OUT... data)
  Deprecated. This method will be removed in a future release, possibly as early as version 2.0.
- <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type)
  Creates a new data stream that contains the elements in the iterator.
- <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo)
  Creates a new data stream that contains the elements in the iterator.
- DataStreamSource<Long> fromSequence(long from, long to)
  Creates a new data stream that contains a sequence of numbers (longs); useful for testing and for cases that just need a stream of N events of any kind.
- <OUT> DataStreamSource<OUT> fromSource(Source<OUT,?,?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName)
  Adds a data Source to the environment to get a DataStream.
- <OUT> DataStreamSource<OUT> fromSource(Source<OUT,?,?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName, TypeInformation<OUT> typeInfo)
  Adds a data Source to the environment to get a DataStream.
- DataStreamSource<Long> generateSequence(long from, long to)
  Deprecated. Use fromSequence(long, long) instead to create a new data stream backed by the NumberSequenceSource.
- StreamGraph generateStreamGraph(List<Transformation<?>> transformations)
  Generates a StreamGraph that consists of the given transformations and is configured with the configuration of this environment.
- long getBufferTimeout()
  Gets the maximum time frequency (milliseconds) for the flushing of the output buffers.
- List<Tuple2<String,DistributedCache.DistributedCacheEntry>> getCachedFiles()
  Gets the list of cached files that were registered for distribution among the task managers.
- CheckpointConfig getCheckpointConfig()
  Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc.
- CheckpointingMode getCheckpointingConsistencyMode()
  Returns the checkpointing consistency mode (exactly-once vs. at-least-once).
- CheckpointingMode getCheckpointingMode()
  Deprecated. Use getCheckpointingConsistencyMode() instead.
- long getCheckpointInterval()
  Returns the checkpointing interval, or -1 if checkpointing is disabled.
- ExecutionConfig getConfig()
  Gets the config object.
- ReadableConfig getConfiguration()
  Gives read-only access to the underlying configuration of this environment.
- static int getDefaultLocalParallelism()
  Gets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().
- Path getDefaultSavepointDirectory()
  Gets the default savepoint directory for this job.
- static StreamExecutionEnvironment getExecutionEnvironment()
  Creates an execution environment that represents the context in which the program is currently executed.
- static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration)
  Creates an execution environment that represents the context in which the program is currently executed.
- String getExecutionPlan()
  Creates the plan with which the system will execute the program, and returns it as a String using a JSON representation of the execution data flow graph.
- List<JobListener> getJobListeners()
  Gets the registered JobListeners.
- int getMaxParallelism()
  Gets the maximum degree of parallelism defined for the program.
- int getParallelism()
  Gets the parallelism with which operations are executed by default.
- StreamGraph getStreamGraph()
  Getter of the StreamGraph of the streaming job.
- StreamGraph getStreamGraph(boolean clearTransformations)
  Getter of the StreamGraph of the streaming job, with the option to clear previously registered transformations.
- List<Transformation<?>> getTransformations()
- protected ClassLoader getUserClassloader()
- protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx)
- void invalidateClusterDataset(AbstractID datasetId)
- boolean isChainingEnabled()
  Returns whether operator chaining is enabled.
- boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled()
- TernaryBoolean isChangelogStateBackendEnabled()
  Gets the enable status of the changelog for the state backend.
- boolean isForceUnalignedCheckpoints()
  Returns whether unaligned checkpoints are force-enabled.
- boolean isUnalignedCheckpointsEnabled()
  Returns whether unaligned checkpoints are enabled.
- protected Set<AbstractID> listCompletedClusterDatasets()
- <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath)
  Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead.
- <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval)
  Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead.
- <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter)
  Deprecated.
- <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<OUT> typeInformation)
  Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead.
- DataStream<String> readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
  Deprecated.
- void registerCachedFile(String filePath, String name)
  Registers a file at the distributed cache under the given name.
- void registerCachedFile(String filePath, String name, boolean executable)
  Registers a file at the distributed cache under the given name.
- <T> void registerCacheTransformation(AbstractID intermediateDataSetID, CacheTransformation<T> t)
- void registerCollectIterator(CollectResultIterator<?> iterator)
- void registerJobListener(JobListener jobListener)
  Registers a JobListener in this environment.
- StreamExecutionEnvironment registerSlotSharingGroup(SlotSharingGroup slotSharingGroup)
  Registers a slot sharing group with its resource spec.
- protected static void resetContextEnvironment()
- StreamExecutionEnvironment setBufferTimeout(long timeoutMillis)
  Sets the maximum time frequency (milliseconds) for the flushing of the output buffers.
- static void setDefaultLocalParallelism(int parallelism)
  Sets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().
- StreamExecutionEnvironment setDefaultSavepointDirectory(String savepointDirectory)
  Sets the default savepoint directory, where savepoints will be written to if none is explicitly provided when triggered.
- StreamExecutionEnvironment setDefaultSavepointDirectory(URI savepointDirectory)
  Sets the default savepoint directory, where savepoints will be written to if none is explicitly provided when triggered.
- StreamExecutionEnvironment setDefaultSavepointDirectory(Path savepointDirectory)
  Sets the default savepoint directory, where savepoints will be written to if none is explicitly provided when triggered.
- StreamExecutionEnvironment setMaxParallelism(int maxParallelism)
  Sets the maximum degree of parallelism defined for the program.
- StreamExecutionEnvironment setParallelism(int parallelism)
  Sets the parallelism for operations executed through this environment.
- StreamExecutionEnvironment setRuntimeMode(RuntimeExecutionMode executionMode)
  Sets the runtime execution mode for the application (see RuntimeExecutionMode).
- DataStreamSource<String> socketTextStream(String hostname, int port)
  Creates a new data stream that contains the strings received infinitely from a socket.
- DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter)
  Deprecated. Use socketTextStream(String, int, String) instead.
- DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
  Deprecated. Use socketTextStream(String, int, String, long) instead.
- DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter)
  Creates a new data stream that contains the strings received infinitely from a socket.
- DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry)
  Creates a new data stream that contains the strings received infinitely from a socket.
-
-
-
Field Detail
-
config
protected final ExecutionConfig config
The execution configuration for this environment.
-
checkpointCfg
protected final CheckpointConfig checkpointCfg
Settings that control the checkpointing behavior.
-
transformations
protected final List<Transformation<?>> transformations
-
cacheFile
protected final List<Tuple2<String,DistributedCache.DistributedCacheEntry>> cacheFile
This field cannot currently be migrated to the configuration, because it remains directly accessible and modifiable: it is exposed to users through a getter, allowing external modifications.
-
configuration
protected final Configuration configuration
Currently, configuration is split across multiple member variables and classes such as ExecutionConfig or CheckpointConfig. This architecture makes it quite difficult to handle/merge/enrich configuration or restrict access in other APIs.

In the long term, this Configuration object should be the source of truth for newly added ConfigOptions that are relevant for the DataStream API. Make sure to also update configure(ReadableConfig, ClassLoader).
-
-
Constructor Detail
-
StreamExecutionEnvironment
public StreamExecutionEnvironment()
-
StreamExecutionEnvironment
@PublicEvolving public StreamExecutionEnvironment(Configuration configuration)
Creates a new StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor.
-
StreamExecutionEnvironment
@PublicEvolving public StreamExecutionEnvironment(Configuration configuration, ClassLoader userClassloader)
Creates a new StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor.

In addition, this constructor allows specifying the user code ClassLoader.
-
StreamExecutionEnvironment
@PublicEvolving public StreamExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, ClassLoader userClassloader)
Creates a new StreamExecutionEnvironment that will use the given Configuration to configure the PipelineExecutor.

In addition, this constructor allows specifying the PipelineExecutorServiceLoader and user code ClassLoader.
-
-
Method Detail
-
registerCollectIterator
@Internal public void registerCollectIterator(CollectResultIterator<?> iterator)
-
getUserClassloader
protected ClassLoader getUserClassloader()
-
getConfig
public ExecutionConfig getConfig()
Gets the config object.
-
getCachedFiles
public List<Tuple2<String,DistributedCache.DistributedCacheEntry>> getCachedFiles()
Get the list of cached files that were registered for distribution among the task managers.
-
getJobListeners
@PublicEvolving public List<JobListener> getJobListeners()
Gets the registered JobListeners.
-
setParallelism
public StreamExecutionEnvironment setParallelism(int parallelism)
Sets the parallelism for operations executed through this environment. Setting a parallelism of x here will cause all operators (such as map, batchReduce) to run with x parallel instances. This method overrides the default parallelism for this environment. The LocalStreamEnvironment uses by default a value equal to the number of hardware contexts (CPU cores / threads). When executing the program via the command line client from a JAR file, the default degree of parallelism is the one configured for that setup.
- Parameters:
  parallelism - The parallelism
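A minimal sketch, given an env obtained from getExecutionEnvironment() (the parallelism values are illustrative):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);                 // operators default to 4 parallel subtasks
    env.fromData("a", "b", "c")
       .print()
       .setParallelism(1);                 // individual operators can still override this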
-
setRuntimeMode
@PublicEvolving public StreamExecutionEnvironment setRuntimeMode(RuntimeExecutionMode executionMode)
Sets the runtime execution mode for the application (see RuntimeExecutionMode). This is equivalent to setting the execution.runtime-mode in your application's configuration file.

We recommend users NOT use this method but instead set the execution.runtime-mode via the command line when submitting the application. Keeping the application code configuration-free allows for more flexibility, as the same application can then be executed in any execution mode.
- Parameters:
  executionMode - the desired execution mode.
- Returns:
  The execution environment of your application.
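A minimal sketch of programmatic use (the command-line route above remains the recommended option):

    import org.apache.flink.api.common.RuntimeExecutionMode;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH); // or STREAMING / AUTOMATIC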
-
setMaxParallelism
public StreamExecutionEnvironment setMaxParallelism(int maxParallelism)
Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive) is Short.MAX_VALUE + 1.

The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
- Parameters:
  maxParallelism - Maximum degree of parallelism to be used for the program, with 0 < maxParallelism <= 2^15.
-
registerSlotSharingGroup
@PublicEvolving public StreamExecutionEnvironment registerSlotSharingGroup(SlotSharingGroup slotSharingGroup)
Registers a slot sharing group with its resource spec.

Note that a slot sharing group hints to the scheduler that the grouped operators CAN be deployed into a shared slot. There is no guarantee that the scheduler always deploys the grouped operators together. In cases where the grouped operators are deployed into separate slots, the slot resources will be derived from the specified group requirements.
- Parameters:
  slotSharingGroup - which contains the name and its resource spec.
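A sketch using the SlotSharingGroup builder from org.apache.flink.api.common.operators, given an existing env; the group name "ssg-1" and the resource numbers are illustrative:

    import org.apache.flink.api.common.operators.SlotSharingGroup;

    SlotSharingGroup ssg = SlotSharingGroup.newBuilder("ssg-1")   // illustrative name
            .setCpuCores(1.0)
            .setTaskHeapMemoryMB(512)
            .build();
    env.registerSlotSharingGroup(ssg);

    // Operators opt into the group by name.
    env.fromData(1, 2, 3).print().slotSharingGroup("ssg-1");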
-
getParallelism
public int getParallelism()
Gets the parallelism with which operations are executed by default. Operations can individually override this value to use a specific parallelism.
- Returns:
  The parallelism used by operations, unless they override that value.
-
getMaxParallelism
public int getMaxParallelism()
Gets the maximum degree of parallelism defined for the program.

The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
- Returns:
  Maximum degree of parallelism
-
setBufferTimeout
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis)
Sets the maximum time frequency (milliseconds) for the flushing of the output buffers. By default the output buffers flush frequently to provide low latency and to aid a smooth developer experience. Setting the parameter can result in three logical modes:
- A positive integer triggers flushing periodically at that interval
- 0 triggers flushing after every record, thus minimizing latency
- -1 triggers flushing only when the output buffer is full, thus maximizing throughput
- Parameters:
  timeoutMillis - The maximum time between two output flushes.
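The three modes side by side, given an existing env (the 100 ms value is illustrative):

    env.setBufferTimeout(100); // flush at least every 100 ms (latency/throughput trade-off)
    env.setBufferTimeout(0);   // flush after every record: minimal latency
    env.setBufferTimeout(-1);  // flush only on full buffers: maximal throughput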
-
getBufferTimeout
public long getBufferTimeout()
Gets the maximum time frequency (milliseconds) for the flushing of the output buffers. For clarification on the extremal values see setBufferTimeout(long).
- Returns:
  The timeout of the buffer.
-
disableOperatorChaining
@PublicEvolving public StreamExecutionEnvironment disableOperatorChaining()
Disables operator chaining for streaming operators. Operator chaining allows non-shuffle operations to be co-located in the same thread, fully avoiding serialization and de-serialization.
- Returns:
  StreamExecutionEnvironment with chaining disabled.
-
isChainingEnabled
@PublicEvolving public boolean isChainingEnabled()
Returns whether operator chaining is enabled.
- Returns:
  true if chaining is enabled, false otherwise.
-
isChainingOfOperatorsWithDifferentMaxParallelismEnabled
@PublicEvolving public boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled()
-
getCheckpointConfig
public CheckpointConfig getCheckpointConfig()
Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc.
- Returns:
  The checkpoint config.
-
enableCheckpointing
public StreamExecutionEnvironment enableCheckpointing(long interval)
Enables checkpointing for the streaming job. The distributed state of the streaming dataflow will be periodically snapshotted. In case of a failure, the streaming dataflow will be restarted from the latest completed checkpoint. This method selects CheckpointingMode.EXACTLY_ONCE guarantees.

The job draws checkpoints periodically, in the given interval. The state will be stored in the configured state backend.

NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
- Parameters:
  interval - Time interval between state checkpoints in milliseconds.
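A minimal sketch, given an existing env (the interval and the CheckpointConfig tweak are illustrative):

    env.enableCheckpointing(10_000);  // exactly-once checkpoints every 10 seconds
    // Further tuning goes through the CheckpointConfig.
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);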
-
enableCheckpointing
@Deprecated public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
Deprecated. Use the enableCheckpointing(long, CheckpointingMode) overload that takes the non-deprecated CheckpointingMode instead.

Enables checkpointing for the streaming job. The distributed state of the streaming dataflow will be periodically snapshotted. In case of a failure, the streaming dataflow will be restarted from the latest completed checkpoint.

The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs "at least once"). The state will be stored in the configured state backend.

NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
- Parameters:
  interval - Time interval between state checkpoints in milliseconds.
  mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
-
enableCheckpointing
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
Enables checkpointing for the streaming job. The distributed state of the streaming dataflow will be periodically snapshotted. In case of a failure, the streaming dataflow will be restarted from the latest completed checkpoint.

The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs "at least once"). The state will be stored in the configured state backend.

NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
- Parameters:
  interval - Time interval between state checkpoints in milliseconds.
  mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
-
getCheckpointInterval
public long getCheckpointInterval()
Returns the checkpointing interval, or -1 if checkpointing is disabled.

Shorthand for getCheckpointConfig().getCheckpointInterval().
- Returns:
  The checkpointing interval or -1
-
isUnalignedCheckpointsEnabled
@PublicEvolving public boolean isUnalignedCheckpointsEnabled()
Returns whether unaligned checkpoints are enabled.
-
isForceUnalignedCheckpoints
@PublicEvolving public boolean isForceUnalignedCheckpoints()
Returns whether unaligned checkpoints are force-enabled.
-
getCheckpointingMode
@Deprecated public CheckpointingMode getCheckpointingMode()
Deprecated. Use getCheckpointingConsistencyMode() instead.

Returns the checkpointing mode (exactly-once vs. at-least-once).

Shorthand for getCheckpointConfig().getCheckpointingMode().
- Returns:
  The checkpoint mode
-
getCheckpointingConsistencyMode
public CheckpointingMode getCheckpointingConsistencyMode()
Returns the checkpointing consistency mode (exactly-once vs. at-least-once).

Shorthand for getCheckpointConfig().getCheckpointingConsistencyMode().
- Returns:
  The checkpoint mode
-
enableChangelogStateBackend
@PublicEvolving public StreamExecutionEnvironment enableChangelogStateBackend(boolean enabled)
Enables the changelog for the current state backend. This changelog allows operators to persist state changes in a very fine-grained manner. Currently, the changelog only applies to keyed state, so non-keyed operator state and channel state are persisted as usual. The 'state' here refers to 'keyed state'. Details are as follows:
- Stateful operators write the state changes to that log (logging the state), in addition to applying them to the state tables in RocksDB or the in-memory hash table.
- An operator can acknowledge a checkpoint as soon as the changes in the log have reached the durable checkpoint storage.
- The state tables are persisted periodically, independent of the checkpoints. We call this the materialization of the state on the checkpoint storage.
- Once the state is materialized on checkpoint storage, the state changelog can be truncated to the corresponding point.

This establishes a way to drastically reduce the checkpoint interval for streaming applications across state backends. For more details please check FLIP-158.

If this method is not called explicitly, no preference for enabling the changelog is expressed, and the setting is resolved from the configuration at the different config levels (job/local/cluster).
- Parameters:
  enabled - true to explicitly enable the changelog for the state backend, false to disable it.
- Returns:
  This StreamExecutionEnvironment itself, to allow chaining of function calls.
- See Also:
  isChangelogStateBackendEnabled()
-
isChangelogStateBackendEnabled
@PublicEvolving public TernaryBoolean isChangelogStateBackendEnabled()
Gets the enable status of the changelog for the state backend.
- Returns:
  A TernaryBoolean for the enable status of the changelog for the state backend. Could be TernaryBoolean.UNDEFINED if the user never specified this by calling enableChangelogStateBackend(boolean).
- See Also:
  enableChangelogStateBackend(boolean)
-
setDefaultSavepointDirectory
@PublicEvolving public StreamExecutionEnvironment setDefaultSavepointDirectory(String savepointDirectory)
Sets the default savepoint directory, where savepoints will be written to if none is explicitly provided when the savepoint is triggered.
- Returns:
  This StreamExecutionEnvironment itself, to allow chaining of function calls.
- See Also:
  getDefaultSavepointDirectory()
-
setDefaultSavepointDirectory
@PublicEvolving public StreamExecutionEnvironment setDefaultSavepointDirectory(URI savepointDirectory)
Sets the default savepoint directory, where savepoints will be written to if none is explicitly provided when the savepoint is triggered.
- Returns:
  This StreamExecutionEnvironment itself, to allow chaining of function calls.
- See Also:
  getDefaultSavepointDirectory()
-
setDefaultSavepointDirectory
@PublicEvolving public StreamExecutionEnvironment setDefaultSavepointDirectory(Path savepointDirectory)
Sets the default savepoint directory, where savepoints will be written to if none is explicitly provided when the savepoint is triggered.
- Returns:
  This StreamExecutionEnvironment itself, to allow chaining of function calls.
- See Also:
  getDefaultSavepointDirectory()
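A sketch using the String overload, given an existing env (the directory URI is illustrative):

    // Used when a savepoint is triggered without an explicit target directory.
    env.setDefaultSavepointDirectory("file:///tmp/flink-savepoints");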
-
getDefaultSavepointDirectory
@Nullable @PublicEvolving public Path getDefaultSavepointDirectory()
Gets the default savepoint directory for this job.
- See Also:
  setDefaultSavepointDirectory(Path)
-
configure
@PublicEvolving public void configure(ReadableConfig configuration)
Sets all relevant options contained in the ReadableConfig. It will reconfigure StreamExecutionEnvironment, ExecutionConfig and CheckpointConfig.

It will change the value of a setting only if a corresponding option was set in the configuration. If a key is not present, the current value of a field will remain untouched.
- Parameters:
  configuration - a configuration to read the values from
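A sketch, given an existing env; string keys are used to avoid depending on any particular ConfigOption constant, and the chosen keys and values are illustrative:

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    conf.setString("execution.runtime-mode", "BATCH");        // illustrative option
    conf.setString("pipeline.operator-chaining", "false");    // illustrative option
    env.configure(conf);  // only the keys present in conf are applied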
-
configure
@PublicEvolving public void configure(ReadableConfig configuration, ClassLoader classLoader)
Sets all relevant options contained in the ReadableConfig. It will reconfigure StreamExecutionEnvironment, ExecutionConfig and CheckpointConfig.

It will change the value of a setting only if a corresponding option was set in the configuration. If a key is not present, the current value of a field will remain untouched.
- Parameters:
  configuration - a configuration to read the values from
  classLoader - a class loader to use when loading classes
-
fromData
@SafeVarargs public final <OUT> DataStreamSource<OUT> fromData(OUT... data)
Creates a new data stream that contains the given elements. The elements must all be of the same type, for example, all of the String or Integer type.

The framework will try to determine the exact type from the elements. In case of generic elements, it may be necessary to manually supply the type information via #fromData(org.apache.flink.api.common.typeinfo.TypeInformation, OUT...).

NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  data - The array of elements to create the data stream from.
- Returns:
  The data stream representing the given array of elements
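A minimal sketch, given an existing env:

    DataStream<Integer> numbers = env.fromData(1, 2, 3, 4, 5);
    numbers.print();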
-
fromData
@SafeVarargs public final <OUT> DataStreamSource<OUT> fromData(TypeInformation<OUT> typeInfo, OUT... data)
Creates a new data stream that contains the given elements. The elements must be of the same type as, or a subclass of, the typeInfo type. The sequence of elements must not be empty.

NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  typeInfo - The type information of the elements.
  data - The array of elements to create the data stream from.
- Returns:
  The data stream representing the given array of elements
-
fromData
public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data, TypeInformation<OUT> typeInfo)
Creates a new data stream that contains the given elements. The elements must all be of the same type, for example, all of the String or Integer type.

The framework will try to determine the exact type from the elements. In case of generic elements, it may be necessary to manually supply the type information via #fromData(org.apache.flink.api.common.typeinfo.TypeInformation, OUT...).

NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.
- Type Parameters:
  OUT - The generic type of the returned data stream.
- Parameters:
  data - The collection of elements to create the data stream from.
  typeInfo - The type information of the elements.
- Returns:
  The data stream representing the given collection
-
fromData
@SafeVarargs public final <OUT> DataStreamSource<OUT> fromData(Class<OUT> type, OUT... data)
Creates a new data stream that contains the given elements. The framework will determine the type according to the base type supplied by the user. The elements must be of the same type as, or a subclass of, the base type. The sequence of elements must not be empty.

NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  type - The base class type of the elements in the collection.
  data - The array of elements to create the data stream from.
- Returns:
  The data stream representing the given array of elements
-
fromData
public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data)
Creates a new data stream that contains the given elements. The type of the data stream is that of the elements in the collection.

The framework will try to determine the exact type from the collection elements. In case of generic elements, it may be necessary to manually supply the type information via fromData(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).

NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.
- Type Parameters:
  OUT - The generic type of the returned data stream.
- Parameters:
  data - The collection of elements to create the data stream from.
- Returns:
  The data stream representing the given collection
-
generateSequence
@Deprecated public DataStreamSource<Long> generateSequence(long from, long to)
Deprecated. Use fromSequence(long, long) instead to create a new data stream backed by the NumberSequenceSource.

Creates a new data stream that contains a sequence of numbers. This is a parallel source; if you manually set the parallelism to 1 (using SingleOutputStreamOperator.setParallelism(int)) the generated sequence of elements is in order.
- Parameters:
  from - The number to start at (inclusive)
  to - The number to stop at (inclusive)
- Returns:
  A data stream, containing all numbers in the [from, to] interval
-
fromSequence
public DataStreamSource<Long> fromSequence(long from, long to)
Creates a new data stream that contains a sequence of numbers (longs); useful for testing and for cases that just need a stream of N events of any kind.

The generated source splits the sequence into as many parallel sub-sequences as there are parallel source readers. Each sub-sequence will be produced in order. If the parallelism is limited to one, the source will produce one sequence in order.

This source is always bounded. For very long sequences (for example over the entire domain of long integer values), you may want to consider executing the application in a streaming manner, because the end bound is pretty far away.

Use fromSource(Source, WatermarkStrategy, String) together with NumberSequenceSource if you require more control over the created sources, for example if you want to set a WatermarkStrategy.
- Parameters:
  from - The number to start at (inclusive)
  to - The number to stop at (inclusive)
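A minimal sketch, given an existing env:

    DataStream<Long> ids = env.fromSequence(1L, 1_000L); // bounded stream of 1..1000
    ids.print();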
-
fromElements
@SafeVarargs @Deprecated public final <OUT> DataStreamSource<OUT> fromElements(OUT... data)
Deprecated. This method will be removed in a future release, possibly as early as version 2.0. Use #fromData(OUT...) instead.

Creates a new data stream that contains the given elements. The elements must all be of the same type, for example, all of the String or Integer type.

The framework will try to determine the exact type from the elements. In case of generic elements, it may be necessary to manually supply the type information via fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).

Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a degree of parallelism one.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  data - The array of elements to create the data stream from.
- Returns:
  The data stream representing the given array of elements
-
fromElements
@SafeVarargs @Deprecated public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data)
Deprecated. This method will be removed in a future release, possibly as early as version 2.0. Use #fromData(OUT...) instead.

Creates a new data stream that contains the given elements. The framework will determine the type according to the base type supplied by the user. The elements must be of the same type as, or a subclass of, the base type. The sequence of elements must not be empty. Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a degree of parallelism one.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  type - The base class type of the elements in the collection.
  data - The array of elements to create the data stream from.
- Returns:
  The data stream representing the given array of elements
-
fromCollection
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data)
Deprecated. This method will be removed in a future release, possibly as early as version 2.0. Use fromData(Collection) instead.

Creates a data stream from the given non-empty collection. The type of the data stream is that of the elements in the collection.

The framework will try to determine the exact type from the collection elements. In case of generic elements, it may be necessary to manually supply the type information via fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).

Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with parallelism one.
- Type Parameters:
  OUT - The generic type of the returned data stream.
- Parameters:
  data - The collection of elements to create the data stream from.
- Returns:
  The data stream representing the given collection
-
fromCollection
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo)
Deprecated. This method will be removed in a future release, possibly as early as version 2.0. Use fromData(Collection, TypeInformation) instead.

Creates a data stream from the given non-empty collection.

Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with parallelism one.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  data - The collection of elements to create the data stream from
  typeInfo - The TypeInformation for the produced data stream
- Returns:
  The data stream representing the given collection
-
fromCollection
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type)
Deprecated. This method will be removed in a future release, possibly as early as version 2.0. Use fromData(Collection, TypeInformation) instead. For rate-limited data generation, use DataGeneratorSource with RateLimiterStrategy. If you need to use a fixed set of elements in such a scenario, combine it with FromElementsGeneratorFunction.

Creates a data stream from the given iterator.

Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler erases the generic type information).

Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with a parallelism of one.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  data - The iterator of elements to create the data stream from
  type - The class of the data produced by the iterator. Must not be a generic class.
- Returns:
  The data stream representing the elements in the iterator
- See Also:
  fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
-
fromCollection
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo)
Deprecated. This method will be removed in a future release, possibly as early as version 2.0. Use fromData(Collection, TypeInformation) instead. For rate-limited data generation, use DataGeneratorSource with RateLimiterStrategy. If you need to use a fixed set of elements in such a scenario, combine it with FromElementsGeneratorFunction.

Creates a data stream from the given iterator.

Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type information. This method is useful for cases where the type is generic. In that case, the type class (as given in fromCollection(java.util.Iterator, Class)) does not supply all type information.

Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with parallelism one.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  data - The iterator of elements to create the data stream from
  typeInfo - The TypeInformation for the produced data stream
- Returns:
  The data stream representing the elements in the iterator
-
fromParallelCollection
public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type)
Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the framework to create a parallel data stream source that returns the elements in the iterator.

Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler erases the generic type information).
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  iterator - The iterator that produces the elements of the data stream
  type - The class of the data produced by the iterator. Must not be a generic class.
- Returns:
  A data stream representing the elements in the iterator
-
fromParallelCollection
public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo)
Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the framework to create a parallel data stream source that returns the elements in the iterator.

Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type information. This method is useful for cases where the type is generic. In that case, the type class (as given in fromParallelCollection(org.apache.flink.util.SplittableIterator, Class)) does not supply all type information.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  iterator - The iterator that produces the elements of the data stream
  typeInfo - The TypeInformation for the produced data stream.
- Returns:
  A data stream representing the elements in the iterator
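A sketch using NumberSequenceIterator, a SplittableIterator implementation shipped in org.apache.flink.util (assuming it is available in your Flink version), given an existing env:

    import org.apache.flink.util.NumberSequenceIterator;

    DataStream<Long> parallelNumbers =
            env.fromParallelCollection(new NumberSequenceIterator(0L, 999L), Long.class);
    parallelNumbers.print();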
-
readFile
@Deprecated public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath)
Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:

    FileSource<String> source =
            FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/foo/bar"))
                    .build();

Reads the contents of the user-specified filePath based on the given FileInputFormat.

Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It will attempt to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface. In the latter case, this method will invoke the ResultTypeQueryable.getProducedType() method to determine the data type produced by the input format.

NOTES ON CHECKPOINTING: The source monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
  inputFormat - The input format used to create the data stream
- Returns:
  The data stream that represents the data read from the given file
-
readFile
@PublicEvolving @Deprecated public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter)
Deprecated.

Reads the contents of the user-specified filePath based on the given FileInputFormat, depending on the provided FileProcessingMode.

See readFile(FileInputFormat, String, FileProcessingMode, long)
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  inputFormat - The input format used to create the data stream
  filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
  watchType - The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
  interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
  filter - The files to be excluded from the processing
- Returns:
  The data stream that represents the data read from the given file
-
readFile
@Deprecated @PublicEvolving public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval)
Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:

    FileSource<String> source =
            FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/foo/bar"))
                    .monitorContinuously(Duration.of(10, SECONDS))
                    .build();

Reads the contents of the user-specified filePath based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). In addition, if the path contains files not to be processed, the user can specify a custom FilePathFilter. As a default implementation you can use FilePathFilter.createDefaultFilter().

Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It will attempt to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface. In the latter case, this method will invoke the ResultTypeQueryable.getProducedType() method to determine the data type produced by the input format.

NOTES ON CHECKPOINTING: If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source monitors the path once, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  inputFormat - The input format used to create the data stream
  filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
  watchType - The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
  interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
- Returns:
  The data stream that represents the data read from the given file
-
readFileStream
@Deprecated public DataStream<String> readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
Deprecated.

Creates a data stream that contains the contents of the files created while the system watches the given path. The files will be read with the system's default character set.
- Parameters:
  filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path/")
  intervalMillis - The interval of file watching in milliseconds
  watchType - The watch type of the file stream. When watchType is FileMonitoringFunction.WatchType.ONLY_NEW_FILES, the system processes only new files. FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED means that the system re-processes all contents of the appended file. FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED means that the system processes only the appended contents of files.
- Returns:
  The DataStream containing the given directory.
-
readFile
@Deprecated @PublicEvolving public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<OUT> typeInformation)
Deprecated. Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:

    FileSource<String> source =
            FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/foo/bar"))
                    .monitorContinuously(Duration.of(10, SECONDS))
                    .build();

Reads the contents of the user-specified filePath based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). In addition, if the path contains files not to be processed, the user can specify a custom FilePathFilter. As a default implementation you can use FilePathFilter.createDefaultFilter().

NOTES ON CHECKPOINTING: If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source monitors the path once, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
- Type Parameters:
  OUT - The type of the returned data stream
- Parameters:
  inputFormat - The input format used to create the data stream
  filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
  watchType - The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
  typeInformation - Information on the type of the elements in the output stream
  interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
- Returns:
  The data stream that represents the data read from the given file
-
socketTextStream
@Deprecated public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
Deprecated. Use socketTextStream(String, int, String, long) instead.

Creates a new data stream that contains the strings received infinitely from a socket. Received strings are decoded by the system's default character set. On the termination of the socket server, connection retries can be initiated.

Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when the socket was gracefully terminated.
- Parameters:
  hostname - The host name which a server socket binds
  port - The port number which a server socket binds. A port number of 0 means that the port number is automatically allocated.
  delimiter - A character which splits received strings into records
  maxRetry - The maximal retry interval in seconds while the program waits for a socket that is temporarily down. Reconnection is initiated every second. A value of 0 means that the reader is immediately terminated, while a negative value ensures retrying forever.
- Returns:
  A data stream containing the strings received from the socket
-
socketTextStream
@PublicEvolving public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry)
Creates a new data stream that contains the strings received infinitely from a socket. Received strings are decoded by the system's default character set. On the termination of the socket server, connection retries can be initiated.

Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when the socket was gracefully terminated.
- Parameters:
  hostname - The host name which a server socket binds
  port - The port number which a server socket binds. A port number of 0 means that the port number is automatically allocated.
  delimiter - A string which splits received strings into records
  maxRetry - The maximal retry interval in seconds while the program waits for a socket that is temporarily down. Reconnection is initiated every second. A value of 0 means that the reader is immediately terminated, while a negative value ensures retrying forever.
- Returns:
  A data stream containing the strings received from the socket
-
socketTextStream
@Deprecated public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter)
Deprecated. Use socketTextStream(String, int, String) instead.

Creates a new data stream that contains the strings received infinitely from a socket. Received strings are decoded by the system's default character set. The reader is terminated immediately when the socket is down.
- Parameters:
  hostname - The host name which a server socket binds
  port - The port number which a server socket binds. A port number of 0 means that the port number is automatically allocated.
  delimiter - A character which splits received strings into records
- Returns:
  A data stream containing the strings received from the socket
-
socketTextStream
@PublicEvolving public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter)
Creates a new data stream that contains the strings received infinitely from a socket. Received strings are decoded by the system's default character set. The reader is terminated immediately when the socket is down.
- Parameters:
  hostname - The host name which a server socket binds
  port - The port number which a server socket binds. A port number of 0 means that the port number is automatically allocated.
  delimiter - A string which splits received strings into records
- Returns:
  A data stream containing the strings received from the socket
-
socketTextStream
@PublicEvolving public DataStreamSource<String> socketTextStream(String hostname, int port)
Creates a new data stream that contains the strings received infinitely from a socket. Received strings are decoded by the system's default character set, using "\n" as the delimiter. The reader is terminated immediately when the socket is down.
- Parameters:
hostname
- The host name to which the server socket binds
port
- The port number to which the server socket binds. A port number of 0 means that the port number is automatically allocated.
- Returns:
- A data stream containing the strings received from the socket
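A minimal usage sketch (the host, port, and job name are placeholders; it assumes a text server is already running, e.g. started with "nc -lk 9999"):

    // Sketch: read newline-delimited text from a socket and print it.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);
    lines.print();
    env.execute("socket-text-example"); // job name is arbitrary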
-
createInput
@PublicEvolving public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT,?> inputFormat)
Generic method to create an input data stream with an InputFormat.
Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It will attempt to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface. In the latter case, this method will invoke the ResultTypeQueryable.getProducedType() method to determine the data type produced by the input format.
NOTES ON CHECKPOINTING: In the case of a FileInputFormat, the source (which executes the ContinuousFileMonitoringFunction) monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, so there are no further checkpoints.
- Type Parameters:
OUT
- The type of the returned data stream
- Parameters:
inputFormat
- The input format used to create the data stream
- Returns:
- The data stream that represents the data created by the input format
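For illustration, a sketch using Flink's TextInputFormat (org.apache.flink.api.java.io) over a placeholder path; any InputFormat is attached the same way:

    // Sketch: create a stream from an InputFormat; the file path is a placeholder.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TextInputFormat format = new TextInputFormat(new Path("file:///tmp/input")); // org.apache.flink.core.fs.Path
    DataStreamSource<String> lines = env.createInput(format);
    lines.print();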
-
createInput
@PublicEvolving public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT,?> inputFormat, TypeInformation<OUT> typeInfo)
Generic method to create an input data stream with an InputFormat.
The data stream is typed to the given TypeInformation. This method is intended for input formats where the return type cannot be determined by reflection analysis, and that do not implement the ResultTypeQueryable interface.
NOTES ON CHECKPOINTING: In the case of a FileInputFormat, the source (which executes the ContinuousFileMonitoringFunction) monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, so there are no further checkpoints.
- Type Parameters:
OUT
- The type of the returned data stream
- Parameters:
inputFormat
- The input format used to create the data stream
typeInfo
- The type information for the produced data stream
- Returns:
- The data stream that represents the data created by the input format
-
addSource
@Deprecated public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function)
Deprecated. This method relies on the SourceFunction API, which is due to be removed. Use the fromSource(Source, WatermarkStrategy, String) method based on the new Source API instead.
Adds a data source to the streaming topology.
By default, sources have a parallelism of 1. To enable parallel execution, the user-defined source should implement ParallelSourceFunction or extend RichParallelSourceFunction. In these cases the resulting source will have the parallelism of the environment. To change this afterwards, call DataStreamSource.setParallelism(int).
- Type Parameters:
OUT
- type of the returned stream
- Parameters:
function
- the user-defined function
- Returns:
- the data stream constructed
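A minimal sketch of the deprecated SourceFunction API, shown only for legacy code; the counting logic is illustrative:

    // Sketch: a non-parallel legacy source that emits 0..99 and then finishes.
    DataStreamSource<Long> numbers = env.addSource(new SourceFunction<Long>() {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            for (long i = 0; running && i < 100; i++) {
                ctx.collect(i);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    });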
-
addSource
@Internal public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName)
Deprecated. This method relies on the SourceFunction API, which is due to be removed. Use the fromSource(Source, WatermarkStrategy, String) method based on the new Source API instead.
Adds a data source with a custom type information, thus opening a DataStream. Only in very special cases does the user need to supply custom type information. Otherwise use addSource(SourceFunction).
- Type Parameters:
OUT
- type of the returned stream
- Parameters:
function
- the user-defined function
sourceName
- Name of the data source
- Returns:
- the data stream constructed
-
addSource
@Internal public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo)
Deprecated. This method relies on the SourceFunction API, which is due to be removed. Use the fromSource(Source, WatermarkStrategy, String, TypeInformation) method based on the new Source API instead.
Adds a data source with a custom type information, thus opening a DataStream. Only in very special cases does the user need to supply custom type information. Otherwise use addSource(SourceFunction).
- Type Parameters:
OUT
- type of the returned stream
- Parameters:
function
- the user-defined function
typeInfo
- the user-defined type information for the stream
- Returns:
- the data stream constructed
-
addSource
@Internal public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo)
Deprecated. This method relies on the SourceFunction API, which is due to be removed. Use the fromSource(Source, WatermarkStrategy, String, TypeInformation) method based on the new Source API instead.
Adds a data source with a custom type information, thus opening a DataStream. Only in very special cases does the user need to supply custom type information. Otherwise use addSource(SourceFunction).
- Type Parameters:
OUT
- type of the returned stream
- Parameters:
function
- the user-defined function
sourceName
- Name of the data source
typeInfo
- the user-defined type information for the stream
- Returns:
- the data stream constructed
-
fromSource
@PublicEvolving public <OUT> DataStreamSource<OUT> fromSource(Source<OUT,?,?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName)
Adds a data Source to the environment to get a DataStream.
The result will be either a bounded data stream (that can be processed in a batch way) or an unbounded data stream (that must be processed in a streaming way), based on the boundedness property of the source, as defined by Source.getBoundedness().
The result type (that is used to create serializers for the produced data events) will be automatically extracted. This is useful for sources that describe the produced types already in their configuration, to avoid having to declare the type multiple times. For example, the file sources and Kafka sources already define the produced types via their parsers/serializers/formats, and can forward that information.
- Type Parameters:
OUT
- type of the returned stream
- Parameters:
source
- the user-defined source
sourceName
- Name of the data source
- Returns:
- the data stream constructed
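A minimal sketch using the built-in NumberSequenceSource (org.apache.flink.api.connector.source.lib); the source name is arbitrary:

    // Sketch: attach a new-style Source; the element type (Long) is extracted automatically.
    DataStreamSource<Long> seq = env.fromSource(
            new NumberSequenceSource(0L, 1000L),
            WatermarkStrategy.noWatermarks(),
            "number-sequence");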
-
fromSource
@Experimental public <OUT> DataStreamSource<OUT> fromSource(Source<OUT,?,?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName, TypeInformation<OUT> typeInfo)
Adds a data Source to the environment to get a DataStream.
The result will be either a bounded data stream (that can be processed in a batch way) or an unbounded data stream (that must be processed in a streaming way), based on the boundedness property of the source, as defined by Source.getBoundedness().
This method takes an explicit type information for the produced data stream, so that callers can define directly what type/serializer will be used for the produced stream. For sources that describe their produced type, the method fromSource(Source, WatermarkStrategy, String) can be used to avoid specifying the produced type redundantly.
- Type Parameters:
OUT
- type of the returned stream
- Parameters:
source
- the user-defined source
sourceName
- Name of the data source
typeInfo
- the user-defined type information for the stream
- Returns:
- the data stream constructed
-
execute
public JobExecutionResult execute() throws Exception
Triggers the program execution. The environment will execute all parts of the program that have resulted in a "sink" operation. Sink operations are for example printing results or forwarding them to a message queue. The program execution will be logged and displayed with a generated default name.
- Returns:
- The result of the job execution, containing elapsed time and accumulators.
- Throws:
Exception
- which occurs during job execution.
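A sketch of a complete blocking run (the pipeline and job name are placeholders):

    // Sketch: build a small pipeline, then block until the job finishes.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromSource(new NumberSequenceSource(0L, 10L),
            WatermarkStrategy.noWatermarks(), "seq").print();
    JobExecutionResult result = env.execute("example-job");
    System.out.println("Net runtime (ms): " + result.getNetRuntime());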
-
execute
public JobExecutionResult execute(String jobName) throws Exception
Triggers the program execution. The environment will execute all parts of the program that have resulted in a "sink" operation. Sink operations are for example printing results or forwarding them to a message queue. The program execution will be logged and displayed with the provided name.
- Parameters:
jobName
- Desired name of the job- Returns:
- The result of the job execution, containing elapsed time and accumulators.
- Throws:
Exception
- which occurs during job execution.
-
execute
@Internal public JobExecutionResult execute(StreamGraph streamGraph) throws Exception
Triggers the program execution. The environment will execute all parts of the program that have resulted in a "sink" operation. Sink operations are for example printing results or forwarding them to a message queue.
- Parameters:
streamGraph
- the stream graph representing the transformations- Returns:
- The result of the job execution, containing elapsed time and accumulators.
- Throws:
Exception
- which occurs during job execution.
-
registerJobListener
@PublicEvolving public void registerJobListener(JobListener jobListener)
Register a JobListener in this environment. The JobListener will be notified on specific job status changes.
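A sketch of a listener that logs both callbacks; the log messages are illustrative:

    // Sketch: JobListener (org.apache.flink.core.execution) reacting to job life-cycle events.
    env.registerJobListener(new JobListener() {
        @Override
        public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
            if (throwable == null) {
                System.out.println("Submitted job " + jobClient.getJobID());
            }
        }

        @Override
        public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
            if (throwable == null) {
                System.out.println("Job finished in " + result.getNetRuntime() + " ms");
            }
        }
    });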
-
clearJobListeners
@PublicEvolving public void clearJobListeners()
Clear all registered JobListeners.
-
executeAsync
@PublicEvolving public final JobClient executeAsync() throws Exception
Triggers the program execution asynchronously. The environment will execute all parts of the program that have resulted in a "sink" operation. Sink operations are for example printing results or forwarding them to a message queue. The program execution will be logged and displayed with a generated default name.
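A sketch of a non-blocking submission; the printed fields are illustrative:

    // Sketch: submit without blocking and interact with the running job via JobClient.
    JobClient client = env.executeAsync("async-example");
    System.out.println("Job ID: " + client.getJobID());
    client.getJobStatus().thenAccept(status -> System.out.println("Status: " + status));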
-
executeAsync
@PublicEvolving public JobClient executeAsync(String jobName) throws Exception
Triggers the program execution asynchronously. The environment will execute all parts of the program that have resulted in a "sink" operation. Sink operations are for example printing results or forwarding them to a message queue. The program execution will be logged and displayed with the provided name.
-
executeAsync
@Internal public JobClient executeAsync(StreamGraph streamGraph) throws Exception
Triggers the program execution asynchronously. The environment will execute all parts of the program that have resulted in a "sink" operation. Sink operations are for example printing results or forwarding them to a message queue.
-
getStreamGraph
@Internal public StreamGraph getStreamGraph()
Getter of the StreamGraph of the streaming job. This call clears previously registered transformations.
- Returns:
- The stream graph representing the transformations
-
getStreamGraph
@Internal public StreamGraph getStreamGraph(boolean clearTransformations)
Getter of the StreamGraph of the streaming job, with the option to clear previously registered transformations. Clearing the transformations allows, for example, not re-executing the same operations when calling execute() multiple times.
- Parameters:
clearTransformations
- Whether to clear previously registered transformations
- Returns:
- The stream graph representing the transformations
-
generateStreamGraph
@Internal public StreamGraph generateStreamGraph(List<Transformation<?>> transformations)
Generates a StreamGraph that consists of the given transformations and is configured with the configuration of this environment.
This method does not access or clear the previously registered transformations.
- Parameters:
transformations
- list of transformations that the graph should contain- Returns:
- The stream graph representing the transformations
-
getExecutionPlan
public String getExecutionPlan()
Creates the plan with which the system will execute the program, and returns it as a String using a JSON representation of the execution data flow graph. Note that this needs to be called before the plan is executed.
- Returns:
- The execution plan of the program, as a JSON String.
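A sketch that dumps the JSON plan before triggering execution:

    // Sketch: print the execution plan, then run the job.
    env.fromSource(new NumberSequenceSource(0L, 10L),
            WatermarkStrategy.noWatermarks(), "seq").print();
    System.out.println(env.getExecutionPlan()); // must happen before execute()
    env.execute();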
-
clean
@Internal public <F> F clean(F f)
Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning is not disabled in the ExecutionConfig.
-
addOperator
@Internal public void addOperator(Transformation<?> transformation)
-
getConfiguration
@Internal public ReadableConfig getConfiguration()
Gives read-only access to the underlying configuration of this environment.
Note that the returned configuration might not be complete. It only contains options that initialized the environment via StreamExecutionEnvironment(Configuration) or options that are not represented in dedicated configuration classes such as ExecutionConfig or CheckpointConfig.
Use configure(ReadableConfig, ClassLoader) to set options that are specific to this environment.
-
getExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment()
Creates an execution environment that represents the context in which the program is currently executed. If the program is invoked standalone, this method returns a local execution environment, as returned by createLocalEnvironment().
- Returns:
- The execution environment of the context in which the program is executed.
-
getExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration)
Creates an execution environment that represents the context in which the program is currently executed. If the program is invoked standalone, this method returns a local execution environment, as returned by createLocalEnvironment(Configuration).
When executed from the command line, the given configuration is stacked on top of the global configuration that comes from the config.yaml, potentially overriding duplicated options.
- Parameters:
configuration
- The configuration to instantiate the environment with.- Returns:
- The execution environment of the context in which the program is executed.
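A sketch that passes job-level options when obtaining the environment; the option key is Flink's standard "parallelism.default":

    // Sketch: configure the environment at creation time.
    Configuration conf = new Configuration(); // org.apache.flink.configuration.Configuration
    conf.setString("parallelism.default", "4");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);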
-
createLocalEnvironment
public static LocalStreamEnvironment createLocalEnvironment()
Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. The default parallelism of the local environment is the number of hardware contexts (CPU cores / threads), unless it was specified differently by setParallelism(int).
- Returns:
- A local execution environment.
-
createLocalEnvironment
public static LocalStreamEnvironment createLocalEnvironment(int parallelism)
Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. It will use the parallelism specified in the parameter.
- Parameters:
parallelism
- The parallelism for the local environment.- Returns:
- A local execution environment with the specified parallelism.
-
createLocalEnvironment
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration)
Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. It will use the parallelism specified in the parameter.
- Parameters:
parallelism
- The parallelism for the local environment.
configuration
- Pass a custom configuration into the cluster
- Returns:
- A local execution environment with the specified parallelism.
-
createLocalEnvironment
public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration)
Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in.
- Parameters:
configuration
- Pass a custom configuration into the cluster
- Returns:
- A local execution environment configured with the given configuration.
-
createLocalEnvironmentWithWebUI
@PublicEvolving public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf)
Creates a LocalStreamEnvironment for local program execution that also starts the web monitoring UI.
The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in.
If the configuration key 'rest.port' was set in the configuration, that particular port will be used for the web UI. Otherwise, the default port (8081) will be used.
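A sketch choosing a custom UI port; note that the web UI typically requires flink-runtime-web on the classpath (an assumption about the standard setup, not stated in this Javadoc):

    // Sketch: local environment with the web UI on port 8082 instead of the default 8081.
    Configuration conf = new Configuration();
    conf.setString("rest.port", "8082");
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);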
-
createRemoteEnvironment
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles)
Creates a RemoteStreamEnvironment. The remote environment sends (parts of) the program to a cluster for execution. Note that all file paths used in the program must be accessible from the cluster. The execution will use the cluster's default parallelism, unless the parallelism is set explicitly via setParallelism(int).
- Parameters:
host
- The host name or address of the master (JobManager), where the program should be executed.
port
- The port of the master (JobManager), where the program should be executed.
jarFiles
- The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
- Returns:
- A remote environment that executes the program on a cluster.
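A sketch of a remote submission; host, port, and jar path are placeholders:

    // Sketch: ship the job jar to a remote JobManager and set an explicit parallelism.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
            "jobmanager.example.com", 8081, "/path/to/userCode.jar");
    env.setParallelism(4);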
-
createRemoteEnvironment
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
Creates a RemoteStreamEnvironment. The remote environment sends (parts of) the program to a cluster for execution. Note that all file paths used in the program must be accessible from the cluster. The execution will use the specified parallelism.
- Parameters:
host
- The host name or address of the master (JobManager), where the program should be executed.
port
- The port of the master (JobManager), where the program should be executed.
parallelism
- The parallelism to use during the execution.
jarFiles
- The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
- Returns:
- A remote environment that executes the program on a cluster.
-
createRemoteEnvironment
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles)
Creates a RemoteStreamEnvironment. The remote environment sends (parts of) the program to a cluster for execution. Note that all file paths used in the program must be accessible from the cluster.
- Parameters:
host
- The host name or address of the master (JobManager), where the program should be executed.
port
- The port of the master (JobManager), where the program should be executed.
clientConfig
- The configuration used by the client that connects to the remote cluster.
jarFiles
- The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
- Returns:
- A remote environment that executes the program on a cluster.
-
getDefaultLocalParallelism
@PublicEvolving public static int getDefaultLocalParallelism()
Gets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().
- Returns:
- The default local parallelism
-
setDefaultLocalParallelism
@PublicEvolving public static void setDefaultLocalParallelism(int parallelism)
Sets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().
- Parameters:
parallelism
- The parallelism to use as the default local parallelism.
-
initializeContextEnvironment
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx)
-
resetContextEnvironment
protected static void resetContextEnvironment()
-
registerCachedFile
public void registerCachedFile(String filePath, String name)
Registers a file at the distributed cache under the given name. The file will be accessible from any user-defined function in the (distributed) runtime under a local path. Files may be local files (which will be distributed via BlobServer), or files in a distributed file system. The runtime will copy the files temporarily to a local cache, if needed.
The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().
- Parameters:
filePath
- The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
name
- The name under which the file is registered.
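A sketch of registering a file and reading it inside a rich function; the URI, registration name, and enrichment logic are placeholders:

    // Sketch: register a file, then access its local copy from a RichMapFunction.
    env.registerCachedFile("hdfs://namenode:9000/config/lookup.txt", "lookup");
    DataStream<String> input = env.socketTextStream("localhost", 9999);
    DataStream<String> enriched = input.map(new RichMapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            File localCopy = getRuntimeContext().getDistributedCache().getFile("lookup"); // java.io.File
            // ... read localCopy and enrich 'value' as needed (illustrative) ...
            return value;
        }
    });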
-
registerCachedFile
public void registerCachedFile(String filePath, String name, boolean executable)
Registers a file at the distributed cache under the given name. The file will be accessible from any user-defined function in the (distributed) runtime under a local path. Files may be local files (which will be distributed via BlobServer), or files in a distributed file system. The runtime will copy the files temporarily to a local cache, if needed.
The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().
- Parameters:
filePath
- The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
name
- The name under which the file is registered.
executable
- Flag indicating whether the file should be executable
-
areExplicitEnvironmentsAllowed
@Internal public static boolean areExplicitEnvironmentsAllowed()
Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment or a RemoteEnvironment.
- Returns:
- True, if it is possible to explicitly instantiate a LocalEnvironment or a RemoteEnvironment, false otherwise.
-
getTransformations
@Internal public List<Transformation<?>> getTransformations()
-
registerCacheTransformation
@Internal public <T> void registerCacheTransformation(AbstractID intermediateDataSetID, CacheTransformation<T> t)
-
invalidateClusterDataset
@Internal public void invalidateClusterDataset(AbstractID datasetId) throws Exception
- Throws:
Exception
-
listCompletedClusterDatasets
protected Set<AbstractID> listCompletedClusterDatasets()
-
close
public void close() throws Exception
Close and clean up the execution environment. All cached intermediate results will be released physically.
- Specified by:
close
in interface AutoCloseable
- Throws:
Exception
-
-