15 #include <aws/core/utils/logging/LogMacros.h> 16 #include <aws/kinesis/model/GetShardIteratorRequest.h> 17 #include <aws/kinesis/model/ListShardsRequest.h> 18 #include <aws/kinesis/model/ListShardsResult.h> 19 #include <kinesis-video-producer/ClientCallbackProvider.h> 20 #include <kinesis-video-producer/KinesisVideoProducer.h> 36 const uint16_t stream_idx,
const PBYTE codec_private_data,
const uint32_t codec_private_data_size,
40 unique_ptr<StreamDefinition> stream_definition = stream_definition_provider_->GetStreamDefinition(
42 if (!stream_definition) {
43 AWS_LOGSTREAM_ERROR(__func__,
"Skipping stream id " 44 << stream_idx <<
" due to failure to load stream definition.");
48 if (
nullptr != stream_name) {
49 *stream_name = stream_definition->getStreamName();
51 status = InitializeVideoStream(std::move(stream_definition));
54 __func__,
"Skipping stream id " 55 << stream_idx <<
" due to failure initializing stream. Error code: " << status);
65 param_status |= parameter_reader_->ReadParam(
68 param_status |= parameter_reader_->ReadParam(
71 param_status |= parameter_reader_->ReadParam(
75 AWS_LOGSTREAM_ERROR(__func__,
"Missing parameters - can't construct descriptor (topic: " 78 <<
" type: " << descriptor.
input_type <<
") " << param_status);
82 AwsError data_stream_read_result = parameter_reader_->ReadParam(
85 AwsError rekognition_topic_read_result = parameter_reader_->ReadParam(
88 if (data_stream_read_result != rekognition_topic_read_result ||
91 __func__,
"Invalid input: error reading parameters for AWS Rekognition support (data stream: " 94 <<
" code: " << rekognition_topic_read_result <<
")");
98 int message_queue_size_input;
100 parameter_reader_->ReadParam(
102 message_queue_size_input)) {
103 if (0 > message_queue_size_input) {
104 AWS_LOGSTREAM_WARN(__func__, descriptor.
stream_name <<
" Message queue size provided (" 105 << message_queue_size_input <<
")" 106 <<
"is invalid. Using the default of " 107 << message_queue_size);
109 message_queue_size =
static_cast<uint32_t
>(message_queue_size_input);
119 int video_stream_count = 0;
122 if (0 >= video_stream_count) {
123 AWS_LOGSTREAM_WARN(__func__,
"Stream count " << video_stream_count <<
" is invalid. Aborting");
125 for (
int stream_idx = 0; stream_idx < video_stream_count; stream_idx++) {
127 PBYTE codec_private_data =
nullptr;
128 uint32_t codec_private_data_size = 0;
131 *parameter_reader_, &codec_private_data,
132 &codec_private_data_size);
134 AWS_LOGSTREAM_ERROR(__func__,
"Skipping stream id " 136 <<
" due to failure to load codec private data. Error code: " 137 << get_codec_private_data_result);
141 codec_private_data_size,
nullptr))) {
142 SAFE_MEMFREE(codec_private_data);
148 GenerateStreamSubscriptionDescriptor(stream_idx, descriptor))) {
150 SAFE_MEMFREE(codec_private_data);
154 InitializeStreamSubscription(descriptor);
156 AWS_LOGSTREAM_ERROR(__func__,
"Failed to subscribe to '" 159 <<
"'. Error code: " << subscription_installation_result);
161 SAFE_MEMFREE(codec_private_data);
169 unique_ptr<KinesisVideoProducerInterface> KinesisStreamManagerInterface::CreateDefaultVideoProducer(
171 unique_ptr<com::amazonaws::kinesis::video::DeviceInfoProvider> device_info_provider,
172 unique_ptr<com::amazonaws::kinesis::video::ClientCallbackProvider> client_callback_provider,
173 unique_ptr<com::amazonaws::kinesis::video::StreamCallbackProvider> stream_callback_provider,
174 unique_ptr<com::amazonaws::kinesis::video::CredentialProvider> credential_provider)
176 return std::make_unique<KinesisVideoProducerImpl>(KinesisVideoProducer::createSync(
177 std::move(device_info_provider), std::move(client_callback_provider),
178 std::move(stream_callback_provider), std::move(credential_provider), region));
188 rekognition_config_.insert({descriptor.
stream_name, rekognition_info});
194 std::string region, unique_ptr<DeviceInfoProvider> device_info_provider,
195 unique_ptr<ClientCallbackProvider> client_callback_provider,
196 unique_ptr<StreamCallbackProvider> stream_callback_provider,
197 unique_ptr<CredentialProvider> credential_provider,
200 if (video_producer_) {
203 if (region.empty()) {
204 AWS_LOG_ERROR(__func__,
205 "Region not provided. Check that the config file is correct and was loaded properly.");
208 if (!device_info_provider || !client_callback_provider || !stream_callback_provider || !credential_provider) {
211 video_producer_ = video_producer_factory(region, std::move(device_info_provider), std::move(client_callback_provider),
212 std::move(stream_callback_provider), std::move(credential_provider));
219 unique_ptr<DeviceInfoProvider> device_provider = make_unique<DefaultDeviceInfoProvider>();
220 unique_ptr<ClientCallbackProvider> client_callback_provider =
221 make_unique<DefaultClientCallbackProvider>();
222 unique_ptr<StreamCallbackProvider> stream_callback_provider =
223 make_unique<DefaultStreamCallbackProvider>();
224 unique_ptr<CredentialProvider> credentials_provider =
225 std::make_unique<ProducerSdkAWSCredentialsProvider>();
226 if (!credentials_provider) {
227 AWS_LOG_ERROR(__func__,
228 "Credential provider is invalid, have you set the environment variables required " 232 return InitializeVideoProducer(
233 region, std::move(device_provider), std::move(client_callback_provider),
234 std::move(stream_callback_provider), std::move(credentials_provider),
235 video_producer_factory);
239 unique_ptr<StreamDefinition> stream_definition)
241 if (!video_producer_) {
244 if (!stream_definition) {
247 string stream_name = stream_definition->getStreamName();
248 if (stream_name.empty()) {
251 if (video_streams_.count(stream_name) > 0) {
255 StreamInfo stream_info = stream_definition->getStreamInfo();
256 shared_ptr<KinesisVideoStreamInterface>
stream;
258 stream = video_producer_->CreateStreamSync(std::move(stream_definition));
259 }
catch (
const std::runtime_error & e) {
265 if (0 < stream_info.streamCaps.trackInfoList[0].codecPrivateDataSize) {
266 std::vector<uint8_t> codec_private_data;
267 codec_private_data.assign(
268 stream_info.streamCaps.trackInfoList[0].codecPrivateData,
269 stream_info.streamCaps.trackInfoList[0].codecPrivateData
270 + stream_info.streamCaps.trackInfoList[0].codecPrivateDataSize);
271 video_streams_codec_data_.insert({
stream_name, codec_private_data});
281 if (!video_producer_) {
284 if (0 == video_streams_.count(stream_name)) {
287 if (!video_streams_.at(stream_name)->IsReady()) {
288 AWS_LOG_WARN(__func__,
"Stream not ready yet, skipping putFrame.");
291 bool result = video_streams_.at(stream_name)->PutFrame(frame);
296 const std::string & name,
297 const std::string & value)
const 299 if (!video_producer_) {
302 if (0 == video_streams_.count(stream_name)) {
305 if (!video_streams_.at(stream_name)->IsReady()) {
306 AWS_LOG_WARN(__func__,
"Stream not ready yet, skipping putFragmentMetadata.");
309 bool result = video_streams_.at(stream_name)->PutFragmentMetadata(name, value,
false);
315 if (video_producer_ && video_streams_.count(stream_name) > 0) {
316 if (video_streams_.at(stream_name)->IsReady()) {
317 video_streams_.at(stream_name)->Stop();
319 video_producer_->FreeStream(video_streams_.at(stream_name));
320 video_streams_.erase(stream_name);
325 const std::string &
stream_name, std::vector<uint8_t> codec_private_data)
327 if (0 < video_streams_codec_data_.count(stream_name) &&
328 video_streams_codec_data_.at(stream_name) == codec_private_data) {
332 AWS_LOGSTREAM_INFO(__func__,
"Updating new codec data for " << stream_name);
334 int video_stream_count = 0, stream_idx = 0;
338 for (
int stream_idx = 0; stream_idx < video_stream_count; stream_idx++) {
339 std::string configured_stream_name;
340 parameter_reader_->ReadParam(
342 configured_stream_name);
343 if (configured_stream_name == stream_name) {
352 FreeStream(stream_name);
353 status = KinesisVideoStreamSetup(stream_idx, codec_private_data.data(), codec_private_data.size(),
359 parameter_reader_->ReadParam(
361 AWS_LOGSTREAM_ERROR(__func__,
"KinesisVideoStreamSetup failed, uninstalling subscriptions to " 362 << topic_name <<
" Error code: " << status);
363 subscription_installer_->Uninstall(topic_name);
370 if (!rekognition_config_.at(stream_name).shard_iterator.empty()) {
375 Model::ListShardsRequest list_shards_request;
376 list_shards_request.SetStreamName(rekognition_config_.at(stream_name).data_stream_name);
377 list_shards_request.SetMaxResults(1);
378 auto list_shards_outcome = kinesis_client_->ListShards(list_shards_request);
379 if (!list_shards_outcome.IsSuccess()) {
380 AWS_LOGSTREAM_ERROR(__func__,
381 "ListShards failed with code " 382 << static_cast<int>(list_shards_outcome.GetError().GetErrorType()) <<
": " 383 << list_shards_outcome.GetError().GetMessage());
386 if (list_shards_outcome.GetResult().GetShards().empty()) {
387 AWS_LOG_ERROR(__func__,
"ListShards: no shards available");
391 Aws::String shard_id = list_shards_outcome.GetResult().GetShards().front().GetShardId();
392 Model::GetShardIteratorRequest get_shard_iterator_request;
393 get_shard_iterator_request.SetStreamName(rekognition_config_.at(stream_name).data_stream_name);
394 get_shard_iterator_request.SetShardId(shard_id);
395 get_shard_iterator_request.SetShardIteratorType(Model::ShardIteratorType::LATEST);
396 auto get_shard_iterator_outcome = kinesis_client_->GetShardIterator(get_shard_iterator_request);
397 if (!get_shard_iterator_outcome.IsSuccess()) {
399 __func__,
"GetShardIterator failed with code " 400 << static_cast<int>(get_shard_iterator_outcome.GetError().GetErrorType()) <<
": " 401 << get_shard_iterator_outcome.GetError().GetMessage());
404 rekognition_config_.at(stream_name).shard_iterator =
405 get_shard_iterator_outcome.GetResult().GetShardIterator();
410 const std::string &
stream_name, Aws::Vector<Model::Record> * records)
413 if (0 == rekognition_config_.count(stream_name)) {
414 AWS_LOG_WARN(__func__,
"AWS Rekognition configuration is missing for this stream. Skipping");
417 if (rekognition_config_.at(stream_name).shard_iterator.empty()) {
418 status = UpdateShardIterator(stream_name);
423 Model::GetRecordsRequest get_records_request;
424 get_records_request.SetShardIterator(rekognition_config_.at(stream_name).shard_iterator);
426 auto get_records_outcome = kinesis_client_->GetRecords(get_records_request);
427 if (get_records_outcome.IsSuccess()) {
428 rekognition_config_.at(stream_name).shard_iterator =
429 get_records_outcome.GetResult().GetNextShardIterator();
430 *records = get_records_outcome.GetResult().GetRecords();
432 if (KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED ==
433 get_records_outcome.GetError().GetErrorType()) {
435 }
else if (KinesisErrors::EXPIRED_ITERATOR == get_records_outcome.GetError().GetErrorType()) {
436 rekognition_config_.at(stream_name).shard_iterator.clear();
439 "GetRecords failed due to expired iterator. A new one will be fetched at the next run.");
442 __func__,
"GetRecords failed with code " 443 << static_cast<int>(get_records_outcome.GetError().GetErrorType()) <<
": " 444 << get_records_outcome.GetError().GetMessage());
constexpr uint16_t kDefaultRecordsLimitForRekognitionResults
const char * message_queue_size
std::string rekognition_data_stream
Aws::String data_stream_name
constexpr uint32_t kDefaultMessageQueueSize
#define KINESIS_MANAGER_STATUS_FAILED(status)
enum Aws::Kinesis::kinesis_manager_status_e KinesisManagerStatus
#define KINESIS_MANAGER_STATUS_SUCCEEDED(status)
Aws::Client::ParameterPath GetStreamParameterPrefix(int stream_idx)
const struct Aws::Kinesis::@0 stream
std::function< std::unique_ptr< KinesisVideoProducerInterface >(std::string, std::unique_ptr< com::amazonaws::kinesis::video::DeviceInfoProvider >, std::unique_ptr< com::amazonaws::kinesis::video::ClientCallbackProvider >, std::unique_ptr< com::amazonaws::kinesis::video::StreamCallbackProvider >, std::unique_ptr< com::amazonaws::kinesis::video::CredentialProvider >)> VideoProducerFactory
uint32_t message_queue_size
Aws::Client::ParameterPath GetStreamParameterPath(int stream_idx, const char *parameter_name)
Aws::Client::ParameterPath GetKinesisVideoParameter(const char *parameter_name)
KinesisStreamInputType input_type
std::string rekognition_topic_name