generic_lazy_pubsub.cpp
Go to the documentation of this file.
1 // SPDX-License-Identifier: BSD-3-Clause
2 // SPDX-FileCopyrightText: Czech Technical University in Prague
3 
11 #include <mutex>
12 #include <string>
13 #include <utility>
14 
15 #include <boost/bind.hpp>
16 #include <boost/bind/placeholders.hpp>
17 
18 #include <ros/advertise_options.h>
19 #include <ros/duration.h>
20 #include <ros/init.h>
21 #include <ros/message_event.h>
22 #include <ros/node_handle.h>
24 #include <ros/subscribe_options.h>
25 #include <ros/subscriber.h>
26 #include <topic_tools/shape_shifter.h>
27 
30 
34 
36  const ros::NodeHandle& nhIn, const std::string& topicIn, const ros::NodeHandle& nhOut, const std::string& topicOut,
37  const size_t inQueueSize, const size_t outQueueSize, CallbackType callback,
38  ros::SubscribeOptions subscribeOptions, const cras::LogHelperPtr& logHelper) :
39  cras::ConditionalSubscriber(cras::bind_front(&cras::GenericLazyPubSub::subscribe, this), logHelper),
40  topicIn(topicIn), topicOut(topicOut), inQueueSize(inQueueSize), outQueueSize(outQueueSize),
41  nhIn(nhIn), nhOut(nhOut), callback(std::move(callback)), subscribeOptions(std::move(subscribeOptions))
42 {
43  // We have to connect at the beginning so that we can create the publisher from a subscribed message.
44  std::lock_guard<std::mutex> lock(this->connectMutex);
45  this->connectNoLock();
46 }
47 
49 {
50  ros::SubscribeOptions opts = this->subscribeOptions;
51  opts.template initByFullCallbackType<const ros::MessageEvent<topic_tools::ShapeShifter const>&>(
52  this->topicIn, this->inQueueSize, cras::bind_front(&cras::GenericLazyPubSub::cb, this));
53  sub = this->nhIn.subscribe(opts);
54 }
55 
57 {
58  if (!this->pub)
59  return true;
60  return this->pub.getNumSubscribers() > 0;
61 }
62 
64 {
65  if (!this->lazy)
66  return;
67  this->updateSubscription();
68 }
69 
71 {
72  const auto& msg = event.getConstMessage();
73  if (!this->pub)
74  {
75  std::lock_guard<std::mutex> pubLock(this->pubCreateMutex);
76  if (!this->pub) // the first check is outside mutex, this one is inside
77  {
78  this->advertiseOptions = this->createAdvertiseOptions(event); // cannot be const - advertise requires non-const
79  CRAS_INFO("Creating%s publisher on %s with type %s.",
80  (this->advertiseOptions->latch ? " latched" : ""),
81  this->nhOut.resolveName(this->topicOut).c_str(),
82  msg->getDataType().c_str());
83 
84  { // Advertise the publisher
85  std::lock_guard<std::mutex> lock(this->connectMutex);
86  this->pub = this->nhOut.advertise(this->advertiseOptions.value());
87  }
88  }
89 
90  for (size_t i = 0; i < 100 && ros::ok() && this->pub.getNumSubscribers() == 0; ++i)
91  ros::WallDuration(0.001).sleep();
92  this->updateSubscription();
93  }
94 
95  this->callback(event, this->pub);
96 }
97 
100 {
101  const auto& msg = event.getConstMessage();
102  auto cb = boost::bind(&cras::GenericLazyPubSub::connectCb, this, boost::placeholders::_1);
103  ros::AdvertiseOptions opts(this->topicOut, this->outQueueSize, msg->getMD5Sum(), msg->getDataType(),
104  msg->getMessageDefinition(), cb, cb);
105  opts.has_header = cras::hasHeader(*msg);
106  if (event.getConnectionHeaderPtr() != nullptr)
107  {
108  const auto& header = event.getConnectionHeader();
109  opts.latch = header.find("latching") != header.end() && event.getConnectionHeader()["latching"] == "1";
110  }
111  return opts;
112 }
113 
114 cras::GenericLazyPubSub::GenericLazyPubSub(const ros::NodeHandle& nhIn, const std::string& topicIn,
115  const ros::NodeHandle& nhOut, const std::string& topicOut, size_t queueSize,
117  const cras::LogHelperPtr& logHelper) :
119  nhIn, topicIn, nhOut, topicOut, queueSize, queueSize, std::move(callback), std::move(subscribeOptions), logHelper)
120 {
121 }
122 
123 cras::GenericLazyPubSub::GenericLazyPubSub(const ros::NodeHandle& nhIn, const std::string& topicIn,
124  const ros::NodeHandle& nhOut, const std::string& topicOut, cras::GenericLazyPubSub::CallbackType callback,
125  ros::SubscribeOptions subscribeOptions, const cras::LogHelperPtr& logHelper) :
126  GenericLazyPubSub(nhIn, topicIn, nhOut, topicOut, 10, std::move(callback), std::move(subscribeOptions), logHelper)
127 {
128 }
129 
130 cras::GenericLazyPubSub::GenericLazyPubSub(const ros::NodeHandle& nh, const std::string& topicIn,
131  const std::string& topicOut, size_t inQueueSize, size_t outQueueSize, cras::GenericLazyPubSub::CallbackType callback,
132  ros::SubscribeOptions subscribeOptions, const cras::LogHelperPtr& logHelper) :
133  GenericLazyPubSub(
134  nh, topicIn, nh, topicOut, inQueueSize, outQueueSize, std::move(callback), std::move(subscribeOptions), logHelper)
135 {
136 }
137 
138 cras::GenericLazyPubSub::GenericLazyPubSub(const ros::NodeHandle& nh, const std::string& topicIn,
139  const std::string& topicOut, size_t queueSize, cras::GenericLazyPubSub::CallbackType callback,
140  ros::SubscribeOptions subscribeOptions, const cras::LogHelperPtr& logHelper) :
141  GenericLazyPubSub(
142  nh, topicIn, topicOut, queueSize, queueSize, std::move(callback), std::move(subscribeOptions), logHelper)
143 {
144 }
145 
146 cras::GenericLazyPubSub::GenericLazyPubSub(const ros::NodeHandle& nh, const std::string& topicIn,
147  const std::string& topicOut, cras::GenericLazyPubSub::CallbackType callback,
148  ros::SubscribeOptions subscribeOptions, const cras::LogHelperPtr& logHelper) :
149  GenericLazyPubSub(nh, topicIn, topicOut, 10, std::move(callback), std::move(subscribeOptions), logHelper)
150 {
151 }
152 
cras::GenericLazyPubSub
A pair of lazy subscriber and publisher which use the same message type (unknown at compile time).
Definition: generic_lazy_pubsub.hpp:44
ros::WallDuration::sleep
bool sleep() const
node_handle.h
msg
msg
cras::GenericLazyPubSub::subscribe
void subscribe(::ros::Subscriber &sub)
Perform the subscription to the input topic.
Definition: generic_lazy_pubsub.cpp:48
cras::GenericLazyPubSub::shouldBeSubscribed
bool shouldBeSubscribed() const override
Returns true when the subscriber should be connected - i.e. either at the start, or when pub has clie...
Definition: generic_lazy_pubsub.cpp:56
cras
cras::hasHeader
bool hasHeader(const ::topic_tools::ShapeShifter &msg)
Tell whether the given message has header field.
shape_shifter.h
Tools for more convenient working with ShapeShifter objects.
cras::GenericLazyPubSub::CallbackType
::boost::function< void(::cras::GenericLazyPubSub::CallbackMessageType, ::ros::Publisher &)> CallbackType
Type of the user callback passed to this class.
Definition: generic_lazy_pubsub.hpp:51
init.h
cras::GenericLazyPubSub::cb
void cb(const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback for the received messages from subscriber. It also handles publisher creation if none exists...
Definition: generic_lazy_pubsub.cpp:70
lazy_subscriber.hpp
Lazy subscriber that subscribes only when a paired publisher has subscribers.
cras::GenericLazyPubSub::GenericLazyPubSub
GenericLazyPubSub(const ::ros::NodeHandle &nhIn, const ::std::string &topicIn, const ::ros::NodeHandle &nhOut, const ::std::string &topicOut, size_t inQueueSize, size_t outQueueSize, CallbackType callback, ::ros::SubscribeOptions subscribeOptions={}, const ::cras::LogHelperPtr &logHelper=::std::make_shared<::cras::NodeLogHelper >())
Create the pair of lazy subscriber and publisher.
ros::AdvertiseOptions::has_header
bool has_header
ros::AdvertiseOptions
ros::ok
ROSCPP_DECL bool ok()
functional.hpp
CRAS_INFO
#define CRAS_INFO(...)
log_utils.h
ros::SingleSubscriberPublisher
generic_lazy_pubsub.hpp
Lazy subscriber that subscribes only when a paired publisher has subscribers. Version for unknown mes...
duration.h
ros::AdvertiseOptions::latch
bool latch
subscriber.h
subscribe
void subscribe()
single_subscriber_publisher.h
bind_front
auto bind_front(F &&f, Args &&... args)
advertise_options.h
cras::LogHelperPtr
::cras::LogHelper::Ptr LogHelperPtr
ros::SubscribeOptions
std
ros::MessageEvent::getConnectionHeaderPtr
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
createAdvertiseOptions
ROSBAG_DECL ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo *c, uint32_t queue_size, const std::string &prefix="")
ros::WallDuration
cras::GenericLazyPubSub::createAdvertiseOptions
virtual ::ros::AdvertiseOptions createAdvertiseOptions(const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Create ros::AdvertiseOptions from a message event.
Definition: generic_lazy_pubsub.cpp:98
header
const std::string header
cras::GenericLazyPubSub::connectCb
void connectCb(const ::ros::SingleSubscriberPublisher &)
Callback that is called whenever someone (un)subscribes from the publisher.
Definition: generic_lazy_pubsub.cpp:63
message_event.h
subscribe_options.h
ros::NodeHandle
ros::MessageEvent
ros::Subscriber


cras_topic_tools
Author(s): Martin Pecka
autogenerated on Wed Jan 8 2025 03:50:28