kinesis_manager_test.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #include <aws/core/Aws.h>
18 #include <gtest/gtest.h>
19 #include <gmock/gmock.h>
20 #include <kinesis-video-producer/KinesisVideoProducer.h>
21 #include <kinesis-video-producer/Logger.h>
22 #include <kinesis_manager/common.h>
25 #include <aws/kinesis/model/GetShardIteratorRequest.h>
26 #include <aws/kinesis/model/ListShardsRequest.h>
27 
28 using namespace std;
29 using namespace com::amazonaws::kinesis::video;
30 using namespace Aws;
31 using namespace Aws::Client;
32 using namespace Aws::Kinesis;
33 
34 LOGGER_TAG("aws.kinesis.kinesis_manager_unittest");
35 
36 #define PARAM_NS_SEPARATOR "/"
37 #define PARAM_NS_SEPARATOR_CHAR '/'
38 
39 using namespace std;
40 using namespace std::chrono;
41 using namespace Aws;
42 using namespace Aws::Kinesis;
43 using namespace com::amazonaws::kinesis::video;
44 using ::testing::NiceMock;
45 using ::testing::_;
46 using ::testing::A;
47 using ::testing::Return;
48 using ::testing::Eq;
49 using ::testing::StrEq;
50 using ::testing::InSequence;
51 using ::testing::DoAll;
52 using ::testing::SetArgReferee;
53 using ::testing::ContainerEq;
54 using Aws::AwsError;
55 
60 {
61 public:
62  TestParameterReader(string test_prefix)
63  {
64  int_map_ = {
65  {test_prefix + PARAM_NS_SEPARATOR "retention_period", 2},
66  {test_prefix + PARAM_NS_SEPARATOR "streaming_type", 0},
67  {test_prefix + PARAM_NS_SEPARATOR "max_latency", 0},
68  {test_prefix + PARAM_NS_SEPARATOR "fragment_duration", 2},
69  {test_prefix + PARAM_NS_SEPARATOR "timecode_scale", 1},
70  {test_prefix + PARAM_NS_SEPARATOR "nal_adaptation_flags",
71  NAL_ADAPTATION_ANNEXB_NALS | NAL_ADAPTATION_ANNEXB_CPD_NALS},
72  {test_prefix + PARAM_NS_SEPARATOR"frame_rate", 24},
73  {test_prefix + PARAM_NS_SEPARATOR "avg_bandwidth_bps", 4 * 1024 * 1024},
74  {test_prefix + PARAM_NS_SEPARATOR "buffer_duration", 120},
75  {test_prefix + PARAM_NS_SEPARATOR "replay_duration", 40},
76  {test_prefix + PARAM_NS_SEPARATOR "connection_staleness", 30},
77  };
78  bool_map_ = {
79  {test_prefix + PARAM_NS_SEPARATOR "key_frame_fragmentation", true},
80  {test_prefix + PARAM_NS_SEPARATOR "frame_timecodes", true},
81  {test_prefix + PARAM_NS_SEPARATOR "absolute_fragment_time", true},
82  {test_prefix + PARAM_NS_SEPARATOR "fragment_acks", true},
83  {test_prefix + PARAM_NS_SEPARATOR "restart_on_error", true},
84  {test_prefix + PARAM_NS_SEPARATOR "recalculate_metrics", true},
85  };
86  string_map_ = {
87  {test_prefix + PARAM_NS_SEPARATOR "stream_name", "testStream"},
88  {test_prefix + PARAM_NS_SEPARATOR "kms_key_id", ""},
89  {test_prefix + PARAM_NS_SEPARATOR "content_type", "video/h264"},
90  {test_prefix + PARAM_NS_SEPARATOR "codec_id", "V_MPEG4/ISO/AVC"},
91  {test_prefix + PARAM_NS_SEPARATOR "track_name", "kinesis_video"},
92  };
93  map_map_ = {
94  {test_prefix + PARAM_NS_SEPARATOR "tags", {{"someKey", "someValue"}}},
95  };
96  }
97 
98  TestParameterReader(const vector<string> & test_prefix)
99  {
100  TestParameterReader(Join(test_prefix));
101  }
102 
103  TestParameterReader(map<string, int> int_map, map<string, bool> bool_map,
104  map<string, string> string_map, map<string, map<string, string>> map_map)
105  : int_map_(int_map), bool_map_(bool_map), string_map_(string_map), map_map_(map_map)
106  {
107  }
108 
109  string Join(const vector<string> & test_prefix)
110  {
111  string expanded;
112  for (auto item = test_prefix.begin(); item != test_prefix.end(); item++) {
113  expanded += *item + PARAM_NS_SEPARATOR;
114  }
115  if (!expanded.empty() && expanded.back() == PARAM_NS_SEPARATOR_CHAR) {
116  expanded.pop_back();
117  }
118  return expanded;
119  }
120 
121  AwsError ReadParam(const ParameterPath & param_path, int & out) const
122  {
123  AwsError result = AWS_ERR_NOT_FOUND;
124  std::string name = FormatParameterPath(param_path);
125  if (int_map_.count(name) > 0) {
126  out = int_map_.at(name);
127  result = AWS_ERR_OK;
128  }
129  return result;
130  }
131 
132  AwsError ReadParam(const ParameterPath & param_path, bool & out) const
133  {
134  AwsError result = AWS_ERR_NOT_FOUND;
135  std::string name = FormatParameterPath(param_path);
136  if (bool_map_.count(name) > 0) {
137  out = bool_map_.at(name);
138  result = AWS_ERR_OK;
139  }
140  return result;
141  }
142 
143  AwsError ReadParam(const ParameterPath & param_path, string & out) const
144  {
145  AwsError result = AWS_ERR_NOT_FOUND;
146  std::string name = FormatParameterPath(param_path);
147  if (string_map_.count(name) > 0) {
148  out = string_map_.at(name);
149  result = AWS_ERR_OK;
150  }
151  return result;
152  }
153 
154  AwsError ReadParam(const ParameterPath & param_path, Aws::String & out) const
155  {
156  return AWS_ERR_EMPTY;
157  }
158 
159  AwsError ReadParam(const ParameterPath & param_path, map<string, string> & out) const
160  {
161  AwsError result = AWS_ERR_NOT_FOUND;
162  std::string name = FormatParameterPath(param_path);
163  if (map_map_.count(name) > 0) {
164  out = map_map_.at(name);
165  result = AWS_ERR_OK;
166  }
167  return result;
168  }
169 
170  AwsError ReadParam(const ParameterPath & param_path, std::vector<std::string> & out) const
171  {
172  return AWS_ERR_EMPTY;
173  }
174 
175  AwsError ReadParam(const ParameterPath & param_path, double & out) const { return AWS_ERR_EMPTY; }
176 
177  map<string, int> int_map_;
178  map<string, bool> bool_map_;
179  map<string, string> string_map_;
180  map<string, map<string, string>> map_map_;
181 
182 
183  static string DoFormatParameterPath(const ParameterPath & param_path)
184  {
186  }
187 
188 private:
189  string FormatParameterPath(const ParameterPath & param_path) const
190  {
191  return DoFormatParameterPath(param_path);
192  }
193 };
194 
201 static bool are_streams_equivalent(unique_ptr<StreamDefinition> stream1,
202  unique_ptr<StreamDefinition> stream2)
203 {
204  bool result = true;
205  StreamInfo stream1_info = stream1->getStreamInfo();
206  StreamInfo stream2_info = stream2->getStreamInfo();
210  if (stream1_info.streamCaps.trackInfoList[0].codecPrivateDataSize !=
211  stream2_info.streamCaps.trackInfoList[0].codecPrivateDataSize) {
212  return false;
213  } else {
214  result &= (0 == memcmp((void *)&(stream1_info.streamCaps.trackInfoList[0].codecPrivateData),
215  (void *)&(stream2_info.streamCaps.trackInfoList[0].codecPrivateData),
216  stream1_info.streamCaps.trackInfoList[0].codecPrivateDataSize));
217  }
218  if (stream1_info.tagCount != stream2_info.tagCount) {
219  return false;
220  } else {
221  for (int tag_idx = 0; tag_idx < stream1_info.tagCount; tag_idx++) {
222  result &= (stream1_info.tags[tag_idx].version == stream2_info.tags[tag_idx].version);
223  result &= (0 == strncmp(stream1_info.tags[tag_idx].name, stream2_info.tags[tag_idx].name,
224  MAX_TAG_NAME_LEN));
225  result &= (0 == strncmp(stream1_info.tags[tag_idx].value, stream2_info.tags[tag_idx].value,
226  MAX_TAG_VALUE_LEN));
227  }
228  }
232  stream1_info.streamCaps.trackInfoList = nullptr;
233  stream2_info.streamCaps.trackInfoList = nullptr;
234  stream1_info.tags = nullptr;
235  stream2_info.tags = nullptr;
236  result &= (0 == memcmp((void *)&(stream1_info), (void *)&(stream2_info), sizeof(stream1_info)));
237  return result;
238 }
239 
243 unique_ptr<StreamDefinition> DefaultProducerSetup(
244  KinesisStreamManager & stream_manager,
245  string region, string test_prefix, std::shared_ptr<ParameterReaderInterface> parameter_reader,
247 {
248 #ifdef PLATFORM_TESTING_ACCESS_KEY
249  setenv("AWS_ACCESS_KEY_ID", PLATFORM_TESTING_ACCESS_KEY, 1);
250 #endif
251 #ifdef PLATFORM_TESTING_SECRET_KEY
252  setenv("AWS_SECRET_ACCESS_KEY", PLATFORM_TESTING_SECRET_KEY, 1);
253 #endif
254  stream_manager.InitializeVideoProducer(region, video_producer_factory);
255 
256  StreamDefinitionProvider stream_definition_provider;
257  unique_ptr<StreamDefinition> stream_definition = stream_definition_provider.GetStreamDefinition(
258  ParameterPath(test_prefix.c_str()), *parameter_reader, nullptr, 0);
259  return move(stream_definition);
260 }
261 
265 unique_ptr<StreamDefinition> DefaultProducerSetup(
266  KinesisStreamManager & stream_manager,
267  string region, string test_prefix,
269 {
270  std::shared_ptr<ParameterReaderInterface> parameter_reader =
271  std::make_shared<TestParameterReader>(test_prefix);
272  return DefaultProducerSetup(stream_manager, region, test_prefix, parameter_reader, video_producer_factory);
273 }
274 
278 class KinesisClientMock : public KinesisClient
279 {
280 public:
281  MOCK_CONST_METHOD0(GetServiceClientName, const char *());
282  MOCK_CONST_METHOD1(ListShards, Model::ListShardsOutcome(const Model::ListShardsRequest&));
283  MOCK_CONST_METHOD1(GetShardIterator,
284  Model::GetShardIteratorOutcome(const Model::GetShardIteratorRequest&));
285  MOCK_CONST_METHOD1(GetRecords,
286  Model::GetRecordsOutcome(const Model::GetRecordsRequest&));
287 };
288 
290 {
291 public:
292  MOCK_CONST_METHOD0(IsReady, bool());
293  MOCK_METHOD0(Stop, bool());
294  MOCK_CONST_METHOD1(PutFrame, bool(KinesisVideoFrame));
295  MOCK_METHOD3(PutFragmentMetadata, bool(const std::string&, const std::string&, bool));
296 };
297 
299 {
300 public:
301  std::shared_ptr<KinesisVideoStreamInterface> CreateStreamSync(std::unique_ptr<StreamDefinition> stream_definition) {
302  return CreateStreamSyncProxy(stream_definition.get());
303  }
304  MOCK_METHOD1(CreateStreamSyncProxy,
305  std::shared_ptr<KinesisVideoStreamInterface>(StreamDefinition* stream_definition));
306  MOCK_METHOD1(FreeStream, void(std::shared_ptr<KinesisVideoStreamInterface> kinesis_video_stream));
307 };
308 
309 namespace Aws {
310 namespace Kinesis {
311 
312 namespace Model
313 {
314  bool operator==(const Record & left, const Record & right)
315  {
316  bool result = true;
317 
318  result &= (left.GetSequenceNumber() == right.GetSequenceNumber());
319  result &= (left.GetApproximateArrivalTimestamp() == right.GetApproximateArrivalTimestamp());
320  result &= (left.GetData() == right.GetData());
321  result &= (left.GetPartitionKey() == right.GetPartitionKey());
322  result &= (left.GetEncryptionType() == right.GetEncryptionType());
323 
324  return true;
325  }
326 
327 } // namespace Model
328 } // namespace Kinesis
329 } // namespace Aws
330 
332 {
333 public:
334  MOCK_CONST_METHOD1(Install, KinesisManagerStatus(const StreamSubscriptionDescriptor & descriptor));
335  MOCK_METHOD1(Uninstall, void(const std::string & topic_name));
336 };
337 
339 {
340 public:
341  MOCK_CONST_METHOD4(GetCodecPrivateData,
342  KinesisManagerStatus(const ParameterPath &, const ParameterReaderInterface &, PBYTE *, uint32_t *));
343 };
344 
346 {
347 public:
348  MOCK_CONST_METHOD4(GetCodecPrivateData,
349  KinesisManagerStatus(const ParameterPath &, const ParameterReaderInterface &, PBYTE *, uint32_t *));
350 
351  MOCK_CONST_METHOD4(GetStreamDefinitionProxy,
352  StreamDefinition*(const ParameterPath &, const ParameterReaderInterface &, const PBYTE, uint32_t));
353 
354  unique_ptr<StreamDefinition> GetStreamDefinition(const ParameterPath & prefix,
355  const ParameterReaderInterface & reader, const PBYTE codec_private_data,
356  uint32_t codec_private_data_size) const override
357  {
358  StreamDefinition* stream_definition = GetStreamDefinitionProxy(prefix, reader,
359  codec_private_data, codec_private_data_size);
360  return std::unique_ptr<StreamDefinition>(stream_definition);
361  }
362 };
363 
365  unique_ptr<KinesisVideoProducerInterface> video_producer)
366  {
367  return [& video_producer](
368  std::string region,
369  unique_ptr<com::amazonaws::kinesis::video::DeviceInfoProvider> device_info_provider,
370  unique_ptr<com::amazonaws::kinesis::video::ClientCallbackProvider> client_callback_provider,
371  unique_ptr<com::amazonaws::kinesis::video::StreamCallbackProvider> stream_callback_provider,
372  unique_ptr<com::amazonaws::kinesis::video::CredentialProvider> credential_provider
373  ) -> unique_ptr<KinesisVideoProducerInterface> {
374  return std::move(video_producer);
375  };
376  }
377 
378 class KinesisStreamManagerMockingFixture : public ::testing::Test
379 {
380 public:
382  {
383  parameter_reader_ = std::make_shared<TestParameterReader>(int_map_, bool_map_, string_map_, map_map_);
384  }
385 
386 protected:
387  string test_prefix_ = "some/test/prefix";
388  string encoded_string_ = "aGVsbG8gd29ybGQ=";
389  map<string, int> int_map_ = {};
390  map<string, bool> bool_map_ = {};
391  map<string, string> tags_;
392  map<string, map<string, string>> map_map_ = {};
393  map<string, string> string_map_ = {
394  {test_prefix_ + "codecPrivateData", encoded_string_},
395  };
396 
397  std::shared_ptr<ParameterReaderInterface> parameter_reader_;
398 
401 };
402 
403 TEST_F(KinesisStreamManagerMockingFixture, testPutMetadataNotInitialized)
404 {
405  std::unique_ptr<NiceMock<KinesisClientMock>> kinesis_client = std::unique_ptr<NiceMock<KinesisClientMock>>{};
406  KinesisStreamManager stream_manager(parameter_reader_.get(), & stream_definition_provider_,
407  & subscription_installer_, std::move(kinesis_client));
408  std::string stream_name = "stream_name1";
409  std::string metadata_name = "metadata_name";
410  std::string metadata_value = "metadata_value";
411 
412  auto status = stream_manager.PutMetadata(stream_name, metadata_name, metadata_value);
413 
414  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status));
415 }
416 
417 TEST_F(KinesisStreamManagerMockingFixture, testPutMetadataStreamNotReady)
418 {
419  KinesisStreamManager stream_manager;
420  std::string test_prefix = "kinesis_video";
421  std::shared_ptr<ParameterReaderInterface> parameter_reader = std::make_shared<TestParameterReader>(test_prefix);
422  std::string stream_name;
423  parameter_reader->ReadParam(GetKinesisVideoParameter(kStreamParameters.stream_name), stream_name);
424  std::string metadata_name = "metadata_name";
425  std::string metadata_value = "metadata_value";
426  auto video_producer = std::make_unique<KinesisVideoProducerMock>();
427  auto video_stream_mock = std::make_shared<KinesisVideoStreamMock>();
428 
429  EXPECT_CALL(*video_producer.get(), CreateStreamSyncProxy(_))
430  .WillOnce(Return(video_stream_mock));
431  EXPECT_CALL(*video_stream_mock, IsReady())
432  .WillOnce(Return(false));
433 
434  auto stream_definition = DefaultProducerSetup(stream_manager, std::string("us-west-2"), test_prefix,
435  ConstVideoProducerFactory(std::move(video_producer)));
436 
437  auto status = stream_manager.InitializeVideoStream(std::move(stream_definition));
438  EXPECT_EQ(KINESIS_MANAGER_STATUS_SUCCESS, status);
439 
440  status = stream_manager.PutMetadata(stream_name, metadata_name, metadata_value);
442 }
443 
445 {
446  KinesisStreamManager stream_manager;
447  std::string test_prefix = "kinesis_video";
448  std::shared_ptr<ParameterReaderInterface> parameter_reader = std::make_shared<TestParameterReader>(test_prefix);
449  std::string stream_name;
450  parameter_reader->ReadParam(GetKinesisVideoParameter(kStreamParameters.stream_name), stream_name);
451  std::string metadata_name = "metadata_name";
452  std::string metadata_value = "metadata_value";
453  auto video_producer = std::make_unique<KinesisVideoProducerMock>();
454  auto video_stream_mock = std::make_shared<KinesisVideoStreamMock>();
455 
456  EXPECT_CALL(*video_producer.get(), CreateStreamSyncProxy(_))
457  .WillOnce(Return(video_stream_mock));
458 
459  auto status = stream_manager.InitializeVideoProducer(string("us-west-2"),
460  ConstVideoProducerFactory(std::move(video_producer)));
461  EXPECT_EQ(KINESIS_MANAGER_STATUS_SUCCESS, status);
462 
463  ON_CALL(*video_stream_mock, IsReady())
464  .WillByDefault(Return(true));
465  {
466  InSequence video_stream_mock_seq;
467 
468  EXPECT_CALL(*video_stream_mock, PutFragmentMetadata(StrEq(metadata_name), StrEq(metadata_value), _))
469  .WillOnce(Return(false));
470 
471  EXPECT_CALL(*video_stream_mock, PutFragmentMetadata(StrEq(metadata_name), StrEq(metadata_value), _))
472  .WillOnce(Return(true));
473  }
474 
475  auto stream_definition = DefaultProducerSetup(stream_manager, std::string("us-west-2"), test_prefix,
476  ConstVideoProducerFactory(std::move(video_producer)));
477  status = stream_manager.InitializeVideoStream(std::move(stream_definition));
478  EXPECT_EQ(KINESIS_MANAGER_STATUS_SUCCESS, status);
479 
480  status = stream_manager.PutMetadata(stream_name, metadata_name, metadata_value);
481  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status));
482  status = stream_manager.PutMetadata(stream_name, metadata_name, metadata_value);
483  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(status));
484 }
485 
487 {
488  KinesisStreamManager stream_manager;
489  std::string test_prefix = "kinesis_video";
490  std::shared_ptr<ParameterReaderInterface> parameter_reader = std::make_shared<TestParameterReader>(test_prefix);
491  std::string stream_name;
492  parameter_reader->ReadParam(GetKinesisVideoParameter(kStreamParameters.stream_name), stream_name);
493  auto video_producer = std::make_unique<KinesisVideoProducerMock>();
494  auto video_stream_mock = std::make_shared<KinesisVideoStreamMock>();
495 
496  EXPECT_CALL(*video_producer.get(), CreateStreamSyncProxy(_))
497  .WillOnce(Return(video_stream_mock));
498  EXPECT_CALL(*video_producer.get(), FreeStream(_)).Times(1);
499 
500  auto stream_definition = DefaultProducerSetup(stream_manager, std::string("us-west-2"), test_prefix,
501  ConstVideoProducerFactory(std::move(video_producer)));
502  auto status = stream_manager.InitializeVideoStream(std::move(stream_definition));
503  EXPECT_EQ(KINESIS_MANAGER_STATUS_SUCCESS, status);
504 
505  ON_CALL(*video_stream_mock, IsReady())
506  .WillByDefault(Return(true));
507  EXPECT_CALL(*video_stream_mock, Stop()).Times(1);
508 
509  stream_manager.FreeStream(stream_name);
510 }
511 
512 TEST_F(KinesisStreamManagerMockingFixture, testProcessCodecPrivateDataForStreamKinesisVideoStreamSetupFailure)
513 {
514  std::string stream_name = "stream_name1";
515  std::string topic_name = "topic1";
516  std::vector<uint8_t> codec_private_data = {1,2,3};
517  int stream_idx = 0;
518  int stream_count_param = 1;
519  map<string, int> int_map = {
521  stream_count_param}
522  };
523  map<string, bool> bool_map;
524  map<string, string> string_map = {
525  {TestParameterReader::DoFormatParameterPath(GetStreamParameterPath(stream_idx, kStreamParameters.stream_name)),
526  stream_name},
527  {TestParameterReader::DoFormatParameterPath(GetStreamParameterPath(stream_idx, kStreamParameters.topic_name)),
528  topic_name}
529  };
530  map<string, map<string, string>> map_map;
531  TestParameterReader parameter_reader(int_map, bool_map, string_map, map_map);
532 
533  StreamDefinitionProviderFullMock stream_definition_provider;
534  std::unique_ptr<NiceMock<KinesisClientMock>> kinesis_client = std::unique_ptr<NiceMock<KinesisClientMock>>{};
535  KinesisStreamManager stream_manager(&parameter_reader,
536  & stream_definition_provider, & subscription_installer_, std::move(kinesis_client));
537 
538  // force failure on KinesisVideoStreamSetup, and thus recovery path
539  EXPECT_CALL(stream_definition_provider, GetStreamDefinitionProxy(_,_,_,_))
540  .WillOnce(Return(nullptr));
541 
542  EXPECT_CALL(subscription_installer_, Uninstall(StrEq(topic_name)))
543  .Times(1);
544 
545  auto status = stream_manager.ProcessCodecPrivateDataForStream(stream_name, codec_private_data);
546 
547  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status));
548 }
549 
550 TEST_F(KinesisStreamManagerMockingFixture, testKinesisVideoStreamSetupZeroStreamCount)
551 {
552  map<string, int> int_map = {
553  {TestParameterReader::DoFormatParameterPath(GetKinesisVideoParameter(kStreamParameters.stream_count)), 0}
554  };
555  auto parameter_reader = std::make_shared<TestParameterReader>(int_map, bool_map_, string_map_, map_map_);
556  std::unique_ptr<NiceMock<KinesisClientMock>> kinesis_client = std::unique_ptr<NiceMock<KinesisClientMock>>{};
557  KinesisStreamManager stream_manager(parameter_reader.get(), & stream_definition_provider_,
558  & subscription_installer_, std::move(kinesis_client));
559 
560  auto status = stream_manager.KinesisVideoStreamerSetup();
561 
562  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status));
563 }
564 
565 TEST_F(KinesisStreamManagerMockingFixture, testKinesisVideoStreamSetupSingleStreamFailsGetCodecPrivateData)
566 {
567  map<string, int> int_map = {
568  {TestParameterReader::DoFormatParameterPath(GetKinesisVideoParameter(kStreamParameters.stream_count)), 1}
569  };
570  auto parameter_reader = std::make_shared<TestParameterReader>(int_map, bool_map_, string_map_, map_map_);
571  StreamDefinitionProviderPartialMock stream_definition_provider;
572  std::unique_ptr<NiceMock<KinesisClientMock>> kinesis_client = std::unique_ptr<NiceMock<KinesisClientMock>>{};
573  KinesisStreamManager stream_manager(parameter_reader.get(), & stream_definition_provider,
574  & subscription_installer_, std::move(kinesis_client));
575 
576  EXPECT_CALL(stream_definition_provider, GetCodecPrivateData(_,_,_,_))
577  .WillOnce(Return(KINESIS_MANAGER_STATUS_ERROR_BASE));
578 
579  auto status = stream_manager.KinesisVideoStreamerSetup();
580 
581  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status));
582 }
583 
584 TEST_F(KinesisStreamManagerMockingFixture, testKinesisVideoStreamSetupAndFetchRekognitionResultsSingleStreamSuccessful)
585 {
586  int stream_idx = 0;
587  std::string stream_name = "stream_name1";
588  map<string, int> int_map = {
589  {TestParameterReader::DoFormatParameterPath(GetKinesisVideoParameter(kStreamParameters.stream_count)), 1},
590  {TestParameterReader::DoFormatParameterPath(GetStreamParameterPath(stream_idx, kStreamParameters.topic_type)), 42}
591  };
592  map<string, string> string_map = {
593  {TestParameterReader::DoFormatParameterPath(GetStreamParameterPath(stream_idx, kStreamParameters.topic_name)),
594  "foo"},
595  {TestParameterReader::DoFormatParameterPath(GetStreamParameterPath(stream_idx, kStreamParameters.stream_name)),
596  stream_name},
597  {TestParameterReader::DoFormatParameterPath(GetStreamParameterPath(stream_idx, kStreamParameters.rekognition_data_stream)),
598  "rekognition_data_stream"},
599  {TestParameterReader::DoFormatParameterPath(GetStreamParameterPath(stream_idx, kStreamParameters.rekognition_topic_name)),
600  "rekognition_topic_name"},
601  };
602 
603  TestParameterReader parameter_reader(int_map, bool_map_, string_map, map_map_);
604  Aws::Vector<Model::Record> kinesis_records;
605  StreamDefinitionProviderPartialMock stream_definition_provider;
606  // This takes almost 20 seconds due to call to parent class with no AWS credentials available
607  auto kinesis_client = std::make_unique<NiceMock<KinesisClientMock>>();
608  NiceMock<KinesisClientMock> * kinesis_client_p = kinesis_client.get();
609  KinesisStreamManager stream_manager(&parameter_reader, & stream_definition_provider,
610  & subscription_installer_, std::move(kinesis_client));
611  auto video_producer = std::make_unique<KinesisVideoProducerMock>();
612  auto video_stream_mock = std::make_shared<KinesisVideoStreamMock>();
613 
614  EXPECT_CALL(*video_producer.get(), CreateStreamSyncProxy(_))
615  .WillOnce(Return(video_stream_mock));
616 
617  stream_manager.InitializeVideoProducer(string("us-west-2"), ConstVideoProducerFactory(std::move(video_producer)));
618  EXPECT_CALL(stream_definition_provider, GetCodecPrivateData(_,_,_,_))
619  .WillOnce(Return(KINESIS_MANAGER_STATUS_SUCCESS));
620  EXPECT_CALL(subscription_installer_, Install(_))
621  .WillOnce(Return(KINESIS_MANAGER_STATUS_SUCCESS));
622 
623  auto fetch_status_not_configured = stream_manager.FetchRekognitionResults(stream_name, &kinesis_records);
624 
625  EXPECT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(fetch_status_not_configured));
626 
627  auto setup_status = stream_manager.KinesisVideoStreamerSetup();
628 
629  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(setup_status));
630 
631  Model::ListShardsResult list_shards_result;
632  Model::Shard shard1;
633  shard1.SetShardId("shard1Id");
634  list_shards_result.SetShards({shard1});
635  Model::ListShardsOutcome list_shards_outcome(list_shards_result);
636 
637  EXPECT_CALL(*kinesis_client_p, ListShards(_))
638  .WillRepeatedly(Return(list_shards_outcome));
639  Model::GetShardIteratorResult get_shard_iterator_result;
640  get_shard_iterator_result.SetShardIterator("shardIterator");
641  Model::GetShardIteratorOutcome get_shard_iterator_outcome(get_shard_iterator_result);
642  EXPECT_CALL(*kinesis_client_p, GetShardIterator(_))
643  .WillRepeatedly(Return(get_shard_iterator_outcome));
644 
645  Model::Record record1;
646  record1.SetSequenceNumber("seq_number1");
647  Aws::Vector<Model::Record> expected_kinesis_records = {record1};
648  Model::GetRecordsResult get_records_result;
649  get_records_result.SetRecords(expected_kinesis_records);
650  {
651  InSequence get_records_seq;
652 
653  Model::GetRecordsOutcome get_records_outcome_ok(get_records_result);
654  EXPECT_CALL(*kinesis_client_p, GetRecords(_))
655  .WillOnce(Return(get_records_outcome_ok));
656 
657  AWSError<KinesisErrors> get_records_error1(Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED, true);
658  Model::GetRecordsOutcome get_records_outcome_error1(get_records_error1);
659  EXPECT_CALL(*kinesis_client_p, GetRecords(_))
660  .WillOnce(Return(get_records_outcome_error1));
661 
662  AWSError<KinesisErrors> get_records_error2(Aws::Kinesis::KinesisErrors::EXPIRED_ITERATOR, true);
663  Model::GetRecordsOutcome get_records_outcome_error2(get_records_error2);
664  EXPECT_CALL(*kinesis_client_p, GetRecords(_))
665  .WillOnce(Return(get_records_outcome_error2));
666 
667  AWSError<KinesisErrors> get_records_error3(Aws::Kinesis::KinesisErrors::ACCESS_DENIED, true);
668  Model::GetRecordsOutcome get_records_outcome_error3(get_records_error3);
669  EXPECT_CALL(*kinesis_client_p, GetRecords(_))
670  .WillOnce(Return(get_records_outcome_error3));
671  }
672 
673  auto fetch_status = stream_manager.FetchRekognitionResults(stream_name, &kinesis_records);
674 
675  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(fetch_status));
676  ASSERT_EQ(expected_kinesis_records, kinesis_records);
677  ASSERT_THAT(kinesis_records, ContainerEq(expected_kinesis_records));
678 
679  fetch_status = stream_manager.FetchRekognitionResults(stream_name, &kinesis_records);
680 
681  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(fetch_status));
682 
683  fetch_status = stream_manager.FetchRekognitionResults(stream_name, &kinesis_records);
684 
685  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(fetch_status));
686 
687  fetch_status = stream_manager.FetchRekognitionResults(stream_name, &kinesis_records);
688 
689  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(fetch_status));
690 }
691 
692 TEST_F(KinesisStreamManagerMockingFixture, mockStreamInitializationTestActualKinesisVideoProducer)
693 {
694  std::unique_ptr<NiceMock<KinesisClientMock>> kinesis_client = std::unique_ptr<NiceMock<KinesisClientMock>>{};
695  KinesisStreamManager stream_manager(parameter_reader_.get(), & stream_definition_provider_,
696  & subscription_installer_, std::move(kinesis_client));
697 
698  /* Before calling InitializeVideoProducer */
699  KinesisManagerStatus status =
700  stream_manager.InitializeVideoStream(move(unique_ptr<StreamDefinition>()));
701  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
703 
704  ASSERT_FALSE(stream_manager.get_video_producer());
705  unique_ptr<StreamDefinition> stream_definition =
706  DefaultProducerSetup(stream_manager, string("us-west-2"), string("stream/test"), parameter_reader_,
707  // this takes almost 20 seconds because an actual client is created without AWS credentials available
708  KinesisStreamManagerInterface::CreateDefaultVideoProducer);
709  ASSERT_TRUE(stream_manager.get_video_producer());
710 
711  /* Video producer has been created but the stream definition is empty. */
712  status = stream_manager.InitializeVideoStream(unique_ptr<StreamDefinition>{});
713  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
715 }
716 
717 TEST_F(KinesisStreamManagerMockingFixture, mockStreamInitializationTestKinesisVideoProducerMock)
718 {
719  KinesisStreamManager stream_manager;
720  ASSERT_FALSE(stream_manager.get_video_producer());
721  auto video_producer = std::make_unique<KinesisVideoProducerMock>();
722  auto video_stream_mock = std::make_shared<KinesisVideoStreamMock>();
723 
724  EXPECT_CALL(*video_producer.get(), CreateStreamSyncProxy(_))
725  .WillOnce(Return(video_stream_mock));
726 
727  auto stream_definition =
728  DefaultProducerSetup(stream_manager, string("us-west-2"), string("stream/test"), parameter_reader_,
729  ConstVideoProducerFactory(std::move(video_producer)));
730  ASSERT_TRUE(stream_manager.get_video_producer());
731 
732  /* Video producer has been created but the stream definition is empty. */
733  KinesisManagerStatus status = stream_manager.InitializeVideoStream(unique_ptr<StreamDefinition>{});
734  EXPECT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status));
735  ASSERT_EQ(KINESIS_MANAGER_STATUS_INVALID_INPUT, status);
736 
737  std::string stream_name = "stream_name1";
738  status = stream_manager.InitializeVideoStream(move(stream_definition));
739  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(status));
740 }
741 
743 {
744  KinesisStreamManager stream_manager;
745  auto video_producer = std::make_unique<KinesisVideoProducerMock>();
746  auto video_stream_mock = std::make_shared<KinesisVideoStreamMock>();
747  Frame frame;
748  string stream_name("testStream");
749 
750  EXPECT_CALL(*video_producer.get(), CreateStreamSyncProxy(_))
751  .WillOnce(Return(video_stream_mock));
752 
753  /* Before calling InitializeVideoProducer */
754  KinesisManagerStatus status = stream_manager.PutFrame(stream_name, frame);
755  EXPECT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status));
757 
758  /* Stream name not found (i.e. before calling InitializeVideoStream) */
759  auto stream_definition =
760  DefaultProducerSetup(stream_manager, string("us-west-2"), string("frame/test"),
761  ConstVideoProducerFactory(std::move(video_producer)));
762  status = stream_manager.PutFrame(string(stream_name), frame);
763  EXPECT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status));
765 
766  status = stream_manager.InitializeVideoStream(move(stream_definition));
767  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(status));
768 
769  {
770  InSequence video_stream_mock_seq;
771 
772  EXPECT_CALL(*video_stream_mock, IsReady())
773  .WillOnce(Return(false));
774 
775  EXPECT_CALL(*video_stream_mock, IsReady())
776  .WillOnce(Return(true));
777 
778  EXPECT_CALL(*video_stream_mock, PutFrame(_))
779  .WillOnce(Return(false));
780 
781  EXPECT_CALL(*video_stream_mock, IsReady())
782  .WillOnce(Return(true));
783 
784  EXPECT_CALL(*video_stream_mock, PutFrame(_))
785  .WillOnce(Return(true));
786  }
787 
788  // not ready
789  status = stream_manager.PutFrame(stream_name, frame);
790  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
792 
793  // ready but putFrame fails
794  status = stream_manager.PutFrame(stream_name, frame);
795  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status));
796 
797  // ready and putFrame ok
798  status = stream_manager.PutFrame(stream_name, frame);
799  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(status));
800 }
801 
805 TEST(StreamDefinitionProviderSuite, getCodecPrivateDataTest)
806 {
807  string test_prefix = "some/test/prefix";
808  vector<string> test_prefix_list{"some", "test", "prefix"};
809  Aws::Kinesis::StreamDefinitionProvider stream_definition_provider;
810 
811  string decoded_string = "hello world";
812  string encoded_string = "aGVsbG8gd29ybGQ=";
813  map<string, int> int_map = {};
814  map<string, bool> bool_map = {};
815  map<string, string> tags;
816  map<string, map<string, string>> map_map = {};
817  map<string, string> string_map = {
818  {test_prefix + PARAM_NS_SEPARATOR "codecPrivateData", encoded_string},
819  };
820  TestParameterReader parameter_reader(int_map, bool_map, string_map, map_map);
821 
822  PBYTE codec_private_data;
823  uint32_t codec_private_data_size;
824  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(stream_definition_provider.GetCodecPrivateData(
825  ParameterPath(test_prefix_list), parameter_reader, &codec_private_data,
826  &codec_private_data_size)));
827  ASSERT_EQ(decoded_string.length(), codec_private_data_size);
828  ASSERT_TRUE(0 == strncmp(decoded_string.c_str(), (const char *)codec_private_data,
829  codec_private_data_size));
830 
831  /* Invalid input tests */
832  KinesisManagerStatus status = stream_definition_provider.GetCodecPrivateData(
833  ParameterPath(test_prefix_list), parameter_reader, nullptr, &codec_private_data_size);
834  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
836  status = stream_definition_provider.GetCodecPrivateData(
837  ParameterPath(test_prefix_list), parameter_reader, &codec_private_data, nullptr);
838  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
840 
841  /* Empty input */
842  string_map = {};
843  TestParameterReader empty_parameter_reader(int_map, bool_map, string_map, map_map);
844  codec_private_data = nullptr;
845  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(stream_definition_provider.GetCodecPrivateData(
846  ParameterPath(test_prefix_list), empty_parameter_reader, &codec_private_data,
847  &codec_private_data_size)) && !codec_private_data);
848 
849  /* Dependency failure */
850  string_map = {
851  {test_prefix + PARAM_NS_SEPARATOR "codecPrivateData", "1"},
852  };
853  TestParameterReader parameter_reader_with_invalid_values(int_map, bool_map, string_map, map_map);
854  status = stream_definition_provider.GetCodecPrivateData(ParameterPath(test_prefix_list),
855  parameter_reader_with_invalid_values, &codec_private_data, &codec_private_data_size);
856  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
858 }
859 
864 TEST(StreamDefinitionProviderSuite, getStreamDefinitionTest)
865 {
866  string test_prefix = "some/test/prefix";
867  vector<string> test_prefix_list{"some", "test", "prefix"};
868  Aws::Kinesis::StreamDefinitionProvider stream_definition_provider;
869 
870  TestParameterReader parameter_reader = TestParameterReader(test_prefix);
871  map<string, string> string_map = parameter_reader.string_map_;
872  map<string, bool> bool_map = parameter_reader.bool_map_;
873  map<string, int> int_map = parameter_reader.int_map_;
874  map<string, map<string, string>> map_map = parameter_reader.map_map_;
875 
876  unique_ptr<StreamDefinition> generated_stream_definition =
877  stream_definition_provider.GetStreamDefinition(ParameterPath(test_prefix_list),
878  parameter_reader, nullptr, 0);
879  auto equivalent_stream_definition = make_unique<StreamDefinition>(
880  string_map[test_prefix + PARAM_NS_SEPARATOR "stream_name"],
881  hours(int_map[test_prefix + PARAM_NS_SEPARATOR "retention_period"]),
882  &map_map[test_prefix + PARAM_NS_SEPARATOR "tags"],
883  string_map[test_prefix + PARAM_NS_SEPARATOR "kms_key_id"],
884  static_cast<STREAMING_TYPE>(int_map[test_prefix + PARAM_NS_SEPARATOR "streaming_type"]),
885  string_map[test_prefix + PARAM_NS_SEPARATOR "content_type"],
886  milliseconds(int_map[test_prefix + PARAM_NS_SEPARATOR "max_latency"]),
887  seconds(int_map[test_prefix + PARAM_NS_SEPARATOR "fragment_duration"]),
888  milliseconds(int_map[test_prefix + PARAM_NS_SEPARATOR "timecode_scale"]),
889  bool_map[test_prefix + PARAM_NS_SEPARATOR "key_frame_fragmentation"],
890  bool_map[test_prefix + PARAM_NS_SEPARATOR "frame_timecodes"],
891  bool_map[test_prefix + PARAM_NS_SEPARATOR "absolute_fragment_time"],
892  bool_map[test_prefix + PARAM_NS_SEPARATOR "fragment_acks"],
893  bool_map[test_prefix + PARAM_NS_SEPARATOR "restart_on_error"],
894  bool_map[test_prefix + PARAM_NS_SEPARATOR "recalculate_metrics"],
895  static_cast<NAL_ADAPTATION_FLAGS>(int_map[test_prefix + PARAM_NS_SEPARATOR "nal_adaptation_flags"]),
896  int_map[test_prefix + PARAM_NS_SEPARATOR "frame_rate"],
897  int_map[test_prefix + PARAM_NS_SEPARATOR "avg_bandwidth_bps"],
898  seconds(int_map[test_prefix + PARAM_NS_SEPARATOR "buffer_duration"]),
899  seconds(int_map[test_prefix + PARAM_NS_SEPARATOR "replay_duration"]),
900  seconds(int_map[test_prefix + PARAM_NS_SEPARATOR "connection_staleness"]),
901  string_map[test_prefix + PARAM_NS_SEPARATOR "codec_id"],
902  string_map[test_prefix + PARAM_NS_SEPARATOR "track_name"], nullptr, 0);
903  ASSERT_TRUE(
904  are_streams_equivalent(move(equivalent_stream_definition), move(generated_stream_definition)));
905 
906  auto different_stream_definition = make_unique<StreamDefinition>(
907  string_map[test_prefix + PARAM_NS_SEPARATOR "stream_name"],
908  hours(int_map[test_prefix + PARAM_NS_SEPARATOR "retention_period"]),
909  &map_map[test_prefix + PARAM_NS_SEPARATOR "tags"],
910  string_map[test_prefix + PARAM_NS_SEPARATOR "kms_key_id"],
911  static_cast<STREAMING_TYPE>(int_map[test_prefix + PARAM_NS_SEPARATOR "streaming_type"]),
912  string_map[test_prefix + PARAM_NS_SEPARATOR "content_type"],
913  milliseconds(int_map[test_prefix + PARAM_NS_SEPARATOR "max_latency"]),
914  seconds(int_map[test_prefix + PARAM_NS_SEPARATOR "fragment_duration"]),
915  milliseconds(int_map[test_prefix + PARAM_NS_SEPARATOR "timecode_scale"]),
916  bool_map[test_prefix + PARAM_NS_SEPARATOR "key_frame_fragmentation"],
917  bool_map[test_prefix + PARAM_NS_SEPARATOR "frame_timecodes"],
918  bool_map[test_prefix + PARAM_NS_SEPARATOR "absolute_fragment_time"],
919  bool_map[test_prefix + PARAM_NS_SEPARATOR "fragment_acks"],
920  bool_map[test_prefix + PARAM_NS_SEPARATOR "restart_on_error"],
921  bool_map[test_prefix + PARAM_NS_SEPARATOR "recalculate_metrics"],
922  static_cast<NAL_ADAPTATION_FLAGS>(int_map[test_prefix + PARAM_NS_SEPARATOR "nal_adaptation_flags"]), 4914918,
923  int_map[test_prefix + PARAM_NS_SEPARATOR "avg_bandwidth_bps"],
924  seconds(int_map[test_prefix + PARAM_NS_SEPARATOR "buffer_duration"]),
925  seconds(int_map[test_prefix + PARAM_NS_SEPARATOR "replay_duration"]),
926  seconds(int_map[test_prefix + PARAM_NS_SEPARATOR "connection_staleness"]),
927  string_map[test_prefix + PARAM_NS_SEPARATOR "codec_id"],
928  string_map[test_prefix + PARAM_NS_SEPARATOR "track_name"], nullptr, 0);
929  generated_stream_definition = stream_definition_provider.GetStreamDefinition(
930  ParameterPath(test_prefix_list), parameter_reader, nullptr, 0);
931  ASSERT_FALSE(
932  are_streams_equivalent(move(different_stream_definition), move(generated_stream_definition)));
933 
934  /* Invalid input tests */
935  generated_stream_definition = stream_definition_provider.GetStreamDefinition(
936  ParameterPath(test_prefix_list), parameter_reader, nullptr, 100);
937  ASSERT_FALSE(generated_stream_definition);
938 }
939 
943 unique_ptr<StreamDefinition> DefaultProducerSetup(
944  Aws::Kinesis::KinesisStreamManager & stream_manager, string region, const std::vector<std::string> & test_prefix)
945 {
946 #ifdef PLATFORM_TESTING_ACCESS_KEY
947  setenv("AWS_ACCESS_KEY_ID", PLATFORM_TESTING_ACCESS_KEY, 1);
948 #endif
949 #ifdef PLATFORM_TESTING_SECRET_KEY
950  setenv("AWS_SECRET_ACCESS_KEY", PLATFORM_TESTING_SECRET_KEY, 1);
951 #endif
952  stream_manager.InitializeVideoProducer(region);
953 
954  Aws::Kinesis::StreamDefinitionProvider stream_definition_provider;
955  TestParameterReader parameter_reader = TestParameterReader(test_prefix);
956  unique_ptr<StreamDefinition> stream_definition = stream_definition_provider.GetStreamDefinition(
957  ParameterPath(test_prefix), parameter_reader, nullptr, 0);
958  return move(stream_definition);
959 }
960 
964 TEST(KinesisStreamManagerSuite, videoInitializationTest)
965 {
966  string test_prefix = "some/test/prefix";
967  Aws::Kinesis::KinesisStreamManager stream_manager;
968 
969  /* Empty region */
970  KinesisManagerStatus status = stream_manager.InitializeVideoProducer("");
971  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
973 
974  /* Non empty region, invalid callback/info providers */
975  status = stream_manager.InitializeVideoProducer("us-west-2", nullptr, nullptr, nullptr, nullptr);
976  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
978 
979  status = stream_manager.InitializeVideoProducer("us-west-2");
980  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(status));
981  ASSERT_TRUE(stream_manager.get_video_producer());
982 
983  /* Duplicate initialization */
984  auto video_producer = stream_manager.get_video_producer();
985  status = stream_manager.InitializeVideoProducer("us-west-2");
986  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
988  auto video_producer_post_call = stream_manager.get_video_producer();
989  ASSERT_EQ(video_producer, video_producer_post_call);
990 }
991 
992 #ifdef BUILD_AWS_TESTING
993 // the following tests perform AWS API calls and require user confiugration
994 // to enable them run: colcon build --cmake-args -DBUILD_AWS_TESTING=1
995 
1000 TEST(KinesisStreamManagerSuite, streamInitializationTest)
1001 {
1002  Aws::Kinesis::KinesisStreamManager stream_manager;
1003  /* Before calling InitializeVideoProducer */
1004  KinesisManagerStatus status =
1005  stream_manager.InitializeVideoStream(move(unique_ptr<StreamDefinition>()));
1006  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
1008 
1009  ASSERT_FALSE(stream_manager.get_video_producer());
1010  unique_ptr<StreamDefinition> stream_definition =
1011  DefaultProducerSetup(stream_manager, string("us-west-2"), string("stream/test"));
1012  ASSERT_TRUE(stream_manager.get_video_producer());
1013 
1014  /* Video producer has been created but the stream definition is empty. */
1015  status = stream_manager.InitializeVideoStream(unique_ptr<StreamDefinition>{});
1016  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
1018 
1019  status = stream_manager.InitializeVideoStream(move(stream_definition));
1020  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(status));
1021 }
1022 
1027 TEST(KinesisStreamManagerSuite, putFrameTest)
1028 {
1029  Aws::Kinesis::KinesisStreamManager stream_manager;
1030  Frame frame;
1031  string stream_name("testStream");
1032  /* Before calling InitializeVideoProducer */
1033  KinesisManagerStatus status = stream_manager.PutFrame(stream_name, frame);
1034  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
1036 
1037  /* Stream name not found (i.e. before calling InitializeVideoStream) */
1038  unique_ptr<StreamDefinition> stream_definition =
1039  DefaultProducerSetup(stream_manager, string("us-west-2"), string("frame/test"));
1040  status = stream_manager.PutFrame(string(stream_name), frame);
1041  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
1043 
1044  status = stream_manager.InitializeVideoStream(move(stream_definition));
1045  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(status));
1046 
1047  /* Invalid frame */
1048  frame.size = 0;
1049  status = stream_manager.PutFrame(stream_name, frame);
1050  ASSERT_TRUE(KINESIS_MANAGER_STATUS_FAILED(status) &&
1052 
1053  /* Valid (but dummy) frame */
1054  frame.size = 4;
1055  std::vector<uint8_t> bytes = {0x00, 0x01, 0x02, 0x03};
1056  frame.frameData = reinterpret_cast<PBYTE>((void *)(bytes.data()));
1057  frame.duration = 5000000;
1058  frame.index = 1;
1059  UINT64 timestamp = 0;
1060  timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
1061  std::chrono::system_clock::now().time_since_epoch())
1062  .count() /
1063  DEFAULT_TIME_UNIT_IN_NANOS;
1064  frame.decodingTs = timestamp;
1065  frame.presentationTs = timestamp;
1066  frame.flags = (FRAME_FLAGS)0;
1067 
1068  status = stream_manager.PutFrame(stream_name, frame);
1069  ASSERT_TRUE(KINESIS_MANAGER_STATUS_SUCCEEDED(status));
1070 }
1071 #endif
1072 
1073 int main(int argc, char ** argv)
1074 {
1075  LOG_CONFIGURE_STDOUT("ERROR");
1076  Aws::SDKOptions options;
1077  Aws::InitAPI(options);
1078  testing::InitGoogleTest(&argc, argv);
1079  return RUN_ALL_TESTS();
1080 }
std::shared_ptr< KinesisVideoStreamInterface > CreateStreamSync(std::unique_ptr< StreamDefinition > stream_definition)
TestParameterReader(const vector< string > &test_prefix)
KinesisStreamManagerInterface::VideoProducerFactory ConstVideoProducerFactory(unique_ptr< KinesisVideoProducerInterface > video_producer)
std::shared_ptr< ParameterReaderInterface > parameter_reader_
#define KINESIS_MANAGER_STATUS_FAILED(status)
Definition: common.h:19
AwsError ReadParam(const ParameterPath &param_path, map< string, string > &out) const
KinesisManagerStatus PutFrame(std::string stream_name, Frame &frame) const override
enum Aws::Kinesis::kinesis_manager_status_e KinesisManagerStatus
StreamDefinitionProvider stream_definition_provider_
TestParameterReader(map< string, int > int_map, map< string, bool > bool_map, map< string, string > string_map, map< string, map< string, string >> map_map)
static bool are_streams_equivalent(unique_ptr< StreamDefinition > stream1, unique_ptr< StreamDefinition > stream2)
KinesisManagerStatus InitializeVideoProducer(std::string region, std::unique_ptr< com::amazonaws::kinesis::video::DeviceInfoProvider > device_info_provider, std::unique_ptr< com::amazonaws::kinesis::video::ClientCallbackProvider > client_callback_provider, std::unique_ptr< com::amazonaws::kinesis::video::StreamCallbackProvider > stream_callback_provider, std::unique_ptr< com::amazonaws::kinesis::video::CredentialProvider > credential_provider, KinesisStreamManagerInterface::VideoProducerFactory video_producer_factory=KinesisStreamManagerInterface::CreateDefaultVideoProducer) override
void FreeStream(std::string stream_name) override
#define KINESIS_MANAGER_STATUS_SUCCEEDED(status)
Definition: common.h:18
virtual KinesisManagerStatus GetCodecPrivateData(const Aws::Client::ParameterPath &prefix, const Aws::Client::ParameterReaderInterface &reader, PBYTE *out_codec_private_data, uint32_t *out_codec_private_data_size) const
unique_ptr< StreamDefinition > DefaultProducerSetup(KinesisStreamManager &stream_manager, string region, string test_prefix, std::shared_ptr< ParameterReaderInterface > parameter_reader, KinesisStreamManagerInterface::VideoProducerFactory video_producer_factory)
virtual std::unique_ptr< com::amazonaws::kinesis::video::StreamDefinition > GetStreamDefinition(const Aws::Client::ParameterPath &prefix, const Aws::Client::ParameterReaderInterface &reader, const PBYTE codec_private_data, uint32_t codec_private_data_size) const
AwsError ReadParam(const ParameterPath &param_path, string &out) const
KinesisManagerStatus KinesisVideoStreamerSetup() override
map< string, map< string, string > > map_map_
const char * topic_name
Definition: common.h:76
std::function< std::unique_ptr< KinesisVideoProducerInterface >(std::string, std::unique_ptr< com::amazonaws::kinesis::video::DeviceInfoProvider >, std::unique_ptr< com::amazonaws::kinesis::video::ClientCallbackProvider >, std::unique_ptr< com::amazonaws::kinesis::video::StreamCallbackProvider >, std::unique_ptr< com::amazonaws::kinesis::video::CredentialProvider >)> VideoProducerFactory
LOGGER_TAG("aws.kinesis.kinesis_manager_unittest")
TEST(StreamDefinitionProviderSuite, getCodecPrivateDataTest)
KinesisManagerStatus ProcessCodecPrivateDataForStream(const std::string &stream_name, std::vector< uint8_t > codec_private_data) override
static string DoFormatParameterPath(const ParameterPath &param_path)
const char * stream_name
Definition: common.h:80
map< string, string > string_map_
TestParameterReader(string test_prefix)
int main(int argc, char **argv)
std::string get_resolved_path(char node_namespace_separator, char parameter_namespace_separator) const
AWS_ERR_EMPTY
map< string, bool > bool_map_
#define PARAM_NS_SEPARATOR_CHAR
Aws::Client::ParameterPath GetStreamParameterPath(int stream_idx, const char *parameter_name)
#define PARAM_NS_SEPARATOR
AwsError ReadParam(const ParameterPath &param_path, std::vector< std::string > &out) const
string FormatParameterPath(const ParameterPath &param_path) const
AWS_ERR_NOT_FOUND
Aws::Client::ParameterPath GetKinesisVideoParameter(const char *parameter_name)
KinesisVideoProducerInterface * get_video_producer()
TEST_F(KinesisStreamManagerMockingFixture, testPutMetadataNotInitialized)
string Join(const vector< string > &test_prefix)
map< string, int > int_map_
AwsError ReadParam(const ParameterPath &param_path, double &out) const
unique_ptr< StreamDefinition > GetStreamDefinition(const ParameterPath &prefix, const ParameterReaderInterface &reader, const PBYTE codec_private_data, uint32_t codec_private_data_size) const override
StreamSubscriptionInstallerMock subscription_installer_
KinesisManagerStatus PutMetadata(std::string stream_name, const std::string &name, const std::string &value) const override
const char * prefix
Definition: common.h:73
KinesisManagerStatus FetchRekognitionResults(const std::string &stream_name, Aws::Vector< Model::Record > *records) override
AwsError ReadParam(const ParameterPath &param_path, int &out) const
AWS_ERR_OK
AwsError ReadParam(const ParameterPath &param_path, Aws::String &out) const
AwsError ReadParam(const ParameterPath &param_path, bool &out) const
KinesisManagerStatus InitializeVideoStream(std::unique_ptr< com::amazonaws::kinesis::video::StreamDefinition > stream_definition) override
bool operator==(const Record &left, const Record &right)


kinesis_manager
Author(s): AWS RoboMaker
autogenerated on Thu Mar 4 2021 03:28:41