subscriber_callbacks.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 #include <aws/core/utils/logging/LogMacros.h>
17 #include <std_msgs/String.h>
18 
19 #ifndef PUTFRAME_LOG_INTERVAL_IN_SECONDS
20 #define PUTFRAME_LOG_INTERVAL_IN_SECONDS (10)
21 #endif
22 
23 namespace Aws {
24 namespace Kinesis {
25 
27  KinesisStreamManagerInterface & stream_manager, std::string stream_name,
28  const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame_msg,
29  const ros::Publisher & publisher)
30 {
31  KinesisVideoFrameTransportCallback(stream_manager, stream_name, frame_msg);
32  Aws::Vector<Model::Record> records;
33  KinesisManagerStatus status = stream_manager.FetchRekognitionResults(stream_name, &records);
34  if (KINESIS_MANAGER_STATUS_FAILED(status) &&
36  AWS_LOGSTREAM_WARN(__func__, stream_name.c_str()
37  << " FetchRekognitionResults failed. Error code: " << status);
38  return;
39  }
40  for (auto item = records.begin(); item != records.end(); item++) {
41  std_msgs::String message;
42  const char * data = reinterpret_cast<char *>(item->GetData().GetUnderlyingData());
43  size_t length = item->GetData().GetLength();
44  message.data = std::string(data, length);
45  publisher.publish(message);
46  }
47 }
48 
50  KinesisStreamManagerInterface & stream_manager, std::string stream_name,
51  const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame_msg)
52 {
53  if (!frame_msg->codec_private_data.empty()) {
54  KinesisManagerStatus update_codec_data_result =
55  stream_manager.ProcessCodecPrivateDataForStream(stream_name, frame_msg->codec_private_data);
56  if (KINESIS_MANAGER_STATUS_FAILED(update_codec_data_result)) {
57  AWS_LOGSTREAM_WARN(
58  __func__, stream_name.c_str()
59  << " failed updating codec data, error code: " << update_codec_data_result
60  << ". Continuing streaming as a best effort, but you might not be able to "
61  "decode and render the stream.");
62  } else {
64  "%s Updated codec data successfully. Frame index: %du",
65  stream_name.c_str(), frame_msg->index);
66  }
67  }
68 
69  Frame frame;
70  frame.trackId = DEFAULT_TRACK_ID;
71  frame.size = frame_msg->frame_data.size();
72  frame.frameData = reinterpret_cast<PBYTE>((void *)(frame_msg->frame_data.data()));
73  frame.duration = frame_msg->duration *
74  HUNDREDS_OF_NANOS_IN_A_MICROSECOND; /* Duration is specified in microseconds, but
75  Kinesis expects 100ns units. */
76  frame.index = frame_msg->index;
77  UINT64 generated_timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
78  std::chrono::system_clock::now().time_since_epoch())
79  .count() /
80  DEFAULT_TIME_UNIT_IN_NANOS;
81  frame.decodingTs = frame_msg->decoding_ts ? frame_msg->decoding_ts : generated_timestamp;
82  frame.presentationTs =
83  frame_msg->presentation_ts ? frame_msg->presentation_ts : generated_timestamp;
84  frame.flags = (FRAME_FLAGS)frame_msg->flags;
85 
86  KinesisManagerStatus status = stream_manager.PutFrame(stream_name, frame);
87  if (KINESIS_MANAGER_STATUS_FAILED(status)) {
88  AWS_LOGSTREAM_WARN(__func__, stream_name.c_str() << " PutFrame failed. Error code: " << status);
89  } else {
90  ROS_DEBUG_THROTTLE(PUTFRAME_LOG_INTERVAL_IN_SECONDS, "%s PutFrame succeeded. Frame index: %du",
91  stream_name.c_str(), frame.index);
92  }
93 
94  if (!frame_msg->metadata.empty()) {
96  for (auto iter = frame_msg->metadata.begin(); iter != frame_msg->metadata.end(); ++iter) {
97  status = static_cast<KinesisManagerStatus>(
98  status | stream_manager.PutMetadata(stream_name, iter->key, iter->value));
99  }
100  if (KINESIS_MANAGER_STATUS_FAILED(status)) {
101  AWS_LOGSTREAM_WARN(__func__, stream_name.c_str()
102  << " PutMetadata failed. Error code: " << status);
103  } else {
105  "%s PutMetadata succeeded. Frame index: %du", stream_name.c_str(),
106  frame.index);
107  }
108  }
109 }
110 
112  std::string stream_name, const sensor_msgs::ImageConstPtr & image)
113 {
114  Frame frame;
115  frame.trackId = DEFAULT_TRACK_ID;
116  frame.size = image->step * image->height;
117  /* Overflow check (since 'size', 'step' and 'height' are all 32 bit integers). */
118  if (image->step != 0 && frame.size / image->step != image->height) {
119  AWS_LOGSTREAM_WARN(
120  __func__,
121  stream_name.c_str()
122  << " Integer overflow detected - image size is too big. Aborting imageTransportCallback");
123  return;
124  }
125  frame.frameData = reinterpret_cast<PBYTE>((void *)(image->data.data()));
126  frame.duration = 0;
127  frame.index = image->header.seq;
128  UINT64 generated_timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
129  std::chrono::system_clock::now().time_since_epoch())
130  .count() /
131  DEFAULT_TIME_UNIT_IN_NANOS;
132  /* Image uses standard ROS Header type which contains a (seconds, nseconds) timestamp structure.
133  * Need to convert to 100ns unit. */
134  std::chrono::seconds timestamp_in_seconds(image->header.stamp.sec);
135  UINT64 image_timestamp =
136  (std::chrono::microseconds(timestamp_in_seconds).count() * HUNDREDS_OF_NANOS_IN_A_MICROSECOND) +
137  (image->header.stamp.nsec / DEFAULT_TIME_UNIT_IN_NANOS);
138  frame.decodingTs = image_timestamp ? image_timestamp : generated_timestamp;
139  frame.presentationTs = image_timestamp ? image_timestamp : generated_timestamp;
140  frame.flags = FRAME_FLAG_NONE;
141 
142  KinesisManagerStatus status = stream_manager.PutFrame(stream_name, frame);
143  if (KINESIS_MANAGER_STATUS_FAILED(status)) {
144  AWS_LOGSTREAM_WARN(__func__, stream_name.c_str() << " PutFrame failed. Error code: " << status);
145  } else {
146  ROS_DEBUG_THROTTLE(PUTFRAME_LOG_INTERVAL_IN_SECONDS, "%s PutFrame succeeded. Frame index: %du",
147  stream_name.c_str(), frame.index);
148  }
149 }
150 
151 } // namespace Kinesis
152 } // namespace Aws
KINESIS_MANAGER_STATUS_SUCCESS
virtual KinesisManagerStatus PutMetadata(std::string stream_name, const std::string &name, const std::string &value) const =0
void publish(const boost::shared_ptr< M > &message) const
#define KINESIS_MANAGER_STATUS_FAILED(status)
#define ROS_DEBUG_THROTTLE(rate,...)
enum Aws::Kinesis::kinesis_manager_status_e KinesisManagerStatus
virtual KinesisManagerStatus FetchRekognitionResults(const std::string &stream_name, Aws::Vector< Model::Record > *records)=0
const char * stream_name
void RekognitionEnabledKinesisVideoFrameTransportCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg, const ros::Publisher &publisher)
virtual KinesisManagerStatus ProcessCodecPrivateDataForStream(const std::string &stream_name, std::vector< uint8_t > codec_private_data)=0
void KinesisVideoFrameTransportCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg)
void ImageTransportCallback(const KinesisStreamManagerInterface &stream_manager, std::string stream_name, const sensor_msgs::ImageConstPtr &image)
virtual KinesisManagerStatus PutFrame(std::string stream_name, Frame &frame) const =0
#define PUTFRAME_LOG_INTERVAL_IN_SECONDS
KINESIS_MANAGER_STATUS_GET_RECORDS_THROTTLED


kinesis_video_streamer
Author(s): AWS RoboMaker
autogenerated on Fri Mar 5 2021 03:29:15