00001
00002 import logging
00003 from threading import RLock, Lock
00004 import time
00005
00006 from opcua import ua
00007 from opcua.common import utils
00008
00009
class PublishRequestData(object):
    """
    Record of one pending publish request.

    Stores the request header, the security (algorithm) header and the
    sequence header of the request, plus the time the record was created.
    """

    def __init__(self):
        """Create an empty record stamped with the current time."""
        # Creation time in seconds since the epoch, as returned by time.time()
        self.timestamp = time.time()
        # The three headers start unset and are filled in by whoever creates the record
        self.requesthdr = self.algohdr = self.seqhdr = None
00017
00018
class UaProcessor(object):
    """
    Process the OPC-UA binary messages received on one client connection.

    Incoming messages are decoded through a ``ua.SecureConnection``; service
    requests are dispatched to the internal server or to the session created
    for this connection, and responses are written back to the socket.
    """

    def __init__(self, internal_server, socket):
        """
        :param internal_server: server object used to create sessions and to
            answer discovery requests (get_endpoints, find_servers, ...)
        :param socket: object providing ``get_extra_info()`` and ``write()``
            (presumably an asyncio transport -- confirm against caller)
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')
        self.session = None
        self.socket = socket
        # serialises encoding + writing of responses on the socket
        self._socketlock = Lock()
        # protects the two publish queues; re-entrant because the publish
        # request handler calls forward_publish_response() while holding it
        self._datalock = RLock()
        # publish requests received from the client and not yet answered
        self._publishdata_queue = []
        # publish results that could not be sent for lack of a publish request
        self._publish_result_queue = []
        self._connection = ua.SecureConnection(ua.SecurityPolicy())

    def set_policies(self, policies):
        """Hand the given security policy factories to the secure connection."""
        self._connection.set_policy_factories(policies)

    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Encode ``response`` and write it to the socket.

        The response header gets ``requesthandle`` as RequestHandle and the
        message is sent with the request id found in ``seqhdr``. Encoding and
        writing happen under the socket lock.
        """
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            data = self._connection.message_to_binary(
                response.to_binary(), message_type=msgtype, request_id=seqhdr.RequestId, algohdr=algohdr)

            self.socket.write(data)

    def open_secure_channel(self, algohdr, seqhdr, body):
        """
        Handle an OpenSecureChannel request.

        Selects the security policy named in ``algohdr``, opens the channel on
        the secure connection and sends the OpenSecureChannelResponse.
        """
        request = ua.OpenSecureChannelRequest.from_binary(body)

        self._connection.select_policy(
            algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)

        channel = self._connection.open(request.Parameters, self.iserver)

        response = ua.OpenSecureChannelResponse()
        response.Parameters = channel
        self.send_response(request.RequestHeader.RequestHandle, None, seqhdr, response, ua.MessageType.SecureOpen)

    def forward_publish_response(self, result):
        """
        Send a publish result to the client using a pending publish request.

        Pending publish requests are consumed oldest first; requests whose
        TimeoutHint has elapsed are discarded. If no usable request is left,
        the result is queued and sent when the next publish request arrives.
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            while True:
                if not self._publishdata_queue:
                    self._publish_result_queue.append(result)
                    self.logger.info("Server wants to send publish answer but no publish request is available,"
                                     "enqueing notification, length of result queue is %s",
                                     len(self._publish_result_queue))
                    return
                requestdata = self._publishdata_queue.pop(0)
                timeout_ms = requestdata.requesthdr.TimeoutHint
                # A TimeoutHint of 0 means the client requested no timeout, so
                # such a request never expires. Divide by 1000.0 so that the
                # conversion to seconds is not truncated by Python 2 integer
                # division.
                if timeout_ms == 0 or time.time() - requestdata.timestamp < timeout_ms / 1000.0:
                    break

            response = ua.PublishResponse()
            response.Parameters = result

            self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)

    def process(self, header, body):
        """
        Decode one message and act on it.

        :returns: False for a SecureClose message, the result of
            process_message() for a SecureMessage, True otherwise
            (presumably False tells the caller to stop serving this
            connection -- confirm against caller)
        :raises utils.ServiceError: BadTcpMessageTypeInvalid if the decoded
            message is of an unsupported type
        """
        msg = self._connection.receive_from_header_and_body(header, body)
        if isinstance(msg, ua.Message):
            if header.MessageType == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())

            elif header.MessageType == ua.MessageType.SecureClose:
                self._connection.close()
                return False

            elif header.MessageType == ua.MessageType.SecureMessage:
                return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())

        elif isinstance(msg, ua.Hello):
            # answer with the buffer sizes announced by the client
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            data = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
            self.socket.write(data)

        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error message type")

        else:
            self.logger.warning("Unsupported message type: %s", header.MessageType)
            raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
        return True

    def process_message(self, algohdr, seqhdr, body):
        """
        Decode the type id and request header of a service request and
        dispatch it.

        A utils.ServiceError raised while handling the request is turned into
        a ServiceFault response carrying the error's status code.

        :returns: the result of _process_message(), or True after a
            ServiceFault has been sent
        """
        typeid = ua.NodeId.from_binary(body)
        requesthdr = ua.RequestHeader.from_binary(body)
        try:
            return self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as e:
            status = ua.StatusCode(e.code)
            response = ua.ServiceFault()
            response.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return True

    def _require_session(self):
        """
        Return the session of this connection.

        :raises utils.ServiceError: BadSessionIdInvalid if no session has been
            created yet, so the client gets a ServiceFault instead of the
            connection failing on an AttributeError
        """
        if not self.session:
            self.logger.info("request requires a session but none has been created")
            raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        return self.session

    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """
        Handle one service request identified by ``typeid``.

        The request parameters are decoded from ``body``, the matching
        service is invoked and the response is sent with send_response().

        :returns: False for a CloseSecureChannel request and for a publish
            request received without a session, True otherwise
        :raises utils.ServiceError: BadNotImplemented for an unknown request
            type, BadSessionIdInvalid when a session-bound service is
            requested before a session exists
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = ua.CreateSessionParameters.from_binary(body)

            self.session = self.iserver.create_session(self.name, external=True)

            sessiondata = self.session.create_session(params, sockname=self.sockname)

            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
            # the signed data is the client nonce, prefixed by the policy's
            # server_certificate when one is set
            if self._connection._security_policy.server_certificate is None:
                data = params.ClientNonce
            else:
                data = self._connection._security_policy.server_certificate + params.ClientNonce
            response.Parameters.ServerSignature.Signature = \
                self._connection._security_policy.asymmetric_cryptography.signature(data)

            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

            self.logger.info("sending create sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)

            self._require_session().close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = ua.ActivateSessionParameters.from_binary(body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            # the client signature is verified against the session nonce,
            # prefixed by the policy's client_certificate when one is set
            if self._connection._security_policy.client_certificate is None:
                data = self.session.nonce
            else:
                data = self._connection._security_policy.client_certificate + self.session.nonce
            self._connection._security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = ua.ReadParameters.from_binary(body)

            results = self._require_session().read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = ua.WriteParameters.from_binary(body)

            results = self._require_session().write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = ua.BrowseParameters.from_binary(body)

            results = self._require_session().browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = ua.GetEndpointsParameters.from_binary(body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = ua.FindServersParameters.from_binary(body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = ua.RegisteredServer.from_binary(body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            params = ua.RegisterServer2Parameters.from_binary(body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)

            paths = self._require_session().translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = ua.AddNodesParameters.from_binary(body)

            results = self._require_session().add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            self.logger.info("delete nodes request")
            params = ua.DeleteNodesParameters.from_binary(body)

            results = self._require_session().delete_nodes(params)

            response = ua.DeleteNodesResponse()
            response.Results = results

            self.logger.info("sending delete node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = ua.CreateSubscriptionParameters.from_binary(body)

            # forward_publish_response is handed over as the callback through
            # which publish results come back to this processor
            result = self._require_session().create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = ua.DeleteSubscriptionsParameters.from_binary(body)

            results = self._require_session().delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
            results = self._require_session().create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
            results = self._require_session().modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)

            results = self._require_session().delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
            self.logger.info("history read request")
            params = ua.HistoryReadParameters.from_binary(body)

            results = self._require_session().history_read(params)

            response = ua.HistoryReadResponse()
            response.Results = results

            self.logger.info("sending history read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("register nodes request")
            params = ua.RegisterNodesParameters.from_binary(body)
            self.logger.info("Node registration not implemented")

            # no registration is performed: the requested node ids are
            # returned unchanged
            response = ua.RegisterNodesResponse()
            response.Parameters.RegisteredNodeIds = params.NodesToRegister

            self.logger.info("sending register nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("unregister nodes request")
            # the parameters are still decoded, but their content is not used
            ua.UnregisterNodesParameters.from_binary(body)

            response = ua.UnregisterNodesResponse()

            self.logger.info("sending unregister nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = ua.PublishParameters.from_binary(body)

            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                # keep the request so a later publish result can be sent with it
                self._publishdata_queue.append(data)
                # a result was waiting for a publish request: send it now
                if self._publish_result_queue:
                    result = self._publish_result_queue.pop(0)
                    self.forward_publish_response(result)
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = ua.RepublishParameters.from_binary(body)
            msg = self._require_session().republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            self._connection.close()
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = ua.CallParameters.from_binary(body)

            results = self._require_session().call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True

    def close(self):
        """
        to be called when client has disconnected to ensure we really close
        everything we should
        """
        # use the class logger rather than print() so library users control the output
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            self.session.close_session(True)