5 #if defined ZMQ_HAVE_OPENPGM
// Constructor (excerpt): passes parent_ to the io_object_t base and starts
// all four poller handles (handle, uplink_handle, rdata_notify_handle,
// pending_notify_handle) out as NULL.
17 zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
19 io_object_t (parent_),
27 handle (static_cast<handle_t> (
NULL)),
28 uplink_handle (static_cast<handle_t> (
NULL)),
29 rdata_notify_handle (static_cast<handle_t> (
NULL)),
30 pending_notify_handle (static_cast<handle_t> (
NULL)),
// NOTE(review): the statements below use udp_encapsulation_ and network_,
// which are not among the constructor parameters visible above; they may
// belong to a separate function whose signature is elided from this
// excerpt -- confirm against the full file.
// Initialise the PGM socket; the handling of rc is not visible here.
41 int rc = pgm_socket.init (udp_encapsulation_, network_);
// Size the output buffer to the maximum TSDU size reported by the socket.
45 out_buffer_size = pgm_socket.get_max_tsdu_size ();
// NOTE(review): no check of the malloc result is visible in this excerpt --
// confirm one exists.
46 out_buffer = (
unsigned char *) malloc (out_buffer_size);
// plug (excerpt): obtains the sender's file descriptors from the PGM socket,
// registers each of them via add_fd, and enables input polling on the
// uplink, rdata-notify and pending-notify handles.
// Use of io_thread_ and session_ is not visible in this excerpt.
52 void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
// Fill in the four descriptors (downlink, uplink, rdata notify, pending notify).
64 pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
65 &rdata_notify_fd, &pending_notify_fd);
// Register every descriptor and remember the handle returned for it.
67 handle = add_fd (downlink_socket_fd);
68 uplink_handle = add_fd (uplink_socket_fd);
69 rdata_notify_handle = add_fd (rdata_notify_fd);
70 pending_notify_handle = add_fd (pending_notify_fd);
// Only these three handles get set_pollin in the visible lines; the downlink
// handle does not.
74 set_pollin (uplink_handle);
75 set_pollin (rdata_notify_handle);
76 set_pollin (pending_notify_handle);
// unplug (excerpt): cancels the rx and tx timers and removes the uplink,
// rdata-notify and pending-notify handles from the poller.
82 void zmq::pgm_sender_t::unplug ()
// NOTE(review): any conditions guarding the two cancel_timer calls are not
// visible in this excerpt.
85 cancel_timer (rx_timer_id);
90 cancel_timer (tx_timer_id);
// NOTE(review): no rm_fd (handle) for the downlink handle is visible here;
// it may be on an elided line -- confirm.
95 rm_fd (uplink_handle);
96 rm_fd (rdata_notify_handle);
97 rm_fd (pending_notify_handle);
// terminate: only the signature is visible in this excerpt; the body is
// elided, so its behaviour cannot be described from here.
101 void zmq::pgm_sender_t::terminate ()
// restart_output (excerpt): re-enables output polling on the downlink handle.
107 void zmq::pgm_sender_t::restart_output ()
109 set_pollout (handle);
// restart_input: declared to return bool; its body is not visible in this
// excerpt.
113 bool zmq::pgm_sender_t::restart_input ()
// NOTE(review): this return yields _empty_endpoint, which does not look like
// a bool; it probably belongs to a different function whose signature is
// elided from this excerpt -- confirm against the full file.
121 return _empty_endpoint;
// Destructor (excerpt): closes the member message `msg`.
// The handling of rc and any further cleanup are not visible here.
124 zmq::pgm_sender_t::~pgm_sender_t ()
126 int rc = msg.close ();
// in_event (excerpt): cancels the rx timer and clears has_rx_timer, lets the
// PGM socket process upstream traffic, then arms the rx timer with the
// timeout reported by the socket.
// NOTE(review): the conditions guarding the cancel and the re-arm are elided
// from this excerpt -- confirm against the full file.
135 void zmq::pgm_sender_t::in_event ()
138 cancel_timer (rx_timer_id);
139 has_rx_timer =
false;
// Hand the event to the PGM socket.
143 pgm_socket.process_upstream ();
// Schedule a timer_event carrying rx_timer_id after the socket's rx timeout.
145 const long timeout = pgm_socket.get_rx_timeout ();
146 add_timer (
timeout, rx_timer_id);
// out_event (excerpt): when nothing is pending (write_size == 0) a new
// payload is assembled in out_buffer from messages pulled from the session;
// the buffer is then passed to pgm_socket.send. If errno is ENOMEM the tx
// timer is armed and output polling is switched off.
// Many intervening lines and closing braces are elided from this excerpt.
151 void zmq::pgm_sender_t::out_event ()
155 if (write_size == 0) {
// Payload starts after the first sizeof (uint16_t) bytes of out_buffer.
// NOTE(review): the reserved prefix presumably carries `offset` -- confirm.
159 unsigned char *bf = out_buffer +
sizeof (uint16_t);
// Space left for payload once that prefix is excluded.
160 size_t bfsz = out_buffer_size -
sizeof (uint16_t);
// Keep filling while fewer than bfsz payload bytes have been produced.
164 while (
bytes < bfsz) {
// NOTE(review): 0xffff looks like a "no offset recorded yet" sentinel; the
// body of this branch is elided -- confirm.
165 if (!more_flag &&
offset == 0xffff)
// Fetch the next message from the session; handling of rc is not visible.
167 int rc = session->pull_msg (&msg);
// Point bf at the first unused payload byte.
172 bf = out_buffer +
sizeof (uint16_t) +
bytes;
// NOTE(review): the condition under which output polling is switched off
// here is elided from this excerpt.
178 reset_pollout (handle);
// Total bytes to send: the uint16_t prefix plus the payload bytes.
182 write_size =
sizeof (uint16_t) +
bytes;
// NOTE(review): the guard for this block is elided (presumably a pending tx
// timer). It cancels the tx timer, re-enables output polling and clears the flag.
189 cancel_timer (tx_timer_id);
190 set_pollout (handle);
191 has_tx_timer =
false;
// Attempt to send the assembled buffer.
195 size_t nbytes = pgm_socket.send (out_buffer, write_size);
// Everything was sent; the body of this branch is elided from this excerpt.
198 if (nbytes == write_size)
// On ENOMEM: arm the tx timer with the socket's tx timeout and stop polling
// for output on the downlink handle.
203 if (
errno == ENOMEM) {
205 const long timeout = pgm_socket.get_tx_timeout ();
206 add_timer (
timeout, tx_timer_id);
207 reset_pollout (handle);
// timer_event (excerpt): dispatches on the timer token. For rx_timer_id it
// clears has_rx_timer; for tx_timer_id it clears has_tx_timer and re-enables
// output polling on the downlink handle.
// NOTE(review): the excerpt ends inside the second branch; further
// statements and any final else branch are not visible.
214 void zmq::pgm_sender_t::timer_event (
int token)
217 if (token == rx_timer_id) {
218 has_rx_timer =
false;
220 }
else if (token == tx_timer_id) {
222 has_tx_timer =
false;
223 set_pollout (handle);