@Internal
public abstract class InputPriorityGraphGenerator
extends Object

This class detects and resolves input priority conflicts in an ExecNode graph.
Some batch operators (for example, hash join and nested loop join) have different priorities for their inputs. When some operators are reused, a deadlock may occur due to the conflict in these priorities.
For example, consider the SQL query:
WITH T1 AS (SELECT a, COUNT(*) AS cnt1 FROM x GROUP BY a),
     T2 AS (SELECT d, COUNT(*) AS cnt2 FROM y GROUP BY d)
SELECT * FROM
  (SELECT cnt1, cnt2 FROM T1 LEFT JOIN T2 ON a = d)
UNION ALL
  (SELECT cnt1, cnt2 FROM T2 LEFT JOIN T1 ON d = a)
When sub-plan reuse is enabled, we'll get the following physical plan:
Union(all=[true], union=[cnt1, cnt2])
:- Calc(select=[CAST(cnt1) AS cnt1, cnt2])
:  +- HashJoin(joinType=[LeftOuterJoin], where=[=(a, d)], select=[a, cnt1, d, cnt2], build=[right])
:     :- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt1], reuse_id=[2])
:     :  +- Exchange(distribution=[hash[a]])
:     :     +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
:     :        +- Calc(select=[a])
:     :           +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
:     +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count1$0) AS cnt2], reuse_id=[1])
:        +- Exchange(distribution=[hash[d]])
:           +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(*) AS count1$0])
:              +- Calc(select=[d])
:                 +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Calc(select=[cnt1, CAST(cnt2) AS cnt2])
   +- HashJoin(joinType=[LeftOuterJoin], where=[=(d, a)], select=[d, cnt2, a, cnt1], build=[right])
      :- Reused(reference_id=[1])
      +- Reused(reference_id=[2])
Note that the first hash join needs to read all results from the hash aggregate whose reuse id is 1 before reading the results from the hash aggregate whose reuse id is 2, while the second hash join requires the opposite. This physical plan will thus cause a deadlock.
This class maintains a topological graph in which an edge pointing from vertex A to vertex B indicates that the results from vertex A need to be read before those from vertex B. A loop in the graph indicates a deadlock, and different subclasses of this class resolve the conflict in different ways.
For a detailed explanation of the algorithm, see the appendix of the design doc.
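To make the loop argument concrete, here is a minimal, self-contained sketch (hypothetical class and node names, not Flink's internal TopologyGraph API) that models the two hash joins from the plan above as "must be read before" edges and detects the resulting cycle with a depth-first search:

```java
import java.util.*;

/** A minimal sketch of the topology-graph idea, not Flink's internal TopologyGraph. */
public class TopologySketch {
    private final Map<String, List<String>> edges = new HashMap<>();

    /** Adds an edge meaning "from's results must be read before to's". */
    void link(String from, String to) {
        edges.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
    }

    /** Returns true if the graph contains a loop, i.e. a potential deadlock. */
    boolean hasCycle() {
        Set<String> visiting = new HashSet<>();
        Set<String> done = new HashSet<>();
        for (String node : edges.keySet()) {
            if (dfs(node, visiting, done)) {
                return true;
            }
        }
        return false;
    }

    private boolean dfs(String node, Set<String> visiting, Set<String> done) {
        if (done.contains(node)) {
            return false;
        }
        if (!visiting.add(node)) {
            return true; // revisiting a node on the current DFS path: loop found
        }
        for (String next : edges.getOrDefault(node, Collections.emptyList())) {
            if (dfs(next, visiting, done)) {
                return true;
            }
        }
        visiting.remove(node);
        done.add(node);
        return false;
    }

    public static void main(String[] args) {
        TopologySketch g = new TopologySketch();
        // First hash join: the aggregate with reuse_id 1 must be fully read
        // before the one with reuse_id 2.
        g.link("agg1", "agg2");
        // Second hash join requires the opposite order.
        g.link("agg2", "agg1");
        System.out.println(g.hasCycle()); // prints true: the plan would deadlock
    }
}
```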
Modifier and Type | Field and Description
---|---
protected org.apache.flink.table.planner.plan.nodes.exec.processor.utils.TopologyGraph | graph
Constructor and Description |
---|
InputPriorityGraphGenerator(List<ExecNode<?>> roots, Set<ExecNode<?>> boundaries, InputProperty.DamBehavior safeDamBehavior): Create an InputPriorityGraphGenerator for the given ExecNode sub-graph. |
Modifier and Type | Method and Description
---|---
protected void | createTopologyGraph()
protected abstract void | resolveInputPriorityConflict(ExecNode<?> node, int higherInput, int lowerInput)
protected org.apache.flink.table.planner.plan.nodes.exec.processor.utils.TopologyGraph graph
public InputPriorityGraphGenerator(List<ExecNode<?>> roots, Set<ExecNode<?>> boundaries, InputProperty.DamBehavior safeDamBehavior)

Create an InputPriorityGraphGenerator for the given ExecNode sub-graph.

Parameters:
roots - the first layer of nodes on the output side of the sub-graph
boundaries - the first layer of nodes on the input side of the sub-graph
safeDamBehavior - when checking for conflicts, we'll ignore the edges whose InputProperty.DamBehavior is stricter than or equal to this

protected void createTopologyGraph()

protected abstract void resolveInputPriorityConflict(ExecNode<?> node, int higherInput, int lowerInput)
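As a usage sketch, a subclass supplies the conflict-resolution strategy through resolveInputPriorityConflict and triggers graph construction with createTopologyGraph(). The subclass below is hypothetical and assumes only the signatures documented on this page (in Flink itself, the concrete resolution strategies live in the planner's conflict-resolver subclasses):

```java
import java.util.List;
import java.util.Set;

import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
// Package inferred from the TopologyGraph field type shown above.
import org.apache.flink.table.planner.plan.nodes.exec.processor.utils.InputPriorityGraphGenerator;

/** Hypothetical subclass: reports conflicts instead of rewiring the graph. */
public class LoggingConflictResolver extends InputPriorityGraphGenerator {

    public LoggingConflictResolver(
            List<ExecNode<?>> roots,
            Set<ExecNode<?>> boundaries,
            InputProperty.DamBehavior safeDamBehavior) {
        super(roots, boundaries, safeDamBehavior);
    }

    @Override
    protected void resolveInputPriorityConflict(ExecNode<?> node, int higherInput, int lowerInput) {
        // Called during graph construction whenever consuming input
        // `higherInput` of `node` before input `lowerInput` would close a
        // loop in the topology graph. A real resolver would break the loop,
        // for example by rewiring an input; this sketch only reports it.
        System.err.println("Conflict on " + node + ": input " + higherInput
                + " must be fully consumed before input " + lowerInput);
    }
}
```

A subclass would typically invoke createTopologyGraph() after construction and then read the protected graph field; the exact calling protocol is internal to Flink's planner.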