pipeline.cpp
Go to the documentation of this file.
2 
4 
5 Pipeline::Pipeline(std::shared_ptr<Writer<PFPacket>> writer, std::shared_ptr<Reader<PFPacket>> reader,
6  std::function<void()> func, std::shared_ptr<std::mutex> net_mtx,
7  std::shared_ptr<std::condition_variable> net_cv, bool& net_fail)
8  : queue_{ 100 }
9  , reader_(reader)
10  , writer_(writer)
11  , shutdown(func)
12  , running_(false)
13  , shutdown_(false)
14  , net_mtx_(net_mtx)
15  , net_cv_(net_cv)
16  , net_fail_(net_fail)
17 {
18 }
19 
21 {
22  if (running_)
23  return true;
24 
25  if (!writer_->start() || !reader_->start())
26  {
27  ROS_ERROR("Unable to establish connection");
28  return false;
29  }
30 
31  running_ = true;
32  // shutdown_ = false;
33  // ROS_INFO("Starting read-write pipeline!");
34 
35  reader_thread_ = std::thread(&Pipeline::run_reader, this);
36  writer_thread_ = std::thread(&Pipeline::run_writer, this);
37  return true;
38 }
39 
41 {
42  // ROS_INFO("Stopping read-write pipeline!");
43  writer_->stop();
44  reader_->stop();
45 
46  running_ = false;
47 
48  if (reader_thread_.joinable() && writer_thread_.joinable())
49  {
50  reader_thread_.join();
51  writer_thread_.join();
52  }
53 }
54 
56 {
57  return running_;
58 }
59 
61 {
62  if (shutdown && !shutdown_)
63  {
64  shutdown_ = true;
65  shutdown();
66  }
67 }
68 
70 {
71  std::vector<std::unique_ptr<PFPacket>> packets;
72  while (running_)
73  {
74  if (!writer_->get(packets)) // packets are already parsed here
75  {
76  break;
77  }
78  for (auto& p : packets)
79  {
80  if (!queue_.try_enqueue(std::move(p)))
81  ROS_DEBUG("Queue overflow!");
82  }
83  packets.clear();
84  }
85  writer_->stop();
86  reader_->stop();
87 
88  running_ = false;
89 
90  // notify main thread about network failure
91  {
92  std::lock_guard<std::mutex> lock(*net_mtx_);
93  net_fail_ = true;
94  }
95  net_cv_->notify_one();
96 }
97 
99 {
100  std::unique_ptr<PFPacket> packet;
101  while (running_)
102  {
103  // ROS_INFO("reader loop");
104  // wait till next message is received from device with 1 second timeout
105  if (!queue_.wait_dequeue_timed(packet, std::chrono::milliseconds(100)))
106  {
107  // TODO: reader needs to handle if no packet is received
108  continue;
109  }
110  if (packet)
111  {
112  reader_->read(std::move(packet)); // here the scans will be published
113  }
114  }
115 }
std::atomic< bool > running_
Definition: pipeline.h:36
std::thread reader_thread_
Definition: pipeline.h:37
std::atomic< bool > shutdown_
Definition: pipeline.h:36
Definition: writer.h:7
std::shared_ptr< Reader< PFPacket > > reader_
Definition: pipeline.h:33
#define ROS_ERROR(...)
bool wait_dequeue_timed(U &result, std::int64_t timeout_usecs) AE_NO_TSAN
AE_FORCEINLINE bool try_enqueue(T const &element) AE_NO_TSAN
std::shared_ptr< std::condition_variable > net_cv_
Definition: pipeline.h:40
#define ROS_DEBUG(...)
void terminate()
Definition: pipeline.cpp:40
bool & net_fail_
Definition: pipeline.h:41
moodycamel::BlockingReaderWriterQueue< std::unique_ptr< PFPacket > > queue_
Definition: pipeline.h:32
std::shared_ptr< Writer< PFPacket > > writer_
Definition: pipeline.h:34
bool start()
Definition: pipeline.cpp:20
std::shared_ptr< std::mutex > net_mtx_
Definition: pipeline.h:39
void run_writer()
Definition: pipeline.cpp:69
std::thread writer_thread_
Definition: pipeline.h:37
bool is_running()
Definition: pipeline.cpp:55
void on_shutdown()
Definition: pipeline.cpp:60
std::function< void()> shutdown
Definition: pipeline.h:35
Pipeline(std::shared_ptr< Writer< PFPacket >> writer, std::shared_ptr< Reader< PFPacket >> reader, std::function< void()> func, std::shared_ptr< std::mutex > net_mtx, std::shared_ptr< std::condition_variable > net_cv, bool &net_fail)
Definition: pipeline.cpp:5
void run_reader()
Definition: pipeline.cpp:98


pf_driver
Author(s): Harsh Deshpande
autogenerated on Fri Feb 24 2023 03:59:35