00001
00023 #include "micros_swarm/packet_parser.h"
00024
00025 namespace micros_swarm{
00026
00027 PacketParser::PacketParser()
00028 {
00029 rth_ = Singleton<RuntimeHandle>::getSingleton();
00030 cni_.reset(new CheckNeighbor(rth_->getNeighborDistance()));
00031 mqm_ = Singleton<MsgQueueManager>::getSingleton();
00032 }
00033
00034 PacketParser::~PacketParser()
00035 {
00036 rth_.reset();
00037 cni_.reset();
00038 mqm_.reset();
00039 }
00040
00041 void PacketParser::parse(const std::vector<uint8_t>& data)
00042 {
00043 gsdf_msgs::CommPacket packet = deserialize_ros<gsdf_msgs::CommPacket>(data);
00044 int packet_source = packet.header.source;
00045 int shm_rid = rth_->getRobotID();
00046
00047
00048 if(packet_source == shm_rid) {
00049 return;
00050 }
00051
00052 try {
00053 const int packet_type = packet.header.type;
00054 std::vector<uint8_t> packet_data = packet.content.buf;
00055
00056 switch(packet_type) {
00057 case SINGLE_ROBOT_BROADCAST_BASE: {
00058 gsdf_msgs::RobotBase rb = deserialize_ros<gsdf_msgs::RobotBase>(packet_data);
00059
00060
00061
00062
00063
00064 const Base& self = rth_->getRobotBase();
00065 Base neighbor(rb.px, rb.py, rb.pz, rb.vx, rb.vy, rb.vz, rb.valid);
00066
00067 if(cni_->isNeighbor(self, neighbor)) {
00068 rth_->insertOrUpdateNeighbor(rb.id, 0, 0, 0, rb.px, rb.py, rb.pz, rb.vx, rb.vy, rb.vz);
00069 }
00070 else {
00071 rth_->deleteNeighbor(rb.id);
00072 }
00073
00074 break;
00075 }
00076 case SINGLE_ROBOT_JOIN_SWARM: {
00077 if(!rth_->inNeighbors(packet_source)){
00078 return;
00079 }
00080 gsdf_msgs::JoinSwarm srjs = deserialize_ros<gsdf_msgs::JoinSwarm>(packet_data);
00081 rth_->joinNeighborSwarm(srjs.robot_id, srjs.swarm_id);
00082
00083 break;
00084 }
00085 case SINGLE_ROBOT_LEAVE_SWARM: {
00086 if(!rth_->inNeighbors(packet_source)){
00087 return;
00088 }
00089 gsdf_msgs::LeaveSwarm srls = deserialize_ros<gsdf_msgs::LeaveSwarm>(packet_data);
00090 rth_->leaveNeighborSwarm(srls.robot_id, srls.swarm_id);
00091
00092 break;
00093 }
00094 case SINGLE_ROBOT_SWARM_LIST: {
00095 if(!rth_->inNeighbors(packet_source)){
00096 return;
00097 }
00098 gsdf_msgs::SwarmList srsl = deserialize_ros<gsdf_msgs::SwarmList>(packet_data);
00099 rth_->insertOrRefreshNeighborSwarm(srsl.robot_id, srsl.swarm_list);
00100
00101 break;
00102 }
00103 case VIRTUAL_STIGMERGY_QUERY: {
00104 if(!rth_->inNeighbors(packet_source)) {
00105 return;
00106 }
00107 gsdf_msgs::VirtualStigmergyQuery vsq = deserialize_ros<gsdf_msgs::VirtualStigmergyQuery>(packet_data);
00108 VirtualStigmergyTuple local;
00109 bool exist = rth_->getVirtualStigmergyTuple(vsq.vstig_id, vsq.key, local);
00110
00111
00112 if(!exist||(local.lamport_clock<vsq.lamport_clock)) {
00113 rth_->createVirtualStigmergy(vsq.vstig_id);
00114 rth_->insertOrUpdateVirtualStigmergy(vsq.vstig_id, vsq.key, vsq.value, vsq.lamport_clock, time(NULL), 0, vsq.robot_id);
00115
00116 if(!rth_->checkNeighborsOverlap(packet_source)) {
00117 gsdf_msgs::VirtualStigmergyPut vsp_new;
00118 vsp_new.vstig_id = vsq.vstig_id;
00119 vsp_new.key = vsq.key;
00120 vsp_new.value = vsq.value;
00121 vsp_new.lamport_clock = vsq.lamport_clock;
00122 vsp_new.robot_id = vsq.robot_id;
00123 std::vector<uint8_t> vsp_new_vec = serialize_ros(vsp_new);
00124
00125 gsdf_msgs::CommPacket p;
00126 p.header.source = shm_rid;
00127 p.header.type = VIRTUAL_STIGMERGY_PUT;
00128 p.header.data_len = vsp_new_vec.size();
00129 p.header.version = 1;
00130 p.header.checksum = 0;
00131 p.content.buf = vsp_new_vec;
00132 std::vector<uint8_t> msg_data = serialize_ros(p);
00133 mqm_->getOutMsgQueue("vstig")->push(msg_data);
00134 }
00135 }
00136 else if(local.lamport_clock > vsq.lamport_clock) {
00137 gsdf_msgs::VirtualStigmergyPut vsp;
00138 vsp.vstig_id = vsq.vstig_id;
00139 vsp.key = vsq.key;
00140 vsp.value = local.vstig_value;
00141 vsp.lamport_clock = local.lamport_clock;
00142 vsp.robot_id = local.robot_id;
00143 std::vector<uint8_t> vsp_vec = serialize_ros(vsp);
00144
00145 gsdf_msgs::CommPacket p;
00146 p.header.source = shm_rid;
00147 p.header.type = VIRTUAL_STIGMERGY_PUT;
00148 p.header.data_len = vsp_vec.size();
00149 p.header.version = 1;
00150 p.header.checksum = 0;
00151 p.content.buf = vsp_vec;
00152 std::vector<uint8_t> msg_data = serialize_ros(p);
00153 mqm_->getOutMsgQueue("vstig")->push(msg_data);
00154 }
00155 else if((local.lamport_clock == vsq.lamport_clock) && (local.robot_id != vsq.robot_id)) {
00156
00157 }
00158 else {
00159
00160 }
00161
00162 break;
00163 }
00164 case VIRTUAL_STIGMERGY_PUT: {
00165 if(!rth_->inNeighbors(packet_source)){
00166 return;
00167 }
00168 gsdf_msgs::VirtualStigmergyPut vsp = deserialize_ros<gsdf_msgs::VirtualStigmergyPut>(packet_data);
00169
00170 VirtualStigmergyTuple local;
00171 bool exist = rth_->getVirtualStigmergyTuple(vsp.vstig_id, vsp.key, local);
00172
00173
00174 if(!exist||(local.lamport_clock < vsp.lamport_clock)) {
00175 rth_->createVirtualStigmergy(vsp.vstig_id);
00176 rth_->insertOrUpdateVirtualStigmergy(vsp.vstig_id, vsp.key, vsp.value, vsp.lamport_clock, time(NULL), 0, vsp.robot_id);
00177
00178 if(!rth_->checkNeighborsOverlap(packet_source)) {
00179 gsdf_msgs::VirtualStigmergyPut vsp_new;
00180 vsp_new.vstig_id = vsp.vstig_id;
00181 vsp_new.key = vsp.key;
00182 vsp_new.value = vsp.value;
00183 vsp_new.lamport_clock = vsp.lamport_clock;
00184 vsp_new.robot_id = vsp.robot_id;
00185 std::vector<uint8_t> vsp_new_vec = serialize_ros(vsp_new);
00186
00187 gsdf_msgs::CommPacket p;
00188 p.header.source = shm_rid;
00189 p.header.type = VIRTUAL_STIGMERGY_PUT;
00190 p.header.data_len = vsp_new_vec.size();
00191 p.header.version = 1;
00192 p.header.checksum = 0;
00193 p.content.buf = vsp_new_vec;
00194 std::vector<uint8_t> msg_data = serialize_ros(p);
00195 mqm_->getOutMsgQueue("vstig")->push(msg_data);
00196 }
00197 }
00198 else if((local.lamport_clock==vsp.lamport_clock)&&(local.robot_id!=vsp.robot_id)) {
00199
00200 }
00201 else {
00202
00203 }
00204
00205 break;
00206 }
00207 case VIRTUAL_STIGMERGY_PUTS: {
00208 if(!rth_->inNeighbors(packet_source)){
00209 return;
00210 }
00211 gsdf_msgs::VirtualStigmergyPuts vsps = deserialize_ros<gsdf_msgs::VirtualStigmergyPuts>(packet_data);
00212
00213 bool process_msg = false;
00214 int neighbor_size = rth_->getNeighborSize();
00215 float prob = vsps.prob;
00216 float rand_prob = (float)rand()/RAND_MAX;
00217
00218 if(neighbor_size < 3) {
00219 process_msg = true;
00220 }
00221 else {
00222 if(rand_prob < prob) {
00223 process_msg = true;
00224 }
00225 else {
00226 process_msg = false;
00227 }
00228 }
00229
00230
00231
00232 if(!process_msg) {
00233 return;
00234 }
00235
00236 VirtualStigmergyTuple local;
00237 bool exist = rth_->getVirtualStigmergyTuple(vsps.vstig_id, vsps.key, local);
00238
00239
00240 if(!exist||(local.lamport_clock < vsps.lamport_clock)) {
00241 rth_->createVirtualStigmergy(vsps.vstig_id);
00242 rth_->insertOrUpdateVirtualStigmergy(vsps.vstig_id, vsps.key, vsps.value, vsps.lamport_clock, time(NULL), 0, vsps.robot_id);
00243
00244 if(!rth_->checkNeighborsOverlap(packet_source)) {
00245 gsdf_msgs::VirtualStigmergyPuts vsps_new;
00246 vsps_new.vstig_id = vsps.vstig_id;
00247 vsps_new.key = vsps.key;
00248 vsps_new.value = vsps.value;
00249 vsps_new.lamport_clock = vsps.lamport_clock;
00250 vsps_new.robot_id = vsps.robot_id;
00251 int neighbor_size = rth_->getNeighborSize();
00252 if(neighbor_size < 3) {
00253 vsps_new.prob = 1;
00254 }
00255 else {
00256 vsps_new.prob = 2.0/neighbor_size;
00257 }
00258 std::vector<uint8_t> vsps_new_vec = serialize_ros(vsps_new);
00259
00260 gsdf_msgs::CommPacket p;
00261 p.header.source = shm_rid;
00262 p.header.type = VIRTUAL_STIGMERGY_PUTS;
00263 p.header.data_len = vsps_new_vec.size();
00264 p.header.version = 1;
00265 p.header.checksum = 0;
00266 p.content.buf = vsps_new_vec;
00267 std::vector<uint8_t> msg_data = serialize_ros(p);
00268 mqm_->getOutMsgQueue("vstig")->push(msg_data);
00269 }
00270 }
00271 else if((local.lamport_clock==vsps.lamport_clock)&&(local.robot_id!=vsps.robot_id)) {
00272
00273 }
00274 else {
00275
00276 }
00277
00278 break;
00279 }
00280 case BLACKBOARD_PUT: {
00281 if(!rth_->inNeighbors(packet_source)){
00282 return;
00283 }
00284 gsdf_msgs::BlackBoardPut bbp = deserialize_ros<gsdf_msgs::BlackBoardPut>(packet_data);
00285 int robot_id = rth_->getRobotID();
00286 std::string bb_key = bbp.key;
00287 if(bbp.on_robot_id == robot_id) {
00288 rth_->createBlackBoard(bbp.bb_id);
00289 BlackBoardTuple local;
00290 rth_->getBlackBoardTuple(bbp.bb_id, bbp.key, local);
00291
00292
00293 if(!rth_->isBlackBoardTupleExist(bbp.bb_id, bbp.key) || local.timestamp < bbp.timestamp) {
00294 rth_->insertOrUpdateBlackBoard(bbp.bb_id, bbp.key, bbp.value, bbp.timestamp, bbp.robot_id);
00295 }
00296 }
00297 else{
00298
00299 }
00300
00301 break;
00302 }
00303 case BLACKBOARD_QUERY: {
00304 if(!rth_->inNeighbors(packet_source)){
00305 return;
00306 }
00307 gsdf_msgs::BlackBoardQuery bbq = deserialize_ros<gsdf_msgs::BlackBoardQuery>(packet_data);
00308 int robot_id = rth_->getRobotID();
00309 std::string bb_key = bbq.key;
00310
00311 if(bbq.on_robot_id == robot_id) {
00312 BlackBoardTuple local;
00313 rth_->getBlackBoardTuple(bbq.bb_id, bbq.key, local);
00314 gsdf_msgs::BlackBoardAck bba;
00315 bba.bb_id = bbq.bb_id;
00316 bba.on_robot_id = bbq.on_robot_id;
00317 bba.key = bbq.key;
00318 bba.value = local.bb_value;
00319 bba.timestamp = local.timestamp;
00320 bba.robot_id = bbq.robot_id;
00321 std::vector<uint8_t> bbqa_vec = serialize_ros(bba);
00322
00323 gsdf_msgs::CommPacket p;
00324 p.header.source = shm_rid;
00325 p.header.type = BLACKBOARD_QUERY_ACK;
00326 p.header.data_len = bbqa_vec.size();
00327 p.header.version = 1;
00328 p.header.checksum = 0;
00329 p.content.buf = bbqa_vec;
00330 std::vector<uint8_t> msg_data = serialize_ros(p);
00331 mqm_->getOutMsgQueue("bb")->push(msg_data);
00332 }
00333 else{
00334
00335 }
00336
00337 break;
00338 }
00339 case BLACKBOARD_QUERY_ACK: {
00340 if(!rth_->inNeighbors(packet_source)){
00341 return;
00342 }
00343 gsdf_msgs::BlackBoardAck bba = deserialize_ros<gsdf_msgs::BlackBoardAck>(packet_data);
00344 int robot_id = rth_->getRobotID();
00345 std::string bb_key = bba.key;
00346
00347 if(bba.on_robot_id == robot_id) {
00348
00349 }
00350 else {
00351 rth_->createBlackBoard(bba.bb_id);
00352 rth_->insertOrUpdateBlackBoard(bba.bb_id, bba.key, bba.value, bba.timestamp, bba.robot_id);
00353 }
00354
00355 break;
00356 }
00357 case SCDS_PSO_PUT: {
00358 if(!rth_->inNeighbors(packet_source)){
00359 return;
00360 }
00361 gsdf_msgs::SCDSPSOPut scds_put = deserialize_ros<gsdf_msgs::SCDSPSOPut>(packet_data);
00362 SCDSPSODataTuple local;
00363 bool exist = rth_->getSCDSPSOValue(scds_put.key, local);
00364
00365
00366 if ((!exist) || (local.val < scds_put.val)) {
00367 SCDSPSODataTuple new_data;
00368 new_data.pos = scds_put.pos;
00369 new_data.val = scds_put.val;
00370 new_data.robot_id = scds_put.robot_id;
00371 new_data.gen = scds_put.gen;
00372 new_data.timestamp = scds_put.timestamp;
00373 rth_->insertOrUpdateSCDSPSOValue(scds_put.key, new_data);
00374
00375 if(!rth_->checkNeighborsOverlap(packet_source)) {
00376 gsdf_msgs::CommPacket p;
00377 p.header.source = shm_rid;
00378 p.header.type = SCDS_PSO_PUT;
00379 p.header.data_len = packet_data.size();
00380 p.header.version = 1;
00381 p.header.checksum = 0;
00382 p.content.buf = packet_data;
00383 std::vector <uint8_t> msg_data = serialize_ros(p);
00384 mqm_->getOutMsgQueue("scds_pso")->push(msg_data);
00385 }
00386 }
00387
00388 break;
00389 }
00390 case SCDS_PSO_GET: {
00391 if(!rth_->inNeighbors(packet_source)){
00392 return;
00393 }
00394 gsdf_msgs::SCDSPSOGet scds_get = deserialize_ros<gsdf_msgs::SCDSPSOGet>(packet_data);
00395 SCDSPSODataTuple local;
00396 bool exist = rth_->getSCDSPSOValue(scds_get.key, local);
00397
00398
00399 if ((!exist) || (local.val < scds_get.val)) {
00400 SCDSPSODataTuple new_data;
00401 new_data.pos = scds_get.pos;
00402 new_data.val = scds_get.val;
00403 new_data.robot_id = scds_get.robot_id;
00404 new_data.gen = scds_get.gen;
00405 new_data.timestamp = scds_get.timestamp;
00406 rth_->insertOrUpdateSCDSPSOValue(scds_get.key, new_data);
00407
00408 if(!rth_->checkNeighborsOverlap(packet_source)) {
00409 gsdf_msgs::CommPacket p;
00410 p.header.source = shm_rid;
00411 p.header.type = SCDS_PSO_PUT;
00412 p.header.data_len = packet_data.size();
00413 p.header.version = 1;
00414 p.header.checksum = 0;
00415 p.content.buf = packet_data;
00416 std::vector <uint8_t> msg_data = serialize_ros(p);
00417 mqm_->getOutMsgQueue("scds_pso")->push(msg_data);
00418 }
00419 }
00420 else if(local.val > scds_get.val) {
00421 gsdf_msgs::SCDSPSOPut scds_put;
00422 scds_put.key = scds_get.key;
00423 scds_put.pos = local.pos;
00424 scds_put.val = local.val;
00425 scds_put.robot_id = local.robot_id;
00426 scds_put.gen = local.gen;
00427 scds_put.timestamp = local.timestamp;
00428 std::vector<uint8_t> scds_put_vec = serialize_ros(scds_put);;
00429
00430 gsdf_msgs::CommPacket p;
00431 p.header.source = shm_rid;
00432 p.header.type = SCDS_PSO_PUT;
00433 p.header.data_len = scds_put_vec.size();
00434 p.header.version = 1;
00435 p.header.checksum = 0;
00436 p.content.buf = scds_put_vec;
00437 std::vector<uint8_t> msg_data = serialize_ros(p);
00438 mqm_->getOutMsgQueue("scds_pso")->push(msg_data);
00439 }
00440 else {
00441
00442 }
00443
00444 break;
00445 }
00446 case NEIGHBOR_BROADCAST_KEY_VALUE: {
00447 if(!rth_->inNeighbors(packet_source)){
00448 return;
00449 }
00450 gsdf_msgs::NeighborBroadcastKeyValue nbkv = deserialize_ros<gsdf_msgs::NeighborBroadcastKeyValue>(packet_data);
00451 boost::shared_ptr<ListenerHelper> helper = rth_->getListenerHelper(nbkv.key);
00452 if(helper == NULL) {
00453 return;
00454 }
00455 helper->call(nbkv.value);
00456
00457 break;
00458 }
00459 case BARRIER_SYN: {
00460 gsdf_msgs::BarrierSyn bs = deserialize_ros<gsdf_msgs::BarrierSyn>(packet_data);
00461 if(bs.s != "SYN") {
00462 return;
00463 }
00464
00465 gsdf_msgs::BarrierAck ba;
00466 ba.robot_id = packet.header.source;
00467 std::vector<uint8_t> ba_vec = serialize_ros(ba);
00468
00469 gsdf_msgs::CommPacket p;
00470 p.header.source = shm_rid;
00471 p.header.type = BARRIER_ACK;
00472 p.header.data_len = ba_vec.size();
00473 p.header.version = 1;
00474 p.header.checksum = 0;
00475 p.content.buf = ba_vec;
00476 std::vector<uint8_t> msg_data = serialize_ros(p);
00477 mqm_->getOutMsgQueue("barrier")->push(msg_data);
00478 break;
00479 }
00480 case BARRIER_ACK: {
00481 gsdf_msgs::BarrierAck ba = deserialize_ros<gsdf_msgs::BarrierAck>(packet_data);
00482
00483 if(shm_rid == ba.robot_id) {
00484 rth_->insertBarrier(packet.header.source);
00485 }
00486
00487 break;
00488 }
00489
00490 default: {
00491 std::cout<<"UNDEFINED PACKET TYPE!"<<std::endl;
00492 }
00493 }
00494 }
00495 catch(char *err_str) {
00496 std::cout<<err_str<<std::endl;
00497 return;
00498 }
00499 }
00500
00501 void PacketParser::PacketParser::parse(const std::vector<char>& data)
00502 {
00503 std::vector<uint8_t> seq;
00504 for(int i = 0; i < data.size(); i++) {
00505 uint8_t c = (uint8_t)(data[i]);
00506 seq.push_back(c);
00507 }
00508 parse(seq);
00509 }
00510
00511 void PacketParser::parse(uint8_t* data, int len)
00512 {
00513 std::vector<uint8_t> seq;
00514 for(int i = 0; i < len; i++) {
00515 seq.push_back(*(data+i));
00516 }
00517 parse(seq);
00518 }
00519
00520 void PacketParser::parse(char* data, int len)
00521 {
00522 uint8_t *p = (uint8_t*)data;
00523 parse(p, len);
00524 }
00525 };