packet_parser.cpp
Go to the documentation of this file.
1 
24 
25 namespace micros_swarm{
26 
28  {
30  cni_.reset(new CheckNeighbor(rth_->getNeighborDistance()));
32  }
33 
35  {
36  rth_.reset();
37  cni_.reset();
38  mqm_.reset();
39  }
40 
41  void PacketParser::parse(const std::vector<uint8_t>& data)
42  {
43  gsdf_msgs::CommPacket packet = deserialize_ros<gsdf_msgs::CommPacket>(data);
44  int packet_source = packet.header.source;
45  int shm_rid = rth_->getRobotID();
46 
47  //ignore the packet of the local robot
48  if(packet_source == shm_rid) {
49  return;
50  }
51 
52  try {
53  const int packet_type = packet.header.type;
54  std::vector<uint8_t> packet_data = packet.content.buf;
55 
56  switch(packet_type) {
58  gsdf_msgs::RobotBase rb = deserialize_ros<gsdf_msgs::RobotBase>(packet_data);
59 
60  //if(rb.valid != 1) { //ignore the default Base value.
61  // return;
62  //}
63 
64  const Base& self = rth_->getRobotBase();
65  Base neighbor(rb.px, rb.py, rb.pz, rb.vx, rb.vy, rb.vz, rb.valid);
66 
67  if(cni_->isNeighbor(self, neighbor)) {
68  rth_->insertOrUpdateNeighbor(rb.id, 0, 0, 0, rb.px, rb.py, rb.pz, rb.vx, rb.vy, rb.vz);
69  }
70  else {
71  rth_->deleteNeighbor(rb.id);
72  }
73 
74  break;
75  }
77  if(!rth_->inNeighbors(packet_source)){
78  return;
79  }
80  gsdf_msgs::JoinSwarm srjs = deserialize_ros<gsdf_msgs::JoinSwarm>(packet_data);
81  rth_->joinNeighborSwarm(srjs.robot_id, srjs.swarm_id);
82 
83  break;
84  }
86  if(!rth_->inNeighbors(packet_source)){
87  return;
88  }
89  gsdf_msgs::LeaveSwarm srls = deserialize_ros<gsdf_msgs::LeaveSwarm>(packet_data);
90  rth_->leaveNeighborSwarm(srls.robot_id, srls.swarm_id);
91 
92  break;
93  }
95  if(!rth_->inNeighbors(packet_source)){
96  return;
97  }
98  gsdf_msgs::SwarmList srsl = deserialize_ros<gsdf_msgs::SwarmList>(packet_data);
99  rth_->insertOrRefreshNeighborSwarm(srsl.robot_id, srsl.swarm_list);
100 
101  break;
102  }
104  if(!rth_->inNeighbors(packet_source)) {
105  return;
106  }
107  gsdf_msgs::VirtualStigmergyQuery vsq = deserialize_ros<gsdf_msgs::VirtualStigmergyQuery>(packet_data);
108  VirtualStigmergyTuple local;
109  bool exist = rth_->getVirtualStigmergyTuple(vsq.vstig_id, vsq.key, local);
110 
111  //local tuple is not exist or the local timestamp is smaller
112  if(!exist||(local.lamport_clock<vsq.lamport_clock)) {
113  rth_->createVirtualStigmergy(vsq.vstig_id);
114  rth_->insertOrUpdateVirtualStigmergy(vsq.vstig_id, vsq.key, vsq.value, vsq.lamport_clock, time(NULL), 0, vsq.robot_id);
115 
116  if(!rth_->checkNeighborsOverlap(packet_source)) {
117  gsdf_msgs::VirtualStigmergyPut vsp_new;
118  vsp_new.vstig_id = vsq.vstig_id;
119  vsp_new.key = vsq.key;
120  vsp_new.value = vsq.value;
121  vsp_new.lamport_clock = vsq.lamport_clock;
122  vsp_new.robot_id = vsq.robot_id;
123  std::vector<uint8_t> vsp_new_vec = serialize_ros(vsp_new);
124 
125  gsdf_msgs::CommPacket p;
126  p.header.source = shm_rid;
127  p.header.type = VIRTUAL_STIGMERGY_PUT;
128  p.header.data_len = vsp_new_vec.size();
129  p.header.version = 1;
130  p.header.checksum = 0;
131  p.content.buf = vsp_new_vec;
132  std::vector<uint8_t> msg_data = serialize_ros(p);
133  mqm_->getOutMsgQueue("vstig")->push(msg_data);
134  }
135  }
136  else if(local.lamport_clock > vsq.lamport_clock) { //local timestamp is larger
137  gsdf_msgs::VirtualStigmergyPut vsp;
138  vsp.vstig_id = vsq.vstig_id;
139  vsp.key = vsq.key;
140  vsp.value = local.vstig_value;
141  vsp.lamport_clock = local.lamport_clock;
142  vsp.robot_id = local.robot_id;
143  std::vector<uint8_t> vsp_vec = serialize_ros(vsp);
144 
145  gsdf_msgs::CommPacket p;
146  p.header.source = shm_rid;
147  p.header.type = VIRTUAL_STIGMERGY_PUT;
148  p.header.data_len = vsp_vec.size();
149  p.header.version = 1;
150  p.header.checksum = 0;
151  p.content.buf = vsp_vec;
152  std::vector<uint8_t> msg_data = serialize_ros(p);
153  mqm_->getOutMsgQueue("vstig")->push(msg_data);
154  }
155  else if((local.lamport_clock == vsq.lamport_clock) && (local.robot_id != vsq.robot_id)) {
156  //std::cout<<"query conflict"<<std::endl;
157  }
158  else {
159  //do nothing
160  }
161 
162  break;
163  }
164  case VIRTUAL_STIGMERGY_PUT: {
165  if(!rth_->inNeighbors(packet_source)){
166  return;
167  }
168  gsdf_msgs::VirtualStigmergyPut vsp = deserialize_ros<gsdf_msgs::VirtualStigmergyPut>(packet_data);
169 
170  VirtualStigmergyTuple local;
171  bool exist = rth_->getVirtualStigmergyTuple(vsp.vstig_id, vsp.key, local);
172 
173  //local tuple is not exist or local timestamp is smaller
174  if(!exist||(local.lamport_clock < vsp.lamport_clock)) {
175  rth_->createVirtualStigmergy(vsp.vstig_id);
176  rth_->insertOrUpdateVirtualStigmergy(vsp.vstig_id, vsp.key, vsp.value, vsp.lamport_clock, time(NULL), 0, vsp.robot_id);
177 
178  if(!rth_->checkNeighborsOverlap(packet_source)) {
179  gsdf_msgs::VirtualStigmergyPut vsp_new;
180  vsp_new.vstig_id = vsp.vstig_id;
181  vsp_new.key = vsp.key;
182  vsp_new.value = vsp.value;
183  vsp_new.lamport_clock = vsp.lamport_clock;
184  vsp_new.robot_id = vsp.robot_id;
185  std::vector<uint8_t> vsp_new_vec = serialize_ros(vsp_new);
186 
187  gsdf_msgs::CommPacket p;
188  p.header.source = shm_rid;
189  p.header.type = VIRTUAL_STIGMERGY_PUT;
190  p.header.data_len = vsp_new_vec.size();
191  p.header.version = 1;
192  p.header.checksum = 0;
193  p.content.buf = vsp_new_vec;
194  std::vector<uint8_t> msg_data = serialize_ros(p);
195  mqm_->getOutMsgQueue("vstig")->push(msg_data);
196  }
197  }
198  else if((local.lamport_clock==vsp.lamport_clock)&&(local.robot_id!=vsp.robot_id)) {
199  //std::cout<<"put conflict"<<std::endl;
200  }
201  else {
202  //do nothing
203  }
204 
205  break;
206  }
207  case VIRTUAL_STIGMERGY_PUTS: {
208  if(!rth_->inNeighbors(packet_source)){
209  return;
210  }
211  gsdf_msgs::VirtualStigmergyPuts vsps = deserialize_ros<gsdf_msgs::VirtualStigmergyPuts>(packet_data);
212 
213  bool process_msg = false;
214  int neighbor_size = rth_->getNeighborSize();
215  float prob = vsps.prob;
216  float rand_prob = (float)rand()/RAND_MAX;
217 
218  if(neighbor_size < 3) {
219  process_msg = true;
220  }
221  else {
222  if(rand_prob < prob) {
223  process_msg = true;
224  }
225  else {
226  process_msg = false;
227  }
228  }
229 
230  //std::cout<<rth_->getRobotID()<<": "<<rand_prob<<", "<<prob<<", "<<process_msg<<std::endl;
231 
232  if(!process_msg) {
233  return;
234  }
235 
236  VirtualStigmergyTuple local;
237  bool exist = rth_->getVirtualStigmergyTuple(vsps.vstig_id, vsps.key, local);
238 
239  //local tuple is not exist or local timestamp is smaller
240  if(!exist||(local.lamport_clock < vsps.lamport_clock)) {
241  rth_->createVirtualStigmergy(vsps.vstig_id);
242  rth_->insertOrUpdateVirtualStigmergy(vsps.vstig_id, vsps.key, vsps.value, vsps.lamport_clock, time(NULL), 0, vsps.robot_id);
243 
244  if(!rth_->checkNeighborsOverlap(packet_source)) {
245  gsdf_msgs::VirtualStigmergyPuts vsps_new;
246  vsps_new.vstig_id = vsps.vstig_id;
247  vsps_new.key = vsps.key;
248  vsps_new.value = vsps.value;
249  vsps_new.lamport_clock = vsps.lamport_clock;
250  vsps_new.robot_id = vsps.robot_id;
251  int neighbor_size = rth_->getNeighborSize();
252  if(neighbor_size < 3) {
253  vsps_new.prob = 1;
254  }
255  else {
256  vsps_new.prob = 2.0/neighbor_size;
257  }
258  std::vector<uint8_t> vsps_new_vec = serialize_ros(vsps_new);
259 
260  gsdf_msgs::CommPacket p;
261  p.header.source = shm_rid;
262  p.header.type = VIRTUAL_STIGMERGY_PUTS;
263  p.header.data_len = vsps_new_vec.size();
264  p.header.version = 1;
265  p.header.checksum = 0;
266  p.content.buf = vsps_new_vec;
267  std::vector<uint8_t> msg_data = serialize_ros(p);
268  mqm_->getOutMsgQueue("vstig")->push(msg_data);
269  }
270  }
271  else if((local.lamport_clock==vsps.lamport_clock)&&(local.robot_id!=vsps.robot_id)) {
272  //std::cout<<"put conflict"<<std::endl;
273  }
274  else {
275  //do nothing
276  }
277 
278  break;
279  }
280  case BLACKBOARD_PUT: {
281  if(!rth_->inNeighbors(packet_source)){
282  return;
283  }
284  gsdf_msgs::BlackBoardPut bbp = deserialize_ros<gsdf_msgs::BlackBoardPut>(packet_data);
285  int robot_id = rth_->getRobotID();
286  std::string bb_key = bbp.key;
287  if(bbp.on_robot_id == robot_id) {
288  rth_->createBlackBoard(bbp.bb_id);
289  BlackBoardTuple local;
290  rth_->getBlackBoardTuple(bbp.bb_id, bbp.key, local);
291 
292  //local tuple is not exist or the local timestamp is smaller
293  if(!rth_->isBlackBoardTupleExist(bbp.bb_id, bbp.key) || local.timestamp < bbp.timestamp) {
294  rth_->insertOrUpdateBlackBoard(bbp.bb_id, bbp.key, bbp.value, bbp.timestamp, bbp.robot_id);
295  }
296  }
297  else{
298 
299  }
300 
301  break;
302  }
303  case BLACKBOARD_QUERY: {
304  if(!rth_->inNeighbors(packet_source)){
305  return;
306  }
307  gsdf_msgs::BlackBoardQuery bbq = deserialize_ros<gsdf_msgs::BlackBoardQuery>(packet_data);
308  int robot_id = rth_->getRobotID();
309  std::string bb_key = bbq.key;
310 
311  if(bbq.on_robot_id == robot_id) {
312  BlackBoardTuple local;
313  rth_->getBlackBoardTuple(bbq.bb_id, bbq.key, local);
314  gsdf_msgs::BlackBoardAck bba;
315  bba.bb_id = bbq.bb_id;
316  bba.on_robot_id = bbq.on_robot_id;
317  bba.key = bbq.key;
318  bba.value = local.bb_value;
319  bba.timestamp = local.timestamp;
320  bba.robot_id = bbq.robot_id;
321  std::vector<uint8_t> bbqa_vec = serialize_ros(bba);
322 
323  gsdf_msgs::CommPacket p;
324  p.header.source = shm_rid;
325  p.header.type = BLACKBOARD_QUERY_ACK;
326  p.header.data_len = bbqa_vec.size();
327  p.header.version = 1;
328  p.header.checksum = 0;
329  p.content.buf = bbqa_vec;
330  std::vector<uint8_t> msg_data = serialize_ros(p);
331  mqm_->getOutMsgQueue("bb")->push(msg_data);
332  }
333  else{
334 
335  }
336 
337  break;
338  }
339  case BLACKBOARD_QUERY_ACK: {
340  if(!rth_->inNeighbors(packet_source)){
341  return;
342  }
343  gsdf_msgs::BlackBoardAck bba = deserialize_ros<gsdf_msgs::BlackBoardAck>(packet_data);
344  int robot_id = rth_->getRobotID();
345  std::string bb_key = bba.key;
346 
347  if(bba.on_robot_id == robot_id) {
348 
349  }
350  else {
351  rth_->createBlackBoard(bba.bb_id);
352  rth_->insertOrUpdateBlackBoard(bba.bb_id, bba.key, bba.value, bba.timestamp, bba.robot_id);
353  }
354 
355  break;
356  }
357  case SCDS_PSO_PUT: {
358  if(!rth_->inNeighbors(packet_source)){
359  return;
360  }
361  gsdf_msgs::SCDSPSOPut scds_put = deserialize_ros<gsdf_msgs::SCDSPSOPut>(packet_data);
362  SCDSPSODataTuple local;
363  bool exist = rth_->getSCDSPSOValue(scds_put.key, local);
364 
365  //local tuple is not exist or local value is smaller
366  if ((!exist) || (local.val < scds_put.val)) {
367  SCDSPSODataTuple new_data;
368  new_data.pos = scds_put.pos;
369  new_data.val = scds_put.val;
370  new_data.robot_id = scds_put.robot_id;
371  new_data.gen = scds_put.gen;
372  new_data.timestamp = scds_put.timestamp;
373  rth_->insertOrUpdateSCDSPSOValue(scds_put.key, new_data);
374 
375  if(!rth_->checkNeighborsOverlap(packet_source)) {
376  gsdf_msgs::CommPacket p;
377  p.header.source = shm_rid;
378  p.header.type = SCDS_PSO_PUT;
379  p.header.data_len = packet_data.size();
380  p.header.version = 1;
381  p.header.checksum = 0;
382  p.content.buf = packet_data;
383  std::vector <uint8_t> msg_data = serialize_ros(p);
384  mqm_->getOutMsgQueue("scds_pso")->push(msg_data);
385  }
386  }
387 
388  break;
389  }
390  case SCDS_PSO_GET: {
391  if(!rth_->inNeighbors(packet_source)){
392  return;
393  }
394  gsdf_msgs::SCDSPSOGet scds_get = deserialize_ros<gsdf_msgs::SCDSPSOGet>(packet_data);
395  SCDSPSODataTuple local;
396  bool exist = rth_->getSCDSPSOValue(scds_get.key, local);
397 
398  //local tuple is not exist or local value is smaller
399  if ((!exist) || (local.val < scds_get.val)) {
400  SCDSPSODataTuple new_data;
401  new_data.pos = scds_get.pos;
402  new_data.val = scds_get.val;
403  new_data.robot_id = scds_get.robot_id;
404  new_data.gen = scds_get.gen;
405  new_data.timestamp = scds_get.timestamp;
406  rth_->insertOrUpdateSCDSPSOValue(scds_get.key, new_data);
407 
408  if(!rth_->checkNeighborsOverlap(packet_source)) {
409  gsdf_msgs::CommPacket p;
410  p.header.source = shm_rid;
411  p.header.type = SCDS_PSO_PUT;
412  p.header.data_len = packet_data.size();
413  p.header.version = 1;
414  p.header.checksum = 0;
415  p.content.buf = packet_data;
416  std::vector <uint8_t> msg_data = serialize_ros(p);
417  mqm_->getOutMsgQueue("scds_pso")->push(msg_data);
418  }
419  }
420  else if(local.val > scds_get.val) {
421  gsdf_msgs::SCDSPSOPut scds_put;
422  scds_put.key = scds_get.key;
423  scds_put.pos = local.pos;
424  scds_put.val = local.val;
425  scds_put.robot_id = local.robot_id;
426  scds_put.gen = local.gen;
427  scds_put.timestamp = local.timestamp;
428  std::vector<uint8_t> scds_put_vec = serialize_ros(scds_put);;
429 
430  gsdf_msgs::CommPacket p;
431  p.header.source = shm_rid;
432  p.header.type = SCDS_PSO_PUT;
433  p.header.data_len = scds_put_vec.size();
434  p.header.version = 1;
435  p.header.checksum = 0;
436  p.content.buf = scds_put_vec;
437  std::vector<uint8_t> msg_data = serialize_ros(p);
438  mqm_->getOutMsgQueue("scds_pso")->push(msg_data);
439  }
440  else {
441 
442  }
443 
444  break;
445  }
447  if(!rth_->inNeighbors(packet_source)){
448  return;
449  }
450  gsdf_msgs::NeighborBroadcastKeyValue nbkv = deserialize_ros<gsdf_msgs::NeighborBroadcastKeyValue>(packet_data);
451  boost::shared_ptr<ListenerHelper> helper = rth_->getListenerHelper(nbkv.key);
452  if(helper == NULL) {
453  return;
454  }
455  helper->call(nbkv.value);
456 
457  break;
458  }
459  case BARRIER_SYN: {
460  gsdf_msgs::BarrierSyn bs = deserialize_ros<gsdf_msgs::BarrierSyn>(packet_data);
461  if(bs.s != "SYN") {
462  return;
463  }
464 
465  gsdf_msgs::BarrierAck ba;
466  ba.robot_id = packet.header.source;
467  std::vector<uint8_t> ba_vec = serialize_ros(ba);
468 
469  gsdf_msgs::CommPacket p;
470  p.header.source = shm_rid;
471  p.header.type = BARRIER_ACK;
472  p.header.data_len = ba_vec.size();
473  p.header.version = 1;
474  p.header.checksum = 0;
475  p.content.buf = ba_vec;
476  std::vector<uint8_t> msg_data = serialize_ros(p);
477  mqm_->getOutMsgQueue("barrier")->push(msg_data);
478  break;
479  }
480  case BARRIER_ACK: {
481  gsdf_msgs::BarrierAck ba = deserialize_ros<gsdf_msgs::BarrierAck>(packet_data);
482 
483  if(shm_rid == ba.robot_id) {
484  rth_->insertBarrier(packet.header.source);
485  }
486 
487  break;
488  }
489 
490  default: {
491  std::cout<<"UNDEFINED PACKET TYPE!"<<std::endl;
492  }
493  }
494  }
495  catch(char *err_str) {
496  std::cout<<err_str<<std::endl;
497  return;
498  }
499  }
500 
501  void PacketParser::PacketParser::parse(const std::vector<char>& data)
502  {
503  std::vector<uint8_t> seq;
504  for(int i = 0; i < data.size(); i++) {
505  uint8_t c = (uint8_t)(data[i]);
506  seq.push_back(c);
507  }
508  parse(seq);
509  }
510 
511  void PacketParser::parse(uint8_t* data, int len)
512  {
513  std::vector<uint8_t> seq;
514  for(int i = 0; i < len; i++) {
515  seq.push_back(*(data+i));
516  }
517  parse(seq);
518  }
519 
520  void PacketParser::parse(char* data, int len)
521  {
522  uint8_t *p = (uint8_t*)data;
523  parse(p, len);
524  }
525 };
std::vector< float > pos
Definition: data_type.h:170
boost::shared_ptr< micros_swarm::RuntimeHandle > rth_
Definition: packet_parser.h:68
boost::shared_ptr< micros_swarm::MsgQueueManager > mqm_
Definition: packet_parser.h:70
std::vector< uint8_t > serialize_ros(T t)
Definition: serialize.h:39
void parse(const std::vector< uint8_t > &data)
static boost::shared_ptr< T > getSingleton()
Definition: singleton.h:70
boost::shared_ptr< CheckNeighborInterface > cni_
Definition: packet_parser.h:69
std::vector< uint8_t > bb_value
Definition: data_type.h:150


micros_swarm
Author(s):
autogenerated on Mon Jun 10 2019 14:02:06