channel.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 #include "channel.hpp"
6 #include "err.hpp"
7 #include "pipe.hpp"
8 #include "msg.hpp"
9 
10 zmq::channel_t::channel_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
11  socket_base_t (parent_, tid_, sid_, true), _pipe (NULL)
12 {
13  options.type = ZMQ_CHANNEL;
14 }
15 
16 zmq::channel_t::~channel_t ()
17 {
18  zmq_assert (!_pipe);
19 }
20 
21 void zmq::channel_t::xattach_pipe (pipe_t *pipe_,
22  bool subscribe_to_all_,
23  bool locally_initiated_)
24 {
25  LIBZMQ_UNUSED (subscribe_to_all_);
26  LIBZMQ_UNUSED (locally_initiated_);
27 
28  zmq_assert (pipe_ != NULL);
29 
30  // ZMQ_PAIR socket can only be connected to a single peer.
31  // The socket rejects any further connection requests.
32  if (_pipe == NULL)
33  _pipe = pipe_;
34  else
35  pipe_->terminate (false);
36 }
37 
38 void zmq::channel_t::xpipe_terminated (pipe_t *pipe_)
39 {
40  if (pipe_ == _pipe)
41  _pipe = NULL;
42 }
43 
44 void zmq::channel_t::xread_activated (pipe_t *)
45 {
46  // There's just one pipe. No lists of active and inactive pipes.
47  // There's nothing to do here.
48 }
49 
50 void zmq::channel_t::xwrite_activated (pipe_t *)
51 {
52  // There's just one pipe. No lists of active and inactive pipes.
53  // There's nothing to do here.
54 }
55 
56 int zmq::channel_t::xsend (msg_t *msg_)
57 {
58  // CHANNEL sockets do not allow multipart data (ZMQ_SNDMORE)
59  if (msg_->flags () & msg_t::more) {
60  errno = EINVAL;
61  return -1;
62  }
63 
64  if (!_pipe || !_pipe->write (msg_)) {
65  errno = EAGAIN;
66  return -1;
67  }
68 
69  _pipe->flush ();
70 
71  // Detach the original message from the data buffer.
72  const int rc = msg_->init ();
73  errno_assert (rc == 0);
74 
75  return 0;
76 }
77 
78 int zmq::channel_t::xrecv (msg_t *msg_)
79 {
80  // Deallocate old content of the message.
81  int rc = msg_->close ();
82  errno_assert (rc == 0);
83 
84  if (!_pipe) {
85  // Initialise the output parameter to be a 0-byte message.
86  rc = msg_->init ();
87  errno_assert (rc == 0);
88 
89  errno = EAGAIN;
90  return -1;
91  }
92 
93  // Drop any messages with more flag
94  bool read = _pipe->read (msg_);
95  while (read && msg_->flags () & msg_t::more) {
96  // drop all frames of the current multi-frame message
97  read = _pipe->read (msg_);
98  while (read && msg_->flags () & msg_t::more)
99  read = _pipe->read (msg_);
100 
101  // get the new message
102  if (read)
103  read = _pipe->read (msg_);
104  }
105 
106  if (!read) {
107  // Initialise the output parameter to be a 0-byte message.
108  rc = msg_->init ();
109  errno_assert (rc == 0);
110 
111  errno = EAGAIN;
112  return -1;
113  }
114 
115  return 0;
116 }
117 
118 bool zmq::channel_t::xhas_in ()
119 {
120  if (!_pipe)
121  return false;
122 
123  return _pipe->check_read ();
124 }
125 
126 bool zmq::channel_t::xhas_out ()
127 {
128  if (!_pipe)
129  return false;
130 
131  return _pipe->check_write ();
132 }
NULL
NULL
Definition: test_security_zap.cpp:405
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
EINVAL
#define EINVAL
Definition: errno.hpp:25
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
errno
int errno
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
ZMQ_CHANNEL
#define ZMQ_CHANNEL
Definition: zmq_draft.h:22
macros.hpp
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
pipe.hpp
msg.hpp
err.hpp
true
#define true
Definition: cJSON.c:65
channel.hpp


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:48