kinesis_stream_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 #include <aws/core/utils/logging/LogMacros.h>
16 #include <aws/kinesis/model/GetShardIteratorRequest.h>
17 #include <aws/kinesis/model/ListShardsRequest.h>
18 #include <aws/kinesis/model/ListShardsResult.h>
19 #include <kinesis-video-producer/ClientCallbackProvider.h>
20 #include <kinesis-video-producer/KinesisVideoProducer.h>
25 
26 using namespace com::amazonaws::kinesis::video;
27 using namespace Aws::Utils::Logging;
28 
29 
30 namespace Aws {
31 namespace Kinesis {
32 
33 using namespace std;
34 
/**
 * Sets up a single Kinesis Video stream from the configuration of stream `stream_idx`.
 *
 * Obtains a stream definition from stream_definition_provider_ (using the parameter prefix
 * of stream_idx and the supplied codec private data), optionally reports the stream's name
 * through stream_name, and hands the definition over to InitializeVideoStream().
 *
 * @param stream_idx index of the stream whose parameters are read
 * @param codec_private_data codec private data forwarded to GetStreamDefinition()
 * @param codec_private_data_size size forwarded alongside codec_private_data
 * @param stream_name optional output; when non-null it receives the definition's stream name
 * @return the status produced by InitializeVideoStream()
 *
 * NOTE(review): listing lines 39 and 45 are absent from this view. Presumably they declare
 * `status` and return early when no stream definition could be loaded - confirm against the
 * original file before relying on this description.
 */
35 KinesisManagerStatus KinesisStreamManagerInterface::KinesisVideoStreamSetup(
36  const uint16_t stream_idx, const PBYTE codec_private_data, const uint32_t codec_private_data_size,
37  std::string * stream_name)
38 {
40  unique_ptr<StreamDefinition> stream_definition = stream_definition_provider_->GetStreamDefinition(
41  GetStreamParameterPrefix(stream_idx), *parameter_reader_, codec_private_data, codec_private_data_size);
42  if (!stream_definition) {
43  AWS_LOGSTREAM_ERROR(__func__, "Skipping stream id "
44  << stream_idx << " due to failure to load stream definition.");
46  }
47  /* Stream initialization */
  /* The name is copied out before the definition is moved into InitializeVideoStream(). */
48  if (nullptr != stream_name) {
49  *stream_name = stream_definition->getStreamName();
50  }
51  status = InitializeVideoStream(std::move(stream_definition));
52  if (KINESIS_MANAGER_STATUS_FAILED(status)) {
53  AWS_LOGSTREAM_ERROR(
54  __func__, "Skipping stream id "
55  << stream_idx << " due to failure initializing stream. Error code: " << status);
56  }
57  return status;
58 }
59 
/**
 * Fills `descriptor` with the subscription settings configured for stream `stream_idx`.
 *
 * Reads the topic name, stream name and topic type (stored in descriptor.input_type), then the
 * optional AWS Rekognition data stream and topic name, and finally an optional message queue
 * size, all through parameter_reader_.
 *
 * @param stream_idx index of the stream whose parameters are read
 * @param descriptor output descriptor populated from the parameters
 * @return `status`
 *
 * NOTE(review): listing lines 63, 79, 95, 97 and 112 are absent from this view. Presumably they
 * hold the declarations of `status` and `message_queue_size`, the early returns of the two error
 * branches, and the statement that stores the queue size into the descriptor - confirm against
 * the original file.
 */
60 KinesisManagerStatus KinesisStreamManagerInterface::GenerateStreamSubscriptionDescriptor(
61  int stream_idx, StreamSubscriptionDescriptor & descriptor)
62 {
  /* The three mandatory reads are OR-ed together, so param_status differs from AWS_ERR_OK as
   * soon as any one of them returns something other than AWS_ERR_OK. */
64  int param_status = AWS_ERR_OK;
65  param_status |= parameter_reader_->ReadParam(
66  GetStreamParameterPath(stream_idx, kStreamParameters.topic_name),
67  descriptor.topic_name);
68  param_status |= parameter_reader_->ReadParam(
69  GetStreamParameterPath(stream_idx, kStreamParameters.stream_name),
70  descriptor.stream_name);
71  param_status |= parameter_reader_->ReadParam(
72  GetStreamParameterPath(stream_idx, kStreamParameters.topic_type),
73  descriptor.input_type);
74  if (AWS_ERR_OK != param_status) {
75  AWS_LOGSTREAM_ERROR(__func__, "Missing parameters - can't construct descriptor (topic: "
76  << descriptor.topic_name
77  << " stream: " << descriptor.stream_name
78  << " type: " << descriptor.input_type << ") " << param_status);
80  }
81  /* Rekognition data stream and topic name - one cannot be provided without the other */
82  AwsError data_stream_read_result = parameter_reader_->ReadParam(
83  GetStreamParameterPath(stream_idx, kStreamParameters.rekognition_data_stream),
84  descriptor.rekognition_data_stream);
85  AwsError rekognition_topic_read_result = parameter_reader_->ReadParam(
86  GetStreamParameterPath(stream_idx, kStreamParameters.rekognition_topic_name),
87  descriptor.rekognition_topic_name);
  /* Accepted outcomes: both reads return AWS_ERR_OK, or both return AWS_ERR_NOT_FOUND.
   * Differing results, or any other shared error code, are reported as invalid input. */
88  if (data_stream_read_result != rekognition_topic_read_result ||
89  (data_stream_read_result != AWS_ERR_OK && data_stream_read_result != AWS_ERR_NOT_FOUND)) {
90  AWS_LOGSTREAM_ERROR(
91  __func__, "Invalid input: error reading parameters for AWS Rekognition support (data stream: "
92  << descriptor.rekognition_data_stream << " code: " << data_stream_read_result
93  << " Rekognition topic: " << descriptor.rekognition_topic_name
94  << " code: " << rekognition_topic_read_result << ")");
96  }
  /* The queue size is optional: it is only examined when the read returns AWS_ERR_OK. It is
   * read as a signed int so that a negative value can be detected before the cast to uint32_t. */
98  int message_queue_size_input;
99  if (AWS_ERR_OK ==
100  parameter_reader_->ReadParam(
101  GetStreamParameterPath(stream_idx, kStreamParameters.message_queue_size),
102  message_queue_size_input)) {
103  if (0 > message_queue_size_input) {
  /* NOTE(review): the pieces ")" and "is invalid." are streamed back to back, so the logged
   * text reads ")is invalid." with no separating space. */
104  AWS_LOGSTREAM_WARN(__func__, descriptor.stream_name << " Message queue size provided ("
105  << message_queue_size_input << ")"
106  << "is invalid. Using the default of "
107  << message_queue_size);
108  } else {
109  message_queue_size = static_cast<uint32_t>(message_queue_size_input);
110  }
111  }
113  return status;
114 }
115 
/**
 * Sets up every configured stream together with its topic subscription.
 *
 * Reads the stream count and, for each stream index, loads the codec private data, calls
 * KinesisVideoStreamSetup(), builds a subscription descriptor and installs the subscription
 * through InitializeStreamSubscription(). A failure at any step skips that stream (`continue`)
 * and the loop moves on to the next index.
 *
 * @return `status`
 *
 * NOTE(review): listing lines 118, 147 and 164 are absent from this view. Presumably 118
 * declares `status`, 147 opens the condition that wraps the GenerateStreamSubscriptionDescriptor()
 * call, and 164 updates `status` after a stream was set up - confirm against the original file.
 */
116 KinesisManagerStatus KinesisStreamManagerInterface::KinesisVideoStreamerSetup()
117 {
  /* The result of this read is not checked; the count stays 0 if the read leaves it untouched. */
119  int video_stream_count = 0;
120  parameter_reader_->ReadParam(GetKinesisVideoParameter(kStreamParameters.stream_count),
121  video_stream_count);
  /* Despite the "Aborting" wording there is no return in this branch: with a count <= 0 the
   * loop below simply executes zero times. */
122  if (0 >= video_stream_count) {
123  AWS_LOGSTREAM_WARN(__func__, "Stream count " << video_stream_count << " is invalid. Aborting");
124  }
125  for (int stream_idx = 0; stream_idx < video_stream_count; stream_idx++) {
126  /* Load stream definition */
127  PBYTE codec_private_data = nullptr;
128  uint32_t codec_private_data_size = 0;
129  KinesisManagerStatus get_codec_private_data_result =
130  stream_definition_provider_->GetCodecPrivateData(GetStreamParameterPrefix(stream_idx),
131  *parameter_reader_, &codec_private_data,
132  &codec_private_data_size);
133  if (KINESIS_MANAGER_STATUS_FAILED(get_codec_private_data_result)) {
134  AWS_LOGSTREAM_ERROR(__func__, "Skipping stream id "
135  << stream_idx
136  << " due to failure to load codec private data. Error code: "
137  << get_codec_private_data_result);
138  continue;
139  }
  /* From here on, every failure path releases codec_private_data with SAFE_MEMFREE before
   * skipping the stream. */
140  if (KINESIS_MANAGER_STATUS_FAILED(KinesisVideoStreamSetup(stream_idx, codec_private_data,
141  codec_private_data_size, nullptr))) {
142  SAFE_MEMFREE(codec_private_data);
143  continue;
144  }
145  /* Subscribe to the specified topic */
146  StreamSubscriptionDescriptor descriptor;
  /* The stream created above is freed again when no descriptor can be produced for it. */
148  GenerateStreamSubscriptionDescriptor(stream_idx, descriptor))) {
149  FreeStream(descriptor.stream_name);
150  SAFE_MEMFREE(codec_private_data);
151  continue;
152  }
153  KinesisManagerStatus subscription_installation_result =
154  InitializeStreamSubscription(descriptor);
155  if (KINESIS_MANAGER_STATUS_FAILED(subscription_installation_result)) {
156  AWS_LOGSTREAM_ERROR(__func__, "Failed to subscribe to '"
157  << descriptor.topic_name << "' for stream '"
158  << descriptor.stream_name
159  << "'. Error code: " << subscription_installation_result);
160  FreeStream(descriptor.stream_name);
161  SAFE_MEMFREE(codec_private_data);
162  continue;
163  }
  /* NOTE(review): no SAFE_MEMFREE(codec_private_data) is visible on the success path; verify
   * whether the missing listing line 164 releases it or whether the buffer is leaked. */
165  }
166  return status;
167 }
168 
169 unique_ptr<KinesisVideoProducerInterface> KinesisStreamManagerInterface::CreateDefaultVideoProducer(
170  std::string region,
171  unique_ptr<com::amazonaws::kinesis::video::DeviceInfoProvider> device_info_provider,
172  unique_ptr<com::amazonaws::kinesis::video::ClientCallbackProvider> client_callback_provider,
173  unique_ptr<com::amazonaws::kinesis::video::StreamCallbackProvider> stream_callback_provider,
174  unique_ptr<com::amazonaws::kinesis::video::CredentialProvider> credential_provider)
175 {
176  return std::make_unique<KinesisVideoProducerImpl>(KinesisVideoProducer::createSync(
177  std::move(device_info_provider), std::move(client_callback_provider),
178  std::move(stream_callback_provider), std::move(credential_provider), region));
179 }
180 
181 KinesisManagerStatus KinesisStreamManager::InitializeStreamSubscription(
182  const StreamSubscriptionDescriptor & descriptor)
183 {
184  KinesisManagerStatus status = subscription_installer_->Install(descriptor);
185  if (KINESIS_MANAGER_STATUS_SUCCEEDED(status) && !descriptor.rekognition_data_stream.empty()) {
186  RekognitionStreamInfo rekognition_info{
187  .data_stream_name = Aws::String(descriptor.rekognition_data_stream.c_str())};
188  rekognition_config_.insert({descriptor.stream_name, rekognition_info});
189  }
190  return status;
191 }
192 
/**
 * Creates video_producer_ from explicitly supplied providers.
 *
 * Checks that no producer exists yet, that a region was given and that all four providers are
 * non-null, then calls video_producer_factory with the region and the providers and stores the
 * result in video_producer_.
 *
 * @param region AWS region; an empty value is logged as an error
 * @param device_info_provider device info provider, moved into the factory call
 * @param client_callback_provider client callback provider, moved into the factory call
 * @param stream_callback_provider stream callback provider, moved into the factory call
 * @param credential_provider credential provider, moved into the factory call
 *
 * NOTE(review): listing lines 198, 201, 206, 209 and 213 are absent from this view. Presumably
 * 198 declares the `video_producer_factory` parameter and closes the parameter list, and the
 * others are the return statements of the three checks and of the function - confirm against
 * the original file. The status codes returned are therefore not visible here.
 */
