ntv-numpy.ntv_numpy.xconnector
@author: Philippe@loco-labs.io
The xconnector module is part of the ntv-numpy.ntv_numpy package (specification document).
It contains the AstropyNDDataConnec class for the astropy NDData interface, the XarrayConnec class for the Xarray interface and the ScippConnec class for the Scipp interface.
For more information, see the user guide or the github repository.
1# -*- coding: utf-8 -*- 2""" 3@author: Philippe@loco-labs.io 4 5The `xconnector` module is part of the `ntv-numpy.ntv_numpy` package ([specification document]( 6https://loco-philippe.github.io/ES/JSON%20semantic%20format%20(JSON-NTV).htm)). 7 8It contains the `XarrayConnec` class for the Xarray interface and the `ScippConnec` 9 class for Scipp interface. 10 11For more information, see the 12[user guide](https://loco-philippe.github.io/ntv-numpy/docs/user_guide.html) 13 or the [github repository](https://github.com/loco-philippe/ntv-numpy). 14""" 15 16 17import xarray as xr 18import scipp as sc 19from astropy import wcs 20from astropy.nddata import NDData 21from astropy.nddata.nduncertainty import StdDevUncertainty, VarianceUncertainty 22from astropy.nddata.nduncertainty import InverseVariance 23from ntv_numpy.ndarray import Nutil, Ndarray 24from ntv_numpy.xndarray import Xndarray 25 26 27class AstropyNDDataConnec: 28 ''' NDData interface with two static methods ximport and xexport''' 29 30 @staticmethod 31 def xexport(xdt, **kwargs): 32 '''return a NDData from a Xdataset''' 33 data = xdt['data'].ndarray 34 mask = xdt['data.mask'].ndarray 35 unit = xdt['data'].nda.ntvtype.extension 36 uncert = xdt['data.uncertainty'].ndarray 37 typ_u = xdt['data.uncertainty'].nda.ntvtype.extension 38 match typ_u: 39 case 'std': 40 uncertainty = StdDevUncertainty(uncert) 41 case 'var': 42 uncertainty = VarianceUncertainty(uncert) 43 case 'inv': 44 uncertainty = InverseVariance(uncert) 45 case _: 46 uncertainty = uncert 47 meta = xdt['meta'].meta | {'name': xdt.name} 48 wcs_dic = xdt['wcs'].meta 49 psf = xdt['psf'].ndarray 50 return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty, 51 meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf) 52 53 @staticmethod 54 def ximport(ndd, Xclass, **kwargs): 55 '''return a Xdataset from a astropy.NDData''' 56 xnd = [] 57 name = 'no_name' 58 unit = ndd.unit.to_string() if not ndd.unit is None else None 59 ntv_type = 
Nutil.ntv_type(ndd.data.dtype.name, ext=unit) 60 xnd += [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))] 61 if ndd.meta: 62 meta = {key: val for key, val in ndd.meta.items() if key != 'name'} 63 name = ndd.meta.get('name', 'no_name') 64 xnd += [Xndarray('meta', meta=meta)] 65 if ndd.wcs: 66 xnd += [Xndarray('wcs', meta=dict(ndd.wcs.to_header()))] 67 if not ndd.psf is None: 68 xnd += [Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type))] 69 if not ndd.mask is None: 70 xnd += [Xndarray('data.mask', nda=ndd.mask)] 71 if not ndd.uncertainty is None: 72 typ_u = ndd.uncertainty.__class__.__name__[:3].lower() 73 ntv_type = Nutil.ntv_type( 74 ndd.uncertainty.array.dtype.name, ext=typ_u) 75 nda = Ndarray(ndd.uncertainty.array, ntv_type=ntv_type) 76 xnd += [Xndarray('data.uncertainty', nda=nda)] 77 return Xclass(xnd, name).to_canonical() 78 79 80class XarrayConnec: 81 ''' Xarray interface with two static methods ximport and xexport''' 82 83 @staticmethod 84 def xexport(xdt, **kwargs): 85 '''return a xr.DataArray or a xr.Dataset from a Xdataset 86 87 *Parameters* 88 89 - **dataset** : Boolean (default True) - if False and a single data_var, 90 return a sc.DataArray 91 - **datagroup** : Boolean (default True) - if True, return a sc.DataGroup 92 which contains the sc.DataArray/sc.Dataset and the other data else only 93 sc.DataArray/sc.Dataset 94 ''' 95 option = {'dataset': True, 'datagroup': True} | kwargs 96 coords = XarrayConnec._to_xr_vars( 97 xdt, xdt.dimensions + xdt.coordinates) 98 coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals) 99 attrs = XarrayConnec._to_xr_attrs(xdt, **option) 100 if len(xdt.data_vars) == 1 and not option['dataset']: 101 var_name = xdt.data_vars[0] 102 data = xdt.to_ndarray(var_name) 103 dims = xdt.dims(var_name) 104 attrs |= {'ntv_type': xdt[var_name].nda.ntv_type} 105 attrs |= xdt[var_name].meta if xdt[var_name].meta else {} 106 name = var_name if var_name != 'data' else None 107 return xr.DataArray(data=data, coords=coords, 
dims=dims, attrs=attrs, 108 name=name) 109 data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars) 110 xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs) 111 return xrd 112 113 @staticmethod 114 def ximport(xar, Xclass, **kwargs): 115 '''return a Xdataset from a xr.DataArray or a xr.Dataset''' 116 xnd = [] 117 if xar.attrs: 118 attrs = {k: v for k, v in xar.attrs.items() if not k in [ 119 'name', 'ntv_type']} 120 for name, meta in attrs.items(): 121 if isinstance(meta, list): 122 xnd += [Xndarray.read_json({name: meta})] 123 else: 124 xnd += [Xndarray(name, meta=meta)] 125 for coord in xar.coords: 126 xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])] 127 if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset): 128 xnd[-1].links = [list(xar.data_vars)[0]] 129 if isinstance(xar, xr.DataArray): 130 var = XarrayConnec._var_xr_to_xnd( 131 xar, name='data', add_attrs=False) 132 xnd += [XarrayConnec._var_xr_to_xnd(xar, 133 name='data', add_attrs=False)] 134 xdt = Xclass(xnd, xar.attrs.get('name')) 135 for var in xdt.data_vars: 136 if var != xar.name and xar.name: 137 xdt[var].links = [xar.name] 138 return xdt.to_canonical() 139 for var in xar.data_vars: 140 xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])] 141 return Xclass(xnd, xar.attrs.get('name')).to_canonical() 142 143 @staticmethod 144 def _var_xr_to_xnd(xar, name=None, add_attrs=True): 145 '''return a Xndarray from a Xarray variable 146 147 *Parameters* 148 149 - **xar** : Xarray variable to convert in Xndarray, 150 - **name** : string (default None) - default name if xar have non name, 151 - **add_attrs** : boolean (default True) - if False, attrs are not converted 152 ''' 153 full_name = xar.name if xar.name else name 154 name = Nutil.split_name(full_name)[0] 155 dims = None if xar.dims == (name,) else list(xar.dims) 156 ntv_type = xar.attrs.get('ntv_type') 157 nda = xar.values 158 if nda.dtype.name == 'datetime64[ns]' and ntv_type: 159 nda = Nutil.convert(ntv_type, 
nda, tojson=False) 160 attrs = {k: v for k, v in xar.attrs.items() 161 if not k in ['ntv_type', 'name']} if add_attrs else {} 162 return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs) 163 164 @staticmethod 165 def _to_xr_attrs(xdt, **option): 166 '''return a dict with attributes from a Xdataset 167 168 *Parameters* 169 170 - **datagroup** : Boolean if True, add json representation of 'relative' 171 Xndarrays and 'data_arrays' Xndarrays 172 ''' 173 attrs = {meta: xdt[meta].meta for meta in xdt.metadata} 174 attrs |= {'name': xdt.name} if xdt.name else {} 175 if option['datagroup']: 176 for name in xdt.names: 177 if xdt[name].mode == 'relative': 178 attrs |= xdt[name].to_json(header=False) 179 for name in xdt.data_arrays: 180 attrs |= xdt[name].to_json(header=False) 181 return attrs 182 183 @staticmethod 184 def _to_xr_coord(xdt, name): 185 '''return a dict with Xarray attributes from a Xndarray defined by his name''' 186 data = xdt.to_ndarray(name) 187 if name in xdt.additionals and not xdt[name].links: 188 data = data.reshape(xdt.shape_dims(xdt[name].name)) 189 dims = tuple(xdt.dims(name)) if xdt.dims(name) else (xdt[name].name) 190 meta = {'ntv_type': xdt[name].ntv_type} | ( 191 xdt[name].meta if xdt[name].meta else {}) 192 return {name: (dims, data, meta)} 193 194 @staticmethod 195 def _to_xr_vars(xdt, list_names): 196 '''return a dict with Xarray attributes from a list of Xndarray names''' 197 arg_vars = {} 198 valid_names = [ 199 name for name in list_names if xdt[name].mode == 'absolute'] 200 for xnd_name in valid_names: 201 arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name) 202 return arg_vars 203 204 @staticmethod 205 def _xr_add_type(xar): 206 '''add 'ntv_type' as attribute for a xr.DataArray''' 207 if isinstance(xar, xr.DataArray) and not 'ntv_type' in xar.attrs: 208 xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)} 209 return 210 for coord in xar.coords: 211 XarrayConnec._xr_add_type(coord) 212 for var in xar.data_vars: 213 
XarrayConnec._xr_add_type(var) 214 return 215 216 217class ScippConnec: 218 ''' Scipp interface with two static methods ximport and xexport''' 219 220 SCTYPE_DTYPE = {'string': 'str'} 221 222 @staticmethod 223 def xexport(xdt, **kwargs): 224 '''return a sc.DataArray or a sc.Dataset from a xdataset 225 226 *Parameters* 227 228 - **dataset** : Boolean (default True) - if False and a single data_var, 229 return a DataArray 230 - **datagroup** : Boolean (default True) - if True return a DataGroup with 231 metadata and data_arrays 232 - **ntv_type** : Boolean (default True) - if True add ntv-type to the name 233 ''' 234 option = {'dataset': True, 'datagroup': True, 235 'ntv_type': True} | kwargs 236 coords = dict([ScippConnec._to_scipp_var(xdt, name, **option) 237 for name in xdt.coordinates + xdt.dimensions 238 if xdt[name].mode == 'absolute']) 239 scd = sc.Dataset(dict([ScippConnec._to_sc_dataarray(xdt, name, coords, **option) 240 for name in xdt.data_vars 241 if xdt[name].mode == 'absolute'])) 242 scd = scd if option['dataset'] else scd[list(scd)[0]] 243 if not option['datagroup']: 244 return scd 245 sc_name = xdt.name if xdt.name else 'no_name' 246 return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option)) 247 248 @staticmethod 249 def ximport(sc_obj, Xclass, **kwargs): 250 '''return a xdataset from a scipp object DataArray, Dataset or DataGroup''' 251 xnd = [] 252 scd = sc_obj 253 xnd_name = None 254 if isinstance(sc_obj, sc.DataGroup): 255 for obj in sc_obj: 256 if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)): 257 scd = sc_obj[obj] 258 xnd_name = obj 259 break 260 if isinstance(scd, sc.DataArray): 261 scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd}) 262 if isinstance(scd, sc.Dataset): 263 for coord in scd.coords: 264 xnd += ScippConnec._var_sc_to_xnd( 265 scd.coords[coord], scd, coord) 266 for var in scd: 267 for mask in scd[var].masks: 268 m_var = Nutil.split_json_name(var)[0] 269 xnd += ScippConnec._var_sc_to_xnd( 270 
scd[var].masks[mask], scd, mask, m_var) 271 xnd += ScippConnec._var_sc_to_xnd(scd[var].data, scd, var) 272 if isinstance(sc_obj, sc.DataGroup): 273 xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd) 274 return Xclass(xnd, xnd_name).to_canonical() 275 276 @staticmethod 277 def _grp_sc_to_xnd(sc_obj, xnd): 278 '''return a list of Xndarray from a scipp variable''' 279 dic_xnd = {xar.name: xar for xar in xnd} 280 for obj in sc_obj: 281 name, add_name = Nutil.split_name(obj) 282 match [name, add_name, sc_obj[obj]]: 283 case [name, None, list()]: 284 xnd += [Xndarray.read_json({name: sc_obj[obj]})] 285 case [name, add_name, sc.Variable()]: 286 xnd += ScippConnec._var_sc_to_xnd( 287 sc_obj[obj], None, add_name, name) 288 case [name, _, dict() | str() | list()] if name in dic_xnd: 289 if dic_xnd[name].meta: 290 dic_xnd[name].meta |= sc_obj[obj] 291 else: 292 dic_xnd[name].meta = sc_obj[obj] 293 case [name, _, dict() | str() | list()]: 294 xnd += [Xndarray.read_json({name: sc_obj[obj]})] 295 case [_, _, _]: ... 296 return xnd 297 298 @staticmethod 299 def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None): 300 '''return a list of Xndarray from a scipp variable 301 - var : name 302 - sc_name : scipp name''' 303 l_xnda = [] 304 unit = scv.unit.name if scv.unit and not scv.unit in [ 305 'dimensionless', 'ns'] else '' 306 ext_name, typ1 = Nutil.split_json_name(sc_name, True) 307 var_name, typ2 = Nutil.split_json_name(var, True) 308 full_name = var_name + \ 309 ('.' 
if var_name and ext_name else '') + ext_name 310 ntv_type_base = typ1 + typ2 311 ntv_type = ntv_type_base + ('[' + unit + ']' if unit else '') 312 313 links = [Nutil.split_json_name(jsn)[0] for jsn in scv.dims] 314 if not scd is None and sc_name in scd.coords and scv.dims == scd.dims: 315 links = [Nutil.split_json_name(list(scd)[0])[0]] 316 if not scv.variances is None: 317 nda = Ndarray(scv.variances, ntv_type_base) 318 l_xnda.append(Xndarray(full_name + '.variance', nda, links)) 319 nda = Ndarray(scv.values, ntv_type) 320 l_xnda.append(Xndarray(full_name, nda, links)) 321 return l_xnda 322 323 @staticmethod 324 def _to_sc_dataarray(xdt, name, coords, **option): 325 '''return a scipp.DataArray from a xdataset.global_var defined by his name''' 326 scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option) 327 masks = dict([ScippConnec._to_scipp_var(xdt, nam, **option) 328 for nam in set(xdt.var_group(name)) & set(xdt.masks)]) 329 return (scipp_name, sc.DataArray(data, coords=coords, masks=masks)) 330 331 @staticmethod 332 def _to_scipp_grp(xdt, **option): 333 '''return a dict with metadata, data-array and data_add from a xdataset''' 334 grp = {} 335 grp |= dict([ScippConnec._to_scipp_var(xdt, name, **option) 336 for name in xdt.data_add + xdt.data_arrays 337 if xdt[name].add_name != 'variance']) 338 opt_mask = option | {'grp_mask': True} 339 grp |= dict([ScippConnec._to_scipp_var(xdt, name, **opt_mask) 340 for name in xdt.masks 341 if xdt[name].name in xdt.names and not xdt[name].name in xdt.data_vars]) 342 grp |= {name + '.meta': xdt[name].meta for name in xdt.names 343 if xdt[name].meta} 344 for name in xdt.names: 345 if xdt[name].mode == 'relative': 346 grp |= xdt[name].to_json(header=False) 347 return grp 348 349 @staticmethod 350 def _to_scipp_var(xdt, name, **kwargs): 351 '''return a scipp.Variable from a Xndarray defined by his name''' 352 option = {'grp_mask': False, 'ntv_type': True} | kwargs 353 add_name = Nutil.split_name(name)[1] 354 new_n = 
add_name if name in xdt.masks and not option['grp_mask'] else name 355 opt_n = option['ntv_type'] 356 values = xdt.to_ndarray(name) 357 vari_name = name + '.variance' 358 variances = xdt[vari_name].darray if vari_name in xdt.names else None 359 if not variances is None: 360 variances = variances.reshape(xdt.shape_dims(vari_name)) 361 dims = xdt.dims(name, opt_n) if xdt.dims( 362 name, opt_n) else [xdt[name].name] 363 simple_type, unit = Nutil.split_type(xdt[name].ntv_type) 364 scipp_name = new_n + (':' + simple_type if opt_n else '') 365 unit = unit if unit else '' 366 return (scipp_name, sc.array(dims=dims, values=values, 367 variances=variances, unit=unit))
class AstropyNDDataConnec:
    '''Connector between a Xdataset and an astropy NDData
    (static methods ximport and xexport)'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''Build a NDData from a Xdataset.

        The Xndarrays named 'data', 'data.mask', 'data.uncertainty', 'meta',
        'wcs' and 'psf' are read from `xdt`; `kwargs` is not used.
        '''
        # NOTE(review): all these names are read unconditionally - confirm the
        # behaviour of a Xdataset when one of them is absent.
        data = xdt['data'].ndarray
        mask = xdt['data.mask'].ndarray
        unit = xdt['data'].nda.ntvtype.extension
        raw_uncert = xdt['data.uncertainty'].ndarray
        kind = xdt['data.uncertainty'].nda.ntvtype.extension
        # the type extension of the uncertainty selects the astropy class
        if kind == 'std':
            uncertainty = StdDevUncertainty(raw_uncert)
        elif kind == 'var':
            uncertainty = VarianceUncertainty(raw_uncert)
        elif kind == 'inv':
            uncertainty = InverseVariance(raw_uncert)
        else:
            uncertainty = raw_uncert
        meta = xdt['meta'].meta | {'name': xdt.name}
        wcs_header = xdt['wcs'].meta
        psf = xdt['psf'].ndarray
        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
                      meta=meta, wcs=wcs.WCS(wcs_header), psf=psf)

    @staticmethod
    def ximport(ndd, Xclass, **kwargs):
        '''Build a Xdataset from an astropy NDData.

        `Xclass` is called as `Xclass(list_of_xndarray, name)` and the result
        of its `to_canonical()` method is returned; `kwargs` is not used.
        '''
        dataset_name = 'no_name'
        unit = None if ndd.unit is None else ndd.unit.to_string()
        data_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
        parts = [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=data_type))]
        if ndd.meta:
            # the 'name' entry becomes the name of the Xdataset
            dataset_name = ndd.meta.get('name', 'no_name')
            other_meta = {key: val for key, val in ndd.meta.items()
                          if key != 'name'}
            parts.append(Xndarray('meta', meta=other_meta))
        if ndd.wcs:
            parts.append(Xndarray('wcs', meta=dict(ndd.wcs.to_header())))
        if ndd.psf is not None:
            # the psf reuses the ntv_type computed for the data
            parts.append(
                Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=data_type)))
        if ndd.mask is not None:
            parts.append(Xndarray('data.mask', nda=ndd.mask))
        if ndd.uncertainty is not None:
            uncert = ndd.uncertainty
            # first three letters of the class name: 'std', 'var' or 'inv'
            kind = uncert.__class__.__name__[:3].lower()
            uncert_type = Nutil.ntv_type(uncert.array.dtype.name, ext=kind)
            parts.append(Xndarray(
                'data.uncertainty',
                nda=Ndarray(uncert.array, ntv_type=uncert_type)))
        return Xclass(parts, dataset_name).to_canonical()
