Class Transformation<T>

  • Type Parameters:
    T - The type of the elements that result from this Transformation
    Direct Known Subclasses:
    CacheTransformation, PartitionTransformation, PhysicalTransformation, SideOutputTransformation, SourceTransformationWrapper, UnionTransformation

    @Internal
    public abstract class Transformation<T>
    extends Object
    A Transformation represents the operation that creates a DataStream. Every DataStream has an underlying Transformation that is the origin of said DataStream.

    API operations such as DataStream#map create a tree of Transformations underneath. When the stream program is to be executed this graph is translated to a StreamGraph using StreamGraphGenerator.

    A Transformation does not necessarily correspond to a physical operation at runtime. Some operations are only logical concepts. Examples of this are union, split/select data stream, partitioning.

    The following graph of Transformations:

    
       Source              Source
          +                   +
          |                   |
          v                   v
      Rebalance          HashPartition
          +                   +
          |                   |
          |                   |
          +------>Union<------+
                    +
                    |
                    v
                  Split
                    +
                    |
                    v
                  Select
                    +
                    v
                   Map
                    +
                    |
                    v
                  Sink
     

    Would result in this graph of operations at runtime:

    
     Source              Source
       +                   +
       |                   |
       |                   |
       +------->Map<-------+
                 +
                 |
                 v
                Sink
     

    The information about partitioning, union, split/select end up being encoded in the edges that connect the sources to the map operation.

    • Field Detail

      • UPPER_BOUND_MAX_PARALLELISM

        public static final int UPPER_BOUND_MAX_PARALLELISM
        See Also:
        Constant Field Values
      • id

        protected final int id
      • description

        protected String description
      • typeUsed

        protected boolean typeUsed
      • bufferTimeout

        protected long bufferTimeout
    • Constructor Detail

      • Transformation

        public Transformation​(String name,
                              TypeInformation<T> outputType,
                              int parallelism)
        Creates a new Transformation with the given name, output type and parallelism.
        Parameters:
        name - The name of the Transformation, this will be shown in Visualizations and the Log
        outputType - The output type of this Transformation
        parallelism - The parallelism of this Transformation
      • Transformation

        public Transformation​(String name,
                              TypeInformation<T> outputType,
                              int parallelism,
                              boolean parallelismConfigured)
        Creates a new Transformation with the given name, output type and parallelism.
        Parameters:
        name - The name of the Transformation, this will be shown in Visualizations and the Log
        outputType - The output type of this Transformation
        parallelism - The parallelism of this Transformation
        parallelismConfigured - If true, the parallelism of the transformation is explicitly set and should be respected. Otherwise the parallelism can be changed at runtime.
    • Method Detail

      • getNewNodeId

        public static int getNewNodeId()
      • getId

        public int getId()
        Returns the unique ID of this Transformation.
      • setName

        public void setName​(String name)
        Changes the name of this Transformation.
      • getName

        public String getName()
        Returns the name of this Transformation.
      • setDescription

        public void setDescription​(String description)
        Changes the description of this Transformation.
      • getDescription

        public String getDescription()
        Returns the description of this Transformation.
      • getParallelism

        public int getParallelism()
        Returns the parallelism of this Transformation.
      • setParallelism

        public void setParallelism​(int parallelism)
        Sets the parallelism of this Transformation.
        Parameters:
        parallelism - The new parallelism to set on this Transformation.
      • setParallelism

        public void setParallelism​(int parallelism,
                                   boolean parallelismConfigured)
      • isParallelismConfigured

        public boolean isParallelismConfigured()
      • getMaxParallelism

        public int getMaxParallelism()
        Gets the maximum parallelism for this stream transformation.
        Returns:
        Maximum parallelism of this transformation.
      • setMaxParallelism

        public void setMaxParallelism​(int maxParallelism)
        Sets the maximum parallelism for this stream transformation.
        Parameters:
        maxParallelism - Maximum parallelism for this stream transformation.
      • setResources

        public void setResources​(ResourceSpec minResources,
                                 ResourceSpec preferredResources)
        Sets the minimum and preferred resources for this stream transformation.
        Parameters:
        minResources - The minimum resource of this transformation.
        preferredResources - The preferred resource of this transformation.
      • getMinResources

        public ResourceSpec getMinResources()
        Gets the minimum resource of this stream transformation.
        Returns:
        The minimum resource of this transformation.
      • getPreferredResources

        public ResourceSpec getPreferredResources()
        Gets the preferred resource of this stream transformation.
        Returns:
        The preferred resource of this transformation.
      • declareManagedMemoryUseCaseAtOperatorScope

        public Optional<Integer> declareManagedMemoryUseCaseAtOperatorScope​(ManagedMemoryUseCase managedMemoryUseCase,
                                                                            int weight)
        Declares that this transformation contains certain operator scope managed memory use case.
        Parameters:
        managedMemoryUseCase - The use case that this transformation declares needing managed memory for.
        weight - Use-case-specific weights for this transformation. Used for sharing managed memory across transformations for OPERATOR scope use cases. Check the individual ManagedMemoryUseCase for the specific weight definition.
        Returns:
        The previous weight, if exist.
      • declareManagedMemoryUseCaseAtSlotScope

        public void declareManagedMemoryUseCaseAtSlotScope​(ManagedMemoryUseCase managedMemoryUseCase)
        Declares that this transformation contains certain slot scope managed memory use case.
        Parameters:
        managedMemoryUseCase - The use case that this transformation declares needing managed memory for.
      • updateManagedMemoryStateBackendUseCase

        protected void updateManagedMemoryStateBackendUseCase​(boolean hasStateBackend)
      • getManagedMemoryOperatorScopeUseCaseWeights

        public Map<ManagedMemoryUseCase,​Integer> getManagedMemoryOperatorScopeUseCaseWeights()
        Get operator scope use cases that this transformation needs managed memory for, and the use-case-specific weights for this transformation. The weights are used for sharing managed memory across transformations for the use cases. Check the individual ManagedMemoryUseCase for the specific weight definition.
      • getManagedMemorySlotScopeUseCases

        public Set<ManagedMemoryUseCase> getManagedMemorySlotScopeUseCases()
        Get slot scope use cases that this transformation needs managed memory for.
      • setUidHash

        public void setUidHash​(String uidHash)
        Sets an user provided hash for this operator. This will be used AS IS the create the JobVertexID.

        The user provided hash is an alternative to the generated hashes, that is considered when identifying an operator through the default hash mechanics fails (e.g. because of changes between Flink versions).

        Important: this should be used as a workaround or for trouble shooting. The provided hash needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot assign user-specified hash to intermediate nodes in an operator chain and trying so will let your job fail.

        A use case for this is in migration between Flink versions or changing the jobs in a way that changes the automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g. obtained from old logs) can help to reestablish a lost mapping from states to their target operator.

        Parameters:
        uidHash - The user provided hash for this operator. This will become the JobVertexID, which is shown in the logs and web ui.
      • getUserProvidedNodeHash

        public String getUserProvidedNodeHash()
        Gets the user provided hash.
        Returns:
        The user provided hash.
      • setUid

        public void setUid​(String uid)
        Sets an ID for this Transformation. This is will later be hashed to a uidHash which is then used to create the JobVertexID (that is shown in logs and the web ui).

        The specified ID is used to assign the same operator ID across job submissions (for example when starting a job from a savepoint).

        Important: this ID needs to be unique per transformation and job. Otherwise, job submission will fail.

        Parameters:
        uid - The unique user-specified ID of this transformation.
      • getUid

        public String getUid()
        Returns the user-specified ID of this transformation.
        Returns:
        The unique user-specified ID of this transformation.
      • setSlotSharingGroup

        public void setSlotSharingGroup​(String slotSharingGroupName)
        Sets the slot sharing group of this transformation. Parallel instances of operations that are in the same slot sharing group will be co-located in the same TaskManager slot, if possible.

        Initially, an operation is in the default slot sharing group. This can be explicitly set using setSlotSharingGroup("default").

        Parameters:
        slotSharingGroupName - The slot sharing group's name.
      • setSlotSharingGroup

        public void setSlotSharingGroup​(SlotSharingGroup slotSharingGroup)
        Sets the slot sharing group of this transformation. Parallel instances of operations that are in the same slot sharing group will be co-located in the same TaskManager slot, if possible.

        Initially, an operation is in the default slot sharing group. This can be explicitly set with constructing a SlotSharingGroup with name "default".

        Parameters:
        slotSharingGroup - which contains name and its resource spec.
      • setCoLocationGroupKey

        public void setCoLocationGroupKey​(@Nullable
                                          String coLocationGroupKey)
        NOTE: This is an internal undocumented feature for now. It is not clear whether this will be supported and stable in the long term.

        Sets the key that identifies the co-location group. Operators with the same co-location key will have their corresponding subtasks placed into the same slot by the scheduler.

        Setting this to null means there is no co-location constraint.

      • getCoLocationGroupKey

        @Nullable
        public String getCoLocationGroupKey()
        NOTE: This is an internal undocumented feature for now. It is not clear whether this will be supported and stable in the long term.

        Gets the key that identifies the co-location group. Operators with the same co-location key will have their corresponding subtasks placed into the same slot by the scheduler.

        If this is null (which is the default), it means there is no co-location constraint.

      • setOutputType

        public void setOutputType​(TypeInformation<T> outputType)
        Tries to fill in the type information. Type information can be filled in later when the program uses a type hint. This method checks whether the type information has ever been accessed before and does not allow modifications if the type was accessed already. This ensures consistency by making sure different parts of the operation do not assume different type information.
        Parameters:
        outputType - The type information to fill in.
        Throws:
        IllegalStateException - Thrown, if the type information has been accessed before.
      • setBufferTimeout

        public void setBufferTimeout​(long bufferTimeout)
        Set the buffer timeout of this Transformation. The timeout defines how long data may linger in a partially full buffer before being sent over the network.

        Lower timeouts lead to lower tail latencies, but may affect throughput. For Flink 1.5+, timeouts of 1ms are feasible for jobs with high parallelism.

        A value of -1 means that the default buffer timeout should be used. A value of zero indicates that no buffering should happen, and all records/events should be immediately sent through the network, without additional buffering.

      • getBufferTimeout

        public long getBufferTimeout()
        Returns the buffer timeout of this Transformation.
        See Also:
        setBufferTimeout(long)
      • getTransitivePredecessorsInternal

        protected abstract List<Transformation<?>> getTransitivePredecessorsInternal()
        Returns all transitive predecessor Transformations of this Transformation. This is, for example, used when determining whether a feedback edge of an iteration actually has the iteration head as a predecessor.
        Returns:
        The list of transitive predecessors.
      • getTransitivePredecessors

        public final List<Transformation<?>> getTransitivePredecessors()
        Returns all transitive predecessor Transformations of this Transformation. This is, for example, used when determining whether a feedback edge of an iteration actually has the iteration head as a predecessor. This method is just a wrapper on top of getTransitivePredecessorsInternal method with public access. It uses caching internally.
        Returns:
        The list of transitive predecessors.
      • getInputs

        public abstract List<Transformation<?>> getInputs()
        Returns the transformations that are the immediate predecessors of the current transformation in the transformation graph.
      • hashCode

        public int hashCode()
        Overrides:
        hashCode in class Object
      • setAttribute

        public void setAttribute​(Attribute attribute)
      • getAttribute

        public Attribute getAttribute()