NTV.json_ntv.ntv_util

Created on Feb 27 2023

@author: Philippe@loco-labs.io

The ntv_util module is part of the NTV.json_ntv package (specification document).

It contains the classes NtvConnector, NtvTree, NtvJsonEncoder and NtvError for NTV entities.

  1# -*- coding: utf-8 -*-
  2"""
  3Created on Feb 27 2023
  4
  5@author: Philippe@loco-labs.io
  6
  7The `ntv_util` module is part of the `NTV.json_ntv` package ([specification document](
  8https://github.com/loco-philippe/NTV/blob/main/documentation/JSON-NTV-standard.pdf)).
  9
 10It contains the classes `NtvConnector`, `NtvTree`, `NtvJsonEncoder` and `NtvError`
 11for NTV entities.
 12"""
 13from abc import ABC, abstractmethod
 14import datetime
 15import json
 16
 17
 18class NtvConnector(ABC):
 19    ''' The NtvConnector class is an abstract class used by all NTV connectors
 20    for conversion between NTV-JSON data and NTV-OBJ data.
 21
 22    A NtvConnector child is defined by:
 23    - clas_obj: str - define the class name of the object to convert
 24    - clas_typ: str - define the NTVtype of the converted object
 25    - to_obj_ntv: method - converter from JsonNTV to the object
 26    - to_json_ntv: method - converter from the object to JsonNTV
 27    
 28    *class method :*
 29    - `connector`
 30    - `dic_connec`
 31    - `castable` (@property)
 32    - `dic_obj` (@property)
 33    - `dic_type` (@property)
 34
 35    *abstract method*
 36    - `to_obj_ntv`
 37    - `to_json_ntv`
 38
 39    *static method*
 40    - `cast`
 41    - `uncast`
 42    - `is_json_class`
 43    - `is_json`
 44    - `init_ntv_keys`
 45    '''
 46
 47    DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'}
 48    DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line',
 49                  'MultiLineString': 'multiline', 'Polygon': 'polygon',
 50                  'MultiPolygon': 'multipolygon'}
 51    DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'}
 52    DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat,
 53               'datetime': datetime.datetime.fromisoformat}
 54    DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring',
 55               'multiline': 'multilinestring', 'polygon': 'polygon',
 56               'multipolygon': 'multipolygon'}
 57    DIC_CBOR = {'point': False, 'multipoint': False, 'line': False,
 58                'multiline': False, 'polygon': False, 'multipolygon': False,
 59                'date': True, 'time': False, 'datetime': True}
 60    DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec',
 61               'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec',
 62               'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec',
 63               'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec',
 64               'other': None}
 65
 66    @classmethod
 67    @property
 68    def castable(cls):
 69        '''return a list with class_name allowed for json-obj conversion'''
 70        return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType',
 71                'NtvSingle', 'NtvList'] \
 72            + list(NtvConnector.DIC_GEO_CL.keys()) \
 73            + list(NtvConnector.DIC_FCT.keys()) \
 74            + list(NtvConnector.dic_connec().keys())
 75
 76    @classmethod
 77    @property
 78    def dic_obj(cls):
 79        '''return a dict with the connectors: { type: class_connec_name }'''
 80        return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\
 81            NtvConnector.DIC_OBJ
 82
 83    @classmethod
 84    @property
 85    def dic_type(cls):
 86        '''return a dict with the connectors: { class_obj_name: type }'''
 87        return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\
 88            NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL
 89
 90    @classmethod
 91    def connector(cls):
 92        '''return a dict with the connectors: { class_connec_name: class_connec }'''
 93        return {clas.__name__: clas for clas in cls.__subclasses__()}
 94
 95    @classmethod
 96    def dic_connec(cls):
 97        '''return a dict with the clas associated to the connector:
 98        { class_obj_name: class_connec_name }'''
 99        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
