Program Listing for File pipeline.h

Return to documentation for file (/tmp/ws/src/ur_client_library/include/ur_client_library/comm/pipeline.h)

/*
 * Copyright 2019, FZI Forschungszentrum Informatik (templating)
 *
 * Copyright 2017, 2018 Simon Rasmussen (refactor)
 *
 * Copyright 2015, 2016 Thomas Timm Andersen (original version)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include "ur_client_library/comm/package.h"
#include "ur_client_library/log.h"
#include "ur_client_library/helpers.h"
#include "ur_client_library/queue/readerwriterqueue.h"
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
#include <fstream>

namespace urcl
{
namespace comm
{
/*!
 * \brief Interface for objects that consume products of type T.
 *
 * Only consume() has to be implemented. All other hooks have empty default implementations,
 * except teardownConsumer(), which by default forwards to stopConsumer().
 *
 * \tparam T Type of the products handed to consume()
 */
template <typename T>
class IConsumer
{
public:
  /*!
   * \brief Virtual destructor, so that implementations can be destroyed safely through a
   * pointer or reference to IConsumer.
   */
  virtual ~IConsumer() = default;

  /*!
   * \brief Hook for set-up work. Does nothing by default.
   */
  virtual void setupConsumer()
  {
  }

  /*!
   * \brief Hook for tear-down work. By default this simply calls stopConsumer().
   */
  virtual void teardownConsumer()
  {
    stopConsumer();
  }

  /*!
   * \brief Hook for stopping the consumer. Does nothing by default.
   */
  virtual void stopConsumer()
  {
  }

  /*!
   * \brief Hook that is called when no product became available in time. Does nothing by default.
   */
  virtual void onTimeout()
  {
  }

  /*!
   * \brief Consumes a single product.
   *
   * \param product Shared pointer to the product to consume
   *
   * \returns Success of the consumption
   */
  virtual bool consume(std::shared_ptr<T> product) = 0;
};

/*!
 * \brief Consumer that fans every call out to a list of other consumers.
 *
 * The consumers are held as raw pointers and are not owned by this class; they are
 * dereferenced on every call, so they have to stay valid for as long as this object is used.
 *
 * \tparam T Type of the consumed products
 */
template <typename T>
class MultiConsumer : public IConsumer<T>
{
private:
  std::vector<IConsumer<T>*> consumers_;

public:
  /*!
   * \brief Creates a new MultiConsumer.
   *
   * \param consumers The consumers every call is forwarded to, in this order
   */
  MultiConsumer(std::vector<IConsumer<T>*> consumers) : consumers_(consumers)
  {
  }

  /*!
   * \brief Calls setupConsumer() on every registered consumer.
   */
  void setupConsumer() override
  {
    for (auto& con : consumers_)
    {
      con->setupConsumer();
    }
  }

  /*!
   * \brief Calls teardownConsumer() on every registered consumer.
   */
  void teardownConsumer() override
  {
    for (auto& con : consumers_)
    {
      con->teardownConsumer();
    }
  }

  /*!
   * \brief Calls stopConsumer() on every registered consumer.
   */
  void stopConsumer() override
  {
    for (auto& con : consumers_)
    {
      con->stopConsumer();
    }
  }

  /*!
   * \brief Calls onTimeout() on every registered consumer.
   */
  void onTimeout() override
  {
    for (auto& con : consumers_)
    {
      con->onTimeout();
    }
  }

  /*!
   * \brief Hands the product to every registered consumer.
   *
   * All consumers are called, even if an earlier one reported a failure.
   *
   * \param product Shared pointer to the product; the same pointer is passed to each consumer
   *
   * \returns true if every consumer returned true, false if at least one of them failed
   */
  bool consume(std::shared_ptr<T> product) override
  {
    bool res = true;
    for (auto& con : consumers_)
    {
      if (!con->consume(product))
        res = false;
    }
    return res;
  }
};

/*!
 * \brief Interface for objects that produce products of type T.
 *
 * Only tryGet() has to be implemented. All other hooks have empty default implementations,
 * except teardownProducer(), which by default forwards to stopProducer().
 *
 * \tparam T Type of the produced products
 */
template <typename T>
class IProducer
{
public:
  /*!
   * \brief Virtual destructor, so that implementations can be destroyed safely through a
   * pointer or reference to IProducer.
   */
  virtual ~IProducer() = default;

  /*!
   * \brief Hook for set-up work. Does nothing by default; both parameters are ignored by this
   * default implementation.
   *
   * \param max_num_tries Passed through from Pipeline::init(); its meaning is defined by the
   * implementation
   * \param reconnection_time Passed through from Pipeline::init(); its meaning is defined by
   * the implementation
   */
  virtual void setupProducer(const size_t max_num_tries = 0,
                             const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
  {
  }

  /*!
   * \brief Hook for tear-down work. By default this simply calls stopProducer().
   */
  virtual void teardownProducer()
  {
    stopProducer();
  }

  /*!
   * \brief Hook for stopping the producer. Does nothing by default.
   */
  virtual void stopProducer()
  {
  }

  /*!
   * \brief Hook for starting the producer. Does nothing by default.
   */
  virtual void startProducer()
  {
  }

  /*!
   * \brief Tries to fetch new products.
   *
   * \param products Output vector that receives the produced items
   *
   * \returns Success of the operation
   */
  virtual bool tryGet(std::vector<std::unique_ptr<T>>& products) = 0;
};

/*!
 * \brief Interface for objects that want to be told when a pipeline starts or stops.
 *
 * Both callbacks have empty default implementations, so this class can also be used directly
 * as a notifier that ignores all events.
 */
class INotifier
{
public:
  /*!
   * \brief Virtual destructor, so that implementations can be destroyed safely through a
   * pointer or reference to INotifier.
   */
  virtual ~INotifier() = default;

