ntv-numpy.ntv_numpy.xconnector
@author: Philippe@loco-labs.io
The `xconnector` module is part of the `ntv-numpy.ntv_numpy` package (see the specification document).
It contains interface classes, each with two static methods `ximport` and `xexport`:
- `XarrayConnec` class for Xarray Dataset or DataArray,
- `AstropyNDDataConnec` class for Astropy NDData,
- `ScippConnec` class for Scipp Dataset or DataArray,
- `PandasConnec` class for pandas DataFrame.
For more information, see the user guide or the github repository.
1# -*- coding: utf-8 -*- 2""" 3@author: Philippe@loco-labs.io 4 5The `xconnector` module is part of the `ntv-numpy.ntv_numpy` package ([specification document]( 6https://loco-philippe.github.io/ES/JSON%20semantic%20format%20(JSON-NTV).htm)). 7 8It contains interface classes with two static methods `ximport` and `xexport`: 9- `XarrayConnec` class for Xarray Dataset or DataArray, 10- `AstropyNDDataConnec` class for Astropy NDData, 11- `ScippConnec` class for Scipp Dataset or DataArray, 12- `PandasConnec` class for pandas dataFrame, 13 14 15For more information, see the 16[user guide](https://loco-philippe.github.io/ntv-numpy/docs/user_guide.html) 17 or the [github repository](https://github.com/loco-philippe/ntv-numpy). 18""" 19 20 21import xarray as xr 22import scipp as sc 23import pandas as pd 24import numpy as np 25from astropy import wcs 26from astropy.nddata import NDData 27from astropy.nddata.nduncertainty import StdDevUncertainty, VarianceUncertainty 28from astropy.nddata.nduncertainty import InverseVariance 29from ntv_numpy.ndarray import Nutil, Ndarray 30from ntv_numpy.xndarray import Xndarray 31 32 33class AstropyNDDataConnec: 34 ''' NDData interface with two static methods ximport and xexport''' 35 36 @staticmethod 37 def xexport(xdt, **kwargs): 38 '''return a NDData from a Xdataset''' 39 data = xdt['data'].ndarray 40 mask = xdt['data.mask'].ndarray 41 unit = xdt['data'].nda.ntvtype.extension 42 uncert = xdt['data.uncertainty'].ndarray 43 typ_u = xdt['data.uncertainty'].nda.ntvtype.extension 44 match typ_u: 45 case 'std': 46 uncertainty = StdDevUncertainty(uncert) 47 case 'var': 48 uncertainty = VarianceUncertainty(uncert) 49 case 'inv': 50 uncertainty = InverseVariance(uncert) 51 case _: 52 uncertainty = uncert 53 meta = xdt['meta'].meta | {'name': xdt.name} 54 wcs_dic = xdt['wcs'].meta 55 psf = xdt['psf'].ndarray 56 return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty, 57 meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf) 58 59 @staticmethod 60 def 
ximport(ndd, Xclass, **kwargs): 61 '''return a Xdataset from a astropy.NDData''' 62 xnd = [] 63 name = 'no_name' 64 unit = ndd.unit.to_string() if ndd.unit is not None else None 65 ntv_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit) 66 xnd += [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))] 67 if ndd.meta: 68 meta = {key: val for key, val in ndd.meta.items() if key != 'name'} 69 name = ndd.meta.get('name', 'no_name') 70 xnd += [Xndarray('meta', meta=meta)] 71 if ndd.wcs: 72 xnd += [Xndarray('wcs', meta=dict(ndd.wcs.to_header()))] 73 if ndd.psf is not None: 74 xnd += [Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type))] 75 if ndd.mask is not None: 76 xnd += [Xndarray('data.mask', nda=ndd.mask)] 77 if ndd.uncertainty is not None: 78 typ_u = ndd.uncertainty.__class__.__name__[:3].lower() 79 ntv_type = Nutil.ntv_type( 80 ndd.uncertainty.array.dtype.name, ext=typ_u) 81 nda = Ndarray(ndd.uncertainty.array, ntv_type=ntv_type) 82 xnd += [Xndarray('data.uncertainty', nda=nda)] 83 return Xclass(xnd, name).to_canonical() 84 85 86class PandasConnec: 87 ''' pandas.DataFrame interface with two static methods ximport and xexport''' 88 89 @staticmethod 90 def xexport(xdt, **kwargs): 91 '''return a pd.DataFrame from a Xdataset 92 93 *Parameters* 94 95 - **ntv_type**: Boolean (default True) - if False use full_name else json_name 96 - **info**: Boolean (default True) - if True add xdt.info in DataFrame.attrs 97 - **dims**: list of string (default None) - order of dimensions full_name to apply 98 ''' 99 opt = {'ntv_type': True, 'info': True, 'dims': None} | kwargs 100 dic_name = {name: xdt[name].json_name if opt['ntv_type'] else xdt[name].full_name 101 for name in xdt.names} 102 dims = xdt.dimensions if not opt['dims'] else tuple(opt['dims']) 103 fields = (xdt.group(dims) + xdt.group(xdt.coordinates) + 104 xdt.group(xdt.data_vars) + xdt.uniques) 105 fields += tuple(nam for nam in xdt.group(xdt.data_arrays) 106 if len(xdt[nam]) == xdt.length) 107 fields_array = 
tuple(var for var in fields if not xdt[var].uri) 108 dic_series = {dic_name[name]: PandasConnec._to_np_series(xdt, name, dims) 109 for name in fields_array} 110 dfr = pd.DataFrame(dic_series) 111 index = [dic_name[name] for name in dims] 112 if index: 113 dfr = dfr.set_index(index) 114 if opt['info']: 115 dfr.attrs |= {'info': xdt.tab_info} 116 dfr.attrs |= {'metadata': { 117 name: xdt[name].meta for name in xdt.metadata}} 118 fields_uri = [var for var in fields if var not in fields_array] 119 fields_other = [nam for nam in xdt.group(xdt.data_arrays) 120 if len(xdt[nam]) != xdt.length] 121 if fields_uri: 122 dfr.attrs |= {'fields': {nam: xdt[nam].to_json(noname=True,) 123 for nam in fields_uri + fields_other}} 124 if xdt.name: 125 dfr.attrs |= {'name': xdt.name} 126 return dfr 127 128 @staticmethod 129 def ximport(df, Xclass, **kwargs): 130 '''return a Xdataset from a pd.DataFrame 131 132 *Parameters* 133 134 - dims: list of string (default None) - order of dimensions to apply 135 ''' 136 opt = {'dims': None} | kwargs 137 xnd = [] 138 dfr = df.reset_index() 139 if 'index' in dfr.columns and 'index' not in df.columns: 140 del dfr['index'] 141 df_names = {Nutil.split_json_name(j_name)[0]: j_name 142 for j_name in dfr.columns} 143 df_ntv_types = {Nutil.split_json_name(j_name)[0]: 144 Nutil.split_json_name(j_name)[1] for j_name in dfr.columns} 145 dfr.columns = [Nutil.split_json_name(name)[0] for name in dfr.columns] 146 if dfr.attrs.get('metadata'): 147 for name, meta in dfr.attrs['metadata'].items(): 148 xnd += [Xndarray.read_json({name: meta})] 149 if dfr.attrs.get('fields'): 150 for name, jsn in dfr.attrs['fields'].items(): 151 xnd += [Xndarray.read_json({name: jsn})] 152 if dfr.attrs.get('info'): 153 dimensions = dfr.attrs['info']['dimensions'] 154 data = dfr.attrs['info']['data'] 155 else: 156 dimensions, data = PandasConnec._ximport_analysis(dfr, opt['dims']) 157 shape_dfr = [data[dim]['shape'][0] 158 for dim in dimensions] if dimensions else len(dfr) 159 dfr = 
dfr.sort_values(dimensions) 160 for name in df_names: 161 xnd += [PandasConnec._ximport_series(data, name, dfr, dimensions, 162 shape_dfr, df_ntv_types, **opt)] 163 return Xclass(xnd, dfr.attrs.get('name')).to_canonical() 164 165 @staticmethod 166 def _ximport_analysis(dfr, opt_dims): 167 '''return data and dimensions from analysis module 168 - opt_dims: partition to apply 169 - dfr: dataframe to analyse''' 170 dfr_idx = list(dfr.index.names) 171 opt_dims = dfr_idx if dfr_idx != [None] else opt_dims 172 ana = dfr.npd.analysis(distr=True) 173 partition = ana.field_partition(partition=opt_dims, mode='id') 174 part_rel = ana.relation_partition(partition=opt_dims, noroot=True) 175 part_dim = ana.relation_partition( 176 partition=opt_dims, noroot=True, primary=True) 177 dimensions = partition['primary'] 178 len_fields = {fld.idfield: fld.lencodec for fld in ana.fields} 179 data = {fld.idfield: { 180 'shape': [len_fields[dim] for dim in part_dim[fld.idfield]] if part_dim else [], 181 'links': part_rel[fld.idfield] if part_rel else []} for fld in ana.fields} 182 for json_name in data: 183 if not data[json_name]['shape']: 184 name = Nutil.split_name(Nutil.split_json_name(json_name)[0])[0] 185 p_name = [js_name for js_name in data 186 if Nutil.split_json_name(js_name)[0] == name][0] 187 data[json_name]['shape'] = data[p_name]['shape'] 188 return (dimensions, data) 189 190 @staticmethod 191 def _ximport_series(data, name, dfr, dimensions, shape_dfr, df_ntv_types, **opt): 192 '''return a Xndarray from a Series of a pd.DataFrame''' 193 if data[name].get('xtype') == 'meta': # or len(dfr[name].unique()) == 1: 194 return Xndarray(name, meta=dfr[name].iloc[0]) 195 meta = data[name].get('meta') 196 ntv_type = df_ntv_types[name] 197 if len(dfr[name].unique()) == 1: 198 nda = Ndarray(np.array(dfr[name].iloc[0]), 199 ntv_type=ntv_type, str_uri=False) 200 nda.set_shape([1]) 201 return Xndarray(name, nda=nda, meta=meta) 202 if not dimensions: 203 nda = Ndarray(np.array(dfr[name]), 
ntv_type=ntv_type) 204 return Xndarray(name, nda=nda, meta=meta) 205 dims = [] 206 PandasConnec._get_dims(dims, name, data, dimensions) 207 if not dims: 208 p_name, add_name = Nutil.split_name(name) 209 if add_name: 210 PandasConnec._get_dims(dims, p_name, data, dimensions) 211 np_array = PandasConnec._from_series(dfr, name, shape_dfr, 212 dimensions, dims, opt['dims']) 213 shape = data[name].get('shape', [len(dfr)]) 214 nda = Ndarray(np_array, ntv_type, shape) 215 links = data[name].get('links') 216 return Xndarray(name, nda=nda, links=links if links else dims, meta=meta) 217 218 @staticmethod 219 def _to_np_series(xdt, name, dims): 220 '''return a np.ndarray from the Xndarray of xdt defined by his name 221 222 *parameters* 223 224 - **xdt**: Xdataset - data to convert in a pd.DataFrame 225 - **name**: string - full_name of the Xndarray to convert 226 - **dims**: list of string - order of dimensions full_name to apply''' 227 if name in xdt.uniques: 228 return np.array([xdt[name].darray[0]] * xdt.length) 229 if xdt[name].shape == [xdt.length]: 230 return xdt[name].darray 231 n_shape = {nam: len(xdt[nam]) for nam in dims} 232 dim_name = xdt.dims(name) 233 if not set(dim_name) <= set(dims): 234 return None 235 add_name = [nam for nam in dims if nam not in dim_name] 236 tab_name = add_name + dim_name 237 238 til = 1 239 for nam in add_name: 240 til *= n_shape[nam] 241 shap = [n_shape[nam] for nam in tab_name] 242 order = [dims.index(nam) for nam in tab_name] 243 arr = xdt[name].darray 244 return Nutil.extend_array(arr, til, shap, order) 245 246 @staticmethod 247 def _from_series(dfr, name, shape, dims, links, new_dims=None): 248 '''return a flattened np.ndarray from the pd.Series of dfr defined by his name 249 250 *parameters* 251 252 - dfr: DataFrame - data to convert in Xdataset 253 - name: string - name of the Series (full_name or json_name) 254 - shape: shape of the Xdataset 255 - dims: list of string - list of name of dimensions 256 - links: list of string - list 
of linked Series 257 - new_dims: list of string (default None) - new order of dims 258 ''' 259 if not links: 260 return np.array(dfr[name]) 261 old_order = list(range(len(dims))) 262 new_dims = new_dims if new_dims else dims 263 order = [dims.index(dim) 264 for dim in new_dims] if new_dims else old_order 265 idx = [0] * len(dims) 266 for nam in links: 267 idx[new_dims.index(nam)] = slice(shape[dims.index(nam)]) 268 xar = np.moveaxis(np.array(dfr[name]).reshape(shape), 269 old_order, order)[tuple(idx)] 270 if not links: 271 return xar.flatten() 272 lnk = [nam for nam in new_dims if nam in links] 273 shape_lnk = [shape[dims.index(nam)] for nam in lnk] 274 xar = xar.reshape(shape_lnk) 275 old_order = list(range(len(links))) 276 order = [lnk.index(dim) for dim in links] 277 return np.moveaxis(xar, old_order, order).flatten() 278 279 @staticmethod 280 def _get_dims(dims, name, data, dimensions): 281 '''add names of dimensions into dims''' 282 if not name: 283 return 284 if name in dimensions: 285 dims += [name] 286 else: 287 if 'links' not in data[name]: 288 return 289 for nam in data[name]['links']: 290 PandasConnec._get_dims(dims, nam, data, dimensions) 291 292 293class XarrayConnec: 294 ''' Xarray interface with two static methods ximport and xexport''' 295 296 @staticmethod 297 def xexport(xdt, **kwargs): 298 '''return a xr.DataArray or a xr.Dataset from a Xdataset 299 300 *Parameters* 301 302 - **dataset** : Boolean (default True) - if False and a single data_var, 303 return a xr.DataArray 304 - **info** : Boolean (default True) - if True, add json representation 305 of 'relative' Xndarrays and 'data_arrays' Xndarrays in attrs 306 ''' 307 option = {'dataset': True, 'info': True} | kwargs 308 coords = XarrayConnec._to_xr_vars( 309 xdt, xdt.dimensions + xdt.coordinates + xdt.uniques) 310 coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals) 311 attrs = XarrayConnec._to_xr_attrs(xdt, **option) 312 if len(xdt.data_vars) == 1 and not option['dataset']: 313 var_name = 
xdt.data_vars[0] 314 data = xdt.to_ndarray(var_name) 315 dims = xdt.dims(var_name) 316 attrs |= {'ntv_type': xdt[var_name].nda.ntv_type} 317 attrs |= xdt[var_name].meta if xdt[var_name].meta else {} 318 name = var_name if var_name != 'data' else None 319 xrd = xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs, 320 name=name) 321 else: 322 data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars) 323 xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs) 324 for unic in xdt.uniques: 325 xrd[unic].attrs |= {'ntv_type': xdt[unic].ntv_type} | ( 326 xdt[unic].meta if xdt[unic].meta else {}) 327 return xrd 328 329 @staticmethod 330 def ximport(xar, Xclass, **kwargs): 331 '''return a Xdataset from a xr.DataArray or a xr.Dataset''' 332 xnd = [] 333 if xar.attrs: 334 attrs = {k: v for k, v in xar.attrs.items() if k not in [ 335 'name', 'ntv_type']} 336 for name, meta in attrs.items(): 337 if isinstance(meta, list): 338 xnd += [Xndarray.read_json({name: meta})] 339 else: 340 xnd += [Xndarray(name, meta=meta)] 341 for coord in xar.coords: 342 xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])] 343 if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset): 344 xnd[-1].links = [list(xar.data_vars)[0]] 345 if isinstance(xar, xr.DataArray): 346 var = XarrayConnec._var_xr_to_xnd( 347 xar, name='data', add_attrs=False) 348 xnd += [XarrayConnec._var_xr_to_xnd(xar, 349 name='data', add_attrs=False)] 350 xdt = Xclass(xnd, xar.attrs.get('name')) 351 for var in xdt.data_vars: 352 if var != xar.name and xar.name: 353 xdt[var].links = [xar.name] 354 return xdt.to_canonical() 355 for var in xar.data_vars: 356 xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])] 357 return Xclass(xnd, xar.attrs.get('name')).to_canonical() 358 359 @staticmethod 360 def _var_xr_to_xnd(var, name=None, add_attrs=True): 361 '''return a Xndarray from a Xarray variable 362 363 *Parameters* 364 365 - **var** : Xarray variable to convert in Xndarray, 366 - **name** : 
string (default None) - default name if var have no name, 367 - **add_attrs** : boolean (default True) - if False, attrs are not converted 368 ''' 369 full_name = var.name if var.name else name 370 name = Nutil.split_name(full_name)[0] 371 dims = None if var.dims == (name,) or var.size == 1 else list(var.dims) 372 ntv_type = var.attrs.get('ntv_type') 373 nda = var.values 374 nda = nda.reshape(1) if not nda.shape else nda 375 if nda.dtype.name == 'datetime64[ns]' and ntv_type: 376 nda = Nutil.convert(ntv_type, nda, tojson=False) 377 attrs = {k: v for k, v in var.attrs.items() 378 if k not in ['ntv_type', 'name']} if add_attrs else {} 379 return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs) 380 381 @staticmethod 382 def _to_xr_attrs(xdt, **option): 383 '''return a dict with attributes from a Xdataset 384 385 *Parameters* 386 387 - **info** : Boolean if True, add json representation of 'relative' 388 Xndarrays and 'data_arrays' Xndarrays in attrs 389 ''' 390 attrs = {meta: xdt[meta].meta for meta in xdt.metadata} 391 attrs |= {'name': xdt.name} if xdt.name else {} 392 if option['info']: 393 for name in xdt.names: 394 if xdt[name].mode == 'relative': 395 attrs |= xdt[name].to_json(header=False) 396 for name in xdt.data_arrays: 397 attrs |= xdt[name].to_json(header=False) 398 return attrs 399 400 @staticmethod 401 def _to_xr_coord(xdt, name): 402 '''return a dict with Xarray attributes from a Xndarray defined by his name''' 403 data = xdt.to_ndarray(name) 404 if name in xdt.uniques: 405 return {name: data[0]} 406 if name in xdt.additionals and not xdt[name].links: 407 data = data.reshape(xdt.shape_dims(xdt[name].name)) 408 dims = tuple(xdt.dims(name)) if xdt.dims(name) else (xdt[name].name) 409 meta = {'ntv_type': xdt[name].ntv_type} | ( 410 xdt[name].meta if xdt[name].meta else {}) 411 return {name: (dims, data, meta)} 412 413 @staticmethod 414 def _to_xr_vars(xdt, list_names): 415 '''return a dict with Xarray attributes from a list of Xndarray names''' 416 
arg_vars = {} 417 valid_names = [ 418 nam for nam in list_names if xdt[nam].mode == 'absolute'] 419 for xnd_name in valid_names: 420 arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name) 421 for name in list_names: 422 if xdt[name].xtype == 'meta': 423 arg_vars |= {name: xdt[name].meta} 424 return arg_vars 425 426 @staticmethod 427 def _xr_add_type(xar): 428 '''add 'ntv_type' as attribute for a xr.DataArray''' 429 if isinstance(xar, xr.DataArray) and 'ntv_type' not in xar.attrs: 430 xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)} 431 return 432 for coord in xar.coords: 433 XarrayConnec._xr_add_type(coord) 434 for var in xar.data_vars: 435 XarrayConnec._xr_add_type(var) 436 return 437 438 439class ScippConnec: 440 ''' Scipp interface with two static methods ximport and xexport''' 441 442 SCTYPE_DTYPE = {'string': 'str'} 443 444 @staticmethod 445 def xexport(xdt, **kwargs): 446 '''return a sc.DataArray or a sc.Dataset from a xdataset 447 448 *Parameters* 449 450 - **dataset** : Boolean (default True) - if False and a single data_var, 451 return a DataArray 452 - **info** : Boolean (default True) - if True return a DataGroup with 453 metadata and data_arrays 454 - **ntv_type** : Boolean (default True) - if True add ntv-type to the name 455 ''' 456 option = {'dataset': True, 'info': True, 457 'ntv_type': True} | kwargs 458 coords = dict([ScippConnec._to_scipp_var(xdt, name, **option) 459 for name in xdt.coordinates + xdt.dimensions + xdt.uniques 460 if xdt[name].mode == 'absolute']) 461 scd = sc.Dataset(dict([ScippConnec._to_sc_dataarray(xdt, name, coords, **option) 462 for name in xdt.data_vars 463 if xdt[name].mode == 'absolute'])) 464 scd = scd if option['dataset'] else scd[list(scd)[0]] 465 if not option['info']: 466 return scd 467 sc_name = xdt.name if xdt.name else 'no_name' 468 return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option)) 469 470 @staticmethod 471 def ximport(sc_obj, Xclass, **kwargs): 472 '''return a xdataset from 
a scipp object DataArray, Dataset or DataGroup''' 473 xnd = [] 474 scd = sc_obj 475 xnd_name = None 476 if isinstance(sc_obj, sc.DataGroup): 477 for obj in sc_obj: 478 if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)): 479 scd = sc_obj[obj] 480 xnd_name = obj 481 break 482 if isinstance(scd, sc.DataArray): 483 scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd}) 484 if isinstance(scd, sc.Dataset): 485 for coord in scd.coords: 486 xnd += ScippConnec._var_sc_to_xnd( 487 scd.coords[coord], scd, coord) 488 for var in scd: 489 for mask in scd[var].masks: 490 m_var = Nutil.split_json_name(var)[0] 491 xnd += ScippConnec._var_sc_to_xnd( 492 scd[var].masks[mask], scd, mask, m_var) 493 xnd += ScippConnec._var_sc_to_xnd(scd[var].data, scd, var) 494 if isinstance(sc_obj, sc.DataGroup): 495 xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd) 496 return Xclass(xnd, xnd_name).to_canonical() 497 498 @staticmethod 499 def _grp_sc_to_xnd(sc_obj, xnd): 500 '''return a list of Xndarray from a scipp variable''' 501 dic_xnd = {xar.name: xar for xar in xnd} 502 for obj in sc_obj: 503 name, add_name = Nutil.split_name(obj) 504 match [name, add_name, sc_obj[obj]]: 505 case [name, None, list()]: 506 xnd += [Xndarray.read_json({name: sc_obj[obj]})] 507 case [name, add_name, sc.Variable()]: 508 xnd += ScippConnec._var_sc_to_xnd( 509 sc_obj[obj], None, add_name, name) 510 case [name, _, dict() | str() | list()] if name in dic_xnd: 511 if dic_xnd[name].meta: 512 dic_xnd[name].meta |= sc_obj[obj] 513 else: 514 dic_xnd[name].meta = sc_obj[obj] 515 case [name, _, dict() | str() | list()]: 516 xnd += [Xndarray.read_json({name: sc_obj[obj]})] 517 case [_, _, _]: ... 
518 return xnd 519 520 @staticmethod 521 def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None): 522 '''return a list of Xndarray from a scipp variable 523 - scd : scipp dataset 524 - scv : scipp variable 525 - var : name 526 - sc_name : scipp name''' 527 l_xnda = [] 528 unit = scv.unit.name if scv.unit and scv.unit not in [ 529 'dimensionless', 'ns'] else '' 530 ext_name, typ1 = Nutil.split_json_name(sc_name, True) 531 var_name, typ2 = Nutil.split_json_name(var, True) 532 full_name = var_name + \ 533 ('.' if var_name and ext_name else '') + ext_name 534 ntv_type_base = typ1 + typ2 535 ntv_type = ntv_type_base + ('[' + unit + ']' if unit else '') 536 links = [Nutil.split_json_name(jsn)[0] for jsn in scv.dims] 537 if scd is not None and sc_name in scd.coords and scv.dims == scd.dims: 538 links = [Nutil.split_json_name(list(scd)[0])[0]] 539 if scv.variances is not None: 540 nda = Ndarray(scv.variances, ntv_type_base) 541 l_xnda.append(Xndarray(full_name + '.variance', nda, links)) 542 nda = Ndarray(scv.values, ntv_type, str_uri=False) 543 shape = scv.shape if scv.shape else (1,) 544 nda.set_shape(shape) 545 l_xnda.append(Xndarray(full_name, nda, links)) 546 return l_xnda 547 548 @staticmethod 549 def _to_sc_dataarray(xdt, name, coords, **option): 550 '''return a scipp.DataArray from a xdataset.global_var defined by his name''' 551 scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option) 552 masks = dict([ScippConnec._to_scipp_var(xdt, nam, **option) 553 for nam in set(xdt.group(name)) & set(xdt.masks)]) 554 return (scipp_name, sc.DataArray(data, coords=coords, masks=masks)) 555 556 @staticmethod 557 def _to_scipp_grp(xdt, **option): 558 '''return a dict with metadata, data-array and data_add from a xdataset''' 559 grp = {} 560 grp |= dict([ScippConnec._to_scipp_var(xdt, name, **option) 561 for name in xdt.data_add + xdt.data_arrays 562 if xdt[name].add_name != 'variance']) 563 opt_mask = option | {'grp_mask': True} 564 grp |= 
dict([ScippConnec._to_scipp_var(xdt, name, **opt_mask) 565 for name in xdt.masks 566 if xdt[name].name in xdt.names and xdt[name].name not in xdt.data_vars]) 567 grp |= {name + '.meta': xdt[name].meta for name in xdt.names 568 if xdt[name].meta} 569 for name in xdt.names: 570 if xdt[name].mode == 'relative': 571 grp |= xdt[name].to_json(header=False) 572 return grp 573 574 @staticmethod 575 def _to_scipp_var(xdt, name, **kwargs): 576 '''return a scipp.Variable from a Xndarray defined by his name''' 577 option = {'grp_mask': False, 'ntv_type': True} | kwargs 578 simple_type, unit = Nutil.split_type(xdt[name].ntv_type) 579 unit = unit if unit else '' 580 add_name = Nutil.split_name(name)[1] 581 new_n = add_name if name in xdt.masks and not option['grp_mask'] else name 582 opt_n = option['ntv_type'] 583 scipp_name = new_n + (':' + simple_type if opt_n else '') 584 if name in xdt.uniques: 585 return (scipp_name, sc.scalar(xdt[name].darray[0], unit=unit)) 586 vari_name = name + '.variance' 587 variances = xdt[vari_name].darray if vari_name in xdt.names else None 588 dims = xdt.dims(name, opt_n) if xdt.dims( 589 name, opt_n) else [xdt[name].name] 590 var = sc.array(dims=['flat'], values=xdt.to_darray( 591 name), variances=variances, unit=unit) 592 var = sc.fold(var, dim='flat', sizes=dict(zip(dims, xdt[name].shape))) 593 return (scipp_name, var)
class AstropyNDDataConnec:
    ''' NDData interface with two static methods ximport and xexport'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a NDData from a Xdataset

        The Xdataset is read through the names written by `ximport`:
        'data' (required) and the optional 'data.mask', 'data.uncertainty',
        'meta', 'wcs' and 'psf'. `ximport` only writes the optional entries
        when the NDData holds them, so an entry absent from `xdt.names` is
        passed to NDData as None instead of being read.

        *Parameters*

        - **xdt**: Xdataset to convert
        - **kwargs**: not used (same signature as the other connectors)
        '''
        names = xdt.names
        data = xdt['data'].ndarray
        unit = xdt['data'].nda.ntvtype.extension
        mask = xdt['data.mask'].ndarray if 'data.mask' in names else None
        psf = xdt['psf'].ndarray if 'psf' in names else None
        wcs_obj = wcs.WCS(xdt['wcs'].meta) if 'wcs' in names else None
        meta = (xdt['meta'].meta or {}) if 'meta' in names else {}
        uncertainty = None
        if 'data.uncertainty' in names:
            uncert = xdt['data.uncertainty'].ndarray
            # the type extension is the 3-letter code written by ximport
            typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
            uncert_class = {'std': StdDevUncertainty,
                            'var': VarianceUncertainty,
                            'inv': InverseVariance}.get(typ_u)
            # unknown code: the raw array is handed to NDData unchanged
            uncertainty = uncert_class(uncert) if uncert_class else uncert
        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
                      meta=meta | {'name': xdt.name}, wcs=wcs_obj, psf=psf)

    @staticmethod
    def ximport(ndd, Xclass, **kwargs):
        '''return a Xdataset from a astropy.NDData

        *Parameters*

        - **ndd**: NDData to convert
        - **Xclass**: class called as `Xclass(list_of_Xndarray, name)` to
        build the result (its `to_canonical()` value is returned)
        - **kwargs**: not used
        '''
        unit = None if ndd.unit is None else ndd.unit.to_string()
        data_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
        xnd = [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=data_type))]
        name = 'no_name'
        if ndd.meta:
            # 'name' is carried as the Xdataset name, not as a metadata item
            name = ndd.meta.get('name', 'no_name')
            meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
            xnd.append(Xndarray('meta', meta=meta))
        if ndd.wcs:
            xnd.append(Xndarray('wcs', meta=dict(ndd.wcs.to_header())))
        if ndd.psf is not None:
            # NOTE(review): the psf is typed with the ntv_type of the data
            xnd.append(Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=data_type)))
        if ndd.mask is not None:
            xnd.append(Xndarray('data.mask', nda=ndd.mask))
        if ndd.uncertainty is not None:
            # 3 first letters of the class name, lowercased, used as extension
            typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
            uncert_type = Nutil.ntv_type(
                ndd.uncertainty.array.dtype.name, ext=typ_u)
            nda = Ndarray(ndd.uncertainty.array, ntv_type=uncert_type)
            xnd.append(Xndarray('data.uncertainty', nda=nda))
        return Xclass(xnd, name).to_canonical()
