public class SlotPoolImpl extends Object implements SlotPool
The slot pool serves slot requests issued by the ExecutionGraph. It will attempt to acquire new slots
from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available,
or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also
holds all the slots that were offered to it and accepted, and can thus provide registered free slots even if the
ResourceManager is down. Slots are only released when they are no longer useful, e.g. when the job is fully running
but there are still some free slots.
Every allocation and every slot offer is identified by a self-generated AllocationID, which is used to eliminate ambiguities.
TODO: Make pending requests location-preference aware.
TODO: Pass location preferences to the ResourceManager when sending a slot request.
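The request path described above can be sketched in a few lines. This is a simplified, hypothetical model and not the code of SlotPoolImpl: the class SlotPoolSketch, its method names, the use of plain String ids in place of SlotRequestId and AllocationID, and the Predicate standing in for the ResourceManager are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

/** Hypothetical model of the request path; not Flink's SlotPoolImpl. */
class SlotPoolSketch {
    // Free slots that were offered and accepted earlier, identified by allocation id.
    private final Deque<String> freeSlots = new ArrayDeque<>();
    // Requests forwarded to the resource manager that have not completed yet.
    private final Map<String, CompletableFuture<String>> pending = new HashMap<>();
    // Stand-in for the ResourceManager: true accepts a request, false declines it.
    // A null value models "no ResourceManager currently available".
    private Predicate<String> resourceManager;

    void connect(Predicate<String> rm) { resourceManager = rm; }

    void disconnect() { resourceManager = null; }

    void addFreeSlot(String allocationId) { freeSlots.add(allocationId); }

    CompletableFuture<String> requestSlot(String requestId) {
        // Serve from the pool first; this works even without a ResourceManager.
        String free = freeSlots.poll();
        if (free != null) {
            return CompletableFuture.completedFuture(free);
        }
        CompletableFuture<String> result = new CompletableFuture<>();
        if (resourceManager == null) {
            result.completeExceptionally(
                new IllegalStateException("no ResourceManager available"));
        } else if (!resourceManager.test(requestId)) {
            result.completeExceptionally(
                new IllegalStateException("ResourceManager declined " + requestId));
        } else {
            // Accepted: the request stays pending until a slot arrives or it times out.
            pending.put(requestId, result);
        }
        return result;
    }

    // Fails a pending request, in the spirit of timeoutPendingSlotRequest.
    void timeoutPendingRequest(String requestId) {
        CompletableFuture<String> result = pending.remove(requestId);
        if (result != null) {
            result.completeExceptionally(
                new TimeoutException("request " + requestId + " timed out"));
        }
    }

    public static void main(String[] args) {
        SlotPoolSketch pool = new SlotPoolSketch();
        pool.addFreeSlot("alloc-1");
        System.out.println(pool.requestSlot("req-1").join());                      // prints alloc-1
        System.out.println(pool.requestSlot("req-2").isCompletedExceptionally());  // prints true
    }
}
```

The sketch follows the class description literally: a request that cannot be served from the pool fails when no ResourceManager is connected, when the ResourceManager declines, or when the pending request times out.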
Modifier and Type | Class and Description |
---|---|
protected static class | SlotPoolImpl.AvailableSlots - Organize all available slots from different points of view. |
Modifier and Type | Field and Description |
---|---|
protected org.slf4j.Logger | log |
Constructor and Description |
---|
SlotPoolImpl(JobID jobId) |
SlotPoolImpl(JobID jobId, Clock clock, Time rpcTimeout, Time idleSlotTimeout) |
Modifier and Type | Method and Description |
---|---|
Optional<PhysicalSlot> | allocateAvailableSlot(SlotRequestId slotRequestId, AllocationID allocationID) - Allocates the available slot with the given allocation id under the given request id. |
void | close() |
void | connectToResourceManager(ResourceManagerGateway resourceManagerGateway) - Connects the SlotPool to the given ResourceManager. |
AllocatedSlotReport | createAllocatedSlotReport(ResourceID taskManagerId) - Creates a report about the allocated slots belonging to the specified task manager. |
void | disconnectResourceManager() - Disconnects the slot pool from its current ResourceManager. |
Optional<ResourceID> | failAllocation(AllocationID allocationID, Exception cause) - Fails the specified allocation and releases the corresponding slot if there is one. |
protected org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.AllocatedSlots | getAllocatedSlots() |
protected SlotPoolImpl.AvailableSlots | getAvailableSlots() |
Collection<SlotInfo> | getAvailableSlotsInformation() - Returns a list of SlotInfo objects about all slots that are currently available in the slot pool. |
Collection<SlotOffer> | offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) - Offers multiple slots to the SlotPool. |
boolean | registerTaskManager(ResourceID resourceID) - Registers a TaskManager with this pool; only slots that come from a registered TaskManager are considered valid. |
void | releaseSlot(SlotRequestId slotRequestId, Throwable cause) - Releases the slot with the given SlotRequestId. |
boolean | releaseTaskManager(ResourceID resourceId, Exception cause) - Unregisters a TaskManager from this pool; all related slots are released and their tasks are canceled. |
CompletableFuture<PhysicalSlot> | requestNewAllocatedSlot(SlotRequestId slotRequestId, ResourceProfile resourceProfile, Time timeout) - Requests the allocation of a new slot from the resource manager. |
protected void | runAsync(Runnable runnable) - Executes the runnable in the main thread of the underlying RPC endpoint. |
protected void | scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) - Executes the runnable in the main thread of the underlying RPC endpoint after the given delay, expressed in the given time unit. |
protected void | scheduleRunAsync(Runnable runnable, Time delay) - Executes the runnable in the main thread of the underlying RPC endpoint after the given delay. |
void | start(JobMasterId jobMasterId, String newJobManagerAddress, ComponentMainThreadExecutor componentMainThreadExecutor) - Starts the slot pool to accept RPC calls. |
void | suspend() - Suspends this pool, meaning it has lost its authority to accept and distribute slots. |
protected void | timeoutPendingSlotRequest(SlotRequestId slotRequestId) |
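The runAsync and scheduleRunAsync methods listed above run work on a single main thread. A minimal sketch of that pattern follows, assuming a single-threaded ScheduledExecutorService as a stand-in for the main thread of the RPC endpoint. The class MainThreadSketch is hypothetical, and java.time.Duration is used in place of Flink's Time type; this is not how SlotPoolImpl is necessarily implemented.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a component bound to one main thread. */
class MainThreadSketch implements AutoCloseable {
    // One thread only, so submitted runnables never execute concurrently.
    private final ScheduledExecutorService mainThread =
        Executors.newSingleThreadScheduledExecutor();

    // Runs the runnable on the main thread as soon as the thread is free.
    void runAsync(Runnable runnable) {
        mainThread.execute(runnable);
    }

    // Runs the runnable on the main thread after the given delay.
    void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
        mainThread.schedule(runnable, delay, unit);
    }

    // Convenience overload; Duration stands in for Flink's Time here.
    void scheduleRunAsync(Runnable runnable, Duration delay) {
        scheduleRunAsync(runnable, delay.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        mainThread.shutdownNow();
    }
}
```

Funnelling all state changes through one thread means the pool's internal collections need no locking, which is presumably why these helpers exist alongside the public methods.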
@VisibleForTesting public SlotPoolImpl(JobID jobId)

public void start(@Nonnull JobMasterId jobMasterId, @Nonnull String newJobManagerAddress, @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception
Specified by: start in interface SlotPool
Parameters:
jobMasterId - The necessary leader id for running the job.
newJobManagerAddress - for the slot requests which are sent to the resource manager
componentMainThreadExecutor - The main thread executor for the job master's main thread.
Throws: Exception

public void suspend()

public void close()
Specified by: close in interface AutoCloseable
Specified by: close in interface SlotPool

public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway)
Description copied from interface: SlotPool
Connects the SlotPool to the given ResourceManager.
Specified by: connectToResourceManager in interface SlotPool
Parameters:
resourceManagerGateway - The RPC gateway for the resource manager.

public void disconnectResourceManager()
Description copied from interface: SlotPool
Disconnects the slot pool from its current ResourceManager. The slot pool will still be able to serve slots from its internal pool.
Specified by: disconnectResourceManager in interface SlotPool
public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable cause)
Description copied from interface: AllocatedSlotActions
Releases the slot with the given SlotRequestId. Additionally, one can provide a cause for the slot release.
Specified by: releaseSlot in interface AllocatedSlotActions
Parameters:
slotRequestId - identifying the slot to release
cause - of the slot release, null if none

public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID)
Description copied from interface: SlotPool
Allocates the available slot with the given allocation id under the given request id. Returns an empty Optional if no slot with the given allocation id is available.
Specified by: allocateAvailableSlot in interface SlotPool
Parameters:
slotRequestId - identifying the requested slot
allocationID - the allocation id of the requested available slot
Returns: the available slot with the given allocation id, or an empty Optional if no such slot existed.

@Nonnull public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, Time timeout)
Description copied from interface: SlotPool
Request the allocation of a new slot from the resource manager.
Specified by: requestNewAllocatedSlot in interface SlotPool
Parameters:
slotRequestId - identifying the requested slot
resourceProfile - resource profile that specifies the resource requirements for the requested slot
timeout - timeout for the allocation procedure

@Nonnull public Collection<SlotInfo> getAvailableSlotsInformation()
Description copied from interface: SlotPool
Returns a list of SlotInfo objects about all slots that are currently available in the slot pool.
Specified by: getAvailableSlotsInformation in interface SlotPool
Returns: a list of SlotInfo objects about all slots that are currently available in the slot pool.

public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers)
Description copied from interface: SlotPool
Offers multiple slots to the SlotPool. The slot offerings can be individually accepted or rejected by returning the collection of accepted slot offers.
Specified by: offerSlots in interface SlotPool
Parameters:
taskManagerLocation - from which the slot offers originate
taskManagerGateway - to talk to the slot offerer
offers - slot offers which are offered to the SlotPool
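To illustrate how offerSlots and allocateAvailableSlot fit together, here is a small self-contained sketch. It is an assumption-laden model, not Flink code: the class OfferSketch is hypothetical, plain String ids stand in for ResourceID, AllocationID, SlotRequestId and SlotOffer, and the rule that an already known allocation id is rejected is chosen only to show individual rejection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical model of offering and allocating slots; not Flink's SlotPoolImpl. */
class OfferSketch {
    private final Set<String> registeredTaskManagers = new HashSet<>();
    private final Set<String> knownSlots = new HashSet<>();      // every accepted allocation id
    private final Set<String> availableSlots = new HashSet<>();  // accepted and currently free
    private final Map<String, String> allocatedSlots = new HashMap<>(); // request id -> allocation id

    boolean registerTaskManager(String taskManagerId) {
        return registeredTaskManagers.add(taskManagerId);
    }

    // Returns the subset of offers that was accepted; everything else is rejected.
    Collection<String> offerSlots(String taskManagerId, Collection<String> offers) {
        List<String> accepted = new ArrayList<>();
        if (!registeredTaskManagers.contains(taskManagerId)) {
            return accepted; // offers from unregistered task managers are not valid
        }
        for (String allocationId : offers) {
            // Illustrative rule (an assumption): an id the pool already holds is rejected.
            if (knownSlots.add(allocationId)) {
                availableSlots.add(allocationId);
                accepted.add(allocationId);
            }
        }
        return accepted;
    }

    // Moves a free slot to the allocated set, or returns empty if it is not available.
    Optional<String> allocateAvailableSlot(String slotRequestId, String allocationId) {
        if (!availableSlots.remove(allocationId)) {
            return Optional.empty();
        }
        allocatedSlots.put(slotRequestId, allocationId);
        return Optional.of(allocationId);
    }

    public static void main(String[] args) {
        OfferSketch pool = new OfferSketch();
        pool.registerTaskManager("tm-1");
        System.out.println(pool.offerSlots("tm-1", Arrays.asList("a1", "a2"))); // prints [a1, a2]
        System.out.println(pool.allocateAvailableSlot("req-1", "a1").isPresent()); // prints true
        System.out.println(pool.allocateAvailableSlot("req-2", "a1").isPresent()); // prints false
    }
}
```

Returning the accepted subset lets the caller work out which offers were rejected without a separate reply per slot.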
public Optional<ResourceID> failAllocation(AllocationID allocationID, Exception cause)
Specified by: failAllocation in interface SlotPool
Parameters:
allocationID - Represents the allocation which should be failed
cause - The cause of the failure

public boolean registerTaskManager(ResourceID resourceID)
Specified by: registerTaskManager in interface SlotPool
Parameters:
resourceID - The id of the TaskManager

public boolean releaseTaskManager(ResourceID resourceId, Exception cause)
Specified by: releaseTaskManager in interface SlotPool
Parameters:
resourceId - The id of the TaskManager
cause - for the releasing of the TaskManager

public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId)
Description copied from interface: SlotPool
Create report about the allocated slots belonging to the specified task manager.
Specified by: createAllocatedSlotReport in interface SlotPool
Parameters:
taskManagerId - identifies the task manager

@VisibleForTesting protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId)

@VisibleForTesting protected org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.AllocatedSlots getAllocatedSlots()

@VisibleForTesting protected SlotPoolImpl.AvailableSlots getAvailableSlots()
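The bookkeeping behind registerTaskManager, releaseTaskManager and createAllocatedSlotReport can be pictured with the sketch below. It is a hypothetical model only: the class TaskManagerTrackingSketch, the helper methods addSlot and slotsOf, and the String ids are invented for illustration. The meaning of the boolean results (true when the set of registered task managers changed) is an assumption, since this page does not state what the methods return.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of per-TaskManager slot tracking; not Flink's SlotPoolImpl. */
class TaskManagerTrackingSketch {
    private final Set<String> registered = new HashSet<>();
    private final Map<String, String> slotOwner = new HashMap<>(); // allocation id -> task manager id

    // Assumption: true means the task manager was not registered before.
    boolean registerTaskManager(String taskManagerId) {
        return registered.add(taskManagerId);
    }

    // Hypothetical helper: records a slot only if its task manager is registered.
    boolean addSlot(String taskManagerId, String allocationId) {
        if (!registered.contains(taskManagerId)) {
            return false;
        }
        slotOwner.put(allocationId, taskManagerId);
        return true;
    }

    // Unregisters the task manager and drops every slot that belongs to it.
    // Assumption: true means a registered task manager was actually removed.
    boolean releaseTaskManager(String taskManagerId) {
        if (!registered.remove(taskManagerId)) {
            return false;
        }
        slotOwner.values().removeIf(taskManagerId::equals);
        return true;
    }

    // Hypothetical stand-in for a slot report: sorted allocation ids of one task manager.
    List<String> slotsOf(String taskManagerId) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> entry : slotOwner.entrySet()) {
            if (entry.getValue().equals(taskManagerId)) {
                result.add(entry.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }
}
```

Keying slots by their owning task manager is what makes it possible to release all related slots in one step when that task manager goes away.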
protected void runAsync(Runnable runnable)
Parameters:
runnable - Runnable to be executed in the main thread of the underlying RPC endpoint

protected void scheduleRunAsync(Runnable runnable, Time delay)
Parameters:
runnable - Runnable to be executed
delay - The delay after which the runnable will be executed

protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit)
Parameters:
runnable - Runnable to be executed
delay - The delay after which the runnable will be executed

Copyright © 2014–2020 The Apache Software Foundation. All rights reserved.