15 #include <aws/core/utils/logging/LogMacros.h> 17 #include <std_msgs/String.h> 19 #ifndef PUTFRAME_LOG_INTERVAL_IN_SECONDS 20 #define PUTFRAME_LOG_INTERVAL_IN_SECONDS (10) 28 const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame_msg,
32 Aws::Vector<Model::Record> records;
36 AWS_LOGSTREAM_WARN(__func__, stream_name.c_str()
37 <<
" FetchRekognitionResults failed. Error code: " << status);
40 for (
auto item = records.begin(); item != records.end(); item++) {
41 std_msgs::String message;
42 const char * data =
reinterpret_cast<char *
>(item->GetData().GetUnderlyingData());
43 size_t length = item->GetData().GetLength();
44 message.data = std::string(data, length);
51 const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame_msg)
53 if (!frame_msg->codec_private_data.empty()) {
58 __func__, stream_name.c_str()
59 <<
" failed updating codec data, error code: " << update_codec_data_result
60 <<
". Continuing streaming as a best effort, but you might not be able to " 61 "decode and render the stream.");
64 "%s Updated codec data successfully. Frame index: %du",
65 stream_name.c_str(), frame_msg->index);
70 frame.trackId = DEFAULT_TRACK_ID;
71 frame.size = frame_msg->frame_data.size();
72 frame.frameData =
reinterpret_cast<PBYTE
>((
void *)(frame_msg->frame_data.data()));
73 frame.duration = frame_msg->duration *
74 HUNDREDS_OF_NANOS_IN_A_MICROSECOND;
76 frame.index = frame_msg->index;
77 UINT64 generated_timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
78 std::chrono::system_clock::now().time_since_epoch())
80 DEFAULT_TIME_UNIT_IN_NANOS;
81 frame.decodingTs = frame_msg->decoding_ts ? frame_msg->decoding_ts : generated_timestamp;
82 frame.presentationTs =
83 frame_msg->presentation_ts ? frame_msg->presentation_ts : generated_timestamp;
84 frame.flags = (FRAME_FLAGS)frame_msg->flags;
88 AWS_LOGSTREAM_WARN(__func__, stream_name.c_str() <<
" PutFrame failed. Error code: " << status);
91 stream_name.c_str(), frame.index);
94 if (!frame_msg->metadata.empty()) {
96 for (
auto iter = frame_msg->metadata.begin(); iter != frame_msg->metadata.end(); ++iter) {
98 status | stream_manager.
PutMetadata(stream_name, iter->key, iter->value));
101 AWS_LOGSTREAM_WARN(__func__, stream_name.c_str()
102 <<
" PutMetadata failed. Error code: " << status);
105 "%s PutMetadata succeeded. Frame index: %du", stream_name.c_str(),
112 std::string
stream_name,
const sensor_msgs::ImageConstPtr & image)
115 frame.trackId = DEFAULT_TRACK_ID;
116 frame.size = image->step * image->height;
118 if (image->step != 0 && frame.size / image->step != image->height) {
122 <<
" Integer overflow detected - image size is too big. Aborting imageTransportCallback");
125 frame.frameData =
reinterpret_cast<PBYTE
>((
void *)(image->data.data()));
127 frame.index = image->header.seq;
128 UINT64 generated_timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
129 std::chrono::system_clock::now().time_since_epoch())
131 DEFAULT_TIME_UNIT_IN_NANOS;
134 std::chrono::seconds timestamp_in_seconds(image->header.stamp.sec);
135 UINT64 image_timestamp =
136 (std::chrono::microseconds(timestamp_in_seconds).count() * HUNDREDS_OF_NANOS_IN_A_MICROSECOND) +
137 (image->header.stamp.nsec / DEFAULT_TIME_UNIT_IN_NANOS);
138 frame.decodingTs = image_timestamp ? image_timestamp : generated_timestamp;
139 frame.presentationTs = image_timestamp ? image_timestamp : generated_timestamp;
140 frame.flags = FRAME_FLAG_NONE;
144 AWS_LOGSTREAM_WARN(__func__, stream_name.c_str() <<
" PutFrame failed. Error code: " << status);
147 stream_name.c_str(), frame.index);
KINESIS_MANAGER_STATUS_SUCCESS
virtual KinesisManagerStatus PutMetadata(std::string stream_name, const std::string &name, const std::string &value) const =0
void publish(const boost::shared_ptr< M > &message) const
#define KINESIS_MANAGER_STATUS_FAILED(status)
#define ROS_DEBUG_THROTTLE(rate,...)
enum Aws::Kinesis::kinesis_manager_status_e KinesisManagerStatus
virtual KinesisManagerStatus FetchRekognitionResults(const std::string &stream_name, Aws::Vector< Model::Record > *records)=0
void RekognitionEnabledKinesisVideoFrameTransportCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg, const ros::Publisher &publisher)
virtual KinesisManagerStatus ProcessCodecPrivateDataForStream(const std::string &stream_name, std::vector< uint8_t > codec_private_data)=0
void KinesisVideoFrameTransportCallback(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg)
void ImageTransportCallback(const KinesisStreamManagerInterface &stream_manager, std::string stream_name, const sensor_msgs::ImageConstPtr &image)
virtual KinesisManagerStatus PutFrame(std::string stream_name, Frame &frame) const =0
#define PUTFRAME_LOG_INTERVAL_IN_SECONDS
KINESIS_MANAGER_STATUS_GET_RECORDS_THROTTLED