NDData interface with two static methods ximport and xexport
@staticmethod
def xexport(xdt, **kwargs):
    '''return a NDData from a Xdataset

    The Xdataset is read through the names 'data' (required) and the
    optional 'data.mask', 'data.uncertainty', 'meta', 'wcs' and 'psf'.
    An optional entry absent from `xdt.names` is passed to NDData as None
    instead of being read.

    *Parameters*

    - **xdt**: Xdataset to convert
    - **kwargs**: not used (same signature as the other connectors)
    '''
    names = xdt.names
    data = xdt['data'].ndarray
    unit = xdt['data'].nda.ntvtype.extension
    mask = xdt['data.mask'].ndarray if 'data.mask' in names else None
    psf = xdt['psf'].ndarray if 'psf' in names else None
    wcs_obj = wcs.WCS(xdt['wcs'].meta) if 'wcs' in names else None
    meta = (xdt['meta'].meta or {}) if 'meta' in names else {}
    uncertainty = None
    if 'data.uncertainty' in names:
        uncert = xdt['data.uncertainty'].ndarray
        # the type extension selects the astropy uncertainty class
        typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
        uncert_class = {'std': StdDevUncertainty,
                        'var': VarianceUncertainty,
                        'inv': InverseVariance}.get(typ_u)
        # unknown code: the raw array is handed to NDData unchanged
        uncertainty = uncert_class(uncert) if uncert_class else uncert
    return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
                  meta=meta | {'name': xdt.name}, wcs=wcs_obj, psf=psf)
