subscription_service_internal.cpp
Go to the documentation of this file.
1 
11 
12 #include <boost/thread/locks.hpp>
13 
14 namespace
15 {
16 OpcUa::ByteString GenerateEventId()
17 {
18  //stupid id generator
20 
21  for (int i = 0; i < 8; i++)
22  {
23  int32_t val = rand() % std::numeric_limits<int32_t>::max();
24  str.Data.push_back(val);
25  }
26 
27  return str;
28 }
29 }
30 
31 namespace OpcUa
32 {
33 namespace Internal
34 {
35 
36 SubscriptionServiceInternal::SubscriptionServiceInternal(Server::AddressSpace::SharedPtr addressspace, boost::asio::io_service & ioService, const Common::Logger::SharedPtr & logger)
37  : io(ioService)
38  , AddressSpace(addressspace)
39  , Logger(logger)
40 {
41 }
42 
44 {
45 }
46 
48 {
49  return *AddressSpace;
50 }
51 
52 boost::asio::io_service & SubscriptionServiceInternal::GetIOService()
53 {
54  return io;
55 }
56 
58 {
59  LOG_DEBUG(Logger, "subscription_service | DeleteAllSubscriptions");
60 
61  std::vector<uint32_t> ids(SubscriptionsMap.size());
62  {
63  boost::shared_lock<boost::shared_mutex> lock(DbMutex);
64  std::transform(SubscriptionsMap.begin(), SubscriptionsMap.end(), ids.begin(), [](const SubscriptionsIdMap::value_type & i) {return i.first;});
65  }
66 
68 }
69 
70 std::vector<StatusCode> SubscriptionServiceInternal::DeleteSubscriptions(const std::vector<uint32_t> & subscriptions)
71 {
72  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
73 
74  std::vector<StatusCode> result;
75 
76  for (const uint32_t & subid : subscriptions)
77  {
78  SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(subid);
79 
80  if (itsub == SubscriptionsMap.end())
81  {
82  LOG_ERROR(Logger, "subscription_service | got request to delete non existing SubscriptionId: {}", subid);
83  result.push_back(StatusCode::BadSubscriptionIdInvalid);
84  }
85 
86  else
87  {
88  LOG_DEBUG(Logger, "subscription_service | delete SubscriptionId: {}", subid);
89 
90  itsub->second->Stop();
91  SubscriptionsMap.erase(subid);
92  result.push_back(StatusCode::Good);
93  }
94  }
95 
96  return result;
97 }
98 
100 {
101  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
102 
104 
105  uint32_t subid = parameters.SubscriptionId;
106  SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(subid);
107 
108  if (itsub == SubscriptionsMap.end())
109  {
110  LOG_ERROR(Logger, "subscription_service | got request to modify non existing SubscriptionId: {}", subid);
112  return response;
113  }
114 
115  LOG_DEBUG(Logger, "subscription_service | modify SubscriptionId: {}", subid);
116 
117  std::shared_ptr<InternalSubscription> sub = itsub->second;
118  response.Parameters = sub->ModifySubscription(parameters);
119  return response;
120 }
121 
123 {
124  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
125 
126  SubscriptionData data;
131 
132  LOG_DEBUG(Logger, "subscription_service | CreateSubscription id: {}", data.SubscriptionId);
133 
134  std::shared_ptr<InternalSubscription> sub(new InternalSubscription(*this, data, request.Header.SessionAuthenticationToken, callback, Logger));
135  sub->Start();
137  return data;
138 }
139 
140 std::vector<MonitoredItemCreateResult> SubscriptionServiceInternal::CreateMonitoredItems(const MonitoredItemsParameters & params)
141 {
142  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
143 
144  std::vector<MonitoredItemCreateResult> data;
145 
146  SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(params.SubscriptionId);
147 
148  if (itsub == SubscriptionsMap.end()) //SubscriptionId does not exist, return errors for all items
149  {
150  for (int j = 0; j < (int)params.ItemsToCreate.size(); j++)
151  {
154  data.push_back(res);
155  }
156 
157  return data;
158  }
159 
160  for (const MonitoredItemCreateRequest & req : params.ItemsToCreate) //FIXME: loop could be in InternalSubscription
161  {
162  MonitoredItemCreateResult result = itsub->second->CreateMonitoredItem(req);
163  data.push_back(result);
164  }
165 
166  return data;
167 
168 }
169 
171 {
172  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
173 
174  std::vector<StatusCode> results;
175 
176  SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(params.SubscriptionId);
177 
178  if (itsub == SubscriptionsMap.end()) //SubscriptionId does not exist, return errors for all items
179  {
180  for (int j = 0; j < (int)params.MonitoredItemIds.size(); j++)
181  {
182  results.push_back(StatusCode::BadSubscriptionIdInvalid);
183  }
184 
185  return results;
186  }
187 
188  results = itsub->second->DeleteMonitoredItemsIds(params.MonitoredItemIds);
189  return results;
190 }
191 
193 {
194  boost::unique_lock<boost::shared_mutex> lock(DbMutex);
195 
196  const NodeId& session = request.Header.SessionAuthenticationToken;
197  if (PublishRequestQueues[session] < 100)
198  {
199  PublishRequestQueues[session] += 1;
200  LOG_DEBUG(Logger, "subscription_service | push PublishRequest for session: {}: available requests: {}", session, PublishRequestQueues[session]);
201  }
202 
203  //FIXME: else spec says we should return error to warn client
204 
206  {
207  SubscriptionsIdMap::iterator sub_it = SubscriptionsMap.find(ack.SubscriptionId);
208 
209  if (sub_it != SubscriptionsMap.end())
210  {
211  sub_it->second->NewAcknowlegment(ack);
212  }
213  }
214 }
215 
217 {
218  boost::shared_lock<boost::shared_mutex> lock(DbMutex);
219 
220  SubscriptionsIdMap::iterator sub_it = SubscriptionsMap.find(params.SubscriptionId);
221 
222  if (sub_it == SubscriptionsMap.end())
223  {
224  RepublishResponse response;
226  return response;
227  }
228 
229  return sub_it->second->Republish(params);
230 }
231 
232 
234 {
235  std::map<NodeId, uint32_t>::iterator queue_it = PublishRequestQueues.find(node);
236 
237  if (queue_it == PublishRequestQueues.end())
238  {
239  LOG_ERROR(Logger, "subscription_service | attempt to pop publish request for unknown session: {}", node);
240 
241  if (Logger && Logger->should_log(spdlog::level::debug))
242  {
243  for (auto i : PublishRequestQueues)
244  {
245  Logger->debug("subscription_service | available session: {}", i.first);
246  }
247  }
248  return false;
249  }
250 
251  else
252  {
253  if (queue_it->second == 0)
254  {
255  LOG_ERROR(Logger, "subscription_service | unable to send response: no publish request for session: {}", node);
256  return false;
257  }
258 
259  else
260  {
261  LOG_DEBUG(Logger, "subscription_service | pop PublishRequest for session: {}: available requests: {}", node, queue_it->second);
262  --queue_it->second;
263  return true;
264  }
265  }
266 }
267 
269 {
270  boost::shared_lock<boost::shared_mutex> lock(DbMutex);
271 
272  //A new id must be generated every time we trigger an event,
273  //if user have not set it manually we force something
274  if (event.EventId.Data.empty())
275  {
276  event.EventId = GenerateEventId();
277  }
278 
279  for (auto sub : SubscriptionsMap)
280  {
281  sub.second->TriggerEvent(node, event);
282  }
283 
284 }
285 
286 } // namespace Internal
287 
288 namespace Server
289 {
290 
291 SubscriptionService::UniquePtr CreateSubscriptionService(std::shared_ptr<Server::AddressSpace> addressspace, boost::asio::io_service & io, const Common::Logger::SharedPtr & logger)
292 {
293  return SubscriptionService::UniquePtr(new Internal::SubscriptionServiceInternal(addressspace, io, logger));
294 }
295 
296 }
297 }
298 
OpcUa::CreateSubscriptionParameters Parameters
virtual SubscriptionData CreateSubscription(const CreateSubscriptionRequest &request, std::function< void(PublishResult)> callback)
sub
Definition: client.py:55
#define LOG_ERROR(__logger__,...)
Definition: common/logger.h:27
StatusCode ServiceResult
Definition: types.h:259
#define LOG_DEBUG(__logger__,...)
Definition: common/logger.h:24
std::vector< uint8_t > Data
Definition: types.h:37
SubscriptionService::UniquePtr CreateSubscriptionService(std::shared_ptr< Server::AddressSpace > addressspace, boost::asio::io_service &io, const Common::Logger::SharedPtr &logger)
SubscriptionServiceInternal(Server::AddressSpace::SharedPtr addressspace, boost::asio::io_service &io, const Common::Logger::SharedPtr &logger)
virtual RepublishResponse Republish(const RepublishParameters &request)
OpcUa::ModifySubscriptionResult Parameters
OPC UA Address space part. GNU LGPL.
virtual std::vector< StatusCode > DeleteSubscriptions(const std::vector< uint32_t > &subscriptions)
ExpandedNodeId SessionAuthenticationToken
Definition: types.h:188
ByteString EventId
Definition: event.h:36
std::vector< uint32_t > MonitoredItemIds
virtual std::vector< MonitoredItemCreateResult > CreateMonitoredItems(const MonitoredItemsParameters &params)
virtual std::vector< StatusCode > DeleteMonitoredItems(const DeleteMonitoredItemsParameters &params)
OpcUa::ResponseHeader Header
std::vector< OpcUa::SubscriptionAcknowledgement > SubscriptionAcknowledgements
virtual ModifySubscriptionResponse ModifySubscription(const ModifySubscriptionParameters &parameters)
virtual void Publish(const PublishRequest &request)
const char Server[]
Definition: strings.h:121
OpcUa::RequestHeader Header
std::vector< OpcUa::MonitoredItemCreateRequest > ItemsToCreate


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:12:08