00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 import pymongo as pm
00038 import gridfs as gfs
00039 import rospy
00040 import StringIO
00041
class MessageCollection:
    """
    Stores messages together with queryable metadata in MongoDB.

    Each message is serialized and written to GridFS.  A metadata document
    in the collection points at the stored blob through its 'blob_id'
    field and carries a 'creation_time' stamp.
    """

    def __init__(self, db, coll, msg_class,
                 db_host=None, db_port=None, indexes=None):
        """
        @param db: Name of database
        @param coll: Name of collection
        @param indexes: List of fields to build indexes on (None or empty
        means no extra indexes).
        @param msg_class: The class of the message object being stored
        @param db_host: The host where the db server is listening.
        @param db_port: The port on which the db server is listening.

        Creates collection, db, and indexes if don't already exist.
        The database host and port are set to the provided values if given.
        If not, the ROS parameters warehouse_host and warehouse_port are used,
        and these in turn default to localhost and 27017.

        Blocks, retrying every two seconds, until the connection succeeds.

        @raise rospy.ROSInterruptException: if ROS is shut down before a
        connection could be established.
        """
        self.host = db_host or rospy.get_param('warehouse_host', 'localhost')
        self.port = db_port or rospy.get_param('warehouse_port', 27017)

        while not rospy.is_shutdown():
            try:
                self.conn = pm.Connection(self.host, self.port)
                break
            except Exception:
                # Keep retrying on any ordinary failure, but (unlike a bare
                # except) let SystemExit / KeyboardInterrupt propagate.
                rospy.loginfo("Attempting to connect to mongodb @ {0}:{1}".
                              format(self.host, self.port))
                rospy.sleep(2.0)
        else:
            # The loop ended because ROS is shutting down, so self.conn was
            # never assigned.  Fail explicitly rather than with an unrelated
            # AttributeError a few lines further down.
            raise rospy.ROSInterruptException(
                "ROS shutdown before connecting to mongodb @ {0}:{1}".
                format(self.host, self.port))

        self.db = self.conn[db]
        self.coll = self.db[coll]
        self.fs = gfs.GridFS(self.db)
        self.msg_class = msg_class

        # Caller-requested indexes first, then the one on the time stamp
        # that insert() attaches to every entry.
        for ind in (indexes or ()):
            self.ensure_index(ind)
        self.ensure_index('creation_time')

    def ensure_index(self, ind):
        """
        Create an index named after the field, unless one with that name
        already exists.

        @param ind: Name of the metadata field to index
        """
        info = self.coll.index_information()
        if ind in info:
            rospy.logdebug("Index {0} already exists".format(ind))
        else:
            self.coll.ensure_index(ind, name=ind)

    def insert(self, m, metadata=None):
        """
        @param m: Message to insert
        @param metadata: Dictionary of metadata to associate with message.
        It is copied, not modified.
        """
        # Serialize the message and store the raw bytes in GridFS.
        buff = StringIO.StringIO()
        m.serialize(buff)
        v = buff.getvalue()
        msg_id = self.fs.put(v)

        # The collection entry is the metadata plus a pointer to the blob
        # and the insertion time.
        entry = dict(metadata) if metadata is not None else {}
        entry['blob_id'] = msg_id
        entry['creation_time'] = rospy.Time.now().to_sec()

        self.coll.insert(entry)

    def query(self, query, metadata_only=False, sort_by='', ascending=True):
        """
        Perform a query.

        @param query: Query specification passed to the collection's find()
        @param metadata_only: If true, messages are not loaded
        @param sort_by: Name of the field to sort on; empty for no sorting
        @param ascending: Sort direction, only used when sort_by is given

        @return: Iterator over tuples (message, metadata) if metadata_only is
        False, or iterator over metadata if it's true
        """
        if sort_by:
            direction = pm.ASCENDING if ascending else pm.DESCENDING
            results = self.coll.find(query, sort=[(sort_by, direction)])
        else:
            results = self.coll.find(query)

        if metadata_only:
            return results
        # Lazy: each message blob is fetched only when its item is consumed.
        return (self.process_entry(r) for r in results)

    def find_one(self, query, metadata_only=False, sort_by='', ascending=True):
        """
        Like query except returns a single matching item, or None if
        no item exists
        """
        return next(self.query(query, metadata_only, sort_by, ascending), None)

    def remove(self, query):
        "Remove items matching query and return number of removed items."
        num_removed = 0
        for item in self.query(query, metadata_only=True):
            # Drop the metadata entry first, then the blob it referenced.
            self.coll.remove(item['_id'])
            num_removed += 1
            self.fs.delete(item['blob_id'])
        return num_removed

    def process_entry(self, r):
        """
        Load the message that a metadata entry refers to.

        @param r: Metadata entry; its 'blob_id' names the stored message
        @return: Tuple (message, metadata)
        """
        blob = self.fs.get(r['blob_id'])
        msg = self.msg_class()
        msg.deserialize(blob.read())
        return msg, r

    def update(self, entry, metadata=None, msg=None):
        """
        Update a message and/or metadata.

        @param entry: The existing metadata entry.  It is modified in place.
        @param metadata: Updates to metadata. These are merged with the
        existing dictionary entries.
        @param msg: If specified, a new message object to store in place of
        the current one.
        """
        old_blob_id = None
        # Compare against None so the decision does not depend on how the
        # message object happens to evaluate in a boolean context.
        if msg is not None:
            buf = StringIO.StringIO()
            msg.serialize(buf)
            v = buf.getvalue()
            new_msg_id = self.fs.put(v)
            old_blob_id = entry['blob_id']
            entry['blob_id'] = new_msg_id
        if metadata:
            entry.update(metadata)
        self.coll.save(entry, safe=True)
        # Delete the superseded blob only after the entry that points at
        # the new one has been saved.
        if old_blob_id is not None:
            self.fs.delete(old_blob_id)

    def count(self):
        "Return the number of entries in the collection."
        return self.coll.count()
00180
00181
00182
00183
00184
00185
00186
00187