NTV.json_ntv.ntv_util

Created on Feb 27 2023

@author: Philippe@loco-labs.io

The ntv_util module is part of the NTV.json_ntv package (specification document).

It contains the classes NtvUtil, NtvConnector, NtvTree, NtvJsonEncoder and NtvError for NTV entities.

  1# -*- coding: utf-8 -*-
  2"""
  3Created on Feb 27 2023
  4
  5@author: Philippe@loco-labs.io
  6
  7The `ntv_util` module is part of the `NTV.json_ntv` package ([specification document](
  8https://github.com/loco-philippe/NTV/blob/main/documentation/JSON-NTV-standard.pdf)).
  9
 10It contains the classes `NtvUtil`, `NtvConnector`, `NtvTree`, `NtvJsonEncoder`
 11and `NtvError` for NTV entities.
 12"""
 13from abc import ABC, abstractmethod
 14import datetime
 15import json
 16import re
 17
 18
 19class NtvUtil:
 20    ''' The NtvUtil class includes static methods used by several NTV classes.
 21    NtvUtil is the parent class of `Datatype`, `Namespace`, `Ntv`.
 22
 23    *class variables :*
 24    - **_namespaces_** : list of Namespace defined
 25    - **_types_** : list of Datatype defined
 26
 27    *static methods :*
 28    - `from_obj_name`
 29    - `decode_ntv_tab`
 30    - `to_ntv_pointer`
 31
 32    '''
 33    _namespaces_ = {}
 34    _types_ = {}
 35
 36    @staticmethod
 37    def from_obj_name(string):
 38        '''return a tuple with name, type_str and separator from string'''
 39        if not isinstance(string, str):
 40            raise NtvError('a json-name have to be str')
 41        if string == '':
 42            return (None, None, None)
 43        spl = string.rsplit(':', maxsplit=1)
 44        if len(spl) == 1:
 45            if string[-1] == '.':
 46                return (None, string, None)
 47            return(string, None, None)
 48        if spl[0] == '':
 49            return (None, spl[1], ':')
 50        if spl[0][-1] == ':':
 51            sp0 = spl[0][:-1]
 52            return (None if sp0 == '' else sp0, None if spl[1] == '' else spl[1], '::')
 53        return (None if spl[0] == '' else spl[0], None if spl[1] == '' else spl[1], ':')
 54
 55    @staticmethod
 56    def decode_ntv_tab(ntv, ntv_to_val):
 57        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)
 58
 59        *parameters:*
 60
 61        - **ntv**: Ntv data to decode,
 62        - **ntv_to_val**: method to convert external value form ntv in internal Field value
 63
 64        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*
 65
 66        - name (None or string): name of the Field
 67        - dtype (None or string): type of data
 68        - codec (list): list of Field codec values
 69        - parent (None or int): Field parent or None
 70        - keys (None or list): Field keys
 71        - coef (None or int): coef if primary Field else None
 72        - leng (int): length of the Field
 73        '''
 74        typ = ntv.type_str if ntv.ntv_type else None
 75        nam = ntv.name
 76        val = ntv_to_val(ntv)
 77        if ntv.__class__.__name__ == 'NtvSingle':
 78            return (nam, typ, [val], None, None, None, 1)
 79        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':
 80            return (nam, typ, val, None, None, None, len(ntv))
 81
 82        ntvc = ntv[0]
 83        leng = max(len(ind) for ind in ntv)
 84        typc = ntvc.type_str if ntvc.ntv_type else None
 85        valc = ntv_to_val(ntvc)
 86        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
 87                isinstance(ntv[1].val, (int, str)) and \
 88                ntv[2].__class__.__name__ != 'NtvSingle' and \
 89                isinstance(ntv[2][0].val, int):
 90            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
 91        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):
 92            return (nam, typc, valc, ntv[1].val, None, None, leng)
 93        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):
 94            leng = leng * ntv[1][0].val
 95            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
 96        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):
 97            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
 98        return (nam, typ, val, None, None, None, len(ntv))
 99
100    @staticmethod
101    def to_ntvpointer(jsonpointer, unique_root=False):
102        '''convert a json pointer inter a NTV pointer (string)
103
104        *parameters:*
105
106        - **jsonpointer**: String - json pointer to convert,
107        - **unique_root**: Boolean (default False) - True if the json root length is 1 '''
108        single = '/([0-9]+)(/[a-z])'
109        if unique_root and not ('0' <= jsonpointer[1] <= '9'):
110            return re.sub(single, '\g<2>', jsonpointer)[1:]
111        return re.sub(single, '\g<2>', jsonpointer)
112
113
114class NtvConnector(ABC):
115    ''' The NtvConnector class is an abstract class used by all NTV connectors
116    for conversion between NTV-JSON data and NTV-OBJ data.
117
118    A NtvConnector child is defined by:
119    - clas_obj: str - define the class name of the object to convert
120    - clas_typ: str - define the Datatype of the converted object
121    - to_obj_ntv: method - converter from JsonNTV to the object
122    - to_json_ntv: method - converter from the object to JsonNTV
123
124    *class method :*
125    - `connector`
126    - `dic_connec`
127    - `castable` (@property)
128    - `dic_obj` (@property)
129    - `dic_type` (@property)
130
131    *abstract method*
132    - `to_obj_ntv`
133    - `to_json_ntv`
134
135    *static method*
136    - `cast`
137    - `uncast`
138    - `is_json_class`
139    - `is_json`
140    - `init_ntv_keys`
141    '''
142
143    DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'}
144    DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line',
145                  'MultiLineString': 'multiline', 'Polygon': 'polygon',
146                  'MultiPolygon': 'multipolygon'}
147    DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'}
148    DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat,
149               'datetime': datetime.datetime.fromisoformat}
150    DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring',
151               'multiline': 'multilinestring', 'polygon': 'polygon',
152               'multipolygon': 'multipolygon'}
153    DIC_CBOR = {'point': False, 'multipoint': False, 'line': False,
154                'multiline': False, 'polygon': False, 'multipolygon': False,
155                'date': True, 'time': False, 'datetime': True}
156    DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec',
157               'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec',
158               'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec',
159               'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec',
160               'other': None}
161
162    @classmethod
163    @property
164    def castable(cls):
165        '''return a list with class_name allowed for json-obj conversion'''
166        return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType',
167                'NtvSingle', 'NtvList'] \
168            + list(NtvConnector.DIC_GEO_CL.keys()) \
169            + list(NtvConnector.DIC_FCT.keys()) \
170            + list(NtvConnector.dic_connec().keys())
171
172    @classmethod
173    @property
174    def dic_obj(cls):
175        '''return a dict with the connectors: { type: class_connec_name }'''
176        return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\
177            NtvConnector.DIC_OBJ
178
179    @classmethod
180    @property
181    def dic_type(cls):
182        '''return a dict with the connectors: { class_obj_name: type }'''
183        return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\
184            NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL
185
186    @classmethod
187    def connector(cls):
188        '''return a dict with the connectors: { class_connec_name: class_connec }'''
189        return {clas.__name__: clas for clas in cls.__subclasses__()}
190
191    @classmethod
192    def dic_connec(cls):
193        '''return a dict with the clas associated to the connector:
194        { class_obj_name: class_connec_name }'''
195        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
196
197    @staticmethod
198    @abstractmethod
199    def to_obj_ntv(ntv_value, **kwargs):
200        ''' abstract - convert ntv_value into the return object'''
201
202    @staticmethod
203    @abstractmethod
204    def to_json_ntv(value, name=None, typ=None):
205        ''' abstract - convert NTV object (value, name, type) into the NTV json
206        (json-value, name, type)'''
207
208    @staticmethod
209    def cast(data, name=None, type_str=None):
210        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
211        defined in parameters.
212
213        *Parameters*
214
215        - **data**: NtvSingle entity or NTVvalue of the NTV entity
216        - **name** : String (default None) - name of the NTV entity
217        - **type_str**: String (default None) - type of the NTV entity
218        '''
219        clas = data.__class__.__name__
220        if clas == 'NtvSingle':
221            name = data.ntv_name
222            type_str = data.type_str
223            data = data.ntv_value
224        dic_geo_cl = NtvConnector.DIC_GEO_CL
225        dic_connec = NtvConnector.dic_connec()
226        match clas:
227            case 'int' | 'float' | 'bool' | 'str':
228                return (data, name, type_str)
229            case 'dict':
230                return ({key: NtvConnector.cast(val, name, type_str)[0]
231                         for key, val in data.items()}, name, type_str)
232            case 'list':
233                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
234                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
235            case 'tuple':
236                return (list(data), name, 'array' if not type_str else type_str)
237            case 'date' | 'time' | 'datetime':
238                return (data.isoformat(), name, clas if not type_str else type_str)
239            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
240                    'Polygon' | 'MultiPolygon':
241                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
242                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
243            case 'NtvSingle' | 'NtvList':
244                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
245            case _:
246                connec = None
247                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
248                    connec = NtvConnector.connector()[dic_connec[clas]]
249                if connec:
250                    return connec.to_json_ntv(data, name, type_str)
251                raise NtvError(
252                    'connector is not defined for the class : ', clas)
253        return (data, name, type_str)
254
255    @staticmethod
256    def uncast(value, name=None, type_str=None, **kwargs):
257        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
258
259        *Parameters*
260
261        - **data**: NtvSingle entity or NTVvalue of the NTV entity
262        - **name** : String (default None) - name of the NTV entity
263        - **type_str**: String (default None) - type of the NTV entity
264        '''
265        if type_str == 'json':
266            return (value, name, type_str)
267        if value.__class__.__name__ == 'NtvSingle':
268            if not (type_str in set(NtvConnector.dic_type.values()) and
269                    NtvConnector.is_json(value) or type_str is None):
270                return (value.ntv_value, value.name, value.type_str)
271            type_str = value.type_str if value.ntv_type else None
272            name = value.ntv_name
273            value = value.ntv_value
274        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
275        value_obj = NtvConnector._uncast_val(value, type_str, **option)
276        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
277
278    @staticmethod
279    def _typ_obj(value):
280        if isinstance(value, dict):
281            return NtvConnector._typ_obj(list(value.values()))
282        if isinstance(value, (tuple, list)):
283            for val in value:
284                typ = NtvConnector._typ_obj(val)
285                if typ:
286                    return typ
287            return None
288        return NtvConnector.dic_type.get(value.__class__.__name__)
289
290    @staticmethod
291    def _uncast_val(value, type_n, **option):
292        '''return value from ntv value'''
293        dic_fct = NtvConnector.DIC_FCT
294        dic_geo = NtvConnector.DIC_GEO
295        dic_obj = NtvConnector.dic_obj | option['dicobj']
296        dic_cbor = NtvConnector.DIC_CBOR
297        if not type_n or type_n == 'json' or (option['format'] == 'cbor' and
298                                              not dic_cbor.get(type_n, False)):
299            return value
300        if type_n in dic_fct:
301            if isinstance(value, (tuple, list)):
302                return [NtvConnector._uncast_val(val, type_n, **option) for val in value]
303            if isinstance(value, dict):
304                return {key: NtvConnector._uncast_val(val, type_n, **option)
305                        for key, val in value.items()}
306            return dic_fct[type_n](value)
307        if type_n == 'array':
308            return tuple(value)
309        if type_n == 'ntv':
310            from json_ntv.ntv import Ntv
311            return Ntv.from_obj(value)
312        if type_n in dic_geo:
313            option['type_geo'] = dic_geo[type_n]
314        connec = None
315        if type_n in dic_obj and \
316                dic_obj[type_n] in NtvConnector.connector():
317            connec = NtvConnector.connector()[dic_obj[type_n]]
318        elif dic_obj['other'] in NtvConnector.connector():
319            connec = NtvConnector.connector()['other']
320        if connec:
321            return connec.to_obj_ntv(value, **option)
322        return value
323
324    @staticmethod
325    def is_json_class(val):
326        ''' return True if val is a json type'''
327        return val is None or isinstance(val, (list, int, str, float, bool, dict))
328
329    @staticmethod
330    def is_json(obj):
331        ''' check if obj is a json structure and return True if obj is a json-value
332
333        *Parameters*
334
335        - **obj** : object to check
336        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
337        if obj is None:
338            return True
339        is_js = NtvConnector.is_json
340        match obj:
341            case str() | int() | float() | bool() as obj:
342                return True
343            case list() | tuple() as obj:
344                if not obj:
345                    return True
346                return min(is_js(obj_in) for obj_in in obj)
347            case dict() as obj:
348                if not obj:
349                    return True
350                if not min(isinstance(key, str) for key in obj.keys()):
351                    raise NtvError('key in dict in not string')
352                return min(is_js(obj_in) for obj_in in obj.values())
353            case _:
354                if not obj.__class__.__name__ in NtvConnector.castable:
355                    raise NtvError(obj.__class__.__name__ +
356                                   ' is not valid for NTV')
357                return False
358
359    @staticmethod
360    def keysfromderkeys(parentkeys, derkeys):
361        '''return keys from parent keys and derkeys
362
363        *Parameters*
364
365        - **parentkeys** : list of keys from parent
366        - **derkeys** : list of derived keys
367
368        *Returns* : list of keys'''
369        return [derkeys[pkey] for pkey in parentkeys]
370
371    @staticmethod
372    def encode_coef(lis):
373        '''Generate a repetition coefficient for periodic list'''
374        if len(lis) < 2:
375            return 0
376        coef = 1
377        while coef != len(lis):
378            if lis[coef-1] != lis[coef]:
379                break
380            coef += 1
381        if (not len(lis) % (coef * (max(lis) + 1)) and
382                lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
383            return coef
384        return 0
385
386    @staticmethod
387    def keysfromcoef(coef, period, leng=None):
388        ''' return a list of keys with periodic structure'''
389        if not leng:
390            leng = coef * period
391        return None if not (coef and period) else [(ind % (coef * period)) // coef
392                                                   for ind in range(leng)]
393
394    @staticmethod
395    def init_ntv_keys(ind, lidx, leng):
396        ''' initialization of explicit keys data in lidx object of tabular data'''
397        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
398        name, typ, codec, parent, keys, coef, length = lidx[ind]
399        if (keys, parent, coef) == (None, None, None):  # full or unique
400            if len(codec) == 1:  # unique
401                lidx[ind][4] = [0] * leng
402            elif len(codec) == leng:    # full
403                lidx[ind][4] = list(range(leng))
404            else:
405                raise NtvError('impossible to generate keys')
406            return
407        if keys and len(keys) > 1 and parent is None:  # complete
408            return
409        if coef:  # primary
410            lidx[ind][4] = [(ikey % (coef * len(codec))) //
411                            coef for ikey in range(leng)]
412            lidx[ind][3] = None
413            return
414        if parent is None:
415            raise NtvError('keys not referenced')
416        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
417            NtvConnector.init_ntv_keys(parent, lidx, leng)
418        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
419            lidx[ind][4] = lidx[parent][4]
420            lidx[ind][3] = None
421            return
422        lidx[ind][4] = NtvConnector.keysfromderkeys(
423            lidx[parent][4], keys)  # relative
424        lidx[ind][3] = None
425        return
426
427
class NtvTree:
    ''' The NtvTree class is an iterator class used to traverse a NTV tree structure.
    Some other methods give tree indicators and data.

    *Attributes :*

    - **_ntv** : Ntv entity - root of the traversal
    - **_node**:  Ntv entity - node pointer
    - **_stack**:  list - index of the current node in each traversed parent

    *dynamic values (@property)*
    - `breadth`
    - `size`
    - `height`
    - `adjacency_list`
    - `nodes`
    - `dic_nodes`
    - `leaf_nodes`
    - `inner_nodes`
    '''

    def __init__(self, ntv):
        ''' the parameter of the constructor is the Ntv entity'''
        self._ntv = ntv
        self._node = None
        self._stack = []

    def __iter__(self):
        ''' iterator without initialization'''
        return self

    def __next__(self):
        ''' return next node in the tree'''
        if self._node is None:
            self._node = self._ntv
        elif self._node.__class__.__name__ == 'NtvList' and len(self._node) > 0:
            self._next_down()
        else:
            # a leaf or an empty list has no child: go on with the next sibling
            # (an empty inner list must not stop the whole traversal)
            self._next_up()
        return self._node

    def _next_down(self):
        ''' find the next subchild node'''
        self._node = self._node[0]
        self._stack.append(0)

    def _next_up(self):
        ''' find the next sibling or ancestor node

        Raises StopIteration when the traversal is back to the root.'''
        while True:
            parent = self._node.parent
            if not parent or self._node == self._ntv:
                raise StopIteration
            ind = self._stack[-1]
            if ind < len(parent) - 1:  # if ind is not the last
                self._node = parent[ind + 1]
                self._stack[-1] += 1
                return
            if parent == self._ntv:
                raise StopIteration
            # last child: climb one level and look for the sibling of the parent
            self._node = parent
            self._stack.pop()

    @property
    def breadth(self):
        ''' return the number of leaves'''
        return len(self.leaf_nodes)

    @property
    def size(self):
        ''' return the number of nodes'''
        return len(self.nodes)

    @property
    def height(self):
        ''' return the height of the tree'''
        return max(len(node.pointer()) for node in self.__class__(self._ntv)) - 1

    @property
    def adjacency_list(self):
        ''' return a dict with the list of child nodes for each parent node'''
        return {node: node.val for node in self.inner_nodes}

    @property
    def nodes(self):
        ''' return the list of nodes according to the DFS preordering algorithm'''
        return list(self.__class__(self._ntv))

    @property
    def dic_nodes(self):
        ''' return a dict of named nodes according to the DFS preordering algorithm'''
        return {node.ntv_name: node for node in self.__class__(self._ntv)
                if node.ntv_name}

    @property
    def leaf_nodes(self):
        ''' return the list of leaf nodes according to the DFS preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvSingle']

    @property
    def inner_nodes(self):
        ''' return the list of inner nodes according to the DFS preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvList']
535
536
class NtvJsonEncoder(json.JSONEncoder):
    """json encoder for Ntv data"""

    def default(self, o):
        """return the json-value given by `NtvConnector.cast` for the object `o`.

        When `NtvConnector.cast` raises NtvError, the standard encoder is used
        (it raises TypeError for a non serializable object)."""
        try:
            return NtvConnector.cast(o)[0]
        except NtvError:
            return super().default(o)
548
549
class NtvError(Exception):
    '''Exception raised by the NTV classes.'''
class NtvUtil:
 20class NtvUtil:
 21    ''' The NtvUtil class includes static methods used by several NTV classes.
 22    NtvUtil is the parent class of `Datatype`, `Namespace`, `Ntv`.
 23
 24    *class variables :*
 25    - **_namespaces_** : list of Namespace defined
 26    - **_types_** : list of Datatype defined
 27
 28    *static methods :*
 29    - `from_obj_name`
 30    - `decode_ntv_tab`
 31    - `to_ntv_pointer`
 32
 33    '''
 34    _namespaces_ = {}
 35    _types_ = {}
 36
 37    @staticmethod
 38    def from_obj_name(string):
 39        '''return a tuple with name, type_str and separator from string'''
 40        if not isinstance(string, str):
 41            raise NtvError('a json-name have to be str')
 42        if string == '':
 43            return (None, None, None)
 44        spl = string.rsplit(':', maxsplit=1)
 45        if len(spl) == 1:
 46            if string[-1] == '.':
 47                return (None, string, None)
 48            return(string, None, None)
 49        if spl[0] == '':
 50            return (None, spl[1], ':')
 51        if spl[0][-1] == ':':
 52            sp0 = spl[0][:-1]
 53            return (None if sp0 == '' else sp0, None if spl[1] == '' else spl[1], '::')
 54        return (None if spl[0] == '' else spl[0], None if spl[1] == '' else spl[1], ':')
 55
 56    @staticmethod
 57    def decode_ntv_tab(ntv, ntv_to_val):
 58        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)
 59
 60        *parameters:*
 61
 62        - **ntv**: Ntv data to decode,
 63        - **ntv_to_val**: method to convert external value form ntv in internal Field value
 64
 65        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*
 66
 67        - name (None or string): name of the Field
 68        - dtype (None or string): type of data
 69        - codec (list): list of Field codec values
 70        - parent (None or int): Field parent or None
 71        - keys (None or list): Field keys
 72        - coef (None or int): coef if primary Field else None
 73        - leng (int): length of the Field
 74        '''
 75        typ = ntv.type_str if ntv.ntv_type else None
 76        nam = ntv.name
 77        val = ntv_to_val(ntv)
 78        if ntv.__class__.__name__ == 'NtvSingle':
 79            return (nam, typ, [val], None, None, None, 1)
 80        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':
 81            return (nam, typ, val, None, None, None, len(ntv))
 82
 83        ntvc = ntv[0]
 84        leng = max(len(ind) for ind in ntv)
 85        typc = ntvc.type_str if ntvc.ntv_type else None
 86        valc = ntv_to_val(ntvc)
 87        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
 88                isinstance(ntv[1].val, (int, str)) and \
 89                ntv[2].__class__.__name__ != 'NtvSingle' and \
 90                isinstance(ntv[2][0].val, int):
 91            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
 92        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):
 93            return (nam, typc, valc, ntv[1].val, None, None, leng)
 94        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):
 95            leng = leng * ntv[1][0].val
 96            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
 97        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):
 98            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
 99        return (nam, typ, val, None, None, None, len(ntv))
100
101    @staticmethod
102    def to_ntvpointer(jsonpointer, unique_root=False):
103        '''convert a json pointer inter a NTV pointer (string)
104
105        *parameters:*
106
107        - **jsonpointer**: String - json pointer to convert,
108        - **unique_root**: Boolean (default False) - True if the json root length is 1 '''
109        single = '/([0-9]+)(/[a-z])'
110        if unique_root and not ('0' <= jsonpointer[1] <= '9'):
111            return re.sub(single, '\g<2>', jsonpointer)[1:]
112        return re.sub(single, '\g<2>', jsonpointer)

The NtvUtil class includes static methods used by several NTV classes. NtvUtil is the parent class of Datatype, Namespace, Ntv.

class variables :

  • _namespaces_ : list of Namespace defined
  • _types_ : list of Datatype defined

static methods :

  • from_obj_name
  • decode_ntv_tab
  • to_ntvpointer

@staticmethod
def from_obj_name(string):
37    @staticmethod
38    def from_obj_name(string):
39        '''return a tuple with name, type_str and separator from string'''
40        if not isinstance(string, str):
41            raise NtvError('a json-name have to be str')
42        if string == '':
43            return (None, None, None)
44        spl = string.rsplit(':', maxsplit=1)
45        if len(spl) == 1:
46            if string[-1] == '.':
47                return (None, string, None)
48            return(string, None, None)
49        if spl[0] == '':
50            return (None, spl[1], ':')
51        if spl[0][-1] == ':':
52            sp0 = spl[0][:-1]
53            return (None if sp0 == '' else sp0, None if spl[1] == '' else spl[1], '::')
54        return (None if spl[0] == '' else spl[0], None if spl[1] == '' else spl[1], ':')

return a tuple with name, type_str and separator from string

@staticmethod
def decode_ntv_tab(ntv, ntv_to_val):
56    @staticmethod
57    def decode_ntv_tab(ntv, ntv_to_val):
58        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)
59
60        *parameters:*
61
62        - **ntv**: Ntv data to decode,
63        - **ntv_to_val**: method to convert external value form ntv in internal Field value
64
65        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*
66
67        - name (None or string): name of the Field
68        - dtype (None or string): type of data
69        - codec (list): list of Field codec values
70        - parent (None or int): Field parent or None
71        - keys (None or list): Field keys
72        - coef (None or int): coef if primary Field else None
73        - leng (int): length of the Field
74        '''
75        typ = ntv.type_str if ntv.ntv_type else None
76        nam = ntv.name
77        val = ntv_to_val(ntv)
78        if ntv.__class__.__name__ == 'NtvSingle':
79            return (nam, typ, [val], None, None, None, 1)
80        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':
81            return (nam, typ, val, None, None, None, len(ntv))
82
83        ntvc = ntv[0]
84        leng = max(len(ind) for ind in ntv)
85        typc = ntvc.type_str if ntvc.ntv_type else None
86        valc = ntv_to_val(ntvc)
87        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
88                isinstance(ntv[1].val, (int, str)) and \
89                ntv[2].__class__.__name__ != 'NtvSingle' and \
90                isinstance(ntv[2][0].val, int):
91            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
92        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):
93            return (nam, typc, valc, ntv[1].val, None, None, leng)
94        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):
95            leng = leng * ntv[1][0].val
96            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
97        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):
98            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
99        return (nam, typ, val, None, None, None, len(ntv))

Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)

