@Internal public final class SlicingWindowOperator<K,W> extends TableStreamOperator<RowData> implements OneInputStreamOperator<RowData,RowData>, Triggerable<K,W>, KeyContext
SlicingWindowOperator implements optimized processing for aligned windows that can apply the slicing optimization. The core idea of the slicing optimization is to divide all elements of a data stream into a finite number of non-overlapping chunks (a.k.a. slices).
We divide windows into two categories: Aligned Windows and Unaligned Windows.
Aligned Windows are windows that have predetermined window boundaries and can be divided into a finite number of non-overlapping chunks. The boundary of an aligned window is determined independently of the time characteristic of the data stream and of the messages it receives. For example, a hopping (sliding) window is an aligned window because its boundaries are predetermined by the window size and slide. Aligned windows include tumbling, hopping, and cumulative windows.
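The point that aligned window boundaries are fixed in advance can be made concrete with a small sketch. The class and method names are illustrative assumptions, not Flink APIs; the sketch assumes epoch-millisecond timestamps, a zero window offset, and, for cumulative windows, a max size that is a multiple of the step. Every boundary is computed from the timestamp and the window parameters alone, without looking at any other element.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: aligned window boundaries are a pure function of the
 * timestamp and the window parameters. Assumes a zero offset; windows are [start, end).
 */
final class AlignedWindowBounds {

    /** Start of the tumbling window of length size that contains ts. */
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Starts of all hopping windows [start, start + size) that contain ts. */
    static List<Long> hoppingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest slide boundary at or before ts; earlier windows are found by stepping back.
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /**
     * Ends of the cumulative windows containing ts. All of them start at the same
     * max-size boundary; assumes maxSize is a multiple of step.
     */
    static List<Long> cumulativeEnds(long ts, long maxSize, long step) {
        List<Long> ends = new ArrayList<>();
        long windowStart = tumblingStart(ts, maxSize);
        // First step boundary strictly after ts (window ends are exclusive).
        long firstEnd = ts - Math.floorMod(ts, step) + step;
        for (long end = firstEnd; end <= windowStart + maxSize; end += step) {
            ends.add(end);
        }
        return ends;
    }
}
```

For example, with size 10 and slide 5, the element at timestamp 12 falls into the hopping windows starting at 10 and 5, regardless of which other elements the stream contains.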
Unaligned Windows are windows whose boundaries are determined dynamically by the elements. For example, a session window is an unaligned window because its boundaries depend on the message timestamps and the gaps between them. Currently, session windows are the only unaligned windows.
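By contrast, a session window's boundaries can only be known after looking at the data. The sketch below uses a simplified, sort-based model (not Flink's merging-window implementation; all names are hypothetical) in which a session spans [first timestamp, last timestamp + gap). Adding a single element can merge two sessions into one, which is why session windows cannot be cut into predetermined slices.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch: session boundaries depend on the elements themselves.
 * Two elements share a session when the later one arrives before the current
 * session end (last timestamp + gap).
 */
final class SessionBounds {

    /** Returns session windows as {start, end} pairs for the given timestamps. */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] ts = timestamps.clone();
        Arrays.sort(ts);
        List<long[]> result = new ArrayList<>();
        if (ts.length == 0) {
            return result;
        }
        long start = ts[0];
        long end = ts[0] + gap;
        for (int i = 1; i < ts.length; i++) {
            if (ts[i] < end) {
                // Inside the current session: extend its end.
                end = Math.max(end, ts[i] + gap);
            } else {
                // Gap exceeded: close the current session and open a new one.
                result.add(new long[] {start, end});
                start = ts[i];
                end = ts[i] + gap;
            }
        }
        result.add(new long[] {start, end});
        return result;
    }
}
```

With a gap of 10, the timestamps {1, 15} form two sessions, while {1, 8, 15} collapse into the single session [1, 25).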
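Because aligned windows can be divided into a finite number of non-overlapping chunks (a.k.a. slices), they can be processed efficiently by sharing intermediate results between windows.
Slicing divides each aligned window into a finite number of non-overlapping chunks, called slices. It has the following properties: each element belongs to exactly one slice; slices are non-overlapping, i.e. slices S_i and S_j share no elements if i != j; and each window consists of a finite number of slices.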
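A minimal sketch of these properties for hopping windows follows. It assumes the slice length is gcd(size, slide), so that every window boundary is also a slice boundary, and it identifies a slice by its end timestamp. The class is hypothetical and only loosely mirrors what a slice assigner does.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of slicing hopping windows. Assumption: slice length is
 * gcd(size, slide); a slice is [end - sliceSize, end) and is identified by its end.
 */
final class HoppingSlicer {
    final long size;
    final long slide;
    final long sliceSize;

    HoppingSlicer(long size, long slide) {
        this.size = size;
        this.slide = slide;
        this.sliceSize = gcd(size, slide);
    }

    private static long gcd(long a, long b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    /** Each element belongs to exactly one slice. */
    long sliceEnd(long ts) {
        return ts - Math.floorMod(ts, sliceSize) + sliceSize;
    }

    /** The window [windowEnd - size, windowEnd) is the union of size / sliceSize slices. */
    List<Long> slicesOfWindow(long windowEnd) {
        List<Long> ends = new ArrayList<>();
        for (long end = windowEnd - size + sliceSize; end <= windowEnd; end += sliceSize) {
            ends.add(end);
        }
        return ends;
    }
}
```

With size 15 and slide 10 the slice length is 5, timestamp 12 belongs only to the slice ending at 15, and the window [5, 20) consists of the three slices ending at 10, 15, and 20.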
A slicing window operator is a thin wrapper around a SlicingWindowProcessor. It delegates all the important methods to the underlying processor, which can have different implementations for aggregation, top-k, and other operations.
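The delegation can be pictured with the simplified shapes below. WindowProcessorSketch and DelegatingOperatorSketch are hypothetical and are not Flink's SlicingWindowProcessor interface; they only show an operator forwarding elements, watermarks, and the pre-barrier hook to a processor, and counting the elements the processor reports as dropped.

```java
/**
 * Hypothetical, heavily simplified processor shape. NOT Flink's
 * SlicingWindowProcessor API; it only illustrates the delegation pattern.
 */
interface WindowProcessorSketch<K, R> {
    /** Returns true if the element was dropped as late. */
    boolean processElement(K key, R row);

    /** Moves time progress (e.g. the watermark) forward and fires completed windows. */
    void advanceProgress(long progress);

    /** Flushes buffered data before a checkpoint barrier. */
    void prepareCheckpoint();
}

/** Hypothetical operator that keeps no window logic of its own. */
final class DelegatingOperatorSketch<K, R> {
    private final WindowProcessorSketch<K, R> processor;
    private long numLateRecordsDropped; // stands in for a Counter metric

    DelegatingOperatorSketch(WindowProcessorSketch<K, R> processor) {
        this.processor = processor;
    }

    void processElement(K key, R row) {
        if (processor.processElement(key, row)) {
            numLateRecordsDropped++;
        }
    }

    void processWatermark(long watermark) {
        processor.advanceProgress(watermark);
        // A real operator would also forward the watermark downstream here.
    }

    void prepareSnapshotPreBarrier(long checkpointId) {
        processor.prepareCheckpoint();
    }

    long numLateRecordsDropped() {
        return numLateRecordsDropped;
    }
}
```

Keeping the operator this thin is what lets one operator class host aggregation, top-k, or other processors interchangeably.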
A SlicingWindowProcessor usually uses a SliceAssigner to assign elements to slices and computes results based on those slices. See SliceSharedWindowAggProcessor for an example.
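To illustrate what computing based on slices buys, here is a sketch of slice-shared SUM aggregation for hopping windows. It is not SliceSharedWindowAggProcessor; the class is hypothetical and assumes a slice length of gcd(size, slide). Each element updates exactly one slice accumulator, and a window result merges the accumulators of its slices, so one slice's partial sum is reused by every window that covers it.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of slice-shared SUM for hopping windows.
 * Assumption: slice length = gcd(size, slide); windows are [end - size, end).
 */
final class SliceSharedSum {
    private final long size;
    private final long sliceSize;
    private final Map<Long, Long> sliceSums = new HashMap<>(); // slice end -> partial sum

    SliceSharedSum(long size, long slide) {
        this.size = size;
        this.sliceSize = gcd(size, slide);
    }

    private static long gcd(long a, long b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    /** Each element updates the accumulator of its single slice. */
    void add(long ts, long value) {
        long sliceEnd = ts - Math.floorMod(ts, sliceSize) + sliceSize;
        sliceSums.merge(sliceEnd, value, Long::sum);
    }

    /** Window result: merge the partial sums of the slices that make up the window. */
    long windowSum(long windowEnd) {
        long sum = 0;
        for (long end = windowEnd - size + sliceSize; end <= windowEnd; end += sliceSize) {
            sum += sliceSums.getOrDefault(end, 0L);
        }
        return sum;
    }
}
```

With size 10 and slide 5, the slice ending at 10 contributes to both windows [0, 10) and [5, 15), but its sum is computed only once.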
Note: since SlicingWindowProcessor relies on the slicing optimization for aligned windows, it doesn't support unaligned windows, e.g. session windows.
Note: currently, SlicingWindowOperator doesn't support early-fire and late-arrival. Thus late elements (elements that belong to windows that have already been emitted) are simply dropped.
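The dropping rule can be sketched for tumbling windows as follows. The sketch assumes a window [start, end) is emitted once the watermark reaches end - 1, and treats an element as late when its window has already been emitted. The class is hypothetical; its counter only loosely corresponds to the Counter returned by getNumLateRecordsDropped().

```java
/**
 * Hypothetical sketch of late-element dropping for tumbling windows.
 * Assumption: window [start, end) fires when the watermark reaches end - 1.
 * With no early-fire/late-arrival support, late elements are counted and discarded.
 */
final class LateDropSketch {
    private final long size;
    private long currentWatermark = Long.MIN_VALUE;
    private long numLateRecordsDropped;

    LateDropSketch(long size) {
        this.size = size;
    }

    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Returns true if the element is accepted, false if it is dropped as late. */
    boolean offer(long ts) {
        long windowEnd = ts - Math.floorMod(ts, size) + size;
        if (currentWatermark >= windowEnd - 1) {
            // The element's window has already been emitted.
            numLateRecordsDropped++;
            return false;
        }
        return true;
    }

    long numLateRecordsDropped() {
        return numLateRecordsDropped;
    }
}
```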
Nested classes inherited from class TableStreamOperator: TableStreamOperator.ContextImpl
Modifier and Type | Field and Description |
---|---|
protected TimestampedCollector<RowData> | collector: This is used for emitting elements with a given timestamp. |
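Fields inherited from class TableStreamOperator: ctx, currentWatermark
Fields inherited from class AbstractStreamOperator: chainingStrategy, config, latencyStats, LOG, metrics, output, processingTimeService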
Constructor and Description |
---|
SlicingWindowOperator(SlicingWindowProcessor<W> windowProcessor) |
Modifier and Type | Method and Description |
---|---|
void | close(): This method is called at the very end of the operator's life, both in the case of a successful completion of the operation and in the case of a failure and canceling. |
Counter | getNumLateRecordsDropped() |
Gauge<Long> | getWatermarkLatency() |
void | initializeState(StateInitializationContext context): Stream operators with state which can be restored need to override this hook method. |
void | onEventTime(InternalTimer<K,W> timer): Invoked when an event-time timer fires. |
void | onProcessingTime(InternalTimer<K,W> timer): Invoked when a processing-time timer fires. |
void | open(): This method is called immediately before any elements are processed; it should contain the operator's initialization logic, e.g. state initialization. |
void | prepareSnapshotPreBarrier(long checkpointId): This method is called when the operator should do a snapshot, before it emits its own checkpoint barrier. |
void | processElement(StreamRecord<RowData> element): Processes one element that arrived on this input. |
void | processWatermark(Watermark mark): Processes a Watermark that arrived on this input. |
void | snapshotState(StateSnapshotContext context): Stream operators with state that want to participate in a snapshot need to override this hook method. |
Methods inherited from class TableStreamOperator: computeMemorySize
Methods inherited from class AbstractStreamOperator: finish, getChainingStrategy, getContainingTask, getCurrentKey, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getTimeServiceManager, getUserCodeClassloader, hasKeyContext1, hasKeyContext2, initializeState, isUsingCustomRawKeyedState, notifyCheckpointAborted, notifyCheckpointComplete, processLatencyMarker, processLatencyMarker1, processLatencyMarker2, processWatermark1, processWatermark2, processWatermarkStatus, processWatermarkStatus1, processWatermarkStatus2, reportOrForwardLatencyMarker, setChainingStrategy, setCurrentKey, setKeyContextElement1, setKeyContextElement2, setProcessingTimeService, setup, snapshotState
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface OneInputStreamOperator: setKeyContextElement
Methods inherited from interface StreamOperator: finish, getMetricGroup, getOperatorID, initializeState, setKeyContextElement1, setKeyContextElement2, snapshotState
Methods inherited from interface CheckpointListener: notifyCheckpointAborted, notifyCheckpointComplete
Methods inherited from interface KeyContext: getCurrentKey, setCurrentKey
Methods inherited from interface Input: processLatencyMarker, processWatermarkStatus
Methods inherited from interface KeyContextHandler: hasKeyContext
protected transient TimestampedCollector<RowData> collector
public SlicingWindowOperator(SlicingWindowProcessor<W> windowProcessor)
public void open() throws Exception
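Description copied from class: AbstractStreamOperator
This method is called immediately before any elements are processed; it should contain the operator's initialization logic, e.g. state initialization. The default implementation does nothing.
Specified by: open in interface StreamOperator<RowData>
Overrides: open in class TableStreamOperator<RowData>
Throws: Exception - An exception in this method causes the operator to fail.
public void initializeState(StateInitializationContext context) throws Exception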
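Description copied from class: AbstractStreamOperator
Stream operators with state which can be restored need to override this hook method.
Specified by: initializeState in interface StreamOperatorStateHandler.CheckpointedStreamOperator
Overrides: initializeState in class AbstractStreamOperator<RowData>
Parameters: context - context that allows registering different states.
Throws: Exception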
public void snapshotState(StateSnapshotContext context) throws Exception
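Description copied from class: AbstractStreamOperator
Stream operators with state that want to participate in a snapshot need to override this hook method.
Specified by: snapshotState in interface StreamOperatorStateHandler.CheckpointedStreamOperator
Overrides: snapshotState in class AbstractStreamOperator<RowData>
Parameters: context - context that provides information and means required for taking a snapshot
Throws: Exception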
public void close() throws Exception
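Description copied from interface: StreamOperator
This method is called at the very end of the operator's life, both in the case of a successful completion of the operation and in the case of a failure and canceling.
This method is expected to make a thorough effort to release all resources that the operator has acquired.
NOTE: It cannot emit any records! If you need to emit records at the end of processing, do so in the StreamOperator.finish() method.
Specified by: close in interface StreamOperator<RowData>
Overrides: close in class AbstractStreamOperator<RowData>
Throws: Exception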
public void processElement(StreamRecord<RowData> element) throws Exception
Description copied from interface: Input
Processes one element that arrived on this input.
This method is guaranteed to not be called concurrently with other methods of the operator.
Specified by: processElement in interface Input<RowData>
Throws: Exception
public void processWatermark(Watermark mark) throws Exception
Description copied from interface: Input
Processes a Watermark that arrived on this input.
This method is guaranteed to not be called concurrently with other methods of the operator.
Specified by: processWatermark in interface Input<RowData>
Overrides: processWatermark in class TableStreamOperator<RowData>
Throws: Exception
See Also: Watermark
public void onEventTime(InternalTimer<K,W> timer) throws Exception
Description copied from interface: Triggerable
Invoked when an event-time timer fires.
Specified by: onEventTime in interface Triggerable<K,W>
Throws: Exception
public void onProcessingTime(InternalTimer<K,W> timer) throws Exception
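Description copied from interface: Triggerable
Invoked when a processing-time timer fires.
Specified by: onProcessingTime in interface Triggerable<K,W>
Throws: Exception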
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception
Description copied from interface: StreamOperator
This method is intended not for any actual state persistence, but only for emitting some data before emitting the checkpoint barrier. It is meant for operators that maintain some small transient state that is inefficient to checkpoint (especially when it would need to be checkpointed in a re-scalable way) but can simply be sent downstream before the checkpoint. An example is opportunistic pre-aggregation operators, which have a small pre-aggregation state that is frequently flushed downstream.
Important: This method should not be used for any actual state snapshot logic, because it inherently runs within the synchronous part of the operator's checkpoint. If heavy work is done within this method, it will affect latency and downstream checkpoint alignments.
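The intended use of this hook can be sketched with an opportunistic pre-aggregation buffer. The class below is hypothetical and uses a plain BiConsumer in place of an operator's output; it keeps partial sums in a small in-memory map and emits them downstream, rather than persisting them, when the pre-barrier hook runs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Hypothetical pre-aggregation operator. Partial sums live in a small transient
 * map that is flushed downstream (not checkpointed) before the barrier.
 */
final class PreAggBufferSketch {
    private final Map<String, Long> buffer = new HashMap<>();
    private final int maxKeys;
    private final BiConsumer<String, Long> downstream; // stand-in for the operator output

    PreAggBufferSketch(int maxKeys, BiConsumer<String, Long> downstream) {
        this.maxKeys = maxKeys;
        this.downstream = downstream;
    }

    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
        if (buffer.size() >= maxKeys) {
            flush(); // keep the transient state small
        }
    }

    /** Mirrors the role of prepareSnapshotPreBarrier: emit the buffer, do not persist it. */
    void prepareSnapshotPreBarrier(long checkpointId) {
        flush();
    }

    private void flush() {
        buffer.forEach(downstream);
        buffer.clear();
    }
}
```

Because the buffer is empty after the hook runs, there is nothing left to snapshot, and the flush stays cheap as long as the buffer stays small.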
Specified by: prepareSnapshotPreBarrier in interface StreamOperator<RowData>
Overrides: prepareSnapshotPreBarrier in class AbstractStreamOperator<RowData>
Parameters: checkpointId - The ID of the checkpoint.
Throws: Exception - Throwing an exception here causes the operator to fail and go into recovery.
@VisibleForTesting public Counter getNumLateRecordsDropped()
@VisibleForTesting public Gauge<Long> getWatermarkLatency()
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.