pipeline.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019, FZI Forschungszentrum Informatik (templating)
3  *
4  * Copyright 2017, 2018 Simon Rasmussen (refactor)
5  *
6  * Copyright 2015, 2016 Thomas Timm Andersen (original version)
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #pragma once
22 
#include "ur_client_library/log.h"

#include <pthread.h>
#include <sched.h>

#include <atomic>
#include <chrono>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>
31 
32 namespace urcl
33 {
34 namespace comm
35 {
/*!
 * \brief Parent class for arbitrary consumers.
 *
 * Every hook except consume() has a default implementation, so implementations only need to
 * override the ones they care about.
 *
 * \tparam T Type of the consumed products
 */
template <typename T>
class IConsumer
{
public:
  /*!
   * \brief Virtual destructor, so that an implementation can be destroyed safely through a
   * pointer or reference to this interface.
   */
  virtual ~IConsumer() = default;

  /*!
   * \brief Set-up functionality of the consumer. Does nothing by default.
   */
  virtual void setupConsumer()
  {
  }

  /*!
   * \brief Fully tears down the consumer - by default no difference to stopping it.
   */
  virtual void teardownConsumer()
  {
    stopConsumer();
  }

  /*!
   * \brief Stops the consumer. Does nothing by default.
   */
  virtual void stopConsumer()
  {
  }

  /*!
   * \brief Functionality for handling consumer timeouts. Does nothing by default.
   */
  virtual void onTimeout()
  {
  }

  /*!
   * \brief Consumes a product, utilizing its contents.
   *
   * \param product Shared pointer to the product to be consumed
   *
   * \returns false to signal a failure. The Pipeline in this file tears the consumer down and
   * stops when it receives false.
   */
  virtual bool consume(std::shared_ptr<T> product) = 0;
};
80 
87 template <typename T>
88 class MultiConsumer : public IConsumer<T>
89 {
90 private:
91  std::vector<IConsumer<T>*> consumers_;
92 
93 public:
99  MultiConsumer(std::vector<IConsumer<T>*> consumers) : consumers_(consumers)
100  {
101  }
102 
106  virtual void setupConsumer()
107  {
108  for (auto& con : consumers_)
109  {
110  con->setupConsumer();
111  }
112  }
116  virtual void teardownConsumer()
117  {
118  for (auto& con : consumers_)
119  {
120  con->teardownConsumer();
121  }
122  }
126  virtual void stopConsumer()
127  {
128  for (auto& con : consumers_)
129  {
130  con->stopConsumer();
131  }
132  }
136  virtual void onTimeout()
137  {
138  for (auto& con : consumers_)
139  {
140  con->onTimeout();
141  }
142  }
143 
151  bool consume(std::shared_ptr<T> product)
152  {
153  bool res = true;
154  for (auto& con : consumers_)
155  {
156  if (!con->consume(product))
157  res = false;
158  }
159  return res;
160  }
161 };
162 
/*!
 * \brief Parent class for arbitrary producers of packages.
 *
 * Every hook except tryGet() has a default implementation, so implementations only need to
 * override the ones they care about.
 *
 * \tparam T Type of the produced products
 */
template <typename T>
class IProducer
{
public:
  /*!
   * \brief Virtual destructor, so that an implementation can be destroyed safely through a
   * pointer or reference to this interface.
   */
  virtual ~IProducer() = default;

  /*!
   * \brief Set-up functionality of the producers. Does nothing by default.
   */
  virtual void setupProducer()
  {
  }

  /*!
   * \brief Fully tears down the producer - by default no difference to stopping it.
   */
  virtual void teardownProducer()
  {
    stopProducer();
  }

  /*!
   * \brief Stops the producer. Does nothing by default.
   */
  virtual void stopProducer()
  {
  }

  /*!
   * \brief Start hook; Pipeline::run() calls it right before spawning the producer thread.
   * Does nothing by default.
   */
  virtual void startProducer()
  {
  }

  /*!
   * \brief Reads packages from some source and produces corresponding objects.
   *
   * \param products Vector of unique pointers that receives the produced objects
   *
   * \returns false to signal a failure. The Pipeline in this file tears the producer down and
   * stops when it receives false.
   */
  virtual bool tryGet(std::vector<std::unique_ptr<T>>& products) = 0;
};
205 
/*!
 * \brief Parent class for notifiers.
 *
 * Both notifications do nothing by default.
 */
class INotifier
{
public:
  /*!
   * \brief Virtual destructor, so that an implementation can be destroyed safely through a
   * pointer or reference to this interface.
   */
  virtual ~INotifier() = default;

  /*!
   * \brief Start notification.
   *
   * \param name The name of the pipeline that was started
   */
  virtual void started(std::string name)
  {
  }

  /*!
   * \brief Stop notification.
   *
   * \param name The name of the pipeline that was stopped
   */
  virtual void stopped(std::string name)
  {
  }
};
225 
233 template <typename T>
234 class Pipeline
235 {
236 public:
237  typedef std::chrono::high_resolution_clock Clock;
238  typedef Clock::time_point Time;
248  Pipeline(IProducer<T>& producer, IConsumer<T>* consumer, std::string name, INotifier& notifier)
249  : producer_(producer), consumer_(consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false }
250  {
251  }
260  Pipeline(IProducer<T>& producer, std::string name, INotifier& notifier)
261  : producer_(producer), consumer_(nullptr), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false }
262  {
263  }
264 
268  virtual ~Pipeline()
269  {
270  URCL_LOG_DEBUG("Destructing pipeline");
271  stop();
272  }
273 
274  void init()
275  {
276  producer_.setupProducer();
277  if (consumer_ != nullptr)
278  consumer_->setupConsumer();
279  }
280 
284  void run()
285  {
286  if (running_)
287  return;
288 
289  running_ = true;
290  producer_.startProducer();
291  pThread_ = std::thread(&Pipeline::runProducer, this);
292  if (consumer_ != nullptr)
293  cThread_ = std::thread(&Pipeline::runConsumer, this);
294  notifier_.started(name_);
295  }
296 
300  void stop()
301  {
302  if (!running_)
303  return;
304 
305  URCL_LOG_DEBUG("Stopping pipeline! <%s>", name_.c_str());
306 
307  running_ = false;
308 
309  producer_.stopProducer();
310  if (pThread_.joinable())
311  {
312  pThread_.join();
313  }
314  if (cThread_.joinable())
315  {
316  cThread_.join();
317  }
318  notifier_.stopped(name_);
319  }
320 
329  bool getLatestProduct(std::unique_ptr<T>& product, std::chrono::milliseconds timeout)
330  {
331  return queue_.waitDequeTimed(product, timeout);
332  }
333 
334 private:
337  std::string name_;
340  std::atomic<bool> running_;
341  std::thread pThread_, cThread_;
342 
343  void runProducer()
344  {
345  URCL_LOG_DEBUG("Starting up producer");
346  std::ifstream realtime_file("/sys/kernel/realtime", std::ios::in);
347  bool has_realtime;
348  realtime_file >> has_realtime;
349  if (has_realtime)
350  {
351  const int max_thread_priority = sched_get_priority_max(SCHED_FIFO);
352  if (max_thread_priority != -1)
353  {
354  // We'll operate on the currently running thread.
355  pthread_t this_thread = pthread_self();
356 
357  // struct sched_param is used to store the scheduling priority
358  struct sched_param params;
359 
360  // We'll set the priority to the maximum.
361  params.sched_priority = max_thread_priority;
362 
363  int ret = pthread_setschedparam(this_thread, SCHED_FIFO, &params);
364  if (ret != 0)
365  {
366  URCL_LOG_ERROR("Unsuccessful in setting producer thread realtime priority. Error code: %d", ret);
367  }
368  // Now verify the change in thread priority
369  int policy = 0;
370  ret = pthread_getschedparam(this_thread, &policy, &params);
371  if (ret != 0)
372  {
373  std::cout << "Couldn't retrieve real-time scheduling paramers" << std::endl;
374  }
375 
376  // Check the correct policy was applied
377  if (policy != SCHED_FIFO)
378  {
379  URCL_LOG_ERROR("Producer thread: Scheduling is NOT SCHED_FIFO!");
380  }
381  else
382  {
383  URCL_LOG_INFO("Producer thread: SCHED_FIFO OK");
384  }
385 
386  // Print thread scheduling priority
387  URCL_LOG_INFO("Thread priority is %d", params.sched_priority);
388  }
389  else
390  {
391  URCL_LOG_ERROR("Could not get maximum thread priority for producer thread");
392  }
393  }
394  else
395  {
396  URCL_LOG_WARN("No realtime capabilities found. Consider using a realtime system for better performance");
397  }
398  std::vector<std::unique_ptr<T>> products;
399  while (running_)
400  {
401  if (!producer_.tryGet(products))
402  {
403  producer_.teardownProducer();
404  running_ = false;
405  break;
406  }
407 
408  for (auto& p : products)
409  {
410  if (!queue_.tryEnqueue(std::move(p)))
411  {
412  URCL_LOG_ERROR("Pipeline producer overflowed! <%s>", name_.c_str());
413  }
414  }
415 
416  products.clear();
417  }
418  URCL_LOG_DEBUG("Pipeline producer ended! <%s>", name_.c_str());
419  notifier_.stopped(name_);
420  }
421 
422  void runConsumer()
423  {
424  std::unique_ptr<T> product;
425  while (running_)
426  {
427  // timeout was chosen because we should receive messages
428  // at roughly 125hz (every 8ms) and have to update
429  // the controllers (i.e. the consumer) with *at least* 125Hz
430  // So we update the consumer more frequently via onTimeout
431  if (!queue_.waitDequeTimed(product, std::chrono::milliseconds(8)))
432  {
433  consumer_->onTimeout();
434  continue;
435  }
436 
437  if (!consumer_->consume(std::move(product)))
438  {
439  consumer_->teardownConsumer();
440  running_ = false;
441  break;
442  }
443  }
444  consumer_->stopConsumer();
445  URCL_LOG_DEBUG("Pipeline consumer ended! <%s>", name_.c_str());
446  notifier_.stopped(name_);
447  }
448 };
449 } // namespace comm
450 } // namespace urcl
virtual void onTimeout()
Functionality for handling consumer timeouts.
Definition: pipeline.h:67
Pipeline(IProducer< T > &producer, IConsumer< T > *consumer, std::string name, INotifier &notifier)
Creates a new Pipeline object, registering producer, consumer and notifier. Additionally, an empty queue is initialized.
Definition: pipeline.h:248
#define URCL_LOG_ERROR(...)
Definition: log.h:37
Consumer that allows one product to be consumed by multiple arbitrary consumers. ...
Definition: pipeline.h:88
virtual void teardownProducer()
Fully tears down the producer - by default no difference to stopping it.
Definition: pipeline.h:181
virtual void started(std::string name)
Start notification.
Definition: pipeline.h:215
virtual ~Pipeline()
The Pipeline object's destructor, stopping the pipeline and joining all running threads.
Definition: pipeline.h:268
void run()
Starts the producer and, if existing, the consumer in new threads.
Definition: pipeline.h:284
void stop()
Stops the pipeline and all running threads.
Definition: pipeline.h:300
virtual void onTimeout()
Triggers timeout functionality for all registered consumers.
Definition: pipeline.h:136
virtual bool consume(std::shared_ptr< T > product)=0
Consumes a product, utilizing its contents.
Clock::time_point Time
Definition: pipeline.h:238
bool consume(std::shared_ptr< T > product)
Consumes a given product with all registered consumers.
Definition: pipeline.h:151
virtual void stopConsumer()
Stops the consumer.
Definition: pipeline.h:61
virtual void setupProducer()
Set-up functionality of the producers.
Definition: pipeline.h:175
virtual void teardownConsumer()
Fully tears down the consumer - by default no difference to stopping it.
Definition: pipeline.h:54
virtual void setupConsumer()
Sets up all registered consumers.
Definition: pipeline.h:106
virtual void stopConsumer()
Stops all registered consumers.
Definition: pipeline.h:126
moodycamel::BlockingReaderWriterQueue< std::unique_ptr< T > > queue_
Definition: pipeline.h:339
std::chrono::high_resolution_clock Clock
Definition: pipeline.h:237
Parent class for arbitrary consumers.
Definition: pipeline.h:42
AE_FORCEINLINE bool tryEnqueue(T const &element)
virtual bool tryGet(std::vector< std::unique_ptr< T >> &products)=0
Reads packages from some source and produces corresponding objects.
Parent class for notifiers.
Definition: pipeline.h:209
IConsumer< T > * consumer_
Definition: pipeline.h:336
bool getLatestProduct(std::unique_ptr< T > &product, std::chrono::milliseconds timeout)
Returns the next package in the queue. Can be used instead of registering a consumer.
Definition: pipeline.h:329
std::vector< IConsumer< T > * > consumers_
Definition: pipeline.h:91
std::atomic< bool > running_
Definition: pipeline.h:340
#define URCL_LOG_DEBUG(...)
Definition: log.h:34
std::thread pThread_
Definition: pipeline.h:341
std::string name_
Definition: pipeline.h:337
IProducer< T > & producer_
Definition: pipeline.h:335
virtual void startProducer()
Definition: pipeline.h:192
#define URCL_LOG_WARN(...)
Definition: log.h:35
virtual void teardownConsumer()
Tears down all registered consumers.
Definition: pipeline.h:116
Parent class for arbitrary producers of packages.
Definition: pipeline.h:169
virtual void stopProducer()
Stops the producer.
Definition: pipeline.h:188
bool waitDequeTimed(U &result, std::int64_t timeout_usecs)
Pipeline(IProducer< T > &producer, std::string name, INotifier &notifier)
Creates a new Pipeline object, registering producer and notifier while no consumer is used...
Definition: pipeline.h:260
INotifier & notifier_
Definition: pipeline.h:338
virtual void stopped(std::string name)
Stop notification.
Definition: pipeline.h:221
The Pipeline manages the production and optionally consumption of packages. Cyclically the producer ...
Definition: pipeline.h:234
#define URCL_LOG_INFO(...)
Definition: log.h:36
MultiConsumer(std::vector< IConsumer< T > * > consumers)
Creates a new MultiConsumer object.
Definition: pipeline.h:99
virtual void setupConsumer()
Set-up functionality of the consumer.
Definition: pipeline.h:48


ur_client_library
Author(s): Thomas Timm Andersen, Simon Rasmussen, Felix Exner, Lea Steffen, Tristan Schnell
autogenerated on Sun May 9 2021 02:16:26