return a NDData from a Xdataset
@staticmethod
def ximport(ndd, Xclass, **kwargs):
    '''return a Xdataset from a astropy.NDData

    *Parameters*

    - **ndd**: NDData to convert
    - **Xclass**: class called as `Xclass(list_of_Xndarray, name)` to
    build the result (its `to_canonical()` value is returned)
    - **kwargs**: not used
    '''
    unit = None if ndd.unit is None else ndd.unit.to_string()
    data_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
    xnd = [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=data_type))]
    name = 'no_name'
    if ndd.meta:
        # 'name' is carried as the Xdataset name, not as a metadata item
        meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
        name = ndd.meta.get('name', 'no_name')
        xnd.append(Xndarray('meta', meta=meta))
    if ndd.wcs:
        xnd.append(Xndarray('wcs', meta=dict(ndd.wcs.to_header())))
    if ndd.psf is not None:
        # NOTE(review): the psf is typed with the ntv_type of the data
        xnd.append(Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=data_type)))
    if ndd.mask is not None:
        xnd.append(Xndarray('data.mask', nda=ndd.mask))
    if ndd.uncertainty is not None:
        # 3 first letters of the class name, lowercased, used as extension
        typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
        uncert_type = Nutil.ntv_type(
            ndd.uncertainty.array.dtype.name, ext=typ_u)
        nda = Ndarray(ndd.uncertainty.array, ntv_type=uncert_type)
        xnd.append(Xndarray('data.uncertainty', nda=nda))
    return Xclass(xnd, name).to_canonical()