  /*!
   * \brief Called when a pipeline has been started. Does nothing by default.
   *
   * \param name Name of the pipeline that was started
   */
  virtual void started(std::string name)
  {
  }

  /*!
   * \brief Called when a pipeline or one of its worker threads has stopped. Does nothing by
   * default.
   *
   * \param name Name of the pipeline that was stopped
   */
  virtual void stopped(std::string name)
  {
  }
};

/*!
 * \brief Connects a producer to an optional consumer through a queue, running each side in its
 * own thread.
 *
 * The producer thread repeatedly calls IProducer::tryGet() and enqueues every product it
 * receives. If a consumer was given, a second thread dequeues the products and passes them to
 * IConsumer::consume(). Without a consumer, products can be fetched with getLatestProduct().
 *
 * The producer, the consumer and the notifier are only referenced, not owned.
 *
 * \tparam T Type of the products that are passed through the pipeline
 */
template <typename T>
class Pipeline
{
public:
  using Clock = std::chrono::high_resolution_clock;
  using Time = Clock::time_point;

  /*!
   * \brief Creates a pipeline with a producer and a consumer.
   *
   * \param producer The producer delivering the products
   * \param consumer The consumer receiving the products; may be nullptr, in which case no
   * consumer thread is started
   * \param name Name of the pipeline, used in log messages and notifier callbacks
   * \param notifier Object that gets informed when the pipeline starts and stops
   * \param producer_fifo_scheduling If true, FIFO scheduling with the maximum SCHED_FIFO
   * priority is requested for the producer thread
   */
  Pipeline(IProducer<T>& producer, IConsumer<T>* consumer, std::string name, INotifier& notifier,
           const bool producer_fifo_scheduling = false)
    : producer_(producer)
    , consumer_(consumer)
    , name_(name)
    , notifier_(notifier)
    , queue_{ 32 }
    , running_{ false }
    , producer_fifo_scheduling_(producer_fifo_scheduling)
  {
  }

  /*!
   * \brief Creates a pipeline with a producer only. Products have to be fetched with
   * getLatestProduct().
   *
   * \param producer The producer delivering the products
   * \param name Name of the pipeline, used in log messages and notifier callbacks
   * \param notifier Object that gets informed when the pipeline starts and stops
   * \param producer_fifo_scheduling If true, FIFO scheduling with the maximum SCHED_FIFO
   * priority is requested for the producer thread
   */
  Pipeline(IProducer<T>& producer, std::string name, INotifier& notifier, const bool producer_fifo_scheduling = false)
    : producer_(producer)
    , consumer_(nullptr)
    , name_(name)
    , notifier_(notifier)
    , queue_{ 32 }
    , running_{ false }
    , producer_fifo_scheduling_(producer_fifo_scheduling)
  {
  }

  /*!
   * \brief Stops the pipeline and waits for its worker threads before the object goes away.
   */
  virtual ~Pipeline()
  {
    URCL_LOG_DEBUG("Destructing pipeline");
    stop();
  }

