00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00039 #ifndef POSE_GRAPH_GRAPH_DB_IMPL_H
00040 #define POSE_GRAPH_GRAPH_DB_IMPL_H
00041
00042 #include <pose_graph/exception.h>
00043 #include <graph_mapping_utils/general.h>
00044 #include <ros/ros.h>
00045 #include <sstream>
00046 #include <boost/foreach.hpp>
00047 #include <boost/lexical_cast.hpp>
00048
00049
00050 namespace pose_graph
00051 {
00052
00053 namespace util=graph_mapping_utils;
00054 namespace wh=warehouse;
00055
00056 using std::string;
00057 using std::map;
00058 using std::vector;
00059 using boost::format;
00060
00061 typedef boost::mutex::scoped_lock Lock;
00062
00063 template <class M>
00064 typename M::ConstPtr
00065 getNodeData (wh::Collection<M>* client, const unsigned n)
00066 {
00067 vector<wh::Condition> cond(1);
00068 cond[0].field_name = "node";
00069 cond[0].predicate = wh::Condition::EQUALS;
00070 cond[0].args.push_back((format("%1%") % n).str());
00071 vector<typename wh::MessageWithMetadata<M>::ConstPtr> res =
00072 client->pullAllResults(cond);
00073 if (res.size() != 1)
00074 throw DataNotFoundException(n, res.size());
00075 return res[0];
00076 }
00077
00078
00079 template <class M>
00080 CachedNodeMap<M>::CachedNodeMap (wh::Collection<M>* coll) :
00081 initialized_(false),
00082 coll_(coll)
00083 {
00084 initialLoad();
00085 initialized_ = true;
00086 }
00087
00088 template <class M>
00089 CachedNodeMap<M>::CachedNodeMap (wh::WarehouseClient* client,
00090 const string& name) :
00091 initialized_(false),
00092 coll_ptr_(new wh::Collection<M>(client->setupCollection<M>
00093 (name, nodeIndex(), nodeIndex()))),
00094 coll_(coll_ptr_.get()),
00095 update_sub_(nh_.subscribe("warehouse/" + client->dbName() + "/" + name +
00096 "/notify", 100, &CachedNodeMap::updateCB, this))
00097 {
00098 initialLoad();
00099 initialized_ = true;
00100 }
00101
00102
00103 template <class M>
00104 void CachedNodeMap<M>::updateCB (const wh::UpdateNotification& m)
00105 {
00106 Lock l(mutex_);
00107 if (initialized_)
00108 {
00109 if (m.update_type == wh::UpdateNotification::UPDATE_METADATA)
00110 {
00111 const int n = wh::getMetadataVal<int>(m.metadata, "node");
00112 const string name = ros::this_node::getName();
00113 if (m.sender != name)
00114 {
00115 map_.erase(n);
00116 }
00117 }
00118 }
00119 else
00120 ROS_INFO_STREAM ("Ignoring update notification " << m <<
00121 " because not initialized yet.");
00122 }
00123
00124 template <class M>
00125 CachedNodeMap<M>::CachedNodeMap (const CachedNodeMap<M>& m) :
00126 initialized_(m.initialized_),
00127 coll_(m.coll_), map_(m.map_), update_sub_(m.update_sub_)
00128 {
00129 ROS_ASSERT(initialized_);
00130 }
00131
00132 template <class M>
00133 void CachedNodeMap<M>::initialLoad ()
00134 {
00135 map<unsigned, typename M::ConstPtr> map;
00136 BOOST_FOREACH (typename wh::MessageWithMetadata<M>::ConstPtr res,
00137 coll_->pullAllResults(vector<wh::Condition>())) {
00138 std::stringstream ss(res->metadata);
00139 YAML::Parser parser(ss);
00140 YAML::Node doc;
00141 const bool found = parser.GetNextDocument(doc);
00142 ROS_ASSERT_MSG(found, "Couldn't parse %s as yaml string",
00143 res->metadata.c_str());
00144 const YAML::Node* pName = doc.FindValue("node");
00145 ROS_ASSERT_MSG(pName, "Could find node metadata in %s",
00146 res->metadata.c_str());
00147
00148 unsigned id;
00149 *pName >> id;
00150 const unsigned n(id);
00151 map[n] = res;
00152 ROS_DEBUG_STREAM_NAMED ("cached_node_map", "Preloaded data for "
00153 << n << " from " << coll_->collectionName());
00154 }
00155 Lock l(mutex_);
00156 map_ = map;
00157 }
00158
00159 template <class M>
00160 typename M::ConstPtr
00161 CachedNodeMap<M>::get (const unsigned n) const
00162 {
00163 typedef map<unsigned, typename M::ConstPtr> Map;
00164 typename Map::const_iterator pos;
00165 {
00166 Lock l(mutex_);
00167 pos = map_.find(n);
00168 }
00169 if (pos == map_.end()) {
00170 ROS_DEBUG_STREAM_NAMED ("cached_node_map", "Getting data for node " << n <<
00171 " from collection " << coll_->collectionName());
00172 typename M::ConstPtr msg = getNodeData(coll_, n);
00173 Lock l(mutex_);
00174 return (map_[n] = msg);
00175 }
00176 else
00177 return pos->second;
00178 }
00179
00180 template <class M>
00181 void CachedNodeMap<M>::removeExisting (const unsigned n)
00182 {
00183 typedef typename wh::MessageWithMetadata<M>::ConstPtr Msg;
00184 vector<wh::Condition> conditions(1);
00185 conditions[0].field_name = "node";
00186 conditions[0].predicate = wh::Condition::EQUALS;
00187 conditions[0].args.push_back(boost::lexical_cast<string>(n));
00188 vector<Msg> existing = coll_->pullAllResults(conditions, true);
00189 if (existing.size()>0)
00190 {
00191 const unsigned nr = coll_->removeMessages(conditions);
00192 if (nr>1)
00193 ROS_WARN ("Unexpected situation: more than one message was found for "
00194 "node %u in collection %s", nr, coll_->collectionName().c_str());
00195 }
00196 Lock l(mutex_);
00197 map_.erase(n);
00198 }
00199
00200 template <class M>
00201 void
00202 CachedNodeMap<M>::set (const unsigned n, typename M::ConstPtr msg)
00203 {
00204 removeExisting(n);
00205 coll_->publish(*msg, nodeMetadata(n));
00206 Lock l(mutex_);
00207 map_[n] = msg;
00208 }
00209
00210 template <class M>
00211 void
00212 CachedNodeMap<M>::set (const unsigned n, const M& msg)
00213 {
00214 removeExisting(n);
00215 coll_->publish(msg, nodeMetadata(n));
00216 typename M::ConstPtr m(new M(msg));
00217 Lock l(mutex_);
00218 map_[n] = m;
00219 }
00220
00221 template <class M>
00222 bool CachedNodeMap<M>::hasData (const unsigned n) const
00223 {
00224 {
00225 Lock l(mutex_);
00226 if (util::contains(map_, n))
00227 return true;
00228 }
00229 vector<wh::Condition> cond(1);
00230 cond[0].field_name = "node";
00231 cond[0].predicate = wh::Condition::EQUALS;
00232 cond[0].args.push_back((format("%1%") % n).str());
00233 typename wh::QueryResults<M>::range_t res = coll_->queryResults(cond);
00234 return (!(res.first == res.second));
00235 }
00236
00237
00238
00239
00240 }
00241
00242 #endif // include guard