ntv-numpy.ntv_numpy.xconnector

@author: Philippe@loco-labs.io

The xconnector module is part of the ntv-numpy.ntv_numpy package (specification document).

It contains the AstropyNDDataConnec class for the astropy NDData interface, the XarrayConnec class for the Xarray interface and the ScippConnec class for the Scipp interface.

For more information, see the user guide or the github repository.

  1# -*- coding: utf-8 -*-
  2"""
  3@author: Philippe@loco-labs.io
  4
  5The `xconnector` module is part of the `ntv-numpy.ntv_numpy` package ([specification document](
  6https://loco-philippe.github.io/ES/JSON%20semantic%20format%20(JSON-NTV).htm)).
  7
  8It contains the `XarrayConnec` class for the Xarray interface and the `ScippConnec`
  9 class for Scipp interface.
 10
 11For more information, see the
 12[user guide](https://loco-philippe.github.io/ntv-numpy/docs/user_guide.html)
 13 or the [github repository](https://github.com/loco-philippe/ntv-numpy).
 14"""
 15
 16
 17import xarray as xr
 18import scipp as sc
 19from astropy import wcs
 20from astropy.nddata import NDData
 21from astropy.nddata.nduncertainty import StdDevUncertainty, VarianceUncertainty
 22from astropy.nddata.nduncertainty import InverseVariance
 23from ntv_numpy.ndarray import Nutil, Ndarray
 24from ntv_numpy.xndarray import Xndarray
 25
 26
 27class AstropyNDDataConnec:
 28    ''' NDData interface with two static methods ximport and xexport'''
 29
 30    @staticmethod
 31    def xexport(xdt, **kwargs):
 32        '''return a NDData from a Xdataset'''
 33        data = xdt['data'].ndarray
 34        mask = xdt['data.mask'].ndarray
 35        unit = xdt['data'].nda.ntvtype.extension
 36        uncert = xdt['data.uncertainty'].ndarray
 37        typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
 38        match typ_u:
 39            case 'std':
 40                uncertainty = StdDevUncertainty(uncert)
 41            case 'var':
 42                uncertainty = VarianceUncertainty(uncert)
 43            case 'inv':
 44                uncertainty = InverseVariance(uncert)
 45            case _:
 46                uncertainty = uncert
 47        meta = xdt['meta'].meta | {'name': xdt.name}
 48        wcs_dic = xdt['wcs'].meta
 49        psf = xdt['psf'].ndarray
 50        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
 51                      meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf)
 52
 53    @staticmethod
 54    def ximport(ndd, Xclass, **kwargs):
 55        '''return a Xdataset from a astropy.NDData'''
 56        xnd = []
 57        name = 'no_name'
 58        unit = ndd.unit.to_string() if not ndd.unit is None else None
 59        ntv_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
 60        xnd += [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))]
 61        if ndd.meta:
 62            meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
 63            name = ndd.meta.get('name', 'no_name')
 64            xnd += [Xndarray('meta', meta=meta)]
 65        if ndd.wcs:
 66            xnd += [Xndarray('wcs', meta=dict(ndd.wcs.to_header()))]
 67        if not ndd.psf is None:
 68            xnd += [Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type))]
 69        if not ndd.mask is None:
 70            xnd += [Xndarray('data.mask', nda=ndd.mask)]
 71        if not ndd.uncertainty is None:
 72            typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
 73            ntv_type = Nutil.ntv_type(
 74                ndd.uncertainty.array.dtype.name, ext=typ_u)
 75            nda = Ndarray(ndd.uncertainty.array, ntv_type=ntv_type)
 76            xnd += [Xndarray('data.uncertainty', nda=nda)]
 77        return Xclass(xnd, name).to_canonical()
 78
 79
 80class XarrayConnec:
 81    ''' Xarray interface with two static methods ximport and xexport'''
 82
 83    @staticmethod
 84    def xexport(xdt, **kwargs):
 85        '''return a xr.DataArray or a xr.Dataset from a Xdataset
 86
 87        *Parameters*
 88
 89        - **dataset** : Boolean (default True) - if False and a single data_var,
 90        return a sc.DataArray
 91        - **datagroup** : Boolean (default True) - if True, return a sc.DataGroup
 92        which contains the sc.DataArray/sc.Dataset and the other data else only
 93        sc.DataArray/sc.Dataset
 94        '''
 95        option = {'dataset': True, 'datagroup': True} | kwargs
 96        coords = XarrayConnec._to_xr_vars(
 97            xdt, xdt.dimensions + xdt.coordinates)
 98        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
 99        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
