ingestion/src/metadata/ingestion/source/database/common_db_source.py
@@ -353,7 +353,7 @@
            for table_name in self.inspector.get_view_names(schema_name) or []
        ]

-    def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:  # noqa: UP006, UP045
+    def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, TableType]]]:  # noqa: UP006, UP045
Check failure on line 356 in ingestion/src/metadata/ingestion/source/database/common_db_source.py
SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis: Refactor this function to reduce its Cognitive Complexity from 28 to the 15 allowed.
See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ4cEVw1NG2cpcEijiv1&open=AZ4cEVw1NG2cpcEijiv1&pullRequest=28060
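One way to get under the threshold, since the table and view branches below are structurally identical, is to fold them into a shared helper generator. A minimal sketch, not what this PR ships; the helper name _iter_names_and_types is hypothetical and the fqn.build/filter-pattern step is elided:

    # Hypothetical refactor sketch. `logger` and `traceback` are the module-level
    # names already used in the diff below; `kind` is "table" or "view".
    def _iter_names_and_types(self, schema_name, query_fn, kind):
        try:
            name_iter = query_fn(schema_name)
        except Exception as err:
            logger.warning(f"Fetching {kind} list failed for schema {schema_name} due to - {err}")
            logger.debug(traceback.format_exc())
            return
        for name_and_type in name_iter:
            try:
                name = self.standardize_table_name(schema_name, name_and_type.name)
                # ... fqn.build + filter-pattern check elided for brevity ...
            except Exception as err:
                logger.warning(f"Skipping {kind} {name_and_type.name!r} in schema {schema_name} due to - {err}")
                logger.debug(traceback.format_exc())
                continue
            yield name, name_and_type.type_

The caller then shrinks to two `yield from self._iter_names_and_types(...)` calls guarded by includeTables/includeViews, which removes most of the nesting Sonar counts.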
"""
Handle table and views.

Expand All @@ -363,9 +363,15 @@
:return: tables or views, depending on config
"""
schema_name = self.context.get().database_schema
-        try:
-            if self.source_config.includeTables:
-                for table_and_type in self.query_table_names_and_types(schema_name):
+        if self.source_config.includeTables:
+            try:
+                table_iter = self.query_table_names_and_types(schema_name)
+            except Exception as err:
+                logger.warning(f"Fetching table list failed for schema {schema_name} due to - {err}")
+                logger.debug(traceback.format_exc())
+                table_iter = []
+            for table_and_type in table_iter:
+                try:
                    table_name = self.standardize_table_name(schema_name, table_and_type.name)
                    table_fqn = fqn.build(
                        self.metadata,
@@ -385,10 +391,21 @@
"Table Filtered Out",
)
continue
yield table_name, table_and_type.type_
except Exception as err:
logger.warning(f"Skipping table {table_and_type.name!r} in schema {schema_name} due to - {err}")
logger.debug(traceback.format_exc())
continue
yield table_name, table_and_type.type_
Comment on lines 373 to +398
Comment on lines +394 to +398

-            if self.source_config.includeViews:
-                for view_and_type in self.query_view_names_and_types(schema_name):
+        if self.source_config.includeViews:
+            try:
+                view_iter = self.query_view_names_and_types(schema_name)
+            except Exception as err:
+                logger.warning(f"Fetching view list failed for schema {schema_name} due to - {err}")
+                logger.debug(traceback.format_exc())
+                view_iter = []
+            for view_and_type in view_iter:
+                try:
                    view_name = self.standardize_table_name(schema_name, view_and_type.name)
                    view_fqn = fqn.build(
                        self.metadata,
@@ -408,10 +425,11 @@
"Table Filtered Out",
)
continue
yield view_name, view_and_type.type_
except Exception as err:
logger.warning(f"Fetching tables names failed for schema {schema_name} due to - {err}")
logger.debug(traceback.format_exc())
except Exception as err:
logger.warning(f"Skipping view {view_and_type.name!r} in schema {schema_name} due to - {err}")
logger.debug(traceback.format_exc())
continue
yield view_name, view_and_type.type_
Comment on lines 400 to +432

    @calculate_execution_time()
    def get_schema_definition(
ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py
@@ -513,7 +513,7 @@
            return True, partition_details
        return False, None

    def yield_tag(self, schema_name: str) -> Iterable[Either[OMetaTagAndClassification]]:

Check failure on line 516 in ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 23 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ4cEVwTNG2cpcEijivy&open=AZ4cEVwTNG2cpcEijivy&pullRequest=28060
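The same remedy would apply here. The docstring below says the function yields tags for tables/columns and for schemas, so a split into per-kind generators is the obvious cut; a sketch under that assumption only, since the real body is folded out of this diff and both helper names are invented:

    # Hypothetical decomposition; the actual yield_tag body is not shown here.
    def yield_tag(self, schema_name: str) -> Iterable[Either[OMetaTagAndClassification]]:
        yield from self._yield_schema_tags(schema_name)
        yield from self._yield_table_and_column_tags(schema_name)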
"""
Yield tags for tables/columns and schemas.
"""
Expand Down Expand Up @@ -688,21 +688,25 @@
**({"include_transient_tables": True} if self.service_connection.includeTransientTables else {}),
)

self.context.get_global().deleted_tables.extend(
[
fqn.build(
metadata=self.metadata,
entity_type=Table,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=schema_name,
table_name=table.name,
deleted_fqns = []
for table in snowflake_tables.get_deleted(): # pyright: ignore[reportAttributeAccessIssue]
try:
deleted_fqns.append(
fqn.build(
metadata=self.metadata,
entity_type=Table,
service_name=self.context.get().database_service, # pyright: ignore[reportAttributeAccessIssue]
database_name=self.context.get().database, # pyright: ignore[reportAttributeAccessIssue]
schema_name=schema_name,
table_name=table.name,
)
)
for table in snowflake_tables.get_deleted()
]
)
except Exception as err:
logger.warning(f"Skipping deleted-table FQN for {table.name!r} in schema {schema_name}: {err}")
logger.debug(traceback.format_exc())
self.context.get_global().deleted_tables.extend(deleted_fqns)

return [TableNameAndType(name=table.name, type_=table.type_) for table in snowflake_tables.get_not_deleted()]
return [TableNameAndType(name=table.name, type_=table.type_) for table in snowflake_tables.get_not_deleted()] # pyright: ignore[reportAttributeAccessIssue]

    def _get_stream_names_and_types(self, schema_name: str) -> List[TableNameAndType]:  # noqa: UP006
        table_type = TableType.Stream
ingestion/src/metadata/ingestion/source/database/snowflake/utils.py
@@ -406,7 +406,15 @@
        identity_increment,
        ordinal_position,
    ) in result:
-        table_name = self.normalize_name(fqn.quote_name(table_name))  # noqa: PLW2901
+        try:
+            table_name = self.normalize_name(fqn.quote_name(table_name))  # noqa: PLW2901
+        except ValueError:
+            logger.warning(
+                "Skipping column row in schema %s with unsupported table name %r",
+                schema,
+                table_name,
+            )
+            continue
        column_name = self.normalize_name(column_name)  # noqa: PLW2901
        if table_name not in ans:
            ans[table_name] = []
@@ -499,7 +507,7 @@

@reflection.cache
def get_schema_foreign_keys(self, connection, schema, **kw):
    current_database, current_schema = self._current_database_schema(connection, **kw)  # noqa: RUF059

Check warning on line 510 in ingestion/src/metadata/ingestion/source/database/snowflake/utils.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Replace the unused local variable "current_database" with "_".

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ4cEVtaNG2cpcEijivw&open=AZ4cEVtaNG2cpcEijivw&pullRequest=28060
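The cleanup Sonar wants, which this diff does not apply (it keeps the variable under the noqa above), is the usual throwaway-name unpacking:

    # Discard the database half of the pair; only the schema is used afterwards.
    _, current_schema = self._current_database_schema(connection, **kw)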
    result = connection.execute(
        text(f"SHOW /* sqlalchemy:_get_schema_foreign_keys */ IMPORTED KEYS IN SCHEMA {schema}")
    )
@@ -533,7 +541,7 @@

    ans = {}

    for _, v in foreign_key_map.items():  # noqa: PERF102

Check warning on line 544 in ingestion/src/metadata/ingestion/source/database/snowflake/utils.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Modify this loop to iterate over the dictionary's values.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ4cEVtaNG2cpcEijivx&open=AZ4cEVtaNG2cpcEijivx&pullRequest=28060
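Here the suggested rewrite is mechanical; it is likewise not applied in this diff (the loop keeps its noqa: PERF102), but with the context lines below it would read:

    # Iterate over the values directly instead of unpacking and discarding the key.
    for v in foreign_key_map.values():
        if v["table_name"] not in ans:
            ans[v["table_name"]] = []
        ans[v["table_name"]].append({k2: v2 for k2, v2 in v.items() if k2 != "table_name"})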
if v["table_name"] not in ans:
ans[v["table_name"]] = []
ans[v["table_name"]].append({k2: v2 for k2, v2 in v.items() if k2 != "table_name"})
4 changes: 2 additions & 2 deletions ingestion/src/metadata/ingestion/source/mcp/client.py
@@ -283,7 +283,7 @@ def send_notification(self, method: str, params: Optional[Dict] = None) -> None:
        try:
            self.session.post(
                f"{self.url}/mcp",
-                json=notification,
+                json=notification,  # pyright: ignore[reportArgumentType]
                timeout=self.timeout,
            )
        except Exception as e:
@@ -302,7 +302,7 @@ def send_request(self, method: str, params: Optional[Dict] = None) -> Dict[str,
        try:
            response = self.session.post(
                f"{self.url}/mcp",
-                json=request,
+                json=request,  # pyright: ignore[reportArgumentType]
                timeout=self.timeout,
            )
            response.raise_for_status()
19 changes: 19 additions & 0 deletions ingestion/tests/unit/test_fqn.py
@@ -105,6 +105,25 @@ def test_quote_name(self):
            fqn.quote_name('a"b')
        self.assertEqual('Invalid name a"b', str(context.exception))

    def test_quote_name_rejects_newline(self):
        """
        Names with embedded newlines (which Snowflake's ``information_schema``
        occasionally returns when source tables were created from scripts that
        forgot to strip a trailing ``\\n``) are not valid OpenMetadata FQN
        components — the OM server's ``quoteName`` rejects them too. Python's
        ``quote_name`` therefore raises here to keep the client/server
        contract consistent. The defensive try/except added to
        ``_get_schema_columns`` (snowflake/utils.py) and
        ``CommonDbSourceService.get_tables_name_and_type``
        (common_db_source.py) catch this ValueError and let the rest of the
        schema continue ingesting.
        """
        with self.assertRaises(ValueError) as context:
            fqn.quote_name("REPRO_BACKUP\n ")
        self.assertIn("Invalid name", str(context.exception))
        with self.assertRaises(ValueError):
            fqn.quote_name("a\nb")

    def test_invalid(self):
        with self.assertRaises(Exception):  # noqa: B017
            fqn.split('a.."')
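For readers who have not met quote_name, its contract as exercised by these tests is roughly the following. The quoted-name failure lines restate assertions from this file; the dot-quoting line is an assumption about behavior this diff does not show:

    from metadata.utils import fqn

    fqn.quote_name("MY_TABLE")         # returned unchanged: no reserved characters
    fqn.quote_name("my.table")         # assumed: wrapped in quotes so the dot is not an FQN separator
    fqn.quote_name('a"b')              # raises ValueError('Invalid name a"b')
    fqn.quote_name("REPRO_BACKUP\n ")  # raises ValueError: embedded newlines are invalid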
126 changes: 126 additions & 0 deletions ingestion/tests/unit/topology/database/test_snowflake.py
@@ -845,3 +845,129 @@ def test_yields_database_names_in_order(self):
        names = list(source.get_database_names_raw())

        assert names == ["DB_A", "DB_B", "DB_C"]


class SnowflakeBadNameIsolationTest(TestCase):
    """
    Regression tests for the fault-isolation paths added so that a single
    invalid table name in a schema does not poison ingestion for unrelated
    tables. See:
    - snowflake/utils.py::get_schema_columns (per-row try/except)
    - snowflake/metadata.py::_get_table_names_and_types
      (per-table try/except around the deleted-tables FQN loop)
    """

    @staticmethod
    def _column_row(table_name, column_name, ordinal):
        """Build a row tuple in the shape _get_schema_columns iterates over."""
        return (
            table_name,
            column_name,
            "NUMBER",  # coltype
            None,  # character_maximum_length
            38,  # numeric_precision
            0,  # numeric_scale
            "YES",  # is_nullable
            None,  # column_default
            "NO",  # is_identity
            None,  # comment
            None,  # identity_start
            None,  # identity_increment
            ordinal,  # ordinal_position
        )

    def test_get_schema_columns_skips_invalid_table_name(self):
        """A row in information_schema.columns whose table_name cannot be
        FQN-quoted must be skipped, and columns for valid tables in the same
        result must still be populated."""
        from snowflake.sqlalchemy.snowdialect import SnowflakeDialect

        from metadata.ingestion.source.database.snowflake.utils import (
            get_schema_columns,
        )

        dialect = SnowflakeDialect()
        # The function calls these on `self`; stub them.
        dialect._current_database_schema = Mock(return_value=("DB", "SCHEMA"))
        dialect._get_schema_primary_keys = Mock(return_value={})

        rows = [
            self._column_row("GOOD_TBL", "ID", 1),
            # Unbalanced quote — quote_name raises ValueError, even with re.DOTALL.
            self._column_row('BAD"NAME', "X", 1),
            self._column_row("GOOD_TBL", "NAME", 2),
        ]

        mock_connection = Mock()
        mock_connection.execute = Mock(return_value=iter(rows))

        result = get_schema_columns(dialect, mock_connection, schema="SCHEMA", info_cache={})

        # The good table's columns were populated even though a bad-named row
        # appeared between them — fault isolation at the per-row level.
        good_key = next(k for k in result if k.lower() == "good_tbl")
        self.assertEqual(len(result[good_key]), 2)
        self.assertEqual([c["name"].lower() for c in result[good_key]], ["id", "name"])
        # The bad-named row was skipped, not added under any case-variant key.
        self.assertFalse(any("bad" in k.lower() for k in result))

    def test_get_table_names_skips_deleted_with_invalid_name(self):
        """A deleted table whose name cannot be FQN-quoted must not abort the
        loop that populates context.deleted_tables — valid deletions
        before/after the bad row should still be recorded."""
        from datetime import datetime

        from metadata.ingestion.source.database.snowflake.models import (
            SnowflakeTable,
            SnowflakeTableList,
        )

        source = self.sources["not_incremental"] if hasattr(self, "sources") else None
        if source is None:
            source = next(iter(get_snowflake_sources().values()))

        deleted_at = datetime(2026, 1, 1)
        snowflake_tables = SnowflakeTableList(
            tables=[
                SnowflakeTable(name="GOOD_GONE", deleted=deleted_at, type_=TableType.Regular),
                SnowflakeTable(name='BAD"GONE', deleted=deleted_at, type_=TableType.Regular),
                SnowflakeTable(name="ALIVE_TBL", deleted=None, type_=TableType.Regular),
            ]
        )

        mock_inspector = MagicMock()
        mock_inspector.get_table_names = Mock(return_value=snowflake_tables)
        source.context.get().__dict__["database_service"] = "svc"
        source.context.get().__dict__["database"] = "db"
        source.context.get_global().deleted_tables = []

        def fake_fqn_build(*, metadata, entity_type, service_name, database_name, schema_name, table_name, **_kw):
            from metadata.utils.fqn import quote_name

            # quote_name still rejects names with embedded `"`; let that drive the failure.
            quote_name(table_name)
            return f"{service_name}.{database_name}.{schema_name}.{table_name}"

        with (
            patch.object(SnowflakeSource, "inspector", new_callable=PropertyMock) as p,
            patch(
                "metadata.ingestion.source.database.snowflake.metadata.fqn.build",
                side_effect=fake_fqn_build,
            ),
        ):
            p.return_value = mock_inspector
            not_deleted = source._get_table_names_and_types("SCHEMA")

        # Iteration completed and yielded the alive table.
        names = [t.name for t in not_deleted]
        self.assertEqual(names, ["ALIVE_TBL"])
        # The good deleted FQN was recorded; the bad-named one was skipped.
        recorded = source.context.get_global().deleted_tables
        self.assertEqual(len(recorded), 1)
        self.assertIn("GOOD_GONE", recorded[0])
        self.assertNotIn("BAD", " ".join(recorded))

    def setUp(self):
        # Build a snowflake source we can mutate per-test.
        if not hasattr(self, "sources"):
            self.sources = get_snowflake_sources()