Go to the documentation of this file.00001
00041 class CoLaFrame {
00042 std::vector<char> buffer_;
00043
00044 uint8_t checksum() {
00045 uint8_t c=0;
00046 for(size_t i=8; i<buffer_.size(); i++)
00047 c^=buffer_[i];
00048 return c;
00049 }
00050
00051 public:
00052 CoLaFrame() : buffer_(8, (char)0) {
00053 }
00054
00055 CoLaFrame(const char *str) : buffer_(8+strlen(str), (char)0) {
00056 int i=8;
00057 while(*str)
00058 buffer_[i++] = *(str++);
00059 }
00060
00061 std::vector<char> finish() {
00062 char *data = &buffer_[0];
00063 *(uint32_t*)(data+0) = htonl(0x02020202);
00064 *(uint32_t*)(data+4) = htonl(buffer_.size()-8);
00065
00066 buffer_.push_back(checksum());
00067
00068 return buffer_;
00069 }
00070 };
00071
00072
00073
00074 class Control : public TCP_Session {
00075 Any_Session::SIG_ON_DATA on_data_;
00076 const std::string remote_device_ip_;
00077 bool stream_started_;
00078
00079 void on_data(const char *data, const size_t size, Any_Session *writer)
00080 {
00081 ROS_DEBUG("got data for control");
00082 }
00083 public:
00084 Control(boost::asio::io_service& io_service, const std::string &remote_device_ip) :
00085 TCP_Session(io_service, on_data_),
00086 remote_device_ip_(remote_device_ip),
00087 stream_started_(false)
00088 {
00089 on_data_.connect( boost::bind(&Control::on_data, this, _1, _2, _3) );
00090 }
00091
00092
00093 bool open() {
00094 ROS_DEBUG("Connecting to device...");
00095
00096 if(!connect(remote_device_ip_, "2112")) {
00097 ROS_ERROR("Error on connecting to %s", remote_device_ip_.c_str());
00098 return false;
00099 }
00100
00101 ROS_DEBUG("done.");
00102 return true;
00103 }
00104
00105
00106
00107
00108 bool initStream() {
00109
00110 ROS_DEBUG("Initializing streaming...");
00111 CoLaFrame fr("sMN GetBlobClientConfig");
00112 ROS_DEBUG("Sending on sMN GetBlobClientConfig");
00113 write(fr.finish());
00114 ROS_DEBUG("done.");
00115
00116 return true;
00117 }
00118
00119
00120
00121
00122 bool startStream() {
00123 if(stream_started_) return true;
00124
00125 CoLaFrame fr("sMN PLAYSTART");
00126 ROS_DEBUG("Sending on sopas sMN PLAYSTART");
00127 write(fr.finish());
00128 stream_started_ = true;
00129
00130 return true;
00131 }
00132
00133
00134
00135 bool stopStream() {
00136 if(!stream_started_) return true;
00137
00138 CoLaFrame fr("sMN PLAYSTOP");
00139 ROS_DEBUG("Sending on sopas sMN PLAYSTOP");
00140 write(fr.finish());
00141 stream_started_ = false;
00142
00143 return true;
00144 }
00145 };
00146
00147
00148
00149
00150 class Streaming : public TCP_Session {
00151 public:
00152
00153 struct SFrame {
00154 std::vector<char> buffer;
00155
00156 typedef boost::shared_ptr<SFrame> Ptr;
00157 };
00158
00159 typedef boost::signals2::signal<void (const boost::shared_ptr<Data> &)> SIG_ON_FRAME;
00160
00161 private:
00162 const std::string remote_device_ip_;
00163 Any_Session::SIG_ON_DATA on_data_;
00164 SIG_ON_FRAME on_frame_;
00165 bool debugOutput_;
00166 SFrame::Ptr cur_frame_;
00167
00168 public:
00169
00170 Streaming(boost::asio::io_service& io_service, const std::string &remote_device_ip) :
00171 TCP_Session(io_service, on_data_),
00172 remote_device_ip_(remote_device_ip),
00173 debugOutput_(false)
00174 {
00175 on_data_.connect( boost::bind(&Streaming::on_data, this, _1, _2, _3) );
00176 }
00177
00178 SIG_ON_FRAME &getSignal() {return on_frame_;}
00179
00180 bool &debugFlag() {return debugOutput_;}
00181 bool debugFlag() const {return debugOutput_;}
00182
00183
00184 bool openStream() {
00185 ROS_DEBUG("Opening streaming socket...");
00186 if(!connect(remote_device_ip_, "2113")) {
00187 ROS_DEBUG("Error on connecting to %s", remote_device_ip_.c_str());
00188 return false;
00189 }
00190 ROS_DEBUG("done.");
00191
00192
00193 const char HEARTBEAT_MSG[] = "BlbReq";
00194
00195 ROS_DEBUG("Sending BlbReq: %s", HEARTBEAT_MSG);
00196 write(std::string(HEARTBEAT_MSG, sizeof(HEARTBEAT_MSG)-1));
00197
00198 return true;
00199 }
00200
00201
00202 bool closeStream() {
00203 ROS_DEBUG("Closing streaming connection...");
00204 close();
00205 cur_frame_.reset();
00206 ROS_DEBUG("done.");
00207
00208 return true;
00209 }
00210
00211
00212
00213
00214 void on_data(const char *data, const size_t size, Any_Session *writer)
00215 {
00216 if(debugOutput_)
00217 ROS_DEBUG("Reading image from stream...");
00218
00219 if(!data || size<0) {
00220 ROS_DEBUG("Socket not connected, terminating.");
00221 return;
00222 }
00223
00224 if(cur_frame_) {
00225 cur_frame_->buffer.insert(cur_frame_->buffer.end(), data, data+size );
00226
00227 bool goon=true;
00228 while(goon) {
00229 goon=false;
00230
00231 if(Data::check_header(&cur_frame_->buffer[0], cur_frame_->buffer.size())) {
00232 boost::shared_ptr<Data> parser(new Data);
00233 const size_t actual_size = parser->actual_size(&cur_frame_->buffer[0], cur_frame_->buffer.size());
00234 if(parser->read(&cur_frame_->buffer[0], actual_size)) {
00235 on_frame_(parser);
00236 goon=true;
00237 }
00238 else {
00239 cur_frame_.reset();
00240 ROS_ERROR("failed to parse frame");
00241 break;
00242 }
00243
00244 cur_frame_->buffer.erase(cur_frame_->buffer.begin(), cur_frame_->buffer.begin()+actual_size);
00245 }
00246 }
00247 }
00248 else {
00249 cur_frame_.reset(new SFrame());
00250 on_data(data, size, writer);
00251 }
00252
00253 if(debugOutput_)
00254 ROS_DEBUG("done.");
00255 }
00256 };