diff --git a/Analytics/DataverseLink/LatencyMonitoring/FabricLinkDataStaleness.ipynb b/Analytics/DataverseLink/LatencyMonitoring/FabricLinkDataStaleness.ipynb new file mode 100644 index 00000000..4fc91d9c --- /dev/null +++ b/Analytics/DataverseLink/LatencyMonitoring/FabricLinkDataStaleness.ipynb @@ -0,0 +1 @@ +{"cells":[{"cell_type":"code","source":["# Parameters\n","tables_list = \"salestable,salesline\"\n","source_workspace_id = \"6b887591-99a7-4544-a8c4-80d4a784d7b1\"\n","source_lakehouse_name = 'Contoso_D365_LH'\n","hours_back = 2\n","target_table_name = \"cdf_staleness_table\"\n","versions_bucket_size = 5\n","display_sample = False\n","# If target_table_base_path is empty, the standard tables' path will be used\n","target_table_base_path = \"\""],"outputs":[{"output_type":"display_data","data":{"application/vnd.jupyter.statement-meta+json":{"session_id":"78bf0436-5490-4025-a3fc-ef9b17d864e5","normalized_state":"finished","queued_time":"2025-10-22T16:11:04.4044278Z","session_start_time":"2025-10-22T16:11:04.4053081Z","execution_start_time":"2025-10-22T16:11:08.7330132Z","execution_finish_time":"2025-10-22T16:11:10.9632492Z","parent_msg_id":"46f67a4a-9f01-42ed-9744-0dc8fcfc1098"}},"metadata":{}}],"execution_count":1,"metadata":{"microsoft":{"language":"python","language_group":"jupyter_python"},"tags":["parameters"]},"id":"369f3876-6c8d-4345-8eff-a771e99d6bf4"},{"cell_type":"code","source":["from deltalake import DeltaTable\n","from pyspark.sql.functions import col\n","from datetime import datetime, timedelta\n","import json\n","import pyarrow as pa\n","import pandas as pd\n","import numpy as np\n","\n","import re\n","\n","\n","\n","# ---------------- Helpers ----------------\n","\n","def deduplicate_lakehouse_table_python(\n"," input_path: str,\n"," \n","):\n"," from deltalake import write_deltalake\n"," \n"," order_cols=(\"first_SinkCreatedOn\", \"commit_version\")\n"," key_cols=(\"recid\", \"versionnumber\")\n"," \n"," # Read all Parquet files into a single 
DataFrame\n"," print(\"Reading DeltaTable files...\")\n"," df = DeltaTable(input_path).to_pandas()\n"," \n"," print(f\"Loaded {len(df):,} rows.\")\n"," \n"," # Deduplicate\n"," print(\"Deduplicating...\")\n"," df = df.sort_values(list(order_cols), ascending=[True]*len(order_cols))\n"," dedup_df = df.drop_duplicates(subset=list(key_cols), keep=\"first\")\n"," \n"," print(f\"Deduplicated: {len(dedup_df):,} rows remain (removed {len(df)-len(dedup_df):,}).\")\n"," \n"," # Overwrite original folder with deduplicated data\n"," print(\"Writing back...\")\n"," dedup_df = dedup_df.reset_index(drop=True)\n"," write_deltalake(\n"," target_path,\n"," dedup_df, \n"," mode=\"overwrite\",\n"," schema_mode = \"overwrite\",\n"," partition_by=[\"Table_Name\"], \n"," )\n","\n"," \n"," print(f\"Deduplication complete. Table updated at: {input_path}\")\n"," return len(dedup_df)\n","\n","def getTargetPath(target_table, target_table_base_path = \"\"):\n"," \n"," if target_table_base_path == \"\":\n"," tables_now = notebookutils.lakehouse.listTables(source_lakehouse_name, source_workspace_id)\n"," exists = any(t[\"name\"] == target_table_name for t in tables_now)\n"," # Derive the Lakehouse \"Tables\" base path from any existing table location\n"," # Example existing location:\n"," # abfss://@onelake.dfs.fabric.microsoft.com//Tables/\n"," if tables_now:\n"," any_loc = tables_now[0][\"location\"]\n"," elif filtered_tables:\n"," any_loc = filtered_tables[0][\"location\"]\n"," else:\n"," raise RuntimeError(\"Cannot infer Lakehouse Tables path (no tables found).\")\n"," \n"," tables_base = any_loc.rsplit(\"/\", 1)[0] # strip the trailing table name\n"," target_path = f\"{tables_base}/{target_table_name}\" # .../Tables/cdf_staleness_table\n"," else:\n"," target_path = f\"{target_table_base_path}/{target_table_name}\"\n"," return target_path\n","\n","def writeinthelake(partial_result, target_table, target_path, mode=\"overwrite\",):\n"," from deltalake import write_deltalake\n","\n"," 
target_table = DeltaTable(target_path).to_pandas()\n"," \n"," write_deltalake(\n"," target_path,\n"," partial_result, \n"," mode=mode,\n"," schema_mode = \"overwrite\",\n"," partition_by=[\"Table_Name\"], \n"," )\n"," return len(partial_result)\n"," \n","\n","\n","def to_datetime_ms_or_parse(series):\n"," \"\"\"\n"," Convert epoch-ms ints/strings or ISO strings to UTC datetimes. Treat 0 as missing.\n"," Works with Arrow-backed dtypes too.\n"," \"\"\"\n"," s = series\n"," if pd.api.types.is_datetime64_any_dtype(s):\n"," return pd.to_datetime(s, errors=\"coerce\", utc=True)\n","\n"," pa_dtype = getattr(s.dtype, \"pyarrow_dtype\", None)\n"," is_arrow_int = pa_dtype is not None and pa.types.is_integer(pa_dtype)\n"," is_arrow_ts = pa_dtype is not None and pa.types.is_timestamp(pa_dtype)\n","\n"," if is_arrow_ts:\n"," return pd.to_datetime(s, errors=\"coerce\", utc=True)\n","\n"," if is_arrow_int or pd.api.types.is_integer_dtype(s) or pd.api.types.is_float_dtype(s) or s.dtype == object:\n"," sn = pd.to_numeric(s, errors=\"coerce\")\n"," sn = sn.where(sn != 0, pd.NA) # treat 0 as missing\n"," return pd.to_datetime(sn, unit=\"ms\", errors=\"coerce\", utc=True)\n","\n"," return pd.to_datetime(s, errors=\"coerce\", utc=True)\n","\n","\n","# ---------------- Main function ----------------\n","\n","def compute_cdf_staleness(table_uri, starting_version=0, ending_version = 0, storage_options=None):\n"," \n"," dt = DeltaTable(table_uri, storage_options=storage_options or {})\n"," tables = []\n"," \n"," cdf_iter = dt.load_cdf(starting_version=starting_version, ending_version = ending_version)\n"," \n"," # No CDF iterator: return empty schema\n"," if cdf_iter is None:\n"," print(\"CDF is null\")\n"," return pd.DataFrame(columns=[\n"," \"Table_Name\", \"Id\", \"recid\", \"versionnumber\", \"Version_Staleness\",\n"," \"commit_version\", \n"," \"modifiedon\", \"createdon\",\n"," \"first_SinkCreatedOn\", \"last_SinkCreatedOn\",\n"," ])\n"," \n"," # Collect Arrow chunks\n"," for chunk 
in cdf_iter:\n"," if isinstance(chunk, pa.RecordBatch):\n"," tables.append(pa.Table.from_batches([chunk]))\n"," elif isinstance(chunk, pa.Table):\n"," tables.append(chunk)\n"," else:\n"," raise TypeError(f\"Unexpected CDF chunk type: {type(chunk)}\")\n"," \n"," #For debugging\n"," #print(f\"tables len: {len(tables)}\")\n","\n"," if not tables:\n"," print(\"No tables\")\n"," return pd.DataFrame(columns=[\n"," \"Table_Name\", \"Id\", \"recid\", \"versionnumber\", \"Version_Staleness\",\n"," \"commit_version\", \n"," \"modifiedon\", \"createdon\",\n"," \"first_SinkCreatedOn\", \"last_SinkCreatedOn\",\n"," ])\n","\n"," # Concat tables (new API first, fallback to old)\n"," try:\n"," cdf_tbl = pa.concat_tables(tables, promote_options=\"default\")\n"," except TypeError:\n"," cdf_tbl = pa.concat_tables(tables, promote=True)\n","\n"," df = cdf_tbl.to_pandas(types_mapper=pd.ArrowDtype)\n","\n"," #For debugging\n"," #print(f\"df len: {len(df)}\")\n","\n"," # Pick/normalize core CDF metadata cols\n"," def pick_col(opts, default=None):\n"," for c in opts:\n"," if c in df.columns:\n"," return c\n"," return default\n","\n"," change_col = pick_col([\"_change_type\", \"change_type\"])\n"," cver_col = pick_col([\"_commit_version\", \"commit_version\"])\n"," cts_col = pick_col([\"_commit_timestamp\", \"commit_timestamp\"])\n"," if not change_col or not cver_col or not cts_col:\n"," missing = [x for x, v in {\n"," \"change_type/_change_type\": change_col,\n"," \"commit_version/_commit_version\": cver_col,\n"," \"commit_timestamp/_commit_timestamp\": cts_col,\n"," }.items() if not v]\n"," raise ValueError(f\"CDF columns not found: {missing}. 
Ensure CDF is enabled/retained.\")\n","\n"," df = df.rename(columns={\n"," change_col: \"change_type\",\n"," cver_col: \"commit_version\",\n"," cts_col: \"commit_timestamp\",\n"," })\n","\n"," # Domain columns\n"," if \"Id\" not in df.columns and \"id\" in df.columns:\n"," df = df.rename(columns={\"id\": \"Id\"})\n"," if \"versionnumber\" not in df.columns and \"VersionNumber\" in df.columns:\n"," df = df.rename(columns={\"VersionNumber\": \"versionnumber\"})\n","\n"," if \"recid\" not in df.columns:\n"," for alt in [\"recId\", \"RecId\", \"RECID\"]:\n"," if alt in df.columns:\n"," df = df.rename(columns={alt: \"recid\"})\n"," break\n"," if \"recid\" not in df.columns:\n"," df[\"recid\"] = 0\n","\n"," sink_col = pick_col([\"SinkCreatedOn\", \"SyncCreatedOn\", \"synccreatedon\", \"sinkcreatedon\"])\n"," if sink_col and sink_col != \"SinkCreatedOn\":\n"," df = df.rename(columns={sink_col: \"SinkCreatedOn\"})\n"," if \"SinkCreatedOn\" not in df.columns:\n"," df[\"SinkCreatedOn\"] = pd.NaT\n","\n"," if \"modifiedon\" not in df.columns:\n"," df[\"modifiedon\"] = pd.NaT\n","\n"," if \"createdon\" not in df.columns:\n"," df[\"createdon\"] = pd.NaT\n","\n"," # Required\n"," required = [\"Id\", \"versionnumber\", \"commit_version\", \"recid\"]\n"," miss = [c for c in required if c not in df.columns]\n"," if miss:\n"," raise ValueError(f\"Missing required columns: {miss}\")\n"," \n","\n"," # Dtypes & timestamps\n"," df[\"commit_version\"] = pd.to_numeric(df[\"commit_version\"], errors=\"coerce\")\n"," \n","\n"," # Stable order\n"," df = df.sort_values([\"Id\", \"versionnumber\", \"commit_version\", \"commit_timestamp\"])\n","\n"," # Version birth per (Id, versionnumber)\n"," v_birth = (\n"," df[df[\"change_type\"].isin([\"insert\", \"update_postimage\"])]\n"," .sort_values([\"commit_version\", \"commit_timestamp\"])\n"," .groupby([\"Id\", \"versionnumber\"], as_index=False)\n"," .head(1)\n"," .loc[:, [\"Id\", \"versionnumber\", \"recid\", \"commit_version\", 
\"SinkCreatedOn\", \"modifieddatetime\", \"createddatetime\"]]\n"," .rename(columns={\n"," \"SinkCreatedOn\": \"first_SinkCreatedOn\",\n"," \"modifieddatetime\": \"modifiedon\",\n"," \"createddatetime\": \"createdon\",\n"," })\n"," )\n","\n"," #For debugging\n"," # print(f\"v_birth len: {len(v_birth)}\")\n","\n"," v_last = (\n"," df[df[\"change_type\"].isin([\"insert\", \"update_postimage\"])]\n"," .sort_values([\"commit_version\", \"commit_timestamp\"], ascending=False)\n"," .groupby([\"Id\", \"versionnumber\"], as_index=False)\n"," .head(1)\n"," .loc[:, [\"Id\", \"versionnumber\", \"recid\", \"SinkCreatedOn\"]]\n"," .rename(columns={\n"," \"SinkCreatedOn\": \"last_SinkCreatedOn\",\n"," })\n"," )\n","\n"," out = (\n"," v_birth\n"," .merge(v_last, on=[\"Id\", \"versionnumber\", \"recid\"], how=\"left\")\n"," .loc[:, [\n"," \"Id\", \"versionnumber\", \"recid\",\n"," \"commit_version\", \n"," \"modifiedon\", \"createdon\",\n"," \"first_SinkCreatedOn\", \"last_SinkCreatedOn\",\n"," ]]\n"," .sort_values([\"Id\", \"versionnumber\"])\n"," .reset_index(drop=True)\n"," )\n"," \n","\n"," \n"," # Enforce datetimes; treat epoch 0 as missing\n"," date_cols = [\n"," \"modifiedon\",\n"," \"createdon\",\n"," \"first_SinkCreatedOn\",\n"," \"last_SinkCreatedOn\",\n"," ]\n"," for c in date_cols:\n"," out[c] = to_datetime_ms_or_parse(out[c])\n"," out[c] = out[c].where(out[c] != pd.Timestamp(0, tz=\"UTC\"), pd.NaT)\n","\n"," \n","\n"," out = out.assign(Version_Staleness = out[\"first_SinkCreatedOn\"] - out[\"modifiedon\"].combine(out[\"createdon\"], func=max) )\n"," out = out.assign(Table_Name = table_uri.rsplit(\"/\", 1)[-1] )\n","\n"," \n"," # Compute timedelta\n"," \n"," fs_col = \"first_SinkCreatedOn\"\n"," lm_col = \"modifiedon\"\n"," \n","\n"," # Treat epoch-zero (1970-01-01) as missing; build a mask for missing/zero last_modified\n"," \n","\n"," zero_dt = pd.Timestamp(0, tz=\"UTC\")\n"," sent_1900 = pd.Timestamp(\"1900-01-01T00:00:00Z\")\n","\n"," mask_missing_last = 
out[lm_col].isna() | out[lm_col].eq(zero_dt) | out[lm_col].eq(sent_1900)\n","\n"," # 3) Compute timedelta; force staleness to 0 where last_modified is \"null date\"\n"," delta = out[fs_col] - out[lm_col]\n"," delta = delta.where(~mask_missing_last, pd.Timedelta(0))\n","\n","\n","\n"," # Build sign and absolute components\n"," sign = np.where(delta.notna() & (delta < pd.Timedelta(0)), \"-\", \"\")\n"," abs_delta = delta.abs()\n"," \n","\n"," # Days, hours, minutes, seconds (hours/minutes/seconds are within the day)\n"," days = abs_delta.dt.days\n"," hours = (abs_delta.dt.seconds // 3600)\n"," minutes = (abs_delta.dt.seconds % 3600) // 60\n"," seconds = abs_delta.dt.seconds % 60\n"," \n","\n","\n"," # Format dd.HH:mm:ss (zero-padded), keep NaN/NaT as \n"," out[\"Version_Staleness\"] = np.where(\n"," delta.notna(),\n"," sign\n"," + days.astype(\"Int64\").astype(str).str.zfill(2) + \".\"\n"," + hours.astype(\"Int64\").astype(str).str.zfill(2) + \":\"\n"," + minutes.astype(\"Int64\").astype(str).str.zfill(2) + \":\"\n"," + seconds.astype(\"Int64\").astype(str).str.zfill(2),\n"," pd.NA\n"," )\n"," \n"," #reorder cols\n"," cols = [\n"," \"Table_Name\", \"Id\", \"recid\", \"versionnumber\", \"Version_Staleness\",\n"," \"commit_version\", \n"," \"modifiedon\", \"createdon\",\n"," \"first_SinkCreatedOn\", \"last_SinkCreatedOn\",\n"," ]\n"," out = out[cols]\n"," out.sort_values([\"Table_Name\",\"Id\", \"versionnumber\"]).reset_index(drop=True)\n"," return out\n","\n","\n","# ---------------- Driver ----------------\n","\n","allowed_tables = tables_list.split(\",\")\n","\n","artifacts_tables_list = notebookutils.lakehouse.listTables(source_lakehouse_name, source_workspace_id)\n","filtered_tables = [t for t in artifacts_tables_list if t['name'] in allowed_tables]\n","\n","#filtered_tables = artifacts_tables_list.copy() #all tables\n","first = True\n","NumOfLines = 0\n","\n","target_path = getTargetPath(target_table_name, target_table_base_path )\n","\n","# Calculate 
timestamp\n","target_time = datetime.now() - timedelta(hours=hours_back)\n","print(f\"Process start - target changes after {target_time}\")\n","for table in filtered_tables:\n"," tablename = table['name']\n"," location = table['location']\n"," print(\"Running \" , tablename)\n"," dt = DeltaTable(location)\n"," history = dt.history()\n"," \n"," #filter History\n"," #history =[row for row in history if row.get(\"operation\") in {\"VacuumEnd\", \"VacuumStart\"}]\n"," #show history\n"," #display(history)\n"," last_version = history[0][\"version\"]\n"," print(f\"Last version: {last_version}\")\n"," \n"," matching_versions = []\n"," #There is no timestamp so we must use the LastVacuumTimestamp\n"," for entry in history:\n"," commit_info = entry.get(\"additionalCommitInfo\")\n"," if not commit_info:\n"," continue\n","\n"," # Parse JSON string to dict\n"," try:\n"," info_dict = json.loads(commit_info)\n"," vacuum_ts_str = info_dict.get(\"LastVacuumTimestamp\")\n"," if vacuum_ts_str:\n"," # Convert string to datetime\n"," vacuum_ts = datetime.strptime(vacuum_ts_str, \"%m/%d/%Y %I:%M:%S %p\")\n"," if vacuum_ts >= target_time:\n"," matching_versions.append((entry[\"version\"], vacuum_ts))\n"," except Exception as e:\n"," print(f\"Error parsing entry: {e}\")\n"," \n"," # Sort and get the latest version before the target time\n"," if matching_versions:\n"," matching_versions.sort(key=lambda x: x[0], reverse=False)\n"," start_version = matching_versions[0][0]\n"," print(f\"First version from target_time based on LastVacuumTimestamp: {start_version} {matching_versions[0][1]}\")\n"," else:\n"," print(f\"No matching version found after the target time {target_time}.\")\n"," print(\"Table not processed: \", tablename)\n"," continue\n"," \n","\n"," print(f\"Start version {start_version} - Last Version {last_version}\")\n"," ending_version = last_version\n"," if ending_version > (start_version + versions_bucket_size):\n"," ending_version = start_version + versions_bucket_size\n"," 
table_completed = False\n"," while table_completed == False:\n"," if ending_version >= last_version:\n"," ending_version = last_version\n"," table_completed = True\n"," result = compute_cdf_staleness(table_uri=location, starting_version=start_version, ending_version = ending_version)\n"," start_version = ending_version + 1\n"," ending_version = ending_version + versions_bucket_size\n"," \n"," if(len(result) == 0): continue\n","\n"," # Columns you want to show as human-readable strings\n"," date_cols = [\n"," \"modifiedon\", \"createdon\",\n"," \"first_SinkCreatedOn\", \"last_SinkCreatedOn\",\n"," ]\n","\n"," pretty = result.copy()\n"," \n"," # Show in UTC (ISO-ish)\n"," for c in date_cols:\n"," if c in pretty.columns and pd.api.types.is_datetime64_any_dtype(pretty[c]):\n"," pretty[c] = pretty[c].dt.strftime(\"%Y-%m-%d %H:%M:%S%z\")\n"," \n"," #Write in the table cdf_staleness_table\n"," NumOfLines += writeinthelake(pretty.copy(), target_table_name, target_path, \"append\") \n"," if display_sample:\n"," if(first): \n"," totalResult = pretty.head(1)\n"," else: \n"," totalResult = pd.concat([totalResult, pretty.head(1)]).reset_index(drop=True)\n"," first = False\n"," print(f\"Table {tablename} processed up to version {start_version - 1} (completed={table_completed})\")\n","\n","if(first == False):\n"," dedup_len = deduplicate_lakehouse_table_python(input_path=target_path)\n"," print(f\"{NumOfLines} rows created in cdf_staleness_table. 
Current tables records: {dedup_len}\")\n","else : print (\"No rows creted\")\n","\n","if display_sample and NumOfLines > 0 :\n"," try:\n"," display(totalResult)\n"," except NameError:\n"," print(\"totalResult is empty, no rows creted\")\n","\n","\n","\n","\n"],"outputs":[{"output_type":"display_data","data":{"application/vnd.jupyter.statement-meta+json":{"session_id":"78bf0436-5490-4025-a3fc-ef9b17d864e5","normalized_state":"finished","queued_time":"2025-10-22T16:11:04.4097607Z","session_start_time":null,"execution_start_time":"2025-10-22T16:11:10.9646568Z","execution_finish_time":"2025-10-22T16:11:44.997708Z","parent_msg_id":"acbbb162-c882-448a-8c88-f865f93ed7a4"}},"metadata":{}},{"output_type":"stream","name":"stdout","text":["Process start - target changes after 2025-10-18 12:11:30.051644\nRunning salestable\nLast version: 312\nFirst version from target_time based on LastVacuumTimestamp: 312 2025-10-21 09:21:35\nStart version 312 - Last Version 312\nnot tables\nRunning salesline\nLast version: 371\nFirst version from target_time based on LastVacuumTimestamp: 370 2025-10-21 09:21:33\nStart version 370 - Last Version 371\nTable salesline processed up to version 371 (completed=True)\nReading DeltaTable files...\nLoaded 10 rows.\nDeduplicating...\nDeduplicated: 5 rows remain (removed 5).\nWriting back...\nDeduplication complete. Table updated at: abfss://6b887591-99a7-4544-a8c4-80d4a784d7b1@onelake.dfs.fabric.microsoft.com/bca1ccdf-fa53-48c9-ac82-31536fe18832/Tables/cdf_staleness_table\n5 rows created in cdf_staleness_table. 
Current tables records: 5\n"]}],"execution_count":2,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"jupyter_python"},"tags":[]},"id":"e08a59b5-deba-469f-b80a-add19a54b8f6"}],"metadata":{"kernel_info":{"name":"jupyter","jupyter_kernel_name":"python3.11"},"kernelspec":{"name":"jupyter","language":"Jupyter","display_name":"Jupyter"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"jupyter_python","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"dependencies":{"lakehouse":{"known_lakehouses":[{"id":"bca1ccdf-fa53-48c9-ac82-31536fe18832"}],"default_lakehouse":"bca1ccdf-fa53-48c9-ac82-31536fe18832","default_lakehouse_name":"Contoso_D365_LH","default_lakehouse_workspace_id":"6b887591-99a7-4544-a8c4-80d4a784d7b1"}}},"nbformat":4,"nbformat_minor":5} \ No newline at end of file diff --git a/Analytics/DataverseLink/LatencyMonitoring/FabricLinkDataStaleness_Summarization.ipynb b/Analytics/DataverseLink/LatencyMonitoring/FabricLinkDataStaleness_Summarization.ipynb new file mode 100644 index 00000000..62166dc7 --- /dev/null +++ b/Analytics/DataverseLink/LatencyMonitoring/FabricLinkDataStaleness_Summarization.ipynb @@ -0,0 +1 @@ +{"cells":[{"cell_type":"code","source":["# ========= Parameters =========\n","lookback_hours = 3 # process rows with first_SinkCreatedOn < now - lookback_hours\n","grouping_time_zone = \"UTC\" # keep UTC or change\n","source_table = \"cdf_staleness_table\"\n","target_table = \"cdf_staleness_table_summarized\"\n","# 
=============================="],"outputs":[{"output_type":"display_data","data":{"application/vnd.livy.statement-meta+json":{"spark_pool":null,"statement_id":5,"statement_ids":[5],"state":"finished","livy_statement_state":"available","session_id":"5303a7df-58c4-47b4-878a-34385b0f8404","normalized_state":"finished","queued_time":"2025-10-22T16:22:47.4374596Z","session_start_time":null,"execution_start_time":"2025-10-22T16:22:47.4386429Z","execution_finish_time":"2025-10-22T16:22:47.8247882Z","parent_msg_id":"21be79dd-c90c-43c9-9114-7514a67284f9"},"text/plain":"StatementMeta(, 5303a7df-58c4-47b4-878a-34385b0f8404, 5, Finished, Available, Finished)"},"metadata":{}}],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"tags":["parameters"]},"id":"c1bf6326-9e2a-4a38-ac96-6dc1c25005cf"},{"cell_type":"code","source":["from pyspark.sql import functions as F\n","from pyspark.sql import types as T\n","\n","\n","# ------------ Safety checks ------------\n","if not spark.catalog.tableExists(source_table):\n"," raise RuntimeError(f\"Source table '{source_table}' does not exist in the current Lakehouse.\")\n","\n","if lookback_hours < 1:\n"," raise RuntimeError(f\"Lookback hours paramenter must be greather than 1. Current value: {lookback_hours}\")\n","\n","# Ensure target exists with desired schema (keys + measures). 
If not, create empty.\n","spark.sql(f\"\"\"\n"," CREATE TABLE IF NOT EXISTS {target_table} (\n"," TableName STRING,\n"," HourOfDay INT,\n"," Date STRING,\n"," StalenessAvg DOUBLE,\n"," StalenessMin DOUBLE,\n"," StalenessMax DOUBLE,\n"," Staleness05_00to05min BIGINT,\n"," Staleness10_05to10min BIGINT,\n"," Staleness15_10to15min BIGINT,\n"," Staleness20_15to20min BIGINT,\n"," Staleness25_20to25min BIGINT,\n"," Staleness30_25to30min BIGINT,\n"," Staleness60_30to60min BIGINT,\n"," StanelessO6_Over60 BIGINT\n"," ) USING delta\n","\"\"\")\n","\n","# ------------ Filter source by lookback ------------\n","lookback_cutoff_expr = F.expr(f\"current_timestamp() - INTERVAL {lookback_hours} HOURS\")\n","src = (\n"," spark.table(source_table)\n"," .filter(F.col(\"first_SinkCreatedOn\") < lookback_cutoff_expr)\n"," .withColumn(\"first_SinkCreatedOn\", F.col(\"first_SinkCreatedOn\").cast(\"timestamp\"))\n",")\n","\n","# If nothing to process, exit early\n","if src.rdd.isEmpty():\n"," print(f\"Nothing to process: no rows with first_SinkCreatedOn older than {lookback_hours} hour(s).\")\n","else:\n"," # ------------ Parse Version_Staleness: 'd.hh:mm:ss' -> total minutes (double) ------------\n"," # Robust to either 'd.hh:mm:ss' or 'hh:mm:ss' (treat missing days as 0)\n"," vs_str = F.col(\"Version_Staleness\").cast(\"string\")\n"," dot_split = F.split(vs_str, \"\\\\.\") # [days, hh:mm:ss] OR [hh:mm:ss]\n"," has_day = (F.size(dot_split) == F.lit(2))\n"," days = F.when(vs_str.isNull(), F.lit(None).cast(\"int\")) \\\n"," .otherwise(F.when(has_day, dot_split.getItem(0).cast(\"int\")).otherwise(F.lit(0)))\n","\n"," time_part = F.when(has_day, dot_split.getItem(1)).otherwise(dot_split.getItem(0))\n"," t_split = F.split(time_part, \":\") # [hh, mm, ss]\n","\n"," hours = F.when(vs_str.isNull(), F.lit(None).cast(\"int\")).otherwise(t_split.getItem(0).cast(\"int\"))\n"," minutes = F.when(vs_str.isNull(), F.lit(None).cast(\"int\")).otherwise(t_split.getItem(1).cast(\"int\"))\n"," seconds = 
F.when(vs_str.isNull(), F.lit(None).cast(\"int\")).otherwise(t_split.getItem(2).cast(\"int\"))\n","\n"," total_minutes = (\n"," days.cast(\"double\") * 1440.0 + # days -> minutes\n"," hours.cast(\"double\") * 60.0 +\n"," minutes.cast(\"double\") +\n"," seconds.cast(\"double\") / 60.0\n"," )\n","\n"," src = src.withColumn(\"Version_Staleness_Minutes\", total_minutes)\n","\n"," # ------------ Derive grouping keys (UTC) ------------\n"," # Convert from UTC-to-UTC (no-op) but consistent; use this column for Date/HourOfDay\n"," ts_group = F.from_utc_timestamp(F.col(\"first_SinkCreatedOn\"), grouping_time_zone)\n","\n"," df = (\n"," src\n"," .withColumn(\"TableName\", F.col(\"Table_Name\"))\n"," .withColumn(\"ts_group\", ts_group)\n"," .withColumn(\"HourOfDay\", F.hour(\"ts_group\"))\n"," .withColumn(\"Date\", F.date_format(F.to_date(\"ts_group\"), \"yyyy-MM-dd\"))\n"," )\n","\n"," st_mins = F.col(\"Version_Staleness_Minutes\").cast(\"double\")\n","\n"," # ------------ Bucket counts ------------\n"," c_00_05 = F.sum(F.when(st_mins <= 5, 1).otherwise(0)).cast(\"bigint\")\n"," c_05_10 = F.sum(F.when((st_mins > 5) & (st_mins <= 10), 1).otherwise(0)).cast(\"bigint\")\n"," c_10_15 = F.sum(F.when((st_mins > 10) & (st_mins <= 15), 1).otherwise(0)).cast(\"bigint\")\n"," c_15_20 = F.sum(F.when((st_mins > 15) & (st_mins <= 20), 1).otherwise(0)).cast(\"bigint\")\n"," c_20_25 = F.sum(F.when((st_mins > 20) & (st_mins <= 25), 1).otherwise(0)).cast(\"bigint\")\n"," c_25_30 = F.sum(F.when((st_mins > 25) & (st_mins <= 30), 1).otherwise(0)).cast(\"bigint\")\n"," c_30_60 = F.sum(F.when((st_mins > 30) & (st_mins <= 60), 1).otherwise(0)).cast(\"bigint\")\n"," c_o60 = F.sum(F.when((st_mins > 60), 1).otherwise(0)).cast(\"bigint\")\n","\n"," agg = (\n"," df.groupBy(\"TableName\", \"HourOfDay\", \"Date\")\n"," .agg(\n"," F.avg(st_mins).alias(\"StalenessAvg\"),\n"," F.min(st_mins).alias(\"StalenessMin\"),\n"," F.max(st_mins).alias(\"StalenessMax\"),\n"," 
c_00_05.alias(\"Staleness05_00to05min\"),\n"," c_05_10.alias(\"Staleness10_05to10min\"),\n"," c_10_15.alias(\"Staleness15_10to15min\"),\n"," c_15_20.alias(\"Staleness20_15to20min\"),\n"," c_20_25.alias(\"Staleness25_20to25min\"),\n"," c_25_30.alias(\"Staleness30_25to30min\"),\n"," c_30_60.alias(\"Staleness60_30to60min\"),\n"," c_o60.alias(\"StanelessO6_Over60\"),\n"," )\n"," )\n","\n"," # ------------ Upsert into target (override by composite key) ------------\n"," agg.createOrReplaceTempView(\"agg_staleness_summary_tmp\")\n","\n"," spark.sql(f\"\"\"\n"," MERGE INTO {target_table} AS tgt\n"," USING agg_staleness_summary_tmp AS src\n"," ON tgt.TableName = src.TableName\n"," AND tgt.HourOfDay = src.HourOfDay\n"," AND tgt.Date = src.Date\n"," WHEN MATCHED THEN UPDATE SET\n"," tgt.StalenessAvg = src.StalenessAvg,\n"," tgt.StalenessMin = src.StalenessMin,\n"," tgt.StalenessMax = src.StalenessMax,\n"," tgt.Staleness05_00to05min = src.Staleness05_00to05min,\n"," tgt.Staleness10_05to10min = src.Staleness10_05to10min,\n"," tgt.Staleness15_10to15min = src.Staleness15_10to15min,\n"," tgt.Staleness20_15to20min = src.Staleness20_15to20min,\n"," tgt.Staleness25_20to25min = src.Staleness25_20to25min,\n"," tgt.Staleness30_25to30min = src.Staleness30_25to30min,\n"," tgt.Staleness60_30to60min = src.Staleness60_30to60min,\n"," tgt.StanelessO6_Over60 = src.StanelessO6_Over60\n"," WHEN NOT MATCHED THEN INSERT (\n"," TableName, HourOfDay, Date,\n"," StalenessAvg, StalenessMin, StalenessMax,\n"," Staleness05_00to05min, Staleness10_05to10min, Staleness15_10to15min,\n"," Staleness20_15to20min, Staleness25_20to25min, Staleness30_25to30min, \n"," Staleness60_30to60min, StanelessO6_Over60\n"," ) VALUES (\n"," src.TableName, src.HourOfDay, src.Date,\n"," src.StalenessAvg, src.StalenessMin, src.StalenessMax,\n"," src.Staleness05_00to05min, src.Staleness10_05to10min, src.Staleness15_10to15min,\n"," src.Staleness20_15to20min, src.Staleness25_20to25min, src.Staleness30_25to30min, \n"," 
src.Staleness60_30to60min, src.StanelessO6_Over60\n"," )\n"," \"\"\")\n","\n"," # ------------ Remove processed source rows ------------\n"," spark.sql(f\"\"\"\n"," DELETE FROM {source_table}\n"," WHERE first_SinkCreatedOn < (current_timestamp() - INTERVAL {lookback_hours} HOURS)\n"," \"\"\")\n","\n"," print(f\"Summarization complete (UTC). Processed & removed rows older than {lookback_hours} hour(s).\")\n","\n"],"outputs":[{"output_type":"display_data","data":{"application/vnd.livy.statement-meta+json":{"spark_pool":null,"statement_id":6,"statement_ids":[6],"state":"finished","livy_statement_state":"available","session_id":"5303a7df-58c4-47b4-878a-34385b0f8404","normalized_state":"finished","queued_time":"2025-10-22T16:22:47.5483141Z","session_start_time":null,"execution_start_time":"2025-10-22T16:22:47.827003Z","execution_finish_time":"2025-10-22T16:23:08.3397606Z","parent_msg_id":"f7a83a80-57c7-43c0-be6d-f7de9ccd0ac8"},"text/plain":"StatementMeta(, 5303a7df-58c4-47b4-878a-34385b0f8404, 6, Finished, Available, Finished)"},"metadata":{}},{"output_type":"stream","name":"stdout","text":["Summarization complete (UTC). 
Processed & removed rows older than 3 hour(s).\n"]}],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"0c8a47d4-e58a-4b46-beff-bf10acda639a"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","display_name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"dependencies":{"lakehouse":{"known_lakehouses":[{"id":"bca1ccdf-fa53-48c9-ac82-31536fe18832"}],"default_lakehouse":"bca1ccdf-fa53-48c9-ac82-31536fe18832","default_lakehouse_name":"Contoso_D365_LH","default_lakehouse_workspace_id":"6b887591-99a7-4544-a8c4-80d4a784d7b1"}}},"nbformat":4,"nbformat_minor":5} \ No newline at end of file diff --git a/Analytics/DataverseLink/LatencyMonitoring/Staleness analysis pipeline.zip b/Analytics/DataverseLink/LatencyMonitoring/Staleness analysis pipeline.zip new file mode 100644 index 00000000..86b0b7a2 Binary files /dev/null and b/Analytics/DataverseLink/LatencyMonitoring/Staleness analysis pipeline.zip differ diff --git a/Analytics/DataverseLink/LatencyMonitoring/Staleness report Fabric Link.pbix b/Analytics/DataverseLink/LatencyMonitoring/Staleness report Fabric Link.pbix new file mode 100644 index 00000000..000486a5 Binary files /dev/null and b/Analytics/DataverseLink/LatencyMonitoring/Staleness report Fabric Link.pbix differ diff --git a/Analytics/DataverseLink/LatencyMonitoring/readme.md b/Analytics/DataverseLink/LatencyMonitoring/readme.md new file mode 100644 index 00000000..60714fcf --- /dev/null +++ b/Analytics/DataverseLink/LatencyMonitoring/readme.md @@ -0,0 +1,58 @@ +# Fabric 
Link Staleness Report +This repository contains the components to create a staleness report based on the data extracted using Fabric Link. + +## Challenge +Many Dynamics 365 Finance and SCM customers measure latency by simply comparing the value of **SinkModifiedOn** with **ModifiedDateTime** (or CreatedDateTime). Problem: Fabric Link can sink the same RecId/RecVersion multiple times, leaving only the last SinkModifiedOn. This leads to a perceived latency that can be much higher than the real latency. +This solution allows you to create a report on top of Fabric that provides accurate latency. + +## Solution Components +1. Table to store the actual staleness for RecId/RecVersion pairs (granular). +2. Table to store aggregated information (summarized). +3. Two scripts (Fabric Notebooks) to maintain the tables. +4. (opt) Pipeline to run the notebooks with a schedule. +5. (opt) Power BI dashboard to visualize the status. + +## Content +- **FabricLinkDataStaleness.ipynb**: the main notebook that generates the granular staleness data +- **FabricLinkDataStaleness_Summarization.ipynb**: the aggregation script that generates the summarized data and cleans up the granular table +- **Staleness analysis pipeline.zip**: containing the pipeline to run the scripts in sequence +- **Staleness report Fabric Link.pbix**: simple example of Power BI report leveraging the generated data + +## Prerequisites +The script works only with tables that have modifieddatetime or createddatetime activated. + +## Instructions +Download the content and set up the two notebooks in Fabric. +Import the two notebooks in Fabric and set up the needed parameters. 
+ +#### FabricLinkDataStaleness parameters: +- **tables_list**: the comma-separated list of tables to analyze (*mandatory*) +- **source_workspace_id**: the id of the workspace that contains the tables (*mandatory*) +- **source_lakehouse_name**: the name of the Lakehouse that contains the tables (*mandatory*) +- **hours_back**: how many hours back the script has to consider. This parameter must be an integer greater than zero (*mandatory*) +- **target_table_name**: name of the table that will contain the granular staleness data (*mandatory*) +- **versions_bucket_size**: the script works in chunks; this parameter decides how many versions must be considered for each chunk. It helps reduce the strain on the processing resources. It must be an integer greater than 2. (*mandatory*) +- **display_sample**: this parameter is used for debugging purposes to display samples of records in the console during the runs +- **target_table_base_path**: use this parameter to have the target table sitting in a different lake. If *target_table_base_path* is empty, the standard tables' path will be used (*optional*) + +#### FabricLinkDataStaleness_Summarization parameters: +- **lookback_hours**: The granular rows are processed starting from *first_SinkCreatedOn < now - lookback_hours*. This parameter must be an integer greater than 1 +- **grouping_time_zone**: Default time zone UTC +- **source_table**: Must be the same as *target_table_name* parameter in the FabricLinkDataStaleness script +- **target_table**: The target summarization table *Staleness analysis pipeline.zip* + +*(opt.)* Create the pipeline using **Staleness analysis pipeline.zip**. In your workspace, create a new pipeline, then import the zip file from the edit page. +Note that all the above parameters can be changed in the pipeline in order to adapt to the specific needs. Make sure the pipeline's recurrence matches the scripts' parameters *hours_back* and *lookback_hours* to avoid leaving out some changes. 
+ +Use the Power BI report example or build your own to easily analyze the data. + +## Suggestions +1. Run the recurring pipeline to have a stable and automated flow of information +2. Limit the analysis to critical tables. The latency analysis can be quite heavy depending on the change rate of the tables +3. Make the summarization chase the granular analysis. For example: Granular analysis can process the last 3 hours while summarization compresses everything before that. +4. If the update flow is massive, keep the granular level window small. There is hardly any value in having details for billions of records. +5. In general, adapt the parameters to your scenario. +6. Test with a few tables and then increase as needed. +7. Use the *versions_bucket_size* parameter to reduce the CPU strain. + + diff --git a/Integration/README.md b/Integration/README.md index f7f68901..7a53d3e8 100644 --- a/Integration/README.md +++ b/Integration/README.md @@ -19,9 +19,8 @@ This implemetation asset provides sample integration code for Dynamics 365. | File/folder | Description | |-------------|-------------| | `README.md` | This README file. | -| `ODataCoreConsoleApp.sln` | Visual Studio 2019 solution file for the sample OData console application. | -| `ODataCoreConsoleApp` | Sample F&O OData client console application written to illustrate the use of .Net Core and the OData Connected Service in a client application. | +| `ODataCoreConsoleApp.sln` | Visual Studio 2022 solution file for the sample OData console application. | +| `ODataCoreConsoleApp` | [The OData Core Console App](https://github.com/microsoft/Dynamics-365-FastTrack-Implementation-Assets/tree/master/Integration/ODataCoreConsoleApp) is a sample F&O OData client console application written to illustrate the use of .Net Core and the OData Connected Service in a client application.| +| `MessageProcessorConsoleApp.sln` | Visual Studio 2022 solution file for the sample Message Processor console application. 
| +| `MessageProcessorConsoleApp` | [The Message Processor Console App](https://github.com/microsoft/Dynamics-365-FastTrack-Implementation-Assets/tree/master/Integration/MessageProcessorConsoleApp) is a sample message processor console application written to illustrate the use of .Net to interact with the Dynamics 365 F&O Message processor framework.| | `Field Service integration with F&O` | [Field Service integration with F&O extensibility examples](https://github.com/microsoft/Dynamics-365-FastTrack-Implementation-Assets/blob/master/Integration/Field%20service%20integration%20with%20F%26O/readme.md) | - -### Sample OData Console Application -- [OData Console sample](https://github.com/microsoft/Dynamics-365-FastTrack-Implementation-Assets/tree/master/Integration/ODataCoreConsoleApp).