00001
00051 #include "header.h"
00052
00053 bool getGroupStateF(adhoc_communication::GetGroupState::Request &req, adhoc_communication::GetGroupState::Response &res)
00054 {
00055 boost::unique_lock<boost::mutex> lock(mtx_mc_groups);
00056 McTree *t = mc_handler.getMcGroup(&req.group_name);
00057
00058 if (t != NULL)
00059 {
00060 res.activated = t->activated;
00061 res.connected = t->connected;
00062 res.root = t->root;
00063 res.member = t->member;
00064
00065 for (std::list<mac*>::iterator it = t->downlinks_l_.begin(); it != t->downlinks_l_.end(); ++it)
00066 {
00067 mac* m = *it;
00068 res.downlinks.push_back(getHostnameFromMac(m->mac_adr));
00069 }
00070
00071 if (!t->root)
00072 res.route_uplink = "Next hop:" + getHostnameFromMac(t->route_uplink_->next_hop) + " RD:" + getIntAsString(t->route_uplink_->root_distance) + " HOBS:" + getIntAsString(t->route_uplink_->hobs) + " CH:" + getIntAsString(t->route_uplink_-> current_hop);
00073
00074 else
00075 res.route_uplink = "";
00076
00077 return true;
00078 }
00079 else
00080 {
00081 res.route_uplink = "No mc entry found!";
00082 }
00083 }
00084
00085
00089 void shutDown()
00090 {
00091 ros::shutdown();
00092 exit(0);
00093 }
00094
00095 bool shutDownRos(adhoc_communication::ShutDown::Request &req, adhoc_communication::ShutDown::Response &res)
00096 {
00097 boost::thread d(shutDown);
00098 return true;
00099 }
00100
00108 bool joinMCGroup(adhoc_communication::ChangeMCMembership::Request &req, adhoc_communication::ChangeMCMembership::Response &res)
00109 {
00110
00111
00112
00113 if (req.action)
00114 {
00115 ROS_INFO("Try to join group: %s", req.group_name.c_str());
00116
00117 res.status = false;
00118
00119 boost::unique_lock<boost::mutex> lock_mc_groups(mtx_mc_groups);
00120 McTree *t = mc_handler.getMcGroup(&req.group_name);
00121
00122 if (t != NULL && t->connected && t->activated)
00123 {
00124
00125 t->member = true;
00126 res.status = true;
00127
00128 return true;
00129 }
00130 else if (t != NULL && t->outgoing_request_ != NULL && getMillisecondsTime() - t->time_stamp_ < INTERVAL_WAIT_FOR_MCROUTES)
00131 {
00132
00133 lock_mc_groups.unlock();
00134
00135 sleepMS(INTERVAL_WAIT_FOR_MCROUTES - (getMillisecondsTime() - t->time_stamp_));
00136
00137 sleepMS(INTERVAL_WAIT_FOR_MCROUTES * 2);
00138 lock_mc_groups.lock();
00139
00140 t = mc_handler.getMcGroup(&req.group_name);
00141
00142 if (t->activated)
00143 {
00144
00145 res.status = true;
00146 t->member = true;
00147
00148 return true;
00149 }
00150 else
00151 {
00152 t->resetTmpFields();
00153 res.status = false;
00154 }
00155
00156 }
00157 else if (t != NULL && (!t->connected || !t->activated))
00158 {
00159 t->resetTmpFields();
00160 res.status = false;
00161 }
00162 else if (t != NULL)
00163 t->printTree();
00164
00165 if (t == NULL || (t != NULL && t->outgoing_request_ == NULL))
00166 {
00167 mc_handler.addGroup(&req.group_name);
00168 {
00169 boost::unique_lock<boost::mutex> lock(mtx_routing_table);
00170 Logging::logRoutingTable(&routing_table_l, &mc_groups_l);
00171 }
00172
00173 uint8_t join_attempts = 0;
00174
00175
00176 route_request rreq_logging;
00177 rreq_logging.id = RouteRequest::req_count_stat;
00178 rreq_logging.hostname_source = hostname;
00179 rreq_logging.is_mc = true;
00180 rreq_logging.hostname_destination = req.group_name;
00181
00182 while (join_attempts < MAX_INTERN_JOIN_ATTEMPS && res.status == false)
00183 {
00184 RouteRequest* rr = new RouteRequest(hostname, req.group_name, hop_limit_min, true);
00185
00186
00187 route_request rreq_out;
00188 rreq_out.id = rr->header_.id;
00189 rreq_out.hop_limit = hop_limit_min;
00190 rreq_out.hostname_source = hostname;
00191 rreq_out.is_mc = true;
00192 rreq_out.hostname_destination = req.group_name;
00193
00194 {
00195 lock_mc_groups.unlock();
00196 boost::unique_lock<boost::mutex> lock(mtx_route_requests);
00197 route_requests_l.push_front(rreq_out);
00198 lock_mc_groups.lock();
00199 }
00200
00201
00202 routing_entry route;
00203
00204 ROS_INFO("SEND JOIN REQUEST: HOST[%s] ID[%u]", req.group_name.c_str(), rr->header_.id);
00205
00206 t = mc_handler.getMcGroup(&req.group_name);
00207
00208 t->safeOutgoingRequest(rr);
00209 string network_string = rr->getRequestAsNetworkString(src_mac);
00210 socketSend(network_string);
00211
00212 lock_mc_groups.unlock();
00213 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
00214 Logging::increaseProperty("num_mc_rrep_sent");
00215 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
00216 #endif
00217 sleepMS(INTERVAL_WAIT_FOR_MCROUTES);
00218 lock_mc_groups.lock();
00219
00220 res.status = !t->routing_entries_l_.empty() || (t->activated && t->connected);
00221
00222 rreq_logging.retransmitted = join_attempts;
00223 join_attempts++;
00224 }
00225
00226
00227 if (res.status && t->activateBestRoute(&rreq_logging))
00228 {
00229 t->member = true;
00230 t->connected = true;
00231 t->activated = true;
00232
00233
00234 McRouteActivationFrame r_act_frame = McRouteActivationFrame(t->route_uplink_->next_hop, t->group_name_, t->route_uplink_->id, t->route_uplink_->hostname_source);
00235 string network_string = r_act_frame.getFrameAsNetworkString(src_mac);
00236
00237 resendUnackLinkFrame("", r_act_frame.header_.id, r_act_frame.header_.mac_destination, network_string, FRAME_TYPE_MC_ACTIVATION);
00238
00239 ROS_INFO("ACTIVATE MC ROUTE: MC GROUP[%s] NEXT HOP[%s]", t->group_name_.c_str(), getMacAsStr(t->route_uplink_->next_hop).c_str());
00240 ROS_INFO("SUCCESSFULLY JOINED GROUP MC: [%s]", req.group_name.c_str());
00241
00242 socketSend(network_string);
00243 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
00244 Logging::increaseProperty("num_mc_ract_sent");
00245 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
00246 #endif
00247
00248 res.status = true;
00249
00250
00251 while (!t->waiting_requests_l_.empty())
00252 {
00253
00254 RouteRequest *req = t->waiting_requests_l_.front();
00255 t->waiting_requests_l_.pop_front();
00256 lock_mc_groups.unlock();
00257 processRouteRequest(*req);
00258 lock_mc_groups.lock();
00259
00260 delete req;
00261 }
00262
00263 return true;
00264 }
00265 else
00266 {
00267 ROS_WARN("COULD NOT JOIN GROUP MC: [%s]", req.group_name.c_str());
00268 }
00269 }
00270 else
00271 {
00272 ROS_WARN("UNEXPECTED FAILURE: 2345 %s", req.group_name.c_str());
00273 }
00274 }
00275 else
00276 {
00277
00278 boost::unique_lock<boost::mutex> lock_mc_group(mtx_mc_groups);
00279 McTree* t = mc_handler.getMcGroup(&req.group_name);
00280
00281 if (t != NULL)
00282 {
00283 if (t->downlinks_l_.empty())
00284 {
00285 if (t->root)
00286 {
00287 ROS_ERROR("CANNOT DISCONNECT FROM OWN GROUP: [%s]", req.group_name.c_str());
00288 return true;
00289 }
00290 ROS_WARN("DISCONNECT FROM MC TREE: MC GROUP[%s] NEXT HOP[%s]", t->group_name_.c_str(), getMacAsStr(t->route_uplink_->next_hop).c_str());
00291
00292 McDisconnectFrame disc_frame(t->route_uplink_->next_hop, req.group_name);
00293 disc_frame.disconnect_uplink = true;
00294
00295
00296 string network_string = disc_frame.getFrameAsNetworkString(src_mac);
00297 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
00298
00299 Logging::increaseProperty("num_mc_total_frames_sent");
00300 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
00301
00302 #endif
00303
00304 resendUnackLinkFrame("", disc_frame.header_.id, disc_frame.header_.mac_destination, network_string, FRAME_TYPE_MC_DISCONNECT);
00305
00306 socketSend(network_string);
00307
00308
00309 mc_handler.removeGroup(&req.group_name);
00310 }
00311 else
00312 {
00313 t->member = false;
00314 ROS_INFO("SET MEMBERSHIP TO FALSE: GROUP[%s]", t->group_name_.c_str());
00315 }
00316 }
00317 else
00318 {
00319 ROS_INFO("NOT A MC MEMBER!");
00320 }
00321 }
00322
00323 return false;
00324 }
00325
00326 bool getNeighbors(adhoc_communication::GetNeighbors::Request &req, adhoc_communication::GetNeighbors::Response & res)
00327 {
00328
00329
00330
00331 boost::unique_lock<boost::mutex> lock(mtx_neighbors);
00332 for (std::list<hostname_mac>::iterator it = neighbors_l.begin(); it != neighbors_l.end(); ++it)
00333 {
00334 hostname_mac current_frame = *it;
00335 string hn = current_frame.hostname;
00336
00337 if (current_frame.reachable)
00338 res.neigbors.push_back(hn);
00339 }
00340
00341 return true;
00342 }
00343
00344
00345 bool sendQuaternion(adhoc_communication::SendQuaternion::Request &req,
00346 adhoc_communication::SendQuaternion::Response & res)
00347 {
00348
00349
00350
00351 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00352 unsigned long time_call = getMillisecondsTime();
00353 #endif
00354
00355 ROS_DEBUG("Service called to QUATERNION...");
00356 string s_msg = getSerializedMessage(req.quaternion);
00357
00358 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_QUATERNION, req.topic);
00359
00360 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00361 Logging::logServiceCalls("SendQuaternion", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00362 #endif
00363 return true;
00364 }
00365
00366 bool sendMap(adhoc_communication::SendOccupancyGrid::Request &req, adhoc_communication::SendOccupancyGrid::Response & res)
00367 {
00368
00369
00370
00371 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00372 unsigned long time_call = getMillisecondsTime();
00373 #endif
00374
00375 ROS_DEBUG("Service called to send map..");
00376 string s_msg = getSerializedMessage(req.map);
00377 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_MAP, req.topic);
00378
00379 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00380 Logging::logServiceCalls("SendOccupancyGrid", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00381 #endif
00382 return res.status;
00383 }
00384
00385 bool sendBroadcast(string& topic, string& data, uint8_t type, uint16_t range)
00386 {
00387 MultiHopBroadcastFrame b(topic, data, hostname, type, range);
00388 string n = b.getFrameAsNetworkString(src_mac);
00389 if (n.length() > ETHER_MAX_LEN)
00390 {
00391 ROS_ERROR("BROADCAST FRAME IS TOO BIG!");
00392 }
00393 socketSend(n);
00394 return true;
00395 }
00396
00397 bool sendBroadcastString(adhoc_communication::BroadcastString::Request &req, adhoc_communication::BroadcastString::Response & res)
00398 {
00399
00400
00401
00402 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00403 unsigned long time_call = getMillisecondsTime();
00404 #endif
00405 ROS_DEBUG("Service called to send broadcast string..");
00406
00407
00408
00409
00410 res.status = sendBroadcast(req.topic, req.data, FRAME_DATA_TYPE_ANY, req.hop_limit);
00411
00412 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00413 Logging::logServiceCalls("BroadcastString", time_call, getMillisecondsTime(), req.data.size(), res.status);
00414 #endif
00415 return res.status;
00416 }
00417
00418 bool sendBroadcastCMgrRobotUpdate(adhoc_communication::BroadcastCMgrRobotUpdate::Request &req, adhoc_communication::BroadcastCMgrRobotUpdate::Response & res)
00419 {
00420
00421
00422
00423 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00424 unsigned long time_call = getMillisecondsTime();
00425 #endif
00426 ROS_DEBUG("Service called to send broadcast update..");
00427
00428 string s_msg = getSerializedMessage(req.update);
00429
00430
00431 res.status = sendBroadcast(req.topic, s_msg, FRAME_DATA_TYPE_ROBOT_UPDATE, req.hop_limit);
00432
00433 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00434 Logging::logServiceCalls("BroadcastCMgrRobotUpdate", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00435 #endif
00436 return res.status;
00437 }
00438
00439 bool sendRobotUpdate(adhoc_communication::SendCMgrRobotUpdate::Request &req, adhoc_communication::SendCMgrRobotUpdate::Response & res)
00440 {
00441
00442
00443
00444 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00445 unsigned long time_call = getMillisecondsTime();
00446 #endif
00447 ROS_DEBUG("Service called to send robot update..");
00448
00449 string s_msg = getSerializedMessage(req.update);
00450 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_ROBOT_UPDATE, req.topic);
00451
00452 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00453 Logging::logServiceCalls("SendCMgrRobotUpdate", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00454 #endif
00455 return res.status;
00456 }
00457
00458 bool sendTwist(adhoc_communication::SendTwist::Request &req, adhoc_communication::SendTwist::Response & res)
00459 {
00460
00461
00462
00463 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00464 unsigned long time_call = getMillisecondsTime();
00465 #endif
00466 ROS_DEBUG("Service called to send twist..");
00467 string s_msg = getSerializedMessage(req.twist);
00468
00469 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_TWIST, req.topic);
00470 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00471 Logging::logServiceCalls("SendTwist", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00472 #endif
00473 return res.status;
00474 }
00475
00476 bool sendPosition(adhoc_communication::SendMmRobotPosition::Request &req, adhoc_communication::SendMmRobotPosition::Response & res)
00477 {
00478 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00479 unsigned long time_call = getMillisecondsTime();
00480 #endif
00481
00482
00483
00484 ROS_DEBUG("Service called to send position..");
00485
00486 string s_msg = getSerializedMessage(req.position);
00487 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_POSITION, req.topic);
00488
00489 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00490 Logging::logServiceCalls("SendMmRobotPosition", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00491 #endif
00492 return res.status;
00493 }
00494
00495 bool sendAuction(adhoc_communication::SendExpAuction::Request &req, adhoc_communication::SendExpAuction::Response & res)
00496 {
00497 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00498 unsigned long time_call = getMillisecondsTime();
00499 #endif
00500
00501
00502
00503
00504 ROS_DEBUG("Service called to send Auction..");
00505
00506 string s_msg = getSerializedMessage(req.auction);
00507 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_AUCTION, req.topic);
00508
00509 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00510 Logging::logServiceCalls("SendExpAuction", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00511 #endif
00512 return res.status;
00513 }
00514
00515 bool sendCluster(adhoc_communication::SendExpCluster::Request &req, adhoc_communication::SendExpCluster::Response & res)
00516 {
00517 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00518 unsigned long time_call = getMillisecondsTime();
00519 #endif
00520
00521
00522
00523
00524 ROS_DEBUG("Service called to send cluster..");
00525
00526 string s_msg = getSerializedMessage(req.cluster);
00527 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_CLUSTER, req.topic);
00528
00529 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00530 Logging::logServiceCalls("SendExpCluster", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00531 #endif
00532 return res.status;
00533 }
00534
00535 bool sendString(adhoc_communication::SendString::Request &req, adhoc_communication::SendString::Response & res)
00536 {
00537
00538
00539
00540 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00541 unsigned long time_call = getMillisecondsTime();
00542 if (hostname.compare(req.dst_robot) == 0)
00543 return false;
00544 #endif
00545 ROS_DEBUG("Service called to send string..");
00546
00547 res.status = sendPacket(req.dst_robot, req.data, FRAME_DATA_TYPE_ANY, req.topic);
00548
00549 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00550
00551 Logging::logServiceCalls("SendString", time_call, getMillisecondsTime(), req.data.size(), res.status);
00552 #endif
00553 return res.status;
00554 }
00555
00556 bool sendPoint(adhoc_communication::SendMmPoint::Request &req, adhoc_communication::SendMmPoint::Response & res)
00557 {
00558
00559
00560
00561
00562 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00563 unsigned long time_call = getMillisecondsTime();
00564 #endif
00565 ROS_DEBUG("Service called to send point..");
00566
00567 string s_msg = getSerializedMessage(req.point);
00568 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_POINT, req.topic);
00569
00570 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00571 Logging::logServiceCalls("SendMmPoint", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00572 #endif
00573 return res.status;
00574 }
00575
00576 bool sendControlMessage(adhoc_communication::SendMmControl::Request &req, adhoc_communication::SendMmControl::Response & res)
00577 {
00578
00579
00580
00581 ROS_DEBUG("Service called to send control message..");
00582 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00583 unsigned long time_call = getMillisecondsTime();
00584 #endif
00585 string s_msg = getSerializedMessage(req.msg);
00586 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_CONTROLL_MSG, req.topic);
00587
00588 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00589 Logging::logServiceCalls("SendMmControl", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00590 #endif
00591 return res.status;
00592 }
00593
00594 bool sendMapUpdate(adhoc_communication::SendMmMapUpdate::Request &req, adhoc_communication::SendMmMapUpdate::Response & res)
00595 {
00596
00597
00598
00599
00600 ROS_DEBUG("Service called to send map update..");
00601 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00602 unsigned long time_call = getMillisecondsTime();
00603 #endif
00604 string s_msg = getSerializedMessage(req.map_update);
00605 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_MAP_UPDATE, req.topic);
00606
00607 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00608 Logging::logServiceCalls("SendMmMapUpdate", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00609 #endif
00610 return res.status;
00611 }
00612
00613 bool sendFrontier(adhoc_communication::SendExpFrontier::Request &req, adhoc_communication::SendExpFrontier::Response & res)
00614 {
00615
00616
00617
00618
00619 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00620 unsigned long time_call = getMillisecondsTime();
00621 #endif
00622 string s_msg = getSerializedMessage(req.frontier);
00623 ROS_DEBUG("Service called to send frontier..");
00624
00625
00626 res.status = sendPacket(req.dst_robot, s_msg, FRAME_DATA_TYPE_FRONTIER, req.topic);
00627
00628 #ifdef PERFORMANCE_LOGGING_SERVICE_CALLS
00629 Logging::logServiceCalls("SendExpFrontier", time_call, getMillisecondsTime(), s_msg.size(), res.status);
00630 #endif
00631 return res.status;
00632 }
00633
00634 bool sendRouteRequest(string* dst, routing_entry* route)
00635 {
00636 RouteRequest rr = RouteRequest(hostname, *dst, hop_limit_min, false);
00637
00638
00639 route_request out_req;
00640 out_req.id = rr.header_.id;
00641 out_req.hop_limit = hop_limit_min;
00642 out_req.is_mc = false;
00643 out_req.hostname_source = hostname;
00644 out_req.ts = getMillisecondsTime();
00645 out_req.retransmitted = 0;
00646 out_req.hostname_destination = *dst;
00647 out_req.response_sent = false;
00648 {
00649 boost::unique_lock<boost::mutex> lock(mtx_route_requests);
00650 route_requests_l.push_front(out_req);
00651 }
00652
00653
00654 ROS_INFO("SEND ROUTE REQUEST: HOST[%s] ID[%u]", dst->c_str(), rr.header_.id);
00655 string network_string = rr.getRequestAsNetworkString(src_mac);
00656 socketSend(network_string);
00657 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
00658 Logging::increaseProperty("num_rreq_sent");
00659 Logging::increaseProperty("num_total_bytes_sent", network_string.length());
00660 #endif
00661
00662
00663 {
00664 boost::unique_lock<boost::mutex> lock(mtx_notify);
00665
00666 boost::thread resendRouteRequestT(resendRouteRequest, boost::ref(out_req), boost::ref(got_request_response));
00667
00668 got_request_response.wait(lock);
00669 resendRouteRequestT.interrupt();
00670 }
00671
00672
00673 bool route_found = getRoute(*route, *dst);
00674
00675
00676 if (route_found)
00677 Logging::logRRequestInitiater(&out_req, route);
00678 else
00679 Logging::logRRequestInitiater(&out_req, NULL);
00680
00681 return route_found;
00682 }
00683
00684 bool gotAckRoutedFrame(routing_entry& route)
00685 {
00686 unsigned long start_time = getMillisecondsTime();
00687 {
00688 boost::unique_lock<boost::mutex> lockAckRF(mtx_notify);
00689 boost::thread resendRoutedFrameT(resendRoutedFrame, boost::ref(got_rframe_ack), route);
00690 got_rframe_ack.wait(lockAckRF);
00691 resendRoutedFrameT.interrupt();
00692 }
00693 unsigned long end_time = getMillisecondsTime();
00694
00695 boost::unique_lock<boost::mutex> loc(mtx_unack_routed_frame);
00696 if (unack_routed_frame->frame_is_ack)
00697 {
00698
00699 }
00700 else
00701 {
00702 ROS_WARN("Could not sent frame: ID[%u] destination host[%s]", unack_routed_frame->frame.header_.packet_id, route.hostname_destination.c_str());
00703 {
00704 loc.unlock();
00705 boost::unique_lock<boost::mutex> loc_rt(mtx_routing_table);
00706 routing_table_l.remove(route);
00707 }
00708
00709 loc.lock();
00710 }
00711 Logging::logTransportFrame(unack_routed_frame, &route, start_time, end_time, unack_routed_frame->frame_is_ack);
00712 return unack_routed_frame->frame_is_ack;
00713 }
00714
00715 bool sendPacket(std::string &hostname_destination, std::string& payload, uint8_t data_type_, std::string& topic)
00716 {
00717 packet_id_++;
00718
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731
00732
00733
00734 if (hostname_destination.compare(hostname) == 0)
00735 {
00736 ROS_WARN("Robot cannot send data to itself!");
00737 return false;
00738 }
00739
00740
00741
00742
00743
00744 bool bcast = (hostname_destination.compare("") == 0);
00745 bool send_successfully;
00746 bool mc_frame = false;
00747 routing_entry route;
00748
00749 if (!bcast)
00750 {
00751 ROS_DEBUG("DESTINATION: [%s]", hostname_destination.c_str());
00752
00753 send_successfully = getRoute(route, hostname_destination);
00754
00755 if (!send_successfully)
00756 {
00757
00758 boost::unique_lock<boost::mutex> lock_mc(mtx_mc_groups);
00759 McTree* mc_t = mc_handler.getMcGroup(&hostname_destination);
00760 if (mc_t != NULL && mc_t->connected && mc_t->activated)
00761 {
00762
00763 ROS_DEBUG("DESTINATION IS A MC GROUP: [%s]", mc_t->group_name_.c_str());
00764 mc_frame = true;
00765 send_successfully = true;
00766 bcast = true;
00767 memcpy(route.next_hop, bcast_mac, 6);
00768 route.hostname_destination = hostname_destination;
00769 route.hostname_source = hostname;
00770
00771 if (mc_t->root && mc_t->downlinks_l_.size() == 0)
00772 {
00773 ROS_DEBUG("THERE ARE NO OTHER MEMBERS IN THE MC GROUP [%s]", mc_t->group_name_.c_str());
00774 return true;
00775 }
00776 }
00777 }
00778
00779 if (!send_successfully)
00780 {
00781 send_successfully = sendRouteRequest(&hostname_destination, &route);
00782 }
00783 }
00784 else
00785 {
00786
00787 memcpy(route.next_hop, bcast_mac, 6);
00788 route.hostname_destination = "";
00789 route.hostname_source = hostname;
00790 send_successfully = true;
00791 }
00792
00793 if (!send_successfully)
00794 {
00795 ROS_WARN("NO ROUTED FOUND TO [%s]", hostname_destination.c_str());
00796 return false;
00797 }
00798
00799
00800
00801 send_successfully = false;
00802 uint32_t header_len = RoutedFrame::HEADER_FIXED_LEN;
00803 header_len += hostname.length();
00804 header_len += topic.length();
00805 header_len += mc_frame ? route.hostname_destination.length() : 0;
00806
00807
00808 uint32_t payload_len = payload.length();
00809 uint32_t payload_space = (max_frame_size - header_len);
00810
00811 float frames_count_f = payload_len / (float) payload_space;
00812 uint32_t frames_count = frames_count_f;
00813 if (frames_count_f > frames_count)
00814 frames_count++;
00815
00816 uint32_t packet_size = payload_len + (frames_count * header_len);
00817
00818 ROS_DEBUG("PACKET INFO: PL[%u] PL SPACE[%u] HEADER LEN[%u] SIZE[%u]", payload_len, payload_space, header_len, frames_count);
00819
00820 if (packet_size >= (unsigned) max_packet_size)
00821 {
00822 ROS_ERROR("Packet[%uBYTES] is too big! Try to increase the 'max_packet_size' parameter [%uBYTES]", packet_size, max_packet_size);
00823 return false;
00824 }
00825 else if (header_len >= (unsigned) max_frame_size)
00826 {
00827 ROS_ERROR("Header so long that there is no space for payload! Try to increase the 'max_frame_size' parameter!");
00828 return false;
00829 }
00830
00831
00832 Packet packet;
00833 list<RoutedFrame> frames;
00834
00835 int pos = 0;
00836
00837
00838
00839 for (unsigned int i = 0; i < frames_count; i++)
00840 {
00841 std::string p(payload, pos, payload_space);
00842 pos += payload_space;
00843 RoutedFrame f = RoutedFrame(topic, p, data_type_, packet_id_, i, frames_count);
00844
00845 f.mc_flag = mc_frame;
00846 f.negative_ack_type = frames_count > Packet::NACK_THRESHOLD;
00847 f.mc_g_name_ = mc_frame ? route.hostname_destination : "";
00848 f.hostname_source_ = hostname;
00849
00850
00851 if (i == 0)
00852 packet = Packet(f);
00853
00854 if (f.negative_ack_type)
00855 packet.frames_l_.push_back(f);
00856 frames.push_back(f);
00857
00858 }
00859
00860
00861 if (packet.isNack())
00862 {
00863 boost::unique_lock<boost::mutex> lock(mtx_cached_mc_packets);
00864 packet.ts_ = getMillisecondsTime() + packet.size_ * 10;
00865 packet.hostname_source_ = hostname;
00866
00867 cached_mc_packets_l.push_front(packet);
00868 }
00869
00870 bool cannot_send_frame = false;
00871
00872 for (unsigned int i = 0; i < frames_count && cannot_send_frame == false; i++)
00873 {
00874 RoutedFrame f = frames.front();
00875 string network_string = f.getFrameAsNetworkString(route, src_mac);
00876
00877
00878 if (packet.isMcFrame() && !packet.isNack())
00879 {
00880 McPosAckObj* ack_obj = NULL;
00881 {
00882
00883
00884 boost::unique_lock<boost::mutex> lock_mc(mtx_mc_groups);
00885 McTree* mc_t = mc_handler.getMcGroup(&hostname_destination);
00886 ack_obj = new McPosAckObj(&f, mc_t);
00887 }
00888 {
00889 boost::unique_lock<boost::mutex> lock(mtx_mc_ack_l);
00890 mc_ack_l.push_back(ack_obj);
00891 }
00892 }
00893 else if (!packet.isMcFrame())
00894 {
00895
00896 {
00897 boost::unique_lock<boost::mutex> lock(mtx_unack_routed_frame);
00898
00899 unack_routed_frame->frame = f;
00900 unack_routed_frame->frame_is_ack = false;
00901 unack_routed_frame->time_stamp = getMillisecondsTime();
00902 unack_routed_frame->retransmitted = 0;
00903 memcpy(unack_routed_frame->frame.header_.mac_destination_, route.next_hop, 6);
00904 unack_routed_frame->hostname_destination = hostname_destination;
00905 memcpy(unack_routed_frame->mac, f.header_.mac_destination_, 6);
00906
00907 }
00908
00909 {
00910
00911
00912 resendUnackLinkFrame(hostname, f.header_.frame_id, route.next_hop, network_string, FRAME_TYPE_TRANSPORT_DATA);
00913 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
00914 Logging::increaseProperty("num_unique_data_frames_sent");
00915
00916 Logging::increaseProperty("num_total_bytes_sent", network_string.length());
00917 #endif
00918 }
00919 }
00920
00921
00922 if (packet.isNack() && f.header_.packet_sequence_num % 10 == 0)
00923 sleepMS(5);
00924
00925 socketSend(network_string);
00926
00927 #ifdef DELAY
00928 if (simulation_mode)
00929 boost::this_thread::sleep(boost::posix_time::milliseconds(delay));
00930 #endif
00931
00932
00933 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
00934 if (f.mc_flag)
00935 {
00936 Logging::increaseProperty("num_mc_unique_data_frames_sent");
00937 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
00938 }
00939 #endif
00940
00941
00942 if (packet.isMcFrame() && !packet.isNack())
00943 {
00944 stc_RoutedFrame* stc_rf = new stc_RoutedFrame();
00945 stc_rf->frame = f;
00946 stc_rf->frame_is_ack = false;
00947 stc_rf->hostname_destination = hostname_destination;
00948 stc_rf->retransmitted = 0;
00949 stc_rf->time_stamp = getMillisecondsTime();
00950
00951
00952 McPosAckObj* mc_ack = getMcAckObj(&f.mc_g_name_, f.header_.packet_id, f.header_.packet_sequence_num);
00953 if (mc_ack != NULL && mc_ack->missing_acks_l.empty())
00954 {
00955 stc_rf->frame_is_ack = true;
00956 }
00957 else
00958 {
00959 boost::thread resendMcAckFrameT(deliverMcAckFrame, boost::ref(*stc_rf), boost::ref(route), false);
00960 resendMcAckFrameT.join();
00961 }
00962
00963
00964 if (stc_rf->frame_is_ack)
00965 {
00966 send_successfully = true;
00967 }
00968 else
00969 {
00970 ROS_WARN("Could not sent pos ack packet with ID[%u] to group[%s]", stc_rf->frame.header_.packet_id, hostname_destination.c_str());
00971
00972 return false;
00973 }
00974 delete stc_rf;
00975 }
00976
00977 if (!bcast)
00978 {
00979 if (!gotAckRoutedFrame(route))
00980 {
00981
00982 routing_entry new_route;
00983 if (sendRouteRequest(&hostname_destination, &new_route))
00984 {
00985 if (new_route.samePath(route) == false)
00986 {
00987 route = routing_entry(new_route);
00988 i--;
00989 continue;
00990 }
00991
00992 }
00993
00994 ROS_WARN("Could not sent unicast packet: ID[%u] destination host[%s]", unack_routed_frame->frame.header_.packet_id, route.hostname_destination.c_str());
00995 return false;
00996 }
00997 else
00998 send_successfully = true;
00999 }
01000
01001 frames.pop_front();
01002 }
01003 if (!bcast)
01004 ROS_INFO("SUCCESSFULLY SEND PACKET: ID[%u] DEST[%s]", packet_id_, hostname_destination.c_str());
01005 else if (packet.isMcFrame())
01006 {
01007 send_successfully = true;
01008 if (packet.isNack())
01009 {
01010 ROS_INFO("SUCCESSFULLY SEND MC NEGATIVE ACK PACKET: ID[%u] GROUP[%s]", packet_id_, hostname_destination.c_str());
01011 }
01012 else
01013 {
01014 ROS_INFO("SUCCESSFULLY SEND MC POSITIVE ACK PACKET: ID[%u] GROUP[%s]", packet_id_, hostname_destination.c_str());
01015 }
01016 }
01017 else
01018 {
01019 send_successfully = true;
01020 ROS_INFO("SEND BROADCAST PACKET: ID[%u]", packet_id_);
01021 }
01022
01023 return send_successfully;
01024 }
01025
01026 void checkMutexAvailability(boost::mutex* m, string name)
01027 {
01028 while (ros::ok())
01029 {
01030 sleepMS(100);
01031
01032 if (!mtx_packets_incomplete.try_lock())
01033 ROS_ERROR("mutex %s is not free", name.c_str());
01034 else
01035 mtx_packets_incomplete.unlock();
01036 }
01037 }
01038
01039 int main(int argc, char **argv)
01040 {
01041
01042
01043
01044
01045
01046
01047 ros::init(argc, argv, "adhoc_communication");
01048
01049 ros::NodeHandle b("~");
01050 ros::NodeHandle b_pub;
01051
01052
01053 n_priv = &b;
01054 n_pub = &b_pub;
01055
01056 unack_routed_frame = new stc_RoutedFrame();
01057
01058 initParams(n_priv);
01059 initFrameTypes();
01060
01061
01062 eth_raw_init();
01063
01064
01065
01066 if (hostname.compare("") == 0)
01067 {
01068 char hostname_c[1024];
01069 hostname_c[1023] = '\0';
01070 gethostname(hostname_c, 1023);
01071 hostname = std::string(hostname_c);
01072 }
01073
01074 #ifdef PRINT_PARAMS
01075 ROS_ERROR("NODE STARTED WITH FOLLOWING SETTINGS:");
01076 ROS_ERROR("HOSTNAME[%s] MAC[%s] INTERFACE[%s]", hostname.c_str(), getMacAsStr(src_mac).c_str(), interface);
01077 ROS_ERROR("num_link_retrans=%u num_e2e_retrans=%u num_rreq=%u", num_link_retrans, num_e2e_retrans, num_rreq);
01078 ROS_ERROR("max_packet_size=%u max_frame_size=%u hop_limit_min=%u hop_limit_max=%u", max_packet_size, max_frame_size, hop_limit_min, hop_limit_max);
01079 ROS_ERROR("hop_limit_increment=%u beacon_interval=%u", hop_limit_increment, beacon_interval);
01080 ROS_ERROR("rebuild_mc_tree=%s", getBoolAsString(rebuild_mc_tree).c_str());
01081 #endif
01082
01083
01084
01085 std::string robot_prefix = "";
01086 if (simulation_mode)
01087 {
01088 robot_prefix = "/" + hostname + "/";
01089 }
01090
01091 ros::ServiceServer sendRobotUpdateS = n_pub->advertiseService(robot_prefix + node_prefix + "send_robot_update",
01092 sendRobotUpdate);
01093
01094 ros::ServiceServer sendTwistS = n_pub->advertiseService(robot_prefix + node_prefix + "send_twist",
01095 sendTwist);
01096 ros::ServiceServer sendMapS = n_pub->advertiseService(robot_prefix + node_prefix + "send_map", sendMap);
01097 ros::ServiceServer sendPostionS = n_pub->advertiseService(robot_prefix + node_prefix + "send_position", sendPosition);
01098 ros::ServiceServer sendStringS = n_pub->advertiseService(robot_prefix + node_prefix + "send_string", sendString);
01099 ros::ServiceServer getNeighborsS = n_pub->advertiseService(robot_prefix + node_prefix + "get_neighbors", getNeighbors);
01100 ros::ServiceServer sendPointS = n_pub->advertiseService(robot_prefix + node_prefix + "send_point", sendPoint);
01101 ros::ServiceServer sendMapUpdateS = n_pub->advertiseService(robot_prefix + node_prefix + "send_map_update", sendMapUpdate);
01102 ros::ServiceServer sendControlMsgS = n_pub->advertiseService(robot_prefix + node_prefix + "send_control_message", sendControlMessage);
01103 ros::ServiceServer joinMCGroupS = n_pub->advertiseService(robot_prefix + node_prefix + "join_mc_group", joinMCGroup);
01104 ros::ServiceServer sendFrontierS = n_pub->advertiseService(robot_prefix + node_prefix + "send_frontier", sendFrontier);
01105 ros::ServiceServer sendClusterS = n_pub->advertiseService(robot_prefix + node_prefix + "send_cluster", sendCluster);
01106 ros::ServiceServer sendAuctionS = n_pub->advertiseService(robot_prefix + node_prefix + "send_auction", sendAuction);
01107 ros::ServiceServer sendBcastS = n_pub->advertiseService(robot_prefix + node_prefix + "broadcast_robot_update", sendBroadcastCMgrRobotUpdate);
01108 ros::ServiceServer sendBcastStringS = n_pub->advertiseService(robot_prefix + node_prefix + "broadcast_string", sendBroadcastString);
01109
01110 ros::ServiceServer shutDownRosS = n_pub->advertiseService(robot_prefix + node_prefix + "shut_down", shutDownRos);
01111
01112 ros::ServiceServer getGroupStatusS = n_pub->advertiseService(robot_prefix + node_prefix + "get_group_state", getGroupStateF);
01113
01114 publishers_l.push_front(n_pub->advertise<std_msgs::String>(robot_prefix + node_prefix + topic_new_robot, 1000, true));
01115 publishers_l.push_front(n_pub->advertise<std_msgs::String>(robot_prefix + node_prefix + topic_remove_robot, 1000, true));
01116
01117 Logging::init(n_pub, &hostname);
01118
01119 signal(SIGSEGV, handler);
01120
01121
01122
01123
01124
01125 boost::thread receiveFramesT(receiveFrames);
01126 boost::thread processIncomingFramesT(processIncomingFrames);
01127
01128 boost::thread deleteObsoleteRequestsT(deleteObsoleteRequests);
01129
01130 boost::thread requestPendingFramesT(requestPendingFrames);
01131 boost::thread sendBeaconsT(sendBeacons);
01132 boost::thread deleteOldPacketsT(deleteOldPackets);
01133 boost::thread resendRequestedFramesT(resendRequestedFrames);
01134
01135
01136
01137
01138 list<ros::Subscriber> sub_robot_pos_l;
01139
01140 boost::thread log_mem_consumption_packets(&Logging::logMemoryConsumptionPackets, 5000,
01141 &mtx_packets_incomplete, &packets_incomplete_l,
01142 &mtx_cached_mc_packets, &cached_mc_packets_l,
01143 &mtx_unack_link_frames, &unack_link_frames_l,
01144 &mtx_unack_cr_frames, &unack_cr_frames_l
01145 );
01146
01147
01148 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
01149 boost::thread log_uc_frames(Logging::logUcPacketsSummary, 5000);
01150 #endif
01151
01152 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
01153 boost::thread log_mc_frames(Logging::logMcPacketsSummary, 5000);
01154 #endif
01155
01156
01157 #ifdef PERFORMANCE_LOGGING
01158 boost::thread periodix(Logging::periodicLog, 10000);
01159 #endif
01160
01161 if (simulation_mode)
01162 {
01163 my_sim_position = new PositionSubscriber();
01164 my_sim_position->robot_name_ = hostname;
01165 std::string topic = "/" + hostname;
01166 topic.append("/base_pose_ground_truth");
01167
01168 const char* robot_number_pos = hostname.data() + 6;
01169 try
01170 {
01171 my_sim_position->robot_number_ = boost::lexical_cast<uint32_t>(std::string(robot_number_pos));
01172 } catch (boost::bad_lexical_cast const&)
01173 {
01174 ROS_FATAL("If parameter simulation mode is set, the hostname of the robot must be like the robot names of the stage robot names, e.g., robot_0 or robot_1 etc.");
01175 return 0;
01176 }
01177
01178 for (int i = 0; i < robots_in_simulation; i++)
01179 {
01180 if ((unsigned) i != my_sim_position->robot_number_)
01181 {
01182 std::string i_as_str = getIntAsString(i);
01183 std::string topic_to_sub = "/robot_";
01184 topic_to_sub.append(i_as_str);
01185
01186 topic_to_sub.append("/base_pose_ground_truth");
01187 PositionSubscriber* sub = new PositionSubscriber();
01188 sub->robot_name_ = std::string("robot_").append(i_as_str);
01189 sub->robot_number_ = i;
01190 sub_robot_pos_l.push_front(n_pub->subscribe(topic_to_sub, 1, &PositionSubscriber::Subscribe, &*sub));
01191 robot_positions_l.push_front(sub);
01192 }
01193 else
01194 sub_robot_pos_l.push_front(n_pub->subscribe(topic, 1, &PositionSubscriber::Subscribe, &*my_sim_position));
01195 }
01196 }
01197
01198
01199 {
01200 ROS_INFO("Create own group! %s ", std::string("mc").append("_").append(hostname).c_str());
01201 boost::unique_lock<boost::mutex> lock(mtx_mc_groups);
01202 mc_handler.createGroupAsRoot(&std::string("mc").append("_").append(hostname));
01203 }
01204
01205
01206
01207
01208 #ifdef JOIN_ALL_GROUPS
01209 if (simulation_mode)
01210 {
01211 boost::thread joinT(&joinAllMcGroups);
01212 #ifdef DEBUG_OUTPUT
01213
01214 #endif
01215
01216
01217 }
01218 #endif
01219
01220
01221
01222 while (ros::ok())
01223 {
01224 ros::spin();
01225 }
01226 Logging::log();
01227
01228 sleepMS(1000);
01229
01230 delete unack_routed_frame;
01231
01232
01233 receiveFramesT.join();
01234 sendBeaconsT.join();
01235
01236
01237
01238
01239 close_raw_socket();
01240
01241
01242 ROS_INFO("Node terminating...");
01243
01244 return 0;
01245 }
01246
01247 bool gotAck(AckLinkFrame* inc_frame)
01248 {
01249
01250
01251
01252
01253
01254
01255
01256
01257
01258
01259
01260 stc_frame f;
01261 f.type = inc_frame->header_.ack_frame_type;
01262 memcpy(f.mac, inc_frame->header_.mac_confirmer, 6);
01263 f.hostname_source = inc_frame->hostname_source_;
01264 f.frame_id = inc_frame->header_.frame_id;
01265 f.mc_group = "";
01266
01267 {
01268 boost::unique_lock<boost::mutex> lock(mtx_unack_link_frames);
01269 std::list<stc_frame>::iterator i = std::find(unack_link_frames_l.begin(), unack_link_frames_l.end(), f);
01270 if (i != unack_link_frames_l.end())
01271 {
01272 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
01273 if (compareMac(inc_frame->header_.mac_confirmer, inc_frame->eh_h_.eh_source))
01274 Logging::increaseProperty("num_acknowledgments_received_directly");
01275 else
01276 Logging::increaseProperty("num_acknowledgments_received_relay");
01277
01278 Logging::increaseProperty("num_total_bytes_received", inc_frame->buffer_str_len_);
01279 #endif
01280
01281 #ifdef PERFORMANCE_LOGGING_UC_LINK_FRAMES
01282 Logging::logUcLinkTransmission(*i);
01283 #endif
01284 ROS_INFO("GOT ACK OF LINK FRAME: ID[%u] SOURCE MAC[%s] TYPE[%s]", (*i).frame_id, getMacAsStr((*i).mac).c_str(), frame_types[(*i).type].c_str());
01285 unack_link_frames_l.remove(*i);
01286 return true;
01287 }
01288 }
01289
01290 {
01291 boost::unique_lock<boost::mutex> lock(mtx_unack_cr_frames);
01292
01293 for (std::list<ack_cr_info>::iterator i = unack_cr_frames_l.begin(); i != unack_cr_frames_l.end();)
01294 {
01295 if ((*i).id == inc_frame->header_.frame_id && (*i).source_host.compare(inc_frame->hostname_source_) == 0 && compareMac((*i).frame_dst_mac, inc_frame->header_.mac_confirmer))
01296 {
01297 if (compareMac((*i).frame_src_mac, inc_frame->header_.mac_destination_))
01298 ROS_INFO("GOT ACK OF CR FRAME: ID[%u] SOURCE MAC[%s]", (*i).id, getMacAsStr(inc_frame->eh_h_.eh_source).c_str());
01299 else if (compareMac(inc_frame->header_.mac_destination_, src_mac))
01300 {
01301 ROS_INFO("GOT ACK OF RETRANSMITTED CR FRAME: ID[%u] SOURCE MAC[%s]", (*i).id, getMacAsStr(inc_frame->eh_h_.eh_source).c_str());
01302 sendLinkAck((*i).frame_src_mac, (*i).frame_dst_mac, (*i).id, (*i).source_host, false, (*i).frame_type);
01303 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
01304 Logging::increaseProperty("num_acknowledgments_relayed");
01305 Logging::increaseProperty("num_total_bytes_sent", AckLinkFrame::HEADER_FIXED_LEN + (*i).source_host.length());
01306 #endif
01307 }
01308
01309
01310 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
01311 Logging::increaseProperty("num_acknowledgments_received_relay");
01312 Logging::increaseProperty("num_total_bytes_received", inc_frame->buffer_str_len_);
01313 #endif
01314
01315 unack_cr_frames_l.erase(i);
01316 return true;
01317 }
01318 else if (getMillisecondsTime() - (*i).ts >= MAX_TIME_CACHE_UNACK_CR_FRAMES)
01319 {
01320 ROS_WARN("DROP UNACKNOWLEGED CR FRAME: ID[%u] SOURCE MAC[%s]", (*i).id, getMacAsStr(inc_frame->eh_h_.eh_source).c_str());
01321 i = unack_cr_frames_l.erase(i);
01322 }
01323 else
01324 i++;
01325 }
01326 }
01327 return false;
01328 }
01329
01330 void receiveFrames()
01331 {
01332
01333
01334
01335
01336
01337
01338
01339 unsigned char* buffer_incoming = new unsigned char[ETHER_MAX_LEN];
01340
01341 uint8_t max_safed_frames = 1;
01342 std::vector<std::string> frames_as_strings;
01343 uint8_t current_buffer_index = 0;
01344 uint16_t buffer_size = 0;
01345 for (uint8_t i = 0; i < max_safed_frames; i++)
01346 frames_as_strings.push_back(std::string(" "));
01347
01348
01349 while (ros::ok())
01350 {
01351
01352
01353
01354 recvfrom(raw_socket, buffer_incoming, ETHER_MAX_LEN, 0, NULL, NULL);
01355
01356
01357
01358
01359
01360
01361
01362
01363
01364
01365 ethernet_header = (struct eh_header *) buffer_incoming;
01366
01367
01368 if (ethernet_header->eh_protocol_id != ntohs(ETH_TYPE))
01369 continue;
01370
01371
01372
01373 if (compareMac(ethernet_header->eh_source, src_mac))
01374 continue;
01375
01376 uint8_t frame_type;
01377 unsigned char* frameTypeP = buffer_incoming + 14;
01378 memcpy((unsigned char*) &frame_type, (unsigned char*) frameTypeP, 1);
01379
01380
01381 unsigned char* dest_mac_p = buffer_incoming + 15;
01382 bool packet_4_me = compareMac(dest_mac_p, src_mac);
01383 bool packet_is_bcast = compareMac(dest_mac_p, bcast_mac);
01384 bool process_frame = true;
01385
01386
01387
01388
01389
01390
01391
01392
01393 if (loss_ratio > 0)
01394 {
01395 if (incoming_frame_count == 0)
01396 {
01397 incoming_frame_count++;
01398 ROS_ERROR("------------------------------------------------------------------------------");
01399 ROS_ERROR("----------------------LOSE FRAMES IS ACTIVATED!!------------------------------");
01400 ROS_ERROR("------------------------------------------------------------------------------");
01401 }
01402 if (frame_type == FRAME_TYPE_TRANSPORT_DATA || frame_type == FRAME_TYPE_TRANSPORT_ACK)
01403 {
01404 incoming_frame_count++;
01405
01406 if ((float) incoming_frames_lost / (float) incoming_frame_count <= loss_ratio)
01407 {
01408 incoming_frames_lost++;
01409 continue;
01410 }
01411 }
01412 }
01413
01414 #ifdef USE_CHANNEL_MODEL
01415 process_frame = isReachable(ethernet_header->eh_source);
01416 #endif
01417 if (!process_frame)
01418 {
01419 continue;
01420 }
01421
01422
01423
01424 uint8_t buffer_index = current_buffer_index - 1 >= 0 ? current_buffer_index - 1 : max_safed_frames - 1;
01425
01426
01427
01428 if (!process_frame)
01429 {
01430 continue;
01431 }
01432
01433 buffer_size = 250;
01434
01435
01436 if (frame_type == FRAME_TYPE_MC_ACK)
01437 {
01438 McAckFrame frame(buffer_incoming);
01439
01440
01441 if (frame.correct_crc_)
01442 {
01443 processMcAckFrame(&frame);
01444 }
01445 else
01446 ROS_ERROR("MC ACK FRAME: CRC INCORECCT!");
01447
01448 }
01449 else if (frame_type == FRAME_TYPE_MC_NACK)
01450 {
01451 McNackFrame frame(buffer_incoming);
01452
01453
01454 if (frame.correct_crc_)
01455 {
01456
01457 if (iAmMember(frame.mc_group_))
01458 {
01459 boost::unique_lock<boost::mutex> lock(mtx_requested_frames);
01460 requested_frames_l.push_back(frame);
01461
01462 }
01463 }
01464 else
01465 ROS_ERROR("MC NACK FRAME: CRC INCORECCT!");
01466 }
01467 else if (frame_type == FRAME_TYPE_ACK)
01468 {
01469 AckLinkFrame inc_ack_lframe(buffer_incoming);
01470 bool frame_is_4_me = compareMac(src_mac, inc_ack_lframe.header_.mac_destination_);
01471
01472 buffer_size = inc_ack_lframe.buffer_str_len_;
01473
01474
01475 if (enable_cooperative_relaying == false && !frame_is_4_me && inc_ack_lframe.pos_ack_flag_)
01476 {
01477 continue;
01478 }
01479 if (inc_ack_lframe.pos_ack_flag_)
01480 {
01481 gotAck(&inc_ack_lframe);
01482 }
01483 }
01484 else if (frame_type == FRAME_TYPE_REQUEST)
01485 {
01486 RouteRequest req(buffer_incoming);
01487 if (req.correct_crc_)
01488 {
01489 buffer_size = req.buffer_str_len_;
01490
01491 processRouteRequest(req);
01492 }
01493 else
01494 ROS_ERROR("ROUTE REQUEST: CRC INCORECCT!");
01495 }
01496 else if (frame_type == FRAME_TYPE_REPLY)
01497 {
01498 RouteResponse rr(buffer_incoming);
01499
01500 if (rr.correct_crc_)
01501 processRouteResponse(rr);
01502 else
01503 ROS_WARN("ROUTE RESPONSE: CRC INCORECCT!");
01504 }
01505 else if (frame_type == FRAME_TYPE_TRANSPORT_DATA)
01506 {
01507 RoutedFrame frame(buffer_incoming);
01508 buffer_size = frame.buffer_str_len_;
01509 if (packet_is_bcast)
01510 processBroadcastFrame(&frame);
01511 else
01512 processRoutedFrame(&frame);
01513 }
01514 else if (frame_type == FRAME_TYPE_TRANSPORT_ACK)
01515 {
01516 AckRoutedFrame rf(buffer_incoming);
01517 buffer_size = rf.buffer_str_len_;
01518
01519 processAckRoutedFrame(&rf);
01520 }
01521 else if (frame_type == FRAME_TYPE_BEACON)
01522 {
01523 Beacon hf(buffer_incoming);
01524 if (hf.correct_crc_)
01525 processBeacon(&hf);
01526 else
01527 ROS_ERROR("BEACON: CRC INCORECCT!");
01528
01529 }
01530 else if (frame_type == FRAME_TYPE_MC_ACTIVATION)
01531 {
01532
01533 McRouteActivationFrame mc_m_f(buffer_incoming);
01534
01535 if (mc_m_f.correct_crc_)
01536 {
01537 buffer_size = mc_m_f.buffer_str_len_;
01538 processMcActivationFrame(&mc_m_f);
01539 }
01540 else
01541 ROS_ERROR("MC ROUTE ACTIVATION: CRC INCORECCT!");
01542 }
01543 else if (frame_type == FRAME_TYPE_MC_DISCONNECT)
01544 {
01545 McDisconnectFrame mc_d(buffer_incoming);
01546
01547 if (mc_d.correct_crc_)
01548 {
01549 buffer_size = mc_d.buffer_str_len_;
01550
01551 if (mc_d.disconnect_uplink)
01552 processMcDisconnectUplink(&mc_d);
01553 else if (mc_d.disconnect_downlink)
01554 processMcDisconnectDownlink(&mc_d);
01555 else
01556 ROS_ERROR("UNKNOWN MC DISCONNECT FRAME!");
01557
01558
01559 }
01560 else
01561 ROS_ERROR("McDisconnectionFrame: CRC INCORECCT!");
01562
01563
01564 }
01565 else if (frame_type == FRAME_TYPE_BROADCAST)
01566 {
01567 MultiHopBroadcastFrame bcast(buffer_incoming);
01568
01569 if (bcast.correct_crc_ == false)
01570 {
01571 ROS_ERROR("MultiHopBroadcastFrame: CRC INCORECCT!");
01572 continue;
01573 }
01574
01575 bcasts bc_str(bcast.header_.id, bcast.hostname_source_);
01576 if (std::find(bcasts_l.begin(), bcasts_l.end(), bc_str) == bcasts_l.end() && bcast.hostname_source_.compare(hostname) != 0)
01577 {
01578 bcasts_l.push_back(bc_str);
01579
01580 buffer_size = bcast.buffer_str_len_;
01581 if (bcast.header_.payload_type == FRAME_DATA_TYPE_ROBOT_UPDATE)
01582 {
01583 adhoc_communication::CMgrRobotUpdate r_up;
01584 if (bcast.payload_.compare("") == 0)
01585 {
01586 ROS_ERROR("ERROR: PAYLOAD OF BCAST FRAME IS EMPTY!");
01587 continue;
01588 }
01589 desializeObject((unsigned char*) bcast.payload_.data(), bcast.payload_.length(), &r_up);
01590 publishMessage(r_up, bcast.topic_);
01591 }
01592 else if (bcast.header_.payload_type == FRAME_DATA_TYPE_ANY)
01593 {
01594
01595 if (bcast.payload_.compare("") == 0)
01596 {
01597 ROS_ERROR("ERROR: PAYLOAD OF BCAST FRAME IS EMPTY!");
01598 continue;
01599 }
01600 adhoc_communication::RecvString data;
01601 data.src_robot = bcast.hostname_source_;
01602 data.data = bcast.payload_;
01603 publishMessage(data, bcast.topic_);
01604 }
01605
01606 if (bcast.rebroadcast)
01607 {
01608 socketSend(bcast.getFrameAsNetworkString(src_mac));
01609 }
01610 }
01611 }
01612 else
01613 ROS_ERROR("UNKNOWN FRAMETYPE: [%u] source %s dest %s", frame_type, getMacAsStr(ethernet_header->eh_source).c_str(), getMacAsStr(dest_mac_p).c_str());
01614
01615
01616 if (frame_type != FRAME_TYPE_BEACON)
01617 {
01618 current_buffer_index = current_buffer_index + 1 < frames_as_strings.size() ? current_buffer_index + 1 : 0;
01619 frames_as_strings[current_buffer_index] = std::string((const char*) buffer_incoming, buffer_size);
01620 }
01621
01622 }
01623
01624 free(buffer_incoming);
01625 }
01626
01627 template<class message>
01628 void publishMessage(message m, string topic)
01629 {
01630
01631
01632
01633
01634
01635 try
01636 {
01637 bool pubExsists = false;
01638 std::string topic_w_prefix = "/" + hostname + "/" + topic;
01639 if (simulation_mode)
01640 topic = topic_w_prefix;
01641
01642 for (std::list<ros::Publisher>::iterator i = publishers_l.begin(); i != publishers_l.end(); i++)
01643 {
01644 if ((*i).getTopic().compare(topic) == 0)
01645 {
01646 (*i).publish(m);
01647 pubExsists = true;
01648 }
01649 }
01650 if (!pubExsists)
01651 {
01652 publishers_l.push_front(n_pub->advertise<message>(topic, 1000, true));
01653 publishers_l.front().publish(m);
01654
01655 }
01656 } catch (...)
01657 {
01658 ROS_ERROR("IN PUBLISH MESSAGE: NODE THROWS EXCEPTION!");
01659 }
01660 }
01661
01662 void sendBeacons()
01663 {
01664
01665
01666
01667
01668
01669
01670
01671
01672
01673 Beacon hf = Beacon(src_mac, hostname);
01674 string beacon = hf.getFrameAsNetworkString();
01675 uint16_t beacon_len = beacon.length();
01676 while (ros::ok())
01677 {
01678 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
01679 Logging::increaseProperty("num_beacons_sent");
01680 Logging::increaseProperty("num_total_bytes_sent", beacon_len);
01681 #endif
01682
01683 socketSend(beacon);
01684 {
01685 boost::unique_lock<boost::mutex> lock_neighbors(mtx_neighbors);
01686
01687 std::list<hostname_mac>::iterator i = neighbors_l.begin();
01688 while (i != neighbors_l.end())
01689 {
01690 hostname_mac & neighbor(*i);
01691
01692
01693 if (getMillisecondsTime() - neighbor.ts >= TIME_BEFORE_REMOVE_NEIGHBOR && neighbor.reachable)
01694 {
01695 ROS_ERROR("REMOVE NEIGHBOR: HOSTNAME[%s]", neighbor.hostname.c_str());
01696 std_msgs::String msg;
01697 msg.data = neighbor.hostname;
01698 lock_neighbors.unlock();
01699
01700 publishMessage(msg, node_prefix + topic_remove_robot);
01701 lock_neighbors.lock();
01702 neighbor.reachable = false;
01703
01704
01705 hostname_mac n;
01706 n.hostname = neighbor.hostname;
01707 memcpy((void*) n.mac, neighbor.mac, 6);
01708
01709
01710 boost::thread t(&mcLostConnection, n);
01711 }
01712 else
01713 {
01714 ROS_DEBUG("NEIGHBOR: HOSTNAME[%s] LAST BEACON GOT[%lu]", neighbor.hostname.c_str(), neighbor.ts);
01715 }
01716 i++;
01717 }
01718 }
01719
01720
01721 sleepMS(&beacon_interval);
01722 }
01723 }
01724
01725 McPosAckObj * getMcAckObj(std::string* group, uint32_t p_id, uint32_t seq)
01726 {
01727 boost::unique_lock<boost::mutex> lock(mtx_mc_ack_l);
01728 for (list<McPosAckObj*>::iterator i = mc_ack_l.begin(); i != mc_ack_l.end(); i++)
01729 {
01730 McPosAckObj* ack_obj = *i;
01731 if (ack_obj->group_name.compare(*group) == 0 && p_id == ack_obj->packet_id && ack_obj->sequence_num == seq)
01732 {
01733 return ack_obj;
01734 }
01735 }
01736 return NULL;
01737 }
01738
01739 void sendLinkAck(unsigned char* dest, unsigned char* confirmer_mac, uint32_t id, string source, bool cr, uint8_t type)
01740 {
01741 AckLinkFrame ack = AckLinkFrame(src_mac, confirmer_mac, dest, id, source, type);
01742 ack.cr_flag_ = cr;
01743
01744
01745 string network_string = ack.getFrameAsNetworkString();
01746 socketSend(network_string);
01747 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
01748 Logging::increaseProperty("num_acknowledgments_sent");
01749 Logging::increaseProperty("num_total_bytes_sent", network_string.length());
01750 #endif
01751
01752 }
01753
01754 void processMcAckFrame(McAckFrame* f)
01755 {
01756
01757 McPosAckObj* ack_obj = getMcAckObj(&f->mc_group_, f->header_.packet_id, f->header_.frame_seq_num);
01758 boost::unique_lock<boost::mutex> lock(mtx_mc_ack_l);
01759 if (ack_obj != NULL)
01760 {
01761 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
01762 Logging::increaseProperty("num_mc_acknowledgments_received_directly");
01763 Logging::increaseProperty("num_mc_total_bytes_received ", f->buffer_str_len_);
01764 #endif
01765
01766 if (ack_obj->GotAck(f))
01767 {
01768 boost::unique_lock<boost::mutex> lockd(mtx_notify);
01769 got_rframe_ack.notify_all();
01770
01771 ROS_DEBUG("GOT ALL MULTICAST ACK: GROUP[%s] HOST[%s] PACKET[%u] SEQUENCE[%u]", f->mc_group_.c_str(), f->hostname_source_.c_str(), f->header_.packet_id, f->header_.frame_seq_num);
01772
01773 if (f->hostname_source_.compare(hostname) != 0)
01774 {
01775
01776 McAckFrame arf = McAckFrame(src_mac, f->eh_h_.eh_source, f->hostname_source_, f->mc_group_, f->header_.packet_id, f->header_.frame_seq_num);
01777 string network_string = arf.getFrameAsNetworkString();
01778 socketSend(network_string);
01779 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
01780 Logging::increaseProperty("num_mc_acknowledgments_sent");
01781 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
01782 #endif
01783 }
01784 }
01785 }
01786
01787
01788 }
01789
01790 void processAckRoutedFrame(AckRoutedFrame * in_a_rf)
01791 {
01792
01793
01794
01795
01796
01797
01798 stc_frame frame_to_safe;
01799 frame_to_safe.frame_id = in_a_rf->header_.frame_id;
01800 memcpy(frame_to_safe.mac, in_a_rf->eh_h_.eh_source, 6);
01801 frame_to_safe.hostname_source = std::string(in_a_rf->hostname_source_);
01802 frame_to_safe.mc_group = "";
01803
01804
01805 bool frame_is_for_me = compareMac(in_a_rf->header_.mac_destination_, src_mac);
01806
01807 if (frame_is_for_me)
01808 {
01809
01810 sendLinkAck(in_a_rf->eh_h_.eh_source, src_mac, frame_to_safe.frame_id, frame_to_safe.hostname_source, in_a_rf->cr_flag, in_a_rf->header_.frame_type);
01811 }
01812
01813 if (!frame_is_for_me && !enable_cooperative_relaying)
01814 {
01815 ROS_DEBUG("DONT PROCESS FRAME BECAUSE IT IS NOT FOR ME AND COOP_RELAYING IS TURNED OFF!");
01816 return;
01817 }
01818
01819 if (!in_a_rf->correct_crc_)
01820 {
01821 ROS_ERROR("ACK ROUTED FRAME: CRC INCORRECT!");
01822 return;
01823 }
01824
01825
01826 routing_entry route;
01827 {
01828 route.hostname_source = in_a_rf->hostname_source_;
01829 route.id = in_a_rf->header_.route_id;
01830
01831 boost::unique_lock<boost::mutex> lock(mtx_routing_table);
01832 std::list<routing_entry>::iterator find_route = std::find(routing_table_l.begin(), routing_table_l.end(), route);
01833
01834 if (find_route == routing_table_l.end())
01835 {
01836 ROS_DEBUG("ROUTE FOR THE ACK FRAME NOT FOUND: ID[%u] HOSTNAME SOURCE[%s] ROUTE ID[%u]", in_a_rf->header_.frame_id, in_a_rf->hostname_source_.c_str(), in_a_rf->header_.route_id);
01837 return;
01838 }
01839
01840 route.id = (*find_route).id;
01841 route.hostname_destination = (*find_route).hostname_destination;
01842 route.hostname_source = (*find_route).hostname_source;
01843 route.cr_entry = (*find_route).cr_entry;
01844 memcpy(route.next_hop, (*find_route).next_hop, 6);
01845 memcpy(route.previous_hop, (*find_route).previous_hop, 6);
01846 }
01847
01848 if (frame_is_for_me)
01849 {
01850
01851 if (route.hostname_source.compare(hostname) == 0)
01852 {
01853
01854 ROS_DEBUG("GOT ACKNOWLEGEMENT OF ROUTED FRAME: HOSTNAME[%s] ID[%u]", route.hostname_destination.c_str(), in_a_rf->header_.frame_id);
01855 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
01856 Logging::increaseProperty("num_data_frames_received_directly");
01857 Logging::increaseProperty("num_unique_data_frames_received_directly");
01858 Logging::increaseProperty("num_total_bytes_received", in_a_rf->buffer_str_len_);
01859 #endif
01860
01861 {
01862 boost::unique_lock<boost::mutex> lock(mtx_unack_routed_frame);
01863
01864
01865 if (in_a_rf->header_.frame_id == unack_routed_frame->frame.header_.frame_id && route.hostname_destination.compare(unack_routed_frame->hostname_destination) == 0)
01866 {
01867 unack_routed_frame->frame_is_ack = true;
01868 boost::unique_lock<boost::mutex> lockd(mtx_notify);
01869 got_rframe_ack.notify_all();
01870 }
01871 }
01872
01873 }
01874 else
01875 {
01876
01877
01878 string network_string = in_a_rf->getFrameAsNetworkString(route, src_mac);
01879
01880
01881 resendUnackLinkFrame(in_a_rf->hostname_source_, in_a_rf->header_.frame_id, route.previous_hop, network_string, FRAME_TYPE_TRANSPORT_ACK);
01882
01883 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
01884 Logging::increaseProperty("num_unique_data_frames_forwarded");
01885 Logging::increaseProperty("num_total_bytes_sent", network_string.length());
01886 #endif
01887
01888 socketSend(network_string);
01889 #ifdef DELAY
01890
01891 if (simulation_mode)
01892 boost::this_thread::sleep(boost::posix_time::milliseconds(delay));
01893 #endif
01894 }
01895 }
01896 }
01897
01898 void processRoutedFrame(RoutedFrame * f)
01899 {
01900
01901
01902
01903
01904
01905 RoutedFrame inc_frame = *f;
01906
01907 if (!inc_frame.correct_crc_)
01908 {
01909 ROS_ERROR("ROUTED FRAME: CRC INCORECCT!");
01910 return;
01911 }
01912
01913
01914 bool frame_is_for_me = compareMac(inc_frame.header_.mac_destination_, src_mac);
01915
01916 if (frame_is_for_me)
01917 {
01918
01919 sendLinkAck(inc_frame.eh_h_.eh_source, src_mac, inc_frame.header_.frame_id, inc_frame.hostname_source_, f->cr_flag, f->header_.frame_type);
01920 }
01921
01922 if (!frame_is_for_me)
01923 {
01924 ROS_DEBUG("DONT PROCESS FRAME BECAUSE IT IS NOT FOR ME AND COOP_RELAYING IS TURNED OFF!");
01925 return;
01926 }
01927
01928 bool cr_route = true;
01929 routing_entry route;
01930 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
01931 Logging::increaseProperty("num_data_frames_received_directly");
01932 Logging::increaseProperty("num_total_bytes_received", f->buffer_str_len_);
01933 #endif
01934
01935
01936
01937 if (gotFrameRecently(inc_frame))
01938 {
01939
01940 return;
01941 }
01942
01943 safeFrame(inc_frame);
01944
01945 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
01946 if (f->cr_flag)
01947 Logging::increaseProperty("num_unique_data_frames_received_relay");
01948 else
01949 Logging::increaseProperty("num_unique_data_frames_received_directly");
01950 #endif
01951
01952
01953
01954
01955 routing_entry route_f;
01956 route_f.id = inc_frame.header_.route_id;
01957 route_f.hostname_source = inc_frame.hostname_source_;
01958 {
01959 boost::unique_lock<boost::mutex> lock(mtx_routing_table);
01960 std::list<routing_entry>::iterator find_route = std::find(routing_table_l.begin(), routing_table_l.end(), route_f);
01961 if (find_route == routing_table_l.end())
01962 {
01963 ROS_DEBUG("ROUTE FOR THE ROUTED FRAME NOT FOUND: ID[%u] HOSTNAME SOURCE[%s] ROUTE ID[%u]", inc_frame.header_.frame_id, inc_frame.hostname_source_.c_str(), inc_frame.header_.route_id);
01964 return;
01965 }
01966
01967 routing_entry & tmp(*find_route);
01968 route.id = tmp.id;
01969 route.hostname_destination = tmp.hostname_destination;
01970 route.hostname_source = tmp.hostname_source;
01971 memcpy(route.next_hop, tmp.next_hop, 6);
01972 memcpy(route.previous_hop, tmp.previous_hop, 6);
01973 route.cr_entry = tmp.cr_entry;
01974
01975 }
01976
01977
01978 if (route.hostname_destination.compare(hostname) == 0)
01979 {
01980 ROS_DEBUG("GOT ROUTED FRAME: HOSTNAME[%s] ID[%u]", route.hostname_destination.c_str(), inc_frame.header_.frame_id);
01981
01982
01983
01984 AckRoutedFrame arf = AckRoutedFrame(inc_frame);
01985 std::string network_string = arf.getFrameAsNetworkString(route, src_mac);
01986 #ifdef DELAY
01987 if (simulation_mode)
01988 boost::this_thread::sleep(boost::posix_time::milliseconds(delay));
01989 #endif
01990
01991 resendUnackLinkFrame(arf.hostname_source_, arf.header_.frame_id, arf.header_.mac_destination_, network_string, FRAME_TYPE_TRANSPORT_ACK);
01992
01993 socketSend(network_string);
01994 ROS_DEBUG("SEND ACKNOWLEGEMENT OF ROUTED FRAME: ID[%u] DESTINATION[%s] NEXT HOP[%s]", inc_frame.header_.frame_id, route.hostname_destination.c_str(), getMacAsStr(route.previous_hop).c_str());
01995
01996 boost::unique_lock<boost::mutex> lock(mtx_inc_rf_frames);
01997 inc_frames_l.push_back(inc_frame);
01998
01999 }
02000 else
02001 {
02002 string network_string = inc_frame.getFrameAsNetworkString(route, src_mac);
02003
02004
02005 stc_frame frame_to_ack = inc_frame.getFrameStruct();
02006 frame_to_ack.network_string = network_string;
02007
02008 resendUnackLinkFrame(f->hostname_source_, f->header_.frame_id, inc_frame.header_.mac_destination_, network_string, FRAME_TYPE_TRANSPORT_DATA);
02009
02010 ROS_DEBUG("FORWARD ROUTED FRAME: FROM[%s] TO[%s]", getMacAsStr(route.previous_hop).c_str(), getMacAsStr(route.next_hop).c_str());
02011 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
02012 Logging::increaseProperty("num_unique_data_frames_forwarded");
02013 Logging::increaseProperty("num_total_bytes_sent ", network_string.length());
02014 #endif
02015
02016 socketSend(network_string);
02017
02018 }
02019 }
02020
02021 void sendMcAck(unsigned char* dst, string h_source, string g_name, uint32_t packet_id, uint32_t seq_num)
02022 {
02023 ROS_DEBUG("SEND POS MC ACKNOWLEGEMENT: ID[%u] GROUP[%s] SOURCE[%s] FROM[%s]", packet_id, g_name.c_str(), h_source.c_str(), getHostnameFromMac(dst).c_str());
02024
02025
02026 McAckFrame arf = McAckFrame(src_mac, dst, h_source, g_name, packet_id, seq_num);
02027 string network_string = arf.getFrameAsNetworkString();
02028 socketSend(network_string);
02029 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02030 Logging::increaseProperty("num_mc_acknowledgments_sent");
02031 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
02032 #endif
02033 }
02034
02035 void deliverMcAckFrame(stc_RoutedFrame& stc_rf, routing_entry& r, bool send_ack)
02036 {
02037 Logging::increaseProperty("running_multicast_threads");
02038
02039
02040 while (stc_rf.frame_is_ack == false && stc_rf.retransmitted <= num_e2e_retrans)
02041 {
02042 {
02043 boost::unique_lock<boost::mutex> lockAckRF(mtx_notify);
02044 boost::thread* resendMcFrameT = new boost::thread(resendMcFrame, boost::ref(got_rframe_ack), &stc_rf, r);
02045 got_rframe_ack.wait(lockAckRF);
02046 resendMcFrameT->interrupt();
02047 delete resendMcFrameT;
02048 }
02049
02050 McPosAckObj* mc_ack = getMcAckObj(&stc_rf.frame.mc_g_name_, stc_rf.frame.header_.packet_id, stc_rf.frame.header_.packet_sequence_num);
02051
02052 boost::unique_lock<boost::mutex> ack_lock(mtx_mc_ack_l);
02053 if (mc_ack != NULL && mc_ack->missing_acks_l.empty())
02054 {
02055 stc_rf.frame_is_ack = true;
02056 }
02057 }
02058
02059 if (send_ack && stc_rf.frame_is_ack)
02060 {
02061
02062 sendMcAck(stc_rf.frame.eh_h_.eh_source, stc_rf.frame.hostname_source_, stc_rf.frame.mc_g_name_, stc_rf.frame.header_.packet_id, stc_rf.frame.header_.packet_sequence_num);
02063 }
02064
02065 if (stc_rf.frame_is_ack == false)
02066 {
02067 ROS_WARN("COULD NOT SENT MC ACK FRAME: SOURCE[%s] GROUP[%s] PACK_ID[%u] SEQ_N[%u]", stc_rf.frame.hostname_source_.c_str(), stc_rf.frame.mc_g_name_.c_str(), stc_rf.frame.header_.packet_id, stc_rf.frame.header_.packet_sequence_num);
02068 }
02069 Logging::decreaseProperty("running_multicast_threads");
02070 }
02071
02072 void processBroadcastFrame(RoutedFrame * f)
02073 {
02074
02075
02076
02077
02078
02079
02080 RoutedFrame inc_frame(*f);
02081
02082
02083
02084 if (inc_frame.mc_flag)
02085 {
02086 boost::unique_lock<boost::mutex> lock(mtx_mc_groups);
02087 McTree* g = mc_handler.getMcGroup(&inc_frame.mc_g_name_);
02088
02089 if (g != NULL && (!g->activated || !g->connected || !g->processFrame(inc_frame.eh_h_.eh_source)))
02090 {
02091
02092 return;
02093 }
02094 }
02095
02096 if (!inc_frame.correct_crc_)
02097 {
02098 ROS_ERROR("BROADCAST FRAME: CRC INCORECCT!");
02099 return;
02100 }
02101
02102
02103 if (inc_frame.hostname_source_.compare(hostname) == 0)
02104 return;
02105
02106
02107 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
02108 if (!inc_frame.mc_flag)
02109 {
02110 Logging::increaseProperty("num_data_frames_received_directly");
02111 Logging::increaseProperty("num_total_bytes_received", f->buffer_str_len_);
02112 }
02113
02114 #endif
02115
02116 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02117 if (inc_frame.mc_flag)
02118 {
02119 Logging::increaseProperty("num_mc_data_frames_received_directly");
02120 Logging::increaseProperty("num_mc_total_bytes_received ", f->buffer_str_len_);
02121 }
02122 #endif
02123
02124 if (gotFrameRecently(inc_frame))
02125 {
02126
02127 if (f->negative_ack_type == false || (f->cr_flag && f->mc_flag))
02128 sendMcAck(inc_frame.eh_h_.eh_source, inc_frame.hostname_source_, inc_frame.mc_g_name_, inc_frame.header_.packet_id, inc_frame.header_.packet_sequence_num);
02129
02130 return;
02131 }
02132
02133 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
02134 if (!inc_frame.mc_flag)
02135 Logging::increaseProperty("num_unique_data_frames_received_directly");
02136 #endif
02137
02138 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02139 if (inc_frame.mc_flag)
02140 Logging::increaseProperty("num_mc_data_frames_received_directly");
02141 #endif
02142
02143 safeFrame(inc_frame);
02144
02145
02146 if (inc_frame.mc_flag)
02147 {
02148 bool propagated = false;
02149 {
02150 boost::unique_lock<boost::mutex> lock_groups(mtx_mc_groups);
02151 McTree* mc_t = mc_handler.getMcGroup(&inc_frame.mc_g_name_);
02152
02153 if (mc_t == NULL)
02154 return;
02155
02156 if (!mc_t->activated)
02157 {
02158 ROS_ERROR("MC ROUTE IS NOT ACTIVE [%s]", inc_frame.mc_g_name_.c_str());
02159 return;
02160 }
02161 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02162 Logging::increaseProperty("num_mc_unique_data_frames_received_directly");
02163 #endif
02164
02165
02166 propagated = mc_t->propagateFrame(inc_frame.eh_h_.eh_source);
02167 }
02168 ROS_INFO("GOT MC FRAME: ID[%u] GROUP[%s] SOURCE[%s] FROM[%s] P_ID[%u] SEQ_N[%u]", inc_frame.header_.frame_id, inc_frame.mc_g_name_.c_str(), inc_frame.hostname_source_.c_str(), getHostnameFromMac(inc_frame.eh_h_.eh_source).c_str(), inc_frame.header_.packet_id, inc_frame.header_.packet_sequence_num);
02169
02170
02171
02172 if (inc_frame.negative_ack_type)
02173 {
02174
02175 cacheNackMcFrame(inc_frame);
02176
02177 }
02178 else
02179 {
02180
02181 if (!propagated || recursive_mc_ack == false)
02182 {
02183 sendMcAck(inc_frame.eh_h_.eh_source, inc_frame.hostname_source_, inc_frame.mc_g_name_, inc_frame.header_.packet_id, inc_frame.header_.packet_sequence_num);
02184 }
02185
02186 if (propagated)
02187 {
02188
02189
02190 stc_RoutedFrame stc_rf;
02191 stc_rf.frame = *f;
02192 stc_rf.frame_is_ack = false;
02193 stc_rf.retransmitted = 0;
02194 stc_rf.time_stamp = getMillisecondsTime();
02195
02196 routing_entry route;
02197 route.hostname_source = f->hostname_source_;
02198 memcpy(route.next_hop, bcast_mac, 6);
02199 route.id = -1;
02200
02201 boost::unique_lock<boost::mutex> lock(mtx_mc_ack_l);
02202 boost::unique_lock<boost::mutex> lock_groups(mtx_mc_groups);
02203 McTree* mc_t = mc_handler.getMcGroup(&inc_frame.mc_g_name_);
02204 McPosAckObj* ack_obj = new McPosAckObj(&inc_frame, mc_t);
02205
02206 mc_ack_l.push_front(ack_obj);
02207
02208 boost::thread resendMcAckFrameT(deliverMcAckFrame, stc_rf, route, recursive_mc_ack);
02209 }
02210 }
02211
02212
02213 if (propagated)
02214 {
02215 memcpy((unsigned char*) inc_frame.header_.mac_destination_, (unsigned char*) bcast_mac, 6);
02216 inc_frame.resend_flag = false;
02217 string network_string = inc_frame.getFrameAsNetworkString(inc_frame.header_.route_id, inc_frame.header_.mac_destination_, inc_frame.hostname_source_, src_mac);
02218 socketSend(network_string);
02219 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02220 Logging::increaseProperty("num_mc_unique_data_frames_forwarded");
02221 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
02222 #endif
02223 ROS_DEBUG("FORWARD MC FRAME: ID[%u] GROUP[%s] SOURCE[%s]", inc_frame.header_.frame_id, inc_frame.mc_g_name_.c_str(), inc_frame.hostname_source_.c_str());
02224 }
02225 else
02226 ROS_DEBUG("Don't propergate mc frame: ID[%u] group[%s] source[%s]", inc_frame.header_.frame_id, inc_frame.mc_g_name_.c_str(), inc_frame.hostname_source_.c_str());
02227
02228 {
02229 boost::unique_lock<boost::mutex> lock(mtx_inc_rf_frames);
02230 inc_frames_l.push_back(inc_frame);
02231 }
02232
02233
02234 }
02235 else
02236 {
02237
02238
02239 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
02240 Logging::increaseProperty("num_unique_data_frames_received_directly");
02241 Logging::increaseProperty("num_total_bytes_received", f->buffer_str_len_);
02242 #endif
02243
02244 boost::unique_lock<boost::mutex> lock(mtx_inc_rf_frames);
02245 inc_frames_l.push_back(inc_frame);
02246 }
02247
02248
02249
02250 }
02251
02252 void processRouteRequest(RouteRequest req)
02253 {
02254
02255
02256
02257
02258
02259
02260 route_request inc_req;
02261 inc_req.id = req.header_.id;
02262 inc_req.hostname_source = req.hostname_source_;
02263 bool route_req_exsits = true;
02264 {
02265 std::list<route_request>::iterator findReq;
02266 {
02267 boost::unique_lock<boost::mutex> lock_rreq(mtx_route_requests);
02268 findReq = std::find(route_requests_l.begin(), route_requests_l.end(),
02269 inc_req);
02270 route_req_exsits = findReq != route_requests_l.end();
02271 }
02272
02273
02274
02275 boost::unique_lock<boost::mutex> lock_mc_groups(mtx_mc_groups);
02276 McTree* mc_g = mc_handler.getMcGroup(&req.hostname_destination_);
02277 if ((req.hostname_destination_.compare(hostname) == 0 || (mc_g != NULL && mc_g->connected && mc_g->activated)))
02278 {
02279 lock_mc_groups.unlock();
02280 uint16_t counter = 1;
02281 if (route_req_exsits)
02282 {
02283 boost::unique_lock<boost::mutex> lock_rreq(mtx_route_requests);
02284 route_request & route_req(*findReq);
02285 counter = ++route_req.counter;
02286 }
02287
02288 mac m_adr(src_mac);
02289 req.path_l_.push_back(m_adr);
02290 lock_mc_groups.lock();
02291 {
02292
02293 boost::unique_lock<boost::mutex> lock_rreq(mtx_route_requests);
02294 Logging::logRRequestReceiver(req.hostname_destination_, req.header_.id, counter, req.header_.hop_count - 1, counter == 1, req.path_l_);
02295
02296 }
02297 req.path_l_.pop_back();
02298 }
02299 }
02300
02301
02302
02303 if (req.hostname_source_.compare(hostname) != 0 && (req.header_.hop_count + 1 <= req.header_.hop_limit || req.header_.hop_limit == 0) && !route_req_exsits)
02304 {
02305 if (req.mc_flag_ == false)
02306 {
02307
02308 if (req.hostname_destination_.compare(hostname) == 0)
02309 {
02310 inc_req.counter = 1;
02311 inc_req.response_sent = true;
02312 RouteResponse rr = RouteResponse(req, src_mac, 0);
02313
02314 ROS_INFO("GOT ROUTE REQUEST FOR ME: SOURCE[%s] ID[%u]", req.hostname_source_.c_str(), req.header_.id);
02315
02316
02317 routing_entry new_route;
02318 new_route.hostname_destination = hostname;
02319 new_route.current_hop = rr.current_hop_;
02320 new_route.hobs = req.header_.hop_count - 1;
02321 new_route.hostname_source = req.hostname_source_;
02322 new_route.id = req.header_.id;
02323 new_route.cr_entry = false;
02324 memcpy(new_route.previous_hop, ethernet_header->eh_source, 6);
02325 memcpy(new_route.next_hop, src_mac, 6);
02326 ROS_INFO("ADD ROUTE: ID[%u] SOURCE[%s] DEST[%s] NEXT HOP[%s] PREVIUS HOP[%s] PATH:[%s]", new_route.id, new_route.hostname_source.c_str(), new_route.hostname_destination.c_str(), getMacAsStr(new_route.next_hop).c_str(), getMacAsStr(new_route.previous_hop).c_str(), getPathAsStr(rr.path_l_).c_str());
02327
02328
02329 {
02330 boost::unique_lock<boost::mutex> lock(mtx_routing_table);
02331 cleanRoutingTable();
02332 routing_table_l.push_front(new_route);
02333 boost::unique_lock<boost::mutex> lock_g(mtx_mc_groups);
02334 Logging::logRoutingTable(&routing_table_l, &mc_groups_l);
02335 }
02336
02337
02338
02339
02340
02341
02342
02343 if (simulation_mode && new_route.hobs <= 1)
02344 sleepMS(10);
02345
02346 string network_string = rr.getResponseAsNetworkString(src_mac);
02347
02348 socketSend(network_string);
02349 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
02350 Logging::increaseProperty("num_rreq_received");
02351 Logging::increaseProperty("num_rrep_sent");
02352 Logging::increaseProperty("num_total_bytes_received", req.buffer_str_len_);
02353 Logging::increaseProperty("num_total_bytes_sent", network_string.length());
02354
02355 #endif
02356
02357
02358 }
02359 else
02360 {
02361 ROS_INFO("FORWARD REQUEST: SOURCE HOST[%s] DESTINATION HOST[%s] MAX HOPS[%u] CURRENT HOP[%u]", req.hostname_source_.c_str(), req.hostname_destination_.c_str(), req.header_.hop_limit, req.header_.hop_count + 1);
02362
02363 inc_req.response_sent = false;
02364 string network_string = req.getRequestAsNetworkString(src_mac);
02365 socketSend(network_string);
02366
02367
02368 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
02369 Logging::increaseProperty("num_rreq_sent");
02370 Logging::increaseProperty("num_total_bytes_sent", network_string.length());
02371 #endif
02372 Logging::logRRequestIntermediate(&req);
02373 }
02374 }
02375 else if (req.mc_flag_)
02376 {
02377 boost::unique_lock<boost::mutex> lock(mtx_mc_groups);
02378 McTree* mc_t = mc_handler.getMcGroup(&req.hostname_destination_);
02379
02380
02381 if (mc_t != NULL && mc_t->connected && mc_t->activated)
02382 {
02383
02384 if (!mc_t->root && compareMac(mc_t->route_uplink_->next_hop, req.eh_h_.eh_source))
02385 return;
02386 inc_req.counter = 1;
02387 inc_req.response_sent = true;
02388
02389
02390 routing_entry * route = new routing_entry;
02391 route->id = req.header_.id;
02392 route->hostname_source = req.hostname_source_;
02393 route->hostname_destination = req.hostname_destination_;
02394 memcpy((void*) route->previous_hop, (void*) req.eh_h_.eh_source, 6);
02395 route->mac_path_l = std::list<mac>(req.path_l_);
02396
02397
02398 mc_handler.addDownlinkRoute(route);
02399
02400 RouteResponse rr = RouteResponse(req, src_mac, mc_t->route_uplink_->root_distance + req.header_.hop_count);
02401
02402
02403 ROS_INFO("GOT MC REQUEST FOR ME AS DEST: MC GROUP[%s] SOURCE[%s] ID[%u] HOPS[%u] RD[%u]", mc_t->group_name_.c_str(), req.hostname_source_.c_str(), req.header_.id, req.header_.hop_count, mc_t->route_uplink_->root_distance);
02404 string network_string = rr.getResponseAsNetworkString(src_mac);
02405 socketSend(network_string);
02406
02407 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02408 Logging::increaseProperty("num_mc_rrep_sent");
02409 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
02410 #endif
02411 }
02412 else if (mc_t != NULL && mc_t->outgoing_request_ != NULL && getMillisecondsTime() - mc_t->time_stamp_ < INTERVAL_WAIT_FOR_MCROUTES)
02413 {
02414 RouteRequest* request = new RouteRequest(req);
02415
02416 if (mc_t->addWaitingRequest(request, src_mac))
02417 {
02418
02419
02420 }
02421 else
02422 {
02423
02424
02425 }
02426
02427 return;
02428 }
02429 else if (!route_req_exsits)
02430 {
02431 ROS_INFO("FORWARD MC REQUEST: SOURCE HOST[%s] DESTINATION GROUP[%s] MAX HOPS[%u] CURRENT HOP[%u]", req.hostname_source_.c_str(), req.hostname_destination_.c_str(), req.header_.hop_limit, req.header_.hop_count + 1);
02432 inc_req.response_sent = false;
02433
02434 mc_handler.addGroup(&req.hostname_destination_);
02435 mc_handler.getMcGroup(&req.hostname_destination_)->safeOutgoingRequest(new RouteRequest(req));
02436 {
02437 boost::unique_lock<boost::mutex> lock(mtx_routing_table);
02438 Logging::logRoutingTable(&routing_table_l, &mc_groups_l);
02439 }
02440 string network_string = req.getRequestAsNetworkString(src_mac);
02441 socketSend(network_string);
02442
02443
02444 Logging::logRRequestIntermediate(&req);
02445
02446 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02447 Logging::increaseProperty("num_mc_rreq_sent");
02448 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
02449 #endif
02450 }
02451 else
02452 {
02453 if (mc_t != NULL)
02454 {
02455 ROS_ERROR("mc_t != NULL");
02456 mc_t->printTree();
02457 }
02458 else
02459 {
02460 ROS_ERROR("return");
02461 }
02462
02463 return;
02464 }
02465 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02466 Logging::increaseProperty("num_mc_rreq_received");
02467 Logging::increaseProperty("num_mc_total_bytes_received ", req.buffer_str_len_);
02468 #endif
02469 }
02470
02471
02472 memcpy((void*) inc_req.receiver_mac, (void*) req.eh_h_.eh_source, 6);
02473 inc_req.hostname_destination = req.hostname_destination_;
02474 inc_req.is_mc = req.mc_flag_;
02475
02476 boost::unique_lock<boost::mutex> lock(mtx_route_requests);
02477 route_requests_l.push_front(inc_req);
02478 }
02479 }
02480
02481 void processRouteResponse(RouteResponse rr)
02482 {
02483
02484
02485
02486
02487
02488 route_request incoming_req;
02489 incoming_req.id = rr.request_id_;
02490 incoming_req.hostname_source = rr.hostname_source_;
02491
02492 boost::unique_lock<boost::mutex> lock_route_req(mtx_route_requests);
02493 std::list<route_request>::iterator find_rreq = std::find(route_requests_l.begin(), route_requests_l.end(), incoming_req);
02494
02495
02496 if (find_rreq != route_requests_l.end())
02497 {
02498 route_request & request(*find_rreq);
02499 routing_entry new_route;
02500
02501
02502
02503 if (compareMac(rr.mac_current_hop_, src_mac) && request.response_sent == false)
02504 {
02505
02506 new_route.hobs = rr.hop_count_ - 1;
02507 new_route.mac_path_l = rr.path_l_;
02508 new_route.ts = getMillisecondsTime();
02509 new_route.hostname_source = rr.hostname_source_;
02510 new_route.id = rr.request_id_;
02511 new_route.cr_entry = false;
02512 new_route.current_hop = rr.current_hop_;
02513 memcpy(new_route.next_hop, ethernet_header->eh_source, 6);
02514 RouteResponse resp_2_get_next_hop =
02515 RouteResponse(
02516 (unsigned char*) ((std::string) rr.getResponseAsNetworkString(
02517 src_mac)).data());
02518 memcpy((unsigned char*) new_route.previous_hop,
02519 (void*) resp_2_get_next_hop.mac_current_hop_, 6);
02520
02521 if (rr.mc_flag_ == false)
02522 {
02523
02524 if (rr.current_hop_ == rr.hop_count_ && rr.hostname_source_ == hostname)
02525 {
02526 ROS_INFO("GOT ROUTE RESPONSE FROM[%s]", request.hostname_destination.c_str());
02527 ROS_INFO("PATH[%s]", getPathAsStr(rr.path_l_).c_str());
02528
02529 new_route.hostname_destination = request.hostname_destination;
02530
02531 route_requests_l.remove(request);
02532
02533 {
02534 boost::unique_lock<boost::mutex> lock(mtx_routing_table);
02535 cleanRoutingTable();
02536 routing_table_l.push_front(new_route);
02537 boost::unique_lock<boost::mutex> lock_g(mtx_mc_groups);
02538 Logging::logRoutingTable(&routing_table_l, &mc_groups_l);
02539
02540 }
02541
02542 boost::unique_lock<boost::mutex> lock(mtx_notify);
02543 got_request_response.notify_all();
02544
02545 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
02546 Logging::increaseProperty("num_rrep_received");
02547 Logging::increaseProperty("num_total_bytes_received", rr.hostname_source_.length() + RouteResponse::HEADER_FIXED_LEN);
02548 #endif
02549 }
02550 else
02551 {
02552
02553 ROS_INFO("FORWARD RESPONSE FROM[%s]: SOURCE HOST[%s] DESTINATION HOST[%s] NEXT HOP[%s]", getMacAsStr(new_route.next_hop).c_str(), new_route.hostname_source.c_str(), new_route.hostname_destination.c_str(), getMacAsStr(new_route.previous_hop).c_str());
02554 ROS_DEBUG("PATH[%s]", getPathAsStr(rr.path_l_).c_str());
02555
02556 request.response_sent = true;
02557 new_route.hostname_destination = "";
02558 {
02559 boost::unique_lock<boost::mutex> lock(
02560 mtx_routing_table);
02561 cleanRoutingTable();
02562 routing_table_l.push_front(new_route);
02563 boost::unique_lock<boost::mutex> lock_g(mtx_mc_groups);
02564 Logging::logRoutingTable(&routing_table_l, &mc_groups_l);
02565 }
02566 string network_string = rr.getResponseAsNetworkString(src_mac);
02567 socketSend(network_string);
02568 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
02569 Logging::increaseProperty("num_rrep_sent");
02570 Logging::increaseProperty("num_total_bytes_received", network_string.length());
02571 #endif
02572 }
02573 }
02574 else
02575 {
02576
02577 {
02578 boost::unique_lock<boost::mutex> lock(mtx_mc_groups);
02579 McTree* tree = mc_handler.getMcGroup(&request.hostname_destination);
02580 if (tree != NULL && tree->activated && tree->connected)
02581 return;
02582 }
02583
02584 routing_entry* route = new routing_entry;
02585 route->hostname_destination = request.hostname_destination;
02586 route->hostname_source = request.hostname_source;
02587 route->current_hop = rr.current_hop_;
02588 route->hobs = rr.current_hop_ - 1;
02589 route->root_distance = rr.root_distance - (rr.hop_count_ - rr.current_hop_);
02590 route->cr_entry = false;
02591 route->id = rr.request_id_;
02592 route->mac_path_l = rr.path_l_;
02593
02594 memcpy(route->previous_hop, new_route.previous_hop, 6);
02595 memcpy(route->next_hop, new_route.next_hop, 6);
02596 {
02597 boost::unique_lock<boost::mutex> lock(mtx_mc_groups);
02598 if (!mc_handler.addUplinkRoute(route))
02599 {
02600 delete route;
02601 return;
02602 }
02603 }
02604
02605 if (rr.hostname_source_.compare(hostname) == 0)
02606 {
02607 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02608 Logging::increaseProperty("num_mc_rrep_received");
02609 Logging::increaseProperty("num_mc_total_bytes_received", rr.hostname_source_.length() + RouteResponse::HEADER_FIXED_LEN);
02610 #endif
02611
02612
02613 ROS_INFO("GOT MC RESPONSE: ROOT DISTANCE[%u] FROM[%s] MC GROUP[%s]", route->root_distance, getMacAsStr(rr.eh_.h_source).c_str(), request.hostname_destination.c_str());
02614
02615 }
02616 else
02617 {
02618
02619 ROS_INFO("FORWARD MC RESPONSE FROM[%s]: SOURCE HOST[%s] DESTINATION MC[%s] NEXT HOP[%s]", getMacAsStr(rr.eh_.h_source).c_str(), rr.hostname_source_.c_str(), request.hostname_destination.c_str(), getMacAsStr(new_route.previous_hop).c_str());
02620
02621
02622 string network_string = rr.getResponseAsNetworkString(src_mac);
02623 socketSend(network_string);
02624 request.response_sent = true;
02625
02626 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02627 Logging::increaseProperty("num_mc_rrep_sent");
02628 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
02629 #endif
02630 }
02631 }
02632 }
02633 else if (request.response_sent)
02634 ROS_INFO("I SENT ALREADY A RESPONSE: ID[%u] SOURCE HOST[%s]", rr.request_id_, rr.hostname_source_.c_str());
02635 else if (compareMac(rr.mac_current_hop_, src_mac) == false)
02636 ROS_ERROR("I AM NOT THE CURRENT HOP[%s] TO FORWARD THE RESPONSE!", getMacAsStr(rr.mac_current_hop_).c_str());
02637
02638
02639
02640 }
02641 else
02642 ROS_DEBUG("WAS NOT INVOLVED IN ROUTING PROCESS: ID[%u] SOURCE HOST[%s]", rr.request_id_, rr.hostname_source_.c_str());
02643
02644 }
02645
02646 void processBeacon(Beacon * beacon)
02647 {
02648
02649
02650
02651
02652
02653
02654 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
02655 Logging::increaseProperty("num_beacons_received");
02656 Logging::increaseProperty("num_total_bytes_received", beacon->HEADER_FIXED_LEN + beacon->hostname_.length());
02657 #endif
02658
02659 hostname_mac hm;
02660 hm.hostname = beacon->hostname_;
02661 hm.reachable = true;
02662 memcpy(hm.mac, beacon->mac_source_, 6);
02663
02664 bool new_neighbor = true;
02665 bool neighbor_reconnect = false;
02666
02667 boost::unique_lock<boost::mutex> lock_neighbors(mtx_neighbors);
02668
02669 std::list<hostname_mac>::iterator current_neighbor = std::find(neighbors_l.begin(), neighbors_l.end(), hm);
02670 if (current_neighbor != neighbors_l.end())
02671 {
02672 new_neighbor = false;
02673
02674 (*current_neighbor).stamp();
02675
02676 if (!(*current_neighbor).reachable)
02677 {
02678 neighbor_reconnect = true;
02679 }
02680
02681 (*current_neighbor).reachable = true;
02682 }
02683
02684 if (new_neighbor || neighbor_reconnect)
02685 {
02686 std_msgs::String msg;
02687 msg.data = hm.hostname;
02688 lock_neighbors.unlock();
02689 publishMessage(msg, node_prefix + topic_new_robot);
02690 lock_neighbors.lock();
02691 ROS_ERROR("NEW NEIGHBOR: NAME[%s]", hm.hostname.c_str());
02692
02693 if (new_neighbor)
02694 {
02695 neighbors_l.push_front(hm);
02696 }
02697 }
02698 }
02699
02700 void processMcActivationFrame(McRouteActivationFrame * f)
02701 {
02702
02703 if (compareMac(f->header_.mac_destination, src_mac) == false)
02704 return;
02705
02706 boost::unique_lock<boost::mutex> lock_mc_groups(mtx_mc_groups);
02707
02708 sendLinkAck(f->eh_h_.eh_dest, src_mac, f->header_.id, "", false, FRAME_TYPE_MC_ACTIVATION);
02709
02710 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02711 Logging::increaseProperty("num_mc_ract_received");
02712 Logging::increaseProperty("num_mc_total_bytes_received ", f->buffer_str_len_);
02713 #endif
02714
02715 McTree* mc_t = mc_handler.getMcGroup(&f->mc_group_);
02716
02717
02718 if (mc_t != NULL && ((mc_t->connected && mc_t->activated) || mc_t->root))
02719 {
02720 mac* downlink = new mac(f->eh_h_.eh_source);
02721 if (mc_t->addDownlinkAsMember(downlink))
02722 {
02723
02724 ROS_INFO("ADD NEW LEAFE TO MC TREE: GROUP[%s] LEAFE MAC[%s]", f->mc_group_.c_str(), getMacAsStr(f->eh_h_.eh_source).c_str());
02725
02726 }
02727 else
02728 {
02729
02730
02731
02732
02733 }
02734 }
02735 else if ((mc_t = mc_handler.getMcGroup(&f->hostname_source_, &f->header_.route_id)) != NULL)
02736 {
02737
02738 mac* downlink = new mac;
02739 memcpy((void*) downlink->mac_adr, (void*) f->eh_h_.eh_source, 6);
02740 if (mc_t->addDownlinkAsConnector(downlink))
02741 {
02742 ROS_INFO("ACT AS CONNECTOR FOR MC TREE: GROUP[%s]", mc_t->group_name_.c_str());
02743 if (!mc_t->activateRoute(&f->hostname_source_, &f->header_.route_id, f->eh_h_.eh_source))
02744 ROS_ERROR("unexpected failure: could not activate mc tree as connector");
02745
02746
02747 McRouteActivationFrame r_act_frame = McRouteActivationFrame(mc_t->route_uplink_->next_hop, mc_t->group_name_, mc_t->route_uplink_->id, mc_t->route_uplink_->hostname_source);
02748
02749 string network_string = r_act_frame.getFrameAsNetworkString(src_mac);
02750 lock_mc_groups.unlock();
02751
02752 resendUnackLinkFrame("", r_act_frame.header_.id, r_act_frame.header_.mac_destination, network_string, FRAME_TYPE_MC_ACTIVATION);
02753
02754 socketSend(network_string);
02755 lock_mc_groups.lock();
02756
02757
02758 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
02759 Logging::increaseProperty("num_mc_ract_sent");
02760 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
02761 #endif
02762
02763
02764 while (!mc_t->waiting_requests_l_.empty())
02765 {
02766
02767 RouteRequest *req = mc_t->waiting_requests_l_.front();
02768 mc_t->waiting_requests_l_.pop_front();
02769 lock_mc_groups.unlock();
02770 processRouteRequest(*req);
02771 lock_mc_groups.lock();
02772
02773 delete req;
02774 }
02775 }
02776 else
02777 ROS_ERROR("unexpected failure: could not add downlink as connector");
02778 }
02779 else if ((mc_t = mc_handler.getMcGroup(&f->mc_group_)) != NULL)
02780 {
02781 ROS_ERROR("act from: %s", getMacAsStr(f->eh_h_.eh_source).c_str());
02782 mc_t->printTree();
02783 }
02784 else if (mc_t == NULL)
02785 {
02786 ROS_ERROR("ERROR: 87");
02787 mc_t->printTree();
02788 }
02789
02790
02791
02792 }
02793
02794 void processIncomingFrames()
02795 {
02796
02797
02798
02799
02800
02801
02802
02803 while (ros::ok())
02804 {
02805 list<RoutedFrame> frames;
02806 bool list_is_empty = true;
02807 {
02808 frames.clear();
02809 boost::unique_lock<boost::mutex> lock(mtx_inc_rf_frames);
02810 frames = list<RoutedFrame>(inc_frames_l);
02811 inc_frames_l.clear();
02812 list_is_empty = inc_frames_l.empty();
02813 }
02814
02815
02816
02817
02818
02819
02820 while (frames.size() >= 1)
02821 {
02822
02823 {
02824 unsigned long time_lock = getMillisecondsTime();
02825 boost::unique_lock<boost::mutex> lock_packets_inco(mtx_packets_incomplete);
02826
02827
02828 for (std::list<Packet>::iterator p_i = packets_incomplete_l.begin(); p_i != packets_incomplete_l.end();)
02829 {
02830
02831 bool packet_is_complete = false;
02832 Packet & p(*p_i);
02833
02834 for (std::list<RoutedFrame>::iterator f_i = frames.begin(); f_i != frames.end();)
02835 {
02836
02837 RoutedFrame & frame(*f_i);
02838
02839 if (p.id_ == frame.header_.packet_id
02840 && frame.header_.packet_size > 1
02841 && p.hostname_source_.compare(
02842 frame.hostname_source_) == 0)
02843 {
02844
02845 if (p.addFrame(frame))
02846 {
02847 publishPacket(&p);
02848 packet_is_complete = true;
02849 }
02850
02851 f_i = frames.erase(f_i);
02852 }
02853 else
02854 f_i++;
02855 }
02856
02857 if (packet_is_complete)
02858 {
02859
02860 Packet p_comp = Packet(*p_i);
02861 p_comp.hostname_source_ = (*p_i).hostname_source_;
02862 if (p_comp.isMcFrame())
02863 {
02864
02865 boost::unique_lock<boost::mutex> lock(mtx_cached_mc_packets);
02866
02867 for (std::list<Packet>::iterator it = cached_mc_packets_l.begin(); it != cached_mc_packets_l.end(); it++)
02868 {
02869 Packet pa = *it;
02870 if (pa.id_ == p_comp.id_
02871 && pa.hostname_source_.compare(p_comp.hostname_source_) == 0
02872 && pa.mc_group_.compare(p_comp.mc_group_) == 0)
02873 {
02874
02875 cached_mc_packets_l.push_back(p_comp);
02876
02877 cached_mc_packets_l.erase(it);
02878
02879 break;
02880 }
02881 }
02882 }
02883
02884
02885 p_i = packets_incomplete_l.erase(p_i);
02886
02887 }
02888 else if (getMillisecondsTime() - p.ts_ > INTERVAL_DELETE_OLD_PACKETS)
02889 {
02890 ROS_INFO("DROP UNFINISHED PACKET: ID[%u] SOURCE[%s] SIZE[%u/%lu]", p.id_, p.hostname_source_.c_str(), p.size_, p.frames_l_.size());
02891 p_i = packets_incomplete_l.erase(p_i);
02892 }
02893 else
02894 p_i++;
02895
02896 }
02897 }
02898
02899
02900 for (std::list<RoutedFrame>::iterator f_i = frames.begin(); f_i != frames.end();)
02901 {
02902
02903 RoutedFrame & frame(*f_i);
02904
02905 bool new_packet = true;
02906
02907
02908 {
02909 unsigned long time_lock = getMillisecondsTime();
02910 boost::unique_lock<boost::mutex> lock_packets_inco(mtx_packets_incomplete);
02911 if (getMillisecondsTime() - time_lock >= 100)
02912 ROS_WARN("did not got lock 2");
02913 for (std::list<Packet>::iterator it = packets_incomplete_l.begin(); it != packets_incomplete_l.end(); it++)
02914 {
02915 Packet p = *it;
02916 if (p.id_ == frame.header_.packet_id
02917 && p.hostname_source_.compare(frame.hostname_source_) == 0)
02918 {
02919 new_packet = false;
02920 break;
02921 }
02922 }
02923 }
02924
02925 bool got_packet_already = false;
02926
02927 if (new_packet)
02928 {
02929 unsigned long time_lock = getMillisecondsTime();
02930 boost::unique_lock<boost::mutex> lock(mtx_cached_mc_packets);
02931 if (getMillisecondsTime() - time_lock >= 100)
02932 ROS_WARN("did not got lock 3");
02933
02934
02935
02936 for (std::list<Packet>::iterator it_pack = cached_mc_packets_l.begin(); it_pack != cached_mc_packets_l.end(); it_pack++)
02937 {
02938 if ((*it_pack).id_ == frame.header_.packet_id && (*it_pack).hostname_source_.compare(frame.hostname_source_) == 0 && (*it_pack).mc_group_.compare(frame.mc_g_name_) == 0 && (*it_pack).size_ <= (*it_pack).frames_l_.size() && (*it_pack).size_ > 1)
02939 {
02940 new_packet = false;
02941
02942 got_packet_already = true;
02943
02944 break;
02945 }
02946 }
02947 }
02948
02949 if (new_packet)
02950 {
02951
02952 Packet p = Packet(frame);
02953 ROS_DEBUG("ADD NEW PACKET: ID[%u] GROUP[%s] SOURCE[%s] SIZE[%u]", p.id_, p.mc_group_.c_str(), p.hostname_source_.c_str(), p.size_);
02954
02955 if (p.addFrame(frame))
02956 publishPacket(&p);
02957 else
02958 {
02959 boost::unique_lock<boost::mutex> lock_packets_inco(mtx_packets_incomplete);
02960 packets_incomplete_l.push_back(p);
02961 }
02962
02963
02964
02965 f_i = frames.erase(f_i);
02966
02967
02968 }
02969 else if (got_packet_already)
02970 f_i = frames.erase(f_i);
02971
02972 else
02973 {
02974
02975
02976 f_i++;
02977 }
02978
02979
02980 }
02981
02982
02983
02984
02985 }
02986
02987
02988
02989 sleepMS(10);
02990 }
02991
02992 }
02993
02994 void publishPacket(Packet * p)
02995 {
02996
02997
02998
02999
03000
03001
03002
03003
03004 stc_packet pack(p->hostname_source_, p->mc_group_, p->id_);
03005 unsigned long now = getMillisecondsTime();
03006 for (list<stc_packet>::iterator it = published_packets_l.begin(); it != published_packets_l.end();)
03007 {
03008
03009 if (pack == *it)
03010 {
03011
03012 return;
03013 }
03014 else if (now - (*it).ts >= MAX_TIME_CACHE_PUBLISHED_PACKETS)
03015 it = published_packets_l.erase(it);
03016 else
03017 it++;
03018 }
03019
03020
03021 published_packets_l.push_back(pack);
03022
03023
03024 bool mc_member = false;
03025 {
03026 boost::unique_lock<boost::mutex> lock(mtx_mc_groups);
03027 McTree* group = mc_handler.getMcGroup(&p->mc_group_);
03028 if (group != NULL && group->member)
03029 mc_member = true;
03030 }
03031
03032 ROS_INFO("PUBLISH PACKET: ID[%u] SRC[%s/%s]", pack.id, pack.src.c_str(), pack.mc_group.c_str());
03033
03034 if (p->mc_group_.compare("") == 0 || mc_member)
03035 {
03036
03037 try
03038 {
03039 std::string payload = p->getPayload();
03040
03041 if (p->data_type_ == FRAME_DATA_TYPE_MAP)
03042 {
03043 nav_msgs::OccupancyGrid map;
03044
03045 desializeObject((unsigned char*) payload.data(),
03046 payload.length(), &map);
03047
03048 publishMessage(map, p->topic_);
03049 }
03050
03051 if (p->data_type_ == FRAME_DATA_TYPE_POSITION)
03052 {
03053
03054 adhoc_communication::MmRobotPosition pos =
03055 adhoc_communication::MmRobotPosition();
03056
03057
03058 desializeObject((unsigned char*) payload.data(), payload.length(), &pos);
03059 pos.src_robot = p->hostname_source_;
03060 publishMessage(pos, p->topic_);
03061 }
03062 else if (p->data_type_ == FRAME_DATA_TYPE_ANY)
03063 {
03064 adhoc_communication::RecvString data = adhoc_communication::RecvString();
03065 data.src_robot = p->hostname_source_;
03066 data.data = payload;
03067 publishMessage(data, p->topic_);
03068 }
03069 else if (p->data_type_ == FRAME_DATA_TYPE_POINT)
03070 {
03071 adhoc_communication::MmPoint point;
03072 desializeObject((unsigned char*) payload.data(),
03073 payload.length(), &point);
03074 point.src_robot = p->hostname_source_;
03075 publishMessage(point, p->topic_);
03076 }
03077 else if (p->data_type_ == FRAME_DATA_TYPE_CONTROLL_MSG)
03078 {
03079 adhoc_communication::MmControl msg;
03080 desializeObject(
03081 (unsigned char*) payload.data(), payload.length(), &msg);
03082 msg.src_robot = p->hostname_source_;
03083 publishMessage(msg, p->topic_);
03084 }
03085 else if (p->data_type_ == FRAME_DATA_TYPE_MAP_UPDATE)
03086 {
03087 adhoc_communication::MmMapUpdate mapUpdate;
03088 desializeObject(
03089 (unsigned char*) payload.data(), payload.length(), &mapUpdate);
03090 mapUpdate.src_robot = p->hostname_source_;
03091 publishMessage(mapUpdate, p->topic_);
03092 }
03093 else if (p->data_type_ == FRAME_DATA_TYPE_FRONTIER)
03094 {
03095 adhoc_communication::ExpFrontier frontier;
03096 desializeObject(
03097 (unsigned char*) payload.data(), payload.length(), &frontier);
03098
03099 publishMessage(frontier, p->topic_);
03100 }
03101 else if (p->data_type_ == FRAME_DATA_TYPE_CLUSTER)
03102 {
03103 adhoc_communication::ExpCluster cluster;
03104 desializeObject(
03105 (unsigned char*) payload.data(), payload.length(), &cluster);
03106
03107 publishMessage(cluster, p->topic_);
03108 }
03109 else if (p->data_type_ == FRAME_DATA_TYPE_AUCTION)
03110 {
03111 adhoc_communication::ExpAuction auc;
03112 desializeObject(
03113 (unsigned char*) payload.data(), payload.length(), &auc);
03114
03115 publishMessage(auc, p->topic_);
03116 }
03117 else if (p->data_type_ == FRAME_DATA_TYPE_TWIST)
03118 {
03119 geometry_msgs::Twist twist;
03120 desializeObject(
03121 (unsigned char*) payload.data(), payload.length(), &twist);
03122
03123 publishMessage(twist, p->topic_);
03124 }
03125 else if (p->data_type_ == FRAME_DATA_TYPE_ROBOT_UPDATE)
03126 {
03127 adhoc_communication::CMgrRobotUpdate r_up;
03128 desializeObject((unsigned char*) payload.data(), payload.length(), &r_up);
03129
03130 publishMessage(r_up, p->topic_);
03131 }
03132 else
03133 ROS_ERROR("UNKNOWN DATA TYPE!");
03134
03135 } catch (int e)
03136 {
03137
03138 ROS_ERROR("NODE THROWS EXCEPTION: %u", e);
03139 }
03140
03141
03142
03143
03144
03145
03146
03147
03148
03149 }
03150
03151
03152
03153 }
03154
03155 void resendRouteRequest(route_request &req, boost::condition_variable & con)
03156 {
03157
03158
03159
03160
03161
03162
03163
03164
03165 sleepMS(INTERVAL_RESEND_ROUTE_REQUEST);
03166
03167 while (boost::this_thread::interruption_requested() == false)
03168 {
03169
03170 if (req.retransmitted >= (unsigned) num_rreq)
03171 {
03172 if (req.hop_limit < (unsigned) hop_limit_max)
03173 {
03174 req.hop_limit += hop_limit_increment;
03175 req.retransmitted = 0;
03176 ROS_INFO("INCREASE HOP COUNT OF ROUTE REQUEST: ID[%u] DESTINAION HOST[%s] MAX HOPS[%u]", req.id, req.hostname_destination.c_str(), req.hop_limit);
03177 }
03178 else
03179 {
03180 ROS_WARN("DROP ROUTE REQUEST: ID[%u] DESTINAION HOST[%s]", req.id, req.hostname_destination.c_str());
03181
03182 {
03183 boost::unique_lock<boost::mutex> lock(mtx_route_requests);
03184 route_requests_l.remove(req);
03185 }
03186
03187 boost::unique_lock<boost::mutex> lock(mtx_notify);
03188 con.notify_all();
03189
03190 return;
03191 }
03192 }
03193 else
03194 {
03195 req.id++;
03196 ROS_INFO("RESEND ROUTE REQUEST: ID[%u] DESTINAION HOST[%s] MAX HOPS[%u]", req.id, req.hostname_destination.c_str(), req.hop_limit);
03197 RouteRequest rr = RouteRequest(req);
03198 rr.req_count_stat++;
03199
03200 req.ts = getMillisecondsTime();
03201 req.retransmitted++;
03202 {
03203 boost::unique_lock<boost::mutex> lock(mtx_route_requests);
03204 route_requests_l.push_front(req);
03205 }
03206 string network_string = rr.getRequestAsNetworkString(src_mac);
03207 socketSend(network_string);
03208 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
03209 Logging::increaseProperty("num_rreq_sent");
03210 Logging::increaseProperty("num_total_bytes_sent", network_string.length());
03211 #endif
03212 }
03213
03214 sleepMS(INTERVAL_RESEND_ROUTE_REQUEST);
03215 }
03216 }
03217
03218 void resendRoutedFrame(boost::condition_variable &condi, routing_entry route)
03219 {
03220
03221
03222
03223
03224
03225
03226
03227
03228
03229 Logging::increaseProperty("running_unicast_transport_threads");
03230 sleepMS(INTERVAL_RESEND_ROUTED_FRAME);
03231
03232 while (boost::this_thread::interruption_requested() == false)
03233 {
03234 {
03235 boost::unique_lock<boost::mutex> lock(mtx_unack_routed_frame);
03236
03237 if (unack_routed_frame->retransmitted >= num_e2e_retrans)
03238 {
03239 boost::unique_lock<boost::mutex> lock(mtx_notify);
03240 unack_routed_frame->frame_is_ack = false;
03241 condi.notify_all();
03242 Logging::decreaseProperty("running_unicast_transport_threads");
03243 return;
03244 }
03245 else
03246 {
03247 unack_routed_frame->frame.header_.frame_id = unack_routed_frame->frame.frame_count_stat;
03248 unack_routed_frame->frame.frame_count_stat++;
03249 unack_routed_frame->time_stamp = getMillisecondsTime();
03250 unack_routed_frame->retransmitted++;
03251 unack_routed_frame->frame.resend_flag = true;
03252
03253 string network_string = unack_routed_frame->frame.getFrameAsNetworkString(route, src_mac);
03254
03255 socketSend(network_string);
03256
03257
03258
03259 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
03260 Logging::increaseProperty("num_data_frames_resent");
03261 Logging::increaseProperty("num_total_bytes_sent", network_string.length());
03262 #endif
03263 }
03264 }
03265 sleepMS(INTERVAL_RESEND_ROUTED_FRAME);
03266 }
03267
03268 Logging::decreaseProperty("running_unicast_transport_threads");
03269
03270 return;
03271 }
03272
03273 void resendMcFrame(boost::condition_variable &condi, stc_RoutedFrame* rf, routing_entry route)
03274 {
03275 Logging::increaseProperty("running_multicast_threads");
03276
03277 sleepMS(INTERVAL_RESEND_ROUTED_FRAME);
03278
03279 if (!boost::this_thread::interruption_requested())
03280 {
03281 rf->retransmitted++;
03282 rf->frame.resend_flag = true;
03283
03284
03285 string network_string = rf->frame.getFrameAsNetworkString(route, src_mac);
03286 socketSend(network_string);
03287 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
03288 Logging::increaseProperty("num_mc_data_frames_resent");
03289 Logging::increaseProperty("num_mc_total_bytes_sent ", network_string.length());
03290 #endif
03291 }
03292
03293 if (!boost::this_thread::interruption_requested())
03294 {
03295 boost::unique_lock<boost::mutex> lock(mtx_notify);
03296 condi.notify_all();
03297 }
03298
03299 Logging::decreaseProperty("running_multicast_threads");
03300 }
03301
03302 void deleteObsoleteRequests()
03303 {
03304
03305
03306
03307
03308
03309
03310 while (ros::ok())
03311 {
03312 {
03313 boost::unique_lock<boost::mutex> lock(mtx_route_requests);
03314 std::list<route_request>::iterator r = route_requests_l.begin();
03315
03316 while (r != route_requests_l.end())
03317 {
03318 route_request & currentRequest(*r);
03319 if (getMillisecondsTime() - currentRequest.ts > MAX_TIME_CACHE_ROUTE_REQUEST)
03320 {
03321 ROS_DEBUG("DROP CACHED ROUTE REQUEST: ID[%u] DESTuINAION HOST[%s]", currentRequest.id, currentRequest.hostname_destination.c_str());
03322 r = route_requests_l.erase(r);
03323 }
03324 else
03325 r++;
03326 }
03327 }
03328 sleepMS(INTERVAL_UPTDATE_THREAD_TO_DELETE_OBSOLETE_REQUESTS);
03329 }
03330 }
03331
03332 void deleteOldPackets()
03333 {
03334
03335
03336
03337
03338
03339
03340 while (ros::ok())
03341 {
03342 {
03343 boost::unique_lock<boost::mutex> lock(mtx_cached_mc_packets);
03344 std::list<Packet>::iterator r = cached_mc_packets_l.begin();
03345
03346 while (r != cached_mc_packets_l.end())
03347 {
03348 Packet & p(*r);
03349 if ((p.size_ <= p.frames_l_.size() && getMillisecondsTime() > p.ts_ && getMillisecondsTime() - p.ts_ >= INTERVAL_DELETE_OLD_PACKETS) || !p.isMcFrame())
03350 {
03351 ROS_INFO("DROP OLD PACKET: ID[%u] GROUP[%s] SRC HOST[%s]", p.id_, p.mc_group_.c_str(), p.hostname_source_.c_str());
03352 r = cached_mc_packets_l.erase(r);
03353 }
03354 else
03355 r++;
03356 }
03357 }
03358
03359 {
03360 boost::unique_lock<boost::mutex> lock(mtx_packets_incomplete);
03361 std::list<Packet>::iterator r = packets_incomplete_l.begin();
03362
03363 while (r != packets_incomplete_l.end())
03364 {
03365 Packet & p(*r);
03366 if ((getMillisecondsTime() > p.ts_ && getMillisecondsTime() - p.ts_ >= INTERVAL_DELETE_OLD_PACKETS) && p.isMcFrame())
03367 {
03368 ROS_INFO("DROP OLD UNFINISHED PACKET: ID[%u] GROUP[%s] SRC HOST[%s]", p.id_, p.mc_group_.c_str(), p.hostname_source_.c_str());
03369 r = packets_incomplete_l.erase(r);
03370 }
03371 else
03372 r++;
03373 }
03374 }
03375
03376 sleepMS(INTERVAL_UPTDATE_THREAD_TO_DELETE_OBSOLETE_REQUESTS);
03377 }
03378 }
03379
03380 void resendLinkFrame(stc_frame f)
03381 {
03382
03383 if (num_link_retrans == 0)
03384 return;
03385
03386 Logging::increaseProperty("running_unicast_link_threads");
03387 f.retransmitted = 0;
03388 bool got_ack = false;
03389 do
03390 {
03391 sleepMS(INTERVAL_TO_RETRANSMIT_LINK_FRAMES);
03392
03393 bool frame_successfully_acknowleged = false;
03394
03395 {
03396 boost::unique_lock<boost::mutex> lock(mtx_unack_link_frames);
03397 std::list<stc_frame>::iterator cr_frame_it = std::find(unack_link_frames_l.begin(), unack_link_frames_l.end(), f);
03398 frame_successfully_acknowleged = cr_frame_it == unack_link_frames_l.end();
03399 }
03400
03401 if (frame_successfully_acknowleged == false)
03402 {
03403 #ifdef PERFORMANCE_LOGGING_UC_LINK_SUMMARY
03404
03405 if (f.type == FRAME_TYPE_TRANSPORT_ACK || f.type == FRAME_TYPE_TRANSPORT_DATA)
03406 {
03407 if (f.hostname_source.compare(hostname) == 0)
03408 Logging::increaseProperty("num_data_frames_resent");
03409 else
03410 Logging::increaseProperty("num_data_frames_reforwarded");
03411 }
03412
03413 if (f.mc_group.compare("") == 0)
03414 Logging::increaseProperty("num_total_bytes_sent", f.network_string.length());
03415 #endif
03416
03417 #ifdef PERFORMANCE_LOGGING_MC_LINK_SUMMARY
03418 if (f.mc_group.compare("") != 0)
03419 Logging::increaseProperty("num_mc_total_bytes_sent ", f.network_string.length());
03420 #endif
03421
03422
03423 socketSend(f.network_string);
03424 f.retransmitted++;
03425 }
03426 else
03427 {
03428 got_ack = true;
03429 }
03430 } while (f.retransmitted < num_link_retrans && !got_ack && ros::ok());
03431
03432 if (!got_ack)
03433 {
03434 {
03435 boost::unique_lock<boost::mutex> lock(mtx_unack_link_frames);
03436 std::list<stc_frame>::iterator cr_frame_it = std::find(unack_link_frames_l.begin(), unack_link_frames_l.end(), f);
03437 if (cr_frame_it != unack_link_frames_l.end())
03438 {
03439 unack_link_frames_l.erase(cr_frame_it);
03440 ROS_WARN("DROP FRAME: ID[%u] CONFIRMER MAC[%s] SOURCE HOST[%s] TYPE[%s]", f.frame_id, getMacAsStr(f.mac).c_str(), f.hostname_source.c_str(), frame_types[f.type].c_str());
03441 }
03442 }
03443 if (f.type == FRAME_TYPE_MC_ACTIVATION)
03444 {
03445 hostname_mac n;
03446 memcpy(n.mac, f.mac, 6);
03447 n.hostname = getHostnameFromMac(f.mac);
03448
03449
03450 boost::thread t(&mcLostConnection, n);
03451 }
03452 }
03453
03454 Logging::decreaseProperty("running_unicast_link_threads");
03455 }
03456