tcp_server.cpp
Go to the documentation of this file.
1 // this is for emacs file handling -*- mode: c++; indent-tabs-mode: nil -*-
2 
3 // -- BEGIN LICENSE BLOCK ----------------------------------------------
4 // Copyright 2021 FZI Forschungszentrum Informatik
5 //
6 // Licensed under the Apache License, Version 2.0 (the "License");
7 // you may not use this file except in compliance with the License.
8 // You may obtain a copy of the License at
9 //
10 // http://www.apache.org/licenses/LICENSE-2.0
11 //
12 // Unless required by applicable law or agreed to in writing, software
13 // distributed under the License is distributed on an "AS IS" BASIS,
14 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 // See the License for the specific language governing permissions and
16 // limitations under the License.
17 // -- END LICENSE BLOCK ------------------------------------------------
18 
19 //----------------------------------------------------------------------
26 //----------------------------------------------------------------------
27 
28 #include <ur_client_library/log.h>
30 
31 #include <iostream>
32 
33 #include <sstream>
34 #include <strings.h>
35 #include <cstring>
36 #include <fcntl.h>
37 #include <algorithm>
38 #include <system_error>
39 
40 namespace urcl
41 {
42 namespace comm
43 {
44 TCPServer::TCPServer(const int port) : port_(port), maxfd_(0), max_clients_allowed_(0)
45 {
46  init();
47  bind();
48  startListen();
49 }
50 
52 {
53  URCL_LOG_DEBUG("Destroying TCPServer object.");
54  shutdown();
55  close(listen_fd_);
56 }
57 
59 {
60  int err = (listen_fd_ = socket(AF_INET, SOCK_STREAM, 0));
61  if (err == -1)
62  {
63  throw std::system_error(std::error_code(errno, std::generic_category()), "Failed to create socket endpoint");
64  }
65  int flag = 1;
66  setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int));
67  setsockopt(listen_fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(int));
68 
69  URCL_LOG_DEBUG("Created socket with FD %d", (int)listen_fd_);
70 
71  FD_ZERO(&masterfds_);
72  FD_ZERO(&tempfds_);
73 
74  // Create self-pipe for interrupting the worker loop
75  if (pipe(self_pipe_) == -1)
76  {
77  throw std::system_error(std::error_code(errno, std::generic_category()), "Error creating self-pipe");
78  }
79  URCL_LOG_DEBUG("Created read pipe at FD %d", self_pipe_[0]);
80  FD_SET(self_pipe_[0], &masterfds_);
81 
82  // Make read and write ends of pipe nonblocking
83  int flags;
84  flags = fcntl(self_pipe_[0], F_GETFL);
85  if (flags == -1)
86  {
87  throw std::system_error(std::error_code(errno, std::generic_category()), "fcntl-F_GETFL");
88  }
89  flags |= O_NONBLOCK; // Make read end nonblocking
90  if (fcntl(self_pipe_[0], F_SETFL, flags) == -1)
91  {
92  throw std::system_error(std::error_code(errno, std::generic_category()), "fcntl-F_SETFL");
93  }
94 
95  flags = fcntl(self_pipe_[1], F_GETFL);
96  if (flags == -1)
97  {
98  throw std::system_error(std::error_code(errno, std::generic_category()), "fcntl-F_GETFL");
99  }
100  flags |= O_NONBLOCK; // Make write end nonblocking
101  if (fcntl(self_pipe_[1], F_SETFL, flags) == -1)
102  {
103  throw std::system_error(std::error_code(errno, std::generic_category()), "fcntl-F_SETFL");
104  }
105 }
106 
108 {
109  keep_running_ = false;
110 
111  // This is basically the self-pipe trick. Writing to the pipe will trigger an event for the event
112  // handler which will stop the select() call from blocking.
113  if (::write(self_pipe_[1], "x", 1) == -1 && errno != EAGAIN)
114  {
115  throw std::system_error(std::error_code(errno, std::generic_category()), "Writing to self-pipe failed.");
116  }
117 
118  // After the event loop has finished the thread will be joinable.
119  if (worker_thread_.joinable())
120  {
121  worker_thread_.join();
122  URCL_LOG_DEBUG("Worker thread joined.");
123  }
124 }
125 
127 {
128  struct sockaddr_in server_addr;
129  server_addr.sin_family = AF_INET;
130 
131  // INADDR_ANY is a special constant that signalizes "ANY IFACE",
132  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
133  server_addr.sin_port = htons(port_);
134  int err = ::bind(listen_fd_, (struct sockaddr*)&server_addr, sizeof(server_addr));
135  if (err == -1)
136  {
137  std::ostringstream ss;
138  ss << "Failed to bind socket for port " << port_ << " to address. Reason: " << strerror(errno);
139  throw std::system_error(std::error_code(errno, std::generic_category()), ss.str());
140  }
141  URCL_LOG_DEBUG("Bound %d:%d to FD %d", server_addr.sin_addr.s_addr, port_, (int)listen_fd_);
142 
143  FD_SET(listen_fd_, &masterfds_);
144  maxfd_ = std::max((int)listen_fd_, self_pipe_[0]);
145 }
146 
148 {
149  int err = listen(listen_fd_, 1);
150  if (err == -1)
151  {
152  std::ostringstream ss;
153  ss << "Failed to start listen on port " << port_;
154  throw std::system_error(std::error_code(errno, std::generic_category()), ss.str());
155  }
156  URCL_LOG_DEBUG("Listening on port %d", port_);
157 }
158 
160 {
161  struct sockaddr_storage client_addr;
162  socklen_t addrlen = sizeof(client_addr);
163  int client_fd = accept(listen_fd_, (struct sockaddr*)&client_addr, &addrlen);
164  if (client_fd < 0)
165  {
166  std::ostringstream ss;
167  ss << "Failed to accept connection request on port " << port_;
168  throw std::system_error(std::error_code(errno, std::generic_category()), ss.str());
169  }
170 
172  {
173  client_fds_.push_back(client_fd);
174  FD_SET(client_fd, &masterfds_);
175  if (client_fd > maxfd_)
176  {
177  maxfd_ = std::max(client_fd, self_pipe_[0]);
178  }
180  {
181  new_connection_callback_(client_fd);
182  }
183  }
184  else
185  {
186  URCL_LOG_WARN("Connection attempt on port %d while maximum number of clients (%d) is already connected. Closing "
187  "connection.",
189  close(client_fd);
190  }
191 }
192 
194 {
196 
197  // blocks until activity on any socket from tempfds
198  int sel = select(maxfd_ + 1, &tempfds_, NULL, NULL, NULL);
199  if (sel < 0)
200  {
201  URCL_LOG_ERROR("select() failed. Shutting down socket event handler.");
202  keep_running_ = false;
203  return;
204  }
205 
206  // Read part if pipe-trick. This will help interrupting the event handler thread.
207  if (FD_ISSET(self_pipe_[0], &masterfds_))
208  {
209  URCL_LOG_DEBUG("Activity on self-pipe");
210  char buffer;
211  if (read(self_pipe_[0], &buffer, 1) == -1)
212  {
213  while (true)
214  {
215  if (errno == EAGAIN)
216  break;
217  else
218  URCL_LOG_ERROR("read failed");
219  }
220  }
221  else
222  {
223  URCL_LOG_DEBUG("Self-pipe triggered");
224  return;
225  }
226  }
227 
228  // Check which fd has an activity
229  for (int i = 0; i <= maxfd_; i++)
230  {
231  if (FD_ISSET(i, &tempfds_))
232  {
233  URCL_LOG_DEBUG("Activity on FD %d", i);
234  if (listen_fd_ == i)
235  {
236  // Activity on the listen_fd means we have a new connection
237  handleConnect();
238  }
239  else
240  {
241  readData(i);
242  }
243  }
244  }
245 }
246 
247 void TCPServer::handleDisconnect(const int fd)
248 {
249  URCL_LOG_DEBUG("%d disconnected.", fd);
250  close(fd);
252  {
254  }
255  FD_CLR(fd, &masterfds_);
256 
257  for (size_t i = 0; i < client_fds_.size(); ++i)
258  {
259  if (client_fds_[i] == fd)
260  {
261  client_fds_.erase(client_fds_.begin() + i);
262  break;
263  }
264  }
265 }
266 
267 void TCPServer::readData(const int fd)
268 {
269  bzero(&input_buffer_, INPUT_BUFFER_SIZE); // clear input buffer
270  int nbytesrecv = recv(fd, input_buffer_, INPUT_BUFFER_SIZE, 0);
271  if (nbytesrecv > 0)
272  {
273  if (message_callback_)
274  {
276  }
277  }
278  else
279  {
280  if (0 < nbytesrecv)
281  {
282  if (errno == ECONNRESET) // if connection gets reset by client, we want to suppress this output
283  {
284  URCL_LOG_DEBUG("client from FD %s sent a connection reset package.", fd);
285  }
286  {
287  URCL_LOG_ERROR("recv() on FD %d failed.", fd);
288  }
289  }
290  else
291  {
292  // normal disconnect
293  }
294  handleDisconnect(fd);
295  }
296 }
297 
299 {
300  while (keep_running_)
301  {
302  spin();
303  }
304  URCL_LOG_DEBUG("Finished worker thread of TCPServer");
305 }
306 
308 {
309  URCL_LOG_DEBUG("Starting worker thread");
310  keep_running_ = true;
311  worker_thread_ = std::thread(&TCPServer::worker, this);
312 }
313 
314 bool TCPServer::write(const int fd, const uint8_t* buf, const size_t buf_len, size_t& written)
315 {
316  written = 0;
317 
318  size_t remaining = buf_len;
319 
320  // handle partial sends
321  while (written < buf_len)
322  {
323  ssize_t sent = ::send(fd, buf + written, remaining, 0);
324 
325  if (sent <= 0)
326  {
327  URCL_LOG_ERROR("Sending data through socket failed.");
328  return false;
329  }
330 
331  written += sent;
332  remaining -= sent;
333  }
334 
335  return true;
336 }
337 
338 } // namespace comm
339 } // namespace urcl
#define URCL_LOG_ERROR(...)
Definition: log.h:37
std::thread worker_thread_
Definition: tcp_server.h:168
std::vector< int > client_fds_
Definition: tcp_server.h:178
void spin()
Event handler. Blocks until activity on any client or connection attempt.
Definition: tcp_server.cpp:193
std::atomic< int > listen_fd_
Definition: tcp_server.h:170
void start()
Start event handling.
Definition: tcp_server.cpp:307
void handleDisconnect(const int fd)
Definition: tcp_server.cpp:247
void readData(const int fd)
read data from socket
Definition: tcp_server.cpp:267
void shutdown()
Shut down the event listener thread. After calling this, no events will be handled anymore...
Definition: tcp_server.cpp:107
char input_buffer_[INPUT_BUFFER_SIZE]
Definition: tcp_server.h:184
std::function< void(const int)> disconnect_callback_
Definition: tcp_server.h:187
std::atomic< bool > keep_running_
Definition: tcp_server.h:167
void worker()
Runs spin() as long as keep_running_ is set to true.
Definition: tcp_server.cpp:298
uint32_t max_clients_allowed_
Definition: tcp_server.h:177
#define URCL_LOG_DEBUG(...)
Definition: log.h:34
bool write(const int fd, const uint8_t *buf, const size_t buf_len, size_t &written)
Writes to a client.
Definition: tcp_server.cpp:314
#define URCL_LOG_WARN(...)
Definition: log.h:35
static const int INPUT_BUFFER_SIZE
Definition: tcp_server.h:183
void handleConnect()
Handles connection events.
Definition: tcp_server.cpp:159
std::function< void(const int, char *buffer)> message_callback_
Definition: tcp_server.h:188
std::function< void(const int)> new_connection_callback_
Definition: tcp_server.h:186


ur_client_library
Author(s): Thomas Timm Andersen, Simon Rasmussen, Felix Exner, Lea Steffen, Tristan Schnell
autogenerated on Sun May 9 2021 02:16:26