16 #include <gtest/gtest.h> 19 #include <kinesis_video_msgs/KinesisVideoFrame.h> 25 #include <rosgraph_msgs/Log.h> 42 #define ROS_POSTCALLBACK_ASSERT_TRUE(expr) \ 44 ros::getGlobalCallbackQueue()->callAvailable(); \ 46 ros::getGlobalCallbackQueue()->callAvailable(ros::WallDuration(kSingleCallbackWaitTime)); \ 48 ros::getGlobalCallbackQueue()->callAvailable( \ 49 ros::WallDuration(kMultipleCallbacksWaitTime)); \ 67 kTestData = &test_data;
69 subscription_installer =
72 stream_definition_provider, subscription_installer);
73 subscription_installer->set_stream_manager(stream_manager);
80 delete stream_manager;
81 delete stream_definition_provider;
82 delete subscription_installer;
100 parameter_reader.int_map_.insert({string(
"kinesis_video/stream_count"), stream_count});
101 parameter_reader.int_map_.at(
"kinesis_video/stream_count") =
stream_count;
102 parameter_reader.int_map_.insert({string(
"kinesis_video/stream0/topic_type"), 1});
103 parameter_reader.string_map_.insert({string(
"kinesis_video/stream0/stream_name"),
"stream-name"});
104 parameter_reader.string_map_.insert(
105 {string(
"kinesis_video/stream0/subscription_topic"),
"topic-name"});
106 unique_ptr<StreamDefinition> stream_definition =
107 real_stream_definition_provider.GetStreamDefinition(
ParameterPath(
""), parameter_reader,
nullptr, 0);
108 test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
125 parameter_reader.int_map_.insert({string(
"kinesis_video/stream_count"), stream_count});
133 ASSERT_EQ(test_data.get_codec_private_data_call_count, stream_count);
134 ASSERT_EQ(test_data.get_stream_definition_call_count, 0);
135 ASSERT_EQ(test_data.subscribe_call_count, 0);
141 parameter_reader.int_map_.insert({string(
"kinesis_video/stream_count"), stream_count});
148 ASSERT_EQ(test_data.get_codec_private_data_call_count, stream_count);
149 ASSERT_EQ(test_data.get_stream_definition_call_count, stream_count);
150 ASSERT_EQ(test_data.subscribe_call_count, 0);
156 int initialization_attempts = 100;
157 parameter_reader.int_map_.insert({string(
"kinesis_video/stream_count"), stream_count});
164 for (
int idx = 0; idx < initialization_attempts; idx++) {
165 parameter_reader.int_map_.at(
"kinesis_video/stream_count") =
stream_count;
166 unique_ptr<StreamDefinition> stream_definition =
167 real_stream_definition_provider.GetStreamDefinition(
ParameterPath(
""), parameter_reader,
nullptr, 0);
168 test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
169 setup_result = stream_manager->KinesisVideoStreamerSetup();
171 ASSERT_EQ(test_data.subscribe_call_count, 0);
174 ASSERT_EQ(test_data.get_codec_private_data_call_count, initialization_attempts);
175 ASSERT_EQ(test_data.get_stream_definition_call_count, initialization_attempts);
176 ASSERT_EQ(test_data.initialize_video_stream_call_count, initialization_attempts);
182 parameter_reader.int_map_.insert({string(
"kinesis_video/stream0/topic_type"), 1});
183 parameter_reader.string_map_.insert({string(
"kinesis_video/stream0/stream_name"),
"stream-name"});
184 parameter_reader.string_map_.insert(
185 {string(
"kinesis_video/stream0/subscription_topic"),
"topic-name"});
186 for (
int idx = 0; idx < initialization_attempts; idx++) {
187 parameter_reader.int_map_.at(
"kinesis_video/stream_count") =
stream_count;
188 unique_ptr<StreamDefinition> stream_definition =
189 real_stream_definition_provider.GetStreamDefinition(
ParameterPath(
""), parameter_reader,
nullptr, 0);
190 test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
194 ASSERT_EQ(test_data.get_codec_private_data_call_count, initialization_attempts);
195 ASSERT_EQ(test_data.get_stream_definition_call_count, initialization_attempts);
196 ASSERT_EQ(test_data.initialize_video_stream_call_count, initialization_attempts);
197 ASSERT_EQ(test_data.subscribe_call_count, initialization_attempts);
198 ASSERT_EQ(test_data.free_stream_call_count, initialization_attempts);
203 const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame_msg)
209 std::string
stream_name,
const sensor_msgs::ImageConstPtr & image)
216 const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame_msg,
224 public ::testing::WithParamInterface<std::tuple<
225 Aws::Kinesis::KinesisVideoFrameTransportCallbackFn, Aws::Kinesis::ImageTransportCallbackFn,
226 Aws::Kinesis::RekognitionEnabledKinesisVideoFrameTransportCallbackFn>>
233 real_subscription_installer->SetupKinesisVideoFrameTransport(std::get<0>(GetParam()));
234 real_subscription_installer->SetupImageTransport(std::get<1>(GetParam()));
235 real_subscription_installer->SetupRekognitionEnabledKinesisVideoFrameTransport(
236 std::get<2>(GetParam()));
239 if (stream_manager)
delete stream_manager;
241 stream_definition_provider, real_subscription_installer);
242 real_subscription_installer->set_stream_manager(stream_manager);
247 delete real_subscription_installer;
255 string subscription_topic_name(
"test0");
256 parameter_reader.int_map_.insert({string(
"kinesis_video/stream_count"), 1});
257 parameter_reader.int_map_.insert(
259 parameter_reader.string_map_.insert(
260 {string(
"kinesis_video/stream0/subscription_topic"), subscription_topic_name});
261 unique_ptr<StreamDefinition> stream_definition =
262 real_stream_definition_provider.GetStreamDefinition(
ParameterPath(
"kinesis_video",
"stream0"),
263 parameter_reader,
nullptr, 0);
264 test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
270 handle.advertise<kinesis_video_msgs::KinesisVideoFrame>(subscription_topic_name,
272 kinesis_video_msgs::KinesisVideoFrame message;
273 message.codec_private_data = {1, 2, 3};
274 for (
int idx = 0; idx < publish_call_count; idx++) {
275 kinesis_video_frame_publisher.
publish(message);
281 publish_call_count ||
282 test_data.put_frame_call_count == publish_call_count);
283 ASSERT_EQ(test_data.image_callback_call_count, 0);
284 if (test_data.put_frame_call_count == publish_call_count) {
285 ASSERT_EQ(test_data.process_codec_private_data_call_count, publish_call_count);
292 subscription_topic_name = string(
"test1");
293 parameter_reader.int_map_.at(
"kinesis_video/stream0/topic_type") =
295 parameter_reader.string_map_.at(
"kinesis_video/stream0/subscription_topic") =
296 subscription_topic_name;
297 stream_definition = real_stream_definition_provider.GetStreamDefinition(
298 ParameterPath(
"kinesis_video",
"stream0"), parameter_reader,
nullptr, 0);
299 test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
300 setup_result = stream_manager->KinesisVideoStreamerSetup();
306 sensor_msgs::Image image_message;
307 for (
int idx = 0; idx < publish_call_count; idx++) {
308 image_publisher.
publish(image_message);
311 ASSERT_EQ(test_data.kinesis_video_frame_callback_call_count, 0);
313 test_data.put_frame_call_count == publish_call_count);
319 string rekognition_results_topic =
"/rekognition/results";
320 subscription_topic_name = string(
"test2");
321 parameter_reader.int_map_.at(
"kinesis_video/stream0/topic_type") =
323 parameter_reader.string_map_.at(
"kinesis_video/stream0/subscription_topic") =
324 subscription_topic_name;
325 parameter_reader.string_map_.insert(
326 {
"kinesis_video/stream0/rekognition_topic_name", rekognition_results_topic});
327 parameter_reader.string_map_.insert(
328 {
"kinesis_video/stream0/rekognition_data_stream",
"kinesis-sample"});
329 stream_definition = real_stream_definition_provider.GetStreamDefinition(
330 ParameterPath(
"kinesis_video",
"stream0"), parameter_reader,
nullptr, 0);
331 test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
332 setup_result = stream_manager->KinesisVideoStreamerSetup();
335 kinesis_video_frame_publisher = handle.advertise<kinesis_video_msgs::KinesisVideoFrame>(
337 message = kinesis_video_msgs::KinesisVideoFrame();
338 message.codec_private_data = {1, 2, 3};
339 for (
int idx = 0; idx < publish_call_count; idx++) {
340 kinesis_video_frame_publisher.
publish(message);
346 test_data.rekognition_kinesis_video_frame_callback_call_count == publish_call_count ||
347 (test_data.put_frame_call_count == publish_call_count &&
348 test_data.fetch_rekognition_results_call_count == publish_call_count));
349 ASSERT_EQ(test_data.image_callback_call_count, 0);
350 if (test_data.put_frame_call_count == publish_call_count) {
351 ASSERT_EQ(test_data.process_codec_private_data_call_count, publish_call_count);
354 string cmd_line =
"rostopic list -v | grep " + rekognition_results_topic;
355 string expected_rostopic_list_output =
356 " * " + rekognition_results_topic +
" [std_msgs/String] 1 publisher\n";
357 shared_ptr<FILE> pipe(popen(cmd_line.c_str(),
"r"), pclose);
358 array<char, 256> buffer;
359 string rostopic_list_output;
360 while (!feof(pipe.get())) {
361 if (
nullptr != fgets(buffer.data(), buffer.size(), pipe.get())) {
362 rostopic_list_output += buffer.data();
365 ASSERT_EQ(rostopic_list_output, expected_rostopic_list_output);
391 TEST(StreamerGlobalSuite, rosParameterConstruction)
394 const char * parameter_name =
"my_param";
395 string kinesis_video_prefix =
"kinesis_video/";
396 string parameter_path_prefix =
397 kinesis_video_prefix + string(
"stream") + to_string(stream_idx);
398 string parameter_path = string(
"/") + parameter_path_prefix + string(parameter_name);
400 ASSERT_EQ(parameter_path_prefix +
string(
"/") +
string(parameter_name),
402 ASSERT_EQ(parameter_path_prefix,
404 ASSERT_EQ(kinesis_video_prefix +
string(parameter_name),
413 int main(
int argc,
char ** argv)
415 testing::InitGoogleTest(&argc, argv);
417 Aws::Utils::Logging::InitializeAWSLogging(
418 Aws::MakeShared<Aws::Utils::Logging::AWSROSLogger>(
"kinesis_video_streamer_test"));
420 queue<rosgraph_msgs::Log> rosout_queue;
423 rosout_queue.push(rosgraph_msgs::Log());
425 ros::init(argc, argv,
"test_kinesis_video_streamer");
428 int ret = RUN_ALL_TESTS();
430 Aws::Utils::Logging::ShutdownAWSLogging();
uint32_t image_callback_call_count
const char * stream_count
void(* KinesisVideoFrameTransportCallbackFn)(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg)
void ImageTransportTestCallback(const KinesisStreamManagerInterface &stream_manager, std::string stream_name, const sensor_msgs::ImageConstPtr &image)
uint32_t rekognition_kinesis_video_frame_callback_call_count
TestParameterReader parameter_reader
constexpr uint32_t kDefaultMessageQueueSize
KINESIS_MANAGER_STATUS_MALLOC_FAILED
INSTANTIATE_TEST_CASE_P(End2EndTest, KinesisVideoStreamerE2ETest,::testing::ValuesIn(callback_tuples))
bool SetupImageTransport(const ImageTransportCallbackFn callback)
void publish(const boost::shared_ptr< M > &message) const
#define KINESIS_MANAGER_STATUS_FAILED(status)
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
MockStreamDefinitionProvider * stream_definition_provider
enum Aws::Kinesis::kinesis_manager_status_e KinesisManagerStatus
StreamDefinitionProvider real_stream_definition_provider
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
Publisher advertise(const std::string &base_topic, uint32_t queue_size, bool latch=false)
#define KINESIS_MANAGER_STATUS_SUCCEEDED(status)
RosStreamSubscriptionInstaller * real_subscription_installer
void rosout_logger_callback(const rosgraph_msgs::Log &published_log)
TEST_F(KinesisVideoStreamerTestBase, sanity)
Aws::Client::ParameterPath GetStreamParameterPrefix(int stream_idx)
ROSCPP_DECL CallbackQueue * getGlobalCallbackQueue()
bool SetupKinesisVideoFrameTransport(const KinesisVideoFrameTransportCallbackFn callback)
void publish(const sensor_msgs::Image &message) const
void(* RekognitionEnabledKinesisVideoFrameTransportCallbackFn)(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg, const ros::Publisher &publisher)
void(* ImageTransportCallbackFn)(const KinesisStreamManagerInterface &stream_manager, std::string stream_name, const sensor_msgs::ImageConstPtr &image)
int main(int argc, char **argv)
KINESIS_MANAGER_STATUS_ERROR_BASE
Aws::Client::ParameterPath GetStreamParameterPath(int stream_idx, const char *parameter_name)
void RekognitionEnabledKinesisVideoFrameTransportCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg, const ros::Publisher &publisher)
MockStreamManager * stream_manager
void KinesisVideoFrameTransportCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg)
MockStreamSubscriptionInstaller * subscription_installer
Aws::Client::ParameterPath GetKinesisVideoParameter(const char *parameter_name)
queue< rosgraph_msgs::Log > * kRosoutQueue
void ImageTransportCallback(const KinesisStreamManagerInterface &stream_manager, std::string stream_name, const sensor_msgs::ImageConstPtr &image)
#define ROS_POSTCALLBACK_ASSERT_TRUE(expr)
Some tests rely on ROS callback processing which may take time. To avoid unnecessary waits we define ...
void RekognitionEnabledKinesisVideoFrameTransportTestCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg, const ros::Publisher &publisher)
constexpr double kSingleCallbackWaitTime
constexpr double kMultipleCallbacksWaitTime
vector< tuple< Aws::Kinesis::KinesisVideoFrameTransportCallbackFn, Aws::Kinesis::ImageTransportCallbackFn, Aws::Kinesis::RekognitionEnabledKinesisVideoFrameTransportCallbackFn > > callback_tuples
TEST_P(KinesisVideoStreamerE2ETest, E2ETest)
void KinesisVideoFrameTransportTestCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg)
uint32_t kinesis_video_frame_callback_call_count
bool SetupRekognitionEnabledKinesisVideoFrameTransport(const RekognitionEnabledKinesisVideoFrameTransportCallbackFn callback)
TEST(StreamerGlobalSuite, rosParameterConstruction)