35 #include <sys/types.h> 40 #include <arpa/inet.h> 42 #include <netinet/in.h> 45 #include <sys/socket.h> 50 using namespace OpcUa;
// Holder for a socket descriptor together with the logger used to report problems.
// Only fragments of the class are visible in this listing.
58 explicit SocketHolder(
int socket,
const Common::Logger::SharedPtr & logger)
// A failing close() on the held descriptor is reported as an error through the logger.
66 if (close(Socket) < 0)
68 LOG_ERROR(Logger,
"unable to close server socket");
// Ordering comparison against a raw socket descriptor (body not visible in this listing).
72 bool operator < (
int sock)
const 79 Common::Logger::SharedPtr Logger;
// Per-connection worker: receives the I/O channel, the processor that handles the connection,
// a callback to run when processing has finished, and a logger.
// Only fragments of the class are visible in this listing.
86 Client(std::shared_ptr<IOChannel> channel, std::shared_ptr<IncomingConnectionProcessor> processor, std::function<
void()> onFinish,
const Common::Logger::SharedPtr & logger)
88 , Processor(processor)
92 LOG_INFO(Logger,
"starting new client thread");
// Binds Client::Run to this object by reference; the line that consumes `func` is not visible here.
93 std::function<void()> func = std::bind(&
Client::Run, std::ref(*
this));
// Releases the thread object and logs that the client thread has stopped.
100 ClientThread.reset();
101 LOG_INFO(Logger,
"client thread stopped");
// Success notification: only logs.
105 virtual void OnSuccess()
107 LOG_INFO(Logger,
"server thread exited successfully");
// Failure notification: logs the exception text.
110 virtual void OnError(
const std::exception & exc)
112 LOG_ERROR(Logger,
"server thread terminated: {}", exc.what());
// Connection handling: hands the channel to the processor.
120 LOG_INFO(Logger,
"start to process client connection");
121 Processor->Process(Channel);
// A std::exception raised while processing is logged with its message.
124 catch (
const std::exception & exc)
126 LOG_ERROR(Logger,
"unable to process client connection: {}", exc.what());
// Runs the OnFinish callback on a separate std::thread.
// NOTE(review): whether `t` is joined or detached is not visible in this listing - confirm.
129 std::thread t(OnFinish);
// Channel handed to Processor->Process().
134 std::shared_ptr<IOChannel> Channel;
// Handler that processes the connection.
135 std::shared_ptr<IncomingConnectionProcessor> Processor;
// Callback started on its own thread (see the std::thread construction above).
136 std::function<void()> OnFinish;
// Thread object; reset when the client thread is stopped.
137 std::unique_ptr<Common::Thread> ClientThread;
// Logger used by all LOG_* calls of this class.
138 Common::Logger::SharedPtr Logger;
// Constructs a server connection from TCP parameters, the processor for incoming connections
// and a logger. Only the Processor(processor) initializer is visible in this listing.
145 TcpServerConnection(
const TcpParameters & params, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor,
const Common::Logger::SharedPtr & logger)
149 , Processor(processor)
// Destructor. A std::exception raised by the guarded teardown code (not visible in this
// listing) is caught here and logged together with its message.
154 virtual ~TcpServerConnection()
161 catch (
const std::exception & exc)
// The "{}" placeholder is required so that exc.what() is actually rendered into the log
// line; it matches every other LOG_ERROR call in this file that passes exc.what().
163 LOG_ERROR(Logger,
"unable to stop server: {}", exc.what());
// Shutdown sequence (enclosing function header is not visible in this listing; presumably
// the Stop() method invoked by TcpServer): logs, calls Join() on the server thread and then
// releases the thread object.
179 LOG_INFO(Logger,
"shutting down opc ua binary server");
182 ServerThread->Join();
183 ServerThread.reset();
// Success notification for the server thread: only writes an informational log entry.
188 virtual void OnSuccess()
190 LOG_INFO(Logger,
"server thread exited successfully");
// Failure notification for the server thread: logs the message of the given exception.
193 virtual void OnError(
const std::exception & exc)
195 LOG_ERROR(Logger,
"server thread exited with error: {}", exc.what());
// Logs that a new server thread is being started; the statements that create the thread
// are not visible in this listing.
200 void StartNewThread()
202 LOG_INFO(Logger,
"starting new server thread");
// Server loop fragment (enclosing function header and several guarding conditions are not
// visible in this listing): create the listening socket, bind it, listen, accept clients.
// Creates an IPv4 stream socket.
210 Socket = socket(AF_INET, SOCK_STREAM, 0);
// Raised with the errno text when the socket cannot be created (condition not visible).
219 throw std::logic_error(
std::string(
"unable to create server socket: ") + strerror(errno));
// Hands the descriptor to a SocketHolder for the rest of this scope.
222 SocketHolder holder(Socket, Logger);
223 LOG_INFO(Logger,
"listening on: {}", Port);
// Bind to INADDR_ANY on the configured port; port and address are converted to network byte order.
226 addr.sin_family = AF_INET;
227 addr.sin_port = htons(Port);
228 addr.sin_addr.s_addr = htonl(INADDR_ANY);
230 if (bind(Socket, (sockaddr *)&addr,
sizeof(addr)) < 0)
237 throw std::logic_error(
std::string(
"unable bind socket: ") + strerror(errno));
// Backlog of pending connections passed to listen().
// NOTE(review): the return value of listen() is not checked, unlike socket()/bind()/accept().
240 const unsigned ServerQueueSize = 5;
241 listen(Socket, ServerQueueSize);
// Accepts the next client; the peer address is not requested.
246 int clientSocket = accept(Socket, NULL, NULL);
253 if (clientSocket < 0)
255 throw std::logic_error(
std::string(
"unable to accept client connection: ") + strerror(errno));
// ClientThreads is modified under ClientsMutex.
258 std::unique_lock<std::mutex> lock(ClientsMutex);
259 std::shared_ptr<IOChannel> clientChannel(
new SocketChannel(clientSocket));
// The client's finish callback is TcpServerConnection::Erase bound to this object (by
// reference) and to the accepted descriptor.
260 std::shared_ptr<Client> clientThread(
new Client(clientChannel, Processor, std::bind(&TcpServerConnection::Erase, std::ref(*
this), clientSocket), Logger));
// Clients are registered under their socket descriptor.
261 ClientThreads.insert(std::make_pair(clientSocket, clientThread));
// Drops all registered clients.
264 ClientThreads.clear();
// Fragment presumably belonging to Erase (header not visible in this listing): takes
// ClientsMutex, then removes `client` from ClientThreads.
269 std::unique_lock<std::mutex> lock(ClientsMutex);
273 ClientThreads.erase(client);
// Port number used for bind() (via htons) and reported in the "listening on" log entry.
278 const unsigned short Port;
// Stop flag.
// NOTE(review): `volatile` provides no inter-thread synchronization in C++; if this flag is
// read and written from different threads, std::atomic<bool> would be the correct type - confirm.
279 volatile bool Stopped;
// Connection processor handed to every Client that is created.
281 std::shared_ptr<IncomingConnectionProcessor> Processor;
// Server thread object; joined and reset during shutdown.
282 std::unique_ptr<Common::Thread> ServerThread;
// Locked around insertions into and removals from ClientThreads.
283 std::mutex ClientsMutex;
// Registered clients, keyed by the accepted socket descriptor.
284 std::map<int, std::shared_ptr<Client>> ClientThreads;
// Logger used by all LOG_* calls of this class.
285 Common::Logger::SharedPtr Logger;
// Starts a server for the given parameters. A std::logic_error with the text
// "server on this port already exists" is thrown (its guarding condition is not visible in
// this listing); otherwise a TcpServerConnection is created and stored in Servers under
// params.Port.
297 void Listen(
const OpcUa::Server::TcpParameters & params, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor)
override 302 throw std::logic_error(
"server on this port already exists");
305 std::shared_ptr<TcpServerConnection> listener(
new TcpServerConnection(params, processor, Logger));
307 Servers.insert(std::make_pair(params.
Port, listener));
// Stop-listening fragment (enclosing function header is not visible in this listing):
// looks up the server registered for params.Port, stops it and removes it from Servers.
312 ServersMap::iterator serverIt = Servers.find(params.
Port);
321 serverIt->second->Stop();
322 Servers.erase(serverIt);
// A std::exception raised while stopping is logged together with the port.
325 catch (
const std::exception & exc)
327 LOG_ERROR(Logger,
"stopping TcpServer on port: {} failed: {}", params.
Port, exc.what());
// Maps a port number to the server connection registered for it.
332 typedef std::map<unsigned short, std::shared_ptr<TcpServerConnection>> ServersMap;
// Factory result: a new file-local ::TcpServer, owned by a unique_ptr to the public
// OpcUa::Server::TcpServer type (enclosing function header not visible in this listing).
340 return std::unique_ptr<OpcUa::Server::TcpServer>(new ::TcpServer(logger));
Addon interface definition GNU LGPL.
#define LOG_ERROR(__logger__,...)
#define LOG_INFO(__logger__,...)
OPC UA Address space part. GNU LGPL.
#define DEFINE_CLASS_POINTERS(ClassName)
Exception declarations GNU LGPL.
ROSCONSOLE_DECL void shutdown()
TcpServer::UniquePtr CreateTcpServer(const Common::Logger::SharedPtr &logger)