00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00042 #include <mongo_ros/exceptions.h>
00043 #include <mongo_ros/mongo_ros.h>
00044 #include <std_msgs/String.h>
00045 #include <boost/format.hpp>
00046 #include <boost/foreach.hpp>
00047
00048 namespace mongo_ros
00049 {
00050
00051 using std::string;
00052 using std::vector;
00053 namespace ser=ros::serialization;
00054 namespace mt=ros::message_traits;
00055
00056
00057 template <class M>
00058 MessageCollection<M>::MessageCollection (const string& db,
00059 const string& coll,
00060 const string& db_host,
00061 const unsigned db_port,
00062 const float timeout) :
00063 ns_(db+"."+coll),
00064 md5sum_matches_(true),
00065 insertion_pub_(nh_.advertise<std_msgs::String>("warehouse/"+db+"/"+coll+\
00066 "/inserts", 100, true))
00067 {
00068 initialize(db, coll, db_host, db_port, timeout);
00069 }
00070
00071
00072 template <class M>
00073 void MessageCollection<M>::initialize (const string& db, const string& coll,
00074 const string& host, const unsigned port,
00075 const float timeout)
00076 {
00077 conn_ = makeDbConnection(nh_, host, port, timeout);
00078
00079 gfs_.reset(new mongo::GridFS(*conn_, db));
00080 ROS_DEBUG_NAMED ("create_collection", "Constructed collection");
00081
00082 ensureIndex("creation_time");
00083
00084
00085 const string meta_ns = db+".ros_message_collections";
00086 if (!conn_->count(meta_ns, BSON("name" << coll)))
00087 {
00088 ROS_DEBUG_NAMED ("create_collection", "Inserting metadata");
00089 typedef typename mt::DataType<M> DataType;
00090 const string datatype = DataType().value();
00091 typedef typename mt::MD5Sum<M> Md5;
00092 const string md5 = Md5().value();
00093 conn_->insert(meta_ns, BSON("name" << coll << "type" << datatype
00094 << "md5sum" << md5));
00095 }
00096 else
00097 {
00098 ROS_DEBUG_NAMED("create_collection", "Not inserting metadata");
00099 typedef typename mt::MD5Sum<M> Md5;
00100 const string md5 = Md5().value();
00101 if (!conn_->count(meta_ns, BSON("name" << coll << "md5sum" << md5)))
00102 {
00103 md5sum_matches_ = false;
00104 typedef typename mt::DataType<M> DataType;
00105 const string datatype = DataType().value();
00106 ROS_ERROR("The md5 sum for message %s changed to %s. Only reading metadata.",
00107 datatype.c_str(), md5.c_str());
00108 }
00109 }
00110
00111 if (insertion_pub_.getNumSubscribers()==0)
00112 {
00113 ros::WallDuration d(0.1);
00114 ROS_DEBUG_STREAM_NAMED ("create_collection",
00115 "Waiting " << d.toSec() <<
00116 " for any additional notification subscribers");
00117 d.sleep();
00118 }
00119 }
00120
00121 template <class M>
00122 MessageCollection<M>& MessageCollection<M>::ensureIndex
00123 (const string& field)
00124 {
00125 conn_->ensureIndex(ns_, BSON(field << 1));
00126 return *this;
00127 }
00128
00129 template <class M>
00130 void MessageCollection<M>::insert
00131 (const M& msg, const Metadata& metadata)
00132 {
00133 if (!md5sum_matches_)
00134 throw Md5SumException("Cannot insert additional elements.");
00135
00137 const mongo::BSONObj bson = metadata;
00138 mongo::OID id;
00139 bson["_id"].Val(id);
00140
00142 const size_t serial_size = ser::serializationLength(msg);
00143 boost::shared_array<uint8_t> buffer(new uint8_t[serial_size]);
00144 ser::OStream stream(buffer.get(), serial_size);
00145 ser::serialize(stream, msg);
00146 const char* data = (char*) buffer.get();
00147
00148
00149 mongo::BSONObj file_obj = gfs_->storeFile(data, serial_size, id.toString());
00150
00151
00152 mongo::BSONObjBuilder builder;
00153 builder.appendElements(bson);
00154 mongo::OID blob_id;
00155 file_obj["_id"].Val(blob_id);
00156 builder.append("blob_id", blob_id);
00157 mongo::BSONObj entry = builder.obj();
00158 conn_->insert(ns_, entry);
00159
00160
00161
00162 std_msgs::String notification;
00163 notification.data = entry.jsonString();
00164 insertion_pub_.publish(notification);
00165 }
00166
00167 template <class M>
00168 typename QueryResults<M>::range_t
00169 MessageCollection<M>::queryResults (const mongo::Query& query,
00170 const bool metadata_only,
00171 const string& sort_by,
00172 const bool ascending) const
00173 {
00174 if (!md5sum_matches_ && !metadata_only)
00175 throw Md5SumException("Can only query metadata.");
00176
00177 mongo::Query copy(query.obj);
00178 ROS_DEBUG_NAMED("query", "Sending query %s to %s", copy.toString().c_str(),
00179 ns_.c_str());
00180
00181 if (sort_by.size() > 0)
00182 copy.sort(sort_by, ascending ? 1 : -1);
00183 return typename QueryResults<M>::range_t
00184 (ResultIterator<M>(conn_, ns_, copy, gfs_, metadata_only),
00185 ResultIterator<M>());
00186 }
00187
00188
00189 template <class M>
00190 vector <typename MessageWithMetadata<M>::ConstPtr>
00191 MessageCollection<M>::pullAllResults (const mongo::Query& query,
00192 const bool metadata_only,
00193 const string& sort_by,
00194 const bool ascending) const
00195 {
00196 typename QueryResults<M>::range_t res = queryResults(query, metadata_only,
00197 sort_by, ascending);
00198 return vector<typename MessageWithMetadata<M>::ConstPtr>
00199 (res.first, res.second);
00200 }
00201
00202
00203 template <class M>
00204 typename MessageWithMetadata<M>::ConstPtr
00205 MessageCollection<M>::findOne (const Query& q,const bool metadata_only) const
00206 {
00207 typename QueryResults<M>::range_t res = queryResults(q, metadata_only);
00208 if (res.first==res.second)
00209 throw NoMatchingMessageException(ns_);
00210 return *res.first;
00211 }
00212
00213
00214 template <class M>
00215 unsigned MessageCollection<M>::removeMessages (const mongo::Query& query)
00216 {
00217 unsigned num_removed = 0;
00218 typedef typename MessageWithMetadata<M>::ConstPtr Msg;
00219 vector<Msg> msgs = pullAllResults(query, true);
00220 conn_->remove(ns_, query);
00221
00222
00223 BOOST_FOREACH (Msg m, msgs)
00224 {
00225 mongo::OID id;
00226 m->metadata["_id"].Val(id);
00227 gfs_->removeFile(id.toString());
00228 num_removed++;
00229 }
00230
00231 return num_removed;
00232 }
00233
00234 template <class M>
00235 void MessageCollection<M>::modifyMetadata (const Query& q, const Metadata& m)
00236 {
00237 typename MessageWithMetadata<M>::ConstPtr orig = findOne(q, true);
00238
00239 mongo::BSONObjBuilder new_meta_builder;
00240
00241 std::set<std::string> fields;
00242 m.getFieldNames(fields);
00243
00244 BOOST_FOREACH (const string& f, fields)
00245 {
00246 if ((f!="_id") && (f!="creation_time"))
00247 new_meta_builder.append(BSON("$set" << BSON(f << m.getField(f))).\
getField("$set"));
00248 }
00249
00250 mongo::BSONObj new_meta = new_meta_builder.obj().copy();
00251 conn_->update(ns_, q, new_meta);
00252 }
00253
00254 template <class M>
00255 unsigned MessageCollection<M>::count ()
00256 {
00257 return conn_->count(ns_);
00258 }
00259
00260 template <class M>
00261 bool MessageCollection<M>::md5SumMatches () const
00262 {
00263 return md5sum_matches_;
00264 }
00265
00266 }
00267