21 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_IMPL_H_
22 #define INCLUDE_FKIE_MESSAGE_FILTERS_COMBINER_IMPL_H_
// Member definition of a class template parameterised on a policy template
// (PolicyTmpl) and a type pack (IOs). The declarator is not part of this
// listing; presumably this is the Combiner constructor -- TODO confirm.
29 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
// for_each_apply is instantiated with the number of elements in IncomingTuples;
// the callback passed to it uses an index object I to address one sink.
33 helpers::for_each_apply<std::tuple_size<IncomingTuples>::value>(
// Hand the I-th element of sinks_ a pointer to this object as its parent.
36 std::get<I>(this->sinks_).set_parent(
this);
// Member definition that wires policy_ to this object in both directions.
// The declarator is not part of this listing, so the member's name is not
// visible here -- TODO confirm against the class declaration.
42 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
// Output direction: register a callback on policy_ that takes a
// const OutgoingTuple& and captures `this`.
45 policy_.set_emitter_callback(
46 [
this](
const OutgoingTuple& out)
// index_apply is instantiated with the size of OutgoingTuple; the inner lambda
// receives a parameter pack `Is` and captures `out` by reference.
48 helpers::index_apply<std::tuple_size<OutgoingTuple>::value>(
49 [
this, &out](
auto... Is)
// Expand the tuple: every element of `out` becomes one argument of a single send() call.
51 this->send(std::get<Is>(out)...);
// Input direction: same per-index iteration over IncomingTuples as above.
56 helpers::for_each_apply<std::tuple_size<IncomingTuples>::value>(
// Give the I-th sink a callback that accepts a held std::unique_lock and the incoming value.
59 std::get<I>(this->sinks_).set_policy_input(
60 [
this](std::unique_lock<std::mutex>& lock,
auto&& in)
// Route the value to policy_.add<index>, passing the lock along and perfectly
// forwarding `in`. decltype(I)::value is used because I is an object whose type
// carries the index as a static member, not a plain integer.
62 this->policy_.template add<decltype(I)::value>(lock, std::forward<decltype(in)>(in));
// Member definition whose declarator and remaining body are not part of this
// listing; only the locking statement is visible.
69 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
// Hold combiner_mutex_ until `lock` goes out of scope.
72 std::lock_guard<std::mutex> lock(combiner_mutex_);
// Second member definition of the same shape: declarator and remaining body
// are not part of this listing; only the locking statement is visible.
77 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
// Hold combiner_mutex_ until `lock` goes out of scope.
80 std::lock_guard<std::mutex> lock(combiner_mutex_);
// Member function template with its own compile-time index N (inner template
// header), defined outside the class. The declarator is not part of this
// listing; the name `sink` is an assumption -- TODO confirm.
84 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
85 template<std::
size_t N>
// Return the N-th element of the sinks_ tuple.
88 return std::get<N>(sinks_);
// Second indexed accessor with an identical visible body; presumably the
// const-qualified overload of the accessor above -- TODO confirm, the
// declarator is not part of this listing.
91 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
92 template<std::
size_t N>
// Return the N-th element of the sinks_ tuple.
95 return std::get<N>(sinks_);
// NOTE(review): only the template header of this member definition is present
// in this listing; its declarator and body must be checked in the original file.
98 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
// Member definition that detaches the sinks from their sources. The declarator
// and the surrounding per-index iteration are not part of this listing; the
// use of index I suggests the same for_each_apply pattern as elsewhere in this
// file -- TODO confirm.
104 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
// Ask the I-th sink to drop all of its source connections.
110 std::get<I>(this->sinks_).disconnect_from_all_sources();
// Member definition that severs connections in both directions. The declarator
// is not part of this listing; the name `disconnect` is an assumption -- TODO confirm.
115 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
// Input side first: drop the source connections.
118 disconnect_from_all_sources();
// Then the output side. The explicit this-> suggests disconnect_from_all_sinks
// lives in a dependent base class -- TODO confirm.
119 this->disconnect_from_all_sinks();
// NOTE(review): only the template headers (outer class parameters plus an inner
// std::size_t N) are present in this listing. Presumably this is the
// no-more-sources overload that terminates the connect_to_sources_impl
// recursion -- confirm in the original file.
122 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
123 template<std::
size_t N>
// Recursive step of connect_to_sources_impl: N is the index of the sink being
// connected, ThisSource the type of the source handled in this step and
// OtherSources the types still to be processed. The declarator is not part of
// this listing; `conn`, `src` and `sources` are its parameters.
128 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
129 template<std::size_t N,
typename ThisSource,
typename... OtherSources>
// Connect the N-th sink to the current source and store the result in slot N of conn.
132 conn[N] = std::get<N>(sinks_).connect_to_source(src);
// Recurse with the next index and the remaining sources.
133 connect_to_sources_impl<N + 1>(conn, sources...);
// Entry point that connects a list of sources to the sinks. The declarator is
// not part of this listing; the name `connect_to_sources` is an assumption --
// TODO confirm.
136 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
// Start the index-by-index recursion at sink 0, filling conn as it goes.
140 connect_to_sources_impl<0>(conn, sources...);
// NOTE(review): only the template headers (outer class parameters plus an inner
// pack `Inputs`) are present in this listing; the declarator and body must be
// checked in the original file.
144 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
145 template<
class... Inputs>
// NOTE(review): only the template headers (outer class parameters plus an inner
// pack `Inputs`) are present in this listing; the declarator and body must be
// checked in the original file.
151 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
152 template<
class... Inputs>
// Member of a type that is templated on a pack `Inputs` and holds a parent_
// pointer. It passes incoming values on while holding the parent's mutex. The
// declarator is not part of this listing; the name `receive` is an assumption
// -- TODO confirm.
158 template<
template<
typename...>
class PolicyTmpl,
class... IOs>
159 template<
class... Inputs>
// Lock the parent's combiner_mutex_. A unique_lock is used because the lock
// object itself is handed to forward_ below.
163 std::unique_lock<std::mutex> lock(parent_->combiner_mutex_);
// Bundle the inputs as a tuple of references (no copies) and pass them with the
// held lock to forward_. Presumably forward_ is the callback installed through
// set_policy_input -- TODO confirm.
166 forward_(lock, std::forward_as_tuple(in...));