uaprocessor.py
Go to the documentation of this file.
00001 
00002 import logging
00003 from threading import RLock, Lock
00004 import time
00005 
00006 from opcua import ua
00007 from opcua.common import utils
00008 
00009 
00010 class PublishRequestData(object):
00011 
00012     def __init__(self):
00013         self.requesthdr = None
00014         self.algohdr = None
00015         self.seqhdr = None
00016         self.timestamp = time.time()
00017 
00018 
00019 class UaProcessor(object):
00020 
00021     def __init__(self, internal_server, socket):
00022         self.logger = logging.getLogger(__name__)
00023         self.iserver = internal_server
00024         self.name = socket.get_extra_info('peername')
00025         self.sockname = socket.get_extra_info('sockname')
00026         self.session = None
00027         self.socket = socket
00028         self._socketlock = Lock()
00029         self._datalock = RLock()
00030         self._publishdata_queue = []
00031         self._publish_result_queue = []  # used when we need to wait for PublishRequest
00032         self._connection = ua.SecureConnection(ua.SecurityPolicy())
00033 
00034     def set_policies(self, policies):
00035         self._connection.set_policy_factories(policies)
00036 
00037     def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
00038         with self._socketlock:
00039             response.ResponseHeader.RequestHandle = requesthandle
00040             data = self._connection.message_to_binary(
00041                 response.to_binary(), message_type=msgtype, request_id=seqhdr.RequestId, algohdr=algohdr)
00042 
00043             self.socket.write(data)
00044 
00045     def open_secure_channel(self, algohdr, seqhdr, body):
00046         request = ua.OpenSecureChannelRequest.from_binary(body)
00047 
00048         self._connection.select_policy(
00049             algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)
00050 
00051         channel = self._connection.open(request.Parameters, self.iserver)
00052         # send response
00053         response = ua.OpenSecureChannelResponse()
00054         response.Parameters = channel
00055         self.send_response(request.RequestHeader.RequestHandle, None, seqhdr, response, ua.MessageType.SecureOpen)
00056 
00057     def forward_publish_response(self, result):
00058         self.logger.info("forward publish response %s", result)
00059         with self._datalock:
00060             while True:
00061                 if len(self._publishdata_queue) == 0:
00062                     self._publish_result_queue.append(result)
00063                     self.logger.info("Server wants to send publish answer but no publish request is available,"
00064                                      "enqueing notification, length of result queue is %s",
00065                                      len(self._publish_result_queue))
00066                     return
00067                 requestdata = self._publishdata_queue.pop(0)
00068                 if time.time() - requestdata.timestamp < requestdata.requesthdr.TimeoutHint / 1000:
00069                     break
00070 
00071         response = ua.PublishResponse()
00072         response.Parameters = result
00073 
00074         self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
00075 
00076     def process(self, header, body):
00077         msg = self._connection.receive_from_header_and_body(header, body)
00078         if isinstance(msg, ua.Message):
00079             if header.MessageType == ua.MessageType.SecureOpen:
00080                 self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
00081 
00082             elif header.MessageType == ua.MessageType.SecureClose:
00083                 self._connection.close()
00084                 return False
00085 
00086             elif header.MessageType == ua.MessageType.SecureMessage:
00087                 return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
00088 
00089         elif isinstance(msg, ua.Hello):
00090             ack = ua.Acknowledge()
00091             ack.ReceiveBufferSize = msg.ReceiveBufferSize
00092             ack.SendBufferSize = msg.SendBufferSize
00093             data = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
00094             self.socket.write(data)
00095 
00096         elif isinstance(msg, ua.ErrorMessage):
00097             self.logger.warning("Received an error message type")
00098 
00099         else:
00100             self.logger.warning("Unsupported message type: %s", header.MessageType)
00101             raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
00102         return True
00103 
00104     def process_message(self, algohdr, seqhdr, body):
00105         typeid = ua.NodeId.from_binary(body)
00106         requesthdr = ua.RequestHeader.from_binary(body)
00107         try:
00108             return self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
00109         except utils.ServiceError as e:
00110             status = ua.StatusCode(e.code)
00111             response = ua.ServiceFault()
00112             response.ResponseHeader.ServiceResult = status
00113             self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
00114             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00115             return True
00116 
00117     def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
00118         if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
00119             self.logger.info("Create session request")
00120             params = ua.CreateSessionParameters.from_binary(body)
00121 
00122             # create the session on server
00123             self.session = self.iserver.create_session(self.name, external=True)
00124             # get a session creation result to send back
00125             sessiondata = self.session.create_session(params, sockname=self.sockname)
00126 
00127             response = ua.CreateSessionResponse()
00128             response.Parameters = sessiondata
00129             response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
00130             if self._connection._security_policy.server_certificate is None:
00131                 data = params.ClientNonce
00132             else:
00133                 data = self._connection._security_policy.server_certificate + params.ClientNonce
00134             response.Parameters.ServerSignature.Signature =\
00135                 self._connection._security_policy.asymmetric_cryptography.signature(data)
00136 
00137             response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
00138 
00139             self.logger.info("sending create sesssion response")
00140             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00141 
00142         elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
00143             self.logger.info("Close session request")
00144             deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
00145 
00146             self.session.close_session(deletesubs)
00147 
00148             response = ua.CloseSessionResponse()
00149             self.logger.info("sending close sesssion response")
00150             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00151 
00152         elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
00153             self.logger.info("Activate session request")
00154             params = ua.ActivateSessionParameters.from_binary(body)
00155 
00156             if not self.session:
00157                 self.logger.info("request to activate non-existing session")
00158                 raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
00159 
00160             if self._connection._security_policy.client_certificate is None:
00161                 data = self.session.nonce
00162             else:
00163                 data = self._connection._security_policy.client_certificate + self.session.nonce
00164             self._connection._security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
00165 
00166             result = self.session.activate_session(params)
00167 
00168             response = ua.ActivateSessionResponse()
00169             response.Parameters = result
00170 
00171             self.logger.info("sending read response")
00172             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00173 
00174         elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
00175             self.logger.info("Read request")
00176             params = ua.ReadParameters.from_binary(body)
00177 
00178             results = self.session.read(params)
00179 
00180             response = ua.ReadResponse()
00181             response.Results = results
00182 
00183             self.logger.info("sending read response")
00184             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00185 
00186         elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
00187             self.logger.info("Write request")
00188             params = ua.WriteParameters.from_binary(body)
00189 
00190             results = self.session.write(params)
00191 
00192             response = ua.WriteResponse()
00193             response.Results = results
00194 
00195             self.logger.info("sending write response")
00196             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00197 
00198         elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
00199             self.logger.info("Browse request")
00200             params = ua.BrowseParameters.from_binary(body)
00201 
00202             results = self.session.browse(params)
00203 
00204             response = ua.BrowseResponse()
00205             response.Results = results
00206 
00207             self.logger.info("sending browse response")
00208             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00209 
00210         elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
00211             self.logger.info("get endpoints request")
00212             params = ua.GetEndpointsParameters.from_binary(body)
00213 
00214             endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)
00215 
00216             response = ua.GetEndpointsResponse()
00217             response.Endpoints = endpoints
00218 
00219             self.logger.info("sending get endpoints response")
00220             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00221 
00222         elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
00223             self.logger.info("find servers request")
00224             params = ua.FindServersParameters.from_binary(body)
00225 
00226             servers = self.iserver.find_servers(params)
00227 
00228             response = ua.FindServersResponse()
00229             response.Servers = servers
00230 
00231             self.logger.info("sending find servers response")
00232             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00233 
00234         elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
00235             self.logger.info("register server request")
00236             serv = ua.RegisteredServer.from_binary(body)
00237 
00238             self.iserver.register_server(serv)
00239 
00240             response = ua.RegisterServerResponse()
00241 
00242             self.logger.info("sending register server response")
00243             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00244 
00245         elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
00246             self.logger.info("register server 2 request")
00247             params = ua.RegisterServer2Parameters.from_binary(body)
00248 
00249             results = self.iserver.register_server2(params)
00250 
00251             response = ua.RegisterServer2Response()
00252             response.ConfigurationResults = results
00253 
00254             self.logger.info("sending register server 2 response")
00255             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00256 
00257         elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
00258             self.logger.info("translate browsepaths to nodeids request")
00259             params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)
00260 
00261             paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
00262 
00263             response = ua.TranslateBrowsePathsToNodeIdsResponse()
00264             response.Results = paths
00265 
00266             self.logger.info("sending translate browsepaths to nodeids response")
00267             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00268 
00269         elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
00270             self.logger.info("add nodes request")
00271             params = ua.AddNodesParameters.from_binary(body)
00272 
00273             results = self.session.add_nodes(params.NodesToAdd)
00274 
00275             response = ua.AddNodesResponse()
00276             response.Results = results
00277 
00278             self.logger.info("sending add node response")
00279             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00280 
00281         elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
00282             self.logger.info("delete nodes request")
00283             params = ua.DeleteNodesParameters.from_binary(body)
00284 
00285             results = self.session.delete_nodes(params)
00286 
00287             response = ua.DeleteNodesResponse()
00288             response.Results = results
00289 
00290             self.logger.info("sending delete node response")
00291             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00292 
00293         elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
00294             self.logger.info("create subscription request")
00295             params = ua.CreateSubscriptionParameters.from_binary(body)
00296 
00297             result = self.session.create_subscription(params, self.forward_publish_response)
00298 
00299             response = ua.CreateSubscriptionResponse()
00300             response.Parameters = result
00301 
00302             self.logger.info("sending create subscription response")
00303             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00304 
00305         elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
00306             self.logger.info("delete subscriptions request")
00307             params = ua.DeleteSubscriptionsParameters.from_binary(body)
00308 
00309             results = self.session.delete_subscriptions(params.SubscriptionIds)
00310 
00311             response = ua.DeleteSubscriptionsResponse()
00312             response.Results = results
00313 
00314             self.logger.info("sending delte subscription response")
00315             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00316 
00317         elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
00318             self.logger.info("create monitored items request")
00319             params = ua.CreateMonitoredItemsParameters.from_binary(body)
00320             results = self.session.create_monitored_items(params)
00321 
00322             response = ua.CreateMonitoredItemsResponse()
00323             response.Results = results
00324 
00325             self.logger.info("sending create monitored items response")
00326             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00327 
00328         elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
00329             self.logger.info("modify monitored items request")
00330             params = ua.ModifyMonitoredItemsParameters.from_binary(body)
00331             results = self.session.modify_monitored_items(params)
00332 
00333             response = ua.ModifyMonitoredItemsResponse()
00334             response.Results = results
00335 
00336             self.logger.info("sending modify monitored items response")
00337             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00338 
00339         elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
00340             self.logger.info("delete monitored items request")
00341             params = ua.DeleteMonitoredItemsParameters.from_binary(body)
00342 
00343             results = self.session.delete_monitored_items(params)
00344 
00345             response = ua.DeleteMonitoredItemsResponse()
00346             response.Results = results
00347 
00348             self.logger.info("sending delete monitored items response")
00349             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00350 
00351         elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
00352             self.logger.info("history read request")
00353             params = ua.HistoryReadParameters.from_binary(body)
00354 
00355             results = self.session.history_read(params)
00356 
00357             response = ua.HistoryReadResponse()
00358             response.Results = results
00359 
00360             self.logger.info("sending history read response")
00361             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00362 
00363         elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
00364             self.logger.info("register nodes request")
00365             params = ua.RegisterNodesParameters.from_binary(body)
00366             self.logger.info("Node registration not implemented")
00367 
00368             response = ua.RegisterNodesResponse()
00369             response.Parameters.RegisteredNodeIds = params.NodesToRegister
00370 
00371             self.logger.info("sending register nodes response")
00372             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00373 
00374         elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
00375             self.logger.info("unregister nodes request")
00376             params = ua.UnregisterNodesParameters.from_binary(body)
00377 
00378             response = ua.UnregisterNodesResponse()
00379 
00380             self.logger.info("sending unregister nodes response")
00381             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00382 
00383         elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
00384             self.logger.info("publish request")
00385 
00386             if not self.session:
00387                 return False
00388 
00389             params = ua.PublishParameters.from_binary(body)
00390 
00391             data = PublishRequestData()
00392             data.requesthdr = requesthdr
00393             data.seqhdr = seqhdr
00394             data.algohdr = algohdr
00395             with self._datalock:
00396                 self._publishdata_queue.append(data)  # will be used to send publish answers from server
00397                 if self._publish_result_queue:
00398                     result = self._publish_result_queue.pop(0)
00399                     self.forward_publish_response(result)
00400             self.session.publish(params.SubscriptionAcknowledgements)
00401             self.logger.info("publish forward to server")
00402 
00403         elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
00404             self.logger.info("re-publish request")
00405 
00406             params = ua.RepublishParameters.from_binary(body)
00407             msg = self.session.republish(params)
00408 
00409             response = ua.RepublishResponse()
00410             response.NotificationMessage = msg
00411 
00412             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00413 
00414         elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
00415             self.logger.info("close secure channel request")
00416             self._connection.close()
00417             response = ua.CloseSecureChannelResponse()
00418             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00419             return False
00420 
00421         elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
00422             self.logger.info("call request")
00423 
00424             params = ua.CallParameters.from_binary(body)
00425 
00426             results = self.session.call(params.MethodsToCall)
00427 
00428             response = ua.CallResponse()
00429             response.Results = results
00430 
00431             self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
00432 
00433         else:
00434             self.logger.warning("Unknown message received %s", typeid)
00435             raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)
00436 
00437         return True
00438 
00439     def close(self):
00440         """
00441         to be called when client has disconnected to ensure we really close
00442         everything we should
00443         """
00444         print("Cleanup client connection: ", self.name)
00445         if self.session:
00446             self.session.close_session(True)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23