pgm_receiver.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 
6 #if defined ZMQ_HAVE_OPENPGM
7 
8 #include <new>
9 
10 #include "pgm_receiver.hpp"
11 #include "session_base.hpp"
12 #include "v1_decoder.hpp"
13 #include "stdint.hpp"
14 #include "wire.hpp"
15 #include "err.hpp"
16 
17 zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
18  const options_t &options_) :
19  io_object_t (parent_),
20  has_rx_timer (false),
21  pgm_socket (true, options_),
22  options (options_),
23  session (NULL),
24  active_tsi (NULL),
25  insize (0)
26 {
27 }
28 
29 zmq::pgm_receiver_t::~pgm_receiver_t ()
30 {
31  // Destructor should not be called before unplug.
32  zmq_assert (peers.empty ());
33 }
34 
35 int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
36 {
37  return pgm_socket.init (udp_encapsulation_, network_);
38 }
39 
40 void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_,
41  session_base_t *session_)
42 {
43  LIBZMQ_UNUSED (io_thread_);
44  // Retrieve PGM fds and start polling.
45  fd_t socket_fd = retired_fd;
46  fd_t waiting_pipe_fd = retired_fd;
47  pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
48  socket_handle = add_fd (socket_fd);
49  pipe_handle = add_fd (waiting_pipe_fd);
50  set_pollin (pipe_handle);
51  set_pollin (socket_handle);
52 
53  session = session_;
54 
55  // If there are any subscriptions already queued in the session, drop them.
56  drop_subscriptions ();
57 }
58 
59 void zmq::pgm_receiver_t::unplug ()
60 {
61  // Delete decoders.
62  for (peers_t::iterator it = peers.begin (), end = peers.end (); it != end;
63  ++it) {
64  if (it->second.decoder != NULL) {
65  LIBZMQ_DELETE (it->second.decoder);
66  }
67  }
68  peers.clear ();
69  active_tsi = NULL;
70 
71  if (has_rx_timer) {
72  cancel_timer (rx_timer_id);
73  has_rx_timer = false;
74  }
75 
76  rm_fd (socket_handle);
77  rm_fd (pipe_handle);
78 
79  session = NULL;
80 }
81 
82 void zmq::pgm_receiver_t::terminate ()
83 {
84  unplug ();
85  delete this;
86 }
87 
88 void zmq::pgm_receiver_t::restart_output ()
89 {
90  drop_subscriptions ();
91 }
92 
93 bool zmq::pgm_receiver_t::restart_input ()
94 {
95  zmq_assert (session != NULL);
96  zmq_assert (active_tsi != NULL);
97 
98  const peers_t::iterator it = peers.find (*active_tsi);
99  zmq_assert (it != peers.end ());
100  zmq_assert (it->second.joined);
101 
102  // Push the pending message into the session.
103  int rc = session->push_msg (it->second.decoder->msg ());
104  errno_assert (rc == 0);
105 
106  if (insize > 0) {
107  rc = process_input (it->second.decoder);
108  if (rc == -1) {
109  // HWM reached; we will try later.
110  if (errno == EAGAIN) {
111  session->flush ();
112  return true;
113  }
114  // Data error. Delete message decoder, mark the
115  // peer as not joined and drop remaining data.
116  it->second.joined = false;
117  LIBZMQ_DELETE (it->second.decoder);
118  insize = 0;
119  }
120  }
121 
122  // Resume polling.
123  set_pollin (pipe_handle);
124  set_pollin (socket_handle);
125 
126  active_tsi = NULL;
127  in_event ();
128 
129  return true;
130 }
131 
132 const zmq::endpoint_uri_pair_t &zmq::pgm_receiver_t::get_endpoint () const
133 {
134  return _empty_endpoint;
135 }
136 
137 void zmq::pgm_receiver_t::in_event ()
138 {
139  // If active_tsi is not null, there is a pending restart_input.
140  // Keep the internal state as is so that restart_input would process the right data
141  if (active_tsi) {
142  return;
143  }
144 
145  // Read data from the underlying pgm_socket.
146  const pgm_tsi_t *tsi = NULL;
147 
148  if (has_rx_timer) {
149  cancel_timer (rx_timer_id);
150  has_rx_timer = false;
151  }
152 
153  // TODO: This loop can effectively block other engines in the same I/O
154  // thread in the case of high load.
155  while (true) {
156  // Get new batch of data.
157  // Note the workaround made not to break strict-aliasing rules.
158  insize = 0;
159  void *tmp = NULL;
160  ssize_t received = pgm_socket.receive (&tmp, &tsi);
161 
162  // No data to process. This may happen if the packet received is
163  // neither ODATA nor ODATA.
164  if (received == 0) {
165  if (errno == ENOMEM || errno == EBUSY) {
166  const long timeout = pgm_socket.get_rx_timeout ();
167  add_timer (timeout, rx_timer_id);
168  has_rx_timer = true;
169  }
170  break;
171  }
172 
173  // Find the peer based on its TSI.
174  peers_t::iterator it = peers.find (*tsi);
175 
176  // Data loss. Delete decoder and mark the peer as disjoint.
177  if (received == -1) {
178  if (it != peers.end ()) {
179  it->second.joined = false;
180  if (it->second.decoder != NULL) {
181  LIBZMQ_DELETE (it->second.decoder);
182  }
183  }
184  break;
185  }
186 
187  // New peer. Add it to the list of know but unjoint peers.
188  if (it == peers.end ()) {
189  peer_info_t peer_info = {false, NULL};
190  it = peers.ZMQ_MAP_INSERT_OR_EMPLACE (*tsi, peer_info).first;
191  }
192 
193  insize = static_cast<size_t> (received);
194  inpos = (unsigned char *) tmp;
195 
196  // Read the offset of the fist message in the current packet.
197  zmq_assert (insize >= sizeof (uint16_t));
198  uint16_t offset = get_uint16 (inpos);
199  inpos += sizeof (uint16_t);
200  insize -= sizeof (uint16_t);
201 
202  // Join the stream if needed.
203  if (!it->second.joined) {
204  // There is no beginning of the message in current packet.
205  // Ignore the data.
206  if (offset == 0xffff)
207  continue;
208 
209  zmq_assert (offset <= insize);
210  zmq_assert (it->second.decoder == NULL);
211 
212  // We have to move data to the beginning of the first message.
213  inpos += offset;
214  insize -= offset;
215 
216  // Mark the stream as joined.
217  it->second.joined = true;
218 
219  // Create and connect decoder for the peer.
220  it->second.decoder =
221  new (std::nothrow) v1_decoder_t (0, options.maxmsgsize);
222  alloc_assert (it->second.decoder);
223  }
224 
225  int rc = process_input (it->second.decoder);
226  if (rc == -1) {
227  if (errno == EAGAIN) {
228  active_tsi = tsi;
229 
230  // Stop polling.
231  reset_pollin (pipe_handle);
232  reset_pollin (socket_handle);
233 
234  break;
235  }
236 
237  it->second.joined = false;
238  LIBZMQ_DELETE (it->second.decoder);
239  insize = 0;
240  }
241  }
242 
243  // Flush any messages decoder may have produced.
244  session->flush ();
245 }
246 
247 int zmq::pgm_receiver_t::process_input (v1_decoder_t *decoder)
248 {
249  zmq_assert (session != NULL);
250 
251  while (insize > 0) {
252  size_t n = 0;
253  int rc = decoder->decode (inpos, insize, n);
254  if (rc == -1)
255  return -1;
256  inpos += n;
257  insize -= n;
258  if (rc == 0)
259  break;
260  rc = session->push_msg (decoder->msg ());
261  if (rc == -1) {
263  return -1;
264  }
265  }
266  return 0;
267 }
268 
269 
270 void zmq::pgm_receiver_t::timer_event (int token)
271 {
272  zmq_assert (token == rx_timer_id);
273 
274  // Timer cancels on return by poller_base.
275  has_rx_timer = false;
276  in_event ();
277 }
278 
279 void zmq::pgm_receiver_t::drop_subscriptions ()
280 {
281  msg_t msg;
282  msg.init ();
283  while (session->pull_msg (&msg) == 0)
284  msg.close ();
285 }
286 
287 #endif
LIBZMQ_DELETE
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
pgm_receiver.hpp
init
WEPOLL_INTERNAL int init(void)
Definition: wepoll.c:858
end
GLuint GLuint end
Definition: glcorearb.h:2858
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
errno
int errno
zmq::get_uint16
uint16_t get_uint16(const unsigned char *buffer_)
Definition: wire.hpp:29
wire.hpp
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
v1_decoder.hpp
stdint.hpp
offset
GLintptr offset
Definition: glcorearb.h:2944
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
decoder
static uint8_t decoder[96]
Definition: zmq_utils.cpp:85
n
GLdouble n
Definition: glcorearb.h:4153
err.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
true
#define true
Definition: cJSON.c:65
session_base.hpp
false
#define false
Definition: cJSON.c:70
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57