Go to the documentation of this file.
29 #include "absl/base/thread_annotations.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/types/optional.h"
65 #define STAGING_BUFFER_SIZE 8192
70 struct secure_endpoint {
76 size_t leftover_nslices)
79 zero_copy_protector(zero_copy_protector) {
85 for (
size_t i = 0;
i < leftover_nslices;
i++) {
95 self_reservation = memory_owner.MakeReservation(
sizeof(*
this));
96 if (zero_copy_protector) {
100 read_staging_buffer =
102 write_staging_buffer =
105 has_posted_reclaimer.store(
false, std::memory_order_relaxed);
106 min_progress_size = 1;
145 std::atomic<bool> has_posted_reclaimer;
146 int min_progress_size;
154 static void destroy(secure_endpoint* ep) {
delete ep; }
157 #define SECURE_ENDPOINT_UNREF(ep, reason) \
158 secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
159 #define SECURE_ENDPOINT_REF(ep, reason) \
160 secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
166 "SECENDP unref %p : %s %" PRIdPTR
" -> %" PRIdPTR, ep, reason, val,
179 "SECENDP ref %p : %s %" PRIdPTR
" -> %" PRIdPTR, ep, reason, val,
185 #define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
186 #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
197 if (!ep->has_posted_reclaimer) {
199 ep->has_posted_reclaimer.exchange(
true, std::memory_order_relaxed);
200 ep->memory_owner.PostReclaimer(
204 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
206 "secure endpoint: benign reclamation to free memory");
212 temp_read_slice = ep->read_staging_buffer;
214 ep->read_mu.Unlock();
217 temp_write_slice = ep->write_staging_buffer;
219 ep->write_mu.Unlock();
223 ep->has_posted_reclaimer.exchange(
false, std::memory_order_relaxed);
234 ep->read_staging_buffer =
243 for (
i = 0;
i < ep->read_buffer->count;
i++) {
250 ep->read_buffer =
nullptr;
259 secure_endpoint* ep =
static_cast<secure_endpoint*
>(user_data);
269 "Secure read failed", &
error, 1));
273 if (ep->zero_copy_protector !=
nullptr) {
275 int min_progress_size = 1;
284 ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer,
286 min_progress_size =
std::max(1, min_progress_size);
287 ep->min_progress_size =
result !=
TSI_OK ? 1 : min_progress_size;
291 for (
i = 0;
i < ep->source_buffer.count;
i++) {
292 grpc_slice encrypted = ep->source_buffer.slices[
i];
296 while (message_size > 0 || keep_looping) {
297 size_t unprotected_buffer_size_written =
298 static_cast<size_t>(
end -
cur);
299 size_t processed_message_size = message_size;
302 ep->protector, message_bytes, &processed_message_size,
cur,
303 &unprotected_buffer_size_written);
310 message_bytes += processed_message_size;
311 message_size -= processed_message_size;
312 cur += unprotected_buffer_size_written;
321 }
else if (unprotected_buffer_size_written > 0) {
334 &ep->read_staging_buffer,
359 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
365 if (ep->leftover_bytes.count) {
373 ep->min_progress_size);
380 ep->write_staging_buffer =
393 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
411 if (ep->zero_copy_protector !=
nullptr) {
418 while (
slices->length >
static_cast<size_t>(max_frame_size) &&
421 static_cast<size_t>(max_frame_size),
422 &ep->protector_staging_buffer);
424 ep->zero_copy_protector, &ep->protector_staging_buffer,
429 ep->zero_copy_protector,
slices, &ep->output_buffer);
438 while (message_size > 0) {
439 size_t protected_buffer_size_to_send =
static_cast<size_t>(
end -
cur);
440 size_t processed_message_size = message_size;
443 &processed_message_size,
cur,
444 &protected_buffer_size_to_send);
451 message_bytes += processed_message_size;
452 message_size -= processed_message_size;
453 cur += protected_buffer_size_to_send;
462 size_t still_pending_size;
464 size_t protected_buffer_size_to_send =
static_cast<size_t>(
end -
cur);
467 ep->protector,
cur, &protected_buffer_size_to_send,
468 &still_pending_size);
471 cur += protected_buffer_size_to_send;
475 }
while (still_pending_size > 0);
480 &ep->write_staging_buffer,
503 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
508 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
509 ep->memory_owner.Reset();
515 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
521 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
527 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
532 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
537 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
542 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
547 secure_endpoint* ep =
reinterpret_cast<secure_endpoint*
>(secure_ep);
568 secure_endpoint* ep =
569 new secure_endpoint(&
vtable, protector, zero_copy_protector, to_wrap,
570 leftover_slices, channel_args, leftover_nslices);
static void flush_write_staging_buffer(secure_endpoint *ep, uint8_t **cur, uint8_t **end) ABSL_EXCLUSIVE_LOCKS_REQUIRED(ep -> write_mu)
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
static void maybe_post_reclaimer(secure_endpoint *ep)
static void on_read(void *user_data, grpc_error_handle error)
static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep, grpc_pollset_set *pollset_set)
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
tsi_result tsi_zero_copy_grpc_protector_unprotect(tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *protected_slices, grpc_slice_buffer *unprotected_slices, int *min_progress_size)
#define gpr_atm_no_barrier_load(p)
const grpc_slice & grpc_slice_ref_internal(const grpc_slice &slice)
static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
static void flush_read_staging_buffer(secure_endpoint *ep, uint8_t **cur, uint8_t **end) ABSL_EXCLUSIVE_LOCKS_REQUIRED(ep -> read_mu)
void tsi_zero_copy_grpc_protector_destroy(tsi_zero_copy_grpc_protector *self)
char * grpc_dump_slice(const grpc_slice &s, uint32_t flags)
grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint")
static void endpoint_shutdown(grpc_endpoint *secure_ep, grpc_error_handle why)
#define GPR_TIMER_SCOPE(tag, important)
tsi_result tsi_zero_copy_grpc_protector_protect(tsi_zero_copy_grpc_protector *self, grpc_slice_buffer *unprotected_slices, grpc_slice_buffer *protected_slices)
struct grpc_pollset_set grpc_pollset_set
GPRAPI void gpr_free(void *ptr)
void grpc_endpoint_read(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, bool urgent, int min_progress_size)
static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, grpc_pollset *pollset)
static void endpoint_write(grpc_endpoint *secure_ep, grpc_slice_buffer *slices, grpc_closure *cb, void *arg, int max_frame_size)
absl::string_view grpc_endpoint_get_peer(grpc_endpoint *ep)
void grpc_endpoint_delete_from_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set)
#define GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(desc, errs, count)
#define ABSL_GUARDED_BY(x)
static void endpoint_destroy(grpc_endpoint *secure_ep)
#define GRPC_TRACE_FLAG_ENABLED(f)
static int endpoint_get_fd(grpc_endpoint *secure_ep)
tsi_result tsi_frame_protector_protect_flush(tsi_frame_protector *self, unsigned char *protected_output_frames, size_t *protected_output_frames_size, size_t *still_pending_size)
tsi_result tsi_frame_protector_unprotect(tsi_frame_protector *self, const unsigned char *protected_frames_bytes, size_t *protected_frames_bytes_size, unsigned char *unprotected_bytes, size_t *unprotected_bytes_size)
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
constexpr bool has_value() const noexcept
grpc_error_handle grpc_set_tsi_error_result(grpc_error_handle error, tsi_result result)
grpc_endpoint * grpc_secure_endpoint_create(struct tsi_frame_protector *protector, struct tsi_zero_copy_grpc_protector *zero_copy_protector, grpc_endpoint *to_wrap, grpc_slice *leftover_slices, const grpc_channel_args *channel_args, size_t leftover_nslices)
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
GPRAPI void gpr_mu_init(gpr_mu *mu)
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
static const grpc_endpoint_vtable vtable
MemoryQuotaRefPtr memory_quota()
#define GRPC_SLICE_START_PTR(slice)
GPRAPI grpc_slice grpc_empty_slice(void)
GPRAPI void gpr_mu_lock(gpr_mu *mu)
GPRAPI void grpc_slice_buffer_swap(grpc_slice_buffer *a, grpc_slice_buffer *b)
void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_error_handle why)
static void secure_endpoint_unref(secure_endpoint *ep, const char *reason, const char *file, int line)
#define SECURE_ENDPOINT_REF(ep, reason)
An automatic releasing reservation of memory.
#define GRPC_SLICE_END_PTR(slice)
void grpc_endpoint_destroy(grpc_endpoint *ep)
GPRAPI void grpc_slice_buffer_init(grpc_slice_buffer *sb)
static void endpoint_delete_from_pollset_set(grpc_endpoint *secure_ep, grpc_pollset_set *pollset_set)
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
#define GRPC_SLICE_LENGTH(slice)
#define SECURE_ENDPOINT_UNREF(ep, reason)
static absl::string_view endpoint_get_local_address(grpc_endpoint *secure_ep)
GPRAPI void grpc_slice_buffer_add(grpc_slice_buffer *sb, grpc_slice slice)
int grpc_endpoint_get_fd(grpc_endpoint *ep)
static void destroy(secure_endpoint *ep)
ResourceQuotaRefPtr ResourceQuotaFromChannelArgs(const grpc_channel_args *args)
const char * tsi_result_to_string(tsi_result result)
void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set)
tsi_result tsi_frame_protector_protect(tsi_frame_protector *self, const unsigned char *unprotected_bytes, size_t *unprotected_bytes_size, unsigned char *protected_output_frames, size_t *protected_output_frames_size)
#define STAGING_BUFFER_SIZE
bool grpc_endpoint_can_track_err(grpc_endpoint *ep)
static void secure_endpoint_ref(secure_endpoint *ep, const char *reason, const char *file, int line)
static void endpoint_read(grpc_endpoint *secure_ep, grpc_slice_buffer *slices, grpc_closure *cb, bool urgent, int)
GPRAPI grpc_slice grpc_slice_split_head(grpc_slice *s, size_t split)
GPRAPI void grpc_slice_buffer_move_first(grpc_slice_buffer *src, size_t n, grpc_slice_buffer *dst)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
void grpc_slice_buffer_destroy_internal(grpc_slice_buffer *sb)
void grpc_endpoint_write(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, void *arg, int max_frame_size)
GPRAPI void gpr_ref_init(gpr_refcount *r, int n)
void tsi_frame_protector_destroy(tsi_frame_protector *self)
absl::string_view grpc_endpoint_get_local_address(grpc_endpoint *ep)
GPRAPI int gpr_unref(gpr_refcount *r)
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset)
static void write_cb(uv_write_t *req, int status)
static absl::string_view endpoint_get_peer(grpc_endpoint *secure_ep)
GPRAPI void gpr_ref(gpr_refcount *r)
void grpc_slice_buffer_reset_and_unref_internal(grpc_slice_buffer *sb)
Reservation request - how much memory do we want to allocate?
static bool endpoint_can_track_err(grpc_endpoint *secure_ep)
OPENSSL_EXPORT pem_password_cb * cb
static void call_read_cb(secure_endpoint *ep, grpc_error_handle error)
void grpc_slice_unref_internal(const grpc_slice &slice)
GPRAPI size_t grpc_slice_buffer_add_indexed(grpc_slice_buffer *sb, grpc_slice slice)
#define GRPC_ERROR_IS_NONE(err)
grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:15