Class StreamExecutionEnvironment

    • Field Detail

      • config

        protected final ExecutionConfig config
        The execution configuration for this environment.
      • checkpointCfg

        protected final CheckpointConfig checkpointCfg
        Settings that control the checkpointing behavior.
      • cacheFile

        protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile
        This field cannot yet be migrated to the configuration. It is exposed to users through a getter, so it stays directly accessible and can be modified from outside the environment.
    • Method Detail

      • getUserClassloader

        protected ClassLoader getUserClassloader()
      • setParallelism

        public StreamExecutionEnvironment setParallelism(int parallelism)
        Sets the parallelism for operations executed through this environment. Setting a parallelism of x here will cause all operators (such as map, batchReduce) to run with x parallel instances. This method overrides the default parallelism for this environment. By default, the LocalStreamEnvironment uses a value equal to the number of hardware contexts (CPU cores / threads). When the program is executed via the command-line client from a JAR file, the default degree of parallelism is the one configured for that setup.
        Parameters:
        parallelism - The parallelism
      • setRuntimeMode

        @PublicEvolving
        public StreamExecutionEnvironment setRuntimeMode(RuntimeExecutionMode executionMode)
        Sets the runtime execution mode for the application (see RuntimeExecutionMode). This is equivalent to setting the execution.runtime-mode option in your application's configuration file.

        We recommend that users NOT use this method. Set execution.runtime-mode on the command line when submitting the application instead. Keeping the application code free of configuration gives more flexibility, because the same application can then run in any execution mode.

        Parameters:
        executionMode - the desired execution mode.
        Returns:
        The execution environment of your application.
      • setMaxParallelism

        public StreamExecutionEnvironment setMaxParallelism(int maxParallelism)
        Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive) is Short.MAX_VALUE + 1.

        The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.

        Parameters:
        maxParallelism - Maximum degree of parallelism to be used for the program, with 0 < maxParallelism <= 2^15.
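        A small plain-Java model can show how maxParallelism, parallelism, and key groups relate. This is a sketch built on stated assumptions and does not reproduce Flink's implementation. The class KeyGroupModel is hypothetical. It assigns keys to key groups with a plain Math.floorMod of hashCode(), whereas Flink's own assignment applies an additional hash function. It also assumes that key groups are handed out to subtasks as contiguous ranges.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of key-group assignment; not Flink's own code. */
final class KeyGroupModel {

    /** Upper bound from the documentation: Short.MAX_VALUE + 1 = 32768 = 2^15. */
    static final int UPPER_BOUND = Short.MAX_VALUE + 1;

    /** Rejects values outside 0 < maxParallelism <= 2^15. */
    static void checkMaxParallelism(int maxParallelism) {
        if (maxParallelism <= 0 || maxParallelism > UPPER_BOUND) {
            throw new IllegalArgumentException(
                    "maxParallelism must be in (0, " + UPPER_BOUND + "], got " + maxParallelism);
        }
    }

    /** Maps a key to one of maxParallelism key groups (plain hash; an assumption of this model). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Assigns contiguous ranges of key groups to the subtasks (assumed range-based mapping). */
    static int subtaskForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Lists the key groups owned by one subtask under this model. */
    static List<Integer> keyGroupsOf(int subtask, int parallelism, int maxParallelism) {
        List<Integer> groups = new ArrayList<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            if (subtaskForKeyGroup(kg, parallelism, maxParallelism) == subtask) {
                groups.add(kg);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        int parallelism = 3;
        checkMaxParallelism(maxParallelism);
        for (int s = 0; s < parallelism; s++) {
            System.out.println("subtask " + s + " -> key groups " + keyGroupsOf(s, parallelism, maxParallelism));
        }
    }
}
```

        With maxParallelism = 8 and parallelism = 3, main prints the key groups [0, 1, 2], [3, 4, 5], and [6, 7]. The number of key groups is fixed by maxParallelism, so in this model a change of parallelism only moves whole key groups between subtasks. A parallelism above maxParallelism would leave some subtasks without any key group.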
      • registerSlotSharingGroup

        @PublicEvolving
        public StreamExecutionEnvironment registerSlotSharingGroup(SlotSharingGroup slotSharingGroup)
        Registers a slot sharing group with its resource spec.

        Note that a slot sharing group only tells the scheduler that the grouped operators CAN be deployed into a shared slot. The scheduler is not guaranteed to always deploy the grouped operators together. If grouped operators are deployed into separate slots, the slot resources are derived from the specified group requirements.

        Parameters:
        slotSharingGroup - the slot sharing group, which contains its name and resource spec.
      • getParallelism

        public int getParallelism()
        Gets the parallelism with which operations are executed by default. Operations can individually override this value to use a specific parallelism.
        Returns:
        The parallelism used by operations, unless they override that value.
      • getMaxParallelism

        public int getMaxParallelism()
        Gets the maximum degree of parallelism defined for the program.

        The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.

        Returns:
        Maximum degree of parallelism
      • setBufferTimeout

        public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis)
        Sets the maximum time (in milliseconds) between flushes of the output buffers. By default, the output buffers are flushed frequently to provide low latency and a smooth developer experience. The parameter can produce three logical modes:
        • A positive value triggers flushing periodically, at that interval
        • 0 triggers flushing after every record, thus minimizing latency
        • -1 triggers flushing only when the output buffer is full, thus maximizing throughput
        Parameters:
        timeoutMillis - The maximum time between two output flushes.
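        The three modes can be read as one flush decision. The plain-Java sketch below models only that decision rule. FlushPolicyModel and shouldFlush are hypothetical names and do not belong to the Flink API. Rejecting values below -1 is an assumption drawn from the modes listed above. Flink's actual flushing machinery is more involved.

```java
/** Hypothetical model of the buffer-timeout decision rule; not Flink's implementation. */
final class FlushPolicyModel {

    private final long timeoutMillis;

    FlushPolicyModel(long timeoutMillis) {
        // Assumption: only -1, 0, and positive values are meaningful.
        if (timeoutMillis < -1) {
            throw new IllegalArgumentException("timeout must be >= -1, got " + timeoutMillis);
        }
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Decides whether the output buffer should be flushed now.
     *
     * @param bufferFull       whether the buffer has no room left
     * @param millisSinceFlush time elapsed since the previous flush
     */
    boolean shouldFlush(boolean bufferFull, long millisSinceFlush) {
        if (bufferFull) {
            return true;                          // a full buffer is always sent
        }
        if (timeoutMillis == -1) {
            return false;                         // -1: flush only when full (maximum throughput)
        }
        if (timeoutMillis == 0) {
            return true;                          // 0: flush after every record (minimum latency)
        }
        return millisSinceFlush >= timeoutMillis; // > 0: flush periodically at that interval
    }
}
```

        Each mode trades latency for throughput. A larger positive timeout lets more records share a buffer before it is sent.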
      • getBufferTimeout

        public long getBufferTimeout()
        Gets the maximum time (in milliseconds) between flushes of the output buffers. For clarification on the extremal values, see setBufferTimeout(long).
        Returns:
        The timeout of the buffer.
      • disableOperatorChaining

        @PublicEvolving
        public StreamExecutionEnvironment disableOperatorChaining()
        Disables operator chaining for streaming operators. Operator chaining allows non-shuffle operations to be co-located in the same thread, which avoids serialization and deserialization entirely.
        Returns:
        StreamExecutionEnvironment with chaining disabled.
      • isChainingEnabled

        @PublicEvolving
        public boolean isChainingEnabled()
        Returns whether operator chaining is enabled.
        Returns:
        true if chaining is enabled, false otherwise.
      • isChainingOfOperatorsWithDifferentMaxParallelismEnabled

        @PublicEvolving
        public boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled()
      • getCheckpointConfig

        public CheckpointConfig getCheckpointConfig()
        Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc.
        Returns:
        The checkpoint config.
      • enableCheckpointing

        public StreamExecutionEnvironment enableCheckpointing(long interval)
        Enables checkpointing for the streaming job. The distributed state of the streaming dataflow will be periodically snapshotted. In case of a failure, the streaming dataflow will be restarted from the latest completed checkpoint. This method selects CheckpointingMode.EXACTLY_ONCE guarantees.

        The job draws checkpoints periodically, in the given interval. The state will be stored in the configured state backend.

        NOTE: Checkpointing of iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if checkpointing is enabled.

        Parameters:
        interval - Time interval between state checkpoints in milliseconds.
      • enableCheckpointing

        @Deprecated
        public StreamExecutionEnvironment enableCheckpointing(long interval,
                                                              CheckpointingMode mode)
        Enables checkpointing for the streaming job. The distributed state of the streaming dataflow will be periodically snapshotted. In case of a failure, the streaming dataflow will be restarted from the latest completed checkpoint.

        The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs "at least once"). The state will be stored in the configured state backend.

        NOTE: Checkpointing of iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if checkpointing is enabled.

        Parameters:
        interval - Time interval between state checkpoints in milliseconds.
        mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
      • enableCheckpointing

        public StreamExecutionEnvironment enableCheckpointing(long interval,
                                                              CheckpointingMode mode)
        Enables checkpointing for the streaming job. The distributed state of the streaming dataflow will be periodically snapshotted. In case of a failure, the streaming dataflow will be restarted from the latest completed checkpoint.

        The job draws checkpoints periodically, in the given interval. The system uses the given CheckpointingMode for the checkpointing ("exactly once" vs "at least once"). The state will be stored in the configured state backend.

        NOTE: Checkpointing of iterative streaming dataflows is not properly supported at the moment. For that reason, iterative jobs will not be started if checkpointing is enabled.

        Parameters:
        interval - Time interval between state checkpoints in milliseconds.
        mode - The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
      • getCheckpointInterval

        public long getCheckpointInterval()
        Returns the checkpointing interval or -1 if checkpointing is disabled.

        Shorthand for getCheckpointConfig().getCheckpointInterval().

        Returns:
        The checkpointing interval or -1
      • isUnalignedCheckpointsEnabled

        @PublicEvolving
        public boolean isUnalignedCheckpointsEnabled()
        Returns whether unaligned checkpoints are enabled.
      • isForceUnalignedCheckpoints

        @PublicEvolving
        public boolean isForceUnalignedCheckpoints()
        Returns whether unaligned checkpoints are force-enabled.
      • getCheckpointingConsistencyMode

        public CheckpointingMode getCheckpointingConsistencyMode()
        Returns the checkpointing consistency mode (exactly-once vs. at-least-once).

        Shorthand for getCheckpointConfig().getCheckpointingConsistencyMode().

        Returns:
        The checkpoint mode
      • enableChangelogStateBackend

        @PublicEvolving
        public StreamExecutionEnvironment enableChangelogStateBackend(boolean enabled)
        Enables the change log for the current state backend. The change log allows operators to persist state changes in a very fine-grained manner. Currently, the change log only applies to keyed state, so non-keyed operator state and channel state are persisted as usual. The 'state' here refers to 'keyed state'. Details are as follows:

        Stateful operators write the state changes to that log (logging the state), in addition to applying them to the state tables in RocksDB or the in-memory hash table.

        An operator can acknowledge a checkpoint as soon as the changes in the log have reached the durable checkpoint storage.

        The state tables are persisted periodically, independent of the checkpoints. We call this the materialization of the state on the checkpoint storage.

        Once the state is materialized on checkpoint storage, the state change log can be truncated to the corresponding point.

        This makes it possible to drastically reduce the checkpoint interval for streaming applications across state backends. For more details, see FLIP-158.

        If this method is not called explicitly, no preference for enabling the change log is expressed. In that case the setting comes from the configuration, where values set at different levels (job/local/cluster) override one another.

        Parameters:
        enabled - true to explicitly enable the change log for the state backend; false to disable it.
        Returns:
        This StreamExecutionEnvironment itself, to allow chaining of function calls.
        See Also:
        isChangelogStateBackendEnabled()
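        A toy in-memory model can illustrate the log, materialization, and truncation steps described above. Everything in it is hypothetical, including the class ChangelogModel, its methods, and the sequence numbers. It ignores durability, concurrency, asynchronous materialization, and the real state backends. Its purpose is to show why a checkpoint needs only the changes logged since the last materialization.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy, hypothetical model of changelog-based keyed state; not Flink's implementation. */
final class ChangelogModel {

    /** One logged change: key set to value, or removed when value is null. */
    private static final class Change {
        final long seq;
        final String key;
        final String value;

        Change(long seq, String key, String value) {
            this.seq = seq;
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, String> table = new HashMap<>();  // live state table
    private final List<Change> log = new ArrayList<>();          // changes not yet truncated
    private Map<String, String> materialized = new HashMap<>();  // last materialized copy of the table
    private long materializedUpTo = 0;                           // highest seq covered by that copy
    private long nextSeq = 1;

    /** Applies a change to the state table and also appends it to the log. */
    void put(String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
        log.add(new Change(nextSeq++, key, value));
    }

    /** Copies the whole table (independently of checkpoints), then truncates the covered log prefix. */
    void materialize() {
        materialized = new HashMap<>(table);
        materializedUpTo = nextSeq - 1;
        log.removeIf(c -> c.seq <= materializedUpTo);
    }

    /** Number of log entries a checkpoint must make durable: those since the last materialization. */
    int changesNeededForCheckpoint() {
        return log.size();
    }

    /** Rebuilds state by replaying the remaining log on top of the materialized copy. */
    Map<String, String> restore() {
        Map<String, String> restored = new HashMap<>(materialized);
        for (Change c : log) {
            if (c.value == null) {
                restored.remove(c.key);
            } else {
                restored.put(c.key, c.value);
            }
        }
        return restored;
    }
}
```

        To restore, the model replays the remaining log on top of the last materialized copy. Frequent materialization keeps that log small, which in turn keeps each checkpoint small.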
      • setDefaultSavepointDirectory

        @PublicEvolving
        public StreamExecutionEnvironment setDefaultSavepointDirectory(String savepointDirectory)
        Sets the default savepoint directory. Savepoints are written there when a savepoint is triggered without an explicitly provided directory.
        Returns:
        This StreamExecutionEnvironment itself, to allow chaining of function calls.
        See Also:
        getDefaultSavepointDirectory()
      • setDefaultSavepointDirectory

        @PublicEvolving
        public StreamExecutionEnvironment setDefaultSavepointDirectory(URI savepointDirectory)
        Sets the default savepoint directory. Savepoints are written there when a savepoint is triggered without an explicitly provided directory.
        Returns:
        This StreamExecutionEnvironment itself, to allow chaining of function calls.
        See Also:
        getDefaultSavepointDirectory()
      • setDefaultSavepointDirectory

        @PublicEvolving
        public StreamExecutionEnvironment setDefaultSavepointDirectory(Path savepointDirectory)
        Sets the default savepoint directory. Savepoints are written there when a savepoint is triggered without an explicitly provided directory.
        Returns:
        This StreamExecutionEnvironment itself, to allow chaining of function calls.
        See Also:
        getDefaultSavepointDirectory()
      • configure

        @PublicEvolving
        public void configure(ReadableConfig configuration,
                              ClassLoader classLoader)
        Sets all relevant options contained in the ReadableConfig. It will reconfigure StreamExecutionEnvironment, ExecutionConfig and CheckpointConfig.

        It will change the value of a setting only if a corresponding option was set in the configuration. If a key is not present, the current value of a field will remain untouched.

        Parameters:
        configuration - a configuration to read the values from
        classLoader - a class loader to use when loading classes
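        A plain Map can model the rule that a setting changes only when its option is present. This is a hypothetical sketch. ConfigOverlayModel, its starting field values, and the use of Map<String, String> in place of ReadableConfig are illustrative assumptions. The keys parallelism.default and pipeline.max-parallelism serve only as example key names.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of "change a setting only if its option is present"; not Flink code. */
final class ConfigOverlayModel {

    int parallelism = 1;        // current values held by the environment (arbitrary starting values)
    int maxParallelism = 128;

    /** Looks up an integer option, returning empty when the key is absent. */
    private static Optional<Integer> readInt(Map<String, String> config, String key) {
        return Optional.ofNullable(config.get(key)).map(Integer::parseInt);
    }

    /** Overwrites a field only when the corresponding key exists; absent keys leave fields untouched. */
    void configure(Map<String, String> config) {
        readInt(config, "parallelism.default").ifPresent(p -> parallelism = p);
        readInt(config, "pipeline.max-parallelism").ifPresent(p -> maxParallelism = p);
    }
}
```

        Values set earlier in code survive a configure call whose configuration does not mention them. A later configuration that does contain the key still overrides them.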
      • fromData

        @SafeVarargs
        public final <OUT> DataStreamSource<OUT> fromData(OUT... data)
        Creates a new data stream that contains the given elements. The elements must all be of the same type, for example, all Strings or all Integers.

        The framework will try to determine the exact type from the elements. In case of generic elements, it may be necessary to manually supply the type information via fromData(org.apache.flink.api.common.typeinfo.TypeInformation, OUT...).

        NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        data - The array of elements to create the data stream from.
        Returns:
        The data stream representing the given array of elements
      • fromData

        @SafeVarargs
        public final <OUT> DataStreamSource<OUT> fromData(TypeInformation<OUT> typeInfo,
                                                          OUT... data)
        Creates a new data stream that contains the given elements. The elements must be of the type described by typeInfo or of a subtype of it. The sequence of elements must not be empty.

        NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        typeInfo - The type information of the elements.
        data - The array of elements to create the data stream from.
        Returns:
        The data stream representing the given array of elements
      • fromData

        public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data,
                                                    TypeInformation<OUT> typeInfo)
        Creates a new data stream that contains the given elements. The elements must all be of the same type, for example, all Strings or all Integers.

        The framework will try to determine the exact type from the elements. In case of generic elements, it may be necessary to manually supply the type information via fromData(org.apache.flink.api.common.typeinfo.TypeInformation, OUT...).

        NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.

        Type Parameters:
        OUT - The generic type of the returned data stream.
        Parameters:
        data - The collection of elements to create the data stream from.
        typeInfo - The type information of the elements.
        Returns:
        The data stream representing the given collection
      • fromData

        @SafeVarargs
        public final <OUT> DataStreamSource<OUT> fromData(Class<OUT> type,
                                                          OUT... data)
        Creates a new data stream that contains the given elements. The framework determines the type from the base class supplied by the user. The elements must be of that base type or a subclass of it. The sequence of elements must not be empty.

        NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        type - The base class of the elements.
        data - The array of elements to create the data stream from.
        Returns:
        The data stream representing the given array of elements
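        The contract of this overload can be written as a small check: the sequence must be non-empty, and each element must be an instance of the supplied base class or one of its subclasses. BaseTypeCheck is a hypothetical helper written for illustration. It is not part of Flink and does not reproduce Flink's own validation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical check of the base-type contract described above; not Flink code. */
final class BaseTypeCheck {

    @SafeVarargs
    static <T> List<T> requireElementsOf(Class<T> baseType, T... data) {
        Objects.requireNonNull(baseType, "baseType");
        if (data == null || data.length == 0) {
            throw new IllegalArgumentException("The sequence of elements must not be empty.");
        }
        for (T element : data) {
            // An element may be of the base type itself or of any subclass of it.
            if (!baseType.isInstance(element)) {
                throw new IllegalArgumentException(
                        "Element " + element + " is not an instance of " + baseType.getName());
            }
        }
        return Arrays.asList(data);
    }
}
```

        The call requireElementsOf(Number.class, 1, 2.5, 3L) accepts Integer, Double, and Long elements because each is a Number. The resulting list is typed as List<Number>.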
      • fromData

        public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data)
        Creates a new data stream that contains the given elements. The type of the data stream is that of the elements in the collection.

        The framework will try to determine the exact type from the collection elements. In case of generic elements, it may be necessary to manually supply the type information via fromData(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).

        NOTE: This creates a non-parallel data stream source by default (parallelism of one). Adjustment of parallelism is supported via setParallelism() on the result.

        Type Parameters:
        OUT - The generic type of the returned data stream.
        Parameters:
        data - The collection of elements to create the data stream from.
        Returns:
        The data stream representing the given collection
      • generateSequence

        @Deprecated
        public DataStreamSource<Long> generateSequence(long from,
                                                       long to)
        Deprecated.
        Use fromSequence(long, long) instead to create a new data stream backed by a NumberSequenceSource.
        Creates a new data stream that contains a sequence of numbers. This is a parallel source. If you manually set the parallelism to 1 (using SingleOutputStreamOperator.setParallelism(int)), the generated sequence of elements is in order.
        Parameters:
        from - The number to start at (inclusive)
        to - The number to stop at (inclusive)
        Returns:
        A data stream containing all numbers in the [from, to] interval
      • fromSequence

        public DataStreamSource<Long> fromSequence(long from,
                                                   long to)
        Creates a new data stream that contains a sequence of numbers (longs) and is useful for testing and for cases that just need a stream of N events of any kind.

        The generated source splits the sequence into as many parallel sub-sequences as there are parallel source readers. Each sub-sequence will be produced in order. If the parallelism is limited to one, the source will produce one sequence in order.

        This source is always bounded. For very long sequences (for example, over the entire domain of long integer values), the end bound is very far away, so you may want to execute the application in streaming mode.

        Use fromSource(Source, WatermarkStrategy, String) together with NumberSequenceSource if you require more control over the created sources, for example, if you want to set a WatermarkStrategy.

        Parameters:
        from - The number to start at (inclusive)
        to - The number to stop at (inclusive)
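        One way to picture the splitting described above is to divide the inclusive range [from, to] into contiguous sub-ranges, one per parallel reader, each emitted in ascending order. SequenceSplitSketch is a hypothetical helper, and NumberSequenceSource may divide the range differently. The sketch also assumes that to - from + 1 does not overflow a long.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: split an inclusive [from, to] range into contiguous sub-ranges. */
final class SequenceSplitSketch {

    /** An inclusive sub-range; a reader would emit its values in ascending order. */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        long size() {
            return to - from + 1;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + "]";
        }
    }

    static List<Range> split(long from, long to, int readers) {
        if (from > to || readers <= 0) {
            throw new IllegalArgumentException("need from <= to and readers > 0");
        }
        long total = to - from + 1;                 // assumes this does not overflow
        int parts = (int) Math.min(readers, total); // never create empty sub-ranges
        List<Range> ranges = new ArrayList<>(parts);
        long start = from;
        for (int i = 0; i < parts; i++) {
            // Spread the remainder over the first (total % parts) sub-ranges.
            long length = total / parts + (i < total % parts ? 1 : 0);
            ranges.add(new Range(start, start + length - 1));
            start += length;
        }
        return ranges;
    }
}
```

        For example, split(1, 10, 3) yields [1, 4], [5, 7], and [8, 10]. With a single reader there is one range, which matches the in-order behavior described for a parallelism of one.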
      • fromElements

        @SafeVarargs
        @Deprecated
        public final <OUT> DataStreamSource<OUT> fromElements(OUT... data)
        Deprecated.
        This method will be removed in a future release, possibly as early as version 2.0. Use fromData(OUT...) instead.
        Creates a new data stream that contains the given elements. The elements must all be of the same type, for example, all Strings or all Integers.

        The framework will try to determine the exact type from the elements. In case of generic elements, it may be necessary to manually supply the type information via fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).

        Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with a parallelism of one.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        data - The array of elements to create the data stream from.
        Returns:
        The data stream representing the given array of elements
      • fromElements

        @SafeVarargs
        @Deprecated
        public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type,
                                                              OUT... data)
        Deprecated.
        This method will be removed in a future release, possibly as early as version 2.0. Use fromData(OUT...) instead.
        Creates a new data stream that contains the given elements. The framework determines the type from the base class supplied by the user. The elements must be of that base type or a subclass of it. The sequence of elements must not be empty. Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with a parallelism of one.
        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        type - The base class of the elements.
        data - The array of elements to create the data stream from.
        Returns:
        The data stream representing the given array of elements
      • fromCollection

        public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data)
        Deprecated.
        This method will be removed in a future release, possibly as early as version 2.0. Use fromData(Collection) instead.
        Creates a data stream from the given non-empty collection. The type of the data stream is that of the elements in the collection.

        The framework will try to determine the exact type from the collection elements. In case of generic elements, it may be necessary to manually supply the type information via fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation).

        Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with parallelism one.

        Type Parameters:
        OUT - The generic type of the returned data stream.
        Parameters:
        data - The collection of elements to create the data stream from.
        Returns:
        The data stream representing the given collection
      • fromCollection

        public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data,
                                                          TypeInformation<OUT> typeInfo)
        Deprecated.
        This method will be removed in a future release, possibly as early as version 2.0. Use fromData(Collection, TypeInformation) instead.
        Creates a data stream from the given non-empty collection.

        Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with parallelism one.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        data - The collection of elements to create the data stream from
        typeInfo - The TypeInformation for the produced data stream
        Returns:
        The data stream representing the given collection
      • fromCollection

        public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data,
                                                          Class<OUT> type)
        Deprecated.
        This method will be removed in a future release, possibly as early as version 2.0. Use fromData(Collection, TypeInformation) instead. For rate-limited data generation, use DataGeneratorSource with RateLimiterStrategy. If you need a fixed set of elements in such a scenario, combine it with FromElementsGeneratorFunction.
        Creates a data stream from the given iterator.

        Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler erases the generic type information).

        Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with a parallelism of one.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        data - The iterator of elements to create the data stream from
        type - The class of the data produced by the iterator. Must not be a generic class.
        Returns:
        The data stream representing the elements in the iterator
        See Also:
        fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
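        The element class must be passed explicitly because of Java's type erasure, which you can observe in plain Java. ErasureDemo and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Plain-Java illustration of type erasure; the class and method names are hypothetical. */
final class ErasureDemo {

    /** List<String> and List<Integer> share a single runtime class: the type arguments are erased. */
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    /**
     * An Iterator<T> by itself carries no runtime record of T, so the element type is only
     * available at runtime if the caller passes it explicitly as a Class token.
     */
    static <T> String elementTypeName(Iterator<T> data, Class<T> type) {
        return type.getName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());                                          // true
        System.out.println(elementTypeName(List.of("a", "b").iterator(), String.class)); // java.lang.String
    }
}
```

        A Class token cannot describe a parameterized type such as List<String>, because List.class represents only the raw type. For that reason this method also has a TypeInformation overload for generic element types.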
      • fromCollection

        public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data,
                                                          TypeInformation<OUT> typeInfo)
        Deprecated.
        This method will be removed in a future release, possibly as early as version 2.0. Use fromData(Collection, TypeInformation) instead. For rate-limited data generation, use DataGeneratorSource with RateLimiterStrategy. If you need a fixed set of elements in such a scenario, combine it with FromElementsGeneratorFunction.
        Creates a data stream from the given iterator.

        Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type information. This method is useful for cases where the type is generic. In that case, the type class (as given in fromCollection(java.util.Iterator, Class)) does not supply all type information.

        Note that this operation will result in a non-parallel data stream source, i.e., a data stream source with parallelism one.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        data - The iterator of elements to create the data stream from
        typeInfo - The TypeInformation for the produced data stream
        Returns:
        The data stream representing the elements in the iterator
      • fromParallelCollection

        public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator,
                                                                  Class<OUT> type)
        Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the framework to create a parallel data stream source that returns the elements in the iterator.

        Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler erases the generic type information).

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        iterator - The iterator that produces the elements of the data stream
        type - The class of the data produced by the iterator. Must not be a generic class.
        Returns:
        A data stream representing the elements in the iterator
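        A splittable iterator can be pictured as an iterator whose elements divide into disjoint partitions, one per parallel source instance. StridedSplittableIterator is a hypothetical class written for illustration, and it does not extend Flink's SplittableIterator. Its strided assignment, in which partition i takes indices i, i + n, i + 2n, and so on, is just one possible splitting scheme.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical model of a splittable iterator over a list; not Flink's SplittableIterator. */
final class StridedSplittableIterator<T> implements Iterator<T> {

    private final List<T> source;
    private final int stride;   // distance between consecutive elements of this partition
    private int next;           // index of the next element to return

    StridedSplittableIterator(List<T> source) {
        this(source, 0, 1);
    }

    private StridedSplittableIterator(List<T> source, int start, int stride) {
        this.source = source;
        this.next = start;
        this.stride = stride;
    }

    @Override
    public boolean hasNext() {
        return next < source.size();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T value = source.get(next);
        next += stride;
        return value;
    }

    /** Splits a fresh, unconsumed iterator into numPartitions disjoint iterators. */
    List<Iterator<T>> split(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (next != 0 || stride != 1) {
            throw new IllegalStateException("only an unconsumed, unsplit iterator can be split");
        }
        List<Iterator<T>> parts = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new StridedSplittableIterator<>(source, i, numPartitions));
        }
        return parts;
    }
}
```

        Splitting the list [0, 1, 2, 3, 4] into two partitions yields [0, 2, 4] and [1, 3]. The partitions are disjoint and together cover every element.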
      • fromParallelCollection

        public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator,
                                                                  TypeInformation<OUT> typeInfo)
        Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the framework to create a parallel data stream source that returns the elements in the iterator.

        Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the iterator must be given explicitly in the form of the type information. This method is useful for cases where the type is generic. In that case, the type class (as given in fromParallelCollection(org.apache.flink.util.SplittableIterator, Class)) does not supply all type information.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        iterator - The iterator that produces the elements of the data stream
        typeInfo - The TypeInformation for the produced data stream.
        Returns:
        A data stream representing the elements in the iterator
      • readFile

        @Deprecated
        public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
                                                    String filePath)
        Deprecated.
        Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:
        
         FileSource<String> source =
                FileSource.forRecordStreamFormat(
                   new TextLineInputFormat(), new Path("/foo/bar"))
                .build();
         
        Reads the contents of the user-specified filePath based on the given FileInputFormat.

        Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It will attempt to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface. In the latter case, this method will invoke the ResultTypeQueryable.getProducedType() method to determine the data type produced by the input format.

        NOTES ON CHECKPOINTING: The source monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        inputFormat - The input format used to create the data stream
        filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
        Returns:
        The data stream that represents the data read from the given file
      • readFile

        @Deprecated
        @PublicEvolving
        public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
                                                    String filePath,
                                                    FileProcessingMode watchType,
                                                    long interval)
        Deprecated.
        Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:
        
         FileSource<String> source =
                FileSource.forRecordStreamFormat(
                   new TextLineInputFormat(), new Path("/foo/bar"))
                .monitorContinuously(Duration.of(10, SECONDS))
                .build();
         
        Reads the contents of the user-specified filePath based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source either periodically monitors the path for new data every interval milliseconds (FileProcessingMode.PROCESS_CONTINUOUSLY), or processes the data currently in the path once and exits (FileProcessingMode.PROCESS_ONCE). In addition, if the path contains files that should not be processed, the user can specify a custom FilePathFilter. As a default implementation, you can use FilePathFilter.createDefaultFilter().

        Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It attempts to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface, in which case this method invokes ResultTypeQueryable.getProducedType() to determine the data type produced by the input format.

        NOTES ON CHECKPOINTING: If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source monitors the path once, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits without waiting for the readers to finish reading. This implies that no more checkpoint barriers are forwarded after the source exits, so no checkpoints are taken after that point.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        inputFormat - The input format used to create the data stream
        filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
        watchType - The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
        interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
        Returns:
        The data stream that represents the data read from the given file
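        A minimal sketch of migrating away from the deprecated readFile variants, assuming Flink 1.15 or later with the flink-connector-files dependency on the classpath (the class name FileSourceMigration and the path /foo/bar are placeholders). A FileSource built without monitorContinuously is bounded and reads the files currently in the path once, which roughly corresponds to FileProcessingMode.PROCESS_ONCE; adding monitorContinuously(interval) makes it unbounded, which roughly corresponds to FileProcessingMode.PROCESS_CONTINUOUSLY.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceMigration {

    /** Bounded: reads the files currently under the path once (cf. PROCESS_ONCE). */
    static FileSource<String> processOnce(String path) {
        return FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(path))
                .build();
    }

    /** Unbounded: re-scans the path every interval for new files (cf. PROCESS_CONTINUOUSLY). */
    static FileSource<String> processContinuously(String path, Duration interval) {
        return FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(path))
                .monitorContinuously(interval)
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Plain text lines carry no event time, so no watermarks are generated.
        DataStream<String> lines =
                env.fromSource(processOnce("/foo/bar"), WatermarkStrategy.noWatermarks(), "file-source");
        lines.print();
        env.execute("read-files-once");
    }
}
```

        Unlike the PROCESS_ONCE source described above, a FileSource is a regular Source, so it is passed to fromSource together with a WatermarkStrategy.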
      • readFile

        @Deprecated
        @PublicEvolving
        public <OUT> DataStreamSource<OUT> readFile​(FileInputFormat<OUT> inputFormat,
                                                    String filePath,
                                                    FileProcessingMode watchType,
                                                    long interval,
                                                    TypeInformation<OUT> typeInformation)
        Deprecated.
        Use FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead. An example of reading a file using a simple TextLineInputFormat:
        
         FileSource<String> source =
                FileSource.forRecordStreamFormat(
                   new TextLineInputFormat(), new Path("/foo/bar"))
                .monitorContinuously(Duration.of(10, SECONDS))
                .build();
         
        Reads the contents of the user-specified filePath based on the given FileInputFormat. Depending on the provided FileProcessingMode, the source either periodically monitors the path for new data every interval milliseconds (FileProcessingMode.PROCESS_CONTINUOUSLY), or processes the data currently in the path once and exits (FileProcessingMode.PROCESS_ONCE). In addition, if the path contains files that should not be processed, the user can specify a custom FilePathFilter. As a default implementation, you can use FilePathFilter.createDefaultFilter().

        NOTES ON CHECKPOINTING: If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source monitors the path once, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits without waiting for the readers to finish reading. This implies that no more checkpoint barriers are forwarded after the source exits, so no checkpoints are taken after that point.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        inputFormat - The input format used to create the data stream
        filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
        watchType - The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
        interval - In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
        typeInformation - Information on the type of the elements in the output stream
        Returns:
        The data stream that represents the data read from the given file
      • socketTextStream

        @Deprecated
        public DataStreamSource<String> socketTextStream​(String hostname,
                                                         int port,
                                                         char delimiter,
                                                         long maxRetry)
        Creates a new data stream that contains the strings received indefinitely from a socket. Received strings are decoded using the system's default character set. When the socket server terminates the connection, reconnection attempts can be initiated.

        Note that the socket itself does not report aborts; consequently, retries are only initiated when the socket is terminated gracefully.

        Parameters:
        hostname - The host name to which a server socket binds
        port - The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
        delimiter - A character which splits received strings into records
        maxRetry - The maximal retry interval in seconds while the program waits for a socket that is temporarily down. Reconnection is initiated every second. A value of 0 means that the reader is terminated immediately, while a negative value means that it retries forever.
        Returns:
        A data stream containing the strings received from the socket
      • socketTextStream

        @PublicEvolving
        public DataStreamSource<String> socketTextStream​(String hostname,
                                                         int port,
                                                         String delimiter,
                                                         long maxRetry)
        Creates a new data stream that contains the strings received indefinitely from a socket. Received strings are decoded using the system's default character set. When the socket server terminates the connection, reconnection attempts can be initiated.

        Note that the socket itself does not report aborts; consequently, retries are only initiated when the socket is terminated gracefully.

        Parameters:
        hostname - The host name to which a server socket binds
        port - The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
        delimiter - A string which splits received strings into records
        maxRetry - The maximal retry interval in seconds while the program waits for a socket that is temporarily down. Reconnection is initiated every second. A value of 0 means that the reader is terminated immediately, while a negative value means that it retries forever.
        Returns:
        A data stream containing the strings received from the socket
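        A usage sketch for the String-delimiter variant with retries, assuming a recent Flink 1.x release and a text server listening on localhost:9999 (for example, one started with nc -lk 9999); the host, port, and class name are placeholders. Building the pipeline does not open the connection; the source connects only when the job is executed.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketWithRetries {

    /** Declares the socket source; nothing connects until execute() is called. */
    static DataStream<String> lines(StreamExecutionEnvironment env) {
        // "\n" separates records; -1 keeps retrying after a graceful disconnect.
        return env.socketTextStream("localhost", 9999, "\n", -1);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        lines(env)
                .filter(line -> !line.trim().isEmpty()) // drop blank records
                .print();
        env.execute("socket-with-retries");
    }
}
```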
      • socketTextStream

        @Deprecated
        public DataStreamSource<String> socketTextStream​(String hostname,
                                                         int port,
                                                         char delimiter)
        Deprecated.
        Creates a new data stream that contains the strings received indefinitely from a socket. Received strings are decoded using the system's default character set. The reader is terminated immediately when the socket is down.
        Parameters:
        hostname - The host name to which a server socket binds
        port - The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
        delimiter - A character which splits received strings into records
        Returns:
        A data stream containing the strings received from the socket
      • socketTextStream

        @PublicEvolving
        public DataStreamSource<String> socketTextStream​(String hostname,
                                                         int port,
                                                         String delimiter)
        Creates a new data stream that contains the strings received indefinitely from a socket. Received strings are decoded using the system's default character set. The reader is terminated immediately when the socket is down.
        Parameters:
        hostname - The host name to which a server socket binds
        port - The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
        delimiter - A string which splits received strings into records
        Returns:
        A data stream containing the strings received from the socket
      • socketTextStream

        @PublicEvolving
        public DataStreamSource<String> socketTextStream​(String hostname,
                                                         int port)
        Creates a new data stream that contains the strings received indefinitely from a socket. Received strings are decoded using the system's default character set, with "\n" as the delimiter. The reader is terminated immediately when the socket is down.
        Parameters:
        hostname - The host name to which a server socket binds
        port - The port number to which a server socket binds. A port number of 0 means that the port number is automatically allocated.
        Returns:
        A data stream containing the strings received from the socket
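        For local experiments with the socketTextStream variants, something must be listening on the given host and port. The following is a hypothetical, stdlib-only helper (LineServer is not part of Flink) that accepts one client, writes each record followed by the "\n" delimiter using the platform default charset, and then closes the connection gracefully, which is the case in which the retrying variants attempt to reconnect.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.List;

/** Hypothetical test helper: serves newline-delimited records to a single client. */
public class LineServer {

    /** Accepts one client, writes every record followed by '\n', then closes gracefully. */
    public static void serveOnce(ServerSocket server, List<String> records) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), Charset.defaultCharset())) {
            for (String record : records) {
                out.write(record);
                out.write('\n');
            }
            out.flush();
        } // closing the socket here is a graceful termination from the reader's side
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999;
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Listening on port " + server.getLocalPort());
            serveOnce(server, List.of("to be", "or not", "to be"));
        }
    }
}
```

        Start it before submitting the job, then point socketTextStream("localhost", 9999) at it.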
      • createInput

        @PublicEvolving
        public <OUT> DataStreamSource<OUT> createInput​(InputFormat<OUT,​?> inputFormat)
        Generic method to create an input data stream from an InputFormat.

        Since all data streams need specific information about their types, this method needs to determine the type of the data produced by the input format. It attempts to determine the data type by reflection, unless the input format implements the ResultTypeQueryable interface, in which case this method invokes ResultTypeQueryable.getProducedType() to determine the data type produced by the input format.

        NOTES ON CHECKPOINTING: In the case of a FileInputFormat, the source (which executes the ContinuousFileMonitoringFunction) monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits without waiting for the readers to finish reading. This implies that no more checkpoint barriers are forwarded after the source exits, so no checkpoints are taken after that point.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        inputFormat - The input format used to create the data stream
        Returns:
        The data stream that represents the data created by the input format
      • createInput

        @PublicEvolving
        public <OUT> DataStreamSource<OUT> createInput​(InputFormat<OUT,​?> inputFormat,
                                                       TypeInformation<OUT> typeInfo)
        Generic method to create an input data stream from an InputFormat.

        The data stream is typed to the given TypeInformation. This method is intended for input formats where the return type cannot be determined by reflection analysis, and that do not implement the ResultTypeQueryable interface.

        NOTES ON CHECKPOINTING: In the case of a FileInputFormat, the source (which executes the ContinuousFileMonitoringFunction) monitors the path, creates the FileInputSplits to be processed, forwards them to the downstream readers to read the actual data, and exits without waiting for the readers to finish reading. This implies that no more checkpoint barriers are forwarded after the source exits, so no checkpoints are taken after that point.

        Type Parameters:
        OUT - The type of the returned data stream
        Parameters:
        inputFormat - The input format used to create the data stream
        typeInfo - The type information for the elements of the output stream
        Returns:
        The data stream that represents the data created by the input format
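        A sketch of the explicit-type variant, assuming Flink 1.x where org.apache.flink.api.java.io.TextInputFormat (a FileInputFormat<String>) is available; the path and class name are placeholders. TextInputFormat is used only as a convenient example; the variant matters most for formats whose type cannot be extracted. Because this is a FileInputFormat, the checkpointing caveat above applies.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CreateInputExample {

    static DataStream<String> readLines(StreamExecutionEnvironment env, String path) {
        TextInputFormat format = new TextInputFormat(new Path(path));
        // Declare the produced type explicitly instead of relying on reflection.
        return env.createInput(format, Types.STRING);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        readLines(env, "file:///tmp/input.txt").print();
        env.execute("create-input");
    }
}
```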
      • fromSource

        @PublicEvolving
        public <OUT> DataStreamSource<OUT> fromSource​(Source<OUT,​?,​?> source,
                                                      WatermarkStrategy<OUT> timestampsAndWatermarks,
                                                      String sourceName)
        Adds a data Source to the environment to get a DataStream.

        The result will be either a bounded data stream (that can be processed in a batch way) or an unbounded data stream (that must be processed in a streaming way), based on the boundedness property of the source, as defined by Source.getBoundedness().

        The result type (that is used to create serializers for the produced data events) will be automatically extracted. This is useful for sources that describe the produced types already in their configuration, to avoid having to declare the type multiple times. For example, the file sources and Kafka sources already define the produced type through their parsers/serializers/formats, and can forward that information.

        Type Parameters:
        OUT - type of the returned stream
        Parameters:
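        source - the user-defined source
        timestampsAndWatermarks - the strategy for assigning timestamps and generating watermarks for the produced stream
        sourceName - Name of the data source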
        Returns:
        the data stream constructed
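        A minimal sketch, assuming a recent Flink release: NumberSequenceSource (org.apache.flink.api.connector.source.lib) is a bounded Source that declares its own produced type (Long), so the stream type is extracted automatically and fromSource yields a bounded stream. The class name FromSourceExample is a placeholder.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FromSourceExample {

    static DataStream<Long> numbers(StreamExecutionEnvironment env) {
        // Bounded source of 1..10 that declares its own produced type, so no TypeInformation is passed.
        NumberSequenceSource source = new NumberSequenceSource(1, 10);
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "numbers");
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        numbers(env).print();
        env.execute("from-source");
    }
}
```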
      • fromSource

        @Experimental
        public <OUT> DataStreamSource<OUT> fromSource​(Source<OUT,​?,​?> source,
                                                      WatermarkStrategy<OUT> timestampsAndWatermarks,
                                                      String sourceName,
                                                      TypeInformation<OUT> typeInfo)
        Adds a data Source to the environment to get a DataStream.

        The result will be either a bounded data stream (that can be processed in a batch way) or an unbounded data stream (that must be processed in a streaming way), based on the boundedness property of the source, as defined by Source.getBoundedness().

        This method takes explicit type information for the produced data stream, so that callers can directly define which type/serializer will be used for the produced stream. For sources that describe their produced type, the method fromSource(Source, WatermarkStrategy, String) can be used to avoid specifying the produced type redundantly.

        Type Parameters:
        OUT - type of the returned stream
        Parameters:
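        source - the user-defined source
        timestampsAndWatermarks - the strategy for assigning timestamps and generating watermarks for the produced stream
        sourceName - Name of the data source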
        typeInfo - the user defined type information for the stream
        Returns:
        the data stream constructed
      • execute

        public JobExecutionResult execute()
                                   throws Exception
        Triggers the program execution. The environment will execute all parts of the program that have resulted in a "sink" operation. Examples of sink operations are printing results or forwarding them to a message queue.

        The program execution will be logged and displayed with a generated default name.

        Returns:
        The result of the job execution, containing elapsed time and accumulators.
        Throws:
        Exception - which occurs during job execution.
      • execute

        public JobExecutionResult execute​(String jobName)
                                   throws Exception
        Triggers the program execution. The environment will execute all parts of the program that have resulted in a "sink" operation. Examples of sink operations are printing results or forwarding them to a message queue.

        The program execution will be logged and displayed with the provided name.

        Parameters:
        jobName - Desired name of the job
        Returns:
        The result of the job execution, containing elapsed time and accumulators.
        Throws:
        Exception - which occurs during job execution.
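        A minimal sketch of blocking execution, assuming a recent Flink release run standalone (so getExecutionEnvironment() returns a local environment); the job and class names are placeholders. execute(jobName) returns only after the bounded job has finished, and the returned JobExecutionResult exposes, for example, the net runtime.

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteExample {

    static JobExecutionResult run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // print() adds the sink that execute() will run.
        env.fromSequence(1, 5).print();
        // Blocks until the bounded job has finished.
        return env.execute("sequence-job");
    }

    public static void main(String[] args) throws Exception {
        JobExecutionResult result = run();
        System.out.println("Job " + result.getJobID() + " ran for "
                + result.getNetRuntime(TimeUnit.MILLISECONDS) + " ms");
    }
}
```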
      • execute

        @Internal
        public JobExecutionResult execute​(StreamGraph streamGraph)
                                   throws Exception
        Triggers the program execution. The environment will execute all parts of the program that have resulted in a "sink" operation. Examples of sink operations are printing results or forwarding them to a message queue.
        Parameters:
        streamGraph - the stream graph representing the transformations
        Returns:
        The result of the job execution, containing elapsed time and accumulators.
        Throws:
        Exception - which occurs during job execution.
      • executeAsync

        @PublicEvolving
        public final JobClient executeAsync()
                                     throws Exception
        Triggers the program execution asynchronously. The environment will execute all parts of the program that have resulted in a "sink" operation. Examples of sink operations are printing results or forwarding them to a message queue.

        The program execution will be logged and displayed with a generated default name.

        Returns:
        A JobClient that can be used to communicate with the submitted job, returned once the submission has succeeded.
        Throws:
        Exception - which occurs during job execution.
      • executeAsync

        @PublicEvolving
        public JobClient executeAsync​(String jobName)
                               throws Exception
        Triggers the program execution asynchronously. The environment will execute all parts of the program that have resulted in a "sink" operation. Examples of sink operations are printing results or forwarding them to a message queue.

        The program execution will be logged and displayed with the provided name.

        Parameters:
        jobName - desired name of the job
        Returns:
        A JobClient that can be used to communicate with the submitted job, returned once the submission has succeeded.
        Throws:
        Exception - which occurs during job execution.
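        A minimal sketch of asynchronous submission, assuming a recent Flink release run standalone; the job and class names are placeholders. executeAsync(jobName) returns a JobClient right after submission, and the caller may later wait on JobClient.getJobExecutionResult(), which returns a CompletableFuture.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteAsyncExample {

    static JobExecutionResult submitAndWait() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(1, 5).print();

        // Returns once the job has been submitted; the job keeps running in the background.
        JobClient client = env.executeAsync("async-sequence-job");
        System.out.println("Submitted job " + client.getJobID());

        // Optionally block until the bounded job finishes and fetch its result.
        return client.getJobExecutionResult().get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Net runtime: " + submitAndWait().getNetRuntime() + " ms");
    }
}
```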
      • executeAsync

        @Internal
        public JobClient executeAsync​(StreamGraph streamGraph)
                               throws Exception
        Triggers the program execution asynchronously. The environment will execute all parts of the program that have resulted in a "sink" operation. Examples of sink operations are printing results or forwarding them to a message queue.
        Parameters:
        streamGraph - the stream graph representing the transformations
        Returns:
        A JobClient that can be used to communicate with the submitted job, returned once the submission has succeeded.
        Throws:
        Exception - which occurs during job execution.
      • getStreamGraph

        @Internal
        public StreamGraph getStreamGraph​(boolean clearTransformations)
        Returns the StreamGraph of the streaming job, with the option to clear previously registered transformations. Clearing the transformations makes it possible, for example, to avoid re-executing the same operations when execute() is called multiple times.
        Parameters:
        clearTransformations - Whether or not to clear previously registered transformations
        Returns:
        The stream graph representing the transformations
      • generateStreamGraph

        @Internal
        public StreamGraph generateStreamGraph​(List<Transformation<?>> transformations)
        Generates a StreamGraph that consists of the given transformations and is configured with the configuration of this environment.

        This method does not access or clear the previously registered transformations.

        Parameters:
        transformations - list of transformations that the graph should contain
        Returns:
        The stream graph representing the transformations
      • getExecutionPlan

        public String getExecutionPlan()
        Creates the plan with which the system will execute the program, and returns it as a String using a JSON representation of the execution data flow graph. Note that this must be called before the plan is executed.
        Returns:
        The execution plan of the program, as a JSON String.
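        A minimal sketch, assuming a recent Flink release; the class name is a placeholder. The pipeline is declared first and getExecutionPlan() is called before any execute(), returning the data flow graph as a JSON string.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionPlanExample {

    static String plan() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(1, 100)
                .filter(n -> n % 2 == 0) // keep even numbers
                .print();
        // Called before execute(): the registered transformations are still available.
        return env.getExecutionPlan();
    }

    public static void main(String[] args) {
        System.out.println(plan());
    }
}
```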
      • clean

        @Internal
        public <F> F clean​(F f)
        Returns a "closure-cleaned" version of the given function. Cleaning is performed only if closure cleaning is not disabled in the ExecutionConfig.
      • addOperator

        @Internal
        public void addOperator​(Transformation<?> transformation)
        Adds an operator to the list of operators that should be executed when calling execute().

        When execute() is called, only the operators that were previously added to the list are executed.

        This is not meant to be used by users. The API methods that create operators must call this method.

      • getExecutionEnvironment

        public static StreamExecutionEnvironment getExecutionEnvironment()
        Creates an execution environment that represents the context in which the program is currently executed. If the program is invoked standalone, this method returns a local execution environment, as returned by createLocalEnvironment().
        Returns:
        The execution environment of the context in which the program is executed.
      • getExecutionEnvironment

        public static StreamExecutionEnvironment getExecutionEnvironment​(Configuration configuration)
        Creates an execution environment that represents the context in which the program is currently executed. If the program is invoked standalone, this method returns a local execution environment, as returned by createLocalEnvironment(Configuration).

        When executed from the command line, the given configuration is stacked on top of the global configuration from config.yaml, and its values override those of any options defined in both.

        Parameters:
        configuration - The configuration to instantiate the environment with.
        Returns:
        The execution environment of the context in which the program is executed.
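        A minimal sketch of passing a Configuration, assuming a recent Flink release; the chosen values are placeholders. ExecutionOptions.RUNTIME_MODE corresponds to the execution.runtime-mode key and CoreOptions.DEFAULT_PARALLELISM to parallelism.default. Run standalone, this yields a local environment; submitted from the command line, the values are layered over the cluster's configuration.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfiguredEnvironment {

    static StreamExecutionEnvironment create() {
        Configuration conf = new Configuration();
        // Same effect as "execution.runtime-mode: BATCH" in the configuration file.
        conf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        // Same effect as "parallelism.default: 2".
        conf.set(CoreOptions.DEFAULT_PARALLELISM, 2);
        return StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = create();
        env.fromSequence(1, 10).print();
        env.execute("configured-job");
    }
}
```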
      • createLocalEnvironment

        public static LocalStreamEnvironment createLocalEnvironment()
        Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. The default parallelism of the local environment is the number of hardware contexts (CPU cores / threads), unless it was specified differently by setParallelism(int).
        Returns:
        A local execution environment.
      • createLocalEnvironment

        public static LocalStreamEnvironment createLocalEnvironment​(int parallelism)
        Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. It will use the parallelism specified in the parameter.
        Parameters:
        parallelism - The parallelism for the local environment.
        Returns:
        A local execution environment with the specified parallelism.
      • createLocalEnvironment

        public static LocalStreamEnvironment createLocalEnvironment​(int parallelism,
                                                                    Configuration configuration)
        Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. It will use the parallelism specified in the parameter.
        Parameters:
        parallelism - The parallelism for the local environment.
        configuration - A custom configuration to pass to the local cluster
        Returns:
        A local execution environment with the specified parallelism.
      • createLocalEnvironment

        public static LocalStreamEnvironment createLocalEnvironment​(Configuration configuration)
        Creates a LocalStreamEnvironment. The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in.
        Parameters:
        configuration - A custom configuration to pass to the local cluster
        Returns:
        A local execution environment configured with the given configuration.
      • createLocalEnvironmentWithWebUI

        @PublicEvolving
        public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI​(Configuration conf)
        Creates a LocalStreamEnvironment for local program execution that also starts the web monitoring UI.

        The local execution environment will run the program in a multi-threaded fashion in the same JVM as the environment was created in. It uses the parallelism set in the given configuration, if any, and otherwise the default local parallelism.

        If the configuration key 'rest.port' was set in the configuration, that particular port will be used for the web UI. Otherwise, the default port (8081) will be used.
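        A minimal sketch, assuming a recent Flink release with flink-runtime-web on the classpath (needed to serve the dashboard); the port 8082 and class name are placeholders. RestOptions.PORT is the option behind the rest.port key mentioned above.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WebUiExample {

    static StreamExecutionEnvironment create(int uiPort) {
        Configuration conf = new Configuration();
        // Overrides the default web UI port (8081).
        conf.set(RestOptions.PORT, uiPort);
        return StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = create(8082);
        env.fromSequence(1, 1_000_000).print();
        // While the job runs, the UI should be reachable at http://localhost:8082.
        env.execute("local-with-web-ui");
    }
}
```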

      • createRemoteEnvironment

        public static StreamExecutionEnvironment createRemoteEnvironment​(String host,
                                                                         int port,
                                                                         String... jarFiles)
        Creates a RemoteStreamEnvironment. The remote environment sends (parts of) the program to a cluster for execution. Note that all file paths used in the program must be accessible from the cluster. This method does not set a parallelism; one can be set explicitly via setParallelism(int).
        Parameters:
        host - The host name or address of the master (JobManager), where the program should be executed.
        port - The port of the master (JobManager), where the program should be executed.
        jarFiles - The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
        Returns:
        A remote environment that executes the program on a cluster.
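        A minimal sketch, assuming a recent Flink release and a cluster whose JobManager REST endpoint listens on the default port 8081; the host name jobmanager.example.com and the JAR path target/my-flink-job.jar are hypothetical. Creating the environment does not contact the cluster; the program is shipped only when execute() is called.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteEnvironmentExample {

    static StreamExecutionEnvironment create(String host, int port, String... jarFiles) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFiles);
        // Set the parallelism explicitly via setParallelism(int).
        env.setParallelism(4);
        return env;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical JobManager address and JAR containing this program's user code.
        StreamExecutionEnvironment env =
                create("jobmanager.example.com", 8081, "target/my-flink-job.jar");
        env.fromSequence(1, 1000).print();
        env.execute("remote-job");
    }
}
```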
      • createRemoteEnvironment

        public static StreamExecutionEnvironment createRemoteEnvironment​(String host,
                                                                         int port,
                                                                         int parallelism,
                                                                         String... jarFiles)
        Creates a RemoteStreamEnvironment. The remote environment sends (parts of) the program to a cluster for execution. Note that all file paths used in the program must be accessible from the cluster. The execution will use the specified parallelism.
        Parameters:
        host - The host name or address of the master (JobManager), where the program should be executed.
        port - The port of the master (JobManager), where the program should be executed.
        parallelism - The parallelism to use during the execution.
        jarFiles - The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
        Returns:
        A remote environment that executes the program on a cluster.
      • createRemoteEnvironment

        public static StreamExecutionEnvironment createRemoteEnvironment​(String host,
                                                                         int port,
                                                                         Configuration clientConfig,
                                                                         String... jarFiles)
        Creates a RemoteStreamEnvironment. The remote environment sends (parts of) the program to a cluster for execution. Note that all file paths used in the program must be accessible from the cluster. The execution will use the specified parallelism.
        Parameters:
        host - The host name or address of the master (JobManager), where the program should be executed.
        port - The port of the master (JobManager), where the program should be executed.
        clientConfig - The configuration used by the client that connects to the remote cluster.
        jarFiles - The JAR files with code that needs to be shipped to the cluster. If the program uses user-defined functions, user-defined input formats, or any libraries, those must be provided in the JAR files.
        Returns:
        A remote environment that executes the program on a cluster.
      • getDefaultLocalParallelism

        @PublicEvolving
        public static int getDefaultLocalParallelism()
        Gets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().
        Returns:
        The default local parallelism
      • setDefaultLocalParallelism

        @PublicEvolving
        public static void setDefaultLocalParallelism​(int parallelism)
        Sets the default parallelism that will be used for the local execution environment created by createLocalEnvironment().
        Parameters:
        parallelism - The parallelism to use as the default local parallelism.
      • resetContextEnvironment

        protected static void resetContextEnvironment()
      • registerCachedFile

        public void registerCachedFile​(String filePath,
                                       String name)
        Registers a file at the distributed cache under the given name. The file will be accessible from any user-defined function in the (distributed) runtime under a local path. Files may be local files (which will be distributed via BlobServer), or files in a distributed file system. The runtime will copy the files temporarily to a local cache, if needed.

        The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().

        Parameters:
        filePath - The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
        name - The name under which the file is registered.
      • registerCachedFile

        public void registerCachedFile​(String filePath,
                                       String name,
                                       boolean executable)
        Registers a file at the distributed cache under the given name. The file will be accessible from any user-defined function in the (distributed) runtime under a local path. Files may be local files (which will be distributed via BlobServer), or files in a distributed file system. The runtime will copy the files temporarily to a local cache, if needed.

        The RuntimeContext can be obtained inside UDFs via RichFunction.getRuntimeContext() and provides access to the DistributedCache via RuntimeContext.getDistributedCache().

        Parameters:
        filePath - The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
        name - The name under which the file is registered.
        executable - flag indicating whether the file should be executable
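        A sketch of using a registered file inside a user function, assuming a recent Flink 1.x release; the cache name "allow-list", the class names, and the file contents are hypothetical. The filter reads its local copy lazily on first use via getRuntimeContext().getDistributedCache().getFile(name), which avoids depending on which open(...) signature a given Flink version provides.

```java
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CachedFileExample {

    /** Keeps only the words listed (one per line) in the file cached as "allow-list". */
    public static class AllowListFilter extends RichFilterFunction<String> {
        private transient Set<String> allowed;

        @Override
        public boolean filter(String word) throws Exception {
            if (allowed == null) {
                // Local copy of the registered file, made available by the runtime for this task.
                File file = getRuntimeContext().getDistributedCache().getFile("allow-list");
                allowed = new HashSet<>(Files.readAllLines(file.toPath(), StandardCharsets.UTF_8));
            }
            return allowed.contains(word);
        }
    }

    static List<String> run(String allowListUri) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // E.g. "file:///tmp/allow-list.txt" or "hdfs://host:port/path/allow-list.txt".
        env.registerCachedFile(allowListUri, "allow-list");
        return env.fromElements("apple", "banana", "cherry")
                .filter(new AllowListFilter())
                .executeAndCollect(10);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(args[0]));
    }
}
```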
      • areExplicitEnvironmentsAllowed

        @Internal
        public static boolean areExplicitEnvironmentsAllowed()
        Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment or a RemoteEnvironment.
        Returns:
        True, if it is possible to explicitly instantiate a LocalEnvironment or a RemoteEnvironment, false otherwise.
      • listCompletedClusterDatasets

        protected Set<AbstractID> listCompletedClusterDatasets()
      • close

        public void close()
                   throws Exception
        Closes and cleans up the execution environment. All cached intermediate results are physically released.
        Specified by:
        close in interface AutoCloseable
        Throws:
        Exception
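        Because the environment implements AutoCloseable, a try-with-resources block is one way to make sure close() runs. A minimal sketch, assuming a Flink release in which StreamExecutionEnvironment implements AutoCloseable, as stated above; the class and job names are placeholders.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CloseExample {

    public static void main(String[] args) throws Exception {
        // close() is invoked automatically when the block exits, releasing cached intermediate results.
        try (StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()) {
            env.fromSequence(1, 3).print();
            env.execute("close-example");
        }
    }
}
```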