Go to the documentation of this file.00001
00002 from __future__ import print_function
00003 import sys
00004 import socket
00005 import time
00006 from random import randint
00007 from rosbridge_library.util import json
00008
00009
00010
00011
00012
00013
00014 tcp_socket_timeout = 10
00015 max_msg_length = 20000
00016
00017 rosbridge_ip = "localhost"
00018 rosbridge_port = 9090
00019
00020 service_type = "rosbridge_library/TestNestedService"
00021 service_name = "nested_srv"
00022
00023 send_fragment_size = 1000
00024
00025 send_fragment_delay = 0.000
00026 receive_fragment_size = 10
00027 receive_message_intervall = 0.0
00028
00029
00030
00031
00032
00033
00034
00035
00036 def calculate_service_response(request):
00037 request_object = json.loads(request)
00038 args = request_object["args"]
00039
00040
00041
00042
00043
00044
00045
00046
00047 message = {"data": {"data": 42.0}}
00048
00049 """
00050 IMPORTANT!
00051 use base64 encoding to avoid JSON-parsing problems!
00052 --> use .decode("base64","strict") at client side
00053 """
00054
00055 service_response_data = message
00056
00057 response_object = { "op": "service_response",
00058 "id": request_object["id"],
00059 "data": service_response_data
00060 }
00061 response_message = json.dumps(response_object)
00062 return response_message
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072 buffer = ""
00073
00074 def connect_tcp_socket():
00075 tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00076 tcp_sock.settimeout(tcp_socket_timeout)
00077 tcp_sock.connect((rosbridge_ip, rosbridge_port))
00078 return tcp_sock
00079
00080 def advertise_service():
00081 advertise_message_object = {"op":"advertise_service",
00082 "type": service_type,
00083 "service": service_name,
00084 "fragment_size": receive_fragment_size,
00085 "message_intervall": receive_message_intervall
00086 }
00087 advertise_message = json.dumps(advertise_message_object)
00088 tcp_socket.send(str(advertise_message))
00089
00090 def unadvertise_service():
00091 unadvertise_message_object = {"op":"unadvertise_service",
00092 "service": service_name
00093 }
00094 unadvertise_message = json.dumps(unadvertise_message_object)
00095 tcp_socket.send(str(unadvertise_message))
00096
00097 def wait_for_service_request():
00098 data = None
00099 global buffer
00100
00101 try:
00102 done = False
00103 global buffer
00104 while not done:
00105 incoming = tcp_socket.recv(max_msg_length)
00106 if incoming == '':
00107 print("connection closed by peer")
00108 sys.exit(1)
00109 buffer = buffer + incoming
00110 try:
00111 data_object = json.loads(buffer)
00112 if data_object["op"] == "call_service":
00113 data = buffer
00114 done = True
00115 return data
00116 except Exception as e:
00117
00118
00119 pass
00120
00121
00122 try:
00123 result_string = buffer.split("}{")
00124 result = []
00125 for fragment in result_string:
00126 if fragment[0] != "{":
00127 fragment = "{"+fragment
00128 if fragment[len(fragment)-1] != "}":
00129 fragment = fragment + "}"
00130 result.append(json.loads(fragment))
00131
00132 try:
00133 fragment_count = len(result)
00134 announced = int(result[0]["total"])
00135
00136 if fragment_count == announced:
00137 reconstructed = ""
00138 sorted_result = [None] * fragment_count
00139 unsorted_result = []
00140 for fragment in result:
00141 unsorted_result.append(fragment)
00142 sorted_result[int(fragment["num"])] = fragment
00143
00144 for fragment in sorted_result:
00145 reconstructed = reconstructed + fragment["data"]
00146
00147
00148 buffer = ""
00149 done = True
00150 print("reconstructed message from", len(result), "fragments")
00151
00152 return reconstructed
00153 except Exception as e:
00154 print("not possible to defragment:", buffer)
00155 print(e)
00156 except Exception as e:
00157 print("defrag_error:", buffer)
00158 print(e)
00159 pass
00160 except Exception as e:
00161
00162 pass
00163 return data
00164
00165 def send_service_response(response):
00166 tcp_socket.send(response)
00167
00168 def list_of_fragments(full_message, fragment_size):
00169 message_id = randint(0,64000)
00170 fragments = []
00171 cursor = 0
00172 while cursor < len(full_message):
00173 fragment_begin = cursor
00174 if len(full_message) < cursor + fragment_size:
00175 fragment_end = len(full_message)
00176 cursor = len(full_message)
00177 else:
00178 fragment_end = cursor + fragment_size
00179 cursor += fragment_size
00180 fragment = full_message[fragment_begin:fragment_end]
00181 fragments.append(fragment)
00182
00183 fragmented_messages_list = []
00184 if len(fragments) > 1:
00185 for count, fragment in enumerate(fragments):
00186 fragmented_message_object = {"op":"fragment",
00187 "id": str(message_id),
00188 "data": str(fragment),
00189 "num": count,
00190 "total": len(fragments)
00191 }
00192 fragmented_message = json.dumps(fragmented_message_object)
00193 fragmented_messages_list.append(fragmented_message)
00194 else:
00195 fragmented_messages_list.append(str(fragment))
00196 return fragmented_messages_list
00197
00198
00199
00200
00201
00202
00203
00204
00205 tcp_socket = connect_tcp_socket()
00206 advertise_service()
00207 print("service provider started and waiting for requests")
00208
00209 try:
00210 while True:
00211 data = None
00212 try:
00213 data = wait_for_service_request()
00214 if data == '':
00215 break
00216 elif data != None and len(data) > 0:
00217 response = calculate_service_response(data)
00218
00219 print("response calculated, now splitting into fragments..")
00220 fragment_list = list_of_fragments(response, send_fragment_size)
00221
00222 print("sending", len(fragment_list), "messages as response")
00223 for fragment in fragment_list:
00224
00225 send_service_response(fragment)
00226 time.sleep(send_fragment_delay)
00227 except Exception as e:
00228 print(e)
00229 pass
00230 except KeyboardInterrupt:
00231 try:
00232 unadvertise_service()
00233 tcp_socket.close()
00234 except Exception as e:
00235 print(e)
00236 print("non-ros_service_server stopped because user pressed \"Ctrl-C\"")