00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "mjpeg_server/mjpeg_server.h"
00037 #include <highgui.h>
00038
00039 #include <sys/ioctl.h>
00040 #include <errno.h>
00041 #include <signal.h>
00042 #include <sys/socket.h>
00043 #include <arpa/inet.h>
00044 #include <sys/types.h>
00045 #include <sys/stat.h>
00046 #include <fcntl.h>
00047 #include <syslog.h>
00048 #include <netdb.h>
00049 #include <errno.h>
00050
00051 #include <opencv2/opencv.hpp>
00052
00053 #include <boost/thread.hpp>
00054 #include <boost/bind.hpp>
00055
00056
00057 template <typename T>
00058 inline T ABS(T a){
00059 return (a < 0 ? -a : a);
00060 }
00061
00062 template <typename T>
00063 inline T min(T a, T b){
00064 return (a < b ? a : b);
00065 }
00066
00067 template <typename T>
00068 inline T max(T a, T b){
00069 return (a > b ? a : b);
00070 }
00071
00072 template <typename T>
00073 inline T LENGTH_OF(T x){
00074 return (sizeof(x)/sizeof(x[0]));
00075 }
00076
00077
00078
00079 namespace mjpeg_server {
00080
00081 MJPEGServer::MJPEGServer(ros::NodeHandle& node) :
00082 node_(node), image_transport_(node), stop_requested_(false), www_folder_(NULL)
00083 {
00084 ros::NodeHandle private_nh("~");
00085 private_nh.param("port", port_, 8080);
00086 header= "Connection: close\r\nServer: mjpeg_server\r\n" \
00087 "Cache-Control: no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0\r\n" \
00088 "Pragma: no-cache\r\n";
00089 }
00090
00091 MJPEGServer::~MJPEGServer() {
00092 cleanUp();
00093 }
00094
00095 void MJPEGServer::imageCallback(const sensor_msgs::ImageConstPtr& msg, const std::string& topic) {
00096
00097 ImageBuffer* image_buffer = getImageBuffer(topic);
00098 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00099
00100 image_buffer->msg = *msg;
00101
00102 image_buffer->condition_.notify_all();
00103 }
00104
00105 void MJPEGServer::splitString(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiter)
00106 {
00107
00108 std::string::size_type lastPos = str.find_first_not_of(delimiter, 0);
00109
00110 std::string::size_type pos = str.find_first_of(delimiter, lastPos);
00111
00112 while (std::string::npos != pos || std::string::npos != lastPos)
00113 {
00114
00115 tokens.push_back(str.substr(lastPos, pos - lastPos));
00116
00117 lastPos = str.find_first_not_of(delimiter, pos);
00118
00119 pos = str.find_first_of(delimiter, lastPos);
00120 }
00121 }
00122
00123 int MJPEGServer::stringToInt(const std::string& str, const int default_value)
00124 {
00125 int value;
00126 int res;
00127 if(str.length() == 0)
00128 return default_value;
00129 res = sscanf(str.c_str(),"%i",&value);
00130 if (res == 1)
00131 return value;
00132 return default_value;
00133 }
00134
00135 void MJPEGServer::initIOBuffer(iobuffer *iobuf)
00136 {
00137 memset(iobuf->buffer, 0, sizeof(iobuf->buffer));
00138 iobuf->level = 0;
00139 }
00140
00141 void MJPEGServer::initRequest(request *req)
00142 {
00143 req->type = A_UNKNOWN;
00144 req->type = A_UNKNOWN;
00145 req->parameter = NULL;
00146 req->client = NULL;
00147 req->credentials = NULL;
00148 }
00149
00150 void MJPEGServer::freeRequest(request *req)
00151 {
00152 if(req->parameter != NULL) free(req->parameter);
00153 if(req->client != NULL) free(req->client);
00154 if(req->credentials != NULL) free(req->credentials);
00155 }
00156
00157 int MJPEGServer::readWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout)
00158 {
00159 size_t copied = 0;
00160 int rc, i;
00161 fd_set fds;
00162 struct timeval tv;
00163
00164 memset(buffer, 0, len);
00165
00166 while((copied < len)) {
00167 i = min((size_t)iobuf->level, len - copied);
00168 memcpy(buffer + copied, iobuf->buffer + IO_BUFFER - iobuf->level, i);
00169
00170 iobuf->level -= i;
00171 copied += i;
00172 if(copied >= len)
00173 return copied;
00174
00175
00176 tv.tv_sec = timeout;
00177 tv.tv_usec = 0;
00178 FD_ZERO(&fds);
00179 FD_SET(fd, &fds);
00180 if((rc = select(fd + 1, &fds, NULL, NULL, &tv)) <= 0) {
00181 if(rc < 0)
00182 exit(EXIT_FAILURE);
00183
00184
00185 return copied;
00186 }
00187
00188 initIOBuffer(iobuf);
00189
00190
00191
00192
00193
00194
00195
00196 if((iobuf->level = read(fd, &iobuf->buffer, IO_BUFFER)) <= 0) {
00197
00198 return -1;
00199 }
00200
00201
00202 memmove(iobuf->buffer + (IO_BUFFER - iobuf->level), iobuf->buffer, iobuf->level);
00203 }
00204
00205 return 0;
00206 }
00207
00208 int MJPEGServer::readLineWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout)
00209 {
00210 char c = '\0', *out = buffer;
00211 unsigned int i;
00212
00213 memset(buffer, 0, len);
00214
00215 for(i = 0; i < len && c != '\n'; i++) {
00216 if(readWithTimeout(fd, iobuf, &c, 1, timeout) <= 0) {
00217
00218 return -1;
00219 }
00220 *out++ = c;
00221 }
00222
00223 return i;
00224 }
00225
00226 void MJPEGServer::decodeBase64(char *data)
00227 {
00228 union {
00229 int i;
00230 char c[4];
00231 } buffer;
00232
00233 char* ptr = data;
00234 unsigned int size = strlen(data);
00235 char* temp = new char[size];
00236 char* tempptr = temp;
00237 char t;
00238
00239 for(buffer.i = 0,t = *ptr; ptr; ptr++)
00240 {
00241 if(t >= 'A' && t <= 'Z')
00242 t = t - 'A';
00243 else if(t >= 'a' && t <= 'z')
00244 t = t - 'a' + 26;
00245 else if(t >= '0' && t <= '9')
00246 t = t - '0' + 52;
00247 else if(t == '+')
00248 t = 62;
00249 else if(t == '/')
00250 t = 63;
00251 else
00252 continue;
00253
00254 buffer.i = (buffer.i << 6) | t;
00255
00256 if((ptr - data +1) % 4)
00257 {
00258 *tempptr++ = buffer.c[2];
00259 *tempptr++ = buffer.c[1];
00260 *tempptr++ = buffer.c[0];
00261 buffer.i = 0;
00262 }
00263 }
00264 *tempptr = '\0';
00265 strcpy(data,temp);
00266 delete temp;
00267 }
00268
00269 int MJPEGServer::hexCharToInt(char in)
00270 {
00271 if(in >= '0' && in <= '9')
00272 return in - '0';
00273
00274 if(in >= 'a' && in <= 'f')
00275 return (in - 'a') + 10;
00276
00277 if(in >= 'A' && in <= 'F')
00278 return (in - 'A') + 10;
00279
00280 return -1;
00281 }
00282
00283 int MJPEGServer::unescape(char *string)
00284 {
00285 char *source = string, *destination = string;
00286 int src, dst, length = strlen(string), rc;
00287
00288
00289 for(dst = 0, src = 0; src < length; src++) {
00290
00291
00292 if(source[src] != '%') {
00293
00294 destination[dst] = source[src];
00295 dst++;
00296 continue;
00297 }
00298
00299
00300
00301
00302 if(src + 2 > length) {
00303 return -1;
00304 break;
00305 }
00306
00307
00308 if((rc = hexCharToInt(source[src+1])) == -1) return -1;
00309 destination[dst] = rc * 16;
00310 if((rc = hexCharToInt(source[src+2])) == -1) return -1;
00311 destination[dst] += rc;
00312
00313
00314 dst++; src += 2;
00315 }
00316
00317
00318 destination[dst] = '\0';
00319
00320 return 0;
00321 }
00322
00323 void MJPEGServer::sendError(int fd, int which, const char *message)
00324 {
00325 char buffer[BUFFER_SIZE] = {0};
00326
00327 if(which == 401) {
00328 sprintf(buffer, "HTTP/1.0 401 Unauthorized\r\n" \
00329 "Content-type: text/plain\r\n" \
00330 "%s"\
00331 "WWW-Authenticate: Basic realm=\"MJPG-Streamer\"\r\n" \
00332 "\r\n" \
00333 "401: Not Authenticated!\r\n" \
00334 "%s", header.c_str(), message);
00335 } else if(which == 404) {
00336 sprintf(buffer, "HTTP/1.0 404 Not Found\r\n" \
00337 "Content-type: text/plain\r\n" \
00338 "%s" \
00339 "\r\n" \
00340 "404: Not Found!\r\n" \
00341 "%s", header.c_str(), message);
00342 } else if(which == 500) {
00343 sprintf(buffer, "HTTP/1.0 500 Internal Server Error\r\n" \
00344 "Content-type: text/plain\r\n" \
00345 "%s"\
00346 "\r\n" \
00347 "500: Internal Server Error!\r\n" \
00348 "%s", header.c_str(), message);
00349 } else if(which == 400) {
00350 sprintf(buffer, "HTTP/1.0 400 Bad Request\r\n" \
00351 "Content-type: text/plain\r\n" \
00352 "%s"\
00353 "\r\n" \
00354 "400: Not Found!\r\n" \
00355 "%s", header.c_str(), message);
00356 } else {
00357 sprintf(buffer, "HTTP/1.0 501 Not Implemented\r\n" \
00358 "Content-type: text/plain\r\n" \
00359 "%s"\
00360 "\r\n" \
00361 "501: Not Implemented!\r\n" \
00362 "%s", header.c_str(), message);
00363 }
00364
00365 if(write(fd, buffer, strlen(buffer)) < 0) {
00366 ROS_DEBUG("write failed, done anyway");
00367 }
00368 }
00369
00370 void MJPEGServer::decodeParameter(const std::string& parameter, ParameterMap& parameter_map)
00371 {
00372 std::vector<std::string> parameter_value_pairs;
00373 splitString(parameter,parameter_value_pairs, "?&");
00374
00375 for(size_t i=0; i<parameter_value_pairs.size(); i++) {
00376 std::vector<std::string> parameter_value;
00377 splitString(parameter_value_pairs[i], parameter_value, "=");
00378 if(parameter_value.size()==1) {
00379 parameter_map.insert(std::make_pair(parameter_value[0],std::string("")));
00380 }
00381 else if(parameter_value.size()==2) {
00382 parameter_map.insert(std::make_pair(parameter_value[0],parameter_value[1]));
00383 }
00384 }
00385 }
00386
00387 ImageBuffer* MJPEGServer::getImageBuffer(const std::string& topic)
00388 {
00389 boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
00390 ImageSubscriberMap::iterator it = image_subscribers_.find(topic);
00391 if (it == image_subscribers_.end()) {
00392 image_subscribers_[topic] = image_transport_.subscribe(topic, 1, boost::bind(&MJPEGServer::imageCallback, this, _1, topic));
00393 image_buffers_[topic] = new ImageBuffer();
00394 ROS_INFO("Subscribing to topic %s", topic.c_str());
00395 }
00396 ImageBuffer* image_buffer = image_buffers_[topic];
00397 return image_buffer;
00398 }
00399
00400
00401 void MJPEGServer::invertImage(const cv::Mat& input, cv::Mat& output) {
00402
00403 cv::Mat_<cv::Vec3b>& input_img = (cv::Mat_<cv::Vec3b>&)input;
00404 cv::Mat_<cv::Vec3b>& output_img = (cv::Mat_<cv::Vec3b>&)output;
00405 cv::Size size = input.size();
00406
00407 for (int j = 0; j < size.height; ++j)
00408 for (int i = 0; i < size.width; ++i) {
00409
00410 output_img(size.height-j-1, size.width-i-1) = input_img(j,i);
00411 }
00412 return;
00413 }
00414
00415 void MJPEGServer::sendStream(int fd, const char *parameter)
00416 {
00417 unsigned char *frame = NULL, *tmp = NULL;
00418 int frame_size = 0, max_frame_size = 0;
00419 int tenk=10*1024;
00420 char buffer[BUFFER_SIZE] = {0};
00421 struct timeval timestamp;
00422
00423
00424 cv_bridge::CvImage image_bridge;
00425
00426 ROS_DEBUG("Decoding parameter");
00427
00428 std::string params = parameter;
00429
00430
00431
00432
00433
00434 ParameterMap parameter_map;
00435 decodeParameter(params, parameter_map);
00436
00437 ParameterMap::iterator itp = parameter_map.find("topic");
00438 if (itp == parameter_map.end()) return;
00439
00440 std::string topic = itp->second;
00441 ImageBuffer* image_buffer = getImageBuffer(topic);
00442
00443 ROS_DEBUG("preparing header");
00444 sprintf(buffer, "HTTP/1.0 200 OK\r\n" \
00445 "%s" \
00446 "Content-Type: multipart/x-mixed-replace;boundary=boundarydonotcross \r\n" \
00447 "\r\n" \
00448 "--boundarydonotcross \r\n", header.c_str());
00449
00450 if(write(fd, buffer, strlen(buffer)) < 0) {
00451 free(frame);
00452 return;
00453 }
00454
00455 ROS_DEBUG("Headers send, sending stream now");
00456
00457 while(!stop_requested_) {
00458 {
00459
00460 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00461 image_buffer->condition_.wait(lock);
00462
00463
00464 cv_bridge::CvImagePtr cv_msg;
00465 try {
00466 if (cv_msg = cv_bridge::toCvCopy(image_buffer->msg, "bgr8")) {
00467 ;
00468 }
00469 else {
00470 ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str());
00471 return;
00472 }
00473 }
00474 catch(...) {
00475 ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str());
00476 return;
00477 }
00478
00479
00480 cv::Mat img = cv_msg->image;
00481 std::vector<uchar> encoded_buffer;
00482 std::vector<int> encode_params;
00483
00484
00485
00486 if(parameter_map.find("invert") != parameter_map.end()) {
00487 cv::Mat cloned_image = img.clone();
00488 invertImage(cloned_image, img);
00489 }
00490
00491
00492 int quality = 95;
00493 if(parameter_map.find("quality") != parameter_map.end()) {
00494 quality = stringToInt(parameter_map["quality"]);
00495 }
00496 encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
00497 encode_params.push_back(quality);
00498
00499
00500 if(parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end()) {
00501 int width = stringToInt(parameter_map["width"]);
00502 int height = stringToInt(parameter_map["height"]);
00503 if(width > 0 && height > 0) {
00504 cv::Mat img_resized;
00505 cv::Size new_size(width,height);
00506 cv::resize(img, img_resized, new_size);
00507 cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params);
00508 }
00509 else {
00510 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00511 }
00512 }
00513 else {
00514 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00515 }
00516
00517
00518 frame_size = encoded_buffer.size();
00519
00520
00521 if(frame_size > max_frame_size) {
00522 ROS_DEBUG("increasing frame buffer size to %d", frame_size);
00523
00524 max_frame_size = frame_size + tenk;
00525 if((tmp = (unsigned char*)realloc(frame, max_frame_size)) == NULL) {
00526 free(frame);
00527 sendError(fd, 500, "not enough memory");
00528 return;
00529 }
00530 frame = tmp;
00531 }
00532
00533
00534 timestamp.tv_sec = ros::Time::now().toSec();
00535
00536 memcpy(frame, &encoded_buffer[0], frame_size);
00537 ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024);
00538 }
00539
00540
00541
00542
00543
00544
00545 sprintf(buffer, "Content-Type: image/jpeg\r\n" \
00546 "Content-Length: %d\r\n" \
00547 "X-Timestamp: %d.%06d\r\n" \
00548 "\r\n", frame_size, (int)timestamp.tv_sec, (int)timestamp.tv_usec);
00549 ROS_DEBUG("sending intemdiate header");
00550 if(write(fd, buffer, strlen(buffer)) < 0) break;
00551
00552 ROS_DEBUG("sending frame");
00553 if(write(fd, frame, frame_size) < 0) break;
00554
00555 ROS_DEBUG("sending boundary");
00556 sprintf(buffer, "\r\n--boundarydonotcross \r\n");
00557 if(write(fd, buffer, strlen(buffer)) < 0) break;
00558 }
00559
00560 free(frame);
00561 }
00562
00563 void MJPEGServer::sendSnapshot(int fd, const char *parameter)
00564 {
00565 unsigned char *frame = NULL;
00566 int frame_size = 0;
00567 char buffer[BUFFER_SIZE] = {0};
00568 struct timeval timestamp;
00569
00570
00571
00572 std::string params = parameter;
00573 ParameterMap parameter_map;
00574 decodeParameter(params, parameter_map);
00575
00576 ParameterMap::iterator itp = parameter_map.find("topic");
00577 if (itp == parameter_map.end()) return;
00578
00579 std::string topic = itp->second;
00580 ImageBuffer* image_buffer = getImageBuffer(topic);
00581
00582
00583 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00584 image_buffer->condition_.wait(lock);
00585
00586
00587 cv_bridge::CvImagePtr cv_msg;
00588 try {
00589 if (cv_msg = cv_bridge::toCvCopy(image_buffer->msg, "bgr8")) {
00590 ;
00591 }
00592 else {
00593 ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str());
00594 return;
00595 }
00596 }
00597 catch(...) {
00598 ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str());
00599 return;
00600 }
00601
00602 cv::Mat img = cv_msg->image;
00603 std::vector<uchar> encoded_buffer;
00604 std::vector<int> encode_params;
00605
00606
00607
00608 if(parameter_map.find("invert") != parameter_map.end()) {
00609 cv::Mat cloned_image = img.clone();
00610 invertImage(cloned_image, img);
00611 }
00612
00613
00614 int quality = 95;
00615 if(parameter_map.find("quality") != parameter_map.end()) {
00616 quality = stringToInt(parameter_map["quality"]);
00617 }
00618 encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
00619 encode_params.push_back(quality);
00620
00621
00622 if(parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end()) {
00623 int width = stringToInt(parameter_map["width"]);
00624 int height = stringToInt(parameter_map["height"]);
00625 if(width > 0 && height > 0) {
00626 cv::Mat img_resized;
00627 cv::Size new_size(width,height);
00628 cv::resize(img, img_resized, new_size);
00629 cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params);
00630 }
00631 else {
00632 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00633 }
00634 }
00635 else {
00636 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00637 }
00638
00639
00640 frame_size = encoded_buffer.size();
00641
00642
00643 if((frame = (unsigned char*)malloc(frame_size)) == NULL) {
00644 free(frame);
00645 sendError(fd, 500, "not enough memory");
00646 return;
00647 }
00648
00649
00650 timestamp.tv_sec = ros::Time::now().toSec();
00651
00652 memcpy(frame, &encoded_buffer[0], frame_size);
00653 ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024);
00654
00655
00656 sprintf(buffer, "HTTP/1.0 200 OK\r\n" \
00657 "%s" \
00658 "Content-type: image/jpeg\r\n" \
00659 "X-Timestamp: %d.%06d\r\n" \
00660 "\r\n",header.c_str(), (int) timestamp.tv_sec, (int) timestamp.tv_usec);
00661
00662
00663 if(write(fd, buffer, strlen(buffer)) < 0 || \
00664 write(fd, frame, frame_size) < 0) {
00665 free(frame);
00666 return;
00667 }
00668
00669 free(frame);
00670 }
00671
00672 void MJPEGServer::client(int fd) {
00673 int cnt;
00674 char buffer[BUFFER_SIZE] = {0}, *pb = buffer;
00675 iobuffer iobuf;
00676 request req;
00677
00678
00679 initIOBuffer(&iobuf);
00680 initRequest(&req);
00681
00682
00683 memset(buffer, 0, sizeof(buffer));
00684 if((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) {
00685 close(fd);
00686 return;
00687 }
00688
00689
00690 if(strstr(buffer, "GET /?") != NULL) {
00691 req.type = A_STREAM;
00692
00693
00694 if((pb = strstr(buffer, "GET /")) == NULL) {
00695 ROS_DEBUG("HTTP request seems to be malformed");
00696 sendError(fd, 400, "Malformed HTTP request");
00697 close(fd);
00698 return;
00699 }
00700 pb += strlen("GET /");
00701 int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00702 req.parameter = (char*)malloc(len + 1);
00703 if(req.parameter == NULL) {
00704 exit(EXIT_FAILURE);
00705 }
00706 memset(req.parameter, 0, len + 1);
00707 strncpy(req.parameter, pb, len);
00708
00709 ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00710 }
00711 else if(strstr(buffer, "GET /stream?") != NULL) {
00712 req.type = A_STREAM;
00713
00714
00715 if((pb = strstr(buffer, "GET /stream")) == NULL) {
00716 ROS_DEBUG("HTTP request seems to be malformed");
00717 sendError(fd, 400, "Malformed HTTP request");
00718 close(fd);
00719 return;
00720 }
00721 pb += strlen("GET /stream");
00722 int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00723 req.parameter = (char*)malloc(len + 1);
00724 if(req.parameter == NULL) {
00725 exit(EXIT_FAILURE);
00726 }
00727 memset(req.parameter, 0, len + 1);
00728 strncpy(req.parameter, pb, len);
00729
00730 ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00731 }
00732 else if(strstr(buffer, "GET /snapshot?") != NULL) {
00733 req.type = A_SNAPSHOT;
00734
00735
00736 if((pb = strstr(buffer, "GET /snapshot")) == NULL) {
00737 ROS_DEBUG("HTTP request seems to be malformed");
00738 sendError(fd, 400, "Malformed HTTP request");
00739 close(fd);
00740 return;
00741 }
00742 pb += strlen("GET /snapshot");
00743 int len = min(max((int)strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00744 req.parameter = (char*)malloc(len + 1);
00745 if(req.parameter == NULL) {
00746 exit(EXIT_FAILURE);
00747 }
00748 memset(req.parameter, 0, len + 1);
00749 strncpy(req.parameter, pb, len);
00750
00751 ROS_DEBUG("requested image topic[%d]: \"%s\"", len, req.parameter);
00752 }
00753
00754
00755
00756
00757
00758 do {
00759 memset(buffer, 0, sizeof(buffer));
00760
00761 if((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) {
00762 freeRequest(&req);
00763 close(fd);
00764 return;
00765 }
00766
00767 if(strstr(buffer, "User-Agent: ") != NULL) {
00768 req.client = strdup(buffer + strlen("User-Agent: "));
00769 } else if(strstr(buffer, "Authorization: Basic ") != NULL) {
00770 req.credentials = strdup(buffer + strlen("Authorization: Basic "));
00771 decodeBase64(req.credentials);
00772 ROS_DEBUG("username:password: %s", req.credentials);
00773 }
00774
00775 } while(cnt > 2 && !(buffer[0] == '\r' && buffer[1] == '\n'));
00776
00777
00778
00779
00780 switch(req.type) {
00781 case A_STREAM: {
00782 ROS_DEBUG("Request for streaming");
00783 sendStream(fd, req.parameter);
00784 break;
00785 }
00786 case A_SNAPSHOT: {
00787 ROS_DEBUG("Request for snapshot");
00788 sendSnapshot(fd, req.parameter);
00789 break;
00790 }
00791 default:
00792 ROS_DEBUG("unknown request");
00793 }
00794
00795 close(fd);
00796 freeRequest(&req);
00797
00798 ROS_INFO("Disconnecting HTTP client");
00799 return;
00800 }
00801
00802 void MJPEGServer::execute() {
00803
00804 ROS_INFO("Starting mjpeg server");
00805
00806 struct addrinfo *aip, *aip2;
00807 struct addrinfo hints;
00808 struct sockaddr_storage client_addr;
00809 socklen_t addr_len = sizeof(struct sockaddr_storage);
00810 fd_set selectfds;
00811 int max_fds = 0;
00812
00813 char name[NI_MAXHOST];
00814
00815 bzero(&hints, sizeof(hints));
00816 hints.ai_family = PF_UNSPEC;
00817 hints.ai_flags = AI_PASSIVE;
00818 hints.ai_socktype = SOCK_STREAM;
00819
00820 int err;
00821 snprintf(name, sizeof(name), "%d", port_);
00822 if((err = getaddrinfo(NULL, name, &hints, &aip)) != 0) {
00823 perror(gai_strerror(err));
00824 exit(EXIT_FAILURE);
00825 }
00826
00827 for(int i = 0; i < MAX_NUM_SOCKETS; i++)
00828 sd[i] = -1;
00829
00830
00831
00832 int i = 0;
00833 int on;
00834 for(aip2 = aip; aip2 != NULL; aip2 = aip2->ai_next) {
00835 if((sd[i] = socket(aip2->ai_family, aip2->ai_socktype, 0)) < 0) {
00836 continue;
00837 }
00838
00839
00840 on = 1;
00841 if(setsockopt(sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
00842 perror("setsockopt(SO_REUSEADDR) failed");
00843 }
00844
00845
00846 on = 1;
00847 if(aip2->ai_family == AF_INET6 && setsockopt(sd[i], IPPROTO_IPV6, IPV6_V6ONLY, (const void *)&on , sizeof(on)) < 0) {
00848 perror("setsockopt(IPV6_V6ONLY) failed");
00849 }
00850
00851
00852
00853
00854 if(bind(sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0) {
00855 perror("bind");
00856 sd[i] = -1;
00857 continue;
00858 }
00859
00860 if(listen(sd[i], 10) < 0) {
00861 perror("listen");
00862 sd[i] = -1;
00863 } else {
00864 i++;
00865 if(i >= MAX_NUM_SOCKETS) {
00866 ROS_ERROR("Maximum number of server sockets exceeded");
00867 i--;
00868 break;
00869 }
00870 }
00871 }
00872
00873 sd_len = i;
00874
00875 if(sd_len < 1) {
00876 ROS_ERROR("Bind(%d) failed", port_);
00877 closelog();
00878 exit(EXIT_FAILURE);
00879 }
00880 else {
00881 ROS_INFO("Bind(%d) succeeded", port_);
00882 }
00883
00884 ROS_INFO("waiting for clients to connect");
00885
00886
00887 while(!stop_requested_) {
00888
00889 do {
00890 FD_ZERO(&selectfds);
00891
00892 for(i = 0; i < MAX_NUM_SOCKETS; i++) {
00893 if(sd[i] != -1) {
00894 FD_SET(sd[i], &selectfds);
00895
00896 if(sd[i] > max_fds)
00897 max_fds = sd[i];
00898 }
00899 }
00900
00901 err = select(max_fds + 1, &selectfds, NULL, NULL, NULL);
00902
00903 if(err < 0 && errno != EINTR) {
00904 perror("select");
00905 exit(EXIT_FAILURE);
00906 }
00907 } while(err <= 0);
00908
00909 ROS_INFO("Client connected");
00910
00911 for(i = 0; i < max_fds + 1; i++) {
00912 if(sd[i] != -1 && FD_ISSET(sd[i], &selectfds)) {
00913 int fd = accept(sd[i], (struct sockaddr *)&client_addr, &addr_len);
00914
00915
00916 ROS_DEBUG("create thread to handle client that just established a connection");
00917
00918 if(getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0) {
00919 syslog(LOG_INFO, "serving client: %s\n", name);
00920 }
00921
00922 boost::thread t(boost::bind( &MJPEGServer::client, this, fd ));
00923 t.detach();
00924 }
00925 }
00926 }
00927
00928 ROS_INFO("leaving server thread, calling cleanup function now");
00929 cleanUp();
00930 }
00931
00932 void MJPEGServer::cleanUp() {
00933 ROS_INFO("cleaning up ressources allocated by server thread");
00934
00935 for(int i = 0; i < MAX_NUM_SOCKETS; i++)
00936 close(sd[i]);
00937 }
00938
00939 void MJPEGServer::spin() {
00940 boost::thread t(boost::bind( &MJPEGServer::execute, this ));
00941 t.detach();
00942 ros::spin();
00943 ROS_INFO("stop requested");
00944 stop();
00945 }
00946
00947 void MJPEGServer::stop() {
00948 stop_requested_ = true;
00949 }
00950
00951 }
00952
00953 int main(int argc, char** argv){
00954 ros::init(argc, argv, "mjpeg_server");
00955
00956 ros::NodeHandle nh;
00957 mjpeg_server::MJPEGServer server(nh);
00958 server.spin();
00959
00960 return(0);
00961 }
00962