ros_stream_subscription_installer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 #include <aws/core/utils/logging/LogMacros.h>
17 #include <std_msgs/String.h>
18 
19 namespace Aws {
20 namespace Kinesis {
21 
23 {
24  if (!callback) {
25  AWS_LOG_ERROR(__func__, "Invalid callback was provided");
26  return false;
27  }
28  SubscriberSetupFn image_transport_setup_closure =
29  [this, callback](const StreamSubscriptionDescriptor & descriptor) -> bool {
31  boost::function<void(const sensor_msgs::ImageConstPtr &)> callback_wrapper;
32  callback_wrapper = [this, callback,
33  descriptor](const sensor_msgs::ImageConstPtr & image) -> void {
34  callback(*this->stream_manager_, descriptor.stream_name, image);
35  };
37  it.subscribe(descriptor.topic_name, descriptor.message_queue_size, callback_wrapper));
38  return true;
39  };
40  installers_.insert({KINESIS_STREAM_INPUT_TYPE_IMAGE_TRANSPORT, image_transport_setup_closure});
41  return true;
42 }
43 
46 {
47  if (!callback) {
48  AWS_LOG_ERROR(__func__, "Invalid callback was provided");
49  return false;
50  }
51  SubscriberSetupFn kinesis_video_frame_setup_closure =
52  [this, callback](const StreamSubscriptionDescriptor & descriptor) -> bool {
53  boost::function<void(const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &)> callback_wrapper;
54  callback_wrapper = [this, callback, descriptor](
55  const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame) -> void {
56  callback(*this->stream_manager_, descriptor.stream_name, frame);
57  };
59  descriptor.topic_name.c_str(), descriptor.message_queue_size, callback_wrapper));
60  return true;
61  };
62  installers_.insert(
63  {KINESIS_STREAM_INPUT_TYPE_KINESIS_VIDEO_FRAME, kinesis_video_frame_setup_closure});
64  return true;
65 }
66 
69 {
70  if (!callback) {
71  AWS_LOG_ERROR(__func__, "Invalid callback was provided");
72  return false;
73  }
74  SubscriberSetupFn rekognition_kinesis_video_frame_setup_closure =
75  [this, callback](const StreamSubscriptionDescriptor & descriptor) -> bool {
76  if (descriptor.rekognition_topic_name.empty()) {
77  AWS_LOG_ERROR(__func__, "Can't set up subscription: Rekognition topic name is empty.");
78  return false;
79  }
80  boost::function<void(const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &)> callback_wrapper;
81  ros::Publisher publisher = handle_->advertise<std_msgs::String>(
82  descriptor.rekognition_topic_name, descriptor.message_queue_size);
83  callback_wrapper = [this, callback, descriptor, publisher](
84  const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame) -> void {
85  callback(*this->stream_manager_, descriptor.stream_name, frame, publisher);
86  };
88  descriptor.topic_name.c_str(), descriptor.message_queue_size, callback_wrapper));
89  publishers_[descriptor.topic_name] = publisher;
90  return true;
91  };
93  rekognition_kinesis_video_frame_setup_closure});
94  return true;
95 }
96 
98 {
99  if (topic_name.empty()) {
100  return;
101  }
102  for (auto subscriber = standard_subscribers_.begin(); subscriber != standard_subscribers_.end();
103  subscriber++) {
104  if (subscriber->getTopic() == topic_name) {
105  subscriber->shutdown();
106  if (0 < publishers_.count(topic_name)) {
107  publishers_.at(topic_name).shutdown();
108  }
109  }
110  }
111  for (auto subscriber = image_transport_subscribers_.begin();
112  subscriber != image_transport_subscribers_.end(); subscriber++) {
113  if (subscriber->getTopic() == topic_name) {
114  subscriber->shutdown();
115  }
116  }
117 }
118 
119 } // namespace Kinesis
120 } // namespace Aws
void(* KinesisVideoFrameTransportCallbackFn)(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg)
Subscriber subscribe(const std::string &base_topic, uint32_t queue_size, const boost::function< void(const sensor_msgs::ImageConstPtr &)> &callback, const ros::VoidPtr &tracked_object=ros::VoidPtr(), const TransportHints &transport_hints=TransportHints())
bool SetupImageTransport(const ImageTransportCallbackFn callback)
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
std::map< KinesisStreamInputType, SubscriberSetupFn > installers_
std::function< bool(const StreamSubscriptionDescriptor &descriptor)> SubscriberSetupFn
bool SetupKinesisVideoFrameTransport(const KinesisVideoFrameTransportCallbackFn callback)
const char * topic_name
void Uninstall(const std::string &topic_name) override
void(* RekognitionEnabledKinesisVideoFrameTransportCallbackFn)(KinesisStreamManagerInterface &stream_manager, std::string stream_name, const kinesis_video_msgs::KinesisVideoFrame::ConstPtr &frame_msg, const ros::Publisher &publisher)
void(* ImageTransportCallbackFn)(const KinesisStreamManagerInterface &stream_manager, std::string stream_name, const sensor_msgs::ImageConstPtr &image)
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
Aws::Kinesis::KinesisStreamManagerInterface * stream_manager_
std::vector< image_transport::Subscriber > image_transport_subscribers_
bool SetupRekognitionEnabledKinesisVideoFrameTransport(const RekognitionEnabledKinesisVideoFrameTransportCallbackFn callback)


kinesis_video_streamer
Author(s): AWS RoboMaker
autogenerated on Fri Mar 5 2021 03:29:15