binary_client.cpp
Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 #include <opc/ua/protocol/utils.h>
00012 #include <opc/ua/client/binary_client.h>
00013 #include <opc/ua/client/remote_connection.h>
00014 
00015 #include <opc/common/uri_facade.h>
00016 #include <opc/ua/protocol/binary/stream.h>
00017 #include <opc/ua/protocol/channel.h>
00018 #include <opc/ua/protocol/secure_channel.h>
00019 #include <opc/ua/protocol/session.h>
00020 #include <opc/ua/protocol/string_utils.h>
00021 #include <opc/ua/services/services.h>
00022 
00023 #include <atomic>
00024 #include <chrono>
00025 #include <condition_variable>
00026 #include <mutex>
00027 #include <queue>
00028 #include <thread>
00029 #include <iostream>
00030 
00031 
00032 namespace
00033 {
00034 
00035   using namespace OpcUa;
00036   using namespace OpcUa::Binary;
00037 
00038   typedef std::map<uint32_t, std::function<void (PublishResult)>> SubscriptionCallbackMap;
00039 
00040   class BufferInputChannel : public OpcUa::InputChannel
00041   {
00042   public:
00043      BufferInputChannel(const std::vector<char>& buffer)
00044        : Buffer(buffer)
00045        , Pos(0)
00046      {
00047        Reset();
00048      }
00049 
00050      virtual std::size_t Receive(char* data, std::size_t size)
00051      {
00052        if (Pos >= Buffer.size())
00053        {
00054          return 0;
00055        }
00056 
00057        size = std::min(size, Buffer.size() - Pos);
00058        std::vector<char>::const_iterator begin = Buffer.begin() + Pos;
00059        std::vector<char>::const_iterator end = begin + size;
00060        std::copy(begin, end, data);
00061        Pos += size;
00062        return size;
00063      }
00064 
00065      void Reset()
00066      {
00067        Pos = 0;
00068      }
00069 
00070      virtual void Stop()
00071      {
00072      }
00073 
00074   private:
00075     const std::vector<char>& Buffer;
00076     std::size_t Pos;
00077   };
00078 
00079 
00080   template <typename T>
00081   class RequestCallback
00082   {
00083   public:
00084     RequestCallback()
00085       : lock(m)
00086     {
00087     }
00088 
00089     void OnData(std::vector<char> data, ResponseHeader h)
00090     {
00091       //PrintBlob(data);
00092       Data = std::move(data);
00093           this->header = std::move(h);
00094       doneEvent.notify_all();
00095     }
00096 
00097     T WaitForData(std::chrono::milliseconds msec)
00098     {
00099       doneEvent.wait_for(lock, msec);
00100       T result;
00101           result.Header = std::move(this->header);
00102       if ( Data.empty() )
00103       {
00104         std::cout << "Error: received empty packet from server" << std::endl;
00105       }
00106           else
00107           {
00108                   BufferInputChannel bufferInput(Data);
00109                   IStreamBinary in(bufferInput);
00110                   in >> result;
00111           }
00112       return result;
00113     }
00114 
00115   private:
00116     std::vector<char> Data;
00117         ResponseHeader    header;
00118     std::mutex m;
00119     std::unique_lock<std::mutex> lock;
00120     std::condition_variable doneEvent;
00121   };
00122 
00123   class CallbackThread
00124   {
00125     public:
00126       CallbackThread(bool debug=false) : Debug(debug), StopRequest(false)
00127       {
00128 
00129       }
00130 
00131       void post(std::function<void()> callback)
00132       {
00133         if (Debug)  { std::cout << "binary_client| CallbackThread :  start post" << std::endl; }
00134         std::unique_lock<std::mutex> lock(Mutex);
00135         Queue.push(callback);
00136         Condition.notify_one();
00137         if (Debug)  { std::cout << "binary_client| CallbackThread :  end post" << std::endl; }
00138       }
00139 
00140       void Run()
00141       {
00142         while (true)
00143         {
00144           if (Debug)  { std::cout << "binary_client| CallbackThread : waiting for next post" << std::endl; }
00145           std::unique_lock<std::mutex> lock(Mutex);
00146           Condition.wait(lock, [&]() { return (StopRequest == true) || ( ! Queue.empty() ) ;} );
00147           if (StopRequest)
00148           {
00149             if (Debug)  { std::cout << "binary_client| CallbackThread : exited." << std::endl; }
00150             return;
00151           }
00152           while ( ! Queue.empty() ) //to avoid crashing on spurious events
00153           {
00154             if (Debug)  { std::cout << "binary_client| CallbackThread : condition has triggered copying callback and poping. queue size is  " << Queue.size() << std::endl; }
00155             std::function<void()> callbackcopy = Queue.front();
00156             Queue.pop();
00157             lock.unlock();
00158             if (Debug)  { std::cout << "binary_client| CallbackThread : now calling callback." << std::endl; }
00159             callbackcopy();
00160             if (Debug)  { std::cout << "binary_client| CallbackThread : callback called." << std::endl; }
00161             lock.lock();
00162           }
00163         }
00164       }
00165 
00166       void Stop()
00167       {
00168         if (Debug)  { std::cout << "binary_client| CallbackThread : stopping." << std::endl; }
00169         StopRequest = true;
00170         Condition.notify_all();
00171       }
00172 
00173     private:
00174       bool Debug = false;
00175       std::mutex Mutex;
00176       std::condition_variable Condition;
00177       std::atomic<bool> StopRequest;
00178       std::queue<std::function<void()>> Queue;
00179   };
00180 
00181   class BinaryClient
00182     : public Services
00183     , public AttributeServices
00184     , public EndpointServices
00185     , public MethodServices
00186     , public NodeManagementServices
00187     , public SubscriptionServices
00188     , public ViewServices
00189     , public std::enable_shared_from_this<BinaryClient>
00190   {
00191   private:
00192     typedef std::function<void(std::vector<char>, ResponseHeader)> ResponseCallback;
00193     typedef std::map<uint32_t, ResponseCallback> CallbackMap;
00194 
00195   public:
00196     BinaryClient(std::shared_ptr<IOChannel> channel, const SecureConnectionParams& params, bool debug)
00197       : Channel(channel)
00198       , Stream(channel)
00199       , Params(params)
00200       , SequenceNumber(1)
00201       , RequestNumber(1)
00202       , RequestHandle(0)
00203       , Debug(debug)
00204       , CallbackService(debug)
00205 
00206     {
00207       //Initialize the worker thread for subscriptions
00208       callback_thread = std::thread([&](){ CallbackService.Run(); });
00209 
00210       HelloServer(params);
00211 
00212       ReceiveThread = std::move(std::thread([this](){
00213         try
00214         {
00215           while(!Finished)
00216             Receive();
00217         }
00218         catch (const std::exception& exc)
00219         {
00220           if (Debug)  { std::cerr << "binary_client| CallbackThread : Error receiving data: "; }
00221           std::cerr << exc.what() << std::endl;
00222         }
00223       }));
00224     }
00225 
00226     ~BinaryClient()
00227     {
00228       Finished = true;
00229 
00230       if (Debug) std::cout << "binary_client| Stopping callback thread." << std::endl;
00231       CallbackService.Stop();
00232       if (Debug) std::cout << "binary_client| Joining service thread." << std::endl;
00233       callback_thread.join(); //Not sure it is necessary
00234 
00235       Channel->Stop();
00236       if (Debug) std::cout << "binary_client| Joining receive thread." << std::endl;
00237       ReceiveThread.join();
00238       if (Debug) std::cout << "binary_client| Receive tread stopped." << std::endl;
00239 
00240       if (Debug) std::cout << "binary_client| Destroyed." << std::endl;
00241     }
00242 
00246     virtual CreateSessionResponse CreateSession(const RemoteSessionParameters& parameters)
00247     {
00248       if (Debug)  { std::cout << "binary_client| CreateSession -->" << std::endl; }
00249       CreateSessionRequest request;
00250       request.Header = CreateRequestHeader();
00251 
00252       request.Parameters.ClientDescription.ApplicationUri = parameters.ClientDescription.ApplicationUri;
00253       request.Parameters.ClientDescription.ProductUri = parameters.ClientDescription.ProductUri;
00254       request.Parameters.ClientDescription.ApplicationName = parameters.ClientDescription.ApplicationName;
00255       request.Parameters.ClientDescription.ApplicationType = parameters.ClientDescription.ApplicationType;
00256       request.Parameters.ClientDescription.GatewayServerUri = parameters.ClientDescription.GatewayServerUri;
00257       request.Parameters.ClientDescription.DiscoveryProfileUri = parameters.ClientDescription.DiscoveryProfileUri;
00258       request.Parameters.ClientDescription.DiscoveryUrls = parameters.ClientDescription.DiscoveryUrls;
00259 
00260       request.Parameters.ServerUri = parameters.ServerURI;
00261       request.Parameters.EndpointUrl = parameters.EndpointUrl; // TODO make just endpoint.URL;
00262       request.Parameters.SessionName = parameters.SessionName;
00263       request.Parameters.ClientNonce = ByteString(std::vector<uint8_t>(32,0));
00264       request.Parameters.ClientCertificate = ByteString(parameters.ClientCertificate);
00265       request.Parameters.RequestedSessionTimeout = parameters.Timeout;
00266       request.Parameters.MaxResponseMessageSize = 65536;
00267       CreateSessionResponse response = Send<CreateSessionResponse>(request);
00268       AuthenticationToken = response.Parameters.AuthenticationToken;
00269       if (Debug)  { std::cout << "binary_client| CreateSession <--" << std::endl; }
00270       return response;
00271     }
00272 
00273     ActivateSessionResponse ActivateSession(const ActivateSessionParameters &session_parameters) override
00274     {
00275       if (Debug)  { std::cout << "binary_client| ActivateSession -->" << std::endl; }
00276       ActivateSessionRequest request;
00277       request.Parameters = session_parameters;
00278       request.Parameters.LocaleIds.push_back("en");
00279       ActivateSessionResponse response = Send<ActivateSessionResponse>(request);
00280       if (Debug)  { std::cout << "binary_client| ActivateSession <--" << std::endl; }
00281       return response;
00282     }
00283 
00284     virtual CloseSessionResponse CloseSession()
00285     {
00286       if (Debug)  { std::cout << "binary_client| CloseSession -->" << std::endl; }
00287       CloseSessionRequest request;
00288       CloseSessionResponse response = Send<CloseSessionResponse>(request);
00289       RemoveSelfReferences();
00290       if (Debug)  { std::cout << "binary_client| CloseSession <--" << std::endl; }
00291       return response;
00292     }
00293 
00294     virtual void AbortSession()
00295     {
00296       if (Debug)  { std::cout << "binary_client| AbortSession -->" << std::endl; }
00297       RemoveSelfReferences();
00298       if (Debug)  { std::cout << "binary_client| AbortSession <--" << std::endl; }
00299     }
00300 
00301     DeleteNodesResponse DeleteNodes(const std::vector<OpcUa::DeleteNodesItem> &nodesToDelete) override
00302     {
00303       if (Debug)  { std::cout << "binary_client| DeleteNodes -->" << std::endl; }
00304       DeleteNodesRequest request;
00305       request.NodesToDelete = nodesToDelete;
00306       DeleteNodesResponse response = Send<DeleteNodesResponse>(request);
00307       if (Debug)  { std::cout << "binary_client| DeleteNodes <--" << std::endl; }
00308       return response;
00309     }
00310 
00314     virtual std::shared_ptr<AttributeServices> Attributes() override
00315     {
00316       return shared_from_this();
00317     }
00318 
00319   public:
00320     virtual std::vector<DataValue> Read(const ReadParameters& params) const
00321     {
00322       if (Debug)  {
00323         std::cout << "binary_client| Read -->" << std::endl;
00324         for ( ReadValueId attr : params.AttributesToRead )
00325         {
00326           std::cout << attr.NodeId << "  " << (uint32_t)attr.AttributeId;
00327         }
00328         std::cout << std::endl;
00329       }
00330       ReadRequest request;
00331       request.Parameters = params;
00332       const ReadResponse response = Send<ReadResponse>(request);
00333       if (Debug)  { std::cout << "binary_client| Read <--" << std::endl; }
00334       return response.Results;
00335     }
00336 
00337     virtual std::vector<OpcUa::StatusCode> Write(const std::vector<WriteValue>& values)
00338     {
00339       if (Debug)  { std::cout << "binary_client| Write -->" << std::endl; }
00340       WriteRequest request;
00341       request.Parameters.NodesToWrite = values;
00342       const WriteResponse response = Send<WriteResponse>(request);
00343       if (Debug)  { std::cout << "binary_client| Write <--" << std::endl; }
00344       return response.Results;
00345     }
00346 
00350     virtual std::shared_ptr<EndpointServices> Endpoints() override
00351     {
00352       return shared_from_this();
00353     }
00354 
00355     virtual std::vector<ApplicationDescription> FindServers(const FindServersParameters& params) const
00356     {
00357       if (Debug)  { std::cout << "binary_client| FindServers -->" << std::endl; }
00358       OpcUa::FindServersRequest request;
00359       request.Parameters = params;
00360       FindServersResponse response = Send<FindServersResponse>(request);
00361       if (Debug)  { std::cout << "binary_client| FindServers <--" << std::endl; }
00362       return response.Data.Descriptions;
00363     }
00364 
00365     virtual std::vector<EndpointDescription> GetEndpoints(const GetEndpointsParameters& filter) const
00366     {
00367       if (Debug)  { std::cout << "binary_client| GetEndpoints -->" << std::endl; }
00368       OpcUa::GetEndpointsRequest request;
00369       request.Header = CreateRequestHeader();
00370       request.Parameters.EndpointUrl = filter.EndpointUrl;
00371       request.Parameters.LocaleIds = filter.LocaleIds;
00372       request.Parameters.ProfileUris = filter.ProfileUris;
00373       const GetEndpointsResponse response = Send<GetEndpointsResponse>(request);
00374       if (Debug)  { std::cout << "binary_client| GetEndpoints <--" << std::endl; }
00375       return response.Endpoints;
00376     }
00377 
00378     virtual void RegisterServer(const ServerParameters& parameters)
00379     {
00380     }
00381 
00385     virtual std::shared_ptr<MethodServices> Method() override
00386     {
00387       return shared_from_this();
00388     }
00389 
00390     virtual std::vector<CallMethodResult> Call(const std::vector<CallMethodRequest>& methodsToCall)
00391     {
00392       if (Debug) {std::cout << "binary_clinent | Call -->" << std::endl;}
00393       CallRequest request;
00394       request.MethodsToCall = methodsToCall;
00395       const CallResponse response = Send<CallResponse>(request);
00396       if (Debug) {std::cout << "binary_clinent | Call <--" << std::endl;}
00397       // Manage errors
00398 //       if (!response.DiagnosticInfos.empty())
00399 //       {
00400 // For now commented out, handling of diagnostic should be probably added for all communication
00401 //       }
00402       return response.Results;
00403     }
00404 
00408 
00409     virtual std::shared_ptr<NodeManagementServices> NodeManagement() override
00410     {
00411       return shared_from_this();
00412     }
00413 
00414     virtual std::vector<AddNodesResult> AddNodes(const std::vector<AddNodesItem>& items)
00415     {
00416       if (Debug)  { std::cout << "binary_client| AddNodes -->" << std::endl; }
00417       AddNodesRequest request;
00418       request.Parameters.NodesToAdd = items;
00419       const AddNodesResponse response = Send<AddNodesResponse>(request);
00420       if (Debug)  { std::cout << "binary_client| AddNodes <--" << std::endl; }
00421       return response.results;
00422     }
00423 
00424     virtual std::vector<StatusCode> AddReferences(const std::vector<AddReferencesItem>& items)
00425     {
00426       if (Debug)  { std::cout << "binary_client| AddReferences -->" << std::endl; }
00427       AddReferencesRequest request;
00428       request.Parameters.ReferencesToAdd = items;
00429       const AddReferencesResponse response = Send<AddReferencesResponse>(request);
00430       if (Debug)  { std::cout << "binary_client| AddReferences <--" << std::endl; }
00431       return response.Results;
00432     }
00433 
00437     virtual std::shared_ptr<SubscriptionServices> Subscriptions() override
00438     {
00439       return shared_from_this();
00440     }
00441 
00442     virtual SubscriptionData CreateSubscription(const CreateSubscriptionRequest& request, std::function<void (PublishResult)> callback)
00443     {
00444       if (Debug)  { std::cout << "binary_client| CreateSubscription -->" << std::endl; }
00445       const CreateSubscriptionResponse response = Send<CreateSubscriptionResponse>(request);
00446       if (Debug) std::cout << "BinaryClient | got CreateSubscriptionResponse" << std::endl;
00447       PublishCallbacks[response.Data.SubscriptionId] = callback;// TODO Pass calback to the Publish method.
00448       if (Debug)  { std::cout << "binary_client| CreateSubscription <--" << std::endl; }
00449       return response.Data;
00450     }
00451 
00452     virtual std::vector<StatusCode> DeleteSubscriptions(const std::vector<uint32_t>& subscriptions)
00453     {
00454       if (Debug)  { std::cout << "binary_client| DeleteSubscriptions -->" << std::endl; }
00455       DeleteSubscriptionsRequest request;
00456       request.SubscriptionIds = subscriptions;
00457       const DeleteSubscriptionsResponse response = Send<DeleteSubscriptionsResponse>(request);
00458       if (Debug)  { std::cout << "binary_client| DeleteSubscriptions <--" << std::endl; }
00459       return response.Results;
00460     }
00461 
00462     virtual std::vector<MonitoredItemCreateResult> CreateMonitoredItems(const MonitoredItemsParameters& parameters)
00463     {
00464       if (Debug)  { std::cout << "binary_client| CreateMonitoredItems -->" << std::endl; }
00465       CreateMonitoredItemsRequest request;
00466       request.Parameters = parameters;
00467       const CreateMonitoredItemsResponse response = Send<CreateMonitoredItemsResponse>(request);
00468       if (Debug)  { std::cout << "binary_client| CreateMonitoredItems <--" << std::endl; }
00469       return response.Results;
00470     }
00471 
00472     virtual std::vector<StatusCode> DeleteMonitoredItems(const DeleteMonitoredItemsParameters& params)
00473     {
00474       if (Debug)  { std::cout << "binary_client| DeleteMonitoredItems -->" << std::endl; }
00475       DeleteMonitoredItemsRequest request;
00476       request.Parameters = params;
00477       const DeleteMonitoredItemsResponse response = Send<DeleteMonitoredItemsResponse>(request);
00478       if (Debug)  { std::cout << "binary_client| DeleteMonitoredItems <--" << std::endl; }
00479       return response.Results;
00480     }
00481 
00482     virtual void Publish(const PublishRequest& originalrequest)
00483     {
00484       if (Debug) {std::cout << "binary_client| Publish -->" << "request with " << originalrequest.SubscriptionAcknowledgements.size() << " acks" << std::endl;}
00485       PublishRequest request(originalrequest);
00486       request.Header = CreateRequestHeader();
00487       request.Header.Timeout = 0; //We do not want the request to timeout!
00488 
00489       ResponseCallback responseCallback = [this](std::vector<char> buffer, ResponseHeader h){
00490         if (Debug) {std::cout << "BinaryClient | Got Publish Response, from server " << std::endl;}
00491                 PublishResponse response;
00492                 if (h.ServiceResult != OpcUa::StatusCode::Good)
00493                 {
00494                         response.Header = std::move(h);
00495                 }
00496                 else
00497                 {
00498                         BufferInputChannel bufferInput(buffer);
00499                         IStreamBinary in(bufferInput);
00500                         in >> response;
00501                 }
00502 
00503         CallbackService.post([this, response]()
00504             {
00505                         if (response.Header.ServiceResult == OpcUa::StatusCode::Good)
00506                         {
00507                                 if (Debug) { std::cout << "BinaryClient | Calling callback for Subscription " << response.Parameters.SubscriptionId << std::endl; }
00508                                 SubscriptionCallbackMap::const_iterator callbackIt = this->PublishCallbacks.find(response.Parameters.SubscriptionId);
00509                                 if (callbackIt == this->PublishCallbacks.end())
00510                                 {
00511                                         std::cout << "BinaryClient | Error Unknown SubscriptionId " << response.Parameters.SubscriptionId << std::endl;
00512                                 }
00513                                 else
00514                                 {
00515                                         try { //calling client code, better put it under try/catch otherwise we crash entire client
00516                                                 callbackIt->second(response.Parameters);
00517                                         }
00518                                         catch (const std::exception& ex)
00519                                         {
00520                                                 std::cout << "Error calling application callback " << ex.what() << std::endl;
00521                                         }
00522                                 }
00523                         }
00524                         else if (response.Header.ServiceResult == OpcUa::StatusCode::BadSessionClosed)
00525                         {
00526                                 if (Debug)
00527                                 {
00528                                         std::cout << "BinaryClient | Session is closed";
00529                                 }
00530                         }
00531                         else
00532                         {
00533                                 // TODO
00534                         }
00535                         });
00536           };
00537           std::unique_lock<std::mutex> lock(Mutex);
00538           Callbacks.insert(std::make_pair(request.Header.RequestHandle, responseCallback));
00539           lock.unlock();
00540           Send(request);
00541           if (Debug) { std::cout << "binary_client| Publish  <--" << std::endl; }
00542         }
00543 
00544         virtual RepublishResponse Republish(const RepublishParameters& params)
00545         {
00546                 if (Debug) { std::cout << "binary_client| Republish -->" << std::endl; }
00547                 RepublishRequest request;
00548                 request.Header = CreateRequestHeader();
00549                 request.Parameters = params;
00550 
00551                 RepublishResponse response = Send<RepublishResponse>(request);
00552                 if (Debug) { std::cout << "binary_client| Republish  <--" << std::endl; }
00553                 return response;
00554         }
00555 
00559         virtual std::shared_ptr<ViewServices> Views() override
00560         {
00561                 return shared_from_this();
00562         }
00563 
00564         virtual std::vector<BrowsePathResult> TranslateBrowsePathsToNodeIds(const TranslateBrowsePathsParameters& params) const
00565         {
00566                 if (Debug)  { std::cout << "binary_client| TranslateBrowsePathsToNodeIds -->" << std::endl; }
00567                 TranslateBrowsePathsToNodeIdsRequest request;
00568                 request.Header = CreateRequestHeader();
00569                 request.Parameters = params;
00570                 const TranslateBrowsePathsToNodeIdsResponse response = Send<TranslateBrowsePathsToNodeIdsResponse>(request);
00571                 if (Debug)  { std::cout << "binary_client| TranslateBrowsePathsToNodeIds <--" << std::endl; }
00572                 return response.Result.Paths;
00573         }
00574 
00575 
00576         virtual std::vector<BrowseResult> Browse(const OpcUa::NodesQuery& query) const
00577         {
00578                 if (Debug)  {
00579                         std::cout << "binary_client| Browse -->";
00580                         for (BrowseDescription desc : query.NodesToBrowse)
00581                         {
00582                                 std::cout << desc.NodeToBrowse << "  ";
00583                         }
00584                         std::cout << std::endl;
00585                 }
00586                 BrowseRequest request;
00587                 request.Header = CreateRequestHeader();
00588                 request.Query = query;
00589                 const BrowseResponse response = Send<BrowseResponse>(request);
00590                 ContinuationPoints.clear();
00591                 for (BrowseResult result : response.Results)
00592                 {
00593                         if (!result.ContinuationPoint.empty())
00594                         {
00595                                 ContinuationPoints.push_back(result.ContinuationPoint);
00596                         }
00597                 }
00598                 if (Debug)  { std::cout << "binary_client| Browse <--" << std::endl; }
00599                 return  response.Results;
00600         }
00601 
00602         virtual std::vector<BrowseResult> BrowseNext() const
00603         {
00604                 //FIXME: fix method interface so we do not need to decice arbitriraly if we need to send BrowseNext or not...
00605                 if (ContinuationPoints.empty())
00606                 {
00607                         if (Debug)  { std::cout << "No Continuation point, no need to send browse next request" << std::endl; }
00608                         return std::vector<BrowseResult>();
00609                 }
00610                 if (Debug)  { std::cout << "binary_client| BrowseNext -->" << std::endl; }
00611                 BrowseNextRequest request;
00612                 request.ReleaseContinuationPoints = ContinuationPoints.empty() ? true : false;
00613                 request.ContinuationPoints = ContinuationPoints;
00614                 const BrowseNextResponse response = Send<BrowseNextResponse>(request);
00615                 ContinuationPoints.clear();
00616                 for (auto result : response.Results)
00617                 {
00618                         if (!result.ContinuationPoint.empty())
00619                         {
00620                                 ContinuationPoints.push_back(result.ContinuationPoint);
00621                         }
00622                 }
00623                 if (Debug)  { std::cout << "binary_client| BrowseNext <--" << std::endl; }
00624                 return response.Results;
00625         }
00626 
00627         std::vector<NodeId> RegisterNodes(const std::vector<NodeId>& params) const
00628         {
00629                 if (Debug)
00630                 {
00631                         std::cout << "binary_clinet| RegisterNodes -->\n\tNodes to register:" << std::endl;
00632                         for (auto& param : params) {
00633                                 std::cout << "\t\t" << param << std::endl;
00634                         }
00635                 }
00636 
00637                 RegisterNodesRequest request;
00638 
00639                 request.NodesToRegister = params;
00640                 RegisterNodesResponse response = Send<RegisterNodesResponse>(request);
00641                 if (Debug)
00642                 {
00643                         std::cout << "binary_client| RegisterNodes <--\n\tRegistered NodeIds:" << std::endl;
00644                         for (auto&id : response.Result) {
00645                                 std::cout << "\t\t" << id << std::endl;
00646                         }
00647                 }
00648                 return response.Result;
00649         }
00650 
00651         void UnregisterNodes(const std::vector<NodeId>& params) const
00652         {
00653                 if (Debug) {
00654                         std::cout << "binary_client| UnregisterNodes -->\n\tUnregistering nodes:" << std::endl;
00655                         for (auto& id : params) {
00656                                 std::cout << "\t\t" << id << std::endl;
00657                         }
00658                 }
00659 
00660                 UnregisterNodesRequest request;
00661                 request.NodesToUnregister = params;
00662                 UnregisterNodesResponse response = Send<UnregisterNodesResponse>(request);
00663 
00664                 if (Debug) {
00665                         std::cout << "binary_client| UnregisterNodes <--" << std::endl;
00666                 }
00667         }
00668 
00669   private:
00670     //FIXME: this method should be removed, better add realease option to BrowseNext
00671     void Release() const
00672     {
00673       ContinuationPoints.clear();
00674       BrowseNext();
00675     }
00676 
00677   public:
00678 
00682     virtual OpcUa::OpenSecureChannelResponse OpenSecureChannel(const OpenSecureChannelParameters& params)
00683     {
00684       if (Debug) {std::cout << "binary_client| OpenChannel -->" << std::endl;}
00685 
00686       OpenSecureChannelRequest request;
00687       request.Parameters = params;
00688 
00689       OpenSecureChannelResponse response = Send<OpenSecureChannelResponse>(request);
00690 
00691       ChannelSecurityToken = response.ChannelSecurityToken; //Save security token, we need it
00692 
00693       if (Debug) {std::cout << "binary_client| OpenChannel <--" << std::endl;}
00694       return response;
00695     }
00696 
00697     virtual void CloseSecureChannel(uint32_t channelId)
00698     {
00699       try
00700       {
00701         if (Debug) {std::cout << "binary_client| CloseSecureChannel -->" << std::endl;}
00702         SecureHeader hdr(MT_SECURE_CLOSE, CHT_SINGLE, ChannelSecurityToken.SecureChannelId);
00703 
00704         const SymmetricAlgorithmHeader algorithmHeader = CreateAlgorithmHeader();
00705         hdr.AddSize(RawSize(algorithmHeader));
00706 
00707         const SequenceHeader sequence = CreateSequenceHeader();
00708         hdr.AddSize(RawSize(sequence));
00709 
00710         CloseSecureChannelRequest request;
00711         //request. ChannelId = channelId; FIXME: spec says it hsould be here, in practice it is not even sent?!?!
00712         hdr.AddSize(RawSize(request));
00713 
00714         Stream << hdr << algorithmHeader << sequence << request << flush;
00715         if (Debug) {std::cout << "binary_client| Secure channel closed." << std::endl;}
00716       }
00717       catch (const std::exception& exc)
00718       {
00719         std::cerr << "Closing secure channel failed with error: " << exc.what() << std::endl;
00720       }
00721       if (Debug) {std::cout << "binary_client| CloseSecureChannel <--" << std::endl;}
00722     }
00723 
00724 private:
00725     template <typename Response, typename Request>
00726     Response Send(Request request) const
00727     {
00728       request.Header = CreateRequestHeader();
00729 
00730       RequestCallback<Response> requestCallback;
00731       ResponseCallback responseCallback = [&requestCallback](std::vector<char> buffer, ResponseHeader h){
00732         requestCallback.OnData(std::move(buffer), std::move(h));
00733       };
00734       std::unique_lock<std::mutex> lock(Mutex);
00735       Callbacks.insert(std::make_pair(request.Header.RequestHandle, responseCallback));
00736       lock.unlock();
00737 
00738       Send(request);
00739 
00740       return requestCallback.WaitForData(std::chrono::milliseconds(request.Header.Timeout));
00741     }
00742 
00743     // Prevent multiple threads from sending parts of different packets at the same time.
00744     mutable std::mutex send_mutex;
00745 
00746     template <typename Request>
00747     void Send(Request request) const
00748     {
00749       // TODO add support for breaking message into multiple chunks
00750       SecureHeader hdr(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelSecurityToken.SecureChannelId);
00751       const SymmetricAlgorithmHeader algorithmHeader = CreateAlgorithmHeader();
00752       hdr.AddSize(RawSize(algorithmHeader));
00753 
00754       const SequenceHeader sequence = CreateSequenceHeader();
00755       hdr.AddSize(RawSize(sequence));
00756       hdr.AddSize(RawSize(request));
00757 
00758           std::unique_lock<std::mutex> send_lock(send_mutex);
00759       Stream << hdr << algorithmHeader << sequence << request << flush;
00760     }
00761 
00762 
00763 
00764     void Receive() const
00765     {
00766       Binary::SecureHeader responseHeader;
00767       Stream >> responseHeader;
00768 
00769       size_t algo_size;
00770       if (responseHeader.Type == MessageType::MT_SECURE_OPEN )
00771       {
00772         AsymmetricAlgorithmHeader responseAlgo;
00773         Stream >> responseAlgo;
00774         algo_size = RawSize(responseAlgo);
00775       }
00776       else if (responseHeader.Type == MessageType::MT_ERROR )
00777       {
00778         StatusCode error;
00779         std::string msg;
00780         Stream >> error;
00781         Stream >> msg;
00782         std::stringstream stream;
00783         stream << "Received error message from server: " << ToString(error) << ", " << msg ;
00784         throw std::runtime_error(stream.str());
00785       }
00786       else //(responseHeader.Type == MessageType::MT_SECURE_MESSAGE )
00787       {
00788         Binary::SymmetricAlgorithmHeader responseAlgo;
00789         Stream >> responseAlgo;
00790         algo_size = RawSize(responseAlgo);
00791       }
00792 
00793       Binary::SequenceHeader responseSequence;
00794       Stream >> responseSequence; // TODO Check for request Number
00795 
00796       const std::size_t expectedHeaderSize = RawSize(responseHeader) + algo_size + RawSize(responseSequence);
00797       if (expectedHeaderSize >= responseHeader.Size)
00798       {
00799         std::stringstream stream;
00800         stream << "Size of received message " << responseHeader.Size << " bytes is invalid. Expected size " << expectedHeaderSize << " bytes";
00801         throw std::runtime_error(stream.str());
00802       }
00803 
00804       const std::size_t dataSize = responseHeader.Size - expectedHeaderSize;
00805       std::vector<char> buffer(dataSize);
00806       BufferInputChannel bufferInput(buffer);
00807       Binary::RawBuffer raw(&buffer[0], dataSize);
00808       Stream >> raw;
00809 
00810       IStreamBinary in(bufferInput);
00811       NodeId id;
00812       in >> id;
00813       ResponseHeader header;
00814       in >> header;
00815       if ( Debug )std::cout << "binary_client| Got response id: " << id << " and handle " << header.RequestHandle<< std::endl;
00816 
00817       if (header.ServiceResult != StatusCode::Good) {
00818         std::cout << "binary_client| Received a response from server with error status: " << OpcUa::ToString(header.ServiceResult) <<  std::endl;
00819       }
00820 
00821       if (id == SERVICE_FAULT)
00822       {
00823         std::cerr << std::endl;
00824         std::cerr << "Receive ServiceFault from Server with StatusCode " << OpcUa::ToString(header.ServiceResult) << std::endl;
00825         std::cerr << std::endl;
00826       }
00827       std::unique_lock<std::mutex> lock(Mutex);
00828       CallbackMap::const_iterator callbackIt = Callbacks.find(header.RequestHandle);
00829       if (callbackIt == Callbacks.end())
00830       {
00831         std::cout << "binary_client| No callback found for message with id: " << id << " and handle " << header.RequestHandle << std::endl;
00832         return;
00833       }
00834           callbackIt->second(std::move(buffer), std::move(header));
00835 
00836       Callbacks.erase(callbackIt);
00837     }
00838 
00839     Binary::Acknowledge HelloServer(const SecureConnectionParams& params)
00840     {
00841       if (Debug) {std::cout << "binary_client| HelloServer -->" << std::endl;}
00842       Binary::Hello hello;
00843       hello.ProtocolVersion = 0;
00844       hello.ReceiveBufferSize = 65536;
00845       hello.SendBufferSize = 65536;
00846       hello.MaxMessageSize = 65536;
00847       hello.MaxChunkCount = 256;
00848       hello.EndpointUrl = params.EndpointUrl;
00849 
00850       Binary::Header hdr(Binary::MT_HELLO, Binary::CHT_SINGLE);
00851       hdr.AddSize(RawSize(hello));
00852 
00853       Stream << hdr << hello << flush;
00854 
00855       Header respHeader;
00856       Stream >> respHeader; // TODO add check for acknowledge header
00857 
00858       Acknowledge ack;
00859       Stream >> ack; // TODO check for connection parameters
00860       if (Debug) {std::cout << "binary_client| HelloServer <--" << std::endl;}
00861       return ack;
00862     }
00863 
00864 
00865     SymmetricAlgorithmHeader CreateAlgorithmHeader() const
00866     {
00867       SymmetricAlgorithmHeader algorithmHeader;
00868       algorithmHeader.TokenId = ChannelSecurityToken.TokenId;
00869       return algorithmHeader;
00870     }
00871 
00872     SequenceHeader CreateSequenceHeader() const
00873     {
00874       SequenceHeader sequence;
00875       sequence.SequenceNumber = ++SequenceNumber;
00876       sequence.RequestId = ++RequestNumber;
00877       return sequence;
00878     }
00879 
00880     RequestHeader CreateRequestHeader() const
00881     {
00882       RequestHeader header;
00883       header.SessionAuthenticationToken = AuthenticationToken;
00884       header.RequestHandle = GetRequestHandle();
00885       header.Timeout = 10000;
00886       return header;
00887     }
00888 
00889     unsigned GetRequestHandle() const
00890     {
00891       return ++RequestHandle;
00892     }
00893 
00894     // Binary client is self-referenced from captures of subscription callbacks
00895     // Remove this references to make ~BinaryClient() run possible
00896     void RemoveSelfReferences()
00897     {
00898       if (Debug)  { std::cout << "binary_client| Clearing cached references to server" << std::endl; }
00899       PublishCallbacks.clear();
00900     }
00901 
00902   private:
00903     std::shared_ptr<IOChannel> Channel;
00904     mutable IOStreamBinary Stream;
00905     SecureConnectionParams Params;
00906     std::thread ReceiveThread;
00907 
00908     SubscriptionCallbackMap PublishCallbacks;
00909     SecurityToken ChannelSecurityToken;
00910     mutable std::atomic<uint32_t> SequenceNumber;
00911     mutable std::atomic<uint32_t> RequestNumber;
00912     ExpandedNodeId AuthenticationToken;
00913     mutable std::atomic<uint32_t> RequestHandle;
00914     mutable std::vector<std::vector<uint8_t>> ContinuationPoints;
00915     mutable CallbackMap Callbacks;
00916     const bool Debug = true;
00917     bool Finished = false;
00918 
00919     std::thread callback_thread;
00920     CallbackThread CallbackService;
00921     mutable std::mutex Mutex;
00922 
00923   };
00924 
00925   template <>
00926   void BinaryClient::Send<OpenSecureChannelRequest>(OpenSecureChannelRequest request) const
00927   {
00928     SecureHeader hdr(MT_SECURE_OPEN, CHT_SINGLE, ChannelSecurityToken.SecureChannelId);
00929     AsymmetricAlgorithmHeader algorithmHeader;
00930     algorithmHeader.SecurityPolicyUri = Params.SecurePolicy;
00931     algorithmHeader.SenderCertificate = Params.SenderCertificate;
00932     algorithmHeader.ReceiverCertificateThumbPrint = Params.ReceiverCertificateThumbPrint;
00933     hdr.AddSize(RawSize(algorithmHeader));
00934     hdr.AddSize(RawSize(request));
00935 
00936     const SequenceHeader sequence = CreateSequenceHeader();
00937     hdr.AddSize(RawSize(sequence));
00938     Stream << hdr << algorithmHeader << sequence << request << flush;
00939   }
00940 
00941 } // namespace
00942 
00943 
00944 OpcUa::Services::SharedPtr OpcUa::CreateBinaryClient(OpcUa::IOChannel::SharedPtr channel, const OpcUa::SecureConnectionParams& params, bool debug)
00945 {
00946   return std::make_shared<BinaryClient>(channel, params, debug);
00947 }
00948 
00949 OpcUa::Services::SharedPtr OpcUa::CreateBinaryClient(const std::string& endpointUrl, bool debug)
00950 {
00951   const Common::Uri serverUri(endpointUrl);
00952   OpcUa::IOChannel::SharedPtr channel = OpcUa::Connect(serverUri.Host(), serverUri.Port());
00953   OpcUa::SecureConnectionParams params;
00954   params.EndpointUrl = endpointUrl;
00955   params.SecurePolicy = "http://opcfoundation.org/UA/SecurityPolicy#None";
00956   return CreateBinaryClient(channel, params, debug);
00957 }


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Sat Jun 8 2019 18:24:39