33 from simple_message
import SimpleMessageSocket, \
34 SimpleMessageType, JointTrajPtReplyMessage, ReplyCode, SpecialSequence, \
35 SetIOReplyMessage, IOFunctionType, \
36 SetPostureReplyMessage, GetPostureReplyMessage, \
37 SysStatReplyMessage, \
38 SetToolOffsetReplyMessage
39 from robot_controller
import RobotController
41 HOST, PORT =
"0.0.0.0", 11003
47 address = self.client_address[0]
48 print(
'connected from {}'.format(address))
61 recv_msg = sock.recv()
63 msg_handler = msg_handlers.get(
66 reply_msg = msg_handler(ctl, recv_msg)
73 print(recv_msg.address)
74 print(recv_msg.data_size)
77 if recv_msg.fun == IOFunctionType.SET_DIGITAL_OUT:
79 recv_msg.address, recv_msg.data[0:recv_msg.data_size])
80 elif recv_msg.fun == IOFunctionType.SET_ADC_MODE:
81 result = ctl.set_adc_mode(recv_msg.address, recv_msg.data[0])
83 reply_msg = SetIOReplyMessage()
84 reply_msg.reply_code = ReplyCode.SUCCESS
if result
else ReplyCode.FAILURE
85 reply_msg.result = reply_msg.Result.SUCCESS
if result
else reply_msg.Result.FAILURE
86 print(
"received: I/O message")
91 print(recv_msg.posture)
92 print(
"received: Set posture message")
94 result = ctl.set_posture(recv_msg.posture)
96 reply_msg = SetPostureReplyMessage()
97 reply_msg.reply_code = ReplyCode.SUCCESS
if result
else ReplyCode.FAILURE
102 print(
"received: Get posture message")
104 posture = ctl.get_posture()
106 reply_msg = GetPostureReplyMessage()
108 reply_msg.reply_code = ReplyCode.FAILURE
110 reply_msg.reply_code = ReplyCode.SUCCESS
111 reply_msg.posture = posture
116 print(recv_msg.stat_type)
118 result = ctl.sys_stat(recv_msg.stat_type)
120 reply_msg = SysStatReplyMessage()
121 reply_msg.reply_code = ReplyCode.SUCCESS
if result
else ReplyCode.FAILURE
122 print(
"result: {}".format(result))
123 reply_msg.result = result
124 print(
"received: sys_stat message")
129 print(
"received: Set tool offset message")
130 result = ctl.set_tool_offset(recv_msg.x, recv_msg.y, recv_msg.z,
131 recv_msg.rz, recv_msg.ry, recv_msg.rx)
132 print(
"result: {}".format(result))
134 reply_msg = SetToolOffsetReplyMessage()
135 reply_msg.reply_code = SetToolOffsetReplyMessage.Result.SUCCESS
if result
else SetToolOffsetReplyMessage.Result.FAILURE
140 raise NotImplementedError(
141 "Unknown msg_type: {}".format(recv_msg.msg_type))
146 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
147 self.socket.bind(self.server_address)
150 if __name__ ==
"__main__":
152 server.serve_forever()
def on_set_tool_offset(self, ctl, recv_msg)
def on_set_posture(self, ctl, recv_msg)
def on_set_io(self, ctl, recv_msg)
def on_unkown_message(self, ctl, recv_msg)
def on_get_posture(self, ctl, recv_msg)
def on_sys_stat(self, ctl, recv_msg)