data_receiver.h
Go to the documentation of this file.
00001 /*
00002  * This file is part of the rc_dynamics_api package.
00003  *
00004  * Copyright (c) 2017 Roboception GmbH
00005  * All rights reserved
00006  *
00007  * Author: Christian Emmerich
00008  *
00009  * Redistribution and use in source and binary forms, with or without
00010  * modification, are permitted provided that the following conditions are met:
00011  *
00012  * 1. Redistributions of source code must retain the above copyright notice,
00013  * this list of conditions and the following disclaimer.
00014  *
00015  * 2. Redistributions in binary form must reproduce the above copyright notice,
00016  * this list of conditions and the following disclaimer in the documentation
00017  * and/or other materials provided with the distribution.
00018  *
00019  * 3. Neither the name of the copyright holder nor the names of its contributors
00020  * may be used to endorse or promote products derived from this software without
00021  * specific prior written permission.
00022  *
00023  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00024  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00025  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00026  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
00027  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00028  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00029  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00030  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00031  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00032  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033  * POSSIBILITY OF SUCH DAMAGE.
00034  */
00035 
00036 #ifndef RC_DYNAMICS_API_DATASTREAM_H
00037 #define RC_DYNAMICS_API_DATASTREAM_H
00038 
00039 #include <memory>
00040 #include <sstream>
00041 
00042 #include <netinet/in.h>
00043 #include <unistd.h>
00044 #include <ifaddrs.h>
00045 #include <arpa/inet.h>
00046 #include <string.h>
00047 
00048 #include "net_utils.h"
00049 #include "socket_exception.h"
00050 
00051 #include "roboception/msgs/frame.pb.h"
00052 #include "roboception/msgs/dynamics.pb.h"
00053 #include "roboception/msgs/imu.pb.h"
00054 
00055 namespace rc
00056 {
00057 namespace dynamics
00058 {
00059 
00060 
00065 class DataReceiver
00066         : public std::enable_shared_from_this<DataReceiver>
00067 {
00068   public:
00069 
00070     using Ptr = std::shared_ptr<DataReceiver>;
00071 
00083     static Ptr create(std::string ip_address, unsigned int &port)
00084     {
00085       return Ptr(new DataReceiver(ip_address, port));
00086     }
00087 
00088     virtual ~DataReceiver()
00089     {
00090       close(_sockfd);
00091     }
00092 
00098     virtual void setTimeout(unsigned int ms)
00099     {
00100       _recvtimeout.tv_sec = ms / 1000;
00101       _recvtimeout.tv_usec = (ms % 1000) * 1000;
00102       if (setsockopt(_sockfd, SOL_SOCKET, SO_RCVTIMEO,
00103                      (const char *) &_recvtimeout,
00104                      sizeof(struct timeval)) < 0)
00105       {
00106         throw SocketException("Error while setting receive timeout!", errno);
00107       }
00108     }
00109 
00110 
00124     template<class PbMsgType>
00125     std::shared_ptr<PbMsgType> receive()
00126     {
00127       // receive msg from socket; blocking call (timeout)
00128       int msg_size = TEMP_FAILURE_RETRY(
00129               recvfrom(_sockfd, _buffer, sizeof(_buffer), 0, NULL, NULL));
00130       if (msg_size < 0)
00131       {
00132         int e = errno;
00133         if (e == EAGAIN || e == EWOULDBLOCK)
00134         {
00135           // timeouts are allowed to happen, then return NULL pointer
00136           return nullptr;
00137         }
00138         else
00139         {
00140           throw SocketException("Error during socket recvfrom!", e);
00141         }
00142       }
00143 
00144       // parse msgs as probobuf
00145       auto pbMsg = std::shared_ptr<PbMsgType>(new PbMsgType());
00146       pbMsg->ParseFromArray(_buffer, msg_size);
00147       return pbMsg;
00148     }
00149 
00164     virtual std::shared_ptr<::google::protobuf::Message> receive(const std::string &pbMsgType)
00165     {
00166       auto found = _recv_func_map.find(pbMsgType);
00167       if (found == _recv_func_map.end())
00168       {
00169         std::stringstream msg;
00170         msg << "Unsupported protobuf message type '" << pbMsgType
00171             << "'. Only the following types are supported: ";
00172         for (auto const &p : _recv_func_map) msg << p.first << " ";
00173         throw std::invalid_argument(msg.str());
00174       }
00175       return _recv_func_map[pbMsgType]();
00176     }
00177 
00178   protected:
00179 
00180     DataReceiver(std::string ip_address, unsigned int &port)
00181     {
00182       // check if given string is a valid IP address
00183       if (!rc::isValidIPAddress(ip_address))
00184       {
00185         throw std::invalid_argument("Given IP address is not a valid address: "
00186                                + ip_address);
00187       }
00188 
00189       // may result in weird errors when not initialized properly
00190       _recvtimeout.tv_sec = 0;
00191       _recvtimeout.tv_usec = 1000 * 10;
00192 
00193       // open socket for UDP listening
00194       _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00195       if (_sockfd < 0)
00196       {
00197         throw SocketException("Error while creating socket!", errno);
00198       }
00199 
00200       // bind socket to IP address and port number
00201       struct sockaddr_in myaddr;
00202       myaddr.sin_family = AF_INET;
00203       inet_aton(ip_address.c_str(), &myaddr.sin_addr); // set IP addrs
00204       myaddr.sin_port = htons(port);
00205       if (bind(_sockfd, (sockaddr *) &myaddr, sizeof(sockaddr)) < 0)
00206       {
00207         throw SocketException("Error while binding socket!", errno);
00208       }
00209 
00210       // if socket was bound to arbitrary port, we need to figure out to which
00211       // port number
00212       if (port == 0)
00213       {
00214         socklen_t len = sizeof(myaddr);
00215         if (getsockname(_sockfd, (struct sockaddr *) &myaddr, &len) < 0)
00216         {
00217           close(_sockfd);
00218           throw SocketException("Error while getting socket name!", errno);
00219         }
00220         port = ntohs(myaddr.sin_port);
00221       }
00222 
00223       // register all known protobuf message types
00224       _recv_func_map[roboception::msgs::Frame::descriptor()->name()] = std::bind(
00225               &DataReceiver::receive<roboception::msgs::Frame>, this);
00226       _recv_func_map[roboception::msgs::Imu::descriptor()->name()] = std::bind(
00227               &DataReceiver::receive<roboception::msgs::Imu>, this);
00228       _recv_func_map[roboception::msgs::Dynamics::descriptor()->name()] = std::bind(
00229               &DataReceiver::receive<roboception::msgs::Dynamics>, this);
00230     }
00231 
00232     int _sockfd;
00233     struct timeval _recvtimeout;
00234     char _buffer[512];
00235 
00236     typedef std::map<std::string, std::function<std::shared_ptr<::google::protobuf::Message>()>> map_type;
00237     map_type _recv_func_map;
00238 };
00239 
00240 }
00241 }
00242 
00243 #endif //RC_DYNAMICS_API_DATASTREAM_H


rc_visard_driver
Author(s): Heiko Hirschmueller , Christian Emmerich , Felix Ruess
autogenerated on Thu Jun 6 2019 20:43:02