sink_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * fkie_message_filters
4  * Copyright © 2018-2020 Fraunhofer FKIE
5  * Author: Timo Röhling
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ****************************************************************************/
20 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_SINK_IMPL_H_
21 #define INCLUDE_FKIE_MESSAGE_FILTERS_SINK_IMPL_H_
22 
23 #include "sink.h"
24 #include "helpers/scoped_unlock.h"
25 
26 namespace fkie_message_filters
27 {
28 
29 template<typename... Inputs>
30 class Sink<Inputs...>::ReentryProtector
31 {
32 public:
34  : running_(parent.running_), this_id_(std::this_thread::get_id())
35  {
36  if (!running_.insert(this_id_).second)
37  {
38  throw std::logic_error("recursive invocation detected");
39  }
40  }
41 
43  {
44  running_.erase(this_id_);
45  }
46 
47 private:
48  std::set<std::thread::id>& running_;
49  const std::thread::id this_id_;
50 };
51 
52 
53 template<typename... Inputs>
55 {
56  std::lock_guard<std::mutex> lock(mutex_);
57  Connection c = src.signal_.connect_extended([this](const Connection& conn, const Inputs&... in) { this->receive_cb(conn, in...); });
58  conn_.push_back(c);
59  return c;
60 }
61 
62 template<typename... Inputs>
64 {
65  std::lock_guard<std::mutex> lock(mutex_);
66  conn_.clear();
67 }
68 
69 template<typename... Inputs>
71 {
73 }
74 
75 template<typename... Inputs>
76 void Sink<Inputs...>::receive_cb(const Connection&, const Inputs&... in)
77 {
78  std::unique_lock<std::mutex> lock(mutex_);
79  conn_.erase(std::remove_if(conn_.begin(), conn_.end(), [](const Connection& c) -> bool { return !c.connected(); }), conn_.end());
80  ReentryProtector p{*this};
81  auto unlock = helpers::with_scoped_unlock(lock);
82  receive(in...);
83 }
84 
85 } // namespace fkie_message_filters
86 
87 #endif /* INCLUDE_FKIE_MESSAGE_FILTERS_SINK_IMPL_H_ */
Base class for data consumers.
Definition: sink.h:46
boost::signals2::connection Connection
Tracks connections from sources to sinks.
Definition: types.h:31
void receive_cb(const Connection &, const Inputs &... in)
Definition: sink_impl.h:76
virtual void receive(const Inputs &... in)=0
Process incoming data.
Base class for data providers.
Definition: sink.h:33
void disconnect_from_all_sources() noexcept
Disconnect from all connected sources.
Definition: sink_impl.h:63
Connection connect_to_source(Source< Inputs... > &src) noexcept
Connect this sink to a source.
Definition: sink_impl.h:54
std::vector< boost::signals2::scoped_connection > conn_
Definition: sink.h:96
std::set< std::thread::id > & running_
Definition: sink_impl.h:48
Primary namespace.
Definition: buffer.h:33
ScopedUnlock< BasicLockable > with_scoped_unlock(BasicLockable &lockable)
Definition: scoped_unlock.h:59
ReentryProtector(Sink< Inputs... > &parent)
Definition: sink_impl.h:33
virtual void disconnect() noexcept override
Disconnect from all connected sources.
Definition: sink_impl.h:70
std::set< std::thread::id > running_
Definition: sink.h:98
std::mutex mutex_
Definition: sink.h:97


fkie_message_filters
Author(s): Timo Röhling
autogenerated on Mon Feb 28 2022 22:21:43