00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 import roslib; roslib.load_manifest('warehouse')
00031 import pymongo as pm
00032 from pymongo import Connection
00033 from pymongo import json_util
00034 from pymongo import objectid
00035 import bson
00036 import gridfs
00037 import rospy
00038 import warehouse.srv as whs
00039 import warehouse.msg as whm
00040 import StringIO
00041 import threading
00042 import roslib.rostime as rostime
00043 import std_msgs.msg
00044 import json
00045 import yaml
00046 import gc
00047
00048 from rospy import loginfo
00049
00050 DELIMETER='___'
00051
00052
00053
00054
00055
00056
00057 def legal_db_name(name):
00058 return not name in ['admin', 'local'] and \
00059 name and not name.startswith('_')
00060
00061
00062 def legal_collection_name(name):
00063 return not name in ['system.indexes', 'fs.chunks'] and \
00064 name and not name.startswith('_')
00065
00066 def legal_topic_name(name):
00067 return name.startswith('/')
00068
00069 def updatable_metadata_field(name):
00070 return name and not name.startswith('_')
00071
00072 def extract_nested_field(msg, f):
00073 """
00074 Helper function that extracts a nested field from a ros message
00075 """
00076 for field in f:
00077 msg = getattr(msg, field)
00078 return msg
00079
00080 def parse_val(s, example):
00081 """
00082 Parse a string val based on the type of example
00083 """
00084 if isinstance(example, str) or isinstance(example, unicode):
00085 return s
00086 elif isinstance(example, int):
00087 return int(s)
00088 elif isinstance(example, float):
00089 try:
00090 return float(s)
00091 except ValueError:
00092 return s
00093 else:
00094 print ("Example {0} has type {1}".format(example, type(example)))
00095 raise ValueError(example)
00096
00097 def construct_query(q, example):
00098 """
00099 Helper to construct a mongo db query
00100 """
00101 query = {}
00102 for c in q:
00103 fields = c.field_name.split('.')
00104 db_field = DELIMETER.join(fields)
00105
00106
00107
00108
00109 dummy_val = example.get(db_field, 0.0)
00110
00111 if c.predicate == whm.Condition.EQUALS:
00112 v = parse_val(c.args[0], dummy_val)
00113 elif c.predicate == whm.Condition.LESS_THAN:
00114 v = { "$lt" : parse_val(c.args[0], dummy_val) }
00115 elif c.predicate == whm.Condition.GREATER_THAN:
00116 v = { "$gt" : parse_val(c.args[0], dummy_val) }
00117 else:
00118 raise ValueError(c)
00119 query[db_field] = v
00120 return query
00121
00122 def doc_to_string(d):
00123 return json.dumps(d, default=json_util.default)
00124
00125 def metadata_resp(code, msg):
00126 rospy.logerr("Handle metadata returned error {0}: {1}".\
00127 format(code, msg))
00128 return whs.MetadataResponse(error_code=code, error_msg=msg)
00129
00130
00131
00132
00133
00134
00135 class CollectionHandler:
00136 """
00137 A class that handles insertion of data in a collection.
00138 """
00139
00140 def __init__(self, req, msg_class, conn, topic):
00141 """
00142 Constructor. Listens to a topic named
00143 /warehouse/<db_name>/<collection_name>/insert.
00144 Inserts data of the type <msg_type> when it comes in over the wire.
00145
00146 @param req: The SetupCollection service request
00147 @param msg_type: The message data type
00148 @param conn: The connection to the db
00149 @param topic: The topic to push data on
00150
00151 @todo Should check the indexed fields for correctness here rather than
00152 dying in the callback
00153 """
00154
00155 self.connection = conn
00156 self.db = conn[req.db_name]
00157 self.collection = self.db[req.collection_name]
00158 self.fs = gridfs.GridFS(self.db)
00159 self.cursors = {}
00160 self.query_pubs = {}
00161 self.query_metadata_pubs = {}
00162 self.msg_class = msg_class
00163 self.publish_messages = {}
00164
00165
00166 notification_topic = '~{0}/{1}/notify'.format(req.db_name,
00167 req.collection_name)
00168 self.indexed_fields = [f.split('.') for f in req.indexed_fields if \
00169 f not in req.additional_metadata_fields]
00170
00171 self.sub = rospy.Subscriber(topic, rospy.AnyMsg, self.handle_push)
00172
00173
00174 self.pub = rospy.Publisher(notification_topic,
00175 whm.UpdateNotification)
00176
00177 def cleanup(self):
00178 self.sub.unregister()
00179 self.pub.unregister()
00180
00181 def handle_push(self, raw_msg):
00182 """
00183 Handles the insertion of data into a collection.
00184 """
00185
00186
00187 msg_id = self.fs.put(raw_msg._buff)
00188
00189
00190 db_entry = {}
00191 db_entry['_blob_id'] = msg_id
00192
00193
00194 if self.indexed_fields:
00195 msg = self.msg_class()
00196 msg.deserialize(raw_msg._buff)
00197
00198 for f in self.indexed_fields:
00199 val = extract_nested_field(msg, f)
00200 if isinstance(val, rostime.Time) or \
00201 isinstance(val, rostime.Duration):
00202
00203
00204 val = val.to_sec()
00205 db_entry[DELIMETER.join(f)] = val
00206
00207
00208 db_entry['_creation_time'] = rospy.Time.now().to_sec()
00209
00210
00211 self.collection.insert(db_entry)
00212
00213
00214 self.pub.publish(id=str(db_entry['_id']),
00215 update_type=whm.UpdateNotification.NEW_MESSAGE,
00216 sender=raw_msg._connection_header['callerid'],
00217 metadata=doc_to_string(db_entry))
00218
00219
00220
00221 def initiate_query(self, req, id):
00222
00223
00224 example = self.collection.find_one()
00225
00226
00227
00228
00229 if example is None:
00230 self.cursors[id] = self.collection.find()
00231
00232 else:
00233 self.cursors[id] = self.collection.find(
00234 spec=construct_query(req.query, example),
00235 sort=construct_order(req.order))
00236
00237 self.query_pubs[id] = rospy.Publisher(
00238 '~{0}/{1}/{2}'.format(req.db_name, req.collection_name, id),
00239 self.msg_class, latch=True)
00240 self.query_metadata_pubs[id] = rospy.Publisher(
00241 '~{0}/{1}/metadata/{2}'.format(req.db_name, req.collection_name, id),
00242 std_msgs.msg.String, latch=True)
00243 self.publish_messages[id] = not req.metadata_only
00244
00245 return whs.InitiateQueryResponse(query_id=id)
00246
00247 def pull_message(self, id):
00248 try:
00249 item = self.cursors[id].next()
00250 m = rospy.AnyMsg()
00251 m._buff = self.fs.get(item['_blob_id']).read()
00252 if self.publish_messages[id]:
00253 self.query_pubs[id].publish(m)
00254 self.query_metadata_pubs[id].publish(doc_to_string(item))
00255 return 1
00256 except StopIteration:
00257 self.query_pubs[id].unregister()
00258 self.query_metadata_pubs[id].unregister()
00259 del self.cursors[id]
00260 del self.publish_messages[id]
00261 del self.query_pubs[id]
00262 del self.query_metadata_pubs[id]
00263 return 0
00264
00265 def handle_remove(self, cond):
00266
00267 example = self.collection.find_one()
00268 if example is None:
00269 return 0
00270
00271
00272 q = construct_query(cond, example)
00273
00274
00275
00276
00277
00278 cursor = self.collection.find(q)
00279 blobs = [item['_blob_id'] for item in cursor]
00280
00281
00282 self.collection.remove(q)
00283
00284
00285 for b in blobs:
00286 self.fs.delete(b)
00287
00288 return len(blobs)
00289
00290
00291 def handle_metadata(self, req):
00292 for i in range(50):
00293 item = self.collection.find_one(objectid.ObjectId(req.id))
00294 if item is not None:
00295 if i>0:
00296 rospy.logerr("Found {0} in {1}/{2} on attempt {3}".\
00297 format(req.id, req.db_name,
00298 req.collection_name, i))
00299 break
00300 rospy.logerr("Couldn't find {0} in {1}/{2} (attempt {3})".\
00301 format(req.id, req.db_name, req.collection_name, i))
00302 rospy.sleep(0.01)
00303 if item is None:
00304 return metadata_resp(whs.MetadataResponse.INVALID_ID,
00305 "Couldn't find item {0} in {1}/{2}".\
00306 format(req.id, req.db_name,
00307 req.collection_name))
00308
00309
00310 try:
00311 meta = yaml.load(req.metadata)
00312 except ValueError as e:
00313 return metadata_resp(whs.MetadataResponse.JSON_ERROR,
00314 "Error '{0}' parsing metadata: {1}".\
00315 format(e, req.metadata))
00316
00317
00318 for k, v in meta.items():
00319 if updatable_metadata_field(k):
00320 item[k] = v
00321 else:
00322 rospy.warn("Cannot modify value of metadata field {0}".\
00323 format(k))
00324 self.collection.save(item)
00325
00326
00327 self.pub.publish(id=req.id,
00328 update_type=whm.UpdateNotification.UPDATE_METADATA,
00329 sender = req._connection_header['callerid'],
00330 metadata=doc_to_string(item))
00331 return whs.MetadataResponse(error_code=whs.MetadataResponse.SUCCESS)
00332
00333
00334
00335 def construct_order(order):
00336 """
00337 Construct a Mongodb ordering criterion from Ros message.
00338 If the field is empty, return None.
00339 """
00340 if order.field:
00341 if order.field == '_creation_time':
00342 db_field = '_creation_time'
00343 else:
00344 fields = order.field.split('.')
00345 db_field = DELIMETER.join(fields)
00346 return [(db_field, pm.DESCENDING if order.reverse else pm.ASCENDING)]
00347
00348
00349
00350
00351
00352
00353 class Backend:
00354 """
00355 The mongo database interface. This serves as the backend to the warehouse.
00356 """
00357 def __init__(self, host, port):
00358 """
00359 Constructor for the DB Interface. This creates a connection to the MonogDB
00360 and establishes a set of services
00361 """
00362 done = False
00363 self.handlers = {}
00364
00365 self.host = host
00366 self.port = port
00367
00368
00369 while not done and not rospy.is_shutdown():
00370 done = True
00371 try:
00372 loginfo( "Attempting to connect to mongodb @ {0}:{1}".\
00373 format(self.host,self.port))
00374 self.connection = Connection(self.host, self.port)
00375 except:
00376 done = False
00377 rospy.sleep(2.0)
00378
00379 if rospy.is_shutdown():
00380 rospy.logwarn( "Warehouse backend has been killed" )
00381 return
00382
00383 loginfo( "Connected to mongodb" )
00384
00385 self.lock = threading.Lock()
00386 self.next_query_id = 0
00387 self.query_handlers = {}
00388
00389 self.close_collection_srv = rospy.Service('close_collection',
00390 whs.CloseCollection,
00391 self.handle_close_collection)
00392
00393
00394 self.list_collections_srv = rospy.Service('list_collections',
00395 whs.ListCollections,
00396 self.list_collections)
00397
00398
00399 self.setup_collection = rospy.Service('setup_collection',
00400 whs.SetupCollection,
00401 self.handle_setup_collection)
00402
00403
00404 self.initiate_query_srv = rospy.Service('initiate_query',
00405 whs.InitiateQuery,
00406 self.handle_query)
00407
00408
00409 self.pull_message_srv = rospy.Service('pull_message',
00410 whs.PullMessage,
00411 self.handle_pull)
00412
00413
00414 self.drop_db = rospy.Service('drop_db',
00415 whs.DropDB,
00416 self.handle_drop)
00417
00418
00419 self.metadata_srv = rospy.Service('add_metadata',
00420 whs.Metadata,
00421 self.handle_metadata)
00422
00423
00424 self.remove_msg_srv = rospy.Service('remove_messages',
00425 whs.RemoveMessages,
00426 self.handle_remove)
00427
00428 def handle_remove(self, req):
00429 def resp(code, msg, num=0):
00430 return whs.RemoveMessagesResponse(error_code=code, error_msg=msg,
00431 num_messages_removed=num)
00432
00433 with self.lock:
00434 try:
00435 db = self.handlers[req.db_name]
00436 except KeyError:
00437 return resp(whs.RemoveMessagesResponse.INVALID_DB_NAME,
00438 "Db {0} not found".format(req.db_name))
00439 try:
00440 h = db[req.collection_name]
00441 except KeyError:
00442 return resp(whs.RemoveMessagesResponse.INVALID_COLLECTION_NAME,
00443 "Collection {0} not found".format(req.collection_name))
00444 response = resp(whs.RemoveMessagesResponse.SUCCESS, '',
00445 h.handle_remove(req.conditions))
00446 return response
00447
00448
00449
00450 def handle_metadata(self, req):
00451 with self.lock:
00452 try:
00453 db = self.handlers[req.db_name]
00454 except KeyError:
00455 return metadata_resp(whs.MetadataResponse.INVALID_DB_NAME,
00456 "Db {0} not found".format(req.db_name))
00457
00458 try:
00459 h = db[req.collection_name]
00460 except KeyError:
00461 return metadata_resp(whs.MetadataResponse.INVALID_COLLECTION_NAME,
00462 "Collection {0} not present in {1}".\
00463 format(req.collection_name, req.db_name))
00464
00465 resp = h.handle_metadata(req)
00466 return resp
00467
00468
00469
00470 def handle_drop(self, req):
00471 """
00472 Drop a database
00473 """
00474 with self.lock:
00475 assert(not req.db_name in self.handlers)
00476 self.connection.drop_database(req.db_name)
00477 loginfo('dropped %s', req.db_name)
00478 return whs.DropDBResponse()
00479
00480 def handle_pull(self, req):
00481 """
00482 Cause next message on a query to be published
00483 """
00484 with self.lock:
00485 n = self.query_handlers[req.query_id].pull_message(req.query_id)
00486 if n == 0:
00487 del self.query_handlers[req.query_id]
00488 return whs.PullMessageResponse(num_messages_sent=n)
00489
00490
00491 def handle_query(self, req):
00492 """
00493 Pass off the query to the appropriate collection
00494 """
00495 with self.lock:
00496 query_id = 'messages_{0}'.format(self.next_query_id)
00497 self.next_query_id += 1
00498 handler = self.handlers[req.db_name][req.collection_name]
00499 self.query_handlers[query_id] = handler
00500 resp = handler.initiate_query(req, query_id)
00501 return resp
00502
00503
00504 def get_db(self, name):
00505 """
00506 Create db if necessary.
00507 """
00508 created = False
00509 if name not in self.connection.database_names():
00510
00511 self.connection[name].create_collection('___DUMMY_TABLE___')
00512 loginfo('Created database %s', name)
00513 created = True
00514
00515
00516 assert name in self.connection.database_names()
00517 resp = (self.connection[name], created)
00518 return resp
00519
00520 def handle_close_collection(self, req):
00521 with self.lock:
00522 d_name = req.db_name
00523 c_name = req.collection_name
00524 if d_name in self.handlers and c_name in self.handlers[d_name]:
00525 self.handlers[d_name][c_name].cleanup()
00526 del self.handlers[d_name][c_name]
00527 if len(self.handlers[d_name]) == 0:
00528 del self.handlers[d_name]
00529 return whs.CloseCollectionResponse()
00530
00531 def list_collections(self, req):
00532 """
00533 Service callback that handles db and collection existence checks
00534 @param req: The whs.ListCollections service request
00535 """
00536 with self.lock:
00537 db_exists = False
00538 collections = []
00539
00540 if req.db_name in self.connection.database_names():
00541 db_exists = True
00542 collections = [c['name'].encode() for c in \
00543 self.connection[req.db_name]._ros_collection_info.find()]
00544
00545 return whs.ListCollectionsResponse(db_exists=db_exists,
00546 collections=collections)
00547
00548 def handle_setup_collection(self, req):
00549 """
00550 Service callback that handles setting up new collection
00551 @param req: The whs.SetupCollection service request
00552 """
00553 from warehouse.srv import SetupCollectionResponse
00554
00555 with self.lock:
00556 def error_msg(ec, msg):
00557 return SetupCollectionResponse(error_code=ec, error_msg=msg)
00558
00559
00560 topic = req.topic or rospy.resolve_name\
00561 ('~' + req.db_name +'/'+ req.collection_name + '/insert')
00562
00563
00564 if not legal_db_name(req.db_name):
00565 return error_msg(SetupCollectionResponse.ILLEGAL_DB_NAME,
00566 'Illegal db name {0}'.format(req.db_name))
00567 if not legal_collection_name(req.collection_name):
00568 return error_msg(SetupCollectionResponse.ILLEGAL_COLLECTION_NAME,
00569 'Illegal collection name {0}'.\
00570 format(req.collection_name))
00571 if not legal_topic_name(topic):
00572 return error_msg(SetupCollectionResponse.ILLEGAL_TOPIC_NAME,
00573 '{0} was not a globally qualified topic name'.\
00574 format(topic))
00575
00576
00577 try:
00578
00579 roslib.load_manifest(req.msg_pkg)
00580 m = __import__(req.msg_pkg+".msg")
00581 mod = getattr(m, 'msg')
00582 except roslib.packages.InvalidROSPkgException:
00583 return error_msg(whs.SetupCollectionResponse.PACKAGE_NOT_FOUND, \
00584 'package {0} not found'.format(req.msg_pkg))
00585 except ImportError:
00586 return error_msg(SetupCollectionResponse.MESSAGE_NOT_FOUND_IN_PACKAGE,
00587 'no messages found in {0}'.format(req.msg_pkg))
00588 rospy.logdebug('Imported %s', mod)
00589
00590
00591 try:
00592 cls = getattr(mod, req.msg_type)
00593 except AttributeError:
00594 return error_msg(whs.SetupCollectionResponse.MESSAGE_NOT_FOUND_IN_PACKAGE, \
00595 'message type {0} not found in {1}'.format(req.msg_type, req.msg_package))
00596 loginfo('Looked up class %s', mod)
00597
00598
00599 db, db_created = self.get_db(req.db_name)
00600 c = req.collection_name
00601 collection_created = False
00602 info = db._ros_collection_info.find_one({'name' : c})
00603 dummy = cls()
00604 if info:
00605 existing_info = (info['msg_pkg'], info['msg_type'], info['indexed_fields'], info['md5'])
00606 req_info = (req.msg_pkg, req.msg_type, req.indexed_fields, dummy._md5sum)
00607 if existing_info != req_info:
00608 return error_msg(whs.SetupCollectionResponse.EXISTING_COLLECTION_MISMATCH,
00609 'Args {0} did not match saved info {1}'.format(req_info, existing_info))
00610 else:
00611
00612 for f in req.indexed_fields:
00613 if f not in req.additional_metadata_fields:
00614 try:
00615 extract_nested_field(dummy, f.split('.'))
00616 except AttributeError:
00617 return error_msg(whs.SetupCollectionResponse.INVALID_INDEXED_FIELD,
00618 msg='Field {0} not found in message'.format(f))
00619
00620
00621 doc = {'name': c, 'msg_pkg': req.msg_pkg, 'msg_type': req.msg_type, \
00622 'indexed_fields': req.indexed_fields, 'md5': dummy._md5sum, 'fulltext': dummy._full_text}
00623 db._ros_collection_info.insert(doc)
00624 collection_created = True
00625 loginfo('Created collection {3}/{2} for message type {0}/{1}'.\
00626 format(doc['msg_pkg'], doc['msg_type'], c, req.db_name))
00627
00628
00629 for f in req.indexed_fields:
00630 name = DELIMETER.join(f.split('.'))
00631 db[c].create_index(name, name=name)
00632 loginfo('Generated index on {0}'.format(name))
00633
00634
00635 if not req.db_name in self.handlers:
00636 self.handlers[req.db_name] = {}
00637
00638 if not c in self.handlers[req.db_name]:
00639 self.handlers[req.db_name][c] = CollectionHandler(req, cls, self.connection, topic)
00640 loginfo('Created handler')
00641
00642
00643 return whs.SetupCollectionResponse(\
00644 error_code=whs.SetupCollectionResponse.SUCCESS, db_created=db_created,
00645 collection_created=collection_created, topic=topic)
00646
00647 if __name__ == '__main__':
00648 rospy.init_node('warehouse')
00649 host = rospy.get_param('~host', 'localhost')
00650 port = rospy.get_param('~port', 27017)
00651 back = Backend(host, port)
00652 rospy.spin()