100        if len(xdt.data_vars) == 1 and not option['dataset']:
101            var_name = xdt.data_vars[0]
102            data = xdt.to_ndarray(var_name)
103            dims = xdt.dims(var_name)
104            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
105            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
106            name = var_name if var_name != 'data' else None
107            return xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
108                                name=name)
109        data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
110        xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs)
111        return xrd
112
113    @staticmethod
114    def ximport(xar, Xclass, **kwargs):
115        '''return a Xdataset from a xr.DataArray or a xr.Dataset'''
116        xnd = []
117        if xar.attrs:
118            attrs = {k: v for k, v in xar.attrs.items() if not k in [
119                'name', 'ntv_type']}
120            for name, meta in attrs.items():
121                if isinstance(meta, list):
122                    xnd += [Xndarray.read_json({name: meta})]
123                else:
124                    xnd += [Xndarray(name, meta=meta)]
125        for coord in xar.coords:
126            xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
127            if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
128                xnd[-1].links = [list(xar.data_vars)[0]]
129        if isinstance(xar, xr.DataArray):
130            var = XarrayConnec._var_xr_to_xnd(
131                xar, name='data', add_attrs=False)
132            xnd += [XarrayConnec._var_xr_to_xnd(xar,
133                                                name='data', add_attrs=False)]
134            xdt = Xclass(xnd, xar.attrs.get('name'))
135            for var in xdt.data_vars:
136                if var != xar.name and xar.name:
137                    xdt[var].links = [xar.name]
138            return xdt.to_canonical()
139        for var in xar.data_vars:
140            xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
141        return Xclass(xnd, xar.attrs.get('name')).to_canonical()
142
143    @staticmethod
144    def _var_xr_to_xnd(xar, name=None, add_attrs=True):
145        '''return a Xndarray from a Xarray variable
146
147        *Parameters*
148
149        - **xar** : Xarray variable to convert in Xndarray,
150        - **name** : string (default None) - default name if xar have non name,
151        - **add_attrs** : boolean (default True) - if False, attrs are not converted
152        '''
153        full_name = xar.name if xar.name else name
154        name = Nutil.split_name(full_name)[0]
155        dims = None if xar.dims == (name,) else list(xar.dims)
156        ntv_type = xar.attrs.get('ntv_type')
157        nda = xar.values
158        if nda.dtype.name == 'datetime64[ns]' and ntv_type:
159            nda = Nutil.convert(ntv_type, nda, tojson=False)
160        attrs = {k: v for k, v in xar.attrs.items()
161                 if not k in ['ntv_type', 'name']} if add_attrs else {}
162        return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs)
163
164    @staticmethod
165    def _to_xr_attrs(xdt, **option):
166        '''return a dict with attributes from a Xdataset
167
168        *Parameters*
169
170        - **datagroup** : Boolean  if True, add json representation of 'relative'
171        Xndarrays and 'data_arrays' Xndarrays
172        '''
173        attrs = {meta: xdt[meta].meta for meta in xdt.metadata}
174        attrs |= {'name': xdt.name} if xdt.name else {}
175        if option['datagroup']:
176            for name in xdt.names:
177                if xdt[name].mode == 'relative':
178                    attrs |= xdt[name].to_json(header=False)
179            for name in xdt.data_arrays:
180                attrs |= xdt[name].to_json(header=False)
181        return attrs
182
183    @staticmethod
184    def _to_xr_coord(xdt, name):
185        '''return a dict with Xarray attributes from a Xndarray defined by his name'''
186        data = xdt.to_ndarray(name)
187        if name in xdt.additionals and not xdt[name].links:
188            data = data.reshape(xdt.shape_dims(xdt[name].name))
189        dims = tuple(xdt.dims(name)) if xdt.dims(name) else (xdt[name].name)
190        meta = {'ntv_type': xdt[name].ntv_type} | (
191            xdt[name].meta if xdt[name].meta else {})
192        return {name: (dims, data, meta)}
193
194    @staticmethod
195    def _to_xr_vars(xdt, list_names):
196        '''return a dict with Xarray attributes from a list of Xndarray names'''
197        arg_vars = {}
198        valid_names = [
199            name for name in list_names if xdt[name].mode == 'absolute']
200        for xnd_name in valid_names:
201            arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name)
202        return arg_vars
203
204    @staticmethod
205    def _xr_add_type(xar):
206        '''add 'ntv_type' as attribute for a xr.DataArray'''
207        if isinstance(xar, xr.DataArray) and not 'ntv_type' in xar.attrs:
208            xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)}
209            return
210        for coord in xar.coords:
211            XarrayConnec._xr_add_type(coord)
212        for var in xar.data_vars:
213            XarrayConnec._xr_add_type(var)
214        return
215
216
217class ScippConnec:
218    ''' Scipp interface with two static methods ximport and xexport'''
219
220    SCTYPE_DTYPE = {'string': 'str'}
221
222    @staticmethod
223    def xexport(xdt, **kwargs):
224        '''return a sc.DataArray or a sc.Dataset from a xdataset
225
226        *Parameters*
227
228        - **dataset** : Boolean (default True) - if False and a single data_var,
229        return a DataArray
230        - **datagroup** : Boolean (default True) - if True return a DataGroup with
231        metadata and data_arrays
232        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
233        '''
234        option = {'dataset': True, 'datagroup': True,
235                  'ntv_type': True} | kwargs
236        coords = dict([ScippConnec._to_scipp_var(xdt, name, **option)
237                       for name in xdt.coordinates + xdt.dimensions
238                       if xdt[name].mode == 'absolute'])
239        scd = sc.Dataset(dict([ScippConnec._to_sc_dataarray(xdt, name, coords, **option)
240                               for name in xdt.data_vars
241                               if xdt[name].mode == 'absolute']))
242        scd = scd if option['dataset'] else scd[list(scd)[0]]
243        if not option['datagroup']:
244            return scd
245        sc_name = xdt.name if xdt.name else 'no_name'
246        return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))
247
248    @staticmethod
249    def ximport(sc_obj, Xclass, **kwargs):
250        '''return a xdataset from a scipp object DataArray, Dataset or DataGroup'''
251        xnd = []
252        scd = sc_obj
253        xnd_name = None
254        if isinstance(sc_obj, sc.DataGroup):
255            for obj in sc_obj:
256                if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)):
257                    scd = sc_obj[obj]
258                    xnd_name = obj
259                    break
260        if isinstance(scd, sc.DataArray):
261            scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd})
262        if isinstance(scd, sc.Dataset):
263            for coord in scd.coords:
264                xnd += ScippConnec._var_sc_to_xnd(
265                    scd.coords[coord], scd, coord)
266            for var in scd:
267                for mask in scd[var].masks:
268                    m_var = Nutil.split_json_name(var)[0]
269                    xnd += ScippConnec._var_sc_to_xnd(
270                        scd[var].masks[mask], scd, mask, m_var)
271                xnd += ScippConnec._var_sc_to_xnd(scd[var].data, scd, var)
272        if isinstance(sc_obj, sc.DataGroup):
273            xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
274        return Xclass(xnd, xnd_name).to_canonical()
275
276    @staticmethod
277    def _grp_sc_to_xnd(sc_obj, xnd):
278        '''return a list of Xndarray from a scipp variable'''
279        dic_xnd = {xar.name: xar for xar in xnd}
280        for obj in sc_obj:
281            name, add_name = Nutil.split_name(obj)
282            match [name, add_name, sc_obj[obj]]:
283                case [name, None, list()]:
284                    xnd += [Xndarray.read_json({name: sc_obj[obj]})]
285                case [name, add_name, sc.Variable()]:
286                    xnd += ScippConnec._var_sc_to_xnd(
287                        sc_obj[obj], None, add_name, name)
288                case [name, _, dict() | str() | list()] if name in dic_xnd:
289                    if dic_xnd[name].meta:
290                        dic_xnd[name].meta |= sc_obj[obj]
291                    else:
292                        dic_xnd[name].meta = sc_obj[obj]
293                case [name, _, dict() | str() | list()]:
294                    xnd += [Xndarray.read_json({name: sc_obj[obj]})]
295                case [_, _, _]: ...
296        return xnd
297
298    @staticmethod
299    def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None):
300        '''return a list of Xndarray from a scipp variable
301        - var : name
302        - sc_name : scipp name'''
303        l_xnda = []
304        unit = scv.unit.name if scv.unit and not scv.unit in [
305            'dimensionless', 'ns'] else ''
306        ext_name, typ1 = Nutil.split_json_name(sc_name, True)
307        var_name, typ2 = Nutil.split_json_name(var, True)
308        full_name = var_name + \
309            ('.' if var_name and ext_name else '') + ext_name
310        ntv_type_base = typ1 + typ2
311        ntv_type = ntv_type_base + ('[' + unit + ']' if unit else '')
312
313        links = [Nutil.split_json_name(jsn)[0] for jsn in scv.dims]
314        if not scd is None and sc_name in scd.coords and scv.dims == scd.dims:
315            links = [Nutil.split_json_name(list(scd)[0])[0]]
316        if not scv.variances is None:
317            nda = Ndarray(scv.variances, ntv_type_base)
318            l_xnda.append(Xndarray(full_name + '.variance', nda, links))
319        nda = Ndarray(scv.values, ntv_type)
320        l_xnda.append(Xndarray(full_name, nda, links))
321        return l_xnda
322
323    @staticmethod
324    def _to_sc_dataarray(xdt, name, coords, **option):
325        '''return a scipp.DataArray from a xdataset.global_var defined by his name'''
326        scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option)
327        masks = dict([ScippConnec._to_scipp_var(xdt, nam, **option)
328                     for nam in set(xdt.var_group(name)) & set(xdt.masks)])
329        return (scipp_name, sc.DataArray(data, coords=coords, masks=masks))
330
331    @staticmethod
332    def _to_scipp_grp(xdt, **option):
333        '''return a dict with metadata, data-array and data_add from a xdataset'''
334        grp = {}
335        grp |= dict([ScippConnec._to_scipp_var(xdt, name, **option)
336                     for name in xdt.data_add + xdt.data_arrays
337                     if xdt[name].add_name != 'variance'])
338        opt_mask = option | {'grp_mask': True}
339        grp |= dict([ScippConnec._to_scipp_var(xdt, name, **opt_mask)
340                     for name in xdt.masks
341                     if xdt[name].name in xdt.names and not xdt[name].name in xdt.data_vars])
342        grp |= {name + '.meta': xdt[name].meta for name in xdt.names
343                if xdt[name].meta}
344        for name in xdt.names:
345            if xdt[name].mode == 'relative':
346                grp |= xdt[name].to_json(header=False)
347        return grp
348
349    @staticmethod
350    def _to_scipp_var(xdt, name, **kwargs):
351        '''return a scipp.Variable from a Xndarray defined by his name'''
352        option = {'grp_mask': False, 'ntv_type': True} | kwargs
353        add_name = Nutil.split_name(name)[1]
354        new_n = add_name if name in xdt.masks and not option['grp_mask'] else name
355        opt_n = option['ntv_type']
356        values = xdt.to_ndarray(name)
357        vari_name = name + '.variance'
358        variances = xdt[vari_name].darray if vari_name in xdt.names else None
359        if not variances is None:
360            variances = variances.reshape(xdt.shape_dims(vari_name))
361        dims = xdt.dims(name, opt_n) if xdt.dims(
362            name, opt_n) else [xdt[name].name]
363        simple_type, unit = Nutil.split_type(xdt[name].ntv_type)
364        scipp_name = new_n + (':' + simple_type if opt_n else '')
365        unit = unit if unit else ''
366        return (scipp_name, sc.array(dims=dims, values=values,
367                                     variances=variances, unit=unit))
class AstropyNDDataConnec:
28class AstropyNDDataConnec:
29    ''' NDData interface with two static methods ximport and xexport'''
30
31    @staticmethod
32    def xexport(xdt, **kwargs):
33        '''return a NDData from a Xdataset'''
34        data = xdt['data'].ndarray
35        mask = xdt['data.mask'].ndarray
36        unit = xdt['data'].nda.ntvtype.extension
37        uncert = xdt['data.uncertainty'].ndarray
38        typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
39        match typ_u:
40            case 'std':
41                uncertainty = StdDevUncertainty(uncert)
42            case 'var':
43                uncertainty = VarianceUncertainty(uncert)
44            case 'inv':
45                uncertainty = InverseVariance(uncert)
46            case _:
47                uncertainty = uncert
48        meta = xdt['meta'].meta | {'name': xdt.name}
49        wcs_dic = xdt['wcs'].meta
50        psf = xdt['psf'].ndarray
51        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
52                      meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf)
53
54    @staticmethod
55    def ximport(ndd, Xclass, **kwargs):
56        '''return a Xdataset from a astropy.NDData'''
57        xnd = []
58        name = 'no_name'
59        unit = ndd.unit.to_string() if not ndd.unit is None else None
60        ntv_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
61        xnd += [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))]
62        if ndd.meta:
63            meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
64            name = ndd.meta.get('name', 'no_name')
65            xnd += [Xndarray('meta', meta=meta)]
66        if ndd.wcs:
67            xnd += [Xndarray('wcs', meta=dict(ndd.wcs.to_header()))]
68        if not ndd.psf is None:
69            xnd += [Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type))]
70        if not ndd.mask is None:
71            xnd += [Xndarray('data.mask', nda=ndd.mask)]
72        if not ndd.uncertainty is None:
73            typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
74            ntv_type = Nutil.ntv_type(
75                ndd.uncertainty.array.dtype.name, ext=typ_u)
76            nda = Ndarray(ndd.uncertainty.array, ntv_type=ntv_type)
77            xnd += [Xndarray('data.uncertainty', nda=nda)]
78        return Xclass(xnd, name).to_canonical()

