NTV.json_ntv.ntv_util

Created on Feb 27 2023

@author: Philippe@loco-labs.io

The ntv_util module is part of the NTV.json_ntv package (specification document).

It contains the classes NtvUtil, NtvConnector, NtvTree, NtvJsonEncoder and NtvError for NTV entities.

  1# -*- coding: utf-8 -*-
  2"""
  3Created on Feb 27 2023
  4
  5@author: Philippe@loco-labs.io
  6
  7The `ntv_util` module is part of the `NTV.json_ntv` package ([specification document](
  8https://github.com/loco-philippe/NTV/blob/main/documentation/JSON-NTV-standard.pdf)).
  9
 10It contains the classes `NtvUtil`, `NtvConnector`, `NtvTree`, `NtvJsonEncoder`
 11and `NtvError` for NTV entities.
 12"""
 13from abc import ABC, abstractmethod
 14import datetime
 15import json
 16import re
 17
 18
 19class NtvUtil:
 20    ''' The NtvUtil class includes static methods used by several NTV classes.
 21    NtvUtil is the parent class of `Datatype`, `Namespace`, `Ntv`.
 22
 23    *class variables :*
 24    - **_namespaces_** : list of Namespace defined
 25    - **_types_** : list of Datatype defined
 26
 27    *static methods :*
 28    - `is_dictable`
 29    - `from_obj_name`
 30    - `decode_ntv_tab`
 31    - `to_ntv_pointer`
 32
 33    '''
 34    _namespaces_ = {}
 35    _types_ = {}
 36
 37    @staticmethod
 38    def is_dictable(lis):
 39        '''check if a list can be translated in a dict'''
 40        keys = set()
 41        for val in lis:
 42            if not isinstance(val, dict):
 43                return False
 44            if len(val) != 1:
 45                return False
 46            keys.add(list(val)[0])
 47        return len(keys) == len(lis)
 48
 49    @staticmethod
 50    def from_obj_name(string):
 51        '''return a tuple with name, type_str and separator from string'''
 52        if not isinstance(string, str):
 53            raise NtvError('a json-name have to be str')
 54        if string == '':
 55            return (None, None, None)
 56        spl = string.rsplit(':', maxsplit=1)
 57        if len(spl) == 1:
 58            if string[-1] == '.':
 59                return (None, string, None)
 60            return (string, None, None)
 61        if spl[0] == '':
 62            return (None, spl[1], ':')
 63        if spl[0][-1] == ':':
 64            sp0 = spl[0][:-1]
 65            return (None if sp0 == '' else sp0, None if spl[1] == '' else spl[1], '::')
 66        return (None if spl[0] == '' else spl[0], None if spl[1] == '' else spl[1], ':')
 67
 68    @staticmethod
 69    def decode_ntv_tab(ntv, ntv_to_val):
 70        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)
 71
 72        *parameters:*
 73
 74        - **ntv**: Ntv data to decode,
 75        - **ntv_to_val**: method to convert external value form ntv in internal Field value
 76
 77        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*
 78
 79        - name (None or string): name of the Field
 80        - dtype (None or string): type of data
 81        - codec (list): list of Field codec values
 82        - parent (None or int): Field parent or None
 83        - keys (None or list): Field keys
 84        - coef (None or int): coef if primary Field else None
 85        - leng (int): length of the Field
 86        '''
 87        typ = ntv.type_str if ntv.ntv_type else None
 88        nam = ntv.name
 89        val = ntv_to_val(ntv)
 90        if ntv.__class__.__name__ == 'NtvSingle':
 91            return (nam, typ, [val], None, None, None, 1)
 92        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':
 93            return (nam, typ, val, None, None, None, len(ntv))
 94
 95        ntvc = ntv[0]
 96        leng = max(len(ind) for ind in ntv)
 97        typc = ntvc.type_str if ntvc.ntv_type else None
 98        valc = ntv_to_val(ntvc)
 99        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
100                isinstance(ntv[1].val, (int, str)) and \
101                ntv[2].__class__.__name__ != 'NtvSingle' and \
102                isinstance(ntv[2][0].val, int):
103            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
104        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):
105            return (nam, typc, valc, ntv[1].val, None, None, leng)
106        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):
107            leng = leng * ntv[1][0].val
108            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
109        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):
110            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
111        return (nam, typ, val, None, None, None, len(ntv))
112
113    @staticmethod
114    def to_ntvpointer(jsonpointer, unique_root=False):
115        '''convert a json pointer inter a NTV pointer (string)
116
117        *parameters:*
118
119        - **jsonpointer**: String - json pointer to convert,
120        - **unique_root**: Boolean (default False) - True if the json root length is 1 '''
121        single = '/([0-9]+)(/[a-z])'
122        if unique_root and not '0' <= jsonpointer[1] <= '9':
123            return re.sub(single, r'\g<2>', jsonpointer)[1:]
124        return re.sub(single, r'\g<2>', jsonpointer)
125
126
127class NtvConnector(ABC):
128    ''' The NtvConnector class is an abstract class used by all NTV connectors
129    for conversion between NTV-JSON data and NTV-OBJ data.
130
131    A NtvConnector child is defined by:
132    - clas_obj: str - define the class name of the object to convert
133    - clas_typ: str - define the Datatype of the converted object
134    - to_obj_ntv: method - converter from JsonNTV to the object
135    - to_json_ntv: method - converter from the object to JsonNTV
136
137    *class method :*
138    - `connector`
139    - `dic_connec`
140    - `castable` (@property)
141    - `dic_obj` (@property)
142    - `dic_type` (@property)
143
144    *abstract method*
145    - `to_obj_ntv`
146    - `to_json_ntv`
147
148    *static method*
149    - `cast`
150    - `uncast`
151    - `is_json_class`
152    - `is_json`
153    - `init_ntv_keys`
154    '''
155
156    DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'}
157    DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line',
158                  'MultiLineString': 'multiline', 'Polygon': 'polygon',
159                  'MultiPolygon': 'multipolygon'}
160    DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'}
161    DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat,
162               'datetime': datetime.datetime.fromisoformat}
163    DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring',
164               'multiline': 'multilinestring', 'polygon': 'polygon',
165               'multipolygon': 'multipolygon'}
166    DIC_CBOR = {'point': False, 'multipoint': False, 'line': False,
167                'multiline': False, 'polygon': False, 'multipolygon': False,
168                'date': True, 'time': False, 'datetime': True}
169    DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec',
170               'ndarray': 'NdarrayConnec',
171               'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec',
172               'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec',
173               'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec',
174               'other': None}
175
176    @classmethod
177    @property
178    def castable(cls):
179        '''return a list with class_name allowed for json-obj conversion'''
180        return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType',
181                'NtvSingle', 'NtvList'] \
182            + list(NtvConnector.DIC_GEO_CL.keys()) \
183            + list(NtvConnector.DIC_FCT.keys()) \
184            + list(NtvConnector.dic_connec().keys())
185
186    @classmethod
187    @property
188    def dic_obj(cls):
189        '''return a dict with the connectors: { type: class_connec_name }'''
190        return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\
191            NtvConnector.DIC_OBJ
192
193    @classmethod
194    @property
195    def dic_type(cls):
196        '''return a dict with the connectors: { class_obj_name: type }'''
197        return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\
198            NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL
199
200    @classmethod
201    def connector(cls):
202        '''return a dict with the connectors: { class_connec_name: class_connec }'''
203        return {clas.__name__: clas for clas in cls.__subclasses__()}
204
205    @classmethod
206    def dic_connec(cls):
207        '''return a dict with the clas associated to the connector:
208        { class_obj_name: class_connec_name }'''
209        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
210
211    @staticmethod
212    @abstractmethod
213    def to_obj_ntv(ntv_value, **kwargs):
214        ''' abstract - convert ntv_value into the return object'''
215
216    @staticmethod
217    @abstractmethod
218    def to_json_ntv(value, name=None, typ=None):
219        ''' abstract - convert NTV object (value, name, type) into the NTV json
220        (json-value, name, type)'''
221
222    @staticmethod
223    def cast(data, name=None, type_str=None):
224        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
225        defined in parameters.
226
227        *Parameters*
228
229        - **data**: NtvSingle entity or NTVvalue of the NTV entity
230        - **name** : String (default None) - name of the NTV entity
231        - **type_str**: String (default None) - type of the NTV entity
232        '''
233        clas = data.__class__.__name__
234        if clas == 'NtvSingle':
235            name = data.ntv_name
236            type_str = data.type_str
237            data = data.ntv_value
238        dic_geo_cl = NtvConnector.DIC_GEO_CL
239        dic_connec = NtvConnector.dic_connec()
240        match clas:
241            case 'int' | 'float' | 'bool' | 'str':
242                return (data, name, type_str)
243            case 'dict':
244                return ({key: NtvConnector.cast(val, name, type_str)[0]
245                         for key, val in data.items()}, name, type_str)
246            case 'list':
247                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
248                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
249            case 'tuple':
250                return (list(data), name, 'array' if not type_str else type_str)
251            case 'date' | 'time' | 'datetime':
252                return (data.isoformat(), name, clas if not type_str else type_str)
253            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
254                    'Polygon' | 'MultiPolygon':
255                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
256                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
257            case 'NtvSingle' | 'NtvList':
258                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
259            case _:
260                connec = None
261                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
262                    connec = NtvConnector.connector()[dic_connec[clas]]
263                if connec:
264                    return connec.to_json_ntv(data, name, type_str)
265                raise NtvError(
266                    'connector is not defined for the class : ', clas)
267        return (data, name, type_str)
268
269    @staticmethod
270    def uncast(value, name=None, type_str=None, **kwargs):
271        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
272
273        *Parameters*
274
275        - **value**: NtvSingle entity or NTVvalue of the NTV entity
276        - **name** : String (default None) - name of the NTV entity
277        - **type_str**: String (default None) - type of the NTV entity
278        '''
279        if type_str == 'json':
280            return (value, name, type_str)
281        typebase_str = type_str
282        if value.__class__.__name__ == 'NtvSingle':
283            if not (type_str in set(NtvConnector.dic_type.values()) and
284                    NtvConnector.is_json(value) or type_str is None):
285                return (value.ntv_value, value.name, value.type_str)
286            type_str = value.type_str if value.ntv_type else None
287            typebase_str = value.typebase_str if value.ntv_type else None
288            name = value.ntv_name
289            value = value.ntv_value
290        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
291        value_obj = NtvConnector._uncast_val(value, typebase_str, **option)
292        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
293
294    @staticmethod
295    def _typ_obj(value):
296        if isinstance(value, dict):
297            return NtvConnector._typ_obj(list(value.values()))
298        if isinstance(value, (tuple, list)):
299            for val in value:
300                typ = NtvConnector._typ_obj(val)
301                if typ:
302                    return typ
303            return None
304        return NtvConnector.dic_type.get(value.__class__.__name__)
305
306    @staticmethod
307    def _uncast_val(value, type_n, **option):
308        '''return value from ntv value
309
310        *Parameters*
311
312        - **value**: NtvSingle entity or NTVvalue of the NTV entity
313        '''
314        dic_fct = NtvConnector.DIC_FCT
315        dic_geo = NtvConnector.DIC_GEO
316        dic_obj = NtvConnector.dic_obj | option['dicobj']
317        dic_cbor = NtvConnector.DIC_CBOR
318        if not type_n or type_n == 'json' or (option['format'] == 'cbor' and
319                                              not dic_cbor.get(type_n, False)):
320            return value
321        if type_n in dic_fct:
322            if isinstance(value, (tuple, list)):
323                return [NtvConnector._uncast_val(val, type_n, **option) for val in value]
324            if isinstance(value, dict):
325                return {key: NtvConnector._uncast_val(val, type_n, **option)
326                        for key, val in value.items()}
327            return dic_fct[type_n](value)
328        if type_n == 'array':
329            return tuple(value)
330        if type_n == 'ntv':
331            from json_ntv.ntv import Ntv
332            return Ntv.from_obj(value)
333        if type_n in dic_geo:
334            option['type_geo'] = dic_geo[type_n]
335        connec = None
336        if type_n in dic_obj and \
337                dic_obj[type_n] in NtvConnector.connector():
338            connec = NtvConnector.connector()[dic_obj[type_n]]
339        elif dic_obj['other'] in NtvConnector.connector():
340            connec = NtvConnector.connector()['other']
341        if connec:
342            return connec.to_obj_ntv(value, **option)
343        return value
344
345    @staticmethod
346    def is_json_class(val):
347        ''' return True if val is a json type'''
348        return val is None or isinstance(val, (list, int, str, float, bool, dict))
349
350    @staticmethod
351    def is_json(obj):
352        ''' check if obj is a json structure and return True if obj is a json-value
353
354        *Parameters*
355
356        - **obj** : object to check
357        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
358        if obj is None:
359            return True
360        is_js = NtvConnector.is_json
361        match obj:
362            case str() | int() | float() | bool() as obj:
363                return True
364            case list() | tuple() as obj:
365                if not obj:
366                    return True
367                return min(is_js(obj_in) for obj_in in obj)
368            case dict() as obj:
369                if not obj:
370                    return True
371                if not min(isinstance(key, str) for key in obj.keys()):
372                    raise NtvError('key in dict in not string')
373                return min(is_js(obj_in) for obj_in in obj.values())
374            case _:
375                if obj.__class__.__name__ not in NtvConnector.castable:
376                    raise NtvError(obj.__class__.__name__ +
377                                   ' is not valid for NTV')
378                return False
379
380    @staticmethod
381    def keysfromderkeys(parentkeys, derkeys):
382        '''return keys from parent keys and derkeys
383
384        *Parameters*
385
386        - **parentkeys** : list of keys from parent
387        - **derkeys** : list of derived keys
388
389        *Returns* : list of keys'''
390        return [derkeys[pkey] for pkey in parentkeys]
391
392    @staticmethod
393    def encode_coef(lis):
394        '''Generate a repetition coefficient for periodic list'''
395        if len(lis) < 2:
396            return 0
397        coef = 1
398        while coef != len(lis):
399            if lis[coef-1] != lis[coef]:
400                break
401            coef += 1
402        if (not len(lis) % (coef * (max(lis) + 1)) and
403                lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
404            return coef
405        return 0
406
407    @staticmethod
408    def keysfromcoef(coef, period, leng=None):
409        ''' return a list of keys with periodic structure'''
410        if not leng:
411            leng = coef * period
412        return None if not (coef and period) else [(ind % (coef * period)) // coef
413                                                   for ind in range(leng)]
414
415    @staticmethod
416    def format_field(lidx_decode, leng):
417        ''' return a list of format'''
418        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
419        field_format = []
420        for field_decode in lidx_decode:
421            name, typ, codec, parent, keys, coef, length = field_decode
422            if (keys, parent, coef) == (None, None, None):  # full or unique
423                if len(codec) == 1:  # unique
424                    field_format.append('unique')
425                elif len(codec) == leng:    # full
426                    field_format.append('full')
427                else:
428                    raise NtvError('impossible to generate keys')
429                continue
430            if keys and len(keys) > 1 and parent is None:  # complete
431                field_format.append('complete')
432                continue
433            if coef:  # primary
434                field_format.append('primary')
435                continue
436            if parent is None:
437                raise NtvError('keys not referenced')
438            if not keys:    # implicit
439                field_format.append('implicit')
440                continue
441            field_format.append('relative')
442        return field_format
443
444    @staticmethod
445    def init_ntv_keys(ind, lidx, leng):
446        ''' initialization of explicit keys data in lidx object of tabular data'''
447        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
448        name, typ, codec, parent, keys, coef, length = lidx[ind]
449        if (keys, parent, coef) == (None, None, None):  # full or unique
450            if len(codec) == 1:  # unique
451                lidx[ind][4] = [0] * leng
452            elif len(codec) == leng:    # full
453                lidx[ind][4] = list(range(leng))
454            else:
455                raise NtvError('impossible to generate keys')
456            return
457        if keys and len(keys) > 1 and parent is None:  # complete
458            return
459        if coef:  # primary
460            lidx[ind][4] = [(ikey % (coef * len(codec))) //
461                            coef for ikey in range(leng)]
462            lidx[ind][3] = None
463            return
464        if parent is None:
465            raise NtvError('keys not referenced')
466        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
467            NtvConnector.init_ntv_keys(parent, lidx, leng)
468        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
469            lidx[ind][4] = lidx[parent][4]
470            lidx[ind][3] = None
471            return
472        lidx[ind][4] = NtvConnector.keysfromderkeys(
473            lidx[parent][4], keys)  # relative
474        lidx[ind][3] = None
475        return
476
477
class NtvTree:
    ''' The NtvTree class is an iterator class used to traverse a NTV tree structure.
    Some other methods give tree indicators and data.

    *Attributes :*

    - **_ntv** : Ntv entity - root of the tree
    - **_node**:  Ntv entity - node pointer
    - **_stack**:  list - index (in its parent) of each node of the path
    between the root and the node pointer

    *dynamic values (@property)*
    - `breadth`
    - `size`
    - `height`
    - `adjacency_list`
    - `nodes`
    - `dic_nodes`
    - `leaf_nodes`
    - `inner_nodes`
    '''

    def __init__(self, ntv):
        ''' the parameter of the constructor is the Ntv entity'''
        self._ntv = ntv
        self._node = None
        self._stack = []

    def __iter__(self):
        ''' iterator without initialization'''
        return self

    def __next__(self):
        ''' return next node in the tree'''
        if self._node is None:
            self._node = self._ntv
        elif self._node.__class__.__name__ == 'NtvList' and len(self._node) > 0:
            self._next_down()
        else:
            # a NtvList without child is processed as a leaf: the traversal
            # goes on with its next sibling or ancestor instead of stopping
            self._next_up()
        return self._node

    def _next_down(self):
        ''' find the next subchild node'''
        self._node = self._node[0]
        self._stack.append(0)

    def _next_up(self):
        ''' find the next sibling or ancestor node

        StopIteration is raised when the traversal comes back to the root'''
        while True:
            parent = self._node.parent
            if not parent or self._node == self._ntv:
                raise StopIteration
            ind = self._stack[-1]
            if ind < len(parent) - 1:  # if ind is not the last
                self._node = parent[ind + 1]
                self._stack[-1] += 1
                return
            if parent == self._ntv:
                raise StopIteration
            # last child: the search goes on from the parent
            self._node = parent
            self._stack.pop()

    @property
    def breadth(self):
        ''' return the number of leaves'''
        return len(self.leaf_nodes)

    @property
    def size(self):
        ''' return the number of nodes'''
        return len(self.nodes)

    @property
    def height(self):
        ''' return the height of the tree'''
        return max(len(node.pointer()) for node in self.__class__(self._ntv)) - 1

    @property
    def adjacency_list(self):
        ''' return a dict with the list of child nodes for each parent node'''
        return {node: node.val for node in self.inner_nodes}

    @property
    def nodes(self):
        ''' return the list of nodes according to the DFS preordering algorithm'''
        # a new iterator is used: the state of self is not modified
        return list(self.__class__(self._ntv))

    @property
    def dic_nodes(self):
        ''' return a dict of nodes according to the DFS preordering algorithm

        The keys are the `ntv_name` of the nodes (nodes without name are not
        included)'''
        return {node.ntv_name: node for node in self.__class__(self._ntv)
                if node.ntv_name}

    @property
    def leaf_nodes(self):
        ''' return the list of leaf nodes according to the DFS preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvSingle']

    @property
    def inner_nodes(self):
        ''' return the list of inner nodes according to the DFS preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvList']
585
586
class NtvJsonEncoder(json.JSONEncoder):
    """json encoder for Ntv data"""

    def default(self, o):
        """Return the json-value given by `NtvConnector.cast` for `o`.

        If `NtvConnector.cast` raises NtvError, the default method of
        `json.JSONEncoder` is used.
        """
        try:
            return NtvConnector.cast(o)[0]
        except NtvError:
            return super().default(o)
598
599
class NtvError(Exception):
    """Exception raised by the NTV classes."""
class NtvUtil:
    ''' The NtvUtil class includes static methods used by several NTV classes.
    NtvUtil is the parent class of `Datatype`, `Namespace`, `Ntv`.

    *class variables :*
    - **_namespaces_** : dict of Namespace defined
    - **_types_** : dict of Datatype defined

    *static methods :*
    - `is_dictable`
    - `from_obj_name`
    - `decode_ntv_tab`
    - `to_ntvpointer`

    '''
    _namespaces_ = {}
    _types_ = {}

    @staticmethod
    def is_dictable(lis):
        '''check if a list can be translated in a dict

        Return True when every item of `lis` is a dict with exactly one key
        and all these keys are distinct (True for an empty list).'''
        keys = set()
        for val in lis:
            if not isinstance(val, dict) or len(val) != 1:
                return False
            keys.add(next(iter(val)))
        return len(keys) == len(lis)

    @staticmethod
    def from_obj_name(string):
        '''return a tuple with name, type_str and separator from string

        *parameters:*

        - **string**: String - json-name to split on its last ':' or '::'

        *Returns tuple: (name, type_str, separator)*

        - name (None or string): part before the separator
        - type_str (None or string): part after the separator (a string
        without ':' and ending with '.' is returned as type_str)
        - separator (None, ':' or '::')

        *Raises* NtvError if `string` is not a str'''
        if not isinstance(string, str):
            raise NtvError('a json-name have to be str')
        if string == '':
            return (None, None, None)
        head, colon, tail = string.rpartition(':')
        if not colon:
            # no separator: the whole string is a type (trailing '.') or a name
            if string.endswith('.'):
                return (None, string, None)
            return (string, None, None)
        if not head:
            # the string starts with its only ':' (tail is returned as it is)
            return (None, tail, ':')
        if head.endswith(':'):
            # the last separator is '::'
            return (head[:-1] or None, tail or None, '::')
        return (head, tail or None, ':')

    @staticmethod
    def decode_ntv_tab(ntv, ntv_to_val):
        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)

        *parameters:*

        - **ntv**: Ntv data to decode,
        - **ntv_to_val**: method to convert external value form ntv in internal Field value

        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*

        - name (None or string): name of the Field
        - dtype (None or string): type of data
        - codec (list): list of Field codec values
        - parent (None or int): Field parent or None
        - keys (None or list): Field keys
        - coef (None or int): coef if primary Field else None
        - leng (int): length of the Field
        '''
        typ = ntv.type_str if ntv.ntv_type else None
        nam = ntv.name
        val = ntv_to_val(ntv)
        if ntv.__class__.__name__ == 'NtvSingle':
            return (nam, typ, [val], None, None, None, 1)
        # only a list of 2 or 3 items whose first item is not a NtvSingle
        # can hold a codec followed by parent / keys / coef data
        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':
            return (nam, typ, val, None, None, None, len(ntv))

        ntvc = ntv[0]
        leng = max(len(ind) for ind in ntv)
        typc = ntvc.type_str if ntvc.ntv_type else None
        valc = ntv_to_val(ntvc)
        # [codec, parent, keys]
        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
                isinstance(ntv[1].val, (int, str)) and \
                ntv[2].__class__.__name__ != 'NtvSingle' and \
                isinstance(ntv[2][0].val, int):
            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
        # [codec, parent]
        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):
            return (nam, typc, valc, ntv[1].val, None, None, leng)
        # [codec, [coef]]: the length is the codec length multiplied by coef
        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):
            leng = leng * ntv[1][0].val
            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
        # [codec, keys]
        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):
            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
        return (nam, typ, val, None, None, None, len(ntv))

    @staticmethod
    def to_ntvpointer(jsonpointer, unique_root=False):
        '''convert a json pointer into a NTV pointer (string)

        Each numeric token directly followed by a token starting with a
        lowercase letter is removed. If `unique_root` is True and the first
        token does not start with a digit, the leading character is removed.

        *parameters:*

        - **jsonpointer**: String - json pointer to convert,
        - **unique_root**: Boolean (default False) - True if the json root length is 1 '''
        single = '/([0-9]+)(/[a-z])'
        ntvpointer = re.sub(single, r'\g<2>', jsonpointer)
        # jsonpointer[1:2] is '' (never raises) when the pointer has less
        # than two characters; '' is not in the '0'..'9' range
        if unique_root and not '0' <= jsonpointer[1:2] <= '9':
            return ntvpointer[1:]
        return ntvpointer

The NtvUtil class includes static methods used by several NTV classes. NtvUtil is the parent class of Datatype, Namespace, Ntv.

class variables :

  • _namespaces_ : dict of the defined Namespace entities (initially empty)
  • _types_ : dict of the defined Datatype entities (initially empty)

static methods :

@staticmethod
def is_dictable(lis):
38    @staticmethod
39    def is_dictable(lis):
40        '''check if a list can be translated in a dict'''
41        keys = set()
42        for val in lis:
43            if not isinstance(val, dict):
44                return False
45            if len(val) != 1:
46                return False
47            keys.add(list(val)[0])
48        return len(keys) == len(lis)

Check whether a list can be translated into a dict (each item is a single-key dict and all the keys are distinct).

@staticmethod
def from_obj_name(string):
50    @staticmethod
51    def from_obj_name(string):
52        '''return a tuple with name, type_str and separator from string'''
53        if not isinstance(string, str):
54            raise NtvError('a json-name have to be str')
55        if string == '':
56            return (None, None, None)
57        spl = string.rsplit(':', maxsplit=1)
58        if len(spl) == 1:
59            if string[-1] == '.':
60                return (None, string, None)
61            return (string, None, None)
62        if spl[0] == '':
63            return (None, spl[1], ':')
64        if spl[0][-1] == ':':
65            sp0 = spl[0][:-1]
66            return (None if sp0 == '' else sp0, None if spl[1] == '' else spl[1], '::')
67        return (None if spl[0] == '' else spl[0], None if spl[1] == '' else spl[1], ':')

Return a tuple (name, type_str, separator) obtained by splitting the string on its last ':' or '::' separator.

@staticmethod
def decode_ntv_tab(ntv, ntv_to_val):
 69    @staticmethod
 70    def decode_ntv_tab(ntv, ntv_to_val):
 71        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)
 72
 73        *parameters:*
 74
 75        - **ntv**: Ntv data to decode,
 76        - **ntv_to_val**: method to convert external value from ntv into internal Field value
 77
 78        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*
 79
 80        - name (None or string): name of the Field
 81        - dtype (None or string): type of data
 82        - codec (list): list of Field codec values
 83        - parent (None or int): Field parent or None
 84        - keys (None or list): Field keys
 85        - coef (None or int): coef if primary Field else None
 86        - leng (int): length of the Field
 87        '''
 88        typ = ntv.type_str if ntv.ntv_type else None
 89        nam = ntv.name
 90        val = ntv_to_val(ntv)
 91        if ntv.__class__.__name__ == 'NtvSingle':  # single value: codec of length 1
 92            return (nam, typ, [val], None, None, None, 1)
 93        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':  # values only
 94            return (nam, typ, val, None, None, None, len(ntv))
 95
 96        ntvc = ntv[0]  # first item: used as the codec in the returns below
 97        leng = max(len(ind) for ind in ntv)  # longest item of ntv
 98        typc = ntvc.type_str if ntvc.ntv_type else None
 99        valc = ntv_to_val(ntvc)
100        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
101                isinstance(ntv[1].val, (int, str)) and \
102                ntv[2].__class__.__name__ != 'NtvSingle' and \
103                isinstance(ntv[2][0].val, int):  # codec, parent and keys
104            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
105        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):  # codec and parent
106            return (nam, typc, valc, ntv[1].val, None, None, leng)
107        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):  # codec and coef
108            leng = leng * ntv[1][0].val  # the length is multiplied by the coef
109            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
110        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):  # codec and keys
111            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
112        return (nam, typ, val, None, None, None, len(ntv))  # no structure recognized: values only

Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)

parameters:

  • ntv: Ntv data to decode,
  • ntv_to_val: method to convert external value from ntv into internal Field value

Returns tuple: (name, dtype, codec, parent, keys, coef, leng)

  • name (None or string): name of the Field
  • dtype (None or string): type of data
  • codec (list): list of Field codec values
  • parent (None or int): Field parent or None
  • keys (None or list): Field keys
  • coef (None or int): coef if primary Field else None
  • leng (int): length of the Field
@staticmethod
def to_ntvpointer(jsonpointer, unique_root=False):
114    @staticmethod
115    def to_ntvpointer(jsonpointer, unique_root=False):
116        '''convert a json pointer into a NTV pointer (string)
117
118        *parameters:*
119
120        - **jsonpointer**: String - json pointer to convert,
121        - **unique_root**: Boolean (default False) - True if the json root length is 1 '''
122        single = '/([0-9]+)(/[a-z])'  # a numeric segment followed by a segment starting with a-z
123        if unique_root and not '0' <= jsonpointer[1] <= '9':
124            return re.sub(single, r'\g<2>', jsonpointer)[1:]  # the first character is removed too
125        return re.sub(single, r'\g<2>', jsonpointer)  # the numeric segment is dropped

convert a json pointer into a NTV pointer (string)

parameters:

  • jsonpointer: String - json pointer to convert,
  • unique_root: Boolean (default False) - True if the json root length is 1
class NtvConnector(abc.ABC):
128class NtvConnector(ABC):
129    ''' The NtvConnector class is an abstract class used by all NTV connectors
130    for conversion between NTV-JSON data and NTV-OBJ data.
131
132    A NtvConnector child is defined by:
133    - clas_obj: str - define the class name of the object to convert
134    - clas_typ: str - define the Datatype of the converted object
135    - to_obj_ntv: method - converter from JsonNTV to the object
136    - to_json_ntv: method - converter from the object to JsonNTV
137
138    *class method :*
139    - `connector`
140    - `dic_connec`
141    - `castable` (@property)
142    - `dic_obj` (@property)
143    - `dic_type` (@property)
144
145    *abstract method*
146    - `to_obj_ntv`
147    - `to_json_ntv`
148
149    *static method*
150    - `cast`
151    - `uncast`
152    - `is_json_class`
153    - `is_json`
154    - `init_ntv_keys`
155    '''
156
157    DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'}  # Ntv class name -> type
158    DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line',  # geometry class name -> type
159                  'MultiLineString': 'multiline', 'Polygon': 'polygon',
160                  'MultiPolygon': 'multipolygon'}
161    DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'}  # datetime class name -> type
162    DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat,  # type -> ISO parser
163               'datetime': datetime.datetime.fromisoformat}
164    DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring',  # type -> 'type_geo' option
165               'multiline': 'multilinestring', 'polygon': 'polygon',
166               'multipolygon': 'multipolygon'}
167    DIC_CBOR = {'point': False, 'multipoint': False, 'line': False,  # type -> True if converted in 'cbor' format
168                'multiline': False, 'polygon': False, 'multipolygon': False,
169                'date': True, 'time': False, 'datetime': True}
170    DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec',  # type -> connector class name
171               'ndarray': 'NdarrayConnec',
172               'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec',
173               'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec',
174               'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec',
175               'other': None}
176
177    @classmethod
178    @property
179    def castable(cls):
180        '''return a list with class_name allowed for json-obj conversion'''
181        return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType',
182                'NtvSingle', 'NtvList'] \
183            + list(NtvConnector.DIC_GEO_CL.keys()) \
184            + list(NtvConnector.DIC_FCT.keys()) \
185            + list(NtvConnector.dic_connec().keys())
186
187    @classmethod
188    @property
189    def dic_obj(cls):
190        '''return a dict with the connectors: { type: class_connec_name }'''
191        return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\
192            NtvConnector.DIC_OBJ
193
194    @classmethod
195    @property
196    def dic_type(cls):
197        '''return a dict with the connectors: { class_obj_name: type }'''
198        return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\
199            NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL
200
201    @classmethod
202    def connector(cls):
203        '''return a dict with the connectors: { class_connec_name: class_connec }'''
204        return {clas.__name__: clas for clas in cls.__subclasses__()}
205
206    @classmethod
207    def dic_connec(cls):
208        '''return a dict with the clas associated to the connector:
209        { class_obj_name: class_connec_name }'''
210        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
211
212    @staticmethod
213    @abstractmethod
214    def to_obj_ntv(ntv_value, **kwargs):
215        ''' abstract - convert ntv_value into the return object'''
216
217    @staticmethod
218    @abstractmethod
219    def to_json_ntv(value, name=None, typ=None):
220        ''' abstract - convert NTV object (value, name, type) into the NTV json
221        (json-value, name, type)'''
222
223    @staticmethod
224    def cast(data, name=None, type_str=None):
225        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
226        defined in parameters.
227
228        *Parameters*
229
230        - **data**: NtvSingle entity or NTVvalue of the NTV entity
231        - **name** : String (default None) - name of the NTV entity
232        - **type_str**: String (default None) - type of the NTV entity
233        '''
234        clas = data.__class__.__name__
235        if clas == 'NtvSingle':
236            name = data.ntv_name
237            type_str = data.type_str
238            data = data.ntv_value
239        dic_geo_cl = NtvConnector.DIC_GEO_CL
240        dic_connec = NtvConnector.dic_connec()
241        match clas:
242            case 'int' | 'float' | 'bool' | 'str':
243                return (data, name, type_str)
244            case 'dict':
245                return ({key: NtvConnector.cast(val, name, type_str)[0]
246                         for key, val in data.items()}, name, type_str)
247            case 'list':
248                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
249                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
250            case 'tuple':
251                return (list(data), name, 'array' if not type_str else type_str)
252            case 'date' | 'time' | 'datetime':
253                return (data.isoformat(), name, clas if not type_str else type_str)
254            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
255                    'Polygon' | 'MultiPolygon':
256                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
257                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
258            case 'NtvSingle' | 'NtvList':
259                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
260            case _:
261                connec = None
262                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
263                    connec = NtvConnector.connector()[dic_connec[clas]]
264                if connec:
265                    return connec.to_json_ntv(data, name, type_str)
266                raise NtvError(
267                    'connector is not defined for the class : ', clas)
268        return (data, name, type_str)
269
270    @staticmethod
271    def uncast(value, name=None, type_str=None, **kwargs):
272        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
273
274        *Parameters*
275
276        - **value**: NtvSingle entity or NTVvalue of the NTV entity
277        - **name** : String (default None) - name of the NTV entity
278        - **type_str**: String (default None) - type of the NTV entity
279        '''
280        if type_str == 'json':
281            return (value, name, type_str)
282        typebase_str = type_str
283        if value.__class__.__name__ == 'NtvSingle':
284            if not (type_str in set(NtvConnector.dic_type.values()) and
285                    NtvConnector.is_json(value) or type_str is None):
286                return (value.ntv_value, value.name, value.type_str)
287            type_str = value.type_str if value.ntv_type else None
288            typebase_str = value.typebase_str if value.ntv_type else None
289            name = value.ntv_name
290            value = value.ntv_value
291        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
292        value_obj = NtvConnector._uncast_val(value, typebase_str, **option)
293        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
294
295    @staticmethod
296    def _typ_obj(value):  # type of the first typed item found in value (None if no item is typed)
297        if isinstance(value, dict):
298            return NtvConnector._typ_obj(list(value.values()))  # only the dict values are examined
299        if isinstance(value, (tuple, list)):
300            for val in value:
301                typ = NtvConnector._typ_obj(val)
302                if typ:
303                    return typ
304            return None
305        return NtvConnector.dic_type.get(value.__class__.__name__)  # None if the class name is unknown
306
307    @staticmethod
308    def _uncast_val(value, type_n, **option):
309        '''return value from ntv value
310
311        *Parameters*
312
313        - **value**: NtvSingle entity or NTVvalue of the NTV entity
314        '''
315        dic_fct = NtvConnector.DIC_FCT
316        dic_geo = NtvConnector.DIC_GEO
317        dic_obj = NtvConnector.dic_obj | option['dicobj']
318        dic_cbor = NtvConnector.DIC_CBOR
319        if not type_n or type_n == 'json' or (option['format'] == 'cbor' and
320                                              not dic_cbor.get(type_n, False)):
321            return value
322        if type_n in dic_fct:
323            if isinstance(value, (tuple, list)):
324                return [NtvConnector._uncast_val(val, type_n, **option) for val in value]
325            if isinstance(value, dict):
326                return {key: NtvConnector._uncast_val(val, type_n, **option)
327                        for key, val in value.items()}
328            return dic_fct[type_n](value)
329        if type_n == 'array':
330            return tuple(value)
331        if type_n == 'ntv':
332            from json_ntv.ntv import Ntv
333            return Ntv.from_obj(value)
334        if type_n in dic_geo:
335            option['type_geo'] = dic_geo[type_n]
336        connec = None
337        if type_n in dic_obj and \
338                dic_obj[type_n] in NtvConnector.connector():
339            connec = NtvConnector.connector()[dic_obj[type_n]]
340        elif dic_obj['other'] in NtvConnector.connector():
341            connec = NtvConnector.connector()['other']
342        if connec:
343            return connec.to_obj_ntv(value, **option)
344        return value
345
346    @staticmethod
347    def is_json_class(val):
348        ''' return True if val is a json type'''
349        return val is None or isinstance(val, (list, int, str, float, bool, dict))
350
351    @staticmethod
352    def is_json(obj):
353        ''' check if obj is a json structure and return True if obj is a json-value
354
355        *Parameters*
356
357        - **obj** : object to check
358        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
359        if obj is None:
360            return True
361        is_js = NtvConnector.is_json
362        match obj:
363            case str() | int() | float() | bool() as obj:
364                return True
365            case list() | tuple() as obj:
366                if not obj:
367                    return True
368                return min(is_js(obj_in) for obj_in in obj)
369            case dict() as obj:
370                if not obj:
371                    return True
372                if not min(isinstance(key, str) for key in obj.keys()):
373                    raise NtvError('key in dict in not string')
374                return min(is_js(obj_in) for obj_in in obj.values())
375            case _:
376                if obj.__class__.__name__ not in NtvConnector.castable:
377                    raise NtvError(obj.__class__.__name__ +
378                                   ' is not valid for NTV')
379                return False
380
381    @staticmethod
382    def keysfromderkeys(parentkeys, derkeys):
383        '''return keys from parent keys and derkeys
384
385        *Parameters*
386
387        - **parentkeys** : list of keys from parent
388        - **derkeys** : list of derived keys
389
390        *Returns* : list of keys'''
391        return [derkeys[pkey] for pkey in parentkeys]
392
393    @staticmethod
394    def encode_coef(lis):
395        '''Generate a repetition coefficient for periodic list'''
396        if len(lis) < 2:
397            return 0
398        coef = 1
399        while coef != len(lis):
400            if lis[coef-1] != lis[coef]:
401                break
402            coef += 1
403        if (not len(lis) % (coef * (max(lis) + 1)) and
404                lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
405            return coef
406        return 0
407
408    @staticmethod
409    def keysfromcoef(coef, period, leng=None):
410        ''' return a list of keys with periodic structure'''
411        if not leng:
412            leng = coef * period
413        return None if not (coef and period) else [(ind % (coef * period)) // coef
414                                                   for ind in range(leng)]
415
416    @staticmethod
417    def format_field(lidx_decode, leng):
418        ''' return a list of format'''
419        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
420        field_format = []
421        for field_decode in lidx_decode:
422            name, typ, codec, parent, keys, coef, length = field_decode
423            if (keys, parent, coef) == (None, None, None):  # full or unique
424                if len(codec) == 1:  # unique
425                    field_format.append('unique')
426                elif len(codec) == leng:    # full
427                    field_format.append('full')
428                else:
429                    raise NtvError('impossible to generate keys')
430                continue
431            if keys and len(keys) > 1 and parent is None:  # complete
432                field_format.append('complete')
433                continue
434            if coef:  # primary
435                field_format.append('primary')
436                continue
437            if parent is None:
438                raise NtvError('keys not referenced')
439            if not keys:    # implicit
440                field_format.append('implicit')
441                continue
442            field_format.append('relative')
443        return field_format
444
445    @staticmethod
446    def init_ntv_keys(ind, lidx, leng):
447        ''' initialization of explicit keys data in lidx object of tabular data'''
448        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
449        name, typ, codec, parent, keys, coef, length = lidx[ind]
450        if (keys, parent, coef) == (None, None, None):  # full or unique
451            if len(codec) == 1:  # unique
452                lidx[ind][4] = [0] * leng
453            elif len(codec) == leng:    # full
454                lidx[ind][4] = list(range(leng))
455            else:
456                raise NtvError('impossible to generate keys')
457            return
458        if keys and len(keys) > 1 and parent is None:  # complete
459            return
460        if coef:  # primary
461            lidx[ind][4] = [(ikey % (coef * len(codec))) //
462                            coef for ikey in range(leng)]
463            lidx[ind][3] = None
464            return
465        if parent is None:
466            raise NtvError('keys not referenced')
467        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
468            NtvConnector.init_ntv_keys(parent, lidx, leng)
469        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
470            lidx[ind][4] = lidx[parent][4]
471            lidx[ind][3] = None
472            return
473        lidx[ind][4] = NtvConnector.keysfromderkeys(
474            lidx[parent][4], keys)  # relative
475        lidx[ind][3] = None
476        return

The NtvConnector class is an abstract class used by all NTV connectors for conversion between NTV-JSON data and NTV-OBJ data.

A NtvConnector child is defined by:

  • clas_obj: str - define the class name of the object to convert
  • clas_typ: str - define the Datatype of the converted object
  • to_obj_ntv: method - converter from JsonNTV to the object
  • to_json_ntv: method - converter from the object to JsonNTV

class method :

abstract method

static method

DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'}
DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line', 'MultiLineString': 'multiline', 'Polygon': 'polygon', 'MultiPolygon': 'multipolygon'}
DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'}
DIC_FCT = {'date': <built-in method fromisoformat of type object>, 'time': <built-in method fromisoformat of type object>, 'datetime': <built-in method fromisoformat of type object>}
DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring', 'multiline': 'multilinestring', 'polygon': 'polygon', 'multipolygon': 'multipolygon'}
DIC_CBOR = {'point': False, 'multipoint': False, 'line': False, 'multiline': False, 'polygon': False, 'multipolygon': False, 'date': True, 'time': False, 'datetime': True}
DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec', 'ndarray': 'NdarrayConnec', 'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec', 'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec', 'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec', 'other': None}
castable
177    @classmethod
178    @property  # NOTE(review): chaining classmethod and property is deprecated since Python 3.11, removed in 3.13
179    def castable(cls):
180        '''return the list of the class names allowed for json-obj conversion'''
181        return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType',
182                'NtvSingle', 'NtvList'] \
183            + list(NtvConnector.DIC_GEO_CL.keys()) \
184            + list(NtvConnector.DIC_FCT.keys()) \
185            + list(NtvConnector.dic_connec().keys())

return a list with class_name allowed for json-obj conversion

dic_obj
187    @classmethod
188    @property  # NOTE(review): chaining classmethod and property is deprecated since Python 3.11, removed in 3.13
189    def dic_obj(cls):
190        '''return a dict with the connectors: { type: class_connec_name }'''
191        return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\
192            NtvConnector.DIC_OBJ  # right operand: DIC_OBJ entries take precedence

return a dict with the connectors: { type: class_connec_name }

dic_type
194    @classmethod
195    @property  # NOTE(review): chaining classmethod and property is deprecated since Python 3.11, removed in 3.13
196    def dic_type(cls):
197        '''return a dict with the connectors: { class_obj_name: type }'''
198        return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\
199            NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL  # built-in entries win

return a dict with the connectors: { class_obj_name: type }

@classmethod
def connector(cls):
201    @classmethod
202    def connector(cls):
203        '''return a dict with the connectors: { class_connec_name: class_connec }'''
204        return {clas.__name__: clas for clas in cls.__subclasses__()}  # direct subclasses only

return a dict with the connectors: { class_connec_name: class_connec }

@classmethod
def dic_connec(cls):
206    @classmethod
207    def dic_connec(cls):
208        '''return a dict with the class associated to the connector:
209        { class_obj_name: class_connec_name }'''
210        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}  # direct subclasses only

return a dict with the class associated to the connector: { class_obj_name: class_connec_name }

@staticmethod
@abstractmethod
def to_obj_ntv(ntv_value, **kwargs):
212    @staticmethod
213    @abstractmethod
214    def to_obj_ntv(ntv_value, **kwargs):
215        ''' abstract - convert ntv_value into the return object (the options are received as kwargs)'''

abstract - convert ntv_value into the return object

@staticmethod
@abstractmethod
def to_json_ntv(value, name=None, typ=None):
217    @staticmethod
218    @abstractmethod
219    def to_json_ntv(value, name=None, typ=None):
220        ''' abstract - convert NTV object (value, name, type) into the NTV json
221        tuple (json-value, name, type)'''

abstract - convert NTV object (value, name, type) into the NTV json (json-value, name, type)

@staticmethod
def cast(data, name=None, type_str=None):
223    @staticmethod
224    def cast(data, name=None, type_str=None):
225        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
226        defined in parameters.
227
228        *Parameters*
229
230        - **data**: NtvSingle entity or NTVvalue of the NTV entity
231        - **name** : String (default None) - name of the NTV entity
232        - **type_str**: String (default None) - type of the NTV entity
233        '''
234        clas = data.__class__.__name__  # NOTE(review): not refreshed after data is replaced below - confirm
235        if clas == 'NtvSingle':  # name, type and value are taken from the entity
236            name = data.ntv_name
237            type_str = data.type_str
238            data = data.ntv_value
239        dic_geo_cl = NtvConnector.DIC_GEO_CL
240        dic_connec = NtvConnector.dic_connec()
241        match clas:
242            case 'int' | 'float' | 'bool' | 'str':
243                return (data, name, type_str)
244            case 'dict':  # only the json value of each converted item is kept
245                return ({key: NtvConnector.cast(val, name, type_str)[0]
246                         for key, val in data.items()}, name, type_str)
247            case 'list':  # the type is deduced from the items when type_str is empty
248                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
249                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
250            case 'tuple':
251                return (list(data), name, 'array' if not type_str else type_str)
252            case 'date' | 'time' | 'datetime':
253                return (data.isoformat(), name, clas if not type_str else type_str)
254            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
255                    'Polygon' | 'MultiPolygon':  # NOTE(review): needs a 'geometry' key in dic_connec - confirm
256                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
257                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
258            case 'NtvSingle' | 'NtvList':
259                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
260            case _:  # other classes: a connector declared for the class is required
261                connec = None
262                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
263                    connec = NtvConnector.connector()[dic_connec[clas]]
264                if connec:
265                    return connec.to_json_ntv(data, name, type_str)
266                raise NtvError(
267                    'connector is not defined for the class : ', clas)
268        return (data, name, type_str)  # not reached: every case above returns or raises

return JSON-NTV conversion (json_value, name, type_str) of the NTV entity defined in parameters.

Parameters

  • data: NtvSingle entity or NTVvalue of the NTV entity
  • name : String (default None) - name of the NTV entity
  • type_str: String (default None) - type of the NTV entity
@staticmethod
def uncast(value, name=None, type_str=None, **kwargs):
270    @staticmethod
271    def uncast(value, name=None, type_str=None, **kwargs):
272        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
273
274        *Parameters*
275
276        - **value**: NtvSingle entity or NTVvalue of the NTV entity
277        - **name** : String (default None) - name of the NTV entity
278        - **type_str**: String (default None) - type of the NTV entity
279        '''
280        if type_str == 'json':  # 'json' type: nothing to convert
281            return (value, name, type_str)
282        typebase_str = type_str
283        if value.__class__.__name__ == 'NtvSingle':
284            if not (type_str in set(NtvConnector.dic_type.values()) and
285                    NtvConnector.is_json(value) or type_str is None):  # read as (A and B) or C
286                return (value.ntv_value, value.name, value.type_str)
287            type_str = value.type_str if value.ntv_type else None
288            typebase_str = value.typebase_str if value.ntv_type else None
289            name = value.ntv_name
290            value = value.ntv_value
291        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs  # kwargs override the defaults
292        value_obj = NtvConnector._uncast_val(value, typebase_str, **option)
293        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))

return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity

Parameters

  • value: NtvSingle entity or NTVvalue of the NTV entity
  • name : String (default None) - name of the NTV entity
  • type_str: String (default None) - type of the NTV entity
@staticmethod
def is_json_class(val):
346    @staticmethod
347    def is_json_class(val):
348        ''' return True if val is None or an instance of list, int, str, float, bool or dict'''
349        return val is None or isinstance(val, (list, int, str, float, bool, dict))  # items are not checked

return True if val is a json type

@staticmethod
def is_json(obj):
351    @staticmethod
352    def is_json(obj):
353        ''' check if obj is a json structure and return True if obj is a json-value
354
355        *Parameters*
356
357        - **obj** : object to check
358        *Returns* : True if json-value, False if castable object (NtvError in other cases)'''
359        if obj is None:
360            return True
361        is_js = NtvConnector.is_json
362        match obj:
363            case str() | int() | float() | bool() as obj:
364                return True
365            case list() | tuple() as obj:
366                if not obj:
367                    return True
368                return min(is_js(obj_in) for obj_in in obj)  # False if one item is not a json-value
369            case dict() as obj:
370                if not obj:
371                    return True
372                if not min(isinstance(key, str) for key in obj.keys()):  # every key has to be a str
373                    raise NtvError('key in dict in not string')
374                return min(is_js(obj_in) for obj_in in obj.values())  # False if one value is not json
375            case _:
376                if obj.__class__.__name__ not in NtvConnector.castable:
377                    raise NtvError(obj.__class__.__name__ +
378                                   ' is not valid for NTV')
379                return False  # castable class but not a json-value

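Check whether obj is a json structure: return True if obj is a json-value, False if obj (or one of its items) is a castable non-json object; raise NtvError if a dict key is not a string or if the class is not castable.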

Parameters

  • obj : object to check
  • Returns : True if obj is a json-value, False if obj is a castable non-json object (the signature is is_json(obj); there is no ntvobj parameter)
@staticmethod
def keysfromderkeys(parentkeys, derkeys):
381    @staticmethod
382    def keysfromderkeys(parentkeys, derkeys):
383        '''return keys from parent keys and derkeys
384
385        *Parameters*
386
387        - **parentkeys** : list of keys from parent
388        - **derkeys** : list of derived keys (indexed by the parent key values)
389
390        *Returns* : list of keys'''
391        return [derkeys[pkey] for pkey in parentkeys]  # one key per parent key, same order

return keys from parent keys and derkeys

Parameters

  • parentkeys : list of keys from parent
  • derkeys : list of derived keys

Returns : list of keys

@staticmethod
def encode_coef(lis):
393    @staticmethod
394    def encode_coef(lis):
395        '''Generate a repetition coefficient for periodic list (0 if the list is not periodic)'''
396        if len(lis) < 2:
397            return 0
398        coef = 1
399        while coef != len(lis):  # coef becomes the length of the leading run of equal values
400            if lis[coef-1] != lis[coef]:
401                break
402            coef += 1
403        if (not len(lis) % (coef * (max(lis) + 1)) and  # period candidate: max(lis) + 1
404                lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
405            return coef
406        return 0

Generate the repetition coefficient of a periodic list (0 if the list is not periodic).

@staticmethod
def keysfromcoef(coef, period, leng=None):
408    @staticmethod
409    def keysfromcoef(coef, period, leng=None):
410        ''' return a list of keys with periodic structure (None if coef or period is null)'''
411        if not leng:  # default length (also used when leng is 0)
412            leng = coef * period
413        return None if not (coef and period) else [(ind % (coef * period)) // coef
414                                                   for ind in range(leng)]

return a list of keys with periodic structure

@staticmethod
def format_field(lidx_decode, leng):
416    @staticmethod
417    def format_field(lidx_decode, leng):
418        ''' return the list of the format names (one for each item of lidx_decode)'''
419        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
420        field_format = []
421        for field_decode in lidx_decode:
422            name, typ, codec, parent, keys, coef, length = field_decode  # name, typ, length: unused here
423            if (keys, parent, coef) == (None, None, None):  # full or unique
424                if len(codec) == 1:  # unique
425                    field_format.append('unique')
426                elif len(codec) == leng:    # full
427                    field_format.append('full')
428                else:
429                    raise NtvError('impossible to generate keys')
430                continue
431            if keys and len(keys) > 1 and parent is None:  # complete
432                field_format.append('complete')
433                continue
434            if coef:  # primary
435                field_format.append('primary')
436                continue
437            if parent is None:
438                raise NtvError('keys not referenced')
439            if not keys:    # implicit
440                field_format.append('implicit')
441                continue
442            field_format.append('relative')  # parent and keys are both defined
443        return field_format

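Return the list of the format names ('unique', 'full', 'complete', 'primary', 'implicit' or 'relative'), one for each field.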

@staticmethod
def init_ntv_keys(ind, lidx, leng):
445    @staticmethod
446    def init_ntv_keys(ind, lidx, leng):
447        ''' initialization of explicit keys data in lidx object of tabular data'''
448        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
449        name, typ, codec, parent, keys, coef, length = lidx[ind]
450        if (keys, parent, coef) == (None, None, None):  # full or unique
451            if len(codec) == 1:  # unique
452                lidx[ind][4] = [0] * leng
453            elif len(codec) == leng:    # full
454                lidx[ind][4] = list(range(leng))
455            else:
456                raise NtvError('impossible to generate keys')
457            return
458        if keys and len(keys) > 1 and parent is None:  # complete
459            return
460        if coef:  # primary
461            lidx[ind][4] = [(ikey % (coef * len(codec))) //
462                            coef for ikey in range(leng)]
463            lidx[ind][3] = None
464            return
465        if parent is None:
466            raise NtvError('keys not referenced')
467        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
468            NtvConnector.init_ntv_keys(parent, lidx, leng)
469        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
470            lidx[ind][4] = lidx[parent][4]
471            lidx[ind][3] = None
472            return
473        lidx[ind][4] = NtvConnector.keysfromderkeys(
474            lidx[parent][4], keys)  # relative
475        lidx[ind][3] = None
476        return

initialization of explicit keys data in lidx object of tabular data

class NtvTree:
class NtvTree:
    ''' The NtvTree class is an iterator class used to traverse a NTV tree structure.
    Some other methods give tree indicators and data.

    *Attributes :*

    - **_ntv** : Ntv entity - root of the tree
    - **_node**:  Ntv entity - node pointer (None before the first iteration)
    - **_stack**:  list - index of the current node in each traversed parent

    *dynamic values (@property)*
    - `breadth`
    - `size`
    - `height`
    - `adjacency_list`
    - `nodes`
    - `dic_nodes`
    - `leaf_nodes`
    - `inner_nodes`

    Each property uses a new NtvTree iterator, so the state of the current
    iteration is not modified.
    '''

    def __init__(self, ntv):
        ''' the parameter of the constructor is the Ntv entity (root of the tree)'''
        self._ntv = ntv
        self._node = None
        self._stack = []

    def __iter__(self):
        ''' iterator without initialization'''
        return self

    def __next__(self):
        ''' return next node in the tree (root first, then children before siblings).

        StopIteration is raised when the last node has been returned.'''
        if self._node is None:
            self._node = self._ntv
        elif (self._node.__class__.__name__ == 'NtvList'
              and len(self._node) > 0):
            self._next_down()
        else:
            # a node without child (NtvSingle or empty NtvList) : the traversal
            # continues with the next sibling or ancestor instead of stopping
            self._next_up()
        return self._node

    def _next_down(self):
        ''' find the next subchild node (first child of the current node)'''
        self._node = self._node[0]
        self._stack.append(0)

    def _next_up(self):
        ''' find the next sibling or ancestor node.

        StopIteration is raised when the root of the tree is reached.'''
        while True:
            parent = self._node.parent
            if not parent or self._node == self._ntv:
                raise StopIteration
            ind = self._stack[-1]
            if ind < len(parent) - 1:  # if ind is not the last
                self._node = parent[ind + 1]
                self._stack[-1] += 1
                return
            if parent == self._ntv:
                raise StopIteration
            # last child : the search continues one level up
            self._node = parent
            self._stack.pop()

    @property
    def breadth(self):
        ''' return the number of leaves (length of `leaf_nodes`)'''
        return len(self.leaf_nodes)

    @property
    def size(self):
        ''' return the number of nodes (length of `nodes`)'''
        return len(self.nodes)

    @property
    def height(self):
        ''' return the height of the tree (maximum length of the node
        pointers minus one)'''
        return max(len(node.pointer()) for node in self.__class__(self._ntv)) - 1

    @property
    def adjacency_list(self):
        ''' return a dict with the list of child nodes (`val`) for each
        inner node'''
        return {node: node.val for node in self.inner_nodes}

    @property
    def nodes(self):
        ''' return the list of nodes according to the DFS preordering algorithm'''
        return list(self.__class__(self._ntv))

    @property
    def dic_nodes(self):
        ''' return a dict {ntv_name: node} of the named nodes according to the
        DFS preordering algorithm (nodes with an empty name are not included)'''
        return {node.ntv_name: node for node in self.__class__(self._ntv)
                if node.ntv_name}

    @property
    def leaf_nodes(self):
        ''' return the list of leaf nodes (NtvSingle) according to the DFS
        preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvSingle']

    @property
    def inner_nodes(self):
        ''' return the list of inner nodes (NtvList) according to the DFS
        preordering algorithm'''
        return [node for node in self.__class__(self._ntv)
                if node.__class__.__name__ == 'NtvList']

The NtvTree class is an iterator class used to traverse a NTV tree structure. Some other methods give tree indicators and data.

Attributes :

  • _ntv : Ntv entity - root of the tree (parameter of the constructor)
  • _node: Ntv entity - node pointer
  • _stack: list - stack used to store context in recursive methods

dynamic values (@property)

NtvTree(ntv)
500    def __init__(self, ntv):
501        ''' the parameter of the constructor is the Ntv entity (root of the tree to traverse)'''
502        self._ntv = ntv
503        self._node = None  # current node, None until the first iteration
504        self._stack = []  # index of the current node in each traversed parent

the parameter of the constructor is the Ntv entity

breadth
544    @property
545    def breadth(self):
546        ''' return the number of leaves (length of the `leaf_nodes` list)'''
547        return len(self.leaf_nodes)

return the number of leaves

size
549    @property
550    def size(self):
551        ''' return the number of nodes (length of the `nodes` list)'''
552        return len(self.nodes)

return the number of nodes

height
554    @property
555    def height(self):
556        ''' return the height of the tree (maximum length of the node pointers minus one)'''
557        return max(len(node.pointer()) for node in self.__class__(self._ntv)) - 1

return the height of the tree

adjacency_list
559    @property
560    def adjacency_list(self):
561        ''' return a dict with the list of child nodes (`val`) for each inner node'''
562        return {node: node.val for node in self.inner_nodes}

return a dict with the list of child nodes for each parent node

nodes
564    @property
565    def nodes(self):
566        ''' return the list of all the nodes according to the DFS preordering algorithm'''
567        return list(self.__class__(self._ntv))

return the list of nodes according to the DFS preordering algorithm

dic_nodes
569    @property
570    def dic_nodes(self):
571        ''' return a dict {ntv_name: node} of the named nodes according to the DFS preordering algorithm'''
572        return {node.ntv_name: node for node in self.__class__(self._ntv)
573                if node.ntv_name}

return a dict of nodes according to the DFS preordering algorithm

leaf_nodes
575    @property
576    def leaf_nodes(self):
577        ''' return the list of leaf nodes (NtvSingle) according to the DFS preordering algorithm'''
578        return [node for node in self.__class__(self._ntv)
579                if node.__class__.__name__ == 'NtvSingle']

return the list of leaf nodes according to the DFS preordering algorithm

inner_nodes
581    @property
582    def inner_nodes(self):
583        ''' return the list of inner nodes (NtvList) according to the DFS preordering algorithm'''
584        return [node for node in self.__class__(self._ntv)
585                if node.__class__.__name__ == 'NtvList']

return the list of inner nodes according to the DFS preordering algorithm

class NtvJsonEncoder(json.encoder.JSONEncoder):
class NtvJsonEncoder(json.JSONEncoder):
    """json encoder for Ntv data"""

    def default(self, o):
        ''' return a json-serializable value for the object `o`.

        The value is the first item returned by `NtvConnector.cast`. If the
        cast raises an NtvError, dates, times and datetimes are converted to
        their ISO 8601 string and any other object is passed to the base
        class (which raises a TypeError).'''
        try:
            return NtvConnector.cast(o)[0]
        except NtvError:
            # the cast is not available : the fallbacks below are used
            pass
        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
            return o.isoformat()
        return json.JSONEncoder.default(self, o)

json encoder for Ntv data

def default(self, o):
591    def default(self, o):
592        try:
593            return NtvConnector.cast(o)[0]
594        except NtvError:
595            return json.JSONEncoder.default(self, o)
596        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
597            return o.isoformat()
598        return json.JSONEncoder.default(self, o)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
Inherited Members
json.encoder.JSONEncoder
JSONEncoder
item_separator
key_separator
skipkeys
ensure_ascii
check_circular
allow_nan
sort_keys
indent
encode
iterencode
class NtvError(builtins.Exception):
class NtvError(Exception):
    ''' Exception raised by the NTV classes'''

NTV Exception

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args