public class StreamingJoinOperator extends AbstractStreamingJoinOperator
Nested classes/interfaces inherited from class AbstractStreamingJoinOperator: AbstractStreamingJoinOperator.AssociatedRecords, AbstractStreamingJoinOperator.OuterRecord
Modifier and Type | Field and Description |
---|---|
protected boolean | leftIsOuter |
protected JoinRecordStateView | leftRecordStateView |
protected boolean | rightIsOuter |
protected JoinRecordStateView | rightRecordStateView |
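The two boolean fields record which sides of the join are outer, that is, which sides must still emit unmatched rows padded with nulls. A minimal sketch of the mapping we assume from SQL join types: LEFT marks the left side outer, RIGHT the right side, FULL both, INNER neither. The `OuterFlags` class and its `JoinKind` enum are hypothetical illustrations, not Flink planner types.

```java
/** Hypothetical sketch: derive leftIsOuter / rightIsOuter from a SQL join type. */
final class OuterFlags {
    // Hypothetical join-type enum; Flink's planner uses its own type, not reproduced here.
    enum JoinKind { INNER, LEFT, RIGHT, FULL }

    // The left side is outer when unmatched left rows must be emitted as [left+null].
    static boolean leftIsOuter(JoinKind kind) {
        return kind == JoinKind.LEFT || kind == JoinKind.FULL;
    }

    // The right side is outer when unmatched right rows must be emitted as [null+right].
    static boolean rightIsOuter(JoinKind kind) {
        return kind == JoinKind.RIGHT || kind == JoinKind.FULL;
    }

    public static void main(String[] args) {
        for (JoinKind k : JoinKind.values()) {
            System.out.println(k + ": leftIsOuter=" + leftIsOuter(k) + ", rightIsOuter=" + rightIsOuter(k));
        }
    }
}
```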
Fields inherited from class AbstractStreamingJoinOperator: collector, joinCondition, LEFT_RECORDS_STATE_NAME, leftInputSideSpec, leftStateRetentionTime, leftType, RIGHT_RECORDS_STATE_NAME, rightInputSideSpec, rightStateRetentionTime, rightType
Fields inherited from class AbstractStreamOperator: chainingStrategy, config, lastRecordAttributes1, lastRecordAttributes2, latencyStats, LOG, metrics, output, processingTimeService, stateHandler, stateKeySelector1, stateKeySelector2, timeServiceManager
Constructor and Description |
---|
StreamingJoinOperator(InternalTypeInfo<RowData> leftType, InternalTypeInfo<RowData> rightType, GeneratedJoinCondition generatedJoinCondition, JoinInputSideSpec leftInputSideSpec, JoinInputSideSpec rightInputSideSpec, boolean leftIsOuter, boolean rightIsOuter, boolean[] filterNullKeys, long leftStateRetentionTime, long rightStateRetentionTime) |
Modifier and Type | Method and Description |
---|---|
void | open() This method is called immediately before any elements are processed; it should contain the operator's initialization logic, e.g. state initialization. |
protected void | processElement(RowData input, JoinRecordStateView inputSideStateView, JoinRecordStateView otherSideStateView, boolean inputIsLeft, boolean isSuppress) Processes an input element and outputs incremental joined records; retraction messages are sent in some scenarios. |
void | processElement1(StreamRecord<RowData> element) Processes one element that arrived on the first input of this two-input operator. |
void | processElement2(StreamRecord<RowData> element) Processes one element that arrived on the second input of this two-input operator. |
Methods inherited from class AbstractStreamingJoinOperator: close
Methods inherited from class AbstractStreamOperator: finish, getChainingStrategy, getContainingTask, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getStateKeySelector1, getStateKeySelector2, getTimeServiceManager, getUserCodeClassloader, hasKeyContext1, hasKeyContext2, initializeState, initializeState, isUsingCustomRawKeyedState, notifyCheckpointAborted, notifyCheckpointComplete, prepareSnapshotPreBarrier, processLatencyMarker, processLatencyMarker1, processLatencyMarker2, processRecordAttributes, processRecordAttributes1, processRecordAttributes2, processWatermark, processWatermark1, processWatermark2, processWatermarkStatus, processWatermarkStatus1, processWatermarkStatus2, reportOrForwardLatencyMarker, setChainingStrategy, setCurrentKey, setKeyContextElement1, setKeyContextElement2, setMailboxExecutor, setProcessingTimeService, setup, snapshotState, snapshotState, useSplittableTimers
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface TwoInputStreamOperator: processLatencyMarker1, processLatencyMarker2, processRecordAttributes1, processRecordAttributes2, processWatermark1, processWatermark2, processWatermarkStatus1, processWatermarkStatus2
Methods inherited from interface StreamOperator: finish, getMetricGroup, getOperatorAttributes, getOperatorID, initializeState, prepareSnapshotPreBarrier, setKeyContextElement1, setKeyContextElement2, snapshotState
Methods inherited from interface CheckpointListener: notifyCheckpointAborted, notifyCheckpointComplete
Methods inherited from interface KeyContext: getCurrentKey, setCurrentKey
Methods inherited from interface KeyContextHandler: hasKeyContext
protected final boolean leftIsOuter
protected final boolean rightIsOuter
protected transient JoinRecordStateView leftRecordStateView
protected transient JoinRecordStateView rightRecordStateView
public StreamingJoinOperator(InternalTypeInfo<RowData> leftType, InternalTypeInfo<RowData> rightType, GeneratedJoinCondition generatedJoinCondition, JoinInputSideSpec leftInputSideSpec, JoinInputSideSpec rightInputSideSpec, boolean leftIsOuter, boolean rightIsOuter, boolean[] filterNullKeys, long leftStateRetentionTime, long rightStateRetentionTime)
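The `filterNullKeys` argument is not described on this page. Our understanding, stated here as an assumption, is that it carries one flag per join-key field: `true` means a null in that field can never satisfy the join (SQL `=` semantics), while `false` means nulls compare as equal (`IS NOT DISTINCT FROM`). The sketch below models that check on a plain `Object[]` key; `NullKeyFilterSketch` and `keyMayJoin` are hypothetical names, not Flink API.

```java
/** Hypothetical sketch of a per-field null filter on a join key. */
final class NullKeyFilterSketch {
    // Returns false if any key field whose flag is true holds null (assumed "=" semantics);
    // fields whose flag is false are not checked (assumed "IS NOT DISTINCT FROM" semantics).
    static boolean keyMayJoin(Object[] joinKey, boolean[] filterNullKeys) {
        for (int i = 0; i < filterNullKeys.length; i++) {
            if (filterNullKeys[i] && joinKey[i] == null) {
                return false; // a null in a null-filtering field cannot match anything
            }
        }
        return true;
    }

    public static void main(String[] args) {
        boolean[] mask = {true, false};
        System.out.println(keyMayJoin(new Object[] {1, null}, mask));   // true
        System.out.println(keyMayJoin(new Object[] {null, "x"}, mask)); // false
    }
}
```

Under this assumption, a record rejected by the filter can still appear in the output of an outer side, padded with nulls; it simply never pairs with a row from the other side.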
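public void open() throws Exception
Description copied from class: AbstractStreamOperator
This method is called immediately before any elements are processed; it should contain the operator's initialization logic, e.g. state initialization. The default implementation does nothing.
Specified by:
open in interface StreamOperator<RowData>
Overrides:
open in class AbstractStreamingJoinOperator
Throws:
Exception - An exception in this method causes the operator to fail.
public void processElement1(StreamRecord<RowData> element) throws Exception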
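Description copied from interface: TwoInputStreamOperator
Processes one element that arrived on the first input of this two-input operator.
Throws:
Exception
public void processElement2(StreamRecord<RowData> element) throws Exception
Description copied from interface: TwoInputStreamOperator
Processes one element that arrived on the second input of this two-input operator.
Throws:
Exception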
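processElement1 and processElement2 differ mainly in which state view plays the input role. A minimal sketch, assuming both delegate to processElement with the two state views swapped and isSuppress set to false (the isSuppress flag only matters for mini-batch). Strings stand in for RowData and JoinRecordStateView, and `DispatchSketch` is a hypothetical class that records each call instead of joining.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two-input dispatch; strings replace RowData and state views. */
final class DispatchSketch {
    final String leftRecordStateView = "leftView";
    final String rightRecordStateView = "rightView";
    final List<String> calls = new ArrayList<>();

    // Records the arguments it receives instead of performing the join.
    void processElement(String input, String inputSide, String otherSide, boolean inputIsLeft, boolean isSuppress) {
        calls.add(input + ":" + inputSide + "->" + otherSide + ",left=" + inputIsLeft + ",suppress=" + isSuppress);
    }

    // First input is the left side: its own state is the input side, the right state is probed.
    void processElement1(String element) {
        processElement(element, leftRecordStateView, rightRecordStateView, true, false);
    }

    // Second input is the right side: the roles of the two state views are swapped.
    void processElement2(String element) {
        processElement(element, rightRecordStateView, leftRecordStateView, false, false);
    }

    public static void main(String[] args) {
        DispatchSketch op = new DispatchSketch();
        op.processElement1("a");
        op.processElement2("x");
        op.calls.forEach(System.out::println);
    }
}
```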
protected void processElement(RowData input, JoinRecordStateView inputSideStateView, JoinRecordStateView otherSideStateView, boolean inputIsLeft, boolean isSuppress) throws Exception
The following pseudocode describes the core logic of this method. Because the logic is complex, the pseudocode is provided as a reading aid; it must be kept in sync with the actual implementation.
Note: "+I" represents "INSERT", "-D" represents "DELETE", "+U" represents "UPDATE_AFTER",
and "-U" represents "UPDATE_BEFORE". For an inner join, the input RowKind is forwarded; otherwise,
INSERT and DELETE are always sent for simplicity. This could be optimized in the future to send
-U & +U instead of -D & +I (see FLINK-17337); the two are equivalent in this join case. Sending
-U & +U may require some refactoring, so -D & +I are kept for now. See
FlinkChangelogModeInferenceProgram.SatisfyModifyKindSetTraitVisitor.
if input record is accumulate
|  if input side is outer
|  |  if there are no matched rows on the other side, send +I[record+null], state.add(record, 0)
|  |  if there are matched rows on the other side
|  |  |  if other side is outer
|  |  |  |  if the matched num in the matched rows == 0, send -D[null+other]
|  |  |  |  if the matched num in the matched rows > 0, skip
|  |  |  |  otherState.update(other, old + 1)
|  |  |  endif
|  |  |  send +I[record+other]s, state.add(record, other.size)
|  |  endif
|  endif
|  if input side not outer
|  |  state.add(record)
|  |  if there are no matched rows on the other side, skip
|  |  if there are matched rows on the other side
|  |  |  if other side is outer
|  |  |  |  if the matched num in the matched rows == 0, send -D[null+other]
|  |  |  |  if the matched num in the matched rows > 0, skip
|  |  |  |  otherState.update(other, old + 1)
|  |  |  |  send +I[record+other]s
|  |  |  else
|  |  |  |  send +I/+U[record+other]s (using input RowKind)
|  |  |  endif
|  |  endif
|  endif
endif

if input record is retract
|  state.retract(record)
|  if there are no matched rows on the other side
|  |  if input side is outer, send -D[record+null]
|  endif
|  if there are matched rows on the other side, send -D[record+other]s if outer, send -D/-U[record+other]s if inner.
|  |  if other side is outer
|  |  |  if the matched num in the matched rows == 0, this should never happen!
|  |  |  if the matched num in the matched rows == 1, send +I[null+other]
|  |  |  if the matched num in the matched rows > 1, skip
|  |  |  otherState.update(other, old - 1)
|  |  endif
|  endif
endif
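To make the pseudocode concrete, here is a small in-memory model of it for a single join key. It is a sketch under several simplifying assumptions, not the operator's implementation: every record matches every record on the other side (join condition always true), records on one side are distinct, inputs are only accumulate (+I) or retract (-D) messages, and the mini-batch isSuppress path is ignored. `JoinStateSketch` and its two maps are hypothetical stand-ins for the operator and its JoinRecordStateView instances.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the processElement pseudocode for ONE join key. */
final class JoinStateSketch {
    // record -> matched count on the other side (only maintained for outer sides)
    private final Map<String, Integer> leftState = new LinkedHashMap<>();
    private final Map<String, Integer> rightState = new LinkedHashMap<>();
    private final boolean leftIsOuter;
    private final boolean rightIsOuter;
    final List<String> output = new ArrayList<>();

    JoinStateSketch(boolean leftIsOuter, boolean rightIsOuter) {
        this.leftIsOuter = leftIsOuter;
        this.rightIsOuter = rightIsOuter;
    }

    // Formats a joined row with the left field first, whichever side the input came from.
    private void emit(String kind, boolean inputIsLeft, String input, String other) {
        String l = inputIsLeft ? input : other;
        String r = inputIsLeft ? other : input;
        output.add(kind + "[" + l + "," + r + "]");
    }

    void processElement(String record, boolean isAccumulate, boolean inputIsLeft) {
        Map<String, Integer> state = inputIsLeft ? leftState : rightState;
        Map<String, Integer> otherState = inputIsLeft ? rightState : leftState;
        boolean inputIsOuter = inputIsLeft ? leftIsOuter : rightIsOuter;
        boolean otherIsOuter = inputIsLeft ? rightIsOuter : leftIsOuter;

        if (isAccumulate) {
            if (otherState.isEmpty() && inputIsOuter) {
                emit("+I", inputIsLeft, record, null); // no match: pad with null
            }
            for (Map.Entry<String, Integer> other : otherState.entrySet()) {
                if (otherIsOuter) {
                    if (other.getValue() == 0) {
                        emit("-D", inputIsLeft, null, other.getKey()); // retract [null+other]
                    }
                    other.setValue(other.getValue() + 1);
                }
                emit("+I", inputIsLeft, record, other.getKey());
            }
            state.put(record, otherState.size());
        } else {
            state.remove(record);
            if (otherState.isEmpty() && inputIsOuter) {
                emit("-D", inputIsLeft, record, null);
            }
            for (Map.Entry<String, Integer> other : otherState.entrySet()) {
                emit("-D", inputIsLeft, record, other.getKey());
                if (otherIsOuter) {
                    int matched = other.getValue();
                    if (matched == 0) {
                        throw new IllegalStateException("matched count cannot be 0 here");
                    }
                    if (matched == 1) {
                        emit("+I", inputIsLeft, null, other.getKey()); // other row is unmatched again
                    }
                    other.setValue(matched - 1);
                }
            }
        }
    }

    public static void main(String[] args) {
        JoinStateSketch leftJoin = new JoinStateSketch(true, false);
        leftJoin.processElement("a", true, true);   // +I a on the left input
        leftJoin.processElement("x", true, false);  // +I x on the right input
        leftJoin.processElement("x", false, false); // -D x on the right input
        leftJoin.output.forEach(System.out::println);
    }
}
```

For the left outer join in `main`, the model prints +I[a,null], -D[a,null], +I[a,x], -D[a,x], +I[a,null]: the null-padded row is retracted when x arrives and re-emitted once x is retracted, which is the matched-count bookkeeping the pseudocode describes.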
Parameters:
input - the input element
inputSideStateView - state of the input side
otherSideStateView - state of the other side
inputIsLeft - whether the input side is the left side
isSuppress - whether to suppress the output of redundant messages when the other side is outer. This only applies to mini-batch processing.
Throws:
Exception
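The RowKind rule from the note on processElement can be expressed as a small function. A minimal sketch: an inner join forwards the input kind, while any outer join collapses accumulate messages to +I and retract messages to -D. `RowKindSketch.Kind` is a hypothetical mirror of Flink's RowKind (the short strings match), not the real enum, and `outputKind` is a hypothetical helper.

```java
/** Hypothetical sketch of the output RowKind rule for joined rows. */
final class RowKindSketch {
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }

        // +I and +U add a row to the result; -U and -D retract one.
        boolean isAccumulate() {
            return this == INSERT || this == UPDATE_AFTER;
        }
    }

    // Inner join: forward the input kind. Outer join: always send +I or -D.
    static Kind outputKind(Kind input, boolean isInnerJoin) {
        if (isInnerJoin) {
            return input;
        }
        return input.isAccumulate() ? Kind.INSERT : Kind.DELETE;
    }

    public static void main(String[] args) {
        for (Kind k : Kind.values()) {
            System.out.println(k.shortString + " -> inner " + outputKind(k, true).shortString
                    + ", outer " + outputKind(k, false).shortString);
        }
    }
}
```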
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.