00001 #include <resized_image_transport/image_processing_nodelet.h>
00002
00003 #include <std_msgs/Float32.h>
00004 #include <image_transport/camera_common.h>
00005
00006 namespace resized_image_transport
00007 {
00008 void ImageProcessing::onInit() {
00009 DiagnosticNodelet::onInit();
00010 initReconfigure();
00011 initParams();
00012 initPublishersAndSubscribers();
00013 }
00014
00015 void ImageProcessing::initReconfigure(){
00016 }
00017
00018 void ImageProcessing::initParams(){
00019 publish_once_ = true;
00020
00021 pnh_->param("resize_scale_x", resize_x_, 1.0);
00022 NODELET_INFO("resize_scale_x : %f", resize_x_);
00023 pnh_->param("resize_scale_y", resize_y_, 1.0);
00024 NODELET_INFO("resize_scale_y : %f", resize_y_);
00025
00026 pnh_->param("width", dst_width_, 0);
00027 NODELET_INFO("width : %d", dst_width_);
00028 pnh_->param("height", dst_height_, 0);
00029 NODELET_INFO("height : %d", dst_height_);
00030 pnh_->param("use_camera_subscriber", use_camera_subscriber_, false);
00031 pnh_->param("max_queue_size", max_queue_size_, 5);
00032 pnh_->param("use_snapshot", use_snapshot_, false);
00033 pnh_->param("use_messages", use_messages_, true);
00034 if (use_messages_) {
00035 double d_period;
00036 pnh_->param("period", d_period, 1.0);
00037 period_ = ros::Duration(d_period);
00038 NODELET_INFO("use_messages : %d", use_messages_);
00039 NODELET_INFO("message period : %f", period_.toSec());
00040 }
00041 pnh_->param("use_bytes", use_bytes_, false);
00042 pnh_->param("use_camera_info", use_camera_info_, true);
00043 }
00044
00045 void ImageProcessing::unsubscribe()
00046 {
00047 if (use_snapshot_) {
00048 sub_.shutdown();
00049 }
00050 if (use_camera_info_) {
00051 if (use_camera_subscriber_) {
00052 cs_.shutdown();
00053 }
00054 else {
00055 camera_info_sub_.shutdown();
00056 image_nonsync_sub_.shutdown();
00057 }
00058 }
00059 else {
00060 image_sub_.shutdown();
00061 }
00062 }
00063
00064 void ImageProcessing::subscribe()
00065 {
00066 if (use_snapshot_) {
00067 sub_ = pnh_->subscribe("snapshot", 1, &ImageProcessing::snapshot_msg_cb, this);
00068 }
00069 if (use_camera_info_) {
00070 if (use_camera_subscriber_) {
00071 cs_ = it_->subscribeCamera("input/image", max_queue_size_,
00072 &ImageProcessing::callback, this);
00073 }
00074 else {
00075 camera_info_sub_ = nh_->subscribe(image_transport::getCameraInfoTopic(pnh_->resolveName("input/image")),
00076 1,
00077 &ImageProcessing::info_cb,
00078 this);
00079 image_nonsync_sub_ = it_->subscribe("input/image",
00080 1,
00081 &ImageProcessing::image_nonsync_cb,
00082 this);
00083 }
00084 }
00085 else {
00086 image_sub_ = pnh_->subscribe("input/image", max_queue_size_, &ImageProcessing::image_cb, this);
00087 }
00088 }
00089
00090
00091 void ImageProcessing::initPublishersAndSubscribers(){
00092 double vital_rate;
00093 pnh_->param("vital_rate", vital_rate, 1.0);
00094 image_vital_.reset(
00095 new jsk_topic_tools::VitalChecker(1 / vital_rate));
00096 info_vital_.reset(
00097 new jsk_topic_tools::VitalChecker(1 / vital_rate));
00098 it_ = new image_transport::ImageTransport(*pnh_);
00099 std::string img = nh_->resolveName("image");
00100 std::string cam = nh_->resolveName("camera");
00101 if (img.at(0) == '/') {
00102 img.erase(0, 1);
00103 }
00104 NODELET_INFO("camera = %s", cam.c_str());
00105 NODELET_INFO("image = %s", img.c_str());
00106
00107 width_scale_pub_ = advertise<std_msgs::Float32>(*pnh_, "output/width_scale", max_queue_size_);
00108 height_scale_pub_ = advertise<std_msgs::Float32>(*pnh_, "output/height_scale", max_queue_size_);
00109
00110 if (use_snapshot_) {
00111 publish_once_ = false;
00112 srv_ = pnh_->advertiseService("snapshot", &ImageProcessing::snapshot_srv_cb, this);
00113 }
00114
00115 if (use_camera_info_) {
00116 #ifdef USE_2_0_9_ADVERTISE_CAMERA
00117 cp_ = advertiseCamera(*pnh_, *it_, "output/image", max_queue_size_);
00118 #else
00119 cp_ = advertiseCamera(*pnh_, "output/image", max_queue_size_);
00120 #endif
00121 }
00122 else {
00123 image_pub_ = advertise<sensor_msgs::Image>(*pnh_, "output/image", max_queue_size_);
00124 }
00125
00126 }
00127
00128 void ImageProcessing::snapshot_msg_cb (const std_msgs::EmptyConstPtr msg) {
00129 boost::mutex::scoped_lock lock(mutex_);
00130 publish_once_ = true;
00131 }
00132
00133 bool ImageProcessing::snapshot_srv_cb (std_srvs::Empty::Request &req,
00134 std_srvs::Empty::Response &res) {
00135 boost::mutex::scoped_lock lock(mutex_);
00136
00137 publish_once_ = true;
00138 return true;
00139 }
00140
00141 void ImageProcessing::info_cb(const sensor_msgs::CameraInfoConstPtr &msg) {
00142 boost::mutex::scoped_lock lock(mutex_);
00143 info_vital_->poke();
00144 info_msg_ = msg;
00145 }
00146
00147 void ImageProcessing::image_nonsync_cb(const sensor_msgs::ImageConstPtr& msg) {
00148 {
00149 boost::mutex::scoped_lock lock(mutex_);
00150 image_vital_->poke();
00151 if (!info_msg_) {
00152 NODELET_WARN_THROTTLE(1, "camera info is not yet available");
00153 return;
00154 }
00155 }
00156 callback(msg, info_msg_);
00157 }
00158
00159 void ImageProcessing::updateDiagnostic(diagnostic_updater::DiagnosticStatusWrapper &stat)
00160 {
00161 boost::mutex::scoped_lock lock(mutex_);
00162
00163 stat.add("use_camera_info", use_camera_info_);
00164 stat.add("use_snapshot", use_snapshot_);
00165 stat.add("input image", pnh_->resolveName("input/image"));
00166 stat.add("output image", pnh_->resolveName("output/image"));
00167
00168 if (!image_vital_->isAlive()) {
00169 stat.summary(diagnostic_msgs::DiagnosticStatus::ERROR,
00170 "no image input. Is " + pnh_->resolveName("input/image") + " active?");
00171 }
00172 else {
00173 if (use_camera_info_) {
00174 if (!info_vital_->isAlive()) {
00175 stat.summary(diagnostic_msgs::DiagnosticStatus::ERROR,
00176 "no info input. Is " + camera_info_sub_.getTopic() + " active?");
00177 }
00178 else {
00179 stat.summary(diagnostic_msgs::DiagnosticStatus::OK,
00180 "running");
00181 }
00182 stat.add("input info", camera_info_sub_.getTopic());
00183 stat.add("info_last_received_time", info_vital_->lastAliveTimeRelative());
00184 }
00185 else {
00186 stat.summary(diagnostic_msgs::DiagnosticStatus::OK,
00187 "running");
00188 }
00189 }
00190 stat.add("image_last_received_time", image_vital_->lastAliveTimeRelative());
00191 ros::Time now = ros::Time::now();
00192 float duration = (now - last_rosinfo_time_).toSec();
00193 int in_time_n = in_times.size();
00194 int out_time_n = out_times.size();
00195 double in_time_mean = 0, in_time_rate = 1.0, in_time_std_dev = 0.0, in_time_max_delta, in_time_min_delta;
00196 double out_time_mean = 0, out_time_rate = 1.0, out_time_std_dev = 0.0, out_time_max_delta, out_time_min_delta;
00197
00198 std::for_each( in_times.begin(), in_times.end(), (in_time_mean += boost::lambda::_1) );
00199 in_time_mean /= in_time_n;
00200 in_time_rate /= in_time_mean;
00201 std::for_each( in_times.begin(), in_times.end(), (in_time_std_dev += (boost::lambda::_1 - in_time_mean)*(boost::lambda::_1 - in_time_mean) ) );
00202 in_time_std_dev = sqrt(in_time_std_dev/in_time_n);
00203 if (in_time_n > 1) {
00204 in_time_min_delta = *std::min_element(in_times.begin(), in_times.end());
00205 in_time_max_delta = *std::max_element(in_times.begin(), in_times.end());
00206 }
00207
00208 std::for_each( out_times.begin(), out_times.end(), (out_time_mean += boost::lambda::_1) );
00209 out_time_mean /= out_time_n;
00210 out_time_rate /= out_time_mean;
00211 std::for_each( out_times.begin(), out_times.end(), (out_time_std_dev += (boost::lambda::_1 - out_time_mean)*(boost::lambda::_1 - out_time_mean) ) );
00212 out_time_std_dev = sqrt(out_time_std_dev/out_time_n);
00213 if (out_time_n > 1) {
00214 out_time_min_delta = *std::min_element(out_times.begin(), out_times.end());
00215 out_time_max_delta = *std::max_element(out_times.begin(), out_times.end());
00216 }
00217
00218 double in_byte_mean = 0, out_byte_mean = 0;
00219 std::for_each( in_bytes.begin(), in_bytes.end(), (in_byte_mean += boost::lambda::_1) );
00220 std::for_each( out_bytes.begin(), out_bytes.end(), (out_byte_mean += boost::lambda::_1) );
00221 in_byte_mean /= duration;
00222 out_byte_mean /= duration;
00223 stat.add("compressed rate", in_byte_mean / out_byte_mean);
00224 stat.add("input bandwidth (Kbps)", in_byte_mean / 1000 * 8);
00225 stat.add("input bandwidth (Mbps)", in_byte_mean / 1000 / 1000 * 8);
00226 stat.add("input rate (hz)", in_time_rate);
00227 stat.add("input min delta (s)", in_time_min_delta);
00228 stat.add("input max delta (s)", in_time_max_delta);
00229 stat.add("input std_dev delta (s)", in_time_std_dev);
00230 stat.add("input times (n)", in_time_n);
00231 stat.add("output bandwidth (Kbps)", out_byte_mean / 1000 * 8);
00232 stat.add("output bandwidth (Mbps)", out_byte_mean / 1000 / 1000 * 8);
00233 stat.add("output rate (hz)", out_time_rate);
00234 stat.add("output min delta (s)", out_time_min_delta);
00235 stat.add("output max delta (s)", out_time_max_delta);
00236 stat.add("output std_dev delta (s)", out_time_std_dev);
00237 stat.add("output times (n)", out_time_n);
00238 in_times.clear();
00239 in_bytes.clear();
00240 out_times.clear();
00241 out_bytes.clear();
00242 last_rosinfo_time_ = now;
00243
00244 }
00245
00246 void ImageProcessing::image_cb(const sensor_msgs::ImageConstPtr &img){
00247 image_vital_->poke();
00248 callback(img, sensor_msgs::CameraInfo::ConstPtr());
00249 }
00250
00251 void ImageProcessing::callback(const sensor_msgs::ImageConstPtr &img,
00252 const sensor_msgs::CameraInfoConstPtr &info) {
00253 boost::mutex::scoped_lock lock(mutex_);
00254 ros::Time now = ros::Time::now();
00255 ROS_DEBUG("image processing callback");
00256 if ( !publish_once_ || (cp_.getNumSubscribers () == 0 && image_pub_.getNumSubscribers () == 0 )) {
00257 ROS_DEBUG("number of subscribers is 0, ignoring image");
00258 return;
00259 }
00260 if (use_messages_ && now - last_publish_time_ < period_) {
00261 ROS_DEBUG("to reduce load, ignoring image");
00262 return;
00263 }
00264 in_times.push_front((now - last_subscribe_time_).toSec());
00265 in_bytes.push_front(img->data.size());
00266
00267 try {
00268 sensor_msgs::ImagePtr dst_img;
00269 sensor_msgs::CameraInfo dst_info;
00270 process(img, info, dst_img, dst_info);
00271
00272
00273 if (use_camera_info_) {
00274 cp_.publish(dst_img,
00275 boost::make_shared<sensor_msgs::CameraInfo> (dst_info));
00276 }
00277 else {
00278 image_pub_.publish(dst_img);
00279 }
00280
00281
00282 std_msgs::Float32 width_scale;
00283 width_scale.data = resize_x_;
00284 std_msgs::Float32 height_scale;
00285 height_scale.data = resize_y_;
00286 width_scale_pub_.publish(width_scale);
00287 height_scale_pub_.publish(height_scale);
00288
00289 out_times.push_front((now - last_publish_time_).toSec());
00290 out_bytes.push_front(dst_img->step * dst_img->height);
00291
00292 last_publish_time_ = now;
00293 }
00294 catch (cv::Exception& e) {
00295 ROS_ERROR("%s", e.what());
00296 }
00297
00298 last_subscribe_time_ = now;
00299
00300 if (use_snapshot_) {
00301 publish_once_ = false;
00302 }
00303 }
00304 }
00305