CassandraTopic.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 import roslib; roslib.load_manifest("cassandra_ros")
00004 import rospy
00005 import genpy
00006 
00007 from CassandraTopic_        import *
00008 from CassandraTopic_binary  import *
00009 from CassandraTopic_ros     import *
00010 from CassandraTopic_string  import *
00011 from CassandraTopic_yaml    import *
00012 
00013 
00014 import rostopic
00015 
00016 import std_msgs.msg
00017 import rosmsg
00018 
00019 import pycassa
00020 import time
00021 
00022 import re
00023 
00024 import cql
00025 
00026 KeyFormats = ['time', 'hash', 'msg_part', 'manual']
00027 CassandraFormats = ['binary', 'string', 'ros', 'ros_old', 'yaml', 'manual']
00028 
00029 class CassandraTopicException(rospy.ROSException): pass
00030 
00031 class CassandraTopic(CassandraTopic_): #, CassandraTopic_ros, CassandraTopic_string):
00032     
00033     def __init__(self, topic="",
00034                  column=None,               # pycassa Column
00035                  cursor=None,               # cql cursor
00036                  cassandra_format=CassandraFormats[0],     # binary, string, ros...
00037                  msg_class='',            # tfMessage, uint8, etc
00038                  msg_package='',          # the package where the msg-format was defined
00039                  key_format=KeyFormats[0],  # Timestamp, hash of msg or a part of the msg, like a sequence 
00040                  key_msg_part='',         # if a part of the message was defined ... wich one
00041                  comment = "",              # add a comment to topic
00042                  date=None,
00043                  meta=None):                # add a date ...
00044         
00045         if meta:
00046             self.setMeta(meta)
00047             
00048         else:
00049             self.topic              = topic
00050             self.cassandra_format   = cassandra_format
00051         
00052             self.msg_class          = msg_class
00053             self.msg_package        = msg_package
00054             self.key_format         = key_format
00055             self.key_msg_part       = key_msg_part
00056             self.comment            = comment
00057             self.date               = date
00058           
00059             if not date:
00060                 self.date = time.asctime()
00061             
00062             self._generateMsg()
00063         
00064         self.column = column
00065         
00066         # change the baseclass verry importate ,)
00067         self.__class__.__bases__ = (eval("CassandraTopic_"+self.cassandra_format) ,)
00068         #CassandraTopic_.__init__(self, self.MsgClass)
00069         
00070         self.cursor=cursor
00071             
00072     def getMeta(self):
00073         meta = {}
00074         
00075         meta['topic']               = self.topic
00076         meta['cassandra_format']    = self.cassandra_format
00077         
00078         meta['msg_class']           = self.msg_class
00079         meta['msg_package']         = self.msg_package
00080         meta['key_format']          = self.key_format
00081         meta['key_msg_part']        = self.key_msg_part
00082         meta['comment']             = self.comment
00083         meta['date']                = self.date
00084         
00085         return meta
00086     
00087     def setMeta(self, meta):
00088         self.topic              = meta['topic']
00089         self.cassandra_format   = meta['cassandra_format']
00090         
00091         self.msg_class          = meta['msg_class']
00092         self.msg_package        = meta['msg_package']
00093         self.key_format         = meta['key_format']
00094         self.key_msg_part       = meta['key_msg_part']
00095         self.comment            = meta['comment']
00096         self.date               = meta['date']
00097         
00098         self._generateMsg()
00099         
00100     def _generateMsg(self):
00101         if (len(self.msg_class)==0 or len(self.msg_package))==0 and len(self.topic)>0:
00102             msg_class, _, _ = rostopic.get_topic_class(self.topic, blocking=False)
00103             self.msg_package = msg_class.__module__.split(".")[0]
00104             self.msg_class   = msg_class.__name__
00105         
00106         self.MsgClass           = genpy.message.get_message_class(self.msg_package+"/"+self.msg_class)
00107         
00108         if not self.MsgClass:
00109             roslib.launcher.load_manifest(self.msg_package)
00110             self.MsgClass = genpy.message.get_message_class(self.msg_package+"/"+self.msg_class)
00111             if not self.MsgClass:
00112                 raise CassandraTopicException("failed to initialize ros msg of type: "+self.msg_package+"/"+self.msg_class)
00113     
00114     def addData(self, msg, key=None, ttl=None):
00115         # add a list of messages
00116         if isinstance(msg, list):
00117             # keys are already defined
00118             if isinstance(key, list):
00119                 for i in len(msg):
00120                     self.column.insert(key[i], self.encode(msg[i]), ttl=ttl)
00121             else:
00122                 for i in len(msg):
00123                     self.column.insert(self.createKey(msg[i]), self.encode(msg[i]), ttl=ttl)
00124         else:
00125             if not key:
00126                 key = self.createKey(msg)
00127                 
00128             self.column.insert(key, self.encode(msg), ttl=ttl) 
00129     
00130     def getData(self, key, to_key=None, queue=100):
00131         if isinstance(key, list):
00132             data = []
00133             for k in key:
00134                 data.append(self.getData(k))
00135         # a range of keys is defined
00136         elif to_key:
00137             data = []
00138             values = self.column.get_range(start=key, finish=to_key, row_count=queue, include_timestamp=False)
00139             
00140             for value in values:
00141                 key, value = value
00142                 data.append((key, self.decode(value)))
00143         #return a single value
00144         else:
00145             value = self.column.get(key, include_timestamp=False)
00146             msg = self.decode(value)
00147             data = (key,msg)
00148             
00149         return data
00150     
00151     def getAllData(self, queue=100):
00152         data = []
00153         iter_ = self.column.get_range(start='', row_count=queue, include_timestamp=False)
00154         for element in iter_:
00155             key, value = element
00156             data.append((key, self.decode(value)))
00157         return data
00158         
00159     def removeData(self, key, to_key=None, queue=100):
00160         # remove a list of keys
00161         if isinstance(key, list):
00162             for k in key:
00163                 self.column.remove(k)
00164         
00165         # remove a range of values
00166         elif to_key:
00167             while True:
00168                 values = list(self.column.get_range(start=key, finish=to_key, row_count=100))
00169                 
00170                 for value in values:
00171                     _key, _ = value
00172                     self.column.remove(_key)
00173                 
00174                 if len(values) < queue:
00175                     break
00176                 
00177         # remove a single value
00178         else:
00179             self.column.remove(key) 
00180             
00181     def removeAllData(self):
00182         while True:
00183             values = list(self.column.get_range(start='', row_count=100))
00184             
00185             for value in values:
00186                 _key, _ = value
00187                 self.column.remove(_key)
00188                 
00189             if len(values) == 0:
00190                 break
00191     
00192     def createKey(self, msg):
00193         if self.key_format == KeyFormats[0]: # time
00194             _time = rospy.Time.now()
00195             return str(_time.to_nsec())
00196         elif self.key_format == KeyFormats[1]: # hash
00197             pass
00198         elif self.key_format == KeyFormats[2]: # msg_part
00199             return str(eval(self.key_msg_part))
00200         else:
00201             raise CassandraTopicException("unknown keyformat: "+str(self.key_format))
00202         
00203     def countData(self):
00204         self.cursor.execute("select count(*) from '"+self.column.column_family+"'")
00205         return self.cursor.fetchone()[0]
00206         


cassandra_ros
Author(s):
autogenerated on Thu Jun 6 2019 21:09:27