Type Parameters:
P - type of provided result partition writers
G - type of provided input gates
public interface ShuffleEnvironment<P extends ResultPartitionWriter,G extends IndexedInputGate> extends AutoCloseable
The input/output interface of the local shuffle service environment is based on memory Buffers. A producer writes shuffle data into buffers obtained from the created ResultPartitionWriters, and a consumer reads the buffers from the created InputGates.
The interface contains methods to manage the lifecycle of the local shuffle service environment:
1. start() must be called before using the shuffle service environment.
2. AutoCloseable.close() is called to release the shuffle service environment.
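The lifecycle contract above can be sketched as follows. This is a minimal illustration, not Flink code: `ToyShuffleEnvironment` and `RecordingEnvironment` are hypothetical stand-ins, the value returned by the toy `start()` is an arbitrary placeholder, and the toy `close()` is narrowed so that it throws no checked exception.

```java
import java.util.ArrayList;
import java.util.List;

/** Lifecycle sketch built on hypothetical stand-in types; none of these are Flink classes. */
final class ShuffleLifecycleSketch {

    /** Hypothetical, trimmed-down stand-in for the lifecycle part of the interface. */
    interface ToyShuffleEnvironment extends AutoCloseable {
        int start();

        @Override
        void close(); // narrowed for the sketch: no checked exception
    }

    /** Records lifecycle events so that their order can be inspected. */
    static final class RecordingEnvironment implements ToyShuffleEnvironment {
        final List<String> events = new ArrayList<>();
        private boolean started;
        private boolean closed;

        @Override
        public int start() {
            if (closed) {
                throw new IllegalStateException("environment already closed");
            }
            started = true;
            events.add("start");
            return -1; // arbitrary placeholder value in this sketch
        }

        /** Stand-in for any operation that requires a started environment. */
        void use() {
            if (!started || closed) {
                throw new IllegalStateException("call start() first");
            }
            events.add("use");
        }

        @Override
        public void close() {
            closed = true;
            events.add("close");
        }
    }

    /** Runs start, use, close in order and returns the recorded events. */
    static List<String> runLifecycle() {
        RecordingEnvironment env = new RecordingEnvironment();
        // try-with-resources calls close() even if start() or use() throws.
        try (RecordingEnvironment resource = env) {
            resource.start();
            resource.use();
        }
        return env.events;
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle()); // prints [start, use, close]
    }
}
```

Because the environment is an AutoCloseable, try-with-resources is a natural way for a caller to guarantee the release step; whether a real caller can scope the environment this tightly depends on its deployment.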
The interface acts as a factory of result partition writers to produce shuffle data: createResultPartitionWriters(org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext, java.util.List<org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor>). The created writers are grouped per owner. The owner is responsible for the writers' lifecycle from the moment of creation.
Partitions are fully released in the following cases:
1. ResultPartitionWriter.fail(Throwable) and ResultPartitionWriter.close() are called if the production has failed.
2. For PIPELINED partitions, a consumption attempt has been detected and has either failed or finished after the bounded production was completed (ResultPartitionWriter.finish() and ResultPartitionWriter.close() have been called). Only one consumption attempt is ever expected for a PIPELINED partition at the moment, so it can be released afterwards.
3. ShuffleMaster.releasePartitionExternally(ShuffleDescriptor) is called outside of the producer thread and, if the partition occupies any producer-local resources (ShuffleDescriptor.storesLocalResourcesOn()), then also releasePartitionsLocally(Collection).
The partitions that currently still occupy local resources can be queried with getPartitionsOccupyingLocalResources().
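To make the relationship between local release and the query method concrete, here is a minimal sketch. It assumes a hypothetical `ToyPartitionTracker` that identifies partitions by plain strings rather than ResultPartitionID, and it chooses to ignore unknown IDs on release; neither choice is taken from Flink.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;

/** Sketch of tracking partitions that occupy local resources; hypothetical types only. */
final class LocalPartitionTrackingSketch {

    /** Hypothetical tracker; partition IDs are plain strings, not ResultPartitionID. */
    static final class ToyPartitionTracker {
        // Concurrent set, since release may be requested from outside the producer thread.
        private final Set<String> occupying = new ConcurrentSkipListSet<>();

        /** Called when a partition starts to occupy local resources. */
        void registerPartition(String partitionId) {
            occupying.add(partitionId);
        }

        /** Mirrors releasePartitionsLocally(Collection); in this sketch unknown IDs are ignored. */
        void releasePartitionsLocally(Collection<String> partitionIds) {
            occupying.removeAll(partitionIds);
        }

        /** Mirrors getPartitionsOccupyingLocalResources(); returns a sorted snapshot copy. */
        Collection<String> getPartitionsOccupyingLocalResources() {
            return new TreeSet<>(occupying);
        }
    }

    public static void main(String[] args) {
        ToyPartitionTracker tracker = new ToyPartitionTracker();
        tracker.registerPartition("p1");
        tracker.registerPartition("p2");
        tracker.registerPartition("p3");

        tracker.releasePartitionsLocally(Arrays.asList("p2", "p9"));

        System.out.println(tracker.getPartitionsOccupyingLocalResources()); // prints [p1, p3]
    }
}
```

Returning a snapshot copy keeps callers from observing later mutations through the returned collection, which is one possible design choice for a query that may race with releases.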
The interface acts as a factory for the input gates: createInputGates(org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext, org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider, java.util.List<org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor>). The created gates are grouped per owner. The owner is responsible for the gates' lifecycle from the moment of creation.
When the input gates are created, not all consumed partitions may be known yet, e.g. because their producers have not been started. Therefore, the ShuffleEnvironment provides the method updatePartitionInfo(org.apache.flink.runtime.executiongraph.ExecutionAttemptID, org.apache.flink.runtime.executiongraph.PartitionInfo) to update them externally once the producer becomes known. The update mechanism has to be thread-safe because the updated gate can be read concurrently from a different thread.
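One way to picture the thread-safety requirement is the sketch below. It is an illustration under simplifying assumptions, not the Flink implementation: `ToyGate` and `ToyGateRegistry` are hypothetical, gates are keyed by a plain consumer string, and the partition information is reduced to a location string.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of a thread-safe partition update; all types here are hypothetical. */
final class PartitionUpdateSketch {

    /** Hypothetical gate whose producer location may be unknown at creation time. */
    static final class ToyGate {
        // AtomicReference makes the update visible to a concurrently reading thread.
        private final AtomicReference<String> producerLocation = new AtomicReference<>();

        void setProducerLocation(String location) {
            producerLocation.set(location);
        }

        /** Returns null while the producer is still unknown. */
        String producerLocation() {
            return producerLocation.get();
        }
    }

    /** Hypothetical registry that keys gates by a consumer identifier. */
    static final class ToyGateRegistry {
        private final Map<String, ToyGate> gatesByConsumer = new ConcurrentHashMap<>();

        ToyGate createGate(String consumerId) {
            ToyGate gate = new ToyGate();
            gatesByConsumer.put(consumerId, gate);
            return gate;
        }

        void closeGate(String consumerId) {
            gatesByConsumer.remove(consumerId);
        }

        /** Returns true if the gate was updated, false if it is no longer registered. */
        boolean updatePartitionInfo(String consumerId, String location) {
            ToyGate gate = gatesByConsumer.get(consumerId);
            if (gate == null) {
                return false;
            }
            gate.setProducerLocation(location);
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyGateRegistry registry = new ToyGateRegistry();
        ToyGate gate = registry.createGate("consumer-1");

        // The update arrives from a different thread than the one reading the gate.
        Thread updater =
                new Thread(() -> registry.updatePartitionInfo("consumer-1", "host-a:6121"));
        updater.start();
        updater.join();

        System.out.println(gate.producerLocation()); // prints host-a:6121

        registry.closeGate("consumer-1");
        System.out.println(registry.updatePartitionInfo("consumer-1", "host-b:6121")); // prints false
    }
}
```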
Modifier and Type | Method and Description |
---|---|
List<G> | createInputGates(ShuffleIOOwnerContext ownerContext, PartitionProducerStateProvider partitionProducerStateProvider, List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors): Factory method for the InputGates to consume result partitions. |
List<P> | createResultPartitionWriters(ShuffleIOOwnerContext ownerContext, List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors): Factory method for the ResultPartitionWriters to produce result partitions. |
ShuffleIOOwnerContext | createShuffleIOOwnerContext(String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup): Create a context of the shuffle input/output owner used to create partitions or gates belonging to the owner. |
Collection<ResultPartitionID> | getPartitionsOccupyingLocalResources(): Report partitions which still occupy some resources locally. |
void | releasePartitionsLocally(Collection<ResultPartitionID> partitionIds): Release local resources occupied by the given partitions. |
int | start(): Start the internal related services before using the shuffle service environment. |
boolean | updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo partitionInfo): Update a gate with the newly available partition information, previously unknown. |
Methods inherited from interface java.lang.AutoCloseable: close
int start() throws IOException
Start the internal related services before using the shuffle service environment.
Throws:
IOException
ShuffleIOOwnerContext createShuffleIOOwnerContext(String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup)
Create a context of the shuffle input/output owner used to create partitions or gates belonging to the owner.
This method has to be called only once to avoid duplicated internal metric group registration.
Parameters:
ownerName - the owner name, used for logs
executionAttemptID - execution attempt id of the producer or consumer
parentGroup - parent of shuffle specific metric group
List<P> createResultPartitionWriters(ShuffleIOOwnerContext ownerContext, List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors)
Factory method for the ResultPartitionWriters to produce result partitions.
The order of the ResultPartitionWriters in the returned collection should be the same as the iteration order of the passed resultPartitionDeploymentDescriptors.
Parameters:
ownerContext - the owner context relevant for partition creation
resultPartitionDeploymentDescriptors - descriptors of the partition, produced by the owner
Returns:
the ResultPartitionWriters
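The ordering requirement on the returned collection can be illustrated with a small factory sketch. `ToyDescriptor`, `ToyWriter`, and `createWriters` are hypothetical names introduced here; real descriptors and writers carry far more state than a name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of an order-preserving factory; all types here are hypothetical. */
final class OrderedFactorySketch {

    /** Hypothetical descriptor that carries only a partition name. */
    static final class ToyDescriptor {
        final String partitionName;

        ToyDescriptor(String partitionName) {
            this.partitionName = partitionName;
        }
    }

    /** Hypothetical writer that remembers its owner and its partition. */
    static final class ToyWriter {
        final String ownerName;
        final String partitionName;

        ToyWriter(String ownerName, String partitionName) {
            this.ownerName = ownerName;
            this.partitionName = partitionName;
        }
    }

    /** Creates one writer per descriptor, in the iteration order of the descriptors. */
    static List<ToyWriter> createWriters(String ownerName, List<ToyDescriptor> descriptors) {
        List<ToyWriter> writers = new ArrayList<>(descriptors.size());
        for (ToyDescriptor descriptor : descriptors) {
            // Appending while iterating keeps index i of the result aligned with descriptor i.
            writers.add(new ToyWriter(ownerName, descriptor.partitionName));
        }
        return writers;
    }

    public static void main(String[] args) {
        List<ToyDescriptor> descriptors =
                Arrays.asList(new ToyDescriptor("p-b"), new ToyDescriptor("p-a"));
        for (ToyWriter writer : createWriters("owner-1", descriptors)) {
            System.out.println(writer.ownerName + " -> " + writer.partitionName);
        }
    }
}
```

Keeping the result index-aligned with the input lets a caller pair each writer with its descriptor by position, which is presumably why the contract fixes the order.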
void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds)
Release local resources occupied by the given partitions.
This is called for partitions which occupy resources locally (can be checked by ShuffleDescriptor.storesLocalResourcesOn()).
Parameters:
partitionIds - identifying the partitions to be released
Collection<ResultPartitionID> getPartitionsOccupyingLocalResources()
Report partitions which still occupy some resources locally.
List<G> createInputGates(ShuffleIOOwnerContext ownerContext, PartitionProducerStateProvider partitionProducerStateProvider, List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors)
Factory method for the InputGates to consume result partitions.
The order of the InputGates in the returned collection should be the same as the iteration order of the passed inputGateDeploymentDescriptors.
Parameters:
ownerContext - the owner context relevant for gate creation
partitionProducerStateProvider - producer state provider to query whether the producer is ready for consumption
inputGateDeploymentDescriptors - descriptors of the input gates to consume
Returns:
the InputGates
boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo partitionInfo) throws IOException, InterruptedException
Update a gate with the newly available partition information, previously unknown.
Parameters:
consumerID - execution id to distinguish gates with the same id from the different consumer executions
partitionInfo - information needed to consume the updated partition, e.g. network location
Returns:
true if the partition has been updated or false if the partition is not available anymore.
Throws:
IOException - IO problem during the update
InterruptedException - potentially blocking operation was interrupted
Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.