return a Xdataset from a astropy.NDData
class PandasConnec:
    ''' pandas.DataFrame interface with two static methods ximport and xexport'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a pd.DataFrame from a Xdataset

        *Parameters*

        - **ntv_type**: Boolean (default True) - if False use full_name else json_name
        - **info**: Boolean (default True) - if True add xdt.info in DataFrame.attrs
        - **dims**: list of string (default None) - order of dimensions full_name to apply
        '''
        opt = {'ntv_type': True, 'info': True, 'dims': None} | kwargs
        dic_name = {name: xdt[name].json_name if opt['ntv_type'] else xdt[name].full_name
                    for name in xdt.names}
        dims = tuple(opt['dims']) if opt['dims'] else xdt.dimensions
        fields = (xdt.group(dims) + xdt.group(xdt.coordinates) +
                  xdt.group(xdt.data_vars) + xdt.uniques)
        fields += tuple(nam for nam in xdt.group(xdt.data_arrays)
                        if len(xdt[nam]) == xdt.length)
        # fields with an uri are not converted into columns (see 'fields' below)
        fields_array = tuple(var for var in fields if not xdt[var].uri)
        dic_series = {dic_name[name]: PandasConnec._to_np_series(xdt, name, dims)
                      for name in fields_array}
        dfr = pd.DataFrame(dic_series)
        index = [dic_name[name] for name in dims]
        if index:
            dfr = dfr.set_index(index)
        # NOTE(review): nesting rebuilt from the docstring ('info' only adds
        # xdt.info) - confirm that the entries below are unconditional
        if opt['info']:
            dfr.attrs |= {'info': xdt.tab_info}
        dfr.attrs |= {'metadata': {
            name: xdt[name].meta for name in xdt.metadata}}
        fields_uri = [var for var in fields if var not in fields_array]
        fields_other = [nam for nam in xdt.group(xdt.data_arrays)
                        if len(xdt[nam]) != xdt.length]
        # NOTE(review): fields_other is only exported when fields_uri is not
        # empty - confirm this is intended
        if fields_uri:
            dfr.attrs |= {'fields': {nam: xdt[nam].to_json(noname=True,)
                                     for nam in fields_uri + fields_other}}
        if xdt.name:
            dfr.attrs |= {'name': xdt.name}
        return dfr

    @staticmethod
    def ximport(df, Xclass, **kwargs):
        '''return a Xdataset from a pd.DataFrame

        *Parameters*

        - dims: list of string (default None) - order of dimensions to apply
        '''
        opt = {'dims': None} | kwargs
        xnd = []
        dfr = df.reset_index()
        # drop the 'index' column created by reset_index on a default index
        if 'index' in dfr.columns and 'index' not in df.columns:
            del dfr['index']
        # each column name is split once into (name, ntv_type)
        splits = [Nutil.split_json_name(j_name) for j_name in dfr.columns]
        df_names = {split[0]: j_name
                    for split, j_name in zip(splits, dfr.columns)}
        df_ntv_types = {split[0]: split[1] for split in splits}
        dfr.columns = [split[0] for split in splits]
        if dfr.attrs.get('metadata'):
            for name, meta in dfr.attrs['metadata'].items():
                xnd += [Xndarray.read_json({name: meta})]
        if dfr.attrs.get('fields'):
            for name, jsn in dfr.attrs['fields'].items():
                xnd += [Xndarray.read_json({name: jsn})]
        if dfr.attrs.get('info'):
            dimensions = dfr.attrs['info']['dimensions']
            data = dfr.attrs['info']['data']
        else:
            # no structure stored in attrs: it is computed from the values
            dimensions, data = PandasConnec._ximport_analysis(dfr, opt['dims'])
        shape_dfr = [data[dim]['shape'][0]
                     for dim in dimensions] if dimensions else len(dfr)
        dfr = dfr.sort_values(dimensions)
        for name in df_names:
            xnd += [PandasConnec._ximport_series(data, name, dfr, dimensions,
                                                 shape_dfr, df_ntv_types, **opt)]
        return Xclass(xnd, dfr.attrs.get('name')).to_canonical()

    @staticmethod
    def _ximport_analysis(dfr, opt_dims):
        '''return data and dimensions from analysis module
        - opt_dims: partition to apply
        - dfr: dataframe to analyse

        The result is the tuple (dimensions, data) where data maps each field
        to a dict with the keys 'shape' and 'links'.'''
        dfr_idx = list(dfr.index.names)
        # a named index takes precedence over the requested partition
        opt_dims = dfr_idx if dfr_idx != [None] else opt_dims
        ana = dfr.npd.analysis(distr=True)
        partition = ana.field_partition(partition=opt_dims, mode='id')
        part_rel = ana.relation_partition(partition=opt_dims, noroot=True)
        part_dim = ana.relation_partition(
            partition=opt_dims, noroot=True, primary=True)
        dimensions = partition['primary']
        len_fields = {fld.idfield: fld.lencodec for fld in ana.fields}
        data = {fld.idfield: {
            'shape': [len_fields[dim] for dim in part_dim[fld.idfield]] if part_dim else [],
            'links': part_rel[fld.idfield] if part_rel else []} for fld in ana.fields}
        for json_name in data:
            if not data[json_name]['shape']:
                # a field without shape takes the shape of the field that
                # has its base name
                name = Nutil.split_name(Nutil.split_json_name(json_name)[0])[0]
                p_name = [js_name for js_name in data
                          if Nutil.split_json_name(js_name)[0] == name][0]
                data[json_name]['shape'] = data[p_name]['shape']
        return (dimensions, data)

    @staticmethod
    def _ximport_series(data, name, dfr, dimensions, shape_dfr, df_ntv_types, **opt):
        '''return a Xndarray from a Series of a pd.DataFrame

        *parameters*

        - data: dict - per-field description ('shape', 'links', 'meta', 'xtype')
        - name: string - name of the Series
        - dfr: DataFrame sorted along the dimensions
        - dimensions: list of string - names of the dimensions
        - shape_dfr: list of int - length of each dimension
        - df_ntv_types: dict - ntv_type of each Series
        - opt: options ('dims' is used)'''
        if data[name].get('xtype') == 'meta':  # or len(dfr[name].unique()) == 1:
            return Xndarray(name, meta=dfr[name].iloc[0])
        meta = data[name].get('meta')
        ntv_type = df_ntv_types[name]
        if len(dfr[name].unique()) == 1:
            # a single distinct value is stored as an array of shape [1]
            nda = Ndarray(np.array(dfr[name].iloc[0]),
                          ntv_type=ntv_type, str_uri=False)
            nda.set_shape([1])
            return Xndarray(name, nda=nda, meta=meta)
        if not dimensions:
            nda = Ndarray(np.array(dfr[name]), ntv_type=ntv_type)
            return Xndarray(name, nda=nda, meta=meta)
        dims = []
        PandasConnec._get_dims(dims, name, data, dimensions)
        if not dims:
            # additional field: the dimensions of the parent field are used
            p_name, add_name = Nutil.split_name(name)
            if add_name:
                PandasConnec._get_dims(dims, p_name, data, dimensions)
        np_array = PandasConnec._from_series(dfr, name, shape_dfr,
                                             dimensions, dims, opt['dims'])
        shape = data[name].get('shape', [len(dfr)])
        nda = Ndarray(np_array, ntv_type, shape)
        links = data[name].get('links')
        return Xndarray(name, nda=nda, links=links if links else dims, meta=meta)

    @staticmethod
    def _to_np_series(xdt, name, dims):
        '''return a np.ndarray from the Xndarray of xdt defined by his name

        None is returned if a dimension of the Xndarray is not in dims.

        *parameters*

        - **xdt**: Xdataset - data to convert in a pd.DataFrame
        - **name**: string - full_name of the Xndarray to convert
        - **dims**: list of string - order of dimensions full_name to apply'''
        if name in xdt.uniques:
            # the single value is repeated on every row
            return np.array([xdt[name].darray[0]] * xdt.length)
        if xdt[name].shape == [xdt.length]:
            return xdt[name].darray
        n_shape = {nam: len(xdt[nam]) for nam in dims}
        dim_name = xdt.dims(name)
        if not set(dim_name) <= set(dims):
            return None
        # dimensions the Xndarray does not depend on come first
        add_name = [nam for nam in dims if nam not in dim_name]
        tab_name = add_name + dim_name

        til = 1
        for nam in add_name:
            til *= n_shape[nam]
        shap = [n_shape[nam] for nam in tab_name]
        order = [dims.index(nam) for nam in tab_name]
        arr = xdt[name].darray
        return Nutil.extend_array(arr, til, shap, order)

    @staticmethod
    def _from_series(dfr, name, shape, dims, links, new_dims=None):
        '''return a flattened np.ndarray from the pd.Series of dfr defined by his name

        The Series is reshaped with shape, index 0 is taken on each dimension
        that is not in links, and the remaining axes are ordered as links.
        Without links the whole Series is returned.

        *parameters*

        - dfr: DataFrame - data to convert in Xdataset
        - name: string - name of the Series (full_name or json_name)
        - shape: shape of the Xdataset
        - dims: list of string - list of name of dimensions
        - links: list of string - list of linked Series
        - new_dims: list of string (default None) - new order of dims
        '''
        values = np.array(dfr[name])
        if not links:
            return values
        new_dims = new_dims if new_dims else dims
        old_order = list(range(len(dims)))
        order = [dims.index(dim) for dim in new_dims]
        # full slice on the linked dimensions, first element on the others
        idx = [0] * len(dims)
        for nam in links:
            idx[new_dims.index(nam)] = slice(shape[dims.index(nam)])
        xar = np.moveaxis(values.reshape(shape), old_order, order)[tuple(idx)]
        # the remaining axes follow new_dims; they are moved to the order of links
        lnk = [nam for nam in new_dims if nam in links]
        shape_lnk = [shape[dims.index(nam)] for nam in lnk]
        xar = xar.reshape(shape_lnk)
        return np.moveaxis(xar, list(range(len(links))),
                           [lnk.index(dim) for dim in links]).flatten()

    @staticmethod
    def _get_dims(dims, name, data, dimensions):
        '''add names of dimensions into dims

        dims is updated in place: name is added if it is a dimension, else
        the 'links' of data[name] are followed recursively.'''
        if not name:
            return
        if name in dimensions:
            dims.append(name)
            return
        for nam in data[name].get('links', ()):
            PandasConnec._get_dims(dims, nam, data, dimensions)
