Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ ligradle
.DS_Store
*.patch
*/metastore_db
.pyc
.pyc
__pycache__/
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ public Table getTable(String name) {

// Dispatch based on CoralTable implementation type
if (coralTable instanceof IcebergTable) {
ViewDependencyTracker.get().recordBaseDependency(dbName, name);
return new IcebergCalciteTableAdapter((IcebergTable) coralTable);
} else if (coralTable instanceof HiveTable) {
HiveTable hiveTable = (HiveTable) coralTable;
// Check if it's a view
if (hiveTable.tableType() == VIEW) {
return new HiveCalciteViewAdapter(hiveTable, ImmutableList.of(CoralRootSchema.ROOT_SCHEMA, dbName));
} else {
ViewDependencyTracker.get().recordBaseDependency(dbName, name);
return new HiveCalciteTableAdapter(hiveTable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public Table getTable(String name) {
case VIRTUAL_VIEW:
return new HiveCalciteViewAdapter(table, ImmutableList.of(HiveSchema.ROOT_SCHEMA, dbName));
default:
ViewDependencyTracker.get().recordBaseDependency(dbName, name);
return new HiveCalciteTableAdapter(table);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,13 @@ public RelNode convertSql(String sql) {
* @return Calcite {@link RelNode} representation of hive view definition
*/
public RelNode convertView(String hiveDbName, String hiveViewName) {
SqlNode sqlNode = processView(hiveDbName, hiveViewName);
return toRel(sqlNode);
ViewDependencyTracker.get().enterView(hiveDbName, hiveViewName);
try {
SqlNode sqlNode = processView(hiveDbName, hiveViewName);
return toRel(sqlNode);
} finally {
ViewDependencyTracker.get().exitView();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return and validate it is the same view ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a check for validating the return of exitView

}
}

// TODO change back to protected once the relevant tests move to the common package
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright 2017-2026 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;


/**
 * Represents a single node in the view dependency chain: one view together with
 * the names of its immediate dependencies.
 *
 * <p>For a view "db.v1" that depends on "db.v2" and "db.t1",
 * this would be: ViewDependency("db.v1", ["db.v2", "db.t1"])
 *
 * <p>Instances are immutable. The dependency list is copied at construction time and
 * exposed as an unmodifiable list, so later changes to the list passed in by the
 * caller are not reflected here.
 */
public class ViewDependency {
  private final String viewName;
  private final List<String> dependencies;

  /**
   * Creates a dependency node.
   *
   * @param viewName name of the view this node describes
   * @param dependencies names of the immediate dependencies of the view; the list is
   *                     copied, and {@code null} is treated as an empty list
   */
  public ViewDependency(String viewName, List<String> dependencies) {
    this.viewName = viewName;
    // Snapshot the input so this object cannot change after construction, even if the
    // caller keeps appending to the list it passed in.
    this.dependencies = dependencies == null ? Collections.<String> emptyList()
        : Collections.unmodifiableList(new ArrayList<>(dependencies));
  }

  /**
   * @return name of the view this node describes
   */
  public String getViewName() {
    return viewName;
  }

  /**
   * @return unmodifiable list of the view's immediate dependencies, never {@code null}
   */
  public List<String> getDependencies() {
    return dependencies;
  }

  /**
   * Two nodes are equal when they have the same view name and the same dependencies
   * in the same order.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ViewDependency that = (ViewDependency) o;
    return Objects.equals(viewName, that.viewName) && Objects.equals(dependencies, that.dependencies);
  }

  @Override
  public int hashCode() {
    return Objects.hash(viewName, dependencies);
  }

  @Override
  public String toString() {
    return "ViewDependency{view=" + viewName + ", deps=" + dependencies + "}";
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright 2017-2026 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;


/**
* Thread-local tracker that records, for each view, the list of its immediate dependencies
* (other views and base tables) during Calcite view expansion.
*/
public class ViewDependencyTracker {
private static final ThreadLocal<ViewDependencyTracker> INSTANCE =
ThreadLocal.withInitial(ViewDependencyTracker::new);

private final Map<String, List<String>> viewDeps = new LinkedHashMap<>();

private final Deque<String> expansionStack = new ArrayDeque<>();

public static ViewDependencyTracker get() {
return INSTANCE.get();
}

public static void reset() {
INSTANCE.remove();
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After each resolution, the state needs to be cleared - not just in tests, but also for 'regular' use

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, this method will be invoked from the caller who invokes the rel converter of Coral


/**
 * Marks the beginning of the expansion of a view.
 *
 * <p>If another view is currently being expanded, the new view is appended to that
 * view's dependency list. The new view then becomes the current one until the
 * matching {@code exitView()} call.
 *
 * @param dbName database of the view being expanded
 * @param tableName name of the view being expanded
 */
public void enterView(String dbName, String tableName) {
  final String viewKey = dbName + "." + tableName;
  // peekFirst() yields null when no view is being expanded, i.e. this is the outermost view.
  final String enclosingView = expansionStack.peekFirst();
  if (enclosingView != null) {
    List<String> children = viewDeps.computeIfAbsent(enclosingView, unused -> new ArrayList<>());
    children.add(viewKey);
  }
  expansionStack.push(viewKey);
}

/**
* Called when a base table (non-view) is encountered during view expansion.
*/
public void recordBaseDependency(String dbName, String tableName) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Base -> BaseTable or some such ?

String qualifiedName = dbName + "." + tableName;
if (!expansionStack.isEmpty()) {
String parent = expansionStack.peek();
viewDeps.computeIfAbsent(parent, k -> new ArrayList<>()).add(qualifiedName);
}
}

/**
 * Marks the end of the expansion of the current view by dropping it from the
 * expansion stack. Does nothing when no view is currently being expanded.
 */
public void exitView() {
  // pollFirst() removes the head like pop(), but returns null instead of throwing on an empty deque.
  expansionStack.pollFirst();
}

/**
 * Returns the collected view dependency chain.
 * Each entry represents a view and its immediate dependencies (both views and base tables),
 * in the order the views were first recorded.
 *
 * <p>The result is a snapshot: every dependency list is copied, so dependencies recorded
 * on this tracker after the call do not alter a result that was already returned.
 *
 * @return a new list with one {@link ViewDependency} per recorded view; empty if nothing was recorded
 */
public List<ViewDependency> getViewDependencies() {
  List<ViewDependency> result = new ArrayList<>(viewDeps.size());
  for (Map.Entry<String, List<String>> entry : viewDeps.entrySet()) {
    // Copy the list instead of handing out the tracker's own mutable list.
    result.add(new ViewDependency(entry.getKey(), new ArrayList<>(entry.getValue())));
  }
  return result;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2026 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand All @@ -18,6 +18,7 @@
import org.apache.calcite.util.Util;

import com.linkedin.coral.common.FuzzyUnionSqlRewriter;
import com.linkedin.coral.common.ViewDependencyTracker;


/**
Expand All @@ -44,8 +45,13 @@ public RelRoot expandView(RelDataType rowType, String queryString, List<String>
String dbName = Util.last(schemaPath);
String tableName = viewPath.get(0);

SqlNode sqlNode = hiveToRelConverter.processView(dbName, tableName)
.accept(new FuzzyUnionSqlRewriter(tableName, hiveToRelConverter));
return hiveToRelConverter.getSqlToRelConverter().convertQuery(sqlNode, true, true);
ViewDependencyTracker.get().enterView(dbName, tableName);
try {
SqlNode sqlNode = hiveToRelConverter.processView(dbName, tableName)
.accept(new FuzzyUnionSqlRewriter(tableName, hiveToRelConverter));
return hiveToRelConverter.getSqlToRelConverter().convertQuery(sqlNode, true, true);
} finally {
ViewDependencyTracker.get().exitView();
}
}
}
37 changes: 33 additions & 4 deletions coral-spark/src/main/java/com/linkedin/coral/spark/CoralSpark.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.linkedin.coral.com.google.common.collect.ImmutableList;
import com.linkedin.coral.common.HiveMetastoreClient;
import com.linkedin.coral.common.ViewDependency;
import com.linkedin.coral.common.ViewDependencyTracker;
import com.linkedin.coral.spark.containers.SparkRelInfo;
import com.linkedin.coral.spark.containers.SparkUDFInfo;
import com.linkedin.coral.spark.dialect.SparkSqlDialect;
Expand All @@ -43,14 +45,16 @@
public class CoralSpark {

private final List<String> baseTables;
private final List<ViewDependency> viewDependencies;
private final List<SparkUDFInfo> sparkUDFInfoList;
private final HiveMetastoreClient hiveMetastoreClient;
private final SqlNode sqlNode;
private final String sparkSql;

private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList, String sparkSql,
HiveMetastoreClient hmsClient, SqlNode sqlNode) {
private CoralSpark(List<String> baseTables, List<ViewDependency> viewDependencies,
List<SparkUDFInfo> sparkUDFInfoList, String sparkSql, HiveMetastoreClient hmsClient, SqlNode sqlNode) {
this.baseTables = baseTables;
this.viewDependencies = viewDependencies;
this.sparkUDFInfoList = sparkUDFInfoList;
this.sparkSql = sparkSql;
this.hiveMetastoreClient = hmsClient;
Expand All @@ -74,13 +78,16 @@ private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList,
* @return [[CoralSpark]]
*/
public static CoralSpark create(RelNode irRelNode, HiveMetastoreClient hmsClient) {
// Capture view dependencies that were collected during convertView -> expandView chain
List<ViewDependency> viewDeps = ViewDependencyTracker.get().getViewDependencies();

SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
SqlNode sparkSqlNode = constructSparkSqlNode(sparkRelNode, sparkUDFInfos, hmsClient);
String sparkSQL = constructSparkSQL(sparkSqlNode);
List<String> baseTables = constructBaseTables(sparkRelNode);
return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL, hmsClient, sparkSqlNode);
return new CoralSpark(baseTables, viewDeps, ImmutableList.copyOf(sparkUDFInfos), sparkSQL, hmsClient, sparkSqlNode);
}

/**
Expand All @@ -99,6 +106,9 @@ public static CoralSpark create(RelNode irRelNode, Schema schema, HiveMetastoreC
}

private static CoralSpark createWithAlias(RelNode irRelNode, List<String> aliases, HiveMetastoreClient hmsClient) {
// Capture view dependencies that were collected during convertView -> expandView chain
List<ViewDependency> viewDeps = ViewDependencyTracker.get().getViewDependencies();

SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
Expand All @@ -111,7 +121,7 @@ private static CoralSpark createWithAlias(RelNode irRelNode, List<String> aliase
}
String sparkSQL = constructSparkSQL(sparkSqlNode);
List<String> baseTables = constructBaseTables(sparkRelNode);
return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL, hmsClient, sparkSqlNode);
return new CoralSpark(baseTables, viewDeps, ImmutableList.copyOf(sparkUDFInfos), sparkSQL, hmsClient, sparkSqlNode);
}

private static SqlNode constructSparkSqlNode(RelNode sparkRelNode, Set<SparkUDFInfo> sparkUDFInfos,
Expand Down Expand Up @@ -165,6 +175,25 @@ public List<String> getBaseTables() {
return baseTables;
}

/**
 * Returns the view dependency chain that was handed to this instance when it was created.
 * Every {@link ViewDependency} pairs one view with its immediate dependencies
 * (other views or base tables), each named in "database_name.table_name" format.
 *
 * <p>Example: when view "db.v1" reads from view "db.v2" and table "db.t1",
 * and "db.v2" reads from tables "db.t3" and "db.t4", the result is:
 * <pre>
 * [ViewDependency("db.v1", ["db.v2", "db.t1"]),
 * ViewDependency("db.v2", ["db.t3", "db.t4"])]
 * </pre>
 *
 * @return List of {@link ViewDependency} describing the view dependency chain,
 * or an empty list if no views were involved.
 */
public List<ViewDependency> getViewDependencies() {
  return this.viewDependencies;
}

/**
* Getter for Spark UDF information list:
* Additional information required to use an UDF (for details, read [[SparkUDFInfo]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.testng.annotations.Test;

import com.linkedin.coral.com.google.common.collect.ImmutableList;
import com.linkedin.coral.common.ViewDependency;
import com.linkedin.coral.common.ViewDependencyTracker;
import com.linkedin.coral.hive.hive2rel.functions.StaticHiveFunctionRegistry;
import com.linkedin.coral.spark.containers.SparkUDFInfo;
import com.linkedin.coral.spark.exceptions.UnsupportedUDFException;
Expand Down Expand Up @@ -74,6 +76,60 @@ public void testGetBaseTablesFromView() {
assertTrue(base_tables.contains("default.bar"));
}

@Test
public void testGetViewDependenciesFromNestedView() {
  // View layout under test:
  //   foo_bar_view reads from foo_view (a view) and bar (a table)
  //   foo_view reads from foo (a table)
  // so the tracker should report:
  //   foo_bar_view -> [foo_view, bar]
  //   foo_view -> [foo]
  ViewDependencyTracker.reset();
  CoralSpark coralSpark = createCoralSpark(TestUtils.toRelNode("default", "foo_bar_view"));
  List<ViewDependency> viewDeps = coralSpark.getViewDependencies();

  assertFalse(viewDeps.isEmpty(), "Expected view dependencies for nested view");

  // Locate the first entry for the outer view.
  ViewDependency fooBarViewDep = null;
  for (ViewDependency candidate : viewDeps) {
    if (candidate.getViewName().equals("default.foo_bar_view")) {
      fooBarViewDep = candidate;
      break;
    }
  }
  assertNotNull(fooBarViewDep, "Expected dependency entry for foo_bar_view");
  List<String> outerDeps = fooBarViewDep.getDependencies();
  assertTrue(outerDeps.contains("default.foo_view"), "foo_bar_view should depend on foo_view");
  assertTrue(fooBarViewDep.getDependencies().contains("default.bar"), "foo_bar_view should depend on bar");

  // Locate the first entry for the nested view.
  ViewDependency fooViewDep = null;
  for (ViewDependency candidate : viewDeps) {
    if (candidate.getViewName().equals("default.foo_view")) {
      fooViewDep = candidate;
      break;
    }
  }
  assertNotNull(fooViewDep, "Expected dependency entry for foo_view");
  assertTrue(fooViewDep.getDependencies().contains("default.foo"), "foo_view should depend on foo");
}

@Test
public void testGetViewDependenciesFromSimpleView() {
  // foo_view reads only from the base table foo, so exactly one entry is expected:
  //   foo_view -> [foo]
  ViewDependencyTracker.reset();
  CoralSpark coralSpark = createCoralSpark(TestUtils.toRelNode("default", "foo_view"));
  List<ViewDependency> viewDeps = coralSpark.getViewDependencies();

  assertFalse(viewDeps.isEmpty(), "Expected view dependencies for simple view");
  assertEquals(viewDeps.size(), 1, "Expected exactly one view dependency entry");

  ViewDependency onlyEntry = viewDeps.get(0);
  assertEquals(onlyEntry.getViewName(), "default.foo_view");
  List<String> directDeps = onlyEntry.getDependencies();
  assertTrue(directDeps.contains("default.foo"), "foo_view should depend on foo");
}

@Test
public void testGetViewDependenciesFromBaseTable() {
  // Converting a plain table involves no view expansion, so nothing should be recorded.
  ViewDependencyTracker.reset();
  CoralSpark coralSpark = createCoralSpark(TestUtils.toRelNode("default", "foo"));
  List<ViewDependency> recorded = coralSpark.getViewDependencies();

  assertTrue(recorded.isEmpty(), "Expected no view dependencies for a base table");
}

@Test
public void testQuotingKeywords() {
RelNode relNode = TestUtils.toRelNode("default", "baz_view");
Expand Down