36 #include <gtest/gtest.h> 40 #include <boost/bind.hpp> 41 #include <boost/thread.hpp> 69 if (!transports_[1]->connect(
"localhost", transports_[0]->getServerPort()))
74 transports_[2] = transports_[0]->accept();
83 for (
int i = 0; i < 3; ++i)
87 transports_[i]->close();
97 std::string msg =
"test";
98 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
99 ASSERT_EQ(written, (int32_t)msg.length());
102 memset(buf, 0,
sizeof(buf));
103 int32_t read = transports_[2]->read(buf, msg.length());
104 ASSERT_EQ(read, (int32_t)msg.length());
105 ASSERT_STREQ((
const char*)buf, msg.c_str());
110 std::string msg =
"test";
111 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
112 ASSERT_EQ(written, (int32_t)msg.length());
115 memset(buf, 0,
sizeof(buf));
116 int32_t read = transports_[2]->read(buf, 1);
118 ASSERT_STREQ((
const char*)buf, msg.substr(0, 1).c_str());
123 while (*read_out < (int32_t)size)
125 *read_out += transport->read(buf + *read_out, size - *read_out);
132 for (
int i = 0; i < 10; ++i)
134 const uint32_t buf_size = 1024*1024;
135 std::auto_ptr<uint8_t> read_buf(
new uint8_t[buf_size]);
137 std::stringstream ss;
138 for (
int i = 0; i < 100000; ++i)
143 std::string msg = ss.str();
145 ASSERT_TRUE(msg.size() < buf_size);
147 volatile int32_t read_out = 0;
148 volatile bool done_read =
false;
149 boost::thread
t(boost::bind(
readThread, transports_[2], read_buf.get(), msg.size(), &read_out, &done_read));
151 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
153 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
154 ASSERT_EQ(written, (int32_t)msg.length());
158 boost::this_thread::sleep(boost::posix_time::milliseconds(1));
161 ASSERT_EQ(done_read,
true);
162 ASSERT_EQ(read_out, (int32_t)msg.length());
163 ASSERT_STREQ((
const char*)read_buf.get(), msg.c_str());
169 transports_[1]->close();
172 int32_t read = transports_[1]->read(buf, 1);
178 transports_[1]->close();
180 std::string msg =
"test";
181 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
182 ASSERT_EQ(written, -1);
201 transports_[2] = transport;
208 poll_set_.update(10);
214 ASSERT_EQ(transport, transports_[index]);
217 while (transport->read(&b, 1) > 0)
219 ++bytes_read_[index];
225 ASSERT_EQ(transport, transports_[index]);
228 transport->write(&b, 1);
230 ++bytes_written_[index];
235 ASSERT_EQ(transport, transports_[index]);
237 disconnected_[index] =
true;
246 bytes_written_[0] = 0;
247 bytes_written_[1] = 0;
248 bytes_written_[2] = 0;
250 disconnected_[0] =
false;
251 disconnected_[1] =
false;
252 disconnected_[2] =
false;
254 transports_[0] = boost::make_shared<TransportTCP>(&poll_set_);
255 transports_[1] = boost::make_shared<TransportTCP>(&poll_set_);
262 if (!transports_[1]->connect(
"localhost", transports_[0]->getServerPort()))
271 while (!transports_[2] && count < 100)
273 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
288 transports_[1]->enableRead();
289 transports_[2]->enableRead();
294 for (
int i = 0; i < 3; ++i)
296 transports_[i]->close();
305 int bytes_written_[3];
306 bool disconnected_[3];
316 transports_[1]->enableWrite();
317 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
318 transports_[1]->disableWrite();
320 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
321 ASSERT_GT(bytes_read_[2], 0);
322 ASSERT_EQ(bytes_read_[2], bytes_written_[1]);
324 int old_read_val = bytes_read_[2];
326 transports_[2]->enableWrite();
327 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
328 transports_[2]->disableWrite();
330 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
331 ASSERT_EQ(bytes_read_[1], bytes_written_[2]);
332 ASSERT_EQ(old_read_val, bytes_read_[2]);
334 transports_[1]->enableWrite();
335 transports_[2]->enableWrite();
336 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
337 transports_[1]->disableWrite();
338 transports_[2]->disableWrite();
340 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
341 ASSERT_GT(bytes_read_[2], 0);
342 ASSERT_EQ(bytes_read_[2], bytes_written_[1]);
343 ASSERT_GT(bytes_read_[1], 0);
344 ASSERT_EQ(bytes_read_[1], bytes_written_[2]);
349 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
350 ASSERT_EQ(bytes_read_[1], 0);
351 ASSERT_EQ(bytes_read_[2], 0);
352 ASSERT_EQ(bytes_written_[1], 0);
353 ASSERT_EQ(bytes_written_[2], 0);
355 transports_[1]->enableWrite();
356 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
357 transports_[1]->disableWrite();
359 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
360 ASSERT_GT(bytes_read_[2], 0);
361 ASSERT_GT(bytes_written_[1], 0);
362 int old_read_val = bytes_read_[2];
363 int old_written_val = bytes_written_[1];
364 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
365 ASSERT_EQ(bytes_read_[2], old_read_val);
366 ASSERT_EQ(bytes_written_[1], old_written_val);
371 ASSERT_EQ(disconnected_[1],
false);
372 ASSERT_EQ(disconnected_[2],
false);
374 transports_[1]->close();
375 ASSERT_EQ(disconnected_[1],
true);
377 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
379 ASSERT_EQ(disconnected_[2],
true);
384 ASSERT_EQ(disconnected_[1],
false);
385 ASSERT_EQ(disconnected_[2],
false);
387 transports_[1]->enableWrite();
388 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
389 ASSERT_GT(bytes_read_[2], 0);
391 transports_[1]->close();
392 ASSERT_EQ(disconnected_[1],
true);
394 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
396 ASSERT_EQ(disconnected_[2],
true);
401 ASSERT_EQ(disconnected_[1],
false);
402 ASSERT_EQ(disconnected_[2],
false);
404 transports_[2]->enableWrite();
405 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
406 ASSERT_GT(bytes_read_[1], 0);
408 transports_[1]->close();
409 ASSERT_EQ(disconnected_[1],
true);
411 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
413 ASSERT_EQ(disconnected_[2],
true);
416 int main(
int argc,
char** argv)
418 testing::InitGoogleTest(&argc, argv);
420 signal(SIGPIPE, SIG_IGN);
422 return RUN_ALL_TESTS();
void connectionReceived(const TransportTCPPtr &transport)
boost::function< void(const TransportTCPPtr &)> AcceptCallback
void onWriteable(const TransportPtr &transport, int index)
boost::thread poll_thread_
TEST_F(Synchronous, writeThenRead)
int main(int argc, char **argv)
void onReadable(const TransportPtr &transport, int index)
void readThread(TransportTCPPtr transport, uint8_t *buf, uint32_t size, volatile int32_t *read_out, volatile bool *done_read)
void onDisconnect(const TransportPtr &transport, int index)