TopicManager.h
Go to the documentation of this file.
00001 /*------------------------------------------------------------------------
00002  *---------------------           ros_rt_wmp              --------------------
00003  *------------------------------------------------------------------------
00004  *                                                         V0.1B  15/09/11
00005  *
00006  *
00007  *  File: ./src/TopicManager.h
00008  *  Authors: Danilo Tardioli
00009  *  ----------------------------------------------------------------------
00010  *  Copyright (C) 2000-2011, Universidad de Zaragoza, SPAIN
00011  *
00012  *  Contact Addresses: Danilo Tardioli                   dantard@unizar.es
00013  *
00014  *  RT-WMP is free software; you can  redistribute it and/or  modify it
00015  *  under the terms of the GNU General Public License  as published by the
00016  *  Free Software Foundation;  either  version 2, or (at  your option) any
00017  *  later version.
00018  *
00019  *  RT-WMP  is distributed  in the  hope  that  it will be   useful, but
00020  *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
00021  *  MERCHANTABILITY  or  FITNESS FOR A  PARTICULAR PURPOSE.    See the GNU
00022  *  General Public License for more details.
00023  *
00024  *  You should have received  a  copy of  the  GNU General Public  License
00025  *  distributed with RT-WMP;  see file COPYING.   If not,  write to the
00026  *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
00027  *  02111-1307, USA.
00028  *
00029  *  As a  special exception, if you  link this  unit  with other  files to
00030  *  produce an   executable,   this unit  does  not  by  itself cause  the
00031  *  resulting executable to be covered by the  GNU General Public License.
00032  *  This exception does  not however invalidate  any other reasons why the
00033  *  executable file might be covered by the GNU Public License.
00034  *
00035  *----------------------------------------------------------------------*/
00036 
00037 #ifndef TOPICMANAGER_H_
00038 #define TOPICMANAGER_H_
00039 
00040 #pragma GCC diagnostic ignored "-Warray-bounds"
00041 
00042 #include "Manager.h"
00043         #include <zlib.h>
00044 
00045 template<class T> class TopicManager: public Manager {
00046 protected:
00047         std::map<std::string, info_t> flows_map;
00048         ros::Subscriber sub;
00049         int counter;
00050 
00051 public:
00052 
00053         TopicManager(ros::NodeHandle * n, int port, std::string name,
00054                         std::string source, std::string destination, unsigned char priority, bool broadcast) :
00055                 Manager(n, port, name, priority) {
00056                 amIstatic = true;
00057                 setBroadcast(broadcast);
00058                 setSource(source);
00059                 setDestination(destination);
00060                 init();
00061         }
00062 
00063         TopicManager(ros::NodeHandle * n, int port, std::string name, std::string source, unsigned char priority, bool broadcast) :
00064                 Manager(n, port, name, priority) {
00065                 amIstatic = false;
00066                 setBroadcast(broadcast);
00067                 setSource(source);
00068                 init();
00069         }
00070 
00071         virtual void startRX(){
00072                 ROSWMP_DEBUG(stderr,"am I dest? %d", amIdst);
00073                 if (amIdst) {
00074                         ROSWMP_DEBUG(stderr,"Queue subscribed (%s) port : %d", name.c_str(), port);
00075                         boost::thread(boost::bind(&Manager::run, this));
00076                 }
00077         }
00078 
00079         virtual bool isHost() {
00080                 return amIsrc;
00081         }
00082 
00083 
00084         void init() {
00085 
00086                 if (amIsrc) {
00087                         std::ostringstream s;
00088                         s << n->getNamespace() << "/tx/" << name;
00089                         sub = n->subscribe(s.str(), 1, &TopicManager::callback, this);
00090                         ROSWMP_DEBUG(stderr,"Callback subscribed (%s)\n", s.str().c_str());
00091 
00092                         if (! amIstatic){
00093                                 init_param();
00094                         }
00095 
00096                         std::ostringstream s1;
00097                         s1 << n->getNamespace() << "/" << name << "/decimation";
00098                         decimation = s1.str();
00099                         n->setParam(decimation, 1);
00100                 }
00101                 counter = 1;
00102         }
00103 
00104         bool shouldDecimate(){
00105                 int val;
00106                 n->getParamCached(decimation, val);
00107                 if (val < 1){
00108                         n->setParam(decimation, 1);
00109                         val = 1;
00110                 }
00111                 if (counter == val){
00112                         counter = 1;
00113                         return false;
00114                 }else{
00115                         counter ++ ;
00116                         return true;
00117                 }
00118         }
00119 
00120         virtual void fillDestination(const boost::shared_ptr<T const> & message) {
00121                 std::string value;
00122                 n->getParamCached(param_dest, value);
00123                 ROSWMP_DEBUG(stderr,"Get param cached: %s\n",value.c_str());
00124                 setDestination(value);
00125         }
00126 
00127         virtual int getPriority(const boost::shared_ptr<T const> & message) {
00128                 return flow_prio;
00129         }
00130 
00131         virtual int getSubPort(T & pm) {
00132                 return 0;
00133         }
00134         virtual std::string getSubTopic(T & pm) {
00135                 return "x";
00136         }
00137         virtual void callback(const boost::shared_ptr<T const> & message) {
00138                 if (!justone && (stopped || shouldDecimate())) {
00139                         return;
00140                 }
00141                 justone = false;
00142                 if (!amIstatic){
00143                         fillDestination(message);
00144                 }
00145 
00146                 if (dests.size()>0) {
00147                         int priority = getPriority(message);
00148                         int n = serialize<T>((char*)(sbuff + sizeof(flow_t)), message);
00149                         ROSWMP_DEBUG(stderr,"Serializing size %d\n",n);
00150                         int bc_dest = computeBroadcastDest();
00151                         ROSWMP_DEBUG(stderr, "Push BC Message, size %d dest %d name %s\n", n + sizeof(flow_t),bc_dest,name.c_str());
00152 
00153 
00154 //                      int size = n;
00155 //                      char * p = sbuff + sizeof(flow_t);
00156 //                      Bytef * zd = (Bytef *) malloc(500000);
00157 //                      uLongf zlen = 500000;
00158 //                      int err = compress(zd, &zlen, (const Bytef*) p, (uLong) size);
00159 //                      if (err != Z_OK) {
00160 //                              fprintf(stderr,"UNABLE to compress, sending uncompressed");
00161 //                      } else {
00162 //                              //memcpy(p, (char*) zd, zlen);
00163 //                              size = zlen;
00164 //                              //data_pointer = (char*) zd;
00165 //                      }
00166 //                      fprintf(stderr, "Post compress :%d\n", size);
00167 
00168 
00169 
00170 
00171                         wmpPushData(port, sbuff, n + sizeof(flow_t), bc_dest, priority);
00172                 }
00173         }
00174         virtual bool popMessage(T & pm, unsigned int & size, unsigned char & src1, signed char & pri ){
00175                 char * p;
00176                 int idx = wmpPopData(port, &p, &size, &src1, &pri);
00177                 size = size - sizeof(flow_t);
00178                 if (!deserialize<T> (p + sizeof(flow_t), size, pm)) {
00179                         ROS_ERROR("Deserialize error\n");
00180                         wmpPopDataDone(idx);
00181                         return false;
00182                 }
00183                 wmpPopDataDone(idx);
00184                 return true;
00185         }
00186 
00187         virtual void run() {
00188                 T pm;
00189                 while (ros::ok()) {
00190 
00191                         signed char pri;
00192                         unsigned int size;
00193                         unsigned char src1;
00194 
00195                         if (!popMessage(pm, size, src1, pri)) {
00196                                 continue;
00197                         }
00198 
00199                         int subPort = getSubPort(pm);
00200                         std::string subTopic = getSubTopic(pm);
00201 
00202                         ROSWMP_DEBUG(stderr, "Received %s on Manager src %d \n", name.c_str(),src1);
00203 
00204                         std::ostringstream hash;
00205                         hash << n->getNamespace() << "/rx/R" << (int) src1 << "/" << name; //XXX: << "/compressed";
00206 
00207                         if (subTopic.compare("x") != 0) {
00208                                 hash << "/" << subTopic;
00209                         }
00210                         if (subPort > 0) {
00211                                 hash << "/" << subPort;
00212                         }
00213 
00214                         ROSWMP_DEBUG(stderr, "Received %s on Manager\n", hash.str().c_str());
00215 
00216 
00217                         if (flows_map.find(hash.str()) == flows_map.end()) {
00218                                 flows_map[hash.str()].publisher = n->advertise<T> (hash.str(),1);
00219                         }
00220 
00221                         flows_map[hash.str()].publisher.publish(pm);
00222                         ROSWMP_DEBUG(stderr, "Published (port:%d)\n!", port);
00223                 }
00224         }
00225 };
00226 
00227 #endif /* TOPICMANAGER_H_ */


ros_rt_wmp
Author(s): Danilo Tardioli, dantard@unizar.es
autogenerated on Fri Jan 3 2014 12:07:56