buffer_layer.cc
Go to the documentation of this file.
00001 /*------------------------------------------------------------------------
00002  *---------------------           WMPSNIFFER          --------------------
00003  *------------------------------------------------------------------------
00004  *                                                         V7.0B  11/05/10
00005  *
00006  *
00007  *  File: buff_layer.cc
00008  *  Authors: Danilo Tardioli
00009  *  ----------------------------------------------------------------------
00010  *  Copyright (C) 2000-2012, Universidad de Zaragoza, SPAIN
00011  *
00012  *  Contact Addresses: Danilo Tardioli                   dantard@unizar.es
00013  *
00014  *  RT-WMP is free software; you can  redistribute it and/or  modify it
00015  *  under the terms of the GNU General Public License  as published by the
00016  *  Free Software Foundation;  either  version 2, or (at  your option) any
00017  *  later version.
00018  *
00019  *  RT-WMP  is distributed  in the  hope  that  it will be   useful, but
00020  *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
00021  *  MERCHANTABILITY  or  FITNESS FOR A  PARTICULAR PURPOSE.    See the GNU
00022  *  General Public License for more details.
00023  *
00024  *  You should have received  a  copy of  the  GNU General Public  License
00025  *  distributed with RT-WMP;  see file COPYING.   If not,  write to the
00026  *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
00027  *  02111-1307, USA.
00028  *
00029  *  As a  special exception, if you  link this  unit  with other  files to
00030  *  produce an   executable,   this unit  does  not  by  itself cause  the
00031  *  resulting executable to be covered by the  GNU General Public License.
00032  *  This exception does  not however invalidate  any other reasons why the
00033  *  executable file might be covered by the GNU Public License.
00034  *
00035  *----------------------------------------------------------------------*/
00036 #include <iostream>
00037 #include <semaphore.h>
00038 #include <map>
00039 #include <queue>
00040 #include <set>
00041 #include <cstring>
00042 #include <sys/socket.h>
00043 #include <arpa/inet.h>
00044 #include "pcap_layer.h"
00045 #include "wmp_config.h"
00046 #include "core/include/frames.h"
00047 #include "bridge.hh"
00048 #include "basic_io.h"
00049 #include "buffer_layer.h"
00050 #include "marte_layer.h"
00051 #include <cassert>
00052 #include <cstdlib>
00053 #include <map>
00054 #include <algorithm>
00055 #include <assert.h>
00056 #include "misc.h"
00057 #include "BoundedHash.h"
00058 #include "shmem_layer.h"
00059 extern "C" {
00060  #include "core/include/wmp_utils.h"
00061 }
00062 
00063 #define EXIT_KEY 1
00064 
00065 static std::map<int, pac> rxMap;
00066 static std::vector<ts_key_t> keyQueue;
00067 static BoundedHash<int, pac, unsigned long long> bh;
00068 static std::map<int, pac> rxMap2;
00069 static std::vector<int> keyQueue2;
00070 static sem_t rxsem;
00071 static pthread_mutex_t mex;
00072 static bool bl_keep_running = false;
00073 
00074 static bool synchronized;
00075 static int num_nodes;
00076 static int st;// = (int*) v;
00077 static int foreign_idx = 100;
00078 
00079 static base_time_t bt[33];
00080 
00081 
00082 
00083 
00084 
00085 int wmpGetFrameHash(wmpFrame * p){
00086         return (p->hdr.serial * 10000 + p->hdr.from * 1000 + p->hdr.to
00087                         * 100 + p->hdr.retries * 10 + p->hdr.type);//+sdh.frame_type;
00088 }
00089 bool active = false;
00090 void buffer_layer_activate(bool activate){
00091         active = activate;
00092 
00093 }
00094 
00095 int lastPopped;
00096 void buffer_layer_clear(){
00097         rxMap.clear();
00098         rxMap2.clear();
00099         keyQueue2.clear();
00100         keyQueue.clear();
00101         bh.clear();
00102 }
00103 
00104 int buffer_layer_sniff_packet(char * data, simData_Hdr & sdh,
00105                 unsigned long long & time_us, std::set<int> & reached, std::map<int,
00106                                 robo_pose_t> & poses) {
00107         pac pk;
00108         wmpFrame * f = (wmpFrame *) data;
00109         wmpFrame lastFrame;
00110         lastFrame.hdr.serial = 0;   
00111         unsigned long long lastTime;
00112         while (1) {
00113                 pk = bh.pop();
00114                 if (pk.size == 0){
00115                         return 0;
00116                 }
00117                 if (pk.data_source != 32) {
00118                         bool cond = lastFrame.hdr.serial + 1 == f->hdr.serial;
00119                         if (!cond) {
00120                                 if (bh.countk2(f->hdr.serial) == 1) {
00121                                         int key = bh.getKey(f->hdr.serial, 0);
00122                                         pk = bh.pop(key);
00123                                         pk.sdh.time_source = 99;
00124                                 } else {
00125                                         continue;
00126                                 }
00127                         }
00128                 }
00129                 memcpy(data, pk.data, pk.size);
00130 
00131                 sdh = pk.sdh;
00132                 time_us = pk.time_us;
00133                 if (pk.sdh.frame_type != SP_FOREIGN || lastPopped != SP_FOREIGN) {
00134                         break;
00135                 }
00136         }
00137         lastTime = pk.time_us;
00138         lastFrame = *f;
00139         lastPopped = pk.sdh.frame_type;
00140         reached = pk.reached;
00141         poses = pk.poses;
00142         return pk.size;
00143 }
00144 
00145 void* loop(void * v) {
00146         char buf[2500];
00147         simData_Hdr sdh;
00148         unsigned long long time_us;
00149         wmpFrame * p = (wmpFrame *) buf;
00150         sniff_func_t * sf = (sniff_func_t *) v;
00151         static int ok=0,fore=0;
00152 
00153         while (bl_keep_running) {
00154 
00155                 std::map<int, robo_pose_t> poses;
00156                 poses.clear();
00157 
00158                 /* SNIFFING */
00159                 int size = sf->sniff_func(buf, sdh, time_us, poses);
00160 
00161                 if (!active){
00162                         buffer_layer_clear();
00163                         continue;
00164                 }
00165 
00166                 unsigned int key;
00167                 if (size > 0) {
00168                         bh.sleep();
00169                         bh.lock();
00170                         if (sdh.frame_type != SP_FOREIGN) {
00171                                 key = wmpGetFrameHash(p);
00172                                 ok++;
00173                         } else {
00174                                 fore++;
00175                                 key = foreign_idx++;
00176                                 if (!show_foreign_bridge()){
00177                                         bh.unlock();
00178                                         continue;
00179                                 }
00180                         }
00181 
00182                         fprintf(stderr,"Frames: %d, Foreign: %d - Serial: %d \r",ok,fore, (int) p->hdr.serial);
00183 
00184 
00185                         bool present = bh.find(key);
00186                         if (not present) {
00187                                 pac pkt;
00188                                 pkt.size = size;
00189                                 pkt.sdh.data_src = sdh.data_src;
00190                                 pkt.sdh.key = sdh.key;
00191                                 pkt.sdh.is_wmp = sdh.is_wmp;
00192                                 pkt.sdh.frame_type = sdh.frame_type;
00193                                 pkt.sdh.len = sdh.len;
00194                                 pkt.data_source = 99;
00195                                 pkt.time_us = time_us;//getRawActualTimeus();
00196                                 pkt.sdh.reinserted = sdh.reinserted;
00197                                 memcpy(pkt.data, buf, size);
00198 
00199                                 //bh.lock();
00200                                 bh.insert(key, p->hdr.serial, pkt.time_us) = pkt;
00201                                 //bh.unlock();
00202                         }
00203                         pac & pkt = bh.get(key);
00204                         if (sdh.data_src == 32) {
00205                                 pkt.time_us = time_us;
00206                                 pkt.sdh.time = time_us;
00207                                 pkt.sdh.t1 = time_us;
00208                                 pkt.sdh.rate = sdh.rate;
00209                                 pkt.sdh.proto = sdh.proto;
00210                                 pkt.data_source = 32;
00211                                 if (sdh.frame_type == SP_LUS_WMP_FRAME_DUP) {
00212                                         if (pkt.sdh.t1 != 0) {
00213                                                 /* OJO, la SP_LUS_WMP_FRAME_DUP llega DOS veces pero la segunda es la buena*/
00214                                                 pkt.sdh.t2 = time_us + bt[sdh.data_src].time_us;
00215                                                 pkt.sdh.t1t2_valid = true;
00216                                         }
00217                                 }
00218                         } else {
00219                                 if (sdh.data_src == p->hdr.from) {
00220                                         pkt.sdh.onair_local_ts = sdh.onair_local_ts;
00221                                         pkt.sdh.onair_local_ts_valid = sdh.onair_local_ts_valid;
00222                                 }
00223                         }
00224                         /* UPDATE POSE < */
00225                         std::map<int, robo_pose_t>::iterator iter;
00226                         for (iter = poses.begin(); iter != poses.end(); ++iter) {
00227                                 if (iter->second.pose_is_valid) {
00228                                         pkt.poses[iter->first] = iter->second;
00229                                 }
00230                                 pkt.reached.insert(iter->first);
00231                         }
00232                         bh.sort();
00233                         bh.unlock();
00234                 } else {
00235                         //fprintf(stderr,"PUSHED EXIT_KEY...\n\n");
00236                         pac pkt;
00237                         pkt.size = 0;
00238                         pkt.data_source = 64;
00239                         pkt.sdh.key = 0;
00240                         pkt.time_us = 0;
00241                         bh.lock();
00242                         bh.insert(0, 0, time_us + 1) = pkt;
00243                         bh.drain();
00244                         bh.unlock();
00245                         bl_keep_running=false;
00246                 }
00247         }
00248         return 0;
00249 }
00250 
00251 sniff_func_t sf;
00252 sniff_func_t sf2;
00253 
00254 int buffer_layer_init(int _st, int nnodes, char * iface) {
00255         bh.clear();
00256         st = _st;
00257         bl_keep_running = true;
00258         pthread_t th1;
00259         sem_init(&rxsem, 1, 0);
00260         pthread_mutex_init(&mex, 0);
00261         synchronized = false;
00262         num_nodes = nnodes;
00263         bh.setBufferSize(0);
00264 
00265         memset((char*) bt, 0, sizeof(bt));
00266 
00267         switch (st) {
00268         case ST_PCAP:
00269                 //if (!pcap_init(iface, nnodes)) return 0;
00270                 sf.sniff_func = pcap_sniff_packet;
00271                 sf.predominant = true;
00272                 sf.id = 11;
00273                 pthread_create(&th1, 0, loop, (void *) &sf);
00274                 break;
00275         case ST_MARTE:
00276                 marte_layer_init(nnodes);
00277                 sf2.sniff_func = marte_sniff_packet;
00278                 sf2.predominant = false;
00279                 sf2.id = 44;
00280                 pthread_create(&th1, 0, loop, (void *) &sf2);
00281                 break;
00282         case ST_SHARED_MEM:
00283                 shmem_init(nnodes);
00284                 sf2.sniff_func = shmem_sniff_packet;
00285                 sf2.predominant = false;
00286                 sf2.id = 66;
00287                 pthread_create(&th1, 0, loop, (void *) &sf2);
00288                 break;
00289         default:
00290                 assert(0);
00291         }
00292         return 1;
00293 }
00294 
00295 void buffer_layer_stop() {
00296         //keep_runnning = false;
00297         switch (st) {
00298         case ST_PCAP:
00299                 pcap_layer_close();
00300                 break;
00301         case ST_MARTE:
00302                 marte_layer_close();
00303                 break;
00304         case ST_SHARED_MEM:
00305                 shmem_close();
00306                 break;
00307         default:
00308                 assert(0);
00309         }
00310 }


ros_rt_wmp_sniffer
Author(s): Danilo Tardioli, dantard@unizar.es
autogenerated on Fri Jan 3 2014 12:08:32