43 gsdf_msgs::CommPacket packet = deserialize_ros<gsdf_msgs::CommPacket>(data);
44 int packet_source = packet.header.source;
45 int shm_rid =
rth_->getRobotID();
48 if(packet_source == shm_rid) {
53 const int packet_type = packet.header.type;
54 std::vector<uint8_t> packet_data = packet.content.buf;
58 gsdf_msgs::RobotBase rb = deserialize_ros<gsdf_msgs::RobotBase>(packet_data);
64 const Base&
self =
rth_->getRobotBase();
65 Base neighbor(rb.px, rb.py, rb.pz, rb.vx, rb.vy, rb.vz, rb.valid);
67 if(
cni_->isNeighbor(
self, neighbor)) {
68 rth_->insertOrUpdateNeighbor(rb.id, 0, 0, 0, rb.px, rb.py, rb.pz, rb.vx, rb.vy, rb.vz);
71 rth_->deleteNeighbor(rb.id);
77 if(!
rth_->inNeighbors(packet_source)){
80 gsdf_msgs::JoinSwarm srjs = deserialize_ros<gsdf_msgs::JoinSwarm>(packet_data);
81 rth_->joinNeighborSwarm(srjs.robot_id, srjs.swarm_id);
86 if(!
rth_->inNeighbors(packet_source)){
89 gsdf_msgs::LeaveSwarm srls = deserialize_ros<gsdf_msgs::LeaveSwarm>(packet_data);
90 rth_->leaveNeighborSwarm(srls.robot_id, srls.swarm_id);
95 if(!
rth_->inNeighbors(packet_source)){
98 gsdf_msgs::SwarmList srsl = deserialize_ros<gsdf_msgs::SwarmList>(packet_data);
99 rth_->insertOrRefreshNeighborSwarm(srsl.robot_id, srsl.swarm_list);
104 if(!
rth_->inNeighbors(packet_source)) {
107 gsdf_msgs::VirtualStigmergyQuery vsq = deserialize_ros<gsdf_msgs::VirtualStigmergyQuery>(packet_data);
109 bool exist =
rth_->getVirtualStigmergyTuple(vsq.vstig_id, vsq.key, local);
112 if(!exist||(local.lamport_clock<vsq.lamport_clock)) {
113 rth_->createVirtualStigmergy(vsq.vstig_id);
114 rth_->insertOrUpdateVirtualStigmergy(vsq.vstig_id, vsq.key, vsq.value, vsq.lamport_clock, time(NULL), 0, vsq.robot_id);
116 if(!
rth_->checkNeighborsOverlap(packet_source)) {
117 gsdf_msgs::VirtualStigmergyPut vsp_new;
118 vsp_new.vstig_id = vsq.vstig_id;
119 vsp_new.key = vsq.key;
120 vsp_new.value = vsq.value;
121 vsp_new.lamport_clock = vsq.lamport_clock;
122 vsp_new.robot_id = vsq.robot_id;
125 gsdf_msgs::CommPacket p;
126 p.header.source = shm_rid;
128 p.header.data_len = vsp_new_vec.size();
129 p.header.version = 1;
130 p.header.checksum = 0;
131 p.content.buf = vsp_new_vec;
133 mqm_->getOutMsgQueue(
"vstig")->push(msg_data);
136 else if(local.lamport_clock > vsq.lamport_clock) {
137 gsdf_msgs::VirtualStigmergyPut vsp;
138 vsp.vstig_id = vsq.vstig_id;
140 vsp.value = local.vstig_value;
141 vsp.lamport_clock = local.lamport_clock;
142 vsp.robot_id = local.robot_id;
145 gsdf_msgs::CommPacket p;
146 p.header.source = shm_rid;
148 p.header.data_len = vsp_vec.size();
149 p.header.version = 1;
150 p.header.checksum = 0;
151 p.content.buf = vsp_vec;
153 mqm_->getOutMsgQueue(
"vstig")->push(msg_data);
155 else if((local.lamport_clock == vsq.lamport_clock) && (local.robot_id != vsq.robot_id)) {
165 if(!
rth_->inNeighbors(packet_source)){
168 gsdf_msgs::VirtualStigmergyPut vsp = deserialize_ros<gsdf_msgs::VirtualStigmergyPut>(packet_data);
171 bool exist =
rth_->getVirtualStigmergyTuple(vsp.vstig_id, vsp.key, local);
174 if(!exist||(local.lamport_clock < vsp.lamport_clock)) {
175 rth_->createVirtualStigmergy(vsp.vstig_id);
176 rth_->insertOrUpdateVirtualStigmergy(vsp.vstig_id, vsp.key, vsp.value, vsp.lamport_clock, time(NULL), 0, vsp.robot_id);
178 if(!
rth_->checkNeighborsOverlap(packet_source)) {
179 gsdf_msgs::VirtualStigmergyPut vsp_new;
180 vsp_new.vstig_id = vsp.vstig_id;
181 vsp_new.key = vsp.key;
182 vsp_new.value = vsp.value;
183 vsp_new.lamport_clock = vsp.lamport_clock;
184 vsp_new.robot_id = vsp.robot_id;
187 gsdf_msgs::CommPacket p;
188 p.header.source = shm_rid;
190 p.header.data_len = vsp_new_vec.size();
191 p.header.version = 1;
192 p.header.checksum = 0;
193 p.content.buf = vsp_new_vec;
195 mqm_->getOutMsgQueue(
"vstig")->push(msg_data);
198 else if((local.lamport_clock==vsp.lamport_clock)&&(local.robot_id!=vsp.robot_id)) {
208 if(!
rth_->inNeighbors(packet_source)){
211 gsdf_msgs::VirtualStigmergyPuts vsps = deserialize_ros<gsdf_msgs::VirtualStigmergyPuts>(packet_data);
213 bool process_msg =
false;
214 int neighbor_size =
rth_->getNeighborSize();
215 float prob = vsps.prob;
216 float rand_prob = (float)rand()/RAND_MAX;
218 if(neighbor_size < 3) {
222 if(rand_prob < prob) {
237 bool exist =
rth_->getVirtualStigmergyTuple(vsps.vstig_id, vsps.key, local);
241 rth_->createVirtualStigmergy(vsps.vstig_id);
242 rth_->insertOrUpdateVirtualStigmergy(vsps.vstig_id, vsps.key, vsps.value, vsps.lamport_clock, time(NULL), 0, vsps.robot_id);
244 if(!
rth_->checkNeighborsOverlap(packet_source)) {
245 gsdf_msgs::VirtualStigmergyPuts vsps_new;
246 vsps_new.vstig_id = vsps.vstig_id;
247 vsps_new.key = vsps.key;
248 vsps_new.value = vsps.value;
249 vsps_new.lamport_clock = vsps.lamport_clock;
250 vsps_new.robot_id = vsps.robot_id;
251 int neighbor_size =
rth_->getNeighborSize();
252 if(neighbor_size < 3) {
256 vsps_new.prob = 2.0/neighbor_size;
260 gsdf_msgs::CommPacket p;
261 p.header.source = shm_rid;
263 p.header.data_len = vsps_new_vec.size();
264 p.header.version = 1;
265 p.header.checksum = 0;
266 p.content.buf = vsps_new_vec;
268 mqm_->getOutMsgQueue(
"vstig")->push(msg_data);
281 if(!
rth_->inNeighbors(packet_source)){
284 gsdf_msgs::BlackBoardPut bbp = deserialize_ros<gsdf_msgs::BlackBoardPut>(packet_data);
285 int robot_id =
rth_->getRobotID();
286 std::string bb_key = bbp.key;
287 if(bbp.on_robot_id == robot_id) {
288 rth_->createBlackBoard(bbp.bb_id);
290 rth_->getBlackBoardTuple(bbp.bb_id, bbp.key, local);
293 if(!
rth_->isBlackBoardTupleExist(bbp.bb_id, bbp.key) || local.
timestamp < bbp.timestamp) {
294 rth_->insertOrUpdateBlackBoard(bbp.bb_id, bbp.key, bbp.value, bbp.timestamp, bbp.robot_id);
304 if(!
rth_->inNeighbors(packet_source)){
307 gsdf_msgs::BlackBoardQuery bbq = deserialize_ros<gsdf_msgs::BlackBoardQuery>(packet_data);
308 int robot_id =
rth_->getRobotID();
309 std::string bb_key = bbq.key;
311 if(bbq.on_robot_id == robot_id) {
313 rth_->getBlackBoardTuple(bbq.bb_id, bbq.key, local);
314 gsdf_msgs::BlackBoardAck bba;
315 bba.bb_id = bbq.bb_id;
316 bba.on_robot_id = bbq.on_robot_id;
320 bba.robot_id = bbq.robot_id;
323 gsdf_msgs::CommPacket p;
324 p.header.source = shm_rid;
326 p.header.data_len = bbqa_vec.size();
327 p.header.version = 1;
328 p.header.checksum = 0;
329 p.content.buf = bbqa_vec;
331 mqm_->getOutMsgQueue(
"bb")->push(msg_data);
340 if(!
rth_->inNeighbors(packet_source)){
343 gsdf_msgs::BlackBoardAck bba = deserialize_ros<gsdf_msgs::BlackBoardAck>(packet_data);
344 int robot_id =
rth_->getRobotID();
345 std::string bb_key = bba.key;
347 if(bba.on_robot_id == robot_id) {
351 rth_->createBlackBoard(bba.bb_id);
352 rth_->insertOrUpdateBlackBoard(bba.bb_id, bba.key, bba.value, bba.timestamp, bba.robot_id);
358 if(!
rth_->inNeighbors(packet_source)){
361 gsdf_msgs::SCDSPSOPut scds_put = deserialize_ros<gsdf_msgs::SCDSPSOPut>(packet_data);
363 bool exist =
rth_->getSCDSPSOValue(scds_put.key, local);
366 if ((!exist) || (local.val < scds_put.val)) {
368 new_data.
pos = scds_put.pos;
369 new_data.
val = scds_put.val;
370 new_data.
robot_id = scds_put.robot_id;
371 new_data.
gen = scds_put.gen;
373 rth_->insertOrUpdateSCDSPSOValue(scds_put.key, new_data);
375 if(!
rth_->checkNeighborsOverlap(packet_source)) {
376 gsdf_msgs::CommPacket p;
377 p.header.source = shm_rid;
379 p.header.data_len = packet_data.size();
380 p.header.version = 1;
381 p.header.checksum = 0;
382 p.content.buf = packet_data;
384 mqm_->getOutMsgQueue(
"scds_pso")->push(msg_data);
391 if(!
rth_->inNeighbors(packet_source)){
394 gsdf_msgs::SCDSPSOGet scds_get = deserialize_ros<gsdf_msgs::SCDSPSOGet>(packet_data);
396 bool exist =
rth_->getSCDSPSOValue(scds_get.key, local);
399 if ((!exist) || (local.val < scds_get.val)) {
401 new_data.
pos = scds_get.pos;
402 new_data.
val = scds_get.val;
403 new_data.
robot_id = scds_get.robot_id;
404 new_data.
gen = scds_get.gen;
406 rth_->insertOrUpdateSCDSPSOValue(scds_get.key, new_data);
408 if(!
rth_->checkNeighborsOverlap(packet_source)) {
409 gsdf_msgs::CommPacket p;
410 p.header.source = shm_rid;
412 p.header.data_len = packet_data.size();
413 p.header.version = 1;
414 p.header.checksum = 0;
415 p.content.buf = packet_data;
417 mqm_->getOutMsgQueue(
"scds_pso")->push(msg_data);
420 else if(local.val > scds_get.val) {
421 gsdf_msgs::SCDSPSOPut scds_put;
422 scds_put.key = scds_get.key;
423 scds_put.pos = local.pos;
424 scds_put.val = local.val;
425 scds_put.robot_id = local.robot_id;
426 scds_put.gen = local.gen;
427 scds_put.timestamp = local.timestamp;
428 std::vector<uint8_t> scds_put_vec =
serialize_ros(scds_put);;
430 gsdf_msgs::CommPacket p;
431 p.header.source = shm_rid;
433 p.header.data_len = scds_put_vec.size();
434 p.header.version = 1;
435 p.header.checksum = 0;
436 p.content.buf = scds_put_vec;
438 mqm_->getOutMsgQueue(
"scds_pso")->push(msg_data);
447 if(!
rth_->inNeighbors(packet_source)){
450 gsdf_msgs::NeighborBroadcastKeyValue nbkv = deserialize_ros<gsdf_msgs::NeighborBroadcastKeyValue>(packet_data);
455 helper->call(nbkv.value);
460 gsdf_msgs::BarrierSyn bs = deserialize_ros<gsdf_msgs::BarrierSyn>(packet_data);
465 gsdf_msgs::BarrierAck ba;
466 ba.robot_id = packet.header.source;
469 gsdf_msgs::CommPacket p;
470 p.header.source = shm_rid;
472 p.header.data_len = ba_vec.size();
473 p.header.version = 1;
474 p.header.checksum = 0;
475 p.content.buf = ba_vec;
477 mqm_->getOutMsgQueue(
"barrier")->push(msg_data);
481 gsdf_msgs::BarrierAck ba = deserialize_ros<gsdf_msgs::BarrierAck>(packet_data);
483 if(shm_rid == ba.robot_id) {
484 rth_->insertBarrier(packet.header.source);
491 std::cout<<
"UNDEFINED PACKET TYPE!"<<std::endl;
495 catch(
char *err_str) {
496 std::cout<<err_str<<std::endl;
501 void PacketParser::PacketParser::parse(
const std::vector<char>& data)
503 std::vector<uint8_t> seq;
504 for(
int i = 0; i < data.size(); i++) {
505 uint8_t c = (uint8_t)(data[i]);
513 std::vector<uint8_t> seq;
514 for(
int i = 0; i < len; i++) {
515 seq.push_back(*(data+i));
522 uint8_t *p = (uint8_t*)data;
boost::shared_ptr< micros_swarm::RuntimeHandle > rth_
boost::shared_ptr< micros_swarm::MsgQueueManager > mqm_
std::vector< uint8_t > serialize_ros(T t)
void parse(const std::vector< uint8_t > &data)
static boost::shared_ptr< T > getSingleton()
boost::shared_ptr< CheckNeighborInterface > cni_
std::vector< uint8_t > bb_value