00001
00002
00003
00004 #include <ros/ros.h>
00005 #include <sensor_msgs/CompressedImage.h>
00006 #include <image_transport/image_transport.h>
00007 #include <boost/thread.hpp>
00008 #include <boost/tuple/tuple.hpp>
00009 #include <boost/lexical_cast.hpp>
00010 #include "mongoose.h"
00011
00012 using namespace std;
00013
00014 class JPEGStreamer {
00015 public:
00016 JPEGStreamer();
00017 void add_connection(struct mg_connection *con, boost::condition_variable *cond, boost::mutex *single_mutex);
00018
00019 private:
00020 ros::NodeHandle node;
00021 ros::Subscriber image_sub;
00022 mg_context *web_context;
00023 boost::mutex con_mutex;
00024 boost::mutex data_mutex;
00025 list<boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*> > connections;
00026
00027 void image_callback(const sensor_msgs::CompressedImage::ConstPtr& msg);
00028
00029 int skip, skipped;
00030 };
00031
00032 static JPEGStreamer *g_status_video;
00033
00034 void *new_req_callback(enum mg_event event, struct mg_connection *con, const struct mg_request_info *req) {
00035 boost::condition_variable cond;
00036 boost::mutex single_mutex;
00037 boost::unique_lock<boost::mutex> lock(single_mutex);
00038 g_status_video->add_connection(con, &cond, &single_mutex);
00039 cond.wait(lock);
00040 return (void*) 1;
00041 }
00042
00043 JPEGStreamer::JPEGStreamer() {
00044 string topic = node.resolveName("image");
00045 int port, start_threads, max_threads;
00046 ros::NodeHandle("~").param("port", port, 8080);
00047 ros::NodeHandle("~").param("skip", skip, 0);
00048 ros::NodeHandle("~").param("start_threads", start_threads, 1);
00049 ros::NodeHandle("~").param("max_threads", max_threads, 16);
00050 skipped = 0;
00051
00052 std::stringstream port_ss, num_threads_ss, max_threads_ss;
00053 port_ss << port;
00054 num_threads_ss << start_threads;
00055 max_threads_ss << max_threads;
00056
00057 const char *mg_options[] = {
00058 "listening_ports", strdup(port_ss.str().c_str()),
00059 "num_threads", strdup(num_threads_ss.str().c_str()),
00060 "max_threads", strdup(max_threads_ss.str().c_str()),
00061 "authentication_domain", ".",
00062 NULL
00063 };
00064
00065 image_sub = node.subscribe<sensor_msgs::CompressedImage>(topic, 1,
00066 boost::bind(&JPEGStreamer::image_callback, this, _1)
00067 );
00068
00069 g_status_video = this;
00070
00071 web_context = mg_start(&new_req_callback, NULL, mg_options);
00072 }
00073
00074 static char header_text[] = "HTTP/1.0 200 OK\r\nConnection: Close\r\n"
00075 "Server: ros/jpeg_streamer\r\n"
00076 "Content-Type: multipart/x-mixed-replace;boundary=--myboundary\r\n\r\n";
00077
00078 void JPEGStreamer::add_connection(struct mg_connection *con, boost::condition_variable *cond, boost::mutex *single_mutex) {
00079 mg_write(con, header_text, sizeof(header_text));
00080
00081 {
00082 boost::mutex::scoped_lock lock(con_mutex);
00083
00084 connections.push_back(boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*>(con, cond, single_mutex));
00085 }
00086 }
00087
00088 void JPEGStreamer::image_callback(const sensor_msgs::CompressedImage::ConstPtr& msg) {
00089 if (skipped++ == skip)
00090 skipped = 0;
00091 else
00092 return;
00093
00094 string data = "--myboundary\r\nContent-Type: image/jpeg\r\nContent-Length: "
00095 + boost::lexical_cast<string>(msg->data.size()) + "\r\n\r\n";
00096
00097 data.append((const char*) &(msg->data[0]), msg->data.size());
00098
00099 data += "\r\n";
00100
00101 const char *buf = data.c_str();
00102 int buf_len = data.length();
00103
00104
00105 {
00106 boost::mutex::scoped_lock con_lock(con_mutex);
00107
00108 for (list<boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*> >::iterator it = connections.begin();
00109 it != connections.end(); it++) {
00110 struct mg_connection *con = (*it).get<0>();
00111 boost::condition_variable *cond = (*it).get<1>();
00112
00113
00114 if (mg_write(con, buf, buf_len) < buf_len) {
00115 it = connections.erase(it);
00116 cond->notify_one();
00117 }
00118 }
00119 }
00120 }
00121
00122 int main (int argc, char **argv) {
00123 ros::init(argc, argv, "jpeg_streamer");
00124
00125 JPEGStreamer streamer;
00126
00127 ros::spin();
00128
00129 return 0;
00130 }