00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00040 #include <mongo_ros/exceptions.h>
00041 #include <mongo_ros/mongo_ros.h>
00042 #include <std_msgs/String.h>
00043 #include <boost/format.hpp>
00044 #include <boost/foreach.hpp>
00045
00046 namespace mongo_ros
00047 {
00048
00049 using std::string;
00050 using std::vector;
00051 namespace ser=ros::serialization;
00052 namespace mt=ros::message_traits;
00053
00054
00055 template <class M>
00056 MessageCollection<M>::MessageCollection (const string& db,
00057 const string& coll,
00058 const string& db_host,
00059 const unsigned db_port) :
00060 ns_(db+"."+coll),
00061 insertion_pub_(nh_.advertise<std_msgs::String>("warehouse/"+db+"/"+coll+"/inserts", 100))
00062 {
00063 initialize(db, coll, db_host, db_port);
00064 }
00065
00066
00067 template <class M>
00068 void MessageCollection<M>::initialize (const string& db, const string& coll,
00069 const string& host,
00070 const unsigned port)
00071 {
00072 conn_ = makeDbConnection(nh_, host, port);
00073
00074 gfs_.reset(new mongo::GridFS(*conn_, db));
00075 ROS_DEBUG_NAMED ("create_collection", "Constructed collection");
00076
00077 ensureIndex("creation_time");
00078
00079
00080 const string meta_ns = db+".ros_message_collections";
00081 if (!conn_->count(meta_ns, BSON("name" << coll)))
00082 {
00083 ROS_DEBUG_NAMED ("create_collection", "Inserting metadata");
00084 typedef typename mt::DataType<M> DataType;
00085 const string datatype = DataType().value();
00086 typedef typename mt::MD5Sum<M> Md5;
00087 const string md5 = Md5().value();
00088 conn_->insert(meta_ns, BSON("name" << coll << "type" << datatype
00089 << "md5sum" << md5));
00090 }
00091 else
00092 ROS_DEBUG_NAMED("create_collection", "Not inserting metadata");
00093
00094 if (insertion_pub_.getNumSubscribers()==0)
00095 {
00096 ros::WallDuration d(0.1);
00097 ROS_DEBUG_STREAM_NAMED ("create_collection",
00098 "Waiting " << d.toSec() <<
00099 " for any additional notification subscribers");
00100 d.sleep();
00101 }
00102 }
00103
00104 template <class M>
00105 MessageCollection<M>& MessageCollection<M>::ensureIndex
00106 (const string& field)
00107 {
00108 conn_->ensureIndex(ns_, BSON(field << 1));
00109 return *this;
00110 }
00111
00112 template <class M>
00113 void MessageCollection<M>::insert
00114 (const M& msg, const Metadata& metadata)
00115 {
00117 const mongo::BSONObj bson = metadata;
00118 mongo::OID id;
00119 bson["_id"].Val(id);
00120
00122 const size_t serial_size = ser::serializationLength(msg);
00123 boost::shared_array<uint8_t> buffer(new uint8_t[serial_size]);
00124 ser::OStream stream(buffer.get(), serial_size);
00125 ser::serialize(stream, msg);
00126 const char* data = (char*) buffer.get();
00127
00128
00129 mongo::BSONObj file_obj = gfs_->storeFile(data, serial_size, id.toString());
00130
00131
00132 mongo::BSONObjBuilder builder;
00133 builder.appendElements(bson);
00134 mongo::OID blob_id;
00135 file_obj["_id"].Val(blob_id);
00136 builder.append("blob_id", blob_id);
00137 mongo::BSONObj entry = builder.obj();
00138 conn_->insert(ns_, entry);
00139
00140
00141
00142 std_msgs::String notification;
00143 notification.data = entry.jsonString();
00144 insertion_pub_.publish(notification);
00145 }
00146
00147 template <class M>
00148 typename QueryResults<M>::range_t
00149 MessageCollection<M>::queryResults (const mongo::BSONObj& query,
00150 const bool metadata_only,
00151 const string& sort_by,
00152 const bool ascending) const
00153 {
00154 mongo::Query bson(query.copy());
00155 if (sort_by.size() > 0)
00156 bson = bson.sort(sort_by, ascending ? 1 : -1);
00157 return queryResults(bson, metadata_only);
00158 }
00159
00160 template <class M>
00161 typename QueryResults<M>::range_t
00162 MessageCollection<M>::queryResults (const mongo::Query& bson,
00163 const bool metadata_only) const
00164 {
00165 return typename QueryResults<M>::range_t
00166 (ResultIterator<M>(conn_, ns_, bson, gfs_, metadata_only),
00167 ResultIterator<M>());
00168 }
00169
00170 template <class M>
00171 vector <typename MessageWithMetadata<M>::ConstPtr>
00172 MessageCollection<M>::pullAllResults (const mongo::BSONObj& query,
00173 const bool metadata_only,
00174 const string& sort_by,
00175 const bool ascending) const
00176 {
00177 typename QueryResults<M>::range_t res = queryResults(query, metadata_only,
00178 sort_by, ascending);
00179 return vector<typename MessageWithMetadata<M>::ConstPtr>
00180 (res.first, res.second);
00181 }
00182
00183
00184 template <class M>
00185 typename MessageWithMetadata<M>::ConstPtr
00186 MessageCollection<M>::findOne (const Query& q,const bool metadata_only) const
00187 {
00188 typename QueryResults<M>::range_t res = queryResults(q, metadata_only);
00189 if (res.first==res.second)
00190 throw NoMatchingMessageException(ns_);
00191 return *res.first;
00192 }
00193
00194
00195 template <class M>
00196 unsigned MessageCollection<M>::removeMessages (const mongo::BSONObj& query)
00197 {
00198 unsigned num_removed = 0;
00199 typedef typename MessageWithMetadata<M>::ConstPtr Msg;
00200 vector<Msg> msgs = pullAllResults(query, true);
00201 conn_->remove(ns_, query);
00202
00203
00204 BOOST_FOREACH (Msg m, msgs)
00205 {
00206 mongo::OID id;
00207 m->metadata["_id"].Val(id);
00208 gfs_->removeFile(id.toString());
00209 num_removed++;
00210 }
00211
00212 return num_removed;
00213 }
00214
00215 template <class M>
00216 void MessageCollection<M>::modifyMetadata (const Query& q, const Metadata& m)
00217 {
00218 typename MessageWithMetadata<M>::ConstPtr orig = findOne(q, true);
00219
00220 mongo::BSONObjBuilder new_meta_builder;
00221
00222 set<string> fields;
00223 m.getFieldNames(fields);
00224
00225 BOOST_FOREACH (const string& f, fields)
00226 {
00227 if ((f!="_id") && (f!="creation_time"))
00228 new_meta_builder.append(BSON("$set" << BSON(f << m.getField(f))).\
00229 getField("$set"));
00230 }
00231
00232 mongo::BSONObj new_meta = new_meta_builder.obj().copy();
00233 conn_->update(ns_, q, new_meta);
00234 }
00235
00236 template <class M>
00237 unsigned MessageCollection<M>::count ()
00238 {
00239 return conn_->count(ns_);
00240 }
00241
00242 }