server.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 #include "server.hpp"
6 #include "pipe.hpp"
7 #include "wire.hpp"
8 #include "random.hpp"
9 #include "likely.hpp"
10 #include "err.hpp"
11 
12 zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
13  socket_base_t (parent_, tid_, sid_, true),
14  _next_routing_id (generate_random ())
15 {
19 }
20 
22 {
23  zmq_assert (_out_pipes.empty ());
24 }
25 
26 void zmq::server_t::xattach_pipe (pipe_t *pipe_,
27  bool subscribe_to_all_,
28  bool locally_initiated_)
29 {
30  LIBZMQ_UNUSED (subscribe_to_all_);
31  LIBZMQ_UNUSED (locally_initiated_);
32 
33  zmq_assert (pipe_);
34 
35  uint32_t routing_id = _next_routing_id++;
36  if (!routing_id)
37  routing_id = _next_routing_id++; // Never use Routing ID zero
38 
39  pipe_->set_server_socket_routing_id (routing_id);
40  // Add the record into output pipes lookup table
41  outpipe_t outpipe = {pipe_, true};
42  const bool ok =
43  _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (routing_id, outpipe).second;
44  zmq_assert (ok);
45 
46  _fq.attach (pipe_);
47 }
48 
49 void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
50 {
51  const out_pipes_t::iterator it =
52  _out_pipes.find (pipe_->get_server_socket_routing_id ());
53  zmq_assert (it != _out_pipes.end ());
54  _out_pipes.erase (it);
55  _fq.pipe_terminated (pipe_);
56 }
57 
58 void zmq::server_t::xread_activated (pipe_t *pipe_)
59 {
60  _fq.activated (pipe_);
61 }
62 
63 void zmq::server_t::xwrite_activated (pipe_t *pipe_)
64 {
65  const out_pipes_t::iterator end = _out_pipes.end ();
66  out_pipes_t::iterator it;
67  for (it = _out_pipes.begin (); it != end; ++it)
68  if (it->second.pipe == pipe_)
69  break;
70 
71  zmq_assert (it != _out_pipes.end ());
72  zmq_assert (!it->second.active);
73  it->second.active = true;
74 }
75 
77 {
78  // SERVER sockets do not allow multipart data (ZMQ_SNDMORE)
79  if (msg_->flags () & msg_t::more) {
80  errno = EINVAL;
81  return -1;
82  }
83  // Find the pipe associated with the routing stored in the message.
84  const uint32_t routing_id = msg_->get_routing_id ();
85  out_pipes_t::iterator it = _out_pipes.find (routing_id);
86 
87  if (it != _out_pipes.end ()) {
88  if (!it->second.pipe->check_write ()) {
89  it->second.active = false;
90  errno = EAGAIN;
91  return -1;
92  }
93  } else {
95  return -1;
96  }
97 
98  // Message might be delivered over inproc, so we reset routing id
99  int rc = msg_->reset_routing_id ();
100  errno_assert (rc == 0);
101 
102  const bool ok = it->second.pipe->write (msg_);
103  if (unlikely (!ok)) {
104  // Message failed to send - we must close it ourselves.
105  rc = msg_->close ();
106  errno_assert (rc == 0);
107  } else
108  it->second.pipe->flush ();
109 
110  // Detach the message from the data buffer.
111  rc = msg_->init ();
112  errno_assert (rc == 0);
113 
114  return 0;
115 }
116 
118 {
119  pipe_t *pipe = NULL;
120  int rc = _fq.recvpipe (msg_, &pipe);
121 
122  // Drop any messages with more flag
123  while (rc == 0 && msg_->flags () & msg_t::more) {
124  // drop all frames of the current multi-frame message
125  rc = _fq.recvpipe (msg_, NULL);
126 
127  while (rc == 0 && msg_->flags () & msg_t::more)
128  rc = _fq.recvpipe (msg_, NULL);
129 
130  // get the new message
131  if (rc == 0)
132  rc = _fq.recvpipe (msg_, &pipe);
133  }
134 
135  if (rc != 0)
136  return rc;
137 
138  zmq_assert (pipe != NULL);
139 
140  const uint32_t routing_id = pipe->get_server_socket_routing_id ();
141  msg_->set_routing_id (routing_id);
142 
143  return 0;
144 }
145 
147 {
148  return _fq.has_in ();
149 }
150 
152 {
153  // In theory, SERVER socket is always ready for writing. Whether actual
154  // attempt to write succeeds depends on which pipe the message is going
155  // to be routed to.
156  return true;
157 }
zmq::server_t::server_t
server_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: server.cpp:12
zmq::server_t::~server_t
~server_t()
Definition: server.cpp:21
zmq::server_t::xattach_pipe
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_, bool locally_initiated_)
Definition: server.cpp:26
ZMQ_SERVER
#define ZMQ_SERVER
Definition: zmq_draft.h:14
end
GLuint GLuint end
Definition: glcorearb.h:2858
zmq::msg_t::reset_routing_id
int reset_routing_id()
Definition: msg.cpp:643
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::server_t::xread_activated
void xread_activated(zmq::pipe_t *pipe_)
Definition: server.cpp:58
EINVAL
#define EINVAL
Definition: errno.hpp:25
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
random.hpp
errno
int errno
zmq::socket_base_t
Definition: socket_base.hpp:31
zmq::server_t::xrecv
int xrecv(zmq::msg_t *msg_)
Definition: server.cpp:117
zmq::server_t::outpipe_t
Definition: server.hpp:43
ok
ROSCPP_DECL bool ok()
wire.hpp
zmq::options_t::can_send_hello_msg
bool can_send_hello_msg
Definition: options.hpp:279
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
server.hpp
macros.hpp
zmq::msg_t::get_routing_id
uint32_t get_routing_id() const
Definition: msg.cpp:628
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
zmq::msg_t::close
int close()
Definition: msg.cpp:242
zmq::own_t::options
options_t options
Definition: own.hpp:78
zmq::server_t::xhas_in
bool xhas_in()
Definition: server.cpp:146
pipe.hpp
zmq::server_t::xpipe_terminated
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: server.cpp:49
zmq::msg_t::init
int init()
Definition: msg.cpp:50
zmq::server_t::xhas_out
bool xhas_out()
Definition: server.cpp:151
zmq::msg_t::more
@ more
Definition: msg.hpp:55
zmq::msg_t::set_routing_id
int set_routing_id(uint32_t routing_id_)
Definition: msg.cpp:633
zmq::server_t::xsend
int xsend(zmq::msg_t *msg_)
Definition: server.cpp:76
zmq::generate_random
uint32_t generate_random()
Definition: random.cpp:30
err.hpp
likely.hpp
EHOSTUNREACH
#define EHOSTUNREACH
Definition: zmq.h:152
true
#define true
Definition: cJSON.c:65
zmq::options_t::can_recv_disconnect_msg
bool can_recv_disconnect_msg
Definition: options.hpp:283
zmq::options_t::type
int8_t type
Definition: options.hpp:80
zmq::server_t::xwrite_activated
void xwrite_activated(zmq::pipe_t *pipe_)
Definition: server.cpp:63
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
zmq::msg_t
Definition: msg.hpp:33
unlikely
#define unlikely(x)
Definition: likely.hpp:11


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58