$search
00001 #include "Udp.h" 00002 00003 #include <boost/thread/xtime.hpp> 00004 00005 namespace castor { namespace net { namespace channels { 00006 00007 Udp::Udp(asio::io_service &service) : 00008 CastorChannel(service), socket(service), endpoint(), remoteEndpoint(), 00009 cond(), monitor(), localMcastAddress(), localMcastAddressHack(false) 00010 { 00011 } 00012 00013 Udp::Udp(asio::io_service &service, const NetAddress &a) : 00014 CastorChannel(service), socket(service), endpoint(a.getAddress(), a.getPort()), 00015 remoteEndpoint(), cond(), monitor(), localMcastAddress(), localMcastAddressHack(false) 00016 { 00017 this->address = a; 00018 // this->socket.open(this->endpoint.protocol()); 00019 } 00020 00021 Udp::~Udp() { 00022 } 00023 00024 NetAddress Udp::getLocalAddress() { 00025 if (this->address.isMulticast()) { 00026 00027 if (!this->localMcastAddress) { 00028 00029 boost::mutex::scoped_lock lock(this->monitor); 00030 00031 enableLoopback(true); 00032 00033 this->localMcastAddressHack = true; 00034 00035 std::cout << "HACK: Determining local mcast address" << std::endl; 00036 send("", 0); 00037 00038 boost::xtime xt; 00039 boost::xtime_get(&xt, boost::TIME_UTC); 00040 xt.nsec += 10000000; 00041 00042 while ((!this->localMcastAddress) && (this->cond.timed_wait(lock, xt))) { 00043 std::cout << "HACK retry" << std::endl; 00044 send("", 0); 00045 } 00046 std::cout << "HACK: Determining done" << std::endl; 00047 enableLoopback(false); 00048 } 00049 00050 return *this->localMcastAddress; 00051 } 00052 00053 return NetAddress(this->address.getSpicaSpecific(), this->socket.local_endpoint()); 00054 } 00055 00056 void Udp::setAsync(bool async) { 00057 this->async = async; 00058 00059 if ((async) && (isOpen())) { 00060 this->socket.async_receive_from(getBuffer(), this->remoteEndpoint, 00061 this->strand.wrap(boost::bind(&Udp::handleReceive, this, 00062 asio::placeholders::error, 00063 asio::placeholders::bytes_transferred))); 00064 } 00065 } 00066 00067 void Udp::open() { 00068 this->socket.open(this->endpoint.protocol()); 00069 } 00070 00071 void Udp::open(const NetAddress &a) { 00072 this->address = a; 00073 this->endpoint = asio::ip::udp::endpoint(a.getAddress(), a.getPort()); 00074 this->socket.open(this->endpoint.protocol()); 00075 } 00076 00077 void Udp::bind() { 00078 if (this->address.isMulticast()) { 00079 if (this->address.getAddress().is_v6()) { 00080 this->socket.bind(asio::ip::udp::endpoint(asio::ip::address_v6::any(), this->address.getPort())); 00081 } else { 00082 this->socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::any(), this->address.getPort())); 00083 } 00084 } else { 00085 this->socket.bind(this->endpoint); 00086 } 00087 } 00088 00089 void Udp::reuseAddress(bool on) { 00090 this->socket.set_option(asio::socket_base::reuse_address(on)); 00091 } 00092 00093 bool Udp::reuseAddress() { 00094 asio::socket_base::reuse_address result; 00095 this->socket.get_option(result); 00096 return result.value(); 00097 } 00098 00099 void Udp::enableLoopback(bool on) { 00100 this->socket.set_option(asio::ip::multicast::enable_loopback(on)); 00101 } 00102 00103 bool Udp::enableLoopback() { 00104 asio::ip::multicast::enable_loopback result; 00105 this->socket.get_option(result); 00106 return result.value(); 00107 } 00108 00109 void Udp::enableBroadcast(bool on) { 00110 this->socket.set_option(asio::socket_base::broadcast(on)); 00111 } 00112 00113 bool Udp::enableBroadcast() { 00114 asio::socket_base::broadcast result; 00115 this->socket.get_option(result); 00116 return result.value(); 00117 } 00118 00119 00120 void Udp::multicastHops(int hops) { 00121 this->socket.set_option(asio::ip::multicast::hops(hops)); 00122 } 00123 00124 int Udp::multicastHops() { 00125 asio::ip::multicast::hops result; 00126 this->socket.get_option(result); 00127 return result.value(); 00128 } 00129 00130 void Udp::joinGroup(const asio::ip::address &address) { 00131 // std::cout << "Joining group " << address.to_string() << std::endl; 00132 // this->socket.set_option(asio::ip::multicast::leave_group(address)); 00133 this->socket.set_option(asio::ip::multicast::join_group(address)); 00134 this->address.setAddress(address); 00135 this->endpoint.address(address); 00136 } 00137 00138 void Udp::leaveGroup(const asio::ip::address &address) { 00139 this->socket.set_option(asio::ip::multicast::leave_group(address)); 00140 } 00141 00142 void Udp::accept() throw(NetException) { 00143 00144 this->mode = mode_accept; 00145 00146 if (this->address.getPort() == 0) { 00147 this->address.setPort(this->socket.local_endpoint().port()); 00148 } 00149 00150 if (this->async) { 00151 this->socket.async_receive_from(getBuffer(), this->remoteEndpoint, 00152 this->strand.wrap(boost::bind(&Udp::handleReceive, this, 00153 asio::placeholders::error, 00154 asio::placeholders::bytes_transferred))); 00155 } 00156 } 00157 00158 void Udp::connect() throw(NetException) { 00159 00160 this->mode = mode_connect; 00161 00162 asio::ip::udp::endpoint ep; 00163 if (this->address.getAddress().is_v4()) { 00164 ep = asio::ip::udp::endpoint(asio::ip::address_v4::any(), 0); 00165 } else if (this->address.getAddress().is_v6()) { 00166 ep = asio::ip::udp::endpoint(asio::ip::address_v6::any(), 0); 00167 } 00168 00169 this->socket.bind(ep); 00170 00171 // this->socket.open(this->endpoint.protocol()); 00172 00173 if (this->async) { 00174 this->socket.async_receive_from(getBuffer(), this->remoteEndpoint, 00175 this->strand.wrap(boost::bind(&Udp::handleReceive, this, 00176 asio::placeholders::error, 00177 asio::placeholders::bytes_transferred))); 00178 } 00179 } 00180 00181 void Udp::close() throw(NetException) { 00182 00183 if (this->closed) return; 00184 00185 this->closed = true; 00186 00187 this->socket.cancel(); 00188 this->socket.close(); 00189 } 00190 00191 void Udp::close(const NetAddress &) throw(NetException) { 00192 } 00193 00194 bool Udp::isOpen() { 00195 return this->socket.is_open(); 00196 } 00197 /* 00198 size_t Udp::receive(char *data, size_t length) { 00199 return this->socket.receive(asio::buffer(data, length)); 00200 }*/ 00201 /* 00202 size_t Udp::receive(char *data, size_t length, NetAddress &remote) { 00203 00204 asio::ip::udp::endpoint ep; 00205 00206 size_t count = this->socket.receive_from(asio::buffer(data, length), ep); 00207 00208 remote = *NetAddress::create(this->address.getSpicaSpecific(), ep); 00209 00210 return count; 00211 } 00212 */ 00213 /* void Udp::send(const char *data, size_t length) { 00214 00215 if ((this->mode == mode_connect) || (this->address.isMulticast())) { 00216 00217 try { 00218 this->socket.send_to(asio::buffer(data, length), this->endpoint); 00219 } catch (...) { 00220 // Silently drop exception 00221 } 00222 00223 } else { 00224 std::cerr << "Udp: unable to send data without remote endpoint in bind mode" << std::endl; 00225 } 00226 } 00227 */ 00228 /* void Udp::send(const char *data, size_t length, const NetAddress &remote) { 00229 00230 asio::ip::udp::endpoint ep(remote.getAddress(), remote.getPort()); 00231 00232 try { 00233 this->socket.send_to(asio::buffer(data, length), ep); 00234 } catch (...) { 00235 // Silently drop exception 00236 } 00237 } 00238 */ 00239 void Udp::handleReceive(const asio::error_code& error, size_t count) { 00240 00241 // std::cout << "geminga Udp handle receive" << std::endl; 00242 00243 if ((error) && (count == 0) && (!this->localMcastAddress) && (this->localMcastAddressHack)) { 00244 00245 NetAddressPtr address = NetAddress::create( 00246 this->address.getSpicaSpecific(), 00247 this->remoteEndpoint); 00248 00249 std::cout << "HACK: Determined local mcast address " << *address << std::endl; 00250 this->localMcastAddress = address; 00251 this->cond.notify_one(); 00252 00253 this->localMcastAddressHack = false; 00254 00255 if (this->async) { 00256 this->socket.async_receive_from( 00257 asio::buffer(this->buffer, sizeof(this->buffer)), this->remoteEndpoint, 00258 this->strand.wrap(boost::bind(&Udp::handleReceive, this, 00259 asio::placeholders::error, 00260 asio::placeholders::bytes_transferred))); 00261 } 00262 } 00263 00264 if (!error) { 00265 00266 NetAddressPtr address = NetAddress::create( 00267 this->address.getSpicaSpecific(), 00268 this->remoteEndpoint); 00269 00270 if (count > 0) { 00271 this->signalReceive(this->buffer, count, address); 00272 } 00273 00274 if (this->async) { 00275 this->socket.async_receive_from( 00276 asio::buffer(this->buffer, sizeof(this->buffer)), this->remoteEndpoint, 00277 this->strand.wrap(boost::bind(&Udp::handleReceive, this, 00278 asio::placeholders::error, 00279 asio::placeholders::bytes_transferred))); 00280 } 00281 00282 return; 00283 } 00284 00285 // std::cerr << "Udp: error receiving message: " << error.message() << std::endl; 00286 } 00287 00288 } } } 00289