Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"cells":[{"cell_type":"code","source":["# ========= Parameters =========\n","lookback_hours = 3 # process rows with first_SinkCreatedOn < now - lookback_hours\n","grouping_time_zone = \"UTC\" # keep UTC or change\n","source_table = \"cdf_staleness_table\"\n","target_table = \"cdf_staleness_table_summarized\"\n","# =============================="],"outputs":[{"output_type":"display_data","data":{"application/vnd.livy.statement-meta+json":{"spark_pool":null,"statement_id":5,"statement_ids":[5],"state":"finished","livy_statement_state":"available","session_id":"5303a7df-58c4-47b4-878a-34385b0f8404","normalized_state":"finished","queued_time":"2025-10-22T16:22:47.4374596Z","session_start_time":null,"execution_start_time":"2025-10-22T16:22:47.4386429Z","execution_finish_time":"2025-10-22T16:22:47.8247882Z","parent_msg_id":"21be79dd-c90c-43c9-9114-7514a67284f9"},"text/plain":"StatementMeta(, 5303a7df-58c4-47b4-878a-34385b0f8404, 5, Finished, Available, Finished)"},"metadata":{}}],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"tags":["parameters"]},"id":"c1bf6326-9e2a-4a38-ac96-6dc1c25005cf"},{"cell_type":"code","source":["from pyspark.sql import functions as F\n","from pyspark.sql import types as T\n","\n","\n","# ------------ Safety checks ------------\n","if not spark.catalog.tableExists(source_table):\n"," raise RuntimeError(f\"Source table '{source_table}' does not exist in the current Lakehouse.\")\n","\n","if lookback_hours < 1:\n"," raise RuntimeError(f\"Lookback hours paramenter must be greather than 1. Current value: {lookback_hours}\")\n","\n","# Ensure target exists with desired schema (keys + measures). If not, create empty.\n","spark.sql(f\"\"\"\n"," CREATE TABLE IF NOT EXISTS {target_table} (\n"," TableName STRING,\n"," HourOfDay INT,\n"," Date STRING,\n"," StalenessAvg DOUBLE,\n"," StalenessMin DOUBLE,\n"," StalenessMax DOUBLE,\n"," Staleness05_00to05min BIGINT,\n"," Staleness10_05to10min BIGINT,\n"," Staleness15_10to15min BIGINT,\n"," Staleness20_15to20min BIGINT,\n"," Staleness25_20to25min BIGINT,\n"," Staleness30_25to30min BIGINT,\n"," Staleness60_30to60min BIGINT,\n"," StanelessO6_Over60 BIGINT\n"," ) USING delta\n","\"\"\")\n","\n","# ------------ Filter source by lookback ------------\n","lookback_cutoff_expr = F.expr(f\"current_timestamp() - INTERVAL {lookback_hours} HOURS\")\n","src = (\n"," spark.table(source_table)\n"," .filter(F.col(\"first_SinkCreatedOn\") < lookback_cutoff_expr)\n"," .withColumn(\"first_SinkCreatedOn\", F.col(\"first_SinkCreatedOn\").cast(\"timestamp\"))\n",")\n","\n","# If nothing to process, exit early\n","if src.rdd.isEmpty():\n"," print(f\"Nothing to process: no rows with first_SinkCreatedOn older than {lookback_hours} hour(s).\")\n","else:\n"," # ------------ Parse Version_Staleness: 'd.hh:mm:ss' -> total minutes (double) ------------\n"," # Robust to either 'd.hh:mm:ss' or 'hh:mm:ss' (treat missing days as 0)\n"," vs_str = F.col(\"Version_Staleness\").cast(\"string\")\n"," dot_split = F.split(vs_str, \"\\\\.\") # [days, hh:mm:ss] OR [hh:mm:ss]\n"," has_day = (F.size(dot_split) == F.lit(2))\n"," days = F.when(vs_str.isNull(), F.lit(None).cast(\"int\")) \\\n"," .otherwise(F.when(has_day, dot_split.getItem(0).cast(\"int\")).otherwise(F.lit(0)))\n","\n"," time_part = F.when(has_day, dot_split.getItem(1)).otherwise(dot_split.getItem(0))\n"," t_split = F.split(time_part, \":\") # [hh, mm, ss]\n","\n"," hours = F.when(vs_str.isNull(), F.lit(None).cast(\"int\")).otherwise(t_split.getItem(0).cast(\"int\"))\n"," minutes = F.when(vs_str.isNull(), F.lit(None).cast(\"int\")).otherwise(t_split.getItem(1).cast(\"int\"))\n"," seconds = F.when(vs_str.isNull(), F.lit(None).cast(\"int\")).otherwise(t_split.getItem(2).cast(\"int\"))\n","\n"," total_minutes = (\n"," days.cast(\"double\") * 1440.0 + # days -> minutes\n"," hours.cast(\"double\") * 60.0 +\n"," minutes.cast(\"double\") +\n"," seconds.cast(\"double\") / 60.0\n"," )\n","\n"," src = src.withColumn(\"Version_Staleness_Minutes\", total_minutes)\n","\n"," # ------------ Derive grouping keys (UTC) ------------\n"," # Convert from UTC-to-UTC (no-op) but consistent; use this column for Date/HourOfDay\n"," ts_group = F.from_utc_timestamp(F.col(\"first_SinkCreatedOn\"), grouping_time_zone)\n","\n"," df = (\n"," src\n"," .withColumn(\"TableName\", F.col(\"Table_Name\"))\n"," .withColumn(\"ts_group\", ts_group)\n"," .withColumn(\"HourOfDay\", F.hour(\"ts_group\"))\n"," .withColumn(\"Date\", F.date_format(F.to_date(\"ts_group\"), \"yyyy-MM-dd\"))\n"," )\n","\n"," st_mins = F.col(\"Version_Staleness_Minutes\").cast(\"double\")\n","\n"," # ------------ Bucket counts ------------\n"," c_00_05 = F.sum(F.when(st_mins <= 5, 1).otherwise(0)).cast(\"bigint\")\n"," c_05_10 = F.sum(F.when((st_mins > 5) & (st_mins <= 10), 1).otherwise(0)).cast(\"bigint\")\n"," c_10_15 = F.sum(F.when((st_mins > 10) & (st_mins <= 15), 1).otherwise(0)).cast(\"bigint\")\n"," c_15_20 = F.sum(F.when((st_mins > 15) & (st_mins <= 20), 1).otherwise(0)).cast(\"bigint\")\n"," c_20_25 = F.sum(F.when((st_mins > 20) & (st_mins <= 25), 1).otherwise(0)).cast(\"bigint\")\n"," c_25_30 = F.sum(F.when((st_mins > 25) & (st_mins <= 30), 1).otherwise(0)).cast(\"bigint\")\n"," c_30_60 = F.sum(F.when((st_mins > 30) & (st_mins <= 60), 1).otherwise(0)).cast(\"bigint\")\n"," c_o60 = F.sum(F.when((st_mins > 60), 1).otherwise(0)).cast(\"bigint\")\n","\n"," agg = (\n"," df.groupBy(\"TableName\", \"HourOfDay\", \"Date\")\n"," .agg(\n"," F.avg(st_mins).alias(\"StalenessAvg\"),\n"," F.min(st_mins).alias(\"StalenessMin\"),\n"," F.max(st_mins).alias(\"StalenessMax\"),\n"," c_00_05.alias(\"Staleness05_00to05min\"),\n"," c_05_10.alias(\"Staleness10_05to10min\"),\n"," c_10_15.alias(\"Staleness15_10to15min\"),\n"," c_15_20.alias(\"Staleness20_15to20min\"),\n"," c_20_25.alias(\"Staleness25_20to25min\"),\n"," c_25_30.alias(\"Staleness30_25to30min\"),\n"," c_30_60.alias(\"Staleness60_30to60min\"),\n"," c_o60.alias(\"StanelessO6_Over60\"),\n"," )\n"," )\n","\n"," # ------------ Upsert into target (override by composite key) ------------\n"," agg.createOrReplaceTempView(\"agg_staleness_summary_tmp\")\n","\n"," spark.sql(f\"\"\"\n"," MERGE INTO {target_table} AS tgt\n"," USING agg_staleness_summary_tmp AS src\n"," ON tgt.TableName = src.TableName\n"," AND tgt.HourOfDay = src.HourOfDay\n"," AND tgt.Date = src.Date\n"," WHEN MATCHED THEN UPDATE SET\n"," tgt.StalenessAvg = src.StalenessAvg,\n"," tgt.StalenessMin = src.StalenessMin,\n"," tgt.StalenessMax = src.StalenessMax,\n"," tgt.Staleness05_00to05min = src.Staleness05_00to05min,\n"," tgt.Staleness10_05to10min = src.Staleness10_05to10min,\n"," tgt.Staleness15_10to15min = src.Staleness15_10to15min,\n"," tgt.Staleness20_15to20min = src.Staleness20_15to20min,\n"," tgt.Staleness25_20to25min = src.Staleness25_20to25min,\n"," tgt.Staleness30_25to30min = src.Staleness30_25to30min,\n"," tgt.Staleness60_30to60min = src.Staleness60_30to60min,\n"," tgt.StanelessO6_Over60 = src.StanelessO6_Over60\n"," WHEN NOT MATCHED THEN INSERT (\n"," TableName, HourOfDay, Date,\n"," StalenessAvg, StalenessMin, StalenessMax,\n"," Staleness05_00to05min, Staleness10_05to10min, Staleness15_10to15min,\n"," Staleness20_15to20min, Staleness25_20to25min, Staleness30_25to30min, \n"," Staleness60_30to60min, StanelessO6_Over60\n"," ) VALUES (\n"," src.TableName, src.HourOfDay, src.Date,\n"," src.StalenessAvg, src.StalenessMin, src.StalenessMax,\n"," src.Staleness05_00to05min, src.Staleness10_05to10min, src.Staleness15_10to15min,\n"," src.Staleness20_15to20min, src.Staleness25_20to25min, src.Staleness30_25to30min, \n"," src.Staleness60_30to60min, src.StanelessO6_Over60\n"," )\n"," \"\"\")\n","\n"," # ------------ Remove processed source rows ------------\n"," spark.sql(f\"\"\"\n"," DELETE FROM {source_table}\n"," WHERE first_SinkCreatedOn < (current_timestamp() - INTERVAL {lookback_hours} HOURS)\n"," \"\"\")\n","\n"," print(f\"Summarization complete (UTC). Processed & removed rows older than {lookback_hours} hour(s).\")\n","\n"],"outputs":[{"output_type":"display_data","data":{"application/vnd.livy.statement-meta+json":{"spark_pool":null,"statement_id":6,"statement_ids":[6],"state":"finished","livy_statement_state":"available","session_id":"5303a7df-58c4-47b4-878a-34385b0f8404","normalized_state":"finished","queued_time":"2025-10-22T16:22:47.5483141Z","session_start_time":null,"execution_start_time":"2025-10-22T16:22:47.827003Z","execution_finish_time":"2025-10-22T16:23:08.3397606Z","parent_msg_id":"f7a83a80-57c7-43c0-be6d-f7de9ccd0ac8"},"text/plain":"StatementMeta(, 5303a7df-58c4-47b4-878a-34385b0f8404, 6, Finished, Available, Finished)"},"metadata":{}},{"output_type":"stream","name":"stdout","text":["Summarization complete (UTC). Processed & removed rows older than 3 hour(s).\n"]}],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"0c8a47d4-e58a-4b46-beff-bf10acda639a"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","display_name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"dependencies":{"lakehouse":{"known_lakehouses":[{"id":"bca1ccdf-fa53-48c9-ac82-31536fe18832"}],"default_lakehouse":"bca1ccdf-fa53-48c9-ac82-31536fe18832","default_lakehouse_name":"Contoso_D365_LH","default_lakehouse_workspace_id":"6b887591-99a7-4544-a8c4-80d4a784d7b1"}}},"nbformat":4,"nbformat_minor":5}
Binary file not shown.
Binary file not shown.
58 changes: 58 additions & 0 deletions Analytics/DataverseLink/LatencyMonitoring/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Fabric Link Staleness Report
This repository contains the components to crete a staleness report based on the data extracted using Fabric Link.

