Class IntervalJoinOperator<K,T1,T2,OUT>
- java.lang.Object
-
- org.apache.flink.streaming.api.operators.AbstractStreamOperator<OUT>
-
- org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator<OUT,ProcessJoinFunction<T1,T2,OUT>>
-
- org.apache.flink.streaming.api.operators.co.IntervalJoinOperator<K,T1,T2,OUT>
-
- Type Parameters:
K - The type of the key based on which we join elements.
T1 - The type of the elements in the left stream.
T2 - The type of the elements in the right stream.
OUT - The output type created by the user-defined function.
- All Implemented Interfaces:
Serializable, CheckpointListener, KeyContext, KeyContextHandler, OutputTypeConfigurable<OUT>, StreamOperator<OUT>, StreamOperatorStateHandler.CheckpointedStreamOperator, Triggerable<K,String>, TwoInputStreamOperator<T1,T2,OUT>, UserFunctionProvider<ProcessJoinFunction<T1,T2,OUT>>, YieldingOperator<OUT>
@Internal public class IntervalJoinOperator<K,T1,T2,OUT> extends AbstractUdfStreamOperator<OUT,ProcessJoinFunction<T1,T2,OUT>> implements TwoInputStreamOperator<T1,T2,OUT>, Triggerable<K,String>
An operator to execute time-bounded stream inner joins.
Using a configurable lower and upper bound, this operator emits exactly those pairs (t1, t2) where t2.ts ∈ [t1.ts + lowerBound, t1.ts + upperBound]. Both the lower and the upper bound can be configured to be either inclusive or exclusive.
As soon as elements are joined they are passed to a user-defined ProcessJoinFunction.
The basic idea of this implementation is as follows: Whenever we receive an element at processElement1(StreamRecord) (a.k.a. the left side), we add it to the left buffer. We then check the right buffer to see whether there are any elements that can be joined. If there are, they are joined and passed to the aforementioned function. The same happens the other way around when receiving an element on the right side.
Whenever a pair of elements is emitted, it is assigned the larger of the two elements' timestamps.
To keep the element buffers from growing indefinitely, a cleanup timer is registered per element. This timer indicates when an element is no longer considered for joining and can be removed from the state.
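The join condition and the output timestamp rule described above can be sketched in plain Java. This is an illustrative sketch only, not the operator's actual code: the class `IntervalBoundsSketch` and its method names are hypothetical, and timestamps are plain `long` values.

```java
// Illustrative sketch; class and method names are hypothetical, not Flink API.
final class IntervalBoundsSketch {

    // Returns true if rightTs lies in [leftTs + lowerBound, leftTs + upperBound],
    // where each end of the interval is closed or open depending on the flags.
    static boolean withinBounds(
            long leftTs,
            long rightTs,
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive) {
        long diff = rightTs - leftTs;
        boolean aboveLower = lowerBoundInclusive ? diff >= lowerBound : diff > lowerBound;
        boolean belowUpper = upperBoundInclusive ? diff <= upperBound : diff < upperBound;
        return aboveLower && belowUpper;
    }

    // A joined pair carries the larger of the two element timestamps.
    static long outputTimestamp(long leftTs, long rightTs) {
        return Math.max(leftTs, rightTs);
    }
}
```

For example, with `lowerBound = -2` and `upperBound = 1`, a right element at timestamp 8 matches a left element at timestamp 10 only if the lower bound is inclusive.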
- See Also:
- Serialized Form
-
-
Nested Class Summary
Nested Classes
static class IntervalJoinOperator.BufferEntry<T>
- A container for elements put in the left/right buffer.
static class IntervalJoinOperator.BufferEntrySerializer<T>
- A serializer for the IntervalJoinOperator.BufferEntry.
static class IntervalJoinOperator.BufferEntrySerializerSnapshot<T>
-
Field Summary
-
Fields inherited from class org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
userFunction
-
Fields inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperator
config, lastRecordAttributes1, lastRecordAttributes2, latencyStats, LOG, metrics, output, processingTimeService, stateHandler, stateKeySelector1, stateKeySelector2, timeServiceManager
-
-
Constructor Summary
Constructors
IntervalJoinOperator(long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive, OutputTag<T1> leftLateDataOutputTag, OutputTag<T2> rightLateDataOutputTag, TypeSerializer<T1> leftTypeSerializer, TypeSerializer<T2> rightTypeSerializer, ProcessJoinFunction<T1,T2,OUT> udf)
- Creates a new IntervalJoinOperator.
-
Method Summary
void initializeState(StateInitializationContext context)
- Stream operators with state which can be restored need to override this hook method.
void onEventTime(InternalTimer<K,String> timer)
- Invoked when an event-time timer fires.
void onProcessingTime(InternalTimer<K,String> timer)
- Invoked when a processing-time timer fires.
void open()
- This method is called immediately before any elements are processed; it should contain the operator's initialization logic, e.g. state initialization.
void processElement1(StreamRecord<T1> record)
- Process a StreamRecord from the left stream.
void processElement2(StreamRecord<T2> record)
- Process a StreamRecord from the right stream.
protected <T> void sideOutput(T value, long timestamp, boolean isLeft)
- Write skipped late arriving element to SideOutput.
Methods inherited from class org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
close, finish, getUserFunction, getUserFunctionParameters, notifyCheckpointAborted, notifyCheckpointComplete, setOutputType, setup, snapshotState
-
Methods inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperator
getContainingTask, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getStateKeySelector1, getStateKeySelector2, getTimeServiceManager, getUserCodeClassloader, hasKeyContext1, hasKeyContext2, initializeState, isUsingCustomRawKeyedState, prepareSnapshotPreBarrier, processLatencyMarker, processLatencyMarker1, processLatencyMarker2, processRecordAttributes, processRecordAttributes1, processRecordAttributes2, processWatermark, processWatermark1, processWatermark2, processWatermarkStatus, processWatermarkStatus1, processWatermarkStatus2, reportOrForwardLatencyMarker, setCurrentKey, setKeyContextElement1, setKeyContextElement2, setMailboxExecutor, setProcessingTimeService, snapshotState, useSplittableTimers
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.api.common.state.CheckpointListener
notifyCheckpointAborted, notifyCheckpointComplete
-
Methods inherited from interface org.apache.flink.streaming.api.operators.KeyContext
getCurrentKey, setCurrentKey
-
Methods inherited from interface org.apache.flink.streaming.api.operators.KeyContextHandler
hasKeyContext
-
Methods inherited from interface org.apache.flink.streaming.api.operators.StreamOperator
close, finish, getMetricGroup, getOperatorAttributes, getOperatorID, initializeState, prepareSnapshotPreBarrier, setKeyContextElement1, setKeyContextElement2, snapshotState
-
Methods inherited from interface org.apache.flink.streaming.api.operators.TwoInputStreamOperator
processLatencyMarker1, processLatencyMarker2, processRecordAttributes1, processRecordAttributes2, processWatermark1, processWatermark2, processWatermarkStatus1, processWatermarkStatus2
-
-
-
-
Constructor Detail
-
IntervalJoinOperator
public IntervalJoinOperator(long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive, OutputTag<T1> leftLateDataOutputTag, OutputTag<T2> rightLateDataOutputTag, TypeSerializer<T1> leftTypeSerializer, TypeSerializer<T2> rightTypeSerializer, ProcessJoinFunction<T1,T2,OUT> udf)
Creates a new IntervalJoinOperator.
- Parameters:
lowerBound - The lower bound for evaluating if elements should be joined
upperBound - The upper bound for evaluating if elements should be joined
lowerBoundInclusive - Whether or not to include elements where the timestamp matches the lower bound
upperBoundInclusive - Whether or not to include elements where the timestamp matches the upper bound
udf - A user-defined ProcessJoinFunction that gets called whenever two elements of T1 and T2 are joined
-
-
Method Detail
-
open
public void open() throws Exception
Description copied from class: AbstractStreamOperator
This method is called immediately before any elements are processed; it should contain the operator's initialization logic, e.g. state initialization.
The default implementation does nothing.
- Specified by:
open in interface StreamOperator<OUT>
- Overrides:
open in class AbstractUdfStreamOperator<OUT,ProcessJoinFunction<T1,T2,OUT>>
- Throws:
Exception - An exception in this method causes the operator to fail.
-
initializeState
public void initializeState(StateInitializationContext context) throws Exception
Description copied from class: AbstractStreamOperator
Stream operators with state which can be restored need to override this hook method.
- Specified by:
initializeState in interface StreamOperatorStateHandler.CheckpointedStreamOperator
- Overrides:
initializeState in class AbstractUdfStreamOperator<OUT,ProcessJoinFunction<T1,T2,OUT>>
- Parameters:
context - context that allows registering different states.
- Throws:
Exception
-
processElement1
public void processElement1(StreamRecord<T1> record) throws Exception
Process a StreamRecord from the left stream. Whenever a StreamRecord arrives at the left stream, it is added to the left buffer. Possible join candidates for that element are looked up in the right buffer, and if the pair lies within the user-defined bounds, it is passed to the ProcessJoinFunction.
- Specified by:
processElement1 in interface TwoInputStreamOperator<T1,T2,OUT>
- Parameters:
record - An incoming record to be joined
- Throws:
Exception - Can throw an exception during state access
-
processElement2
public void processElement2(StreamRecord<T2> record) throws Exception
Process a StreamRecord from the right stream. Whenever a StreamRecord arrives at the right stream, it is added to the right buffer. Possible join candidates for that element are looked up in the left buffer, and if the pair lies within the user-defined bounds, it is passed to the ProcessJoinFunction.
- Specified by:
processElement2 in interface TwoInputStreamOperator<T1,T2,OUT>
- Parameters:
record - An incoming record to be joined
- Throws:
Exception - Can throw an exception during state access
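The symmetric buffer-and-probe behaviour of processElement1 and processElement2 can be illustrated with a small stand-alone model. This is a sketch under simplifying assumptions, not the operator's implementation: it handles a single key, treats both bounds as inclusive, keeps buffers in in-memory `TreeMap`s rather than Flink state, and collects joined pairs as strings instead of calling a ProcessJoinFunction. The class `TwoSidedBufferSketch` and all of its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative single-key model; names are hypothetical, not Flink API.
final class TwoSidedBufferSketch {
    private final long lowerBound;
    private final long upperBound;
    // Buffers map an element timestamp to the values seen at that timestamp.
    private final TreeMap<Long, List<String>> leftBuffer = new TreeMap<>();
    private final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();
    // Stand-in for the user function: joined pairs are recorded here.
    final List<String> emitted = new ArrayList<>();

    TwoSidedBufferSketch(long lowerBound, long upperBound) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must not exceed upperBound");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    // Left side: buffer the element, then probe the right buffer for
    // timestamps in [ts + lowerBound, ts + upperBound].
    void processLeft(long ts, String value) {
        leftBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        Map<Long, List<String>> candidates =
                rightBuffer.subMap(ts + lowerBound, true, ts + upperBound, true);
        for (Map.Entry<Long, List<String>> entry : candidates.entrySet()) {
            for (String right : entry.getValue()) {
                emit(value, ts, right, entry.getKey());
            }
        }
    }

    // Right side: buffer the element, then probe the left buffer for
    // timestamps in [ts - upperBound, ts - lowerBound] (the mirrored interval).
    void processRight(long ts, String value) {
        rightBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        Map<Long, List<String>> candidates =
                leftBuffer.subMap(ts - upperBound, true, ts - lowerBound, true);
        for (Map.Entry<Long, List<String>> entry : candidates.entrySet()) {
            for (String left : entry.getValue()) {
                emit(left, entry.getKey(), value, ts);
            }
        }
    }

    // The emitted pair carries the larger of the two timestamps.
    private void emit(String left, long leftTs, String right, long rightTs) {
        emitted.add(left + "+" + right + "@" + Math.max(leftTs, rightTs));
    }
}
```

The range lookup on a sorted map is a choice made for this sketch; the text above only states that candidates are looked up from the opposite buffer.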
-
sideOutput
protected <T> void sideOutput(T value, long timestamp, boolean isLeft)
Write skipped late arriving element to SideOutput.
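A rough model of routing late elements is sketched below. It rests on an assumption that is not stated on this page: an element counts as late when its timestamp is behind the current watermark. The class `LateRoutingSketch`, its lists, and the `isLate` helper are hypothetical stand-ins for the two late-data OutputTags passed to the constructor.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch; names and the lateness rule are assumptions, not Flink API.
final class LateRoutingSketch {
    // Stand-ins for the left and right late-data side outputs.
    final List<String> leftLate = new ArrayList<>();
    final List<String> rightLate = new ArrayList<>();

    // Assumed rule: late means strictly behind the watermark; before any
    // watermark has been seen (Long.MIN_VALUE) nothing is late.
    static boolean isLate(long timestamp, long currentWatermark) {
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    // Routes a skipped element to the side output of the stream it came from.
    <T> void sideOutput(T value, long timestamp, boolean isLeft) {
        List<String> target = isLeft ? leftLate : rightLate;
        target.add(value + "@" + timestamp);
    }
}
```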
-
onEventTime
public void onEventTime(InternalTimer<K,String> timer) throws Exception
Description copied from interface: Triggerable
Invoked when an event-time timer fires.
- Specified by:
onEventTime in interface Triggerable<K,String>
- Throws:
Exception
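One way to picture the per-element cleanup timers from the class description is shown below. The sketch derives the cleanup time from the join condition alone: a left element at timestamp t can still be matched by right elements up to t + upperBound, and a right element at t by left elements up to t - lowerBound. The string namespaces, the class `CleanupTimerSketch`, and the single-value buffers are hypothetical simplifications and are not taken from the operator's source.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch; names and namespaces are hypothetical, not Flink API.
final class CleanupTimerSketch {
    static final String LEFT = "left";   // hypothetical timer namespace
    static final String RIGHT = "right"; // hypothetical timer namespace

    private final long lowerBound;
    private final long upperBound;
    // Simplified buffers: one value per element timestamp.
    final Map<Long, String> leftBuffer = new HashMap<>();
    final Map<Long, String> rightBuffer = new HashMap<>();

    CleanupTimerSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    // A left element is needed until event time passes ts + upperBound,
    // but never earlier than its own timestamp.
    long leftCleanupTime(long ts) {
        return upperBound > 0 ? ts + upperBound : ts;
    }

    // A right element is needed until event time passes ts - lowerBound,
    // but never earlier than its own timestamp.
    long rightCleanupTime(long ts) {
        return lowerBound < 0 ? ts - lowerBound : ts;
    }

    // When a cleanup timer fires, recover the element timestamp it was
    // registered for and drop that entry from the matching buffer.
    void onEventTime(String namespace, long timerTs) {
        if (LEFT.equals(namespace)) {
            leftBuffer.remove(upperBound > 0 ? timerTs - upperBound : timerTs);
        } else if (RIGHT.equals(namespace)) {
            rightBuffer.remove(lowerBound < 0 ? timerTs + lowerBound : timerTs);
        }
    }
}
```

With `lowerBound = -2` and `upperBound = 5`, a left element at timestamp 10 would be cleaned up at 15 and a right element at timestamp 10 at 12.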
-
onProcessingTime
public void onProcessingTime(InternalTimer<K,String> timer) throws Exception
Description copied from interface: Triggerable
Invoked when a processing-time timer fires.
- Specified by:
onProcessingTime in interface Triggerable<K,String>
- Throws:
Exception
-
-