test_non-ros_service_server_fragmented.py
Go to the documentation of this file.
00001 #!/usr/bin/python
00002 import sys
00003 import socket
00004 import time
00005 from random import randint
00006 from rosbridge_library.util import json
00007 
00008 
00009 ####################### variables begin ########################################
00010 # these parameters should be changed to match the actual environment           #
00011 ################################################################################
00012 
# --- connection to rosbridge --------------------------------------------------
tcp_socket_timeout = 10               # socket timeout, in seconds
max_msg_length = 20000                # upper bound (bytes) passed to each recv() call

rosbridge_ip = "localhost"            # hostname or IP address of the rosbridge server
rosbridge_port = 9090                 # TCP port of the rosbridge server (integer)

# --- advertised service ---------------------------------------------------------
# The type must match a service type that exists on the rosbridge server
# (in the specified srv module).
service_type = "rosbridge_library/SendBytes"
service_name = "send_bytes"

# --- fragmentation --------------------------------------------------------------
send_fragment_size = 1000             # size of each outgoing slice of the response
# A pause between sends is no longer needed when rosbridge collects the incoming
# stream in a buffer; keep the knob so it can be raised for unpatched servers.
send_fragment_delay = 0.0             # seconds slept after each sent fragment
receive_fragment_size = 10            # "fragment_size" announced when advertising
receive_message_intervall = 0.0       # "message_intervall" announced when advertising
00027 
00028 ####################### variables end ##########################################
00029 
00030 
00031 ####################### service_calculation begin ##############################
00032 # change this function to match whatever service should be provided            #
00033 ################################################################################
00034 
def calculate_service_response(request):
    """Build the JSON ``service_response`` message for one service request.

    The request is a JSON string with an ``"args"`` object holding ``"count"``
    (as described in the corresponding ROS srv file) and an ``"id"``.  The
    response payload is ``count`` random printable ASCII characters
    (codes 32..126), base64-encoded.

    Args:
        request: JSON string of the ``call_service`` message.

    Returns:
        JSON string of the response object, with the encoded payload stored
        under ``data["data"]`` (the outer "data" is the response field, the
        inner one is the field name from the srv file).

    Raises:
        KeyError, ValueError, TypeError: if the request is not valid JSON or
            lacks ``"args"``, ``"count"`` or ``"id"``.
    """
    # Function-scope import keeps this block self-contained.
    import codecs

    request_object = json.loads(request)                 # parse string for service request
    count = int(request_object["args"]["count"])         # parameter from the srv file

    # Collect characters in a list and join once; repeated "+=" on a string
    # can degrade to quadratic time for large payloads.
    characters = []
    for i in range(count):
        characters.append(chr(randint(32, 126)))
        if i % 100000 == 0:
            print("%d bytes left to generate" % (count - i))
    message = "".join(characters)

    # IMPORTANT: base64-encode the payload to avoid JSON parsing problems.
    # --> the client side decodes it with .decode("base64", "strict").
    # codecs.encode(..., "base64") yields the same MIME-style output (newline
    # every 76 characters) as the Python-2-only str.encode('base64').
    encoded = codecs.encode(message.encode("ascii"), "base64").decode("ascii")

    response_object = {
        "op": "service_response",
        "id": request_object["id"],
        "data": {"data": encoded},                       # service response (as defined in srv file)
    }
    return json.dumps(response_object)
00062 
00063 ####################### service_calculation end ################################
00064 
00065 
00066 
00067 ####################### helper functions / and variables begin #################
00068 # should not need to be changed (but could be improved )                       #
00069 ################################################################################
00070 
# Receive buffer shared across calls of wait_for_service_request(); data that
# has been read from the socket but not yet turned into a message stays here.
buffer = ""

def connect_tcp_socket():
    """Open a TCP connection to rosbridge and return the connected socket.

    The socket timeout is set to ``tcp_socket_timeout`` and the connection is
    made to ``(rosbridge_ip, rosbridge_port)``.

    Returns:
        The connected ``socket.socket`` object.

    Raises:
        socket.error: if configuring or connecting the socket fails.  The
            half-initialised socket is closed before the error propagates.
    """
    tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        tcp_sock.settimeout(tcp_socket_timeout)
        tcp_sock.connect((rosbridge_ip, rosbridge_port))
    except BaseException:
        # Do not leak the file descriptor when the connection attempt fails.
        tcp_sock.close()
        raise
    return tcp_sock