NDData interface with two static methods ximport and xexport
@staticmethod
def xexport(xdt, **kwargs):
    '''Return a NDData built from a Xdataset.

    *Parameters*

    - **xdt** : Xdataset - the Xndarrays 'data', 'data.mask', 'data.uncertainty',
    'meta', 'wcs' and 'psf' are read,
    - **kwargs** : not used
    '''
    values = xdt['data'].ndarray
    mask = xdt['data.mask'].ndarray
    unit = xdt['data'].nda.ntvtype.extension
    uncert_values = xdt['data.uncertainty'].ndarray
    uncert_kind = xdt['data.uncertainty'].nda.ntvtype.extension
    # the type extension of 'data.uncertainty' selects the astropy class;
    # any other extension leaves the raw array unchanged
    if uncert_kind == 'std':
        uncertainty = StdDevUncertainty(uncert_values)
    elif uncert_kind == 'var':
        uncertainty = VarianceUncertainty(uncert_values)
    elif uncert_kind == 'inv':
        uncertainty = InverseVariance(uncert_values)
    else:
        uncertainty = uncert_values
    # the name of the Xdataset is stored in the NDData metadata
    meta = xdt['meta'].meta | {'name': xdt.name}
    header = xdt['wcs'].meta
    psf = xdt['psf'].ndarray
    return NDData(
        values,
        mask=mask,
        unit=unit,
        uncertainty=uncertainty,
        meta=meta,
        wcs=wcs.WCS(header),
        psf=psf,
    )
