duration_recorder_action_server_handler.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 #pragma once
16 
17 #include <sstream>
18 #include <string>
19 #include <thread>
20 
21 #include <ros/ros.h>
22 #include <ros/duration.h>
23 
27 
28 #include <file_uploader_msgs/UploadFilesAction.h>
29 #include <recorder_msgs/DurationRecorderAction.h>
30 
31 #include <aws/core/utils/logging/LogMacros.h>
32 
37 
38 namespace Aws{
39 namespace Rosbag{
40 
42 
43 
44 
45 template<typename GoalHandleT, typename UploadClientT>
47 {
48 private:
49  static bool ValidateGoal(GoalHandleT& goal_handle)
50  {
51  const auto & goal = goal_handle.getGoal();
52  if (goal->duration <= ros::Duration(0) || goal->duration > ros::DURATION_MAX) {
53  std::stringstream msg;
54  msg << "Goal rejected. Invalid record duration given: " << goal->duration;
55  recorder_msgs::DurationRecorderResult result;
56  Utils::GenerateResult(recorder_msgs::RecorderResult::INVALID_INPUT, msg.str(), result);
57  goal_handle.setRejected(result, result.result.message);
58  AWS_LOG_INFO(__func__, result.result.message.c_str());
59  return false;
60  }
61  return true;
62  }
63 public:
64  static void DurationRecorderStart(
66  const DurationRecorderOptions& duration_recorder_options,
67  UploadClientT& upload_client,
68  GoalHandleT& goal_handle)
69  {
70  // Used for logging in lambda function
71  static auto current_function = __func__;
72  ros::Time time_of_goal_received = ros::Time::now();
73 
74  AWS_LOG_INFO(__func__, "Goal received");
75  if (rosbag_recorder.IsActive()) {
76  const std::string msg = "Rejecting goal since recorder already active";
77  recorder_msgs::DurationRecorderResult result;
78  Utils::GenerateResult(recorder_msgs::RecorderResult::INTERNAL_ERROR, msg, result);
79  goal_handle.setRejected(result, msg);
80  AWS_LOG_INFO(__func__, msg.c_str());
81  return;
82  }
83 
84  if (!ValidateGoal(goal_handle)) {
85  // Goal was invalid and rejected
86  return;
87  }
88 
89  const auto & goal = goal_handle.getGoal();
91  options.max_duration = goal->duration;
92  options.min_space = 1024 * 1024 * duration_recorder_options.min_free_space_mib; // mebibytes to bytes
93  options.min_space_str = std::to_string(duration_recorder_options.min_free_space_mib) + 'M';
94  if (goal->topics_to_record.empty()) {
95  options.record_all = true;
96  } else {
97  options.record_all = false;
98  options.topics = goal->topics_to_record;
99  }
100  options.prefix = duration_recorder_options.write_directory;
101 
102  auto run_result = rosbag_recorder.Run(
103  options,
104  [goal_handle, time_of_goal_received]() mutable
105  {
106  goal_handle.setAccepted();
107  AWS_LOG_INFO(current_function, "Goal accepted");
108 
109  recorder_msgs::DurationRecorderFeedback recorder_feedback;
110  recorder_msgs::RecorderStatus recording_status;
112  recorder_msgs::RecorderStatus::RECORDING,
113  time_of_goal_received,
114  recorder_feedback,
115  recording_status);
116  goal_handle.publishFeedback(recorder_feedback);
117  },
118  [goal_handle, duration_recorder_options, time_of_goal_received, &upload_client](int exit_code) mutable
119  {
120  recorder_msgs::DurationRecorderResult result;
121  if (exit_code != 0) {
122  const std::string msg = "Rosbag recorder encountered errors.";
123  Utils::GenerateResult(recorder_msgs::RecorderResult::INTERNAL_ERROR, msg, result);
124  goal_handle.setAborted(result, msg);
125  AWS_LOG_INFO(current_function, "Recorder finished with non zero exit code, aborting goal");
126  return;
127  }
128 
129  recorder_msgs::DurationRecorderFeedback recorder_feedback;
130  recorder_msgs::RecorderStatus recording_status;
132  recorder_msgs::RecorderStatus::PREPARING_UPLOAD,
133  ros::Time::now(),
134  recorder_feedback,
135  recording_status);
136  goal_handle.publishFeedback(recorder_feedback);
137  std::vector<std::string> ros_bags_to_upload = Utils::GetRosbagsToUpload(duration_recorder_options.write_directory,
138  [time_of_goal_received](rosbag::View& rosbag) -> bool
139  {
140  // Select bags that were recorded during this duration recorder goal.
141  // Older bags may be left over from previous runs of the recorder.
142  return time_of_goal_received < rosbag.getBeginTime();
143  }
144  );
145 
146  bool upload_finished = Utils::UploadFiles(goal_handle, duration_recorder_options.upload_timeout_s, upload_client, ros_bags_to_upload);
147 
148  Utils::HandleRecorderUploadResult(goal_handle, upload_client.getState(), upload_finished, result);
149 
150  if (duration_recorder_options.delete_bags_after_upload) {
152  recorder_msgs::RecorderStatus::CLEANUP,
153  ros::Time::now(),
154  recorder_feedback,
155  recording_status);
156  goal_handle.publishFeedback(recorder_feedback);
157  for (const std::string & bag_file_name : upload_client.getResult()->files_uploaded) {
158  AWS_LOG_INFO(current_function, "Bag file named: %s was uploaded to S3 and is now being deleted locally.", bag_file_name.c_str());
159  Utils::DeleteFile(bag_file_name);
160  }
161  }
162  }
163  );
164 
165  if (Utils::RosbagRecorderRunResult::SKIPPED == run_result) {
166  recorder_msgs::DurationRecorderResult result;
167  result.result.result = recorder_msgs::RecorderResult::INTERNAL_ERROR;
168  goal_handle.setRejected(result, "Rejecting result, DurationRecorder already handling goal.");
169  }
170  }
171 
172  static void CancelDurationRecorder(GoalHandleT& goal_handle)
173  {
174  goal_handle.setCanceled();
175  }
176 };
177 
178 } // namespace Rosbag
179 } // namespace Aws
virtual RosbagRecorderRunResult Run(const RecorderOptions &recorder_options, const std::function< void()> &pre_record=nullptr, const std::function< void(int)> &post_record=nullptr)
actionlib::ActionServer< recorder_msgs::DurationRecorderAction > DurationRecorderActionServer
std::vector< std::string > GetRosbagsToUpload(const std::string &write_directory, const std::function< bool(rosbag::View &)> &select_file)
Definition: file_utils.cpp:181
Aws::Rosbag::RecorderErrorCode DeleteFile(const std::string &file_path)
delete a file
Definition: file_utils.cpp:93
options
static void DurationRecorderStart(Utils::RosbagRecorder< Utils::Recorder > &rosbag_recorder, const DurationRecorderOptions &duration_recorder_options, UploadClientT &upload_client, GoalHandleT &goal_handle)
bool UploadFiles(GoalHandleT &goal_handle, const double upload_time_out, UploadClientT &upload_client, std::vector< std::string > &ros_bags_to_upload)
void GenerateResult(uint8_t stage, std::string message, RecorderResultT &recorder_result)
std::vector< std::string > topics
Definition: recorder.h:122
static Time now()
void HandleRecorderUploadResult(GoalHandleT &goal_handle, const SimpleClientGoalStateT &end_state, bool upload_finished, RecorderResultT &recorder_result)
void GenerateFeedback(uint8_t stage, ros::Time time_of_feedback, RecorderFeedbackT &recorder_feedback, RecorderStatusT &recording_status)
ROSTIME_DECL const Duration DURATION_MAX


rosbag_cloud_recorders
Author(s): AWS RoboMaker
autogenerated on Tue Jun 1 2021 02:51:27