data_receiver.h
Go to the documentation of this file.
1 /*
2  * This file is part of the rc_dynamics_api package.
3  *
4  * Copyright (c) 2017 Roboception GmbH
5  * All rights reserved
6  *
7  * Author: Christian Emmerich
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  *
19  * 3. Neither the name of the copyright holder nor the names of its contributors
20  * may be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33  * POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #ifndef RC_DYNAMICS_API_DATASTREAM_H
37 #define RC_DYNAMICS_API_DATASTREAM_H
38 
39 #include <memory>
40 #include <sstream>
41 
42 #ifdef WIN32
43 #include <winsock2.h>
44 #else
45 #include <netinet/in.h>
46 #include <unistd.h>
47 #include <ifaddrs.h>
48 #include <arpa/inet.h>
49 #endif
50 
51 #include <string.h>
52 #include <functional>
53 
54 #include "net_utils.h"
55 #include "socket_exception.h"
56 
57 #include "roboception/msgs/frame.pb.h"
58 #include "roboception/msgs/dynamics.pb.h"
59 #include "roboception/msgs/imu.pb.h"
60 
61 namespace rc
62 {
63 namespace dynamics
64 {
69 class DataReceiver : public std::enable_shared_from_this<DataReceiver>
70 {
71 public:
72  using Ptr = std::shared_ptr<DataReceiver>;
73 
85  static Ptr create(const std::string& ip_address, unsigned int& port)
86  {
87  return Ptr(new DataReceiver(ip_address, port));
88  }
89 
90  virtual ~DataReceiver()
91  {
92 #ifdef WIN32
93  closesocket(_sockfd);
94 #else
95  close(_sockfd);
96 #endif
97  }
98 
102  std::string getIpAddress() const {
103  return ip_;
104  }
105 
109  unsigned int getPort() const {
110  return port_;
111  }
112 
118  virtual void setTimeout(unsigned int ms)
119  {
120 #ifdef WIN32
121  DWORD timeout = ms;
122  if (setsockopt(_sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout)) < 0)
123  {
124  throw SocketException("Error while setting receive timeout!", errno);
125  }
126 #else
127  struct timeval _recvtimeout;
128  _recvtimeout.tv_sec = ms / 1000;
129  _recvtimeout.tv_usec = (ms % 1000) * 1000;
130  if (setsockopt(_sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&_recvtimeout, sizeof(struct timeval)) < 0)
131  {
132  throw SocketException("Error while setting receive timeout!", errno);
133  }
134 #endif
135  }
136 
150  template <class PbMsgType>
151  std::shared_ptr<PbMsgType> receive()
152  {
153 // receive msg from socket; blocking call (timeout)
154 #ifdef WIN32
155  int msg_size = recvfrom(_sockfd, _buffer, sizeof(_buffer), 0, NULL, NULL);
156 
157  if (msg_size < 0)
158  {
159  int e = WSAGetLastError();
160  if (e == WSAETIMEDOUT)
161  {
162  // timeouts are allowed to happen, then return NULL pointer
163  return nullptr;
164  }
165  else
166  {
167  throw SocketException("Error during socket recvfrom!", e);
168  }
169  }
170 #else
171  int msg_size = TEMP_FAILURE_RETRY(recvfrom(_sockfd, _buffer, sizeof(_buffer), 0, NULL, NULL));
172 
173  if (msg_size < 0)
174  {
175  int e = errno;
176  if (e == EAGAIN || e == EWOULDBLOCK)
177  {
178  // timeouts are allowed to happen, then return NULL pointer
179  return nullptr;
180  }
181  else
182  {
183  throw SocketException("Error during socket recvfrom!", e);
184  }
185  }
186 #endif
187 
188  // parse msgs as probobuf
189  auto pb_msg = std::shared_ptr<PbMsgType>(new PbMsgType());
190  pb_msg->ParseFromArray(_buffer, msg_size);
191  return pb_msg;
192  }
193 
208  virtual std::shared_ptr<::google::protobuf::Message> receive(const std::string& pb_msg_type)
209  {
210  auto found = _recv_func_map.find(pb_msg_type);
211  if (found == _recv_func_map.end())
212  {
213  std::stringstream msg;
214  msg << "Unsupported protobuf message type '" << pb_msg_type << "'. Only the following types are supported: ";
215  for (auto const& p : _recv_func_map)
216  msg << p.first << " ";
217  throw std::invalid_argument(msg.str());
218  }
219  return _recv_func_map[pb_msg_type]();
220  }
221 
222 protected:
223  DataReceiver(const std::string& ip_address, unsigned int& port) : ip_(ip_address), port_(port)
224  {
225  // check if given string is a valid IP address
226  if (!rc::isValidIPAddress(ip_address))
227  {
228  throw std::invalid_argument("Given IP address is not a valid address: " + ip_address);
229  }
230 
231  // open socket for UDP listening
232  _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
233 #ifdef WIN32
234  if (_sockfd == INVALID_SOCKET)
235 #else
236  if (_sockfd < 0)
237 #endif
238  {
239  throw SocketException("Error while creating socket!", errno);
240  }
241 
242  // bind socket to IP address and port number
243  struct sockaddr_in myaddr;
244  myaddr.sin_family = AF_INET;
245  myaddr.sin_addr.s_addr = inet_addr(ip_address.c_str()); // set IP addrs
246  myaddr.sin_port = htons(static_cast<u_short>(port));
247  if (bind(_sockfd, (sockaddr*)&myaddr, sizeof(sockaddr)) < 0)
248  {
249  throw SocketException("Error while binding socket!", errno);
250  }
251 
252  // if socket was bound to arbitrary port, we need to figure out to which
253  // port number
254  if (port == 0)
255  {
256 #ifdef WIN32
257  int len = sizeof(myaddr);
258 #else
259  socklen_t len = sizeof(myaddr);
260 #endif
261 
262  if (getsockname(_sockfd, (struct sockaddr*)&myaddr, &len) < 0)
263  {
264 #ifdef WIN32
265  closesocket(_sockfd);
266 #else
267  close(_sockfd);
268 #endif
269 
270  throw SocketException("Error while getting socket name!", errno);
271  }
272  port_ = port = ntohs(myaddr.sin_port);
273  }
274 
275  // register all known protobuf message types
276  _recv_func_map[roboception::msgs::Frame::descriptor()->name()] =
277  std::bind(&DataReceiver::receive<roboception::msgs::Frame>, this);
278  _recv_func_map[roboception::msgs::Imu::descriptor()->name()] =
279  std::bind(&DataReceiver::receive<roboception::msgs::Imu>, this);
280  _recv_func_map[roboception::msgs::Dynamics::descriptor()->name()] =
281  std::bind(&DataReceiver::receive<roboception::msgs::Dynamics>, this);
282  }
283 
284 #ifdef WIN32
285  SOCKET _sockfd;
286 #else
287  int _sockfd;
288 #endif
289 
290  char _buffer[512];
291 
292  typedef std::map<std::string, std::function<std::shared_ptr<::google::protobuf::Message>()>> map_type;
294 
295  std::string ip_;
296  unsigned int port_;
297 };
298 }
299 }
300 
301 #endif // RC_DYNAMICS_API_DATASTREAM_H
std::map< std::string, std::function< std::shared_ptr<::google::protobuf::Message >)> > map_type
std::shared_ptr< DataReceiver > Ptr
Definition: data_receiver.h:72
virtual std::shared_ptr<::google::protobuf::Message > receive(const std::string &pb_msg_type)
Receives the next message from data stream (string-parameter version)
std::shared_ptr< PbMsgType > receive()
Receives the next message from data stream (template-parameter version)
virtual void setTimeout(unsigned int ms)
Sets a user-specified timeout for the receivePose() method.
unsigned int getPort() const
Returns port for which the receiver was created.
A simple receiver object for handling data streamed by rc_visard&#39;s rc_dynamics module.
Definition: data_receiver.h:69
bool isValidIPAddress(const std::string &ip)
Checks if given string is a valid IP address.
Definition: net_utils.cc:279
static Ptr create(const std::string &ip_address, unsigned int &port)
Creates a data receiver bound to the user-given IP address and port number.
Definition: data_receiver.h:85
Exception representing an invalid socket operation.
std::string getIpAddress() const
Returns Ip address for which the receiver was created.
DataReceiver(const std::string &ip_address, unsigned int &port)


rc_dynamics_api
Author(s): Heiko Hirschmueller , Christian Emmerich , Felix Endres
autogenerated on Sat Mar 6 2021 03:53:19