Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
84ead13
protocol support for full outer join
windtalker Mar 14, 2026
577d5c0
add doc
windtalker Mar 14, 2026
7a2a79c
guard cartesian full outer join
windtalker Mar 14, 2026
15b7ab1
add local gtests build notes
windtalker Mar 14, 2026
93954c2
make full join schemas nullable
windtalker Mar 14, 2026
06dbc7b
allow full join non-equal conditions
windtalker Mar 14, 2026
6581648
clarify full join other-condition rollout
windtalker Mar 14, 2026
f7c2093
fix full join other-condition execution
windtalker Mar 15, 2026
7e5e0d0
test full join targeted coverage
windtalker Mar 15, 2026
cde4613
Fix full outer join review comments
windtalker Mar 27, 2026
1590cf3
update tipb
windtalker Mar 28, 2026
1f30115
Remove AGENTS.md from full outer join PR
windtalker Mar 28, 2026
bbe82d7
docs add nulleq join design note
windtalker Mar 15, 2026
d58fde6
plumb join is_null_eq into DB::Join
windtalker Mar 15, 2026
f82ad38
support json_object pushdown
windtalker Mar 16, 2026
8b5499c
disable json_object explain test temporarily
windtalker Mar 16, 2026
71b80dd
fix json_object code format
windtalker Mar 16, 2026
3eb2eb1
support nullable nulleq join keys in hash join
windtalker Mar 16, 2026
1316d57
refine row filter handling for nulleq join
windtalker Mar 16, 2026
239bfef
add outer join tests for nulleq scan-after-probe
windtalker Mar 16, 2026
64df247
test: cover full join other condition for nulleq
windtalker Mar 16, 2026
074a670
fix: disable runtime filter for nullable nulleq keys
windtalker Mar 16, 2026
877b54a
test: extend nulleq join coverage
windtalker Mar 17, 2026
43ace95
test: add null-eq join regression coverage
windtalker Mar 21, 2026
6bccb3c
docs: update null-eq join progress
windtalker Mar 21, 2026
ba30d61
feat: optimize null-eq join fixed keys
windtalker Mar 22, 2026
8e48e45
test: cover null-eq join key paths
windtalker Mar 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ Block ScanHashMapAfterProbeBlockInputStream::readImpl()
else
fillColumnsUsingCurrentPartition<false, true>(columns_left, columns_right, row_counter_column);
break;
case ASTTableJoin::Kind::Full:
if (parent.has_other_condition)
fillColumnsUsingCurrentPartition<true, false>(columns_left, columns_right, row_counter_column);
else
fillColumnsUsingCurrentPartition<false, false>(columns_left, columns_right, row_counter_column);
break;
case ASTTableJoin::Kind::RightAnti:
case ASTTableJoin::Kind::RightOuter:
if (parent.has_other_condition)
Expand Down
124 changes: 63 additions & 61 deletions dbms/src/Debug/MockExecutor/JoinBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,63 @@
#include <Debug/MockExecutor/ExecutorBinder.h>
#include <Debug/MockExecutor/JoinBinder.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTTablesInSelectQuery.h>