parameters:

  • ntv: Ntv data to decode,
  • ntv_to_val: method to convert an external value from ntv into an internal Field value

Returns tuple: (name, dtype, codec, parent, keys, coef, leng)

  • name (None or string): name of the Field
  • dtype (None or string): type of data
  • codec (list): list of Field codec values
  • parent (None or int): Field parent or None
  • keys (None or list): Field keys
  • coef (None or int): coef if primary Field else None
  • leng (int): length of the Field
@staticmethod
def to_ntvpointer(jsonpointer, unique_root=False):
101    @staticmethod
102    def to_ntvpointer(jsonpointer, unique_root=False):
103        '''convert a json pointer inter a NTV pointer (string)
104
105        *parameters:*
106
107        - **jsonpointer**: String - json pointer to convert,
108        - **unique_root**: Boolean (default False) - True if the json root length is 1 '''
109        single = '/([0-9]+)(/[a-z])'
110        if unique_root and not ('0' <= jsonpointer[1] <= '9'):
111            return re.sub(single, '\g<2>', jsonpointer)[1:]
112        return re.sub(single, '\g<2>', jsonpointer)

convert a json pointer into an NTV pointer (string)

parameters:

  • jsonpointer: String - json pointer to convert,
  • unique_root: Boolean (default False) - True if the json root length is 1
