message_collection.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2008, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 #
00033 # Author: Bhaskara Marthi
00034 
00035 # Collection of messages stored in a Mongo table and GridFS
00036 
00037 import pymongo as pm
00038 import gridfs as gfs
00039 import rospy
00040 import StringIO
00041 import std_msgs.msg
00042 import json
00043 import bson.json_util
00044 
00045 class MessageCollection:
00046 
00047     def __init__(self, db, coll, msg_class,
00048                  db_host=None, db_port=None, indexes=[]):
00049         """
00050         @param db: Name of database
00051         @param coll: Name of collection
00052         @param indexes: List of fields to build indexes on.
00053         @param msg_class: The class of the message object being stored
00054         @param db_host: The host where the db server is listening.
00055         @param db_port: The port on which the db server is listening.
00056 
00057         Creates collection, db, and indexes if don't already exist.
00058         The database host and port are set to the provided values if given.
00059         If not, the ROS parameters warehouse_host and warehouse_port are used,
00060         and these in turn default to localhost and 27017.
00061         """
00062 
00063         # Connect to mongo
00064         self.host = db_host or rospy.get_param('warehouse_host', 'localhost')
00065         self.port = db_port or rospy.get_param('warehouse_port', 27017)
00066         while not rospy.is_shutdown():
00067             try:
00068                 self.conn = pm.Connection(self.host, self.port)
00069                 break
00070             except:
00071                 rospy.loginfo( "Attempting to connect to mongodb @ {0}:{1}".\
00072                                format(self.host,self.port))
00073                 rospy.sleep(2.0)
00074 
00075         # Set up db, collection, gridfs
00076         self.db = self.conn[db]
00077         self.coll = self.db[coll]
00078         self.fs = gfs.GridFS(self.db)
00079         self.msg_class = msg_class
00080 
00081         # Indexes
00082         for ind in indexes:
00083             self.ensure_index(ind)
00084         self.ensure_index('creation_time')
00085 
00086         # Add to the metatable
00087 
00088         # Set up insertion pub
00089         insertion_topic = 'warehouse/{0}/{1}/inserts'.format(db, coll)
00090         self.insertion_pub = rospy.Publisher(insertion_topic,
00091                                              std_msgs.msg.String, latch=True)
00092         
00093 
00094     def ensure_index(self, ind):
00095         info = self.coll.index_information()
00096         if ind in info:
00097             rospy.logdebug("Index {0} already exists".format(ind))
00098         else:
00099             self.coll.ensure_index(ind, name=ind)
00100 
00101 
00102     def insert(self, m, metadata={}):
00103         """
00104         @param m: Message to insert
00105         @param metadata: Dictionary of metadata to associate with message
00106         """
00107         # Insert raw message into gridFS
00108         buff = StringIO.StringIO()
00109         m.serialize(buff)
00110         v = buff.getvalue()
00111         msg_id = self.fs.put(v)
00112 
00113         # Create db entry
00114         entry= metadata.copy()
00115         entry['blob_id'] = msg_id
00116         entry['creation_time'] = rospy.Time.now().to_sec()
00117 
00118         # Insert message info
00119         self.coll.insert(entry)
00120 
00121         # Publish ros notification
00122         s = json.dumps(entry, default=bson.json_util.default)
00123         self.insertion_pub.publish(s)
00124 
00125 
00126     def query(self, query, metadata_only=False, sort_by='', ascending=True):
00127         """
00128         Perform a query.
00129 
00130         @return: Iterator over tuples (message, metadata) if metadata_only is
00131         False, or iterator over metadata if it's true
00132         """
00133         if sort_by:
00134             results = self.coll.find(query, sort=[(sort_by, pm.ASCENDING if
00135                 ascending else pm.DESCENDING)])
00136         else:
00137             results = self.coll.find(query)
00138             
00139         if metadata_only:
00140             return results
00141         else:
00142             return (self.process_entry(r) for r in results)
00143 
00144     def find_one(self, query, metadata_only=False, sort_by='', ascending=True):
00145         """
00146         Like query except returns a single matching item, or None if
00147         no item exists
00148         """
00149         return next(self.query(query, metadata_only, sort_by, ascending), None)
00150 
00151     def remove(self, query):
00152         "Remove items matching query and return number of removed items."
00153         num_removed = 0
00154         for item in self.query(query, metadata_only=True):
00155             self.coll.remove(item['_id'])
00156             num_removed += 1
00157             self.fs.delete(item['blob_id'])
00158         return num_removed
00159         
00160     def process_entry(self, r):
00161         blob = self.fs.get(r['blob_id'])
00162         msg = self.msg_class()
00163         msg.deserialize(blob.read())
00164         return msg, r
00165 
00166     def update(self, entry, metadata=None, msg=None):
00167         """
00168         Update a message and/or metadata.
00169 
00170         @param entry: The existing metadata entry
00171         @param metadata: Updates to metadata.  These are merged with the existing dictionary entries.
00172         @param msg: If specified, a new message object to store in place of the current one.
00173         """
00174         old_blob_id = None
00175         if msg:
00176             buf = StringIO.StringIO()
00177             msg.serialize(buf)
00178             v = buf.getvalue()
00179             new_msg_id = self.fs.put(v)
00180             old_blob_id = entry['blob_id']
00181             entry['blob_id'] = new_msg_id
00182         if metadata:
00183             entry.update(metadata)
00184         self.coll.save(entry, safe=True)
00185         if old_blob_id:
00186             self.fs.delete(old_blob_id)
00187 
00188     def count(self):
00189         return self.coll.count()
00190 
00191             
00192         
00193         
00194 
00195     
00196         
00197               
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends


mongo_ros
Author(s): Bhaskara Marthi
autogenerated on Tue Sep 3 2013 13:20:12