  /*!
   * \brief Sets up the producer and, if present, the consumer.
   *
   * \param max_num_tries Forwarded to IProducer::setupProducer()
   * \param reconnection_time Forwarded to IProducer::setupProducer()
   */
  void init(const size_t max_num_tries = 0,
            const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
  {
    producer_.setupProducer(max_num_tries, reconnection_time);
    if (consumer_ != nullptr)
      consumer_->setupConsumer();
  }

  /*!
   * \brief Starts the producer thread and, if a consumer is present, the consumer thread.
   *
   * Does nothing if the pipeline is already running.
   */
  void run()
  {
    if (running_)
      return;

    // A previous run may have ended on its own (the producer or consumer reported a failure)
    // without stop() having been called. Those threads are finished or finishing but still
    // joinable, and assigning a new thread to a joinable std::thread calls std::terminate.
    joinWorkers();

    running_ = true;
    producer_.startProducer();
    pThread_ = std::thread(&Pipeline::runProducer, this);
    if (consumer_ != nullptr)
      cThread_ = std::thread(&Pipeline::runConsumer, this);
    notifier_.started(name_);
  }

  /*!
   * \brief Stops the pipeline and waits for its worker threads to finish.
   *
   * If the pipeline is still running, the producer is asked to stop and the notifier is
   * informed once the threads have been joined. If the run has already been ended by one of the
   * worker threads, the leftover threads are still joined so that no joinable std::thread
   * remains; in that case the producer is not asked to stop, so the call waits until the
   * producer thread returns from its current IProducer::tryGet() call.
   *
   * There is no locking around the thread handles, so this function is not meant to be called
   * concurrently from several threads.
   */
  void stop()
  {
    // exchange() makes sure the shutdown actions are performed by exactly one caller.
    const bool was_running = running_.exchange(false);

    if (was_running)
    {
      URCL_LOG_DEBUG("Stopping pipeline! <%s>", name_.c_str());
      producer_.stopProducer();
    }

    // Join unconditionally: a worker that ended the run itself leaves running_ == false behind
    // while its std::thread is still joinable, which would terminate the program on destruction.
    joinWorkers();

    if (was_running)
    {
      notifier_.stopped(name_);
    }
  }

  /*!
   * \brief Returns the most recent product, waiting for one if none is queued.
   *
   * All products currently in the queue are dequeued and only the last one is kept. If the
   * queue was empty, the call waits up to \p timeout for a new product.
   *
   * \param product Receives the product
   * \param timeout Maximum time to wait when the queue is empty
   *
   * \returns true if \p product has been set, false if no product arrived within the timeout
   */
  bool getLatestProduct(std::unique_ptr<T>& product, std::chrono::milliseconds timeout)
  {
    // If the queue has more than one package, get the latest one.
    bool res = false;
    while (queue_.tryDequeue(product))
    {
      res = true;
    }

    // If the queue is empty, wait for a package.
    return res || queue_.waitDequeTimed(product, timeout);
  }

private:
  IProducer<T>& producer_;
  IConsumer<T>* consumer_;  // nullptr if the pipeline has no consumer thread
  std::string name_;
  INotifier& notifier_;
  // Constructed with 32, presumably the number of products it can hold -- TODO confirm against
  // the queue implementation.
  moodycamel::BlockingReaderWriterQueue<std::unique_ptr<T>> queue_;
  std::atomic<bool> running_;
  std::thread pThread_, cThread_;
  bool producer_fifo_scheduling_;

  /*!
   * \brief Joins the producer and consumer threads if they are joinable.
   *
   * When called from one of the worker threads themselves (for example out of an INotifier
   * callback) nothing is joined: a thread cannot join itself, and reaping is left to the thread
   * that owns the pipeline.
   */
  void joinWorkers()
  {
    const std::thread::id caller = std::this_thread::get_id();
    if (caller == pThread_.get_id() || caller == cThread_.get_id())
    {
      return;
    }

    if (pThread_.joinable())
    {
      pThread_.join();
    }
    if (cThread_.joinable())
    {
      cThread_.join();
    }
  }

  /*!
   * \brief Body of the producer thread.
   *
   * Fetches products from the producer and enqueues them until the pipeline is stopped or
   * IProducer::tryGet() fails. On failure the producer is torn down and the whole pipeline is
   * marked as not running. The notifier's stopped() callback is invoked when the thread ends.
   */
  void runProducer()
  {
    URCL_LOG_DEBUG("Starting up producer");
    if (producer_fifo_scheduling_)
    {
      pthread_t this_thread = pthread_self();
      const int max_thread_priority = sched_get_priority_max(SCHED_FIFO);
      setFiFoScheduling(this_thread, max_thread_priority);
    }
    std::vector<std::unique_ptr<T>> products;
    while (running_)
    {
      if (!producer_.tryGet(products))
      {
        producer_.teardownProducer();
        running_ = false;
        break;
      }

      for (auto& p : products)
      {
        if (!queue_.tryEnqueue(std::move(p)))
        {
          // Only reported; the loop carries on with the remaining products.
          URCL_LOG_ERROR("Pipeline producer overflowed! <%s>", name_.c_str());
        }
      }

      products.clear();
    }
    URCL_LOG_DEBUG("Pipeline producer ended! <%s>", name_.c_str());
    notifier_.stopped(name_);
  }

  /*!
   * \brief Body of the consumer thread. Only started if a consumer was given.
   *
   * Passes dequeued products to the consumer until the pipeline is stopped or
   * IConsumer::consume() fails. On failure the consumer is torn down and the whole pipeline is
   * marked as not running. When the loop ends, the consumer is stopped and the notifier's
   * stopped() callback is invoked.
   */
  void runConsumer()
  {
    std::unique_ptr<T> product;
    while (running_)
    {
      // timeout was chosen because we should receive messages
      // at roughly 125hz (every 8ms) and have to update
      // the controllers (i.e. the consumer) with *at least* 125Hz
      // So we update the consumer more frequently via onTimeout
      if (!queue_.waitDequeTimed(product, std::chrono::milliseconds(8)))
      {
        consumer_->onTimeout();
        continue;
      }

      // Ownership of the product moves into the shared_ptr handed to the consumer.
      if (!consumer_->consume(std::move(product)))
      {
        consumer_->teardownConsumer();
        running_ = false;
        break;
      }
    }
    consumer_->stopConsumer();
    URCL_LOG_DEBUG("Pipeline consumer ended! <%s>", name_.c_str());
    notifier_.stopped(name_);
  }
};
}  // namespace comm
}  // namespace urcl