3 from threading
import RLock, Lock
22 self.
logger = logging.getLogger(__name__)
24 self.
name = socket.get_extra_info(
'peername')
25 self.
sockname = socket.get_extra_info(
'sockname')
35 self._connection.set_policy_factories(policies)
37 def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
39 response.ResponseHeader.RequestHandle = requesthandle
40 data = self._connection.message_to_binary(
41 response.to_binary(), message_type=msgtype, request_id=seqhdr.RequestId, algohdr=algohdr)
43 self.socket.write(data)
46 request = ua.OpenSecureChannelRequest.from_binary(body)
48 self._connection.select_policy(
49 algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)
51 channel = self._connection.open(request.Parameters, self.
iserver)
54 response.Parameters = channel
55 self.
send_response(request.RequestHeader.RequestHandle,
None, seqhdr, response, ua.MessageType.SecureOpen)
58 self.logger.info(
"forward publish response %s", result)
62 self._publish_result_queue.append(result)
63 self.logger.info(
"Server wants to send publish answer but no publish request is available," 64 "enqueing notification, length of result queue is %s",
67 requestdata = self._publishdata_queue.pop(0)
68 if time.time() - requestdata.timestamp < requestdata.requesthdr.TimeoutHint / 1000:
72 response.Parameters = result
74 self.
send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
77 msg = self._connection.receive_from_header_and_body(header, body)
79 if header.MessageType == ua.MessageType.SecureOpen:
82 elif header.MessageType == ua.MessageType.SecureClose:
83 self._connection.close()
86 elif header.MessageType == ua.MessageType.SecureMessage:
87 return self.
process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
91 ack.ReceiveBufferSize = msg.ReceiveBufferSize
92 ack.SendBufferSize = msg.SendBufferSize
93 data = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
94 self.socket.write(data)
97 self.logger.warning(
"Received an error message type")
100 self.logger.warning(
"Unsupported message type: %s", header.MessageType)
101 raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
105 typeid = ua.NodeId.from_binary(body)
106 requesthdr = ua.RequestHeader.from_binary(body)
109 except utils.ServiceError
as e:
112 response.ResponseHeader.ServiceResult = status
113 self.logger.info(
"sending service fault response: %s (%s)", status.doc, status.name)
114 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
118 if typeid ==
ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
119 self.logger.info(
"Create session request")
120 params = ua.CreateSessionParameters.from_binary(body)
123 self.
session = self.iserver.create_session(self.
name, external=
True)
125 sessiondata = self.session.create_session(params, sockname=self.
sockname)
128 response.Parameters = sessiondata
129 response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
130 if self._connection._security_policy.server_certificate
is None:
131 data = params.ClientNonce
133 data = self._connection._security_policy.server_certificate + params.ClientNonce
134 response.Parameters.ServerSignature.Signature =\
135 self._connection._security_policy.asymmetric_cryptography.signature(data)
137 response.Parameters.ServerSignature.Algorithm =
"http://www.w3.org/2000/09/xmldsig#rsa-sha1" 139 self.logger.info(
"sending create sesssion response")
140 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
142 elif typeid ==
ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
143 self.logger.info(
"Close session request")
144 deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
146 self.session.close_session(deletesubs)
149 self.logger.info(
"sending close sesssion response")
150 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
152 elif typeid ==
ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
153 self.logger.info(
"Activate session request")
154 params = ua.ActivateSessionParameters.from_binary(body)
157 self.logger.info(
"request to activate non-existing session")
158 raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
160 if self._connection._security_policy.client_certificate
is None:
161 data = self.session.nonce
163 data = self._connection._security_policy.client_certificate + self.session.nonce
164 self._connection._security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
166 result = self.session.activate_session(params)
169 response.Parameters = result
171 self.logger.info(
"sending read response")
172 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
174 elif typeid ==
ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
175 self.logger.info(
"Read request")
176 params = ua.ReadParameters.from_binary(body)
178 results = self.session.read(params)
181 response.Results = results
183 self.logger.info(
"sending read response")
184 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
186 elif typeid ==
ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
187 self.logger.info(
"Write request")
188 params = ua.WriteParameters.from_binary(body)
190 results = self.session.write(params)
193 response.Results = results
195 self.logger.info(
"sending write response")
196 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
198 elif typeid ==
ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
199 self.logger.info(
"Browse request")
200 params = ua.BrowseParameters.from_binary(body)
202 results = self.session.browse(params)
205 response.Results = results
207 self.logger.info(
"sending browse response")
208 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
210 elif typeid ==
ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
211 self.logger.info(
"get endpoints request")
212 params = ua.GetEndpointsParameters.from_binary(body)
214 endpoints = self.iserver.get_endpoints(params, sockname=self.
sockname)
217 response.Endpoints = endpoints
219 self.logger.info(
"sending get endpoints response")
220 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
222 elif typeid ==
ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
223 self.logger.info(
"find servers request")
224 params = ua.FindServersParameters.from_binary(body)
226 servers = self.iserver.find_servers(params)
229 response.Servers = servers
231 self.logger.info(
"sending find servers response")
232 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
234 elif typeid ==
ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
235 self.logger.info(
"register server request")
236 serv = ua.RegisteredServer.from_binary(body)
238 self.iserver.register_server(serv)
242 self.logger.info(
"sending register server response")
243 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
245 elif typeid ==
ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
246 self.logger.info(
"register server 2 request")
247 params = ua.RegisterServer2Parameters.from_binary(body)
249 results = self.iserver.register_server2(params)
252 response.ConfigurationResults = results
254 self.logger.info(
"sending register server 2 response")
255 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
257 elif typeid ==
ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
258 self.logger.info(
"translate browsepaths to nodeids request")
259 params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)
261 paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
264 response.Results = paths
266 self.logger.info(
"sending translate browsepaths to nodeids response")
267 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
269 elif typeid ==
ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
270 self.logger.info(
"add nodes request")
271 params = ua.AddNodesParameters.from_binary(body)
273 results = self.session.add_nodes(params.NodesToAdd)
276 response.Results = results
278 self.logger.info(
"sending add node response")
279 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
281 elif typeid ==
ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
282 self.logger.info(
"delete nodes request")
283 params = ua.DeleteNodesParameters.from_binary(body)
285 results = self.session.delete_nodes(params)
288 response.Results = results
290 self.logger.info(
"sending delete node response")
291 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
293 elif typeid ==
ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
294 self.logger.info(
"create subscription request")
295 params = ua.CreateSubscriptionParameters.from_binary(body)
300 response.Parameters = result
302 self.logger.info(
"sending create subscription response")
303 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
305 elif typeid ==
ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
306 self.logger.info(
"delete subscriptions request")
307 params = ua.DeleteSubscriptionsParameters.from_binary(body)
309 results = self.session.delete_subscriptions(params.SubscriptionIds)
312 response.Results = results
314 self.logger.info(
"sending delte subscription response")
315 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
317 elif typeid ==
ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
318 self.logger.info(
"create monitored items request")
319 params = ua.CreateMonitoredItemsParameters.from_binary(body)
320 results = self.session.create_monitored_items(params)
323 response.Results = results
325 self.logger.info(
"sending create monitored items response")
326 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
328 elif typeid ==
ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
329 self.logger.info(
"modify monitored items request")
330 params = ua.ModifyMonitoredItemsParameters.from_binary(body)
331 results = self.session.modify_monitored_items(params)
334 response.Results = results
336 self.logger.info(
"sending modify monitored items response")
337 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
339 elif typeid ==
ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
340 self.logger.info(
"delete monitored items request")
341 params = ua.DeleteMonitoredItemsParameters.from_binary(body)
343 results = self.session.delete_monitored_items(params)
346 response.Results = results
348 self.logger.info(
"sending delete monitored items response")
349 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
351 elif typeid ==
ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
352 self.logger.info(
"history read request")
353 params = ua.HistoryReadParameters.from_binary(body)
355 results = self.session.history_read(params)
358 response.Results = results
360 self.logger.info(
"sending history read response")
361 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
363 elif typeid ==
ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
364 self.logger.info(
"register nodes request")
365 params = ua.RegisterNodesParameters.from_binary(body)
366 self.logger.info(
"Node registration not implemented")
369 response.Parameters.RegisteredNodeIds = params.NodesToRegister
371 self.logger.info(
"sending register nodes response")
372 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
374 elif typeid ==
ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
375 self.logger.info(
"unregister nodes request")
376 params = ua.UnregisterNodesParameters.from_binary(body)
380 self.logger.info(
"sending unregister nodes response")
381 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
383 elif typeid ==
ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
384 self.logger.info(
"publish request")
389 params = ua.PublishParameters.from_binary(body)
392 data.requesthdr = requesthdr
394 data.algohdr = algohdr
396 self._publishdata_queue.append(data)
398 result = self._publish_result_queue.pop(0)
400 self.session.publish(params.SubscriptionAcknowledgements)
401 self.logger.info(
"publish forward to server")
403 elif typeid ==
ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
404 self.logger.info(
"re-publish request")
406 params = ua.RepublishParameters.from_binary(body)
407 msg = self.session.republish(params)
410 response.NotificationMessage = msg
412 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
414 elif typeid ==
ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
415 self.logger.info(
"close secure channel request")
416 self._connection.close()
418 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
421 elif typeid ==
ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
422 self.logger.info(
"call request")
424 params = ua.CallParameters.from_binary(body)
426 results = self.session.call(params.MethodsToCall)
429 response.Results = results
431 self.
send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
434 self.logger.warning(
"Unknown message received %s", typeid)
435 raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)
441 to be called when client has disconnected to ensure we really close 444 print(
"Cleanup client connection: ", self.
name)
446 self.session.close_session(
True)
def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body)
def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage)
def process_message(self, algohdr, seqhdr, body)
def set_policies(self, policies)
def __init__(self, internal_server, socket)
def process(self, header, body)
def open_secure_channel(self, algohdr, seqhdr, body)
def forward_publish_response(self, result)