Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #ifndef RC_DYNAMICS_API_DATASTREAM_H
00037 #define RC_DYNAMICS_API_DATASTREAM_H
00038
00039 #include <memory>
00040 #include <sstream>
00041
00042 #include <netinet/in.h>
00043 #include <unistd.h>
00044 #include <ifaddrs.h>
00045 #include <arpa/inet.h>
00046 #include <string.h>
00047
00048 #include "net_utils.h"
00049 #include "socket_exception.h"
00050
00051 #include "roboception/msgs/frame.pb.h"
00052 #include "roboception/msgs/dynamics.pb.h"
00053 #include "roboception/msgs/imu.pb.h"
00054
00055 namespace rc
00056 {
00057 namespace dynamics
00058 {
00059
00060
00065 class DataReceiver
00066 : public std::enable_shared_from_this<DataReceiver>
00067 {
00068 public:
00069
00070 using Ptr = std::shared_ptr<DataReceiver>;
00071
00083 static Ptr create(std::string ip_address, unsigned int &port)
00084 {
00085 return Ptr(new DataReceiver(ip_address, port));
00086 }
00087
00088 virtual ~DataReceiver()
00089 {
00090 close(_sockfd);
00091 }
00092
00098 virtual void setTimeout(unsigned int ms)
00099 {
00100 _recvtimeout.tv_sec = ms / 1000;
00101 _recvtimeout.tv_usec = (ms % 1000) * 1000;
00102 if (setsockopt(_sockfd, SOL_SOCKET, SO_RCVTIMEO,
00103 (const char *) &_recvtimeout,
00104 sizeof(struct timeval)) < 0)
00105 {
00106 throw SocketException("Error while setting receive timeout!", errno);
00107 }
00108 }
00109
00110
00124 template<class PbMsgType>
00125 std::shared_ptr<PbMsgType> receive()
00126 {
00127
00128 int msg_size = TEMP_FAILURE_RETRY(
00129 recvfrom(_sockfd, _buffer, sizeof(_buffer), 0, NULL, NULL));
00130 if (msg_size < 0)
00131 {
00132 int e = errno;
00133 if (e == EAGAIN || e == EWOULDBLOCK)
00134 {
00135
00136 return nullptr;
00137 }
00138 else
00139 {
00140 throw SocketException("Error during socket recvfrom!", e);
00141 }
00142 }
00143
00144
00145 auto pbMsg = std::shared_ptr<PbMsgType>(new PbMsgType());
00146 pbMsg->ParseFromArray(_buffer, msg_size);
00147 return pbMsg;
00148 }
00149
00164 virtual std::shared_ptr<::google::protobuf::Message> receive(const std::string &pbMsgType)
00165 {
00166 auto found = _recv_func_map.find(pbMsgType);
00167 if (found == _recv_func_map.end())
00168 {
00169 std::stringstream msg;
00170 msg << "Unsupported protobuf message type '" << pbMsgType
00171 << "'. Only the following types are supported: ";
00172 for (auto const &p : _recv_func_map) msg << p.first << " ";
00173 throw std::invalid_argument(msg.str());
00174 }
00175 return _recv_func_map[pbMsgType]();
00176 }
00177
00178 protected:
00179
00180 DataReceiver(std::string ip_address, unsigned int &port)
00181 {
00182
00183 if (!rc::isValidIPAddress(ip_address))
00184 {
00185 throw std::invalid_argument("Given IP address is not a valid address: "
00186 + ip_address);
00187 }
00188
00189
00190 _recvtimeout.tv_sec = 0;
00191 _recvtimeout.tv_usec = 1000 * 10;
00192
00193
00194 _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00195 if (_sockfd < 0)
00196 {
00197 throw SocketException("Error while creating socket!", errno);
00198 }
00199
00200
00201 struct sockaddr_in myaddr;
00202 myaddr.sin_family = AF_INET;
00203 inet_aton(ip_address.c_str(), &myaddr.sin_addr);
00204 myaddr.sin_port = htons(port);
00205 if (bind(_sockfd, (sockaddr *) &myaddr, sizeof(sockaddr)) < 0)
00206 {
00207 throw SocketException("Error while binding socket!", errno);
00208 }
00209
00210
00211
00212 if (port == 0)
00213 {
00214 socklen_t len = sizeof(myaddr);
00215 if (getsockname(_sockfd, (struct sockaddr *) &myaddr, &len) < 0)
00216 {
00217 close(_sockfd);
00218 throw SocketException("Error while getting socket name!", errno);
00219 }
00220 port = ntohs(myaddr.sin_port);
00221 }
00222
00223
00224 _recv_func_map[roboception::msgs::Frame::descriptor()->name()] = std::bind(
00225 &DataReceiver::receive<roboception::msgs::Frame>, this);
00226 _recv_func_map[roboception::msgs::Imu::descriptor()->name()] = std::bind(
00227 &DataReceiver::receive<roboception::msgs::Imu>, this);
00228 _recv_func_map[roboception::msgs::Dynamics::descriptor()->name()] = std::bind(
00229 &DataReceiver::receive<roboception::msgs::Dynamics>, this);
00230 }
00231
00232 int _sockfd;
00233 struct timeval _recvtimeout;
00234 char _buffer[512];
00235
00236 typedef std::map<std::string, std::function<std::shared_ptr<::google::protobuf::Message>()>> map_type;
00237 map_type _recv_func_map;
00238 };
00239
00240 }
00241 }
00242
00243 #endif //RC_DYNAMICS_API_DATASTREAM_H