combiner_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * fkie_message_filters
4  * Copyright © 2018-2020 Fraunhofer FKIE
5  * Author: Timo Röhling
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ****************************************************************************/
20 
21 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_IMPL_H_
22 #define INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_IMPL_H_
23 
24 #include "combiner.h"
25 
26 namespace fkie_message_filters
27 {
28 
29 template<template<typename...> class PolicyTmpl, class... IOs>
31 : policy_(policy)
32 {
33  helpers::for_each_apply<std::tuple_size<IncomingTuples>::value>(
34  [this](auto I)
35  {
36  std::get<I>(this->sinks_).set_parent(this);
37  }
38  );
39  connect_policy();
40 }
41 
42 template<template<typename...> class PolicyTmpl, class... IOs>
44 {
45  policy_.set_emitter_callback(
46  [this](const OutgoingTuple& out)
47  {
48  helpers::index_apply<std::tuple_size<OutgoingTuple>::value>(
49  [this, &out](auto... Is)
50  {
51  this->send(std::get<Is>(out)...);
52  }
53  );
54  }
55  );
56  helpers::for_each_apply<std::tuple_size<IncomingTuples>::value>(
57  [this](auto I)
58  {
59  std::get<I>(this->sinks_).set_policy_input(
60  [this](std::unique_lock<std::mutex>& lock, auto&& in)
61  {
62  this->policy_.template add<decltype(I)::value>(lock, std::forward<decltype(in)>(in));
63  }
64  );
65  }
66  );
67 }
68 
69 template<template<typename...> class PolicyTmpl, class... IOs>
71 {
72  std::lock_guard<std::mutex> lock(combiner_mutex_);
73  policy_ = policy;
74  connect_policy();
75 }
76 
77 template<template<typename...> class PolicyTmpl, class... IOs>
79 {
80  std::lock_guard<std::mutex> lock(combiner_mutex_);
81  policy_.reset();
82 }
83 
84 template<template<typename...> class PolicyTmpl, class... IOs>
85 template<std::size_t N>
86 typename Combiner<PolicyTmpl, IOs...>::template SinkType<N>& Combiner<PolicyTmpl, IOs...>::sink() noexcept
87 {
88  return std::get<N>(sinks_);
89 }
90 
91 template<template<typename...> class PolicyTmpl, class... IOs>
92 template<std::size_t N>
93 const typename Combiner<PolicyTmpl, IOs...>::template SinkType<N>& Combiner<PolicyTmpl, IOs...>::sink() const noexcept
94 {
95  return std::get<N>(sinks_);
96 }
97 
98 template<template<typename...> class PolicyTmpl, class... IOs>
99 const typename Combiner<PolicyTmpl, IOs...>::Policy& Combiner<PolicyTmpl, IOs...>::policy() const noexcept
100 {
101  return policy_;
102 }
103 
104 template<template<typename...> class PolicyTmpl, class... IOs>
106 {
107  helpers::for_each_apply<sizeof...(IOs)>(
108  [this](auto I)
109  {
110  std::get<I>(this->sinks_).disconnect_from_all_sources();
111  }
112  );
113 }
114 
115 template<template<typename...> class PolicyTmpl, class... IOs>
117 {
118  disconnect_from_all_sources();
119  this->disconnect_from_all_sinks();
120 }
121 
122 template<template<typename...> class PolicyTmpl, class... IOs>
123 template<std::size_t N>
125 {
126 }
127 
128 template<template<typename...> class PolicyTmpl, class... IOs>
129 template<std::size_t N, typename ThisSource, typename... OtherSources>
130 void Combiner<PolicyTmpl, IOs...>::connect_to_sources_impl(Connections& conn, ThisSource& src, OtherSources&... sources) noexcept
131 {
132  conn[N] = std::get<N>(sinks_).connect_to_source(src);
133  connect_to_sources_impl<N + 1>(conn, sources...);
134 }
135 
136 template<template<typename...> class PolicyTmpl, class... IOs>
138 {
139  Connections conn;
140  connect_to_sources_impl<0>(conn, sources...);
141  return conn;
142 }
143 
144 template<template<typename...> class PolicyTmpl, class... IOs>
145 template<class... Inputs>
147 {
148  parent_ = parent;
149 }
150 
151 template<template<typename...> class PolicyTmpl, class... IOs>
152 template<class... Inputs>
154 {
155  forward_ = f;
156 }
157 
158 template<template<typename...> class PolicyTmpl, class... IOs>
159 template<class... Inputs>
161 {
162  assert(parent_);
163  std::unique_lock<std::mutex> lock(parent_->combiner_mutex_);
164  if (forward_)
165  {
166  forward_(lock, std::forward_as_tuple(in...));
167  }
168 }
169 
170 } // namespace fkie_message_filters
171 
172 #endif /* INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_IMPL_H_ */
void disconnect() noexcept override
Disconnect from all connected sources and sinks.
helpers::io_tuple_t< helpers::io_concat_t< IOs... > > OutgoingTuple
Definition: combiner.h:133
void disconnect_from_all_sources() noexcept
Disconnect the sinks from their sources.
f
std::function< void(std::unique_lock< std::mutex > &, const Tuple &)> PolicyInFunc
Definition: combiner.h:139
const Policy & policy() const noexcept
Access combiner policy.
Definition: combiner_impl.h:99
std::array< Connection, NUM_SINKS > Connections
Array of connection objects.
Definition: combiner.h:82
SinkType< N > & sink() noexcept
Access the sink for the Nth input.
Connections connect_to_sources(helpers::io_rewrap_t< IOs, Source > &... sources) noexcept
Convenience function to connect all sinks at once.
void reset() noexcept override
Reset filter.
Definition: combiner_impl.h:78
void for_each_apply(Function f)
Definition: tuple.h:52
typename io_rewrap< IO, Wrap >::type io_rewrap_t
Definition: io.h:75
Primary namespace.
Definition: buffer.h:33
void connect_to_sources_impl(Connections &conn, ThisSource &src, OtherSources &... sources) noexcept
Combiner(const Policy &policy=Policy()) noexcept
Constructor.
Definition: combiner_impl.h:30
helpers::io_rewrap_t< helpers::select_nth< N, IOs... >, Sink > SinkType
Base class for the Nth sink.
Definition: combiner.h:85
Combine multiple sources into a single one.
Definition: combiner.h:72
PolicyTmpl< IOs... > Policy
Class type of the policy that applies to the combiner.
Definition: combiner.h:87
void set_policy(const Policy &policy) noexcept
Set combiner policy.
Definition: combiner_impl.h:70


fkie_message_filters
Author(s): Timo Röhling
autogenerated on Mon Feb 28 2022 22:21:43