test_non-ros_service_server_fragmented.py
Go to the documentation of this file.
1 #!/usr/bin/python
2 from __future__ import print_function
3 import sys
4 import socket
5 import time
6 from random import randint
7 from rosbridge_library.util import json
8 
9 
10 
13 
14 tcp_socket_timeout = 10 # seconds
15 max_msg_length = 20000 # bytes
16 
17 rosbridge_ip = "localhost" # hostname or ip
18 rosbridge_port = 9090 # port as integer
19 
20 service_type = "rosbridge_library/SendBytes" # make sure this matches an existing service type on rosbridge-server (in specified srv_module)
21 service_name = "send_bytes" # service name
22 
23 send_fragment_size = 1000
24 # delay between sends to rosbridge is not needed anymore, if using my version of protocol (uses buffer to collect data from stream)
25 send_fragment_delay = 0.000#1
26 receive_fragment_size = 10
27 receive_message_intervall = 0.0
28 
29 
30 
31 
32 
35 
37  request_object = json.loads(request) # parse string for service request
38  args = request_object["args"] # get parameter field (args)
39  count = int(args["count"] ) # get parameter(s) as described in corresponding ROS srv-file
40 
41  message = ""
42  # calculate service response
43  for i in range(0,count):
44  #message += str(chr(randint(32,126)))
45  message+= str(chr(randint(32,126)))
46  if i% 100000 == 0:
47  print(count - i, "bytes left to generate")
48 
49  """
50  IMPORTANT!
51  use base64 encoding to avoid JSON-parsing problems!
52  --> use .decode("base64","strict") at client side
53  """
54  message = message.encode('base64','strict')
55  service_response_data = { "data": message} # service response (as defined in srv-file)
56 
57  response_object = { "op": "service_response",
58  "id": request_object["id"],
59  "service": service_name,
60  "values": service_response_data # put service response in "data"-field of response object (in this case it's twice "data", because response value is also named data (in srv-file)
61  }
62  response_message = json.dumps(response_object)
63  return response_message
64 
65 
66 
67 
68 
69 
72 
73 buffer = ""
74 
76  tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # connect to rosbridge
77  tcp_sock.settimeout(tcp_socket_timeout)
78  tcp_sock.connect((rosbridge_ip, rosbridge_port))
79  return tcp_sock
80 
81 def advertise_service(): # advertise service
82  advertise_message_object = {"op":"advertise_service",
83  "type": service_type,
84  "service": service_name,
85  "fragment_size": receive_fragment_size,
86  "message_intervall": receive_message_intervall
87  }
88  advertise_message = json.dumps(advertise_message_object)
89  tcp_socket.send(str(advertise_message))
90 
91 def unadvertise_service(): # unadvertise service
92  unadvertise_message_object = {"op":"unadvertise_service",
93  "service": service_name
94  }
95  unadvertise_message = json.dumps(unadvertise_message_object)
96  tcp_socket.send(str(unadvertise_message))
97 
98 def wait_for_service_request(): # receive data from rosbridge
99  data = None
100  global buffer
101 
102  try:
103  done = False
104  global buffer
105  while not done:
106  incoming = tcp_socket.recv(max_msg_length) # get data from socket
107  if incoming == '':
108  print("connection closed by peer")
109  sys.exit(1)
110  buffer = buffer + incoming # append data to buffer
111  try: # try to parse JSON from buffer
112  data_object = json.loads(buffer)
113  if data_object["op"] == "call_service":
114  data = buffer
115  done = True
116  return data # if parsing was successful --> return data string
117  except Exception as e:
118  #print "direct_access error:"
119  #print e
120  pass
121 
122  #print "trying to defragment"
123  try: # opcode was not "call_service" -> try to defragment
124  result_string = buffer.split("}{") # split buffer into fragments and re-fill with curly brackets
125  result = []
126  for fragment in result_string:
127  if fragment[0] != "{":
128  fragment = "{"+fragment
129  if fragment[len(fragment)-1] != "}":
130  fragment = fragment + "}"
131  result.append(json.loads(fragment))
132 
133  try: # try to defragment when received all fragments
134  fragment_count = len(result)
135  announced = int(result[0]["total"])
136 
137  if fragment_count == announced:
138  reconstructed = ""
139  sorted_result = [None] * fragment_count # sort fragments..
140  unsorted_result = []
141  for fragment in result:
142  unsorted_result.append(fragment)
143  sorted_result[int(fragment["num"])] = fragment
144 
145  for fragment in sorted_result: # reconstruct from fragments
146  reconstructed = reconstructed + fragment["data"]
147 
148  #print "reconstructed", reconstructed
149  buffer = "" # empty buffer
150  done = True
151  print("reconstructed message from", len(result), "fragments")
152  #print reconstructed
153  return reconstructed
154  except Exception as e:
155  print("not possible to defragment:", buffer)
156  print(e)
157  except Exception as e:
158  print("defrag_error:", buffer)
159  print(e)
160  pass
161  except Exception as e:
162  #print "network-error(?):", e
163  pass
164  return data
165 
166 def send_service_response(response): # send response to rosbridge
167  tcp_socket.send(response)
168 
169 def list_of_fragments(full_message, fragment_size): # create fragment messages for a huge message
170  message_id = randint(0,64000) # generate random message id
171  fragments = [] # generate list of data fragments
172  cursor = 0
173  while cursor < len(full_message):
174  fragment_begin = cursor
175  if len(full_message) < cursor + fragment_size:
176  fragment_end = len(full_message)
177  cursor = len(full_message)
178  else:
179  fragment_end = cursor + fragment_size
180  cursor += fragment_size
181  fragment = full_message[fragment_begin:fragment_end]
182  fragments.append(fragment)
183 
184  fragmented_messages_list = [] # generate list of fragmented messages (including headers)
185  if len(fragments) > 1:
186  for count, fragment in enumerate(fragments): # iterate through list and have index counter
187  fragmented_message_object = {"op":"fragment", # create python-object for each fragment message
188  "id": str(message_id),
189  "data": str(fragment),
190  "num": count,
191  "total": len(fragments)
192  }
193  fragmented_message = json.dumps(fragmented_message_object) # create JSON-object from python-object for each fragment message
194  fragmented_messages_list.append(fragmented_message) # append JSON-object to list of fragmented messages
195  else: # if only 1 fragment --> do not send as fragment, but as service_response
196  fragmented_messages_list.append(str(fragment))
197  return fragmented_messages_list # return list of 'ready-to-send' fragmented messages
198 
199 
200 
201 
202 
205 
206 tcp_socket = connect_tcp_socket() # open tcp_socket
207 advertise_service() # advertise service in ROS (via rosbridge)
208 print("service provider started and waiting for requests")
209 
210 try: # allows to catch KeyboardInterrupt
211  while True: # loop forever (or until ctrl-c is pressed)
212  data = None
213  try: # allows to catch any Exception (network, json, ..)
214  data = wait_for_service_request() # receive request from rosbridge
215  if data == '': # exit on empty string
216  break
217  elif data != None and len(data) > 0: # received service_request (or at least some data..)
218  response = calculate_service_response(data) # generate service_response
219 
220  print("response calculated, now splitting into fragments..")
221  fragment_list = list_of_fragments(response, send_fragment_size) # generate fragments to send to rosbridge
222 
223  print("sending", len(fragment_list), "messages as response")
224  for fragment in fragment_list:
225  #print "sending:" ,fragment
226  send_service_response(fragment) # send service_response to rosbridge (or fragments; just send any list entry)
227  time.sleep(send_fragment_delay) # (not needed if using patched rosbridge protocol.py)
228  except Exception as e:
229  print(e)
230  pass
231 except KeyboardInterrupt:
232  try:
233  unadvertise_service() # unadvertise service
234  tcp_socket.close() # close tcp_socket
235  except Exception as e:
236  print(e)
237  print("non-ros_service_server stopped because user pressed \"Ctrl-C\"")
def calculate_service_response(request)
variables end ##########################################


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18