demux.hpp
Go to the documentation of this file.
00001 
00006 /*****************************************************************************
00007 ** Ifdefs
00008 *****************************************************************************/
00009 
00010 #ifndef mm_mux_demux_DEMUX_HPP_
00011 #define mm_mux_demux_DEMUX_HPP_
00012 
00013 /*****************************************************************************
00014 ** Includes
00015 *****************************************************************************/
00016 
00017 #include <ecl/utilities/function_objects.hpp>
00018 #include <ecl/threads/mutex.hpp>
00019 #include <ecl/threads.hpp>
00020 #include <map>
00021 #include <memory>
00022 #include <mm_messages/verbosity.hpp>
00023 #include <string>
00024 
00025 /*****************************************************************************
00026 ** Namespaces
00027 *****************************************************************************/
00028 
00029 namespace mm_mux_demux {
00030 namespace impl {
00031 
00032 /*****************************************************************************
00033  ** Interfaces
00034  *****************************************************************************/
00035 
00036 class MessageDemux {
00037 public:
00038   MessageDemux(const std::string& name,
00039                const std::string& url,
00040                const mm_messages::Verbosity::Level& verbosity=mm_messages::Verbosity::QUIET,
00041                const bool bind = false
00042               );
00043   MessageDemux(const MessageDemux& other);
00044   ~MessageDemux();
00045   void spin();
00046   void shutdown();
00047 
00048   template<typename C>
00049   void registerSubscriber(
00050       const unsigned int& id,
00051       void (C::*processPacket)(const unsigned char*, const unsigned int&),
00052       C &s) {
00053     SubscriberMapResultPair ret;
00054     mutex.lock();
00055     BufferCallbackFunction function = new ecl::PartiallyBoundBinaryMemberFunction<C,const unsigned char*,const unsigned int&, void>(processPacket,s);
00056     ret = subscribers.insert(SubscriberMapPair(id, function));
00057     mutex.unlock();
00058     if ( !ret.second )  {
00059       // some error handling
00060     }
00061   }
00062 
00063   void unregisterSubscriber(const unsigned int& id);
00064 
00065 private:
00066 //  typedef ecl::UnaryFunction<const unsigned char*,void>* BufferCallbackFunction;
00067   typedef ecl::BinaryFunction<const unsigned char*,const unsigned int&,void>* BufferCallbackFunction;
00068   typedef std::map<unsigned int, BufferCallbackFunction> SubscriberMap;
00069   typedef std::map<unsigned int, BufferCallbackFunction>::iterator SubscriberMapIterator;
00070   typedef std::map<unsigned int, BufferCallbackFunction>::const_iterator SubscriberMapConstIterator;
00071   typedef std::pair<unsigned int, BufferCallbackFunction> SubscriberMapPair;
00072   typedef std::pair<std::map<unsigned int, BufferCallbackFunction>::iterator,bool> SubscriberMapResultPair;
00073 
00074   std::string name;
00075   std::string url;  // copy of the connection url used for debugging purposes.
00076   int socket;
00077   int endpoint_id; // used by nanomsg to id the endpoint connected to the socket (used in shutdown).
00078   mm_messages::Verbosity::Level verbosity;
00079   bool shutdown_requested;
00080   ecl::Thread thread;
00081   //std::thread thread;
00082   SubscriberMap subscribers;
00083   ecl::Mutex mutex;
00084 };
00085 
00086 } // namespace impl
00087 } // mm_mux_demux
00088 
00089 /*****************************************************************************
00090 ** Namespaces
00091 *****************************************************************************/
00092 
00093 namespace mm_mux_demux {
00094 
00095 /*****************************************************************************
00096 ** Interfaces
00097 *****************************************************************************/
00098 
00099 class MessageDemux {
00100 public:
00101   typedef std::map<std::string, std::shared_ptr<impl::MessageDemux>> DemuxMap;
00102   typedef std::pair<std::string, std::shared_ptr<impl::MessageDemux>> DemuxMapPair;
00103   typedef std::map<std::string, std::shared_ptr<impl::MessageDemux>>::iterator DemuxMapIterator;
00104   typedef std::map<std::string, std::shared_ptr<impl::MessageDemux>>::const_iterator DemuxMapConstIterator;
00105 
00106   static void start(const std::string& name,
00107                     const std::string& url,
00108                     const mm_messages::Verbosity::Level& verbosity = mm_messages::Verbosity::QUIET,
00109                     const bool bind = false);
00110   static void shutdown();  // shutdown all demuxes
00111   static void shutdown(const std::string& name);  // shutdown only this demux
00119   static DemuxMap& demultiplexers();
00120 
00135   template<typename C>
00136   static void registerSubscriber(
00137       const std::string& name,
00138       const unsigned int& id,
00139       void (C::*processPacket)(const unsigned char*, const unsigned int&),
00140       C &s)
00141   {
00142     DemuxMapIterator iter = demultiplexers().find(name);
00143     if ( iter != demultiplexers().end() ) {
00144       (iter->second)->registerSubscriber(id, &C::processPacket, s);
00145     } else {
00146       std::cout << "Demux : no demux by that name found (while registering subscriber)"<< std::endl;
00147     }
00148   }
00149 
00150   static void unregisterSubscriber(const std::string& name, const unsigned int& id);
00151 };
00152 
00153 typedef MessageDemux MessageClient;
00154 
00155 } // namespace mm_mux_demux
00156 
00157 #endif /* mm_mux_demux_DEMUX_HPP_ */


mm_mux_demux
Author(s): Daniel Stonier
autogenerated on Thu Jun 6 2019 21:13:22