rb_completion_queue.c
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <ruby/ruby.h>
20 
21 #include "rb_completion_queue.h"
22 
23 #include <ruby/thread.h>
24 
25 #include "rb_grpc.h"
27 
28 #include <grpc/grpc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 
32 /* Used to allow grpc_completion_queue_next call to release the GIL */
33 typedef struct next_call_stack {
37  void* tag;
38  volatile int interrupted;
40 
41 /* Calls grpc_completion_queue_pluck without holding the ruby GIL */
42 static void* grpc_rb_completion_queue_pluck_no_gil(void* param) {
43  next_call_stack* const next_call = (next_call_stack*)param;
45  gpr_timespec deadline;
46  do {
47  deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
49  next_call->cq, next_call->tag, deadline, NULL);
50  if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
51  gpr_time_cmp(deadline, next_call->timeout) > 0) {
52  break;
53  }
54  } while (!next_call->interrupted);
55  return NULL;
56 }
57 
58 /* Helper function to free a completion queue. */
60  /* Every function that adds an event to a queue also synchronously plucks
61  that event from the queue, and holds a reference to the Ruby object that
62  holds the queue, so we only get to this point if all of those functions
63  have completed, and the queue is empty */
66 }
67 
68 static void unblock_func(void* param) {
69  next_call_stack* const next_call = (next_call_stack*)param;
70  next_call->interrupted = 1;
71 }
72 
73 /* Does the same thing as grpc_completion_queue_pluck, while properly releasing
74  the GVL and handling interrupts */
76  gpr_timespec deadline, void* reserved) {
77  next_call_stack next_call;
78  MEMZERO(&next_call, next_call_stack, 1);
79  next_call.cq = queue;
80  next_call.timeout = deadline;
81  next_call.tag = tag;
82  next_call.event.type = GRPC_QUEUE_TIMEOUT;
83  (void)reserved;
84  /* Loop until we finish a pluck without an interruption. The internal
85  pluck function runs either until it is interrupted or it gets an
86  event, or time runs out.
87 
88  The basic reason we need this relatively complicated construction is that
89  we need to re-acquire the GVL when an interrupt comes in, so that the ruby
90  interpreter can do what it needs to do with the interrupt. But we also need
91  to get back to plucking when the interrupt has been handled. */
92  do {
93  next_call.interrupted = 0;
94  rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
95  (void*)&next_call, unblock_func,
96  (void*)&next_call);
97  /* If an interrupt prevented pluck from returning useful information, then
98  any plucks that did complete must have timed out */
99  } while (next_call.interrupted && next_call.event.type == GRPC_QUEUE_TIMEOUT);
100  return next_call.event;
101 }
rb_completion_queue.h
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
log.h
rb_grpc_imports.generated.h
next_call_stack::cq
grpc_completion_queue * cq
Definition: rb_completion_queue.c:34
rb_completion_queue_pluck
grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag, gpr_timespec deadline, void *reserved)
Definition: rb_completion_queue.c:75
time.h
queue
Definition: sync_test.cc:39
gpr_time_cmp
GPRAPI int gpr_time_cmp(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:30
tag
static void * tag(intptr_t t)
Definition: bad_client.cc:318
next_call_stack
Definition: rb_completion_queue.c:33
grpc_event
Definition: grpc_types.h:564
grpc_completion_queue
Definition: completion_queue.cc:347
next_call_stack::event
grpc_event event
Definition: rb_completion_queue.c:35
grpc.h
unblock_func
static void unblock_func(void *param)
Definition: rb_completion_queue.c:68
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
grpc_rb_completion_queue_pluck_no_gil
static void * grpc_rb_completion_queue_pluck_no_gil(void *param)
Definition: rb_completion_queue.c:42
next_call_stack::interrupted
volatile int interrupted
Definition: rb_completion_queue.c:38
grpc_completion_queue_pluck
GRPCAPI grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
Definition: completion_queue.cc:1328
rb_grpc.h
queue
struct queue queue
grpc_rb_completion_queue_destroy
void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq)
Definition: rb_completion_queue.c:59
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
grpc_completion_queue_destroy
GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq)
Definition: completion_queue.cc:1424
grpc_completion_queue_shutdown
GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
Definition: completion_queue.cc:1416
gpr_time_from_millis
GPRAPI gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:119
next_call_stack::tag
void * tag
Definition: rb_completion_queue.c:37
gpr_timespec
Definition: gpr_types.h:50
next_call_stack::timeout
gpr_timespec timeout
Definition: rb_completion_queue.c:36
grpc_event::type
grpc_completion_type type
Definition: grpc_types.h:566
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
GRPC_QUEUE_TIMEOUT
@ GRPC_QUEUE_TIMEOUT
Definition: grpc_types.h:556
next_call_stack
struct next_call_stack next_call_stack
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:06