00001
00002
00003 import roslib; roslib.load_manifest("cassandra_ros")
00004 import rospy
00005 import genpy
00006
00007 from CassandraTopic_ import *
00008 from CassandraTopic_binary import *
00009 from CassandraTopic_ros import *
00010 from CassandraTopic_string import *
00011 from CassandraTopic_yaml import *
00012
00013
00014 import rostopic
00015
00016 import std_msgs.msg
00017 import rosmsg
00018
00019 import pycassa
00020 import time
00021
00022 import re
00023
00024 import cql
00025
00026 KeyFormats = ['time', 'hash', 'msg_part', 'manual']
00027 CassandraFormats = ['binary', 'string', 'ros', 'ros_old', 'yaml', 'manual']
00028
00029 class CassandraTopicException(rospy.ROSException): pass
00030
00031 class CassandraTopic(CassandraTopic_):
00032
00033 def __init__(self, topic="",
00034 column=None,
00035 cursor=None,
00036 cassandra_format=CassandraFormats[0],
00037 msg_class='',
00038 msg_package='',
00039 key_format=KeyFormats[0],
00040 key_msg_part='',
00041 comment = "",
00042 date=None,
00043 meta=None):
00044
00045 if meta:
00046 self.setMeta(meta)
00047
00048 else:
00049 self.topic = topic
00050 self.cassandra_format = cassandra_format
00051
00052 self.msg_class = msg_class
00053 self.msg_package = msg_package
00054 self.key_format = key_format
00055 self.key_msg_part = key_msg_part
00056 self.comment = comment
00057 self.date = date
00058
00059 if not date:
00060 self.date = time.asctime()
00061
00062 self._generateMsg()
00063
00064 self.column = column
00065
00066
00067 self.__class__.__bases__ = (eval("CassandraTopic_"+self.cassandra_format) ,)
00068
00069
00070 self.cursor=cursor
00071
00072 def getMeta(self):
00073 meta = {}
00074
00075 meta['topic'] = self.topic
00076 meta['cassandra_format'] = self.cassandra_format
00077
00078 meta['msg_class'] = self.msg_class
00079 meta['msg_package'] = self.msg_package
00080 meta['key_format'] = self.key_format
00081 meta['key_msg_part'] = self.key_msg_part
00082 meta['comment'] = self.comment
00083 meta['date'] = self.date
00084
00085 return meta
00086
00087 def setMeta(self, meta):
00088 self.topic = meta['topic']
00089 self.cassandra_format = meta['cassandra_format']
00090
00091 self.msg_class = meta['msg_class']
00092 self.msg_package = meta['msg_package']
00093 self.key_format = meta['key_format']
00094 self.key_msg_part = meta['key_msg_part']
00095 self.comment = meta['comment']
00096 self.date = meta['date']
00097
00098 self._generateMsg()
00099
00100 def _generateMsg(self):
00101 if (len(self.msg_class)==0 or len(self.msg_package))==0 and len(self.topic)>0:
00102 msg_class, _, _ = rostopic.get_topic_class(self.topic, blocking=False)
00103 self.msg_package = msg_class.__module__.split(".")[0]
00104 self.msg_class = msg_class.__name__
00105
00106 self.MsgClass = genpy.message.get_message_class(self.msg_package+"/"+self.msg_class)
00107
00108 if not self.MsgClass:
00109 roslib.launcher.load_manifest(self.msg_package)
00110 self.MsgClass = genpy.message.get_message_class(self.msg_package+"/"+self.msg_class)
00111 if not self.MsgClass:
00112 raise CassandraTopicException("failed to initialize ros msg of type: "+self.msg_package+"/"+self.msg_class)
00113
00114 def addData(self, msg, key=None, ttl=None):
00115
00116 if isinstance(msg, list):
00117
00118 if isinstance(key, list):
00119 for i in len(msg):
00120 self.column.insert(key[i], self.encode(msg[i]), ttl=ttl)
00121 else:
00122 for i in len(msg):
00123 self.column.insert(self.createKey(msg[i]), self.encode(msg[i]), ttl=ttl)
00124 else:
00125 if not key:
00126 key = self.createKey(msg)
00127
00128 self.column.insert(key, self.encode(msg), ttl=ttl)
00129
00130 def getData(self, key, to_key=None, queue=100):
00131 if isinstance(key, list):
00132 data = []
00133 for k in key:
00134 data.append(self.getData(k))
00135
00136 elif to_key:
00137 data = []
00138 values = self.column.get_range(start=key, finish=to_key, row_count=queue, include_timestamp=False)
00139
00140 for value in values:
00141 key, value = value
00142 data.append((key, self.decode(value)))
00143
00144 else:
00145 value = self.column.get(key, include_timestamp=False)
00146 msg = self.decode(value)
00147 data = (key,msg)
00148
00149 return data
00150
00151 def getAllData(self, queue=100):
00152 data = []
00153 iter_ = self.column.get_range(start='', row_count=queue, include_timestamp=False)
00154 for element in iter_:
00155 key, value = element
00156 data.append((key, self.decode(value)))
00157 return data
00158
00159 def removeData(self, key, to_key=None, queue=100):
00160
00161 if isinstance(key, list):
00162 for k in key:
00163 self.column.remove(k)
00164
00165
00166 elif to_key:
00167 while True:
00168 values = list(self.column.get_range(start=key, finish=to_key, row_count=100))
00169
00170 for value in values:
00171 _key, _ = value
00172 self.column.remove(_key)
00173
00174 if len(values) < queue:
00175 break
00176
00177
00178 else:
00179 self.column.remove(key)
00180
00181 def removeAllData(self):
00182 while True:
00183 values = list(self.column.get_range(start='', row_count=100))
00184
00185 for value in values:
00186 _key, _ = value
00187 self.column.remove(_key)
00188
00189 if len(values) == 0:
00190 break
00191
00192 def createKey(self, msg):
00193 if self.key_format == KeyFormats[0]:
00194 _time = rospy.Time.now()
00195 return str(_time.to_nsec())
00196 elif self.key_format == KeyFormats[1]:
00197 pass
00198 elif self.key_format == KeyFormats[2]:
00199 return str(eval(self.key_msg_part))
00200 else:
00201 raise CassandraTopicException("unknown keyformat: "+str(self.key_format))
00202
00203 def countData(self):
00204 self.cursor.execute("select count(*) from '"+self.column.column_family+"'")
00205 return self.cursor.fetchone()[0]
00206