Class StreamingSemiAntiJoinOperator

    • Method Detail

      • processElement1

        public void processElement1(StreamRecord<RowData> element)
                             throws Exception
        Processes an input element and outputs incremental joined records. Retraction messages are sent in some scenarios.

        The following pseudocode describes the core logic of this method.

         if there are no matched rows on the other side
           if anti join, send input record
         if there are matched rows on the other side
           if semi join, send input record
         if the input record is an accumulate message, state.add(record, matched size)
         if the input record is a retract message, state.retract(record)
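
        The pseudocode above can be sketched as a small standalone Java model. This is an illustration under stated assumptions, not the operator's actual code: the class LeftInputSketch, its fields, the string-based records, and the key-equality join condition are all hypothetical, and duplicate left records are ignored for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the left-input pseudocode; not Flink's implementation.
class LeftInputSketch {
    final boolean antiJoin;
    // Assumed state layout: left key -> number of matched right rows when the record was added.
    final Map<String, Integer> leftState = new HashMap<>();
    // Assumed right-side state: keys already received on the other input.
    final List<String> rightRows = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    LeftInputSketch(boolean antiJoin) {
        this.antiJoin = antiJoin;
    }

    // Assumed join condition: key equality.
    int matchedSize(String key) {
        int n = 0;
        for (String r : rightRows) {
            if (r.equals(key)) {
                n++;
            }
        }
        return n;
    }

    // kind is one of "+I", "+U" (accumulate) or "-D", "-U" (retract).
    void processLeft(String kind, String key) {
        int matched = matchedSize(key);
        if (matched == 0) {
            if (antiJoin) {
                output.add(kind + "[" + key + "]"); // anti join: emit the unmatched input
            }
        } else if (!antiJoin) {
            output.add(kind + "[" + key + "]"); // semi join: emit the matched input
        }
        boolean accumulate = kind.equals("+I") || kind.equals("+U");
        if (accumulate) {
            leftState.put(key, matched); // state.add(record, matched size)
        } else {
            leftState.remove(key); // state.retract(record)
        }
    }

    public static void main(String[] args) {
        LeftInputSketch semi = new LeftInputSketch(false);
        semi.rightRows.add("a");
        semi.processLeft("+I", "a"); // matched, so the semi join emits it
        semi.processLeft("+I", "b"); // unmatched, so the semi join stays silent
        System.out.println(semi.output); // prints [+I[a]]
    }
}
```

        In this sketch the matched size is stored with the left record so that later records from the other input can tell whether a left row currently has any match without rescanning the other side.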
         
        Throws:
        Exception
      • processElement2

        public void processElement2(StreamRecord<RowData> element)
                             throws Exception
        Processes an input element and outputs incremental joined records. Retraction messages are sent in some scenarios.

        The following pseudocode describes the core logic of this method.

        Note: "+I" represents "INSERT", "-D" represents "DELETE", "+U" represents "UPDATE_AFTER", "-U" represents "UPDATE_BEFORE".

         if the input record is an accumulate message
         | state.add(record)
         | if there are no matched rows on the other side, skip
         | if there are matched rows on the other side
         | | if the matched row's match count == 0
         | |   if anti join, send -D[other]s
         | |   if semi join, send +I/+U[other]s (using input RowKind)
         | | if the matched row's match count > 0, skip
         | | otherState.update(other, old+1)
         | endif
         endif
         if the input record is a retract message
         | state.retract(record)
         | if there are no matched rows on the other side, skip
         | if there are matched rows on the other side
         | | if the matched row's match count == 0, this should never happen!
         | | if the matched row's match count == 1
         | |   if semi join, send -D/-U[other] (using input RowKind)
         | |   if anti join, send +I[other]
         | | if the matched row's match count > 1, skip
         | | otherState.update(other, old-1)
         | endif
         endif
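
        The pseudocode above can be sketched as a small standalone Java model. This is an illustration under stated assumptions, not the operator's actual code: the class RightInputSketch, its maps, the string-based records, and the key-equality join condition are all hypothetical. With key equality and a map as the other side's state, each input record matches at most one row, whereas the pseudocode applies the same steps to every matched row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the right-input pseudocode; not Flink's implementation.
class RightInputSketch {
    final boolean antiJoin;
    // Assumed "otherState": left key -> current number of matched right rows.
    final Map<String, Integer> leftState = new HashMap<>();
    // Assumed "state": right key -> number of copies currently stored.
    final Map<String, Integer> rightState = new HashMap<>();
    final List<String> output = new ArrayList<>();

    RightInputSketch(boolean antiJoin) {
        this.antiJoin = antiJoin;
    }

    // kind is one of "+I", "+U" (accumulate) or "-D", "-U" (retract).
    void processRight(String kind, String key) {
        boolean accumulate = kind.equals("+I") || kind.equals("+U");
        if (accumulate) {
            rightState.merge(key, 1, Integer::sum); // state.add(record)
        } else {
            // state.retract(record): drop one copy, remove the entry when none remain
            rightState.computeIfPresent(key, (k, n) -> n > 1 ? n - 1 : null);
        }
        Integer old = leftState.get(key); // assumed join condition: key equality
        if (old == null) {
            return; // no matched rows on the other side, skip
        }
        if (accumulate) {
            if (old == 0) {
                // first match: anti join retracts the row, semi join emits it with the input kind
                output.add((antiJoin ? "-D" : kind) + "[" + key + "]");
            }
            leftState.put(key, old + 1); // otherState.update(other, old+1)
        } else {
            if (old == 0) {
                throw new IllegalStateException("this should never happen");
            }
            if (old == 1) {
                // last match gone: semi join retracts with the input kind, anti join emits the row
                output.add((antiJoin ? "+I" : kind) + "[" + key + "]");
            }
            leftState.put(key, old - 1); // otherState.update(other, old-1)
        }
    }

    public static void main(String[] args) {
        RightInputSketch semi = new RightInputSketch(false);
        semi.leftState.put("a", 0);   // one left row with no matches yet
        semi.processRight("+I", "a"); // count 0 -> 1, emits +I[a]
        semi.processRight("+I", "a"); // count 1 -> 2, silent
        semi.processRight("-D", "a"); // count 2 -> 1, silent
        semi.processRight("-D", "a"); // count 1 -> 0, emits -D[a]
        System.out.println(semi.output); // prints [+I[a], -D[a]]
    }
}
```

        Only the transitions between zero and one match change the join result in this sketch, which is why the intermediate updates adjust the stored count without emitting anything.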
         
        Throws:
        Exception