Tcp.cpp
Go to the documentation of this file.
00001 #include "Tcp.h"
00002 
00003         namespace castor { namespace net { namespace channels {
00004 
00005                 Tcp::Connection::Connection(asio::io_service &service, Tcp *tcp) :
00006                         address(), socket(service), channel(tcp), closed(false), buffer()
00007                 {
00008                 }
00009 
00010                 Tcp::Connection::Connection(const Connection &other) :
00011                         address(other.address), socket(other.channel->getIoService()),
00012                         channel(other.channel), closed(false), buffer()
00013                 {
00014                 }
00015 
00016                 Tcp::Connection::~Connection() {
00017                 }
00018 
00019                 void Tcp::Connection::handleRead(const asio::error_code& error, size_t count) {
00020 //                      std::cout << "Read " << count << std::endl;
00021 
00022                         bool socketFailure = false;
00023 
00024                         if ((!error) && (!this->closed)) {
00025 
00026                                 NetAddressPtr address;
00027 
00028                                 try {
00029 
00030                                         address = NetAddress::create(
00031                                                         this->channel->getAddress().getSpicaSpecific(),
00032                                                         this->socket.remote_endpoint());
00033 
00034                                 } catch (const std::exception &e) {
00035                                         std::cout << "Error: " << e.what() << " closing socket." << std::endl;
00036                                         socketFailure = true;
00037                                 }
00038 
00039                                 if (!socketFailure) {
00040 
00041         //std::cout << " XXXXX geminga: " << count << ": " << std::string(this->buffer, count) << " " << *address << std::endl;
00042 
00043                                 this->channel->signalReceive(this->buffer, count, address);
00044 
00045                                 if (this->channel->async) {
00046                                         this->socket.async_read_some(
00047                                                         asio::buffer(this->buffer, sizeof(this->buffer)),
00048                                                         this->channel->strand.wrap(
00049                                                                         boost::bind(&Connection::handleRead, this,
00050                                                                                         asio::placeholders::error,
00051                                                                                         asio::placeholders::bytes_transferred)));
00052                                 }
00053                                 
00054                                 return;
00055                         }
00056                 }
00057 
00058                 if (this->channel->peerRemoved) {
00059 //                      std::cout << error.message() << " " << this->closed << std::endl;
00060                         this->channel->peerRemoved(this->address);
00061                 }
00062 
00063                 this->channel->remove(this->address);
00064         }
00065 
00066         Tcp::Connection &Tcp::Connection::operator=(const Tcp::Connection &other) {
00067                 this->address = other.address;
00068                 this->channel = other.channel;
00069                 return *this;
00070         }
00071 
00072         asio::ip::tcp::socket &Tcp::Connection::getSocket() {
00073                 return this->socket;
00074         }
00075 
00076         void Tcp::Connection::start(const NetAddress &address) {
00077 
00078                 this->address = address;
00079 //              *NetAddress::create(
00080 //                              this->channel->getAddress().getSpicaSpecific(),
00081 //                              this->socket.remote_endpoint());
00082 
00083                 this->socket.set_option(asio::ip::tcp::no_delay(true));
00084 
00085                 if (this->channel->async) {
00086                         this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)),
00087                                                 this->channel->strand.wrap(
00088                                                                 boost::bind(&Connection::handleRead, this,
00089                                                                                 asio::placeholders::error,
00090                                                                                 asio::placeholders::bytes_transferred)));
00091                 }
00092         }
00093 
00094         void Tcp::Connection::stop() {
00095 
00096                 if (this->closed) return;
00097 
00098                 this->socket.cancel();
00099 
00100                 this->socket.close();
00101 
00102                 this->closed = true;
00103         }
00104 
00105         Tcp::Tcp(asio::io_service &service) :
00106                 CastorChannel(service), connections(), service(&service), acceptor(service),
00107                 socket(service), endpoint(), mutex(), opened(false)
00108         {
00109         }
00110 
00111         Tcp::Tcp(asio::io_service &service, const NetAddress &a) :
00112                 CastorChannel(service), connections(), service(&service), acceptor(service),
00113                 socket(service), endpoint(a.getAddress(), a.getPort()), mutex(), opened(false)
00114         {
00115                 this->address = a;
00116         }
00117 
00118         Tcp::~Tcp() {
00119         }
00120 
00121         void Tcp::setAsync(bool async) {
00122                 this->async = async;
00123                 if ((async) && (isOpen())) {
00124                         this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)),
00125                                         this->strand.wrap(
00126                                                         boost::bind(&Tcp::handleRead, this,
00127                                                                 asio::placeholders::error,
00128                                                                 asio::placeholders::bytes_transferred)));
00129                 }
00130         }
00131 
00132         void Tcp::add(ConnectionPtr connection) {
00133                 
00134                 boost::mutex::scoped_lock lock(this->mutex);
00135 
00136                 ConnectionMap::iterator itr = this->connections.find(connection->address);
00137 
00138                 if (itr == this->connections.end()) {
00139                         this->connections[connection->address] = connection;
00140                 }
00141         }
00142 
00143         void Tcp::remove(const NetAddress &address) {
00144                 
00145                 boost::mutex::scoped_lock lock(this->mutex);
00146 
00147                 ConnectionMap::iterator itr = this->connections.find(address);
00148 
00149                 if (itr != this->connections.end()) {
00150                         this->connections.erase(itr);
00151                 }
00152         }
00153 
00154         NetAddress Tcp::getLocalAddress() {
00155                 if (this->acceptor.is_open()) {
00156 //std::cout << "XXXXY get local address " <<
00157 //                      NetAddress(this->address.getSpicaSpecific(), this->acceptor.local_endpoint()) << std::endl;
00158                         return NetAddress(this->address.getSpicaSpecific(), this->acceptor.local_endpoint());
00159 
00160                 } else if (this->socket.is_open()) {
00161 //std::cout << "XXXXY get local address socket " << 
00162 //                      NetAddress(this->address.getSpicaSpecific(), this->socket.local_endpoint()) << std::endl;
00163                         return NetAddress(this->address.getSpicaSpecific(), this->socket.local_endpoint());
00164                 }
00165 
00166 //std::cout << "XXXXY get local address not" << std::endl;
00167                 return this->address;
00168         }
00169 
00170         void Tcp::open() {
00171                 this->acceptor.open(this->endpoint.protocol());
00172         }
00173 
00174         void Tcp::open(const NetAddress &a) {
00175                 this->address = a;
00176         }
00177 
00178         void Tcp::bind() {
00179                 this->acceptor.bind(this->endpoint);
00180         }
00181 
00182         void Tcp::reuseAddress(bool on) {
00183                 if (this->acceptor.is_open()) {
00184                         this->acceptor.set_option(asio::socket_base::reuse_address(on));
00185 
00186                 } else if (this->socket.is_open()) {
00187                         this->socket.set_option(asio::socket_base::reuse_address(on));
00188                 }
00189         }
00190 
00191         bool Tcp::reuseAddress() {
00192                 asio::socket_base::reuse_address result;
00193                 if (this->acceptor.is_open()) {
00194                         this->acceptor.get_option(result);
00195                 } else if (this->socket.is_open()) {
00196                         this->socket.get_option(result);
00197                 }
00198                 return result.value();
00199         }
00200 
00201         void Tcp::enableLoopback(bool) {
00202         }
00203 
00204         bool Tcp::enableLoopback() {
00205                 return false;
00206         }
00207 
00208         void Tcp::enableBroadcast(bool) {
00209         }
00210 
00211         bool Tcp::enableBroadcast() {
00212                 return false;
00213         }
00214 
00215         void Tcp::multicastHops(int) {
00216         }
00217 
00218         int Tcp::multicastHops() {
00219                 return 0;
00220         }
00221 
00222         void Tcp::joinGroup(const asio::ip::address &) {
00223         }
00224 
00225         void Tcp::leaveGroup(const asio::ip::address &) {
00226         }
00227 
00228         void Tcp::accept() throw(NetException) {
00229 
00230                 this->acceptor.listen();
00231 
00232                 if (this->address.getPort() == 0) {
00233                         if (this->acceptor.is_open()) {
00234                                 this->address.setPort(this->acceptor.local_endpoint().port());
00235                         } else if (this->socket.is_open()) {
00236                                 this->address.setPort(this->socket.local_endpoint().port());
00237                         }
00238                 }
00239 
00240                 this->mode = mode_accept;
00241                 this->opened = true;
00242 
00243                 ConnectionPtr connection = ConnectionPtr(new Connection(*this->service, this));
00244 
00245                 this->acceptor.async_accept(connection->getSocket(), this->remoteEndpoint,
00246                                 this->strand.wrap(boost::bind(&Tcp::handleAccept,
00247                                                 this, connection, asio::placeholders::error)));
00248         }
00249 
00250         void Tcp::connect() throw(NetException) {
00251 
00252                 this->mode = mode_connect;
00253                 this->opened = true;
00254 
00255                 this->socket.async_connect(this->endpoint,
00256                                 this->strand.wrap(boost::bind(&Tcp::handleConnect,
00257                                                 this, asio::placeholders::error)));
00258         }
00259 
00260         void Tcp::close() throw(NetException) {
00261 
00262                 if (!this->opened) return;
00263 
00264                 this->opened = false;
00265 
00266                 boost::mutex::scoped_lock lock(this->mutex);
00267 
00268                 if (this->mode == mode_accept) {
00269 
00270                         for (ConnectionMap::iterator itr = this->connections.begin();
00271                                  itr != this->connections.end(); itr++)
00272                         {
00273                                 itr->second->stop();
00274                         }
00275 
00276                         this->acceptor.cancel();
00277                         this->acceptor.close();
00278 
00279                 } else if (mode == mode_connect) {
00280 
00281                         this->socket.cancel();
00282                         this->socket.close();
00283                 }
00284         }
00285 
00286         void Tcp::close(const NetAddress &address) throw(NetException) {
00287 
00288                 ConnectionMap::iterator itr = this->connections.find(address);
00289 
00290                 if (itr != this->connections.end()) {
00291                         itr->second->stop();
00292                 }
00293         }
00294 
00295         bool Tcp::isOpen() {
00296 
00297                 if (this->mode == mode_accept) {
00298 
00299                         return this->acceptor.is_open();
00300 
00301                 } else if (this->mode == mode_connect) {
00302                         return this->socket.is_open();
00303                 }
00304 
00305                 return false;
00306         }
00307 
00308         size_t Tcp::receive(char *data, size_t length) {
00309 
00310                 if (this->mode == mode_connect) {
00311                         return this->socket.read_some(asio::buffer(data, length));
00312                 }
00313 
00314                 return -1;
00315         }
00316 
00317         size_t Tcp::receive(char *data, size_t length, NetAddress &remote) {
00318 
00319                 if (this->mode == mode_connect) {
00320                         remote = this->address;
00321                         return this->socket.read_some(asio::buffer(data, length));
00322                 }
00323 
00324                 return -1;
00325         }
00326 
00327         void Tcp::send(const char *data, size_t length) {
00328 
00329                 if (this->mode == mode_connect) {
00330 
00331                         if (this->async) {
00332                                 this->socket.async_send(asio::buffer(data, length),
00333                                                 this->strand.wrap(
00334                                                                 boost::bind(&Tcp::handleSend, this,
00335                                                                                 asio::placeholders::error,
00336                                                                                 asio::placeholders::bytes_transferred)));
00337                         } else {
00338                                 this->socket.send(asio::buffer(data, length));
00339                         }
00340                 }
00341         }
00342 
00343         void Tcp::send(const char *data, size_t length, const NetAddress &remote) {
00344 
00345                 if (this->mode == mode_accept) {
00346 
00347                         boost::mutex::scoped_lock lock(mutex);
00348 
00349                         for (ConnectionMap::iterator itr = this->connections.begin();
00350                                  itr != this->connections.end(); itr++)
00351                         {
00352                                 if (itr->first == remote) {
00353 //std::cout << "send to " << remote << std::endl;
00354                                         if (this->async) {
00355                                                 itr->second->socket.async_send(asio::buffer(data, length),
00356                                                                 this->strand.wrap(
00357                                                                                 boost::bind(&Tcp::handleSend, this,
00358                                                                                                 asio::placeholders::error,
00359                                                                                                 asio::placeholders::bytes_transferred)));
00360                                         } else {
00361 //                                      std::cout << itr->second->socket.send(asio::buffer(data, length)) << std::endl;
00362                                         itr->second->socket.send(asio::buffer(data, length));
00363                                         }
00364 
00365                                         break;
00366                                 }
00367                         }
00368 
00369                 } else if (this->mode == mode_connect) {
00370 
00371                         if (this->async) {
00372                                 this->socket.async_send(asio::buffer(data, length),
00373                                                 this->strand.wrap(
00374                                                                 boost::bind(&Tcp::handleSend, this,
00375                                                                                 asio::placeholders::error,
00376                                                                                 asio::placeholders::bytes_transferred)));
00377                         } else {
00378                                 this->socket.send(asio::buffer(data, length));
00379                         }
00380                 }
00381         }
00382 
00383         void Tcp::handleAccept(ConnectionPtr connection, const asio::error_code& error) {
00384 
00385                 if (!error) {
00386 
00387                         connection->start(NetAddress(ss_none, this->remoteEndpoint));
00388 
00389                         add(connection);
00390 
00391                         if (this->peerAdded) {
00392                                 this->peerAdded(connection->address);
00393                         }
00394 
00395                         ConnectionPtr connection = ConnectionPtr(new Connection(*this->service, this));
00396 
00397                         this->acceptor.async_accept(connection->getSocket(), this->remoteEndpoint,
00398                                         this->strand.wrap(
00399                                                         boost::bind(&Tcp::handleAccept, this,
00400                                                                         connection, asio::placeholders::error)));
00401 
00402                         return;
00403                 }
00404         }
00405 
00406         void Tcp::handleConnect(const asio::error_code& error) {
00407 
00408                 if (!error) {
00409 
00410                         this->socket.set_option(asio::ip::tcp::no_delay(true));
00411 
00412                         if (this->connectionEstablished) {
00413                                 this->connectionEstablished(this->address);
00414                         }
00415 
00416                         if (this->async) {
00417                                 this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)),
00418                                                 this->strand.wrap(
00419                                                                 boost::bind(&Tcp::handleRead, this,
00420                                                                         asio::placeholders::error,
00421                                                                         asio::placeholders::bytes_transferred)));
00422                         }
00423                                 
00424                 } else if (this->opened) {
00425 
00426                         asio::deadline_timer timer(*this->service, boost::posix_time::milliseconds(250));
00427                         timer.wait();
00428 
00429                         this->socket.async_connect(this->endpoint,
00430                                         this->strand.wrap(
00431                                                         boost::bind(&Tcp::handleConnect, this,
00432                                                                         asio::placeholders::error)));
00433                 }
00434         }
00435 
00436         void Tcp::handleSend(const asio::error_code&, size_t) {
00437 
00438 //              if (error) {
00439 //                      std::cerr << "Tcp: unable to send data: " << error.message() << std::endl;
00440 //              }
00441         }
00442 
00443         void Tcp::handleRead(const asio::error_code& error, size_t count) {
00444 //              std::cout << "Read" << std::endl;
00445 
00446                 bool socketFailure = false;
00447                 
00448                 if (!error) {
00449 
00450                         NetAddressPtr address;
00451 
00452                         try {
00453 
00454                                 address = NetAddress::create(
00455                                                 this->address.getSpicaSpecific(),
00456                                                 this->socket.remote_endpoint());
00457 
00458                         } catch (const std::exception &e) {
00459                                 std::cout << e.what() << std::endl;
00460                                 socketFailure = true;
00461                         }
00462 
00463                         if (!socketFailure) {
00464 //std::cout << " XXXXY geminga: " << count << ": " << std::string(this->buffer, count) << " " << *address << std::endl;
00465 
00466 //try {
00467 
00468 //std::cout << " XXXXY geminga: try " << std::endl;
00469                                 this->signalReceive(this->buffer, count, address);
00470 //std::cout << " XXXXY geminga: done " << std::endl;
00471 //} catch (const std::exception &e) {
00472 //std::cout << " XXXXY geminga: " << e.what() << std::endl;
00473 //}
00474 
00475 //std::cout << " XXXXY geminga: " << this->async << std::endl;
00476                                 if (this->async) {
00477 //std::cout << " XXXXY geminga: " << std::endl;
00478                                         this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)),
00479                                                         this->strand.wrap(
00480                                                                         boost::bind(&Tcp::handleRead, this,
00481                                                                                 asio::placeholders::error,
00482                                                                                 asio::placeholders::bytes_transferred)));
00483                                 }
00484                                 
00485                                 return;
00486                         }
00487                 }
00488 
00489                 if (this->mode == mode_accept) {
00490 
00491 //              std::cerr << "Tcp: error receiving from " << address << ": " << error.message() << std::endl;
00492 
00493                         close();
00494 
00495 //              std::cerr << "Tcp: Connection closed" << std::endl;
00496                 } else if (this->opened) {
00497 
00498                         if (this->connectionLost) {
00499                                 this->connectionLost(this->address);
00500                         }
00501 
00502                         this->socket.close();
00503 
00504                         asio::deadline_timer timer(*this->service, boost::posix_time::milliseconds(250));
00505                         timer.wait();
00506 
00507                         this->socket.async_connect(this->endpoint,
00508                                         this->strand.wrap(
00509                                                         boost::bind(&Tcp::handleConnect, this,
00510                                                                         asio::placeholders::error)));
00511                 }
00512         }
00513 
00514 } } }
00515 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:39