NDData interface with two static methods ximport and xexport

@staticmethod
def xexport(xdt, **kwargs):
31    @staticmethod
32    def xexport(xdt, **kwargs):
33        '''return a NDData from a Xdataset'''
34        data = xdt['data'].ndarray
35        mask = xdt['data.mask'].ndarray
36        unit = xdt['data'].nda.ntvtype.extension
37        uncert = xdt['data.uncertainty'].ndarray
38        typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
39        match typ_u:
40            case 'std':
41                uncertainty = StdDevUncertainty(uncert)
42            case 'var':
43                uncertainty = VarianceUncertainty(uncert)
44            case 'inv':
45                uncertainty = InverseVariance(uncert)
46            case _:
47                uncertainty = uncert
48        meta = xdt['meta'].meta | {'name': xdt.name}
49        wcs_dic = xdt['wcs'].meta
50        psf = xdt['psf'].ndarray
51        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
52                      meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf)

return a NDData from a Xdataset

@staticmethod
def ximport(ndd, Xclass, **kwargs):
54    @staticmethod
55    def ximport(ndd, Xclass, **kwargs):
56        '''return a Xdataset from a astropy.NDData'''
57        xnd = []
58        name = 'no_name'
59        unit = ndd.unit.to_string() if not ndd.unit is None else None
60        ntv_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
61        xnd += [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))]
62        if ndd.meta:
63            meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
64            name = ndd.meta.get('name', 'no_name')
65            xnd += [Xndarray('meta', meta=meta)]
66        if ndd.wcs:
67            xnd += [Xndarray('wcs', meta=dict(ndd.wcs.to_header()))]
68        if not ndd.psf is None:
69            xnd += [Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type))]
70        if not ndd.mask is None:
71            xnd += [Xndarray('data.mask', nda=ndd.mask)]
72        if not ndd.uncertainty is None:
73            typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
74            ntv_type = Nutil.ntv_type(
75                ndd.uncertainty.array.dtype.name, ext=typ_u)
76            nda = Ndarray(ndd.uncertainty.array, ntv_type=ntv_type)
77            xnd += [Xndarray('data.uncertainty', nda=nda)]
78        return Xclass(xnd, name).to_canonical()