class NtvConnector(abc.ABC):
115class NtvConnector(ABC):
116    ''' The NtvConnector class is an abstract class used by all NTV connectors
117    for conversion between NTV-JSON data and NTV-OBJ data.
118
119    A NtvConnector child is defined by:
120    - clas_obj: str - define the class name of the object to convert
121    - clas_typ: str - define the Datatype of the converted object
122    - to_obj_ntv: method - converter from JsonNTV to the object
123    - to_json_ntv: method - converter from the object to JsonNTV
124
125    *class method :*
126    - `connector`
127    - `dic_connec`
128    - `castable` (@property)
129    - `dic_obj` (@property)
130    - `dic_type` (@property)
131
132    *abstract method*
133    - `to_obj_ntv`
134    - `to_json_ntv`
135
136    *static method*
137    - `cast`
138    - `uncast`
139    - `is_json_class`
140    - `is_json`
141    - `init_ntv_keys`
142    '''
143
144    DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'}
145    DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line',
146                  'MultiLineString': 'multiline', 'Polygon': 'polygon',
147                  'MultiPolygon': 'multipolygon'}
148    DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'}
149    DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat,
150               'datetime': datetime.datetime.fromisoformat}
151    DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring',
152               'multiline': 'multilinestring', 'polygon': 'polygon',
153               'multipolygon': 'multipolygon'}
154    DIC_CBOR = {'point': False, 'multipoint': False, 'line': False,
155                'multiline': False, 'polygon': False, 'multipolygon': False,
156                'date': True, 'time': False, 'datetime': True}
157    DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec',
158               'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec',
159               'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec',
160               'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec',
161               'other': None}
162
163    @classmethod
164    @property
165    def castable(cls):
166        '''return a list with class_name allowed for json-obj conversion'''
167        return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType',
168                'NtvSingle', 'NtvList'] \
169            + list(NtvConnector.DIC_GEO_CL.keys()) \
170            + list(NtvConnector.DIC_FCT.keys()) \
171            + list(NtvConnector.dic_connec().keys())
172
173    @classmethod
174    @property
175    def dic_obj(cls):
176        '''return a dict with the connectors: { type: class_connec_name }'''
177        return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\
178            NtvConnector.DIC_OBJ
179
180    @classmethod
181    @property
182    def dic_type(cls):
183        '''return a dict with the connectors: { class_obj_name: type }'''
184        return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\
185            NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL
186
187    @classmethod
188    def connector(cls):
189        '''return a dict with the connectors: { class_connec_name: class_connec }'''
190        return {clas.__name__: clas for clas in cls.__subclasses__()}
191
192    @classmethod
193    def dic_connec(cls):
194        '''return a dict with the clas associated to the connector:
195        { class_obj_name: class_connec_name }'''
196        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
197
198    @staticmethod
199    @abstractmethod
200    def to_obj_ntv(ntv_value, **kwargs):
201        ''' abstract - convert ntv_value into the return object'''
202
203    @staticmethod
204    @abstractmethod
205    def to_json_ntv(value, name=None, typ=None):
206        ''' abstract - convert NTV object (value, name, type) into the NTV json
207        (json-value, name, type)'''
208
209    @staticmethod
210    def cast(data, name=None, type_str=None):
211        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
212        defined in parameters.
213
214        *Parameters*
215
216        - **data**: NtvSingle entity or NTVvalue of the NTV entity
217        - **name** : String (default None) - name of the NTV entity
218        - **type_str**: String (default None) - type of the NTV entity
219        '''
220        clas = data.__class__.__name__
221        if clas == 'NtvSingle':
222            name = data.ntv_name
223            type_str = data.type_str
224            data = data.ntv_value
225        dic_geo_cl = NtvConnector.DIC_GEO_CL
226        dic_connec = NtvConnector.dic_connec()
227        match clas:
228            case 'int' | 'float' | 'bool' | 'str':
229                return (data, name, type_str)
230            case 'dict':
231                return ({key: NtvConnector.cast(val, name, type_str)[0]
232                         for key, val in data.items()}, name, type_str)
233            case 'list':
234                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
235                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
236            case 'tuple':
237                return (list(data), name, 'array' if not type_str else type_str)
238            case 'date' | 'time' | 'datetime':
239                return (data.isoformat(), name, clas if not type_str else type_str)
240            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
241                    'Polygon' | 'MultiPolygon':
242                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
243                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
244            case 'NtvSingle' | 'NtvList':
245                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
246            case _:
247                connec = None
248                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
249                    connec = NtvConnector.connector()[dic_connec[clas]]
250                if connec:
251                    return connec.to_json_ntv(data, name, type_str)
252                raise NtvError(
253                    'connector is not defined for the class : ', clas)
254        return (data, name, type_str)
255
256    @staticmethod
257    def uncast(value, name=None, type_str=None, **kwargs):
258        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
259
260        *Parameters*
261
262        - **data**: NtvSingle entity or NTVvalue of the NTV entity
263        - **name** : String (default None) - name of the NTV entity
264        - **type_str**: String (default None) - type of the NTV entity
265        '''
266        if type_str == 'json':
267            return (value, name, type_str)
268        if value.__class__.__name__ == 'NtvSingle':
269            if not (type_str in set(NtvConnector.dic_type.values()) and
270                    NtvConnector.is_json(value) or type_str is None):
271                return (value.ntv_value, value.name, value.type_str)
272            type_str = value.type_str if value.ntv_type else None
273            name = value.ntv_name
274            value = value.ntv_value
275        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
276        value_obj = NtvConnector._uncast_val(value, type_str, **option)
277        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
278
279    @staticmethod
280    def _typ_obj(value):
281        if isinstance(value, dict):
282            return NtvConnector._typ_obj(list(value.values()))
283        if isinstance(value, (tuple, list)):
284            for val in value:
285                typ = NtvConnector._typ_obj(val)
286                if typ:
287                    return typ
288            return None
289        return NtvConnector.dic_type.get(value.__class__.__name__)
290
291    @staticmethod
292    def _uncast_val(value, type_n, **option):
293        '''return value from ntv value'''
294        dic_fct = NtvConnector.DIC_FCT
295        dic_geo = NtvConnector.DIC_GEO
296        dic_obj = NtvConnector.dic_obj | option['dicobj']
297        dic_cbor = NtvConnector.DIC_CBOR
298        if not type_n or type_n == 'json' or (option['format'] == 'cbor' and
299                                              not dic_cbor.get(type_n, False)):
300            return value
301        if type_n in dic_fct:
302            if isinstance(value, (tuple, list)):
303                return [NtvConnector._uncast_val(val, type_n, **option) for val in value]
304            if isinstance(value, dict):
305                return {key: NtvConnector._uncast_val(val, type_n, **option)
306                        for key, val in value.items()}
307            return dic_fct[type_n](value)
308        if type_n == 'array':
309            return tuple(value)
310        if type_n == 'ntv':
311            from json_ntv.ntv import Ntv
312            return Ntv.from_obj(value)
313        if type_n in dic_geo:
314            option['type_geo'] = dic_geo[type_n]
315        connec = None
316        if type_n in dic_obj and \
317                dic_obj[type_n] in NtvConnector.connector():
318            connec = NtvConnector.connector()[dic_obj[type_n]]
319        elif dic_obj['other'] in NtvConnector.connector():
320            connec = NtvConnector.connector()['other']
321        if connec:
322            return connec.to_obj_ntv(value, **option)
323        return value
324
325    @staticmethod
326    def is_json_class(val):
327        ''' return True if val is a json type'''
328        return val is None or isinstance(val, (list, int, str, float, bool, dict))
329
330    @staticmethod
331    def is_json(obj):
332        ''' check if obj is a json structure and return True if obj is a json-value
333
334        *Parameters*
335
336        - **obj** : object to check
337        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
338        if obj is None:
339            return True
340        is_js = NtvConnector.is_json
341        match obj:
342            case str() | int() | float() | bool() as obj:
343                return True
344            case list() | tuple() as obj:
345                if not obj:
346                    return True
347                return min(is_js(obj_in) for obj_in in obj)
348            case dict() as obj:
349                if not obj:
350                    return True
351                if not min(isinstance(key, str) for key in obj.keys()):
352                    raise NtvError('key in dict in not string')
353                return min(is_js(obj_in) for obj_in in obj.values())
354            case _:
355                if not obj.__class__.__name__ in NtvConnector.castable:
356                    raise NtvError(obj.__class__.__name__ +
357                                   ' is not valid for NTV')
358                return False
359
360    @staticmethod
361    def keysfromderkeys(parentkeys, derkeys):
362        '''return keys from parent keys and derkeys
363
364        *Parameters*
365
366        - **parentkeys** : list of keys from parent
367        - **derkeys** : list of derived keys
368
369        *Returns* : list of keys'''
370        return [derkeys[pkey] for pkey in parentkeys]
371
372    @staticmethod
373    def encode_coef(lis):
374        '''Generate a repetition coefficient for periodic list'''
375        if len(lis) < 2:
376            return 0
377        coef = 1
378        while coef != len(lis):
379            if lis[coef-1] != lis[coef]:
380                break
381            coef += 1
382        if (not len(lis) % (coef * (max(lis) + 1)) and
383                lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
384            return coef
385        return 0
386
387    @staticmethod
388    def keysfromcoef(coef, period, leng=None):
389        ''' return a list of keys with periodic structure'''
390        if not leng:
391            leng = coef * period
392        return None if not (coef and period) else [(ind % (coef * period)) // coef
393                                                   for ind in range(leng)]
394
395    @staticmethod
396    def init_ntv_keys(ind, lidx, leng):
397        ''' initialization of explicit keys data in lidx object of tabular data'''
398        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
399        name, typ, codec, parent, keys, coef, length = lidx[ind]
400        if (keys, parent, coef) == (None, None, None):  # full or unique
401            if len(codec) == 1:  # unique
402                lidx[ind][4] = [0] * leng
403            elif len(codec) == leng:    # full
404                lidx[ind][4] = list(range(leng))
405            else:
406                raise NtvError('impossible to generate keys')
407            return
408        if keys and len(keys) > 1 and parent is None:  # complete
409            return
410        if coef:  # primary
411            lidx[ind][4] = [(ikey % (coef * len(codec))) //
412                            coef for ikey in range(leng)]
413            lidx[ind][3] = None
414            return
415        if parent is None:
416            raise NtvError('keys not referenced')
417        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
418            NtvConnector.init_ntv_keys(parent, lidx, leng)
419        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
420            lidx[ind][4] = lidx[parent][4]
421            lidx[ind][3] = None
422            return
423        lidx[ind][4] = NtvConnector.keysfromderkeys(
424            lidx[parent][4], keys)  # relative
425        lidx[ind][3] = None
426        return

The NtvConnector class is an abstract class used by all NTV connectors for conversion between NTV-JSON data and NTV-OBJ data.

A NtvConnector child is defined by:

  • clas_obj: str - define the class name of the object to convert
  • clas_typ: str - define the Datatype of the converted object
  • to_obj_ntv: method - converter from JsonNTV to the object
  • to_json_ntv: method - converter from the object to JsonNTV

class method :

abstract method

static method

castable

return a list with class_name allowed for json-obj conversion

dic_obj

return a dict with the connectors: { type: class_connec_name }

dic_type

return a dict with the connectors: { class_obj_name: type }

@classmethod
def connector(cls):
187    @classmethod
188    def connector(cls):
189        '''return a dict with the connectors: { class_connec_name: class_connec }'''
190        return {clas.__name__: clas for clas in cls.__subclasses__()}

return a dict with the connectors: { class_connec_name: class_connec }

@classmethod
def dic_connec(cls):
192    @classmethod
193    def dic_connec(cls):
194        '''return a dict with the clas associated to the connector:
195        { class_obj_name: class_connec_name }'''
196        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}

return a dict with the class associated to the connector: { class_obj_name: class_connec_name }

@staticmethod
@abstractmethod
def to_obj_ntv(ntv_value, **kwargs):
    @staticmethod
    @abstractmethod
    def to_obj_ntv(ntv_value, **kwargs):
        ''' abstract - convert ntv_value into the return object

        Each connector has to implement this method. It is called by
        `NtvConnector._uncast_val` with the NTV value and the conversion
        options as keyword arguments.

        *Parameters*

        - **ntv_value** : NTV value to convert
        - **kwargs** : conversion options'''

abstract - convert ntv_value into the return object

@staticmethod
@abstractmethod
def to_json_ntv(value, name=None, typ=None):
    @staticmethod
    @abstractmethod
    def to_json_ntv(value, name=None, typ=None):
        ''' abstract - convert NTV object (value, name, type) into the NTV json
        (json-value, name, type)

        Each connector has to implement this method. It is called by
        `NtvConnector.cast`, which uses the result as a tuple whose first
        element is the json-value.

        *Parameters*

        - **value** : object to convert
        - **name** : String (default None) - name of the NTV entity
        - **typ** : String (default None) - type of the NTV entity'''

abstract - convert NTV object (value, name, type) into the NTV json (json-value, name, type)

@staticmethod
def cast(data, name=None, type_str=None):
209    @staticmethod
210    def cast(data, name=None, type_str=None):
211        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
212        defined in parameters.
213
214        *Parameters*
215
216        - **data**: NtvSingle entity or NTVvalue of the NTV entity
217        - **name** : String (default None) - name of the NTV entity
218        - **type_str**: String (default None) - type of the NTV entity
219        '''
220        clas = data.__class__.__name__
221        if clas == 'NtvSingle':
222            name = data.ntv_name
223            type_str = data.type_str
224            data = data.ntv_value
225        dic_geo_cl = NtvConnector.DIC_GEO_CL
226        dic_connec = NtvConnector.dic_connec()
227        match clas:
228            case 'int' | 'float' | 'bool' | 'str':
229                return (data, name, type_str)
230            case 'dict':
231                return ({key: NtvConnector.cast(val, name, type_str)[0]
232                         for key, val in data.items()}, name, type_str)
233            case 'list':
234                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
235                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
236            case 'tuple':
237                return (list(data), name, 'array' if not type_str else type_str)
238            case 'date' | 'time' | 'datetime':
239                return (data.isoformat(), name, clas if not type_str else type_str)
240            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
241                    'Polygon' | 'MultiPolygon':
242                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
243                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
244            case 'NtvSingle' | 'NtvList':
245                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
246            case _:
247                connec = None
248                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
249                    connec = NtvConnector.connector()[dic_connec[clas]]
250                if connec:
251                    return connec.to_json_ntv(data, name, type_str)
252                raise NtvError(
253                    'connector is not defined for the class : ', clas)
254        return (data, name, type_str)

