kinesis_stream_manager.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 #pragma once
16 #include <aws/kinesis/KinesisClient.h>
17 #include <aws/kinesis/model/GetRecordsRequest.h>
18 #include <aws/kinesis/model/GetRecordsResult.h>
19 #include <aws/kinesis/model/Record.h>
22 #include <kinesis-video-producer/KinesisVideoProducer.h>
23 #include <kinesis_manager/common.h>
28 
29 namespace Aws {
30 namespace Kinesis {
31 
/**
 * Builds the parameter path for a setting, optionally scoped to a single stream.
 *
 * The path starts at kStreamParameters.prefix. The per-stream segment and the
 * parameter name are each appended only when they are supplied.
 *
 * @param stream_idx stream index; pass INVALID_STREAM_ID to leave out the per-stream segment
 * @param parameter_name text appended last; pass nullptr to leave it out
 * @return the assembled Aws::Client::ParameterPath
 */
42 inline Aws::Client::ParameterPath GetStreamParameterPath(int stream_idx, const char * parameter_name)
43 {
44  Aws::Client::ParameterPath path(kStreamParameters.prefix);
/* Per-stream segment: the stream namespace immediately followed by the decimal index. */
45  if (INVALID_STREAM_ID != stream_idx) {
46  path += kStreamParameters.stream_namespace + std::to_string(stream_idx);
47  }
/* The parameter name is optional so callers can request just the prefix. */
48  if (nullptr != parameter_name) {
49  path += parameter_name;
50  }
51  return path;
52 }
53 
59 {
60  return GetStreamParameterPath(stream_idx, nullptr);
61 }
/**
 * Returns the path of a parameter that is not scoped to a particular stream.
 *
 * Forwards to GetStreamParameterPath with INVALID_STREAM_ID, so no per-stream
 * segment is added between the prefix and the parameter name.
 *
 * @param parameter_name text appended after the prefix; nullptr yields the prefix only
 * @return the assembled Aws::Client::ParameterPath
 */
66 inline Aws::Client::ParameterPath GetKinesisVideoParameter(const char * parameter_name)
67 {
68  return GetStreamParameterPath(INVALID_STREAM_ID, parameter_name);
69 }
70 
72 {
73 public:
75  const StreamDefinitionProvider * stream_definition_provider,
76  StreamSubscriptionInstaller * subscription_installer)
77  : parameter_reader_(parameter_reader),
78  stream_definition_provider_(stream_definition_provider),
79  subscription_installer_(subscription_installer){};
81  virtual ~KinesisStreamManagerInterface() = default;
82 
/**
 * Callable type that creates a KinesisVideoProducerInterface.
 *
 * Takes a string plus four provider objects, each handed over by unique_ptr.
 * The parameter list matches CreateDefaultVideoProducer, where the string
 * argument is named `region`.
 */
83  using VideoProducerFactory = std::function<std::unique_ptr<KinesisVideoProducerInterface>(
84  std::string,
85  std::unique_ptr<com::amazonaws::kinesis::video::DeviceInfoProvider>,
86  std::unique_ptr<com::amazonaws::kinesis::video::ClientCallbackProvider>,
87  std::unique_ptr<com::amazonaws::kinesis::video::StreamCallbackProvider>,
88  std::unique_ptr<com::amazonaws::kinesis::video::CredentialProvider>
89  )>;
90 
103  virtual KinesisManagerStatus InitializeVideoProducer(std::string region,
104  std::unique_ptr<com::amazonaws::kinesis::video::DeviceInfoProvider> device_info_provider,
105  std::unique_ptr<com::amazonaws::kinesis::video::ClientCallbackProvider> client_callback_provider,
106  std::unique_ptr<com::amazonaws::kinesis::video::StreamCallbackProvider> stream_callback_provider,
107  std::unique_ptr<com::amazonaws::kinesis::video::CredentialProvider> credential_provider,
109 
117  virtual KinesisManagerStatus InitializeVideoProducer(std::string region,
119 
129  std::unique_ptr<com::amazonaws::kinesis::video::StreamDefinition> stream_definition) = 0;
130 
/**
 * Pure virtual; behaviour is defined by the implementing class.
 * NOTE(review): presumably submits `frame` to the video stream called
 * `stream_name` - confirm against the implementation, which is not in this header.
 *
 * @param stream_name name of the stream (taken by value)
 * @param frame frame object, passed by non-const reference
 * @return a KinesisManagerStatus code
 */
139  virtual KinesisManagerStatus PutFrame(std::string stream_name, Frame & frame) const = 0;
140 
/**
 * Pure virtual; behaviour is defined by the implementing class.
 * NOTE(review): presumably attaches a name/value metadata pair to the stream
 * called `stream_name` - confirm against the implementation.
 *
 * @param stream_name name of the stream (taken by value)
 * @param name metadata name
 * @param value metadata value
 * @return a KinesisManagerStatus code
 */
150  virtual KinesisManagerStatus PutMetadata(std::string stream_name, const std::string & name,
151  const std::string & value) const = 0;
152 
/**
 * Pure virtual; behaviour is defined by the implementing class.
 * NOTE(review): presumably releases whatever is held for the stream called
 * `stream_name` - confirm against the implementation.
 *
 * @param stream_name name of the stream (taken by value)
 */
158  virtual void FreeStream(std::string stream_name) = 0;
159 
170  const std::string & stream_name, std::vector<uint8_t> codec_private_data) = 0;
171 
/**
 * Pure virtual; behaviour is defined by the implementing class.
 * NOTE(review): `records` looks like an output parameter that receives the
 * fetched Kinesis records - confirm, including whether nullptr is tolerated.
 *
 * @param stream_name name of the stream the results relate to
 * @param records pointer to a vector of Kinesis Model::Record
 * @return a KinesisManagerStatus code
 */
179  virtual KinesisManagerStatus FetchRekognitionResults(const std::string & stream_name,
180  Aws::Vector<Model::Record> * records) = 0;
181 
/**
 * Static factory matching the VideoProducerFactory signature.
 *
 * It is the default value of the `video_producer_factory` argument of
 * InitializeVideoProducer. All four providers are taken by unique_ptr, so the
 * caller gives up ownership of them. The definition is not part of this header.
 *
 * @param region region string
 * @param device_info_provider device info provider
 * @param client_callback_provider client callback provider
 * @param stream_callback_provider stream callback provider
 * @param credential_provider credential provider
 * @return the newly created producer
 */
182  static std::unique_ptr<KinesisVideoProducerInterface> CreateDefaultVideoProducer(
183  std::string region,
184  std::unique_ptr<com::amazonaws::kinesis::video::DeviceInfoProvider> device_info_provider,
185  std::unique_ptr<com::amazonaws::kinesis::video::ClientCallbackProvider> client_callback_provider,
186  std::unique_ptr<com::amazonaws::kinesis::video::StreamCallbackProvider> stream_callback_provider,
187  std::unique_ptr<com::amazonaws::kinesis::video::CredentialProvider> credential_provider);
188 
189 protected:
/**
 * Virtual but not pure: a base-class definition exists outside this header.
 * NOTE(review): `stream_name` is a pointer and looks like an output parameter
 * that receives the name of the stream that was set up - confirm against the definition.
 *
 * @param stream_idx index of the stream
 * @param codec_private_data codec private data buffer
 * @param codec_private_data_size size of codec_private_data (presumably in bytes - confirm)
 * @param stream_name pointer to a string for the stream name
 * @return a KinesisManagerStatus code
 */
210  virtual KinesisManagerStatus KinesisVideoStreamSetup(const uint16_t stream_idx,
211  const PBYTE codec_private_data,
212  const uint32_t codec_private_data_size,
213  std::string * stream_name);
220  int stream_idx, StreamSubscriptionDescriptor & descriptor);
228  const StreamSubscriptionDescriptor & descriptor) = 0;
229 
233 };
234 
241 {
242 public:
244  StreamDefinitionProvider * stream_definition_provider,
245  StreamSubscriptionInstaller * subscription_installer,
246  std::unique_ptr<KinesisClient> kinesis_client)
247  : kinesis_client_(std::move(kinesis_client)),
248  KinesisStreamManagerInterface(parameter_reader, stream_definition_provider,
249  subscription_installer){};
250  KinesisStreamManager() = default;
251  ~KinesisStreamManager() = default;
252 
254  std::unique_ptr<com::amazonaws::kinesis::video::DeviceInfoProvider> device_info_provider,
255  std::unique_ptr<com::amazonaws::kinesis::video::ClientCallbackProvider> client_callback_provider,
256  std::unique_ptr<com::amazonaws::kinesis::video::StreamCallbackProvider> stream_callback_provider,
257  std::unique_ptr<com::amazonaws::kinesis::video::CredentialProvider> credential_provider,
261 
263  std::unique_ptr<com::amazonaws::kinesis::video::StreamDefinition> stream_definition) override;
264 
/* Overrides of the KinesisStreamManagerInterface stream operations.
 * Only the declarations are here; the definitions live outside this header. */
