object.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <string.h>
5 #include <stdarg.h>
6 
7 #include "object.hpp"
8 #include "ctx.hpp"
9 #include "err.hpp"
10 #include "pipe.hpp"
11 #include "io_thread.hpp"
12 #include "session_base.hpp"
13 #include "socket_base.hpp"
14 
15 zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) : _ctx (ctx_), _tid (tid_)
16 {
17 }
18 
20  _ctx (parent_->_ctx), _tid (parent_->_tid)
21 {
22 }
23 
25 {
26 }
27 
28 uint32_t zmq::object_t::get_tid () const
29 {
30  return _tid;
31 }
32 
33 void zmq::object_t::set_tid (uint32_t id_)
34 {
35  _tid = id_;
36 }
37 
38 zmq::ctx_t *zmq::object_t::get_ctx () const
39 {
40  return _ctx;
41 }
42 
44 {
45  switch (cmd_.type) {
47  process_activate_read ();
48  break;
49 
51  process_activate_write (cmd_.args.activate_write.msgs_read);
52  break;
53 
54  case command_t::stop:
55  process_stop ();
56  break;
57 
58  case command_t::plug:
59  process_plug ();
60  process_seqnum ();
61  break;
62 
63  case command_t::own:
64  process_own (cmd_.args.own.object);
65  process_seqnum ();
66  break;
67 
68  case command_t::attach:
69  process_attach (cmd_.args.attach.engine);
70  process_seqnum ();
71  break;
72 
73  case command_t::bind:
74  process_bind (cmd_.args.bind.pipe);
75  process_seqnum ();
76  break;
77 
78  case command_t::hiccup:
79  process_hiccup (cmd_.args.hiccup.pipe);
80  break;
81 
83  process_pipe_peer_stats (cmd_.args.pipe_peer_stats.queue_count,
86  break;
87 
89  process_pipe_stats_publish (
93  break;
94 
96  process_pipe_term ();
97  break;
98 
100  process_pipe_term_ack ();
101  break;
102 
103  case command_t::pipe_hwm:
104  process_pipe_hwm (cmd_.args.pipe_hwm.inhwm,
105  cmd_.args.pipe_hwm.outhwm);
106  break;
107 
108  case command_t::term_req:
109  process_term_req (cmd_.args.term_req.object);
110  break;
111 
112  case command_t::term:
113  process_term (cmd_.args.term.linger);
114  break;
115 
116  case command_t::term_ack:
117  process_term_ack ();
118  break;
119 
121  process_term_endpoint (cmd_.args.term_endpoint.endpoint);
122  break;
123 
124  case command_t::reap:
125  process_reap (cmd_.args.reap.socket);
126  break;
127 
128  case command_t::reaped:
129  process_reaped ();
130  break;
131 
133  process_seqnum ();
134  break;
135 
137  process_conn_failed ();
138  break;
139 
140  case command_t::done:
141  default:
142  zmq_assert (false);
143  }
144 }
145 
146 int zmq::object_t::register_endpoint (const char *addr_,
147  const endpoint_t &endpoint_)
148 {
149  return _ctx->register_endpoint (addr_, endpoint_);
150 }
151 
153  socket_base_t *socket_)
154 {
155  return _ctx->unregister_endpoint (addr_, socket_);
156 }
157 
159 {
160  return _ctx->unregister_endpoints (socket_);
161 }
162 
164 {
165  return _ctx->find_endpoint (addr_);
166 }
167 
169  const endpoint_t &endpoint_,
170  pipe_t **pipes_)
171 {
172  _ctx->pend_connection (addr_, endpoint_, pipes_);
173 }
174 
175 void zmq::object_t::connect_pending (const char *addr_,
176  zmq::socket_base_t *bind_socket_)
177 {
178  return _ctx->connect_pending (addr_, bind_socket_);
179 }
180 
182 {
183  _ctx->destroy_socket (socket_);
184 }
185 
186 zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_) const
187 {
188  return _ctx->choose_io_thread (affinity_);
189 }
190 
192 {
193  // 'stop' command goes always from administrative thread to
194  // the current object.
195  command_t cmd;
196  cmd.destination = this;
197  cmd.type = command_t::stop;
198  _ctx->send_command (_tid, cmd);
199 }
200 
201 void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
202 {
203  if (inc_seqnum_)
204  destination_->inc_seqnum ();
205 
206  command_t cmd;
207  cmd.destination = destination_;
208  cmd.type = command_t::plug;
209  send_command (cmd);
210 }
211 
212 void zmq::object_t::send_own (own_t *destination_, own_t *object_)
213 {
214  destination_->inc_seqnum ();
215  command_t cmd;
216  cmd.destination = destination_;
217  cmd.type = command_t::own;
218  cmd.args.own.object = object_;
219  send_command (cmd);
220 }
221 
223  i_engine *engine_,
224  bool inc_seqnum_)
225 {
226  if (inc_seqnum_)
227  destination_->inc_seqnum ();
228 
229  command_t cmd;
230  cmd.destination = destination_;
231  cmd.type = command_t::attach;
232  cmd.args.attach.engine = engine_;
233  send_command (cmd);
234 }
235 
237 {
238  command_t cmd;
239  cmd.destination = destination_;
241  send_command (cmd);
242 }
243 
244 void zmq::object_t::send_bind (own_t *destination_,
245  pipe_t *pipe_,
246  bool inc_seqnum_)
247 {
248  if (inc_seqnum_)
249  destination_->inc_seqnum ();
250 
251  command_t cmd;
252  cmd.destination = destination_;
253  cmd.type = command_t::bind;
254  cmd.args.bind.pipe = pipe_;
255  send_command (cmd);
256 }
257 
258 void zmq::object_t::send_activate_read (pipe_t *destination_)
259 {
260  command_t cmd;
261  cmd.destination = destination_;
263  send_command (cmd);
264 }
265 
266 void zmq::object_t::send_activate_write (pipe_t *destination_,
267  uint64_t msgs_read_)
268 {
269  command_t cmd;
270  cmd.destination = destination_;
272  cmd.args.activate_write.msgs_read = msgs_read_;
273  send_command (cmd);
274 }
275 
276 void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
277 {
278  command_t cmd;
279  cmd.destination = destination_;
280  cmd.type = command_t::hiccup;
281  cmd.args.hiccup.pipe = pipe_;
282  send_command (cmd);
283 }
284 
285 void zmq::object_t::send_pipe_peer_stats (pipe_t *destination_,
286  uint64_t queue_count_,
287  own_t *socket_base_,
288  endpoint_uri_pair_t *endpoint_pair_)
289 {
290  command_t cmd;
291  cmd.destination = destination_;
293  cmd.args.pipe_peer_stats.queue_count = queue_count_;
294  cmd.args.pipe_peer_stats.socket_base = socket_base_;
295  cmd.args.pipe_peer_stats.endpoint_pair = endpoint_pair_;
296  send_command (cmd);
297 }
298 
300  own_t *destination_,
301  uint64_t outbound_queue_count_,
302  uint64_t inbound_queue_count_,
303  endpoint_uri_pair_t *endpoint_pair_)
304 {
305  command_t cmd;
306  cmd.destination = destination_;
308  cmd.args.pipe_stats_publish.outbound_queue_count = outbound_queue_count_;
309  cmd.args.pipe_stats_publish.inbound_queue_count = inbound_queue_count_;
310  cmd.args.pipe_stats_publish.endpoint_pair = endpoint_pair_;
311  send_command (cmd);
312 }
313 
314 void zmq::object_t::send_pipe_term (pipe_t *destination_)
315 {
316  command_t cmd;
317  cmd.destination = destination_;
319  send_command (cmd);
320 }
321 
322 void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
323 {
324  command_t cmd;
325  cmd.destination = destination_;
327  send_command (cmd);
328 }
329 
330 void zmq::object_t::send_pipe_hwm (pipe_t *destination_,
331  int inhwm_,
332  int outhwm_)
333 {
334  command_t cmd;
335  cmd.destination = destination_;
337  cmd.args.pipe_hwm.inhwm = inhwm_;
338  cmd.args.pipe_hwm.outhwm = outhwm_;
339  send_command (cmd);
340 }
341 
342 void zmq::object_t::send_term_req (own_t *destination_, own_t *object_)
343 {
344  command_t cmd;
345  cmd.destination = destination_;
347  cmd.args.term_req.object = object_;
348  send_command (cmd);
349 }
350 
351 void zmq::object_t::send_term (own_t *destination_, int linger_)
352 {
353  command_t cmd;
354  cmd.destination = destination_;
355  cmd.type = command_t::term;
356  cmd.args.term.linger = linger_;
357  send_command (cmd);
358 }
359 
361 {
362  command_t cmd;
363  cmd.destination = destination_;
365  send_command (cmd);
366 }
367 
369  std::string *endpoint_)
370 {
371  command_t cmd;
372  cmd.destination = destination_;
374  cmd.args.term_endpoint.endpoint = endpoint_;
375  send_command (cmd);
376 }
377 
379 {
380  command_t cmd;
381  cmd.destination = _ctx->get_reaper ();
382  cmd.type = command_t::reap;
383  cmd.args.reap.socket = socket_;
384  send_command (cmd);
385 }
386 
388 {
389  command_t cmd;
390  cmd.destination = _ctx->get_reaper ();
391  cmd.type = command_t::reaped;
392  send_command (cmd);
393 }
394 
396 {
397  command_t cmd;
398  cmd.destination = socket_;
400  send_command (cmd);
401 }
402 
404 {
405  command_t cmd;
406  cmd.destination = NULL;
407  cmd.type = command_t::done;
408  _ctx->send_command (ctx_t::term_tid, cmd);
409 }
410 
412 {
413  zmq_assert (false);
414 }
415 
417 {
418  zmq_assert (false);
419 }
420 
422 {
423  zmq_assert (false);
424 }
425 
427 {
428  zmq_assert (false);
429 }
430 
432 {
433  zmq_assert (false);
434 }
435 
437 {
438  zmq_assert (false);
439 }
440 
442 {
443  zmq_assert (false);
444 }
445 
447 {
448  zmq_assert (false);
449 }
450 
452  own_t *,
454 {
455  zmq_assert (false);
456 }
457 
459  uint64_t,
461 {
462  zmq_assert (false);
463 }
464 
466 {
467  zmq_assert (false);
468 }
469 
471 {
472  zmq_assert (false);
473 }
474 
476 {
477  zmq_assert (false);
478 }
479 
481 {
482  zmq_assert (false);
483 }
484 
486 {
487  zmq_assert (false);
488 }
489 
491 {
492  zmq_assert (false);
493 }
494 
496 {
497  zmq_assert (false);
498 }
499 
501 {
502  zmq_assert (false);
503 }
504 
506 {
507  zmq_assert (false);
508 }
509 
511 {
512  zmq_assert (false);
513 }
514 
516 {
517  zmq_assert (false);
518 }
519 
521 {
522  _ctx->send_command (cmd_.destination->get_tid (), cmd_);
523 }
zmq::object_t::connect_pending
void connect_pending(const char *addr_, zmq::socket_base_t *bind_socket_)
Definition: object.cpp:175
zmq::object_t::~object_t
virtual ~object_t()
Definition: object.cpp:24
zmq::session_base_t
Definition: session_base.hpp:21
zmq::command_t::args_t::attach
struct zmq::command_t::args_t::@27 attach
zmq::object_t::process_pipe_peer_stats
virtual void process_pipe_peer_stats(uint64_t queue_count_, zmq::own_t *socket_base_, endpoint_uri_pair_t *endpoint_pair_)
Definition: object.cpp:451
zmq::command_t::args_t::endpoint_pair
endpoint_uri_pair_t * endpoint_pair
Definition: command.hpp:167
zmq::object_t::process_stop
virtual void process_stop()
Definition: object.cpp:411
zmq::object_t::process_plug
virtual void process_plug()
Definition: object.cpp:416
zmq::object_t::send_bind
void send_bind(zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_=true)
Definition: object.cpp:244
zmq::command_t::done
@ done
Definition: command.hpp:49
zmq::object_t::process_pipe_term_ack
virtual void process_pipe_term_ack()
Definition: object.cpp:470
zmq::command_t::args_t::socket
zmq::socket_base_t * socket
Definition: command.hpp:154
zmq::object_t::process_pipe_stats_publish
virtual void process_pipe_stats_publish(uint64_t outbound_queue_count_, uint64_t inbound_queue_count_, endpoint_uri_pair_t *endpoint_pair_)
Definition: object.cpp:458
zmq::object_t::send_stop
void send_stop()
Definition: object.cpp:191
zmq::command_t::term
@ term
Definition: command.hpp:40
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::object_t::unregister_endpoints
void unregister_endpoints(zmq::socket_base_t *socket_)
Definition: object.cpp:158
zmq::object_t::find_endpoint
zmq::endpoint_t find_endpoint(const char *addr_) const
Definition: object.cpp:163
zmq::command_t::args_t::outbound_queue_count
uint64_t outbound_queue_count
Definition: command.hpp:174
zmq::object_t::send_command
void send_command(const command_t &cmd_)
Definition: object.cpp:520
zmq::command_t::args_t::inhwm
int inhwm
Definition: command.hpp:120
zmq::object_t::process_command
void process_command(const zmq::command_t &cmd_)
Definition: object.cpp:43
zmq::command_t::args_t::pipe
zmq::pipe_t * pipe
Definition: command.hpp:82
zmq::command_t::args_t::linger
int linger
Definition: command.hpp:134
zmq::command_t::args_t::hiccup
struct zmq::command_t::args_t::@31 hiccup
zmq::object_t::choose_io_thread
zmq::io_thread_t * choose_io_thread(uint64_t affinity_) const
Definition: object.cpp:186
zmq::command_t::args_t::term_req
struct zmq::command_t::args_t::@35 term_req
zmq::command_t::args_t::term_endpoint
struct zmq::command_t::args_t::@38 term_endpoint
precompiled.hpp
zmq::command_t::args_t::bind
struct zmq::command_t::args_t::@28 bind
zmq::command_t::activate_write
@ activate_write
Definition: command.hpp:34
zmq::command_t::term_ack
@ term_ack
Definition: command.hpp:41
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
zmq::command_t::args_t::socket_base
zmq::own_t * socket_base
Definition: command.hpp:166
zmq::object_t::send_activate_read
void send_activate_read(zmq::pipe_t *destination_)
Definition: object.cpp:258
zmq::command_t::own
@ own
Definition: command.hpp:30
zmq::command_t::args_t::own
struct zmq::command_t::args_t::@26 own
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
zmq::command_t::args_t::queue_count
uint64_t queue_count
Definition: command.hpp:165
zmq::object_t::send_reap
void send_reap(zmq::socket_base_t *socket_)
Definition: object.cpp:378
zmq::command_t::pipe_term_ack
@ pipe_term_ack
Definition: command.hpp:37
zmq::command_t::plug
@ plug
Definition: command.hpp:29
zmq::command_t::args_t::engine
struct i_engine * engine
Definition: command.hpp:75
zmq::socket_base_t
Definition: socket_base.hpp:31
zmq::command_t::args_t::object
zmq::own_t * object
Definition: command.hpp:68
zmq::command_t::bind
@ bind
Definition: command.hpp:32
zmq::command_t::attach
@ attach
Definition: command.hpp:31
zmq::object_t::send_pipe_peer_stats
void send_pipe_peer_stats(zmq::pipe_t *destination_, uint64_t queue_count_, zmq::own_t *socket_base, endpoint_uri_pair_t *endpoint_pair_)
Definition: object.cpp:285
ctx.hpp
zmq::object_t::set_tid
void set_tid(uint32_t id_)
Definition: object.cpp:33
zmq::object_t::get_ctx
ctx_t * get_ctx() const
Definition: object.cpp:38
zmq::command_t::type
enum zmq::command_t::type_t type
zmq::command_t::args_t::msgs_read
uint64_t msgs_read
Definition: command.hpp:95
zmq::object_t::process_term
virtual void process_term(int linger_)
Definition: object.cpp:485
zmq::object_t::send_pipe_term
void send_pipe_term(zmq::pipe_t *destination_)
Definition: object.cpp:314
zmq::command_t::args_t::outhwm
int outhwm
Definition: command.hpp:121
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
zmq::command_t::pipe_term
@ pipe_term
Definition: command.hpp:36
zmq::command_t::stop
@ stop
Definition: command.hpp:28
zmq::object_t::send_term_endpoint
void send_term_endpoint(own_t *destination_, std::string *endpoint_)
Definition: object.cpp:368
zmq::command_t::reap
@ reap
Definition: command.hpp:43
zmq::command_t::args_t::inbound_queue_count
uint64_t inbound_queue_count
Definition: command.hpp:175
zmq::object_t::process_conn_failed
virtual void process_conn_failed()
Definition: object.cpp:515
zmq::command_t::inproc_connected
@ inproc_connected
Definition: command.hpp:45
pipe.hpp
zmq::command_t::term_endpoint
@ term_endpoint
Definition: command.hpp:42
zmq::command_t::args_t::activate_write
struct zmq::command_t::args_t::@30 activate_write
zmq::command_t::reaped
@ reaped
Definition: command.hpp:44
zmq::command_t::args_t::term
struct zmq::command_t::args_t::@36 term
zmq::object_t::send_conn_failed
void send_conn_failed(zmq::session_base_t *destination_)
Definition: object.cpp:236
zmq::object_t::send_done
void send_done()
Definition: object.cpp:403
send_command
void send_command(fd_t s_, char(&command_)[N])
Definition: test_security_curve.cpp:271
zmq::object_t::process_term_endpoint
virtual void process_term_endpoint(std::string *endpoint_)
Definition: object.cpp:495
zmq::command_t::pipe_peer_stats
@ pipe_peer_stats
Definition: command.hpp:47
zmq::object_t::send_hiccup
void send_hiccup(zmq::pipe_t *destination_, void *pipe_)
Definition: object.cpp:276
zmq::endpoint_t
Definition: ctx.hpp:31
zmq::object_t::send_attach
void send_attach(zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_=true)
Definition: object.cpp:222
zmq::object_t
Definition: object.hpp:28
zmq::object_t::object_t
object_t(zmq::ctx_t *ctx_, uint32_t tid_)
zmq::object_t::send_pipe_stats_publish
void send_pipe_stats_publish(zmq::own_t *destination_, uint64_t outbound_queue_count_, uint64_t inbound_queue_count_, endpoint_uri_pair_t *endpoint_pair_)
Definition: object.cpp:299
zmq::object_t::process_attach
virtual void process_attach(zmq::i_engine *engine_)
Definition: object.cpp:426
zmq::command_t::args_t::reap
struct zmq::command_t::args_t::@39 reap
zmq::own_t::inc_seqnum
void inc_seqnum()
Definition: own.cpp:39
io_thread.hpp
zmq::command_t::activate_read
@ activate_read
Definition: command.hpp:33
zmq::object_t::send_plug
void send_plug(zmq::own_t *destination_, bool inc_seqnum_=true)
Definition: object.cpp:201
zmq::command_t::args_t::pipe_hwm
struct zmq::command_t::args_t::@34 pipe_hwm
zmq::object_t::process_pipe_hwm
virtual void process_pipe_hwm(int inhwm_, int outhwm_)
Definition: object.cpp:475
socket_base.hpp
zmq::object_t::register_endpoint
int register_endpoint(const char *addr_, const zmq::endpoint_t &endpoint_)
Definition: object.cpp:146
zmq::object_t::process_term_req
virtual void process_term_req(zmq::own_t *object_)
Definition: object.cpp:480
zmq::object_t::process_term_ack
virtual void process_term_ack()
Definition: object.cpp:490
zmq::object_t::send_reaped
void send_reaped()
Definition: object.cpp:387
zmq::command_t::pipe_stats_publish
@ pipe_stats_publish
Definition: command.hpp:48
zmq::command_t::args_t::endpoint
std::string * endpoint
Definition: command.hpp:147
zmq::object_t::pend_connection
void pend_connection(const std::string &addr_, const endpoint_t &endpoint_, pipe_t **pipes_)
Definition: object.cpp:168
zmq::own_t
Definition: own.hpp:21
zmq::command_t::term_req
@ term_req
Definition: command.hpp:39
zmq::object_t::send_pipe_hwm
void send_pipe_hwm(zmq::pipe_t *destination_, int inhwm_, int outhwm_)
Definition: object.cpp:330
zmq::command_t::args_t::pipe_peer_stats
struct zmq::command_t::args_t::@41 pipe_peer_stats
err.hpp
zmq::object_t::process_activate_read
virtual void process_activate_read()
Definition: object.cpp:436
zmq::object_t::process_activate_write
virtual void process_activate_write(uint64_t msgs_read_)
Definition: object.cpp:441
zmq::command_t::args_t::pipe_stats_publish
struct zmq::command_t::args_t::@42 pipe_stats_publish
zmq::command_t::destination
zmq::object_t * destination
Definition: command.hpp:24
zmq::object_t::send_term_ack
void send_term_ack(zmq::own_t *destination_)
Definition: object.cpp:360
zmq::object_t::destroy_socket
void destroy_socket(zmq::socket_base_t *socket_)
Definition: object.cpp:181
zmq::object_t::process_seqnum
virtual void process_seqnum()
Definition: object.cpp:510
zmq::object_t::process_reap
virtual void process_reap(zmq::socket_base_t *socket_)
Definition: object.cpp:500
zmq::object_t::process_bind
virtual void process_bind(zmq::pipe_t *pipe_)
Definition: object.cpp:431
zmq::object_t::send_own
void send_own(zmq::own_t *destination_, zmq::own_t *object_)
Definition: object.cpp:212
zmq::object_t::unregister_endpoint
int unregister_endpoint(const std::string &addr_, socket_base_t *socket_)
Definition: object.cpp:152
zmq::object_t::send_activate_write
void send_activate_write(zmq::pipe_t *destination_, uint64_t msgs_read_)
Definition: object.cpp:266
zmq::object_t::process_pipe_term
virtual void process_pipe_term()
Definition: object.cpp:465
zmq::object_t::send_term_req
void send_term_req(zmq::own_t *destination_, zmq::own_t *object_)
Definition: object.cpp:342
zmq::i_engine
Definition: i_engine.hpp:15
zmq::object_t::process_own
virtual void process_own(zmq::own_t *object_)
Definition: object.cpp:421
zmq::command_t::hiccup
@ hiccup
Definition: command.hpp:35
zmq::object_t::send_pipe_term_ack
void send_pipe_term_ack(zmq::pipe_t *destination_)
Definition: object.cpp:322
zmq::object_t::process_reaped
virtual void process_reaped()
Definition: object.cpp:505
session_base.hpp
object.hpp
zmq::object_t::process_hiccup
virtual void process_hiccup(void *pipe_)
Definition: object.cpp:446
zmq::command_t::args
union zmq::command_t::args_t args
zmq::command_t
Definition: command.hpp:21
zmq::command_t::pipe_hwm
@ pipe_hwm
Definition: command.hpp:38
zmq::object_t::send_inproc_connected
void send_inproc_connected(zmq::socket_base_t *socket_)
Definition: object.cpp:395
zmq::command_t::conn_failed
@ conn_failed
Definition: command.hpp:46
zmq::object_t::get_tid
uint32_t get_tid() const
Definition: object.cpp:28
zmq::object_t::send_term
void send_term(zmq::own_t *destination_, int linger_)
Definition: object.cpp:351


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57