Class DefaultDeclarativeSlotPool
- java.lang.Object
-
- org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool
-
- All Implemented Interfaces:
DeclarativeSlotPool
- Direct Known Subclasses:
BlocklistDeclarativeSlotPool
public class DefaultDeclarativeSlotPool extends Object implements DeclarativeSlotPool
DefaultDeclarativeSlotPool
implementation.The implementation collects the current resource requirements and declares them at the ResourceManager. Whenever new slots are offered, the slot pool compares the offered slots to the set of available and required resources and only accepts those slots which are required.
Slots which are released won't be returned directly to their owners. Instead, the slot pool implementation will only return them after the idleSlotTimeout has been exceeded by a free slot.
The slot pool will call
newSlotsListener
whenever newly offered slots are accepted or if an allocated slot should become free after it is beingfreed
.This class expects 1 of 2 access patterns for changing requirements, which should not be mixed:
1) the legacy approach (used by the DefaultScheduler) tightly couples requirements to reserved slots. When a slot is requested it increases the requirements, when the slot is freed they are decreased again. In the general case what happens is that requirements are increased, a free slot is reserved, (the slot is used for a bit,) the slot is freed, the requirements are reduced. To this end
freeReservedSlot(org.apache.flink.runtime.clusterframework.types.AllocationID, java.lang.Throwable, long)
,releaseSlot(org.apache.flink.runtime.clusterframework.types.AllocationID, java.lang.Exception)
andreleaseSlots(org.apache.flink.runtime.clusterframework.types.ResourceID, java.lang.Exception)
return aResourceCounter
describing which requirement the slot(s) were fulfilling, with the expectation that the scheduler will subsequently decrease the requirements by that amount.2) The declarative approach (used by the AdaptiveScheduler) in contrast derives requirements exclusively based on what a given job currently requires. It may repeatedly reserve/free slots without any modifications to the requirements.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool
DeclarativeSlotPool.NewSlotsListener, DeclarativeSlotPool.NoOpNewSlotsListener
-
-
Field Summary
Fields Modifier and Type Field Description protected org.slf4j.Logger
log
protected AllocatedSlotPool
slotPool
-
Constructor Summary
Constructors Constructor Description DefaultDeclarativeSlotPool(JobID jobId, AllocatedSlotPool slotPool, Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, Duration idleSlotTimeout, Duration rpcTimeout, Duration slotRequestMaxInterval, ComponentMainThreadExecutor componentMainThreadExecutor)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description boolean
containsFreeSlot(AllocationID allocationId)
Checks whether the slot pool contains a slot with the givenAllocationID
and if it is free.boolean
containsSlots(ResourceID owner)
Returns whether the slot pool has a slot registered which is owned by the given TaskExecutor.void
decreaseResourceRequirementsBy(ResourceCounter decrement)
Decreases the resource requirements by decrement.ResourceCounter
freeReservedSlot(AllocationID allocationId, Throwable cause, long currentTime)
Frees the reserved slot identified by the given allocationId.Collection<? extends SlotInfo>
getAllSlotsInformation()
Returns the slot information for all slots (free and allocated slots).FreeSlotTracker
getFreeSlotTracker()
Returns the free slot tracker.Collection<ResourceRequirement>
getResourceRequirements()
Returns the current resource requirements.void
increaseResourceRequirementsBy(ResourceCounter increment)
Increases the resource requirements by increment.Collection<SlotOffer>
offerSlots(Collection<? extends SlotOffer> offers, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long currentTime)
Offers slots to this slot pool.void
registerNewSlotsListener(DeclarativeSlotPool.NewSlotsListener newSlotsListener)
Registers a listener which is called whenever new slots become available.Collection<SlotOffer>
registerSlots(Collection<? extends SlotOffer> slots, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long currentTime)
Registers the given set of slots at the slot pool.void
releaseIdleSlots(long currentTimeMillis)
Releases slots which have exceeded the idle slot timeout and are no longer needed to fulfill the resource requirements.ResourceCounter
releaseSlot(AllocationID allocationId, Exception cause)
Releases the slot specified by allocationId if one exists.ResourceCounter
releaseSlots(ResourceID owner, Exception cause)
Releases all slots belonging to the owning TaskExecutor if it has been registered.PhysicalSlot
reserveFreeSlot(AllocationID allocationId, ResourceProfile requiredSlotProfile)
Reserves the free slot identified by the given allocationId and maps it to the given requiredSlotProfile.void
setResourceRequirements(ResourceCounter resourceRequirements)
Sets the resource requirements to the given resourceRequirements.
-
-
-
Field Detail
-
log
protected final org.slf4j.Logger log
-
slotPool
protected final AllocatedSlotPool slotPool
-
-
Constructor Detail
-
DefaultDeclarativeSlotPool
public DefaultDeclarativeSlotPool(JobID jobId, AllocatedSlotPool slotPool, Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, Duration idleSlotTimeout, Duration rpcTimeout, Duration slotRequestMaxInterval, ComponentMainThreadExecutor componentMainThreadExecutor)
-
-
Method Detail
-
increaseResourceRequirementsBy
public void increaseResourceRequirementsBy(ResourceCounter increment)
Description copied from interface:DeclarativeSlotPool
Increases the resource requirements by increment.- Specified by:
increaseResourceRequirementsBy
in interfaceDeclarativeSlotPool
- Parameters:
increment
- increment by which to increase the resource requirements
-
decreaseResourceRequirementsBy
public void decreaseResourceRequirementsBy(ResourceCounter decrement)
Description copied from interface:DeclarativeSlotPool
Decreases the resource requirements by decrement.- Specified by:
decreaseResourceRequirementsBy
in interfaceDeclarativeSlotPool
- Parameters:
decrement
- decrement by which to decrease the resource requirements
-
setResourceRequirements
public void setResourceRequirements(ResourceCounter resourceRequirements)
Description copied from interface:DeclarativeSlotPool
Sets the resource requirements to the given resourceRequirements.- Specified by:
setResourceRequirements
in interfaceDeclarativeSlotPool
- Parameters:
resourceRequirements
- new resource requirements
-
getResourceRequirements
public Collection<ResourceRequirement> getResourceRequirements()
Description copied from interface:DeclarativeSlotPool
Returns the current resource requirements.- Specified by:
getResourceRequirements
in interfaceDeclarativeSlotPool
- Returns:
- current resource requirements
-
offerSlots
public Collection<SlotOffer> offerSlots(Collection<? extends SlotOffer> offers, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long currentTime)
Description copied from interface:DeclarativeSlotPool
Offers slots to this slot pool. The slot pool is free to accept as many slots as it needs.- Specified by:
offerSlots
in interfaceDeclarativeSlotPool
- Parameters:
offers
- offers containing the list of slots offered to this slot pooltaskManagerLocation
- taskManagerLocation is the location of the offering TaskExecutortaskManagerGateway
- taskManagerGateway is the gateway to talk to the offering TaskExecutorcurrentTime
- currentTime is the time the slots are being offered- Returns:
- collection of accepted slots; the other slot offers are implicitly rejected
-
registerSlots
public Collection<SlotOffer> registerSlots(Collection<? extends SlotOffer> slots, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long currentTime)
Description copied from interface:DeclarativeSlotPool
Registers the given set of slots at the slot pool. The slot pool will try to accept all slots unless the slot is unavailable (for example, the TaskManger is blocked).The difference from
DeclarativeSlotPool.offerSlots(java.util.Collection<? extends org.apache.flink.runtime.taskexecutor.slot.SlotOffer>, org.apache.flink.runtime.taskmanager.TaskManagerLocation, org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway, long)
is that this method allows accepting slots which exceed the currently required, but theDeclarativeSlotPool.offerSlots(java.util.Collection<? extends org.apache.flink.runtime.taskexecutor.slot.SlotOffer>, org.apache.flink.runtime.taskmanager.TaskManagerLocation, org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway, long)
only accepts those slots that are currently required.- Specified by:
registerSlots
in interfaceDeclarativeSlotPool
- Parameters:
slots
- slots to registertaskManagerLocation
- taskManagerLocation is the location of the offering TaskExecutortaskManagerGateway
- taskManagerGateway is the gateway to talk to the offering TaskExecutorcurrentTime
- currentTime is the time the slots are being offered- Returns:
- the successfully registered slots; the other slot offers are implicitly rejected
-
reserveFreeSlot
public PhysicalSlot reserveFreeSlot(AllocationID allocationId, ResourceProfile requiredSlotProfile)
Description copied from interface:DeclarativeSlotPool
Reserves the free slot identified by the given allocationId and maps it to the given requiredSlotProfile.- Specified by:
reserveFreeSlot
in interfaceDeclarativeSlotPool
- Parameters:
allocationId
- allocationId identifies the free slot to allocaterequiredSlotProfile
- requiredSlotProfile specifying the resource requirement- Returns:
- a PhysicalSlot representing the allocated slot
-
freeReservedSlot
public ResourceCounter freeReservedSlot(AllocationID allocationId, @Nullable Throwable cause, long currentTime)
Description copied from interface:DeclarativeSlotPool
Frees the reserved slot identified by the given allocationId. If no slot with allocationId exists, then the call is ignored.Whether the freed slot is returned to the owning TaskExecutor is implementation dependent.
- Specified by:
freeReservedSlot
in interfaceDeclarativeSlotPool
- Parameters:
allocationId
- allocationId identifying the slot to releasecause
- cause for releasing the slot; can benull
currentTime
- currentTime when the slot was released- Returns:
- the resource requirements that the slot was fulfilling
-
registerNewSlotsListener
public void registerNewSlotsListener(DeclarativeSlotPool.NewSlotsListener newSlotsListener)
Description copied from interface:DeclarativeSlotPool
Registers a listener which is called whenever new slots become available.- Specified by:
registerNewSlotsListener
in interfaceDeclarativeSlotPool
- Parameters:
newSlotsListener
- which is called whenever new slots become available
-
releaseSlots
public ResourceCounter releaseSlots(ResourceID owner, Exception cause)
Description copied from interface:DeclarativeSlotPool
Releases all slots belonging to the owning TaskExecutor if it has been registered.- Specified by:
releaseSlots
in interfaceDeclarativeSlotPool
- Parameters:
owner
- owner identifying the owning TaskExecutorcause
- cause for failing the slots- Returns:
- the resource requirements that all slots were fulfilling; empty if all slots were currently free
-
releaseSlot
public ResourceCounter releaseSlot(AllocationID allocationId, Exception cause)
Description copied from interface:DeclarativeSlotPool
Releases the slot specified by allocationId if one exists.- Specified by:
releaseSlot
in interfaceDeclarativeSlotPool
- Parameters:
allocationId
- allocationId identifying the slot to failcause
- cause for failing the slot- Returns:
- the resource requirements that the slot was fulfilling; empty if the slot was currently free
-
releaseIdleSlots
public void releaseIdleSlots(long currentTimeMillis)
Description copied from interface:DeclarativeSlotPool
Releases slots which have exceeded the idle slot timeout and are no longer needed to fulfill the resource requirements.- Specified by:
releaseIdleSlots
in interfaceDeclarativeSlotPool
- Parameters:
currentTimeMillis
- current time
-
getFreeSlotTracker
public FreeSlotTracker getFreeSlotTracker()
Description copied from interface:DeclarativeSlotPool
Returns the free slot tracker.- Specified by:
getFreeSlotTracker
in interfaceDeclarativeSlotPool
- Returns:
- free slot tracker
-
getAllSlotsInformation
public Collection<? extends SlotInfo> getAllSlotsInformation()
Description copied from interface:DeclarativeSlotPool
Returns the slot information for all slots (free and allocated slots).- Specified by:
getAllSlotsInformation
in interfaceDeclarativeSlotPool
- Returns:
- collection of slot information
-
containsFreeSlot
public boolean containsFreeSlot(AllocationID allocationId)
Description copied from interface:DeclarativeSlotPool
Checks whether the slot pool contains a slot with the givenAllocationID
and if it is free.- Specified by:
containsFreeSlot
in interfaceDeclarativeSlotPool
- Parameters:
allocationId
- allocationId specifies the slot to check for- Returns:
true
if the slot pool contains a free slot registered under the given allocation id; otherwisefalse
-
containsSlots
public boolean containsSlots(ResourceID owner)
Description copied from interface:DeclarativeSlotPool
Returns whether the slot pool has a slot registered which is owned by the given TaskExecutor.- Specified by:
containsSlots
in interfaceDeclarativeSlotPool
- Parameters:
owner
- owner identifying the TaskExecutor for which to check whether the slot pool has some slots registered- Returns:
- true if the given TaskExecutor has a slot registered at the slot pool
-
-