norm_engine.hpp
Go to the documentation of this file.
1 
2 #ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
3 #define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
4 
5 #if defined ZMQ_HAVE_NORM
6 
7 #if defined(ZMQ_HAVE_WINDOWS) && defined(ZMQ_IOTHREAD_POLLER_USE_EPOLL)
8 #define ZMQ_USE_NORM_SOCKET_WRAPPER
9 #endif
10 
11 
12 #include "io_object.hpp"
13 #include "i_engine.hpp"
14 #include "options.hpp"
15 #include "v2_decoder.hpp"
16 #include "v2_encoder.hpp"
17 
18 #include <normApi.h>
19 
20 namespace zmq
21 {
22 class io_thread_t;
23 class msg_t;
24 class session_base_t;
25 
// i_engine implementation built on the NORM API (<normApi.h>). The class is
// only declared when ZMQ_HAVE_NORM is defined; it derives from io_object_t
// and i_engine and is marked ZMQ_FINAL.
26 class norm_engine_t ZMQ_FINAL : public io_object_t, public i_engine
27 {
28  public:
    // Constructs the engine from its parent I/O thread and an options_t.
    // NOTE(review): presumably 'options_' is copied into the 'options'
    // member (which is held by value) - confirm in norm_engine.cpp.
29  norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
30  ~norm_engine_t () ZMQ_FINAL;
31 
32  // Create the NORM instance, session, etc.
    // NOTE(review): 'send' / 'recv' presumably select the sender and/or
    // receiver role (cf. is_sender / is_receiver below), and the int result
    // is presumably a status code - confirm both against the implementation.
33  int init (const char *network_, bool send, bool recv);
    // NOTE(review): implementation not visible here; presumably releases
    // what init() created - confirm in norm_engine.cpp.
34  void shutdown ();
35 
    // This engine reports that it has no handshake stage.
36  bool has_handshake_stage () ZMQ_FINAL { return false; };
37 
38  // i_engine interface implementation.
39  // Plug the engine into the session.
40  void plug (zmq::io_thread_t *io_thread_,
41  class session_base_t *session_) ZMQ_FINAL;
42 
43  // Terminate and deallocate the engine. Note that 'detached'
44  // events are not fired on termination.
45  void terminate () ZMQ_FINAL;
46 
47  // This method is called by the session to signal that more
48  // messages can be written to the pipe.
49  bool restart_input () ZMQ_FINAL;
50 
51  // This method is called by the session to signal that there
52  // are messages available to send.
53  void restart_output () ZMQ_FINAL;
54 
    // Intentionally empty: this engine takes no action on this notification.
55  void zap_msg_available () ZMQ_FINAL {}
56 
    // NOTE(review): presumably returns '_empty_endpoint' (declared below) -
    // confirm in norm_engine.cpp.
57  const endpoint_uri_pair_t &get_endpoint () const ZMQ_FINAL;
58 
59  // i_poll_events interface implementation.
60  // (we only need in_event() for NormEvent notification)
61  // (i.e., we don't have any output events or timers (yet))
62  void in_event ();
63 
64  private:
    // Internal helpers; their bodies are not visible in this header.
    // NOTE(review): by name, unplug() reverses plug(), send_data() drives
    // the transmit path and recv_data() reads from the given NORM stream
    // object - confirm against norm_engine.cpp.
65  void unplug ();
66  void send_data ();
67  void recv_data (NormObjectHandle stream);
68 
69 
    // BUFFER_SIZE is the size, in bytes, of the 'tx_buffer' member below.
70  enum
71  {
72  BUFFER_SIZE = 2048
73  };
74 
75  // Used to keep track of streams from multiple senders.
76  class NormRxStreamState
77  {
78  public:
    // NOTE(review): the four arguments presumably initialise norm_stream,
    // max_msg_size, zero_copy and in_batch_size respectively - confirm.
79  NormRxStreamState (NormObjectHandle normStream,
80  int64_t maxMsgSize,
81  bool zeroCopy,
82  int inBatchSize);
83  ~NormRxStreamState ();
84 
    // Returns the NORM stream handle stored in this state object.
85  NormObjectHandle GetStreamHandle () const { return norm_stream; }
86 
    // NOTE(review): implementation not visible; presumably sets up the
    // decoder/buffer and returns false on failure - confirm.
87  bool Init ();
88 
    // Plain setter/getter pair for the 'rx_ready' flag.
89  void SetRxReady (bool state) { rx_ready = state; }
90  bool IsRxReady () const { return rx_ready; }
91 
    // Plain setter/getter pair for the 'in_sync' flag.
92  void SetSync (bool state) { in_sync = state; }
93  bool InSync () const { return in_sync; }
94 
95  // These are used to feed data to the decoder
96  // and its underlying "msg" buffer.
    // AccessBuffer() points buffer_count bytes past buffer_ptr,
    // GetBytesNeeded() is buffer_size - buffer_count, and
    // IncrementBufferCount() advances buffer_count by 'count'.
97  char *AccessBuffer () { return (char *) (buffer_ptr + buffer_count); }
98  size_t GetBytesNeeded () const { return buffer_size - buffer_count; }
99  void IncrementBufferCount (size_t count) { buffer_count += count; }
    // Returns the message object exposed by the decoder (zmq_decoder->msg ()).
100  msg_t *AccessMsg () { return zmq_decoder->msg (); }
101  // This invokes the decoder "decode" method,
102  // returning 0 if more data is needed and
103  // 1 if the message is complete. If an error
104  // occurs, the 'sync' is dropped and the
105  // decoder is re-initialized.
106  int Decode ();
107 
    // List of NormRxStreamState items, tracked through 'head' and 'tail'.
    // NOTE(review): items carry prev/next/list members and List is a
    // friend of NormRxStreamState, so the list presumably links items
    // through those members - confirm in norm_engine.cpp.
108  class List
109  {
110  public:
111  List ();
112  ~List ();
113 
114  void Append (NormRxStreamState &item);
115  void Remove (NormRxStreamState &item);
116 
    // The list is empty exactly when 'head' is NULL.
117  bool IsEmpty () const { return NULL == head; }
118 
    // NOTE(review): presumably removes and deletes every item - confirm.
119  void Destroy ();
120 
    // Forward cursor over a List; holds only a pointer to the next item.
    // NOTE(review): GetNextItem() presumably returns NULL once the end
    // of the list is reached - confirm.
121  class Iterator
122  {
123  public:
124  Iterator (const List &list);
125  NormRxStreamState *GetNextItem ();
126 
127  private:
128  NormRxStreamState *next_item;
129  };
130  friend class Iterator;
131 
132  private:
133  NormRxStreamState *head;
134  NormRxStreamState *tail;
135 
136  }; // end class zmq::norm_engine_t::NormRxStreamState::List
137 
138  friend class List;
139 
    // Returns the 'list' pointer stored in this item.
140  List *AccessList () { return list; }
141 
142 
143  private:
144  NormObjectHandle norm_stream;
145  int64_t max_msg_size;
146  bool zero_copy;
147  int in_batch_size;
148  bool in_sync;
149  bool rx_ready;
150  v2_decoder_t *zmq_decoder;
151  bool skip_norm_sync;
    // Receive buffer bookkeeping used by AccessBuffer(), GetBytesNeeded()
    // and IncrementBufferCount() above.
152  unsigned char *buffer_ptr;
153  size_t buffer_size;
154  size_t buffer_count;
155 
    // List linkage: 'list' is what AccessList() returns.
    // NOTE(review): prev/next are presumably the neighbouring items in
    // that list - confirm.
156  NormRxStreamState *prev;
157  NormRxStreamState *next;
158  NormRxStreamState::List *list;
159 
160  }; // end class zmq::norm_engine_t::NormRxStreamState
161 
162  const endpoint_uri_pair_t _empty_endpoint;
163 
164  session_base_t *zmq_session;
165  options_t options;
166  NormInstanceHandle norm_instance;
167  handle_t norm_descriptor_handle;
168  NormSessionHandle norm_session;
169  bool is_sender;
170  bool is_receiver;
171  // Sender state
172  msg_t tx_msg;
173  v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
174  NormObjectHandle norm_tx_stream;
175  bool tx_first_msg;
176  bool tx_more_bit;
177  bool zmq_output_ready; // zmq has msg(s) to send
178  bool norm_tx_ready; // norm has tx queue vacancy
179  // TBD - maybe we don't need this buffer if we can access the zmq message buffer directly?
180  char tx_buffer[BUFFER_SIZE];
181  unsigned int tx_index;
182  unsigned int tx_len;
183 
184  // Receiver state
185  // Lists of norm rx streams from remote senders
186  bool zmq_input_ready; // zmq ready to receive msg(s)
187  NormRxStreamState::List
188  rx_pending_list; // rx streams waiting for data reception
189  NormRxStreamState::List
190  rx_ready_list; // rx streams ready for NormStreamRead()
191  NormRxStreamState::List
192  msg_ready_list; // rx streams w/ msg ready for push to zmq
193 
    // Only present when ZMQ_USE_NORM_SOCKET_WRAPPER is defined, i.e. when
    // both ZMQ_HAVE_WINDOWS and ZMQ_IOTHREAD_POLLER_USE_EPOLL are defined.
194 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
195  fd_t
196  wrapper_read_fd; // file descriptor used to read norm events through the wrapper
197  DWORD wrapper_thread_id;
198  HANDLE wrapper_thread_handle;
199 #endif
200 
201 }; // end class norm_engine_t
202 }
203 
204 #endif // ZMQ_HAVE_NORM
205 
206 #endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__
init
WEPOLL_INTERNAL int init(void)
Definition: wepoll.c:858
benchmarks.python.py_benchmark.const
const
Definition: py_benchmark.py:14
stream
GLuint GLuint stream
Definition: glcorearb.h:3946
NULL
NULL
Definition: test_security_zap.cpp:405
google::protobuf::python::repeated_composite_container::Remove
static PyObject * Remove(PyObject *pself, PyObject *value)
Definition: repeated_composite_container.cc:301
item
cJSON * item
Definition: cJSON.h:236
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
Append
static void Append(State *state, const char *const str, const int length)
Definition: demangle.cc:272
google::protobuf::python::cmessage::Init
static int Init(CMessage *self, PyObject *args, PyObject *kwargs)
Definition: python/google/protobuf/pyext/message.cc:1286
send
void send(fd_t fd_, const char(&data_)[N])
Definition: test_security_curve.cpp:209
ZMQ_FINAL
#define ZMQ_FINAL
Definition: macros.hpp:35
zmq
Definition: zmq.hpp:229
v2_encoder.hpp
v2_decoder.hpp
versiongenerate.buffer_size
int buffer_size
Definition: versiongenerate.py:65
options.hpp
io_object.hpp
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
HANDLE
void * HANDLE
Definition: wepoll.c:70
next
static size_t next(const upb_table *t, size_t i)
Definition: php/ext/google/protobuf/upb.c:4889
i_engine.hpp
count
GLint GLsizei count
Definition: glcorearb.h:2830
ZMQ_FINAL
Definition: unittest_ip_resolver.cpp:26
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57