ServiceManager.h
Go to the documentation of this file.
00001 /*------------------------------------------------------------------------
00002  *---------------------           ros_rt_wmp              --------------------
00003  *------------------------------------------------------------------------
00004  *                                                         V0.1B  15/09/11
00005  *
00006  *
00007  *  File: ./src/ServiceManager.h
00008  *  Authors: Danilo Tardioli
00009  *  ----------------------------------------------------------------------
00010  *  Copyright (C) 2000-2011, Universidad de Zaragoza, SPAIN
00011  *
00012  *  Contact Addresses: Danilo Tardioli                   dantard@unizar.es
00013  *
00014  *  RT-WMP is free software; you can  redistribute it and/or  modify it
00015  *  under the terms of the GNU General Public License  as published by the
00016  *  Free Software Foundation;  either  version 2, or (at  your option) any
00017  *  later version.
00018  *
00019  *  RT-WMP  is distributed  in the  hope  that  it will be   useful, but
00020  *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
00021  *  MERCHANTABILITY  or  FITNESS FOR A  PARTICULAR PURPOSE.    See the GNU
00022  *  General Public License for more details.
00023  *
00024  *  You should have received  a  copy of  the  GNU General Public  License
00025  *  distributed with RT-WMP;  see file COPYING.   If not,  write to the
00026  *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
00027  *  02111-1307, USA.
00028  *
00029  *  As a  special exception, if you  link this  unit  with other  files to
00030  *  produce an   executable,   this unit  does  not  by  itself cause  the
00031  *  resulting executable to be covered by the  GNU General Public License.
00032  *  This exception does  not however invalidate  any other reasons why the
00033  *  executable file might be covered by the GNU Public License.
00034  *
00035  *----------------------------------------------------------------------*/
00036 #ifndef SERVICEMANAGER_H_
00037 #define SERVICEMANAGER_H_
00038 
00039 #include "Manager.h"
00040 
00041 template<class T> class ServiceManager: public Manager {
00042 protected:
00043         ros::Subscriber sub;
00044         ros::ServiceServer service;
00045         sem_t sem_resp;
00046         typename T::Response answ;
00047         int oks;
00048         int host;
00049 public:
00050         ServiceManager(ros::NodeHandle * n, int port, std::string name, int host, unsigned char priority) :
00051                 Manager(n, port, name, priority) {
00052                 std::ostringstream oss;
00053                 if (host != wmpGetNodeId()) {
00054                         oss << n->getNamespace() << "/R" << host << "/" << name;
00055                         service = n->advertiseService(oss.str(), &ServiceManager::callback, this);
00056                 }
00057                 this->host = host;
00058                 amIstatic = true;
00059         };
00060 
00061         void startRX(){
00062                 ROSWMP_DEBUG(stderr,"Starting RX threads\n");
00063 
00064                 createThreads();
00065         }
00066 
00067         ServiceManager(ros::NodeHandle * n, int port, std::string name, unsigned char priority) :
00068                 Manager(n, port, name, priority) {
00069                 this->host = -1;
00070                 amIstatic = false;
00071                 std::ostringstream oss;
00072                 oss << n->getNamespace() << "/remote/" << name;
00073                 service = n->advertiseService(oss.str(), &ServiceManager::callback, this);
00074                 init_param();
00075         }
00076 
00077         virtual void init_param() {
00078                 std::ostringstream s1,s2;
00079                 s1 << n->getNamespace() << "/" << name << "/dest";
00080                 param_dest = s1.str();
00081                 n->setParam(param_dest, host);
00082         }
00083 
00084 
00085         void createThreads() {
00086                 sem_init(&sem_resp, 0, 0);
00087                 boost::thread(boost::bind(&ServiceManager::run, this));
00088                 boost::thread(boost::bind(&ServiceManager::waitNetAnswer, this));
00089         }
00090 
00091 
00092         virtual void fillDestination(typename T::Request &req) {
00093                 int val;
00094                 if (!amIstatic) {
00095                         if (n->getParamCached(param_dest, val)){
00096                                 host = val;
00097                         }
00098                 }
00099         }
00100 
00101         virtual int getPriority(typename T::Request &req) {
00102                 return flow_prio;
00103         }
00104 
00105         virtual bool isHost() {
00106                 return this->host == wmpGetNodeId();
00107         }
00108 
00109         bool callback(typename T::Request &req, typename T::Response &resp) {
00110                 ROSWMP_DEBUG(stderr, "Callback received");
00111 
00112                 int n = serialize<typename T::Request> ((sbuff + sizeof(flow_t)), req);
00113 
00114                 if (!amIstatic){
00115                         fillDestination(req);
00116                 }
00117                 int priority = getPriority(req);
00118                 ROSWMP_DEBUG(stderr, "Sending request with priority: %d and host: %d", priority,host);
00119                 if (host >= 0){
00120                         wmpPushData(port, sbuff, n + sizeof(flow_t), 1 << host, priority);
00121                 } else{
00122                         ROS_ERROR("No destination specified, set .../dest param");
00123                         return false;
00124                 }
00125 
00126                 timespec ts;
00127                 clock_gettime(CLOCK_REALTIME, &ts);
00128                 ts.tv_sec += 3;
00129                 oks = 1;
00130                 if (sem_timedwait(&sem_resp, &ts) == 0 && oks) {
00131                         ROSWMP_DEBUG(stderr,"Received port = %d",port);
00132                         resp = answ;
00133                         return true;
00134                 } else {
00135                         return false;
00136                 }
00137         }
00138 
00139 
00140         void run() {
00141                 typename T::Request req;
00142                 ros::ServiceClient client = n->serviceClient<T> (name);
00143                 while (ros::ok()) {
00144                         char * p;
00145                         signed char pri;
00146                         unsigned int size;
00147                         unsigned char src;
00148                         ROSWMP_DEBUG(stderr, "Popping (wait) on port %d",port);
00149                         int idx = wmpPopData(port, &p, &size, &src, &pri);
00150                         flow_t * f = (flow_t *) p;
00151 
00152                         ROSWMP_DEBUG(stderr, "Deserializing");
00153                         deserialize<typename T::Request>((p + sizeof(flow_t)),size - sizeof(flow_t), req);
00154                         ROSWMP_DEBUG(stderr, "Deserialized");
00155 
00156                         wmpPopDataDone(idx);
00157 
00158                         /* Here I have the request */
00159                         T srv;
00160                         srv.request = req;
00161 
00162                         f = (flow_t *) sbuff;
00163 
00164                         ROSWMP_DEBUG(stderr, "Call service: %s",name.c_str());
00165 
00166                         if (client.call(srv)) {
00167                                 ROSWMP_DEBUG(stderr, "Call ok");
00168                                 f->ok = 1;
00169                         } else {
00170                                 ROSWMP_DEBUG(stderr, "Call error");
00171                                 f->ok = 0;
00172                         }
00173 
00174                         ROSWMP_DEBUG(stderr, "Post call");
00175                         //uint8_t unused = 0;
00176 
00177                         int n = serialize<typename T::Response>((sbuff + sizeof(flow_t)),srv.response);
00178                         //int n = srv.response.serializationLength();
00179                         //srv.response.serialize((uint8_t *) (buff + sizeof(flow_t)), unused);
00180 
00181                         wmpPushData(port + 1, sbuff, n + sizeof(flow_t), 1 << src, getPriority(req));
00182                         ROSWMP_DEBUG(stderr, "Pushed response");
00183                 }
00184         }
00185 
00186         void waitNetAnswer() {
00187                 while (ros::ok()) {
00188                         char * p;
00189                         signed char pri;
00190                         unsigned int size;
00191                         unsigned char src;
00192                         int idx = wmpPopData(port + 1, &p, &size, &src, &pri);
00193                         ROSWMP_DEBUG(stderr, "Received on Manager\n");
00194                         flow_t * f = (flow_t *) p;
00195 
00196                         //answ.deserialize((uint8_t*) (p + sizeof(flow_t)));
00197 
00198                         deserialize<typename T::Response>((p + sizeof(flow_t)), size - sizeof(flow_t), answ);
00199 
00200                         ROSWMP_DEBUG(stderr, "Deserializing Answer\n");
00201                         oks = f->ok;
00202                         wmpPopDataDone(idx);
00203                         ROSWMP_DEBUG(stderr, "Deserialized Answer result = %d  \n", oks);
00204                         sem_post(&sem_resp);
00205                 }
00206         }
00207 };
00208 
00209 
00210 #endif /* SERVICEMANAGER_H_ */


ros_rt_wmp
Author(s): Danilo Tardioli, dantard@unizar.es
autogenerated on Fri Jan 3 2014 12:07:55