Type Parameters:
K - The type of the key based on which we join elements.
T1 - The type of the elements in the left stream.
T2 - The type of the elements in the right stream.
OUT - The output type created by the user-defined function.
@Internal public class IntervalJoinOperator<K,T1,T2,OUT> extends AbstractUdfStreamOperator<OUT,ProcessJoinFunction<T1,T2,OUT>> implements TwoInputStreamOperator<T1,T2,OUT>, Triggerable<K,String>
An operator to execute time-bounded stream inner joins.
By using a configurable lower and upper bound, this operator emits exactly those pairs (t1, t2) of left and right elements where t2.ts ∈ [t1.ts + lowerBound, t1.ts + upperBound]. Both the lower and the upper bound can be configured to be either inclusive or exclusive.
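The bound condition above can be sketched in plain Java. This is only an illustration of the interval check with configurable inclusiveness: the class `IntervalBounds` and its method `contains` are hypothetical names, not part of Flink, and the operator's actual internal representation of the bounds may differ.

```java
/** Hypothetical helper illustrating the join condition; not part of Flink's API. */
final class IntervalBounds {
    private final long lowerBound;
    private final long upperBound;
    private final boolean lowerInclusive;
    private final boolean upperInclusive;

    IntervalBounds(long lowerBound, long upperBound,
                   boolean lowerInclusive, boolean upperInclusive) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.lowerInclusive = lowerInclusive;
        this.upperInclusive = upperInclusive;
    }

    /** Returns true if rightTs lies in [leftTs + lowerBound, leftTs + upperBound],
     *  with each end of the interval open or closed as configured. */
    boolean contains(long leftTs, long rightTs) {
        long diff = rightTs - leftTs;
        boolean aboveLower = lowerInclusive ? diff >= lowerBound : diff > lowerBound;
        boolean belowUpper = upperInclusive ? diff <= upperBound : diff < upperBound;
        return aboveLower && belowUpper;
    }
}
```

With bounds of -2 (inclusive) and 1 (exclusive), a left element at timestamp 10 would match right elements at timestamps 8, 9 and 10, but not 7 or 11.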
As soon as elements are joined, they are passed to a user-defined ProcessJoinFunction.
The basic idea of this implementation is as follows: Whenever we receive an element at processElement1(StreamRecord) (a.k.a. the left side), we add it to the left buffer. We then check the right buffer to see whether there are any elements that can be joined. If there are, they are joined and passed to the aforementioned function. The same happens the other way around when receiving an element on the right side.
Whenever a pair of elements is emitted, it is assigned the larger of the two elements' timestamps.
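The two-buffer idea and the timestamp rule described above can be modelled with a small in-memory sketch. It is deliberately simplified: a single key, plain lists instead of keyed state, both bounds inclusive, and no cleanup. The names `TwoBufferJoinSketch`, `Stamped`, `processLeft` and `processRight` are hypothetical stand-ins and not Flink classes or methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, single-key, in-memory model of the two-buffer join; not Flink code. */
final class TwoBufferJoinSketch {
    /** A timestamped value, standing in for a StreamRecord. */
    static final class Stamped {
        final String value;
        final long ts;
        Stamped(String value, long ts) { this.value = value; this.ts = ts; }
    }

    private final long lowerBound;
    private final long upperBound;
    private final List<Stamped> leftBuffer = new ArrayList<>();
    private final List<Stamped> rightBuffer = new ArrayList<>();
    /** Joined pairs, rendered as "left+right@timestamp". */
    final List<String> emitted = new ArrayList<>();

    TwoBufferJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** Buffer a left element, then probe the right buffer for partners. */
    void processLeft(Stamped left) {
        leftBuffer.add(left);
        for (Stamped right : rightBuffer) {
            joinIfInRange(left, right);
        }
    }

    /** Buffer a right element, then probe the left buffer for partners. */
    void processRight(Stamped right) {
        rightBuffer.add(right);
        for (Stamped left : leftBuffer) {
            joinIfInRange(left, right);
        }
    }

    private void joinIfInRange(Stamped left, Stamped right) {
        long diff = right.ts - left.ts;
        if (diff >= lowerBound && diff <= upperBound) {
            // The emitted pair carries the larger of the two timestamps.
            long outTs = Math.max(left.ts, right.ts);
            emitted.add(left.value + "+" + right.value + "@" + outTs);
        }
    }
}
```

Because each arriving element probes the opposite buffer, every matching pair is produced exactly once, regardless of which side arrives first.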
To keep the element buffers from growing indefinitely, a cleanup timer is registered per element. This timer marks the point at which an element is no longer considered for joining and can be removed from state.
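One plausible way to derive such a cleanup time follows directly from the join condition; the sketch below is an assumption for illustration and does not claim to reproduce the operator's actual timer logic. A left element with timestamp `t` can only match right elements up to `t + upperBound`, and a right element with timestamp `t` can only match left elements up to `t - lowerBound`. The class and method names are hypothetical.

```java
/** Hypothetical derivation of cleanup times from the join bounds; not Flink code. */
final class CleanupTimeSketch {
    /** Event time after which a left element can no longer find a right partner. */
    static long leftCleanupTime(long leftTs, long upperBound) {
        // Right partners have timestamps of at most leftTs + upperBound.
        // An element is never dropped before its own timestamp has passed.
        return upperBound > 0 ? leftTs + upperBound : leftTs;
    }

    /** Event time after which a right element can no longer find a left partner. */
    static long rightCleanupTime(long rightTs, long lowerBound) {
        // Left partners satisfy leftTs <= rightTs - lowerBound.
        return lowerBound < 0 ? rightTs - lowerBound : rightTs;
    }
}
```

For bounds of -2 and 1, a left element at timestamp 10 would be kept until event time 11, and a right element at timestamp 10 until event time 12.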
Modifier and Type | Class and Description |
---|---|
static class | IntervalJoinOperator.BufferEntry<T> - A container for elements put in the left/right buffer. |
static class | IntervalJoinOperator.BufferEntrySerializer<T> - A serializer for the IntervalJoinOperator.BufferEntry. |
static class | IntervalJoinOperator.BufferEntrySerializerSnapshot<T> |
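Fields inherited from class AbstractUdfStreamOperator: userFunction
Fields inherited from class AbstractStreamOperator: chainingStrategy, config, latencyStats, LOG, metrics, output, processingTimeService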
Constructor and Description |
---|
IntervalJoinOperator(long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive, OutputTag<T1> leftLateDataOutputTag, OutputTag<T2> rightLateDataOutputTag, TypeSerializer<T1> leftTypeSerializer, TypeSerializer<T2> rightTypeSerializer, ProcessJoinFunction<T1,T2,OUT> udf) - Creates a new IntervalJoinOperator. |
Modifier and Type | Method and Description |
---|---|
void | initializeState(StateInitializationContext context) - Stream operators with state which can be restored need to override this hook method. |
void | onEventTime(InternalTimer<K,String> timer) - Invoked when an event-time timer fires. |
void | onProcessingTime(InternalTimer<K,String> timer) - Invoked when a processing-time timer fires. |
void | open() - This method is called immediately before any elements are processed; it should contain the operator's initialization logic, e.g. state initialization. |
void | processElement1(StreamRecord<T1> record) - Process a StreamRecord from the left stream. |
void | processElement2(StreamRecord<T2> record) - Process a StreamRecord from the right stream. |
protected <T> void | sideOutput(T value, long timestamp, boolean isLeft) - Writes a skipped late-arriving element to the side output. |
Methods inherited from class AbstractUdfStreamOperator: close, finish, getUserFunction, getUserFunctionParameters, notifyCheckpointAborted, notifyCheckpointComplete, setOutputType, setup, snapshotState
Methods inherited from class AbstractStreamOperator: getChainingStrategy, getContainingTask, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getTimeServiceManager, getUserCodeClassloader, hasKeyContext1, hasKeyContext2, initializeState, isUsingCustomRawKeyedState, prepareSnapshotPreBarrier, processLatencyMarker, processLatencyMarker1, processLatencyMarker2, processWatermark, processWatermark1, processWatermark2, processWatermarkStatus, processWatermarkStatus1, processWatermarkStatus2, reportOrForwardLatencyMarker, setChainingStrategy, setCurrentKey, setKeyContextElement1, setKeyContextElement2, setProcessingTimeService, snapshotState
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface TwoInputStreamOperator: processLatencyMarker1, processLatencyMarker2, processWatermark1, processWatermark2, processWatermarkStatus1, processWatermarkStatus2
Methods inherited from interface StreamOperator: close, finish, getMetricGroup, getOperatorID, initializeState, prepareSnapshotPreBarrier, setKeyContextElement1, setKeyContextElement2, snapshotState
Methods inherited from interface CheckpointListener: notifyCheckpointAborted, notifyCheckpointComplete
Methods inherited from interface KeyContext: getCurrentKey, setCurrentKey
Methods inherited from interface KeyContextHandler: hasKeyContext
public IntervalJoinOperator(long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive, OutputTag<T1> leftLateDataOutputTag, OutputTag<T2> rightLateDataOutputTag, TypeSerializer<T1> leftTypeSerializer, TypeSerializer<T2> rightTypeSerializer, ProcessJoinFunction<T1,T2,OUT> udf)
Parameters:
lowerBound - The lower bound for evaluating if elements should be joined
upperBound - The upper bound for evaluating if elements should be joined
lowerBoundInclusive - Whether or not to include elements where the timestamp matches the lower bound
upperBoundInclusive - Whether or not to include elements where the timestamp matches the upper bound
udf - A user-defined ProcessJoinFunction that gets called whenever two elements of T1 and T2 are joined
public void open() throws Exception
Description copied from class: AbstractStreamOperator
The default implementation does nothing.
Specified by:
open in interface StreamOperator<OUT>
Overrides:
open in class AbstractUdfStreamOperator<OUT,ProcessJoinFunction<T1,T2,OUT>>
Throws:
Exception - An exception in this method causes the operator to fail.
public void initializeState(StateInitializationContext context) throws Exception
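Description copied from class: AbstractStreamOperator
Stream operators with state which can be restored need to override this hook method.
Specified by:
initializeState in interface StreamOperatorStateHandler.CheckpointedStreamOperator
Overrides:
initializeState in class AbstractUdfStreamOperator<OUT,ProcessJoinFunction<T1,T2,OUT>>
Parameters:
context - context that allows registering different states.
Throws:
Exception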
public void processElement1(StreamRecord<T1> record) throws Exception
Process a StreamRecord from the left stream. Whenever a StreamRecord arrives at the left stream, it is added to the left buffer. Possible join candidates for that element are looked up in the right buffer, and if the pair lies within the user-defined boundaries, it is passed to the ProcessJoinFunction.
Specified by:
processElement1 in interface TwoInputStreamOperator<T1,T2,OUT>
Parameters:
record - An incoming record to be joined
Throws:
Exception - Can throw an exception during state access
public void processElement2(StreamRecord<T2> record) throws Exception
Process a StreamRecord from the right stream. Whenever a StreamRecord arrives at the right stream, it is added to the right buffer. Possible join candidates for that element are looked up in the left buffer, and if the pair lies within the user-defined boundaries, it is passed to the ProcessJoinFunction.
Specified by:
processElement2 in interface TwoInputStreamOperator<T1,T2,OUT>
Parameters:
record - An incoming record to be joined
Throws:
Exception - Can throw an exception during state access
protected <T> void sideOutput(T value, long timestamp, boolean isLeft)
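The role of the `isLeft` flag in routing skipped late elements can be illustrated with a small stand-alone sketch. It assumes that an element counts as late when its timestamp is below the current watermark; that definition, the class `LateRoutingSketch` and all of its members are assumptions for illustration, since this page only states that skipped late-arriving elements are written to a side output.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of routing late elements to per-side outputs; not Flink code. */
final class LateRoutingSketch {
    /** Stand-ins for the left and right late-data side outputs. */
    final List<String> leftLate = new ArrayList<>();
    final List<String> rightLate = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Watermarks only move forward. */
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Assumed lateness rule: the timestamp lies behind the current watermark. */
    boolean isLate(long timestamp) {
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    /** Diverts a late element to the output for its side; returns true if diverted. */
    boolean routeIfLate(String value, long timestamp, boolean isLeft) {
        if (!isLate(timestamp)) {
            return false;
        }
        (isLeft ? leftLate : rightLate).add(value + "@" + timestamp);
        return true;
    }
}
```

In this model an element that is diverted is not buffered or joined, which matches the description of it as "skipped".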
public void onEventTime(InternalTimer<K,String> timer) throws Exception
Description copied from interface: Triggerable
Invoked when an event-time timer fires.
Specified by:
onEventTime in interface Triggerable<K,String>
Throws:
Exception
public void onProcessingTime(InternalTimer<K,String> timer) throws Exception
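Description copied from interface: Triggerable
Invoked when a processing-time timer fires.
Specified by:
onProcessingTime in interface Triggerable<K,String>
Throws:
Exception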
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.