public class HeadOperator extends org.apache.flink.streaming.api.operators.AbstractStreamOperator<IterationRecord<?>> implements org.apache.flink.streaming.api.operators.OneInputStreamOperator<IterationRecord<?>,IterationRecord<?>>, org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IterationRecord<?>>>, org.apache.flink.runtime.operators.coordination.OperatorEventHandler, org.apache.flink.streaming.api.operators.BoundedOneInput
Specially for checkpoint, the head operator would like to
IterationListener.onEpochWatermarkIncremented(int,
IterationListener.Context, Collector)
.
To implement the first target, the head operator also need to include the records between
alignment and received barrier from the feed-back edge into the snapshot. To implement the second
target, the head operator would also wait for the notification from the OperatorCoordinator in
additional to the task inputs. This ensures the GloballyAlignedEvent
would not interleave
with the epoch watermarks and all the tasks inside the iteration would be notified with the same
epochs, which facility the rescaling in the future.
Modifier and Type | Field and Description |
---|---|
static org.apache.flink.util.OutputTag<IterationRecord<Void>> |
ALIGN_NOTIFY_OUTPUT_TAG |
Constructor and Description |
---|
HeadOperator(IterationID iterationId,
int feedbackIndex,
boolean isCriteriaStream,
org.apache.flink.api.common.operators.MailboxExecutor mailboxExecutor,
org.apache.flink.runtime.operators.coordination.OperatorEventGateway operatorEventGateway,
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) |
Modifier and Type | Method and Description |
---|---|
void |
close() |
void |
endInput() |
org.apache.flink.runtime.operators.coordination.OperatorEventGateway |
getOperatorEventGateway() |
org.apache.flink.iteration.operator.HeadOperator.HeadOperatorStatus |
getStatus() |
void |
handleOperatorEvent(org.apache.flink.runtime.operators.coordination.OperatorEvent operatorEvent) |
void |
initializeState(org.apache.flink.runtime.state.StateInitializationContext context) |
void |
notifyCheckpointAborted(long checkpointId) |
void |
prepareSnapshotPreBarrier(long checkpointId) |
void |
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IterationRecord<?>> element) |
void |
processFeedback(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IterationRecord<?>> iterationRecord) |
void |
setup(org.apache.flink.streaming.runtime.tasks.StreamTask<?,?> containingTask,
org.apache.flink.streaming.api.graph.StreamConfig config,
org.apache.flink.streaming.api.operators.Output<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IterationRecord<?>>> output) |
void |
snapshotState(org.apache.flink.runtime.state.StateSnapshotContext context) |
finish, getChainingStrategy, getContainingTask, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getTimeServiceManager, getUserCodeClassloader, initializeState, isUsingCustomRawKeyedState, notifyCheckpointComplete, open, processLatencyMarker, processLatencyMarker1, processLatencyMarker2, processWatermark, processWatermark1, processWatermark2, processWatermarkStatus, processWatermarkStatus1, processWatermarkStatus2, reportOrForwardLatencyMarker, setChainingStrategy, setCurrentKey, setKeyContextElement1, setKeyContextElement2, setProcessingTimeService, snapshotState
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
setKeyContextElement
finish, getMetricGroup, getOperatorID, initializeState, open, setKeyContextElement1, setKeyContextElement2, snapshotState
notifyCheckpointComplete
public static final org.apache.flink.util.OutputTag<IterationRecord<Void>> ALIGN_NOTIFY_OUTPUT_TAG
public HeadOperator(IterationID iterationId, int feedbackIndex, boolean isCriteriaStream, org.apache.flink.api.common.operators.MailboxExecutor mailboxExecutor, org.apache.flink.runtime.operators.coordination.OperatorEventGateway operatorEventGateway, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService)
public void setup(org.apache.flink.streaming.runtime.tasks.StreamTask<?,?> containingTask, org.apache.flink.streaming.api.graph.StreamConfig config, org.apache.flink.streaming.api.operators.Output<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IterationRecord<?>>> output)
setup
in interface org.apache.flink.streaming.api.operators.SetupableStreamOperator<IterationRecord<?>>
setup
in class org.apache.flink.streaming.api.operators.AbstractStreamOperator<IterationRecord<?>>
public void initializeState(org.apache.flink.runtime.state.StateInitializationContext context) throws Exception
initializeState
in interface org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
initializeState
in class org.apache.flink.streaming.api.operators.AbstractStreamOperator<IterationRecord<?>>
Exception
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception
prepareSnapshotPreBarrier
in interface org.apache.flink.streaming.api.operators.StreamOperator<IterationRecord<?>>
prepareSnapshotPreBarrier
in class org.apache.flink.streaming.api.operators.AbstractStreamOperator<IterationRecord<?>>
Exception
public void snapshotState(org.apache.flink.runtime.state.StateSnapshotContext context) throws Exception
snapshotState
in interface org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
snapshotState
in class org.apache.flink.streaming.api.operators.AbstractStreamOperator<IterationRecord<?>>
Exception
public void notifyCheckpointAborted(long checkpointId) throws Exception
notifyCheckpointAborted
in interface org.apache.flink.api.common.state.CheckpointListener
notifyCheckpointAborted
in class org.apache.flink.streaming.api.operators.AbstractStreamOperator<IterationRecord<?>>
Exception
public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IterationRecord<?>> element) throws Exception
processElement
in interface org.apache.flink.streaming.api.operators.Input<IterationRecord<?>>
Exception
public void processFeedback(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IterationRecord<?>> iterationRecord) throws Exception
processFeedback
in interface org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IterationRecord<?>>>
Exception
public void handleOperatorEvent(org.apache.flink.runtime.operators.coordination.OperatorEvent operatorEvent)
handleOperatorEvent
in interface org.apache.flink.runtime.operators.coordination.OperatorEventHandler
public void endInput() throws Exception
endInput
in interface org.apache.flink.streaming.api.operators.BoundedOneInput
Exception
public void close() throws Exception
close
in interface org.apache.flink.streaming.api.operators.StreamOperator<IterationRecord<?>>
close
in class org.apache.flink.streaming.api.operators.AbstractStreamOperator<IterationRecord<?>>
Exception
@VisibleForTesting public org.apache.flink.runtime.operators.coordination.OperatorEventGateway getOperatorEventGateway()
@VisibleForTesting public org.apache.flink.iteration.operator.HeadOperator.HeadOperatorStatus getStatus()
Copyright © 2019–2023 The Apache Software Foundation. All rights reserved.