return a NDData from a Xdataset
@staticmethod
def ximport(ndd, Xclass, **kwargs):
    '''Return a Xdataset built from an astropy NDData.

    *Parameters*

    - **ndd** : NDData to convert,
    - **Xclass** : class called as `Xclass(list_of_xndarray, name)`; the result
    of its `to_canonical()` method is returned,
    - **kwargs** : not used
    '''
    name = 'no_name'
    if ndd.unit is None:
        unit = None
    else:
        unit = ndd.unit.to_string()
    ntv_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
    xnd = [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))]

    if ndd.meta:
        # 'name' is removed from the metadata and used as Xdataset name
        meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
        name = ndd.meta.get('name', 'no_name')
        xnd.append(Xndarray('meta', meta=meta))
    if ndd.wcs:
        xnd.append(Xndarray('wcs', meta=dict(ndd.wcs.to_header())))
    if ndd.psf is not None:
        # same ntv_type as the data
        xnd.append(Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type)))
    if ndd.mask is not None:
        xnd.append(Xndarray('data.mask', nda=ndd.mask))
    if ndd.uncertainty is not None:
        # 'std', 'var' or 'inv' : first letters of the uncertainty class name
        typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
        uncert_array = ndd.uncertainty.array
        ntv_type = Nutil.ntv_type(uncert_array.dtype.name, ext=typ_u)
        xnd.append(Xndarray('data.uncertainty',
                            nda=Ndarray(uncert_array, ntv_type=ntv_type)))
    return Xclass(xnd, name).to_canonical()
