libzmq
src
encoder.hpp
Go to the documentation of this file.
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#ifndef __ZMQ_ENCODER_HPP_INCLUDED__
4
#define __ZMQ_ENCODER_HPP_INCLUDED__
5
6
#if defined(_MSC_VER)
7
#ifndef NOMINMAX
8
#define NOMINMAX
9
#endif
10
#endif
11
12
#include <stddef.h>
13
#include <string.h>
14
#include <stdlib.h>
15
#include <algorithm>
16
17
#include "
err.hpp
"
18
#include "
i_encoder.hpp
"
19
#include "
msg.hpp
"
20
21
namespace
zmq
22
{
23
// Helper base class for encoders. It implements the state machine that
24
// fills the outgoing buffer. Derived classes should implement individual
25
// state machine actions.
26
27
template
<
typename
T>
class
encoder_base_t
:
public
i_encoder
28
{
29
public
:
30
explicit
encoder_base_t
(
size_t
bufsize_) :
31
_write_pos
(0),
32
_to_write
(0),
33
_next
(
NULL
),
34
_new_msg_flag
(
false
),
35
_buf_size
(bufsize_),
36
_buf
(static_cast<unsigned char *> (malloc (bufsize_))),
37
_in_progress
(
NULL
)
38
{
39
alloc_assert
(
_buf
);
40
}
41
42
~encoder_base_t
()
ZMQ_OVERRIDE
{ free (
_buf
); }
43
44
// The function returns a batch of binary data. The data
45
// are filled to a supplied buffer. If no buffer is supplied (data_
46
// points to NULL) decoder object will provide buffer of its own.
47
size_t
encode
(
unsigned
char
**
data_
,
size_t
size_)
ZMQ_FINAL
48
{
49
unsigned
char
*
buffer
= !*
data_
?
_buf
: *
data_
;
50
const
size_t
buffersize = !*
data_
?
_buf_size
: size_;
51
52
if
(
in_progress
() ==
NULL
)
53
return
0;
54
55
size_t
pos = 0;
56
while
(pos < buffersize) {
57
// If there are no more data to return, run the state machine.
58
// If there are still no data, return what we already have
59
// in the buffer.
60
if
(!
_to_write
) {
61
if
(
_new_msg_flag
) {
62
int
rc =
_in_progress
->
close
();
63
errno_assert
(rc == 0);
64
rc =
_in_progress
->
init
();
65
errno_assert
(rc == 0);
66
_in_progress
=
NULL
;
67
break
;
68
}
69
(
static_cast<
T
*
>
(
this
)->*
_next
) ();
70
}
71
72
// If there are no data in the buffer yet and we are able to
73
// fill whole buffer in a single go, let's use zero-copy.
74
// There's no disadvantage to it as we cannot stuck multiple
75
// messages into the buffer anyway. Note that subsequent
76
// write(s) are non-blocking, thus each single write writes
77
// at most SO_SNDBUF bytes at once not depending on how large
78
// is the chunk returned from here.
79
// As a consequence, large messages being sent won't block
80
// other engines running in the same I/O thread for excessive
81
// amounts of time.
82
if
(!pos && !*
data_
&&
_to_write
>= buffersize) {
83
*
data_
=
_write_pos
;
84
pos =
_to_write
;
85
_write_pos
=
NULL
;
86
_to_write
= 0;
87
return
pos;
88
}
89
90
// Copy data to the buffer. If the buffer is full, return.
91
const
size_t
to_copy = std::min (
_to_write
, buffersize - pos);
92
memcpy (
buffer
+ pos,
_write_pos
, to_copy);
93
pos += to_copy;
94
_write_pos
+= to_copy;
95
_to_write
-= to_copy;
96
}
97
98
*
data_
=
buffer
;
99
return
pos;
100
}
101
102
void
load_msg
(
msg_t
*msg_)
ZMQ_FINAL
103
{
104
zmq_assert
(
in_progress
() ==
NULL
);
105
_in_progress
= msg_;
106
(
static_cast<
T
*
>
(
this
)->*
_next
) ();
107
}
108
109
protected
:
110
// Prototype of state machine action.
111
typedef
void
(
T
::*
step_t
) ();
112
113
// This function should be called from derived class to write the data
114
// to the buffer and schedule next state machine action.
115
void
next_step
(
void
*write_pos_,
116
size_t
to_write_,
117
step_t
next_,
118
bool
new_msg_flag_)
119
{
120
_write_pos
=
static_cast<
unsigned
char
*
>
(write_pos_);
121
_to_write
= to_write_;
122
_next
= next_;
123
_new_msg_flag
= new_msg_flag_;
124
}
125
126
msg_t
*
in_progress
() {
return
_in_progress
; }
127
128
private
:
129
// Where to get the data to write from.
130
unsigned
char
*
_write_pos
;
131
132
// How much data to write before next step should be executed.
133
size_t
_to_write
;
134
135
// Next step. If set to NULL, it means that associated data stream
136
// is dead.
137
step_t
_next
;
138
139
bool
_new_msg_flag
;
140
141
// The buffer for encoded data.
142
const
size_t
_buf_size
;
143
unsigned
char
*
const
_buf
;
144
145
msg_t
*
_in_progress
;
146
147
ZMQ_NON_COPYABLE_NOR_MOVABLE
(
encoder_base_t
)
148
};
149
}
150
151
#endif
zmq::ZMQ_FINAL
Definition:
channel.hpp:17
data_
StringPiece data_
Definition:
bytestream_unittest.cc:60
zmq::encoder_base_t::encoder_base_t
encoder_base_t(size_t bufsize_)
Definition:
encoder.hpp:30
zmq::encoder_base_t::_next
step_t _next
Definition:
encoder.hpp:137
NULL
NULL
Definition:
test_security_zap.cpp:405
zmq_assert
#define zmq_assert(x)
Definition:
err.hpp:102
zmq::encoder_base_t
Definition:
encoder.hpp:27
zmq::encoder_base_t::_write_pos
unsigned char * _write_pos
Definition:
encoder.hpp:130
T
#define T(upbtypeconst, upbtype, ctype, default_value)
zmq::i_encoder
Definition:
i_encoder.hpp:16
zmq::encoder_base_t::next_step
void next_step(void *write_pos_, size_t to_write_, step_t next_, bool new_msg_flag_)
Definition:
encoder.hpp:115
zmq
Definition:
zmq.hpp:229
ZMQ_OVERRIDE
#define ZMQ_OVERRIDE
Definition:
zmq.hpp:91
alloc_assert
#define alloc_assert(x)
Definition:
err.hpp:146
errno_assert
#define errno_assert(x)
Definition:
err.hpp:113
buffer
GLuint buffer
Definition:
glcorearb.h:2939
zmq::msg_t::close
int close()
Definition:
msg.cpp:242
ZMQ_NON_COPYABLE_NOR_MOVABLE
#define ZMQ_NON_COPYABLE_NOR_MOVABLE(classname)
Definition:
macros.hpp:58
zmq::encoder_base_t::encode
size_t encode(unsigned char **data_, size_t size_) ZMQ_FINAL
Definition:
encoder.hpp:47
buffer
Definition:
buffer_processor.h:43
zmq::encoder_base_t::~encoder_base_t
~encoder_base_t() ZMQ_OVERRIDE
Definition:
encoder.hpp:42
zmq::encoder_base_t::load_msg
void load_msg(msg_t *msg_) ZMQ_FINAL
Definition:
encoder.hpp:102
void
typedef void(APIENTRY *GLDEBUGPROCARB)(GLenum source
zmq::msg_t::init
int init()
Definition:
msg.cpp:50
msg.hpp
zmq::encoder_base_t::step_t
void(T::* step_t)()
Definition:
encoder.hpp:111
zmq::encoder_base_t::in_progress
msg_t * in_progress()
Definition:
encoder.hpp:126
err.hpp
zmq::encoder_base_t::_buf_size
const size_t _buf_size
Definition:
encoder.hpp:142
zmq::encoder_base_t::_to_write
size_t _to_write
Definition:
encoder.hpp:133
zmq::encoder_base_t::_in_progress
msg_t * _in_progress
Definition:
encoder.hpp:145
zmq::encoder_base_t::_new_msg_flag
bool _new_msg_flag
Definition:
encoder.hpp:139
false
#define false
Definition:
cJSON.c:70
zmq::msg_t
Definition:
msg.hpp:33
i_encoder.hpp
zmq::encoder_base_t::_buf
unsigned char *const _buf
Definition:
encoder.hpp:143
libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:51