mjpeg_server.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002  *
00003  * Software License Agreement (BSD License)
00004  *
00005  *  Copyright (c) 2014, Robert Bosch LLC.
00006  *  All rights reserved.
00007  *
00008  *  Redistribution and use in source and binary forms, with or without
00009  *  modification, are permitted provided that the following conditions
00010  *  are met:
00011  *
00012  *   * Redistributions of source code must retain the above copyright
00013  *     notice, this list of conditions and the following disclaimer.
00014  *   * Redistributions in binary form must reproduce the above
00015  *     copyright notice, this list of conditions and the following
00016  *     disclaimer in the documentation and/or other materials provided
00017  *     with the distribution.
00018  *   * Neither the name of the Robert Bosch nor the names of its
00019  *     contributors may be used to endorse or promote products derived
00020  *     from this software without specific prior written permission.
00021  *
00022  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033  *  POSSIBILITY OF SUCH DAMAGE.
00034  *
00035  *********************************************************************/
00036 
00037 #include <mjpeg_server/mjpeg_server.h>
00038 #include <sys/ioctl.h>
00039 #include <errno.h>
00040 #include <signal.h>
00041 #include <sys/socket.h>
00042 #include <arpa/inet.h>
00043 #include <sys/types.h>
00044 #include <sys/stat.h>
00045 #include <fcntl.h>
00046 #include <syslog.h>
00047 #include <netdb.h>
00048 #include <errno.h>
00049 #include <opencv2/opencv.hpp>
00050 #include <boost/thread.hpp>
00051 #include <boost/bind.hpp>
00052 
00053 template<typename T>
00054   inline T ABS(T a)
00055   {
00056     return (a < 0 ? -a : a);
00057   }
00058 
00059 template<typename T>
00060   inline T min(T a, T b)
00061   {
00062     return (a < b ? a : b);
00063   }
00064 
00065 template<typename T>
00066   inline T max(T a, T b)
00067   {
00068     return (a > b ? a : b);
00069   }
00070 
00071 template<typename T>
00072   inline T LENGTH_OF(T x)
00073   {
00074     return (sizeof(x) / sizeof(x[0]));
00075   }
00076 
00077 namespace mjpeg_server
00078 {
00079 
00080 MJPEGServer::MJPEGServer(ros::NodeHandle& node) :
00081     node_(node), image_transport_(node), stop_requested_(false), www_folder_(NULL)
00082 {
00083   ros::NodeHandle private_nh("~");
00084   private_nh.param("port", port_, 8080);
00085   header = "Connection: close\r\nServer: mjpeg_server\r\n"
00086       "Cache-Control: no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0\r\n"
00087       "Pragma: no-cache\r\n";
00088   sd_len = 0;
00089 }
00090 
00091 MJPEGServer::~MJPEGServer()
00092 {
00093   cleanUp();
00094 }
00095 
00096 void MJPEGServer::imageCallback(const sensor_msgs::ImageConstPtr& msg, const std::string& topic)
00097 {
00098 
00099   ImageBuffer* image_buffer = getImageBuffer(topic);
00100   boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00101   // copy image
00102   image_buffer->msg = *msg;
00103   // notify senders
00104   image_buffer->condition_.notify_all();
00105 }
00106 
00107 void MJPEGServer::splitString(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiter)
00108 {
00109   // Skip delimiters at beginning.
00110   std::string::size_type lastPos = str.find_first_not_of(delimiter, 0);
00111   // Find first "non-delimiter".
00112   std::string::size_type pos = str.find_first_of(delimiter, lastPos);
00113 
00114   while (std::string::npos != pos || std::string::npos != lastPos)
00115   {
00116     // Found a token, add it to the vector.
00117     tokens.push_back(str.substr(lastPos, pos - lastPos));
00118     // Skip delimiters.  Note the "not_of"
00119     lastPos = str.find_first_not_of(delimiter, pos);
00120     // Find next "non-delimiter"
00121     pos = str.find_first_of(delimiter, lastPos);
00122   }
00123 }
00124 
00125 int MJPEGServer::stringToInt(const std::string& str, const int default_value)
00126 {
00127   int value;
00128   int res;
00129   if (str.length() == 0)
00130     return default_value;
00131   res = sscanf(str.c_str(), "%i", &value);
00132   if (res == 1)
00133     return value;
00134   return default_value;
00135 }
00136 
00137 void MJPEGServer::initIOBuffer(iobuffer *iobuf)
00138 {
00139   memset(iobuf->buffer, 0, sizeof(iobuf->buffer));
00140   iobuf->level = 0;
00141 }
00142 
00143 void MJPEGServer::initRequest(request *req)
00144 {
00145   req->type = A_UNKNOWN;
00146   req->type = A_UNKNOWN;
00147   req->parameter = NULL;
00148   req->client = NULL;
00149   req->credentials = NULL;
00150 }
00151 
00152 void MJPEGServer::freeRequest(request *req)
00153 {
00154   if (req->parameter != NULL)
00155     free(req->parameter);
00156   if (req->client != NULL)
00157     free(req->client);
00158   if (req->credentials != NULL)
00159     free(req->credentials);
00160 }
00161 
00162 int MJPEGServer::readWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout)
00163 {
00164   size_t copied = 0;
00165   int rc, i;
00166   fd_set fds;
00167   struct timeval tv;
00168 
00169   memset(buffer, 0, len);
00170 
00171   while ((copied < len))
00172   {
00173     i = min((size_t)iobuf->level, len - copied);
00174     memcpy(buffer + copied, iobuf->buffer + IO_BUFFER - iobuf->level, i);
00175 
00176     iobuf->level -= i;
00177     copied += i;
00178     if (copied >= len)
00179       return copied;
00180 
00181     /* select will return in case of timeout or new data arrived */
00182     tv.tv_sec = timeout;
00183     tv.tv_usec = 0;
00184     FD_ZERO(&fds);
00185     FD_SET(fd, &fds);
00186     if ((rc = select(fd + 1, &fds, NULL, NULL, &tv)) <= 0)
00187     {
00188       if (rc < 0)
00189         exit(EXIT_FAILURE);
00190 
00191       /* this must be a timeout */
00192       return copied;
00193     }
00194 
00195     initIOBuffer(iobuf);
00196 
00197     /*
00198      * there should be at least one byte, because select signalled it.
00199      * But: It may happen (very seldomly), that the socket gets closed remotly between
00200      * the select() and the following read. That is the reason for not relying
00201      * on reading at least one byte.
00202      */
00203     if ((iobuf->level = read(fd, &iobuf->buffer, IO_BUFFER)) <= 0)
00204     {
00205       /* an error occured */
00206       return -1;
00207     }
00208 
00209     /* align data to the end of the buffer if less than IO_BUFFER bytes were read */
00210     memmove(iobuf->buffer + (IO_BUFFER - iobuf->level), iobuf->buffer, iobuf->level);
00211   }
00212 
00213   return 0;
00214 }
00215 
00216 int MJPEGServer::readLineWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout)
00217 {
00218   char c = '\0', *out = buffer;
00219   unsigned int i;
00220 
00221   memset(buffer, 0, len);
00222 
00223   for (i = 0; i < len && c != '\n'; i++)
00224   {
00225     if (readWithTimeout(fd, iobuf, &c, 1, timeout) <= 0)
00226     {
00227       /* timeout or error occured */
00228       return -1;
00229     }
00230     *out++ = c;
00231   }
00232 
00233   return i;
00234 }
00235 
00236 void MJPEGServer::decodeBase64(char *data)
00237 {
00238   union
00239   {
00240     int i;
00241     char c[4];
00242   } buffer;
00243 
00244   char* ptr = data;
00245   unsigned int size = strlen(data);
00246   char* temp = new char[size];
00247   char* tempptr = temp;
00248   char t;
00249 
00250   for (buffer.i = 0, t = *ptr; ptr; ptr++)
00251   {
00252     if (t >= 'A' && t <= 'Z')
00253       t = t - 'A';
00254     else if (t >= 'a' && t <= 'z')
00255       t = t - 'a' + 26;
00256     else if (t >= '0' && t <= '9')
00257       t = t - '0' + 52;
00258     else if (t == '+')
00259       t = 62;
00260     else if (t == '/')
00261       t = 63;
00262     else
00263       continue;
00264 
00265     buffer.i = (buffer.i << 6) | t;
00266 
00267     if ((ptr - data + 1) % 4)
00268     {
00269       *tempptr++ = buffer.c[2];
00270       *tempptr++ = buffer.c[1];
00271       *tempptr++ = buffer.c[0];
00272       buffer.i = 0;
00273     }
00274   }
00275   *tempptr = '\0';
00276   strcpy(data, temp);
00277   delete temp;
00278 }
00279 
00280 int MJPEGServer::hexCharToInt(char in)
00281 {
00282   if (in >= '0' && in <= '9')
00283     return in - '0';
00284 
00285   if (in >= 'a' && in <= 'f')
00286     return (in - 'a') + 10;
00287 
00288   if (in >= 'A' && in <= 'F')
00289     return (in - 'A') + 10;
00290 
00291   return -1;
00292 }
00293 
00294 int MJPEGServer::unescape(char *string)
00295 {
00296   char *source = string, *destination = string;
00297   int src, dst, length = strlen(string), rc;
00298 
00299   /* iterate over the string */
00300   for (dst = 0, src = 0; src < length; src++)
00301   {
00302 
00303     /* is it an escape character? */
00304     if (source[src] != '%')
00305     {
00306       /* no, so just go to the next character */
00307       destination[dst] = source[src];
00308       dst++;
00309       continue;
00310     }
00311 
00312     /* yes, it is an escaped character */
00313 
00314     /* check if there are enough characters */
00315     if (src + 2 > length)
00316     {
00317       return -1;
00318       break;
00319     }
00320 
00321     /* perform replacement of %## with the corresponding character */
00322     if ((rc = hexCharToInt(source[src + 1])) == -1)
00323       return -1;
00324     destination[dst] = rc * 16;
00325     if ((rc = hexCharToInt(source[src + 2])) == -1)
00326       return -1;
00327     destination[dst] += rc;
00328 
00329     /* advance pointers, here is the reason why the resulting string is shorter */
00330     dst++;
00331     src += 2;
00332   }
00333 
00334   /* ensure the string is properly finished with a null-character */
00335   destination[dst] = '\0';
00336 
00337   return 0;
00338 }
00339 
00340 void MJPEGServer::sendError(int fd, int which, const char *message)
00341 {
00342   char buffer[BUFFER_SIZE] = {0};
00343 
00344   if (which == 401)
00345   {
00346     sprintf(buffer, "HTTP/1.0 401 Unauthorized\r\n"
00347             "Content-type: text/plain\r\n"
00348             "%s"
00349             "WWW-Authenticate: Basic realm=\"MJPG-Streamer\"\r\n"
00350             "\r\n"
00351             "401: Not Authenticated!\r\n"
00352             "%s",
00353             header.c_str(), message);
00354   }
00355   else if (which == 404)
00356   {
00357     sprintf(buffer, "HTTP/1.0 404 Not Found\r\n"
00358             "Content-type: text/plain\r\n"
00359             "%s"
00360             "\r\n"
00361             "404: Not Found!\r\n"
00362             "%s",
00363             header.c_str(), message);
00364   }
00365   else if (which == 500)
00366   {
00367     sprintf(buffer, "HTTP/1.0 500 Internal Server Error\r\n"
00368             "Content-type: text/plain\r\n"
00369             "%s"
00370             "\r\n"
00371             "500: Internal Server Error!\r\n"
00372             "%s",
00373             header.c_str(), message);
00374   }
00375   else if (which == 400)
00376   {
00377     sprintf(buffer, "HTTP/1.0 400 Bad Request\r\n"
00378             "Content-type: text/plain\r\n"
00379             "%s"
00380             "\r\n"
00381             "400: Not Found!\r\n"
00382             "%s",
00383             header.c_str(), message);
00384   }
00385   else
00386   {
00387     sprintf(buffer, "HTTP/1.0 501 Not Implemented\r\n"
00388             "Content-type: text/plain\r\n"
00389             "%s"
00390             "\r\n"
00391             "501: Not Implemented!\r\n"
00392             "%s",
00393             header.c_str(), message);
00394   }
00395 
00396   if (write(fd, buffer, strlen(buffer)) < 0)
00397   {
00398     ROS_DEBUG("write failed, done anyway");
00399   }
00400 }
00401 
00402 void MJPEGServer::decodeParameter(const std::string& parameter, ParameterMap& parameter_map)
00403 {
00404   std::vector<std::string> parameter_value_pairs;
00405   splitString(parameter, parameter_value_pairs, "?&");
00406 
00407   for (size_t i = 0; i < parameter_value_pairs.size(); i++)
00408   {
00409     std::vector<std::string> parameter_value;
00410     splitString(parameter_value_pairs[i], parameter_value, "=");
00411     if (parameter_value.size() == 1)
00412     {
00413       parameter_map.insert(std::make_pair(parameter_value[0], std::string("")));
00414     }
00415     else if (parameter_value.size() == 2)
00416     {
00417       parameter_map.insert(std::make_pair(parameter_value[0], parameter_value[1]));
00418     }
00419   }
00420 }
00421 
00422 ImageBuffer* MJPEGServer::getImageBuffer(const std::string& topic)
00423 {
00424   boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
00425   ImageSubscriberMap::iterator it = image_subscribers_.find(topic);
00426   if (it == image_subscribers_.end())
00427   {
00428     image_subscribers_[topic] = image_transport_.subscribe(topic, 1,
00429                                                            boost::bind(&MJPEGServer::imageCallback, this, _1, topic));
00430     image_buffers_[topic] = new ImageBuffer();
00431     ROS_INFO("Subscribing to topic %s", topic.c_str());
00432   }
00433   ImageBuffer* image_buffer = image_buffers_[topic];
00434   return image_buffer;
00435 }
00436 
00437 // rotate input image at 180 degrees
00438 void MJPEGServer::invertImage(const cv::Mat& input, cv::Mat& output)
00439 {
00440 
00441   cv::Mat_<cv::Vec3b>& input_img = (cv::Mat_<cv::Vec3b>&)input; //3 channel pointer to image
00442   cv::Mat_<cv::Vec3b>& output_img = (cv::Mat_<cv::Vec3b>&)output; //3 channel pointer to image
00443   cv::Size size = input.size();
00444 
00445   for (int j = 0; j < size.height; ++j)
00446     for (int i = 0; i < size.width; ++i)
00447     {
00448       //outputImage.imageData[size.height*size.width - (i + j*size.width) - 1] = inputImage.imageData[i + j*size.width];
00449       output_img(size.height - j - 1, size.width - i - 1) = input_img(j, i);
00450     }
00451   return;
00452 }
00453 
00454 void MJPEGServer::sendStream(int fd, const char *parameter)
00455 {
00456   unsigned char *frame = NULL, *tmp = NULL;
00457   int frame_size = 0, max_frame_size = 0;
00458   int tenk = 10 * 1024;
00459   char buffer[BUFFER_SIZE] = {0};
00460   double timestamp;
00461   //sensor_msgs::CvBridge image_bridge;
00462   //sensor_msgs::cv_bridge image_bridge;
00463   cv_bridge::CvImage image_bridge;
00464 
00465   ROS_DEBUG("Decoding parameter");
00466 
00467   std::string params = parameter;
00468 
00469   ParameterMap parameter_map;
00470   decodeParameter(params, parameter_map);
00471 
00472   ParameterMap::iterator itp = parameter_map.find("topic");
00473   if (itp == parameter_map.end())
00474     return;
00475 
00476   std::string topic = itp->second;
00477   increaseSubscriberCount(topic);
00478   ImageBuffer* image_buffer = getImageBuffer(topic);
00479 
00480   ROS_DEBUG("preparing header");
00481   sprintf(buffer, "HTTP/1.0 200 OK\r\n"
00482           "%s"
00483           "Content-Type: multipart/x-mixed-replace;boundary=boundarydonotcross \r\n"
00484           "\r\n"
00485           "--boundarydonotcross \r\n",
00486           header.c_str());
00487 
00488   if (write(fd, buffer, strlen(buffer)) < 0)
00489   {
00490     free(frame);
00491     return;
00492   }
00493 
00494   ROS_DEBUG("Headers send, sending stream now");
00495 
00496   while (!stop_requested_)
00497   {
00498     {
00499       /* wait for fresh frames */
00500       boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00501       image_buffer->condition_.wait(lock);
00502 
00503       //IplImage* image;
00504       cv_bridge::CvImagePtr cv_msg;
00505       try
00506       {
00507         if (cv_msg = cv_bridge::toCvCopy(image_buffer->msg, "bgr8"))
00508         {
00509           ;    //image = image_bridge.toIpl();
00510         }
00511         else
00512         {
00513           ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str());
00514           return;
00515         }
00516       }
00517       catch (...)
00518       {
00519         ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str());
00520         return;
00521       }
00522 
00523       // encode image
00524       cv::Mat img = cv_msg->image;
00525       std::vector<uchar> encoded_buffer;
00526       std::vector<int> encode_params;
00527 
00528       // invert
00529       //int invert = 0;
00530       if (parameter_map.find("invert") != parameter_map.end())
00531       {
00532         cv::Mat cloned_image = img.clone();
00533         invertImage(cloned_image, img);
00534       }
00535 
00536       // quality
00537       int quality = 95;
00538       if (parameter_map.find("quality") != parameter_map.end())
00539       {
00540         quality = stringToInt(parameter_map["quality"]);
00541       }
00542       encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
00543       encode_params.push_back(quality);
00544 
00545       // resize image
00546       if (parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end())
00547       {
00548         int width = stringToInt(parameter_map["width"]);
00549         int height = stringToInt(parameter_map["height"]);
00550         if (width > 0 && height > 0)
00551         {
00552           cv::Mat img_resized;
00553           cv::Size new_size(width, height);
00554           cv::resize(img, img_resized, new_size);
00555           cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params);
00556         }
00557         else
00558         {
00559           cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00560         }
00561       }
00562       else
00563       {
00564         cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00565       }
00566 
00567       // copy encoded frame buffer
00568       frame_size = encoded_buffer.size();
00569 
00570       /* check if frame buffer is large enough, increase it if necessary */
00571       if (frame_size > max_frame_size)
00572       {
00573         ROS_DEBUG("increasing frame buffer size to %d", frame_size);
00574 
00575         max_frame_size = frame_size + tenk;
00576         if ((tmp = (unsigned char*)realloc(frame, max_frame_size)) == NULL)
00577         {
00578           free(frame);
00579           sendError(fd, 500, "not enough memory");
00580           return;
00581         }
00582         frame = tmp;
00583       }
00584 
00585       /* copy v4l2_buffer timeval to user space */
00586       timestamp = ros::Time::now().toSec();
00587 
00588       memcpy(frame, &encoded_buffer[0], frame_size);
00589       ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024);
00590     }
00591 
00592     /*
00593      * print the individual mimetype and the length
00594      * sending the content-length fixes random stream disruption observed
00595      * with firefox
00596      */
00597     sprintf(buffer, "Content-Type: image/jpeg\r\n"
00598             "Content-Length: %d\r\n"
00599             "X-Timestamp: %.06lf\r\n"
00600             "\r\n",
00601             frame_size, (double)timestamp);
00602     ROS_DEBUG("sending intemdiate header");
00603     if (write(fd, buffer, strlen(buffer)) < 0)
00604       break;
00605 
00606     ROS_DEBUG("sending frame");
00607     if (write(fd, frame, frame_size) < 0)
00608       break;
00609 
00610     ROS_DEBUG("sending boundary");
00611     sprintf(buffer, "\r\n--boundarydonotcross \r\n");
00612     if (write(fd, buffer, strlen(buffer)) < 0)
00613       break;
00614   }
00615 
00616   free(frame);
00617   decreaseSubscriberCount(topic);
00618   unregisterSubscriberIfPossible(topic);
00619 
00620 }
00621 
00622 void MJPEGServer::sendSnapshot(int fd, const char *parameter)
00623 {
00624   unsigned char *frame = NULL;
00625   int frame_size = 0;
00626   char buffer[BUFFER_SIZE] = {0};
00627   double timestamp;
00628   //sensor_msgs::CvBridge image_bridge;
00629   //sensor_msgs::cv_bridge image_bridge;
00630 
00631   std::string params = parameter;
00632   ParameterMap parameter_map;
00633   decodeParameter(params, parameter_map); // http://merry:8080/stream?topic=/remote_lab_cam1/image_raw?invert=1
00634 
00635   ParameterMap::iterator itp = parameter_map.find("topic");
00636   if (itp == parameter_map.end())
00637     return;
00638 
00639   std::string topic = itp->second;
00640   increaseSubscriberCount(topic);
00641   ImageBuffer* image_buffer = getImageBuffer(topic);
00642 
00643   /* wait for fresh frames */
00644   boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00645   image_buffer->condition_.wait(lock);
00646 
00647   //IplImage* image;
00648   cv_bridge::CvImagePtr cv_msg;
00649   try
00650   {
00651     if (cv_msg = cv_bridge::toCvCopy(image_buffer->msg, "bgr8"))
00652     {
00653       ; //image = image_bridge.toIpl();
00654     }
00655     else
00656     {
00657       ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str());
00658       return;
00659     }
00660   }
00661   catch (...)
00662   {
00663     ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str());
00664     return;
00665   }
00666 
00667   cv::Mat img = cv_msg->image;
00668   std::vector<uchar> encoded_buffer;
00669   std::vector<int> encode_params;
00670 
00671   // invert
00672   //int invert = 0;
00673   if (parameter_map.find("invert") != parameter_map.end())
00674   {
00675     cv::Mat cloned_image = img.clone();
00676     invertImage(cloned_image, img);
00677   }
00678 
00679   // quality
00680   int quality = 95;
00681   if (parameter_map.find("quality") != parameter_map.end())
00682   {
00683     quality = stringToInt(parameter_map["quality"]);
00684   }
00685   encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
00686   encode_params.push_back(quality);
00687 
00688   // resize image
00689   if (parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end())
00690   {
00691     int width = stringToInt(parameter_map["width"]);
00692     int height = stringToInt(parameter_map["height"]);
00693     if (width > 0 && height > 0)
00694     {
00695       cv::Mat img_resized;
00696       cv::Size new_size(width, height);
00697       cv::resize(img, img_resized, new_size);
00698       cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params);
00699     }
00700     else
00701     {
00702       cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00703     }
00704   }
00705   else
00706   {
00707     cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00708   }
00709 
00710   // copy encoded frame buffer
00711   frame_size = encoded_buffer.size();
00712 
00713   // resize buffer
00714   if ((frame = (unsigned char*)malloc(frame_size)) == NULL)
00715   {
00716     free(frame);
00717     sendError(fd, 500, "not enough memory");
00718     return;
00719   }
00720 
00721   /* copy v4l2_buffer timeval to user space */
00722   timestamp = ros::Time::now().toSec();
00723 
00724   memcpy(frame, &encoded_buffer[0], frame_size);
00725   ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024);
00726 
00727   /* write the response */
00728   sprintf(buffer, "HTTP/1.0 200 OK\r\n"
00729           "%s"
00730           "Content-type: image/jpeg\r\n"
00731           "X-Timestamp: %.06lf\r\n"
00732           "\r\n",
00733           header.c_str(), (double)timestamp);
00734 
00735   /* send header and image now */
00736   if (write(fd, buffer, strlen(buffer)) < 0 || write(fd, frame, frame_size) < 0)
00737   {
00738     free(frame);
00739     return;
00740   }
00741 
00742   free(frame);
00743   decreaseSubscriberCount(topic);
00744   unregisterSubscriberIfPossible(topic);
00745 }
00746 
00747 void MJPEGServer::client(int fd)
00748 {
00749   int cnt;
00750   char buffer[BUFFER_SIZE] = {0}, *pb = buffer;
00751   iobuffer iobuf;
00752   request req;
00753 
00754   /* initializes the structures */
00755   initIOBuffer(&iobuf);
00756   initRequest(&req);
00757 
00758   /* What does the client want to receive? Read the request. */
00759   memset(buffer, 0, sizeof(buffer));
00760   if ((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1)
00761   {
00762     close(fd);
00763     return;
00764   }
00765 
00766   /* determine what to deliver */
00767   if (strstr(buffer, "GET /?") != NULL)
00768   {
00769     req.type = A_STREAM;
00770 
00771     /* advance by the length of known string */
00772     if ((pb = strstr(buffer, "GET /")) == NULL)
00773     {
00774       ROS_DEBUG("HTTP request seems to be malformed");
00775       sendError(fd, 400, "Malformed HTTP request");
00776       close(fd);
00777       return;
00778     }
00779     pb += strlen("GET /"); // a pb points to the string after the first & after command
00780     int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00781     req.parameter = (char*)malloc(len + 1);
00782     if (req.parameter == NULL)
00783     {
00784       exit(EXIT_FAILURE);
00785     }
00786     memset(req.parameter, 0, len + 1);
00787     strncpy(req.parameter, pb, len);
00788 
00789     ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00790   }
00791   else if (strstr(buffer, "GET /stream?") != NULL)
00792   {
00793     req.type = A_STREAM;
00794 
00795     /* advance by the length of known string */
00796     if ((pb = strstr(buffer, "GET /stream")) == NULL)
00797     {
00798       ROS_DEBUG("HTTP request seems to be malformed");
00799       sendError(fd, 400, "Malformed HTTP request");
00800       close(fd);
00801       return;
00802     }
00803     pb += strlen("GET /stream"); // a pb points to the string after the first & after command
00804     int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00805     req.parameter = (char*)malloc(len + 1);
00806     if (req.parameter == NULL)
00807     {
00808       exit(EXIT_FAILURE);
00809     }
00810     memset(req.parameter, 0, len + 1);
00811     strncpy(req.parameter, pb, len);
00812 
00813     ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00814   }
00815   else if (strstr(buffer, "GET /snapshot?") != NULL)
00816   {
00817     req.type = A_SNAPSHOT;
00818 
00819     /* advance by the length of known string */
00820     if ((pb = strstr(buffer, "GET /snapshot")) == NULL)
00821     {
00822       ROS_DEBUG("HTTP request seems to be malformed");
00823       sendError(fd, 400, "Malformed HTTP request");
00824       close(fd);
00825       return;
00826     }
00827     pb += strlen("GET /snapshot"); // a pb points to the string after the first & after command
00828     int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00829     req.parameter = (char*)malloc(len + 1);
00830     if (req.parameter == NULL)
00831     {
00832       exit(EXIT_FAILURE);
00833     }
00834     memset(req.parameter, 0, len + 1);
00835     strncpy(req.parameter, pb, len);
00836 
00837     ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00838   }
00839 
00840   /*
00841    * parse the rest of the HTTP-request
00842    * the end of the request-header is marked by a single, empty line with "\r\n"
00843    */
00844   do
00845   {
00846     memset(buffer, 0, sizeof(buffer));
00847 
00848     if ((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1)
00849     {
00850       freeRequest(&req);
00851       close(fd);
00852       return;
00853     }
00854 
00855     if (strstr(buffer, "User-Agent: ") != NULL)
00856     {
00857       req.client = strdup(buffer + strlen("User-Agent: "));
00858     }
00859     else if (strstr(buffer, "Authorization: Basic ") != NULL)
00860     {
00861       req.credentials = strdup(buffer + strlen("Authorization: Basic "));
00862       decodeBase64(req.credentials);
00863       ROS_DEBUG("username:password: %s", req.credentials);
00864     }
00865 
00866   } while (cnt > 2 && !(buffer[0] == '\r' && buffer[1] == '\n'));
00867 
00868   /* now it's time to answer */
00869   switch (req.type)
00870   {
00871     case A_STREAM:
00872     {
00873       ROS_DEBUG("Request for streaming");
00874       sendStream(fd, req.parameter);
00875       break;
00876     }
00877     case A_SNAPSHOT:
00878     {
00879       ROS_DEBUG("Request for snapshot");
00880       sendSnapshot(fd, req.parameter);
00881       break;
00882     }
00883     default:
00884       ROS_DEBUG("unknown request");
00885   }
00886 
00887   close(fd);
00888   freeRequest(&req);
00889   ROS_INFO("Disconnecting HTTP client");
00890   return;
00891 }
00892 
00893 void MJPEGServer::execute()
00894 {
00895 
00896   ROS_INFO("Starting mjpeg server");
00897 
00898   struct addrinfo *aip, *aip2;
00899   struct addrinfo hints;
00900   struct sockaddr_storage client_addr;
00901   socklen_t addr_len = sizeof(struct sockaddr_storage);
00902   fd_set selectfds;
00903   int max_fds = 0;
00904 
00905   char name[NI_MAXHOST];
00906 
00907   bzero(&hints, sizeof(hints));
00908   hints.ai_family = PF_UNSPEC;
00909   hints.ai_flags = AI_PASSIVE;
00910   hints.ai_socktype = SOCK_STREAM;
00911 
00912   int err;
00913   snprintf(name, sizeof(name), "%d", port_);
00914   if ((err = getaddrinfo(NULL, name, &hints, &aip)) != 0)
00915   {
00916     perror(gai_strerror(err));
00917     exit(EXIT_FAILURE);
00918   }
00919 
00920   for (int i = 0; i < MAX_NUM_SOCKETS; i++)
00921     sd[i] = -1;
00922 
00923   /* open sockets for server (1 socket / address family) */
00924   int i = 0;
00925   int on;
00926   for (aip2 = aip; aip2 != NULL; aip2 = aip2->ai_next)
00927   {
00928     if ((sd[i] = socket(aip2->ai_family, aip2->ai_socktype, 0)) < 0)
00929     {
00930       continue;
00931     }
00932 
00933     /* ignore "socket already in use" errors */
00934     on = 1;
00935     if (setsockopt(sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
00936     {
00937       perror("setsockopt(SO_REUSEADDR) failed");
00938     }
00939 
00940     /* IPv6 socket should listen to IPv6 only, otherwise we will get "socket already in use" */
00941     on = 1;
00942     if (aip2->ai_family == AF_INET6 && setsockopt(sd[i], IPPROTO_IPV6, IPV6_V6ONLY, (const void *)&on, sizeof(on)) < 0)
00943     {
00944       perror("setsockopt(IPV6_V6ONLY) failed");
00945     }
00946 
00947     /* perhaps we will use this keep-alive feature oneday */
00948     /* setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)); */
00949 
00950     if (bind(sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0)
00951     {
00952       perror("bind");
00953       sd[i] = -1;
00954       continue;
00955     }
00956 
00957     if (listen(sd[i], 10) < 0)
00958     {
00959       perror("listen");
00960       sd[i] = -1;
00961     }
00962     else
00963     {
00964       i++;
00965       if (i >= MAX_NUM_SOCKETS)
00966       {
00967         ROS_ERROR("Maximum number of server sockets exceeded");
00968         i--;
00969         break;
00970       }
00971     }
00972   }
00973 
00974   sd_len = i;
00975 
00976   if (sd_len < 1)
00977   {
00978     ROS_ERROR("Bind(%d) failed", port_);
00979     closelog();
00980     exit(EXIT_FAILURE);
00981   }
00982   else
00983   {
00984     ROS_INFO("Bind(%d) succeeded", port_);
00985   }
00986 
00987   ROS_INFO("waiting for clients to connect");
00988 
00989   /* create a child for every client that connects */
00990   while (!stop_requested_)
00991   {
00992 
00993     do
00994     {
00995       FD_ZERO(&selectfds);
00996 
00997       for (i = 0; i < MAX_NUM_SOCKETS; i++)
00998       {
00999         if (sd[i] != -1)
01000         {
01001           FD_SET(sd[i], &selectfds);
01002 
01003           if (sd[i] > max_fds)
01004             max_fds = sd[i];
01005         }
01006       }
01007 
01008       err = select(max_fds + 1, &selectfds, NULL, NULL, NULL);
01009 
01010       if (err < 0 && errno != EINTR)
01011       {
01012         perror("select");
01013         exit(EXIT_FAILURE);
01014       }
01015     } while (err <= 0);
01016 
01017     ROS_INFO("Client connected");
01018 
01019     for (i = 0; i < max_fds + 1; i++)
01020     {
01021       if (sd[i] != -1 && FD_ISSET(sd[i], &selectfds))
01022       {
01023         int fd = accept(sd[i], (struct sockaddr *)&client_addr, &addr_len);
01024 
01025         /* start new thread that will handle this TCP connected client */
01026         ROS_DEBUG("create thread to handle client that just established a connection");
01027 
01028         if (getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0)
01029         {
01030           syslog(LOG_INFO, "serving client: %s\n", name);
01031         }
01032 
01033         boost::thread t(boost::bind(&MJPEGServer::client, this, fd));
01034         t.detach();
01035       }
01036     }
01037   }
01038 
01039   ROS_INFO("leaving server thread, calling cleanup function now");
01040   cleanUp();
01041 }
01042 
01043 void MJPEGServer::cleanUp()
01044 {
01045   ROS_INFO("cleaning up ressources allocated by server thread");
01046 
01047   for (int i = 0; i < MAX_NUM_SOCKETS; i++)
01048     close(sd[i]);
01049 }
01050 
01051 void MJPEGServer::spin()
01052 {
01053   boost::thread t(boost::bind(&MJPEGServer::execute, this));
01054   t.detach();
01055   ros::spin();
01056   ROS_INFO("stop requested");
01057   stop();
01058 }
01059 
01060 void MJPEGServer::stop()
01061 {
01062   stop_requested_ = true;
01063 }
01064 
01065 void MJPEGServer::decreaseSubscriberCount(const std::string topic)
01066 {
01067   boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
01068   ImageSubscriberCountMap::iterator it = image_subscribers_count_.find(topic);
01069   if (it != image_subscribers_count_.end())
01070   {
01071     if (image_subscribers_count_[topic] == 1) {
01072       image_subscribers_count_.erase(it);
01073       ROS_INFO("no subscribers for %s", topic.c_str());
01074     }
01075     else if (image_subscribers_count_[topic] > 0) {
01076       image_subscribers_count_[topic] = image_subscribers_count_[topic] - 1;
01077       ROS_INFO("%lu subscribers for %s", image_subscribers_count_[topic], topic.c_str());
01078     }
01079   }
01080   else
01081   {
01082     ROS_INFO("no subscribers counter for %s", topic.c_str());
01083   }
01084 }
01085 
01086 void MJPEGServer::increaseSubscriberCount(const std::string topic)
01087 {
01088   boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
01089   ImageSubscriberCountMap::iterator it = image_subscribers_count_.find(topic);
01090   if (it == image_subscribers_count_.end())
01091   {
01092     image_subscribers_count_.insert(ImageSubscriberCountMap::value_type(topic, 1));
01093   }
01094   else {
01095     image_subscribers_count_[topic] = image_subscribers_count_[topic] + 1;
01096   }
01097   ROS_INFO("%lu subscribers for %s", image_subscribers_count_[topic], topic.c_str());
01098 }
01099 
01100 void MJPEGServer::unregisterSubscriberIfPossible(const std::string topic)
01101 {
01102   boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
01103   ImageSubscriberCountMap::iterator it = image_subscribers_count_.find(topic);
01104   if (it == image_subscribers_count_.end() ||
01105       image_subscribers_count_[topic] == 0)
01106   {
01107     ImageSubscriberMap::iterator sub_it = image_subscribers_.find(topic);
01108     if (sub_it != image_subscribers_.end())
01109     {
01110       ROS_INFO("Unsubscribing from %s", topic.c_str());
01111       image_subscribers_.erase(sub_it);
01112     }
01113   }
01114 }
01115 }
01116 
01117 int main(int argc, char** argv)
01118 {
01119   ros::init(argc, argv, "mjpeg_server");
01120 
01121   ROS_WARN("WARNING -- mjpeg_server IS NOW DEPRECATED.");
01122   ROS_WARN("PLEASE SEE https://github.com/RobotWebTools/web_video_server.");
01123 
01124   ros::NodeHandle nh;
01125   mjpeg_server::MJPEGServer server(nh);
01126   server.spin();
01127 
01128   return (0);
01129 }
01130 


mjpeg_server
Author(s): Benjamin Pitzer
autogenerated on Wed Aug 26 2015 12:32:35