return a Xdataset from a astropy.NDData

class XarrayConnec:
 81class XarrayConnec:
 82    ''' Xarray interface with two static methods ximport and xexport'''
 83
 84    @staticmethod
 85    def xexport(xdt, **kwargs):
 86        '''return a xr.DataArray or a xr.Dataset from a Xdataset
 87
 88        *Parameters*
 89
 90        - **dataset** : Boolean (default True) - if False and a single data_var,
 91        return a sc.DataArray
 92        - **datagroup** : Boolean (default True) - if True, return a sc.DataGroup
 93        which contains the sc.DataArray/sc.Dataset and the other data else only
 94        sc.DataArray/sc.Dataset
 95        '''
 96        option = {'dataset': True, 'datagroup': True} | kwargs
 97        coords = XarrayConnec._to_xr_vars(
 98            xdt, xdt.dimensions + xdt.coordinates)
 99        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
100        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
101        if len(xdt.data_vars) == 1 and not option['dataset']:
102            var_name = xdt.data_vars[0]
103            data = xdt.to_ndarray(var_name)
104            dims = xdt.dims(var_name)
105            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
106            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
107            name = var_name if var_name != 'data' else None
108            return xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
109                                name=name)
110        data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
111        xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs)
112        return xrd
113
114    @staticmethod
115    def ximport(xar, Xclass, **kwargs):
116        '''return a Xdataset from a xr.DataArray or a xr.Dataset'''
117        xnd = []
118        if xar.attrs:
119            attrs = {k: v for k, v in xar.attrs.items() if not k in [
120                'name', 'ntv_type']}
121            for name, meta in attrs.items():
122                if isinstance(meta, list):
123                    xnd += [Xndarray.read_json({name: meta})]
124                else:
125                    xnd += [Xndarray(name, meta=meta)]
126        for coord in xar.coords:
127            xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
128            if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
129                xnd[-1].links = [list(xar.data_vars)[0]]
130        if isinstance(xar, xr.DataArray):
131            var = XarrayConnec._var_xr_to_xnd(
132                xar, name='data', add_attrs=False)
133            xnd += [XarrayConnec._var_xr_to_xnd(xar,
134                                                name='data', add_attrs=False)]
135            xdt = Xclass(xnd, xar.attrs.get('name'))
136            for var in xdt.data_vars:
137                if var != xar.name and xar.name:
138                    xdt[var].links = [xar.name]
139            return xdt.to_canonical()
140        for var in xar.data_vars:
141            xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
142        return Xclass(xnd, xar.attrs.get('name')).to_canonical()
143
144    @staticmethod
145    def _var_xr_to_xnd(xar, name=None, add_attrs=True):
146        '''return a Xndarray from a Xarray variable
147
148        *Parameters*
149
150        - **xar** : Xarray variable to convert in Xndarray,
151        - **name** : string (default None) - default name if xar have non name,
152        - **add_attrs** : boolean (default True) - if False, attrs are not converted
153        '''
154        full_name = xar.name if xar.name else name
155        name = Nutil.split_name(full_name)[0]
156        dims = None if xar.dims == (name,) else list(xar.dims)
157        ntv_type = xar.attrs.get('ntv_type')
158        nda = xar.values
159        if nda.dtype.name == 'datetime64[ns]' and ntv_type:
160            nda = Nutil.convert(ntv_type, nda, tojson=False)
161        attrs = {k: v for k, v in xar.attrs.items()
162                 if not k in ['ntv_type', 'name']} if add_attrs else {}
163        return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs)
164
165    @staticmethod
166    def _to_xr_attrs(xdt, **option):
167        '''return a dict with attributes from a Xdataset
168
169        *Parameters*
170
171        - **datagroup** : Boolean  if True, add json representation of 'relative'
172        Xndarrays and 'data_arrays' Xndarrays
173        '''
174        attrs = {meta: xdt[meta].meta for meta in xdt.metadata}
175        attrs |= {'name': xdt.name} if xdt.name else {}
176        if option['datagroup']:
177            for name in xdt.names:
178                if xdt[name].mode == 'relative':
179                    attrs |= xdt[name].to_json(header=False)
180            for name in xdt.data_arrays:
181                attrs |= xdt[name].to_json(header=False)
182        return attrs
183
184    @staticmethod
185    def _to_xr_coord(xdt, name):
186        '''return a dict with Xarray attributes from a Xndarray defined by his name'''
187        data = xdt.to_ndarray(name)
188        if name in xdt.additionals and not xdt[name].links:
189            data = data.reshape(xdt.shape_dims(xdt[name].name))
190        dims = tuple(xdt.dims(name)) if xdt.dims(name) else (xdt[name].name)
191        meta = {'ntv_type': xdt[name].ntv_type} | (
192            xdt[name].meta if xdt[name].meta else {})
193        return {name: (dims, data, meta)}
194
195    @staticmethod
196    def _to_xr_vars(xdt, list_names):
197        '''return a dict with Xarray attributes from a list of Xndarray names'''
198        arg_vars = {}
199        valid_names = [
200            name for name in list_names if xdt[name].mode == 'absolute']
201        for xnd_name in valid_names:
202            arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name)
203        return arg_vars
204
205    @staticmethod
206    def _xr_add_type(xar):
207        '''add 'ntv_type' as attribute for a xr.DataArray'''
208        if isinstance(xar, xr.DataArray) and not 'ntv_type' in xar.attrs:
209            xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)}
210            return
211        for coord in xar.coords:
212            XarrayConnec._xr_add_type(coord)
213        for var in xar.data_vars:
214            XarrayConnec._xr_add_type(var)
215        return