return a Xdataset from a astropy.NDData
class XarrayConnec:
    ''' Xarray interface with two static methods ximport and xexport'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a xr.DataArray or a xr.Dataset from a Xdataset

        *Parameters*

        - **dataset** : Boolean (default True) - if False and a single data_var,
        return a xr.DataArray
        - **datagroup** : Boolean (default True) - if True, the json representation
        of 'relative' Xndarrays and 'data_arrays' Xndarrays is added to the
        attributes of the returned xr.DataArray/xr.Dataset
        '''
        option = {'dataset': True, 'datagroup': True} | kwargs
        coords = XarrayConnec._to_xr_vars(
            xdt, xdt.dimensions + xdt.coordinates)
        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
        if len(xdt.data_vars) == 1 and not option['dataset']:
            var_name = xdt.data_vars[0]
            data = xdt.to_ndarray(var_name)
            dims = xdt.dims(var_name)
            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
            # 'data' is the default name used by ximport for unnamed DataArray
            name = var_name if var_name != 'data' else None
            return xr.DataArray(data=data, coords=coords, dims=dims,
                                attrs=attrs, name=name)
        data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
        return xr.Dataset(data_vars, coords=coords, attrs=attrs)

    @staticmethod
    def ximport(xar, Xclass, **kwargs):
        '''return a Xdataset from a xr.DataArray or a xr.Dataset

        *Parameters*

        - **xar** : xr.DataArray or xr.Dataset to convert,
        - **Xclass** : class called as `Xclass(list_of_xndarray, name)` to build
        the result (`to_canonical()` of that object is returned),
        - **kwargs** : not used
        '''
        xnd = []
        if xar.attrs:
            attrs = {k: v for k, v in xar.attrs.items()
                     if k not in ('name', 'ntv_type')}
            for name, meta in attrs.items():
                if isinstance(meta, list):
                    xnd.append(Xndarray.read_json({name: meta}))
                else:
                    xnd.append(Xndarray(name, meta=meta))
        is_dataset = isinstance(xar, xr.Dataset)
        for coord in xar.coords:
            xnd.append(XarrayConnec._var_xr_to_xnd(xar.coords[coord]))
            # a Dataset coordinate spanning all the dims is linked to the
            # first data_var
            if is_dataset and list(xar.coords[coord].dims) == list(xar.dims):
                xnd[-1].links = [list(xar.data_vars)[0]]
        if isinstance(xar, xr.DataArray):
            # the array is converted once (the former duplicate conversion
            # stored in an unused local has been removed)
            xnd.append(XarrayConnec._var_xr_to_xnd(
                xar, name='data', add_attrs=False))
            xdt = Xclass(xnd, xar.attrs.get('name'))
            for var in xdt.data_vars:
                if var != xar.name and xar.name:
                    xdt[var].links = [xar.name]
            return xdt.to_canonical()
        for var in xar.data_vars:
            xnd.append(XarrayConnec._var_xr_to_xnd(xar.data_vars[var]))
        return Xclass(xnd, xar.attrs.get('name')).to_canonical()

    @staticmethod
    def _var_xr_to_xnd(xar, name=None, add_attrs=True):
        '''return a Xndarray from a Xarray variable

        *Parameters*

        - **xar** : Xarray variable to convert in Xndarray,
        - **name** : string (default None) - default name if xar has no name,
        - **add_attrs** : boolean (default True) - if False, attrs are not converted
        '''
        full_name = xar.name if xar.name else name
        name = Nutil.split_name(full_name)[0]
        # a variable whose only dim is its own name gets no explicit dims
        dims = None if xar.dims == (name,) else list(xar.dims)
        ntv_type = xar.attrs.get('ntv_type')
        nda = xar.values
        if nda.dtype.name == 'datetime64[ns]' and ntv_type:
            nda = Nutil.convert(ntv_type, nda, tojson=False)
        attrs = {k: v for k, v in xar.attrs.items()
                 if k not in ('ntv_type', 'name')} if add_attrs else {}
        return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs)

    @staticmethod
    def _to_xr_attrs(xdt, **option):
        '''return a dict with attributes from a Xdataset

        *Parameters*

        - **datagroup** : Boolean if True, add json representation of 'relative'
        Xndarrays and 'data_arrays' Xndarrays
        '''
        attrs = {meta: xdt[meta].meta for meta in xdt.metadata}
        attrs |= {'name': xdt.name} if xdt.name else {}
        if option['datagroup']:
            for name in xdt.names:
                if xdt[name].mode == 'relative':
                    attrs |= xdt[name].to_json(header=False)
            for name in xdt.data_arrays:
                attrs |= xdt[name].to_json(header=False)
        return attrs

    @staticmethod
    def _to_xr_coord(xdt, name):
        '''return a dict {name: (dims, data, attrs)} describing a Xarray variable
        from a Xndarray defined by his name'''
        data = xdt.to_ndarray(name)
        if name in xdt.additionals and not xdt[name].links:
            data = data.reshape(xdt.shape_dims(xdt[name].name))
        # explicit 1-tuple (a parenthesised string is still a string)
        dims = tuple(xdt.dims(name)) if xdt.dims(name) else (xdt[name].name,)
        meta = {'ntv_type': xdt[name].ntv_type} | (
            xdt[name].meta if xdt[name].meta else {})
        return {name: (dims, data, meta)}

    @staticmethod
    def _to_xr_vars(xdt, list_names):
        '''return a dict with Xarray variables from a list of Xndarray names
        (only the Xndarrays with mode 'absolute' are converted)'''
        arg_vars = {}
        valid_names = [
            name for name in list_names if xdt[name].mode == 'absolute']
        for xnd_name in valid_names:
            arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name)
        return arg_vars

    @staticmethod
    def _xr_add_type(xar):
        '''add 'ntv_type' as attribute for a xr.DataArray when it is missing.

        For another object (e.g. a xr.Dataset) the function is applied to each
        coordinate and each data_var.'''
        if isinstance(xar, xr.DataArray):
            if 'ntv_type' not in xar.attrs:
                xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)}
            return
        # iterating coords / data_vars yields names: the objects are looked up
        # before recursing (recursing on the names raised AttributeError)
        for coord in xar.coords:
            XarrayConnec._xr_add_type(xar.coords[coord])
        for var in xar.data_vars:
            XarrayConnec._xr_add_type(xar.data_vars[var])
        return
