00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include <opc/ua/protocol/utils.h>
00012 #include <opc/ua/client/binary_client.h>
00013 #include <opc/ua/client/remote_connection.h>
00014
00015 #include <opc/common/uri_facade.h>
00016 #include <opc/ua/protocol/binary/stream.h>
00017 #include <opc/ua/protocol/channel.h>
00018 #include <opc/ua/protocol/secure_channel.h>
00019 #include <opc/ua/protocol/session.h>
00020 #include <opc/ua/protocol/string_utils.h>
00021 #include <opc/ua/services/services.h>
00022
00023 #include <atomic>
00024 #include <chrono>
00025 #include <condition_variable>
00026 #include <mutex>
00027 #include <queue>
00028 #include <thread>
00029 #include <iostream>
00030
00031
00032 namespace
00033 {
00034
00035 using namespace OpcUa;
00036 using namespace OpcUa::Binary;
00037
00038 typedef std::map<uint32_t, std::function<void (PublishResult)>> SubscriptionCallbackMap;
00039
00040 class BufferInputChannel : public OpcUa::InputChannel
00041 {
00042 public:
00043 BufferInputChannel(const std::vector<char>& buffer)
00044 : Buffer(buffer)
00045 , Pos(0)
00046 {
00047 Reset();
00048 }
00049
00050 virtual std::size_t Receive(char* data, std::size_t size)
00051 {
00052 if (Pos >= Buffer.size())
00053 {
00054 return 0;
00055 }
00056
00057 size = std::min(size, Buffer.size() - Pos);
00058 std::vector<char>::const_iterator begin = Buffer.begin() + Pos;
00059 std::vector<char>::const_iterator end = begin + size;
00060 std::copy(begin, end, data);
00061 Pos += size;
00062 return size;
00063 }
00064
00065 void Reset()
00066 {
00067 Pos = 0;
00068 }
00069
00070 virtual void Stop()
00071 {
00072 }
00073
00074 private:
00075 const std::vector<char>& Buffer;
00076 std::size_t Pos;
00077 };
00078
00079
00080 template <typename T>
00081 class RequestCallback
00082 {
00083 public:
00084 RequestCallback()
00085 : lock(m)
00086 {
00087 }
00088
00089 void OnData(std::vector<char> data, ResponseHeader h)
00090 {
00091
00092 Data = std::move(data);
00093 this->header = std::move(h);
00094 doneEvent.notify_all();
00095 }
00096
00097 T WaitForData(std::chrono::milliseconds msec)
00098 {
00099 doneEvent.wait_for(lock, msec);
00100 T result;
00101 result.Header = std::move(this->header);
00102 if ( Data.empty() )
00103 {
00104 std::cout << "Error: received empty packet from server" << std::endl;
00105 }
00106 else
00107 {
00108 BufferInputChannel bufferInput(Data);
00109 IStreamBinary in(bufferInput);
00110 in >> result;
00111 }
00112 return result;
00113 }
00114
00115 private:
00116 std::vector<char> Data;
00117 ResponseHeader header;
00118 std::mutex m;
00119 std::unique_lock<std::mutex> lock;
00120 std::condition_variable doneEvent;
00121 };
00122
// Single-consumer work queue: callbacks handed to post() are executed one by
// one, in order, on whichever thread is running Run().
class CallbackThread
{
public:
  CallbackThread(bool debug=false) : Debug(debug), StopRequest(false)
  {
  }

  // Queues `callback` for execution and wakes the worker.
  void post(std::function<void()> callback)
  {
    if (Debug) { std::cout << "binary_client| CallbackThread : start post" << std::endl; }
    {
      std::lock_guard<std::mutex> guard(Mutex);
      Queue.push(std::move(callback));
      Condition.notify_one();
    }
    if (Debug) { std::cout << "binary_client| CallbackThread : end post" << std::endl; }
  }

  // Worker loop. Blocks until Stop() is called; callbacks are invoked with the
  // queue mutex released so they may call post() themselves.
  void Run()
  {
    while (true)
    {
      if (Debug) { std::cout << "binary_client| CallbackThread : waiting for next post" << std::endl; }
      std::unique_lock<std::mutex> lock(Mutex);
      Condition.wait(lock, [this]() { return StopRequest || !Queue.empty(); });
      if (StopRequest)
      {
        if (Debug) { std::cout << "binary_client| CallbackThread : exited." << std::endl; }
        return;
      }
      while (!Queue.empty())
      {
        if (Debug) { std::cout << "binary_client| CallbackThread : condition has triggered copying callback and poping. queue size is " << Queue.size() << std::endl; }
        std::function<void()> callback = std::move(Queue.front());
        Queue.pop();
        lock.unlock();
        if (Debug) { std::cout << "binary_client| CallbackThread : now calling callback." << std::endl; }
        callback();
        if (Debug) { std::cout << "binary_client| CallbackThread : callback called." << std::endl; }
        lock.lock();
      }
    }
  }

  // Asks Run() to return once it is idle.
  void Stop()
  {
    if (Debug) { std::cout << "binary_client| CallbackThread : stopping." << std::endl; }
    // The flag must change under the mutex. Otherwise Run() could evaluate its
    // wait predicate, see "no stop", and only then block - after this
    // notification was already delivered - and sleep forever.
    std::lock_guard<std::mutex> guard(Mutex);
    StopRequest = true;
    Condition.notify_all();
  }

private:
  bool Debug = false;
  std::mutex Mutex;
  std::condition_variable Condition;
  std::atomic<bool> StopRequest;
  std::queue<std::function<void()>> Queue;
};
00180
00181 class BinaryClient
00182 : public Services
00183 , public AttributeServices
00184 , public EndpointServices
00185 , public MethodServices
00186 , public NodeManagementServices
00187 , public SubscriptionServices
00188 , public ViewServices
00189 , public std::enable_shared_from_this<BinaryClient>
00190 {
00191 private:
00192 typedef std::function<void(std::vector<char>, ResponseHeader)> ResponseCallback;
00193 typedef std::map<uint32_t, ResponseCallback> CallbackMap;
00194
00195 public:
00196 BinaryClient(std::shared_ptr<IOChannel> channel, const SecureConnectionParams& params, bool debug)
00197 : Channel(channel)
00198 , Stream(channel)
00199 , Params(params)
00200 , SequenceNumber(1)
00201 , RequestNumber(1)
00202 , RequestHandle(0)
00203 , Debug(debug)
00204 , CallbackService(debug)
00205
00206 {
00207
00208 callback_thread = std::thread([&](){ CallbackService.Run(); });
00209
00210 HelloServer(params);
00211
00212 ReceiveThread = std::move(std::thread([this](){
00213 try
00214 {
00215 while(!Finished)
00216 Receive();
00217 }
00218 catch (const std::exception& exc)
00219 {
00220 if (Debug) { std::cerr << "binary_client| CallbackThread : Error receiving data: "; }
00221 std::cerr << exc.what() << std::endl;
00222 }
00223 }));
00224 }
00225
00226 ~BinaryClient()
00227 {
00228 Finished = true;
00229
00230 if (Debug) std::cout << "binary_client| Stopping callback thread." << std::endl;
00231 CallbackService.Stop();
00232 if (Debug) std::cout << "binary_client| Joining service thread." << std::endl;
00233 callback_thread.join();
00234
00235 Channel->Stop();
00236 if (Debug) std::cout << "binary_client| Joining receive thread." << std::endl;
00237 ReceiveThread.join();
00238 if (Debug) std::cout << "binary_client| Receive tread stopped." << std::endl;
00239
00240 if (Debug) std::cout << "binary_client| Destroyed." << std::endl;
00241 }
00242
00246 virtual CreateSessionResponse CreateSession(const RemoteSessionParameters& parameters)
00247 {
00248 if (Debug) { std::cout << "binary_client| CreateSession -->" << std::endl; }
00249 CreateSessionRequest request;
00250 request.Header = CreateRequestHeader();
00251
00252 request.Parameters.ClientDescription.ApplicationUri = parameters.ClientDescription.ApplicationUri;
00253 request.Parameters.ClientDescription.ProductUri = parameters.ClientDescription.ProductUri;
00254 request.Parameters.ClientDescription.ApplicationName = parameters.ClientDescription.ApplicationName;
00255 request.Parameters.ClientDescription.ApplicationType = parameters.ClientDescription.ApplicationType;
00256 request.Parameters.ClientDescription.GatewayServerUri = parameters.ClientDescription.GatewayServerUri;
00257 request.Parameters.ClientDescription.DiscoveryProfileUri = parameters.ClientDescription.DiscoveryProfileUri;
00258 request.Parameters.ClientDescription.DiscoveryUrls = parameters.ClientDescription.DiscoveryUrls;
00259
00260 request.Parameters.ServerUri = parameters.ServerURI;
00261 request.Parameters.EndpointUrl = parameters.EndpointUrl;
00262 request.Parameters.SessionName = parameters.SessionName;
00263 request.Parameters.ClientNonce = ByteString(std::vector<uint8_t>(32,0));
00264 request.Parameters.ClientCertificate = ByteString(parameters.ClientCertificate);
00265 request.Parameters.RequestedSessionTimeout = parameters.Timeout;
00266 request.Parameters.MaxResponseMessageSize = 65536;
00267 CreateSessionResponse response = Send<CreateSessionResponse>(request);
00268 AuthenticationToken = response.Parameters.AuthenticationToken;
00269 if (Debug) { std::cout << "binary_client| CreateSession <--" << std::endl; }
00270 return response;
00271 }
00272
00273 ActivateSessionResponse ActivateSession(const ActivateSessionParameters &session_parameters) override
00274 {
00275 if (Debug) { std::cout << "binary_client| ActivateSession -->" << std::endl; }
00276 ActivateSessionRequest request;
00277 request.Parameters = session_parameters;
00278 request.Parameters.LocaleIds.push_back("en");
00279 ActivateSessionResponse response = Send<ActivateSessionResponse>(request);
00280 if (Debug) { std::cout << "binary_client| ActivateSession <--" << std::endl; }
00281 return response;
00282 }
00283
00284 virtual CloseSessionResponse CloseSession()
00285 {
00286 if (Debug) { std::cout << "binary_client| CloseSession -->" << std::endl; }
00287 CloseSessionRequest request;
00288 CloseSessionResponse response = Send<CloseSessionResponse>(request);
00289 RemoveSelfReferences();
00290 if (Debug) { std::cout << "binary_client| CloseSession <--" << std::endl; }
00291 return response;
00292 }
00293
00294 virtual void AbortSession()
00295 {
00296 if (Debug) { std::cout << "binary_client| AbortSession -->" << std::endl; }
00297 RemoveSelfReferences();
00298 if (Debug) { std::cout << "binary_client| AbortSession <--" << std::endl; }
00299 }
00300
00301 DeleteNodesResponse DeleteNodes(const std::vector<OpcUa::DeleteNodesItem> &nodesToDelete) override
00302 {
00303 if (Debug) { std::cout << "binary_client| DeleteNodes -->" << std::endl; }
00304 DeleteNodesRequest request;
00305 request.NodesToDelete = nodesToDelete;
00306 DeleteNodesResponse response = Send<DeleteNodesResponse>(request);
00307 if (Debug) { std::cout << "binary_client| DeleteNodes <--" << std::endl; }
00308 return response;
00309 }
00310
00314 virtual std::shared_ptr<AttributeServices> Attributes() override
00315 {
00316 return shared_from_this();
00317 }
00318
00319 public:
00320 virtual std::vector<DataValue> Read(const ReadParameters& params) const
00321 {
00322 if (Debug) {
00323 std::cout << "binary_client| Read -->" << std::endl;
00324 for ( ReadValueId attr : params.AttributesToRead )
00325 {
00326 std::cout << attr.NodeId << " " << (uint32_t)attr.AttributeId;
00327 }
00328 std::cout << std::endl;
00329 }
00330 ReadRequest request;
00331 request.Parameters = params;
00332 const ReadResponse response = Send<ReadResponse>(request);
00333 if (Debug) { std::cout << "binary_client| Read <--" << std::endl; }
00334 return response.Results;
00335 }
00336
00337 virtual std::vector<OpcUa::StatusCode> Write(const std::vector<WriteValue>& values)
00338 {
00339 if (Debug) { std::cout << "binary_client| Write -->" << std::endl; }
00340 WriteRequest request;
00341 request.Parameters.NodesToWrite = values;
00342 const WriteResponse response = Send<WriteResponse>(request);
00343 if (Debug) { std::cout << "binary_client| Write <--" << std::endl; }
00344 return response.Results;
00345 }
00346
00350 virtual std::shared_ptr<EndpointServices> Endpoints() override
00351 {
00352 return shared_from_this();
00353 }
00354
00355 virtual std::vector<ApplicationDescription> FindServers(const FindServersParameters& params) const
00356 {
00357 if (Debug) { std::cout << "binary_client| FindServers -->" << std::endl; }
00358 OpcUa::FindServersRequest request;
00359 request.Parameters = params;
00360 FindServersResponse response = Send<FindServersResponse>(request);
00361 if (Debug) { std::cout << "binary_client| FindServers <--" << std::endl; }
00362 return response.Data.Descriptions;
00363 }
00364
00365 virtual std::vector<EndpointDescription> GetEndpoints(const GetEndpointsParameters& filter) const
00366 {
00367 if (Debug) { std::cout << "binary_client| GetEndpoints -->" << std::endl; }
00368 OpcUa::GetEndpointsRequest request;
00369 request.Header = CreateRequestHeader();
00370 request.Parameters.EndpointUrl = filter.EndpointUrl;
00371 request.Parameters.LocaleIds = filter.LocaleIds;
00372 request.Parameters.ProfileUris = filter.ProfileUris;
00373 const GetEndpointsResponse response = Send<GetEndpointsResponse>(request);
00374 if (Debug) { std::cout << "binary_client| GetEndpoints <--" << std::endl; }
00375 return response.Endpoints;
00376 }
00377
00378 virtual void RegisterServer(const ServerParameters& parameters)
00379 {
00380 }
00381
00385 virtual std::shared_ptr<MethodServices> Method() override
00386 {
00387 return shared_from_this();
00388 }
00389
00390 virtual std::vector<CallMethodResult> Call(const std::vector<CallMethodRequest>& methodsToCall)
00391 {
00392 if (Debug) {std::cout << "binary_clinent | Call -->" << std::endl;}
00393 CallRequest request;
00394 request.MethodsToCall = methodsToCall;
00395 const CallResponse response = Send<CallResponse>(request);
00396 if (Debug) {std::cout << "binary_clinent | Call <--" << std::endl;}
00397
00398
00399
00400
00401
00402 return response.Results;
00403 }
00404
00408
00409 virtual std::shared_ptr<NodeManagementServices> NodeManagement() override
00410 {
00411 return shared_from_this();
00412 }
00413
00414 virtual std::vector<AddNodesResult> AddNodes(const std::vector<AddNodesItem>& items)
00415 {
00416 if (Debug) { std::cout << "binary_client| AddNodes -->" << std::endl; }
00417 AddNodesRequest request;
00418 request.Parameters.NodesToAdd = items;
00419 const AddNodesResponse response = Send<AddNodesResponse>(request);
00420 if (Debug) { std::cout << "binary_client| AddNodes <--" << std::endl; }
00421 return response.results;
00422 }
00423
00424 virtual std::vector<StatusCode> AddReferences(const std::vector<AddReferencesItem>& items)
00425 {
00426 if (Debug) { std::cout << "binary_client| AddReferences -->" << std::endl; }
00427 AddReferencesRequest request;
00428 request.Parameters.ReferencesToAdd = items;
00429 const AddReferencesResponse response = Send<AddReferencesResponse>(request);
00430 if (Debug) { std::cout << "binary_client| AddReferences <--" << std::endl; }
00431 return response.Results;
00432 }
00433
00437 virtual std::shared_ptr<SubscriptionServices> Subscriptions() override
00438 {
00439 return shared_from_this();
00440 }
00441
00442 virtual SubscriptionData CreateSubscription(const CreateSubscriptionRequest& request, std::function<void (PublishResult)> callback)
00443 {
00444 if (Debug) { std::cout << "binary_client| CreateSubscription -->" << std::endl; }
00445 const CreateSubscriptionResponse response = Send<CreateSubscriptionResponse>(request);
00446 if (Debug) std::cout << "BinaryClient | got CreateSubscriptionResponse" << std::endl;
00447 PublishCallbacks[response.Data.SubscriptionId] = callback;
00448 if (Debug) { std::cout << "binary_client| CreateSubscription <--" << std::endl; }
00449 return response.Data;
00450 }
00451
00452 virtual std::vector<StatusCode> DeleteSubscriptions(const std::vector<uint32_t>& subscriptions)
00453 {
00454 if (Debug) { std::cout << "binary_client| DeleteSubscriptions -->" << std::endl; }
00455 DeleteSubscriptionsRequest request;
00456 request.SubscriptionIds = subscriptions;
00457 const DeleteSubscriptionsResponse response = Send<DeleteSubscriptionsResponse>(request);
00458 if (Debug) { std::cout << "binary_client| DeleteSubscriptions <--" << std::endl; }
00459 return response.Results;
00460 }
00461
00462 virtual std::vector<MonitoredItemCreateResult> CreateMonitoredItems(const MonitoredItemsParameters& parameters)
00463 {
00464 if (Debug) { std::cout << "binary_client| CreateMonitoredItems -->" << std::endl; }
00465 CreateMonitoredItemsRequest request;
00466 request.Parameters = parameters;
00467 const CreateMonitoredItemsResponse response = Send<CreateMonitoredItemsResponse>(request);
00468 if (Debug) { std::cout << "binary_client| CreateMonitoredItems <--" << std::endl; }
00469 return response.Results;
00470 }
00471
00472 virtual std::vector<StatusCode> DeleteMonitoredItems(const DeleteMonitoredItemsParameters& params)
00473 {
00474 if (Debug) { std::cout << "binary_client| DeleteMonitoredItems -->" << std::endl; }
00475 DeleteMonitoredItemsRequest request;
00476 request.Parameters = params;
00477 const DeleteMonitoredItemsResponse response = Send<DeleteMonitoredItemsResponse>(request);
00478 if (Debug) { std::cout << "binary_client| DeleteMonitoredItems <--" << std::endl; }
00479 return response.Results;
00480 }
00481
00482 virtual void Publish(const PublishRequest& originalrequest)
00483 {
00484 if (Debug) {std::cout << "binary_client| Publish -->" << "request with " << originalrequest.SubscriptionAcknowledgements.size() << " acks" << std::endl;}
00485 PublishRequest request(originalrequest);
00486 request.Header = CreateRequestHeader();
00487 request.Header.Timeout = 0;
00488
00489 ResponseCallback responseCallback = [this](std::vector<char> buffer, ResponseHeader h){
00490 if (Debug) {std::cout << "BinaryClient | Got Publish Response, from server " << std::endl;}
00491 PublishResponse response;
00492 if (h.ServiceResult != OpcUa::StatusCode::Good)
00493 {
00494 response.Header = std::move(h);
00495 }
00496 else
00497 {
00498 BufferInputChannel bufferInput(buffer);
00499 IStreamBinary in(bufferInput);
00500 in >> response;
00501 }
00502
00503 CallbackService.post([this, response]()
00504 {
00505 if (response.Header.ServiceResult == OpcUa::StatusCode::Good)
00506 {
00507 if (Debug) { std::cout << "BinaryClient | Calling callback for Subscription " << response.Parameters.SubscriptionId << std::endl; }
00508 SubscriptionCallbackMap::const_iterator callbackIt = this->PublishCallbacks.find(response.Parameters.SubscriptionId);
00509 if (callbackIt == this->PublishCallbacks.end())
00510 {
00511 std::cout << "BinaryClient | Error Unknown SubscriptionId " << response.Parameters.SubscriptionId << std::endl;
00512 }
00513 else
00514 {
00515 try {
00516 callbackIt->second(response.Parameters);
00517 }
00518 catch (const std::exception& ex)
00519 {
00520 std::cout << "Error calling application callback " << ex.what() << std::endl;
00521 }
00522 }
00523 }
00524 else if (response.Header.ServiceResult == OpcUa::StatusCode::BadSessionClosed)
00525 {
00526 if (Debug)
00527 {
00528 std::cout << "BinaryClient | Session is closed";
00529 }
00530 }
00531 else
00532 {
00533
00534 }
00535 });
00536 };
00537 std::unique_lock<std::mutex> lock(Mutex);
00538 Callbacks.insert(std::make_pair(request.Header.RequestHandle, responseCallback));
00539 lock.unlock();
00540 Send(request);
00541 if (Debug) { std::cout << "binary_client| Publish <--" << std::endl; }
00542 }
00543
00544 virtual RepublishResponse Republish(const RepublishParameters& params)
00545 {
00546 if (Debug) { std::cout << "binary_client| Republish -->" << std::endl; }
00547 RepublishRequest request;
00548 request.Header = CreateRequestHeader();
00549 request.Parameters = params;
00550
00551 RepublishResponse response = Send<RepublishResponse>(request);
00552 if (Debug) { std::cout << "binary_client| Republish <--" << std::endl; }
00553 return response;
00554 }
00555
00559 virtual std::shared_ptr<ViewServices> Views() override
00560 {
00561 return shared_from_this();
00562 }
00563
00564 virtual std::vector<BrowsePathResult> TranslateBrowsePathsToNodeIds(const TranslateBrowsePathsParameters& params) const
00565 {
00566 if (Debug) { std::cout << "binary_client| TranslateBrowsePathsToNodeIds -->" << std::endl; }
00567 TranslateBrowsePathsToNodeIdsRequest request;
00568 request.Header = CreateRequestHeader();
00569 request.Parameters = params;
00570 const TranslateBrowsePathsToNodeIdsResponse response = Send<TranslateBrowsePathsToNodeIdsResponse>(request);
00571 if (Debug) { std::cout << "binary_client| TranslateBrowsePathsToNodeIds <--" << std::endl; }
00572 return response.Result.Paths;
00573 }
00574
00575
00576 virtual std::vector<BrowseResult> Browse(const OpcUa::NodesQuery& query) const
00577 {
00578 if (Debug) {
00579 std::cout << "binary_client| Browse -->";
00580 for (BrowseDescription desc : query.NodesToBrowse)
00581 {
00582 std::cout << desc.NodeToBrowse << " ";
00583 }
00584 std::cout << std::endl;
00585 }
00586 BrowseRequest request;
00587 request.Header = CreateRequestHeader();
00588 request.Query = query;
00589 const BrowseResponse response = Send<BrowseResponse>(request);
00590 ContinuationPoints.clear();
00591 for (BrowseResult result : response.Results)
00592 {
00593 if (!result.ContinuationPoint.empty())
00594 {
00595 ContinuationPoints.push_back(result.ContinuationPoint);
00596 }
00597 }
00598 if (Debug) { std::cout << "binary_client| Browse <--" << std::endl; }
00599 return response.Results;
00600 }
00601
00602 virtual std::vector<BrowseResult> BrowseNext() const
00603 {
00604
00605 if (ContinuationPoints.empty())
00606 {
00607 if (Debug) { std::cout << "No Continuation point, no need to send browse next request" << std::endl; }
00608 return std::vector<BrowseResult>();
00609 }
00610 if (Debug) { std::cout << "binary_client| BrowseNext -->" << std::endl; }
00611 BrowseNextRequest request;
00612 request.ReleaseContinuationPoints = ContinuationPoints.empty() ? true : false;
00613 request.ContinuationPoints = ContinuationPoints;
00614 const BrowseNextResponse response = Send<BrowseNextResponse>(request);
00615 ContinuationPoints.clear();
00616 for (auto result : response.Results)
00617 {
00618 if (!result.ContinuationPoint.empty())
00619 {
00620 ContinuationPoints.push_back(result.ContinuationPoint);
00621 }
00622 }
00623 if (Debug) { std::cout << "binary_client| BrowseNext <--" << std::endl; }
00624 return response.Results;
00625 }
00626
00627 std::vector<NodeId> RegisterNodes(const std::vector<NodeId>& params) const
00628 {
00629 if (Debug)
00630 {
00631 std::cout << "binary_clinet| RegisterNodes -->\n\tNodes to register:" << std::endl;
00632 for (auto& param : params) {
00633 std::cout << "\t\t" << param << std::endl;
00634 }
00635 }
00636
00637 RegisterNodesRequest request;
00638
00639 request.NodesToRegister = params;
00640 RegisterNodesResponse response = Send<RegisterNodesResponse>(request);
00641 if (Debug)
00642 {
00643 std::cout << "binary_client| RegisterNodes <--\n\tRegistered NodeIds:" << std::endl;
00644 for (auto&id : response.Result) {
00645 std::cout << "\t\t" << id << std::endl;
00646 }
00647 }
00648 return response.Result;
00649 }
00650
00651 void UnregisterNodes(const std::vector<NodeId>& params) const
00652 {
00653 if (Debug) {
00654 std::cout << "binary_client| UnregisterNodes -->\n\tUnregistering nodes:" << std::endl;
00655 for (auto& id : params) {
00656 std::cout << "\t\t" << id << std::endl;
00657 }
00658 }
00659
00660 UnregisterNodesRequest request;
00661 request.NodesToUnregister = params;
00662 UnregisterNodesResponse response = Send<UnregisterNodesResponse>(request);
00663
00664 if (Debug) {
00665 std::cout << "binary_client| UnregisterNodes <--" << std::endl;
00666 }
00667 }
00668
00669 private:
00670
00671 void Release() const
00672 {
00673 ContinuationPoints.clear();
00674 BrowseNext();
00675 }
00676
00677 public:
00678
00682 virtual OpcUa::OpenSecureChannelResponse OpenSecureChannel(const OpenSecureChannelParameters& params)
00683 {
00684 if (Debug) {std::cout << "binary_client| OpenChannel -->" << std::endl;}
00685
00686 OpenSecureChannelRequest request;
00687 request.Parameters = params;
00688
00689 OpenSecureChannelResponse response = Send<OpenSecureChannelResponse>(request);
00690
00691 ChannelSecurityToken = response.ChannelSecurityToken;
00692
00693 if (Debug) {std::cout << "binary_client| OpenChannel <--" << std::endl;}
00694 return response;
00695 }
00696
00697 virtual void CloseSecureChannel(uint32_t channelId)
00698 {
00699 try
00700 {
00701 if (Debug) {std::cout << "binary_client| CloseSecureChannel -->" << std::endl;}
00702 SecureHeader hdr(MT_SECURE_CLOSE, CHT_SINGLE, ChannelSecurityToken.SecureChannelId);
00703
00704 const SymmetricAlgorithmHeader algorithmHeader = CreateAlgorithmHeader();
00705 hdr.AddSize(RawSize(algorithmHeader));
00706
00707 const SequenceHeader sequence = CreateSequenceHeader();
00708 hdr.AddSize(RawSize(sequence));
00709
00710 CloseSecureChannelRequest request;
00711
00712 hdr.AddSize(RawSize(request));
00713
00714 Stream << hdr << algorithmHeader << sequence << request << flush;
00715 if (Debug) {std::cout << "binary_client| Secure channel closed." << std::endl;}
00716 }
00717 catch (const std::exception& exc)
00718 {
00719 std::cerr << "Closing secure channel failed with error: " << exc.what() << std::endl;
00720 }
00721 if (Debug) {std::cout << "binary_client| CloseSecureChannel <--" << std::endl;}
00722 }
00723
00724 private:
00725 template <typename Response, typename Request>
00726 Response Send(Request request) const
00727 {
00728 request.Header = CreateRequestHeader();
00729
00730 RequestCallback<Response> requestCallback;
00731 ResponseCallback responseCallback = [&requestCallback](std::vector<char> buffer, ResponseHeader h){
00732 requestCallback.OnData(std::move(buffer), std::move(h));
00733 };
00734 std::unique_lock<std::mutex> lock(Mutex);
00735 Callbacks.insert(std::make_pair(request.Header.RequestHandle, responseCallback));
00736 lock.unlock();
00737
00738 Send(request);
00739
00740 return requestCallback.WaitForData(std::chrono::milliseconds(request.Header.Timeout));
00741 }
00742
00743
00744 mutable std::mutex send_mutex;
00745
00746 template <typename Request>
00747 void Send(Request request) const
00748 {
00749
00750 SecureHeader hdr(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelSecurityToken.SecureChannelId);
00751 const SymmetricAlgorithmHeader algorithmHeader = CreateAlgorithmHeader();
00752 hdr.AddSize(RawSize(algorithmHeader));
00753
00754 const SequenceHeader sequence = CreateSequenceHeader();
00755 hdr.AddSize(RawSize(sequence));
00756 hdr.AddSize(RawSize(request));
00757
00758 std::unique_lock<std::mutex> send_lock(send_mutex);
00759 Stream << hdr << algorithmHeader << sequence << request << flush;
00760 }
00761
00762
00763
00764 void Receive() const
00765 {
00766 Binary::SecureHeader responseHeader;
00767 Stream >> responseHeader;
00768
00769 size_t algo_size;
00770 if (responseHeader.Type == MessageType::MT_SECURE_OPEN )
00771 {
00772 AsymmetricAlgorithmHeader responseAlgo;
00773 Stream >> responseAlgo;
00774 algo_size = RawSize(responseAlgo);
00775 }
00776 else if (responseHeader.Type == MessageType::MT_ERROR )
00777 {
00778 StatusCode error;
00779 std::string msg;
00780 Stream >> error;
00781 Stream >> msg;
00782 std::stringstream stream;
00783 stream << "Received error message from server: " << ToString(error) << ", " << msg ;
00784 throw std::runtime_error(stream.str());
00785 }
00786 else
00787 {
00788 Binary::SymmetricAlgorithmHeader responseAlgo;
00789 Stream >> responseAlgo;
00790 algo_size = RawSize(responseAlgo);
00791 }
00792
00793 Binary::SequenceHeader responseSequence;
00794 Stream >> responseSequence;
00795
00796 const std::size_t expectedHeaderSize = RawSize(responseHeader) + algo_size + RawSize(responseSequence);
00797 if (expectedHeaderSize >= responseHeader.Size)
00798 {
00799 std::stringstream stream;
00800 stream << "Size of received message " << responseHeader.Size << " bytes is invalid. Expected size " << expectedHeaderSize << " bytes";
00801 throw std::runtime_error(stream.str());
00802 }
00803
00804 const std::size_t dataSize = responseHeader.Size - expectedHeaderSize;
00805 std::vector<char> buffer(dataSize);
00806 BufferInputChannel bufferInput(buffer);
00807 Binary::RawBuffer raw(&buffer[0], dataSize);
00808 Stream >> raw;
00809
00810 IStreamBinary in(bufferInput);
00811 NodeId id;
00812 in >> id;
00813 ResponseHeader header;
00814 in >> header;
00815 if ( Debug )std::cout << "binary_client| Got response id: " << id << " and handle " << header.RequestHandle<< std::endl;
00816
00817 if (header.ServiceResult != StatusCode::Good) {
00818 std::cout << "binary_client| Received a response from server with error status: " << OpcUa::ToString(header.ServiceResult) << std::endl;
00819 }
00820
00821 if (id == SERVICE_FAULT)
00822 {
00823 std::cerr << std::endl;
00824 std::cerr << "Receive ServiceFault from Server with StatusCode " << OpcUa::ToString(header.ServiceResult) << std::endl;
00825 std::cerr << std::endl;
00826 }
00827 std::unique_lock<std::mutex> lock(Mutex);
00828 CallbackMap::const_iterator callbackIt = Callbacks.find(header.RequestHandle);
00829 if (callbackIt == Callbacks.end())
00830 {
00831 std::cout << "binary_client| No callback found for message with id: " << id << " and handle " << header.RequestHandle << std::endl;
00832 return;
00833 }
00834 callbackIt->second(std::move(buffer), std::move(header));
00835
00836 Callbacks.erase(callbackIt);
00837 }
00838
00839 Binary::Acknowledge HelloServer(const SecureConnectionParams& params)
00840 {
00841 if (Debug) {std::cout << "binary_client| HelloServer -->" << std::endl;}
00842 Binary::Hello hello;
00843 hello.ProtocolVersion = 0;
00844 hello.ReceiveBufferSize = 65536;
00845 hello.SendBufferSize = 65536;
00846 hello.MaxMessageSize = 65536;
00847 hello.MaxChunkCount = 256;
00848 hello.EndpointUrl = params.EndpointUrl;
00849
00850 Binary::Header hdr(Binary::MT_HELLO, Binary::CHT_SINGLE);
00851 hdr.AddSize(RawSize(hello));
00852
00853 Stream << hdr << hello << flush;
00854
00855 Header respHeader;
00856 Stream >> respHeader;
00857
00858 Acknowledge ack;
00859 Stream >> ack;
00860 if (Debug) {std::cout << "binary_client| HelloServer <--" << std::endl;}
00861 return ack;
00862 }
00863
00864
00865 SymmetricAlgorithmHeader CreateAlgorithmHeader() const
00866 {
00867 SymmetricAlgorithmHeader algorithmHeader;
00868 algorithmHeader.TokenId = ChannelSecurityToken.TokenId;
00869 return algorithmHeader;
00870 }
00871
00872 SequenceHeader CreateSequenceHeader() const
00873 {
00874 SequenceHeader sequence;
00875 sequence.SequenceNumber = ++SequenceNumber;
00876 sequence.RequestId = ++RequestNumber;
00877 return sequence;
00878 }
00879
00880 RequestHeader CreateRequestHeader() const
00881 {
00882 RequestHeader header;
00883 header.SessionAuthenticationToken = AuthenticationToken;
00884 header.RequestHandle = GetRequestHandle();
00885 header.Timeout = 10000;
00886 return header;
00887 }
00888
00889 unsigned GetRequestHandle() const
00890 {
00891 return ++RequestHandle;
00892 }
00893
00894
00895
00896 void RemoveSelfReferences()
00897 {
00898 if (Debug) { std::cout << "binary_client| Clearing cached references to server" << std::endl; }
00899 PublishCallbacks.clear();
00900 }
00901
00902 private:
00903 std::shared_ptr<IOChannel> Channel;
00904 mutable IOStreamBinary Stream;
00905 SecureConnectionParams Params;
00906 std::thread ReceiveThread;
00907
00908 SubscriptionCallbackMap PublishCallbacks;
00909 SecurityToken ChannelSecurityToken;
00910 mutable std::atomic<uint32_t> SequenceNumber;
00911 mutable std::atomic<uint32_t> RequestNumber;
00912 ExpandedNodeId AuthenticationToken;
00913 mutable std::atomic<uint32_t> RequestHandle;
00914 mutable std::vector<std::vector<uint8_t>> ContinuationPoints;
00915 mutable CallbackMap Callbacks;
00916 const bool Debug = true;
00917 bool Finished = false;
00918
00919 std::thread callback_thread;
00920 CallbackThread CallbackService;
00921 mutable std::mutex Mutex;
00922
00923 };
00924
00925 template <>
00926 void BinaryClient::Send<OpenSecureChannelRequest>(OpenSecureChannelRequest request) const
00927 {
00928 SecureHeader hdr(MT_SECURE_OPEN, CHT_SINGLE, ChannelSecurityToken.SecureChannelId);
00929 AsymmetricAlgorithmHeader algorithmHeader;
00930 algorithmHeader.SecurityPolicyUri = Params.SecurePolicy;
00931 algorithmHeader.SenderCertificate = Params.SenderCertificate;
00932 algorithmHeader.ReceiverCertificateThumbPrint = Params.ReceiverCertificateThumbPrint;
00933 hdr.AddSize(RawSize(algorithmHeader));
00934 hdr.AddSize(RawSize(request));
00935
00936 const SequenceHeader sequence = CreateSequenceHeader();
00937 hdr.AddSize(RawSize(sequence));
00938 Stream << hdr << algorithmHeader << sequence << request << flush;
00939 }
00940
00941 }
00942
00943
00944 OpcUa::Services::SharedPtr OpcUa::CreateBinaryClient(OpcUa::IOChannel::SharedPtr channel, const OpcUa::SecureConnectionParams& params, bool debug)
00945 {
00946 return std::make_shared<BinaryClient>(channel, params, debug);
00947 }
00948
00949 OpcUa::Services::SharedPtr OpcUa::CreateBinaryClient(const std::string& endpointUrl, bool debug)
00950 {
00951 const Common::Uri serverUri(endpointUrl);
00952 OpcUa::IOChannel::SharedPtr channel = OpcUa::Connect(serverUri.Host(), serverUri.Port());
00953 OpcUa::SecureConnectionParams params;
00954 params.EndpointUrl = endpointUrl;
00955 params.SecurePolicy = "http://opcfoundation.org/UA/SecurityPolicy#None";
00956 return CreateBinaryClient(channel, params, debug);
00957 }