pandas.DataFrame interface with two static methods ximport and xexport
@staticmethod
def xexport(xdt, **kwargs):
    '''return a pd.DataFrame from a Xdataset

    *Parameters*

    - **xdt**: Xdataset - data to convert in a pd.DataFrame
    - **ntv_type**: Boolean (default True) - if False use full_name else json_name
    - **info**: Boolean (default True) - if True add xdt.tab_info in DataFrame.attrs
    - **dims**: list of string (default None) - order of dimensions full_name to apply

    *Returns*

    A pd.DataFrame indexed by the dimensions (when there is at least one).
    `DataFrame.attrs` receives the keys 'metadata', and when applicable
    'info', 'fields' and 'name'.
    '''
    opt = {'ntv_type': True, 'info': True, 'dims': None} | kwargs
    # column label used for each Xndarray (json_name carries the ntv_type)
    dic_name = {name: xdt[name].json_name if opt['ntv_type'] else xdt[name].full_name
                for name in xdt.names}
    dims = xdt.dimensions if not opt['dims'] else tuple(opt['dims'])
    fields = (xdt.group(dims) + xdt.group(xdt.coordinates) +
              xdt.group(xdt.data_vars) + xdt.uniques)
    fields += tuple(nam for nam in xdt.group(xdt.data_arrays)
                    if len(xdt[nam]) == xdt.length)
    # only fields without uri are converted into DataFrame columns
    fields_array = tuple(var for var in fields if not xdt[var].uri)
    dic_series = {dic_name[name]: PandasConnec._to_np_series(xdt, name, dims)
                  for name in fields_array}
    dfr = pd.DataFrame(dic_series)
    index = [dic_name[name] for name in dims]
    if index:
        dfr = dfr.set_index(index)
    if opt['info']:
        dfr.attrs |= {'info': xdt.tab_info}
    dfr.attrs |= {'metadata': {
        name: xdt[name].meta for name in xdt.metadata}}
    fields_uri = [var for var in fields if var not in fields_array]
    fields_other = [nam for nam in xdt.group(xdt.data_arrays)
                    if len(xdt[nam]) != xdt.length]
    # fields that are not DataFrame columns are kept as JSON in attrs.
    # The test includes fields_other: testing fields_uri alone silently
    # dropped the data_arrays of another length when no uri field exists.
    if fields_uri or fields_other:
        dfr.attrs |= {'fields': {nam: xdt[nam].to_json(noname=True,)
                                 for nam in fields_uri + fields_other}}
    if xdt.name:
        dfr.attrs |= {'name': xdt.name}
    return dfr