100
101    @staticmethod
102    @abstractmethod
103    def to_obj_ntv(ntv_value, **kwargs):
104        ''' abstract - convert ntv_value into the return object'''
105
106    @staticmethod
107    @abstractmethod
108    def to_json_ntv(value, name=None, typ=None):
109        ''' abstract - convert NTV object (value, name, type) into the NTV json
110        (json-value, name, type)'''
111
112    @staticmethod
113    def cast(data, name=None, type_str=None):
114        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
115        defined in parameters.
116
117        *Parameters*
118
119        - **data**: NtvSingle entity or NTVvalue of the NTV entity
120        - **name** : String (default None) - name of the NTV entity
121        - **type_str**: String (default None) - type of the NTV entity
122        '''
123        clas = data.__class__.__name__
124        if clas == 'NtvSingle':
125            name = data.ntv_name
126            type_str = data.type_str
127            data = data.ntv_value
128        dic_geo_cl = NtvConnector.DIC_GEO_CL
129        dic_connec = NtvConnector.dic_connec()
130        match clas:
131            case 'int' | 'float' | 'bool' | 'str':
132                return (data, name, type_str)
133            case 'dict':
134                return ({key: NtvConnector.cast(val, name, type_str)[0]
135                         for key, val in data.items()}, name, type_str)
136            case 'list':
137                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
138                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
139            case 'tuple':
140                return (list(data), name, 'array' if not type_str else type_str)
141            case 'date' | 'time' | 'datetime':
142                return (data.isoformat(), name, clas if not type_str else type_str)
143            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
144                    'Polygon' | 'MultiPolygon':
145                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
146                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
147            case 'NtvSingle' | 'NtvList':
148                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
149            case _:
150                connec = None
151                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
152                    connec = NtvConnector.connector()[dic_connec[clas]]
153                if connec:
154                    return connec.to_json_ntv(data, name, type_str)
155                raise NtvError(
156                    'connector is not defined for the class : ', clas)
157        return (data, name, type_str)
158
159    @staticmethod
160    def uncast(value, name=None, type_str=None, **kwargs):
161        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
162
163        *Parameters*
164
165        - **data**: NtvSingle entity or NTVvalue of the NTV entity
166        - **name** : String (default None) - name of the NTV entity
167        - **type_str**: String (default None) - type of the NTV entity
168        '''
169        if type_str == 'json':
170            return (value, name, type_str)
171        if value.__class__.__name__ == 'NtvSingle':
172            if not (type_str in set(NtvConnector.dic_type.values()) and 
173                    NtvConnector.is_json(value) or type_str is None):
174                return (value.ntv_value, value.name, value.type_str)
175            type_str = value.type_str if value.ntv_type else None
176            name = value.ntv_name
177            value = value.ntv_value
178        #dic_obj = NtvConnector.dic_obj
179        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
180        value_obj = NtvConnector._uncast_val(value, type_str, **option)
181        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
182
183    @staticmethod
184    def _typ_obj(value):
185        if isinstance(value, dict):
186            return NtvConnector._typ_obj(list(value.values()))
187        if isinstance(value, (tuple, list)):
188            for val in value:
189                typ = NtvConnector._typ_obj(val)
190                if typ:
191                    return typ
192            return None
193        return NtvConnector.dic_type.get(value.__class__.__name__)
194
195    @staticmethod
196    def _uncast_val(value, type_n, **option):
197        '''return value from ntv value'''
198        dic_fct = NtvConnector.DIC_FCT
199        dic_geo = NtvConnector.DIC_GEO
200        dic_obj = NtvConnector.dic_obj | option['dicobj']
201        dic_cbor = NtvConnector.DIC_CBOR
202        if not type_n or type_n == 'json' or (option['format'] == 'cbor' and 
203                                              not dic_cbor.get(type_n, False)):
204            return value
205        if type_n in dic_fct:
206            if isinstance(value, (tuple, list)):
207                return [NtvConnector._uncast_val(val, type_n, **option) for val in value]
208            if isinstance(value, dict):
209                return {key: NtvConnector._uncast_val(val, type_n, **option)
210                        for key, val in value.items()}
211            return dic_fct[type_n](value)
212        if type_n == 'array':
213            return tuple(value)
214        if type_n == 'ntv':
215            from json_ntv.ntv import Ntv
216            return Ntv.from_obj(value)
217        if type_n in dic_geo:
218            option['type_geo'] = dic_geo[type_n]
219        connec = None
220        if type_n in dic_obj and \
221                dic_obj[type_n] in NtvConnector.connector():
222            connec = NtvConnector.connector()[dic_obj[type_n]]
223        elif dic_obj['other'] in NtvConnector.connector():
224            connec = NtvConnector.connector()['other']
225        if connec:
226            return connec.to_obj_ntv(value, **option)
227        #raise NtvError('type of value not allowed for conversion')
228        return value
229
230    @staticmethod
231    def is_json_class(val):
232        ''' return True if val is a json type'''
233        return val is None or isinstance(val, (list, int, str, float, bool, dict))
234
235    @staticmethod
236    def is_json(obj):
237        ''' check if obj is a json structure and return True if obj is a json-value
238
239        *Parameters*
240
241        - **obj** : object to check
242        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
243        if obj is None:
244            return True
245        is_js = NtvConnector.is_json
246        match obj:
247            case str() | int() | float() | bool() as obj:
248                return True
249            case list() | tuple() as obj:
250                if not obj:
251                    return True
252                return min(is_js(obj_in) for obj_in in obj)
253            case dict() as obj:
254                if not obj:
255                    return True
256                if not min(isinstance(key, str) for key in obj.keys()):
257                    raise NtvError('key in dict in not string')
258                return min(is_js(obj_in) for obj_in in obj.values())
259            case _:
260                if not obj.__class__.__name__ in NtvConnector.castable:
261                    raise NtvError(obj.__class__.__name__ +
262                                   'is not valid for NTV')
263                return False
264
265    @staticmethod
266    def keysfromderkeys(parentkeys, derkeys):
267        '''return keys from parent keys and derkeys
268
269        *Parameters*
270
271        - **parentkeys** : list of keys from parent
272        - **derkeys** : list of derived keys
273
274        *Returns* : list of keys'''
275        return [derkeys[pkey] for pkey in parentkeys]
276
277    @staticmethod 
278    def encode_coef(lis):
279        '''Generate a repetition coefficient for periodic list'''
280        if len(lis) < 2:
281            return 0
282        coef = 1
283        while coef != len(lis):
284            if lis[coef-1] != lis[coef]:
285                break
286            coef += 1
287        if (not len(lis) % (coef * (max(lis) + 1)) and 
288            lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
289            return coef
290        return 0
291    
292    @staticmethod 
293    def keysfromcoef(coef, period, leng=None):
294        ''' return a list of keys with periodic structure'''
295        if not leng:
296            leng = coef * period
297        return None if not (coef and period) else [(ind % (coef * period)) // coef 
298                                                   for ind in range(leng)]    
299    @staticmethod
300    def init_ntv_keys(ind, lidx, leng):
301        ''' initialization of explicit keys data in lidx object of tabular data'''
302        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
303        name, typ, codec, parent, keys, coef, length = lidx[ind]
304        if (keys, parent, coef) == (None, None, None):  # full or unique
305            if len(codec) == 1: # unique
306                lidx[ind][4] = [0] * leng
307            elif len(codec) == leng:    # full
308                lidx[ind][4] = list(range(leng))
309            else:
310                raise NtvError('impossible to generate keys')
311            return
312        if keys and len(keys) > 1 and parent is None:  #complete
313            return
314        if coef:  #primary
315            lidx[ind][4] = [(ikey % (coef * len(codec))) // coef for ikey in range(leng)]
316            lidx[ind][3] = None
317            return  
318        if parent is None:
319            raise NtvError('keys not referenced')          
320        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
321            NtvConnector.init_ntv_keys(parent, lidx, leng)
322        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
323            lidx[ind][4] = lidx[parent][4]
324            lidx[ind][3] = None
325            return
326        lidx[ind][4] = NtvConnector.keysfromderkeys(lidx[parent][4], keys)  # relative
327        #lidx[ind][4] = [keys[pkey] for pkey in lidx[parent][4]]  # relative
328        lidx[ind][3] = None
329        return    
330    
class NtvTree:
    ''' The NtvTree class is an iterator class used to traverse a NTV tree structure.
    Some other methods give tree indicators and data.

    *Attributes :*

    - **_ntv** : Ntv entity - root of the tree
    - **_node**:  Ntv entity - node pointer
    - **_stack**:  list - index of the current node in each traversed level

    *dynamic values (@property)*
    - `breadth`
    - `size`
    - `height`
    - `adjacency_list`
    - `nodes`
    - `dic_nodes`
    - `leaf_nodes`
    - `inner_nodes`
    '''

    def __init__(self, ntv):
        ''' the parameter of the constructor is the Ntv entity'''
        self._ntv = ntv
        self._node = None
        self._stack = []

    def __iter__(self):
        ''' iterator without initialization'''
        return self

    def __next__(self):
        ''' return next node in the tree'''
        current = self._node
        if current is None:
            # first call: the traversal starts with the root
            self._node = self._ntv
            return self._node
        if len(current) == 0:
            raise StopIteration
        if current.__class__.__name__ == 'NtvList':
            self._next_down()
        else:
            self._next_up()
        return self._node

    def _next_down(self):
        ''' find the next subchild node'''
        self._stack.append(0)
        self._node = self._node[0]

    def _next_up(self):
        ''' find the next sibling or ancestor node'''
        while True:
            parent = self._node.parent
            if not parent or self._node == self._ntv:
                raise StopIteration
            rank = self._stack[-1]
            if rank < len(parent) - 1:  # a next sibling is available
                self._stack[-1] = rank + 1
                self._node = parent[rank + 1]
                return
            if parent == self._ntv:
                raise StopIteration
            # last child: same search from the parent level
            self._node = parent
            self._stack.pop()

    @property
    def breadth(self):
        ''' return the number of leaves'''
        leaves = self.leaf_nodes
        return len(leaves)

    @property
    def size(self):
        ''' return the number of nodes'''
        all_nodes = self.nodes
        return len(all_nodes)

    @property
    def height(self):
        ''' return the height of the tree'''
        walker = self.__class__(self._ntv)
        depths = [len(node.pointer()) for node in walker]
        return max(depths)

    @property
    def adjacency_list(self):
        ''' return a dict with the list of child nodes for each parent node'''
        adjacency = {}
        for inner in self.inner_nodes:
            adjacency[inner] = inner.val
        return adjacency

    @property
    def nodes(self):
        ''' return the list of nodes according to the DFS preordering algorithm'''
        walker = self.__class__(self._ntv)
        return list(walker)

    @property
    def dic_nodes(self):
        ''' return a dict of named nodes according to the DFS preordering algorithm'''
        named = {}
        for node in self.__class__(self._ntv):
            if node.ntv_name:
                named[node.ntv_name] = node
        return named

    @property
    def leaf_nodes(self):
        ''' return the list of leaf nodes according to the DFS preordering algorithm'''
        walker = self.__class__(self._ntv)
        return [node for node in walker
                if node.__class__.__name__ == 'NtvSingle']

    @property
    def inner_nodes(self):
        ''' return the list of inner nodes according to the DFS preordering algorithm'''
        walker = self.__class__(self._ntv)
        return [node for node in walker
                if node.__class__.__name__ == 'NtvList']
438
439
440
441
class NtvJsonEncoder(json.JSONEncoder):
    """json encoder for Ntv data"""

    def default(self, o):
        """return the json-value given by `NtvConnector.cast` for the object `o`.

        If `NtvConnector.cast` raises NtvError, the object is handed over to the
        default `json.JSONEncoder` behaviour (which raises TypeError)."""
        try:
            # date, time and datetime objects are converted by cast (isoformat)
            return NtvConnector.cast(o)[0]
        except NtvError:
            return super().default(o)
453
454
class NtvError(Exception):
    '''Exception dedicated to NTV entities and NTV conversions'''
class NtvConnector(abc.ABC):
 19class NtvConnector(ABC):
 20    ''' The NtvConnector class is an abstract class used by all NTV connectors
 21    for conversion between NTV-JSON data and NTV-OBJ data.
 22
 23    A NtvConnector child is defined by:
 24    - clas_obj: str - define the class name of the object to convert
 25    - clas_typ: str - define the NTVtype of the converted object
 26    - to_obj_ntv: method - converter from JsonNTV to the object
 27    - to_json_ntv: method - converter from the object to JsonNTV
 28    
 29    *class method :*
 30    - `connector`
 31    - `dic_connec`
 32    - `castable` (@property)
 33    - `dic_obj` (@property)
 34    - `dic_type` (@property)
 35
 36    *abstract method*
 37    - `to_obj_ntv`
 38    - `to_json_ntv`
 39
 40    *static method*
 41    - `cast`
 42    - `uncast`
 43    - `is_json_class`
 44    - `is_json`
 45    - `init_ntv_keys`
 46    '''
 47
 48    DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'}
 49    DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line',
 50                  'MultiLineString': 'multiline', 'Polygon': 'polygon',
 51                  'MultiPolygon': 'multipolygon'}
 52    DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'}
 53    DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat,
 54               'datetime': datetime.datetime.fromisoformat}
 55    DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring',
 56               'multiline': 'multilinestring', 'polygon': 'polygon',
 57               'multipolygon': 'multipolygon'}
 58    DIC_CBOR = {'point': False, 'multipoint': False, 'line': False,
 59                'multiline': False, 'polygon': False, 'multipolygon': False,
 60                'date': True, 'time': False, 'datetime': True}
 61    DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec',
 62               'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec',
 63               'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec',
 64               'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec',
 65               'other': None}
 66
 67    @classmethod
 68    @property
 69    def castable(cls):
 70        '''return a list with class_name allowed for json-obj conversion'''
 71        return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType',
 72                'NtvSingle', 'NtvList'] \
 73            + list(NtvConnector.DIC_GEO_CL.keys()) \
 74            + list(NtvConnector.DIC_FCT.keys()) \
 75            + list(NtvConnector.dic_connec().keys())
 76
 77    @classmethod
 78    @property
 79    def dic_obj(cls):
 80        '''return a dict with the connectors: { type: class_connec_name }'''
 81        return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\
 82            NtvConnector.DIC_OBJ
 83
 84    @classmethod
 85    @property
 86    def dic_type(cls):
 87        '''return a dict with the connectors: { class_obj_name: type }'''
 88        return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\
 89            NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL
 90
 91    @classmethod
 92    def connector(cls):
 93        '''return a dict with the connectors: { class_connec_name: class_connec }'''
 94        return {clas.__name__: clas for clas in cls.__subclasses__()}
 95
 96    @classmethod
 97    def dic_connec(cls):
 98        '''return a dict with the clas associated to the connector:
 99        { class_obj_name: class_connec_name }'''
100        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
101
102    @staticmethod
103    @abstractmethod
104    def to_obj_ntv(ntv_value, **kwargs):
105        ''' abstract - convert ntv_value into the return object'''
106
107    @staticmethod
108    @abstractmethod
109    def to_json_ntv(value, name=None, typ=None):
110        ''' abstract - convert NTV object (value, name, type) into the NTV json
111        (json-value, name, type)'''
112
113    @staticmethod
114    def cast(data, name=None, type_str=None):
115        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
116        defined in parameters.
117
118        *Parameters*
119
120        - **data**: NtvSingle entity or NTVvalue of the NTV entity
121        - **name** : String (default None) - name of the NTV entity
122        - **type_str**: String (default None) - type of the NTV entity
123        '''
124        clas = data.__class__.__name__
125        if clas == 'NtvSingle':
126            name = data.ntv_name
127            type_str = data.type_str
128            data = data.ntv_value
129        dic_geo_cl = NtvConnector.DIC_GEO_CL
130        dic_connec = NtvConnector.dic_connec()
131        match clas:
132            case 'int' | 'float' | 'bool' | 'str':
133                return (data, name, type_str)
134            case 'dict':
135                return ({key: NtvConnector.cast(val, name, type_str)[0]
136                         for key, val in data.items()}, name, type_str)
137            case 'list':
138                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
139                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
140            case 'tuple':
141                return (list(data), name, 'array' if not type_str else type_str)
142            case 'date' | 'time' | 'datetime':
143                return (data.isoformat(), name, clas if not type_str else type_str)
144            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
145                    'Polygon' | 'MultiPolygon':
146                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
147                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
148            case 'NtvSingle' | 'NtvList':
149                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
150            case _:
151                connec = None
152                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
153                    connec = NtvConnector.connector()[dic_connec[clas]]
154                if connec:
155                    return connec.to_json_ntv(data, name, type_str)
156                raise NtvError(
157                    'connector is not defined for the class : ', clas)
158        return (data, name, type_str)
159
160    @staticmethod
161    def uncast(value, name=None, type_str=None, **kwargs):
162        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
163
164        *Parameters*
165
166        - **data**: NtvSingle entity or NTVvalue of the NTV entity
167        - **name** : String (default None) - name of the NTV entity
168        - **type_str**: String (default None) - type of the NTV entity
169        '''
170        if type_str == 'json':
171            return (value, name, type_str)
172        if value.__class__.__name__ == 'NtvSingle':
173            if not (type_str in set(NtvConnector.dic_type.values()) and 
174                    NtvConnector.is_json(value) or type_str is None):
175                return (value.ntv_value, value.name, value.type_str)
176            type_str = value.type_str if value.ntv_type else None
177            name = value.ntv_name
178            value = value.ntv_value
179        #dic_obj = NtvConnector.dic_obj
180        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
181        value_obj = NtvConnector._uncast_val(value, type_str, **option)
182        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
183
184    @staticmethod
185    def _typ_obj(value):
186        if isinstance(value, dict):
187            return NtvConnector._typ_obj(list(value.values()))
188        if isinstance(value, (tuple, list)):
189            for val in value:
190                typ = NtvConnector._typ_obj(val)
191                if typ:
192                    return typ
193            return None
194        return NtvConnector.dic_type.get(value.__class__.__name__)
195
196    @staticmethod
197    def _uncast_val(value, type_n, **option):
198        '''return value from ntv value'''
199        dic_fct = NtvConnector.DIC_FCT
200        dic_geo = NtvConnector.DIC_GEO
201        dic_obj = NtvConnector.dic_obj | option['dicobj']
202        dic_cbor = NtvConnector.DIC_CBOR
203        if not type_n or type_n == 'json' or (option['format'] == 'cbor' and 
204                                              not dic_cbor.get(type_n, False)):
205            return value
206        if type_n in dic_fct:
207            if isinstance(value, (tuple, list)):
208                return [NtvConnector._uncast_val(val, type_n, **option) for val in value]
209            if isinstance(value, dict):
210                return {key: NtvConnector._uncast_val(val, type_n, **option)
211                        for key, val in value.items()}
212            return dic_fct[type_n](value)
213        if type_n == 'array':
214            return tuple(value)
215        if type_n == 'ntv':
216            from json_ntv.ntv import Ntv
217            return Ntv.from_obj(value)
218        if type_n in dic_geo:
219            option['type_geo'] = dic_geo[type_n]
220        connec = None
221        if type_n in dic_obj and \
222                dic_obj[type_n] in NtvConnector.connector():
223            connec = NtvConnector.connector()[dic_obj[type_n]]
224        elif dic_obj['other'] in NtvConnector.connector():
225            connec = NtvConnector.connector()['other']
226        if connec:
227            return connec.to_obj_ntv(value, **option)
228        #raise NtvError('type of value not allowed for conversion')
229        return value
230
231    @staticmethod
232    def is_json_class(val):
233        ''' return True if val is a json type'''
234        return val is None or isinstance(val, (list, int, str, float, bool, dict))
235
236    @staticmethod
237    def is_json(obj):
238        ''' check if obj is a json structure and return True if obj is a json-value
239
240        *Parameters*
241
242        - **obj** : object to check
243        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
244        if obj is None:
245            return True
246        is_js = NtvConnector.is_json
247        match obj:
248            case str() | int() | float() | bool() as obj:
249                return True
250            case list() | tuple() as obj:
251                if not obj:
252                    return True
253                return min(is_js(obj_in) for obj_in in obj)
254            case dict() as obj:
255                if not obj:
256                    return True
257                if not min(isinstance(key, str) for key in obj.keys()):
258                    raise NtvError('key in dict in not string')
259                return min(is_js(obj_in) for obj_in in obj.values())
260            case _:
261                if not obj.__class__.__name__ in NtvConnector.castable:
262                    raise NtvError(obj.__class__.__name__ +
263                                   'is not valid for NTV')
264                return False
265
266    @staticmethod
267    def keysfromderkeys(parentkeys, derkeys):
268        '''return keys from parent keys and derkeys
269
270        *Parameters*
271
272        - **parentkeys** : list of keys from parent
273        - **derkeys** : list of derived keys
274
275        *Returns* : list of keys'''
276        return [derkeys[pkey] for pkey in parentkeys]
277
278    @staticmethod 
279    def encode_coef(lis):
280        '''Generate a repetition coefficient for periodic list'''
281        if len(lis) < 2:
282            return 0
283        coef = 1
284        while coef != len(lis):
285            if lis[coef-1] != lis[coef]:
286                break
287            coef += 1
288        if (not len(lis) % (coef * (max(lis) + 1)) and 
289            lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
290            return coef
291        return 0
292    
293    @staticmethod 
294    def keysfromcoef(coef, period, leng=None):
295        ''' return a list of keys with periodic structure'''
296        if not leng:
297            leng = coef * period
298        return None if not (coef and period) else [(ind % (coef * period)) // coef 
299                                                   for ind in range(leng)]    
300    @staticmethod
301    def init_ntv_keys(ind, lidx, leng):
302        ''' initialization of explicit keys data in lidx object of tabular data'''
303        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
304        name, typ, codec, parent, keys, coef, length = lidx[ind]
305        if (keys, parent, coef) == (None, None, None):  # full or unique
306            if len(codec) == 1: # unique
307                lidx[ind][4] = [0] * leng
308            elif len(codec) == leng:    # full
309                lidx[ind][4] = list(range(leng))
310            else:
311                raise NtvError('impossible to generate keys')
312            return
313        if keys and len(keys) > 1 and parent is None:  #complete
314            return
315        if coef:  #primary
316            lidx[ind][4] = [(ikey % (coef * len(codec))) // coef for ikey in range(leng)]
317            lidx[ind][3] = None
318            return  
319        if parent is None:
320            raise NtvError('keys not referenced')          
321        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
322            NtvConnector.init_ntv_keys(parent, lidx, leng)
323        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
324            lidx[ind][4] = lidx[parent][4]
325            lidx[ind][3] = None
326            return
327        lidx[ind][4] = NtvConnector.keysfromderkeys(lidx[parent][4], keys)  # relative
328        #lidx[ind][4] = [keys[pkey] for pkey in lidx[parent][4]]  # relative
329        lidx[ind][3] = None
330        return    

The NtvConnector class is an abstract class used by all NTV connectors for conversion between NTV-JSON data and NTV-OBJ data.

An NtvConnector subclass is defined by:

  • clas_obj: str - defines the class name of the object to convert
  • clas_typ: str - defines the NTV type of the converted object
  • to_obj_ntv: method - converter from JsonNTV to the object
  • to_json_ntv: method - converter from the object to JsonNTV

class method :

abstract method

static method

castable

return a list with class_name allowed for json-obj conversion

dic_obj

return a dict with the connectors: { type: class_connec_name }

dic_type

return a dict with the connectors: { class_obj_name: type }

@classmethod
def connector(cls):
91    @classmethod
92    def connector(cls):
93        '''return a dict with the connectors: { class_connec_name: class_connec }'''
94        return {clas.__name__: clas for clas in cls.__subclasses__()}

return a dict with the connectors: { class_connec_name: class_connec }

@classmethod
def dic_connec(cls):
 96    @classmethod
 97    def dic_connec(cls):
 98        '''return a dict with the clas associated to the connector:
 99        { class_obj_name: class_connec_name }'''
100        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}

return a dict with the clas associated to the connector: { class_obj_name: class_connec_name }

@staticmethod
@abstractmethod
def to_obj_ntv(ntv_value, **kwargs):
    @staticmethod
    @abstractmethod
    def to_obj_ntv(ntv_value, **kwargs):
        ''' abstract - convert ntv_value into the return object

        *Parameters*

        - **ntv_value** : value to convert (presumably the JSON value of an
        NTV entity - to be confirmed against the concrete connectors)
        - **kwargs** : additional options, defined by each connector

        *Returns* : the converted object'''

abstract - convert ntv_value into the return object

@staticmethod
@abstractmethod
def to_json_ntv(value, name=None, typ=None):
    @staticmethod
    @abstractmethod
    def to_json_ntv(value, name=None, typ=None):
        ''' abstract - convert NTV object (value, name, type) into the NTV json
        (json-value, name, type)

        *Parameters*

        - **value** : object to convert
        - **name** : String (default None) - name of the NTV entity
        - **typ** : String (default None) - type of the NTV entity

        *Returns* : tuple (json-value, name, type)'''

abstract - convert NTV object (value, name, type) into the NTV json (json-value, name, type)

@staticmethod
def cast(data, name=None, type_str=None):
113    @staticmethod
114    def cast(data, name=None, type_str=None):
115        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
116        defined in parameters.
117
118        *Parameters*
119
120        - **data**: NtvSingle entity or NTVvalue of the NTV entity
121        - **name** : String (default None) - name of the NTV entity
122        - **type_str**: String (default None) - type of the NTV entity
123        '''
124        clas = data.__class__.__name__
125        if clas == 'NtvSingle':
126            name = data.ntv_name
127            type_str = data.type_str
128            data = data.ntv_value
129        dic_geo_cl = NtvConnector.DIC_GEO_CL
130        dic_connec = NtvConnector.dic_connec()
131        match clas:
132            case 'int' | 'float' | 'bool' | 'str':
133                return (data, name, type_str)
134            case 'dict':
135                return ({key: NtvConnector.cast(val, name, type_str)[0]
136                         for key, val in data.items()}, name, type_str)
137            case 'list':
138                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
139                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
140            case 'tuple':
141                return (list(data), name, 'array' if not type_str else type_str)
142            case 'date' | 'time' | 'datetime':
143                return (data.isoformat(), name, clas if not type_str else type_str)
144            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
145                    'Polygon' | 'MultiPolygon':
146                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
147                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
148            case 'NtvSingle' | 'NtvList':
149                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
150            case _:
151                connec = None
152                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
153                    connec = NtvConnector.connector()[dic_connec[clas]]
154                if connec:
155                    return connec.to_json_ntv(data, name, type_str)
156                raise NtvError(
157                    'connector is not defined for the class : ', clas)
158        return (data, name, type_str)

return JSON-NTV conversion (json_value, name, type_str) of the NTV entity defined in parameters.

Parameters

  • data: NtvSingle entity or NTVvalue of the NTV entity
  • name : String (default None) - name of the NTV entity
  • type_str: String (default None) - type of the NTV entity
@staticmethod
def uncast(value, name=None, type_str=None, **kwargs):
    @staticmethod
    def uncast(value, name=None, type_str=None, **kwargs):
        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity

        *Parameters*

        - **value**: NtvSingle entity or NTVvalue of the NTV entity
        - **name** : String (default None) - name of the NTV entity
        - **type_str**: String (default None) - type of the NTV entity
        - **kwargs** : options passed to the conversion; they override the
        defaults 'dicobj' ({}), 'format' ('json') and 'type_obj' (False)

        *Returns* : tuple (obj_value, name, type_str). When `type_str` is empty,
        the returned type is deduced from the converted object.
        '''
        # a 'json' entity is returned as it is
        if type_str == 'json':
            return (value, name, type_str)
        if value.__class__.__name__ == 'NtvSingle':
            # `and` binds tighter than `or`: the conversion goes on only if
            # type_str is None, or if type_str is a connector type and the
            # json check succeeds; otherwise the entity content is returned.
            # NOTE(review): is_json is applied to the NtvSingle itself (it is
            # not unwrapped yet) - confirm that this is intended.
            if not (type_str in set(NtvConnector.dic_type.values()) and
                    NtvConnector.is_json(value) or type_str is None):
                # NOTE(review): `value.name` is used here whereas `value.ntv_name`
                # is used below - confirm that NtvSingle defines both attributes.
                return (value.ntv_value, value.name, value.type_str)
            type_str = value.type_str if value.ntv_type else None
            name = value.ntv_name
            value = value.ntv_value
        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
        value_obj = NtvConnector._uncast_val(value, type_str, **option)
        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))

