packet_parser.cpp
Go to the documentation of this file.
00001 
00023 #include "micros_swarm/packet_parser.h"
00024 
00025 namespace micros_swarm{
00026 
00027     PacketParser::PacketParser()
00028     {
00029         rth_ = Singleton<RuntimeHandle>::getSingleton();
00030         cni_.reset(new CheckNeighbor(rth_->getNeighborDistance()));
00031         mqm_ = Singleton<MsgQueueManager>::getSingleton();
00032     }
00033 
00034     PacketParser::~PacketParser()
00035     {
00036         rth_.reset();
00037         cni_.reset();
00038         mqm_.reset();
00039     }
00040 
00041     void PacketParser::parse(const std::vector<uint8_t>& data)
00042     {
00043         gsdf_msgs::CommPacket packet = deserialize_ros<gsdf_msgs::CommPacket>(data);
00044         int packet_source = packet.header.source;
00045         int shm_rid = rth_->getRobotID();
00046         
00047         //ignore the packet of the local robot
00048         if(packet_source == shm_rid) {
00049             return;
00050         }
00051 
00052         try {
00053             const int packet_type = packet.header.type;
00054             std::vector<uint8_t> packet_data = packet.content.buf;
00055 
00056             switch(packet_type) {
00057                 case SINGLE_ROBOT_BROADCAST_BASE: {
00058                     gsdf_msgs::RobotBase rb = deserialize_ros<gsdf_msgs::RobotBase>(packet_data);
00059                 
00060                     //if(rb.valid != 1) {  //ignore the default Base value.
00061                     //    return;
00062                     //}
00063                 
00064                     const Base& self = rth_->getRobotBase();
00065                     Base neighbor(rb.px, rb.py, rb.pz, rb.vx, rb.vy, rb.vz, rb.valid);
00066                 
00067                     if(cni_->isNeighbor(self, neighbor)) {
00068                         rth_->insertOrUpdateNeighbor(rb.id, 0, 0, 0, rb.px, rb.py, rb.pz, rb.vx, rb.vy, rb.vz);
00069                     }
00070                     else {
00071                         rth_->deleteNeighbor(rb.id);
00072                     }
00073                 
00074                     break;
00075                 }
00076                 case SINGLE_ROBOT_JOIN_SWARM: {
00077                     if(!rth_->inNeighbors(packet_source)){
00078                         return;
00079                     }
00080                     gsdf_msgs::JoinSwarm srjs = deserialize_ros<gsdf_msgs::JoinSwarm>(packet_data);
00081                     rth_->joinNeighborSwarm(srjs.robot_id, srjs.swarm_id);
00082                 
00083                     break;
00084                 }
00085                 case SINGLE_ROBOT_LEAVE_SWARM: {
00086                     if(!rth_->inNeighbors(packet_source)){
00087                         return;
00088                     }
00089                     gsdf_msgs::LeaveSwarm srls = deserialize_ros<gsdf_msgs::LeaveSwarm>(packet_data);
00090                     rth_->leaveNeighborSwarm(srls.robot_id, srls.swarm_id);
00091                 
00092                     break;
00093                 }
00094                 case SINGLE_ROBOT_SWARM_LIST: {
00095                     if(!rth_->inNeighbors(packet_source)){
00096                         return;
00097                     }
00098                     gsdf_msgs::SwarmList srsl = deserialize_ros<gsdf_msgs::SwarmList>(packet_data);
00099                     rth_->insertOrRefreshNeighborSwarm(srsl.robot_id, srsl.swarm_list);
00100                 
00101                     break;
00102                 }
00103                 case VIRTUAL_STIGMERGY_QUERY: {
00104                     if(!rth_->inNeighbors(packet_source)) {
00105                         return;
00106                     }
00107                     gsdf_msgs::VirtualStigmergyQuery vsq = deserialize_ros<gsdf_msgs::VirtualStigmergyQuery>(packet_data);
00108                     VirtualStigmergyTuple local;
00109                     bool exist = rth_->getVirtualStigmergyTuple(vsq.vstig_id, vsq.key, local);
00110                 
00111                     //local tuple is not exist or the local timestamp is smaller
00112                     if(!exist||(local.lamport_clock<vsq.lamport_clock)) {
00113                         rth_->createVirtualStigmergy(vsq.vstig_id);
00114                         rth_->insertOrUpdateVirtualStigmergy(vsq.vstig_id, vsq.key, vsq.value, vsq.lamport_clock, time(NULL), 0, vsq.robot_id);
00115 
00116                         if(!rth_->checkNeighborsOverlap(packet_source)) {
00117                             gsdf_msgs::VirtualStigmergyPut vsp_new;
00118                             vsp_new.vstig_id = vsq.vstig_id;
00119                             vsp_new.key = vsq.key;
00120                             vsp_new.value = vsq.value;
00121                             vsp_new.lamport_clock = vsq.lamport_clock;
00122                             vsp_new.robot_id = vsq.robot_id;
00123                             std::vector<uint8_t> vsp_new_vec = serialize_ros(vsp_new);
00124 
00125                             gsdf_msgs::CommPacket p;
00126                             p.header.source = shm_rid;
00127                             p.header.type = VIRTUAL_STIGMERGY_PUT;
00128                             p.header.data_len = vsp_new_vec.size();
00129                             p.header.version = 1;
00130                             p.header.checksum = 0;
00131                             p.content.buf = vsp_new_vec;
00132                             std::vector<uint8_t> msg_data = serialize_ros(p);
00133                             mqm_->getOutMsgQueue("vstig")->push(msg_data);
00134                         }
00135                     }
00136                     else if(local.lamport_clock > vsq.lamport_clock) {  //local timestamp is larger
00137                         gsdf_msgs::VirtualStigmergyPut vsp;
00138                         vsp.vstig_id = vsq.vstig_id;
00139                         vsp.key = vsq.key;
00140                         vsp.value = local.vstig_value;
00141                         vsp.lamport_clock = local.lamport_clock;
00142                         vsp.robot_id = local.robot_id;
00143                         std::vector<uint8_t> vsp_vec = serialize_ros(vsp);
00144                     
00145                         gsdf_msgs::CommPacket p;
00146                         p.header.source = shm_rid;
00147                         p.header.type = VIRTUAL_STIGMERGY_PUT;
00148                         p.header.data_len = vsp_vec.size();
00149                         p.header.version = 1;
00150                         p.header.checksum = 0;
00151                         p.content.buf = vsp_vec;
00152                         std::vector<uint8_t> msg_data = serialize_ros(p);
00153                         mqm_->getOutMsgQueue("vstig")->push(msg_data);
00154                     }
00155                     else if((local.lamport_clock == vsq.lamport_clock) && (local.robot_id != vsq.robot_id)) {
00156                         //std::cout<<"query conflict"<<std::endl;
00157                     }
00158                     else {
00159                         //do nothing
00160                     }
00161                 
00162                     break;
00163                 }
00164                 case VIRTUAL_STIGMERGY_PUT: {
00165                     if(!rth_->inNeighbors(packet_source)){
00166                         return;
00167                     }
00168                     gsdf_msgs::VirtualStigmergyPut vsp = deserialize_ros<gsdf_msgs::VirtualStigmergyPut>(packet_data);
00169                 
00170                     VirtualStigmergyTuple local;
00171                     bool exist = rth_->getVirtualStigmergyTuple(vsp.vstig_id, vsp.key, local);
00172                 
00173                     //local tuple is not exist or local timestamp is smaller
00174                     if(!exist||(local.lamport_clock < vsp.lamport_clock)) {
00175                         rth_->createVirtualStigmergy(vsp.vstig_id);
00176                         rth_->insertOrUpdateVirtualStigmergy(vsp.vstig_id, vsp.key, vsp.value, vsp.lamport_clock, time(NULL), 0, vsp.robot_id);
00177 
00178                         if(!rth_->checkNeighborsOverlap(packet_source)) {
00179                             gsdf_msgs::VirtualStigmergyPut vsp_new;
00180                             vsp_new.vstig_id = vsp.vstig_id;
00181                             vsp_new.key = vsp.key;
00182                             vsp_new.value = vsp.value;
00183                             vsp_new.lamport_clock = vsp.lamport_clock;
00184                             vsp_new.robot_id = vsp.robot_id;
00185                             std::vector<uint8_t> vsp_new_vec = serialize_ros(vsp_new);
00186 
00187                             gsdf_msgs::CommPacket p;
00188                             p.header.source = shm_rid;
00189                             p.header.type = VIRTUAL_STIGMERGY_PUT;
00190                             p.header.data_len = vsp_new_vec.size();
00191                             p.header.version = 1;
00192                             p.header.checksum = 0;
00193                             p.content.buf = vsp_new_vec;
00194                             std::vector<uint8_t> msg_data = serialize_ros(p);
00195                             mqm_->getOutMsgQueue("vstig")->push(msg_data);
00196                         }
00197                     }
00198                     else if((local.lamport_clock==vsp.lamport_clock)&&(local.robot_id!=vsp.robot_id)) {
00199                         //std::cout<<"put conflict"<<std::endl;
00200                     }
00201                     else {
00202                         //do nothing
00203                     }
00204                         
00205                     break;
00206                 }
00207                 case VIRTUAL_STIGMERGY_PUTS: {
00208                     if(!rth_->inNeighbors(packet_source)){
00209                         return;
00210                     }
00211                     gsdf_msgs::VirtualStigmergyPuts vsps = deserialize_ros<gsdf_msgs::VirtualStigmergyPuts>(packet_data);
00212 
00213                     bool process_msg = false;
00214                     int neighbor_size = rth_->getNeighborSize();
00215                     float prob = vsps.prob;
00216                     float rand_prob = (float)rand()/RAND_MAX;
00217 
00218                     if(neighbor_size < 3) {
00219                         process_msg = true;
00220                     }
00221                     else {
00222                         if(rand_prob < prob) {
00223                             process_msg = true;
00224                         }
00225                         else {
00226                             process_msg = false;
00227                         }
00228                     }
00229 
00230                     //std::cout<<rth_->getRobotID()<<": "<<rand_prob<<", "<<prob<<", "<<process_msg<<std::endl;
00231 
00232                     if(!process_msg) {
00233                         return;
00234                     }
00235 
00236                     VirtualStigmergyTuple local;
00237                     bool exist = rth_->getVirtualStigmergyTuple(vsps.vstig_id, vsps.key, local);
00238 
00239                     //local tuple is not exist or local timestamp is smaller
00240                     if(!exist||(local.lamport_clock < vsps.lamport_clock)) {
00241                         rth_->createVirtualStigmergy(vsps.vstig_id);
00242                         rth_->insertOrUpdateVirtualStigmergy(vsps.vstig_id, vsps.key, vsps.value, vsps.lamport_clock, time(NULL), 0, vsps.robot_id);
00243 
00244                         if(!rth_->checkNeighborsOverlap(packet_source)) {
00245                             gsdf_msgs::VirtualStigmergyPuts vsps_new;
00246                             vsps_new.vstig_id = vsps.vstig_id;
00247                             vsps_new.key = vsps.key;
00248                             vsps_new.value = vsps.value;
00249                             vsps_new.lamport_clock = vsps.lamport_clock;
00250                             vsps_new.robot_id = vsps.robot_id;
00251                             int neighbor_size = rth_->getNeighborSize();
00252                             if(neighbor_size < 3) {
00253                                 vsps_new.prob = 1;
00254                             }
00255                             else {
00256                                 vsps_new.prob = 2.0/neighbor_size;
00257                             }
00258                             std::vector<uint8_t> vsps_new_vec = serialize_ros(vsps_new);
00259 
00260                             gsdf_msgs::CommPacket p;
00261                             p.header.source = shm_rid;
00262                             p.header.type = VIRTUAL_STIGMERGY_PUTS;
00263                             p.header.data_len = vsps_new_vec.size();
00264                             p.header.version = 1;
00265                             p.header.checksum = 0;
00266                             p.content.buf = vsps_new_vec;
00267                             std::vector<uint8_t> msg_data = serialize_ros(p);
00268                             mqm_->getOutMsgQueue("vstig")->push(msg_data);
00269                         }
00270                     }
00271                     else if((local.lamport_clock==vsps.lamport_clock)&&(local.robot_id!=vsps.robot_id)) {
00272                         //std::cout<<"put conflict"<<std::endl;
00273                     }
00274                     else {
00275                         //do nothing
00276                     }
00277 
00278                     break;
00279                 }
00280                 case BLACKBOARD_PUT: {
00281                     if(!rth_->inNeighbors(packet_source)){
00282                         return;
00283                     }
00284                     gsdf_msgs::BlackBoardPut bbp = deserialize_ros<gsdf_msgs::BlackBoardPut>(packet_data);
00285                     int robot_id = rth_->getRobotID();
00286                     std::string bb_key = bbp.key;
00287                     if(bbp.on_robot_id == robot_id) {
00288                         rth_->createBlackBoard(bbp.bb_id);
00289                         BlackBoardTuple local;
00290                         rth_->getBlackBoardTuple(bbp.bb_id, bbp.key, local);
00291 
00292                         //local tuple is not exist or the local timestamp is smaller
00293                         if(!rth_->isBlackBoardTupleExist(bbp.bb_id, bbp.key) || local.timestamp < bbp.timestamp) {
00294                             rth_->insertOrUpdateBlackBoard(bbp.bb_id, bbp.key, bbp.value, bbp.timestamp, bbp.robot_id);
00295                         }
00296                     }
00297                     else{
00298 
00299                     }
00300 
00301                     break;
00302                 }
00303                 case BLACKBOARD_QUERY: {
00304                     if(!rth_->inNeighbors(packet_source)){
00305                         return;
00306                     }
00307                     gsdf_msgs::BlackBoardQuery bbq = deserialize_ros<gsdf_msgs::BlackBoardQuery>(packet_data);
00308                     int robot_id = rth_->getRobotID();
00309                     std::string bb_key = bbq.key;
00310 
00311                     if(bbq.on_robot_id == robot_id) {
00312                         BlackBoardTuple local;
00313                         rth_->getBlackBoardTuple(bbq.bb_id, bbq.key, local);
00314                         gsdf_msgs::BlackBoardAck bba;
00315                         bba.bb_id = bbq.bb_id;
00316                         bba.on_robot_id = bbq.on_robot_id;
00317                         bba.key = bbq.key;
00318                         bba.value = local.bb_value;
00319                         bba.timestamp = local.timestamp;
00320                         bba.robot_id = bbq.robot_id;
00321                         std::vector<uint8_t> bbqa_vec = serialize_ros(bba);
00322 
00323                         gsdf_msgs::CommPacket p;
00324                         p.header.source = shm_rid;
00325                         p.header.type = BLACKBOARD_QUERY_ACK;
00326                         p.header.data_len = bbqa_vec.size();
00327                         p.header.version = 1;
00328                         p.header.checksum = 0;
00329                         p.content.buf = bbqa_vec;
00330                         std::vector<uint8_t> msg_data = serialize_ros(p);
00331                         mqm_->getOutMsgQueue("bb")->push(msg_data);
00332                     }
00333                     else{
00334 
00335                     }
00336 
00337                     break;
00338                 }
00339                 case BLACKBOARD_QUERY_ACK: {
00340                     if(!rth_->inNeighbors(packet_source)){
00341                         return;
00342                     }
00343                     gsdf_msgs::BlackBoardAck bba = deserialize_ros<gsdf_msgs::BlackBoardAck>(packet_data);
00344                     int robot_id = rth_->getRobotID();
00345                     std::string bb_key = bba.key;
00346 
00347                     if(bba.on_robot_id == robot_id) {
00348 
00349                     }
00350                     else {
00351                         rth_->createBlackBoard(bba.bb_id);
00352                         rth_->insertOrUpdateBlackBoard(bba.bb_id, bba.key, bba.value, bba.timestamp, bba.robot_id);
00353                     }
00354 
00355                     break;
00356                 }
00357                 case SCDS_PSO_PUT: {
00358                     if(!rth_->inNeighbors(packet_source)){
00359                         return;
00360                     }
00361                     gsdf_msgs::SCDSPSOPut scds_put = deserialize_ros<gsdf_msgs::SCDSPSOPut>(packet_data);
00362                     SCDSPSODataTuple local;
00363                     bool exist = rth_->getSCDSPSOValue(scds_put.key, local);
00364 
00365                     //local tuple is not exist or local value is smaller
00366                     if ((!exist) || (local.val < scds_put.val)) {
00367                         SCDSPSODataTuple new_data;
00368                         new_data.pos = scds_put.pos;
00369                         new_data.val = scds_put.val;
00370                         new_data.robot_id = scds_put.robot_id;
00371                         new_data.gen = scds_put.gen;
00372                         new_data.timestamp = scds_put.timestamp;
00373                         rth_->insertOrUpdateSCDSPSOValue(scds_put.key, new_data);
00374 
00375                         if(!rth_->checkNeighborsOverlap(packet_source)) {
00376                             gsdf_msgs::CommPacket p;
00377                             p.header.source = shm_rid;
00378                             p.header.type = SCDS_PSO_PUT;
00379                             p.header.data_len = packet_data.size();
00380                             p.header.version = 1;
00381                             p.header.checksum = 0;
00382                             p.content.buf = packet_data;
00383                             std::vector <uint8_t> msg_data = serialize_ros(p);
00384                             mqm_->getOutMsgQueue("scds_pso")->push(msg_data);
00385                         }
00386                     }
00387 
00388                     break;
00389                 }
00390                 case SCDS_PSO_GET: {
00391                     if(!rth_->inNeighbors(packet_source)){
00392                         return;
00393                     }
00394                     gsdf_msgs::SCDSPSOGet scds_get = deserialize_ros<gsdf_msgs::SCDSPSOGet>(packet_data);
00395                     SCDSPSODataTuple local;
00396                     bool exist = rth_->getSCDSPSOValue(scds_get.key, local);
00397 
00398                     //local tuple is not exist or local value is smaller
00399                     if ((!exist) || (local.val < scds_get.val)) {
00400                         SCDSPSODataTuple new_data;
00401                         new_data.pos = scds_get.pos;
00402                         new_data.val = scds_get.val;
00403                         new_data.robot_id = scds_get.robot_id;
00404                         new_data.gen = scds_get.gen;
00405                         new_data.timestamp = scds_get.timestamp;
00406                         rth_->insertOrUpdateSCDSPSOValue(scds_get.key, new_data);
00407 
00408                         if(!rth_->checkNeighborsOverlap(packet_source)) {
00409                             gsdf_msgs::CommPacket p;
00410                             p.header.source = shm_rid;
00411                             p.header.type = SCDS_PSO_PUT;
00412                             p.header.data_len = packet_data.size();
00413                             p.header.version = 1;
00414                             p.header.checksum = 0;
00415                             p.content.buf = packet_data;
00416                             std::vector <uint8_t> msg_data = serialize_ros(p);
00417                             mqm_->getOutMsgQueue("scds_pso")->push(msg_data);
00418                         }
00419                     }
00420                     else if(local.val > scds_get.val) {
00421                         gsdf_msgs::SCDSPSOPut scds_put;
00422                         scds_put.key = scds_get.key;
00423                         scds_put.pos = local.pos;
00424                         scds_put.val = local.val;
00425                         scds_put.robot_id = local.robot_id;
00426                         scds_put.gen = local.gen;
00427                         scds_put.timestamp = local.timestamp;
00428                         std::vector<uint8_t> scds_put_vec = serialize_ros(scds_put);;
00429 
00430                         gsdf_msgs::CommPacket p;
00431                         p.header.source = shm_rid;
00432                         p.header.type = SCDS_PSO_PUT;
00433                         p.header.data_len = scds_put_vec.size();
00434                         p.header.version = 1;
00435                         p.header.checksum = 0;
00436                         p.content.buf = scds_put_vec;
00437                         std::vector<uint8_t> msg_data = serialize_ros(p);
00438                         mqm_->getOutMsgQueue("scds_pso")->push(msg_data);
00439                     }
00440                     else {
00441 
00442                     }
00443 
00444                     break;
00445                 }
00446                 case NEIGHBOR_BROADCAST_KEY_VALUE: {
00447                     if(!rth_->inNeighbors(packet_source)){
00448                         return;
00449                     }
00450                     gsdf_msgs::NeighborBroadcastKeyValue nbkv = deserialize_ros<gsdf_msgs::NeighborBroadcastKeyValue>(packet_data);
00451                     boost::shared_ptr<ListenerHelper> helper = rth_->getListenerHelper(nbkv.key);
00452                     if(helper == NULL) {
00453                         return;
00454                     }
00455                     helper->call(nbkv.value);
00456                
00457                     break;
00458                 }
00459                 case BARRIER_SYN: {
00460                     gsdf_msgs::BarrierSyn bs = deserialize_ros<gsdf_msgs::BarrierSyn>(packet_data);
00461                     if(bs.s != "SYN") {
00462                         return;
00463                     }
00464 
00465                     gsdf_msgs::BarrierAck ba;
00466                     ba.robot_id = packet.header.source;
00467                     std::vector<uint8_t> ba_vec = serialize_ros(ba);
00468                     
00469                     gsdf_msgs::CommPacket p;
00470                     p.header.source = shm_rid;
00471                     p.header.type = BARRIER_ACK;
00472                     p.header.data_len = ba_vec.size();
00473                     p.header.version = 1;
00474                     p.header.checksum = 0;
00475                     p.content.buf = ba_vec;
00476                     std::vector<uint8_t> msg_data = serialize_ros(p);
00477                     mqm_->getOutMsgQueue("barrier")->push(msg_data);
00478                     break;
00479                 }
00480                 case BARRIER_ACK: {
00481                     gsdf_msgs::BarrierAck ba = deserialize_ros<gsdf_msgs::BarrierAck>(packet_data);
00482                 
00483                     if(shm_rid == ba.robot_id) {
00484                         rth_->insertBarrier(packet.header.source);
00485                     }
00486                     
00487                     break;
00488                 }
00489             
00490                 default: {
00491                     std::cout<<"UNDEFINED PACKET TYPE!"<<std::endl;
00492                 }
00493             }
00494         }
00495         catch(char *err_str) {
00496             std::cout<<err_str<<std::endl;
00497             return;
00498         }
00499     }
00500 
00501     void PacketParser::PacketParser::parse(const std::vector<char>& data)
00502     {
00503         std::vector<uint8_t> seq;
00504         for(int i = 0; i < data.size(); i++) {
00505             uint8_t c = (uint8_t)(data[i]);
00506             seq.push_back(c);
00507         }
00508         parse(seq);
00509     }
00510 
00511     void PacketParser::parse(uint8_t* data, int len)
00512     {
00513         std::vector<uint8_t> seq;
00514         for(int i = 0; i < len; i++) {
00515             seq.push_back(*(data+i));
00516         }
00517         parse(seq);
00518     }
00519 
00520     void PacketParser::parse(char* data, int len)
00521     {
00522         uint8_t *p = (uint8_t*)data;
00523         parse(p, len);
00524     }
00525 };


micros_swarm
Author(s):
autogenerated on Thu Jun 6 2019 18:52:14