return a pd.DataFrame from a Xdataset
Parameters
- ntv_type: Boolean (default True) - if False use full_name else json_name
- info: Boolean (default True) - if True add xdt.tab_info in DataFrame.attrs (key 'info')
- dims: list of string (default None) - order of dimensions full_name to apply
@staticmethod
def ximport(df, Xclass, **kwargs):
    '''return a Xdataset from a pd.DataFrame

    *Parameters*

    - **df**: pd.DataFrame - data to convert (index levels are moved
    back into columns before the conversion)
    - **Xclass**: class used to build the result from the list of Xndarray
    - **dims**: list of string (default None) - order of dimensions to apply
    '''
    opt = {'dims': None} | kwargs
    dfr = df.reset_index()
    # reset_index adds an 'index' column when the DataFrame had a default index
    if 'index' in dfr.columns and 'index' not in df.columns:
        del dfr['index']

    # each column label is a json_name, split into (name, ntv_type)
    split_cols = [Nutil.split_json_name(j_name) for j_name in dfr.columns]
    col_names = [parts[0] for parts in split_cols]
    df_ntv_types = {parts[0]: parts[1] for parts in split_cols}
    dfr.columns = col_names

    # Xndarray stored as JSON in attrs: metadata first, then other fields
    xnd = []
    for key in ('metadata', 'fields'):
        if dfr.attrs.get(key):
            for name, jsn in dfr.attrs[key].items():
                xnd.append(Xndarray.read_json({name: jsn}))

    info = dfr.attrs.get('info')
    if info:
        dimensions = dfr.attrs['info']['dimensions']
        data = dfr.attrs['info']['data']
    else:
        # no structure stored in attrs: it is deduced from the data
        dimensions, data = PandasConnec._ximport_analysis(dfr, opt['dims'])

    if dimensions:
        shape_dfr = [data[dim]['shape'][0] for dim in dimensions]
    else:
        shape_dfr = len(dfr)
    dfr = dfr.sort_values(dimensions)

    # one Xndarray per distinct column name, in column order
    for name in dict.fromkeys(col_names):
        xnd.append(PandasConnec._ximport_series(
            data, name, dfr, dimensions, shape_dfr, df_ntv_types, **opt))
    return Xclass(xnd, dfr.attrs.get('name')).to_canonical()