Xarray interface with two static methods ximport and xexport

@staticmethod
def xexport(xdt, **kwargs):
 84    @staticmethod
 85    def xexport(xdt, **kwargs):
 86        '''return a xr.DataArray or a xr.Dataset from a Xdataset
 87
 88        *Parameters*
 89
 90        - **dataset** : Boolean (default True) - if False and a single data_var,
 91        return a sc.DataArray
 92        - **datagroup** : Boolean (default True) - if True, return a sc.DataGroup
 93        which contains the sc.DataArray/sc.Dataset and the other data else only
 94        sc.DataArray/sc.Dataset
 95        '''
 96        option = {'dataset': True, 'datagroup': True} | kwargs
 97        coords = XarrayConnec._to_xr_vars(
 98            xdt, xdt.dimensions + xdt.coordinates)
 99        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
100        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
101        if len(xdt.data_vars) == 1 and not option['dataset']:
102            var_name = xdt.data_vars[0]
103            data = xdt.to_ndarray(var_name)
104            dims = xdt.dims(var_name)
105            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
106            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
107            name = var_name if var_name != 'data' else None
108            return xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
109                                name=name)
110        data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
111        xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs)
112        return xrd

return a xr.DataArray or a xr.Dataset from a Xdataset

Parameters

  • dataset : Boolean (default True) - if False and a single data_var, return a xr.DataArray
  • datagroup : Boolean (default True) - if True, the json representation of 'relative' Xndarrays and 'data_arrays' Xndarrays is added to the attrs of the returned xr.DataArray/xr.Dataset
