opc_tcp_processor.cpp
Go to the documentation of this file.
1 
11 #include "opc_tcp_processor.h"
12 
13 #include "opcua_protocol.h"
14 
15 #include <opc/common/uri_facade.h>
17 #include <opc/ua/node.h>
30 
31 #include <chrono>
32 #include <iostream>
33 #include <list>
34 #include <mutex>
35 #include <stdexcept>
36 #include <sstream>
37 #include <queue>
38 
39 
40 namespace OpcUa
41 {
42 namespace Server
43 {
44 
45 using namespace OpcUa::Binary;
46 
47 OpcTcpMessages::OpcTcpMessages(OpcUa::Services::SharedPtr server, OpcUa::OutputChannel::SharedPtr outputChannel, const Common::Logger::SharedPtr & logger)
48  : Server(server)
49  , OutputChannel(outputChannel)
50  // do not create a reference loop - if OutputStream is called with a
51  // shared_ptr it holds a strong reference to it! So call it with a dereferenced
52  // pointer
53  , OutputStream(*outputChannel)
54  , Logger(logger)
55  , ChannelId(1)
56  , TokenId(2)
57  , SessionId(GenerateSessionId())
58  , SequenceNb(0)
59 {
60  LOG_INFO(Logger, "opc_tcp_processor | log level: {}", Logger->level());
61  LOG_INFO(Logger, "opc_tcp_processor | SessionId; {}", SessionId);
62 }
63 
64 
66 {
67  // This is a hack, we cannot leave subscriptions running since they have a callback to us
68  try
69  {
71  }
72 
73  catch (const std::exception & exc)
74  {
75  LOG_ERROR(Logger, "opc_tcp_processor | stopping OpcTcpMessages failed: {}", exc.what());
76  }
77 }
78 
80 {
81  std::lock_guard<std::mutex> lock(ProcessMutex);
82 
83  switch (msgType)
84  {
85  case MT_HELLO:
86  {
87  LOG_DEBUG(Logger, "opc_tcp_processor | accepted hello message");
88 
89  HelloClient(iStream, OutputStream);
90  break;
91  }
92 
93 
94  case MT_SECURE_OPEN:
95  {
96  LOG_DEBUG(Logger, "opc_tcp_processor | opening secure channel");
97 
98  OpenChannel(iStream, OutputStream);
99  break;
100  }
101 
102  case MT_SECURE_CLOSE:
103  {
104  LOG_DEBUG(Logger, "opc_tcp_processor | closing secure channel");
105 
106  CloseChannel(iStream);
107  return false;
108  }
109 
110  case MT_SECURE_MESSAGE:
111  {
112  LOG_DEBUG(Logger, "opc_tcp_processor | processing secure message");
113 
114  ProcessRequest(iStream, OutputStream);
115  break;
116  }
117 
118  case MT_ACKNOWLEDGE:
119  {
120  LOG_ERROR(Logger, "opc_tcp_processor | received acknowledge from client: this should not have happend...");
121 
122  throw std::logic_error("Thank to client about acknowledge.");
123  }
124 
125  case MT_ERROR:
126  {
127  LOG_ERROR(Logger, "opc_tcp_processor | client signaled an error");
128 
129  throw std::logic_error("It is very nice get to know server about error in the client.");
130  }
131 
132  default:
133  {
134  LOG_ERROR(Logger, "opc_tcp_processor | unknown message type '{}' received", msgType);
135 
136  throw std::logic_error("unknown message type received.");
137  }
138  }
139 
140  return true;
141 }
142 
144 {
145  std::lock_guard<std::mutex> lock(ProcessMutex);
146 
147  LOG_DEBUG(Logger, "opc_tcp_processor | sending PublishResult to client");
148 
149  // get a shared_ptr from weak_ptr to make sure OutputChannel
150  // does not get deleted before end of operation
151  OpcUa::OutputChannel::SharedPtr outputChannel = OutputChannel.lock();
152  // test if OutputChannel was still active when calling lock
153  if (!outputChannel) {
154  LOG_WARN(Logger, "opc_tcp_processor | parent instance already deleted");
155  return;
156  }
157 
158  if (PublishRequestQueue.empty())
159  {
160  LOG_WARN(Logger, "error trying to send publish response while we do not have data from a PublishRequest");
161  return;
162  }
163 
164  PublishRequestElement requestData = PublishRequestQueue.front();
165  PublishRequestQueue.pop();
166 
167  PublishResponse response;
168 
169  FillResponseHeader(requestData.requestHeader, response.Header);
170  response.Parameters = result;
171 
172  requestData.sequence.SequenceNumber = ++SequenceNb;
173 
175  secureHeader.AddSize(RawSize(requestData.algorithmHeader));
176  secureHeader.AddSize(RawSize(requestData.sequence));
177  secureHeader.AddSize(RawSize(response));
178 
179  LOG_DEBUG(Logger, "opc_tcp_processor | sending PublishResponse with: {} PublishResults", response.Parameters.NotificationMessage.NotificationData.size());
180 
181  OutputStream << secureHeader << requestData.algorithmHeader << requestData.sequence << response << flush;
182 }
183 
185 {
186  using namespace OpcUa::Binary;
187 
188  LOG_DEBUG(Logger, "opc_tcp_processor | reading hello message");
189 
190  Hello hello;
191  istream >> hello;
192 
193  Acknowledge ack;
194  ack.ReceiveBufferSize = hello.ReceiveBufferSize;
195  ack.SendBufferSize = hello.SendBufferSize;
196  ack.MaxMessageSize = hello.MaxMessageSize;
197  ack.MaxChunkCount = 1;
198 
199  Header ackHeader(MT_ACKNOWLEDGE, CHT_SINGLE);
200  ackHeader.AddSize(RawSize(ack));
201 
202  LOG_DEBUG(Logger, "opc_tcp_processor | sending answer");
203 
204  ostream << ackHeader << ack << flush;
205 }
206 
208 {
209  uint32_t channelId = 0;
210  istream >> channelId;
211  AsymmetricAlgorithmHeader algorithmHeader;
212  istream >> algorithmHeader;
213 
214  if (algorithmHeader.SecurityPolicyUri != "http://opcfoundation.org/UA/SecurityPolicy#None")
215  {
216  throw std::logic_error(std::string("Client want to create secure channel with unsupported policy '") + algorithmHeader.SecurityPolicyUri + std::string("'"));
217  }
218 
219  SequenceHeader sequence;
220  istream >> sequence;
221 
222  OpenSecureChannelRequest request;
223  istream >> request;
224 
225  if (request.Parameters.SecurityMode != MessageSecurityMode::None)
226  {
227  throw std::logic_error("Unsupported security mode.");
228  }
229 
230  if (request.Parameters.RequestType == SecurityTokenRequestType::Renew)
231  {
232  //FIXME:Should check that channel has been issued first
233  ++TokenId;
234  }
235 
236  sequence.SequenceNumber = ++SequenceNb;
237 
238  OpenSecureChannelResponse response;
239  FillResponseHeader(request.Header, response.Header);
243  response.ChannelSecurityToken.RevisedLifetime = request.Parameters.RequestLifeTime;
244 
246  responseHeader.AddSize(RawSize(algorithmHeader));
247  responseHeader.AddSize(RawSize(sequence));
248  responseHeader.AddSize(RawSize(response));
249  ostream << responseHeader << algorithmHeader << sequence << response << flush;
250 }
251 
253 {
254  uint32_t channelId = 0;
255  istream >> channelId;
256 
257  SymmetricAlgorithmHeader algorithmHeader;
258  istream >> algorithmHeader;
259 
260  SequenceHeader sequence;
261  istream >> sequence;
262 
264  istream >> request;
265 }
266 
268 {
269  uint32_t channelId = 0;
270  istream >> channelId;
271 
272  SymmetricAlgorithmHeader algorithmHeader;
273  istream >> algorithmHeader;
274 
275  SequenceHeader sequence;
276  istream >> sequence;
277 
278  NodeId typeId;
279  istream >> typeId;
280 
281  RequestHeader requestHeader;
282  istream >> requestHeader;
283 
284  sequence.SequenceNumber = ++SequenceNb;
285  /*
286  const std::size_t receivedSize =
287  RawSize(channelId) +
288  RawSize(algorithmHeader) +
289  RawSize(sequence) +
290  RawSize(typeId) +
291  RawSize(requestHeader);
292  */
293  const OpcUa::MessageId message = GetMessageId(typeId);
294 
295  switch (message)
296  {
298  {
299  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Get Endpoints' request");
300 
301  GetEndpointsParameters filter;
302  istream >> filter;
303 
304  GetEndpointsResponse response;
305  FillResponseHeader(requestHeader, response.Header);
306  response.Endpoints = Server->Endpoints()->GetEndpoints(filter);
307 
309  secureHeader.AddSize(RawSize(algorithmHeader));
310  secureHeader.AddSize(RawSize(sequence));
311  secureHeader.AddSize(RawSize(response));
312  ostream << secureHeader << algorithmHeader << sequence << response << flush;
313  return;
314  }
315 
317  {
318  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Find Servers' request");
319 
320  FindServersParameters params;
321  istream >> params;
322 
323  FindServersResponse response;
324  FillResponseHeader(requestHeader, response.Header);
325  response.Data.Descriptions = Server->Endpoints()->FindServers(params);
326 
328  secureHeader.AddSize(RawSize(algorithmHeader));
329  secureHeader.AddSize(RawSize(sequence));
330  secureHeader.AddSize(RawSize(response));
331  ostream << secureHeader << algorithmHeader << sequence << response << flush;
332  return;
333  }
334 
336  {
337  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Browse' request");
338 
339  NodesQuery query;
340  istream >> query;
341 
342  BrowseResponse response;
343  response.Results = Server->Views()->Browse(query);
344 
345  FillResponseHeader(requestHeader, response.Header);
346 
348  secureHeader.AddSize(RawSize(algorithmHeader));
349  secureHeader.AddSize(RawSize(sequence));
350  secureHeader.AddSize(RawSize(response));
351  ostream << secureHeader << algorithmHeader << sequence << response << flush;
352  return;
353  }
354 
355  case OpcUa::READ_REQUEST:
356  {
357  ReadParameters params;
358  istream >> params;
359 
360  if (Logger && Logger->should_log(spdlog::level::debug))
361  {
362  Logger->debug("opc_tcp_processor | processing 'Read' request for Node:");
363 
364  for (ReadValueId id : params.AttributesToRead)
365  {
366  std::string name = "unknown";
367  {
368  Node node(Server, id.NodeId);
369  name = node.GetBrowseName().Name;
370  }
371  Logger->debug("opc_tcp_processor | {} ({})", id.NodeId, name);
372  }
373  }
374 
375  ReadResponse response;
376  FillResponseHeader(requestHeader, response.Header);
377  std::vector<DataValue> values;
378 
379  if (std::shared_ptr<OpcUa::AttributeServices> service = Server->Attributes())
380  {
381  values = service->Read(params);
382  }
383 
384  else
385  {
386  for (auto attribId : params.AttributesToRead)
387  {
388  DataValue value;
391  values.push_back(value);
392  }
393  }
394 
395  response.Results = values;
396 
398  secureHeader.AddSize(RawSize(algorithmHeader));
399  secureHeader.AddSize(RawSize(sequence));
400  secureHeader.AddSize(RawSize(response));
401  ostream << secureHeader << algorithmHeader << sequence << response << flush;
402 
403  return;
404  }
405 
407  {
408  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Write' request");
409 
410  WriteParameters params;
411  istream >> params;
412 
413  WriteResponse response;
414  FillResponseHeader(requestHeader, response.Header);
415  std::vector<DataValue> values;
416 
417  if (std::shared_ptr<OpcUa::AttributeServices> service = Server->Attributes())
418  {
419  response.Results = service->Write(params.NodesToWrite);
420  }
421 
422  else
423  {
424  response.Results = std::vector<StatusCode>(params.NodesToWrite.size(), OpcUa::StatusCode::BadNotImplemented);
425  }
426 
428  secureHeader.AddSize(RawSize(algorithmHeader));
429  secureHeader.AddSize(RawSize(sequence));
430  secureHeader.AddSize(RawSize(response));
431  ostream << secureHeader << algorithmHeader << sequence << response << flush;
432 
433  return;
434  }
435 
437  {
438  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Translate Browse Paths To Node Ids' request");
439 
441  istream >> params;
442 
443  if (Logger && Logger->should_log(spdlog::level::debug))
444  {
445  for (BrowsePath path : params.BrowsePaths)
446  {
447  std::stringstream result;
448  result << path.StartingNode << ":";
449 
450  for (RelativePathElement el : path.Path.Elements)
451  {
452  result << "/" << el.TargetName ;
453  }
454  Logger->debug("opc_tcp_processor | requested path is: {}", result.str());
455  }
456  }
457 
458  std::vector<BrowsePathResult> result = Server->Views()->TranslateBrowsePathsToNodeIds(params);
459 
460  if (Logger && Logger->should_log(spdlog::level::debug))
461  {
462  for (BrowsePathResult res : result)
463  {
464  std::stringstream target;
465  for (BrowsePathTarget path : res.Targets)
466  {
467  target << path.Node ;
468  }
469  Logger->debug("opc_tcp_processor | result of browsePath is: {}, target is: {}", (uint32_t)res.Status, target.str());
470  }
471  }
472 
474  FillResponseHeader(requestHeader, response.Header);
475  response.Result.Paths = result;
477  secureHeader.AddSize(RawSize(algorithmHeader));
478  secureHeader.AddSize(RawSize(sequence));
479  secureHeader.AddSize(RawSize(response));
480 
481  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Translate Browse Paths To Node Ids' request");
482 
483  ostream << secureHeader << algorithmHeader << sequence << response << flush;
484  return;
485  }
486 
487 
489  {
490  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Create Session' request");
491 
493  istream >> params;
494 
495  CreateSessionResponse response;
496  FillResponseHeader(requestHeader, response.Header);
497 
498  response.Parameters.SessionId = SessionId;
500  response.Parameters.RevisedSessionTimeout = params.RequestedSessionTimeout;
501  response.Parameters.MaxRequestMessageSize = 65536;
503  response.Parameters.ServerEndpoints = Server->Endpoints()->GetEndpoints(epf);
504 
505 
507  secureHeader.AddSize(RawSize(algorithmHeader));
508  secureHeader.AddSize(RawSize(sequence));
509  secureHeader.AddSize(RawSize(response));
510  ostream << secureHeader << algorithmHeader << sequence << response << flush;
511 
512  return;
513  }
514 
516  {
517  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Activate Session' request");
518 
520  istream >> params;
521 
522  ActivateSessionResponse response;
523  FillResponseHeader(requestHeader, response.Header);
524 
526  secureHeader.AddSize(RawSize(algorithmHeader));
527  secureHeader.AddSize(RawSize(sequence));
528  secureHeader.AddSize(RawSize(response));
529  ostream << secureHeader << algorithmHeader << sequence << response << flush;
530  return;
531  }
532 
534  {
535  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Close Session' request");
536 
537  bool deleteSubscriptions = false;
538  istream >> deleteSubscriptions;
539 
540  if (deleteSubscriptions)
541  {
543  }
544 
545  CloseSessionResponse response;
546  FillResponseHeader(requestHeader, response.Header);
547 
549  secureHeader.AddSize(RawSize(algorithmHeader));
550  secureHeader.AddSize(RawSize(sequence));
551  secureHeader.AddSize(RawSize(response));
552  ostream << secureHeader << algorithmHeader << sequence << response << flush;
553 
554  LOG_DEBUG(Logger, "opc_tcp_processor | session closed");
555 
556  return;
557  }
558 
560  {
561  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Create Subscription' request");
562 
564  istream >> request.Parameters;
565  request.Header = requestHeader;
566 
568  FillResponseHeader(requestHeader, response.Header);
569 
570  SharedPtr self = shared_from_this();
571  response.Data = Server->Subscriptions()->CreateSubscription(request, [self](PublishResult i)
572  {
573  try
574  {
575  self->ForwardPublishResponse(i);
576  }
577 
578  catch (std::exception & ex)
579  {
580  // TODO Disconnect client!
581  LOG_WARN(self->Logger, "error forwarding PublishResult to client: {}", ex.what());
582  }
583  });
584 
585  Subscriptions.push_back(response.Data.SubscriptionId); //Keep a link to eventually delete subcriptions when exiting
586 
588  secureHeader.AddSize(RawSize(algorithmHeader));
589  secureHeader.AddSize(RawSize(sequence));
590  secureHeader.AddSize(RawSize(response));
591  ostream << secureHeader << algorithmHeader << sequence << response << flush;
592  return;
593  }
594 
596  {
597  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Modify Subscription' request");
598 
600  istream >> request.Parameters;
601  request.Header = requestHeader;
602 
603  ModifySubscriptionResponse response = Server->Subscriptions()->ModifySubscription(request.Parameters);
604  FillResponseHeader(requestHeader, response.Header);
605 
607  secureHeader.AddSize(RawSize(algorithmHeader));
608  secureHeader.AddSize(RawSize(sequence));
609  secureHeader.AddSize(RawSize(response));
610 
611  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Modify Subscription' request");
612 
613  ostream << secureHeader << algorithmHeader << sequence << response << flush;
614  return;
615  }
616 
618  {
619  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Delete Subscription' request");
620 
621  std::vector<uint32_t> ids;
622  istream >> ids;
623 
624  DeleteSubscriptions(ids); //remove from locale subscription lis
625 
627  FillResponseHeader(requestHeader, response.Header);
628 
629  response.Results = Server->Subscriptions()->DeleteSubscriptions(ids);
630 
632  secureHeader.AddSize(RawSize(algorithmHeader));
633  secureHeader.AddSize(RawSize(sequence));
634  secureHeader.AddSize(RawSize(response));
635 
636  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Delete Subscription' request");
637 
638  ostream << secureHeader << algorithmHeader << sequence << response << flush;
639  return;
640  }
641 
643  {
644  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Create Monitored Items' request");
645 
647  istream >> params;
648 
650 
651  response.Results = Server->Subscriptions()->CreateMonitoredItems(params);
652 
653  FillResponseHeader(requestHeader, response.Header);
655  secureHeader.AddSize(RawSize(algorithmHeader));
656  secureHeader.AddSize(RawSize(sequence));
657  secureHeader.AddSize(RawSize(response));
658 
659  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Create Monitored Items' request");
660 
661  ostream << secureHeader << algorithmHeader << sequence << response << flush;
662  return;
663  }
664 
666  {
667  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Delete Monitored Items' request");
668 
670  istream >> params;
671 
673 
674  response.Results = Server->Subscriptions()->DeleteMonitoredItems(params);
675 
676  FillResponseHeader(requestHeader, response.Header);
678  secureHeader.AddSize(RawSize(algorithmHeader));
679  secureHeader.AddSize(RawSize(sequence));
680  secureHeader.AddSize(RawSize(response));
681 
682  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Delete Monitored Items' request");
683 
684  ostream << secureHeader << algorithmHeader << sequence << response << flush;
685  return;
686  }
687 
688  case PUBLISH_REQUEST:
689  {
690  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Publish' request");
691 
692  PublishRequest request;
693  request.Header = requestHeader;
694  istream >> request.SubscriptionAcknowledgements;
695 
697  data.sequence = sequence;
698  data.algorithmHeader = algorithmHeader;
699  data.requestHeader = requestHeader;
700  PublishRequestQueue.push(data);
701  Server->Subscriptions()->Publish(request);
702 
703  --SequenceNb; //We do not send response, so do not increase sequence
704 
705  return;
706  }
707 
709  {
710  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Set Publishing Mode' request");
711 
713  istream >> params;
714 
715  //FIXME: forward request to internal server!!
716  SetPublishingModeResponse response;
717  FillResponseHeader(requestHeader, response.Header);
718  response.Result.Results.resize(params.SubscriptionIds.size(), StatusCode::Good);
719 
721  secureHeader.AddSize(RawSize(algorithmHeader));
722  secureHeader.AddSize(RawSize(sequence));
723  secureHeader.AddSize(RawSize(response));
724 
725  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Set Publishing Mode' request");
726 
727  ostream << secureHeader << algorithmHeader << sequence << response << flush;
728  return;
729  }
730 
731  case ADD_NODES_REQUEST:
732  {
733  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Add Nodes' request");
734 
735  AddNodesParameters params;
736  istream >> params;
737 
738  std::vector<AddNodesResult> results = Server->NodeManagement()->AddNodes(params.NodesToAdd);
739 
740  AddNodesResponse response;
741  FillResponseHeader(requestHeader, response.Header);
742  response.results = results;
743 
745  secureHeader.AddSize(RawSize(algorithmHeader));
746  secureHeader.AddSize(RawSize(sequence));
747  secureHeader.AddSize(RawSize(response));
748 
749  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Add Nodes' request");
750 
751  ostream << secureHeader << algorithmHeader << sequence << response << flush;
752  return;
753  }
754 
756  {
757  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Add References' request");
758 
760  istream >> params;
761 
762  std::vector<StatusCode> results = Server->NodeManagement()->AddReferences(params.ReferencesToAdd);
763 
764  AddReferencesResponse response;
765  FillResponseHeader(requestHeader, response.Header);
766  response.Results = results;
767 
769  secureHeader.AddSize(RawSize(algorithmHeader));
770  secureHeader.AddSize(RawSize(sequence));
771  secureHeader.AddSize(RawSize(response));
772 
773  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Add References' request");
774 
775  ostream << secureHeader << algorithmHeader << sequence << response << flush;
776  return;
777  }
778 
779  case REPUBLISH_REQUEST:
780  {
781  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Republish' request");
782 
783  RepublishParameters params;
784  istream >> params;
785 
786  //Not implemented so we just say we do not have that notification
787  RepublishResponse response;
788  FillResponseHeader(requestHeader, response.Header);
790 
792  secureHeader.AddSize(RawSize(algorithmHeader));
793  secureHeader.AddSize(RawSize(sequence));
794  secureHeader.AddSize(RawSize(response));
795 
796  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Republish' request");
797 
798  ostream << secureHeader << algorithmHeader << sequence << response << flush;
799  return;
800  }
801 
802  case CALL_REQUEST:
803  {
804  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Call' request");
805 
806  CallParameters params;
807  istream >> params;
808 
809  CallResponse response;
810  FillResponseHeader(requestHeader, response.Header);
811 
812  if (std::shared_ptr<OpcUa::MethodServices> service = Server->Method())
813  {
814  response.Results = service->Call(params.MethodsToCall);
815  }
816 
817  else
818  {
819  for (auto callMethodRequest : params.MethodsToCall)
820  {
823  response.Results.push_back(result);
824  }
825  }
826 
828  secureHeader.AddSize(RawSize(algorithmHeader));
829  secureHeader.AddSize(RawSize(sequence));
830  secureHeader.AddSize(RawSize(response));
831  ostream << secureHeader << algorithmHeader << sequence << response << flush;
832 
833  return;
834  }
835 
837  {
838  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Register Nodes' request");
839 
840  RegisterNodesRequest request;
841 
842  istream >> request.NodesToRegister;
843 
844  RegisterNodesResponse response;
845  response.Result = Server->Views()->RegisterNodes(request.NodesToRegister);
846 
847  FillResponseHeader(requestHeader, response.Header);
848 
850  secureHeader.AddSize(RawSize(algorithmHeader));
851  secureHeader.AddSize(RawSize(sequence));
852  secureHeader.AddSize(RawSize(response));
853 
854  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Register Nodes' request");
855 
856  ostream << secureHeader << algorithmHeader << sequence << response << flush;
857  return;
858  }
859 
861  {
862  LOG_DEBUG(Logger, "opc_tcp_processor | processing 'Unregister Nodes' request");
863 
864  UnregisterNodesRequest request;
865 
866  istream >> request.NodesToUnregister;
867 
868  UnregisterNodesResponse response;
869  Server->Views()->UnregisterNodes(request.NodesToUnregister);
870 
871  FillResponseHeader(requestHeader, response.Header);
872 
874  secureHeader.AddSize(RawSize(algorithmHeader));
875  secureHeader.AddSize(RawSize(sequence));
876  secureHeader.AddSize(RawSize(response));
877 
878  LOG_DEBUG(Logger, "opc_tcp_processor | sending response to 'Unregister Nodes' request");
879 
880  ostream << secureHeader << algorithmHeader << sequence << response << flush;
881  return;
882  }
883 
884  default:
885  {
886  ServiceFaultResponse response;
887  FillResponseHeader(requestHeader, response.Header);
889 
891  secureHeader.AddSize(RawSize(algorithmHeader));
892  secureHeader.AddSize(RawSize(sequence));
893  secureHeader.AddSize(RawSize(response));
894 
895  LOG_WARN(Logger, "opc_tcp_processor | sending 'ServiceFaultResponse' to unsupported request of id: {}", message);
896 
897  ostream << secureHeader << algorithmHeader << sequence << response << flush;
898  return;
899  }
900  }
901 }
902 
903 void OpcTcpMessages::FillResponseHeader(const RequestHeader & requestHeader, ResponseHeader & responseHeader)
904 {
905  //responseHeader.InnerDiagnostics.push_back(DiagnosticInfo());
906  responseHeader.Timestamp = DateTime::Current();
907  responseHeader.RequestHandle = requestHeader.RequestHandle;
908 }
909 
911 {
912  std::vector<uint32_t> subs;
913 
914  for (const uint32_t & subid : Subscriptions)
915  {
916  subs.push_back(subid);
917  }
918 
919  Server->Subscriptions()->DeleteSubscriptions(subs);
920  Subscriptions.clear();
921 }
922 
923 void OpcTcpMessages::DeleteSubscriptions(const std::vector<uint32_t> & ids)
924 {
925  for (auto id : ids)
926  {
927  Subscriptions.erase(std::remove_if(Subscriptions.begin(), Subscriptions.end(),
928  [&](const uint32_t d) { return (d == id) ; }), Subscriptions.end());
929  }
930 }
931 
932 } // namespace UaServer
933 } // namespace OpcUa
d
std::vector< ApplicationDescription > Descriptions
OpcUa::ResponseHeader Header
std::vector< OpcUa::NodeId > NodesToRegister
#define LOG_WARN(__logger__,...)
Definition: common/logger.h:26
QualifiedName GetBrowseName() const
Definition: node.cpp:227
ResponseHeader Header
Definition: session.h:36
OpcUa::CreateSubscriptionParameters Parameters
std::vector< AddNodesResult > results
std::vector< OpcUa::EndpointDescription > ServerEndpoints
std::vector< OpcUa::NotificationData > NotificationData
const uint8_t DATA_VALUE_STATUS_CODE
Definition: data_value.h:19
std::vector< OpcUa::StatusCode > Results
OpcUa::ResponseHeader Header
#define LOG_ERROR(__logger__,...)
Definition: common/logger.h:27
StatusCode ServiceResult
Definition: types.h:259
#define LOG_DEBUG(__logger__,...)
Definition: common/logger.h:24
std::vector< StatusCode > Results
void CloseChannel(Binary::IStreamBinary &istream)
OpcUa::NodeId AuthenticationToken
OpcUa::ResponseHeader Header
name
Definition: setup.py:38
uint32_t RequestHandle
Definition: types.h:258
std::vector< OpcUa::StatusCode > Results
OpcUa::CreateSessionResult Parameters
OStream< ChannelType > & flush(OStream< ChannelType > &os)
Definition: stream.h:147
OpcUa::ResponseHeader Header
bool ProcessMessage(Binary::MessageType msgType, Binary::IStreamBinary &iStream)
void FillResponseHeader(const RequestHeader &requestHeader, ResponseHeader &responseHeader)
OpcUa::Binary::OStreamBinary OutputStream
std::vector< BrowseResult > Results
Definition: protocol/view.h:92
OpcUa::ResponseHeader Header
Common::Logger::SharedPtr Logger
std::vector< OpcUa::DataValue > Results
#define LOG_INFO(__logger__,...)
Definition: common/logger.h:25
message
Definition: server.py:50
OpcUa::ResponseHeader Header
std::size_t AddSize(std::size_t size)
std::vector< OpcUa::MonitoredItemCreateResult > Results
std::vector< RelativePathElement > Elements
Definition: types.h:125
std::vector< BrowsePathResult > Paths
RelativePath Path
OpcUa::Services::SharedPtr Server
OPC UA Address space part. GNU LGPL.
OpcUa::PublishResult Parameters
OpcUa::ResponseHeader Header
std::vector< OpcUa::StatusCode > Results
void ForwardPublishResponse(const PublishResult response)
uint32_t RequestHandle
Definition: types.h:190
OpcUa::PublishingModeResult Result
A Node object represent an OPC-UA node. It is high level object intended for developper who want to e...
Definition: node.h:42
std::string Name
Definition: types.h:74
std::size_t AddSize(std::size_t size)
OpcUa::ResponseHeader Header
OpcUa::SubscriptionData Data
void OpenChannel(Binary::IStreamBinary &istream, Binary::OStreamBinary &ostream)
static DateTime Current()
std::queue< PublishRequestElement > PublishRequestQueue
std::vector< OpcUa::NodeId > NodesToUnregister
OpcUa::ModifySubscriptionParameters Parameters
OpcTcpMessages(OpcUa::Services::SharedPtr server, OpcUa::OutputChannel::SharedPtr outputChannel, const Common::Logger::SharedPtr &logger)
std::vector< OpcUa::CallMethodResult > Results
DateTime Timestamp
Definition: types.h:257
OpcUa::NotificationMessage NotificationMessage
std::vector< OpcUa::NodeId > Result
void DeleteSubscriptions(const std::vector< uint32_t > &ids)
uint8_t Encoding
Definition: data_value.h:28
ExpandedNodeId GenerateSessionId()
Definition: session.cpp:5
OpcUa::ResponseHeader Header
OpcUa::StatusCode Status
std::vector< OpcUa::SubscriptionAcknowledgement > SubscriptionAcknowledgements
StatusCode Status
Definition: data_value.h:30
ResponseHeader Header
Definition: session.h:46
OpcUa::ResponseHeader Header
ResponseHeader Header
Definition: protocol/view.h:90
std::list< uint32_t > Subscriptions
void ProcessRequest(Binary::IStreamBinary &istream, Binary::OStreamBinary &ostream)
const char Server[]
Definition: strings.h:121
OpcUa::ResponseHeader Header
OpcUa::RequestHeader Header
MessageId GetMessageId(const NodeId &id)
Definition: nodeid.cpp:301
Definition: server.py:1
void HelloClient(Binary::IStreamBinary &istream, Binary::OStreamBinary &ostream)
std::vector< OpcUa::EndpointDescription > Endpoints
QualifiedName TargetName
Definition: types.h:120
std::size_t RawSize(const T &obj)
std::vector< OpcUa::StatusCode > Results


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:12:07