internal_subscription.cpp
Go to the documentation of this file.
2 
3 #include <boost/thread/locks.hpp>
4 
5 namespace OpcUa
6 {
7 namespace Internal
8 {
9 
10 InternalSubscription::InternalSubscription(SubscriptionServiceInternal & service, const SubscriptionData & data, const NodeId & SessionAuthenticationToken, std::function<void (PublishResult)> callback, const Common::Logger::SharedPtr & logger)
11  : Service(service)
12  , AddressSpace(Service.GetAddressSpace())
13  , Data(data)
14  , CurrentSession(SessionAuthenticationToken)
15  , Callback(callback)
16  , io(service.GetIOService())
17  , Timer(io, boost::posix_time::microseconds(static_cast<unsigned long>(1000 * data.RevisedPublishingInterval)))
18  , LifeTimeCount(data.RevisedLifetimeCount)
19  , Logger(logger)
20 {
21  LOG_DEBUG(Logger, "internal_subscription | id: {}, create", Data.SubscriptionId);
22 }
23 
25 {
26  LOG_DEBUG(Logger, "internal_subscription | id: {}, start", Data.SubscriptionId);
27  std::shared_ptr<InternalSubscription> self = shared_from_this();
28  Timer.async_wait([self](const boost::system::error_code & error) { self->PublishResults(error); });
29 }
30 
32 {
33  //Stop();
34  LOG_DEBUG(Logger, "internal_subscription | id: {}, destroy", Data.SubscriptionId);
35 }
36 
38 {
39  LOG_DEBUG(Logger, "internal_subscription | id: {}, stop", Data.SubscriptionId);
41  Timer.cancel();
42 }
43 
45 {
46  LOG_DEBUG(Logger, "internal_subscription | id: {}, DeleteAllMonitoredItems", Data.SubscriptionId);
47 
48  std::vector<uint32_t> handles;
49  {
50  boost::shared_lock<boost::shared_mutex> lock(DbMutex);
51 
52  for (auto pair : MonitoredDataChanges)
53  {
54  handles.push_back(pair.first);
55  }
56  }
57  DeleteMonitoredItemsIds(handles);
58 }
59 
61 {
62  bool expired = KeepAliveCount > LifeTimeCount ;
63 
64  if (expired)
65  {
66  LOG_DEBUG(Logger, "internal_subscription | id: {} has expired: keep alive: {} > life time: {}", Data.SubscriptionId, KeepAliveCount, LifeTimeCount);
67  }
68 
69  return expired;
70 }
71 
72 void InternalSubscription::PublishResults(const boost::system::error_code & error)
73 {
74  if (error)
75  {
76  LOG_WARN(Logger, "internal_subscription | id: {}, PublishResults: error: stopping subscription timer", Data.SubscriptionId);
77  return;
78  }
79 
80  if (HasExpired())
81  {
82  return;
83  }
84 
85  if (HasPublishResult() && Service.PopPublishRequest(CurrentSession)) //Check we received a publishrequest before sending response
86  {
87 
88  std::vector<PublishResult> results = PopPublishResult();
89 
90  if (results.size() > 0)
91  {
92  LOG_DEBUG(Logger, "internal_subscription | id: {}, have {} results", Data.SubscriptionId, results.size());
93 
94  if (Callback)
95  {
96  LOG_DEBUG(Logger, "internal_subscription | id: {}, calling callback", Data.SubscriptionId);
97  Callback(results[0]);
98  }
99 
100  else
101  {
102  LOG_DEBUG(Logger, "internal_subscription | id: {}, no callback defined for this subscription", Data.SubscriptionId);
103  }
104  }
105  }
106 
107  TimerStopped = false;
108  Timer.expires_at(Timer.expires_at() + boost::posix_time::microseconds(static_cast<unsigned long>(1000 * Data.RevisedPublishingInterval)));
109  std::shared_ptr<InternalSubscription> self = shared_from_this();
110  Timer.async_wait([self](const boost::system::error_code & error) { self->PublishResults(error); });
111 }
112 
113 
115 {
116  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
117 
118  if (Startup || !TriggeredDataChangeEvents.empty() || !TriggeredEvents.empty())
119  {
120  LOG_TRACE(Logger, "internal_subscription | id: {}, HasPublishResult: all queues empty, should send publish event", Data.SubscriptionId);
121  return true;
122  }
123 
124  if (KeepAliveCount > Data.RevisedMaxKeepAliveCount) //we need to send keepalive notification
125  {
126  LOG_TRACE(Logger, "internal_subscription | id: {}, HasPublishResult: KeepAliveCount: {} > MaxKeepAliveCount: {}, should send publish event", Data.SubscriptionId, KeepAliveCount, Data.RevisedMaxKeepAliveCount);
127  return true;
128  }
129 
130  LOG_TRACE(Logger, "internal_subscription | id: {}, HasPublishResult: KeepAliveCount: {}, MaxKeepAliveCount: {}", Data.SubscriptionId, KeepAliveCount, Data.RevisedMaxKeepAliveCount);
131  ++KeepAliveCount;
132  return false;
133 
134 }
135 
136 std::vector<PublishResult> InternalSubscription::PopPublishResult()
137 {
138  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
139 
140  LOG_DEBUG(Logger, "internal_subscription | id: {}, PopPublishResult: {} queued items", Data.SubscriptionId, TriggeredDataChangeEvents.size());
141  PublishResult result;
143  result.NotificationMessage.PublishTime = DateTime::Current();
144 
145  if (!TriggeredDataChangeEvents.empty())
146  {
148  result.NotificationMessage.NotificationData.push_back(data);
149  result.Results.push_back(StatusCode::Good);
150  }
151 
152  if (!TriggeredEvents.empty())
153  {
154  LOG_DEBUG(Logger, "internal_subscription | id: {}, PopPublishResult: {} events to send", Data.SubscriptionId, TriggeredEvents.size());
155 
156  EventNotificationList notif;
157 
159  {
160  notif.Events.push_back(ev.Data);
161  }
162 
163  TriggeredEvents.clear();
164  NotificationData data(notif);
165  result.NotificationMessage.NotificationData.push_back(data);
166  result.Results.push_back(StatusCode::Good);
167  }
168 
169  // clear TriggerCount to enable new events for next
170  // publishing cycle
171  for (auto & mdc : MonitoredDataChanges)
172  {
173  mdc.second.TriggerCount = 0;
174  }
175 
176  // FIXME: also add statuschange notification since they can be send in same result
177 
178  KeepAliveCount = 0;
179  Startup = false;
180 
181  result.NotificationMessage.SequenceNumber = NotificationSequence;
183  result.MoreNotifications = false;
184 
185  for (const PublishResult & res : NotAcknowledgedResults)
186  {
187  result.AvailableSequenceNumbers.push_back(res.NotificationMessage.SequenceNumber);
188  }
189 
190  NotAcknowledgedResults.push_back(result);
191 
192  LOG_DEBUG(Logger, "internal_subscription | id: {}, sending PublishResult with: {} notifications", Data.SubscriptionId, result.NotificationMessage.NotificationData.size());
193 
194  std::vector<PublishResult> resultlist;
195  resultlist.push_back(result);
196 
197  return resultlist;
198 }
199 
201 {
202  LOG_DEBUG(Logger, "internal_subscription | id: {}, Republish request for sequence: {}", Data.SubscriptionId, params.RetransmitSequenceNumber);
203 
204  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
205 
206  RepublishResponse response;
207 
208  for (const PublishResult & res : NotAcknowledgedResults)
209  {
210  if (res.NotificationMessage.SequenceNumber == params.RetransmitSequenceNumber)
211  {
213  return response;
214  }
215  }
216 
218  return response;
219 }
220 
222 {
223  LOG_DEBUG(Logger, "internal_subscription | id: {}, ModifySubscription", Data.SubscriptionId);
224 
226 
227  if (data.RequestedLifetimeCount)
228  {
230  }
231 
233 
235  {
237  }
238 
240 
242  {
244  }
245 
247 
248  return result;
249 }
250 
252 {
253  DataChangeNotification notification;
254 
255  for (const TriggeredDataChange & event : TriggeredDataChangeEvents)
256  {
257  notification.Notification.push_back(event.Data);
258  }
259 
260  TriggeredDataChangeEvents.clear();
261  NotificationData data(notification);
262  return data;
263 }
264 
266 {
267  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
268 
269  NotAcknowledgedResults.remove_if([&](PublishResult res) { return ack.SequenceNumber == res.NotificationMessage.SequenceNumber; });
270 }
271 
272 
274 {
275  LOG_DEBUG(Logger, "internal_subscription | id: {}, CreateMonitoredItem", Data.SubscriptionId);
276 
278  uint32_t callbackHandle = 0;
279  {
280  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
281 
284  result.RevisedSamplingInterval = Data.RevisedPublishingInterval; // Force our own rate
285  result.RevisedQueueSize = request.RequestedParameters.QueueSize; // We should check that value, maybe set to a default...
286  result.FilterResult = request.RequestedParameters.Filter; // We can omit that one if we do not change anything in filter
287 
289  {
290  LOG_DEBUG(Logger, "internal_subscription | id: {}, subscribe to event notifier", Data.SubscriptionId);
291  LOG_TRACE(Logger, "internal_subscription | id: {}, {}", Data.SubscriptionId, result.FilterResult);
292 
293  // Client wants to subscribe to events
294  // FIXME: check attribute EVENT notifier is set for the node
296  }
297  }
298 
299  // Do not lock this part as it (indirectly) calls a locked AddressSpaceInMemory
300  // function.
301  // AddressSpaceInMemory functions call locked InternalSubscription functions
302  // which will result in deadlocks when used from different threads
304  {
305  LOG_DEBUG(Logger, "internal_subscription | id: {}, subscribe to data changes", Data.SubscriptionId);
306 
307  uint32_t id = result.MonitoredItemId;
308  callbackHandle = AddressSpace.AddDataChangeCallback(request.ItemToMonitor.NodeId, request.ItemToMonitor.AttributeId, [this, id](const OpcUa::NodeId & nodeId, OpcUa::AttributeId attr, const DataValue & value)
309  {
310  this->DataChangeCallback(id, value);
311  });
312  }
313 
314  MonitoredDataChange mdata;
315  {
316  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
317 
318  mdata.Parameters = result;
319  mdata.Mode = request.MonitoringMode;
320  mdata.TriggerCount = 0;
322  mdata.CallbackHandle = callbackHandle;
323  mdata.MonitoredItemId = result.MonitoredItemId;
324  MonitoredDataChanges[result.MonitoredItemId] = mdata;
325  }
326 
327  // Do not lock this part as it (indirectly) calls a locked AddressSpaceInMemory
328  // function.
329  LOG_DEBUG(Logger, "internal_subscription | id: {}, created MonitoredItem id: {}, ClientHandle: {}", Data.SubscriptionId, result.MonitoredItemId, mdata.ClientHandle);
330 
331  // Forcing event
333  {
334  TriggerDataChangeEvent(mdata, request.ItemToMonitor);
335  }
336 
337  return result;
338 }
339 
341 {
342  LOG_DEBUG(Logger, "internal_subscription | id: {}, TriggerDataChangeEvent: ClientHandle: {}", Data.SubscriptionId, monitoreditems.ClientHandle);
343 
344  ReadParameters params;
345  params.AttributesToRead.push_back(attrval);
346  std::vector<DataValue> vals = AddressSpace.Read(params);
347 
348  TriggeredDataChange event;
349  event.MonitoredItemId = monitoreditems.MonitoredItemId;
350  event.Data.ClientHandle = monitoreditems.ClientHandle;
351  event.Data.Value = vals[0];
352  {
353  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
354 
355  TriggeredDataChangeEvents.push_back(event);
356  }
357 }
358 
359 std::vector<StatusCode> InternalSubscription::DeleteMonitoredItemsIds(const std::vector<uint32_t> & monitoreditemsids)
360 {
361  std::vector<StatusCode> results;
362 
363  for (const uint32_t & handle : monitoreditemsids)
364  {
365  LOG_DEBUG(Logger, "internal_subscription | id: {}, DeletingMonitoredItemsIds: handle: {}", Data.SubscriptionId, handle);
366 
368  {
369  results.push_back(StatusCode::Good);
370  continue;
371  }
372 
374  {
375  results.push_back(StatusCode::Good);
376  continue;
377  }
378 
379  results.push_back(StatusCode::BadMonitoredItemIdInvalid);
380 
381  }
382 
383  return results;
384 }
385 
387 {
388  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
389 
390  MonitoredDataChangeMap::iterator it = MonitoredDataChanges.find(handle);
391 
392  if (it == MonitoredDataChanges.end())
393  {
394  return false;
395  }
396 
397  else
398  {
399  if (it->second.CallbackHandle != 0) //if 0 this monitoreditem did not use callbacks
400  {
401  lock.unlock();
402  // break deadlock condition: InternalSubscription <-> AddressSpace
403  AddressSpace.DeleteDataChangeCallback(it->second.CallbackHandle);
404  lock.lock();
405  }
406 
407  MonitoredDataChanges.erase(handle);
408 
409  //We remove you our monitoreditem, now empty events which are already triggered
410  for (auto ev = TriggeredDataChangeEvents.begin(); ev != TriggeredDataChangeEvents.end();)
411  {
412  if (ev->MonitoredItemId == handle)
413  {
414  LOG_DEBUG(Logger, "internal_subscription | id: {}, remove TriggeredDataChangeEvents of MonitoredItemId: {}", Data.SubscriptionId, handle);
415 
417  }
418 
419  else
420  {
421  ++ev;
422  }
423  }
424 
425  return true;
426  }
427 }
428 
430 {
431  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
432 
433  for (auto pair : MonitoredEvents)
434  {
435  if (pair.second == handle)
436  {
437  MonitoredEvents.erase(pair.first);
438 
439  //We remove you our monitoreditem, now empty events which are already triggered
440  for (auto ev = TriggeredEvents.begin(); ev != TriggeredEvents.end();)
441  {
442  if (ev->MonitoredItemId == handle)
443  {
444  LOG_DEBUG(Logger, "internal_subscription | id: {}, remove TriggeredEvents of MonitoredItemId: {}", Data.SubscriptionId, handle);
445 
446  ev = TriggeredEvents.erase(ev);
447  }
448 
449  else
450  {
451  ++ev;
452  }
453  }
454 
455  return true;
456  }
457  }
458 
459  return false;
460 }
461 
462 void InternalSubscription::DataChangeCallback(const uint32_t & m_id, const DataValue & value)
463 {
464  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
465 
466  TriggeredDataChange event;
467  MonitoredDataChangeMap::iterator it_monitoreditem = MonitoredDataChanges.find(m_id);
468 
469  if (it_monitoreditem == MonitoredDataChanges.end())
470  {
471  LOG_WARN(Logger, "internal_subscription | id: {}, DataChangeCallback called for unknown item: {}", Data.SubscriptionId, m_id);
472  return ;
473  }
474 
475  MonitoredDataChange& monitoredDataChange = it_monitoreditem->second;
476  // spec says default sample interval for MonitoredItems is the same
477  // as Subscription publishing interval, so bail out if event has been
478  // triggered before
479  if (monitoredDataChange.TriggerCount > 0)
480  {
481  return;
482  }
483  event.MonitoredItemId = it_monitoreditem->first;
484  event.Data.ClientHandle = monitoredDataChange.ClientHandle;
485  event.Data.Value = value;
486 
487  LOG_DEBUG(Logger, "internal_subscription | id: {}, enqueue TriggeredDataChange event: ClientHandle: {}", Data.SubscriptionId, event.Data.ClientHandle);
488 
489  ++monitoredDataChange.TriggerCount;
490  TriggeredDataChangeEvents.push_back(event);
491 }
492 
494 {
495  boost::shared_lock<boost::shared_mutex> lock(DbMutex);
496 
497  MonitoredEventsMap::iterator it = MonitoredEvents.find(node);
498 
499  if (it == MonitoredEvents.end())
500  {
501  LOG_DEBUG(Logger, "internal_subscription | id: {} does not monitor NodeId: {}", Data.SubscriptionId, node);
502 
503  return;
504  }
505 
506  lock.unlock();//Enqueue vill need to set a unique lock
507  EnqueueEvent(it->second, event);
508 }
509 
510 bool InternalSubscription::EnqueueEvent(uint32_t monitoredItemId, const Event & event)
511 {
512  LOG_DEBUG(Logger, "internal_subscription | id: {}, EnqueEvent: {}", Data.SubscriptionId, event);
513 
514  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
515 
516  //Find monitoredItem
517  std::map<uint32_t, MonitoredDataChange>::iterator mii_it = MonitoredDataChanges.find(monitoredItemId);
518 
519  if (mii_it == MonitoredDataChanges.end())
520  {
521  LOG_DEBUG(Logger, "internal_subscription | id: {}, MonitoredItemId: {} is already deleted", Data.SubscriptionId, monitoredItemId);
522 
523  return false;
524  }
525 
526  //Check filter against event data and create EventFieldList to send
527  //FIXME: Here we should also check event agains WhereClause of filter
528  EventFieldList fieldlist;
529  fieldlist.ClientHandle = mii_it->second.ClientHandle;
530  fieldlist.EventFields = GetEventFields(mii_it->second.Parameters.FilterResult.Event, event);
532  ev.Data = fieldlist;
533  ev.MonitoredItemId = monitoredItemId;
534  TriggeredEvents.push_back(ev);
535  return true;
536 }
537 
538 std::vector<Variant> InternalSubscription::GetEventFields(const EventFilter & filter, const Event & event)
539 {
540  //Go through filter and add value og matches as in spec
541  std::vector<Variant> fields;
542 
543  LOG_DEBUG(Logger, "internal_subscription | id: {}, GetEventFields: filter size: {}", Data.SubscriptionId, filter.SelectClauses.size());
544 
545  for (SimpleAttributeOperand sattr : filter.SelectClauses)
546  {
547  LOG_DEBUG(Logger, "internal_subscription | id: {}, BrowsePath size: {}", Data.SubscriptionId, sattr.BrowsePath.size());
548 
549  if (sattr.BrowsePath.size() == 0)
550  {
551  fields.push_back(event.GetValue(sattr.Attribute));
552  }
553 
554  else
555  {
556  LOG_DEBUG(Logger, "internal_subscription | id: {}, send value for: {}", Data.SubscriptionId, sattr.BrowsePath[0]);
557 
558  if (sattr.BrowsePath[0] == QualifiedName("EventId", 0))
559  {
560  fields.push_back(event.EventId);
561  }
562 
563  else if (sattr.BrowsePath[0] == QualifiedName("EventType", 0))
564  {
565  fields.push_back(event.EventType);
566  }
567 
568  else if (sattr.BrowsePath[0] == QualifiedName("SourceNode", 0))
569  {
570  fields.push_back(event.SourceNode);
571  }
572 
573  else if (sattr.BrowsePath[0] == QualifiedName("SourceName", 0))
574  {
575  fields.push_back(event.SourceName);
576  }
577 
578  else if (sattr.BrowsePath[0] == QualifiedName("Message", 0))
579  {
580  LOG_DEBUG(Logger, "internal_subscription | message is: {}", event.Message);
581 
582  fields.push_back(event.Message);
583  }
584 
585  else if (sattr.BrowsePath[0] == QualifiedName("Severity", 0))
586  {
587  fields.push_back(event.Severity);
588  }
589 
590  else if (sattr.BrowsePath[0] == QualifiedName("LocalTime", 0))
591  {
592  fields.push_back(event.LocalTime);
593  }
594 
595  else if (sattr.BrowsePath[0] == QualifiedName("ReceiveTime", 0))
596  {
597  fields.push_back(event.ReceiveTime);
598  }
599 
600  else if (sattr.BrowsePath[0] == QualifiedName("Time", 0))
601  {
602  fields.push_back(event.Time);
603  }
604 
605  else
606  {
607  fields.push_back(event.GetValue(sattr.BrowsePath));
608  }
609  }
610  }
611 
612  return fields;
613 }
614 
615 
616 }
617 }
618 
619 
void TriggerDataChangeEvent(MonitoredDataChange monitoreditems, ReadValueId attrval)
std::vector< OpcUa::ReadValueId > AttributesToRead
#define LOG_TRACE(__logger__,...)
Definition: common/logger.h:23
#define LOG_WARN(__logger__,...)
Definition: common/logger.h:26
Opc Ua computer interface. GNU LGPL.
LocalizedText Message
Definition: event.h:41
bool EnqueueEvent(uint32_t monitoreditemid, const Event &event)
DateTime ReceiveTime
Definition: event.h:39
std::string SourceName
Definition: event.h:44
StatusCode ServiceResult
Definition: types.h:259
DateTime LocalTime
Definition: event.h:38
std::vector< QualifiedName > BrowsePath
Definition: types_manual.h:94
#define LOG_DEBUG(__logger__,...)
Definition: common/logger.h:24
void PublishResults(const boost::system::error_code &error)
std::list< PublishResult > NotAcknowledgedResults
OpcUa::MonitoringFilter Filter
RepublishResponse Republish(const RepublishParameters &params)
NodeId SourceNode
Definition: event.h:43
std::function< void(PublishResult)> Callback
std::vector< SimpleAttributeOperand > SelectClauses
Definition: types_manual.h:136
std::vector< Variant > EventFields
Definition: types_manual.h:48
OpcUa::MonitoringFilter FilterResult
uint16_t Severity
Definition: event.h:42
NodeId EventType
Definition: event.h:37
void NewAcknowlegment(const SubscriptionAcknowledgement &ack)
std::list< TriggeredEvent > TriggeredEvents
SubscriptionServiceInternal & Service
InternalSubscription(SubscriptionServiceInternal &service, const SubscriptionData &data, const NodeId &SessionAuthenticationToken, std::function< void(PublishResult)> Callback, const Common::Logger::SharedPtr &logger)
void TriggerEvent(NodeId node, Event event)
handle
Definition: client.py:58
std::vector< Variant > GetEventFields(const EventFilter &filter, const Event &event)
std::vector< StatusCode > DeleteMonitoredItemsIds(const std::vector< uint32_t > &ids)
OPC UA Address space part. GNU LGPL.
MonitoredItemCreateResult CreateMonitoredItem(const MonitoredItemCreateRequest &request)
Variant GetValue(const std::vector< QualifiedName > &path) const
Definition: event.cpp:57
std::vector< EventFieldList > Events
Definition: types_manual.h:59
void DataChangeCallback(const uint32_t &, const DataValue &value)
ByteString EventId
Definition: event.h:36
std::vector< PublishResult > PopPublishResult()
static DateTime Current()
ev
Definition: server.py:46
std::list< TriggeredDataChange > TriggeredDataChangeEvents
OpcUa::AttributeId AttributeId
BasicData Data
Definition: format.h:993
OpcUa::MonitoringParameters RequestedParameters
OpcUa::NotificationMessage NotificationMessage
std::vector< MonitoredItems > Notification
Definition: types_manual.h:64
OpcUa::ResponseHeader Header
OpcUa::NodeId NodeId
ModifySubscriptionResult ModifySubscription(const ModifySubscriptionParameters &data)
DateTime Time
Definition: event.h:40
OpcUa::NotificationMessage NotificationMessage
OpcUa::MonitoringMode MonitoringMode


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:12:06