00001 #include <boost/foreach.hpp>
00002 #include <boost/lexical_cast.hpp>
00003 #include <boost/algorithm/string/predicate.hpp>
00004 #include <vector>
00005 #include <sensor_msgs/image_encodings.h>
00006 #include <opencv2/opencv.hpp>
00007
00008 #include "web_video_server/web_video_server.h"
00009 #include "web_video_server/ros_compressed_streamer.h"
00010 #include "web_video_server/jpeg_streamers.h"
00011 #include "web_video_server/png_streamers.h"
00012 #include "web_video_server/vp8_streamer.h"
00013 #include "web_video_server/h264_streamer.h"
00014 #include "web_video_server/vp9_streamer.h"
00015 #include "async_web_server_cpp/http_reply.hpp"
00016
00017 namespace web_video_server
00018 {
00019
00020 static bool __verbose;
00021
00022 static std::string __default_stream_type;
00023
00024 static bool ros_connection_logger(async_web_server_cpp::HttpServerRequestHandler forward,
00025 const async_web_server_cpp::HttpRequest &request,
00026 async_web_server_cpp::HttpConnectionPtr connection, const char* begin,
00027 const char* end)
00028 {
00029 if (__verbose)
00030 {
00031 ROS_INFO_STREAM("Handling Request: " << request.uri);
00032 }
00033 try
00034 {
00035 forward(request, connection, begin, end);
00036 return true;
00037 }
00038 catch (std::exception &e)
00039 {
00040 ROS_WARN_STREAM("Error Handling Request: " << e.what());
00041 return false;
00042 }
00043 return false;
00044 }
00045
00046 WebVideoServer::WebVideoServer(ros::NodeHandle &nh, ros::NodeHandle &private_nh) :
00047 nh_(nh), handler_group_(
00048 async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::not_found))
00049 {
00050 #if ROS_HAS_STEADYTIMER || defined USE_STEADY_TIMER
00051 cleanup_timer_ = nh.createSteadyTimer(ros::WallDuration(0.5), boost::bind(&WebVideoServer::cleanup_inactive_streams, this));
00052 #else
00053 cleanup_timer_ = nh.createTimer(ros::Duration(0.5), boost::bind(&WebVideoServer::cleanup_inactive_streams, this));
00054 #endif
00055
00056 private_nh.param("port", port_, 8080);
00057 private_nh.param("verbose", __verbose, true);
00058
00059 private_nh.param<std::string>("address", address_, "0.0.0.0");
00060
00061 int server_threads;
00062 private_nh.param("server_threads", server_threads, 1);
00063
00064 private_nh.param("ros_threads", ros_threads_, 2);
00065 private_nh.param("publish_rate", publish_rate_, -1.0);
00066
00067 private_nh.param<std::string>("default_stream_type", __default_stream_type, "mjpeg");
00068
00069 stream_types_["mjpeg"] = boost::shared_ptr<ImageStreamerType>(new MjpegStreamerType());
00070 stream_types_["png"] = boost::shared_ptr<ImageStreamerType>(new PngStreamerType());
00071 stream_types_["ros_compressed"] = boost::shared_ptr<ImageStreamerType>(new RosCompressedStreamerType());
00072 stream_types_["vp8"] = boost::shared_ptr<ImageStreamerType>(new Vp8StreamerType());
00073 stream_types_["h264"] = boost::shared_ptr<ImageStreamerType>(new H264StreamerType());
00074 stream_types_["vp9"] = boost::shared_ptr<ImageStreamerType>(new Vp9StreamerType());
00075
00076 handler_group_.addHandlerForPath("/", boost::bind(&WebVideoServer::handle_list_streams, this, _1, _2, _3, _4));
00077 handler_group_.addHandlerForPath("/stream", boost::bind(&WebVideoServer::handle_stream, this, _1, _2, _3, _4));
00078 handler_group_.addHandlerForPath("/stream_viewer",
00079 boost::bind(&WebVideoServer::handle_stream_viewer, this, _1, _2, _3, _4));
00080 handler_group_.addHandlerForPath("/snapshot", boost::bind(&WebVideoServer::handle_snapshot, this, _1, _2, _3, _4));
00081
00082 try
00083 {
00084 server_.reset(
00085 new async_web_server_cpp::HttpServer(address_, boost::lexical_cast<std::string>(port_),
00086 boost::bind(ros_connection_logger, handler_group_, _1, _2, _3, _4),
00087 server_threads));
00088 }
00089 catch(boost::exception& e)
00090 {
00091 ROS_ERROR("Exception when creating the web server! %s:%d", address_.c_str(), port_);
00092 throw;
00093 }
00094 }
00095
00096 WebVideoServer::~WebVideoServer()
00097 {
00098 }
00099
00100 void WebVideoServer::spin()
00101 {
00102 server_->run();
00103 ROS_INFO_STREAM("Waiting For connections on " << address_ << ":" << port_);
00104
00105 ros::AsyncSpinner spinner(ros_threads_);
00106 spinner.start();
00107
00108 if ( publish_rate_ > 0 ) {
00109 ros::Rate r(publish_rate_);
00110
00111 while( ros::ok() ) {
00112 this->restreamFrames( 1.0 / publish_rate_ );
00113 r.sleep();
00114 }
00115 } else {
00116 ros::waitForShutdown();
00117 }
00118
00119 server_->stop();
00120 }
00121
00122 void WebVideoServer::restreamFrames( double max_age )
00123 {
00124 boost::mutex::scoped_lock lock(subscriber_mutex_);
00125
00126 typedef std::vector<boost::shared_ptr<ImageStreamer> >::iterator itr_type;
00127
00128 for (itr_type itr = image_subscribers_.begin(); itr < image_subscribers_.end(); ++itr)
00129 {
00130 (*itr)->restreamFrame( max_age );
00131 }
00132 }
00133
00134 void WebVideoServer::cleanup_inactive_streams()
00135 {
00136 boost::mutex::scoped_lock lock(subscriber_mutex_, boost::try_to_lock);
00137 if (lock)
00138 {
00139 typedef std::vector<boost::shared_ptr<ImageStreamer> >::iterator itr_type;
00140 itr_type new_end = std::partition(image_subscribers_.begin(), image_subscribers_.end(),
00141 !boost::bind(&ImageStreamer::isInactive, _1));
00142 if (__verbose)
00143 {
00144 for (itr_type itr = new_end; itr < image_subscribers_.end(); ++itr)
00145 {
00146 ROS_INFO_STREAM("Removed Stream: " << (*itr)->getTopic());
00147 }
00148 }
00149 image_subscribers_.erase(new_end, image_subscribers_.end());
00150 }
00151 }
00152
00153 bool WebVideoServer::handle_stream(const async_web_server_cpp::HttpRequest &request,
00154 async_web_server_cpp::HttpConnectionPtr connection, const char* begin,
00155 const char* end)
00156 {
00157 std::string type = request.get_query_param_value_or_default("type", __default_stream_type);
00158 if (stream_types_.find(type) != stream_types_.end())
00159 {
00160 std::string topic = request.get_query_param_value_or_default("topic", "");
00161
00162 if (type == std::string("ros_compressed"))
00163 {
00164 std::string compressed_topic_name = topic + "/compressed";
00165 ros::master::V_TopicInfo topics;
00166 ros::master::getTopics(topics);
00167 bool did_find_compressed_topic = false;
00168 for(ros::master::V_TopicInfo::iterator it = topics.begin(); it != topics.end(); ++it)
00169 {
00170 if (it->name == compressed_topic_name)
00171 {
00172 did_find_compressed_topic = true;
00173 break;
00174 }
00175 }
00176 if (!did_find_compressed_topic)
00177 {
00178 ROS_WARN_STREAM("Could not find compressed image topic for " << topic << ", falling back to mjpeg");
00179 type = "mjpeg";
00180 }
00181 }
00182 boost::shared_ptr<ImageStreamer> streamer = stream_types_[type]->create_streamer(request, connection, nh_);
00183 streamer->start();
00184 boost::mutex::scoped_lock lock(subscriber_mutex_);
00185 image_subscribers_.push_back(streamer);
00186 }
00187 else
00188 {
00189 async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::not_found)(request, connection, begin,
00190 end);
00191 }
00192 return true;
00193 }
00194
00195 bool WebVideoServer::handle_snapshot(const async_web_server_cpp::HttpRequest &request,
00196 async_web_server_cpp::HttpConnectionPtr connection, const char* begin,
00197 const char* end)
00198 {
00199 boost::shared_ptr<ImageStreamer> streamer(new JpegSnapshotStreamer(request, connection, nh_));
00200 streamer->start();
00201
00202 boost::mutex::scoped_lock lock(subscriber_mutex_);
00203 image_subscribers_.push_back(streamer);
00204 return true;
00205 }
00206
00207 bool WebVideoServer::handle_stream_viewer(const async_web_server_cpp::HttpRequest &request,
00208 async_web_server_cpp::HttpConnectionPtr connection, const char* begin,
00209 const char* end)
00210 {
00211 std::string type = request.get_query_param_value_or_default("type", __default_stream_type);
00212 if (stream_types_.find(type) != stream_types_.end())
00213 {
00214 std::string topic = request.get_query_param_value_or_default("topic", "");
00215
00216 if (type == std::string("ros_compressed"))
00217 {
00218
00219 std::string compressed_topic_name = topic + "/compressed";
00220 ros::master::V_TopicInfo topics;
00221 ros::master::getTopics(topics);
00222 bool did_find_compressed_topic = false;
00223 for(ros::master::V_TopicInfo::iterator it = topics.begin(); it != topics.end(); ++it)
00224 {
00225 if (it->name == compressed_topic_name)
00226 {
00227 did_find_compressed_topic = true;
00228 break;
00229 }
00230 }
00231 if (!did_find_compressed_topic)
00232 {
00233 ROS_WARN_STREAM("Could not find compressed image topic for " << topic << ", falling back to mjpeg");
00234 type = "mjpeg";
00235 }
00236 }
00237
00238 async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header(
00239 "Server", "web_video_server").header("Content-type", "text/html;").write(connection);
00240
00241 std::stringstream ss;
00242 ss << "<html><head><title>" << topic << "</title></head><body>";
00243 ss << "<h1>" << topic << "</h1>";
00244 ss << stream_types_[type]->create_viewer(request);
00245 ss << "</body></html>";
00246 connection->write(ss.str());
00247 }
00248 else
00249 {
00250 async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::not_found)(request, connection, begin,
00251 end);
00252 }
00253 return true;
00254 }
00255
00256 bool WebVideoServer::handle_list_streams(const async_web_server_cpp::HttpRequest &request,
00257 async_web_server_cpp::HttpConnectionPtr connection, const char* begin,
00258 const char* end)
00259 {
00260 std::string image_message_type = ros::message_traits::datatype<sensor_msgs::Image>();
00261 std::string camera_info_message_type = ros::message_traits::datatype<sensor_msgs::CameraInfo>();
00262
00263 ros::master::V_TopicInfo topics;
00264 ros::master::getTopics(topics);
00265 ros::master::V_TopicInfo::iterator it;
00266 std::vector<std::string> image_topics;
00267 std::vector<std::string> camera_info_topics;
00268 for (it = topics.begin(); it != topics.end(); ++it)
00269 {
00270 const ros::master::TopicInfo &topic = *it;
00271 if (topic.datatype == image_message_type)
00272 {
00273 image_topics.push_back(topic.name);
00274 }
00275 else if (topic.datatype == camera_info_message_type)
00276 {
00277 camera_info_topics.push_back(topic.name);
00278 }
00279 }
00280
00281 async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header(
00282 "Server", "web_video_server").header("Cache-Control",
00283 "no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header(
00284 "Pragma", "no-cache").header("Content-type", "text/html;").write(connection);
00285
00286 connection->write("<html>"
00287 "<head><title>ROS Image Topic List</title></head>"
00288 "<body><h1>Available ROS Image Topics:</h1>");
00289 connection->write("<ul>");
00290 BOOST_FOREACH(std::string & camera_info_topic, camera_info_topics)
00291 {
00292 if (boost::algorithm::ends_with(camera_info_topic, "/camera_info"))
00293 {
00294 std::string base_topic = camera_info_topic.substr(0, camera_info_topic.size() - strlen("camera_info"));
00295 connection->write("<li>");
00296 connection->write(base_topic);
00297 connection->write("<ul>");
00298 std::vector<std::string>::iterator image_topic_itr = image_topics.begin();
00299 for (; image_topic_itr != image_topics.end();)
00300 {
00301 if (boost::starts_with(*image_topic_itr, base_topic))
00302 {
00303 connection->write("<li><a href=\"/stream_viewer?topic=");
00304 connection->write(*image_topic_itr);
00305 connection->write("\">");
00306 connection->write(image_topic_itr->substr(base_topic.size()));
00307 connection->write("</a> (");
00308 connection->write("<a href=\"/snapshot?topic=");
00309 connection->write(*image_topic_itr);
00310 connection->write("\">Snapshot</a>)");
00311 connection->write("</li>");
00312
00313 image_topic_itr = image_topics.erase(image_topic_itr);
00314 }
00315 else
00316 {
00317 ++image_topic_itr;
00318 }
00319 }
00320 connection->write("</ul>");
00321 }
00322 connection->write("</li>");
00323 }
00324 connection->write("</ul>");
00325
00326 connection->write("<ul>");
00327 std::vector<std::string>::iterator image_topic_itr = image_topics.begin();
00328 for (; image_topic_itr != image_topics.end();) {
00329 connection->write("<li><a href=\"/stream_viewer?topic=");
00330 connection->write(*image_topic_itr);
00331 connection->write("\">");
00332 connection->write(*image_topic_itr);
00333 connection->write("</a> (");
00334 connection->write("<a href=\"/snapshot?topic=");
00335 connection->write(*image_topic_itr);
00336 connection->write("\">Snapshot</a>)");
00337 connection->write("</li>");
00338
00339 image_topic_itr = image_topics.erase(image_topic_itr);
00340 }
00341 connection->write("</ul></body></html>");
00342 return true;
00343 }
00344
00345 }
00346
00347 int main(int argc, char **argv)
00348 {
00349 ros::init(argc, argv, "web_video_server");
00350
00351 ros::NodeHandle nh;
00352 ros::NodeHandle private_nh("~");
00353
00354 web_video_server::WebVideoServer server(nh, private_nh);
00355 server.spin();
00356
00357 return (0);
00358 }