00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 import pymongo as pm
00038 import gridfs as gfs
00039 import rospy
00040 import StringIO
00041 import std_msgs.msg
00042 import json
00043 import bson.json_util
00044
00045 class MessageCollection:
00046
00047 def __init__(self, db, coll, msg_class,
00048 db_host=None, db_port=None, indexes=[]):
00049 """
00050 @param db: Name of database
00051 @param coll: Name of collection
00052 @param indexes: List of fields to build indexes on.
00053 @param msg_class: The class of the message object being stored
00054 @param db_host: The host where the db server is listening.
00055 @param db_port: The port on which the db server is listening.
00056
00057 Creates collection, db, and indexes if don't already exist.
00058 The database host and port are set to the provided values if given.
00059 If not, the ROS parameters warehouse_host and warehouse_port are used,
00060 and these in turn default to localhost and 27017.
00061 """
00062
00063
00064 self.host = db_host or rospy.get_param('warehouse_host', 'localhost')
00065 self.port = db_port or rospy.get_param('warehouse_port', 27017)
00066 while not rospy.is_shutdown():
00067 try:
00068 self.conn = pm.Connection(self.host, self.port)
00069 break
00070 except:
00071 rospy.loginfo( "Attempting to connect to mongodb @ {0}:{1}".\
00072 format(self.host,self.port))
00073 rospy.sleep(2.0)
00074
00075
00076 self.db = self.conn[db]
00077 self.coll = self.db[coll]
00078 self.fs = gfs.GridFS(self.db)
00079 self.msg_class = msg_class
00080
00081
00082 for ind in indexes:
00083 self.ensure_index(ind)
00084 self.ensure_index('creation_time')
00085
00086
00087
00088
00089 insertion_topic = 'warehouse/{0}/{1}/inserts'.format(db, coll)
00090 self.insertion_pub = rospy.Publisher(insertion_topic, std_msgs.msg.String,
00091 latch=True, queue_size=5)
00092
00093
00094 def ensure_index(self, ind, **kwargs):
00095 info = self.coll.index_information()
00096 if ind in info:
00097 rospy.logdebug("Index {0} already exists".format(ind))
00098 else:
00099 kwargs['name'] = ind
00100 self.coll.ensure_index(ind, **kwargs)
00101
00102
00103 def insert(self, m, metadata={}, **kwargs):
00104 """
00105 @param m: Message to insert
00106 @param metadata: Dictionary of metadata to associate with message
00107 """
00108
00109 buff = StringIO.StringIO()
00110 m.serialize(buff)
00111 v = buff.getvalue()
00112 msg_id = self.fs.put(v)
00113
00114
00115 entry= metadata.copy()
00116 entry['blob_id'] = msg_id
00117 entry['creation_time'] = rospy.Time.now().to_sec()
00118
00119
00120 self.coll.insert(entry, **kwargs)
00121
00122
00123 s = json.dumps(entry, default=bson.json_util.default)
00124 self.insertion_pub.publish(s)
00125
00126
00127 def query(self, query, metadata_only=False, sort_by='', ascending=True):
00128 """
00129 Perform a query.
00130
00131 @return: Iterator over tuples (message, metadata) if metadata_only is
00132 False, or iterator over metadata if it's true
00133 """
00134 if sort_by:
00135 results = self.coll.find(query, sort=[(sort_by, pm.ASCENDING if
00136 ascending else pm.DESCENDING)])
00137 else:
00138 results = self.coll.find(query)
00139
00140 if metadata_only:
00141 return results
00142 else:
00143 return (self.process_entry(r) for r in results)
00144
00145 def find_one(self, query, metadata_only=False, sort_by='', ascending=True):
00146 """
00147 Like query except returns a single matching item, or None if
00148 no item exists
00149 """
00150 return next(self.query(query, metadata_only, sort_by, ascending), None)
00151
00152 def remove(self, query):
00153 "Remove items matching query and return number of removed items."
00154 num_removed = 0
00155 for item in self.query(query, metadata_only=True):
00156 self.coll.remove(item['_id'])
00157 num_removed += 1
00158 self.fs.delete(item['blob_id'])
00159 return num_removed
00160
00161 def process_entry(self, r):
00162 blob = self.fs.get(r['blob_id'])
00163 msg = self.msg_class()
00164 msg.deserialize(blob.read())
00165 return msg, r
00166
00167 def update(self, entry, metadata=None, msg=None):
00168 """
00169 Update a message and/or metadata.
00170
00171 @param entry: The existing metadata entry
00172 @param metadata: Updates to metadata. These are merged with the existing dictionary entries.
00173 @param msg: If specified, a new message object to store in place of the current one.
00174 """
00175 old_blob_id = None
00176 if msg:
00177 buf = StringIO.StringIO()
00178 msg.serialize(buf)
00179 v = buf.getvalue()
00180 new_msg_id = self.fs.put(v)
00181 old_blob_id = entry['blob_id']
00182 entry['blob_id'] = new_msg_id
00183 if metadata:
00184 entry.update(metadata)
00185 self.coll.save(entry, safe=True)
00186 if old_blob_id:
00187 self.fs.delete(old_blob_id)
00188
00189 def count(self):
00190 return self.coll.count()
00191
00192
00193
00194
00195
00196
00197
00198