subscription.cpp
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright (C) 2014-2014 Olivier Roulet-Dubonnet *
3  * olivier.roulet@gmail.com *
4  * *
5  * This library is free software; you can redistribute it and/or modify *
6  * it under the terms of the GNU Lesser General Public License as *
7  * published by the Free Software Foundation; version 3 of the License. *
8  * *
9  * This library is distributed in the hope that it will be useful, *
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
12  * GNU Lesser General Public License for more details. *
13  * *
14  * You should have received a copy of the GNU Lesser General Public License *
15  * along with this library; if not, write to the *
16  * Free Software Foundation, Inc., *
17  * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
18  ******************************************************************************/
19 
20 
21 #include <opc/ua/subscription.h>
23 
24 #include <boost/asio.hpp>
25 #include <iostream>
26 
27 namespace OpcUa
28 {
29 Subscription::Subscription(Services::SharedPtr server, const CreateSubscriptionParameters & params, SubscriptionHandler & callback, const Common::Logger::SharedPtr & logger)
30  : Server(server), Client(callback), Logger(logger)
31 {
33  request.Parameters = params;
34  Services::SharedPtr serverptr = Server;
35  Data = Server->Subscriptions()->CreateSubscription(request, [this, serverptr](PublishResult i) { this->PublishCallback(serverptr, i); });
36  //After creating the subscription, it is expected to send at least one publish request
37  Server->Subscriptions()->Publish(PublishRequest());
38  Server->Subscriptions()->Publish(PublishRequest());
39 }
40 
42 {
43  std::vector<StatusCode> results = Server->Subscriptions()->DeleteSubscriptions(std::vector<uint32_t> {Data.SubscriptionId});
44 
45  for (auto res : results)
46  {
47  CheckStatusCode(res);
48  }
49 }
50 
51 void Subscription::PublishCallback(Services::SharedPtr server, const PublishResult result)
52 {
53 
54  LOG_DEBUG(Logger, "subscription | Suscription::PublishCallback called with {} notifications", result.NotificationMessage.NotificationData.size());
55 
56  for (const NotificationData & data : result.NotificationMessage.NotificationData)
57  {
59  {
60  LOG_DEBUG(Logger, "subscription | notification is of type DataChange");
61 
63  }
64 
66  {
67  LOG_DEBUG(Logger, "subscription | notification is of type Event");
68 
69  CallEventCallback(data);
70  }
71 
73  {
74  LOG_DEBUG(Logger, "subscription | notification is of type StatusChange");
75 
77  }
78 
79  else
80  {
81  LOG_WARN(Logger, "subscription | unknown notficiation type received: {}", data.Header.TypeId);
82  }
83  }
84 
86  ack.SubscriptionId = GetId();
88  PublishRequest request;
89  request.SubscriptionAcknowledgements.push_back(ack);
90  server->Subscriptions()->Publish(request);
91 }
92 
94 {
95  for (const MonitoredItems & item : data.DataChange.Notification)
96  {
97  std::unique_lock<std::mutex> lock(Mutex); //could used boost::shared_lock to improve perf
98 
99  AttValMap::iterator mapit = AttributeValueMap.find(item.ClientHandle);
100 
101  if (mapit == AttributeValueMap.end())
102  {
103  LOG_WARN(Logger, "subscription | got PublishResult for an unknown monitoreditem id: {}", item.ClientHandle);
104  }
105 
106  else
107  {
108  AttributeId attr = mapit->second.Attribute;
109  Node node = mapit->second.TargetNode;
110  lock.unlock(); //unlock before calling client cades, you never know what they may do
111 
112  LOG_DEBUG(Logger, "subscription | calling DataChange user callback: {} and node: {}", item.ClientHandle, mapit->second.TargetNode);
113 
114  Client.DataValueChange(mapit->second.MonitoredItemId, node, item.Value, attr);
115  Client.DataChange(mapit->second.MonitoredItemId, node, item.Value.Value, attr);
116  }
117  }
118 }
119 
121 {
123 }
124 
126 {
127  for (EventFieldList ef : data.Events.Events)
128  {
129  std::unique_lock<std::mutex> lock(Mutex); //could used boost::shared_lock to improve perf
130 
131  AttValMap::iterator mapit = AttributeValueMap.find(ef.ClientHandle);
132 
133  if (mapit == AttributeValueMap.end())
134  {
135  LOG_WARN(Logger, "subscription | got PublishResult for an unknown MonitoredItem id: {}", ef.ClientHandle);
136  }
137 
138  else
139  {
140  Event ev;
141  uint32_t count = 0;
142 
143  if (mapit->second.Filter.Event.SelectClauses.size() != ef.EventFields.size())
144  {
145  throw std::runtime_error("subscription | receive event format does not match requested filter");
146  }
147 
148  for (SimpleAttributeOperand op : mapit->second.Filter.Event.SelectClauses)
149  {
150  auto & value = ef.EventFields[count];
151  // add all fields as value
152  ev.SetValue(op.BrowsePath, value);
153  ++count;
154  // server may send NULL fields - do not try to convert them to actual values
155  if (value.IsNul())
156  {
157  continue;
158  }
159 
160  // set the default fields of events into their event attributes
161  if (op.BrowsePath.size() == 1)
162  {
163  auto & name = op.BrowsePath[0];
164 
165  if (name == QualifiedName("EventId", 0))
166  {
167  ev.EventId = value.As<ByteString>();
168  }
169 
170  else if (name == QualifiedName("EventType", 0))
171  {
172  ev.EventType = value.As<NodeId>();
173  }
174 
175  else if (name == QualifiedName("SourceNode", 0))
176  {
177  ev.SourceNode = value.As<NodeId>();
178  }
179 
180  else if (name == QualifiedName("SourceName", 0))
181  {
182  ev.SourceName = value.As<std::string>();
183  }
184 
185  else if (name == QualifiedName("Message", 0))
186  {
187  ev.Message = value.As<LocalizedText>();
188  }
189 
190  else if (name == QualifiedName("Severity", 0))
191  {
192  ev.Severity = value.As<uint16_t>();
193  }
194 
195  else if (name == QualifiedName("LocalTime", 0))
196  {
197  ev.LocalTime = value.As<DateTime>();
198  }
199 
200  else if (name == QualifiedName("ReceiveTime", 0))
201  {
202  ev.ReceiveTime = value.As<DateTime>();
203  }
204 
205  else if (name == QualifiedName("Time", 0))
206  {
207  ev.Time = value.As<DateTime>();
208  }
209  }
210  }
211 
212  lock.unlock();
213 
214  LOG_DEBUG(Logger, "subscription | calling client event callback");
215 
216  Client.Event(mapit->second.MonitoredItemId, ev);
217 
218  LOG_DEBUG(Logger, "subscription | callback call finished");
219  }
220  }
221 }
222 
224 {
225  RepublishParameters params;
227  params.RetransmitSequenceNumber = sequenceNumber;
228  RepublishResponse response = Server->Subscriptions()->Republish(params);
229  return response;
230 }
231 
233 {
234  ReadValueId avid;
235  avid.NodeId = node.GetId();
236  avid.AttributeId = attr;
237  //avid.IndexRange //We leave it null, then the entire array is returned
238  std::vector<uint32_t> results = SubscribeDataChange(std::vector<ReadValueId>({avid}));
239 
240  if (results.size() != 1) { throw std::runtime_error("subscription | SubscribeDataChange should have returned exactly one result"); }
241 
242  return results.front();
243 }
244 
245 std::vector<MonitoredItemCreateResult> Subscription::Subscribe(std::vector<MonitoredItemCreateRequest> request)
246 {
247  std::unique_lock<std::mutex> lock(Mutex);
248 
249  MonitoredItemsParameters itemsParams;
250  itemsParams.SubscriptionId = Data.SubscriptionId;
251  itemsParams.TimestampsToReturn = TimestampsToReturn(2); // Don't know for better
252 
253  for (auto req : request)
254  {
255  itemsParams.ItemsToCreate.push_back(req);
256  }
257 
258  return Server->Subscriptions()->CreateMonitoredItems(itemsParams);
259 }
260 
261 std::vector<uint32_t> Subscription::SubscribeDataChange(const std::vector<ReadValueId> & attributes)
262 {
263  std::unique_lock<std::mutex> lock(Mutex);
264 
265  MonitoredItemsParameters itemsParams;
266  itemsParams.SubscriptionId = Data.SubscriptionId;
267  itemsParams.TimestampsToReturn = TimestampsToReturn(2); // Don't know for better
268 
269  for (ReadValueId attr : attributes)
270  {
272  req.ItemToMonitor = attr;
274  MonitoringParameters params;
276  params.QueueSize = 1;
277  params.DiscardOldest = true;
278  params.ClientHandle = (uint32_t)++LastMonitoredItemHandle;
279  req.RequestedParameters = params;
280  itemsParams.ItemsToCreate.push_back(req);
281  }
282 
283  std::vector<MonitoredItemCreateResult> results = Server->Subscriptions()->CreateMonitoredItems(itemsParams);
284 
285  if (results.size() != attributes.size())
286  {
287  throw (std::runtime_error("subscription | server did not send answer for all MonitoredItem requests"));
288  }
289 
290  std::vector<uint32_t> monitoredItemsIds;
291  unsigned int i = 0;
292 
293  for (const auto & res : results)
294  {
295  CheckStatusCode(res.Status);
296 
297  LOG_DEBUG(Logger, "subscription | storing monitoreditem with handle: {} and id: {}", itemsParams.ItemsToCreate[i].RequestedParameters.ClientHandle, res.MonitoredItemId);
298 
299  MonitoredItemData mdata;
300  mdata.MonitoredItemId = res.MonitoredItemId;
301  mdata.Attribute = attributes[i].AttributeId;
302  mdata.TargetNode = Node(Server, attributes[i].NodeId);
303  AttributeValueMap[itemsParams.ItemsToCreate[i].RequestedParameters.ClientHandle] = mdata;
304  monitoredItemsIds.push_back(res.MonitoredItemId);
305  ++i;
306  }
307 
308  return monitoredItemsIds;
309 }
310 
312 {
313  AttributeValueMap[handle].usrVar = usr;
314 }
315 
317 {
318  return AttributeValueMap[handle].usrVar;
319 }
320 
322 {
323  return UnSubscribe(std::vector<uint32_t>(1, handle));
324 }
325 
326 void Subscription::UnSubscribe(std::vector<uint32_t> handles)
327 {
328  std::unique_lock<std::mutex> lock(Mutex);
329 
332  std::vector<uint32_t> mids;
333 
334  for (auto id : handles)
335  {
336  LOG_DEBUG(Logger, "subscription | sending unsubscribe for MonitoredItem id: {}", id);
337  mids.push_back(uint32_t(id));
338 
339  //Now trying to remove monitoreditem from our internal cache
340  for (auto pair : AttributeValueMap)
341  {
342  if (pair.second.MonitoredItemId == id)
343  {
344  AttributeValueMap.erase(pair.first);
345  break; //we modified our iterating object, so quit!!
346  }
347  }
348  }
349 
350  params.MonitoredItemIds = mids;
351  auto results = Server->Subscriptions()-> DeleteMonitoredItems(params);
352 
353  for (auto res : results)
354  {
355  CheckStatusCode(res);
356  }
357 }
358 
360 {
362 }
363 
364 uint32_t Subscription::SubscribeEvents(const Node & node, const Node & eventtype)
365 {
366  EventFilter filter;
367 
368  LOG_DEBUG(Logger, "subscription | subscribing events with filter for:");
369 
370  for (Node & child : eventtype.GetProperties())
371  {
372  auto propertyName = child.GetBrowseName();
373  LOG_DEBUG(Logger, " property: {}", propertyName);
374 
376  op.TypeId = eventtype.GetId();
378  op.BrowsePath = std::vector<QualifiedName>({propertyName});
379  filter.SelectClauses.push_back(op);
380  }
381 
382  return SubscribeEvents(node, filter);
383 }
384 
385 uint32_t Subscription::SubscribeEvents(const Node & node, const EventFilter & eventfilter)
386 {
387  std::unique_lock<std::mutex> lock(Mutex);
388 
389  MonitoredItemsParameters itemsParams;
390  itemsParams.SubscriptionId = Data.SubscriptionId;
391  itemsParams.TimestampsToReturn = TimestampsToReturn(2); // Don't know for better
392 
393  ReadValueId avid;
394  avid.NodeId = node.GetId();
396 
398  req.ItemToMonitor = avid;
400  MonitoringParameters params;
402  params.QueueSize = std::numeric_limits<uint32_t>::max();
403  params.DiscardOldest = true;
404  params.ClientHandle = (uint32_t)++LastMonitoredItemHandle;
405 
406  MonitoringFilter filter(eventfilter);
407  params.Filter = filter;
408  req.RequestedParameters = params;
409  itemsParams.ItemsToCreate.push_back(req);
410 
411  std::vector<MonitoredItemCreateResult> results = Server->Subscriptions()->CreateMonitoredItems(itemsParams);
412 
413  if (results.size() != 1)
414  {
415  throw (std::runtime_error("subscription | CreateMonitoredItems should return one result"));
416  }
417 
418  auto result = results[0];
419  // allow remote side to sloppily skip SelectClauses and WhereClause in its answer
420  if (result.FilterResult.Event.SelectClauses.empty())
421  {
422  result.FilterResult.Event.SelectClauses = eventfilter.SelectClauses;
423  }
424  if (result.FilterResult.Event.WhereClause.empty())
425  {
426  result.FilterResult.Event.WhereClause = eventfilter.WhereClause;
427  }
428  MonitoredItemData mdata;
429  mdata.TargetNode = Node(Server, avid.NodeId);
430  mdata.Attribute = avid.AttributeId;
431  mdata.MonitoredItemId = result.MonitoredItemId;
432  mdata.Filter = result.FilterResult;
433  AttributeValueMap[params.ClientHandle] = mdata;
434 
435  CheckStatusCode(result.Status);
436  SimpleAttributeOperandMap[result.MonitoredItemId] = eventfilter; //Not used
437  return result.MonitoredItemId;
438 }
439 
440 }
MonitoringFilter Filter
Definition: subscription.h:46
void CheckStatusCode(StatusCode code)
OpcUa Error codes. GNU LGPL.
#define LOG_WARN(__logger__,...)
Definition: common/logger.h:26
void CallEventCallback(const NotificationData &data)
int * count
OpcUa::CreateSubscriptionParameters Parameters
Subscription(Services::SharedPtr server, const CreateSubscriptionParameters &params, SubscriptionHandler &callback, const Common::Logger::SharedPtr &logger=nullptr)
std::vector< OpcUa::NotificationData > NotificationData
LocalizedText Message
Definition: event.h:41
virtual void DataChange(uint32_t handle, const Node &node, const Variant &val, AttributeId attribute)
Definition: subscription.h:58
DateTime ReceiveTime
Definition: event.h:39
uint32_t GetId() const
Definition: subscription.h:110
std::string SourceName
Definition: event.h:44
virtual void Event(uint32_t handle, const Event &event)
Definition: subscription.h:76
DateTime LocalTime
Definition: event.h:38
std::vector< QualifiedName > BrowsePath
Definition: types_manual.h:94
#define LOG_DEBUG(__logger__,...)
Definition: common/logger.h:24
void CallDataChangeCallback(const NotificationData &data)
std::vector< Node > GetProperties() const
Definition: node.cpp:296
OpcUa::MonitoringFilter Filter
NodeId SourceNode
Definition: event.h:43
uint32_t LastMonitoredItemHandle
Definition: subscription.h:154
std::vector< SimpleAttributeOperand > SelectClauses
Definition: types_manual.h:136
std::vector< Variant > EventFields
Definition: types_manual.h:48
void UnSubscribe(uint32_t handle)
name
Definition: setup.py:38
uint16_t Severity
Definition: event.h:42
RepublishResponse Republish(uint32_t sequenceNumber)
NodeId EventType
Definition: event.h:37
NodeId GetId() const
Definition: node.cpp:50
virtual void DataValueChange(uint32_t handle, const Node &node, const DataValue &val, AttributeId attribute)
Definition: subscription.h:68
Services::SharedPtr Server
Definition: subscription.h:151
std::vector< ContentFilterElement > WhereClause
Definition: types_manual.h:137
UserData * getUsrPtr(uint32_t handle)
uint32_t SubscribeDataChange(const Node &node, AttributeId attr=AttributeId::Value)
SimpleAttOpMap SimpleAttributeOperandMap
Definition: subscription.h:156
ExpandedNodeId TypeId
Definition: types.h:286
EventNotificationList Events
Definition: types_manual.h:72
ExtensionObjectHeader Header
Definition: types_manual.h:70
handle
Definition: client.py:58
AttValMap AttributeValueMap
Definition: subscription.h:155
OPC UA Address space part. GNU LGPL.
std::vector< EventFieldList > Events
Definition: types_manual.h:59
SubscriptionHandler & Client
Definition: subscription.h:153
Common::Logger::SharedPtr Logger
Definition: subscription.h:158
void setUsrPtr(uint32_t handle, UserData *usr)
A Node object represent an OPC-UA node. It is high level object intended for developper who want to e...
Definition: node.h:42
std::vector< MonitoredItemCreateResult > Subscribe(std::vector< MonitoredItemCreateRequest > request)
ByteString EventId
Definition: event.h:36
ev
Definition: server.py:46
std::vector< uint32_t > MonitoredItemIds
DataChangeNotification DataChange
Definition: types_manual.h:71
OpcUa::AttributeId AttributeId
OpcUa::MonitoringParameters RequestedParameters
OpcUa::NotificationMessage NotificationMessage
void CallStatusChangeCallback(const NotificationData &data)
virtual void StatusChange(StatusCode status)
Definition: subscription.h:83
void SetValue(const std::vector< QualifiedName > &path, Variant value)
Definition: event.cpp:35
std::vector< MonitoredItems > Notification
Definition: types_manual.h:64
std::vector< OpcUa::SubscriptionAcknowledgement > SubscriptionAcknowledgements
OpcUa::NodeId NodeId
uint32_t SubscribeEvents()
virtual void PublishCallback(Services::SharedPtr serverLocalPtr, const PublishResult result)
StatusChangeNotification StatusChange
Definition: types_manual.h:73
SubscriptionData Data
Definition: subscription.h:152
DateTime Time
Definition: event.h:40
Definition: server.py:1
OpcUa::TimestampsToReturn TimestampsToReturn
OpcUa::MonitoringMode MonitoringMode
std::vector< OpcUa::MonitoredItemCreateRequest > ItemsToCreate


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:12:08