uaprocessor.py
Go to the documentation of this file.
1 
2 import logging
3 from threading import RLock, Lock
4 import time
5 
6 from opcua import ua
7 from opcua.common import utils
8 
9 
10 class PublishRequestData(object):
11 
12  def __init__(self):
13  self.requesthdr = None
14  self.algohdr = None
15  self.seqhdr = None
16  self.timestamp = time.time()
17 
18 
19 class UaProcessor(object):
20 
21  def __init__(self, internal_server, socket):
22  self.logger = logging.getLogger(__name__)
23  self.iserver = internal_server
24  self.name = socket.get_extra_info('peername')
25  self.sockname = socket.get_extra_info('sockname')
26  self.session = None
27  self.socket = socket
28  self._socketlock = Lock()
29  self._datalock = RLock()
31  self._publish_result_queue = [] # used when we need to wait for PublishRequest
33 
34  def set_policies(self, policies):
35  self._connection.set_policy_factories(policies)
36 
37  def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
38  with self._socketlock:
39  response.ResponseHeader.RequestHandle = requesthandle
40  data = self._connection.message_to_binary(
41  response.to_binary(), message_type=msgtype, request_id=seqhdr.RequestId, algohdr=algohdr)
42 
43  self.socket.write(data)
44 
45  def open_secure_channel(self, algohdr, seqhdr, body):
46  request = ua.OpenSecureChannelRequest.from_binary(body)
47 
48  self._connection.select_policy(
49  algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)
50 
51  channel = self._connection.open(request.Parameters, self.iserver)
52  # send response
53  response = ua.OpenSecureChannelResponse()
54  response.Parameters = channel
55  self.send_response(request.RequestHeader.RequestHandle, None, seqhdr, response, ua.MessageType.SecureOpen)
56 
57  def forward_publish_response(self, result):
58  self.logger.info("forward publish response %s", result)
59  with self._datalock:
60  while True:
61  if len(self._publishdata_queue) == 0:
62  self._publish_result_queue.append(result)
63  self.logger.info("Server wants to send publish answer but no publish request is available,"
64  "enqueing notification, length of result queue is %s",
65  len(self._publish_result_queue))
66  return
67  requestdata = self._publishdata_queue.pop(0)
68  if time.time() - requestdata.timestamp < requestdata.requesthdr.TimeoutHint / 1000:
69  break
70 
71  response = ua.PublishResponse()
72  response.Parameters = result
73 
74  self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
75 
76  def process(self, header, body):
77  msg = self._connection.receive_from_header_and_body(header, body)
78  if isinstance(msg, ua.Message):
79  if header.MessageType == ua.MessageType.SecureOpen:
80  self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
81 
82  elif header.MessageType == ua.MessageType.SecureClose:
83  self._connection.close()
84  return False
85 
86  elif header.MessageType == ua.MessageType.SecureMessage:
87  return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
88 
89  elif isinstance(msg, ua.Hello):
90  ack = ua.Acknowledge()
91  ack.ReceiveBufferSize = msg.ReceiveBufferSize
92  ack.SendBufferSize = msg.SendBufferSize
93  data = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
94  self.socket.write(data)
95 
96  elif isinstance(msg, ua.ErrorMessage):
97  self.logger.warning("Received an error message type")
98 
99  else:
100  self.logger.warning("Unsupported message type: %s", header.MessageType)
101  raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
102  return True
103 
104  def process_message(self, algohdr, seqhdr, body):
105  typeid = ua.NodeId.from_binary(body)
106  requesthdr = ua.RequestHeader.from_binary(body)
107  try:
108  return self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
109  except utils.ServiceError as e:
110  status = ua.StatusCode(e.code)
111  response = ua.ServiceFault()
112  response.ResponseHeader.ServiceResult = status
113  self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
114  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
115  return True
116 
117  def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
118  if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
119  self.logger.info("Create session request")
120  params = ua.CreateSessionParameters.from_binary(body)
121 
122  # create the session on server
123  self.session = self.iserver.create_session(self.name, external=True)
124  # get a session creation result to send back
125  sessiondata = self.session.create_session(params, sockname=self.sockname)
126 
127  response = ua.CreateSessionResponse()
128  response.Parameters = sessiondata
129  response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
130  if self._connection._security_policy.server_certificate is None:
131  data = params.ClientNonce
132  else:
133  data = self._connection._security_policy.server_certificate + params.ClientNonce
134  response.Parameters.ServerSignature.Signature =\
135  self._connection._security_policy.asymmetric_cryptography.signature(data)
136 
137  response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
138 
139  self.logger.info("sending create sesssion response")
140  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
141 
142  elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
143  self.logger.info("Close session request")
144  deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
145 
146  self.session.close_session(deletesubs)
147 
148  response = ua.CloseSessionResponse()
149  self.logger.info("sending close sesssion response")
150  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
151 
152  elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
153  self.logger.info("Activate session request")
154  params = ua.ActivateSessionParameters.from_binary(body)
155 
156  if not self.session:
157  self.logger.info("request to activate non-existing session")
158  raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
159 
160  if self._connection._security_policy.client_certificate is None:
161  data = self.session.nonce
162  else:
163  data = self._connection._security_policy.client_certificate + self.session.nonce
164  self._connection._security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
165 
166  result = self.session.activate_session(params)
167 
168  response = ua.ActivateSessionResponse()
169  response.Parameters = result
170 
171  self.logger.info("sending read response")
172  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
173 
174  elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
175  self.logger.info("Read request")
176  params = ua.ReadParameters.from_binary(body)
177 
178  results = self.session.read(params)
179 
180  response = ua.ReadResponse()
181  response.Results = results
182 
183  self.logger.info("sending read response")
184  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
185 
186  elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
187  self.logger.info("Write request")
188  params = ua.WriteParameters.from_binary(body)
189 
190  results = self.session.write(params)
191 
192  response = ua.WriteResponse()
193  response.Results = results
194 
195  self.logger.info("sending write response")
196  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
197 
198  elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
199  self.logger.info("Browse request")
200  params = ua.BrowseParameters.from_binary(body)
201 
202  results = self.session.browse(params)
203 
204  response = ua.BrowseResponse()
205  response.Results = results
206 
207  self.logger.info("sending browse response")
208  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
209 
210  elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
211  self.logger.info("get endpoints request")
212  params = ua.GetEndpointsParameters.from_binary(body)
213 
214  endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)
215 
216  response = ua.GetEndpointsResponse()
217  response.Endpoints = endpoints
218 
219  self.logger.info("sending get endpoints response")
220  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
221 
222  elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
223  self.logger.info("find servers request")
224  params = ua.FindServersParameters.from_binary(body)
225 
226  servers = self.iserver.find_servers(params)
227 
228  response = ua.FindServersResponse()
229  response.Servers = servers
230 
231  self.logger.info("sending find servers response")
232  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
233 
234  elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
235  self.logger.info("register server request")
236  serv = ua.RegisteredServer.from_binary(body)
237 
238  self.iserver.register_server(serv)
239 
240  response = ua.RegisterServerResponse()
241 
242  self.logger.info("sending register server response")
243  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
244 
245  elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
246  self.logger.info("register server 2 request")
247  params = ua.RegisterServer2Parameters.from_binary(body)
248 
249  results = self.iserver.register_server2(params)
250 
251  response = ua.RegisterServer2Response()
252  response.ConfigurationResults = results
253 
254  self.logger.info("sending register server 2 response")
255  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
256 
257  elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
258  self.logger.info("translate browsepaths to nodeids request")
259  params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)
260 
261  paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
262 
264  response.Results = paths
265 
266  self.logger.info("sending translate browsepaths to nodeids response")
267  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
268 
269  elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
270  self.logger.info("add nodes request")
271  params = ua.AddNodesParameters.from_binary(body)
272 
273  results = self.session.add_nodes(params.NodesToAdd)
274 
275  response = ua.AddNodesResponse()
276  response.Results = results
277 
278  self.logger.info("sending add node response")
279  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
280 
281  elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
282  self.logger.info("delete nodes request")
283  params = ua.DeleteNodesParameters.from_binary(body)
284 
285  results = self.session.delete_nodes(params)
286 
287  response = ua.DeleteNodesResponse()
288  response.Results = results
289 
290  self.logger.info("sending delete node response")
291  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
292 
293  elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
294  self.logger.info("create subscription request")
295  params = ua.CreateSubscriptionParameters.from_binary(body)
296 
297  result = self.session.create_subscription(params, self.forward_publish_response)
298 
299  response = ua.CreateSubscriptionResponse()
300  response.Parameters = result
301 
302  self.logger.info("sending create subscription response")
303  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
304 
305  elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
306  self.logger.info("delete subscriptions request")
307  params = ua.DeleteSubscriptionsParameters.from_binary(body)
308 
309  results = self.session.delete_subscriptions(params.SubscriptionIds)
310 
311  response = ua.DeleteSubscriptionsResponse()
312  response.Results = results
313 
314  self.logger.info("sending delte subscription response")
315  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
316 
317  elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
318  self.logger.info("create monitored items request")
319  params = ua.CreateMonitoredItemsParameters.from_binary(body)
320  results = self.session.create_monitored_items(params)
321 
323  response.Results = results
324 
325  self.logger.info("sending create monitored items response")
326  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
327 
328  elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
329  self.logger.info("modify monitored items request")
330  params = ua.ModifyMonitoredItemsParameters.from_binary(body)
331  results = self.session.modify_monitored_items(params)
332 
334  response.Results = results
335 
336  self.logger.info("sending modify monitored items response")
337  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
338 
339  elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
340  self.logger.info("delete monitored items request")
341  params = ua.DeleteMonitoredItemsParameters.from_binary(body)
342 
343  results = self.session.delete_monitored_items(params)
344 
346  response.Results = results
347 
348  self.logger.info("sending delete monitored items response")
349  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
350 
351  elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
352  self.logger.info("history read request")
353  params = ua.HistoryReadParameters.from_binary(body)
354 
355  results = self.session.history_read(params)
356 
357  response = ua.HistoryReadResponse()
358  response.Results = results
359 
360  self.logger.info("sending history read response")
361  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
362 
363  elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
364  self.logger.info("register nodes request")
365  params = ua.RegisterNodesParameters.from_binary(body)
366  self.logger.info("Node registration not implemented")
367 
368  response = ua.RegisterNodesResponse()
369  response.Parameters.RegisteredNodeIds = params.NodesToRegister
370 
371  self.logger.info("sending register nodes response")
372  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
373 
374  elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
375  self.logger.info("unregister nodes request")
376  params = ua.UnregisterNodesParameters.from_binary(body)
377 
378  response = ua.UnregisterNodesResponse()
379 
380  self.logger.info("sending unregister nodes response")
381  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
382 
383  elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
384  self.logger.info("publish request")
385 
386  if not self.session:
387  return False
388 
389  params = ua.PublishParameters.from_binary(body)
390 
391  data = PublishRequestData()
392  data.requesthdr = requesthdr
393  data.seqhdr = seqhdr
394  data.algohdr = algohdr
395  with self._datalock:
396  self._publishdata_queue.append(data) # will be used to send publish answers from server
397  if self._publish_result_queue:
398  result = self._publish_result_queue.pop(0)
399  self.forward_publish_response(result)
400  self.session.publish(params.SubscriptionAcknowledgements)
401  self.logger.info("publish forward to server")
402 
403  elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
404  self.logger.info("re-publish request")
405 
406  params = ua.RepublishParameters.from_binary(body)
407  msg = self.session.republish(params)
408 
409  response = ua.RepublishResponse()
410  response.NotificationMessage = msg
411 
412  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
413 
414  elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
415  self.logger.info("close secure channel request")
416  self._connection.close()
417  response = ua.CloseSecureChannelResponse()
418  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
419  return False
420 
421  elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
422  self.logger.info("call request")
423 
424  params = ua.CallParameters.from_binary(body)
425 
426  results = self.session.call(params.MethodsToCall)
427 
428  response = ua.CallResponse()
429  response.Results = results
430 
431  self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
432 
433  else:
434  self.logger.warning("Unknown message received %s", typeid)
435  raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)
436 
437  return True
438 
439  def close(self):
440  """
441  to be called when client has disconnected to ensure we really close
442  everything we should
443  """
444  print("Cleanup client connection: ", self.name)
445  if self.session:
446  self.session.close_session(True)
def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body)
Definition: uaprocessor.py:117
def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage)
Definition: uaprocessor.py:37
def process_message(self, algohdr, seqhdr, body)
Definition: uaprocessor.py:104
def set_policies(self, policies)
Definition: uaprocessor.py:34
def __init__(self, internal_server, socket)
Definition: uaprocessor.py:21
def process(self, header, body)
Definition: uaprocessor.py:76
def open_secure_channel(self, algohdr, seqhdr, body)
Definition: uaprocessor.py:45
def forward_publish_response(self, result)
Definition: uaprocessor.py:57


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44