193 KinesisManagerStatus KinesisStreamManager::InitializeVideoProducer(
194  std::string region, unique_ptr<DeviceInfoProvider> device_info_provider,
195  unique_ptr<ClientCallbackProvider> client_callback_provider,
196  unique_ptr<StreamCallbackProvider> stream_callback_provider,
197  unique_ptr<CredentialProvider> credential_provider,
199 {
  /* A producer that already exists is handled separately (body not visible in this view). */
200  if (video_producer_) {
202  }
203  if (region.empty()) {
204  AWS_LOG_ERROR(__func__,
205  "Region not provided. Check that the config file is correct and was loaded properly.");
207  }
  /* All four providers are required. */
208  if (!device_info_provider || !client_callback_provider || !stream_callback_provider || !credential_provider) {
210  }
211  video_producer_ = video_producer_factory(region, std::move(device_info_provider), std::move(client_callback_provider),
212  std::move(stream_callback_provider), std::move(credential_provider));
214 }
215 
/**
 * Creates video_producer_ using the default provider implementations.
 *
 * Instantiates DefaultDeviceInfoProvider, DefaultClientCallbackProvider,
 * DefaultStreamCallbackProvider and ProducerSdkAWSCredentialsProvider, then delegates to the
 * InitializeVideoProducer() overload that takes the providers explicitly.
 *
 * @param region AWS region forwarded to the delegated overload
 * @return the status returned by the delegated overload
 *
 * NOTE(review): listing lines 217 and 230 are absent from this view. Presumably 217 declares the
 * `video_producer_factory` parameter and 230 returns from the credential check - confirm against
 * the original file.
 */
216 KinesisManagerStatus KinesisStreamManager::InitializeVideoProducer(std::string region,
218 {
219  unique_ptr<DeviceInfoProvider> device_provider = make_unique<DefaultDeviceInfoProvider>();
220  unique_ptr<ClientCallbackProvider> client_callback_provider =
221  make_unique<DefaultClientCallbackProvider>();
222  unique_ptr<StreamCallbackProvider> stream_callback_provider =
223  make_unique<DefaultStreamCallbackProvider>();
224  unique_ptr<CredentialProvider> credentials_provider =
225  std::make_unique<ProducerSdkAWSCredentialsProvider>();
  /* NOTE(review): std::make_unique never yields a null pointer (allocation failure throws), so
   * this branch looks unreachable as written. */
226  if (!credentials_provider) {
227  AWS_LOG_ERROR(__func__,
228  "Credential provider is invalid, have you set the environment variables required "
229  "for AWS access?");
231  }
232  return InitializeVideoProducer(
233  region, std::move(device_provider), std::move(client_callback_provider),
234  std::move(stream_callback_provider), std::move(credentials_provider),
235  video_producer_factory);
236 }
237 
/**
 * Creates a video stream from `stream_definition` and registers it under its stream name.
 *
 * After checking the producer, the definition, the stream name and that the name is not yet
 * registered, the stream is created with video_producer_->CreateStreamSync(). On success it is
 * stored in video_streams_, and the codec private data of the first track (if any) is copied
 * into video_streams_codec_data_.
 *
 * @param stream_definition definition of the stream; moved into CreateStreamSync()
 *
 * NOTE(review): listing lines 242, 245, 249, 252, 273 and 275 are absent from this view; they
 * presumably hold the return statements of each branch, so the returned status codes are not
 * visible here - confirm against the original file.
 */
238 KinesisManagerStatus KinesisStreamManager::InitializeVideoStream(
239  unique_ptr<StreamDefinition> stream_definition)
240 {
241  if (!video_producer_) {
243  }
244  if (!stream_definition) {
246  }
247  string stream_name = stream_definition->getStreamName();
248  if (stream_name.empty()) {
250  }
  /* A stream with this name is already registered. */
251  if (video_streams_.count(stream_name) > 0) {
253  }
254 
  /* The stream info is copied out before the definition is moved away below. */
255  StreamInfo stream_info = stream_definition->getStreamInfo();
256  shared_ptr<KinesisVideoStreamInterface> stream;
  /* Only std::runtime_error (and types derived from it) is caught and mapped to a null stream;
   * any other exception type propagates to the caller. */
257  try {
258  stream = video_producer_->CreateStreamSync(std::move(stream_definition));
259  } catch (const std::runtime_error & e) {
260  stream = nullptr;
261  }
262 
263  if (stream) {
264  video_streams_.insert({stream_name, stream});
  /* Only the first entry of trackInfoList is inspected for codec private data. */
265  if (0 < stream_info.streamCaps.trackInfoList[0].codecPrivateDataSize) {
266  std::vector<uint8_t> codec_private_data;
267  codec_private_data.assign(
268  stream_info.streamCaps.trackInfoList[0].codecPrivateData,
269  stream_info.streamCaps.trackInfoList[0].codecPrivateData
270  + stream_info.streamCaps.trackInfoList[0].codecPrivateDataSize);
  /* NOTE(review): if video_streams_codec_data_ is a standard map, insert() keeps an entry that
   * already exists for stream_name, so previously stored codec data would not be replaced here -
   * verify that this is intended. */
271  video_streams_codec_data_.insert({stream_name, codec_private_data});
272  }
274  } else {
276  }
277 };
278 
/**
 * Sends `frame` to the stream registered under `stream_name`.
 *
 * Checks that a producer exists, that the stream is registered and that it reports ready, then
 * calls PutFrame() on the stream.
 *
 * @param stream_name name of the target stream
 * @param frame frame handed to the stream's PutFrame()
 *
 * NOTE(review): listing lines 282, 285, 289 and 292 are absent from this view; they presumably
 * hold the return statements, including the one that maps `result` to a status - confirm
 * against the original file.
 */
279 KinesisManagerStatus KinesisStreamManager::PutFrame(std::string stream_name, Frame & frame) const
280 {
281  if (!video_producer_) {
283  }
  /* No stream is registered under this name. */
284  if (0 == video_streams_.count(stream_name)) {
286  }
287  if (!video_streams_.at(stream_name)->IsReady()) {
288  AWS_LOG_WARN(__func__, "Stream not ready yet, skipping putFrame.");
290  }
291  bool result = video_streams_.at(stream_name)->PutFrame(frame);
293 };
294 
/**
 * Attaches a name/value metadata pair to the stream registered under `stream_name`.
 *
 * Checks that a producer exists, that the stream is registered and that it reports ready, then
 * calls PutFragmentMetadata(name, value, false) on the stream.
 *
 * @param stream_name name of the target stream
 * @param name metadata key
 * @param value metadata value
 *
 * NOTE(review): listing lines 300, 303, 307 and 310 are absent from this view; they presumably
 * hold the return statements, including the one that maps `result` to a status - confirm
 * against the original file. The meaning of the third argument (`false`) is defined by
 * PutFragmentMetadata() and is not visible here.
 */
295 KinesisManagerStatus KinesisStreamManager::PutMetadata(std::string stream_name,
296  const std::string & name,
297  const std::string & value) const
298 {
299  if (!video_producer_) {
301  }
  /* No stream is registered under this name. */
302  if (0 == video_streams_.count(stream_name)) {
304  }
305  if (!video_streams_.at(stream_name)->IsReady()) {
306  AWS_LOG_WARN(__func__, "Stream not ready yet, skipping putFragmentMetadata.");
308  }
309  bool result = video_streams_.at(stream_name)->PutFragmentMetadata(name, value, false);
311 };
312 
313 void KinesisStreamManager::FreeStream(std::string stream_name)
314 {
315  if (video_producer_ && video_streams_.count(stream_name) > 0) {
316  if (video_streams_.at(stream_name)->IsReady()) {
317  video_streams_.at(stream_name)->Stop();
318  }
319  video_producer_->FreeStream(video_streams_.at(stream_name));
320  video_streams_.erase(stream_name);
321  }
322 }
323 
/**
 * Re-creates the stream `stream_name` when its codec private data has changed.
 *
 * If the data stored in video_streams_codec_data_ for the stream equals `codec_private_data`
 * nothing needs updating. Otherwise the configured streams are searched for one whose stream
 * name matches, the stream is freed and set up again through KinesisVideoStreamSetup() with the
 * new data. If that setup fails, the subscription to the stream's topic is uninstalled.
 *
 * @param stream_name name of the stream to update
 * @param codec_private_data new codec private data for the stream
 * @return `status`: the value in effect after the configuration lookup if that failed,
 *         otherwise the result of KinesisVideoStreamSetup()
 *
 * NOTE(review): listing lines 330, 335 and 344 are absent from this view. Presumably 330 returns
 * from the "already up to date" branch, 335 declares `status`, and 344 marks the lookup as
 * successful - confirm against the original file.
 */
324 KinesisManagerStatus KinesisStreamManager::ProcessCodecPrivateDataForStream(
325  const std::string & stream_name, std::vector<uint8_t> codec_private_data)
326 {
327  if (0 < video_streams_codec_data_.count(stream_name) &&
328  video_streams_codec_data_.at(stream_name) == codec_private_data) {
329  /* Codec data is already up to date */
331  }
332  AWS_LOGSTREAM_INFO(__func__, "Updating new codec data for " << stream_name);
333  /* Get stream configuration ID */
334  int video_stream_count = 0, stream_idx = 0;
336  parameter_reader_->ReadParam(GetKinesisVideoParameter(kStreamParameters.stream_count),
337  video_stream_count);
  /* NOTE(review): the loop declares its own `stream_idx`, which shadows the variable declared
   * above. The outer `stream_idx` therefore remains 0 after the loop, and the calls below that
   * use it always address stream index 0 regardless of which configured stream matched. */
338  for (int stream_idx = 0; stream_idx < video_stream_count; stream_idx++) {
339  std::string configured_stream_name;
340  parameter_reader_->ReadParam(
341  GetStreamParameterPath(stream_idx, kStreamParameters.stream_name),
342  configured_stream_name);
343  if (configured_stream_name == stream_name) {
345  break;
346  }
347  }
348  if (KINESIS_MANAGER_STATUS_FAILED(status)) {
349  return status;
350  }
351  /* Re-create the stream with the new codec data */
352  FreeStream(stream_name);
353  status = KinesisVideoStreamSetup(stream_idx, codec_private_data.data(), codec_private_data.size(),
354  nullptr);
355  if (KINESIS_MANAGER_STATUS_FAILED(status)) {
356  /* At this point we have an active subscription without the ability to stream data; need to
357  * unsubscribe */
358  std::string topic_name;
359  parameter_reader_->ReadParam(
360  GetStreamParameterPath(stream_idx, kStreamParameters.topic_name), topic_name);
361  AWS_LOGSTREAM_ERROR(__func__, "KinesisVideoStreamSetup failed, uninstalling subscriptions to "
362  << topic_name << " Error code: " << status);
363  subscription_installer_->Uninstall(topic_name);
364  }
365  return status;
366 }
367 
/**
 * Obtains a shard iterator for the Rekognition data stream associated with `stream_name`.
 *
 * When the stored shard iterator is empty, the function lists the shards of the configured data
 * stream (requesting at most one result), asks for a LATEST iterator on the first shard returned
 * and stores it in rekognition_config_.
 *
 * @param stream_name key into rekognition_config_
 *
 * NOTE(review): listing lines 372, 384, 388, 402 and 406 are absent from this view; they
 * presumably hold the return statements of each branch, so the returned status codes are not
 * visible here - confirm against the original file.
 * NOTE(review): rekognition_config_.at() is used without a prior existence check in this
 * function; if the container is a standard map this throws for an unknown stream_name, so
 * callers are presumably expected to check first.
 */
368 KinesisManagerStatus KinesisStreamManager::UpdateShardIterator(const std::string & stream_name)
369 {
370  if (!rekognition_config_.at(stream_name).shard_iterator.empty()) {
371  /* Already loaded */
373  }
374  /* First ListShards, then GetShardIterator */
375  Model::ListShardsRequest list_shards_request;
376  list_shards_request.SetStreamName(rekognition_config_.at(stream_name).data_stream_name);
  /* Only one shard is requested; the iterator below is created for that shard. */
377  list_shards_request.SetMaxResults(1);
378  auto list_shards_outcome = kinesis_client_->ListShards(list_shards_request);
379  if (!list_shards_outcome.IsSuccess()) {
380  AWS_LOGSTREAM_ERROR(__func__,
381  "ListShards failed with code "
382  << static_cast<int>(list_shards_outcome.GetError().GetErrorType()) << ": "
383  << list_shards_outcome.GetError().GetMessage());
385  }
386  if (list_shards_outcome.GetResult().GetShards().empty()) {
387  AWS_LOG_ERROR(__func__, "ListShards: no shards available");
389  }
390 
391  Aws::String shard_id = list_shards_outcome.GetResult().GetShards().front().GetShardId();
392  Model::GetShardIteratorRequest get_shard_iterator_request;
393  get_shard_iterator_request.SetStreamName(rekognition_config_.at(stream_name).data_stream_name);
394  get_shard_iterator_request.SetShardId(shard_id);
395  get_shard_iterator_request.SetShardIteratorType(Model::ShardIteratorType::LATEST);
396  auto get_shard_iterator_outcome = kinesis_client_->GetShardIterator(get_shard_iterator_request);
397  if (!get_shard_iterator_outcome.IsSuccess()) {
398  AWS_LOGSTREAM_ERROR(
399  __func__, "GetShardIterator failed with code "
400  << static_cast<int>(get_shard_iterator_outcome.GetError().GetErrorType()) << ": "
401  << get_shard_iterator_outcome.GetError().GetMessage());
403  }
  /* Remember the iterator for later use with this stream. */
404  rekognition_config_.at(stream_name).shard_iterator =
405  get_shard_iterator_outcome.GetResult().GetShardIterator();
407 }
408 
/**
 * Fetches Rekognition result records for `stream_name` from its Kinesis data stream.
 *
 * Returns right away when no Rekognition configuration exists for the stream. Otherwise a shard
 * iterator is obtained through UpdateShardIterator() if none is stored, and GetRecords is called
 * with a limit of kDefaultRecordsLimitForRekognitionResults. On success the stored iterator is
 * advanced to the next shard iterator and the records are copied into `*records`.
 *
 * @param stream_name key into rekognition_config_
 * @param records output; receives the fetched records. It is dereferenced without a null check,
 *        so it must point to a valid vector.
 * @return `status`
 *
 * NOTE(review): listing lines 412, 434 and 446 are absent from this view. Presumably 412
 * declares `status`, 434 is the body of the throughput-exceeded branch, and 446 sets the status
 * for the failure path - confirm against the original file.
 */
409 KinesisManagerStatus KinesisStreamManager::FetchRekognitionResults(
410  const std::string & stream_name, Aws::Vector<Model::Record> * records)
411 {
413  if (0 == rekognition_config_.count(stream_name)) {
414  AWS_LOG_WARN(__func__, "AWS Rekognition configuration is missing for this stream. Skipping");
415  return status;
416  }
  /* Fetch a shard iterator on demand; a failure to obtain one is returned to the caller. */
417  if (rekognition_config_.at(stream_name).shard_iterator.empty()) {
418  status = UpdateShardIterator(stream_name);
419  if (KINESIS_MANAGER_STATUS_FAILED(status)) {
420  return status;
421  }
422  }
423  Model::GetRecordsRequest get_records_request;
424  get_records_request.SetShardIterator(rekognition_config_.at(stream_name).shard_iterator);
425  get_records_request.SetLimit(kDefaultRecordsLimitForRekognitionResults);
426  auto get_records_outcome = kinesis_client_->GetRecords(get_records_request);
427  if (get_records_outcome.IsSuccess()) {
  /* Continue from where this call left off on the next invocation. */
428  rekognition_config_.at(stream_name).shard_iterator =
429  get_records_outcome.GetResult().GetNextShardIterator();
430  *records = get_records_outcome.GetResult().GetRecords();
431  } else {
432  if (KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED ==
433  get_records_outcome.GetError().GetErrorType()) {
435  } else if (KinesisErrors::EXPIRED_ITERATOR == get_records_outcome.GetError().GetErrorType()) {
  /* Clearing the stored iterator makes the next call request a fresh one (see the empty()
   * check near the top of this function). */
436  rekognition_config_.at(stream_name).shard_iterator.clear();
437  AWS_LOG_WARN(
438  __func__,
439  "GetRecords failed due to expired iterator. A new one will be fetched at the next run.");
440  } else {
441  AWS_LOGSTREAM_ERROR(
442  __func__, "GetRecords failed with code "
443  << static_cast<int>(get_records_outcome.GetError().GetErrorType()) << ": "
444  << get_records_outcome.GetError().GetMessage());
445  }
447  }
448  return status;
449 }
450 
451 } // namespace Kinesis
452 } // namespace Aws
constexpr uint16_t kDefaultRecordsLimitForRekognitionResults
Definition: common.h:59
const char * message_queue_size
Definition: common.h:77
constexpr uint32_t kDefaultMessageQueueSize
Definition: common.h:58
#define KINESIS_MANAGER_STATUS_FAILED(status)
Definition: common.h:19
enum Aws::Kinesis::kinesis_manager_status_e KinesisManagerStatus
#define KINESIS_MANAGER_STATUS_SUCCEEDED(status)
Definition: common.h:18
Aws::Client::ParameterPath GetStreamParameterPrefix(int stream_idx)
const struct Aws::Kinesis::@0 stream
const char * topic_name
Definition: common.h:76
std::function< std::unique_ptr< KinesisVideoProducerInterface >(std::string, std::unique_ptr< com::amazonaws::kinesis::video::DeviceInfoProvider >, std::unique_ptr< com::amazonaws::kinesis::video::ClientCallbackProvider >, std::unique_ptr< com::amazonaws::kinesis::video::StreamCallbackProvider >, std::unique_ptr< com::amazonaws::kinesis::video::CredentialProvider >)> VideoProducerFactory
const char * stream_name
Definition: common.h:80
Aws::Client::ParameterPath GetStreamParameterPath(int stream_idx, const char *parameter_name)
AWS_ERR_NOT_FOUND
Aws::Client::ParameterPath GetKinesisVideoParameter(const char *parameter_name)
AWS_ERR_OK


kinesis_manager
Author(s): AWS RoboMaker
autogenerated on Thu Mar 4 2021 03:28:41