pgm_receiver.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
4 #define __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
5 
6 #if defined ZMQ_HAVE_OPENPGM
7 
8 #include <map>
9 #include <algorithm>
10 
11 #include "io_object.hpp"
12 #include "i_engine.hpp"
13 #include "options.hpp"
14 #include "v1_decoder.hpp"
15 #include "pgm_socket.hpp"
16 
17 namespace zmq
18 {
19 class io_thread_t;
20 class session_base_t;
21 
// PGM receiver engine: an io_object_t that also implements the i_engine
// interface. The class is declared ZMQ_FINAL and the
// ZMQ_NON_COPYABLE_NOR_MOVABLE macro is applied at the end of the class.
22 class pgm_receiver_t ZMQ_FINAL : public io_object_t, public i_engine
23 {
24  public:
25  pgm_receiver_t (zmq::io_thread_t *parent_, const options_t &options_);
26  ~pgm_receiver_t ();
27 
    // Second-stage initialisation; returns an int status code.
    // NOTE(review): presumably 0 on success and -1 on failure, and
    // presumably forwards both arguments to pgm_socket - confirm in the .cpp.
28  int init (bool udp_encapsulation_, const char *network_);
29 
30  // i_engine interface implementation.
    // This engine reports no handshake stage, and zap_msg_available ()
    // is a deliberate no-op; both are defined inline below.
31  bool has_handshake_stage () { return false; };
32  void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
33  void terminate ();
34  bool restart_input ();
35  void restart_output ();
36  void zap_msg_available () {}
37  const endpoint_uri_pair_t &get_endpoint () const;
38 
39  // i_poll_events interface implementation.
    // NOTE(review): token is presumably compared against rx_timer_id
    // (declared below) - confirm in the .cpp.
40  void in_event ();
41  void timer_event (int token);
42 
43  private:
44  // Unplug the engine from the session.
45  void unplug ();
46 
47  // Decode received data (inpos, insize) and forward decoded
48  // messages to the session. Returns an int status code.
49  int process_input (v1_decoder_t *decoder);
50 
51  // PGM is not able to move subscriptions upstream. Thus, drop all
52  // the pending subscriptions.
53  void drop_subscriptions ();
54 
55  // RX timeout timer ID (the token value used for the RX timer).
56  enum
57  {
58  rx_timer_id = 0xa1
59  };
60 
    // NOTE(review): presumably the value returned by get_endpoint () -
    // confirm in the .cpp.
61  const endpoint_uri_pair_t _empty_endpoint;
62 
63  // True while the RX timer is running.
64  bool has_rx_timer;
65 
    // Per-peer state stored in the peers map below.
66  // If joined is true we are already getting messages from the peer.
67  // If it's false, we are getting data but still we haven't seen
68  // beginning of a message.
69  struct peer_info_t
70  {
71  bool joined;
    // Decoder used for this peer's data.
    // NOTE(review): raw pointer; ownership is not visible in this header.
72  v1_decoder_t *decoder;
73  };
74 
    // Ordering predicate for the pgm_tsi_t keys of the peers map.
    // The first sizeof (uint32_t[2]) bytes of each TSI are copied (rather
    // than cast) into two 32-bit words, and the word pairs are compared
    // lexicographically: first word, then second word on a tie.
    // The resulting order depends on the host byte order; as a map
    // comparator it only needs to be self-consistent.
    // NOTE(review): assumes sizeof (pgm_tsi_t) >= 2 * sizeof (uint32_t) -
    // confirm against the OpenPGM headers. memcpy and uint32_t are not
    // provided by any header included directly in this file; presumably
    // they arrive transitively.
75  struct tsi_comp
76  {
77  bool operator() (const pgm_tsi_t &ltsi, const pgm_tsi_t &rtsi) const
78  {
79  uint32_t ll[2], rl[2];
80  memcpy (ll, &ltsi, sizeof (ll));
81  memcpy (rl, &rtsi, sizeof (rl));
82  return (ll[0] < rl[0]) || (ll[0] == rl[0] && ll[1] < rl[1]);
83  }
84  };
85 
    // Known peers keyed by TSI, ordered by tsi_comp.
86  typedef std::map<pgm_tsi_t, peer_info_t, tsi_comp> peers_t;
87  peers_t peers;
88 
89  // PGM socket.
90  pgm_socket_t pgm_socket;
91 
92  // Socket options (held by value, not by reference).
93  options_t options;
94 
95  // Associated session.
96  zmq::session_base_t *session;
97 
    // NOTE(review): presumably the TSI of the peer whose data is
    // currently referenced by inpos/insize - confirm in the .cpp.
98  const pgm_tsi_t *active_tsi;
99 
100  // Number of bytes not consumed by the decoder due to pipe overflow.
101  size_t insize;
102 
103  // Pointer to data still waiting to be processed by the decoder.
104  const unsigned char *inpos;
105 
106  // Poll handle associated with PGM socket.
107  handle_t socket_handle;
108 
109  // Poll handle associated with engine PGM waiting pipe.
110  handle_t pipe_handle;
111 
112  ZMQ_NON_COPYABLE_NOR_MOVABLE (pgm_receiver_t)
113 };
114 }
115 
116 #endif
117 
118 #endif
zmq::session_base_t
Definition: session_base.hpp:21
init
WEPOLL_INTERNAL int init(void)
Definition: wepoll.c:858
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
zmq
Definition: zmq.hpp:229
v1_decoder.hpp
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
pgm_socket.hpp
decoder
static uint8_t decoder[96]
Definition: zmq_utils.cpp:85
options.hpp
io_object.hpp
i_engine.hpp
ZMQ_FINAL
Definition: unittest_ip_resolver.cpp:26
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57