00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef ROSCPP_SESSIONS_H
00036 #define ROSCPP_SESSIONS_H
00037
00038 #include <string>
00039 #include <sstream>
00040 #include <map>
00041 #include <list>
00042 #include <ros/node_handle.h>
00043 #include <ros/service.h>
00044 #include <ros/service_traits.h>
00045 #include <boost/thread/thread.hpp>
00046 #include <boost/thread/mutex.hpp>
00047 #include <boost/thread/condition.hpp>
00048 #include <boost/bind.hpp>
00049 #include <boost/shared_ptr.hpp>
00050
00051 namespace ros {
00052 namespace session {
00053
00054 class abstractSession
00055 {
00056 public:
00057 abstractSession(const std::string session_name, int sessionid) : _sessionid(sessionid), bterminated(false) {
00058 std::stringstream ss; ss << _sessionid;
00059 _session_name = _nh.resolveName(session_name);
00060 size_t pos = _session_name.rfind('/');
00061 if( pos == std::string::npos )
00062 _session_dir = "/";
00063 else
00064 _session_dir = _session_name.substr(0,pos+1);
00065 _sessionheader[_session_name] = ss.str();
00066 _threadasync = boost::thread(boost::bind(&abstractSession::_AsyncWorkerThread, this));
00067 }
00068
00069 virtual ~abstractSession() {
00070 if( !bterminated ) {
00071 close_connections();
00072 bterminated = true;
00073 }
00074
00075 {
00076 boost::mutex::scoped_lock lock(async_mutex);
00077 _condHasAsyncCalls.notify_all();
00078 }
00079 _threadasync.join();
00080 }
00081
00085 template<class MReq, class MRes>
00086 bool call(const std::string &service_name, MReq &req, MRes &res, bool bAsync=false)
00087 {
00088 if( service_name.size() == 0 ) {
00089 ROS_WARN("service name empty");
00090 return false;
00091 }
00092
00093 boost::mutex::scoped_lock servlock(serv_mutex);
00094 if( bterminated ) {
00095 ROS_WARN("session instance already terminated");
00096 return false;
00097 }
00098
00099
00100
00101
00102
00103
00104 namespace st = service_traits;
00105 if (strcmp(st::md5sum(req), st::md5sum(res))) {
00106 ROS_ERROR("The request and response parameters to the service "
00107 "call must be autogenerated from the same "
00108 "server definition file (.srv). your service call "
00109 "for %s appeared to use request/response types "
00110 "from different .srv files. (%s vs. %s)", service_name.c_str(), st::md5sum(req), st::md5sum(res));
00111 return false;
00112 }
00113
00114
00115
00116
00117 std::map<std::string, ServiceClient>::iterator itserv = _mapservices.find(service_name);
00118 ServiceClient handle;
00119
00120 if( itserv != _mapservices.end() ) {
00121 if( itserv->second.isValid() ) {
00122 handle = itserv->second;
00123 }
00124 else {
00125 _mapservices.erase(itserv);
00126 }
00127 }
00128
00129 if( !handle ) {
00130
00131 std::string global_service_name;
00132 if( service_name[0] != '/' ) {
00133 global_service_name = _session_dir + service_name;
00134 }
00135 else {
00136 global_service_name = service_name;
00137 }
00138 ros::ServiceClientOptions ops(global_service_name, st::md5sum(req), true, _sessionheader);
00139 handle = _nh.serviceClient(ops);
00140 if( !handle ) {
00141 return false;
00142 }
00143 _mapservices[service_name] = handle;
00144 }
00145
00146 if( bAsync ) {
00147
00148 boost::mutex::scoped_lock lock(async_mutex);
00149 listAsyncCalls.push_back(AsyncCallBasePtr(new AsyncCall<MReq,MRes>(handle, st::md5sum(req), new MReq(req), new MRes(res))));
00150 return true;
00151 }
00152
00153
00154 _flush_async();
00155 return handle.call(req, res);
00156 }
00157
00159 const std::string& GetSessionName() const {
00160 return _session_name;
00161 }
00162 int GetSessionId() const {
00163 return _sessionid;
00164 }
00165
00167 virtual bool close_connections()
00168 {
00169 if( bterminated ) {
00170 ROS_WARN("session instance already terminated");
00171 return false;
00172 }
00173
00174 boost::mutex::scoped_lock lock(serv_mutex);
00175 _flush_async();
00176 _mapservices.clear();
00177
00178 return true;
00179 }
00180
00182 virtual bool terminate() = 0;
00183 virtual bool isterminated() const {
00184 return bterminated;
00185 }
00186
00187 private:
00188 ros::NodeHandle _nh;
00189 std::string _session_name, _session_dir;
00190 int _sessionid;
00191 M_string _sessionheader;
00192
00193 std::map<std::string, ServiceClient> _mapservices;
00194
00195 class AsyncCallBase
00196 {
00197 public:
00198 AsyncCallBase(ros::ServiceClient handle, const std::string& server_md5sum) : _handle(handle), _server_md5sum(server_md5sum) {
00199 }
00200 virtual ~AsyncCallBase() {
00201 }
00202 virtual bool call() = 0;
00203 ros::ServiceClient _handle;
00204 std::string _server_md5sum;
00205 };
00206 typedef boost::shared_ptr<AsyncCallBase> AsyncCallBasePtr;
00207
00208 template<class MReq, class MRes>
00209 class AsyncCall : public AsyncCallBase
00210 {
00211 public:
00212 AsyncCall(ros::ServiceClient handle, const std::string& server_md5sum, MReq* preq, MRes* pres)
00213 : AsyncCallBase(handle, server_md5sum), req(preq), res(pres) {
00214 }
00215 virtual bool call() {
00216 return _handle.call(*req.get(), *res.get(), _server_md5sum);
00217 }
00218 boost::shared_ptr<MReq> req;
00219 boost::shared_ptr<MRes> res;
00220 };
00221
00222 std::list< AsyncCallBasePtr > listAsyncCalls;
00223 boost::thread _threadasync;
00224
00225 void _AsyncWorkerThread()
00226 {
00227 boost::mutex::scoped_lock lock(async_mutex);
00228
00229 while(!bterminated) {
00230 if( listAsyncCalls.size() == 0 ) {
00231 _condHasAsyncCalls.wait(lock);
00232 }
00233 if( bterminated ) {
00234 break;
00235 }
00236
00237 ROS_ASSERT(listAsyncCalls.size()>0);
00238 if( listAsyncCalls.size() == 0 ) {
00239 continue;
00240 }
00241
00242 AsyncCallBasePtr asynccall = listAsyncCalls.front();
00243 lock.unlock();
00244
00245 if( !asynccall->call() ) {
00246 ROS_WARN("async service call failed\n");
00247 }
00248
00249 lock.lock();
00250 ROS_ASSERT(listAsyncCalls.size() > 0);
00251 ROS_ASSERT(listAsyncCalls.front()->_handle == asynccall->_handle);
00252 listAsyncCalls.pop_front();
00253
00254 if( listAsyncCalls.size() == 0 ) {
00255 _condAsyncCallsFlushed.notify_all();
00256 }
00257 }
00258 }
00259
00260 protected:
00262 virtual void _flush_async()
00263 {
00264 boost::mutex::scoped_lock lock(async_mutex);
00265 if( listAsyncCalls.size() == 0 ) {
00266 return;
00267 }
00268 _condHasAsyncCalls.notify_all();
00269 _condAsyncCallsFlushed.wait(async_mutex);
00270 ROS_ASSERT( listAsyncCalls.size() == 0 );
00271 }
00272
00273 boost::mutex serv_mutex, async_mutex;
00274 boost::condition _condHasAsyncCalls, _condAsyncCallsFlushed;
00275 bool bterminated;
00276 };
00277
00278 template<class MReq, class MRes>
00279 class Session : public abstractSession
00280 {
00281 public:
00282 Session(const std::string _session_name, int _sessionid)
00283 : abstractSession(_session_name, _sessionid) {
00284 }
00285
00286 Session(const std::string _session_name, int _sessionid, const MReq& req, const MRes& res)
00287 : abstractSession(_session_name, _sessionid), session_req(req), session_res(res) {
00288 }
00289 virtual ~Session() {
00290 }
00291 virtual bool terminate() {
00292 if( bterminated ) {
00293 ROS_WARN("session instance already terminated");
00294 return false;
00295 }
00296 close_connections();
00297 boost::mutex::scoped_lock lock(serv_mutex);
00298 session_req.sessionid = GetSessionId();
00299 bool bsuccess = service::call(GetSessionName(), session_req, session_res);
00300 bterminated = true;
00301 return bsuccess;
00302 }
00303
00304 protected:
00305 MReq session_req;
00306 MRes session_res;
00307 };
00308
00310 typedef boost::shared_ptr<abstractSession> abstractSessionHandle;
00311
00312 template<class MReq, class MRes>
00313 abstractSessionHandle create_session(const std::string &session_name, MReq &req, MRes &res)
00314 {
00315
00316
00317
00318
00319 if( !service::call(session_name,req,res) )
00320 return abstractSessionHandle();
00321
00322 return abstractSessionHandle(new Session<MReq,MRes>(session_name, res.sessionid, req, res));
00323 }
00324
00325 }
00326 }
00327
00328 #endif