fNames = rType.getFieldNames();
+ return TypeInfoFactory.getStructTypeInfo(fNames, fTypes);
+ }
+
+ public static TypeInfo convertMapType(RelDataType rType) {
+ return TypeInfoFactory.getMapTypeInfo(convert(rType.getKeyType()), convert(rType.getValueType()));
+ }
+
+ public static TypeInfo convertListType(RelDataType rType) {
+ return TypeInfoFactory.getListTypeInfo(convert(rType.getComponentType()));
+ }
+
+ public static TypeInfo convertPrimtiveType(RelDataType rType) {
+ switch (rType.getSqlTypeName()) {
+ case BOOLEAN:
+ return TypeInfoFactory.booleanTypeInfo;
+ case TINYINT:
+ return TypeInfoFactory.byteTypeInfo;
+ case SMALLINT:
+ return TypeInfoFactory.shortTypeInfo;
+ case INTEGER:
+ return TypeInfoFactory.intTypeInfo;
+ case BIGINT:
+ return TypeInfoFactory.longTypeInfo;
+ case FLOAT:
+ return TypeInfoFactory.floatTypeInfo;
+ case DOUBLE:
+ return TypeInfoFactory.doubleTypeInfo;
+ case DATE:
+ return TypeInfoFactory.dateTypeInfo;
+ case TIMESTAMP:
+ return TypeInfoFactory.timestampTypeInfo;
+ case BINARY:
+ return TypeInfoFactory.binaryTypeInfo;
+ case DECIMAL:
+ return TypeInfoFactory.getDecimalTypeInfo(rType.getPrecision(), rType.getScale());
+ case VARCHAR:
+ if (rType.getPrecision() == Integer.MAX_VALUE) {
+ return TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.STRING_TYPE_NAME);
+ } else {
+ return TypeInfoFactory.getVarcharTypeInfo(rType.getPrecision());
+ }
+ case CHAR:
+ if (rType.getPrecision() > HiveChar.MAX_CHAR_LENGTH) {
+ return TypeInfoFactory.getVarcharTypeInfo(rType.getPrecision());
+ } else {
+ return TypeInfoFactory.getCharTypeInfo(rType.getPrecision());
+ }
+ case OTHER:
+ default:
+ return TypeInfoFactory.voidTypeInfo;
+ }
+ }
+}
diff --git a/coral-catalog-hive/src/main/resources/META-INF/services/com.linkedin.coral.common.catalog.CoralCalciteTableAdapterFactory b/coral-catalog-hive/src/main/resources/META-INF/services/com.linkedin.coral.common.catalog.CoralCalciteTableAdapterFactory
new file mode 100644
index 000000000..99c22cacf
--- /dev/null
+++ b/coral-catalog-hive/src/main/resources/META-INF/services/com.linkedin.coral.common.catalog.CoralCalciteTableAdapterFactory
@@ -0,0 +1 @@
+com.linkedin.coral.catalog.hive.HiveCalciteTableAdapterFactory
diff --git a/coral-catalog-iceberg/build.gradle b/coral-catalog-iceberg/build.gradle
new file mode 100644
index 000000000..f74ad16dc
--- /dev/null
+++ b/coral-catalog-iceberg/build.gradle
@@ -0,0 +1,20 @@
+apply plugin: 'java-library'
+
+dependencies {
+ api project(':coral-catalog-spi')
+
+ api deps.'linkedin-iceberg'.'iceberg-api'
+ api deps.'linkedin-iceberg'.'iceberg-core'
+
+ // Temporary: needed for IcebergHiveTableConverter bridge code (issue #575)
+ implementation(deps.'linkedin-iceberg'.'iceberg-hive-metastore') {
+ exclude group: 'org.apache.hive', module: 'hive-metastore'
+ exclude group: 'org.apache.hadoop', module: 'hadoop-common'
+ }
+ implementation(deps.'hive'.'hive-metastore') {
+ exclude group: 'com.linkedin.metastore-autometrics', module: 'autometrics-reporter'
+ exclude group: 'com.linkedin.metastore-audit', module: 'metastore-audit-logging'
+ exclude group: 'org.apache.avro', module: 'avro-tools'
+ }
+ implementation deps.'hadoop'.'hadoop-common'
+}
diff --git a/coral-catalog-iceberg/src/main/java/com/linkedin/coral/catalog/iceberg/IcebergCalciteTableAdapter.java b/coral-catalog-iceberg/src/main/java/com/linkedin/coral/catalog/iceberg/IcebergCalciteTableAdapter.java
new file mode 100644
index 000000000..c7e6705de
--- /dev/null
+++ b/coral-catalog-iceberg/src/main/java/com/linkedin/coral/catalog/iceberg/IcebergCalciteTableAdapter.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2017-2026 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD-2 Clause license.
+ * See LICENSE in the project root for license information.
+ */
+package com.linkedin.coral.catalog.iceberg;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+
+import com.linkedin.coral.common.catalog.TableType;
+import com.linkedin.coral.common.types.CoralTypeToRelDataTypeConverter;
+import com.linkedin.coral.common.types.StructType;
+
+
+/**
+ * Calcite adapter for Apache Iceberg tables, bridging Iceberg metadata to Calcite's ScannableTable interface.
+ *
+ * Uses two-stage conversion: Iceberg -> Coral -> Calcite.
+ *
+ * @see Issue #575: Refactor ParseTreeBuilder to Use CoralTable
+ */
+public class IcebergCalciteTableAdapter implements ScannableTable {
+
+ private final IcebergTable coralTable;
+
+ /**
+ * Creates IcebergCalciteTableAdapter from IcebergTable.
+ *
+ * @param coralTable IcebergTable from catalog
+ */
+ public IcebergCalciteTableAdapter(IcebergTable coralTable) {
+ Preconditions.checkNotNull(coralTable);
+ this.coralTable = coralTable;
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ StructType structType = (StructType) coralTable.getSchema();
+ return CoralTypeToRelDataTypeConverter.convert(structType, typeFactory);
+ }
+
+ @Override
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ @Override
+ public Schema.TableType getJdbcTableType() {
+ return coralTable.tableType() == TableType.VIEW ? Schema.TableType.VIEW : Schema.TableType.TABLE;
+ }
+
+ @Override
+ public boolean isRolledUp(String column) {
+ return false;
+ }
+
+ @Override
+ public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call, SqlNode parent,
+ CalciteConnectionConfig config) {
+ return true;
+ }
+
+ @Override
+ public Enumerable