jpeg_streamer.cpp
Go to the documentation of this file.
00001 /* JPEG CompressedImage -> HTTP streaming node for ROS
00002  * (c) 2010 Ken Tossell / ktossell@umd.edu
00003  */
00004 #include <ros/ros.h>
00005 #include <sensor_msgs/CompressedImage.h>
00006 #include <image_transport/image_transport.h>
00007 #include <boost/thread.hpp>
00008 #include <boost/tuple/tuple.hpp>
00009 #include <boost/lexical_cast.hpp>
00010 #include "mongoose.h"
00011 
00012 using namespace std;
00013 
00014 class JPEGStreamer {
00015   public:
00016     JPEGStreamer();
00017     void add_connection(struct mg_connection *con, boost::condition_variable *cond, boost::mutex *single_mutex);
00018 
00019   private:
00020     ros::NodeHandle node;
00021     ros::Subscriber image_sub;
00022     mg_context *web_context;
00023     boost::mutex con_mutex;
00024     boost::mutex data_mutex;
00025     list<boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*> > connections;
00026 
00027     void image_callback(const sensor_msgs::CompressedImage::ConstPtr& msg);
00028 
00029     int skip, skipped;
00030 };
00031 
00032 static JPEGStreamer *g_status_video;
00033 
00034 void *new_req_callback(enum mg_event event, struct mg_connection *con, const struct mg_request_info *req) {
00035   boost::condition_variable cond;
00036   boost::mutex single_mutex;
00037   boost::unique_lock<boost::mutex> lock(single_mutex);
00038   g_status_video->add_connection(con, &cond, &single_mutex);
00039   cond.wait(lock);
00040   return (void*) 1;
00041 }
00042 
00043 JPEGStreamer::JPEGStreamer() {
00044   string topic = node.resolveName("image");
00045   int port, start_threads, max_threads;
00046   ros::NodeHandle("~").param("port", port, 8080);
00047   ros::NodeHandle("~").param("skip", skip, 0);
00048   ros::NodeHandle("~").param("start_threads", start_threads, 1);
00049   ros::NodeHandle("~").param("max_threads", max_threads, 16);
00050   skipped = 0;
00051 
00052   std::stringstream port_ss, num_threads_ss, max_threads_ss;
00053   port_ss << port;
00054   num_threads_ss << start_threads;
00055   max_threads_ss << max_threads;
00056 
00057   const char *mg_options[] = {
00058     "listening_ports", strdup(port_ss.str().c_str()),
00059     "num_threads", strdup(num_threads_ss.str().c_str()),
00060     "max_threads", strdup(max_threads_ss.str().c_str()),
00061     "authentication_domain", ".",
00062     NULL
00063   };
00064 
00065   image_sub = node.subscribe<sensor_msgs::CompressedImage>(topic, 1,
00066     boost::bind(&JPEGStreamer::image_callback, this, _1)
00067   );
00068 
00069   g_status_video = this;
00070 
00071   web_context = mg_start(&new_req_callback, NULL, mg_options);
00072 }
00073 
00074 static char header_text[] = "HTTP/1.0 200 OK\r\nConnection: Close\r\n"
00075   "Server: ros/jpeg_streamer\r\n"
00076   "Content-Type: multipart/x-mixed-replace;boundary=--myboundary\r\n\r\n";
00077 
00078 void JPEGStreamer::add_connection(struct mg_connection *con, boost::condition_variable *cond, boost::mutex *single_mutex) {
00079   mg_write(con, header_text, sizeof(header_text));
00080 
00081   {
00082     boost::mutex::scoped_lock lock(con_mutex);
00083 
00084     connections.push_back(boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*>(con, cond, single_mutex));
00085   }
00086 }
00087 
00088 void JPEGStreamer::image_callback(const sensor_msgs::CompressedImage::ConstPtr& msg) {
00089   if (skipped++ == skip)
00090     skipped = 0;
00091   else
00092     return;
00093 
00094   string data = "--myboundary\r\nContent-Type: image/jpeg\r\nContent-Length: "
00095     + boost::lexical_cast<string>(msg->data.size()) + "\r\n\r\n";
00096 
00097   data.append((const char*) &(msg->data[0]), msg->data.size());
00098 
00099   data += "\r\n";
00100 
00101   const char *buf = data.c_str();
00102   int buf_len = data.length();
00103 
00104   /* Send frame to our subscribers */
00105   {
00106     boost::mutex::scoped_lock con_lock(con_mutex);
00107 
00108     for (list<boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*> >::iterator it = connections.begin();
00109          it != connections.end(); it++) {
00110       struct mg_connection *con = (*it).get<0>();
00111       boost::condition_variable *cond = (*it).get<1>();
00112       // boost::mutex *single_mutex = (*it).get<2>();
00113 
00114       if (mg_write(con, buf, buf_len) < buf_len) {
00115         it = connections.erase(it);
00116         cond->notify_one();
00117       }
00118     }
00119   }
00120 }
00121 
00122 int main (int argc, char **argv) {
00123   ros::init(argc, argv, "jpeg_streamer");
00124 
00125   JPEGStreamer streamer;
00126 
00127   ros::spin();
00128 
00129   return 0;
00130 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Defines


jpeg_streamer
Author(s): Ken Tossell/ktossell@umd.edu
autogenerated on Sat Jul 27 2013 22:46:56