test_non-ros_service_server_complex-srv.py
Go to the documentation of this file.
1 #!/usr/bin/python
2 from __future__ import print_function
3 import sys
4 import socket
5 import time
6 from random import randint
7 from rosbridge_library.util import json
8 
9 
10 
13 
14 tcp_socket_timeout = 10 # seconds
15 max_msg_length = 20000 # bytes
16 
17 rosbridge_ip = "localhost" # hostname or ip
18 rosbridge_port = 9090 # port as integer
19 
20 service_type = "rosbridge_library/TestNestedService" # make sure this matches an existing service type on rosbridge-server (in specified srv_module)
21 service_name = "nested_srv" # service name
22 
23 send_fragment_size = 1000
24 # delay between sends to rosbridge is not needed anymore, if using my version of protocol (uses buffer to collect data from stream)
25 send_fragment_delay = 0.000#1
26 receive_fragment_size = 10
27 receive_message_intervall = 0.0
28 
29 
30 
31 
32 
35 
37  request_object = json.loads(request) # parse string for service request
38  args = request_object["args"] # get parameter field (args)
39 # count = int(args["count"] ) # get parameter(s) as described in corresponding ROS srv-file
40 #
41 # message = "" # calculate service response
42 # for i in range(0,count):
43 # message += str(chr(randint(32,126)))
44 # if i% 100000 == 0:
45 # print count - i, "bytes left to generate"
46 
47  message = {"data": {"data": 42.0}}
48 
49  """
50  IMPORTANT!
51  use base64 encoding to avoid JSON-parsing problems!
52  --> use .decode("base64","strict") at client side
53  """
54  #message = message.encode('base64','strict')
55  service_response_data = message # service response (as defined in srv-file)
56 
57  response_object = { "op": "service_response",
58  "id": request_object["id"],
59  "data": service_response_data # put service response in "data"-field of response object (in this case it's twice "data", because response value is also named data (in srv-file)
60  }
61  response_message = json.dumps(response_object)
62  return response_message
63 
64 
65 
66 
67 
68 
71 
72 buffer = ""
73 
75  tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # connect to rosbridge
76  tcp_sock.settimeout(tcp_socket_timeout)
77  tcp_sock.connect((rosbridge_ip, rosbridge_port))
78  return tcp_sock
79 
80 def advertise_service(): # advertise service
81  advertise_message_object = {"op":"advertise_service",
82  "type": service_type,
83  "service": service_name,
84  "fragment_size": receive_fragment_size,
85  "message_intervall": receive_message_intervall
86  }
87  advertise_message = json.dumps(advertise_message_object)
88  tcp_socket.send(str(advertise_message))
89 
90 def unadvertise_service(): # unadvertise service
91  unadvertise_message_object = {"op":"unadvertise_service",
92  "service": service_name
93  }
94  unadvertise_message = json.dumps(unadvertise_message_object)
95  tcp_socket.send(str(unadvertise_message))
96 
97 def wait_for_service_request(): # receive data from rosbridge
98  data = None
99  global buffer
100 
101  try:
102  done = False
103  global buffer
104  while not done:
105  incoming = tcp_socket.recv(max_msg_length) # get data from socket
106  if incoming == '':
107  print("connection closed by peer")
108  sys.exit(1)
109  buffer = buffer + incoming # append data to buffer
110  try: # try to parse JSON from buffer
111  data_object = json.loads(buffer)
112  if data_object["op"] == "call_service":
113  data = buffer
114  done = True
115  return data # if parsing was successful --> return data string
116  except Exception as e:
117  #print "direct_access error:"
118  #print e
119  pass
120 
121  #print "trying to defragment"
122  try: # opcode was not "call_service" -> try to defragment
123  result_string = buffer.split("}{") # split buffer into fragments and re-fill with curly brackets
124  result = []
125  for fragment in result_string:
126  if fragment[0] != "{":
127  fragment = "{"+fragment
128  if fragment[len(fragment)-1] != "}":
129  fragment = fragment + "}"
130  result.append(json.loads(fragment))
131 
132  try: # try to defragment when received all fragments
133  fragment_count = len(result)
134  announced = int(result[0]["total"])
135 
136  if fragment_count == announced:
137  reconstructed = ""
138  sorted_result = [None] * fragment_count # sort fragments..
139  unsorted_result = []
140  for fragment in result:
141  unsorted_result.append(fragment)
142  sorted_result[int(fragment["num"])] = fragment
143 
144  for fragment in sorted_result: # reconstruct from fragments
145  reconstructed = reconstructed + fragment["data"]
146 
147  #print "reconstructed", reconstructed
148  buffer = "" # empty buffer
149  done = True
150  print("reconstructed message from", len(result), "fragments")
151  #print reconstructed
152  return reconstructed
153  except Exception as e:
154  print("not possible to defragment:", buffer)
155  print(e)
156  except Exception as e:
157  print("defrag_error:", buffer)
158  print(e)
159  pass
160  except Exception as e:
161  #print "network-error(?):", e
162  pass
163  return data
164 
165 def send_service_response(response): # send response to rosbridge
166  tcp_socket.send(response)
167 
168 def list_of_fragments(full_message, fragment_size): # create fragment messages for a huge message
169  message_id = randint(0,64000) # generate random message id
170  fragments = [] # generate list of data fragments
171  cursor = 0
172  while cursor < len(full_message):
173  fragment_begin = cursor
174  if len(full_message) < cursor + fragment_size:
175  fragment_end = len(full_message)
176  cursor = len(full_message)
177  else:
178  fragment_end = cursor + fragment_size
179  cursor += fragment_size
180  fragment = full_message[fragment_begin:fragment_end]
181  fragments.append(fragment)
182 
183  fragmented_messages_list = [] # generate list of fragmented messages (including headers)
184  if len(fragments) > 1:
185  for count, fragment in enumerate(fragments): # iterate through list and have index counter
186  fragmented_message_object = {"op":"fragment", # create python-object for each fragment message
187  "id": str(message_id),
188  "data": str(fragment),
189  "num": count,
190  "total": len(fragments)
191  }
192  fragmented_message = json.dumps(fragmented_message_object) # create JSON-object from python-object for each fragment message
193  fragmented_messages_list.append(fragmented_message) # append JSON-object to list of fragmented messages
194  else: # if only 1 fragment --> do not send as fragment, but as service_response
195  fragmented_messages_list.append(str(fragment))
196  return fragmented_messages_list # return list of 'ready-to-send' fragmented messages
197 
198 
199 
200 
201 
204 
205 tcp_socket = connect_tcp_socket() # open tcp_socket
206 advertise_service() # advertise service in ROS (via rosbridge)
207 print("service provider started and waiting for requests")
208 
209 try: # allows to catch KeyboardInterrupt
210  while True: # loop forever (or until ctrl-c is pressed)
211  data = None
212  try: # allows to catch any Exception (network, json, ..)
213  data = wait_for_service_request() # receive request from rosbridge
214  if data == '': # exit on empty string
215  break
216  elif data != None and len(data) > 0: # received service_request (or at least some data..)
217  response = calculate_service_response(data) # generate service_response
218 
219  print("response calculated, now splitting into fragments..")
220  fragment_list = list_of_fragments(response, send_fragment_size) # generate fragments to send to rosbridge
221 
222  print("sending", len(fragment_list), "messages as response")
223  for fragment in fragment_list:
224  #print "sending:" ,fragment
225  send_service_response(fragment) # send service_response to rosbridge (or fragments; just send any list entry)
226  time.sleep(send_fragment_delay) # (not needed if using patched rosbridge protocol.py)
227  except Exception as e:
228  print(e)
229  pass
230 except KeyboardInterrupt:
231  try:
232  unadvertise_service() # unadvertise service
233  tcp_socket.close() # close tcp_socket
234  except Exception as e:
235  print(e)
236  print("non-ros_service_server stopped because user pressed \"Ctrl-C\"")
test_non-ros_service_server_complex-srv.advertise_service
def advertise_service()
Definition: test_non-ros_service_server_complex-srv.py:80
rosbridge_library.util
Definition: src/rosbridge_library/util/__init__.py:1
test_non-ros_service_server_complex-srv.list_of_fragments
def list_of_fragments(full_message, fragment_size)
Definition: test_non-ros_service_server_complex-srv.py:168
test_non-ros_service_server_complex-srv.send_service_response
def send_service_response(response)
Definition: test_non-ros_service_server_complex-srv.py:165
test_non-ros_service_server_complex-srv.unadvertise_service
def unadvertise_service()
Definition: test_non-ros_service_server_complex-srv.py:90
test_non-ros_service_server_complex-srv.calculate_service_response
def calculate_service_response(request)
variables end ##########################################
Definition: test_non-ros_service_server_complex-srv.py:36
test_non-ros_service_server_complex-srv.wait_for_service_request
def wait_for_service_request()
Definition: test_non-ros_service_server_complex-srv.py:97
test_non-ros_service_server_complex-srv.connect_tcp_socket
def connect_tcp_socket()
Definition: test_non-ros_service_server_complex-srv.py:74


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Tue Oct 3 2023 02:12:45