## Challenge
Many Dynamics 365 Finance and SCM customers measure latency by simply comparing the value of **SinkModifiedOn** with **ModifiedDateTime** (or CretedDateTime). Problem: Fabric Link can sink multiple times the same RecId/RecVersion leaving only the last SinkModifiedOn. This leads to perceived latency that can be way higher than the real latency.
This solution allow to crete a report on top of Fabric, that provides accurate latency.

## Solution Components
1. Table to store the actual staleness for RecId/RecVersion pairs (granular).
2. Table to store aggregated information (summarized).
3. Two scripts (Fabric Notebooks) to maintain the tables.
4. (opt) Pipeline to run the notebooks with a schedule.
5. (opt) Power BI dashboard to visualize the status.

## Content
- **FabricLinkDataStaleness.ipynb**: the main notebook that generates the granular staleness data
- **FabricLinkDataStaleness_Summarization.ipynb**: the aggregation script that generates the summarized data and cleans up the ranualr table
- **Staleness analysis pipeline.zip**: containing the pipeline to run the scripts in sequence
- **Staleness report Fabric Link.pbix**: simple example of Power BI report leveraging the generated data

## Preprequisites
The script works only with tables that have modifieddatatime or creteddatetime activated.

## Intructions
Download the content and set up the two notebooks in fabric.
Import the two notebooks in Fabric and set up the needed parameters.

