cbSubscriber.py
Go to the documentation of this file.
00001 # MIT License
00002 #
00003 # Copyright (c) <2015> <Ikergune, Etxetar>
00004 #
00005 # Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files
00006 # (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge,
00007 # publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so,
00008 # subject to the following conditions:
00009 #
00010 # The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
00011 #
00012 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
00013 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
00014 # FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
00015 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
00016 
00017 import os
00018 import json
00019 import time
00020 import thread
00021 import signal
00022 import urllib2
00023 
00024 from include.constants import *
00025 from include.logger import Log
00026 from include.pubsub.iPubSub import Isubscriber
00027 from include.pubsub.contextbroker.ngsi9 import registerContext, deleteContext, deleteAllContexts, refreshAllContexts
00028 
00029 
00030 class CbSubscriber(Isubscriber):
00031     ## \brief Context broker subscription handler
00032     subscriptions = {}
00033     refresh_thread = None
00034 
00035     def subscribe(self, namespace, data_type, robot):
00036         ## \brief Subscribe to entities' changes
00037         # \param entity name
00038         # \param entity type
00039         # \param robot object
00040         if namespace not in self.subscriptions:
00041             registerContext(namespace, data_type, robot)
00042             topics = robot["publisher"].keys()
00043             Log("INFO", "Subscribing on context broker to " + data_type + " " + namespace + " and topics: " + str(topics))
00044             subscription = {
00045                 "namespace": namespace,
00046                 "data_type": data_type,
00047                 "topics": topics
00048             }
00049             url = "http://{}:{}/NGSI10/subscribeContext".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
00050             subscriber_json = json.dumps(self._generateSubscription(namespace, data_type, topics))
00051             response_body = self._sendRequest(url, subscriber_json)
00052             if response_body is not None:
00053                 if "subscribeError" in response_body:
00054                     Log("ERROR", "Error Subscribing to Context Broker:")
00055                     Log("ERROR", response_body["subscribeError"]["errorCode"]["details"])
00056                     os.kill(os.getpid(), signal.SIGINT)
00057                 else:
00058                     subscription["id"] = response_body["subscribeResponse"]["subscriptionId"]
00059                     self.subscriptions[namespace] = subscription
00060                     Log("INFO", "Connected to Context Broker with id {}".format(subscription["id"]))
00061             if self.refresh_thread is None:
00062                 self.refresh_thread = thread.start_new_thread(self._refreshSubscriptions, ("CBSub-Refresh", 2, ))
00063 
00064     def disconnect(self, namespace, delete=False):
00065         ## \brief Delete subscription by namespace
00066         # \param entity name
00067         # \param flag to indicate if the subscription must be deleted locally (False by default)
00068         if namespace in self.subscriptions:
00069             deleteContext(namespace, True)
00070             subscription = self.subscriptions[namespace]
00071             subscriptionId = subscription["id"]
00072             Log("INFO", "\nDisconnecting Context Broker subscription {}".format(subscriptionId))
00073             url = "http://{}:{}/NGSI10/unsubscribeContext".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
00074             disconnect_json = json.dumps({
00075                 "subscriptionId": subscriptionId
00076             })
00077             response_body = self._sendRequest(url, disconnect_json)
00078             if response_body is not None:
00079                 if int(response_body["statusCode"]["code"]) >= 400:
00080                     Log("ERROR", "Error Disconnecting from Context Broker (subscription: {}):".format(subscriptionId))
00081                     Log("ERROR", response_body["statusCode"]["reasonPhrase"])
00082                     Log("INFO", "\n")
00083                 else:
00084                     Log("INFO", "Disconnected subscription {} from Context Broker ".format(subscriptionId))
00085 
00086             Log("INFO", "Deleting entity")
00087             self.deleteEntity(subscription["namespace"], subscription["data_type"])
00088             Log("INFO", "\n")
00089             if delete:
00090                 del self.subscriptions[namespace]
00091 
00092     def disconnectAll(self):
00093         ## \brief Delete all subscriptions
00094         deleteAllContexts()
00095         for subscription in self.subscriptions:
00096             self.disconnect(subscription)
00097 
00098     def refreshSubscriptions(self):
00099         ## \brief Refresh exisiting subscriptions on context broker
00100         refreshAllContexts()
00101         for subscription in self.subscriptions.values():
00102             subscriber_dict = self._generateSubscription(subscription["namespace"], subscription["data_type"], subscription["topics"], subscription["id"])
00103             subscriber_dict.pop("entities", None)
00104             subscriber_dict.pop("reference", None)
00105             url = "http://{}:{}/NGSI10/contextSubscriptions/{}".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"], subscription["id"])
00106             subscriber_json = json.dumps(subscriber_dict)
00107             response_body = self._sendRequest(url, subscriber_json, 'PUT')
00108             if response_body is not None:
00109                 if "subscribeError" in response_body:
00110                     Log("ERROR", "Error Refreshing subscription")
00111                     Log("ERROR", response_body["subscribeError"]["errorCode"]["details"])
00112                 elif "orionError" in response_body:
00113                     Log("ERROR", "Error Refreshing subscription")
00114                     Log("ERROR", response_body["orionError"]["details"])
00115                 else:
00116                     Log("INFO", "Refreshed Connection to Context Broker with id {}".format(subscription["id"]))
00117 
00118     def parseData(self, data):
00119         ## \brief Parse the received data
00120         # \param data
00121         # print data
00122         return json.loads(data.replace(SEPARATOR_CHAR, '"'))
00123 
00124     def deleteEntity(self, namespace, data_type, removeContext=True):
00125         ## \brief Delete entity from context broker
00126         # \param entity name
00127         # \param entity type
00128         Log("INFO", "DELETING: ", namespace, data_type)
00129         operation_json = json.dumps({
00130             "contextElements": [
00131                 {
00132                     "type": data_type,
00133                     "isPattern": "false",
00134                     "id": namespace
00135                 }
00136             ],
00137             "updateAction": "DELETE"
00138         })
00139         url = "http://{}:{}/NGSI10/updateContext".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
00140         response_body = self._sendRequest(url, operation_json)
00141         if response_body is not None:
00142             if "errorCode" in response_body:
00143                 Log("ERROR", "Error deleting entity")
00144                 Log("ERROR", response_body["errorCode"]["details"])
00145             elif "orionError" in response_body:
00146                 Log("ERROR", "Error deleting entity")
00147                 Log("ERROR", response_body["orionError"]["details"])
00148             else:
00149                 Log("INFO", "Deleted entity " + namespace)
00150 
00151         if removeContext:
00152             deleteContext(namespace, True)
00153 
00154     def _generateSubscription(self, namespace, data_type=DEFAULT_CONTEXT_TYPE, topics=[], subscriptionId=None):
00155         ## \brief Generate subscription message
00156         # \param entity name
00157         # \param entity type
00158         # \param entity's topics
00159         data = {
00160             "entities": [
00161                 {
00162                     "type": data_type,
00163                     "isPattern": "false",
00164                     "id": namespace
00165                 }
00166             ],
00167             # "attributes": topics,
00168             "reference": "http://{}:{}/firos".format(IP, SERVER_PORT),
00169             "duration": SUBSCRIPTION_LENGTH,
00170             "notifyConditions": [
00171                 {
00172                     "type": "ONCHANGE",
00173                     "condValues": topics
00174                 }
00175             ],
00176             "throttling": THROTTLING
00177         }
00178         if subscriptionId is not None:
00179             data["subscriptionId"] = str(subscriptionId)
00180         return data
00181 
00182     def _refreshSubscriptions(self, threadName, delay):
00183         ## \brief Thread handler for subscripiton refresh
00184         # \param theradname
00185         # \param delay time
00186 
00187         # Seconds to days
00188         total_delay = SUBSCRIPTION_REFRESH_DELAY * 60 * 60 * 24
00189         while True:
00190             time.sleep(total_delay)
00191             self.refreshSubscriptions()
00192 
00193     def _sendRequest(self, url, data, method=None):
00194         ## \brief Send request to context broker
00195         # \param url to request to
00196         # \param data to send
00197         # \param HTTP method (GET by default)
00198         try:
00199             request = urllib2.Request(url, data, {'Content-Type': 'application/json', 'Accept': 'application/json'})
00200             if method is not None:
00201                 request.get_method = lambda: method
00202             response = urllib2.urlopen(request)
00203             response_body = json.loads(response.read())
00204             response.close()
00205             return response_body
00206         except Exception as ex:
00207             Log("ERROR", ex.reason)
00208             return None


firos
Author(s): IƱigo Gonzalez, igonzalez@ikergune.com
autogenerated on Thu Jun 6 2019 17:51:04