return JSON-NTV conversion (json_value, name, type_str) of the NTV entity defined in parameters.

Parameters

  • data: NtvSingle entity or NTVvalue of the NTV entity
  • name : String (default None) - name of the NTV entity
  • type_str: String (default None) - type of the NTV entity
@staticmethod
def uncast(value, name=None, type_str=None, **kwargs):
256    @staticmethod
257    def uncast(value, name=None, type_str=None, **kwargs):
258        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
259
260        *Parameters*
261
262        - **data**: NtvSingle entity or NTVvalue of the NTV entity
263        - **name** : String (default None) - name of the NTV entity
264        - **type_str**: String (default None) - type of the NTV entity
265        '''
266        if type_str == 'json':
267            return (value, name, type_str)
268        if value.__class__.__name__ == 'NtvSingle':
269            if not (type_str in set(NtvConnector.dic_type.values()) and
270                    NtvConnector.is_json(value) or type_str is None):
271                return (value.ntv_value, value.name, value.type_str)
272            type_str = value.type_str if value.ntv_type else None
273            name = value.ntv_name
274            value = value.ntv_value
275        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
276        value_obj = NtvConnector._uncast_val(value, type_str, **option)
277        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))

return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity

Parameters

  • data: NtvSingle entity or NTVvalue of the NTV entity
  • name : String (default None) - name of the NTV entity
  • type_str: String (default None) - type of the NTV entity
@staticmethod
def is_json_class(val):
325    @staticmethod
326    def is_json_class(val):
327        ''' return True if val is a json type'''
328        return val is None or isinstance(val, (list, int, str, float, bool, dict))

return True if val is a json type

@staticmethod
def is_json(obj):
330    @staticmethod
331    def is_json(obj):
332        ''' check if obj is a json structure and return True if obj is a json-value
333
334        *Parameters*
335
336        - **obj** : object to check
337        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
338        if obj is None:
339            return True
340        is_js = NtvConnector.is_json
341        match obj:
342            case str() | int() | float() | bool() as obj:
343                return True
344            case list() | tuple() as obj:
345                if not obj:
346                    return True
347                return min(is_js(obj_in) for obj_in in obj)
348            case dict() as obj:
349                if not obj:
350                    return True
351                if not min(isinstance(key, str) for key in obj.keys()):
352                    raise NtvError('key in dict in not string')
353                return min(is_js(obj_in) for obj_in in obj.values())
354            case _:
355                if not obj.__class__.__name__ in NtvConnector.castable:
356                    raise NtvError(obj.__class__.__name__ +
357                                   ' is not valid for NTV')
358                return False

