pgm_sender.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 
5 #if defined ZMQ_HAVE_OPENPGM
6 
7 #include <stdlib.h>
8 
9 #include "io_thread.hpp"
10 #include "pgm_sender.hpp"
11 #include "session_base.hpp"
12 #include "err.hpp"
13 #include "wire.hpp"
14 #include "stdint.hpp"
15 #include "macros.hpp"
16 
17 zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
18  const options_t &options_) :
19  io_object_t (parent_),
20  has_tx_timer (false),
21  has_rx_timer (false),
22  session (NULL),
23  encoder (0),
24  more_flag (false),
25  pgm_socket (false, options_),
26  options (options_),
27  handle (static_cast<handle_t> (NULL)),
28  uplink_handle (static_cast<handle_t> (NULL)),
29  rdata_notify_handle (static_cast<handle_t> (NULL)),
30  pending_notify_handle (static_cast<handle_t> (NULL)),
31  out_buffer (NULL),
32  out_buffer_size (0),
33  write_size (0)
34 {
35  int rc = msg.init ();
36  errno_assert (rc == 0);
37 }
38 
39 int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
40 {
41  int rc = pgm_socket.init (udp_encapsulation_, network_);
42  if (rc != 0)
43  return rc;
44 
45  out_buffer_size = pgm_socket.get_max_tsdu_size ();
46  out_buffer = (unsigned char *) malloc (out_buffer_size);
47  alloc_assert (out_buffer);
48 
49  return rc;
50 }
51 
52 void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
53 {
54  LIBZMQ_UNUSED (io_thread_);
55  // Allocate 2 fds for PGM socket.
56  fd_t downlink_socket_fd = retired_fd;
57  fd_t uplink_socket_fd = retired_fd;
58  fd_t rdata_notify_fd = retired_fd;
59  fd_t pending_notify_fd = retired_fd;
60 
61  session = session_;
62 
63  // Fill fds from PGM transport and add them to the poller.
64  pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
65  &rdata_notify_fd, &pending_notify_fd);
66 
67  handle = add_fd (downlink_socket_fd);
68  uplink_handle = add_fd (uplink_socket_fd);
69  rdata_notify_handle = add_fd (rdata_notify_fd);
70  pending_notify_handle = add_fd (pending_notify_fd);
71 
72  // Set POLLIN. We will never want to stop polling for uplink = we never
73  // want to stop processing NAKs.
74  set_pollin (uplink_handle);
75  set_pollin (rdata_notify_handle);
76  set_pollin (pending_notify_handle);
77 
78  // Set POLLOUT for downlink_socket_handle.
79  set_pollout (handle);
80 }
81 
82 void zmq::pgm_sender_t::unplug ()
83 {
84  if (has_rx_timer) {
85  cancel_timer (rx_timer_id);
86  has_rx_timer = false;
87  }
88 
89  if (has_tx_timer) {
90  cancel_timer (tx_timer_id);
91  has_tx_timer = false;
92  }
93 
94  rm_fd (handle);
95  rm_fd (uplink_handle);
96  rm_fd (rdata_notify_handle);
97  rm_fd (pending_notify_handle);
98  session = NULL;
99 }
100 
101 void zmq::pgm_sender_t::terminate ()
102 {
103  unplug ();
104  delete this;
105 }
106 
107 void zmq::pgm_sender_t::restart_output ()
108 {
109  set_pollout (handle);
110  out_event ();
111 }
112 
113 bool zmq::pgm_sender_t::restart_input ()
114 {
115  zmq_assert (false);
116  return true;
117 }
118 
119 const zmq::endpoint_uri_pair_t &zmq::pgm_sender_t::get_endpoint () const
120 {
121  return _empty_endpoint;
122 }
123 
124 zmq::pgm_sender_t::~pgm_sender_t ()
125 {
126  int rc = msg.close ();
127  errno_assert (rc == 0);
128 
129  if (out_buffer) {
130  free (out_buffer);
131  out_buffer = NULL;
132  }
133 }
134 
135 void zmq::pgm_sender_t::in_event ()
136 {
137  if (has_rx_timer) {
138  cancel_timer (rx_timer_id);
139  has_rx_timer = false;
140  }
141 
142  // In-event on sender side means NAK or SPMR receiving from some peer.
143  pgm_socket.process_upstream ();
144  if (errno == ENOMEM || errno == EBUSY) {
145  const long timeout = pgm_socket.get_rx_timeout ();
146  add_timer (timeout, rx_timer_id);
147  has_rx_timer = true;
148  }
149 }
150 
151 void zmq::pgm_sender_t::out_event ()
152 {
153  // POLLOUT event from send socket. If write buffer is empty,
154  // try to read new data from the encoder.
155  if (write_size == 0) {
156  // First two bytes (sizeof uint16_t) are used to store message
157  // offset in following steps. Note that by passing our buffer to
158  // the get data function we prevent it from returning its own buffer.
159  unsigned char *bf = out_buffer + sizeof (uint16_t);
160  size_t bfsz = out_buffer_size - sizeof (uint16_t);
161  uint16_t offset = 0xffff;
162 
163  size_t bytes = encoder.encode (&bf, bfsz);
164  while (bytes < bfsz) {
165  if (!more_flag && offset == 0xffff)
166  offset = static_cast<uint16_t> (bytes);
167  int rc = session->pull_msg (&msg);
168  if (rc == -1)
169  break;
170  more_flag = msg.flags () & msg_t::more;
171  encoder.load_msg (&msg);
172  bf = out_buffer + sizeof (uint16_t) + bytes;
173  bytes += encoder.encode (&bf, bfsz - bytes);
174  }
175 
176  // If there are no data to write stop polling for output.
177  if (bytes == 0) {
178  reset_pollout (handle);
179  return;
180  }
181 
182  write_size = sizeof (uint16_t) + bytes;
183 
184  // Put offset information in the buffer.
185  put_uint16 (out_buffer, offset);
186  }
187 
188  if (has_tx_timer) {
189  cancel_timer (tx_timer_id);
190  set_pollout (handle);
191  has_tx_timer = false;
192  }
193 
194  // Send the data.
195  size_t nbytes = pgm_socket.send (out_buffer, write_size);
196 
197  // We can write either all data or 0 which means rate limit reached.
198  if (nbytes == write_size)
199  write_size = 0;
200  else {
201  zmq_assert (nbytes == 0);
202 
203  if (errno == ENOMEM) {
204  // Stop polling handle and wait for tx timeout
205  const long timeout = pgm_socket.get_tx_timeout ();
206  add_timer (timeout, tx_timer_id);
207  reset_pollout (handle);
208  has_tx_timer = true;
209  } else
210  errno_assert (errno == EBUSY);
211  }
212 }
213 
214 void zmq::pgm_sender_t::timer_event (int token)
215 {
216  // Timer cancels on return by poller_base.
217  if (token == rx_timer_id) {
218  has_rx_timer = false;
219  in_event ();
220  } else if (token == tx_timer_id) {
221  // Restart polling handle and retry sending
222  has_tx_timer = false;
223  set_pollout (handle);
224  out_event ();
225  } else
226  zmq_assert (false);
227 }
228 
229 #endif
init
WEPOLL_INTERNAL int init(void)
Definition: wepoll.c:858
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::retired_fd
@ retired_fd
Definition: fd.hpp:32
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
precompiled.hpp
pgm_sender.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
errno
int errno
encoder
static char encoder[85+1]
Definition: zmq_utils.cpp:72
bytes
uint8 bytes[10]
Definition: coded_stream_unittest.cc:153
wire.hpp
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
stdint.hpp
offset
GLintptr offset
Definition: glcorearb.h:2944
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
zmq::msg_t::init
int init()
Definition: msg.cpp:50
zmq::ZMQ_FINAL::msg
msg_t * msg()
Definition: raw_decoder.hpp:27
zmq::put_uint16
void put_uint16(unsigned char *buffer_, uint16_t value_)
Definition: wire.hpp:23
zmq::msg_t::more
@ more
Definition: msg.hpp:55
io_thread.hpp
err.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
session_base.hpp
false
#define false
Definition: cJSON.c:70
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57