Xarray interface with two static methods ximport and xexport
@staticmethod
def xexport(xdt, **kwargs):
    '''Return a xr.DataArray or a xr.Dataset built from a Xdataset.

    *Parameters*

    - **dataset** : Boolean (default True) - if False and a single data_var,
    a xr.DataArray is returned, else a xr.Dataset
    - **datagroup** : Boolean (default True) - option forwarded to
    `_to_xr_attrs` (json representation of 'relative' Xndarrays and
    'data_arrays' Xndarrays added to the attributes)
    '''
    option = {'dataset': True, 'datagroup': True} | kwargs
    coord_names = xdt.dimensions + xdt.coordinates
    coords = XarrayConnec._to_xr_vars(xdt, coord_names)
    coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
    attrs = XarrayConnec._to_xr_attrs(xdt, **option)

    if len(xdt.data_vars) != 1 or option['dataset']:
        variables = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
        return xr.Dataset(variables, coords=coords, attrs=attrs)

    # single data_var exported as a DataArray
    var_name = xdt.data_vars[0]
    values = xdt.to_ndarray(var_name)
    dims = xdt.dims(var_name)
    attrs['ntv_type'] = xdt[var_name].nda.ntv_type
    if xdt[var_name].meta:
        attrs |= xdt[var_name].meta
    # a data_var named 'data' is exported without name
    array_name = None if var_name == 'data' else var_name
    return xr.DataArray(data=values, coords=coords, dims=dims, attrs=attrs,
                        name=array_name)