#### FabricLinkDataStaleness paramenters:
- **tables_list**: the comma-separated list of tables to analyze (*mandatory*)
- **source_workspace_id**: the id of the workspace that contains the tables (*mandatory*)
- **source_lakehouse_name**: the name of the Lakehouse that contains the tables (*mandatory*)
- **hours_back**: how many hours back the script has to consider. This parameter must be an integer greaterthan zero (*mandatory*)
- **target_table_name**: name of the table that will contain the granular staleness data (*mandatory*)
- **versions_bucket_size**: the script works in chunks, this parameter decides how many versions must be considered for each chunk. It helps reducing the strain on the processing resources. It must be an integer greater than 2. (*mandatory*)
- **display_sample**: this parameter is used for debugging purposes to display samples of records in the console durin the runs
- **target_table_base_path**: use this parameter to have the target table sitting in a different lake. If *target_table_base_path* is empty, the standard tables' path will be used (*optional*)

#### FabricLinkDataStaleness_Summarization paramenters:
- **lookback_hours**: The granular rows are processed starting from *first_SinkCreatedOn < now - lookback_hours*. This paramenter must be an integer greater than 1
- **grouping_time_zone**: Default time zone UTC
- **source_table**: Must be the same as *target_table_name* parameter in the FabricLinkDataStaleness script
- **target_table**: The target summarization table *Staleness analysis pipeline.zip*

