mjpeg_server.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002  *
00003  * Software License Agreement (BSD License)
00004  *
00005  *  Copyright (c) 2010, Robert Bosch LLC.
00006  *  All rights reserved.
00007  *
00008  *  Redistribution and use in source and binary forms, with or without
00009  *  modification, are permitted provided that the following conditions
00010  *  are met:
00011  *
00012  *   * Redistributions of source code must retain the above copyright
00013  *     notice, this list of conditions and the following disclaimer.
00014  *   * Redistributions in binary form must reproduce the above
00015  *     copyright notice, this list of conditions and the following
00016  *     disclaimer in the documentation and/or other materials provided
00017  *     with the distribution.
00018  *   * Neither the name of the Robert Bosch nor the names of its
00019  *     contributors may be used to endorse or promote products derived
00020  *     from this software without specific prior written permission.
00021  *
00022  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033  *  POSSIBILITY OF SUCH DAMAGE.
00034  *
00035  *********************************************************************/
00036 #include "mjpeg_server/mjpeg_server.h"
00037 #include <highgui.h>
00038 
00039 #include <sys/ioctl.h>
00040 #include <errno.h>
00041 #include <signal.h>
00042 #include <sys/socket.h>
00043 #include <arpa/inet.h>
00044 #include <sys/types.h>
00045 #include <sys/stat.h>
00046 #include <fcntl.h>
00047 #include <syslog.h>
00048 #include <netdb.h>
00049 #include <errno.h>
00050 
00051 #include <opencv2/opencv.hpp>
00052 
00053 #include <boost/thread.hpp>
00054 #include <boost/bind.hpp>
00055 
00056 
00057 template <typename T>
00058 inline T ABS(T a){ 
00059     return (a < 0 ? -a : a);
00060 }
00061 
00062 template <typename T>
00063 inline T min(T a, T b){ 
00064     return (a < b ? a : b);
00065 }
00066 
00067 template <typename T>
00068 inline T max(T a, T b){ 
00069     return (a > b ? a : b);
00070 }
00071 
00072 template <typename T>
00073 inline T LENGTH_OF(T x){ 
00074     return (sizeof(x)/sizeof(x[0]));
00075 }
00076 
00077 
00078 
00079 namespace mjpeg_server {
00080 
00081 MJPEGServer::MJPEGServer(ros::NodeHandle& node) :
00082   node_(node), image_transport_(node), stop_requested_(false), www_folder_(NULL)
00083 {
00084   ros::NodeHandle private_nh("~");
00085   private_nh.param("port", port_, 8080);
00086   header= "Connection: close\r\nServer: mjpeg_server\r\n"               \
00087     "Cache-Control: no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0\r\n" \
00088     "Pragma: no-cache\r\n";
00089 }
00090 
00091 MJPEGServer::~MJPEGServer() {
00092   cleanUp();
00093 }
00094 
00095 void MJPEGServer::imageCallback(const sensor_msgs::ImageConstPtr& msg, const std::string& topic) {
00096 
00097   ImageBuffer* image_buffer = getImageBuffer(topic);
00098   boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00099   // copy image
00100   image_buffer->msg = *msg;
00101   // notify senders
00102   image_buffer->condition_.notify_all();
00103 }
00104 
00105 void MJPEGServer::splitString(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiter)
00106 {
00107   // Skip delimiters at beginning.
00108   std::string::size_type lastPos = str.find_first_not_of(delimiter, 0);
00109   // Find first "non-delimiter".
00110   std::string::size_type pos     = str.find_first_of(delimiter, lastPos);
00111 
00112   while (std::string::npos != pos || std::string::npos != lastPos)
00113   {
00114     // Found a token, add it to the vector.
00115     tokens.push_back(str.substr(lastPos, pos - lastPos));
00116     // Skip delimiters.  Note the "not_of"
00117     lastPos = str.find_first_not_of(delimiter, pos);
00118     // Find next "non-delimiter"
00119     pos = str.find_first_of(delimiter, lastPos);
00120   }
00121 }
00122 
00123 int MJPEGServer::stringToInt(const std::string& str, const int default_value)
00124 {
00125   int value;
00126   int res;
00127   if(str.length() == 0)
00128     return default_value;
00129   res = sscanf(str.c_str(),"%i",&value);
00130   if (res == 1)
00131     return value;
00132   return default_value;
00133 }
00134 
00135 void MJPEGServer::initIOBuffer(iobuffer *iobuf)
00136 {
00137     memset(iobuf->buffer, 0, sizeof(iobuf->buffer));
00138     iobuf->level = 0;
00139 }
00140 
00141 void MJPEGServer::initRequest(request *req)
00142 {
00143     req->type        = A_UNKNOWN;
00144     req->type        = A_UNKNOWN;
00145     req->parameter   = NULL;
00146     req->client      = NULL;
00147     req->credentials = NULL;
00148 }
00149 
00150 void MJPEGServer::freeRequest(request *req)
00151 {
00152     if(req->parameter != NULL) free(req->parameter);
00153     if(req->client != NULL) free(req->client);
00154     if(req->credentials != NULL) free(req->credentials);
00155 }
00156 
00157 int MJPEGServer::readWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout)
00158 {
00159     size_t copied = 0;
00160     int rc, i;
00161     fd_set fds;
00162     struct timeval tv;
00163 
00164     memset(buffer, 0, len);
00165 
00166     while((copied < len)) {
00167         i = min((size_t)iobuf->level, len - copied);
00168         memcpy(buffer + copied, iobuf->buffer + IO_BUFFER - iobuf->level, i);
00169 
00170         iobuf->level -= i;
00171         copied += i;
00172         if(copied >= len)
00173             return copied;
00174 
00175         /* select will return in case of timeout or new data arrived */
00176         tv.tv_sec = timeout;
00177         tv.tv_usec = 0;
00178         FD_ZERO(&fds);
00179         FD_SET(fd, &fds);
00180         if((rc = select(fd + 1, &fds, NULL, NULL, &tv)) <= 0) {
00181             if(rc < 0)
00182                 exit(EXIT_FAILURE);
00183 
00184             /* this must be a timeout */
00185             return copied;
00186         }
00187 
00188         initIOBuffer(iobuf);
00189 
00190         /*
00191          * there should be at least one byte, because select signalled it.
00192          * But: It may happen (very seldomly), that the socket gets closed remotly between
00193          * the select() and the following read. That is the reason for not relying
00194          * on reading at least one byte.
00195          */
00196         if((iobuf->level = read(fd, &iobuf->buffer, IO_BUFFER)) <= 0) {
00197             /* an error occured */
00198             return -1;
00199         }
00200 
00201         /* align data to the end of the buffer if less than IO_BUFFER bytes were read */
00202         memmove(iobuf->buffer + (IO_BUFFER - iobuf->level), iobuf->buffer, iobuf->level);
00203     }
00204 
00205     return 0;
00206 }
00207 
00208 int MJPEGServer::readLineWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout)
00209 {
00210     char c = '\0', *out = buffer;
00211     unsigned int i;
00212 
00213     memset(buffer, 0, len);
00214 
00215     for(i = 0; i < len && c != '\n'; i++) {
00216         if(readWithTimeout(fd, iobuf, &c, 1, timeout) <= 0) {
00217             /* timeout or error occured */
00218             return -1;
00219         }
00220         *out++ = c;
00221     }
00222 
00223     return i;
00224 }
00225 
00226 void MJPEGServer::decodeBase64(char *data)
00227 {
00228   union {
00229     int i;
00230     char c[4];
00231   } buffer;
00232 
00233   char* ptr = data;
00234   unsigned int size = strlen(data);
00235   char* temp = new char[size];
00236   char* tempptr = temp;
00237   char t;
00238 
00239   for(buffer.i = 0,t = *ptr; ptr; ptr++)
00240   {
00241     if(t >= 'A' && t <= 'Z')
00242       t = t - 'A';
00243     else if(t >= 'a' && t <= 'z')
00244       t = t - 'a' + 26;
00245     else if(t >= '0' && t <= '9')
00246       t = t - '0' + 52;
00247     else if(t == '+')
00248       t = 62;
00249     else if(t == '/')
00250       t = 63;
00251     else
00252       continue;
00253 
00254     buffer.i = (buffer.i << 6) | t;
00255 
00256     if((ptr - data +1) % 4)
00257     {
00258       *tempptr++ = buffer.c[2];
00259       *tempptr++ = buffer.c[1];
00260       *tempptr++ = buffer.c[0];
00261       buffer.i = 0;
00262     }
00263   }
00264   *tempptr = '\0';
00265   strcpy(data,temp);
00266   delete temp;
00267 }
00268 
00269 int MJPEGServer::hexCharToInt(char in)
00270 {
00271     if(in >= '0' && in <= '9')
00272         return in - '0';
00273 
00274     if(in >= 'a' && in <= 'f')
00275         return (in - 'a') + 10;
00276 
00277     if(in >= 'A' && in <= 'F')
00278         return (in - 'A') + 10;
00279 
00280     return -1;
00281 }
00282 
00283 int MJPEGServer::unescape(char *string)
00284 {
00285     char *source = string, *destination = string;
00286     int src, dst, length = strlen(string), rc;
00287 
00288     /* iterate over the string */
00289     for(dst = 0, src = 0; src < length; src++) {
00290 
00291         /* is it an escape character? */
00292         if(source[src] != '%') {
00293             /* no, so just go to the next character */
00294             destination[dst] = source[src];
00295             dst++;
00296             continue;
00297         }
00298 
00299         /* yes, it is an escaped character */
00300 
00301         /* check if there are enough characters */
00302         if(src + 2 > length) {
00303             return -1;
00304             break;
00305         }
00306 
00307         /* perform replacement of %## with the corresponding character */
00308         if((rc = hexCharToInt(source[src+1])) == -1) return -1;
00309         destination[dst] = rc * 16;
00310         if((rc = hexCharToInt(source[src+2])) == -1) return -1;
00311         destination[dst] += rc;
00312 
00313         /* advance pointers, here is the reason why the resulting string is shorter */
00314         dst++; src += 2;
00315     }
00316 
00317     /* ensure the string is properly finished with a null-character */
00318     destination[dst] = '\0';
00319 
00320     return 0;
00321 }
00322 
00323 void MJPEGServer::sendError(int fd, int which, const char *message)
00324 {
00325     char buffer[BUFFER_SIZE] = {0};
00326 
00327     if(which == 401) {
00328         sprintf(buffer, "HTTP/1.0 401 Unauthorized\r\n" \
00329                 "Content-type: text/plain\r\n" \
00330                 "%s"\
00331                 "WWW-Authenticate: Basic realm=\"MJPG-Streamer\"\r\n" \
00332                 "\r\n" \
00333                 "401: Not Authenticated!\r\n" \
00334                 "%s", header.c_str(), message);
00335     } else if(which == 404) {
00336         sprintf(buffer, "HTTP/1.0 404 Not Found\r\n" \
00337                 "Content-type: text/plain\r\n" \
00338                 "%s" \
00339                 "\r\n" \
00340                 "404: Not Found!\r\n" \
00341                 "%s", header.c_str(), message);
00342     } else if(which == 500) {
00343         sprintf(buffer, "HTTP/1.0 500 Internal Server Error\r\n" \
00344                 "Content-type: text/plain\r\n" \
00345                 "%s"\
00346                 "\r\n" \
00347                 "500: Internal Server Error!\r\n" \
00348                 "%s", header.c_str(), message);
00349     } else if(which == 400) {
00350         sprintf(buffer, "HTTP/1.0 400 Bad Request\r\n" \
00351                 "Content-type: text/plain\r\n" \
00352                 "%s"\
00353                 "\r\n" \
00354                 "400: Not Found!\r\n" \
00355                 "%s", header.c_str(), message);
00356     } else {
00357         sprintf(buffer, "HTTP/1.0 501 Not Implemented\r\n" \
00358                 "Content-type: text/plain\r\n" \
00359                 "%s"\
00360                 "\r\n" \
00361                 "501: Not Implemented!\r\n" \
00362                 "%s", header.c_str(), message);
00363     }
00364 
00365     if(write(fd, buffer, strlen(buffer)) < 0) {
00366         ROS_DEBUG("write failed, done anyway");
00367     }
00368 }
00369 
00370 void MJPEGServer::decodeParameter(const std::string& parameter, ParameterMap& parameter_map)
00371 {
00372   std::vector<std::string> parameter_value_pairs;
00373   splitString(parameter,parameter_value_pairs, "?&");
00374 
00375   for(size_t i=0; i<parameter_value_pairs.size(); i++) {
00376     std::vector<std::string> parameter_value;
00377     splitString(parameter_value_pairs[i], parameter_value, "=");
00378     if(parameter_value.size()==1) {
00379       parameter_map.insert(std::make_pair(parameter_value[0],std::string("")));
00380     }
00381     else if(parameter_value.size()==2) {
00382       parameter_map.insert(std::make_pair(parameter_value[0],parameter_value[1]));
00383     }
00384   }
00385 }
00386 
00387 ImageBuffer* MJPEGServer::getImageBuffer(const std::string& topic)
00388 {
00389   boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
00390   ImageSubscriberMap::iterator it = image_subscribers_.find(topic);
00391   if (it == image_subscribers_.end()) {
00392     image_subscribers_[topic] = image_transport_.subscribe(topic, 1, boost::bind(&MJPEGServer::imageCallback, this, _1, topic));
00393     image_buffers_[topic] = new ImageBuffer();
00394     ROS_INFO("Subscribing to topic %s", topic.c_str());
00395   }
00396   ImageBuffer* image_buffer = image_buffers_[topic];
00397   return image_buffer;
00398 }
00399 
00400 // rotate input image at 180 degrees
00401 void MJPEGServer::invertImage(const cv::Mat& input, cv::Mat& output) {
00402 
00403   cv::Mat_<cv::Vec3b>& input_img = (cv::Mat_<cv::Vec3b>&)input; //3 channel pointer to image
00404   cv::Mat_<cv::Vec3b>& output_img = (cv::Mat_<cv::Vec3b>&)output; //3 channel pointer to image
00405   cv::Size size = input.size();
00406 
00407   for (int j = 0; j < size.height; ++j)
00408     for (int i = 0; i < size.width; ++i) {
00409       //outputImage.imageData[size.height*size.width - (i + j*size.width) - 1] = inputImage.imageData[i + j*size.width];
00410       output_img(size.height-j-1, size.width-i-1) = input_img(j,i);
00411     }
00412   return;
00413 }
00414 
00415 void MJPEGServer::sendStream(int fd, const char *parameter)
00416 {
00417     unsigned char *frame = NULL, *tmp = NULL;
00418     int frame_size = 0, max_frame_size = 0;
00419     int tenk=10*1024;
00420     char buffer[BUFFER_SIZE] = {0};
00421     struct timeval timestamp;
00422     //sensor_msgs::CvBridge image_bridge;
00423     //sensor_msgs::cv_bridge image_bridge;
00424     cv_bridge::CvImage image_bridge;
00425 
00426     ROS_DEBUG("Decoding parameter");
00427 
00428     std::string params = parameter;
00429 
00430 
00431 
00432 
00433 
00434     ParameterMap parameter_map;
00435     decodeParameter(params, parameter_map);
00436 
00437     ParameterMap::iterator itp = parameter_map.find("topic");
00438     if (itp == parameter_map.end()) return;
00439 
00440     std::string topic = itp->second;
00441     ImageBuffer* image_buffer = getImageBuffer(topic);
00442 
00443     ROS_DEBUG("preparing header");
00444     sprintf(buffer, "HTTP/1.0 200 OK\r\n" \
00445             "%s" \
00446             "Content-Type: multipart/x-mixed-replace;boundary=boundarydonotcross \r\n" \
00447             "\r\n" \
00448             "--boundarydonotcross \r\n", header.c_str());
00449 
00450     if(write(fd, buffer, strlen(buffer)) < 0) {
00451         free(frame);
00452         return;
00453     }
00454 
00455     ROS_DEBUG("Headers send, sending stream now");
00456 
00457     while(!stop_requested_) {
00458         {
00459           /* wait for fresh frames */
00460           boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00461           image_buffer->condition_.wait(lock);
00462 
00463           //IplImage* image;
00464           cv_bridge::CvImagePtr cv_msg;
00465           try {
00466            if (cv_msg = cv_bridge::toCvCopy(image_buffer->msg, "bgr8")) {
00467              ;//image = image_bridge.toIpl();
00468            }
00469            else {
00470              ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str());
00471              return;
00472            }
00473           }
00474           catch(...) {
00475            ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str());
00476            return;
00477           }
00478 
00479           // encode image
00480           cv::Mat img = cv_msg->image;
00481           std::vector<uchar> encoded_buffer;
00482           std::vector<int> encode_params;
00483 
00484           // invert
00485           //int invert = 0;
00486           if(parameter_map.find("invert") != parameter_map.end()) {
00487             cv::Mat cloned_image = img.clone();
00488             invertImage(cloned_image, img);
00489           }
00490 
00491           // quality
00492           int quality = 95;
00493           if(parameter_map.find("quality") != parameter_map.end()) {
00494             quality = stringToInt(parameter_map["quality"]);
00495           }
00496           encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
00497           encode_params.push_back(quality);
00498 
00499           // resize image
00500           if(parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end()) {
00501             int width = stringToInt(parameter_map["width"]);
00502             int height = stringToInt(parameter_map["height"]);
00503             if(width > 0 && height > 0) {
00504               cv::Mat img_resized;
00505               cv::Size new_size(width,height);
00506               cv::resize(img, img_resized, new_size);
00507               cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params);
00508             }
00509             else {
00510               cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00511             }
00512           }
00513           else {
00514             cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00515           }
00516 
00517           // copy encoded frame buffer
00518           frame_size = encoded_buffer.size();
00519 
00520           /* check if frame buffer is large enough, increase it if necessary */
00521           if(frame_size > max_frame_size) {
00522               ROS_DEBUG("increasing frame buffer size to %d", frame_size);
00523 
00524               max_frame_size = frame_size + tenk;
00525               if((tmp = (unsigned char*)realloc(frame, max_frame_size)) == NULL) {
00526                   free(frame);
00527                   sendError(fd, 500, "not enough memory");
00528                   return;
00529               }
00530               frame = tmp;
00531           }
00532 
00533           /* copy v4l2_buffer timeval to user space */
00534           timestamp.tv_sec = ros::Time::now().toSec();
00535 
00536           memcpy(frame, &encoded_buffer[0], frame_size);
00537           ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024);
00538         }
00539 
00540         /*
00541          * print the individual mimetype and the length
00542          * sending the content-length fixes random stream disruption observed
00543          * with firefox
00544          */
00545         sprintf(buffer, "Content-Type: image/jpeg\r\n" \
00546                 "Content-Length: %d\r\n" \
00547                 "X-Timestamp: %d.%06d\r\n" \
00548                 "\r\n", frame_size, (int)timestamp.tv_sec, (int)timestamp.tv_usec);
00549         ROS_DEBUG("sending intemdiate header");
00550         if(write(fd, buffer, strlen(buffer)) < 0) break;
00551 
00552         ROS_DEBUG("sending frame");
00553         if(write(fd, frame, frame_size) < 0) break;
00554 
00555         ROS_DEBUG("sending boundary");
00556         sprintf(buffer, "\r\n--boundarydonotcross \r\n");
00557         if(write(fd, buffer, strlen(buffer)) < 0) break;
00558     }
00559 
00560     free(frame);
00561 }
00562 
00563 void MJPEGServer::sendSnapshot(int fd, const char *parameter)
00564 {
00565   unsigned char *frame = NULL;
00566   int frame_size = 0;
00567   char buffer[BUFFER_SIZE] = {0};
00568   struct timeval timestamp;
00569   //sensor_msgs::CvBridge image_bridge;
00570   //sensor_msgs::cv_bridge image_bridge;
00571 
00572   std::string params = parameter;
00573   ParameterMap parameter_map;
00574   decodeParameter(params, parameter_map); // http://merry:8080/stream?topic=/remote_lab_cam1/image_raw?invert=1
00575 
00576   ParameterMap::iterator itp = parameter_map.find("topic");
00577   if (itp == parameter_map.end()) return;
00578 
00579   std::string topic = itp->second;
00580   ImageBuffer* image_buffer = getImageBuffer(topic);
00581 
00582   /* wait for fresh frames */
00583   boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00584   image_buffer->condition_.wait(lock);
00585 
00586   //IplImage* image;
00587   cv_bridge::CvImagePtr cv_msg;
00588   try {
00589    if (cv_msg = cv_bridge::toCvCopy(image_buffer->msg, "bgr8")) {
00590      ;//image = image_bridge.toIpl();
00591    }
00592    else {
00593      ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str());
00594      return;
00595    }
00596   }
00597   catch(...) {
00598    ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str());
00599    return;
00600   }
00601 
00602   cv::Mat img = cv_msg->image;
00603   std::vector<uchar> encoded_buffer;
00604   std::vector<int> encode_params;
00605 
00606   // invert
00607   //int invert = 0;
00608   if(parameter_map.find("invert") != parameter_map.end()) {
00609     cv::Mat cloned_image = img.clone();
00610     invertImage(cloned_image, img);
00611   }
00612 
00613   // quality
00614   int quality = 95;
00615   if(parameter_map.find("quality") != parameter_map.end()) {
00616     quality = stringToInt(parameter_map["quality"]);
00617   }
00618   encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
00619   encode_params.push_back(quality);
00620 
00621   // resize image
00622   if(parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end()) {
00623     int width = stringToInt(parameter_map["width"]);
00624     int height = stringToInt(parameter_map["height"]);
00625     if(width > 0 && height > 0) {
00626       cv::Mat img_resized;
00627       cv::Size new_size(width,height);
00628       cv::resize(img, img_resized, new_size);
00629       cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params);
00630     }
00631     else {
00632       cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00633     }
00634   }
00635   else {
00636     cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00637   }
00638 
00639   // copy encoded frame buffer
00640   frame_size = encoded_buffer.size();
00641 
00642   // resize buffer
00643   if((frame = (unsigned char*)malloc(frame_size)) == NULL) {
00644       free(frame);
00645       sendError(fd, 500, "not enough memory");
00646       return;
00647   }
00648 
00649   /* copy v4l2_buffer timeval to user space */
00650   timestamp.tv_sec = ros::Time::now().toSec();
00651 
00652   memcpy(frame, &encoded_buffer[0], frame_size);
00653   ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024);
00654 
00655   /* write the response */
00656   sprintf(buffer, "HTTP/1.0 200 OK\r\n" \
00657           "%s" \
00658           "Content-type: image/jpeg\r\n" \
00659           "X-Timestamp: %d.%06d\r\n" \
00660           "\r\n",header.c_str(), (int) timestamp.tv_sec, (int) timestamp.tv_usec);
00661 
00662   /* send header and image now */
00663   if(write(fd, buffer, strlen(buffer)) < 0 || \
00664           write(fd, frame, frame_size) < 0) {
00665       free(frame);
00666       return;
00667   }
00668 
00669   free(frame);
00670 }
00671 
00672 void MJPEGServer::client(int fd) {
00673   int cnt;
00674   char buffer[BUFFER_SIZE] = {0}, *pb = buffer;
00675   iobuffer iobuf;
00676   request req;
00677 
00678   /* initializes the structures */
00679   initIOBuffer(&iobuf);
00680   initRequest(&req);
00681 
00682   /* What does the client want to receive? Read the request. */
00683   memset(buffer, 0, sizeof(buffer));
00684   if((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) {
00685       close(fd);
00686       return;
00687   }
00688 
00689   /* determine what to deliver */
00690   if(strstr(buffer, "GET /?") != NULL) {
00691     req.type = A_STREAM;
00692 
00693     /* advance by the length of known string */
00694     if((pb = strstr(buffer, "GET /")) == NULL) {
00695         ROS_DEBUG("HTTP request seems to be malformed");
00696         sendError(fd, 400, "Malformed HTTP request");
00697         close(fd);
00698         return;
00699     }
00700     pb += strlen("GET /"); // a pb points to the string after the first & after command
00701     int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00702     req.parameter = (char*)malloc(len + 1);
00703     if(req.parameter == NULL) {
00704         exit(EXIT_FAILURE);
00705     }
00706     memset(req.parameter, 0, len + 1);
00707     strncpy(req.parameter, pb, len);
00708 
00709     ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00710   }
00711   else if(strstr(buffer, "GET /stream?") != NULL) {
00712     req.type = A_STREAM;
00713 
00714     /* advance by the length of known string */
00715     if((pb = strstr(buffer, "GET /stream")) == NULL) {
00716         ROS_DEBUG("HTTP request seems to be malformed");
00717         sendError(fd, 400, "Malformed HTTP request");
00718         close(fd);
00719         return;
00720     }
00721     pb += strlen("GET /stream"); // a pb points to the string after the first & after command
00722     int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00723     req.parameter = (char*)malloc(len + 1);
00724     if(req.parameter == NULL) {
00725         exit(EXIT_FAILURE);
00726     }
00727     memset(req.parameter, 0, len + 1);
00728     strncpy(req.parameter, pb, len);
00729 
00730     ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00731   }
00732   else if(strstr(buffer, "GET /snapshot?") != NULL) {
00733     req.type = A_SNAPSHOT;
00734 
00735     /* advance by the length of known string */
00736     if((pb = strstr(buffer, "GET /snapshot")) == NULL) {
00737         ROS_DEBUG("HTTP request seems to be malformed");
00738         sendError(fd, 400, "Malformed HTTP request");
00739         close(fd);
00740         return;
00741     }
00742     pb += strlen("GET /snapshot"); // a pb points to the string after the first & after command
00743     int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00744     req.parameter = (char*)malloc(len + 1);
00745     if(req.parameter == NULL) {
00746         exit(EXIT_FAILURE);
00747     }
00748     memset(req.parameter, 0, len + 1);
00749     strncpy(req.parameter, pb, len);
00750 
00751     ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00752   }
00753 
00754   /*
00755    * parse the rest of the HTTP-request
00756    * the end of the request-header is marked by a single, empty line with "\r\n"
00757    */
00758   do {
00759     memset(buffer, 0, sizeof(buffer));
00760 
00761     if((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) {
00762         freeRequest(&req);
00763         close(fd);
00764         return;
00765     }
00766 
00767     if(strstr(buffer, "User-Agent: ") != NULL) {
00768         req.client = strdup(buffer + strlen("User-Agent: "));
00769     } else if(strstr(buffer, "Authorization: Basic ") != NULL) {
00770         req.credentials = strdup(buffer + strlen("Authorization: Basic "));
00771         decodeBase64(req.credentials);
00772         ROS_DEBUG("username:password: %s", req.credentials);
00773     }
00774 
00775   } while(cnt > 2 && !(buffer[0] == '\r' && buffer[1] == '\n'));
00776 
00777 
00778 
00779   /* now it's time to answer */
00780   switch(req.type) {
00781   case A_STREAM: {
00782       ROS_DEBUG("Request for streaming");
00783       sendStream(fd, req.parameter);
00784       break;
00785   }
00786   case A_SNAPSHOT: {
00787       ROS_DEBUG("Request for snapshot");
00788       sendSnapshot(fd, req.parameter);
00789       break;
00790   }
00791   default:
00792       ROS_DEBUG("unknown request");
00793   }
00794 
00795   close(fd);
00796   freeRequest(&req);
00797 
00798   ROS_INFO("Disconnecting HTTP client");
00799   return;
00800 }
00801 
00802 void MJPEGServer::execute() {
00803 
00804   ROS_INFO("Starting mjpeg server");
00805 
00806   struct addrinfo *aip, *aip2;
00807   struct addrinfo hints;
00808   struct sockaddr_storage client_addr;
00809   socklen_t addr_len = sizeof(struct sockaddr_storage);
00810   fd_set selectfds;
00811   int max_fds = 0;
00812 
00813   char name[NI_MAXHOST];
00814 
00815   bzero(&hints, sizeof(hints));
00816   hints.ai_family = PF_UNSPEC;
00817   hints.ai_flags = AI_PASSIVE;
00818   hints.ai_socktype = SOCK_STREAM;
00819 
00820   int err;
00821   snprintf(name, sizeof(name), "%d", port_);
00822   if((err = getaddrinfo(NULL, name, &hints, &aip)) != 0) {
00823       perror(gai_strerror(err));
00824       exit(EXIT_FAILURE);
00825   }
00826 
00827   for(int i = 0; i < MAX_NUM_SOCKETS; i++)
00828       sd[i] = -1;
00829 
00830 
00831   /* open sockets for server (1 socket / address family) */
00832   int i = 0;
00833   int on;
00834   for(aip2 = aip; aip2 != NULL; aip2 = aip2->ai_next) {
00835       if((sd[i] = socket(aip2->ai_family, aip2->ai_socktype, 0)) < 0) {
00836           continue;
00837       }
00838 
00839       /* ignore "socket already in use" errors */
00840       on = 1;
00841       if(setsockopt(sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
00842           perror("setsockopt(SO_REUSEADDR) failed");
00843       }
00844 
00845       /* IPv6 socket should listen to IPv6 only, otherwise we will get "socket already in use" */
00846       on = 1;
00847       if(aip2->ai_family == AF_INET6 && setsockopt(sd[i], IPPROTO_IPV6, IPV6_V6ONLY, (const void *)&on , sizeof(on)) < 0) {
00848           perror("setsockopt(IPV6_V6ONLY) failed");
00849       }
00850 
00851       /* perhaps we will use this keep-alive feature oneday */
00852       /* setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)); */
00853 
00854       if(bind(sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0) {
00855           perror("bind");
00856           sd[i] = -1;
00857           continue;
00858       }
00859 
00860       if(listen(sd[i], 10) < 0) {
00861           perror("listen");
00862           sd[i] = -1;
00863       } else {
00864           i++;
00865           if(i >= MAX_NUM_SOCKETS) {
00866               ROS_ERROR("Maximum number of server sockets exceeded");
00867               i--;
00868               break;
00869           }
00870       }
00871   }
00872 
00873   sd_len = i;
00874 
00875   if(sd_len < 1) {
00876       ROS_ERROR("Bind(%d) failed", port_);
00877       closelog();
00878       exit(EXIT_FAILURE);
00879   }
00880   else {
00881       ROS_INFO("Bind(%d) succeeded", port_);
00882   }
00883 
00884   ROS_INFO("waiting for clients to connect");
00885 
00886   /* create a child for every client that connects */
00887   while(!stop_requested_) {
00888 
00889       do {
00890           FD_ZERO(&selectfds);
00891 
00892           for(i = 0; i < MAX_NUM_SOCKETS; i++) {
00893               if(sd[i] != -1) {
00894                   FD_SET(sd[i], &selectfds);
00895 
00896                   if(sd[i] > max_fds)
00897                       max_fds = sd[i];
00898               }
00899           }
00900 
00901           err = select(max_fds + 1, &selectfds, NULL, NULL, NULL);
00902 
00903           if(err < 0 && errno != EINTR) {
00904               perror("select");
00905               exit(EXIT_FAILURE);
00906           }
00907       } while(err <= 0);
00908 
00909       ROS_INFO("Client connected");
00910 
00911       for(i = 0; i < max_fds + 1; i++) {
00912           if(sd[i] != -1 && FD_ISSET(sd[i], &selectfds)) {
00913               int fd = accept(sd[i], (struct sockaddr *)&client_addr, &addr_len);
00914 
00915               /* start new thread that will handle this TCP connected client */
00916               ROS_DEBUG("create thread to handle client that just established a connection");
00917 
00918               if(getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0) {
00919                   syslog(LOG_INFO, "serving client: %s\n", name);
00920               }
00921 
00922               boost::thread t(boost::bind( &MJPEGServer::client, this, fd ));
00923               t.detach();
00924           }
00925       }
00926   }
00927 
00928   ROS_INFO("leaving server thread, calling cleanup function now");
00929   cleanUp();
00930 }
00931 
00932 void MJPEGServer::cleanUp() {
00933   ROS_INFO("cleaning up ressources allocated by server thread");
00934 
00935   for(int i = 0; i < MAX_NUM_SOCKETS; i++)
00936       close(sd[i]);
00937 }
00938 
00939 void MJPEGServer::spin() {
00940   boost::thread t(boost::bind( &MJPEGServer::execute, this ));
00941   t.detach();
00942   ros::spin();
00943   ROS_INFO("stop requested");
00944   stop();
00945 }
00946 
00947 void MJPEGServer::stop() {
00948   stop_requested_ = true;
00949 }
00950 
00951 }
00952 
00953 int main(int argc, char** argv){
00954   ros::init(argc, argv, "mjpeg_server");
00955 
00956   ros::NodeHandle nh;
00957   mjpeg_server::MJPEGServer server(nh);
00958   server.spin();
00959 
00960   return(0);
00961 }
00962 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


mjpeg_server
Author(s): Maintained by Benjamin Pitzer
autogenerated on Thu May 2 2013 11:28:51