00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00042 #include <mongo_ros/exceptions.h>
00043 #include <mongo_ros/mongo_ros.h>
00044 #include <std_msgs/String.h>
00045 #include <boost/format.hpp>
00046 #include <boost/foreach.hpp>
00047 
00048 namespace mongo_ros
00049 {
00050 
00051 using std::string;
00052 using std::vector;
00053 namespace ser=ros::serialization;
00054 namespace mt=ros::message_traits;
00055 
00056 
00057 template <class M>
00058 MessageCollection<M>::MessageCollection (const string& db,
00059                                          const string& coll,
00060                                          const string& db_host,
00061                                          const unsigned db_port,
00062                                          const float timeout) :
00063   ns_(db+"."+coll),
00064   md5sum_matches_(true),
00065   insertion_pub_(nh_.advertise<std_msgs::String>("warehouse/"+db+"/"+coll+\
00066                                                  "/inserts", 100, true))
00067 {
00068   initialize(db, coll, db_host, db_port, timeout);
00069 }
00070 
00071 
00072 template <class M>
00073 void MessageCollection<M>::initialize (const string& db, const string& coll,
00074                                        const string& host, const unsigned port,
00075                                        const float timeout)
00076 {
00077   conn_ = makeDbConnection(nh_, host, port, timeout);
00078   
00079   gfs_.reset(new mongo::GridFS(*conn_, db));
00080   ROS_DEBUG_NAMED ("create_collection", "Constructed collection");
00081 
00082   ensureIndex("creation_time");
00083 
00084   
00085   const string meta_ns = db+".ros_message_collections";
00086   if (!conn_->count(meta_ns, BSON("name" << coll)))
00087   {
00088     ROS_DEBUG_NAMED ("create_collection", "Inserting metadata");
00089     typedef typename mt::DataType<M> DataType;
00090     const string datatype = DataType().value();
00091     typedef typename mt::MD5Sum<M> Md5;
00092     const string md5 = Md5().value();
00093     conn_->insert(meta_ns, BSON("name" << coll << "type" << datatype
00094                                 << "md5sum" << md5));
00095   }
00096   else
00097   {
00098     ROS_DEBUG_NAMED("create_collection", "Not inserting metadata");
00099     typedef typename mt::MD5Sum<M> Md5;
00100     const string md5 = Md5().value();
00101     if (!conn_->count(meta_ns, BSON("name" << coll << "md5sum" << md5)))
00102     {
00103       md5sum_matches_ = false;
00104       typedef typename mt::DataType<M> DataType;
00105       const string datatype = DataType().value();
00106       ROS_ERROR("The md5 sum for message %s changed to %s. Only reading metadata.", 
00107                 datatype.c_str(), md5.c_str());
00108     }
00109   }
00110   
00111   if (insertion_pub_.getNumSubscribers()==0)
00112   {
00113     ros::WallDuration d(0.1);
00114     ROS_DEBUG_STREAM_NAMED ("create_collection",
00115                             "Waiting " << d.toSec() <<
00116                             " for any additional notification subscribers");
00117     d.sleep();
00118   }
00119 }
00120 
00121 template <class M>
00122 MessageCollection<M>& MessageCollection<M>::ensureIndex
00123 (const string& field)
00124 {
00125   conn_->ensureIndex(ns_, BSON(field << 1));
00126   return *this;
00127 }
00128 
00129 template <class M>
00130 void MessageCollection<M>::insert
00131 (const M& msg, const Metadata& metadata)
00132 {
00133   if (!md5sum_matches_)
00134     throw Md5SumException("Cannot insert additional elements.");
00135   
00137   const mongo::BSONObj bson = metadata;
00138   mongo::OID id;
00139   bson["_id"].Val(id);
00140   
00142   const size_t serial_size = ser::serializationLength(msg);
00143   boost::shared_array<uint8_t> buffer(new uint8_t[serial_size]);
00144   ser::OStream stream(buffer.get(), serial_size);
00145   ser::serialize(stream, msg);
00146   const char* data = (char*) buffer.get();
00147 
00148   
00149   mongo::BSONObj file_obj = gfs_->storeFile(data, serial_size, id.toString());
00150 
00151   
00152   mongo::BSONObjBuilder builder;
00153   builder.appendElements(bson);
00154   mongo::OID blob_id;
00155   file_obj["_id"].Val(blob_id);
00156   builder.append("blob_id", blob_id);
00157   mongo::BSONObj entry = builder.obj();
00158   conn_->insert(ns_, entry);
00159   
00160 
00161   
00162   std_msgs::String notification;
00163   notification.data = entry.jsonString();
00164   insertion_pub_.publish(notification);
00165 }
00166 
00167 template <class M>
00168 typename QueryResults<M>::range_t
00169 MessageCollection<M>::queryResults (const mongo::Query& query,
00170                                     const bool metadata_only,
00171                                     const string& sort_by,
00172                                     const bool ascending) const
00173 {
00174   if (!md5sum_matches_ && !metadata_only)
00175     throw Md5SumException("Can only query metadata.");
00176   
00177   mongo::Query copy(query.obj);
00178   ROS_DEBUG_NAMED("query", "Sending query %s to %s", copy.toString().c_str(),
00179                   ns_.c_str());
00180                   
00181   if (sort_by.size() > 0)
00182     copy.sort(sort_by, ascending ? 1 : -1);
00183   return typename QueryResults<M>::range_t
00184     (ResultIterator<M>(conn_, ns_, copy, gfs_, metadata_only),
00185      ResultIterator<M>());
00186 }
00187 
00188 
00189 template <class M>
00190 vector <typename MessageWithMetadata<M>::ConstPtr>
00191 MessageCollection<M>::pullAllResults (const mongo::Query& query,
00192                                       const bool metadata_only,
00193                                       const string& sort_by,
00194                                       const bool ascending) const
00195 {  
00196   typename QueryResults<M>::range_t res = queryResults(query, metadata_only,
00197                                                        sort_by, ascending);
00198   return vector<typename MessageWithMetadata<M>::ConstPtr>
00199     (res.first, res.second);
00200 }
00201 
00202 
00203 template <class M>
00204 typename MessageWithMetadata<M>::ConstPtr
00205 MessageCollection<M>::findOne (const Query& q,const bool metadata_only) const
00206 {
00207   typename QueryResults<M>::range_t res = queryResults(q, metadata_only);
00208   if (res.first==res.second)
00209     throw NoMatchingMessageException(ns_);
00210   return *res.first;
00211 }
00212 
00213 
00214 template <class M>
00215 unsigned MessageCollection<M>::removeMessages (const mongo::Query& query)
00216 {
00217   unsigned num_removed = 0;
00218   typedef typename MessageWithMetadata<M>::ConstPtr Msg;
00219   vector<Msg> msgs = pullAllResults(query, true);
00220   conn_->remove(ns_, query);
00221 
00222   
00223   BOOST_FOREACH (Msg m, msgs)
00224   {
00225     mongo::OID id;
00226     m->metadata["_id"].Val(id);
00227     gfs_->removeFile(id.toString());
00228     num_removed++;
00229   }
00230 
00231   return num_removed;
00232 }
00233 
00234 template <class M>
00235 void MessageCollection<M>::modifyMetadata (const Query& q, const Metadata& m)
00236 {
00237   typename MessageWithMetadata<M>::ConstPtr orig = findOne(q, true);
00238   
00239   mongo::BSONObjBuilder new_meta_builder;
00240 
00241   std::set<std::string> fields;
00242   m.getFieldNames(fields);
00243 
00244   BOOST_FOREACH (const string& f, fields) 
00245   {
00246     if ((f!="_id") && (f!="creation_time")) 
00247       new_meta_builder.append(BSON("$set" << BSON(f << m.getField(f))).\
                              getField("$set"));
00248   }
00249 
00250   mongo::BSONObj new_meta = new_meta_builder.obj().copy();
00251   conn_->update(ns_, q, new_meta);
00252 }
00253 
00254 template <class M>
00255 unsigned MessageCollection<M>::count ()
00256 {
00257   return conn_->count(ns_);
00258 }
00259 
00260 template <class M>
00261 bool MessageCollection<M>::md5SumMatches () const
00262 {
00263   return md5sum_matches_;
00264 }
00265 
00266 } 
00267