00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00040 #include <mongo_ros/exceptions.h>
00041 #include <mongo_ros/mongo_ros.h>
00042 #include <std_msgs/String.h>
00043 #include <boost/format.hpp>
00044 #include <boost/foreach.hpp>
00045
00046 namespace mongo_ros
00047 {
00048
00049 using std::string;
00050 using std::vector;
00051 namespace ser=ros::serialization;
00052 namespace mt=ros::message_traits;
00053
00054
00055 template <class M>
00056 MessageCollection<M>::MessageCollection (const string& db,
00057 const string& coll,
00058 const string& db_host,
00059 const unsigned db_port,
00060 const float timeout) :
00061 ns_(db+"."+coll),
00062 insertion_pub_(nh_.advertise<std_msgs::String>("warehouse/"+db+"/"+coll+"/inserts", 100))
00063 {
00064 initialize(db, coll, db_host, db_port, timeout);
00065 }
00066
00067
00068 template <class M>
00069 void MessageCollection<M>::initialize (const string& db, const string& coll,
00070 const string& host, const unsigned port,
00071 const float timeout)
00072 {
00073 conn_ = makeDbConnection(nh_, host, port, timeout);
00074
00075 gfs_.reset(new mongo::GridFS(*conn_, db));
00076 ROS_DEBUG_NAMED ("create_collection", "Constructed collection");
00077
00078 ensureIndex("creation_time");
00079
00080
00081 const string meta_ns = db+".ros_message_collections";
00082 if (!conn_->count(meta_ns, BSON("name" << coll)))
00083 {
00084 ROS_DEBUG_NAMED ("create_collection", "Inserting metadata");
00085 typedef typename mt::DataType<M> DataType;
00086 const string datatype = DataType().value();
00087 typedef typename mt::MD5Sum<M> Md5;
00088 const string md5 = Md5().value();
00089 conn_->insert(meta_ns, BSON("name" << coll << "type" << datatype
00090 << "md5sum" << md5));
00091 }
00092 else
00093 ROS_DEBUG_NAMED("create_collection", "Not inserting metadata");
00094
00095 if (insertion_pub_.getNumSubscribers()==0)
00096 {
00097 ros::WallDuration d(0.1);
00098 ROS_DEBUG_STREAM_NAMED ("create_collection",
00099 "Waiting " << d.toSec() <<
00100 " for any additional notification subscribers");
00101 d.sleep();
00102 }
00103 }
00104
00105 template <class M>
00106 MessageCollection<M>& MessageCollection<M>::ensureIndex
00107 (const string& field)
00108 {
00109 conn_->ensureIndex(ns_, BSON(field << 1));
00110 return *this;
00111 }
00112
00113 template <class M>
00114 void MessageCollection<M>::insert
00115 (const M& msg, const Metadata& metadata)
00116 {
00118 const mongo::BSONObj bson = metadata;
00119 mongo::OID id;
00120 bson["_id"].Val(id);
00121
00123 const size_t serial_size = ser::serializationLength(msg);
00124 boost::shared_array<uint8_t> buffer(new uint8_t[serial_size]);
00125 ser::OStream stream(buffer.get(), serial_size);
00126 ser::serialize(stream, msg);
00127 const char* data = (char*) buffer.get();
00128
00129
00130 mongo::BSONObj file_obj = gfs_->storeFile(data, serial_size, id.toString());
00131
00132
00133 mongo::BSONObjBuilder builder;
00134 builder.appendElements(bson);
00135 mongo::OID blob_id;
00136 file_obj["_id"].Val(blob_id);
00137 builder.append("blob_id", blob_id);
00138 mongo::BSONObj entry = builder.obj();
00139 conn_->insert(ns_, entry);
00140
00141
00142
00143 std_msgs::String notification;
00144 notification.data = entry.jsonString();
00145 insertion_pub_.publish(notification);
00146 }
00147
00148 template <class M>
00149 typename QueryResults<M>::range_t
00150 MessageCollection<M>::queryResults (const mongo::Query& query,
00151 const bool metadata_only,
00152 const string& sort_by,
00153 const bool ascending) const
00154 {
00155 mongo::Query copy(query.obj);
00156 if (sort_by.size() > 0)
00157 copy.sort(sort_by, ascending ? 1 : -1);
00158 return typename QueryResults<M>::range_t
00159 (ResultIterator<M>(conn_, ns_, copy, gfs_, metadata_only),
00160 ResultIterator<M>());
00161 }
00162
00163
00164 template <class M>
00165 vector <typename MessageWithMetadata<M>::ConstPtr>
00166 MessageCollection<M>::pullAllResults (const mongo::Query& query,
00167 const bool metadata_only,
00168 const string& sort_by,
00169 const bool ascending) const
00170 {
00171 typename QueryResults<M>::range_t res = queryResults(query, metadata_only,
00172 sort_by, ascending);
00173 return vector<typename MessageWithMetadata<M>::ConstPtr>
00174 (res.first, res.second);
00175 }
00176
00177
00178 template <class M>
00179 typename MessageWithMetadata<M>::ConstPtr
00180 MessageCollection<M>::findOne (const Query& q,const bool metadata_only) const
00181 {
00182 typename QueryResults<M>::range_t res = queryResults(q, metadata_only);
00183 if (res.first==res.second)
00184 throw NoMatchingMessageException(ns_);
00185 return *res.first;
00186 }
00187
00188
00189 template <class M>
00190 unsigned MessageCollection<M>::removeMessages (const mongo::Query& query)
00191 {
00192 unsigned num_removed = 0;
00193 typedef typename MessageWithMetadata<M>::ConstPtr Msg;
00194 vector<Msg> msgs = pullAllResults(query, true);
00195 conn_->remove(ns_, query);
00196
00197
00198 BOOST_FOREACH (Msg m, msgs)
00199 {
00200 mongo::OID id;
00201 m->metadata["_id"].Val(id);
00202 gfs_->removeFile(id.toString());
00203 num_removed++;
00204 }
00205
00206 return num_removed;
00207 }
00208
00209 template <class M>
00210 void MessageCollection<M>::modifyMetadata (const Query& q, const Metadata& m)
00211 {
00212 typename MessageWithMetadata<M>::ConstPtr orig = findOne(q, true);
00213
00214 mongo::BSONObjBuilder new_meta_builder;
00215
00216 std::set<std::string> fields;
00217 m.getFieldNames(fields);
00218
00219 BOOST_FOREACH (const string& f, fields)
00220 {
00221 if ((f!="_id") && (f!="creation_time"))
00222 new_meta_builder.append(BSON("$set" << BSON(f << m.getField(f))).\
getField("$set"));
00223 }
00224
00225 mongo::BSONObj new_meta = new_meta_builder.obj().copy();
00226 conn_->update(ns_, q, new_meta);
00227 }
00228
00229 template <class M>
00230 unsigned MessageCollection<M>::count ()
00231 {
00232 return conn_->count(ns_);
00233 }
00234
00235 }
00236