binary_client.cpp
Go to the documentation of this file.
1 
11 #include <opc/ua/protocol/utils.h>
14 
15 #include <opc/common/uri_facade.h>
22 
23 #include <atomic>
24 #include <chrono>
25 #include <condition_variable>
26 #include <mutex>
27 #include <queue>
28 #include <thread>
29 #include <iostream>
30 
31 
32 namespace
33 {
34 
35 using namespace OpcUa;
36 using namespace OpcUa::Binary;
37 
38 typedef std::map<uint32_t, std::function<void (PublishResult)>> SubscriptionCallbackMap;
39 
40 class BufferInputChannel : public OpcUa::InputChannel
41 {
42 public:
43  BufferInputChannel(const std::vector<char> & buffer)
44  : Buffer(buffer)
45  , Pos(0)
46  {
47  Reset();
48  }
49 
50  virtual std::size_t Receive(char * data, std::size_t size)
51  {
52  if (Pos >= Buffer.size())
53  {
54  return 0;
55  }
56 
57  size = std::min(size, Buffer.size() - Pos);
58  std::vector<char>::const_iterator begin = Buffer.begin() + Pos;
59  std::vector<char>::const_iterator end = begin + size;
60  std::copy(begin, end, data);
61  Pos += size;
62  return size;
63  }
64 
65  void Reset()
66  {
67  Pos = 0;
68  }
69 
70  virtual void Stop()
71  {
72  }
73 
74 private:
75  const std::vector<char> & Buffer;
76  std::size_t Pos;
77 };
78 
79 
80 template <typename T>
81 class RequestCallback
82 {
83 public:
84  RequestCallback(const Common::Logger::SharedPtr & logger)
85  : Logger(logger)
86  , lock(m)
87  {
88  }
89 
90  void OnData(std::vector<char> data, ResponseHeader h)
91  {
92  //std::cout << ToHexDump(data);
93  Data = std::move(data);
94  this->header = std::move(h);
95  doneEvent.notify_all();
96  }
97 
98  T WaitForData(std::chrono::milliseconds msec)
99  {
100  if (doneEvent.wait_for(lock, msec) == std::cv_status::timeout)
101  { throw std::runtime_error("Response timed out"); }
102 
103  T result;
104  result.Header = std::move(this->header);
105 
106  if (Data.empty())
107  {
108  LOG_WARN(Logger, "binary_client | received empty packet from server");
109  }
110 
111  else
112  {
113  BufferInputChannel bufferInput(Data);
114  IStreamBinary in(bufferInput);
115  in >> result;
116  }
117 
118  return result;
119  }
120 
121 private:
122  Common::Logger::SharedPtr Logger;
123  std::vector<char> Data;
125  std::mutex m;
126  std::unique_lock<std::mutex> lock;
127  std::condition_variable doneEvent;
128 };
129 
130 class CallbackThread
131 {
132 public:
133  CallbackThread(const Common::Logger::SharedPtr & logger = nullptr)
134  : Logger(logger)
135  , StopRequest(false)
136  {
137 
138  }
139 
140  void post(std::function<void()> callback)
141  {
142  LOG_DEBUG(Logger, "binary_client | CallbackThread: post -->");
143 
144  std::unique_lock<std::mutex> lock(Mutex);
145  Queue.push(callback);
146  Condition.notify_one();
147 
148  LOG_DEBUG(Logger, "binary_client | CallbackThread: post <--");
149  }
150 
151  void Run()
152  {
153  while (true)
154  {
155  LOG_DEBUG(Logger, "binary_client | CallbackThread: waiting for next post");
156 
157  std::unique_lock<std::mutex> lock(Mutex);
158  Condition.wait(lock, [&]() { return (StopRequest == true) || (! Queue.empty()) ;});
159 
160  if (StopRequest)
161  {
162  LOG_DEBUG(Logger, "binary_client | CallbackThread: exited");
163 
164  return;
165  }
166 
167  while (!Queue.empty()) //to avoid crashing on spurious events
168  {
169  std::function<void()> callback = Queue.front();
170  Queue.pop();
171  lock.unlock();
172 
173  LOG_DEBUG(Logger, "binary_client | CallbackThread: calling callback");
174  callback();
175  LOG_DEBUG(Logger, "binary_client | CallbackThread: callback finished");
176 
177  lock.lock();
178  }
179  }
180  }
181 
182  void Stop()
183  {
184  LOG_DEBUG(Logger, "binary_client | CallbackThread: stopping");
185 
186  StopRequest = true;
187  Condition.notify_all();
188  }
189 
190 private:
191  Common::Logger::SharedPtr Logger;
192  std::mutex Mutex;
193  std::condition_variable Condition;
194  std::atomic<bool> StopRequest;
195  std::queue<std::function<void()>> Queue;
196 };
197 
198 class BinaryClient
199  : public Services
200  , public AttributeServices
201  , public EndpointServices
202  , public MethodServices
203  , public NodeManagementServices
204  , public SubscriptionServices
205  , public ViewServices
206  , public std::enable_shared_from_this<BinaryClient>
207 {
208 private:
209  typedef std::function<void(std::vector<char>, ResponseHeader)> ResponseCallback;
210  typedef std::map<uint32_t, ResponseCallback> CallbackMap;
211  std::vector<char> messageBuffer;
212 
213 public:
214  BinaryClient(std::shared_ptr<IOChannel> channel, const SecureConnectionParams & params, const Common::Logger::SharedPtr & logger)
215  : Channel(channel)
216  , Stream(channel)
217  , Params(params)
218  , SequenceNumber(1)
219  , RequestNumber(1)
220  , RequestHandle(0)
221  , Logger(logger)
222  , CallbackService(logger)
223 
224  {
225  //Initialize the worker thread for subscriptions
226  callback_thread = std::thread([&]() { CallbackService.Run(); });
227  try
228  {
229  HelloServer(params);
230  }
231  catch (...)
232  {
233  CallbackService.Stop();
234  callback_thread.join();
235  throw;
236  }
237 
238  ReceiveThread = std::thread([this]()
239  {
240  try
241  {
242  while (!Finished)
243  { Receive(); }
244  }
245 
246  catch (const std::exception & exc)
247  {
248  if (Finished) { return; }
249 
250  LOG_ERROR(Logger, "binary_client | ReceiveThread: error receiving data: {}", exc.what());
251  }
252  });
253  }
254 
  /// Shuts the client down: raises Finished, stops and joins the callback
  /// worker, stops the channel, then joins the receive thread.
  /// NOTE(review): Channel->Stop() presumably unblocks the read pending in the
  /// receive thread so that the final join can return — confirm.
255  ~BinaryClient()
256  {
     // Tells the receive loop to exit and to stay silent about the resulting read error.
257  Finished = true;
258 
259  LOG_DEBUG(Logger, "binary_client | stopping callback thread");
260 
261  CallbackService.Stop();
262 
263  LOG_DEBUG(Logger, "binary_client | joining service thread");
264 
     // The join is required: destroying a joinable std::thread calls std::terminate.
265  callback_thread.join(); //Not sure it is necessary
266 
267  Channel->Stop();
268 
269  LOG_DEBUG(Logger, "binary_client | joining receive thread");
270 
271  ReceiveThread.join();
272 
273  LOG_DEBUG(Logger, "binary_client | receive thread stopped");
274  }
275 
279  virtual CreateSessionResponse CreateSession(const RemoteSessionParameters & parameters) override
280  {
281  LOG_DEBUG(Logger, "binary_client | CreateSession -->");
282 
283  CreateSessionRequest request;
284  request.Header = CreateRequestHeader();
285 
293 
294  request.Parameters.ServerUri = parameters.ServerURI;
295  request.Parameters.EndpointUrl = parameters.EndpointUrl; // TODO make just endpoint.URL;
296  request.Parameters.SessionName = parameters.SessionName;
297  request.Parameters.ClientNonce = ByteString(std::vector<uint8_t>(32, 0));
299  request.Parameters.RequestedSessionTimeout = parameters.Timeout;
300  request.Parameters.MaxResponseMessageSize = 65536;
301  CreateSessionResponse response = Send<CreateSessionResponse>(request);
302  AuthenticationToken = response.Parameters.AuthenticationToken;
303 
304  LOG_DEBUG(Logger, "binary_client | CreateSession <--");
305 
306  return response;
307  }
308 
309  ActivateSessionResponse ActivateSession(const ActivateSessionParameters & session_parameters) override
310  {
311  LOG_DEBUG(Logger, "binary_client | ActivateSession -->");
312 
313  ActivateSessionRequest request;
314  request.Parameters = session_parameters;
315  request.Parameters.LocaleIds.push_back("en");
316  ActivateSessionResponse response = Send<ActivateSessionResponse>(request);
317 
318  LOG_DEBUG(Logger, "binary_client | ActivateSession <--");
319 
320  return response;
321  }
322 
323  virtual CloseSessionResponse CloseSession() override
324  {
325  LOG_DEBUG(Logger, "binary_client | CloseSession -->");
326 
327  CloseSessionRequest request;
328  CloseSessionResponse response = Send<CloseSessionResponse>(request);
329  RemoveSelfReferences();
330 
331  LOG_DEBUG(Logger, "binary_client | CloseSession <--");
332 
333  return response;
334  }
335 
336  virtual void AbortSession() override
337  {
338  LOG_DEBUG(Logger, "binary_client | AbortSession -->");
339 
340  RemoveSelfReferences();
341 
342  LOG_DEBUG(Logger, "binary_client | AbortSession <--");
343  }
344 
345  DeleteNodesResponse DeleteNodes(const std::vector<OpcUa::DeleteNodesItem> & nodesToDelete) override
346  {
347  LOG_DEBUG(Logger, "binary_client | DeleteNodes -->");
348 
349  DeleteNodesRequest request;
350  request.NodesToDelete = nodesToDelete;
351  DeleteNodesResponse response = Send<DeleteNodesResponse>(request);
352 
353  LOG_DEBUG(Logger, "binary_client | DeleteNodes <--");
354 
355  return response;
356  }
357 
361  virtual std::shared_ptr<AttributeServices> Attributes() override
362  {
363  return shared_from_this();
364  }
365 
366 public:
367  virtual std::vector<DataValue> Read(const ReadParameters & params) const override
368  {
369  LOG_DEBUG(Logger, "binary_client | Read -->");
370  if (Logger && Logger->should_log(spdlog::level::trace))
371  {
372  for (ReadValueId attr : params.AttributesToRead)
373  {
374  Logger->trace("binary_client | Read: node id: {} attr id: {}", attr.NodeId, ToString(attr.AttributeId));
375  }
376  }
377 
378  ReadRequest request;
379  request.Parameters = params;
380  const ReadResponse response = Send<ReadResponse>(request);
381 
382  LOG_DEBUG(Logger, "binary_client | Read <--");
383 
384  return response.Results;
385  }
386 
387  virtual std::vector<OpcUa::StatusCode> Write(const std::vector<WriteValue> & values) override
388  {
389  LOG_DEBUG(Logger, "binary_client | Write -->");
390 
391  WriteRequest request;
392  request.Parameters.NodesToWrite = values;
393  const WriteResponse response = Send<WriteResponse>(request);
394 
395  LOG_DEBUG(Logger, "binary_client | Write <--");
396 
397  return response.Results;
398  }
399 
403  virtual std::shared_ptr<EndpointServices> Endpoints() override
404  {
405  return shared_from_this();
406  }
407 
408  virtual std::vector<ApplicationDescription> FindServers(const FindServersParameters & params) const override
409  {
410  LOG_DEBUG(Logger, "binary_client | FindServers -->");
411 
413  request.Parameters = params;
414  FindServersResponse response = Send<FindServersResponse>(request);
415 
416  LOG_DEBUG(Logger, "binary_client | FindServers <--");
417 
418  return response.Data.Descriptions;
419  }
420 
421  virtual std::vector<EndpointDescription> GetEndpoints(const GetEndpointsParameters & filter) const override
422  {
423  LOG_DEBUG(Logger, "binary_client | GetEndpoints -->");
424 
426  request.Header = CreateRequestHeader();
427  request.Parameters.EndpointUrl = filter.EndpointUrl;
428  request.Parameters.LocaleIds = filter.LocaleIds;
429  request.Parameters.ProfileUris = filter.ProfileUris;
430  const GetEndpointsResponse response = Send<GetEndpointsResponse>(request);
431 
432  LOG_DEBUG(Logger, "binary_client | GetEndpoints <--");
433 
434  return response.Endpoints;
435  }
436 
437  virtual void RegisterServer(const ServerParameters & parameters) override
438  {
439  }
440 
444  virtual std::shared_ptr<MethodServices> Method() override
445  {
446  return shared_from_this();
447  }
448 
449  virtual std::vector<CallMethodResult> Call(const std::vector<CallMethodRequest> & methodsToCall) override
450  {
451  LOG_DEBUG(Logger, "binary_client | Call -->");
452 
453  CallRequest request;
454  request.Parameters.MethodsToCall = methodsToCall;
455  const CallResponse response = Send<CallResponse>(request);
456 
457  LOG_DEBUG(Logger, "binary_client | Call <--");
458 
459  // Manage errors
460 // if (!response.DiagnosticInfos.empty())
461 // {
462 // For now commented out, handling of diagnostic should be probably added for all communication
463 // }
464  return response.Results;
465  }
466 
470 
471  virtual std::shared_ptr<NodeManagementServices> NodeManagement() override
472  {
473  return shared_from_this();
474  }
475 
476  virtual std::vector<AddNodesResult> AddNodes(const std::vector<AddNodesItem> & items) override
477  {
478  LOG_DEBUG(Logger, "binary_client | AddNodes -->");
479 
480  AddNodesRequest request;
481  request.Parameters.NodesToAdd = items;
482  const AddNodesResponse response = Send<AddNodesResponse>(request);
483 
484  LOG_DEBUG(Logger, "binary_client | AddNodes <--");
485 
486  return response.results;
487  }
488 
489  virtual std::vector<StatusCode> AddReferences(const std::vector<AddReferencesItem> & items) override
490  {
491  LOG_DEBUG(Logger, "binary_client | AddReferences -->");
492 
493  AddReferencesRequest request;
494  request.Parameters.ReferencesToAdd = items;
495  const AddReferencesResponse response = Send<AddReferencesResponse>(request);
496 
497  LOG_DEBUG(Logger, "binary_client | AddReferences <--");
498 
499  return response.Results;
500  }
501 
502  virtual void SetMethod(const NodeId & node, std::function<std::vector<OpcUa::Variant> (NodeId context, std::vector<OpcUa::Variant> arguments)> callback) override
503  {
504  LOG_WARN(Logger, "binary_client | SetMethod has no effect on client!");
505 
506  return;
507  }
508 
512  virtual std::shared_ptr<SubscriptionServices> Subscriptions() override
513  {
514  return shared_from_this();
515  }
516 
517  virtual SubscriptionData CreateSubscription(const CreateSubscriptionRequest & request, std::function<void (PublishResult)> callback) override
518  {
519  LOG_DEBUG(Logger, "binary_client | CreateSubscription -->");
520 
521  const CreateSubscriptionResponse response = Send<CreateSubscriptionResponse>(request);
522 
523  LOG_DEBUG(Logger, "binary_client | got CreateSubscriptionResponse");
524 
525  PublishCallbacks[response.Data.SubscriptionId] = callback;// TODO Pass callback to the Publish method.
526 
527  LOG_DEBUG(Logger, "binary_client | CreateSubscription <--");
528 
529  return response.Data;
530  }
531 
532  virtual ModifySubscriptionResponse ModifySubscription(const ModifySubscriptionParameters & parameters) override
533  {
534  LOG_DEBUG(Logger, "binary_client | ModifySubscription -->");
535 
537  request.Parameters = parameters;
538  const ModifySubscriptionResponse response = Send<ModifySubscriptionResponse>(request);
539 
540  LOG_DEBUG(Logger, "binary_client | ModifySubscription <--");
541 
542  return response;
543  }
544 
545  virtual std::vector<StatusCode> DeleteSubscriptions(const std::vector<uint32_t> & subscriptions) override
546  {
547  LOG_DEBUG(Logger, "binary_client | DeleteSubscriptions -->");
548 
550  request.SubscriptionIds = subscriptions;
551  const DeleteSubscriptionsResponse response = Send<DeleteSubscriptionsResponse>(request);
552 
553  LOG_DEBUG(Logger, "binary_client | DeleteSubscriptions <--");
554 
555  return response.Results;
556  }
557 
558  virtual std::vector<MonitoredItemCreateResult> CreateMonitoredItems(const MonitoredItemsParameters & parameters) override
559  {
560  LOG_DEBUG(Logger, "binary_client | CreateMonitoredItems -->");
561  LOG_TRACE(Logger, "binary_client | {}", parameters);
562 
564  request.Parameters = parameters;
565  const CreateMonitoredItemsResponse response = Send<CreateMonitoredItemsResponse>(request);
566 
567  LOG_DEBUG(Logger, "binary_client | CreateMonitoredItems <--");
568 
569  return response.Results;
570  }
571 
572  virtual std::vector<StatusCode> DeleteMonitoredItems(const DeleteMonitoredItemsParameters & params) override
573  {
574  LOG_DEBUG(Logger, "binary_client | DeleteMonitoredItems -->");
575 
577  request.Parameters = params;
578  const DeleteMonitoredItemsResponse response = Send<DeleteMonitoredItemsResponse>(request);
579 
580  LOG_DEBUG(Logger, "binary_client | DeleteMonitoredItems <--");
581 
582  return response.Results;
583  }
584 
585  virtual void Publish(const PublishRequest & originalrequest) override
586  {
587  LOG_DEBUG(Logger, "binary_client | Publish --> request with {} acks", originalrequest.SubscriptionAcknowledgements.size());
588 
589  PublishRequest request(originalrequest);
590  request.Header = CreateRequestHeader();
591  request.Header.Timeout = 0; //We do not want the request to timeout!
592 
593  ResponseCallback responseCallback = [this](std::vector<char> buffer, ResponseHeader h)
594  {
595  LOG_DEBUG(Logger, "binary_client | got publish response, from server");
596 
597  PublishResponse response;
598 
600  {
601  response.Header = std::move(h);
602  }
603 
604  else
605  {
606  BufferInputChannel bufferInput(buffer);
607  IStreamBinary in(bufferInput);
608  in >> response;
609  }
610 
611  CallbackService.post([this, response]()
612  {
614  {
615  LOG_DEBUG(Logger, "binary_client | calling callback for Subscription: {}", response.Parameters.SubscriptionId);
616 
617  SubscriptionCallbackMap::const_iterator callbackIt = this->PublishCallbacks.find(response.Parameters.SubscriptionId);
618 
619  if (callbackIt == this->PublishCallbacks.end())
620  {
621  LOG_WARN(Logger, "binary_client | unknown SubscriptionId {}", response.Parameters.SubscriptionId);
622  }
623 
624  else
625  {
626  try //calling client code, better put it under try/catch otherwise we crash entire client
627  {
628  callbackIt->second(response.Parameters);
629  }
630 
631  catch (const std::exception & ex)
632  {
633  LOG_WARN(Logger, "binary_client | error calling application callback: {}", ex.what());
634  }
635  }
636  }
637 
639  {
640  LOG_WARN(Logger, "binary_client | session is closed");
641  }
642 
643  else
644  {
645  // TODO
646  LOG_DEBUG(Logger, "binary_client | not implemented");
647  }
648  });
649  };
650  std::unique_lock<std::mutex> lock(Mutex);
651  Callbacks.insert(std::make_pair(request.Header.RequestHandle, responseCallback));
652  lock.unlock();
653  Send(request);
654 
655  LOG_DEBUG(Logger, "binary_client | Publish <--");
656  }
657 
658  virtual RepublishResponse Republish(const RepublishParameters & params) override
659  {
660  LOG_DEBUG(Logger, "binary_client | Republish -->");
661 
662  RepublishRequest request;
663  request.Header = CreateRequestHeader();
664  request.Parameters = params;
665 
666  RepublishResponse response = Send<RepublishResponse>(request);
667 
668  LOG_DEBUG(Logger, "binary_client | Republish <--");
669 
670  return response;
671  }
672 
676  virtual std::shared_ptr<ViewServices> Views() override
677  {
678  return shared_from_this();
679  }
680 
681  virtual std::vector<BrowsePathResult> TranslateBrowsePathsToNodeIds(const TranslateBrowsePathsParameters & params) const override
682  {
683  LOG_DEBUG(Logger, "binary_client | TranslateBrowsePathsToNodeIds -->");
684 
686  request.Header = CreateRequestHeader();
687  request.Parameters = params;
688  const TranslateBrowsePathsToNodeIdsResponse response = Send<TranslateBrowsePathsToNodeIdsResponse>(request);
689 
690  LOG_DEBUG(Logger, "binary_client | TranslateBrowsePathsToNodeIds <--");
691 
692  return response.Result.Paths;
693  }
694 
695 
696  virtual std::vector<BrowseResult> Browse(const OpcUa::NodesQuery & query) const override
697  {
698  LOG_DEBUG(Logger, "binary_client | Browse -->");
699  if (Logger && Logger->should_log(spdlog::level::trace))
700  {
701  for (BrowseDescription desc : query.NodesToBrowse)
702  {
703  Logger->trace("Node: {}", desc.NodeToBrowse);
704  }
705  }
706 
707  BrowseRequest request;
708  request.Header = CreateRequestHeader();
709  request.Query = query;
710  const BrowseResponse response = Send<BrowseResponse>(request);
711  ContinuationPoints.clear();
712 
713  for (BrowseResult result : response.Results)
714  {
715  if (!result.ContinuationPoint.empty())
716  {
717  ContinuationPoints.push_back(result.ContinuationPoint);
718  }
719  }
720 
721  LOG_DEBUG(Logger, "binary_client | Browse <--");
722 
723  return response.Results;
724  }
725 
726  virtual std::vector<BrowseResult> BrowseNext() const override
727  {
728  LOG_DEBUG(Logger, "binary_client | BrowseNext -->");
729 
730  //FIXME: fix method interface so we do not need to decice arbitriraly if we need to send BrowseNext or not...
731  if (ContinuationPoints.empty())
732  {
733  LOG_DEBUG(Logger, "binary_client | BrowseNext <-- no Continuation point, no need to send browse next request");
734 
735  return std::vector<BrowseResult>();
736  }
737 
738  BrowseNextRequest request;
739  request.ReleaseContinuationPoints = ContinuationPoints.empty() ? true : false;
740  request.ContinuationPoints = ContinuationPoints;
741  const BrowseNextResponse response = Send<BrowseNextResponse>(request);
742  ContinuationPoints.clear();
743 
744  for (auto result : response.Results)
745  {
746  if (!result.ContinuationPoint.empty())
747  {
748  ContinuationPoints.push_back(result.ContinuationPoint);
749  }
750  }
751 
752  LOG_DEBUG(Logger, "binary_client | BrowseNext <--");
753 
754  return response.Results;
755  }
756 
757  std::vector<NodeId> RegisterNodes(const std::vector<NodeId> & params) const override
758  {
759  LOG_DEBUG(Logger, "binary_client | RegisterNodes -->");
760  if (Logger && Logger->should_log(spdlog::level::trace))
761  {
762  Logger->trace("binary_client | Nodes to register:");
763 
764  for (auto & param : params)
765  {
766  Logger->trace(" {}", param);
767  }
768  }
769 
770  RegisterNodesRequest request;
771 
772  request.NodesToRegister = params;
773  RegisterNodesResponse response = Send<RegisterNodesResponse>(request);
774 
775  if (Logger && Logger->should_log(spdlog::level::trace))
776  {
777  Logger->trace("binary_client | registered NodeIds:");
778 
779  for (auto & id : response.Result)
780  {
781  Logger->trace(" {}", id);
782  }
783  }
784  LOG_DEBUG(Logger, "binary_client | RegisterNodes <--");
785  return response.Result;
786  }
787 
788  void UnregisterNodes(const std::vector<NodeId> & params) const override
789  {
790  LOG_DEBUG(Logger, "binary_client | UnregisterNodes -->");
791  if (Logger && Logger->should_log(spdlog::level::trace))
792  {
793  Logger->trace("binary_client | Nodes to unregister:");
794 
795  for (auto & id : params)
796  {
797  Logger->trace(" {}", id);
798  }
799  }
800 
801  UnregisterNodesRequest request;
802  request.NodesToUnregister = params;
803  UnregisterNodesResponse response = Send<UnregisterNodesResponse>(request);
804 
805  LOG_DEBUG(Logger, "binary_client | UnregisterNodes <--");
806  }
807 
808 private:
809  //FIXME: this method should be removed, better add realease option to BrowseNext
810  void Release() const
811  {
812  ContinuationPoints.clear();
813  BrowseNext();
814  }
815 
816 public:
817 
821  virtual OpcUa::OpenSecureChannelResponse OpenSecureChannel(const OpenSecureChannelParameters & params) override
822  {
823  LOG_DEBUG(Logger, "binary_client | OpenChannel -->");
824 
825  OpenSecureChannelRequest request;
826  request.Parameters = params;
827 
828  OpenSecureChannelResponse response = Send<OpenSecureChannelResponse>(request);
829 
830  ChannelSecurityToken = response.ChannelSecurityToken; //Save security token, we need it
831 
832  LOG_DEBUG(Logger, "binary_client | OpenChannel <--");
833 
834  return response;
835  }
836 
837  virtual void CloseSecureChannel(uint32_t channelId) override
838  {
839  LOG_DEBUG(Logger, "binary_client | CloseSecureChannel -->");
840  try
841  {
843 
844  const SymmetricAlgorithmHeader algorithmHeader = CreateAlgorithmHeader();
845  hdr.AddSize(RawSize(algorithmHeader));
846 
847  std::unique_lock<std::mutex> send_lock(send_mutex);
848 
849  const SequenceHeader sequence = CreateSequenceHeader();
850  hdr.AddSize(RawSize(sequence));
851 
853  //request. ChannelId = channelId; FIXME: spec says it hsould be here, in practice it is not even sent?!?!
854  hdr.AddSize(RawSize(request));
855 
856  Stream << hdr << algorithmHeader << sequence << request << flush;
857  }
858 
859  catch (const std::exception & exc)
860  {
861  LOG_WARN(Logger, "closing secure channel failed with: {}", exc.what());
862  }
863 
864  LOG_DEBUG(Logger, "binary_client | CloseSecureChannel <--");
865  }
866 
867 private:
868  template <typename Response, typename Request>
869  Response Send(Request request) const
870  {
871  request.Header = CreateRequestHeader();
872 
873  RequestCallback<Response> requestCallback(Logger);
874  ResponseCallback responseCallback = [&requestCallback](std::vector<char> buffer, ResponseHeader h)
875  {
876  requestCallback.OnData(std::move(buffer), std::move(h));
877  };
878  std::unique_lock<std::mutex> lock(Mutex);
879  Callbacks.insert(std::make_pair(request.Header.RequestHandle, responseCallback));
880  lock.unlock();
881 
882  LOG_DEBUG(Logger, "binary_client | send: id: {} handle: {}, UtcTime: {}", ToString(request.TypeId, true), request.Header.RequestHandle, request.Header.UtcTime);
883 
884  Send(request);
885 
886  Response res;
887 
888  try
889  {
890  res = requestCallback.WaitForData(std::chrono::milliseconds(request.Header.Timeout));
891  }
892 
893  catch (std::exception & ex)
894  {
895  //Remove the callback on timeout
896  std::unique_lock<std::mutex> lock(Mutex);
897  Callbacks.erase(request.Header.RequestHandle);
898  lock.unlock();
899  throw;
900  }
901 
902  return res;
903  }
904 
905  // Prevent multiple threads from sending parts of different packets at the same time.
906  mutable std::mutex send_mutex;
907 
908  template <typename Request>
909  void Send(Request request) const
910  {
911  // TODO add support for breaking message into multiple chunks
913  const SymmetricAlgorithmHeader algorithmHeader = CreateAlgorithmHeader();
914  hdr.AddSize(RawSize(algorithmHeader));
915 
916  std::unique_lock<std::mutex> send_lock(send_mutex);
917 
918  const SequenceHeader sequence = CreateSequenceHeader();
919  hdr.AddSize(RawSize(sequence));
920  hdr.AddSize(RawSize(request));
921 
922  Stream << hdr << algorithmHeader << sequence << request << flush;
923  }
924 
925 
926 
927  void Receive()
928  {
929  Binary::SecureHeader responseHeader;
930  Stream >> responseHeader;
931  LOG_DEBUG(Logger, "binary_client | received message: Type: {}, ChunkType: {}, Size: {}, ChannelId: {}", responseHeader.Type, responseHeader.Chunk, responseHeader.Size, responseHeader.ChannelId);
932 
933  size_t algo_size;
934 
935  if (responseHeader.Type == MessageType::MT_SECURE_OPEN)
936  {
937  AsymmetricAlgorithmHeader responseAlgo;
938  Stream >> responseAlgo;
939  algo_size = RawSize(responseAlgo);
940  }
941 
942  else if (responseHeader.Type == MessageType::MT_ERROR)
943  {
944  StatusCode error;
945  std::string msg;
946  Stream >> error;
947  Stream >> msg;
948  std::stringstream stream;
949  stream << "Received error message from server: " << ToString(error) << ", " << msg ;
950  throw std::runtime_error(stream.str());
951  }
952 
953  else //(responseHeader.Type == MessageType::MT_SECURE_MESSAGE )
954  {
956  Stream >> responseAlgo;
957  algo_size = RawSize(responseAlgo);
958  }
959 
960  NodeId id;
961  Binary::SequenceHeader responseSequence;
962  Stream >> responseSequence; // TODO Check for request Number
963 
964  const std::size_t expectedHeaderSize = RawSize(responseHeader) + algo_size + RawSize(responseSequence);
965 
966  if (expectedHeaderSize >= responseHeader.Size)
967  {
968  std::stringstream stream;
969  stream << "Size of received message " << responseHeader.Size << " bytes is invalid. Expected size " << expectedHeaderSize << " bytes";
970  throw std::runtime_error(stream.str());
971  }
972 
973  std::size_t dataSize = responseHeader.Size - expectedHeaderSize;
974 
975  if (responseHeader.Chunk == CHT_SINGLE)
976  {
977  parseMessage(dataSize, id);
978  firstMsgParsed = false;
979 
980  std::unique_lock<std::mutex> lock(Mutex);
981  CallbackMap::const_iterator callbackIt = Callbacks.find(header.RequestHandle);
982 
983  if (callbackIt == Callbacks.end())
984  {
985  LOG_WARN(Logger, "binary_client | no callback found for message id: {}, handle: {}", id, header.RequestHandle);
986  messageBuffer.clear();
987  return;
988  }
989 
990  callbackIt->second(std::move(messageBuffer), std::move(header));
991  messageBuffer.clear();
992  Callbacks.erase(callbackIt);
993  }
994 
995  else if (responseHeader.Chunk == CHT_INTERMEDIATE)
996  {
997  parseMessage(dataSize, id);
998  firstMsgParsed = true;
999  }
1000  }
1001 
1002  void parseMessage(std::size_t & dataSize, NodeId & id)
1003  {
1004  std::vector<char> buffer(dataSize);
1005  BufferInputChannel bufferInput(buffer);
1006  Binary::RawBuffer raw(&buffer[0], dataSize);
1007  Stream >> raw;
1008  LOG_TRACE(Logger, "binary_client | received message data: {}", ToHexDump(buffer));
1009 
1010  if (!firstMsgParsed)
1011  {
1012  IStreamBinary in(bufferInput);
1013  in >> id;
1014  in >> header;
1015 
1016  LOG_DEBUG(Logger, "binary_client | got response id: {}, handle: {}", ToString(id, true), header.RequestHandle);
1017 
1018  if (id == SERVICE_FAULT)
1019  {
1020  LOG_WARN(Logger, "binary_client | receive ServiceFault from Server with StatusCode: {}", header.ServiceResult);
1021  }
1022  else if (header.ServiceResult != StatusCode::Good)
1023  {
1024  LOG_WARN(Logger, "binary_client | received a response from server with error status: {}", header.ServiceResult);
1025  }
1026 
1027  messageBuffer.insert(messageBuffer.end(), buffer.begin(), buffer.end());
1028  }
1029 
1030  else
1031  {
1032  messageBuffer.insert(messageBuffer.end(), buffer.begin(), buffer.end());
1033  }
1034  }
1035 
1036  Binary::Acknowledge HelloServer(const SecureConnectionParams & params)
1037  {
1038  LOG_DEBUG(Logger, "binary_client | HelloServer -->");
1039 
1040  Binary::Hello hello;
1041  hello.ProtocolVersion = 0;
1042  hello.ReceiveBufferSize = 65536;
1043  hello.SendBufferSize = 65536;
1044  hello.MaxMessageSize = 65536;
1045  hello.MaxChunkCount = 256;
1046  hello.EndpointUrl = params.EndpointUrl;
1047 
1049  hdr.AddSize(RawSize(hello));
1050 
1051  Stream << hdr << hello << flush;
1052 
1053  Header respHeader;
1054  Stream >> respHeader; // TODO add check for acknowledge header
1055 
1056  Acknowledge ack;
1057  Stream >> ack; // TODO check for connection parameters
1058 
1059  LOG_DEBUG(Logger, "binary_client | HelloServer <--");
1060 
1061  return ack;
1062  }
1063 
1064 
1065  SymmetricAlgorithmHeader CreateAlgorithmHeader() const
1066  {
1067  SymmetricAlgorithmHeader algorithmHeader;
1068  algorithmHeader.TokenId = ChannelSecurityToken.TokenId;
1069  return algorithmHeader;
1070  }
1071 
1072  SequenceHeader CreateSequenceHeader() const
1073  {
1074  SequenceHeader sequence;
1075  sequence.SequenceNumber = ++SequenceNumber;
1076  sequence.RequestId = ++RequestNumber;
1077  return sequence;
1078  }
1079 
1080  RequestHeader CreateRequestHeader() const
1081  {
1082  RequestHeader header;
1083  header.SessionAuthenticationToken = AuthenticationToken;
1084  header.RequestHandle = GetRequestHandle();
1085  header.Timeout = 10000;
1086  return header;
1087  }
1088 
1089  unsigned GetRequestHandle() const
1090  {
1091  return ++RequestHandle;
1092  }
1093 
1094  // Binary client is self-referenced from captures of subscription callbacks
1095  // Remove this references to make ~BinaryClient() run possible
1096  void RemoveSelfReferences()
1097  {
1098  LOG_DEBUG(Logger, "binary_client | clearing cached references to server");
1099 
1100  PublishCallbacks.clear();
1101  }
1102 
1103 private:
1104  std::shared_ptr<IOChannel> Channel;
1105  mutable IOStreamBinary Stream;
1106  SecureConnectionParams Params;
1107  std::thread ReceiveThread;
1108 
1109  SubscriptionCallbackMap PublishCallbacks;
1111  mutable std::atomic<uint32_t> SequenceNumber;
1112  mutable std::atomic<uint32_t> RequestNumber;
1113  ExpandedNodeId AuthenticationToken;
1114  mutable std::atomic<uint32_t> RequestHandle;
1115  mutable std::vector<std::vector<uint8_t>> ContinuationPoints;
1116  mutable CallbackMap Callbacks;
1117  Common::Logger::SharedPtr Logger;
1118  bool Finished = false;
1119 
1120  std::thread callback_thread;
1121  CallbackThread CallbackService;
1122  mutable std::mutex Mutex;
1123 
1124  bool firstMsgParsed = false;
1125  ResponseHeader header;
1126 };
1127 
1128 template <>
1129 void BinaryClient::Send<OpenSecureChannelRequest>(OpenSecureChannelRequest request) const
1130 {
1132  AsymmetricAlgorithmHeader algorithmHeader;
1133  algorithmHeader.SecurityPolicyUri = Params.SecurePolicy;
1134  algorithmHeader.SenderCertificate = Params.SenderCertificate;
1135  algorithmHeader.ReceiverCertificateThumbPrint = Params.ReceiverCertificateThumbPrint;
1136  hdr.AddSize(RawSize(algorithmHeader));
1137  hdr.AddSize(RawSize(request));
1138 
1139  std::unique_lock<std::mutex> send_lock(send_mutex);
1140 
1141  const SequenceHeader sequence = CreateSequenceHeader();
1142  hdr.AddSize(RawSize(sequence));
1143  Stream << hdr << algorithmHeader << sequence << request << flush;
1144 }
1145 
1146 } // namespace
1147 
1148 
1149 OpcUa::Services::SharedPtr OpcUa::CreateBinaryClient(OpcUa::IOChannel::SharedPtr channel, const OpcUa::SecureConnectionParams & params, const Common::Logger::SharedPtr & logger)
1150 {
1151  return std::make_shared<BinaryClient>(channel, params, logger);
1152 }
1153 
1154 OpcUa::Services::SharedPtr OpcUa::CreateBinaryClient(const std::string & endpointUrl, const Common::Logger::SharedPtr & logger)
1155 {
1156  const Common::Uri serverUri(endpointUrl);
1157  OpcUa::IOChannel::SharedPtr channel = OpcUa::Connect(serverUri.Host(), serverUri.Port(), logger);
1159  params.EndpointUrl = endpointUrl;
1160  params.SecurePolicy = "http://opcfoundation.org/UA/SecurityPolicy#None";
1161  return CreateBinaryClient(channel, params, logger);
1162 }
std::vector< uint32_t > SubscriptionIds
std::vector< ApplicationDescription > Descriptions
std::vector< OpcUa::ReadValueId > AttributesToRead
uint32_t Timeout
Definition: types.h:193
#define LOG_TRACE(__logger__,...)
Definition: common/logger.h:23
std::vector< OpcUa::NodeId > NodesToRegister
string endpointUrl
Definition: test.py:5
std::vector< uint8_t > ContinuationPoint
Definition: protocol/view.h:81
#define LOG_WARN(__logger__,...)
Definition: common/logger.h:26
std::vector< AddNodesResult > results
void CloseSession(OpcUa::Binary::IOStream &stream, const OpcUa::Binary::CreateSessionResponse &session)
OpcUa::CreateSessionParameters Parameters
ApplicationDescription ClientDescription
Definition: services.h:49
std::vector< BrowseDescription > NodesToBrowse
Definition: protocol/view.h:50
std::vector< OpcUa::StatusCode > Results
#define LOG_ERROR(__logger__,...)
Definition: common/logger.h:27
std::vector< AddNodesItem > NodesToAdd
std::vector< BrowseResult > Results
StatusCode ServiceResult
Definition: types.h:259
#define LOG_DEBUG(__logger__,...)
Definition: common/logger.h:24
std::vector< StatusCode > Results
std::vector< uint8_t > ClientCertificate
Definition: services.h:50
std::vector< T > Browse(const NodeId &node, NodeClass nodeClassMask, Services::SharedPtr services)
Definition: model_impl.h:31
std_msgs::Header * header(M &m)
OpcUa::NodeId AuthenticationToken
unsigned Port() const
Definition: uri_facade.h:46
TranslateBrowsePathsParameters Parameters
OpcUa::ByteString ClientCertificate
std::vector< OpcUa::StatusCode > Results
OpcUa::CreateSessionResult Parameters
OStream< ChannelType > & flush(OStream< ChannelType > &os)
Definition: stream.h:147
int Write(int fd, const void *buf, unsigned int count)
std::vector< BrowseResult > Results
Definition: protocol/view.h:92
OpcUa::ReadParameters Parameters
OpcUa::RequestHeader Header
OpcUa::CallParameters Parameters
fmt::BufferedFile & move(fmt::BufferedFile &f)
Definition: posix.h:432
std::vector< OpcUa::DataValue > Results
OpcUa::RequestHeader Header
OpcUa::Binary::CreateSessionResponse CreateSession(OpcUa::Binary::IOStream &stream)
Services::SharedPtr CreateBinaryClient(IOChannel::SharedPtr channel, const SecureConnectionParams &params, const Common::Logger::SharedPtr &logger=nullptr)
Create server based on opc ua binary protocol.
RequestHeader Header
Definition: protocol/view.h:58
std::vector< OpcUa::DeleteNodesItem > NodesToDelete
OpcUa::ApplicationType ApplicationType
std::vector< OpcUa::MonitoredItemCreateResult > Results
std::vector< BrowsePathResult > Paths
int Read(int fd, void *buf, unsigned int count)
OpcUa::RequestHeader Header
OPC UA Address space part. GNU LGPL.
void ActivateSession(OpcUa::Binary::IOStream &stream, const OpcUa::Binary::CreateSessionResponse &session)
OpenSecureChannelParameters Parameters
std::string ToHexDump(const char *buf, std::size_t size)
Definition: utils.h:29
OpcUa::ResponseHeader Header
OpcUa::DeleteMonitoredItemsParameters Parameters
const char Views[]
Definition: strings.h:208
uint32_t RequestHandle
Definition: types.h:190
std::vector< OpcUa::WriteValue > NodesToWrite
std::vector< std::string > LocaleIds
OpcUa::LocalizedText ApplicationName
ExpandedNodeId SessionAuthenticationToken
Definition: types.h:188
OpcUa::WriteParameters Parameters
#define Method
OpcUa::GetEndpointsParameters Parameters
OpcUa::SubscriptionData Data
std::string ToString(const AttributeId &value)
std::vector< OpcUa::NodeId > NodesToUnregister
OpcUa::ModifySubscriptionParameters Parameters
std::vector< std::string > DiscoveryUrls
OpcUa::RepublishParameters Parameters
std::vector< OpcUa::CallMethodRequest > MethodsToCall
std::vector< OpcUa::CallMethodResult > Results
OpcUa::ApplicationDescription ClientDescription
OpcUa::ActivateSessionParameters Parameters
OpcUa::AttributeId AttributeId
BasicData Data
Definition: format.h:993
std::vector< OpcUa::NodeId > Result
std::vector< std::string > LocaleIds
std::vector< OpcUa::SubscriptionAcknowledgement > SubscriptionAcknowledgements
std::string Host() const
Definition: uri_facade.h:41
OpcUa::NodeId NodeId
FindServersParameters Parameters
std::vector< AddReferencesItem > ReferencesToAdd
std::unique_ptr< RemoteConnection > Connect(const std::string &host, unsigned port, const Common::Logger::SharedPtr &logger)
OpcUa::MonitoredItemsParameters Parameters
std::vector< std::string > ProfileUris
AddReferencesParameters Parameters
std::vector< OpcUa::EndpointDescription > Endpoints
std::size_t RawSize(const T &obj)
std::vector< OpcUa::StatusCode > Results


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:06:03