00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00040 #include <warehouse/result_iterator.h>
00041 #include <warehouse/Metadata.h>
00042 #include <warehouse/PullMessage.h>
00043 #include <warehouse/InitiateQuery.h>
00044 #include <warehouse/RemoveMessages.h>
00045 #include <yaml-cpp/yaml.h>
00046
00047 namespace warehouse
00048 {
00049
00050 using std::string;
00051
00052
00053
00054
00055
00056
00057 template <class M>
00058 Collection<M>::Collection (const std::string& topic, const string& db,
00059 const string& coll, const ros::NodeHandle& nh) :
00060 impl_(new Collection<M>::Impl(topic, db, coll, nh))
00061 {
00062 }
00063
00064 template <class M>
00065 Collection<M>::Collection (const Collection<M>& c) :
00066 impl_(c.impl_)
00067 {
00068 }
00069
00070 template <class M>
00071 Collection<M>::Collection ()
00072 {}
00073
00074 template <class M>
00075 Collection<M>&
00076 Collection<M>::operator= (const Collection<M>& c)
00077 {
00078 impl_ = c.impl_;
00079 return *this;
00080 }
00081
00082 template <class M>
00083 Collection<M>::Impl::Impl (const string& topic, const string& db,
00084 const string& coll, const ros::NodeHandle& nh) :
00085 nh_(nh), cb_queue_(new ros::CallbackQueue()), spinner_(1, cb_queue_.get()),
00086 query_client_(nh_.serviceClient<InitiateQuery>("initiate_query")),
00087 pull_client_(nh_.serviceClient<PullMessage>("pull_message")),
00088 metadata_client_(nh_.serviceClient<Metadata>("add_metadata")),
00089 remove_messages_client_(nh_.serviceClient<RemoveMessages>("remove_messages")),
00090 topic_(topic), db_(db), collection_(coll), initialized_(false),
00091 node_name_(ros::this_node::getName()),
00092 warehouse_sub_connection_received_(false)
00093 {
00094 nh_.setCallbackQueue(cb_queue_.get());
00095 spinner_.start();
00096 setupConnections();
00097 }
00098
00099 inline
00100 void waitForService (ros::ServiceClient& client)
00101 {
00102 while (ros::ok()) {
00103 if (client.waitForExistence(ros::Duration(5.0)))
00104 return;
00105 ROS_INFO_NAMED("init", "Waiting for service %s to exist",
00106 client.getService().c_str());
00107 }
00108 }
00109
00110
00111
00112
00113
00114 template <class M>
00115 void Collection<M>::Impl::setupConnections ()
00116 {
00117 ROS_ASSERT (!initialized_);
00118 pub_ = nh_.advertise<M>(topic_, 10, boost::bind(&Collection<M>::Impl::\
00119 connectCallback, this, _1));
00120
00121
00122 ros::Rate r(10);
00123 while (!warehouse_sub_connection_received_ && ros::ok()) {
00124 ROS_DEBUG_STREAM_NAMED ("setup_collection", "Waiting for warehouse connection");
00125 r.sleep();
00126 ros::spinOnce();
00127 }
00128
00129 const string notification_topic = "warehouse/" + db_ + "/" + collection_ + "/notify";
00130 sub_ = nh_.subscribe(notification_topic, 10,
00131 &Collection<M>::Impl::notificationCallback, this);
00132
00133
00134
00135
00136 while (sub_.getNumPublishers()==0) {
00137 ROS_DEBUG_STREAM_NAMED ("setup_collection",
00138 "Waiting for connection to warehouse on " <<
00139 notification_topic);
00140 r.sleep();
00141 ros::spinOnce();
00142 }
00143
00144 ROS_DEBUG_STREAM_NAMED ("setup_collection", "Waiting for services to exist");
00145 waitForService(query_client_);
00146 waitForService(pull_client_);
00147 waitForService(metadata_client_);
00148 waitForService(remove_messages_client_);
00149 initialized_ = true;
00150 ROS_DEBUG_STREAM_NAMED ("setup_collection", "Collection has been set up");
00151 }
00152
00153
00154
00155
00156
00157
00158 template <class M>
00159 void Collection<M>::Impl::notificationCallback (const UpdateNotification& update)
00160 {
00161 ROS_DEBUG_NAMED ("publish_int", "In notification callback");
00162 boost::lock_guard<boost::mutex> l(mutex_);
00163 if (update.update_type == UpdateNotification::NEW_MESSAGE &&
00164 update.sender == node_name_) {
00165 ROS_DEBUG_STREAM_NAMED ("publish_int", "Received ack from " << node_name_);
00166 last_id_ = update.id;
00167 cond_.notify_all();
00168 }
00169 else
00170 ROS_DEBUG_STREAM_NAMED ("publish_int", "Ignoring update " << update <<
00171 " in notification callback");
00172 }
00173
00174
00175 template <class M>
00176 void Collection<M>::Impl::connectCallback (const ros::SingleSubscriberPublisher& pub)
00177 {
00178 boost::mutex::scoped_lock l(mutex_);
00179 const string sub_name = pub.getSubscriberName();
00180 if (sub_name == nh_.resolveName("warehouse")) {
00181 warehouse_sub_connection_received_ = true;
00182 ROS_DEBUG_STREAM_NAMED ("connect_callback", "Received connection callback from " << sub_name);
00183 }
00184 else
00185 ROS_WARN_STREAM_NAMED ("connect_callback", "Received subscriber connection from " <<
00186 sub_name << "; assuming this is not the actual warehouse");
00187 }
00188
00189
00190
00191
00192
00193
00194 template <class M>
00195 typename QueryResults<M>::range_t
00196 Collection<M>::queryResults (const vector<Condition>& conditions,
00197 const bool metadata_only,
00198 const OrderingCriterion& order) const
00199 {
00200 ROS_ASSERT(impl_ && impl_->initialized_);
00201 boost::mutex::scoped_lock l(impl_->mutex_);
00202
00203
00204 InitiateQuery srv;
00205 srv.request.db_name = impl_->db_;
00206 srv.request.collection_name = impl_->collection_;
00207 srv.request.query = conditions;
00208 srv.request.order = order;
00209 srv.request.metadata_only = metadata_only;
00210
00211
00212 if (!impl_->query_client_.call(srv))
00213 throw WarehouseClientException("Service call to initiate_query failed");
00214 ROS_DEBUG_STREAM_NAMED ("initiate_query", "Service call to initiate query succeeded");
00215
00216
00217 return typename QueryResults<M>::range_t
00218 (ResultIterator<M>(impl_->nh_, srv, impl_->pull_client_, metadata_only),
00219 ResultIterator<M>());
00220 }
00221
00222 template <class M>
00223 vector<typename MessageWithMetadata<M>::ConstPtr >
00224 Collection<M>::pullAllResults (const vector<Condition>& conditions,
00225 const bool metadata_only,
00226 const OrderingCriterion& order) const
00227 {
00228 ROS_ASSERT(impl_ && impl_->initialized_);
00229 typename QueryResults<M>::range_t res = queryResults(conditions, metadata_only, order);
00230 return vector<typename MessageWithMetadata<M>::ConstPtr > (res.first, res.second);
00231 }
00232
00233
00234
00235
00236
00237
00238
00239 inline
00240 string emptyMetadata ()
00241 {
00242 YAML::Emitter out;
00243 out << YAML::BeginMap << YAML::EndMap;
00244 return string(out.c_str());
00245 }
00246
00247
00248
00249 template <class M>
00250 string Collection<M>::publish (const M& msg, const string& metadata)
00251 {
00252 ROS_ASSERT(impl_ && impl_->initialized_);
00253 ROS_DEBUG_NAMED ("publish_int", "Acquiring unique lock");
00254 boost::unique_lock<boost::mutex> l(impl_->mutex_);
00255 impl_->last_id_.reset();
00256 ROS_DEBUG_NAMED ("publish_int", "Publishing message");
00257 impl_->pub_.publish(msg);
00258 while (!impl_->last_id_ && ros::ok()) {
00259 ROS_DEBUG_NAMED ("publish_int", "Waiting on condition variable");
00260 impl_->cond_.wait(l);
00261 }
00262 addMetadata(*impl_->last_id_, metadata.size() > 0 ? metadata : emptyMetadata());
00263 ROS_DEBUG_NAMED ("publish_int", "Added metadata");
00264 return *impl_->last_id_;
00265 }
00266
00267 template <class M>
00268 void Collection<M>::addMetadata (const string& id, const string& metadata)
00269 {
00270 ROS_ASSERT(impl_ && impl_->initialized_);
00271 Metadata srv;
00272 srv.request.db_name = impl_->db_;
00273 srv.request.collection_name = impl_->collection_;
00274 srv.request.id = id;
00275 srv.request.metadata = metadata;
00276
00277
00278 if (!impl_->metadata_client_.call(srv))
00279 throw WarehouseClientException("Service call to add_metadata failed");
00280
00281
00282 if (srv.response.error_code != Metadata::Response::SUCCESS)
00283 throw MetadataException(srv.response.error_code, srv.response.error_msg);
00284 }
00285
00286
00287 template <class M>
00288 unsigned
00289 Collection<M>::removeMessages (const vector<Condition>& conditions)
00290 {
00291 ROS_ASSERT(impl_ && impl_->initialized_);
00292 boost::mutex::scoped_lock l(impl_->mutex_);
00293 RemoveMessages srv;
00294 srv.request.db_name = impl_->db_;
00295 srv.request.collection_name = impl_->collection_;
00296 srv.request.conditions = conditions;
00297
00298 if (!impl_->remove_messages_client_.call(srv))
00299 throw WarehouseClientException("Service call to remove_messages failed");
00300 if (srv.response.error_code != RemoveMessages::Response::SUCCESS)
00301 throw WarehouseClientException
00302 (format("Call to remove_messages returned error %1%: '%2%'") %
00303 srv.response.error_code % srv.response.error_msg);
00304 return srv.response.num_messages_removed;
00305 }
00306
00307
00308
00309
00310
00311 template <class M>
00312 string Collection<M>::collectionName () const
00313 {
00314 return impl_->collection_;
00315 }
00316
00317
00318
00319 }