Go to the documentation of this file.00001
00002 import sys
00003 import socket
00004 import time
00005 from random import randint
00006 from rosbridge_library.util import json
00007
00008
00009
00010
00011
00012
# ---- connection settings -----------------------------------------------
tcp_socket_timeout = 10                 # value handed to settimeout() on the rosbridge socket (seconds)
max_msg_length = 20000                  # maximum number of bytes requested per recv() call

rosbridge_ip = "localhost"              # host the script connects to
rosbridge_port = 9090                   # TCP port the script connects to

# ---- service that this script advertises -------------------------------
service_type = "rosbridge_library/TestNestedService"    # sent as "type" in advertise_service
service_name = "nested_srv"                             # sent as "service" in (un)advertise_service

# ---- fragmentation settings ---------------------------------------------
send_fragment_size = 1000               # fragment_size passed to list_of_fragments() for responses

send_fragment_delay = 0.000             # time.sleep() duration after each sent response message (seconds)
receive_fragment_size = 10              # sent as "fragment_size" in the advertise_service message
receive_message_intervall = 0.0         # sent as "message_intervall" in the advertise_service message
00027
00028
00029
00030
00031
00032
00033
00034
def calculate_service_response(request):
    """Build the JSON-encoded ``service_response`` for one service request.

    The response payload is fixed; the request is only parsed to obtain the
    ``id`` that is echoed back.  Request arguments are not used, so a request
    that carries no ``"args"`` entry is answered like any other.

    Args:
        request: JSON string of the incoming request; must contain ``"id"``.

    Returns:
        JSON string with ``"op": "service_response"``, the request's ``id``
        and the fixed payload under ``"data"``.

    Raises:
        KeyError: if the request has no ``"id"`` entry.
        Exception: whatever ``json.loads`` raises for malformed JSON.
    """
    request_object = json.loads(request)

    # Fixed answer of this test service.
    message = {"data": {"data": 42.0}}

    # NOTE (kept from the original author): payloads that could cause
    # JSON-parsing problems should be base64-encoded here and decoded with
    # .decode("base64", "strict") at client side.  The fixed payload above is
    # sent as-is.
    service_response_data = message

    response_object = {
        "op": "service_response",
        "id": request_object["id"],
        "data": service_response_data,
    }
    return json.dumps(response_object)
00062
00063
00064
00065
00066
00067
00068
00069
00070
# Data received from rosbridge that has not yet been turned into a complete
# request; wait_for_service_request() appends every recv() result to it.
# NOTE: the name shadows the Python 2 builtin ``buffer``.
buffer = ""
00072
def connect_tcp_socket():
    """Open a TCP connection to rosbridge and return the connected socket.

    The socket is created for IPv4/TCP, gets ``tcp_socket_timeout`` as its
    timeout and is connected to ``(rosbridge_ip, rosbridge_port)``.

    Returns:
        The connected ``socket.socket`` object.

    Raises:
        socket.error: if the connection cannot be established; the socket is
            closed before the error is re-raised.
    """
    tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        tcp_sock.settimeout(tcp_socket_timeout)
        tcp_sock.connect((rosbridge_ip, rosbridge_port))
    except Exception:
        # Do not leak the descriptor when the connection attempt fails.
        tcp_sock.close()
        raise
    return tcp_sock
00078
def advertise_service():
    """Send an ``advertise_service`` message for this service to rosbridge.

    The message carries ``service_type`` and ``service_name`` plus the
    ``receive_fragment_size`` / ``receive_message_intervall`` settings, and is
    written to the module-level ``tcp_socket``.

    Raises:
        socket.error: if the message cannot be sent.
    """
    advertise_message_object = {
        "op": "advertise_service",
        "type": service_type,
        "service": service_name,
        "fragment_size": receive_fragment_size,
        # Key spelling kept exactly as in the original message; presumably it
        # is what the rosbridge side reads -- verify before renaming.
        "message_intervall": receive_message_intervall,
    }
    advertise_message = json.dumps(advertise_message_object)
    # sendall() keeps writing until the whole message is out; plain send()
    # may return after transmitting only part of it.
    tcp_socket.sendall(str(advertise_message))
00088
def unadvertise_service():
    """Send an ``unadvertise_service`` message for ``service_name`` to rosbridge.

    The message is written to the module-level ``tcp_socket``.

    Raises:
        socket.error: if the message cannot be sent.
    """
    unadvertise_message_object = {
        "op": "unadvertise_service",
        "service": service_name,
    }
    unadvertise_message = json.dumps(unadvertise_message_object)
    # sendall() keeps writing until the whole message is out; plain send()
    # may return after transmitting only part of it.
    tcp_socket.sendall(str(unadvertise_message))
