00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include <mjpeg_server/mjpeg_server.h>
00038 #include <sys/ioctl.h>
00039 #include <errno.h>
00040 #include <signal.h>
00041 #include <sys/socket.h>
00042 #include <arpa/inet.h>
00043 #include <sys/types.h>
00044 #include <sys/stat.h>
00045 #include <fcntl.h>
00046 #include <syslog.h>
00047 #include <netdb.h>
00048 #include <errno.h>
00049 #include <opencv2/opencv.hpp>
00050 #include <boost/thread.hpp>
00051 #include <boost/bind.hpp>
00052
00053 template<typename T>
00054 inline T ABS(T a)
00055 {
00056 return (a < 0 ? -a : a);
00057 }
00058
00059 template<typename T>
00060 inline T min(T a, T b)
00061 {
00062 return (a < b ? a : b);
00063 }
00064
00065 template<typename T>
00066 inline T max(T a, T b)
00067 {
00068 return (a > b ? a : b);
00069 }
00070
00071 template<typename T>
00072 inline T LENGTH_OF(T x)
00073 {
00074 return (sizeof(x) / sizeof(x[0]));
00075 }
00076
00077 namespace mjpeg_server
00078 {
00079
00080 MJPEGServer::MJPEGServer(ros::NodeHandle& node) :
00081 node_(node), image_transport_(node), stop_requested_(false), www_folder_(NULL)
00082 {
00083 ros::NodeHandle private_nh("~");
00084 private_nh.param("port", port_, 8080);
00085 header = "Connection: close\r\nServer: mjpeg_server\r\n"
00086 "Cache-Control: no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0\r\n"
00087 "Pragma: no-cache\r\n";
00088 sd_len = 0;
00089 }
00090
00091 MJPEGServer::~MJPEGServer()
00092 {
00093 cleanUp();
00094 }
00095
00096 void MJPEGServer::imageCallback(const sensor_msgs::ImageConstPtr& msg, const std::string& topic)
00097 {
00098
00099 ImageBuffer* image_buffer = getImageBuffer(topic);
00100 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00101
00102 image_buffer->msg = *msg;
00103
00104 image_buffer->condition_.notify_all();
00105 }
00106
00107 void MJPEGServer::splitString(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiter)
00108 {
00109
00110 std::string::size_type lastPos = str.find_first_not_of(delimiter, 0);
00111
00112 std::string::size_type pos = str.find_first_of(delimiter, lastPos);
00113
00114 while (std::string::npos != pos || std::string::npos != lastPos)
00115 {
00116
00117 tokens.push_back(str.substr(lastPos, pos - lastPos));
00118
00119 lastPos = str.find_first_not_of(delimiter, pos);
00120
00121 pos = str.find_first_of(delimiter, lastPos);
00122 }
00123 }
00124
00125 int MJPEGServer::stringToInt(const std::string& str, const int default_value)
00126 {
00127 int value;
00128 int res;
00129 if (str.length() == 0)
00130 return default_value;
00131 res = sscanf(str.c_str(), "%i", &value);
00132 if (res == 1)
00133 return value;
00134 return default_value;
00135 }
00136
00137 void MJPEGServer::initIOBuffer(iobuffer *iobuf)
00138 {
00139 memset(iobuf->buffer, 0, sizeof(iobuf->buffer));
00140 iobuf->level = 0;
00141 }
00142
00143 void MJPEGServer::initRequest(request *req)
00144 {
00145 req->type = A_UNKNOWN;
00146 req->type = A_UNKNOWN;
00147 req->parameter = NULL;
00148 req->client = NULL;
00149 req->credentials = NULL;
00150 }
00151
00152 void MJPEGServer::freeRequest(request *req)
00153 {
00154 if (req->parameter != NULL)
00155 free(req->parameter);
00156 if (req->client != NULL)
00157 free(req->client);
00158 if (req->credentials != NULL)
00159 free(req->credentials);
00160 }
00161
00162 int MJPEGServer::readWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout)
00163 {
00164 size_t copied = 0;
00165 int rc, i;
00166 fd_set fds;
00167 struct timeval tv;
00168
00169 memset(buffer, 0, len);
00170
00171 while ((copied < len))
00172 {
00173 i = min((size_t)iobuf->level, len - copied);
00174 memcpy(buffer + copied, iobuf->buffer + IO_BUFFER - iobuf->level, i);
00175
00176 iobuf->level -= i;
00177 copied += i;
00178 if (copied >= len)
00179 return copied;
00180
00181
00182 tv.tv_sec = timeout;
00183 tv.tv_usec = 0;
00184 FD_ZERO(&fds);
00185 FD_SET(fd, &fds);
00186 if ((rc = select(fd + 1, &fds, NULL, NULL, &tv)) <= 0)
00187 {
00188 if (rc < 0)
00189 exit(EXIT_FAILURE);
00190
00191
00192 return copied;
00193 }
00194
00195 initIOBuffer(iobuf);
00196
00197
00198
00199
00200
00201
00202
00203 if ((iobuf->level = read(fd, &iobuf->buffer, IO_BUFFER)) <= 0)
00204 {
00205
00206 return -1;
00207 }
00208
00209
00210 memmove(iobuf->buffer + (IO_BUFFER - iobuf->level), iobuf->buffer, iobuf->level);
00211 }
00212
00213 return 0;
00214 }
00215
00216 int MJPEGServer::readLineWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout)
00217 {
00218 char c = '\0', *out = buffer;
00219 unsigned int i;
00220
00221 memset(buffer, 0, len);
00222
00223 for (i = 0; i < len && c != '\n'; i++)
00224 {
00225 if (readWithTimeout(fd, iobuf, &c, 1, timeout) <= 0)
00226 {
00227
00228 return -1;
00229 }
00230 *out++ = c;
00231 }
00232
00233 return i;
00234 }
00235
00236 void MJPEGServer::decodeBase64(char *data)
00237 {
00238 union
00239 {
00240 int i;
00241 char c[4];
00242 } buffer;
00243
00244 char* ptr = data;
00245 unsigned int size = strlen(data);
00246 char* temp = new char[size];
00247 char* tempptr = temp;
00248 char t;
00249
00250 for (buffer.i = 0, t = *ptr; ptr; ptr++)
00251 {
00252 if (t >= 'A' && t <= 'Z')
00253 t = t - 'A';
00254 else if (t >= 'a' && t <= 'z')
00255 t = t - 'a' + 26;
00256 else if (t >= '0' && t <= '9')
00257 t = t - '0' + 52;
00258 else if (t == '+')
00259 t = 62;
00260 else if (t == '/')
00261 t = 63;
00262 else
00263 continue;
00264
00265 buffer.i = (buffer.i << 6) | t;
00266
00267 if ((ptr - data + 1) % 4)
00268 {
00269 *tempptr++ = buffer.c[2];
00270 *tempptr++ = buffer.c[1];
00271 *tempptr++ = buffer.c[0];
00272 buffer.i = 0;
00273 }
00274 }
00275 *tempptr = '\0';
00276 strcpy(data, temp);
00277 delete temp;
00278 }
00279
00280 int MJPEGServer::hexCharToInt(char in)
00281 {
00282 if (in >= '0' && in <= '9')
00283 return in - '0';
00284
00285 if (in >= 'a' && in <= 'f')
00286 return (in - 'a') + 10;
00287
00288 if (in >= 'A' && in <= 'F')
00289 return (in - 'A') + 10;
00290
00291 return -1;
00292 }
00293
00294 int MJPEGServer::unescape(char *string)
00295 {
00296 char *source = string, *destination = string;
00297 int src, dst, length = strlen(string), rc;
00298
00299
00300 for (dst = 0, src = 0; src < length; src++)
00301 {
00302
00303
00304 if (source[src] != '%')
00305 {
00306
00307 destination[dst] = source[src];
00308 dst++;
00309 continue;
00310 }
00311
00312
00313
00314
00315 if (src + 2 > length)
00316 {
00317 return -1;
00318 break;
00319 }
00320
00321
00322 if ((rc = hexCharToInt(source[src + 1])) == -1)
00323 return -1;
00324 destination[dst] = rc * 16;
00325 if ((rc = hexCharToInt(source[src + 2])) == -1)
00326 return -1;
00327 destination[dst] += rc;
00328
00329
00330 dst++;
00331 src += 2;
00332 }
00333
00334
00335 destination[dst] = '\0';
00336
00337 return 0;
00338 }
00339
00340 void MJPEGServer::sendError(int fd, int which, const char *message)
00341 {
00342 char buffer[BUFFER_SIZE] = {0};
00343
00344 if (which == 401)
00345 {
00346 sprintf(buffer, "HTTP/1.0 401 Unauthorized\r\n"
00347 "Content-type: text/plain\r\n"
00348 "%s"
00349 "WWW-Authenticate: Basic realm=\"MJPG-Streamer\"\r\n"
00350 "\r\n"
00351 "401: Not Authenticated!\r\n"
00352 "%s",
00353 header.c_str(), message);
00354 }
00355 else if (which == 404)
00356 {
00357 sprintf(buffer, "HTTP/1.0 404 Not Found\r\n"
00358 "Content-type: text/plain\r\n"
00359 "%s"
00360 "\r\n"
00361 "404: Not Found!\r\n"
00362 "%s",
00363 header.c_str(), message);
00364 }
00365 else if (which == 500)
00366 {
00367 sprintf(buffer, "HTTP/1.0 500 Internal Server Error\r\n"
00368 "Content-type: text/plain\r\n"
00369 "%s"
00370 "\r\n"
00371 "500: Internal Server Error!\r\n"
00372 "%s",
00373 header.c_str(), message);
00374 }
00375 else if (which == 400)
00376 {
00377 sprintf(buffer, "HTTP/1.0 400 Bad Request\r\n"
00378 "Content-type: text/plain\r\n"
00379 "%s"
00380 "\r\n"
00381 "400: Not Found!\r\n"
00382 "%s",
00383 header.c_str(), message);
00384 }
00385 else
00386 {
00387 sprintf(buffer, "HTTP/1.0 501 Not Implemented\r\n"
00388 "Content-type: text/plain\r\n"
00389 "%s"
00390 "\r\n"
00391 "501: Not Implemented!\r\n"
00392 "%s",
00393 header.c_str(), message);
00394 }
00395
00396 if (write(fd, buffer, strlen(buffer)) < 0)
00397 {
00398 ROS_DEBUG("write failed, done anyway");
00399 }
00400 }
00401
00402 void MJPEGServer::decodeParameter(const std::string& parameter, ParameterMap& parameter_map)
00403 {
00404 std::vector<std::string> parameter_value_pairs;
00405 splitString(parameter, parameter_value_pairs, "?&");
00406
00407 for (size_t i = 0; i < parameter_value_pairs.size(); i++)
00408 {
00409 std::vector<std::string> parameter_value;
00410 splitString(parameter_value_pairs[i], parameter_value, "=");
00411 if (parameter_value.size() == 1)
00412 {
00413 parameter_map.insert(std::make_pair(parameter_value[0], std::string("")));
00414 }
00415 else if (parameter_value.size() == 2)
00416 {
00417 parameter_map.insert(std::make_pair(parameter_value[0], parameter_value[1]));
00418 }
00419 }
00420 }
00421
00422 ImageBuffer* MJPEGServer::getImageBuffer(const std::string& topic)
00423 {
00424 boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
00425 ImageSubscriberMap::iterator it = image_subscribers_.find(topic);
00426 if (it == image_subscribers_.end())
00427 {
00428 image_subscribers_[topic] = image_transport_.subscribe(topic, 1,
00429 boost::bind(&MJPEGServer::imageCallback, this, _1, topic));
00430 image_buffers_[topic] = new ImageBuffer();
00431 ROS_INFO("Subscribing to topic %s", topic.c_str());
00432 }
00433 ImageBuffer* image_buffer = image_buffers_[topic];
00434 return image_buffer;
00435 }
00436
00437
00438 void MJPEGServer::invertImage(const cv::Mat& input, cv::Mat& output)
00439 {
00440
00441 cv::Mat_<cv::Vec3b>& input_img = (cv::Mat_<cv::Vec3b>&)input;
00442 cv::Mat_<cv::Vec3b>& output_img = (cv::Mat_<cv::Vec3b>&)output;
00443 cv::Size size = input.size();
00444
00445 for (int j = 0; j < size.height; ++j)
00446 for (int i = 0; i < size.width; ++i)
00447 {
00448
00449 output_img(size.height - j - 1, size.width - i - 1) = input_img(j, i);
00450 }
00451 return;
00452 }
00453
00454 void MJPEGServer::sendStream(int fd, const char *parameter)
00455 {
00456 unsigned char *frame = NULL, *tmp = NULL;
00457 int frame_size = 0, max_frame_size = 0;
00458 int tenk = 10 * 1024;
00459 char buffer[BUFFER_SIZE] = {0};
00460 double timestamp;
00461
00462
00463 cv_bridge::CvImage image_bridge;
00464
00465 ROS_DEBUG("Decoding parameter");
00466
00467 std::string params = parameter;
00468
00469 ParameterMap parameter_map;
00470 decodeParameter(params, parameter_map);
00471
00472 ParameterMap::iterator itp = parameter_map.find("topic");
00473 if (itp == parameter_map.end())
00474 return;
00475
00476 std::string topic = itp->second;
00477 increaseSubscriberCount(topic);
00478 ImageBuffer* image_buffer = getImageBuffer(topic);
00479
00480 ROS_DEBUG("preparing header");
00481 sprintf(buffer, "HTTP/1.0 200 OK\r\n"
00482 "%s"
00483 "Content-Type: multipart/x-mixed-replace;boundary=boundarydonotcross \r\n"
00484 "\r\n"
00485 "--boundarydonotcross \r\n",
00486 header.c_str());
00487
00488 if (write(fd, buffer, strlen(buffer)) < 0)
00489 {
00490 free(frame);
00491 return;
00492 }
00493
00494 ROS_DEBUG("Headers send, sending stream now");
00495
00496 while (!stop_requested_)
00497 {
00498 {
00499
00500 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00501 image_buffer->condition_.wait(lock);
00502
00503
00504 cv_bridge::CvImagePtr cv_msg;
00505 try
00506 {
00507 if (cv_msg = cv_bridge::toCvCopy(image_buffer->msg, "bgr8"))
00508 {
00509 ;
00510 }
00511 else
00512 {
00513 ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str());
00514 return;
00515 }
00516 }
00517 catch (...)
00518 {
00519 ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str());
00520 return;
00521 }
00522
00523
00524 cv::Mat img = cv_msg->image;
00525 std::vector<uchar> encoded_buffer;
00526 std::vector<int> encode_params;
00527
00528
00529
00530 if (parameter_map.find("invert") != parameter_map.end())
00531 {
00532 cv::Mat cloned_image = img.clone();
00533 invertImage(cloned_image, img);
00534 }
00535
00536
00537 int quality = 95;
00538 if (parameter_map.find("quality") != parameter_map.end())
00539 {
00540 quality = stringToInt(parameter_map["quality"]);
00541 }
00542 encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
00543 encode_params.push_back(quality);
00544
00545
00546 if (parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end())
00547 {
00548 int width = stringToInt(parameter_map["width"]);
00549 int height = stringToInt(parameter_map["height"]);
00550 if (width > 0 && height > 0)
00551 {
00552 cv::Mat img_resized;
00553 cv::Size new_size(width, height);
00554 cv::resize(img, img_resized, new_size);
00555 cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params);
00556 }
00557 else
00558 {
00559 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00560 }
00561 }
00562 else
00563 {
00564 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00565 }
00566
00567
00568 frame_size = encoded_buffer.size();
00569
00570
00571 if (frame_size > max_frame_size)
00572 {
00573 ROS_DEBUG("increasing frame buffer size to %d", frame_size);
00574
00575 max_frame_size = frame_size + tenk;
00576 if ((tmp = (unsigned char*)realloc(frame, max_frame_size)) == NULL)
00577 {
00578 free(frame);
00579 sendError(fd, 500, "not enough memory");
00580 return;
00581 }
00582 frame = tmp;
00583 }
00584
00585
00586 timestamp = ros::Time::now().toSec();
00587
00588 memcpy(frame, &encoded_buffer[0], frame_size);
00589 ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024);
00590 }
00591
00592
00593
00594
00595
00596
00597 sprintf(buffer, "Content-Type: image/jpeg\r\n"
00598 "Content-Length: %d\r\n"
00599 "X-Timestamp: %.06lf\r\n"
00600 "\r\n",
00601 frame_size, (double)timestamp);
00602 ROS_DEBUG("sending intemdiate header");
00603 if (write(fd, buffer, strlen(buffer)) < 0)
00604 break;
00605
00606 ROS_DEBUG("sending frame");
00607 if (write(fd, frame, frame_size) < 0)
00608 break;
00609
00610 ROS_DEBUG("sending boundary");
00611 sprintf(buffer, "\r\n--boundarydonotcross \r\n");
00612 if (write(fd, buffer, strlen(buffer)) < 0)
00613 break;
00614 }
00615
00616 free(frame);
00617 decreaseSubscriberCount(topic);
00618 unregisterSubscriberIfPossible(topic);
00619
00620 }
00621
00622 void MJPEGServer::sendSnapshot(int fd, const char *parameter)
00623 {
00624 unsigned char *frame = NULL;
00625 int frame_size = 0;
00626 char buffer[BUFFER_SIZE] = {0};
00627 double timestamp;
00628
00629
00630
00631 std::string params = parameter;
00632 ParameterMap parameter_map;
00633 decodeParameter(params, parameter_map);
00634
00635 ParameterMap::iterator itp = parameter_map.find("topic");
00636 if (itp == parameter_map.end())
00637 return;
00638
00639 std::string topic = itp->second;
00640 increaseSubscriberCount(topic);
00641 ImageBuffer* image_buffer = getImageBuffer(topic);
00642
00643
00644 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00645 image_buffer->condition_.wait(lock);
00646
00647
00648 cv_bridge::CvImagePtr cv_msg;
00649 try
00650 {
00651 if (cv_msg = cv_bridge::toCvCopy(image_buffer->msg, "bgr8"))
00652 {
00653 ;
00654 }
00655 else
00656 {
00657 ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str());
00658 return;
00659 }
00660 }
00661 catch (...)
00662 {
00663 ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str());
00664 return;
00665 }
00666
00667 cv::Mat img = cv_msg->image;
00668 std::vector<uchar> encoded_buffer;
00669 std::vector<int> encode_params;
00670
00671
00672
00673 if (parameter_map.find("invert") != parameter_map.end())
00674 {
00675 cv::Mat cloned_image = img.clone();
00676 invertImage(cloned_image, img);
00677 }
00678
00679
00680 int quality = 95;
00681 if (parameter_map.find("quality") != parameter_map.end())
00682 {
00683 quality = stringToInt(parameter_map["quality"]);
00684 }
00685 encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
00686 encode_params.push_back(quality);
00687
00688
00689 if (parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end())
00690 {
00691 int width = stringToInt(parameter_map["width"]);
00692 int height = stringToInt(parameter_map["height"]);
00693 if (width > 0 && height > 0)
00694 {
00695 cv::Mat img_resized;
00696 cv::Size new_size(width, height);
00697 cv::resize(img, img_resized, new_size);
00698 cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params);
00699 }
00700 else
00701 {
00702 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00703 }
00704 }
00705 else
00706 {
00707 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00708 }
00709
00710
00711 frame_size = encoded_buffer.size();
00712
00713
00714 if ((frame = (unsigned char*)malloc(frame_size)) == NULL)
00715 {
00716 free(frame);
00717 sendError(fd, 500, "not enough memory");
00718 return;
00719 }
00720
00721
00722 timestamp = ros::Time::now().toSec();
00723
00724 memcpy(frame, &encoded_buffer[0], frame_size);
00725 ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024);
00726
00727
00728 sprintf(buffer, "HTTP/1.0 200 OK\r\n"
00729 "%s"
00730 "Content-type: image/jpeg\r\n"
00731 "X-Timestamp: %.06lf\r\n"
00732 "\r\n",
00733 header.c_str(), (double)timestamp);
00734
00735
00736 if (write(fd, buffer, strlen(buffer)) < 0 || write(fd, frame, frame_size) < 0)
00737 {
00738 free(frame);
00739 return;
00740 }
00741
00742 free(frame);
00743 decreaseSubscriberCount(topic);
00744 unregisterSubscriberIfPossible(topic);
00745 }
00746
00747 void MJPEGServer::client(int fd)
00748 {
00749 int cnt;
00750 char buffer[BUFFER_SIZE] = {0}, *pb = buffer;
00751 iobuffer iobuf;
00752 request req;
00753
00754
00755 initIOBuffer(&iobuf);
00756 initRequest(&req);
00757
00758
00759 memset(buffer, 0, sizeof(buffer));
00760 if ((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1)
00761 {
00762 close(fd);
00763 return;
00764 }
00765
00766
00767 if (strstr(buffer, "GET /?") != NULL)
00768 {
00769 req.type = A_STREAM;
00770
00771
00772 if ((pb = strstr(buffer, "GET /")) == NULL)
00773 {
00774 ROS_DEBUG("HTTP request seems to be malformed");
00775 sendError(fd, 400, "Malformed HTTP request");
00776 close(fd);
00777 return;
00778 }
00779 pb += strlen("GET /");
00780 int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00781 req.parameter = (char*)malloc(len + 1);
00782 if (req.parameter == NULL)
00783 {
00784 exit(EXIT_FAILURE);
00785 }
00786 memset(req.parameter, 0, len + 1);
00787 strncpy(req.parameter, pb, len);
00788
00789 ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00790 }
00791 else if (strstr(buffer, "GET /stream?") != NULL)
00792 {
00793 req.type = A_STREAM;
00794
00795
00796 if ((pb = strstr(buffer, "GET /stream")) == NULL)
00797 {
00798 ROS_DEBUG("HTTP request seems to be malformed");
00799 sendError(fd, 400, "Malformed HTTP request");
00800 close(fd);
00801 return;
00802 }
00803 pb += strlen("GET /stream");
00804 int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00805 req.parameter = (char*)malloc(len + 1);
00806 if (req.parameter == NULL)
00807 {
00808 exit(EXIT_FAILURE);
00809 }
00810 memset(req.parameter, 0, len + 1);
00811 strncpy(req.parameter, pb, len);
00812
00813 ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00814 }
00815 else if (strstr(buffer, "GET /snapshot?") != NULL)
00816 {
00817 req.type = A_SNAPSHOT;
00818
00819
00820 if ((pb = strstr(buffer, "GET /snapshot")) == NULL)
00821 {
00822 ROS_DEBUG("HTTP request seems to be malformed");
00823 sendError(fd, 400, "Malformed HTTP request");
00824 close(fd);
00825 return;
00826 }
00827 pb += strlen("GET /snapshot");
00828 int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00829 req.parameter = (char*)malloc(len + 1);
00830 if (req.parameter == NULL)
00831 {
00832 exit(EXIT_FAILURE);
00833 }
00834 memset(req.parameter, 0, len + 1);
00835 strncpy(req.parameter, pb, len);
00836
00837 ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00838 }
00839
00840
00841
00842
00843
00844 do
00845 {
00846 memset(buffer, 0, sizeof(buffer));
00847
00848 if ((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1)
00849 {
00850 freeRequest(&req);
00851 close(fd);
00852 return;
00853 }
00854
00855 if (strstr(buffer, "User-Agent: ") != NULL)
00856 {
00857 req.client = strdup(buffer + strlen("User-Agent: "));
00858 }
00859 else if (strstr(buffer, "Authorization: Basic ") != NULL)
00860 {
00861 req.credentials = strdup(buffer + strlen("Authorization: Basic "));
00862 decodeBase64(req.credentials);
00863 ROS_DEBUG("username:password: %s", req.credentials);
00864 }
00865
00866 } while (cnt > 2 && !(buffer[0] == '\r' && buffer[1] == '\n'));
00867
00868
00869 switch (req.type)
00870 {
00871 case A_STREAM:
00872 {
00873 ROS_DEBUG("Request for streaming");
00874 sendStream(fd, req.parameter);
00875 break;
00876 }
00877 case A_SNAPSHOT:
00878 {
00879 ROS_DEBUG("Request for snapshot");
00880 sendSnapshot(fd, req.parameter);
00881 break;
00882 }
00883 default:
00884 ROS_DEBUG("unknown request");
00885 }
00886
00887 close(fd);
00888 freeRequest(&req);
00889 ROS_INFO("Disconnecting HTTP client");
00890 return;
00891 }
00892
00893 void MJPEGServer::execute()
00894 {
00895
00896 ROS_INFO("Starting mjpeg server");
00897
00898 struct addrinfo *aip, *aip2;
00899 struct addrinfo hints;
00900 struct sockaddr_storage client_addr;
00901 socklen_t addr_len = sizeof(struct sockaddr_storage);
00902 fd_set selectfds;
00903 int max_fds = 0;
00904
00905 char name[NI_MAXHOST];
00906
00907 bzero(&hints, sizeof(hints));
00908 hints.ai_family = PF_UNSPEC;
00909 hints.ai_flags = AI_PASSIVE;
00910 hints.ai_socktype = SOCK_STREAM;
00911
00912 int err;
00913 snprintf(name, sizeof(name), "%d", port_);
00914 if ((err = getaddrinfo(NULL, name, &hints, &aip)) != 0)
00915 {
00916 perror(gai_strerror(err));
00917 exit(EXIT_FAILURE);
00918 }
00919
00920 for (int i = 0; i < MAX_NUM_SOCKETS; i++)
00921 sd[i] = -1;
00922
00923
00924 int i = 0;
00925 int on;
00926 for (aip2 = aip; aip2 != NULL; aip2 = aip2->ai_next)
00927 {
00928 if ((sd[i] = socket(aip2->ai_family, aip2->ai_socktype, 0)) < 0)
00929 {
00930 continue;
00931 }
00932
00933
00934 on = 1;
00935 if (setsockopt(sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
00936 {
00937 perror("setsockopt(SO_REUSEADDR) failed");
00938 }
00939
00940
00941 on = 1;
00942 if (aip2->ai_family == AF_INET6 && setsockopt(sd[i], IPPROTO_IPV6, IPV6_V6ONLY, (const void *)&on, sizeof(on)) < 0)
00943 {
00944 perror("setsockopt(IPV6_V6ONLY) failed");
00945 }
00946
00947
00948
00949
00950 if (bind(sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0)
00951 {
00952 perror("bind");
00953 sd[i] = -1;
00954 continue;
00955 }
00956
00957 if (listen(sd[i], 10) < 0)
00958 {
00959 perror("listen");
00960 sd[i] = -1;
00961 }
00962 else
00963 {
00964 i++;
00965 if (i >= MAX_NUM_SOCKETS)
00966 {
00967 ROS_ERROR("Maximum number of server sockets exceeded");
00968 i--;
00969 break;
00970 }
00971 }
00972 }
00973
00974 sd_len = i;
00975
00976 if (sd_len < 1)
00977 {
00978 ROS_ERROR("Bind(%d) failed", port_);
00979 closelog();
00980 exit(EXIT_FAILURE);
00981 }
00982 else
00983 {
00984 ROS_INFO("Bind(%d) succeeded", port_);
00985 }
00986
00987 ROS_INFO("waiting for clients to connect");
00988
00989
00990 while (!stop_requested_)
00991 {
00992
00993 do
00994 {
00995 FD_ZERO(&selectfds);
00996
00997 for (i = 0; i < MAX_NUM_SOCKETS; i++)
00998 {
00999 if (sd[i] != -1)
01000 {
01001 FD_SET(sd[i], &selectfds);
01002
01003 if (sd[i] > max_fds)
01004 max_fds = sd[i];
01005 }
01006 }
01007
01008 err = select(max_fds + 1, &selectfds, NULL, NULL, NULL);
01009
01010 if (err < 0 && errno != EINTR)
01011 {
01012 perror("select");
01013 exit(EXIT_FAILURE);
01014 }
01015 } while (err <= 0);
01016
01017 ROS_INFO("Client connected");
01018
01019 for (i = 0; i < max_fds + 1; i++)
01020 {
01021 if (sd[i] != -1 && FD_ISSET(sd[i], &selectfds))
01022 {
01023 int fd = accept(sd[i], (struct sockaddr *)&client_addr, &addr_len);
01024
01025
01026 ROS_DEBUG("create thread to handle client that just established a connection");
01027
01028 if (getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0)
01029 {
01030 syslog(LOG_INFO, "serving client: %s\n", name);
01031 }
01032
01033 boost::thread t(boost::bind(&MJPEGServer::client, this, fd));
01034 t.detach();
01035 }
01036 }
01037 }
01038
01039 ROS_INFO("leaving server thread, calling cleanup function now");
01040 cleanUp();
01041 }
01042
01043 void MJPEGServer::cleanUp()
01044 {
01045 ROS_INFO("cleaning up ressources allocated by server thread");
01046
01047 for (int i = 0; i < MAX_NUM_SOCKETS; i++)
01048 close(sd[i]);
01049 }
01050
01051 void MJPEGServer::spin()
01052 {
01053 boost::thread t(boost::bind(&MJPEGServer::execute, this));
01054 t.detach();
01055 ros::spin();
01056 ROS_INFO("stop requested");
01057 stop();
01058 }
01059
01060 void MJPEGServer::stop()
01061 {
01062 stop_requested_ = true;
01063 }
01064
01065 void MJPEGServer::decreaseSubscriberCount(const std::string topic)
01066 {
01067 boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
01068 ImageSubscriberCountMap::iterator it = image_subscribers_count_.find(topic);
01069 if (it != image_subscribers_count_.end())
01070 {
01071 if (image_subscribers_count_[topic] == 1) {
01072 image_subscribers_count_.erase(it);
01073 ROS_INFO("no subscribers for %s", topic.c_str());
01074 }
01075 else if (image_subscribers_count_[topic] > 0) {
01076 image_subscribers_count_[topic] = image_subscribers_count_[topic] - 1;
01077 ROS_INFO("%lu subscribers for %s", image_subscribers_count_[topic], topic.c_str());
01078 }
01079 }
01080 else
01081 {
01082 ROS_INFO("no subscribers counter for %s", topic.c_str());
01083 }
01084 }
01085
01086 void MJPEGServer::increaseSubscriberCount(const std::string topic)
01087 {
01088 boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
01089 ImageSubscriberCountMap::iterator it = image_subscribers_count_.find(topic);
01090 if (it == image_subscribers_count_.end())
01091 {
01092 image_subscribers_count_.insert(ImageSubscriberCountMap::value_type(topic, 1));
01093 }
01094 else {
01095 image_subscribers_count_[topic] = image_subscribers_count_[topic] + 1;
01096 }
01097 ROS_INFO("%lu subscribers for %s", image_subscribers_count_[topic], topic.c_str());
01098 }
01099
01100 void MJPEGServer::unregisterSubscriberIfPossible(const std::string topic)
01101 {
01102 boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
01103 ImageSubscriberCountMap::iterator it = image_subscribers_count_.find(topic);
01104 if (it == image_subscribers_count_.end() ||
01105 image_subscribers_count_[topic] == 0)
01106 {
01107 ImageSubscriberMap::iterator sub_it = image_subscribers_.find(topic);
01108 if (sub_it != image_subscribers_.end())
01109 {
01110 ROS_INFO("Unsubscribing from %s", topic.c_str());
01111 image_subscribers_.erase(sub_it);
01112 }
01113 }
01114 }
01115 }
01116
01117 int main(int argc, char** argv)
01118 {
01119 ros::init(argc, argv, "mjpeg_server");
01120
01121 ros::NodeHandle nh;
01122 mjpeg_server::MJPEGServer server(nh);
01123 server.spin();
01124
01125 return (0);
01126 }
01127