From 484683d3d8569247a3dbfc04b44ec9b623038597 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 19 Sep 2022 17:23:20 +0200 Subject: [PATCH 1/6] Add provenance to CASA table reads and writes --- daskms/reads.py | 27 ++++++++++++++++++++++--- daskms/table_proxy.py | 1 + daskms/tests/test_metadata.py | 18 +++++++++++++++++ daskms/tests/test_ms_read_and_update.py | 11 +++++++--- daskms/writes.py | 19 +++++++++++++---- 5 files changed, 66 insertions(+), 10 deletions(-) create mode 100644 daskms/tests/test_metadata.py diff --git a/daskms/reads.py b/daskms/reads.py index df227026..03524643 100644 --- a/daskms/reads.py +++ b/daskms/reads.py @@ -14,7 +14,7 @@ dim_extents_array, infer_dtype, ) -from daskms.constants import DASKMS_PARTITION_KEY +from daskms.constants import DASKMS_PARTITION_KEY, DASKMS_METADATA from daskms.ordering import ( ordering_taql, row_ordering, @@ -318,6 +318,20 @@ def _table_proxy_factory(self): __executor_key__=executor_key(self.canonical_name), ) + def _metadata(self, table_proxy): + """ Create daskms metadata """ + metadata = table_proxy.getdesc().result().get(DASKMS_METADATA, {}) + provenance = metadata.setdefault("provenance", []) + + try: + provenance.remove(self.table_path) + except ValueError: + pass + + provenance.append(self.table_path) + + return metadata + def _table_schema(self): return lookup_table_schema(self.canonical_name, self.table_schema) @@ -344,7 +358,11 @@ def _single_dataset(self, table_proxy, orders, exemplar_row=0): else: coords = {"ROWID": rowid} - attrs = {DASKMS_PARTITION_KEY: ()} + attrs = { + DASKMS_METADATA: self._metadata(table_proxy), + DASKMS_PARTITION_KEY: {} + } + return Dataset(variables, coords=coords, attrs=attrs) @@ -400,7 +418,10 @@ def _group_datasets(self, table_proxy, groups, exemplar_rows, orders): partitions = tuple( (c, g.dtype.name) for c, g in zip(self.group_cols, group_id) ) - attrs = {DASKMS_PARTITION_KEY: partitions} + attrs = { + DASKMS_METADATA: self._metadata(table_proxy), + DASKMS_PARTITION_KEY: partitions + } # Use python types which are json serializable group_id = [gid.item() for gid in group_id] diff --git a/daskms/table_proxy.py b/daskms/table_proxy.py index 5786c896..4212f8d5 100644 --- a/daskms/table_proxy.py +++ b/daskms/table_proxy.py @@ -29,6 +29,7 @@ ("nrows", READLOCK), ("colnames", READLOCK), ("getcoldesc", READLOCK), + ("getdesc", READLOCK), ("getdminfo", READLOCK), ("iswritable", READLOCK), # Modification diff --git a/daskms/tests/test_metadata.py b/daskms/tests/test_metadata.py new file mode 100644 index 00000000..0e656da8 --- /dev/null +++ b/daskms/tests/test_metadata.py @@ -0,0 +1,18 @@ +import dask +import pyrap.tables as pt + +from daskms.constants import DASKMS_METADATA +from daskms import xds_from_storage_ms, xds_to_table + +def test_provenance(ms, tmp_path_factory): + datasets = xds_from_storage_ms(ms) + + for ds in datasets: + assert ds.attrs[DASKMS_METADATA]["provenance"] == [ms] + + data_dir = tmp_path_factory.mktemp("provenance") + store = str(data_dir / "blah.ms") + dask.compute(xds_to_table(datasets, store)) + + with pt.table(str(store), ack=False) as T: + assert T.getkeywords()[DASKMS_METADATA] == {"provenance": [ms, store]} diff --git a/daskms/tests/test_ms_read_and_update.py b/daskms/tests/test_ms_read_and_update.py index a4db05bc..80d2842d 100644 --- a/daskms/tests/test_ms_read_and_update.py +++ b/daskms/tests/test_ms_read_and_update.py @@ -14,7 +14,7 @@ except ImportError: from dask.utils import key_split -from daskms.constants import DASKMS_PARTITION_KEY +from daskms.constants import DASKMS_PARTITION_KEY, DASKMS_METADATA from daskms.dask_ms import xds_from_ms, xds_from_table, xds_to_table from daskms.query import orderby_clause, where_clause from daskms.table_proxy import TableProxy, taql_factory @@ -146,12 +146,14 @@ def test_ms_update(ms, group_cols, index_cols, select_cols): for k, _ in nds.attrs[DASKMS_PARTITION_KEY]: assert getattr(write, k) == getattr(nds, k) + assert ds.attrs[DASKMS_METADATA]["provenance"] == [ms] + writes.append(write) # Do all writes in parallel dask.compute(writes) - xds = xds_from_ms( + rxds = xds_from_ms( ms, columns=select_cols, group_cols=group_cols, @@ -160,11 +162,14 @@ def test_ms_update(ms, group_cols, index_cols, select_cols): ) # Check that state and data have been correctly written - it = enumerate(zip(xds, written_states, written_data)) + it = enumerate(zip(rxds, written_states, written_data)) for i, (ds, state, data) in it: assert_array_equal(ds.STATE_ID.data, state) assert_array_equal(ds.DATA.data, data) + assert ds.attrs[DASKMS_PARTITION_KEY] == xds[i].attrs[DASKMS_PARTITION_KEY] + assert ds.attrs[DASKMS_METADATA]["provenance"] == [ms] + assert len(ds.attrs[DASKMS_PARTITION_KEY]) == len(group_cols) @pytest.mark.parametrize( "index_cols", diff --git a/daskms/writes.py b/daskms/writes.py index e8972b62..651a4256 100644 --- a/daskms/writes.py +++ b/daskms/writes.py @@ -10,7 +10,7 @@ import pyrap.tables as pt from daskms.columns import dim_extents_array -from daskms.constants import DASKMS_PARTITION_KEY +from daskms.constants import DASKMS_METADATA, DASKMS_PARTITION_KEY from daskms.dataset import Dataset from daskms.dataset_schema import DatasetSchema from daskms.descriptors.builder import AbstractDescriptorBuilder @@ -20,7 +20,7 @@ from daskms.table import table_exists from daskms.table_executor import executor_key from daskms.table_proxy import TableProxy, WRITELOCK -from daskms.utils import table_path_split +from daskms.utils import table_path_split, freeze log = logging.getLogger(__name__) @@ -566,6 +566,17 @@ def _write_datasets( table_name = "::".join((table_name, subtable)) if subtable else table_name row_orders = [] + fmeta = set(freeze(ds.attrs.get(DASKMS_METADATA, {})) for ds in datasets) + + if not len(fmeta) == 1: + raise ValueError(f"{DASKMS_METADATA} is not consistent across datasets") + + table_keywords = table_keywords or {} + table_metadata = table_keywords.get(DASKMS_METADATA, {}) + table_keywords[DASKMS_METADATA] = {**datasets[0].attrs[DASKMS_METADATA], **table_metadata} + provenance = table_keywords[DASKMS_METADATA].setdefault("provenance", []) + provenance.append(table) + # Put table and column keywords table_proxy.submit( _put_keywords, WRITELOCK, table_keywords, column_keywords @@ -709,14 +720,14 @@ def _write_datasets( def _put_keywords(table, table_keywords, column_keywords): - if table_keywords is not None: + if table_keywords: for k, v in table_keywords.items(): if v == DELKW: table.removekeyword(k) else: table.putkeyword(k, v) - if column_keywords is not None: + if column_keywords: for column, keywords in column_keywords.items(): for k, v in keywords.items(): if v == DELKW: From cd123637cad36311f548f82814b1d74fe7e2ded8 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 19 Sep 2022 17:28:02 +0200 Subject: [PATCH 2/6] flake8 --- daskms/reads.py | 1 - daskms/tests/test_metadata.py | 1 + daskms/tests/test_ms_read_and_update.py | 3 ++- daskms/writes.py | 6 ++++-- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/daskms/reads.py b/daskms/reads.py index 03524643..af3c26cd 100644 --- a/daskms/reads.py +++ b/daskms/reads.py @@ -363,7 +363,6 @@ def _single_dataset(self, table_proxy, orders, exemplar_row=0): DASKMS_PARTITION_KEY: {} } - return Dataset(variables, coords=coords, attrs=attrs) def _group_datasets(self, table_proxy, groups, exemplar_rows, orders): diff --git a/daskms/tests/test_metadata.py b/daskms/tests/test_metadata.py index 0e656da8..ac8b0339 100644 --- a/daskms/tests/test_metadata.py +++ b/daskms/tests/test_metadata.py @@ -4,6 +4,7 @@ from daskms.constants import DASKMS_METADATA from daskms import xds_from_storage_ms, xds_to_table + def test_provenance(ms, tmp_path_factory): datasets = xds_from_storage_ms(ms) diff --git a/daskms/tests/test_ms_read_and_update.py b/daskms/tests/test_ms_read_and_update.py index 80d2842d..deda74d7 100644 --- a/daskms/tests/test_ms_read_and_update.py +++ b/daskms/tests/test_ms_read_and_update.py @@ -167,7 +167,8 @@ def test_ms_update(ms, group_cols, index_cols, select_cols): assert_array_equal(ds.STATE_ID.data, state) assert_array_equal(ds.DATA.data, data) - assert ds.attrs[DASKMS_PARTITION_KEY] == xds[i].attrs[DASKMS_PARTITION_KEY] + orig_part_key = xds[i].attrs[DASKMS_PARTITION_KEY] + assert ds.attrs[DASKMS_PARTITION_KEY] == orig_part_key assert ds.attrs[DASKMS_METADATA]["provenance"] == [ms] assert len(ds.attrs[DASKMS_PARTITION_KEY]) == len(group_cols) diff --git a/daskms/writes.py b/daskms/writes.py index 651a4256..5caee9be 100644 --- a/daskms/writes.py +++ b/daskms/writes.py @@ -569,11 +569,13 @@ def _write_datasets( fmeta = set(freeze(ds.attrs.get(DASKMS_METADATA, {})) for ds in datasets) if not len(fmeta) == 1: - raise ValueError(f"{DASKMS_METADATA} is not consistent across datasets") + raise ValueError( + f"{DASKMS_METADATA} is not consistent across datasets") table_keywords = table_keywords or {} table_metadata = table_keywords.get(DASKMS_METADATA, {}) - table_keywords[DASKMS_METADATA] = {**datasets[0].attrs[DASKMS_METADATA], **table_metadata} + table_keywords[DASKMS_METADATA] = { + **datasets[0].attrs[DASKMS_METADATA], **table_metadata} provenance = table_keywords[DASKMS_METADATA].setdefault("provenance", []) provenance.append(table) From db940491f3459a13ae37cfd9d72427e5c685ca7c Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 22 Sep 2022 15:48:30 +0200 Subject: [PATCH 3/6] lint --- daskms/reads.py | 9 +++------ daskms/tests/test_ms_read_and_update.py | 1 + daskms/writes.py | 7 ++++--- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/daskms/reads.py b/daskms/reads.py index af3c26cd..c55505dd 100644 --- a/daskms/reads.py +++ b/daskms/reads.py @@ -319,7 +319,7 @@ def _table_proxy_factory(self): ) def _metadata(self, table_proxy): - """ Create daskms metadata """ + """Create daskms metadata""" metadata = table_proxy.getdesc().result().get(DASKMS_METADATA, {}) provenance = metadata.setdefault("provenance", []) @@ -358,10 +358,7 @@ def _single_dataset(self, table_proxy, orders, exemplar_row=0): else: coords = {"ROWID": rowid} - attrs = { - DASKMS_METADATA: self._metadata(table_proxy), - DASKMS_PARTITION_KEY: {} - } + attrs = {DASKMS_METADATA: self._metadata(table_proxy), DASKMS_PARTITION_KEY: {}} return Dataset(variables, coords=coords, attrs=attrs) @@ -419,7 +416,7 @@ def _group_datasets(self, table_proxy, groups, exemplar_rows, orders): ) attrs = { DASKMS_METADATA: self._metadata(table_proxy), - DASKMS_PARTITION_KEY: partitions + DASKMS_PARTITION_KEY: partitions, } # Use python types which are json serializable diff --git a/daskms/tests/test_ms_read_and_update.py b/daskms/tests/test_ms_read_and_update.py index deda74d7..0513c735 100644 --- a/daskms/tests/test_ms_read_and_update.py +++ b/daskms/tests/test_ms_read_and_update.py @@ -172,6 +172,7 @@ def test_ms_update(ms, group_cols, index_cols, select_cols): assert ds.attrs[DASKMS_METADATA]["provenance"] == [ms] assert len(ds.attrs[DASKMS_PARTITION_KEY]) == len(group_cols) + @pytest.mark.parametrize( "index_cols", [ diff --git a/daskms/writes.py b/daskms/writes.py index 5caee9be..1779ecb4 100644 --- a/daskms/writes.py +++ b/daskms/writes.py @@ -569,13 +569,14 @@ def _write_datasets( fmeta = set(freeze(ds.attrs.get(DASKMS_METADATA, {})) for ds in datasets) if not len(fmeta) == 1: - raise ValueError( - f"{DASKMS_METADATA} is not consistent across datasets") + raise ValueError(f"{DASKMS_METADATA} is not consistent across datasets") table_keywords = table_keywords or {} table_metadata = table_keywords.get(DASKMS_METADATA, {}) table_keywords[DASKMS_METADATA] = { - **datasets[0].attrs[DASKMS_METADATA], **table_metadata} + **datasets[0].attrs[DASKMS_METADATA], + **table_metadata, + } provenance = table_keywords[DASKMS_METADATA].setdefault("provenance", []) provenance.append(table) From 2128d1dea3affbd9b02ca9be13a732fa974cccb7 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 22 Sep 2022 16:00:08 +0200 Subject: [PATCH 4/6] Remove duplicate provenance during writes --- daskms/writes.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/daskms/writes.py b/daskms/writes.py index 1779ecb4..dc2ccfd6 100644 --- a/daskms/writes.py +++ b/daskms/writes.py @@ -578,6 +578,12 @@ def _write_datasets( **table_metadata, } provenance = table_keywords[DASKMS_METADATA].setdefault("provenance", []) + + try: + provenance.remove(table) + except ValueError: + pass + provenance.append(table) # Put table and column keywords From e541ad7a0b27486bd558581739b390d26e0dc76a Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 4 Oct 2022 09:52:08 +0200 Subject: [PATCH 5/6] WIP --- daskms/dataset.py | 11 +++++++++-- daskms/writes.py | 13 +++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/daskms/dataset.py b/daskms/dataset.py index 494e133f..8f71b3ae 100644 --- a/daskms/dataset.py +++ b/daskms/dataset.py @@ -8,6 +8,7 @@ from collections import OrderedDict import dask +from daskms.constants import DASKMS_METADATA import dask.array as da from dask.highlevelgraph import HighLevelGraph import numpy as np @@ -103,7 +104,7 @@ def _convert_to_variable(k, v): if xr is not None: - from xarray import Dataset, Variable + from xarray import Dataset as BaseDataset, Variable else: # This class duplicates xarray's Frozen class in # https://github.com/pydata/xarray/blob/master/xarray/core/utils.py @@ -235,7 +236,7 @@ def __dask_postpersist__(self): args = (fn, args, self.data.name, self.dims, self.attrs) return (self.finalize_persist, args) - class Dataset: + class BaseDataset: """ Replicates a minimal subset of `xarray Dataset `_'s @@ -531,3 +532,9 @@ def __dask_postpersist__(self): for k, v in self._data_vars.items() ] return self.finalize_persist, (data_info, self._coords, self._attrs) + + +class Dataset(BaseDataset): + def __init__(self, data_vars, coords=None, attrs=None): + attrs = {DASKMS_METADATA: {}, **(attrs or {})} + super().__init__(data_vars, coords=coords, attrs=attrs) diff --git a/daskms/writes.py b/daskms/writes.py index dc2ccfd6..11936cd1 100644 --- a/daskms/writes.py +++ b/daskms/writes.py @@ -566,17 +566,18 @@ def _write_datasets( table_name = "::".join((table_name, subtable)) if subtable else table_name row_orders = [] - fmeta = set(freeze(ds.attrs.get(DASKMS_METADATA, {})) for ds in datasets) + frozen_meta = set(freeze(ds.attrs.get(DASKMS_METADATA, {})) for ds in datasets) - if not len(fmeta) == 1: + if len(frozen_meta) == 0: + metadata = {} + elif len(frozen_meta) == 1: + metadata = datasets[0].attrs.get(DASKMS_METADATA, {}) + else: raise ValueError(f"{DASKMS_METADATA} is not consistent across datasets") table_keywords = table_keywords or {} table_metadata = table_keywords.get(DASKMS_METADATA, {}) - table_keywords[DASKMS_METADATA] = { - **datasets[0].attrs[DASKMS_METADATA], - **table_metadata, - } + table_keywords[DASKMS_METADATA] = {**metadata, **table_metadata} provenance = table_keywords[DASKMS_METADATA].setdefault("provenance", []) try: From 166e3eaa521b72fadb68cb85ca8f2db85631bba4 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 28 Oct 2022 11:57:16 +0200 Subject: [PATCH 6/6] Remove extraneous makesubrecord=True --- daskms/writes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daskms/writes.py b/daskms/writes.py index b2fba521..085d1b95 100644 --- a/daskms/writes.py +++ b/daskms/writes.py @@ -751,7 +751,7 @@ def _put_keywords(table, table_keywords, column_keywords): if v == DELKW: table.removecolkeyword(column, k) else: - table.putcolkeyword(column, k, v, makesubrecord=True) + table.putcolkeyword(column, k, v) return True