ntv-numpy.ntv_numpy.xconnector

@author: Philippe@loco-labs.io

The xconnector module is part of the ntv-numpy.ntv_numpy package
(specification document: https://loco-philippe.github.io/ES/JSON%20semantic%20format%20(JSON-NTV).htm).

It contains interface classes with two static methods, ximport and xexport:
- XarrayConnec: class for Xarray Dataset or DataArray,
- AstropyNDDataConnec: class for Astropy NDData,
- ScippConnec: class for Scipp Dataset or DataArray,
- PandasConnec: class for pandas DataFrame.

For more information, see the user guide
(https://loco-philippe.github.io/ntv-numpy/docs/user_guide.html)
or the github repository (https://github.com/loco-philippe/ntv-numpy).
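Each connector follows the same pattern: ximport builds an Xdataset from a native object, and xexport writes an Xdataset back to a native object, so the classes can be chained to convert between formats. A minimal sketch of a cross-format conversion, assuming the Xdataset class is importable from the ntv_numpy package (the exact import path may differ):

import numpy as np
import xarray as xr

from ntv_numpy import Xdataset                      # assumed public import path
from ntv_numpy.xconnector import XarrayConnec, PandasConnec

# a small native xarray Dataset
xds = xr.Dataset(
    {'temperature': (('lat', 'lon'), np.arange(6.0).reshape(2, 3))},
    coords={'lat': [10.0, 20.0], 'lon': [0.0, 5.0, 10.0]},
)

xdt = XarrayConnec.ximport(xds, Xdataset)   # xarray Dataset -> Xdataset
dfr = PandasConnec.xexport(xdt)             # Xdataset -> pandas DataFrame
print(dfr.head())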
Module source:

# -*- coding: utf-8 -*-
"""
@author: Philippe@loco-labs.io

The `xconnector` module is part of the `ntv-numpy.ntv_numpy` package ([specification document](
https://loco-philippe.github.io/ES/JSON%20semantic%20format%20(JSON-NTV).htm)).

It contains interface classes with two static methods `ximport` and `xexport`:
- `XarrayConnec` class for Xarray Dataset or DataArray,
- `AstropyNDDataConnec` class for Astropy NDData,
- `ScippConnec` class for Scipp Dataset or DataArray,
- `PandasConnec` class for pandas dataFrame,

For more information, see the
[user guide](https://loco-philippe.github.io/ntv-numpy/docs/user_guide.html)
or the [github repository](https://github.com/loco-philippe/ntv-numpy).
"""


import xarray as xr
import scipp as sc
import pandas as pd
import numpy as np
from astropy import wcs
from astropy.nddata import NDData
from astropy.nddata.nduncertainty import StdDevUncertainty, VarianceUncertainty
from astropy.nddata.nduncertainty import InverseVariance
from ntv_numpy.ndarray import Nutil, Ndarray
from ntv_numpy.xndarray import Xndarray


class AstropyNDDataConnec:
    ''' NDData interface with two static methods ximport and xexport'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a NDData from a Xdataset'''
        data = xdt['data'].ndarray
        mask = xdt['data.mask'].ndarray
        unit = xdt['data'].nda.ntvtype.extension
        uncert = xdt['data.uncertainty'].ndarray
        typ_u = xdt['data.uncertainty'].nda.ntvtype.extension
        match typ_u:
            case 'std':
                uncertainty = StdDevUncertainty(uncert)
            case 'var':
                uncertainty = VarianceUncertainty(uncert)
            case 'inv':
                uncertainty = InverseVariance(uncert)
            case _:
                uncertainty = uncert
        meta = xdt['meta'].meta | {'name': xdt.name}
        wcs_dic = xdt['wcs'].meta
        psf = xdt['psf'].ndarray
        return NDData(data, mask=mask, unit=unit, uncertainty=uncertainty,
                      meta=meta, wcs=wcs.WCS(wcs_dic), psf=psf)

    @staticmethod
    def ximport(ndd, Xclass, **kwargs):
        '''return a Xdataset from a astropy.NDData'''
        xnd = []
        name = 'no_name'
        unit = ndd.unit.to_string() if not ndd.unit is None else None
        ntv_type = Nutil.ntv_type(ndd.data.dtype.name, ext=unit)
        xnd += [Xndarray('data', nda=Ndarray(ndd.data, ntv_type=ntv_type))]
        if ndd.meta:
            meta = {key: val for key, val in ndd.meta.items() if key != 'name'}
            name = ndd.meta.get('name', 'no_name')
            xnd += [Xndarray('meta', meta=meta)]
        if ndd.wcs:
            xnd += [Xndarray('wcs', meta=dict(ndd.wcs.to_header()))]
        if not ndd.psf is None:
            xnd += [Xndarray('psf', nda=Ndarray(ndd.psf, ntv_type=ntv_type))]
        if not ndd.mask is None:
            xnd += [Xndarray('data.mask', nda=ndd.mask)]
        if not ndd.uncertainty is None:
            typ_u = ndd.uncertainty.__class__.__name__[:3].lower()
            ntv_type = Nutil.ntv_type(
                ndd.uncertainty.array.dtype.name, ext=typ_u)
            nda = Ndarray(ndd.uncertainty.array, ntv_type=ntv_type)
            xnd += [Xndarray('data.uncertainty', nda=nda)]
        return Xclass(xnd, name).to_canonical()


class PandasConnec:
    ''' pandas.DataFrame interface with two static methods ximport and xexport'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a pd.DataFrame from a Xdataset

        *Parameters*

        - **json_name**: Boolean (default True) - if False use full_name else json_name
        - **info**: Boolean (default True) - if True add xdt.info in DataFrame.attrs
        - **dims**: list of string (default None) - order of dimensions full_name to apply
        '''
        opt = {'json_name': True, 'info': True, 'dims': None} | kwargs
        dic_name = {name: xdt[name].json_name if opt['json_name'] else xdt[name].full_name
                    for name in xdt.names}
        dims = xdt.dimensions if not opt['dims'] else tuple(opt['dims'])
        fields = (xdt.group(dims) + xdt.group(xdt.coordinates) +
                  xdt.group(xdt.data_vars) + xdt.uniques)
        fields += tuple(nam for nam in xdt.group(xdt.data_arrays)
                        if len(xdt[nam]) == xdt.length)
        fields_array = tuple(var for var in fields if not xdt[var].uri)
        dic_series = {dic_name[name]: PandasConnec._to_np_series(xdt, name, dims)
                      for name in fields_array}
        dfr = pd.DataFrame(dic_series)
        index = [dic_name[name] for name in dims]
        if index:
            dfr = dfr.set_index(index)
        if opt['info']:
            dfr.attrs |= {'info': xdt.tab_info}
            dfr.attrs |= {'metadata': {
                name: xdt[name].meta for name in xdt.metadata}}
            fields_uri = [var for var in fields if not var in fields_array]
            fields_other = [nam for nam in xdt.group(xdt.data_arrays)
                            if len(xdt[nam]) != xdt.length]
            if fields_uri:
                dfr.attrs |= {'fields': {nam: xdt[nam].to_json(noname=True,)
                                         for nam in fields_uri + fields_other}}
        if xdt.name:
            dfr.attrs |= {'name': xdt.name}
        return dfr

    @staticmethod
    def ximport(df, Xclass, **kwargs):
        '''return a Xdataset from a pd.DataFrame

        *Parameters*

        - dims: list of string (default None) - order of dimensions to apply
        '''
        opt = {'dims': None} | kwargs
        xnd = []
        dfr = df.reset_index()
        if 'index' in dfr.columns and not 'index' in df.columns:
            del dfr['index']
        df_names = {Nutil.split_json_name(j_name)[0]: j_name
                    for j_name in dfr.columns}
        df_ntv_types = {Nutil.split_json_name(j_name)[0]:
                        Nutil.split_json_name(j_name)[1] for j_name in dfr.columns}
        dfr.columns = [Nutil.split_json_name(name)[0] for name in dfr.columns]
        if dfr.attrs.get('metadata'):
            for name, meta in dfr.attrs['metadata'].items():
                xnd += [Xndarray.read_json({name: meta})]
        if dfr.attrs.get('fields'):
            for name, jsn in dfr.attrs['fields'].items():
                xnd += [Xndarray.read_json({name: jsn})]
        if dfr.attrs.get('info'):
            dimensions = dfr.attrs['info']['dimensions']
            data = dfr.attrs['info']['data']
        else:
            dimensions, data = PandasConnec._ximport_analysis(dfr, opt['dims'])
        shape_dfr = [data[dim]['shape'][0]
                     for dim in dimensions] if dimensions else len(dfr)
        dfr = dfr.sort_values(dimensions)
        for name in df_names:
            xnd += [PandasConnec._ximport_series(data, name, dfr, dimensions,
                                                 shape_dfr, df_ntv_types, **opt)]
        return Xclass(xnd, dfr.attrs.get('name')).to_canonical()

    @staticmethod
    def _ximport_analysis(dfr, opt_dims):
        '''return data and dimensions from analysis module
        - opt_dims: partition to apply
        - dfr: dataframe to analyse'''
        dfr_idx = list(dfr.index.names)
        opt_dims = dfr_idx if dfr_idx != [None] else opt_dims
        ana = dfr.npd.analysis(distr=True)
        partition = ana.field_partition(partition=opt_dims, mode='id')
        part_rel = ana.relation_partition(partition=opt_dims, noroot=True)
        part_dim = ana.relation_partition(
            partition=opt_dims, noroot=True, primary=True)
        dimensions = partition['primary']
        len_fields = {fld.idfield: fld.lencodec for fld in ana.fields}
        data = {fld.idfield: {
            'shape': [len_fields[dim] for dim in part_dim[fld.idfield]] if part_dim else [],
            'links': part_rel[fld.idfield] if part_rel else []} for fld in ana.fields}
        for json_name in data:
            if not data[json_name]['shape']:
                name = Nutil.split_name(Nutil.split_json_name(json_name)[0])[0]
                p_name = [js_name for js_name in data
                          if Nutil.split_json_name(js_name)[0] == name][0]
                data[json_name]['shape'] = data[p_name]['shape']
        return (dimensions, data)

    @staticmethod
    def _ximport_series(data, name, dfr, dimensions, shape_dfr, df_ntv_types, **opt):
        '''return a Xndarray from a Series of a pd.DataFrame'''
        if data[name].get('xtype') == 'meta':  # or len(dfr[name].unique()) == 1:
            return Xndarray(name, meta=dfr[name].iloc[0])
        meta = data[name].get('meta')
        ntv_type = df_ntv_types[name]
        if len(dfr[name].unique()) == 1:
            nda = Ndarray(np.array(dfr[name].iloc[0]),
                          ntv_type=ntv_type, str_uri=False)
            nda.set_shape([1])
            return Xndarray(name, nda=nda, meta=meta)
        if not dimensions:
            nda = Ndarray(np.array(dfr[name]), ntv_type=ntv_type)
            return Xndarray(name, nda=nda, meta=meta)
        dims = []
        PandasConnec._get_dims(dims, name, data, dimensions)
        if not dims:
            p_name, add_name = Nutil.split_name(name)
            if add_name:
                PandasConnec._get_dims(dims, p_name, data, dimensions)
        np_array = PandasConnec._from_series(dfr, name, shape_dfr,
                                             dimensions, dims, opt['dims'])
        shape = data[name].get('shape', [len(dfr)])
        nda = Ndarray(np_array, ntv_type, shape)
        links = data[name].get('links')
        return Xndarray(name, nda=nda, links=links if links else dims, meta=meta)

    @staticmethod
    def _to_np_series(xdt, name, dims):
        '''return a np.ndarray from the Xndarray of xdt defined by his name

        *parameters*

        - **xdt**: Xdataset - data to convert in a pd.DataFrame
        - **name**: string - full_name of the Xndarray to convert
        - **dims**: list of string - order of dimensions full_name to apply'''
        if name in xdt.uniques:
            return np.array([xdt[name].darray[0]] * xdt.length)
        if xdt[name].shape == [xdt.length]:
            return xdt[name].darray
        n_shape = {nam: len(xdt[nam]) for nam in dims}
        dim_name = xdt.dims(name)
        if not set(dim_name) <= set(dims):
            return None
        add_name = [nam for nam in dims if not nam in dim_name]
        tab_name = add_name + dim_name

        til = 1
        for nam in add_name:
            til *= n_shape[nam]
        shap = [n_shape[nam] for nam in tab_name]
        order = [dims.index(nam) for nam in tab_name]
        arr = xdt[name].darray
        return Nutil.extend_array(arr, til, shap, order)

    @staticmethod
    def _from_series(dfr, name, shape, dims, links, new_dims=None):
        '''return a flattened np.ndarray from the pd.Series of dfr defined by his name

        *parameters*

        - dfr: DataFrame - data to convert in Xdataset
        - name: string - name of the Series (full_name or json_name)
        - shape: shape of the Xdataset
        - dims: list of string - list of name of dimensions
        - links: list of string - list of linked Series
        - new_dims: list of string (default None) - new order of dims
        '''
        if not links:
            return np.array(dfr[name])
        old_order = list(range(len(dims)))
        new_dims = new_dims if new_dims else dims
        order = [dims.index(dim)
                 for dim in new_dims] if new_dims else old_order
        idx = [0] * len(dims)
        for nam in links:
            idx[new_dims.index(nam)] = slice(shape[dims.index(nam)])
        xar = np.moveaxis(np.array(dfr[name]).reshape(
            shape), old_order, order)[*idx]
        if not links:
            return xar.flatten()
        lnk = [nam for nam in new_dims if nam in links]
        shape_lnk = [shape[dims.index(nam)] for nam in lnk]
        xar = xar.reshape(shape_lnk)
        old_order = list(range(len(links)))
        order = [lnk.index(dim) for dim in links]
        return np.moveaxis(xar, old_order, order).flatten()

    @staticmethod
    def _get_dims(dims, name, data, dimensions):
        '''add names of dimensions into dims'''
        if not name:
            return
        if name in dimensions:
            dims += [name]
        else:
            if not 'links' in data[name]:
                return
            for nam in data[name]['links']:
                PandasConnec._get_dims(dims, nam, data, dimensions)


class XarrayConnec:
    ''' Xarray interface with two static methods ximport and xexport'''

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a xr.DataArray or a xr.Dataset from a Xdataset

        *Parameters*

        - **dataset** : Boolean (default True) - if False and a single data_var,
        return a sc.DataArray
        - **datagroup** : Boolean (default True) - if True, return a sc.DataGroup
        which contains the sc.DataArray/sc.Dataset and the other data else only
        sc.DataArray/sc.Dataset
        '''
        option = {'dataset': True, 'datagroup': True} | kwargs
        coords = XarrayConnec._to_xr_vars(
            xdt, xdt.dimensions + xdt.coordinates + xdt.uniques)
        coords |= XarrayConnec._to_xr_vars(xdt, xdt.additionals)
        attrs = XarrayConnec._to_xr_attrs(xdt, **option)
        if len(xdt.data_vars) == 1 and not option['dataset']:
            var_name = xdt.data_vars[0]
            data = xdt.to_ndarray(var_name)
            dims = xdt.dims(var_name)
            attrs |= {'ntv_type': xdt[var_name].nda.ntv_type}
            attrs |= xdt[var_name].meta if xdt[var_name].meta else {}
            name = var_name if var_name != 'data' else None
            xrd = xr.DataArray(data=data, coords=coords, dims=dims, attrs=attrs,
                               name=name)
        else:
            data_vars = XarrayConnec._to_xr_vars(xdt, xdt.data_vars)
            xrd = xr.Dataset(data_vars, coords=coords, attrs=attrs)
        for unic in xdt.uniques:
            xrd[unic].attrs |= {'ntv_type': xdt[unic].ntv_type} | (
                xdt[unic].meta if xdt[unic].meta else {})
        return xrd

    @staticmethod
    def ximport(xar, Xclass, **kwargs):
        '''return a Xdataset from a xr.DataArray or a xr.Dataset'''
        xnd = []
        if xar.attrs:
            attrs = {k: v for k, v in xar.attrs.items() if not k in [
                'name', 'ntv_type']}
            for name, meta in attrs.items():
                if isinstance(meta, list):
                    xnd += [Xndarray.read_json({name: meta})]
                else:
                    xnd += [Xndarray(name, meta=meta)]
        for coord in xar.coords:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.coords[coord])]
            if list(xar.coords[coord].dims) == list(xar.dims) and isinstance(xar, xr.Dataset):
                xnd[-1].links = [list(xar.data_vars)[0]]
        if isinstance(xar, xr.DataArray):
            var = XarrayConnec._var_xr_to_xnd(
                xar, name='data', add_attrs=False)
            xnd += [XarrayConnec._var_xr_to_xnd(xar,
                                                name='data', add_attrs=False)]
            xdt = Xclass(xnd, xar.attrs.get('name'))
            for var in xdt.data_vars:
                if var != xar.name and xar.name:
                    xdt[var].links = [xar.name]
            return xdt.to_canonical()
        for var in xar.data_vars:
            xnd += [XarrayConnec._var_xr_to_xnd(xar.data_vars[var])]
        return Xclass(xnd, xar.attrs.get('name')).to_canonical()

    @staticmethod
    def _var_xr_to_xnd(var, name=None, add_attrs=True):
        '''return a Xndarray from a Xarray variable

        *Parameters*

        - **var** : Xarray variable to convert in Xndarray,
        - **name** : string (default None) - default name if var have no name,
        - **add_attrs** : boolean (default True) - if False, attrs are not converted
        '''
        full_name = var.name if var.name else name
        name = Nutil.split_name(full_name)[0]
        dims = None if var.dims == (name,) or var.size == 1 else list(var.dims)
        ntv_type = var.attrs.get('ntv_type')
        nda = var.values
        nda = nda.reshape(1) if not nda.shape else nda
        if nda.dtype.name == 'datetime64[ns]' and ntv_type:
            nda = Nutil.convert(ntv_type, nda, tojson=False)
        attrs = {k: v for k, v in var.attrs.items()
                 if not k in ['ntv_type', 'name']} if add_attrs else {}
        return Xndarray(full_name, Ndarray(nda, ntv_type), dims, attrs)

    @staticmethod
    def _to_xr_attrs(xdt, **option):
        '''return a dict with attributes from a Xdataset

        *Parameters*

        - **datagroup** : Boolean if True, add json representation of 'relative'
        Xndarrays and 'data_arrays' Xndarrays
        '''
        attrs = {meta: xdt[meta].meta for meta in xdt.metadata}
        attrs |= {'name': xdt.name} if xdt.name else {}
        if option['datagroup']:
            for name in xdt.names:
                if xdt[name].mode == 'relative':
                    attrs |= xdt[name].to_json(header=False)
            for name in xdt.data_arrays:
                attrs |= xdt[name].to_json(header=False)
        return attrs

    @staticmethod
    def _to_xr_coord(xdt, name):
        '''return a dict with Xarray attributes from a Xndarray defined by his name'''
        data = xdt.to_ndarray(name)
        if name in xdt.uniques:
            return {name: data[0]}
        if name in xdt.additionals and not xdt[name].links:
            data = data.reshape(xdt.shape_dims(xdt[name].name))
        dims = tuple(xdt.dims(name)) if xdt.dims(name) else (xdt[name].name)
        meta = {'ntv_type': xdt[name].ntv_type} | (
            xdt[name].meta if xdt[name].meta else {})
        return {name: (dims, data, meta)}

    @staticmethod
    def _to_xr_vars(xdt, list_names):
        '''return a dict with Xarray attributes from a list of Xndarray names'''
        arg_vars = {}
        valid_names = [
            nam for nam in list_names if xdt[nam].mode == 'absolute']
        for xnd_name in valid_names:
            arg_vars |= XarrayConnec._to_xr_coord(xdt, xnd_name)
        for name in list_names:
            if xdt[name].xtype == 'meta':
                arg_vars |= {name: xdt[name].meta}
        return arg_vars

    @staticmethod
    def _xr_add_type(xar):
        '''add 'ntv_type' as attribute for a xr.DataArray'''
        if isinstance(xar, xr.DataArray) and not 'ntv_type' in xar.attrs:
            xar.attrs |= {'ntv_type': Nutil.ntv_type(xar.data.dtype.name)}
            return
        for coord in xar.coords:
            XarrayConnec._xr_add_type(coord)
        for var in xar.data_vars:
            XarrayConnec._xr_add_type(var)
        return


class ScippConnec:
    ''' Scipp interface with two static methods ximport and xexport'''

    SCTYPE_DTYPE = {'string': 'str'}

    @staticmethod
    def xexport(xdt, **kwargs):
        '''return a sc.DataArray or a sc.Dataset from a xdataset

        *Parameters*

        - **dataset** : Boolean (default True) - if False and a single data_var,
        return a DataArray
        - **datagroup** : Boolean (default True) - if True return a DataGroup with
        metadata and data_arrays
        - **ntv_type** : Boolean (default True) - if True add ntv-type to the name
        '''
        option = {'dataset': True, 'datagroup': True,
                  'ntv_type': True} | kwargs
        coords = dict([ScippConnec._to_scipp_var(xdt, name, **option)
                       for name in xdt.coordinates + xdt.dimensions + xdt.uniques
                       if xdt[name].mode == 'absolute'])
        scd = sc.Dataset(dict([ScippConnec._to_sc_dataarray(xdt, name, coords, **option)
                               for name in xdt.data_vars
                               if xdt[name].mode == 'absolute']))
        scd = scd if option['dataset'] else scd[list(scd)[0]]
        if not option['datagroup']:
            return scd
        sc_name = xdt.name if xdt.name else 'no_name'
        return sc.DataGroup({sc_name: scd} | ScippConnec._to_scipp_grp(xdt, **option))

    @staticmethod
    def ximport(sc_obj, Xclass, **kwargs):
        '''return a xdataset from a scipp object DataArray, Dataset or DataGroup'''
        xnd = []
        scd = sc_obj
        xnd_name = None
        if isinstance(sc_obj, sc.DataGroup):
            for obj in sc_obj:
                if isinstance(sc_obj[obj], (sc.Dataset, sc.DataArray)):
                    scd = sc_obj[obj]
                    xnd_name = obj
                    break
        if isinstance(scd, sc.DataArray):
            scd = sc.Dataset({(scd.name if scd.name else 'no_name'): scd})
        if isinstance(scd, sc.Dataset):
            for coord in scd.coords:
                xnd += ScippConnec._var_sc_to_xnd(
                    scd.coords[coord], scd, coord)
            for var in scd:
                for mask in scd[var].masks:
                    m_var = Nutil.split_json_name(var)[0]
                    xnd += ScippConnec._var_sc_to_xnd(
                        scd[var].masks[mask], scd, mask, m_var)
                xnd += ScippConnec._var_sc_to_xnd(scd[var].data, scd, var)
        if isinstance(sc_obj, sc.DataGroup):
            xnd = ScippConnec._grp_sc_to_xnd(sc_obj, xnd)
        return Xclass(xnd, xnd_name).to_canonical()

    @staticmethod
    def _grp_sc_to_xnd(sc_obj, xnd):
        '''return a list of Xndarray from a scipp variable'''
        dic_xnd = {xar.name: xar for xar in xnd}
        for obj in sc_obj:
            name, add_name = Nutil.split_name(obj)
            match [name, add_name, sc_obj[obj]]:
                case [name, None, list()]:
                    xnd += [Xndarray.read_json({name: sc_obj[obj]})]
                case [name, add_name, sc.Variable()]:
                    xnd += ScippConnec._var_sc_to_xnd(
                        sc_obj[obj], None, add_name, name)
                case [name, _, dict() | str() | list()] if name in dic_xnd:
                    if dic_xnd[name].meta:
                        dic_xnd[name].meta |= sc_obj[obj]
                    else:
                        dic_xnd[name].meta = sc_obj[obj]
                case [name, _, dict() | str() | list()]:
                    xnd += [Xndarray.read_json({name: sc_obj[obj]})]
                case [_, _, _]: ...
        return xnd

    @staticmethod
    def _var_sc_to_xnd(scv, scd=None, sc_name='', var=None):
        '''return a list of Xndarray from a scipp variable
        - scd : scipp dataset
        - scv : scipp variable
        - var : name
        - sc_name : scipp name'''
        l_xnda = []
        unit = scv.unit.name if scv.unit and not scv.unit in [
            'dimensionless', 'ns'] else ''
        ext_name, typ1 = Nutil.split_json_name(sc_name, True)
        var_name, typ2 = Nutil.split_json_name(var, True)
        full_name = var_name + \
            ('.' if var_name and ext_name else '') + ext_name
        ntv_type_base = typ1 + typ2
        ntv_type = ntv_type_base + ('[' + unit + ']' if unit else '')
        links = [Nutil.split_json_name(jsn)[0] for jsn in scv.dims]
        if not scd is None and sc_name in scd.coords and scv.dims == scd.dims:
            links = [Nutil.split_json_name(list(scd)[0])[0]]
        if not scv.variances is None:
            nda = Ndarray(scv.variances, ntv_type_base)
            l_xnda.append(Xndarray(full_name + '.variance', nda, links))
        nda = Ndarray(scv.values, ntv_type, str_uri=False)
        shape = scv.shape if scv.shape else (1,)
        nda.set_shape(shape)
        l_xnda.append(Xndarray(full_name, nda, links))
        return l_xnda

    @staticmethod
    def _to_sc_dataarray(xdt, name, coords, **option):
        '''return a scipp.DataArray from a xdataset.global_var defined by his name'''
        scipp_name, data = ScippConnec._to_scipp_var(xdt, name, **option)
        masks = dict([ScippConnec._to_scipp_var(xdt, nam, **option)
                      for nam in set(xdt.group(name)) & set(xdt.masks)])
        return (scipp_name, sc.DataArray(data, coords=coords, masks=masks))

    @staticmethod
    def _to_scipp_grp(xdt, **option):
        '''return a dict with metadata, data-array and data_add from a xdataset'''
        grp = {}
        grp |= dict([ScippConnec._to_scipp_var(xdt, name, **option)
                     for name in xdt.data_add + xdt.data_arrays
                     if xdt[name].add_name != 'variance'])
        opt_mask = option | {'grp_mask': True}
        grp |= dict([ScippConnec._to_scipp_var(xdt, name, **opt_mask)
                     for name in xdt.masks
                     if xdt[name].name in xdt.names and not xdt[name].name in xdt.data_vars])
        grp |= {name + '.meta': xdt[name].meta for name in xdt.names
                if xdt[name].meta}
        for name in xdt.names:
            if xdt[name].mode == 'relative':
                grp |= xdt[name].to_json(header=False)
        return grp

    @staticmethod
    def _to_scipp_var(xdt, name, **kwargs):
        '''return a scipp.Variable from a Xndarray defined by his name'''
        option = {'grp_mask': False, 'ntv_type': True} | kwargs
        simple_type, unit = Nutil.split_type(xdt[name].ntv_type)
        unit = unit if unit else ''
        add_name = Nutil.split_name(name)[1]
        new_n = add_name if name in xdt.masks and not option['grp_mask'] else name
        opt_n = option['ntv_type']
        scipp_name = new_n + (':' + simple_type if opt_n else '')
        if name in xdt.uniques:
            return (scipp_name, sc.scalar(xdt[name].darray[0], unit=unit))
        vari_name = name + '.variance'
        variances = xdt[vari_name].darray if vari_name in xdt.names else None
        dims = xdt.dims(name, opt_n) if xdt.dims(
            name, opt_n) else [xdt[name].name]
        var = sc.array(dims=['flat'], values=xdt.to_darray(
            name), variances=variances, unit=unit)
        var = sc.fold(var, dim='flat', sizes=dict(zip(dims, xdt[name].shape)))
        return (scipp_name, var)
AstropyNDDataConnec: NDData interface with two static methods ximport and xexport.
AstropyNDDataConnec.xexport(xdt, **kwargs): return an astropy NDData from a Xdataset.
AstropyNDDataConnec.ximport(ndd, Xclass, **kwargs): return a Xdataset from an astropy NDData.
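A sketch of the NDData import, with the same assumption on the Xdataset import path as above. Note that the reverse xexport call reads the 'data', 'data.mask', 'data.uncertainty', 'meta', 'wcs' and 'psf' entries of the Xdataset, so a full round trip needs all of them to be present:

import numpy as np
from astropy.nddata import NDData, StdDevUncertainty

from ntv_numpy import Xdataset                      # assumed public import path
from ntv_numpy.xconnector import AstropyNDDataConnec

data = np.random.default_rng(0).random((4, 4))
ndd = NDData(
    data,
    mask=data > 0.9,
    uncertainty=StdDevUncertainty(np.full((4, 4), 0.1)),
    unit='adu',
    meta={'name': 'example_image', 'origin': 'simulation'},
)

xdt = AstropyNDDataConnec.ximport(ndd, Xdataset)    # NDData -> Xdataset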
PandasConnec: pandas.DataFrame interface with two static methods ximport and xexport.
PandasConnec.xexport(xdt, **kwargs): return a pd.DataFrame from a Xdataset.

Parameters
- json_name: Boolean (default True) - if True, use json_name as column label, else full_name
- info: Boolean (default True) - if True, add xdt.info to DataFrame.attrs
- dims: list of string (default None) - order of dimensions (full_name) to apply
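A sketch of the export options, building the Xdataset from a small xarray Dataset first (Xdataset import path assumed as before):

import numpy as np
import xarray as xr

from ntv_numpy import Xdataset                      # assumed public import path
from ntv_numpy.xconnector import PandasConnec, XarrayConnec

xds = xr.Dataset(
    {'rate': (('x', 'y'), np.arange(6.0).reshape(2, 3))},
    coords={'x': [1, 2], 'y': [10, 20, 30]},
)
xdt = XarrayConnec.ximport(xds, Xdataset)

# default export: json_name column labels, reconstruction info in DataFrame.attrs
dfr = PandasConnec.xexport(xdt)

# lighter export: full_name labels, no 'info' in attrs, explicit dimension order
dfr_plain = PandasConnec.xexport(xdt, json_name=False, info=False, dims=['x', 'y'])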
PandasConnec.ximport(df, Xclass, **kwargs): return a Xdataset from a pd.DataFrame.
Parameters
- dims: list of string (default None) - order of dimensions to apply
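A sketch of the DataFrame import. When the DataFrame does not carry attrs['info'] (i.e. it was not produced by xexport), the dimension analysis uses the DataFrame .npd accessor, so the ntv_pandas package is assumed to be installed; the Xdataset import path is assumed as before:

import pandas as pd

from ntv_numpy import Xdataset                      # assumed public import path
from ntv_numpy.xconnector import PandasConnec

# tidy DataFrame: a full 'year' x 'city' grid with one measured variable
dfr = pd.DataFrame({
    'year': [2020, 2020, 2021, 2021],
    'city': ['paris', 'london', 'paris', 'london'],
    'temperature': [12.5, 11.1, 13.0, 11.8],
})

xdt = PandasConnec.ximport(dfr, Xdataset)                           # dimensions inferred
xdt2 = PandasConnec.ximport(dfr, Xdataset, dims=['year', 'city'])   # forced dimension order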
XarrayConnec: Xarray interface with two static methods ximport and xexport.
XarrayConnec.xexport(xdt, **kwargs): return a xr.DataArray or a xr.Dataset from a Xdataset.

Parameters
- dataset : Boolean (default True) - if False and a single data_var, return a xr.DataArray
- datagroup : Boolean (default True) - if True, add the JSON representation of the 'relative' Xndarrays and the 'data_arrays' Xndarrays to the attrs
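A round-trip sketch with xarray, assuming the usual Xdataset import path; dataset=False returns a DataArray when there is a single data variable:

import numpy as np
import xarray as xr

from ntv_numpy import Xdataset                      # assumed public import path
from ntv_numpy.xconnector import XarrayConnec

xds = xr.Dataset(
    {'rate': (('x', 'y'), np.arange(6.0).reshape(2, 3))},
    coords={'x': [1, 2], 'y': [10, 20, 30]},
)
xdt = XarrayConnec.ximport(xds, Xdataset)

xds_back = XarrayConnec.xexport(xdt)                 # -> xr.Dataset
xda_back = XarrayConnec.xexport(xdt, dataset=False)  # single data_var -> xr.DataArray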
XarrayConnec.ximport(xar, Xclass, **kwargs): return a Xdataset from a xr.DataArray or a xr.Dataset.
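ximport also accepts a plain DataArray; a sketch, with the same Xdataset assumption:

import numpy as np
import xarray as xr

from ntv_numpy import Xdataset                      # assumed public import path
from ntv_numpy.xconnector import XarrayConnec

xda = xr.DataArray(
    np.linspace(0.0, 1.0, 4),
    dims=['time'],
    coords={'time': np.arange(4)},
    name='signal',
)
xdt = XarrayConnec.ximport(xda, Xdataset)
# the DataArray values become a data variable named after the DataArray ('signal' here)
print(xdt.data_vars)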
ScippConnec: Scipp interface with two static methods ximport and xexport.
ScippConnec.xexport(xdt, **kwargs): return a sc.DataArray or a sc.Dataset from a Xdataset.
Parameters
- dataset : Boolean (default True) - if False and a single data_var, return a DataArray
- datagroup : Boolean (default True) - if True return a DataGroup with metadata and data_arrays
- ntv_type : Boolean (default True) - if True add ntv-type to the name
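A sketch of the scipp export, again building the Xdataset from xarray (Xdataset import path assumed); with the default datagroup=True the result is a sc.DataGroup that also carries metadata and data_arrays:

import numpy as np
import xarray as xr

from ntv_numpy import Xdataset                      # assumed public import path
from ntv_numpy.xconnector import ScippConnec, XarrayConnec

xds = xr.Dataset(
    {'counts': (('x', 'y'), np.arange(6.0).reshape(2, 3))},
    coords={'x': [1, 2], 'y': [10, 20, 30]},
)
xdt = XarrayConnec.ximport(xds, Xdataset)

grp = ScippConnec.xexport(xdt)                                   # sc.DataGroup
scd = ScippConnec.xexport(xdt, datagroup=False)                  # sc.Dataset only
sda = ScippConnec.xexport(xdt, datagroup=False, dataset=False)   # -> sc.DataArray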
ScippConnec.ximport(sc_obj, Xclass, **kwargs): return a Xdataset from a scipp object (DataArray, Dataset or DataGroup).
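A sketch of the scipp import from a hand-built sc.Dataset (Xdataset import path assumed as before):

import numpy as np
import scipp as sc

from ntv_numpy import Xdataset                      # assumed public import path
from ntv_numpy.xconnector import ScippConnec

da = sc.DataArray(
    data=sc.array(dims=['x', 'y'], values=np.arange(6.0).reshape(2, 3), unit='K'),
    coords={
        'x': sc.array(dims=['x'], values=np.array([1.0, 2.0]), unit='m'),
        'y': sc.array(dims=['y'], values=np.array([10.0, 20.0, 30.0]), unit='m'),
    },
)
scd = sc.Dataset({'temperature': da})

xdt = ScippConnec.ximport(scd, Xdataset)   # scipp Dataset -> Xdataset
print(xdt.dimensions)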