ntv-numpy.ntv_numpy.xconnector

@author: Philippe@loco-labs.io

The xconnector module is part of the ntv-numpy.ntv_numpy package (specification document).

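It contains interface classes, each with two static methods ximport and xexport: XarrayConnec (Xarray Dataset or DataArray), AstropyNDDataConnec (Astropy NDData), ScippConnec (Scipp Dataset or DataArray) and PandasConnec (pandas DataFrame).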

For more information, see the user guide or the github repository.

  1# -*- coding: utf-8 -*-
  2"""
  3@author: Philippe@loco-labs.io
  4
  5The `xconnector` module is part of the `ntv-numpy.ntv_numpy` package ([specification document](
  6https://loco-philippe.github.io/ES/JSON%20semantic%20format%20(JSON-NTV).htm)).
  7
  8It contains interface classes with two static methods `ximport` and `xexport`:
  9- `XarrayConnec` class for Xarray Dataset or DataArray,
 10- `AstropyNDDataConnec` class for Astropy NDData,
 11- `ScippConnec` class for Scipp Dataset or DataArray,
 12- `PandasConnec` class for pandas dataFrame,
 13
 14
 15For more information, see the
 16[user guide](https://loco-philippe.github.io/ntv-numpy/docs/user_guide.html)
 17 or the [github repository](https://github.com/loco-philippe/ntv-numpy).
 18"""
 19
 20
 21import xarray as xr
 22import scipp as sc
 23import pandas as pd
 24import numpy as np
 25from astropy import wcs
 26from astropy.nddata import NDData
 27from astropy.nddata.nduncertainty import StdDevUncertainty, VarianceUncertainty
 28from astropy.nddata.nduncertainty import InverseVariance
 29from ntv_numpy.ndarray import Nutil, Ndarray
 30from ntv_numpy.xndarray import Xndarray
 31
 32
 33class AstropyNDDataConnec:
 34    ''' NDData interface with two static methods ximport and xexport'''
 35
 36    @staticmethod
 37    def xexport(xdt, **kwargs):
 38        '''return a NDData from a Xdataset'''
 39        data = xdt['data'].ndarray
 40        mask = xdt['data.mask'].ndarray
 41        unit = xdt['data'].nda.ntvtype.extension
 42        uncert = xdt['data.uncertainty'].ndarray
 43        typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
 44        match typ_u:
 45            case 'std':
 46                uncertainty = StdDevUncertainty(uncert)
 47            case 'var':
 48                uncertainty = VarianceUncertainty(uncert)
 49            case 'inv':
 50                uncertainty = InverseVariance(uncert)
 51            case _:
 52                uncertainty = uncert
 53        meta = xdt['meta'].meta | {'name': xdt.name}
 54        wcs_dic = xdt['wcs'].meta
 55        psf = xdt['psf'].ndarray
 56        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
 57                      meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf)
 58
 59    @staticmethod
 60    def ximport(ndd, Xclass, **kwargs):
 61        '''return a Xdataset from a astropy.NDData'''
 62        xnd = []
 63        name = 'no_name'
 64        unit = ndd.unit.to_string() if ndd.unit is not None else None
 65        ntv_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
 66        xnd += [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))]
 67        if ndd.meta:
 68            meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
 69            name = ndd.meta.get('name', 'no_name')
 70            xnd += [Xndarray('meta', meta=meta)]
 71        if ndd.wcs:
 72            xnd += [Xndarray('wcs', meta=dict(ndd.wcs.to_header()))]
 73        if ndd.psf is not None:
 74            xnd += [Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type))]
 75        if ndd.mask is not None:
 76            xnd += [Xndarray('data.mask', nda=ndd.mask)]
 77        if ndd.uncertainty is not None:
 78            typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
 79            ntv_type = Nutil.ntv_type(
 80                ndd.uncertainty.array.dtype.name, ext=typ_u)
 81            nda = Ndarray(ndd.uncertainty.array, ntv_type=ntv_type)
 82            xnd += [Xndarray('data.uncertainty', nda=nda)]
 83        return Xclass(xnd, name).to_canonical()
 84
 85
 86class PandasConnec:
 87    ''' pandas.DataFrame interface with two static methods ximport and xexport'''
 88
 89    @staticmethod
 90    def xexport(xdt, **kwargs):
 91        '''return a pd.DataFrame from a Xdataset
 92
 93        *Parameters*
 94
 95        - **ntv_type**: Boolean (default True) - if False use full_name else json_name
 96        - **info**: Boolean (default True) - if True add xdt.info in DataFrame.attrs
 97        - **dims**: list of string (default None) - order of dimensions full_name to apply
 98        '''
 99        opt = {'ntv_type': True, 'info': True, 'dims': None} | kwargs
100        dic_name = {name: xdt[name].json_name if opt['ntv_type'] else xdt[name].full_name
101                    for name in xdt.names}
102        dims = xdt.dimensions if not opt['dims'] else tuple(opt['dims'])
103        fields = (xdt.group(dims) + xdt.group(xdt.coordinates) +
104                  xdt.group(xdt.data_vars) + xdt.uniques)
105        fields += tuple(nam for nam in xdt.group(xdt.data_arrays)
106                        if len(xdt[nam]) == xdt.length)
107        fields_array = tuple(var for var in fields if not xdt[var].uri)
108        dic_series = {dic_name[name]: PandasConnec._to_np_series(xdt, name, dims)
109                      for name in fields_array}
110        dfr = pd.DataFrame(dic_series)
111        index = [dic_name[name] for name in dims]
112        if index:
113            dfr = dfr.set_index(index)
114        if opt['info']:
115            dfr.attrs |= {'info': xdt.tab_info}
116            dfr.attrs |= {'metadata': {
117                name: xdt[name].meta for name in xdt.metadata}}
118            fields_uri = [var for var in fields if var not in fields_array]
119            fields_other = [nam for nam in xdt.group(xdt.data_arrays)
120                            if len(xdt[nam]) != xdt.length]
121            if fields_uri:
122                dfr.attrs |= {'fields': {nam: xdt[nam].to_json(noname=True,)
123                                         for nam in fields_uri + fields_other}}
124            if xdt.name:
125                dfr.attrs |= {'name': xdt.name}
126        return dfr
127
128    @staticmethod
129    def ximport(df, Xclass, **kwargs):
130        '''return a Xdataset from a pd.DataFrame
131
132        *Parameters*
133
134        - dims: list of string (default None) - order of dimensions to apply
135        '''
136        opt = {'dims': None} | kwargs
137        xnd = []
138        dfr = df.reset_index()
139        if 'index' in dfr.columns and 'index' not in df.columns:
140            del dfr['index']
141        df_names = {Nutil.split_json_name(j_name)[0]: j_name
142                    for j_name in dfr.columns}
143        df_ntv_types = {Nutil.split_json_name(j_name)[0]:
144                        Nutil.split_json_name(j_name)[1] for j_name in dfr.columns}
145        dfr.columns = [Nutil.split_json_name(name)[0] for name in dfr.columns]
146        if dfr.attrs.get('metadata'):
147            for name, meta in dfr.attrs['metadata'].items():
148                xnd += [Xndarray.read_json({name: meta})]
149        if dfr.attrs.get('fields'):
150            for name, jsn in dfr.attrs['fields'].items():
151                xnd += [Xndarray.read_json({name: jsn})]
152        if dfr.attrs.get('info'):
153            dimensions = dfr.attrs['info']['dimensions']
154            data = dfr.attrs['info']['data']
155        else:
156            dimensions, data = PandasConnec._ximport_analysis(dfr, opt['dims'])
157        shape_dfr = [data[dim]['shape'][0]
158                     for dim in dimensions] if dimensions else len(dfr)
159        dfr = dfr.sort_values(dimensions)
160        for name in df_names:
161            xnd += [PandasConnec._ximport_series(data, name, dfr, dimensions,
162                                                 shape_dfr, df_ntv_types, **opt)]
163        return Xclass(xnd, dfr.attrs.get('name')).to_canonical()
164
165    @staticmethod
166    def _ximport_analysis(dfr, opt_dims):
167        '''return data and dimensions from analysis module
168        - opt_dims: partition to apply
169        - dfr: dataframe to analyse'''
170        dfr_idx = list(dfr.index.names)
171        opt_dims = dfr_idx if dfr_idx != [None] else opt_dims
172        ana = dfr.npd.analysis(distr=True)
173        partition = ana.field_partition(partition=opt_dims, mode='id')
174        part_rel = ana.relation_partition(partition=opt_dims, noroot=True)
175        part_dim = ana.relation_partition(
176            partition=opt_dims, noroot=True, primary=True)
177        dimensions = partition['primary']
178        len_fields = {fld.idfield: fld.lencodec for fld in ana.fields}
179        data = {fld.idfield: {
180            'shape': [len_fields[dim] for dim in part_dim[fld.idfield]] if part_dim else [],
181            'links': part_rel[fld.idfield] if part_rel else []} for fld in ana.fields}
182        for json_name in data:
183            if not data[json_name]['shape']:
184                name = Nutil.split_name(Nutil.split_json_name(json_name)[0])[0]
185                p_name = [js_name for js_name in data
186                          if Nutil.split_json_name(js_name)[0] == name][0]
187                data[json_name]['shape'] = data[p_name]['shape']
188        return (dimensions, data)
189
190    @staticmethod
191    def _ximport_series(data, name, dfr, dimensions, shape_dfr, df_ntv_types, **opt):
192        '''return a Xndarray from a Series of a pd.DataFrame'''
193        if data[name].get('xtype') == 'meta':  # or len(dfr[name].unique()) == 1:
194            return Xndarray(name, meta=dfr[name].iloc[0])
195        meta = data[name].get('meta')
196        ntv_type = df_ntv_types[name]
197        if len(dfr[name].unique()) == 1:
198            nda = Ndarray(np.array(dfr[name].iloc[0]),
199                          ntv_type=ntv_type, str_uri=False)
200            nda.set_shape([1])
201            return Xndarray(name, nda=nda, meta=meta)
202        if not dimensions:
203            nda = Ndarray(np.array(dfr[name]), ntv_type=ntv_type)
204            return Xndarray(name, nda=nda, meta=meta)
205        dims = []
206        PandasConnec._get_dims(dims, name, data, dimensions)
207        if not dims:
208            p_name, add_name = Nutil.split_name(name)
209            if add_name:
210                PandasConnec._get_dims(dims, p_name, data, dimensions)
211        np_array = PandasConnec._from_series(dfr, name, shape_dfr,
212                                             dimensions, dims, opt['dims'])
213        shape = data[name].get('shape', [len(dfr)])
214        nda = Ndarray(np_array, ntv_type, shape)
215        links = data[name].get('links')
216        return Xndarray(name, nda=nda, links=links if links else dims, meta=meta)
217
218    @staticmethod
219    def _to_np_series(xdt, name, dims):
220        '''return a np.ndarray from the Xndarray of xdt defined by his name
221
222        *parameters*
223
224        - **xdt**: Xdataset - data to convert in a pd.DataFrame
225        - **name**: string - full_name of the Xndarray to convert
226        - **dims**: list of string - order of dimensions full_name to apply'''
227        if name in xdt.uniques:
228            return np.array([xdt[name].darray[0]] * xdt.length)
229        if xdt[name].shape == [xdt.length]:
230            return xdt[name].darray
231        n_shape = {nam: len(xdt[nam]) for nam in dims}
232        dim_name = xdt.dims(name)
233        if not set(dim_name) <= set(dims):
234            return None
235        add_name = [nam for nam in dims if nam not in dim_name]
236        tab_name = add_name + dim_name
237
238        til = 1
239        for nam in add_name:
240            til *= n_shape[nam]
241        shap = [n_shape[nam] for nam in tab_name]
242        order = [dims.index(nam) for nam in tab_name]
243        arr = xdt[name].darray
244        return Nutil.extend_array(arr, til, shap, order)
245
246    @staticmethod
247    def _from_series(dfr, name, shape, dims, links, new_dims=None):
248        '''return a flattened np.ndarray from the pd.Series of dfr defined by his name
249
250        *parameters*
251
252        - dfr: DataFrame - data to convert in Xdataset
253        - name: string - name of the Series (full_name or json_name)
254        - shape: shape of the Xdataset
255        - dims: list of string - list of name of dimensions
256        - links: list of string - list of linked Series
257        - new_dims: list of string (default None) - new order of dims
258        '''
259        if not links:
260            return np.array(dfr[name])
261        old_order = list(range(len(dims)))
262        new_dims = new_dims if new_dims else dims
263        order = [dims.index(dim)
264                 for dim in new_dims] if new_dims else old_order
265        idx = [0] * len(dims)
266        for nam in links:
267            idx[new_dims.index(nam)] = slice(shape[dims.index(nam)])
268        xar = np.moveaxis(np.array(dfr[name]).reshape(shape),
269                          old_order, order)[tuple(idx)]
270        if not links:
271            return xar.flatten()
272        lnk = [nam for nam in new_dims if nam in links]
273        shape_lnk = [shape[dims.index(nam)] for nam in lnk]
274        xar = xar.reshape(shape_lnk)
275        old_order = list(range(len(links)))
276        order = [lnk.index(dim) for dim in links]
277        return np.moveaxis(xar, old_order, order).flatten()
278
279    @staticmethod
280    def _get_dims(dims, name, data, dimensions):
281        '''add names of dimensions into dims'''
282        if not name:
283            return
284        if name in dimensions:
285            dims += [name]
286        else:
287            if 'links' not in data[name]:
288                return
289            for nam in data[name]['links']:
290                PandasConnec._get_dims(dims, nam, data, dimensions)
291
292
class XarrayConnec:
    '''Xarray interface with two static methods ximport and xexport'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a xr.DataArray or a xr.Dataset from a Xdataset

        *Parameters*

        - **dataset** : Boolean (default True) - if False and a single data_var,
        return a xr.DataArray
        - **info** : Boolean (default True) - if True, add json representation
        of 'relative' Xndarrays and 'data_arrays' Xndarrays in attrs
        '''
        option = {'dataset': True, 'info': True} | kwargs
        coords = XarrayConnec._to_xr_vars(
            xdt, xdt.dimensions + xdt.coordinates + xdt.uniques)
        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
        if len(xdt.data_vars) == 1 and not option['dataset']:
            var_name = xdt.data_vars[0]
            data = xdt.to_ndarray(var_name)
            dims = xdt.dims(var_name)
            # for a DataArray, ntv_type and meta of the variable are merged
            # into the attrs
            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
            # 'data' is the default name given by ximport to an unnamed DataArray
            name = var_name if var_name != 'data' else None
            xrd = xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
                               name=name)
        else:
            data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
            xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs)
        for unic in xdt.uniques:
            xrd[unic].attrs |= {'ntv_type': xdt[unic].ntv_type} | (
                xdt[unic].meta if xdt[unic].meta else {})
        return xrd

    @staticmethod
    def ximport(xar, Xclass, **kwargs):
        '''return a Xdataset from a xr.DataArray or a xr.Dataset

        *Parameters*

        - **xar** : xr.DataArray or xr.Dataset to convert
        - **Xclass** : class called as `Xclass(list_of_Xndarray, name)`; the
        result of its `to_canonical()` method is returned
        - **kwargs** : accepted for signature compatibility, not used here
        '''
        xnd = []
        if xar.attrs:
            attrs = {k: v for k, v in xar.attrs.items() if k not in [
                'name', 'ntv_type']}
            for name, meta in attrs.items():
                # a list is read as the json value of a Xndarray (see
                # _to_xr_attrs), any other value is kept as metadata
                if isinstance(meta, list):
                    xnd += [Xndarray.read_json({name: meta})]
                else:
                    xnd += [Xndarray(name, meta=meta)]
        for coord in xar.coords:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
            # a coordinate with all the dimensions of a Dataset is linked
            # to the first data_var
            if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
                xnd[-1].links = [list(xar.data_vars)[0]]
        if isinstance(xar, xr.DataArray):
            # attrs of the DataArray are already converted above
            xnd += [XarrayConnec._var_xr_to_xnd(xar,
                                                name='data', add_attrs=False)]
            xdt = Xclass(xnd, xar.attrs.get('name'))
            for var in xdt.data_vars:
                if var != xar.name and xar.name:
                    xdt[var].links = [xar.name]
            return xdt.to_canonical()
        for var in xar.data_vars:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
        return Xclass(xnd, xar.attrs.get('name')).to_canonical()

    @staticmethod
    def _var_xr_to_xnd(var, name=None, add_attrs=True):
        '''return a Xndarray from a Xarray variable

        *Parameters*

        - **var** : Xarray variable to convert in Xndarray,
        - **name** : string (default None) - default name if var have no name,
        - **add_attrs** : boolean (default True) - if False, attrs are not converted
        '''
        full_name = var.name if var.name else name
        name = Nutil.split_name(full_name)[0]
        # no links for a dimension (variable indexed by itself) or a single value
        dims = None if var.dims == (name,) or var.size == 1 else list(var.dims)
        ntv_type = var.attrs.get('ntv_type')
        nda = var.values
        # a 0-d array is converted into a 1-d array with one value
        nda = nda.reshape(1) if not nda.shape else nda
        if nda.dtype.name == 'datetime64[ns]' and ntv_type:
            nda = Nutil.convert(ntv_type, nda, tojson=False)
        attrs = {k: v for k, v in var.attrs.items()
                 if k not in ['ntv_type', 'name']} if add_attrs else {}
        return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs)

    @staticmethod
    def _to_xr_attrs(xdt, **option):
        '''return a dict with attributes from a Xdataset

        *Parameters*

        - **info** : Boolean  if True, add json representation of 'relative'
        Xndarrays and 'data_arrays' Xndarrays in attrs
        '''
        attrs = {meta: xdt[meta].meta for meta in xdt.metadata}
        attrs |= {'name': xdt.name} if xdt.name else {}
        if option['info']:
            for name in xdt.names:
                if xdt[name].mode == 'relative':
                    attrs |= xdt[name].to_json(header=False)
            for name in xdt.data_arrays:
                attrs |= xdt[name].to_json(header=False)
        return attrs

    @staticmethod
    def _to_xr_coord(xdt, name):
        '''return a dict with Xarray attributes from a Xndarray defined by his name

        The dict has a single key (name). The value is the first value of
        the data for a 'unique' Xndarray, else a tuple (dims, data, attrs)
        where attrs contains the ntv_type and the metadata.'''
        data = xdt.to_ndarray(name)
        if name in xdt.uniques:
            return {name: data[0]}
        if name in xdt.additionals and not xdt[name].links:
            data = data.reshape(xdt.shape_dims(xdt[name].name))
        dim_names = xdt.dims(name)
        # without dims, the name of the Xndarray is used as a single
        # dimension name (a plain string, not a tuple)
        dims = tuple(dim_names) if dim_names else xdt[name].name
        meta = {'ntv_type': xdt[name].ntv_type} | (
            xdt[name].meta if xdt[name].meta else {})
        return {name: (dims, data, meta)}

    @staticmethod
    def _to_xr_vars(xdt, list_names):
        '''return a dict with Xarray attributes from a list of Xndarray names

        Only 'absolute' Xndarrays are converted with `_to_xr_coord`; the
        Xndarrays with xtype 'meta' are added with their metadata as value.'''
        arg_vars = {}
        valid_names = [
            nam for nam in list_names if xdt[nam].mode == 'absolute']
        for xnd_name in valid_names:
            arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name)
        for name in list_names:
            if xdt[name].xtype == 'meta':
                arg_vars |= {name: xdt[name].meta}
        return arg_vars

    @staticmethod
    def _xr_add_type(xar):
        '''add 'ntv_type' as attribute for a xr.DataArray or for each
        coordinate and data_var of a xr.Dataset (existing 'ntv_type' are kept)

        *Parameters*

        - **xar** : xr.DataArray or xr.Dataset to update in place
        '''
        if isinstance(xar, xr.DataArray):
            if 'ntv_type' not in xar.attrs:
                xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)}
            # a DataArray has no data_vars: nothing else to explore
            return
        # iterating coords or data_vars gives names: the variables
        # themselves have to be fetched before the recursive call
        for coord in xar.coords:
            XarrayConnec._xr_add_type(xar.coords[coord])
        for var in xar.data_vars:
            XarrayConnec._xr_add_type(xar.data_vars[var])
        return
437
438
439class ScippConnec:
440    ''' Scipp interface with two static methods ximport and xexport'''
441
442    SCTYPE_DTYPE = {'string': 'str'}
443
444    @staticmethod
445    def xexport(xdt, **kwargs):
446        '''return a sc.DataArray or a sc.Dataset from a xdataset
447
448        *Parameters*
449
450        - **dataset** : Boolean (default True) - if False and a single data_var,
451        return a DataArray
452        - **info** : Boolean (default True) - if True return a DataGroup with
453        metadata and data_arrays
454        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
455        '''
456        option = {'dataset': True, 'info': True,
457                  'ntv_type': True} | kwargs
458        coords = dict([ScippConnec._to_scipp_var(xdt, name, **option)
459                       for name in xdt.coordinates + xdt.dimensions + xdt.uniques
460                       if xdt[name].mode == 'absolute'])
461        scd = sc.Dataset(dict([ScippConnec._to_sc_dataarray(xdt, name, coords, **option)
462                               for name in xdt.data_vars
463                               if xdt[name].mode == 'absolute']))
464        scd = scd if option['dataset'] else scd[list(scd)[0]]
465        if not option['info']:
466            return scd
467        sc_name = xdt.name if xdt.name else 'no_name'
468        return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))
469
470    @staticmethod
471    def ximport(sc_obj, Xclass, **kwargs):
472        '''return a xdataset from a scipp object DataArray, Dataset or DataGroup'''
473        xnd = []
474        scd = sc_obj
475        xnd_name = None
476        if isinstance(sc_obj, sc.DataGroup):
477            for obj in sc_obj:
478                if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)):
479                    scd = sc_obj[obj]
480                    xnd_name = obj
481                    break
482        if isinstance(scd, sc.DataArray):
483            scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd})
484        if isinstance(scd, sc.Dataset):
485            for coord in scd.coords:
486                xnd += ScippConnec._var_sc_to_xnd(
487                    scd.coords[coord], scd, coord)
488            for var in scd:
489                for mask in scd[var].masks:
490                    m_var = Nutil.split_json_name(var)[0]
491                    xnd += ScippConnec._var_sc_to_xnd(
492                        scd[var].masks[mask], scd, mask, m_var)
493                xnd += ScippConnec._var_sc_to_xnd(scd[var].data, scd, var)
494        if isinstance(sc_obj, sc.DataGroup):
495            xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
496        return Xclass(xnd, xnd_name).to_canonical()
497
498    @staticmethod
499    def _grp_sc_to_xnd(sc_obj, xnd):
500        '''return a list of Xndarray from a scipp variable'''
501        dic_xnd = {xar.name: xar for xar in xnd}
502        for obj in sc_obj:
503            name, add_name = Nutil.split_name(obj)
504            match [name, add_name, sc_obj[obj]]:
505                case [name, None, list()]:
506                    xnd += [Xndarray.read_json({name: sc_obj[obj]})]
507                case [name, add_name, sc.Variable()]:
508                    xnd += ScippConnec._var_sc_to_xnd(
509                        sc_obj[obj], None, add_name, name)
510                case [name, _, dict() | str() | list()] if name in dic_xnd:
511                    if dic_xnd[name].meta:
512                        dic_xnd[name].meta |= sc_obj[obj]
513                    else:
514                        dic_xnd[name].meta = sc_obj[obj]
515                case [name, _, dict() | str() | list()]:
516                    xnd += [Xndarray.read_json({name: sc_obj[obj]})]
517                case [_, _, _]: ...
518        return xnd
519
520    @staticmethod
521    def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None):
522        '''return a list of Xndarray from a scipp variable
523        - scd : scipp dataset
524        - scv : scipp variable
525        - var : name
526        - sc_name : scipp name'''
527        l_xnda = []
528        unit = scv.unit.name if scv.unit and scv.unit not in [
529            'dimensionless', 'ns'] else ''
530        ext_name, typ1 = Nutil.split_json_name(sc_name, True)
531        var_name, typ2 = Nutil.split_json_name(var, True)
532        full_name = var_name + \
533            ('.' if var_name and ext_name else '') + ext_name
534        ntv_type_base = typ1 + typ2
535        ntv_type = ntv_type_base + ('[' + unit + ']' if unit else '')
536        links = [Nutil.split_json_name(jsn)[0] for jsn in scv.dims]
537        if scd is not None and sc_name in scd.coords and scv.dims == scd.dims:
538            links = [Nutil.split_json_name(list(scd)[0])[0]]
539        if scv.variances is not None:
540            nda = Ndarray(scv.variances, ntv_type_base)
541            l_xnda.append(Xndarray(full_name + '.variance', nda, links))
542        nda = Ndarray(scv.values, ntv_type, str_uri=False)
543        shape = scv.shape if scv.shape else (1,)
544        nda.set_shape(shape)
545        l_xnda.append(Xndarray(full_name, nda, links))
546        return l_xnda
547
548    @staticmethod
549    def _to_sc_dataarray(xdt, name, coords, **option):
550        '''return a scipp.DataArray from a xdataset.global_var defined by his name'''
551        scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option)
552        masks = dict([ScippConnec._to_scipp_var(xdt, nam, **option)
553                     for nam in set(xdt.group(name)) & set(xdt.masks)])
554        return (scipp_name, sc.DataArray(data, coords=coords, masks=masks))
555
556    @staticmethod
557    def _to_scipp_grp(xdt, **option):
558        '''return a dict with metadata, data-array and data_add from a xdataset'''
559        grp = {}
560        grp |= dict([ScippConnec._to_scipp_var(xdt, name, **option)
561                     for name in xdt.data_add + xdt.data_arrays
562                     if xdt[name].add_name != 'variance'])
563        opt_mask = option | {'grp_mask': True}
564        grp |= dict([ScippConnec._to_scipp_var(xdt, name, **opt_mask)
565                     for name in xdt.masks
566                     if xdt[name].name in xdt.names and xdt[name].name not in xdt.data_vars])
567        grp |= {name + '.meta': xdt[name].meta for name in xdt.names
568                if xdt[name].meta}
569        for name in xdt.names:
570            if xdt[name].mode == 'relative':
571                grp |= xdt[name].to_json(header=False)
572        return grp
573
574    @staticmethod
575    def _to_scipp_var(xdt, name, **kwargs):
576        '''return a scipp.Variable from a Xndarray defined by his name'''
577        option = {'grp_mask': False, 'ntv_type': True} | kwargs
578        simple_type, unit = Nutil.split_type(xdt[name].ntv_type)
579        unit = unit if unit else ''
580        add_name = Nutil.split_name(name)[1]
581        new_n = add_name if name in xdt.masks and not option['grp_mask'] else name
582        opt_n = option['ntv_type']
583        scipp_name = new_n + (':' + simple_type if opt_n else '')
584        if name in xdt.uniques:
585            return (scipp_name, sc.scalar(xdt[name].darray[0], unit=unit))
586        vari_name = name + '.variance'
587        variances = xdt[vari_name].darray if vari_name in xdt.names else None
588        dims = xdt.dims(name, opt_n) if xdt.dims(
589            name, opt_n) else [xdt[name].name]
590        var = sc.array(dims=['flat'], values=xdt.to_darray(
591            name), variances=variances, unit=unit)
592        var = sc.fold(var, dim='flat', sizes=dict(zip(dims, xdt[name].shape)))
593        return (scipp_name, var)
class AstropyNDDataConnec:
34class AstropyNDDataConnec:
35    ''' NDData interface with two static methods ximport and xexport'''
36
37    @staticmethod
38    def xexport(xdt, **kwargs):
39        '''return a NDData from a Xdataset'''
40        data = xdt['data'].ndarray
41        mask = xdt['data.mask'].ndarray
42        unit = xdt['data'].nda.ntvtype.extension
43        uncert = xdt['data.uncertainty'].ndarray
44        typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
45        match typ_u:
46            case 'std':
47                uncertainty = StdDevUncertainty(uncert)
48            case 'var':
49                uncertainty = VarianceUncertainty(uncert)
50            case 'inv':
51                uncertainty = InverseVariance(uncert)
52            case _:
53                uncertainty = uncert
54        meta = xdt['meta'].meta | {'name': xdt.name}
55        wcs_dic = xdt['wcs'].meta
56        psf = xdt['psf'].ndarray
57        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
58                      meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf)
59
60    @staticmethod
61    def ximport(ndd, Xclass, **kwargs):
62        '''return a Xdataset from a astropy.NDData'''
63        xnd = []
64        name = 'no_name'
65        unit = ndd.unit.to_string() if ndd.unit is not None else None
66        ntv_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
67        xnd += [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))]
68        if ndd.meta:
69            meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
70            name = ndd.meta.get('name', 'no_name')
71            xnd += [Xndarray('meta', meta=meta)]
72        if ndd.wcs:
73            xnd += [Xndarray('wcs', meta=dict(ndd.wcs.to_header()))]
74        if ndd.psf is not None:
75            xnd += [Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type))]
76        if ndd.mask is not None:
77            xnd += [Xndarray('data.mask', nda=ndd.mask)]
78        if ndd.uncertainty is not None:
79            typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
80            ntv_type = Nutil.ntv_type(
81                ndd.uncertainty.array.dtype.name, ext=typ_u)
82            nda = Ndarray(ndd.uncertainty.array, ntv_type=ntv_type)
83            xnd += [Xndarray('data.uncertainty', nda=nda)]
84        return Xclass(xnd, name).to_canonical()

NDData interface with two static methods ximport and xexport

@staticmethod
def xexport(xdt, **kwargs):
37    @staticmethod
38    def xexport(xdt, **kwargs):
39        '''return a NDData from a Xdataset'''
40        data = xdt['data'].ndarray
41        mask = xdt['data.mask'].ndarray
42        unit = xdt['data'].nda.ntvtype.extension
43        uncert = xdt['data.uncertainty'].ndarray
44        typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
45        match typ_u:
46            case 'std':
47                uncertainty = StdDevUncertainty(uncert)
48            case 'var':
49                uncertainty = VarianceUncertainty(uncert)
50            case 'inv':
51                uncertainty = InverseVariance(uncert)
52            case _:
53                uncertainty = uncert
54        meta = xdt['meta'].meta | {'name': xdt.name}
55        wcs_dic = xdt['wcs'].meta
56        psf = xdt['psf'].ndarray
57        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
58                      meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf)

return a NDData from a Xdataset

@staticmethod
def ximport(ndd, Xclass, **kwargs):
60    @staticmethod
61    def ximport(ndd, Xclass, **kwargs):
62        '''return a Xdataset from a astropy.NDData'''
63        xnd = []
64        name = 'no_name'
65        unit = ndd.unit.to_string() if ndd.unit is not None else None
66        ntv_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
67        xnd += [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))]
68        if ndd.meta:
69            meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
70            name = ndd.meta.get('name', 'no_name')
71            xnd += [Xndarray('meta', meta=meta)]
72        if ndd.wcs:
73            xnd += [Xndarray('wcs', meta=dict(ndd.wcs.to_header()))]
74        if ndd.psf is not None:
75            xnd += [Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type))]
76        if ndd.mask is not None:
77            xnd += [Xndarray('data.mask', nda=ndd.mask)]
78        if ndd.uncertainty is not None:
79            typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
80            ntv_type = Nutil.ntv_type(
81                ndd.uncertainty.array.dtype.name, ext=typ_u)
82            nda = Ndarray(ndd.uncertainty.array, ntv_type=ntv_type)
83            xnd += [Xndarray('data.uncertainty', nda=nda)]
84        return Xclass(xnd, name).to_canonical()

return an Xdataset from an astropy.NDData

class PandasConnec:
 87class PandasConnec:
 88    ''' pandas.DataFrame interface with two static methods ximport and xexport'''
 89
 90    @staticmethod
 91    def xexport(xdt, **kwargs):
 92        '''return a pd.DataFrame from a Xdataset
 93
 94        *Parameters*
 95
 96        - **ntv_type**: Boolean (default True) - if False use full_name else json_name
 97        - **info**: Boolean (default True) - if True add xdt.info in DataFrame.attrs
 98        - **dims**: list of string (default None) - order of dimensions full_name to apply
 99        '''
100        opt = {'ntv_type': True, 'info': True, 'dims': None} | kwargs
101        dic_name = {name: xdt[name].json_name if opt['ntv_type'] else xdt[name].full_name
102                    for name in xdt.names}
103        dims = xdt.dimensions if not opt['dims'] else tuple(opt['dims'])
104        fields = (xdt.group(dims) + xdt.group(xdt.coordinates) +
105                  xdt.group(xdt.data_vars) + xdt.uniques)
106        fields += tuple(nam for nam in xdt.group(xdt.data_arrays)
107                        if len(xdt[nam]) == xdt.length)
108        fields_array = tuple(var for var in fields if not xdt[var].uri)
109        dic_series = {dic_name[name]: PandasConnec._to_np_series(xdt, name, dims)
110                      for name in fields_array}
111        dfr = pd.DataFrame(dic_series)
112        index = [dic_name[name] for name in dims]
113        if index:
114            dfr = dfr.set_index(index)
115        if opt['info']:
116            dfr.attrs |= {'info': xdt.tab_info}
117            dfr.attrs |= {'metadata': {
118                name: xdt[name].meta for name in xdt.metadata}}
119            fields_uri = [var for var in fields if var not in fields_array]
120            fields_other = [nam for nam in xdt.group(xdt.data_arrays)
121                            if len(xdt[nam]) != xdt.length]
122            if fields_uri:
123                dfr.attrs |= {'fields': {nam: xdt[nam].to_json(noname=True,)
124                                         for nam in fields_uri + fields_other}}
125            if xdt.name:
126                dfr.attrs |= {'name': xdt.name}
127        return dfr
128
129    @staticmethod
130    def ximport(df, Xclass, **kwargs):
131        '''return a Xdataset from a pd.DataFrame
132
133        *Parameters*
134
135        - dims: list of string (default None) - order of dimensions to apply
136        '''
137        opt = {'dims': None} | kwargs
138        xnd = []
139        dfr = df.reset_index()
140        if 'index' in dfr.columns and 'index' not in df.columns:
141            del dfr['index']
142        df_names = {Nutil.split_json_name(j_name)[0]: j_name
143                    for j_name in dfr.columns}
144        df_ntv_types = {Nutil.split_json_name(j_name)[0]:
145                        Nutil.split_json_name(j_name)[1] for j_name in dfr.columns}
146        dfr.columns = [Nutil.split_json_name(name)[0] for name in dfr.columns]
147        if dfr.attrs.get('metadata'):
148            for name, meta in dfr.attrs['metadata'].items():
149                xnd += [Xndarray.read_json({name: meta})]
150        if dfr.attrs.get('fields'):
151            for name, jsn in dfr.attrs['fields'].items():
152                xnd += [Xndarray.read_json({name: jsn})]
153        if dfr.attrs.get('info'):
154            dimensions = dfr.attrs['info']['dimensions']
155            data = dfr.attrs['info']['data']
156        else:
157            dimensions, data = PandasConnec._ximport_analysis(dfr, opt['dims'])
158        shape_dfr = [data[dim]['shape'][0]
159                     for dim in dimensions] if dimensions else len(dfr)
160        dfr = dfr.sort_values(dimensions)
161        for name in df_names:
162            xnd += [PandasConnec._ximport_series(data, name, dfr, dimensions,
163                                                 shape_dfr, df_ntv_types, **opt)]
164        return Xclass(xnd, dfr.attrs.get('name')).to_canonical()
165
166    @staticmethod
167    def _ximport_analysis(dfr, opt_dims):
168        '''return data and dimensions from analysis module
169        - opt_dims: partition to apply
170        - dfr: dataframe to analyse'''
171        dfr_idx = list(dfr.index.names)
172        opt_dims = dfr_idx if dfr_idx != [None] else opt_dims
173        ana = dfr.npd.analysis(distr=True)
174        partition = ana.field_partition(partition=opt_dims, mode='id')
175        part_rel = ana.relation_partition(partition=opt_dims, noroot=True)
176        part_dim = ana.relation_partition(
177            partition=opt_dims, noroot=True, primary=True)
178        dimensions = partition['primary']
179        len_fields = {fld.idfield: fld.lencodec for fld in ana.fields}
180        data = {fld.idfield: {
181            'shape': [len_fields[dim] for dim in part_dim[fld.idfield]] if part_dim else [],
182            'links': part_rel[fld.idfield] if part_rel else []} for fld in ana.fields}
183        for json_name in data:
184            if not data[json_name]['shape']:
185                name = Nutil.split_name(Nutil.split_json_name(json_name)[0])[0]
186                p_name = [js_name for js_name in data
187                          if Nutil.split_json_name(js_name)[0] == name][0]
188                data[json_name]['shape'] = data[p_name]['shape']
189        return (dimensions, data)
190
191    @staticmethod
192    def _ximport_series(data, name, dfr, dimensions, shape_dfr, df_ntv_types, **opt):
193        '''return a Xndarray from a Series of a pd.DataFrame'''
194        if data[name].get('xtype') == 'meta':  # or len(dfr[name].unique()) == 1:
195            return Xndarray(name, meta=dfr[name].iloc[0])
196        meta = data[name].get('meta')
197        ntv_type = df_ntv_types[name]
198        if len(dfr[name].unique()) == 1:
199            nda = Ndarray(np.array(dfr[name].iloc[0]),
200                          ntv_type=ntv_type, str_uri=False)
201            nda.set_shape([1])
202            return Xndarray(name, nda=nda, meta=meta)
203        if not dimensions:
204            nda = Ndarray(np.array(dfr[name]), ntv_type=ntv_type)
205            return Xndarray(name, nda=nda, meta=meta)
206        dims = []
207        PandasConnec._get_dims(dims, name, data, dimensions)
208        if not dims:
209            p_name, add_name = Nutil.split_name(name)
210            if add_name:
211                PandasConnec._get_dims(dims, p_name, data, dimensions)
212        np_array = PandasConnec._from_series(dfr, name, shape_dfr,
213                                             dimensions, dims, opt['dims'])
214        shape = data[name].get('shape', [len(dfr)])
215        nda = Ndarray(np_array, ntv_type, shape)
216        links = data[name].get('links')
217        return Xndarray(name, nda=nda, links=links if links else dims, meta=meta)
218
219    @staticmethod
220    def _to_np_series(xdt, name, dims):
221        '''return a np.ndarray from the Xndarray of xdt defined by his name
222
223        *parameters*
224
225        - **xdt**: Xdataset - data to convert in a pd.DataFrame
226        - **name**: string - full_name of the Xndarray to convert
227        - **dims**: list of string - order of dimensions full_name to apply'''
228        if name in xdt.uniques:
229            return np.array([xdt[name].darray[0]] * xdt.length)
230        if xdt[name].shape == [xdt.length]:
231            return xdt[name].darray
232        n_shape = {nam: len(xdt[nam]) for nam in dims}
233        dim_name = xdt.dims(name)
234        if not set(dim_name) <= set(dims):
235            return None
236        add_name = [nam for nam in dims if nam not in dim_name]
237        tab_name = add_name + dim_name
238
239        til = 1
240        for nam in add_name:
241            til *= n_shape[nam]
242        shap = [n_shape[nam] for nam in tab_name]
243        order = [dims.index(nam) for nam in tab_name]
244        arr = xdt[name].darray
245        return Nutil.extend_array(arr, til, shap, order)
246
247    @staticmethod
248    def _from_series(dfr, name, shape, dims, links, new_dims=None):
249        '''return a flattened np.ndarray from the pd.Series of dfr defined by his name
250
251        *parameters*
252
253        - dfr: DataFrame - data to convert in Xdataset
254        - name: string - name of the Series (full_name or json_name)
255        - shape: shape of the Xdataset
256        - dims: list of string - list of name of dimensions
257        - links: list of string - list of linked Series
258        - new_dims: list of string (default None) - new order of dims
259        '''
260        if not links:
261            return np.array(dfr[name])
262        old_order = list(range(len(dims)))
263        new_dims = new_dims if new_dims else dims
264        order = [dims.index(dim)
265                 for dim in new_dims] if new_dims else old_order
266        idx = [0] * len(dims)
267        for nam in links:
268            idx[new_dims.index(nam)] = slice(shape[dims.index(nam)])
269        xar = np.moveaxis(np.array(dfr[name]).reshape(shape),
270                          old_order, order)[tuple(idx)]
271        if not links:
272            return xar.flatten()
273        lnk = [nam for nam in new_dims if nam in links]
274        shape_lnk = [shape[dims.index(nam)] for nam in lnk]
275        xar = xar.reshape(shape_lnk)
276        old_order = list(range(len(links)))
277        order = [lnk.index(dim) for dim in links]
278        return np.moveaxis(xar, old_order, order).flatten()
279
280    @staticmethod
281    def _get_dims(dims, name, data, dimensions):
282        '''add names of dimensions into dims'''
283        if not name:
284            return
285        if name in dimensions:
286            dims += [name]
287        else:
288            if 'links' not in data[name]:
289                return
290            for nam in data[name]['links']:
291                PandasConnec._get_dims(dims, nam, data, dimensions)

pandas.DataFrame interface with two static methods ximport and xexport

@staticmethod
def xexport(xdt, **kwargs):
 90    @staticmethod
 91    def xexport(xdt, **kwargs):
 92        '''return a pd.DataFrame from a Xdataset
 93
 94        *Parameters*
 95
 96        - **ntv_type**: Boolean (default True) - if False use full_name else json_name
 97        - **info**: Boolean (default True) - if True add xdt.info in DataFrame.attrs
 98        - **dims**: list of string (default None) - order of dimensions full_name to apply
 99        '''
100        opt = {'ntv_type': True, 'info': True, 'dims': None} | kwargs
101        dic_name = {name: xdt[name].json_name if opt['ntv_type'] else xdt[name].full_name
102                    for name in xdt.names}
103        dims = xdt.dimensions if not opt['dims'] else tuple(opt['dims'])
104        fields = (xdt.group(dims) + xdt.group(xdt.coordinates) +
105                  xdt.group(xdt.data_vars) + xdt.uniques)
106        fields += tuple(nam for nam in xdt.group(xdt.data_arrays)
107                        if len(xdt[nam]) == xdt.length)
108        fields_array = tuple(var for var in fields if not xdt[var].uri)
109        dic_series = {dic_name[name]: PandasConnec._to_np_series(xdt, name, dims)
110                      for name in fields_array}
111        dfr = pd.DataFrame(dic_series)
112        index = [dic_name[name] for name in dims]
113        if index:
114            dfr = dfr.set_index(index)
115        if opt['info']:
116            dfr.attrs |= {'info': xdt.tab_info}
117            dfr.attrs |= {'metadata': {
118                name: xdt[name].meta for name in xdt.metadata}}
119            fields_uri = [var for var in fields if var not in fields_array]
120            fields_other = [nam for nam in xdt.group(xdt.data_arrays)
121                            if len(xdt[nam]) != xdt.length]
122            if fields_uri:
123                dfr.attrs |= {'fields': {nam: xdt[nam].to_json(noname=True,)
124                                         for nam in fields_uri + fields_other}}
125            if xdt.name:
126                dfr.attrs |= {'name': xdt.name}
127        return dfr

return a pd.DataFrame from a Xdataset

Parameters

  • ntv_type: Boolean (default True) - if False use full_name else json_name
  • info: Boolean (default True) - if True add xdt.info in DataFrame.attrs
  • dims: list of string (default None) - order of dimensions full_name to apply
@staticmethod
def ximport(df, Xclass, **kwargs):
129    @staticmethod
130    def ximport(df, Xclass, **kwargs):
131        '''return a Xdataset from a pd.DataFrame
132
133        *Parameters*
134
135        - dims: list of string (default None) - order of dimensions to apply
136        '''
137        opt = {'dims': None} | kwargs
138        xnd = []
139        dfr = df.reset_index()
140        if 'index' in dfr.columns and 'index' not in df.columns:
141            del dfr['index']
142        df_names = {Nutil.split_json_name(j_name)[0]: j_name
143                    for j_name in dfr.columns}
144        df_ntv_types = {Nutil.split_json_name(j_name)[0]:
145                        Nutil.split_json_name(j_name)[1] for j_name in dfr.columns}
146        dfr.columns = [Nutil.split_json_name(name)[0] for name in dfr.columns]
147        if dfr.attrs.get('metadata'):
148            for name, meta in dfr.attrs['metadata'].items():
149                xnd += [Xndarray.read_json({name: meta})]
150        if dfr.attrs.get('fields'):
151            for name, jsn in dfr.attrs['fields'].items():
152                xnd += [Xndarray.read_json({name: jsn})]
153        if dfr.attrs.get('info'):
154            dimensions = dfr.attrs['info']['dimensions']
155            data = dfr.attrs['info']['data']
156        else:
157            dimensions, data = PandasConnec._ximport_analysis(dfr, opt['dims'])
158        shape_dfr = [data[dim]['shape'][0]
159                     for dim in dimensions] if dimensions else len(dfr)
160        dfr = dfr.sort_values(dimensions)
161        for name in df_names:
162            xnd += [PandasConnec._ximport_series(data, name, dfr, dimensions,
163                                                 shape_dfr, df_ntv_types, **opt)]
164        return Xclass(xnd, dfr.attrs.get('name')).to_canonical()

return a Xdataset from a pd.DataFrame

Parameters

  • dims: list of string (default None) - order of dimensions to apply
class XarrayConnec:
class XarrayConnec:
    ''' Xarray interface with two static methods ximport and xexport'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a xr.DataArray or a xr.Dataset from a Xdataset

        *Parameters*

        - **dataset** : Boolean (default True) - if False and a single data_var,
        return a xr.DataArray
        - **info** : Boolean (default True) - if True, add json representation
        of 'relative' Xndarrays and 'data_arrays' Xndarrays in attrs
        '''
        option = {'dataset': True, 'info': True} | kwargs
        coords = XarrayConnec._to_xr_vars(
            xdt, xdt.dimensions + xdt.coordinates + xdt.uniques)
        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
        if len(xdt.data_vars) == 1 and not option['dataset']:
            var_name = xdt.data_vars[0]
            data = xdt.to_ndarray(var_name)
            dims = xdt.dims(var_name)
            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
            # 'data' is the default name : the DataArray is then unnamed
            name = var_name if var_name != 'data' else None
            xrd = xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
                               name=name)
        else:
            data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
            xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs)
        for unic in xdt.uniques:
            xrd[unic].attrs |= {'ntv_type': xdt[unic].ntv_type} | (
                xdt[unic].meta if xdt[unic].meta else {})
        return xrd

    @staticmethod
    def ximport(xar, Xclass, **kwargs):
        '''return a Xdataset from a xr.DataArray or a xr.Dataset

        *Parameters*

        - **xar** : xr.DataArray or xr.Dataset to convert
        - **Xclass** : class called with the list of Xndarray and the name
        - **kwargs** : not used
        '''
        xnd = []
        if xar.attrs:
            attrs = {k: v for k, v in xar.attrs.items() if k not in [
                'name', 'ntv_type']}
            for name, meta in attrs.items():
                # a list is a json representation of a Xndarray
                if isinstance(meta, list):
                    xnd += [Xndarray.read_json({name: meta})]
                else:
                    xnd += [Xndarray(name, meta=meta)]
        for coord in xar.coords:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
            if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
                xnd[-1].links = [list(xar.data_vars)[0]]
        if isinstance(xar, xr.DataArray):
            # the DataArray is converted only once (name 'data' if unnamed)
            xnd += [XarrayConnec._var_xr_to_xnd(xar,
                                                name='data', add_attrs=False)]
            xdt = Xclass(xnd, xar.attrs.get('name'))
            for var in xdt.data_vars:
                if var != xar.name and xar.name:
                    xdt[var].links = [xar.name]
            return xdt.to_canonical()
        for var in xar.data_vars:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
        return Xclass(xnd, xar.attrs.get('name')).to_canonical()

    @staticmethod
    def _var_xr_to_xnd(var, name=None, add_attrs=True):
        '''return a Xndarray from a Xarray variable

        *Parameters*

        - **var** : Xarray variable to convert in Xndarray,
        - **name** : string (default None) - default name if var have no name,
        - **add_attrs** : boolean (default True) - if False, attrs are not converted
        '''
        full_name = var.name if var.name else name
        name = Nutil.split_name(full_name)[0]
        # no links for a variable with a single value or whose only
        # dimension is its own name
        dims = None if var.dims == (name,) or var.size == 1 else list(var.dims)
        ntv_type = var.attrs.get('ntv_type')
        nda = var.values
        # a 0-d array is converted in a 1-d array with one value
        nda = nda.reshape(1) if not nda.shape else nda
        if nda.dtype.name == 'datetime64[ns]' and ntv_type:
            nda = Nutil.convert(ntv_type, nda, tojson=False)
        attrs = {k: v for k, v in var.attrs.items()
                 if k not in ['ntv_type', 'name']} if add_attrs else {}
        return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs)

    @staticmethod
    def _to_xr_attrs(xdt, **option):
        '''return a dict with attributes from a Xdataset

        The dict contains the meta of each metadata Xndarray and the name of
        the Xdataset (key 'name') if it is defined.

        *Parameters*

        - **info** : Boolean  if True, add json representation of 'relative'
        Xndarrays and 'data_arrays' Xndarrays in attrs
        '''
        attrs = {meta: xdt[meta].meta for meta in xdt.metadata}
        attrs |= {'name': xdt.name} if xdt.name else {}
        if option['info']:
            for name in xdt.names:
                if xdt[name].mode == 'relative':
                    attrs |= xdt[name].to_json(header=False)
            for name in xdt.data_arrays:
                attrs |= xdt[name].to_json(header=False)
        return attrs

    @staticmethod
    def _to_xr_coord(xdt, name):
        '''return a dict with Xarray attributes from a Xndarray defined by his name

        The value of the dict is the first value of the array for a unique
        Xndarray, else a tuple (dims, data, attrs).'''
        data = xdt.to_ndarray(name)
        if name in xdt.uniques:
            return {name: data[0]}
        if name in xdt.additionals and not xdt[name].links:
            data = data.reshape(xdt.shape_dims(xdt[name].name))
        # without dims, the name of the Xndarray is the single dimension
        dims = tuple(xdt.dims(name)) if xdt.dims(name) else (xdt[name].name,)
        meta = {'ntv_type': xdt[name].ntv_type} | (
            xdt[name].meta if xdt[name].meta else {})
        return {name: (dims, data, meta)}

    @staticmethod
    def _to_xr_vars(xdt, list_names):
        '''return a dict with Xarray attributes from a list of Xndarray names

        Only the Xndarray with an 'absolute' mode are converted with
        `_to_xr_coord`. For a Xndarray with a 'meta' xtype, the value is
        its meta.'''
        arg_vars = {}
        valid_names = [
            nam for nam in list_names if xdt[nam].mode == 'absolute']
        for xnd_name in valid_names:
            arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name)
        for name in list_names:
            if xdt[name].xtype == 'meta':
                arg_vars |= {name: xdt[name].meta}
        return arg_vars

    @staticmethod
    def _xr_add_type(xar):
        '''add 'ntv_type' as attribute for a xr.DataArray

        For a xr.DataArray, the attribute is added if it is not present.
        For a xr.Dataset, it is added to each coordinate and each data
        variable. Nothing is returned.'''
        if isinstance(xar, xr.DataArray):
            if 'ntv_type' not in xar.attrs:
                xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)}
            return
        # iterating coords or data_vars gives names : the recursive call
        # needs the matching xr.DataArray
        for coord in xar.coords:
            XarrayConnec._xr_add_type(xar.coords[coord])
        for var in xar.data_vars:
            XarrayConnec._xr_add_type(xar.data_vars[var])
        return

Xarray interface with two static methods ximport and xexport

@staticmethod
def xexport(xdt, **kwargs):
297    @staticmethod
298    def xexport(xdt, **kwargs):
299        '''return a xr.DataArray or a xr.Dataset from a Xdataset
300
301        *Parameters*
302
303        - **dataset** : Boolean (default True) - if False and a single data_var,
304        return a xr.DataArray
305        - **info** : Boolean (default True) - if True, add json representation
306        of 'relative' Xndarrays and 'data_arrays' Xndarrays in attrs
307        '''
308        option = {'dataset': True, 'info': True} | kwargs
309        coords = XarrayConnec._to_xr_vars(
310            xdt, xdt.dimensions + xdt.coordinates + xdt.uniques)
311        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
312        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
313        if len(xdt.data_vars) == 1 and not option['dataset']:
314            var_name = xdt.data_vars[0]
315            data = xdt.to_ndarray(var_name)
316            dims = xdt.dims(var_name)
317            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
318            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
319            name = var_name if var_name != 'data' else None
320            xrd = xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
321                               name=name)
322        else:
323            data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
324            xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs)
325        for unic in xdt.uniques:
326            xrd[unic].attrs |= {'ntv_type': xdt[unic].ntv_type} | (
327                xdt[unic].meta if xdt[unic].meta else {})
328        return xrd

return a xr.DataArray or a xr.Dataset from a Xdataset

Parameters

  • dataset : Boolean (default True) - if False and a single data_var, return a xr.DataArray
  • info : Boolean (default True) - if True, add json representation of 'relative' Xndarrays and 'data_arrays' Xndarrays in attrs
@staticmethod
def ximport(xar, Xclass, **kwargs):
330    @staticmethod
331    def ximport(xar, Xclass, **kwargs):
332        '''return a Xdataset from a xr.DataArray or a xr.Dataset'''
333        xnd = []
334        if xar.attrs:
335            attrs = {k: v for k, v in xar.attrs.items() if k not in [
336                'name', 'ntv_type']}
337            for name, meta in attrs.items():
338                if isinstance(meta, list):
339                    xnd += [Xndarray.read_json({name: meta})]
340                else:
341                    xnd += [Xndarray(name, meta=meta)]
342        for coord in xar.coords:
343            xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
344            if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
345                xnd[-1].links = [list(xar.data_vars)[0]]
346        if isinstance(xar, xr.DataArray):
347            var = XarrayConnec._var_xr_to_xnd(
348                xar, name='data', add_attrs=False)
349            xnd += [XarrayConnec._var_xr_to_xnd(xar,
350                                                name='data', add_attrs=False)]
351            xdt = Xclass(xnd, xar.attrs.get('name'))
352            for var in xdt.data_vars:
353                if var != xar.name and xar.name:
354                    xdt[var].links = [xar.name]
355            return xdt.to_canonical()
356        for var in xar.data_vars:
357            xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
358        return Xclass(xnd, xar.attrs.get('name')).to_canonical()

return a Xdataset from a xr.DataArray or a xr.Dataset

class ScippConnec:
class ScippConnec:
    ''' Scipp interface with two static methods ximport and xexport'''

    SCTYPE_DTYPE = {'string': 'str'}

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a sc.DataArray or a sc.Dataset from a xdataset

        Only the Xndarray with an 'absolute' mode are used for the coords
        and the data of the scipp object.

        *Parameters*

        - **dataset** : Boolean (default True) - if False, return the first
        DataArray of the Dataset
        - **info** : Boolean (default True) - if True return a DataGroup with
        metadata and data_arrays
        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
        '''
        option = {'dataset': True, 'info': True,
                  'ntv_type': True} | kwargs
        coords = {}
        for name in xdt.coordinates + xdt.dimensions + xdt.uniques:
            if xdt[name].mode == 'absolute':
                sc_key, sc_var = ScippConnec._to_scipp_var(xdt, name, **option)
                coords[sc_key] = sc_var
        arrays = {}
        for name in xdt.data_vars:
            if xdt[name].mode == 'absolute':
                sc_key, sc_arr = ScippConnec._to_sc_dataarray(
                    xdt, name, coords, **option)
                arrays[sc_key] = sc_arr
        scd = sc.Dataset(arrays)
        if not option['dataset']:
            scd = scd[list(scd)[0]]
        if not option['info']:
            return scd
        sc_name = xdt.name if xdt.name else 'no_name'
        return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))

    @staticmethod
    def ximport(sc_obj, Xclass, **kwargs):
        '''return a xdataset from a scipp object DataArray, Dataset or DataGroup

        With a DataGroup, the first Dataset or DataArray of the group is
        converted and its key is the name of the xdataset; the other items
        are converted with `_grp_sc_to_xnd`.
        '''
        xnd = []
        scd = sc_obj
        xnd_name = None
        is_group = isinstance(sc_obj, sc.DataGroup)
        if is_group:
            for obj in sc_obj:
                if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)):
                    scd, xnd_name = sc_obj[obj], obj
                    break
        if isinstance(scd, sc.DataArray):
            scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd})
        if isinstance(scd, sc.Dataset):
            for coord in scd.coords:
                xnd.extend(ScippConnec._var_sc_to_xnd(
                    scd.coords[coord], scd, coord))
            for var in scd:
                for mask in scd[var].masks:
                    m_var = Nutil.split_json_name(var)[0]
                    xnd.extend(ScippConnec._var_sc_to_xnd(
                        scd[var].masks[mask], scd, mask, m_var))
                xnd.extend(ScippConnec._var_sc_to_xnd(scd[var].data, scd, var))
        if is_group:
            xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
        return Xclass(xnd, xnd_name).to_canonical()

    @staticmethod
    def _grp_sc_to_xnd(sc_obj, xnd):
        '''return a list of Xndarray from a scipp DataGroup

        The list xnd is completed in place and returned.

        - sc_obj : scipp DataGroup
        - xnd : list of Xndarray already converted'''
        dic_xnd = {xar.name: xar for xar in xnd}
        for obj in sc_obj:
            name, add_name = Nutil.split_name(obj)
            value = sc_obj[obj]
            if add_name is None and isinstance(value, list):
                xnd += [Xndarray.read_json({name: value})]
            elif isinstance(value, sc.Variable):
                xnd += ScippConnec._var_sc_to_xnd(value, None, add_name, name)
            elif isinstance(value, (dict, str, list)):
                if name in dic_xnd:
                    # the value is a meta of an existing Xndarray
                    if dic_xnd[name].meta:
                        dic_xnd[name].meta |= sc_obj[obj]
                    else:
                        dic_xnd[name].meta = sc_obj[obj]
                else:
                    xnd += [Xndarray.read_json({name: value})]
        return xnd

    @staticmethod
    def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None):
        '''return a list of Xndarray from a scipp variable

        If the variable has variances, the list begins with a Xndarray whose
        name has the suffix '.variance'.

        - scd : scipp dataset
        - scv : scipp variable
        - var : name
        - sc_name : scipp name'''
        unit = ''
        if scv.unit and scv.unit not in ['dimensionless', 'ns']:
            unit = scv.unit.name
        ext_name, typ1 = Nutil.split_json_name(sc_name, True)
        var_name, typ2 = Nutil.split_json_name(var, True)
        separator = '.' if var_name and ext_name else ''
        full_name = var_name + separator + ext_name
        ntv_type_base = typ1 + typ2
        ntv_type = ntv_type_base
        if unit:
            ntv_type = ntv_type_base + ('[' + unit + ']')
        links = [Nutil.split_json_name(jsn)[0] for jsn in scv.dims]
        if scd is not None and sc_name in scd.coords and scv.dims == scd.dims:
            links = [Nutil.split_json_name(list(scd)[0])[0]]
        result = []
        if scv.variances is not None:
            nda_var = Ndarray(scv.variances, ntv_type_base)
            result.append(Xndarray(full_name + '.variance', nda_var, links))
        nda = Ndarray(scv.values, ntv_type, str_uri=False)
        # a variable without shape is stored with the shape (1,)
        nda.set_shape(scv.shape if scv.shape else (1,))
        result.append(Xndarray(full_name, nda, links))
        return result

    @staticmethod
    def _to_sc_dataarray(xdt, name, coords, **option):
        '''return a tuple (scipp name, scipp.DataArray) from a xdataset.global_var
        defined by his name'''
        scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option)
        masks = {}
        for nam in set(xdt.group(name)) & set(xdt.masks):
            mask_name, mask_var = ScippConnec._to_scipp_var(xdt, nam, **option)
            masks[mask_name] = mask_var
        return (scipp_name, sc.DataArray(data, coords=coords, masks=masks))

    @staticmethod
    def _to_scipp_grp(xdt, **option):
        '''return a dict with metadata, data-array and data_add from a xdataset'''
        grp = {}
        for name in xdt.data_add + xdt.data_arrays:
            # the variances are included in the scipp variable
            if xdt[name].add_name != 'variance':
                sc_key, sc_var = ScippConnec._to_scipp_var(xdt, name, **option)
                grp[sc_key] = sc_var
        opt_mask = option | {'grp_mask': True}
        for name in xdt.masks:
            parent = xdt[name].name
            if parent in xdt.names and parent not in xdt.data_vars:
                sc_key, sc_var = ScippConnec._to_scipp_var(xdt, name, **opt_mask)
                grp[sc_key] = sc_var
        for name in xdt.names:
            if xdt[name].meta:
                grp[name + '.meta'] = xdt[name].meta
        for name in xdt.names:
            if xdt[name].mode == 'relative':
                grp.update(xdt[name].to_json(header=False))
        return grp

    @staticmethod
    def _to_scipp_var(xdt, name, **kwargs):
        '''return a tuple (scipp name, scipp.Variable) from a Xndarray defined
        by his name

        *Parameters*

        - **grp_mask** : Boolean (default False) - if False, the scipp name
        of a mask is its add_name
        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
        '''
        option = {'grp_mask': False, 'ntv_type': True} | kwargs
        simple_type, unit = Nutil.split_type(xdt[name].ntv_type)
        unit = unit or ''
        add_name = Nutil.split_name(name)[1]
        use_add_name = name in xdt.masks and not option['grp_mask']
        new_n = add_name if use_add_name else name
        opt_n = option['ntv_type']
        suffix = ':' + simple_type if opt_n else ''
        scipp_name = new_n + suffix
        if name in xdt.uniques:
            return (scipp_name, sc.scalar(xdt[name].darray[0], unit=unit))
        vari_name = name + '.variance'
        variances = None
        if vari_name in xdt.names:
            variances = xdt[vari_name].darray
        dims = xdt.dims(name, opt_n) or [xdt[name].name]
        # the flat array is folded with the shape of the Xndarray
        flat = sc.array(dims=['flat'], values=xdt.to_darray(name),
                        variances=variances, unit=unit)
        sizes = dict(zip(dims, xdt[name].shape))
        return (scipp_name, sc.fold(flat, dim='flat', sizes=sizes))

Scipp interface with two static methods ximport and xexport

SCTYPE_DTYPE = {'string': 'str'}
@staticmethod
def xexport(xdt, **kwargs):
@staticmethod
def xexport(xdt, **kwargs):
    '''return a scipp object (DataArray, Dataset or DataGroup) from a xdataset

    Only the Xndarray with an 'absolute' mode are converted into coordinates
    and data variables.

    *Parameters*

    - **dataset** : Boolean (default True) - if False and a single data_var,
    return a DataArray (a Dataset is kept in every other case)
    - **info** : Boolean (default True) - if True return a DataGroup with
    metadata and data_arrays
    - **ntv_type** : Boolean (default True) - if True add ntv-type to the name

    *Returns*

    - the DataArray or Dataset if `info` is False, else a DataGroup holding
    it under the xdataset name ('no_name' if the name is empty) together
    with the entries returned by `ScippConnec._to_scipp_grp`
    '''
    option = {'dataset': True, 'info': True,
              'ntv_type': True} | kwargs
    coords = dict(ScippConnec._to_scipp_var(xdt, name, **option)
                  for name in xdt.coordinates + xdt.dimensions + xdt.uniques
                  if xdt[name].mode == 'absolute')
    scd = sc.Dataset(dict(
        ScippConnec._to_sc_dataarray(xdt, name, coords, **option)
        for name in xdt.data_vars
        if xdt[name].mode == 'absolute'))
    var_names = list(scd)
    # Unwrap to a DataArray only when it is unambiguous: with several data
    # variables the others would be silently dropped, and with none the
    # lookup of the first one would raise an IndexError.
    if not option['dataset'] and len(var_names) == 1:
        scd = scd[var_names[0]]
    if not option['info']:
        return scd
    sc_name = xdt.name if xdt.name else 'no_name'
    return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))

return a sc.DataArray or a sc.Dataset from a xdataset, wrapped in a sc.DataGroup (with metadata and data_arrays) when info is True

Parameters

  • dataset : Boolean (default True) - if False and a single data_var, return a DataArray
  • info : Boolean (default True) - if True return a DataGroup with metadata and data_arrays
  • ntv_type : Boolean (default True) - if True add ntv-type to the name
@staticmethod
def ximport(sc_obj, Xclass, **kwargs):
@staticmethod
def ximport(sc_obj, Xclass, **kwargs):
    '''return a xdataset from a scipp object DataArray, Dataset or DataGroup

    *Parameters*

    - **sc_obj** : scipp DataArray, Dataset or DataGroup to convert. For a
    DataGroup, the first entry which is a Dataset or a DataArray is used as
    the data and its key becomes the name of the xdataset.
    - **Xclass** : class used to build the result; it is called with the
    list of converted arrays and the name, then `to_canonical` is applied.
    - **kwargs** : not used in this method
    '''
    is_group = isinstance(sc_obj, sc.DataGroup)
    scd, xnd_name = sc_obj, None
    if is_group:
        for key in sc_obj:
            if isinstance(sc_obj[key], (sc.Dataset, sc.DataArray)):
                scd, xnd_name = sc_obj[key], key
                break
    if isinstance(scd, sc.DataArray):
        # a single DataArray is handled as a Dataset with one item
        scd = sc.Dataset({(scd.name or 'no_name'): scd})
    xnd = []
    if isinstance(scd, sc.Dataset):
        for c_name in scd.coords:
            xnd.extend(ScippConnec._var_sc_to_xnd(
                scd.coords[c_name], scd, c_name))
        for v_name in scd:
            # masks of a data variable are converted before its data
            for m_name in scd[v_name].masks:
                owner = Nutil.split_json_name(v_name)[0]
                xnd.extend(ScippConnec._var_sc_to_xnd(
                    scd[v_name].masks[m_name], scd, m_name, owner))
            xnd.extend(ScippConnec._var_sc_to_xnd(
                scd[v_name].data, scd, v_name))
    if is_group:
        # the other entries of the DataGroup are merged by the helper
        xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
    return Xclass(xnd, xnd_name).to_canonical()

return a xdataset from a scipp object DataArray, Dataset or DataGroup