$search
00001 #include "Tcp.h" 00002 00003 namespace castor { namespace net { namespace channels { 00004 00005 Tcp::Connection::Connection(asio::io_service &service, Tcp *tcp) : 00006 address(), socket(service), channel(tcp), closed(false), buffer() 00007 { 00008 } 00009 00010 Tcp::Connection::Connection(const Connection &other) : 00011 address(other.address), socket(other.channel->getIoService()), 00012 channel(other.channel), closed(false), buffer() 00013 { 00014 } 00015 00016 Tcp::Connection::~Connection() { 00017 } 00018 00019 void Tcp::Connection::handleRead(const asio::error_code& error, size_t count) { 00020 // std::cout << "Read " << count << std::endl; 00021 00022 bool socketFailure = false; 00023 00024 if ((!error) && (!this->closed)) { 00025 00026 NetAddressPtr address; 00027 00028 try { 00029 00030 address = NetAddress::create( 00031 this->channel->getAddress().getSpicaSpecific(), 00032 this->socket.remote_endpoint()); 00033 00034 } catch (const std::exception &e) { 00035 std::cout << "Error: " << e.what() << " closing socket." << std::endl; 00036 socketFailure = true; 00037 } 00038 00039 if (!socketFailure) { 00040 00041 //std::cout << " XXXXX geminga: " << count << ": " << std::string(this->buffer, count) << " " << *address << std::endl; 00042 00043 this->channel->signalReceive(this->buffer, count, address); 00044 00045 if (this->channel->async) { 00046 this->socket.async_read_some( 00047 asio::buffer(this->buffer, sizeof(this->buffer)), 00048 this->channel->strand.wrap( 00049 boost::bind(&Connection::handleRead, this, 00050 asio::placeholders::error, 00051 asio::placeholders::bytes_transferred))); 00052 } 00053 00054 return; 00055 } 00056 } 00057 00058 if (this->channel->peerRemoved) { 00059 // std::cout << error.message() << " " << this->closed << std::endl; 00060 this->channel->peerRemoved(this->address); 00061 } 00062 00063 this->channel->remove(this->address); 00064 } 00065 00066 Tcp::Connection &Tcp::Connection::operator=(const Tcp::Connection &other) { 00067 this->address = other.address; 00068 this->channel = other.channel; 00069 return *this; 00070 } 00071 00072 asio::ip::tcp::socket &Tcp::Connection::getSocket() { 00073 return this->socket; 00074 } 00075 00076 void Tcp::Connection::start(const NetAddress &address) { 00077 00078 this->address = address; 00079 // *NetAddress::create( 00080 // this->channel->getAddress().getSpicaSpecific(), 00081 // this->socket.remote_endpoint()); 00082 00083 this->socket.set_option(asio::ip::tcp::no_delay(true)); 00084 00085 if (this->channel->async) { 00086 this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)), 00087 this->channel->strand.wrap( 00088 boost::bind(&Connection::handleRead, this, 00089 asio::placeholders::error, 00090 asio::placeholders::bytes_transferred))); 00091 } 00092 } 00093 00094 void Tcp::Connection::stop() { 00095 00096 if (this->closed) return; 00097 00098 this->socket.cancel(); 00099 00100 this->socket.close(); 00101 00102 this->closed = true; 00103 } 00104 00105 Tcp::Tcp(asio::io_service &service) : 00106 CastorChannel(service), connections(), service(&service), acceptor(service), 00107 socket(service), endpoint(), mutex(), opened(false) 00108 { 00109 } 00110 00111 Tcp::Tcp(asio::io_service &service, const NetAddress &a) : 00112 CastorChannel(service), connections(), service(&service), acceptor(service), 00113 socket(service), endpoint(a.getAddress(), a.getPort()), mutex(), opened(false) 00114 { 00115 this->address = a; 00116 } 00117 00118 Tcp::~Tcp() { 00119 } 00120 00121 void Tcp::setAsync(bool async) { 00122 this->async = async; 00123 if ((async) && (isOpen())) { 00124 this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)), 00125 this->strand.wrap( 00126 boost::bind(&Tcp::handleRead, this, 00127 asio::placeholders::error, 00128 asio::placeholders::bytes_transferred))); 00129 } 00130 } 00131 00132 void Tcp::add(ConnectionPtr connection) { 00133 00134 boost::mutex::scoped_lock lock(this->mutex); 00135 00136 ConnectionMap::iterator itr = this->connections.find(connection->address); 00137 00138 if (itr == this->connections.end()) { 00139 this->connections[connection->address] = connection; 00140 } 00141 } 00142 00143 void Tcp::remove(const NetAddress &address) { 00144 00145 boost::mutex::scoped_lock lock(this->mutex); 00146 00147 ConnectionMap::iterator itr = this->connections.find(address); 00148 00149 if (itr != this->connections.end()) { 00150 this->connections.erase(itr); 00151 } 00152 } 00153 00154 NetAddress Tcp::getLocalAddress() { 00155 if (this->acceptor.is_open()) { 00156 //std::cout << "XXXXY get local address " << 00157 // NetAddress(this->address.getSpicaSpecific(), this->acceptor.local_endpoint()) << std::endl; 00158 return NetAddress(this->address.getSpicaSpecific(), this->acceptor.local_endpoint()); 00159 00160 } else if (this->socket.is_open()) { 00161 //std::cout << "XXXXY get local address socket " << 00162 // NetAddress(this->address.getSpicaSpecific(), this->socket.local_endpoint()) << std::endl; 00163 return NetAddress(this->address.getSpicaSpecific(), this->socket.local_endpoint()); 00164 } 00165 00166 //std::cout << "XXXXY get local address not" << std::endl; 00167 return this->address; 00168 } 00169 00170 void Tcp::open() { 00171 this->acceptor.open(this->endpoint.protocol()); 00172 } 00173 00174 void Tcp::open(const NetAddress &a) { 00175 this->address = a; 00176 } 00177 00178 void Tcp::bind() { 00179 this->acceptor.bind(this->endpoint); 00180 } 00181 00182 void Tcp::reuseAddress(bool on) { 00183 if (this->acceptor.is_open()) { 00184 this->acceptor.set_option(asio::socket_base::reuse_address(on)); 00185 00186 } else if (this->socket.is_open()) { 00187 this->socket.set_option(asio::socket_base::reuse_address(on)); 00188 } 00189 } 00190 00191 bool Tcp::reuseAddress() { 00192 asio::socket_base::reuse_address result; 00193 if (this->acceptor.is_open()) { 00194 this->acceptor.get_option(result); 00195 } else if (this->socket.is_open()) { 00196 this->socket.get_option(result); 00197 } 00198 return result.value(); 00199 } 00200 00201 void Tcp::enableLoopback(bool) { 00202 } 00203 00204 bool Tcp::enableLoopback() { 00205 return false; 00206 } 00207 00208 void Tcp::enableBroadcast(bool) { 00209 } 00210 00211 bool Tcp::enableBroadcast() { 00212 return false; 00213 } 00214 00215 void Tcp::multicastHops(int) { 00216 } 00217 00218 int Tcp::multicastHops() { 00219 return 0; 00220 } 00221 00222 void Tcp::joinGroup(const asio::ip::address &) { 00223 } 00224 00225 void Tcp::leaveGroup(const asio::ip::address &) { 00226 } 00227 00228 void Tcp::accept() throw(NetException) { 00229 00230 this->acceptor.listen(); 00231 00232 if (this->address.getPort() == 0) { 00233 if (this->acceptor.is_open()) { 00234 this->address.setPort(this->acceptor.local_endpoint().port()); 00235 } else if (this->socket.is_open()) { 00236 this->address.setPort(this->socket.local_endpoint().port()); 00237 } 00238 } 00239 00240 this->mode = mode_accept; 00241 this->opened = true; 00242 00243 ConnectionPtr connection = ConnectionPtr(new Connection(*this->service, this)); 00244 00245 this->acceptor.async_accept(connection->getSocket(), this->remoteEndpoint, 00246 this->strand.wrap(boost::bind(&Tcp::handleAccept, 00247 this, connection, asio::placeholders::error))); 00248 } 00249 00250 void Tcp::connect() throw(NetException) { 00251 00252 this->mode = mode_connect; 00253 this->opened = true; 00254 00255 this->socket.async_connect(this->endpoint, 00256 this->strand.wrap(boost::bind(&Tcp::handleConnect, 00257 this, asio::placeholders::error))); 00258 } 00259 00260 void Tcp::close() throw(NetException) { 00261 00262 if (!this->opened) return; 00263 00264 this->opened = false; 00265 00266 boost::mutex::scoped_lock lock(this->mutex); 00267 00268 if (this->mode == mode_accept) { 00269 00270 for (ConnectionMap::iterator itr = this->connections.begin(); 00271 itr != this->connections.end(); itr++) 00272 { 00273 itr->second->stop(); 00274 } 00275 00276 this->acceptor.cancel(); 00277 this->acceptor.close(); 00278 00279 } else if (mode == mode_connect) { 00280 00281 this->socket.cancel(); 00282 this->socket.close(); 00283 } 00284 } 00285 00286 void Tcp::close(const NetAddress &address) throw(NetException) { 00287 00288 ConnectionMap::iterator itr = this->connections.find(address); 00289 00290 if (itr != this->connections.end()) { 00291 itr->second->stop(); 00292 } 00293 } 00294 00295 bool Tcp::isOpen() { 00296 00297 if (this->mode == mode_accept) { 00298 00299 return this->acceptor.is_open(); 00300 00301 } else if (this->mode == mode_connect) { 00302 return this->socket.is_open(); 00303 } 00304 00305 return false; 00306 } 00307 00308 size_t Tcp::receive(char *data, size_t length) { 00309 00310 if (this->mode == mode_connect) { 00311 return this->socket.read_some(asio::buffer(data, length)); 00312 } 00313 00314 return -1; 00315 } 00316 00317 size_t Tcp::receive(char *data, size_t length, NetAddress &remote) { 00318 00319 if (this->mode == mode_connect) { 00320 remote = this->address; 00321 return this->socket.read_some(asio::buffer(data, length)); 00322 } 00323 00324 return -1; 00325 } 00326 00327 void Tcp::send(const char *data, size_t length) { 00328 00329 if (this->mode == mode_connect) { 00330 00331 if (this->async) { 00332 this->socket.async_send(asio::buffer(data, length), 00333 this->strand.wrap( 00334 boost::bind(&Tcp::handleSend, this, 00335 asio::placeholders::error, 00336 asio::placeholders::bytes_transferred))); 00337 } else { 00338 this->socket.send(asio::buffer(data, length)); 00339 } 00340 } 00341 } 00342 00343 void Tcp::send(const char *data, size_t length, const NetAddress &remote) { 00344 00345 if (this->mode == mode_accept) { 00346 00347 boost::mutex::scoped_lock lock(mutex); 00348 00349 for (ConnectionMap::iterator itr = this->connections.begin(); 00350 itr != this->connections.end(); itr++) 00351 { 00352 if (itr->first == remote) { 00353 //std::cout << "send to " << remote << std::endl; 00354 if (this->async) { 00355 itr->second->socket.async_send(asio::buffer(data, length), 00356 this->strand.wrap( 00357 boost::bind(&Tcp::handleSend, this, 00358 asio::placeholders::error, 00359 asio::placeholders::bytes_transferred))); 00360 } else { 00361 // std::cout << itr->second->socket.send(asio::buffer(data, length)) << std::endl; 00362 itr->second->socket.send(asio::buffer(data, length)); 00363 } 00364 00365 break; 00366 } 00367 } 00368 00369 } else if (this->mode == mode_connect) { 00370 00371 if (this->async) { 00372 this->socket.async_send(asio::buffer(data, length), 00373 this->strand.wrap( 00374 boost::bind(&Tcp::handleSend, this, 00375 asio::placeholders::error, 00376 asio::placeholders::bytes_transferred))); 00377 } else { 00378 this->socket.send(asio::buffer(data, length)); 00379 } 00380 } 00381 } 00382 00383 void Tcp::handleAccept(ConnectionPtr connection, const asio::error_code& error) { 00384 00385 if (!error) { 00386 00387 connection->start(NetAddress(ss_none, this->remoteEndpoint)); 00388 00389 add(connection); 00390 00391 if (this->peerAdded) { 00392 this->peerAdded(connection->address); 00393 } 00394 00395 ConnectionPtr connection = ConnectionPtr(new Connection(*this->service, this)); 00396 00397 this->acceptor.async_accept(connection->getSocket(), this->remoteEndpoint, 00398 this->strand.wrap( 00399 boost::bind(&Tcp::handleAccept, this, 00400 connection, asio::placeholders::error))); 00401 00402 return; 00403 } 00404 } 00405 00406 void Tcp::handleConnect(const asio::error_code& error) { 00407 00408 if (!error) { 00409 00410 this->socket.set_option(asio::ip::tcp::no_delay(true)); 00411 00412 if (this->connectionEstablished) { 00413 this->connectionEstablished(this->address); 00414 } 00415 00416 if (this->async) { 00417 this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)), 00418 this->strand.wrap( 00419 boost::bind(&Tcp::handleRead, this, 00420 asio::placeholders::error, 00421 asio::placeholders::bytes_transferred))); 00422 } 00423 00424 } else if (this->opened) { 00425 00426 asio::deadline_timer timer(*this->service, boost::posix_time::milliseconds(250)); 00427 timer.wait(); 00428 00429 this->socket.async_connect(this->endpoint, 00430 this->strand.wrap( 00431 boost::bind(&Tcp::handleConnect, this, 00432 asio::placeholders::error))); 00433 } 00434 } 00435 00436 void Tcp::handleSend(const asio::error_code&, size_t) { 00437 00438 // if (error) { 00439 // std::cerr << "Tcp: unable to send data: " << error.message() << std::endl; 00440 // } 00441 } 00442 00443 void Tcp::handleRead(const asio::error_code& error, size_t count) { 00444 // std::cout << "Read" << std::endl; 00445 00446 bool socketFailure = false; 00447 00448 if (!error) { 00449 00450 NetAddressPtr address; 00451 00452 try { 00453 00454 address = NetAddress::create( 00455 this->address.getSpicaSpecific(), 00456 this->socket.remote_endpoint()); 00457 00458 } catch (const std::exception &e) { 00459 std::cout << e.what() << std::endl; 00460 socketFailure = true; 00461 } 00462 00463 if (!socketFailure) { 00464 //std::cout << " XXXXY geminga: " << count << ": " << std::string(this->buffer, count) << " " << *address << std::endl; 00465 00466 //try { 00467 00468 //std::cout << " XXXXY geminga: try " << std::endl; 00469 this->signalReceive(this->buffer, count, address); 00470 //std::cout << " XXXXY geminga: done " << std::endl; 00471 //} catch (const std::exception &e) { 00472 //std::cout << " XXXXY geminga: " << e.what() << std::endl; 00473 //} 00474 00475 //std::cout << " XXXXY geminga: " << this->async << std::endl; 00476 if (this->async) { 00477 //std::cout << " XXXXY geminga: " << std::endl; 00478 this->socket.async_read_some(asio::buffer(this->buffer, sizeof(this->buffer)), 00479 this->strand.wrap( 00480 boost::bind(&Tcp::handleRead, this, 00481 asio::placeholders::error, 00482 asio::placeholders::bytes_transferred))); 00483 } 00484 00485 return; 00486 } 00487 } 00488 00489 if (this->mode == mode_accept) { 00490 00491 // std::cerr << "Tcp: error receiving from " << address << ": " << error.message() << std::endl; 00492 00493 close(); 00494 00495 // std::cerr << "Tcp: Connection closed" << std::endl; 00496 } else if (this->opened) { 00497 00498 if (this->connectionLost) { 00499 this->connectionLost(this->address); 00500 } 00501 00502 this->socket.close(); 00503 00504 asio::deadline_timer timer(*this->service, boost::posix_time::milliseconds(250)); 00505 timer.wait(); 00506 00507 this->socket.async_connect(this->endpoint, 00508 this->strand.wrap( 00509 boost::bind(&Tcp::handleConnect, this, 00510 asio::placeholders::error))); 00511 } 00512 } 00513 00514 } } } 00515