opc_tcp_processor.cpp
Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 #include "opc_tcp_processor.h"
00012 
00013 #include "opcua_protocol.h"
00014 
00015 #include <opc/common/uri_facade.h>
00016 #include <opc/ua/connection_listener.h>
00017 #include <opc/ua/protocol/binary/common.h>
00018 #include <opc/ua/protocol/binary/stream.h>
00019 #include <opc/ua/protocol/input_from_buffer.h>
00020 #include <opc/ua/protocol/monitored_items.h>
00021 #include <opc/ua/protocol/object_ids.h>
00022 #include <opc/ua/protocol/secure_channel.h>
00023 #include <opc/ua/protocol/session.h>
00024 #include <opc/ua/protocol/status_codes.h>
00025 #include <opc/ua/protocol/string_utils.h>
00026 #include <opc/ua/server/addons/endpoints_services.h>
00027 #include <opc/ua/server/addons/opcua_protocol.h>
00028 #include <opc/ua/server/addons/services_registry.h>
00029 
00030 #include <boost/thread/locks.hpp>
00031 #include <chrono>
00032 #include <iostream>
00033 #include <list>
00034 #include <mutex>
00035 #include <stdexcept>
00036 #include <sstream>
00037 #include <queue>
00038 
00039 
00040 namespace OpcUa
00041 {
00042   namespace Server
00043   {
00044 
00045     using namespace OpcUa::Binary;
00046 
00047     OpcTcpMessages::OpcTcpMessages(std::shared_ptr<OpcUa::Services> computer, OpcUa::OutputChannel& outputChannel, bool debug)
00048       : Server(computer)
00049       , OutputStream(outputChannel)
00050       , Debug(debug)
00051       , ChannelId(1)
00052       , TokenId(2)
00053       , SessionId(GenerateSessionId())
00054       , SequenceNb(0)
00055     {
00056       std::cout << "opc_tcp_processor| Debug is " << Debug << std::endl;
00057       std::cout << "opc_tcp_processor| SessionId is " << Debug << std::endl;
00058     }
00059 
00060 
00061     OpcTcpMessages::~OpcTcpMessages()
00062     {
00063       // This is a hack, we cannot leave subcsriptoins running since they have a cllback to us
00064       try
00065       {
00066         DeleteAllSubscriptions();
00067       }
00068       catch (const std::exception& exc)
00069       {
00070         std::cerr << "Error during stopping OpcTcpMessages. " << exc.what() <<std::endl;
00071       }
00072     }
00073 
00074     bool OpcTcpMessages::ProcessMessage(MessageType msgType, IStreamBinary& iStream)
00075     {
00076       boost::unique_lock<boost::shared_mutex> lock(ProcessMutex);
00077 
00078       switch (msgType)
00079       {
00080         case MT_HELLO:
00081         {
00082           if (Debug) std::clog << "opc_tcp_processor| Accepted hello message." << std::endl;
00083           HelloClient(iStream, OutputStream);
00084           break;
00085         }
00086 
00087 
00088         case MT_SECURE_OPEN:
00089         {
00090           if (Debug) std::clog << "opc_tcp_processor| Opening secure channel." << std::endl;
00091           OpenChannel(iStream, OutputStream);
00092           break;
00093         }
00094 
00095         case MT_SECURE_CLOSE:
00096         {
00097           if (Debug) std::clog << "opc_tcp_processor| Closing secure channel." << std::endl;
00098           CloseChannel(iStream);
00099           return false;
00100         }
00101 
00102         case MT_SECURE_MESSAGE:
00103         {
00104           if (Debug) std::clog << "opc_tcp_processor| Processing secure message." << std::endl;
00105           ProcessRequest(iStream, OutputStream);
00106           break;
00107         }
00108 
00109         case MT_ACKNOWLEDGE:
00110         {
00111           if (Debug) std::clog << "opc_tcp_processor| Received acknowledge from client. This should not have happend..." << std::endl;
00112           throw std::logic_error("Thank to client about acknowledge.");
00113         }
00114 
00115         case MT_ERROR:
00116         {
00117           if (Debug) std::clog << "opc_tcp_processor| There is an error happend in the client!" << std::endl;
00118           throw std::logic_error("It is very nice get to know server about error in the client.");
00119         }
00120 
00121         default:
00122         {
00123           if (Debug) std::clog << "opc_tcp_processor| Unknown message type '" << msgType << "' received!" << std::endl;
00124           throw std::logic_error("Invalid message type received.");
00125         }
00126       }
00127       return true;
00128     }
00129 
00130     void OpcTcpMessages::ForwardPublishResponse(const PublishResult result)
00131     {
00132       boost::unique_lock<boost::shared_mutex> lock(ProcessMutex);
00133 
00134       if (Debug) std::clog << "opc_tcp_processor| Sending PublishResult to client!" << std::endl;
00135       if ( PublishRequestQueue.empty() )
00136       {
00137         std::cerr << "Error trying to send publish response while we do not have data from a PublishRequest" << std::endl;
00138         return;
00139       }
00140       PublishRequestElement requestData = PublishRequestQueue.front();
00141       PublishRequestQueue.pop();
00142 
00143       PublishResponse response;
00144 
00145       FillResponseHeader(requestData.requestHeader, response.Header);
00146       response.Parameters = result;
00147      
00148       requestData.sequence.SequenceNumber = ++SequenceNb;
00149 
00150       SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00151       secureHeader.AddSize(RawSize(requestData.algorithmHeader));
00152       secureHeader.AddSize(RawSize(requestData.sequence));
00153       secureHeader.AddSize(RawSize(response));
00154       if (Debug) {
00155         std::cout << "opc_tcp_processor| Sedning publishResponse with " << response.Parameters.NotificationMessage.NotificationData.size() << " PublishResults" << std::endl;
00156       }
00157       OutputStream << secureHeader << requestData.algorithmHeader << requestData.sequence << response << flush;
00158     };
00159     
00160     void OpcTcpMessages::HelloClient(IStreamBinary& istream, OStreamBinary& ostream)
00161     {
00162       using namespace OpcUa::Binary;
00163 
00164       if (Debug) std::clog << "opc_tcp_processor| Reading hello message." << std::endl;
00165       Hello hello;
00166       istream >> hello;
00167 
00168       Acknowledge ack;
00169       ack.ReceiveBufferSize = hello.ReceiveBufferSize;
00170       ack.SendBufferSize = hello.SendBufferSize;
00171       ack.MaxMessageSize = hello.MaxMessageSize;
00172       ack.MaxChunkCount = 1;
00173 
00174       Header ackHeader(MT_ACKNOWLEDGE, CHT_SINGLE);
00175       ackHeader.AddSize(RawSize(ack));
00176       if (Debug) std::clog << "opc_tcp_processor| Sending answer to client." << std::endl;
00177       ostream << ackHeader << ack << flush;
00178     }
00179 
00180     void OpcTcpMessages::OpenChannel(IStreamBinary& istream, OStreamBinary& ostream)
00181     {
00182       uint32_t channelId = 0;
00183       istream >> channelId;
00184       AsymmetricAlgorithmHeader algorithmHeader;
00185       istream >> algorithmHeader;
00186 
00187       if (algorithmHeader.SecurityPolicyUri != "http://opcfoundation.org/UA/SecurityPolicy#None")
00188       {
00189         throw std::logic_error(std::string("Client want to create secure channel with unsupported policy '") + algorithmHeader.SecurityPolicyUri + std::string("'"));
00190       }
00191 
00192       SequenceHeader sequence;
00193       istream >> sequence;
00194 
00195       OpenSecureChannelRequest request;
00196       istream >> request;
00197 
00198       if (request.Parameters.SecurityMode != MessageSecurityMode::None)
00199       {
00200         throw std::logic_error("Unsupported security mode.");
00201       }
00202 
00203       if (request.Parameters.RequestType == SecurityTokenRequestType::Renew)
00204       {
00205         //FIXME:Should check that channel has been issued first
00206         ++TokenId;
00207       }
00208 
00209       sequence.SequenceNumber = ++SequenceNb;
00210 
00211       OpenSecureChannelResponse response;
00212       FillResponseHeader(request.Header, response.Header);
00213       response.ChannelSecurityToken.SecureChannelId = ChannelId;
00214       response.ChannelSecurityToken.TokenId = TokenId;
00215       response.ChannelSecurityToken.CreatedAt = OpcUa::DateTime::Current();
00216       response.ChannelSecurityToken.RevisedLifetime = request.Parameters.RequestLifeTime;
00217 
00218       SecureHeader responseHeader(MT_SECURE_OPEN, CHT_SINGLE, ChannelId);
00219       responseHeader.AddSize(RawSize(algorithmHeader));
00220       responseHeader.AddSize(RawSize(sequence));
00221       responseHeader.AddSize(RawSize(response));
00222       ostream << responseHeader << algorithmHeader << sequence << response << flush;
00223     }
00224 
00225     void OpcTcpMessages::CloseChannel(IStreamBinary& istream)
00226     {
00227       uint32_t channelId = 0;
00228       istream >> channelId;
00229 
00230       SymmetricAlgorithmHeader algorithmHeader;
00231       istream >> algorithmHeader;
00232 
00233       SequenceHeader sequence;
00234       istream >> sequence;
00235 
00236       CloseSecureChannelRequest request;
00237       istream >> request;
00238     }
00239 
00240     void OpcTcpMessages::ProcessRequest(IStreamBinary& istream, OStreamBinary& ostream)
00241     {
00242       uint32_t channelId = 0;
00243       istream >> channelId;
00244 
00245       SymmetricAlgorithmHeader algorithmHeader;
00246       istream >> algorithmHeader;
00247 
00248       SequenceHeader sequence;
00249       istream >> sequence;
00250 
00251       NodeId typeId;
00252       istream >> typeId;
00253 
00254       RequestHeader requestHeader;
00255       istream >> requestHeader;
00256 
00257       sequence.SequenceNumber = ++SequenceNb;
00258 /*
00259       const std::size_t receivedSize =
00260         RawSize(channelId) +
00261         RawSize(algorithmHeader) +
00262         RawSize(sequence) +
00263         RawSize(typeId) +
00264         RawSize(requestHeader);
00265 */
00266       const OpcUa::MessageId message = GetMessageId(typeId);
00267       switch (message)
00268       {
00269         case OpcUa::GET_ENDPOINTS_REQUEST:
00270         {
00271           if (Debug) std::clog << "opc_tcp_processor| Processing get endpoints request." << std::endl;
00272           GetEndpointsParameters filter;
00273           istream >> filter;
00274 
00275           GetEndpointsResponse response;
00276           FillResponseHeader(requestHeader, response.Header);
00277           response.Endpoints = Server->Endpoints()->GetEndpoints(filter);
00278 
00279           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00280           secureHeader.AddSize(RawSize(algorithmHeader));
00281           secureHeader.AddSize(RawSize(sequence));
00282           secureHeader.AddSize(RawSize(response));
00283           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00284           return;
00285         }
00286 
00287         case OpcUa::FIND_ServerS_REQUEST:
00288         {
00289           if (Debug) std::clog << "opc_tcp_processor| Processing 'Find Servers' request." << std::endl;
00290           FindServersParameters params;
00291           istream >> params;
00292 
00293           FindServersResponse response;
00294           FillResponseHeader(requestHeader, response.Header);
00295           response.Data.Descriptions = Server->Endpoints()->FindServers(params);
00296 
00297           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00298           secureHeader.AddSize(RawSize(algorithmHeader));
00299           secureHeader.AddSize(RawSize(sequence));
00300           secureHeader.AddSize(RawSize(response));
00301           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00302           return;
00303         }
00304 
00305         case OpcUa::BROWSE_REQUEST:
00306         {
00307           if (Debug) std::clog << "opc_tcp_processor| Processing browse request." << std::endl;
00308           NodesQuery query;
00309           istream >> query;
00310 
00311           BrowseResponse response;
00312           response.Results =  Server->Views()->Browse(query);
00313 
00314           FillResponseHeader(requestHeader, response.Header);
00315 
00316           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00317           secureHeader.AddSize(RawSize(algorithmHeader));
00318           secureHeader.AddSize(RawSize(sequence));
00319           secureHeader.AddSize(RawSize(response));
00320           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00321           return;
00322         }
00323 
00324         case OpcUa::READ_REQUEST:
00325         {
00326           ReadParameters params;
00327           istream >> params;
00328 
00329           if (Debug)
00330           {
00331             std::clog << "opc_tcp_processor| Processing read request for Node:";
00332             for (ReadValueId id : params.AttributesToRead)
00333             {
00334               std::clog << "opc_tcp_processor|  " << id.NodeId;
00335             }
00336             std::cout << std::endl;
00337           }
00338 
00339           ReadResponse response;
00340           FillResponseHeader(requestHeader, response.Header);
00341           std::vector<DataValue> values;
00342           if (std::shared_ptr<OpcUa::AttributeServices> service = Server->Attributes())
00343           {
00344             values = service->Read(params);
00345           }
00346           else
00347           {
00348             for (auto attribId : params.AttributesToRead)
00349             {
00350               DataValue value;
00351               value.Encoding = DATA_VALUE_STATUS_CODE;
00352               value.Status = OpcUa::StatusCode::BadNotImplemented;
00353               values.push_back(value);
00354             }
00355           }
00356           response.Results = values;
00357 
00358           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00359           secureHeader.AddSize(RawSize(algorithmHeader));
00360           secureHeader.AddSize(RawSize(sequence));
00361           secureHeader.AddSize(RawSize(response));
00362           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00363 
00364           return;
00365         }
00366 
00367         case OpcUa::WRITE_REQUEST:
00368         {
00369           if (Debug) std::clog << "opc_tcp_processor| Processing write request." << std::endl;
00370           WriteParameters params;
00371           istream >> params;
00372 
00373           WriteResponse response;
00374           FillResponseHeader(requestHeader, response.Header);
00375           std::vector<DataValue> values;
00376           if (std::shared_ptr<OpcUa::AttributeServices> service = Server->Attributes())
00377           {
00378             response.Results = service->Write(params.NodesToWrite);
00379           }
00380           else
00381           {
00382             response.Results = std::vector<StatusCode>(params.NodesToWrite.size(), OpcUa::StatusCode::BadNotImplemented);
00383           }
00384 
00385           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00386           secureHeader.AddSize(RawSize(algorithmHeader));
00387           secureHeader.AddSize(RawSize(sequence));
00388           secureHeader.AddSize(RawSize(response));
00389           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00390 
00391           return;
00392         }
00393 
00394         case TRANSLATE_BROWSE_PATHS_TO_NODE_IdS_REQUEST:
00395         {
00396           if (Debug) std::clog << "opc_tcp_processor| Processing 'Translate Browse Paths To Node Ids' request." << std::endl;
00397           TranslateBrowsePathsParameters params;
00398           istream >> params;
00399 
00400           if (Debug)
00401           {
00402             for ( BrowsePath path : params.BrowsePaths)
00403             {
00404               std::cout << "opc_tcp_processor| Requested path is: " << path.StartingNode << " : " ;
00405               for ( RelativePathElement el : path.Path.Elements)
00406               {
00407                 std::cout << "/" << el.TargetName ;
00408               }
00409               std::cout << std::endl;
00410             }
00411           }
00412 
00413           std::vector<BrowsePathResult> result = Server->Views()->TranslateBrowsePathsToNodeIds(params);
00414 
00415           if (Debug)
00416           {
00417             for (BrowsePathResult res: result)
00418             {
00419               std::cout << "opc_tcp_processor| Result of browsePath is: " << (uint32_t) res.Status << ". Target is: ";
00420               for ( BrowsePathTarget path : res.Targets)
00421               {
00422                 std::cout << path.Node ;
00423               }
00424               std::cout << std::endl;
00425             }
00426           }
00427 
00428           TranslateBrowsePathsToNodeIdsResponse response;
00429           FillResponseHeader(requestHeader, response.Header);
00430           response.Result.Paths = result;
00431           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00432           secureHeader.AddSize(RawSize(algorithmHeader));
00433           secureHeader.AddSize(RawSize(sequence));
00434           secureHeader.AddSize(RawSize(response));
00435 
00436           if (Debug) std::clog << "opc_tcp_processor| Sending response to 'Translate Browse Paths To Node Ids' request." << std::endl;
00437           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00438           return;
00439         }
00440 
00441 
00442         case CREATE_SESSION_REQUEST:
00443         {
00444           if (Debug) std::clog << "opc_tcp_processor| Processing create session request." << std::endl;
00445           CreateSessionParameters params;
00446           istream >> params;
00447 
00448           CreateSessionResponse response;
00449           FillResponseHeader(requestHeader, response.Header);
00450 
00451           response.Parameters.SessionId = SessionId;
00452           response.Parameters.AuthenticationToken = SessionId;
00453           response.Parameters.RevisedSessionTimeout = params.RequestedSessionTimeout;
00454           response.Parameters.MaxRequestMessageSize = 65536;
00455           GetEndpointsParameters epf;
00456           response.Parameters.ServerEndpoints = Server->Endpoints()->GetEndpoints(epf);
00457 
00458 
00459           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00460           secureHeader.AddSize(RawSize(algorithmHeader));
00461           secureHeader.AddSize(RawSize(sequence));
00462           secureHeader.AddSize(RawSize(response));
00463           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00464 
00465           return;
00466         }
00467         case ACTIVATE_SESSION_REQUEST:
00468         {
00469           if (Debug) std::clog << "opc_tcp_processor| Processing activate session request." << std::endl;
00470           ActivateSessionParameters params;
00471           istream >> params;
00472 
00473           ActivateSessionResponse response;
00474           FillResponseHeader(requestHeader, response.Header);
00475 
00476           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00477           secureHeader.AddSize(RawSize(algorithmHeader));
00478           secureHeader.AddSize(RawSize(sequence));
00479           secureHeader.AddSize(RawSize(response));
00480           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00481           return;
00482         }
00483 
00484         case CLOSE_SESSION_REQUEST:
00485         {
00486           if (Debug) std::clog << "opc_tcp_processor| Processing close session request." << std::endl;
00487           bool deleteSubscriptions = false;
00488           istream >> deleteSubscriptions;
00489 
00490           if (deleteSubscriptions)
00491           {
00492             DeleteAllSubscriptions();
00493           }
00494 
00495           CloseSessionResponse response;
00496           FillResponseHeader(requestHeader, response.Header);
00497 
00498           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00499           secureHeader.AddSize(RawSize(algorithmHeader));
00500           secureHeader.AddSize(RawSize(sequence));
00501           secureHeader.AddSize(RawSize(response));
00502           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00503           if (Debug) std::clog << "opc_tcp_processor| Session Closed " << std::endl;
00504           return;
00505         }
00506 
00507         case CREATE_SUBSCRIPTION_REQUEST:
00508         {
00509           if (Debug) std::clog << "opc_tcp_processor| Processing create subscription request." << std::endl;
00510           CreateSubscriptionRequest request;
00511           istream >> request.Parameters;
00512           request.Header = requestHeader;
00513 
00514           CreateSubscriptionResponse response;
00515           FillResponseHeader(requestHeader, response.Header);
00516 
00517           response.Data = Server->Subscriptions()->CreateSubscription(request, [this](PublishResult i){ 
00518                 try
00519                 {
00520                   this->ForwardPublishResponse(i); 
00521                 }
00522                 catch (std::exception& ex)
00523                 {
00524                   // TODO Disconnect client!
00525                   std::cerr << "Error forwarding publishResult to client: " << ex.what() << std::endl;
00526                 }
00527               });
00528 
00529           Subscriptions.push_back(response.Data.SubscriptionId); //Keep a link to eventually delete subcriptions when exiting
00530 
00531           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00532           secureHeader.AddSize(RawSize(algorithmHeader));
00533           secureHeader.AddSize(RawSize(sequence));
00534           secureHeader.AddSize(RawSize(response));
00535           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00536           return;
00537         }
00538 
00539         case DELETE_SUBSCRIPTION_REQUEST:
00540         {
00541           if (Debug) std::clog << "opc_tcp_processor| Processing delete subscription request." << std::endl;
00542           std::vector<uint32_t> ids;
00543           istream >> ids;
00544 
00545           DeleteSubscriptions(ids); //remove from locale subscription lis
00546 
00547           DeleteSubscriptionsResponse response;
00548           FillResponseHeader(requestHeader, response.Header);
00549 
00550           response.Results = Server->Subscriptions()->DeleteSubscriptions(ids);
00551 
00552           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00553           secureHeader.AddSize(RawSize(algorithmHeader));
00554           secureHeader.AddSize(RawSize(sequence));
00555           secureHeader.AddSize(RawSize(response));
00556 
00557           if (Debug) std::clog << "opc_tcp_processor| Sending response to Delete Subscription Request." << std::endl;
00558           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00559           return;
00560         }
00561 
00562         case CREATE_MONITORED_ITEMS_REQUEST:
00563         {
00564           if (Debug) std::clog << "opc_tcp_processor| Processing 'Create Monitored Items' request." << std::endl;
00565           MonitoredItemsParameters params;
00566           istream >> params;
00567 
00568           CreateMonitoredItemsResponse response;
00569 
00570           response.Results = Server->Subscriptions()->CreateMonitoredItems(params);
00571 
00572           FillResponseHeader(requestHeader, response.Header);
00573           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00574           secureHeader.AddSize(RawSize(algorithmHeader));
00575           secureHeader.AddSize(RawSize(sequence));
00576           secureHeader.AddSize(RawSize(response));
00577 
00578           if (Debug) std::clog << "opc_tcp_processor| Sending response to Create Monitored Items Request." << std::endl;
00579           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00580           return;
00581         }
00582 
00583         case DELETE_MONITORED_ITEMS_REQUEST:
00584         {
00585           if (Debug) std::clog << "opc_tcp_processor| Processing 'Delete Monitored Items' request." << std::endl;
00586           DeleteMonitoredItemsParameters params;
00587           istream >> params;
00588 
00589           DeleteMonitoredItemsResponse response;
00590 
00591           response.Results = Server->Subscriptions()->DeleteMonitoredItems(params);
00592 
00593           FillResponseHeader(requestHeader, response.Header);
00594           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00595           secureHeader.AddSize(RawSize(algorithmHeader));
00596           secureHeader.AddSize(RawSize(sequence));
00597           secureHeader.AddSize(RawSize(response));
00598 
00599           if (Debug) std::clog << "opc_tcp_processor| Sending response to Delete Monitored Items Request." << std::endl;
00600           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00601           return;
00602         }
00603 
00604         case PUBLISH_REQUEST:
00605         {
00606           if (Debug) std::clog << "opc_tcp_processor| Processing 'Publish' request." << std::endl;
00607           PublishRequest request;
00608           request.Header = requestHeader;
00609           istream >> request.SubscriptionAcknowledgements;
00610 
00611           PublishRequestElement data;
00612           data.sequence = sequence;
00613           data.algorithmHeader = algorithmHeader;
00614           data.requestHeader = requestHeader;
00615           PublishRequestQueue.push(data);
00616           Server->Subscriptions()->Publish(request);
00617 
00618           --SequenceNb; //We do not send response, so do not increase sequence
00619 
00620           return;
00621         }
00622 
00623         case SET_PUBLISHING_MODE_REQUEST:
00624         {
00625           if (Debug) std::clog << "opc_tcp_processor| Processing 'Set Publishing Mode' request." << std::endl;
00626           PublishingModeParameters params;
00627           istream >> params;
00628 
00629           //FIXME: forward request to internal server!!
00630           SetPublishingModeResponse response;
00631           FillResponseHeader(requestHeader, response.Header);
00632           response.Result.Results.resize(params.SubscriptionIds.size(), StatusCode::Good);
00633 
00634           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00635           secureHeader.AddSize(RawSize(algorithmHeader));
00636           secureHeader.AddSize(RawSize(sequence));
00637           secureHeader.AddSize(RawSize(response));
00638 
00639           if (Debug) std::clog << "opc_tcp_processor| Sending response to 'Set Publishing Mode' request." << std::endl;
00640           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00641           return;
00642         }
00643 
00644         case ADD_NODES_REQUEST:
00645         {
00646           if (Debug) std::clog << "opc_tcp_processor| Processing 'Add Nodes' request." << std::endl;
00647           AddNodesParameters params;
00648           istream >> params;
00649 
00650           std::vector<AddNodesResult> results = Server->NodeManagement()->AddNodes(params.NodesToAdd);
00651 
00652           AddNodesResponse response;
00653           FillResponseHeader(requestHeader, response.Header);
00654           response.results = results;
00655 
00656           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00657           secureHeader.AddSize(RawSize(algorithmHeader));
00658           secureHeader.AddSize(RawSize(sequence));
00659           secureHeader.AddSize(RawSize(response));
00660 
00661           if (Debug) std::clog << "opc_tcp_processor| Sending response to 'Add Nodes' request." << std::endl;
00662           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00663           return;
00664         }
00665 
00666         case ADD_REFERENCES_REQUEST:
00667         {
00668           if (Debug) std::clog << "opc_tcp_processor| Processing 'Add References' request." << std::endl;
00669           AddReferencesParameters params;
00670           istream >> params;
00671 
00672           std::vector<StatusCode> results = Server->NodeManagement()->AddReferences(params.ReferencesToAdd);
00673 
00674           AddReferencesResponse response;
00675           FillResponseHeader(requestHeader, response.Header);
00676           response.Results = results;
00677 
00678           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00679           secureHeader.AddSize(RawSize(algorithmHeader));
00680           secureHeader.AddSize(RawSize(sequence));
00681           secureHeader.AddSize(RawSize(response));
00682 
00683           if (Debug) std::clog << "opc_tcp_processor| Sending response to 'Add References' request." << std::endl;
00684           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00685           return;
00686         }
00687 
00688         case REPUBLISH_REQUEST:
00689         {
00690           if (Debug) std::clog << "opc_tcp_processor| Processing 'Republish' request." << std::endl;
00691           RepublishParameters params;
00692           istream >> params;
00693 
00694           //Not implemented so we just say we do not have that notification
00695           RepublishResponse response;
00696           FillResponseHeader(requestHeader, response.Header);
00697           response.Header.ServiceResult = StatusCode::BadMessageNotAvailable;
00698 
00699           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00700           secureHeader.AddSize(RawSize(algorithmHeader));
00701           secureHeader.AddSize(RawSize(sequence));
00702           secureHeader.AddSize(RawSize(response));
00703 
00704           if (Debug) std::clog << "opc_tcp_processor| Sending response to 'Republish' request." << std::endl;
00705           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00706           return;
00707         }
00708 
00709         case CALL_REQUEST:
00710         {
00711           if (Debug) std::clog << "opc_tcp_processor| Processing call request." << std::endl;
00712           CreateSessionParameters params;
00713           istream >> params;
00714 
00715           CreateSessionResponse response;
00716           FillResponseHeader(requestHeader, response.Header);
00717 
00718           response.Parameters.SessionId = SessionId;
00719           response.Parameters.AuthenticationToken = SessionId;
00720           response.Parameters.RevisedSessionTimeout = params.RequestedSessionTimeout;
00721           response.Parameters.MaxRequestMessageSize = 65536;
00722           GetEndpointsParameters epf;
00723           response.Parameters.ServerEndpoints = Server->Endpoints()->GetEndpoints(epf);
00724 
00725 
00726           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00727           secureHeader.AddSize(RawSize(algorithmHeader));
00728           secureHeader.AddSize(RawSize(sequence));
00729           secureHeader.AddSize(RawSize(response));
00730           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00731 
00732           return;
00733         }
00734  
00735 
00736         default:
00737         {
00738           ServiceFaultResponse response;
00739           FillResponseHeader(requestHeader, response.Header);
00740           response.Header.ServiceResult = StatusCode::BadNotImplemented;
00741 
00742           SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00743           secureHeader.AddSize(RawSize(algorithmHeader));
00744           secureHeader.AddSize(RawSize(sequence));
00745           secureHeader.AddSize(RawSize(response));
00746 
00747           if (Debug) std::cerr << "opc_tcp_processor| Sending ServiceFaultResponse to unsupported request of id: " << message << std::endl;
00748           ostream << secureHeader << algorithmHeader << sequence << response << flush;
00749           return;
00750         }
00751       }
00752     }
00753 
00754     void OpcTcpMessages::FillResponseHeader(const RequestHeader& requestHeader, ResponseHeader& responseHeader)
00755     {
00756        //responseHeader.InnerDiagnostics.push_back(DiagnosticInfo());
00757        responseHeader.Timestamp = DateTime::Current();
00758        responseHeader.RequestHandle = requestHeader.RequestHandle;
00759     }
00760 
00761     void OpcTcpMessages::DeleteAllSubscriptions()
00762     {
00763       std::vector<uint32_t> subs;
00764       for (const uint32_t& subid: Subscriptions)
00765       {
00766         subs.push_back(subid);
00767       }
00768       Server->Subscriptions()->DeleteSubscriptions(subs);
00769       Subscriptions.clear();
00770     }
00771 
00772     void OpcTcpMessages::DeleteSubscriptions(const std::vector<uint32_t>& ids)
00773     {
00774       for ( auto id : ids )
00775       {
00776         Subscriptions.erase(std::remove_if(Subscriptions.begin(), Subscriptions.end(),
00777                       [&](const uint32_t d) { return ( d == id) ; }), Subscriptions.end());
00778       }
00779     }
00780 
00781   } // namespace UaServer
00782 } // namespace OpcUa


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Sat Jun 8 2019 18:24:56