publisher.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2010, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of the Willow Garage nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 *********************************************************************/
34 
35 #include <rosrt/publisher.h>
37 #include <rosrt/detail/managers.h>
38 #include <rosrt/init.h>
39 #include <ros/debug.h>
40 
41 #include <lockfree/object_pool.h>
42 
43 #include "boost/thread.hpp"
44 
45 namespace rosrt
46 {
47 namespace detail
48 {
49 
50 bool publish(const ros::Publisher& pub, const VoidConstPtr& msg, PublishFunc pub_func, CloneFunc clone_func)
51 {
52  return detail::getPublisherManager()->publish(pub, msg, pub_func, clone_func);
53 }
54 
55 PublishQueue::PublishQueue(uint32_t size)
56 : queue_(size)
57 {
58 }
59 
60 bool PublishQueue::push(const ros::Publisher& pub, const VoidConstPtr& msg, PublishFunc pub_func, CloneFunc clone_func)
61 {
62  PubItem i;
63  i.pub = pub;
64  i.msg = msg;
65  i.pub_func = pub_func;
66  i.clone_func = clone_func;
67  return queue_.push(i);
68 }
69 
70 uint32_t PublishQueue::publishAll()
71 {
72  uint32_t count = 0;
73 
74  MWSRQueue<PubItem>::Node* it = queue_.popAll();
75  while (it)
76  {
77  // Always clone the message before publishing. Otherwise, if there's an intraprocess non-realtime subscriber that stores off the messages
78  // it could starve the realtime publisher for messages.
79  VoidConstPtr clone = it->val.clone_func(it->val.msg);
80  it->val.pub_func(it->val.pub, clone);
81  it->val.msg.reset();
82  it->val.pub = ros::Publisher();
83  MWSRQueue<PubItem>::Node* tmp = it;
84  it = it->next;
85 
86  queue_.free(tmp);
87 
88  ++count;
89  }
90 
91  return count;
92 }
93 
94 PublisherManager::PublisherManager(const InitOptions& ops)
95 : queue_(ops.pubmanager_queue_size)
96 , pub_count_(0)
97 , running_(true)
98 {
99  pub_thread_ = boost::thread(&PublisherManager::publishThread, this);
100 }
101 
103 {
104  running_ = false;
105  cond_.notify_one();
106  pub_thread_.join();
107 }
108 
110 {
111  while (running_)
112  {
113  {
114  boost::mutex::scoped_lock lock(cond_mutex_);
115  while (running_ && pub_count_.load() == 0)
116  {
117  cond_.wait(lock);
118  }
119 
120  if (!running_)
121  {
122  return;
123  }
124  }
125 
126  uint32_t count = queue_.publishAll();
127  pub_count_.fetch_sub(count);
128  }
129 }
130 
131 bool PublisherManager::publish(const ros::Publisher& pub, const VoidConstPtr& msg, PublishFunc pub_func, CloneFunc clone_func)
132 {
133  if (!queue_.push(pub, msg, pub_func, clone_func))
134  {
135  return false;
136  }
137 
138  pub_count_.fetch_add(1);
139  cond_.notify_one();
140 
141  return true;
142 }
143 
144 } // namespace detail
145 } // namespace rosrt
rosrt::detail::PublisherManager::queue_
PublishQueue queue_
Definition: publisher_manager.h:115
publisher_manager.h
ros::Publisher
object_pool.h
boost::shared_ptr< void const >
rosrt::PublishFunc
void(* PublishFunc)(const ros::Publisher &pub, const VoidConstPtr &msg)
Definition: publisher.h:82
rosrt
Definition: managers.h:38
rosrt::CloneFunc
VoidConstPtr(* CloneFunc)(const VoidConstPtr &msg)
Definition: publisher.h:83
rosrt::detail::PublisherManager::running_
volatile bool running_
Definition: publisher_manager.h:120
rosrt::detail::PublisherManager::~PublisherManager
~PublisherManager()
Definition: publisher.cpp:166
managers.h
rosrt::detail::PublishQueue::queue_
MWSRQueue< PubItem > queue_
Definition: publisher_manager.h:102
rosrt::detail::PublisherManager::PublisherManager
PublisherManager(const InitOptions &ops)
Definition: publisher.cpp:158
rosrt::detail::PublishQueue::push
bool push(const ros::Publisher &pub, const VoidConstPtr &msg, PublishFunc pub_func, CloneFunc clone_func)
Definition: publisher.cpp:124
init.h
rosrt::detail::PublishQueue::PublishQueue
PublishQueue(uint32_t size)
Definition: publisher.cpp:119
rosrt::detail::PublisherManager::pub_count_
ros::atomic< uint32_t > pub_count_
Definition: publisher_manager.h:119
rosrt::detail::PublisherManager::publish
bool publish(const ros::Publisher &pub, const VoidConstPtr &msg, PublishFunc pub_func, CloneFunc clone_func)
Definition: publisher.cpp:195
rosrt::detail::PublisherManager::pub_thread_
boost::thread pub_thread_
Definition: publisher_manager.h:118
debug.h
publisher.h
rosrt::detail::PublisherManager::cond_mutex_
boost::mutex cond_mutex_
Definition: publisher_manager.h:117
rosrt::detail::getPublisherManager
PublisherManager * getPublisherManager()
Definition: init.cpp:145
rosrt::detail::publish
bool publish(const ros::Publisher &pub, const VoidConstPtr &msg, PublishFunc pub_func, CloneFunc clone_func)
Definition: publisher.cpp:114
rosrt::detail::PublisherManager::publishThread
void publishThread()
Definition: publisher.cpp:173
rosrt::detail::PublishQueue::publishAll
uint32_t publishAll()
Definition: publisher.cpp:134
rosrt::detail::PublisherManager::cond_
boost::condition_variable cond_
Definition: publisher_manager.h:116


rosrt
Author(s): Josh Faust
autogenerated on Wed Mar 2 2022 00:54:17