message_collection_impl.hpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2008, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  *
00029  */
00030 
00042 #include <mongo_ros/exceptions.h>
00043 #include <mongo_ros/mongo_ros.h>
00044 #include <std_msgs/String.h>
00045 #include <boost/format.hpp>
00046 #include <boost/foreach.hpp>
00047 
00048 namespace mongo_ros
00049 {
00050 
00051 using std::string;
00052 using std::vector;
00053 namespace ser=ros::serialization;
00054 namespace mt=ros::message_traits;
00055 
00056 
00057 template <class M>
00058 MessageCollection<M>::MessageCollection (const string& db,
00059                                          const string& coll,
00060                                          const string& db_host,
00061                                          const unsigned db_port,
00062                                          const float timeout) :
00063   ns_(db+"."+coll),
00064   md5sum_matches_(true),
00065   insertion_pub_(nh_.advertise<std_msgs::String>("warehouse/"+db+"/"+coll+\
00066                                                  "/inserts", 100, true))
00067 {
00068   initialize(db, coll, db_host, db_port, timeout);
00069 }
00070 
00071 
00072 template <class M>
00073 void MessageCollection<M>::initialize (const string& db, const string& coll,
00074                                        const string& host, const unsigned port,
00075                                        const float timeout)
00076 {
00077   conn_ = makeDbConnection(nh_, host, port, timeout);
00078   
00079   gfs_.reset(new mongo::GridFS(*conn_, db));
00080   ROS_DEBUG_NAMED ("create_collection", "Constructed collection");
00081 
00082   ensureIndex("creation_time");
00083 
00084   // Add to the metatable
00085   const string meta_ns = db+".ros_message_collections";
00086   if (!conn_->count(meta_ns, BSON("name" << coll)))
00087   {
00088     ROS_DEBUG_NAMED ("create_collection", "Inserting metadata");
00089     typedef typename mt::DataType<M> DataType;
00090     const string datatype = DataType().value();
00091     typedef typename mt::MD5Sum<M> Md5;
00092     const string md5 = Md5().value();
00093     conn_->insert(meta_ns, BSON("name" << coll << "type" << datatype
00094                                 << "md5sum" << md5));
00095   }
00096   else
00097   {
00098     ROS_DEBUG_NAMED("create_collection", "Not inserting metadata");
00099     typedef typename mt::MD5Sum<M> Md5;
00100     const string md5 = Md5().value();
00101     if (!conn_->count(meta_ns, BSON("name" << coll << "md5sum" << md5)))
00102     {
00103       md5sum_matches_ = false;
00104       typedef typename mt::DataType<M> DataType;
00105       const string datatype = DataType().value();
00106       ROS_ERROR("The md5 sum for message %s changed to %s. Only reading metadata.", 
00107                 datatype.c_str(), md5.c_str());
00108     }
00109   }
00110   
00111   if (insertion_pub_.getNumSubscribers()==0)
00112   {
00113     ros::WallDuration d(0.1);
00114     ROS_DEBUG_STREAM_NAMED ("create_collection",
00115                             "Waiting " << d.toSec() <<
00116                             " for any additional notification subscribers");
00117     d.sleep();
00118   }
00119 }
00120 
00121 template <class M>
00122 MessageCollection<M>& MessageCollection<M>::ensureIndex
00123 (const string& field)
00124 {
00125   conn_->ensureIndex(ns_, BSON(field << 1));
00126   return *this;
00127 }
00128 
00129 template <class M>
00130 void MessageCollection<M>::insert
00131 (const M& msg, const Metadata& metadata)
00132 {
00133   if (!md5sum_matches_)
00134     throw Md5SumException("Cannot insert additional elements.");
00135   
00137   const mongo::BSONObj bson = metadata;
00138   mongo::OID id;
00139   bson["_id"].Val(id);
00140   
00142   const size_t serial_size = ser::serializationLength(msg);
00143   boost::shared_array<uint8_t> buffer(new uint8_t[serial_size]);
00144   ser::OStream stream(buffer.get(), serial_size);
00145   ser::serialize(stream, msg);
00146   const char* data = (char*) buffer.get();
00147 
00148   // Store in grid fs
00149   mongo::BSONObj file_obj = gfs_->storeFile(data, serial_size, id.toString());
00150 
00151   // Add blob id to metadata and store it in the message collection
00152   mongo::BSONObjBuilder builder;
00153   builder.appendElements(bson);
00154   mongo::OID blob_id;
00155   file_obj["_id"].Val(blob_id);
00156   builder.append("blob_id", blob_id);
00157   mongo::BSONObj entry = builder.obj();
00158   conn_->insert(ns_, entry);
00159   
00160 
00161   // Publish ROS notification
00162   std_msgs::String notification;
00163   notification.data = entry.jsonString();
00164   insertion_pub_.publish(notification);
00165 }
00166 
00167 template <class M>
00168 typename QueryResults<M>::range_t
00169 MessageCollection<M>::queryResults (const mongo::Query& query,
00170                                     const bool metadata_only,
00171                                     const string& sort_by,
00172                                     const bool ascending) const
00173 {
00174   if (!md5sum_matches_ && !metadata_only)
00175     throw Md5SumException("Can only query metadata.");
00176   
00177   mongo::Query copy(query.obj);
00178   ROS_DEBUG_NAMED("query", "Sending query %s to %s", copy.toString().c_str(),
00179                   ns_.c_str());
00180                   
00181   if (sort_by.size() > 0)
00182     copy.sort(sort_by, ascending ? 1 : -1);
00183   return typename QueryResults<M>::range_t
00184     (ResultIterator<M>(conn_, ns_, copy, gfs_, metadata_only),
00185      ResultIterator<M>());
00186 }
00187 
00188 
00189 template <class M>
00190 vector <typename MessageWithMetadata<M>::ConstPtr>
00191 MessageCollection<M>::pullAllResults (const mongo::Query& query,
00192                                       const bool metadata_only,
00193                                       const string& sort_by,
00194                                       const bool ascending) const
00195 {  
00196   typename QueryResults<M>::range_t res = queryResults(query, metadata_only,
00197                                                        sort_by, ascending);
00198   return vector<typename MessageWithMetadata<M>::ConstPtr>
00199     (res.first, res.second);
00200 }
00201 
00202 
00203 template <class M>
00204 typename MessageWithMetadata<M>::ConstPtr
00205 MessageCollection<M>::findOne (const Query& q,const bool metadata_only) const
00206 {
00207   typename QueryResults<M>::range_t res = queryResults(q, metadata_only);
00208   if (res.first==res.second)
00209     throw NoMatchingMessageException(ns_);
00210   return *res.first;
00211 }
00212 
00213 
00214 template <class M>
00215 unsigned MessageCollection<M>::removeMessages (const mongo::Query& query)
00216 {
00217   unsigned num_removed = 0;
00218   typedef typename MessageWithMetadata<M>::ConstPtr Msg;
00219   vector<Msg> msgs = pullAllResults(query, true);
00220   conn_->remove(ns_, query);
00221 
00222   // Also remove the raw messages from gridfs
00223   BOOST_FOREACH (Msg m, msgs)
00224   {
00225     mongo::OID id;
00226     m->metadata["_id"].Val(id);
00227     gfs_->removeFile(id.toString());
00228     num_removed++;
00229   }
00230 
00231   return num_removed;
00232 }
00233 
00234 template <class M>
00235 void MessageCollection<M>::modifyMetadata (const Query& q, const Metadata& m)
00236 {
00237   typename MessageWithMetadata<M>::ConstPtr orig = findOne(q, true);
00238   
00239   mongo::BSONObjBuilder new_meta_builder;
00240 
00241   std::set<std::string> fields;
00242   m.getFieldNames(fields);
00243 
00244   BOOST_FOREACH (const string& f, fields) 
00245   {
00246     if ((f!="_id") && (f!="creation_time")) 
00247       new_meta_builder.append(BSON("$set" << BSON(f << m.getField(f))).\
                              getField("$set"));
00248   }
00249 
00250   mongo::BSONObj new_meta = new_meta_builder.obj().copy();
00251   conn_->update(ns_, q, new_meta);
00252 }
00253 
00254 template <class M>
00255 unsigned MessageCollection<M>::count ()
00256 {
00257   return conn_->count(ns_);
00258 }
00259 
00260 template <class M>
00261 bool MessageCollection<M>::md5SumMatches () const
00262 {
00263   return md5sum_matches_;
00264 }
00265 
00266 } // namespace
00267 


warehouse_ros
Author(s): Bhaskara Marthi
autogenerated on Mon Oct 6 2014 08:51:55