00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include <labust/tritech/TCPDevice.hpp>
00039 #include <labust/tritech/mtMessages.hpp>
00040
00041 #include <labust/tritech/USBLMessages.hpp>
00042
00043 #include <boost/bind.hpp>
00044 #include <boost/regex.hpp>
00045 #include <boost/archive/binary_oarchive.hpp>
00046 #include <boost/archive/binary_iarchive.hpp>
00047
00048 #include <fstream>
00049
00050 using namespace labust::tritech;
00051
00052 TCPDevice::TCPDevice(const std::string& address, uint32_t port,
00053 uint8_t device, uint8_t app_class, uint8_t priority):
00054 socket(io),
00055 address(address),
00056 port(port),
00057 ringBuffer(ringBufferSize),
00058 device(device),
00059 app_class(app_class),
00060 priority(priority)
00061 {
00062 this->_setup();
00063
00064
00065 this->start_receive(Sync);
00066 service = boost::thread(boost::bind(&boost::asio::io_service::run,&this->io));
00067 }
00068
00069 TCPDevice::~TCPDevice()
00070 {
00071 registerDevice(false);
00072 io.stop();
00073 service.join();
00074
00075 socket.close();
00076
00077 std::cout<<"Closed TCPDevice."<<std::endl;
00078 }
00079
00080 void TCPDevice::_setup()
00081 try
00082 {
00083
00084 boost::asio::ip::tcp::resolver resolver(io);
00085 std::stringstream portStr;
00086 portStr<<port;
00087 boost::asio::ip::tcp::resolver::query query(address, portStr.str());
00088
00089 boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
00090 std::cout<<endpoint.address().to_string()<<", "<<address<<":"<<portStr.str()<<std::endl;
00091 socket.connect(endpoint);
00094 while (!socket.is_open())
00095 {
00096 socket.close();
00097 socket.connect(endpoint);
00098 usleep(1000*1000);
00099 }
00100
00101 if (socket.is_open())
00102 {
00103 registerDevice(true);
00104 }
00105 else
00106 throw std::runtime_error("TCPDevice::_setup : Socket not open.");
00107 }
00108 catch (std::exception& e)
00109 {
00110 std::cerr<<e.what()<<std::endl;
00111 throw std::runtime_error("TCPDevice::_setup : error while connecting to host.");
00112 }
00113
00114 void TCPDevice::registerDevice(bool attach)
00115 {
00117 boost::asio::streambuf output;
00118 std::ostream out(&output);
00119
00120 TCPRequest req;
00121 req.app_class = app_class;
00122 req.node =device;
00123 req.priority = priority;
00124
00125 if (attach)
00126 {
00127 std::cout<<"Attach to node "<<int(req.node)<<std::endl;
00128 req.command = TCPRequest::scAttachToNode;
00129 }
00130 else
00131 {
00132 std::cout<<"Detach from node "<<int(req.node)<<std::endl;
00133 req.command = TCPRequest::scDetachFromNode;
00134 }
00135
00136
00137 out<<'#';
00138 out.width(4);
00139 out.fill('0');
00140 out<<std::uppercase<<std::hex<<req.size;
00141 boost::archive::binary_oarchive dataSer(output, boost::archive::no_header);
00142 dataSer << req;
00143
00144 boost::asio::write(socket,output.data());
00145 }
00146
00147 void TCPDevice::start_receive(uint8_t state)
00148 {
00149 switch (state)
00150 {
00151 case Sync:
00152 boost::asio::async_read(socket,
00153 input.prepare(ringBufferSize),
00154 boost::bind(&TCPDevice::onSync,this,_1,_2));
00155 break;
00156
00157
00158
00159
00160
00161 }
00162 }
00163
00164 void TCPDevice::onSync(const boost::system::error_code& error, std::size_t bytes_transferred)
00165 {
00166 if (!error)
00167 {
00168 input.commit(bytes_transferred);
00169
00170 if (bytes_transferred == 1) ringBuffer.push_back(input.sbumpc());
00171 else input.sgetn(reinterpret_cast<char*>(ringBuffer.data()),bytes_transferred);
00172
00173 if (ringBuffer[0] == '@' || ringBuffer[0] == '#')
00174 {
00175 size_t len = HexHeader::length(&ringBuffer[1]);
00176 uint16_t binLength;
00177 memcpy(&binLength, &ringBuffer[5],sizeof(uint16_t));
00178
00179
00180
00181
00182
00183
00184
00185 if ((len != 0) && (len == (binLength)))
00186 {
00187 if (ringBuffer[0] == '@')
00188 {
00189 StreamPtr data(new boost::asio::streambuf());
00190
00191 data->sputc(ringBuffer[5]);
00192 data->sputc(ringBuffer[6]);
00193 data->pubseekoff(-2,std::ios_base::cur);
00194 boost::asio::async_read(socket,
00195 data->prepare(len-2),
00196 boost::bind(&TCPDevice::onHeader,this,data,_1,_2));
00197 return;
00198 }
00199 else
00200 {
00201 std::cout<<"Seanet command arrived."<<std::endl;
00202 char* data=new char[len-2];
00203 boost::asio::read(socket,boost::asio::buffer(data, len-2));
00204 delete[] data;
00205 this->start_receive(Sync);
00206 return;
00207 }
00208 }
00209 }
00210 std::cerr<<"Out of sync:"<<std::endl;
00211
00212 for(int i=0; i<ringBuffer.size(); ++i)
00213 std::cerr<<int(uint8_t(ringBuffer[i]))<<",";
00214 std::cerr<<std::endl;
00215
00216
00217 ringBuffer.erase(ringBuffer.begin());
00218 boost::asio::async_read(socket,
00219 input.prepare(1),
00220 boost::bind(&TCPDevice::onSync,this,_1,_2));
00221 }
00222 else
00223 {
00224 std::cerr<<error.message()<<std::endl;
00225 this->start_receive(Sync);
00226 }
00227 }
00228
00229 void TCPDevice::onHeader(StreamPtr data, const boost::system::error_code& error, std::size_t bytes_transferred)
00230 {
00231 if (!error)
00232 {
00233 data->commit(bytes_transferred);
00234 try
00235 {
00236 boost::archive::binary_iarchive dataSer(*data, boost::archive::no_header);
00237 TCONMsgPtr msg(new TCONMsg());
00238
00239 dataSer >> (*msg);
00240
00241 msg->data = data;
00242
00243 if (handlers.find(msg->msgType) != handlers.end()) handlers[msg->msgType](msg);
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265 if (msg->msgType == 81) std::cout<<"Received attitude data."<<std::endl;
00266 if (msg->msgType == 94)
00267 {
00268 std::cout<<"Received Nav data:"<<std::endl;
00269 std::cout<<"Total size:"<<msg->data->size()<<std::endl;
00270 std::cout<<"\tMsg size:"<<msg->size<<std::endl;
00271 std::cout<<"\tMsg seq:"<<int(msg->seq)<<std::endl;
00272
00273
00274
00275
00276
00277
00278
00279
00280 }
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298 std::cout<<"TCPDevice: node:"<<int(msg->node)<<", msg:"<<int(msg->msgType)<<std::endl;
00299 }
00300 catch (std::exception& e)
00301 {
00302 std::cerr<<"TCPDevice::onHeader : "<<e.what()<<std::endl;
00303 }
00304 }
00305 else
00306 {
00307 std::cerr<<error.message()<<std::endl;
00308 }
00309 this->start_receive(Sync);
00310 }
00311
00312 void TCPDevice::send(TCONMsgPtr message)
00313 {
00314 message->setup();
00315 boost::asio::streambuf output;
00316 std::ostream out(&output);
00317
00318 out<<'@';
00319 out.width(4);
00320 out.fill('0');
00321 out<<std::uppercase<<std::hex<<message->size;
00322 boost::archive::binary_oarchive dataSer(output, boost::archive::no_header);
00323 dataSer << (*message);
00324
00325
00326 boost::asio::write(socket, output.data());
00327
00328 boost::asio::write(socket, message->data->data());
00329 }