public class StreamNonDeterministicPhysicalPlanResolver extends Object
StreamNonDeterministicPhysicalPlanResolver tries to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline. A changelog may contain four kinds of messages: Insert (I), Delete (D), Update_Before (UB), and Update_After (UA).
An insert-only changelog pipeline has no NDU problem.
For updates, there are two cases: with and without an upsertKey (metadata from FlinkRelMdUpsertKeys; consider it the primary key of the changelog). The upsertKey can always be treated as deterministic, so if all operators in the pipeline can transmit the upsertKey normally (including working with the sink's primary key), everything works correctly.
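To illustrate why a preserved upsertKey makes updates safe, here is a minimal, self-contained sketch. It does not use any Flink API: `UpsertKeyDemo`, `Msg`, `Kind`, and `applyByKey` are hypothetical names, `id` is assumed to be the upsertKey, and `procTime` stands in for a column whose value differs between an original message and its later update.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UpsertKeyDemo {
    // Hypothetical stand-in for the four changelog message kinds.
    enum Kind { I, D, UB, UA }

    // Hypothetical changelog message; procTime models a non-deterministic column.
    static final class Msg {
        final Kind kind;
        final int id;
        final String name;
        final long procTime;

        Msg(Kind kind, int id, String name, long procTime) {
            this.kind = kind;
            this.id = id;
            this.name = name;
            this.procTime = procTime;
        }
    }

    // Applies a changelog to state keyed by id (the assumed upsertKey).
    // Only the key is used to locate the old row, so procTime is never compared.
    static Map<Integer, String> applyByKey(List<Msg> changelog) {
        Map<Integer, String> state = new HashMap<>();
        for (Msg m : changelog) {
            if (m.kind == Kind.I || m.kind == Kind.UA) {
                state.put(m.id, m.name);
            } else {
                state.remove(m.id);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Msg> changelog = Arrays.asList(
                new Msg(Kind.I, 1, "a", 100L),
                new Msg(Kind.UB, 1, "a", 205L), // procTime differs from the insert
                new Msg(Kind.UA, 1, "b", 205L),
                new Msg(Kind.I, 2, "c", 300L),
                new Msg(Kind.D, 2, "c", 410L)); // procTime differs from the insert
        System.out.println(applyByKey(changelog)); // prints {1=b}
    }
}
```

Because the old row is located by key alone, the differing `procTime` values do not affect the result.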
The key issue is that the upsertKey can easily be lost in a pipeline, or may not exist at the source or at the sink. Without a key identifier, a stateful operator can only process an update (D/UB/UA) message by comparing the complete row (retract by row); this also applies to a sink without a primary key definition, which works as a retract sink. In 'retract by row' mode, a stateful operator therefore requires that no non-deterministic column disturbs the original changelog row. There are three killers:
1. Non-deterministic functions (including scalar, table, and aggregate functions, built-in or custom)
2. LookupJoin on an evolving source
3. CDC source carrying metadata fields (system columns that do not belong to the entity row itself)
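The 'retract by row' failure can be sketched with plain Java collections. This is an illustration only, not how Flink's operators are implemented: `RetractByRowDemo` and `retract` are hypothetical names, and the third column of each row is assumed to come from a non-deterministic source such as a processing-time function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RetractByRowDemo {
    // Removes the first stored row equal to the retracted row.
    // Returns false when no stored row matches column by column.
    static boolean retract(List<List<Object>> state, List<Object> row) {
        return state.remove(row);
    }

    public static void main(String[] args) {
        List<List<Object>> state = new ArrayList<>();
        // Insert: (id=1, name="a", ts=100), where ts is non-deterministic.
        state.add(Arrays.<Object>asList(1, "a", 100L));

        // The retraction was re-evaluated later, so ts is now 205 and no row matches.
        boolean matched = retract(state, Arrays.<Object>asList(1, "a", 205L));
        System.out.println(matched);      // prints false
        System.out.println(state.size()); // prints 1 (a stale row remains)

        // With a deterministic column the retraction finds its row.
        matched = retract(state, Arrays.<Object>asList(1, "a", 100L));
        System.out.println(matched);      // prints true
        System.out.println(state.size()); // prints 0
    }
}
```

The first retraction silently leaves a stale row in state, which is the kind of correctness problem this resolver is meant to detect.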
As a first step, this resolver automatically enables materialization for No.2 (LookupJoin) if needed, and gives a detailed error message for No.1 (non-deterministic functions) and No.3 (CDC source with metadata), where we think it is relatively easy to change the SQL. Adding materialization for these two cases is not a good idea for now: it has a very high cost and would bring too much complexity to the operators.
Why not do this validation and rewriting in the physical-rewrite phase, as FlinkChangelogModeInferenceProgram does?
Because the physical plan may change a lot after the physical rewrite is done, so we should check the 'final' plan instead.
Some specific plan patterns:
1. Non-deterministic scalar function calls
Sink
|
Project1{select col1,col2,now(),...}
|
Scan1
2. Non-deterministic table function calls
      Sink
        |
    Correlate
     /      \
Project1  TableFunctionScan1
   |
 Scan1
3. lookup join: look up a source whose data may change over time
        Sink
          |
      LookupJoin
       /      \
  Filter1    Source2
     |
  Project1
     |
   Scan1
3.1 lookup join: an inner project with non-deterministic function calls, or a non-deterministic remaining join condition
        Sink
          |
      LookupJoin
       /      \
  Filter1    Project2
     |          |
  Project1   Source2
     |
   Scan1
4. cdc source with metadata
          Sink
            |     no upsertKey can be inferred
        Correlate
         /      \
        /     TableFunctionScan1(deterministic)
   Project1 {select id,name,attr1,op_time}
       |
   Scan {cdc source <id,name,attr1,op_type,op_time> }
4.1 cdc source with metadata
          Sink
            |     no upsertKey can be inferred
        LookupJoin {lookup key does not contain the lookup source's pk}
         /      \
        /     Source2
   Project1 {select id,name,attr1,op_time}
       |
   Scan {cdc source <id,name,attr1,op_type,op_time> }
CDC source with metadata is another form of non-deterministic update.
5. grouping keys with a non-deterministic column
Sink{pk=(c3,day)}
| upsertKey=(c3,day)
GroupAgg{group by c3, day}
|
Project{select c1,c2,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') day,...}
|
Deduplicate{keep last row, dedup on c1,c2}
|
Scan
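Pattern 5 can be made concrete with a small simulation. This is a sketch under stated assumptions, not Flink code: `NonDeterministicGroupKeyDemo`, `accumulate`, and `counts` are hypothetical names, the aggregate is assumed to be a simple row count per `(c3, day)` group, and `day` is the value the non-deterministic projection produced at the moment each message was processed.

```java
import java.util.HashMap;
import java.util.Map;

class NonDeterministicGroupKeyDemo {
    // Per-group row counts, keyed by "c3|day" as in GroupAgg{group by c3, day}.
    private final Map<String, Integer> counts = new HashMap<>();

    // Applies one message: delta is +1 for I/UA and -1 for D/UB.
    void accumulate(String c3, String day, int delta) {
        counts.merge(c3 + "|" + day, delta, Integer::sum);
    }

    Map<String, Integer> counts() {
        return counts;
    }

    public static void main(String[] args) {
        NonDeterministicGroupKeyDemo agg = new NonDeterministicGroupKeyDemo();

        // Deduplicate emits a row; the projection evaluates day = 240101.
        agg.accumulate("x", "240101", +1);
        // The matching retraction arrives the next day; the projection now yields 240102.
        agg.accumulate("x", "240102", -1);

        // The retraction landed in a different group than the row it should undo.
        System.out.println(agg.counts().get("x|240101")); // prints 1
        System.out.println(agg.counts().get("x|240102")); // prints -1
    }
}
```

Even though the sink's primary key matches the upsertKey `(c3, day)`, the retraction is routed to a different group than the original row, so neither group's result is correct.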
Constructor and Description |
---|
StreamNonDeterministicPhysicalPlanResolver() |
Modifier and Type | Method and Description |
---|---|
static List<org.apache.calcite.rel.RelNode> | resolvePhysicalPlan(List<org.apache.calcite.rel.RelNode> expanded, TableConfig tableConfig) Tries to resolve the NDU problem if OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY is configured in `TRY_RESOLVE` mode. |
public StreamNonDeterministicPhysicalPlanResolver()
public static List<org.apache.calcite.rel.RelNode> resolvePhysicalPlan(List<org.apache.calcite.rel.RelNode> expanded, TableConfig tableConfig)
Tries to resolve the NDU problem if OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY is configured in `TRY_RESOLVE` mode. Raises an error if the NDU issues in the given plan cannot be completely resolved.
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.