kinesis_video_streamer_test.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
16 #include <gtest/gtest.h>
19 #include <kinesis_video_msgs/KinesisVideoFrame.h>
23 #include <ros/callback_queue.h>
24 #include <ros/ros.h>
25 #include <rosgraph_msgs/Log.h>
26 
27 #include <queue>
28 
30 
31 /* Maximum time to wait in calls to callOne and callAvailable */
32 constexpr double kSingleCallbackWaitTime = 0.1;
33 constexpr double kMultipleCallbacksWaitTime = 5;
42 #define ROS_POSTCALLBACK_ASSERT_TRUE(expr) \
43  if (!(expr)) { \
44  ros::getGlobalCallbackQueue()->callAvailable(); \
45  if (!(expr)) { \
46  ros::getGlobalCallbackQueue()->callAvailable(ros::WallDuration(kSingleCallbackWaitTime)); \
47  if (!(expr)) { \
48  ros::getGlobalCallbackQueue()->callAvailable( \
49  ros::WallDuration(kMultipleCallbacksWaitTime)); \
50  } \
51  } \
52  } \
53  ASSERT_TRUE(expr)
54 
55 using namespace Aws::Kinesis;
56 using namespace Aws::Utils::Logging;
57 using namespace std;
58 
59 queue<rosgraph_msgs::Log> * kRosoutQueue = nullptr;
60 TestData * kTestData = nullptr;
61 
62 class KinesisVideoStreamerTestBase : public ::testing::Test
63 {
64 protected:
65  void SetUp() override
66  {
67  kTestData = &test_data;
68  stream_definition_provider = new MockStreamDefinitionProvider(&test_data);
69  subscription_installer =
70  new MockStreamSubscriptionInstaller(&test_data, *stream_manager, handle);
71  stream_manager = new MockStreamManager(&test_data, &parameter_reader,
72  stream_definition_provider, subscription_installer);
73  subscription_installer->set_stream_manager(stream_manager);
74  }
75 
76  void TearDown() override
77  {
78  /* Empty callback queue at the end of each test */
80  delete stream_manager;
81  delete stream_definition_provider;
82  delete subscription_installer;
83  }
84 
92 };
93 
98 {
99  int stream_count = 1;
100  parameter_reader.int_map_.insert({string("kinesis_video/stream_count"), stream_count});
101  parameter_reader.int_map_.at("kinesis_video/stream_count") = stream_count;
102  parameter_reader.int_map_.insert({string("kinesis_video/stream0/topic_type"), 1});
103  parameter_reader.string_map_.insert({string("kinesis_video/stream0/stream_name"), "stream-name"});
104  parameter_reader.string_map_.insert(
105  {string("kinesis_video/stream0/subscription_topic"), "topic-name"});
106  unique_ptr<StreamDefinition> stream_definition =
107  real_stream_definition_provider.GetStreamDefinition(ParameterPath(""), parameter_reader, nullptr, 0);
108  test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
109  KinesisManagerStatus setup_result = stream_manager->KinesisVideoStreamerSetup();
110  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(setup_result));
111 }
112 
113 TEST_F(KinesisVideoStreamerTestBase, invalidCallbackShouldReturnFalse)
114 {
115  RosStreamSubscriptionInstaller * real_subscription_installer;
116  real_subscription_installer = new RosStreamSubscriptionInstaller(handle);
117  ASSERT_EQ(real_subscription_installer->SetupImageTransport(nullptr), false);
118  ASSERT_EQ(real_subscription_installer->SetupKinesisVideoFrameTransport(nullptr), false);
119  ASSERT_EQ(real_subscription_installer->SetupRekognitionEnabledKinesisVideoFrameTransport(nullptr), false);
120 }
121 
122 TEST_F(KinesisVideoStreamerTestBase, codecPrivateDataFailure)
123 {
124  int stream_count = 100;
125  parameter_reader.int_map_.insert({string("kinesis_video/stream_count"), stream_count});
129  test_data.Reset();
130  test_data.get_codec_private_data_return_value = KINESIS_MANAGER_STATUS_MALLOC_FAILED;
131  KinesisManagerStatus setup_result = stream_manager->KinesisVideoStreamerSetup();
132  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(setup_result));
133  ASSERT_EQ(test_data.get_codec_private_data_call_count, stream_count);
134  ASSERT_EQ(test_data.get_stream_definition_call_count, 0);
135  ASSERT_EQ(test_data.subscribe_call_count, 0);
136 }
137 
138 TEST_F(KinesisVideoStreamerTestBase, invalidStreamDefinition)
139 {
140  int stream_count = 100;
141  parameter_reader.int_map_.insert({string("kinesis_video/stream_count"), stream_count});
145  test_data.Reset();
146  KinesisManagerStatus setup_result = stream_manager->KinesisVideoStreamerSetup();
147  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(setup_result));
148  ASSERT_EQ(test_data.get_codec_private_data_call_count, stream_count);
149  ASSERT_EQ(test_data.get_stream_definition_call_count, stream_count);
150  ASSERT_EQ(test_data.subscribe_call_count, 0);
151 }
152 
153 TEST_F(KinesisVideoStreamerTestBase, streamInitializationFailures)
154 {
155  int stream_count = 1;
156  int initialization_attempts = 100;
157  parameter_reader.int_map_.insert({string("kinesis_video/stream_count"), stream_count});
161  test_data.Reset();
162  test_data.initialize_video_stream_return_value = KINESIS_MANAGER_STATUS_ERROR_BASE;
163  KinesisManagerStatus setup_result;
164  for (int idx = 0; idx < initialization_attempts; idx++) {
165  parameter_reader.int_map_.at("kinesis_video/stream_count") = stream_count;
166  unique_ptr<StreamDefinition> stream_definition =
167  real_stream_definition_provider.GetStreamDefinition(ParameterPath(""), parameter_reader, nullptr, 0);
168  test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
169  setup_result = stream_manager->KinesisVideoStreamerSetup();
170  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(setup_result));
171  ASSERT_EQ(test_data.subscribe_call_count, 0);
172  }
173 
174  ASSERT_EQ(test_data.get_codec_private_data_call_count, initialization_attempts);
175  ASSERT_EQ(test_data.get_stream_definition_call_count, initialization_attempts);
176  ASSERT_EQ(test_data.initialize_video_stream_call_count, initialization_attempts);
180  test_data.Reset();
181  test_data.subscribe_return_value = KINESIS_MANAGER_STATUS_ERROR_BASE;
182  parameter_reader.int_map_.insert({string("kinesis_video/stream0/topic_type"), 1});
183  parameter_reader.string_map_.insert({string("kinesis_video/stream0/stream_name"), "stream-name"});
184  parameter_reader.string_map_.insert(
185  {string("kinesis_video/stream0/subscription_topic"), "topic-name"});
186  for (int idx = 0; idx < initialization_attempts; idx++) {
187  parameter_reader.int_map_.at("kinesis_video/stream_count") = stream_count;
188  unique_ptr<StreamDefinition> stream_definition =
189  real_stream_definition_provider.GetStreamDefinition(ParameterPath(""), parameter_reader, nullptr, 0);
190  test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
191  KinesisManagerStatus setup_result = stream_manager->KinesisVideoStreamerSetup();
192  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(setup_result));
193  }
194  ASSERT_EQ(test_data.get_codec_private_data_call_count, initialization_attempts);
195  ASSERT_EQ(test_data.get_stream_definition_call_count, initialization_attempts);
196  ASSERT_EQ(test_data.initialize_video_stream_call_count, initialization_attempts);
197  ASSERT_EQ(test_data.subscribe_call_count, initialization_attempts);
198  ASSERT_EQ(test_data.free_stream_call_count, initialization_attempts);
199 }
200 
202  KinesisStreamManagerInterface & stream_manager, std::string stream_name,
203  const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame_msg)
204 {
206 }
207 
209  std::string stream_name, const sensor_msgs::ImageConstPtr & image)
210 {
211  kTestData->image_callback_call_count++;
212 }
213 
215  KinesisStreamManagerInterface & stream_manager, std::string stream_name,
216  const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame_msg,
217  const ros::Publisher & publisher)
218 {
220 }
221 
224  public ::testing::WithParamInterface<std::tuple<
225  Aws::Kinesis::KinesisVideoFrameTransportCallbackFn, Aws::Kinesis::ImageTransportCallbackFn,
226  Aws::Kinesis::RekognitionEnabledKinesisVideoFrameTransportCallbackFn>>
227 {
228 protected:
229  void SetUp()
230  {
232  real_subscription_installer = new RosStreamSubscriptionInstaller(handle);
233  real_subscription_installer->SetupKinesisVideoFrameTransport(std::get<0>(GetParam()));
234  real_subscription_installer->SetupImageTransport(std::get<1>(GetParam()));
235  real_subscription_installer->SetupRekognitionEnabledKinesisVideoFrameTransport(
236  std::get<2>(GetParam()));
237 
238  parameter_reader = TestParameterReader("kinesis_video/stream0/");
239  if (stream_manager) delete stream_manager;
240  stream_manager = new MockStreamManager(&test_data, &parameter_reader,
241  stream_definition_provider, real_subscription_installer);
242  real_subscription_installer->set_stream_manager(stream_manager);
243  }
244  void TearDown()
245  {
247  delete real_subscription_installer;
248  }
249 
250  void RunTest()
251  {
255  string subscription_topic_name("test0");
256  parameter_reader.int_map_.insert({string("kinesis_video/stream_count"), 1});
257  parameter_reader.int_map_.insert(
258  {string("kinesis_video/stream0/topic_type"), KINESIS_STREAM_INPUT_TYPE_KINESIS_VIDEO_FRAME});
259  parameter_reader.string_map_.insert(
260  {string("kinesis_video/stream0/subscription_topic"), subscription_topic_name});
261  unique_ptr<StreamDefinition> stream_definition =
262  real_stream_definition_provider.GetStreamDefinition(ParameterPath("kinesis_video", "stream0"),
263  parameter_reader, nullptr, 0);
264  test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
265  KinesisManagerStatus setup_result = stream_manager->KinesisVideoStreamerSetup();
266  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(setup_result));
267 
268  int publish_call_count = kDefaultMessageQueueSize;
269  ros::Publisher kinesis_video_frame_publisher =
270  handle.advertise<kinesis_video_msgs::KinesisVideoFrame>(subscription_topic_name,
272  kinesis_video_msgs::KinesisVideoFrame message;
273  message.codec_private_data = {1, 2, 3};
274  for (int idx = 0; idx < publish_call_count; idx++) {
275  kinesis_video_frame_publisher.publish(message);
277  }
278  /* One of these will hold true depending on whether we're dealing with the mocked or the real
279  * callbacks. */
280  ROS_POSTCALLBACK_ASSERT_TRUE(test_data.kinesis_video_frame_callback_call_count ==
281  publish_call_count ||
282  test_data.put_frame_call_count == publish_call_count);
283  ASSERT_EQ(test_data.image_callback_call_count, 0);
284  if (test_data.put_frame_call_count == publish_call_count) {
285  ASSERT_EQ(test_data.process_codec_private_data_call_count, publish_call_count);
286  }
287 
291  test_data.Reset();
292  subscription_topic_name = string("test1");
293  parameter_reader.int_map_.at("kinesis_video/stream0/topic_type") =
295  parameter_reader.string_map_.at("kinesis_video/stream0/subscription_topic") =
296  subscription_topic_name;
297  stream_definition = real_stream_definition_provider.GetStreamDefinition(
298  ParameterPath("kinesis_video", "stream0"), parameter_reader, nullptr, 0);
299  test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
300  setup_result = stream_manager->KinesisVideoStreamerSetup();
301  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(setup_result));
302 
304  image_transport::Publisher image_publisher =
305  it.advertise(subscription_topic_name, kDefaultMessageQueueSize);
306  sensor_msgs::Image image_message;
307  for (int idx = 0; idx < publish_call_count; idx++) {
308  image_publisher.publish(image_message);
310  }
311  ASSERT_EQ(test_data.kinesis_video_frame_callback_call_count, 0);
312  ROS_POSTCALLBACK_ASSERT_TRUE(test_data.image_callback_call_count == publish_call_count ||
313  test_data.put_frame_call_count == publish_call_count);
314 
318  test_data.Reset();
319  string rekognition_results_topic = "/rekognition/results";
320  subscription_topic_name = string("test2");
321  parameter_reader.int_map_.at("kinesis_video/stream0/topic_type") =
323  parameter_reader.string_map_.at("kinesis_video/stream0/subscription_topic") =
324  subscription_topic_name;
325  parameter_reader.string_map_.insert(
326  {"kinesis_video/stream0/rekognition_topic_name", rekognition_results_topic});
327  parameter_reader.string_map_.insert(
328  {"kinesis_video/stream0/rekognition_data_stream", "kinesis-sample"});
329  stream_definition = real_stream_definition_provider.GetStreamDefinition(
330  ParameterPath("kinesis_video", "stream0"), parameter_reader, nullptr, 0);
331  test_data.get_stream_definition_return_value = (StreamDefinition *)stream_definition.release();
332  setup_result = stream_manager->KinesisVideoStreamerSetup();
333  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(setup_result));
334 
335  kinesis_video_frame_publisher = handle.advertise<kinesis_video_msgs::KinesisVideoFrame>(
336  subscription_topic_name, kDefaultMessageQueueSize);
337  message = kinesis_video_msgs::KinesisVideoFrame();
338  message.codec_private_data = {1, 2, 3};
339  for (int idx = 0; idx < publish_call_count; idx++) {
340  kinesis_video_frame_publisher.publish(message);
342  }
343  /* One of these will hold true depending on whether we're dealing with the mocked or the real
344  * callbacks. */
346  test_data.rekognition_kinesis_video_frame_callback_call_count == publish_call_count ||
347  (test_data.put_frame_call_count == publish_call_count &&
348  test_data.fetch_rekognition_results_call_count == publish_call_count));
349  ASSERT_EQ(test_data.image_callback_call_count, 0);
350  if (test_data.put_frame_call_count == publish_call_count) {
351  ASSERT_EQ(test_data.process_codec_private_data_call_count, publish_call_count);
352  }
353  /* Check that a publisher to rekognition_results_topic has been created */
354  string cmd_line = "rostopic list -v | grep " + rekognition_results_topic;
355  string expected_rostopic_list_output =
356  " * " + rekognition_results_topic + " [std_msgs/String] 1 publisher\n";
357  shared_ptr<FILE> pipe(popen(cmd_line.c_str(), "r"), pclose);
358  array<char, 256> buffer;
359  string rostopic_list_output;
360  while (!feof(pipe.get())) {
361  if (nullptr != fgets(buffer.data(), buffer.size(), pipe.get())) {
362  rostopic_list_output += buffer.data();
363  }
364  }
365  ASSERT_EQ(rostopic_list_output, expected_rostopic_list_output);
366  }
367 
369 };
370 
371 TEST_P(KinesisVideoStreamerE2ETest, E2ETest) { RunTest(); }
372 
373 vector<
389  ::testing::ValuesIn(callback_tuples));
390 
391 TEST(StreamerGlobalSuite, rosParameterConstruction)
392 {
393  int stream_idx = 7;
394  const char * parameter_name = "my_param";
395  string kinesis_video_prefix = "kinesis_video/";
396  string parameter_path_prefix =
397  kinesis_video_prefix + string("stream") + to_string(stream_idx);
398  string parameter_path = string("/") + parameter_path_prefix + string(parameter_name);
399 
400  ASSERT_EQ(parameter_path_prefix + string("/") + string(parameter_name),
401  Aws::Kinesis::GetStreamParameterPath(stream_idx, parameter_name).get_resolved_path('/', '/'));
402  ASSERT_EQ(parameter_path_prefix,
403  Aws::Kinesis::GetStreamParameterPrefix(stream_idx).get_resolved_path('/', '/'));
404  ASSERT_EQ(kinesis_video_prefix + string(parameter_name),
405  Aws::Kinesis::GetKinesisVideoParameter(parameter_name).get_resolved_path('/', '/'));
406 }
407 
408 void rosout_logger_callback(const rosgraph_msgs::Log & published_log)
409 {
410  kRosoutQueue->push(published_log);
411 }
412 
413 int main(int argc, char ** argv)
414 {
415  testing::InitGoogleTest(&argc, argv);
416 
417  Aws::Utils::Logging::InitializeAWSLogging(
418  Aws::MakeShared<Aws::Utils::Logging::AWSROSLogger>("kinesis_video_streamer_test"));
419 
420  queue<rosgraph_msgs::Log> rosout_queue;
421  /* Bootstrap the queue with an empty message so we can access .back without checking if it exists.
422  */
423  rosout_queue.push(rosgraph_msgs::Log());
424  kRosoutQueue = &rosout_queue;
425  ros::init(argc, argv, "test_kinesis_video_streamer");
426  ros::NodeHandle n;
427  ros::Subscriber sub = n.subscribe("/rosout", 1, rosout_logger_callback);
428  int ret = RUN_ALL_TESTS();
429 
430  Aws::Utils::Logging::ShutdownAWSLogging();
431 
432  return ret;
433 }
const char * stream_count
void(* KinesisVideoFrameTransportCallbackFn)(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg)
void ImageTransportTestCallback(const KinesisStreamManagerInterface &stream_manager, std::string stream_name, const sensor_msgs::ImageConstPtr &image)
uint32_t rekognition_kinesis_video_frame_callback_call_count
constexpr uint32_t kDefaultMessageQueueSize
KINESIS_MANAGER_STATUS_MALLOC_FAILED
INSTANTIATE_TEST_CASE_P(End2EndTest, KinesisVideoStreamerE2ETest,::testing::ValuesIn(callback_tuples))
bool SetupImageTransport(const ImageTransportCallbackFn callback)
void publish(const boost::shared_ptr< M > &message) const
#define KINESIS_MANAGER_STATUS_FAILED(status)
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
MockStreamDefinitionProvider * stream_definition_provider
enum Aws::Kinesis::kinesis_manager_status_e KinesisManagerStatus
StreamDefinitionProvider real_stream_definition_provider
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
Publisher advertise(const std::string &base_topic, uint32_t queue_size, bool latch=false)
CallOneResult callOne()
#define KINESIS_MANAGER_STATUS_SUCCEEDED(status)
RosStreamSubscriptionInstaller * real_subscription_installer
void rosout_logger_callback(const rosgraph_msgs::Log &published_log)
TEST_F(KinesisVideoStreamerTestBase, sanity)
Aws::Client::ParameterPath GetStreamParameterPrefix(int stream_idx)
ROSCPP_DECL CallbackQueue * getGlobalCallbackQueue()
bool SetupKinesisVideoFrameTransport(const KinesisVideoFrameTransportCallbackFn callback)
TestData * kTestData
void publish(const sensor_msgs::Image &message) const
void(* RekognitionEnabledKinesisVideoFrameTransportCallbackFn)(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg, const ros::Publisher &publisher)
void(* ImageTransportCallbackFn)(const KinesisStreamManagerInterface &stream_manager, std::string stream_name, const sensor_msgs::ImageConstPtr &image)
const char * stream_name
int main(int argc, char **argv)
KINESIS_MANAGER_STATUS_ERROR_BASE
Aws::Client::ParameterPath GetStreamParameterPath(int stream_idx, const char *parameter_name)
void RekognitionEnabledKinesisVideoFrameTransportCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg, const ros::Publisher &publisher)
void KinesisVideoFrameTransportCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg)
MockStreamSubscriptionInstaller * subscription_installer
Aws::Client::ParameterPath GetKinesisVideoParameter(const char *parameter_name)
queue< rosgraph_msgs::Log > * kRosoutQueue
void ImageTransportCallback(const KinesisStreamManagerInterface &stream_manager, std::string stream_name, const sensor_msgs::ImageConstPtr &image)
#define ROS_POSTCALLBACK_ASSERT_TRUE(expr)
Some tests rely on ROS callback processing which may take time. To avoid unnecessary waits we define ...
void RekognitionEnabledKinesisVideoFrameTransportTestCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg, const ros::Publisher &publisher)
constexpr double kSingleCallbackWaitTime
constexpr double kMultipleCallbacksWaitTime
vector< tuple< Aws::Kinesis::KinesisVideoFrameTransportCallbackFn, Aws::Kinesis::ImageTransportCallbackFn, Aws::Kinesis::RekognitionEnabledKinesisVideoFrameTransportCallbackFn > > callback_tuples
TEST_P(KinesisVideoStreamerE2ETest, E2ETest)
void KinesisVideoFrameTransportTestCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg)
uint32_t kinesis_video_frame_callback_call_count
bool SetupRekognitionEnabledKinesisVideoFrameTransport(const RekognitionEnabledKinesisVideoFrameTransportCallbackFn callback)
TEST(StreamerGlobalSuite, rosParameterConstruction)


kinesis_video_streamer
Author(s): AWS RoboMaker
autogenerated on Fri Mar 5 2021 03:29:15