36 #include <gtest/gtest.h> 44 #include <boost/atomic.hpp> 45 #include <boost/shared_ptr.hpp> 46 #include <boost/bind.hpp> 47 #include <boost/thread.hpp> 48 #include <boost/function.hpp> 63 boost::mutex::scoped_lock lock(mutex);
81 EXPECT_EQ(cb->count, 1U);
86 EXPECT_EQ(cb->count, 2U);
89 EXPECT_EQ(cb->count, 2U);
92 EXPECT_EQ(cb->count, 2U);
99 for (uint32_t i = 0; i < 1000; ++i)
106 EXPECT_EQ(cb->count, 1000U);
113 for (uint32_t i = 0; i < 1000; ++i)
118 for (uint32_t i = 0; i < 1000; ++i)
121 EXPECT_EQ(cb->count, i + 1);
135 EXPECT_EQ(cb1->count, 0U);
136 EXPECT_EQ(cb2->count, 1U);
152 queue->removeByID(
id);
179 EXPECT_EQ(cb1->count, 1U);
180 EXPECT_EQ(cb2->count, 1U);
189 , use_available(use_available)
200 queue->callAvailable();
227 EXPECT_EQ(cb->count, 3U);
239 EXPECT_EQ(cb->count, 3U);
251 EXPECT_EQ(cb->count, 3U);
263 EXPECT_EQ(cb->count, 3U);
267 CallbackQueue* queue,
bool& done, boost::atomic<size_t>* num_calls,
279 num_calls->fetch_add(i);
286 size_t* num_calls = NULL,
size_t num_threads = 10,
292 boost::thread_group tg;
294 boost::atomic<size_t> calls(0);
296 for (uint32_t i = 0; i < num_threads; ++i)
298 tg.create_thread(boost::bind(
threadFunc, &queue, boost::ref(done), &calls, call_one_timeout));
307 if (!pause_between_callbacks.isZero())
309 pause_between_callbacks.sleep();
332 EXPECT_EQ(cb->count, i);
336 CallbackQueue* queue,
bool& done, boost::atomic<size_t>* num_calls,
348 num_calls->fetch_add(i);
357 EXPECT_EQ(cb->count, i);
364 const std::string& topic, int32_t queue_size,
365 bool allow_concurrent_callbacks)
372 ready_count.fetch_add(1);
405 const size_t queue_size =
static_cast<size_t>(test_duration.
toSec()) + 1;
408 const CountingSubscriptionQueuePtr cb(
409 boost::make_shared<CountingSubscriptionQueue>(
"test", queue_size,
false));
418 for (
size_t i = 0; i < queue_size; ++i)
425 size_t num_call_one_calls = 0;
428 num_threads, test_duration, pause_between_callbacks, call_one_timeout);
430 const uint32_t num_callbacks_called = helper->calls_;
431 const size_t num_ready_called = cb->ready_count;
447 num_callbacks_called <<
" callbacks out of " << num_callbacks_to_call);
448 ROS_INFO_STREAM(
"callOne() was called " << num_call_one_calls <<
" times.");
449 ROS_INFO_STREAM(
"ready() was called " << num_ready_called <<
" times.");
451 EXPECT_EQ(num_callbacks_called, queue_size);
452 EXPECT_LE(num_call_one_calls, 2 * num_callbacks_to_call + num_threads * (1/call_one_timeout.
toSec()) * queue_size);
524 recursiveTimerQueue = &queue;
525 TimerRecursionCallbackPtr cb(boost::make_shared<TimerRecursionCallback>(&queue));
528 boost::thread_group tg;
530 boost::atomic<size_t> calls(0);
532 for (uint32_t i = 0; i < 2; ++i)
550 : id(0), queue(_queue) {
551 condition_sync.store(
true);
552 condition_one.store(
false);
553 condition_stop.store(
false);
569 : condition_object(_condition_object), id(_id)
574 condition_object->condition_one.store(
false);
584 while(!condition_stop.load())
586 if(condition_sync.load() && queue->
isEmpty())
588 condition_one.store(
true);
590 queue->
addCallback(boost::make_shared<RaceConditionCallback>(
this, &
id),
id);
592 boost::this_thread::sleep(boost::posix_time::microseconds(1));
602 for(
unsigned int i = 0; i < 1000000; ++i)
621 int main(
int argc,
char** argv)
623 testing::InitGoogleTest(&argc, argv);
624 return RUN_ALL_TESTS();
boost::shared_ptr< TimerRecursionCallback > TimerRecursionCallbackPtr
int main(int argc, char **argv)
virtual CallResult call()
bool param(const std::string ¶m_name, T ¶m_val, const T &default_val)
virtual void addCallback(const CallbackInterfacePtr &callback, uint64_t removal_id=0)
ros::WallDuration callback_duration
void dummyTimer(const ros::TimerEvent &)
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
boost::shared_ptr< CountingCallback > CountingCallbackPtr
boost::atomic< size_t > ready_count
virtual CallResult call()
ros::WallDuration call_one_timeout
ConditionObject(CallbackQueue *_queue)
virtual CallResult call()
boost::atomic< bool > condition_stop
virtual CallResult call()
RecursiveCallback(CallbackQueue *queue, bool use_available)
ros::WallDuration pause_between_callbacks
boost::shared_ptr< RecursiveCallback > RecursiveCallbackPtr
bool add(test_roscpp::TestStringString::Request &, test_roscpp::TestStringString::Response &)
boost::atomic< bool > condition_one
CallbackQueueInterface * recursiveTimerQueue
TEST_P(CallbackQueueParamTest, threadedCallOneSlow)
ConditionObject * condition_object
CallbackQueueInterface * queue
boost::weak_ptr< void const > VoidConstWPtr
void threadFunc(boost::barrier *b)
virtual void removeByID(uint64_t removal_id)
INSTANTIATE_TEST_CASE_P(slow, CallbackQueueParamTest, ::testing::Values(ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0.1)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0.001)}, ThreadedCallOneSlowParams{ros::WallDuration(0.1), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(0.001), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 1, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}, ThreadedCallOneSlowParams{ros::WallDuration(1.0), 2, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}))
TEST(CallbackQueue, singleCallback)
void recursiveTimer(const ros::TimerEvent &)
#define ROS_INFO_STREAM(args)
void callAvailableThread(CallbackQueue *queue, bool &done, boost::atomic< size_t > *num_calls, ros::WallDuration call_timeout=ros::WallDuration(0.1))
boost::shared_ptr< SelfRemovingCallback > SelfRemovingCallbackPtr
CountingSubscriptionQueue(const std::string &topic, int32_t queue_size, bool allow_concurrent_callbacks)
size_t runThreadedTest(const CallbackInterfacePtr &cb, const boost::function< void(CallbackQueue *, bool &, boost::atomic< size_t > *, ros::WallDuration)> &threadFunc, size_t *num_calls=NULL, size_t num_threads=10, ros::WallDuration duration=ros::WallDuration(5), ros::WallDuration pause_between_callbacks=ros::WallDuration(0), ros::WallDuration call_one_timeout=ros::WallDuration(0.1))
SelfRemovingCallback(CallbackQueue *queue, uint64_t id)
boost::shared_ptr< CountingSubscriptionQueue > CountingSubscriptionQueuePtr
virtual CallResult call()
void callOneThread(CallbackQueue *queue, bool &done, boost::atomic< size_t > *num_calls, ros::WallDuration timeout=ros::WallDuration(0.1))
TimerRecursionCallback(CallbackQueueInterface *_queue)
boost::atomic< bool > condition_sync
ros::WallDuration test_duration
RaceConditionCallback(ConditionObject *_condition_object, unsigned long *_id)