00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 import pymongo as pm
00038 import gridfs as gfs
00039 import rospy
00040 import StringIO
00041 import std_msgs.msg
00042 import json
00043 import bson.json_util
00044
00045 class MessageCollection:
00046
00047 def __init__(self, db, coll, msg_class,
00048 db_host=None, db_port=None, indexes=[]):
00049 """
00050 @param db: Name of database
00051 @param coll: Name of collection
00052 @param indexes: List of fields to build indexes on.
00053 @param msg_class: The class of the message object being stored
00054 @param db_host: The host where the db server is listening.
00055 @param db_port: The port on which the db server is listening.
00056
00057 Creates collection, db, and indexes if don't already exist.
00058 The database host and port are set to the provided values if given.
00059 If not, the ROS parameters warehouse_host and warehouse_port are used,
00060 and these in turn default to localhost and 27017.
00061 """
00062
00063
00064 self.host = db_host or rospy.get_param('warehouse_host', 'localhost')
00065 self.port = db_port or rospy.get_param('warehouse_port', 27017)
00066 while not rospy.is_shutdown():
00067 try:
00068 self.conn = pm.Connection(self.host, self.port)
00069 break
00070 except:
00071 rospy.loginfo( "Attempting to connect to mongodb @ {0}:{1}".\
00072 format(self.host,self.port))
00073 rospy.sleep(2.0)
00074
00075
00076 self.db = self.conn[db]
00077 self.coll = self.db[coll]
00078 self.fs = gfs.GridFS(self.db)
00079 self.msg_class = msg_class
00080
00081
00082 for ind in indexes:
00083 self.ensure_index(ind)
00084 self.ensure_index('creation_time')
00085
00086
00087
00088
00089 insertion_topic = 'warehouse/{0}/{1}/inserts'.format(db, coll)
00090 self.insertion_pub = rospy.Publisher(insertion_topic,
00091 std_msgs.msg.String, latch=True)
00092
00093
00094 def ensure_index(self, ind):
00095 info = self.coll.index_information()
00096 if ind in info:
00097 rospy.logdebug("Index {0} already exists".format(ind))
00098 else:
00099 self.coll.ensure_index(ind, name=ind)
00100
00101
00102 def insert(self, m, metadata={}):
00103 """
00104 @param m: Message to insert
00105 @param metadata: Dictionary of metadata to associate with message
00106 """
00107
00108 buff = StringIO.StringIO()
00109 m.serialize(buff)
00110 v = buff.getvalue()
00111 msg_id = self.fs.put(v)
00112
00113
00114 entry= metadata.copy()
00115 entry['blob_id'] = msg_id
00116 entry['creation_time'] = rospy.Time.now().to_sec()
00117
00118
00119 self.coll.insert(entry)
00120
00121
00122 s = json.dumps(entry, default=bson.json_util.default)
00123 self.insertion_pub.publish(s)
00124
00125
00126 def query(self, query, metadata_only=False, sort_by='', ascending=True):
00127 """
00128 Perform a query.
00129
00130 @return: Iterator over tuples (message, metadata) if metadata_only is
00131 False, or iterator over metadata if it's true
00132 """
00133 if sort_by:
00134 results = self.coll.find(query, sort=[(sort_by, pm.ASCENDING if
00135 ascending else pm.DESCENDING)])
00136 else:
00137 results = self.coll.find(query)
00138
00139 if metadata_only:
00140 return results
00141 else:
00142 return (self.process_entry(r) for r in results)
00143
00144 def find_one(self, query, metadata_only=False, sort_by='', ascending=True):
00145 """
00146 Like query except returns a single matching item, or None if
00147 no item exists
00148 """
00149 return next(self.query(query, metadata_only, sort_by, ascending), None)
00150
00151 def remove(self, query):
00152 "Remove items matching query and return number of removed items."
00153 num_removed = 0
00154 for item in self.query(query, metadata_only=True):
00155 self.coll.remove(item['_id'])
00156 num_removed += 1
00157 self.fs.delete(item['blob_id'])
00158 return num_removed
00159
00160 def process_entry(self, r):
00161 blob = self.fs.get(r['blob_id'])
00162 msg = self.msg_class()
00163 msg.deserialize(blob.read())
00164 return msg, r
00165
00166 def update(self, entry, metadata=None, msg=None):
00167 """
00168 Update a message and/or metadata.
00169
00170 @param entry: The existing metadata entry
00171 @param metadata: Updates to metadata. These are merged with the existing dictionary entries.
00172 @param msg: If specified, a new message object to store in place of the current one.
00173 """
00174 old_blob_id = None
00175 if msg:
00176 buf = StringIO.StringIO()
00177 msg.serialize(buf)
00178 v = buf.getvalue()
00179 new_msg_id = self.fs.put(v)
00180 old_blob_id = entry['blob_id']
00181 entry['blob_id'] = new_msg_id
00182 if metadata:
00183 entry.update(metadata)
00184 self.coll.save(entry, safe=True)
00185 if old_blob_id:
00186 self.fs.delete(old_blob_id)
00187
00188 def count(self):
00189 return self.coll.count()
00190
00191
00192
00193
00194
00195
00196
00197