return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity

Parameters

  • value: NtvSingle entity or NTVvalue of the NTV entity
  • name : String (default None) - name of the NTV entity
  • type_str: String (default None) - type of the NTV entity
@staticmethod
def is_json_class(val):
231    @staticmethod
232    def is_json_class(val):
233        ''' return True if val is a json type'''
234        return val is None or isinstance(val, (list, int, str, float, bool, dict))

return True if val is a json type

@staticmethod
def is_json(obj):
236    @staticmethod
237    def is_json(obj):
238        ''' check if obj is a json structure and return True if obj is a json-value
239
240        *Parameters*
241
242        - **obj** : object to check
243        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
244        if obj is None:
245            return True
246        is_js = NtvConnector.is_json
247        match obj:
248            case str() | int() | float() | bool() as obj:
249                return True
250            case list() | tuple() as obj:
251                if not obj:
252                    return True
253                return min(is_js(obj_in) for obj_in in obj)
254            case dict() as obj:
255                if not obj:
256                    return True
257                if not min(isinstance(key, str) for key in obj.keys()):
258                    raise NtvError('key in dict in not string')
259                return min(is_js(obj_in) for obj_in in obj.values())
260            case _:
261                if not obj.__class__.__name__ in NtvConnector.castable:
262                    raise NtvError(obj.__class__.__name__ +
263                                   'is not valid for NTV')
264                return False

check if obj is a json structure and return True if obj is a json-value

Parameters

  • obj : object to check
  • ntvobj : boolean (default False) - if True NTV class value are accepted
@staticmethod
def keysfromderkeys(parentkeys, derkeys):
266    @staticmethod
267    def keysfromderkeys(parentkeys, derkeys):
268        '''return keys from parent keys and derkeys
269
270        *Parameters*
271
272        - **parentkeys** : list of keys from parent
273        - **derkeys** : list of derived keys
274
275        *Returns* : list of keys'''
276        return [derkeys[pkey] for pkey in parentkeys]

return keys from parent keys and derkeys

Parameters

  • parentkeys : list of keys from parent
  • derkeys : list of derived keys

Returns : list of keys

@staticmethod
def encode_coef(lis):
278    @staticmethod 
279    def encode_coef(lis):
280        '''Generate a repetition coefficient for periodic list'''
281        if len(lis) < 2:
282            return 0
283        coef = 1
284        while coef != len(lis):
285            if lis[coef-1] != lis[coef]:
286                break
287            coef += 1
288        if (not len(lis) % (coef * (max(lis) + 1)) and 
289            lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
290            return coef
291        return 0

Generate a repetition coefficient for periodic list

@staticmethod
def keysfromcoef(coef, period, leng=None):
293    @staticmethod 
294    def keysfromcoef(coef, period, leng=None):
295        ''' return a list of keys with periodic structure'''
296        if not leng:
297            leng = coef * period
298        return None if not (coef and period) else [(ind % (coef * period)) // coef 
299                                                   for ind in range(leng)]    

return a list of keys with periodic structure

@staticmethod
def init_ntv_keys(ind, lidx, leng):
300    @staticmethod
301    def init_ntv_keys(ind, lidx, leng):
302        ''' initialization of explicit keys data in lidx object of tabular data'''
303        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
304        name, typ, codec, parent, keys, coef, length = lidx[ind]
305        if (keys, parent, coef) == (None, None, None):  # full or unique
306            if len(codec) == 1: # unique
307                lidx[ind][4] = [0] * leng
308            elif len(codec) == leng:    # full
309                lidx[ind][4] = list(range(leng))
310            else:
311                raise NtvError('impossible to generate keys')
312            return
313        if keys and len(keys) > 1 and parent is None:  #complete
314            return
315        if coef:  #primary
316            lidx[ind][4] = [(ikey % (coef * len(codec))) // coef for ikey in range(leng)]
317            lidx[ind][3] = None
318            return  
319        if parent is None:
320            raise NtvError('keys not referenced')          
321        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
322            NtvConnector.init_ntv_keys(parent, lidx, leng)
323        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
324            lidx[ind][4] = lidx[parent][4]
325            lidx[ind][3] = None
326            return
327        lidx[ind][4] = NtvConnector.keysfromderkeys(lidx[parent][4], keys)  # relative
328        #lidx[ind][4] = [keys[pkey] for pkey in lidx[parent][4]]  # relative
329        lidx[ind][3] = None
330        return    

initialization of explicit keys data in lidx object of tabular data

class NtvTree:
class NtvTree:
    ''' The NtvTree class is an iterator over the nodes of a NTV tree structure
    (depth-first, parents before children). Properties give tree indicators
    and data.

    *Attributes :*

    - **_ntv** : Ntv entity - root of the tree
    - **_node**:  Ntv entity - node pointer (None before the first step)
    - **_stack**:  list - index of each ancestor of the current node in its parent

    *dynamic values (@property)*
    - `breadth`
    - `size`
    - `height`
    - `adjacency_list`
    - `nodes`
    - `dic_nodes`
    - `leaf_nodes`
    - `inner_nodes`
    '''

    def __init__(self, ntv):
        ''' ntv is the Ntv entity used as root of the tree'''
        self._ntv = ntv
        self._node = None
        self._stack = []

    def __iter__(self):
        ''' the iterator is the object itself (the traversal state is not reset)'''
        return self

    def __next__(self):
        ''' move the node pointer to the next node and return it'''
        current = self._node
        if current is None:
            # first step: the traversal starts with the root
            self._node = self._ntv
            return self._node
        if len(current) == 0:
            raise StopIteration
        if current.__class__.__name__ == 'NtvList':
            self._next_down()
        else:
            self._next_up()
        return self._node

    def _next_down(self):
        ''' move the node pointer to the first child of the current node'''
        first_child = self._node[0]
        self._node = first_child
        self._stack.append(0)

    def _next_up(self):
        ''' move the node pointer to the next sibling of the current node or,
        failing that, to the next sibling of its closest ancestor'''
        while True:
            parent = self._node.parent
            if not parent or self._node == self._ntv:
                raise StopIteration
            rank = self._stack[-1]
            if rank < len(parent) - 1:
                # the current node has a following sibling
                self._node = parent[rank + 1]
                self._stack[-1] += 1
                return
            if parent == self._ntv:
                raise StopIteration
            # last child: the search goes on one level up
            self._node = parent
            self._stack.pop()

    def _walk(self):
        ''' return a new iterator on the same tree'''
        return self.__class__(self._ntv)

    @property
    def breadth(self):
        ''' number of leaf nodes'''
        return len(self.leaf_nodes)

    @property
    def size(self):
        ''' number of nodes'''
        return len(self.nodes)

    @property
    def height(self):
        ''' largest length of the pointers of the nodes'''
        return max(len(node.pointer()) for node in self._walk())

    @property
    def adjacency_list(self):
        ''' dict with, for each inner node, the value (`val`) of the node'''
        return {inner: inner.val for inner in self.inner_nodes}

    @property
    def nodes(self):
        ''' list of the nodes, in the order of the traversal'''
        return list(self._walk())

    @property
    def dic_nodes(self):
        ''' dict { ntv_name: node } of the named nodes, in the order of the
        traversal'''
        named = {}
        for node in self._walk():
            if node.ntv_name:
                named[node.ntv_name] = node
        return named

    @property
    def leaf_nodes(self):
        ''' list of the NtvSingle nodes, in the order of the traversal'''
        return [node for node in self._walk()
                if node.__class__.__name__ == 'NtvSingle']

    @property
    def inner_nodes(self):
        ''' list of the NtvList nodes, in the order of the traversal'''
        return [node for node in self._walk()
                if node.__class__.__name__ == 'NtvList']

The NtvTree class is an iterator class used to traverse a NTV tree structure. Some other methods give tree indicators and data.

Attributes :

  • ntv : Ntv entity
  • _node: Ntv entity - node pointer
  • _stack: list - stack used to store context in recursive methods

dynamic values (@property)

NtvTree(ntv)
352    def __init__(self, ntv):
353        ''' the parameter of the constructor is the Ntv entity'''
354        self._ntv = ntv
355        self._node = None
356        self._stack = []

the parameter of the constructor is the Ntv entity

breadth

return the number of leaves

size

return the number of nodes

height

return the height of the tree

adjacency_list

return a dict with the list of child nodes for each parent node

nodes

return the list of nodes according to the DFS preordering algorithm

dic_nodes

return a dict of nodes according to the DFS preordering algorithm

leaf_nodes

return the list of leaf nodes according to the DFS preordering algorithm

inner_nodes

return the list of inner nodes according to the DFS preordering algorithm

class NtvJsonEncoder(json.encoder.JSONEncoder):
class NtvJsonEncoder(json.JSONEncoder):
    """json encoder for Ntv data"""

    def default(self, o):
        """Return the json value of `o` given by `NtvConnector.cast`.

        If `cast` raises a NtvError, the base implementation is called
        (it raises a TypeError).
        """
        try:
            return NtvConnector.cast(o)[0]
        except NtvError:
            return json.JSONEncoder.default(self, o)

json encoder for Ntv data

def default(self, o):
446    def default(self, o):
447        try:
448            return NtvConnector.cast(o)[0]
449        except NtvError:
450            return json.JSONEncoder.default(self, o)
451        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
452            return o.isoformat()
453        return json.JSONEncoder.default(self, o)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
Inherited Members
json.encoder.JSONEncoder
JSONEncoder
encode
iterencode
class NtvError(builtins.Exception):
class NtvError(Exception):
    ''' Exception raised for the errors specific to NTV data'''

NTV Exception

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback