Go to the documentation of this file.00001
00023 #include "udp_bc_broker/recv.h"
00024
00025 using namespace std;
00026
00027 namespace udp_bc_broker{
00028
00029 UdpRecver::UdpRecver(int port)
00030 {
00031 setvbuf(stdout, NULL, _IONBF, 0);
00032 fflush(stdout);
00033
00034
00035 bzero(&addrto, sizeof(struct sockaddr_in));
00036 addrto.sin_family = AF_INET;
00037 addrto.sin_addr.s_addr = htonl(INADDR_ANY);
00038 addrto.sin_port = htons(port);
00039
00040
00041 bzero(&from, sizeof(struct sockaddr_in));
00042 from.sin_family = AF_INET;
00043 from.sin_addr.s_addr = htonl(INADDR_ANY);
00044 from.sin_port = htons(port);
00045
00046 if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
00047 cout<<"socket error!"<<endl;
00048 exit(-1);
00049 }
00050
00051 const int opt = 1;
00052 int nb = 0;
00053 nb = setsockopt(sock, SOL_SOCKET, SO_BROADCAST, (char *)&opt, sizeof(opt));
00054 if(nb == -1) {
00055 cout<<"set socket error!"<<endl;
00056 exit(-1);
00057 }
00058
00059 if(bind(sock,(struct sockaddr *)&(addrto), sizeof(struct sockaddr_in)) == -1) {
00060 cout<<"bind error!"<<endl;
00061 exit(-1);
00062 }
00063
00064 len = sizeof(sockaddr_in);
00065 buf = new char[10000];
00066 }
00067
00068 UdpRecver::~UdpRecver()
00069 {
00070 process_thread_->interrupt();
00071 process_thread_->join();
00072 delete process_thread_;
00073
00074 delete[] buf;
00075 }
00076
00077 void UdpRecver::process_msg()
00078 {
00079 while(true) {
00080 int ret = recvfrom(sock, buf, 10000, 0, (struct sockaddr*)&from, (socklen_t*)&len);
00081 if(ret <= 0) {
00082 cout<<"read error...."<<sock<<endl;
00083 }
00084 else {
00085 std::vector<uint8_t> msg_data;
00086 for(int i = 0; i < ret; i++) {
00087 msg_data.push_back(buf[i]);
00088 }
00089 callback(msg_data);
00090 }
00091 sleep(0.1);
00092 }
00093 }
00094
00095 void UdpRecver::receive(boost::function<void(const vector<uint8_t>&)> callBack)
00096 {
00097 callback = callBack;
00098 process_thread_ = new boost::thread(&UdpRecver::process_msg, this);
00099 }
00100 };