@staticmethod
def ximport(xar, Xclass, **kwargs):
114    @staticmethod
115    def ximport(xar, Xclass, **kwargs):
116        '''return a Xdataset from a xr.DataArray or a xr.Dataset'''
117        xnd = []
118        if xar.attrs:
119            attrs = {k: v for k, v in xar.attrs.items() if not k in [
120                'name', 'ntv_type']}
121            for name, meta in attrs.items():
122                if isinstance(meta, list):
123                    xnd += [Xndarray.read_json({name: meta})]
124                else:
125                    xnd += [Xndarray(name, meta=meta)]
126        for coord in xar.coords:
127            xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
128            if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
129                xnd[-1].links = [list(xar.data_vars)[0]]
130        if isinstance(xar, xr.DataArray):
131            var = XarrayConnec._var_xr_to_xnd(
132                xar, name='data', add_attrs=False)
133            xnd += [XarrayConnec._var_xr_to_xnd(xar,
134                                                name='data', add_attrs=False)]
135            xdt = Xclass(xnd, xar.attrs.get('name'))
136            for var in xdt.data_vars:
137                if var != xar.name and xar.name:
138                    xdt[var].links = [xar.name]
139            return xdt.to_canonical()
140        for var in xar.data_vars:
141            xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
142        return Xclass(xnd, xar.attrs.get('name')).to_canonical()

return a Xdataset from a xr.DataArray or a xr.Dataset

