test_pipeline.cpp
Go to the documentation of this file.
1 // -- BEGIN LICENSE BLOCK ----------------------------------------------
2 // Copyright 2022 Universal Robots A/S
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 //
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 //
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 //
14 // * Neither the name of the {copyright_holder} nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 // POSSIBILITY OF SUCH DAMAGE.
29 // -- END LICENSE BLOCK ------------------------------------------------
30 
31 #include <gtest/gtest.h>
32 #include <condition_variable>
33 
40 
41 using namespace urcl;
42 
43 class PipelineTest : public ::testing::Test
44 {
45 protected:
46  void SetUp()
47  {
48  server_.reset(new comm::TCPServer(60002));
49  server_->setConnectCallback(std::bind(&PipelineTest::connectionCallback, this, std::placeholders::_1));
50  server_->start();
51 
52  // Setup pipeline
53  stream_.reset(new comm::URStream<rtde_interface::RTDEPackage>("127.0.0.1", 60002));
54  std::vector<std::string> recipe = { "timestamp" };
55  parser_.reset(new rtde_interface::RTDEParser(recipe));
56  parser_->setProtocolVersion(2);
57  producer_.reset(new comm::URProducer<rtde_interface::RTDEPackage>(*stream_.get(), *parser_.get()));
58 
59  pipeline_.reset(new comm::Pipeline<rtde_interface::RTDEPackage>(*producer_.get(), "RTDE_PIPELINE", notifier_));
60  pipeline_->init();
61  }
62 
63  void teardown()
64  {
65  // Clean up
66  pipeline_->stop();
67  pipeline_.reset();
68  server_.reset();
69  }
70 
71  void connectionCallback(const socket_t filedescriptor)
72  {
73  std::lock_guard<std::mutex> lk(connect_mutex_);
74  client_fd_ = filedescriptor;
75  connect_cv_.notify_one();
76  connection_callback_ = true;
77  }
78 
79  bool waitForConnectionCallback(int milliseconds = 100)
80  {
81  std::unique_lock<std::mutex> lk(connect_mutex_);
82  if (connect_cv_.wait_for(lk, std::chrono::milliseconds(milliseconds)) == std::cv_status::no_timeout ||
83  connection_callback_ == true)
84  {
85  connection_callback_ = false;
86  return true;
87  }
88  return false;
89  }
90 
91  std::unique_ptr<comm::TCPServer> server_;
93 
94  std::unique_ptr<comm::URStream<rtde_interface::RTDEPackage>> stream_;
95  std::unique_ptr<rtde_interface::RTDEParser> parser_;
96  std::unique_ptr<comm::URProducer<rtde_interface::RTDEPackage>> producer_;
97  std::unique_ptr<comm::Pipeline<rtde_interface::RTDEPackage>> pipeline_;
99 
100  // Consumer class
101  class TestConsumer : public comm::IConsumer<rtde_interface::RTDEPackage>
102  {
103  public:
104  TestConsumer() = default;
105  virtual ~TestConsumer() = default;
106 
107  bool waitForConsumer(int milliseconds = 100)
108  {
109  std::unique_lock<std::mutex> lk(consumed_mutex);
110  if (consumed_cv.wait_for(lk, std::chrono::milliseconds(milliseconds)) == std::cv_status::no_timeout ||
111  consumed_callback == true)
112  {
113  consumed_callback = true;
114  return true;
115  }
116  return false;
117  }
118 
119  // Consume a package
120  virtual bool consume(std::shared_ptr<rtde_interface::RTDEPackage> product)
121  {
122  std::lock_guard<std::mutex> lk(consumed_mutex);
123  if (rtde_interface::DataPackage* data = dynamic_cast<rtde_interface::DataPackage*>(product.get()))
124  {
125  data->getData("timestamp", timestamp);
126  }
127  consumed_cv.notify_one();
128  consumed_callback = true;
129  return true;
130  }
131 
132  double timestamp = 0.0;
133  std::condition_variable consumed_cv;
134  std::mutex consumed_mutex;
135  bool consumed_callback = false;
136  };
137 
138 private:
139  std::condition_variable connect_cv_;
140  std::mutex connect_mutex_;
141 
142  bool connection_callback_ = false;
143 };
144 
145 TEST_F(PipelineTest, get_product_from_stopped_pipeline)
146 {
147  std::unique_ptr<rtde_interface::RTDEPackage> urpackage;
148  std::chrono::milliseconds timeout{ 100 };
149  EXPECT_EQ(pipeline_->getLatestProduct(urpackage, timeout), false);
150 }
151 
152 TEST_F(PipelineTest, get_product_from_running_pipeline)
153 {
154  waitForConnectionCallback();
155  pipeline_->run();
156 
157  // RTDE package with timestamp
158  uint8_t data_package[] = { 0x00, 0x0c, 0x55, 0x01, 0x40, 0xbb, 0xbf, 0xdb, 0xa5, 0xe3, 0x53, 0xf7 };
159  size_t written;
160  server_->write(client_fd_, data_package, sizeof(data_package), written);
161 
162  std::unique_ptr<rtde_interface::RTDEPackage> urpackage;
163  std::chrono::milliseconds timeout{ 500 };
164  EXPECT_EQ(pipeline_->getLatestProduct(urpackage, timeout), true);
165  if (rtde_interface::DataPackage* data = dynamic_cast<rtde_interface::DataPackage*>(urpackage.get()))
166  {
167  double timestamp;
168  double expected_timestamp = 7103.8579;
169  data->getData("timestamp", timestamp);
170  EXPECT_FLOAT_EQ(timestamp, expected_timestamp);
171  }
172  else
173  {
174  std::cout << "Failed to get data package data" << std::endl;
175  GTEST_FAIL();
176  }
177 }
178 
179 TEST_F(PipelineTest, stop_pipeline)
180 {
181  waitForConnectionCallback();
182  pipeline_->run();
183 
184  // RTDE package with timestamp
185  uint8_t data_package[] = { 0x00, 0x0c, 0x55, 0x01, 0x40, 0xbb, 0xbf, 0xdb, 0xa5, 0xe3, 0x53, 0xf7 };
186  size_t written;
187  server_->write(client_fd_, data_package, sizeof(data_package), written);
188 
189  std::unique_ptr<rtde_interface::RTDEPackage> urpackage;
190  std::chrono::milliseconds timeout{ 500 };
191  // Ensure that pipeline is running
192  EXPECT_EQ(pipeline_->getLatestProduct(urpackage, timeout), true);
193 
194  pipeline_->stop();
195 
196  // We shouldn't be able to fetch a package when the pipeline has been stopped
197  EXPECT_EQ(pipeline_->getLatestProduct(urpackage, timeout), false);
198 }
199 
200 TEST_F(PipelineTest, consumer_pipeline)
201 {
202  stream_.reset(new comm::URStream<rtde_interface::RTDEPackage>("127.0.0.1", 60002));
203  producer_.reset(new comm::URProducer<rtde_interface::RTDEPackage>(*stream_.get(), *parser_.get()));
204  TestConsumer consumer;
205  pipeline_.reset(
206  new comm::Pipeline<rtde_interface::RTDEPackage>(*producer_.get(), &consumer, "RTDE_PIPELINE", notifier_));
207  pipeline_->init();
208  waitForConnectionCallback();
209  pipeline_->run();
210 
211  // RTDE package with timestamp
212  uint8_t data_package[] = { 0x00, 0x0c, 0x55, 0x01, 0x40, 0xbb, 0xbf, 0xdb, 0xa5, 0xe3, 0x53, 0xf7 };
213  size_t written;
214  server_->write(client_fd_, data_package, sizeof(data_package), written);
215 
216  // Wait for data to be consumed
217  int max_retries = 3;
218  int count = 0;
219  while (consumer.waitForConsumer(500) == false)
220  {
221  if (count >= max_retries)
222  {
223  break;
224  }
225  server_->write(client_fd_, data_package, sizeof(data_package), written);
226  count++;
227  }
228  EXPECT_LT(count, max_retries);
229 
230  // Test that the package was consumed
231  double expected_timestamp = 7103.8579;
232  EXPECT_FLOAT_EQ(consumer.timestamp, expected_timestamp);
233 
234  pipeline_->stop();
235 }
236 
237 TEST_F(PipelineTest, connect_non_connected_robot)
238 {
239  stream_.reset(new comm::URStream<rtde_interface::RTDEPackage>("127.0.0.1", 12321));
240  producer_.reset(new comm::URProducer<rtde_interface::RTDEPackage>(*stream_.get(), *parser_.get()));
241  TestConsumer consumer;
242  pipeline_.reset(
243  new comm::Pipeline<rtde_interface::RTDEPackage>(*producer_.get(), &consumer, "RTDE_PIPELINE", notifier_));
244 
245  auto start = std::chrono::system_clock::now();
246  EXPECT_THROW(pipeline_->init(2, std::chrono::milliseconds(500)), UrException);
247  auto end = std::chrono::system_clock::now();
248  auto elapsed = end - start;
249  // This is only a rough estimate, obviously
250  EXPECT_LT(elapsed, std::chrono::milliseconds(7500));
251 }
252 
253 int main(int argc, char* argv[])
254 {
255  ::testing::InitGoogleTest(&argc, argv);
256 
257  return RUN_ALL_TESTS();
258 }
pipeline.h
PipelineTest::connectionCallback
void connectionCallback(const socket_t filedescriptor)
Definition: test_pipeline.cpp:71
urcl::comm::URStream
The stream is an abstraction of the TCPSocket that offers reading a full UR data package out of the s...
Definition: stream.h:40
PipelineTest::stream_
std::unique_ptr< comm::URStream< rtde_interface::RTDEPackage > > stream_
Definition: test_pipeline.cpp:94
PipelineTest::connect_cv_
std::condition_variable connect_cv_
Definition: test_pipeline.cpp:139
socket_t
int socket_t
Definition: socket_t.h:57
PipelineTest::connect_mutex_
std::mutex connect_mutex_
Definition: test_pipeline.cpp:140
urcl::rtde_interface::DataPackage
The DataPackage class handles communication in the form of RTDE data packages both to and from the ro...
Definition: data_package.h:60
PipelineTest::server_
std::unique_ptr< comm::TCPServer > server_
Definition: test_pipeline.cpp:91
urcl
Definition: bin_parser.h:36
urcl::rtde_interface::RTDEParser
The RTDE specific parser. Interprets a given byte stream as serialized RTDE packages and parses it ac...
Definition: rtde_parser.h:45
rtde_parser.h
PipelineTest::pipeline_
std::unique_ptr< comm::Pipeline< rtde_interface::RTDEPackage > > pipeline_
Definition: test_pipeline.cpp:97
urcl::comm::URProducer
A general producer for URPackages. Implements funcionality to produce packages by reading and parsing...
Definition: producer.h:40
PipelineTest
Definition: test_pipeline.cpp:43
urcl::UrException
Our base class for exceptions. Specialized exceptions should inherit from those.
Definition: exceptions.h:53
PipelineTest::waitForConnectionCallback
bool waitForConnectionCallback(int milliseconds=100)
Definition: test_pipeline.cpp:79
main
int main(int argc, char *argv[])
Definition: test_pipeline.cpp:253
urcl::comm::INotifier
Parent class for notifiers.
Definition: pipeline.h:253
PipelineTest::SetUp
void SetUp()
Definition: test_pipeline.cpp:46
PipelineTest::TestConsumer::consumed_mutex
std::mutex consumed_mutex
Definition: test_pipeline.cpp:134
producer.h
stream.h
PipelineTest::parser_
std::unique_ptr< rtde_interface::RTDEParser > parser_
Definition: test_pipeline.cpp:95
urcl::comm::TCPServer
Wrapper class for a TCP socket server.
Definition: tcp_server.h:58
urcl::comm::Pipeline
The Pipeline manages the production and optionally consumption of packages. Cyclically the producer i...
Definition: pipeline.h:278
rtde_package.h
PipelineTest::TestConsumer
Definition: test_pipeline.cpp:101
PipelineTest::TestConsumer::waitForConsumer
bool waitForConsumer(int milliseconds=100)
Definition: test_pipeline.cpp:107
urcl::comm::IConsumer
Parent class for for arbitrary consumers.
Definition: pipeline.h:45
PipelineTest::TestConsumer::consumed_cv
std::condition_variable consumed_cv
Definition: test_pipeline.cpp:133
PipelineTest::notifier_
comm::INotifier notifier_
Definition: test_pipeline.cpp:98
PipelineTest::producer_
std::unique_ptr< comm::URProducer< rtde_interface::RTDEPackage > > producer_
Definition: test_pipeline.cpp:96
TEST_F
TEST_F(PipelineTest, get_product_from_stopped_pipeline)
Definition: test_pipeline.cpp:145
tcp_server.h
PipelineTest::teardown
void teardown()
Definition: test_pipeline.cpp:63
PipelineTest::TestConsumer::consume
virtual bool consume(std::shared_ptr< rtde_interface::RTDEPackage > product)
Consumes a product, utilizing it's contents.
Definition: test_pipeline.cpp:120
PipelineTest::client_fd_
socket_t client_fd_
Definition: test_pipeline.cpp:92


ur_client_library
Author(s): Thomas Timm Andersen, Simon Rasmussen, Felix Exner, Lea Steffen, Tristan Schnell
autogenerated on Mon May 26 2025 02:35:58