NTV.json_ntv.ntv_util
Created on Feb 27 2023
@author: Philippe@loco-labs.io
The ntv_util module is part of the NTV.json_ntv package (specification document).
It contains the classes NtvUtil, NtvConnector, NtvTree, NtvJsonEncoder and NtvError for NTV entities.
# -*- coding: utf-8 -*-
"""
Created on Feb 27 2023

@author: Philippe@loco-labs.io

The `ntv_util` module is part of the `NTV.json_ntv` package ([specification document](
https://github.com/loco-philippe/NTV/blob/main/documentation/JSON-NTV-standard.pdf)).

It contains the classes `NtvUtil`, `NtvConnector`, `NtvTree`, `NtvJsonEncoder`
and `NtvError` for NTV entities.
"""
from abc import ABC, abstractmethod
import datetime
import json
import re


class NtvUtil:
    ''' The NtvUtil class includes static methods used by several NTV classes.
    NtvUtil is the parent class of `Datatype`, `Namespace`, `Ntv`.

    *class variables :*
    - **_namespaces_** : list of Namespace defined
    - **_types_** : list of Datatype defined

    *static methods :*
    - `from_obj_name`
    - `decode_ntv_tab`
    - `to_ntvpointer`

    '''
    _namespaces_ = {}
    _types_ = {}

    @staticmethod
    def from_obj_name(string):
        '''return a tuple with name, type_str and separator from string'''
        if not isinstance(string, str):
            raise NtvError('a json-name have to be str')
        if string == '':
            return (None, None, None)
        spl = string.rsplit(':', maxsplit=1)
        if len(spl) == 1:
            if string[-1] == '.':
                return (None, string, None)
            return (string, None, None)
        if spl[0] == '':
            return (None, spl[1], ':')
        if spl[0][-1] == ':':
            sp0 = spl[0][:-1]
            return (None if sp0 == '' else sp0, None if spl[1] == '' else spl[1], '::')
        return (None if spl[0] == '' else spl[0], None if spl[1] == '' else spl[1], ':')

    @staticmethod
    def decode_ntv_tab(ntv, ntv_to_val):
        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)

        *parameters:*

        - **ntv**: Ntv data to decode,
        - **ntv_to_val**: method to convert external value from ntv into internal Field value

        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*

        - name (None or string): name of the Field
        - dtype (None or string): type of data
        - codec (list): list of Field codec values
        - parent (None or int): Field parent or None
        - keys (None or list): Field keys
        - coef (None or int): coef if primary Field else None
        - leng (int): length of the Field
        '''
        typ = ntv.type_str if ntv.ntv_type else None
        nam = ntv.name
        val = ntv_to_val(ntv)
        if ntv.__class__.__name__ == 'NtvSingle':
            return (nam, typ, [val], None, None, None, 1)
        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':
            return (nam, typ, val, None, None, None, len(ntv))

        ntvc = ntv[0]
        leng = max(len(ind) for ind in ntv)
        typc = ntvc.type_str if ntvc.ntv_type else None
        valc = ntv_to_val(ntvc)
        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
                isinstance(ntv[1].val, (int, str)) and \
                ntv[2].__class__.__name__ != 'NtvSingle' and \
                isinstance(ntv[2][0].val, int):
            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):
            return (nam, typc, valc, ntv[1].val, None, None, leng)
        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):
            leng = leng * ntv[1][0].val
            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):
            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
        return (nam, typ, val, None, None, None, len(ntv))

    @staticmethod
    def to_ntvpointer(jsonpointer, unique_root=False):
        '''convert a json pointer into a NTV pointer (string)

        *parameters:*

        - **jsonpointer**: String - json pointer to convert,
        - **unique_root**: Boolean (default False) - True if the json root length is 1 '''
        single = '/([0-9]+)(/[a-z])'
        if unique_root and not ('0' <= jsonpointer[1] <= '9'):
            return re.sub(single, r'\g<2>', jsonpointer)[1:]
        return re.sub(single, r'\g<2>', jsonpointer)


class NtvConnector(ABC):
    ''' The NtvConnector class is an abstract class used by all NTV connectors
    for conversion between NTV-JSON data and NTV-OBJ data.

    A NtvConnector child is defined by:
    - clas_obj: str - define the class name of the object to convert
    - clas_typ: str - define the Datatype of the converted object
    - to_obj_ntv: method - converter from JsonNTV to the object
    - to_json_ntv: method - converter from the object to JsonNTV

    *class method :*
    - `connector`
    - `dic_connec`
    - `castable` (@property)
    - `dic_obj` (@property)
    - `dic_type` (@property)

    *abstract method*
    - `to_obj_ntv`
    - `to_json_ntv`

    *static method*
    - `cast`
    - `uncast`
    - `is_json_class`
    - `is_json`
    - `init_ntv_keys`
    '''

    DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'}
    DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line',
                  'MultiLineString': 'multiline', 'Polygon': 'polygon',
                  'MultiPolygon': 'multipolygon'}
    DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'}
    DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat,
               'datetime': datetime.datetime.fromisoformat}
    DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring',
               'multiline': 'multilinestring', 'polygon': 'polygon',
               'multipolygon': 'multipolygon'}
    DIC_CBOR = {'point': False, 'multipoint': False, 'line': False,
                'multiline': False, 'polygon': False, 'multipolygon': False,
                'date': True, 'time': False, 'datetime': True}
    DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec',
               'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec',
               'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec',
               'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec',
               'other': None}

    @classmethod
    @property
    def castable(cls):
        '''return a list with class_name allowed for json-obj conversion'''
        return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType',
                'NtvSingle', 'NtvList'] \
            + list(NtvConnector.DIC_GEO_CL.keys()) \
            + list(NtvConnector.DIC_FCT.keys()) \
            + list(NtvConnector.dic_connec().keys())

    @classmethod
    @property
    def dic_obj(cls):
        '''return a dict with the connectors: { type: class_connec_name }'''
        return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\
            NtvConnector.DIC_OBJ

    @classmethod
    @property
    def dic_type(cls):
        '''return a dict with the connectors: { class_obj_name: type }'''
        return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\
            NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL

    @classmethod
    def connector(cls):
        '''return a dict with the connectors: { class_connec_name: class_connec }'''
        return {clas.__name__: clas for clas in cls.__subclasses__()}

    @classmethod
    def dic_connec(cls):
        '''return a dict with the clas associated to the connector:
        { class_obj_name: class_connec_name }'''
        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}

    @staticmethod
    @abstractmethod
    def to_obj_ntv(ntv_value, **kwargs):
        ''' abstract - convert ntv_value into the return object'''

    @staticmethod
    @abstractmethod
    def to_json_ntv(value, name=None, typ=None):
        ''' abstract - convert NTV object (value, name, type) into the NTV json
        (json-value, name, type)'''

    @staticmethod
    def cast(data, name=None, type_str=None):
        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
        defined in parameters.

        *Parameters*

        - **data**: NtvSingle entity or NTVvalue of the NTV entity
        - **name** : String (default None) - name of the NTV entity
        - **type_str**: String (default None) - type of the NTV entity
        '''
        clas = data.__class__.__name__
        if clas == 'NtvSingle':
            name = data.ntv_name
            type_str = data.type_str
            data = data.ntv_value
        dic_geo_cl = NtvConnector.DIC_GEO_CL
        dic_connec = NtvConnector.dic_connec()
        match clas:
            case 'int' | 'float' | 'bool' | 'str':
                return (data, name, type_str)
            case 'dict':
                return ({key: NtvConnector.cast(val, name, type_str)[0]
                         for key, val in data.items()}, name, type_str)
            case 'list':
                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
            case 'tuple':
                return (list(data), name, 'array' if not type_str else type_str)
            case 'date' | 'time' | 'datetime':
                return (data.isoformat(), name, clas if not type_str else type_str)
            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
                    'Polygon' | 'MultiPolygon':
                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
            case 'NtvSingle' | 'NtvList':
                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
            case _:
                connec = None
                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
                    connec = NtvConnector.connector()[dic_connec[clas]]
                if connec:
                    return connec.to_json_ntv(data, name, type_str)
                raise NtvError(
                    'connector is not defined for the class : ', clas)
        return (data, name, type_str)

    @staticmethod
    def uncast(value, name=None, type_str=None, **kwargs):
        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity

        *Parameters*

        - **value**: NtvSingle entity or NTVvalue of the NTV entity
        - **name** : String (default None) - name of the NTV entity
        - **type_str**: String (default None) - type of the NTV entity
        '''
        if type_str == 'json':
            return (value, name, type_str)
        if value.__class__.__name__ == 'NtvSingle':
            if not (type_str in set(NtvConnector.dic_type.values()) and
                    NtvConnector.is_json(value) or type_str is None):
                return (value.ntv_value, value.name, value.type_str)
            type_str = value.type_str if value.ntv_type else None
            name = value.ntv_name
            value = value.ntv_value
        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
        value_obj = NtvConnector._uncast_val(value, type_str, **option)
        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))

    @staticmethod
    def _typ_obj(value):
        if isinstance(value, dict):
            return NtvConnector._typ_obj(list(value.values()))
        if isinstance(value, (tuple, list)):
            for val in value:
                typ = NtvConnector._typ_obj(val)
                if typ:
                    return typ
            return None
        return NtvConnector.dic_type.get(value.__class__.__name__)

    @staticmethod
    def _uncast_val(value, type_n, **option):
        '''return value from ntv value'''
        dic_fct = NtvConnector.DIC_FCT
        dic_geo = NtvConnector.DIC_GEO
        dic_obj = NtvConnector.dic_obj | option['dicobj']
        dic_cbor = NtvConnector.DIC_CBOR
        if not type_n or type_n == 'json' or (option['format'] == 'cbor' and
                                              not dic_cbor.get(type_n, False)):
            return value
        if type_n in dic_fct:
            if isinstance(value, (tuple, list)):
                return [NtvConnector._uncast_val(val, type_n, **option) for val in value]
            if isinstance(value, dict):
                return {key: NtvConnector._uncast_val(val, type_n, **option)
                        for key, val in value.items()}
            return dic_fct[type_n](value)
        if type_n == 'array':
            return tuple(value)
        if type_n == 'ntv':
            from json_ntv.ntv import Ntv
            return Ntv.from_obj(value)
        if type_n in dic_geo:
            option['type_geo'] = dic_geo[type_n]
        connec = None
        if type_n in dic_obj and \
                dic_obj[type_n] in NtvConnector.connector():
            connec = NtvConnector.connector()[dic_obj[type_n]]
        elif dic_obj['other'] in NtvConnector.connector():
            connec = NtvConnector.connector()['other']
        if connec:
            return connec.to_obj_ntv(value, **option)
        return value

    @staticmethod
    def is_json_class(val):
        ''' return True if val is a json type'''
        return val is None or isinstance(val, (list, int, str, float, bool, dict))

    @staticmethod
    def is_json(obj):
        ''' check if obj is a json structure and return True if obj is a json-value

        *Parameters*

        - **obj** : object to check
        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
        if obj is None:
            return True
        is_js = NtvConnector.is_json
        match obj:
            case str() | int() | float() | bool() as obj:
                return True
            case list() | tuple() as obj:
                if not obj:
                    return True
                return min(is_js(obj_in) for obj_in in obj)
            case dict() as obj:
                if not obj:
                    return True
                if not min(isinstance(key, str) for key in obj.keys()):
                    raise NtvError('key in dict in not string')
                return min(is_js(obj_in) for obj_in in obj.values())
            case _:
                if not obj.__class__.__name__ in NtvConnector.castable:
                    raise NtvError(obj.__class__.__name__ +
                                   ' is not valid for NTV')
                return False

    @staticmethod
    def keysfromderkeys(parentkeys, derkeys):
        '''return keys from parent keys and derkeys

        *Parameters*

        - **parentkeys** : list of keys from parent
        - **derkeys** : list of derived keys

        *Returns* : list of keys'''
        return [derkeys[pkey] for pkey in parentkeys]

    @staticmethod
    def encode_coef(lis):
        '''Generate a repetition coefficient for periodic list'''
        if len(lis) < 2:
            return 0
        coef = 1
        while coef != len(lis):
            if lis[coef-1] != lis[coef]:
                break
            coef += 1
        if (not len(lis) % (coef * (max(lis) + 1)) and
                lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
            return coef
        return 0

    @staticmethod
    def keysfromcoef(coef, period, leng=None):
        ''' return a list of keys with periodic structure'''
        if not leng:
            leng = coef * period
        return None if not (coef and period) else [(ind % (coef * period)) // coef
                                                   for ind in range(leng)]

    @staticmethod
    def init_ntv_keys(ind, lidx, leng):
        ''' initialization of explicit keys data in lidx object of tabular data'''
        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
        name, typ, codec, parent, keys, coef, length = lidx[ind]
        if (keys, parent, coef) == (None, None, None):  # full or unique
            if len(codec) == 1:  # unique
                lidx[ind][4] = [0] * leng
            elif len(codec) == leng:  # full
                lidx[ind][4] = list(range(leng))
            else:
                raise NtvError('impossible to generate keys')
            return
        if keys and len(keys) > 1 and parent is None:  # complete
            return
        if coef:  # primary
            lidx[ind][4] = [(ikey % (coef * len(codec))) //
                            coef for ikey in range(leng)]
            lidx[ind][3] = None
            return
        if parent is None:
            raise NtvError('keys not referenced')
        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
            NtvConnector.init_ntv_keys(parent, lidx, leng)
        if not keys and len(codec) == len(lidx[parent][2]):  # implicit
            lidx[ind][4] = lidx[parent][4]
            lidx[ind][3] = None
            return
        lidx[ind][4] = NtvConnector.keysfromderkeys(
            lidx[parent][4], keys)  # relative
        lidx[ind][3] = None
        return


class NtvTree:
    ''' The NtvTree class is an iterator class used to traverse a NTV tree structure.
    Some other methods give tree indicators and data.

    *Attributes :*

    - **ntv** : Ntv entity
    - **_node**: Ntv entity - node pointer
    - **_stack**: list - stack used to store context in recursive methods

    *dynamic values (@property)*
    - `breadth`
    - `size`
    - `height`
    - `adjacency_list`
    - `nodes`
    - `dic_nodes`
    - `leaf_nodes`
    - `inner_nodes`
    '''

    def __init__(self, ntv):
        ''' the parameter of the constructor is the Ntv entity'''
        self._ntv = ntv
        self._node = None
        self._stack = []

    def __iter__(self):
        ''' iterator without initialization'''
        return self

    def __next__(self):
        ''' return next node in the tree'''
        if self._node is None:
            self._node = self._ntv
        elif len(self._node) == 0:
            raise StopIteration
        elif self._node.__class__.__name__ == 'NtvList':
            self._next_down()
        else:
            self._next_up()
        return self._node

    def _next_down(self):
        ''' find the next subchild node'''

        self._node = self._node[0]
        self._stack.append(0)

    def _next_up(self):
        ''' find the next sibling or ancestor node'''
        parent = self._node.parent
        if not parent or self._node == self._ntv:
            raise StopIteration
        ind = self._stack[-1]
        if ind < len(parent) - 1:  # if ind is not the last
            self._node = parent[ind + 1]
            self._stack[-1] += 1
        else:
            if parent == self._ntv:
                raise StopIteration
            self._node = parent
            self._stack.pop()
            self._next_up()

    @property
    def breadth(self):
        ''' return the number of leaves'''
        return len(self.leaf_nodes)

    @property
    def size(self):
        ''' return the number of nodes'''
        return len(self.nodes)

    @property
    def height(self):
        ''' return the height of the tree'''
        return max(len(node.pointer()) for node in self.__class__(self._ntv)) - 1

    @property
    def adjacency_list(self):
        ''' return a dict with the list of child nodes for each parent node'''
        return {node: node.val for node in self.inner_nodes}

    @property
    def nodes(self):
        ''' return the list of nodes according to the DFS preordering algorithm'''
        return list(self.__class__(self._ntv))

    @property
    def dic_nodes(self):
        ''' return a dict of nodes according to the DFS preordering algorithm'''
        return {node.ntv_name: node for node in self.__class__(self._ntv)
                if node.ntv_name}

    @property
    def leaf_nodes(self):
        ''' return the list of leaf nodes according to the DFS preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvSingle']

    @property
    def inner_nodes(self):
        ''' return the list of inner nodes according to the DFS preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvList']


class NtvJsonEncoder(json.JSONEncoder):
    """json encoder for Ntv data"""

    def default(self, o):
        try:
            return NtvConnector.cast(o)[0]
        except NtvError:
            return json.JSONEncoder.default(self, o)
        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
            return o.isoformat()
        return json.JSONEncoder.default(self, o)


class NtvError(Exception):
    ''' NTV Exception'''
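The connector lookup in NtvConnector (`connector`, `dic_connec`, and the default branch of `cast`) relies on `__subclasses__()` so that defining a child class is enough to register it. The sketch below illustrates that pattern only; `BaseConnector`, `FractionConnec` and the `'fraction'` type name are hypothetical stand-ins and are not part of the json_ntv package.

```python
from abc import ABC, abstractmethod
from fractions import Fraction


class BaseConnector(ABC):
    """Hypothetical stand-in for NtvConnector (not the library class)."""

    @classmethod
    def connector(cls):
        # {connector class name: connector class}
        return {clas.__name__: clas for clas in cls.__subclasses__()}

    @classmethod
    def dic_connec(cls):
        # {object class name: connector class name}
        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}

    @staticmethod
    @abstractmethod
    def to_json_ntv(value, name=None, typ=None):
        """convert an object into (json_value, name, type)"""

    @staticmethod
    @abstractmethod
    def to_obj_ntv(ntv_value, **kwargs):
        """convert a json value back into an object"""


class FractionConnec(BaseConnector):
    """Illustrative connector for fractions.Fraction (an assumption)."""
    clas_obj = 'Fraction'
    clas_typ = 'fraction'

    @staticmethod
    def to_json_ntv(value, name=None, typ=None):
        return (str(value), name, FractionConnec.clas_typ if not typ else typ)

    @staticmethod
    def to_obj_ntv(ntv_value, **kwargs):
        return Fraction(ntv_value)


def cast(data, name=None, type_str=None):
    """Dispatch on the class name of data, as a connector lookup would."""
    clas = data.__class__.__name__
    dic_connec = BaseConnector.dic_connec()
    if clas in dic_connec:
        connec = BaseConnector.connector()[dic_connec[clas]]
        return connec.to_json_ntv(data, name, type_str)
    raise TypeError('connector is not defined for the class: ' + clas)


print(cast(Fraction(3, 4), name='ratio'))  # ('3/4', 'ratio', 'fraction')
```

Note that `__subclasses__()` lists direct subclasses only, so in this sketch a connector derived from another connector would not be found.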
The NtvUtil class includes static methods used by several NTV classes.
NtvUtil is the parent class of Datatype, Namespace, Ntv.
class variables :
- _namespaces_ : dict of the defined Namespace
- _types_ : dict of the defined Datatype
static methods :
- from_obj_name
- decode_ntv_tab
- to_ntvpointer
    @staticmethod
    def from_obj_name(string):
        '''return a tuple with name, type_str and separator from string'''
        if not isinstance(string, str):
            raise NtvError('a json-name have to be str')
        if string == '':
            return (None, None, None)
        spl = string.rsplit(':', maxsplit=1)
        if len(spl) == 1:
            if string[-1] == '.':
                return (None, string, None)
            return (string, None, None)
        if spl[0] == '':
            return (None, spl[1], ':')
        if spl[0][-1] == ':':
            sp0 = spl[0][:-1]
            return (None if sp0 == '' else sp0, None if spl[1] == '' else spl[1], '::')
        return (None if spl[0] == '' else spl[0], None if spl[1] == '' else spl[1], ':')
Return the tuple (name, type_str, separator) parsed from string.
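To make the splitting rules concrete, here is a standalone copy of the parsing logic with a few sample inputs. It is a sketch for illustration: it raises `ValueError` where the library raises `NtvError`, and the sample names are made up.

```python
def from_obj_name(string):
    """Standalone copy of the parsing logic (ValueError replaces NtvError)."""
    if not isinstance(string, str):
        raise ValueError('a json-name has to be a str')
    if string == '':
        return (None, None, None)
    # split on the last ':' only
    spl = string.rsplit(':', maxsplit=1)
    if len(spl) == 1:
        # no separator: a trailing '.' means the whole string is a type_str
        if string[-1] == '.':
            return (None, string, None)
        return (string, None, None)
    if spl[0] == '':
        return (None, spl[1], ':')
    if spl[0][-1] == ':':
        # the name part ends with ':' so the separator was '::'
        sp0 = spl[0][:-1]
        return (sp0 or None, spl[1] or None, '::')
    return (spl[0] or None, spl[1] or None, ':')


for sample in ['city:string', 'cities::string', ':string', '::string',
               'city', 'ns.', 'city:', '']:
    print(repr(sample), '->', from_obj_name(sample))
```

The first two samples return `('city', 'string', ':')` and `('cities', 'string', '::')`; a bare `'city'` returns `('city', None, None)`.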
    @staticmethod
    def decode_ntv_tab(ntv, ntv_to_val):
        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)

        *parameters:*

        - **ntv**: Ntv data to decode,
        - **ntv_to_val**: method to convert external value from ntv into internal Field value

        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*

        - name (None or string): name of the Field
        - dtype (None or string): type of data
        - codec (list): list of Field codec values
        - parent (None or int): Field parent or None
        - keys (None or list): Field keys
        - coef (None or int): coef if primary Field else None
        - leng (int): length of the Field
        '''
        typ = ntv.type_str if ntv.ntv_type else None
        nam = ntv.name
        val = ntv_to_val(ntv)
        if ntv.__class__.__name__ == 'NtvSingle':
            return (nam, typ, [val], None, None, None, 1)
        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':
            return (nam, typ, val, None, None, None, len(ntv))

        ntvc = ntv[0]
        leng = max(len(ind) for ind in ntv)
        typc = ntvc.type_str if ntvc.ntv_type else None
        valc = ntv_to_val(ntvc)
        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
                isinstance(ntv[1].val, (int, str)) and \
                ntv[2].__class__.__name__ != 'NtvSingle' and \
                isinstance(ntv[2][0].val, int):
            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):
            return (nam, typc, valc, ntv[1].val, None, None, leng)
        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):
            leng = leng * ntv[1][0].val
            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):
            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
        return (nam, typ, val, None, None, None, len(ntv))
Generate a tuple of data from an Ntv tab value (bytes, string, json, Ntv object)
parameters:
- ntv: Ntv data to decode,
- ntv_to_val: method to convert the external value from ntv into the internal Field value
Returns tuple: (name, dtype, codec, parent, keys, coef, leng)
- name (None or string): name of the Field
- dtype (None or string): type of data
- codec (list): list of Field codec values
- parent (None or int): Field parent or None
- keys (None or list): Field keys
- coef (None or int): coef if primary Field else None
- leng (int): length of the Field
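How `codec`, `keys` and `coef` relate can be sketched with the periodic-keys formula used by `NtvConnector.keysfromcoef` and `NtvConnector.init_ntv_keys` in the module source. The helper names below are hypothetical, and reading each key as an index into `codec` is an assumption drawn from how `init_ntv_keys` builds `[0] * leng` for a one-value codec and `range(leng)` for a full-length codec.

```python
def keys_from_coef(coef, period, leng=None):
    """Periodic keys: each codec index is repeated `coef` times, then the cycle restarts."""
    if not leng:
        leng = coef * period
    if not (coef and period):
        return None
    return [(ind % (coef * period)) // coef for ind in range(leng)]


def values_from_keys(codec, keys):
    """Assumed expansion: every key is an index into the codec list."""
    return [codec[key] for key in keys]


codec = ['a', 'b', 'c']
keys = keys_from_coef(coef=2, period=len(codec), leng=12)
print(keys)                           # [0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]
print(values_from_keys(codec, keys))  # ['a', 'a', 'b', 'b', 'c', 'c', 'a', ...]
```

With this reading, a primary Field only needs its codec and one integer (`coef`) to describe a full column of length `leng`.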
    @staticmethod
    def to_ntvpointer(jsonpointer, unique_root=False):
        '''convert a json pointer into a NTV pointer (string)

        *parameters:*

        - **jsonpointer**: String - json pointer to convert,
        - **unique_root**: Boolean (default False) - True if the json root length is 1 '''
        single = '/([0-9]+)(/[a-z])'
        if unique_root and not ('0' <= jsonpointer[1] <= '9'):
            return re.sub(single, r'\g<2>', jsonpointer)[1:]
        return re.sub(single, r'\g<2>', jsonpointer)
Convert a JSON pointer into an NTV pointer (string)
parameters:
- jsonpointer: String - json pointer to convert,
- unique_root: Boolean (default False) - True if the json root length is 1
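The conversion drops a numeric index whenever it is directly followed by a segment starting with a lowercase letter. A standalone copy of the function (same regular expression, written with raw strings) shows the effect on a few made-up pointers:

```python
import re


def to_ntvpointer(jsonpointer, unique_root=False):
    """Standalone copy of the conversion, for illustration only."""
    # '/<digits>/<lowercase letter>' is reduced to '/<lowercase letter>'
    single = r'/([0-9]+)(/[a-z])'
    if unique_root and not ('0' <= jsonpointer[1] <= '9'):
        # named root segment: also strip the leading '/'
        return re.sub(single, r'\g<2>', jsonpointer)[1:]
    return re.sub(single, r'\g<2>', jsonpointer)


print(to_ntvpointer('/0/a/1/b'))                  # /a/b
print(to_ntvpointer('/a/0/b', unique_root=True))  # a/b
print(to_ntvpointer('/0/1/b'))                    # /0/b
```

An index followed by another index (the `/0` in `/0/1/b`) is kept, since only the pair "index then name" is collapsed. With `unique_root=True` the function reads `jsonpointer[1]`, so it expects a pointer of at least two characters.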
if (not len(lis) % (coef * (max(lis) + 1)) and 383 lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))): 384 return coef 385 return 0 386 387 @staticmethod 388 def keysfromcoef(coef, period, leng=None): 389 ''' return a list of keys with periodic structure''' 390 if not leng: 391 leng = coef * period 392 return None if not (coef and period) else [(ind % (coef * period)) // coef 393 for ind in range(leng)] 394 395 @staticmethod 396 def init_ntv_keys(ind, lidx, leng): 397 ''' initialization of explicit keys data in lidx object of tabular data''' 398 # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6 399 name, typ, codec, parent, keys, coef, length = lidx[ind] 400 if (keys, parent, coef) == (None, None, None): # full or unique 401 if len(codec) == 1: # unique 402 lidx[ind][4] = [0] * leng 403 elif len(codec) == leng: # full 404 lidx[ind][4] = list(range(leng)) 405 else: 406 raise NtvError('impossible to generate keys') 407 return 408 if keys and len(keys) > 1 and parent is None: # complete 409 return 410 if coef: # primary 411 lidx[ind][4] = [(ikey % (coef * len(codec))) // 412 coef for ikey in range(leng)] 413 lidx[ind][3] = None 414 return 415 if parent is None: 416 raise NtvError('keys not referenced') 417 if not lidx[parent][4] or len(lidx[parent][4]) != leng: 418 NtvConnector.init_ntv_keys(parent, lidx, leng) 419 if not keys and len(codec) == len(lidx[parent][2]): # implicit 420 lidx[ind][4] = lidx[parent][4] 421 lidx[ind][3] = None 422 return 423 lidx[ind][4] = NtvConnector.keysfromderkeys( 424 lidx[parent][4], keys) # relative 425 lidx[ind][3] = None 426 return
The NtvConnector class is an abstract class used by all NTV connectors for conversion between NTV-JSON data and NTV-OBJ data.
An NtvConnector child class is defined by:
- clas_obj: str - defines the class name of the object to convert
- clas_typ: str - defines the Datatype of the converted object
- to_obj_ntv: method - converter from JsonNTV to the object
- to_json_ntv: method - converter from the object to JsonNTV
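class method :
- connector
- dic_connec
- castable (@property)
- dic_obj (@property)
- dic_type (@property)
abstract method
- to_obj_ntv
- to_json_ntv
static method
- cast
- uncast
- is_json_class
- is_json
- keysfromderkeys
- encode_coef
- keysfromcoef
- init_ntv_keys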
    @classmethod
    def connector(cls):
        '''return a dict with the connectors: { class_connec_name: class_connec }'''
        return {clas.__name__: clas for clas in cls.__subclasses__()}
return a dict with the connectors: { class_connec_name: class_connec }
    @classmethod
    def dic_connec(cls):
        '''return a dict with the clas associated to the connector:
        { class_obj_name: class_connec_name }'''
        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
return a dict with the clas associated to the connector: { class_obj_name: class_connec_name }
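Both class methods build their dictionaries from `cls.__subclasses__()`, so defining a connector class is enough to register it. The sketch below illustrates that mechanism with a stand-in base class; `ConnectorBase`, `FractionConnec` and `ComplexConnec` are hypothetical names and not part of `json_ntv`.

```python
from abc import ABC


class ConnectorBase(ABC):
    """Stand-in for NtvConnector, reduced to the two registry methods."""

    clas_obj = None  # name of the Python class handled by the connector

    @classmethod
    def connector(cls):
        # {connector class name: connector class}
        return {clas.__name__: clas for clas in cls.__subclasses__()}

    @classmethod
    def dic_connec(cls):
        # {handled class name: connector class name}
        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}


class FractionConnec(ConnectorBase):  # hypothetical connector
    clas_obj = 'Fraction'


class ComplexConnec(ConnectorBase):  # hypothetical connector
    clas_obj = 'complex'


registry = ConnectorBase.connector()
by_class = ConnectorBase.dic_connec()
print(sorted(registry))      # ['ComplexConnec', 'FractionConnec']
print(by_class['complex'])   # ComplexConnec
```

Note that `__subclasses__()` returns direct subclasses only, so with this mechanism a connector deriving from another connector would not appear in either dictionary.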
    @staticmethod
    @abstractmethod
    def to_obj_ntv(ntv_value, **kwargs):
        ''' abstract - convert ntv_value into the return object'''
abstract - convert ntv_value into the return object
    @staticmethod
    @abstractmethod
    def to_json_ntv(value, name=None, typ=None):
        ''' abstract - convert NTV object (value, name, type) into the NTV json
        (json-value, name, type)'''
abstract - convert NTV object (value, name, type) into the NTV json (json-value, name, type)
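A connector has to supply both converters. As a rough illustration, the sketch below implements them for `fractions.Fraction` on top of a stand-in base class. `ConnectorBase`, `FractionConnec` and the type name `'fraction'` are assumptions made for this example, not names defined by the package.

```python
from abc import ABC, abstractmethod
from fractions import Fraction


class ConnectorBase(ABC):
    """Stand-in for NtvConnector with the two abstract converters."""

    @staticmethod
    @abstractmethod
    def to_obj_ntv(ntv_value, **kwargs):
        """convert a JSON value into the Python object"""

    @staticmethod
    @abstractmethod
    def to_json_ntv(value, name=None, typ=None):
        """convert a Python object into (json-value, name, type)"""


class FractionConnec(ConnectorBase):
    """Hypothetical connector for fractions.Fraction."""

    clas_obj = 'Fraction'
    clas_typ = 'fraction'  # assumed type name, not a registered NTV Datatype

    @staticmethod
    def to_obj_ntv(ntv_value, **kwargs):
        # '3/4' -> Fraction(3, 4)
        return Fraction(ntv_value)

    @staticmethod
    def to_json_ntv(value, name=None, typ=None):
        # keep an explicit type if one is given, else use the connector type
        return (str(value), name, FractionConnec.clas_typ if not typ else typ)


json_value, name, typ = FractionConnec.to_json_ntv(Fraction(3, 4), 'ratio')
restored = FractionConnec.to_obj_ntv(json_value)
print(json_value, name, typ)   # 3/4 ratio fraction
```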
    @staticmethod
    def cast(data, name=None, type_str=None):
        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
        defined in parameters.

        *Parameters*

        - **data**: NtvSingle entity or NTVvalue of the NTV entity
        - **name** : String (default None) - name of the NTV entity
        - **type_str**: String (default None) - type of the NTV entity
        '''
        clas = data.__class__.__name__
        if clas == 'NtvSingle':
            name = data.ntv_name
            type_str = data.type_str
            data = data.ntv_value
        dic_geo_cl = NtvConnector.DIC_GEO_CL
        dic_connec = NtvConnector.dic_connec()
        match clas:
            case 'int' | 'float' | 'bool' | 'str':
                return (data, name, type_str)
            case 'dict':
                return ({key: NtvConnector.cast(val, name, type_str)[0]
                         for key, val in data.items()}, name, type_str)
            case 'list':
                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
            case 'tuple':
                return (list(data), name, 'array' if not type_str else type_str)
            case 'date' | 'time' | 'datetime':
                return (data.isoformat(), name, clas if not type_str else type_str)
            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
                    'Polygon' | 'MultiPolygon':
                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
            case 'NtvSingle' | 'NtvList':
                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
            case _:
                connec = None
                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
                    connec = NtvConnector.connector()[dic_connec[clas]]
                if connec:
                    return connec.to_json_ntv(data, name, type_str)
                raise NtvError(
                    'connector is not defined for the class : ', clas)
        return (data, name, type_str)
return JSON-NTV conversion (json_value, name, type_str) of the NTV entity defined in parameters.
Parameters
- data: NtvSingle entity or NTVvalue of the NTV entity
- name : String (default None) - name of the NTV entity
- type_str: String (default None) - type of the NTV entity
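To see what `cast` returns for built-in values without installing any connector, the reduced sketch below reproduces only the scalar, tuple and date/time branches. `cast_builtin` is a hypothetical helper written for this illustration; it raises `TypeError` where the real method would look for a connector.

```python
import datetime


def cast_builtin(data, name=None, type_str=None):
    """Reduced sketch of the built-in branches of `cast` (no connectors)."""
    clas = data.__class__.__name__
    if clas in ('int', 'float', 'bool', 'str'):
        # JSON scalars pass through unchanged
        return (data, name, type_str)
    if clas == 'tuple':
        # tuples become JSON arrays, typed 'array' unless a type is given
        return (list(data), name, type_str or 'array')
    if clas in ('date', 'time', 'datetime'):
        # date/time objects become ISO strings, typed by their class name
        return (data.isoformat(), name, type_str or clas)
    raise TypeError('no sketch branch for ' + clas)


print(cast_builtin(datetime.date(2023, 2, 27), 'created'))
# ('2023-02-27', 'created', 'date')
print(cast_builtin((1, 2), 'pair'))
# ([1, 2], 'pair', 'array')
```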
    @staticmethod
    def uncast(value, name=None, type_str=None, **kwargs):
        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity

        *Parameters*

        - **value**: NtvSingle entity or NTVvalue of the NTV entity
        - **name** : String (default None) - name of the NTV entity
        - **type_str**: String (default None) - type of the NTV entity
        '''
        if type_str == 'json':
            return (value, name, type_str)
        if value.__class__.__name__ == 'NtvSingle':
            if not (type_str in set(NtvConnector.dic_type.values()) and
                    NtvConnector.is_json(value) or type_str is None):
                return (value.ntv_value, value.name, value.type_str)
            type_str = value.type_str if value.ntv_type else None
            name = value.ntv_name
            value = value.ntv_value
        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
        value_obj = NtvConnector._uncast_val(value, type_str, **option)
        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
Parameters
- value: NtvSingle entity or NTVvalue of the NTV entity
- name : String (default None) - name of the NTV entity
- type_str: String (default None) - type of the NTV entity
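The reverse direction applies a decoding function chosen by type name, element by element for lists and dicts. The sketch below shows that dispatch for date and time types only. The `DECODERS` table is an assumption standing in for `NtvConnector.DIC_FCT`, whose contents are not shown on this page, and `uncast_builtin` is a hypothetical helper.

```python
import datetime

# Assumed decoder table; the real NtvConnector.DIC_FCT is not shown here.
DECODERS = {
    'date': datetime.date.fromisoformat,
    'time': datetime.time.fromisoformat,
    'datetime': datetime.datetime.fromisoformat,
}


def uncast_builtin(value, type_str=None):
    """Reduced sketch: decode a value, a list of values or a dict of values."""
    if not type_str or type_str == 'json':
        return value
    if type_str in DECODERS:
        if isinstance(value, (tuple, list)):
            return [uncast_builtin(val, type_str) for val in value]
        if isinstance(value, dict):
            return {key: uncast_builtin(val, type_str)
                    for key, val in value.items()}
        return DECODERS[type_str](value)
    if type_str == 'array':
        return tuple(value)
    return value  # unknown type: left as JSON value in this sketch


dates = uncast_builtin(['2023-02-27', '2023-03-01'], 'date')
print(dates)  # [datetime.date(2023, 2, 27), datetime.date(2023, 3, 1)]
```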
    @staticmethod
    def is_json_class(val):
        ''' return True if val is a json type'''
        return val is None or isinstance(val, (list, int, str, float, bool, dict))
return True if val is a json type
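This test is shallow: it looks only at the class of the outer value and not at what a list or dict contains. A small standalone copy of the one-line check makes the consequences visible.

```python
def is_json_class(val):
    """Standalone copy of the one-line check shown above."""
    return val is None or isinstance(val, (list, int, str, float, bool, dict))


shallow = is_json_class([object()])   # True: only the outer class is tested
as_tuple = is_json_class((1, 2))      # False: tuple is not an accepted class
as_set = is_json_class({1, 2})        # False
print(shallow, as_tuple, as_set)      # True False False
```

By contrast, `is_json` walks the whole structure and also accepts tuples.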
    @staticmethod
    def is_json(obj):
        ''' check if obj is a json structure and return True if obj is a json-value

        *Parameters*

        - **obj** : object to check
        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
        if obj is None:
            return True
        is_js = NtvConnector.is_json
        match obj:
            case str() | int() | float() | bool() as obj:
                return True
            case list() | tuple() as obj:
                if not obj:
                    return True
                return min(is_js(obj_in) for obj_in in obj)
            case dict() as obj:
                if not obj:
                    return True
                if not min(isinstance(key, str) for key in obj.keys()):
                    raise NtvError('key in dict in not string')
                return min(is_js(obj_in) for obj_in in obj.values())
            case _:
                if not obj.__class__.__name__ in NtvConnector.castable:
                    raise NtvError(obj.__class__.__name__ +
                                   ' is not valid for NTV')
        return False
check if obj is a json structure and return True if obj is a json-value
Parameters
- obj : object to check
- ntvobj : boolean (default False) - if True NTV class value are accepted
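The recursive structure of this check can be sketched without the NTV classes. The hypothetical `is_json_sketch` below uses `all()` in place of `min()` (which also covers empty containers) and simply returns False for any other class, whereas the real method raises `NtvError` when the class is not castable.

```python
def is_json_sketch(obj):
    """Recursive JSON check; reduced sketch without the NTV `castable` test."""
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return True
    if isinstance(obj, (list, tuple)):
        return all(is_json_sketch(item) for item in obj)
    if isinstance(obj, dict):
        if not all(isinstance(key, str) for key in obj):
            raise ValueError('key in dict is not a string')
        return all(is_json_sketch(item) for item in obj.values())
    return False  # any other class is not a JSON value


print(is_json_sketch({'a': [1, (2.0, None)]}))  # True
print(is_json_sketch({'a': {1, 2}}))            # False
```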
    @staticmethod
    def keysfromderkeys(parentkeys, derkeys):
        '''return keys from parent keys and derkeys

        *Parameters*

        - **parentkeys** : list of keys from parent
        - **derkeys** : list of derived keys

        *Returns* : list of keys'''
        return [derkeys[pkey] for pkey in parentkeys]
return keys from parent keys and derkeys
Parameters
- parentkeys : list of keys from parent
- derkeys : list of derived keys
Returns : list of keys
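A worked example may help here: the derived keys hold one entry per parent codec value, and the result is obtained by looking up each parent key in that list. The field names and values below are hypothetical data chosen for the illustration.

```python
def keysfromderkeys(parentkeys, derkeys):
    """Standalone copy of the method shown above."""
    return [derkeys[pkey] for pkey in parentkeys]


# Hypothetical data: four rows, parent field 'city', derived field 'country'
city_codec = ['paris', 'lyon', 'rome']
city_keys = [0, 1, 2, 0]          # one key per row
country_codec = ['france', 'italy']
country_derkeys = [0, 0, 1]       # one entry per city codec value

country_keys = keysfromderkeys(city_keys, country_derkeys)
countries = [country_codec[key] for key in country_keys]
print(country_keys)   # [0, 0, 1, 0]
print(countries)      # ['france', 'france', 'italy', 'france']
```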
    @staticmethod
    def encode_coef(lis):
        '''Generate a repetition coefficient for periodic list'''
        if len(lis) < 2:
            return 0
        coef = 1
        while coef != len(lis):
            if lis[coef-1] != lis[coef]:
                break
            coef += 1
        if (not len(lis) % (coef * (max(lis) + 1)) and
                lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
            return coef
        return 0
Generate a repetition coefficient for periodic list
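The coefficient is the length of the initial run of equal keys, accepted only if the whole list matches the periodic pattern rebuilt from it. Otherwise the result is 0. The standalone copies below (module-level functions instead of static methods) can be used to try a few inputs.

```python
def keysfromcoef(coef, period, leng=None):
    """Standalone copy of `keysfromcoef`."""
    if not leng:
        leng = coef * period
    return None if not (coef and period) else [(ind % (coef * period)) // coef
                                               for ind in range(leng)]


def encode_coef(lis):
    """Standalone copy of `encode_coef`."""
    if len(lis) < 2:
        return 0
    coef = 1
    while coef != len(lis):          # length of the first run of equal keys
        if lis[coef - 1] != lis[coef]:
            break
        coef += 1
    if (not len(lis) % (coef * (max(lis) + 1)) and
            lis == keysfromcoef(coef, max(lis) + 1, len(lis))):
        return coef
    return 0


print(encode_coef([0, 0, 1, 1, 2, 2]))   # 2
print(encode_coef([0, 1, 2, 0, 1, 2]))   # 1
print(encode_coef([0, 1, 0, 2]))         # 0 (not periodic)
```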
    @staticmethod
    def keysfromcoef(coef, period, leng=None):
        ''' return a list of keys with periodic structure'''
        if not leng:
            leng = coef * period
        return None if not (coef and period) else [(ind % (coef * period)) // coef
                                                   for ind in range(leng)]
return a list of keys with periodic structure
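Each key is repeated `coef` times and the sequence of `period` distinct keys restarts until `leng` entries are produced. One way to read this, sketched below with a standalone copy, is that two fields with suitable coefficients enumerate every combination of their values exactly once. The variable names `slow` and `fast` are only illustrative.

```python
def keysfromcoef(coef, period, leng=None):
    """Standalone copy of the method shown above."""
    if not leng:
        leng = coef * period
    return None if not (coef and period) else [(ind % (coef * period)) // coef
                                               for ind in range(leng)]


slow = keysfromcoef(2, 3)       # each of 3 keys repeated twice
fast = keysfromcoef(1, 2, 6)    # 2 keys alternating over 6 rows
pairs = set(zip(slow, fast))    # all 3 x 2 combinations
print(slow)         # [0, 0, 1, 1, 2, 2]
print(fast)         # [0, 1, 0, 1, 0, 1]
print(len(pairs))   # 6
```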
    @staticmethod
    def init_ntv_keys(ind, lidx, leng):
        ''' initialization of explicit keys data in lidx object of tabular data'''
        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
        name, typ, codec, parent, keys, coef, length = lidx[ind]
        if (keys, parent, coef) == (None, None, None):  # full or unique
            if len(codec) == 1:  # unique
                lidx[ind][4] = [0] * leng
            elif len(codec) == leng:  # full
                lidx[ind][4] = list(range(leng))
            else:
                raise NtvError('impossible to generate keys')
            return
        if keys and len(keys) > 1 and parent is None:  # complete
            return
        if coef:  # primary
            lidx[ind][4] = [(ikey % (coef * len(codec))) //
                            coef for ikey in range(leng)]
            lidx[ind][3] = None
            return
        if parent is None:
            raise NtvError('keys not referenced')
        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
            NtvConnector.init_ntv_keys(parent, lidx, leng)
        if not keys and len(codec) == len(lidx[parent][2]):  # implicit
            lidx[ind][4] = lidx[parent][4]
            lidx[ind][3] = None
            return
        lidx[ind][4] = NtvConnector.keysfromderkeys(
            lidx[parent][4], keys)  # relative
        lidx[ind][3] = None
        return
initialization of explicit keys data in lidx object of tabular data
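The effect on each kind of field (full, unique, primary, relative) is easier to follow on a small table. The sketch below is a condensed standalone transcription of the method, with `ValueError` in place of `NtvError`. The four field descriptions are hypothetical data in the `[name, type, codec, parent, keys, coef, leng]` layout given in the source comment.

```python
def init_keys(ind, lidx, leng):
    """Condensed transcription of `init_ntv_keys` for experimentation."""
    _, _, codec, parent, keys, coef, _ = lidx[ind]
    if (keys, parent, coef) == (None, None, None):      # full or unique
        if len(codec) == 1:                             # unique
            lidx[ind][4] = [0] * leng
        elif len(codec) == leng:                        # full
            lidx[ind][4] = list(range(leng))
        else:
            raise ValueError('impossible to generate keys')
        return
    if keys and len(keys) > 1 and parent is None:       # complete
        return
    if coef:                                            # primary
        lidx[ind][4] = [(ikey % (coef * len(codec))) // coef
                        for ikey in range(leng)]
        lidx[ind][3] = None
        return
    if parent is None:
        raise ValueError('keys not referenced')
    if not lidx[parent][4] or len(lidx[parent][4]) != leng:
        init_keys(parent, lidx, leng)                   # resolve parent first
    if not keys and len(codec) == len(lidx[parent][2]):  # implicit
        lidx[ind][4] = lidx[parent][4]
    else:                                               # relative
        lidx[ind][4] = [keys[pkey] for pkey in lidx[parent][4]]
    lidx[ind][3] = None


# Hypothetical fields: [name, type, codec, parent, keys, coef, leng]
lidx = [
    ['id', None, [10, 11, 12, 13], None, None, None, 4],     # full
    ['unit', None, ['kg'], None, None, None, 4],             # unique
    ['site', None, ['a', 'b'], None, None, 2, 4],            # primary
    ['zone', None, ['north', 'south'], 2, [1, 0], None, 4],  # relative
]
for ind in range(len(lidx)):
    init_keys(ind, lidx, 4)
result = {field[0]: field[4] for field in lidx}
print(result)
```

With this data, `id` receives `[0, 1, 2, 3]`, `unit` receives `[0, 0, 0, 0]`, `site` receives `[0, 0, 1, 1]`, and `zone` receives `[1, 1, 0, 0]`, derived from the keys of `site`.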
class NtvTree:
    ''' The NtvTree class is an iterator class used to traverse a NTV tree structure.
    Some other methods give tree indicators and data.

    *Attributes :*

    - **ntv** : Ntv entity
    - **_node**: Ntv entity - node pointer
    - **_stack**: list - stack used to store context in recursive methods

    *dynamic values (@property)*
    - `breadth`
    - `size`
    - `height`
    - `adjacency_list`
    - `nodes`
    - `dic_nodes`
    - `leaf_nodes`
    - `inner_nodes`
    '''

    def __init__(self, ntv):
        ''' the parameter of the constructor is the Ntv entity'''
        self._ntv = ntv
        self._node = None
        self._stack = []

    def __iter__(self):
        ''' iterator without initialization'''
        return self

    def __next__(self):
        ''' return next node in the tree'''
        if self._node is None:
            self._node = self._ntv
        elif len(self._node) == 0:
            raise StopIteration
        elif self._node.__class__.__name__ == 'NtvList':
            self._next_down()
        else:
            self._next_up()
        return self._node

    def _next_down(self):
        ''' find the next subchild node'''

        self._node = self._node[0]
        self._stack.append(0)

    def _next_up(self):
        ''' find the next sibling or ancestor node'''
        parent = self._node.parent
        if not parent or self._node == self._ntv:
            raise StopIteration
        ind = self._stack[-1]
        if ind < len(parent) - 1:  # if ind is not the last
            self._node = parent[ind + 1]
            self._stack[-1] += 1
        else:
            if parent == self._ntv:
                raise StopIteration
            self._node = parent
            self._stack.pop()
            self._next_up()

    @property
    def breadth(self):
        ''' return the number of leaves'''
        return len(self.leaf_nodes)

    @property
    def size(self):
        ''' return the number of nodes'''
        return len(self.nodes)

    @property
    def height(self):
        ''' return the height of the tree'''
        return max(len(node.pointer()) for node in self.__class__(self._ntv)) - 1

    @property
    def adjacency_list(self):
        ''' return a dict with the list of child nodes for each parent node'''
        return {node: node.val for node in self.inner_nodes}

    @property
    def nodes(self):
        ''' return the list of nodes according to the DFS preordering algorithm'''
        return list(self.__class__(self._ntv))

    @property
    def dic_nodes(self):
        ''' return a dict of nodes according to the DFS preordering algorithm'''
        return {node.ntv_name: node for node in self.__class__(self._ntv)
                if node.ntv_name}

    @property
    def leaf_nodes(self):
        ''' return the list of leaf nodes according to the DFS preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvSingle']

    @property
    def inner_nodes(self):
        ''' return the list of inner nodes according to the DFS preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvList']
The NtvTree class is an iterator class used to traverse a NTV tree structure. Some other methods give tree indicators and data.
Attributes :
- ntv : Ntv entity
- _node: Ntv entity - node pointer
- _stack: list - stack used to store context in recursive methods
dynamic values (@property)
- breadth
- size
- height
- adjacency_list
- nodes
- dic_nodes
- leaf_nodes
- inner_nodes
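The indicators are all derived from one depth-first preorder traversal. The sketch below illustrates the idea on plain nested lists, where a list plays the role of an `NtvList` and any other value the role of an `NtvSingle`. It is not the class's algorithm: it uses an explicit stack of nodes rather than parent pointers and an index stack, and `preorder` and `height` are hypothetical helpers.

```python
def preorder(node):
    """Yield every node of a nested-list tree in DFS preorder."""
    stack = [node]
    while stack:
        current = stack.pop()
        yield current
        if isinstance(current, list):
            # push children in reverse so the first child is visited first
            stack.extend(reversed(current))


def height(node):
    """Number of edges on the longest root-to-leaf path."""
    if not isinstance(node, list) or not node:
        return 0
    return 1 + max(height(child) for child in node)


tree = [1, [2, 3], [[4]]]
nodes = list(preorder(tree))
leaves = [n for n in nodes if not isinstance(n, list)]   # leaf nodes
inner = [n for n in nodes if isinstance(n, list)]        # inner nodes

print(len(nodes))    # size: 8
print(len(leaves))   # breadth: 4
print(height(tree))  # height: 3
```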
class NtvJsonEncoder(json.JSONEncoder):
    """json encoder for Ntv data"""

    def default(self, o):
        try:
            return NtvConnector.cast(o)[0]
        except NtvError:
            return json.JSONEncoder.default(self, o)
        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
            return o.isoformat()
        return json.JSONEncoder.default(self, o)
json encoder for Ntv data
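An encoder of this kind is passed to `json.dumps` through the `cls` argument. The sketch below shows the same pattern with a hypothetical `IsoEncoder` that handles only dates and times, so it runs without the NTV package.

```python
import datetime
import json


class IsoEncoder(json.JSONEncoder):
    """Hypothetical encoder: dates and times become ISO 8601 strings."""

    def default(self, o):
        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
            return o.isoformat()
        return super().default(o)   # raises TypeError for unsupported types


text = json.dumps({'created': datetime.date(2023, 2, 27)}, cls=IsoEncoder)
print(text)  # {"created": "2023-02-27"}
```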
    def default(self, o):
        try:
            return NtvConnector.cast(o)[0]
        except NtvError:
            return json.JSONEncoder.default(self, o)
        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
            return o.isoformat()
        return json.JSONEncoder.default(self, o)
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return JSONEncoder.default(self, o)
Inherited Members
- json.encoder.JSONEncoder
- JSONEncoder
- encode
- iterencode
NTV Exception
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback