NTV.json_ntv.ntv_util

Created on Feb 27 2023

@author: Philippe@loco-labs.io

The ntv_util module is part of the NTV.json_ntv package (specification document).

It contains the classes NtvUtil, NtvConnector, NtvTree, NtvJsonEncoder and NtvError for NTV entities.

  1# -*- coding: utf-8 -*-
  2"""
  3Created on Feb 27 2023
  4
  5@author: Philippe@loco-labs.io
  6
  7The `ntv_util` module is part of the `NTV.json_ntv` package ([specification document](
  8https://github.com/loco-philippe/NTV/blob/main/documentation/JSON-NTV-standard.pdf)).
  9
 10It contains the classes `NtvUtil`, `NtvConnector`, `NtvTree`, `NtvJsonEncoder` 
 11and `NtvError` for NTV entities.
 12"""
 13from abc import ABC, abstractmethod
 14import datetime
 15import json
 16
 17class NtvUtil:
 18    ''' The NtvUtil class includes static methods used by several NTV classes.
 19    NtvUtil is the parent class of `Datatype`, `Namespace`, `Ntv`.
 20    
 21    *class variables :*
 22    - **_namespaces_** : list of Namespace defined
 23    - **_types_** : list of Datatype defined
 24    
 25    *static methods :*
 26    - `from_obj_name`
 27    - `decode_ntv_tab`
 28    
 29    '''   
 30    _namespaces_ = {}
 31    _types_ = {}
 32    
 33    @staticmethod
 34    def from_obj_name(string):
 35        '''return a tuple with name, type_str and separator from string'''
 36        if not isinstance(string, str):
 37            raise NtvError('a json-name have to be str')
 38        if string == '':
 39            return (None, None, None)
 40        sep = None
 41        if '::' in string:
 42            sep = '::'
 43        elif ':' in string:
 44            sep = ':'
 45        if sep is None:
 46            #return (string, None, None)
 47            return (string, None, None) if string[-1]!='.' else (None, string, None)
 48        split = string.rsplit(sep, 2)
 49        if len(split) == 1:
 50            return (string, None, sep)
 51        if split[0] == '':
 52            return (None, split[1], sep)
 53        if split[1] == '':
 54            return (split[0], None, sep)
 55        return (split[0], split[1], sep)
 56
 57    @staticmethod
 58    def decode_ntv_tab(ntv, ntv_to_val):
 59        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)
 60        
 61        *parameters:*
 62        
 63        - **ntv**: Ntv data to decode,
 64        - **ntv_to_val**: method to convert external value form ntv in internal Field value
 65        
 66        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*
 67
 68        - name (None or string): name of the Field
 69        - dtype (None or string): type of data
 70        - codec (list): list of Field codec values
 71        - parent (None or int): Field parent or None
 72        - keys (None or list): Field keys
 73        - coef (None or int): coef if primary Field else None
 74        - leng (int): length of the Field
 75        '''
 76        #ntv = Ntv.obj(field)
 77        typ = ntv.type_str if ntv.ntv_type else None
 78        nam = ntv.name
 79        val = ntv_to_val(ntv)
 80        if ntv.__class__.__name__ == 'NtvSingle':
 81            return (nam, typ, [val], None, None, None, 1)
 82        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':
 83            return (nam, typ, val, None, None, None, len(ntv))
 84
 85        ntvc = ntv[0]
 86        leng = max(len(ind) for ind in ntv)
 87        typc = ntvc.type_str if ntvc.ntv_type else None
 88        valc = ntv_to_val(ntvc)
 89        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
 90                isinstance(ntv[1].val, (int, str)) and \
 91                ntv[2].__class__.__name__ != 'NtvSingle' and \
 92                isinstance(ntv[2][0].val, int):
 93            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
 94        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):
 95            return (nam, typc, valc, ntv[1].val, None, None, leng)
 96        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):
 97            leng = leng * ntv[1][0].val
 98            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
 99        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):
100            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
101        return (nam, typ, val, None, None, None, len(ntv))
102    
103class NtvConnector(ABC):
104    ''' The NtvConnector class is an abstract class used by all NTV connectors
105    for conversion between NTV-JSON data and NTV-OBJ data.
106
107    A NtvConnector child is defined by:
108    - clas_obj: str - define the class name of the object to convert
109    - clas_typ: str - define the Datatype of the converted object
110    - to_obj_ntv: method - converter from JsonNTV to the object
111    - to_json_ntv: method - converter from the object to JsonNTV
112    
113    *class method :*
114    - `connector`
115    - `dic_connec`
116    - `castable` (@property)
117    - `dic_obj` (@property)
118    - `dic_type` (@property)
119
120    *abstract method*
121    - `to_obj_ntv`
122    - `to_json_ntv`
123
124    *static method*
125    - `cast`
126    - `uncast`
127    - `is_json_class`
128    - `is_json`
129    - `init_ntv_keys`
130    '''
131
132    DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'}
133    DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line',
134                  'MultiLineString': 'multiline', 'Polygon': 'polygon',
135                  'MultiPolygon': 'multipolygon'}
136    DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'}
137    DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat,
138               'datetime': datetime.datetime.fromisoformat}
139    DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring',
140               'multiline': 'multilinestring', 'polygon': 'polygon',
141               'multipolygon': 'multipolygon'}
142    DIC_CBOR = {'point': False, 'multipoint': False, 'line': False,
143                'multiline': False, 'polygon': False, 'multipolygon': False,
144                'date': True, 'time': False, 'datetime': True}
145    DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec',
146               'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec',
147               'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec',
148               'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec',
149               'other': None}
150
151    @classmethod
152    @property
153    def castable(cls):
154        '''return a list with class_name allowed for json-obj conversion'''
155        return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType',
156                'NtvSingle', 'NtvList'] \
157            + list(NtvConnector.DIC_GEO_CL.keys()) \
158            + list(NtvConnector.DIC_FCT.keys()) \
159            + list(NtvConnector.dic_connec().keys())
160
161    @classmethod
162    @property
163    def dic_obj(cls):
164        '''return a dict with the connectors: { type: class_connec_name }'''
165        return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\
166            NtvConnector.DIC_OBJ
167
168    @classmethod
169    @property
170    def dic_type(cls):
171        '''return a dict with the connectors: { class_obj_name: type }'''
172        return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\
173            NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL
174
175    @classmethod
176    def connector(cls):
177        '''return a dict with the connectors: { class_connec_name: class_connec }'''
178        return {clas.__name__: clas for clas in cls.__subclasses__()}
179
180    @classmethod
181    def dic_connec(cls):
182        '''return a dict with the clas associated to the connector:
183        { class_obj_name: class_connec_name }'''
184        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
185
186    @staticmethod
187    @abstractmethod
188    def to_obj_ntv(ntv_value, **kwargs):
189        ''' abstract - convert ntv_value into the return object'''
190
191    @staticmethod
192    @abstractmethod
193    def to_json_ntv(value, name=None, typ=None):
194        ''' abstract - convert NTV object (value, name, type) into the NTV json
195        (json-value, name, type)'''
196
197    @staticmethod
198    def cast(data, name=None, type_str=None):
199        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
200        defined in parameters.
201
202        *Parameters*
203
204        - **data**: NtvSingle entity or NTVvalue of the NTV entity
205        - **name** : String (default None) - name of the NTV entity
206        - **type_str**: String (default None) - type of the NTV entity
207        '''
208        clas = data.__class__.__name__
209        if clas == 'NtvSingle':
210            name = data.ntv_name
211            type_str = data.type_str
212            data = data.ntv_value
213        dic_geo_cl = NtvConnector.DIC_GEO_CL
214        dic_connec = NtvConnector.dic_connec()
215        match clas:
216            case 'int' | 'float' | 'bool' | 'str':
217                return (data, name, type_str)
218            case 'dict':
219                return ({key: NtvConnector.cast(val, name, type_str)[0]
220                         for key, val in data.items()}, name, type_str)
221            case 'list':
222                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
223                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
224            case 'tuple':
225                return (list(data), name, 'array' if not type_str else type_str)
226            case 'date' | 'time' | 'datetime':
227                return (data.isoformat(), name, clas if not type_str else type_str)
228            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
229                    'Polygon' | 'MultiPolygon':
230                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
231                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
232            case 'NtvSingle' | 'NtvList':
233                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
234            case _:
235                connec = None
236                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
237                    connec = NtvConnector.connector()[dic_connec[clas]]
238                if connec:
239                    return connec.to_json_ntv(data, name, type_str)
240                raise NtvError(
241                    'connector is not defined for the class : ', clas)
242        return (data, name, type_str)
243
244    @staticmethod
245    def uncast(value, name=None, type_str=None, **kwargs):
246        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
247
248        *Parameters*
249
250        - **data**: NtvSingle entity or NTVvalue of the NTV entity
251        - **name** : String (default None) - name of the NTV entity
252        - **type_str**: String (default None) - type of the NTV entity
253        '''
254        if type_str == 'json':
255            return (value, name, type_str)
256        if value.__class__.__name__ == 'NtvSingle':
257            if not (type_str in set(NtvConnector.dic_type.values()) and 
258                    NtvConnector.is_json(value) or type_str is None):
259                return (value.ntv_value, value.name, value.type_str)
260            type_str = value.type_str if value.ntv_type else None
261            name = value.ntv_name
262            value = value.ntv_value
263        #dic_obj = NtvConnector.dic_obj
264        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
265        value_obj = NtvConnector._uncast_val(value, type_str, **option)
266        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
267
268    @staticmethod
269    def _typ_obj(value):
270        if isinstance(value, dict):
271            return NtvConnector._typ_obj(list(value.values()))
272        if isinstance(value, (tuple, list)):
273            for val in value:
274                typ = NtvConnector._typ_obj(val)
275                if typ:
276                    return typ
277            return None
278        return NtvConnector.dic_type.get(value.__class__.__name__)
279
280    @staticmethod
281    def _uncast_val(value, type_n, **option):
282        '''return value from ntv value'''
283        dic_fct = NtvConnector.DIC_FCT
284        dic_geo = NtvConnector.DIC_GEO
285        dic_obj = NtvConnector.dic_obj | option['dicobj']
286        dic_cbor = NtvConnector.DIC_CBOR
287        if not type_n or type_n == 'json' or (option['format'] == 'cbor' and 
288                                              not dic_cbor.get(type_n, False)):
289            return value
290        if type_n in dic_fct:
291            if isinstance(value, (tuple, list)):
292                return [NtvConnector._uncast_val(val, type_n, **option) for val in value]
293            if isinstance(value, dict):
294                return {key: NtvConnector._uncast_val(val, type_n, **option)
295                        for key, val in value.items()}
296            return dic_fct[type_n](value)
297        if type_n == 'array':
298            return tuple(value)
299        if type_n == 'ntv':
300            from json_ntv.ntv import Ntv
301            return Ntv.from_obj(value)
302        if type_n in dic_geo:
303            option['type_geo'] = dic_geo[type_n]
304        connec = None
305        if type_n in dic_obj and \
306                dic_obj[type_n] in NtvConnector.connector():
307            connec = NtvConnector.connector()[dic_obj[type_n]]
308        elif dic_obj['other'] in NtvConnector.connector():
309            connec = NtvConnector.connector()['other']
310        if connec:
311            return connec.to_obj_ntv(value, **option)
312        #raise NtvError('type of value not allowed for conversion')
313        return value
314
315    @staticmethod
316    def is_json_class(val):
317        ''' return True if val is a json type'''
318        return val is None or isinstance(val, (list, int, str, float, bool, dict))
319
320    @staticmethod
321    def is_json(obj):
322        ''' check if obj is a json structure and return True if obj is a json-value
323
324        *Parameters*
325
326        - **obj** : object to check
327        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
328        if obj is None:
329            return True
330        is_js = NtvConnector.is_json
331        match obj:
332            case str() | int() | float() | bool() as obj:
333                return True
334            case list() | tuple() as obj:
335                if not obj:
336                    return True
337                return min(is_js(obj_in) for obj_in in obj)
338            case dict() as obj:
339                if not obj:
340                    return True
341                if not min(isinstance(key, str) for key in obj.keys()):
342                    raise NtvError('key in dict in not string')
343                return min(is_js(obj_in) for obj_in in obj.values())
344            case _:
345                if not obj.__class__.__name__ in NtvConnector.castable:
346                    raise NtvError(obj.__class__.__name__ +
347                                   ' is not valid for NTV')
348                return False
349
350    @staticmethod
351    def keysfromderkeys(parentkeys, derkeys):
352        '''return keys from parent keys and derkeys
353
354        *Parameters*
355
356        - **parentkeys** : list of keys from parent
357        - **derkeys** : list of derived keys
358
359        *Returns* : list of keys'''
360        return [derkeys[pkey] for pkey in parentkeys]
361
362    @staticmethod 
363    def encode_coef(lis):
364        '''Generate a repetition coefficient for periodic list'''
365        if len(lis) < 2:
366            return 0
367        coef = 1
368        while coef != len(lis):
369            if lis[coef-1] != lis[coef]:
370                break
371            coef += 1
372        if (not len(lis) % (coef * (max(lis) + 1)) and 
373            lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
374            return coef
375        return 0
376    
377    @staticmethod 
378    def keysfromcoef(coef, period, leng=None):
379        ''' return a list of keys with periodic structure'''
380        if not leng:
381            leng = coef * period
382        return None if not (coef and period) else [(ind % (coef * period)) // coef 
383                                                   for ind in range(leng)]    
384    @staticmethod
385    def init_ntv_keys(ind, lidx, leng):
386        ''' initialization of explicit keys data in lidx object of tabular data'''
387        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
388        name, typ, codec, parent, keys, coef, length = lidx[ind]
389        if (keys, parent, coef) == (None, None, None):  # full or unique
390            if len(codec) == 1: # unique
391                lidx[ind][4] = [0] * leng
392            elif len(codec) == leng:    # full
393                lidx[ind][4] = list(range(leng))
394            else:
395                raise NtvError('impossible to generate keys')
396            return
397        if keys and len(keys) > 1 and parent is None:  #complete
398            return
399        if coef:  #primary
400            lidx[ind][4] = [(ikey % (coef * len(codec))) // coef for ikey in range(leng)]
401            lidx[ind][3] = None
402            return  
403        if parent is None:
404            raise NtvError('keys not referenced')          
405        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
406            NtvConnector.init_ntv_keys(parent, lidx, leng)
407        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
408            lidx[ind][4] = lidx[parent][4]
409            lidx[ind][3] = None
410            return
411        lidx[ind][4] = NtvConnector.keysfromderkeys(lidx[parent][4], keys)  # relative
412        #lidx[ind][4] = [keys[pkey] for pkey in lidx[parent][4]]  # relative
413        lidx[ind][3] = None
414        return    
415    
class NtvTree:
    ''' The NtvTree class is an iterator class used to traverse a NTV tree structure.
    Some other methods give tree indicators and data.

    *Attributes :*

    - **_ntv** : Ntv entity (root of the tree)
    - **_node**:  Ntv entity - node pointer
    - **_stack**:  list - index of the current node in each ancestor

    *dynamic values (@property)*
    - `breadth`
    - `size`
    - `height`
    - `adjacency_list`
    - `nodes`
    - `dic_nodes`
    - `leaf_nodes`
    - `inner_nodes`
    '''

    def __init__(self, ntv):
        ''' the parameter of the constructor is the Ntv entity'''
        self._ntv = ntv
        self._node = None
        self._stack = []

    def __iter__(self):
        ''' iterator without initialization'''
        return self

    def __next__(self):
        ''' return next node in the tree'''
        current = self._node
        if current is None:
            # first call: the traversal starts with the root
            self._node = self._ntv
            return self._node
        if len(current) == 0:
            raise StopIteration
        if current.__class__.__name__ == 'NtvList':
            self._next_down()
        else:
            self._next_up()
        return self._node

    def _next_down(self):
        ''' find the next subchild node'''
        self._stack.append(0)
        self._node = self._node[0]

    def _next_up(self):
        ''' find the next sibling or ancestor node'''
        while True:
            parent = self._node.parent
            if not parent or self._node == self._ntv:
                raise StopIteration
            rank = self._stack[-1]
            if rank < len(parent) - 1:  # a next sibling is available
                self._stack[-1] += 1
                self._node = parent[rank + 1]
                return
            if parent == self._ntv:
                raise StopIteration
            # last child: the search continues from the parent
            self._node = parent
            self._stack.pop()

    @property
    def breadth(self):
        ''' return the number of leaves'''
        return len(self.leaf_nodes)

    @property
    def size(self):
        ''' return the number of nodes'''
        return len(self.nodes)

    @property
    def height(self):
        ''' return the height of the tree'''
        walker = self.__class__(self._ntv)
        return max(len(node.pointer()) for node in walker)

    @property
    def adjacency_list(self):
        ''' return a dict with the list of child nodes for each parent node'''
        return {inner: inner.val for inner in self.inner_nodes}

    @property
    def nodes(self):
        ''' return the list of nodes according to the DFS preordering algorithm'''
        walker = self.__class__(self._ntv)
        return list(walker)

    @property
    def dic_nodes(self):
        ''' return a dict of the named nodes (key: ntv_name) according to the
        DFS preordering algorithm'''
        walker = self.__class__(self._ntv)
        return {node.ntv_name: node for node in walker if node.ntv_name}

    @property
    def leaf_nodes(self):
        ''' return the list of leaf nodes according to the DFS preordering algorithm'''
        walker = self.__class__(self._ntv)
        return [node for node in walker if node.__class__.__name__ == 'NtvSingle']

    @property
    def inner_nodes(self):
        ''' return the list of inner nodes according to the DFS preordering algorithm'''
        walker = self.__class__(self._ntv)
        return [node for node in walker if node.__class__.__name__ == 'NtvList']
524
525
class NtvJsonEncoder(json.JSONEncoder):
    """json encoder for Ntv data"""

    def default(self, o):
        """return the json-value of `o` given by `NtvConnector.cast`.

        If `NtvConnector.cast` raises NtvError, the default encoder is used
        (it raises TypeError for an object which is not serializable)."""
        # no other fallback is needed: both branches of the try statement return
        try:
            return NtvConnector.cast(o)[0]
        except NtvError:
            return super().default(o)
537
538
class NtvError(Exception):
    '''Exception raised by the NTV classes'''
class NtvUtil:
 18class NtvUtil:
 19    ''' The NtvUtil class includes static methods used by several NTV classes.
 20    NtvUtil is the parent class of `Datatype`, `Namespace`, `Ntv`.
 21    
 22    *class variables :*
 23    - **_namespaces_** : list of Namespace defined
 24    - **_types_** : list of Datatype defined
 25    
 26    *static methods :*
 27    - `from_obj_name`
 28    - `decode_ntv_tab`
 29    
 30    '''   
 31    _namespaces_ = {}
 32    _types_ = {}
 33    
 34    @staticmethod
 35    def from_obj_name(string):
 36        '''return a tuple with name, type_str and separator from string'''
 37        if not isinstance(string, str):
 38            raise NtvError('a json-name have to be str')
 39        if string == '':
 40            return (None, None, None)
 41        sep = None
 42        if '::' in string:
 43            sep = '::'
 44        elif ':' in string:
 45            sep = ':'
 46        if sep is None:
 47            #return (string, None, None)
 48            return (string, None, None) if string[-1]!='.' else (None, string, None)
 49        split = string.rsplit(sep, 2)
 50        if len(split) == 1:
 51            return (string, None, sep)
 52        if split[0] == '':
 53            return (None, split[1], sep)
 54        if split[1] == '':
 55            return (split[0], None, sep)
 56        return (split[0], split[1], sep)
 57
 58    @staticmethod
 59    def decode_ntv_tab(ntv, ntv_to_val):
 60        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)
 61        
 62        *parameters:*
 63        
 64        - **ntv**: Ntv data to decode,
 65        - **ntv_to_val**: method to convert external value form ntv in internal Field value
 66        
 67        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*
 68
 69        - name (None or string): name of the Field
 70        - dtype (None or string): type of data
 71        - codec (list): list of Field codec values
 72        - parent (None or int): Field parent or None
 73        - keys (None or list): Field keys
 74        - coef (None or int): coef if primary Field else None
 75        - leng (int): length of the Field
 76        '''
 77        #ntv = Ntv.obj(field)
 78        typ = ntv.type_str if ntv.ntv_type else None
 79        nam = ntv.name
 80        val = ntv_to_val(ntv)
 81        if ntv.__class__.__name__ == 'NtvSingle':
 82            return (nam, typ, [val], None, None, None, 1)
 83        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':
 84            return (nam, typ, val, None, None, None, len(ntv))
 85
 86        ntvc = ntv[0]
 87        leng = max(len(ind) for ind in ntv)
 88        typc = ntvc.type_str if ntvc.ntv_type else None
 89        valc = ntv_to_val(ntvc)
 90        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
 91                isinstance(ntv[1].val, (int, str)) and \
 92                ntv[2].__class__.__name__ != 'NtvSingle' and \
 93                isinstance(ntv[2][0].val, int):
 94            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
 95        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):
 96            return (nam, typc, valc, ntv[1].val, None, None, leng)
 97        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):
 98            leng = leng * ntv[1][0].val
 99            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
100        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):
101            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
102        return (nam, typ, val, None, None, None, len(ntv))

The NtvUtil class includes static methods used by several NTV classes. NtvUtil is the parent class of Datatype, Namespace, Ntv.

class variables :

  • _namespaces_ : dict of the Namespace defined
  • _types_ : dict of the Datatype defined

static methods :

@staticmethod
def from_obj_name(string):
34    @staticmethod
35    def from_obj_name(string):
36        '''return a tuple with name, type_str and separator from string'''
37        if not isinstance(string, str):
38            raise NtvError('a json-name have to be str')
39        if string == '':
40            return (None, None, None)
41        sep = None
42        if '::' in string:
43            sep = '::'
44        elif ':' in string:
45            sep = ':'
46        if sep is None:
47            #return (string, None, None)
48            return (string, None, None) if string[-1]!='.' else (None, string, None)
49        split = string.rsplit(sep, 2)
50        if len(split) == 1:
51            return (string, None, sep)
52        if split[0] == '':
53            return (None, split[1], sep)
54        if split[1] == '':
55            return (split[0], None, sep)
56        return (split[0], split[1], sep)

return a tuple with name, type_str and separator from string

@staticmethod
def decode_ntv_tab(ntv, ntv_to_val):
 58    @staticmethod
 59    def decode_ntv_tab(ntv, ntv_to_val):
 60        '''Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)
 61        
 62        *parameters:*
 63        
 64        - **ntv**: Ntv data to decode,
 65        - **ntv_to_val**: method to convert external value form ntv in internal Field value
 66        
 67        *Returns tuple: (name, dtype, codec, parent, keys, coef, leng)*
 68
 69        - name (None or string): name of the Field
 70        - dtype (None or string): type of data
 71        - codec (list): list of Field codec values
 72        - parent (None or int): Field parent or None
 73        - keys (None or list): Field keys
 74        - coef (None or int): coef if primary Field else None
 75        - leng (int): length of the Field
 76        '''
 77        #ntv = Ntv.obj(field)
 78        typ = ntv.type_str if ntv.ntv_type else None
 79        nam = ntv.name
 80        val = ntv_to_val(ntv)
 81        if ntv.__class__.__name__ == 'NtvSingle':
 82            return (nam, typ, [val], None, None, None, 1)
 83        if len(ntv) < 2 or len(ntv) > 3 or ntv[0].__class__.__name__ == 'NtvSingle':
 84            return (nam, typ, val, None, None, None, len(ntv))
 85
 86        ntvc = ntv[0]
 87        leng = max(len(ind) for ind in ntv)
 88        typc = ntvc.type_str if ntvc.ntv_type else None
 89        valc = ntv_to_val(ntvc)
 90        if len(ntv) == 3 and ntv[1].__class__.__name__ == 'NtvSingle' and \
 91                isinstance(ntv[1].val, (int, str)) and \
 92                ntv[2].__class__.__name__ != 'NtvSingle' and \
 93                isinstance(ntv[2][0].val, int):
 94            return (nam, typc, valc, ntv[1].val, ntv[2].to_obj(), None, leng)
 95        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, (int, str)):
 96            return (nam, typc, valc, ntv[1].val, None, None, leng)
 97        if len(ntv) == 2 and len(ntv[1]) == 1 and isinstance(ntv[1].val, list):
 98            leng = leng * ntv[1][0].val
 99            return (nam, typc, valc, None, None, ntv[1][0].val, leng)
100        if len(ntv) == 2 and len(ntv[1]) > 1 and isinstance(ntv[1][0].val, int):
101            return (nam, typc, valc, None, ntv[1].to_obj(), None, leng)
102        return (nam, typ, val, None, None, None, len(ntv))

Generate a tuple data from a Ntv tab value (bytes, string, json, Ntv object)

parameters:

  • ntv: Ntv data to decode,
  • ntv_to_val: method to convert the external value from ntv into the internal Field value

Returns tuple: (name, dtype, codec, parent, keys, coef, leng)

  • name (None or string): name of the Field
  • dtype (None or string): type of data
  • codec (list): list of Field codec values
  • parent (None or int): Field parent or None
  • keys (None or list): Field keys
  • coef (None or int): coef if primary Field else None
  • leng (int): length of the Field
class NtvConnector(abc.ABC):
104class NtvConnector(ABC):
105    ''' The NtvConnector class is an abstract class used by all NTV connectors
106    for conversion between NTV-JSON data and NTV-OBJ data.
107
108    A NtvConnector child is defined by:
109    - clas_obj: str - define the class name of the object to convert
110    - clas_typ: str - define the Datatype of the converted object
111    - to_obj_ntv: method - converter from JsonNTV to the object
112    - to_json_ntv: method - converter from the object to JsonNTV
113    
114    *class method :*
115    - `connector`
116    - `dic_connec`
117    - `castable` (@property)
118    - `dic_obj` (@property)
119    - `dic_type` (@property)
120
121    *abstract method*
122    - `to_obj_ntv`
123    - `to_json_ntv`
124
125    *static method*
126    - `cast`
127    - `uncast`
128    - `is_json_class`
129    - `is_json`
130    - `init_ntv_keys`
131    '''
132
133    DIC_NTV_CL = {'NtvSingle': 'ntv', 'NtvList': 'ntv'}
134    DIC_GEO_CL = {'Point': 'point', 'MultiPoint': 'multipoint', 'LineString': 'line',
135                  'MultiLineString': 'multiline', 'Polygon': 'polygon',
136                  'MultiPolygon': 'multipolygon'}
137    DIC_DAT_CL = {'date': 'date', 'time': 'time', 'datetime': 'datetime'}
138    DIC_FCT = {'date': datetime.date.fromisoformat, 'time': datetime.time.fromisoformat,
139               'datetime': datetime.datetime.fromisoformat}
140    DIC_GEO = {'point': 'point', 'multipoint': 'multipoint', 'line': 'linestring',
141               'multiline': 'multilinestring', 'polygon': 'polygon',
142               'multipolygon': 'multipolygon'}
143    DIC_CBOR = {'point': False, 'multipoint': False, 'line': False,
144                'multiline': False, 'polygon': False, 'multipolygon': False,
145                'date': True, 'time': False, 'datetime': True}
146    DIC_OBJ = {'tab': 'DataFrameConnec', 'field': 'SeriesConnec',
147               'point': 'ShapelyConnec', 'multipoint': 'ShapelyConnec',
148               'line': 'ShapelyConnec', 'multiline': 'ShapelyConnec',
149               'polygon': 'ShapelyConnec', 'multipolygon': 'ShapelyConnec',
150               'other': None}
151
152    @classmethod
153    @property
154    def castable(cls):
155        '''return a list with class_name allowed for json-obj conversion'''
156        return ['str', 'int', 'bool', 'float', 'dict', 'tuple', 'NoneType',
157                'NtvSingle', 'NtvList'] \
158            + list(NtvConnector.DIC_GEO_CL.keys()) \
159            + list(NtvConnector.DIC_FCT.keys()) \
160            + list(NtvConnector.dic_connec().keys())
161
162    @classmethod
163    @property
164    def dic_obj(cls):
165        '''return a dict with the connectors: { type: class_connec_name }'''
166        return {clas.clas_typ: clas.__name__ for clas in cls.__subclasses__()} |\
167            NtvConnector.DIC_OBJ
168
169    @classmethod
170    @property
171    def dic_type(cls):
172        '''return a dict with the connectors: { class_obj_name: type }'''
173        return {clas.clas_obj: clas.clas_typ for clas in cls.__subclasses__()} |\
174            NtvConnector.DIC_GEO_CL | NtvConnector.DIC_DAT_CL | NtvConnector.DIC_NTV_CL
175
176    @classmethod
177    def connector(cls):
178        '''return a dict with the connectors: { class_connec_name: class_connec }'''
179        return {clas.__name__: clas for clas in cls.__subclasses__()}
180
181    @classmethod
182    def dic_connec(cls):
183        '''return a dict with the clas associated to the connector:
184        { class_obj_name: class_connec_name }'''
185        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}
186
187    @staticmethod
188    @abstractmethod
189    def to_obj_ntv(ntv_value, **kwargs):
190        ''' abstract - convert ntv_value into the return object'''
191
192    @staticmethod
193    @abstractmethod
194    def to_json_ntv(value, name=None, typ=None):
195        ''' abstract - convert NTV object (value, name, type) into the NTV json
196        (json-value, name, type)'''
197
198    @staticmethod
199    def cast(data, name=None, type_str=None):
200        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
201        defined in parameters.
202
203        *Parameters*
204
205        - **data**: NtvSingle entity or NTVvalue of the NTV entity
206        - **name** : String (default None) - name of the NTV entity
207        - **type_str**: String (default None) - type of the NTV entity
208        '''
209        clas = data.__class__.__name__
210        if clas == 'NtvSingle':
211            name = data.ntv_name
212            type_str = data.type_str
213            data = data.ntv_value
214        dic_geo_cl = NtvConnector.DIC_GEO_CL
215        dic_connec = NtvConnector.dic_connec()
216        match clas:
217            case 'int' | 'float' | 'bool' | 'str':
218                return (data, name, type_str)
219            case 'dict':
220                return ({key: NtvConnector.cast(val, name, type_str)[0]
221                         for key, val in data.items()}, name, type_str)
222            case 'list':
223                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
224                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
225            case 'tuple':
226                return (list(data), name, 'array' if not type_str else type_str)
227            case 'date' | 'time' | 'datetime':
228                return (data.isoformat(), name, clas if not type_str else type_str)
229            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
230                    'Polygon' | 'MultiPolygon':
231                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
232                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
233            case 'NtvSingle' | 'NtvList':
234                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
235            case _:
236                connec = None
237                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
238                    connec = NtvConnector.connector()[dic_connec[clas]]
239                if connec:
240                    return connec.to_json_ntv(data, name, type_str)
241                raise NtvError(
242                    'connector is not defined for the class : ', clas)
243        return (data, name, type_str)
244
245    @staticmethod
246    def uncast(value, name=None, type_str=None, **kwargs):
247        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity
248
249        *Parameters*
250
251        - **data**: NtvSingle entity or NTVvalue of the NTV entity
252        - **name** : String (default None) - name of the NTV entity
253        - **type_str**: String (default None) - type of the NTV entity
254        '''
255        if type_str == 'json':
256            return (value, name, type_str)
257        if value.__class__.__name__ == 'NtvSingle':
258            if not (type_str in set(NtvConnector.dic_type.values()) and 
259                    NtvConnector.is_json(value) or type_str is None):
260                return (value.ntv_value, value.name, value.type_str)
261            type_str = value.type_str if value.ntv_type else None
262            name = value.ntv_name
263            value = value.ntv_value
264        #dic_obj = NtvConnector.dic_obj
265        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
266        value_obj = NtvConnector._uncast_val(value, type_str, **option)
267        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))
268
269    @staticmethod
270    def _typ_obj(value):
271        if isinstance(value, dict):
272            return NtvConnector._typ_obj(list(value.values()))
273        if isinstance(value, (tuple, list)):
274            for val in value:
275                typ = NtvConnector._typ_obj(val)
276                if typ:
277                    return typ
278            return None
279        return NtvConnector.dic_type.get(value.__class__.__name__)
280
281    @staticmethod
282    def _uncast_val(value, type_n, **option):
283        '''return value from ntv value'''
284        dic_fct = NtvConnector.DIC_FCT
285        dic_geo = NtvConnector.DIC_GEO
286        dic_obj = NtvConnector.dic_obj | option['dicobj']
287        dic_cbor = NtvConnector.DIC_CBOR
288        if not type_n or type_n == 'json' or (option['format'] == 'cbor' and 
289                                              not dic_cbor.get(type_n, False)):
290            return value
291        if type_n in dic_fct:
292            if isinstance(value, (tuple, list)):
293                return [NtvConnector._uncast_val(val, type_n, **option) for val in value]
294            if isinstance(value, dict):
295                return {key: NtvConnector._uncast_val(val, type_n, **option)
296                        for key, val in value.items()}
297            return dic_fct[type_n](value)
298        if type_n == 'array':
299            return tuple(value)
300        if type_n == 'ntv':
301            from json_ntv.ntv import Ntv
302            return Ntv.from_obj(value)
303        if type_n in dic_geo:
304            option['type_geo'] = dic_geo[type_n]
305        connec = None
306        if type_n in dic_obj and \
307                dic_obj[type_n] in NtvConnector.connector():
308            connec = NtvConnector.connector()[dic_obj[type_n]]
309        elif dic_obj['other'] in NtvConnector.connector():
310            connec = NtvConnector.connector()['other']
311        if connec:
312            return connec.to_obj_ntv(value, **option)
313        #raise NtvError('type of value not allowed for conversion')
314        return value
315
316    @staticmethod
317    def is_json_class(val):
318        ''' return True if val is a json type'''
319        return val is None or isinstance(val, (list, int, str, float, bool, dict))
320
321    @staticmethod
322    def is_json(obj):
323        ''' check if obj is a json structure and return True if obj is a json-value
324
325        *Parameters*
326
327        - **obj** : object to check
328        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
329        if obj is None:
330            return True
331        is_js = NtvConnector.is_json
332        match obj:
333            case str() | int() | float() | bool() as obj:
334                return True
335            case list() | tuple() as obj:
336                if not obj:
337                    return True
338                return min(is_js(obj_in) for obj_in in obj)
339            case dict() as obj:
340                if not obj:
341                    return True
342                if not min(isinstance(key, str) for key in obj.keys()):
343                    raise NtvError('key in dict in not string')
344                return min(is_js(obj_in) for obj_in in obj.values())
345            case _:
346                if not obj.__class__.__name__ in NtvConnector.castable:
347                    raise NtvError(obj.__class__.__name__ +
348                                   ' is not valid for NTV')
349                return False
350
351    @staticmethod
352    def keysfromderkeys(parentkeys, derkeys):
353        '''return keys from parent keys and derkeys
354
355        *Parameters*
356
357        - **parentkeys** : list of keys from parent
358        - **derkeys** : list of derived keys
359
360        *Returns* : list of keys'''
361        return [derkeys[pkey] for pkey in parentkeys]
362
363    @staticmethod 
364    def encode_coef(lis):
365        '''Generate a repetition coefficient for periodic list'''
366        if len(lis) < 2:
367            return 0
368        coef = 1
369        while coef != len(lis):
370            if lis[coef-1] != lis[coef]:
371                break
372            coef += 1
373        if (not len(lis) % (coef * (max(lis) + 1)) and 
374            lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
375            return coef
376        return 0
377    
378    @staticmethod 
379    def keysfromcoef(coef, period, leng=None):
380        ''' return a list of keys with periodic structure'''
381        if not leng:
382            leng = coef * period
383        return None if not (coef and period) else [(ind % (coef * period)) // coef 
384                                                   for ind in range(leng)]    
385    @staticmethod
386    def init_ntv_keys(ind, lidx, leng):
387        ''' initialization of explicit keys data in lidx object of tabular data'''
388        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
389        name, typ, codec, parent, keys, coef, length = lidx[ind]
390        if (keys, parent, coef) == (None, None, None):  # full or unique
391            if len(codec) == 1: # unique
392                lidx[ind][4] = [0] * leng
393            elif len(codec) == leng:    # full
394                lidx[ind][4] = list(range(leng))
395            else:
396                raise NtvError('impossible to generate keys')
397            return
398        if keys and len(keys) > 1 and parent is None:  #complete
399            return
400        if coef:  #primary
401            lidx[ind][4] = [(ikey % (coef * len(codec))) // coef for ikey in range(leng)]
402            lidx[ind][3] = None
403            return  
404        if parent is None:
405            raise NtvError('keys not referenced')          
406        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
407            NtvConnector.init_ntv_keys(parent, lidx, leng)
408        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
409            lidx[ind][4] = lidx[parent][4]
410            lidx[ind][3] = None
411            return
412        lidx[ind][4] = NtvConnector.keysfromderkeys(lidx[parent][4], keys)  # relative
413        #lidx[ind][4] = [keys[pkey] for pkey in lidx[parent][4]]  # relative
414        lidx[ind][3] = None
415        return    

The NtvConnector class is an abstract class used by all NTV connectors for conversion between NTV-JSON data and NTV-OBJ data.

A NtvConnector child is defined by:

  • clas_obj: str - define the class name of the object to convert
  • clas_typ: str - define the Datatype of the converted object
  • to_obj_ntv: method - converter from JsonNTV to the object
  • to_json_ntv: method - converter from the object to JsonNTV

class method :

abstract method

static method

castable

return a list with class_name allowed for json-obj conversion

dic_obj

return a dict with the connectors: { type: class_connec_name }

dic_type

return a dict with the connectors: { class_obj_name: type }

@classmethod
def connector(cls):
176    @classmethod
177    def connector(cls):
178        '''return a dict with the connectors: { class_connec_name: class_connec }'''
179        return {clas.__name__: clas for clas in cls.__subclasses__()}

return a dict with the connectors: { class_connec_name: class_connec }

@classmethod
def dic_connec(cls):
181    @classmethod
182    def dic_connec(cls):
183        '''return a dict with the clas associated to the connector:
184        { class_obj_name: class_connec_name }'''
185        return {clas.clas_obj: clas.__name__ for clas in cls.__subclasses__()}

return a dict with the clas associated to the connector: { class_obj_name: class_connec_name }

@staticmethod
@abstractmethod
def to_obj_ntv(ntv_value, **kwargs):
    @staticmethod
    @abstractmethod
    def to_obj_ntv(ntv_value, **kwargs):
        ''' abstract - convert ntv_value into the return object

        *Parameters*

        - **ntv_value** : value to convert
        - **kwargs** : options of the conversion (defined by each connector)'''

abstract - convert ntv_value into the return object

@staticmethod
@abstractmethod
def to_json_ntv(value, name=None, typ=None):
    @staticmethod
    @abstractmethod
    def to_json_ntv(value, name=None, typ=None):
        ''' abstract - convert NTV object (value, name, type) into the NTV json
        (json-value, name, type)

        *Parameters*

        - **value** : object to convert
        - **name** : String (default None) - name of the NTV entity
        - **typ** : String (default None) - type of the NTV entity'''

abstract - convert NTV object (value, name, type) into the NTV json (json-value, name, type)

@staticmethod
def cast(data, name=None, type_str=None):
198    @staticmethod
199    def cast(data, name=None, type_str=None):
200        '''return JSON-NTV conversion (json_value, name, type_str) of the NTV entity
201        defined in parameters.
202
203        *Parameters*
204
205        - **data**: NtvSingle entity or NTVvalue of the NTV entity
206        - **name** : String (default None) - name of the NTV entity
207        - **type_str**: String (default None) - type of the NTV entity
208        '''
209        clas = data.__class__.__name__
210        if clas == 'NtvSingle':
211            name = data.ntv_name
212            type_str = data.type_str
213            data = data.ntv_value
214        dic_geo_cl = NtvConnector.DIC_GEO_CL
215        dic_connec = NtvConnector.dic_connec()
216        match clas:
217            case 'int' | 'float' | 'bool' | 'str':
218                return (data, name, type_str)
219            case 'dict':
220                return ({key: NtvConnector.cast(val, name, type_str)[0]
221                         for key, val in data.items()}, name, type_str)
222            case 'list':
223                return ([NtvConnector.cast(val, name, type_str)[0] for val in data],
224                        name, NtvConnector._typ_obj(data) if not type_str else type_str)
225            case 'tuple':
226                return (list(data), name, 'array' if not type_str else type_str)
227            case 'date' | 'time' | 'datetime':
228                return (data.isoformat(), name, clas if not type_str else type_str)
229            case 'Point' | 'MultiPoint' | 'LineString' | 'MultiLineString' | \
230                    'Polygon' | 'MultiPolygon':
231                return (NtvConnector.connector()[dic_connec['geometry']].to_json_ntv(data)[0],
232                        name, dic_geo_cl[data.__class__.__name__] if not type_str else type_str)
233            case 'NtvSingle' | 'NtvList':
234                return (data.to_obj(), name, 'ntv' if not type_str else type_str)
235            case _:
236                connec = None
237                if clas in dic_connec and dic_connec[clas] in NtvConnector.connector():
238                    connec = NtvConnector.connector()[dic_connec[clas]]
239                if connec:
240                    return connec.to_json_ntv(data, name, type_str)
241                raise NtvError(
242                    'connector is not defined for the class : ', clas)
243        return (data, name, type_str)

return JSON-NTV conversion (json_value, name, type_str) of the NTV entity defined in parameters.

Parameters

  • data: NtvSingle entity or NTVvalue of the NTV entity
  • name : String (default None) - name of the NTV entity
  • type_str: String (default None) - type of the NTV entity
@staticmethod
def uncast(value, name=None, type_str=None, **kwargs):
    @staticmethod
    def uncast(value, name=None, type_str=None, **kwargs):
        '''return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity

        *Parameters*

        - **value**: NtvSingle entity or NTVvalue of the NTV entity
        - **name** : String (default None) - name of the NTV entity
        - **type_str**: String (default None) - type of the NTV entity
        - **kwargs**: options merged with the default options
        {'dicobj': {}, 'format': 'json', 'type_obj': False}

        If `type_str` is 'json' the parameters are returned without conversion.
        If no type is defined, the returned type is deduced from the converted value.
        '''
        if type_str == 'json':
            return (value, name, type_str)
        if value.__class__.__name__ == 'NtvSingle':
            # the entity is converted only if type_str is None or if type_str is
            # a known type and the entity is json, else its data is returned as is
            if not (type_str in set(NtvConnector.dic_type.values()) and 
                    NtvConnector.is_json(value) or type_str is None):
                return (value.ntv_value, value.name, value.type_str)
            # name, type and value of the entity replace the parameters
            type_str = value.type_str if value.ntv_type else None
            name = value.ntv_name
            value = value.ntv_value
        option = {'dicobj': {}, 'format': 'json', 'type_obj': False} | kwargs
        value_obj = NtvConnector._uncast_val(value, type_str, **option)
        return (value_obj, name, type_str if type_str else NtvConnector._typ_obj(value_obj))

return OBJ-NTV conversion (obj_value, name, type_str) of a NTV entity

Parameters

  • value: NtvSingle entity or NTVvalue of the NTV entity
  • name : String (default None) - name of the NTV entity
  • type_str: String (default None) - type of the NTV entity
@staticmethod
def is_json_class(val):
316    @staticmethod
317    def is_json_class(val):
318        ''' return True if val is a json type'''
319        return val is None or isinstance(val, (list, int, str, float, bool, dict))

return True if val is a json type

@staticmethod
def is_json(obj):
321    @staticmethod
322    def is_json(obj):
323        ''' check if obj is a json structure and return True if obj is a json-value
324
325        *Parameters*
326
327        - **obj** : object to check
328        - **ntvobj** : boolean (default False) - if True NTV class value are accepted'''
329        if obj is None:
330            return True
331        is_js = NtvConnector.is_json
332        match obj:
333            case str() | int() | float() | bool() as obj:
334                return True
335            case list() | tuple() as obj:
336                if not obj:
337                    return True
338                return min(is_js(obj_in) for obj_in in obj)
339            case dict() as obj:
340                if not obj:
341                    return True
342                if not min(isinstance(key, str) for key in obj.keys()):
343                    raise NtvError('key in dict in not string')
344                return min(is_js(obj_in) for obj_in in obj.values())
345            case _:
346                if not obj.__class__.__name__ in NtvConnector.castable:
347                    raise NtvError(obj.__class__.__name__ +
348                                   ' is not valid for NTV')
349                return False

check if obj is a json structure and return True if obj is a json-value

Parameters

  • obj : object to check
  • ntvobj : boolean (default False) - if True NTV class value are accepted
@staticmethod
def keysfromderkeys(parentkeys, derkeys):
351    @staticmethod
352    def keysfromderkeys(parentkeys, derkeys):
353        '''return keys from parent keys and derkeys
354
355        *Parameters*
356
357        - **parentkeys** : list of keys from parent
358        - **derkeys** : list of derived keys
359
360        *Returns* : list of keys'''
361        return [derkeys[pkey] for pkey in parentkeys]

return keys from parent keys and derkeys

Parameters

  • parentkeys : list of keys from parent
  • derkeys : list of derived keys

Returns : list of keys

@staticmethod
def encode_coef(lis):
363    @staticmethod 
364    def encode_coef(lis):
365        '''Generate a repetition coefficient for periodic list'''
366        if len(lis) < 2:
367            return 0
368        coef = 1
369        while coef != len(lis):
370            if lis[coef-1] != lis[coef]:
371                break
372            coef += 1
373        if (not len(lis) % (coef * (max(lis) + 1)) and 
374            lis == NtvConnector.keysfromcoef(coef, max(lis) + 1, len(lis))):
375            return coef
376        return 0

Generate a repetition coefficient for periodic list

@staticmethod
def keysfromcoef(coef, period, leng=None):
378    @staticmethod 
379    def keysfromcoef(coef, period, leng=None):
380        ''' return a list of keys with periodic structure'''
381        if not leng:
382            leng = coef * period
383        return None if not (coef and period) else [(ind % (coef * period)) // coef 
384                                                   for ind in range(leng)]    

return a list of keys with periodic structure

@staticmethod
def init_ntv_keys(ind, lidx, leng):
385    @staticmethod
386    def init_ntv_keys(ind, lidx, leng):
387        ''' initialization of explicit keys data in lidx object of tabular data'''
388        # name: 0, type: 1, codec: 2, parent: 3, keys: 4, coef: 5, leng: 6
389        name, typ, codec, parent, keys, coef, length = lidx[ind]
390        if (keys, parent, coef) == (None, None, None):  # full or unique
391            if len(codec) == 1: # unique
392                lidx[ind][4] = [0] * leng
393            elif len(codec) == leng:    # full
394                lidx[ind][4] = list(range(leng))
395            else:
396                raise NtvError('impossible to generate keys')
397            return
398        if keys and len(keys) > 1 and parent is None:  #complete
399            return
400        if coef:  #primary
401            lidx[ind][4] = [(ikey % (coef * len(codec))) // coef for ikey in range(leng)]
402            lidx[ind][3] = None
403            return  
404        if parent is None:
405            raise NtvError('keys not referenced')          
406        if not lidx[parent][4] or len(lidx[parent][4]) != leng:
407            NtvConnector.init_ntv_keys(parent, lidx, leng)
408        if not keys and len(codec) == len(lidx[parent][2]):    # implicit
409            lidx[ind][4] = lidx[parent][4]
410            lidx[ind][3] = None
411            return
412        lidx[ind][4] = NtvConnector.keysfromderkeys(lidx[parent][4], keys)  # relative
413        #lidx[ind][4] = [keys[pkey] for pkey in lidx[parent][4]]  # relative
414        lidx[ind][3] = None
415        return    

initialization of explicit keys data in lidx object of tabular data

class NtvTree:
class NtvTree:
    ''' The NtvTree class is an iterator class used to traverse a NTV tree structure
    (each node is returned before its children).
    Some other methods give tree indicators and data.

    *Attributes :*

    - **_ntv** : Ntv entity (root of the tree)
    - **_node**:  Ntv entity - node pointer
    - **_stack**:  list - index of the current node in each traversed level

    *dynamic values (@property)*
    - `breadth`
    - `size`
    - `height`
    - `adjacency_list`
    - `nodes`
    - `dic_nodes`
    - `leaf_nodes`
    - `inner_nodes`
    '''

    def __init__(self, ntv):
        ''' the parameter of the constructor is the Ntv entity'''
        self._ntv = ntv
        self._node = None
        self._stack = []

    def __iter__(self):
        ''' the iterator is the object itself (the node pointer is not reset)'''
        return self

    def __next__(self):
        ''' return next node in the tree'''
        current = self._node
        if current is None:
            # first call: the traversal starts with the root
            self._node = self._ntv
            return self._node
        if len(current) == 0:
            raise StopIteration
        if current.__class__.__name__ == 'NtvList':
            self._next_down()
        else:
            self._next_up()
        return self._node

    def _next_down(self):
        ''' move the node pointer to the first child of the current node'''
        self._stack.append(0)
        self._node = self._node[0]

    def _next_up(self):
        ''' move the node pointer to the next sibling of the current node or
        of its nearest ancestor having one (StopIteration if the root is reached)'''
        while True:
            parent = self._node.parent
            if not parent or self._node == self._ntv:
                raise StopIteration
            ind = self._stack[-1]
            if ind < len(parent) - 1:  # the current node is not the last child
                self._node = parent[ind + 1]
                self._stack[-1] = ind + 1
                return
            if parent == self._ntv:
                raise StopIteration
            # last child: the search continues from the parent
            self._node = parent
            self._stack.pop()

    @property
    def breadth(self):
        ''' return the number of leaves'''
        return len(self.leaf_nodes)

    @property
    def size(self):
        ''' return the number of nodes'''
        return len(self.nodes)

    @property
    def height(self):
        ''' return the height of the tree (maximal length of the node pointers)'''
        return max(len(node.pointer()) for node in type(self)(self._ntv))

    @property
    def adjacency_list(self):
        ''' return a dict with the list of child nodes for each parent node'''
        adjacency = {}
        for node in self.inner_nodes:
            adjacency[node] = node.val
        return adjacency

    @property
    def nodes(self):
        ''' return the list of nodes according to the DFS preordering algorithm'''
        return list(type(self)(self._ntv))

    @property
    def dic_nodes(self):
        ''' return a dict of the named nodes (key: name of the node) according
        to the DFS preordering algorithm'''
        named = {}
        for node in type(self)(self._ntv):
            if node.ntv_name:
                named[node.ntv_name] = node
        return named

    @property
    def leaf_nodes(self):
        ''' return the list of leaf nodes according to the DFS preordering algorithm'''
        return [node for node in type(self)(self._ntv)
                if node.__class__.__name__ == 'NtvSingle']

    @property
    def inner_nodes(self):
        ''' return the list of inner nodes according to the DFS preordering algorithm'''
        return [node for node in type(self)(self._ntv)
                if node.__class__.__name__ == 'NtvList']

The NtvTree class is an iterator class used to traverse a NTV tree structure. Some other methods give tree indicators and data.

Attributes :

  • ntv : Ntv entity
  • _node: Ntv entity - node pointer
  • _stack: list - stack used to store context in recursive methods

dynamic values (@property)

NtvTree(ntv)
438    def __init__(self, ntv):
439        ''' the parameter of the constructor is the Ntv entity'''
440        self._ntv = ntv
441        self._node = None
442        self._stack = []

the parameter of the constructor is the Ntv entity

breadth

return the number of leaves

size

return the number of nodes

height

return the height of the tree

adjacency_list

return a dict with the list of child nodes for each parent node

nodes

return the list of nodes according to the DFS preordering algorithm

dic_nodes

return a dict of nodes according to the DFS preordering algorithm

leaf_nodes

return the list of leaf nodes according to the DFS preordering algorithm

inner_nodes

return the list of inner nodes according to the DFS preordering algorithm

class NtvJsonEncoder(json.encoder.JSONEncoder):
class NtvJsonEncoder(json.JSONEncoder):
    """json encoder for Ntv data"""

    def default(self, o):
        '''return the json-value of `o` given by `NtvConnector.cast`.

        If the object cannot be cast (NtvError), the default method of
        the parent class is called (it raises a TypeError).'''
        try:
            return NtvConnector.cast(o)[0]
        except NtvError:
            return json.JSONEncoder.default(self, o)

json encoder for Ntv data

def default(self, o):
530    def default(self, o):
531        try:
532            return NtvConnector.cast(o)[0]
533        except NtvError:
534            return json.JSONEncoder.default(self, o)
535        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
536            return o.isoformat()
537        return json.JSONEncoder.default(self, o)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
Inherited Members
json.encoder.JSONEncoder
JSONEncoder
encode
iterencode
class NtvError(builtins.Exception):
class NtvError(Exception):
    ''' NTV Exception - raised by the NTV classes when a data is not valid
    (e.g. json-name which is not a string, class without connector,
    keys of a tabular field impossible to generate)'''
    # pass

NTV Exception

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback