ctx.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_CTX_HPP_INCLUDED__
4 #define __ZMQ_CTX_HPP_INCLUDED__
5 
6 #include <map>
7 #include <vector>
8 #include <string>
9 #include <stdarg.h>
10 
11 #include "mailbox.hpp"
12 #include "array.hpp"
13 #include "config.hpp"
14 #include "mutex.hpp"
15 #include "stdint.hpp"
16 #include "options.hpp"
17 #include "atomic_counter.hpp"
18 #include "thread.hpp"
19 
20 namespace zmq
21 {
22 class object_t;
23 class io_thread_t;
24 class socket_base_t;
25 class reaper_t;
26 class pipe_t;
27 
28 // Information associated with inproc endpoint. Note that endpoint options
29 // are registered as well so that the peer can access them without a need
30 // for synchronisation, handshaking or similar.
31 struct endpoint_t
32 {
33  socket_base_t *socket; // Socket tied to this inproc endpoint (presumably the one that registered it; confirm in ctx.cpp).
34  options_t options; // Endpoint options, kept here so the peer can read them without synchronisation.
35 };
36 
37 class thread_ctx_t // Thread-related part of the context: thread parameters plus the lock guarding context options.
38 {
39  public:
40  thread_ctx_t ();
41 
42  // Start a new thread with proper scheduling parameters. name_ is optional.
43  void start_thread (thread_t &thread_,
44  thread_fn *tfn_,
45  void *arg_,
46  const char *name_ = NULL) const;
47 
48  int set (int option_, const void *optval_, size_t optvallen_); // Set an option handled at this level.
49  int get (int option_, void *optval_, const size_t *optvallen_); // Read an option handled at this level.
50 
51  protected:
52  // Synchronisation of access to context options.
53  mutex_t _opt_sync;
54 
55  private:
56  // Thread parameters.
57  int _thread_priority;
58  int _thread_sched_policy;
59  std::set<int> _thread_affinity_cpus;
60  std::string _thread_name_prefix;
61 };
62 
63 // Context object encapsulates all the global state associated with
64 // the library.
65 
66 class ctx_t ZMQ_FINAL : public thread_ctx_t
67 {
68  public:
69  // Create the context object.
70  ctx_t ();
71 
72  // Returns false if object is not a context.
73  bool check_tag () const;
74 
75  // This function is called when user invokes zmq_ctx_term. If there are
76  // no more sockets open it'll cause all the infrastructure to be shut
77  // down. If there are open sockets still, the deallocation happens
78  // after the last one is closed.
79  int terminate ();
80 
81  // This function starts the terminate process by unblocking any blocking
82  // operations currently in progress and stopping any more socket activity
83  // (except zmq_close).
84  // This function is non-blocking.
85  // terminate must still be called afterwards.
86  // This function is optional, terminate will unblock any current
87  // operations as well.
88  int shutdown ();
89 
90  // Set and get context properties.
91  int set (int option_, const void *optval_, size_t optvallen_);
92  int get (int option_, void *optval_, const size_t *optvallen_);
93  int get (int option_);
94 
95  // Create and destroy a socket.
96  zmq::socket_base_t *create_socket (int type_);
97  void destroy_socket (zmq::socket_base_t *socket_);
98 
99  // Send command to the destination thread.
100  void send_command (uint32_t tid_, const command_t &command_);
101 
102  // Returns the I/O thread that is the least busy at the moment.
103  // Affinity specifies which I/O threads are eligible (0 = all).
104  // Returns NULL if no I/O thread is available.
105  zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
106 
107  // Returns reaper thread object.
108  zmq::object_t *get_reaper () const;
109 
110  // Management of inproc endpoints.
111  int register_endpoint (const char *addr_, const endpoint_t &endpoint_);
112  int unregister_endpoint (const std::string &addr_,
113  const socket_base_t *socket_);
114  void unregister_endpoints (const zmq::socket_base_t *socket_);
115  endpoint_t find_endpoint (const char *addr_);
116  void pend_connection (const std::string &addr_,
117  const endpoint_t &endpoint_,
118  pipe_t **pipes_);
119  void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
120 
121 #ifdef ZMQ_HAVE_VMCI
122  // Return family for the VMCI socket or -1 if it's not available.
123  int get_vmci_socket_family ();
124 #endif
125 
126  enum
127  {
128  term_tid = 0,
129  reaper_tid = 1
130  };
131 
132  ~ctx_t ();
133 
134  bool valid () const;
135 
136  private:
137  bool start ();
138 
140  {
142  pipe_t *connect_pipe;
143  pipe_t *bind_pipe;
144  };
145 
146  // Used to check whether the object is a context.
147  uint32_t _tag;
148 
149  // Sockets belonging to this context. We need the list so that
150  // we can notify the sockets when zmq_ctx_term() is called.
151  // The sockets will return ETERM then.
154 
155  // List of unused thread slots.
156  typedef std::vector<uint32_t> empty_slots_t;
158 
159  // If true, zmq_init has been called but no socket has been created
160  // yet. Launching of I/O threads is delayed.
161  bool _starting;
162 
163  // If true, zmq_ctx_term was already called.
165 
166  // Synchronisation of accesses to global slot-related data:
167  // sockets, empty_slots, terminating. It also synchronises
168  // access to zombie sockets as such (as opposed to slots) and provides
169  // a memory barrier to ensure that all CPU cores see the same data.
171 
172  // The reaper thread.
173  zmq::reaper_t *_reaper;
174 
175  // I/O threads.
176  typedef std::vector<zmq::io_thread_t *> io_threads_t;
178 
179  // Array of pointers to mailboxes for both application and I/O threads.
180  std::vector<i_mailbox *> _slots;
181 
182  // Mailbox for zmq_ctx_term thread.
183  mailbox_t _term_mailbox;
184 
185  // List of inproc endpoints within this context.
186  typedef std::map<std::string, endpoint_t> endpoints_t;
188 
189  // List of inproc connection endpoints pending a bind
190  typedef std::multimap<std::string, pending_connection_t>
193 
194  // Synchronisation of access to the list of inproc endpoints.
196 
197  // Maximum socket ID.
199 
200  // Maximum number of sockets that can be opened at the same time.
202 
203  // Maximum allowed message size
205 
206  // Number of I/O threads to launch.
208 
209  // Does context wait (possibly forever) on termination?
210  bool _blocky;
211 
212  // Is IPv6 enabled on this context?
213  bool _ipv6;
214 
215  // Should we use zero copy message decoding in this context?
217 
219 
220 #ifdef HAVE_FORK
221  // the process that created this context. Used to detect forking.
222  pid_t _pid;
223 #endif
224  enum side
225  {
227  bind_side
228  };
229 
230  static void
231  connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
232  const options_t &bind_options_,
233  const pending_connection_t &pending_connection_,
234  side side_);
235 
236 #ifdef ZMQ_HAVE_VMCI
237  int _vmci_fd;
238  int _vmci_family;
239  mutex_t _vmci_sync;
240 #endif
241 };
242 }
243 
244 #endif
zmq::ZMQ_FINAL
Definition: channel.hpp:17
zmq::ZMQ_FINAL::_io_threads
io_threads_t _io_threads
Definition: ctx.hpp:177
zmq::ZMQ_FINAL::side
side
Definition: ctx.hpp:224
zmq::ZMQ_FINAL::pending_connection_t
Definition: ctx.hpp:139
zmq::ZMQ_FINAL::pending_connection_t::bind_pipe
pipe_t * bind_pipe
Definition: ctx.hpp:143
zmq::ZMQ_FINAL::pending_connection_t::endpoint
endpoint_t endpoint
Definition: ctx.hpp:141
atomic_counter.hpp
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::options_t
Definition: options.hpp:34
zmq::ZMQ_FINAL::_io_thread_count
int _io_thread_count
Definition: ctx.hpp:207
config.hpp
get
ROSCPP_DECL bool get(const std::string &key, bool &b)
zmq::thread_ctx_t::start_thread
void start_thread(thread_t &thread_, thread_fn *tfn_, void *arg_, const char *name_=NULL) const
Definition: ctx.cpp:528
zmq::thread_fn
void() thread_fn(void *)
Definition: thread.hpp:17
zmq::thread_ctx_t::_thread_sched_policy
int _thread_sched_policy
Definition: ctx.hpp:58
array.hpp
zmq::ZMQ_FINAL::_pending_connections
pending_connections_t _pending_connections
Definition: ctx.hpp:192
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
zmq::socket_base_t
Definition: socket_base.hpp:31
zmq::ZMQ_FINAL::_zero_copy
bool _zero_copy
Definition: ctx.hpp:216
zmq::ZMQ_FINAL::_max_msgsz
int _max_msgsz
Definition: ctx.hpp:204
zmq::ZMQ_FINAL::_endpoints
endpoints_t _endpoints
Definition: ctx.hpp:187
zmq::ZMQ_FINAL::empty_slots_t
std::vector< uint32_t > empty_slots_t
Definition: ctx.hpp:156
zmq::ZMQ_FINAL::connect_side
@ connect_side
Definition: ctx.hpp:226
zmq::ZMQ_FINAL::_slots
std::vector< i_mailbox * > _slots
Definition: ctx.hpp:180
zmq
Definition: zmq.hpp:229
zmq::ZMQ_FINAL::max_socket_id
static atomic_counter_t max_socket_id
Definition: ctx.hpp:198
shutdown
ROSCONSOLE_DECL void shutdown()
stdint.hpp
zmq::thread_ctx_t::_thread_priority
int _thread_priority
Definition: ctx.hpp:57
zmq::ZMQ_FINAL::endpoints_t
std::map< std::string, endpoint_t > endpoints_t
Definition: ctx.hpp:186
start
GLuint start
Definition: glcorearb.h:2858
zmq::atomic_counter_t
Definition: atomic_counter.hpp:61
zmq::ZMQ_FINAL::_reaper
zmq::reaper_t * _reaper
Definition: ctx.hpp:173
zmq::ZMQ_FINAL::_starting
bool _starting
Definition: ctx.hpp:161
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition: macros.hpp:58
mailbox.hpp
thread.hpp
zmq::thread_t
Definition: thread.hpp:26
zmq::ZMQ_FINAL::_empty_slots
empty_slots_t _empty_slots
Definition: ctx.hpp:157
name_
string name_
Definition: googletest.cc:182
zmq::ZMQ_FINAL::_terminating
bool _terminating
Definition: ctx.hpp:164
zmq::mutex_t
Definition: mutex.hpp:82
zmq::ZMQ_FINAL::_max_sockets
int _max_sockets
Definition: ctx.hpp:201
send_command
void send_command(fd_t s_, char(&command_)[N])
Definition: test_security_curve.cpp:271
zmq::endpoint_t
Definition: ctx.hpp:31
zmq::ZMQ_FINAL::_sockets
sockets_t _sockets
Definition: ctx.hpp:153
zmq::object_t
Definition: object.hpp:28
zmq::thread_ctx_t
Definition: ctx.hpp:37
zmq::thread_ctx_t::thread_ctx_t
thread_ctx_t()
Definition: ctx.cpp:522
zmq::ZMQ_FINAL::sockets_t
array_t< socket_base_t > sockets_t
Definition: ctx.hpp:152
zmq::thread_ctx_t::set
int set(int option_, const void *optval_, size_t optvallen_)
Definition: ctx.cpp:544
options.hpp
zmq::ZMQ_FINAL::pending_connection_t::connect_pipe
pipe_t * connect_pipe
Definition: ctx.hpp:142
zmq::ZMQ_FINAL::_tag
uint32_t _tag
Definition: ctx.hpp:147
cpp.gmock_class.set
set
Definition: gmock_class.py:44
zmq::ZMQ_FINAL::io_threads_t
std::vector< zmq::io_thread_t * > io_threads_t
Definition: ctx.hpp:176
zmq::ZMQ_FINAL::_ipv6
bool _ipv6
Definition: ctx.hpp:213
zmq::ZMQ_FINAL::_slot_sync
mutex_t _slot_sync
Definition: ctx.hpp:170
zmq::thread_ctx_t::_opt_sync
mutex_t _opt_sync
Definition: ctx.hpp:53
zmq::thread_ctx_t::get
int get(int option_, void *optval_, const size_t *optvallen_)
Definition: ctx.cpp:608
zmq::ZMQ_FINAL::pending_connections_t
std::multimap< std::string, pending_connection_t > pending_connections_t
Definition: ctx.hpp:191
zmq::thread_ctx_t::_thread_affinity_cpus
std::set< int > _thread_affinity_cpus
Definition: ctx.hpp:59
zmq::endpoint_t::options
options_t options
Definition: ctx.hpp:34
zmq::array_t< socket_base_t >
zmq::command_t
Definition: command.hpp:21
zmq::ZMQ_FINAL::_endpoints_sync
mutex_t _endpoints_sync
Definition: ctx.hpp:195
zmq::thread_ctx_t::_thread_name_prefix
std::string _thread_name_prefix
Definition: ctx.hpp:60
zmq::ZMQ_FINAL::_term_mailbox
mailbox_t _term_mailbox
Definition: ctx.hpp:183
zmq::ZMQ_FINAL::_blocky
bool _blocky
Definition: ctx.hpp:210
mutex.hpp
zmq::endpoint_t::socket
socket_base_t * socket
Definition: ctx.hpp:33


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:49