data_receiver.h
Go to the documentation of this file.
00001 /*
00002  * This file is part of the rc_dynamics_api package.
00003  *
00004  * Copyright (c) 2017 Roboception GmbH
00005  * All rights reserved
00006  *
00007  * Author: Christian Emmerich
00008  *
00009  * Redistribution and use in source and binary forms, with or without
00010  * modification, are permitted provided that the following conditions are met:
00011  *
00012  * 1. Redistributions of source code must retain the above copyright notice,
00013  * this list of conditions and the following disclaimer.
00014  *
00015  * 2. Redistributions in binary form must reproduce the above copyright notice,
00016  * this list of conditions and the following disclaimer in the documentation
00017  * and/or other materials provided with the distribution.
00018  *
00019  * 3. Neither the name of the copyright holder nor the names of its contributors
00020  * may be used to endorse or promote products derived from this software without
00021  * specific prior written permission.
00022  *
00023  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00024  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00025  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00026  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
00027  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00028  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00029  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00030  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00031  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00032  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033  * POSSIBILITY OF SUCH DAMAGE.
00034  */
00035 
00036 #ifndef RC_DYNAMICS_API_DATASTREAM_H
00037 #define RC_DYNAMICS_API_DATASTREAM_H
00038 
00039 #include <memory>
00040 #include <sstream>
00041 
00042 #ifdef WIN32
00043 #include <winsock2.h>
00044 #else
00045 #include <netinet/in.h>
00046 #include <unistd.h>
00047 #include <ifaddrs.h>
00048 #include <arpa/inet.h>
00049 #endif
00050 
00051 #include <string.h>
00052 #include <functional>
00053 
00054 #include "net_utils.h"
00055 #include "socket_exception.h"
00056 
00057 #include "roboception/msgs/frame.pb.h"
00058 #include "roboception/msgs/dynamics.pb.h"
00059 #include "roboception/msgs/imu.pb.h"
00060 
00061 namespace rc
00062 {
00063 namespace dynamics
00064 {
00069 class DataReceiver : public std::enable_shared_from_this<DataReceiver>
00070 {
00071 public:
00072   using Ptr = std::shared_ptr<DataReceiver>;
00073 
00085   static Ptr create(const std::string& ip_address, unsigned int& port)
00086   {
00087     return Ptr(new DataReceiver(ip_address, port));
00088   }
00089 
00090   virtual ~DataReceiver()
00091   {
00092 #ifdef WIN32
00093     closesocket(_sockfd);
00094 #else
00095     close(_sockfd);
00096 #endif
00097   }
00098 
00102   std::string getIpAddress() const {
00103     return ip_;
00104   }
00105 
00109   unsigned int getPort() const {
00110     return port_;
00111   }
00112 
00118   virtual void setTimeout(unsigned int ms)
00119   {
00120 #ifdef WIN32
00121     DWORD timeout = ms;
00122     if (setsockopt(_sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout)) < 0)
00123     {
00124       throw SocketException("Error while setting receive timeout!", errno);
00125     }
00126 #else
00127     struct timeval _recvtimeout;
00128     _recvtimeout.tv_sec = ms / 1000;
00129     _recvtimeout.tv_usec = (ms % 1000) * 1000;
00130     if (setsockopt(_sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&_recvtimeout, sizeof(struct timeval)) < 0)
00131     {
00132       throw SocketException("Error while setting receive timeout!", errno);
00133     }
00134 #endif
00135   }
00136 
00150   template <class PbMsgType>
00151   std::shared_ptr<PbMsgType> receive()
00152   {
00153 // receive msg from socket; blocking call (timeout)
00154 #ifdef WIN32
00155     int msg_size = recvfrom(_sockfd, _buffer, sizeof(_buffer), 0, NULL, NULL);
00156 
00157     if (msg_size < 0)
00158     {
00159       int e = WSAGetLastError();
00160       if (e == WSAETIMEDOUT)
00161       {
00162         // timeouts are allowed to happen, then return NULL pointer
00163         return nullptr;
00164       }
00165       else
00166       {
00167         throw SocketException("Error during socket recvfrom!", e);
00168       }
00169     }
00170 #else
00171     int msg_size = TEMP_FAILURE_RETRY(recvfrom(_sockfd, _buffer, sizeof(_buffer), 0, NULL, NULL));
00172 
00173     if (msg_size < 0)
00174     {
00175       int e = errno;
00176       if (e == EAGAIN || e == EWOULDBLOCK)
00177       {
00178         // timeouts are allowed to happen, then return NULL pointer
00179         return nullptr;
00180       }
00181       else
00182       {
00183         throw SocketException("Error during socket recvfrom!", e);
00184       }
00185     }
00186 #endif
00187 
00188     // parse msgs as probobuf
00189     auto pb_msg = std::shared_ptr<PbMsgType>(new PbMsgType());
00190     pb_msg->ParseFromArray(_buffer, msg_size);
00191     return pb_msg;
00192   }
00193 
00208   virtual std::shared_ptr<::google::protobuf::Message> receive(const std::string& pb_msg_type)
00209   {
00210     auto found = _recv_func_map.find(pb_msg_type);
00211     if (found == _recv_func_map.end())
00212     {
00213       std::stringstream msg;
00214       msg << "Unsupported protobuf message type '" << pb_msg_type << "'. Only the following types are supported: ";
00215       for (auto const& p : _recv_func_map)
00216         msg << p.first << " ";
00217       throw std::invalid_argument(msg.str());
00218     }
00219     return _recv_func_map[pb_msg_type]();
00220   }
00221 
00222 protected:
00223   DataReceiver(const std::string& ip_address, unsigned int& port) : ip_(ip_address), port_(port)
00224   {
00225     // check if given string is a valid IP address
00226     if (!rc::isValidIPAddress(ip_address))
00227     {
00228       throw std::invalid_argument("Given IP address is not a valid address: " + ip_address);
00229     }
00230 
00231     // open socket for UDP listening
00232     _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00233 #ifdef WIN32
00234     if (_sockfd == INVALID_SOCKET)
00235 #else
00236     if (_sockfd < 0)
00237 #endif
00238     {
00239       throw SocketException("Error while creating socket!", errno);
00240     }
00241 
00242     // bind socket to IP address and port number
00243     struct sockaddr_in myaddr;
00244     myaddr.sin_family = AF_INET;
00245     myaddr.sin_addr.s_addr = inet_addr(ip_address.c_str());  // set IP addrs
00246     myaddr.sin_port = htons(static_cast<u_short>(port));
00247     if (bind(_sockfd, (sockaddr*)&myaddr, sizeof(sockaddr)) < 0)
00248     {
00249       throw SocketException("Error while binding socket!", errno);
00250     }
00251 
00252     // if socket was bound to arbitrary port, we need to figure out to which
00253     // port number
00254     if (port == 0)
00255     {
00256 #ifdef WIN32
00257       int len = sizeof(myaddr);
00258 #else
00259       socklen_t len = sizeof(myaddr);
00260 #endif
00261 
00262       if (getsockname(_sockfd, (struct sockaddr*)&myaddr, &len) < 0)
00263       {
00264 #ifdef WIN32
00265         closesocket(_sockfd);
00266 #else
00267         close(_sockfd);
00268 #endif
00269 
00270         throw SocketException("Error while getting socket name!", errno);
00271       }
00272       port_ = port = ntohs(myaddr.sin_port);
00273     }
00274 
00275     // register all known protobuf message types
00276     _recv_func_map[roboception::msgs::Frame::descriptor()->name()] =
00277         std::bind(&DataReceiver::receive<roboception::msgs::Frame>, this);
00278     _recv_func_map[roboception::msgs::Imu::descriptor()->name()] =
00279         std::bind(&DataReceiver::receive<roboception::msgs::Imu>, this);
00280     _recv_func_map[roboception::msgs::Dynamics::descriptor()->name()] =
00281         std::bind(&DataReceiver::receive<roboception::msgs::Dynamics>, this);
00282   }
00283 
00284 #ifdef WIN32
00285   SOCKET _sockfd;
00286 #else
00287   int _sockfd;
00288 #endif
00289 
00290   char _buffer[512];
00291 
00292   typedef std::map<std::string, std::function<std::shared_ptr<::google::protobuf::Message>()>> map_type;
00293   map_type _recv_func_map;
00294 
00295   std::string ip_;
00296   unsigned int port_;
00297 };
00298 }
00299 }
00300 
00301 #endif  // RC_DYNAMICS_API_DATASTREAM_H


rc_dynamics_api
Author(s): Heiko Hirschmueller , Christian Emmerich , Felix Endres
autogenerated on Thu May 9 2019 02:13:50