own.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "own.hpp"
5 #include "err.hpp"
6 #include "io_thread.hpp"
7 
8 zmq::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :
9  object_t (parent_, tid_),
10  _terminating (false),
11  _sent_seqnum (0),
12  _processed_seqnum (0),
13  _owner (NULL),
14  _term_acks (0)
15 {
16 }
17 
18 zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) :
19  object_t (io_thread_),
20  options (options_),
21  _terminating (false),
22  _sent_seqnum (0),
23  _processed_seqnum (0),
24  _owner (NULL),
25  _term_acks (0)
26 {
27 }
28 
30 {
31 }
32 
34 {
35  zmq_assert (!_owner);
36  _owner = owner_;
37 }
38 
40 {
41  // This function may be called from a different thread!
42  _sent_seqnum.add (1);
43 }
44 
46 {
47  // Catch up with counter of processed commands.
48  _processed_seqnum++;
49 
50  // We may have caught up and still have pending terms acks.
51  check_term_acks ();
52 }
53 
55 {
56  // Specify the owner of the object.
57  object_->set_owner (this);
58 
59  // Plug the object into the I/O thread.
60  send_plug (object_);
61 
62  // Take ownership of the object.
63  send_own (this, object_);
64 }
65 
67 {
68  process_term_req (object_);
69 }
70 
72 {
73  // When shutting down we can ignore termination requests from owned
74  // objects. The termination request was already sent to the object.
75  if (_terminating)
76  return;
77 
78  // If not found, we assume that termination request was already sent to
79  // the object so we can safely ignore the request.
80  if (0 == _owned.erase (object_))
81  return;
82 
83  // If I/O object is well and alive let's ask it to terminate.
84  register_term_acks (1);
85 
86  // Note that this object is the root of the (partial shutdown) thus, its
87  // value of linger is used, rather than the value stored by the children.
88  send_term (object_, options.linger.load ());
89 }
90 
92 {
93  // If the object is already being shut down, new owned objects are
94  // immediately asked to terminate. Note that linger is set to zero.
95  if (_terminating) {
96  register_term_acks (1);
97  send_term (object_, 0);
98  return;
99  }
100 
101  // Store the reference to the owned object.
102  _owned.insert (object_);
103 }
104 
106 {
107  // If termination is already underway, there's no point
108  // in starting it anew.
109  if (_terminating)
110  return;
111 
112  // As for the root of the ownership tree, there's no one to terminate it,
113  // so it has to terminate itself.
114  if (!_owner) {
115  process_term (options.linger.load ());
116  return;
117  }
118 
119  // If I am an owned object, I'll ask my owner to terminate me.
120  send_term_req (_owner, this);
121 }
122 
124 {
125  return _terminating;
126 }
127 
128 void zmq::own_t::process_term (int linger_)
129 {
130  // Double termination should never happen.
131  zmq_assert (!_terminating);
132 
133  // Send termination request to all owned objects.
134  for (owned_t::iterator it = _owned.begin (), end = _owned.end (); it != end;
135  ++it)
136  send_term (*it, linger_);
137  register_term_acks (static_cast<int> (_owned.size ()));
138  _owned.clear ();
139 
140  // Start termination process and check whether by chance we cannot
141  // terminate immediately.
142  _terminating = true;
143  check_term_acks ();
144 }
145 
147 {
148  _term_acks += count_;
149 }
150 
152 {
153  zmq_assert (_term_acks > 0);
154  _term_acks--;
155 
156  // This may be a last ack we are waiting for before termination...
157  check_term_acks ();
158 }
159 
161 {
162  unregister_term_ack ();
163 }
164 
166 {
167  if (_terminating && _processed_seqnum == _sent_seqnum.get ()
168  && _term_acks == 0) {
169  // Sanity check. There should be no active children at this point.
170  zmq_assert (_owned.empty ());
171 
172  // The root object has nobody to confirm the termination to.
173  // Other nodes will confirm the termination to the owner.
174  if (_owner)
175  send_term_ack (_owner);
176 
177  // Deallocate the resources.
178  process_destroy ();
179  }
180 }
181 
183 {
184  delete this;
185 }
zmq::own_t::term_child
void term_child(own_t *object_)
Definition: own.cpp:66
zmq::own_t::launch_child
void launch_child(own_t *object_)
Definition: own.cpp:54
zmq::own_t::process_seqnum
void process_seqnum() ZMQ_OVERRIDE
Definition: own.cpp:45
end
GLuint GLuint end
Definition: glcorearb.h:2858
NULL
NULL
Definition: test_security_zap.cpp:405
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
zmq::own_t::process_term_req
void process_term_req(own_t *object_) ZMQ_OVERRIDE
Definition: own.cpp:71
zmq::own_t::process_term
void process_term(int linger_) ZMQ_OVERRIDE
Definition: own.cpp:128
zmq::own_t::check_term_acks
void check_term_acks()
Definition: own.cpp:165
zmq::own_t::is_terminating
bool is_terminating() const
Definition: own.cpp:123
zmq::own_t::~own_t
~own_t() ZMQ_OVERRIDE
Definition: own.cpp:29
zmq::own_t::unregister_term_ack
void unregister_term_ack()
Definition: own.cpp:151
zmq::own_t::process_term_ack
void process_term_ack() ZMQ_OVERRIDE
Definition: own.cpp:160
zmq::own_t::own_t
own_t(zmq::ctx_t *parent_, uint32_t tid_)
zmq::own_t::register_term_acks
void register_term_acks(int count_)
Definition: own.cpp:146
zmq::own_t::terminate
void terminate()
Definition: own.cpp:105
zmq::own_t::inc_seqnum
void inc_seqnum()
Definition: own.cpp:39
io_thread.hpp
zmq::own_t::set_owner
void set_owner(own_t *owner_)
Definition: own.cpp:33
zmq::own_t::process_own
void process_own(own_t *object_) ZMQ_OVERRIDE
Definition: own.cpp:91
zmq::own_t
Definition: own.hpp:21
err.hpp
zmq::own_t::process_destroy
virtual void process_destroy()
Definition: own.cpp:182
own.hpp
false
#define false
Definition: cJSON.c:70
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57