ecstats.cpp
Go to the documentation of this file.
00001 // EML
00002 #include <ethercat/netif.h>
00003 #include <ethercat/ethercat_xenomai_drv.h>
00004 #include <dll/ethercat_frame.h>
00005 #include <dll/ethercat_device_addressed_telegram.h>
00006 
00007 // STL
00008 #include <vector>
00009 #include <stdexcept>
00010 #include <iostream>
00011 #include <string>
00012 #include <sstream>
00013 
00014 // BOOST
00015 #include <boost/foreach.hpp>
00016 
00017 // ROS
00018 #include <ros/ros.h>
00019 #include <ectools/ecstats.h>
00020 
00021 // C
00022 #include <unistd.h>
00023 #include <signal.h>
00024 #include <errno.h>
00025 #include <pthread.h>
00026 #include <sys/types.h>
00027 #include <sys/socket.h>
00028 #include <net/if.h>
00029 #include <sys/ioctl.h>
00030 #include <netinet/in.h>
00031 
00032 using std::cout;
00033 using std::cerr;
00034 using std::endl;
00035 using std::ostringstream;
00036 using std::string;
00037 using std::runtime_error;
00038 
00039 // from getopt
00040 extern char *optarg;
00041 extern int optind, opterr, optopt;
00042 
00043 static const unsigned default_packet_size = 1400;
00044 static const unsigned default_num_threads = 8;
00045 
00046 void usage(char const *progname) {
00047   fprintf(
00048           stderr,
00049           "usage: %s [-i <interface>] [-s <size>] [-d <delay>] [-l <preload>] [-v]\n"
00050           " Measures Ethernet bandwidth and packet loss.  Also, counts number of EtherCAT devices.\n"
00051           " Publishes results on 'ecstats' topic.\n"
00052           " -i : network interface to use when communicating with EtherCAT MCBs\n"
00053           " -s : <size> of packet to send.  Defaults to %d\n"
00054           " -j : number of threads to run.  Defaults to %d\n"
00055           " -v : increase verbosity\n"
00056           " -h : display this help\n",
00057           progname, default_packet_size, default_num_threads);
00058 }
00059 
00060 int get_request_sock() throw (std::exception)
00061 {
00062   static int request_sock = -1; 
00063   if (request_sock < 0) {    
00064     // create generic socket to use for ioctl
00065     request_sock = socket(PF_INET, SOCK_DGRAM, 0);
00066     if (request_sock < 0) {
00067       int error = errno;
00068       std::ostringstream os;
00069       os << "Couldn't open generic socket for ioctls " << strerror(error); 
00070       throw std::runtime_error(os.str());
00071     }
00072   }  
00073   return request_sock;
00074 }
00075   
00076 
00077 // returns true if interface is up
00078 bool is_iface_up(const char* iface_name) throw (std::exception)
00079 {
00080   int request_sock = get_request_sock();
00081 
00082   // get interface flags
00083   struct ifreq ifr;                 
00084   strncpy(ifr.ifr_name, iface_name, IFNAMSIZ);
00085   if (ioctl(request_sock, SIOCGIFFLAGS, &ifr) < 0) {
00086     int error = errno;
00087     close(request_sock);
00088     std::ostringstream os;
00089     os << "Could not make SIOCGIFFLAGS ioctl : " << strerror(error); 
00090     throw std::runtime_error(os.str());
00091   }
00092   
00093   return (ifr.ifr_flags & IFF_UP);
00094 }
00095 
00096 // returns true if interface is running (has link)
00097 bool is_iface_running(const char* iface_name) throw (std::exception)
00098 {
00099   int request_sock = get_request_sock();
00100     
00101   // get interface flags
00102   struct ifreq ifr;                 
00103   strncpy(ifr.ifr_name, iface_name, IFNAMSIZ);
00104   if (ioctl(request_sock, SIOCGIFFLAGS, &ifr) < 0) {
00105     int error = errno;
00106     close(request_sock);
00107     std::ostringstream os;
00108     os << "Could not make SIOCGIFFLAGS ioctl : " << strerror(error); 
00109     throw std::runtime_error(os.str());
00110   }
00111   
00112   return (ifr.ifr_flags & IFF_RUNNING);
00113 }
00114 
00115 
00116 
00117 #define EXCEPT_HEADER  __FILE__ << ':' << __LINE__ << ' ' << __func__ << "  "
00118 
00119 double timediff (const timespec &time1, const timespec &time0) 
00120 {
00121   double diff = time1.tv_sec - time0.tv_sec;
00122   long long diff_ns = time1.tv_nsec - time0.tv_nsec;
00123   diff += ((double) diff_ns ) * 0.000000001;
00124   return diff;
00125 }
00126 
00127 
00128 // Use a thread data struct
00129 struct send_thread {
00130   pthread_t thread;
00131   std::string name;
00132   unsigned thread_num;
00133   pthread_mutex_t thread_lock;
00134   bool running;
00135 
00136   uint64_t sent, interval_sent;
00137   uint64_t unsent, interval_unsent;
00138   uint64_t dropped, interval_dropped;   
00139   uint64_t good, interval_good;
00140   int interval_min_device_count;
00141   int max_device_count;
00142   struct timespec start_time, interval_time;
00143 
00144   unsigned packet_size;  
00145   unsigned char buffer[2000];
00146   unsigned buffer_index;
00147   unsigned char buffer_value;
00148 
00149   struct netif *ni;
00150   APRD_Telegram aprd_telegram;
00151   EC_Ethernet_Frame frame;
00152 
00153   static bool quit;
00154   static unsigned verbose;
00155   static pthread_mutex_t global_lock;
00156   static unsigned thread_count;
00157 
00158 
00159   static inline void lock_global() {
00160     pthread_mutex_lock(&global_lock);
00161   }
00162 
00163   static inline void unlock_global() {
00164     pthread_mutex_unlock(&global_lock);
00165   }
00166 
00167   inline void lock_thread() {
00168     pthread_mutex_lock(&thread_lock);
00169   }
00170 
00171   inline void unlock_thread() {
00172     pthread_mutex_unlock(&thread_lock);
00173   }
00174 
00175 
00176   send_thread(unsigned packet_size, struct netif *ni) : 
00177     running(false),
00178     sent(0), interval_sent(0),
00179     unsent(0), interval_unsent(0),
00180     dropped(0), interval_dropped(0),
00181     good(0), interval_good(0),
00182     interval_min_device_count(-1),
00183     max_device_count(-1),
00184     packet_size(packet_size),
00185     buffer_index(0),
00186     buffer_value(0xFF),
00187     ni(ni), 
00188     aprd_telegram(0,0,0,0,packet_size,buffer),
00189     frame(&aprd_telegram)
00190   {
00191     lock_global();
00192     ++thread_count;
00193     unlock_global();
00194     thread_num = thread_count;
00195 
00196     ostringstream os;
00197     os << "thread("<<thread_num<<")";
00198     name = os.str();
00199     
00200     if (verbose>0) {
00201       lock_global();
00202       cerr << " send_thread " << thread_count << " " << name << endl;
00203       unlock_global();    
00204     }
00205 
00206     int error = pthread_mutex_init(&thread_lock,NULL);
00207     if (error != 0) 
00208     {
00209       cerr << " error creating thread lock : " << strerror(error) << endl;  
00210     }
00211   }
00212   
00213   ~send_thread() {
00214     lock_global();
00215     --thread_count;
00216     if (verbose>0) {
00217       cerr << " ~send_thread " << thread_count << " " << name << endl;
00218     }
00219     if (running) {
00220       cout << " Error : destructor called on running thread" << endl;
00221     }
00222     unlock_global();
00223   }
00224   
00225   void start() throw(std::exception) {
00226     int error = pthread_create(&thread, NULL, this->run, this);
00227     if (error != 0) {
00228       ostringstream os;    
00229       os << EXCEPT_HEADER << "Error creating thread " << name << " : " << strerror(error);
00230       throw runtime_error(os.str());
00231     }
00232     running = true;
00233   }
00234   
00235   static void* run(void* arg) {
00236     send_thread *st = (send_thread *)arg;
00237 
00238     if (verbose>0) {
00239       st->lock_global();
00240       cerr << " run(1) " << st->name << endl;
00241       st->unlock_global();
00242     }
00243     st->run();    
00244     if (verbose>0) {
00245       st->lock_global(); 
00246       cerr << " ~run(1) " << st->name << endl;
00247       st->unlock_global(); 
00248     }
00249     st->running = false;
00250     return NULL;
00251   }
00252   
00253   void run() {
00254     if (verbose>0) {      
00255       lock_global();
00256       cerr << " run(2) " << name << endl;
00257       unlock_global();    
00258     }
00259 
00260     if (clock_gettime(CLOCK_REALTIME, &start_time) != 0) {
00261       int error = errno;
00262       fprintf(stderr, "%s: Could not get start time : %s\n",
00263              __func__, strerror(error));
00264       return;
00265     }
00266     interval_time = start_time;
00267     
00268     while (!quit) {
00269       int send_index = rand() & 0xFF;
00270       int send_wkc   = rand() & 0xFFFF;      
00271       
00272       aprd_telegram.set_adp(0);
00273       aprd_telegram.set_wkc(send_wkc);
00274       aprd_telegram.set_idx(send_index);
00275       
00276       // Put moving pattern in buffer
00277       buffer[buffer_index]=buffer_value;      
00278       ++buffer_index;
00279       if ((buffer_index>=sizeof(buffer)) || (buffer_index>=packet_size)) {
00280         buffer_index=0;
00281         buffer_value = ~buffer_value;
00282       }
00283       
00284       int handle = ni->tx(&frame, ni);
00285       if (handle<0) {
00286         lock_thread();
00287         ++unsent;
00288         ++interval_unsent;
00289         unlock_thread();
00290         if (verbose) {
00291           lock_global();
00292           cerr << '-';
00293           unlock_global();
00294         }
00295       } else {        
00296         lock_thread();
00297         ++sent;
00298         ++interval_sent;
00299         unlock_thread();
00300         
00301         bool success = ni->rx(&frame, ni, handle);
00302 
00303         lock_thread();
00304         if (!success) {
00305 
00306           ++dropped;
00307           ++interval_dropped;
00308         } else {
00309           ++good;
00310           ++interval_good;
00311           int device_count = aprd_telegram.get_adp();
00312           if ((interval_min_device_count == -1) || (device_count < interval_min_device_count)) 
00313           {
00314             // interval device count is minumum number of number of devices seen in a publishing interval.
00315             // -1 is a special case, it means that no packets have come back (hence device count cannnot be determined)
00316             interval_min_device_count = device_count;
00317           }
00318           if (device_count > max_device_count) 
00319           {
00320             max_device_count = device_count;
00321           }
00322         }
00323         unlock_thread();
00324         
00325         if (verbose) {
00326           lock_global();
00327           if (!success) {
00328             cerr << 'x';
00329           }
00330           else if (verbose>1) {
00331             cerr << '.';
00332           }
00333           unlock_global();
00334         }
00335       }
00336       
00337       if (send_index != aprd_telegram.get_idx()) {
00338         lock_global();
00339         cerr << " got invalid frame back : expected " << send_index 
00340              << " got " << (int)aprd_telegram.get_idx() << endl;
00341         unlock_global();
00342       }
00343     } // end while 
00344   }
00345 
00346   void collect_stats(ectools::ecstats &stats) 
00347   {
00348     struct timespec current_time;
00349     if (clock_gettime(CLOCK_REALTIME, &current_time) != 0) {
00350       int error = errno;
00351       fprintf(stderr,"%s: Could not get stoptime : %s\n",
00352              __func__, strerror(error));
00353       return;
00354     }
00355     
00356     double total_time_sec    = timediff(current_time, start_time);    
00357     double interval_time_sec = timediff(current_time, interval_time);
00358     static const unsigned ethernet_overhead = 14;
00359     static const unsigned ethercat_frame_overhead = 2;
00360     static const unsigned ethercat_datagram_overhead = 12;
00361     unsigned packet_size_bits = 8*(packet_size + ethernet_overhead + ethercat_frame_overhead + ethercat_datagram_overhead);    
00362     
00363     lock_thread();
00364     {      
00365       
00366       double mbits_per_bit = 1.0 / 1000000.0;
00367       stats.total_bandwidth_mbps    += mbits_per_bit * (double)(good * packet_size_bits) / total_time_sec;
00368       stats.interval_bandwidth_mbps += mbits_per_bit * (double)(interval_good * packet_size_bits) / interval_time_sec;
00369       
00370       stats.total_sent_packets += (sent + unsent);
00371       stats.interval_sent_packets += (interval_sent + interval_unsent);
00372       
00373       stats.total_dropped_packets += (dropped + unsent);
00374       stats.interval_dropped_packets += (interval_dropped + interval_unsent);
00375      
00376       if ((stats.interval_min_device_count == -1) ||
00377           ((interval_min_device_count!=-1) && (interval_min_device_count < stats.interval_min_device_count)))
00378       {
00379         stats.interval_min_device_count = interval_min_device_count;
00380       }
00381       
00382       if (max_device_count > stats.max_device_count)
00383       {
00384         stats.max_device_count = max_device_count;
00385       }   
00386 
00387       interval_sent = 0;
00388       interval_unsent = 0;
00389       interval_good = 0;
00390       interval_dropped = 0;
00391       interval_min_device_count = -1;
00392     }
00393     unlock_thread();
00394       
00395     interval_time = current_time;    
00396   }
00397   
00398 
00399   void print(std::ostream &os)
00400   {
00401     lock_thread();
00402 
00403     os << name 
00404        << " : attempts " <<  (good+dropped+unsent) 
00405        << " unsent " << unsent 
00406        << " dropped " << dropped 
00407        << " good " << good 
00408        << " max devices " << max_device_count 
00409        << " interval min devices " << interval_min_device_count
00410        << endl;
00411 
00412     unlock_thread();    
00413   }
00414 
00415 };
00416 
00417 
00418 pthread_mutex_t send_thread::global_lock = PTHREAD_MUTEX_INITIALIZER;
00419 unsigned send_thread::thread_count = 0; 
00420 unsigned send_thread::verbose = 0;
00421 bool send_thread::quit = false;
00422 
00423 
00424 
00425 int main(int argc, char** argv) {
00426   char const *progname = argv[0];
00427   if (argc < 2) {
00428     usage(progname);
00429     return 1;
00430   }
00431 
00432   ros::init(argc, argv, "ecstats");
00433 
00434   const char *interface = NULL; //default_interface;    
00435   unsigned packet_size = default_packet_size;
00436   unsigned num_threads = default_num_threads;  
00437 
00438   int optchar;
00439   int tmp;
00440   while ((optchar = getopt(argc, argv, "i:s:j:vh")) != -1) {            
00441     switch(optchar) { 
00442     case 'h':
00443       usage(progname);
00444       return 0;
00445       break;
00446     case 's':
00447       if ( sscanf(optarg, "%i", &tmp) != 1 ) {
00448         fprintf(stderr, "invalid packet size : '%s'\n", optarg);
00449         return 1;
00450       }
00451       if (tmp < 0) {
00452         fprintf(stderr, "size must be positive number\n");
00453         return 1;
00454       }
00455       packet_size = tmp;
00456       break;
00457     case 'j':
00458       if ( sscanf(optarg, "%i", &tmp) != 1 ) {
00459         fprintf(stderr, "invalid amount : '%s'\n", optarg);
00460         return 1;
00461       }
00462       if (tmp <= 0) {
00463         fprintf(stderr, "number of threads must be positive number\n");
00464         return 1;
00465       }
00466       num_threads = tmp;
00467       break;              
00468     case 'i': {
00469       interface = optarg;
00470       break;
00471     }
00472     case 'v':
00473       send_thread::verbose += 1;
00474       break;
00475     case '?':
00476       fprintf(stderr, "unknown/unsupported option %c\n", optopt);
00477       return 1;
00478     default:
00479       fprintf(stderr, "programming error %c = %d??\n", optchar, optchar);
00480       return 1;                         
00481     } // end switch        
00482   }
00483 
00484   if (interface == NULL) {
00485     fprintf(stderr,"You must specify and interface with (-i)\n");
00486     return 1;
00487   }
00488 
00489   // Must run as root
00490   int test_sock = socket(PF_PACKET, SOCK_RAW, htons(0x88A4));
00491   if ((test_sock < 0) && (errno == EPERM))
00492   {
00493     ROS_FATAL("Insufficient priviledges to open raw socket. Try running as root");
00494     ROS_BREAK();
00495   }
00496   close(test_sock);
00497   
00498   if (!is_iface_up(interface)) {
00499     fprintf(stderr, "Interface %s is not UP, try 'ifconfig %s up'", interface, interface);
00500     return 1;
00501   }
00502     
00503   struct netif *ni = init_ec(interface);         
00504   if (ni == NULL) {
00505     fprintf(stderr, "Initializing Network interface (%s) failed\n", interface);
00506     return 1;
00507   }
00508 
00509   if (set_socket_timeout(ni, 5000)) {
00510     printf("Setting socket timeout failed\n\n");                
00511     close_socket(ni);
00512     return 1;
00513   }
00514 
00515   ros::NodeHandle node;
00516   ros::Publisher ecstats_pub = node.advertise<ectools::ecstats>("ecstats", 100);
00517   ros::Rate loop_rate(5);
00518                          
00519   std::vector<send_thread*> thread_list;     
00520 
00521   for (unsigned i=0; i<num_threads; ++i) { 
00522     send_thread *p = new send_thread(packet_size, ni);
00523     if (p==NULL){ 
00524       throw std::bad_alloc();                
00525     }
00526     thread_list.push_back(p);
00527   }      
00528   
00529   BOOST_FOREACH( send_thread* t, thread_list ) {  
00530     assert(t!=NULL);
00531     t->start();
00532   }
00533 
00534   uint64_t late_packets_previous = 0;
00535   ectools::ecstats stats;
00536   stats.max_device_count = -1;
00537   while (ros::ok())
00538   {
00539     stats.has_link = is_iface_running(interface);
00540     stats.interval_min_device_count = -1;
00541 
00542     // Zero stats
00543     stats.total_sent_packets = 0;
00544     stats.total_dropped_packets = 0;
00545     stats.total_bandwidth_mbps = 0.0;
00546     stats.interval_sent_packets = 0;
00547     stats.interval_dropped_packets = 0;
00548     stats.interval_bandwidth_mbps = 0.0;
00549 
00550     // Accumulate data from each thread -- 
00551     BOOST_FOREACH( send_thread* &t, thread_list ) {  
00552       t->collect_stats(stats);
00553     }
00554 
00555     // Keep track of how many packets come in late because of operating system glitches
00556     uint64_t late_packets = ni->counters.rx_late_pkt;
00557     stats.total_late_packets = late_packets;
00558     stats.interval_late_packets = late_packets - late_packets_previous;
00559     late_packets_previous = late_packets;
00560    
00561     // Publish data
00562     ecstats_pub.publish(stats);
00563 
00564     ros::spinOnce();
00565     loop_rate.sleep();
00566   }
00567 
00568   
00569   cerr << "Quiting" << endl;
00570   send_thread::quit = true;
00571   usleep(10000);
00572   cout << endl;
00573 
00574   // Stop threads
00575   BOOST_FOREACH( send_thread* &t, thread_list ) {  
00576     t->print(cout);
00577     if (t!=NULL) {
00578       delete t;
00579       t = NULL;
00580     }
00581   }
00582 
00583   for (unsigned xx=0; xx<thread_list.size(); ++xx) {
00584     assert(thread_list[xx] == NULL);    
00585   }
00586     
00587   if (ni!=NULL) {
00588     close_socket(ni);
00589     ni = NULL;
00590   }
00591 
00592   return 0;
00593 }


ectools
Author(s): Derek King
autogenerated on Sat Dec 28 2013 17:52:56