namespace DB::mock
{
namespace
{
/// Appends every column of `input_schema` to the end of `output_schema`, keeping the input order.
/// When `make_nullable` is true, a column that carries the NOT NULL flag is converted with
/// toNullableDAGColumnInfo before being appended; every other column is copied unchanged.
void appendJoinSchema(DAGSchema & output_schema, const DAGSchema & input_schema, bool make_nullable)
{
    for (const auto & column : input_schema)
    {
        // Nothing to convert: either nullability was not requested or the column has no NOT NULL flag.
        if (!make_nullable || !column.second.hasNotNullFlag())
        {
            output_schema.push_back(column);
            continue;
        }
        output_schema.push_back(toNullableDAGColumnInfo(column));
    }
}

/// Appends the left child's columns to `schema`. Whether NOT NULL columns are converted to
/// nullable is decided by JoinInterpreterHelper::makeLeftJoinSideNullable for join type `tp`.
void buildLeftSideJoinSchema(DAGSchema & schema, const DAGSchema & left_schema, tipb::JoinType tp)
{
    const bool left_side_nullable = JoinInterpreterHelper::makeLeftJoinSideNullable(tp);
    appendJoinSchema(schema, left_schema, left_side_nullable);
}

/// Appends the right-side contribution of a join with type `tp` to `schema`:
///  - (anti) left outer semi join: a single unnamed TypeTiny column; it is the 1/0 (uint8) field
///    indicating whether the right table has matching row(s), see the comment in ASTTableJoin::Kind.
///  - semi join / anti semi join: nothing, the right table columns are ignored.
///  - any other join type: the columns of `right_schema`, with nullability decided by
///    JoinInterpreterHelper::makeRightJoinSideNullable.
void buildRightSideJoinSchema(DAGSchema & schema, const DAGSchema & right_schema, tipb::JoinType tp)
{
    const bool emits_match_flag
        = tp == tipb::JoinType::TypeLeftOuterSemiJoin || tp == tipb::JoinType::TypeAntiLeftOuterSemiJoin;
    if (emits_match_flag)
    {
        tipb::FieldType match_flag_type{};
        match_flag_type.set_tp(TiDB::TypeTiny);
        match_flag_type.set_charset("binary");
        match_flag_type.set_collate(TiDB::ITiDBCollator::BINARY);
        match_flag_type.set_flag(0);
        match_flag_type.set_flen(-1);
        match_flag_type.set_decimal(-1);
        schema.push_back(std::make_pair("", TiDB::fieldTypeToColumnInfo(match_flag_type)));
        return;
    }

    const bool drops_right_columns
        = tp == tipb::JoinType::TypeSemiJoin || tp == tipb::JoinType::TypeAntiSemiJoin;
    if (drops_right_columns)
        return;

    appendJoinSchema(schema, right_schema, JoinInterpreterHelper::makeRightJoinSideNullable(tp));
}

/// Returns a new schema holding all columns of `left_schema` followed by all columns of
/// `right_schema`. Each side's nullability is decided by the matching
/// JoinInterpreterHelper::make{Left,Right}JoinSideNullable call for `join_type`.
/// The right side is always appended in full here; no join type is special-cased.
DAGSchema buildOtherConditionSchema(
    const DAGSchema & left_schema,
    const DAGSchema & right_schema,
    tipb::JoinType join_type)
{
    DAGSchema combined_schema;

    const bool left_side_nullable = JoinInterpreterHelper::makeLeftJoinSideNullable(join_type);
    appendJoinSchema(combined_schema, left_schema, left_side_nullable);

    const bool right_side_nullable = JoinInterpreterHelper::makeRightJoinSideNullable(join_type);
    appendJoinSchema(combined_schema, right_schema, right_side_nullable);

    return combined_schema;
}
} // namespace

void JoinBinder::addRuntimeFilter(MockRuntimeFilter & rf)
{
Expand Down Expand Up @@ -95,22 +147,8 @@ void JoinBinder::columnPrune(std::unordered_set<String> & used_columns)

/// update output schema
output_schema.clear();

for (auto & field : children[0]->output_schema)
{
if (tp == tipb::TypeRightOuterJoin && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
}

for (auto & field : children[1]->output_schema)
{
if (tp == tipb::TypeLeftOuterJoin && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
}
buildLeftSideJoinSchema(output_schema, children[0]->output_schema, tp);
buildRightSideJoinSchema(output_schema, children[1]->output_schema, tp);
}

void JoinBinder::fillJoinKeyAndFieldType(
Expand Down Expand Up @@ -158,6 +196,7 @@ bool JoinBinder::toTiPBExecutor(
join->set_join_exec_type(tipb::JoinExecType::TypeHashJoin);
join->set_inner_idx(inner_index);
join->set_is_null_aware_semi_join(is_null_aware_semi_join);
assert(is_null_eq.empty() || is_null_eq.size() == join_cols.size());

for (const auto & key : join_cols)
{
Expand All @@ -175,6 +214,9 @@ bool JoinBinder::toTiPBExecutor(
collator_id);
}

for (const auto flag : is_null_eq)
join->add_is_null_eq(flag != 0);

for (const auto & expr : left_conds)
{
tipb::Expr * cond = join->add_left_conditions();
Expand All @@ -187,11 +229,8 @@ bool JoinBinder::toTiPBExecutor(
astToPB(children[1]->output_schema, expr, cond, collator_id, context);
}

DAGSchema merged_children_schema{children[0]->output_schema};
merged_children_schema.insert(
merged_children_schema.end(),
children[1]->output_schema.begin(),
children[1]->output_schema.end());
DAGSchema merged_children_schema
= buildOtherConditionSchema(children[0]->output_schema, children[1]->output_schema, tp);

for (const auto & expr : other_conds)
{
Expand Down Expand Up @@ -293,52 +332,14 @@ void JoinBinder::toMPPSubPlan(
exchange_map[right_exchange_receiver->name] = std::make_pair(right_exchange_receiver, right_exchange_sender);
}

/// Appends the left child's columns to `schema` in order. Columns carrying the NOT NULL flag are
/// converted with toNullableDAGColumnInfo only when `tp` is TypeRightOuterJoin; all other columns
/// are copied unchanged.
static void buildLeftSideJoinSchema(DAGSchema & schema, const DAGSchema & left_schema, tipb::JoinType tp)
{
    const bool is_right_outer = (tp == tipb::JoinType::TypeRightOuterJoin);
    for (const auto & column : left_schema)
    {
        if (!is_right_outer || !column.second.hasNotNullFlag())
        {
            schema.push_back(column);
            continue;
        }
        schema.push_back(toNullableDAGColumnInfo(column));
    }
}

/// Appends the right-side contribution of a join with type `tp` to `schema`:
///  - (anti) left outer semi join: a single unnamed TypeTiny column; it is the 1/0 (uint8) field
///    indicating whether the right table has matching row(s), see the comment in ASTTableJoin::Kind.
///  - semi join / anti semi join: nothing, the right table columns are ignored.
///  - any other join type: the columns of `right_schema`; NOT NULL columns are converted with
///    toNullableDAGColumnInfo only when `tp` is TypeLeftOuterJoin.
static void buildRightSideJoinSchema(DAGSchema & schema, const DAGSchema & right_schema, tipb::JoinType tp)
{
    const bool emits_match_flag
        = tp == tipb::JoinType::TypeLeftOuterSemiJoin || tp == tipb::JoinType::TypeAntiLeftOuterSemiJoin;
    if (emits_match_flag)
    {
        tipb::FieldType match_flag_type{};
        match_flag_type.set_tp(TiDB::TypeTiny);
        match_flag_type.set_charset("binary");
        match_flag_type.set_collate(TiDB::ITiDBCollator::BINARY);
        match_flag_type.set_flag(0);
        match_flag_type.set_flen(-1);
        match_flag_type.set_decimal(-1);
        schema.push_back(std::make_pair("", TiDB::fieldTypeToColumnInfo(match_flag_type)));
        return;
    }

    const bool drops_right_columns
        = tp == tipb::JoinType::TypeSemiJoin || tp == tipb::JoinType::TypeAntiSemiJoin;
    if (drops_right_columns)
        return;

    const bool is_left_outer = (tp == tipb::JoinType::TypeLeftOuterJoin);
    for (const auto & column : right_schema)
    {
        if (is_left_outer && column.second.hasNotNullFlag())
            schema.push_back(toNullableDAGColumnInfo(column));
        else
            schema.push_back(column);
    }
}

// compileJoin constructs a mocked Join executor node. Note that all conditional-expression parameters can be left at their default values.
ExecutorBinderPtr compileJoin(
size_t & executor_index,
ExecutorBinderPtr left,
ExecutorBinderPtr right,
tipb::JoinType tp,
const ASTs & join_cols,
const std::vector<UInt8> & is_null_eq,
const ASTs & left_conds,
const ASTs & right_conds,
const ASTs & other_conds,
Expand All @@ -357,6 +358,7 @@ ExecutorBinderPtr compileJoin(
output_schema,
tp,
join_cols,
is_null_eq,
left_conds,
right_conds,
other_conds,
Expand Down Expand Up @@ -405,6 +407,6 @@ ExecutorBinderPtr compileJoin(size_t & executor_index, ExecutorBinderPtr left, E
join_cols.push_back(key);
}
}
return compileJoin(executor_index, left, right, tp, join_cols);
return compileJoin(executor_index, left, right, tp, join_cols, {});
}
} // namespace DB::mock
4 changes: 4 additions & 0 deletions dbms/src/Debug/MockExecutor/JoinBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class JoinBinder : public ExecutorBinder
const DAGSchema & output_schema_,
tipb::JoinType tp_,
const ASTs & join_cols_,
const std::vector<UInt8> & is_null_eq_,
const ASTs & l_conds,
const ASTs & r_conds,
const ASTs & o_conds,
Expand All @@ -39,6 +40,7 @@ class JoinBinder : public ExecutorBinder
: ExecutorBinder(index_, "Join_" + std::to_string(index_), output_schema_)
, tp(tp_)
, join_cols(join_cols_)
, is_null_eq(is_null_eq_)
, left_conds(l_conds)
, right_conds(r_conds)
, other_conds(o_conds)
Expand Down Expand Up @@ -77,6 +79,7 @@ class JoinBinder : public ExecutorBinder
tipb::JoinType tp;

const ASTs join_cols{};
const std::vector<UInt8> is_null_eq{};
const ASTs left_conds{};
const ASTs right_conds{};
const ASTs other_conds{};
Expand All @@ -93,6 +96,7 @@ ExecutorBinderPtr compileJoin(
ExecutorBinderPtr right,
tipb::JoinType tp,
const ASTs & join_cols,
const std::vector<UInt8> & is_null_eq = {},
const ASTs & left_conds = {},
const ASTs & right_conds = {},
const ASTs & other_conds = {},
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ const std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::JsonReplaceSig, "cast"},
//{tipb::ScalarFuncSig::JsonRemoveSig, "cast"},
//{tipb::ScalarFuncSig::JsonMergeSig, "cast"},
//{tipb::ScalarFuncSig::JsonObjectSig, "cast"},
{tipb::ScalarFuncSig::JsonObjectSig, "json_object"},
{tipb::ScalarFuncSig::JsonArraySig, "json_array"},
{tipb::ScalarFuncSig::JsonValidJsonSig, "json_valid_json"},
{tipb::ScalarFuncSig::JsonValidOthersSig, "json_valid_others"},
Expand Down Expand Up @@ -847,6 +847,8 @@ String getJoinTypeName(const tipb::JoinType & tp)
return "LeftOuterJoin";
case tipb::JoinType::TypeRightOuterJoin:
return "RightOuterJoin";
case tipb::JoinType::TypeFullOuterJoin:
return "FullOuterJoin";
case tipb::JoinType::TypeLeftOuterSemiJoin:
return "LeftOuterSemiJoin";
case tipb::JoinType::TypeAntiSemiJoin:
Expand Down
Loading