tcp_server.cpp
Go to the documentation of this file.
1 // this is for emacs file handling -*- mode: c++; indent-tabs-mode: nil -*-
2 
3 // -- BEGIN LICENSE BLOCK ----------------------------------------------
4 // Copyright 2021 FZI Forschungszentrum Informatik
5 // Created on behalf of Universal Robots A/S
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 // http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // -- END LICENSE BLOCK ------------------------------------------------
19 
20 //----------------------------------------------------------------------
27 //----------------------------------------------------------------------
28 
29 #include <ur_client_library/log.h>
31 
32 #include <iostream>
33 
34 #include <sstream>
35 #include <cstring>
36 #include <fcntl.h>
37 #include <algorithm>
38 #include <system_error>
39 
40 namespace urcl
41 {
42 namespace comm
43 {
44 TCPServer::TCPServer(const int port, const size_t max_num_tries, const std::chrono::milliseconds reconnection_time)
45  : port_(port), maxfd_(0), max_clients_allowed_(0)
46 {
47 #ifdef _WIN32
48  WSAData data;
49  ::WSAStartup(MAKEWORD(1, 1), &data);
50 #endif // _WIN32
51 
52  init();
53  bind(max_num_tries, reconnection_time);
54  startListen();
55 }
56 
58 {
59  URCL_LOG_DEBUG("Destroying TCPServer object.");
60  shutdown();
62 }
63 
65 {
66  socket_t err = (listen_fd_ = socket(AF_INET, SOCK_STREAM, 0));
67  if (err < 0)
68  {
69  throw std::system_error(std::error_code(errno, std::generic_category()), "Failed to create socket endpoint");
70  }
71  int flag = 1;
72 #ifndef _WIN32
73  ur_setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int));
74 #endif
75  ur_setsockopt(listen_fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(int));
76 
77  URCL_LOG_DEBUG("Created socket with FD %d", (int)listen_fd_);
78 
79  FD_ZERO(&masterfds_);
80  FD_ZERO(&tempfds_);
81 }
82 
84 {
85  keep_running_ = false;
86 
87  socket_t shutdown_socket = ::socket(AF_INET, SOCK_STREAM, 0);
88  if (shutdown_socket == INVALID_SOCKET)
89  {
90  throw std::system_error(std::error_code(errno, std::generic_category()), "Unable to create shutdown socket.");
91  }
92 
93 #ifdef _WIN32
94  unsigned long mode = 1;
95  ::ioctlsocket(shutdown_socket, FIONBIO, &mode);
96 #else
97  int flags = ::fcntl(shutdown_socket, F_GETFL, 0);
98  if (flags >= 0)
99  {
100  ::fcntl(shutdown_socket, F_SETFL, flags | O_NONBLOCK);
101  }
102 #endif
103 
104  struct sockaddr_in address;
105  memset(&address, 0, sizeof(address));
106  address.sin_family = AF_INET;
107  address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
108  address.sin_port = htons(port_);
109 
110  ::connect(shutdown_socket, reinterpret_cast<const sockaddr*>(&address), sizeof(address));
111 
112  // After the event loop has finished the thread will be joinable.
113  if (worker_thread_.joinable())
114  {
115  worker_thread_.join();
116  URCL_LOG_DEBUG("Worker thread joined.");
117  }
118 }
119 
120 void TCPServer::bind(const size_t max_num_tries, const std::chrono::milliseconds reconnection_time)
121 {
122  struct sockaddr_in server_addr;
123  server_addr.sin_family = AF_INET;
124 
125  // INADDR_ANY is a special constant that signalizes "ANY IFACE",
126  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
127  server_addr.sin_port = htons(port_);
128  int err = -1;
129  size_t connection_counter = 0;
130  do
131  {
132  err = ::bind(listen_fd_, (struct sockaddr*)&server_addr, sizeof(server_addr));
133  if (err == -1)
134  {
135  std::ostringstream ss;
136  ss << "Failed to bind socket for port " << port_ << " to address. Reason: " << strerror(errno);
137 
138  if (connection_counter++ < max_num_tries || max_num_tries == 0)
139  {
140  std::this_thread::sleep_for(reconnection_time);
141  ss << "Retrying in " << std::chrono::duration_cast<std::chrono::duration<float>>(reconnection_time).count()
142  << " seconds";
143  URCL_LOG_WARN("%s", ss.str().c_str());
144  }
145  else
146  {
147  throw std::system_error(std::error_code(errno, std::generic_category()), ss.str());
148  }
149  }
150  } while (err == -1 && (connection_counter <= max_num_tries || max_num_tries == 0));
151 
152  URCL_LOG_DEBUG("Bound %d:%d to FD %d", server_addr.sin_addr.s_addr, port_, (int)listen_fd_);
153 
154  FD_SET(listen_fd_, &masterfds_);
155  maxfd_ = listen_fd_;
156 }
157 
159 {
160  int err = listen(listen_fd_, 1);
161  if (err == -1)
162  {
163  std::ostringstream ss;
164  ss << "Failed to start listen on port " << port_;
165  throw std::system_error(std::error_code(errno, std::generic_category()), ss.str());
166  }
167  URCL_LOG_DEBUG("Listening on port %d", port_);
168 }
169 
171 {
172  struct sockaddr_storage client_addr;
173  socklen_t addrlen = sizeof(client_addr);
174  socket_t client_fd = accept(listen_fd_, (struct sockaddr*)&client_addr, &addrlen);
175  if (client_fd == INVALID_SOCKET)
176  {
177  std::ostringstream ss;
178  ss << "Failed to accept connection request on port " << port_;
179  throw std::system_error(std::error_code(errno, std::generic_category()), ss.str());
180  }
181 
183  {
184  client_fds_.push_back(client_fd);
185  FD_SET(client_fd, &masterfds_);
186  if (client_fd > maxfd_)
187  {
188  maxfd_ = client_fd;
189  }
191  {
192  new_connection_callback_(client_fd);
193  }
194  }
195  else
196  {
197  URCL_LOG_WARN("Connection attempt on port %d while maximum number of clients (%d) is already connected. Closing "
198  "connection.",
200  ur_close(client_fd);
201  }
202 }
203 
205 {
207 
208  // blocks until activity on any socket from tempfds
209  int sel = select(static_cast<int>(maxfd_ + 1), &tempfds_, NULL, NULL, NULL);
210  if (sel < 0)
211  {
212  URCL_LOG_ERROR("select() failed. Shutting down socket event handler.");
213  keep_running_ = false;
214  return;
215  }
216 
217  if (!keep_running_)
218  {
219  return;
220  }
221 
222  // Check which fd has an activity
223  for (socket_t i = 0; i <= maxfd_; i++)
224  {
225  if (FD_ISSET(i, &tempfds_))
226  {
227  URCL_LOG_DEBUG("Activity on FD %d", i);
228  if (listen_fd_ == i)
229  {
230  // Activity on the listen_fd means we have a new connection
231  handleConnect();
232  }
233  else
234  {
235  readData(i);
236  }
237  }
238  }
239 }
240 
242 {
243  URCL_LOG_DEBUG("%d disconnected.", fd);
244  ur_close(fd);
246  {
248  }
249  FD_CLR(fd, &masterfds_);
250 
251  for (size_t i = 0; i < client_fds_.size(); ++i)
252  {
253  if (client_fds_[i] == fd)
254  {
255  client_fds_.erase(client_fds_.begin() + i);
256  break;
257  }
258  }
259 }
260 
262 {
263  memset(input_buffer_, 0, INPUT_BUFFER_SIZE); // clear input buffer
264  int nbytesrecv = recv(fd, input_buffer_, INPUT_BUFFER_SIZE, 0);
265  if (nbytesrecv > 0)
266  {
267  if (message_callback_)
268  {
269  message_callback_(fd, input_buffer_, nbytesrecv);
270  }
271  }
272  else
273  {
274  if (nbytesrecv < 0)
275  {
276  if (errno == ECONNRESET) // if connection gets reset by client, we want to suppress this output
277  {
278  URCL_LOG_DEBUG("client from FD %d sent a connection reset package.", fd);
279  }
280  else
281  {
282  URCL_LOG_ERROR("recv() on FD %d failed.", fd);
283  }
284  }
285  else
286  {
287  // normal disconnect
288  }
289  handleDisconnect(fd);
290  }
291 }
292 
294 {
295  while (keep_running_)
296  {
297  spin();
298  }
299  URCL_LOG_DEBUG("Finished worker thread of TCPServer");
300 }
301 
303 {
304  URCL_LOG_DEBUG("Starting worker thread");
305  keep_running_ = true;
306  worker_thread_ = std::thread(&TCPServer::worker, this);
307 }
308 
309 bool TCPServer::write(const socket_t fd, const uint8_t* buf, const size_t buf_len, size_t& written)
310 {
311  written = 0;
312 
313  size_t remaining = buf_len;
314 
315  // handle partial sends
316  while (written < buf_len)
317  {
318  ssize_t sent = ::send(fd, reinterpret_cast<const char*>(buf + written), static_cast<socklen_t>(remaining), 0);
319 
320  if (sent <= 0)
321  {
322  URCL_LOG_ERROR("Sending data through socket failed.");
323  return false;
324  }
325 
326  written += sent;
327  remaining -= sent;
328  }
329 
330  return true;
331 }
332 
333 } // namespace comm
334 } // namespace urcl
urcl::comm::TCPServer::masterfds_
fd_set masterfds_
Definition: tcp_server.h:183
socket_t
int socket_t
Definition: socket_t.h:57
ur_setsockopt
#define ur_setsockopt
Definition: socket_t.h:63
urcl::comm::TCPServer::shutdown
void shutdown()
Shut down the event listener thread. After calling this, no events will be handled anymore,...
Definition: tcp_server.cpp:83
INVALID_SOCKET
#define INVALID_SOCKET
Definition: socket_t.h:60
urcl::comm::TCPServer::start
void start()
Start event handling.
Definition: tcp_server.cpp:302
urcl::comm::TCPServer::INPUT_BUFFER_SIZE
static const int INPUT_BUFFER_SIZE
Definition: tcp_server.h:189
urcl::comm::TCPServer::handleConnect
void handleConnect()
Handles connection events.
Definition: tcp_server.cpp:170
urcl::comm::TCPServer::input_buffer_
char input_buffer_[INPUT_BUFFER_SIZE]
Definition: tcp_server.h:190
urcl::comm::TCPServer::max_clients_allowed_
uint32_t max_clients_allowed_
Definition: tcp_server.h:186
urcl
Definition: bin_parser.h:36
URCL_LOG_ERROR
#define URCL_LOG_ERROR(...)
Definition: log.h:26
urcl::comm::TCPServer::message_callback_
std::function< void(const socket_t, char *buffer, int nbytesrecv)> message_callback_
Definition: tcp_server.h:194
urcl::comm::TCPServer::keep_running_
std::atomic< bool > keep_running_
Definition: tcp_server.h:176
urcl::comm::TCPServer::port_
int port_
Definition: tcp_server.h:180
URCL_LOG_DEBUG
#define URCL_LOG_DEBUG(...)
Definition: log.h:23
urcl::comm::TCPServer::client_fds_
std::vector< socket_t > client_fds_
Definition: tcp_server.h:187
urcl::comm::TCPServer::tempfds_
fd_set tempfds_
Definition: tcp_server.h:184
urcl::comm::TCPServer::readData
void readData(const socket_t fd)
read data from socket
Definition: tcp_server.cpp:261
urcl::comm::TCPServer::write
bool write(const socket_t fd, const uint8_t *buf, const size_t buf_len, size_t &written)
Writes to a client.
Definition: tcp_server.cpp:309
urcl::comm::TCPServer::handleDisconnect
void handleDisconnect(const socket_t fd)
Definition: tcp_server.cpp:241
urcl::comm::TCPServer::startListen
void startListen()
Definition: tcp_server.cpp:158
urcl::comm::TCPServer::worker_thread_
std::thread worker_thread_
Definition: tcp_server.h:177
log.h
urcl::comm::TCPServer::spin
void spin()
Event handler. Blocks until activity on any client or connection attempt.
Definition: tcp_server.cpp:204
urcl::comm::TCPServer::bind
void bind(const size_t max_num_tries, const std::chrono::milliseconds reconnection_time)
Definition: tcp_server.cpp:120
urcl::comm::TCPServer::init
void init()
Definition: tcp_server.cpp:64
urcl::comm::TCPServer::worker
void worker()
Runs spin() as long as keep_running_ is set to true.
Definition: tcp_server.cpp:293
URCL_LOG_WARN
#define URCL_LOG_WARN(...)
Definition: log.h:24
urcl::comm::TCPServer::TCPServer
TCPServer()=delete
urcl::comm::TCPServer::disconnect_callback_
std::function< void(const socket_t)> disconnect_callback_
Definition: tcp_server.h:193
urcl::comm::TCPServer::new_connection_callback_
std::function< void(const socket_t)> new_connection_callback_
Definition: tcp_server.h:192
urcl::comm::TCPServer::listen_fd_
std::atomic< socket_t > listen_fd_
Definition: tcp_server.h:179
ur_close
#define ur_close
Definition: socket_t.h:64
tcp_server.h
urcl::comm::TCPServer::maxfd_
socket_t maxfd_
Definition: tcp_server.h:182
urcl::comm::TCPServer::~TCPServer
virtual ~TCPServer()
Definition: tcp_server.cpp:57


ur_client_library
Author(s): Thomas Timm Andersen, Simon Rasmussen, Felix Exner, Lea Steffen, Tristan Schnell
autogenerated on Mon May 26 2025 02:35:58