streamer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #include <log4cplus/configurator.h>
17 
18 #include <aws/core/Aws.h>
19 #include <aws/core/utils/logging/LogMacros.h>
27 #include <ros/ros.h>
28 
29 using namespace Aws::Client;
30 using namespace Aws::Kinesis;
31 
32 
/* Both macros below are guarded by #ifndef, so a definition supplied earlier
 * (for example on the compiler command line) takes precedence. */
33 #ifndef RETURN_CODE_MASK
34 #define RETURN_CODE_MASK (0xff) /* Process exit code is in range (0, 255) */
35 #endif
36 
/* NOTE(review): not referenced in the visible part of this file; presumably the
 * process exit code reported for otherwise unclassified failures - confirm at the use site. */
37 #ifndef UNKNOWN_ERROR_KINESIS_VIDEO_EXIT_CODE
38 #define UNKNOWN_ERROR_KINESIS_VIDEO_EXIT_CODE (0xf0)
39 #endif
40 
/* NOTE(review): presumably the name this node registers with ROS; the use site is not visible here. */
41 constexpr char kNodeName[] = "kinesis_video_streamer";
/* Parameter name for overriding the spinner thread count.
 * NOTE(review): the call that reads it is not visible in this listing - confirm. */
42 const char * kSpinnerThreadCountOverrideParameter = "spinner_thread_count";
43 
44 namespace Aws {
45 namespace Kinesis {
46 
47 StreamerNode::StreamerNode(const std::string & ns = std::string()) : ros::NodeHandle(ns)
48 {
49  parameter_reader_ = std::make_shared<Ros1NodeParameterReader>();
50  subscription_installer_ = std::make_shared<RosStreamSubscriptionInstaller>(*this);
51 
52  /* Log4cplus setup for the Kinesis Producer SDK */
53  std::string log4cplus_config;
54  parameter_reader_->ReadParam(
55  GetKinesisVideoParameter(kStreamParameters.log4cplus_config), log4cplus_config);
56  if (!log4cplus_config.empty()) {
57  log4cplus::PropertyConfigurator::doConfigure(log4cplus_config);
58  } else {
59  log4cplus::BasicConfigurator configurator;
60  configurator.configure();
61  }
62 }
63 
65 {
/*
 * Body of Initialize() (the cross-reference index of this listing places
 * "KinesisManagerStatus Initialize()" at streamer.cpp:64; the signature line
 * itself is absent here).
 *
 * Steps visible below: build the AWS client configuration from parameter_reader_,
 * install the default subscription callbacks, create the Kinesis client and the
 * stream manager, hand the stream manager to the subscription installer, and
 * initialize the video producer for the configured region. When producer
 * initialization fails, its status is returned to the caller.
 *
 * NOTE(review): listing lines 72, 77 and 90 are missing from this listing, so
 * the failure path of SetDefaultCallbacks(), part of the KinesisStreamManager
 * argument list and the final return are not visible - confirm against the
 * repository before editing.
 */
66  ClientConfigurationProvider configuration_provider(parameter_reader_);
67  ClientConfiguration aws_sdk_config =
68  configuration_provider.GetClientConfiguration();
69  /* Set up subscription callbacks */
70  if (!subscription_installer_->SetDefaultCallbacks()) {
71  AWS_LOG_FATAL(__func__, "Failed to set up subscription callbacks.");
73  }
/* NOTE(review): the client is allocated with Aws::New but owned by a std::unique_ptr
 * with the default deleter - confirm this allocation/release pairing is intended. */
74  auto kinesis_client = std::unique_ptr<KinesisClient>(
75  Aws::New<Aws::Kinesis::KinesisClientFacade>(__func__, aws_sdk_config));
76  stream_manager_ = std::make_shared<KinesisStreamManager>(
78  std::move(kinesis_client));
/* The installer receives a non-owning raw pointer; stream_manager_ keeps ownership. */
79  subscription_installer_->set_stream_manager(stream_manager_.get());
80  /* Initialization of video producer */
81  KinesisManagerStatus initialize_video_producer_result =
82  stream_manager_->InitializeVideoProducer(aws_sdk_config.region.c_str());
83  if (KINESIS_MANAGER_STATUS_FAILED(initialize_video_producer_result)) {
/* Failure is reported on stderr as well as through the AWS logger. */
84  fprintf(stderr, "Failed to initialize video producer");
85  AWS_LOGSTREAM_FATAL(__func__, "Failed to initialize video producer. Error code: "
86  << initialize_video_producer_result);
87  return initialize_video_producer_result;
88  }
89 
91 }
92 
94 {
/*
 * Body of InitializeStreamSubscriptions() (the cross-reference index of this
 * listing places "KinesisManagerStatus InitializeStreamSubscriptions()" at
 * streamer.cpp:93; the signature line itself is absent here).
 *
 * Runs stream_manager_->KinesisVideoStreamerSetup(). On failure the status is
 * printed to stderr, logged, and returned to the caller.
 *
 * NOTE(review): listing line 106 is missing, so the value returned on the
 * success path is not visible - confirm against the repository.
 */
95  /* Set up subscriptions and get ready to start streaming */
96  KinesisManagerStatus streamer_setup_result = stream_manager_->KinesisVideoStreamerSetup();
97  if (KINESIS_MANAGER_STATUS_SUCCEEDED(streamer_setup_result)) {
98  AWS_LOG_DEBUG(__func__, "KinesisVideoStreamerSetup completed successfully.");
99  } else {
100  fprintf(stderr, "Failed to setup the kinesis video streamer");
101  AWS_LOGSTREAM_ERROR(__func__, "KinesisVideoStreamerSetup failed with error code : "
102  << streamer_setup_result << ". Exiting");
103  return streamer_setup_result;
104  }
105 
107 }
108 
110 {
/*
 * Runs a ros::MultiThreadedSpinner and calls spin() on it. The thread count
 * starts at kDefaultNumberOfSpinnerThreads and is replaced by an integer
 * override when the read below reports AWS_ERR_OK.
 *
 * NOTE(review): the signature line (listing line 109) and listing line 114 are
 * missing from this listing; presumably line 114 reads
 * kSpinnerThreadCountOverrideParameter through the parameter reader - confirm
 * against the repository.
 */
111  uint32_t spinner_thread_count = kDefaultNumberOfSpinnerThreads;
112  int spinner_thread_count_input;
113  if (Aws::AwsError::AWS_ERR_OK ==
115  spinner_thread_count_input)) {
/* NOTE(review): a negative override would convert to a very large uint32_t here;
 * no range check is visible in this listing. */
116  spinner_thread_count = static_cast<uint32_t>(spinner_thread_count_input);
117  }
118  ros::MultiThreadedSpinner spinner(spinner_thread_count);
119  spinner.spin();
120 }
121 
122 void StreamerNode::set_subscription_installer(std::shared_ptr<RosStreamSubscriptionInstaller> subscription_installer)
123 {
124  subscription_installer_ = subscription_installer;
125 }
126 
127 } // namespace Kinesis
128 } // namespace Aws
KINESIS_MANAGER_STATUS_SUCCESS
void set_subscription_installer(std::shared_ptr< RosStreamSubscriptionInstaller > subscription_installer)
Definition: streamer.cpp:122
constexpr uint32_t kDefaultNumberOfSpinnerThreads
Definition: streamer.h:30
#define KINESIS_MANAGER_STATUS_FAILED(status)
std::shared_ptr< KinesisStreamManager > stream_manager_
Definition: streamer.h:50
enum Aws::Kinesis::kinesis_manager_status_e KinesisManagerStatus
#define KINESIS_MANAGER_STATUS_SUCCEEDED(status)
std::shared_ptr< Aws::Client::ParameterReaderInterface > parameter_reader_
Definition: streamer.h:48
StreamDefinitionProvider stream_definition_provider_
Definition: streamer.h:51
const char * kSpinnerThreadCountOverrideParameter
Definition: streamer.cpp:42
KinesisManagerStatus Initialize()
Definition: streamer.cpp:64
std::shared_ptr< RosStreamSubscriptionInstaller > subscription_installer_
Definition: streamer.h:49
KinesisManagerStatus InitializeStreamSubscriptions()
Definition: streamer.cpp:93
KINESIS_MANAGER_STATUS_ERROR_BASE
virtual void spin(CallbackQueue *queue=0)
Aws::Client::ParameterPath GetKinesisVideoParameter(const char *parameter_name)
ClientConfiguration GetClientConfiguration(const std::string &ros_version_override="")
constexpr char kNodeName[]
Definition: streamer.cpp:41
const char * log4cplus_config


kinesis_video_streamer
Author(s): AWS RoboMaker
autogenerated on Fri Mar 5 2021 03:29:15