tcp_server.cpp
Go to the documentation of this file.
1 // this is for emacs file handling -*- mode: c++; indent-tabs-mode: nil -*-
2 
3 // -- BEGIN LICENSE BLOCK ----------------------------------------------
4 // Copyright 2021 FZI Forschungszentrum Informatik
5 // Created on behalf of Universal Robots A/S
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 // http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // -- END LICENSE BLOCK ------------------------------------------------
19 
20 //----------------------------------------------------------------------
27 //----------------------------------------------------------------------
28 
29 #include <ur_client_library/log.h>
31 
32 #include <iostream>
33 
34 #include <sstream>
35 #include <strings.h>
36 #include <cstring>
37 #include <fcntl.h>
38 #include <algorithm>
39 #include <system_error>
40 
41 namespace urcl
42 {
43 namespace comm
44 {
45 TCPServer::TCPServer(const int port) : port_(port), maxfd_(0), max_clients_allowed_(0)
46 {
47  init();
48  bind();
49  startListen();
50 }
51 
53 {
54  URCL_LOG_DEBUG("Destroying TCPServer object.");
55  shutdown();
56  close(listen_fd_);
57 }
58 
60 {
61  int err = (listen_fd_ = socket(AF_INET, SOCK_STREAM, 0));
62  if (err == -1)
63  {
64  throw std::system_error(std::error_code(errno, std::generic_category()), "Failed to create socket endpoint");
65  }
66  int flag = 1;
67  setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int));
68  setsockopt(listen_fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(int));
69 
70  URCL_LOG_DEBUG("Created socket with FD %d", (int)listen_fd_);
71 
72  FD_ZERO(&masterfds_);
73  FD_ZERO(&tempfds_);
74 
75  // Create self-pipe for interrupting the worker loop
76  if (pipe(self_pipe_) == -1)
77  {
78  throw std::system_error(std::error_code(errno, std::generic_category()), "Error creating self-pipe");
79  }
80  URCL_LOG_DEBUG("Created read pipe at FD %d", self_pipe_[0]);
81  FD_SET(self_pipe_[0], &masterfds_);
82 
83  // Make read and write ends of pipe nonblocking
84  int flags;
85  flags = fcntl(self_pipe_[0], F_GETFL);
86  if (flags == -1)
87  {
88  throw std::system_error(std::error_code(errno, std::generic_category()), "fcntl-F_GETFL");
89  }
90  flags |= O_NONBLOCK; // Make read end nonblocking
91  if (fcntl(self_pipe_[0], F_SETFL, flags) == -1)
92  {
93  throw std::system_error(std::error_code(errno, std::generic_category()), "fcntl-F_SETFL");
94  }
95 
96  flags = fcntl(self_pipe_[1], F_GETFL);
97  if (flags == -1)
98  {
99  throw std::system_error(std::error_code(errno, std::generic_category()), "fcntl-F_GETFL");
100  }
101  flags |= O_NONBLOCK; // Make write end nonblocking
102  if (fcntl(self_pipe_[1], F_SETFL, flags) == -1)
103  {
104  throw std::system_error(std::error_code(errno, std::generic_category()), "fcntl-F_SETFL");
105  }
106 }
107 
109 {
110  keep_running_ = false;
111 
112  // This is basically the self-pipe trick. Writing to the pipe will trigger an event for the event
113  // handler which will stop the select() call from blocking.
114  if (::write(self_pipe_[1], "x", 1) == -1 && errno != EAGAIN)
115  {
116  throw std::system_error(std::error_code(errno, std::generic_category()), "Writing to self-pipe failed.");
117  }
118 
119  // After the event loop has finished the thread will be joinable.
120  if (worker_thread_.joinable())
121  {
122  worker_thread_.join();
123  URCL_LOG_DEBUG("Worker thread joined.");
124  }
125 }
126 
128 {
129  struct sockaddr_in server_addr;
130  server_addr.sin_family = AF_INET;
131 
132  // INADDR_ANY is a special constant that signalizes "ANY IFACE",
133  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
134  server_addr.sin_port = htons(port_);
135  int err = ::bind(listen_fd_, (struct sockaddr*)&server_addr, sizeof(server_addr));
136  if (err == -1)
137  {
138  std::ostringstream ss;
139  ss << "Failed to bind socket for port " << port_ << " to address. Reason: " << strerror(errno);
140  throw std::system_error(std::error_code(errno, std::generic_category()), ss.str());
141  }
142  URCL_LOG_DEBUG("Bound %d:%d to FD %d", server_addr.sin_addr.s_addr, port_, (int)listen_fd_);
143 
144  FD_SET(listen_fd_, &masterfds_);
145  maxfd_ = std::max((int)listen_fd_, self_pipe_[0]);
146 }
147 
149 {
150  int err = listen(listen_fd_, 1);
151  if (err == -1)
152  {
153  std::ostringstream ss;
154  ss << "Failed to start listen on port " << port_;
155  throw std::system_error(std::error_code(errno, std::generic_category()), ss.str());
156  }
157  URCL_LOG_DEBUG("Listening on port %d", port_);
158 }
159 
161 {
162  struct sockaddr_storage client_addr;
163  socklen_t addrlen = sizeof(client_addr);
164  int client_fd = accept(listen_fd_, (struct sockaddr*)&client_addr, &addrlen);
165  if (client_fd < 0)
166  {
167  std::ostringstream ss;
168  ss << "Failed to accept connection request on port " << port_;
169  throw std::system_error(std::error_code(errno, std::generic_category()), ss.str());
170  }
171 
173  {
174  client_fds_.push_back(client_fd);
175  FD_SET(client_fd, &masterfds_);
176  if (client_fd > maxfd_)
177  {
178  maxfd_ = std::max(client_fd, self_pipe_[0]);
179  }
181  {
182  new_connection_callback_(client_fd);
183  }
184  }
185  else
186  {
187  URCL_LOG_WARN("Connection attempt on port %d while maximum number of clients (%d) is already connected. Closing "
188  "connection.",
190  close(client_fd);
191  }
192 }
193 
195 {
197 
198  // blocks until activity on any socket from tempfds
199  int sel = select(maxfd_ + 1, &tempfds_, NULL, NULL, NULL);
200  if (sel < 0)
201  {
202  URCL_LOG_ERROR("select() failed. Shutting down socket event handler.");
203  keep_running_ = false;
204  return;
205  }
206 
207  // Read part if pipe-trick. This will help interrupting the event handler thread.
208  if (FD_ISSET(self_pipe_[0], &masterfds_))
209  {
210  URCL_LOG_DEBUG("Activity on self-pipe");
211  char buffer;
212  if (read(self_pipe_[0], &buffer, 1) == -1)
213  {
214  while (true)
215  {
216  if (errno == EAGAIN)
217  break;
218  else
219  URCL_LOG_ERROR("read failed");
220  }
221  }
222  else
223  {
224  URCL_LOG_DEBUG("Self-pipe triggered");
225  return;
226  }
227  }
228 
229  // Check which fd has an activity
230  for (int i = 0; i <= maxfd_; i++)
231  {
232  if (FD_ISSET(i, &tempfds_))
233  {
234  URCL_LOG_DEBUG("Activity on FD %d", i);
235  if (listen_fd_ == i)
236  {
237  // Activity on the listen_fd means we have a new connection
238  handleConnect();
239  }
240  else
241  {
242  readData(i);
243  }
244  }
245  }
246 }
247 
248 void TCPServer::handleDisconnect(const int fd)
249 {
250  URCL_LOG_DEBUG("%d disconnected.", fd);
251  close(fd);
253  {
255  }
256  FD_CLR(fd, &masterfds_);
257 
258  for (size_t i = 0; i < client_fds_.size(); ++i)
259  {
260  if (client_fds_[i] == fd)
261  {
262  client_fds_.erase(client_fds_.begin() + i);
263  break;
264  }
265  }
266 }
267 
268 void TCPServer::readData(const int fd)
269 {
270  bzero(&input_buffer_, INPUT_BUFFER_SIZE); // clear input buffer
271  int nbytesrecv = recv(fd, input_buffer_, INPUT_BUFFER_SIZE, 0);
272  if (nbytesrecv > 0)
273  {
274  if (message_callback_)
275  {
276  message_callback_(fd, input_buffer_, nbytesrecv);
277  }
278  }
279  else
280  {
281  if (0 < nbytesrecv)
282  {
283  if (errno == ECONNRESET) // if connection gets reset by client, we want to suppress this output
284  {
285  URCL_LOG_DEBUG("client from FD %s sent a connection reset package.", fd);
286  }
287  {
288  URCL_LOG_ERROR("recv() on FD %d failed.", fd);
289  }
290  }
291  else
292  {
293  // normal disconnect
294  }
295  handleDisconnect(fd);
296  }
297 }
298 
300 {
301  while (keep_running_)
302  {
303  spin();
304  }
305  URCL_LOG_DEBUG("Finished worker thread of TCPServer");
306 }
307 
309 {
310  URCL_LOG_DEBUG("Starting worker thread");
311  keep_running_ = true;
312  worker_thread_ = std::thread(&TCPServer::worker, this);
313 }
314 
315 bool TCPServer::write(const int fd, const uint8_t* buf, const size_t buf_len, size_t& written)
316 {
317  written = 0;
318 
319  size_t remaining = buf_len;
320 
321  // handle partial sends
322  while (written < buf_len)
323  {
324  ssize_t sent = ::send(fd, buf + written, remaining, 0);
325 
326  if (sent <= 0)
327  {
328  URCL_LOG_ERROR("Sending data through socket failed.");
329  return false;
330  }
331 
332  written += sent;
333  remaining -= sent;
334  }
335 
336  return true;
337 }
338 
339 } // namespace comm
340 } // namespace urcl
#define URCL_LOG_ERROR(...)
Definition: log.h:26
std::thread worker_thread_
Definition: tcp_server.h:168
std::vector< int > client_fds_
Definition: tcp_server.h:178
void spin()
Event handler. Blocks until activity on any client or connection attempt.
Definition: tcp_server.cpp:194
std::atomic< int > listen_fd_
Definition: tcp_server.h:170
void start()
Start event handling.
Definition: tcp_server.cpp:308
void handleDisconnect(const int fd)
Definition: tcp_server.cpp:248
void readData(const int fd)
read data from socket
Definition: tcp_server.cpp:268
void shutdown()
Shut down the event listener thread. After calling this, no events will be handled anymore...
Definition: tcp_server.cpp:108
char input_buffer_[INPUT_BUFFER_SIZE]
Definition: tcp_server.h:184
std::function< void(const int)> disconnect_callback_
Definition: tcp_server.h:187
std::atomic< bool > keep_running_
Definition: tcp_server.h:167
void worker()
Runs spin() as long as keep_running_ is set to true.
Definition: tcp_server.cpp:299
uint32_t max_clients_allowed_
Definition: tcp_server.h:177
#define URCL_LOG_DEBUG(...)
Definition: log.h:23
std::function< void(const int, char *buffer, int nbytesrecv)> message_callback_
Definition: tcp_server.h:188
bool write(const int fd, const uint8_t *buf, const size_t buf_len, size_t &written)
Writes to a client.
Definition: tcp_server.cpp:315
#define URCL_LOG_WARN(...)
Definition: log.h:24
static const int INPUT_BUFFER_SIZE
Definition: tcp_server.h:183
void handleConnect()
Handles connection events.
Definition: tcp_server.cpp:160
std::function< void(const int)> new_connection_callback_
Definition: tcp_server.h:186


ur_client_library
Author(s): Thomas Timm Andersen, Simon Rasmussen, Felix Exner, Lea Steffen, Tristan Schnell
autogenerated on Tue Jul 4 2023 02:09:47