trajectory_buffer.cpp
Go to the documentation of this file.
1 #include <mutex>
2 #include <string>
3 #include <strings.h>
4 
5 #include <ram_msgs/AdditiveManufacturingTrajectory.h>
6 #include <ram_trajectory_buffer/BufferParams.h>
7 #include <ram_trajectory_buffer/UnmodifiedTrajectory.h>
8 #include <ros/ros.h>
9 #include <rosbag/bag.h>
10 #include <rosbag/buffer.h>
11 #include <rosbag/exceptions.h>
12 #include <rosbag/structures.h>
13 #include <rosbag/view.h>
14 #include <signal.h>
15 
17 const unsigned msg_to_keep(10);
18 unsigned last_msg_published = 0;
20 
21 std::mutex msgs_mutex;
22 std::vector<ram_msgs::AdditiveManufacturingTrajectory> msgs; // Buffer 1. Contain all msgs
23 
25 std::vector<ram_msgs::AdditiveManufacturingTrajectory> unmodified_msgs; // Buffer 2. Contain just the unmodified msgs
26 
27 // On shutdown, write bag file containing last trajectories
28 void mySigintHandler(int)
29 {
30  std::lock_guard<std::mutex> lock(msgs_mutex);
31  std::lock_guard<std::mutex> lock_1(unmodified_msgs_mutex);
32 
33  if ((last_msg_published + 1) < msgs.size()) // erase msgs
34  {
35  msgs.erase((msgs.begin() + last_msg_published + 1), msgs.end());
36  // Erase msgs in the second buffer
37  while (unmodified_msgs.back().generated != msgs.back().generated && !unmodified_msgs.empty())
38  unmodified_msgs.pop_back();
39  }
40 
41  bag.open("ram_buffer_trajectory.bag", rosbag::bagmode::Write);
42  for (auto msg : msgs)
43  bag.write("ram/trajectory", ros::Time::now(), msg);
44  bag.close();
45 
46  bag.open("ram_buffer_unmodified_trajectory.bag", rosbag::bagmode::Write);
47  for (auto msg : unmodified_msgs)
48  bag.write("ram/trajectory", ros::Time::now(), msg);
49  bag.close();
50 
51  ros::shutdown();
52 }
53 
55 {
56  if (ros::this_node::getName().compare(msg_event.getPublisherName()) == 0) // The publisher is himself
57  return;
58 
59  std::lock_guard<std::mutex> lock(msgs_mutex);
60  std::lock_guard<std::mutex> lock_1(unmodified_msgs_mutex);
61 
62  if ((last_msg_published + 1) < msgs.size()) // erase msgs
63  {
64  msgs.erase((msgs.begin() + last_msg_published + 1), msgs.end());
65  // Erase msgs in the second buffer
66  while (unmodified_msgs.back().generated != msgs.back().generated && !unmodified_msgs.empty())
67  unmodified_msgs.pop_back();
68  }
69 
70  last_msg_published = msgs.size();
71  msgs.push_back(*msg_event.getMessage());
72 
73  if (msg_event.getMessage()->generated == msg_event.getMessage()->modified) // the trajectory is not modified
74  unmodified_msgs.push_back(*msg_event.getMessage());
75  // Always save the same numbers of elements
76  if (msgs.size() > msg_to_keep)
77  {
78  msgs.erase(msgs.begin(), msgs.end() - msg_to_keep);
79  last_msg_published = msgs.size() - 1;
80  }
81  if (unmodified_msgs.size() > msg_to_keep)
83 }
84 
85 bool trajectoryBufferCallback(ram_trajectory_buffer::BufferParams::Request &req,
86  ram_trajectory_buffer::BufferParams::Response &res)
87 {
88  std::lock_guard<std::mutex> lock(msgs_mutex);
89 
90  ram_msgs::AdditiveManufacturingTrajectory trajectory;
91  switch (req.button_id)
92  {
93  case 1:
94  // Back button
95  if (last_msg_published > 0)
97  else
98  {
99  res.error = "Start of the buffer";
100  return true;
101  }
102  break;
103  case 2:
104  // Forward button
105  if ((last_msg_published + 1) < msgs.size())
107  else
108  {
109  res.error = "End of the buffer";
110  return true;
111  }
112  break;
113  }
114 
115  if (last_msg_published < msgs.size())
116  {
117  // Trajectory is published
118  trajectory = msgs[last_msg_published];
119  pub.publish(trajectory);
120  }
121 
122  res.error.clear();
123  return true;
124 }
125 
126 bool getUnmodifiedTrajectoryCallback(ram_trajectory_buffer::UnmodifiedTrajectory::Request &req,
127  ram_trajectory_buffer::UnmodifiedTrajectory::Response &res)
128 {
129  std::lock_guard<std::mutex> lock_1(unmodified_msgs_mutex);
130  for (auto msg : unmodified_msgs)
131  {
132  if (msg.generated == req.generated)
133  {
134  res.trajectory = msg;
135  return true;
136  }
137  }
138 
139  return false;
140 }
141 
142 int main(int argc,
143  char **argv)
144 {
145  ros::init(argc, argv, "ram_trajectory_buffer", ros::init_options::NoSigintHandler);
146  signal(SIGINT, mySigintHandler);
148 
149  bool error_in_first_bag = false;
150  bool error_in_second_bag = false;
151 
152  // Read the bag file (first buffer)
153  try
154  {
155  bag.open("ram_buffer_trajectory.bag", rosbag::bagmode::Read);
156  std::vector<std::string> topics;
157  topics.push_back(std::string("ram/trajectory"));
158  rosbag::View view(bag, rosbag::TopicQuery(topics));
159  std::lock_guard<std::mutex> lock(msgs_mutex);
160 
161  for (auto m : view)
162  {
163  ram_msgs::AdditiveManufacturingTrajectory::ConstPtr s =
164  m.instantiate<ram_msgs::AdditiveManufacturingTrajectory>();
165  if (s != NULL)
166  msgs.push_back(*s);
167  }
168 
169  last_msg_published = msgs.size(); // There are not published trajectories
170  bag.close();
171  }
172  catch (rosbag::BagException &e)
173  {
174  bag.close();
175  ROS_WARN_STREAM(e.what());
176 
177  std::string file(getenv("HOME"));
178  file += "/.ros/ram_buffer_trajectory.bag";
179 
180  if (std::remove(file.c_str()) == 0)
181  ROS_WARN_STREAM(file << " was removed because corrupted");
182 
183  error_in_first_bag = true;
184  }
185 
186  // Read the bag file (second buffer)
187  try
188  {
189  bag.open("ram_buffer_unmodified_trajectory.bag", rosbag::bagmode::Read);
190  std::vector<std::string> topics;
191  topics.push_back(std::string("ram/trajectory"));
192  rosbag::View view(bag, rosbag::TopicQuery(topics));
193  std::lock_guard<std::mutex> lock_1(unmodified_msgs_mutex);
194 
195  for (auto m : view)
196  {
197  ram_msgs::AdditiveManufacturingTrajectory::ConstPtr s =
198  m.instantiate<ram_msgs::AdditiveManufacturingTrajectory>();
199  if (s != NULL)
200  unmodified_msgs.push_back(*s);
201  }
202  bag.close();
203  }
204  catch (rosbag::BagException &e)
205  {
206  bag.close();
207  ROS_WARN_STREAM(e.what());
208 
209  std::string file(getenv("HOME"));
210  file += "/.ros/ram_buffer_unmodified_trajectory.bag";
211 
212  if (std::remove(file.c_str()) == 0)
213  ROS_WARN_STREAM(file << " was removed because corrupted");
214 
215  error_in_second_bag = true;
216  }
217 
218  if (error_in_first_bag || error_in_second_bag)
219  {
220  msgs.clear();
221  unmodified_msgs.clear();
222  last_msg_published = 0;
223  }
224  // Publish on "ram/trajectory"
225  pub = nh.advertise<ram_msgs::AdditiveManufacturingTrajectory>("ram/trajectory", msg_to_keep, true);
226 
227  // Subscribe on "ram/trajectory"
228  ros::Subscriber sub = nh.subscribe("ram/trajectory", msg_to_keep, saveTrajectoryCallback);
229  ros::ServiceServer service_1 = nh.advertiseService("ram/buffer/get_trajectory", trajectoryBufferCallback);
230  ros::ServiceServer service_2 = nh.advertiseService("ram/buffer/get_unmodified_trajectory",
232 
233  ros::AsyncSpinner spinner(0);
234  spinner.start();
236 
237  return 0;
238 }
std::mutex msgs_mutex
void publish(const boost::shared_ptr< M > &message) const
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
XmlRpcServer s
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
ROSCPP_DECL const std::string & getName()
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
void close()
rosbag::Bag bag
std::vector< ram_msgs::AdditiveManufacturingTrajectory > msgs
void saveTrajectoryCallback(const ros::MessageEvent< ram_msgs::AdditiveManufacturingTrajectory > &msg_event)
void mySigintHandler(int)
const std::string & getPublisherName() const
ros::Publisher pub
bool trajectoryBufferCallback(ram_trajectory_buffer::BufferParams::Request &req, ram_trajectory_buffer::BufferParams::Response &res)
std::mutex unmodified_msgs_mutex
std::shared_ptr< ros::NodeHandle > nh
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
#define ROS_WARN_STREAM(args)
int main(int argc, char **argv)
bool getUnmodifiedTrajectoryCallback(ram_trajectory_buffer::UnmodifiedTrajectory::Request &req, ram_trajectory_buffer::UnmodifiedTrajectory::Response &res)
static Time now()
ROSCPP_DECL void shutdown()
const unsigned msg_to_keep(10)
std::vector< ram_msgs::AdditiveManufacturingTrajectory > unmodified_msgs
void write(std::string const &topic, ros::MessageEvent< T > const &event)
boost::shared_ptr< M > getMessage() const
ROSCPP_DECL void waitForShutdown()
unsigned last_msg_published


ram_trajectory_buffer
Author(s): Andres Campos - Institut Maupertuis
autogenerated on Thu Oct 12 2017 02:50:40