00001
00002
00003 import roslib; roslib.load_manifest("cassandra_ros")
00004 import roslib.message
00005 import rospy
00006 import rosmsg
00007 import rostopic
00008
00009 import genpy.dynamic
00010
00011 import rosbag
00012 import simplejson as json
00013 import hashlib
00014 import time
00015 import StringIO
00016
00017 import cql
00018 import pycassa
00019
00020 from Cassandra import Cassandra
00021 from CassandraTopic import CassandraTopic
00022
class RosCassandraException(rospy.ROSException):
    """Exception type of this module; adds nothing to rospy.ROSException."""
00024
00025 class RosCassandra(Cassandra):
00026
00027 def __init__(self, host="localhost", port=9160, keyspace="ros"):
00028 Cassandra.__init__(self, host, port)
00029
00030 self.cql_conn = None
00031 self.cql_cursor = None
00032
00033 def addTopic(self, topic, cassandra_format=None,
00034 msg_class=None,
00035 msg_package=None,
00036 key_format=None,
00037 key_msg_part=None,
00038 comment = "",
00039 date=None):
00040
00041 if self.existTopic(topic):
00042 return False
00043
00044 topic_md5 = self.topic2Hash(topic)
00045
00046
00047
00048
00049
00050
00051
00052
00053 casTopic = CassandraTopic(topic, None, None,
00054 cassandra_format,
00055 msg_class, msg_package,
00056 key_format, key_msg_part,
00057 comment, date)
00058
00059 column_validation_classes = casTopic.getColumnValidationClasses()
00060
00061 column = self.createColumn(topic_md5, column_validation_classes=column_validation_classes)
00062
00063 meta = casTopic.getMeta()
00064 self.setTopicMeta(topic, meta)
00065
00066 def topic2Hash(self, topic):
00067 return hashlib.md5(topic).hexdigest()
00068
00069 def getTopic(self, topic):
00070
00071 if not self.existTopic(topic):
00072 return False
00073
00074 topic_md5 = self.topic2Hash(topic)
00075 column = self.getColumn(topic_md5)
00076
00077 meta = self.getTopicMeta(topic)
00078
00079 return CassandraTopic(column=column, cursor=None, meta=meta)
00080
00081 def removeTopic(self, topic):
00082 if self.existTopic(topic):
00083 topic_md5 = self.topic2Hash(topic)
00084 self.dropColumn(topic_md5)
00085
00086 def getTopicMeta(self, topic=None, topic_md5=None):
00087 if topic:
00088 topic_md5 = self.topic2Hash(topic)
00089 try:
00090 meta = json.loads(self.getColumnComment(topic_md5))
00091 except:
00092 meta = {}
00093
00094 return meta
00095
00096 def setTopicMeta(self, topic, meta):
00097 topic_md5 = self.topic2Hash(topic)
00098 self.setColumnComment(topic_md5, json.dumps(meta))
00099
00100 def existTopic(self, topic):
00101 topic_md5 = self.topic2Hash(topic)
00102 return self.existColumn(topic_md5)
00103
00104 def getAllTopics(self):
00105 topics = []
00106 columns = self.getAllColumns()
00107
00108 for col in columns:
00109 meta = self.getTopicMeta(topic_md5=col)
00110 if type(meta) == dict:
00111 if meta.has_key('topic'):
00112 topics.append(meta['topic'])
00113
00114 return topics
00115
00116 def renameTopic(self, topic, topic_new):
00117 topic_md5 = self.topic2Hash(topic)
00118 meta = self.getTopicMeta(topic_md5)
00119 meta['topic'] = topic_new
00120
00121 pass
00122
00123
00124 def fileExport(self, topics, filename):
00125 pass
00126
00127 def fileImport(self, filename, topics=None, format='ros', key='time', key_msg_part=None):
00128 print "read ...",
00129 bag = rosbag.Bag(filename)
00130 print "done"
00131 casTopic = {}
00132 i = 0
00133 for topic, msg, t in bag.read_messages(topics):
00134 if not casTopic.has_key(topic):
00135 if not self.existTopic(topic):
00136
00137 self.addTopic(topic,
00138 cassandra_format=format,
00139 msg_class = msg.__class__._type.split('/')[1],
00140 msg_package = msg.__class__._type.split('/')[0],
00141 key_format=key,
00142 key_msg_part=key_msg_part,
00143 comment = "",
00144 date=None)
00145
00146 casTopic[topic] = self.getTopic(topic)
00147
00148 i += 1
00149 print i
00150
00151 if key == 'time':
00152 casTopic[topic].addData(msg, str(t.to_nsec()))
00153 else:
00154 casTopic[topic].addData(msg)
00155
00156
00157 def countTopicData(self, topic):
00158 try:
00159 topic_md5 = self.topic2Hash(topic)
00160 self.cursor.execute("select count(*) from '"+topic_md5+"'")
00161 return self.cursor.fetchone()[0]
00162 except:
00163 return 0
00164
00165 def exequteCQL(self, query):
00166
00167 if self.cql_cursor == None:
00168 self.cql_conn = cql.connect(self.host, self.port, self.keyspace)
00169 self.cql_cursor = self.cql_conn.cursor()
00170
00171 topics = self.getAllTopics()
00172
00173
00174 for topic in topics:
00175 query = query.replace('"'+topic+'"', '"'+self.topic2Hash(topic)+'"')
00176 print query
00177
00178
00179
00180 self.cql_cursor.execute(query)
00181 return self.cql_cursor.fetchall()
00182
00183 def createIndex(self, topic, column):
00184 indexname = topic+column
00185 indexname = indexname.replace(".","_")
00186 column_validator = self.getTopic(topic).column.column_validators[column]
00187
00188 self.sysManager.create_index(self.keyspace,
00189 self.topic2Hash(topic),
00190 column,
00191 column_validator,
00192 index_name=indexname)
00193
00194 def removeIndex(self, topic, column):
00195 self.sysManager.drop_index(self.keyspace, self.topic2Hash(topic), column)