00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 import os
00018 import json
00019 import time
00020 import thread
00021 import signal
00022 import urllib2
00023
00024 from include.constants import *
00025 from include.logger import Log
00026 from include.pubsub.iPubSub import Isubscriber
00027 from include.pubsub.contextbroker.ngsi9 import registerContext, deleteContext, deleteAllContexts, refreshAllContexts
00028
00029
class CbSubscriber(Isubscriber):
    """Subscriber that talks to a context broker over its NGSI10 HTTP API.

    Each subscription is keyed by namespace.  A background thread started on
    the first ``subscribe`` call periodically re-sends every known
    subscription so that it does not expire.
    """

    # namespace -> {"namespace", "data_type", "topics", "id"}.
    # NOTE(review): this is a class attribute, so the dict is shared by every
    # instance of the class -- confirm that sharing is intended.
    subscriptions = {}
    # Return value of thread.start_new_thread once the refresh loop is running.
    refresh_thread = None

    def subscribe(self, namespace, data_type, robot):
        """Register ``namespace`` and subscribe to its changes on the broker.

        Does nothing when ``namespace`` is already in ``self.subscriptions``.

        :param namespace: entity id to subscribe to.
        :param data_type: entity type sent to the broker.
        :param robot: mapping whose ``"publisher"`` entry is a mapping; its
            keys are used as the topics (condition values) of the
            subscription.

        When the broker answers with ``subscribeError`` or ``orionError`` the
        error is logged and SIGINT is sent to the current process.
        """
        if namespace not in self.subscriptions:
            registerContext(namespace, data_type, robot)
            topics = list(robot["publisher"].keys())
            Log("INFO", "Subscribing on context broker to " + data_type + " " + namespace + " and topics: " + str(topics))
            subscription = {
                "namespace": namespace,
                "data_type": data_type,
                "topics": topics
            }
            url = "http://{}:{}/NGSI10/subscribeContext".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
            subscriber_json = json.dumps(self._generateSubscription(namespace, data_type, topics))
            response_body = self._sendRequest(url, subscriber_json)
            if response_body is not None:
                if "subscribeError" in response_body:
                    Log("ERROR", "Error Subscribing to Context Broker:")
                    Log("ERROR", response_body["subscribeError"]["errorCode"]["details"])
                    os.kill(os.getpid(), signal.SIGINT)
                elif "orionError" in response_body:
                    # Same failure path as subscribeError; previously this
                    # reply shape raised KeyError on "subscribeResponse".
                    Log("ERROR", "Error Subscribing to Context Broker:")
                    Log("ERROR", response_body["orionError"]["details"])
                    os.kill(os.getpid(), signal.SIGINT)
                else:
                    subscription["id"] = response_body["subscribeResponse"]["subscriptionId"]
                    self.subscriptions[namespace] = subscription
                    Log("INFO", "Connected to Context Broker with id {}".format(subscription["id"]))
            # Start the periodic refresh loop only once.
            if self.refresh_thread is None:
                self.refresh_thread = thread.start_new_thread(self._refreshSubscriptions, ("CBSub-Refresh", 2, ))

    def disconnect(self, namespace, delete=False):
        """Unsubscribe ``namespace`` from the broker and delete its entity.

        Does nothing when ``namespace`` has no recorded subscription.

        :param namespace: key of the subscription to tear down.
        :param delete: when true, also remove the record from
            ``self.subscriptions``.
        """
        if namespace in self.subscriptions:
            # NOTE(review): deleteEntity() below calls deleteContext() again
            # with the same arguments -- confirm the double call is harmless.
            deleteContext(namespace, True)
            subscription = self.subscriptions[namespace]
            subscriptionId = subscription["id"]
            Log("INFO", "\nDisconnecting Context Broker subscription {}".format(subscriptionId))
            url = "http://{}:{}/NGSI10/unsubscribeContext".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
            disconnect_json = json.dumps({
                "subscriptionId": subscriptionId
            })
            response_body = self._sendRequest(url, disconnect_json)
            if response_body is not None:
                if "orionError" in response_body:
                    # Such a reply has no "statusCode"; report it instead of
                    # raising KeyError so the entity is still deleted below.
                    Log("ERROR", "Error Disconnecting from Context Broker (subscription: {}):".format(subscriptionId))
                    Log("ERROR", response_body["orionError"]["details"])
                    Log("INFO", "\n")
                elif int(response_body["statusCode"]["code"]) >= 400:
                    Log("ERROR", "Error Disconnecting from Context Broker (subscription: {}):".format(subscriptionId))
                    Log("ERROR", response_body["statusCode"]["reasonPhrase"])
                    Log("INFO", "\n")
                else:
                    Log("INFO", "Disconnected subscription {} from Context Broker ".format(subscriptionId))

            Log("INFO", "Deleting entity")
            self.deleteEntity(subscription["namespace"], subscription["data_type"])
            Log("INFO", "\n")
            if delete:
                del self.subscriptions[namespace]

    def disconnectAll(self):
        """Delete all contexts and disconnect every recorded subscription.

        Records are kept in ``self.subscriptions`` because ``disconnect`` is
        called with its default ``delete=False``.
        """
        deleteAllContexts()
        # Iterate a snapshot of the keys so the loop stays valid even if
        # disconnect() is ever made to remove entries.
        for namespace in list(self.subscriptions):
            self.disconnect(namespace)

    def refreshSubscriptions(self):
        """Refresh all contexts and re-send every recorded subscription.

        Each subscription is sent with HTTP PUT to its
        ``contextSubscriptions/<id>`` URL, without the ``entities`` and
        ``reference`` fields.  Errors reported by the broker are logged.
        """
        refreshAllContexts()
        for subscription in self.subscriptions.values():
            subscriber_dict = self._generateSubscription(subscription["namespace"], subscription["data_type"], subscription["topics"], subscription["id"])
            # The update payload is sent without these two creation fields.
            subscriber_dict.pop("entities", None)
            subscriber_dict.pop("reference", None)
            url = "http://{}:{}/NGSI10/contextSubscriptions/{}".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"], subscription["id"])
            subscriber_json = json.dumps(subscriber_dict)
            response_body = self._sendRequest(url, subscriber_json, 'PUT')
            if response_body is not None:
                if "subscribeError" in response_body:
                    Log("ERROR", "Error Refreshing subscription")
                    Log("ERROR", response_body["subscribeError"]["errorCode"]["details"])
                elif "orionError" in response_body:
                    Log("ERROR", "Error Refreshing subscription")
                    Log("ERROR", response_body["orionError"]["details"])
                else:
                    Log("INFO", "Refreshed Connection to Context Broker with id {}".format(subscription["id"]))

    def parseData(self, data):
        """Decode ``data`` as JSON after replacing SEPARATOR_CHAR with ``"``.

        :param data: string received from the broker.
        :return: the object produced by ``json.loads``.
        """
        return json.loads(data.replace(SEPARATOR_CHAR, '"'))

    def deleteEntity(self, namespace, data_type, removeContext=True):
        """Ask the broker to delete the entity ``namespace`` of ``data_type``.

        :param namespace: id of the entity to delete.
        :param data_type: type of the entity to delete.
        :param removeContext: when true, ``deleteContext(namespace, True)``
            is called afterwards, whatever the broker answered.
        """
        # NOTE(review): every other call in this class passes Log exactly two
        # arguments -- confirm that Log accepts the extra positional values.
        Log("INFO", "DELETING: ", namespace, data_type)
        operation_json = json.dumps({
            "contextElements": [
                {
                    "type": data_type,
                    "isPattern": "false",
                    "id": namespace
                }
            ],
            "updateAction": "DELETE"
        })
        url = "http://{}:{}/NGSI10/updateContext".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
        response_body = self._sendRequest(url, operation_json)
        if response_body is not None:
            if "errorCode" in response_body:
                Log("ERROR", "Error deleting entity")
                Log("ERROR", response_body["errorCode"]["details"])
            elif "orionError" in response_body:
                Log("ERROR", "Error deleting entity")
                Log("ERROR", response_body["orionError"]["details"])
            else:
                Log("INFO", "Deleted entity " + namespace)

        if removeContext:
            deleteContext(namespace, True)

    def _generateSubscription(self, namespace, data_type=DEFAULT_CONTEXT_TYPE, topics=None, subscriptionId=None):
        """Build the subscription payload for one entity.

        :param namespace: entity id.
        :param data_type: entity type.
        :param topics: condition values of the ONCHANGE notify condition;
            ``None`` means an empty list.
        :param subscriptionId: when given, added to the payload as a string
            under ``"subscriptionId"``.
        :return: a JSON-serialisable dict.
        """
        # A fresh list per call: a shared default list would leak into every
        # payload built without explicit topics.
        if topics is None:
            topics = []
        data = {
            "entities": [
                {
                    "type": data_type,
                    "isPattern": "false",
                    "id": namespace
                }
            ],
            # Callback URL the broker is given for notifications.
            "reference": "http://{}:{}/firos".format(IP, SERVER_PORT),
            "duration": SUBSCRIPTION_LENGTH,
            "notifyConditions": [
                {
                    "type": "ONCHANGE",
                    "condValues": topics
                }
            ],
            "throttling": THROTTLING
        }
        if subscriptionId is not None:
            data["subscriptionId"] = str(subscriptionId)
        return data

    def _refreshSubscriptions(self, threadName, delay):
        """Thread target: sleep, refresh all subscriptions, repeat forever.

        :param threadName: unused.
        :param delay: unused; the interval comes from
            SUBSCRIPTION_REFRESH_DELAY.
        """
        # presumably SUBSCRIPTION_REFRESH_DELAY is expressed in days (it is
        # multiplied by the number of seconds in a day) -- TODO confirm
        total_delay = SUBSCRIPTION_REFRESH_DELAY * 60 * 60 * 24
        while True:
            time.sleep(total_delay)
            self.refreshSubscriptions()

    def _sendRequest(self, url, data, method=None):
        """Send ``data`` as a JSON request and return the decoded JSON reply.

        :param url: target URL.
        :param data: request body, already serialised.
        :param method: HTTP verb to force (e.g. ``'PUT'``); ``None`` keeps
            urllib2's default for a request that carries a body.
        :return: the decoded response body, or ``None`` when the request or
            the decoding failed (the failure is logged).
        """
        try:
            request = urllib2.Request(url, data, {'Content-Type': 'application/json', 'Accept': 'application/json'})
            if method is not None:
                request.get_method = lambda: method
            response = urllib2.urlopen(request)
            try:
                return json.loads(response.read())
            finally:
                # Close the connection even when the body is not valid JSON.
                response.close()
        except Exception as ex:
            # Only urllib2 errors carry ``reason``; fall back to the message
            # so that e.g. a JSON decoding error is logged instead of raising
            # AttributeError from inside this handler.
            reason = getattr(ex, "reason", None)
            Log("ERROR", reason if reason is not None else str(ex))
            return None