25 #include <condition_variable> 35 using namespace OpcUa;
38 typedef std::map<uint32_t, std::function<void (PublishResult)>> SubscriptionCallbackMap;
// Input channel whose Receive() serves bytes out of an in-memory vector.
// NOTE(review): this listing omits lines (braces, the Pos member, return
// statements); the comments below describe only the statements that are visible.
43 BufferInputChannel(
const std::vector<char> & buffer)
// Copies up to `size` bytes from Buffer, starting at offset Pos, into `data`.
50 virtual std::size_t Receive(
char * data, std::size_t size)
// Read position is at or past the end of Buffer; the handling for this case
// is not visible in this listing.
52 if (Pos >= Buffer.size())
// Clamp the request to the number of bytes remaining after Pos.
57 size = std::min(size, Buffer.size() - Pos);
58 std::vector<char>::const_iterator begin = Buffer.begin() + Pos;
59 std::vector<char>::const_iterator
end = begin + size;
60 std::copy(begin, end, data);
// Stored as a reference, not a copy.
// NOTE(review): the referenced vector must outlive this channel — confirm at call sites.
75 const std::vector<char> & Buffer;
// Helper that lets a caller block until response data has been delivered.
// NOTE(review): the constructor body and the method containing notify_all()
// are only partially visible in this listing.
84 RequestCallback(
const Common::Logger::SharedPtr & logger)
// Wakes every thread blocked on doneEvent.
95 doneEvent.notify_all();
// Waits on doneEvent for at most `msec`.
// Throws std::runtime_error("Response timed out") when the wait times out.
98 T WaitForData(std::chrono::milliseconds msec)
// NOTE(review): wait_for is called without a predicate, so a spurious wakeup
// is treated like a notification, and a notify_all() issued before this
// wait starts would appear to be missed — verify against the full source.
100 if (doneEvent.wait_for(lock, msec) == std::cv_status::timeout)
101 {
throw std::runtime_error(
"Response timed out"); }
108 LOG_WARN(Logger,
"binary_client | received empty packet from server");
// Wraps the received bytes so they can be read through the input-channel interface.
113 BufferInputChannel bufferInput(
Data);
122 Common::Logger::SharedPtr Logger;
123 std::vector<char>
Data;
// NOTE(review): declared here without an associated mutex; no std::mutex
// member is visible in this listing. std::condition_variable::wait_for
// requires the lock to own a locked mutex — confirm how `lock` is initialised.
126 std::unique_lock<std::mutex> lock;
127 std::condition_variable doneEvent;
// Work queue of std::function<void()> tasks guarded by Mutex and signalled
// through Condition.
// NOTE(review): signatures of the run/stop methods and several braces are
// missing from this listing; comments describe the visible statements only.
133 CallbackThread(
const Common::Logger::SharedPtr & logger =
nullptr)
// Enqueues `callback` under Mutex and wakes one waiter on Condition.
140 void post(std::function<
void()> callback)
142 LOG_DEBUG(Logger,
"binary_client | CallbackThread: post -->");
144 std::unique_lock<std::mutex> lock(Mutex);
145 Queue.push(callback);
146 Condition.notify_one();
148 LOG_DEBUG(Logger,
"binary_client | CallbackThread: post <--");
155 LOG_DEBUG(Logger,
"binary_client | CallbackThread: waiting for next post");
157 std::unique_lock<std::mutex> lock(Mutex);
// Predicate form of wait: blocks until a stop was requested or the queue
// has at least one entry.
158 Condition.wait(lock, [&]() {
return (StopRequest ==
true) || (! Queue.empty()) ;});
162 LOG_DEBUG(Logger,
"binary_client | CallbackThread: exited");
// Processes queued callbacks until the queue is empty.
167 while (!Queue.empty())
// Copies the front element; the matching pop and the invocation are not
// visible in this listing.
169 std::function<void()> callback = Queue.front();
173 LOG_DEBUG(Logger,
"binary_client | CallbackThread: calling callback");
175 LOG_DEBUG(Logger,
"binary_client | CallbackThread: callback finished");
184 LOG_DEBUG(Logger,
"binary_client | CallbackThread: stopping");
// Wakes all waiters; presumably StopRequest is set just before this — confirm.
187 Condition.notify_all();
191 Common::Logger::SharedPtr Logger;
193 std::condition_variable Condition;
// Read inside the wait predicate above.
194 std::atomic<bool> StopRequest;
// Pending callbacks, pushed by post().
195 std::queue<std::function<void()>> Queue;
206 ,
public std::enable_shared_from_this<BinaryClient>
209 typedef std::function<void(std::vector<char>,
ResponseHeader)> ResponseCallback;
210 typedef std::map<uint32_t, ResponseCallback> CallbackMap;
211 std::vector<char> messageBuffer;
214 BinaryClient(std::shared_ptr<IOChannel> channel,
const SecureConnectionParams & params,
const Common::Logger::SharedPtr & logger)
222 , CallbackService(logger)
226 callback_thread = std::thread([&]() { CallbackService.Run(); });
233 CallbackService.Stop();
234 callback_thread.join();
238 ReceiveThread = std::thread([
this]()
246 catch (
const std::exception & exc)
248 if (Finished) {
return; }
250 LOG_ERROR(Logger,
"binary_client | ReceiveThread: error receiving data: {}", exc.what());
259 LOG_DEBUG(Logger,
"binary_client | stopping callback thread");
261 CallbackService.Stop();
263 LOG_DEBUG(Logger,
"binary_client | joining service thread");
265 callback_thread.join();
269 LOG_DEBUG(Logger,
"binary_client | joining receive thread");
271 ReceiveThread.join();
273 LOG_DEBUG(Logger,
"binary_client | receive thread stopped");
281 LOG_DEBUG(Logger,
"binary_client | CreateSession -->");
284 request.
Header = CreateRequestHeader();
304 LOG_DEBUG(Logger,
"binary_client | CreateSession <--");
311 LOG_DEBUG(Logger,
"binary_client | ActivateSession -->");
318 LOG_DEBUG(Logger,
"binary_client | ActivateSession <--");
325 LOG_DEBUG(Logger,
"binary_client | CloseSession -->");
329 RemoveSelfReferences();
331 LOG_DEBUG(Logger,
"binary_client | CloseSession <--");
336 virtual void AbortSession()
override 338 LOG_DEBUG(Logger,
"binary_client | AbortSession -->");
340 RemoveSelfReferences();
342 LOG_DEBUG(Logger,
"binary_client | AbortSession <--");
345 DeleteNodesResponse DeleteNodes(
const std::vector<OpcUa::DeleteNodesItem> & nodesToDelete)
override 347 LOG_DEBUG(Logger,
"binary_client | DeleteNodes -->");
353 LOG_DEBUG(Logger,
"binary_client | DeleteNodes <--");
361 virtual std::shared_ptr<AttributeServices> Attributes()
override 363 return shared_from_this();
369 LOG_DEBUG(Logger,
"binary_client | Read -->");
380 const ReadResponse response = Send<ReadResponse>(request);
382 LOG_DEBUG(Logger,
"binary_client | Read <--");
387 virtual std::vector<OpcUa::StatusCode>
Write(
const std::vector<WriteValue> & values)
override 389 LOG_DEBUG(Logger,
"binary_client | Write -->");
395 LOG_DEBUG(Logger,
"binary_client | Write <--");
403 virtual std::shared_ptr<EndpointServices>
Endpoints()
override 405 return shared_from_this();
408 virtual std::vector<ApplicationDescription> FindServers(
const FindServersParameters & params)
const override 410 LOG_DEBUG(Logger,
"binary_client | FindServers -->");
416 LOG_DEBUG(Logger,
"binary_client | FindServers <--");
421 virtual std::vector<EndpointDescription> GetEndpoints(
const GetEndpointsParameters & filter)
const override 423 LOG_DEBUG(Logger,
"binary_client | GetEndpoints -->");
426 request.
Header = CreateRequestHeader();
432 LOG_DEBUG(Logger,
"binary_client | GetEndpoints <--");
444 virtual std::shared_ptr<MethodServices>
Method()
override 446 return shared_from_this();
449 virtual std::vector<CallMethodResult> Call(
const std::vector<CallMethodRequest> & methodsToCall)
override 451 LOG_DEBUG(Logger,
"binary_client | Call -->");
455 const CallResponse response = Send<CallResponse>(request);
457 LOG_DEBUG(Logger,
"binary_client | Call <--");
471 virtual std::shared_ptr<NodeManagementServices> NodeManagement()
override 473 return shared_from_this();
476 virtual std::vector<AddNodesResult> AddNodes(
const std::vector<AddNodesItem> & items)
override 478 LOG_DEBUG(Logger,
"binary_client | AddNodes -->");
484 LOG_DEBUG(Logger,
"binary_client | AddNodes <--");
489 virtual std::vector<StatusCode> AddReferences(
const std::vector<AddReferencesItem> & items)
override 491 LOG_DEBUG(Logger,
"binary_client | AddReferences -->");
497 LOG_DEBUG(Logger,
"binary_client | AddReferences <--");
502 virtual void SetMethod(
const NodeId & node, std::function<std::vector<OpcUa::Variant> (
NodeId context, std::vector<OpcUa::Variant> arguments)> callback)
override 504 LOG_WARN(Logger,
"binary_client | SetMethod has no effect on client!");
512 virtual std::shared_ptr<SubscriptionServices> Subscriptions()
override 514 return shared_from_this();
519 LOG_DEBUG(Logger,
"binary_client | CreateSubscription -->");
523 LOG_DEBUG(Logger,
"binary_client | got CreateSubscriptionResponse");
527 LOG_DEBUG(Logger,
"binary_client | CreateSubscription <--");
529 return response.
Data;
534 LOG_DEBUG(Logger,
"binary_client | ModifySubscription -->");
540 LOG_DEBUG(Logger,
"binary_client | ModifySubscription <--");
545 virtual std::vector<StatusCode> DeleteSubscriptions(
const std::vector<uint32_t> & subscriptions)
override 547 LOG_DEBUG(Logger,
"binary_client | DeleteSubscriptions -->");
553 LOG_DEBUG(Logger,
"binary_client | DeleteSubscriptions <--");
558 virtual std::vector<MonitoredItemCreateResult> CreateMonitoredItems(
const MonitoredItemsParameters & parameters)
override 560 LOG_DEBUG(Logger,
"binary_client | CreateMonitoredItems -->");
561 LOG_TRACE(Logger,
"binary_client | {}", parameters);
567 LOG_DEBUG(Logger,
"binary_client | CreateMonitoredItems <--");
574 LOG_DEBUG(Logger,
"binary_client | DeleteMonitoredItems -->");
580 LOG_DEBUG(Logger,
"binary_client | DeleteMonitoredItems <--");
// Sends a publish request and arranges for the matching subscription
// callback to be invoked when the response arrives.
// NOTE(review): large parts of this method are missing from the listing;
// comments describe the visible statements only.
585 virtual void Publish(
const PublishRequest & originalrequest)
override 590 request.
Header = CreateRequestHeader();
// Handler for the raw response bytes; captures `this`.
593 ResponseCallback responseCallback = [
this](std::vector<char> buffer,
ResponseHeader h)
595 LOG_DEBUG(Logger,
"binary_client | got publish response, from server");
// Exposes the received bytes as an input channel.
606 BufferInputChannel bufferInput(buffer);
// Hands the user-callback invocation to CallbackService; `response` is
// captured by value.
611 CallbackService.post([
this, response]()
615 LOG_DEBUG(Logger,
"binary_client | calling callback for Subscription: {}", response.Parameters.SubscriptionId);
// NOTE(review): PublishCallbacks is read here with no lock visible in the
// listed lines, while RemoveSelfReferences() clears the same map — confirm
// that these accesses are synchronised.
617 SubscriptionCallbackMap::const_iterator callbackIt = this->PublishCallbacks.find(response.Parameters.SubscriptionId);
619 if (callbackIt == this->PublishCallbacks.end())
621 LOG_WARN(Logger,
"binary_client | unknown SubscriptionId {}", response.Parameters.SubscriptionId);
// Invokes the callback registered for this subscription id.
628 callbackIt->second(response.Parameters);
// A std::exception reaching this handler is logged as a warning.
631 catch (
const std::exception & ex)
633 LOG_WARN(Logger,
"binary_client | error calling application callback: {}", ex.what());
640 LOG_WARN(Logger,
"binary_client | session is closed");
646 LOG_DEBUG(Logger,
"binary_client | not implemented");
// Statements executed under this lock are not visible in this listing;
// presumably the callback registration — confirm.
650 std::unique_lock<std::mutex> lock(Mutex);
655 LOG_DEBUG(Logger,
"binary_client | Publish <--");
660 LOG_DEBUG(Logger,
"binary_client | Republish -->");
663 request.
Header = CreateRequestHeader();
668 LOG_DEBUG(Logger,
"binary_client | Republish <--");
676 virtual std::shared_ptr<ViewServices>
Views()
override 678 return shared_from_this();
683 LOG_DEBUG(Logger,
"binary_client | TranslateBrowsePathsToNodeIds -->");
686 request.
Header = CreateRequestHeader();
690 LOG_DEBUG(Logger,
"binary_client | TranslateBrowsePathsToNodeIds <--");
698 LOG_DEBUG(Logger,
"binary_client | Browse -->");
708 request.
Header = CreateRequestHeader();
709 request.
Query = query;
711 ContinuationPoints.clear();
721 LOG_DEBUG(Logger,
"binary_client | Browse <--");
// Requests further browse results for the stored continuation points.
// NOTE(review): braces and the request/response declarations are missing
// from this listing; comments describe the visible statements only.
726 virtual std::vector<BrowseResult> BrowseNext()
const override 728 LOG_DEBUG(Logger,
"binary_client | BrowseNext -->");
// Without stored continuation points an empty result is returned.
731 if (ContinuationPoints.empty())
733 LOG_DEBUG(Logger,
"binary_client | BrowseNext <-- no Continuation point, no need to send browse next request");
735 return std::vector<BrowseResult>();
// NOTE(review): if the early return above is inside the empty() check (the
// braces are not visible), ContinuationPoints is never empty here and this
// expression always evaluates to false — confirm the intent.
739 request.ReleaseContinuationPoints = ContinuationPoints.empty() ?
true :
false;
740 request.ContinuationPoints = ContinuationPoints;
// Stored continuation points are discarded after being copied into the request.
742 ContinuationPoints.clear();
// NOTE(review): `auto result` copies each element of response.Results per
// iteration; the loop body is not visible in this listing.
744 for (
auto result : response.
Results)
752 LOG_DEBUG(Logger,
"binary_client | BrowseNext <--");
757 std::vector<NodeId> RegisterNodes(
const std::vector<NodeId> & params)
const override 759 LOG_DEBUG(Logger,
"binary_client | RegisterNodes -->");
762 Logger->trace(
"binary_client | Nodes to register:");
764 for (
auto & param : params)
766 Logger->trace(
" {}", param);
777 Logger->trace(
"binary_client | registered NodeIds:");
779 for (
auto &
id : response.
Result)
781 Logger->trace(
" {}",
id);
784 LOG_DEBUG(Logger,
"binary_client | RegisterNodes <--");
788 void UnregisterNodes(
const std::vector<NodeId> & params)
const override 790 LOG_DEBUG(Logger,
"binary_client | UnregisterNodes -->");
793 Logger->trace(
"binary_client | Nodes to unregister:");
795 for (
auto &
id : params)
797 Logger->trace(
" {}",
id);
805 LOG_DEBUG(Logger,
"binary_client | UnregisterNodes <--");
812 ContinuationPoints.clear();
823 LOG_DEBUG(Logger,
"binary_client | OpenChannel -->");
832 LOG_DEBUG(Logger,
"binary_client | OpenChannel <--");
837 virtual void CloseSecureChannel(uint32_t channelId)
override 839 LOG_DEBUG(Logger,
"binary_client | CloseSecureChannel -->");
845 hdr.AddSize(
RawSize(algorithmHeader));
847 std::unique_lock<std::mutex> send_lock(send_mutex);
850 hdr.AddSize(
RawSize(sequence));
856 Stream << hdr << algorithmHeader << sequence << request <<
flush;
859 catch (
const std::exception & exc)
861 LOG_WARN(Logger,
"closing secure channel failed with: {}", exc.what());
864 LOG_DEBUG(Logger,
"binary_client | CloseSecureChannel <--");
// Sends `request` and blocks until a response of type Response is
// available or the wait fails.
// NOTE(review): the send call, the declaration of `res`, the return
// statement and several braces are missing from this listing.
868 template <
typename Response,
typename Request>
869 Response Send(Request request)
const 871 request.Header = CreateRequestHeader();
873 RequestCallback<Response> requestCallback(Logger);
// NOTE(review): captures the stack-local requestCallback by reference; the
// lambda is stored in Callbacks below, so it must be removed from the map
// before this function returns (see the erase in the catch handler) —
// confirm that every exit path does so.
874 ResponseCallback responseCallback = [&requestCallback](std::vector<char> buffer,
ResponseHeader h)
// Registers the handler under Mutex, keyed by the request handle.
878 std::unique_lock<std::mutex> lock(Mutex);
879 Callbacks.insert(std::make_pair(request.Header.RequestHandle, responseCallback));
882 LOG_DEBUG(Logger,
"binary_client | send: id: {} handle: {}, UtcTime: {}",
ToString(request.TypeId,
true), request.Header.RequestHandle, request.Header.UtcTime);
// Waits for the response; the wait duration is request.Header.Timeout
// interpreted as milliseconds.
890 res = requestCallback.WaitForData(std::chrono::milliseconds(request.Header.Timeout));
// On failure the registered handler is removed again under Mutex; whether the
// exception is rethrown is not visible in this listing.
893 catch (std::exception & ex)
896 std::unique_lock<std::mutex> lock(Mutex);
897 Callbacks.erase(request.Header.RequestHandle);
906 mutable std::mutex send_mutex;
908 template <
typename Request>
909 void Send(Request request)
const 914 hdr.AddSize(
RawSize(algorithmHeader));
916 std::unique_lock<std::mutex> send_lock(send_mutex);
919 hdr.AddSize(
RawSize(sequence));
922 Stream << hdr << algorithmHeader << sequence << request <<
flush;
930 Stream >> responseHeader;
931 LOG_DEBUG(Logger,
"binary_client | received message: Type: {}, ChunkType: {}, Size: {}, ChannelId: {}", responseHeader.Type, responseHeader.Chunk, responseHeader.Size, responseHeader.ChannelId);
938 Stream >> responseAlgo;
939 algo_size =
RawSize(responseAlgo);
948 std::stringstream stream;
949 stream <<
"Received error message from server: " <<
ToString(error) <<
", " << msg ;
950 throw std::runtime_error(stream.str());
956 Stream >> responseAlgo;
957 algo_size =
RawSize(responseAlgo);
962 Stream >> responseSequence;
964 const std::size_t expectedHeaderSize =
RawSize(responseHeader) + algo_size +
RawSize(responseSequence);
966 if (expectedHeaderSize >= responseHeader.Size)
968 std::stringstream stream;
969 stream <<
"Size of received message " << responseHeader.Size <<
" bytes is invalid. Expected size " << expectedHeaderSize <<
" bytes";
970 throw std::runtime_error(stream.str());
973 std::size_t dataSize = responseHeader.Size - expectedHeaderSize;
977 parseMessage(dataSize,
id);
978 firstMsgParsed =
false;
980 std::unique_lock<std::mutex> lock(Mutex);
981 CallbackMap::const_iterator callbackIt = Callbacks.find(
header.RequestHandle);
983 if (callbackIt == Callbacks.end())
985 LOG_WARN(Logger,
"binary_client | no callback found for message id: {}, handle: {}",
id,
header.RequestHandle);
986 messageBuffer.clear();
991 messageBuffer.clear();
992 Callbacks.erase(callbackIt);
997 parseMessage(dataSize,
id);
998 firstMsgParsed =
true;
// Handles one message body of `dataSize` bytes and appends it to messageBuffer.
// NOTE(review): the statements that fill `buffer` and decode `id` / `header`,
// and the braces of the branches, are missing from this listing.
1002 void parseMessage(std::size_t & dataSize,
NodeId &
id)
// Scratch storage sized to the message body.
1004 std::vector<char> buffer(dataSize);
1005 BufferInputChannel bufferInput(buffer);
// Branch taken while firstMsgParsed is false.
1010 if (!firstMsgParsed)
1016 LOG_DEBUG(Logger,
"binary_client | got response id: {}, handle: {}",
ToString(
id,
true), header.RequestHandle);
1020 LOG_WARN(Logger,
"binary_client | receive ServiceFault from Server with StatusCode: {}", header.ServiceResult);
1024 LOG_WARN(Logger,
"binary_client | received a response from server with error status: {}", header.ServiceResult);
// The same append appears twice; presumably one per branch of the
// firstMsgParsed check — confirm against the full source.
1027 messageBuffer.insert(messageBuffer.end(), buffer.begin(), buffer.end());
1032 messageBuffer.insert(messageBuffer.end(), buffer.begin(), buffer.end());
1038 LOG_DEBUG(Logger,
"binary_client | HelloServer -->");
1051 Stream << hdr << hello <<
flush;
1054 Stream >> respHeader;
1059 LOG_DEBUG(Logger,
"binary_client | HelloServer <--");
1069 return algorithmHeader;
// Pre-increments the RequestHandle counter and returns the new value.
// Usable from const methods because RequestHandle is declared
// `mutable std::atomic<uint32_t>` in this class.
1089 unsigned GetRequestHandle()
const 1091 return ++RequestHandle;
// Logs a debug message and empties the PublishCallbacks map.
// NOTE(review): no lock is visible around the clear() in this listing —
// confirm whether other threads can access PublishCallbacks concurrently.
1096 void RemoveSelfReferences()
1098 LOG_DEBUG(Logger,
"binary_client | clearing cached references to server");
1100 PublishCallbacks.clear();
1104 std::shared_ptr<IOChannel> Channel;
1107 std::thread ReceiveThread;
1109 SubscriptionCallbackMap PublishCallbacks;
1111 mutable std::atomic<uint32_t> SequenceNumber;
1112 mutable std::atomic<uint32_t> RequestNumber;
1114 mutable std::atomic<uint32_t> RequestHandle;
1115 mutable std::vector<std::vector<uint8_t>> ContinuationPoints;
1116 mutable CallbackMap Callbacks;
1117 Common::Logger::SharedPtr Logger;
1118 bool Finished =
false;
1120 std::thread callback_thread;
1121 CallbackThread CallbackService;
1122 mutable std::mutex Mutex;
1124 bool firstMsgParsed =
false;
1136 hdr.AddSize(
RawSize(algorithmHeader));
1137 hdr.AddSize(
RawSize(request));
1139 std::unique_lock<std::mutex> send_lock(send_mutex);
1142 hdr.AddSize(
RawSize(sequence));
1143 Stream << hdr << algorithmHeader << sequence << request <<
flush;
1151 return std::make_shared<BinaryClient>(channel, params, logger);
1160 params.SecurePolicy =
"http://opcfoundation.org/UA/SecurityPolicy#None";
std::vector< uint32_t > SubscriptionIds
std::vector< ApplicationDescription > Descriptions
std::vector< OpcUa::ReadValueId > AttributesToRead
#define LOG_TRACE(__logger__,...)
std::vector< OpcUa::NodeId > NodesToRegister
std::vector< uint8_t > ContinuationPoint
#define LOG_WARN(__logger__,...)
std::vector< AddNodesResult > results
void CloseSession(OpcUa::Binary::IOStream &stream, const OpcUa::Binary::CreateSessionResponse &session)
OpcUa::CreateSessionParameters Parameters
ApplicationDescription ClientDescription
std::string GatewayServerUri
std::vector< BrowseDescription > NodesToBrowse
std::vector< OpcUa::StatusCode > Results
TranslateBrowsePathsResult Result
#define LOG_ERROR(__logger__,...)
std::vector< AddNodesItem > NodesToAdd
std::vector< BrowseResult > Results
#define LOG_DEBUG(__logger__,...)
OpcUa::ByteString ClientNonce
std::vector< StatusCode > Results
std::vector< uint8_t > ClientCertificate
std::vector< T > Browse(const NodeId &node, NodeClass nodeClassMask, Services::SharedPtr services)
std_msgs::Header * header(M &m)
OpcUa::NodeId AuthenticationToken
TranslateBrowsePathsParameters Parameters
OpcUa::ByteString ClientCertificate
std::vector< OpcUa::StatusCode > Results
OpcUa::CreateSessionResult Parameters
OStream< ChannelType > & flush(OStream< ChannelType > &os)
uint32_t MaxResponseMessageSize
int Write(int fd, const void *buf, unsigned int count)
std::vector< BrowseResult > Results
OpcUa::ReadParameters Parameters
OpcUa::RequestHeader Header
OpcUa::CallParameters Parameters
fmt::BufferedFile & move(fmt::BufferedFile &f)
std::vector< OpcUa::DataValue > Results
OpcUa::RequestHeader Header
OpcUa::Binary::CreateSessionResponse CreateSession(OpcUa::Binary::IOStream &stream)
Services::SharedPtr CreateBinaryClient(IOChannel::SharedPtr channel, const SecureConnectionParams &params, const Common::Logger::SharedPtr &logger=nullptr)
Create server based on opc ua binary protocol.
std::vector< OpcUa::DeleteNodesItem > NodesToDelete
OpcUa::ApplicationType ApplicationType
std::vector< OpcUa::MonitoredItemCreateResult > Results
std::vector< BrowsePathResult > Paths
int Read(int fd, void *buf, unsigned int count)
OpcUa::RequestHeader Header
double RequestedSessionTimeout
std::string DiscoveryProfileUri
SecurityToken ChannelSecurityToken
OPC UA Address space part. GNU LGPL.
std::string ApplicationUri
void ActivateSession(OpcUa::Binary::IOStream &stream, const OpcUa::Binary::CreateSessionResponse &session)
OpenSecureChannelParameters Parameters
std::string ToHexDump(const char *buf, std::size_t size)
OpcUa::ResponseHeader Header
OpcUa::DeleteMonitoredItemsParameters Parameters
std::vector< OpcUa::WriteValue > NodesToWrite
std::vector< std::string > LocaleIds
OpcUa::LocalizedText ApplicationName
OpcUa::WriteParameters Parameters
OpcUa::GetEndpointsParameters Parameters
OpcUa::SubscriptionData Data
OpcUa::RequestHeader Header
std::string ToString(const AttributeId &value)
std::vector< OpcUa::NodeId > NodesToUnregister
OpcUa::ModifySubscriptionParameters Parameters
std::vector< std::string > DiscoveryUrls
OpcUa::RepublishParameters Parameters
std::vector< OpcUa::CallMethodRequest > MethodsToCall
std::vector< OpcUa::CallMethodResult > Results
OpcUa::ApplicationDescription ClientDescription
OpcUa::ActivateSessionParameters Parameters
AddNodesParameters Parameters
OpcUa::AttributeId AttributeId
std::vector< OpcUa::NodeId > Result
std::vector< std::string > LocaleIds
std::vector< OpcUa::SubscriptionAcknowledgement > SubscriptionAcknowledgements
uint32_t ReceiveBufferSize
FindServersParameters Parameters
std::vector< AddReferencesItem > ReferencesToAdd
std::unique_ptr< RemoteConnection > Connect(const std::string &host, unsigned port, const Common::Logger::SharedPtr &logger)
OpcUa::MonitoredItemsParameters Parameters
std::vector< std::string > ProfileUris
AddReferencesParameters Parameters
std::vector< OpcUa::EndpointDescription > Endpoints
std::size_t RawSize(const T &obj)
std::vector< OpcUa::StatusCode > Results