00001 #include <resized_image_transport/image_processing_nodelet.h>
00002
00003 #include <std_msgs/Float32.h>
00004 #include <image_transport/camera_common.h>
00005
00006 namespace resized_image_transport
00007 {
00008 void ImageProcessing::onInit() {
00009 initNodeHandle();
00010 initReconfigure();
00011 initParams();
00012 initPublishersAndSubscribers();
00013 }
00014
00015 void ImageProcessing::initNodeHandle(){
00016 nh = getNodeHandle();
00017 pnh = getPrivateNodeHandle();
00018 }
00019
00020 void ImageProcessing::initReconfigure(){
00021 }
00022
00023 void ImageProcessing::initParams(){
00024 publish_once_ = true;
00025
00026 pnh.param("resize_scale_x", resize_x_, 1.0);
00027 NODELET_INFO("resize_scale_x : %f", resize_x_);
00028 pnh.param("resize_scale_y", resize_y_, 1.0);
00029 NODELET_INFO("resize_scale_y : %f", resize_y_);
00030
00031 pnh.param("width", dst_width_, 0);
00032 NODELET_INFO("width : %d", dst_width_);
00033 pnh.param("height", dst_height_, 0);
00034 NODELET_INFO("height : %d", dst_height_);
00035 pnh.param("use_camera_subscriber", use_camera_subscriber_, false);
00036 pnh.param("max_queue_size", max_queue_size_, 5);
00037 pnh.param("use_snapshot", use_snapshot_, false);
00038 pnh.param("use_messages", use_messages_, true);
00039 if (use_messages_) {
00040 double d_period;
00041 pnh.param("period", d_period, 1.0);
00042 period_ = ros::Duration(d_period);
00043 NODELET_INFO("use_messages : %d", use_messages_);
00044 NODELET_INFO("message period : %f", period_.toSec());
00045 }
00046 pnh.param("use_bytes", use_bytes_, false);
00047 pnh.param("use_camera_info", use_camera_info_, true);
00048 }
00049
00050 void ImageProcessing::initPublishersAndSubscribers(){
00051 double vital_rate;
00052 pnh.param("vital_rate", vital_rate, 1.0);
00053 image_vital_.reset(
00054 new jsk_topic_tools::VitalChecker(1 / vital_rate));
00055 info_vital_.reset(
00056 new jsk_topic_tools::VitalChecker(1 / vital_rate));
00057 it_ = new image_transport::ImageTransport(pnh);
00058 std::string img = nh.resolveName("image");
00059 std::string cam = nh.resolveName("camera");
00060 if (img.at(0) == '/') {
00061 img.erase(0, 1);
00062 }
00063 NODELET_INFO("camera = %s", cam.c_str());
00064 NODELET_INFO("image = %s", img.c_str());
00065
00066 width_scale_pub_ = pnh.advertise<std_msgs::Float32>("output/width_scale", max_queue_size_);
00067 height_scale_pub_ = pnh.advertise<std_msgs::Float32>("output/height_scale", max_queue_size_);
00068
00069 if (use_snapshot_) {
00070 publish_once_ = false;
00071 srv_ = pnh.advertiseService("snapshot", &ImageProcessing::snapshot_srv_cb, this);
00072 sub_ = pnh.subscribe("snapshot", 1, &ImageProcessing::snapshot_msg_cb, this);
00073 }
00074
00075 if (use_camera_info_) {
00076 cp_ = it_->advertiseCamera("output/image", max_queue_size_);
00077 if (use_camera_subscriber_) {
00078 cs_ = it_->subscribeCamera("input/image", max_queue_size_,
00079 &ImageProcessing::callback, this);
00080 }
00081 else {
00082 camera_info_sub_ = nh.subscribe(image_transport::getCameraInfoTopic(pnh.resolveName("input/image")),
00083 1,
00084 &ImageProcessing::info_cb,
00085 this);
00086 image_nonsync_sub_ = it_->subscribe("input/image",
00087 1,
00088 &ImageProcessing::image_nonsync_cb,
00089 this);
00090 }
00091 }
00092 else {
00093 image_pub_ = pnh.advertise<sensor_msgs::Image>("output/image", max_queue_size_);
00094 image_sub_ = pnh.subscribe("input/image", max_queue_size_, &ImageProcessing::image_cb, this);
00095 }
00096
00097
00098 diagnostic_updater_.reset(
00099 new jsk_topic_tools::TimeredDiagnosticUpdater(pnh, ros::Duration(1.0)));
00100 diagnostic_updater_->setHardwareID(getName());
00101 diagnostic_updater_->add(
00102 getName(),
00103 boost::bind(
00104 &ImageProcessing::updateDiagnostic, this, _1));
00105 diagnostic_updater_->start();
00106
00107
00108 }
00109
00110 void ImageProcessing::snapshot_msg_cb (const std_msgs::EmptyConstPtr msg) {
00111 boost::mutex::scoped_lock lock(mutex_);
00112 publish_once_ = true;
00113 }
00114
00115 bool ImageProcessing::snapshot_srv_cb (std_srvs::Empty::Request &req,
00116 std_srvs::Empty::Response &res) {
00117 boost::mutex::scoped_lock lock(mutex_);
00118
00119 publish_once_ = true;
00120 return true;
00121 }
00122
00123 void ImageProcessing::info_cb(const sensor_msgs::CameraInfoConstPtr &msg) {
00124 boost::mutex::scoped_lock lock(mutex_);
00125 info_vital_->poke();
00126 info_msg_ = msg;
00127 }
00128
00129 void ImageProcessing::image_nonsync_cb(const sensor_msgs::ImageConstPtr& msg) {
00130 {
00131 boost::mutex::scoped_lock lock(mutex_);
00132 image_vital_->poke();
00133 if (!info_msg_) {
00134 NODELET_WARN_THROTTLE(1, "camera info is not yet available");
00135 return;
00136 }
00137 }
00138 callback(msg, info_msg_);
00139 }
00140
00141 void ImageProcessing::updateDiagnostic(diagnostic_updater::DiagnosticStatusWrapper &stat)
00142 {
00143 boost::mutex::scoped_lock lock(mutex_);
00144
00145 stat.add("use_camera_info", use_camera_info_);
00146 stat.add("use_snapshot", use_snapshot_);
00147 stat.add("input image", pnh.resolveName("input/image"));
00148 stat.add("output image", pnh.resolveName("output/image"));
00149
00150 if (!image_vital_->isAlive()) {
00151 stat.summary(diagnostic_msgs::DiagnosticStatus::ERROR,
00152 "no image input. Is " + pnh.resolveName("input/image") + " active?");
00153 }
00154 else {
00155 if (use_camera_info_) {
00156 if (!info_vital_->isAlive()) {
00157 stat.summary(diagnostic_msgs::DiagnosticStatus::ERROR,
00158 "no info input. Is " + camera_info_sub_.getTopic() + " active?");
00159 }
00160 else {
00161 stat.summary(diagnostic_msgs::DiagnosticStatus::OK,
00162 "running");
00163 }
00164 stat.add("input info", camera_info_sub_.getTopic());
00165 stat.add("info_last_received_time", info_vital_->lastAliveTimeRelative());
00166 }
00167 else {
00168 stat.summary(diagnostic_msgs::DiagnosticStatus::OK,
00169 "running");
00170 }
00171 }
00172 stat.add("image_last_received_time", image_vital_->lastAliveTimeRelative());
00173 ros::Time now = ros::Time::now();
00174 float duration = (now - last_rosinfo_time_).toSec();
00175 int in_time_n = in_times.size();
00176 int out_time_n = out_times.size();
00177 double in_time_mean = 0, in_time_rate = 1.0, in_time_std_dev = 0.0, in_time_max_delta, in_time_min_delta;
00178 double out_time_mean = 0, out_time_rate = 1.0, out_time_std_dev = 0.0, out_time_max_delta, out_time_min_delta;
00179
00180 std::for_each( in_times.begin(), in_times.end(), (in_time_mean += boost::lambda::_1) );
00181 in_time_mean /= in_time_n;
00182 in_time_rate /= in_time_mean;
00183 std::for_each( in_times.begin(), in_times.end(), (in_time_std_dev += (boost::lambda::_1 - in_time_mean)*(boost::lambda::_1 - in_time_mean) ) );
00184 in_time_std_dev = sqrt(in_time_std_dev/in_time_n);
00185 if (in_time_n > 1) {
00186 in_time_min_delta = *std::min_element(in_times.begin(), in_times.end());
00187 in_time_max_delta = *std::max_element(in_times.begin(), in_times.end());
00188 }
00189
00190 std::for_each( out_times.begin(), out_times.end(), (out_time_mean += boost::lambda::_1) );
00191 out_time_mean /= out_time_n;
00192 out_time_rate /= out_time_mean;
00193 std::for_each( out_times.begin(), out_times.end(), (out_time_std_dev += (boost::lambda::_1 - out_time_mean)*(boost::lambda::_1 - out_time_mean) ) );
00194 out_time_std_dev = sqrt(out_time_std_dev/out_time_n);
00195 if (out_time_n > 1) {
00196 out_time_min_delta = *std::min_element(out_times.begin(), out_times.end());
00197 out_time_max_delta = *std::max_element(out_times.begin(), out_times.end());
00198 }
00199
00200 double in_byte_mean = 0, out_byte_mean = 0;
00201 std::for_each( in_bytes.begin(), in_bytes.end(), (in_byte_mean += boost::lambda::_1) );
00202 std::for_each( out_bytes.begin(), out_bytes.end(), (out_byte_mean += boost::lambda::_1) );
00203 in_byte_mean /= duration;
00204 out_byte_mean /= duration;
00205 stat.add("compressed rate", in_byte_mean / out_byte_mean);
00206 stat.add("input bandwidth (Kbps)", in_byte_mean / 1000 * 8);
00207 stat.add("input bandwidth (Mbps)", in_byte_mean / 1000 / 1000 * 8);
00208 stat.add("input rate (hz)", in_time_rate);
00209 stat.add("input min delta (s)", in_time_min_delta);
00210 stat.add("input max delta (s)", in_time_max_delta);
00211 stat.add("input std_dev delta (s)", in_time_std_dev);
00212 stat.add("input times (n)", in_time_n);
00213 stat.add("output bandwidth (Kbps)", out_byte_mean / 1000 * 8);
00214 stat.add("output bandwidth (Mbps)", out_byte_mean / 1000 / 1000 * 8);
00215 stat.add("output rate (hz)", out_time_rate);
00216 stat.add("output min delta (s)", out_time_min_delta);
00217 stat.add("output max delta (s)", out_time_max_delta);
00218 stat.add("output std_dev delta (s)", out_time_std_dev);
00219 stat.add("output times (n)", out_time_n);
00220 in_times.clear();
00221 in_bytes.clear();
00222 out_times.clear();
00223 out_bytes.clear();
00224 last_rosinfo_time_ = now;
00225
00226 }
00227
00228 void ImageProcessing::image_cb(const sensor_msgs::ImageConstPtr &img){
00229 image_vital_->poke();
00230 callback(img, sensor_msgs::CameraInfo::ConstPtr());
00231 }
00232
00233 void ImageProcessing::callback(const sensor_msgs::ImageConstPtr &img,
00234 const sensor_msgs::CameraInfoConstPtr &info) {
00235 boost::mutex::scoped_lock lock(mutex_);
00236 ros::Time now = ros::Time::now();
00237 ROS_DEBUG("image processing callback");
00238 if ( !publish_once_ || (cp_.getNumSubscribers () == 0 && image_pub_.getNumSubscribers () == 0 )) {
00239 ROS_DEBUG("number of subscribers is 0, ignoring image");
00240 return;
00241 }
00242 if (use_messages_ && now - last_publish_time_ < period_) {
00243 ROS_DEBUG("to reduce load, ignoring image");
00244 return;
00245 }
00246 in_times.push_front((now - last_subscribe_time_).toSec());
00247 in_bytes.push_front(img->data.size());
00248
00249 try {
00250 sensor_msgs::ImagePtr dst_img;
00251 sensor_msgs::CameraInfo dst_info;
00252 process(img, info, dst_img, dst_info);
00253
00254
00255 if (use_camera_info_) {
00256 cp_.publish(dst_img,
00257 boost::make_shared<sensor_msgs::CameraInfo> (dst_info));
00258 }
00259 else {
00260 image_pub_.publish(dst_img);
00261 }
00262
00263
00264 std_msgs::Float32 width_scale;
00265 width_scale.data = resize_x_;
00266 std_msgs::Float32 height_scale;
00267 height_scale.data = resize_y_;
00268 width_scale_pub_.publish(width_scale);
00269 height_scale_pub_.publish(height_scale);
00270
00271 out_times.push_front((now - last_publish_time_).toSec());
00272 out_bytes.push_front(dst_img->step * dst_img->height);
00273
00274 last_publish_time_ = now;
00275 }
00276 catch (cv::Exception& e) {
00277 ROS_ERROR("%s", e.what());
00278 }
00279
00280 last_subscribe_time_ = now;
00281
00282 if (use_snapshot_) {
00283 publish_once_ = false;
00284 }
00285 }
00286 }
00287