00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #ifndef TOPICMANAGER_H_
00038 #define TOPICMANAGER_H_
00039
00040 #pragma GCC diagnostic ignored "-Warray-bounds"
00041
00042 #include "Manager.h"
00043 #include <zlib.h>
00044
00045 template<class T> class TopicManager: public Manager {
00046 protected:
00047 std::map<std::string, info_t> flows_map;
00048 ros::Subscriber sub;
00049 int counter;
00050
00051 public:
00052
00053 TopicManager(ros::NodeHandle * n, int port, std::string name,
00054 std::string source, std::string destination, unsigned char priority, bool broadcast) :
00055 Manager(n, port, name, priority) {
00056 amIstatic = true;
00057 setBroadcast(broadcast);
00058 setSource(source);
00059 setDestination(destination);
00060 init();
00061 }
00062
00063 TopicManager(ros::NodeHandle * n, int port, std::string name, std::string source, unsigned char priority, bool broadcast) :
00064 Manager(n, port, name, priority) {
00065 amIstatic = false;
00066 setBroadcast(broadcast);
00067 setSource(source);
00068 init();
00069 }
00070
00071 virtual void startRX(){
00072 ROSWMP_DEBUG(stderr,"am I dest? %d", amIdst);
00073 if (amIdst) {
00074 ROSWMP_DEBUG(stderr,"Queue subscribed (%s) port : %d", name.c_str(), port);
00075 boost::thread(boost::bind(&Manager::run, this));
00076 }
00077 }
00078
00079 virtual bool isHost() {
00080 return amIsrc;
00081 }
00082
00083
00084 void init() {
00085
00086 if (amIsrc) {
00087 std::ostringstream s;
00088 s << n->getNamespace() << "/tx/" << name;
00089 sub = n->subscribe(s.str(), 1, &TopicManager::callback, this);
00090 ROSWMP_DEBUG(stderr,"Callback subscribed (%s)\n", s.str().c_str());
00091
00092 if (! amIstatic){
00093 init_param();
00094 }
00095
00096 std::ostringstream s1;
00097 s1 << n->getNamespace() << "/" << name << "/decimation";
00098 decimation = s1.str();
00099 n->setParam(decimation, 1);
00100 }
00101 counter = 1;
00102 }
00103
00104 bool shouldDecimate(){
00105 int val;
00106 n->getParamCached(decimation, val);
00107 if (val < 1){
00108 n->setParam(decimation, 1);
00109 val = 1;
00110 }
00111 if (counter == val){
00112 counter = 1;
00113 return false;
00114 }else{
00115 counter ++ ;
00116 return true;
00117 }
00118 }
00119
00120 virtual void fillDestination(const boost::shared_ptr<T const> & message) {
00121 std::string value;
00122 n->getParamCached(param_dest, value);
00123 ROSWMP_DEBUG(stderr,"Get param cached: %s\n",value.c_str());
00124 setDestination(value);
00125 }
00126
00127 virtual int getPriority(const boost::shared_ptr<T const> & message) {
00128 return flow_prio;
00129 }
00130
00131 virtual int getSubPort(T & pm) {
00132 return 0;
00133 }
00134 virtual std::string getSubTopic(T & pm) {
00135 return "x";
00136 }
00137 virtual void callback(const boost::shared_ptr<T const> & message) {
00138 if (!justone && (stopped || shouldDecimate())) {
00139 return;
00140 }
00141 justone = false;
00142 if (!amIstatic){
00143 fillDestination(message);
00144 }
00145
00146 if (dests.size()>0) {
00147 int priority = getPriority(message);
00148 int n = serialize<T>((char*)(sbuff + sizeof(flow_t)), message);
00149 ROSWMP_DEBUG(stderr,"Serializing size %d\n",n);
00150 int bc_dest = computeBroadcastDest();
00151 ROSWMP_DEBUG(stderr, "Push BC Message, size %d dest %d name %s\n", n + sizeof(flow_t),bc_dest,name.c_str());
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171 wmpPushData(port, sbuff, n + sizeof(flow_t), bc_dest, priority);
00172 }
00173 }
00174 virtual bool popMessage(T & pm, unsigned int & size, unsigned char & src1, signed char & pri ){
00175 char * p;
00176 int idx = wmpPopData(port, &p, &size, &src1, &pri);
00177 size = size - sizeof(flow_t);
00178 if (!deserialize<T> (p + sizeof(flow_t), size, pm)) {
00179 ROS_ERROR("Deserialize error\n");
00180 wmpPopDataDone(idx);
00181 return false;
00182 }
00183 wmpPopDataDone(idx);
00184 return true;
00185 }
00186
00187 virtual void run() {
00188 T pm;
00189 while (ros::ok()) {
00190
00191 signed char pri;
00192 unsigned int size;
00193 unsigned char src1;
00194
00195 if (!popMessage(pm, size, src1, pri)) {
00196 continue;
00197 }
00198
00199 int subPort = getSubPort(pm);
00200 std::string subTopic = getSubTopic(pm);
00201
00202 ROSWMP_DEBUG(stderr, "Received %s on Manager src %d \n", name.c_str(),src1);
00203
00204 std::ostringstream hash;
00205 hash << n->getNamespace() << "/rx/R" << (int) src1 << "/" << name;
00206
00207 if (subTopic.compare("x") != 0) {
00208 hash << "/" << subTopic;
00209 }
00210 if (subPort > 0) {
00211 hash << "/" << subPort;
00212 }
00213
00214 ROSWMP_DEBUG(stderr, "Received %s on Manager\n", hash.str().c_str());
00215
00216
00217 if (flows_map.find(hash.str()) == flows_map.end()) {
00218 flows_map[hash.str()].publisher = n->advertise<T> (hash.str(),1);
00219 }
00220
00221 flows_map[hash.str()].publisher.publish(pm);
00222 ROSWMP_DEBUG(stderr, "Published (port:%d)\n!", port);
00223 }
00224 }
00225 };
00226
00227 #endif