Interface ShuffleEnvironment<P extends ResultPartitionWriter,G extends IndexedInputGate>
Type Parameters:
P - type of provided result partition writers
G - type of provided input gates
All Superinterfaces:
AutoCloseable
All Known Implementing Classes:
NettyShuffleEnvironment
public interface ShuffleEnvironment<P extends ResultPartitionWriter,G extends IndexedInputGate> extends AutoCloseable
Interface for the implementation of the local environment of a shuffle service.
The input/output interface of the local shuffle service environment is based on memory Buffers. A producer writes shuffle data into buffers obtained from the created ResultPartitionWriters, and a consumer reads the buffers from the created InputGates.
Lifecycle management.
The interface contains methods to manage the lifecycle of the local shuffle service environment:
- start() must be called before using the shuffle service environment.
- AutoCloseable.close() is called to release the shuffle service environment.
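The lifecycle contract above can be illustrated with a minimal sketch. It does not use the Flink classes: `ToyShuffleEnvironment` and `LifecycleDemo` are hypothetical stand-ins, and the assumption that the toy `start()` always returns -1 (local connections only) is made purely for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a shuffle environment; this is NOT a Flink class.
class ToyShuffleEnvironment implements AutoCloseable {
    private boolean started;
    private boolean closed;

    /** Starts internal services; this toy always reports -1 ("local connection only"). */
    int start() throws IOException {
        if (closed) {
            throw new IOException("environment already closed");
        }
        started = true;
        return -1;
    }

    boolean isStarted() {
        return started;
    }

    boolean isClosed() {
        return closed;
    }

    /** Releases the environment; safe to call more than once. */
    @Override
    public void close() {
        started = false;
        closed = true;
    }
}

class LifecycleDemo {
    /** Runs start, use, close in that order and returns the port reported by start(). */
    static int runOnce(ToyShuffleEnvironment env) {
        // try-with-resources guarantees close() even if start() or later work fails
        try (ToyShuffleEnvironment e = env) {
            int port = e.start();
            // ... writers and gates would be created and used here ...
            return port;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        ToyShuffleEnvironment env = new ToyShuffleEnvironment();
        int port = runOnce(env);
        System.out.println("port=" + port + ", closed=" + env.isClosed());
    }
}
```

Because the interface extends AutoCloseable, try-with-resources is one natural way for a caller to guarantee the release step; whether a real deployment uses it is up to the owning component.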
Shuffle Input/Output management.
Result partition management.
The interface implements a factory of result partition writers to produce shuffle data: createResultPartitionWriters(org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext, java.util.List<org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor>). The created writers are grouped per owner. The owner is responsible for the writers' lifecycle from the moment of creation.
Partitions are fully released in the following cases:
- ResultPartitionWriter.fail(Throwable) and ResultPartitionWriter.close() are called if the production has failed.
- For PIPELINED partitions, if there was a detected consumption attempt and it either failed or finished after the bounded production has been done (ResultPartitionWriter.finish() and ResultPartitionWriter.close() have been called). Only one consumption attempt is ever expected for a PIPELINED partition at the moment, so it can be released afterwards.
- If the following methods are called outside of the producer thread:
  - ShuffleMaster.releasePartitionExternally(ShuffleDescriptor)
  - and, if the partition occupies any producer local resources (ShuffleDescriptor.storesLocalResourcesOn()), then also releasePartitionsLocally(Collection)
The partitions which currently still occupy local resources can be queried with getPartitionsOccupyingLocalResources().
Input gate management.
The interface implements a factory for the input gates: createInputGates(org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext, org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider, java.util.List<org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor>). The created gates are grouped per owner. The owner is responsible for the gates' lifecycle from the moment of creation.
When the input gates are created, not all consumed partitions may be known yet, e.g. because their producers have not been started. Therefore, the ShuffleEnvironment provides the method updatePartitionInfo(org.apache.flink.runtime.executiongraph.ExecutionAttemptID, org.apache.flink.runtime.executiongraph.PartitionInfo) to update them externally once the producer becomes known. The update mechanism has to be thread-safe because the updated gate can be read concurrently from a different thread.
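One way to picture the thread-safety requirement is the sketch below. It is not the Flink implementation: `ToyInputGate`, `ToyGateRegistry`, and the use of plain strings for the consumer id, partition id, and location are all assumptions made for illustration. The toy returns false when the gate is no longer registered, which is a simplification of "the partition is not available anymore".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified gate: maps a partition id to the location of its producer.
class ToyInputGate {
    // ConcurrentHashMap lets a reader thread see updates made by another thread safely
    private final Map<String, String> knownLocations = new ConcurrentHashMap<>();

    void setLocation(String partitionId, String location) {
        knownLocations.put(partitionId, location);
    }

    /** Returns the known location, or null if the producer is not known yet. */
    String locationOf(String partitionId) {
        return knownLocations.get(partitionId);
    }
}

// Hypothetical registry keyed by consumer execution id (a String stands in for ExecutionAttemptID).
class ToyGateRegistry {
    private final Map<String, ToyInputGate> gatesByConsumer = new ConcurrentHashMap<>();

    ToyInputGate createGate(String consumerId) {
        return gatesByConsumer.computeIfAbsent(consumerId, id -> new ToyInputGate());
    }

    void removeGate(String consumerId) {
        gatesByConsumer.remove(consumerId);
    }

    /** Returns true if the gate was updated, false if the gate is no longer registered. */
    boolean updatePartitionInfo(String consumerId, String partitionId, String location) {
        ToyInputGate gate = gatesByConsumer.get(consumerId);
        if (gate == null) {
            return false;
        }
        gate.setLocation(partitionId, location);
        return true;
    }
}

class UpdateDemo {
    public static void main(String[] args) throws InterruptedException {
        ToyGateRegistry registry = new ToyGateRegistry();
        ToyInputGate gate = registry.createGate("consumer-1");

        // The update arrives on a different thread than the one reading the gate
        Thread updater = new Thread(
                () -> registry.updatePartitionInfo("consumer-1", "p-0", "host-a:1234"));
        updater.start();
        updater.join();

        System.out.println(gate.locationOf("p-0"));
    }
}
```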
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| List<G> | createInputGates(ShuffleIOOwnerContext ownerContext, PartitionProducerStateProvider partitionProducerStateProvider, List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) | Factory method for the InputGates to consume result partitions. |
| List<P> | createResultPartitionWriters(ShuffleIOOwnerContext ownerContext, List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors) | Factory method for the ResultPartitionWriters to produce result partitions. |
| ShuffleIOOwnerContext | createShuffleIOOwnerContext(String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup) | Create a context of the shuffle input/output owner used to create partitions or gates belonging to the owner. |
| default Optional<ShuffleMetrics> | getMetricsIfPartitionOccupyingLocalResource(ResultPartitionID partitionId) | Get metrics of the partition if it still occupies some resources locally and has not been released yet. |
| Collection<ResultPartitionID> | getPartitionsOccupyingLocalResources() | Report partitions which still occupy some resources locally. |
| void | releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) | Release local resources occupied by the given partitions. |
| int | start() | Start the internal related services before using the shuffle service environment. |
| boolean | updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo partitionInfo) | Update a gate with the newly available partition information, previously unknown. |
Methods inherited from interface java.lang.AutoCloseable
close
Method Detail
-
start
int start() throws IOException
Start the internal related services before using the shuffle service environment.
Returns:
a port to connect to for the shuffle data exchange, or -1 if only a local connection is possible.
Throws:
IOException
-
createShuffleIOOwnerContext
ShuffleIOOwnerContext createShuffleIOOwnerContext(String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup)
Create a context of the shuffle input/output owner used to create partitions or gates belonging to the owner.
This method has to be called only once to avoid duplicated internal metric group registration.
Parameters:
ownerName - the owner name, used for logs
executionAttemptID - execution attempt id of the producer or consumer
parentGroup - parent of shuffle specific metric group
Returns:
context of the shuffle input/output owner used to create partitions or gates belonging to the owner
-
createResultPartitionWriters
List<P> createResultPartitionWriters(ShuffleIOOwnerContext ownerContext, List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors)
Factory method for the ResultPartitionWriters to produce result partitions.
The order of the ResultPartitionWriters in the returned collection should be the same as the iteration order of the passed resultPartitionDeploymentDescriptors.
Parameters:
ownerContext - the owner context relevant for partition creation
resultPartitionDeploymentDescriptors - descriptors of the partitions produced by the owner
Returns:
list of the ResultPartitionWriters
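The ordering requirement can be sketched as follows. The types `ToyDescriptor`, `ToyWriter`, and `ToyWriterFactory` are hypothetical stand-ins rather than the Flink classes; the only point illustrated is that the i-th returned writer corresponds to the i-th descriptor in iteration order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a deployment descriptor; NOT the Flink class.
class ToyDescriptor {
    final String partitionId;

    ToyDescriptor(String partitionId) {
        this.partitionId = partitionId;
    }
}

// Hypothetical stand-in for a result partition writer.
class ToyWriter {
    final String owner;
    final String partitionId;

    ToyWriter(String owner, String partitionId) {
        this.owner = owner;
        this.partitionId = partitionId;
    }
}

class ToyWriterFactory {
    /** Creates one writer per descriptor, preserving the iteration order of the descriptors. */
    static List<ToyWriter> createWriters(String ownerName, List<ToyDescriptor> descriptors) {
        List<ToyWriter> writers = new ArrayList<>(descriptors.size());
        for (ToyDescriptor descriptor : descriptors) {
            // Appending in iteration order keeps writer i aligned with descriptor i
            writers.add(new ToyWriter(ownerName, descriptor.partitionId));
        }
        return writers;
    }

    public static void main(String[] args) {
        List<ToyDescriptor> descriptors =
                Arrays.asList(new ToyDescriptor("p-2"), new ToyDescriptor("p-0"), new ToyDescriptor("p-1"));
        for (ToyWriter writer : createWriters("task-A", descriptors)) {
            System.out.println(writer.owner + " -> " + writer.partitionId);
        }
    }
}
```

A caller can then pair descriptors and writers by index without a lookup, which is presumably why the contract fixes the order.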
-
releasePartitionsLocally
void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds)
Release local resources occupied by the given partitions.
This is called for partitions which occupy resources locally (can be checked by ShuffleDescriptor.storesLocalResourcesOn()).
Parameters:
partitionIds - identifying the partitions to be released
-
getPartitionsOccupyingLocalResources
Collection<ResultPartitionID> getPartitionsOccupyingLocalResources()
Report partitions which still occupy some resources locally.
Returns:
collection of partitions which still occupy some resources locally and have not been released yet.
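The relationship between releasePartitionsLocally and getPartitionsOccupyingLocalResources can be sketched with a small tracker. `ToyPartitionTracker` and its `register` method are hypothetical, and partition ids are plain strings here instead of ResultPartitionID; how a real implementation tracks local resources is not specified by this page.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker of partitions that still hold local resources; NOT a Flink class.
class ToyPartitionTracker {
    // Concurrent set, since release may be requested from outside the producer thread
    private final Set<String> occupying = ConcurrentHashMap.newKeySet();

    /** Records that a partition now occupies local resources (illustrative helper). */
    void register(String partitionId) {
        occupying.add(partitionId);
    }

    /** Releases the given partitions; ids that are unknown or already released are ignored. */
    void releasePartitionsLocally(Collection<String> partitionIds) {
        for (String partitionId : partitionIds) {
            occupying.remove(partitionId);
        }
    }

    /** Returns a snapshot of the partitions that have not been released yet. */
    Collection<String> getPartitionsOccupyingLocalResources() {
        return new HashSet<>(occupying);
    }
}

class TrackerDemo {
    public static void main(String[] args) {
        ToyPartitionTracker tracker = new ToyPartitionTracker();
        tracker.register("p-0");
        tracker.register("p-1");
        tracker.register("p-2");

        tracker.releasePartitionsLocally(Arrays.asList("p-0", "p-2"));
        System.out.println(tracker.getPartitionsOccupyingLocalResources());
    }
}
```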
-
getMetricsIfPartitionOccupyingLocalResource
default Optional<ShuffleMetrics> getMetricsIfPartitionOccupyingLocalResource(ResultPartitionID partitionId)
Get metrics of the partition if it still occupies some resources locally and has not been released yet.
Parameters:
partitionId - the partition id
Returns:
an Optional of ShuffleMetrics, if found, of the given partition
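Since this is a default method returning an Optional, a caller has to handle the empty case. The sketch below shows one way to do so; `ToyMetrics`, `ToyMetricsSource`, and `ToyTrackingSource` are hypothetical, and the assumption that the default body returns an empty Optional is made for illustration and is not stated on this page.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical metrics holder; the real ShuffleMetrics type is not modeled here.
class ToyMetrics {
    final long bytes;

    ToyMetrics(long bytes) {
        this.bytes = bytes;
    }
}

interface ToyMetricsSource {
    /** Assumed default for illustration: report that no metrics are available. */
    default Optional<ToyMetrics> getMetricsIfPartitionOccupyingLocalResource(String partitionId) {
        return Optional.empty();
    }
}

// Hypothetical implementation that overrides the default and tracks metrics per partition.
class ToyTrackingSource implements ToyMetricsSource {
    private final Map<String, ToyMetrics> metricsByPartition = new HashMap<>();

    void track(String partitionId, long bytes) {
        metricsByPartition.put(partitionId, new ToyMetrics(bytes));
    }

    void release(String partitionId) {
        metricsByPartition.remove(partitionId);
    }

    @Override
    public Optional<ToyMetrics> getMetricsIfPartitionOccupyingLocalResource(String partitionId) {
        // Empty once the partition has been released
        return Optional.ofNullable(metricsByPartition.get(partitionId));
    }
}

class MetricsDemo {
    /** Reads the byte count of a partition, falling back to 0 when no metrics are present. */
    static long bytesOrZero(ToyMetricsSource source, String partitionId) {
        return source.getMetricsIfPartitionOccupyingLocalResource(partitionId)
                .map(metrics -> metrics.bytes)
                .orElse(0L);
    }

    public static void main(String[] args) {
        ToyTrackingSource source = new ToyTrackingSource();
        source.track("p-0", 42L);
        System.out.println(bytesOrZero(source, "p-0"));
        source.release("p-0");
        System.out.println(bytesOrZero(source, "p-0"));
    }
}
```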
-
createInputGates
List<G> createInputGates(ShuffleIOOwnerContext ownerContext, PartitionProducerStateProvider partitionProducerStateProvider, List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors)
Factory method for the InputGates to consume result partitions.
The order of the InputGates in the returned collection should be the same as the iteration order of the passed inputGateDeploymentDescriptors.
Parameters:
ownerContext - the owner context relevant for gate creation
partitionProducerStateProvider - producer state provider to query whether the producer is ready for consumption
inputGateDeploymentDescriptors - descriptors of the input gates to consume
Returns:
list of the InputGates
-
updatePartitionInfo
boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo partitionInfo) throws IOException, InterruptedException
Update a gate with the newly available partition information, previously unknown.
Parameters:
consumerID - execution id to distinguish gates with the same id from the different consumer executions
partitionInfo - information needed to consume the updated partition, e.g. network location
Returns:
true if the partition has been updated or false if the partition is not available anymore.
Throws:
IOException - if an I/O problem occurs during the update
InterruptedException - if the potentially blocking operation was interrupted