$search
00001 /********************************************************************* 00002 * 00003 * Software License Agreement (BSD License) 00004 * 00005 * Copyright (c) 2010, Robert Bosch LLC. 00006 * All rights reserved. 00007 * 00008 * Redistribution and use in source and binary forms, with or without 00009 * modification, are permitted provided that the following conditions 00010 * are met: 00011 * 00012 * * Redistributions of source code must retain the above copyright 00013 * notice, this list of conditions and the following disclaimer. 00014 * * Redistributions in binary form must reproduce the above 00015 * copyright notice, this list of conditions and the following 00016 * disclaimer in the documentation and/or other materials provided 00017 * with the distribution. 00018 * * Neither the name of the Robert Bosch nor the names of its 00019 * contributors may be used to endorse or promote products derived 00020 * from this software without specific prior written permission. 00021 * 00022 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00023 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00024 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00025 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00026 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00027 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00028 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00029 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00030 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00031 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00032 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00033 * POSSIBILITY OF SUCH DAMAGE. 00034 * 00035 *********************************************************************/ 00036 #include "mjpeg_server/mjpeg_server.h" 00037 #include <highgui.h> 00038 00039 #include <sys/ioctl.h> 00040 #include <errno.h> 00041 #include <signal.h> 00042 #include <sys/socket.h> 00043 #include <arpa/inet.h> 00044 #include <sys/types.h> 00045 #include <sys/stat.h> 00046 #include <fcntl.h> 00047 #include <syslog.h> 00048 #include <netdb.h> 00049 #include <errno.h> 00050 00051 #include <boost/thread.hpp> 00052 #include <boost/bind.hpp> 00053 00054 #define ABS(a) (((a) < 0) ? -(a) : (a)) 00055 #ifndef MIN 00056 #define MIN(a, b) (((a) < (b)) ? (a) : (b)) 00057 #endif 00058 #ifndef MAX 00059 #define MAX(a, b) (((a) > (b)) ? (a) : (b)) 00060 #endif 00061 #define LENGTH_OF(x) (sizeof(x)/sizeof(x[0])) 00062 00063 /* the boundary is used for the M-JPEG stream, it separates the multipart stream of pictures */ 00064 #define BOUNDARY "boundarydonotcross" 00065 00066 /* 00067 * this defines the buffer size for a JPG-frame 00068 * selecting to large values will allocate much wasted RAM for each buffer 00069 * selecting to small values will lead to crashes due to to small buffers 00070 */ 00071 #define MAX_FRAME_SIZE (256*1024) 00072 #define TEN_K (10*1024) 00073 00074 /* 00075 * Standard header to be send along with other header information like mimetype. 00076 * 00077 * The parameters should ensure the browser does not cache our answer. 00078 * A browser should connect for each file and not serve files from his cache. 00079 * Using cached pictures would lead to showing old/outdated pictures 00080 * Many browser seem to ignore, or at least not always obey those headers 00081 * since i observed caching of files from time to time. 00082 */ 00083 #define STD_HEADER "Connection: close\r\n" \ 00084 "Server: mjpeg_server\r\n" \ 00085 "Cache-Control: no-store, no-cache, must-revalidate, pre-check=0, post-check=0, max-age=0\r\n" \ 00086 "Pragma: no-cache\r\n" \ 00087 "Expires: Mon, 3 Jan 2000 12:34:56 GMT\r\n" 00088 00089 00090 namespace mjpeg_server { 00091 00092 MJPEGServer::MJPEGServer(ros::NodeHandle& node) : 00093 node_(node), image_transport_(node), stop_requested_(false), www_folder_(NULL) 00094 { 00095 ros::NodeHandle private_nh("~"); 00096 private_nh.param("port", port_, 8080); 00097 } 00098 00099 MJPEGServer::~MJPEGServer() { 00100 cleanUp(); 00101 } 00102 00103 void MJPEGServer::imageCallback(const sensor_msgs::ImageConstPtr& msg, const std::string& topic) { 00104 00105 ImageBuffer* image_buffer = getImageBuffer(topic); 00106 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_); 00107 // copy image 00108 image_buffer->msg = *msg; 00109 // notify senders 00110 image_buffer->condition_.notify_all(); 00111 } 00112 00113 void MJPEGServer::splitString(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiter) 00114 { 00115 // Skip delimiters at beginning. 00116 std::string::size_type lastPos = str.find_first_not_of(delimiter, 0); 00117 // Find first "non-delimiter". 00118 std::string::size_type pos = str.find_first_of(delimiter, lastPos); 00119 00120 while (std::string::npos != pos || std::string::npos != lastPos) 00121 { 00122 // Found a token, add it to the vector. 00123 tokens.push_back(str.substr(lastPos, pos - lastPos)); 00124 // Skip delimiters. Note the "not_of" 00125 lastPos = str.find_first_not_of(delimiter, pos); 00126 // Find next "non-delimiter" 00127 pos = str.find_first_of(delimiter, lastPos); 00128 } 00129 } 00130 00131 int MJPEGServer::stringToInt(const std::string& str, const int default_value) 00132 { 00133 int value; 00134 int res; 00135 if(str.length() == 0) 00136 return default_value; 00137 res = sscanf(str.c_str(),"%i",&value); 00138 if (res == 1) 00139 return value; 00140 return default_value; 00141 } 00142 00143 void MJPEGServer::initIOBuffer(iobuffer *iobuf) 00144 { 00145 memset(iobuf->buffer, 0, sizeof(iobuf->buffer)); 00146 iobuf->level = 0; 00147 } 00148 00149 void MJPEGServer::initRequest(request *req) 00150 { 00151 req->type = A_UNKNOWN; 00152 req->type = A_UNKNOWN; 00153 req->parameter = NULL; 00154 req->client = NULL; 00155 req->credentials = NULL; 00156 } 00157 00158 void MJPEGServer::freeRequest(request *req) 00159 { 00160 if(req->parameter != NULL) free(req->parameter); 00161 if(req->client != NULL) free(req->client); 00162 if(req->credentials != NULL) free(req->credentials); 00163 } 00164 00165 int MJPEGServer::readWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout) 00166 { 00167 int copied = 0, rc, i; 00168 fd_set fds; 00169 struct timeval tv; 00170 00171 memset(buffer, 0, len); 00172 00173 while((copied < len)) { 00174 i = MIN(iobuf->level, len - copied); 00175 memcpy(buffer + copied, iobuf->buffer + IO_BUFFER - iobuf->level, i); 00176 00177 iobuf->level -= i; 00178 copied += i; 00179 if(copied >= len) 00180 return copied; 00181 00182 /* select will return in case of timeout or new data arrived */ 00183 tv.tv_sec = timeout; 00184 tv.tv_usec = 0; 00185 FD_ZERO(&fds); 00186 FD_SET(fd, &fds); 00187 if((rc = select(fd + 1, &fds, NULL, NULL, &tv)) <= 0) { 00188 if(rc < 0) 00189 exit(EXIT_FAILURE); 00190 00191 /* this must be a timeout */ 00192 return copied; 00193 } 00194 00195 initIOBuffer(iobuf); 00196 00197 /* 00198 * there should be at least one byte, because select signalled it. 00199 * But: It may happen (very seldomly), that the socket gets closed remotly between 00200 * the select() and the following read. That is the reason for not relying 00201 * on reading at least one byte. 00202 */ 00203 if((iobuf->level = read(fd, &iobuf->buffer, IO_BUFFER)) <= 0) { 00204 /* an error occured */ 00205 return -1; 00206 } 00207 00208 /* align data to the end of the buffer if less than IO_BUFFER bytes were read */ 00209 memmove(iobuf->buffer + (IO_BUFFER - iobuf->level), iobuf->buffer, iobuf->level); 00210 } 00211 00212 return 0; 00213 } 00214 00215 int MJPEGServer::readLineWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout) 00216 { 00217 char c = '\0', *out = buffer; 00218 int i; 00219 00220 memset(buffer, 0, len); 00221 00222 for(i = 0; i < len && c != '\n'; i++) { 00223 if(readWithTimeout(fd, iobuf, &c, 1, timeout) <= 0) { 00224 /* timeout or error occured */ 00225 return -1; 00226 } 00227 *out++ = c; 00228 } 00229 00230 return i; 00231 } 00232 00233 void MJPEGServer::decodeBase64(char *data) 00234 { 00235 const unsigned char *in = (const unsigned char *)data; 00236 /* The decoded size will be at most 3/4 the size of the encoded */ 00237 unsigned ch = 0; 00238 int i = 0; 00239 00240 while(*in) { 00241 int t = *in++; 00242 00243 if(t >= '0' && t <= '9') 00244 t = t - '0' + 52; 00245 else if(t >= 'A' && t <= 'Z') 00246 t = t - 'A'; 00247 else if(t >= 'a' && t <= 'z') 00248 t = t - 'a' + 26; 00249 else if(t == '+') 00250 t = 62; 00251 else if(t == '/') 00252 t = 63; 00253 else if(t == '=') 00254 t = 0; 00255 else 00256 continue; 00257 00258 ch = (ch << 6) | t; 00259 i++; 00260 if(i == 4) { 00261 *data++ = (char)(ch >> 16); 00262 *data++ = (char)(ch >> 8); 00263 *data++ = (char) ch; 00264 i = 0; 00265 } 00266 } 00267 *data = '\0'; 00268 } 00269 00270 int MJPEGServer::hexCharToInt(char in) 00271 { 00272 if(in >= '0' && in <= '9') 00273 return in - '0'; 00274 00275 if(in >= 'a' && in <= 'f') 00276 return (in - 'a') + 10; 00277 00278 if(in >= 'A' && in <= 'F') 00279 return (in - 'A') + 10; 00280 00281 return -1; 00282 } 00283 00284 int MJPEGServer::unescape(char *string) 00285 { 00286 char *source = string, *destination = string; 00287 int src, dst, length = strlen(string), rc; 00288 00289 /* iterate over the string */ 00290 for(dst = 0, src = 0; src < length; src++) { 00291 00292 /* is it an escape character? */ 00293 if(source[src] != '%') { 00294 /* no, so just go to the next character */ 00295 destination[dst] = source[src]; 00296 dst++; 00297 continue; 00298 } 00299 00300 /* yes, it is an escaped character */ 00301 00302 /* check if there are enough characters */ 00303 if(src + 2 > length) { 00304 return -1; 00305 break; 00306 } 00307 00308 /* perform replacement of %## with the corresponding character */ 00309 if((rc = hexCharToInt(source[src+1])) == -1) return -1; 00310 destination[dst] = rc * 16; 00311 if((rc = hexCharToInt(source[src+2])) == -1) return -1; 00312 destination[dst] += rc; 00313 00314 /* advance pointers, here is the reason why the resulting string is shorter */ 00315 dst++; src += 2; 00316 } 00317 00318 /* ensure the string is properly finished with a null-character */ 00319 destination[dst] = '\0'; 00320 00321 return 0; 00322 } 00323 00324 void MJPEGServer::sendError(int fd, int which, const char *message) 00325 { 00326 char buffer[BUFFER_SIZE] = {0}; 00327 00328 if(which == 401) { 00329 sprintf(buffer, "HTTP/1.0 401 Unauthorized\r\n" \ 00330 "Content-type: text/plain\r\n" \ 00331 STD_HEADER \ 00332 "WWW-Authenticate: Basic realm=\"MJPG-Streamer\"\r\n" \ 00333 "\r\n" \ 00334 "401: Not Authenticated!\r\n" \ 00335 "%s", message); 00336 } else if(which == 404) { 00337 sprintf(buffer, "HTTP/1.0 404 Not Found\r\n" \ 00338 "Content-type: text/plain\r\n" \ 00339 STD_HEADER \ 00340 "\r\n" \ 00341 "404: Not Found!\r\n" \ 00342 "%s", message); 00343 } else if(which == 500) { 00344 sprintf(buffer, "HTTP/1.0 500 Internal Server Error\r\n" \ 00345 "Content-type: text/plain\r\n" \ 00346 STD_HEADER \ 00347 "\r\n" \ 00348 "500: Internal Server Error!\r\n" \ 00349 "%s", message); 00350 } else if(which == 400) { 00351 sprintf(buffer, "HTTP/1.0 400 Bad Request\r\n" \ 00352 "Content-type: text/plain\r\n" \ 00353 STD_HEADER \ 00354 "\r\n" \ 00355 "400: Not Found!\r\n" \ 00356 "%s", message); 00357 } else { 00358 sprintf(buffer, "HTTP/1.0 501 Not Implemented\r\n" \ 00359 "Content-type: text/plain\r\n" \ 00360 STD_HEADER \ 00361 "\r\n" \ 00362 "501: Not Implemented!\r\n" \ 00363 "%s", message); 00364 } 00365 00366 if(write(fd, buffer, strlen(buffer)) < 0) { 00367 ROS_DEBUG("write failed, done anyway"); 00368 } 00369 } 00370 00371 void MJPEGServer::decodeParameter(const std::string& parameter, ParameterMap& parameter_map) 00372 { 00373 std::vector<std::string> parameter_value_pairs; 00374 splitString(parameter,parameter_value_pairs, "?&"); 00375 00376 for(size_t i=0; i<parameter_value_pairs.size(); i++) { 00377 std::vector<std::string> parameter_value; 00378 splitString(parameter_value_pairs[i], parameter_value, "="); 00379 if(parameter_value.size()==1) { 00380 parameter_map.insert(std::make_pair(parameter_value[0],std::string(""))); 00381 } 00382 else if(parameter_value.size()==2) { 00383 parameter_map.insert(std::make_pair(parameter_value[0],parameter_value[1])); 00384 } 00385 } 00386 } 00387 00388 ImageBuffer* MJPEGServer::getImageBuffer(const std::string& topic) 00389 { 00390 boost::unique_lock<boost::mutex> lock(image_maps_mutex_); 00391 ImageSubscriberMap::iterator it = image_subscribers_.find(topic); 00392 if (it == image_subscribers_.end()) { 00393 image_subscribers_[topic] = image_transport_.subscribe(topic, 1, boost::bind(&MJPEGServer::imageCallback, this, _1, topic)); 00394 image_buffers_[topic] = new ImageBuffer(); 00395 ROS_INFO("Subscribing to topic %s", topic.c_str()); 00396 } 00397 ImageBuffer* image_buffer = image_buffers_[topic]; 00398 return image_buffer; 00399 } 00400 00401 // rotate input image at 180 degrees 00402 void MJPEGServer::invertImage(const cv::Mat& input, cv::Mat& output) { 00403 00404 cv::Mat_<cv::Vec3b>& input_img = (cv::Mat_<cv::Vec3b>&)input; //3 channel pointer to image 00405 cv::Mat_<cv::Vec3b>& output_img = (cv::Mat_<cv::Vec3b>&)output; //3 channel pointer to image 00406 cv::Size size = input.size(); 00407 00408 for (int j = 0; j < size.height; ++j) 00409 for (int i = 0; i < size.width; ++i) { 00410 //outputImage.imageData[size.height*size.width - (i + j*size.width) - 1] = inputImage.imageData[i + j*size.width]; 00411 output_img(size.height-j-1, size.width-i-1) = input_img(j,i); 00412 } 00413 return; 00414 } 00415 00416 void MJPEGServer::sendStream(int fd, const char *parameter) 00417 { 00418 unsigned char *frame = NULL, *tmp = NULL; 00419 int frame_size = 0, max_frame_size = 0; 00420 char buffer[BUFFER_SIZE] = {0}; 00421 struct timeval timestamp; 00422 sensor_msgs::CvBridge image_bridge; 00423 00424 ROS_DEBUG("Decoding parameter"); 00425 00426 std::string params = parameter; 00427 ParameterMap parameter_map; 00428 decodeParameter(params, parameter_map); 00429 00430 ParameterMap::iterator itp = parameter_map.find("topic"); 00431 if (itp == parameter_map.end()) return; 00432 00433 std::string topic = itp->second; 00434 ImageBuffer* image_buffer = getImageBuffer(topic); 00435 00436 ROS_DEBUG("preparing header"); 00437 sprintf(buffer, "HTTP/1.0 200 OK\r\n" \ 00438 STD_HEADER \ 00439 "Content-Type: multipart/x-mixed-replace;boundary=" BOUNDARY "\r\n" \ 00440 "\r\n" \ 00441 "--" BOUNDARY "\r\n"); 00442 00443 if(write(fd, buffer, strlen(buffer)) < 0) { 00444 free(frame); 00445 return; 00446 } 00447 00448 ROS_DEBUG("Headers send, sending stream now"); 00449 00450 while(!stop_requested_) { 00451 { 00452 /* wait for fresh frames */ 00453 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_); 00454 image_buffer->condition_.wait(lock); 00455 00456 IplImage* image; 00457 try { 00458 if (image_bridge.fromImage(image_buffer->msg, "bgr8")) { 00459 image = image_bridge.toIpl(); 00460 } 00461 else { 00462 ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str()); 00463 return; 00464 } 00465 } 00466 catch(...) { 00467 ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str()); 00468 return; 00469 } 00470 00471 // encode image 00472 cv::Mat img = image; 00473 std::vector<uchar> encoded_buffer; 00474 std::vector<int> encode_params; 00475 00476 // invert 00477 int invert = 0; 00478 if(parameter_map.find("invert") != parameter_map.end()) { 00479 cv::Mat cloned_image = img.clone(); 00480 invertImage(cloned_image, img); 00481 } 00482 00483 // quality 00484 int quality = 95; 00485 if(parameter_map.find("quality") != parameter_map.end()) { 00486 quality = stringToInt(parameter_map["quality"]); 00487 } 00488 encode_params.push_back(CV_IMWRITE_JPEG_QUALITY); 00489 encode_params.push_back(quality); 00490 00491 // resize image 00492 if(parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end()) { 00493 int width = stringToInt(parameter_map["width"]); 00494 int height = stringToInt(parameter_map["height"]); 00495 if(width > 0 && height > 0) { 00496 cv::Mat img_resized; 00497 cv::Size new_size(width,height); 00498 cv::resize(img, img_resized, new_size); 00499 cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params); 00500 } 00501 else { 00502 cv::imencode(".jpeg", img, encoded_buffer, encode_params); 00503 } 00504 } 00505 else { 00506 cv::imencode(".jpeg", img, encoded_buffer, encode_params); 00507 } 00508 00509 // copy encoded frame buffer 00510 frame_size = encoded_buffer.size(); 00511 00512 /* check if frame buffer is large enough, increase it if necessary */ 00513 if(frame_size > max_frame_size) { 00514 ROS_DEBUG("increasing frame buffer size to %d", frame_size); 00515 00516 max_frame_size = frame_size + TEN_K; 00517 if((tmp = (unsigned char*)realloc(frame, max_frame_size)) == NULL) { 00518 free(frame); 00519 sendError(fd, 500, "not enough memory"); 00520 return; 00521 } 00522 frame = tmp; 00523 } 00524 00525 /* copy v4l2_buffer timeval to user space */ 00526 timestamp.tv_sec = ros::Time::now().toSec(); 00527 00528 memcpy(frame, &encoded_buffer[0], frame_size); 00529 ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024); 00530 } 00531 00532 /* 00533 * print the individual mimetype and the length 00534 * sending the content-length fixes random stream disruption observed 00535 * with firefox 00536 */ 00537 sprintf(buffer, "Content-Type: image/jpeg\r\n" \ 00538 "Content-Length: %d\r\n" \ 00539 "X-Timestamp: %d.%06d\r\n" \ 00540 "\r\n", frame_size, (int)timestamp.tv_sec, (int)timestamp.tv_usec); 00541 ROS_DEBUG("sending intemdiate header"); 00542 if(write(fd, buffer, strlen(buffer)) < 0) break; 00543 00544 ROS_DEBUG("sending frame"); 00545 if(write(fd, frame, frame_size) < 0) break; 00546 00547 ROS_DEBUG("sending boundary"); 00548 sprintf(buffer, "\r\n--" BOUNDARY "\r\n"); 00549 if(write(fd, buffer, strlen(buffer)) < 0) break; 00550 } 00551 00552 free(frame); 00553 } 00554 00555 void MJPEGServer::sendSnapshot(int fd, const char *parameter) 00556 { 00557 unsigned char *frame = NULL; 00558 int frame_size = 0; 00559 char buffer[BUFFER_SIZE] = {0}; 00560 struct timeval timestamp; 00561 sensor_msgs::CvBridge image_bridge; 00562 00563 std::string params = parameter; 00564 ParameterMap parameter_map; 00565 decodeParameter(params, parameter_map);http://merry:8080/stream?topic=/remote_lab_cam1/image_raw?invert=1 00566 00567 ParameterMap::iterator itp = parameter_map.find("topic"); 00568 if (itp == parameter_map.end()) return; 00569 00570 std::string topic = itp->second; 00571 ImageBuffer* image_buffer = getImageBuffer(topic); 00572 00573 /* wait for fresh frames */ 00574 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_); 00575 image_buffer->condition_.wait(lock); 00576 00577 IplImage* image; 00578 try { 00579 if (image_bridge.fromImage(image_buffer->msg, "bgr8")) { 00580 image = image_bridge.toIpl(); 00581 } 00582 else { 00583 ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str()); 00584 return; 00585 } 00586 } 00587 catch(...) { 00588 ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str()); 00589 return; 00590 } 00591 00592 cv::Mat img = image; 00593 std::vector<uchar> encoded_buffer; 00594 std::vector<int> encode_params; 00595 00596 // invert 00597 int invert = 0; 00598 if(parameter_map.find("invert") != parameter_map.end()) { 00599 cv::Mat cloned_image = img.clone(); 00600 invertImage(cloned_image, img); 00601 } 00602 00603 // quality 00604 int quality = 95; 00605 if(parameter_map.find("quality") != parameter_map.end()) { 00606 quality = stringToInt(parameter_map["quality"]); 00607 } 00608 encode_params.push_back(CV_IMWRITE_JPEG_QUALITY); 00609 encode_params.push_back(quality); 00610 00611 // resize image 00612 if(parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end()) { 00613 int width = stringToInt(parameter_map["width"]); 00614 int height = stringToInt(parameter_map["height"]); 00615 if(width > 0 && height > 0) { 00616 cv::Mat img_resized; 00617 cv::Size new_size(width,height); 00618 cv::resize(img, img_resized, new_size); 00619 cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params); 00620 } 00621 else { 00622 cv::imencode(".jpeg", img, encoded_buffer, encode_params); 00623 } 00624 } 00625 else { 00626 cv::imencode(".jpeg", img, encoded_buffer, encode_params); 00627 } 00628 00629 // copy encoded frame buffer 00630 frame_size = encoded_buffer.size(); 00631 00632 // resize buffer 00633 if((frame = (unsigned char*)malloc(frame_size)) == NULL) { 00634 free(frame); 00635 sendError(fd, 500, "not enough memory"); 00636 return; 00637 } 00638 00639 /* copy v4l2_buffer timeval to user space */ 00640 timestamp.tv_sec = ros::Time::now().toSec(); 00641 00642 memcpy(frame, &encoded_buffer[0], frame_size); 00643 ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024); 00644 00645 /* write the response */ 00646 sprintf(buffer, "HTTP/1.0 200 OK\r\n" \ 00647 STD_HEADER \ 00648 "Content-type: image/jpeg\r\n" \ 00649 "X-Timestamp: %d.%06d\r\n" \ 00650 "\r\n", (int) timestamp.tv_sec, (int) timestamp.tv_usec); 00651 00652 /* send header and image now */ 00653 if(write(fd, buffer, strlen(buffer)) < 0 || \ 00654 write(fd, frame, frame_size) < 0) { 00655 free(frame); 00656 return; 00657 } 00658 00659 free(frame); 00660 } 00661 00662 void MJPEGServer::client(int fd) { 00663 int cnt; 00664 char buffer[BUFFER_SIZE] = {0}, *pb = buffer; 00665 iobuffer iobuf; 00666 request req; 00667 00668 /* initializes the structures */ 00669 initIOBuffer(&iobuf); 00670 initRequest(&req); 00671 00672 /* What does the client want to receive? Read the request. */ 00673 memset(buffer, 0, sizeof(buffer)); 00674 if((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) { 00675 close(fd); 00676 return; 00677 } 00678 00679 /* determine what to deliver */ 00680 if(strstr(buffer, "GET /?") != NULL) { 00681 req.type = A_STREAM; 00682 00683 /* advance by the length of known string */ 00684 if((pb = strstr(buffer, "GET /")) == NULL) { 00685 ROS_DEBUG("HTTP request seems to be malformed"); 00686 sendError(fd, 400, "Malformed HTTP request"); 00687 close(fd); 00688 return; 00689 } 00690 pb += strlen("GET /"); // a pb points to the string after the first & after command 00691 int len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100); 00692 req.parameter = (char*)malloc(len + 1); 00693 if(req.parameter == NULL) { 00694 exit(EXIT_FAILURE); 00695 } 00696 memset(req.parameter, 0, len + 1); 00697 strncpy(req.parameter, pb, len); 00698 00699 ROS_DEBUG("requested image topic: \"%s\"", len, req.parameter); 00700 } 00701 else if(strstr(buffer, "GET /stream?") != NULL) { 00702 req.type = A_STREAM; 00703 00704 /* advance by the length of known string */ 00705 if((pb = strstr(buffer, "GET /stream")) == NULL) { 00706 ROS_DEBUG("HTTP request seems to be malformed"); 00707 sendError(fd, 400, "Malformed HTTP request"); 00708 close(fd); 00709 return; 00710 } 00711 pb += strlen("GET /stream"); // a pb points to the string after the first & after command 00712 int len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100); 00713 req.parameter = (char*)malloc(len + 1); 00714 if(req.parameter == NULL) { 00715 exit(EXIT_FAILURE); 00716 } 00717 memset(req.parameter, 0, len + 1); 00718 strncpy(req.parameter, pb, len); 00719 00720 ROS_DEBUG("requested image topic: \"%s\"", len, req.parameter); 00721 } 00722 else if(strstr(buffer, "GET /snapshot?") != NULL) { 00723 req.type = A_SNAPSHOT; 00724 00725 /* advance by the length of known string */ 00726 if((pb = strstr(buffer, "GET /snapshot")) == NULL) { 00727 ROS_DEBUG("HTTP request seems to be malformed"); 00728 sendError(fd, 400, "Malformed HTTP request"); 00729 close(fd); 00730 return; 00731 } 00732 pb += strlen("GET /snapshot"); // a pb points to the string after the first & after command 00733 int len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100); 00734 req.parameter = (char*)malloc(len + 1); 00735 if(req.parameter == NULL) { 00736 exit(EXIT_FAILURE); 00737 } 00738 memset(req.parameter, 0, len + 1); 00739 strncpy(req.parameter, pb, len); 00740 00741 ROS_DEBUG("requested image topic: \"%s\"", len, req.parameter); 00742 } 00743 00744 /* 00745 * parse the rest of the HTTP-request 00746 * the end of the request-header is marked by a single, empty line with "\r\n" 00747 */ 00748 do { 00749 memset(buffer, 0, sizeof(buffer)); 00750 00751 if((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) { 00752 freeRequest(&req); 00753 close(fd); 00754 return; 00755 } 00756 00757 if(strstr(buffer, "User-Agent: ") != NULL) { 00758 req.client = strdup(buffer + strlen("User-Agent: ")); 00759 } else if(strstr(buffer, "Authorization: Basic ") != NULL) { 00760 req.credentials = strdup(buffer + strlen("Authorization: Basic ")); 00761 decodeBase64(req.credentials); 00762 ROS_DEBUG("username:password: %s", req.credentials); 00763 } 00764 00765 } while(cnt > 2 && !(buffer[0] == '\r' && buffer[1] == '\n')); 00766 00767 00768 00769 /* now it's time to answer */ 00770 switch(req.type) { 00771 case A_STREAM: { 00772 ROS_DEBUG("Request for streaming"); 00773 sendStream(fd, req.parameter); 00774 break; 00775 } 00776 case A_SNAPSHOT: { 00777 ROS_DEBUG("Request for snapshot"); 00778 sendSnapshot(fd, req.parameter); 00779 break; 00780 } 00781 default: 00782 ROS_DEBUG("unknown request"); 00783 } 00784 00785 close(fd); 00786 freeRequest(&req); 00787 00788 ROS_INFO("Disconnecting HTTP client"); 00789 return; 00790 } 00791 00792 void MJPEGServer::execute() { 00793 00794 ROS_INFO("Starting mjpeg server"); 00795 00796 struct addrinfo *aip, *aip2; 00797 struct addrinfo hints; 00798 struct sockaddr_storage client_addr; 00799 socklen_t addr_len = sizeof(struct sockaddr_storage); 00800 fd_set selectfds; 00801 int max_fds = 0; 00802 00803 char name[NI_MAXHOST]; 00804 00805 bzero(&hints, sizeof(hints)); 00806 hints.ai_family = PF_UNSPEC; 00807 hints.ai_flags = AI_PASSIVE; 00808 hints.ai_socktype = SOCK_STREAM; 00809 00810 int err; 00811 snprintf(name, sizeof(name), "%d", port_); 00812 if((err = getaddrinfo(NULL, name, &hints, &aip)) != 0) { 00813 perror(gai_strerror(err)); 00814 exit(EXIT_FAILURE); 00815 } 00816 00817 for(int i = 0; i < MAX_NUM_SOCKETS; i++) 00818 sd[i] = -1; 00819 00820 00821 /* open sockets for server (1 socket / address family) */ 00822 int i = 0; 00823 int on; 00824 for(aip2 = aip; aip2 != NULL; aip2 = aip2->ai_next) { 00825 if((sd[i] = socket(aip2->ai_family, aip2->ai_socktype, 0)) < 0) { 00826 continue; 00827 } 00828 00829 /* ignore "socket already in use" errors */ 00830 on = 1; 00831 if(setsockopt(sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { 00832 perror("setsockopt(SO_REUSEADDR) failed"); 00833 } 00834 00835 /* IPv6 socket should listen to IPv6 only, otherwise we will get "socket already in use" */ 00836 on = 1; 00837 if(aip2->ai_family == AF_INET6 && setsockopt(sd[i], IPPROTO_IPV6, IPV6_V6ONLY, (const void *)&on , sizeof(on)) < 0) { 00838 perror("setsockopt(IPV6_V6ONLY) failed"); 00839 } 00840 00841 /* perhaps we will use this keep-alive feature oneday */ 00842 /* setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)); */ 00843 00844 if(bind(sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0) { 00845 perror("bind"); 00846 sd[i] = -1; 00847 continue; 00848 } 00849 00850 if(listen(sd[i], 10) < 0) { 00851 perror("listen"); 00852 sd[i] = -1; 00853 } else { 00854 i++; 00855 if(i >= MAX_NUM_SOCKETS) { 00856 ROS_ERROR("Maximum number of server sockets exceeded"); 00857 i--; 00858 break; 00859 } 00860 } 00861 } 00862 00863 sd_len = i; 00864 00865 if(sd_len < 1) { 00866 ROS_ERROR("Bind(%d) failed", port_); 00867 closelog(); 00868 exit(EXIT_FAILURE); 00869 } 00870 else { 00871 ROS_INFO("Bind(%d) succeeded", port_); 00872 } 00873 00874 ROS_INFO("waiting for clients to connect"); 00875 00876 /* create a child for every client that connects */ 00877 while(!stop_requested_) { 00878 00879 do { 00880 FD_ZERO(&selectfds); 00881 00882 for(i = 0; i < MAX_NUM_SOCKETS; i++) { 00883 if(sd[i] != -1) { 00884 FD_SET(sd[i], &selectfds); 00885 00886 if(sd[i] > max_fds) 00887 max_fds = sd[i]; 00888 } 00889 } 00890 00891 err = select(max_fds + 1, &selectfds, NULL, NULL, NULL); 00892 00893 if(err < 0 && errno != EINTR) { 00894 perror("select"); 00895 exit(EXIT_FAILURE); 00896 } 00897 } while(err <= 0); 00898 00899 ROS_INFO("Client connected"); 00900 00901 for(i = 0; i < max_fds + 1; i++) { 00902 if(sd[i] != -1 && FD_ISSET(sd[i], &selectfds)) { 00903 int fd = accept(sd[i], (struct sockaddr *)&client_addr, &addr_len); 00904 00905 /* start new thread that will handle this TCP connected client */ 00906 ROS_DEBUG("create thread to handle client that just established a connection"); 00907 00908 if(getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0) { 00909 syslog(LOG_INFO, "serving client: %s\n", name); 00910 } 00911 00912 boost::thread t(boost::bind( &MJPEGServer::client, this, fd )); 00913 t.detach(); 00914 } 00915 } 00916 } 00917 00918 ROS_INFO("leaving server thread, calling cleanup function now"); 00919 cleanUp(); 00920 } 00921 00922 void MJPEGServer::cleanUp() { 00923 ROS_INFO("cleaning up ressources allocated by server thread"); 00924 00925 for(int i = 0; i < MAX_NUM_SOCKETS; i++) 00926 close(sd[i]); 00927 } 00928 00929 void MJPEGServer::spin() { 00930 boost::thread t(boost::bind( &MJPEGServer::execute, this )); 00931 t.detach(); 00932 ros::spin(); 00933 ROS_INFO("stop requested"); 00934 stop(); 00935 } 00936 00937 void MJPEGServer::stop() { 00938 stop_requested_ = true; 00939 } 00940 00941 } 00942 00943 int main(int argc, char** argv){ 00944 ros::init(argc, argv, "mjpeg_server"); 00945 00946 ros::NodeHandle nh; 00947 mjpeg_server::MJPEGServer server(nh); 00948 server.spin(); 00949 00950 return(0); 00951 } 00952