00001 #include "Udp.h"
00002
00003 #include <boost/thread/xtime.hpp>
00004
00005 namespace castor { namespace net { namespace channels {
00006
00007 Udp::Udp(asio::io_service &service) :
00008 CastorChannel(service), socket(service), endpoint(), remoteEndpoint(),
00009 cond(), monitor(), localMcastAddress(), localMcastAddressHack(false)
00010 {
00011 }
00012
00013 Udp::Udp(asio::io_service &service, const NetAddress &a) :
00014 CastorChannel(service), socket(service), endpoint(a.getAddress(), a.getPort()),
00015 remoteEndpoint(), cond(), monitor(), localMcastAddress(), localMcastAddressHack(false)
00016 {
00017 this->address = a;
00018
00019 }
00020
00021 Udp::~Udp() {
00022 }
00023
00024 NetAddress Udp::getLocalAddress() {
00025 if (this->address.isMulticast()) {
00026
00027 if (!this->localMcastAddress) {
00028
00029 boost::mutex::scoped_lock lock(this->monitor);
00030
00031 enableLoopback(true);
00032
00033 this->localMcastAddressHack = true;
00034
00035 std::cout << "HACK: Determining local mcast address" << std::endl;
00036 send("", 0);
00037
00038 boost::xtime xt;
00039 boost::xtime_get(&xt, boost::TIME_UTC);
00040 xt.nsec += 10000000;
00041
00042 while ((!this->localMcastAddress) && (this->cond.timed_wait(lock, xt))) {
00043 std::cout << "HACK retry" << std::endl;
00044 send("", 0);
00045 }
00046 std::cout << "HACK: Determining done" << std::endl;
00047 enableLoopback(false);
00048 }
00049
00050 return *this->localMcastAddress;
00051 }
00052
00053 return NetAddress(this->address.getSpicaSpecific(), this->socket.local_endpoint());
00054 }
00055
00056 void Udp::setAsync(bool async) {
00057 this->async = async;
00058
00059 if ((async) && (isOpen())) {
00060 this->socket.async_receive_from(getBuffer(), this->remoteEndpoint,
00061 this->strand.wrap(boost::bind(&Udp::handleReceive, this,
00062 asio::placeholders::error,
00063 asio::placeholders::bytes_transferred)));
00064 }
00065 }
00066
00067 void Udp::open() {
00068 this->socket.open(this->endpoint.protocol());
00069 }
00070
00071 void Udp::open(const NetAddress &a) {
00072 this->address = a;
00073 this->endpoint = asio::ip::udp::endpoint(a.getAddress(), a.getPort());
00074 this->socket.open(this->endpoint.protocol());
00075 }
00076
00077 void Udp::bind() {
00078 if (this->address.isMulticast()) {
00079 if (this->address.getAddress().is_v6()) {
00080 this->socket.bind(asio::ip::udp::endpoint(asio::ip::address_v6::any(), this->address.getPort()));
00081 } else {
00082 this->socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::any(), this->address.getPort()));
00083 }
00084 } else {
00085 this->socket.bind(this->endpoint);
00086 }
00087 }
00088
00089 void Udp::reuseAddress(bool on) {
00090 this->socket.set_option(asio::socket_base::reuse_address(on));
00091 }
00092
00093 bool Udp::reuseAddress() {
00094 asio::socket_base::reuse_address result;
00095 this->socket.get_option(result);
00096 return result.value();
00097 }
00098
00099 void Udp::enableLoopback(bool on) {
00100 this->socket.set_option(asio::ip::multicast::enable_loopback(on));
00101 }
00102
00103 bool Udp::enableLoopback() {
00104 asio::ip::multicast::enable_loopback result;
00105 this->socket.get_option(result);
00106 return result.value();
00107 }
00108
00109 void Udp::enableBroadcast(bool on) {
00110 this->socket.set_option(asio::socket_base::broadcast(on));
00111 }
00112
00113 bool Udp::enableBroadcast() {
00114 asio::socket_base::broadcast result;
00115 this->socket.get_option(result);
00116 return result.value();
00117 }
00118
00119
00120 void Udp::multicastHops(int hops) {
00121 this->socket.set_option(asio::ip::multicast::hops(hops));
00122 }
00123
00124 int Udp::multicastHops() {
00125 asio::ip::multicast::hops result;
00126 this->socket.get_option(result);
00127 return result.value();
00128 }
00129
00130 void Udp::joinGroup(const asio::ip::address &address) {
00131
00132
00133 this->socket.set_option(asio::ip::multicast::join_group(address));
00134 this->address.setAddress(address);
00135 this->endpoint.address(address);
00136 }
00137
00138 void Udp::leaveGroup(const asio::ip::address &address) {
00139 this->socket.set_option(asio::ip::multicast::leave_group(address));
00140 }
00141
00142 void Udp::accept() throw(NetException) {
00143
00144 this->mode = mode_accept;
00145
00146 if (this->address.getPort() == 0) {
00147 this->address.setPort(this->socket.local_endpoint().port());
00148 }
00149
00150 if (this->async) {
00151 this->socket.async_receive_from(getBuffer(), this->remoteEndpoint,
00152 this->strand.wrap(boost::bind(&Udp::handleReceive, this,
00153 asio::placeholders::error,
00154 asio::placeholders::bytes_transferred)));
00155 }
00156 }
00157
00158 void Udp::connect() throw(NetException) {
00159
00160 this->mode = mode_connect;
00161
00162 asio::ip::udp::endpoint ep;
00163 if (this->address.getAddress().is_v4()) {
00164 ep = asio::ip::udp::endpoint(asio::ip::address_v4::any(), 0);
00165 } else if (this->address.getAddress().is_v6()) {
00166 ep = asio::ip::udp::endpoint(asio::ip::address_v6::any(), 0);
00167 }
00168
00169 this->socket.bind(ep);
00170
00171
00172
00173 if (this->async) {
00174 this->socket.async_receive_from(getBuffer(), this->remoteEndpoint,
00175 this->strand.wrap(boost::bind(&Udp::handleReceive, this,
00176 asio::placeholders::error,
00177 asio::placeholders::bytes_transferred)));
00178 }
00179 }
00180
00181 void Udp::close() throw(NetException) {
00182
00183 if (this->closed) return;
00184
00185 this->closed = true;
00186
00187 this->socket.cancel();
00188 this->socket.close();
00189 }
00190
00191 void Udp::close(const NetAddress &) throw(NetException) {
00192 }
00193
00194 bool Udp::isOpen() {
00195 return this->socket.is_open();
00196 }
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239 void Udp::handleReceive(const asio::error_code& error, size_t count) {
00240
00241
00242
00243 if ((error) && (count == 0) && (!this->localMcastAddress) && (this->localMcastAddressHack)) {
00244
00245 NetAddressPtr address = NetAddress::create(
00246 this->address.getSpicaSpecific(),
00247 this->remoteEndpoint);
00248
00249 std::cout << "HACK: Determined local mcast address " << *address << std::endl;
00250 this->localMcastAddress = address;
00251 this->cond.notify_one();
00252
00253 this->localMcastAddressHack = false;
00254
00255 if (this->async) {
00256 this->socket.async_receive_from(
00257 asio::buffer(this->buffer, sizeof(this->buffer)), this->remoteEndpoint,
00258 this->strand.wrap(boost::bind(&Udp::handleReceive, this,
00259 asio::placeholders::error,
00260 asio::placeholders::bytes_transferred)));
00261 }
00262 }
00263
00264 if (!error) {
00265
00266 NetAddressPtr address = NetAddress::create(
00267 this->address.getSpicaSpecific(),
00268 this->remoteEndpoint);
00269
00270 if (count > 0) {
00271 this->signalReceive(this->buffer, count, address);
00272 }
00273
00274 if (this->async) {
00275 this->socket.async_receive_from(
00276 asio::buffer(this->buffer, sizeof(this->buffer)), this->remoteEndpoint,
00277 this->strand.wrap(boost::bind(&Udp::handleReceive, this,
00278 asio::placeholders::error,
00279 asio::placeholders::bytes_transferred)));
00280 }
00281
00282 return;
00283 }
00284
00285
00286 }
00287
00288 } } }
00289