return a xr.DataArray or a xr.Dataset from a Xdataset
Parameters
- dataset : Boolean (default True) - if False and a single data_var, return a xr.DataArray
- datagroup : Boolean (default True) - if True, the JSON representation of the 'relative' Xndarrays and of the 'data_arrays' Xndarrays is added to the attributes of the returned xr.DataArray/xr.Dataset
@staticmethod
def ximport(xar, Xclass, **kwargs):
    '''return a Xdataset from a xr.DataArray or a xr.Dataset

    *Parameters*

    - **xar** : xr.DataArray or xr.Dataset to convert,
    - **Xclass** : class called as `Xclass(list_of_xndarray, name)` to build
    the result (`to_canonical()` of that object is returned),
    - **kwargs** : not used
    '''
    xnd = []
    if xar.attrs:
        # 'name' and 'ntv_type' are not metadata: they are handled separately
        attrs = {k: v for k, v in xar.attrs.items()
                 if k not in ('name', 'ntv_type')}
        for name, meta in attrs.items():
            if isinstance(meta, list):
                xnd.append(Xndarray.read_json({name: meta}))
            else:
                xnd.append(Xndarray(name, meta=meta))
    is_dataset = isinstance(xar, xr.Dataset)
    for coord in xar.coords:
        xnd.append(XarrayConnec._var_xr_to_xnd(xar.coords[coord]))
        # a Dataset coordinate spanning all the dims is linked to the first
        # data_var
        if is_dataset and list(xar.coords[coord].dims) == list(xar.dims):
            xnd[-1].links = [list(xar.data_vars)[0]]
    if isinstance(xar, xr.DataArray):
        # the array is converted once (the former duplicate conversion stored
        # in an unused local has been removed)
        xnd.append(XarrayConnec._var_xr_to_xnd(
            xar, name='data', add_attrs=False))
        xdt = Xclass(xnd, xar.attrs.get('name'))
        for var in xdt.data_vars:
            if var != xar.name and xar.name:
                xdt[var].links = [xar.name]
        return xdt.to_canonical()
    for var in xar.data_vars:
        xnd.append(XarrayConnec._var_xr_to_xnd(xar.data_vars[var]))
    return Xclass(xnd, xar.attrs.get('name')).to_canonical()
return a Xdataset from a xr.DataArray or a xr.Dataset
218class ScippConnec: 219 ''' Scipp interface with two static methods ximport and xexport''' 220 221 SCTYPE_DTYPE = {'string': 'str'} 222 223 @staticmethod 224 def xexport(xdt, **kwargs): 225 '''return a sc.DataArray or a sc.Dataset from a xdataset 226 227 *Parameters* 228 229 - **dataset** : Boolean (default True) - if False and a single data_var, 230 return a DataArray 231 - **datagroup** : Boolean (default True) - if True return a DataGroup with 232 metadata and data_arrays 233 - **ntv_type** : Boolean (default True) - if True add ntv-type to the name 234 ''' 235 option = {'dataset': True, 'datagroup': True, 236 'ntv_type': True} | kwargs 237 coords = dict([ScippConnec._to_scipp_var(xdt, name, **option) 238 for name in xdt.coordinates + xdt.dimensions 239 if xdt[name].mode == 'absolute']) 240 scd = sc.Dataset(dict([ScippConnec._to_sc_dataarray(xdt, name, coords, **option) 241 for name in xdt.data_vars 242 if xdt[name].mode == 'absolute'])) 243 scd = scd if option['dataset'] else scd[list(scd)[0]] 244 if not option['datagroup']: 245 return scd 246 sc_name = xdt.name if xdt.name else 'no_name' 247 return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option)) 248 249 @staticmethod 250 def ximport(sc_obj, Xclass, **kwargs): 251 '''return a xdataset from a scipp object DataArray, Dataset or DataGroup''' 252 xnd = [] 253 scd = sc_obj 254 xnd_name = None 255 if isinstance(sc_obj, sc.DataGroup): 256 for obj in sc_obj: 257 if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)): 258 scd = sc_obj[obj] 259 xnd_name = obj 260 break 261 if isinstance(scd, sc.DataArray): 262 scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd}) 263 if isinstance(scd, sc.Dataset): 264 for coord in scd.coords: 265 xnd += ScippConnec._var_sc_to_xnd( 266 scd.coords[coord], scd, coord) 267 for var in scd: 268 for mask in scd[var].masks: 269 m_var = Nutil.split_json_name(var)[0] 270 xnd += ScippConnec._var_sc_to_xnd( 271 scd[var].masks[mask], scd, mask, m_var) 272 xnd += 
ScippConnec._var_sc_to_xnd(scd[var].data, scd, var) 273 if isinstance(sc_obj, sc.DataGroup): 274 xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd) 275 return Xclass(xnd, xnd_name).to_canonical() 276 277 @staticmethod 278 def _grp_sc_to_xnd(sc_obj, xnd): 279 '''return a list of Xndarray from a scipp variable''' 280 dic_xnd = {xar.name: xar for xar in xnd} 281 for obj in sc_obj: 282 name, add_name = Nutil.split_name(obj) 283 match [name, add_name, sc_obj[obj]]: 284 case [name, None, list()]: 285 xnd += [Xndarray.read_json({name: sc_obj[obj]})] 286 case [name, add_name, sc.Variable()]: 287 xnd += ScippConnec._var_sc_to_xnd( 288 sc_obj[obj], None, add_name, name) 289 case [name, _, dict() | str() | list()] if name in dic_xnd: 290 if dic_xnd[name].meta: 291 dic_xnd[name].meta |= sc_obj[obj] 292 else: 293 dic_xnd[name].meta = sc_obj[obj] 294 case [name, _, dict() | str() | list()]: 295 xnd += [Xndarray.read_json({name: sc_obj[obj]})] 296 case [_, _, _]: ... 297 return xnd 298 299 @staticmethod 300 def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None): 301 '''return a list of Xndarray from a scipp variable 302 - var : name 303 - sc_name : scipp name''' 304 l_xnda = [] 305 unit = scv.unit.name if scv.unit and not scv.unit in [ 306 'dimensionless', 'ns'] else '' 307 ext_name, typ1 = Nutil.split_json_name(sc_name, True) 308 var_name, typ2 = Nutil.split_json_name(var, True) 309 full_name = var_name + \ 310 ('.' 
if var_name and ext_name else '') + ext_name 311 ntv_type_base = typ1 + typ2 312 ntv_type = ntv_type_base + ('[' + unit + ']' if unit else '') 313 314 links = [Nutil.split_json_name(jsn)[0] for jsn in scv.dims] 315 if not scd is None and sc_name in scd.coords and scv.dims == scd.dims: 316 links = [Nutil.split_json_name(list(scd)[0])[0]] 317 if not scv.variances is None: 318 nda = Ndarray(scv.variances, ntv_type_base) 319 l_xnda.append(Xndarray(full_name + '.variance', nda, links)) 320 nda = Ndarray(scv.values, ntv_type) 321 l_xnda.append(Xndarray(full_name, nda, links)) 322 return l_xnda 323 324 @staticmethod 325 def _to_sc_dataarray(xdt, name, coords, **option): 326 '''return a scipp.DataArray from a xdataset.global_var defined by his name''' 327 scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option) 328 masks = dict([ScippConnec._to_scipp_var(xdt, nam, **option) 329 for nam in set(xdt.var_group(name)) & set(xdt.masks)]) 330 return (scipp_name, sc.DataArray(data, coords=coords, masks=masks)) 331 332 @staticmethod 333 def _to_scipp_grp(xdt, **option): 334 '''return a dict with metadata, data-array and data_add from a xdataset''' 335 grp = {} 336 grp |= dict([ScippConnec._to_scipp_var(xdt, name, **option) 337 for name in xdt.data_add + xdt.data_arrays 338 if xdt[name].add_name != 'variance']) 339 opt_mask = option | {'grp_mask': True} 340 grp |= dict([ScippConnec._to_scipp_var(xdt, name, **opt_mask) 341 for name in xdt.masks 342 if xdt[name].name in xdt.names and not xdt[name].name in xdt.data_vars]) 343 grp |= {name + '.meta': xdt[name].meta for name in xdt.names 344 if xdt[name].meta} 345 for name in xdt.names: 346 if xdt[name].mode == 'relative': 347 grp |= xdt[name].to_json(header=False) 348 return grp 349 350 @staticmethod 351 def _to_scipp_var(xdt, name, **kwargs): 352 '''return a scipp.Variable from a Xndarray defined by his name''' 353 option = {'grp_mask': False, 'ntv_type': True} | kwargs 354 add_name = Nutil.split_name(name)[1] 355 new_n = 
add_name if name in xdt.masks and not option['grp_mask'] else name 356 opt_n = option['ntv_type'] 357 values = xdt.to_ndarray(name) 358 vari_name = name + '.variance' 359 variances = xdt[vari_name].darray if vari_name in xdt.names else None 360 if not variances is None: 361 variances = variances.reshape(xdt.shape_dims(vari_name)) 362 dims = xdt.dims(name, opt_n) if xdt.dims( 363 name, opt_n) else [xdt[name].name] 364 simple_type, unit = Nutil.split_type(xdt[name].ntv_type) 365 scipp_name = new_n + (':' + simple_type if opt_n else '') 366 unit = unit if unit else '' 367 return (scipp_name, sc.array(dims=dims, values=values, 368 variances=variances, unit=unit))
Scipp interface with two static methods ximport and xexport
@staticmethod
def xexport(xdt, **kwargs):
    '''Return a sc.DataArray, a sc.Dataset or a sc.DataGroup from a xdataset.

    *Parameters*

    - **dataset** : Boolean (default True) - if False, the first DataArray is
    used instead of the Dataset
    - **datagroup** : Boolean (default True) - if True return a DataGroup with
    metadata and data_arrays
    - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
    '''
    option = {'dataset': True, 'datagroup': True, 'ntv_type': True} | kwargs

    # only the Xndarrays with mode 'absolute' are converted
    coords = {}
    for name in xdt.coordinates + xdt.dimensions:
        if xdt[name].mode == 'absolute':
            key, variable = ScippConnec._to_scipp_var(xdt, name, **option)
            coords[key] = variable

    arrays = {}
    for name in xdt.data_vars:
        if xdt[name].mode == 'absolute':
            key, array = ScippConnec._to_sc_dataarray(
                xdt, name, coords, **option)
            arrays[key] = array

    scd = sc.Dataset(arrays)
    if not option['dataset']:
        scd = scd[list(scd)[0]]
    if not option['datagroup']:
        return scd
    # the main object is stored in the group under the xdataset name
    sc_name = xdt.name if xdt.name else 'no_name'
    return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))
