ntv-numpy.ntv_numpy.xconnector

@author: Philippe@loco-labs.io

The xconnector module is part of the ntv-numpy.ntv_numpy package (specification document).

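It contains interface classes with two static methods ximport and xexport: XarrayConnec (Xarray Dataset or DataArray), AstropyNDDataConnec (Astropy NDData), ScippConnec (Scipp Dataset or DataArray) and PandasConnec (pandas DataFrame).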

For more information, see the user guide or the github repository.

  1# -*- coding: utf-8 -*-
  2"""
  3@author: Philippe@loco-labs.io
  4
  5The `xconnector` module is part of the `ntv-numpy.ntv_numpy` package ([specification document](
  6https://loco-philippe.github.io/ES/JSON%20semantic%20format%20(JSON-NTV).htm)).
  7
  8It contains interface classes with two static methods `ximport` and `xexport`:
  9- `XarrayConnec` class for Xarray Dataset or DataArray,
 10- `AstropyNDDataConnec` class for Astropy NDData,
 11- `ScippConnec` class for Scipp Dataset or DataArray,
 12- `PandasConnec` class for pandas DataFrame.
 13
 14
 15For more information, see the
 16[user guide](https://loco-philippe.github.io/ntv-numpy/docs/user_guide.html)
 17 or the [github repository](https://github.com/loco-philippe/ntv-numpy).
 18"""
 19
 20
 21import xarray as xr
 22import scipp as sc
 23import pandas as pd
 24import numpy as np
 25from astropy import wcs
 26from astropy.nddata import NDData
 27from astropy.nddata.nduncertainty import StdDevUncertainty, VarianceUncertainty
 28from astropy.nddata.nduncertainty import InverseVariance
 29from ntv_numpy.ndarray import Nutil, Ndarray
 30from ntv_numpy.xndarray import Xndarray
 31
 32
 33class AstropyNDDataConnec:
 34    ''' NDData interface with two static methods ximport and xexport'''
 35
 36    @staticmethod
 37    def xexport(xdt, **kwargs):
 38        '''return a NDData from a Xdataset'''
 39        data = xdt['data'].ndarray
 40        mask = xdt['data.mask'].ndarray
 41        unit = xdt['data'].nda.ntvtype.extension
 42        uncert = xdt['data.uncertainty'].ndarray
 43        typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
 44        match typ_u:
 45            case 'std':
 46                uncertainty = StdDevUncertainty(uncert)
 47            case 'var':
 48                uncertainty = VarianceUncertainty(uncert)
 49            case 'inv':
 50                uncertainty = InverseVariance(uncert)
 51            case _:
 52                uncertainty = uncert
 53        meta = xdt['meta'].meta | {'name': xdt.name}
 54        wcs_dic = xdt['wcs'].meta
 55        psf = xdt['psf'].ndarray
 56        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
 57                      meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf)
 58
 59    @staticmethod
 60    def ximport(ndd, Xclass, **kwargs):
 61        '''return a Xdataset from a astropy.NDData'''
 62        xnd = []
 63        name = 'no_name'
 64        unit = ndd.unit.to_string() if not ndd.unit is None else None
 65        ntv_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
 66        xnd += [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))]
 67        if ndd.meta:
 68            meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
 69            name = ndd.meta.get('name', 'no_name')
 70            xnd += [Xndarray('meta', meta=meta)]
 71        if ndd.wcs:
 72            xnd += [Xndarray('wcs', meta=dict(ndd.wcs.to_header()))]
 73        if not ndd.psf is None:
 74            xnd += [Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type))]
 75        if not ndd.mask is None:
 76            xnd += [Xndarray('data.mask', nda=ndd.mask)]
 77        if not ndd.uncertainty is None:
 78            typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
 79            ntv_type = Nutil.ntv_type(
 80                ndd.uncertainty.array.dtype.name, ext=typ_u)
 81            nda = Ndarray(ndd.uncertainty.array, ntv_type=ntv_type)
 82            xnd += [Xndarray('data.uncertainty', nda=nda)]
 83        return Xclass(xnd, name).to_canonical()
 84
 85
 86class PandasConnec:
 87    ''' pandas.DataFrame interface with two static methods ximport and xexport'''
 88
 89    @staticmethod
 90    def xexport(xdt, **kwargs):
 91        '''return a pd.DataFrame from a Xdataset
 92
 93        *Parameters*
 94
 95        - **json_name**: Boolean (default True) - if False use full_name else json_name
 96        - **info**: Boolean (default True) - if True add xdt.info in DataFrame.attrs
 97        - **dims**: list of string (default None) - order of dimensions full_name to apply
 98        '''
 99        opt = {'json_name': True, 'info': True, 'dims': None} | kwargs
100        dic_name = {name: xdt[name].json_name if opt['json_name'] else xdt[name].full_name
101                    for name in xdt.names}
102        dims = xdt.dimensions if not opt['dims'] else tuple(opt['dims'])
103        fields = (xdt.group(dims) + xdt.group(xdt.coordinates) +
104                  xdt.group(xdt.data_vars) + xdt.uniques)
105        fields += tuple(nam for nam in xdt.group(xdt.data_arrays)
106                        if len(xdt[nam]) == xdt.length)
107        fields_array = tuple(var for var in fields if not xdt[var].uri)
108        dic_series = {dic_name[name]: PandasConnec._to_np_series(xdt, name, dims)
109                      for name in fields_array}
110        dfr = pd.DataFrame(dic_series)
111        index = [dic_name[name] for name in dims]
112        if index:
113            dfr = dfr.set_index(index)
114        if opt['info']:
115            dfr.attrs |= {'info': xdt.tab_info}
116            dfr.attrs |= {'metadata': {
117                name: xdt[name].meta for name in xdt.metadata}}
118            fields_uri = [var for var in fields if not var in fields_array]
119            fields_other = [nam for nam in xdt.group(xdt.data_arrays)
120                            if len(xdt[nam]) != xdt.length]
121            if fields_uri:
122                dfr.attrs |= {'fields': {nam: xdt[nam].to_json(noname=True,)
123                                         for nam in fields_uri + fields_other}}
124            if xdt.name:
125                dfr.attrs |= {'name': xdt.name}
126        return dfr
127
128    @staticmethod
129    def ximport(df, Xclass, **kwargs):
130        '''return a Xdataset from a pd.DataFrame
131
132        *Parameters*
133
134        - dims: list of string (default None) - order of dimensions to apply
135        '''
136        opt = {'dims': None} | kwargs
137        xnd = []
138        dfr = df.reset_index()
139        if 'index' in dfr.columns and not 'index' in df.columns:
140            del dfr['index']
141        df_names = {Nutil.split_json_name(j_name)[0]: j_name
142                    for j_name in dfr.columns}
143        df_ntv_types = {Nutil.split_json_name(j_name)[0]:
144                        Nutil.split_json_name(j_name)[1] for j_name in dfr.columns}
145        dfr.columns = [Nutil.split_json_name(name)[0] for name in dfr.columns]
146        if dfr.attrs.get('metadata'):
147            for name, meta in dfr.attrs['metadata'].items():
148                xnd += [Xndarray.read_json({name: meta})]
149        if dfr.attrs.get('fields'):
150            for name, jsn in dfr.attrs['fields'].items():
151                xnd += [Xndarray.read_json({name: jsn})]
152        if dfr.attrs.get('info'):
153            dimensions = dfr.attrs['info']['dimensions']
154            data = dfr.attrs['info']['data']
155        else:
156            dimensions, data = PandasConnec._ximport_analysis(dfr, opt['dims'])
157        shape_dfr = [data[dim]['shape'][0]
158                     for dim in dimensions] if dimensions else len(dfr)
159        dfr = dfr.sort_values(dimensions)
160        for name in df_names:
161            xnd += [PandasConnec._ximport_series(data, name, dfr, dimensions,
162                                                 shape_dfr, df_ntv_types, **opt)]
163        return Xclass(xnd, dfr.attrs.get('name')).to_canonical()
164
165    @staticmethod
166    def _ximport_analysis(dfr, opt_dims):
167        '''return data and dimensions from analysis module
168        - opt_dims: partition to apply
169        - dfr: dataframe to analyse'''
170        dfr_idx = list(dfr.index.names)
171        opt_dims = dfr_idx if dfr_idx != [None] else opt_dims
172        ana = dfr.npd.analysis(distr=True)
173        partition = ana.field_partition(partition=opt_dims, mode='id')
174        part_rel = ana.relation_partition(partition=opt_dims, noroot=True)
175        part_dim = ana.relation_partition(
176            partition=opt_dims, noroot=True, primary=True)
177        dimensions = partition['primary']
178        len_fields = {fld.idfield: fld.lencodec for fld in ana.fields}
179        data = {fld.idfield: {
180            'shape': [len_fields[dim] for dim in part_dim[fld.idfield]] if part_dim else [],
181            'links': part_rel[fld.idfield] if part_rel else []} for fld in ana.fields}
182        for json_name in data:
183            if not data[json_name]['shape']:
184                name = Nutil.split_name(Nutil.split_json_name(json_name)[0])[0]
185                p_name = [js_name for js_name in data
186                          if Nutil.split_json_name(js_name)[0] == name][0]
187                data[json_name]['shape'] = data[p_name]['shape']
188        return (dimensions, data)
189
190    @staticmethod
191    def _ximport_series(data, name, dfr, dimensions, shape_dfr, df_ntv_types, **opt):
192        '''return a Xndarray from a Series of a pd.DataFrame'''
193        if data[name].get('xtype') == 'meta':  # or len(dfr[name].unique()) == 1:
194            return Xndarray(name, meta=dfr[name].iloc[0])
195        meta = data[name].get('meta')
196        ntv_type = df_ntv_types[name]
197        if len(dfr[name].unique()) == 1:
198            nda = Ndarray(np.array(dfr[name].iloc[0]),
199                          ntv_type=ntv_type, str_uri=False)
200            nda.set_shape([1])
201            return Xndarray(name, nda=nda, meta=meta)
202        if not dimensions:
203            nda = Ndarray(np.array(dfr[name]), ntv_type=ntv_type)
204            return Xndarray(name, nda=nda, meta=meta)
205        dims = []
206        PandasConnec._get_dims(dims, name, data, dimensions)
207        if not dims:
208            p_name, add_name = Nutil.split_name(name)
209            if add_name:
210                PandasConnec._get_dims(dims, p_name, data, dimensions)
211        np_array = PandasConnec._from_series(dfr, name, shape_dfr,
212                                             dimensions, dims, opt['dims'])
213        shape = data[name].get('shape', [len(dfr)])
214        nda = Ndarray(np_array, ntv_type, shape)
215        links = data[name].get('links')
216        return Xndarray(name, nda=nda, links=links if links else dims, meta=meta)
217
218    @staticmethod
219    def _to_np_series(xdt, name, dims):
220        '''return a np.ndarray from the Xndarray of xdt defined by his name
221
222        *parameters*
223
224        - **xdt**: Xdataset - data to convert in a pd.DataFrame
225        - **name**: string - full_name of the Xndarray to convert
226        - **dims**: list of string - order of dimensions full_name to apply'''
227        if name in xdt.uniques:
228            return np.array([xdt[name].darray[0]] * xdt.length)
229        if xdt[name].shape == [xdt.length]:
230            return xdt[name].darray
231        n_shape = {nam: len(xdt[nam]) for nam in dims}
232        dim_name = xdt.dims(name)
233        if not set(dim_name) <= set(dims):
234            return None
235        add_name = [nam for nam in dims if not nam in dim_name]
236        tab_name = add_name + dim_name
237
238        til = 1
239        for nam in add_name:
240            til *= n_shape[nam]
241        shap = [n_shape[nam] for nam in tab_name]
242        order = [dims.index(nam) for nam in tab_name]
243        arr = xdt[name].darray
244        return Nutil.extend_array(arr, til, shap, order)
245
246    @staticmethod
247    def _from_series(dfr, name, shape, dims, links, new_dims=None):
248        '''return a flattened np.ndarray from the pd.Series of dfr defined by his name
249
250        *parameters*
251
252        - dfr: DataFrame - data to convert in Xdataset
253        - name: string - name of the Series (full_name or json_name)
254        - shape: shape of the Xdataset
255        - dims: list of string - list of name of dimensions
256        - links: list of string - list of linked Series
257        - new_dims: list of string (default None) - new order of dims
258        '''
259        if not links:
260            return np.array(dfr[name])
261        old_order = list(range(len(dims)))
262        new_dims = new_dims if new_dims else dims
263        order = [dims.index(dim)
264                 for dim in new_dims] if new_dims else old_order
265        idx = [0] * len(dims)
266        for nam in links:
267            idx[new_dims.index(nam)] = slice(shape[dims.index(nam)])
268        xar = np.moveaxis(np.array(dfr[name]).reshape(
269            shape), old_order, order)[*idx]
270        if not links:
271            return xar.flatten()
272        lnk = [nam for nam in new_dims if nam in links]
273        shape_lnk = [shape[dims.index(nam)] for nam in lnk]
274        xar = xar.reshape(shape_lnk)
275        old_order = list(range(len(links)))
276        order = [lnk.index(dim) for dim in links]
277        return np.moveaxis(xar, old_order, order).flatten()
278
279    @staticmethod
280    def _get_dims(dims, name, data, dimensions):
281        '''add names of dimensions into dims'''
282        if not name:
283            return
284        if name in dimensions:
285            dims += [name]
286        else:
287            if not 'links' in data[name]:
288                return
289            for nam in data[name]['links']:
290                PandasConnec._get_dims(dims, nam, data, dimensions)
291
292
class XarrayConnec:
    '''Xarray interface with two static methods ximport and xexport.'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''Return a xr.DataArray or a xr.Dataset from a Xdataset.

        *Parameters*

        - **dataset** : Boolean (default True) - if False and a single data_var,
        return a xr.DataArray else a xr.Dataset
        - **datagroup** : Boolean (default True) - if True, the json representation
        of 'relative' Xndarrays and 'data_arrays' Xndarrays is added in attrs
        '''
        option = {'dataset': True, 'datagroup': True} | kwargs
        coords = XarrayConnec._to_xr_vars(
            xdt, xdt.dimensions + xdt.coordinates + xdt.uniques)
        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
        if len(xdt.data_vars) == 1 and not option['dataset']:
            var_name = xdt.data_vars[0]
            data = xdt.to_ndarray(var_name)
            dims = xdt.dims(var_name)
            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
            # 'data' is the default name given by ximport to an unnamed DataArray
            name = var_name if var_name != 'data' else None
            xrd = xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
                               name=name)
        else:
            data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
            xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs)
        # unique values are given to xarray without attrs: added afterwards
        for unic in xdt.uniques:
            xrd[unic].attrs |= {'ntv_type': xdt[unic].ntv_type} | (
                xdt[unic].meta if xdt[unic].meta else {})
        return xrd

    @staticmethod
    def ximport(xar, Xclass, **kwargs):
        '''Return a Xdataset (instance of Xclass) from a xr.DataArray or a xr.Dataset.'''
        xnd = []
        if xar.attrs:
            attrs = {k: v for k, v in xar.attrs.items()
                     if k not in ['name', 'ntv_type']}
            for name, meta in attrs.items():
                # a list is a json Xndarray, any other value is metadata
                if isinstance(meta, list):
                    xnd += [Xndarray.read_json({name: meta})]
                else:
                    xnd += [Xndarray(name, meta=meta)]
        for coord in xar.coords:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
            # in a Dataset, a coordinate with all the dimensions is linked to
            # the first data_var
            if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
                xnd[-1].links = [list(xar.data_vars)[0]]
        if isinstance(xar, xr.DataArray):
            # the DataArray is converted once (attrs are already converted above)
            xnd += [XarrayConnec._var_xr_to_xnd(xar,
                                                name='data', add_attrs=False)]
            xdt = Xclass(xnd, xar.attrs.get('name'))
            for var in xdt.data_vars:
                if var != xar.name and xar.name:
                    xdt[var].links = [xar.name]
            return xdt.to_canonical()
        for var in xar.data_vars:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
        return Xclass(xnd, xar.attrs.get('name')).to_canonical()

    @staticmethod
    def _var_xr_to_xnd(var, name=None, add_attrs=True):
        '''Return a Xndarray from a Xarray variable.

        *Parameters*

        - **var** : Xarray variable to convert in Xndarray,
        - **name** : string (default None) - default name if var have no name,
        - **add_attrs** : boolean (default True) - if False, attrs are not converted
        '''
        full_name = var.name if var.name else name
        name = Nutil.split_name(full_name)[0]
        # no links for a dimension (its only dim is itself) or a single value
        dims = None if var.dims == (name,) or var.size == 1 else list(var.dims)
        ntv_type = var.attrs.get('ntv_type')
        nda = var.values
        # a 0-d array is converted in a 1-d array with a single value
        nda = nda.reshape(1) if not nda.shape else nda
        if nda.dtype.name == 'datetime64[ns]' and ntv_type:
            nda = Nutil.convert(ntv_type, nda, tojson=False)
        attrs = {k: v for k, v in var.attrs.items()
                 if k not in ['ntv_type', 'name']} if add_attrs else {}
        return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs)

    @staticmethod
    def _to_xr_attrs(xdt, **option):
        '''Return a dict with attributes from a Xdataset.

        *Parameters*

        - **datagroup** : Boolean  if True, add json representation of 'relative'
        Xndarrays and 'data_arrays' Xndarrays
        '''
        attrs = {meta: xdt[meta].meta for meta in xdt.metadata}
        attrs |= {'name': xdt.name} if xdt.name else {}
        if option['datagroup']:
            for name in xdt.names:
                if xdt[name].mode == 'relative':
                    attrs |= xdt[name].to_json(header=False)
            for name in xdt.data_arrays:
                attrs |= xdt[name].to_json(header=False)
        return attrs

    @staticmethod
    def _to_xr_coord(xdt, name):
        '''Return a dict {name: value} for the Xndarray of xdt defined by his name.

        The value is the single value for a unique Xndarray, else a tuple
        (dims, data, attrs) where dims is always a tuple of names.'''
        data = xdt.to_ndarray(name)
        if name in xdt.uniques:
            return {name: data[0]}
        if name in xdt.additionals and not xdt[name].links:
            data = data.reshape(xdt.shape_dims(xdt[name].name))
        dim_names = xdt.dims(name)
        # without dims, the only dimension is the Xndarray itself: a 1-tuple
        # (with a trailing comma) keeps the same type as the other branch
        dims = tuple(dim_names) if dim_names else (xdt[name].name,)
        meta = {'ntv_type': xdt[name].ntv_type} | (
            xdt[name].meta if xdt[name].meta else {})
        return {name: (dims, data, meta)}

    @staticmethod
    def _to_xr_vars(xdt, list_names):
        '''Return a dict of Xarray variables from a list of Xndarray names.

        Only Xndarrays with mode 'absolute' or xtype 'meta' are included.'''
        arg_vars = {}
        valid_names = [
            nam for nam in list_names if xdt[nam].mode == 'absolute']
        for xnd_name in valid_names:
            arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name)
        for name in list_names:
            if xdt[name].xtype == 'meta':
                arg_vars |= {name: xdt[name].meta}
        return arg_vars

    @staticmethod
    def _xr_add_type(xar):
        '''Add 'ntv_type' as attribute (in place) when it is missing.

        - a xr.DataArray: the attribute is added to the DataArray itself,
        - a xr.Dataset: the attribute is added to each coordinate and each data_var.
        '''
        if isinstance(xar, xr.DataArray):
            # an existing 'ntv_type' is kept; a DataArray has no data_vars so
            # the Dataset part below must not be reached
            if 'ntv_type' not in xar.attrs:
                xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)}
            return
        # iterating coords or data_vars gives names: the variable is looked up
        for coord in xar.coords:
            XarrayConnec._xr_add_type(xar.coords[coord])
        for var in xar.data_vars:
            XarrayConnec._xr_add_type(xar.data_vars[var])
438
439
class ScippConnec:
    '''Connector between a Xdataset and scipp objects.

    Conversion is exposed through the two static methods `ximport` and `xexport`.'''

    SCTYPE_DTYPE = {'string': 'str'}

    @staticmethod
    def xexport(xdt, **kwargs):
        '''Convert a Xdataset into a scipp object.

        *Parameters*

        - **dataset** : Boolean (default True) - if False, the first DataArray
        of the Dataset is used instead of the Dataset
        - **datagroup** : Boolean (default True) - if True the result is a
        DataGroup which also holds the content given by `_to_scipp_grp`
        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
        '''
        option = {'dataset': True, 'datagroup': True, 'ntv_type': True}
        option |= kwargs
        coords = {}
        for name in xdt.coordinates + xdt.dimensions + xdt.uniques:
            if xdt[name].mode == 'absolute':
                key, variable = ScippConnec._to_scipp_var(xdt, name, **option)
                coords[key] = variable
        arrays = {}
        for name in xdt.data_vars:
            if xdt[name].mode == 'absolute':
                key, array = ScippConnec._to_sc_dataarray(
                    xdt, name, coords, **option)
                arrays[key] = array
        scd = sc.Dataset(arrays)
        if not option['dataset']:
            scd = scd[list(scd)[0]]
        if not option['datagroup']:
            return scd
        group = {xdt.name if xdt.name else 'no_name': scd}
        group |= ScippConnec._to_scipp_grp(xdt, **option)
        return sc.DataGroup(group)

    @staticmethod
    def ximport(sc_obj, Xclass, **kwargs):
        '''Convert a scipp DataArray, Dataset or DataGroup into a Xdataset
        (instance of `Xclass`, returned in canonical form).'''
        xnd = []
        scd = sc_obj
        xnd_name = None
        is_group = isinstance(sc_obj, sc.DataGroup)
        if is_group:
            # the first Dataset / DataArray of the group is the main object
            # and its key is the name of the Xdataset
            for obj in sc_obj:
                if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)):
                    scd, xnd_name = sc_obj[obj], obj
                    break
        if isinstance(scd, sc.DataArray):
            scd = sc.Dataset({scd.name or 'no_name': scd})
        if isinstance(scd, sc.Dataset):
            for coord in scd.coords:
                xnd.extend(ScippConnec._var_sc_to_xnd(
                    scd.coords[coord], scd, coord))
            for var in scd:
                data_array = scd[var]
                for mask in data_array.masks:
                    m_var = Nutil.split_json_name(var)[0]
                    xnd.extend(ScippConnec._var_sc_to_xnd(
                        data_array.masks[mask], scd, mask, m_var))
                xnd.extend(ScippConnec._var_sc_to_xnd(
                    data_array.data, scd, var))
        if is_group:
            xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
        return Xclass(xnd, xnd_name).to_canonical()

    @staticmethod
    def _grp_sc_to_xnd(sc_obj, xnd):
        '''Complete the list `xnd` (in place) with the content of the scipp
        DataGroup `sc_obj` and return it.'''
        by_name = {xar.name: xar for xar in xnd}
        for obj in sc_obj:
            name, add_name = Nutil.split_name(obj)
            value = sc_obj[obj]
            if add_name is None and isinstance(value, list):
                xnd.append(Xndarray.read_json({name: value}))
            elif isinstance(value, sc.Variable):
                xnd.extend(ScippConnec._var_sc_to_xnd(
                    value, None, add_name, name))
            elif isinstance(value, (dict, str, list)):
                if name not in by_name:
                    xnd.append(Xndarray.read_json({name: value}))
                elif by_name[name].meta:
                    # metadata of a Xndarray which is already in the list
                    by_name[name].meta |= value
                else:
                    by_name[name].meta = value
            # any other object (e.g. the main Dataset / DataArray) is ignored
        return xnd

    @staticmethod
    def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None):
        '''Convert a scipp variable into a list of Xndarray (the variances,
        if any, followed by the values).

        - scv : scipp variable
        - scd : scipp dataset
        - sc_name : scipp name
        - var : name'''
        if scv.unit and scv.unit not in ['dimensionless', 'ns']:
            unit = scv.unit.name
        else:
            unit = ''
        ext_name, typ1 = Nutil.split_json_name(sc_name, True)
        var_name, typ2 = Nutil.split_json_name(var, True)
        separator = '.' if var_name and ext_name else ''
        full_name = var_name + separator + ext_name
        ntv_type_base = typ1 + typ2
        ntv_type = ntv_type_base
        if unit:
            ntv_type = ntv_type_base + '[' + unit + ']'
        if scd is not None and sc_name in scd.coords and scv.dims == scd.dims:
            # a coordinate with all the dimensions is linked to the first item
            links = [Nutil.split_json_name(list(scd)[0])[0]]
        else:
            links = [Nutil.split_json_name(jsn)[0] for jsn in scv.dims]
        result = []
        if scv.variances is not None:
            variances = Ndarray(scv.variances, ntv_type_base)
            result.append(Xndarray(full_name + '.variance', variances, links))
        nda = Ndarray(scv.values, ntv_type, str_uri=False)
        nda.set_shape(scv.shape if scv.shape else (1,))
        result.append(Xndarray(full_name, nda, links))
        return result

    @staticmethod
    def _to_sc_dataarray(xdt, name, coords, **option):
        '''Return a tuple (scipp name, scipp.DataArray) for the Xndarray of
        `xdt` defined by `name`, with its masks and the given coords.'''
        scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option)
        masks = {}
        for nam in set(xdt.group(name)) & set(xdt.masks):
            key, mask = ScippConnec._to_scipp_var(xdt, nam, **option)
            masks[key] = mask
        return (scipp_name, sc.DataArray(data, coords=coords, masks=masks))

    @staticmethod
    def _to_scipp_grp(xdt, **option):
        '''Return a dict with the content of `xdt` which is stored beside the
        Dataset in a DataGroup (data_add, data_arrays, masks, metadata and
        json of 'relative' Xndarrays).'''
        grp = {}
        for name in xdt.data_add + xdt.data_arrays:
            # variances are carried by the scipp variable of their parent
            if xdt[name].add_name != 'variance':
                key, variable = ScippConnec._to_scipp_var(xdt, name, **option)
                grp[key] = variable
        opt_mask = option | {'grp_mask': True}
        for name in xdt.masks:
            parent = xdt[name].name
            if parent in xdt.names and parent not in xdt.data_vars:
                key, variable = ScippConnec._to_scipp_var(xdt, name, **opt_mask)
                grp[key] = variable
        for name in xdt.names:
            if xdt[name].meta:
                grp[name + '.meta'] = xdt[name].meta
        for name in xdt.names:
            if xdt[name].mode == 'relative':
                grp |= xdt[name].to_json(header=False)
        return grp

    @staticmethod
    def _to_scipp_var(xdt, name, **kwargs):
        '''Return a tuple (scipp name, scipp.Variable) for the Xndarray of
        `xdt` defined by `name`.

        - **grp_mask** : Boolean (default False) - if False, a mask is named
        with its add_name only
        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
        '''
        option = {'grp_mask': False, 'ntv_type': True}
        option |= kwargs
        with_type = option['ntv_type']
        simple_type, unit = Nutil.split_type(xdt[name].ntv_type)
        if not unit:
            unit = ''
        add_name = Nutil.split_name(name)[1]
        if name in xdt.masks and not option['grp_mask']:
            scipp_name = add_name
        else:
            scipp_name = name
        if with_type:
            scipp_name = scipp_name + ':' + simple_type
        if name in xdt.uniques:
            return (scipp_name, sc.scalar(xdt[name].darray[0], unit=unit))
        vari_name = name + '.variance'
        variances = None
        if vari_name in xdt.names:
            variances = xdt[vari_name].darray
        dims = xdt.dims(name, with_type) or [xdt[name].name]
        # the flat array is built first, then folded to the shape
        flat = sc.array(dims=['flat'], values=xdt.to_darray(name),
                        variances=variances, unit=unit)
        sizes = dict(zip(dims, xdt[name].shape))
        return (scipp_name, sc.fold(flat, dim='flat', sizes=sizes))
class AstropyNDDataConnec:
class AstropyNDDataConnec:
    '''Connector between a Xdataset and an astropy NDData.

    Conversion is exposed through the two static methods `xexport`
    (Xdataset -> NDData) and `ximport` (NDData -> Xdataset).'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''Build an astropy NDData from the Xdataset `xdt`.

        The entries 'data', 'data.mask', 'data.uncertainty', 'meta', 'wcs'
        and 'psf' of `xdt` are read. `kwargs` is accepted but not used.'''
        # the type extension of the uncertainty selects the astropy wrapper
        uncertainty_classes = {'std': StdDevUncertainty,
                               'var': VarianceUncertainty,
                               'inv': InverseVariance}
        xnd_data = xdt['data']
        values = xnd_data.ndarray
        mask = xdt['data.mask'].ndarray
        unit = xnd_data.nda.ntvtype.extension
        xnd_uncert = xdt['data.uncertainty']
        uncert = xnd_uncert.ndarray
        wrapper = uncertainty_classes.get(xnd_uncert.nda.ntvtype.extension)
        # any other extension: the raw array is passed unchanged
        uncertainty = uncert if wrapper is None else wrapper(uncert)
        meta = xdt['meta'].meta | {'name': xdt.name}
        wcs_header = xdt['wcs'].meta
        psf = xdt['psf'].ndarray
        return NDData(values, mask=mask, unit=unit, uncertainty=uncertainty,
                      meta=meta, wcs=wcs.WCS(wcs_header), psf=psf)

    @staticmethod
    def ximport(ndd, Xclass, **kwargs):
        '''Build a Xdataset (instance of `Xclass`) from the astropy NDData `ndd`.

        The result is returned in canonical form (`to_canonical`).
        `kwargs` is accepted but not used.'''
        name = 'no_name'
        unit = None if ndd.unit is None else ndd.unit.to_string()
        data_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
        xnd = [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=data_type))]
        if ndd.meta:
            # 'name' becomes the dataset name, the other keys stay metadata
            name = ndd.meta.get('name', 'no_name')
            meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
            xnd.append(Xndarray('meta', meta=meta))
        if ndd.wcs:
            xnd.append(Xndarray('wcs', meta=dict(ndd.wcs.to_header())))
        if ndd.psf is not None:
            # the psf is typed like the data
            xnd.append(Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=data_type)))
        if ndd.mask is not None:
            xnd.append(Xndarray('data.mask', nda=ndd.mask))
        if ndd.uncertainty is not None:
            # first three letters of the class name ('std', 'var', 'inv', ...)
            kind = ndd.uncertainty.__class__.__name__[:3].lower()
            uncert_array = ndd.uncertainty.array
            uncert_type = Nutil.ntv_type(uncert_array.dtype.name, ext=kind)
            xnd.append(Xndarray('data.uncertainty',
                                nda=Ndarray(uncert_array, ntv_type=uncert_type)))
        return Xclass(xnd, name).to_canonical()

NDData interface with two static methods ximport and xexport

@staticmethod
def xexport(xdt, **kwargs):
37    @staticmethod
38    def xexport(xdt, **kwargs):
39        '''return a NDData from a Xdataset'''
40        data = xdt['data'].ndarray
41        mask = xdt['data.mask'].ndarray
42        unit = xdt['data'].nda.ntvtype.extension
43        uncert = xdt['data.uncertainty'].ndarray
44        typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
45        match typ_u:
46            case 'std':
47                uncertainty = StdDevUncertainty(uncert)
48            case 'var':
49                uncertainty = VarianceUncertainty(uncert)
50            case 'inv':
51                uncertainty = InverseVariance(uncert)
52            case _:
53                uncertainty = uncert
54        meta = xdt['meta'].meta | {'name': xdt.name}
55        wcs_dic = xdt['wcs'].meta
56        psf = xdt['psf'].ndarray
57        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
58                      meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf)

return a NDData from a Xdataset

@staticmethod
def ximport(ndd, Xclass, **kwargs):
@staticmethod
def ximport(ndd, Xclass, **kwargs):
    '''return a Xdataset from an astropy.NDData

    The Xndarray 'data' is always created; 'meta', 'wcs', 'psf', 'data.mask'
    and 'data.uncertainty' are appended (in that order) only when the NDData
    defines them. The 'name' entry of `ndd.meta` becomes the Xdataset name
    ('no_name' if absent).

    *Parameters*

    - **ndd** : NDData to convert
    - **Xclass** : class used to build the result from the list of Xndarray
    - **kwargs** : not used
    '''
    unit = None if ndd.unit is None else ndd.unit.to_string()
    data_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
    xnd = [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=data_type))]
    name = 'no_name'
    if ndd.meta:
        name = ndd.meta.get('name', 'no_name')
        other_meta = {key: val for key, val in ndd.meta.items()
                      if key != 'name'}
        xnd.append(Xndarray('meta', meta=other_meta))
    if ndd.wcs:
        xnd.append(Xndarray('wcs', meta=dict(ndd.wcs.to_header())))
    if ndd.psf is not None:
        # the psf is stored with the ntv_type of 'data'
        xnd.append(Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=data_type)))
    if ndd.mask is not None:
        xnd.append(Xndarray('data.mask', nda=ndd.mask))
    if ndd.uncertainty is not None:
        # 'std', 'var' or 'inv' : three first letters of the class name
        kind = ndd.uncertainty.__class__.__name__[:3].lower()
        uncert_type = Nutil.ntv_type(
            ndd.uncertainty.array.dtype.name, ext=kind)
        uncert = Ndarray(ndd.uncertainty.array, ntv_type=uncert_type)
        xnd.append(Xndarray('data.uncertainty', nda=uncert))
    return Xclass(xnd, name).to_canonical()

return a Xdataset from an astropy.NDData

class PandasConnec:
 class PandasConnec:
     '''pandas.DataFrame interface with two static methods ximport and xexport'''

     @staticmethod
     def xexport(xdt, **kwargs):
         '''return a pd.DataFrame from a Xdataset

         *Parameters*

         - **json_name**: Boolean (default True) - if False use full_name else json_name
         - **info**: Boolean (default True) - if True add xdt.info in DataFrame.attrs
         - **dims**: list of string (default None) - order of dimensions full_name to apply
         '''
         opt = {'json_name': True, 'info': True, 'dims': None} | kwargs
         dic_name = {name: xdt[name].json_name if opt['json_name'] else xdt[name].full_name
                     for name in xdt.names}
         dims = xdt.dimensions if not opt['dims'] else tuple(opt['dims'])
         fields = (xdt.group(dims) + xdt.group(xdt.coordinates) +
                   xdt.group(xdt.data_vars) + xdt.uniques)
         fields += tuple(nam for nam in xdt.group(xdt.data_arrays)
                         if len(xdt[nam]) == xdt.length)
         # fields with an uri have no values to put in a column
         fields_array = tuple(var for var in fields if not xdt[var].uri)
         dic_series = {dic_name[name]: PandasConnec._to_np_series(xdt, name, dims)
                       for name in fields_array}
         dfr = pd.DataFrame(dic_series)
         index = [dic_name[name] for name in dims]
         if index:
             dfr = dfr.set_index(index)
         if opt['info']:
             dfr.attrs |= {'info': xdt.tab_info}
             dfr.attrs |= {'metadata': {
                 name: xdt[name].meta for name in xdt.metadata}}
             fields_uri = [var for var in fields if var not in fields_array]
             fields_other = [nam for nam in xdt.group(xdt.data_arrays)
                             if len(xdt[nam]) != xdt.length]
             # everything that is not a column is kept as json in attrs, also
             # when there is no uri field (data_arrays were lost in that case)
             fields_attrs = fields_uri + fields_other
             if fields_attrs:
                 dfr.attrs |= {'fields': {nam: xdt[nam].to_json(noname=True)
                                          for nam in fields_attrs}}
             if xdt.name:
                 dfr.attrs |= {'name': xdt.name}
         return dfr

     @staticmethod
     def ximport(df, Xclass, **kwargs):
         '''return a Xdataset from a pd.DataFrame

         *Parameters*

         - dims: list of string (default None) - order of dimensions to apply
         '''
         opt = {'dims': None} | kwargs
         xnd = []
         dfr = df.reset_index()
         # drop the column created by reset_index for a default (unnamed) index
         if 'index' in dfr.columns and 'index' not in df.columns:
             del dfr['index']
         df_names = {Nutil.split_json_name(j_name)[0]: j_name
                     for j_name in dfr.columns}
         df_ntv_types = {Nutil.split_json_name(j_name)[0]:
                         Nutil.split_json_name(j_name)[1] for j_name in dfr.columns}
         dfr.columns = [Nutil.split_json_name(name)[0] for name in dfr.columns]
         if dfr.attrs.get('metadata'):
             for name, meta in dfr.attrs['metadata'].items():
                 xnd += [Xndarray.read_json({name: meta})]
         if dfr.attrs.get('fields'):
             for name, jsn in dfr.attrs['fields'].items():
                 xnd += [Xndarray.read_json({name: jsn})]
         if dfr.attrs.get('info'):
             dimensions = dfr.attrs['info']['dimensions']
             data = dfr.attrs['info']['data']
         else:
             dimensions, data = PandasConnec._ximport_analysis(dfr, opt['dims'])
         shape_dfr = [data[dim]['shape'][0]
                      for dim in dimensions] if dimensions else len(dfr)
         dfr = dfr.sort_values(dimensions)
         for name in df_names:
             xnd += [PandasConnec._ximport_series(data, name, dfr, dimensions,
                                                  shape_dfr, df_ntv_types, **opt)]
         return Xclass(xnd, dfr.attrs.get('name')).to_canonical()

     @staticmethod
     def _ximport_analysis(dfr, opt_dims):
         '''return data and dimensions from analysis module
         - opt_dims: partition to apply
         - dfr: dataframe to analyse'''
         dfr_idx = list(dfr.index.names)
         # a named index takes precedence over the requested partition
         opt_dims = dfr_idx if dfr_idx != [None] else opt_dims
         ana = dfr.npd.analysis(distr=True)
         partition = ana.field_partition(partition=opt_dims, mode='id')
         part_rel = ana.relation_partition(partition=opt_dims, noroot=True)
         part_dim = ana.relation_partition(
             partition=opt_dims, noroot=True, primary=True)
         dimensions = partition['primary']
         len_fields = {fld.idfield: fld.lencodec for fld in ana.fields}
         data = {fld.idfield: {
             'shape': [len_fields[dim] for dim in part_dim[fld.idfield]] if part_dim else [],
             'links': part_rel[fld.idfield] if part_rel else []} for fld in ana.fields}
         for json_name in data:
             if not data[json_name]['shape']:
                 # a field without shape takes the shape of the first field
                 # whose name equals its main name
                 name = Nutil.split_name(Nutil.split_json_name(json_name)[0])[0]
                 p_name = [js_name for js_name in data
                           if Nutil.split_json_name(js_name)[0] == name][0]
                 data[json_name]['shape'] = data[p_name]['shape']
         return (dimensions, data)

     @staticmethod
     def _ximport_series(data, name, dfr, dimensions, shape_dfr, df_ntv_types, **opt):
         '''return a Xndarray from a Series of a pd.DataFrame'''
         if data[name].get('xtype') == 'meta':  # or len(dfr[name].unique()) == 1:
             return Xndarray(name, meta=dfr[name].iloc[0])
         meta = data[name].get('meta')
         ntv_type = df_ntv_types[name]
         if len(dfr[name].unique()) == 1:
             # a constant Series is stored as a single value
             nda = Ndarray(np.array(dfr[name].iloc[0]),
                           ntv_type=ntv_type, str_uri=False)
             nda.set_shape([1])
             return Xndarray(name, nda=nda, meta=meta)
         if not dimensions:
             nda = Ndarray(np.array(dfr[name]), ntv_type=ntv_type)
             return Xndarray(name, nda=nda, meta=meta)
         dims = []
         PandasConnec._get_dims(dims, name, data, dimensions)
         if not dims:
             # an additional Xndarray uses the dimensions of its main name
             p_name, add_name = Nutil.split_name(name)
             if add_name:
                 PandasConnec._get_dims(dims, p_name, data, dimensions)
         np_array = PandasConnec._from_series(dfr, name, shape_dfr,
                                              dimensions, dims, opt['dims'])
         shape = data[name].get('shape', [len(dfr)])
         nda = Ndarray(np_array, ntv_type, shape)
         links = data[name].get('links')
         return Xndarray(name, nda=nda, links=links if links else dims, meta=meta)

     @staticmethod
     def _to_np_series(xdt, name, dims):
         '''return a np.ndarray from the Xndarray of xdt defined by his name

         *parameters*

         - **xdt**: Xdataset - data to convert in a pd.DataFrame
         - **name**: string - full_name of the Xndarray to convert
         - **dims**: list of string - order of dimensions full_name to apply'''
         if name in xdt.uniques:
             return np.array([xdt[name].darray[0]] * xdt.length)
         if xdt[name].shape == [xdt.length]:
             return xdt[name].darray
         n_shape = {nam: len(xdt[nam]) for nam in dims}
         dim_name = xdt.dims(name)
         if not set(dim_name) <= set(dims):
             return None
         # dimensions not used by the Xndarray are placed first
         add_name = [nam for nam in dims if nam not in dim_name]
         tab_name = add_name + dim_name

         til = 1
         for nam in add_name:
             til *= n_shape[nam]
         shap = [n_shape[nam] for nam in tab_name]
         order = [dims.index(nam) for nam in tab_name]
         arr = xdt[name].darray
         return Nutil.extend_array(arr, til, shap, order)

     @staticmethod
     def _from_series(dfr, name, shape, dims, links, new_dims=None):
         '''return a flattened np.ndarray from the pd.Series of dfr defined by his name

         *parameters*

         - dfr: DataFrame - data to convert in Xdataset
         - name: string - name of the Series (full_name or json_name)
         - shape: shape of the Xdataset
         - dims: list of string - list of name of dimensions
         - links: list of string - list of linked Series
         - new_dims: list of string (default None) - new order of dims
         '''
         if not links:
             return np.array(dfr[name])
         new_dims = new_dims if new_dims else dims
         old_order = list(range(len(dims)))
         order = [dims.index(dim) for dim in new_dims]
         # a full slice for each linked dimension, index 0 for the other ones
         idx = [0] * len(dims)
         for nam in links:
             idx[new_dims.index(nam)] = slice(shape[dims.index(nam)])
         # tuple(idx) is equivalent to the starred subscript but also valid
         # before Python 3.11
         xar = np.moveaxis(np.array(dfr[name]).reshape(shape),
                           old_order, order)[tuple(idx)]
         lnk = [nam for nam in new_dims if nam in links]
         shape_lnk = [shape[dims.index(nam)] for nam in lnk]
         xar = xar.reshape(shape_lnk)
         # reorder the remaining axes to follow the order of links
         old_order = list(range(len(links)))
         order = [lnk.index(dim) for dim in links]
         return np.moveaxis(xar, old_order, order).flatten()

     @staticmethod
     def _get_dims(dims, name, data, dimensions):
         '''add names of dimensions into dims

         `dims` is updated in place: `name` is appended if it is a dimension,
         else the 'links' of `data[name]` are explored recursively.'''
         if not name:
             return
         if name in dimensions:
             dims += [name]
             return
         if 'links' not in data[name]:
             return
         for nam in data[name]['links']:
             PandasConnec._get_dims(dims, nam, data, dimensions)

pandas.DataFrame interface with two static methods ximport and xexport

@staticmethod
def xexport(xdt, **kwargs):
 @staticmethod
 def xexport(xdt, **kwargs):
     '''return a pd.DataFrame from a Xdataset

     *Parameters*

     - **json_name**: Boolean (default True) - if False use full_name else json_name
     - **info**: Boolean (default True) - if True add xdt.info in DataFrame.attrs
     - **dims**: list of string (default None) - order of dimensions full_name to apply
     '''
     opt = {'json_name': True, 'info': True, 'dims': None} | kwargs
     dic_name = {name: xdt[name].json_name if opt['json_name'] else xdt[name].full_name
                 for name in xdt.names}
     dims = xdt.dimensions if not opt['dims'] else tuple(opt['dims'])
     fields = (xdt.group(dims) + xdt.group(xdt.coordinates) +
               xdt.group(xdt.data_vars) + xdt.uniques)
     fields += tuple(nam for nam in xdt.group(xdt.data_arrays)
                     if len(xdt[nam]) == xdt.length)
     # fields with an uri have no values to put in a column
     fields_array = tuple(var for var in fields if not xdt[var].uri)
     dic_series = {dic_name[name]: PandasConnec._to_np_series(xdt, name, dims)
                   for name in fields_array}
     dfr = pd.DataFrame(dic_series)
     index = [dic_name[name] for name in dims]
     if index:
         dfr = dfr.set_index(index)
     if opt['info']:
         dfr.attrs |= {'info': xdt.tab_info}
         dfr.attrs |= {'metadata': {
             name: xdt[name].meta for name in xdt.metadata}}
         fields_uri = [var for var in fields if var not in fields_array]
         fields_other = [nam for nam in xdt.group(xdt.data_arrays)
                         if len(xdt[nam]) != xdt.length]
         # everything that is not a column is kept as json in attrs, also
         # when there is no uri field (data_arrays were lost in that case)
         fields_attrs = fields_uri + fields_other
         if fields_attrs:
             dfr.attrs |= {'fields': {nam: xdt[nam].to_json(noname=True)
                                      for nam in fields_attrs}}
         if xdt.name:
             dfr.attrs |= {'name': xdt.name}
     return dfr

return a pd.DataFrame from a Xdataset

Parameters

  • json_name: Boolean (default True) - if False use full_name else json_name
  • info: Boolean (default True) - if True add xdt.info in DataFrame.attrs
  • dims: list of string (default None) - order of dimensions full_name to apply
@staticmethod
def ximport(df, Xclass, **kwargs):
@staticmethod
def ximport(df, Xclass, **kwargs):
    '''return a Xdataset from a pd.DataFrame

    The index is converted in columns, the Xndarray stored as json in
    `attrs['metadata']` and `attrs['fields']` are restored first, then one
    Xndarray is built for each column. Dimensions and shapes come from
    `attrs['info']` if present, else from an analysis of the DataFrame.

    *Parameters*

    - dims: list of string (default None) - order of dimensions to apply
    '''
    opt = {'dims': None} | kwargs
    dfr = df.reset_index()
    # drop the column created by reset_index for a default (unnamed) index
    if 'index' in dfr.columns and 'index' not in df.columns:
        del dfr['index']
    json_names = list(dfr.columns)
    df_names = {}
    df_ntv_types = {}
    for j_name in json_names:
        df_names[Nutil.split_json_name(j_name)[0]] = j_name
        df_ntv_types[Nutil.split_json_name(j_name)[0]] = \
            Nutil.split_json_name(j_name)[1]
    dfr.columns = [Nutil.split_json_name(j_name)[0] for j_name in json_names]
    xnd = []
    for key in ('metadata', 'fields'):
        if dfr.attrs.get(key):
            xnd.extend(Xndarray.read_json({name: jsn})
                       for name, jsn in dfr.attrs[key].items())
    if dfr.attrs.get('info'):
        info = dfr.attrs['info']
        dimensions, data = info['dimensions'], info['data']
    else:
        dimensions, data = PandasConnec._ximport_analysis(dfr, opt['dims'])
    if dimensions:
        shape_dfr = [data[dim]['shape'][0] for dim in dimensions]
    else:
        shape_dfr = len(dfr)
    dfr = dfr.sort_values(dimensions)
    for name in df_names:
        xnd.append(PandasConnec._ximport_series(
            data, name, dfr, dimensions, shape_dfr, df_ntv_types, **opt))
    return Xclass(xnd, dfr.attrs.get('name')).to_canonical()

return a Xdataset from a pd.DataFrame

Parameters

  • dims: list of string (default None) - order of dimensions to apply
class XarrayConnec:
class XarrayConnec:
    '''Xarray interface with two static methods ximport and xexport'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a xr.DataArray or a xr.Dataset from a Xdataset

        *Parameters*

        - **dataset** : Boolean (default True) - if False and a single data_var,
        return a xr.DataArray
        - **datagroup** : Boolean (default True) - if True, add the json
        representation of 'relative' Xndarrays and 'data_arrays' Xndarrays
        to the attrs of the result
        '''
        option = {'dataset': True, 'datagroup': True} | kwargs
        coords = XarrayConnec._to_xr_vars(
            xdt, xdt.dimensions + xdt.coordinates + xdt.uniques)
        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
        if len(xdt.data_vars) == 1 and not option['dataset']:
            var_name = xdt.data_vars[0]
            data = xdt.to_ndarray(var_name)
            dims = xdt.dims(var_name)
            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
            # 'data' is the default name given by ximport to unnamed DataArray
            name = var_name if var_name != 'data' else None
            xrd = xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
                               name=name)
        else:
            data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
            xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs)
        for unic in xdt.uniques:
            xrd[unic].attrs |= {'ntv_type': xdt[unic].ntv_type} | (
                xdt[unic].meta if xdt[unic].meta else {})
        return xrd

    @staticmethod
    def ximport(xar, Xclass, **kwargs):
        '''return a Xdataset from a xr.DataArray or a xr.Dataset'''
        xnd = []
        if xar.attrs:
            attrs = {k: v for k, v in xar.attrs.items()
                     if k not in ['name', 'ntv_type']}
            for name, meta in attrs.items():
                if isinstance(meta, list):
                    xnd += [Xndarray.read_json({name: meta})]
                else:
                    xnd += [Xndarray(name, meta=meta)]
        for coord in xar.coords:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
            if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
                xnd[-1].links = [list(xar.data_vars)[0]]
        if isinstance(xar, xr.DataArray):
            # the DataArray itself is converted once (the result of a second,
            # identical conversion was previously discarded)
            xnd += [XarrayConnec._var_xr_to_xnd(xar,
                                                name='data', add_attrs=False)]
            xdt = Xclass(xnd, xar.attrs.get('name'))
            for var in xdt.data_vars:
                if var != xar.name and xar.name:
                    xdt[var].links = [xar.name]
            return xdt.to_canonical()
        for var in xar.data_vars:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
        return Xclass(xnd, xar.attrs.get('name')).to_canonical()

    @staticmethod
    def _var_xr_to_xnd(var, name=None, add_attrs=True):
        '''return a Xndarray from a Xarray variable

        *Parameters*

        - **var** : Xarray variable to convert in Xndarray,
        - **name** : string (default None) - default name if var have no name,
        - **add_attrs** : boolean (default True) - if False, attrs are not converted
        '''
        full_name = var.name if var.name else name
        name = Nutil.split_name(full_name)[0]
        # no links for a dimension (single dim equal to its name) or a scalar
        dims = None if var.dims == (name,) or var.size == 1 else list(var.dims)
        ntv_type = var.attrs.get('ntv_type')
        nda = var.values
        nda = nda.reshape(1) if not nda.shape else nda
        if nda.dtype.name == 'datetime64[ns]' and ntv_type:
            nda = Nutil.convert(ntv_type, nda, tojson=False)
        attrs = {k: v for k, v in var.attrs.items()
                 if k not in ['ntv_type', 'name']} if add_attrs else {}
        return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs)

    @staticmethod
    def _to_xr_attrs(xdt, **option):
        '''return a dict with attributes from a Xdataset

        *Parameters*

        - **datagroup** : Boolean  if True, add json representation of 'relative'
        Xndarrays and 'data_arrays' Xndarrays
        '''
        attrs = {meta: xdt[meta].meta for meta in xdt.metadata}
        attrs |= {'name': xdt.name} if xdt.name else {}
        if option['datagroup']:
            for name in xdt.names:
                if xdt[name].mode == 'relative':
                    attrs |= xdt[name].to_json(header=False)
            for name in xdt.data_arrays:
                attrs |= xdt[name].to_json(header=False)
        return attrs

    @staticmethod
    def _to_xr_coord(xdt, name):
        '''return a dict with Xarray attributes from a Xndarray defined by his name'''
        data = xdt.to_ndarray(name)
        if name in xdt.uniques:
            return {name: data[0]}
        if name in xdt.additionals and not xdt[name].links:
            data = data.reshape(xdt.shape_dims(xdt[name].name))
        # without dims, the Xndarray is its own dimension (explicit 1-tuple)
        dims = tuple(xdt.dims(name)) if xdt.dims(name) else (xdt[name].name,)
        meta = {'ntv_type': xdt[name].ntv_type} | (
            xdt[name].meta if xdt[name].meta else {})
        return {name: (dims, data, meta)}

    @staticmethod
    def _to_xr_vars(xdt, list_names):
        '''return a dict with Xarray attributes from a list of Xndarray names'''
        arg_vars = {}
        valid_names = [
            nam for nam in list_names if xdt[nam].mode == 'absolute']
        for xnd_name in valid_names:
            arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name)
        for name in list_names:
            if xdt[name].xtype == 'meta':
                arg_vars |= {name: xdt[name].meta}
        return arg_vars

    @staticmethod
    def _xr_add_type(xar):
        '''add 'ntv_type' as attribute for a xr.DataArray

        For another object (xr.Dataset), the attribute is added to each
        coordinate and each data_var. An existing 'ntv_type' is kept.'''
        if isinstance(xar, xr.DataArray):
            if 'ntv_type' not in xar.attrs:
                xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)}
            return
        # iterating coords / data_vars yields names: the variables themselves
        # have to be fetched before the recursive call
        for coord in xar.coords:
            XarrayConnec._xr_add_type(xar.coords[coord])
        for var in xar.data_vars:
            XarrayConnec._xr_add_type(xar.data_vars[var])
        return

Xarray interface with two static methods ximport and xexport

@staticmethod
def xexport(xdt, **kwargs):
@staticmethod
def xexport(xdt, **kwargs):
    '''return a xr.DataArray or a xr.Dataset from a Xdataset

    *Parameters*

    - **dataset** : Boolean (default True) - if False and a single data_var,
    return a xr.DataArray
    - **datagroup** : Boolean (default True) - option forwarded to
    XarrayConnec._to_xr_attrs to build the attrs of the result
    '''
    option = {'dataset': True, 'datagroup': True} | kwargs
    coord_names = xdt.dimensions + xdt.coordinates + xdt.uniques
    coords = XarrayConnec._to_xr_vars(xdt, coord_names)
    coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
    attrs = XarrayConnec._to_xr_attrs(xdt, **option)
    single_array = len(xdt.data_vars) == 1 and not option['dataset']
    if single_array:
        var_name = xdt.data_vars[0]
        data = xdt.to_ndarray(var_name)
        dims = xdt.dims(var_name)
        attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
        if xdt[var_name].meta:
            attrs |= xdt[var_name].meta
        # a variable named 'data' gives an unnamed DataArray
        xrd = xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
                           name=None if var_name == 'data' else var_name)
    else:
        xrd = xr.Dataset(XarrayConnec._to_xr_vars(xdt, xdt.data_vars),
                         coords=coords, attrs=attrs)
    for unic in xdt.uniques:
        unic_meta = xdt[unic].meta if xdt[unic].meta else {}
        xrd[unic].attrs |= {'ntv_type': xdt[unic].ntv_type} | unic_meta
    return xrd

return a xr.DataArray or a xr.Dataset from a Xdataset

Parameters

  • dataset : Boolean (default True) - if False and a single data_var, return a xr.DataArray
  • datagroup : Boolean (default True) - if True, add the json representation of 'relative' Xndarrays and 'data_arrays' Xndarrays to the attrs of the returned xr.DataArray/xr.Dataset
@staticmethod
def ximport(xar, Xclass, **kwargs):
@staticmethod
def ximport(xar, Xclass, **kwargs):
    '''return a Xdataset from a xr.DataArray or a xr.Dataset

    The attrs (except 'name' and 'ntv_type') are converted first, then the
    coordinates, then the DataArray itself (named 'data' if it has no name)
    or each data_var of the Dataset.

    *Parameters*

    - **xar** : xr.DataArray or xr.Dataset to convert
    - **Xclass** : class used to build the result from the list of Xndarray
    - **kwargs** : not used
    '''
    xnd = []
    if xar.attrs:
        attrs = {k: v for k, v in xar.attrs.items()
                 if k not in ['name', 'ntv_type']}
        for name, meta in attrs.items():
            # a list is the json representation of a Xndarray
            if isinstance(meta, list):
                xnd += [Xndarray.read_json({name: meta})]
            else:
                xnd += [Xndarray(name, meta=meta)]
    for coord in xar.coords:
        xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
        if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
            xnd[-1].links = [list(xar.data_vars)[0]]
    if isinstance(xar, xr.DataArray):
        # the DataArray itself is converted once (the result of a second,
        # identical conversion was previously discarded)
        xnd += [XarrayConnec._var_xr_to_xnd(xar,
                                            name='data', add_attrs=False)]
        xdt = Xclass(xnd, xar.attrs.get('name'))
        for var in xdt.data_vars:
            if var != xar.name and xar.name:
                xdt[var].links = [xar.name]
        return xdt.to_canonical()
    for var in xar.data_vars:
        xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
    return Xclass(xnd, xar.attrs.get('name')).to_canonical()

return a Xdataset from a xr.DataArray or a xr.Dataset

class ScippConnec:
441class ScippConnec:
442    ''' Scipp interface with two static methods ximport and xexport'''
443
444    SCTYPE_DTYPE = {'string': 'str'}
445
446    @staticmethod
447    def xexport(xdt, **kwargs):
448        '''return a sc.DataArray or a sc.Dataset from a xdataset
449
450        *Parameters*
451
452        - **dataset** : Boolean (default True) - if False and a single data_var,
453        return a DataArray
454        - **datagroup** : Boolean (default True) - if True return a DataGroup with
455        metadata and data_arrays
456        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
457        '''
458        option = {'dataset': True, 'datagroup': True,
459                  'ntv_type': True} | kwargs
460        coords = dict([ScippConnec._to_scipp_var(xdt, name, **option)
461                       for name in xdt.coordinates + xdt.dimensions + xdt.uniques
462                       if xdt[name].mode == 'absolute'])
463        scd = sc.Dataset(dict([ScippConnec._to_sc_dataarray(xdt, name, coords, **option)
464                               for name in xdt.data_vars
465                               if xdt[name].mode == 'absolute']))
466        scd = scd if option['dataset'] else scd[list(scd)[0]]
467        if not option['datagroup']:
468            return scd
469        sc_name = xdt.name if xdt.name else 'no_name'
470        return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))
471
472    @staticmethod
473    def ximport(sc_obj, Xclass, **kwargs):
474        '''return a xdataset from a scipp object DataArray, Dataset or DataGroup'''
475        xnd = []
476        scd = sc_obj
477        xnd_name = None
478        if isinstance(sc_obj, sc.DataGroup):
479            for obj in sc_obj:
480                if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)):
481                    scd = sc_obj[obj]
482                    xnd_name = obj
483                    break
484        if isinstance(scd, sc.DataArray):
485            scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd})
486        if isinstance(scd, sc.Dataset):
487            for coord in scd.coords:
488                xnd += ScippConnec._var_sc_to_xnd(
489                    scd.coords[coord], scd, coord)
490            for var in scd:
491                for mask in scd[var].masks:
492                    m_var = Nutil.split_json_name(var)[0]
493                    xnd += ScippConnec._var_sc_to_xnd(
494                        scd[var].masks[mask], scd, mask, m_var)
495                xnd += ScippConnec._var_sc_to_xnd(scd[var].data, scd, var)
496        if isinstance(sc_obj, sc.DataGroup):
497            xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
498        return Xclass(xnd, xnd_name).to_canonical()
499
500    @staticmethod
501    def _grp_sc_to_xnd(sc_obj, xnd):
502        '''return a list of Xndarray from a scipp variable'''
503        dic_xnd = {xar.name: xar for xar in xnd}
504        for obj in sc_obj:
505            name, add_name = Nutil.split_name(obj)
506            match [name, add_name, sc_obj[obj]]:
507                case [name, None, list()]:
508                    xnd += [Xndarray.read_json({name: sc_obj[obj]})]
509                case [name, add_name, sc.Variable()]:
510                    xnd += ScippConnec._var_sc_to_xnd(
511                        sc_obj[obj], None, add_name, name)
512                case [name, _, dict() | str() | list()] if name in dic_xnd:
513                    if dic_xnd[name].meta:
514                        dic_xnd[name].meta |= sc_obj[obj]
515                    else:
516                        dic_xnd[name].meta = sc_obj[obj]
517                case [name, _, dict() | str() | list()]:
518                    xnd += [Xndarray.read_json({name: sc_obj[obj]})]
519                case [_, _, _]: ...
520        return xnd
521
522    @staticmethod
523    def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None):
524        '''return a list of Xndarray from a scipp variable
525        - scd : scipp dataset
526        - scv : scipp variable
527        - var : name
528        - sc_name : scipp name'''
529        l_xnda = []
530        unit = scv.unit.name if scv.unit and not scv.unit in [
531            'dimensionless', 'ns'] else ''
532        ext_name, typ1 = Nutil.split_json_name(sc_name, True)
533        var_name, typ2 = Nutil.split_json_name(var, True)
534        full_name = var_name + \
535            ('.' if var_name and ext_name else '') + ext_name
536        ntv_type_base = typ1 + typ2
537        ntv_type = ntv_type_base + ('[' + unit + ']' if unit else '')
538        links = [Nutil.split_json_name(jsn)[0] for jsn in scv.dims]
539        if not scd is None and sc_name in scd.coords and scv.dims == scd.dims:
540            links = [Nutil.split_json_name(list(scd)[0])[0]]
541        if not scv.variances is None:
542            nda = Ndarray(scv.variances, ntv_type_base)
543            l_xnda.append(Xndarray(full_name + '.variance', nda, links))
544        nda = Ndarray(scv.values, ntv_type, str_uri=False)
545        shape = scv.shape if scv.shape else (1,)
546        nda.set_shape(shape)
547        l_xnda.append(Xndarray(full_name, nda, links))
548        return l_xnda
549
550    @staticmethod
551    def _to_sc_dataarray(xdt, name, coords, **option):
552        '''return a scipp.DataArray from a xdataset.global_var defined by his name'''
553        scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option)
554        masks = dict([ScippConnec._to_scipp_var(xdt, nam, **option)
555                     for nam in set(xdt.group(name)) & set(xdt.masks)])
556        return (scipp_name, sc.DataArray(data, coords=coords, masks=masks))
557
558    @staticmethod
559    def _to_scipp_grp(xdt, **option):
560        '''return a dict with metadata, data-array and data_add from a xdataset'''
561        grp = {}
562        grp |= dict([ScippConnec._to_scipp_var(xdt, name, **option)
563                     for name in xdt.data_add + xdt.data_arrays
564                     if xdt[name].add_name != 'variance'])
565        opt_mask = option | {'grp_mask': True}
566        grp |= dict([ScippConnec._to_scipp_var(xdt, name, **opt_mask)
567                     for name in xdt.masks
568                     if xdt[name].name in xdt.names and not xdt[name].name in xdt.data_vars])
569        grp |= {name + '.meta': xdt[name].meta for name in xdt.names
570                if xdt[name].meta}
571        for name in xdt.names:
572            if xdt[name].mode == 'relative':
573                grp |= xdt[name].to_json(header=False)
574        return grp
575
576    @staticmethod
577    def _to_scipp_var(xdt, name, **kwargs):
578        '''return a scipp.Variable from a Xndarray defined by his name'''
579        option = {'grp_mask': False, 'ntv_type': True} | kwargs
580        simple_type, unit = Nutil.split_type(xdt[name].ntv_type)
581        unit = unit if unit else ''
582        add_name = Nutil.split_name(name)[1]
583        new_n = add_name if name in xdt.masks and not option['grp_mask'] else name
584        opt_n = option['ntv_type']
585        scipp_name = new_n + (':' + simple_type if opt_n else '')
586        if name in xdt.uniques:
587            return (scipp_name, sc.scalar(xdt[name].darray[0], unit=unit))
588        vari_name = name + '.variance'
589        variances = xdt[vari_name].darray if vari_name in xdt.names else None
590        dims = xdt.dims(name, opt_n) if xdt.dims(
591            name, opt_n) else [xdt[name].name]
592        var = sc.array(dims=['flat'], values=xdt.to_darray(
593            name), variances=variances, unit=unit)
594        var = sc.fold(var, dim='flat', sizes=dict(zip(dims, xdt[name].shape)))
595        return (scipp_name, var)

Scipp interface with two static methods ximport and xexport

SCTYPE_DTYPE = {'string': 'str'}
@staticmethod
def xexport(xdt, **kwargs):
446    @staticmethod
447    def xexport(xdt, **kwargs):
448        '''return a sc.DataArray or a sc.Dataset from a xdataset
449
450        *Parameters*
451
452        - **dataset** : Boolean (default True) - if False and a single data_var,
453        return a DataArray
454        - **datagroup** : Boolean (default True) - if True return a DataGroup with
455        metadata and data_arrays
456        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
457        '''
458        option = {'dataset': True, 'datagroup': True,
459                  'ntv_type': True} | kwargs
460        coords = dict([ScippConnec._to_scipp_var(xdt, name, **option)
461                       for name in xdt.coordinates + xdt.dimensions + xdt.uniques
462                       if xdt[name].mode == 'absolute'])
463        scd = sc.Dataset(dict([ScippConnec._to_sc_dataarray(xdt, name, coords, **option)
464                               for name in xdt.data_vars
465                               if xdt[name].mode == 'absolute']))
466        scd = scd if option['dataset'] else scd[list(scd)[0]]
467        if not option['datagroup']:
468            return scd
469        sc_name = xdt.name if xdt.name else 'no_name'
470        return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))

return a sc.DataArray or a sc.Dataset from a xdataset, wrapped in a sc.DataGroup when datagroup is True (the default)

Parameters

  • dataset : Boolean (default True) - if False and a single data_var, return a DataArray
  • datagroup : Boolean (default True) - if True return a DataGroup with metadata and data_arrays
  • ntv_type : Boolean (default True) - if True add ntv-type to the name
@staticmethod
def ximport(sc_obj, Xclass, **kwargs):
472    @staticmethod
473    def ximport(sc_obj, Xclass, **kwargs):
474        '''return a xdataset from a scipp object DataArray, Dataset or DataGroup'''
475        xnd = []
476        scd = sc_obj
477        xnd_name = None
478        if isinstance(sc_obj, sc.DataGroup):
479            for obj in sc_obj:
480                if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)):
481                    scd = sc_obj[obj]
482                    xnd_name = obj
483                    break
484        if isinstance(scd, sc.DataArray):
485            scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd})
486        if isinstance(scd, sc.Dataset):
487            for coord in scd.coords:
488                xnd += ScippConnec._var_sc_to_xnd(
489                    scd.coords[coord], scd, coord)
490            for var in scd:
491                for mask in scd[var].masks:
492                    m_var = Nutil.split_json_name(var)[0]
493                    xnd += ScippConnec._var_sc_to_xnd(
494                        scd[var].masks[mask], scd, mask, m_var)
495                xnd += ScippConnec._var_sc_to_xnd(scd[var].data, scd, var)
496        if isinstance(sc_obj, sc.DataGroup):
497            xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
498        return Xclass(xnd, xnd_name).to_canonical()

return a xdataset from a scipp object DataArray, Dataset or DataGroup