NTV.json_ntv.ntv_util
Created on Feb 27 2023
@author: Philippe@loco-labs.io
The ntv_util
module is part of the NTV.json_ntv
package (specification document).
It contains the classes NtvConnector
, NtvTree
, NtvJsonEncoder
and NtvError
for NTV entities.
1# -*- coding: utf-8 -*- 2""" 3Created on Feb 27 2023 4 5@author: Philippe@loco-labs.io 6 7The `ntv_util` module is part of the `NTV.json_ntv` package ([specification document]( 8https://github.com/loco-philippe/NTV/blob/main/documentation/JSON-NTV-standard.pdf)). 9 10It contains the classes `NtvConnector`, `NtvTree`, `NtvJsonEncoder` and `NtvError` 11for NTV entities. 12""" 13from abc import ABC, abstractmethod 14import datetime 15import json 16 17 18class NtvConnector(ABC): 19 ''' The NtvConnector class is an abstract class used by all NTV connectors 20 for conversion between NTV-JSON data and NTV-OBJ data. 21 22 A NtvConnector child is defined by: 23 - clas_obj: str - define the class name of the object to convert 24 - clas_typ: str - define the NTVtype of the converted object 25 - to_obj_ntv: method - converter from JsonNTV to the object 26 - to_json_ntv: method - converter from the object to JsonNTV 27 28 *class method :* 29 - `connector` 30 - `dic_connec` 31 - `castable` (@property) 32 - `dic_obj` (@property) 33 - `dic_type` (@property) 34 35 *abstract method* 36 - `to_obj_ntv` 37 - `to_json_ntv` 38 39 *static method* 40 - `cast` 41 - `uncast` 42 - `is_json_class` 43 - `is_json` 44 - `init_ntv_keys` 45 ''' 46 47 DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'} 48 DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line', 49 'MultiLineString': 'multiline', 'Polygon': 'polygon', 50 'MultiPolygon': 'multipolygon'} 51 DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'} 52 DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat, 53 'datetime': datetime.datetime.fromisoformat} 54 DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring', 55 'multiline': 'multilinestring', 'polygon': 'polygon', 56 'multipolygon': 'multipolygon'} 57 DIC_CBOR = {'point': False, 'multipoint': False, 'line': False, 58 'multiline': False, 'polygon': False, 'multipolygon': False, 59 'date': True, 'time': False, 'datetime': True} 60 DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec', 61 'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec', 62 'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec', 63 'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec', 64 'other': None} 65 66 @classmethod 67 @property 68 def castable(cls): 69 '''return a list with class_name allowed for json-obj conversion''' 70 return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType', 71 'NtvSingle', 'NtvList'] \ 72 + list(NtvConnector.DIC_GEO_CL.keys()) \ 73 + list(NtvConnector.DIC_FCT.keys()) \ 74 + list(NtvConnector.dic_connec().keys()) 75 76 @classmethod 77 @property 78 def dic_obj(cls): 79 '''return a dict with the connectors: { type: class_connec_name }''' 80 return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\ 81 NtvConnector.DIC_OBJ 82 83 @classmethod 84 @property 85 def dic_type(cls): 86 '''return a dict with the connectors: { class_obj_name: type }''' 87 return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\ 88 NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL 89 90 @classmethod 91 def connector(cls): 92 '''return a dict with the connectors: { class_connec_name: class_connec }''' 93 return {clas.__name__: clas for clas in cls.__subclasses__()} 94 95 @classmethod 96 def dic_connec(cls): 97 '''return a dict with the clas associated to the connector: 98 { class_obj_name: class_connec_name }''' 99 return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()} 100 101 @staticmethod 102 @abstractmethod 103 def to_obj_ntv(ntv_value, **kwargs): 104 ''' abstract - convert ntv_value into the return object''' 105 106 @staticmethod 107 @abstractmethod 108 def to_json_ntv(value, name=None, typ=None): 109 ''' abstract - convert NTV object (value, name, type) into the NTV json 110 (json-value, name, type)''' 111 112 @staticmethod 113 def cast(data, name=None, type_str=None): 114 '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity 115 defined in parameters. 116 117 *Parameters* 118 119 - **data**: NtvSingle entity or NTVvalue of the NTV entity 120 - **name** : String (default None) - name of the NTV entity 121 - **type_str**: String (default None) - type of the NTV entity 122 ''' 123 clas = data.__class__.__name__ 124 if clas == 'NtvSingle': 125 name = data.ntv_name 126 type_str = data.type_str 127 data = data.ntv_value 128 dic_geo_cl = NtvConnector.DIC_GEO_CL 129 dic_connec = NtvConnector.dic_connec() 130 match clas: 131 case 'int' | 'float' | 'bool' | 'str': 132 return (data, name, type_str) 133 case 'dict': 134 return ({key: NtvConnector.cast(val, name, type_str)[0] 135 for key, val in data.items()}, name, type_str) 136 case 'list': 137 return ([NtvConnector.cast(val, name, type_str)[0] for val in data], 138 name, NtvConnector._typ_obj(data) if not type_str else type_str) 139 case 'tuple': 140 return (list(data), name, 'array' if not type_str else type_str) 141 case 'date' | 'time' | 'datetime': 142 return (data.isoformat(), name, clas if not type_str else type_str) 143 case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \ 144 'Polygon' | 'MultiPolygon': 145 return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0], 146 name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str) 147 case 'NtvSingle' | 'NtvList': 148 return (data.to_obj(), name, 'ntv' if not type_str else type_str) 149 case _: 150 connec = None 151 if clas in dic_connec and dic_connec[clas] in NtvConnector.connector(): 152 connec = NtvConnector.connector()[dic_connec[clas]] 153 if connec: 154 return connec.to_json_ntv(data, name, type_str) 155 raise NtvError( 156 'connector is not defined for the class : ', clas) 157 return (data, name, type_str) 158 159 @staticmethod 160 def uncast(value, name=None, type_str=None, **kwargs): 161 '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity 162 163 *Parameters* 164 165 - **data**: NtvSingle entity or NTVvalue of the NTV entity 166 - **name** : String (default None) - name of the NTV entity 167 - **type_str**: String (default None) - type of the NTV entity 168 ''' 169 if type_str == 'json': 170 return (value, name, type_str) 171 if value.__class__.__name__ == 'NtvSingle': 172 if not (type_str in set(NtvConnector.dic_type.values()) and 173 NtvConnector.is_json(value) or type_str is None): 174 return (value.ntv_value, value.name, value.type_str) 175 type_str = value.type_str if value.ntv_type else None 176 name = value.ntv_name 177 value = value.ntv_value 178 #dic_obj = NtvConnector.dic_obj 179 option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs 180 value_obj = NtvConnector._uncast_val(value, type_str, **option) 181 return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj)) 182 183 @staticmethod 184 def _typ_obj(value): 185 if isinstance(value, dict): 186 return NtvConnector._typ_obj(list(value.values())) 187 if isinstance(value, (tuple, list)): 188 for val in value: 189 typ = NtvConnector._typ_obj(val) 190 if typ: 191 return typ 192 return None 193 return NtvConnector.dic_type.get(value.__class__.__name__) 194 195 @staticmethod 196 def _uncast_val(value, type_n, **option): 197 '''return value from ntv value''' 198 dic_fct = NtvConnector.DIC_FCT 199 dic_geo = NtvConnector.DIC_GEO 200 dic_obj = NtvConnector.dic_obj | option['dicobj'] 201 dic_cbor = NtvConnector.DIC_CBOR 202 if not type_n or type_n == 'json' or (option['format'] == 'cbor' and 203 not dic_cbor.get(type_n, False)): 204 return value 205 if type_n in dic_fct: 206 if isinstance(value, (tuple, list)): 207 return [NtvConnector._uncast_val(val, type_n, **option) for val in value] 208 if isinstance(value, dict): 209 return {key: NtvConnector._uncast_val(val, type_n, **option) 210 for key, val in value.items()} 211 return dic_fct[type_n](value) 212 if type_n == 'array': 213 return tuple(value) 214 if type_n == 'ntv': 215 from json_ntv.ntv import Ntv 216 return Ntv.from_obj(value) 217 if type_n in dic_geo: 218 option['type_geo'] = dic_geo[type_n] 219 connec = None 220 if type_n in dic_obj and \ 221 dic_obj[type_n] in NtvConnector.connector(): 222 connec = NtvConnector.connector()[dic_obj[type_n]] 223 elif dic_obj['other'] in NtvConnector.connector(): 224 connec = NtvConnector.connector()['other'] 225 if connec: 226 return connec.to_obj_ntv(value, **option) 227 #raise NtvError('type of value not allowed for conversion') 228 return value 229 230 @staticmethod 231 def is_json_class(val): 232 ''' return True if val is a json type''' 233 return val is None or isinstance(val, (list, int, str, float, bool, dict)) 234 235 @staticmethod 236 def is_json(obj): 237 ''' check if obj is a json structure and return True if obj is a json-value 238 239 *Parameters* 240 241 - **obj** : object to check 242 - **ntvobj** : boolean (default False) - if True NTV class value are accepted''' 243 if obj is None: 244 return True 245 is_js = NtvConnector.is_json 246 match obj: 247 case str() | int() | float() | bool() as obj: 248 return True 249 case list() | tuple() as obj: 250 if not obj: 251 return True 252 return min(is_js(obj_in) for obj_in in obj) 253 case dict() as obj: 254 if not obj: 255 return True 256 if not min(isinstance(key, str) for key in obj.keys()): 257 raise NtvError('key in dict in not string') 258 return min(is_js(obj_in) for obj_in in obj.values()) 259 case _: 260 if not obj.__class__.__name__ in NtvConnector.castable: 261 raise NtvError(obj.__class__.__name__ + 262 'is not valid for NTV') 263 return False 264 265 @staticmethod 266 def keysfromderkeys(parentkeys, derkeys): 267 '''return keys from parent keys and derkeys 268 269 *Parameters* 270 271 - **parentkeys** : list of keys from parent 272 - **derkeys** : list of derived keys 273 274 *Returns* : list of keys''' 275 return [derkeys[pkey] for pkey in parentkeys] 276 277 @staticmethod 278 def encode_coef(lis): 279 '''Generate a repetition coefficient for periodic list''' 280 if len(lis) < 2: 281 return 0 282 coef = 1 283 while coef != len(lis): 284 if lis[coef-1] != lis[coef]: 285 break 286 coef += 1 287 if (not len(lis) % (coef * (max(lis) + 1)) and 288 lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))): 289 return coef 290 return 0 291 292 @staticmethod 293 def keysfromcoef(coef, period, leng=None): 294 ''' return a list of keys with periodic structure''' 295 if not leng: 296 leng = coef * period 297 return None if not (coef and period) else [(ind % (coef * period)) // coef 298 for ind in range(leng)] 299 @staticmethod 300 def init_ntv_keys(ind, lidx, leng): 301 ''' initialization of explicit keys data in lidx object of tabular data''' 302 # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6 303 name, typ, codec, parent, keys, coef, length = lidx[ind] 304 if (keys, parent, coef) == (None, None, None): # full or unique 305 if len(codec) == 1: # unique 306 lidx[ind][4] = [0] * leng 307 elif len(codec) == leng: # full 308 lidx[ind][4] = list(range(leng)) 309 else: 310 raise NtvError('impossible to generate keys') 311 return 312 if keys and len(keys) > 1 and parent is None: #complete 313 return 314 if coef: #primary 315 lidx[ind][4] = [(ikey % (coef * len(codec))) // coef for ikey in range(leng)] 316 lidx[ind][3] = None 317 return 318 if parent is None: 319 raise NtvError('keys not referenced') 320 if not lidx[parent][4] or len(lidx[parent][4]) != leng: 321 NtvConnector.init_ntv_keys(parent, lidx, leng) 322 if not keys and len(codec) == len(lidx[parent][2]): # implicit 323 lidx[ind][4] = lidx[parent][4] 324 lidx[ind][3] = None 325 return 326 lidx[ind][4] = NtvConnector.keysfromderkeys(lidx[parent][4], keys) # relative 327 #lidx[ind][4] = [keys[pkey] for pkey in lidx[parent][4]] # relative 328 lidx[ind][3] = None 329 return 330 331class NtvTree: 332 ''' The NtvTree class is an iterator class used to traverse a NTV tree structure. 333 Some other methods give tree indicators and data. 334 335 *Attributes :* 336 337 - **ntv** : Ntv entity 338 - **_node**: Ntv entity - node pointer 339 - **_stack**: list - stack used to store context in recursive methods 340 341 *dynamic values (@property)* 342 - `breadth` 343 - `size` 344 - `height` 345 - `adjacency_list` 346 - `nodes` 347 - `leaf_nodes` 348 - `inner_nodes` 349 ''' 350 351 def __init__(self, ntv): 352 ''' the parameter of the constructor is the Ntv entity''' 353 self._ntv = ntv 354 self._node = None 355 self._stack = [] 356 357 def __iter__(self): 358 ''' iterator without initialization''' 359 return self 360 361 def __next__(self): 362 ''' return next node in the tree''' 363 if self._node is None: 364 self._node = self._ntv 365 elif len(self._node) == 0: 366 raise StopIteration 367 elif self._node.__class__.__name__ == 'NtvList': 368 self._next_down() 369 else: 370 self._next_up() 371 return self._node 372 373 def _next_down(self): 374 ''' find the next subchild node''' 375 376 self._node = self._node[0] 377 self._stack.append(0) 378 379 def _next_up(self): 380 ''' find the next sibling or ancestor node''' 381 parent = self._node.parent 382 if not parent or self._node == self._ntv: 383 raise StopIteration 384 ind = self._stack[-1] 385 if ind < len(parent) - 1: # if ind is not the last 386 self._node = parent[ind + 1] 387 self._stack[-1] += 1 388 else: 389 if parent == self._ntv: 390 raise StopIteration 391 self._node = parent 392 self._stack.pop() 393 self._next_up() 394 395 @property 396 def breadth(self): 397 ''' return the number of leaves''' 398 return len(self.leaf_nodes) 399 400 @property 401 def size(self): 402 ''' return the number of nodes''' 403 return len(self.nodes) 404 405 @property 406 def height(self): 407 ''' return the height of the tree''' 408 return max(len(node.pointer()) for node in self.__class__(self._ntv)) 409 410 @property 411 def adjacency_list(self): 412 ''' return a dict with the list of child nodes for each parent node''' 413 return {node: node.val for node in self.inner_nodes} 414 415 @property 416 def nodes(self): 417 ''' return the list of nodes according to the DFS preordering algorithm''' 418 return list(self.__class__(self._ntv)) 419 420 @property 421 def dic_nodes(self): 422 ''' return a dict of nodes according to the DFS preordering algorithm''' 423 return {node.ntv_name: node for node in self.__class__(self._ntv) 424 if node.ntv_name} 425 426 @property 427 def leaf_nodes(self): 428 ''' return the list of leaf nodes according to the DFS preordering algorithm''' 429 #return [node for node in self.__class__(self._ntv) if not isinstance(node, NtvList)] 430 return [node for node in self.__class__(self._ntv) 431 if node.__class__.__name__ == 'NtvSingle'] 432 433 @property 434 def inner_nodes(self): 435 ''' return the list of inner nodes according to the DFS preordering algorithm''' 436 return [node for node in self.__class__(self._ntv) 437 if node.__class__.__name__ == 'NtvList'] 438 439 440 441 442class NtvJsonEncoder(json.JSONEncoder): 443 """json encoder for Ntv data""" 444 445 def default(self, o): 446 try: 447 return NtvConnector.cast(o)[0] 448 except NtvError: 449 return json.JSONEncoder.default(self, o) 450 if isinstance(o, (datetime.datetime, datetime.date, datetime.time)): 451 return o.isoformat() 452 return json.JSONEncoder.default(self, o) 453 454 455class NtvError(Exception): 456 ''' NTV Exception''' 457 # pass
19class NtvConnector(ABC): 20 ''' The NtvConnector class is an abstract class used by all NTV connectors 21 for conversion between NTV-JSON data and NTV-OBJ data. 22 23 A NtvConnector child is defined by: 24 - clas_obj: str - define the class name of the object to convert 25 - clas_typ: str - define the NTVtype of the converted object 26 - to_obj_ntv: method - converter from JsonNTV to the object 27 - to_json_ntv: method - converter from the object to JsonNTV 28 29 *class method :* 30 - `connector` 31 - `dic_connec` 32 - `castable` (@property) 33 - `dic_obj` (@property) 34 - `dic_type` (@property) 35 36 *abstract method* 37 - `to_obj_ntv` 38 - `to_json_ntv` 39 40 *static method* 41 - `cast` 42 - `uncast` 43 - `is_json_class` 44 - `is_json` 45 - `init_ntv_keys` 46 ''' 47 48 DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'} 49 DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line', 50 'MultiLineString': 'multiline', 'Polygon': 'polygon', 51 'MultiPolygon': 'multipolygon'} 52 DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'} 53 DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat, 54 'datetime': datetime.datetime.fromisoformat} 55 DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring', 56 'multiline': 'multilinestring', 'polygon': 'polygon', 57 'multipolygon': 'multipolygon'} 58 DIC_CBOR = {'point': False, 'multipoint': False, 'line': False, 59 'multiline': False, 'polygon': False, 'multipolygon': False, 60 'date': True, 'time': False, 'datetime': True} 61 DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec', 62 'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec', 63 'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec', 64 'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec', 65 'other': None} 66 67 @classmethod 68 @property 69 def castable(cls): 70 '''return a list with class_name allowed for json-obj conversion''' 71 return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType', 72 'NtvSingle', 'NtvList'] \ 73 + list(NtvConnector.DIC_GEO_CL.keys()) \ 74 + list(NtvConnector.DIC_FCT.keys()) \ 75 + list(NtvConnector.dic_connec().keys()) 76 77 @classmethod 78 @property 79 def dic_obj(cls): 80 '''return a dict with the connectors: { type: class_connec_name }''' 81 return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\ 82 NtvConnector.DIC_OBJ 83 84 @classmethod 85 @property 86 def dic_type(cls): 87 '''return a dict with the connectors: { class_obj_name: type }''' 88 return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\ 89 NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL 90 91 @classmethod 92 def connector(cls): 93 '''return a dict with the connectors: { class_connec_name: class_connec }''' 94 return {clas.__name__: clas for clas in cls.__subclasses__()} 95 96 @classmethod 97 def dic_connec(cls): 98 '''return a dict with the clas associated to the connector: 99 { class_obj_name: class_connec_name }''' 100 return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()} 101 102 @staticmethod 103 @abstractmethod 104 def to_obj_ntv(ntv_value, **kwargs): 105 ''' abstract - convert ntv_value into the return object''' 106 107 @staticmethod 108 @abstractmethod 109 def to_json_ntv(value, name=None, typ=None): 110 ''' abstract - convert NTV object (value, name, type) into the NTV json 111 (json-value, name, type)''' 112 113 @staticmethod 114 def cast(data, name=None, type_str=None): 115 '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity 116 defined in parameters. 117 118 *Parameters* 119 120 - **data**: NtvSingle entity or NTVvalue of the NTV entity 121 - **name** : String (default None) - name of the NTV entity 122 - **type_str**: String (default None) - type of the NTV entity 123 ''' 124 clas = data.__class__.__name__ 125 if clas == 'NtvSingle': 126 name = data.ntv_name 127 type_str = data.type_str 128 data = data.ntv_value 129 dic_geo_cl = NtvConnector.DIC_GEO_CL 130 dic_connec = NtvConnector.dic_connec() 131 match clas: 132 case 'int' | 'float' | 'bool' | 'str': 133 return (data, name, type_str) 134 case 'dict': 135 return ({key: NtvConnector.cast(val, name, type_str)[0] 136 for key, val in data.items()}, name, type_str) 137 case 'list': 138 return ([NtvConnector.cast(val, name, type_str)[0] for val in data], 139 name, NtvConnector._typ_obj(data) if not type_str else type_str) 140 case 'tuple': 141 return (list(data), name, 'array' if not type_str else type_str) 142 case 'date' | 'time' | 'datetime': 143 return (data.isoformat(), name, clas if not type_str else type_str) 144 case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \ 145 'Polygon' | 'MultiPolygon': 146 return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0], 147 name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str) 148 case 'NtvSingle' | 'NtvList': 149 return (data.to_obj(), name, 'ntv' if not type_str else type_str) 150 case _: 151 connec = None 152 if clas in dic_connec and dic_connec[clas] in NtvConnector.connector(): 153 connec = NtvConnector.connector()[dic_connec[clas]] 154 if connec: 155 return connec.to_json_ntv(data, name, type_str) 156 raise NtvError( 157 'connector is not defined for the class : ', clas) 158 return (data, name, type_str) 159 160 @staticmethod 161 def uncast(value, name=None, type_str=None, **kwargs): 162 '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity 163 164 *Parameters* 165 166 - **data**: NtvSingle entity or NTVvalue of the NTV entity 167 - **name** : String (default None) - name of the NTV entity 168 - **type_str**: String (default None) - type of the NTV entity 169 ''' 170 if type_str == 'json': 171 return (value, name, type_str) 172 if value.__class__.__name__ == 'NtvSingle': 173 if not (type_str in set(NtvConnector.dic_type.values()) and 174 NtvConnector.is_json(value) or type_str is None): 175 return (value.ntv_value, value.name, value.type_str) 176 type_str = value.type_str if value.ntv_type else None 177 name = value.ntv_name 178 value = value.ntv_value 179 #dic_obj = NtvConnector.dic_obj 180 option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs 181 value_obj = NtvConnector._uncast_val(value, type_str, **option) 182 return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj)) 183 184 @staticmethod 185 def _typ_obj(value): 186 if isinstance(value, dict): 187 return NtvConnector._typ_obj(list(value.values())) 188 if isinstance(value, (tuple, list)): 189 for val in value: 190 typ = NtvConnector._typ_obj(val) 191 if typ: 192 return typ 193 return None 194 return NtvConnector.dic_type.get(value.__class__.__name__) 195 196 @staticmethod 197 def _uncast_val(value, type_n, **option): 198 '''return value from ntv value''' 199 dic_fct = NtvConnector.DIC_FCT 200 dic_geo = NtvConnector.DIC_GEO 201 dic_obj = NtvConnector.dic_obj | option['dicobj'] 202 dic_cbor = NtvConnector.DIC_CBOR 203 if not type_n or type_n == 'json' or (option['format'] == 'cbor' and 204 not dic_cbor.get(type_n, False)): 205 return value 206 if type_n in dic_fct: 207 if isinstance(value, (tuple, list)): 208 return [NtvConnector._uncast_val(val, type_n, **option) for val in value] 209 if isinstance(value, dict): 210 return {key: NtvConnector._uncast_val(val, type_n, **option) 211 for key, val in value.items()} 212 return dic_fct[type_n](value) 213 if type_n == 'array': 214 return tuple(value) 215 if type_n == 'ntv': 216 from json_ntv.ntv import Ntv 217 return Ntv.from_obj(value) 218 if type_n in dic_geo: 219 option['type_geo'] = dic_geo[type_n] 220 connec = None 221 if type_n in dic_obj and \ 222 dic_obj[type_n] in NtvConnector.connector(): 223 connec = NtvConnector.connector()[dic_obj[type_n]] 224 elif dic_obj['other'] in NtvConnector.connector(): 225 connec = NtvConnector.connector()['other'] 226 if connec: 227 return connec.to_obj_ntv(value, **option) 228 #raise NtvError('type of value not allowed for conversion') 229 return value 230 231 @staticmethod 232 def is_json_class(val): 233 ''' return True if val is a json type''' 234 return val is None or isinstance(val, (list, int, str, float, bool, dict)) 235 236 @staticmethod 237 def is_json(obj): 238 ''' check if obj is a json structure and return True if obj is a json-value 239 240 *Parameters* 241 242 - **obj** : object to check 243 - **ntvobj** : boolean (default False) - if True NTV class value are accepted''' 244 if obj is None: 245 return True 246 is_js = NtvConnector.is_json 247 match obj: 248 case str() | int() | float() | bool() as obj: 249 return True 250 case list() | tuple() as obj: 251 if not obj: 252 return True 253 return min(is_js(obj_in) for obj_in in obj) 254 case dict() as obj: 255 if not obj: 256 return True 257 if not min(isinstance(key, str) for key in obj.keys()): 258 raise NtvError('key in dict in not string') 259 return min(is_js(obj_in) for obj_in in obj.values()) 260 case _: 261 if not obj.__class__.__name__ in NtvConnector.castable: 262 raise NtvError(obj.__class__.__name__ + 263 'is not valid for NTV') 264 return False 265 266 @staticmethod 267 def keysfromderkeys(parentkeys, derkeys): 268 '''return keys from parent keys and derkeys 269 270 *Parameters* 271 272 - **parentkeys** : list of keys from parent 273 - **derkeys** : list of derived keys 274 275 *Returns* : list of keys''' 276 return [derkeys[pkey] for pkey in parentkeys] 277 278 @staticmethod 279 def encode_coef(lis): 280 '''Generate a repetition coefficient for periodic list''' 281 if len(lis) < 2: 282 return 0 283 coef = 1 284 while coef != len(lis): 285 if lis[coef-1] != lis[coef]: 286 break 287 coef += 1 288 if (not len(lis) % (coef * (max(lis) + 1)) and 289 lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))): 290 return coef 291 return 0 292 293 @staticmethod 294 def keysfromcoef(coef, period, leng=None): 295 ''' return a list of keys with periodic structure''' 296 if not leng: 297 leng = coef * period 298 return None if not (coef and period) else [(ind % (coef * period)) // coef 299 for ind in range(leng)] 300 @staticmethod 301 def init_ntv_keys(ind, lidx, leng): 302 ''' initialization of explicit keys data in lidx object of tabular data''' 303 # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6 304 name, typ, codec, parent, keys, coef, length = lidx[ind] 305 if (keys, parent, coef) == (None, None, None): # full or unique 306 if len(codec) == 1: # unique 307 lidx[ind][4] = [0] * leng 308 elif len(codec) == leng: # full 309 lidx[ind][4] = list(range(leng)) 310 else: 311 raise NtvError('impossible to generate keys') 312 return 313 if keys and len(keys) > 1 and parent is None: #complete 314 return 315 if coef: #primary 316 lidx[ind][4] = [(ikey % (coef * len(codec))) // coef for ikey in range(leng)] 317 lidx[ind][3] = None 318 return 319 if parent is None: 320 raise NtvError('keys not referenced') 321 if not lidx[parent][4] or len(lidx[parent][4]) != leng: 322 NtvConnector.init_ntv_keys(parent, lidx, leng) 323 if not keys and len(codec) == len(lidx[parent][2]): # implicit 324 lidx[ind][4] = lidx[parent][4] 325 lidx[ind][3] = None 326 return 327 lidx[ind][4] = NtvConnector.keysfromderkeys(lidx[parent][4], keys) # relative 328 #lidx[ind][4] = [keys[pkey] for pkey in lidx[parent][4]] # relative 329 lidx[ind][3] = None 330 return
The NtvConnector class is an abstract class used by all NTV connectors for conversion between NTV-JSON data and NTV-OBJ data.
A NtvConnector child is defined by:
- clas_obj: str - define the class name of the object to convert
- clas_typ: str - define the NTVtype of the converted object
- to_obj_ntv: method - converter from JsonNTV to the object
- to_json_ntv: method - converter from the object to JsonNTV
class method :
connector
dic_connec
castable
(@property)dic_obj
(@property)dic_type
(@property)
abstract method
static method
91 @classmethod 92 def connector(cls): 93 '''return a dict with the connectors: { class_connec_name: class_connec }''' 94 return {clas.__name__: clas for clas in cls.__subclasses__()}
return a dict with the connectors: { class_connec_name: class_connec }
96 @classmethod 97 def dic_connec(cls): 98 '''return a dict with the clas associated to the connector: 99 { class_obj_name: class_connec_name }''' 100 return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
return a dict with the clas associated to the connector: { class_obj_name: class_connec_name }
102 @staticmethod 103 @abstractmethod 104 def to_obj_ntv(ntv_value, **kwargs): 105 ''' abstract - convert ntv_value into the return object'''
abstract - convert ntv_value into the return object
107 @staticmethod 108 @abstractmethod 109 def to_json_ntv(value, name=None, typ=None): 110 ''' abstract - convert NTV object (value, name, type) into the NTV json 111 (json-value, name, type)'''
abstract - convert NTV object (value, name, type) into the NTV json (json-value, name, type)
113 @staticmethod 114 def cast(data, name=None, type_str=None): 115 '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity 116 defined in parameters. 117 118 *Parameters* 119 120 - **data**: NtvSingle entity or NTVvalue of the NTV entity 121 - **name** : String (default None) - name of the NTV entity 122 - **type_str**: String (default None) - type of the NTV entity 123 ''' 124 clas = data.__class__.__name__ 125 if clas == 'NtvSingle': 126 name = data.ntv_name 127 type_str = data.type_str 128 data = data.ntv_value 129 dic_geo_cl = NtvConnector.DIC_GEO_CL 130 dic_connec = NtvConnector.dic_connec() 131 match clas: 132 case 'int' | 'float' | 'bool' | 'str': 133 return (data, name, type_str) 134 case 'dict': 135 return ({key: NtvConnector.cast(val, name, type_str)[0] 136 for key, val in data.items()}, name, type_str) 137 case 'list': 138 return ([NtvConnector.cast(val, name, type_str)[0] for val in data], 139 name, NtvConnector._typ_obj(data) if not type_str else type_str) 140 case 'tuple': 141 return (list(data), name, 'array' if not type_str else type_str) 142 case 'date' | 'time' | 'datetime': 143 return (data.isoformat(), name, clas if not type_str else type_str) 144 case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \ 145 'Polygon' | 'MultiPolygon': 146 return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0], 147 name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str) 148 case 'NtvSingle' | 'NtvList': 149 return (data.to_obj(), name, 'ntv' if not type_str else type_str) 150 case _: 151 connec = None 152 if clas in dic_connec and dic_connec[clas] in NtvConnector.connector(): 153 connec = NtvConnector.connector()[dic_connec[clas]] 154 if connec: 155 return connec.to_json_ntv(data, name, type_str) 156 raise NtvError( 157 'connector is not defined for the class : ', clas) 158 return (data, name, type_str)
return JSON-NTV conversion (json_value, name, type_str) of the NTV entity defined in parameters.
Parameters
- data: NtvSingle entity or NTVvalue of the NTV entity
- name : String (default None) - name of the NTV entity
- type_str: String (default None) - type of the NTV entity
160 @staticmethod 161 def uncast(value, name=None, type_str=None, **kwargs): 162 '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity 163 164 *Parameters* 165 166 - **data**: NtvSingle entity or NTVvalue of the NTV entity 167 - **name** : String (default None) - name of the NTV entity 168 - **type_str**: String (default None) - type of the NTV entity 169 ''' 170 if type_str == 'json': 171 return (value, name, type_str) 172 if value.__class__.__name__ == 'NtvSingle': 173 if not (type_str in set(NtvConnector.dic_type.values()) and 174 NtvConnector.is_json(value) or type_str is None): 175 return (value.ntv_value, value.name, value.type_str) 176 type_str = value.type_str if value.ntv_type else None 177 name = value.ntv_name 178 value = value.ntv_value 179 #dic_obj = NtvConnector.dic_obj 180 option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs 181 value_obj = NtvConnector._uncast_val(value, type_str, **option) 182 return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
Parameters
- data: NtvSingle entity or NTVvalue of the NTV entity
- name : String (default None) - name of the NTV entity
- type_str: String (default None) - type of the NTV entity
231 @staticmethod 232 def is_json_class(val): 233 ''' return True if val is a json type''' 234 return val is None or isinstance(val, (list, int, str, float, bool, dict))
return True if val is a json type
236 @staticmethod 237 def is_json(obj): 238 ''' check if obj is a json structure and return True if obj is a json-value 239 240 *Parameters* 241 242 - **obj** : object to check 243 - **ntvobj** : boolean (default False) - if True NTV class value are accepted''' 244 if obj is None: 245 return True 246 is_js = NtvConnector.is_json 247 match obj: 248 case str() | int() | float() | bool() as obj: 249 return True 250 case list() | tuple() as obj: 251 if not obj: 252 return True 253 return min(is_js(obj_in) for obj_in in obj) 254 case dict() as obj: 255 if not obj: 256 return True 257 if not min(isinstance(key, str) for key in obj.keys()): 258 raise NtvError('key in dict in not string') 259 return min(is_js(obj_in) for obj_in in obj.values()) 260 case _: 261 if not obj.__class__.__name__ in NtvConnector.castable: 262 raise NtvError(obj.__class__.__name__ + 263 'is not valid for NTV') 264 return False
check if obj is a json structure and return True if obj is a json-value
Parameters
- obj : object to check
- ntvobj : boolean (default False) - if True NTV class value are accepted
266 @staticmethod 267 def keysfromderkeys(parentkeys, derkeys): 268 '''return keys from parent keys and derkeys 269 270 *Parameters* 271 272 - **parentkeys** : list of keys from parent 273 - **derkeys** : list of derived keys 274 275 *Returns* : list of keys''' 276 return [derkeys[pkey] for pkey in parentkeys]
return keys from parent keys and derkeys
Parameters
- parentkeys : list of keys from parent
- derkeys : list of derived keys
Returns : list of keys
278 @staticmethod 279 def encode_coef(lis): 280 '''Generate a repetition coefficient for periodic list''' 281 if len(lis) < 2: 282 return 0 283 coef = 1 284 while coef != len(lis): 285 if lis[coef-1] != lis[coef]: 286 break 287 coef += 1 288 if (not len(lis) % (coef * (max(lis) + 1)) and 289 lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))): 290 return coef 291 return 0
Generate a repetition coefficient for periodic list
293 @staticmethod 294 def keysfromcoef(coef, period, leng=None): 295 ''' return a list of keys with periodic structure''' 296 if not leng: 297 leng = coef * period 298 return None if not (coef and period) else [(ind % (coef * period)) // coef 299 for ind in range(leng)]
return a list of keys with periodic structure
300 @staticmethod 301 def init_ntv_keys(ind, lidx, leng): 302 ''' initialization of explicit keys data in lidx object of tabular data''' 303 # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6 304 name, typ, codec, parent, keys, coef, length = lidx[ind] 305 if (keys, parent, coef) == (None, None, None): # full or unique 306 if len(codec) == 1: # unique 307 lidx[ind][4] = [0] * leng 308 elif len(codec) == leng: # full 309 lidx[ind][4] = list(range(leng)) 310 else: 311 raise NtvError('impossible to generate keys') 312 return 313 if keys and len(keys) > 1 and parent is None: #complete 314 return 315 if coef: #primary 316 lidx[ind][4] = [(ikey % (coef * len(codec))) // coef for ikey in range(leng)] 317 lidx[ind][3] = None 318 return 319 if parent is None: 320 raise NtvError('keys not referenced') 321 if not lidx[parent][4] or len(lidx[parent][4]) != leng: 322 NtvConnector.init_ntv_keys(parent, lidx, leng) 323 if not keys and len(codec) == len(lidx[parent][2]): # implicit 324 lidx[ind][4] = lidx[parent][4] 325 lidx[ind][3] = None 326 return 327 lidx[ind][4] = NtvConnector.keysfromderkeys(lidx[parent][4], keys) # relative 328 #lidx[ind][4] = [keys[pkey] for pkey in lidx[parent][4]] # relative 329 lidx[ind][3] = None 330 return
initialization of explicit keys data in lidx object of tabular data
332class NtvTree: 333 ''' The NtvTree class is an iterator class used to traverse a NTV tree structure. 334 Some other methods give tree indicators and data. 335 336 *Attributes :* 337 338 - **ntv** : Ntv entity 339 - **_node**: Ntv entity - node pointer 340 - **_stack**: list - stack used to store context in recursive methods 341 342 *dynamic values (@property)* 343 - `breadth` 344 - `size` 345 - `height` 346 - `adjacency_list` 347 - `nodes` 348 - `leaf_nodes` 349 - `inner_nodes` 350 ''' 351 352 def __init__(self, ntv): 353 ''' the parameter of the constructor is the Ntv entity''' 354 self._ntv = ntv 355 self._node = None 356 self._stack = [] 357 358 def __iter__(self): 359 ''' iterator without initialization''' 360 return self 361 362 def __next__(self): 363 ''' return next node in the tree''' 364 if self._node is None: 365 self._node = self._ntv 366 elif len(self._node) == 0: 367 raise StopIteration 368 elif self._node.__class__.__name__ == 'NtvList': 369 self._next_down() 370 else: 371 self._next_up() 372 return self._node 373 374 def _next_down(self): 375 ''' find the next subchild node''' 376 377 self._node = self._node[0] 378 self._stack.append(0) 379 380 def _next_up(self): 381 ''' find the next sibling or ancestor node''' 382 parent = self._node.parent 383 if not parent or self._node == self._ntv: 384 raise StopIteration 385 ind = self._stack[-1] 386 if ind < len(parent) - 1: # if ind is not the last 387 self._node = parent[ind + 1] 388 self._stack[-1] += 1 389 else: 390 if parent == self._ntv: 391 raise StopIteration 392 self._node = parent 393 self._stack.pop() 394 self._next_up() 395 396 @property 397 def breadth(self): 398 ''' return the number of leaves''' 399 return len(self.leaf_nodes) 400 401 @property 402 def size(self): 403 ''' return the number of nodes''' 404 return len(self.nodes) 405 406 @property 407 def height(self): 408 ''' return the height of the tree''' 409 return max(len(node.pointer()) for node in self.__class__(self._ntv)) 410 411 @property 412 def adjacency_list(self): 413 ''' return a dict with the list of child nodes for each parent node''' 414 return {node: node.val for node in self.inner_nodes} 415 416 @property 417 def nodes(self): 418 ''' return the list of nodes according to the DFS preordering algorithm''' 419 return list(self.__class__(self._ntv)) 420 421 @property 422 def dic_nodes(self): 423 ''' return a dict of nodes according to the DFS preordering algorithm''' 424 return {node.ntv_name: node for node in self.__class__(self._ntv) 425 if node.ntv_name} 426 427 @property 428 def leaf_nodes(self): 429 ''' return the list of leaf nodes according to the DFS preordering algorithm''' 430 #return [node for node in self.__class__(self._ntv) if not isinstance(node, NtvList)] 431 return [node for node in self.__class__(self._ntv) 432 if node.__class__.__name__ == 'NtvSingle'] 433 434 @property 435 def inner_nodes(self): 436 ''' return the list of inner nodes according to the DFS preordering algorithm''' 437 return [node for node in self.__class__(self._ntv) 438 if node.__class__.__name__ == 'NtvList']
The NtvTree class is an iterator class used to traverse a NTV tree structure. Some other methods give tree indicators and data.
Attributes :
- ntv : Ntv entity
- _node: Ntv entity - node pointer
- _stack: list - stack used to store context in recursive methods
dynamic values (@property)
443class NtvJsonEncoder(json.JSONEncoder): 444 """json encoder for Ntv data""" 445 446 def default(self, o): 447 try: 448 return NtvConnector.cast(o)[0] 449 except NtvError: 450 return json.JSONEncoder.default(self, o) 451 if isinstance(o, (datetime.datetime, datetime.date, datetime.time)): 452 return o.isoformat() 453 return json.JSONEncoder.default(self, o)
json encoder for Ntv data
446 def default(self, o): 447 try: 448 return NtvConnector.cast(o)[0] 449 except NtvError: 450 return json.JSONEncoder.default(self, o) 451 if isinstance(o, (datetime.datetime, datetime.date, datetime.time)): 452 return o.isoformat() 453 return json.JSONEncoder.default(self, o)
Implement this method in a subclass such that it returns
a serializable object for o
, or calls the base implementation
(to raise a TypeError
).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# Let the base class default method raise the TypeError
return JSONEncoder.default(self, o)
Inherited Members
- json.encoder.JSONEncoder
- JSONEncoder
- encode
- iterencode
NTV Exception
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback