00001 #include <boost/shared_array.hpp> 00002 #include <ros/ros.h> 00003 00004 00005 #include "throttled_transport/throttler.h" 00006 00007 namespace throttled_transport { 00008 00009 Throttler::Throttler() { 00010 initialized = false; 00011 throttle_mode_ = THROTTLE_BY_FREQUENCY; 00012 max_freq_ = 0; 00013 max_bw_ = 0; 00014 used_bytes_ = 0; 00015 message_count_ = 0; 00016 max_bytes_reached_ = false; 00017 } 00018 00019 bool Throttler::initialize() { 00020 // TODO: find what namespace to use 00021 ros::NodeHandle nh("~"); 00022 00023 std::string throttle_type; 00024 nh.param("throttled_transport/mode",throttle_type,throttle_type); 00025 if (throttle_type == "frequency") { 00026 throttle_mode_ = THROTTLE_BY_FREQUENCY; 00027 } 00028 if (throttle_type == "bandwidth") { 00029 throttle_mode_ = THROTTLE_BY_BANDWIDTH; 00030 } 00031 00032 nh.param("throttled_transport/max_frequency",max_freq_,0.0); 00033 nh.param("throttled_transport/max_bandwidth",max_bw_,0.0); 00034 00035 // TODO: create a service to configure this parameter ? 00036 throttling_server = nh.advertiseService("set_throttling_parameters",&Throttler::set_throttling_parameters, this); 00037 00038 // TODO: use dynamic_reconfigure ? 00039 00040 used_bytes_ = 0; 00041 message_count_ = 0; 00042 max_bytes_reached_ = false; 00043 last_publish_ = ros::Time::now(); 00044 initialized = true; 00045 return true; 00046 } 00047 00048 bool Throttler::set_throttling_parameters(SetThrottlingParameters::Request &req, 00049 SetThrottlingParameters::Response &res) { 00050 if (req.throttling_mode == "frequency") { 00051 throttle_mode_ = THROTTLE_BY_FREQUENCY; 00052 } 00053 if (req.throttling_mode == "bandwidth") { 00054 throttle_mode_ = THROTTLE_BY_BANDWIDTH; 00055 } 00056 switch (throttle_mode_) { 00057 case THROTTLE_BY_FREQUENCY: 00058 max_freq_ = req.max_frequency; 00059 break; 00060 case THROTTLE_BY_BANDWIDTH: 00061 max_bw_ = req.max_bandwidth; 00062 break; 00063 default: 00064 break; 00065 } 00066 used_bytes_ = 0; 00067 message_count_ = 0; 00068 max_bytes_reached_ = false; 00069 last_publish_ = ros::Time::now(); 00070 res.result = 0; 00071 return true; 00072 } 00073 00074 bool Throttler::can_publish(size_t datasize) { 00075 if (!initialized) { 00076 initialize(); 00077 } 00078 ros::Time now = ros::Time::now(); 00079 double dt = (now - last_publish_).toSec();; 00080 message_count_ += 1; 00081 // ROS_INFO("Can publish? dt %.3f mc %d mxf %.2f ub %d mxbw %.2f", 00082 // dt,message_count_,max_freq_, used_bytes_, max_bw_); 00083 00084 switch (throttle_mode_) { 00085 case THROTTLE_BY_FREQUENCY: 00086 if (max_freq_ <= 0) { 00087 return false; 00088 } 00089 if ((1 / dt) < max_freq_) { 00090 message_count_ = 0; 00091 last_publish_ = now; 00092 return true; 00093 } 00094 break; 00095 case THROTTLE_BY_BANDWIDTH: 00096 if (max_bw_ <= 0) { 00097 return false; 00098 } 00099 if (max_bytes_reached_) { 00100 if ((used_bytes_ / dt) < max_bw_) { 00101 used_bytes_ = 0; 00102 last_publish_ = now; 00103 max_bytes_reached_ = false; 00104 } 00105 return false; 00106 } else { 00107 if ((used_bytes_ / dt) < max_bw_) { 00108 used_bytes_ += datasize; 00109 return true; 00110 } else { 00111 max_bytes_reached_ = true; 00112 return false; 00113 } 00114 } 00115 break; 00116 default: 00117 break; 00118 } 00119 00120 00121 return false; 00122 } 00123 00124 00125 };