return a Xdataset from a pd.DataFrame
Parameters
- dims: list of string (default None) - order of dimensions to apply
class XarrayConnec:
    ''' Xarray interface with two static methods ximport and xexport'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a xr.DataArray or a xr.Dataset from a Xdataset

        *Parameters*

        - **xdt** : Xdataset to convert,
        - **dataset** : Boolean (default True) - if False and a single data_var,
        return a xr.DataArray
        - **info** : Boolean (default True) - if True, add json representation
        of 'relative' Xndarrays and 'data_arrays' Xndarrays in attrs
        '''
        option = {'dataset': True, 'info': True} | kwargs
        coords = XarrayConnec._to_xr_vars(
            xdt, xdt.dimensions + xdt.coordinates + xdt.uniques)
        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
        if len(xdt.data_vars) == 1 and not option['dataset']:
            var_name = xdt.data_vars[0]
            data = xdt.to_ndarray(var_name)
            dims = xdt.dims(var_name)
            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
            # 'data' is the default name given by ximport to an unnamed DataArray
            name = var_name if var_name != 'data' else None
            xrd = xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
                               name=name)
        else:
            data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
            xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs)
        # uniques are exported as bare values by _to_xr_coord (no attrs), so
        # their ntv_type and metadata are attached afterwards
        for unic in xdt.uniques:
            xrd[unic].attrs |= {'ntv_type': xdt[unic].ntv_type} | (
                xdt[unic].meta if xdt[unic].meta else {})
        return xrd

    @staticmethod
    def ximport(xar, Xclass, **kwargs):
        '''return a Xdataset from a xr.DataArray or a xr.Dataset

        *Parameters*

        - **xar** : xr.DataArray or xr.Dataset to convert,
        - **Xclass** : class used to build the result from the list of Xndarray,
        - **kwargs** : not used
        '''
        xnd = []
        if xar.attrs:
            attrs = {k: v for k, v in xar.attrs.items() if k not in [
                'name', 'ntv_type']}
            for name, meta in attrs.items():
                # list values are read as JSON Xndarray, other values as metadata
                if isinstance(meta, list):
                    xnd += [Xndarray.read_json({name: meta})]
                else:
                    xnd += [Xndarray(name, meta=meta)]
        for coord in xar.coords:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
            if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
                xnd[-1].links = [list(xar.data_vars)[0]]
        if isinstance(xar, xr.DataArray):
            # the DataArray is converted once (the original code converted it
            # twice and discarded the first result)
            xnd += [XarrayConnec._var_xr_to_xnd(xar,
                                                name='data', add_attrs=False)]
            xdt = Xclass(xnd, xar.attrs.get('name'))
            for var_name in xdt.data_vars:
                if var_name != xar.name and xar.name:
                    xdt[var_name].links = [xar.name]
            return xdt.to_canonical()
        for var in xar.data_vars:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
        return Xclass(xnd, xar.attrs.get('name')).to_canonical()

    @staticmethod
    def _var_xr_to_xnd(var, name=None, add_attrs=True):
        '''return a Xndarray from a Xarray variable

        *Parameters*

        - **var** : Xarray variable to convert in Xndarray,
        - **name** : string (default None) - default name if var have no name,
        - **add_attrs** : boolean (default True) - if False, attrs are not converted
        '''
        full_name = var.name if var.name else name
        name = Nutil.split_name(full_name)[0]
        # no links for a dimension coordinate or for a single value
        dims = None if var.dims == (name,) or var.size == 1 else list(var.dims)
        ntv_type = var.attrs.get('ntv_type')
        nda = var.values
        # a 0-d array is converted into a 1-d array with one value
        nda = nda.reshape(1) if not nda.shape else nda
        if nda.dtype.name == 'datetime64[ns]' and ntv_type:
            nda = Nutil.convert(ntv_type, nda, tojson=False)
        attrs = {k: v for k, v in var.attrs.items()
                 if k not in ['ntv_type', 'name']} if add_attrs else {}
        return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs)

    @staticmethod
    def _to_xr_attrs(xdt, **option):
        '''return a dict with attributes from a Xdataset

        *Parameters*

        - **xdt** : Xdataset to convert,
        - **info** : Boolean if True, add json representation of 'relative'
        Xndarrays and 'data_arrays' Xndarrays in attrs
        '''
        attrs = {meta: xdt[meta].meta for meta in xdt.metadata}
        attrs |= {'name': xdt.name} if xdt.name else {}
        if option['info']:
            for name in xdt.names:
                if xdt[name].mode == 'relative':
                    attrs |= xdt[name].to_json(header=False)
            for name in xdt.data_arrays:
                attrs |= xdt[name].to_json(header=False)
        return attrs

    @staticmethod
    def _to_xr_coord(xdt, name):
        '''return a dict with Xarray attributes from a Xndarray defined by his name

        The dict has a single key (name). The value is the first value of
        the data for a unique Xndarray, else a tuple (dims, data, attrs).'''
        data = xdt.to_ndarray(name)
        if name in xdt.uniques:
            return {name: data[0]}
        if name in xdt.additionals and not xdt[name].links:
            data = data.reshape(xdt.shape_dims(xdt[name].name))
        xnd_dims = xdt.dims(name)
        # one-element tuple: the original '(x)' was a plain string
        # (xarray accepts both forms for a single dimension)
        dims = tuple(xnd_dims) if xnd_dims else (xdt[name].name,)
        meta = {'ntv_type': xdt[name].ntv_type} | (
            xdt[name].meta if xdt[name].meta else {})
        return {name: (dims, data, meta)}

    @staticmethod
    def _to_xr_vars(xdt, list_names):
        '''return a dict with Xarray attributes from a list of Xndarray names

        Only 'absolute' Xndarray are converted with _to_xr_coord. Xndarray
        with xtype 'meta' are added with their metadata as value.'''
        arg_vars = {}
        valid_names = [
            nam for nam in list_names if xdt[nam].mode == 'absolute']
        for xnd_name in valid_names:
            arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name)
        for name in list_names:
            if xdt[name].xtype == 'meta':
                arg_vars |= {name: xdt[name].meta}
        return arg_vars

    @staticmethod
    def _xr_add_type(xar):
        '''add 'ntv_type' as attribute for a xr.DataArray

        For a xr.DataArray, 'ntv_type' is added (deduced from the dtype name)
        only if it is missing. For another object (expected: xr.Dataset) the
        method is applied to each coordinate and each data variable.'''
        if isinstance(xar, xr.DataArray):
            if 'ntv_type' not in xar.attrs:
                xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)}
            # a DataArray has no data_vars: never fall through to the loops
            return
        # iterating coords / data_vars yields names: the objects have to be
        # looked up (recursing on the names raised an AttributeError)
        for coord in xar.coords:
            XarrayConnec._xr_add_type(xar.coords[coord])
        for var in xar.data_vars:
            XarrayConnec._xr_add_type(xar.data_vars[var])
        return
Xarray interface with two static methods ximport and xexport
@staticmethod
def xexport(xdt, **kwargs):
    '''return a xr.DataArray or a xr.Dataset from a Xdataset

    *Parameters*

    - **xdt** : Xdataset to convert,
    - **dataset** : Boolean (default True) - if False and a single data_var,
    return a xr.DataArray
    - **info** : Boolean (default True) - if True, add json representation
    of 'relative' Xndarrays and 'data_arrays' Xndarrays in attrs
    '''
    option = {'dataset': True, 'info': True} | kwargs
    coords = XarrayConnec._to_xr_vars(
        xdt, xdt.dimensions + xdt.coordinates + xdt.uniques)
    coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
    attrs = XarrayConnec._to_xr_attrs(xdt, **option)

    as_data_array = len(xdt.data_vars) == 1 and not option['dataset']
    if as_data_array:
        var_name = xdt.data_vars[0]
        values = xdt.to_ndarray(var_name)
        var_dims = xdt.dims(var_name)
        # the type and the metadata of the variable are merged in the attrs
        attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
        attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
        xrd = xr.DataArray(data=values, coords=coords, dims=var_dims,
                           attrs=attrs,
                           name=None if var_name == 'data' else var_name)
    else:
        xrd = xr.Dataset(XarrayConnec._to_xr_vars(xdt, xdt.data_vars),
                         coords=coords, attrs=attrs)

    # ntv_type and metadata of the unique Xndarrays are added afterwards
    for unic in xdt.uniques:
        unic_meta = xdt[unic].meta if xdt[unic].meta else {}
        xrd[unic].attrs |= {'ntv_type': xdt[unic].ntv_type} | unic_meta
    return xrd
return a xr.DataArray or a xr.Dataset from a Xdataset
Parameters
- dataset : Boolean (default True) - if False and a single data_var, return a xr.DataArray
- info : Boolean (default True) - if True, add json representation of 'relative' Xndarrays and 'data_arrays' Xndarrays in attrs
@staticmethod
def ximport(xar, Xclass, **kwargs):
    '''return a Xdataset from a xr.DataArray or a xr.Dataset

    *Parameters*

    - **xar** : xr.DataArray or xr.Dataset to convert,
    - **Xclass** : class used to build the result from the list of Xndarray,
    - **kwargs** : not used
    '''
    xnd = []
    if xar.attrs:
        attrs = {k: v for k, v in xar.attrs.items() if k not in [
            'name', 'ntv_type']}
        for name, meta in attrs.items():
            # list values are read as JSON Xndarray, other values as metadata
            if isinstance(meta, list):
                xnd += [Xndarray.read_json({name: meta})]
            else:
                xnd += [Xndarray(name, meta=meta)]
    for coord in xar.coords:
        xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
        if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
            xnd[-1].links = [list(xar.data_vars)[0]]
    if isinstance(xar, xr.DataArray):
        # the DataArray is converted once (the original code converted it
        # twice and discarded the first result)
        xnd += [XarrayConnec._var_xr_to_xnd(xar,
                                            name='data', add_attrs=False)]
        xdt = Xclass(xnd, xar.attrs.get('name'))
        for var_name in xdt.data_vars:
            if var_name != xar.name and xar.name:
                xdt[var_name].links = [xar.name]
        return xdt.to_canonical()
    for var in xar.data_vars:
        xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
    return Xclass(xnd, xar.attrs.get('name')).to_canonical()