265  KinesisManagerStatus PutFrame(std::string stream_name, Frame & frame) const override;
266 
267  KinesisManagerStatus PutMetadata(std::string stream_name, const std::string & name,
268  const std::string & value) const override;
269 
270  void FreeStream(std::string stream_name) override;
271 
273  {
275  }
276 
278  const PBYTE codec_private_data,
279  const uint32_t codec_private_data_size,
280  std::string * stream_name) override
281  {
283  stream_idx, codec_private_data, codec_private_data_size, stream_name);
284  }
285 
287  const std::string & stream_name, std::vector<uint8_t> codec_private_data) override;
288 
/* Override of KinesisStreamManagerInterface::FetchRekognitionResults; defined outside this header.
 * NOTE(review): presumably reads from the data stream recorded in rekognition_config_
 * for `stream_name` - confirm against the definition. */
289  KinesisManagerStatus FetchRekognitionResults(const std::string & stream_name,
290  Aws::Vector<Model::Record> * records) override;
291 
293  {
294  return video_producer_.get();
295  }
296 
297 protected:
299  const StreamSubscriptionDescriptor & descriptor) override;
300 
301 private:
/* Private helper; defined outside this header.
 * NOTE(review): presumably refreshes the shard_iterator kept in rekognition_config_
 * for `stream_name` - confirm against the definition. */
