test_pipeline.cpp
Go to the documentation of this file.
1 // -- BEGIN LICENSE BLOCK ----------------------------------------------
2 // Copyright 2022 Universal Robots A/S
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 //
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 //
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 //
14 // * Neither the name of the {copyright_holder} nor the names of its
15 // contributors may be used to endorse or promote products derived from
16 // this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 // POSSIBILITY OF SUCH DAMAGE.
29 // -- END LICENSE BLOCK ------------------------------------------------
30 
31 #include <gtest/gtest.h>
32 #include <condition_variable>
33 
40 
41 using namespace urcl;
42 
43 class PipelineTest : public ::testing::Test
44 {
45 protected:
46  void SetUp()
47  {
48  server_.reset(new comm::TCPServer(60002));
49  server_->setConnectCallback(std::bind(&PipelineTest::connectionCallback, this, std::placeholders::_1));
50  server_->start();
51 
52  // Setup pipeline
53  stream_.reset(new comm::URStream<rtde_interface::RTDEPackage>("127.0.0.1", 60002));
54  std::vector<std::string> recipe = { "timestamp" };
55  parser_.reset(new rtde_interface::RTDEParser(recipe));
56  parser_->setProtocolVersion(2);
57  producer_.reset(new comm::URProducer<rtde_interface::RTDEPackage>(*stream_.get(), *parser_.get()));
58 
59  pipeline_.reset(new comm::Pipeline<rtde_interface::RTDEPackage>(*producer_.get(), "RTDE_PIPELINE", notifier_));
60  pipeline_->init();
61  }
62 
63  void Teardown()
64  {
65  // Clean up
66  pipeline_->stop();
67  pipeline_.reset();
68  server_.reset();
69  }
70 
71  void connectionCallback(const int filedescriptor)
72  {
73  std::lock_guard<std::mutex> lk(connect_mutex_);
74  client_fd_ = filedescriptor;
75  connect_cv_.notify_one();
76  connection_callback_ = true;
77  }
78 
79  bool waitForConnectionCallback(int milliseconds = 100)
80  {
81  std::unique_lock<std::mutex> lk(connect_mutex_);
82  if (connect_cv_.wait_for(lk, std::chrono::milliseconds(milliseconds)) == std::cv_status::no_timeout ||
83  connection_callback_ == true)
84  {
85  connection_callback_ = false;
86  return true;
87  }
88  return false;
89  }
90 
91  std::unique_ptr<comm::TCPServer> server_;
93 
94  std::unique_ptr<comm::URStream<rtde_interface::RTDEPackage>> stream_;
95  std::unique_ptr<rtde_interface::RTDEParser> parser_;
96  std::unique_ptr<comm::URProducer<rtde_interface::RTDEPackage>> producer_;
97  std::unique_ptr<comm::Pipeline<rtde_interface::RTDEPackage>> pipeline_;
99 
100  // Consumer class
101  class TestConsumer : public comm::IConsumer<rtde_interface::RTDEPackage>
102  {
103  public:
104  TestConsumer() = default;
105  virtual ~TestConsumer() = default;
106 
107  bool waitForConsumer(int milliseconds = 100)
108  {
109  std::unique_lock<std::mutex> lk(consumed_mutex_);
110  if (consumed_cv_.wait_for(lk, std::chrono::milliseconds(milliseconds)) == std::cv_status::no_timeout ||
111  consumed_callback_ == true)
112  {
113  consumed_callback_ = true;
114  return true;
115  }
116  return false;
117  }
118 
119  // Consume a package
120  virtual bool consume(std::shared_ptr<rtde_interface::RTDEPackage> product)
121  {
122  std::lock_guard<std::mutex> lk(consumed_mutex_);
123  if (rtde_interface::DataPackage* data = dynamic_cast<rtde_interface::DataPackage*>(product.get()))
124  {
125  data->getData("timestamp", timestamp_);
126  }
127  consumed_cv_.notify_one();
128  consumed_callback_ = true;
129  return true;
130  }
131 
132  double timestamp_ = 0.0;
133  std::condition_variable consumed_cv_;
134  std::mutex consumed_mutex_;
135  bool consumed_callback_ = false;
136  };
137 
138 private:
139  std::condition_variable connect_cv_;
140  std::mutex connect_mutex_;
141 
142  bool connection_callback_ = false;
143 };
144 
145 TEST_F(PipelineTest, get_product_from_stopped_pipeline)
146 {
147  std::unique_ptr<rtde_interface::RTDEPackage> urpackage;
148  std::chrono::milliseconds timeout{ 100 };
149  EXPECT_EQ(pipeline_->getLatestProduct(urpackage, timeout), false);
150 }
151 
152 TEST_F(PipelineTest, get_product_from_running_pipeline)
153 {
154  waitForConnectionCallback();
155  pipeline_->run();
156 
157  // RTDE package with timestamp
158  uint8_t data_package[] = { 0x00, 0x0c, 0x55, 0x01, 0x40, 0xbb, 0xbf, 0xdb, 0xa5, 0xe3, 0x53, 0xf7 };
159  size_t written;
160  server_->write(client_fd_, data_package, sizeof(data_package), written);
161 
162  std::unique_ptr<rtde_interface::RTDEPackage> urpackage;
163  std::chrono::milliseconds timeout{ 500 };
164  EXPECT_EQ(pipeline_->getLatestProduct(urpackage, timeout), true);
165  if (rtde_interface::DataPackage* data = dynamic_cast<rtde_interface::DataPackage*>(urpackage.get()))
166  {
167  double timestamp;
168  double expected_timestamp = 7103.8579;
169  data->getData("timestamp", timestamp);
170  EXPECT_FLOAT_EQ(timestamp, expected_timestamp);
171  }
172  else
173  {
174  std::cout << "Failed to get data package data" << std::endl;
175  GTEST_FAIL();
176  }
177 }
178 
179 TEST_F(PipelineTest, stop_pipeline)
180 {
181  waitForConnectionCallback();
182  pipeline_->run();
183 
184  // RTDE package with timestamp
185  uint8_t data_package[] = { 0x00, 0x0c, 0x55, 0x01, 0x40, 0xbb, 0xbf, 0xdb, 0xa5, 0xe3, 0x53, 0xf7 };
186  size_t written;
187  server_->write(client_fd_, data_package, sizeof(data_package), written);
188 
189  std::unique_ptr<rtde_interface::RTDEPackage> urpackage;
190  std::chrono::milliseconds timeout{ 500 };
191  // Ensure that pipeline is running
192  EXPECT_EQ(pipeline_->getLatestProduct(urpackage, timeout), true);
193 
194  pipeline_->stop();
195 
196  // We shouldn't be able to fetch a package when the pipeline has been stopped
197  EXPECT_EQ(pipeline_->getLatestProduct(urpackage, timeout), false);
198 }
199 
200 TEST_F(PipelineTest, consumer_pipeline)
201 {
202  stream_.reset(new comm::URStream<rtde_interface::RTDEPackage>("127.0.0.1", 60002));
203  producer_.reset(new comm::URProducer<rtde_interface::RTDEPackage>(*stream_.get(), *parser_.get()));
204  TestConsumer consumer;
205  pipeline_.reset(
206  new comm::Pipeline<rtde_interface::RTDEPackage>(*producer_.get(), &consumer, "RTDE_PIPELINE", notifier_));
207  pipeline_->init();
208  waitForConnectionCallback();
209  pipeline_->run();
210 
211  // RTDE package with timestamp
212  uint8_t data_package[] = { 0x00, 0x0c, 0x55, 0x01, 0x40, 0xbb, 0xbf, 0xdb, 0xa5, 0xe3, 0x53, 0xf7 };
213  size_t written;
214  server_->write(client_fd_, data_package, sizeof(data_package), written);
215 
216  // Wait for data to be consumed
217  int max_retries = 3;
218  int count = 0;
219  while (consumer.waitForConsumer(500) == false)
220  {
221  if (count >= max_retries)
222  {
223  break;
224  }
225  server_->write(client_fd_, data_package, sizeof(data_package), written);
226  count++;
227  }
228  EXPECT_LT(count, max_retries);
229 
230  // Test that the package was consumed
231  double expected_timestamp = 7103.8579;
232  EXPECT_FLOAT_EQ(consumer.timestamp_, expected_timestamp);
233 
234  pipeline_->stop();
235 }
236 
237 int main(int argc, char* argv[])
238 {
239  ::testing::InitGoogleTest(&argc, argv);
240 
241  return RUN_ALL_TESTS();
242 }
std::mutex connect_mutex_
TEST_F(PipelineTest, get_product_from_stopped_pipeline)
bool waitForConsumer(int milliseconds=100)
std::unique_ptr< comm::Pipeline< rtde_interface::RTDEPackage > > pipeline_
The stream is an abstraction of the TCPSocket that offers reading a full UR data package out of the s...
Definition: stream.h:42
std::condition_variable consumed_cv_
Wrapper class for a TCP socket server.
Definition: tcp_server.h:59
int main(int argc, char *argv[])
Parent class for for arbitrary consumers.
Definition: pipeline.h:43
bool waitForConnectionCallback(int milliseconds=100)
virtual bool consume(std::shared_ptr< rtde_interface::RTDEPackage > product)
Consumes a product, utilizing it&#39;s contents.
Parent class for notifiers.
Definition: pipeline.h:210
The RTDE specific parser. Interprets a given byte stream as serialized RTDE packages and parses it ac...
Definition: rtde_parser.h:45
std::condition_variable connect_cv_
void connectionCallback(const int filedescriptor)
std::unique_ptr< comm::URStream< rtde_interface::RTDEPackage > > stream_
std::unique_ptr< comm::URProducer< rtde_interface::RTDEPackage > > producer_
The DataPackage class handles communication in the form of RTDE data packages both to and from the ro...
Definition: data_package.h:60
comm::INotifier notifier_
std::unique_ptr< rtde_interface::RTDEParser > parser_
The Pipepline manages the production and optionally consumption of packages. Cyclically the producer ...
Definition: pipeline.h:235
A general producer for URPackages. Implements funcionality to produce packages by reading and parsing...
Definition: producer.h:40
std::unique_ptr< comm::TCPServer > server_


ur_client_library
Author(s): Thomas Timm Andersen, Simon Rasmussen, Felix Exner, Lea Steffen, Tristan Schnell
autogenerated on Tue Jul 4 2023 02:09:47