Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007 #include "burst_calc/burst_creator.h"
00008 #include "burst_calc/ranges.h"
00009 #include "time_server/time_srv.h"
00010 #include <cstdio>
00011
00018 void BurstCreator::init()
00019 {
00020
00021 getParams();
00022
00023
00024 time_client_ = n_.serviceClient<time_server::time_srv>("time_service");
00025 burst_pub_cat_ = n_.advertise<burst_calc::burst>("bursts_to_cat_creator", 1000);
00026 burst_pub_viz_ = n_.advertise<burst_calc::burst>("bursts_to_dish_viz", 1);
00027 ranges_pub_cat_ = n_.advertise<burst_calc::ranges>("ranges_to_cat_creator", 1);
00028 ranges_pub_viz_ = n_.advertise<burst_calc::ranges>("ranges_to_dish_viz", 1);
00029
00030
00031 ROS_INFO("Waiting for subscribers...");
00032 while (burst_pub_cat_.getNumSubscribers() < 1 &&
00033 burst_pub_viz_.getNumSubscribers() < 1 &&
00034 ranges_pub_cat_.getNumSubscribers() < 1 &&
00035 ranges_pub_viz_.getNumSubscribers() < 1 && ros::ok());
00036
00037
00038 dish_state_sub_ = n_.subscribe("dish_states_to_burst_creator", 1000, &BurstCreator::callback,
00039 this);
00040
00041
00042 buf_.init(buffer_size_, stdev_mult_);
00043
00044
00045 run();
00046 }
00047
00051 void BurstCreator::getParams()
00052 {
00053
00054 if (!n_.getParam("buffer_size", buffer_size_))
00055 {
00056 ROS_WARN("Could not load buffer_size parameter, default is 1000");
00057 buffer_size_ = 1000;
00058 }
00059
00060
00061 if (!n_.getParam("stdev_mult", stdev_mult_))
00062 {
00063 ROS_WARN("Could not load stdev_mult parameter, default is 3.0");
00064 stdev_mult_ = 3.0;
00065 }
00066
00067
00068 if (!n_.getParam("burst_window", burst_window_))
00069 {
00070 ROS_WARN("Could not load burst_window parameter, default is 1000");
00071 burst_window_ = 1000;
00072 }
00073 }
00074
00080 void BurstCreator::run()
00081 {
00082 while (ros::ok())
00083 {
00084 ros::spinOnce();
00085 if (!queue_.empty())
00086 addDish();
00087 }
00088 }
00089
00097 void BurstCreator::addDish()
00098 {
00099 neuro_recv::dish_state d = queue_.front();
00100 queue_.pop();
00101
00102 if (buf_.isBuffered())
00103 {
00104
00105
00106
00107
00108
00109
00110
00111
00112 for (int i = 0; i < 60; i++)
00113 {
00114 bursts_[i].update(d);
00115 merger_.updateTime(i, bursts_[i].getTimePtr());
00116 if (bursts_[i].endOfBurst())
00117 {
00118 printf("* Burst : [Sz %4d] [%6.3f - %6.3f] [Ch %d]\n",
00119 static_cast<int>(bursts_[i].getBurst().dishes.size()),
00120 bursts_[i].getBurst().header.stamp.toSec(),
00121 bursts_[i].getBurst().end.toSec(), i);
00122 merger_.add(bursts_[i].getBurst());
00123 bursts_[i].reset();
00124 }
00125 }
00126
00127 merger_.update();
00128
00129
00130 static bool run_once = true;
00131
00132 while (merger_.canPublish())
00133 {
00134
00135 if (run_once)
00136 {
00137 time_server::time_srv seed;
00138 if (time_client_.call(seed))
00139 ROS_INFO("Time server seeded successfully");
00140 else
00141 ROS_ERROR("Time server is not responding");
00142 burst_pub_viz_.publish(merger_.getBurst());
00143 run_once = false;
00144 }
00145
00146 burst_pub_cat_.publish(merger_.getBurst());
00147
00148 printf("*** Publish : [Sz %4d] [%6.3f - %6.3f] [Ch",
00149 static_cast<int>(merger_.getBurst().dishes.size()),
00150 merger_.getBurst().header.stamp.toSec(),
00151 merger_.getBurst().end.toSec());
00152 for (unsigned int j = 0; j < merger_.getBurst().channels.size(); j++)
00153 printf(" %d", merger_.getBurst().channels[j]);
00154 printf("]\n");
00155
00156 ROS_INFO("Burst %.3f published", merger_.getBurst().header.stamp.toSec());
00157
00158 merger_.deletePublished();
00159 }
00160 }
00161 else
00162 {
00163 buf_.add(d);
00164
00165
00166 if (buf_.isBuffered())
00167 {
00168 for (int i = 0; i < 60; i++)
00169 bursts_[i].init(i, buf_.getBaseline(i), buf_.getThreshold(i),
00170 burst_window_);
00171
00172 burst_calc::ranges ranges = buf_.getRanges();
00173 ranges_pub_cat_.publish(ranges);
00174 ranges_pub_viz_.publish(ranges);
00175 }
00176 }
00177 }
00178
00183 void BurstCreator::finish()
00184 {
00185 ROS_INFO("Finishing burst creator node");
00186
00187 for (int i = 0; i < 60; i++)
00188 {
00189
00190 if (bursts_[i].isBursting())
00191 {
00192 merger_.add(bursts_[i].getBurst());
00193 printf("* Burst : [Sz %4d] [%6.3f - %6.3f] [Ch %d]\n",
00194 static_cast<int>(bursts_[i].getBurst().dishes.size()),
00195 bursts_[i].getBurst().header.stamp.toSec(),
00196 bursts_[i].getBurst().end.toSec(), i);
00197 }
00198
00199
00200
00201 merger_.updateTime(i, NULL);
00202
00203
00204 merger_.update();
00205
00206
00207 while (merger_.canPublish())
00208 {
00209 burst_pub_cat_.publish(merger_.getBurst());
00210
00211 printf("*** Publish : [Sz %4d] [%6.3f - %6.3f] [Ch",
00212 static_cast<int>(merger_.getBurst().dishes.size()),
00213 merger_.getBurst().header.stamp.toSec(),
00214 merger_.getBurst().end.toSec());
00215 for (unsigned int j = 0; j < merger_.getBurst().channels.size(); j++)
00216 printf(" %d", merger_.getBurst().channels[j]);
00217 printf("]\n");
00218
00219 ROS_INFO("Burst %.3f published", merger_.getBurst().header.stamp.toSec());
00220
00221 merger_.deletePublished();
00222 }
00223 }
00224 }
00225
00235 void BurstCreator::callback(const neuro_recv::dish_state::ConstPtr& d)
00236 {
00237
00238 if (d->last_dish)
00239 finish();
00240 else
00241 queue_.push(*d);
00242 }
00243
00247 int main(int argc, char** argv)
00248 {
00249 ros::init(argc, argv, "burst_creator");
00250 BurstCreator burst_creator;
00251 return 0;
00252 }