*(opt.)* Crete the pipeline using **Staleness analysis pipeline.zip**. In your workspace crete a new pipeline then import the zip file from the edit page.
Note that all the above parameters can be changed in the pipeline in order to adapt to the specific needs. Make sure the pipeline's recurrence matcher the scripts parameters *hours_back* and *lookback_hours* to avoid leaving out some changes.

Use the Power BI report example or build your own to easily analyze the data.

## Suggestions
1. Run the recurring pipeline to have a stable and automated flow of information
2. Limit the analysis to critical tables. The latency analysis can be quite heavy depending on the change rate of tables
3. Make the summarization chase the granular analysis. For example: Granular analysis can process the last 3 hours while summarization compresses everything before that.
4. If the update flow is massive, keep the granular level window small. There is hardly any value in having details for billions of records.
5. In general, adapt the parameters to your scenario.
6. Test with few tables and then increase as needed.
7. Use *versions_bucket_size* parameter to reduce the CPU strain.


9 changes: 4 additions & 5 deletions Integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ This implemetation asset provides sample integration code for Dynamics 365.
| File/folder | Description |
|-------------|-------------|
| `README.md` | This README file. |
| `ODataCoreConsoleApp.sln` | Visual Studio 2019 solution file for the sample OData console application. |
| `ODataCoreConsoleApp` | Sample F&O OData client console application written to illustrate the use of .Net Core and the OData Connected Service in a client application. |
| `ODataCoreConsoleApp.sln` | Visual Studio 2022 solution file for the sample OData console application. |
| `ODataCoreConsoleApp` | [The OData Core Console App](https://github.com/microsoft/Dynamics-365-FastTrack-Implementation-Assets/tree/master/Integration/ODataCoreConsoleApp) is a sample F&O OData client console application written to illustrate the use of .Net Core and the OData Connected Service in a client application.|
| `MessageProcessorConsoleApp.sln` | Visual Studio 2022 solution file for the sample Message Processor console application. |
| `MessageProcessorConsoleApp` | [The Message Processor Console App](https://github.com/microsoft/Dynamics-365-FastTrack-Implementation-Assets/tree/master/Integration/MessageProcessorConsoleApp) is a message Processor console application written to illustrate the use of .Net to interact with Dynamics 365 F&O Message processor framework.|
| `Field Service integration with F&O` | [Field Service integration with F&O extensibility examples](https://github.com/microsoft/Dynamics-365-FastTrack-Implementation-Assets/blob/master/Integration/Field%20service%20integration%20with%20F%26O/readme.md) |

### Sample OData Console Application
- [OData Console sample](https://github.com/microsoft/Dynamics-365-FastTrack-Implementation-Assets/tree/master/Integration/ODataCoreConsoleApp).