python.observation.essearch
How to use observation.essearch ?
Create the MongoDB database: You can easily create an account on MongoDB. Once you have a database, follow the guidelines here to connect to it using pymongo. All you need in order to be able to use this module is to be able to connect to the Collection with pymongo.
Fill the database with your data: Construct an observation or a list of observations containing your data using dedicated functions from
python.observation.Observation
. You can then useinsert_mongo(collection, observation)
to insert it in the database.Write a request using observation.ESSearch: An
ESSearch
instance must be created with either a MongoDB Collection (passed as argument collection) or a list of observations (passed as argument data). Criteria for the query are then added one by one usingESSearch.addCondition
orESSearch.orCondition
, or all together withESSearch.addConditions
or passed as argument parameters of ESSearch.
A condition is composed of:
- a path giving which element is concerned by the condition;
- an operand which is the item of the comparison (if omitted, the existence of the path is tested);
- a comparator which can be applied on the operand, for example '>=' or 'within' (defaults to equality in most cases);
- optional parameters detailed in
ESSearch.addCondition
documentation, like inverted to add a not.
Execute the research with ESSearch.execute()
. Put the parameter single to True if you want the return to be a single observation
instead of a list of observations.
Example of python code using observation.essearch module:
from pymongo import MongoClient
from observation.essearch import ESSearch
import datetime
client = Mongoclient(<Mongo-auth>)
collection = client[<base>][<collection>]
# In this example, we search for measures of property PM25 taken between 2022/01/01
# and 2022/31/12 and we ensure the measure is an Observation.
# We execute with argument single = True to merge the result in one single
# Observation.
# Option 1
srch = ESSearch(collection)
srch.addCondition('datation', datetime.datetime(2022, 1, 1), '>=')
srch.addCondition('datation', datetime.datetime(2022, 12, 31), '<=')
srch.addCondition('property', 'PM25')
srch.addCondition(path = 'type', comparator = '==', operand = 'observation')
result = srch.execute(single = True)
# Option 2 (equivalent to option 1 but on one line)
result = ESSearch(collection,
[['datation', datetime.datetime(2022, 1, 1), '>='],
['datation', datetime.datetime(2022, 12, 31), '<='],
['property', 'PM25'],
{'path': 'type', 'comparator': '==', 'operand': 'observation'}]
).execute(single = True)
1''' 2# How to use observation.essearch ? 3 41. **Create the MongoDB database:** 5You can easily create an account on MongoDB. Once you have a database, follow the guidelines [here](https://www.mongodb.com/docs/atlas/tutorial/connect-to-your-cluster/#connect-to-your-atlas-cluster) to connect to it using pymongo. 6All you need in order to be able to use this module is to be able to connect to the Collection with pymongo. 7 82. **Fill the database with your data:** 9Construct an observation or a list of observations containing your data using dedicated functions from `python.observation.Observation`. 10You can then use `insert_mongo(collection, observation)` to insert it in the database. 11 123. **Write a request using observation.ESSearch:** 13An `ESSearch` instance must be created with either a MongoDB Collection (passed as argument **collection**) or a list of observations (passed as argument **data**). 14Criteria for the query are then added one by one using `ESSearch.addCondition` or `ESSearch.orCondition`, or all together with `ESSearch.addConditions` or passed as argument **parameters** of ESSearch. 15 16A condition is composed of: 17- a **path** giving which element is concerned by the condition; 18- an **operand** which is the item of the comparison (if omitted, the existence of the path is tested); 19- a **comparator** which can be applied on the operand, for example '>=' or 'within' (defaults to equality in most cases); 20- optional parameters detailed in `ESSearch.addCondition` documentation, like **inverted** to add a *not*. 21 22Execute the research with `ESSearch.execute()`. Put the parameter **single** to True if you want the return to be a single observation 23instead of a list of observations. 24 25Example of python code using observation.essearch module: 26```python 27from pymongo import MongoClient 28from observation.essearch import ESSearch 29import datetime 30 31client = Mongoclient(<Mongo-auth>) 32collection = client[<base>][<collection>] 33 34# In this example, we search for measures of property PM25 taken between 2022/01/01 35# and 2022/31/12 and we ensure the measure is an Observation. 36# We execute with argument single = True to merge the result in one single 37# Observation. 38 39# Option 1 40srch = ESSearch(collection) 41srch.addCondition('datation', datetime.datetime(2022, 1, 1), '>=') 42srch.addCondition('datation', datetime.datetime(2022, 12, 31), '<=') 43srch.addCondition('property', 'PM25') 44srch.addCondition(path = 'type', comparator = '==', operand = 'observation') 45result = srch.execute(single = True) 46 47# Option 2 (equivalent to option 1 but on one line) 48result = ESSearch(collection, 49 [['datation', datetime.datetime(2022, 1, 1), '>='], 50 ['datation', datetime.datetime(2022, 12, 31), '<='], 51 ['property', 'PM25'], 52 {'path': 'type', 'comparator': '==', 'operand': 'observation'}] 53 ).execute(single = True) 54``` 55''' 56import datetime 57import shapely.geometry 58from pymongo.collection import Collection 59from pymongo.cursor import Cursor 60from pymongo.command_cursor import CommandCursor 61import bson 62 63from observation.esobservation import Observation 64from observation.field import Field 65from observation.util import util 66from observation.timeslot import TimeSlot 67 68dico_alias_mongo = { # dictionnary of the different names accepted for each comparator and each given type. <key>:<value> -> <accepted name>:<name in MongoDB> 69 # any type other than those used as keys is considered non valid 70 str : { 71 None:"$eq", 72 "eq":"$eq", "=":"$eq", "==":"$eq", "$eq":"$eq", 73 "in":"$in", "$in":"$in", 74 "regex":"$regex", "$regex":"$regex", 75 "oid":"$oid","$oid":"$oid" 76 }, 77 int : { 78 None:"$eq", 79 "eq":"$eq", "=":"$eq", "==":"$eq", "$eq":"$eq", 80 "gte":"$gte", ">=":"$gte", "=>":"$gte", "$gte":"$gte", 81 "gt":"$gt", ">":"$gt", "$gt":"$gt", 82 "lte":"$lte", "<=":"$lte", "=<":"$lte", "$lte":"$lte", 83 "lt":"$lt", "<":"$lt", "$lt":"$lt", 84 "in":"$in", "$in":"$in" 85 }, 86 datetime.datetime : { 87 None:"$eq", 88 "eq":"$eq", "=":"$eq", "==":"$eq", "$eq":"$eq", 89 "gte":"$gte", ">=":"$gte", "=>":"$gte", "$gte":"$gte", 90 "gt":"$gt", ">":"$gt", "$gt":"$gt", 91 "lte":"$lte", "<=":"$lte", "=<":"$lte", "$lte":"$lte", 92 "lt":"$lt", "<":"$lt", "$lt":"$lt", 93 "in":"$in", "$in":"$in" 94 }, 95 TimeSlot : { 96 None:"within", 97 "eq":"within", "=":"within", "==":"within", "$eq":"within", "within":"within", "within":"within", 98 "contains":"intersects", "$contains":"intersects", 99 "in":"within", "$in":"within", "within":"within", "$within":"within", 100 "disjoint":"disjoint", "$disjoint":"disjoint", 101 "intersects":"intersects", "$intersects":"intersects" 102 }, 103 list : { # lists are interpreted as geometries 104 None:"$geoIntersects", 105 "eq":"equals", "=":"equals", "==":"equals", "$eq":"equals", "equals":"equals", "$equals":"equals", 106 "$geowithin":"$geoWithin", "geowithin":"$geoWithin", "$geoWithin":"$geoWithin", "geoWithin":"$geoWithin", "within":"$geoWithin", "$within":"$geoWithin", 107 "disjoint":"disjoint", "$disjoint":"disjoint", 108 "intersects":"$geoIntersects", "$intersects":"$geoIntersects", "geoIntersects":"$geoIntersects", "$geointersects":"$geoIntersects", "geoIntersects":"$geoIntersects", "$geoIntersects":"$geoIntersects", 109 "touches":"touches", "$touches":"touches", 110 "overlaps":"overlaps", "$overlaps":"overlaps", 111 "contains":"contains", "$contains":"contains", 112 "$geoNear":"$geoNear", "$geonear":"$geoNear", "geonear":"$geoNear", "geoNear":"$geoNear", 113 114 "in":"$in", "$in":"$in" # only in case where a list is not a geometry 115 }, 116 #bson.objectid.ObjectId : { 117 bson.ObjectId : { 118 None:"$eq", 119 "eq":"$eq", "=":"$eq", "==":"$eq", "$eq":"$eq", 120 "in":"$in", "$in":"$in" 121 } 122} 123dico_alias_mongo[float] = dico_alias_mongo[int] 124 125_geoeq = lambda x, y: x.equals(y) 126_geowith = lambda x, y: x.within(y) 127_geodis = lambda x, y: x.disjoint(y) 128_geointer = lambda x, y: x.intersects(y) 129_geotou = lambda x, y: x.touches(y) 130_geoover = lambda x, y: x.overlaps(y) 131_geocont = lambda x, y: x.contains(y) 132_geonear = lambda x, y: True 133 134_defeq = lambda x, y: x == y 135_defsupeq = lambda x, y: x >= y 136_defsup = lambda x, y: x > y 137_definfeq = lambda x, y: x <= y 138_definf = lambda x, y: x < y 139_defin = lambda x, y: x in y 140 141_timinfeq = lambda x, y: x.bounds[0] <= y # at least one element of the TimeSlot is lte y 142_timinf = lambda x, y: x.bounds[0] < y # at least one element of the TimeSlot is lt y 143_timsupeq = lambda x, y: x.bounds[1] >= y # at least one element of the TimeSlot is gte y 144_timsup = lambda x, y: x.bounds[1] > y # at least one element of the TimeSlot is gt y 145_timeq = lambda x, y: x == TimeSlot(y) 146# To have all elements verify a comparison instead of just one, combine with parameter inverted. 147# For example : not (at least one element of the TimeSlot is lte y) <=> all elements of the TimeSlot are gt y 148 149dico_alias_python = { 150 TimeSlot : { # only used in python filtering part 151 TimeSlot : { # comparison of a TimeSlot with a timeSlot 152 None:"equals", 153 "eq":"equals", "=":"equals", "==":"equals", "$eq":"equals", "equals":"equals", "$equals":"equals", 154 "contains":"contains", "$contains":"contains", 155 "in":"within", "$in":"within", "within":"within", "$within":"within", 156 "disjoint":"disjoint", "$disjoint":"disjoint", 157 "intersects":"intersects", "$intersects":"intersects" 158 }, 159 160 datetime.datetime : { # comparison of a datetime and a TimeSlot 161 None:_timeq, 162 "eq":_timeq, "=":_timeq, "==":_timeq, "$eq":_timeq, "equals":_timeq, "$equals":_timeq, 163 "$gte":_timsupeq, "gte":_timsupeq, ">=":_timsupeq, "=>":_timsupeq, 164 "$gt":_timsup, "gt":_timsup, ">":_timsup, 165 "$lte":_timinfeq, "lte":_timinfeq, "<=":_timinfeq, "=<":_timinfeq, 166 "$lt":_timinf, "lt":_timinf, "<":_timinf 167 } 168 }, 169 'geometry' : { # lists are interpreted as geometries 170 None:_geointer, 171 "eq":_geoeq, "=":_geoeq, "==":_geoeq, "$eq":_geoeq, "equals":_geoeq, "$equals":_geoeq, 172 "$geowithin":_geowith, "geowithin":_geowith, "$geoWithin":_geowith, "geoWithin":_geowith, "within":_geowith, "$within":_geowith, 173 "disjoint":_geodis, "$disjoint":_geodis, 174 "intersects":_geointer, "$intersects":_geointer, "geoIntersects":_geointer, "$geointersects":_geointer, "geoIntersects":_geointer, "$geoIntersects":_geointer, 175 "touches":_geotou, "$touches":_geotou, 176 "overlaps":_geoover, "$overlaps":_geoover, 177 "contains":_geocont, "$contains":_geocont, 178 "$geoNear":_geonear, "$geonear":_geonear, "geonear":_geonear, "geoNear":_geonear 179 }, 180 'default' : { 181 None:_defeq, 182 "eq":_defeq, "=":_defeq, "==":_defeq, "$eq":_defeq, 183 "gte":_defsupeq, ">=":_defsupeq, "=>":_defsupeq, "$gte":_defsupeq, 184 "gt":_defsup, ">":_defsup, "$gt":_defsup, 185 "lte":_definfeq, "<=":"$lte", "=<":_definfeq, 186 "lt":_definf, "<":_definf, "$lt":_definf, 187 "in":_defin, "$in":_defin 188 } 189} 190 191def insert_from_doc(collection, document , info=True): 192 '''Inserts all observations from a document into a collection, where each line of the document corresponds to an observation.''' 193 with open(document, 'r') as doc: 194 for line in doc: 195 try: insert_to_mongo(collection, line, info) 196 except: pass 197 198def insert_to_mongo(collection, obj, info=False): # Mieux avec panda ? 199 '''Takes an observation or a list of observations and inserts them into a MongoDB collection, with info by default.''' 200 # Faire une fonction pour permettre l'ajout direct de fichiers csv. 201 inserted_list = [] 202 if isinstance(obj, list): 203 pile = obj 204 elif isinstance(obj, Observation): 205 pile = [obj] 206 else: 207 pile = [Observation.from_obj(obj)] 208 for obs in pile: 209 if obs.id: obs_hash = obs.id 210 else: obs_hash = hash(obs) 211 metadata = {'id': obs_hash} 212 if obs.name : metadata['name'] = obs.name 213 if obs.param: metadata['param'] = obs.param 214 if info: metadata['information'] = Observation._info(True, True) 215 if len(obs.lname) == 1: # a special case is needed because lists with one element are replaced by the element itself so iteration doesn't work 216 for line in obs: 217 inserted_list.append({obs.lname[0]: util.json(line, encoded=False, typevalue=None, simpleval=False, geojson=True), 218 '_metadata': metadata}) 219 elif len(obs.lname) > 1: 220 for line in obs: 221 inserted_list.append({obs.lname[i]: util.json(line[i], encoded=False, typevalue=None, simpleval=False, geojson=True) 222 for i in range(len(line))} | {'_metadata': metadata}) 223 if inserted_list != []: collection.insert_many(inserted_list) 224 225def empty_request(collection): 226 """ 227 Empty request to get an idea of what the database contains. 228 Currently returns the count of elements in the collection and the name of each column. 229 """ 230 count = 0 231 column_names = [] 232 cursor = collection.find() 233 for doc in cursor: 234 count += 1 235 for column_name in doc: 236 if column_name not in column_names: 237 column_names.append(column_name) 238 return {'count': count, 'column_names': column_names} 239 240class ESSearch: 241 """ 242 An `ESSearch` is defined as an ensemble of conditions to be used to execute a MongoDB request or any iterable containing only observations. 243 244 *Attributes (for @property, see methods)* : 245 246 - **input** : input on which the query is done. One of or a list of these : 247 - pymongo.collection.Collection 248 - pymongo.cursor.Cursor 249 - pymongo.command_cursor.CommandCursor 250 - Observation (can be defined from a str or a dict) 251 - **parameters** : list of list of conditions for queries, to be interpreted as : parameters = [[cond_1 AND cond_2 AND cond_3] OR [cond_4 AND cond_5 AND cond_6]] where conds are criteria for queries 252 - **hide** : list of paths to hide from the output 253 - **heavy** : boolean indicating whether the request should search for nested values or not. Does not work with geoJSON. 254 - **sources** : attribute used to indicate the sources of the data in param 255 256 The methods defined in this class are (documentations in methods definitions): 257 258 *setter* 259 260 - `ESSearch.addInput` 261 - `ESSearch.removeInputs` 262 - `ESSearch.setHide` 263 - `ESSearch.setHeavy` 264 - `ESSearch.clear` 265 266 *dynamic value (getter @property)* 267 268 - `ESSearch.request` 269 - `ESSearch.cursor` 270 271 *parameters for query - update methods* 272 273 - `ESSearch.addConditions` 274 - `ESSearch.addCondition` 275 - `ESSearch.orCondition` 276 - `ESSearch.removeCondition` 277 - `ESSearch.clearConditions` 278 279 *query method* 280 281 - `ESSearch.execute` 282 """ 283 def __init__(self, 284 input = None, 285 parameters = None, 286 hide = [], 287 heavy = False, 288 sources = None, 289 **kwargs 290 ): 291 ''' 292 ESSearch constructor. Parameters can also be defined and updated using class methods. 293 294 *Arguments* 295 296 - **input** : input on which the query is done. Must be one of or a list of these (can be nested): 297 - pymongo.collection.Collection 298 - pymongo.cursor.Cursor 299 - pymongo.command_cursor.CommandCursor 300 - Observation 301 - str corresponding to a json Observation 302 - dict corresponding to a json Observation 303 - **parameters** : dict, list (default None) - list of list or list of dictionnaries whose keys are arguments of ESSearch.addCondition method 304 ex: parameters = [ 305 {'name' : 'datation', 'operand' : datetime.datetime(2022, 9, 19, 1), 'comparator' : '>='}, 306 {'name' : 'property', 'operand' : 'PM2'} 307 ] 308 - **hide** : list (default []) - List of strings where strings correspond to paths to remove from the output 309 - **heavy** : bool (default False) - Must be True when values are defined directly and inside dictionnaries simultaneously. 310 - **sources** : (default None) - Optional parameter indicating the sources of the data in case when a query is executed with parameter single = True. 311 - **kwargs** : other parameters are used as arguments for ESSearch.addCondition method. 312 ''' 313 self.parameters = [[]] # self.parameters 314 if isinstance(hide, list): self.hide = hide # self.hide 315 else: raise TypeError("hide must be a list.") 316 317 if isinstance(heavy, bool): self.heavy = heavy # self.heavy 318 else: raise TypeError("heavy must be a bool.") 319 self.sources = sources # self.sources 320 321 self.input = [[], []] # self.input : formatted as [[Mongo Objects], [Observations]] (list of two lists) 322 if isinstance(input, list): pile = input 323 else: pile = [input] 324 while not len(pile) == 0: 325 obj = pile.pop() 326 if isinstance(obj, list): 327 pile += obj 328 elif isinstance(obj, (Collection, Cursor, CommandCursor)): 329 self.input[0].append(obj) 330 elif isinstance(obj, Observation): 331 self.input[1].append(obj) 332 elif isinstance(obj, (str, dict)): 333 try: 334 self.input[1].append(Observation.from_obj(obj)) 335 except: 336 raise ValueError("Cannot convert " + str(obj) + " to an Observation.") 337 elif obj is not None: 338 raise TypeError("Unsupported type for input " + str(obj)) 339 340 if parameters: self.addConditions(parameters) 341 if kwargs: self.addCondition(**kwargs) 342 343 def __repr__(self): 344 return "ESSearch(input = " + str(self.input) + ", parameters = " + str(self.parameters) + ")" 345 346 def __str__(self): 347 return str(self.parameters) 348 349 def __iter__(self): 350 self.n = -1 351 return self 352 353 def __next__(self): 354 if self.n < len(self.parameters)-1: 355 self.n += 1 356 return self.parameters[self.n] 357 else: 358 raise StopIteration 359 360 def __getitem__(self, key): 361 return self.parameters[key] 362 363 def addInput(self, input): 364 """ 365 Adds one or many inputs on which the query is to be executed given by argument input. 366 Inputs can be: 367 - pymongo.collection.Collection 368 - pymongo.cursor.Cursor 369 - pymongo.command_cursor.CommandCursor 370 - Observation 371 - str corresponding to a json Observation 372 - dict corresponding to a json Observation 373 or a list of any of these. 374 """ 375 added_input = [[], []] 376 if isinstance(input, list): pile = input 377 else: pile = [input] 378 while not len(pile) == 0: # using a stack (LIFO) to allow easy treatment of nested data. 379 obj = pile.pop() 380 if isinstance(obj, list): 381 pile += obj 382 elif isinstance(obj, (Collection, Cursor, CommandCursor)): 383 added_input[0].append(obj) 384 elif isinstance(obj, Observation): 385 added_input[1].append(obj) 386 elif isinstance(obj, (str, dict)): 387 try: 388 added_input[1].append(Observation.from_obj(obj)) 389 except: 390 raise ValueError("Cannot convert " + str(obj) + " to an Observation.") 391 elif obj is not None: 392 raise TypeError("Unsupported type for input " + str(obj)) 393 self.input[0] += added_input[0] # self.input is updated with the new inputs 394 self.input[1] += added_input[1] 395 396 def removeInputs(self): 397 """ 398 Removes all inputs from self. 399 """ 400 self.input = [[], []] 401 402 def setHide(self, hide): 403 ''' 404 Sets self.hide to a value given by argument hide. 405 ''' 406 if isinstance(hide, list): self.hide = hide 407 else: raise TypeError("hide must be a list.") 408 409 def setHeavy(self, heavy): 410 ''' 411 Sets self.heavy to a value given by argument heavy. 412 ''' 413 if isinstance(heavy, list): self.heavy = heavy 414 else: raise TypeError("heavy must be a bool.") 415 416 def setSources(self, sources): 417 ''' 418 Sets self.sources to a value given by argument sources. 419 ''' 420 self.sources = sources 421 422 def addConditions(self, parameters): 423 ''' 424 Takes multiple parameters and applies self.addCondition() on each of them. 425 ''' 426 if isinstance(parameters, dict): # case when one single condition is added 427 self.addCondition(**parameters) 428 elif isinstance(parameters, (list, tuple)): # case when several conditions are added 429 for parameter in parameters: 430 if isinstance(parameter, dict): self.addCondition(**parameter) 431 elif isinstance(parameters, (list, tuple)): self.addCondition(*parameter) 432 else: self.addCondition(parameter) 433 else: raise TypeError("parameters must be either a dict or a list of dict.") 434 435 def addCondition(self, path, operand = None, comparator = None, or_position = -1, **kwargs): 436 ''' 437 Takes parameters and inserts corresponding query condition in self.parameters. 438 439 *Parameters* 440 441 - **path** : str (required argument) - name of an IIndex, which corresponds to an Dataset column name, or name of a metadata element. 442 (ex: 'datation', 'location', 'property') 443 444 - **operand** : - (default None) - Object used for the comparison. 445 (ex: if we search for observations made in Paris, operand is 'Paris') 446 447 - **comparator**: str (default None) - str giving the comparator to use. (ex: '>=', 'in') 448 449 - **or_position** : int (default -1) - position in self.parameters in which the condition is to be inserted. 450 451 - **formatstring** : str (default None) - str to use to automatically change str to datetime before applying condition. 452 Does not update the data base. If value is set to 'default', format is assumed to be Isoformat. 453 454 - **inverted** : bool (default None) - to add a "not" in the condition. 455 To use in case where every element of a MongoDB array (equivalent to python list) must verify the condition (by default, condition is verified when at least one element of the array verifies it). 456 457 - **unwind** : int (default None) - int corresponding to the number of additional {"$unwind" : "$" + path} to be added in the beginning of the query. 458 459 - **regex_options** : str (default None) - str associated to regex options (i, m, x and s). See [this link](https://www.mongodb.com/docs/manual/reference/operator/query/regex/) for more details. 460 461 no comparator => default comparator associated with operand type in dico_alias_mongo is used (mainly equality) 462 no operand => only the existence of something located at path is tested 463 ''' 464 465 ## 1. Check if arguments given are valid. 466 467 if not isinstance(path, str): raise TypeError("name must be a str.") 468 if comparator is not None and not isinstance(comparator, str): raise TypeError("comparator must be a str.") 469 if or_position is not None and not isinstance(or_position, int): raise TypeError("or_position must be an int.") 470 471 for item in kwargs: # checking if parameters in kwarg do exist 472 if item not in {'formatstring', 'inverted', 'unwind', 'regex_options', 'distanceField', 'distanceMultiplier', 'includeLocs', 'key', 'maxDistance', 'minDistance', 'near', 'query', 'spherical'}: 473 raise ValueError("Unknown parameter : ", item) 474 475 if isinstance(operand, datetime.datetime) and (operand.tzinfo is None or operand.tzinfo.utcoffset(operand) is None): 476 operand = operand.replace(tzinfo=datetime.timezone.utc) 477 478 if operand: # checking if comparator can be applied on the operand 479 try: comparator = dico_alias_mongo[type(operand)][comparator] 480 except: raise ValueError("Incompatible values for comparator and operand. Ensure parameters are in the correct order.") 481 elif comparator: 482 raise ValueError("operand must be defined when comparator is used.") 483 484 ## 2. Add the condition to self.parameters 485 486 condition = {"comparator" : comparator, "operand" : operand, "path" : path} | kwargs 487 488 if or_position >= len(self.parameters): 489 self.parameters.append([condition]) 490 else: 491 self.parameters[or_position].append(condition) 492 493 def orCondition(self, *args, **kwargs): 494 ''' 495 Adds a condition in a new sublist in self.parameters. Separations in sublists correspond to "or" in the query. 496 ''' 497 self.addCondition(or_position = len(self.parameters), *args, **kwargs) 498 499 def removeCondition(self, or_position = None, condnum = None): 500 ''' 501 Removes a condition from self.parameters. By default, last element added is removed. 502 Otherwise, the removed condition is the one at self.parameters[or_position][condnum]. 503 504 To remove all conditions, use ESSearch.clearConditions() method. 505 ''' 506 if self.parameters == [[]]: return 507 if or_position is None: 508 if condnum is None: # by default : remove the very last added condition. 509 if len(self.parameters[-1]) > 1: self.parameters[-1].pop(-1) 510 else: self.parameters.pop(-1) 511 else: 512 if len(self.parameters[-1]) > 1 or condnum > 1: self.parameters[-1].pop(condnum) 513 else: self.parameters.pop(-1) 514 else: 515 if condnum is None or (len(self.parameters[or_position]) == 1 and condnum == 0): self.parameters.pop(or_position) # if or_position is not None and condnum is, the whole sublist at or_position is removed. 516 else: self.parameters[or_position].pop(condnum) 517 if self.parameters == []: # ensure self.parameters returns to its default value after being emptied 518 self.parameters = [[]] 519 520 def clearConditions(self): 521 ''' 522 Removes all conditions from self.parameters. 523 To remove all attributes, use ESSearch.clear() method. 524 ''' 525 self.parameters = [[]] 526 527 def clear(self): 528 ''' 529 Resets self. 530 (Creating a new Observation would be smarter than using this function.) 531 ''' 532 self = ESSearch() 533 534 def _cond(self, or_pos, operand, comparator, path, inverted = False, formatstring = None, unwind = None, regex_options = None, **kwargs): 535 ''' 536 Takes parameters and adds corresponding MongoDB expression to self._match. 537 self._unwind and self._set are updated when necessary. 538 ''' 539 if unwind: 540 if isinstance(unwind, str): 541 self._unwind.append(unwind) 542 elif isinstance(unwind, int): 543 for _ in range(unwind): self._unwind.append(path) 544 elif isinstance(unwind, tuple): # format : (<path>, <unwind quantity>) 545 for _ in range(unwind[1]): self._unwind.append(unwind[0]) 546 else: raise TypeError("unwind must be a tuple, a str or an int.") 547 548 if self.heavy and operand is not None: 549 if path not in self._heavystages: self._heavystages.add(path) # peut-être mieux de laisser l'utilisateur choisir manuellement 550 path = "_" + path + ".v" 551 552 if operand is None: # no operand => we only test if there is something located at path 553 comparator = "$exists" 554 operand = 1 555 else: 556 try: comparator = dico_alias_mongo[type(operand)][comparator] #global variable 557 except: 558 if formatstring: 559 try: comparator = dico_alias_mongo[datetime.datetime][comparator] 560 except: raise ValueError("Comparator not allowed.") 561 elif isinstance(operand, shapely.geometry.base.BaseGeometry): 562 operand = {"type" : operand.geom_type, "coordinates" : list(operand.exterior.coords)} 563 else: raise ValueError("Comparator not allowed.") 564 565 ##if path in {"$year", "$month", "$dayOfMonth", "$hour", "$minute", "$second", "$millisecond", "$dayOfYear", "$dayOfWeek"}: 566 ## self._set |= {path[1:]: {'datation' : path}} #à tester 567 ## path = datation 568 ## self._project |= {name[1:]:0} 569 570 if isinstance(operand, TimeSlot): #equals->within, contains->intersects, within, disjoint, intersects 571 self._filtered = True 572 if comparator == "within": 573 self._cond(or_pos, operand[0].start, "$gte", path, False) 574 self._cond(or_pos, operand[-1].end, "$lte", path, False) 575 elif comparator == "intersects": 576 self._cond(or_pos, operand[0].start, "$lte", path, False) # pourquoi False et pas inverted ici ?? 577 self._cond(or_pos, operand[-1].end, "$gte", path, False) 578 return 579 580 if formatstring: 581 if formatstring == "default": 582 if isinstance(operand, str): 583 operand = datetime.datetime.fromisoformat(operand) 584 self._set |= {path : {"$convert": {"input" : "$" + path, "to" : "date", "onError" : "$" + path}}} 585 else: 586 if isinstance(operand, str): 587 datetime.datetime.strptime(operand, formatstring) 588 self._set |= {path : {"$dateFromString" : {"dateString" : "$" + path, "format": formatstring, "onError": "$" + path}}} 589 590 if comparator in {"$geoIntersects", "$geoWithin"}: # operand : 591 # [x, y] or [[x, y]] -> Point ; 592 # [[x1, y1], [x2, y2]] -> LineString ; 593 # [[x1, y1], [x2, y2], [x3, y3], ...] or [[x1, y1], [x2, y2], [x3, y3], ..., [x1, y1]] or [[[x1, y1], [x2, y2], [x3, y3], ..., [x1, y1]]] -> Polygon. 594 if isinstance(operand, list): 595 if not isinstance(operand[0], list): 596 geom_type = "Point" 597 coordinates = operand 598 elif not isinstance(operand[0][0], list): 599 if len(operand) == 1: 600 geom_type = "Point" 601 coordinates = operand[0] 602 elif len(operand) == 2: 603 geom_type = "LineString" 604 coordinates = operand 605 elif len(operand) > 2: 606 if not operand[-1] == operand[0]: 607 operand.append(operand[0]) 608 geom_type = "Polygon" 609 coordinates = [operand] 610 else: raise ValueError("Unable to define a geometry from " + str(operand)) 611 else: 612 geom_type = "Polygon" 613 coordinates = operand 614 operand = {"$geometry" : {"type" : geom_type, "coordinates" : coordinates}} 615 elif isinstance(operand, dict) and '$geometry' not in operand: 616 operand = {"$geometry" : operand} 617 elif comparator == "$geoNear": # $geoNear is a MongoDB stage 618 self._geonear = self._geonear | kwargs 619 if 'distanceField' not in self._geonear: raise ValueError("distanceField missing in MongoDB stage $geoNear.") 620 return 621 elif isinstance(operand, list): # lists are interpreted as geometries. An additional filtering is necessary for geometry-specific functions 622 self._filtered = True 623 return 624 625 if comparator == "$regex" and regex_options: 626 cond_0 = {"$regex" : operand, "$options" : regex_options} 627 else: 628 cond_0 = {comparator : operand} 629 630 if inverted: 631 if path in self._match[or_pos]: 632 if "$nor" in self._match[or_pos][path]: 633 self._match[or_pos][path]["$nor"].append(cond_0) 634 elif "not" in self._match[or_pos][path]: 635 self._match[or_pos][path]["$nor"] = [self._match[or_pos][path]["$not"], cond_0] 636 del self._match[or_pos][path]["$not"] 637 else: 638 self._match[or_pos][path]["$not"] = cond_0 639 else: 640 self._match[or_pos][path] = {"$not" : cond_0} 641 else: 642 if path not in self._match[or_pos]: 643 self._match[or_pos][path] = cond_0 644 else: 645 self._match[or_pos][path] |= cond_0 646 647 def _fullSearchMongo(self): 648 """ 649 Takes self.parameters and returns a MongoDB Aggregation query. 650 """ 651 ## ESSearch._fullSearchMongo() 1: Declare private variables 652 653 request = [] 654 self._match = [] 655 self._unwind = [] 656 self._heavystages = set() # two additional set stages to treat nested objects 657 self._set = {} 658 self._geonear = {} 659 self._match = [] 660 self._project = {"_id" : 0} 661 for el in self.hide: self._project |= {el : 0} 662 663 ## ESSearch._fullSearchMongo() 2: Update private variables for each condition 664 665 for i in range(len(self.parameters)): # rewriting conditions in MongoDB format 666 self._match.append({}) 667 for cond in self.parameters[i]: 668 self._cond(or_pos = i, **cond) 669 670 ## ESSearch._fullSearchMongo() 3: Case 1 : find request 671 672 if not self._unwind and not self.heavy and not self._set and not self._geonear: # collection.find() request 673 if self._match: 674 j = 0 675 for i in range(len(self._match)): 676 if self._match[i] and j != i: # removing empty elements in place 677 self._match[j] = self._match[i] 678 j += 1 679 if j == 0: # when there is no $or 680 if self._match[0]: match = self._match[0] 681 else: # when there is a $or 682 match = {"$or": self._match[:j]} 683 return 'find', match 684 685 ## ESSearch._fullSearchMongo() 4: Case 2 : aggregate request 686 687 else: 688 if self._unwind: # Mongo $unwind stage 689 for unwind in self._unwind: 690 request.append({"$unwind" : "$" + unwind}) 691 if self._heavystages: # Additional Mongo $set stage if self.heavy is True 692 heavy = {} 693 for path in self._heavystages: 694 heavy |= {"_"+path:{"$cond":{"if":{"$eq":[{"$type":"$"+path},"object"]},"then":{"$objectToArray":"$"+path},"else": {"v":"$"+path}}}} 695 self._project |= {'_' + path: 0} 696 request.append({"$set" : heavy}) 697 if self._set: request.append({"$set" : self._set}) # Mongo $set stage 698 if self._geonear: request.append({"$geoNear" : self._geonear}) # Mongo $geoNear stage 699 if self._match: # Mongo $match stage 700 j = 0 701 for i in range(len(self._match)): 702 if self._match[i] and j != i: 703 self._match[j] = self._match[i] 704 j += 1 705 if j == 0: # when there is no $or 706 if self._match[0]: request.append({"$match" : self._match[0]}) 707 else: # when there is a $or 708 request.append({"$match" : {"$or": self._match[:j]}}) 709 if self._unwind: # Second Mongo $set stage when unwind not empty 710 dico = {} 711 for unwind in self._unwind: 712 if not unwind in dico: dico[unwind] = ["$" + unwind] 713 else: dico[unwind] = [dico[unwind]] 714 request.append({"$set" : dico}) 715 if self._project: request.append({"$project" : self._project}) # Mongo $project stage 716 return 'aggregation', request 717 718 @property 719 def request(self): 720 ''' 721 Getter returning the content of the query or aggregation query to be executed with ESSearch.execute(). 722 ''' 723 request_type, request_content = self._fullSearchMongo() 724 725 if request_type == 'find': 726 return 'collection.find(' + str(request_content) + ', ' + str(self._project) + ')' 727 else: 728 return 'collection.aggregate(' + str(request_content) + ')' 729 730 @property 731 def cursor(self): 732 ''' 733 Getter returning the cursors of the query or aggregation query result on all collections and cursors contained in self.input. 734 ''' 735 request_type, request_content = self._fullSearchMongo() 736 737 cursor_list = [] 738 for item in self.input[0]: # Determine the result cursor for each element of the input on which a Mongo query makes sense 739 if isinstance(item, (Collection, Cursor, CommandCursor)): 740 if request_type == 'find': 741 cursor_list.append(item.find(request_content, self._project)) 742 else: 743 cursor_list.append(item.aggregate(request_content)) 744 if len(cursor_list) == 1: 745 return cursor_list[0] 746 else: 747 return cursor_list 748 749 750 def execute(self, returnmode = 'observation', fillvalue = None, name = None, param = None): 751 ''' 752 Executes the request and returns its result, either in one or many Observations. 753 754 *Parameter* 755 756 - **returnmode** : str (default None) - Parameter giving the format of the output: 757 - 'unchanged' : output is returned as it is in the database, some operations like operations sepcific to TimeSlot object are not performed; 758 - 'observation': Each element is returned as an observation, but original observations aren't recreated; 759 - 'idfused': observations whose ids are the same are merged together; 760 - 'single': return a single observation merging all observations together. 761 - **fillvalue** : (default None) - Value to use to fill gaps when observations are merged together. 762 - **name** : str (default None) - name of the output observation when returnmode is 'single'. 763 - **param** : dict (default None) - param of the output observation when returnmode is 'single'. 764 ''' 765 if returnmode not in {'unchanged', 'observation', 'idfused', 'single'}: raise ValueError("returnmode must have one of these values: 'unchanged', 'observation', 'idfused', 'single'.") 766 if returnmode == 'single': 767 if name is not None and not isinstance(name, str) : raise TypeError("name should be a string.") 768 if param is not None and not isinstance(param, dict): raise TypeError("param should be a dictionnary.") 769 self._filtered = False # Boolean put to True inside of self._cond() if an additional filtering specific to TimeSlot and shapely geometries is necessary. 770 771 ## Construction of a result list where data are in the format given by returnmode 772 773 ## ESSearch.execute() 1: Query is executed on each Mongo Collection or Cursor of self.input 774 775 result = [] 776 for data in self.input[0]: 777 request_type, request_content = self._fullSearchMongo() 778 if request_type == 'find': 779 cursor = data.find(request_content, self._project) 780 else: 781 cursor = data.aggregate(request_content) 782 783 if returnmode == 'observation': # Only in this case is an observation created directly. 784 for item in cursor: 785 if self._filtered: # Additional filtering for objects like TimeSlot who need it 786 for conds in self.parameters: 787 checks_parameters, checks_conds = True, True 788 for cond in conds: 789 if cond['path'] in item: 790 try: checks_conds = checks_conds and self._condcheck(item[cond['path']], cond) # checking for each condition if it is satisfied 791 except: checks_conds = False 792 else: 793 checks_conds = False 794 checks_parameters = checks_parameters or checks_conds 795 dic = {} 796 if '_metadata' in item: 797 if 'name' in item['_metadata']: dic['name'] = item['_metadata']['name'] 798 if 'param' in item['_metadata']: dic['param'] = item['_metadata']['param'] 799 del item['_metadata'] 800 dic['idxdic'] = {key: [item[key]] for key in item} 801 if not self._filtered or (self._filtered and checks_parameters): result.append(Observation.dic(**dic)) 802 803 elif returnmode == 'single': 804 for item in cursor: 805 if '_metadata' in item : del item['_metadata'] 806 result.append(item) 807 808 else: # returnmode == 'unchanged' or returnmode == 'idfused' 809 for item in cursor: 810 if item: 811 result.append(item) 812 813 ## ESSearch.execute() 2: Operations for cases 'idfused' and 'single' are performed on output objects. 814 # (more efficient to do it like this than after a conversion to Observation) 815 816 if returnmode == 'single': 817 arg = {} # argument to be given to Observation.dic() merging all observations together 818 for i in range(len(result)): 819 for column_name in arg: # for columns already in the new Observation 820 if column_name not in result[i]: 821 arg[column_name].append(fillvalue) 822 for column_name in result[i]: # for columns missing in the new Observation 823 if column_name not in arg: 824 arg[column_name] = [fillvalue] * i + [result[i][column_name]] # an empty column filled with fillvalue is added if a new column name is encountered 825 else: 826 arg[column_name].append(result[i][column_name]) 827 if self._filtered: result = [self._filtered_observation(Observation.dic(arg))] 828 else: result = [Observation.dic(arg)] 829 830 elif returnmode == 'idfused': 831 hashs_dic = {} 832 for item in result: 833 id = str(item['_metadata']['id']) # will throw an error if item has no id. Should items with no id be let as is or merged together? 834 if id in hashs_dic: # one line is added to hashs_dic[id] for each element of result having this id 835 del item['_metadata'] # Two items with the same id should have the same metadata. 836 for column_name in hashs_dic[id]['idxdic']: 837 if column_name not in item: 838 hashs_dic[id]['idxdic'][column_name].append(fillvalue) 839 for column_name in item: 840 if column_name not in hashs_dic[id]['idxdic']: 841 hashs_dic[id]['idxdic'][column_name] = [fillvalue] * i + [item[column_name]] # a filled column is added if a new column name is encountered 842 else: 843 hashs_dic[id]['idxdic'][column_name].append(item[column_name]) 844 else: 845 dic = {} 846 if 'name' in item['_metadata']: dic['name'] = item['_metadata']['name'] 847 if 'param' in item['_metadata']: dic['param'] = item['_metadata']['param'] 848 del item['_metadata'] 849 dic['idxdic'] = {key: [item[key]] for key in item} 850 hashs_dic[id] = dic 851 result = [] 852 for id in hashs_dic: # an Observation is added to result for each id 853 obs_out = Observation.dic(**hashs_dic[id]) 854 if obs_out: 855 if self._filtered: result.append(self._filtered_observation(obs_out)) # finalement, semble plus pertinent de faire ce filtrage directemt sur la sortie Mongo, car même si un à un les tests de condition sont faits à de nombreuses reprises, au global ce ne sont jamais les mêmes combinaisons de test 856 else: result.append(obs_out) 857 # At this point, result is a list of observations. 858 859 ## ESSearch.execute() 3: Other inputs (pure observations) are treated purely in python with the self._filtered_observation() method 860 861 for data in self.input[1]: # data which are not taken from a Mongo database and already are observations are treated here. 862 result.append(self._filtered_observation(data, False)) 863 864 ## ESSearch.execute() 4: Operations for cases 'idfused' and 'single' are performed again with the added observations 865 866 if len(result) >= 1: 867 if returnmode == 'single': 868 result = self._fusion(result, fillvalue) 869 if name is None: name = "ESSearch query result on " + str(datetime.datetime.now()) # default value for name 870 if param is None: # default value for param 871 if self.sources is not None: 872 sources = self.sources 873 else: 874 sources = [] 875 for item in self.input: # informations about the inputs are added to param 876 if isinstance(item, Observation): 877 if item.name is not None: 878 sources.append('Observation: ' + item.name) 879 else: 880 sources.append('data') 881 elif isinstance(item, Collection): 882 sources.append('MongoDB collection: ' + item.name + ' from database: ' + item.database.name) 883 elif isinstance(item, Cursor): 884 sources.append('Pymongo cursor: ' + item.cursor_id + ' from collection ' + item.collection.name + 885 ' from database: ' + item.collection.database.name) 886 elif isinstance(item, CommandCursor): 887 sources.append('Pymongo commandcursor: ' + item.cursor_id + ' from collection ' + item.collection.name + 888 ' from database: ' + item.collection.database.name) 889 else: # should not happen 890 sources.append('data') 891 param = {'date': str(datetime.datetime.now()), 'project': 'essearch', 'type': 'dim3', 892 'context': {'origin': 'ESSearch query', 'sources ': sources, 893 'ESSearch_parameters': str(self.parameters)}} 894 result.param = param 895 896 elif returnmode == 'idfused' and len(result) != 1: 897 hashs_dic = {} # This dictionnary is in the format {id: [observations]} 898 for item in result: 899 if item.id in hashs_dic: 900 hashs_dic[item.id].append(item) 901 else: 902 hashs_dic[item.id] = [item] 903 result = [] 904 for id in hashs_dic: 905 obs_out = self._fusion(hashs_dic[id], fillvalue) 906 if hashs_dic[id][0].name : obs_out.name = hashs_dic[id][0].name # name and param associated to the id are put back 907 if hashs_dic[id][0].param: obs_out.name = hashs_dic[id][0].param 908 if obs_out: result.append(obs_out) 909 910 ## ESSearch.execute() 5: Return result 911 912 return result 913 914 def _filtered_observation(self, obs, is_from_mongo = True): # Vérifier que cette fonction n'est pas moins efficace que de tout déplier / filtrer / tout replier 915 # Regarder si on ne peut pas faire la même chose en mieux avec des fonctions de numpy ou panda. 916 ''' 917 Takes an Observation and returns a filtered Observation with self.parameters as a filter. 918 ''' 919 # self.parameters = [[cond1 AND cond 2 AND cond 3] OR [cond4 AND cond5 AND cond6]] 920 # dico = {"data": [["datation", [date1, date2, date3], [0,1,0,2,2,1]], ["location", [loc1, loc2, loc3], [0,1,2,0,1,1]]]} # dico n'est plus utilisé 921 if len(obs) == 0 or self.parameters == [[]]: return obs 922 923 ## ESSearch._filtered_observation() 0: This function is done with an iteration over self.parameters 924 925 final_filter = [False] * obs.lencomplete 926 for i in range(len(self.parameters)): # for each group of conditions separated by a or 927 if self.parameters[i] != []: 928 929 ## ESSearch._filtered_observation() 1: Checking if no column is missing and if conditions on metadata are verified 930 931 conds = {} # conds is a dict which associates columns to the condition which concern them (inside of [cond1 AND cond 2 AND cond 3] : only AND) 932 relevant = True # no column given by cond["path"] is missing from obs 933 for cond in self.parameters[i]: 934 next_relevant = False 935 for j in range(len(obs.lindex)): 936 if cond["path"] == obs.lindex[j].name: 937 if j in conds: conds[j].append(cond) 938 else: conds[j] = [cond] 939 next_relevant = True 940 elif cond["path"][:9] == "_metadata": # metadata are id, name and param 941 if not is_from_mongo: # This step is considered to be done already for data taken out of Mongo 942 if cond["path"][10:] == 'name' : next_relevant = next_relevant and self._condcheck(obs.name, cond) 943 if cond["path"][10:] == 'id' : next_relevant = next_relevant and self._condcheck(obs.id, cond) 944 if cond["path"][10:] == 'param': next_relevant = next_relevant and self._condcheck(obs.param, cond) 945 else: 946 next_relevant = True 947 relevant = relevant and next_relevant 948 if not relevant: continue # if a column on which a condition is applied is missing, the set of conditions given by the element of self.parameters is considered not verified. 949 950 ## ESSearch._filtered_observation() 2: Condition is applied on all elements of the Observation to create a boolean filter 951 952 full_filter = [True] * obs.lencomplete 953 for j in range(obs.lenidx): # iteration over the columns 954 filter = [] 955 for item in obs.lindex[j].cod: 956 boolean = True 957 for cond in conds[j]: 958 try: boolean = boolean and self._condcheck(item, cond) 959 except: pass #boolean = False # pose problème pour les regex et autres opérations non implémentées... 960 filter.append(boolean) # condition is rewritten as a boolean filter for the incoming data 961 next_full_filter = util.tovalues(obs.lindex[j].keys, filter) # filter changed from filter on optimize codec to filter on full codec. (filter on optimize codec was the just how we calculated it, this isn't an actual method of the module) 962 full_filter = [full_filter[k] and next_full_filter[k] for k in range(len(full_filter))] # full filter is updated each time 963 964 ## ESSearch._filtered_observation() 3: or_position is taken into account 965 966 final_filter = [final_filter[j] or full_filter[j] for j in range(len(full_filter))] 967 968 ## ESSearch._filtered_observation() 4: Application of the final filter on the incoming Observation 969 970 obs.setfilter(final_filter) 971 obs.applyfilter() 972 return obs 973 974 def _condcheck(self, item, cond = None): # ajouter la gestion des regex 975 ''' 976 Takes an item and returns a Boolean if it verifies criteria given by parameter. 977 ''' 978 #cond = {"comparator" : comparator, "operand" : operand, "path" : path} and sometimes can contain other things 979 980 # 0. Basic cases 981 982 if cond is None: return True 983 if 'inverted' in cond and cond['inverted']: return not self._condcheck(item, cond | {'inverted' : False}) 984 if cond["comparator"] is None and cond["operand"] is None: return True 985 986 # 1. formatstring applied if formatstring there is 987 988 if "formatstring" in cond: 989 if not isinstance(item, (datetime.datetime, TimeSlot)): 990 item = datetime.datetime.strptime(item, cond["formatstring"]) 991 if not isinstance(cond["operand"], (datetime.datetime, TimeSlot)): 992 cond["operand"] = datetime.datetime.strptime(cond["operand"], cond["formatstring"]) 993 994 # 2. Cases TimeSlot, geometry and nested property need specific treatment 995 996 if isinstance(item, TimeSlot): 997 if isinstance(cond["operand"], TimeSlot): # if comparator is one of the specific operators for TimeSlot 998 cond["comparator"] = dico_alias_python[TimeSlot][TimeSlot][cond["comparator"]] 999 return item.link(cond["operand"])[0] == cond["comparator"] 1000 else: # if operand is a datetime 1001 try: return dico_alias_python[TimeSlot][datetime.datetime][cond["comparator"]](item, cond["operand"]) 1002 except: raise ValueError("Comparator not supported for TimeSlot.") 1003 1004 elif isinstance(item, list) or isinstance(item, shapely.geometry.base.BaseGeometry): 1005 if isinstance(item, list): # lists are interpreted as geometries 1006 if len(item) == 1: item = shapely.geometry.Point(item[0]) 1007 elif (len(item) > 1 and not isinstance(item[0], list)): item = shapely.geometry.Point(item) 1008 elif len(item) == 2: item = shapely.geometry.LineString(item) 1009 elif len(item) > 2: 1010 if not item[-1] == item[0]: 1011 item.append(item[0]) 1012 item = shapely.geometry.Polygon([item]) 1013 if isinstance(cond["operand"], list): # lists are interpreted as geometries 1014 if len(cond["operand"]) == 1: cond["operand"] = shapely.geometry.Point(cond["operand"][0]) 1015 elif (len(cond["operand"]) > 1 and not isinstance(cond["operand"][0], list)): 1016 cond["operand"] = shapely.geometry.Point(cond["operand"]) 1017 elif len(cond["operand"]) == 2: cond["operand"] = shapely.geometry.LineString(cond["operand"]) 1018 elif len(cond["operand"]) > 2: 1019 if not item[-1] == item[0]: 1020 item.append(item[0]) 1021 item = shapely.geometry.Polygon([item]) 1022 return dico_alias_mongo['geometry'][cond["comparator"]](item, cond["operand"]) 1023 1024 elif cond["path"] == "property" and isinstance(item, dict): # assuming that property contains dicts and that the query targets one of its values 1025 for val in item.values(): 1026 if self._condcheck(val, cond | {"path" : None}): 1027 return True 1028 return False 1029 1030 # 3. General comparison for remaining cases 1031 1032 try: return dico_alias_mongo['default'][cond["comparator"]](item, cond["operand"]) 1033 except: 1034 #raise ValueError("Comparator not supported.") 1035 return True 1036 1037 def _fusion(self, obsList, fillvalue = None): # Idéalement, utiliser une méthode fusion de Observation. 1038 ''' 1039 Takes a list of observations and returns one observation merging them together in one single observation. 1040 ''' 1041 ## ESSearch._fusion() 0: Basic cases 1042 1043 if len(obsList) == 0: return Observation() 1044 elif len(obsList) == 1: return Observation(obsList[0]) 1045 else: # Fusion of a list with more than one element 1046 lidx = [] 1047 1048 ## ESSearch._fusion() 1: Determination of the names of the columns 1049 1050 new_lname = set() 1051 for obs in obsList: 1052 new_lname |= set(obs.lname) 1053 new_lname = list(new_lname) 1054 1055 ## ESSearch._fusion() 2: Fill the columns with the values of the observations to merge 1056 1057 for i in range(len(new_lname)): # for each column of the new Observation 1058 values = [] 1059 for obs in obsList: # for each Observation in the list 1060 if new_lname[i] in obs.lname: values += obs.lindex[obs.lname.index(new_lname[i])].values # values of the column are added to the new column 1061 else: values += [fillvalue] * len(obs) # when there is no value, filled with fillvalue 1062 codec = util.tocodec(values) 1063 lidx.append(Field(codec, new_lname[i], util.tokeys(values, codec))) 1064 1065 ## ESSearch._fusion() 3: Build the actual Observation 1066 1067 return Observation(lidx) 1068 # Il y aurait peut-être moyen d'optimiser un peu en remplaçant Field, util.tokeys et l'appel final d'Observation par des manipulations directes sur les objets. 1069 # faire la fusion directement sur les keys au lieu de la faire sur les codec ? 1070 # Avec la méthode actuelle, certaines opérations sont faites plusieurs fois.
192def insert_from_doc(collection, document , info=True): 193 '''Inserts all observations from a document into a collection, where each line of the document corresponds to an observation.''' 194 with open(document, 'r') as doc: 195 for line in doc: 196 try: insert_to_mongo(collection, line, info) 197 except: pass
Inserts all observations from a document into a collection, where each line of the document corresponds to an observation.
199def insert_to_mongo(collection, obj, info=False): # Mieux avec panda ? 200 '''Takes an observation or a list of observations and inserts them into a MongoDB collection, with info by default.''' 201 # Faire une fonction pour permettre l'ajout direct de fichiers csv. 202 inserted_list = [] 203 if isinstance(obj, list): 204 pile = obj 205 elif isinstance(obj, Observation): 206 pile = [obj] 207 else: 208 pile = [Observation.from_obj(obj)] 209 for obs in pile: 210 if obs.id: obs_hash = obs.id 211 else: obs_hash = hash(obs) 212 metadata = {'id': obs_hash} 213 if obs.name : metadata['name'] = obs.name 214 if obs.param: metadata['param'] = obs.param 215 if info: metadata['information'] = Observation._info(True, True) 216 if len(obs.lname) == 1: # a special case is needed because lists with one element are replaced by the element itself so iteration doesn't work 217 for line in obs: 218 inserted_list.append({obs.lname[0]: util.json(line, encoded=False, typevalue=None, simpleval=False, geojson=True), 219 '_metadata': metadata}) 220 elif len(obs.lname) > 1: 221 for line in obs: 222 inserted_list.append({obs.lname[i]: util.json(line[i], encoded=False, typevalue=None, simpleval=False, geojson=True) 223 for i in range(len(line))} | {'_metadata': metadata}) 224 if inserted_list != []: collection.insert_many(inserted_list)
Takes an observation or a list of observations and inserts them into a MongoDB collection, with info by default.
226def empty_request(collection): 227 """ 228 Empty request to get an idea of what the database contains. 229 Currently returns the count of elements in the collection and the name of each column. 230 """ 231 count = 0 232 column_names = [] 233 cursor = collection.find() 234 for doc in cursor: 235 count += 1 236 for column_name in doc: 237 if column_name not in column_names: 238 column_names.append(column_name) 239 return {'count': count, 'column_names': column_names}
Empty request to get an idea of what the database contains. Currently returns the count of elements in the collection and the name of each column.
241class ESSearch: 242 """ 243 An `ESSearch` is defined as an ensemble of conditions to be used to execute a MongoDB request or any iterable containing only observations. 244 245 *Attributes (for @property, see methods)* : 246 247 - **input** : input on which the query is done. One of or a list of these : 248 - pymongo.collection.Collection 249 - pymongo.cursor.Cursor 250 - pymongo.command_cursor.CommandCursor 251 - Observation (can be defined from a str or a dict) 252 - **parameters** : list of list of conditions for queries, to be interpreted as : parameters = [[cond_1 AND cond_2 AND cond_3] OR [cond_4 AND cond_5 AND cond_6]] where conds are criteria for queries 253 - **hide** : list of paths to hide from the output 254 - **heavy** : boolean indicating whether the request should search for nested values or not. Does not work with geoJSON. 255 - **sources** : attribute used to indicate the sources of the data in param 256 257 The methods defined in this class are (documentations in methods definitions): 258 259 *setter* 260 261 - `ESSearch.addInput` 262 - `ESSearch.removeInputs` 263 - `ESSearch.setHide` 264 - `ESSearch.setHeavy` 265 - `ESSearch.clear` 266 267 *dynamic value (getter @property)* 268 269 - `ESSearch.request` 270 - `ESSearch.cursor` 271 272 *parameters for query - update methods* 273 274 - `ESSearch.addConditions` 275 - `ESSearch.addCondition` 276 - `ESSearch.orCondition` 277 - `ESSearch.removeCondition` 278 - `ESSearch.clearConditions` 279 280 *query method* 281 282 - `ESSearch.execute` 283 """ 284 def __init__(self, 285 input = None, 286 parameters = None, 287 hide = [], 288 heavy = False, 289 sources = None, 290 **kwargs 291 ): 292 ''' 293 ESSearch constructor. Parameters can also be defined and updated using class methods. 294 295 *Arguments* 296 297 - **input** : input on which the query is done. Must be one of or a list of these (can be nested): 298 - pymongo.collection.Collection 299 - pymongo.cursor.Cursor 300 - pymongo.command_cursor.CommandCursor 301 - Observation 302 - str corresponding to a json Observation 303 - dict corresponding to a json Observation 304 - **parameters** : dict, list (default None) - list of list or list of dictionnaries whose keys are arguments of ESSearch.addCondition method 305 ex: parameters = [ 306 {'name' : 'datation', 'operand' : datetime.datetime(2022, 9, 19, 1), 'comparator' : '>='}, 307 {'name' : 'property', 'operand' : 'PM2'} 308 ] 309 - **hide** : list (default []) - List of strings where strings correspond to paths to remove from the output 310 - **heavy** : bool (default False) - Must be True when values are defined directly and inside dictionnaries simultaneously. 311 - **sources** : (default None) - Optional parameter indicating the sources of the data in case when a query is executed with parameter single = True. 312 - **kwargs** : other parameters are used as arguments for ESSearch.addCondition method. 313 ''' 314 self.parameters = [[]] # self.parameters 315 if isinstance(hide, list): self.hide = hide # self.hide 316 else: raise TypeError("hide must be a list.") 317 318 if isinstance(heavy, bool): self.heavy = heavy # self.heavy 319 else: raise TypeError("heavy must be a bool.") 320 self.sources = sources # self.sources 321 322 self.input = [[], []] # self.input : formatted as [[Mongo Objects], [Observations]] (list of two lists) 323 if isinstance(input, list): pile = input 324 else: pile = [input] 325 while not len(pile) == 0: 326 obj = pile.pop() 327 if isinstance(obj, list): 328 pile += obj 329 elif isinstance(obj, (Collection, Cursor, CommandCursor)): 330 self.input[0].append(obj) 331 elif isinstance(obj, Observation): 332 self.input[1].append(obj) 333 elif isinstance(obj, (str, dict)): 334 try: 335 self.input[1].append(Observation.from_obj(obj)) 336 except: 337 raise ValueError("Cannot convert " + str(obj) + " to an Observation.") 338 elif obj is not None: 339 raise TypeError("Unsupported type for input " + str(obj)) 340 341 if parameters: self.addConditions(parameters) 342 if kwargs: self.addCondition(**kwargs) 343 344 def __repr__(self): 345 return "ESSearch(input = " + str(self.input) + ", parameters = " + str(self.parameters) + ")" 346 347 def __str__(self): 348 return str(self.parameters) 349 350 def __iter__(self): 351 self.n = -1 352 return self 353 354 def __next__(self): 355 if self.n < len(self.parameters)-1: 356 self.n += 1 357 return self.parameters[self.n] 358 else: 359 raise StopIteration 360 361 def __getitem__(self, key): 362 return self.parameters[key] 363 364 def addInput(self, input): 365 """ 366 Adds one or many inputs on which the query is to be executed given by argument input. 367 Inputs can be: 368 - pymongo.collection.Collection 369 - pymongo.cursor.Cursor 370 - pymongo.command_cursor.CommandCursor 371 - Observation 372 - str corresponding to a json Observation 373 - dict corresponding to a json Observation 374 or a list of any of these. 375 """ 376 added_input = [[], []] 377 if isinstance(input, list): pile = input 378 else: pile = [input] 379 while not len(pile) == 0: # using a stack (LIFO) to allow easy treatment of nested data. 380 obj = pile.pop() 381 if isinstance(obj, list): 382 pile += obj 383 elif isinstance(obj, (Collection, Cursor, CommandCursor)): 384 added_input[0].append(obj) 385 elif isinstance(obj, Observation): 386 added_input[1].append(obj) 387 elif isinstance(obj, (str, dict)): 388 try: 389 added_input[1].append(Observation.from_obj(obj)) 390 except: 391 raise ValueError("Cannot convert " + str(obj) + " to an Observation.") 392 elif obj is not None: 393 raise TypeError("Unsupported type for input " + str(obj)) 394 self.input[0] += added_input[0] # self.input is updated with the new inputs 395 self.input[1] += added_input[1] 396 397 def removeInputs(self): 398 """ 399 Removes all inputs from self. 400 """ 401 self.input = [[], []] 402 403 def setHide(self, hide): 404 ''' 405 Sets self.hide to a value given by argument hide. 406 ''' 407 if isinstance(hide, list): self.hide = hide 408 else: raise TypeError("hide must be a list.") 409 410 def setHeavy(self, heavy): 411 ''' 412 Sets self.heavy to a value given by argument heavy. 413 ''' 414 if isinstance(heavy, list): self.heavy = heavy 415 else: raise TypeError("heavy must be a bool.") 416 417 def setSources(self, sources): 418 ''' 419 Sets self.sources to a value given by argument sources. 420 ''' 421 self.sources = sources 422 423 def addConditions(self, parameters): 424 ''' 425 Takes multiple parameters and applies self.addCondition() on each of them. 426 ''' 427 if isinstance(parameters, dict): # case when one single condition is added 428 self.addCondition(**parameters) 429 elif isinstance(parameters, (list, tuple)): # case when several conditions are added 430 for parameter in parameters: 431 if isinstance(parameter, dict): self.addCondition(**parameter) 432 elif isinstance(parameters, (list, tuple)): self.addCondition(*parameter) 433 else: self.addCondition(parameter) 434 else: raise TypeError("parameters must be either a dict or a list of dict.") 435 436 def addCondition(self, path, operand = None, comparator = None, or_position = -1, **kwargs): 437 ''' 438 Takes parameters and inserts corresponding query condition in self.parameters. 439 440 *Parameters* 441 442 - **path** : str (required argument) - name of an IIndex, which corresponds to an Dataset column name, or name of a metadata element. 443 (ex: 'datation', 'location', 'property') 444 445 - **operand** : - (default None) - Object used for the comparison. 446 (ex: if we search for observations made in Paris, operand is 'Paris') 447 448 - **comparator**: str (default None) - str giving the comparator to use. (ex: '>=', 'in') 449 450 - **or_position** : int (default -1) - position in self.parameters in which the condition is to be inserted. 451 452 - **formatstring** : str (default None) - str to use to automatically change str to datetime before applying condition. 453 Does not update the data base. If value is set to 'default', format is assumed to be Isoformat. 454 455 - **inverted** : bool (default None) - to add a "not" in the condition. 456 To use in case where every element of a MongoDB array (equivalent to python list) must verify the condition (by default, condition is verified when at least one element of the array verifies it). 457 458 - **unwind** : int (default None) - int corresponding to the number of additional {"$unwind" : "$" + path} to be added in the beginning of the query. 459 460 - **regex_options** : str (default None) - str associated to regex options (i, m, x and s). See [this link](https://www.mongodb.com/docs/manual/reference/operator/query/regex/) for more details. 461 462 no comparator => default comparator associated with operand type in dico_alias_mongo is used (mainly equality) 463 no operand => only the existence of something located at path is tested 464 ''' 465 466 ## 1. Check if arguments given are valid. 467 468 if not isinstance(path, str): raise TypeError("name must be a str.") 469 if comparator is not None and not isinstance(comparator, str): raise TypeError("comparator must be a str.") 470 if or_position is not None and not isinstance(or_position, int): raise TypeError("or_position must be an int.") 471 472 for item in kwargs: # checking if parameters in kwarg do exist 473 if item not in {'formatstring', 'inverted', 'unwind', 'regex_options', 'distanceField', 'distanceMultiplier', 'includeLocs', 'key', 'maxDistance', 'minDistance', 'near', 'query', 'spherical'}: 474 raise ValueError("Unknown parameter : ", item) 475 476 if isinstance(operand, datetime.datetime) and (operand.tzinfo is None or operand.tzinfo.utcoffset(operand) is None): 477 operand = operand.replace(tzinfo=datetime.timezone.utc) 478 479 if operand: # checking if comparator can be applied on the operand 480 try: comparator = dico_alias_mongo[type(operand)][comparator] 481 except: raise ValueError("Incompatible values for comparator and operand. Ensure parameters are in the correct order.") 482 elif comparator: 483 raise ValueError("operand must be defined when comparator is used.") 484 485 ## 2. Add the condition to self.parameters 486 487 condition = {"comparator" : comparator, "operand" : operand, "path" : path} | kwargs 488 489 if or_position >= len(self.parameters): 490 self.parameters.append([condition]) 491 else: 492 self.parameters[or_position].append(condition) 493 494 def orCondition(self, *args, **kwargs): 495 ''' 496 Adds a condition in a new sublist in self.parameters. Separations in sublists correspond to "or" in the query. 497 ''' 498 self.addCondition(or_position = len(self.parameters), *args, **kwargs) 499 500 def removeCondition(self, or_position = None, condnum = None): 501 ''' 502 Removes a condition from self.parameters. By default, last element added is removed. 503 Otherwise, the removed condition is the one at self.parameters[or_position][condnum]. 504 505 To remove all conditions, use ESSearch.clearConditions() method. 506 ''' 507 if self.parameters == [[]]: return 508 if or_position is None: 509 if condnum is None: # by default : remove the very last added condition. 510 if len(self.parameters[-1]) > 1: self.parameters[-1].pop(-1) 511 else: self.parameters.pop(-1) 512 else: 513 if len(self.parameters[-1]) > 1 or condnum > 1: self.parameters[-1].pop(condnum) 514 else: self.parameters.pop(-1) 515 else: 516 if condnum is None or (len(self.parameters[or_position]) == 1 and condnum == 0): self.parameters.pop(or_position) # if or_position is not None and condnum is, the whole sublist at or_position is removed. 517 else: self.parameters[or_position].pop(condnum) 518 if self.parameters == []: # ensure self.parameters returns to its default value after being emptied 519 self.parameters = [[]] 520 521 def clearConditions(self): 522 ''' 523 Removes all conditions from self.parameters. 524 To remove all attributes, use ESSearch.clear() method. 525 ''' 526 self.parameters = [[]] 527 528 def clear(self): 529 ''' 530 Resets self. 531 (Creating a new Observation would be smarter than using this function.) 532 ''' 533 self = ESSearch() 534 535 def _cond(self, or_pos, operand, comparator, path, inverted = False, formatstring = None, unwind = None, regex_options = None, **kwargs): 536 ''' 537 Takes parameters and adds corresponding MongoDB expression to self._match. 538 self._unwind and self._set are updated when necessary. 539 ''' 540 if unwind: 541 if isinstance(unwind, str): 542 self._unwind.append(unwind) 543 elif isinstance(unwind, int): 544 for _ in range(unwind): self._unwind.append(path) 545 elif isinstance(unwind, tuple): # format : (<path>, <unwind quantity>) 546 for _ in range(unwind[1]): self._unwind.append(unwind[0]) 547 else: raise TypeError("unwind must be a tuple, a str or an int.") 548 549 if self.heavy and operand is not None: 550 if path not in self._heavystages: self._heavystages.add(path) # peut-être mieux de laisser l'utilisateur choisir manuellement 551 path = "_" + path + ".v" 552 553 if operand is None: # no operand => we only test if there is something located at path 554 comparator = "$exists" 555 operand = 1 556 else: 557 try: comparator = dico_alias_mongo[type(operand)][comparator] #global variable 558 except: 559 if formatstring: 560 try: comparator = dico_alias_mongo[datetime.datetime][comparator] 561 except: raise ValueError("Comparator not allowed.") 562 elif isinstance(operand, shapely.geometry.base.BaseGeometry): 563 operand = {"type" : operand.geom_type, "coordinates" : list(operand.exterior.coords)} 564 else: raise ValueError("Comparator not allowed.") 565 566 ##if path in {"$year", "$month", "$dayOfMonth", "$hour", "$minute", "$second", "$millisecond", "$dayOfYear", "$dayOfWeek"}: 567 ## self._set |= {path[1:]: {'datation' : path}} #à tester 568 ## path = datation 569 ## self._project |= {name[1:]:0} 570 571 if isinstance(operand, TimeSlot): #equals->within, contains->intersects, within, disjoint, intersects 572 self._filtered = True 573 if comparator == "within": 574 self._cond(or_pos, operand[0].start, "$gte", path, False) 575 self._cond(or_pos, operand[-1].end, "$lte", path, False) 576 elif comparator == "intersects": 577 self._cond(or_pos, operand[0].start, "$lte", path, False) # pourquoi False et pas inverted ici ?? 578 self._cond(or_pos, operand[-1].end, "$gte", path, False) 579 return 580 581 if formatstring: 582 if formatstring == "default": 583 if isinstance(operand, str): 584 operand = datetime.datetime.fromisoformat(operand) 585 self._set |= {path : {"$convert": {"input" : "$" + path, "to" : "date", "onError" : "$" + path}}} 586 else: 587 if isinstance(operand, str): 588 datetime.datetime.strptime(operand, formatstring) 589 self._set |= {path : {"$dateFromString" : {"dateString" : "$" + path, "format": formatstring, "onError": "$" + path}}} 590 591 if comparator in {"$geoIntersects", "$geoWithin"}: # operand : 592 # [x, y] or [[x, y]] -> Point ; 593 # [[x1, y1], [x2, y2]] -> LineString ; 594 # [[x1, y1], [x2, y2], [x3, y3], ...] or [[x1, y1], [x2, y2], [x3, y3], ..., [x1, y1]] or [[[x1, y1], [x2, y2], [x3, y3], ..., [x1, y1]]] -> Polygon. 595 if isinstance(operand, list): 596 if not isinstance(operand[0], list): 597 geom_type = "Point" 598 coordinates = operand 599 elif not isinstance(operand[0][0], list): 600 if len(operand) == 1: 601 geom_type = "Point" 602 coordinates = operand[0] 603 elif len(operand) == 2: 604 geom_type = "LineString" 605 coordinates = operand 606 elif len(operand) > 2: 607 if not operand[-1] == operand[0]: 608 operand.append(operand[0]) 609 geom_type = "Polygon" 610 coordinates = [operand] 611 else: raise ValueError("Unable to define a geometry from " + str(operand)) 612 else: 613 geom_type = "Polygon" 614 coordinates = operand 615 operand = {"$geometry" : {"type" : geom_type, "coordinates" : coordinates}} 616 elif isinstance(operand, dict) and '$geometry' not in operand: 617 operand = {"$geometry" : operand} 618 elif comparator == "$geoNear": # $geoNear is a MongoDB stage 619 self._geonear = self._geonear | kwargs 620 if 'distanceField' not in self._geonear: raise ValueError("distanceField missing in MongoDB stage $geoNear.") 621 return 622 elif isinstance(operand, list): # lists are interpreted as geometries. An additional filtering is necessary for geometry-specific functions 623 self._filtered = True 624 return 625 626 if comparator == "$regex" and regex_options: 627 cond_0 = {"$regex" : operand, "$options" : regex_options} 628 else: 629 cond_0 = {comparator : operand} 630 631 if inverted: 632 if path in self._match[or_pos]: 633 if "$nor" in self._match[or_pos][path]: 634 self._match[or_pos][path]["$nor"].append(cond_0) 635 elif "not" in self._match[or_pos][path]: 636 self._match[or_pos][path]["$nor"] = [self._match[or_pos][path]["$not"], cond_0] 637 del self._match[or_pos][path]["$not"] 638 else: 639 self._match[or_pos][path]["$not"] = cond_0 640 else: 641 self._match[or_pos][path] = {"$not" : cond_0} 642 else: 643 if path not in self._match[or_pos]: 644 self._match[or_pos][path] = cond_0 645 else: 646 self._match[or_pos][path] |= cond_0 647 648 def _fullSearchMongo(self): 649 """ 650 Takes self.parameters and returns a MongoDB Aggregation query. 651 """ 652 ## ESSearch._fullSearchMongo() 1: Declare private variables 653 654 request = [] 655 self._match = [] 656 self._unwind = [] 657 self._heavystages = set() # two additional set stages to treat nested objects 658 self._set = {} 659 self._geonear = {} 660 self._match = [] 661 self._project = {"_id" : 0} 662 for el in self.hide: self._project |= {el : 0} 663 664 ## ESSearch._fullSearchMongo() 2: Update private variables for each condition 665 666 for i in range(len(self.parameters)): # rewriting conditions in MongoDB format 667 self._match.append({}) 668 for cond in self.parameters[i]: 669 self._cond(or_pos = i, **cond) 670 671 ## ESSearch._fullSearchMongo() 3: Case 1 : find request 672 673 if not self._unwind and not self.heavy and not self._set and not self._geonear: # collection.find() request 674 if self._match: 675 j = 0 676 for i in range(len(self._match)): 677 if self._match[i] and j != i: # removing empty elements in place 678 self._match[j] = self._match[i] 679 j += 1 680 if j == 0: # when there is no $or 681 if self._match[0]: match = self._match[0] 682 else: # when there is a $or 683 match = {"$or": self._match[:j]} 684 return 'find', match 685 686 ## ESSearch._fullSearchMongo() 4: Case 2 : aggregate request 687 688 else: 689 if self._unwind: # Mongo $unwind stage 690 for unwind in self._unwind: 691 request.append({"$unwind" : "$" + unwind}) 692 if self._heavystages: # Additional Mongo $set stage if self.heavy is True 693 heavy = {} 694 for path in self._heavystages: 695 heavy |= {"_"+path:{"$cond":{"if":{"$eq":[{"$type":"$"+path},"object"]},"then":{"$objectToArray":"$"+path},"else": {"v":"$"+path}}}} 696 self._project |= {'_' + path: 0} 697 request.append({"$set" : heavy}) 698 if self._set: request.append({"$set" : self._set}) # Mongo $set stage 699 if self._geonear: request.append({"$geoNear" : self._geonear}) # Mongo $geoNear stage 700 if self._match: # Mongo $match stage 701 j = 0 702 for i in range(len(self._match)): 703 if self._match[i] and j != i: 704 self._match[j] = self._match[i] 705 j += 1 706 if j == 0: # when there is no $or 707 if self._match[0]: request.append({"$match" : self._match[0]}) 708 else: # when there is a $or 709 request.append({"$match" : {"$or": self._match[:j]}}) 710 if self._unwind: # Second Mongo $set stage when unwind not empty 711 dico = {} 712 for unwind in self._unwind: 713 if not unwind in dico: dico[unwind] = ["$" + unwind] 714 else: dico[unwind] = [dico[unwind]] 715 request.append({"$set" : dico}) 716 if self._project: request.append({"$project" : self._project}) # Mongo $project stage 717 return 'aggregation', request 718 719 @property 720 def request(self): 721 ''' 722 Getter returning the content of the query or aggregation query to be executed with ESSearch.execute(). 723 ''' 724 request_type, request_content = self._fullSearchMongo() 725 726 if request_type == 'find': 727 return 'collection.find(' + str(request_content) + ', ' + str(self._project) + ')' 728 else: 729 return 'collection.aggregate(' + str(request_content) + ')' 730 731 @property 732 def cursor(self): 733 ''' 734 Getter returning the cursors of the query or aggregation query result on all collections and cursors contained in self.input. 735 ''' 736 request_type, request_content = self._fullSearchMongo() 737 738 cursor_list = [] 739 for item in self.input[0]: # Determine the result cursor for each element of the input on which a Mongo query makes sense 740 if isinstance(item, (Collection, Cursor, CommandCursor)): 741 if request_type == 'find': 742 cursor_list.append(item.find(request_content, self._project)) 743 else: 744 cursor_list.append(item.aggregate(request_content)) 745 if len(cursor_list) == 1: 746 return cursor_list[0] 747 else: 748 return cursor_list 749 750 751 def execute(self, returnmode = 'observation', fillvalue = None, name = None, param = None): 752 ''' 753 Executes the request and returns its result, either in one or many Observations. 754 755 *Parameter* 756 757 - **returnmode** : str (default None) - Parameter giving the format of the output: 758 - 'unchanged' : output is returned as it is in the database, some operations like operations sepcific to TimeSlot object are not performed; 759 - 'observation': Each element is returned as an observation, but original observations aren't recreated; 760 - 'idfused': observations whose ids are the same are merged together; 761 - 'single': return a single observation merging all observations together. 762 - **fillvalue** : (default None) - Value to use to fill gaps when observations are merged together. 763 - **name** : str (default None) - name of the output observation when returnmode is 'single'. 764 - **param** : dict (default None) - param of the output observation when returnmode is 'single'. 765 ''' 766 if returnmode not in {'unchanged', 'observation', 'idfused', 'single'}: raise ValueError("returnmode must have one of these values: 'unchanged', 'observation', 'idfused', 'single'.") 767 if returnmode == 'single': 768 if name is not None and not isinstance(name, str) : raise TypeError("name should be a string.") 769 if param is not None and not isinstance(param, dict): raise TypeError("param should be a dictionnary.") 770 self._filtered = False # Boolean put to True inside of self._cond() if an additional filtering specific to TimeSlot and shapely geometries is necessary. 771 772 ## Construction of a result list where data are in the format given by returnmode 773 774 ## ESSearch.execute() 1: Query is executed on each Mongo Collection or Cursor of self.input 775 776 result = [] 777 for data in self.input[0]: 778 request_type, request_content = self._fullSearchMongo() 779 if request_type == 'find': 780 cursor = data.find(request_content, self._project) 781 else: 782 cursor = data.aggregate(request_content) 783 784 if returnmode == 'observation': # Only in this case is an observation created directly. 785 for item in cursor: 786 if self._filtered: # Additional filtering for objects like TimeSlot who need it 787 for conds in self.parameters: 788 checks_parameters, checks_conds = True, True 789 for cond in conds: 790 if cond['path'] in item: 791 try: checks_conds = checks_conds and self._condcheck(item[cond['path']], cond) # checking for each condition if it is satisfied 792 except: checks_conds = False 793 else: 794 checks_conds = False 795 checks_parameters = checks_parameters or checks_conds 796 dic = {} 797 if '_metadata' in item: 798 if 'name' in item['_metadata']: dic['name'] = item['_metadata']['name'] 799 if 'param' in item['_metadata']: dic['param'] = item['_metadata']['param'] 800 del item['_metadata'] 801 dic['idxdic'] = {key: [item[key]] for key in item} 802 if not self._filtered or (self._filtered and checks_parameters): result.append(Observation.dic(**dic)) 803 804 elif returnmode == 'single': 805 for item in cursor: 806 if '_metadata' in item : del item['_metadata'] 807 result.append(item) 808 809 else: # returnmode == 'unchanged' or returnmode == 'idfused' 810 for item in cursor: 811 if item: 812 result.append(item) 813 814 ## ESSearch.execute() 2: Operations for cases 'idfused' and 'single' are performed on output objects. 815 # (more efficient to do it like this than after a conversion to Observation) 816 817 if returnmode == 'single': 818 arg = {} # argument to be given to Observation.dic() merging all observations together 819 for i in range(len(result)): 820 for column_name in arg: # for columns already in the new Observation 821 if column_name not in result[i]: 822 arg[column_name].append(fillvalue) 823 for column_name in result[i]: # for columns missing in the new Observation 824 if column_name not in arg: 825 arg[column_name] = [fillvalue] * i + [result[i][column_name]] # an empty column filled with fillvalue is added if a new column name is encountered 826 else: 827 arg[column_name].append(result[i][column_name]) 828 if self._filtered: result = [self._filtered_observation(Observation.dic(arg))] 829 else: result = [Observation.dic(arg)] 830 831 elif returnmode == 'idfused': 832 hashs_dic = {} 833 for item in result: 834 id = str(item['_metadata']['id']) # will throw an error if item has no id. Should items with no id be let as is or merged together? 835 if id in hashs_dic: # one line is added to hashs_dic[id] for each element of result having this id 836 del item['_metadata'] # Two items with the same id should have the same metadata. 837 for column_name in hashs_dic[id]['idxdic']: 838 if column_name not in item: 839 hashs_dic[id]['idxdic'][column_name].append(fillvalue) 840 for column_name in item: 841 if column_name not in hashs_dic[id]['idxdic']: 842 hashs_dic[id]['idxdic'][column_name] = [fillvalue] * i + [item[column_name]] # a filled column is added if a new column name is encountered 843 else: 844 hashs_dic[id]['idxdic'][column_name].append(item[column_name]) 845 else: 846 dic = {} 847 if 'name' in item['_metadata']: dic['name'] = item['_metadata']['name'] 848 if 'param' in item['_metadata']: dic['param'] = item['_metadata']['param'] 849 del item['_metadata'] 850 dic['idxdic'] = {key: [item[key]] for key in item} 851 hashs_dic[id] = dic 852 result = [] 853 for id in hashs_dic: # an Observation is added to result for each id 854 obs_out = Observation.dic(**hashs_dic[id]) 855 if obs_out: 856 if self._filtered: result.append(self._filtered_observation(obs_out)) # finalement, semble plus pertinent de faire ce filtrage directemt sur la sortie Mongo, car même si un à un les tests de condition sont faits à de nombreuses reprises, au global ce ne sont jamais les mêmes combinaisons de test 857 else: result.append(obs_out) 858 # At this point, result is a list of observations. 859 860 ## ESSearch.execute() 3: Other inputs (pure observations) are treated purely in python with the self._filtered_observation() method 861 862 for data in self.input[1]: # data which are not taken from a Mongo database and already are observations are treated here. 863 result.append(self._filtered_observation(data, False)) 864 865 ## ESSearch.execute() 4: Operations for cases 'idfused' and 'single' are performed again with the added observations 866 867 if len(result) >= 1: 868 if returnmode == 'single': 869 result = self._fusion(result, fillvalue) 870 if name is None: name = "ESSearch query result on " + str(datetime.datetime.now()) # default value for name 871 if param is None: # default value for param 872 if self.sources is not None: 873 sources = self.sources 874 else: 875 sources = [] 876 for item in self.input: # informations about the inputs are added to param 877 if isinstance(item, Observation): 878 if item.name is not None: 879 sources.append('Observation: ' + item.name) 880 else: 881 sources.append('data') 882 elif isinstance(item, Collection): 883 sources.append('MongoDB collection: ' + item.name + ' from database: ' + item.database.name) 884 elif isinstance(item, Cursor): 885 sources.append('Pymongo cursor: ' + item.cursor_id + ' from collection ' + item.collection.name + 886 ' from database: ' + item.collection.database.name) 887 elif isinstance(item, CommandCursor): 888 sources.append('Pymongo commandcursor: ' + item.cursor_id + ' from collection ' + item.collection.name + 889 ' from database: ' + item.collection.database.name) 890 else: # should not happen 891 sources.append('data') 892 param = {'date': str(datetime.datetime.now()), 'project': 'essearch', 'type': 'dim3', 893 'context': {'origin': 'ESSearch query', 'sources ': sources, 894 'ESSearch_parameters': str(self.parameters)}} 895 result.param = param 896 897 elif returnmode == 'idfused' and len(result) != 1: 898 hashs_dic = {} # This dictionnary is in the format {id: [observations]} 899 for item in result: 900 if item.id in hashs_dic: 901 hashs_dic[item.id].append(item) 902 else: 903 hashs_dic[item.id] = [item] 904 result = [] 905 for id in hashs_dic: 906 obs_out = self._fusion(hashs_dic[id], fillvalue) 907 if hashs_dic[id][0].name : obs_out.name = hashs_dic[id][0].name # name and param associated to the id are put back 908 if hashs_dic[id][0].param: obs_out.name = hashs_dic[id][0].param 909 if obs_out: result.append(obs_out) 910 911 ## ESSearch.execute() 5: Return result 912 913 return result 914 915 def _filtered_observation(self, obs, is_from_mongo = True): # Vérifier que cette fonction n'est pas moins efficace que de tout déplier / filtrer / tout replier 916 # Regarder si on ne peut pas faire la même chose en mieux avec des fonctions de numpy ou panda. 917 ''' 918 Takes an Observation and returns a filtered Observation with self.parameters as a filter. 919 ''' 920 # self.parameters = [[cond1 AND cond 2 AND cond 3] OR [cond4 AND cond5 AND cond6]] 921 # dico = {"data": [["datation", [date1, date2, date3], [0,1,0,2,2,1]], ["location", [loc1, loc2, loc3], [0,1,2,0,1,1]]]} # dico n'est plus utilisé 922 if len(obs) == 0 or self.parameters == [[]]: return obs 923 924 ## ESSearch._filtered_observation() 0: This function is done with an iteration over self.parameters 925 926 final_filter = [False] * obs.lencomplete 927 for i in range(len(self.parameters)): # for each group of conditions separated by a or 928 if self.parameters[i] != []: 929 930 ## ESSearch._filtered_observation() 1: Checking if no column is missing and if conditions on metadata are verified 931 932 conds = {} # conds is a dict which associates columns to the condition which concern them (inside of [cond1 AND cond 2 AND cond 3] : only AND) 933 relevant = True # no column given by cond["path"] is missing from obs 934 for cond in self.parameters[i]: 935 next_relevant = False 936 for j in range(len(obs.lindex)): 937 if cond["path"] == obs.lindex[j].name: 938 if j in conds: conds[j].append(cond) 939 else: conds[j] = [cond] 940 next_relevant = True 941 elif cond["path"][:9] == "_metadata": # metadata are id, name and param 942 if not is_from_mongo: # This step is considered to be done already for data taken out of Mongo 943 if cond["path"][10:] == 'name' : next_relevant = next_relevant and self._condcheck(obs.name, cond) 944 if cond["path"][10:] == 'id' : next_relevant = next_relevant and self._condcheck(obs.id, cond) 945 if cond["path"][10:] == 'param': next_relevant = next_relevant and self._condcheck(obs.param, cond) 946 else: 947 next_relevant = True 948 relevant = relevant and next_relevant 949 if not relevant: continue # if a column on which a condition is applied is missing, the set of conditions given by the element of self.parameters is considered not verified. 950 951 ## ESSearch._filtered_observation() 2: Condition is applied on all elements of the Observation to create a boolean filter 952 953 full_filter = [True] * obs.lencomplete 954 for j in range(obs.lenidx): # iteration over the columns 955 filter = [] 956 for item in obs.lindex[j].cod: 957 boolean = True 958 for cond in conds[j]: 959 try: boolean = boolean and self._condcheck(item, cond) 960 except: pass #boolean = False # pose problème pour les regex et autres opérations non implémentées... 961 filter.append(boolean) # condition is rewritten as a boolean filter for the incoming data 962 next_full_filter = util.tovalues(obs.lindex[j].keys, filter) # filter changed from filter on optimize codec to filter on full codec. (filter on optimize codec was the just how we calculated it, this isn't an actual method of the module) 963 full_filter = [full_filter[k] and next_full_filter[k] for k in range(len(full_filter))] # full filter is updated each time 964 965 ## ESSearch._filtered_observation() 3: or_position is taken into account 966 967 final_filter = [final_filter[j] or full_filter[j] for j in range(len(full_filter))] 968 969 ## ESSearch._filtered_observation() 4: Application of the final filter on the incoming Observation 970 971 obs.setfilter(final_filter) 972 obs.applyfilter() 973 return obs 974 975 def _condcheck(self, item, cond = None): # ajouter la gestion des regex 976 ''' 977 Takes an item and returns a Boolean if it verifies criteria given by parameter. 978 ''' 979 #cond = {"comparator" : comparator, "operand" : operand, "path" : path} and sometimes can contain other things 980 981 # 0. Basic cases 982 983 if cond is None: return True 984 if 'inverted' in cond and cond['inverted']: return not self._condcheck(item, cond | {'inverted' : False}) 985 if cond["comparator"] is None and cond["operand"] is None: return True 986 987 # 1. formatstring applied if formatstring there is 988 989 if "formatstring" in cond: 990 if not isinstance(item, (datetime.datetime, TimeSlot)): 991 item = datetime.datetime.strptime(item, cond["formatstring"]) 992 if not isinstance(cond["operand"], (datetime.datetime, TimeSlot)): 993 cond["operand"] = datetime.datetime.strptime(cond["operand"], cond["formatstring"]) 994 995 # 2. Cases TimeSlot, geometry and nested property need specific treatment 996 997 if isinstance(item, TimeSlot): 998 if isinstance(cond["operand"], TimeSlot): # if comparator is one of the specific operators for TimeSlot 999 cond["comparator"] = dico_alias_python[TimeSlot][TimeSlot][cond["comparator"]] 1000 return item.link(cond["operand"])[0] == cond["comparator"] 1001 else: # if operand is a datetime 1002 try: return dico_alias_python[TimeSlot][datetime.datetime][cond["comparator"]](item, cond["operand"]) 1003 except: raise ValueError("Comparator not supported for TimeSlot.") 1004 1005 elif isinstance(item, list) or isinstance(item, shapely.geometry.base.BaseGeometry): 1006 if isinstance(item, list): # lists are interpreted as geometries 1007 if len(item) == 1: item = shapely.geometry.Point(item[0]) 1008 elif (len(item) > 1 and not isinstance(item[0], list)): item = shapely.geometry.Point(item) 1009 elif len(item) == 2: item = shapely.geometry.LineString(item) 1010 elif len(item) > 2: 1011 if not item[-1] == item[0]: 1012 item.append(item[0]) 1013 item = shapely.geometry.Polygon([item]) 1014 if isinstance(cond["operand"], list): # lists are interpreted as geometries 1015 if len(cond["operand"]) == 1: cond["operand"] = shapely.geometry.Point(cond["operand"][0]) 1016 elif (len(cond["operand"]) > 1 and not isinstance(cond["operand"][0], list)): 1017 cond["operand"] = shapely.geometry.Point(cond["operand"]) 1018 elif len(cond["operand"]) == 2: cond["operand"] = shapely.geometry.LineString(cond["operand"]) 1019 elif len(cond["operand"]) > 2: 1020 if not item[-1] == item[0]: 1021 item.append(item[0]) 1022 item = shapely.geometry.Polygon([item]) 1023 return dico_alias_mongo['geometry'][cond["comparator"]](item, cond["operand"]) 1024 1025 elif cond["path"] == "property" and isinstance(item, dict): # assuming that property contains dicts and that the query targets one of its values 1026 for val in item.values(): 1027 if self._condcheck(val, cond | {"path" : None}): 1028 return True 1029 return False 1030 1031 # 3. General comparison for remaining cases 1032 1033 try: return dico_alias_mongo['default'][cond["comparator"]](item, cond["operand"]) 1034 except: 1035 #raise ValueError("Comparator not supported.") 1036 return True 1037 1038 def _fusion(self, obsList, fillvalue = None): # Idéalement, utiliser une méthode fusion de Observation. 1039 ''' 1040 Takes a list of observations and returns one observation merging them together in one single observation. 1041 ''' 1042 ## ESSearch._fusion() 0: Basic cases 1043 1044 if len(obsList) == 0: return Observation() 1045 elif len(obsList) == 1: return Observation(obsList[0]) 1046 else: # Fusion of a list with more than one element 1047 lidx = [] 1048 1049 ## ESSearch._fusion() 1: Determination of the names of the columns 1050 1051 new_lname = set() 1052 for obs in obsList: 1053 new_lname |= set(obs.lname) 1054 new_lname = list(new_lname) 1055 1056 ## ESSearch._fusion() 2: Fill the columns with the values of the observations to merge 1057 1058 for i in range(len(new_lname)): # for each column of the new Observation 1059 values = [] 1060 for obs in obsList: # for each Observation in the list 1061 if new_lname[i] in obs.lname: values += obs.lindex[obs.lname.index(new_lname[i])].values # values of the column are added to the new column 1062 else: values += [fillvalue] * len(obs) # when there is no value, filled with fillvalue 1063 codec = util.tocodec(values) 1064 lidx.append(Field(codec, new_lname[i], util.tokeys(values, codec))) 1065 1066 ## ESSearch._fusion() 3: Build the actual Observation 1067 1068 return Observation(lidx) 1069 # Il y aurait peut-être moyen d'optimiser un peu en remplaçant Field, util.tokeys et l'appel final d'Observation par des manipulations directes sur les objets. 1070 # faire la fusion directement sur les keys au lieu de la faire sur les codec ? 1071 # Avec la méthode actuelle, certaines opérations sont faites plusieurs fois.
An ESSearch
is defined as an ensemble of conditions to be used to execute a MongoDB request or any iterable containing only observations.
Attributes (for @property, see methods) :
- input : input on which the query is done. One of or a list of these :
- pymongo.collection.Collection
- pymongo.cursor.Cursor
- pymongo.command_cursor.CommandCursor
- Observation (can be defined from a str or a dict)
- parameters : list of list of conditions for queries, to be interpreted as : parameters = [[cond_1 AND cond_2 AND cond_3] OR [cond_4 AND cond_5 AND cond_6]] where conds are criteria for queries
- hide : list of paths to hide from the output
- heavy : boolean indicating whether the request should search for nested values or not. Does not work with geoJSON.
- sources : attribute used to indicate the sources of the data in param
The methods defined in this class are (documentations in methods definitions):
setter
dynamic value (getter @property)
parameters for query - update methods
ESSearch.addConditions
ESSearch.addCondition
ESSearch.orCondition
ESSearch.removeCondition
ESSearch.clearConditions
query method
284 def __init__(self, 285 input = None, 286 parameters = None, 287 hide = [], 288 heavy = False, 289 sources = None, 290 **kwargs 291 ): 292 ''' 293 ESSearch constructor. Parameters can also be defined and updated using class methods. 294 295 *Arguments* 296 297 - **input** : input on which the query is done. Must be one of or a list of these (can be nested): 298 - pymongo.collection.Collection 299 - pymongo.cursor.Cursor 300 - pymongo.command_cursor.CommandCursor 301 - Observation 302 - str corresponding to a json Observation 303 - dict corresponding to a json Observation 304 - **parameters** : dict, list (default None) - list of list or list of dictionnaries whose keys are arguments of ESSearch.addCondition method 305 ex: parameters = [ 306 {'name' : 'datation', 'operand' : datetime.datetime(2022, 9, 19, 1), 'comparator' : '>='}, 307 {'name' : 'property', 'operand' : 'PM2'} 308 ] 309 - **hide** : list (default []) - List of strings where strings correspond to paths to remove from the output 310 - **heavy** : bool (default False) - Must be True when values are defined directly and inside dictionnaries simultaneously. 311 - **sources** : (default None) - Optional parameter indicating the sources of the data in case when a query is executed with parameter single = True. 312 - **kwargs** : other parameters are used as arguments for ESSearch.addCondition method. 313 ''' 314 self.parameters = [[]] # self.parameters 315 if isinstance(hide, list): self.hide = hide # self.hide 316 else: raise TypeError("hide must be a list.") 317 318 if isinstance(heavy, bool): self.heavy = heavy # self.heavy 319 else: raise TypeError("heavy must be a bool.") 320 self.sources = sources # self.sources 321 322 self.input = [[], []] # self.input : formatted as [[Mongo Objects], [Observations]] (list of two lists) 323 if isinstance(input, list): pile = input 324 else: pile = [input] 325 while not len(pile) == 0: 326 obj = pile.pop() 327 if isinstance(obj, list): 328 pile += obj 329 elif isinstance(obj, (Collection, Cursor, CommandCursor)): 330 self.input[0].append(obj) 331 elif isinstance(obj, Observation): 332 self.input[1].append(obj) 333 elif isinstance(obj, (str, dict)): 334 try: 335 self.input[1].append(Observation.from_obj(obj)) 336 except: 337 raise ValueError("Cannot convert " + str(obj) + " to an Observation.") 338 elif obj is not None: 339 raise TypeError("Unsupported type for input " + str(obj)) 340 341 if parameters: self.addConditions(parameters) 342 if kwargs: self.addCondition(**kwargs)
ESSearch constructor. Parameters can also be defined and updated using class methods.
Arguments
- input : input on which the query is done. Must be one of or a list of these (can be nested):
- pymongo.collection.Collection
- pymongo.cursor.Cursor
- pymongo.command_cursor.CommandCursor
- Observation
- str corresponding to a json Observation
- dict corresponding to a json Observation
- parameters : dict, list (default None) - list of list or list of dictionnaries whose keys are arguments of ESSearch.addCondition method ex: parameters = [ {'name' : 'datation', 'operand' : datetime.datetime(2022, 9, 19, 1), 'comparator' : '>='}, {'name' : 'property', 'operand' : 'PM2'} ]
- hide : list (default []) - List of strings where strings correspond to paths to remove from the output
- heavy : bool (default False) - Must be True when values are defined directly and inside dictionnaries simultaneously.
- sources : (default None) - Optional parameter indicating the sources of the data in case when a query is executed with parameter single = True.
- kwargs : other parameters are used as arguments for ESSearch.addCondition method.
364 def addInput(self, input): 365 """ 366 Adds one or many inputs on which the query is to be executed given by argument input. 367 Inputs can be: 368 - pymongo.collection.Collection 369 - pymongo.cursor.Cursor 370 - pymongo.command_cursor.CommandCursor 371 - Observation 372 - str corresponding to a json Observation 373 - dict corresponding to a json Observation 374 or a list of any of these. 375 """ 376 added_input = [[], []] 377 if isinstance(input, list): pile = input 378 else: pile = [input] 379 while not len(pile) == 0: # using a stack (LIFO) to allow easy treatment of nested data. 380 obj = pile.pop() 381 if isinstance(obj, list): 382 pile += obj 383 elif isinstance(obj, (Collection, Cursor, CommandCursor)): 384 added_input[0].append(obj) 385 elif isinstance(obj, Observation): 386 added_input[1].append(obj) 387 elif isinstance(obj, (str, dict)): 388 try: 389 added_input[1].append(Observation.from_obj(obj)) 390 except: 391 raise ValueError("Cannot convert " + str(obj) + " to an Observation.") 392 elif obj is not None: 393 raise TypeError("Unsupported type for input " + str(obj)) 394 self.input[0] += added_input[0] # self.input is updated with the new inputs 395 self.input[1] += added_input[1]
Adds one or many inputs on which the query is to be executed given by argument input. Inputs can be: - pymongo.collection.Collection - pymongo.cursor.Cursor - pymongo.command_cursor.CommandCursor - Observation - str corresponding to a json Observation - dict corresponding to a json Observation or a list of any of these.
397 def removeInputs(self): 398 """ 399 Removes all inputs from self. 400 """ 401 self.input = [[], []]
Removes all inputs from self.
403 def setHide(self, hide): 404 ''' 405 Sets self.hide to a value given by argument hide. 406 ''' 407 if isinstance(hide, list): self.hide = hide 408 else: raise TypeError("hide must be a list.")
Sets self.hide to a value given by argument hide.
410 def setHeavy(self, heavy): 411 ''' 412 Sets self.heavy to a value given by argument heavy. 413 ''' 414 if isinstance(heavy, list): self.heavy = heavy 415 else: raise TypeError("heavy must be a bool.")
Sets self.heavy to a value given by argument heavy.
417 def setSources(self, sources): 418 ''' 419 Sets self.sources to a value given by argument sources. 420 ''' 421 self.sources = sources
Sets self.sources to a value given by argument sources.
423 def addConditions(self, parameters): 424 ''' 425 Takes multiple parameters and applies self.addCondition() on each of them. 426 ''' 427 if isinstance(parameters, dict): # case when one single condition is added 428 self.addCondition(**parameters) 429 elif isinstance(parameters, (list, tuple)): # case when several conditions are added 430 for parameter in parameters: 431 if isinstance(parameter, dict): self.addCondition(**parameter) 432 elif isinstance(parameters, (list, tuple)): self.addCondition(*parameter) 433 else: self.addCondition(parameter) 434 else: raise TypeError("parameters must be either a dict or a list of dict.")
Takes multiple parameters and applies self.addCondition() on each of them.
436 def addCondition(self, path, operand = None, comparator = None, or_position = -1, **kwargs): 437 ''' 438 Takes parameters and inserts corresponding query condition in self.parameters. 439 440 *Parameters* 441 442 - **path** : str (required argument) - name of an IIndex, which corresponds to an Dataset column name, or name of a metadata element. 443 (ex: 'datation', 'location', 'property') 444 445 - **operand** : - (default None) - Object used for the comparison. 446 (ex: if we search for observations made in Paris, operand is 'Paris') 447 448 - **comparator**: str (default None) - str giving the comparator to use. (ex: '>=', 'in') 449 450 - **or_position** : int (default -1) - position in self.parameters in which the condition is to be inserted. 451 452 - **formatstring** : str (default None) - str to use to automatically change str to datetime before applying condition. 453 Does not update the data base. If value is set to 'default', format is assumed to be Isoformat. 454 455 - **inverted** : bool (default None) - to add a "not" in the condition. 456 To use in case where every element of a MongoDB array (equivalent to python list) must verify the condition (by default, condition is verified when at least one element of the array verifies it). 457 458 - **unwind** : int (default None) - int corresponding to the number of additional {"$unwind" : "$" + path} to be added in the beginning of the query. 459 460 - **regex_options** : str (default None) - str associated to regex options (i, m, x and s). See [this link](https://www.mongodb.com/docs/manual/reference/operator/query/regex/) for more details. 461 462 no comparator => default comparator associated with operand type in dico_alias_mongo is used (mainly equality) 463 no operand => only the existence of something located at path is tested 464 ''' 465 466 ## 1. Check if arguments given are valid. 467 468 if not isinstance(path, str): raise TypeError("name must be a str.") 469 if comparator is not None and not isinstance(comparator, str): raise TypeError("comparator must be a str.") 470 if or_position is not None and not isinstance(or_position, int): raise TypeError("or_position must be an int.") 471 472 for item in kwargs: # checking if parameters in kwarg do exist 473 if item not in {'formatstring', 'inverted', 'unwind', 'regex_options', 'distanceField', 'distanceMultiplier', 'includeLocs', 'key', 'maxDistance', 'minDistance', 'near', 'query', 'spherical'}: 474 raise ValueError("Unknown parameter : ", item) 475 476 if isinstance(operand, datetime.datetime) and (operand.tzinfo is None or operand.tzinfo.utcoffset(operand) is None): 477 operand = operand.replace(tzinfo=datetime.timezone.utc) 478 479 if operand: # checking if comparator can be applied on the operand 480 try: comparator = dico_alias_mongo[type(operand)][comparator] 481 except: raise ValueError("Incompatible values for comparator and operand. Ensure parameters are in the correct order.") 482 elif comparator: 483 raise ValueError("operand must be defined when comparator is used.") 484 485 ## 2. Add the condition to self.parameters 486 487 condition = {"comparator" : comparator, "operand" : operand, "path" : path} | kwargs 488 489 if or_position >= len(self.parameters): 490 self.parameters.append([condition]) 491 else: 492 self.parameters[or_position].append(condition)
Takes parameters and inserts corresponding query condition in self.parameters.
Parameters
path : str (required argument) - name of an IIndex, which corresponds to an Dataset column name, or name of a metadata element. (ex: 'datation', 'location', 'property')
operand : - (default None) - Object used for the comparison. (ex: if we search for observations made in Paris, operand is 'Paris')
comparator: str (default None) - str giving the comparator to use. (ex: '>=', 'in')
or_position : int (default -1) - position in self.parameters in which the condition is to be inserted.
formatstring : str (default None) - str to use to automatically change str to datetime before applying condition. Does not update the data base. If value is set to 'default', format is assumed to be Isoformat.
inverted : bool (default None) - to add a "not" in the condition. To use in case where every element of a MongoDB array (equivalent to python list) must verify the condition (by default, condition is verified when at least one element of the array verifies it).
unwind : int (default None) - int corresponding to the number of additional {"$unwind" : "$" + path} to be added in the beginning of the query.
regex_options : str (default None) - str associated to regex options (i, m, x and s). See this link for more details.
no comparator => default comparator associated with operand type in dico_alias_mongo is used (mainly equality) no operand => only the existence of something located at path is tested
494 def orCondition(self, *args, **kwargs): 495 ''' 496 Adds a condition in a new sublist in self.parameters. Separations in sublists correspond to "or" in the query. 497 ''' 498 self.addCondition(or_position = len(self.parameters), *args, **kwargs)
Adds a condition in a new sublist in self.parameters. Separations in sublists correspond to "or" in the query.
500 def removeCondition(self, or_position = None, condnum = None): 501 ''' 502 Removes a condition from self.parameters. By default, last element added is removed. 503 Otherwise, the removed condition is the one at self.parameters[or_position][condnum]. 504 505 To remove all conditions, use ESSearch.clearConditions() method. 506 ''' 507 if self.parameters == [[]]: return 508 if or_position is None: 509 if condnum is None: # by default : remove the very last added condition. 510 if len(self.parameters[-1]) > 1: self.parameters[-1].pop(-1) 511 else: self.parameters.pop(-1) 512 else: 513 if len(self.parameters[-1]) > 1 or condnum > 1: self.parameters[-1].pop(condnum) 514 else: self.parameters.pop(-1) 515 else: 516 if condnum is None or (len(self.parameters[or_position]) == 1 and condnum == 0): self.parameters.pop(or_position) # if or_position is not None and condnum is, the whole sublist at or_position is removed. 517 else: self.parameters[or_position].pop(condnum) 518 if self.parameters == []: # ensure self.parameters returns to its default value after being emptied 519 self.parameters = [[]]
Removes a condition from self.parameters. By default, last element added is removed. Otherwise, the removed condition is the one at self.parameters[or_position][condnum].
To remove all conditions, use ESSearch.clearConditions() method.
521 def clearConditions(self): 522 ''' 523 Removes all conditions from self.parameters. 524 To remove all attributes, use ESSearch.clear() method. 525 ''' 526 self.parameters = [[]]
Removes all conditions from self.parameters. To remove all attributes, use ESSearch.clear() method.
528 def clear(self): 529 ''' 530 Resets self. 531 (Creating a new Observation would be smarter than using this function.) 532 ''' 533 self = ESSearch()
Resets self. (Creating a new Observation would be smarter than using this function.)
Getter returning the content of the query or aggregation query to be executed with ESSearch.execute().
Getter returning the cursors of the query or aggregation query result on all collections and cursors contained in self.input.
751 def execute(self, returnmode = 'observation', fillvalue = None, name = None, param = None): 752 ''' 753 Executes the request and returns its result, either in one or many Observations. 754 755 *Parameter* 756 757 - **returnmode** : str (default None) - Parameter giving the format of the output: 758 - 'unchanged' : output is returned as it is in the database, some operations like operations sepcific to TimeSlot object are not performed; 759 - 'observation': Each element is returned as an observation, but original observations aren't recreated; 760 - 'idfused': observations whose ids are the same are merged together; 761 - 'single': return a single observation merging all observations together. 762 - **fillvalue** : (default None) - Value to use to fill gaps when observations are merged together. 763 - **name** : str (default None) - name of the output observation when returnmode is 'single'. 764 - **param** : dict (default None) - param of the output observation when returnmode is 'single'. 765 ''' 766 if returnmode not in {'unchanged', 'observation', 'idfused', 'single'}: raise ValueError("returnmode must have one of these values: 'unchanged', 'observation', 'idfused', 'single'.") 767 if returnmode == 'single': 768 if name is not None and not isinstance(name, str) : raise TypeError("name should be a string.") 769 if param is not None and not isinstance(param, dict): raise TypeError("param should be a dictionnary.") 770 self._filtered = False # Boolean put to True inside of self._cond() if an additional filtering specific to TimeSlot and shapely geometries is necessary. 771 772 ## Construction of a result list where data are in the format given by returnmode 773 774 ## ESSearch.execute() 1: Query is executed on each Mongo Collection or Cursor of self.input 775 776 result = [] 777 for data in self.input[0]: 778 request_type, request_content = self._fullSearchMongo() 779 if request_type == 'find': 780 cursor = data.find(request_content, self._project) 781 else: 782 cursor = data.aggregate(request_content) 783 784 if returnmode == 'observation': # Only in this case is an observation created directly. 785 for item in cursor: 786 if self._filtered: # Additional filtering for objects like TimeSlot who need it 787 for conds in self.parameters: 788 checks_parameters, checks_conds = True, True 789 for cond in conds: 790 if cond['path'] in item: 791 try: checks_conds = checks_conds and self._condcheck(item[cond['path']], cond) # checking for each condition if it is satisfied 792 except: checks_conds = False 793 else: 794 checks_conds = False 795 checks_parameters = checks_parameters or checks_conds 796 dic = {} 797 if '_metadata' in item: 798 if 'name' in item['_metadata']: dic['name'] = item['_metadata']['name'] 799 if 'param' in item['_metadata']: dic['param'] = item['_metadata']['param'] 800 del item['_metadata'] 801 dic['idxdic'] = {key: [item[key]] for key in item} 802 if not self._filtered or (self._filtered and checks_parameters): result.append(Observation.dic(**dic)) 803 804 elif returnmode == 'single': 805 for item in cursor: 806 if '_metadata' in item : del item['_metadata'] 807 result.append(item) 808 809 else: # returnmode == 'unchanged' or returnmode == 'idfused' 810 for item in cursor: 811 if item: 812 result.append(item) 813 814 ## ESSearch.execute() 2: Operations for cases 'idfused' and 'single' are performed on output objects. 815 # (more efficient to do it like this than after a conversion to Observation) 816 817 if returnmode == 'single': 818 arg = {} # argument to be given to Observation.dic() merging all observations together 819 for i in range(len(result)): 820 for column_name in arg: # for columns already in the new Observation 821 if column_name not in result[i]: 822 arg[column_name].append(fillvalue) 823 for column_name in result[i]: # for columns missing in the new Observation 824 if column_name not in arg: 825 arg[column_name] = [fillvalue] * i + [result[i][column_name]] # an empty column filled with fillvalue is added if a new column name is encountered 826 else: 827 arg[column_name].append(result[i][column_name]) 828 if self._filtered: result = [self._filtered_observation(Observation.dic(arg))] 829 else: result = [Observation.dic(arg)] 830 831 elif returnmode == 'idfused': 832 hashs_dic = {} 833 for item in result: 834 id = str(item['_metadata']['id']) # will throw an error if item has no id. Should items with no id be let as is or merged together? 835 if id in hashs_dic: # one line is added to hashs_dic[id] for each element of result having this id 836 del item['_metadata'] # Two items with the same id should have the same metadata. 837 for column_name in hashs_dic[id]['idxdic']: 838 if column_name not in item: 839 hashs_dic[id]['idxdic'][column_name].append(fillvalue) 840 for column_name in item: 841 if column_name not in hashs_dic[id]['idxdic']: 842 hashs_dic[id]['idxdic'][column_name] = [fillvalue] * i + [item[column_name]] # a filled column is added if a new column name is encountered 843 else: 844 hashs_dic[id]['idxdic'][column_name].append(item[column_name]) 845 else: 846 dic = {} 847 if 'name' in item['_metadata']: dic['name'] = item['_metadata']['name'] 848 if 'param' in item['_metadata']: dic['param'] = item['_metadata']['param'] 849 del item['_metadata'] 850 dic['idxdic'] = {key: [item[key]] for key in item} 851 hashs_dic[id] = dic 852 result = [] 853 for id in hashs_dic: # an Observation is added to result for each id 854 obs_out = Observation.dic(**hashs_dic[id]) 855 if obs_out: 856 if self._filtered: result.append(self._filtered_observation(obs_out)) # finalement, semble plus pertinent de faire ce filtrage directemt sur la sortie Mongo, car même si un à un les tests de condition sont faits à de nombreuses reprises, au global ce ne sont jamais les mêmes combinaisons de test 857 else: result.append(obs_out) 858 # At this point, result is a list of observations. 859 860 ## ESSearch.execute() 3: Other inputs (pure observations) are treated purely in python with the self._filtered_observation() method 861 862 for data in self.input[1]: # data which are not taken from a Mongo database and already are observations are treated here. 863 result.append(self._filtered_observation(data, False)) 864 865 ## ESSearch.execute() 4: Operations for cases 'idfused' and 'single' are performed again with the added observations 866 867 if len(result) >= 1: 868 if returnmode == 'single': 869 result = self._fusion(result, fillvalue) 870 if name is None: name = "ESSearch query result on " + str(datetime.datetime.now()) # default value for name 871 if param is None: # default value for param 872 if self.sources is not None: 873 sources = self.sources 874 else: 875 sources = [] 876 for item in self.input: # informations about the inputs are added to param 877 if isinstance(item, Observation): 878 if item.name is not None: 879 sources.append('Observation: ' + item.name) 880 else: 881 sources.append('data') 882 elif isinstance(item, Collection): 883 sources.append('MongoDB collection: ' + item.name + ' from database: ' + item.database.name) 884 elif isinstance(item, Cursor): 885 sources.append('Pymongo cursor: ' + item.cursor_id + ' from collection ' + item.collection.name + 886 ' from database: ' + item.collection.database.name) 887 elif isinstance(item, CommandCursor): 888 sources.append('Pymongo commandcursor: ' + item.cursor_id + ' from collection ' + item.collection.name + 889 ' from database: ' + item.collection.database.name) 890 else: # should not happen 891 sources.append('data') 892 param = {'date': str(datetime.datetime.now()), 'project': 'essearch', 'type': 'dim3', 893 'context': {'origin': 'ESSearch query', 'sources ': sources, 894 'ESSearch_parameters': str(self.parameters)}} 895 result.param = param 896 897 elif returnmode == 'idfused' and len(result) != 1: 898 hashs_dic = {} # This dictionnary is in the format {id: [observations]} 899 for item in result: 900 if item.id in hashs_dic: 901 hashs_dic[item.id].append(item) 902 else: 903 hashs_dic[item.id] = [item] 904 result = [] 905 for id in hashs_dic: 906 obs_out = self._fusion(hashs_dic[id], fillvalue) 907 if hashs_dic[id][0].name : obs_out.name = hashs_dic[id][0].name # name and param associated to the id are put back 908 if hashs_dic[id][0].param: obs_out.name = hashs_dic[id][0].param 909 if obs_out: result.append(obs_out) 910 911 ## ESSearch.execute() 5: Return result 912 913 return result
Executes the request and returns its result, either in one or many Observations.
Parameter
- returnmode : str (default None) - Parameter giving the format of the output:
- 'unchanged' : output is returned as it is in the database, some operations like operations sepcific to TimeSlot object are not performed;
- 'observation': Each element is returned as an observation, but original observations aren't recreated;
- 'idfused': observations whose ids are the same are merged together;
- 'single': return a single observation merging all observations together.
- fillvalue : (default None) - Value to use to fill gaps when observations are merged together.
- name : str (default None) - name of the output observation when returnmode is 'single'.
- param : dict (default None) - param of the output observation when returnmode is 'single'.