return a sc.DataArray or a sc.Dataset from a xdataset
Parameters
- dataset : Boolean (default True) - if False and a single data_var, return a DataArray
- datagroup : Boolean (default True) - if True return a DataGroup with metadata and data_arrays
- ntv_type : Boolean (default True) - if True add ntv-type to the name
@staticmethod
def ximport(sc_obj, Xclass, **kwargs):
    '''Return a xdataset from a scipp object DataArray, Dataset or DataGroup.

    *Parameters*

    - **sc_obj** : scipp object to convert,
    - **Xclass** : class called as `Xclass(list_of_xndarray, name)`; the result
    of its `to_canonical()` method is returned,
    - **kwargs** : not used
    '''
    xnd = []
    scd = sc_obj
    xnd_name = None
    from_group = isinstance(sc_obj, sc.DataGroup)
    if from_group:
        # the first Dataset/DataArray found is the main object and its key
        # gives the name of the xdataset
        for key in sc_obj:
            if isinstance(sc_obj[key], (sc.Dataset, sc.DataArray)):
                scd, xnd_name = sc_obj[key], key
                break
    if isinstance(scd, sc.DataArray):
        # a single DataArray is wrapped in a Dataset
        array_name = scd.name if scd.name else 'no_name'
        scd = sc.Dataset({array_name: scd})
    if isinstance(scd, sc.Dataset):
        for coord in scd.coords:
            xnd.extend(ScippConnec._var_sc_to_xnd(
                scd.coords[coord], scd, coord))
        for var in scd:
            # masks of a variable are converted before its data
            for mask in scd[var].masks:
                m_var = Nutil.split_json_name(var)[0]
                xnd.extend(ScippConnec._var_sc_to_xnd(
                    scd[var].masks[mask], scd, mask, m_var))
            xnd.extend(ScippConnec._var_sc_to_xnd(scd[var].data, scd, var))
    if from_group:
        xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
    return Xclass(xnd, xnd_name).to_canonical()
return a xdataset from a scipp object DataArray, Dataset or DataGroup