Go to the documentation of this file.00001
00054 class CoLaFrame {
00055 std::vector<char> buffer_;
00056
00057 uint8_t checksum() {
00058 uint8_t c=0;
00059 for(size_t i=8; i<buffer_.size(); i++)
00060 c^=buffer_[i];
00061 return c;
00062 }
00063
00064 public:
00065 CoLaFrame() : buffer_(8, (char)0) {
00066 }
00067
00068 CoLaFrame(const char *str) : buffer_(8+strlen(str), (char)0) {
00069 int i=8;
00070 while(*str)
00071 buffer_[i++] = *(str++);
00072 }
00073
00074 std::vector<char> finish() {
00075 char *data = &buffer_[0];
00076 *(uint32_t*)(data+0) = htonl(0x02020202);
00077 *(uint32_t*)(data+4) = htonl(buffer_.size()-8);
00078
00079 buffer_.push_back(checksum());
00080
00081 return buffer_;
00082 }
00083 };
00084
00085
00086
00087 class Control : public TCP_Session {
00088 Any_Session::SIG_ON_DATA on_data_;
00089 const std::string remote_device_ip_;
00090 bool stream_started_;
00091
00092 void on_data(const char *data, const size_t size, Any_Session *writer)
00093 {
00094 ROS_DEBUG("got data for control");
00095 }
00096 public:
00097 Control(boost::asio::io_service& io_service, const std::string &remote_device_ip) :
00098 TCP_Session(io_service, on_data_),
00099 remote_device_ip_(remote_device_ip),
00100 stream_started_(false)
00101 {
00102 on_data_.connect( boost::bind(&Control::on_data, this, _1, _2, _3) );
00103 }
00104
00105
00106 bool open() {
00107 ROS_DEBUG("Connecting to device...");
00108
00109 if(!connect(remote_device_ip_, "2112")) {
00110 ROS_ERROR("Error on connecting to %s", remote_device_ip_.c_str());
00111 return false;
00112 }
00113
00114 ROS_DEBUG("done.");
00115 return true;
00116 }
00117
00118
00119
00120
00121 bool initStream() {
00122
00123 ROS_DEBUG("Initializing streaming...");
00124 CoLaFrame fr("sMN GetBlobClientConfig");
00125 ROS_DEBUG("Sending on sMN GetBlobClientConfig");
00126 write(fr.finish());
00127 ROS_DEBUG("done.");
00128
00129 return true;
00130 }
00131
00132
00133
00134
00135 bool startStream() {
00136 if(stream_started_) return true;
00137
00138 CoLaFrame fr("sMN PLAYSTART");
00139 ROS_DEBUG("Sending on sopas sMN PLAYSTART");
00140 write(fr.finish());
00141 stream_started_ = true;
00142
00143 return true;
00144 }
00145
00146
00147
00148 bool stopStream() {
00149 if(!stream_started_) return true;
00150
00151 CoLaFrame fr("sMN PLAYSTOP");
00152 ROS_DEBUG("Sending on sopas sMN PLAYSTOP");
00153 write(fr.finish());
00154 stream_started_ = false;
00155
00156 return true;
00157 }
00158 };
00159
00160
00161
00162
00163 class Streaming : public TCP_Session {
00164 public:
00165
00166 struct SFrame {
00167 std::vector<char> buffer;
00168
00169 typedef boost::shared_ptr<SFrame> Ptr;
00170 };
00171
00172 typedef boost::signals2::signal<void (const boost::shared_ptr<Data> &)> SIG_ON_FRAME;
00173
00174 private:
00175 const std::string remote_device_ip_;
00176 Any_Session::SIG_ON_DATA on_data_;
00177 SIG_ON_FRAME on_frame_;
00178 bool debugOutput_;
00179 SFrame::Ptr cur_frame_;
00180
00181 public:
00182
00183 Streaming(boost::asio::io_service& io_service, const std::string &remote_device_ip) :
00184 TCP_Session(io_service, on_data_),
00185 remote_device_ip_(remote_device_ip),
00186 debugOutput_(false)
00187 {
00188 on_data_.connect( boost::bind(&Streaming::on_data, this, _1, _2, _3) );
00189 }
00190
00191 SIG_ON_FRAME &getSignal() {return on_frame_;}
00192
00193 bool &debugFlag() {return debugOutput_;}
00194 bool debugFlag() const {return debugOutput_;}
00195
00196
00197 bool openStream() {
00198 ROS_DEBUG("Opening streaming socket...");
00199 if(!connect(remote_device_ip_, "2113")) {
00200 ROS_DEBUG("Error on connecting to %s", remote_device_ip_.c_str());
00201 return false;
00202 }
00203 ROS_DEBUG("done.");
00204
00205
00206 const char HEARTBEAT_MSG[] = "BlbReq";
00207
00208 ROS_DEBUG("Sending BlbReq: %s", HEARTBEAT_MSG);
00209 write(std::string(HEARTBEAT_MSG, sizeof(HEARTBEAT_MSG)-1));
00210
00211 return true;
00212 }
00213
00214
00215 bool closeStream() {
00216 ROS_DEBUG("Closing streaming connection...");
00217 close();
00218 cur_frame_.reset();
00219 ROS_DEBUG("done.");
00220
00221 return true;
00222 }
00223
00224
00225
00226
00227 void on_data(const char *data, const size_t size, Any_Session *writer)
00228 {
00229 if(debugOutput_)
00230 ROS_DEBUG("Reading image from stream...");
00231
00232 if(!data || size<0) {
00233 ROS_DEBUG("Socket not connected, terminating.");
00234 return;
00235 }
00236
00237 if(cur_frame_) {
00238 cur_frame_->buffer.insert(cur_frame_->buffer.end(), data, data+size );
00239
00240 bool goon=true;
00241 while(goon) {
00242 goon=false;
00243
00244 if(Data::check_header(&cur_frame_->buffer[0], cur_frame_->buffer.size())) {
00245 boost::shared_ptr<Data> parser(new Data);
00246 const size_t actual_size = parser->actual_size(&cur_frame_->buffer[0], cur_frame_->buffer.size());
00247 if(parser->read(&cur_frame_->buffer[0], actual_size)) {
00248 on_frame_(parser);
00249 goon=true;
00250 }
00251 else {
00252 cur_frame_.reset();
00253 ROS_ERROR("failed to parse frame");
00254 break;
00255 }
00256
00257 cur_frame_->buffer.erase(cur_frame_->buffer.begin(), cur_frame_->buffer.begin()+actual_size);
00258 }
00259 }
00260 }
00261 else {
00262 cur_frame_.reset(new SFrame());
00263 on_data(data, size, writer);
00264 }
00265
00266 if(debugOutput_)
00267 ROS_DEBUG("done.");
00268 }
00269 };