diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 627f81944219..da26ad973a88 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -353,7 +353,7 @@ def query_view_names_and_types(self, schema_name: str) -> Iterable[TableNameAndT for table_name in self.inspector.get_view_names(schema_name) or [] ] - def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: # noqa: UP006, UP045 + def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]: # noqa: UP006, UP045 """ Handle table and views. @@ -363,9 +363,15 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: # no :return: tables or views, depending on config """ schema_name = self.context.get().database_schema - try: - if self.source_config.includeTables: - for table_and_type in self.query_table_names_and_types(schema_name): + if self.source_config.includeTables: + try: + table_iter = self.query_table_names_and_types(schema_name) + except Exception as err: + logger.warning(f"Fetching table list failed for schema {schema_name} due to - {err}") + logger.debug(traceback.format_exc()) + table_iter = [] + for table_and_type in table_iter: + try: table_name = self.standardize_table_name(schema_name, table_and_type.name) table_fqn = fqn.build( self.metadata, @@ -385,10 +391,21 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: # no "Table Filtered Out", ) continue - yield table_name, table_and_type.type_ + except Exception as err: + logger.warning(f"Skipping table {table_and_type.name!r} in schema {schema_name} due to - {err}") + logger.debug(traceback.format_exc()) + continue + yield table_name, table_and_type.type_ - if self.source_config.includeViews: - for view_and_type in self.query_view_names_and_types(schema_name): + if 
self.source_config.includeViews: + try: + view_iter = self.query_view_names_and_types(schema_name) + except Exception as err: + logger.warning(f"Fetching view list failed for schema {schema_name} due to - {err}") + logger.debug(traceback.format_exc()) + view_iter = [] + for view_and_type in view_iter: + try: view_name = self.standardize_table_name(schema_name, view_and_type.name) view_fqn = fqn.build( self.metadata, @@ -408,10 +425,11 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: # no "Table Filtered Out", ) continue - yield view_name, view_and_type.type_ - except Exception as err: - logger.warning(f"Fetching tables names failed for schema {schema_name} due to - {err}") - logger.debug(traceback.format_exc()) + except Exception as err: + logger.warning(f"Skipping view {view_and_type.name!r} in schema {schema_name} due to - {err}") + logger.debug(traceback.format_exc()) + continue + yield view_name, view_and_type.type_ @calculate_execution_time() def get_schema_definition( diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 49243ef3cfd2..c1d422afb9de 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -688,21 +688,25 @@ def _get_table_names_and_types( **({"include_transient_tables": True} if self.service_connection.includeTransientTables else {}), ) - self.context.get_global().deleted_tables.extend( - [ - fqn.build( - metadata=self.metadata, - entity_type=Table, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - schema_name=schema_name, - table_name=table.name, + deleted_fqns = [] + for table in snowflake_tables.get_deleted(): # pyright: ignore[reportAttributeAccessIssue] + try: + deleted_fqns.append( + fqn.build( + metadata=self.metadata, + entity_type=Table, + 
service_name=self.context.get().database_service, # pyright: ignore[reportAttributeAccessIssue] + database_name=self.context.get().database, # pyright: ignore[reportAttributeAccessIssue] + schema_name=schema_name, + table_name=table.name, + ) ) - for table in snowflake_tables.get_deleted() - ] - ) + except Exception as err: + logger.warning(f"Skipping deleted-table FQN for {table.name!r} in schema {schema_name}: {err}") + logger.debug(traceback.format_exc()) + self.context.get_global().deleted_tables.extend(deleted_fqns) - return [TableNameAndType(name=table.name, type_=table.type_) for table in snowflake_tables.get_not_deleted()] + return [TableNameAndType(name=table.name, type_=table.type_) for table in snowflake_tables.get_not_deleted()] # pyright: ignore[reportAttributeAccessIssue] def _get_stream_names_and_types(self, schema_name: str) -> List[TableNameAndType]: # noqa: UP006 table_type = TableType.Stream diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py index f59a0bf3887c..0d31464b0a31 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py @@ -406,7 +406,15 @@ def get_schema_columns(self, connection, schema, **kw): identity_increment, ordinal_position, ) in result: - table_name = self.normalize_name(fqn.quote_name(table_name)) # noqa: PLW2901 + try: + table_name = self.normalize_name(fqn.quote_name(table_name)) # noqa: PLW2901 + except ValueError: + logger.warning( + "Skipping column row in schema %s with unsupported table name %r", + schema, + table_name, + ) + continue column_name = self.normalize_name(column_name) # noqa: PLW2901 if table_name not in ans: ans[table_name] = [] diff --git a/ingestion/src/metadata/ingestion/source/mcp/client.py b/ingestion/src/metadata/ingestion/source/mcp/client.py index fe253a507498..686b6bcc9d9e 100644 --- 
a/ingestion/src/metadata/ingestion/source/mcp/client.py +++ b/ingestion/src/metadata/ingestion/source/mcp/client.py @@ -283,7 +283,7 @@ def send_notification(self, method: str, params: Optional[Dict] = None) -> None: try: self.session.post( f"{self.url}/mcp", - json=notification, + json=notification, # pyright: ignore[reportArgumentType] timeout=self.timeout, ) except Exception as e: @@ -302,7 +302,7 @@ def send_request(self, method: str, params: Optional[Dict] = None) -> Dict[str, try: response = self.session.post( f"{self.url}/mcp", - json=request, + json=request, # pyright: ignore[reportArgumentType] timeout=self.timeout, ) response.raise_for_status() diff --git a/ingestion/tests/unit/test_fqn.py b/ingestion/tests/unit/test_fqn.py index 198fb086a771..81ce5f7e5541 100644 --- a/ingestion/tests/unit/test_fqn.py +++ b/ingestion/tests/unit/test_fqn.py @@ -105,6 +105,25 @@ def test_quote_name(self): fqn.quote_name('a"b') self.assertEqual('Invalid name a"b', str(context.exception)) + def test_quote_name_rejects_newline(self): + """ + Names with embedded newlines (which Snowflake's ``information_schema`` + occasionally returns when source tables were created from scripts that + forgot to strip a trailing ``\\n``) are not valid OpenMetadata FQN + components — the OM server's ``quoteName`` rejects them too. Python's + ``quote_name`` therefore raises here to keep the client/server + contract consistent. The defensive try/except added to + ``_get_schema_columns`` (snowflake/utils.py) and + ``CommonDbSourceService.get_tables_name_and_type`` + (common_db_source.py) catch this ValueError and let the rest of the + schema continue ingesting. 
+ """ + with self.assertRaises(ValueError) as context: + fqn.quote_name("REPRO_BACKUP\n ") + self.assertIn("Invalid name", str(context.exception)) + with self.assertRaises(ValueError): + fqn.quote_name("a\nb") + def test_invalid(self): with self.assertRaises(Exception): # noqa: B017 fqn.split('a.."') diff --git a/ingestion/tests/unit/topology/database/test_snowflake.py b/ingestion/tests/unit/topology/database/test_snowflake.py index 5331291a3f55..06f47b2fc71b 100644 --- a/ingestion/tests/unit/topology/database/test_snowflake.py +++ b/ingestion/tests/unit/topology/database/test_snowflake.py @@ -845,3 +845,129 @@ def test_yields_database_names_in_order(self): names = list(source.get_database_names_raw()) assert names == ["DB_A", "DB_B", "DB_C"] + + +class SnowflakeBadNameIsolationTest(TestCase): + """ + Regression tests for the fault-isolation paths added so that a single + invalid table name in a schema does not poison ingestion for unrelated + tables. See: + - snowflake/utils.py::get_schema_columns (per-row try/except) + - snowflake/metadata.py::_get_table_names_and_types + (per-table try/except around deleted-tables FQN listcomp) + """ + + @staticmethod + def _column_row(table_name, column_name, ordinal): + """Build a row tuple in the shape _get_schema_columns iterates over.""" + return ( + table_name, + column_name, + "NUMBER", # coltype + None, # character_maximum_length + 38, # numeric_precision + 0, # numeric_scale + "YES", # is_nullable + None, # column_default + "NO", # is_identity + None, # comment + None, # identity_start + None, # identity_increment + ordinal, # ordinal_position + ) + + def test_get_schema_columns_skips_invalid_table_name(self): + """A row in information_schema.columns whose table_name cannot be + FQN-quoted must be skipped, and columns for valid tables in the same + result must still be populated.""" + from snowflake.sqlalchemy.snowdialect import SnowflakeDialect + + from metadata.ingestion.source.database.snowflake.utils import ( + 
get_schema_columns, + ) + + dialect = SnowflakeDialect() + # The function calls these on `self`; stub them. + dialect._current_database_schema = Mock(return_value=("DB", "SCHEMA")) + dialect._get_schema_primary_keys = Mock(return_value={}) + + rows = [ + self._column_row("GOOD_TBL", "ID", 1), + # Unbalanced quote — quote_name raises ValueError, even with re.DOTALL. + self._column_row('BAD"NAME', "X", 1), + self._column_row("GOOD_TBL", "NAME", 2), + ] + + mock_connection = Mock() + mock_connection.execute = Mock(return_value=iter(rows)) + + result = get_schema_columns(dialect, mock_connection, schema="SCHEMA", info_cache={}) + + # The good table's columns were populated even though a bad-named row + # appeared between them — fault isolation at the per-row level. + good_key = next(k for k in result if k.lower() == "good_tbl") + self.assertEqual(len(result[good_key]), 2) + self.assertEqual([c["name"].lower() for c in result[good_key]], ["id", "name"]) + # The bad-named row was skipped, not added under any case-variant key. 
+ self.assertFalse(any("bad" in k.lower() for k in result)) + + def test_get_table_names_skips_deleted_with_invalid_name(self): + """A deleted table whose name cannot be FQN-quoted must not abort the + listcomp that populates context.deleted_tables — valid deletions + before/after the bad row should still be recorded.""" + from datetime import datetime + + from metadata.ingestion.source.database.snowflake.models import ( + SnowflakeTable, + SnowflakeTableList, + ) + + source = self.sources["not_incremental"] if hasattr(self, "sources") else None + if source is None: + source = next(iter(get_snowflake_sources().values())) + + deleted_at = datetime(2026, 1, 1) + snowflake_tables = SnowflakeTableList( + tables=[ + SnowflakeTable(name="GOOD_GONE", deleted=deleted_at, type_=TableType.Regular), + SnowflakeTable(name='BAD"GONE', deleted=deleted_at, type_=TableType.Regular), + SnowflakeTable(name="ALIVE_TBL", deleted=None, type_=TableType.Regular), + ] + ) + + mock_inspector = MagicMock() + mock_inspector.get_table_names = Mock(return_value=snowflake_tables) + source.context.get().__dict__["database_service"] = "svc" + source.context.get().__dict__["database"] = "db" + source.context.get_global().deleted_tables = [] + + def fake_fqn_build(*, metadata, entity_type, service_name, database_name, schema_name, table_name, **_kw): + from metadata.utils.fqn import quote_name + + # quote_name still rejects names with embedded `"`; let that drive the failure. + quote_name(table_name) + return f"{service_name}.{database_name}.{schema_name}.{table_name}" + + with ( + patch.object(SnowflakeSource, "inspector", new_callable=PropertyMock) as p, + patch( + "metadata.ingestion.source.database.snowflake.metadata.fqn.build", + side_effect=fake_fqn_build, + ), + ): + p.return_value = mock_inspector + not_deleted = source._get_table_names_and_types("SCHEMA") + + # Iteration completed and yielded the alive table. 
+ names = [t.name for t in not_deleted] + self.assertEqual(names, ["ALIVE_TBL"]) + # The good deleted FQN was recorded; the bad-named one was skipped. + recorded = source.context.get_global().deleted_tables + self.assertEqual(len(recorded), 1) + self.assertIn("GOOD_GONE", recorded[0]) + self.assertNotIn("BAD", " ".join(recorded)) + + def setUp(self): + # Build a snowflake source we can mutate per-test. + if not hasattr(self, "sources"): + self.sources = get_snowflake_sources() diff --git a/ingestion/tests/unit/topology/test_common_db_source_isolation.py b/ingestion/tests/unit/topology/test_common_db_source_isolation.py new file mode 100644 index 000000000000..c702b01bce9d --- /dev/null +++ b/ingestion/tests/unit/topology/test_common_db_source_isolation.py @@ -0,0 +1,141 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for per-iteration fault isolation in +`CommonDbSourceService.get_tables_name_and_type`. + +A single table whose name cannot be FQN-built (or whose filter check fails) +must be logged as a warning and skipped (not escalated to `self.status.failed`), and the loop must +continue with the remaining tables and views in the schema. 
+""" + +from unittest.mock import MagicMock, patch + +import pytest + +from metadata.generated.schema.entity.data.table import TableType +from metadata.ingestion.source.database.common_db_source import TableNameAndType +from metadata.ingestion.source.database.snowflake.metadata import SnowflakeSource +from metadata.utils.fqn import FQNBuildingException + + +@pytest.fixture +def source(): + """Build a minimal CommonDbSourceService instance via the concrete + SnowflakeSource subclass, without invoking __init__.""" + instance = SnowflakeSource.__new__(SnowflakeSource) + instance.metadata = MagicMock() + instance.status = MagicMock() + instance.source_config = MagicMock() + instance.source_config.includeTables = True + instance.source_config.includeViews = True + instance.source_config.useFqnForFiltering = False + instance.source_config.tableFilterPattern = None + instance.context = MagicMock() + context_state = MagicMock() + context_state.database_service = "svc" + context_state.database = "db" + context_state.database_schema = "schema" + instance.context.get.return_value = context_state + return instance + + +def _fqn_side_effect(*, bad_name): + """fqn.build that raises FQNBuildingException only for `bad_name`.""" + + def _build(_metadata, *, entity_type, service_name, database_name, schema_name, table_name, **_): + if table_name == bad_name: + raise FQNBuildingException(f"Error building FQN for Table: Invalid name {table_name}") + return f"{service_name}.{database_name}.{schema_name}.{table_name}" + + return _build + + +def test_get_tables_name_and_type_isolates_failed_table(caplog, source): + """A bad-name table is logged and skipped; valid tables before AND after + it are still yielded. 
The bad table is NOT escalated to ``status.failed`` + — per-iteration failures stay as warnings.""" + import logging + + source.query_table_names_and_types = MagicMock( + return_value=[ + TableNameAndType(name="GOOD_1", type_=TableType.Regular), + TableNameAndType(name='BAD"NAME', type_=TableType.Regular), + TableNameAndType(name="GOOD_2", type_=TableType.Regular), + ] + ) + source.query_view_names_and_types = MagicMock(return_value=[]) + source.standardize_table_name = lambda _schema, name: name + + with ( + patch( + "metadata.ingestion.source.database.common_db_source.fqn.build", + side_effect=_fqn_side_effect(bad_name='BAD"NAME'), + ), + caplog.at_level(logging.WARNING, logger="metadata.Ingestion"), + ): + yielded = list(source.get_tables_name_and_type()) + + assert [(n, t) for n, t in yielded] == [ + ("GOOD_1", TableType.Regular), + ("GOOD_2", TableType.Regular), + ] + # Not escalated to status.failed — just a warning log. + assert source.status.failed.call_count == 0 + warning_text = "\n".join(rec.message for rec in caplog.records) + assert "BAD" in warning_text + assert "Skipping table" in warning_text + + +def test_get_tables_name_and_type_isolates_failed_view(caplog, source): + """Same warn-and-continue contract for views.""" + import logging + + source.query_table_names_and_types = MagicMock(return_value=[]) + source.query_view_names_and_types = MagicMock( + return_value=[ + TableNameAndType(name="V_GOOD", type_=TableType.View), + TableNameAndType(name='V"BAD', type_=TableType.View), + ] + ) + source.standardize_table_name = lambda _schema, name: name + + with ( + patch( + "metadata.ingestion.source.database.common_db_source.fqn.build", + side_effect=_fqn_side_effect(bad_name='V"BAD'), + ), + caplog.at_level(logging.WARNING, logger="metadata.Ingestion"), + ): + yielded = list(source.get_tables_name_and_type()) + + assert yielded == [("V_GOOD", TableType.View)] + assert source.status.failed.call_count == 0 + warning_text = "\n".join(rec.message for rec in 
caplog.records) + assert "V" in warning_text + assert "Skipping view" in warning_text + + +def test_get_tables_name_and_type_handles_listing_failure(source): + """If query_table_names_and_types itself raises, the function logs a + warning and proceeds with the view loop (no crash).""" + source.query_table_names_and_types = MagicMock(side_effect=RuntimeError("upstream listing exploded")) + source.query_view_names_and_types = MagicMock(return_value=[TableNameAndType(name="V1", type_=TableType.View)]) + source.standardize_table_name = lambda _schema, name: name + + with patch( + "metadata.ingestion.source.database.common_db_source.fqn.build", + side_effect=_fqn_side_effect(bad_name="__never_matches__"), + ): + yielded = list(source.get_tables_name_and_type()) + + assert yielded == [("V1", TableType.View)]