EventSystem.h
Go to the documentation of this file.
00001 /*
00002  * EventSystem.h
00003  *
00004  *  Created on: Nov 14, 2013
00005  *      Author: dan
00006  */
00007 
00008 #ifndef EVENTSYSTEM_H_
00009 #define EVENTSYSTEM_H_
00010 
00011 
00012 #include <boost/thread.hpp>
00013 #include <deque>
00014 #include <boost/thread/mutex.hpp>
00015 #include <boost/thread/condition.hpp>
00016 #include <boost/foreach.hpp>
00017 #include <boost/regex.hpp>
00018 #include <boost/bind.hpp>
00019 
00020 namespace decision_making{
00021         using namespace std;
00022 
00023 class CallContextParameters{
00024 public:
00025                 virtual ~CallContextParameters(){}
00026                 typedef boost::shared_ptr<CallContextParameters> Ptr;
00027                 virtual std::string str()const=0;
00028 
00029 };
00030 struct CallContext{
00031 private:
00032         CallContextParameters::Ptr _parameters;
00033 public:
00034         vector<string> stack;
00035 
00036         CallContext(const CallContext& ctx, string name){
00037                 if(ctx.stack.size()>0)
00038                         stack = ctx.stack;
00039                 push(name);
00040                 _parameters = ctx._parameters;
00041                 //cout<<"[ ctx created : "<<str()<<" ]";
00042         }
00043         CallContext(string name){
00044                 push(name);
00045                 //cout<<"[ ctx created : "<<str()<<" ]";
00046         }
00047         CallContext(){
00048                 //cout<<"[ ctx created : "<<str()<<" ]";
00049         }
00050         ~CallContext(){
00051                 //cout<<"[ ctx deleted : "<<str()<<" ]";
00052         }
00053         string str()const{
00054                 if(stack.size()==0) return "/";
00055                 stringstream path;
00056                 BOOST_FOREACH(string s, stack){
00057                         path<<"/"<<s;
00058                 }
00059                 return path.str();
00060         }
00061         void push(string name){ stack.push_back(name); }
00062         void pop(){ stack.pop_back(); }
00063 
00064         template<class A>
00065         void createParameters(A* a= new A()){
00066                 _parameters = CallContextParameters::Ptr(a);
00067         }
00068         bool isParametersDefined()const{ return _parameters.get()!=NULL; }
00069         struct ExceptionParametersUndefined{};
00070         template<class A>
00071         A& parameters()const{
00072                 if(isParametersDefined()==false) throw ExceptionParametersUndefined();
00073                 return *(boost::static_pointer_cast<A>(_parameters).get());
00074         }
00075         template<class A>
00076         A& parameters(){
00077                 if(isParametersDefined()==false) createParameters<A>();
00078                 return *(boost::static_pointer_cast<A>(_parameters).get());
00079         }
00080 };
00081 typedef CallContext FSMCallContext;
00082 
00083 struct Event{
00084         string _name;
00085         Event(string lname, const CallContext& ctx){
00086                 if(lname.size()==0){ _name=lname; return; }
00087                 if(lname[0]=='/'){ _name=lname; return; }
00088                 if(lname[0]=='@' and lname.size()<3){ _name=ctx.str(); return; }
00089                 if(lname[0]=='@'){
00090                         if(lname[1]=='/') _name = lname;
00091                         else _name = '@'+ctx.str()+"/"+lname.substr(1);
00092                         return;
00093                 }
00094                 _name = ctx.str()+"/"+lname;
00095         }
00096         Event(string lname = ""){
00097                 if(lname.size()==0){ _name=lname; return; }
00098                 if(lname[0]=='/'){ _name = lname; return; }
00099                 _name = "/"+lname;
00100         }
00101         Event(const char _lname[]){
00102                 string lname(_lname);
00103                 if(lname.size()==0){ _name=lname; return; }
00104                 if(lname[0]=='/'){ _name = lname; return; }
00105                 _name = "/"+lname;
00106         }
00107         string name()const{ return _name; }
00108         string event_name()const{
00109                 size_t i=0,l=i;
00110                 while(i!=string::npos){
00111                         l=i; i=_name.find('/',l+1);
00112                 }
00113                 return _name.substr(l+1,_name.size());
00114         }
00115         bool isUndefined()const{ return _name.size()==0; }
00116         bool isDefined()const{ return not isUndefined(); }
00117         bool isRegEx()const{ return _name.size()>0 and _name[0]=='@'; }
00118         bool equals(const Event& e)const{
00119                 if(e.isRegEx() and !isRegEx()){ return e.regex(_name); }
00120                 if(isRegEx() and !e.isRegEx()){ return regex(e._name); }
00121                 return _name==e._name;
00122         }
00123         bool operator==(const Event& e)const{
00124                 return equals(e);
00125         }
00126         bool operator!=(const Event& e)const{
00127                 return not equals(e);
00128         }
00129         operator bool()const{ return isDefined(); }
00130 
00131         bool regex(std::string text)const{
00132                 if(isRegEx()==false) return _name==text;
00133                 boost::regex e(_name.substr(1));
00134                 return regex_match(text, e);
00135         }
00136         static Event SPIN_EVENT(){ return Event("/SPIN"); }
00137 };
00138 inline std::ostream& operator<<(std::ostream& o, Event t){
00139         return o<<"E["<<t.name()<<']';
00140 }
00141 
00142 class EventQueue{
00143 protected:
00144         const bool isTransit;
00145         mutable boost::mutex events_mutex;
00146         boost::condition_variable on_new_event;
00147         std::deque<Event> events;
00148         bool events_system_stop;
00149         std::deque<EventQueue*> subs;
00150         EventQueue* parent;
00151         int max_unreaded_events_number;
00152         static const int max_unreaded_events_number_dif=1000;
00153 #       define MUEN max_unreaded_events_number(max_unreaded_events_number_dif)
00154 public:
00155         EventQueue(EventQueue* parent):isTransit(false),events_system_stop(false),parent(parent),MUEN{
00156                 if(parent){
00157                         max_unreaded_events_number = parent->max_unreaded_events_number;
00158                         parent->subscribe(this);
00159                 }
00160         }
00161         EventQueue(EventQueue* parent, bool isTransit):isTransit(isTransit), events_system_stop(false),parent(parent),MUEN{
00162                 if(parent){
00163                         max_unreaded_events_number = parent->max_unreaded_events_number;
00164                         parent->subscribe(this);
00165                 }
00166         }
00167         EventQueue(int muen = max_unreaded_events_number_dif):isTransit(false),events_system_stop(false),parent(NULL),MUEN{
00168                 max_unreaded_events_number = (muen);
00169         }
00170 #       undef MUEN
00171         virtual ~EventQueue(){
00172                 if(parent)
00173                         parent->remove(this);
00174                 {
00175                         boost::mutex::scoped_lock l(events_mutex);
00176                         events_system_stop = true;
00177                         on_new_event.notify_all();
00178                         BOOST_FOREACH(EventQueue* sub, subs)
00179                                 sub->close();
00180                 }
00181         }
00182 
00183         virtual void raiseEvent(const Event& e){
00184                 if(parent) parent->raiseEvent(e);
00185                 else addEvent(e);
00186         }
00187 
00188         //Deprecated
00189         virtual void riseEvent(const Event& e){ raiseEvent(e); }
00190 
00191 private:
00192         void addEvent(Event e){
00193                 boost::mutex::scoped_lock l(events_mutex);
00194                 bool notify = true;
00195                 if(not isTransit){
00196                         if(events.empty()){
00197                                 events.push_back(e);
00198                         }else{
00199                                 if(e==Event::SPIN_EVENT()){
00200                                 }else{
00201                                         if(events.size() > max_unreaded_events_number){
00202                                                 events.pop_front();
00203                                         }
00204                                         events.push_back(e);
00205                                 }
00206                         }
00207                 }
00208                 BOOST_FOREACH(EventQueue* sub, subs) sub->addEvent(e);
00209                 if(notify) on_new_event.notify_one();
00210         }
00211 public:
00212         virtual Event waitEvent(){
00213                 boost::mutex::scoped_lock l(events_mutex);
00214                 while(events_system_stop==false && events.empty())      on_new_event.wait(l);
00215                 if(events_system_stop)
00216                         return Event();
00217                 Event e = events.front();
00218                 events.pop_front();
00219                 return e;
00220         }
00221         Event tryGetEvent(bool& success){
00222                 boost::mutex::scoped_lock l(events_mutex);
00223                 success = false;
00224                 if(events_system_stop or events.empty())
00225                         return Event();
00226                 Event e = events.front();
00227                 events.pop_front();
00228                 success = true;
00229                 return e;
00230         }
00231         void drop_all(){
00232                 boost::mutex::scoped_lock l(events_mutex);
00233                 events.clear();
00234         }
00235 public:
00236         void close(){
00237                 boost::mutex::scoped_lock l(events_mutex);
00238                 events_system_stop = true;
00239                 on_new_event.notify_all();
00240                 BOOST_FOREACH(EventQueue* sub, subs)
00241                         sub->close();
00242         }
00243 public:
00244         void subscribe(EventQueue* sub){
00245                 boost::mutex::scoped_lock l(events_mutex);
00246                 if(events_system_stop) return;
00247                 subs.push_back(sub);
00248         }
00249 private:
00250         void remove(EventQueue* sub){
00251                 boost::mutex::scoped_lock l(events_mutex);
00252                 for(deque<EventQueue*>::iterator i=subs.begin();i!=subs.end();i++){
00253                         if((*i)==sub){
00254                                 subs.erase(i);
00255                                 break;
00256                         }
00257                 }
00258         }
00259 public:
00260         bool isTerminated()const{
00261                 boost::mutex::scoped_lock l(events_mutex);
00262                 return events_system_stop;
00263         }
00264         typedef boost::shared_ptr<EventQueue> Ptr;
00265 #define EQ_SPINNER_DIF_RATE 10
00266         class Spinner{
00267                 friend class EventQueue;
00268                 EventQueue& events;
00269                 Spinner(EventQueue& events):events(events){}
00270                 boost::thread_group threads;
00271                 void spinOne(){
00272                         events.riseEvent(Event::SPIN_EVENT());
00273                         //cout<<"[spin]"<<endl;
00274                 }
00275                 void spin(double rate = EQ_SPINNER_DIF_RATE){
00276                         while(events.check_external_ok() and not events.isTerminated()){
00277                                 spinOne();
00278                                 boost::this_thread::sleep(boost::posix_time::seconds(1.0/rate));
00279                         }
00280                         if(not events.check_external_ok() and not events.isTerminated()){
00281                                 events.close();
00282                         }
00283                 }
00284                 void start(double rate = EQ_SPINNER_DIF_RATE){
00285                         threads.add_thread(new boost::thread(boost::bind(&EventQueue::Spinner::spin, this, rate)));
00286                 }
00287         public:
00288                 ~Spinner(){ if(events.check_external_ok()) threads.join_all(); }
00289         };
00290 
00291         virtual bool check_external_ok(){return true;}
00292         void spinOne(){ Spinner s(*this); s.spinOne(); }
00293         void spin(double rate=EQ_SPINNER_DIF_RATE){ Spinner s(*this); s.spin(rate); }
00294         void async_spin(double rate=EQ_SPINNER_DIF_RATE, double start_delay=0.0){
00295                 boost::this_thread::sleep(boost::posix_time::seconds(start_delay));
00296                 _spinner = boost::shared_ptr<Spinner>(new Spinner (*this));
00297                 _spinner->start(rate);
00298         }
00299 private:
00300         boost::shared_ptr<Spinner> _spinner;
00301 };
00302 
00303 
/**
 * Registry that translates a (task name, integer result) pair into an event name.
 * The table is a single function-local static shared by all callers.
 */
class MapResultEvent{
        typedef std::map<int,std::string> mapper_item_type;
        typedef std::map<std::string, mapper_item_type> mapper_type;
public:
        /// @return the shared lookup table (created on first use).
        static mapper_type& get(){
                static mapper_type mapper;
                return mapper;
        }

        /**
         * Looks up the event name registered for (task, result).
         * @return the registered name, or the decimal text of result when nothing is registered.
         *         The lookup never adds entries to the table.
         */
        static std::string map(std::string task, int result){
                const mapper_type& registry = get();
                const mapper_type::const_iterator task_entry = registry.find(task);
                if(task_entry != registry.end()){
                        const mapper_item_type& results = task_entry->second;
                        const mapper_item_type::const_iterator hit = results.find(result);
                        if(hit != results.end()){
                                return hit->second;
                        }
                }
                std::stringstream fallback;
                fallback<<result;
                return fallback.str();
        }

        /// Registers (or overwrites) the event name for (task, result).
        static void map(std::string task, int result, std::string event){
                mapper_type& registry = get();
                mapper_item_type& results = registry[task];
                results[result] = event;
        }
};
00323 
00324 }
00325 
00326 #endif /* EVENTSYSTEM_H_ */


decision_making
Author(s):
autogenerated on Wed Aug 26 2015 11:16:53