Class StreamNonDeterministicPhysicalPlanResolver


  • public class StreamNonDeterministicPhysicalPlanResolver
    extends Object
    The StreamNonDeterministicPhysicalPlanResolver tries to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline. A changelog may contain four kinds of messages: Insert (I), Delete (D), Update_Before (UB), and Update_After (UA).

    There's no NDU problem in an insert-only changelog pipeline.

    For updates, there are two cases: with and without an upsertKey (metadata from FlinkRelMdUpsertKeys; consider it the primary key of the changelog). The upsertKey can always be treated as deterministic, so if every operator in the pipeline can transmit the upsertKey normally (including working with the sink's primary key), everything works correctly.

    The key issue is that the upsertKey can easily be lost in a pipeline, or may not exist at the source or at the sink. Without a key identifier, a stateful operator can only process an update (D/UB/UA) message by comparing the complete row (retract by row); this also applies to a sink without a primary key definition, which works as a retractSink. So in 'retract by row' mode, a stateful operator requires that no non-deterministic column disturb the original changelog row. There are three killers:

    1. Non-deterministic functions (including scalar, table, and aggregate functions, built-in or custom)

    2. LookupJoin on an evolving source

    3. CDC source that carries metadata fields (system columns that do not belong to the entity row itself)
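
    The difference between the two retraction modes can be sketched with a toy model. The class below is an illustration only and is not Flink's operator code: RetractModeSketch, its two methods, and the (id, name, ts) row layout are hypothetical names chosen for this example, with ts standing in for a non-deterministic column such as now().

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of operator state (hypothetical, not Flink's implementation). */
final class RetractModeSketch {

    /** Retract by row: state is a multiset of complete rows, so a delete must match every column. */
    static boolean retractByRow(Map<List<Object>, Integer> state, List<Object> row) {
        Integer count = state.get(row);
        if (count == null) {
            return false; // no equal row in state: the retraction cannot be applied
        }
        if (count == 1) {
            state.remove(row);
        } else {
            state.put(row, count - 1);
        }
        return true;
    }

    /** Retract by key: state is indexed by the upsert key, so the other columns are not compared. */
    static boolean retractByKey(Map<Object, List<Object>> state, Object key) {
        return state.remove(key) != null;
    }

    public static void main(String[] args) {
        // +I(id=1, name="a", ts=100)
        List<Object> inserted = Arrays.asList(1, "a", 100L);
        // -D for the same entity, but ts was re-evaluated and now differs.
        List<Object> deleted = Arrays.asList(1, "a", 250L);

        Map<List<Object>, Integer> rowState = new HashMap<>();
        rowState.merge(inserted, 1, Integer::sum);
        // prints "retract by row matched: false"
        System.out.println("retract by row matched: " + retractByRow(rowState, deleted));

        Map<Object, List<Object>> keyState = new HashMap<>();
        keyState.put(inserted.get(0), inserted);
        // prints "retract by key matched: true"
        System.out.println("retract by key matched: " + retractByKey(keyState, deleted.get(0)));
    }
}
```

    In this model the keyed state tolerates the changed ts value because only the key is compared, while the row-based state leaves the stale row behind.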

    As a first step, this resolver automatically enables materialization for No.2 (LookupJoin) if needed, and gives a detailed error message for No.1 (non-deterministic functions) and No.3 (CDC source with metadata), for which we think it is relatively easy to change the SQL (adding materialization is not a good idea for now: it has a very high cost and would bring too much complexity to the operators).

    Why not do this validation and rewriting in the physical-rewrite phase, as FlinkChangelogModeInferenceProgram does? Because the physical plan may change a lot after the physical-rewrite phase is done, so we should check the 'final' plan instead.

    Some specific plan patterns:

    1. Non-deterministic scalar function calls

    
     Sink
       |
     Project1{select col1,col2,now(),...}
       |
     Scan1
     

    2. Non-deterministic table function calls

    
          Sink
            |
         Correlate
         /      \
     Project1  TableFunctionScan1
        |
      Scan1
     

    3. Lookup join: looking up a source whose data may change over time

    
          Sink
            |
         LookupJoin
         /      \
     Filter1  Source2
        |
     Project1
        |
      Scan1
     

    3.1 Lookup join: an inner project contains non-deterministic function calls, or the remaining join condition is non-deterministic

    
          Sink
            |
         LookupJoin
         /      \
     Filter1  Project2
        |        |
     Project1   Source2
        |
      Scan1
     

    4. CDC source with metadata

    
         Sink
           | no upsertKey can be inferred
       Correlate
         /      \
       /       TableFunctionScan1(deterministic)
     Project1 {select id,name,attr1,op_time}
       |
     Scan {cdc source <id,name,attr1,op_type,op_time> }
     

    4.1 CDC source with metadata, feeding a LookupJoin

    
         Sink
           | no upsertKey can be inferred
       LookupJoin {lookup key does not contain the lookup source's pk}
         /      \
       /       Source2
     Project1 {select id,name,attr1,op_time}
       |
     Scan {cdc source <id,name,attr1,op_type,op_time> }
     

    CDC source with metadata is another form of non-deterministic update.

    5. Grouping keys with a non-deterministic column

    
     Sink{pk=(c3,day)}
       | upsertKey=(c3,day)
      GroupAgg{group by c3, day}
       |
     Project{select c1,c2,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') day,...}
       |
     Deduplicate{keep last row, dedup on c1,c2}
       |
      Scan
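
    One way to read pattern 5 is sketched below as a toy simulation. It is an assumption-laden illustration, not Flink's GroupAgg implementation: GroupKeyDriftSketch, its methods, the "c3|day" string key, and the sample day values are all hypothetical. It assumes that the day column is re-evaluated when Deduplicate later emits UB/UA for an updated (c1,c2) row, so the UB is routed to a different group than the one the original row was accumulated into.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of pattern 5 (hypothetical, not Flink's operator code). */
final class GroupKeyDriftSketch {

    /** Per-group row counts, keyed by "c3|day" to mimic GroupAgg{group by c3, day}. */
    private final Map<String, Long> counts = new HashMap<>();

    /** Accumulates (delta = +1) or retracts (delta = -1) one row in the group (c3, day). */
    void apply(String c3, String day, long delta) {
        counts.merge(c3 + "|" + day, delta, Long::sum);
    }

    /** Returns the current row count of the group (c3, day), or 0 if the group is unknown. */
    long count(String c3, String day) {
        return counts.getOrDefault(c3 + "|" + day, 0L);
    }

    public static void main(String[] args) {
        GroupKeyDriftSketch agg = new GroupKeyDriftSketch();

        // First day: the first row for (c1,c2) arrives; the projection evaluates day as 240101.
        agg.apply("x", "240101", +1);

        // Next day: a newer row for the same (c1,c2) arrives. Deduplicate emits UB for the old
        // row and UA for the new one; the projection now evaluates day as 240102 for both.
        agg.apply("x", "240102", -1); // UB is applied to the wrong group
        agg.apply("x", "240102", +1); // UA

        System.out.println(agg.count("x", "240101")); // prints 1: the old group is never retracted
        System.out.println(agg.count("x", "240102")); // prints 0: the new row is cancelled out
    }
}
```

    Under these assumptions the upsertKey (c3,day) looks valid, yet the aggregate ends up with a stale count in the old group and a missing row in the new one.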
     
    • Constructor Detail

      • StreamNonDeterministicPhysicalPlanResolver

        public StreamNonDeterministicPhysicalPlanResolver()