00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #ifndef MANAGER_H_
00037 #define MANAGER_H_
00038 #include <map>
00039 #include <sstream>
00040 #include <semaphore.h>
00041 #include <sstream>
00042 #include <vector>
00043 #include <boost/algorithm/string.hpp>
00044
00045 extern "C" {
00046 #include "core/interface/wmp_interface.h"
00047 #include "core/interface/Msg.h"
00048 #include "core/include/definitions.h"
00049 }
00050
00051 #define MAX_DATA_SIZE 512000
00052 #define WMP_MESSAGE_SIZE 1000
00053 #define ROSWMP_DEBUG(output,...) // fprintf(output, __VA_ARGS__); fprintf(output,"\n");
00054
00055 struct info_t {
00056 ros::Publisher publisher;
00057 };
00058
00059
00060 struct flow_t {
00061 int type;
00062 int ok;
00063 int len;
00064 int npaks;
00065 };
00066
00067 class Manager {
00068 protected:
00069 int port;
00070 unsigned char flow_prio;
00071 std::vector<uint8_t> dests;
00072 bool bc, stopped, amIsrc, amIdst,amIstatic;
00073 std::string name, param_dest, decimation;
00074 bool justone;
00075 char sbuff[MAX_DATA_SIZE];
00076 boost::shared_array<uint8_t> dbuff;
00077
00078 enum wmp_commands_t {
00079 STOP, TOPIC_STOP, TOPIC_START, TOPIC_DECIMATE, TOPIC_JUSTONE, SET_PRIORITY, GET_PRIORITY
00080 };
00081 public:
00082 ros::NodeHandle * n;
00083 Manager(ros::NodeHandle * n, int port, std::string name, unsigned char priority) {
00084 this->n = n;
00085 this->port = port;
00086 this->name = name;
00087 this->flow_prio = priority;
00088 amIdst = true;
00089 amIsrc = false;
00090 amIstatic = true;
00091 stopped = false;
00092 dbuff.reset(new unsigned char[MAX_DATA_SIZE]);
00093 justone = false;
00094 }
00095
00096 void setBroadcast(bool bc){
00097 this->bc = bc;
00098 }
00099
00100 void justOne(){
00101 justone = true;
00102 }
00103
00104 void setSource(std::string source){
00105 std::vector<std::string> strs;
00106 boost::split(strs, source, boost::is_any_of(" ,n/"));
00107 for (unsigned i = 0; i < strs.size();i++){
00108 if (strs.at(i).compare("")!=0){
00109 if (atoi(strs.at(i).c_str()) == wmpGetNodeId()){
00110 amIsrc = true;
00111 }
00112 }
00113 }
00114 }
00115
00116 void setDestination(std::string destination) {
00117 dests.clear();
00118 std::vector<std::string> strs;
00119 boost::split(strs, destination, boost::is_any_of(" ,n/"));
00120 for (unsigned i = 0; i < strs.size(); i++) {
00121 if (strs.at(i).compare("")!=0){
00122 int id = atoi(strs.at(i).c_str());
00123 if (id == wmpGetNodeId()) {
00124 amIdst = true;
00125
00126 } else {
00127 dests.push_back(id);
00128 }
00129
00130 }
00131 }
00132 }
00133
00134 int computeBroadcastDest(){
00135 int dest = 0;
00136 for (unsigned i = 0; i < dests.size(); i++) {
00137 dest = dest + int(pow(2,dests[i]));
00138 }
00139 return dest;
00140 }
00141
00142 virtual void init_param() {
00143
00144 std::ostringstream s1,s2;
00145 s1 << n->getNamespace() << "/" << name << "/dest";
00146 param_dest = s1.str();
00147
00148 for (unsigned i = 0; i < dests.size(); i++){
00149 if (i == 0){
00150 s2 << int(dests.at(i));
00151 } else {
00152 s2 << "," << int(dests.at(i));
00153 }
00154 }
00155
00156 n->setParam(param_dest, s2.str());
00157 }
00158
00159 virtual void startRX(){
00160 }
00161
00162 virtual unsigned char getPriority(){
00163 return flow_prio;
00164 }
00165
00166 virtual void setPriority(unsigned char prio){
00167 flow_prio = prio;
00168 }
00169
00170
00171 virtual void run()=0;
00172
00173 virtual void stop() {
00174 ROSWMP_DEBUG(stderr, "Stopping %s\n",name.c_str());
00175 stopped = true;
00176 }
00177
00178 virtual void start() {
00179 ROSWMP_DEBUG(stderr, "Starting %s\n",name.c_str());
00180 stopped = false;
00181 }
00182
00183 virtual bool isHost() {
00184 return false;
00185 }
00186
00187 template <typename Q> bool deserialize(char * p, int size, Q & pm){
00188 memcpy(dbuff.get(), p, size);
00189 ROSWMP_DEBUG(stderr, "DESERIALIZE SIZE: %d %s \n",size,name.c_str());
00190 try {
00191 ros::SerializedMessage smsg(dbuff, size);
00192 ros::serialization::deserializeMessage<Q>(smsg, pm);
00193
00194 ros::Time* tm = ros::message_traits::timeStamp(pm);
00195 if (tm){
00196 ROSWMP_DEBUG(stderr,"Type %s, tm", ros::message_traits::datatype(pm));
00197 *tm = ros::Time::now();
00198 }else{
00199 ROSWMP_DEBUG(stderr,"Type %s, no TS", ros::message_traits::datatype(pm));
00200 }
00201
00202 return true;
00203 } catch (...){
00204 ROS_ERROR("DESERIALIZE ERROR IN %s\n", name.c_str());
00205 return false;
00206 }
00207 }
00208 template <typename P> int serialize(char * p, const boost::shared_ptr<P const> & pm){
00209 ros::SerializedMessage sbuffer = ros::serialization::serializeMessage<P>(*pm);
00210 memcpy(p,sbuffer.message_start,sbuffer.num_bytes);
00211 return sbuffer.num_bytes;
00212 }
00213
00214 template <typename P> int serialize(char * p, P & pm){
00215 ros::SerializedMessage sbuffer = ros::serialization::serializeMessage<P>(pm);
00216 memcpy(p,sbuffer.message_start,sbuffer.num_bytes);
00217 return sbuffer.num_bytes;
00218 }
00219 void setDecimation(int i){
00220 decimation = i;
00221 }
00222 };
00223
00224 #endif