00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "mjpeg_server/mjpeg_server.h"
00037 #include <highgui.h>
00038
00039 #include <sys/ioctl.h>
00040 #include <errno.h>
00041 #include <signal.h>
00042 #include <sys/socket.h>
00043 #include <arpa/inet.h>
00044 #include <sys/types.h>
00045 #include <sys/stat.h>
00046 #include <fcntl.h>
00047 #include <syslog.h>
00048 #include <netdb.h>
00049 #include <errno.h>
00050
00051 #include <boost/thread.hpp>
00052 #include <boost/bind.hpp>
00053
00054 #define ABS(a) (((a) < 0) ? -(a) : (a))
00055 #ifndef MIN
00056 #define MIN(a, b) (((a) < (b)) ? (a) : (b))
00057 #endif
00058 #ifndef MAX
00059 #define MAX(a, b) (((a) > (b)) ? (a) : (b))
00060 #endif
00061 #define LENGTH_OF(x) (sizeof(x)/sizeof(x[0]))
00062
00063
00064 #define BOUNDARY "boundarydonotcross"
00065
00066
00067
00068
00069
00070
00071 #define MAX_FRAME_SIZE (256*1024)
00072 #define TEN_K (10*1024)
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083 #define STD_HEADER "Connection: close\r\n" \
00084 "Server: mjpeg_server\r\n" \
00085 "Cache-Control: no-store, no-cache, must-revalidate, pre-check=0, post-check=0, max-age=0\r\n" \
00086 "Pragma: no-cache\r\n" \
00087 "Expires: Mon, 3 Jan 2000 12:34:56 GMT\r\n"
00088
00089
00090 namespace mjpeg_server {
00091
00092 MJPEGServer::MJPEGServer(ros::NodeHandle& node) :
00093 node_(node), image_transport_(node), stop_requested_(false), www_folder_(NULL)
00094 {
00095 ros::NodeHandle private_nh("~");
00096 private_nh.param("port", port_, 8080);
00097 }
00098
00099 MJPEGServer::~MJPEGServer() {
00100 cleanUp();
00101 }
00102
00103 void MJPEGServer::imageCallback(const sensor_msgs::ImageConstPtr& msg, const std::string& topic) {
00104
00105 ImageBuffer* image_buffer = getImageBuffer(topic);
00106 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00107
00108 image_buffer->msg = *msg;
00109
00110 image_buffer->condition_.notify_all();
00111 }
00112
00113 void MJPEGServer::splitString(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiter)
00114 {
00115
00116 std::string::size_type lastPos = str.find_first_not_of(delimiter, 0);
00117
00118 std::string::size_type pos = str.find_first_of(delimiter, lastPos);
00119
00120 while (std::string::npos != pos || std::string::npos != lastPos)
00121 {
00122
00123 tokens.push_back(str.substr(lastPos, pos - lastPos));
00124
00125 lastPos = str.find_first_not_of(delimiter, pos);
00126
00127 pos = str.find_first_of(delimiter, lastPos);
00128 }
00129 }
00130
00131 int MJPEGServer::stringToInt(const std::string& str, const int default_value)
00132 {
00133 int value;
00134 int res;
00135 if(str.length() == 0)
00136 return default_value;
00137 res = sscanf(str.c_str(),"%i",&value);
00138 if (res == 1)
00139 return value;
00140 return default_value;
00141 }
00142
00143 void MJPEGServer::initIOBuffer(iobuffer *iobuf)
00144 {
00145 memset(iobuf->buffer, 0, sizeof(iobuf->buffer));
00146 iobuf->level = 0;
00147 }
00148
00149 void MJPEGServer::initRequest(request *req)
00150 {
00151 req->type = A_UNKNOWN;
00152 req->type = A_UNKNOWN;
00153 req->parameter = NULL;
00154 req->client = NULL;
00155 req->credentials = NULL;
00156 }
00157
00158 void MJPEGServer::freeRequest(request *req)
00159 {
00160 if(req->parameter != NULL) free(req->parameter);
00161 if(req->client != NULL) free(req->client);
00162 if(req->credentials != NULL) free(req->credentials);
00163 }
00164
00165 int MJPEGServer::readWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout)
00166 {
00167 int copied = 0, rc, i;
00168 fd_set fds;
00169 struct timeval tv;
00170
00171 memset(buffer, 0, len);
00172
00173 while((copied < len)) {
00174 i = MIN(iobuf->level, len - copied);
00175 memcpy(buffer + copied, iobuf->buffer + IO_BUFFER - iobuf->level, i);
00176
00177 iobuf->level -= i;
00178 copied += i;
00179 if(copied >= len)
00180 return copied;
00181
00182
00183 tv.tv_sec = timeout;
00184 tv.tv_usec = 0;
00185 FD_ZERO(&fds);
00186 FD_SET(fd, &fds);
00187 if((rc = select(fd + 1, &fds, NULL, NULL, &tv)) <= 0) {
00188 if(rc < 0)
00189 exit(EXIT_FAILURE);
00190
00191
00192 return copied;
00193 }
00194
00195 initIOBuffer(iobuf);
00196
00197
00198
00199
00200
00201
00202
00203 if((iobuf->level = read(fd, &iobuf->buffer, IO_BUFFER)) <= 0) {
00204
00205 return -1;
00206 }
00207
00208
00209 memmove(iobuf->buffer + (IO_BUFFER - iobuf->level), iobuf->buffer, iobuf->level);
00210 }
00211
00212 return 0;
00213 }
00214
00215 int MJPEGServer::readLineWithTimeout(int fd, iobuffer *iobuf, char *buffer, size_t len, int timeout)
00216 {
00217 char c = '\0', *out = buffer;
00218 int i;
00219
00220 memset(buffer, 0, len);
00221
00222 for(i = 0; i < len && c != '\n'; i++) {
00223 if(readWithTimeout(fd, iobuf, &c, 1, timeout) <= 0) {
00224
00225 return -1;
00226 }
00227 *out++ = c;
00228 }
00229
00230 return i;
00231 }
00232
00233 void MJPEGServer::decodeBase64(char *data)
00234 {
00235 const unsigned char *in = (const unsigned char *)data;
00236
00237 unsigned ch = 0;
00238 int i = 0;
00239
00240 while(*in) {
00241 int t = *in++;
00242
00243 if(t >= '0' && t <= '9')
00244 t = t - '0' + 52;
00245 else if(t >= 'A' && t <= 'Z')
00246 t = t - 'A';
00247 else if(t >= 'a' && t <= 'z')
00248 t = t - 'a' + 26;
00249 else if(t == '+')
00250 t = 62;
00251 else if(t == '/')
00252 t = 63;
00253 else if(t == '=')
00254 t = 0;
00255 else
00256 continue;
00257
00258 ch = (ch << 6) | t;
00259 i++;
00260 if(i == 4) {
00261 *data++ = (char)(ch >> 16);
00262 *data++ = (char)(ch >> 8);
00263 *data++ = (char) ch;
00264 i = 0;
00265 }
00266 }
00267 *data = '\0';
00268 }
00269
00270 int MJPEGServer::hexCharToInt(char in)
00271 {
00272 if(in >= '0' && in <= '9')
00273 return in - '0';
00274
00275 if(in >= 'a' && in <= 'f')
00276 return (in - 'a') + 10;
00277
00278 if(in >= 'A' && in <= 'F')
00279 return (in - 'A') + 10;
00280
00281 return -1;
00282 }
00283
00284 int MJPEGServer::unescape(char *string)
00285 {
00286 char *source = string, *destination = string;
00287 int src, dst, length = strlen(string), rc;
00288
00289
00290 for(dst = 0, src = 0; src < length; src++) {
00291
00292
00293 if(source[src] != '%') {
00294
00295 destination[dst] = source[src];
00296 dst++;
00297 continue;
00298 }
00299
00300
00301
00302
00303 if(src + 2 > length) {
00304 return -1;
00305 break;
00306 }
00307
00308
00309 if((rc = hexCharToInt(source[src+1])) == -1) return -1;
00310 destination[dst] = rc * 16;
00311 if((rc = hexCharToInt(source[src+2])) == -1) return -1;
00312 destination[dst] += rc;
00313
00314
00315 dst++; src += 2;
00316 }
00317
00318
00319 destination[dst] = '\0';
00320
00321 return 0;
00322 }
00323
00324 void MJPEGServer::sendError(int fd, int which, const char *message)
00325 {
00326 char buffer[BUFFER_SIZE] = {0};
00327
00328 if(which == 401) {
00329 sprintf(buffer, "HTTP/1.0 401 Unauthorized\r\n" \
00330 "Content-type: text/plain\r\n" \
00331 STD_HEADER \
00332 "WWW-Authenticate: Basic realm=\"MJPG-Streamer\"\r\n" \
00333 "\r\n" \
00334 "401: Not Authenticated!\r\n" \
00335 "%s", message);
00336 } else if(which == 404) {
00337 sprintf(buffer, "HTTP/1.0 404 Not Found\r\n" \
00338 "Content-type: text/plain\r\n" \
00339 STD_HEADER \
00340 "\r\n" \
00341 "404: Not Found!\r\n" \
00342 "%s", message);
00343 } else if(which == 500) {
00344 sprintf(buffer, "HTTP/1.0 500 Internal Server Error\r\n" \
00345 "Content-type: text/plain\r\n" \
00346 STD_HEADER \
00347 "\r\n" \
00348 "500: Internal Server Error!\r\n" \
00349 "%s", message);
00350 } else if(which == 400) {
00351 sprintf(buffer, "HTTP/1.0 400 Bad Request\r\n" \
00352 "Content-type: text/plain\r\n" \
00353 STD_HEADER \
00354 "\r\n" \
00355 "400: Not Found!\r\n" \
00356 "%s", message);
00357 } else {
00358 sprintf(buffer, "HTTP/1.0 501 Not Implemented\r\n" \
00359 "Content-type: text/plain\r\n" \
00360 STD_HEADER \
00361 "\r\n" \
00362 "501: Not Implemented!\r\n" \
00363 "%s", message);
00364 }
00365
00366 if(write(fd, buffer, strlen(buffer)) < 0) {
00367 ROS_DEBUG("write failed, done anyway");
00368 }
00369 }
00370
00371 void MJPEGServer::decodeParameter(const std::string& parameter, ParameterMap& parameter_map)
00372 {
00373 std::vector<std::string> parameter_value_pairs;
00374 splitString(parameter,parameter_value_pairs, "?&");
00375
00376 for(size_t i=0; i<parameter_value_pairs.size(); i++) {
00377 std::vector<std::string> parameter_value;
00378 splitString(parameter_value_pairs[i], parameter_value, "=");
00379 if(parameter_value.size()==1) {
00380 parameter_map.insert(std::make_pair(parameter_value[0],std::string("")));
00381 }
00382 else if(parameter_value.size()==2) {
00383 parameter_map.insert(std::make_pair(parameter_value[0],parameter_value[1]));
00384 }
00385 }
00386 }
00387
00388 ImageBuffer* MJPEGServer::getImageBuffer(const std::string& topic)
00389 {
00390 boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
00391 ImageSubscriberMap::iterator it = image_subscribers_.find(topic);
00392 if (it == image_subscribers_.end()) {
00393 image_subscribers_[topic] = image_transport_.subscribe(topic, 1, boost::bind(&MJPEGServer::imageCallback, this, _1, topic));
00394 image_buffers_[topic] = new ImageBuffer();
00395 ROS_INFO("Subscribing to topic %s", topic.c_str());
00396 }
00397 ImageBuffer* image_buffer = image_buffers_[topic];
00398 return image_buffer;
00399 }
00400
00401
00402 void MJPEGServer::invertImage(const cv::Mat& input, cv::Mat& output) {
00403
00404 cv::Mat_<cv::Vec3b>& input_img = (cv::Mat_<cv::Vec3b>&)input;
00405 cv::Mat_<cv::Vec3b>& output_img = (cv::Mat_<cv::Vec3b>&)output;
00406 cv::Size size = input.size();
00407
00408 for (int j = 0; j < size.height; ++j)
00409 for (int i = 0; i < size.width; ++i) {
00410
00411 output_img(size.height-j-1, size.width-i-1) = input_img(j,i);
00412 }
00413 return;
00414 }
00415
00416 void MJPEGServer::sendStream(int fd, const char *parameter)
00417 {
00418 unsigned char *frame = NULL, *tmp = NULL;
00419 int frame_size = 0, max_frame_size = 0;
00420 char buffer[BUFFER_SIZE] = {0};
00421 struct timeval timestamp;
00422 sensor_msgs::CvBridge image_bridge;
00423
00424 ROS_DEBUG("Decoding parameter");
00425
00426 std::string params = parameter;
00427 ParameterMap parameter_map;
00428 decodeParameter(params, parameter_map);
00429
00430 ParameterMap::iterator itp = parameter_map.find("topic");
00431 if (itp == parameter_map.end()) return;
00432
00433 std::string topic = itp->second;
00434 ImageBuffer* image_buffer = getImageBuffer(topic);
00435
00436 ROS_DEBUG("preparing header");
00437 sprintf(buffer, "HTTP/1.0 200 OK\r\n" \
00438 STD_HEADER \
00439 "Content-Type: multipart/x-mixed-replace;boundary=" BOUNDARY "\r\n" \
00440 "\r\n" \
00441 "--" BOUNDARY "\r\n");
00442
00443 if(write(fd, buffer, strlen(buffer)) < 0) {
00444 free(frame);
00445 return;
00446 }
00447
00448 ROS_DEBUG("Headers send, sending stream now");
00449
00450 while(!stop_requested_) {
00451 {
00452
00453 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00454 image_buffer->condition_.wait(lock);
00455
00456 IplImage* image;
00457 try {
00458 if (image_bridge.fromImage(image_buffer->msg, "bgr8")) {
00459 image = image_bridge.toIpl();
00460 }
00461 else {
00462 ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str());
00463 return;
00464 }
00465 }
00466 catch(...) {
00467 ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str());
00468 return;
00469 }
00470
00471
00472 cv::Mat img = image;
00473 std::vector<uchar> encoded_buffer;
00474 std::vector<int> encode_params;
00475
00476
00477 int invert = 0;
00478 if(parameter_map.find("invert") != parameter_map.end()) {
00479 cv::Mat cloned_image = img.clone();
00480 invertImage(cloned_image, img);
00481 }
00482
00483
00484 int quality = 95;
00485 if(parameter_map.find("quality") != parameter_map.end()) {
00486 quality = stringToInt(parameter_map["quality"]);
00487 }
00488 encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
00489 encode_params.push_back(quality);
00490
00491
00492 if(parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end()) {
00493 int width = stringToInt(parameter_map["width"]);
00494 int height = stringToInt(parameter_map["height"]);
00495 if(width > 0 && height > 0) {
00496 cv::Mat img_resized;
00497 cv::Size new_size(width,height);
00498 cv::resize(img, img_resized, new_size);
00499 cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params);
00500 }
00501 else {
00502 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00503 }
00504 }
00505 else {
00506 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00507 }
00508
00509
00510 frame_size = encoded_buffer.size();
00511
00512
00513 if(frame_size > max_frame_size) {
00514 ROS_DEBUG("increasing frame buffer size to %d", frame_size);
00515
00516 max_frame_size = frame_size + TEN_K;
00517 if((tmp = (unsigned char*)realloc(frame, max_frame_size)) == NULL) {
00518 free(frame);
00519 sendError(fd, 500, "not enough memory");
00520 return;
00521 }
00522 frame = tmp;
00523 }
00524
00525
00526 timestamp.tv_sec = ros::Time::now().toSec();
00527
00528 memcpy(frame, &encoded_buffer[0], frame_size);
00529 ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024);
00530 }
00531
00532
00533
00534
00535
00536
00537 sprintf(buffer, "Content-Type: image/jpeg\r\n" \
00538 "Content-Length: %d\r\n" \
00539 "X-Timestamp: %d.%06d\r\n" \
00540 "\r\n", frame_size, (int)timestamp.tv_sec, (int)timestamp.tv_usec);
00541 ROS_DEBUG("sending intemdiate header");
00542 if(write(fd, buffer, strlen(buffer)) < 0) break;
00543
00544 ROS_DEBUG("sending frame");
00545 if(write(fd, frame, frame_size) < 0) break;
00546
00547 ROS_DEBUG("sending boundary");
00548 sprintf(buffer, "\r\n--" BOUNDARY "\r\n");
00549 if(write(fd, buffer, strlen(buffer)) < 0) break;
00550 }
00551
00552 free(frame);
00553 }
00554
00555 void MJPEGServer::sendSnapshot(int fd, const char *parameter)
00556 {
00557 unsigned char *frame = NULL;
00558 int frame_size = 0;
00559 char buffer[BUFFER_SIZE] = {0};
00560 struct timeval timestamp;
00561 sensor_msgs::CvBridge image_bridge;
00562
00563 std::string params = parameter;
00564 ParameterMap parameter_map;
00565 decodeParameter(params, parameter_map);http:
00566
00567 ParameterMap::iterator itp = parameter_map.find("topic");
00568 if (itp == parameter_map.end()) return;
00569
00570 std::string topic = itp->second;
00571 ImageBuffer* image_buffer = getImageBuffer(topic);
00572
00573
00574 boost::unique_lock<boost::mutex> lock(image_buffer->mutex_);
00575 image_buffer->condition_.wait(lock);
00576
00577 IplImage* image;
00578 try {
00579 if (image_bridge.fromImage(image_buffer->msg, "bgr8")) {
00580 image = image_bridge.toIpl();
00581 }
00582 else {
00583 ROS_ERROR("Unable to convert %s image to bgr8", image_buffer->msg.encoding.c_str());
00584 return;
00585 }
00586 }
00587 catch(...) {
00588 ROS_ERROR("Unable to convert %s image to ipl format", image_buffer->msg.encoding.c_str());
00589 return;
00590 }
00591
00592 cv::Mat img = image;
00593 std::vector<uchar> encoded_buffer;
00594 std::vector<int> encode_params;
00595
00596
00597 int invert = 0;
00598 if(parameter_map.find("invert") != parameter_map.end()) {
00599 cv::Mat cloned_image = img.clone();
00600 invertImage(cloned_image, img);
00601 }
00602
00603
00604 int quality = 95;
00605 if(parameter_map.find("quality") != parameter_map.end()) {
00606 quality = stringToInt(parameter_map["quality"]);
00607 }
00608 encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
00609 encode_params.push_back(quality);
00610
00611
00612 if(parameter_map.find("width") != parameter_map.end() && parameter_map.find("height") != parameter_map.end()) {
00613 int width = stringToInt(parameter_map["width"]);
00614 int height = stringToInt(parameter_map["height"]);
00615 if(width > 0 && height > 0) {
00616 cv::Mat img_resized;
00617 cv::Size new_size(width,height);
00618 cv::resize(img, img_resized, new_size);
00619 cv::imencode(".jpeg", img_resized, encoded_buffer, encode_params);
00620 }
00621 else {
00622 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00623 }
00624 }
00625 else {
00626 cv::imencode(".jpeg", img, encoded_buffer, encode_params);
00627 }
00628
00629
00630 frame_size = encoded_buffer.size();
00631
00632
00633 if((frame = (unsigned char*)malloc(frame_size)) == NULL) {
00634 free(frame);
00635 sendError(fd, 500, "not enough memory");
00636 return;
00637 }
00638
00639
00640 timestamp.tv_sec = ros::Time::now().toSec();
00641
00642 memcpy(frame, &encoded_buffer[0], frame_size);
00643 ROS_DEBUG("got frame (size: %d kB)", frame_size / 1024);
00644
00645
00646 sprintf(buffer, "HTTP/1.0 200 OK\r\n" \
00647 STD_HEADER \
00648 "Content-type: image/jpeg\r\n" \
00649 "X-Timestamp: %d.%06d\r\n" \
00650 "\r\n", (int) timestamp.tv_sec, (int) timestamp.tv_usec);
00651
00652
00653 if(write(fd, buffer, strlen(buffer)) < 0 || \
00654 write(fd, frame, frame_size) < 0) {
00655 free(frame);
00656 return;
00657 }
00658
00659 free(frame);
00660 }
00661
00662 void MJPEGServer::client(int fd) {
00663 int cnt;
00664 char buffer[BUFFER_SIZE] = {0}, *pb = buffer;
00665 iobuffer iobuf;
00666 request req;
00667
00668
00669 initIOBuffer(&iobuf);
00670 initRequest(&req);
00671
00672
00673 memset(buffer, 0, sizeof(buffer));
00674 if((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) {
00675 close(fd);
00676 return;
00677 }
00678
00679
00680 if(strstr(buffer, "GET /?") != NULL) {
00681 req.type = A_STREAM;
00682
00683
00684 if((pb = strstr(buffer, "GET /")) == NULL) {
00685 ROS_DEBUG("HTTP request seems to be malformed");
00686 sendError(fd, 400, "Malformed HTTP request");
00687 close(fd);
00688 return;
00689 }
00690 pb += strlen("GET /");
00691 int len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00692 req.parameter = (char*)malloc(len + 1);
00693 if(req.parameter == NULL) {
00694 exit(EXIT_FAILURE);
00695 }
00696 memset(req.parameter, 0, len + 1);
00697 strncpy(req.parameter, pb, len);
00698
00699 ROS_DEBUG("requested image topic: \"%s\"", len, req.parameter);
00700 }
00701 else if(strstr(buffer, "GET /stream?") != NULL) {
00702 req.type = A_STREAM;
00703
00704
00705 if((pb = strstr(buffer, "GET /stream")) == NULL) {
00706 ROS_DEBUG("HTTP request seems to be malformed");
00707 sendError(fd, 400, "Malformed HTTP request");
00708 close(fd);
00709 return;
00710 }
00711 pb += strlen("GET /stream");
00712 int len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00713 req.parameter = (char*)malloc(len + 1);
00714 if(req.parameter == NULL) {
00715 exit(EXIT_FAILURE);
00716 }
00717 memset(req.parameter, 0, len + 1);
00718 strncpy(req.parameter, pb, len);
00719
00720 ROS_DEBUG("requested image topic: \"%s\"", len, req.parameter);
00721 }
00722 else if(strstr(buffer, "GET /snapshot?") != NULL) {
00723 req.type = A_SNAPSHOT;
00724
00725
00726 if((pb = strstr(buffer, "GET /snapshot")) == NULL) {
00727 ROS_DEBUG("HTTP request seems to be malformed");
00728 sendError(fd, 400, "Malformed HTTP request");
00729 close(fd);
00730 return;
00731 }
00732 pb += strlen("GET /snapshot");
00733 int len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._/-1234567890?="), 0), 100);
00734 req.parameter = (char*)malloc(len + 1);
00735 if(req.parameter == NULL) {
00736 exit(EXIT_FAILURE);
00737 }
00738 memset(req.parameter, 0, len + 1);
00739 strncpy(req.parameter, pb, len);
00740
00741 ROS_DEBUG("requested image topic: \"%s\"", len, req.parameter);
00742 }
00743
00744
00745
00746
00747
00748 do {
00749 memset(buffer, 0, sizeof(buffer));
00750
00751 if((cnt = readLineWithTimeout(fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) {
00752 freeRequest(&req);
00753 close(fd);
00754 return;
00755 }
00756
00757 if(strstr(buffer, "User-Agent: ") != NULL) {
00758 req.client = strdup(buffer + strlen("User-Agent: "));
00759 } else if(strstr(buffer, "Authorization: Basic ") != NULL) {
00760 req.credentials = strdup(buffer + strlen("Authorization: Basic "));
00761 decodeBase64(req.credentials);
00762 ROS_DEBUG("username:password: %s", req.credentials);
00763 }
00764
00765 } while(cnt > 2 && !(buffer[0] == '\r' && buffer[1] == '\n'));
00766
00767
00768
00769
00770 switch(req.type) {
00771 case A_STREAM: {
00772 ROS_DEBUG("Request for streaming");
00773 sendStream(fd, req.parameter);
00774 break;
00775 }
00776 case A_SNAPSHOT: {
00777 ROS_DEBUG("Request for snapshot");
00778 sendSnapshot(fd, req.parameter);
00779 break;
00780 }
00781 default:
00782 ROS_DEBUG("unknown request");
00783 }
00784
00785 close(fd);
00786 freeRequest(&req);
00787
00788 ROS_INFO("Disconnecting HTTP client");
00789 return;
00790 }
00791
00792 void MJPEGServer::execute() {
00793
00794 ROS_INFO("Starting mjpeg server");
00795
00796 struct addrinfo *aip, *aip2;
00797 struct addrinfo hints;
00798 struct sockaddr_storage client_addr;
00799 socklen_t addr_len = sizeof(struct sockaddr_storage);
00800 fd_set selectfds;
00801 int max_fds = 0;
00802
00803 char name[NI_MAXHOST];
00804
00805 bzero(&hints, sizeof(hints));
00806 hints.ai_family = PF_UNSPEC;
00807 hints.ai_flags = AI_PASSIVE;
00808 hints.ai_socktype = SOCK_STREAM;
00809
00810 int err;
00811 snprintf(name, sizeof(name), "%d", port_);
00812 if((err = getaddrinfo(NULL, name, &hints, &aip)) != 0) {
00813 perror(gai_strerror(err));
00814 exit(EXIT_FAILURE);
00815 }
00816
00817 for(int i = 0; i < MAX_NUM_SOCKETS; i++)
00818 sd[i] = -1;
00819
00820
00821
00822 int i = 0;
00823 int on;
00824 for(aip2 = aip; aip2 != NULL; aip2 = aip2->ai_next) {
00825 if((sd[i] = socket(aip2->ai_family, aip2->ai_socktype, 0)) < 0) {
00826 continue;
00827 }
00828
00829
00830 on = 1;
00831 if(setsockopt(sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
00832 perror("setsockopt(SO_REUSEADDR) failed");
00833 }
00834
00835
00836 on = 1;
00837 if(aip2->ai_family == AF_INET6 && setsockopt(sd[i], IPPROTO_IPV6, IPV6_V6ONLY, (const void *)&on , sizeof(on)) < 0) {
00838 perror("setsockopt(IPV6_V6ONLY) failed");
00839 }
00840
00841
00842
00843
00844 if(bind(sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0) {
00845 perror("bind");
00846 sd[i] = -1;
00847 continue;
00848 }
00849
00850 if(listen(sd[i], 10) < 0) {
00851 perror("listen");
00852 sd[i] = -1;
00853 } else {
00854 i++;
00855 if(i >= MAX_NUM_SOCKETS) {
00856 ROS_ERROR("Maximum number of server sockets exceeded");
00857 i--;
00858 break;
00859 }
00860 }
00861 }
00862
00863 sd_len = i;
00864
00865 if(sd_len < 1) {
00866 ROS_ERROR("Bind(%d) failed", port_);
00867 closelog();
00868 exit(EXIT_FAILURE);
00869 }
00870 else {
00871 ROS_INFO("Bind(%d) succeeded", port_);
00872 }
00873
00874 ROS_INFO("waiting for clients to connect");
00875
00876
00877 while(!stop_requested_) {
00878
00879 do {
00880 FD_ZERO(&selectfds);
00881
00882 for(i = 0; i < MAX_NUM_SOCKETS; i++) {
00883 if(sd[i] != -1) {
00884 FD_SET(sd[i], &selectfds);
00885
00886 if(sd[i] > max_fds)
00887 max_fds = sd[i];
00888 }
00889 }
00890
00891 err = select(max_fds + 1, &selectfds, NULL, NULL, NULL);
00892
00893 if(err < 0 && errno != EINTR) {
00894 perror("select");
00895 exit(EXIT_FAILURE);
00896 }
00897 } while(err <= 0);
00898
00899 ROS_INFO("Client connected");
00900
00901 for(i = 0; i < max_fds + 1; i++) {
00902 if(sd[i] != -1 && FD_ISSET(sd[i], &selectfds)) {
00903 int fd = accept(sd[i], (struct sockaddr *)&client_addr, &addr_len);
00904
00905
00906 ROS_DEBUG("create thread to handle client that just established a connection");
00907
00908 if(getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0) {
00909 syslog(LOG_INFO, "serving client: %s\n", name);
00910 }
00911
00912 boost::thread t(boost::bind( &MJPEGServer::client, this, fd ));
00913 t.detach();
00914 }
00915 }
00916 }
00917
00918 ROS_INFO("leaving server thread, calling cleanup function now");
00919 cleanUp();
00920 }
00921
00922 void MJPEGServer::cleanUp() {
00923 ROS_INFO("cleaning up ressources allocated by server thread");
00924
00925 for(int i = 0; i < MAX_NUM_SOCKETS; i++)
00926 close(sd[i]);
00927 }
00928
00929 void MJPEGServer::spin() {
00930 boost::thread t(boost::bind( &MJPEGServer::execute, this ));
00931 t.detach();
00932 ros::spin();
00933 ROS_INFO("stop requested");
00934 stop();
00935 }
00936
00937 void MJPEGServer::stop() {
00938 stop_requested_ = true;
00939 }
00940
00941 }
00942
00943 int main(int argc, char** argv){
00944 ros::init(argc, argv, "mjpeg_server");
00945
00946 ros::NodeHandle nh;
00947 mjpeg_server::MJPEGServer server(nh);
00948 server.spin();
00949
00950 return(0);
00951 }
00952