message_collection.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2008, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 #
00033 # Author: Bhaskara Marthi
00034 
00035 # Collection of messages stored in a Mongo table and GridFS
00036 
00037 import pymongo as pm
00038 import gridfs as gfs
00039 import rospy
00040 import StringIO
00041 import std_msgs.msg
00042 import json
00043 import bson.json_util
00044 
00045 class MessageCollection:
00046 
00047     def __init__(self, db, coll, msg_class,
00048                  db_host=None, db_port=None, indexes=[]):
00049         """
00050         @param db: Name of database
00051         @param coll: Name of collection
00052         @param indexes: List of fields to build indexes on.
00053         @param msg_class: The class of the message object being stored
00054         @param db_host: The host where the db server is listening.
00055         @param db_port: The port on which the db server is listening.
00056 
00057         Creates collection, db, and indexes if don't already exist.
00058         The database host and port are set to the provided values if given.
00059         If not, the ROS parameters warehouse_host and warehouse_port are used,
00060         and these in turn default to localhost and 27017.
00061         """
00062 
00063         # Connect to mongo
00064         self.host = db_host or rospy.get_param('warehouse_host', 'localhost')
00065         self.port = db_port or rospy.get_param('warehouse_port', 27017)
00066         while not rospy.is_shutdown():
00067             try:
00068                 self.conn = pm.Connection(self.host, self.port)
00069                 break
00070             except:
00071                 rospy.loginfo( "Attempting to connect to mongodb @ {0}:{1}".\
00072                                format(self.host,self.port))
00073                 rospy.sleep(2.0)
00074 
00075         # Set up db, collection, gridfs
00076         self.db = self.conn[db]
00077         self.coll = self.db[coll]
00078         self.fs = gfs.GridFS(self.db)
00079         self.msg_class = msg_class
00080 
00081         # Indexes
00082         for ind in indexes:
00083             self.ensure_index(ind)
00084         self.ensure_index('creation_time')
00085 
00086         # Add to the metatable
00087 
00088         # Set up insertion pub
00089         insertion_topic = 'warehouse/{0}/{1}/inserts'.format(db, coll)
00090         self.insertion_pub = rospy.Publisher(insertion_topic, std_msgs.msg.String,
00091                                              latch=True, queue_size=5)
00092         
00093 
00094     def ensure_index(self, ind, **kwargs):
00095         info = self.coll.index_information()
00096         if ind in info:
00097             rospy.logdebug("Index {0} already exists".format(ind))
00098         else:
00099             kwargs['name'] = ind
00100             self.coll.ensure_index(ind, **kwargs)
00101 
00102 
00103     def insert(self, m, metadata={}, **kwargs):
00104         """
00105         @param m: Message to insert
00106         @param metadata: Dictionary of metadata to associate with message
00107         """
00108         # Insert raw message into gridFS
00109         buff = StringIO.StringIO()
00110         m.serialize(buff)
00111         v = buff.getvalue()
00112         msg_id = self.fs.put(v)
00113 
00114         # Create db entry
00115         entry= metadata.copy()
00116         entry['blob_id'] = msg_id
00117         entry['creation_time'] = rospy.Time.now().to_sec()
00118 
00119         # Insert message info
00120         self.coll.insert(entry, **kwargs)
00121 
00122         # Publish ros notification
00123         s = json.dumps(entry, default=bson.json_util.default)
00124         self.insertion_pub.publish(s)
00125 
00126 
00127     def query(self, query, metadata_only=False, sort_by='', ascending=True):
00128         """
00129         Perform a query.
00130 
00131         @return: Iterator over tuples (message, metadata) if metadata_only is
00132         False, or iterator over metadata if it's true
00133         """
00134         if sort_by:
00135             results = self.coll.find(query, sort=[(sort_by, pm.ASCENDING if
00136                 ascending else pm.DESCENDING)])
00137         else:
00138             results = self.coll.find(query)
00139             
00140         if metadata_only:
00141             return results
00142         else:
00143             return (self.process_entry(r) for r in results)
00144 
00145     def find_one(self, query, metadata_only=False, sort_by='', ascending=True):
00146         """
00147         Like query except returns a single matching item, or None if
00148         no item exists
00149         """
00150         return next(self.query(query, metadata_only, sort_by, ascending), None)
00151 
00152     def remove(self, query):
00153         "Remove items matching query and return number of removed items."
00154         num_removed = 0
00155         for item in self.query(query, metadata_only=True):
00156             self.coll.remove(item['_id'])
00157             num_removed += 1
00158             self.fs.delete(item['blob_id'])
00159         return num_removed
00160         
00161     def process_entry(self, r):
00162         blob = self.fs.get(r['blob_id'])
00163         msg = self.msg_class()
00164         msg.deserialize(blob.read())
00165         return msg, r
00166 
00167     def update(self, entry, metadata=None, msg=None):
00168         """
00169         Update a message and/or metadata.
00170 
00171         @param entry: The existing metadata entry
00172         @param metadata: Updates to metadata.  These are merged with the existing dictionary entries.
00173         @param msg: If specified, a new message object to store in place of the current one.
00174         """
00175         old_blob_id = None
00176         if msg:
00177             buf = StringIO.StringIO()
00178             msg.serialize(buf)
00179             v = buf.getvalue()
00180             new_msg_id = self.fs.put(v)
00181             old_blob_id = entry['blob_id']
00182             entry['blob_id'] = new_msg_id
00183         if metadata:
00184             entry.update(metadata)
00185         self.coll.save(entry, safe=True)
00186         if old_blob_id:
00187             self.fs.delete(old_blob_id)
00188 
00189     def count(self):
00190         return self.coll.count()
00191 
00192             
00193         
00194         
00195 
00196     
00197         
00198               


warehouse_ros
Author(s): Bhaskara Marthi
autogenerated on Wed Aug 26 2015 16:44:56