00001 #include "Tcp.h"
00002
00003 namespace castor { namespace net { namespace channels {
00004
00005 Tcp::Connection::Connection(asio::io_service &service, Tcp *tcp) :
00006 address(), socket(service), channel(tcp), closed(false), buffer()
00007 {
00008 }
00009
00010 Tcp::Connection::Connection(const Connection &other) :
00011 address(other.address), socket(other.channel->getIoService()),
00012 channel(other.channel), closed(false), buffer()
00013 {
00014 }
00015
00016 Tcp::Connection::~Connection() {
00017 }
00018
00019 void Tcp::Connection::handleRead(const asio::error_code& error, size_t count) {
00020
00021
00022 bool socketFailure = false;
00023
00024 if ((!error) && (!this->closed)) {
00025
00026 NetAddressPtr address;
00027
00028 try {
00029
00030 address = NetAddress::create(
00031 this->channel->getAddress().getSpicaSpecific(),
00032 this->socket.remote_endpoint());
00033
00034 } catch (const std::exception &e) {
00035 std::cout << "Error: " << e.what() << " closing socket." << std::endl;
00036 socketFailure = true;
00037 }
00038
00039 if (!socketFailure) {
00040
00041
00042
00043 this->channel->signalReceive(this->buffer, count, address);
00044
00045 if (this->channel->async) {
00046 this->socket.async_read_some(
00047 asio::buffer(this->buffer, sizeof(this->buffer)),
00048 this->channel->strand.wrap(
00049 boost::bind(&Connection::handleRead, this,
00050 asio::placeholders::error,
00051 asio::placeholders::bytes_transferred)));
00052 }
00053
00054 return;
00055 }
00056 }
00057
00058 if (this->channel->peerRemoved) {
00059
00060 this->channel->peerRemoved(this->address);
00061 }
00062
00063 this->channel->remove(this->address);
00064 }
00065
00066 Tcp::Connection &Tcp::Connection::operator=(const Tcp::Connection &other) {
00067 this->address = other.address;
00068 this->channel = other.channel;
00069 return *this;
00070 }
00071
00072 asio::ip::tcp::socket &Tcp::Connection::getSocket() {
00073 return this->socket;
00074 }
00075
00076 void Tcp::Connection::start(const NetAddress &address) {
00077
00078 this->address = address;
00079
00080
00081
00082
00083 this->socket.set_option(asio::ip::tcp::no_delay(true));
00084
00085 if (this->channel->async) {
00086 this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)),
00087 this->channel->strand.wrap(
00088 boost::bind(&Connection::handleRead, this,
00089 asio::placeholders::error,
00090 asio::placeholders::bytes_transferred)));
00091 }
00092 }
00093
00094 void Tcp::Connection::stop() {
00095
00096 if (this->closed) return;
00097
00098 this->socket.cancel();
00099
00100 this->socket.close();
00101
00102 this->closed = true;
00103 }
00104
00105 Tcp::Tcp(asio::io_service &service) :
00106 CastorChannel(service), connections(), service(&service), acceptor(service),
00107 socket(service), endpoint(), mutex(), opened(false)
00108 {
00109 }
00110
00111 Tcp::Tcp(asio::io_service &service, const NetAddress &a) :
00112 CastorChannel(service), connections(), service(&service), acceptor(service),
00113 socket(service), endpoint(a.getAddress(), a.getPort()), mutex(), opened(false)
00114 {
00115 this->address = a;
00116 }
00117
00118 Tcp::~Tcp() {
00119 }
00120
00121 void Tcp::setAsync(bool async) {
00122 this->async = async;
00123 if ((async) && (isOpen())) {
00124 this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)),
00125 this->strand.wrap(
00126 boost::bind(&Tcp::handleRead, this,
00127 asio::placeholders::error,
00128 asio::placeholders::bytes_transferred)));
00129 }
00130 }
00131
00132 void Tcp::add(ConnectionPtr connection) {
00133
00134 boost::mutex::scoped_lock lock(this->mutex);
00135
00136 ConnectionMap::iterator itr = this->connections.find(connection->address);
00137
00138 if (itr == this->connections.end()) {
00139 this->connections[connection->address] = connection;
00140 }
00141 }
00142
00143 void Tcp::remove(const NetAddress &address) {
00144
00145 boost::mutex::scoped_lock lock(this->mutex);
00146
00147 ConnectionMap::iterator itr = this->connections.find(address);
00148
00149 if (itr != this->connections.end()) {
00150 this->connections.erase(itr);
00151 }
00152 }
00153
00154 NetAddress Tcp::getLocalAddress() {
00155 if (this->acceptor.is_open()) {
00156
00157
00158 return NetAddress(this->address.getSpicaSpecific(), this->acceptor.local_endpoint());
00159
00160 } else if (this->socket.is_open()) {
00161
00162
00163 return NetAddress(this->address.getSpicaSpecific(), this->socket.local_endpoint());
00164 }
00165
00166
00167 return this->address;
00168 }
00169
00170 void Tcp::open() {
00171 this->acceptor.open(this->endpoint.protocol());
00172 }
00173
00174 void Tcp::open(const NetAddress &a) {
00175 this->address = a;
00176 }
00177
00178 void Tcp::bind() {
00179 this->acceptor.bind(this->endpoint);
00180 }
00181
00182 void Tcp::reuseAddress(bool on) {
00183 if (this->acceptor.is_open()) {
00184 this->acceptor.set_option(asio::socket_base::reuse_address(on));
00185
00186 } else if (this->socket.is_open()) {
00187 this->socket.set_option(asio::socket_base::reuse_address(on));
00188 }
00189 }
00190
00191 bool Tcp::reuseAddress() {
00192 asio::socket_base::reuse_address result;
00193 if (this->acceptor.is_open()) {
00194 this->acceptor.get_option(result);
00195 } else if (this->socket.is_open()) {
00196 this->socket.get_option(result);
00197 }
00198 return result.value();
00199 }
00200
00201 void Tcp::enableLoopback(bool) {
00202 }
00203
00204 bool Tcp::enableLoopback() {
00205 return false;
00206 }
00207
00208 void Tcp::enableBroadcast(bool) {
00209 }
00210
00211 bool Tcp::enableBroadcast() {
00212 return false;
00213 }
00214
00215 void Tcp::multicastHops(int) {
00216 }
00217
00218 int Tcp::multicastHops() {
00219 return 0;
00220 }
00221
00222 void Tcp::joinGroup(const asio::ip::address &) {
00223 }
00224
00225 void Tcp::leaveGroup(const asio::ip::address &) {
00226 }
00227
00228 void Tcp::accept() throw(NetException) {
00229
00230 this->acceptor.listen();
00231
00232 if (this->address.getPort() == 0) {
00233 if (this->acceptor.is_open()) {
00234 this->address.setPort(this->acceptor.local_endpoint().port());
00235 } else if (this->socket.is_open()) {
00236 this->address.setPort(this->socket.local_endpoint().port());
00237 }
00238 }
00239
00240 this->mode = mode_accept;
00241 this->opened = true;
00242
00243 ConnectionPtr connection = ConnectionPtr(new Connection(*this->service, this));
00244
00245 this->acceptor.async_accept(connection->getSocket(), this->remoteEndpoint,
00246 this->strand.wrap(boost::bind(&Tcp::handleAccept,
00247 this, connection, asio::placeholders::error)));
00248 }
00249
00250 void Tcp::connect() throw(NetException) {
00251
00252 this->mode = mode_connect;
00253 this->opened = true;
00254
00255 this->socket.async_connect(this->endpoint,
00256 this->strand.wrap(boost::bind(&Tcp::handleConnect,
00257 this, asio::placeholders::error)));
00258 }
00259
00260 void Tcp::close() throw(NetException) {
00261
00262 if (!this->opened) return;
00263
00264 this->opened = false;
00265
00266 boost::mutex::scoped_lock lock(this->mutex);
00267
00268 if (this->mode == mode_accept) {
00269
00270 for (ConnectionMap::iterator itr = this->connections.begin();
00271 itr != this->connections.end(); itr++)
00272 {
00273 itr->second->stop();
00274 }
00275
00276 this->acceptor.cancel();
00277 this->acceptor.close();
00278
00279 } else if (mode == mode_connect) {
00280
00281 this->socket.cancel();
00282 this->socket.close();
00283 }
00284 }
00285
00286 void Tcp::close(const NetAddress &address) throw(NetException) {
00287
00288 ConnectionMap::iterator itr = this->connections.find(address);
00289
00290 if (itr != this->connections.end()) {
00291 itr->second->stop();
00292 }
00293 }
00294
00295 bool Tcp::isOpen() {
00296
00297 if (this->mode == mode_accept) {
00298
00299 return this->acceptor.is_open();
00300
00301 } else if (this->mode == mode_connect) {
00302 return this->socket.is_open();
00303 }
00304
00305 return false;
00306 }
00307
00308 size_t Tcp::receive(char *data, size_t length) {
00309
00310 if (this->mode == mode_connect) {
00311 return this->socket.read_some(asio::buffer(data, length));
00312 }
00313
00314 return -1;
00315 }
00316
00317 size_t Tcp::receive(char *data, size_t length, NetAddress &remote) {
00318
00319 if (this->mode == mode_connect) {
00320 remote = this->address;
00321 return this->socket.read_some(asio::buffer(data, length));
00322 }
00323
00324 return -1;
00325 }
00326
00327 void Tcp::send(const char *data, size_t length) {
00328
00329 if (this->mode == mode_connect) {
00330
00331 if (this->async) {
00332 this->socket.async_send(asio::buffer(data, length),
00333 this->strand.wrap(
00334 boost::bind(&Tcp::handleSend, this,
00335 asio::placeholders::error,
00336 asio::placeholders::bytes_transferred)));
00337 } else {
00338 this->socket.send(asio::buffer(data, length));
00339 }
00340 }
00341 }
00342
00343 void Tcp::send(const char *data, size_t length, const NetAddress &remote) {
00344
00345 if (this->mode == mode_accept) {
00346
00347 boost::mutex::scoped_lock lock(mutex);
00348
00349 for (ConnectionMap::iterator itr = this->connections.begin();
00350 itr != this->connections.end(); itr++)
00351 {
00352 if (itr->first == remote) {
00353
00354 if (this->async) {
00355 itr->second->socket.async_send(asio::buffer(data, length),
00356 this->strand.wrap(
00357 boost::bind(&Tcp::handleSend, this,
00358 asio::placeholders::error,
00359 asio::placeholders::bytes_transferred)));
00360 } else {
00361
00362 itr->second->socket.send(asio::buffer(data, length));
00363 }
00364
00365 break;
00366 }
00367 }
00368
00369 } else if (this->mode == mode_connect) {
00370
00371 if (this->async) {
00372 this->socket.async_send(asio::buffer(data, length),
00373 this->strand.wrap(
00374 boost::bind(&Tcp::handleSend, this,
00375 asio::placeholders::error,
00376 asio::placeholders::bytes_transferred)));
00377 } else {
00378 this->socket.send(asio::buffer(data, length));
00379 }
00380 }
00381 }
00382
00383 void Tcp::handleAccept(ConnectionPtr connection, const asio::error_code& error) {
00384
00385 if (!error) {
00386
00387 connection->start(NetAddress(ss_none, this->remoteEndpoint));
00388
00389 add(connection);
00390
00391 if (this->peerAdded) {
00392 this->peerAdded(connection->address);
00393 }
00394
00395 ConnectionPtr connection = ConnectionPtr(new Connection(*this->service, this));
00396
00397 this->acceptor.async_accept(connection->getSocket(), this->remoteEndpoint,
00398 this->strand.wrap(
00399 boost::bind(&Tcp::handleAccept, this,
00400 connection, asio::placeholders::error)));
00401
00402 return;
00403 }
00404 }
00405
00406 void Tcp::handleConnect(const asio::error_code& error) {
00407
00408 if (!error) {
00409
00410 this->socket.set_option(asio::ip::tcp::no_delay(true));
00411
00412 if (this->connectionEstablished) {
00413 this->connectionEstablished(this->address);
00414 }
00415
00416 if (this->async) {
00417 this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)),
00418 this->strand.wrap(
00419 boost::bind(&Tcp::handleRead, this,
00420 asio::placeholders::error,
00421 asio::placeholders::bytes_transferred)));
00422 }
00423
00424 } else if (this->opened) {
00425
00426 asio::deadline_timer timer(*this->service, boost::posix_time::milliseconds(250));
00427 timer.wait();
00428
00429 this->socket.async_connect(this->endpoint,
00430 this->strand.wrap(
00431 boost::bind(&Tcp::handleConnect, this,
00432 asio::placeholders::error)));
00433 }
00434 }
00435
00436 void Tcp::handleSend(const asio::error_code&, size_t) {
00437
00438
00439
00440
00441 }
00442
00443 void Tcp::handleRead(const asio::error_code& error, size_t count) {
00444
00445
00446 bool socketFailure = false;
00447
00448 if (!error) {
00449
00450 NetAddressPtr address;
00451
00452 try {
00453
00454 address = NetAddress::create(
00455 this->address.getSpicaSpecific(),
00456 this->socket.remote_endpoint());
00457
00458 } catch (const std::exception &e) {
00459 std::cout << e.what() << std::endl;
00460 socketFailure = true;
00461 }
00462
00463 if (!socketFailure) {
00464
00465
00466
00467
00468
00469 this->signalReceive(this->buffer, count, address);
00470
00471
00472
00473
00474
00475
00476 if (this->async) {
00477
00478 this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)),
00479 this->strand.wrap(
00480 boost::bind(&Tcp::handleRead, this,
00481 asio::placeholders::error,
00482 asio::placeholders::bytes_transferred)));
00483 }
00484
00485 return;
00486 }
00487 }
00488
00489 if (this->mode == mode_accept) {
00490
00491
00492
00493 close();
00494
00495
00496 } else if (this->opened) {
00497
00498 if (this->connectionLost) {
00499 this->connectionLost(this->address);
00500 }
00501
00502 this->socket.close();
00503
00504 asio::deadline_timer timer(*this->service, boost::posix_time::milliseconds(250));
00505 timer.wait();
00506
00507 this->socket.async_connect(this->endpoint,
00508 this->strand.wrap(
00509 boost::bind(&Tcp::handleConnect, this,
00510 asio::placeholders::error)));
00511 }
00512 }
00513
00514 } } }
00515