class ScippConnec:
class ScippConnec:
    ''' Scipp interface with two static methods ximport and xexport'''

    SCTYPE_DTYPE = {'string': 'str'}

    @staticmethod
    def xexport(xdt, **kwargs):
        '''build a scipp object (DataArray, Dataset or DataGroup) from a xdataset

        *Parameters*

        - **dataset** : Boolean (default True) - if False, only the first item
        of the Dataset is kept
        - **datagroup** : Boolean (default True) - if True return a DataGroup
        holding the previous object and the entries given by `_to_scipp_grp`
        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
        '''
        option = {'dataset': True, 'datagroup': True, 'ntv_type': True} | kwargs

        # the same coordinates are given to every DataArray
        coords = {}
        for name in xdt.coordinates + xdt.dimensions:
            if xdt[name].mode == 'absolute':
                sc_key, sc_var = ScippConnec._to_scipp_var(xdt, name, **option)
                coords[sc_key] = sc_var

        arrays = {}
        for name in xdt.data_vars:
            if xdt[name].mode == 'absolute':
                sc_key, sc_dar = ScippConnec._to_sc_dataarray(
                    xdt, name, coords, **option)
                arrays[sc_key] = sc_dar

        scd = sc.Dataset(arrays)
        if not option['dataset']:
            scd = scd[list(scd)[0]]
        if not option['datagroup']:
            return scd
        grp_name = xdt.name or 'no_name'
        return sc.DataGroup({grp_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))

    @staticmethod
    def ximport(sc_obj, Xclass, **kwargs):
        '''build a xdataset from a scipp object DataArray, Dataset or DataGroup

        With a DataGroup, the first Dataset or DataArray found gives the
        variables (its key is used as name of the xdataset) and the group is
        then processed by `_grp_sc_to_xnd`.'''
        xnd = []
        scd = sc_obj
        xnd_name = None
        is_group = isinstance(sc_obj, sc.DataGroup)
        if is_group:
            for key in sc_obj:
                if isinstance(sc_obj[key], (sc.Dataset, sc.DataArray)):
                    scd, xnd_name = sc_obj[key], key
                    break
        if isinstance(scd, sc.DataArray):
            # a single DataArray is handled as a one-item Dataset
            scd = sc.Dataset({(scd.name or 'no_name'): scd})
        if isinstance(scd, sc.Dataset):
            for coord in scd.coords:
                xnd.extend(ScippConnec._var_sc_to_xnd(
                    scd.coords[coord], scd, coord))
            for var in scd:
                sc_dar = scd[var]
                for mask in sc_dar.masks:
                    owner = Nutil.split_json_name(var)[0]
                    xnd.extend(ScippConnec._var_sc_to_xnd(
                        sc_dar.masks[mask], scd, mask, owner))
                xnd.extend(ScippConnec._var_sc_to_xnd(sc_dar.data, scd, var))
        if is_group:
            xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
        return Xclass(xnd, xnd_name).to_canonical()

    @staticmethod
    def _grp_sc_to_xnd(sc_obj, xnd):
        '''complete the list of Xndarray `xnd` with the entries of the scipp
        group `sc_obj` and return it (the list is updated in place)'''
        # only the Xndarray present before the loop can receive metadata
        known = {item.name: item for item in xnd}
        for key in sc_obj:
            value = sc_obj[key]
            name, add_name = Nutil.split_name(key)
            if add_name is None and isinstance(value, list):
                xnd.append(Xndarray.read_json({name: value}))
            elif isinstance(value, sc.Variable):
                xnd.extend(ScippConnec._var_sc_to_xnd(
                    value, None, add_name, name))
            elif isinstance(value, (dict, str, list)):
                if name not in known:
                    xnd.append(Xndarray.read_json({name: value}))
                elif known[name].meta:
                    known[name].meta |= value
                else:
                    known[name].meta = value
            # any other kind of entry is ignored
        return xnd

    @staticmethod
    def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None):
        '''return a list of Xndarray from a scipp variable (if the variable has
        variances, they come first as a Xndarray with the '.variance' suffix)

        - scv : scipp variable
        - scd : scipp object which contains the variable (optional)
        - var : name
        - sc_name : scipp name'''
        if scv.unit and scv.unit not in ['dimensionless', 'ns']:
            unit = scv.unit.name
        else:
            unit = ''
        ext_name, typ1 = Nutil.split_json_name(sc_name, True)
        var_name, typ2 = Nutil.split_json_name(var, True)
        separator = '.' if var_name and ext_name else ''
        full_name = var_name + separator + ext_name
        ntv_type_base = typ1 + typ2
        unit_suffix = '[' + unit + ']' if unit else ''
        ntv_type = ntv_type_base + unit_suffix

        links = [Nutil.split_json_name(dim)[0] for dim in scv.dims]
        # a coordinate with the same dims as scd is linked to the first item of scd
        if scd is not None and sc_name in scd.coords and scv.dims == scd.dims:
            links = [Nutil.split_json_name(list(scd)[0])[0]]

        l_xnda = []
        if scv.variances is not None:
            vari_nda = Ndarray(scv.variances, ntv_type_base)
            l_xnda.append(Xndarray(full_name + '.variance', vari_nda, links))
        l_xnda.append(Xndarray(full_name, Ndarray(scv.values, ntv_type), links))
        return l_xnda

    @staticmethod
    def _to_sc_dataarray(xdt, name, coords, **option):
        '''return a tuple (scipp name, scipp.DataArray) from a xdataset.global_var
        defined by his name'''
        scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option)
        masks = {}
        # masks : Xndarray which are both in the group of the variable and in xdt.masks
        for mask_name in set(xdt.var_group(name)) & set(xdt.masks):
            sc_key, sc_mask = ScippConnec._to_scipp_var(xdt, mask_name, **option)
            masks[sc_key] = sc_mask
        return (scipp_name, sc.DataArray(data, coords=coords, masks=masks))

    @staticmethod
    def _to_scipp_grp(xdt, **option):
        '''return a dict with metadata, data-array and data_add from a xdataset'''
        grp = {}
        # additional data and data arrays (variances are excluded)
        for name in xdt.data_add + xdt.data_arrays:
            if xdt[name].add_name != 'variance':
                sc_key, sc_var = ScippConnec._to_scipp_var(xdt, name, **option)
                grp[sc_key] = sc_var
        # masks of Xndarray which are not data_vars
        opt_mask = option | {'grp_mask': True}
        for name in xdt.masks:
            parent = xdt[name].name
            if parent in xdt.names and parent not in xdt.data_vars:
                sc_key, sc_var = ScippConnec._to_scipp_var(xdt, name, **opt_mask)
                grp[sc_key] = sc_var
        # metadata
        for name in xdt.names:
            if xdt[name].meta:
                grp[name + '.meta'] = xdt[name].meta
        # Xndarray with a 'relative' mode are included as JSON
        for name in xdt.names:
            if xdt[name].mode == 'relative':
                grp |= xdt[name].to_json(header=False)
        return grp

    @staticmethod
    def _to_scipp_var(xdt, name, **kwargs):
        '''return a tuple (scipp name, scipp.Variable) from a Xndarray defined
        by his name'''
        option = {'grp_mask': False, 'ntv_type': True} | kwargs
        opt_n = option['ntv_type']
        add_name = Nutil.split_name(name)[1]
        # a mask is named with its additional name unless 'grp_mask' is set
        if name in xdt.masks and not option['grp_mask']:
            new_n = add_name
        else:
            new_n = name
        values = xdt.to_ndarray(name)
        vari_name = name + '.variance'
        variances = None
        if vari_name in xdt.names:
            variances = xdt[vari_name].darray
        if variances is not None:
            variances = variances.reshape(xdt.shape_dims(vari_name))
        dims = xdt.dims(name, opt_n)
        if not dims:
            dims = [xdt[name].name]
        simple_type, unit = Nutil.split_type(xdt[name].ntv_type)
        scipp_name = new_n + (':' + simple_type if opt_n else '')
        unit = unit or ''
        return (scipp_name, sc.array(dims=dims, values=values,
                                     variances=variances, unit=unit))

