Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef EVENTSYSTEM_H_
00009 #define EVENTSYSTEM_H_
00010
00011
00012 #include <boost/thread.hpp>
00013 #include <deque>
00014 #include <boost/thread/mutex.hpp>
00015 #include <boost/thread/condition.hpp>
00016 #include <boost/foreach.hpp>
00017 #include <boost/regex.hpp>
00018 #include <boost/bind.hpp>
00019
00020 namespace decision_making{
00021 using namespace std;
00022
00023 class CallContextParameters{
00024 public:
00025 virtual ~CallContextParameters(){}
00026 typedef boost::shared_ptr<CallContextParameters> Ptr;
00027 virtual std::string str()const=0;
00028
00029 };
00030 struct CallContext{
00031 private:
00032 CallContextParameters::Ptr _parameters;
00033 public:
00034 vector<string> stack;
00035
00036 CallContext(const CallContext& ctx, string name){
00037 if(ctx.stack.size()>0)
00038 stack = ctx.stack;
00039 push(name);
00040 _parameters = ctx._parameters;
00041
00042 }
00043 CallContext(string name){
00044 push(name);
00045
00046 }
00047 CallContext(){
00048
00049 }
00050 ~CallContext(){
00051
00052 }
00053 string str()const{
00054 if(stack.size()==0) return "/";
00055 stringstream path;
00056 BOOST_FOREACH(string s, stack){
00057 path<<"/"<<s;
00058 }
00059 return path.str();
00060 }
00061 void push(string name){ stack.push_back(name); }
00062 void pop(){ stack.pop_back(); }
00063
00064 template<class A>
00065 void createParameters(A* a= new A()){
00066 _parameters = CallContextParameters::Ptr(a);
00067 }
00068 bool isParametersDefined()const{ return _parameters.get()!=NULL; }
00069 struct ExceptionParametersUndefined{};
00070 template<class A>
00071 A& parameters()const{
00072 if(isParametersDefined()==false) throw ExceptionParametersUndefined();
00073 return *(boost::static_pointer_cast<A>(_parameters).get());
00074 }
00075 template<class A>
00076 A& parameters(){
00077 if(isParametersDefined()==false) createParameters<A>();
00078 return *(boost::static_pointer_cast<A>(_parameters).get());
00079 }
00080 };
00081 typedef CallContext FSMCallContext;
00082
00083 struct Event{
00084 string _name;
00085 Event(string lname, const CallContext& ctx){
00086 if(lname.size()==0){ _name=lname; return; }
00087 if(lname[0]=='/'){ _name=lname; return; }
00088 if(lname[0]=='@' and lname.size()<3){ _name=ctx.str(); return; }
00089 if(lname[0]=='@'){
00090 if(lname[1]=='/') _name = lname;
00091 else _name = '@'+ctx.str()+"/"+lname.substr(1);
00092 return;
00093 }
00094 _name = ctx.str()+"/"+lname;
00095 }
00096 Event(string lname = ""){
00097 if(lname.size()==0){ _name=lname; return; }
00098 if(lname[0]=='/'){ _name = lname; return; }
00099 _name = "/"+lname;
00100 }
00101 Event(const char _lname[]){
00102 string lname(_lname);
00103 if(lname.size()==0){ _name=lname; return; }
00104 if(lname[0]=='/'){ _name = lname; return; }
00105 _name = "/"+lname;
00106 }
00107 string name()const{ return _name; }
00108 string event_name()const{
00109 size_t i=0,l=i;
00110 while(i!=string::npos){
00111 l=i; i=_name.find('/',l+1);
00112 }
00113 return _name.substr(l+1,_name.size());
00114 }
00115 bool isUndefined()const{ return _name.size()==0; }
00116 bool isDefined()const{ return not isUndefined(); }
00117 bool isRegEx()const{ return _name.size()>0 and _name[0]=='@'; }
00118 bool equals(const Event& e)const{
00119 if(e.isRegEx() and !isRegEx()){ return e.regex(_name); }
00120 if(isRegEx() and !e.isRegEx()){ return regex(e._name); }
00121 return _name==e._name;
00122 }
00123 bool operator==(const Event& e)const{
00124 return equals(e);
00125 }
00126 bool operator!=(const Event& e)const{
00127 return not equals(e);
00128 }
00129 operator bool()const{ return isDefined(); }
00130
00131 bool regex(std::string text)const{
00132 if(isRegEx()==false) return _name==text;
00133 boost::regex e(_name.substr(1));
00134 return regex_match(text, e);
00135 }
00136 static Event SPIN_EVENT(){ return Event("/SPIN"); }
00137 };
00138 inline std::ostream& operator<<(std::ostream& o, Event t){
00139 return o<<"E["<<t.name()<<']';
00140 }
00141
00142 class EventQueue{
00143 protected:
00144 const bool isTransit;
00145 mutable boost::mutex events_mutex;
00146 boost::condition_variable on_new_event;
00147 std::deque<Event> events;
00148 bool events_system_stop;
00149 std::deque<EventQueue*> subs;
00150 EventQueue* parent;
00151 int max_unreaded_events_number;
00152 static const int max_unreaded_events_number_dif=1000;
00153 # define MUEN max_unreaded_events_number(max_unreaded_events_number_dif)
00154 public:
00155 EventQueue(EventQueue* parent):isTransit(false),events_system_stop(false),parent(parent),MUEN{
00156 if(parent){
00157 max_unreaded_events_number = parent->max_unreaded_events_number;
00158 parent->subscribe(this);
00159 }
00160 }
00161 EventQueue(EventQueue* parent, bool isTransit):isTransit(isTransit), events_system_stop(false),parent(parent),MUEN{
00162 if(parent){
00163 max_unreaded_events_number = parent->max_unreaded_events_number;
00164 parent->subscribe(this);
00165 }
00166 }
00167 EventQueue(int muen = max_unreaded_events_number_dif):isTransit(false),events_system_stop(false),parent(NULL),MUEN{
00168 max_unreaded_events_number = (muen);
00169 }
00170 # undef MUEN
00171 virtual ~EventQueue(){
00172 if(parent)
00173 parent->remove(this);
00174 {
00175 boost::mutex::scoped_lock l(events_mutex);
00176 events_system_stop = true;
00177 on_new_event.notify_all();
00178 BOOST_FOREACH(EventQueue* sub, subs)
00179 sub->close();
00180 }
00181 }
00182
00183 virtual void raiseEvent(const Event& e){
00184 if(parent) parent->raiseEvent(e);
00185 else addEvent(e);
00186 }
00187
00188
00189 virtual void riseEvent(const Event& e){ raiseEvent(e); }
00190
00191 private:
00192 void addEvent(Event e){
00193 boost::mutex::scoped_lock l(events_mutex);
00194 bool notify = true;
00195 if(not isTransit){
00196 if(events.empty()){
00197 events.push_back(e);
00198 }else{
00199 if(e==Event::SPIN_EVENT()){
00200 }else{
00201 if(events.size() > max_unreaded_events_number){
00202 events.pop_front();
00203 }
00204 events.push_back(e);
00205 }
00206 }
00207 }
00208 BOOST_FOREACH(EventQueue* sub, subs) sub->addEvent(e);
00209 if(notify) on_new_event.notify_one();
00210 }
00211 public:
00212 virtual Event waitEvent(){
00213 boost::mutex::scoped_lock l(events_mutex);
00214 while(events_system_stop==false && events.empty()) on_new_event.wait(l);
00215 if(events_system_stop)
00216 return Event();
00217 Event e = events.front();
00218 events.pop_front();
00219 return e;
00220 }
00221 Event tryGetEvent(bool& success){
00222 boost::mutex::scoped_lock l(events_mutex);
00223 success = false;
00224 if(events_system_stop or events.empty())
00225 return Event();
00226 Event e = events.front();
00227 events.pop_front();
00228 success = true;
00229 return e;
00230 }
00231 void drop_all(){
00232 boost::mutex::scoped_lock l(events_mutex);
00233 events.clear();
00234 }
00235 public:
00236 void close(){
00237 boost::mutex::scoped_lock l(events_mutex);
00238 events_system_stop = true;
00239 on_new_event.notify_all();
00240 BOOST_FOREACH(EventQueue* sub, subs)
00241 sub->close();
00242 }
00243 public:
00244 void subscribe(EventQueue* sub){
00245 boost::mutex::scoped_lock l(events_mutex);
00246 if(events_system_stop) return;
00247 subs.push_back(sub);
00248 }
00249 private:
00250 void remove(EventQueue* sub){
00251 boost::mutex::scoped_lock l(events_mutex);
00252 for(deque<EventQueue*>::iterator i=subs.begin();i!=subs.end();i++){
00253 if((*i)==sub){
00254 subs.erase(i);
00255 break;
00256 }
00257 }
00258 }
00259 public:
00260 bool isTerminated()const{
00261 boost::mutex::scoped_lock l(events_mutex);
00262 return events_system_stop;
00263 }
00264 typedef boost::shared_ptr<EventQueue> Ptr;
00265 #define EQ_SPINNER_DIF_RATE 10
00266 class Spinner{
00267 friend class EventQueue;
00268 EventQueue& events;
00269 Spinner(EventQueue& events):events(events){}
00270 boost::thread_group threads;
00271 void spinOne(){
00272 events.riseEvent(Event::SPIN_EVENT());
00273
00274 }
00275 void spin(double rate = EQ_SPINNER_DIF_RATE){
00276 while(events.check_external_ok() and not events.isTerminated()){
00277 spinOne();
00278 boost::this_thread::sleep(boost::posix_time::seconds(1.0/rate));
00279 }
00280 if(not events.check_external_ok() and not events.isTerminated()){
00281 events.close();
00282 }
00283 }
00284 void start(double rate = EQ_SPINNER_DIF_RATE){
00285 threads.add_thread(new boost::thread(boost::bind(&EventQueue::Spinner::spin, this, rate)));
00286 }
00287 public:
00288 ~Spinner(){ if(events.check_external_ok()) threads.join_all(); }
00289 };
00290
00291 virtual bool check_external_ok(){return true;}
00292 void spinOne(){ Spinner s(*this); s.spinOne(); }
00293 void spin(double rate=EQ_SPINNER_DIF_RATE){ Spinner s(*this); s.spin(rate); }
00294 void async_spin(double rate=EQ_SPINNER_DIF_RATE, double start_delay=0.0){
00295 boost::this_thread::sleep(boost::posix_time::seconds(start_delay));
00296 _spinner = boost::shared_ptr<Spinner>(new Spinner (*this));
00297 _spinner->start(rate);
00298 }
00299 private:
00300 boost::shared_ptr<Spinner> _spinner;
00301 };
00302
00303
/**
 * Process-wide table that translates (task name, integer result code) pairs
 * into event names.
 */
class MapResultEvent{
    typedef std::map<int, std::string> mapper_item_type;
    typedef std::map<std::string, mapper_item_type> mapper_type;
public:
    /// @return the single shared table (created on first use).
    static mapper_type& get(){
        static mapper_type table;
        return table;
    }

    /**
     * Looks up the event name registered for @p task and @p result.
     * @return the registered name, or @p result formatted as decimal text
     *         when no mapping exists. The table is not modified.
     */
    static std::string map(std::string task, int result){
        mapper_type& table = get();
        const mapper_type::iterator taskEntry = table.find(task);
        if(taskEntry != table.end()){
            mapper_item_type& codes = taskEntry->second;
            const mapper_item_type::iterator codeEntry = codes.find(result);
            if(codeEntry != codes.end()){
                return codeEntry->second;
            }
        }
        std::stringstream fallback;
        fallback << result;
        return fallback.str();
    }

    /// Registers (or overwrites) the event name for @p task and @p result.
    static void map(std::string task, int result, std::string event){
        mapper_item_type& codes = get()[task];
        codes[result] = event;
    }
};
00323
00324 }
00325
00326 #endif