Go to the documentation of this file.00001
00006
00007
00008
00009
00010 #ifndef mm_mux_demux_DEMUX_HPP_
00011 #define mm_mux_demux_DEMUX_HPP_
00012
00013
00014
00015
00016
00017 #include <ecl/utilities/function_objects.hpp>
00018 #include <ecl/threads/mutex.hpp>
00019 #include <ecl/threads.hpp>
00020 #include <map>
00021 #include <memory>
00022 #include <mm_messages/verbosity.hpp>
00023 #include <string>
00024
00025
00026
00027
00028
00029 namespace mm_mux_demux {
00030 namespace impl {
00031
00032
00033
00034
00035
00036 class MessageDemux {
00037 public:
00038 MessageDemux(const std::string& name,
00039 const std::string& url,
00040 const mm_messages::Verbosity::Level& verbosity=mm_messages::Verbosity::QUIET,
00041 const bool bind = false
00042 );
00043 MessageDemux(const MessageDemux& other);
00044 ~MessageDemux();
00045 void spin();
00046 void shutdown();
00047
00048 template<typename C>
00049 void registerSubscriber(
00050 const unsigned int& id,
00051 void (C::*processPacket)(const unsigned char*, const unsigned int&),
00052 C &s) {
00053 SubscriberMapResultPair ret;
00054 mutex.lock();
00055 BufferCallbackFunction function = new ecl::PartiallyBoundBinaryMemberFunction<C,const unsigned char*,const unsigned int&, void>(processPacket,s);
00056 ret = subscribers.insert(SubscriberMapPair(id, function));
00057 mutex.unlock();
00058 if ( !ret.second ) {
00059
00060 }
00061 }
00062
00063 void unregisterSubscriber(const unsigned int& id);
00064
00065 private:
00066
00067 typedef ecl::BinaryFunction<const unsigned char*,const unsigned int&,void>* BufferCallbackFunction;
00068 typedef std::map<unsigned int, BufferCallbackFunction> SubscriberMap;
00069 typedef std::map<unsigned int, BufferCallbackFunction>::iterator SubscriberMapIterator;
00070 typedef std::map<unsigned int, BufferCallbackFunction>::const_iterator SubscriberMapConstIterator;
00071 typedef std::pair<unsigned int, BufferCallbackFunction> SubscriberMapPair;
00072 typedef std::pair<std::map<unsigned int, BufferCallbackFunction>::iterator,bool> SubscriberMapResultPair;
00073
00074 std::string name;
00075 std::string url;
00076 int socket;
00077 int endpoint_id;
00078 mm_messages::Verbosity::Level verbosity;
00079 bool shutdown_requested;
00080 ecl::Thread thread;
00081
00082 SubscriberMap subscribers;
00083 ecl::Mutex mutex;
00084 };
00085
00086 }
00087 }
00088
00089
00090
00091
00092
00093 namespace mm_mux_demux {
00094
00095
00096
00097
00098
00099 class MessageDemux {
00100 public:
00101 typedef std::map<std::string, std::shared_ptr<impl::MessageDemux>> DemuxMap;
00102 typedef std::pair<std::string, std::shared_ptr<impl::MessageDemux>> DemuxMapPair;
00103 typedef std::map<std::string, std::shared_ptr<impl::MessageDemux>>::iterator DemuxMapIterator;
00104 typedef std::map<std::string, std::shared_ptr<impl::MessageDemux>>::const_iterator DemuxMapConstIterator;
00105
00106 static void start(const std::string& name,
00107 const std::string& url,
00108 const mm_messages::Verbosity::Level& verbosity = mm_messages::Verbosity::QUIET,
00109 const bool bind = false);
00110 static void shutdown();
00111 static void shutdown(const std::string& name);
00119 static DemuxMap& demultiplexers();
00120
00135 template<typename C>
00136 static void registerSubscriber(
00137 const std::string& name,
00138 const unsigned int& id,
00139 void (C::*processPacket)(const unsigned char*, const unsigned int&),
00140 C &s)
00141 {
00142 DemuxMapIterator iter = demultiplexers().find(name);
00143 if ( iter != demultiplexers().end() ) {
00144 (iter->second)->registerSubscriber(id, &C::processPacket, s);
00145 } else {
00146 std::cout << "Demux : no demux by that name found (while registering subscriber)"<< std::endl;
00147 }
00148 }
00149
00150 static void unregisterSubscriber(const std::string& name, const unsigned int& id);
00151 };
00152
00153 typedef MessageDemux MessageClient;
00154
00155 }
00156
00157 #endif