6 #if defined ZMQ_HAVE_OPENPGM
17 zmq::pgm_receiver_t::pgm_receiver_t (
class io_thread_t *parent_,
19 io_object_t (parent_),
29 zmq::pgm_receiver_t::~pgm_receiver_t ()
37 return pgm_socket.init (udp_encapsulation_, network_);
40 void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_,
41 session_base_t *session_)
47 pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
48 socket_handle = add_fd (socket_fd);
49 pipe_handle = add_fd (waiting_pipe_fd);
50 set_pollin (pipe_handle);
51 set_pollin (socket_handle);
56 drop_subscriptions ();
59 void zmq::pgm_receiver_t::unplug ()
62 for (peers_t::iterator
it = peers.begin (),
end = peers.end ();
it !=
end;
64 if (
it->second.decoder !=
NULL) {
72 cancel_timer (rx_timer_id);
76 rm_fd (socket_handle);
82 void zmq::pgm_receiver_t::terminate ()
88 void zmq::pgm_receiver_t::restart_output ()
90 drop_subscriptions ();
93 bool zmq::pgm_receiver_t::restart_input ()
98 const peers_t::iterator
it = peers.find (*active_tsi);
103 int rc = session->push_msg (
it->second.decoder->msg ());
107 rc = process_input (
it->second.decoder);
116 it->second.joined =
false;
123 set_pollin (pipe_handle);
124 set_pollin (socket_handle);
134 return _empty_endpoint;
137 void zmq::pgm_receiver_t::in_event ()
146 const pgm_tsi_t *tsi =
NULL;
149 cancel_timer (rx_timer_id);
150 has_rx_timer =
false;
160 ssize_t received = pgm_socket.receive (&tmp, &tsi);
166 const long timeout = pgm_socket.get_rx_timeout ();
167 add_timer (
timeout, rx_timer_id);
174 peers_t::iterator
it = peers.find (*tsi);
177 if (received == -1) {
178 if (
it != peers.end ()) {
179 it->second.joined =
false;
180 if (
it->second.decoder !=
NULL) {
188 if (
it == peers.end ()) {
189 peer_info_t peer_info = {
false,
NULL};
190 it = peers.ZMQ_MAP_INSERT_OR_EMPLACE (*tsi, peer_info).first;
193 insize =
static_cast<size_t> (received);
194 inpos = (
unsigned char *) tmp;
199 inpos +=
sizeof (uint16_t);
200 insize -=
sizeof (uint16_t);
203 if (!
it->second.joined) {
217 it->second.joined =
true;
221 new (std::nothrow) v1_decoder_t (0,
options.maxmsgsize);
225 int rc = process_input (
it->second.decoder);
231 reset_pollin (pipe_handle);
232 reset_pollin (socket_handle);
237 it->second.joined =
false;
247 int zmq::pgm_receiver_t::process_input (v1_decoder_t *
decoder)
253 int rc =
decoder->decode (inpos, insize,
n);
260 rc = session->push_msg (
decoder->msg ());
270 void zmq::pgm_receiver_t::timer_event (
int token)
275 has_rx_timer =
false;
279 void zmq::pgm_receiver_t::drop_subscriptions ()
283 while (session->pull_msg (&msg) == 0)