test_transport_tcp.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 /* Author: Josh Faust */
31 
32 /*
33  * Test version macros
34  */
35 
36 #include <gtest/gtest.h>
37 #include "ros/poll_set.h"
39 
40 #include <boost/bind.hpp>
41 #include <boost/thread.hpp>
42 
43 using namespace ros;
44 
45 class Synchronous : public testing::Test
46 {
47 public:
49  {
50  }
51 
53  {
54  }
55 
56 
57 protected:
58 
59  virtual void SetUp()
60  {
61  transports_[0] = boost::make_shared<TransportTCP>(static_cast<ros::PollSet*>(NULL), TransportTCP::SYNCHRONOUS);
62  transports_[1] = boost::make_shared<TransportTCP>(static_cast<ros::PollSet*>(NULL), TransportTCP::SYNCHRONOUS);
63 
64  if (!transports_[0]->listen(0, 100, TransportTCP::AcceptCallback()))
65  {
66  FAIL();
67  }
68 
69  if (!transports_[1]->connect("localhost", transports_[0]->getServerPort()))
70  {
71  FAIL();
72  }
73 
74  transports_[2] = transports_[0]->accept();
75  if (!transports_[2])
76  {
77  FAIL();
78  }
79  }
80 
81  virtual void TearDown()
82  {
83  for (int i = 0; i < 3; ++i)
84  {
85  if (transports_[i])
86  {
87  transports_[i]->close();
88  }
89  }
90  }
91 
92  TransportTCPPtr transports_[3];
93 };
94 
95 TEST_F(Synchronous, writeThenRead)
96 {
97  std::string msg = "test";
98  int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
99  ASSERT_EQ(written, (int32_t)msg.length());
100 
101  uint8_t buf[5];
102  memset(buf, 0, sizeof(buf));
103  int32_t read = transports_[2]->read(buf, msg.length());
104  ASSERT_EQ(read, (int32_t)msg.length());
105  ASSERT_STREQ((const char*)buf, msg.c_str());
106 }
107 
108 TEST_F(Synchronous, writeThenReadPartial)
109 {
110  std::string msg = "test";
111  int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
112  ASSERT_EQ(written, (int32_t)msg.length());
113 
114  uint8_t buf[5];
115  memset(buf, 0, sizeof(buf));
116  int32_t read = transports_[2]->read(buf, 1);
117  ASSERT_EQ(read, 1);
118  ASSERT_STREQ((const char*)buf, msg.substr(0, 1).c_str());
119 }
120 
121 void readThread(TransportTCPPtr transport, uint8_t* buf, uint32_t size, volatile int32_t* read_out, volatile bool* done_read)
122 {
123  while (*read_out < (int32_t)size)
124  {
125  *read_out += transport->read(buf + *read_out, size - *read_out);
126  }
127  *done_read = true;
128 }
129 
130 TEST_F(Synchronous, readWhileWriting)
131 {
132  for (int i = 0; i < 10; ++i)
133  {
134  const uint32_t buf_size = 1024*1024;
135  std::auto_ptr<uint8_t> read_buf(new uint8_t[buf_size]);
136 
137  std::stringstream ss;
138  for (int i = 0; i < 100000; ++i)
139  {
140  ss << i;
141  }
142 
143  std::string msg = ss.str();
144 
145  ASSERT_TRUE(msg.size() < buf_size);
146 
147  volatile int32_t read_out = 0;
148  volatile bool done_read = false;
149  boost::thread t(boost::bind(readThread, transports_[2], read_buf.get(), msg.size(), &read_out, &done_read));
150 
151  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
152 
153  int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
154  ASSERT_EQ(written, (int32_t)msg.length());
155 
156  while (!done_read)
157  {
158  boost::this_thread::sleep(boost::posix_time::milliseconds(1));
159  }
160 
161  ASSERT_EQ(done_read, true);
162  ASSERT_EQ(read_out, (int32_t)msg.length());
163  ASSERT_STREQ((const char*)read_buf.get(), msg.c_str());
164  }
165 }
166 
167 TEST_F(Synchronous, readAfterClose)
168 {
169  transports_[1]->close();
170 
171  uint8_t buf[5];
172  int32_t read = transports_[1]->read(buf, 1);
173  ASSERT_EQ(read, -1);
174 }
175 
176 TEST_F(Synchronous, writeAfterClose)
177 {
178  transports_[1]->close();
179 
180  std::string msg = "test";
181  int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
182  ASSERT_EQ(written, -1);
183 }
184 
185 class Polled : public testing::Test
186 {
187 public:
189  {
190  }
191 
193  {
194  }
195 
196 
197 protected:
198 
199  void connectionReceived(const TransportTCPPtr& transport)
200  {
201  transports_[2] = transport;
202  }
203 
204  void pollThread()
205  {
206  while (continue_)
207  {
208  poll_set_.update(10);
209  }
210  }
211 
212  void onReadable(const TransportPtr& transport, int index)
213  {
214  ASSERT_EQ(transport, transports_[index]);
215 
216  uint8_t b = 0;
217  while (transport->read(&b, 1) > 0)
218  {
219  ++bytes_read_[index];
220  }
221  }
222 
223  void onWriteable(const TransportPtr& transport, int index)
224  {
225  ASSERT_EQ(transport, transports_[index]);
226 
227  uint8_t b = 0;
228  transport->write(&b, 1);
229 
230  ++bytes_written_[index];
231  }
232 
233  void onDisconnect(const TransportPtr& transport, int index)
234  {
235  ASSERT_EQ(transport, transports_[index]);
236 
237  disconnected_[index] = true;
238  }
239 
240  virtual void SetUp()
241  {
242  bytes_read_[0] = 0;
243  bytes_read_[1] = 0;
244  bytes_read_[2] = 0;
245 
246  bytes_written_[0] = 0;
247  bytes_written_[1] = 0;
248  bytes_written_[2] = 0;
249 
250  disconnected_[0] = false;
251  disconnected_[1] = false;
252  disconnected_[2] = false;
253 
254  transports_[0] = boost::make_shared<TransportTCP>(&poll_set_);
255  transports_[1] = boost::make_shared<TransportTCP>(&poll_set_);
256 
257  if (!transports_[0]->listen(0, 100, boost::bind(&Polled::connectionReceived, this, _1)))
258  {
259  FAIL();
260  }
261 
262  if (!transports_[1]->connect("localhost", transports_[0]->getServerPort()))
263  {
264  FAIL();
265  }
266 
267  continue_ = true;
268  poll_thread_ = boost::thread(boost::bind(&Polled::pollThread, this));
269 
270  int count = 0;
271  while (!transports_[2] && count < 100)
272  {
273  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
274  }
275 
276  if (!transports_[2])
277  {
278  FAIL();
279  }
280 
281  transports_[1]->setReadCallback(boost::bind(&Polled::onReadable, this, _1, 1));
282  transports_[2]->setReadCallback(boost::bind(&Polled::onReadable, this, _1, 2));
283  transports_[1]->setWriteCallback(boost::bind(&Polled::onWriteable, this, _1, 1));
284  transports_[2]->setWriteCallback(boost::bind(&Polled::onWriteable, this, _1, 2));
285  transports_[1]->setDisconnectCallback(boost::bind(&Polled::onDisconnect, this, _1, 1));
286  transports_[2]->setDisconnectCallback(boost::bind(&Polled::onDisconnect, this, _1, 2));
287 
288  transports_[1]->enableRead();
289  transports_[2]->enableRead();
290  }
291 
292  virtual void TearDown()
293  {
294  for (int i = 0; i < 3; ++i)
295  {
296  transports_[i]->close();
297  }
298 
299  continue_ = false;
300  poll_thread_.join();
301  }
302 
303  TransportTCPPtr transports_[3];
304  int bytes_read_[3];
305  int bytes_written_[3];
306  bool disconnected_[3];
307 
309 
310  boost::thread poll_thread_;
311  volatile bool continue_;
312 };
313 
314 TEST_F(Polled, readAndWrite)
315 {
316  transports_[1]->enableWrite();
317  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
318  transports_[1]->disableWrite();
319 
320  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
321  ASSERT_GT(bytes_read_[2], 0);
322  ASSERT_EQ(bytes_read_[2], bytes_written_[1]);
323 
324  int old_read_val = bytes_read_[2];
325 
326  transports_[2]->enableWrite();
327  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
328  transports_[2]->disableWrite();
329 
330  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
331  ASSERT_EQ(bytes_read_[1], bytes_written_[2]);
332  ASSERT_EQ(old_read_val, bytes_read_[2]);
333 
334  transports_[1]->enableWrite();
335  transports_[2]->enableWrite();
336  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
337  transports_[1]->disableWrite();
338  transports_[2]->disableWrite();
339 
340  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
341  ASSERT_GT(bytes_read_[2], 0);
342  ASSERT_EQ(bytes_read_[2], bytes_written_[1]);
343  ASSERT_GT(bytes_read_[1], 0);
344  ASSERT_EQ(bytes_read_[1], bytes_written_[2]);
345 }
346 
347 TEST_F(Polled, enableDisableWrite)
348 {
349  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
350  ASSERT_EQ(bytes_read_[1], 0);
351  ASSERT_EQ(bytes_read_[2], 0);
352  ASSERT_EQ(bytes_written_[1], 0);
353  ASSERT_EQ(bytes_written_[2], 0);
354 
355  transports_[1]->enableWrite();
356  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
357  transports_[1]->disableWrite();
358 
359  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
360  ASSERT_GT(bytes_read_[2], 0);
361  ASSERT_GT(bytes_written_[1], 0);
362  int old_read_val = bytes_read_[2];
363  int old_written_val = bytes_written_[1];
364  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
365  ASSERT_EQ(bytes_read_[2], old_read_val);
366  ASSERT_EQ(bytes_written_[1], old_written_val);
367 }
368 
369 TEST_F(Polled, disconnectNoTraffic)
370 {
371  ASSERT_EQ(disconnected_[1], false);
372  ASSERT_EQ(disconnected_[2], false);
373 
374  transports_[1]->close();
375  ASSERT_EQ(disconnected_[1], true);
376 
377  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
378 
379  ASSERT_EQ(disconnected_[2], true);
380 }
381 
382 TEST_F(Polled, disconnectWriter)
383 {
384  ASSERT_EQ(disconnected_[1], false);
385  ASSERT_EQ(disconnected_[2], false);
386 
387  transports_[1]->enableWrite();
388  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
389  ASSERT_GT(bytes_read_[2], 0);
390 
391  transports_[1]->close();
392  ASSERT_EQ(disconnected_[1], true);
393 
394  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
395 
396  ASSERT_EQ(disconnected_[2], true);
397 }
398 
399 TEST_F(Polled, disconnectReader)
400 {
401  ASSERT_EQ(disconnected_[1], false);
402  ASSERT_EQ(disconnected_[2], false);
403 
404  transports_[2]->enableWrite();
405  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
406  ASSERT_GT(bytes_read_[1], 0);
407 
408  transports_[1]->close();
409  ASSERT_EQ(disconnected_[1], true);
410 
411  boost::this_thread::sleep(boost::posix_time::milliseconds(50));
412 
413  ASSERT_EQ(disconnected_[2], true);
414 }
415 
416 int main(int argc, char** argv)
417 {
418  testing::InitGoogleTest(&argc, argv);
419 
420  signal(SIGPIPE, SIG_IGN);
421 
422  return RUN_ALL_TESTS();
423 }
void connectionReceived(const TransportTCPPtr &transport)
boost::function< void(const TransportTCPPtr &)> AcceptCallback
void onWriteable(const TransportPtr &transport, int index)
virtual void TearDown()
boost::thread poll_thread_
TEST_F(Synchronous, writeThenRead)
int main(int argc, char **argv)
void onReadable(const TransportPtr &transport, int index)
volatile bool continue_
virtual void SetUp()
virtual void SetUp()
void readThread(TransportTCPPtr transport, uint8_t *buf, uint32_t size, volatile int32_t *read_out, volatile bool *done_read)
PollSet poll_set_
void onDisconnect(const TransportPtr &transport, int index)
virtual void TearDown()
ros::WallTime t


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:46