00078 
def advertise_service():
    """Send an ``advertise_service`` message for this service to rosbridge.

    The message carries the service type and name plus the fragment size and
    message interval that were configured for receiving.  It is written to the
    module-level ``tcp_socket``.

    Raises:
        socket.error: if the message cannot be sent.
    """
    advertise_message_object = {
        "op": "advertise_service",
        "type": service_type,
        "service": service_name,
        "fragment_size": receive_fragment_size,
        "message_intervall": receive_message_intervall,
    }
    advertise_message = json.dumps(advertise_message_object)
    # sendall() keeps writing until the whole message is out; plain send() may
    # transmit only a prefix and silently drop the rest.
    tcp_socket.sendall(str(advertise_message))
00088 
def unadvertise_service():
    """Send an ``unadvertise_service`` message for this service to rosbridge.

    The message is written to the module-level ``tcp_socket``.

    Raises:
        socket.error: if the message cannot be sent.
    """
    unadvertise_message_object = {
        "op": "unadvertise_service",
        "service": service_name,
    }
    unadvertise_message = json.dumps(unadvertise_message_object)
    # sendall() keeps writing until the whole message is out; plain send() may
    # transmit only a prefix and silently drop the rest.
    tcp_socket.sendall(str(unadvertise_message))
00095 
def _split_fragment_messages(raw):
    """Parse a string of back-to-back JSON objects into a list of objects.

    The string is cut at every "}{" boundary and the curly brackets removed by
    the cut are restored before each piece is parsed.
    NOTE(review): a "}{" sequence inside a JSON string value would also be
    treated as a boundary; the parse then fails and the caller reports it.
    """
    parsed = []
    for piece in raw.split("}{"):
        if not piece.startswith("{"):
            piece = "{" + piece
        if not piece.endswith("}"):
            piece = piece + "}"
        parsed.append(json.loads(piece))
    return parsed


def _reassemble_fragments(fragments):
    """Join fragment payloads in "num" order, or return None if incomplete.

    The set counts as complete when the number of parsed fragments equals the
    "total" announced by the first fragment.
    """
    announced = int(fragments[0]["total"])
    if len(fragments) != announced:
        return None
    ordered = [None] * announced
    for fragment in fragments:
        ordered[int(fragment["num"])] = fragment
    return "".join(fragment["data"] for fragment in ordered)


def wait_for_service_request():
    """Receive data from rosbridge until one complete request is available.

    Incoming data is appended to the module-level ``buffer``.  After every
    read the buffer is checked in two ways:

    1. If it parses as a single JSON object whose "op" is "call_service",
       that string is the request.
    2. Otherwise it is treated as a sequence of fragment messages; once as
       many fragments as announced in "total" have arrived, their "data"
       fields are joined in "num" order.

    In both cases the buffer is emptied before returning, so the next request
    starts from a clean state.

    Returns:
        The request as a JSON string, or None if the socket timed out or
        failed before a complete request arrived (data received so far stays
        in the buffer).

    Raises:
        SystemExit: if the peer closed the connection.
    """
    global buffer

    try:
        while True:
            incoming = tcp_socket.recv(max_msg_length)           # get data from socket
            if not incoming:
                print("connection closed by peer")
                sys.exit(1)
            buffer = buffer + incoming                           # append data to buffer

            try:                                                 # try to parse JSON from buffer
                if json.loads(buffer)["op"] == "call_service":
                    request = buffer
                    # Empty the buffer here as well; otherwise the next
                    # request would be appended to this one and never parse.
                    buffer = ""
                    return request
            except Exception:
                # Incomplete data, several concatenated messages, or no "op":
                # fall through to the defragmentation attempt.
                pass

            try:                                                 # split buffer into fragment messages
                fragments = _split_fragment_messages(buffer)
            except Exception as e:
                print("defrag_error: %s" % buffer)
                print(e)
                continue

            try:                                                 # defragment once all fragments arrived
                reconstructed = _reassemble_fragments(fragments)
            except Exception as e:
                print("not possible to defragment: %s" % buffer)
                print(e)
                continue

            if reconstructed is not None:
                buffer = ""                                      # empty buffer
                print("reconstructed message from %d fragments" % len(fragments))
                return reconstructed
    except socket.timeout:
        # No (complete) request within the timeout; the caller simply retries.
        pass
    except socket.error as e:
        print("network-error(?): %s" % e)
    return None
