36 from simple_message
import SimpleMessageSocket, JointPositionMessage, IOStateMessage, StatusMessage, TriStates, RobotMode, WrenchMessage
37 from force_sensor
import ForceSensor
38 from info_catch_client
import InfoCatchClient
# Host/port pair defined at module level.
# NOTE(review): presumably the listen address handed to the server built in
# the __main__ block -- confirm; that construction line is not visible here.
HOST, PORT = "0.0.0.0", 11002
46 address = self.client_address[0]
49 [InfoCatchClient.Label.I000,
50 InfoCatchClient.Label.R200,
51 InfoCatchClient.Label.M000,
52 InfoCatchClient.Label.M001,
53 InfoCatchClient.Label.M100,
54 InfoCatchClient.Label.M102,
55 InfoCatchClient.Label.F000,
56 InfoCatchClient.Label.F200,
57 InfoCatchClient.Label.F300,
62 print(
'receive from shared memory')
79 except Exception
as e:
86 msg.joint_data = info[InfoCatchClient.Label.R200] + [0, 0, 0, 0]
92 msg.digital = info[InfoCatchClient.Label.M000] + info[InfoCatchClient.Label.M001]
100 hw_stat =
BitAdapter(info[InfoCatchClient.Label.M100][0])
101 sw_stat =
BitAdapter(info[InfoCatchClient.Label.M102][0])
103 servo_on = hw_stat.as_bool(0)
104 e_stop = hw_stat.as_bool(1)
105 sys_err = sw_stat.as_bool(3)
106 stat_code = sw_stat.subseq(4, 4)
107 err_code = sw_stat.subseq(8, 8)
108 auto_mode = os.path.exists(
'/tmp/auto_ready')
111 msg.mode = RobotMode.AUTO
if auto_mode
else RobotMode.MANUAL
112 msg.e_stopped = TriStates.from_bool(e_stop)
113 msg.drives_powered = TriStates.from_bool(servo_on)
114 msg.motion_possible = TriStates.from_bool(auto_mode
and servo_on
and not sys_err)
115 msg.in_motion = TriStates.UNKNOWN
116 msg.in_error = TriStates.from_bool(sys_err)
117 print(
'{} {} {} {} {} {}'.format(auto_mode, servo_on,
118 e_stop, sys_err, stat_code, err_code))
119 if stat_code
in [SysStat.NORMAL_ERROR,
120 SysStat.CRITICAL_ERROR,
121 SysStat.USER_NORMAL_ERROR,
122 SysStat.USER_CRITICAL_ERROR]:
123 msg.error_code = stat_code << 8 | err_code
131 if self._fs.is_valid():
133 msg.force = [self._fs.fx, self._fs.fy, self._fs.fz]
134 msg.torque = [self._fs.mx, self._fs.my, self._fs.mz]
135 print(
'wrench: {} {}'.format(msg.force, msg.torque))
151 USER_NORMAL_ERROR = 12
152 USER_CRITICAL_ERROR = 13
159 return bool((self.
_num >> n) & 1)
def subseq(self, n, bit_len):
    """Return a bit field of ``self._num`` as a non-negative integer.

    Args:
        n: Index of the field's least-significant bit (bit 0 is the LSB).
        bit_len: Width of the field in bits.

    Returns:
        The ``bit_len`` bits of ``self._num`` starting at bit ``n``,
        shifted down so the field's lowest bit lands at bit 0.
    """
    # Keep every bit below the top of the field, then drop the n bits
    # that sit underneath it.
    mask = (1 << (n + bit_len)) - 1
    return (self._num & mask) >> n
167 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
168 self.socket.bind(self.server_address)
170 if __name__ ==
"__main__":
172 server.serve_forever()
def make_robot_status(self, info)
def make_wrench(self, info)
def make_io_state(self, info)
def make_joint_position(self, info)
def subseq(self, n, bit_len)