5 #if defined ZMQ_HAVE_OPENPGM 
   17 zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
 
   19     io_object_t (parent_),
 
   27     handle (static_cast<handle_t> (
NULL)),
 
   28     uplink_handle (static_cast<handle_t> (
NULL)),
 
   29     rdata_notify_handle (static_cast<handle_t> (
NULL)),
 
   30     pending_notify_handle (static_cast<handle_t> (
NULL)),
 
   41     int rc = pgm_socket.init (udp_encapsulation_, network_);
 
   45     out_buffer_size = pgm_socket.get_max_tsdu_size ();
 
   46     out_buffer = (
unsigned char *) malloc (out_buffer_size);
 
   52 void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
 
   64     pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
 
   65                                &rdata_notify_fd, &pending_notify_fd);
 
   67     handle = add_fd (downlink_socket_fd);
 
   68     uplink_handle = add_fd (uplink_socket_fd);
 
   69     rdata_notify_handle = add_fd (rdata_notify_fd);
 
   70     pending_notify_handle = add_fd (pending_notify_fd);
 
   74     set_pollin (uplink_handle);
 
   75     set_pollin (rdata_notify_handle);
 
   76     set_pollin (pending_notify_handle);
 
   82 void zmq::pgm_sender_t::unplug ()
 
   85         cancel_timer (rx_timer_id);
 
   90         cancel_timer (tx_timer_id);
 
   95     rm_fd (uplink_handle);
 
   96     rm_fd (rdata_notify_handle);
 
   97     rm_fd (pending_notify_handle);
 
  101 void zmq::pgm_sender_t::terminate ()
 
  107 void zmq::pgm_sender_t::restart_output ()
 
  109     set_pollout (handle);
 
  113 bool zmq::pgm_sender_t::restart_input ()
 
  121     return _empty_endpoint;
 
  124 zmq::pgm_sender_t::~pgm_sender_t ()
 
  126     int rc = msg.close ();
 
  135 void zmq::pgm_sender_t::in_event ()
 
  138         cancel_timer (rx_timer_id);
 
  139         has_rx_timer = 
false;
 
  143     pgm_socket.process_upstream ();
 
  145         const long timeout = pgm_socket.get_rx_timeout ();
 
  146         add_timer (
timeout, rx_timer_id);
 
  151 void zmq::pgm_sender_t::out_event ()
 
  155     if (write_size == 0) {
 
  159         unsigned char *bf = out_buffer + 
sizeof (uint16_t);
 
  160         size_t bfsz = out_buffer_size - 
sizeof (uint16_t);
 
  164         while (
bytes < bfsz) {
 
  165             if (!more_flag && 
offset == 0xffff)
 
  167             int rc = session->pull_msg (&msg);
 
  172             bf = out_buffer + 
sizeof (uint16_t) + 
bytes;
 
  178             reset_pollout (handle);
 
  182         write_size = 
sizeof (uint16_t) + 
bytes;
 
  189         cancel_timer (tx_timer_id);
 
  190         set_pollout (handle);
 
  191         has_tx_timer = 
false;
 
  195     size_t nbytes = pgm_socket.send (out_buffer, write_size);
 
  198     if (nbytes == write_size)
 
  203         if (
errno == ENOMEM) {
 
  205             const long timeout = pgm_socket.get_tx_timeout ();
 
  206             add_timer (
timeout, tx_timer_id);
 
  207             reset_pollout (handle);
 
  214 void zmq::pgm_sender_t::timer_event (
int token)
 
  217     if (token == rx_timer_id) {
 
  218         has_rx_timer = 
false;
 
  220     } 
else if (token == tx_timer_id) {
 
  222         has_tx_timer = 
false;
 
  223         set_pollout (handle);