00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef TESTSHORTMULTIPLEDELAY_HPP
00021 #define TESTSHORTMULTIPLEDELAY_HPP
00022
00023 #include <iostream>
00024 #include <fstream>
00025 #include <limits>
00026
00027 #include <boost/thread/mutex.hpp>
00028 #include <boost/random.hpp>
00029 #include <boost/lexical_cast.hpp>
00030 #include <boost/foreach.hpp>
00031 #define foreach BOOST_FOREACH
00032
00033 #include <ros/ros.h>
00034 #include <std_msgs/UInt8.h>
00035 #include <std_msgs/UInt32.h>
00036 #include <std_msgs/Time.h>
00037
00038 #include <socrob/multicast.h>
00039
00040
00041
00042 using namespace std;
00043 using namespace ros;
00044 using namespace socrob::multicast;
00045 using boost::lexical_cast;
00046
00047
00048
00049 class TestShortMultipleDelay
00050 {
00051 boost::mutex mutex_;
00052
00053 int sid_;
00054
00055 const uint8_t MAGIC_BYTE_;
00056
00057 unsigned round_number_;
00058 unsigned round_uid_;
00059 Time round_tx_;
00060
00061 bool waiting_;
00062
00063 long double acc_max_arrival_delay_;
00064 long double acc_avg_arrival_delay_;
00065 long double acc_complete_delay_;
00066 unsigned long num_rounds_;
00067
00068 unsigned long failed_;
00069
00070 boost::variate_generator<boost::mt19937, boost::uniform_int<unsigned> > random_uid_;
00071
00072 Time output_time_base_;
00073 unsigned output_line_;
00074 ofstream output_;
00075
00076 public:
00077 TestShortMultipleDelay (int sid,
00078 string const& type) :
00079 sid_ (sid),
00080 MAGIC_BYTE_ (2),
00081 round_number_ (0),
00082 round_uid_(),
00083 round_tx_ (Time::now()),
00084 waiting_ (false),
00085 acc_max_arrival_delay_ (0),
00086 acc_avg_arrival_delay_ (0),
00087 acc_complete_delay_ (0),
00088 num_rounds_ (0),
00089 failed_ (0),
00090 random_uid_ (boost::mt19937 (Time::now().toNSec()),
00091 boost::uniform_int<unsigned> (numeric_limits<unsigned>::min(), numeric_limits<unsigned>::max())),
00092 output_time_base_ (Time::now()) {
00093 if (0 != sid_) {
00094 return;
00095 }
00096
00097 output_.open ( ("/tmp/TestShortMultipleDelay_" + type + "_" + lexical_cast<string> (output_time_base_.sec) + ".csv").c_str());
00098 output_ << "\"Round Number\""
00099 << ";\"Handler Delay 1\""
00100 << ";\"Handler Delay 2\""
00101 << ";\"Handler Delay 3\""
00102 << ";\"Handler Delay 4\""
00103 << ";\"Callback Delay\""
00104 << endl;
00105 output_line_ = 1;
00106 }
00107
00108
00109
00110 ~TestShortMultipleDelay() {
00111 unsigned l = output_line_;
00112 output_ << ";;;" << endl;
00113 output_ << "\"Average Handler Delay:\";=AVERAGE(B2:E" << l << ")" << endl;
00114 output_ << "\"Average Callback Delay:\";=AVERAGE(F2:F" << l << ")" << endl;
00115 }
00116
00117
00118
00119 bool
00120 is_question (vector<uint8_t> & question) {
00121 std_msgs::UInt8 magic_msg;
00122 deserialize (magic_msg, question);
00123
00124 if (magic_msg.data == MAGIC_BYTE_) {
00125 return true;
00126 }
00127
00128 return false;
00129 }
00130
00131
00132
00133 void
00134 timer (socrob::multicast::Manager& manager) {
00135 if (0 != sid_) {
00136 return;
00137 }
00138
00139 vector<uint8_t> question;
00140 std_msgs::UInt8 magic_msg;
00141 magic_msg.data = MAGIC_BYTE_;
00142 serialize_append (question, magic_msg);
00143
00144 {
00145 boost::lock_guard<boost::mutex> _ (mutex_);
00146
00147 if (waiting_) {
00148 Duration age = Time::now() - round_tx_;
00149 if (age > Duration (1, 0)) {
00150 ROS_WARN_STREAM_THROTTLE (1, "Last round (" << round_number_ << ") was created " << age << " seconds ago, still waiting...");
00151 }
00152 return;
00153 }
00154
00155 round_number_++;
00156 round_uid_ = random_uid_();
00157 round_tx_ = Time::now();
00158 waiting_ = true;
00159
00160 std_msgs::UInt32 round_number_msg;
00161 round_number_msg.data = round_number_;
00162 serialize_append (question, round_number_msg);
00163
00164 std_msgs::UInt32 round_uid_msg;
00165 round_uid_msg.data = round_uid_;
00166 serialize_append (question, round_uid_msg);
00167 }
00168
00169 set<id_type> required_sids;
00170 required_sids.insert (1);
00171 required_sids.insert (2);
00172 required_sids.insert (3);
00173 required_sids.insert (4);
00174
00175
00176 Rate rate (1000);
00177 while (! manager.startShortRound (question, required_sids, boost::bind (&TestShortMultipleDelay::short_callback, this, _1))) {
00178 boost::lock_guard<boost::mutex> _ (mutex_);
00179 failed_++;
00180 rate.sleep();
00181 }
00182 }
00183
00184
00185
00186 void
00187 short_handler (vector<uint8_t>& answer,
00188 id_type from,
00189 vector<uint8_t> & question) {
00190 Time now = Time::now();
00191
00192 size_t offset = 0;
00193 std_msgs::UInt8 magic_msg;
00194 offset += deserialize (magic_msg, question, offset);
00195 std_msgs::UInt32 number_msg;
00196 offset += deserialize (number_msg, question, offset);
00197 std_msgs::UInt32 uid_msg;
00198 offset += deserialize (uid_msg, question, offset);
00199
00200 if (magic_msg.data != MAGIC_BYTE_) {
00201 ROS_FATAL_STREAM ("TestShortMultipleDelay::short_handler received a question with wrong magic byte.");
00202 abort();
00203 }
00204
00205 serialize_append (answer, number_msg);
00206
00207 serialize_append (answer, uid_msg);
00208
00209 std_msgs::UInt8 sid_msg;
00210 sid_msg.data = sid_;
00211 serialize_append (answer, sid_msg);
00212
00213 std_msgs::Time time_msg;
00214 time_msg.data = now;
00215 serialize_append (answer, time_msg);
00216 }
00217
00218
00219
00220 void
00221 short_callback (map<id_type, vector<uint8_t> > & answers) {
00222 Time now = Time::now();
00223
00224 boost::lock_guard<boost::mutex> _ (mutex_);
00225
00226 if (! waiting_) {
00227 ROS_FATAL_STREAM ("Callback called unexpectedly while short rounds not active");
00228 abort();
00229 }
00230 waiting_ = false;
00231
00232 if (0 == answers.size()) {
00233 ROS_WARN_STREAM ("Did not receive any answer in this round while testing short rounds multiple.");
00234 return;
00235 }
00236
00237 if (4 > answers.size()) {
00238 ROS_WARN_STREAM ("Did not receive enough answers in this round while testing short rounds multiple. Maybe the delay to one or more robots was too big and therefore considered NOT_RUNNING.");
00239 return;
00240 }
00241
00242 if (4 != answers.size()) {
00243 ROS_FATAL_STREAM ("Received too many answers in this round while testing short rounds multiple.");
00244 abort();
00245 }
00246
00247 long double avg_arrival_delay = 0;
00248 long double max_arrival_delay = 0;
00249
00250
00251 ++ (output_line_);
00252 output_ << round_number_;
00253
00254 typedef map<id_type, vector<uint8_t> >::value_type tmp_t;
00255 foreach (tmp_t & answer_pair, answers) {
00256 vector<uint8_t> & answer = answer_pair.second;
00257
00258 size_t offset = 0;
00259 std_msgs::UInt32 number_msg;
00260 offset += deserialize (number_msg, answer, offset);
00261 std_msgs::UInt32 uid_msg;
00262 offset += deserialize (uid_msg, answer, offset);
00263 std_msgs::UInt8 sid_msg;
00264 offset += deserialize (sid_msg, answer, offset);
00265 std_msgs::Time arrival_time_msg;
00266 offset += deserialize (arrival_time_msg, answer, offset);
00267
00268 unsigned sent_number = number_msg.data;
00269 unsigned sent_uid = uid_msg.data;
00270 id_type sid = sid_msg.data;
00271 Time ans_arrival_time = arrival_time_msg.data;
00272
00273 if (sid != answer_pair.first) {
00274 ROS_FATAL_STREAM ("Received answer with agent number mismatch while testing short rounds single.");
00275 abort();
00276 }
00277 if (sid != 1 && sid != 2 && sid != 3 && sid != 4) {
00278 ROS_FATAL_STREAM ("Received answer from wrong agent while testing short rounds multiple.");
00279 abort();
00280 }
00281 if (sent_number != round_number_) {
00282 ROS_FATAL_STREAM ("Received answer wrong round number while testing short rounds multiple.");
00283 abort();
00284 }
00285 if (sent_uid != round_uid_) {
00286 ROS_FATAL_STREAM ("Received answer with right number but wrong UID while testing short rounds multiple.");
00287 abort();
00288 }
00289
00290 Duration arrival_delay = ans_arrival_time - round_tx_;
00291
00292 output_ << ";" << arrival_delay;
00293
00294 long double arrival_delay_ld = (long double) arrival_delay.sec + 1e-9 * (long double) arrival_delay.nsec;
00295 avg_arrival_delay += arrival_delay_ld / 4;
00296 max_arrival_delay = max (max_arrival_delay, arrival_delay_ld);
00297 }
00298
00299 Duration complete_delay = now - round_tx_;
00300
00301 output_ << ";" << complete_delay << endl;
00302
00303 ROS_INFO_STREAM ("Short round " << round_number_ << " to {1,2,3,4}: q. took " << avg_arrival_delay << " avg s and " << max_arrival_delay << " max s to arrive at dest, and " << complete_delay << " seconds from start to callback.");
00304
00305 acc_avg_arrival_delay_ += avg_arrival_delay;
00306 acc_max_arrival_delay_ += max_arrival_delay;
00307 acc_complete_delay_ += (long double) complete_delay.sec + 1e-9 * (long double) complete_delay.nsec;
00308 num_rounds_++;
00309 }
00310
00311
00312
00313 void
00314 statistics() {
00315 if (0 != sid_) {
00316 return;
00317 }
00318
00319 acc_avg_arrival_delay_ /= num_rounds_;
00320 acc_max_arrival_delay_ /= num_rounds_;
00321 acc_complete_delay_ /= num_rounds_;
00322
00323 cout << "Final results at " << sid_ << " for short multiple: handler avg delay: " << lexical_cast<string> (acc_avg_arrival_delay_) << ", handler avg of max delay: " << lexical_cast<string> (acc_max_arrival_delay_) << ", callback delay: " << lexical_cast<string> (acc_complete_delay_) << "." << endl;
00324 cout << "Failed rounds at " << sid_ << ": " << failed_ << endl;
00325 }
00326 };
00327
00328 #endif