Program Listing for File producer.h

Return to documentation for file (include/ur_client_library/comm/producer.h)

/*
 * Copyright 2019, FZI Forschungszentrum Informatik (templating)
 *
 * Copyright 2017, 2018 Simon Rasmussen (refactor)
 *
 * Copyright 2015, 2016 Thomas Timm Andersen (original version)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once
#include <chrono>
#include "ur_client_library/comm/pipeline.h"
#include "ur_client_library/comm/parser.h"
#include "ur_client_library/comm/stream.h"
#include "ur_client_library/comm/package.h"
#include "ur_client_library/exceptions.h"

namespace urcl
{
namespace comm
{
template <typename T>
class URProducer : public IProducer<T>
{
private:
  URStream<T>& stream_;
  Parser<T>& parser_;
  std::chrono::seconds timeout_;

  bool running_;

public:
  URProducer(URStream<T>& stream, Parser<T>& parser) : stream_(stream), parser_(parser), timeout_(1), running_(false)
  {
  }

  void setupProducer(const size_t max_num_tries = 0,
                     const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10)) override
  {
    timeval tv;
    tv.tv_sec = 1;
    tv.tv_usec = 0;
    stream_.setReceiveTimeout(tv);
    if (!stream_.connect(max_num_tries, reconnection_time))
    {
      throw UrException("Failed to connect to robot. Please check if the robot is booted and connected.");
    }
  }
  void teardownProducer() override
  {
    stopProducer();
  }
  void stopProducer() override
  {
    running_ = false;
  }

  void startProducer() override
  {
    running_ = true;
  }

  bool tryGet(std::vector<std::unique_ptr<T>>& products) override
  {
    // TODO This function has become really ugly! That should be refactored!

    // 4KB should be enough to hold any packet received from UR
    uint8_t buf[4096];
    size_t read = 0;
    // expoential backoff reconnects
    while (true)
    {
      if (stream_.read(buf, sizeof(buf), read))
      {
        // reset sleep amount
        timeout_ = std::chrono::seconds(1);
        BinParser bp(buf, read);
        return parser_.parse(bp, products);
      }

      if (!running_)
        return true;

      if (stream_.closed())
        return false;

      URCL_LOG_WARN("Failed to read from stream, reconnecting in %ld seconds...", timeout_.count());
      std::this_thread::sleep_for(timeout_);

      if (stream_.connect())
        continue;

      auto next = timeout_ * 2;
      if (next <= std::chrono::seconds(120))
        timeout_ = next;
    }

    return false;
  }
};
}  // namespace comm
}  // namespace urcl