return a Xdataset from a xr.DataArray or a xr.Dataset
440class ScippConnec: 441 ''' Scipp interface with two static methods ximport and xexport''' 442 443 SCTYPE_DTYPE = {'string': 'str'} 444 445 @staticmethod 446 def xexport(xdt, **kwargs): 447 '''return a sc.DataArray or a sc.Dataset from a xdataset 448 449 *Parameters* 450 451 - **dataset** : Boolean (default True) - if False and a single data_var, 452 return a DataArray 453 - **info** : Boolean (default True) - if True return a DataGroup with 454 metadata and data_arrays 455 - **ntv_type** : Boolean (default True) - if True add ntv-type to the name 456 ''' 457 option = {'dataset': True, 'info': True, 458 'ntv_type': True} | kwargs 459 coords = dict([ScippConnec._to_scipp_var(xdt, name, **option) 460 for name in xdt.coordinates + xdt.dimensions + xdt.uniques 461 if xdt[name].mode == 'absolute']) 462 scd = sc.Dataset(dict([ScippConnec._to_sc_dataarray(xdt, name, coords, **option) 463 for name in xdt.data_vars 464 if xdt[name].mode == 'absolute'])) 465 scd = scd if option['dataset'] else scd[list(scd)[0]] 466 if not option['info']: 467 return scd 468 sc_name = xdt.name if xdt.name else 'no_name' 469 return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option)) 470 471 @staticmethod 472 def ximport(sc_obj, Xclass, **kwargs): 473 '''return a xdataset from a scipp object DataArray, Dataset or DataGroup''' 474 xnd = [] 475 scd = sc_obj 476 xnd_name = None 477 if isinstance(sc_obj, sc.DataGroup): 478 for obj in sc_obj: 479 if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)): 480 scd = sc_obj[obj] 481 xnd_name = obj 482 break 483 if isinstance(scd, sc.DataArray): 484 scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd}) 485 if isinstance(scd, sc.Dataset): 486 for coord in scd.coords: 487 xnd += ScippConnec._var_sc_to_xnd( 488 scd.coords[coord], scd, coord) 489 for var in scd: 490 for mask in scd[var].masks: 491 m_var = Nutil.split_json_name(var)[0] 492 xnd += ScippConnec._var_sc_to_xnd( 493 scd[var].masks[mask], scd, mask, m_var) 494 xnd += 
ScippConnec._var_sc_to_xnd(scd[var].data, scd, var) 495 if isinstance(sc_obj, sc.DataGroup): 496 xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd) 497 return Xclass(xnd, xnd_name).to_canonical() 498 499 @staticmethod 500 def _grp_sc_to_xnd(sc_obj, xnd): 501 '''return a list of Xndarray from a scipp variable''' 502 dic_xnd = {xar.name: xar for xar in xnd} 503 for obj in sc_obj: 504 name, add_name = Nutil.split_name(obj) 505 match [name, add_name, sc_obj[obj]]: 506 case [name, None, list()]: 507 xnd += [Xndarray.read_json({name: sc_obj[obj]})] 508 case [name, add_name, sc.Variable()]: 509 xnd += ScippConnec._var_sc_to_xnd( 510 sc_obj[obj], None, add_name, name) 511 case [name, _, dict() | str() | list()] if name in dic_xnd: 512 if dic_xnd[name].meta: 513 dic_xnd[name].meta |= sc_obj[obj] 514 else: 515 dic_xnd[name].meta = sc_obj[obj] 516 case [name, _, dict() | str() | list()]: 517 xnd += [Xndarray.read_json({name: sc_obj[obj]})] 518 case [_, _, _]: ... 519 return xnd 520 521 @staticmethod 522 def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None): 523 '''return a list of Xndarray from a scipp variable 524 - scd : scipp dataset 525 - scv : scipp variable 526 - var : name 527 - sc_name : scipp name''' 528 l_xnda = [] 529 unit = scv.unit.name if scv.unit and scv.unit not in [ 530 'dimensionless', 'ns'] else '' 531 ext_name, typ1 = Nutil.split_json_name(sc_name, True) 532 var_name, typ2 = Nutil.split_json_name(var, True) 533 full_name = var_name + \ 534 ('.' 
if var_name and ext_name else '') + ext_name 535 ntv_type_base = typ1 + typ2 536 ntv_type = ntv_type_base + ('[' + unit + ']' if unit else '') 537 links = [Nutil.split_json_name(jsn)[0] for jsn in scv.dims] 538 if scd is not None and sc_name in scd.coords and scv.dims == scd.dims: 539 links = [Nutil.split_json_name(list(scd)[0])[0]] 540 if scv.variances is not None: 541 nda = Ndarray(scv.variances, ntv_type_base) 542 l_xnda.append(Xndarray(full_name + '.variance', nda, links)) 543 nda = Ndarray(scv.values, ntv_type, str_uri=False) 544 shape = scv.shape if scv.shape else (1,) 545 nda.set_shape(shape) 546 l_xnda.append(Xndarray(full_name, nda, links)) 547 return l_xnda 548 549 @staticmethod 550 def _to_sc_dataarray(xdt, name, coords, **option): 551 '''return a scipp.DataArray from a xdataset.global_var defined by his name''' 552 scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option) 553 masks = dict([ScippConnec._to_scipp_var(xdt, nam, **option) 554 for nam in set(xdt.group(name)) & set(xdt.masks)]) 555 return (scipp_name, sc.DataArray(data, coords=coords, masks=masks)) 556 557 @staticmethod 558 def _to_scipp_grp(xdt, **option): 559 '''return a dict with metadata, data-array and data_add from a xdataset''' 560 grp = {} 561 grp |= dict([ScippConnec._to_scipp_var(xdt, name, **option) 562 for name in xdt.data_add + xdt.data_arrays 563 if xdt[name].add_name != 'variance']) 564 opt_mask = option | {'grp_mask': True} 565 grp |= dict([ScippConnec._to_scipp_var(xdt, name, **opt_mask) 566 for name in xdt.masks 567 if xdt[name].name in xdt.names and xdt[name].name not in xdt.data_vars]) 568 grp |= {name + '.meta': xdt[name].meta for name in xdt.names 569 if xdt[name].meta} 570 for name in xdt.names: 571 if xdt[name].mode == 'relative': 572 grp |= xdt[name].to_json(header=False) 573 return grp 574 575 @staticmethod 576 def _to_scipp_var(xdt, name, **kwargs): 577 '''return a scipp.Variable from a Xndarray defined by his name''' 578 option = {'grp_mask': False, 
'ntv_type': True} | kwargs 579 simple_type, unit = Nutil.split_type(xdt[name].ntv_type) 580 unit = unit if unit else '' 581 add_name = Nutil.split_name(name)[1] 582 new_n = add_name if name in xdt.masks and not option['grp_mask'] else name 583 opt_n = option['ntv_type'] 584 scipp_name = new_n + (':' + simple_type if opt_n else '') 585 if name in xdt.uniques: 586 return (scipp_name, sc.scalar(xdt[name].darray[0], unit=unit)) 587 vari_name = name + '.variance' 588 variances = xdt[vari_name].darray if vari_name in xdt.names else None 589 dims = xdt.dims(name, opt_n) if xdt.dims( 590 name, opt_n) else [xdt[name].name] 591 var = sc.array(dims=['flat'], values=xdt.to_darray( 592 name), variances=variances, unit=unit) 593 var = sc.fold(var, dim='flat', sizes=dict(zip(dims, xdt[name].shape))) 594 return (scipp_name, var)
Scipp interface with two static methods ximport and xexport
@staticmethod
def xexport(xdt, **kwargs):
    '''return a sc.DataArray or a sc.Dataset from a xdataset

    *Parameters*

    - **xdt** : xdataset to convert,
    - **dataset** : Boolean (default True) - if False, return the first
    DataArray of the Dataset,
    - **info** : Boolean (default True) - if True return a DataGroup with
    metadata and data_arrays
    - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
    '''
    option = {'dataset': True, 'info': True, 'ntv_type': True} | kwargs
    # only 'absolute' Xndarray are converted into scipp objects
    coord_names = [name
                   for name in xdt.coordinates + xdt.dimensions + xdt.uniques
                   if xdt[name].mode == 'absolute']
    coords = dict(ScippConnec._to_scipp_var(xdt, name, **option)
                  for name in coord_names)
    var_names = [name for name in xdt.data_vars
                 if xdt[name].mode == 'absolute']
    scd = sc.Dataset(dict(
        ScippConnec._to_sc_dataarray(xdt, name, coords, **option)
        for name in var_names))
    if not option['dataset']:
        scd = scd[list(scd)[0]]
    if not option['info']:
        return scd
    sc_name = xdt.name if xdt.name else 'no_name'
    return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))
return a sc.DataArray or a sc.Dataset from a xdataset
Parameters
- dataset : Boolean (default True) - if False and a single data_var, return a DataArray
- info : Boolean (default True) - if True return a DataGroup with metadata and data_arrays
- ntv_type : Boolean (default True) - if True add ntv-type to the name
@staticmethod
def ximport(sc_obj, Xclass, **kwargs):
    '''return a xdataset from a scipp object DataArray, Dataset or DataGroup

    *Parameters*

    - **sc_obj** : sc.DataArray, sc.Dataset or sc.DataGroup to convert,
    - **Xclass** : class used to build the result from the list of Xndarray,
    - **kwargs** : not used
    '''
    xnd = []
    scd = sc_obj
    xnd_name = None
    is_group = isinstance(sc_obj, sc.DataGroup)
    if is_group:
        # the first Dataset or DataArray of the group holds the data and
        # its key is the name of the xdataset
        for obj in sc_obj:
            if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)):
                scd, xnd_name = sc_obj[obj], obj
                break
    if isinstance(scd, sc.DataArray):
        scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd})
    if isinstance(scd, sc.Dataset):
        for coord in scd.coords:
            xnd += ScippConnec._var_sc_to_xnd(
                scd.coords[coord], scd, coord)
        for var in scd:
            # the masks of a variable come before the variable itself
            for mask in scd[var].masks:
                m_var = Nutil.split_json_name(var)[0]
                xnd += ScippConnec._var_sc_to_xnd(
                    scd[var].masks[mask], scd, mask, m_var)
            xnd += ScippConnec._var_sc_to_xnd(scd[var].data, scd, var)
    if is_group:
        xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
    return Xclass(xnd, xnd_name).to_canonical()
return a xdataset from a scipp object DataArray, Dataset or DataGroup