Class StreamNonDeterministicPhysicalPlanResolver
- java.lang.Object
- org.apache.flink.table.planner.plan.optimize.StreamNonDeterministicPhysicalPlanResolver
public class StreamNonDeterministicPhysicalPlanResolver extends Object
The StreamNonDeterministicPhysicalPlanResolver tries to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline. A changelog may contain four kinds of messages: Insert (I), Delete (D), Update_Before (UB), and Update_After (UA). There is no NDU problem in an insert-only changelog pipeline.
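To make the message kinds concrete, here is a minimal sketch of an insert-only check. The `Kind` enum and `isInsertOnly` helper are hypothetical names introduced for illustration; they are not part of the Flink API.

```java
import java.util.List;

class ChangelogKinds {
    // Hypothetical stand-in for the four changelog message kinds named above.
    enum Kind { INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER }

    // A changelog is insert-only when no message deletes or updates an earlier row.
    static boolean isInsertOnly(List<Kind> changelog) {
        return changelog.stream().allMatch(k -> k == Kind.INSERT);
    }

    public static void main(String[] args) {
        System.out.println(isInsertOnly(List.of(Kind.INSERT, Kind.INSERT)));
        System.out.println(isInsertOnly(
                List.of(Kind.INSERT, Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER)));
    }
}
```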
For updates, there are two cases: with and without an upsertKey (metadata from FlinkRelMdUpsertKeys; consider it the primary key of the changelog). The upsertKey can always be treated as deterministic, so if every operator in the pipeline can transmit the upsertKey normally (including working with the sink's primary key), everything goes well.
The key issue is that the upsertKey can easily be lost in a pipeline, or may not exist at the source or at the sink. Without a key identifier, a stateful operator can only process an update (D/UB/UA) message by comparing the complete row (retract by row); this also applies to a sink without a primary key definition, which works as a retract sink. So in 'retract by row' mode, a stateful operator requires that no non-deterministic column disturbs the original changelog row. There are three killers:
1. Non-deterministic functions (including scalar, table, and aggregate functions, whether built-in or custom)
2. LookupJoin on an evolving source
3. CDC source carrying metadata fields (system columns that do not belong to the entity row itself)
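The 'retract by row' failure mode described above can be sketched in a few lines. This is an illustrative model only: `RowState` is a hypothetical keyless state, not a Flink class, and the third column stands in for any value that is re-evaluated per message.

```java
import java.util.ArrayList;
import java.util.List;

class RetractByRowSketch {
    // Hypothetical keyless state: without an upsertKey the operator can only
    // retract by finding a stored row that is equal in every column.
    static final class RowState {
        private final List<List<Object>> rows = new ArrayList<>();

        void insert(List<Object> row) { rows.add(row); }

        // Returns true if an identical row was found and removed.
        boolean retract(List<Object> row) { return rows.remove(row); }

        int size() { return rows.size(); }
    }

    public static void main(String[] args) {
        RowState state = new RowState();

        // +I (1, "a", 1000): the third column is computed per message.
        state.insert(List.of(1, "a", 1000L));

        // -D for the same entity, but the non-deterministic column was re-evaluated.
        boolean matched = state.retract(List.of(1, "a", 2000L));
        System.out.println("retraction matched: " + matched + ", rows left: " + state.size());

        // With the column removed upstream, the full-row comparison succeeds.
        RowState clean = new RowState();
        clean.insert(List.of(1, "a"));
        System.out.println("retraction matched: " + clean.retract(List.of(1, "a")));
    }
}
```

In the first case the stale row stays in state forever, which is the correctness issue the resolver tries to detect.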
As a first step, this resolver automatically enables materialization for No.2 (LookupJoin) if needed, and gives a detailed error message for No.1 (non-deterministic functions) and No.3 (CDC source with metadata), for which we think it is relatively easy to change the SQL (adding materialization is not a good idea for now: it has a very high cost and would bring too much complexity to the operators).
Why not do this validation and rewriting in the physical-rewrite phase, as FlinkChangelogModeInferenceProgram does? Because the physical plan may change a lot after the physical rewrite is done, so we should check the 'final' plan instead.
Some specific plan patterns:
1. Non-deterministic scalar function calls
     Sink
       |
    Project1{select col1,col2,now(),...}
       |
     Scan1
2. Non-deterministic table function calls
     Sink
       |
    Correlate
      /      \
  Project1  TableFunctionScan1
     |
   Scan1
3. Lookup join: looking up a source whose data may change over time
     Sink
       |
    LookupJoin
      /      \
  Filter1  Source2
     |
  Project1
     |
   Scan1
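The following sketch illustrates why a lookup against an evolving source breaks retraction, and how materializing the emitted result avoids it. It models the general idea only and is not Flink's operator implementation; `dim`, `emitted`, `joinStateless`, and `joinMaterialized` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class LookupJoinMaterializationSketch {
    // Hypothetical dimension table that changes over time (the evolving source).
    static final Map<Integer, String> dim = new HashMap<>();

    // Hypothetical operator state: last enriched value emitted per key.
    static final Map<Integer, String> emitted = new HashMap<>();

    // Stateless enrichment: every message, including retractions,
    // sees whatever the dimension table holds right now.
    static String joinStateless(int key) {
        return key + "," + dim.get(key);
    }

    // Materialized enrichment: accumulate messages remember what was emitted,
    // and retract messages replay the remembered value instead of looking up again.
    static String joinMaterialized(int key, boolean retract) {
        if (retract) {
            return key + "," + emitted.remove(key);
        }
        String value = dim.get(key);
        emitted.put(key, value);
        return key + "," + value;
    }

    public static void main(String[] args) {
        dim.put(1, "v1");
        String inserted = joinMaterialized(1, false);
        String insertedStateless = joinStateless(1);

        dim.put(1, "v2"); // the dimension row changes between +I and -D

        // Stateless: the retraction row differs from the row emitted earlier.
        System.out.println(insertedStateless + " vs " + joinStateless(1));
        // Materialized: the retraction row equals the row emitted earlier.
        System.out.println(inserted + " vs " + joinMaterialized(1, true));
    }
}
```

The extra state is the cost of this approach, which is consistent with the remark above that materialization is expensive.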
3.1 Lookup join: an inner project contains non-deterministic function calls, or the remaining join condition is non-deterministic
     Sink
       |
    LookupJoin
      /      \
  Filter1  Project2
     |        |
  Project1  Source2
     |
   Scan1
4. CDC source with metadata
     Sink
       |     no upsertKey can be inferred
    Correlate
      /      \
     /     TableFunctionScan1(deterministic)
  Project1 {select id,name,attr1,op_time}
     |
   Scan {cdc source <id,name,attr1,op_type,op_time> }
4.1 CDC source with metadata
     Sink
       |     no upsertKey can be inferred
    LookupJoin {lookup key does not contain the lookup source's pk}
      /      \
     /     Source2
  Project1 {select id,name,attr1,op_time}
     |
   Scan {cdc source <id,name,attr1,op_type,op_time> }
CDC source with metadata is another form of non-deterministic update.
5. Grouping keys with a non-deterministic column
   Sink{pk=(c3,day)}
     |  upsertKey=(c3,day)
  GroupAgg{group by c3, day}
     |
  Project{select c1,c2,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') day,...}
     |
  Deduplicate{keep last row, dedup on c1,c2}
     |
   Scan
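A small model of pattern 5 shows how a grouping key derived from the current time can route a retraction to the wrong group. The `counts` map and `apply` helper are hypothetical; `day` stands in for the value of DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') at the moment each message is processed.

```java
import java.util.HashMap;
import java.util.Map;

class NonDeterministicGroupKeySketch {
    // Hypothetical count aggregate keyed by (c3, day), as in the GroupAgg above.
    static final Map<String, Integer> counts = new HashMap<>();

    // delta is +1 for an accumulate message and -1 for a retract message.
    static void apply(String c3, String day, int delta) {
        counts.merge(c3 + "|" + day, delta, Integer::sum);
    }

    public static void main(String[] args) {
        apply("x", "240101", +1); // the row arrives on day 1
        apply("x", "240102", -1); // Deduplicate retracts it on day 2
        apply("x", "240102", +1); // the replacement row, also on day 2

        // The day-1 group still counts the retracted row.
        System.out.println("day 1: " + counts.get("x|240101"));
        System.out.println("day 2: " + counts.get("x|240102"));
    }
}
```

Although (c3, day) looks like a valid upsertKey for the sink, the retraction never reaches the group that holds the original row.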
Constructor Summary
Constructors
Constructor                                      Description
StreamNonDeterministicPhysicalPlanResolver()
Method Summary
Modifier and Type: static List<org.apache.calcite.rel.RelNode>
Method: resolvePhysicalPlan(List<org.apache.calcite.rel.RelNode> expanded, TableConfig tableConfig)
Description: Tries to resolve the NDU problem if the configured OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY is in `TRY_RESOLVE` mode.
Method Detail
resolvePhysicalPlan
public static List<org.apache.calcite.rel.RelNode> resolvePhysicalPlan(List<org.apache.calcite.rel.RelNode> expanded, TableConfig tableConfig)
Tries to resolve the NDU problem if the configured OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY is in `TRY_RESOLVE` mode. Raises an error if the NDU issues in the given plan cannot be completely resolved.
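As a configuration sketch, the option that gates this method could be set on a table environment as follows. This assumes the Flink Table API is on the classpath and a Flink version that includes this option; the class name `EnableNduResolution` is hypothetical. Since this resolver lives in the planner package, application code would usually set the option rather than call resolvePhysicalPlan directly.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

class EnableNduResolution {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Ask the planner to try resolving NDU issues, failing with an error
        // when the plan cannot be fully resolved.
        tEnv.getConfig().set(
                OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
                OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE);
    }
}
```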