00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00040 #include <warehouse/PullMessage.h>
00041 #include <warehouse/exception.h>
00042
00043 namespace warehouse
00044 {
00045
00046 using std::vector;
00047 using std::string;
00048
00050 template <class M>
00051 ResultIterator<M>::ResultIterator (const ros::NodeHandle nh, const InitiateQuery& srv,
00052 ros::ServiceClient& pull_client,
00053 const bool metadata_only) :
00054 sub_(new ResultIterator<M>::QueryResultSubscriber(nh, srv, pull_client, metadata_only))
00055 {}
00056
00057 template <class M>
00058 ResultIterator<M>::QueryResultSubscriber::QueryResultSubscriber (const ros::NodeHandle& nh,
00059 const InitiateQuery& srv,
00060 const ros::ServiceClient& pull_client,
00061 const bool metadata_only) :
00062 nh_(nh), query_id_(srv.response.query_id),
00063 topic_base_("warehouse/" + srv.request.db_name + "/" + srv.request.collection_name + "/"),
00064 data_received_(true),
00065 sub_(nh_.subscribe(topic_base_ + query_id_, 10,
00066 &ResultIterator<M>::QueryResultSubscriber::messageCallback, this)),
00067 metadata_sub_(nh_.subscribe(topic_base_ + "metadata/" + query_id_, 10,
00068 &ResultIterator<M>::QueryResultSubscriber::metadataCallback, this)),
00069 pull_client_(pull_client), metadata_only_(metadata_only), initial_fetch_(false)
00070 {
00071 }
00072
00074 template <class M>
00075 ResultIterator<M>::ResultIterator ()
00076 {
00077 }
00078
00080 template <class M>
00081 ResultIterator<M>::ResultIterator (const ResultIterator<M>& res) :
00082 sub_(res.sub_)
00083
00084 {
00085 }
00086
00087 template <class M>
00088 void
00089 ResultIterator<M>::maybeFetchFirst () const
00090 {
00091 ROS_ASSERT_MSG(sub_, "Code assumption violated");
00092 if (!sub_->initial_fetch_) {
00093 sub_->fetchNext();
00094 sub_->initial_fetch_ = true;
00095 }
00096 }
00097
00098
00099 template <class M>
00100 typename MessageWithMetadata<M>::ConstPtr
00101 ResultIterator<M>::dereference() const
00102 {
00103 maybeFetchFirst();
00104 ROS_ASSERT_MSG(sub_->next_result_, "Can't dereference iterator: no more results");
00105 return sub_->next_result_;
00106 }
00107
00108 template <class M>
00109 void ResultIterator<M>::increment ()
00110 {
00111 maybeFetchFirst();
00112 ROS_ASSERT_MSG(sub_ && sub_->next_result_, "Can't increment iterator past the end");
00113 sub_->fetchNext();
00114 }
00115
00116 template <class M>
00117 void
00118 ResultIterator<M>::QueryResultSubscriber::fetchNext ()
00119 {
00120 PullMessage srv;
00121 srv.request.query_id = query_id_;
00122 data_received_ = metadata_only_;
00123 metadata_received_ = false;
00124 boost::unique_lock<boost::mutex> l(mutex_);
00125 next_result_.reset(new MessageWithMetadata<M>());
00126 if (!pull_client_.call(srv))
00127 throw WarehouseClientException("Service call to pull_message failed");
00128 ROS_DEBUG_NAMED ("query_int", "pulling %u messages", srv.response.num_messages_sent);
00129 if (srv.response.num_messages_sent > 0)
00130 {
00131 ROS_ASSERT(srv.response.num_messages_sent == 1);
00132 ROS_DEBUG_NAMED ("query_int", "Waiting for unique lock");
00133 while (!(data_received_ && metadata_received_) && ros::ok()) {
00134 ROS_DEBUG_NAMED ("query_int", "Waiting on condition variable");
00135 cond.wait(l);
00136 }
00137 }
00138 else
00139 next_result_.reset();
00140 ROS_DEBUG_NAMED ("query_int", "Done fetching");
00141 }
00142
00143
00144 template <class M>
00145 void ResultIterator<M>::QueryResultSubscriber::messageCallback (typename M::ConstPtr msg)
00146 {
00147 ROS_ASSERT_MSG (!data_received_, "Code assumption violated: data received though we didn't ask for it.");
00148 ROS_DEBUG_NAMED ("query_int", "Waiting for lock guard");
00149 boost::lock_guard<boost::mutex> l(mutex_);
00150 M* base = next_result_.get();
00151 *base = *msg;
00152 ROS_DEBUG_STREAM_NAMED ("query_int", "Received message " << *msg);
00153 data_received_ = true;
00154 if (metadata_received_)
00155 cond.notify_all();
00156 }
00157
00158 template <class M>
00159 void ResultIterator<M>::QueryResultSubscriber::metadataCallback (std_msgs::String::ConstPtr m)
00160 {
00161 ROS_ASSERT_MSG (!metadata_received_, "Code assumption violated: data received though we didn't ask for it.");
00162 ROS_DEBUG_NAMED ("query_int", "Waiting for lock guard");
00163 boost::lock_guard<boost::mutex> l(mutex_);
00164 ROS_DEBUG_NAMED ("query_int", "Got lock guard");
00165 next_result_->metadata = m->data;
00166 ROS_DEBUG_NAMED ("query_int", "Received metadata %s", m->data.c_str());
00167 metadata_received_ = true;
00168 if (data_received_)
00169 cond.notify_all();
00170 }
00171
00172
00173 template <class M>
00174 bool ResultIterator<M>::equal (const ResultIterator<M>& other) const
00175 {
00178 if (sub_) {
00179 maybeFetchFirst();
00180 return !sub_->next_result_ && (!other.sub_ || !other.sub_->next_result_);
00181 }
00182 else
00183 return !other.sub_ || !other.sub_->next_result_;
00184 }
00185
00186
00187
00188
00189 }