00163 
def send_service_response(response):
    """Send one ready-to-send message (response or fragment) to rosbridge.

    Args:
        response: the serialized message to write to the module-level
            ``tcp_socket``.

    Raises:
        socket.error: if the message cannot be sent.
    """
    # sendall() keeps writing until the whole message is out; plain send() may
    # transmit only a prefix and silently drop the rest.
    tcp_socket.sendall(response)
00166 
def list_of_fragments(full_message, fragment_size):
    """Split a message into a list of ready-to-send messages.

    A message longer than ``fragment_size`` is cut into slices of at most
    ``fragment_size`` characters, and each slice is wrapped in a JSON
    "fragment" message carrying a shared random id, its index ("num") and the
    number of slices ("total").  A message that fits into a single slice
    (including an empty one) is not wrapped; it is returned unchanged as the
    only list entry.

    Args:
        full_message: the serialized message to split.
        fragment_size: maximum length of each slice; must be at least 1.

    Returns:
        List of strings, each of which can be sent as-is.

    Raises:
        ValueError: if ``fragment_size`` is smaller than 1 (a non-positive
            size could never make progress through the message).
    """
    if fragment_size < 1:
        raise ValueError("fragment_size must be a positive integer")

    message_id = randint(0, 64000)                       # random id shared by all fragments
    fragments = [full_message[start:start + fragment_size]
                 for start in range(0, len(full_message), fragment_size)]

    if len(fragments) <= 1:
        # Only one (or no) slice --> send as plain message, not as fragment.
        return [str(full_message)]

    total = len(fragments)
    return [
        json.dumps({"op": "fragment",
                    "id": str(message_id),
                    "data": str(fragment),
                    "num": num,
                    "total": total})
        for num, fragment in enumerate(fragments)
    ]
00196 
00197 ####################### helper functions end ###################################
00198 
00199 
00200 ####################### script begin ###########################################
00201 # should not need to be changed (but could be improved )                       #
00202 ################################################################################
00203 
tcp_socket = connect_tcp_socket()                                               # open tcp_socket

try:                                                                            # guarantees cleanup on every exit path
    advertise_service()                                                         # advertise service in ROS (via rosbridge)
    print("service provider started and waiting for requests")

    while True:                                                                 # loop forever (or until ctrl-c is pressed)
        try:                                                                    # allows to catch any Exception (network, json, ..)
            data = wait_for_service_request()                                   # receive request from rosbridge
            if data == '':                                                      # exit on empty string
                break
            elif data:                                                          # received service_request (or at least some data..)
                response = calculate_service_response(data)                     # generate service_response

                print("response calculated, now splitting into fragments..")
                fragment_list = list_of_fragments(response, send_fragment_size) # generate fragments to send to rosbridge

                print("sending %d messages as response" % len(fragment_list))
                for fragment in fragment_list:
                    send_service_response(fragment)                             # send service_response to rosbridge (or fragments; just send any list entry)
                    time.sleep(send_fragment_delay)                             # (not needed if using patched rosbridge protocol.py)
        except Exception as e:
            # Keep serving after a failed request; report what went wrong.
            print(e)
except KeyboardInterrupt:
    print("non-ros_service_server stopped because user pressed \"Ctrl-C\"")
finally:
    # Runs for Ctrl-C, for the "break" above and for sys.exit(), so the
    # service is always unadvertised and the socket is always closed.
    try:
        unadvertise_service()                                                   # unadvertise service
    except Exception as e:
        print(e)
    finally:
        tcp_socket.close()                                                      # close tcp_socket


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Sep 13 2017 03:18:07