sink_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * fkie_message_filters
4  * Copyright © 2018-2020 Fraunhofer FKIE
5  * Author: Timo Röhling
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ****************************************************************************/
20 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_SINK_IMPL_H_
21 #define INCLUDE_FKIE_MESSAGE_FILTERS_SINK_IMPL_H_
22 
23 #include "sink.h"
24 #include "helpers/scoped_unlock.h"
25 
26 namespace fkie_message_filters
27 {
28 
29 template<typename... Inputs>
30 class Sink<Inputs...>::ReentryProtector
31 {
32 public:
33  ReentryProtector(Sink<Inputs...>& parent)
34  : running_(parent.running_), this_id_(std::this_thread::get_id())
35  {
36  if (!running_.insert(this_id_).second)
37  {
38  throw std::logic_error("recursive invocation detected");
39  }
40  }
41 
42  ~ReentryProtector()
43  {
44  running_.erase(this_id_);
45  }
46 
47 private:
48  std::set<std::thread::id>& running_;
49  const std::thread::id this_id_;
50 };
51 
52 
53 template<typename... Inputs>
55 {
56  std::lock_guard<std::mutex> lock(mutex_);
57  Connection c = src.signal_.connect_extended([this](const Connection& conn, const Inputs&... in) { this->receive_cb(conn, in...); });
58  conn_.push_back(c);
59  return c;
60 }
61 
62 template<typename... Inputs>
64 {
65  std::lock_guard<std::mutex> lock(mutex_);
66  conn_.clear();
67 }
68 
69 template<typename... Inputs>
70 void Sink<Inputs...>::disconnect() noexcept
71 {
72  disconnect_from_all_sources();
73 }
74 
75 template<typename... Inputs>
76 void Sink<Inputs...>::receive_cb(const Connection&, const Inputs&... in)
77 {
78  std::unique_lock<std::mutex> lock(mutex_);
79  conn_.erase(std::remove_if(conn_.begin(), conn_.end(), [](const Connection& c) -> bool { return !c.connected(); }), conn_.end());
80  ReentryProtector p{*this};
81  auto unlock = helpers::with_scoped_unlock(lock);
82  receive(in...);
83 }
84 
85 } // namespace fkie_message_filters
86 
87 #endif /* INCLUDE_FKIE_MESSAGE_FILTERS_SINK_IMPL_H_ */
fkie_message_filters
Definition: buffer.h:33
fkie_message_filters::Sink::connect_to_source
Connection connect_to_source(Source< Inputs... > &src) noexcept
Connect this sink to a source.
Definition: sink_impl.h:72
fkie_message_filters::Sink::ReentryProtector
Definition: sink_impl.h:48
fkie_message_filters::Sink::disconnect_from_all_sources
void disconnect_from_all_sources() noexcept
Disconnect from all connected sources.
Definition: sink_impl.h:81
fkie_message_filters::Connection
boost::signals2::connection Connection
Tracks connections from sources to sinks.
Definition: types.h:49
fkie_message_filters::Sink::running_
std::set< std::thread::id > running_
Definition: sink.h:116
scoped_unlock.h
sink.h
std
fkie_message_filters::Sink
Base class for data consumers.
Definition: sink.h:64
fkie_message_filters::Source
Base class for data providers.
Definition: sink.h:51
fkie_message_filters::helpers::with_scoped_unlock
ScopedUnlock< BasicLockable > with_scoped_unlock(BasicLockable &lockable)
Definition: scoped_unlock.h:95


fkie_message_filters
Author(s): Timo Röhling
autogenerated on Wed Mar 2 2022 00:18:57