Scipp interface with two static methods ximport and xexport

SCTYPE_DTYPE = {'string': 'str'}
@staticmethod
def xexport(xdt, **kwargs):
223    @staticmethod
224    def xexport(xdt, **kwargs):
225        '''return a sc.DataArray or a sc.Dataset from a xdataset
226
227        *Parameters*
228
229        - **dataset** : Boolean (default True) - if False and a single data_var,
230        return a DataArray
231        - **datagroup** : Boolean (default True) - if True return a DataGroup with
232        metadata and data_arrays
233        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
234        '''
235        option = {'dataset': True, 'datagroup': True,
236                  'ntv_type': True} | kwargs
237        coords = dict([ScippConnec._to_scipp_var(xdt, name, **option)
238                       for name in xdt.coordinates + xdt.dimensions
239                       if xdt[name].mode == 'absolute'])
240        scd = sc.Dataset(dict([ScippConnec._to_sc_dataarray(xdt, name, coords, **option)
241                               for name in xdt.data_vars
242                               if xdt[name].mode == 'absolute']))
243        scd = scd if option['dataset'] else scd[list(scd)[0]]
244        if not option['datagroup']:
245            return scd
246        sc_name = xdt.name if xdt.name else 'no_name'
247        return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))

return a sc.DataArray or a sc.Dataset from a xdataset

Parameters

  • dataset : Boolean (default True) - if False and a single data_var, return a DataArray
  • datagroup : Boolean (default True) - if True return a DataGroup with metadata and data_arrays
  • ntv_type : Boolean (default True) - if True add ntv-type to the name
@staticmethod
def ximport(sc_obj, Xclass, **kwargs):
249    @staticmethod
250    def ximport(sc_obj, Xclass, **kwargs):
251        '''return a xdataset from a scipp object DataArray, Dataset or DataGroup'''
252        xnd = []
253        scd = sc_obj
254        xnd_name = None
255        if isinstance(sc_obj, sc.DataGroup):
256            for obj in sc_obj:
257                if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)):
258                    scd = sc_obj[obj]
259                    xnd_name = obj
260                    break
261        if isinstance(scd, sc.DataArray):
262            scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd})
263        if isinstance(scd, sc.Dataset):
264            for coord in scd.coords:
265                xnd += ScippConnec._var_sc_to_xnd(
266                    scd.coords[coord], scd, coord)
267            for var in scd:
268                for mask in scd[var].masks:
269                    m_var = Nutil.split_json_name(var)[0]
270                    xnd += ScippConnec._var_sc_to_xnd(
271                        scd[var].masks[mask], scd, mask, m_var)
272                xnd += ScippConnec._var_sc_to_xnd(scd[var].data, scd, var)
273        if isinstance(sc_obj, sc.DataGroup):
274            xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
275        return Xclass(xnd, xnd_name).to_canonical()

return a xdataset from a scipp object DataArray, Dataset or DataGroup