00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <iostream>
00037 #include <semaphore.h>
00038 #include <map>
00039 #include <queue>
00040 #include <set>
00041 #include <cstring>
00042 #include <sys/socket.h>
00043 #include <arpa/inet.h>
00044 #include "pcap_layer.h"
00045 #include "wmp_config.h"
00046 #include "core/include/frames.h"
00047 #include "bridge.hh"
00048 #include "basic_io.h"
00049 #include "buffer_layer.h"
00050 #include "marte_layer.h"
00051 #include <cassert>
00052 #include <cstdlib>
00053 #include <map>
00054 #include <algorithm>
00055 #include <assert.h>
00056 #include "misc.h"
00057 #include "BoundedHash.h"
00058 #include "shmem_layer.h"
00059 extern "C" {
00060 #include "core/include/wmp_utils.h"
00061 }
00062
00063 #define EXIT_KEY 1
00064
00065 static std::map<int, pac> rxMap;
00066 static std::vector<ts_key_t> keyQueue;
00067 static BoundedHash<int, pac, unsigned long long> bh;
00068 static std::map<int, pac> rxMap2;
00069 static std::vector<int> keyQueue2;
00070 static sem_t rxsem;
00071 static pthread_mutex_t mex;
00072 static bool bl_keep_running = false;
00073
00074 static bool synchronized;
00075 static int num_nodes;
00076 static int st;
00077 static int foreign_idx = 100;
00078
00079 static base_time_t bt[33];
00080
00081
00082
00083
00084
00085 int wmpGetFrameHash(wmpFrame * p){
00086 return (p->hdr.serial * 10000 + p->hdr.from * 1000 + p->hdr.to
00087 * 100 + p->hdr.retries * 10 + p->hdr.type);
00088 }
00089 bool active = false;
00090 void buffer_layer_activate(bool activate){
00091 active = activate;
00092
00093 }
00094
00095 int lastPopped;
00096 void buffer_layer_clear(){
00097 rxMap.clear();
00098 rxMap2.clear();
00099 keyQueue2.clear();
00100 keyQueue.clear();
00101 bh.clear();
00102 }
00103
00104 int buffer_layer_sniff_packet(char * data, simData_Hdr & sdh,
00105 unsigned long long & time_us, std::set<int> & reached, std::map<int,
00106 robo_pose_t> & poses) {
00107 pac pk;
00108 wmpFrame * f = (wmpFrame *) data;
00109 wmpFrame lastFrame;
00110 lastFrame.hdr.serial = 0;
00111 unsigned long long lastTime;
00112 while (1) {
00113 pk = bh.pop();
00114 if (pk.size == 0){
00115 return 0;
00116 }
00117 if (pk.data_source != 32) {
00118 bool cond = lastFrame.hdr.serial + 1 == f->hdr.serial;
00119 if (!cond) {
00120 if (bh.countk2(f->hdr.serial) == 1) {
00121 int key = bh.getKey(f->hdr.serial, 0);
00122 pk = bh.pop(key);
00123 pk.sdh.time_source = 99;
00124 } else {
00125 continue;
00126 }
00127 }
00128 }
00129 memcpy(data, pk.data, pk.size);
00130
00131 sdh = pk.sdh;
00132 time_us = pk.time_us;
00133 if (pk.sdh.frame_type != SP_FOREIGN || lastPopped != SP_FOREIGN) {
00134 break;
00135 }
00136 }
00137 lastTime = pk.time_us;
00138 lastFrame = *f;
00139 lastPopped = pk.sdh.frame_type;
00140 reached = pk.reached;
00141 poses = pk.poses;
00142 return pk.size;
00143 }
00144
00145 void* loop(void * v) {
00146 char buf[2500];
00147 simData_Hdr sdh;
00148 unsigned long long time_us;
00149 wmpFrame * p = (wmpFrame *) buf;
00150 sniff_func_t * sf = (sniff_func_t *) v;
00151 static int ok=0,fore=0;
00152
00153 while (bl_keep_running) {
00154
00155 std::map<int, robo_pose_t> poses;
00156 poses.clear();
00157
00158
00159 int size = sf->sniff_func(buf, sdh, time_us, poses);
00160
00161 if (!active){
00162 buffer_layer_clear();
00163 continue;
00164 }
00165
00166 unsigned int key;
00167 if (size > 0) {
00168 bh.sleep();
00169 bh.lock();
00170 if (sdh.frame_type != SP_FOREIGN) {
00171 key = wmpGetFrameHash(p);
00172 ok++;
00173 } else {
00174 fore++;
00175 key = foreign_idx++;
00176 if (!show_foreign_bridge()){
00177 bh.unlock();
00178 continue;
00179 }
00180 }
00181
00182 fprintf(stderr,"Frames: %d, Foreign: %d - Serial: %d \r",ok,fore, (int) p->hdr.serial);
00183
00184
00185 bool present = bh.find(key);
00186 if (not present) {
00187 pac pkt;
00188 pkt.size = size;
00189 pkt.sdh.data_src = sdh.data_src;
00190 pkt.sdh.key = sdh.key;
00191 pkt.sdh.is_wmp = sdh.is_wmp;
00192 pkt.sdh.frame_type = sdh.frame_type;
00193 pkt.sdh.len = sdh.len;
00194 pkt.data_source = 99;
00195 pkt.time_us = time_us;
00196 pkt.sdh.reinserted = sdh.reinserted;
00197 memcpy(pkt.data, buf, size);
00198
00199
00200 bh.insert(key, p->hdr.serial, pkt.time_us) = pkt;
00201
00202 }
00203 pac & pkt = bh.get(key);
00204 if (sdh.data_src == 32) {
00205 pkt.time_us = time_us;
00206 pkt.sdh.time = time_us;
00207 pkt.sdh.t1 = time_us;
00208 pkt.sdh.rate = sdh.rate;
00209 pkt.sdh.proto = sdh.proto;
00210 pkt.data_source = 32;
00211 if (sdh.frame_type == SP_LUS_WMP_FRAME_DUP) {
00212 if (pkt.sdh.t1 != 0) {
00213
00214 pkt.sdh.t2 = time_us + bt[sdh.data_src].time_us;
00215 pkt.sdh.t1t2_valid = true;
00216 }
00217 }
00218 } else {
00219 if (sdh.data_src == p->hdr.from) {
00220 pkt.sdh.onair_local_ts = sdh.onair_local_ts;
00221 pkt.sdh.onair_local_ts_valid = sdh.onair_local_ts_valid;
00222 }
00223 }
00224
00225 std::map<int, robo_pose_t>::iterator iter;
00226 for (iter = poses.begin(); iter != poses.end(); ++iter) {
00227 if (iter->second.pose_is_valid) {
00228 pkt.poses[iter->first] = iter->second;
00229 }
00230 pkt.reached.insert(iter->first);
00231 }
00232 bh.sort();
00233 bh.unlock();
00234 } else {
00235
00236 pac pkt;
00237 pkt.size = 0;
00238 pkt.data_source = 64;
00239 pkt.sdh.key = 0;
00240 pkt.time_us = 0;
00241 bh.lock();
00242 bh.insert(0, 0, time_us + 1) = pkt;
00243 bh.drain();
00244 bh.unlock();
00245 bl_keep_running=false;
00246 }
00247 }
00248 return 0;
00249 }
00250
00251 sniff_func_t sf;
00252 sniff_func_t sf2;
00253
00254 int buffer_layer_init(int _st, int nnodes, char * iface) {
00255 bh.clear();
00256 st = _st;
00257 bl_keep_running = true;
00258 pthread_t th1;
00259 sem_init(&rxsem, 1, 0);
00260 pthread_mutex_init(&mex, 0);
00261 synchronized = false;
00262 num_nodes = nnodes;
00263 bh.setBufferSize(0);
00264
00265 memset((char*) bt, 0, sizeof(bt));
00266
00267 switch (st) {
00268 case ST_PCAP:
00269
00270 sf.sniff_func = pcap_sniff_packet;
00271 sf.predominant = true;
00272 sf.id = 11;
00273 pthread_create(&th1, 0, loop, (void *) &sf);
00274 break;
00275 case ST_MARTE:
00276 marte_layer_init(nnodes);
00277 sf2.sniff_func = marte_sniff_packet;
00278 sf2.predominant = false;
00279 sf2.id = 44;
00280 pthread_create(&th1, 0, loop, (void *) &sf2);
00281 break;
00282 case ST_SHARED_MEM:
00283 shmem_init(nnodes);
00284 sf2.sniff_func = shmem_sniff_packet;
00285 sf2.predominant = false;
00286 sf2.id = 66;
00287 pthread_create(&th1, 0, loop, (void *) &sf2);
00288 break;
00289 default:
00290 assert(0);
00291 }
00292 return 1;
00293 }
00294
00295 void buffer_layer_stop() {
00296
00297 switch (st) {
00298 case ST_PCAP:
00299 pcap_layer_close();
00300 break;
00301 case ST_MARTE:
00302 marte_layer_close();
00303 break;
00304 case ST_SHARED_MEM:
00305 shmem_close();
00306 break;
00307 default:
00308 assert(0);
00309 }
00310 }