producer.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017, 2018 Simon Rasmussen (refactor)
3  *
4  * Copyright 2015, 2016 Thomas Timm Andersen (original version)
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #pragma once
20 #include <chrono>
24 
25 template <typename T>
26 class URProducer : public IProducer<T>
27 {
28 private:
31  std::chrono::seconds timeout_;
32 
33 public:
34  URProducer(URStream& stream, URParser<T>& parser) : stream_(stream), parser_(parser), timeout_(1)
35  {
36  }
37 
39  {
40  stream_.connect();
41  }
43  {
44  stream_.disconnect();
45  }
46  void stopProducer()
47  {
48  stream_.disconnect();
49  }
50 
51  bool tryGet(std::vector<unique_ptr<T>>& products)
52  {
53  // 4KB should be enough to hold any packet received from UR
54  uint8_t buf[4096];
55  size_t read = 0;
56  // expoential backoff reconnects
57  while (true)
58  {
59  if (stream_.read(buf, sizeof(buf), read))
60  {
61  // reset sleep amount
62  timeout_ = std::chrono::seconds(1);
63  break;
64  }
65 
66  if (stream_.closed())
67  return false;
68 
69  LOG_WARN("Failed to read from stream, reconnecting in %ld seconds...", timeout_.count());
70  std::this_thread::sleep_for(timeout_);
71 
72  if (stream_.connect())
73  continue;
74 
75  auto next = timeout_ * 2;
76  if (next <= std::chrono::seconds(120))
77  timeout_ = next;
78  }
79 
80  BinParser bp(buf, read);
81  return parser_.parse(bp, products);
82  }
83 };
bool connect()
Definition: stream.h:47
virtual bool parse(BinParser &bp, std::vector< std::unique_ptr< T >> &results)=0
void disconnect()
Definition: stream.h:51
URProducer(URStream &stream, URParser< T > &parser)
Definition: producer.h:34
URStream & stream_
Definition: producer.h:29
bool closed()
Definition: stream.h:57
void stopProducer()
Definition: producer.h:46
void setupProducer()
Definition: producer.h:38
void teardownProducer()
Definition: producer.h:42
bool tryGet(std::vector< unique_ptr< T >> &products)
Definition: producer.h:51
std::chrono::seconds timeout_
Definition: producer.h:31
bool read(uint8_t *buf, size_t buf_len, size_t &read)
Definition: stream.cpp:33
URParser< T > & parser_
Definition: producer.h:30
#define LOG_WARN(format,...)
Definition: log.h:34


ur_modern_driver
Author(s): Thomas Timm Andersen, Simon Rasmussen
autogenerated on Fri Jun 26 2020 03:37:00