check if obj is a json structure and return True if obj is a json-value

Parameters

  • obj : object to check
  • ntvobj : boolean (default False) - if True NTV class value are accepted
@staticmethod
def keysfromderkeys(parentkeys, derkeys):
360    @staticmethod
361    def keysfromderkeys(parentkeys, derkeys):
362        '''return keys from parent keys and derkeys
363
364        *Parameters*
365
366        - **parentkeys** : list of keys from parent
367        - **derkeys** : list of derived keys
368
369        *Returns* : list of keys'''
370        return [derkeys[pkey] for pkey in parentkeys]

return keys from parent keys and derkeys

Parameters

  • parentkeys : list of keys from parent
  • derkeys : list of derived keys

Returns : list of keys

@staticmethod
def encode_coef(lis):
372    @staticmethod
373    def encode_coef(lis):
374        '''Generate a repetition coefficient for periodic list'''
375        if len(lis) < 2:
376            return 0
377        coef = 1
378        while coef != len(lis):
379            if lis[coef-1] != lis[coef]:
380                break
381            coef += 1
382        if (not len(lis) % (coef * (max(lis) + 1)) and
383                lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
384            return coef
385        return 0

Generate a repetition coefficient for periodic list

@staticmethod
def keysfromcoef(coef, period, leng=None):
387    @staticmethod
388    def keysfromcoef(coef, period, leng=None):
389        ''' return a list of keys with periodic structure'''
390        if not leng:
391            leng = coef * period
392        return None if not (coef and period) else [(ind % (coef * period)) // coef
393                                                   for ind in range(leng)]

return a list of keys with periodic structure

@staticmethod
def init_ntv_keys(ind, lidx, leng):
395    @staticmethod
396    def init_ntv_keys(ind, lidx, leng):
397        ''' initialization of explicit keys data in lidx object of tabular data'''
398        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
399        name, typ, codec, parent, keys, coef, length = lidx[ind]
400        if (keys, parent, coef) == (None, None, None):  # full or unique
401            if len(codec) == 1:  # unique
402                lidx[ind][4] = [0] * leng
403            elif len(codec) == leng:    # full
404                lidx[ind][4] = list(range(leng))
405            else:
406                raise NtvError('impossible to generate keys')
407            return
408        if keys and len(keys) > 1 and parent is None:  # complete
409            return
410        if coef:  # primary
411            lidx[ind][4] = [(ikey % (coef * len(codec))) //
412                            coef for ikey in range(leng)]
413            lidx[ind][3] = None
414            return
415        if parent is None:
416            raise NtvError('keys not referenced')
417        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
418            NtvConnector.init_ntv_keys(parent, lidx, leng)
419        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
420            lidx[ind][4] = lidx[parent][4]
421            lidx[ind][3] = None
422            return
423        lidx[ind][4] = NtvConnector.keysfromderkeys(
424            lidx[parent][4], keys)  # relative
425        lidx[ind][3] = None
426        return

initialization of explicit keys data in lidx object of tabular data

class NtvTree:
class NtvTree:
    ''' The NtvTree class is an iterator class used to traverse a NTV tree structure.
    Some other methods give tree indicators and data.

    *Attributes :*

    - **_ntv** : Ntv entity - root of the tree
    - **_node**:  Ntv entity - node pointer
    - **_stack**:  list - index of the current node in each ancestor

    *dynamic values (@property)*
    - `breadth`
    - `size`
    - `height`
    - `adjacency_list`
    - `nodes`
    - `dic_nodes`
    - `leaf_nodes`
    - `inner_nodes`
    '''

    def __init__(self, ntv):
        ''' the parameter of the constructor is the Ntv entity'''
        self._ntv = ntv
        self._node = None
        self._stack = []

    def __iter__(self):
        ''' iterator without initialization'''
        return self

    def __next__(self):
        ''' return next node in the tree (DFS preordering)'''
        if self._node is None:
            self._node = self._ntv
        elif self._node.__class__.__name__ == 'NtvList' and len(self._node) > 0:
            self._next_down()
        else:
            # an empty NtvList has no child: it is handled as a leaf, the
            # traversal goes on with its siblings or ancestors
            self._next_up()
        return self._node

    def _next_down(self):
        ''' find the next subchild node'''
        self._node = self._node[0]
        self._stack.append(0)

    def _next_up(self):
        ''' find the next sibling or ancestor node'''
        while True:
            parent = self._node.parent
            if not parent or self._node == self._ntv:
                raise StopIteration
            ind = self._stack[-1]
            if ind < len(parent) - 1:  # if ind is not the last
                self._node = parent[ind + 1]
                self._stack[-1] += 1
                return
            if parent == self._ntv:
                raise StopIteration
            # last child: the search goes on from the parent
            self._node = parent
            self._stack.pop()

    @property
    def breadth(self):
        ''' return the number of leaves'''
        return len(self.leaf_nodes)

    @property
    def size(self):
        ''' return the number of nodes'''
        return len(self.nodes)

    @property
    def height(self):
        ''' return the height of the tree'''
        return max(len(node.pointer()) for node in self.__class__(self._ntv)) - 1

    @property
    def adjacency_list(self):
        ''' return a dict with the list of child nodes for each parent node'''
        return {node: node.val for node in self.inner_nodes}

    @property
    def nodes(self):
        ''' return the list of nodes according to the DFS preordering algorithm'''
        return list(self.__class__(self._ntv))

    @property
    def dic_nodes(self):
        ''' return a dict of named nodes { name: node } according to the DFS
        preordering algorithm'''
        return {node.ntv_name: node for node in self.__class__(self._ntv)
                if node.ntv_name}

    @property
    def leaf_nodes(self):
        ''' return the list of leaf nodes (NtvSingle) according to the DFS
        preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvSingle']

    @property
    def inner_nodes(self):
        ''' return the list of inner nodes (NtvList) according to the DFS
        preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvList']

The NtvTree class is an iterator class used to traverse a NTV tree structure. Some other methods give tree indicators and data.

Attributes :

  • ntv : Ntv entity
  • _node: Ntv entity - node pointer
  • _stack: list - stack used to store context in recursive methods

dynamic values (@property)

NtvTree(ntv)
450    def __init__(self, ntv):
451        ''' the parameter of the constructor is the Ntv entity'''
452        self._ntv = ntv
453        self._node = None
454        self._stack = []

the parameter of the constructor is the Ntv entity

breadth

return the number of leaves

size

return the number of nodes

height

return the height of the tree

adjacency_list

return a dict with the list of child nodes for each parent node

nodes

return the list of nodes according to the DFS preordering algorithm

dic_nodes

return a dict of nodes according to the DFS preordering algorithm

leaf_nodes

return the list of leaf nodes according to the DFS preordering algorithm

inner_nodes

return the list of inner nodes according to the DFS preordering algorithm

class NtvJsonEncoder(json.encoder.JSONEncoder):
class NtvJsonEncoder(json.JSONEncoder):
    """json encoder for Ntv data"""

    def default(self, o):
        """return the json-value of `o` given by `NtvConnector.cast`.

        If `NtvConnector.cast` raises a NtvError, the base implementation
        is called (it raises a TypeError)."""
        try:
            return NtvConnector.cast(o)[0]
        except NtvError:
            # the statements that followed this handler were unreachable
            # (both branches return) and have been removed
            return json.JSONEncoder.default(self, o)

json encoder for Ntv data

def default(self, o):
541    def default(self, o):
542        try:
543            return NtvConnector.cast(o)[0]
544        except NtvError:
545            return json.JSONEncoder.default(self, o)
546        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
547            return o.isoformat()
548        return json.JSONEncoder.default(self, o)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
Inherited Members
json.encoder.JSONEncoder
JSONEncoder
encode
iterencode
class NtvError(builtins.Exception):
class NtvError(Exception):
    ''' NTV Exception

    Raised by the classes of this module, e.g. when a json-name is not a
    string, when no connector is defined for a class or when the keys of
    a tabular field cannot be generated.'''

NTV Exception

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback