00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 import roslib; roslib.load_manifest('warehouse')
00030 import rospy
00031 import warehouse.srv as whs
00032 from warehouse.msg import *
00033 from warehouse.exceptions import *
00034 import threading
00035
00036 class ClientCollectionHandler:
00037 """
00038 A handler for client connections to a collection in the warehouse.
00039 """
00040
00041 def __init__(self, db_name, collection_name, msg, indexed_fields):
00042 """
00043 Constructor
00044 """
00045 self.db_name = db_name
00046 self.collection_name = collection_name
00047
00048 self.pkg_type_name = msg._type[:msg._type.find("/")]
00049 self.msg_type_name = msg._type[ msg._type.find("/")+1:]
00050
00051 self.push_condition = threading.Condition()
00052 self.query_condition = threading.Condition()
00053
00054 self.notify_cb = None
00055
00056
00057 try:
00058 roslib.load_manifest(self.pkg_type_name)
00059 m = __import__(self.pkg_type_name+".msg")
00060 mod = getattr(m,'msg')
00061 except roslib.packages.InvalidROSPkgException:
00062 rospy.logerr( 'package {0} not found'.format(self.pkg_type_name) )
00063 except ImportError:
00064 rospy.logerr( 'no messages found in {0}'.format(self.pkg_type_name) )
00065
00066
00067 try:
00068 self.msg_type = getattr(mod, self.msg_type_name)
00069 except AttributeError:
00070 rospy.logerr( 'message type {0} not found in {1}'.format(self.msg_type_name, self.pkg_type_name) )
00071
00072
00073 rospy.wait_for_service('setup_collection')
00074 create_srv = rospy.ServiceProxy('setup_collection', whs.SetupCollection)
00075
00076
00077 rospy.wait_for_service('initiate_query')
00078 self.query_srv = rospy.ServiceProxy('initiate_query', whs.InitiateQuery)
00079
00080
00081 rospy.wait_for_service('pull_message')
00082 self.pull_srv = rospy.ServiceProxy('pull_message', whs.PullMessage)
00083
00084 rospy.wait_for_service('close_collection')
00085 self.close_collection_srv = rospy.ServiceProxy('close_collection', whs.CloseCollection)
00086
00087
00088 resp = create_srv(db_name=self.db_name, collection_name=self.collection_name,
00089 msg_pkg=self.pkg_type_name, msg_type=self.msg_type_name,
00090 indexed_fields=indexed_fields)
00091
00092 if resp.error_code == whs.SetupCollectionResponse.PACKAGE_NOT_FOUND:
00093 raise ClientException("Package not found: {0}".format(resp.error_msg))
00094 elif resp.error_code == whs.SetupCollectionResponse.MESSAGE_NOT_FOUND_IN_PACKAGE:
00095 raise ClientException("Message not found: {0}".format(resp.error_msg))
00096 elif resp.error_code == whs.SetupCollectionResponse.INVALID_INDEXED_FIELD:
00097 raise ClientException("Invalid indexed field: {0}".format(resp.error_msg))
00098 elif resp.error_code != whs.SetupCollectionResponse.SUCCESS:
00099 raise ClientException("Backend error: {0}".format(resp.error_msg))
00100 else:
00101
00102 insert_topic_name = "warehouse/%s/%s/insert" % (self.db_name, self.collection_name)
00103 notify_topic_name = "warehouse/%s/%s/notify" % (self.db_name, self.collection_name)
00104
00105 self.publisher = rospy.Publisher(insert_topic_name, self.msg_type, latch=True)
00106
00107 self.subscriber = rospy.Subscriber(notify_topic_name, UpdateNotification, self._handle_notify)
00108
00109 def close(self):
00110 if self.subscriber != None and self.publisher != None :
00111 self.subscriber.unregister()
00112 self.close_collection_srv(db_name=self.db_name, collection_name=self.collection_name)
00113 self.publisher.unregister()
00114 self.subscriber = None
00115 self.publisher = None
00116
00117 def push_data(self, msg):
00118 """
00119 Push data into a collection
00120 @param msg : The new data to insert into the database
00121 """
00122
00123 self.publisher.publish(msg)
00124 self.push_condition.acquire()
00125
00126
00127 if self.push_condition.wait(10.0):
00128 rospy.logerr("Timeout reached while waiting for Warehouse response to a push data")
00129 self.push_condition.release()
00130
00131
00132 def subscribe_to_insertion(self, cb):
00133 """
00134 Register a function callback that is triggered whenever new data is inserted into the collection
00135 @param cb: The callback
00136 """
00137 self.notify_cb = cb
00138
00139
00140 def query(self, conditions):
00141 """
00142 Query a collection for data
00143 @param conditions : Conditions for the query
00144 @return a list of matching entries
00145 """
00146 result = []
00147
00148
00149 init_resp = self.query_srv(db_name= self.db_name, collection_name=self.collection_name, query=conditions, metadata_only=False )
00150
00151
00152 topic = "warehouse/%s/%s/%s" % (self.db_name, self.collection_name, init_resp.query_id)
00153 query_sub = rospy.Subscriber(topic, self.msg_type, self._handle_pull, result)
00154
00155
00156
00157 while query_sub.get_num_connections() == 0:
00158 rospy.sleep(0.1)
00159
00160
00161 done = False
00162 while not done:
00163 self.query_condition.acquire()
00164 pull_resp = self.pull_srv( db_name=self.db_name, collection_name=self.collection_name, query_id=init_resp.query_id )
00165 done = pull_resp.num_messages_sent == 0
00166 if not done:
00167 self.query_condition.wait(10.0)
00168 self.query_condition.release()
00169
00170 query_sub.unregister()
00171 return result
00172
00173 def _handle_notify(self, msg):
00174 """
00175 Handles data insertion notifications.
00176 """
00177 self.push_condition.acquire()
00178 rospy.loginfo("Data inserted into collection[{0}:{1}]".format(self.db_name, self.collection_name) )
00179 self.push_condition.notify()
00180 self.push_condition.release()
00181 if self.notify_cb != None:
00182 self.notify_cb( msg )
00183
00184
00185 def _handle_pull(self, msg, result):
00186 """
00187 Handle a pull from a query. This funciton is only used internally
00188 """
00189 self.query_condition.acquire()
00190 result.append(msg)
00191 self.query_condition.notify()
00192 self.query_condition.release()
00193
00194
00195
00196
00197
00198
00199 class Client:
00200 """
00201 The warehouse library interface.
00202 """
00203
00204 def __init__(self):
00205 """
00206 Initialize the warehouse library
00207 """
00208 self.collections = {}
00209 rospy.wait_for_service('list_collections')
00210 self.list_collections_srv = rospy.ServiceProxy('list_collections', whs.ListCollections)
00211
00212 rospy.wait_for_service('drop_db')
00213 self.drop_db_srv = rospy.ServiceProxy('drop_db', whs.DropDB)
00214
00215 def __del__(self):
00216 for k,v in self.collections.items():
00217 for c,h in v.items():
00218 h.close()
00219
00220 def has_collection(self, db_name, collection_name):
00221 """
00222 Checks for the existence of a collection in a database.
00223 @param db_name : Name of a database
00224 @param collection_name : Name of a collection
00225 """
00226 resp = self.list_collections_srv(db_name=db_name)
00227 return (resp.db_exists, collection_name in resp.collections)
00228
00229 def drop_db(self, db_name):
00230 """
00231 Drop a database. This will invalidate all connections to the database
00232 @param db_name : Name of the databse to delete
00233 """
00234
00235 for k,v in self.collections[db_name].items():
00236 v.close()
00237 del self.collections[db_name][k]
00238 del self.collections[db_name]
00239
00240 self.drop_db_srv(db_name=db_name)
00241 return True
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253 def setup_collection(self, db_name, collection_name, msg, indexed_fields):
00254 """
00255 Create a new collection in the specified database. The collection will contain documents of type <pkg_type>.<msg_type>
00256 @param db_name : Name of the database
00257 @param collection_name : Name of the collection
00258 @param msg : Message type
00259 @param indexed_fields: Fields on which to index the data. Each element must be a nested field specifier such as 'pose.position.x'
00260 @return A instance of connection to the specified collection
00261 """
00262
00263 if not db_name in self.collections:
00264 self.collections[db_name] = {}
00265
00266 if not collection_name in self.collections[db_name]:
00267 self.collections[db_name][collection_name] = ClientCollectionHandler(db_name, collection_name, msg, indexed_fields)
00268
00269 return self.collections[db_name][collection_name]