Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <opc/ua/subscription.h>
00022 #include <opc/ua/protocol/string_utils.h>
00023
00024 #include <boost/asio.hpp>
00025 #include <iostream>
00026
00027 namespace OpcUa
00028 {
00029 Subscription::Subscription(Services::SharedPtr server, const CreateSubscriptionParameters& params, SubscriptionHandler& callback, bool debug)
00030 : Server(server), Client(callback), Debug(debug)
00031 {
00032 CreateSubscriptionRequest request;
00033 request.Parameters = params;
00034 Services::SharedPtr serverptr = Server;
00035 Data = Server->Subscriptions()->CreateSubscription(request, [this, serverptr](PublishResult i){ this->PublishCallback(serverptr, i); } );
00036
00037 Server->Subscriptions()->Publish(PublishRequest());
00038 Server->Subscriptions()->Publish(PublishRequest());
00039 }
00040
00041 void Subscription::Delete()
00042 {
00043 std::vector<StatusCode> results = Server->Subscriptions()->DeleteSubscriptions(std::vector<uint32_t>{Data.SubscriptionId});
00044 for (auto res: results)
00045 {
00046 CheckStatusCode(res);
00047 }
00048 }
00049
00050 void Subscription::PublishCallback(Services::SharedPtr server, const PublishResult result)
00051 {
00052
00053 if (Debug){ std::cout << "Subscription | Suscription::PublishCallback called with " <<result.NotificationMessage.NotificationData.size() << " notifications " << std::endl; }
00054 for (const NotificationData& data: result.NotificationMessage.NotificationData )
00055 {
00056 if (data.Header.TypeId == ExpandedObjectId::DataChangeNotification)
00057 {
00058 if (Debug) { std::cout << "Subscription | Notification is of type DataChange\n"; }
00059 CallDataChangeCallback(data);
00060 }
00061 else if (data.Header.TypeId == ExpandedObjectId::EventNotificationList)
00062 {
00063 if (Debug) { std::cout << "Subscription | Notification is of type Event\n"; }
00064 CallEventCallback(data);
00065 }
00066 else if (data.Header.TypeId == ExpandedObjectId::StatusChangeNotification)
00067 {
00068 if (Debug) { std::cout << "Subscription | Notification is of type StatusChange\n"; }
00069 CallStatusChangeCallback(data);
00070 }
00071 else
00072 {
00073 std::cout << "Subscription | Error unknown notficiation type received: " << data.Header.TypeId <<std::endl;
00074 }
00075 }
00076 OpcUa::SubscriptionAcknowledgement ack;
00077 ack.SubscriptionId = GetId();
00078 ack.SequenceNumber = result.NotificationMessage.SequenceNumber;
00079 PublishRequest request;
00080 request.SubscriptionAcknowledgements.push_back(ack);
00081 server->Subscriptions()->Publish(request);
00082 }
00083
00084 void Subscription::CallDataChangeCallback(const NotificationData& data)
00085 {
00086 for ( const MonitoredItems& item: data.DataChange.Notification)
00087 {
00088 std::unique_lock<std::mutex> lock(Mutex);
00089
00090 AttValMap::iterator mapit = AttributeValueMap.find(item.ClientHandle);
00091 if ( mapit == AttributeValueMap.end() )
00092 {
00093 std::cout << "Subscription | Server Error got publishresult for an unknown monitoreditem id : "<< item.ClientHandle << std::endl;
00094 }
00095 else
00096 {
00097 AttributeId attr = mapit->second.Attribute;
00098 Node node = mapit->second.TargetNode;
00099 lock.unlock();
00100 if (Debug) { std::cout << "Subscription | Debug: Calling DataChange user callback " << item.ClientHandle << " and node: " << mapit->second.TargetNode << std::endl; }
00101 Client.DataValueChange(mapit->second.MonitoredItemId, node, item.Value, attr);
00102 Client.DataChange(mapit->second.MonitoredItemId, node, item.Value.Value, attr);
00103 }
00104 }
00105 }
00106
00107 void Subscription::CallStatusChangeCallback(const NotificationData& data)
00108 {
00109 Client.StatusChange(data.StatusChange.Status);
00110 }
00111
00112 void Subscription::CallEventCallback(const NotificationData& data)
00113 {
00114 for ( EventFieldList ef : data.Events.Events)
00115 {
00116 std::unique_lock<std::mutex> lock(Mutex);
00117
00118 AttValMap::iterator mapit = AttributeValueMap.find(ef.ClientHandle);
00119 if ( mapit == AttributeValueMap.end() )
00120 {
00121 std::cout << "Subscription | Server Error got publishresult for an unknown monitoreditem id : "<< ef.ClientHandle << std::endl;
00122 }
00123 else
00124 {
00125 Event ev;
00126 uint32_t count = 0;
00127 if ( mapit->second.Filter.Event.SelectClauses.size() != ef.EventFields.size() )
00128 {
00129 throw std::runtime_error("Subscription | Error receive event format does not match requested filter");
00130 }
00131 for (SimpleAttributeOperand op : mapit->second.Filter.Event.SelectClauses )
00132 {
00133
00134 if ( op.BrowsePath.size() == 1 )
00135 {
00136 if ( op.BrowsePath[0] == QualifiedName("EventId", 0) )
00137 {
00138 ev.EventId = ef.EventFields[count].As<ByteString>();
00139 }
00140 else if ( op.BrowsePath[0] == QualifiedName("EventType", 0) )
00141 {
00142 ev.EventType = ef.EventFields[count].As<NodeId>();
00143 }
00144 else if ( op.BrowsePath[0] == QualifiedName("SourceNode", 0) )
00145 {
00146 ev.SourceNode = ef.EventFields[count].As<NodeId>();
00147 }
00148 else if ( op.BrowsePath[0] == QualifiedName("SourceName", 0) )
00149 {
00150 ev.SourceName = ef.EventFields[count].As<std::string>();
00151 }
00152 else if ( op.BrowsePath[0] == QualifiedName("Message", 0) )
00153 {
00154 ev.Message = ef.EventFields[count].As<LocalizedText>();
00155 }
00156 else if ( op.BrowsePath[0] == QualifiedName("Severity", 0) )
00157 {
00158 ev.Severity = ef.EventFields[count].As<uint16_t>();
00159 }
00160 else if ( op.BrowsePath[0] == QualifiedName("LocalTime", 0) )
00161 {
00162 ev.LocalTime = ef.EventFields[count].As<DateTime>();
00163 }
00164 else if ( op.BrowsePath[0] == QualifiedName("ReceiveTime", 0) )
00165 {
00166 ev.ReceiveTime = ef.EventFields[count].As<DateTime>();
00167 }
00168 else if ( op.BrowsePath[0] == QualifiedName("Time", 0) )
00169 {
00170 ev.Time = ef.EventFields[count].As<DateTime>();
00171 }
00172 }
00173
00174 ev.SetValue(op.BrowsePath, ef.EventFields[count]);
00175 ++count;
00176 }
00177 lock.unlock();
00178 if (Debug) { std::cout << "Subscription | Debug: Calling client event callback\n"; }
00179 Client.Event(mapit->second.MonitoredItemId, ev);
00180 if (Debug) { std::cout << "Subscription | Debug: callback call finished\n"; }
00181 }
00182 }
00183 }
00184
00185 RepublishResponse Subscription::Republish(uint32_t sequenceNumber)
00186 {
00187 RepublishParameters params;
00188 params.SubscriptionId = Data.SubscriptionId;
00189 params.RetransmitSequenceNumber = sequenceNumber;
00190 RepublishResponse response = Server->Subscriptions()->Republish(params);
00191 return response;
00192 }
00193
00194 uint32_t Subscription::SubscribeDataChange(const Node& node, AttributeId attr)
00195 {
00196 ReadValueId avid;
00197 avid.NodeId = node.GetId();
00198 avid.AttributeId = attr;
00199
00200 std::vector<uint32_t> results = SubscribeDataChange(std::vector<ReadValueId>({avid}));
00201 if (results.size() != 1) { throw std::runtime_error("Subscription | Server error, SubscribeDataChange should have returned exactly one result"); }
00202 return results.front();
00203 }
00204
00205 std::vector<MonitoredItemCreateResult> Subscription::Subscribe(std::vector<MonitoredItemCreateRequest> request)
00206 {
00207 std::unique_lock<std::mutex> lock(Mutex);
00208
00209 MonitoredItemsParameters itemsParams;
00210 itemsParams.SubscriptionId = Data.SubscriptionId;
00211 itemsParams.TimestampsToReturn = TimestampsToReturn(2);
00212 for (auto req : request)
00213 {
00214 itemsParams.ItemsToCreate.push_back(req);
00215 }
00216
00217 return Server->Subscriptions()->CreateMonitoredItems(itemsParams);
00218 }
00219
00220 std::vector<uint32_t> Subscription::SubscribeDataChange(const std::vector<ReadValueId>& attributes)
00221 {
00222 std::unique_lock<std::mutex> lock(Mutex);
00223
00224 MonitoredItemsParameters itemsParams;
00225 itemsParams.SubscriptionId = Data.SubscriptionId;
00226 itemsParams.TimestampsToReturn = TimestampsToReturn(2);
00227
00228 for (ReadValueId attr : attributes)
00229 {
00230 MonitoredItemCreateRequest req;
00231 req.ItemToMonitor = attr;
00232 req.MonitoringMode = MonitoringMode::Reporting;
00233 MonitoringParameters params;
00234 params.SamplingInterval = Data.RevisedPublishingInterval;
00235 params.QueueSize = 1;
00236 params.DiscardOldest = true;
00237 params.ClientHandle = (uint32_t)++LastMonitoredItemHandle;
00238 req.RequestedParameters = params;
00239 itemsParams.ItemsToCreate.push_back(req);
00240 }
00241
00242 std::vector<MonitoredItemCreateResult> results = Server->Subscriptions()->CreateMonitoredItems(itemsParams);
00243
00244 if ( results.size() != attributes.size() )
00245 {
00246 throw(std::runtime_error("Subscription | Error server did not send answer for all monitoreditem requests"));
00247 }
00248
00249 std::vector<uint32_t> monitoredItemsIds;
00250 unsigned int i = 0;
00251 for (const auto& res : results)
00252 {
00253 CheckStatusCode(res.Status);
00254 if (Debug ) { std::cout << "Subscription | storing monitoreditem with handle " << itemsParams.ItemsToCreate[i].RequestedParameters.ClientHandle << " and id " << res.MonitoredItemId << std::endl; }
00255 MonitoredItemData mdata;
00256 mdata.MonitoredItemId = res.MonitoredItemId;
00257 mdata.Attribute = attributes[i].AttributeId;
00258 mdata.TargetNode = Node(Server, attributes[i].NodeId);
00259 AttributeValueMap[itemsParams.ItemsToCreate[i].RequestedParameters.ClientHandle] = mdata;
00260 monitoredItemsIds.push_back(res.MonitoredItemId);
00261 ++i;
00262 }
00263 return monitoredItemsIds;
00264 }
00265
00266 void Subscription::setUsrPtr(uint32_t handle,UserData *usr)
00267 {
00268 AttributeValueMap[handle].usrVar = usr;
00269 }
00270
00271 UserData * Subscription::getUsrPtr(uint32_t handle)
00272 {
00273 return AttributeValueMap[handle].usrVar;
00274 }
00275
00276 void Subscription::UnSubscribe(uint32_t handle)
00277 {
00278 return UnSubscribe(std::vector<uint32_t>(1, handle));
00279 }
00280
00281 void Subscription::UnSubscribe(std::vector<uint32_t> handles)
00282 {
00283 std::unique_lock<std::mutex> lock(Mutex);
00284
00285 DeleteMonitoredItemsParameters params;
00286 params.SubscriptionId = Data.SubscriptionId;
00287 std::vector<uint32_t> mids;
00288 for (auto id : handles)
00289 {
00290 if (Debug) std::cout << "Subscription | Sending unsubscribe for monitoreditemsid: " << id << std::endl;
00291 mids.push_back(uint32_t(id));
00292
00293 for ( auto pair : AttributeValueMap )
00294 {
00295 if (pair.second.MonitoredItemId == id)
00296 {
00297 AttributeValueMap.erase(pair.first);
00298 break;
00299 }
00300 }
00301 }
00302 params.MonitoredItemIds = mids;
00303 auto results = Server->Subscriptions()-> DeleteMonitoredItems(params);
00304 for (auto res : results)
00305 {
00306 CheckStatusCode(res);
00307 }
00308 }
00309
00310 uint32_t Subscription::SubscribeEvents()
00311 {
00312 return SubscribeEvents(Node(Server, ObjectId::Server), Node(Server, ObjectId::BaseEventType));
00313 }
00314
00315 uint32_t Subscription::SubscribeEvents(const Node& node, const Node& eventtype)
00316 {
00317 EventFilter filter;
00318 if (Debug) std::cout << "Subscription | Subscribing events with filter for properties:" << std::endl;
00319 for ( Node& child: eventtype.GetProperties() )
00320 {
00321 if (Debug) std::cout << " property: "<< child.GetBrowseName() << std::endl;
00322 SimpleAttributeOperand op;
00323 op.TypeId = eventtype.GetId();
00324 op.Attribute = AttributeId::Value;
00325 op.BrowsePath = std::vector<QualifiedName>({child.GetBrowseName()});
00326 filter.SelectClauses.push_back(op);
00327 }
00328 return SubscribeEvents(node, filter);
00329 }
00330
00331 uint32_t Subscription::SubscribeEvents(const Node& node, const EventFilter& eventfilter)
00332 {
00333 std::unique_lock<std::mutex> lock(Mutex);
00334
00335 MonitoredItemsParameters itemsParams;
00336 itemsParams.SubscriptionId = Data.SubscriptionId;
00337 itemsParams.TimestampsToReturn = TimestampsToReturn(2);
00338
00339 ReadValueId avid;
00340 avid.NodeId = node.GetId();
00341 avid.AttributeId = AttributeId::EventNotifier;
00342
00343 MonitoredItemCreateRequest req;
00344 req.ItemToMonitor = avid;
00345 req.MonitoringMode = MonitoringMode::Reporting;
00346 MonitoringParameters params;
00347 params.SamplingInterval = Data.RevisedPublishingInterval;
00348 params.QueueSize = std::numeric_limits<uint32_t>::max();
00349 params.DiscardOldest = true;
00350 params.ClientHandle = (uint32_t)++LastMonitoredItemHandle;
00351
00352 MonitoringFilter filter(eventfilter);
00353 params.Filter = filter;
00354 req.RequestedParameters = params;
00355 itemsParams.ItemsToCreate.push_back(req);
00356
00357 std::vector<MonitoredItemCreateResult> results = Server->Subscriptions()->CreateMonitoredItems(itemsParams);
00358 if ( results.size() != 1 )
00359 {
00360 throw(std::runtime_error("Subscription | Protocol Error CreateMonitoredItems should return one result"));
00361 }
00362
00363 MonitoredItemData mdata;
00364 mdata.TargetNode = Node(Server, avid.NodeId);
00365 mdata.Attribute = avid.AttributeId;
00366 mdata.MonitoredItemId = results[0].MonitoredItemId;
00367 mdata.Filter = results[0].FilterResult;
00368 AttributeValueMap[params.ClientHandle] = mdata;
00369
00370
00371 MonitoredItemCreateResult res = results[0];
00372 CheckStatusCode(res.Status);
00373 SimpleAttributeOperandMap[res.MonitoredItemId] = eventfilter;
00374 return res.MonitoredItemId;
00375 }
00376
00377 }