jpeg_streamer.cpp
Go to the documentation of this file.
1 /* JPEG CompressedImage -> HTTP streaming node for ROS
2  * (c) 2010 Ken Tossell / ktossell@umd.edu
3  */
4 #include <ros/ros.h>
5 #include <sensor_msgs/CompressedImage.h>
7 #include <boost/thread.hpp>
8 #include <boost/tuple/tuple.hpp>
9 #include <boost/lexical_cast.hpp>
10 #include "mongoose.h"
11 
12 using namespace std;
13 
14 class JPEGStreamer {
15  public:
16  JPEGStreamer();
17  void add_connection(struct mg_connection *con, boost::condition_variable *cond, boost::mutex *single_mutex);
18 
19  private:
23  boost::mutex con_mutex;
24  boost::mutex data_mutex;
25  list<boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*> > connections;
26 
27  void image_callback(const sensor_msgs::CompressedImage::ConstPtr& msg);
28 
29  int skip, skipped;
30 };
31 
33 
34 void *new_req_callback(enum mg_event event, struct mg_connection *con, const struct mg_request_info *req) {
36  boost::mutex single_mutex;
37  boost::unique_lock<boost::mutex> lock(single_mutex);
38  g_status_video->add_connection(con, &cond, &single_mutex);
39  cond.wait(lock);
40  return (void*) 1;
41 }
42 
44  string topic = node.resolveName("image");
45  int port, start_threads, max_threads;
46  ros::NodeHandle("~").param("port", port, 8080);
47  ros::NodeHandle("~").param("skip", skip, 0);
48  ros::NodeHandle("~").param("start_threads", start_threads, 1);
49  ros::NodeHandle("~").param("max_threads", max_threads, 16);
50  skipped = 0;
51 
52  std::stringstream port_ss, num_threads_ss, max_threads_ss;
53  port_ss << port;
54  num_threads_ss << start_threads;
55  max_threads_ss << max_threads;
56 
57  const char *mg_options[] = {
58  "listening_ports", strdup(port_ss.str().c_str()),
59  "num_threads", strdup(num_threads_ss.str().c_str()),
60  "max_threads", strdup(max_threads_ss.str().c_str()),
61  "authentication_domain", ".",
62  NULL
63  };
64 
65  image_sub = node.subscribe<sensor_msgs::CompressedImage>(topic, 1,
66  boost::bind(&JPEGStreamer::image_callback, this, _1)
67  );
68 
69  g_status_video = this;
70 
71  web_context = mg_start(&new_req_callback, NULL, mg_options);
72 }
73 
74 static char header_text[] = "HTTP/1.0 200 OK\r\nConnection: Close\r\n"
75  "Server: ros/jpeg_streamer\r\n"
76  "Content-Type: multipart/x-mixed-replace;boundary=--myboundary\r\n\r\n";
77 
78 void JPEGStreamer::add_connection(struct mg_connection *con, boost::condition_variable *cond, boost::mutex *single_mutex) {
79  mg_write(con, header_text, sizeof(header_text));
80 
81  {
82  boost::mutex::scoped_lock lock(con_mutex);
83 
84  connections.push_back(boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*>(con, cond, single_mutex));
85  }
86 }
87 
88 void JPEGStreamer::image_callback(const sensor_msgs::CompressedImage::ConstPtr& msg) {
89  if (skipped++ == skip)
90  skipped = 0;
91  else
92  return;
93 
94  string data = "--myboundary\r\nContent-Type: image/jpeg\r\nContent-Length: "
95  + boost::lexical_cast<string>(msg->data.size()) + "\r\n\r\n";
96 
97  data.append((const char*) &(msg->data[0]), msg->data.size());
98 
99  data += "\r\n";
100 
101  const char *buf = data.c_str();
102  int buf_len = data.length();
103 
104  /* Send frame to our subscribers */
105  {
106  boost::mutex::scoped_lock con_lock(con_mutex);
107 
108  for (list<boost::tuple<struct mg_connection*, boost::condition_variable*, boost::mutex*> >::iterator it = connections.begin();
109  it != connections.end(); it++) {
110  struct mg_connection *con = (*it).get<0>();
111  boost::condition_variable *cond = (*it).get<1>();
112  // boost::mutex *single_mutex = (*it).get<2>();
113 
114  if (mg_write(con, buf, buf_len) < buf_len) {
115  it = connections.erase(it);
116  cond->notify_one();
117  }
118  }
119  }
120 }
121 
122 int main (int argc, char **argv) {
123  ros::init(argc, argv, "jpeg_streamer");
124 
125  JPEGStreamer streamer;
126 
127  ros::spin();
128 
129  return 0;
130 }
boost::mutex data_mutex
ros::NodeHandle node
static char header_text[]
void wait(unique_lock< mutex > &m)
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
int mg_write(struct mg_connection *, const void *buf, size_t len)
Definition: mongoose.c:1410
void * new_req_callback(enum mg_event event, struct mg_connection *con, const struct mg_request_info *req)
int main(int argc, char **argv)
ROSCPP_DECL void spin(Spinner &spinner)
ros::Subscriber image_sub
boost::mutex con_mutex
mg_event
Definition: mongoose.h:53
struct mg_context * mg_start(mg_callback_t callback, void *user_data, const char **options)
Definition: mongoose.c:4076
mg_context * web_context
list< boost::tuple< struct mg_connection *, boost::condition_variable *, boost::mutex * > > connections
static char * skip(char **buf, const char *delimiters)
Definition: mongoose.c:691
static JPEGStreamer * g_status_video
void add_connection(struct mg_connection *con, boost::condition_variable *cond, boost::mutex *single_mutex)
void notify_one() BOOST_NOEXCEPT
void image_callback(const sensor_msgs::CompressedImage::ConstPtr &msg)


jpeg_streamer
Author(s): Ken Tossell
autogenerated on Mon Jun 10 2019 12:51:46