00095
def wait_for_service_request():
    """Receive from rosbridge until one complete service request is available.

    Every chunk read from the module-level ``tcp_socket`` is appended to the
    module-level ``buffer``.  After each chunk two things are tried:

    1. Parse the whole buffer as one JSON object.  If its ``"op"`` is
       ``"call_service"`` the buffer content is returned as the request.
    2. Otherwise treat the buffer as back-to-back JSON objects, split them at
       ``"}{"`` and parse each one.  When the number of parsed objects equals
       the ``"total"`` announced by the first one, their ``"data"`` strings
       are joined in ``"num"`` order and returned.

    In both cases the buffer is emptied before returning, so that the next
    call starts with fresh data.

    Returns:
        The request as a string, or ``None`` when a socket error (for example
        the receive timeout) interrupts waiting.

    Exits the process with status 1 when the peer closes the connection.
    """
    global buffer

    try:
        while True:
            incoming = tcp_socket.recv(max_msg_length)
            if not incoming:
                # recv() returning nothing means the peer closed the connection.
                print("connection closed by peer")
                sys.exit(1)
            buffer = buffer + incoming

            # 1) the buffer holds exactly one complete, unfragmented request
            try:
                data_object = json.loads(buffer)
                if data_object["op"] == "call_service":
                    data = buffer
                    # Drop the consumed request; leaving it in the buffer
                    # would make every later parse attempt fail.
                    buffer = ""
                    return data
            except Exception:
                # Not (yet) a single parseable object -- fall through to the
                # fragment handling below.
                pass

            # 2) the buffer holds one or more "fragment" messages
            try:
                # Splitting at "}{" strips the braces at the seams; put them
                # back before parsing each piece.
                result = []
                for fragment in buffer.split("}{"):
                    if fragment[0] != "{":
                        fragment = "{" + fragment
                    if fragment[-1] != "}":
                        fragment = fragment + "}"
                    result.append(json.loads(fragment))

                try:
                    announced = int(result[0]["total"])

                    if len(result) == announced:
                        # Fragments may arrive in any order; place each one
                        # at the position given by its "num".
                        sorted_result = [None] * len(result)
                        for fragment in result:
                            sorted_result[int(fragment["num"])] = fragment

                        reconstructed = "".join(
                            fragment["data"] for fragment in sorted_result
                        )

                        buffer = ""
                        print("reconstructed message from %d fragments" % len(result))
                        return reconstructed
                except Exception as e:
                    print("not possible to defragment: %s" % buffer)
                    print(e)
            except Exception as e:
                # Also reached while the last fragment is only partially
                # received; the loop then simply waits for more data.
                print("defrag_error: %s" % buffer)
                print(e)
    except socket.error:
        # Includes the receive timeout: give control back to the caller
        # without a request.
        pass
    return None
00163
def send_service_response(response):
    """Send one response message to rosbridge over the module-level ``tcp_socket``.

    Args:
        response: the string to transmit; it is sent unchanged.

    Raises:
        socket.error: if the message cannot be sent.
    """
    # sendall() keeps writing until the whole message is out; plain send()
    # may return after transmitting only part of it.
    tcp_socket.sendall(response)
00166
def list_of_fragments(full_message, fragment_size):
    """Split ``full_message`` into messages of at most ``fragment_size`` characters.

    A message that fits into a single piece (including the empty message) is
    returned unchanged as the only list element.  A longer message is cut
    into consecutive pieces, and each piece is wrapped in a JSON-encoded
    ``"fragment"`` message carrying a shared random ``id``, the piece as
    ``data``, its zero-based position ``num`` and the piece count ``total``.

    Args:
        full_message: the string to split.
        fragment_size: maximum number of characters per piece; must be >= 1.

    Returns:
        List of strings ready to be sent one after another.

    Raises:
        ValueError: if ``fragment_size`` is smaller than 1 (the message could
            never be consumed with such a size).
    """
    if fragment_size < 1:
        raise ValueError("fragment_size must be a positive integer")

    fragments = [
        full_message[begin:begin + fragment_size]
        for begin in range(0, len(full_message), fragment_size)
    ]

    if len(fragments) <= 1:
        # Nothing to split: pass the message through without a wrapper.
        return [str(full_message)]

    # One id for all pieces so the receiver can tell they belong together.
    message_id = str(randint(0, 64000))
    total = len(fragments)
    return [
        json.dumps({
            "op": "fragment",
            "id": message_id,
            "data": str(fragment),
            "num": count,
            "total": total,
        })
        for count, fragment in enumerate(fragments)
    ]
00196
00197
00198
00199
00200
00201
00202
00203
# ---- script entry: connect, advertise, then serve requests until stopped ----
tcp_socket = connect_tcp_socket()
advertise_service()
print("service provider started and waiting for requests")

try:
    while True:
        data = None
        try:
            # None means "nothing received this round" -- just wait again.
            data = wait_for_service_request()
            if data == '':
                break
            elif data is not None and len(data) > 0:
                response = calculate_service_response(data)

                print("response calculated, now splitting into fragments..")
                fragment_list = list_of_fragments(response, send_fragment_size)

                print("sending %d messages as response" % len(fragment_list))
                for fragment in fragment_list:
                    send_service_response(fragment)
                    # Optional pause between consecutive messages.
                    time.sleep(send_fragment_delay)
        except Exception as e:
            # A failed request must not stop the server; report and carry on.
            print(e)
except KeyboardInterrupt:
    try:
        unadvertise_service()
    except Exception as e:
        print(e)
    print("non-ros_service_server stopped because user pressed \"Ctrl-C\"")
finally:
    # Close the connection on every exit path, not only after Ctrl-C and a
    # successful unadvertise.
    tcp_socket.close()