Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #ifndef RC_DYNAMICS_API_DATASTREAM_H
00037 #define RC_DYNAMICS_API_DATASTREAM_H
00038
00039 #include <memory>
00040 #include <sstream>
00041
00042 #ifdef WIN32
00043 #include <winsock2.h>
00044 #else
00045 #include <netinet/in.h>
00046 #include <unistd.h>
00047 #include <ifaddrs.h>
00048 #include <arpa/inet.h>
00049 #endif
00050
00051 #include <string.h>
00052 #include <functional>
00053
00054 #include "net_utils.h"
00055 #include "socket_exception.h"
00056
00057 #include "roboception/msgs/frame.pb.h"
00058 #include "roboception/msgs/dynamics.pb.h"
00059 #include "roboception/msgs/imu.pb.h"
00060
00061 namespace rc
00062 {
00063 namespace dynamics
00064 {
00069 class DataReceiver : public std::enable_shared_from_this<DataReceiver>
00070 {
00071 public:
00072 using Ptr = std::shared_ptr<DataReceiver>;
00073
00085 static Ptr create(const std::string& ip_address, unsigned int& port)
00086 {
00087 return Ptr(new DataReceiver(ip_address, port));
00088 }
00089
00090 virtual ~DataReceiver()
00091 {
00092 #ifdef WIN32
00093 closesocket(_sockfd);
00094 #else
00095 close(_sockfd);
00096 #endif
00097 }
00098
00102 std::string getIpAddress() const {
00103 return ip_;
00104 }
00105
00109 unsigned int getPort() const {
00110 return port_;
00111 }
00112
00118 virtual void setTimeout(unsigned int ms)
00119 {
00120 #ifdef WIN32
00121 DWORD timeout = ms;
00122 if (setsockopt(_sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout)) < 0)
00123 {
00124 throw SocketException("Error while setting receive timeout!", errno);
00125 }
00126 #else
00127 struct timeval _recvtimeout;
00128 _recvtimeout.tv_sec = ms / 1000;
00129 _recvtimeout.tv_usec = (ms % 1000) * 1000;
00130 if (setsockopt(_sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&_recvtimeout, sizeof(struct timeval)) < 0)
00131 {
00132 throw SocketException("Error while setting receive timeout!", errno);
00133 }
00134 #endif
00135 }
00136
00150 template <class PbMsgType>
00151 std::shared_ptr<PbMsgType> receive()
00152 {
00153
00154 #ifdef WIN32
00155 int msg_size = recvfrom(_sockfd, _buffer, sizeof(_buffer), 0, NULL, NULL);
00156
00157 if (msg_size < 0)
00158 {
00159 int e = WSAGetLastError();
00160 if (e == WSAETIMEDOUT)
00161 {
00162
00163 return nullptr;
00164 }
00165 else
00166 {
00167 throw SocketException("Error during socket recvfrom!", e);
00168 }
00169 }
00170 #else
00171 int msg_size = TEMP_FAILURE_RETRY(recvfrom(_sockfd, _buffer, sizeof(_buffer), 0, NULL, NULL));
00172
00173 if (msg_size < 0)
00174 {
00175 int e = errno;
00176 if (e == EAGAIN || e == EWOULDBLOCK)
00177 {
00178
00179 return nullptr;
00180 }
00181 else
00182 {
00183 throw SocketException("Error during socket recvfrom!", e);
00184 }
00185 }
00186 #endif
00187
00188
00189 auto pb_msg = std::shared_ptr<PbMsgType>(new PbMsgType());
00190 pb_msg->ParseFromArray(_buffer, msg_size);
00191 return pb_msg;
00192 }
00193
00208 virtual std::shared_ptr<::google::protobuf::Message> receive(const std::string& pb_msg_type)
00209 {
00210 auto found = _recv_func_map.find(pb_msg_type);
00211 if (found == _recv_func_map.end())
00212 {
00213 std::stringstream msg;
00214 msg << "Unsupported protobuf message type '" << pb_msg_type << "'. Only the following types are supported: ";
00215 for (auto const& p : _recv_func_map)
00216 msg << p.first << " ";
00217 throw std::invalid_argument(msg.str());
00218 }
00219 return _recv_func_map[pb_msg_type]();
00220 }
00221
00222 protected:
00223 DataReceiver(const std::string& ip_address, unsigned int& port) : ip_(ip_address), port_(port)
00224 {
00225
00226 if (!rc::isValidIPAddress(ip_address))
00227 {
00228 throw std::invalid_argument("Given IP address is not a valid address: " + ip_address);
00229 }
00230
00231
00232 _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00233 #ifdef WIN32
00234 if (_sockfd == INVALID_SOCKET)
00235 #else
00236 if (_sockfd < 0)
00237 #endif
00238 {
00239 throw SocketException("Error while creating socket!", errno);
00240 }
00241
00242
00243 struct sockaddr_in myaddr;
00244 myaddr.sin_family = AF_INET;
00245 myaddr.sin_addr.s_addr = inet_addr(ip_address.c_str());
00246 myaddr.sin_port = htons(static_cast<u_short>(port));
00247 if (bind(_sockfd, (sockaddr*)&myaddr, sizeof(sockaddr)) < 0)
00248 {
00249 throw SocketException("Error while binding socket!", errno);
00250 }
00251
00252
00253
00254 if (port == 0)
00255 {
00256 #ifdef WIN32
00257 int len = sizeof(myaddr);
00258 #else
00259 socklen_t len = sizeof(myaddr);
00260 #endif
00261
00262 if (getsockname(_sockfd, (struct sockaddr*)&myaddr, &len) < 0)
00263 {
00264 #ifdef WIN32
00265 closesocket(_sockfd);
00266 #else
00267 close(_sockfd);
00268 #endif
00269
00270 throw SocketException("Error while getting socket name!", errno);
00271 }
00272 port_ = port = ntohs(myaddr.sin_port);
00273 }
00274
00275
00276 _recv_func_map[roboception::msgs::Frame::descriptor()->name()] =
00277 std::bind(&DataReceiver::receive<roboception::msgs::Frame>, this);
00278 _recv_func_map[roboception::msgs::Imu::descriptor()->name()] =
00279 std::bind(&DataReceiver::receive<roboception::msgs::Imu>, this);
00280 _recv_func_map[roboception::msgs::Dynamics::descriptor()->name()] =
00281 std::bind(&DataReceiver::receive<roboception::msgs::Dynamics>, this);
00282 }
00283
00284 #ifdef WIN32
00285 SOCKET _sockfd;
00286 #else
00287 int _sockfd;
00288 #endif
00289
00290 char _buffer[512];
00291
00292 typedef std::map<std::string, std::function<std::shared_ptr<::google::protobuf::Message>()>> map_type;
00293 map_type _recv_func_map;
00294
00295 std::string ip_;
00296 unsigned int port_;
00297 };
00298 }
00299 }
00300
00301 #endif // RC_DYNAMICS_API_DATASTREAM_H