public class StreamingSemiAntiJoinOperator extends AbstractStreamingJoinOperator
AbstractStreamingJoinOperator.AssociatedRecords, AbstractStreamingJoinOperator.OuterRecord
collector, joinCondition, LEFT_RECORDS_STATE_NAME, leftInputSideSpec, leftStateRetentionTime, leftType, RIGHT_RECORDS_STATE_NAME, rightInputSideSpec, rightStateRetentionTime, rightType
chainingStrategy, config, latencyStats, LOG, metrics, output, processingTimeService
Constructor and Description |
---|
StreamingSemiAntiJoinOperator(boolean isAntiJoin,
InternalTypeInfo<RowData> leftType,
InternalTypeInfo<RowData> rightType,
GeneratedJoinCondition generatedJoinCondition,
JoinInputSideSpec leftInputSideSpec,
JoinInputSideSpec rightInputSideSpec,
boolean[] filterNullKeys,
long leftStateRetentionTime,
long rightStateRetentionTIme) |
Modifier and Type | Method and Description |
---|---|
void |
open()
This method is called immediately before any elements are processed, it should contain the
operator's initialization logic, e.g. state initialization.
|
void |
processElement1(StreamRecord<RowData> element)
Process an input element and output incremental joined records, retraction messages will be
sent in some scenarios.
|
void |
processElement2(StreamRecord<RowData> element)
Process an input element and output incremental joined records, retraction messages will be
sent in some scenarios.
|
close
finish, getChainingStrategy, getContainingTask, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getTimeServiceManager, getUserCodeClassloader, hasKeyContext1, hasKeyContext2, initializeState, initializeState, isUsingCustomRawKeyedState, notifyCheckpointAborted, notifyCheckpointComplete, prepareSnapshotPreBarrier, processLatencyMarker, processLatencyMarker1, processLatencyMarker2, processWatermark, processWatermark1, processWatermark2, processWatermarkStatus, processWatermarkStatus1, processWatermarkStatus2, reportOrForwardLatencyMarker, setChainingStrategy, setCurrentKey, setKeyContextElement1, setKeyContextElement2, setProcessingTimeService, setup, snapshotState, snapshotState
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
processLatencyMarker1, processLatencyMarker2, processWatermark1, processWatermark2, processWatermarkStatus1, processWatermarkStatus2
finish, getMetricGroup, getOperatorID, initializeState, prepareSnapshotPreBarrier, setKeyContextElement1, setKeyContextElement2, snapshotState
notifyCheckpointAborted, notifyCheckpointComplete
getCurrentKey, setCurrentKey
hasKeyContext
public StreamingSemiAntiJoinOperator(boolean isAntiJoin, InternalTypeInfo<RowData> leftType, InternalTypeInfo<RowData> rightType, GeneratedJoinCondition generatedJoinCondition, JoinInputSideSpec leftInputSideSpec, JoinInputSideSpec rightInputSideSpec, boolean[] filterNullKeys, long leftStateRetentionTime, long rightStateRetentionTIme)
public void open() throws Exception
AbstractStreamOperator
The default implementation does nothing.
open
in interface StreamOperator<RowData>
open
in class AbstractStreamingJoinOperator
Exception
- An exception in this method causes the operator to fail.public void processElement1(StreamRecord<RowData> element) throws Exception
Following is the pseudo code to describe the core logic of this method.
if there is no matched rows on the other side if anti join, send input record if there are matched rows on the other side if semi join, send input record if the input record is accumulate, state.add(record, matched size) if the input record is retract, state.retract(record)
Exception
public void processElement2(StreamRecord<RowData> element) throws Exception
Following is the pseudo code to describe the core logic of this method.
Note: "+I" represents "INSERT", "-D" represents "DELETE", "+U" represents "UPDATE_AFTER", "-U" represents "UPDATE_BEFORE".
if input record is accumulate | state.add(record) | if there is no matched rows on the other side, skip | if there are matched rows on the other side | | if the matched num in the matched rows == 0 | | if anti join, send -D[other]s | | if semi join, send +I/+U[other]s (using input RowKind) | | if the matched num in the matched rows > 0, skip | | otherState.update(other, old+1) | endif endif if input record is retract | state.retract(record) | if there is no matched rows on the other side, skip | if there are matched rows on the other side | | if the matched num in the matched rows == 0, this should never happen! | | if the matched num in the matched rows == 1 | | if semi join, send -D/-U[other] (using input RowKind) | | if anti join, send +I[other] | | if the matched num in the matched rows > 1, skip | | otherState.update(other, old-1) | endif endif
Exception
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.