NTV.json_ntv.ntv_util
Created on Feb 27 2023
@author: Philippe@loco-labs.io
The ntv_util
module is part of the NTV.json_ntv
package (specification document).
It contains the classes NtvUtil
, NtvConnector
, NtvTree
, NtvJsonEncoder
and NtvError
for NTV entities.
1# -*- coding: utf-8 -*- 2""" 3Created on Feb 27 2023 4 5@author: Philippe@loco-labs.io 6 7The `ntv_util` module is part of the `NTV.json_ntv` package ([specification document]( 8https://github.com/loco-philippe/NTV/blob/main/documentation/JSON-NTV-standard.pdf)). 9 10It contains the classes `NtvUtil`, `NtvConnector`, `NtvTree`, `NtvJsonEncoder` 11and `NtvError` for NTV entities. 12""" 13from abc import ABC, abstractmethod 14import datetime 15import json 16import re 17 18 19class NtvUtil: 20 ''' The NtvUtil class includes static methods used by several NTV classes. 21 NtvUtil is the parent class of `Datatype`, `Namespace`, `Ntv`. 22 23 *class variables :* 24 - **_namespaces_** : list of Namespace defined 25 - **_types_** : list of Datatype defined 26 27 *static methods :* 28 - `is_dictable` 29 - `from_obj_name` 30 - `decode_ntv_tab` 31 - `to_ntv_pointer` 32 33 ''' 34 _namespaces_ = {} 35 _types_ = {} 36 37 @staticmethod 38 def is_dictable(lis): 39 '''check if a list can be translated in a dict''' 40 keys = set() 41 for val in lis: 42 if not isinstance(val, dict): 43 return False 44 if len(val) != 1: 45 return False 46 keys.add(list(val)[0]) 47 return len(keys) == len(lis) 48 49 @staticmethod 50 def from_obj_name(string): 51 '''return a tuple with name, type_str and separator from string''' 52 if not isinstance(string, str): 53 raise NtvError('a json-name have to be str') 54 if string == '': 55 return (None, None, None) 56 spl = string.rsplit(':', maxsplit=1) 57 if len(spl) == 1: 58 if string[-1] == '.': 59 return (None, string, None) 60 return (string, None, None) 61 if spl[0] == '': 62 return (None, spl[1], ':') 63 if spl[0][-1] == ':': 64 sp0 = spl[0][:-1] 65 return (None if sp0 == '' else sp0, None if spl[1] == '' else spl[1], '::') 66 return (None if spl[0] == '' else spl[0], None if spl[1] == '' else spl[1], ':') 67 68 @staticmethod 69 def decode_ntv_tab(ntv, ntv_to_val): 70 '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object) 71 72 *parameters:* 73 74 - **ntv**: Ntv data to decode, 75 - **ntv_to_val**: method to convert external value form ntv in internal Field value 76 77 *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)* 78 79 - name (None or string): name of the Field 80 - dtype (None or string): type of data 81 - codec (list): list of Field codec values 82 - parent (None or int): Field parent or None 83 - keys (None or list): Field keys 84 - coef (None or int): coef if primary Field else None 85 - leng (int): length of the Field 86 ''' 87 typ = ntv.type_str if ntv.ntv_type else None 88 nam = ntv.name 89 val = ntv_to_val(ntv) 90 if ntv.__class__.__name__ == 'NtvSingle': 91 return (nam, typ, [val], None, None, None, 1) 92 if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle': 93 return (nam, typ, val, None, None, None, len(ntv)) 94 95 ntvc = ntv[0] 96 leng = max(len(ind) for ind in ntv) 97 typc = ntvc.type_str if ntvc.ntv_type else None 98 valc = ntv_to_val(ntvc) 99 if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \ 100 isinstance(ntv[1].val, (int, str)) and \ 101 ntv[2].__class__.__name__ != 'NtvSingle' and \ 102 isinstance(ntv[2][0].val, int): 103 return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng) 104 if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)): 105 return (nam, typc, valc, ntv[1].val, None, None, leng) 106 if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list): 107 leng = leng * ntv[1][0].val 108 return (nam, typc, valc, None, None, ntv[1][0].val, leng) 109 if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int): 110 return (nam, typc, valc, None, ntv[1].to_obj(), None, leng) 111 return (nam, typ, val, None, None, None, len(ntv)) 112 113 @staticmethod 114 def to_ntvpointer(jsonpointer, unique_root=False): 115 '''convert a json pointer inter a NTV pointer (string) 116 117 *parameters:* 118 119 - **jsonpointer**: String - json pointer to convert, 120 - **unique_root**: Boolean (default False) - True if the json root length is 1 ''' 121 single = '/([0-9]+)(/[a-z])' 122 if unique_root and not '0' <= jsonpointer[1] <= '9': 123 return re.sub(single, r'\g<2>', jsonpointer)[1:] 124 return re.sub(single, r'\g<2>', jsonpointer) 125 126 127class NtvConnector(ABC): 128 ''' The NtvConnector class is an abstract class used by all NTV connectors 129 for conversion between NTV-JSON data and NTV-OBJ data. 130 131 A NtvConnector child is defined by: 132 - clas_obj: str - define the class name of the object to convert 133 - clas_typ: str - define the Datatype of the converted object 134 - to_obj_ntv: method - converter from JsonNTV to the object 135 - to_json_ntv: method - converter from the object to JsonNTV 136 137 *class method :* 138 - `connector` 139 - `dic_connec` 140 - `castable` (@property) 141 - `dic_obj` (@property) 142 - `dic_type` (@property) 143 144 *abstract method* 145 - `to_obj_ntv` 146 - `to_json_ntv` 147 148 *static method* 149 - `cast` 150 - `uncast` 151 - `is_json_class` 152 - `is_json` 153 - `init_ntv_keys` 154 ''' 155 156 DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'} 157 DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line', 158 'MultiLineString': 'multiline', 'Polygon': 'polygon', 159 'MultiPolygon': 'multipolygon'} 160 DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'} 161 DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat, 162 'datetime': datetime.datetime.fromisoformat} 163 DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring', 164 'multiline': 'multilinestring', 'polygon': 'polygon', 165 'multipolygon': 'multipolygon'} 166 DIC_CBOR = {'point': False, 'multipoint': False, 'line': False, 167 'multiline': False, 'polygon': False, 'multipolygon': False, 168 'date': True, 'time': False, 'datetime': True} 169 DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec', 170 'ndarray': 'NdarrayConnec', 171 'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec', 172 'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec', 173 'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec', 174 'other': None} 175 176 @classmethod 177 @property 178 def castable(cls): 179 '''return a list with class_name allowed for json-obj conversion''' 180 return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType', 181 'NtvSingle', 'NtvList'] \ 182 + list(NtvConnector.DIC_GEO_CL.keys()) \ 183 + list(NtvConnector.DIC_FCT.keys()) \ 184 + list(NtvConnector.dic_connec().keys()) 185 186 @classmethod 187 @property 188 def dic_obj(cls): 189 '''return a dict with the connectors: { type: class_connec_name }''' 190 return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\ 191 NtvConnector.DIC_OBJ 192 193 @classmethod 194 @property 195 def dic_type(cls): 196 '''return a dict with the connectors: { class_obj_name: type }''' 197 return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\ 198 NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL 199 200 @classmethod 201 def connector(cls): 202 '''return a dict with the connectors: { class_connec_name: class_connec }''' 203 return {clas.__name__: clas for clas in cls.__subclasses__()} 204 205 @classmethod 206 def dic_connec(cls): 207 '''return a dict with the clas associated to the connector: 208 { class_obj_name: class_connec_name }''' 209 return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()} 210 211 @staticmethod 212 @abstractmethod 213 def to_obj_ntv(ntv_value, **kwargs): 214 ''' abstract - convert ntv_value into the return object''' 215 216 @staticmethod 217 @abstractmethod 218 def to_json_ntv(value, name=None, typ=None): 219 ''' abstract - convert NTV object (value, name, type) into the NTV json 220 (json-value, name, type)''' 221 222 @staticmethod 223 def cast(data, name=None, type_str=None): 224 '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity 225 defined in parameters. 226 227 *Parameters* 228 229 - **data**: NtvSingle entity or NTVvalue of the NTV entity 230 - **name** : String (default None) - name of the NTV entity 231 - **type_str**: String (default None) - type of the NTV entity 232 ''' 233 clas = data.__class__.__name__ 234 if clas == 'NtvSingle': 235 name = data.ntv_name 236 type_str = data.type_str 237 data = data.ntv_value 238 dic_geo_cl = NtvConnector.DIC_GEO_CL 239 dic_connec = NtvConnector.dic_connec() 240 match clas: 241 case 'int' | 'float' | 'bool' | 'str': 242 return (data, name, type_str) 243 case 'dict': 244 return ({key: NtvConnector.cast(val, name, type_str)[0] 245 for key, val in data.items()}, name, type_str) 246 case 'list': 247 return ([NtvConnector.cast(val, name, type_str)[0] for val in data], 248 name, NtvConnector._typ_obj(data) if not type_str else type_str) 249 case 'tuple': 250 return (list(data), name, 'array' if not type_str else type_str) 251 case 'date' | 'time' | 'datetime': 252 return (data.isoformat(), name, clas if not type_str else type_str) 253 case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \ 254 'Polygon' | 'MultiPolygon': 255 return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0], 256 name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str) 257 case 'NtvSingle' | 'NtvList': 258 return (data.to_obj(), name, 'ntv' if not type_str else type_str) 259 case _: 260 connec = None 261 if clas in dic_connec and dic_connec[clas] in NtvConnector.connector(): 262 connec = NtvConnector.connector()[dic_connec[clas]] 263 if connec: 264 return connec.to_json_ntv(data, name, type_str) 265 raise NtvError( 266 'connector is not defined for the class : ', clas) 267 return (data, name, type_str) 268 269 @staticmethod 270 def uncast(value, name=None, type_str=None, **kwargs): 271 '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity 272 273 *Parameters* 274 275 - **value**: NtvSingle entity or NTVvalue of the NTV entity 276 - **name** : String (default None) - name of the NTV entity 277 - **type_str**: String (default None) - type of the NTV entity 278 ''' 279 if type_str == 'json': 280 return (value, name, type_str) 281 typebase_str = type_str 282 if value.__class__.__name__ == 'NtvSingle': 283 if not (type_str in set(NtvConnector.dic_type.values()) and 284 NtvConnector.is_json(value) or type_str is None): 285 return (value.ntv_value, value.name, value.type_str) 286 type_str = value.type_str if value.ntv_type else None 287 typebase_str = value.typebase_str if value.ntv_type else None 288 name = value.ntv_name 289 value = value.ntv_value 290 option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs 291 value_obj = NtvConnector._uncast_val(value, typebase_str, **option) 292 return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj)) 293 294 @staticmethod 295 def _typ_obj(value): 296 if isinstance(value, dict): 297 return NtvConnector._typ_obj(list(value.values())) 298 if isinstance(value, (tuple, list)): 299 for val in value: 300 typ = NtvConnector._typ_obj(val) 301 if typ: 302 return typ 303 return None 304 return NtvConnector.dic_type.get(value.__class__.__name__) 305 306 @staticmethod 307 def _uncast_val(value, type_n, **option): 308 '''return value from ntv value 309 310 *Parameters* 311 312 - **value**: NtvSingle entity or NTVvalue of the NTV entity 313 ''' 314 dic_fct = NtvConnector.DIC_FCT 315 dic_geo = NtvConnector.DIC_GEO 316 dic_obj = NtvConnector.dic_obj | option['dicobj'] 317 dic_cbor = NtvConnector.DIC_CBOR 318 if not type_n or type_n == 'json' or (option['format'] == 'cbor' and 319 not dic_cbor.get(type_n, False)): 320 return value 321 if type_n in dic_fct: 322 if isinstance(value, (tuple, list)): 323 return [NtvConnector._uncast_val(val, type_n, **option) for val in value] 324 if isinstance(value, dict): 325 return {key: NtvConnector._uncast_val(val, type_n, **option) 326 for key, val in value.items()} 327 return dic_fct[type_n](value) 328 if type_n == 'array': 329 return tuple(value) 330 if type_n == 'ntv': 331 from json_ntv.ntv import Ntv 332 return Ntv.from_obj(value) 333 if type_n in dic_geo: 334 option['type_geo'] = dic_geo[type_n] 335 connec = None 336 if type_n in dic_obj and \ 337 dic_obj[type_n] in NtvConnector.connector(): 338 connec = NtvConnector.connector()[dic_obj[type_n]] 339 elif dic_obj['other'] in NtvConnector.connector(): 340 connec = NtvConnector.connector()['other'] 341 if connec: 342 return connec.to_obj_ntv(value, **option) 343 return value 344 345 @staticmethod 346 def is_json_class(val): 347 ''' return True if val is a json type''' 348 return val is None or isinstance(val, (list, int, str, float, bool, dict)) 349 350 @staticmethod 351 def is_json(obj): 352 ''' check if obj is a json structure and return True if obj is a json-value 353 354 *Parameters* 355 356 - **obj** : object to check 357 - **ntvobj** : boolean (default False) - if True NTV class value are accepted''' 358 if obj is None: 359 return True 360 is_js = NtvConnector.is_json 361 match obj: 362 case str() | int() | float() | bool() as obj: 363 return True 364 case list() | tuple() as obj: 365 if not obj: 366 return True 367 return min(is_js(obj_in) for obj_in in obj) 368 case dict() as obj: 369 if not obj: 370 return True 371 if not min(isinstance(key, str) for key in obj.keys()): 372 raise NtvError('key in dict in not string') 373 return min(is_js(obj_in) for obj_in in obj.values()) 374 case _: 375 if obj.__class__.__name__ not in NtvConnector.castable: 376 raise NtvError(obj.__class__.__name__ + 377 ' is not valid for NTV') 378 return False 379 380 @staticmethod 381 def keysfromderkeys(parentkeys, derkeys): 382 '''return keys from parent keys and derkeys 383 384 *Parameters* 385 386 - **parentkeys** : list of keys from parent 387 - **derkeys** : list of derived keys 388 389 *Returns* : list of keys''' 390 return [derkeys[pkey] for pkey in parentkeys] 391 392 @staticmethod 393 def encode_coef(lis): 394 '''Generate a repetition coefficient for periodic list''' 395 if len(lis) < 2: 396 return 0 397 coef = 1 398 while coef != len(lis): 399 if lis[coef-1] != lis[coef]: 400 break 401 coef += 1 402 if (not len(lis) % (coef * (max(lis) + 1)) and 403 lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))): 404 return coef 405 return 0 406 407 @staticmethod 408 def keysfromcoef(coef, period, leng=None): 409 ''' return a list of keys with periodic structure''' 410 if not leng: 411 leng = coef * period 412 return None if not (coef and period) else [(ind % (coef * period)) // coef 413 for ind in range(leng)] 414 415 @staticmethod 416 def format_field(lidx_decode, leng): 417 ''' return a list of format''' 418 # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6 419 field_format = [] 420 for field_decode in lidx_decode: 421 name, typ, codec, parent, keys, coef, length = field_decode 422 if (keys, parent, coef) == (None, None, None): # full or unique 423 if len(codec) == 1: # unique 424 field_format.append('unique') 425 elif len(codec) == leng: # full 426 field_format.append('full') 427 else: 428 raise NtvError('impossible to generate keys') 429 continue 430 if keys and len(keys) > 1 and parent is None: # complete 431 field_format.append('complete') 432 continue 433 if coef: # primary 434 field_format.append('primary') 435 continue 436 if parent is None: 437 raise NtvError('keys not referenced') 438 if not keys: # implicit 439 field_format.append('implicit') 440 continue 441 field_format.append('relative') 442 return field_format 443 444 @staticmethod 445 def init_ntv_keys(ind, lidx, leng): 446 ''' initialization of explicit keys data in lidx object of tabular data''' 447 # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6 448 name, typ, codec, parent, keys, coef, length = lidx[ind] 449 if (keys, parent, coef) == (None, None, None): # full or unique 450 if len(codec) == 1: # unique 451 lidx[ind][4] = [0] * leng 452 elif len(codec) == leng: # full 453 lidx[ind][4] = list(range(leng)) 454 else: 455 raise NtvError('impossible to generate keys') 456 return 457 if keys and len(keys) > 1 and parent is None: # complete 458 return 459 if coef: # primary 460 lidx[ind][4] = [(ikey % (coef * len(codec))) // 461 coef for ikey in range(leng)] 462 lidx[ind][3] = None 463 return 464 if parent is None: 465 raise NtvError('keys not referenced') 466 if not lidx[parent][4] or len(lidx[parent][4]) != leng: 467 NtvConnector.init_ntv_keys(parent, lidx, leng) 468 if not keys and len(codec) == len(lidx[parent][2]): # implicit 469 lidx[ind][4] = lidx[parent][4] 470 lidx[ind][3] = None 471 return 472 lidx[ind][4] = NtvConnector.keysfromderkeys( 473 lidx[parent][4], keys) # relative 474 lidx[ind][3] = None 475 return 476 477 478class NtvTree: 479 ''' The NtvTree class is an iterator class used to traverse a NTV tree structure. 480 Some other methods give tree indicators and data. 481 482 *Attributes :* 483 484 - **ntv** : Ntv entity 485 - **_node**: Ntv entity - node pointer 486 - **_stack**: list - stack used to store context in recursive methods 487 488 *dynamic values (@property)* 489 - `breadth` 490 - `size` 491 - `height` 492 - `adjacency_list` 493 - `nodes` 494 - `dic_nodes` 495 - `leaf_nodes` 496 - `inner_nodes` 497 ''' 498 499 def __init__(self, ntv): 500 ''' the parameter of the constructor is the Ntv entity''' 501 self._ntv = ntv 502 self._node = None 503 self._stack = [] 504 505 def __iter__(self): 506 ''' iterator without initialization''' 507 return self 508 509 def __next__(self): 510 ''' return next node in the tree''' 511 if self._node is None: 512 self._node = self._ntv 513 elif len(self._node) == 0: 514 raise StopIteration 515 elif self._node.__class__.__name__ == 'NtvList': 516 self._next_down() 517 else: 518 self._next_up() 519 return self._node 520 521 def _next_down(self): 522 ''' find the next subchild node''' 523 524 self._node = self._node[0] 525 self._stack.append(0) 526 527 def _next_up(self): 528 ''' find the next sibling or ancestor node''' 529 parent = self._node.parent 530 if not parent or self._node == self._ntv: 531 raise StopIteration 532 ind = self._stack[-1] 533 if ind < len(parent) - 1: # if ind is not the last 534 self._node = parent[ind + 1] 535 self._stack[-1] += 1 536 else: 537 if parent == self._ntv: 538 raise StopIteration 539 self._node = parent 540 self._stack.pop() 541 self._next_up() 542 543 @property 544 def breadth(self): 545 ''' return the number of leaves''' 546 return len(self.leaf_nodes) 547 548 @property 549 def size(self): 550 ''' return the number of nodes''' 551 return len(self.nodes) 552 553 @property 554 def height(self): 555 ''' return the height of the tree''' 556 return max(len(node.pointer()) for node in self.__class__(self._ntv)) - 1 557 558 @property 559 def adjacency_list(self): 560 ''' return a dict with the list of child nodes for each parent node''' 561 return {node: node.val for node in self.inner_nodes} 562 563 @property 564 def nodes(self): 565 ''' return the list of nodes according to the DFS preordering algorithm''' 566 return list(self.__class__(self._ntv)) 567 568 @property 569 def dic_nodes(self): 570 ''' return a dict of nodes according to the DFS preordering algorithm''' 571 return {node.ntv_name: node for node in self.__class__(self._ntv) 572 if node.ntv_name} 573 574 @property 575 def leaf_nodes(self): 576 ''' return the list of leaf nodes according to the DFS preordering algorithm''' 577 return [node for node in self.__class__(self._ntv) 578 if node.__class__.__name__ == 'NtvSingle'] 579 580 @property 581 def inner_nodes(self): 582 ''' return the list of inner nodes according to the DFS preordering algorithm''' 583 return [node for node in self.__class__(self._ntv) 584 if node.__class__.__name__ == 'NtvList'] 585 586 587class NtvJsonEncoder(json.JSONEncoder): 588 """json encoder for Ntv data""" 589 590 def default(self, o): 591 try: 592 return NtvConnector.cast(o)[0] 593 except NtvError: 594 return json.JSONEncoder.default(self, o) 595 if isinstance(o, (datetime.datetime, datetime.date, datetime.time)): 596 return o.isoformat() 597 return json.JSONEncoder.default(self, o) 598 599 600class NtvError(Exception): 601 ''' NTV Exception'''
20class NtvUtil: 21 ''' The NtvUtil class includes static methods used by several NTV classes. 22 NtvUtil is the parent class of `Datatype`, `Namespace`, `Ntv`. 23 24 *class variables :* 25 - **_namespaces_** : list of Namespace defined 26 - **_types_** : list of Datatype defined 27 28 *static methods :* 29 - `is_dictable` 30 - `from_obj_name` 31 - `decode_ntv_tab` 32 - `to_ntv_pointer` 33 34 ''' 35 _namespaces_ = {} 36 _types_ = {} 37 38 @staticmethod 39 def is_dictable(lis): 40 '''check if a list can be translated in a dict''' 41 keys = set() 42 for val in lis: 43 if not isinstance(val, dict): 44 return False 45 if len(val) != 1: 46 return False 47 keys.add(list(val)[0]) 48 return len(keys) == len(lis) 49 50 @staticmethod 51 def from_obj_name(string): 52 '''return a tuple with name, type_str and separator from string''' 53 if not isinstance(string, str): 54 raise NtvError('a json-name have to be str') 55 if string == '': 56 return (None, None, None) 57 spl = string.rsplit(':', maxsplit=1) 58 if len(spl) == 1: 59 if string[-1] == '.': 60 return (None, string, None) 61 return (string, None, None) 62 if spl[0] == '': 63 return (None, spl[1], ':') 64 if spl[0][-1] == ':': 65 sp0 = spl[0][:-1] 66 return (None if sp0 == '' else sp0, None if spl[1] == '' else spl[1], '::') 67 return (None if spl[0] == '' else spl[0], None if spl[1] == '' else spl[1], ':') 68 69 @staticmethod 70 def decode_ntv_tab(ntv, ntv_to_val): 71 '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object) 72 73 *parameters:* 74 75 - **ntv**: Ntv data to decode, 76 - **ntv_to_val**: method to convert external value form ntv in internal Field value 77 78 *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)* 79 80 - name (None or string): name of the Field 81 - dtype (None or string): type of data 82 - codec (list): list of Field codec values 83 - parent (None or int): Field parent or None 84 - keys (None or list): Field keys 85 - coef (None or int): coef if primary Field else None 86 - leng (int): length of the Field 87 ''' 88 typ = ntv.type_str if ntv.ntv_type else None 89 nam = ntv.name 90 val = ntv_to_val(ntv) 91 if ntv.__class__.__name__ == 'NtvSingle': 92 return (nam, typ, [val], None, None, None, 1) 93 if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle': 94 return (nam, typ, val, None, None, None, len(ntv)) 95 96 ntvc = ntv[0] 97 leng = max(len(ind) for ind in ntv) 98 typc = ntvc.type_str if ntvc.ntv_type else None 99 valc = ntv_to_val(ntvc) 100 if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \ 101 isinstance(ntv[1].val, (int, str)) and \ 102 ntv[2].__class__.__name__ != 'NtvSingle' and \ 103 isinstance(ntv[2][0].val, int): 104 return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng) 105 if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)): 106 return (nam, typc, valc, ntv[1].val, None, None, leng) 107 if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list): 108 leng = leng * ntv[1][0].val 109 return (nam, typc, valc, None, None, ntv[1][0].val, leng) 110 if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int): 111 return (nam, typc, valc, None, ntv[1].to_obj(), None, leng) 112 return (nam, typ, val, None, None, None, len(ntv)) 113 114 @staticmethod 115 def to_ntvpointer(jsonpointer, unique_root=False): 116 '''convert a json pointer inter a NTV pointer (string) 117 118 *parameters:* 119 120 - **jsonpointer**: String - json pointer to convert, 121 - **unique_root**: Boolean (default False) - True if the json root length is 1 ''' 122 single = '/([0-9]+)(/[a-z])' 123 if unique_root and not '0' <= jsonpointer[1] <= '9': 124 return re.sub(single, r'\g<2>', jsonpointer)[1:] 125 return re.sub(single, r'\g<2>', jsonpointer)
The NtvUtil class includes static methods used by several NTV classes.
NtvUtil is the parent class of Datatype
, Namespace
, Ntv
.
class variables :
- _namespaces_ : list of Namespace defined
- _types_ : list of Datatype defined
static methods :
is_dictable
from_obj_name
decode_ntv_tab
to_ntv_pointer
38 @staticmethod 39 def is_dictable(lis): 40 '''check if a list can be translated in a dict''' 41 keys = set() 42 for val in lis: 43 if not isinstance(val, dict): 44 return False 45 if len(val) != 1: 46 return False 47 keys.add(list(val)[0]) 48 return len(keys) == len(lis)
check if a list can be translated in a dict
50 @staticmethod 51 def from_obj_name(string): 52 '''return a tuple with name, type_str and separator from string''' 53 if not isinstance(string, str): 54 raise NtvError('a json-name have to be str') 55 if string == '': 56 return (None, None, None) 57 spl = string.rsplit(':', maxsplit=1) 58 if len(spl) == 1: 59 if string[-1] == '.': 60 return (None, string, None) 61 return (string, None, None) 62 if spl[0] == '': 63 return (None, spl[1], ':') 64 if spl[0][-1] == ':': 65 sp0 = spl[0][:-1] 66 return (None if sp0 == '' else sp0, None if spl[1] == '' else spl[1], '::') 67 return (None if spl[0] == '' else spl[0], None if spl[1] == '' else spl[1], ':')
return a tuple with name, type_str and separator from string
69 @staticmethod 70 def decode_ntv_tab(ntv, ntv_to_val): 71 '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object) 72 73 *parameters:* 74 75 - **ntv**: Ntv data to decode, 76 - **ntv_to_val**: method to convert external value form ntv in internal Field value 77 78 *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)* 79 80 - name (None or string): name of the Field 81 - dtype (None or string): type of data 82 - codec (list): list of Field codec values 83 - parent (None or int): Field parent or None 84 - keys (None or list): Field keys 85 - coef (None or int): coef if primary Field else None 86 - leng (int): length of the Field 87 ''' 88 typ = ntv.type_str if ntv.ntv_type else None 89 nam = ntv.name 90 val = ntv_to_val(ntv) 91 if ntv.__class__.__name__ == 'NtvSingle': 92 return (nam, typ, [val], None, None, None, 1) 93 if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle': 94 return (nam, typ, val, None, None, None, len(ntv)) 95 96 ntvc = ntv[0] 97 leng = max(len(ind) for ind in ntv) 98 typc = ntvc.type_str if ntvc.ntv_type else None 99 valc = ntv_to_val(ntvc) 100 if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \ 101 isinstance(ntv[1].val, (int, str)) and \ 102 ntv[2].__class__.__name__ != 'NtvSingle' and \ 103 isinstance(ntv[2][0].val, int): 104 return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng) 105 if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)): 106 return (nam, typc, valc, ntv[1].val, None, None, leng) 107 if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list): 108 leng = leng * ntv[1][0].val 109 return (nam, typc, valc, None, None, ntv[1][0].val, leng) 110 if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int): 111 return (nam, typc, valc, None, ntv[1].to_obj(), None, leng) 112 return (nam, typ, val, None, None, None, len(ntv))
Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)
parameters:
- ntv: Ntv data to decode,
- ntv_to_val: method to convert external value form ntv in internal Field value
Returns tuple: (name, dtype, codec, parent, keys, coef, leng)
- name (None or string): name of the Field
- dtype (None or string): type of data
- codec (list): list of Field codec values
- parent (None or int): Field parent or None
- keys (None or list): Field keys
- coef (None or int): coef if primary Field else None
- leng (int): length of the Field
114 @staticmethod 115 def to_ntvpointer(jsonpointer, unique_root=False): 116 '''convert a json pointer inter a NTV pointer (string) 117 118 *parameters:* 119 120 - **jsonpointer**: String - json pointer to convert, 121 - **unique_root**: Boolean (default False) - True if the json root length is 1 ''' 122 single = '/([0-9]+)(/[a-z])' 123 if unique_root and not '0' <= jsonpointer[1] <= '9': 124 return re.sub(single, r'\g<2>', jsonpointer)[1:] 125 return re.sub(single, r'\g<2>', jsonpointer)
convert a json pointer inter a NTV pointer (string)
parameters:
- jsonpointer: String - json pointer to convert,
- unique_root: Boolean (default False) - True if the json root length is 1
128class NtvConnector(ABC): 129 ''' The NtvConnector class is an abstract class used by all NTV connectors 130 for conversion between NTV-JSON data and NTV-OBJ data. 131 132 A NtvConnector child is defined by: 133 - clas_obj: str - define the class name of the object to convert 134 - clas_typ: str - define the Datatype of the converted object 135 - to_obj_ntv: method - converter from JsonNTV to the object 136 - to_json_ntv: method - converter from the object to JsonNTV 137 138 *class method :* 139 - `connector` 140 - `dic_connec` 141 - `castable` (@property) 142 - `dic_obj` (@property) 143 - `dic_type` (@property) 144 145 *abstract method* 146 - `to_obj_ntv` 147 - `to_json_ntv` 148 149 *static method* 150 - `cast` 151 - `uncast` 152 - `is_json_class` 153 - `is_json` 154 - `init_ntv_keys` 155 ''' 156 157 DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'} 158 DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line', 159 'MultiLineString': 'multiline', 'Polygon': 'polygon', 160 'MultiPolygon': 'multipolygon'} 161 DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'} 162 DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat, 163 'datetime': datetime.datetime.fromisoformat} 164 DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring', 165 'multiline': 'multilinestring', 'polygon': 'polygon', 166 'multipolygon': 'multipolygon'} 167 DIC_CBOR = {'point': False, 'multipoint': False, 'line': False, 168 'multiline': False, 'polygon': False, 'multipolygon': False, 169 'date': True, 'time': False, 'datetime': True} 170 DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec', 171 'ndarray': 'NdarrayConnec', 172 'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec', 173 'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec', 174 'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec', 175 'other': None} 176 177 @classmethod 178 @property 179 def castable(cls): 180 '''return a list with class_name allowed for json-obj conversion''' 181 return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType', 182 'NtvSingle', 'NtvList'] \ 183 + list(NtvConnector.DIC_GEO_CL.keys()) \ 184 + list(NtvConnector.DIC_FCT.keys()) \ 185 + list(NtvConnector.dic_connec().keys()) 186 187 @classmethod 188 @property 189 def dic_obj(cls): 190 '''return a dict with the connectors: { type: class_connec_name }''' 191 return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\ 192 NtvConnector.DIC_OBJ 193 194 @classmethod 195 @property 196 def dic_type(cls): 197 '''return a dict with the connectors: { class_obj_name: type }''' 198 return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\ 199 NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL 200 201 @classmethod 202 def connector(cls): 203 '''return a dict with the connectors: { class_connec_name: class_connec }''' 204 return {clas.__name__: clas for clas in cls.__subclasses__()} 205 206 @classmethod 207 def dic_connec(cls): 208 '''return a dict with the clas associated to the connector: 209 { class_obj_name: class_connec_name }''' 210 return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()} 211 212 @staticmethod 213 @abstractmethod 214 def to_obj_ntv(ntv_value, **kwargs): 215 ''' abstract - convert ntv_value into the return object''' 216 217 @staticmethod 218 @abstractmethod 219 def to_json_ntv(value, name=None, typ=None): 220 ''' abstract - convert NTV object (value, name, type) into the NTV json 221 (json-value, name, type)''' 222 223 @staticmethod 224 def cast(data, name=None, type_str=None): 225 '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity 226 defined in parameters. 227 228 *Parameters* 229 230 - **data**: NtvSingle entity or NTVvalue of the NTV entity 231 - **name** : String (default None) - name of the NTV entity 232 - **type_str**: String (default None) - type of the NTV entity 233 ''' 234 clas = data.__class__.__name__ 235 if clas == 'NtvSingle': 236 name = data.ntv_name 237 type_str = data.type_str 238 data = data.ntv_value 239 dic_geo_cl = NtvConnector.DIC_GEO_CL 240 dic_connec = NtvConnector.dic_connec() 241 match clas: 242 case 'int' | 'float' | 'bool' | 'str': 243 return (data, name, type_str) 244 case 'dict': 245 return ({key: NtvConnector.cast(val, name, type_str)[0] 246 for key, val in data.items()}, name, type_str) 247 case 'list': 248 return ([NtvConnector.cast(val, name, type_str)[0] for val in data], 249 name, NtvConnector._typ_obj(data) if not type_str else type_str) 250 case 'tuple': 251 return (list(data), name, 'array' if not type_str else type_str) 252 case 'date' | 'time' | 'datetime': 253 return (data.isoformat(), name, clas if not type_str else type_str) 254 case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \ 255 'Polygon' | 'MultiPolygon': 256 return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0], 257 name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str) 258 case 'NtvSingle' | 'NtvList': 259 return (data.to_obj(), name, 'ntv' if not type_str else type_str) 260 case _: 261 connec = None 262 if clas in dic_connec and dic_connec[clas] in NtvConnector.connector(): 263 connec = NtvConnector.connector()[dic_connec[clas]] 264 if connec: 265 return connec.to_json_ntv(data, name, type_str) 266 raise NtvError( 267 'connector is not defined for the class : ', clas) 268 return (data, name, type_str) 269 270 @staticmethod 271 def uncast(value, name=None, type_str=None, **kwargs): 272 '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity 273 274 *Parameters* 275 276 - **value**: NtvSingle entity or NTVvalue of the NTV entity 277 - **name** : String (default None) - name of the NTV entity 278 - **type_str**: String (default None) - type of the NTV entity 279 ''' 280 if type_str == 'json': 281 return (value, name, type_str) 282 typebase_str = type_str 283 if value.__class__.__name__ == 'NtvSingle': 284 if not (type_str in set(NtvConnector.dic_type.values()) and 285 NtvConnector.is_json(value) or type_str is None): 286 return (value.ntv_value, value.name, value.type_str) 287 type_str = value.type_str if value.ntv_type else None 288 typebase_str = value.typebase_str if value.ntv_type else None 289 name = value.ntv_name 290 value = value.ntv_value 291 option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs 292 value_obj = NtvConnector._uncast_val(value, typebase_str, **option) 293 return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj)) 294 295 @staticmethod 296 def _typ_obj(value): 297 if isinstance(value, dict): 298 return NtvConnector._typ_obj(list(value.values())) 299 if isinstance(value, (tuple, list)): 300 for val in value: 301 typ = NtvConnector._typ_obj(val) 302 if typ: 303 return typ 304 return None 305 return NtvConnector.dic_type.get(value.__class__.__name__) 306 307 @staticmethod 308 def _uncast_val(value, type_n, **option): 309 '''return value from ntv value 310 311 *Parameters* 312 313 - **value**: NtvSingle entity or NTVvalue of the NTV entity 314 ''' 315 dic_fct = NtvConnector.DIC_FCT 316 dic_geo = NtvConnector.DIC_GEO 317 dic_obj = NtvConnector.dic_obj | option['dicobj'] 318 dic_cbor = NtvConnector.DIC_CBOR 319 if not type_n or type_n == 'json' or (option['format'] == 'cbor' and 320 not dic_cbor.get(type_n, False)): 321 return value 322 if type_n in dic_fct: 323 if isinstance(value, (tuple, list)): 324 return [NtvConnector._uncast_val(val, type_n, **option) for val in value] 325 if isinstance(value, dict): 326 return {key: NtvConnector._uncast_val(val, type_n, **option) 327 for key, val in value.items()} 328 return dic_fct[type_n](value) 329 if type_n == 'array': 330 return tuple(value) 331 if type_n == 'ntv': 332 from json_ntv.ntv import Ntv 333 return Ntv.from_obj(value) 334 if type_n in dic_geo: 335 option['type_geo'] = dic_geo[type_n] 336 connec = None 337 if type_n in dic_obj and \ 338 dic_obj[type_n] in NtvConnector.connector(): 339 connec = NtvConnector.connector()[dic_obj[type_n]] 340 elif dic_obj['other'] in NtvConnector.connector(): 341 connec = NtvConnector.connector()['other'] 342 if connec: 343 return connec.to_obj_ntv(value, **option) 344 return value 345 346 @staticmethod 347 def is_json_class(val): 348 ''' return True if val is a json type''' 349 return val is None or isinstance(val, (list, int, str, float, bool, dict)) 350 351 @staticmethod 352 def is_json(obj): 353 ''' check if obj is a json structure and return True if obj is a json-value 354 355 *Parameters* 356 357 - **obj** : object to check 358 - **ntvobj** : boolean (default False) - if True NTV class value are accepted''' 359 if obj is None: 360 return True 361 is_js = NtvConnector.is_json 362 match obj: 363 case str() | int() | float() | bool() as obj: 364 return True 365 case list() | tuple() as obj: 366 if not obj: 367 return True 368 return min(is_js(obj_in) for obj_in in obj) 369 case dict() as obj: 370 if not obj: 371 return True 372 if not min(isinstance(key, str) for key in obj.keys()): 373 raise NtvError('key in dict in not string') 374 return min(is_js(obj_in) for obj_in in obj.values()) 375 case _: 376 if obj.__class__.__name__ not in NtvConnector.castable: 377 raise NtvError(obj.__class__.__name__ + 378 ' is not valid for NTV') 379 return False 380 381 @staticmethod 382 def keysfromderkeys(parentkeys, derkeys): 383 '''return keys from parent keys and derkeys 384 385 *Parameters* 386 387 - **parentkeys** : list of keys from parent 388 - **derkeys** : list of derived keys 389 390 *Returns* : list of keys''' 391 return [derkeys[pkey] for pkey in parentkeys] 392 393 @staticmethod 394 def encode_coef(lis): 395 '''Generate a repetition coefficient for periodic list''' 396 if len(lis) < 2: 397 return 0 398 coef = 1 399 while coef != len(lis): 400 if lis[coef-1] != lis[coef]: 401 break 402 coef += 1 403 if (not len(lis) % (coef * (max(lis) + 1)) and 404 lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))): 405 return coef 406 return 0 407 408 @staticmethod 409 def keysfromcoef(coef, period, leng=None): 410 ''' return a list of keys with periodic structure''' 411 if not leng: 412 leng = coef * period 413 return None if not (coef and period) else [(ind % (coef * period)) // coef 414 for ind in range(leng)] 415 416 @staticmethod 417 def format_field(lidx_decode, leng): 418 ''' return a list of format''' 419 # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6 420 field_format = [] 421 for field_decode in lidx_decode: 422 name, typ, codec, parent, keys, coef, length = field_decode 423 if (keys, parent, coef) == (None, None, None): # full or unique 424 if len(codec) == 1: # unique 425 field_format.append('unique') 426 elif len(codec) == leng: # full 427 field_format.append('full') 428 else: 429 raise NtvError('impossible to generate keys') 430 continue 431 if keys and len(keys) > 1 and parent is None: # complete 432 field_format.append('complete') 433 continue 434 if coef: # primary 435 field_format.append('primary') 436 continue 437 if parent is None: 438 raise NtvError('keys not referenced') 439 if not keys: # implicit 440 field_format.append('implicit') 441 continue 442 field_format.append('relative') 443 return field_format 444 445 @staticmethod 446 def init_ntv_keys(ind, lidx, leng): 447 ''' initialization of explicit keys data in lidx object of tabular data''' 448 # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6 449 name, typ, codec, parent, keys, coef, length = lidx[ind] 450 if (keys, parent, coef) == (None, None, None): # full or unique 451 if len(codec) == 1: # unique 452 lidx[ind][4] = [0] * leng 453 elif len(codec) == leng: # full 454 lidx[ind][4] = list(range(leng)) 455 else: 456 raise NtvError('impossible to generate keys') 457 return 458 if keys and len(keys) > 1 and parent is None: # complete 459 return 460 if coef: # primary 461 lidx[ind][4] = [(ikey % (coef * len(codec))) // 462 coef for ikey in range(leng)] 463 lidx[ind][3] = None 464 return 465 if parent is None: 466 raise NtvError('keys not referenced') 467 if not lidx[parent][4] or len(lidx[parent][4]) != leng: 468 NtvConnector.init_ntv_keys(parent, lidx, leng) 469 if not keys and len(codec) == len(lidx[parent][2]): # implicit 470 lidx[ind][4] = lidx[parent][4] 471 lidx[ind][3] = None 472 return 473 lidx[ind][4] = NtvConnector.keysfromderkeys( 474 lidx[parent][4], keys) # relative 475 lidx[ind][3] = None 476 return
The NtvConnector class is an abstract class used by all NTV connectors for conversion between NTV-JSON data and NTV-OBJ data.
A NtvConnector child is defined by:
- clas_obj: str - define the class name of the object to convert
- clas_typ: str - define the Datatype of the converted object
- to_obj_ntv: method - converter from JsonNTV to the object
- to_json_ntv: method - converter from the object to JsonNTV
class method :
connector
dic_connec
castable
(@property)dic_obj
(@property)dic_type
(@property)
abstract method
static method
177 @classmethod 178 @property 179 def castable(cls): 180 '''return a list with class_name allowed for json-obj conversion''' 181 return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType', 182 'NtvSingle', 'NtvList'] \ 183 + list(NtvConnector.DIC_GEO_CL.keys()) \ 184 + list(NtvConnector.DIC_FCT.keys()) \ 185 + list(NtvConnector.dic_connec().keys())
return a list with class_name allowed for json-obj conversion
187 @classmethod 188 @property 189 def dic_obj(cls): 190 '''return a dict with the connectors: { type: class_connec_name }''' 191 return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\ 192 NtvConnector.DIC_OBJ
return a dict with the connectors: { type: class_connec_name }
194 @classmethod 195 @property 196 def dic_type(cls): 197 '''return a dict with the connectors: { class_obj_name: type }''' 198 return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\ 199 NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL
return a dict with the connectors: { class_obj_name: type }
201 @classmethod 202 def connector(cls): 203 '''return a dict with the connectors: { class_connec_name: class_connec }''' 204 return {clas.__name__: clas for clas in cls.__subclasses__()}
return a dict with the connectors: { class_connec_name: class_connec }
206 @classmethod 207 def dic_connec(cls): 208 '''return a dict with the clas associated to the connector: 209 { class_obj_name: class_connec_name }''' 210 return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
return a dict with the clas associated to the connector: { class_obj_name: class_connec_name }
212 @staticmethod 213 @abstractmethod 214 def to_obj_ntv(ntv_value, **kwargs): 215 ''' abstract - convert ntv_value into the return object'''
abstract - convert ntv_value into the return object
217 @staticmethod 218 @abstractmethod 219 def to_json_ntv(value, name=None, typ=None): 220 ''' abstract - convert NTV object (value, name, type) into the NTV json 221 (json-value, name, type)'''
abstract - convert NTV object (value, name, type) into the NTV json (json-value, name, type)
223 @staticmethod 224 def cast(data, name=None, type_str=None): 225 '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity 226 defined in parameters. 227 228 *Parameters* 229 230 - **data**: NtvSingle entity or NTVvalue of the NTV entity 231 - **name** : String (default None) - name of the NTV entity 232 - **type_str**: String (default None) - type of the NTV entity 233 ''' 234 clas = data.__class__.__name__ 235 if clas == 'NtvSingle': 236 name = data.ntv_name 237 type_str = data.type_str 238 data = data.ntv_value 239 dic_geo_cl = NtvConnector.DIC_GEO_CL 240 dic_connec = NtvConnector.dic_connec() 241 match clas: 242 case 'int' | 'float' | 'bool' | 'str': 243 return (data, name, type_str) 244 case 'dict': 245 return ({key: NtvConnector.cast(val, name, type_str)[0] 246 for key, val in data.items()}, name, type_str) 247 case 'list': 248 return ([NtvConnector.cast(val, name, type_str)[0] for val in data], 249 name, NtvConnector._typ_obj(data) if not type_str else type_str) 250 case 'tuple': 251 return (list(data), name, 'array' if not type_str else type_str) 252 case 'date' | 'time' | 'datetime': 253 return (data.isoformat(), name, clas if not type_str else type_str) 254 case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \ 255 'Polygon' | 'MultiPolygon': 256 return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0], 257 name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str) 258 case 'NtvSingle' | 'NtvList': 259 return (data.to_obj(), name, 'ntv' if not type_str else type_str) 260 case _: 261 connec = None 262 if clas in dic_connec and dic_connec[clas] in NtvConnector.connector(): 263 connec = NtvConnector.connector()[dic_connec[clas]] 264 if connec: 265 return connec.to_json_ntv(data, name, type_str) 266 raise NtvError( 267 'connector is not defined for the class : ', clas) 268 return (data, name, type_str)
return JSON-NTV conversion (json_value, name, type_str) of the NTV entity defined in parameters.
Parameters
- data: NtvSingle entity or NTVvalue of the NTV entity
- name : String (default None) - name of the NTV entity
- type_str: String (default None) - type of the NTV entity
270 @staticmethod 271 def uncast(value, name=None, type_str=None, **kwargs): 272 '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity 273 274 *Parameters* 275 276 - **value**: NtvSingle entity or NTVvalue of the NTV entity 277 - **name** : String (default None) - name of the NTV entity 278 - **type_str**: String (default None) - type of the NTV entity 279 ''' 280 if type_str == 'json': 281 return (value, name, type_str) 282 typebase_str = type_str 283 if value.__class__.__name__ == 'NtvSingle': 284 if not (type_str in set(NtvConnector.dic_type.values()) and 285 NtvConnector.is_json(value) or type_str is None): 286 return (value.ntv_value, value.name, value.type_str) 287 type_str = value.type_str if value.ntv_type else None 288 typebase_str = value.typebase_str if value.ntv_type else None 289 name = value.ntv_name 290 value = value.ntv_value 291 option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs 292 value_obj = NtvConnector._uncast_val(value, typebase_str, **option) 293 return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
Parameters
- value: NtvSingle entity or NTVvalue of the NTV entity
- name : String (default None) - name of the NTV entity
- type_str: String (default None) - type of the NTV entity
346 @staticmethod 347 def is_json_class(val): 348 ''' return True if val is a json type''' 349 return val is None or isinstance(val, (list, int, str, float, bool, dict))
return True if val is a json type
351 @staticmethod 352 def is_json(obj): 353 ''' check if obj is a json structure and return True if obj is a json-value 354 355 *Parameters* 356 357 - **obj** : object to check 358 - **ntvobj** : boolean (default False) - if True NTV class value are accepted''' 359 if obj is None: 360 return True 361 is_js = NtvConnector.is_json 362 match obj: 363 case str() | int() | float() | bool() as obj: 364 return True 365 case list() | tuple() as obj: 366 if not obj: 367 return True 368 return min(is_js(obj_in) for obj_in in obj) 369 case dict() as obj: 370 if not obj: 371 return True 372 if not min(isinstance(key, str) for key in obj.keys()): 373 raise NtvError('key in dict in not string') 374 return min(is_js(obj_in) for obj_in in obj.values()) 375 case _: 376 if obj.__class__.__name__ not in NtvConnector.castable: 377 raise NtvError(obj.__class__.__name__ + 378 ' is not valid for NTV') 379 return False
check if obj is a json structure and return True if obj is a json-value
Parameters
- obj : object to check
- ntvobj : boolean (default False) - if True NTV class value are accepted
381 @staticmethod 382 def keysfromderkeys(parentkeys, derkeys): 383 '''return keys from parent keys and derkeys 384 385 *Parameters* 386 387 - **parentkeys** : list of keys from parent 388 - **derkeys** : list of derived keys 389 390 *Returns* : list of keys''' 391 return [derkeys[pkey] for pkey in parentkeys]
return keys from parent keys and derkeys
Parameters
- parentkeys : list of keys from parent
- derkeys : list of derived keys
Returns : list of keys
393 @staticmethod 394 def encode_coef(lis): 395 '''Generate a repetition coefficient for periodic list''' 396 if len(lis) < 2: 397 return 0 398 coef = 1 399 while coef != len(lis): 400 if lis[coef-1] != lis[coef]: 401 break 402 coef += 1 403 if (not len(lis) % (coef * (max(lis) + 1)) and 404 lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))): 405 return coef 406 return 0
Generate a repetition coefficient for periodic list
408 @staticmethod 409 def keysfromcoef(coef, period, leng=None): 410 ''' return a list of keys with periodic structure''' 411 if not leng: 412 leng = coef * period 413 return None if not (coef and period) else [(ind % (coef * period)) // coef 414 for ind in range(leng)]
return a list of keys with periodic structure
416 @staticmethod 417 def format_field(lidx_decode, leng): 418 ''' return a list of format''' 419 # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6 420 field_format = [] 421 for field_decode in lidx_decode: 422 name, typ, codec, parent, keys, coef, length = field_decode 423 if (keys, parent, coef) == (None, None, None): # full or unique 424 if len(codec) == 1: # unique 425 field_format.append('unique') 426 elif len(codec) == leng: # full 427 field_format.append('full') 428 else: 429 raise NtvError('impossible to generate keys') 430 continue 431 if keys and len(keys) > 1 and parent is None: # complete 432 field_format.append('complete') 433 continue 434 if coef: # primary 435 field_format.append('primary') 436 continue 437 if parent is None: 438 raise NtvError('keys not referenced') 439 if not keys: # implicit 440 field_format.append('implicit') 441 continue 442 field_format.append('relative') 443 return field_format
return a list of format
445 @staticmethod 446 def init_ntv_keys(ind, lidx, leng): 447 ''' initialization of explicit keys data in lidx object of tabular data''' 448 # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6 449 name, typ, codec, parent, keys, coef, length = lidx[ind] 450 if (keys, parent, coef) == (None, None, None): # full or unique 451 if len(codec) == 1: # unique 452 lidx[ind][4] = [0] * leng 453 elif len(codec) == leng: # full 454 lidx[ind][4] = list(range(leng)) 455 else: 456 raise NtvError('impossible to generate keys') 457 return 458 if keys and len(keys) > 1 and parent is None: # complete 459 return 460 if coef: # primary 461 lidx[ind][4] = [(ikey % (coef * len(codec))) // 462 coef for ikey in range(leng)] 463 lidx[ind][3] = None 464 return 465 if parent is None: 466 raise NtvError('keys not referenced') 467 if not lidx[parent][4] or len(lidx[parent][4]) != leng: 468 NtvConnector.init_ntv_keys(parent, lidx, leng) 469 if not keys and len(codec) == len(lidx[parent][2]): # implicit 470 lidx[ind][4] = lidx[parent][4] 471 lidx[ind][3] = None 472 return 473 lidx[ind][4] = NtvConnector.keysfromderkeys( 474 lidx[parent][4], keys) # relative 475 lidx[ind][3] = None 476 return
initialization of explicit keys data in lidx object of tabular data
479class NtvTree: 480 ''' The NtvTree class is an iterator class used to traverse a NTV tree structure. 481 Some other methods give tree indicators and data. 482 483 *Attributes :* 484 485 - **ntv** : Ntv entity 486 - **_node**: Ntv entity - node pointer 487 - **_stack**: list - stack used to store context in recursive methods 488 489 *dynamic values (@property)* 490 - `breadth` 491 - `size` 492 - `height` 493 - `adjacency_list` 494 - `nodes` 495 - `dic_nodes` 496 - `leaf_nodes` 497 - `inner_nodes` 498 ''' 499 500 def __init__(self, ntv): 501 ''' the parameter of the constructor is the Ntv entity''' 502 self._ntv = ntv 503 self._node = None 504 self._stack = [] 505 506 def __iter__(self): 507 ''' iterator without initialization''' 508 return self 509 510 def __next__(self): 511 ''' return next node in the tree''' 512 if self._node is None: 513 self._node = self._ntv 514 elif len(self._node) == 0: 515 raise StopIteration 516 elif self._node.__class__.__name__ == 'NtvList': 517 self._next_down() 518 else: 519 self._next_up() 520 return self._node 521 522 def _next_down(self): 523 ''' find the next subchild node''' 524 525 self._node = self._node[0] 526 self._stack.append(0) 527 528 def _next_up(self): 529 ''' find the next sibling or ancestor node''' 530 parent = self._node.parent 531 if not parent or self._node == self._ntv: 532 raise StopIteration 533 ind = self._stack[-1] 534 if ind < len(parent) - 1: # if ind is not the last 535 self._node = parent[ind + 1] 536 self._stack[-1] += 1 537 else: 538 if parent == self._ntv: 539 raise StopIteration 540 self._node = parent 541 self._stack.pop() 542 self._next_up() 543 544 @property 545 def breadth(self): 546 ''' return the number of leaves''' 547 return len(self.leaf_nodes) 548 549 @property 550 def size(self): 551 ''' return the number of nodes''' 552 return len(self.nodes) 553 554 @property 555 def height(self): 556 ''' return the height of the tree''' 557 return max(len(node.pointer()) for node in self.__class__(self._ntv)) - 1 558 559 @property 560 def adjacency_list(self): 561 ''' return a dict with the list of child nodes for each parent node''' 562 return {node: node.val for node in self.inner_nodes} 563 564 @property 565 def nodes(self): 566 ''' return the list of nodes according to the DFS preordering algorithm''' 567 return list(self.__class__(self._ntv)) 568 569 @property 570 def dic_nodes(self): 571 ''' return a dict of nodes according to the DFS preordering algorithm''' 572 return {node.ntv_name: node for node in self.__class__(self._ntv) 573 if node.ntv_name} 574 575 @property 576 def leaf_nodes(self): 577 ''' return the list of leaf nodes according to the DFS preordering algorithm''' 578 return [node for node in self.__class__(self._ntv) 579 if node.__class__.__name__ == 'NtvSingle'] 580 581 @property 582 def inner_nodes(self): 583 ''' return the list of inner nodes according to the DFS preordering algorithm''' 584 return [node for node in self.__class__(self._ntv) 585 if node.__class__.__name__ == 'NtvList']
The NtvTree class is an iterator class used to traverse a NTV tree structure. Some other methods give tree indicators and data.
Attributes :
- ntv : Ntv entity
- _node: Ntv entity - node pointer
- _stack: list - stack used to store context in recursive methods
dynamic values (@property)
500 def __init__(self, ntv): 501 ''' the parameter of the constructor is the Ntv entity''' 502 self._ntv = ntv 503 self._node = None 504 self._stack = []
the parameter of the constructor is the Ntv entity
544 @property 545 def breadth(self): 546 ''' return the number of leaves''' 547 return len(self.leaf_nodes)
return the number of leaves
554 @property 555 def height(self): 556 ''' return the height of the tree''' 557 return max(len(node.pointer()) for node in self.__class__(self._ntv)) - 1
return the height of the tree
559 @property 560 def adjacency_list(self): 561 ''' return a dict with the list of child nodes for each parent node''' 562 return {node: node.val for node in self.inner_nodes}
return a dict with the list of child nodes for each parent node
564 @property 565 def nodes(self): 566 ''' return the list of nodes according to the DFS preordering algorithm''' 567 return list(self.__class__(self._ntv))
return the list of nodes according to the DFS preordering algorithm
569 @property 570 def dic_nodes(self): 571 ''' return a dict of nodes according to the DFS preordering algorithm''' 572 return {node.ntv_name: node for node in self.__class__(self._ntv) 573 if node.ntv_name}
return a dict of nodes according to the DFS preordering algorithm
575 @property 576 def leaf_nodes(self): 577 ''' return the list of leaf nodes according to the DFS preordering algorithm''' 578 return [node for node in self.__class__(self._ntv) 579 if node.__class__.__name__ == 'NtvSingle']
return the list of leaf nodes according to the DFS preordering algorithm
581 @property 582 def inner_nodes(self): 583 ''' return the list of inner nodes according to the DFS preordering algorithm''' 584 return [node for node in self.__class__(self._ntv) 585 if node.__class__.__name__ == 'NtvList']
return the list of inner nodes according to the DFS preordering algorithm
588class NtvJsonEncoder(json.JSONEncoder): 589 """json encoder for Ntv data""" 590 591 def default(self, o): 592 try: 593 return NtvConnector.cast(o)[0] 594 except NtvError: 595 return json.JSONEncoder.default(self, o) 596 if isinstance(o, (datetime.datetime, datetime.date, datetime.time)): 597 return o.isoformat() 598 return json.JSONEncoder.default(self, o)
json encoder for Ntv data
591 def default(self, o): 592 try: 593 return NtvConnector.cast(o)[0] 594 except NtvError: 595 return json.JSONEncoder.default(self, o) 596 if isinstance(o, (datetime.datetime, datetime.date, datetime.time)): 597 return o.isoformat() 598 return json.JSONEncoder.default(self, o)
Implement this method in a subclass such that it returns
a serializable object for o
, or calls the base implementation
(to raise a TypeError
).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# Let the base class default method raise the TypeError
return JSONEncoder.default(self, o)
Inherited Members
- json.encoder.JSONEncoder
- JSONEncoder
- item_separator
- key_separator
- skipkeys
- ensure_ascii
- check_circular
- allow_nan
- sort_keys
- indent
- encode
- iterencode
NTV Exception
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args