00001
00002 import roslib; roslib.load_manifest('cassandra_ros')
00003 import rospy
00004 import rostopic
00005
00006 from cassandra_ros.srv import *
00007 import RosCassandra.RosCassandra as rc
00008
00009 import time
00010 import threading
00011 from PyQt4.QtCore import QThread
00012
00013
00014
00015 def enum(**enums):
00016 return type('Enum', (), enums)
00017
00018 running = enum(START=1, STOP=0, PAUSE=-1)
00019
00020 class RosCassandraBag(QThread):
00021 MAX_GWMTIME = 4294967295
00022 MIN_GWMTIME = 0
00023
00024 def __init__(self, casTopic, parent=None):
00025 QThread.__init__(self,parent)
00026 self.casTopic= casTopic
00027
00028 self.record_start_time = self.MIN_GWMTIME
00029 self.record_stop_time = self.MAX_GWMTIME
00030 self.record_status = running.STOP
00031 self.record_filter = None
00032
00033 self.play_start_time = self.MIN_GWMTIME
00034 self.play_stop_time = self.MAX_GWMTIME
00035 self.play_status = running.STOP
00036 self.play_filter = None
00037
00038 def record(self, msg):
00039
00040
00041
00042
00043
00044 if rospy.get_time() >= self.record_start_time:
00045 if self.record_status == running.START:
00046 if self.record_filter:
00047 if( eval(self.record_filter) ):
00048 self.casTopic.addData(msg, ttl=self.record_ttl)
00049 else:
00050 self.casTopic.addData(msg)
00051
00052 if rospy.get_time() >= self.record_stop_time:
00053 self.stopRecord()
00054
00055 def startRecord(self, start_time=MIN_GWMTIME, stop_time=MAX_GWMTIME, filter=None, ttl=None):
00056 self.record_start_time = min(self.record_start_time, start_time)
00057 self.record_stop_time = max(self.record_stop_time, stop_time)
00058 self.record_filter = filter
00059 self.record_ttl = ttl
00060
00061 self.record_status = running.START
00062
00063 self.subscriber = rospy.Subscriber(self.casTopic.topic, self.casTopic.MsgClass, self.record)
00064
00065 def stopRecord(self):
00066 if self.record_status == running.START or self.record_status == running.PAUSE:
00067 self.record_status = running.STOP
00068 self.subscriber.unregister()
00069
00070 def pauseRecord(self):
00071 if self.record_status == running.START:
00072 self.record_status = running.PAUSE
00073 elif self.record_status == running.PAUSE:
00074 self.record_status = running.START
00075
00076 def exit(self):
00077 self.stopPlay()
00078 self.stopRecord()
00079
00080 def startPlay(self, start_time=MIN_GWMTIME, stop_time=MAX_GWMTIME, speed=1, delay=0, queuesize=100, loop=False, filter=None):
00081 self.play_start_time = start_time
00082 self.play_stop_time = stop_time
00083 self.play_speed = speed
00084 self.play_delay = delay
00085 self.play_queuesize = queuesize
00086 self.play_loop = loop
00087 self.play_filter = filter
00088
00089 self.play_status = running.START
00090
00091 if not self.play_filter or self.play_filter == "":
00092 self.play_filter = None
00093
00094 self.publisher = rospy.Publisher(self.casTopic.topic, self.casTopic.MsgClass)
00095
00096 self.start()
00097
00098 def stopPlay(self):
00099 if self.isRunning():
00100 self.terminate()
00101
00102
00103 def pausePlay(self):
00104
00105 if self.isRunning():
00106 if self.play_status == running.START:
00107 self.play_status = running.PAUSE
00108 elif self.play_status == running.PAUSE:
00109 self.play_status = running.START
00110
00111
00112 def run(self):
00113 rospy.sleep(self.play_delay)
00114
00115 while True:
00116
00117 from_key = str(self.play_start_time)
00118 to_key = str(self.play_stop_time)
00119
00120 _last_time = long(from_key)
00121 _currentTime = rospy.Time.now().to_nsec()
00122
00123 while True:
00124 data = self.casTopic.getData(from_key, to_key, self.play_queuesize)
00125
00126 for dat in data:
00127 from_key, msg = dat
00128
00129 timestamp = long(from_key)
00130
00131 if _last_time > 0:
00132 delta_t = float(timestamp - _last_time) / (self.play_speed*1000000000)
00133 else:
00134 delta_t = 0
00135
00136 time.sleep(delta_t)
00137
00138 _last_time = timestamp
00139
00140 if self.play_filter:
00141 if eval(self.play_filter):
00142 self.publisher.publish(msg)
00143 else:
00144 self.publisher.publish(msg)
00145
00146
00147 while self.play_status == running.PAUSE:
00148 self.yieldCurrentThread()
00149
00150
00151 from_key = str(long(from_key)+1)
00152
00153
00154 if len(data) < self.play_queuesize:
00155 break
00156
00157 if not self.play_loop:
00158 break
00159
00160 self.play_status = running.STOP
00161 rospy.loginfo("STOP")
00162
00163
00164 def handle_record(req):
00165 global rosCas, bag
00166 response = ""
00167
00168 if req.ttl == 0:
00169 req.ttl = None
00170
00171 for topic in (req.topics):
00172
00173 if req.record==1:
00174 if not bag.has_key(topic):
00175 if not rosCas.existTopic(topic):
00176
00177
00178
00179 msg_class, _, _ = rostopic.get_topic_class(topic, blocking=True)
00180
00181 rosCas.addTopic(topic,
00182 req.cassandra_format,
00183 msg_class.__name__,
00184 msg_class.__module__.split(".")[0],
00185 'time', None,
00186 comment='')
00187
00188
00189 bag[topic] = RosCassandraBag(rosCas.getTopic(topic))
00190
00191 bag[topic].startRecord(req.start_time, req.stop_time, req.filter, req.ttl)
00192 rospy.loginfo("start recording: "+topic)
00193
00194
00195 elif req.record == 0:
00196 if bag.has_key(topic):
00197 bag[topic].stopRecord()
00198 rospy.loginfo("stop recording: "+topic)
00199
00200
00201 else:
00202 if bag.has_key(topic):
00203 bag[topic].pauseRecord()
00204 rospy.loginfo("pause recording: "+topic)
00205
00206 return response
00207
00208 def handle_play(req):
00209 global rosCas, bag
00210 response = ""
00211 for topic in (req.topics):
00212
00213 if req.play == 1:
00214 if not bag.has_key(topic):
00215 if rosCas.existTopic(topic):
00216 bag[topic] = RosCassandraBag(rosCas.getTopic(topic))
00217 bag[topic].startPlay(start_time = req.start_time,
00218 stop_time = req.stop_time,
00219 speed = req.speed,
00220 delay = req.delay,
00221 queuesize = req.queuesize,
00222 loop = req.loop,
00223 filter = req.filter)
00224 else:
00225 rospy.loginfo("topic ("+topic+") does not exist: ")
00226
00227
00228 else:
00229 bag[topic].startPlay(start_time = req.start_time,
00230 stop_time = req.stop_time,
00231 speed = req.speed,
00232 delay = req.delay,
00233 queuesize = req.queuesize,
00234 loop = req.loop,
00235 filter = req.filter)
00236
00237 rospy.loginfo("start playing: "+topic)
00238
00239 elif req.play == 0:
00240 if bag.has_key(topic):
00241 bag[topic].stopPlay()
00242 rospy.loginfo("stop playing: "+topic)
00243
00244
00245 elif req.play == -1:
00246 if bag.has_key(topic):
00247 bag[topic].pausePlay()
00248 rospy.loginfo("pause playing: "+topic)
00249
00250 return response
00251
00252 def handle_delete(req):
00253 global rosCas, bag
00254 response = ""
00255 for topic in (req.topics):
00256
00257 if rosCas.existTopic(topic):
00258 _topic = rosCas.getTopic(topic)
00259 rospy.loginfo("deleting "+topic)
00260 _topic.removeData(key=str(req.start_time), to_key=str(req.stop_time))
00261 else:
00262 rospy.loginfo("deleting failed, topic ("+topic+") does not exist")
00263 return response
00264
00265 def handle_truncate(req):
00266 global rosCas, bag
00267 response = ""
00268 for topic in (req.topics):
00269
00270 if rosCas.existTopic(topic):
00271 rosCas.removeTopic(topic)
00272 rospy.loginfo("truncate "+topic)
00273 else:
00274 rospy.loginfo("truncate failed, topic ("+topic+") does not exist")
00275 return response
00276
00277 def handle_info(req):
00278 global rosCas, bag
00279 response = "\n"
00280 print req.command
00281
00282 if req.command == 'list':
00283 topics = rosCas.getAllTopics()
00284 response += "number of topics stored in CassandraDB: "+str(len(topics)) +"\n"
00285 for i in range(len(topics)):
00286 response += str(i+1)+". "+topics[i]+" ("+str(rosCas.countTopicData(topics[i]))+")"+"\n"
00287
00288 elif req.command == 'status':
00289 response += "list of connected hosts: "+str(rosCas.host) +"\n"
00290 for topic in bag.keys():
00291 if bag[topic].play_status == running.START:
00292 response += topic + ": playback is running\n"
00293 elif bag[topic].play_status == running.PAUSE:
00294 response += topic + ": playback is paused\n"
00295 elif bag[topic].record_status == running.START:
00296 response += topic + ": recording is running\n"
00297 elif bag[topic].record_status == running.PAUSE:
00298 response += topic + ": recording is paused\n"
00299 else:
00300 response += topic + ": is idle\n"
00301
00302 elif req.command == 'info':
00303 for topic in req.topics:
00304 meta = rosCas.getTopicMeta(topic)
00305 for key in meta.keys():
00306 response += key+": "+str(meta[key])+"\n"
00307 response += "column name: "+rosCas.topic2Hash(topic)+"\n"
00308 response += "number of entries: "+str(rosCas.countTopicData(topic))+"\n"
00309
00310 elif req.command == 'cql':
00311 print rosCas.exequteCQL(req.topics[0])
00312
00313 else:
00314 rsp += "unknown command: "+req.command+"\n"
00315
00316 return response
00317
00318 if __name__ == "__main__":
00319 host = rospy.get_param('/cassandraBag/host', "localhost")
00320 port = rospy.get_param('/cassandraBag/port', 9160)
00321 keyspace = rospy.get_param('/cassandraBag/keyspace', "test")
00322
00323 rosCas = rc.RosCassandra(host, port)
00324 rospy.loginfo("connected to Cassandra on %s:%d"%(host,port))
00325
00326
00327 if not rosCas.connectToKeyspace(keyspace):
00328 rosCas.createKeyspace(keyspace)
00329
00330 rosCas.connectToKeyspace(keyspace)
00331 rospy.loginfo("connected to Keyspace \"%s\""%(keyspace))
00332
00333 bag = {}
00334
00335 rospy.init_node('cassandraBag')
00336 service = {}
00337 service['record'] = rospy.Service('cassandra_record', record, handle_record)
00338 service['play'] = rospy.Service('cassandra_play', play, handle_play)
00339 service['delete'] = rospy.Service('cassandra_delete', delete, handle_delete)
00340 service['info'] = rospy.Service('cassandra_info', info, handle_info)
00341 service['truncate'] = rospy.Service('cassandra_truncate', truncate, handle_truncate)
00342
00343 rospy.loginfo("start listening ... ")
00344 rospy.spin()
00345
00346 for _bag in bag.itervalues():
00347 _bag.exit()
00348
00349 rosCas.disconnect()