$search
00001 /* 00002 * Copyright (c) 2008, Willow Garage, Inc. 00003 * All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright 00009 * notice, this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of the Willow Garage, Inc. nor the names of its 00014 * contributors may be used to endorse or promote products derived from 00015 * this software without specific prior written permission. 00016 * 00017 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00018 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00019 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00020 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00021 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00022 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00023 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00024 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00025 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00026 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00027 * POSSIBILITY OF SUCH DAMAGE. 00028 * 00029 */ 00030 00040 #include <mongo_ros/exceptions.h> 00041 #include <mongo_ros/mongo_ros.h> 00042 #include <std_msgs/String.h> 00043 #include <boost/format.hpp> 00044 #include <boost/foreach.hpp> 00045 00046 namespace mongo_ros 00047 { 00048 00049 using std::string; 00050 using std::vector; 00051 namespace ser=ros::serialization; 00052 namespace mt=ros::message_traits; 00053 00054 00055 template <class M> 00056 MessageCollection<M>::MessageCollection (const string& db, 00057 const string& coll, 00058 const string& db_host, 00059 const unsigned db_port) : 00060 ns_(db+"."+coll), 00061 insertion_pub_(nh_.advertise<std_msgs::String>("warehouse/"+db+"/"+coll+"/inserts", 100)) 00062 { 00063 initialize(db, coll, db_host, db_port); 00064 } 00065 00066 00067 template <class M> 00068 void MessageCollection<M>::initialize (const string& db, const string& coll, 00069 const string& host, 00070 const unsigned port) 00071 { 00072 conn_ = makeDbConnection(nh_, host, port); 00073 00074 gfs_.reset(new mongo::GridFS(*conn_, db)); 00075 ROS_DEBUG_NAMED ("create_collection", "Constructed collection"); 00076 00077 ensureIndex("creation_time"); 00078 00079 // Add to the metatable 00080 const string meta_ns = db+".ros_message_collections"; 00081 if (!conn_->count(meta_ns, BSON("name" << coll))) 00082 { 00083 ROS_DEBUG_NAMED ("create_collection", "Inserting metadata"); 00084 typedef typename mt::DataType<M> DataType; 00085 const string datatype = DataType().value(); 00086 typedef typename mt::MD5Sum<M> Md5; 00087 const string md5 = Md5().value(); 00088 conn_->insert(meta_ns, BSON("name" << coll << "type" << datatype 00089 << "md5sum" << md5)); 00090 } 00091 else 00092 ROS_DEBUG_NAMED("create_collection", "Not inserting metadata"); 00093 00094 if (insertion_pub_.getNumSubscribers()==0) 00095 { 00096 ros::WallDuration d(0.1); 00097 ROS_DEBUG_STREAM_NAMED ("create_collection", 00098 "Waiting " << d.toSec() << 00099 " for any additional notification subscribers"); 00100 d.sleep(); 00101 } 00102 } 00103 00104 template <class M> 00105 MessageCollection<M>& MessageCollection<M>::ensureIndex 00106 (const string& field) 00107 { 00108 conn_->ensureIndex(ns_, BSON(field << 1)); 00109 return *this; 00110 } 00111 00112 template <class M> 00113 void MessageCollection<M>::insert 00114 (const M& msg, const Metadata& metadata) 00115 { 00117 const mongo::BSONObj bson = metadata; 00118 mongo::OID id; 00119 bson["_id"].Val(id); 00120 00122 const size_t serial_size = ser::serializationLength(msg); 00123 boost::shared_array<uint8_t> buffer(new uint8_t[serial_size]); 00124 ser::OStream stream(buffer.get(), serial_size); 00125 ser::serialize(stream, msg); 00126 const char* data = (char*) buffer.get(); 00127 00128 // Store in grid fs 00129 mongo::BSONObj file_obj = gfs_->storeFile(data, serial_size, id.toString()); 00130 00131 // Add blob id to metadata and store it in the message collection 00132 mongo::BSONObjBuilder builder; 00133 builder.appendElements(bson); 00134 mongo::OID blob_id; 00135 file_obj["_id"].Val(blob_id); 00136 builder.append("blob_id", blob_id); 00137 mongo::BSONObj entry = builder.obj(); 00138 conn_->insert(ns_, entry); 00139 00140 00141 // Publish ROS notification 00142 std_msgs::String notification; 00143 notification.data = entry.jsonString(); 00144 insertion_pub_.publish(notification); 00145 } 00146 00147 template <class M> 00148 typename QueryResults<M>::range_t 00149 MessageCollection<M>::queryResults (const mongo::BSONObj& query, 00150 const bool metadata_only, 00151 const string& sort_by, 00152 const bool ascending) const 00153 { 00154 mongo::Query bson(query.copy()); 00155 if (sort_by.size() > 0) 00156 bson = bson.sort(sort_by, ascending ? 1 : -1); 00157 return queryResults(bson, metadata_only); 00158 } 00159 00160 template <class M> 00161 typename QueryResults<M>::range_t 00162 MessageCollection<M>::queryResults (const mongo::Query& bson, 00163 const bool metadata_only) const 00164 { 00165 return typename QueryResults<M>::range_t 00166 (ResultIterator<M>(conn_, ns_, bson, gfs_, metadata_only), 00167 ResultIterator<M>()); 00168 } 00169 00170 template <class M> 00171 vector <typename MessageWithMetadata<M>::ConstPtr> 00172 MessageCollection<M>::pullAllResults (const mongo::BSONObj& query, 00173 const bool metadata_only, 00174 const string& sort_by, 00175 const bool ascending) const 00176 { 00177 typename QueryResults<M>::range_t res = queryResults(query, metadata_only, 00178 sort_by, ascending); 00179 return vector<typename MessageWithMetadata<M>::ConstPtr> 00180 (res.first, res.second); 00181 } 00182 00183 00184 template <class M> 00185 typename MessageWithMetadata<M>::ConstPtr 00186 MessageCollection<M>::findOne (const Query& q,const bool metadata_only) const 00187 { 00188 typename QueryResults<M>::range_t res = queryResults(q, metadata_only); 00189 if (res.first==res.second) 00190 throw NoMatchingMessageException(ns_); 00191 return *res.first; 00192 } 00193 00194 00195 template <class M> 00196 unsigned MessageCollection<M>::removeMessages (const mongo::BSONObj& query) 00197 { 00198 unsigned num_removed = 0; 00199 typedef typename MessageWithMetadata<M>::ConstPtr Msg; 00200 vector<Msg> msgs = pullAllResults(query, true); 00201 conn_->remove(ns_, query); 00202 00203 // Also remove the raw messages from gridfs 00204 BOOST_FOREACH (Msg m, msgs) 00205 { 00206 mongo::OID id; 00207 m->metadata["_id"].Val(id); 00208 gfs_->removeFile(id.toString()); 00209 num_removed++; 00210 } 00211 00212 return num_removed; 00213 } 00214 00215 template <class M> 00216 void MessageCollection<M>::modifyMetadata (const Query& q, const Metadata& m) 00217 { 00218 typename MessageWithMetadata<M>::ConstPtr orig = findOne(q, true); 00219 00220 mongo::BSONObjBuilder new_meta_builder; 00221 00222 set<string> fields; 00223 m.getFieldNames(fields); 00224 00225 BOOST_FOREACH (const string& f, fields) 00226 { 00227 if ((f!="_id") && (f!="creation_time")) 00228 new_meta_builder.append(BSON("$set" << BSON(f << m.getField(f))).\ 00229 getField("$set")); 00230 } 00231 00232 mongo::BSONObj new_meta = new_meta_builder.obj().copy(); 00233 conn_->update(ns_, q, new_meta); 00234 } 00235 00236 template <class M> 00237 unsigned MessageCollection<M>::count () 00238 { 00239 return conn_->count(ns_); 00240 } 00241 00242 } // namespace