cassandraBag.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import roslib; roslib.load_manifest('cassandra_ros')
00003 import rospy
00004 import rostopic
00005 
00006 from cassandra_ros.srv import *
00007 import RosCassandra as rc
00008 
00009 import time
00010 import threading
00011 from PyQt4.QtCore import QThread
00012 
00013 
00014 
00015 def enum(**enums):
00016     return type('Enum', (), enums)
00017 
00018 running = enum(START=1, STOP=0, PAUSE=-1)
00019 
00020 class RosCassandraBag(QThread):
00021     MAX_GWMTIME = 4294967295
00022     MIN_GWMTIME = 0
00023     
00024     def __init__(self, casTopic, parent=None):
00025         QThread.__init__(self,parent)
00026         self.casTopic= casTopic
00027         
00028         self.record_start_time = self.MIN_GWMTIME
00029         self.record_stop_time  = self.MAX_GWMTIME
00030         self.record_status     = running.STOP
00031         self.record_filter     = None
00032         
00033         self.play_start_time = self.MIN_GWMTIME
00034         self.play_stop_time  = self.MAX_GWMTIME
00035         self.play_status     = running.STOP
00036         self.play_filter     = None
00037         
00038     def record(self, msg):
00039         # start recording
00040         
00041         # Andre
00042         #msg = msg.transforms.pop()
00043         
00044         if rospy.get_time() >= self.record_start_time:
00045             if self.record_status == running.START:
00046                 if self.record_filter:
00047                     if( eval(self.record_filter) ):
00048                         self.casTopic.addData(msg, ttl=self.record_ttl)
00049                 else:
00050                     self.casTopic.addData(msg) 
00051         
00052         if rospy.get_time() >= self.record_stop_time:
00053             self.stopRecord()
00054             
00055     def startRecord(self, start_time=MIN_GWMTIME, stop_time=MAX_GWMTIME, filter=None, ttl=None):
00056         self.record_start_time = min(self.record_start_time, start_time)
00057         self.record_stop_time  = max(self.record_stop_time, stop_time)
00058         self.record_filter     = filter
00059         self.record_ttl        = ttl
00060         
00061         self.record_status     = running.START
00062         
00063         self.subscriber = rospy.Subscriber(self.casTopic.topic, self.casTopic.MsgClass, self.record)
00064             
00065     def stopRecord(self):
00066         if self.record_status == running.START or self.record_status == running.PAUSE:
00067             self.record_status = running.STOP
00068             self.subscriber.unregister()
00069         
00070     def pauseRecord(self):
00071         if self.record_status == running.START:
00072             self.record_status = running.PAUSE
00073         elif self.record_status == running.PAUSE:        
00074             self.record_status = running.START
00075     
00076     def exit(self):
00077         self.stopPlay()
00078         self.stopRecord()
00079         
00080     def startPlay(self, start_time=MIN_GWMTIME, stop_time=MAX_GWMTIME, speed=1, delay=0, queuesize=100, loop=False, filter=None):
00081         self.play_start_time    = start_time
00082         self.play_stop_time     = stop_time
00083         self.play_speed         = speed
00084         self.play_delay         = delay
00085         self.play_queuesize     = queuesize
00086         self.play_loop          = loop
00087         self.play_filter        = filter
00088         
00089         self.play_status     = running.START
00090         
00091         if not self.play_filter or self.play_filter == "":
00092             self.play_filter = None
00093         
00094         self.publisher = rospy.Publisher(self.casTopic.topic, self.casTopic.MsgClass)
00095         # start thread
00096         self.start()
00097         
00098     def stopPlay(self):
00099         if self.isRunning():
00100             self.terminate()
00101         
00102 
00103     def pausePlay(self):
00104         # waiting
00105         if self.isRunning():
00106             if self.play_status == running.START:
00107                 self.play_status = running.PAUSE
00108             elif self.play_status == running.PAUSE:
00109                 self.play_status = running.START
00110     
00111     # play ...
00112     def run(self):
00113         rospy.sleep(self.play_delay)
00114         
00115         while True:
00116             
00117             from_key = str(self.play_start_time)
00118             to_key   = str(self.play_stop_time)
00119             
00120             _last_time   = long(from_key)
00121             _currentTime = rospy.Time.now().to_nsec()
00122             
00123             while True:
00124                 data = self.casTopic.getData(from_key, to_key, self.play_queuesize)
00125                 
00126                 for dat in data:
00127                     from_key, msg = dat
00128                     
00129                     timestamp = long(from_key)
00130                     
00131                     if _last_time > 0:
00132                         delta_t = float(timestamp - _last_time) / (self.play_speed*1000000000)
00133                     else:
00134                         delta_t = 0
00135 
00136                     time.sleep(delta_t)                    
00137                     
00138                     _last_time = timestamp
00139                     
00140                     if self.play_filter:
00141                         if eval(self.play_filter):
00142                             self.publisher.publish(msg)
00143                     else:
00144                         self.publisher.publish(msg)
00145                     
00146                     # pause                    
00147                     while self.play_status == running.PAUSE:
00148                         self.yieldCurrentThread()
00149                     
00150                 
00151                 from_key = str(long(from_key)+1)
00152                                 
00153                 # end reached 
00154                 if len(data) < self.play_queuesize:
00155                     break
00156             
00157             if not self.play_loop:
00158                 break
00159 
00160         self.play_status     = running.STOP
00161         rospy.loginfo("STOP")
00162 
00163 
00164 def handle_record(req):
00165     global rosCas, bag
00166     response = ""
00167     
00168     if req.ttl == 0:
00169             req.ttl = None
00170     
00171     for topic in (req.topics):
00172         # start recording
00173         if req.record==1:
00174             if not bag.has_key(topic):
00175                 if not rosCas.existTopic(topic):
00176                     
00177                     
00178                     
00179                     msg_class, _, _ = rostopic.get_topic_class(topic, blocking=True)
00180                     
00181                     rosCas.addTopic(topic,
00182                                     req.cassandra_format,
00183                                     msg_class.__name__,
00184                                     msg_class.__module__.split(".")[0],
00185                                     'time', None,
00186                                     comment='')
00187                 
00188                 
00189                 bag[topic] =  RosCassandraBag(rosCas.getTopic(topic))
00190             
00191             bag[topic].startRecord(req.start_time, req.stop_time, req.filter, req.ttl)
00192             rospy.loginfo("start recording: "+topic)
00193             
00194         # stop recording
00195         elif req.record == 0:
00196             if bag.has_key(topic):
00197                 bag[topic].stopRecord()
00198                 rospy.loginfo("stop recording: "+topic)
00199         
00200         # pause recording
00201         else:
00202             if bag.has_key(topic):
00203                 bag[topic].pauseRecord()
00204                 rospy.loginfo("pause recording: "+topic)
00205                 
00206     return response
00207 
00208 def handle_play(req):
00209     global rosCas, bag
00210     response = ""
00211     for topic in (req.topics):
00212         # start playing    
00213         if req.play == 1:
00214             if not bag.has_key(topic):
00215                 if rosCas.existTopic(topic):
00216                     bag[topic] =  RosCassandraBag(rosCas.getTopic(topic))
00217                     bag[topic].startPlay(start_time = req.start_time,
00218                                          stop_time  = req.stop_time,
00219                                          speed      = req.speed,
00220                                          delay      = req.delay,
00221                                          queuesize  = req.queuesize,
00222                                          loop       = req.loop,
00223                                          filter     = req.filter)
00224                 else:
00225                     rospy.loginfo("topic ("+topic+") does not exist: ")
00226             
00227                              
00228             else:
00229                 bag[topic].startPlay(start_time = req.start_time,
00230                                      stop_time  = req.stop_time,
00231                                      speed      = req.speed,
00232                                      delay      = req.delay,
00233                                      queuesize  = req.queuesize,
00234                                      loop       = req.loop,
00235                                      filter     = req.filter)
00236             
00237             rospy.loginfo("start playing: "+topic)
00238         # stop playing
00239         elif req.play == 0:
00240             if bag.has_key(topic):
00241                 bag[topic].stopPlay()
00242                 rospy.loginfo("stop playing: "+topic)
00243         
00244         # pause playing
00245         elif req.play == -1:
00246             if bag.has_key(topic):
00247                 bag[topic].pausePlay()
00248                 rospy.loginfo("pause playing: "+topic)
00249     
00250     return response
00251     
00252 def handle_delete(req):
00253     global rosCas, bag
00254     response = ""
00255     for topic in (req.topics):
00256         # start delete
00257         if rosCas.existTopic(topic):
00258             _topic = rosCas.getTopic(topic)
00259             rospy.loginfo("deleting "+topic)
00260             _topic.removeData(key=str(req.start_time), to_key=str(req.stop_time))
00261         else:
00262             rospy.loginfo("deleting failed, topic ("+topic+") does not exist")
00263     return response
00264 
00265 def handle_truncate(req):
00266     global rosCas, bag
00267     response = ""
00268     for topic in (req.topics):
00269         # start delete
00270         if rosCas.existTopic(topic):
00271             rosCas.removeTopic(topic)
00272             rospy.loginfo("truncate "+topic)
00273         else:
00274             rospy.loginfo("truncate failed, topic ("+topic+") does not exist")
00275     return response
00276     
00277 def handle_info(req):
00278     global rosCas, bag
00279     response = "\n"
00280     print req.command
00281     # return all available topics
00282     if req.command == 'list':
00283         topics = rosCas.getAllTopics()
00284         response += "number of topics stored in CassandraDB: "+str(len(topics)) +"\n"
00285         for i in range(len(topics)):
00286             response += str(i+1)+". "+topics[i]+" ("+str(rosCas.countTopicData(topics[i]))+")"+"\n"
00287             
00288     elif req.command == 'status':
00289         response += "list of connected hosts: "+str(rosCas.host) +"\n"
00290         for topic in bag.keys():
00291             if bag[topic].play_status == running.START:
00292                 response += topic + ": playback is running\n"
00293             elif bag[topic].play_status == running.PAUSE:
00294                 response += topic + ": playback is paused\n"
00295             elif bag[topic].record_status == running.START:
00296                 response += topic + ": recording is running\n"
00297             elif bag[topic].record_status == running.PAUSE:
00298                 response += topic + ": recording is paused\n"
00299             else:
00300                 response += topic + ": is idle\n"
00301                 
00302     elif req.command == 'info':
00303         for topic in req.topics:
00304             meta = rosCas.getTopicMeta(topic)
00305             for key in meta.keys():
00306                 response += key+": "+str(meta[key])+"\n"
00307             response += "column name: "+rosCas.topic2Hash(topic)+"\n"
00308             response += "number of entries: "+str(rosCas.countTopicData(topic))+"\n"
00309             
00310     elif req.command == 'cql':
00311         print rosCas.exequteCQL(req.topics[0])
00312             
00313     else:
00314         rsp += "unknown command: "+req.command+"\n"
00315         
00316     return response
00317         
00318 if __name__ == "__main__":
00319     host =      rospy.get_param('/cassandraBag/host', "localhost")
00320     port =      rospy.get_param('/cassandraBag/port', 9160)
00321     keyspace =  rospy.get_param('/cassandraBag/keyspace', "test")
00322     
00323     rosCas = rc.RosCassandra(host, port)
00324     rospy.loginfo("connected to Cassandra on %s:%d"%(host,port))
00325     
00326 #    rosCas.dropKeyspace(keyspace)
00327     if not rosCas.connectToKeyspace(keyspace):
00328         rosCas.createKeyspace(keyspace)
00329         
00330     rosCas.connectToKeyspace(keyspace)
00331     rospy.loginfo("connected to Keyspace \"%s\""%(keyspace))
00332     
00333     bag = {}
00334     
00335     rospy.init_node('cassandraBag')
00336     service = {}
00337     service['record']   = rospy.Service('cassandra_record',     record,     handle_record)
00338     service['play']     = rospy.Service('cassandra_play',       play,       handle_play)
00339     service['delete']   = rospy.Service('cassandra_delete',     delete,     handle_delete)
00340     service['info']     = rospy.Service('cassandra_info',       info,       handle_info)
00341     service['truncate'] = rospy.Service('cassandra_truncate',   truncate,   handle_truncate)
00342     
00343     rospy.loginfo("start listening ... ")
00344     rospy.spin()
00345     
00346     for _bag in bag.itervalues():
00347         _bag.exit()
00348         
00349     rosCas.disconnect()


cassandra_ros
Author(s): André Dietrich, Sebastian Zug
autogenerated on Sun Jan 5 2014 11:10:29