message_collection_impl.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2008, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  *
00029  */
00030 
00040 #include <mongo_ros/exceptions.h>
00041 #include <mongo_ros/mongo_ros.h>
00042 #include <std_msgs/String.h>
00043 #include <boost/format.hpp>
00044 #include <boost/foreach.hpp>
00045 
00046 namespace mongo_ros
00047 {
00048 
00049 using std::string;
00050 using std::vector;
00051 namespace ser=ros::serialization;
00052 namespace mt=ros::message_traits;
00053 
00054 
00055 template <class M>
00056 MessageCollection<M>::MessageCollection (const string& db,
00057                                          const string& coll,
00058                                          const string& db_host,
00059                                          const unsigned db_port,
00060                                          const float timeout) :
00061   ns_(db+"."+coll),
00062   insertion_pub_(nh_.advertise<std_msgs::String>("warehouse/"+db+"/"+coll+"/inserts", 100))
00063 {
00064   initialize(db, coll, db_host, db_port, timeout);
00065 }
00066 
00067 
00068 template <class M>
00069 void MessageCollection<M>::initialize (const string& db, const string& coll,
00070                                        const string& host, const unsigned port,
00071                                        const float timeout)
00072 {
00073   conn_ = makeDbConnection(nh_, host, port, timeout);
00074   
00075   gfs_.reset(new mongo::GridFS(*conn_, db));
00076   ROS_DEBUG_NAMED ("create_collection", "Constructed collection");
00077 
00078   ensureIndex("creation_time");
00079 
00080   // Add to the metatable
00081   const string meta_ns = db+".ros_message_collections";
00082   if (!conn_->count(meta_ns, BSON("name" << coll)))
00083   {
00084     ROS_DEBUG_NAMED ("create_collection", "Inserting metadata");
00085     typedef typename mt::DataType<M> DataType;
00086     const string datatype = DataType().value();
00087     typedef typename mt::MD5Sum<M> Md5;
00088     const string md5 = Md5().value();
00089     conn_->insert(meta_ns, BSON("name" << coll << "type" << datatype
00090                                 << "md5sum" << md5));
00091   }
00092   else
00093     ROS_DEBUG_NAMED("create_collection", "Not inserting metadata");
00094   
00095   if (insertion_pub_.getNumSubscribers()==0)
00096   {
00097     ros::WallDuration d(0.1);
00098     ROS_DEBUG_STREAM_NAMED ("create_collection",
00099                             "Waiting " << d.toSec() <<
00100                             " for any additional notification subscribers");
00101     d.sleep();
00102   }
00103 }
00104 
00105 template <class M>
00106 MessageCollection<M>& MessageCollection<M>::ensureIndex
00107 (const string& field)
00108 {
00109   conn_->ensureIndex(ns_, BSON(field << 1));
00110   return *this;
00111 }
00112 
00113 template <class M>
00114 void MessageCollection<M>::insert
00115 (const M& msg, const Metadata& metadata)
00116 {
00118   const mongo::BSONObj bson = metadata;
00119   mongo::OID id;
00120   bson["_id"].Val(id);
00121   
00123   const size_t serial_size = ser::serializationLength(msg);
00124   boost::shared_array<uint8_t> buffer(new uint8_t[serial_size]);
00125   ser::OStream stream(buffer.get(), serial_size);
00126   ser::serialize(stream, msg);
00127   const char* data = (char*) buffer.get();
00128 
00129   // Store in grid fs
00130   mongo::BSONObj file_obj = gfs_->storeFile(data, serial_size, id.toString());
00131 
00132   // Add blob id to metadata and store it in the message collection
00133   mongo::BSONObjBuilder builder;
00134   builder.appendElements(bson);
00135   mongo::OID blob_id;
00136   file_obj["_id"].Val(blob_id);
00137   builder.append("blob_id", blob_id);
00138   mongo::BSONObj entry = builder.obj();
00139   conn_->insert(ns_, entry);
00140   
00141 
00142   // Publish ROS notification
00143   std_msgs::String notification;
00144   notification.data = entry.jsonString();
00145   insertion_pub_.publish(notification);
00146 }
00147 
00148 template <class M>
00149 typename QueryResults<M>::range_t
00150 MessageCollection<M>::queryResults (const mongo::Query& query,
00151                                     const bool metadata_only,
00152                                     const string& sort_by,
00153                                     const bool ascending) const
00154 {
00155   mongo::Query copy(query.obj);
00156   if (sort_by.size() > 0)
00157     copy.sort(sort_by, ascending ? 1 : -1);
00158   return typename QueryResults<M>::range_t
00159     (ResultIterator<M>(conn_, ns_, copy, gfs_, metadata_only),
00160      ResultIterator<M>());
00161 }
00162 
00163 
00164 template <class M>
00165 vector <typename MessageWithMetadata<M>::ConstPtr>
00166 MessageCollection<M>::pullAllResults (const mongo::Query& query,
00167                                       const bool metadata_only,
00168                                       const string& sort_by,
00169                                       const bool ascending) const
00170 {
00171   typename QueryResults<M>::range_t res = queryResults(query, metadata_only,
00172                                                        sort_by, ascending);
00173   return vector<typename MessageWithMetadata<M>::ConstPtr>
00174     (res.first, res.second);
00175 }
00176 
00177 
00178 template <class M>
00179 typename MessageWithMetadata<M>::ConstPtr
00180 MessageCollection<M>::findOne (const Query& q,const bool metadata_only) const
00181 {
00182   typename QueryResults<M>::range_t res = queryResults(q, metadata_only);
00183   if (res.first==res.second)
00184     throw NoMatchingMessageException(ns_);
00185   return *res.first;
00186 }
00187 
00188 
00189 template <class M>
00190 unsigned MessageCollection<M>::removeMessages (const mongo::Query& query)
00191 {
00192   unsigned num_removed = 0;
00193   typedef typename MessageWithMetadata<M>::ConstPtr Msg;
00194   vector<Msg> msgs = pullAllResults(query, true);
00195   conn_->remove(ns_, query);
00196 
00197   // Also remove the raw messages from gridfs
00198   BOOST_FOREACH (Msg m, msgs)
00199   {
00200     mongo::OID id;
00201     m->metadata["_id"].Val(id);
00202     gfs_->removeFile(id.toString());
00203     num_removed++;
00204   }
00205 
00206   return num_removed;
00207 }
00208 
00209 template <class M>
00210 void MessageCollection<M>::modifyMetadata (const Query& q, const Metadata& m)
00211 {
00212   typename MessageWithMetadata<M>::ConstPtr orig = findOne(q, true);
00213   
00214   mongo::BSONObjBuilder new_meta_builder;
00215 
00216   std::set<std::string> fields;
00217   m.getFieldNames(fields);
00218 
00219   BOOST_FOREACH (const string& f, fields) 
00220   {
00221     if ((f!="_id") && (f!="creation_time")) 
00222       new_meta_builder.append(BSON("$set" << BSON(f << m.getField(f))).\
                              getField("$set"));
00223   }
00224 
00225   mongo::BSONObj new_meta = new_meta_builder.obj().copy();
00226   conn_->update(ns_, q, new_meta);
00227 }
00228 
00229 template <class M>
00230 unsigned MessageCollection<M>::count ()
00231 {
00232   return conn_->count(ns_);
00233 }
00234 
00235 } // namespace
00236 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends


mongo_ros
Author(s): Bhaskara Marthi
autogenerated on Mon Sep 2 2013 11:11:08