$search
00001 /* JPEG CompressedImage -> HTTP streaming node for ROS 00002 * (c) 2010 Ken Tossell / ktossell@umd.edu 00003 */ 00004 #include <ros/ros.h> 00005 #include <sensor_msgs/CompressedImage.h> 00006 #include <image_transport/image_transport.h> 00007 #include <boost/thread.hpp> 00008 #include <boost/tuple/tuple.hpp> 00009 #include <boost/lexical_cast.hpp> 00010 #include "mongoose.h" 00011 00012 using namespace std; 00013 00014 class JPEGStreamer { 00015 public: 00016 JPEGStreamer(); 00017 void add_connection(struct mg_connection *con, boost::condition_variable *cond, boost::mutex *single_mutex); 00018 00019 private: 00020 ros::NodeHandle node; 00021 ros::Subscriber image_sub; 00022 mg_context *web_context; 00023 boost::mutex con_mutex; 00024 boost::mutex data_mutex; 00025 list<boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*> > connections; 00026 00027 void image_callback(const sensor_msgs::CompressedImage::ConstPtr& msg); 00028 00029 int skip, skipped; 00030 }; 00031 00032 static JPEGStreamer *g_status_video; 00033 00034 void *new_req_callback(enum mg_event event, struct mg_connection *con, const struct mg_request_info *req) { 00035 boost::condition_variable cond; 00036 boost::mutex single_mutex; 00037 boost::unique_lock<boost::mutex> lock(single_mutex); 00038 g_status_video->add_connection(con, &cond, &single_mutex); 00039 cond.wait(lock); 00040 return (void*) 1; 00041 } 00042 00043 JPEGStreamer::JPEGStreamer() { 00044 string topic = node.resolveName("image"); 00045 int port, start_threads, max_threads; 00046 ros::NodeHandle("~").param("port", port, 8080); 00047 ros::NodeHandle("~").param("skip", skip, 0); 00048 ros::NodeHandle("~").param("start_threads", start_threads, 1); 00049 ros::NodeHandle("~").param("max_threads", max_threads, 16); 00050 skipped = 0; 00051 00052 std::stringstream port_ss, num_threads_ss, max_threads_ss; 00053 port_ss << port; 00054 num_threads_ss << start_threads; 00055 max_threads_ss << max_threads; 00056 00057 const char *mg_options[] = { 00058 "listening_ports", strdup(port_ss.str().c_str()), 00059 "num_threads", strdup(num_threads_ss.str().c_str()), 00060 "max_threads", strdup(max_threads_ss.str().c_str()), 00061 "authentication_domain", ".", 00062 NULL 00063 }; 00064 00065 image_sub = node.subscribe<sensor_msgs::CompressedImage>(topic, 1, 00066 boost::bind(&JPEGStreamer::image_callback, this, _1) 00067 ); 00068 00069 g_status_video = this; 00070 00071 web_context = mg_start(&new_req_callback, NULL, mg_options); 00072 } 00073 00074 static char header_text[] = "HTTP/1.0 200 OK\r\nConnection: Close\r\n" 00075 "Server: ros/jpeg_streamer\r\n" 00076 "Content-Type: multipart/x-mixed-replace;boundary=--myboundary\r\n\r\n"; 00077 00078 void JPEGStreamer::add_connection(struct mg_connection *con, boost::condition_variable *cond, boost::mutex *single_mutex) { 00079 mg_write(con, header_text, sizeof(header_text)); 00080 00081 { 00082 boost::mutex::scoped_lock lock(con_mutex); 00083 00084 connections.push_back(boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*>(con, cond, single_mutex)); 00085 } 00086 } 00087 00088 void JPEGStreamer::image_callback(const sensor_msgs::CompressedImage::ConstPtr& msg) { 00089 if (skipped++ == skip) 00090 skipped = 0; 00091 else 00092 return; 00093 00094 string data = "--myboundary\r\nContent-Type: image/jpeg\r\nContent-Length: " 00095 + boost::lexical_cast<string>(msg->data.size()) + "\r\n\r\n"; 00096 00097 data.append((const char*) &(msg->data[0]), msg->data.size()); 00098 00099 data += "\r\n"; 00100 00101 const char *buf = data.c_str(); 00102 int buf_len = data.length(); 00103 00104 /* Send frame to our subscribers */ 00105 { 00106 boost::mutex::scoped_lock con_lock(con_mutex); 00107 00108 for (list<boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*> >::iterator it = connections.begin(); 00109 it != connections.end(); it++) { 00110 struct mg_connection *con = (*it).get<0>(); 00111 boost::condition_variable *cond = (*it).get<1>(); 00112 // boost::mutex *single_mutex = (*it).get<2>(); 00113 00114 if (mg_write(con, buf, buf_len) < buf_len) { 00115 it = connections.erase(it); 00116 cond->notify_one(); 00117 } 00118 } 00119 } 00120 } 00121 00122 int main (int argc, char **argv) { 00123 ros::init(argc, argv, "jpeg_streamer"); 00124 00125 JPEGStreamer streamer; 00126 00127 ros::spin(); 00128 00129 return 0; 00130 }