308  KinesisManagerStatus UpdateShardIterator(const std::string & stream_name);
309 
/* Stream objects, shared-owned. NOTE(review): keys are presumably stream names - confirm. */
310  std::map<std::string, std::shared_ptr<KinesisVideoStreamInterface>> video_streams_;
/* Codec private data bytes per key. NOTE(review): keys are presumably stream names - confirm. */
311  std::map<std::string, std::vector<uint8_t>> video_streams_codec_data_;
/* Owned producer; get_video_producer() hands out a non-owning raw pointer to it. */
312  std::unique_ptr<KinesisVideoProducerInterface> video_producer_;
/* Owned Kinesis client, moved in from the four-argument constructor. */
313  std::unique_ptr<KinesisClient> kinesis_client_;
314 
316  {
317  Aws::String data_stream_name; /* Analysis results will be fetched from this stream. */
318  Aws::String shard_iterator;
319  };
320  std::map<std::string, RekognitionStreamInfo>
321  rekognition_config_; /* Video stream name to RekognitionStreamInfo */
322 };
323 
324 } // namespace Kinesis
325 } // namespace Aws
std::unique_ptr< KinesisClient > kinesis_client_
virtual KinesisManagerStatus PutMetadata(std::string stream_name, const std::string &name, const std::string &value) const =0
std::map< std::string, std::shared_ptr< KinesisVideoStreamInterface > > video_streams_
virtual void FreeStream(std::string stream_name)=0
std::map< std::string, std::vector< uint8_t > > video_streams_codec_data_
KinesisManagerStatus KinesisVideoStreamSetup(const uint16_t stream_idx, const PBYTE codec_private_data, const uint32_t codec_private_data_size, std::string *stream_name) override
enum Aws::Kinesis::kinesis_manager_status_e KinesisManagerStatus
StreamSubscriptionInstaller * subscription_installer_
Aws::Client::ParameterPath GetStreamParameterPrefix(int stream_idx)
KinesisManagerStatus KinesisVideoStreamerSetup() override
static std::unique_ptr< KinesisVideoProducerInterface > CreateDefaultVideoProducer(std::string region, std::unique_ptr< com::amazonaws::kinesis::video::DeviceInfoProvider > device_info_provider, std::unique_ptr< com::amazonaws::kinesis::video::ClientCallbackProvider > client_callback_provider, std::unique_ptr< com::amazonaws::kinesis::video::StreamCallbackProvider > stream_callback_provider, std::unique_ptr< com::amazonaws::kinesis::video::CredentialProvider > credential_provider)
const StreamDefinitionProvider * stream_definition_provider_
std::function< std::unique_ptr< KinesisVideoProducerInterface >(std::string, std::unique_ptr< com::amazonaws::kinesis::video::DeviceInfoProvider >, std::unique_ptr< com::amazonaws::kinesis::video::ClientCallbackProvider >, std::unique_ptr< com::amazonaws::kinesis::video::StreamCallbackProvider >, std::unique_ptr< com::amazonaws::kinesis::video::CredentialProvider >)> VideoProducerFactory
virtual KinesisManagerStatus FetchRekognitionResults(const std::string &stream_name, Aws::Vector< Model::Record > *records)=0
virtual KinesisManagerStatus InitializeStreamSubscription(const StreamSubscriptionDescriptor &descriptor)=0
virtual KinesisManagerStatus GenerateStreamSubscriptionDescriptor(int stream_idx, StreamSubscriptionDescriptor &descriptor)
const char * stream_name
Definition: common.h:80
virtual KinesisManagerStatus InitializeVideoStream(std::unique_ptr< com::amazonaws::kinesis::video::StreamDefinition > stream_definition)=0
#define INVALID_STREAM_ID
Definition: common.h:23
Aws::Client::ParameterPath GetStreamParameterPath(int stream_idx, const char *parameter_name)
std::unique_ptr< KinesisVideoProducerInterface > video_producer_
virtual KinesisManagerStatus ProcessCodecPrivateDataForStream(const std::string &stream_name, std::vector< uint8_t > codec_private_data)=0
KinesisStreamManager(Aws::Client::ParameterReaderInterface *parameter_reader, StreamDefinitionProvider *stream_definition_provider, StreamSubscriptionInstaller *subscription_installer, std::unique_ptr< KinesisClient > kinesis_client)
virtual KinesisManagerStatus InitializeVideoProducer(std::string region, std::unique_ptr< com::amazonaws::kinesis::video::DeviceInfoProvider > device_info_provider, std::unique_ptr< com::amazonaws::kinesis::video::ClientCallbackProvider > client_callback_provider, std::unique_ptr< com::amazonaws::kinesis::video::StreamCallbackProvider > stream_callback_provider, std::unique_ptr< com::amazonaws::kinesis::video::CredentialProvider > credential_provider, VideoProducerFactory video_producer_factory=KinesisStreamManagerInterface::CreateDefaultVideoProducer)=0
virtual KinesisManagerStatus KinesisVideoStreamSetup(const uint16_t stream_idx, const PBYTE codec_private_data, const uint32_t codec_private_data_size, std::string *stream_name)
Aws::Client::ParameterPath GetKinesisVideoParameter(const char *parameter_name)
KinesisVideoProducerInterface * get_video_producer()
const Aws::Client::ParameterReaderInterface * parameter_reader_
virtual KinesisManagerStatus PutFrame(std::string stream_name, Frame &frame) const =0
virtual KinesisManagerStatus KinesisVideoStreamerSetup()
KinesisStreamManagerInterface(const Aws::Client::ParameterReaderInterface *parameter_reader, const StreamDefinitionProvider *stream_definition_provider, StreamSubscriptionInstaller *subscription_installer)
std::map< std::string, RekognitionStreamInfo > rekognition_config_


kinesis_manager
Author(s): AWS RoboMaker
autogenerated on Thu Mar 4 2021 03:28:41