subscription_service_internal.cpp
Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 #include "subscription_service_internal.h"
00011 
00012 #include <boost/thread/locks.hpp>
00013 
00014 
00015 namespace OpcUa
00016 {
00017   namespace Internal
00018   {
00019 
00020     SubscriptionServiceInternal::SubscriptionServiceInternal(Server::AddressSpace::SharedPtr addressspace, boost::asio::io_service& ioService, bool debug)
00021       : io(ioService)
00022       , AddressSpace(addressspace)
00023       , Debug(debug)
00024     {
00025     }
00026 
00027     SubscriptionServiceInternal::~SubscriptionServiceInternal()
00028     {
00029     }
00030 
00031     Server::AddressSpace& SubscriptionServiceInternal::GetAddressSpace()
00032     {
00033       return *AddressSpace;
00034     }
00035 
00036     boost::asio::io_service& SubscriptionServiceInternal::GetIOService()
00037     {
00038       return io;
00039     }
00040 
00041     void SubscriptionServiceInternal::DeleteAllSubscriptions()
00042     {
00043       if (Debug) std::cout << "SubscriptionService | Deleting all subscriptions." << std::endl;
00044 
00045       std::vector<uint32_t> ids(SubscriptionsMap.size());
00046       {
00047         boost::shared_lock<boost::shared_mutex> lock(DbMutex);
00048         std::transform(SubscriptionsMap.begin(), SubscriptionsMap.end(), ids.begin(), [](const SubscriptionsIdMap::value_type& i){return i.first;});
00049       }
00050 
00051       DeleteSubscriptions(ids);
00052     }
00053 
00054     std::vector<StatusCode> SubscriptionServiceInternal::DeleteSubscriptions(const std::vector<uint32_t>& subscriptions)
00055     {
00056       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00057 
00058       std::vector<StatusCode> result;
00059       for (const uint32_t& subid: subscriptions)
00060       {
00061         SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(subid);
00062         if ( itsub == SubscriptionsMap.end())
00063         {
00064           std::cout << "SubscriptionService | Error, got request to delete non existing Subscription: " << subid << std::endl;
00065           result.push_back(StatusCode::BadSubscriptionIdInvalid);
00066         }
00067         else
00068         {
00069           if (Debug) std::cout << "SubscriptionService | Deleting Subscription: " << subid << std::endl;
00070           itsub->second->Stop();
00071           SubscriptionsMap.erase(subid);
00072           result.push_back(StatusCode::Good);
00073         }
00074       }
00075       return result;
00076     }
00077 
00078     SubscriptionData SubscriptionServiceInternal::CreateSubscription(const CreateSubscriptionRequest& request, std::function<void (PublishResult)> callback)
00079     {
00080       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00081 
00082       SubscriptionData data;
00083       data.SubscriptionId = ++LastSubscriptionId;
00084       data.RevisedLifetimeCount = request.Parameters.RequestedLifetimeCount;
00085       data.RevisedPublishingInterval = request.Parameters.RequestedPublishingInterval;
00086       data.RevisedMaxKeepAliveCount = request.Parameters.RequestedMaxKeepAliveCount;
00087       if (Debug) std::cout << "SubscriptionService | Creating Subscription with Id: " << data.SubscriptionId << std::endl;
00088 
00089       std::shared_ptr<InternalSubscription> sub(new InternalSubscription(*this, data, request.Header.SessionAuthenticationToken, callback, Debug));
00090       sub->Start();
00091       SubscriptionsMap[data.SubscriptionId] = sub;
00092       return data;
00093     }
00094 
00095     std::vector<MonitoredItemCreateResult> SubscriptionServiceInternal::CreateMonitoredItems(const MonitoredItemsParameters& params)
00096     {
00097       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00098 
00099       std::vector<MonitoredItemCreateResult> data;
00100 
00101       SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(params.SubscriptionId);
00102       if ( itsub == SubscriptionsMap.end()) //SubscriptionId does not exist, return errors for all items
00103       {
00104         for (int j=0; j<(int)params.ItemsToCreate.size(); j++)
00105         {
00106           MonitoredItemCreateResult res;
00107           res.Status = StatusCode::BadSubscriptionIdInvalid;
00108           data.push_back(res);
00109         }
00110         return data;
00111       }
00112 
00113       for (const MonitoredItemCreateRequest& req: params.ItemsToCreate) //FIXME: loop could be in InternalSubscription
00114       {
00115         MonitoredItemCreateResult result = itsub->second->CreateMonitoredItem(req);
00116         data.push_back(result);
00117       }
00118       return data;
00119 
00120     }
00121 
00122     std::vector<StatusCode> SubscriptionServiceInternal::DeleteMonitoredItems(const DeleteMonitoredItemsParameters& params)
00123     {
00124       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00125 
00126       std::vector<StatusCode> results;
00127 
00128       SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(params.SubscriptionId);
00129       if ( itsub == SubscriptionsMap.end()) //SubscriptionId does not exist, return errors for all items
00130       {
00131         for (int j=0; j<(int)params.MonitoredItemIds.size(); j++)
00132         {
00133           results.push_back(StatusCode::BadSubscriptionIdInvalid);
00134         }
00135         return results;
00136       }
00137 
00138       results = itsub->second->DeleteMonitoredItemsIds(params.MonitoredItemIds);
00139       return results;
00140     }
00141 
00142     void SubscriptionServiceInternal::Publish(const PublishRequest& request)
00143     {
00144       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00145 
00146       if ( PublishRequestQueues[request.Header.SessionAuthenticationToken] < 100 )
00147       {
00148         PublishRequestQueues[request.Header.SessionAuthenticationToken] += 1;
00149       }
00150       //FIXME: else spec says we should return error to warn client
00151 
00152       for (SubscriptionAcknowledgement ack:  request.SubscriptionAcknowledgements)
00153       {
00154         SubscriptionsIdMap::iterator sub_it = SubscriptionsMap.find(ack.SubscriptionId);
00155         if ( sub_it != SubscriptionsMap.end())
00156         {
00157           sub_it->second->NewAcknowlegment(ack);
00158         }
00159       }
00160     }
00161 
00162     RepublishResponse SubscriptionServiceInternal::Republish(const RepublishParameters& params)
00163     {
00164       boost::shared_lock<boost::shared_mutex> lock(DbMutex);
00165       
00166       SubscriptionsIdMap::iterator sub_it = SubscriptionsMap.find(params.SubscriptionId);
00167       if ( sub_it == SubscriptionsMap.end())
00168       {
00169         RepublishResponse response;
00170         response.Header.ServiceResult = StatusCode::BadSubscriptionIdInvalid;
00171         return response;
00172       }
00173       return sub_it->second->Republish(params);
00174     }
00175 
00176 
00177     bool SubscriptionServiceInternal::PopPublishRequest(NodeId node)
00178     {
00179       std::map<NodeId, uint32_t>::iterator queue_it = PublishRequestQueues.find(node);
00180       if ( queue_it == PublishRequestQueues.end() )
00181       {
00182         std::cout << "SubscriptionService | Error request for publish queue for unknown session: " << node << " queue are available for: ";
00183         for ( auto i: PublishRequestQueues ){
00184           std::cout << "    " << i.first ;
00185         }
00186         std::cout << std::endl;
00187         return false;
00188       }
00189       else
00190       {
00191         if ( queue_it->second == 0 )
00192         {
00193           std::cout << "SubscriptionService | Missing publish request, cannot send response for session: " << node << std::endl;
00194           return false;
00195         }
00196         else
00197         {
00198           --queue_it->second;
00199           return true;
00200         }
00201       }
00202     }
00203 
00204     void SubscriptionServiceInternal::TriggerEvent(NodeId node, Event event)
00205     {
00206       boost::shared_lock<boost::shared_mutex> lock(DbMutex);
00207 
00208       //A new id must be generated every time we trigger an event, 
00209       //if user have not set it manually we force something
00210       if ( event.EventId.Data.empty() ) 
00211       {
00212         event.EventId = GenerateEventId();
00213       }
00214 
00215       for (auto sub : SubscriptionsMap)
00216       {
00217         sub.second->TriggerEvent(node, event);
00218       }
00219 
00220     }
00221 
00222   } // namespace Internal
00223 
00224   namespace Server
00225   {
00226 
00227     SubscriptionService::UniquePtr CreateSubscriptionService(std::shared_ptr<Server::AddressSpace> addressspace, boost::asio::io_service& io, bool debug)
00228     {
00229       return SubscriptionService::UniquePtr(new Internal::SubscriptionServiceInternal(addressspace, io, debug));
00230     }
00231 
00232   }
00233 }
00234 


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Sat Jun 8 2019 18:24:57