public class DefaultDeclarativeSlotPool extends Object implements DeclarativeSlotPool
DeclarativeSlotPool
implementation.
The implementation collects the current resource requirements and declares them at the ResourceManager. Whenever new slots are offered, the slot pool compares the offered slots to the set of available and required resources and only accepts those slots which are required.
Slots which are released won't be returned directly to their owners. Instead, the slot pool implementation will only return them after the idleSlotTimeout has been exceeded by a free slot.
The slot pool will call newSlotsListener
whenever newly offered slots are accepted or
if an allocated slot should become free after it is being freed
.
This class expects 1 of 2 access patterns for changing requirements, which should not be mixed:
1) the legacy approach (used by the DefaultScheduler) tightly couples requirements to reserved
slots. When a slot is requested it increases the requirements, when the slot is freed they are
decreased again. In the general case what happens is that requirements are increased, a free slot
is reserved, (the slot is used for a bit,) the slot is freed, the requirements are reduced. To
this end freeReservedSlot(org.apache.flink.runtime.clusterframework.types.AllocationID, java.lang.Throwable, long)
, releaseSlot(org.apache.flink.runtime.clusterframework.types.AllocationID, java.lang.Exception)
and releaseSlots(org.apache.flink.runtime.clusterframework.types.ResourceID, java.lang.Exception)
return a
ResourceCounter
describing which requirement the slot(s) were fulfilling, with the
expectation that the scheduler will subsequently decrease the requirements by that amount.
2) The declarative approach (used by the AdaptiveScheduler) in contrast derives requirements exclusively based on what a given job currently requires. It may repeatedly reserve/free slots without any modifications to the requirements.
DeclarativeSlotPool.NewSlotsListener, DeclarativeSlotPool.NoOpNewSlotsListener
Modifier and Type | Field and Description |
---|---|
protected org.slf4j.Logger |
log |
protected AllocatedSlotPool |
slotPool |
Constructor and Description |
---|
DefaultDeclarativeSlotPool(JobID jobId,
AllocatedSlotPool slotPool,
java.util.function.Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements,
Time idleSlotTimeout,
Time rpcTimeout) |
Modifier and Type | Method and Description |
---|---|
boolean |
containsFreeSlot(AllocationID allocationId)
Checks whether the slot pool contains a slot with the given
AllocationID and if it is
free. |
boolean |
containsSlots(ResourceID owner)
Returns whether the slot pool has a slot registered which is owned by the given TaskExecutor.
|
void |
decreaseResourceRequirementsBy(ResourceCounter decrement)
Decreases the resource requirements by decrement.
|
ResourceCounter |
freeReservedSlot(AllocationID allocationId,
Throwable cause,
long currentTime)
Frees the reserved slot identified by the given allocationId.
|
Collection<? extends SlotInfo> |
getAllSlotsInformation()
Returns the slot information for all slots (free and allocated slots).
|
Collection<SlotInfoWithUtilization> |
getFreeSlotsInformation()
Returns the slot information for all free slots (slots which can be allocated from the slot
pool).
|
Collection<ResourceRequirement> |
getResourceRequirements()
Returns the current resource requirements.
|
void |
increaseResourceRequirementsBy(ResourceCounter increment)
Increases the resource requirements by increment.
|
Collection<SlotOffer> |
offerSlots(Collection<? extends SlotOffer> offers,
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
long currentTime)
Offers slots to this slot pool.
|
void |
registerNewSlotsListener(DeclarativeSlotPool.NewSlotsListener newSlotsListener)
Registers a listener which is called whenever new slots become available.
|
Collection<SlotOffer> |
registerSlots(Collection<? extends SlotOffer> slots,
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
long currentTime)
Registers the given set of slots at the slot pool.
|
void |
releaseIdleSlots(long currentTimeMillis)
Releases slots which have exceeded the idle slot timeout and are no longer needed to fulfill
the resource requirements.
|
ResourceCounter |
releaseSlot(AllocationID allocationId,
Exception cause)
Releases the slot specified by allocationId if one exists.
|
ResourceCounter |
releaseSlots(ResourceID owner,
Exception cause)
Releases all slots belonging to the owning TaskExecutor if it has been registered.
|
PhysicalSlot |
reserveFreeSlot(AllocationID allocationId,
ResourceProfile requiredSlotProfile)
Reserves the free slot identified by the given allocationId and maps it to the given
requiredSlotProfile.
|
void |
setResourceRequirements(ResourceCounter resourceRequirements)
Sets the resource requirements to the given resourceRequirements.
|
protected final org.slf4j.Logger log
protected final AllocatedSlotPool slotPool
public DefaultDeclarativeSlotPool(JobID jobId, AllocatedSlotPool slotPool, java.util.function.Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, Time idleSlotTimeout, Time rpcTimeout)
public void increaseResourceRequirementsBy(ResourceCounter increment)
DeclarativeSlotPool
increaseResourceRequirementsBy
in interface DeclarativeSlotPool
increment
- increment by which to increase the resource requirementspublic void decreaseResourceRequirementsBy(ResourceCounter decrement)
DeclarativeSlotPool
decreaseResourceRequirementsBy
in interface DeclarativeSlotPool
decrement
- decrement by which to decrease the resource requirementspublic void setResourceRequirements(ResourceCounter resourceRequirements)
DeclarativeSlotPool
setResourceRequirements
in interface DeclarativeSlotPool
resourceRequirements
- new resource requirementspublic Collection<ResourceRequirement> getResourceRequirements()
DeclarativeSlotPool
getResourceRequirements
in interface DeclarativeSlotPool
public Collection<SlotOffer> offerSlots(Collection<? extends SlotOffer> offers, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long currentTime)
DeclarativeSlotPool
offerSlots
in interface DeclarativeSlotPool
offers
- offers containing the list of slots offered to this slot pooltaskManagerLocation
- taskManagerLocation is the location of the offering TaskExecutortaskManagerGateway
- taskManagerGateway is the gateway to talk to the offering
TaskExecutorcurrentTime
- currentTime is the time the slots are being offeredpublic Collection<SlotOffer> registerSlots(Collection<? extends SlotOffer> slots, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long currentTime)
DeclarativeSlotPool
The difference from DeclarativeSlotPool.offerSlots(java.util.Collection<? extends org.apache.flink.runtime.taskexecutor.slot.SlotOffer>, org.apache.flink.runtime.taskmanager.TaskManagerLocation, org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway, long)
is that this method allows accepting slots which
exceed the currently required, but the DeclarativeSlotPool.offerSlots(java.util.Collection<? extends org.apache.flink.runtime.taskexecutor.slot.SlotOffer>, org.apache.flink.runtime.taskmanager.TaskManagerLocation, org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway, long)
only accepts those slots that are
currently required.
registerSlots
in interface DeclarativeSlotPool
slots
- slots to registertaskManagerLocation
- taskManagerLocation is the location of the offering TaskExecutortaskManagerGateway
- taskManagerGateway is the gateway to talk to the offering
TaskExecutorcurrentTime
- currentTime is the time the slots are being offeredpublic PhysicalSlot reserveFreeSlot(AllocationID allocationId, ResourceProfile requiredSlotProfile)
DeclarativeSlotPool
reserveFreeSlot
in interface DeclarativeSlotPool
allocationId
- allocationId identifies the free slot to allocaterequiredSlotProfile
- requiredSlotProfile specifying the resource requirementpublic ResourceCounter freeReservedSlot(AllocationID allocationId, @Nullable Throwable cause, long currentTime)
DeclarativeSlotPool
Whether the freed slot is returned to the owning TaskExecutor is implementation dependent.
freeReservedSlot
in interface DeclarativeSlotPool
allocationId
- allocationId identifying the slot to releasecause
- cause for releasing the slot; can be null
currentTime
- currentTime when the slot was releasedpublic void registerNewSlotsListener(DeclarativeSlotPool.NewSlotsListener newSlotsListener)
DeclarativeSlotPool
registerNewSlotsListener
in interface DeclarativeSlotPool
newSlotsListener
- which is called whenever new slots become availablepublic ResourceCounter releaseSlots(ResourceID owner, Exception cause)
DeclarativeSlotPool
releaseSlots
in interface DeclarativeSlotPool
owner
- owner identifying the owning TaskExecutorcause
- cause for failing the slotspublic ResourceCounter releaseSlot(AllocationID allocationId, Exception cause)
DeclarativeSlotPool
releaseSlot
in interface DeclarativeSlotPool
allocationId
- allocationId identifying the slot to failcause
- cause for failing the slotpublic void releaseIdleSlots(long currentTimeMillis)
DeclarativeSlotPool
releaseIdleSlots
in interface DeclarativeSlotPool
currentTimeMillis
- current timepublic Collection<SlotInfoWithUtilization> getFreeSlotsInformation()
DeclarativeSlotPool
getFreeSlotsInformation
in interface DeclarativeSlotPool
public Collection<? extends SlotInfo> getAllSlotsInformation()
DeclarativeSlotPool
getAllSlotsInformation
in interface DeclarativeSlotPool
public boolean containsFreeSlot(AllocationID allocationId)
DeclarativeSlotPool
AllocationID
and if it is
free.containsFreeSlot
in interface DeclarativeSlotPool
allocationId
- allocationId specifies the slot to check fortrue
if the slot pool contains a free slot registered under the given
allocation id; otherwise false
public boolean containsSlots(ResourceID owner)
DeclarativeSlotPool
containsSlots
in interface DeclarativeSlotPool
owner
- owner identifying the TaskExecutor for which to check whether the slot pool has
some slots registeredCopyright © 2014–2024 The Apache Software Foundation. All rights reserved.