RosCassandra.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 import roslib; roslib.load_manifest("cassandra_ros")
00004 import roslib.message
00005 import rospy
00006 import rosmsg
00007 import rostopic
00008 import rosjson
00009 import genpy.dynamic
00010 
00011 import rosbag
00012 import simplejson as json
00013 import hashlib
00014 import time
00015 import StringIO
00016 
00017 import cql
00018 import pycassa
00019 
00020 from Cassandra import Cassandra
00021 from CassandraTopic import CassandraTopic
00022 
class RosCassandraException(rospy.ROSException):
    """Exception type of this module; adds nothing to rospy.ROSException."""
00024 
00025 class RosCassandra(Cassandra):
00026     
00027     def __init__(self, host="localhost", port=9160, keyspace="ros"):
00028         Cassandra.__init__(self, host, port)
00029         
00030         self.cql_conn   = None
00031         self.cql_cursor = None        
00032     
00033     def addTopic(self, topic, cassandra_format=None,     # binary, json-string, ros...
00034                                msg_class=None,            # tfMessage, uint8, etc
00035                                msg_package=None,          # the package where the msg-format was defined
00036                                key_format=None,           # Timestamp, hash of msg or a part of the msg, like a sequence 
00037                                key_msg_part=None,         # if a part of the message was defined ... wich one
00038                                comment = "",              # add a comment to topic
00039                                date=None):
00040         
00041         if self.existTopic(topic):
00042             return False
00043         
00044         topic_md5 = self.topic2Hash(topic)
00045         
00046         #column_validation_classes={'data':pycassa.BYTES_TYPE,
00047         #                           'format':pycassa.UTF8_TYPE,
00048         #                           'header.frame_id':pycassa.UTF8_TYPE,
00049         #                           'header.seq':pycassa.INT_TYPE,
00050         #                           'header.stamp.nsecs':pycassa.INT_TYPE,
00051         #                           'header.stamp.secs':pycassa.INT_TYPE}
00052         
00053         casTopic = CassandraTopic(topic, None, None,
00054                                   cassandra_format,
00055                                   msg_class, msg_package,
00056                                   key_format, key_msg_part,
00057                                   comment, date)
00058         
00059         column_validation_classes = casTopic.getColumnValidationClasses()
00060                
00061         column = self.createColumn(topic_md5, column_validation_classes=column_validation_classes)
00062         
00063         meta = casTopic.getMeta()
00064         self.setTopicMeta(topic, meta)
00065         
00066     def topic2Hash(self, topic):
00067         return hashlib.md5(topic).hexdigest()
00068         
00069     def getTopic(self, topic):
00070         
00071         if not self.existTopic(topic):
00072             return False        
00073         
00074         topic_md5 = self.topic2Hash(topic)
00075         column = self.getColumn(topic_md5)
00076         
00077         meta = self.getTopicMeta(topic)
00078         
00079         return CassandraTopic(column=column, cursor=None, meta=meta)
00080     
00081     def removeTopic(self, topic):
00082         if self.existTopic(topic):
00083             topic_md5 = self.topic2Hash(topic)
00084             self.dropColumn(topic_md5)
00085         
00086     def getTopicMeta(self, topic=None, topic_md5=None):
00087         if topic:
00088             topic_md5 = self.topic2Hash(topic)
00089         try:
00090             meta = json.loads(self.getColumnComment(topic_md5))
00091         except:
00092             meta = {}
00093         
00094         return meta
00095     
00096     def setTopicMeta(self, topic, meta):
00097         topic_md5 = self.topic2Hash(topic)
00098         self.setColumnComment(topic_md5, json.dumps(meta))
00099         
00100     def existTopic(self, topic):
00101         topic_md5 = self.topic2Hash(topic)
00102         return self.existColumn(topic_md5)
00103     
00104     def getAllTopics(self):
00105         topics = []
00106         columns = self.getAllColumns()
00107 
00108         for col in columns:
00109             meta = self.getTopicMeta(topic_md5=col)
00110             if type(meta) == dict:
00111                 if meta.has_key('topic'):
00112                     topics.append(meta['topic'])
00113         
00114         return topics
00115      
00116     def renameTopic(self, topic, topic_new):
00117         topic_md5 = self.topic2Hash(topic)
00118         meta = self.getTopicMeta(topic_md5)
00119         meta['topic'] = topic_new
00120         
00121         pass
00122     
00123     
00124     def fileExport(self, topics, filename):
00125         pass
00126     
00127     def fileImport(self, filename, topics=None, format='ros', key='time', key_msg_part=None):
00128         print "read ...",
00129         bag = rosbag.Bag(filename)
00130         print "done"
00131         casTopic = {}
00132         i = 0
00133         for topic, msg, t in bag.read_messages(topics):
00134             if not casTopic.has_key(topic):
00135                 if not self.existTopic(topic):
00136                     
00137                     self.addTopic(topic,
00138                                   cassandra_format=format,
00139                                   msg_class = msg.__class__._type.split('/')[1],
00140                                   msg_package = msg.__class__._type.split('/')[0],
00141                                   key_format=key, 
00142                                   key_msg_part=key_msg_part,
00143                                   comment = "",
00144                                   date=None)
00145                     
00146                 casTopic[topic] = self.getTopic(topic)
00147 
00148             i += 1
00149             print i
00150             #time.sleep(5)
00151             if key == 'time':
00152                 casTopic[topic].addData(msg, str(t.to_nsec()))
00153             else:
00154                 casTopic[topic].addData(msg)
00155         
00156     
00157     def countTopicData(self, topic):
00158         try:
00159             topic_md5 = self.topic2Hash(topic)
00160             self.cursor.execute("select count(*) from '"+topic_md5+"'")
00161             return self.cursor.fetchone()[0]
00162         except:
00163             return 0
00164     
00165     def exequteCQL(self, query):
00166         # establish the cql connection
00167         if self.cql_cursor == None:
00168             self.cql_conn = cql.connect(self.host, self.port, self.keyspace)
00169             self.cql_cursor = self.cql_conn.cursor()
00170         
00171         topics = self.getAllTopics()
00172         
00173         # rename topics to column names ...
00174         for topic in topics:
00175             query = query.replace('"'+topic+'"', '"'+self.topic2Hash(topic)+'"')
00176         print query
00177         
00178         #return query
00179         # execute querie
00180         self.cql_cursor.execute(query)
00181         return self.cql_cursor.fetchall()
00182     
00183     def createIndex(self, topic, column):
00184         indexname = topic+column
00185         indexname = indexname.replace(".","_")
00186         column_validator = self.getTopic(topic).column.column_validators[column]
00187         
00188         self.sysManager.create_index(self.keyspace,
00189                                      self.topic2Hash(topic), 
00190                                      column, 
00191                                      column_validator,
00192                                      index_name=indexname)
00193 
00194     def removeIndex(self, topic, column):
00195         self.sysManager.drop_index(self.keyspace, self.topic2Hash(topic), column)


cassandra_ros
Author(s): André Dietrich, Sebastian Zug
autogenerated on Sun Jan 5 2014 11:10:29