public class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment
This is a dummy StreamExecutionEnvironment, which holds a real StreamExecutionEnvironment, shares all configurations of the real environment, and disables all configuration setting methods.
When translating a relational plan to an execution plan in the Planner, the generated Transformations will be added into the StreamExecutionEnvironment's buffer, and they will be cleared only when the StreamExecutionEnvironment.execute() method is called. Each TableEnvironment instance holds an immutable StreamExecutionEnvironment instance. If there are multiple translations (not all for `execute`, e.g. `explain` and then `execute`) in one TableEnvironment instance, the transformation buffer becomes dirty, and the execution result may be incorrect.
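
To make the dirty-buffer problem concrete, here is a minimal, self-contained sketch. `SharedEnv` is a hypothetical stand-in for an environment whose transformation buffer is cleared only by `execute()`, and a "translation" is modeled as adding two strings; none of this is Flink code.

```java
import java.util.ArrayList;
import java.util.List;

class DirtyBufferDemo {
    // Hypothetical planner translation: appends its transformations to the env buffer.
    static void translate(SharedEnv env, String query) {
        env.addOperator("source(" + query + ")");
        env.addOperator("sink(" + query + ")");
    }

    public static void main(String[] args) {
        SharedEnv env = new SharedEnv();
        translate(env, "q1"); // like explain(q1): translated, never executed
        translate(env, "q2"); // like execute(q2)
        // The job for q2 still contains q1's leftover transformations.
        System.out.println(env.execute()); // [source(q1), sink(q1), source(q2), sink(q2)]
    }
}

// Hypothetical environment whose buffer is cleared only when execute() is called.
class SharedEnv {
    private final List<String> transformations = new ArrayList<>();

    void addOperator(String transformation) {
        transformations.add(transformation);
    }

    // Returns the transformations a job would run, then clears the buffer.
    List<String> execute() {
        List<String> job = new ArrayList<>(transformations);
        transformations.clear();
        return job;
    }
}
```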
This dummy StreamExecutionEnvironment is only used for buffering the transformations generated in the planner. A new dummy StreamExecutionEnvironment instance should be created for each translation; this avoids dirtying the transformation buffer of the real StreamExecutionEnvironment instance.
All set methods (e.g. `setXX`, `enableXX`, `disableXX`, etc.) are disabled to prohibit changing the configuration, and all get methods (e.g. `getXX`, `isXX`, etc.) are delegated to the real StreamExecutionEnvironment. The `execute`, `getStreamGraph`, and `getExecutionPlan` methods are also disabled, while the `addOperator` method is enabled so that the planner can add the generated transformations to the dummy StreamExecutionEnvironment.
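
The delegation rules above can be sketched with a few representative methods. `Env`, `RealEnv`, and `DummyEnv` are hypothetical, heavily simplified types, and signalling a disabled method with `UnsupportedOperationException` is an assumption of this sketch rather than something stated here.

```java
import java.util.ArrayList;
import java.util.List;

class DummyEnvDemo {
    public static void main(String[] args) {
        RealEnv real = new RealEnv();
        real.setParallelism(4);

        // A fresh dummy per translation: transformations land in the dummy's own buffer.
        DummyEnv dummy = new DummyEnv(real);
        dummy.addOperator("source");
        dummy.addOperator("sink");

        System.out.println(dummy.getParallelism());           // 4, delegated to the real env
        System.out.println(dummy.getTransformations());       // [source, sink]
        System.out.println(real.getTransformations().size()); // 0, real buffer untouched
        try {
            dummy.setParallelism(8);
        } catch (UnsupportedOperationException e) {
            System.out.println("setParallelism is disabled");
        }
    }
}

// Hypothetical minimal environment contract.
interface Env {
    int getParallelism();
    Env setParallelism(int parallelism);
    void addOperator(String transformation);
    List<String> getTransformations();
}

class RealEnv implements Env {
    private int parallelism = 1;
    private final List<String> transformations = new ArrayList<>();

    public int getParallelism() { return parallelism; }
    public Env setParallelism(int parallelism) { this.parallelism = parallelism; return this; }
    public void addOperator(String t) { transformations.add(t); }
    public List<String> getTransformations() { return transformations; }
}

class DummyEnv implements Env {
    private final Env realEnv;
    private final List<String> transformations = new ArrayList<>();

    DummyEnv(Env realEnv) { this.realEnv = realEnv; }

    // Getters delegate to the real environment, so configuration is shared.
    public int getParallelism() { return realEnv.getParallelism(); }

    // Setters are disabled so the shared configuration cannot change.
    public Env setParallelism(int parallelism) {
        throw new UnsupportedOperationException("setParallelism is disabled on the dummy environment");
    }

    // addOperator stays enabled but only writes to the dummy's own buffer.
    public void addOperator(String t) { transformations.add(t); }
    public List<String> getTransformations() { return transformations; }
}
```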
This class could be removed once the StreamTableSource and StreamTableSink interfaces are reworked.
NOTE: Please remove the com.esotericsoftware.kryo item in the whitelist of the checkCodeDependencies() method in the test_table_shaded_dependencies.sh end-to-end test when this class is removed.
Fields inherited from class StreamExecutionEnvironment: cacheFile, checkpointCfg, config, configuration, DEFAULT_JOB_NAME, transformations
Constructor and Description |
---|
DummyStreamExecutionEnvironment(StreamExecutionEnvironment realExecEnv) |
Modifier and Type | Method | Description |
---|---|---|
`void` | `addDefaultKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializerClass)` | Adds a new Kryo default serializer to the Runtime. |
`<T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> void` | `addDefaultKryoSerializer(Class<?> type, T serializer)` | Adds a new Kryo default serializer to the Runtime. |
`StreamExecutionEnvironment` | `disableOperatorChaining()` | Disables operator chaining for streaming operators. |
`StreamExecutionEnvironment` | `enableCheckpointing()` | Enables checkpointing for the streaming job. |
`StreamExecutionEnvironment` | `enableCheckpointing(long interval)` | Enables checkpointing for the streaming job. |
`StreamExecutionEnvironment` | `enableCheckpointing(long interval, CheckpointingMode mode)` | Enables checkpointing for the streaming job. |
`StreamExecutionEnvironment` | `enableCheckpointing(long interval, CheckpointingMode mode)` | Enables checkpointing for the streaming job. |
`StreamExecutionEnvironment` | `enableCheckpointing(long interval, CheckpointingMode mode, boolean force)` | Enables checkpointing for the streaming job. |
`JobExecutionResult` | `execute()` | Triggers the program execution. |
`JobExecutionResult` | `execute(StreamGraph streamGraph)` | Triggers the program execution. |
`JobExecutionResult` | `execute(String jobName)` | Triggers the program execution. |
`long` | `getBufferTimeout()` | Gets the maximum time frequency (milliseconds) for the flushing of the output buffers. |
`List<Tuple2<String,DistributedCache.DistributedCacheEntry>>` | `getCachedFiles()` | Gets the list of cached files that were registered for distribution among the task managers. |
`CheckpointConfig` | `getCheckpointConfig()` | Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc. |
`CheckpointingMode` | `getCheckpointingConsistencyMode()` | Returns the checkpointing consistency mode (exactly-once vs. at-least-once). |
`CheckpointingMode` | `getCheckpointingMode()` | Returns the checkpointing mode (exactly-once vs. at-least-once). |
`long` | `getCheckpointInterval()` | Returns the checkpointing interval or -1 if checkpointing is disabled. |
`ExecutionConfig` | `getConfig()` | Gets the config object. |
`ReadableConfig` | `getConfiguration()` | Gives read-only access to the underlying configuration of this environment. |
`String` | `getExecutionPlan()` | Creates the plan with which the system will execute the program, and returns it as a String using a JSON representation of the execution data flow graph. |
`int` | `getMaxParallelism()` | Gets the maximum degree of parallelism defined for the program. |
`int` | `getNumberOfExecutionRetries()` | Gets the number of times the system will try to re-execute failed tasks. |
`int` | `getParallelism()` | Gets the parallelism with which operations are executed by default. |
`RestartStrategies.RestartStrategyConfiguration` | `getRestartStrategy()` | Returns the specified restart strategy configuration. |
`StateBackend` | `getStateBackend()` | Gets the state backend that defines how to store and checkpoint state. |
`StreamGraph` | `getStreamGraph()` | Getter of the StreamGraph of the streaming job. |
`TimeCharacteristic` | `getStreamTimeCharacteristic()` | Gets the time characteristic. |
`boolean` | `isChainingEnabled()` | Returns whether operator chaining is enabled. |
`boolean` | `isForceCheckpointing()` | Returns whether checkpointing is force-enabled. |
`void` | `registerCachedFile(String filePath, String name)` | Registers a file at the distributed cache under the given name. |
`void` | `registerCachedFile(String filePath, String name, boolean executable)` | Registers a file at the distributed cache under the given name. |
`void` | `registerType(Class<?> type)` | Registers the given type with the serialization stack. |
`void` | `registerTypeWithKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer> serializerClass)` | Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer. |
`<T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> void` | `registerTypeWithKryoSerializer(Class<?> type, T serializer)` | Registers the given type with a Kryo Serializer. |
`StreamExecutionEnvironment` | `setBufferTimeout(long timeoutMillis)` | Sets the maximum time frequency (milliseconds) for the flushing of the output buffers. |
`StreamExecutionEnvironment` | `setMaxParallelism(int maxParallelism)` | Sets the maximum degree of parallelism defined for the program. |
`void` | `setNumberOfExecutionRetries(int numberOfExecutionRetries)` | Sets the number of times that failed tasks are re-executed. |
`StreamExecutionEnvironment` | `setParallelism(int parallelism)` | Sets the parallelism for operations executed through this environment. |
`void` | `setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)` | Sets the restart strategy configuration. |
`StreamExecutionEnvironment` | `setStateBackend(StateBackend backend)` | Sets the state backend that describes how to store operator state. |
`void` | `setStreamTimeCharacteristic(TimeCharacteristic characteristic)` | Sets the time characteristic for all streams created from this environment, e.g., processing time, event time, or ingestion time. |
Methods inherited from class StreamExecutionEnvironment: addOperator, addSource, addSource, addSource, addSource, areExplicitEnvironmentsAllowed, clean, clearJobListeners, close, configure, configure, createInput, createInput, createLocalEnvironment, createLocalEnvironment, createLocalEnvironment, createLocalEnvironment, createLocalEnvironmentWithWebUI, createRemoteEnvironment, createRemoteEnvironment, createRemoteEnvironment, enableChangelogStateBackend, executeAsync, executeAsync, executeAsync, fromCollection, fromCollection, fromCollection, fromCollection, fromData, fromData, fromData, fromData, fromData, fromElements, fromElements, fromParallelCollection, fromParallelCollection, fromSequence, fromSource, fromSource, generateSequence, generateStreamGraph, getDefaultLocalParallelism, getDefaultSavepointDirectory, getExecutionEnvironment, getExecutionEnvironment, getJobListeners, getStreamGraph, getTransformations, getUserClassloader, initializeContextEnvironment, invalidateClusterDataset, isChainingOfOperatorsWithDifferentMaxParallelismEnabled, isChangelogStateBackendEnabled, isForceUnalignedCheckpoints, isUnalignedCheckpointsEnabled, listCompletedClusterDatasets, readFile, readFile, readFile, readFile, readFileStream, readTextFile, readTextFile, registerCacheTransformation, registerCollectIterator, registerJobListener, registerSlotSharingGroup, resetContextEnvironment, setDefaultLocalParallelism, setDefaultSavepointDirectory, setDefaultSavepointDirectory, setDefaultSavepointDirectory, setRuntimeMode, socketTextStream, socketTextStream, socketTextStream, socketTextStream, socketTextStream

public DummyStreamExecutionEnvironment(StreamExecutionEnvironment realExecEnv)

public ExecutionConfig getConfig()
Description copied from class: StreamExecutionEnvironment
Gets the config object.
Overrides: getConfig in class StreamExecutionEnvironment

public ReadableConfig getConfiguration()
Description copied from class: StreamExecutionEnvironment
Gives read-only access to the underlying configuration of this environment.
Note that the returned configuration might not be complete. It only contains options that have initialized the environment via StreamExecutionEnvironment(Configuration) or options that are not represented in dedicated configuration classes such as ExecutionConfig or CheckpointConfig.
Use StreamExecutionEnvironment.configure(ReadableConfig, ClassLoader) to set options that are specific to this environment.
Overrides: getConfiguration in class StreamExecutionEnvironment
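
public List<Tuple2<String,DistributedCache.DistributedCacheEntry>> getCachedFiles()
Description copied from class: StreamExecutionEnvironment
Gets the list of cached files that were registered for distribution among the task managers.
Overrides: getCachedFiles in class StreamExecutionEnvironment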

public StreamExecutionEnvironment setParallelism(int parallelism)
Description copied from class: StreamExecutionEnvironment
Sets the parallelism for operations executed through this environment. The LocalStreamEnvironment uses by default a value equal to the number of hardware contexts (CPU cores / threads). When executing the program via the command line client from a JAR file, the default degree of parallelism is the one configured for that setup.
Overrides: setParallelism in class StreamExecutionEnvironment
Parameters: parallelism - The parallelism

public StreamExecutionEnvironment setMaxParallelism(int maxParallelism)
Description copied from class: StreamExecutionEnvironment
Sets the maximum degree of parallelism defined for the program.
The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
Overrides: setMaxParallelism in class StreamExecutionEnvironment
Parameters: maxParallelism - Maximum degree of parallelism to be used for the program, with 0 < maxParallelism <= 2^15.

public int getParallelism()
Description copied from class: StreamExecutionEnvironment
Gets the parallelism with which operations are executed by default.
Overrides: getParallelism in class StreamExecutionEnvironment
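
The default-parallelism rules described for setParallelism can be modeled in a few lines. `ParallelismSettings` is a hypothetical holder, not a Flink class; using `Runtime.availableProcessors()` as "the number of hardware contexts" is an assumption, and rejecting non-positive values is a simplification of this sketch.

```java
class ParallelismDemo {
    public static void main(String[] args) {
        ParallelismSettings local = ParallelismSettings.forLocalExecution();
        // Defaults to the number of hardware contexts visible to the JVM.
        System.out.println(local.getParallelism());
        local.setParallelism(2); // an explicit value overrides the default
        System.out.println(local.getParallelism()); // 2

        ParallelismSettings cluster = ParallelismSettings.forConfiguredSetup(8);
        System.out.println(cluster.getParallelism()); // 8
    }
}

// Hypothetical holder for the default-parallelism rules described above.
class ParallelismSettings {
    private int parallelism;

    private ParallelismSettings(int defaultParallelism) {
        this.parallelism = defaultParallelism;
    }

    // Local execution: one parallel instance per hardware context (CPU core / thread).
    static ParallelismSettings forLocalExecution() {
        return new ParallelismSettings(Runtime.getRuntime().availableProcessors());
    }

    // Command-line / JAR submission: whatever default the setup is configured with.
    static ParallelismSettings forConfiguredSetup(int configuredDefault) {
        return new ParallelismSettings(configuredDefault);
    }

    ParallelismSettings setParallelism(int parallelism) {
        // Simplification: this sketch only accepts positive values.
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be positive: " + parallelism);
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }
}
```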

public int getMaxParallelism()
Description copied from class: StreamExecutionEnvironment
Gets the maximum degree of parallelism defined for the program.
The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.
Overrides: getMaxParallelism in class StreamExecutionEnvironment
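
Why maxParallelism caps scaling can be seen from how key groups are handed out: there are exactly maxParallelism key groups, and each one is owned by exactly one parallel subtask. The sketch below checks the documented bound 0 < maxParallelism <= 2^15 and assigns key groups to subtasks with `keyGroupId * parallelism / maxParallelism`, which is assumed to mirror the contiguous assignment in Flink's `KeyGroupRangeAssignment`; hashing keys into key groups is omitted.

```java
class KeyGroupDemo {
    // Upper bound from the documentation: 0 < maxParallelism <= 2^15.
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;

    static void checkMaxParallelism(int maxParallelism) {
        if (maxParallelism <= 0 || maxParallelism > UPPER_BOUND_MAX_PARALLELISM) {
            throw new IllegalArgumentException(
                    "maxParallelism must be in (0, 2^15], got " + maxParallelism);
        }
    }

    // Assumed contiguous assignment: key group -> index of the owning subtask.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 8; // 8 key groups in total
        int parallelism = 3;    // 3 subtasks share them
        checkMaxParallelism(maxParallelism);
        // Subtask 0 gets key groups 0-2, subtask 1 gets 3-5, subtask 2 gets 6-7.
        for (int kg = 0; kg < maxParallelism; kg++) {
            System.out.println("key group " + kg + " -> subtask "
                    + operatorIndexForKeyGroup(maxParallelism, parallelism, kg));
        }
    }
}
```

Rescaling to any parallelism up to maxParallelism only moves whole key groups between subtasks; beyond that, some subtasks would receive no key group at all.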
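
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis)
Description copied from class: StreamExecutionEnvironment
Sets the maximum time frequency (milliseconds) for the flushing of the output buffers.
Overrides: setBufferTimeout in class StreamExecutionEnvironment
Parameters: timeoutMillis - The maximum time between two output flushes.

public long getBufferTimeout()
Description copied from class: StreamExecutionEnvironment
Gets the maximum time frequency (milliseconds) for the flushing of the output buffers. See StreamExecutionEnvironment.setBufferTimeout(long).
Overrides: getBufferTimeout in class StreamExecutionEnvironment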
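
The buffer timeout trades latency for throughput: records accumulate in an output buffer that is flushed when it is full or when the timeout has elapsed since the last flush. `OutputBufferSketch` is a hypothetical, simplified model (not Flink's network stack) that checks the timeout only when a record arrives, whereas a real runtime would also flush from a timer. The handling of 0 (flush after every record) and -1 (flush only when full) follows Flink's own documentation of setBufferTimeout as we understand it; treat it as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class BufferTimeoutDemo {
    public static void main(String[] args) {
        OutputBufferSketch buf = new OutputBufferSketch(10, 100, 0);
        buf.write("a", 10);
        buf.write("b", 50);
        buf.write("c", 120); // 120 ms since the last flush >= 100 ms timeout: flush
        buf.write("d", 150);
        System.out.println(buf.flushed()); // [[a, b, c]]
        System.out.println(buf.pending()); // [d]
    }
}

// Hypothetical, simplified output buffer with a flush timeout.
class OutputBufferSketch {
    private final int capacity;
    private final long timeoutMillis; // assumed: 0 = flush every record, -1 = flush only when full
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long lastFlushMillis;

    OutputBufferSketch(int capacity, long timeoutMillis, long startMillis) {
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
        this.lastFlushMillis = startMillis;
    }

    // Adds a record, flushing if the buffer is full or the timeout has elapsed.
    void write(String record, long nowMillis) {
        pending.add(record);
        boolean full = pending.size() >= capacity;
        boolean timedOut = timeoutMillis >= 0 && nowMillis - lastFlushMillis >= timeoutMillis;
        if (full || timedOut) {
            flushed.add(new ArrayList<>(pending));
            pending.clear();
            lastFlushMillis = nowMillis;
        }
    }

    List<List<String>> flushed() { return flushed; }
    List<String> pending() { return pending; }
}
```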

public StreamExecutionEnvironment disableOperatorChaining()
Description copied from class: StreamExecutionEnvironment
Disables operator chaining for streaming operators.
Overrides: disableOperatorChaining in class StreamExecutionEnvironment
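
public boolean isChainingEnabled()
Description copied from class: StreamExecutionEnvironment
Returns whether operator chaining is enabled.
Overrides: isChainingEnabled in class StreamExecutionEnvironment
Returns: true if chaining is enabled, false otherwise.

public CheckpointConfig getCheckpointConfig()
Description copied from class: StreamExecutionEnvironment
Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc.
Overrides: getCheckpointConfig in class StreamExecutionEnvironment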

public StreamExecutionEnvironment enableCheckpointing(long interval)
Description copied from class: StreamExecutionEnvironment
Enables checkpointing for the streaming job. This method selects CheckpointingMode.EXACTLY_ONCE guarantees.
The job draws checkpoints periodically, in the given interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
Overrides: enableCheckpointing in class StreamExecutionEnvironment
Parameters: interval - Time interval between state checkpoints in milliseconds.

public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
Description copied from class: StreamExecutionEnvironment
Enables checkpointing for the streaming job.
The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs "at least once"). The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
Overrides: enableCheckpointing in class StreamExecutionEnvironment
Parameters: interval - Time interval between state checkpoints in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.

public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode)
Description copied from class: StreamExecutionEnvironment
Enables checkpointing for the streaming job.
The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs "at least once"). The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
Overrides: enableCheckpointing in class StreamExecutionEnvironment
Parameters: interval - Time interval between state checkpoints in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.

public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
Description copied from class: StreamExecutionEnvironment
Enables checkpointing for the streaming job.
The job draws checkpoints periodically, in the given interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. If the "force" parameter is set to true, the system will execute the job nonetheless.
Overrides: enableCheckpointing in class StreamExecutionEnvironment
Parameters: interval - Time interval between state checkpoints in milliseconds.
mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
force - If true, checkpointing will be enabled for iterative jobs as well.

public StreamExecutionEnvironment enableCheckpointing()
Description copied from class: StreamExecutionEnvironment
Enables checkpointing for the streaming job. This method selects CheckpointingMode.EXACTLY_ONCE guarantees.
The job draws checkpoints periodically, in the default interval. The state will be stored in the configured state backend.
NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if used with enabled checkpointing.
Overrides: enableCheckpointing in class StreamExecutionEnvironment
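
The interplay between the enableCheckpointing overloads and iterative jobs can be summarized as a small rule set: the one-argument overload selects exactly-once, an interval of -1 means checkpointing is disabled (as reported by getCheckpointInterval()), and an iterative job is refused while checkpointing is enabled unless `force` is set. `CheckpointSettings` and `ConsistencyMode` are hypothetical types used only to illustrate these rules.

```java
class CheckpointGuardDemo {
    public static void main(String[] args) {
        CheckpointSettings plain = new CheckpointSettings().enable(60_000L);
        CheckpointSettings forced =
                new CheckpointSettings().enable(60_000L, ConsistencyMode.AT_LEAST_ONCE, true);

        System.out.println(plain.mode());          // EXACTLY_ONCE
        System.out.println(plain.mayStart(true));  // false: iterative job, not forced
        System.out.println(forced.mayStart(true)); // true: force lifts the restriction
    }
}

enum ConsistencyMode { EXACTLY_ONCE, AT_LEAST_ONCE }

// Hypothetical model of the checkpointing rules described above.
class CheckpointSettings {
    private long intervalMillis = -1; // -1 means checkpointing is disabled
    private ConsistencyMode mode = ConsistencyMode.EXACTLY_ONCE;
    private boolean force;

    // Mirrors the one-argument overload: exactly-once guarantees.
    CheckpointSettings enable(long intervalMillis) {
        return enable(intervalMillis, ConsistencyMode.EXACTLY_ONCE, false);
    }

    CheckpointSettings enable(long intervalMillis, ConsistencyMode mode, boolean force) {
        this.intervalMillis = intervalMillis;
        this.mode = mode;
        this.force = force;
        return this;
    }

    ConsistencyMode mode() { return mode; }

    long intervalMillis() { return intervalMillis; }

    // Iterative jobs are refused while checkpointing is enabled, unless forced.
    boolean mayStart(boolean iterativeJob) {
        boolean checkpointing = intervalMillis > 0;
        return !(checkpointing && iterativeJob && !force);
    }
}
```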
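
public long getCheckpointInterval()
Description copied from class: StreamExecutionEnvironment
Returns the checkpointing interval or -1 if checkpointing is disabled.
Shorthand for getCheckpointConfig().getCheckpointInterval().
Overrides: getCheckpointInterval in class StreamExecutionEnvironment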

public boolean isForceCheckpointing()
Description copied from class: StreamExecutionEnvironment
Returns whether checkpointing is force-enabled.
Overrides: isForceCheckpointing in class StreamExecutionEnvironment
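
public CheckpointingMode getCheckpointingMode()
Description copied from class: StreamExecutionEnvironment
Returns the checkpointing mode (exactly-once vs. at-least-once).
Shorthand for getCheckpointConfig().getCheckpointingMode().
Overrides: getCheckpointingMode in class StreamExecutionEnvironment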
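
public CheckpointingMode getCheckpointingConsistencyMode()
Description copied from class: StreamExecutionEnvironment
Returns the checkpointing consistency mode (exactly-once vs. at-least-once).
Shorthand for getCheckpointConfig().getCheckpointingConsistencyMode().
Overrides: getCheckpointingConsistencyMode in class StreamExecutionEnvironment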

public StreamExecutionEnvironment setStateBackend(StateBackend backend)
Description copied from class: StreamExecutionEnvironment
Sets the state backend that describes how to store operator state.
State managed by the state backend includes both keyed state that is accessible on keyed streams, as well as state maintained directly by the user code that implements CheckpointedFunction.
The HashMapStateBackend maintains state in heap memory, as objects. It is lightweight without extra dependencies, but is limited to JVM heap memory.
In contrast, the EmbeddedRocksDBStateBackend stores its state in an embedded RocksDB instance. This state backend can store very large state that exceeds memory and spills to local disk. All key/value state (including windows) is stored in the key/value index of RocksDB.
In both cases, fault tolerance is managed via the job's CheckpointStorage, which configures how and where state backends persist during a checkpoint.
Overrides: setStateBackend in class StreamExecutionEnvironment
See Also: StreamExecutionEnvironment.getStateBackend(), CheckpointConfig.setCheckpointStorage(org.apache.flink.runtime.state.CheckpointStorage)
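
public StateBackend getStateBackend()
Description copied from class: StreamExecutionEnvironment
Gets the state backend that defines how to store and checkpoint state.
Overrides: getStateBackend in class StreamExecutionEnvironment
See Also: StreamExecutionEnvironment.setStateBackend(StateBackend)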

public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)
Description copied from class: StreamExecutionEnvironment
Sets the restart strategy configuration.
Overrides: setRestartStrategy in class StreamExecutionEnvironment
Parameters: restartStrategyConfiguration - Restart strategy configuration to be set

public RestartStrategies.RestartStrategyConfiguration getRestartStrategy()
Description copied from class: StreamExecutionEnvironment
Returns the specified restart strategy configuration.
Overrides: getRestartStrategy in class StreamExecutionEnvironment

public void setNumberOfExecutionRetries(int numberOfExecutionRetries)
Description copied from class: StreamExecutionEnvironment
Sets the number of times that failed tasks are re-executed. A value of -1 indicates that the system default value (as defined in the configuration) should be used.
Overrides: setNumberOfExecutionRetries in class StreamExecutionEnvironment
Parameters: numberOfExecutionRetries - The number of times the system will try to re-execute failed tasks.

public int getNumberOfExecutionRetries()
Description copied from class: StreamExecutionEnvironment
Gets the number of times the system will try to re-execute failed tasks. A value of -1 indicates that the system default value (as defined in the configuration) should be used.
Overrides: getNumberOfExecutionRetries in class StreamExecutionEnvironment
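
The -1 sentinel for execution retries means "defer to the configured system default". A minimal sketch of how such a value resolves to the retry count that actually applies; `RetriesDemo.effectiveRetries` and the system default of 3 are hypothetical, and rejecting values below -1 is an assumption of this sketch.

```java
class RetriesDemo {
    // Sentinel documented above: -1 means "use the system default".
    static final int USE_SYSTEM_DEFAULT = -1;

    // Hypothetical helper resolving the retry count that would actually apply.
    static int effectiveRetries(int configured, int systemDefault) {
        if (configured == USE_SYSTEM_DEFAULT) {
            return systemDefault;
        }
        if (configured < 0) {
            throw new IllegalArgumentException("retries must be >= 0 or -1: " + configured);
        }
        return configured;
    }

    public static void main(String[] args) {
        int systemDefault = 3; // hypothetical value "defined in the configuration"
        System.out.println(effectiveRetries(-1, systemDefault)); // 3
        System.out.println(effectiveRetries(0, systemDefault));  // 0: failed tasks are not re-executed
        System.out.println(effectiveRetries(5, systemDefault));  // 5
    }
}
```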

public <T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> void addDefaultKryoSerializer(Class<?> type, T serializer)
Description copied from class: StreamExecutionEnvironment
Adds a new Kryo default serializer to the Runtime.
Note that the serializer instance must be serializable (as defined by java.io.Serializable), because it may be distributed to the worker nodes by Java serialization.
Overrides: addDefaultKryoSerializer in class StreamExecutionEnvironment
Parameters: type - The class of the types serialized with the given serializer.
serializer - The serializer to use.

public void addDefaultKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer<?>> serializerClass)
Description copied from class: StreamExecutionEnvironment
Adds a new Kryo default serializer to the Runtime.
Overrides: addDefaultKryoSerializer in class StreamExecutionEnvironment
Parameters: type - The class of the types serialized with the given serializer.
serializerClass - The class of the serializer to use.

public <T extends com.esotericsoftware.kryo.Serializer<?> & Serializable> void registerTypeWithKryoSerializer(Class<?> type, T serializer)
Description copied from class: StreamExecutionEnvironment
Registers the given type with a Kryo Serializer.
Note that the serializer instance must be serializable (as defined by java.io.Serializable), because it may be distributed to the worker nodes by Java serialization.
Overrides: registerTypeWithKryoSerializer in class StreamExecutionEnvironment
Parameters: type - The class of the types serialized with the given serializer.
serializer - The serializer to use.

public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends com.esotericsoftware.kryo.Serializer> serializerClass)
Description copied from class: StreamExecutionEnvironment
Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer.
Overrides: registerTypeWithKryoSerializer in class StreamExecutionEnvironment
Parameters: type - The class of the types serialized with the given serializer.
serializerClass - The class of the serializer to use.

public void registerType(Class<?> type)
Description copied from class: StreamExecutionEnvironment
Registers the given type with the serialization stack.
Overrides: registerType in class StreamExecutionEnvironment
Parameters: type - The class of the type to register.

public void setStreamTimeCharacteristic(TimeCharacteristic characteristic)
Description copied from class: StreamExecutionEnvironment
Sets the time characteristic for all streams created from this environment, e.g., processing time, event time, or ingestion time.
If you set the characteristic to IngestionTime or EventTime, this will set a default watermark update interval of 200 ms. If this is not applicable for your application, you should change it using ExecutionConfig.setAutoWatermarkInterval(long).
Overrides: setStreamTimeCharacteristic in class StreamExecutionEnvironment
Parameters: characteristic - The time characteristic.

public TimeCharacteristic getStreamTimeCharacteristic()
Description copied from class: StreamExecutionEnvironment
Gets the time characteristic.
Overrides: getStreamTimeCharacteristic in class StreamExecutionEnvironment
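
The Serializable requirement on Kryo serializer instances exists because the instance itself travels to the workers via Java serialization. The generic bound `T extends com.esotericsoftware.kryo.Serializer<?> & Serializable` in the signatures above enforces this at compile time; the sketch below shows what happens at runtime without it. `SerializableCodec` and `PlainCodec` are hypothetical stand-ins (no Kryo dependency), and `shipToWorker` only imitates shipping with an in-memory round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializerShippingDemo {
    // Round-trips an instance through Java serialization, as if shipping it to a worker.
    static Object shipToWorker(Object instance) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(instance);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableCodec copy = (SerializableCodec) shipToWorker(new SerializableCodec("utf-8"));
        System.out.println(copy.charset); // utf-8
        try {
            shipToWorker(new PlainCodec());
        } catch (NotSerializableException e) {
            // A non-Serializable instance cannot be shipped at all.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

// Hypothetical serializer stand-in that satisfies the Serializable bound.
class SerializableCodec implements Serializable {
    private static final long serialVersionUID = 1L;
    final String charset;

    SerializableCodec(String charset) {
        this.charset = charset;
    }
}

// Hypothetical serializer stand-in that does not.
class PlainCodec {
}
```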

public JobExecutionResult execute() throws Exception
Description copied from class: StreamExecutionEnvironment
Triggers the program execution.
The program execution will be logged and displayed with a generated default name.
Overrides: execute in class StreamExecutionEnvironment
Throws: Exception - which occurs during job execution.

public JobExecutionResult execute(String jobName) throws Exception
Description copied from class: StreamExecutionEnvironment
Triggers the program execution.
The program execution will be logged and displayed with the provided name.
Overrides: execute in class StreamExecutionEnvironment
Parameters: jobName - Desired name of the job
Throws: Exception - which occurs during job execution.

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception
Description copied from class: StreamExecutionEnvironment
Triggers the program execution.
Overrides: execute in class StreamExecutionEnvironment
Parameters: streamGraph - the stream graph representing the transformations
Throws: Exception - which occurs during job execution.

public void registerCachedFile(String filePath, String name)
Description copied from class: StreamExecutionEnvironment
Registers a file at the distributed cache under the given name.
The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().
Overrides: registerCachedFile in class StreamExecutionEnvironment
Parameters: filePath - The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
name - The name under which the file is registered.

public void registerCachedFile(String filePath, String name, boolean executable)
Description copied from class: StreamExecutionEnvironment
Registers a file at the distributed cache under the given name.
The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().
Overrides: registerCachedFile in class StreamExecutionEnvironment
Parameters: filePath - The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
name - The name under which the file is registered.
executable - flag indicating whether the file should be executable

public StreamGraph getStreamGraph()
Description copied from class: StreamExecutionEnvironment
Getter of the StreamGraph of the streaming job. This call clears previously registered transformations.
Overrides: getStreamGraph in class StreamExecutionEnvironment
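
public String getExecutionPlan()
Description copied from class: StreamExecutionEnvironment
Creates the plan with which the system will execute the program, and returns it as a String using a JSON representation of the execution data flow graph.
Overrides: getExecutionPlan in class StreamExecutionEnvironment
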
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.