$search
00001 # Software License Agreement (BSD License) 00002 # 00003 # Copyright (c) 2008, Willow Garage, Inc. 00004 # All rights reserved. 00005 # 00006 # Redistribution and use in source and binary forms, with or without 00007 # modification, are permitted provided that the following conditions 00008 # are met: 00009 # 00010 # * Redistributions of source code must retain the above copyright 00011 # notice, this list of conditions and the following disclaimer. 00012 # * Redistributions in binary form must reproduce the above 00013 # copyright notice, this list of conditions and the following 00014 # disclaimer in the documentation and/or other materials provided 00015 # with the distribution. 00016 # * Neither the name of Willow Garage, Inc. nor the names of its 00017 # contributors may be used to endorse or promote products derived 00018 # from this software without specific prior written permission. 00019 # 00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00031 # POSSIBILITY OF SUCH DAMAGE. 00032 # 00033 # Author: Bhaskara Marthi 00034 00035 # Collection of messages stored in a Mongo table and GridFS 00036 00037 import pymongo as pm 00038 import gridfs as gfs 00039 import rospy 00040 import StringIO 00041 00042 class MessageCollection: 00043 00044 def __init__(self, db, coll, msg_class, 00045 db_host=None, db_port=None, indexes=[]): 00046 """ 00047 @param db: Name of database 00048 @param coll: Name of collection 00049 @param indexes: List of fields to build indexes on. 00050 @param msg_class: The class of the message object being stored 00051 @param db_host: The host where the db server is listening. 00052 @param db_port: The port on which the db server is listening. 00053 00054 Creates collection, db, and indexes if don't already exist. 00055 The database host and port are set to the provided values if given. 00056 If not, the ROS parameters warehouse_host and warehouse_port are used, 00057 and these in turn default to localhost and 27017. 00058 """ 00059 00060 # Connect to mongo 00061 self.host = db_host or rospy.get_param('warehouse_host', 'localhost') 00062 self.port = db_port or rospy.get_param('warehouse_port', 27017) 00063 while not rospy.is_shutdown(): 00064 try: 00065 self.conn = pm.Connection(self.host, self.port) 00066 break 00067 except: 00068 rospy.loginfo( "Attempting to connect to mongodb @ {0}:{1}".\ 00069 format(self.host,self.port)) 00070 rospy.sleep(2.0) 00071 00072 # Set up db, collection, gridfs 00073 self.db = self.conn[db] 00074 self.coll = self.db[coll] 00075 self.fs = gfs.GridFS(self.db) 00076 self.msg_class = msg_class 00077 00078 # Indexes 00079 for ind in indexes: 00080 self.ensure_index(ind) 00081 self.ensure_index('creation_time') 00082 00083 # Add to the metatable 00084 00085 # Publish insertion notification 00086 00087 00088 def ensure_index(self, ind): 00089 info = self.coll.index_information() 00090 if ind in info: 00091 rospy.logdebug("Index {0} already exists".format(ind)) 00092 else: 00093 self.coll.ensure_index(ind, name=ind) 00094 00095 00096 def insert(self, m, metadata={}): 00097 """ 00098 @param m: Message to insert 00099 @param metadata: Dictionary of metadata to associate with message 00100 """ 00101 # Insert raw message into gridFS 00102 buff = StringIO.StringIO() 00103 m.serialize(buff) 00104 v = buff.getvalue() 00105 msg_id = self.fs.put(v) 00106 00107 # Create db entry 00108 entry= metadata.copy() 00109 entry['blob_id'] = msg_id 00110 entry['creation_time'] = rospy.Time.now().to_sec() 00111 00112 # Insert message info 00113 self.coll.insert(entry) 00114 00115 00116 def query(self, query, metadata_only=False, sort_by='', ascending=True): 00117 """ 00118 Perform a query. 00119 00120 @return: Iterator over tuples (message, metadata) if metadata_only is 00121 False, or iterator over metadata if it's true 00122 """ 00123 if sort_by: 00124 results = self.coll.find(query, sort=[(sort_by, pm.ASCENDING if 00125 ascending else pm.DESCENDING)]) 00126 else: 00127 results = self.coll.find(query) 00128 00129 if metadata_only: 00130 return results 00131 else: 00132 return (self.process_entry(r) for r in results) 00133 00134 def find_one(self, query, metadata_only=False, sort_by='', ascending=True): 00135 """ 00136 Like query except returns a single matching item, or None if 00137 no item exists 00138 """ 00139 return next(self.query(query, metadata_only, sort_by, ascending), None) 00140 00141 def remove(self, query): 00142 "Remove items matching query and return number of removed items." 00143 num_removed = 0 00144 for item in self.query(query, metadata_only=True): 00145 self.coll.remove(item['_id']) 00146 num_removed += 1 00147 self.fs.delete(item['blob_id']) 00148 return num_removed 00149 00150 def process_entry(self, r): 00151 blob = self.fs.get(r['blob_id']) 00152 msg = self.msg_class() 00153 msg.deserialize(blob.read()) 00154 return msg, r 00155 00156 def update(self, entry, metadata=None, msg=None): 00157 """ 00158 Update a message and/or metadata. 00159 00160 @param entry: The existing metadata entry 00161 @param metadata: Updates to metadata. These are merged with the existing dictionary entries. 00162 @param msg: If specified, a new message object to store in place of the current one. 00163 """ 00164 old_blob_id = None 00165 if msg: 00166 buf = StringIO.StringIO() 00167 msg.serialize(buf) 00168 v = buf.getvalue() 00169 new_msg_id = self.fs.put(v) 00170 old_blob_id = entry['blob_id'] 00171 entry['blob_id'] = new_msg_id 00172 if metadata: 00173 entry.update(metadata) 00174 self.coll.save(entry, safe=True) 00175 if old_blob_id: 00176 self.fs.delete(old_blob_id) 00177 00178 def count(self): 00179 return self.coll.count() 00180 00181 00182 00183 00184 00185 00186 00187